
ding.worker.replay_buffer.naive_buffer

NaiveReplayBuffer

Bases: IBuffer

Overview

Naive replay buffer that can store and sample data. A naive implementation of a replay buffer with no priority or other advanced features. This buffer supports multi-thread/multi-process use and guarantees thread safety, which means that methods such as sample, push, and clear are mutually exclusive.

Interface: start, close, push, update, sample, clear, count, state_dict, load_state_dict, default_config

Property: replay_buffer_size, push_count

__init__(cfg, tb_logger=None, exp_name='default_experiment', instance_name='buffer')

Overview

Initialize the buffer

Arguments:

- cfg (dict): Config dict.
- tb_logger (Optional['SummaryWriter']): Outer tb logger. Usually given in serial mode.
- exp_name (Optional[str]): Name of this experiment.
- instance_name (Optional[str]): Name of this instance.

start()

Overview

Start the buffer's used_data_remover thread if track_used_data is enabled.

close()

Overview

Clear the buffer and join the buffer's used_data_remover thread if track_used_data is enabled.

push(data, cur_collector_envstep)

Overview

Push data into the buffer.

Arguments:

- data (Union[List[Any], Any]): The data which will be pushed into the buffer. Can be one item (of Any type) or many (of List[Any] type).
- cur_collector_envstep (int): Collector's current env step. Not used in the naive buffer, but preserved for compatibility.
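The buffer stores data in a fixed-size circular queue: once full, every push overwrites the oldest slot. This can be sketched with a minimal stand-alone class (a hypothetical `MiniRingBuffer`, not the DI-engine class itself; the real buffer adds locking, optional deepcopy, logging, and used-data tracking):

```python
# Minimal sketch of the circular-queue insert behind ``push``.
class MiniRingBuffer:

    def __init__(self, size):
        self.size = size
        self.data = [None] * size  # circular queue, like NaiveReplayBuffer._data
        self.tail = 0              # next insert position
        self.valid_count = 0       # how many slots currently hold valid data
        self.push_count = 0        # total items ever pushed (>= valid_count)

    def push(self, item):
        if self.data[self.tail] is None:
            self.valid_count += 1  # filling an empty slot
        # else: an old item is silently overwritten once the buffer wraps
        self.data[self.tail] = item
        self.tail = (self.tail + 1) % self.size
        self.push_count += 1


buf = MiniRingBuffer(3)
for i in range(5):
    buf.push(i)
print(buf.data, buf.valid_count, buf.push_count)  # [3, 4, 2] 3 5
```

Note that `valid_count` saturates at the buffer size while `push_count` keeps growing, matching the documented invariant that `push_count` is never less than `valid_count`.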

sample(size, cur_learner_iter, sample_range=None, replace=False)

Overview

Sample data with length size.

Arguments:

- size (int): The number of data items that will be sampled.
- cur_learner_iter (int): Learner's current iteration. Not used in the naive buffer, but preserved for compatibility.
- sample_range (slice): Buffer slice for sampling, such as slice(-10, None), which means only sample among the last 10 data items.
- replace (bool): Whether to sample with replacement.

Returns:

- sample_data (list): A list of data with length size.
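The effect of `sample_range` is to narrow the pool of candidate indices before drawing. A hedged stand-alone sketch (the hypothetical `get_indices` below mirrors the idea of `_get_indices`, using the stdlib `random` module instead of numpy):

```python
import random

# Sketch of how ``sample_range`` narrows the index pool before drawing.
def get_indices(valid_count, size, sample_range=None, replace=False, seed=0):
    rng = random.Random(seed)
    pool = list(range(valid_count))
    if sample_range is not None:
        pool = pool[sample_range]      # e.g. slice(-10, None) -> last 10 indices
    if replace:
        return [rng.choice(pool) for _ in range(size)]
    return rng.sample(pool, size)      # raises ValueError if size > len(pool)


# Only the last 3 of 100 stored items are eligible here.
idx = get_indices(100, 2, sample_range=slice(-3, None))
print(sorted(idx))
```

Without replacement, sampling fails when `size` exceeds the eligible pool, which is why the real buffer performs a `_sample_check` first and returns `None` instead of raising.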

update(info)

Overview

Naive Buffer does not need to update any info, but this method is preserved for compatibility.

clear()

Overview

Clear all the data and reset the related variables.

__del__()

Overview

Call close to delete the object.

count()

Overview

Count how many valid data items there are in the buffer.

Returns:

- count (int): Number of valid data items.

state_dict()

Overview

Provide a state dict to keep a record of the current buffer.

Returns:

- state_dict (Dict[str, Any]): A dict containing all important values in the buffer. With this dict, one can easily reproduce the buffer.

load_state_dict(_state_dict)

Overview

Load state dict to reproduce the buffer.

Arguments:

- _state_dict (Dict[str, Any]): A dict containing all important values in the buffer.
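The state dict round trip can be illustrated with a minimal stand-in (a hypothetical `MiniBuffer` mirroring the four keys the real buffer serializes: `data`, `tail`, `valid_count`, `push_count`):

```python
# Sketch of the state_dict / load_state_dict round trip.
class MiniBuffer:

    def __init__(self, size):
        self._data = [None] * size
        self._tail = 0
        self._valid_count = 0
        self._push_count = 0

    def state_dict(self):
        # Same four keys as NaiveReplayBuffer.state_dict.
        return {
            'data': list(self._data),
            'tail': self._tail,
            'valid_count': self._valid_count,
            'push_count': self._push_count,
        }

    def load_state_dict(self, sd):
        # As in the real buffer: restore every saved value as a ``_``-prefixed attribute.
        for k, v in sd.items():
            setattr(self, '_{}'.format(k), v)


src = MiniBuffer(4)
src._data[0], src._tail, src._valid_count, src._push_count = 'x', 1, 1, 1
dst = MiniBuffer(4)
dst.load_state_dict(src.state_dict())
print(dst.state_dict() == src.state_dict())  # True
```

Note the real `load_state_dict` has a second path: if the dict contains only the `data` key, the items are re-pushed through `_extend` instead of restored attribute-by-attribute.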

ElasticReplayBuffer

Bases: NaiveReplayBuffer

Overview

Elastic replay buffer, which stores data and supports dynamically changing the buffer size. A naive implementation of a replay buffer with no priority or other advanced features. This buffer supports multi-thread/multi-process use and guarantees thread safety, which means that methods such as sample, push, and clear are mutually exclusive.

Interface: start, close, push, update, sample, clear, count, state_dict, load_state_dict, default_config

Property: replay_buffer_size, push_count

__init__(cfg, tb_logger=None, exp_name='default_experiment', instance_name='buffer')

Overview

Initialize the buffer

Arguments:

- cfg (dict): Config dict.
- tb_logger (Optional['SummaryWriter']): Outer tb logger. Usually given in serial mode.
- exp_name (Optional[str]): Name of this experiment.
- instance_name (Optional[str]): Name of this instance.
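As the source shows, the elastic buffer additionally reads `cfg.set_buffer_size`, a callable mapping the collector's env step to the number of samples currently eligible for sampling; `update(envstep)` re-evaluates it. A sketch with a hypothetical growth schedule (the schedule itself is an assumption for illustration):

```python
# Hypothetical set_buffer_size schedule: grow the usable sampling window
# from 1000 items toward a cap of 10000 as the collector's envstep advances.
def set_buffer_size(envstep):
    return min(10000, 1000 + envstep // 10)


# Mirrors ElasticReplayBuffer.__init__ / update:
current_buffer_size = set_buffer_size(0)       # window at the 0-th envstep
print(current_buffer_size)                     # 1000
current_buffer_size = set_buffer_size(50000)   # after update(envstep=50000)
print(current_buffer_size)                     # 6000
```

Sampling then checks against `min(valid_count, current_buffer_size)`, so early in training only the most recent slice of the buffer is used even if more data has been stored.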

SequenceReplayBuffer

Bases: NaiveReplayBuffer

Overview

Sequence replay buffer, which samples fixed-length sequences of consecutive data items.

Interface: start, close, push, update, sample, clear, count, state_dict, load_state_dict, default_config

Property: replay_buffer_size, push_count

sample(batch, sequence, cur_learner_iter, sample_range=None, replace=False)

Overview

Sample batch sequences of data, each of length sequence.

Arguments:

- batch (int): The number of sequences that will be sampled.
- sequence (int): The length of each sampled sequence.
- cur_learner_iter (int): Learner's current iteration. Not used in the naive buffer, but preserved for compatibility.
- sample_range (slice): Buffer slice for sampling, such as slice(-10, None), which means only sample among the last 10 data items.
- replace (bool): Whether to sample with replacement.

Returns:

- sample_data (list): A list of sampled sequences with length batch.
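The source draws each start index so that a window of `sequence` consecutive items fits inside one fixed-length episode block (500 steps in this implementation). A hedged stand-alone sketch of that index-picking logic (hypothetical function and names; the real code uses numpy and handles a partially filled last episode):

```python
import random

EPISODE_LEN = 500  # episode block length assumed by SequenceReplayBuffer

# Sketch: draw start indices so that each window of ``sequence`` consecutive
# items stays inside a single episode block.
def sample_sequence_starts(valid_count, batch, sequence, seed=0):
    rng = random.Random(seed)
    episodes = valid_count // EPISODE_LEN
    starts = []
    while len(starts) < batch:
        ep = rng.randrange(episodes)
        available = EPISODE_LEN - sequence  # last valid start offset in the episode
        starts.append(rng.randint(ep * EPISODE_LEN, ep * EPISODE_LEN + available))
    return starts


starts = sample_sequence_starts(valid_count=2000, batch=4, sequence=16)
# Every window [s, s + sequence) stays within one episode block:
print(all(s // EPISODE_LEN == (s + 16 - 1) // EPISODE_LEN for s in starts))  # True
```

`_sample_with_indices` then slices `self._data[idx:idx + sequence]` for each start, so a sample is a list of `batch` sequences rather than `batch` individual items.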

Full Source Code

../ding/worker/replay_buffer/naive_buffer.py

import os
import copy
from typing import Union, Any, Optional, List
import numpy as np
import math
import hickle
from easydict import EasyDict

from ding.worker.replay_buffer import IBuffer
from ding.utils import LockContext, LockContextType, BUFFER_REGISTRY, build_logger
from .utils import UsedDataRemover, PeriodicThruputMonitor


@BUFFER_REGISTRY.register('naive')
class NaiveReplayBuffer(IBuffer):
    r"""
    Overview:
        Naive replay buffer, can store and sample data.
        A naive implementation of replay buffer with no priority or any other advanced features.
        This buffer supports multi-thread/multi-process access and guarantees thread safety, which means that
        methods like ``sample``, ``push``, ``clear`` are all mutually exclusive.
    Interface:
        start, close, push, update, sample, clear, count, state_dict, load_state_dict, default_config
    Property:
        replay_buffer_size, push_count
    """

    config = dict(
        type='naive',
        replay_buffer_size=10000,
        deepcopy=False,
        # default `False` for serial pipeline
        enable_track_used_data=False,
        periodic_thruput_seconds=60,
    )

    def __init__(
            self,
            cfg: 'EasyDict',  # noqa
            tb_logger: Optional['SummaryWriter'] = None,  # noqa
            exp_name: Optional[str] = 'default_experiment',
            instance_name: Optional[str] = 'buffer',
    ) -> None:
        """
        Overview:
            Initialize the buffer
        Arguments:
            - cfg (:obj:`dict`): Config dict.
            - tb_logger (:obj:`Optional['SummaryWriter']`): Outer tb logger. Usually given in serial mode.
            - exp_name (:obj:`Optional[str]`): Name of this experiment.
            - instance_name (:obj:`Optional[str]`): Name of this instance.
        """
        self._exp_name = exp_name
        self._instance_name = instance_name
        self._cfg = cfg
        self._replay_buffer_size = self._cfg.replay_buffer_size
        self._deepcopy = self._cfg.deepcopy
        # ``_data`` is a circular queue to store data (full data or meta data)
        self._data = [None for _ in range(self._replay_buffer_size)]
        # Current valid data count, indicating how many elements in ``self._data`` are valid.
        self._valid_count = 0
        # How many pieces of data have been pushed into this buffer, should be no less than ``_valid_count``.
        self._push_count = 0
        # Point to the tail position where next data can be inserted, i.e. latest inserted data's next position.
        self._tail = 0
        # Lock to guarantee thread safety
        self._lock = LockContext(lock_type=LockContextType.THREAD_LOCK)
        self._end_flag = False
        self._enable_track_used_data = self._cfg.enable_track_used_data
        if self._enable_track_used_data:
            self._used_data_remover = UsedDataRemover()
        if tb_logger is not None:
            self._logger, _ = build_logger(
                './{}/log/{}'.format(self._exp_name, self._instance_name), self._instance_name, need_tb=False
            )
            self._tb_logger = tb_logger
        else:
            self._logger, self._tb_logger = build_logger(
                './{}/log/{}'.format(self._exp_name, self._instance_name),
                self._instance_name,
            )
        # Periodic throughput. By default, the monitor range is 60 seconds; you can modify it freely.
        self._periodic_thruput_monitor = PeriodicThruputMonitor(
            self._instance_name, EasyDict(seconds=self._cfg.periodic_thruput_seconds), self._logger, self._tb_logger
        )

    def start(self) -> None:
        """
        Overview:
            Start the buffer's used_data_remover thread if track_used_data is enabled.
        """
        if self._enable_track_used_data:
            self._used_data_remover.start()

    def close(self) -> None:
        """
        Overview:
            Clear the buffer; join the buffer's used_data_remover thread if track_used_data is enabled.
        """
        self.clear()
        if self._enable_track_used_data:
            self._used_data_remover.close()
        self._tb_logger.flush()
        self._tb_logger.close()

    def push(self, data: Union[List[Any], Any], cur_collector_envstep: int) -> None:
        r"""
        Overview:
            Push data into the buffer.
        Arguments:
            - data (:obj:`Union[List[Any], Any]`): The data which will be pushed into the buffer. Can be one \
                item (of ``Any`` type) or many (of ``List[Any]`` type).
            - cur_collector_envstep (:obj:`int`): Collector's current env step. \
                Not used in the naive buffer, but preserved for compatibility.
        """
        if isinstance(data, list):
            self._extend(data, cur_collector_envstep)
            self._periodic_thruput_monitor.push_data_count += len(data)
        else:
            self._append(data, cur_collector_envstep)
            self._periodic_thruput_monitor.push_data_count += 1

    def sample(self,
               size: int,
               cur_learner_iter: int,
               sample_range: slice = None,
               replace: bool = False) -> Optional[list]:
        """
        Overview:
            Sample data with length ``size``.
        Arguments:
            - size (:obj:`int`): The number of the data items that will be sampled.
            - cur_learner_iter (:obj:`int`): Learner's current iteration. \
                Not used in the naive buffer, but preserved for compatibility.
            - sample_range (:obj:`slice`): Buffer slice for sampling, such as ``slice(-10, None)``, which \
                means only sample among the last 10 data items.
            - replace (:obj:`bool`): Whether to sample with replacement.
        Returns:
            - sample_data (:obj:`list`): A list of data with length ``size``.
        """
        if size == 0:
            return []
        can_sample = self._sample_check(size, replace)
        if not can_sample:
            return None
        with self._lock:
            indices = self._get_indices(size, sample_range, replace)
            sample_data = self._sample_with_indices(indices, cur_learner_iter)
        self._periodic_thruput_monitor.sample_data_count += len(sample_data)
        return sample_data

    def save_data(self, file_name: str):
        if not os.path.exists(os.path.dirname(file_name)):
            if os.path.dirname(file_name) != "":
                os.makedirs(os.path.dirname(file_name))
        hickle.dump(py_obj=self._data, file_obj=file_name)

    def load_data(self, file_name: str):
        self.push(hickle.load(file_name), 0)

    def _append(self, ori_data: Any, cur_collector_envstep: int = -1) -> None:
        r"""
        Overview:
            Append a data item into ``self._data``.
        Arguments:
            - ori_data (:obj:`Any`): The data which will be inserted.
            - cur_collector_envstep (:obj:`int`): Not used in this method, but preserved for compatibility.
        """
        with self._lock:
            if self._deepcopy:
                data = copy.deepcopy(ori_data)
            else:
                data = ori_data
            self._push_count += 1
            if self._data[self._tail] is None:
                self._valid_count += 1
                self._periodic_thruput_monitor.valid_count = self._valid_count
            elif self._enable_track_used_data:
                self._used_data_remover.add_used_data(self._data[self._tail])
            self._data[self._tail] = data
            self._tail = (self._tail + 1) % self._replay_buffer_size

    def _extend(self, ori_data: List[Any], cur_collector_envstep: int = -1) -> None:
        r"""
        Overview:
            Extend a data list into the queue.
            You can refer to ``_append`` for details.
        Arguments:
            - ori_data (:obj:`List[Any]`): The data list.
            - cur_collector_envstep (:obj:`int`): Not used in this method, but preserved for compatibility.
        """
        with self._lock:
            if self._deepcopy:
                data = copy.deepcopy(ori_data)
            else:
                data = ori_data
            length = len(data)
            # When updating ``_data`` and ``_valid_count``, consider two cases regarding
            # the relationship between "tail + data length" and "replay buffer size" to check whether
            # the data will exceed the buffer's max length limitation.
            if self._tail + length <= self._replay_buffer_size:
                if self._valid_count != self._replay_buffer_size:
                    self._valid_count += length
                    self._periodic_thruput_monitor.valid_count = self._valid_count
                elif self._enable_track_used_data:
                    for i in range(length):
                        self._used_data_remover.add_used_data(self._data[self._tail + i])
                self._push_count += length
                self._data[self._tail:self._tail + length] = data
            else:
                new_tail = self._tail
                data_start = 0
                residual_num = len(data)
                while True:
                    space = self._replay_buffer_size - new_tail
                    L = min(space, residual_num)
                    if self._valid_count != self._replay_buffer_size:
                        self._valid_count += L
                        self._periodic_thruput_monitor.valid_count = self._valid_count
                    elif self._enable_track_used_data:
                        for i in range(L):
                            self._used_data_remover.add_used_data(self._data[new_tail + i])
                    self._push_count += L
                    self._data[new_tail:new_tail + L] = data[data_start:data_start + L]
                    residual_num -= L
                    assert residual_num >= 0
                    if residual_num == 0:
                        break
                    else:
                        new_tail = 0
                        data_start += L
            # Update ``tail`` after the whole list is pushed into the buffer.
            self._tail = (self._tail + length) % self._replay_buffer_size

    def _sample_check(self, size: int, replace: bool = False) -> bool:
        r"""
        Overview:
            Check whether this buffer has at least ``size`` data items to sample.
        Arguments:
            - size (:obj:`int`): Number of data items that will be sampled.
            - replace (:obj:`bool`): Whether to sample with replacement.
        Returns:
            - can_sample (:obj:`bool`): Whether this buffer can sample enough data.
        """
        if self._valid_count == 0:
            print("The buffer is empty")
            return False
        if self._valid_count < size and not replace:
            print(
                "Not enough elements for sampling without replacement (expect: {} / current: {})".format(
                    size, self._valid_count
                )
            )
            return False
        else:
            return True

    def update(self, info: dict) -> None:
        r"""
        Overview:
            Naive Buffer does not need to update any info, but this method is preserved for compatibility.
        """
        print(
            '[BUFFER WARNING] Naive Buffer does not need to update any info, \
            but `update` method is preserved for compatibility.'
        )

    def clear(self) -> None:
        """
        Overview:
            Clear all the data and reset the related variables.
        """
        with self._lock:
            for i in range(len(self._data)):
                if self._data[i] is not None:
                    if self._enable_track_used_data:
                        self._used_data_remover.add_used_data(self._data[i])
                    self._data[i] = None
            self._valid_count = 0
            self._periodic_thruput_monitor.valid_count = self._valid_count
            self._push_count = 0
            self._tail = 0

    def __del__(self) -> None:
        """
        Overview:
            Call ``close`` to delete the object.
        """
        self.close()

    def _get_indices(self, size: int, sample_range: slice = None, replace: bool = False) -> list:
        r"""
        Overview:
            Get the sample index list.
        Arguments:
            - size (:obj:`int`): The number of the data items that will be sampled.
            - sample_range (:obj:`slice`): Buffer slice for sampling, such as ``slice(-10, None)``, which \
                means only sample among the last 10 data items.
        Returns:
            - index_list (:obj:`list`): A list including all the sample indices, whose length should equal ``size``.
        """
        assert self._valid_count <= self._replay_buffer_size
        if self._valid_count == self._replay_buffer_size:
            tail = self._replay_buffer_size
        else:
            tail = self._tail
        if sample_range is None:
            indices = list(np.random.choice(a=tail, size=size, replace=replace))
        else:
            indices = list(range(tail))[sample_range]
            indices = list(np.random.choice(indices, size=size, replace=replace))
        return indices

    def _sample_with_indices(self, indices: List[int], cur_learner_iter: int) -> list:
        r"""
        Overview:
            Sample data with ``indices``.
        Arguments:
            - indices (:obj:`List[int]`): A list including all the sample indices.
            - cur_learner_iter (:obj:`int`): Not used in this method, but preserved for compatibility.
        Returns:
            - data (:obj:`list`): Sampled data.
        """
        data = []
        for idx in indices:
            assert self._data[idx] is not None, idx
            if self._deepcopy:
                copy_data = copy.deepcopy(self._data[idx])
            else:
                copy_data = self._data[idx]
            data.append(copy_data)
        return data

    def count(self) -> int:
        """
        Overview:
            Count how many valid data items there are in the buffer.
        Returns:
            - count (:obj:`int`): Number of valid data items.
        """
        return self._valid_count

    def state_dict(self) -> dict:
        """
        Overview:
            Provide a state dict to keep a record of the current buffer.
        Returns:
            - state_dict (:obj:`Dict[str, Any]`): A dict containing all important values in the buffer. \
                With the dict, one can easily reproduce the buffer.
        """
        return {
            'data': self._data,
            'tail': self._tail,
            'valid_count': self._valid_count,
            'push_count': self._push_count,
        }

    def load_state_dict(self, _state_dict: dict) -> None:
        """
        Overview:
            Load a state dict to reproduce the buffer.
        Arguments:
            - _state_dict (:obj:`Dict[str, Any]`): A dict containing all important values in the buffer.
        """
        assert 'data' in _state_dict
        if set(_state_dict.keys()) == set(['data']):
            self._extend(_state_dict['data'])
        else:
            for k, v in _state_dict.items():
                setattr(self, '_{}'.format(k), v)

    @property
    def replay_buffer_size(self) -> int:
        return self._replay_buffer_size

    @property
    def push_count(self) -> int:
        return self._push_count


@BUFFER_REGISTRY.register('elastic')
class ElasticReplayBuffer(NaiveReplayBuffer):
    r"""
    Overview:
        Elastic replay buffer, which stores data and supports dynamically changing the buffer size.
        A naive implementation of replay buffer with no priority or any other advanced features.
        This buffer supports multi-thread/multi-process access and guarantees thread safety, which means that
        methods like ``sample``, ``push``, ``clear`` are all mutually exclusive.
    Interface:
        start, close, push, update, sample, clear, count, state_dict, load_state_dict, default_config
    Property:
        replay_buffer_size, push_count
    """

    config = dict(
        type='elastic',
        replay_buffer_size=10000,
        deepcopy=False,
        # default `False` for serial pipeline
        enable_track_used_data=False,
        periodic_thruput_seconds=60,
    )

    def __init__(
            self,
            cfg: 'EasyDict',  # noqa
            tb_logger: Optional['SummaryWriter'] = None,  # noqa
            exp_name: Optional[str] = 'default_experiment',
            instance_name: Optional[str] = 'buffer',
    ) -> None:
        """
        Overview:
            Initialize the buffer
        Arguments:
            - cfg (:obj:`dict`): Config dict.
            - tb_logger (:obj:`Optional['SummaryWriter']`): Outer tb logger. Usually given in serial mode.
            - exp_name (:obj:`Optional[str]`): Name of this experiment.
            - instance_name (:obj:`Optional[str]`): Name of this instance.
        """
        super().__init__(cfg, tb_logger, exp_name, instance_name)
        self._set_buffer_size = self._cfg.set_buffer_size
        self._current_buffer_size = self._set_buffer_size(0)  # Set the buffer size at the 0-th envstep.
        # ``_current_buffer_size`` restricts how many samples the buffer can use for sampling.

    def _sample_check(self, size: int, replace: bool = False) -> bool:
        r"""
        Overview:
            Check whether this buffer has at least ``size`` data items to sample.
        Arguments:
            - size (:obj:`int`): Number of data items that will be sampled.
            - replace (:obj:`bool`): Whether to sample with replacement.
        Returns:
            - can_sample (:obj:`bool`): Whether this buffer can sample enough data.
        """
        valid_count = min(self._valid_count, self._current_buffer_size)
        if valid_count == 0:
            print("The buffer is empty")
            return False
        if valid_count < size and not replace:
            print(
                "Not enough elements for sampling without replacement (expect: {} / current: {})".format(
                    size, self._valid_count
                )
            )
            return False
        else:
            return True

    def _get_indices(self, size: int, sample_range: slice = None, replace: bool = False) -> list:
        r"""
        Overview:
            Get the sample index list.
        Arguments:
            - size (:obj:`int`): The number of the data items that will be sampled.
            - replace (:obj:`bool`): Whether to sample with replacement.
        Returns:
            - index_list (:obj:`list`): A list including all the sample indices, whose length should equal ``size``.
        """
        assert self._valid_count <= self._replay_buffer_size
        assert sample_range is None  # not supported
        # Avoid shadowing the ``range`` builtin.
        sample_window = min(self._valid_count, self._current_buffer_size)
        indices = list(
            (self._tail - 1 - np.random.choice(a=sample_window, size=size, replace=replace)) %
            self._replay_buffer_size
        )
        return indices

    def update(self, envstep):
        self._current_buffer_size = self._set_buffer_size(envstep)


@BUFFER_REGISTRY.register('sequence')
class SequenceReplayBuffer(NaiveReplayBuffer):
    r"""
    Overview:
        Sequence replay buffer, which samples fixed-length sequences of consecutive data items.
    Interface:
        start, close, push, update, sample, clear, count, state_dict, load_state_dict, default_config
    Property:
        replay_buffer_size, push_count
    """

    def sample(
            self,
            batch: int,
            sequence: int,
            cur_learner_iter: int,
            sample_range: slice = None,
            replace: bool = False
    ) -> Optional[list]:
        """
        Overview:
            Sample ``batch`` sequences of data, each of length ``sequence``.
        Arguments:
            - batch (:obj:`int`): The number of sequences that will be sampled.
            - sequence (:obj:`int`): The length of each sampled sequence.
            - cur_learner_iter (:obj:`int`): Learner's current iteration. \
                Not used in the naive buffer, but preserved for compatibility.
            - sample_range (:obj:`slice`): Buffer slice for sampling, such as ``slice(-10, None)``, which \
                means only sample among the last 10 data items.
            - replace (:obj:`bool`): Whether to sample with replacement.
        Returns:
            - sample_data (:obj:`list`): A list of sampled sequences with length ``batch``.
        """
        if batch == 0:
            return []
        can_sample = self._sample_check(batch * sequence, replace)
        if not can_sample:
            return None
        with self._lock:
            indices = self._get_indices(batch, sequence, sample_range, replace)
            sample_data = self._sample_with_indices(indices, sequence, cur_learner_iter)
        self._periodic_thruput_monitor.sample_data_count += len(sample_data)
        return sample_data

    def _get_indices(self, size: int, sequence: int, sample_range: slice = None, replace: bool = False) -> list:
        r"""
        Overview:
            Get the sample index list.
        Arguments:
            - size (:obj:`int`): The number of sequences that will be sampled.
            - sequence (:obj:`int`): The length of each sampled sequence.
            - sample_range (:obj:`slice`): Buffer slice for sampling, such as ``slice(-10, None)``, which \
                means only sample among the last 10 data items.
        Returns:
            - index_list (:obj:`list`): A list including all the sample indices, whose length should equal ``size``.
        """
        assert self._valid_count <= self._replay_buffer_size
        if self._valid_count == self._replay_buffer_size:
            tail = self._replay_buffer_size
        else:
            tail = self._tail
        episodes = math.ceil(self._valid_count / 500)
        batch = 0
        indices = []
        if sample_range is None:
            while batch < size:
                episode = np.random.choice(episodes)
                length = tail - episode * 500 if tail - episode * 500 < 500 else 500
                available = length - sequence
                if available < 1:
                    continue
                indices.append(np.random.randint(episode * 500, episode * 500 + available + 1))
                batch += 1
        else:
            raise NotImplementedError("sample_range is not implemented in this version")
        return indices

    def _sample_with_indices(self, indices: List[int], sequence: int, cur_learner_iter: int) -> list:
        r"""
        Overview:
            Sample data with ``indices``.
        Arguments:
            - indices (:obj:`List[int]`): A list including all the sample indices.
            - cur_learner_iter (:obj:`int`): Not used in this method, but preserved for compatibility.
        Returns:
            - data (:obj:`list`): Sampled data.
        """
        data = []
        for idx in indices:
            assert self._data[idx] is not None, idx
            if self._deepcopy:
                copy_data = copy.deepcopy(self._data[idx:idx + sequence])
            else:
                copy_data = self._data[idx:idx + sequence]
            data.append(copy_data)
        return data