
ding.reward_model.her_reward_model

HerRewardModel

Overview

Hindsight Experience Replay model.

.. note::
    - her_strategy (:obj:`str`): Type of strategy that HER uses; must be one of ['final', 'future', 'episode'].
    - her_replay_k (:obj:`int`): Number of new episodes generated from each original episode. (Not used in episodic HER.)
    - episode_size (:obj:`int`): How many episodes to sample in one iteration.
    - sample_per_episode (:obj:`int`): How many new samples are generated from each episode.

.. note::
    In HER, we require the episode trajectory in order to change the goals. However, episode lengths differ and may have high variance. As a result, we recommend using only some of the transitions from each complete episode by specifying ``episode_size`` and ``sample_per_episode`` in the config. In one iteration, ``batch_size`` is then ``episode_size * sample_per_episode``.
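As a concrete sketch of the relation above (the ``Config`` class below is a hypothetical stand-in for the attribute-accessible config dict that ``HerRewardModel`` expects; all field values are illustrative):

```python
# Hypothetical stand-in for an EasyDict-style config: keys are readable
# both as attributes and via .get(), which HerRewardModel relies on.
class Config(dict):
    __getattr__ = dict.__getitem__

cfg = Config(
    her_strategy='future',   # one of 'final', 'future', 'episode'
    her_replay_k=2,          # relabeled episodes per original episode
    episode_size=8,          # episodes sampled per iteration
    sample_per_episode=4,    # transitions drawn from each episode
)

# Effective training batch size for one iteration:
batch_size = cfg.episode_size * cfg.sample_per_episode
print(batch_size)  # 32
```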

estimate(episode, merge_func=None, split_func=None, goal_reward_func=None)

Overview

Get HER processed episodes from original episodes.

Arguments:
    - episode (:obj:`List[Dict[str, Any]]`): Episode list; each element is a transition.
    - merge_func (:obj:`Callable`): The merge function to use; defaults to None, in which case ``__her_default_merge_func`` is used.
    - split_func (:obj:`Callable`): The split function to use; defaults to None, in which case ``__her_default_split_func`` is used.
    - goal_reward_func (:obj:`Callable`): The goal reward function to use; defaults to None, in which case ``__her_default_goal_reward_func`` is used.

Returns:
    - new_episode (:obj:`List[Dict[str, Any]]`): The processed transitions.
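To make the relabeling concrete, here is a simplified, standalone sketch of the 'final' strategy only, using plain Python tuples instead of tensors. The helper functions and toy episode are hypothetical illustrations, not DI-engine APIs:

```python
def relabel_final(episode, split, merge, goal_reward):
    # HER 'final' strategy: replace every transition's desired goal with
    # the goal achieved at the episode's last step, then recompute rewards.
    _, _, final_goal = split(episode[-1]['next_obs'])
    new_episode = []
    for step in episode:
        obs, _, _ = split(step['obs'])
        next_obs, _, achieved = split(step['next_obs'])
        new_episode.append({
            'obs': merge(obs, final_goal),
            'next_obs': merge(next_obs, final_goal),
            'reward': goal_reward(achieved, final_goal),
        })
    return new_episode

# Toy 1-D setting: obs is (position, desired_goal); achieved goal == position.
split = lambda x: (x[0], x[1], x[0])
merge = lambda obs, goal: (obs, goal)
goal_reward = lambda a, d: 1.0 if a == d else 0.0

episode = [
    {'obs': (0, 5), 'next_obs': (1, 5)},
    {'obs': (1, 5), 'next_obs': (2, 5)},
]
relabeled = relabel_final(episode, split, merge, goal_reward)
# The final achieved position is 2, so the last transition is now rewarded.
print(relabeled[-1]['reward'])  # 1.0
```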

__her_default_merge_func(x, y) staticmethod

Overview

The function to merge obs in a HER timestep.

Arguments:
    - x (:obj:`Any`): One of the timestep obs to merge.
    - y (:obj:`Any`): The other timestep obs to merge.

Returns:
    - ret (:obj:`Any`): The merged obs.
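A minimal analogue of this default merge, shown with NumPy arrays for a dependency-light illustration (the source uses ``torch.cat`` with ``dim=0``):

```python
import numpy as np

def merge(x, y):
    # Concatenate obs and goal along the first axis, mirroring torch.cat(dim=0).
    return np.concatenate([x, y], axis=0)

print(merge(np.array([1.0, 2.0]), np.array([3.0])))  # [1. 2. 3.]
```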

__her_default_split_func(x) staticmethod

Overview

Split the input into obs, desired goal, and achieved goal.

Arguments:
    - x (:obj:`Any`): The input to split.

Returns:
    - obs (:obj:`torch.Tensor`): The original obs.
    - desired_goal (:obj:`torch.Tensor`): The goal the agent is asked to achieve.
    - achieved_goal (:obj:`torch.Tensor`): The goal actually achieved (by default, the obs itself).
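A minimal analogue of the default split, again with NumPy (the source uses ``torch.chunk``); the half-and-half layout of the input vector is the default assumption:

```python
import numpy as np

def split(x):
    # The input is assumed to be [obs | desired_goal], chunked in half;
    # by default the achieved goal is the obs itself.
    obs, desired_goal = np.split(x, 2)
    achieved_goal = obs
    return obs, desired_goal, achieved_goal

obs, desired, achieved = split(np.array([0.1, 0.2, 0.9, 1.0]))
print(obs.tolist())      # [0.1, 0.2]
print(desired.tolist())  # [0.9, 1.0]
```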

__her_default_goal_reward_func(achieved_goal, desired_goal) staticmethod

Overview

Get the corresponding goal reward according to whether the achieved_goal matches the desired_goal.

Arguments:
    - achieved_goal (:obj:`torch.Tensor`): The achieved goal.
    - desired_goal (:obj:`torch.Tensor`): The desired goal.

Returns:
    - goal_reward (:obj:`torch.Tensor`): The goal reward according to whether the achieved_goal matches the desired_goal.
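The default rule is a binary exact-match reward. A NumPy version for illustration (the source returns a one-element ``torch.FloatTensor``):

```python
import numpy as np

def goal_reward(achieved_goal, desired_goal):
    # 1 if the achieved goal matches the desired goal element-wise, else 0.
    return 1.0 if np.array_equal(achieved_goal, desired_goal) else 0.0

print(goal_reward(np.array([1.0, 2.0]), np.array([1.0, 2.0])))  # 1.0
print(goal_reward(np.array([1.0, 2.0]), np.array([1.0, 3.0])))  # 0.0
```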

Full Source Code

../ding/reward_model/her_reward_model.py

from typing import List, Dict, Any, Optional, Callable, Tuple
import copy
import numpy as np
import torch


class HerRewardModel:
    """
    Overview:
        Hindsight Experience Replay model.

    .. note::
        - her_strategy (:obj:`str`): Type of strategy that HER uses, should be in ['final', 'future', 'episode']
        - her_replay_k (:obj:`int`): Number of new episodes generated by an original episode. (Not used in episodic HER)
        - episode_size (:obj:`int`): Sample how many episodes in one iteration.
        - sample_per_episode (:obj:`int`): How many new samples are generated from an episode.

    .. note::
        In HER, we require episode trajectory to change the goals. However, episode lengths are different
        and may have high variance. As a result, we **recommend** that you only use some transitions in
        the complete episode by specifying ``episode_size`` and ``sample_per_episode`` in config.
        Therefore, in one iteration, ``batch_size`` would be ``episode_size`` * ``sample_per_episode``.
    """

    def __init__(
            self,
            cfg: dict,
            cuda: bool = False,
    ) -> None:
        self._cuda = cuda and torch.cuda.is_available()
        self._device = 'cuda' if self._cuda else 'cpu'
        self._her_strategy = cfg.her_strategy
        assert self._her_strategy in ['final', 'future', 'episode']
        # `her_replay_k` may not be used in episodic HER, so default set to 1.
        self._her_replay_k = cfg.get('her_replay_k', 1)
        self._episode_size = cfg.get('episode_size', None)
        self._sample_per_episode = cfg.get('sample_per_episode', None)

    def estimate(
            self,
            episode: List[Dict[str, Any]],
            merge_func: Optional[Callable] = None,
            split_func: Optional[Callable] = None,
            goal_reward_func: Optional[Callable] = None
    ) -> List[Dict[str, Any]]:
        """
        Overview:
            Get HER processed episodes from original episodes.
        Arguments:
            - episode (:obj:`List[Dict[str, Any]]`): Episode list, each element is a transition.
            - merge_func (:obj:`Callable`): The merge function to use, default set to None. If None, \
                then use ``__her_default_merge_func``
            - split_func (:obj:`Callable`): The split function to use, default set to None. If None, \
                then use ``__her_default_split_func``
            - goal_reward_func (:obj:`Callable`): The goal_reward function to use, default set to None. If None, \
                then use ``__her_default_goal_reward_func``
        Returns:
            - new_episode (:obj:`List[Dict[str, Any]]`): the processed transitions
        """
        if merge_func is None:
            merge_func = HerRewardModel.__her_default_merge_func
        if split_func is None:
            split_func = HerRewardModel.__her_default_split_func
        if goal_reward_func is None:
            goal_reward_func = HerRewardModel.__her_default_goal_reward_func
        new_episodes = [[] for _ in range(self._her_replay_k)]
        if self._sample_per_episode is None:
            # Use complete episode
            indices = range(len(episode))
        else:
            # Use some transitions in one episode
            indices = np.random.randint(0, len(episode), (self._sample_per_episode))
        for idx in indices:
            obs, _, _ = split_func(episode[idx]['obs'])
            next_obs, _, achieved_goal = split_func(episode[idx]['next_obs'])
            for k in range(self._her_replay_k):
                if self._her_strategy == 'final':
                    p_idx = -1
                elif self._her_strategy == 'episode':
                    p_idx = np.random.randint(0, len(episode))
                elif self._her_strategy == 'future':
                    p_idx = np.random.randint(idx, len(episode))
                _, _, new_desired_goal = split_func(episode[p_idx]['next_obs'])
                timestep = {
                    k: copy.deepcopy(v)
                    for k, v in episode[idx].items() if k not in ['obs', 'next_obs', 'reward']
                }
                timestep['obs'] = merge_func(obs, new_desired_goal)
                timestep['next_obs'] = merge_func(next_obs, new_desired_goal)
                timestep['reward'] = goal_reward_func(achieved_goal, new_desired_goal).to(self._device)
                new_episodes[k].append(timestep)
        return new_episodes

    @staticmethod
    def __her_default_merge_func(x: Any, y: Any) -> Any:
        r"""
        Overview:
            The function to merge obs in a HER timestep.
        Arguments:
            - x (:obj:`Any`): One of the timestep obs to merge.
            - y (:obj:`Any`): The other timestep obs to merge.
        Returns:
            - ret (:obj:`Any`): The merged obs.
        """
        # TODO(nyz) dict/list merge_func
        return torch.cat([x, y], dim=0)

    @staticmethod
    def __her_default_split_func(x: Any) -> Tuple[Any, Any, Any]:
        r"""
        Overview:
            Split the input into obs, desired goal, and achieved goal.
        Arguments:
            - x (:obj:`Any`): The input to split.
        Returns:
            - obs (:obj:`torch.Tensor`): The original obs.
            - desired_goal (:obj:`torch.Tensor`): The goal the agent is asked to achieve.
            - achieved_goal (:obj:`torch.Tensor`): The goal actually achieved (by default, the obs itself).
        """
        # TODO(nyz) dict/list split_func
        # achieved_goal = f(obs), default: f == identical function
        obs, desired_goal = torch.chunk(x, 2)
        achieved_goal = obs
        return obs, desired_goal, achieved_goal

    @staticmethod
    def __her_default_goal_reward_func(achieved_goal: torch.Tensor, desired_goal: torch.Tensor) -> torch.Tensor:
        r"""
        Overview:
            Get the corresponding goal reward according to whether the achieved_goal matches the desired_goal.
        Arguments:
            - achieved_goal (:obj:`torch.Tensor`): The achieved goal.
            - desired_goal (:obj:`torch.Tensor`): The desired goal.
        Returns:
            - goal_reward (:obj:`torch.Tensor`): The goal reward according to \
                whether the achieved_goal matches the desired_goal.
        """
        if (achieved_goal == desired_goal).all():
            return torch.FloatTensor([1])
        else:
            return torch.FloatTensor([0])

    @property
    def episode_size(self) -> int:
        return self._episode_size

    @property
    def sample_per_episode(self) -> int:
        return self._sample_per_episode