Skip to content

ding.worker.coordinator.solo_parallel_commander

ding.worker.coordinator.solo_parallel_commander

SoloCommander

Bases: BaseCommander

Overview

Parallel commander for solo games.

Interface: init, get_collector_task, get_learner_task, finish_collector_task, finish_learner_task, notify_fail_collector_task, notify_fail_learner_task, update_learner_info

__init__(cfg)

Overview

Init the solo commander according to config.

Arguments: - cfg (:obj:dict): Dict type config file.

get_collector_task()

Overview

Return the new collector task when there is residual task space; Otherwise return None.

Return: - task (:obj:Optional[dict]): New collector task.

get_learner_task()

Overview

Return the new learner task when there is residual task space; Otherwise return None.

Return: - task (:obj:Optional[dict]): New learner task.

finish_collector_task(task_id, finished_task)

Overview

Get collector's finish_task_info and release collector_task_space. If collector's task is evaluation, judge the convergence and return it.

Arguments: - task_id (:obj:str): the collector task_id - finished_task (:obj:dict): the finished task Returns: - convergence (:obj:bool): Whether the stop value is reached and the algorithm is converged. \ If True, the pipeline can be finished.

finish_learner_task(task_id, finished_task)

Overview

Get learner's finish_task_info, release learner_task_space, reset corresponding variables.

Arguments: - task_id (:obj:str): Learner task_id - finished_task (:obj:dict): Learner's finish_learn_info. Returns: - buffer_id (:obj:str): Buffer id of the finished learner.

notify_fail_collector_task(task)

Overview

Release task space when collector task fails.

notify_fail_learner_task(task)

Overview

Release task space when learner task fails.

update_learner_info(task_id, info)

Overview

Append the info to learner_info:

Arguments: - task_id (:obj:str): Learner task_id - info (:obj:dict): Dict type learner info.

increase_collector_task_space()

" Overview: Increase task space when a new collector has added dynamically.

decrease_collector_task_space()

" Overview: Decrease task space when a new collector has removed dynamically.

Full Source Code

../ding/worker/coordinator/solo_parallel_commander.py

from typing import Optional
import time
import copy

from ding.policy import create_policy
from ding.utils import LimitedSpaceContainer, get_task_uid, build_logger, COMMANDER_REGISTRY
from .base_parallel_commander import BaseCommander


