Skip to content

ding.envs.env_manager.gym_vector_env_manager

ding.envs.env_manager.gym_vector_env_manager

GymVectorEnvManager

Bases: BaseEnvManager

Overview

Create a GymVectorEnvManager to manage multiple environments. Each environment is run in its own subprocess.

Interfaces: seed, ready_obs, step, reset, close

__init__(env_fn, cfg)

.. note:: env_fn must create gym-type environment instances, which may differ from DI-engine environments.

close()

Overview

Release the environment resources. Since ``super().__init__`` is not called, there is no need to release BaseEnvManager's resources.

Full Source Code

../ding/envs/env_manager/gym_vector_env_manager.py

from typing import Any, Union, List, Tuple, Dict, Callable, Optional
from ditk import logging
import numpy as np
from easydict import EasyDict
from collections import namedtuple
import gym
from gym.vector.async_vector_env import AsyncVectorEnv

from ding.envs import BaseEnv, BaseEnvTimestep
from ding.torch_utils import to_ndarray, to_list
from ding.utils import PropagatingThread, LockContextType, LockContext, ENV_MANAGER_REGISTRY
from .base_env_manager import BaseEnvManager
from .base_env_manager import EnvState


def _version_tuple(version: str) -> Tuple[int, ...]:
    """
    Overview:
        Parse the leading numeric components of a version string into a tuple of ints
        so versions compare numerically. A plain string comparison is lexicographic and
        wrong: ``'0.9.0' >= '0.24.0'`` is True as strings but False as versions.
        Non-numeric suffixes (e.g. ``'1.0.0rc1'``) terminate the parse.
    """
    parts = []
    for piece in version.split('.'):
        if piece.isdigit():
            parts.append(int(piece))
        else:
            break
    return tuple(parts)


# Computed once at import time; used to pick the correct ``info`` layout in ``step``.
# gym>=0.24.0 returns a dict of per-key arrays, older gym returns a per-env list of dicts.
# https://github.com/openai/gym/pull/2773
_GYM_GE_0_24 = _version_tuple(gym.version.VERSION) >= (0, 24)


