from typing import List, Dict, Any, Tuple, Union
from collections import namedtuple
import torch
import copy
import numpy as np
from torch.distributions import Independent, Normal

from ding.torch_utils import Adam, to_device, to_dtype, unsqueeze, ContrastiveLoss
from ding.rl_utils import happo_data, happo_error, happo_policy_error, happo_policy_data, \
    v_nstep_td_data, v_nstep_td_error, get_train_sample, gae, gae_data, happo_error_continuous, \
    get_gae
from ding.model import model_wrap
from ding.utils import POLICY_REGISTRY, split_data_generator, RunningMeanStd
from ding.utils.data import default_collate, default_decollate
from .base_policy import Policy
from .common_utils import default_preprocess_learn


@POLICY_REGISTRY.register('happo')
class HAPPOPolicy(Policy):
    """
    Overview:
        Policy class of on policy version HAPPO algorithm. Paper link: https://arxiv.org/abs/2109.11251.
    """
    config = dict(
        # (str) RL policy register name (refer to function "POLICY_REGISTRY").
        type='happo',
        # (bool) Whether to use cuda for network.
        cuda=False,
        # (bool) Whether the RL algorithm is on-policy or off-policy. (Note: in practice PPO can be off-policy used)
        on_policy=True,
        # (bool) Whether to use priority(priority sample, IS weight, update priority)
        priority=False,
        # (bool) Whether to use Importance Sampling Weight to correct biased update due to priority.
        # If True, priority must be True.
        priority_IS_weight=False,
        # (bool) Whether to recompute advantages in each iteration of on-policy PPO
        recompute_adv=True,
        # (str) Which kind of action space used in PPOPolicy, ['discrete', 'continuous', 'hybrid']
        action_space='discrete',
        # (bool) Whether to use nstep return to calculate value target, otherwise, use return = adv + value
        nstep_return=False,
        # (bool) Whether to enable multi-agent training, i.e.: MAPPO
        multi_agent=False,
        # (bool) Whether to need policy data in process transition
        transition_with_policy_data=True,
        learn=dict(
            epoch_per_collect=10,
            batch_size=64,
            learning_rate=3e-4,
            # (float) Learning rate of the critic optimizer, read by ``_init_learn`` when building
            # ``self._critic_optimizer``. NOTE(review): this key was previously absent from the default
            # config although ``_init_learn`` always reads it, which raised an AttributeError unless the
            # user config supplied it; the default here mirrors the actor ``learning_rate``.
            critic_learning_rate=3e-4,
            # ==============================================================
            # The following configs is algorithm-specific
            # ==============================================================
            # (float) The loss weight of value network, policy network weight is set to 1
            value_weight=0.5,
            # (float) The loss weight of entropy regularization, policy network weight is set to 1
            entropy_weight=0.0,
            # (float) PPO clip ratio, defaults to 0.2
            clip_ratio=0.2,
            # (bool) Whether to use advantage norm in a whole training batch
            adv_norm=True,
            value_norm=True,
            ppo_param_init=True,
            grad_clip_type='clip_norm',
            grad_clip_value=0.5,
            ignore_done=False,
        ),
        collect=dict(
            # (int) Only one of [n_sample, n_episode] should be set
            # n_sample=64,
            # (int) Cut trajectories into pieces with length "unroll_len".
            unroll_len=1,
            # ==============================================================
            # The following configs is algorithm-specific
            # ==============================================================
            # (float) Reward's future discount factor, aka. gamma.
            discount_factor=0.99,
            # (float) GAE lambda factor for the balance of bias and variance(1-step td and mc)
            gae_lambda=0.95,
        ),
        eval=dict(),
    )

    def _init_learn(self) -> None:
        """
        Overview:
            Initialize the learn mode of policy, including related attributes and modules. For HAPPO, it mainly \
            contains optimizer, algorithm-specific arguments such as loss weight, clip_ratio and recompute_adv. This \
            method also executes some special network initializations and prepares running mean/std monitor for value.
            This method will be called in ``__init__`` method if ``learn`` field is in ``enable_field``.

        .. note::
            For the member variables that need to be saved and loaded, please refer to the ``_state_dict_learn`` \
            and ``_load_state_dict_learn`` methods.

        .. note::
            For the member variables that need to be monitored, please refer to the ``_monitor_vars_learn`` method.

        .. note::
            If you want to set some special member variables in ``_init_learn`` method, you'd better name them \
            with prefix ``_learn_`` to avoid conflict with other modes, such as ``self._learn_attr1``.
        """
        self._priority = self._cfg.priority
        self._priority_IS_weight = self._cfg.priority_IS_weight
        assert not self._priority and not self._priority_IS_weight, "Priority is not implemented in PPO"

        assert self._cfg.action_space in ["continuous", "discrete"]
        self._action_space = self._cfg.action_space
        if self._cfg.learn.ppo_param_init:
            for n, m in self._model.named_modules():
                if isinstance(m, torch.nn.Linear):
                    torch.nn.init.orthogonal_(m.weight)
                    torch.nn.init.zeros_(m.bias)
            if self._action_space in ['continuous']:
                # init log sigma
                for agent_id in range(self._cfg.agent_num):
                    # if hasattr(self._model.agent_models[agent_id].actor_head, 'log_sigma_param'):
                    #     torch.nn.init.constant_(self._model.agent_models[agent_id].actor_head.log_sigma_param, 1)
                    # The above initialization step has been changed to reparameterizationHead.
                    for m in list(self._model.agent_models[agent_id].critic.modules()) + \
                            list(self._model.agent_models[agent_id].actor.modules()):
                        if isinstance(m, torch.nn.Linear):
                            # orthogonal initialization
                            torch.nn.init.orthogonal_(m.weight, gain=np.sqrt(2))
                            torch.nn.init.zeros_(m.bias)
                    # do last policy layer scaling, this will make initial actions have (close to)
                    # 0 mean and std, and will help boost performances,
                    # see https://arxiv.org/abs/2006.05990, Fig.24 for details
                    for m in self._model.agent_models[agent_id].actor.modules():
                        if isinstance(m, torch.nn.Linear):
                            torch.nn.init.zeros_(m.bias)
                            m.weight.data.copy_(0.01 * m.weight.data)

        # Add the actor/critic parameters of each HAVACAgent in HAVAC to the parameter list of actor/critic_optimizer
        actor_params = []
        critic_params = []
        for agent_idx in range(self._model.agent_num):
            actor_params.append({'params': self._model.agent_models[agent_idx].actor.parameters()})
            critic_params.append({'params': self._model.agent_models[agent_idx].critic.parameters()})

        self._actor_optimizer = Adam(
            actor_params,
            lr=self._cfg.learn.learning_rate,
            grad_clip_type=self._cfg.learn.grad_clip_type,
            clip_value=self._cfg.learn.grad_clip_value,
            # eps = 1e-5,
        )

        self._critic_optimizer = Adam(
            critic_params,
            lr=self._cfg.learn.critic_learning_rate,
            grad_clip_type=self._cfg.learn.grad_clip_type,
            clip_value=self._cfg.learn.grad_clip_value,
            # eps = 1e-5,
        )

        self._learn_model = model_wrap(self._model, wrapper_name='base')
        # self._learn_model = model_wrap(
        #     self._model,
        #     wrapper_name='hidden_state',
        #     state_num=self._cfg.learn.batch_size,
        #     init_fn=lambda: [None for _ in range(self._cfg.model.agent_num)]
        # )

        # Algorithm config
        self._value_weight = self._cfg.learn.value_weight
        self._entropy_weight = self._cfg.learn.entropy_weight
        self._clip_ratio = self._cfg.learn.clip_ratio
        self._adv_norm = self._cfg.learn.adv_norm
        self._value_norm = self._cfg.learn.value_norm
        if self._value_norm:
            self._running_mean_std = RunningMeanStd(epsilon=1e-4, device=self._device)
        self._gamma = self._cfg.collect.discount_factor
        self._gae_lambda = self._cfg.collect.gae_lambda
        self._recompute_adv = self._cfg.recompute_adv
        # Main model
        self._learn_model.reset()

    def prepocess_data_agent(self, data: Dict[str, Any]):
        """
        Overview:
            Preprocess data for agent dim. This function is used in learn mode. \
            It will be called recursively to process nested dict data. \
            It will transpose the data with shape (B, agent_num, ...) to (agent_num, B, ...). \
        Arguments:
            - data (:obj:`dict`): Dict type data, where each element is the data of an agent of dict type.
        Returns:
            - ret (:obj:`dict`): Dict type data, where each element is the data of an agent of dict type.
        """
        ret = {}
        for key, value in data.items():
            if isinstance(value, dict):
                ret[key] = self.prepocess_data_agent(value)
            elif isinstance(value, torch.Tensor) and len(value.shape) > 1:
                ret[key] = value.transpose(0, 1)
            else:
                ret[key] = value
        return ret

    def _forward_learn(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Overview:
            Policy forward function of learn mode (training policy and updating parameters). Forward means \
            that the policy inputs some training batch data from the replay buffer and then returns the output \
            result, including various training information such as loss, clipfrac, approx_kl.
        Arguments:
            - data (:obj:`List[Dict[int, Any]]`): The input data used for policy forward, including the latest \
                collected training samples for on-policy algorithms like HAPPO. For each element in list, the key of \
                dict is the name of data items and the value is the corresponding data. Usually, the value is \
                torch.Tensor or np.ndarray or their dict/list combinations. In the ``_forward_learn`` method, data \
                often need to first be stacked in the batch dimension by some utility functions such as \
                ``default_preprocess_learn``. \
                For HAPPO, each element in list is a dict containing at least the following keys: ``obs``, \
                ``action``, ``reward``, ``logit``, ``value``, ``done``. Sometimes, it also contains other keys \
                such as ``weight``.
        Returns:
            - return_infos (:obj:`List[Dict[str, Any]]`): The information list that indicated training result, each \
                training iteration appends an information dict into the final list. The list will be processed \
                and recorded in text log and tensorboard. The value of the dict must be python scalar or a list of \
                scalars. For the detailed definition of the dict, refer to the code of ``_monitor_vars_learn`` method.

        .. tip::
            The training procedure of HAPPO is three for loops. The outermost loop trains each agent separately. \
            The middle loop trains all the collected training samples with ``epoch_per_collect`` epochs. The inner \
            loop splits all the data into different mini-batch with the length of ``batch_size``.

        .. note::
            The input value can be torch.Tensor or dict/list combinations and current policy supports all of them. \
            For the data type that not supported, the main reason is that the corresponding model does not support it. \
            You can implement your own model rather than use the default model. For more information, please raise an \
            issue in GitHub repo and we will continue to follow up.

        .. note::
            For more detailed examples, please refer to our unittest for HAPPOPolicy: ``ding.policy.tests.test_happo``.
        """
        data = default_preprocess_learn(data, ignore_done=self._cfg.learn.ignore_done, use_nstep=False)
        all_data_len = data['obs']['agent_state'].shape[0]
        # factor is the ratio of the old and new strategies of the first m-1 agents, initialized to 1.
        # Each transition has its own factor. ref: http://arxiv.org/abs/2109.11251
        factor = torch.ones(all_data_len, 1)  # (B, 1)
        if self._cuda:
            data = to_device(data, self._device)
            factor = to_device(factor, self._device)
        # process agent dim
        data = self.prepocess_data_agent(data)
        # ====================
        # PPO forward
        # ====================
        return_infos = []
        self._learn_model.train()

        for agent_id in range(self._cfg.agent_num):
            agent_data = {}
            for key, value in data.items():
                if value is not None:
                    if type(value) is dict:
                        agent_data[key] = {k: v[agent_id] for k, v in value.items()}  # not feasible for rnn
                    elif len(value.shape) > 1:
                        agent_data[key] = data[key][agent_id]
                    else:
                        agent_data[key] = data[key]
                else:
                    agent_data[key] = data[key]

            # update factor
            agent_data['factor'] = factor
            # calculate old_logits of all data in buffer for later factor
            inputs = {
                'obs': agent_data['obs'],
                # 'actor_prev_state': agent_data['actor_prev_state'],
                # 'critic_prev_state': agent_data['critic_prev_state'],
            }
            old_logits = self._learn_model.forward(agent_id, inputs, mode='compute_actor')['logit']

            for epoch in range(self._cfg.learn.epoch_per_collect):
                if self._recompute_adv:  # calculate new value using the new updated value network
                    with torch.no_grad():
                        inputs['obs'] = agent_data['obs']
                        # value = self._learn_model.forward(agent_id, agent_data['obs'], mode='compute_critic')['value']
                        value = self._learn_model.forward(agent_id, inputs, mode='compute_critic')['value']
                        inputs['obs'] = agent_data['next_obs']
                        next_value = self._learn_model.forward(agent_id, inputs, mode='compute_critic')['value']
                        if self._value_norm:
                            value *= self._running_mean_std.std
                            next_value *= self._running_mean_std.std

                        traj_flag = agent_data.get('traj_flag', None)  # traj_flag indicates termination of trajectory
                        compute_adv_data = gae_data(
                            value, next_value, agent_data['reward'], agent_data['done'], traj_flag
                        )
                        agent_data['adv'] = gae(compute_adv_data, self._gamma, self._gae_lambda)

                        unnormalized_returns = value + agent_data['adv']

                        if self._value_norm:
                            agent_data['value'] = value / self._running_mean_std.std
                            agent_data['return'] = unnormalized_returns / self._running_mean_std.std
                            self._running_mean_std.update(unnormalized_returns.cpu().numpy())
                        else:
                            agent_data['value'] = value
                            agent_data['return'] = unnormalized_returns

                else:  # don't recompute adv
                    if self._value_norm:
                        unnormalized_return = agent_data['adv'] + agent_data['value'] * self._running_mean_std.std
                        agent_data['return'] = unnormalized_return / self._running_mean_std.std
                        self._running_mean_std.update(unnormalized_return.cpu().numpy())
                    else:
                        agent_data['return'] = agent_data['adv'] + agent_data['value']

                for batch in split_data_generator(agent_data, self._cfg.learn.batch_size, shuffle=True):
                    inputs = {
                        'obs': batch['obs'],
                        # 'actor_prev_state': batch['actor_prev_state'],
                        # 'critic_prev_state': batch['critic_prev_state'],
                    }
                    output = self._learn_model.forward(agent_id, inputs, mode='compute_actor_critic')
                    adv = batch['adv']
                    if self._adv_norm:
                        # Normalize advantage in a train_batch
                        adv = (adv - adv.mean()) / (adv.std() + 1e-8)

                    # Calculate happo error
                    if self._action_space == 'continuous':
                        happo_batch = happo_data(
                            output['logit'], batch['logit'], batch['action'], output['value'], batch['value'], adv,
                            batch['return'], batch['weight'], batch['factor']
                        )
                        happo_loss, happo_info = happo_error_continuous(happo_batch, self._clip_ratio)
                    elif self._action_space == 'discrete':
                        happo_batch = happo_data(
                            output['logit'], batch['logit'], batch['action'], output['value'], batch['value'], adv,
                            batch['return'], batch['weight'], batch['factor']
                        )
                        happo_loss, happo_info = happo_error(happo_batch, self._clip_ratio)
                    wv, we = self._value_weight, self._entropy_weight
                    total_loss = happo_loss.policy_loss + wv * happo_loss.value_loss - we * happo_loss.entropy_loss

                    # actor update
                    # critic update
                    self._actor_optimizer.zero_grad()
                    self._critic_optimizer.zero_grad()
                    total_loss.backward()
                    self._actor_optimizer.step()
                    self._critic_optimizer.step()

                    return_info = {
                        'agent{}_cur_lr'.format(agent_id): self._actor_optimizer.defaults['lr'],
                        'agent{}_total_loss'.format(agent_id): total_loss.item(),
                        'agent{}_policy_loss'.format(agent_id): happo_loss.policy_loss.item(),
                        'agent{}_value_loss'.format(agent_id): happo_loss.value_loss.item(),
                        'agent{}_entropy_loss'.format(agent_id): happo_loss.entropy_loss.item(),
                        'agent{}_adv_max'.format(agent_id): adv.max().item(),
                        'agent{}_adv_mean'.format(agent_id): adv.mean().item(),
                        'agent{}_value_mean'.format(agent_id): output['value'].mean().item(),
                        'agent{}_value_max'.format(agent_id): output['value'].max().item(),
                        'agent{}_approx_kl'.format(agent_id): happo_info.approx_kl,
                        'agent{}_clipfrac'.format(agent_id): happo_info.clipfrac,
                    }
                    if self._action_space == 'continuous':
                        return_info.update(
                            {
                                'agent{}_act'.format(agent_id): batch['action'].float().mean().item(),
                                'agent{}_mu_mean'.format(agent_id): output['logit']['mu'].mean().item(),
                                'agent{}_sigma_mean'.format(agent_id): output['logit']['sigma'].mean().item(),
                            }
                        )
                    return_infos.append(return_info)
            # calculate the factor
            inputs = {
                'obs': agent_data['obs'],
                # 'actor_prev_state': agent_data['actor_prev_state'],
            }
            new_logits = self._learn_model.forward(agent_id, inputs, mode='compute_actor')['logit']
            if self._cfg.action_space == 'discrete':
                dist_new = torch.distributions.categorical.Categorical(logits=new_logits)
                dist_old = torch.distributions.categorical.Categorical(logits=old_logits)
            elif self._cfg.action_space == 'continuous':
                dist_new = Normal(new_logits['mu'], new_logits['sigma'])
                dist_old = Normal(old_logits['mu'], old_logits['sigma'])
            logp_new = dist_new.log_prob(agent_data['action'])
            logp_old = dist_old.log_prob(agent_data['action'])
            if len(logp_new.shape) > 1:
                # for logp with shape(B, action_shape), we need to calculate the product of all action dimensions.
                factor = factor * torch.prod(
                    torch.exp(logp_new - logp_old), dim=-1
                ).reshape(all_data_len, 1).detach()  # attention the shape
            else:
                # for logp with shape(B, ), directly calculate factor
                factor = factor * torch.exp(logp_new - logp_old).reshape(all_data_len, 1).detach()
        return return_infos

    def _state_dict_learn(self) -> Dict[str, Any]:
        """
        Overview:
            Return the state_dict of learn mode optimizer and model.
        Returns:
            - state_dict (:obj:`Dict[str, Any]`): The dict of current policy learn mode. It contains the \
                state_dict of current policy network and optimizer.
        """
        return {
            'model': self._learn_model.state_dict(),
            'actor_optimizer': self._actor_optimizer.state_dict(),
            'critic_optimizer': self._critic_optimizer.state_dict(),
        }

    def _load_state_dict_learn(self, state_dict: Dict[str, Any]) -> None:
        """
        Overview:
            Load the state_dict of learn mode optimizer and model.
        Arguments:
            - state_dict (:obj:`Dict[str, Any]`): The dict of policy learn mode. It contains the state_dict \
                of current policy network and optimizer.
        """
        self._learn_model.load_state_dict(state_dict['model'])
        self._actor_optimizer.load_state_dict(state_dict['actor_optimizer'])
        self._critic_optimizer.load_state_dict(state_dict['critic_optimizer'])

    def _init_collect(self) -> None:
        """
        Overview:
            Initialize the collect mode of policy, including related attributes and modules. For HAPPO, it contains \
            the collect_model to balance the exploration and exploitation (e.g. the multinomial sample mechanism in \
            discrete action space), and other algorithm-specific arguments such as unroll_len and gae_lambda.
            This method will be called in ``__init__`` method if ``collect`` field is in ``enable_field``.

        .. note::
            If you want to set some special member variables in ``_init_collect`` method, you'd better name them \
            with prefix ``_collect_`` to avoid conflict with other modes, such as ``self._collect_attr1``.

        .. tip::
            Some variables need to initialize independently in different modes, such as gamma and gae_lambda in PPO. \
            This design is for the convenience of parallel execution of different policy modes.
        """
        self._unroll_len = self._cfg.collect.unroll_len
        assert self._cfg.action_space in ["continuous", "discrete"]
        self._action_space = self._cfg.action_space
        if self._action_space == 'continuous':
            self._collect_model = model_wrap(self._model, wrapper_name='reparam_sample')
        elif self._action_space == 'discrete':
            self._collect_model = model_wrap(self._model, wrapper_name='multinomial_sample')
        self._collect_model.reset()
        self._gamma = self._cfg.collect.discount_factor
        self._gae_lambda = self._cfg.collect.gae_lambda
        self._recompute_adv = self._cfg.recompute_adv

    def _forward_collect(self, data: Dict[int, Any]) -> dict:
        """
        Overview:
            Policy forward function of collect mode (collecting training data by interacting with envs). Forward means \
            that the policy gets some necessary data (mainly observation) from the envs and then returns the output \
            data, such as the action to interact with the envs.
        Arguments:
            - data (:obj:`Dict[int, Any]`): The input data used for policy forward, including at least the obs. The \
                key of the dict is environment id and the value is the corresponding data of the env.
        Returns:
            - output (:obj:`Dict[int, Any]`): The output data of policy forward, including at least the action and \
                other necessary data (action logit and value) for learn mode defined in ``self._process_transition`` \
                method. The key of the dict is the same as the input data, i.e. environment id.

        .. tip::
            If you want to add more tricks on this policy, like temperature factor in multinomial sample, you can pass \
            related data as extra keyword arguments of this method.

        .. note::
            The input value can be torch.Tensor or dict/list combinations and current policy supports all of them. \
            For the data type that not supported, the main reason is that the corresponding model does not support it. \
            You can implement your own model rather than use the default model. For more information, please raise an \
            issue in GitHub repo and we will continue to follow up.

        .. note::
            For more detailed examples, please refer to our unittest for HAPPOPolicy: ``ding.policy.tests.test_happo``.
        """
        data_id = list(data.keys())
        data = default_collate(list(data.values()))
        if self._cuda:
            data = to_device(data, self._device)
        data = {k: v.transpose(0, 1) for k, v in data.items()}  # not feasible for rnn
        self._collect_model.eval()
        with torch.no_grad():
            outputs = []
            for agent_id in range(self._cfg.agent_num):
                # output = self._collect_model.forward(agent_id, data, mode='compute_actor_critic')
                single_agent_obs = {k: v[agent_id] for k, v in data.items()}
                model_input = {
                    'obs': single_agent_obs,
                }
                output = self._collect_model.forward(agent_id, model_input, mode='compute_actor_critic')
                outputs.append(output)
            # transfer data from (M, B, N)->(B, M, N)
            result = {}
            for key in outputs[0].keys():
                if isinstance(outputs[0][key], dict):
                    subkeys = outputs[0][key].keys()
                    stacked_subvalues = {}
                    for subkey in subkeys:
                        stacked_subvalues[subkey] = \
                            torch.stack([output[key][subkey] for output in outputs], dim=0).transpose(0, 1)
                    result[key] = stacked_subvalues
                else:
                    # If Value is tensor, stack it directly
                    if isinstance(outputs[0][key], torch.Tensor):
                        result[key] = torch.stack([output[key] for output in outputs], dim=0).transpose(0, 1)
                    else:
                        # If it is not tensor, assume that it is a non-stackable data type \
                        # (such as int, float, etc.), and directly retain the original value
                        result[key] = [output[key] for output in outputs]
        output = result
        if self._cuda:
            output = to_device(output, 'cpu')
        output = default_decollate(output)
        return {i: d for i, d in zip(data_id, output)}

    def _process_transition(self, obs: Any, model_output: dict, timestep: namedtuple) -> dict:
        """
        Overview:
            Process and pack one timestep transition data into a dict, which can be directly used for training and \
            saved in replay buffer. For HAPPO, it contains obs, next_obs, action, reward, done, logit, value.
        Arguments:
            - obs (:obj:`torch.Tensor`): The env observation of current timestep.
            - policy_output (:obj:`Dict[str, torch.Tensor]`): The output of the policy network with the observation \
                as input. For PPO, it contains the state value, action and the logit of the action.
            - timestep (:obj:`namedtuple`): The execution result namedtuple returned by the environment step method, \
                except all the elements have been transformed into tensor data. Usually, it contains the next obs, \
                reward, done, info, etc.
        Returns:
            - transition (:obj:`Dict[str, torch.Tensor]`): The processed transition data of the current timestep.

        .. note::
            ``next_obs`` is used to calculate nstep return when necessary, so we place it into transition by default. \
            You can delete this field to save memory occupancy if you do not need nstep return.
        """
        transition = {
            'obs': obs,
            'next_obs': timestep.obs,
            'action': model_output['action'],
            'logit': model_output['logit'],
            'value': model_output['value'],
            'reward': timestep.reward,
            'done': timestep.done,
        }
        return transition

    def _get_train_sample(self, data: list) -> Union[None, List[Any]]:
        """
        Overview:
            For a given trajectory (transitions, a list of transition) data, process it into a list of sample that \
            can be used for training directly. In HAPPO, a train sample is a processed transition with new computed \
            ``traj_flag`` and ``adv`` field. This method is usually used in collectors to execute necessary \
            RL data preprocessing before training, which can help learner amortize relevant time consumption. \
            In addition, you can also implement this method as an identity function and do the data processing \
            in ``self._forward_learn`` method.
        Arguments:
            - transitions (:obj:`List[Dict[str, Any]`): The trajectory data (a list of transition), each element is \
                the same format as the return value of ``self._process_transition`` method.
        Returns:
            - samples (:obj:`List[Dict[str, Any]]`): The processed train samples, each element is the similar format \
                as input transitions, but may contain more data for training, such as GAE advantage.
        """
        data = to_device(data, self._device)
        for transition in data:
            transition['traj_flag'] = copy.deepcopy(transition['done'])
        data[-1]['traj_flag'] = True

        if self._cfg.learn.ignore_done:
            data[-1]['done'] = False

        if data[-1]['done']:
            last_value = torch.zeros_like(data[-1]['value'])
        else:
            with torch.no_grad():
                last_values = []
                for agent_id in range(self._cfg.agent_num):
                    inputs = {'obs': {k: unsqueeze(v[agent_id], 0) for k, v in data[-1]['next_obs'].items()}}
                    last_value = self._collect_model.forward(agent_id, inputs, mode='compute_actor_critic')['value']
                    last_values.append(last_value)
                last_value = torch.cat(last_values)
            if len(last_value.shape) == 2:  # multi_agent case:
                last_value = last_value.squeeze(0)
        if self._value_norm:
            last_value *= self._running_mean_std.std
            for i in range(len(data)):
                data[i]['value'] *= self._running_mean_std.std
        data = get_gae(
            data,
            to_device(last_value, self._device),
            gamma=self._gamma,
            gae_lambda=self._gae_lambda,
            cuda=False,
        )
        if self._value_norm:
            for i in range(len(data)):
                data[i]['value'] /= self._running_mean_std.std

        # remove next_obs for save memory when not recompute adv
        if not self._recompute_adv:
            for i in range(len(data)):
                data[i].pop('next_obs')
        return get_train_sample(data, self._unroll_len)

    def _init_eval(self) -> None:
        """
        Overview:
            Initialize the eval mode of policy, including related attributes and modules. For PPO, it contains the \
            eval model to select optimal action (e.g. greedily select action with argmax mechanism in discrete action).
            This method will be called in ``__init__`` method if ``eval`` field is in ``enable_field``.

        .. note::
            If you want to set some special member variables in ``_init_eval`` method, you'd better name them \
            with prefix ``_eval_`` to avoid conflict with other modes, such as ``self._eval_attr1``.
        """
        assert self._cfg.action_space in ["continuous", "discrete"]
        self._action_space = self._cfg.action_space
        if self._action_space == 'continuous':
            self._eval_model = model_wrap(self._model, wrapper_name='deterministic_sample')
        elif self._action_space == 'discrete':
            self._eval_model = model_wrap(self._model, wrapper_name='argmax_sample')
        self._eval_model.reset()

    def _forward_eval(self, data: dict) -> dict:
        """
        Overview:
            Policy forward function of eval mode (evaluation policy performance by interacting with envs). Forward \
            means that the policy gets some necessary data (mainly observation) from the envs and then returns the \
            action to interact with the envs. ``_forward_eval`` in HAPPO often uses deterministic sample method to \
            get actions while ``_forward_collect`` usually uses stochastic sample method for balance exploration and \
            exploitation.
        Arguments:
            - data (:obj:`Dict[int, Any]`): The input data used for policy forward, including at least the obs. The \
                key of the dict is environment id and the value is the corresponding data of the env.
        Returns:
            - output (:obj:`Dict[int, Any]`): The output data of policy forward, including at least the action. The \
                key of the dict is the same as the input data, i.e. environment id.

        .. note::
            The input value can be torch.Tensor or dict/list combinations and current policy supports all of them. \
            For the data type that not supported, the main reason is that the corresponding model does not support it. \
            You can implement your own model rather than use the default model. For more information, please raise an \
            issue in GitHub repo and we will continue to follow up.

        .. note::
            For more detailed examples, please refer to our unittest for HAPPOPolicy: ``ding.policy.tests.test_happo``.
        """
        data_id = list(data.keys())
        data = default_collate(list(data.values()))
        if self._cuda:
            data = to_device(data, self._device)
        # transfer data from (B, M, N)->(M, B, N)
        data = {k: v.transpose(0, 1) for k, v in data.items()}  # not feasible for rnn
        self._eval_model.eval()
        with torch.no_grad():
            outputs = []
            for agent_id in range(self._cfg.agent_num):
                single_agent_obs = {k: v[agent_id] for k, v in data.items()}
                model_input = {
                    'obs': single_agent_obs,
                }
                output = self._eval_model.forward(agent_id, model_input, mode='compute_actor')
                outputs.append(output)
            output = self.revert_agent_data(outputs)
        if self._cuda:
            output = to_device(output, 'cpu')
        output = default_decollate(output)
        return {i: d for i, d in zip(data_id, output)}

    def default_model(self) -> Tuple[str, List[str]]:
        """
        Overview:
            Return this algorithm default neural network model setting for demonstration. ``__init__`` method will \
            automatically call this method to get the default model setting and create model.
        Returns:
            - model_info (:obj:`Tuple[str, List[str]]`): The registered model name and model's import_names.

        .. note::
            The user can define and use customized network model but must obey the same interface definition indicated \
            by import_names path. For example about HAPPO, its registered name is ``happo`` and the import_names is \
            ``ding.model.template.havac``.
        """
        return 'havac', ['ding.model.template.havac']

    def _monitor_vars_learn(self) -> List[str]:
        """
        Overview:
            Return the necessary keys for logging the return dict of ``self._forward_learn``. The logger module, such \
            as text logger, tensorboard logger, will use these keys to save the corresponding data.
        Returns:
            - necessary_keys (:obj:`List[str]`): The list of the necessary keys to be logged.
        """
        variables = super()._monitor_vars_learn() + [
            'policy_loss',
            'value_loss',
            'entropy_loss',
            'adv_max',
            'adv_mean',
            'approx_kl',
            'clipfrac',
            'value_max',
            'value_mean',
        ]
        if self._action_space == 'continuous':
            variables += ['mu_mean', 'sigma_mean', 'sigma_grad', 'act']
        prefixes = [f'agent{i}_' for i in range(self._cfg.agent_num)]
        variables = [prefix + var for prefix in prefixes for var in variables]
        return variables

    def revert_agent_data(self, data: list):
        """
        Overview:
            Revert the data of each agent to the original data format.
        Arguments:
            - data (:obj:`list`): List type data, where each element is the data of an agent of dict type.
        Returns:
            - ret (:obj:`dict`): Dict type data, where each element is the data of an agent of dict type.
        """
        ret = {}
        # Traverse all keys of the first output
        for key in data[0].keys():
            if isinstance(data[0][key], torch.Tensor):
                # If the value corresponding to the current key is tensor, stack N tensors
                stacked_tensor = torch.stack([output[key] for output in data], dim=0)
                ret[key] = stacked_tensor.transpose(0, 1)
            elif isinstance(data[0][key], dict):
                # If the value corresponding to the current key is a dictionary, recursively \
                # call the function to process the contents inside the dictionary.
                ret[key] = self.revert_agent_data([output[key] for output in data])
        return ret