
ding.policy.happo


HAPPOPolicy

Bases: Policy

Overview

Policy class of the on-policy version of the HAPPO algorithm. Paper link: https://arxiv.org/abs/2109.11251.

prepocess_data_agent(data)

Overview

Preprocess data for the agent dimension. This function is used in learn mode. It is called recursively to process nested dict data, transposing data with shape (B, agent_num, ...) to (agent_num, B, ...).

Arguments: - data (:obj:dict): Dict type data, where each element is the data of an agent of dict type.

Returns: - ret (:obj:dict): Dict type data, where each element is the data of an agent of dict type.
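The batch-major to agent-major transpose described above can be sketched as a standalone helper. This is a minimal re-implementation of the same recursive idea, not DI-engine API; the helper name `transpose_agent_dim` and the toy shapes are illustrative.

```python
import torch


def transpose_agent_dim(data: dict) -> dict:
    """Recursively swap the batch and agent dimensions of every tensor.

    Tensors shaped (B, agent_num, ...) become (agent_num, B, ...);
    1-D tensors and non-tensor values pass through unchanged.
    """
    ret = {}
    for key, value in data.items():
        if isinstance(value, dict):
            ret[key] = transpose_agent_dim(value)
        elif isinstance(value, torch.Tensor) and value.dim() > 1:
            ret[key] = value.transpose(0, 1)
        else:
            ret[key] = value
    return ret


# Toy batch: B=64, agent_num=3, feature dim 8
batch = {'obs': {'agent_state': torch.zeros(64, 3, 8)}, 'done': torch.zeros(64)}
out = transpose_agent_dim(batch)
print(out['obs']['agent_state'].shape)  # torch.Size([3, 64, 8])
```

Note that 1-D tensors such as `done` are deliberately left alone: they carry no agent dimension to swap.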

default_model()

Overview

Return this algorithm's default neural network model setting for demonstration. The __init__ method will automatically call this method to get the default model setting and create the model.

Returns: - model_info (:obj:Tuple[str, List[str]]): The registered model name and model's import_names.

.. note:: The user can define and use a customized network model but must obey the same interface definition indicated by the import_names path. For HAPPO, the registered model name is havac and the import_names is ding.model.template.havac.

revert_agent_data(data)

Overview

Revert the data of each agent to the original data format.

Arguments: - data (:obj:list): List type data, where each element is the data of an agent of dict type.

Returns: - ret (:obj:dict): Dict type data, where each element is the data of an agent of dict type.
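The reverting step is the inverse of the agent-dim preprocessing: M per-agent dicts of (B, ...) tensors are stacked back into one dict of (B, M, ...) tensors. A minimal standalone sketch of that idea (the helper name `revert_agent_dim` is illustrative, not DI-engine API):

```python
import torch


def revert_agent_dim(per_agent: list) -> dict:
    """Stack a list of per-agent output dicts back into batch-major tensors.

    M dicts of (B, ...) tensors become one dict of (B, M, ...) tensors;
    nested dicts are handled recursively, mirroring revert_agent_data.
    """
    ret = {}
    for key in per_agent[0].keys():
        if isinstance(per_agent[0][key], torch.Tensor):
            # stack along a new agent dim (M, B, ...), then swap to (B, M, ...)
            ret[key] = torch.stack([d[key] for d in per_agent], dim=0).transpose(0, 1)
        elif isinstance(per_agent[0][key], dict):
            ret[key] = revert_agent_dim([d[key] for d in per_agent])
    return ret


# Toy outputs: 3 agents, batch 16, action dim 4
outputs = [{'action': torch.zeros(16, 4)} for _ in range(3)]
merged = revert_agent_dim(outputs)
print(merged['action'].shape)  # torch.Size([16, 3, 4])
```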

Full Source Code

../ding/policy/happo.py

from typing import List, Dict, Any, Tuple, Union
from collections import namedtuple
import torch
import copy
import numpy as np
from torch.distributions import Independent, Normal

from ding.torch_utils import Adam, to_device, to_dtype, unsqueeze, ContrastiveLoss
from ding.rl_utils import happo_data, happo_error, happo_policy_error, happo_policy_data, \
    v_nstep_td_data, v_nstep_td_error, get_train_sample, gae, gae_data, happo_error_continuous, \
    get_gae
from ding.model import model_wrap
from ding.utils import POLICY_REGISTRY, split_data_generator, RunningMeanStd
from ding.utils.data import default_collate, default_decollate
from .base_policy import Policy
from .common_utils import default_preprocess_learn


@POLICY_REGISTRY.register('happo')
class HAPPOPolicy(Policy):
    """
    Overview:
        Policy class of the on-policy version of the HAPPO algorithm. Paper link: https://arxiv.org/abs/2109.11251.
    """
    config = dict(
        # (str) RL policy register name (refer to function "POLICY_REGISTRY").
        type='happo',
        # (bool) Whether to use cuda for network.
        cuda=False,
        # (bool) Whether the RL algorithm is on-policy or off-policy. (Note: in practice PPO can be used off-policy.)
        on_policy=True,
        # (bool) Whether to use priority (priority sample, IS weight, update priority).
        priority=False,
        # (bool) Whether to use Importance Sampling Weight to correct biased update due to priority.
        # If True, priority must be True.
        priority_IS_weight=False,
        # (bool) Whether to recompute advantages in each iteration of on-policy PPO.
        recompute_adv=True,
        # (str) Which kind of action space is used, ['discrete', 'continuous', 'hybrid'].
        action_space='discrete',
        # (bool) Whether to use nstep return to calculate value target, otherwise, use return = adv + value.
        nstep_return=False,
        # (bool) Whether to enable multi-agent training, i.e.: MAPPO.
        multi_agent=False,
        # (bool) Whether policy data is needed in process transition.
        transition_with_policy_data=True,
        learn=dict(
            epoch_per_collect=10,
            batch_size=64,
            learning_rate=3e-4,
            # ==============================================================
            # The following configs are algorithm-specific.
            # ==============================================================
            # (float) The loss weight of value network, policy network weight is set to 1.
            value_weight=0.5,
            # (float) The loss weight of entropy regularization, policy network weight is set to 1.
            entropy_weight=0.0,
            # (float) PPO clip ratio, defaults to 0.2.
            clip_ratio=0.2,
            # (bool) Whether to use advantage norm in a whole training batch.
            adv_norm=True,
            value_norm=True,
            ppo_param_init=True,
            grad_clip_type='clip_norm',
            grad_clip_value=0.5,
            ignore_done=False,
        ),
        collect=dict(
            # (int) Only one of [n_sample, n_episode] should be set.
            # n_sample=64,
            # (int) Cut trajectories into pieces with length "unroll_len".
            unroll_len=1,
            # ==============================================================
            # The following configs are algorithm-specific.
            # ==============================================================
            # (float) Reward's future discount factor, aka. gamma.
            discount_factor=0.99,
            # (float) GAE lambda factor for the balance of bias and variance (1-step td and mc).
            gae_lambda=0.95,
        ),
        eval=dict(),
    )

    def _init_learn(self) -> None:
        """
        Overview:
            Initialize the learn mode of policy, including related attributes and modules. For HAPPO, it mainly \
            contains optimizer, algorithm-specific arguments such as loss weight, clip_ratio and recompute_adv. This \
            method also executes some special network initializations and prepares running mean/std monitor for value.
            This method will be called in ``__init__`` method if ``learn`` field is in ``enable_field``.

        .. note::
            For the member variables that need to be saved and loaded, please refer to the ``_state_dict_learn`` \
            and ``_load_state_dict_learn`` methods.

        .. note::
            For the member variables that need to be monitored, please refer to the ``_monitor_vars_learn`` method.

        .. note::
            If you want to set some special member variables in ``_init_learn`` method, you'd better name them \
            with prefix ``_learn_`` to avoid conflict with other modes, such as ``self._learn_attr1``.
        """
        self._priority = self._cfg.priority
        self._priority_IS_weight = self._cfg.priority_IS_weight
        assert not self._priority and not self._priority_IS_weight, "Priority is not implemented in PPO"

        assert self._cfg.action_space in ["continuous", "discrete"]
        self._action_space = self._cfg.action_space
        if self._cfg.learn.ppo_param_init:
            for n, m in self._model.named_modules():
                if isinstance(m, torch.nn.Linear):
                    torch.nn.init.orthogonal_(m.weight)
                    torch.nn.init.zeros_(m.bias)
            if self._action_space in ['continuous']:
                # init log sigma
                for agent_id in range(self._cfg.agent_num):
                    # if hasattr(self._model.agent_models[agent_id].actor_head, 'log_sigma_param'):
                    #     torch.nn.init.constant_(self._model.agent_models[agent_id].actor_head.log_sigma_param, 1)
                    # The above initialization step has been changed to ReparameterizationHead.
                    for m in list(self._model.agent_models[agent_id].critic.modules()) + \
                            list(self._model.agent_models[agent_id].actor.modules()):
                        if isinstance(m, torch.nn.Linear):
                            # orthogonal initialization
                            torch.nn.init.orthogonal_(m.weight, gain=np.sqrt(2))
                            torch.nn.init.zeros_(m.bias)
                    # Do last policy layer scaling: this will make initial actions have (close to)
                    # 0 mean and std, and will help boost performance;
                    # see https://arxiv.org/abs/2006.05990, Fig.24 for details.
                    for m in self._model.agent_models[agent_id].actor.modules():
                        if isinstance(m, torch.nn.Linear):
                            torch.nn.init.zeros_(m.bias)
                            m.weight.data.copy_(0.01 * m.weight.data)

        # Add the actor/critic parameters of each HAVACAgent in HAVAC to the parameter list of actor/critic_optimizer.
        actor_params = []
        critic_params = []
        for agent_idx in range(self._model.agent_num):
            actor_params.append({'params': self._model.agent_models[agent_idx].actor.parameters()})
            critic_params.append({'params': self._model.agent_models[agent_idx].critic.parameters()})

        self._actor_optimizer = Adam(
            actor_params,
            lr=self._cfg.learn.learning_rate,
            grad_clip_type=self._cfg.learn.grad_clip_type,
            clip_value=self._cfg.learn.grad_clip_value,
            # eps=1e-5,
        )

        self._critic_optimizer = Adam(
            critic_params,
            lr=self._cfg.learn.critic_learning_rate,
            grad_clip_type=self._cfg.learn.grad_clip_type,
            clip_value=self._cfg.learn.grad_clip_value,
            # eps=1e-5,
        )

        self._learn_model = model_wrap(self._model, wrapper_name='base')
        # self._learn_model = model_wrap(
        #     self._model,
        #     wrapper_name='hidden_state',
        #     state_num=self._cfg.learn.batch_size,
        #     init_fn=lambda: [None for _ in range(self._cfg.model.agent_num)]
        # )

        # Algorithm config
        self._value_weight = self._cfg.learn.value_weight
        self._entropy_weight = self._cfg.learn.entropy_weight
        self._clip_ratio = self._cfg.learn.clip_ratio
        self._adv_norm = self._cfg.learn.adv_norm
        self._value_norm = self._cfg.learn.value_norm
        if self._value_norm:
            self._running_mean_std = RunningMeanStd(epsilon=1e-4, device=self._device)
        self._gamma = self._cfg.collect.discount_factor
        self._gae_lambda = self._cfg.collect.gae_lambda
        self._recompute_adv = self._cfg.recompute_adv
        # Main model
        self._learn_model.reset()

    def prepocess_data_agent(self, data: Dict[str, Any]):
        """
        Overview:
            Preprocess data for the agent dimension. This function is used in learn mode. \
            It is called recursively to process nested dict data, \
            transposing data with shape (B, agent_num, ...) to (agent_num, B, ...).
        Arguments:
            - data (:obj:`dict`): Dict type data, where each element is the data of an agent of dict type.
        Returns:
            - ret (:obj:`dict`): Dict type data, where each element is the data of an agent of dict type.
        """
        ret = {}
        for key, value in data.items():
            if isinstance(value, dict):
                ret[key] = self.prepocess_data_agent(value)
            elif isinstance(value, torch.Tensor) and len(value.shape) > 1:
                ret[key] = value.transpose(0, 1)
            else:
                ret[key] = value
        return ret

    def _forward_learn(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Overview:
            Policy forward function of learn mode (training policy and updating parameters). Forward means \
            that the policy inputs some training batch data from the replay buffer and then returns the output \
            result, including various training information such as loss, clipfrac, approx_kl.
        Arguments:
            - data (:obj:`List[Dict[int, Any]]`): The input data used for policy forward, including the latest \
                collected training samples for on-policy algorithms like HAPPO. For each element in list, the key of \
                dict is the name of data items and the value is the corresponding data. Usually, the value is \
                torch.Tensor or np.ndarray or their dict/list combinations. In the ``_forward_learn`` method, data \
                often needs to first be stacked in the batch dimension by some utility functions such as \
                ``default_preprocess_learn``. \
                For HAPPO, each element in list is a dict containing at least the following keys: ``obs``, \
                ``action``, ``reward``, ``logit``, ``value``, ``done``. Sometimes, it also contains other keys \
                such as ``weight``.
        Returns:
            - return_infos (:obj:`List[Dict[str, Any]]`): The information list that indicates training result; each \
                training iteration appends an information dict into the final list. The list will be processed \
                and recorded in text log and tensorboard. The value of the dict must be python scalar or a list of \
                scalars. For the detailed definition of the dict, refer to the code of ``_monitor_vars_learn`` method.

        .. tip::
            The training procedure of HAPPO is three for loops. The outermost loop trains each agent separately. \
            The middle loop trains all the collected training samples with ``epoch_per_collect`` epochs. The inner \
            loop splits all the data into different mini-batches with the length of ``batch_size``.

        .. note::
            The input value can be torch.Tensor or dict/list combinations and current policy supports all of them. \
            For the data type that is not supported, the main reason is that the corresponding model does not \
            support it. You can implement your own model rather than use the default model. For more information, \
            please raise an issue in GitHub repo and we will continue to follow up.

        .. note::
            For more detailed examples, please refer to our unittest for HAPPOPolicy: ``ding.policy.tests.test_happo``.
        """
        data = default_preprocess_learn(data, ignore_done=self._cfg.learn.ignore_done, use_nstep=False)
        all_data_len = data['obs']['agent_state'].shape[0]
        # factor is the ratio of the old and new strategies of the first m-1 agents, initialized to 1.
        # Each transition has its own factor. ref: http://arxiv.org/abs/2109.11251
        factor = torch.ones(all_data_len, 1)  # (B, 1)
        if self._cuda:
            data = to_device(data, self._device)
            factor = to_device(factor, self._device)
        # process agent dim
        data = self.prepocess_data_agent(data)
        # ====================
        # PPO forward
        # ====================
        return_infos = []
        self._learn_model.train()

        for agent_id in range(self._cfg.agent_num):
            agent_data = {}
            for key, value in data.items():
                if value is not None:
                    if type(value) is dict:
                        agent_data[key] = {k: v[agent_id] for k, v in value.items()}  # not feasible for rnn
                    elif len(value.shape) > 1:
                        agent_data[key] = data[key][agent_id]
                    else:
                        agent_data[key] = data[key]
                else:
                    agent_data[key] = data[key]

            # update factor
            agent_data['factor'] = factor
            # calculate old_logits of all data in buffer for later factor
            inputs = {
                'obs': agent_data['obs'],
                # 'actor_prev_state': agent_data['actor_prev_state'],
                # 'critic_prev_state': agent_data['critic_prev_state'],
            }
            old_logits = self._learn_model.forward(agent_id, inputs, mode='compute_actor')['logit']

            for epoch in range(self._cfg.learn.epoch_per_collect):
                if self._recompute_adv:  # calculate new value using the newly updated value network
                    with torch.no_grad():
                        inputs['obs'] = agent_data['obs']
                        # value = self._learn_model.forward(agent_id, agent_data['obs'], mode='compute_critic')['value']
                        value = self._learn_model.forward(agent_id, inputs, mode='compute_critic')['value']
                        inputs['obs'] = agent_data['next_obs']
                        next_value = self._learn_model.forward(agent_id, inputs, mode='compute_critic')['value']
                        if self._value_norm:
                            value *= self._running_mean_std.std
                            next_value *= self._running_mean_std.std

                        traj_flag = agent_data.get('traj_flag', None)  # traj_flag indicates termination of trajectory
                        compute_adv_data = gae_data(
                            value, next_value, agent_data['reward'], agent_data['done'], traj_flag
                        )
                        agent_data['adv'] = gae(compute_adv_data, self._gamma, self._gae_lambda)

                        unnormalized_returns = value + agent_data['adv']

                        if self._value_norm:
                            agent_data['value'] = value / self._running_mean_std.std
                            agent_data['return'] = unnormalized_returns / self._running_mean_std.std
                            self._running_mean_std.update(unnormalized_returns.cpu().numpy())
                        else:
                            agent_data['value'] = value
                            agent_data['return'] = unnormalized_returns

                else:  # don't recompute adv
                    if self._value_norm:
                        unnormalized_return = agent_data['adv'] + agent_data['value'] * self._running_mean_std.std
                        agent_data['return'] = unnormalized_return / self._running_mean_std.std
                        self._running_mean_std.update(unnormalized_return.cpu().numpy())
                    else:
                        agent_data['return'] = agent_data['adv'] + agent_data['value']

                for batch in split_data_generator(agent_data, self._cfg.learn.batch_size, shuffle=True):
                    inputs = {
                        'obs': batch['obs'],
                        # 'actor_prev_state': batch['actor_prev_state'],
                        # 'critic_prev_state': batch['critic_prev_state'],
                    }
                    output = self._learn_model.forward(agent_id, inputs, mode='compute_actor_critic')
                    adv = batch['adv']
                    if self._adv_norm:
                        # Normalize advantage in a train_batch
                        adv = (adv - adv.mean()) / (adv.std() + 1e-8)

                    # Calculate happo error
                    if self._action_space == 'continuous':
                        happo_batch = happo_data(
                            output['logit'], batch['logit'], batch['action'], output['value'], batch['value'], adv,
                            batch['return'], batch['weight'], batch['factor']
                        )
                        happo_loss, happo_info = happo_error_continuous(happo_batch, self._clip_ratio)
                    elif self._action_space == 'discrete':
                        happo_batch = happo_data(
                            output['logit'], batch['logit'], batch['action'], output['value'], batch['value'], adv,
                            batch['return'], batch['weight'], batch['factor']
                        )
                        happo_loss, happo_info = happo_error(happo_batch, self._clip_ratio)
                    wv, we = self._value_weight, self._entropy_weight
                    total_loss = happo_loss.policy_loss + wv * happo_loss.value_loss - we * happo_loss.entropy_loss

                    # actor update
                    # critic update
                    self._actor_optimizer.zero_grad()
                    self._critic_optimizer.zero_grad()
                    total_loss.backward()
                    self._actor_optimizer.step()
                    self._critic_optimizer.step()

                    return_info = {
                        'agent{}_cur_lr'.format(agent_id): self._actor_optimizer.defaults['lr'],
                        'agent{}_total_loss'.format(agent_id): total_loss.item(),
                        'agent{}_policy_loss'.format(agent_id): happo_loss.policy_loss.item(),
                        'agent{}_value_loss'.format(agent_id): happo_loss.value_loss.item(),
                        'agent{}_entropy_loss'.format(agent_id): happo_loss.entropy_loss.item(),
                        'agent{}_adv_max'.format(agent_id): adv.max().item(),
                        'agent{}_adv_mean'.format(agent_id): adv.mean().item(),
                        'agent{}_value_mean'.format(agent_id): output['value'].mean().item(),
                        'agent{}_value_max'.format(agent_id): output['value'].max().item(),
                        'agent{}_approx_kl'.format(agent_id): happo_info.approx_kl,
                        'agent{}_clipfrac'.format(agent_id): happo_info.clipfrac,
                    }
                    if self._action_space == 'continuous':
                        return_info.update(
                            {
                                'agent{}_act'.format(agent_id): batch['action'].float().mean().item(),
                                'agent{}_mu_mean'.format(agent_id): output['logit']['mu'].mean().item(),
                                'agent{}_sigma_mean'.format(agent_id): output['logit']['sigma'].mean().item(),
                            }
                        )
                    return_infos.append(return_info)
            # calculate the factor
            inputs = {
                'obs': agent_data['obs'],
                # 'actor_prev_state': agent_data['actor_prev_state'],
            }
            new_logits = self._learn_model.forward(agent_id, inputs, mode='compute_actor')['logit']
            if self._cfg.action_space == 'discrete':
                dist_new = torch.distributions.categorical.Categorical(logits=new_logits)
                dist_old = torch.distributions.categorical.Categorical(logits=old_logits)
            elif self._cfg.action_space == 'continuous':
                dist_new = Normal(new_logits['mu'], new_logits['sigma'])
                dist_old = Normal(old_logits['mu'], old_logits['sigma'])
            logp_new = dist_new.log_prob(agent_data['action'])
            logp_old = dist_old.log_prob(agent_data['action'])
            if len(logp_new.shape) > 1:
                # for logp with shape (B, action_shape), we need to calculate the product of all action dimensions
                factor = factor * torch.prod(
                    torch.exp(logp_new - logp_old), dim=-1
                ).reshape(all_data_len, 1).detach()  # pay attention to the shape
            else:
                # for logp with shape (B, ), directly calculate factor
                factor = factor * torch.exp(logp_new - logp_old).reshape(all_data_len, 1).detach()
        return return_infos

    def _state_dict_learn(self) -> Dict[str, Any]:
        """
        Overview:
            Return the state_dict of learn mode optimizer and model.
        Returns:
            - state_dict (:obj:`Dict[str, Any]`): The dict of current policy learn mode. It contains the \
                state_dict of current policy network and optimizer.
        """
        return {
            'model': self._learn_model.state_dict(),
            'actor_optimizer': self._actor_optimizer.state_dict(),
            'critic_optimizer': self._critic_optimizer.state_dict(),
        }

    def _load_state_dict_learn(self, state_dict: Dict[str, Any]) -> None:
        """
        Overview:
            Load the state_dict of learn mode optimizer and model.
        Arguments:
            - state_dict (:obj:`Dict[str, Any]`): The dict of policy learn mode. It contains the state_dict \
                of current policy network and optimizer.
        """
        self._learn_model.load_state_dict(state_dict['model'])
        self._actor_optimizer.load_state_dict(state_dict['actor_optimizer'])
        self._critic_optimizer.load_state_dict(state_dict['critic_optimizer'])

    def _init_collect(self) -> None:
        """
        Overview:
            Initialize the collect mode of policy, including related attributes and modules. For HAPPO, it contains \
            the collect_model to balance exploration and exploitation (e.g. the multinomial sample mechanism in \
            discrete action space), and other algorithm-specific arguments such as unroll_len and gae_lambda.
            This method will be called in ``__init__`` method if ``collect`` field is in ``enable_field``.

        .. note::
            If you want to set some special member variables in ``_init_collect`` method, you'd better name them \
            with prefix ``_collect_`` to avoid conflict with other modes, such as ``self._collect_attr1``.

        .. tip::
            Some variables need to be initialized independently in different modes, such as gamma and gae_lambda in \
            PPO. This design is for the convenience of parallel execution of different policy modes.
        """
        self._unroll_len = self._cfg.collect.unroll_len
        assert self._cfg.action_space in ["continuous", "discrete"]
        self._action_space = self._cfg.action_space
        if self._action_space == 'continuous':
            self._collect_model = model_wrap(self._model, wrapper_name='reparam_sample')
        elif self._action_space == 'discrete':
            self._collect_model = model_wrap(self._model, wrapper_name='multinomial_sample')
        self._collect_model.reset()
        self._gamma = self._cfg.collect.discount_factor
        self._gae_lambda = self._cfg.collect.gae_lambda
        self._recompute_adv = self._cfg.recompute_adv

    def _forward_collect(self, data: Dict[int, Any]) -> dict:
        """
        Overview:
            Policy forward function of collect mode (collecting training data by interacting with envs). Forward \
            means that the policy gets some necessary data (mainly observation) from the envs and then returns the \
            output data, such as the action to interact with the envs.
        Arguments:
            - data (:obj:`Dict[int, Any]`): The input data used for policy forward, including at least the obs. The \
                key of the dict is environment id and the value is the corresponding data of the env.
        Returns:
            - output (:obj:`Dict[int, Any]`): The output data of policy forward, including at least the action and \
                other necessary data (action logit and value) for learn mode defined in ``self._process_transition`` \
                method. The key of the dict is the same as the input data, i.e. environment id.

        .. tip::
            If you want to add more tricks on this policy, like temperature factor in multinomial sample, you can \
            pass related data as extra keyword arguments of this method.

        .. note::
            The input value can be torch.Tensor or dict/list combinations and current policy supports all of them. \
            For the data type that is not supported, the main reason is that the corresponding model does not \
            support it. You can implement your own model rather than use the default model. For more information, \
            please raise an issue in GitHub repo and we will continue to follow up.

        .. note::
            For more detailed examples, please refer to our unittest for HAPPOPolicy: ``ding.policy.tests.test_happo``.
        """
        data_id = list(data.keys())
        data = default_collate(list(data.values()))
        if self._cuda:
            data = to_device(data, self._device)
        data = {k: v.transpose(0, 1) for k, v in data.items()}  # not feasible for rnn
        self._collect_model.eval()
        with torch.no_grad():
            outputs = []
            for agent_id in range(self._cfg.agent_num):
                # output = self._collect_model.forward(agent_id, data, mode='compute_actor_critic')
                single_agent_obs = {k: v[agent_id] for k, v in data.items()}
                input = {
                    'obs': single_agent_obs,
                }
                output = self._collect_model.forward(agent_id, input, mode='compute_actor_critic')
                outputs.append(output)
            # transfer data from (M, B, N) -> (B, M, N)
            result = {}
            for key in outputs[0].keys():
                if isinstance(outputs[0][key], dict):
                    subkeys = outputs[0][key].keys()
                    stacked_subvalues = {}
                    for subkey in subkeys:
                        stacked_subvalues[subkey] = \
                            torch.stack([output[key][subkey] for output in outputs], dim=0).transpose(0, 1)
                    result[key] = stacked_subvalues
                else:
                    # If the value is a tensor, stack it directly
                    if isinstance(outputs[0][key], torch.Tensor):
                        result[key] = torch.stack([output[key] for output in outputs], dim=0).transpose(0, 1)
                    else:
                        # If it is not a tensor, assume that it is a non-stackable data type
                        # (such as int, float, etc.), and directly retain the original value
                        result[key] = [output[key] for output in outputs]
            output = result
        if self._cuda:
            output = to_device(output, 'cpu')
        output = default_decollate(output)
        return {i: d for i, d in zip(data_id, output)}

    def _process_transition(self, obs: Any, model_output: dict, timestep: namedtuple) -> dict:
        """
        Overview:
            Process and pack one timestep transition data into a dict, which can be directly used for training and \
            saved in replay buffer. For HAPPO, it contains obs, next_obs, action, reward, done, logit, value.
        Arguments:
            - obs (:obj:`torch.Tensor`): The env observation of current timestep.
            - model_output (:obj:`Dict[str, torch.Tensor]`): The output of the policy network with the observation \
                as input. For HAPPO, it contains the state value, action and the logit of the action.
            - timestep (:obj:`namedtuple`): The execution result namedtuple returned by the environment step method, \
                except all the elements have been transformed into tensor data. Usually, it contains the next obs, \
                reward, done, info, etc.
        Returns:
            - transition (:obj:`Dict[str, torch.Tensor]`): The processed transition data of the current timestep.

        .. note::
            ``next_obs`` is used to calculate nstep return when necessary, so we place it into transition by \
            default. You can delete this field to save memory occupancy if you do not need nstep return.
        """
        transition = {
            'obs': obs,
            'next_obs': timestep.obs,
            'action': model_output['action'],
            'logit': model_output['logit'],
            'value': model_output['value'],
            'reward': timestep.reward,
            'done': timestep.done,
        }
        return transition

    def _get_train_sample(self, data: list) -> Union[None, List[Any]]:
        """
        Overview:
            For a given trajectory (transitions, a list of transition) data, process it into a list of samples that \
            can be used for training directly. In HAPPO, a train sample is a processed transition with newly \
            computed ``traj_flag`` and ``adv`` field. This method is usually used in collectors to execute necessary \
            RL data preprocessing before training, which can help learner amortize relevant time consumption. \
            In addition, you can also implement this method as an identity function and do the data processing \
            in ``self._forward_learn`` method.
        Arguments:
            - data (:obj:`List[Dict[str, Any]]`): The trajectory data (a list of transition), each element is in \
                the same format as the return value of ``self._process_transition`` method.
        Returns:
            - samples (:obj:`List[Dict[str, Any]]`): The processed train samples, each element is in a similar \
                format to input transitions, but may contain more data for training, such as GAE advantage.
        """
        data = to_device(data, self._device)
        for transition in data:
            transition['traj_flag'] = copy.deepcopy(transition['done'])
        data[-1]['traj_flag'] = True

        if self._cfg.learn.ignore_done:
            data[-1]['done'] = False

        if data[-1]['done']:
            last_value = torch.zeros_like(data[-1]['value'])
        else:
            with torch.no_grad():
                last_values = []
                for agent_id in range(self._cfg.agent_num):
                    inputs = {'obs': {k: unsqueeze(v[agent_id], 0) for k, v in data[-1]['next_obs'].items()}}
                    last_value = self._collect_model.forward(agent_id, inputs, mode='compute_actor_critic')['value']
                    last_values.append(last_value)
                last_value = torch.cat(last_values)
            if len(last_value.shape) == 2:  # multi_agent case:
                last_value = last_value.squeeze(0)
        if self._value_norm:
            last_value *= self._running_mean_std.std
            for i in range(len(data)):
                data[i]['value'] *= self._running_mean_std.std
        data = get_gae(
            data,
            to_device(last_value, self._device),
            gamma=self._gamma,
            gae_lambda=self._gae_lambda,
            cuda=False,
        )
        if self._value_norm:
            for i in range(len(data)):
                data[i]['value'] /= self._running_mean_std.std

        # remove next_obs to save memory when not recomputing adv
        if not self._recompute_adv:
            for i in range(len(data)):
                data[i].pop('next_obs')
        return get_train_sample(data, self._unroll_len)

    def _init_eval(self) -> None:
        """
        Overview:
            Initialize the eval mode of policy, including related attributes and modules. For HAPPO, it contains \
            the eval model to select the optimal action (e.g. greedily select action with argmax mechanism in \
            discrete action space). This method will be called in ``__init__`` method if ``eval`` field is in \
            ``enable_field``.

        .. note::
            If you want to set some special member variables in ``_init_eval`` method, you'd better name them \
            with prefix ``_eval_`` to avoid conflict with other modes, such as ``self._eval_attr1``.
        """
        assert self._cfg.action_space in ["continuous", "discrete"]
        self._action_space = self._cfg.action_space
        if self._action_space == 'continuous':
            self._eval_model = model_wrap(self._model, wrapper_name='deterministic_sample')
        elif self._action_space == 'discrete':
            self._eval_model = model_wrap(self._model, wrapper_name='argmax_sample')
        self._eval_model.reset()

    def _forward_eval(self, data: dict) -> dict:
        """
        Overview:
            Policy forward function of eval mode (evaluating policy performance by interacting with envs). Forward \
            means that the policy gets some necessary data (mainly observation) from the envs and then returns the \
            action to interact with the envs. ``_forward_eval`` in HAPPO often uses a deterministic sample method \
            to get actions while ``_forward_collect`` usually uses a stochastic sample method to balance \
            exploration and exploitation.
        Arguments:
            - data (:obj:`Dict[int, Any]`): The input data used for policy forward, including at least the obs. The \
                key of the dict is environment id and the value is the corresponding data of the env.
        Returns:
            - output (:obj:`Dict[int, Any]`): The output data of policy forward, including at least the action. The \
                key of the dict is the same as the input data, i.e. environment id.

        .. note::
            The input value can be torch.Tensor or dict/list combinations and current policy supports all of them. \
            For the data type that is not supported, the main reason is that the corresponding model does not \
            support it. You can implement your own model rather than use the default model. For more information, \
            please raise an issue in GitHub repo and we will continue to follow up.

        .. note::
            For more detailed examples, please refer to our unittest for HAPPOPolicy: ``ding.policy.tests.test_happo``.
        """
        data_id = list(data.keys())
        data = default_collate(list(data.values()))
        if self._cuda:
            data = to_device(data, self._device)
        # transfer data from (B, M, N) -> (M, B, N)
        data = {k: v.transpose(0, 1) for k, v in data.items()}  # not feasible for rnn
        self._eval_model.eval()
        with torch.no_grad():
            outputs = []
            for agent_id in range(self._cfg.agent_num):
                single_agent_obs = {k: v[agent_id] for k, v in data.items()}
                input = {
                    'obs': single_agent_obs,
                }
                output = self._eval_model.forward(agent_id, input, mode='compute_actor')
                outputs.append(output)
            output = self.revert_agent_data(outputs)
        if self._cuda:
            output = to_device(output, 'cpu')
        output = default_decollate(output)
        return {i: d for i, d in zip(data_id, output)}

    def default_model(self) -> Tuple[str, List[str]]:
        """
        Overview:
            Return this algorithm's default neural network model setting for demonstration. ``__init__`` method \
            will automatically call this method to get the default model setting and create the model.
        Returns:
            - model_info (:obj:`Tuple[str, List[str]]`): The registered model name and model's import_names.

        .. note::
            The user can define and use a customized network model but must obey the same interface definition \
            indicated by the import_names path. For HAPPO, the registered model name is ``havac`` and the \
            import_names is ``ding.model.template.havac``.
        """
        return 'havac', ['ding.model.template.havac']

    def _monitor_vars_learn(self) -> List[str]:
        """
        Overview:
            Return the necessary keys for logging the return dict of ``self._forward_learn``. The logger module, \
            such as text logger, tensorboard logger, will use these keys to save the corresponding data.
        Returns:
            - necessary_keys (:obj:`List[str]`): The list of the necessary keys to be logged.
        """
        variables = super()._monitor_vars_learn() + [
            'policy_loss',
            'value_loss',
            'entropy_loss',
            'adv_max',
            'adv_mean',
            'approx_kl',
            'clipfrac',
            'value_max',
            'value_mean',
        ]
        if self._action_space == 'continuous':
            variables += ['mu_mean', 'sigma_mean', 'sigma_grad', 'act']
        prefixes = [f'agent{i}_' for i in range(self._cfg.agent_num)]
        variables = [prefix + var for prefix in prefixes for var in variables]
        return variables

    def revert_agent_data(self, data: list):
        """
        Overview:
            Revert the data of each agent to the original data format.
        Arguments:
            - data (:obj:`list`): List type data, where each element is the data of an agent of dict type.
        Returns:
            - ret (:obj:`dict`): Dict type data, where each element is the data of an agent of dict type.
        """
        ret = {}
        # Traverse all keys of the first output
        for key in data[0].keys():
            if isinstance(data[0][key], torch.Tensor):
                # If the value corresponding to the current key is a tensor, stack N tensors
                stacked_tensor = torch.stack([output[key] for output in data], dim=0)
                ret[key] = stacked_tensor.transpose(0, 1)
            elif isinstance(data[0][key], dict):
                # If the value corresponding to the current key is a dictionary, recursively
                # call the function to process the contents inside the dictionary.
                ret[key] = self.revert_agent_data([output[key] for output in data])
        return ret
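The distinctive step in ``_forward_learn`` above is the sequential factor update: after agent i finishes its PPO-style updates, the ratio of its new to old action probabilities multiplies into the factor that weights agent i+1's surrogate objective. A minimal standalone sketch of that loop, assuming a discrete action space and toy random logits in place of real network outputs (none of the names below are DI-engine API):

```python
import torch
from torch.distributions import Categorical

# Toy sketch of HAPPO's sequential factor update.
B, n_agents, n_actions = 8, 3, 4
factor = torch.ones(B, 1)  # one factor per transition, initialized to 1
for agent_id in range(n_agents):
    old_logits = torch.randn(B, n_actions)
    # stands in for the policy after this agent's gradient updates
    new_logits = old_logits + 0.1 * torch.randn(B, n_actions)
    actions = Categorical(logits=old_logits).sample()
    # ... agent_id's clipped surrogate loss would be weighted by `factor` here ...
    # After training agent_id, fold its probability ratio into the factor
    # carried to the next agent in the sequence.
    logp_new = Categorical(logits=new_logits).log_prob(actions)
    logp_old = Categorical(logits=old_logits).log_prob(actions)
    factor = factor * torch.exp(logp_new - logp_old).reshape(B, 1).detach()
print(factor.shape)  # torch.Size([8, 1])
```

The `.detach()` matters: the factor is treated as a constant weight in the next agent's loss, so no gradient flows back through earlier agents' updates.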