
ding.envs.env_manager.base_env_manager

BaseEnvManager

Bases: object

Overview

The basic class of env manager to manage multiple vectorized environments. BaseEnvManager defines all the necessary interfaces, and derived classes must extend this basic class.

The class is implemented by the pseudo-parallelism (i.e. serial) mechanism; therefore, this class is only used in some tiny environments and for debugging purposes.

Interfaces: reset, step, seed, close, enable_save_replay, launch, default_config, reward_shaping, enable_save_figure

Properties: env_num, env_ref, ready_obs, ready_obs_id, ready_imgs, done, closed, method_name_list, observation_space, action_space, reward_space
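The pseudo-parallel mechanism described above amounts to a plain serial loop over sub-environments. A minimal sketch of the idea (the `ToyEnv` and `SerialEnvManager` names are hypothetical stand-ins, not DI-engine's actual classes):

```python
# Minimal sketch of pseudo-parallel (serial) vectorization.
class ToyEnv:
    """Hypothetical stand-in for a real environment."""

    def __init__(self):
        self._t = 0

    def reset(self):
        self._t = 0
        return self._t  # observation

    def step(self, action):
        self._t += 1
        # (obs, reward, done) as a plain tuple for illustration
        return self._t, float(action), self._t >= 3


class SerialEnvManager:
    def __init__(self, env_fns):
        self._envs = [fn() for fn in env_fns]

    def reset(self):
        return {i: env.reset() for i, env in enumerate(self._envs)}

    def step(self, actions):
        # The "vectorized" step is just a serial loop: fine for tiny
        # environments and debugging, but with no real parallel speed-up.
        return {i: self._envs[i].step(a) for i, a in actions.items()}


manager = SerialEnvManager([ToyEnv for _ in range(2)])
obs = manager.reset()
timesteps = manager.step({0: 1.0, 1: 2.0})
```

Because everything runs in one process, a slow sub-environment blocks all the others, which is why derived classes replace this loop with real subprocess workers.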

env_num property

Overview

env_num is the number of sub-environments in env manager.

Returns: - env_num (:obj:int): The number of sub-environments.

env_ref property

Overview

env_ref is used to acquire some common attributes of env, like obs_shape and act_shape.

Returns: - env_ref (:obj:BaseEnv): The reference of sub-environment.

observation_space property

Overview

observation_space is the observation space of sub-environment, following the format of gym.spaces.

Returns: - observation_space (:obj:gym.spaces.Space): The observation space of sub-environment.

action_space property

Overview

action_space is the action space of sub-environment, following the format of gym.spaces.

Returns: - action_space (:obj:gym.spaces.Space): The action space of sub-environment.

reward_space property

Overview

reward_space is the reward space of sub-environment, following the format of gym.spaces.

Returns: - reward_space (:obj:gym.spaces.Space): The reward space of sub-environment.

ready_obs property

Overview

Get the ready (next) observation, which is a special design to unify both async/sync env managers. For each interaction between policy and env, the policy will input the ready_obs and output the action. Then the env_manager will step with the action and prepare the next ready_obs.

Returns: - ready_obs (:obj:Dict[int, Any]): A dict with env_id keys and observation values.

Example:
>>> obs = env_manager.ready_obs
>>> stacked_obs = np.concatenate(list(obs.values()))
>>> action = policy(obs)  # here policy inputs np obs and outputs np action
>>> action = {env_id: a for env_id, a in zip(obs.keys(), action)}
>>> timesteps = env_manager.step(action)

ready_obs_id property

Overview

Get the ready (next) observation id, which is a special design to unify both async/sync env managers.

Returns: - ready_obs_id (:obj:List[int]): A list of env_ids for ready observations.

ready_imgs property

Overview

Sometimes we need to render the envs; this function is used to get the next ready rendered frame and the corresponding env id.

Arguments: - render_mode (:obj:Optional[str]): The render mode, can be 'rgb_array' or 'depth_array', which follows the definition in the render function of ding.utils.

Returns: - ready_imgs (:obj:Dict[int, np.ndarray]): A dict with env_id keys and rendered frames.

done property

Overview

done is a flag to indicate whether env manager is done, i.e., whether all sub-environments have executed enough episodes.

Returns: - done (:obj:bool): Whether env manager is done.

method_name_list property

Overview

The public methods list of sub-environments that can be directly called from the env manager level. Other methods and attributes will be accessed with the __getattr__ method. Methods defined in this list can be regarded as the vectorized extension of methods in sub-environments. Subclasses of BaseEnvManager can override this property to add more methods.

Returns: - method_name_list (:obj:list): The public methods list of sub-environments.

closed property

Overview

closed is a property that returns whether the env manager is closed.

Returns: - closed (:obj:bool): Whether the env manager is closed.

default_config() classmethod

Overview

Return the deep-copied default config of env manager.

Returns: - cfg (:obj:EasyDict): The default config of env manager.

__init__(env_fn, cfg=EasyDict({}))

Overview

Initialize the base env manager with the callable env functions and the EasyDict-type config. Here we use env_fn to ensure the lazy initialization of sub-environments, which is beneficial to resource allocation and parallelism. cfg is the merged result between the default config of this class and the user's config. This constructor is in lazy-initialization mode; the actual initialization happens in launch.

Arguments: - env_fn (:obj:List[Callable]): A list of functions to create env_num sub-environments. - cfg (:obj:EasyDict): Final merged config.

.. note:: For more details about how to merge config, please refer to the system document of DI-engine (en link1 <../03_system/config.html>_).
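The lazy-initialization idea (pass callables, defer construction to launch) can be sketched as follows; `CountingEnv` and `LazyManager` are hypothetical illustration names, and note that the real class additionally instantiates one reference environment (`env_ref`) eagerly to read its spaces:

```python
created = 0


class CountingEnv:
    """Hypothetical env that counts how many instances exist."""

    def __init__(self):
        global created
        created += 1


class LazyManager:
    def __init__(self, env_fns):
        # Store the callables only; no sub-environment is built yet.
        self._env_fns = env_fns
        self._envs = None

    def launch(self):
        # Actual construction is deferred to launch().
        self._envs = [fn() for fn in self._env_fns]


manager = LazyManager([CountingEnv for _ in range(4)])
before_launch = created   # construction deferred, nothing built yet
manager.launch()
after_launch = created    # all four sub-environments built here
```

Deferring construction this way lets subprocess-based derived classes build each env inside its own worker process instead of the main process.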

__getattr__(key)

Note

If a Python object doesn't have the attribute whose name is key, this method is called. We assume that all envs have the same attributes. If you need different envs, please implement another env manager.
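A compact sketch of this forwarding behavior (the `Env` and `Manager` names are hypothetical; the real implementation additionally validates the key against `method_name_list` before forwarding):

```python
class Env:
    horizon = 100  # a common attribute shared by all sub-envs


class Manager:
    def __init__(self, envs):
        self._envs = envs

    def __getattr__(self, key):
        # Invoked only when normal attribute lookup fails on the manager
        # itself, so the manager's own attributes like _envs are unaffected;
        # gather the requested attribute from every sub-environment.
        return [getattr(env, key) for env in self._envs]


manager = Manager([Env(), Env()])
horizons = manager.horizon  # gathered from each sub-environment
```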

launch(reset_param=None)

Overview

Launch the env manager, instantiate the sub-environments and set up the environments and their parameters.

Arguments: - reset_param (:obj:Optional[Dict]): A dict of reset parameters for each environment, key is the env_id, value is the corresponding reset parameter, defaults to None.

reset(reset_param=None)

Overview

Forcibly reset the sub-environments with their corresponding parameters. Because in the env manager all sub-environments are usually reset automatically as soon as they are done, this method is only called when the caller must forcibly reset all the sub-environments, such as in evaluation.

Arguments: - reset_param (:obj:Dict): Dict of reset parameters for each environment, key is the env_id, value is the corresponding reset parameters.

step(actions)

Overview

Execute env step according to input actions. If some sub-environments are done after this execution, they will be reset automatically when self._auto_reset is True; otherwise they need to be reset when the caller uses the reset method of env manager.

Arguments: - actions (:obj:Dict[int, Any]): A dict of actions, key is the env_id, value is the corresponding action. The action can be any type; it depends on the env, and the env will handle it. Usually, the action is a dict of numpy arrays, and the value is generated by the outer caller like policy.

Returns: - timesteps (:obj:Dict[int, BaseEnvTimestep]): Each timestep is a BaseEnvTimestep object, usually including observation, reward, done, info. Some specially customized environments will have their own timestep definition. The length of timesteps is the same as the length of actions in the synchronous env manager.

Example:
>>> timesteps = env_manager.step(action)
>>> for env_id, timestep in timesteps.items():
>>>     if timestep.done:
>>>         print('Env {} is done'.format(env_id))

seed(seed, dynamic_seed=None)

Overview

Set the random seed for each environment.

Arguments: - seed (:obj:Union[Dict[int, int], List[int], int]): Dict or List of seeds for each environment; If only one seed is provided, it will be used in the same way for all environments. - dynamic_seed (:obj:bool): Whether to use dynamic seed.

.. note:: For more details about dynamic_seed, please refer to the best practice document of DI-engine (en link2 <../04_best_practice/random_seed.html>_).
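The seed-broadcasting rule above (a single int is expanded to one distinct seed per sub-environment as `seed + i`, mirroring the actual implementation; the `broadcast_seed` helper name is hypothetical) can be sketched as:

```python
from typing import List, Union


def broadcast_seed(seed: Union[int, List[int]], env_num: int) -> List[int]:
    # One int: derive a distinct seed per sub-environment as seed + i,
    # which is what BaseEnvManager.seed does internally for int input.
    if isinstance(seed, int):
        return [seed + i for i in range(env_num)]
    # A list must already provide exactly one seed per sub-environment.
    assert len(seed) == env_num, "len(seed) != env_num"
    return list(seed)


seeds = broadcast_seed(314, 4)
```

Distinct per-env seeds keep the vectorized rollouts decorrelated while the whole run stays reproducible from one base seed.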

enable_save_replay(replay_path)

Overview

Enable all environments to save replay video after each episode terminates.

Arguments: - replay_path (:obj:Union[List[str], str]): List of paths for each environment; Or one path for all environments.

enable_save_figure(env_id, figure_path)

Overview

Enable a specific env to save figure (e.g. environment statistics or episode return curve).

Arguments: - env_id (:obj:int): The id of the environment that saves the figure. - figure_path (:obj:str): The file directory path for the environment to save figures.

close()

Overview

Close the env manager and release all the environment resources.

reward_shaping(env_id, transitions)

Overview

Execute reward shaping for a specific environment, which is often called when an episode terminates.

Arguments: - env_id (:obj:int): The id of the environment to be shaped. - transitions (:obj:List[dict]): The transition data list of the environment to be shaped.

Returns: - transitions (:obj:List[dict]): The shaped transition data list.

BaseEnvManagerV2

Bases: BaseEnvManager

Overview

The basic class of env manager to manage multiple vectorized environments. BaseEnvManager defines all the necessary interfaces, and derived classes must extend this basic class.

The class is implemented by the pseudo-parallelism (i.e. serial) mechanism; therefore, this class is only used in some tiny environments and for debugging purposes.

V2 means this env manager is designed for the new task pipeline and interfaces coupled with treetensor.

.. note:: For more details about new task pipeline, please refer to the system document of DI-engine (system en link3 <../03_system/index.html>_).

Interfaces

reset, step, seed, close, enable_save_replay, launch, default_config, reward_shaping, enable_save_figure

Properties: env_num, env_ref, ready_obs, ready_obs_id, ready_imgs, done, closed, method_name_list, observation_space, action_space, reward_space

ready_obs property

Overview

Get the ready (next) observation, which is a special design to unify both async/sync env managers. For each interaction between policy and env, the policy will input the ready_obs and output the action. Then the env_manager will step with the action and prepare the next ready_obs. For the V2 version, the observation is transformed and packed up into tnp.array type, which allows more convenient operations.

Returns: - ready_obs (:obj:tnp.array): A stacked treenumpy-type observation data.

Example:
>>> obs = env_manager.ready_obs
>>> action = policy(obs)  # here policy inputs treenp obs and outputs np action
>>> timesteps = env_manager.step(action)

step(actions)

Overview

Execute env step according to input actions. If some sub-environments are done after this execution, they will be reset automatically by default.

Arguments: - actions (:obj:List[tnp.ndarray]): A list of treenumpy-type actions, whose values are generated by the outer caller like policy.

Returns: - timesteps (:obj:List[tnp.ndarray]): A list of timesteps. Each timestep is a tnp.ndarray object, usually including observation, reward, done, info, env_id. Some special environments will have their own timestep definition. The length of timesteps is the same as the length of actions in the synchronous env manager. For compatibility with treenumpy, here we use the make_key_as_identifier and remove_illegal_item functions to modify the original timestep.

Example:
>>> timesteps = env_manager.step(action)
>>> for timestep in timesteps:
>>>     if timestep.done:
>>>         print('Env {} is done'.format(timestep.env_id))

timeout_wrapper(func=None, timeout=None)

Overview

Watch the function that must be finished within a period of time. If it times out, raise the captured error.
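A minimal Unix-only sketch of such a watchdog, using `signal.SIGALRM` instead of DI-engine's actual `WatchDog` utility (this only illustrates the decorator's double calling convention and the timeout idea, not the real implementation):

```python
import signal
from functools import partial, wraps


def timeout_wrapper(func=None, timeout=None):
    # Support both @timeout_wrapper and @timeout_wrapper(timeout=...).
    if func is None:
        return partial(timeout_wrapper, timeout=timeout)
    if timeout is None:
        return func

    @wraps(func)
    def wrapper(*args, **kwargs):
        def handler(signum, frame):
            raise TimeoutError("'{}' exceeded {}s".format(func.__name__, timeout))

        old = signal.signal(signal.SIGALRM, handler)
        signal.alarm(timeout)  # SIGALRM fires after `timeout` seconds
        try:
            return func(*args, **kwargs)
        finally:
            signal.alarm(0)  # always cancel the pending alarm
            signal.signal(signal.SIGALRM, old)

    return wrapper


@timeout_wrapper(timeout=5)
def quick():
    return "done"


result = quick()  # finishes well under the limit, alarm is cancelled
```

`SIGALRM` only works on Unix in the main thread, which is the same reason the real wrapper disables itself on Windows.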

create_env_manager(manager_cfg, env_fn)

Overview

Create an env manager according to manager_cfg and env functions.

Arguments: - manager_cfg (:obj:EasyDict): Final merged env manager config. - env_fn (:obj:List[Callable]): A list of functions to create env_num sub-environments.

ArgumentsKeys: - type (:obj:str): Env manager type set in ENV_MANAGER_REGISTRY.register, such as base. - import_names (:obj:List[str]): A list of module names (paths) to import before creating the env manager, such as ding.envs.env_manager.base_env_manager.

Returns: - env_manager (:obj:BaseEnvManager): The created env manager.

.. tip:: This method will not modify the passed manager_cfg; it deep-copies manager_cfg and then modifies the copy.
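The registry lookup that create_env_manager performs can be sketched with a plain dict registry. All names below are hypothetical stand-ins; DI-engine actually uses its ENV_MANAGER_REGISTRY object plus dynamic imports driven by import_names:

```python
import copy

ENV_MANAGER_REGISTRY = {}  # hypothetical dict stand-in for the real registry


def register(name):
    def deco(cls):
        ENV_MANAGER_REGISTRY[name] = cls
        return cls
    return deco


@register('base')
class BaseEnvManagerSketch:
    def __init__(self, env_fn, cfg):
        self.env_fn = env_fn
        self.cfg = cfg


def create_env_manager(manager_cfg, env_fn):
    # Deepcopy first, so the caller's config is never mutated.
    manager_cfg = copy.deepcopy(manager_cfg)
    manager_type = manager_cfg.pop('type')  # 'type' selects the class
    cls = ENV_MANAGER_REGISTRY[manager_type]
    return cls(env_fn, manager_cfg)


cfg = {'type': 'base', 'episode_num': 2}
manager = create_env_manager(cfg, env_fn=[object] * 3)
```

After the call, `cfg` still contains its `type` key: only the internal deep copy was modified, matching the tip above.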

get_env_manager_cls(cfg)

Overview

Get the env manager class according to config, which is used to access related class variables/methods.

Arguments: - manager_cfg (:obj:EasyDict): Final merged env manager config.

ArgumentsKeys: - type (:obj:str): Env manager type set in ENV_MANAGER_REGISTRY.register, such as base. - import_names (:obj:List[str]): A list of module names (paths) to import before creating the env manager, such as ding.envs.env_manager.base_env_manager.

Returns: - env_manager_cls (:obj:type): The corresponding env manager class.

Full Source Code

../ding/envs/env_manager/base_env_manager.py

1from types import MethodType 2from typing import Union, Any, List, Callable, Dict, Optional, Tuple 3from functools import partial, wraps 4from easydict import EasyDict 5from ditk import logging 6import copy 7import platform 8import numbers 9import enum 10import time 11import treetensor.numpy as tnp 12from ding.utils import ENV_MANAGER_REGISTRY, import_module, one_time_warning, make_key_as_identifier, WatchDog, \ 13 remove_illegal_item 14from ding.envs import BaseEnv, BaseEnvTimestep 15 16global space_log_flag 17space_log_flag = True 18 19 20class EnvState(enum.IntEnum): 21 VOID = 0 22 INIT = 1 23 RUN = 2 24 RESET = 3 25 DONE = 4 26 ERROR = 5 27 NEED_RESET = 6 28 29 30def timeout_wrapper(func: Callable = None, timeout: Optional[int] = None) -> Callable: 31 """ 32 Overview: 33 Watch the function that must be finihsed within a period of time. If timeout, raise the captured error. 34 """ 35 if func is None: 36 return partial(timeout_wrapper, timeout=timeout) 37 if timeout is None: 38 return func 39 40 windows_flag = platform.system().lower() == 'windows' 41 if windows_flag: 42 one_time_warning("Timeout wrapper is not implemented in windows platform, so ignore it default") 43 return func 44 45 @wraps(func) 46 def wrapper(*args, **kwargs): 47 watchdog = WatchDog(timeout) 48 try: 49 watchdog.start() 50 except ValueError as e: 51 # watchdog invalid case 52 return func(*args, **kwargs) 53 try: 54 return func(*args, **kwargs) 55 except BaseException as e: 56 raise e 57 finally: 58 watchdog.stop() 59 60 return wrapper 61 62 63@ENV_MANAGER_REGISTRY.register('base') 64class BaseEnvManager(object): 65 """ 66 Overview: 67 The basic class of env manager to manage multiple vectorized environments. BaseEnvManager define all the 68 necessary interfaces and derived class must extend this basic class. 69 70 The class is implemented by the pseudo-parallelism (i.e. serial) mechanism, therefore, this class is only 71 used in some tiny environments and for debug purpose. 
72 Interfaces: 73 reset, step, seed, close, enable_save_replay, launch, default_config, reward_shaping, enable_save_figure 74 Properties: 75 env_num, env_ref, ready_obs, ready_obs_id, ready_imgs, done, closed, method_name_list, observation_space, \ 76 action_space, reward_space 77 """ 78 79 @classmethod 80 def default_config(cls: type) -> EasyDict: 81 """ 82 Overview: 83 Return the deepcopyed default config of env manager. 84 Returns: 85 - cfg (:obj:`EasyDict`): The default config of env manager. 86 """ 87 cfg = EasyDict(copy.deepcopy(cls.config)) 88 cfg.cfg_type = cls.__name__ + 'Dict' 89 return cfg 90 91 config = dict( 92 # (int) The total episode number to be executed, defaults to inf, which means no episode limits. 93 episode_num=float("inf"), 94 # (int) The maximum retry times when the env is in error state, defaults to 1, i.e. no retry. 95 max_retry=1, 96 # (str) The retry type when the env is in error state, including ['reset', 'renew'], defaults to 'reset'. 97 # The former is to reset the env to the last reset state, while the latter is to create a new env. 98 retry_type='reset', 99 # (bool) Whether to automatically reset sub-environments when they are done, defaults to True. 100 auto_reset=True, 101 # (float) WatchDog timeout (second) for ``step`` method, defaults to None, which means no timeout. 102 step_timeout=None, 103 # (float) WatchDog timeout (second) for ``reset`` method, defaults to None, which means no timeout. 104 reset_timeout=None, 105 # (float) The interval waiting time for automatically retry mechanism, defaults to 0.1. 106 retry_waiting_time=0.1, 107 ) 108 109 def __init__( 110 self, 111 env_fn: List[Callable], 112 cfg: EasyDict = EasyDict({}), 113 ) -> None: 114 """ 115 Overview: 116 Initialize the base env manager with callable the env function and the EasyDict-type config. Here we use 117 ``env_fn`` to ensure the lazy initialization of sub-environments, which is benetificial to resource 118 allocation and parallelism. 
``cfg`` is the merged result between the default config of this class 119 and user's config. 120 This construction function is in lazy-initialization mode, the actual initialization is in ``launch``. 121 Arguments: 122 - env_fn (:obj:`List[Callable]`): A list of functions to create ``env_num`` sub-environments. 123 - cfg (:obj:`EasyDict`): Final merged config. 124 125 .. note:: 126 For more details about how to merge config, please refer to the system document of DI-engine \ 127 (`en link1 <../03_system/config.html>`_). 128 """ 129 self._cfg = cfg 130 self._env_fn = env_fn 131 self._env_num = len(self._env_fn) 132 self._closed = True 133 self._env_replay_path = None 134 # env_ref is used to acquire some common attributes of env, like obs_shape and act_shape 135 self._env_ref = self._env_fn[0]() 136 try: 137 self._observation_space = self._env_ref.observation_space 138 self._action_space = self._env_ref.action_space 139 self._reward_space = self._env_ref.reward_space 140 except: 141 # For some environment, 142 # we have to reset before getting observation description. 143 # However, for dmc-mujoco, we should not reset the env at the main thread, 144 # when using in a subprocess mode, which would cause opengl rendering bugs, 145 # leading to no response subprocesses. 
146 self._env_ref.reset() 147 self._observation_space = self._env_ref.observation_space 148 self._action_space = self._env_ref.action_space 149 self._reward_space = self._env_ref.reward_space 150 self._env_ref.close() 151 self._env_states = {i: EnvState.VOID for i in range(self._env_num)} 152 self._env_seed = {i: None for i in range(self._env_num)} 153 self._episode_num = self._cfg.episode_num 154 self._max_retry = max(self._cfg.max_retry, 1) 155 self._auto_reset = self._cfg.auto_reset 156 self._retry_type = self._cfg.retry_type 157 assert self._retry_type in ['reset', 'renew'], self._retry_type 158 self._step_timeout = self._cfg.step_timeout 159 self._reset_timeout = self._cfg.reset_timeout 160 self._retry_waiting_time = self._cfg.retry_waiting_time 161 162 @property 163 def env_num(self) -> int: 164 """ 165 Overview: 166 ``env_num`` is the number of sub-environments in env manager. 167 Returns: 168 - env_num (:obj:`int`): The number of sub-environments. 169 """ 170 return self._env_num 171 172 @property 173 def env_ref(self) -> 'BaseEnv': 174 """ 175 Overview: 176 ``env_ref`` is used to acquire some common attributes of env, like obs_shape and act_shape. 177 Returns: 178 - env_ref (:obj:`BaseEnv`): The reference of sub-environment. 179 """ 180 return self._env_ref 181 182 @property 183 def observation_space(self) -> 'gym.spaces.Space': # noqa 184 """ 185 Overview: 186 ``observation_space`` is the observation space of sub-environment, following the format of gym.spaces. 187 Returns: 188 - observation_space (:obj:`gym.spaces.Space`): The observation space of sub-environment. 189 """ 190 return self._observation_space 191 192 @property 193 def action_space(self) -> 'gym.spaces.Space': # noqa 194 """ 195 Overview: 196 ``action_space`` is the action space of sub-environment, following the format of gym.spaces. 197 Returns: 198 - action_space (:obj:`gym.spaces.Space`): The action space of sub-environment. 
199 """ 200 return self._action_space 201 202 @property 203 def reward_space(self) -> 'gym.spaces.Space': # noqa 204 """ 205 Overview: 206 ``reward_space`` is the reward space of sub-environment, following the format of gym.spaces. 207 Returns: 208 - reward_space (:obj:`gym.spaces.Space`): The reward space of sub-environment. 209 """ 210 return self._reward_space 211 212 @property 213 def ready_obs(self) -> Dict[int, Any]: 214 """ 215 Overview: 216 Get the ready (next) observation, which is a special design to unify both aysnc/sync env manager. 217 For each interaction between policy and env, the policy will input the ready_obs and output the action. 218 Then the env_manager will ``step`` with the action and prepare the next ready_obs. 219 Returns: 220 - ready_obs (:obj:`Dict[int, Any]`): A dict with env_id keys and observation values. 221 Example: 222 >>> obs = env_manager.ready_obs 223 >>> stacked_obs = np.concatenate(list(obs.values())) 224 >>> action = policy(obs) # here policy inputs np obs and outputs np action 225 >>> action = {env_id: a for env_id, a in zip(obs.keys(), action)} 226 >>> timesteps = env_manager.step(action) 227 """ 228 active_env = [i for i, s in self._env_states.items() if s == EnvState.RUN] 229 return {i: self._ready_obs[i] for i in active_env} 230 231 @property 232 def ready_obs_id(self) -> List[int]: 233 """ 234 Overview: 235 Get the ready (next) observation id, which is a special design to unify both aysnc/sync env manager. 236 Returns: 237 - ready_obs_id (:obj:`List[int]`): A list of env_ids for ready observations. 238 """ 239 # In BaseEnvManager, if env_episode_count equals episode_num, this env is done. 240 return [i for i, s in self._env_states.items() if s == EnvState.RUN] 241 242 @property 243 def ready_imgs(self, render_mode: Optional[str] = 'rgb_array') -> Dict[int, Any]: 244 """ 245 Overview: 246 Sometimes, we need to render the envs, this function is used to get the next ready renderd frame and \ 247 corresponding env id. 
248 Arguments: 249 - render_mode (:obj:`Optional[str]`): The render mode, can be 'rgb_array' or 'depth_array', which follows \ 250 the definition in the ``render`` function of ``ding.utils`` . 251 Returns: 252 - ready_imgs (:obj:`Dict[int, np.ndarray]`): A dict with env_id keys and rendered frames. 253 """ 254 from ding.utils import render 255 assert render_mode in ['rgb_array', 'depth_array'], render_mode 256 return {i: render(self._envs[i], render_mode) for i in self.ready_obs_id} 257 258 @property 259 def done(self) -> bool: 260 """ 261 Overview: 262 ``done`` is a flag to indicate whether env manager is done, i.e., whether all sub-environments have \ 263 executed enough episodes. 264 Returns: 265 - done (:obj:`bool`): Whether env manager is done. 266 """ 267 return all([s == EnvState.DONE for s in self._env_states.values()]) 268 269 @property 270 def method_name_list(self) -> list: 271 """ 272 Overview: 273 The public methods list of sub-environments that can be directly called from the env manager level. Other \ 274 methods and attributes will be accessed with the ``__getattr__`` method. 275 Methods defined in this list can be regarded as the vectorized extension of methods in sub-environments. 276 Sub-class of ``BaseEnvManager`` can override this method to add more methods. 277 Returns: 278 - method_name_list (:obj:`list`): The public methods list of sub-environments. 279 """ 280 return [ 281 'reset', 'step', 'seed', 'close', 'enable_save_replay', 'render', 'reward_shaping', 'enable_save_figure' 282 ] 283 284 def env_state_done(self, env_id: int) -> bool: 285 return self._env_states[env_id] == EnvState.DONE 286 287 def __getattr__(self, key: str) -> Any: 288 """ 289 Note: 290 If a python object doesn't have the attribute whose name is `key`, it will call this method. 291 We suppose that all envs have the same attributes. 292 If you need different envs, please implement other env managers. 
293 """ 294 if not hasattr(self._env_ref, key): 295 raise AttributeError("env `{}` doesn't have the attribute `{}`".format(type(self._env_ref), key)) 296 if isinstance(getattr(self._env_ref, key), MethodType) and key not in self.method_name_list: 297 raise RuntimeError("env getattr doesn't support method({}), please override method_name_list".format(key)) 298 self._check_closed() 299 return [getattr(env, key) if hasattr(env, key) else None for env in self._envs] 300 301 def _check_closed(self): 302 """ 303 Overview: 304 Check whether the env manager is closed. Will be called in ``__getattr__`` and ``step``. 305 """ 306 assert not self._closed, "env manager is closed, please use the alive env manager" 307 308 def launch(self, reset_param: Optional[Dict] = None) -> None: 309 """ 310 Overview: 311 Launch the env manager, instantiate the sub-environments and set up the environments and their parameters. 312 Arguments: 313 - reset_param (:obj:`Optional[Dict]`): A dict of reset parameters for each environment, key is the env_id, \ 314 value is the corresponding reset parameter, defaults to None. 
315 """ 316 assert self._closed, "Please first close the env manager" 317 try: 318 global space_log_flag 319 if space_log_flag: 320 logging.info("Env Space Information:") 321 logging.info("\tObservation Space: {}".format(self._observation_space)) 322 logging.info("\tAction Space: {}".format(self._action_space)) 323 logging.info("\tReward Space: {}".format(self._reward_space)) 324 space_log_flag = False 325 except: 326 pass 327 if reset_param is not None: 328 assert len(reset_param) == len(self._env_fn) 329 self._create_state() 330 self.reset(reset_param) 331 332 def _create_state(self) -> None: 333 self._env_episode_count = {i: 0 for i in range(self.env_num)} 334 self._ready_obs = {i: None for i in range(self.env_num)} 335 self._envs = [e() for e in self._env_fn] 336 assert len(self._envs) == self._env_num 337 self._reset_param = {i: {} for i in range(self.env_num)} 338 self._env_states = {i: EnvState.INIT for i in range(self.env_num)} 339 if self._env_replay_path is not None: 340 for e, s in zip(self._envs, self._env_replay_path): 341 e.enable_save_replay(s) 342 self._closed = False 343 344 def reset(self, reset_param: Optional[Dict] = None) -> None: 345 """ 346 Overview: 347 Forcely reset the sub-environments their corresponding parameters. Because in env manager all the \ 348 sub-environments usually are reset automatically as soon as they are done, this method is only called when \ 349 the caller must forcely reset all the sub-environments, such as in evaluation. 350 Arguments: 351 - reset_param (:obj:`List`): Dict of reset parameters for each environment, key is the env_id, \ 352 value is the corresponding reset parameters. 
353 """ 354 self._check_closed() 355 # set seed if necessary 356 env_ids = list(range(self._env_num)) if reset_param is None else list(reset_param.keys()) 357 for i, env_id in enumerate(env_ids): # loop-type is necessary 358 if self._env_seed[env_id] is not None: 359 if self._env_dynamic_seed is not None: 360 self._envs[env_id].seed(self._env_seed[env_id], self._env_dynamic_seed) 361 else: 362 self._envs[env_id].seed(self._env_seed[env_id]) 363 self._env_seed[env_id] = None # seed only use once 364 # reset env 365 if reset_param is None: 366 env_range = range(self.env_num) 367 else: 368 for env_id in reset_param: 369 self._reset_param[env_id] = reset_param[env_id] 370 env_range = reset_param.keys() 371 for env_id in env_range: 372 if self._env_replay_path is not None and self._env_states[env_id] == EnvState.RUN: 373 logging.warning("please don't reset a unfinished env when you enable save replay, we just skip it") 374 continue 375 self._reset(env_id) 376 377 def _reset(self, env_id: int) -> None: 378 379 @timeout_wrapper(timeout=self._reset_timeout) 380 def reset_fn(): 381 # if self._reset_param[env_id] is None, just reset specific env, not pass reset param 382 if self._reset_param[env_id] is not None: 383 assert isinstance(self._reset_param[env_id], dict), type(self._reset_param[env_id]) 384 return self._envs[env_id].reset(**self._reset_param[env_id]) 385 else: 386 return self._envs[env_id].reset() 387 388 exceptions = [] 389 for _ in range(self._max_retry): 390 try: 391 self._env_states[env_id] = EnvState.RESET 392 obs = reset_fn() 393 self._ready_obs[env_id] = obs 394 self._env_states[env_id] = EnvState.RUN 395 return 396 except BaseException as e: 397 if self._retry_type == 'renew': 398 err_env = self._envs[env_id] 399 err_env.close() 400 self._envs[env_id] = self._env_fn[env_id]() 401 exceptions.append(e) 402 time.sleep(self._retry_waiting_time) 403 continue 404 405 self._env_states[env_id] = EnvState.ERROR 406 self.close() 407 logging.error("Env {} reset has 
exceeded max retries({})".format(env_id, self._max_retry)) 408 runtime_error = RuntimeError( 409 "Env {} reset has exceeded max retries({}), and the latest exception is: {}".format( 410 env_id, self._max_retry, str(exceptions[-1]) 411 ) 412 ) 413 runtime_error.__traceback__ = exceptions[-1].__traceback__ 414 raise runtime_error 415 416 def step(self, actions: Dict[int, Any]) -> Dict[int, BaseEnvTimestep]: 417 """ 418 Overview: 419 Execute env step according to input actions. If some sub-environments are done after this execution, \ 420 they will be reset automatically when ``self._auto_reset`` is True, otherwise they need to be reset when \ 421 the caller use the ``reset`` method of env manager. 422 Arguments: 423 - actions (:obj:`Dict[int, Any]`): A dict of actions, key is the env_id, value is corresponding action. \ 424 action can be any type, it depends on the env, and the env will handle it. Ususlly, the action is \ 425 a dict of numpy array, and the value is generated by the outer caller like ``policy``. 426 Returns: 427 - timesteps (:obj:`Dict[int, BaseEnvTimestep]`): Each timestep is a ``BaseEnvTimestep`` object, \ 428 usually including observation, reward, done, info. Some special customized environments will have \ 429 the special timestep definition. The length of timesteps is the same as the length of actions in \ 430 synchronous env manager. 
431 Example: 432 >>> timesteps = env_manager.step(action) 433 >>> for env_id, timestep in enumerate(timesteps): 434 >>> if timestep.done: 435 >>> print('Env {} is done'.format(env_id)) 436 """ 437 self._check_closed() 438 timesteps = {} 439 for env_id, act in actions.items(): 440 timesteps[env_id] = self._step(env_id, act) 441 if timesteps[env_id].done: 442 self._env_episode_count[env_id] += 1 443 if self._env_episode_count[env_id] < self._episode_num: 444 if self._auto_reset: 445 self._reset(env_id) 446 else: 447 self._env_states[env_id] = EnvState.NEED_RESET 448 else: 449 self._env_states[env_id] = EnvState.DONE 450 else: 451 self._ready_obs[env_id] = timesteps[env_id].obs 452 return timesteps 453 454 def _step(self, env_id: int, act: Any) -> BaseEnvTimestep: 455 456 @timeout_wrapper(timeout=self._step_timeout) 457 def step_fn(): 458 return self._envs[env_id].step(act) 459 460 exceptions = [] 461 for _ in range(self._max_retry): 462 try: 463 return step_fn() 464 except BaseException as e: 465 exceptions.append(e) 466 self._env_states[env_id] = EnvState.ERROR 467 logging.error("Env {} step has exceeded max retries({})".format(env_id, self._max_retry)) 468 runtime_error = RuntimeError( 469 "Env {} step has exceeded max retries({}), and the latest exception is: {}".format( 470 env_id, self._max_retry, str(exceptions[-1]) 471 ) 472 ) 473 runtime_error.__traceback__ = exceptions[-1].__traceback__ 474 raise runtime_error 475 476 def seed(self, seed: Union[Dict[int, int], List[int], int], dynamic_seed: bool = None) -> None: 477 """ 478 Overview: 479 Set the random seed for each environment. 480 Arguments: 481 - seed (:obj:`Union[Dict[int, int], List[int], int]`): Dict or List of seeds for each environment; \ 482 If only one seed is provided, it will be used in the same way for all environments. 483 - dynamic_seed (:obj:`bool`): Whether to use dynamic seed. 484 485 .. 
note:: 486 For more details about ``dynamic_seed``, please refer to the best practice document of DI-engine \ 487 (`en link2 <../04_best_practice/random_seed.html>`_). 488 """ 489 if isinstance(seed, numbers.Integral): 490 seed = [seed + i for i in range(self.env_num)] 491 self._env_seed = seed 492 elif isinstance(seed, list): 493 assert len(seed) == self._env_num, "len(seed) {:d} != env_num {:d}".format(len(seed), self._env_num) 494 self._env_seed = seed 495 elif isinstance(seed, dict): 496 if not hasattr(self, '_env_seed'): 497 raise RuntimeError("please indicate all the seed of each env in the beginning") 498 for env_id, s in seed.items(): 499 self._env_seed[env_id] = s 500 else: 501 raise TypeError("invalid seed arguments type: {}".format(type(seed))) 502 self._env_dynamic_seed = dynamic_seed 503 try: 504 self._action_space.seed(seed[0]) 505 except Exception: # TODO(nyz) deal with nested action_space like SMAC 506 pass 507 508 def enable_save_replay(self, replay_path: Union[List[str], str]) -> None: 509 """ 510 Overview: 511 Enable all environments to save replay video after each episode terminates. 512 Arguments: 513 - replay_path (:obj:`Union[List[str], str]`): List of paths for each environment; \ 514 Or one path for all environments. 515 """ 516 if isinstance(replay_path, str): 517 replay_path = [replay_path] * self.env_num 518 self._env_replay_path = replay_path 519 520 def enable_save_figure(self, env_id: int, figure_path: str) -> None: 521 """ 522 Overview: 523 Enable a specific env to save figure (e.g. environment statistics or episode return curve). 524 Arguments: 525 - figure_path (:obj:`str`): The file directory path for all environments to save figures. 526 """ 527 assert figure_path is not None 528 self._envs[env_id].enable_save_figure(figure_path) 529 530 def close(self) -> None: 531 """ 532 Overview: 533 Close the env manager and release all the environment resources. 
        """
        if self._closed:
            return
        for env in self._envs:
            env.close()
        for i in range(self._env_num):
            self._env_states[i] = EnvState.VOID
        self._closed = True

    def reward_shaping(self, env_id: int, transitions: List[dict]) -> List[dict]:
        """
        Overview:
            Execute reward shaping for a specific environment, which is often called when an episode terminates.
        Arguments:
            - env_id (:obj:`int`): The id of the environment to be shaped.
            - transitions (:obj:`List[dict]`): The transition data list of the environment to be shaped.
        Returns:
            - transitions (:obj:`List[dict]`): The shaped transition data list.
        """
        return self._envs[env_id].reward_shaping(transitions)

    @property
    def closed(self) -> bool:
        """
        Overview:
            ``closed`` is a property that returns whether the env manager is closed.
        Returns:
            - closed (:obj:`bool`): Whether the env manager is closed.
        """
        return self._closed

    def random_action(self) -> Dict:
        return {env_id: self._env_ref.action_space.sample() for env_id in self.ready_obs_id}


@ENV_MANAGER_REGISTRY.register('base_v2')
class BaseEnvManagerV2(BaseEnvManager):
    """
    Overview:
        The basic class of env manager to manage multiple vectorized environments. ``BaseEnvManager`` defines all
        the necessary interfaces, and derived classes must extend this basic class.

        The class is implemented with a pseudo-parallel (i.e. serial) mechanism, therefore, this class is only
        used in some tiny environments and for debugging purposes.

        ``V2`` means this env manager is designed for the new task pipeline and interfaces coupled with treetensor.

    .. note::
        For more details about the new task pipeline, please refer to the system document of DI-engine \
        (`system en link3 <../03_system/index.html>`_).

    Interfaces:
        reset, step, seed, close, enable_save_replay, launch, default_config, reward_shaping, enable_save_figure
    Properties:
        env_num, env_ref, ready_obs, ready_obs_id, ready_imgs, done, closed, method_name_list, observation_space, \
        action_space, reward_space
    """

    @property
    def ready_obs(self) -> tnp.array:
        """
        Overview:
            Get the ready (next) observation, which is a special design to unify both async/sync env managers.
            For each interaction between policy and env, the policy will input the ready_obs and output the action.
            Then the env_manager will ``step`` with the action and prepare the next ready_obs.
            For the ``V2`` version, the observation is transformed and packed up into ``tnp.array`` type, which
            allows more convenient operations.
        Returns:
            - ready_obs (:obj:`tnp.array`): A stacked treenumpy-type observation data.
        Example:
            >>> obs = env_manager.ready_obs
            >>> action = policy(obs)  # here policy inputs treenp obs and outputs np action
            >>> timesteps = env_manager.step(action)
        """
        active_env = [i for i, s in self._env_states.items() if s == EnvState.RUN]
        obs = [self._ready_obs[i] for i in active_env]
        if isinstance(obs[0], dict):  # transform each element to treenumpy array
            obs = [tnp.array(o) for o in obs]
        return tnp.stack(obs)

    def step(self, actions: List[tnp.ndarray]) -> List[tnp.ndarray]:
        """
        Overview:
            Execute env step according to input actions. If some sub-environments are done after this execution, \
            they will be reset automatically by default.
        Arguments:
            - actions (:obj:`List[tnp.ndarray]`): A list of treenumpy-type actions, whose values are generated by \
                the outer caller like ``policy``.
        Returns:
            - timesteps (:obj:`List[tnp.ndarray]`): A list of timesteps. Each timestep is a ``tnp.ndarray`` object, \
                usually including observation, reward, done, info, env_id.
                Some special environments will have \
                their own special timestep definitions. The length of timesteps is the same as the length of \
                actions in the synchronous env manager. For compatibility with treenumpy, here we use the \
                ``make_key_as_identifier`` and ``remove_illegal_item`` functions to modify the original timestep.
        Example:
            >>> timesteps = env_manager.step(action)
            >>> for timestep in timesteps:
            >>>     if timestep.done:
            >>>         print('Env {} is done'.format(timestep.env_id))
        """
        actions = {env_id: a for env_id, a in zip(self.ready_obs_id, actions)}
        timesteps = super().step(actions)
        new_data = []
        for env_id, timestep in timesteps.items():
            obs, reward, done, info = timestep
            # make the type and content of each key a valid identifier, in order to access
            # them as attributes (e.g. timestep.xxx), such as ``TimeLimit.truncated`` in cartpole info
            info = make_key_as_identifier(info)
            info = remove_illegal_item(info)
            new_data.append(tnp.array({'obs': obs, 'reward': reward, 'done': done, 'info': info, 'env_id': env_id}))
        return new_data


def create_env_manager(manager_cfg: EasyDict, env_fn: List[Callable]) -> BaseEnvManager:
    """
    Overview:
        Create an env manager according to ``manager_cfg`` and env functions.
    Arguments:
        - manager_cfg (:obj:`EasyDict`): Final merged env manager config.
        - env_fn (:obj:`List[Callable]`): A list of functions to create ``env_num`` sub-environments.
    ArgumentsKeys:
        - type (:obj:`str`): Env manager type set in ``ENV_MANAGER_REGISTRY.register``, such as ``base``.
        - import_names (:obj:`List[str]`): A list of module names (paths) to import before creating the env \
            manager, such as ``ding.envs.env_manager.base_env_manager``.
    Returns:
        - env_manager (:obj:`BaseEnvManager`): The created env manager.

    .. tip::
        This method will not modify the ``manager_cfg``; it will deepcopy the ``manager_cfg`` and then modify it.
    """
    manager_cfg = copy.deepcopy(manager_cfg)
    if 'import_names' in manager_cfg:
        import_module(manager_cfg.pop('import_names'))
    manager_type = manager_cfg.pop('type')
    return ENV_MANAGER_REGISTRY.build(manager_type, env_fn=env_fn, cfg=manager_cfg)


def get_env_manager_cls(cfg: EasyDict) -> type:
    """
    Overview:
        Get the env manager class according to config, which is used to access related class variables/methods.
    Arguments:
        - cfg (:obj:`EasyDict`): Final merged env manager config.
    ArgumentsKeys:
        - type (:obj:`str`): Env manager type set in ``ENV_MANAGER_REGISTRY.register``, such as ``base``.
        - import_names (:obj:`List[str]`): A list of module names (paths) to import before creating the env \
            manager, such as ``ding.envs.env_manager.base_env_manager``.
    Returns:
        - env_manager_cls (:obj:`type`): The corresponding env manager class.
    """
    import_module(cfg.get('import_names', []))
    return ENV_MANAGER_REGISTRY.get(cfg.type)
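The retry pattern inside ``_step`` (try the step up to ``_max_retry`` times, then surface the last exception with its original traceback) can be sketched as a standalone helper. This is a minimal sketch, not DI-engine code: ``step_with_retry`` and ``flaky_step`` are hypothetical names, and the ``timeout_wrapper`` timeout handling is omitted.

```python
import logging


def step_with_retry(step_fn, max_retry=3):
    # Mirrors the retry loop in BaseEnvManager._step: collect each failure,
    # and once max_retry attempts are exhausted, raise a RuntimeError that
    # carries the last exception's message and traceback.
    exceptions = []
    for _ in range(max_retry):
        try:
            return step_fn()
        except BaseException as e:
            exceptions.append(e)
    logging.error("step has exceeded max retries(%d)", max_retry)
    runtime_error = RuntimeError(
        "step has exceeded max retries({}), and the latest exception is: {}".format(
            max_retry, str(exceptions[-1])
        )
    )
    runtime_error.__traceback__ = exceptions[-1].__traceback__
    raise runtime_error


# A flaky step function that fails once, then succeeds on the second call.
calls = {'n': 0}


def flaky_step():
    calls['n'] += 1
    if calls['n'] < 2:
        raise ValueError('transient failure')
    return 'obs'


result = step_with_retry(flaky_step)
```

Transient failures are retried transparently (here the second call succeeds), while a persistently failing step re-raises with the original traceback preserved, which is what lets the manager log a useful error before marking the sub-environment as ``EnvState.ERROR``.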