Skip to content

ding.utils.autolog.data

ding.utils.autolog.data

RangedData

Overview

A data structure that can store data for a period of time.

Interfaces: __init__, append, extend, current, history, expire, __bool__, _get_time. Properties: - expire (:obj:float): The expire time.

expire property

Overview

Get the expire time.

__init__(expire, use_pickle=False)

Overview

Initialize the RangedData object.

Arguments: - expire (:obj:float): The expire time of the data. - use_pickle (:obj:bool): Whether to use pickle to serialize the data.

__check_expire()

Overview

Check the expire time.

__registry_data_item(data)

Overview

Registry the data item.

Arguments: - data (:obj:_Tp): The data item.

__get_data_item(data_id)

Overview

Get the data item.

Arguments: - data_id (:obj:int): The data id.

__remove_data_item(data_id)

Overview

Remove the data item.

Arguments: - data_id (:obj:int): The data id.

__check_time(time_)

Overview

Check the time.

Arguments: - time_ (:obj:float): The time.

__append_item(time_, data)

Overview

Append the data item.

Arguments: - time_ (:obj:float): The time. - data (:obj:_Tp): The data item.

__flush_history()

Overview

Flush the history data.

__append(time_, data)

Overview

Append the data.

__current()

Overview

Get the current data.

__history_yield()

Overview

Yield the history data.

__history()

Overview

Get the history data.

append(data)

Overview

Append the data.

extend(iter_)

Overview

Extend the data.

current()

Overview

Get the current data.

history()

Overview

Get the history data.

__bool__()

Overview

Check whether the range is empty.

TimeRangedData

Bases: RangedData

Overview

A data structure that can store data for a period of time.

Interfaces: __init__, _get_time, append, extend, current, history, expire, __bool__. Properties: - time (:obj:BaseTime): The time. - expire (:obj:float): The expire time.

time property

Overview

Get the time.

__init__(time_, expire)

Overview

Initialize the TimeRangedData object.

Arguments: - time_ (:obj:BaseTime): The time. - expire (:obj:float): The expire time.

Full Source Code

../ding/utils/autolog/data.py

