
ding.framework.middleware.functional.evaluator


IMetric

Bases: ABC

gt(metric1, metric2) abstractmethod

Overview

Return whether metric1 is greater than or equal to metric2 (>=).

Note: if metric2 is None, return True.
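For concreteness, here is a minimal sketch of a class satisfying the gt contract, including the None baseline case. AccuracyMetric is an illustrative toy, not part of ding; only the IMetric interface itself comes from the source.

```python
from abc import ABC, abstractmethod
from typing import Any, List


class IMetric(ABC):

    @abstractmethod
    def eval(self, inputs: Any, label: Any) -> dict: ...

    @abstractmethod
    def reduce_mean(self, inputs: List[Any]) -> Any: ...

    @abstractmethod
    def gt(self, metric1: Any, metric2: Any) -> bool: ...


class AccuracyMetric(IMetric):
    """Hypothetical accuracy metric, for illustration only."""

    def eval(self, inputs: Any, label: Any) -> dict:
        correct = sum(1 for p, l in zip(inputs, label) if p == l)
        return {'accuracy': correct / len(label)}

    def reduce_mean(self, inputs: List[Any]) -> Any:
        return sum(x['accuracy'] for x in inputs) / len(inputs)

    def gt(self, metric1: Any, metric2: Any) -> bool:
        # No baseline yet: any metric counts as an improvement
        if metric2 is None:
            return True
        return metric1 >= metric2
```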

VectorEvalMonitor

Bases: object

Overview

In some cases, different environments in the evaluator may collect episodes of different lengths. For example, suppose we want to collect 12 episodes but only have 5 environments: if we did nothing, we would likely collect more short episodes than long ones. As a result, the average reward would be biased and inaccurate. VectorEvalMonitor solves this problem.

Interfaces: __init__, is_finished, update_info, update_reward, get_episode_return, get_latest_reward, get_current_episode, get_episode_info, update_video, get_episode_video

__init__(env_num, n_episode)

Overview

Init method. Based on the total number of episodes and the number of environments, determine how many episodes each environment should run, and initialize the reward, info, video, and output buffers.

Arguments: - env_num (:obj:int): the number of environments - n_episode (:obj:int): the total number of episodes to collect
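The episode allocation can be sketched in isolation. allocate_episodes is an illustrative helper mirroring the arithmetic in __init__: each environment gets n_episode // env_num episodes, and the remainder is spread over the first environments.

```python
from typing import List


def allocate_episodes(env_num: int, n_episode: int) -> List[int]:
    """Split n_episode across env_num environments as evenly as possible."""
    assert n_episode >= env_num, "n_episode < env_num, please decrease the number of eval env"
    each_env_episode = [n_episode // env_num] * env_num
    # distribute the remainder one episode at a time
    for i in range(n_episode % env_num):
        each_env_episode[i] += 1
    return each_env_episode
```

For the 12-episode/5-environment example above, this yields [3, 3, 2, 2, 2], so no environment contributes a disproportionate number of (likely short) episodes.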

is_finished()

Overview

Determine whether the evaluator has completed the work.

Return: - result: (:obj:bool): whether the evaluator has completed the work

update_info(env_id, info)

Overview

Update the information of the environment indicated by env_id.

Arguments: - env_id: (:obj:int): the id of the environment we need to update information - info: (:obj:Any): the information we need to update

update_reward(env_id, reward)

Overview

Update the reward indicated by env_id.

Arguments: - env_id: (:obj:int): the id of the environment we need to update the reward - reward: (:obj:Any): the reward we need to update

get_episode_return()

Overview

Gather the total return of every finished episode, flattened across all environments into a single list.
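The flattening relies on Python's sum(iterable, []) idiom over the per-env deques. A self-contained sketch with made-up returns (the reward values here are illustrative):

```python
from collections import deque

# Per-env deques, as VectorEvalMonitor stores them; each element is the
# return of one finished episode in that environment.
rewards = {
    0: deque([10.0, 12.0]),
    1: deque([8.0]),
    2: deque([11.0, 9.0]),
}

# sum(list_of_lists, []) concatenates: the [] start value makes `+` act as
# list concatenation rather than numeric addition.
episode_return = sum([list(v) for v in rewards.values()], [])
```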

get_latest_reward(env_id)

Overview

Get the latest reward of a certain environment.

Arguments: - env_id: (:obj:int): the id of the environment we need to get reward.

get_current_episode()

Overview

Get the number of episodes finished so far, i.e. which episode the evaluator is currently executing.

get_episode_info()

Overview

Get all episode information, such as total return of one episode.

get_episode_video()

Overview

Convert list of videos into [N, T, C, H, W] tensor, containing worst, median, best evaluation trajectories for video logging.
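A standalone sketch of the pad-and-stack step: episodes of different lengths are padded with their last frame, then stacked into [N, T, C, H, W]. The frames here are synthetic [T, C, H, W] arrays (the monitor first transposes raw [T, H, W, C] frames into this layout).

```python
import numpy as np

# Two synthetic "videos" of different lengths, already in [T, C, H, W] layout
videos = [np.zeros((3, 3, 8, 8)), np.ones((5, 3, 8, 8))]

max_length = max(v.shape[0] for v in videos)
padded = []
for v in videos:
    if v.shape[0] < max_length:
        # repeat the last frame to reach max_length
        pad = np.tile(v[-1][None], (max_length - v.shape[0], 1, 1, 1))
        v = np.concatenate([v, pad], axis=0)
    padded.append(v)
stacked = np.stack(padded, axis=0)
assert stacked.shape == (2, 5, 3, 8, 8)  # [N, T, C, H, W]
```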

interaction_evaluator(cfg, policy, env, render=False, **kwargs)

Overview

The middleware that executes the evaluation.

Arguments: - cfg (:obj:EasyDict): Config. - policy (:obj:Policy): The policy to be evaluated. - env (:obj:BaseEnvManager): The env for the evaluation. - render (:obj:bool): Whether to render env images and policy logits. - kwargs: (:obj:Any): Other arguments for specific evaluation.
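The middleware gates evaluation by train-iteration count: it evaluates on the first call, then only after eval_freq train iterations have passed since the last evaluation. A minimal, dependency-free sketch of that gating, mirroring the simpler check in interaction_evaluator_ttorch (Ctx and should_evaluate are illustrative stand-ins for DI-engine's context and closure, not its API):

```python
class Ctx:
    """Illustrative stand-in for an OnlineRLContext."""

    def __init__(self):
        self.train_iter = 0
        self.last_eval_iter = -1  # -1 means "never evaluated yet"


def should_evaluate(ctx: Ctx, eval_freq: int = 1000) -> bool:
    # Skip if we have evaluated before and not enough iterations have passed
    if ctx.last_eval_iter != -1 and ctx.train_iter - ctx.last_eval_iter < eval_freq:
        return False
    return True
```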

interaction_evaluator_ttorch(seed, policy, env, n_evaluator_episode=None, stop_value=np.inf, eval_freq=1000, render=False)

Overview

The middleware that executes the evaluation with ttorch data.

Arguments: - seed (:obj:int): Random seed for the evaluation env. - policy (:obj:Policy): The policy to be evaluated. - env (:obj:BaseEnvManager): The env for the evaluation. - n_evaluator_episode (:obj:Optional[int]): The number of episodes to evaluate; defaults to env.env_num. - stop_value (:obj:float): The mean episode return threshold at which training is considered converged. - eval_freq (:obj:int): The number of train iterations between two evaluations. - render (:obj:bool): Whether to render env images and policy logits.
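After all episodes finish, the middleware averages the per-episode returns and raises a stop flag once the mean reaches stop_value (and at least one train iteration has happened). Sketched with made-up numbers:

```python
import numpy as np

# Illustrative per-episode returns collected by the eval monitor
returns = [8.0, 10.5, 12.0, 9.5]

episode_return_mean = np.mean(returns)
episode_return_std = np.std(returns)

train_iter = 5000
stop_value = 10.0
# Same condition as in the source: mean return reaches the threshold
# and training has actually started.
stop_flag = episode_return_mean >= stop_value and train_iter > 0
```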

Full Source Code

../ding/framework/middleware/functional/evaluator.py

```python
from typing import Callable, Any, List, Union, Optional
from abc import ABC, abstractmethod
from collections import deque
from ditk import logging
import numpy as np
import torch
import treetensor.numpy as tnp
import treetensor.torch as ttorch
from easydict import EasyDict
from ding.envs import BaseEnvManager
from ding.framework.context import Context, OfflineRLContext, OnlineRLContext
from ding.policy import Policy
from ding.data import Dataset, DataLoader
from ding.framework import task
from ding.torch_utils import to_ndarray, get_shape0
from ding.utils import lists_to_dicts


class IMetric(ABC):

    @abstractmethod
    def eval(self, inputs: Any, label: Any) -> dict:
        raise NotImplementedError

    @abstractmethod
    def reduce_mean(self, inputs: List[Any]) -> Any:
        raise NotImplementedError

    @abstractmethod
    def gt(self, metric1: Any, metric2: Any) -> bool:
        """
        Overview:
            Whether metric1 is greater than metric2 (>=)

        .. note::
            If metric2 is None, return True
        """
        raise NotImplementedError


class VectorEvalMonitor(object):
    """
    Overview:
        In some cases, different environment in evaluator may collect different length episode. For example, \
        suppose we want to collect 12 episodes in evaluator but only have 5 environments, if we didn't do \
        any thing, it is likely that we will get more short episodes than long episodes. As a result, \
        our average reward will have a bias and may not be accurate. we use VectorEvalMonitor to solve the problem.
    Interfaces:
        __init__, is_finished, update_info, update_reward, get_episode_return, get_latest_reward, get_current_episode,\
        get_episode_info, update_video, get_episode_video
    """

    def __init__(self, env_num: int, n_episode: int) -> None:
        """
        Overview:
            Init method. According to the number of episodes and the number of environments, determine how many \
                episodes need to be opened for each environment, and initialize the reward, info and other \
                information
        Arguments:
            - env_num (:obj:`int`): the number of environments
            - n_episode (:obj:`int`): the total number of episodes to collect
        """
        assert n_episode >= env_num, "n_episode < env_num, please decrease the number of eval env"
        self._env_num = env_num
        self._n_episode = n_episode
        each_env_episode = [n_episode // env_num for _ in range(env_num)]
        for i in range(n_episode % env_num):
            each_env_episode[i] += 1
        self._reward = {env_id: deque(maxlen=maxlen) for env_id, maxlen in enumerate(each_env_episode)}
        self._info = {env_id: deque(maxlen=maxlen) for env_id, maxlen in enumerate(each_env_episode)}
        self._video = {
            env_id: deque([[] for _ in range(maxlen)], maxlen=maxlen)
            for env_id, maxlen in enumerate(each_env_episode)
        }
        self._output = {
            env_id: deque([[] for _ in range(maxlen)], maxlen=maxlen)
            for env_id, maxlen in enumerate(each_env_episode)
        }

    def is_finished(self) -> bool:
        """
        Overview:
            Determine whether the evaluator has completed the work.
        Return:
            - result: (:obj:`bool`): whether the evaluator has completed the work
        """
        return all([len(v) == v.maxlen for v in self._reward.values()])

    def update_info(self, env_id: int, info: Any) -> None:
        """
        Overview:
            Update the information of the environment indicated by env_id.
        Arguments:
            - env_id: (:obj:`int`): the id of the environment we need to update information
            - info: (:obj:`Any`): the information we need to update
        """
        self._info[env_id].append(info)

    def update_reward(self, env_id: Union[int, np.ndarray], reward: Any) -> None:
        """
        Overview:
            Update the reward indicated by env_id.
        Arguments:
            - env_id: (:obj:`int`): the id of the environment we need to update the reward
            - reward: (:obj:`Any`): the reward we need to update
        """
        if isinstance(reward, torch.Tensor):
            reward = reward.item()
        if isinstance(env_id, np.ndarray):
            env_id = env_id.item()
        self._reward[env_id].append(reward)

    def get_episode_return(self) -> list:
        """
        Overview:
            Sum up all reward and get the total return of one episode.
        """
        return sum([list(v) for v in self._reward.values()], [])  # sum(iterable, start)

    def get_latest_reward(self, env_id: int) -> int:
        """
        Overview:
            Get the latest reward of a certain environment.
        Arguments:
            - env_id: (:obj:`int`): the id of the environment we need to get reward.
        """
        return self._reward[env_id][-1]

    def get_current_episode(self) -> int:
        """
        Overview:
            Get the current episode. We can know which episode our evaluator is executing now.
        """
        return sum([len(v) for v in self._reward.values()])

    def get_episode_info(self) -> dict:
        """
        Overview:
            Get all episode information, such as total return of one episode.
        """
        if len(self._info[0]) == 0:
            return None
        else:
            # sum among all envs
            total_info = sum([list(v) for v in self._info.values()], [])
            if isinstance(total_info[0], tnp.ndarray):
                total_info = [t.json() for t in total_info]
            total_info = lists_to_dicts(total_info)
            new_dict = {}
            for k in total_info.keys():
                try:
                    if np.isscalar(total_info[k][0].item()):
                        new_dict[k + '_mean'] = np.mean(total_info[k])
                except:  # noqa
                    pass
            return new_dict

    def _select_idx(self):
        reward = [t.item() for t in self.get_episode_return()]
        sortarg = np.argsort(reward)
        # worst, median(s), best
        if len(sortarg) == 1:
            idxs = [sortarg[0]]
        elif len(sortarg) == 2:
            idxs = [sortarg[0], sortarg[-1]]
        elif len(sortarg) == 3:
            idxs = [sortarg[0], sortarg[len(sortarg) // 2], sortarg[-1]]
        else:
            # TensorboardX pad the number of videos to even numbers with black frames,
            # therefore providing even number of videos prevents black frames being rendered.
            idxs = [sortarg[0], sortarg[len(sortarg) // 2 - 1], sortarg[len(sortarg) // 2], sortarg[-1]]
        return idxs

    def update_video(self, imgs):
        for env_id, img in imgs.items():
            if len(self._reward[env_id]) == self._reward[env_id].maxlen:
                continue
            self._video[env_id][len(self._reward[env_id])].append(img)

    def get_episode_video(self):
        """
        Overview:
            Convert list of videos into [N, T, C, H, W] tensor, containing
            worst, median, best evaluation trajectories for video logging.
        """
        videos = sum([list(v) for v in self._video.values()], [])
        videos = [np.transpose(np.stack(video, 0), [0, 3, 1, 2]) for video in videos]
        idxs = self._select_idx()
        videos = [videos[idx] for idx in idxs]
        # pad videos to the same length with last frames
        max_length = max(video.shape[0] for video in videos)
        for i in range(len(videos)):
            if videos[i].shape[0] < max_length:
                padding = np.tile([videos[i][-1]], (max_length - videos[i].shape[0], 1, 1, 1))
                videos[i] = np.concatenate([videos[i], padding], 0)
        videos = np.stack(videos, 0)
        assert len(videos.shape) == 5, 'Need [N, T, C, H, W] input tensor for video logging!'
        return videos

    def update_output(self, output):
        for env_id, o in output.items():
            if len(self._reward[env_id]) == self._reward[env_id].maxlen:
                continue
            self._output[env_id][len(self._reward[env_id])].append(to_ndarray(o))

    def get_episode_output(self):
        output = sum([list(v) for v in self._output.values()], [])
        idxs = self._select_idx()
        output = [output[idx] for idx in idxs]
        return output


def interaction_evaluator(
        cfg: EasyDict, policy: Policy, env: BaseEnvManager, render: bool = False, **kwargs
) -> Callable:
    """
    Overview:
        The middleware that executes the evaluation.
    Arguments:
        - cfg (:obj:`EasyDict`): Config.
        - policy (:obj:`Policy`): The policy to be evaluated.
        - env (:obj:`BaseEnvManager`): The env for the evaluation.
        - render (:obj:`bool`): Whether to render env images and policy logits.
        - kwargs: (:obj:`Any`): Other arguments for specific evaluation.
    """
    if task.router.is_active and not task.has_role(task.role.EVALUATOR):
        return task.void()

    env.seed(cfg.seed, dynamic_seed=False)

    def _evaluate(ctx: Union["OnlineRLContext", "OfflineRLContext"]):
        """
        Overview:
            - The evaluation will be executed if the task begins and enough train_iter passed \
                since last evaluation.
        Input of ctx:
            - last_eval_iter (:obj:`int`): Last evaluation iteration.
            - train_iter (:obj:`int`): Current train iteration.
        Output of ctx:
            - eval_value (:obj:`float`): The average reward in the current evaluation.
        """

        # evaluation will be executed if the task begins or enough train_iter after last evaluation
        if ctx.last_eval_iter != -1 and \
                (ctx.train_iter - ctx.last_eval_iter < cfg.policy.eval.evaluator.eval_freq):
            if ctx.train_iter != ctx.last_eval_iter:
                return
        if len(kwargs) > 0:
            kwargs_str = '/'.join([f'{k}({v})' for k, v in kwargs.items()])
        else:
            kwargs_str = ''

        if env.closed:
            env.launch()
        else:
            env.reset()
        policy.reset()
        eval_monitor = VectorEvalMonitor(env.env_num, cfg.env.n_evaluator_episode)

        while not eval_monitor.is_finished():
            obs = ttorch.as_tensor(env.ready_obs).to(dtype=ttorch.float32)
            obs = {i: obs[i] for i in range(get_shape0(obs))}  # TBD
            if len(kwargs) > 0:
                inference_output = policy.forward(obs, **kwargs)
            else:
                inference_output = policy.forward(obs)
            if render:
                eval_monitor.update_video(env.ready_imgs)
                eval_monitor.update_output(inference_output)
            output = [v for v in inference_output.values()]
            action = [to_ndarray(v['action']) for v in output]  # TBD
            timesteps = env.step(action)
            for timestep in timesteps:
                env_id = timestep.env_id.item()
                if timestep.done:
                    policy.reset([env_id])
                    reward = timestep.info.eval_episode_return
                    eval_monitor.update_reward(env_id, reward)
                    if 'episode_info' in timestep.info:
                        eval_monitor.update_info(env_id, timestep.info.episode_info)
        episode_return = eval_monitor.get_episode_return()
        episode_return_min = np.min(episode_return)
        episode_return_max = np.max(episode_return)
        episode_return_std = np.std(episode_return)
        episode_return = np.mean(episode_return)
        stop_flag = episode_return >= cfg.env.stop_value and ctx.train_iter > 0
        if isinstance(ctx, OnlineRLContext):
            logging.info(
                'Evaluation: Train Iter({}) Env Step({}) Episode Return({:.3f}) {}'.format(
                    ctx.train_iter, ctx.env_step, episode_return, kwargs_str
                )
            )
        elif isinstance(ctx, OfflineRLContext):
            logging.info(
                'Evaluation: Train Iter({}) Eval Return({:.3f}) {}'.format(ctx.train_iter, episode_return, kwargs_str)
            )
        else:
            raise TypeError("not supported ctx type: {}".format(type(ctx)))
        ctx.last_eval_iter = ctx.train_iter
        ctx.eval_value = episode_return
        ctx.eval_value_min = episode_return_min
        ctx.eval_value_max = episode_return_max
        ctx.eval_value_std = episode_return_std
        ctx.last_eval_value = ctx.eval_value
        ctx.eval_output = {'episode_return': episode_return}
        episode_info = eval_monitor.get_episode_info()
        if episode_info is not None:
            ctx.eval_output['episode_info'] = episode_info
        if render:
            ctx.eval_output['replay_video'] = eval_monitor.get_episode_video()
            ctx.eval_output['output'] = eval_monitor.get_episode_output()
        else:
            ctx.eval_output['output'] = output  # for compatibility

        if len(kwargs) > 0:
            ctx.info_for_logging.update(
                {
                    f'{kwargs_str}/eval_episode_return': episode_return,
                    f'{kwargs_str}/eval_episode_return_min': episode_return_min,
                    f'{kwargs_str}/eval_episode_return_max': episode_return_max,
                    f'{kwargs_str}/eval_episode_return_std': episode_return_std,
                }
            )

        if stop_flag:
            task.finish = True

    return _evaluate


def interaction_evaluator_ttorch(
        seed: int,
        policy: Policy,
        env: BaseEnvManager,
        n_evaluator_episode: Optional[int] = None,
        stop_value: float = np.inf,
        eval_freq: int = 1000,
        render: bool = False,
) -> Callable:
    """
    Overview:
        The middleware that executes the evaluation with ttorch data.
    Arguments:
        - policy (:obj:`Policy`): The policy to be evaluated.
        - env (:obj:`BaseEnvManager`): The env for the evaluation.
        - render (:obj:`bool`): Whether to render env images and policy logits.
    """
    if task.router.is_active and not task.has_role(task.role.EVALUATOR):
        return task.void()

    env.seed(seed, dynamic_seed=False)
    if n_evaluator_episode is None:
        n_evaluator_episode = env.env_num

    def _evaluate(ctx: "OnlineRLContext"):
        """
        Overview:
            - The evaluation will be executed if the task begins and enough train_iter passed \
                since last evaluation.
        Input of ctx:
            - last_eval_iter (:obj:`int`): Last evaluation iteration.
            - train_iter (:obj:`int`): Current train iteration.
        Output of ctx:
            - eval_value (:obj:`float`): The average reward in the current evaluation.
        """

        # evaluation will be executed if the task begins or enough train_iter after last evaluation
        if ctx.last_eval_iter != -1 and (ctx.train_iter - ctx.last_eval_iter < eval_freq):
            return

        if env.closed:
            env.launch()
        else:
            env.reset()
        policy.reset()
        device = policy._device
        eval_monitor = VectorEvalMonitor(env.env_num, n_evaluator_episode)

        while not eval_monitor.is_finished():
            obs = ttorch.as_tensor(env.ready_obs).to(dtype=ttorch.float32)
            obs = obs.to(device)
            inference_output = policy.eval(obs)
            inference_output = inference_output.cpu()
            if render:
                eval_monitor.update_video(env.ready_imgs)
                # eval_monitor.update_output(inference_output)
            action = inference_output.action.numpy()
            timesteps = env.step(action)
            for timestep in timesteps:
                env_id = timestep.env_id.item()
                if timestep.done:
                    policy.reset([env_id])
                    reward = timestep.info.eval_episode_return
                    eval_monitor.update_reward(env_id, reward)
                    if 'episode_info' in timestep.info:
                        eval_monitor.update_info(env_id, timestep.info.episode_info)
        episode_return = eval_monitor.get_episode_return()
        episode_return_std = np.std(episode_return)
        episode_return_mean = np.mean(episode_return)
        stop_flag = episode_return_mean >= stop_value and ctx.train_iter > 0
        logging.info(
            'Evaluation: Train Iter({})\tEnv Step({})\tMean Episode Return({:.3f})'.format(
                ctx.train_iter, ctx.env_step, episode_return_mean
            )
        )
        ctx.last_eval_iter = ctx.train_iter
        ctx.eval_value = episode_return_mean
        ctx.eval_value_std = episode_return_std
        ctx.last_eval_value = ctx.eval_value
        ctx.eval_output = {'episode_return': episode_return}
        episode_info = eval_monitor.get_episode_info()
        if episode_info is not None:
            ctx.eval_output['episode_info'] = episode_info
        if render:
            ctx.eval_output['replay_video'] = eval_monitor.get_episode_video()
            ctx.eval_output['output'] = eval_monitor.get_episode_output()
        else:
            ctx.eval_output['output'] = inference_output.numpy()  # for compatibility

        if stop_flag:
            task.finish = True

    return _evaluate


def metric_evaluator(cfg: EasyDict, policy: Policy, dataset: Dataset, metric: IMetric) -> Callable:
    dataloader = DataLoader(dataset, batch_size=cfg.policy.eval.batch_size)

    def _evaluate(ctx: "Context"):
        # evaluation will be executed if the task begins or enough train_iter after last evaluation
        if ctx.last_eval_iter != -1 and \
                (ctx.train_iter - ctx.last_eval_iter < cfg.policy.eval.evaluator.eval_freq):
            return

        policy.reset()
        eval_output = []

        for batch_idx, batch_data in enumerate(dataloader):
            inputs, label = batch_data
            inference_output = policy.forward(inputs)
            eval_output.append(metric.eval(inference_output, label))
        # TODO reduce avg_eval_output among different gpus
        avg_eval_output = metric.reduce_mean(eval_output)
        stop_flag = metric.gt(avg_eval_output, cfg.env.stop_value) and ctx.train_iter > 0
        logging.info(
            'Evaluation: Train Iter({})\tEnv Step({})\tEpisode Return({:.3f})'.format(
                ctx.train_iter, ctx.env_step, avg_eval_output
            )
        )
        ctx.last_eval_iter = ctx.train_iter
        ctx.eval_value = avg_eval_output

        if stop_flag:
            task.finish = True

    return _evaluate


# TODO battle evaluator
```