@ENV_MANAGER_REGISTRY.register('gym_vector')
class GymVectorEnvManager(BaseEnvManager):
    """
    Overview:
        Create a GymVectorEnvManager to manage multiple environments.
        Each environment is run by a respective subprocess.
    Interfaces:
        seed, ready_obs, step, reset, close
    """
    config = dict(shared_memory=False, episode_num=float("inf"))

    def __init__(self, env_fn: List[Callable], cfg: EasyDict) -> None:
        """
        Overview:
            Build the manager and eagerly spawn the gym ``AsyncVectorEnv`` subprocesses.
            Note that ``BaseEnvManager.__init__`` is deliberately NOT called.
        Arguments:
            - env_fn (:obj:`List[Callable]`): one factory per environment.
            - cfg (:obj:`EasyDict`): config; reads ``shared_memory`` and ``episode_num``.

        .. note::
            ``env_fn`` must create gym-type environment instances, which may differ from
            DI-engine environments.
        """
        self._cfg = cfg
        self._env_fn = env_fn
        self._env_num = len(self._env_fn)
        self._closed = True
        self._env_replay_path = None
        # env_ref is used to acquire some common attributes of env, like obs_shape and act_shape
        self._env_ref = self._env_fn[0]()
        self._env_states = {i: EnvState.VOID for i in range(self._env_num)}

        self._episode_num = self._cfg.episode_num
        self._env_episode_count = {i: 0 for i in range(self.env_num)}

        self._env_manager = AsyncVectorEnv(
            env_fns=self._env_fn,
            shared_memory=cfg.shared_memory,
        )
        self._env_states = {i: EnvState.INIT for i in range(self._env_num)}
        self._eval_episode_return = [0. for _ in range(self._env_num)]

    def reset(self, reset_param: Optional[Dict] = None) -> None:
        """
        Overview:
            Reset all sub-environments at once and cache the initial observations in
            ``self._ready_obs``. Per-env reset parameters are not supported by the gym
            vector API, so ``reset_param`` must be ``None``.
        """
        assert reset_param is None
        self._closed = False
        for env_id in range(self.env_num):
            self._env_states[env_id] = EnvState.RESET
        self._ready_obs = self._env_manager.reset()
        for env_id in range(self.env_num):
            self._env_states[env_id] = EnvState.RUN
        self._eval_episode_return = [0. for _ in range(self._env_num)]

    def step(self, actions: Dict[int, Any]) -> Dict[int, namedtuple]:
        """
        Overview:
            Step every sub-environment. Env ids missing from ``actions`` are filled with
            random actions (gym vector envs must step all envs together); only the envs
            the caller actually addressed appear in the returned dict.
        Arguments:
            - actions (:obj:`Dict[int, Any]`): env_id -> np.ndarray action.
        Returns:
            - timestep (:obj:`Dict[int, BaseEnvTimestep]`): results for the given env ids.
        Raises:
            - Exception: if actions are not ``np.ndarray``.
        """
        assert isinstance(actions, Dict), type(actions)

        # Copy so the caller's dict is not mutated when we fill in random actions below.
        actions = dict(actions)
        env_ids_given = list(actions.keys())
        for env_id in range(self.env_num):
            if env_id not in actions:
                actions[env_id] = self._env_ref.random_action()
        # actions should be sorted by keys, since the original implementation
        # of the step method in gym accepts list-type actions
        actions = dict(sorted(actions.items()))

        actions = list(actions.values())
        elem = actions[0]
        if not isinstance(elem, np.ndarray):
            raise Exception('DI-engine only accept np.ndarray-type action!')
        if elem.shape == (1, ):
            # Scalar actions (e.g. discrete) are unwrapped to plain Python numbers.
            actions = [v.item() for v in actions]

        timestep = self._env_manager.step(actions)
        timestep_collate_result = {}
        for i in range(self.env_num):
            if i in env_ids_given:
                # Fix the compatibility of API for both gym>=0.24.0 and gym<0.24.0
                # https://github.com/openai/gym/pull/2773
                # NOTE: comparison is done numerically (module-level constant), not as a
                # raw string, which would misorder e.g. '0.9.0' vs '0.24.0'.
                if _GYM_GE_0_24:
                    info = {k: v[i] for k, v in timestep[3].items()}
                else:
                    info = timestep[3][i]
                timestep_collate_result[i] = BaseEnvTimestep(
                    timestep[0][i], timestep[1][i], timestep[2][i], info
                )
                self._eval_episode_return[i] += timestep_collate_result[i].reward
                if timestep_collate_result[i].done:
                    timestep_collate_result[i].info['eval_episode_return'] = self._eval_episode_return[i]
                    self._eval_episode_return[i] = 0.
                    self._env_episode_count[i] += 1
                    if self._env_episode_count[i] >= self._episode_num:
                        self._env_states[i] = EnvState.DONE
                    else:
                        self._env_states[i] = EnvState.RESET
                        # AsyncVectorEnv can only reset all envs together, so wait until
                        # every env is pending reset before actually resetting.
                        if all(self._env_states[j] == EnvState.RESET for j in range(self.env_num)):
                            self.reset()
                else:
                    self._ready_obs[i] = timestep_collate_result[i].obs

        return timestep_collate_result

    @property
    def ready_obs(self) -> Dict[int, Any]:
        """
        Overview:
            Observations of envs that have not yet exhausted their episode budget.
        """
        return {
            i: self._ready_obs[i]
            for i in range(len(self._ready_obs)) if self._env_episode_count[i] < self._episode_num
        }

    def seed(self, seed: Union[Dict[int, int], List[int], int], dynamic_seed: bool = None) -> None:
        """
        Overview:
            Forward the seed to the gym vector env. ``dynamic_seed`` is accepted for
            interface compatibility but ignored — gym envs cannot reseed per episode.
        """
        self._env_manager.seed(seed)
        # TODO dynamic_seed
        logging.warning("gym env doesn't support dynamic_seed in different episode")

    def close(self) -> None:
        """
        Overview:
            Release the environment resources.
            Since not calling super.__init__, no need to release BaseEnvManager's resources.
        """
        if self._closed:
            return
        self._closed = True
        self._env_ref.close()
        self._env_manager.close()
        self._env_manager.close_extras(terminate=True)