@COMMANDER_REGISTRY.register('solo')
class SoloCommander(BaseCommander):
    r"""
    Overview:
        Parallel commander for solo games.
    Interface:
        __init__, get_collector_task, get_learner_task, finish_collector_task, finish_learner_task,
        notify_fail_collector_task, notify_fail_learner_task, update_learner_info
    """
    config = dict(
        collector_task_space=1,
        learner_task_space=1,
        eval_interval=60,
    )

    def __init__(self, cfg: dict) -> None:
        r"""
        Overview:
            Init the solo commander according to config.
        Arguments:
            - cfg (:obj:`dict`): Dict type config file.
        """
        self._cfg = cfg
        self._exp_name = cfg.exp_name
        commander_cfg = self._cfg.policy.other.commander
        self._commander_cfg = commander_cfg

        # Collector and evaluator share the base env config but differ in episode_num;
        # the two episode-count keys are popped so only `manager.episode_num` remains.
        self._collector_env_cfg = copy.deepcopy(self._cfg.env)
        self._collector_env_cfg.pop('collector_episode_num')
        self._collector_env_cfg.pop('evaluator_episode_num')
        self._collector_env_cfg.manager.episode_num = self._cfg.env.collector_episode_num
        self._evaluator_env_cfg = copy.deepcopy(self._cfg.env)
        self._evaluator_env_cfg.pop('collector_episode_num')
        self._evaluator_env_cfg.pop('evaluator_episode_num')
        self._evaluator_env_cfg.manager.episode_num = self._cfg.env.evaluator_episode_num

        # Bounded counters of outstanding collector/learner tasks.
        self._collector_task_space = LimitedSpaceContainer(0, commander_cfg.collector_task_space)
        self._learner_task_space = LimitedSpaceContainer(0, commander_cfg.learner_task_space)
        self._learner_info = [{'learner_step': 0}]
        # TODO(nyz) accumulate collect info
        self._collector_info = []
        self._total_collector_env_step = 0
        self._evaluator_info = []
        self._current_buffer_id = None
        self._current_policy_id = None
        self._last_eval_time = 0
        # policy_cfg must be deepcopyed
        policy_cfg = copy.deepcopy(self._cfg.policy)
        self._policy = create_policy(policy_cfg, enable_field=['command']).command_mode
        self._logger, self._tb_logger = build_logger(
            "./{}/log/commander".format(self._exp_name), "commander", need_tb=True
        )
        self._collector_logger, _ = build_logger(
            "./{}/log/commander".format(self._exp_name), "commander_collector", need_tb=False
        )
        self._evaluator_logger, _ = build_logger(
            "./{}/log/commander".format(self._exp_name), "commander_evaluator", need_tb=False
        )
        self._sub_logger = {
            'collector': self._collector_logger,
            'evaluator': self._evaluator_logger,
        }
        self._end_flag = False

    def get_collector_task(self) -> Optional[dict]:
        r"""
        Overview:
            Return the new collector task when there is residual task space; Otherwise return None.
        Return:
            - task (:obj:`Optional[dict]`): New collector task.
        """
        if self._end_flag:
            return None
        if self._collector_task_space.acquire_space():
            if self._current_buffer_id is None or self._current_policy_id is None:
                # No learner task has been launched yet, so there is nothing to collect for;
                # give the acquired slot back.
                self._collector_task_space.release_space()
                return None
            # Periodically dispatch an evaluation task instead of a collection task.
            cur_time = time.time()
            if cur_time - self._last_eval_time > self._commander_cfg.eval_interval:
                eval_flag = True
                self._last_eval_time = time.time()
            else:
                eval_flag = False
            collector_cfg = copy.deepcopy(self._cfg.policy.collect.collector)
            # the newest info
            info = self._learner_info[-1]
            info['envstep'] = self._total_collector_env_step
            collector_cfg.collect_setting = self._policy.get_setting_collect(info)
            collector_cfg.policy_update_path = self._current_policy_id
            collector_cfg.eval_flag = eval_flag
            collector_cfg.policy = copy.deepcopy(self._cfg.policy)
            collector_cfg.exp_name = self._exp_name
            if eval_flag:
                collector_cfg.env = self._evaluator_env_cfg
            else:
                collector_cfg.env = self._collector_env_cfg
            return {
                'task_id': 'collector_task_{}'.format(get_task_uid()),
                'buffer_id': self._current_buffer_id,
                'collector_cfg': collector_cfg,
            }
        else:
            return None

    def get_learner_task(self) -> Optional[dict]:
        r"""
        Overview:
            Return the new learner task when there is residual task space; Otherwise return None.
        Return:
            - task (:obj:`Optional[dict]`): New learner task.
        """
        if self._end_flag:
            return None
        if self._learner_task_space.acquire_space():
            learner_cfg = copy.deepcopy(self._cfg.policy.learn.learner)
            learner_cfg.exp_name = self._exp_name
            return {
                'task_id': 'learner_task_{}'.format(get_task_uid()),
                'policy_id': self._init_policy_id(),
                'buffer_id': self._init_buffer_id(),
                'learner_cfg': learner_cfg,
                'replay_buffer_cfg': copy.deepcopy(self._cfg.policy.other.replay_buffer),
                'policy': copy.deepcopy(self._cfg.policy),
            }
        else:
            return None

    def finish_collector_task(self, task_id: str, finished_task: dict) -> bool:
        r"""
        Overview:
            Get collector's finish_task_info and release collector_task_space.
            If collector's task is evaluation, judge the convergence and return it.
        Arguments:
            - task_id (:obj:`str`): the collector task_id
            - finished_task (:obj:`dict`): the finished task
        Returns:
            - convergence (:obj:`bool`): Whether the stop value is reached and the algorithm is converged. \
                If True, the pipeline can be finished.
        """
        self._collector_task_space.release_space()
        evaluator_or_collector = "evaluator" if finished_task['eval_flag'] else "collector"
        train_iter = finished_task['train_iter']
        # BUGFIX: original code swapped the sources of 'avg_step_per_episode' and
        # 'avg_time_per_episode', so the logged/plotted values were exchanged.
        info = {
            'train_iter': train_iter,
            'episode_count': finished_task['real_episode_count'],
            'step_count': finished_task['step_count'],
            'avg_step_per_episode': finished_task['avg_step_per_episode'],
            'avg_time_per_step': finished_task['avg_time_per_step'],
            'avg_time_per_episode': finished_task['avg_time_per_episode'],
            'reward_mean': finished_task['reward_mean'],
            'reward_std': finished_task['reward_std'],
        }
        self._sub_logger[evaluator_or_collector].info(
            "[{}] Task ends:\n{}".format(
                evaluator_or_collector.upper(), '\n'.join(['{}: {}'.format(k, v) for k, v in info.items()])
            )
        )
        for k, v in info.items():
            if k in ['train_iter']:
                continue
            self._tb_logger.add_scalar('{}_iter/'.format(evaluator_or_collector) + k, v, train_iter)
            self._tb_logger.add_scalar('{}_step/'.format(evaluator_or_collector) + k, v, self._total_collector_env_step)
        if finished_task['eval_flag']:
            self._evaluator_info.append(finished_task)
            eval_stop_value = self._cfg.env.stop_value
            if eval_stop_value is not None and finished_task['reward_mean'] >= eval_stop_value:
                self._logger.info(
                    "[DI-engine parallel pipeline] current episode_return: {} is greater than the stop_value: {}".
                    format(finished_task['reward_mean'], eval_stop_value) + ", so the total training program is over."
                )
                self._end_flag = True
                return True
        else:
            self._collector_info.append(finished_task)
            self._total_collector_env_step += finished_task['step_count']
        return False

    def finish_learner_task(self, task_id: str, finished_task: dict) -> str:
        r"""
        Overview:
            Get learner's finish_task_info, release learner_task_space, reset corresponding variables.
        Arguments:
            - task_id (:obj:`str`): Learner task_id
            - finished_task (:obj:`dict`): Learner's finish_learn_info.
        Returns:
            - buffer_id (:obj:`str`): Buffer id of the finished learner.
        """
        self._learner_task_space.release_space()
        buffer_id = finished_task['buffer_id']
        # Reset per-run state so a subsequent learner task starts fresh.
        self._current_buffer_id = None
        self._current_policy_id = None
        self._learner_info = [{'learner_step': 0}]
        self._evaluator_info = []
        self._last_eval_time = 0
        return buffer_id

    def notify_fail_collector_task(self, task: dict) -> None:
        r"""
        Overview:
            Release task space when collector task fails.
        """
        self._collector_task_space.release_space()

    def notify_fail_learner_task(self, task: dict) -> None:
        r"""
        Overview:
            Release task space when learner task fails.
        """
        self._learner_task_space.release_space()

    def update_learner_info(self, task_id: str, info: dict) -> None:
        r"""
        Overview:
            Append the info to learner_info.
        Arguments:
            - task_id (:obj:`str`): Learner task_id
            - info (:obj:`dict`): Dict type learner info.
        """
        self._learner_info.append(info)

    def _init_policy_id(self) -> str:
        r"""
        Overview:
            Init the policy id and return it.
        Returns:
            - policy_id (:obj:`str`): New initialized policy id.
        """
        policy_id = 'policy_{}'.format(get_task_uid())
        self._current_policy_id = policy_id
        return policy_id

    def _init_buffer_id(self) -> str:
        r"""
        Overview:
            Init the buffer id and return it.
        Returns:
            - buffer_id (:obj:`str`): New initialized buffer id.
        """
        buffer_id = 'buffer_{}'.format(get_task_uid())
        self._current_buffer_id = buffer_id
        return buffer_id

    def increase_collector_task_space(self):
        r"""
        Overview:
            Increase task space when a new collector has been added dynamically.
        """
        self._collector_task_space.increase_space()

    def decrease_collector_task_space(self):
        r"""
        Overview:
            Decrease task space when a collector has been removed dynamically.
        """
        self._collector_task_space.decrease_space()