1import pickle 2from abc import abstractmethod, ABCMeta 3from collections import deque 4from threading import Lock 5from typing import TypeVar, Iterable, List, Tuple, Union 6 7from .time_ctl import BaseTime 8 9_Tp = TypeVar('_Tp') 10 11 12class RangedData(metaclass=ABCMeta): 13 """ 14 Overview: 15 A data structure that can store data for a period of time. 16 Interfaces: 17 ``__init__``, ``append``, ``extend``, ``current``, ``history``, ``expire``, ``__bool__``, ``_get_time``. 18 Properties: 19 - expire (:obj:`float`): The expire time. 20 """ 21 22 def __init__(self, expire: float, use_pickle: bool = False): 23 """ 24 Overview: 25 Initialize the RangedData object. 26 Arguments: 27 - expire (:obj:`float`): The expire time of the data. 28 - use_pickle (:obj:`bool`): Whether to use pickle to serialize the data. 29 """ 30 31 self.__expire = expire 32 self.__use_pickle = use_pickle 33 self.__check_expire() 34 35 self.__data_max_id = 0 36 self.__data_items = {} 37 self.__data_lock = Lock() 38 39 self.__last_item = None 40 self.__queue = deque() 41 self.__lock = Lock() 42 43 def __check_expire(self): 44 """ 45 Overview: 46 Check the expire time. 47 """ 48 49 if isinstance(self.__expire, (int, float)): 50 if self.__expire <= 0: 51 raise ValueError( 52 "Expire should be greater than 0, but {actual} found.".format(actual=repr(self.__expire)) 53 ) 54 else: 55 raise TypeError( 56 'Expire should be int or float, but {actual} found.'.format(actual=type(self.__expire).__name__) 57 ) 58 59 def __registry_data_item(self, data: _Tp) -> int: 60 """ 61 Overview: 62 Registry the data item. 63 Arguments: 64 - data (:obj:`_Tp`): The data item. 65 """ 66 67 with self.__data_lock: 68 self.__data_max_id += 1 69 if self.__use_pickle: 70 self.__data_items[self.__data_max_id] = pickle.dumps(data) 71 else: 72 self.__data_items[self.__data_max_id] = data 73 74 return self.__data_max_id 75 76 def __get_data_item(self, data_id: int) -> _Tp: 77 """ 78 Overview: 79 Get the data item. 80 Arguments: 81 - data_id (:obj:`int`): The data id. 82 """ 83 84 with self.__data_lock: 85 if self.__use_pickle: 86 return pickle.loads(self.__data_items[data_id]) 87 else: 88 return self.__data_items[data_id] 89 90 def __remove_data_item(self, data_id: int): 91 """ 92 Overview: 93 Remove the data item. 94 Arguments: 95 - data_id (:obj:`int`): The data id. 96 """ 97 98 with self.__data_lock: 99 del self.__data_items[data_id] 100 101 def __check_time(self, time_: float): 102 """ 103 Overview: 104 Check the time. 105 Arguments: 106 - time_ (:obj:`float`): The time. 107 """ 108 109 if self.__queue: 110 _time, _ = self.__queue[-1] 111 if time_ < _time: 112 raise ValueError( 113 "Time {time} invalid for descending from last time {last_time}".format( 114 time=repr(time_), last_time=repr(_time) 115 ) 116 ) 117 118 def __append_item(self, time_: float, data: _Tp): 119 """ 120 Overview: 121 Append the data item. 122 Arguments: 123 - time_ (:obj:`float`): The time. 124 - data (:obj:`_Tp`): The data item. 125 """ 126 127 self.__queue.append((time_, self.__registry_data_item(data))) 128 129 def __flush_history(self): 130 """ 131 Overview: 132 Flush the history data. 133 """ 134 135 _time = self._get_time() 136 _limit_time = _time - self.__expire 137 while self.__queue: 138 _head_time, _head_id = self.__queue.popleft() 139 if _head_time >= _limit_time: 140 self.__queue.appendleft((_head_time, _head_id)) 141 break 142 else: 143 if self.__last_item: 144 _last_time, _last_id = self.__last_item 145 self.__remove_data_item(_last_id) 146 147 self.__last_item = (_head_time, _head_id) 148 149 def __append(self, time_: float, data: _Tp): 150 """ 151 Overview: 152 Append the data. 153 """ 154 155 self.__check_time(time_) 156 self.__append_item(time_, data) 157 self.__flush_history() 158 159 def __current(self): 160 """ 161 Overview: 162 Get the current data. 163 """ 164 165 if self.__queue: 166 _tail_time, _tail_id = self.__queue.pop() 167 self.__queue.append((_tail_time, _tail_id)) 168 return self.__get_data_item(_tail_id) 169 elif self.__last_item: 170 _last_time, _last_id = self.__last_item 171 return self.__get_data_item(_last_id) 172 else: 173 raise ValueError("This range is empty.") 174 175 def __history_yield(self): 176 """ 177 Overview: 178 Yield the history data. 179 """ 180 181 _time = self._get_time() 182 _limit_time = _time - self.__expire 183 _latest_time, _latest_id = None, None 184 185 if self.__last_item: 186 _latest_time, _latest_id = _last_time, _last_id = self.__last_item 187 yield max(_last_time, _limit_time), self.__get_data_item(_last_id) 188 189 for _item_time, _item_id in self.__queue: 190 _latest_time, _latest_id = _item_time, _item_id 191 yield _item_time, self.__get_data_item(_item_id) 192 193 if _latest_time is not None and _latest_time < _time: 194 yield _time, self.__get_data_item(_latest_id) 195 196 def __history(self): 197 """ 198 Overview: 199 Get the history data. 200 """ 201 202 return list(self.__history_yield()) 203 204 def append(self, data: _Tp): 205 """ 206 Overview: 207 Append the data. 208 """ 209 210 with self.__lock: 211 self.__flush_history() 212 _time = self._get_time() 213 self.__append(_time, data) 214 return self 215 216 def extend(self, iter_: Iterable[_Tp]): 217 """ 218 Overview: 219 Extend the data. 220 """ 221 222 with self.__lock: 223 self.__flush_history() 224 _time = self._get_time() 225 for item in iter_: 226 self.__append(_time, item) 227 return self 228 229 def current(self) -> _Tp: 230 """ 231 Overview: 232 Get the current data. 233 """ 234 235 with self.__lock: 236 self.__flush_history() 237 return self.__current() 238 239 def history(self) -> List[Tuple[Union[int, float], _Tp]]: 240 """ 241 Overview: 242 Get the history data. 243 """ 244 245 with self.__lock: 246 self.__flush_history() 247 return self.__history() 248 249 @property 250 def expire(self) -> float: 251 """ 252 Overview: 253 Get the expire time. 254 """ 255 256 with self.__lock: 257 self.__flush_history() 258 return self.__expire 259 260 def __bool__(self): 261 """ 262 Overview: 263 Check whether the range is empty. 264 """ 265 266 with self.__lock: 267 self.__flush_history() 268 return not not (self.__queue or self.__last_item) 269 270 @abstractmethod 271 def _get_time(self) -> float: 272 """ 273 Overview: 274 Get the current time. 275 """ 276 277 raise NotImplementedError 278 279 280class TimeRangedData(RangedData): 281 """ 282 Overview: 283 A data structure that can store data for a period of time. 284 Interfaces: 285 ``__init__``, ``_get_time``, ``append``, ``extend``, ``current``, ``history``, ``expire``, ``__bool__``. 286 Properties: 287 - time (:obj:`BaseTime`): The time. 288 - expire (:obj:`float`): The expire time. 289 """ 290 291 def __init__(self, time_: BaseTime, expire: float): 292 """ 293 Overview: 294 Initialize the TimeRangedData object. 295 Arguments: 296 - time_ (:obj:`BaseTime`): The time. 297 - expire (:obj:`float`): The expire time. 298 """ 299 300 RangedData.__init__(self, expire) 301 self.__time = time_ 302 303 def _get_time(self) -> float: 304 """ 305 Overview: 306 Get the current time. 307 """ 308 309 return self.__time.time() 310 311 @property 312 def time(self): 313 """ 314 Overview: 315 Get the time. 316 """ 317 318 return self.__time