Skip to content

ding.utils.data.structure.cache

ding.utils.data.structure.cache

Cache

Overview

Data cache for reducing concurrency pressure, with timeout-based and full-queue eject mechanisms.

Interfaces: `__init__`, `push_data`, `get_cached_data_iter`, `run`, `close`. Property: `remain_data_count`.

remain_data_count property

Overview

Return the number of data items remaining in the receive queue.

Returns: - count (:obj:`int`): The size of the receive queue.

__init__(maxlen, timeout, monitor_interval=1.0, _debug=False)

Overview

Initialize the cache object.

Arguments:
- maxlen (:obj:`int`): Maximum length of the cache queue.
- timeout (:obj:`float`): Maximum number of seconds a data item can remain in the cache before it is ejected.
- monitor_interval (:obj:`float`): Interval, in seconds, at which the timeout monitor thread checks for expired data.
- _debug (:obj:`bool`): Whether to enable debug mode, which prints debug info.

push_data(data)

Overview

Push data into the receive queue. If the receive queue is full after the push, move all the data in the receive queue into the send queue.

Arguments: - data (:obj:`Any`): The data to be added to the receive queue.

Tip: this method is thread-safe.

get_cached_data_iter()

Overview

Get an iterator over the send queue. Once data is pushed into the send queue, it can be accessed through this iterator. The string 'STOP' is the end flag of this iterator.

Returns: - iterator (:obj:`callable_iterator`): The send queue iterator.

run()

Overview

Launch the cache's internal thread, i.e. the timeout monitor thread.

close()

Overview

Shut down the cache's internal thread and send the end flag to the send queue's iterator.

dprint(s)

Overview

In debug mode, print the debug string.

Arguments: - s (:obj:`str`): Debug info to be printed.

Full Source Code

../ding/utils/data/structure/cache.py

import time
from queue import Queue
from threading import Event, Thread
from typing import Any

from ding.utils import LockContext, LockContextType


class Cache:
    """
    Overview:
        Data cache for reducing concurrent pressure, with timeout and full queue eject mechanism.
        Data pushed via ``push_data`` waits in a receive queue. It is moved to the send queue either when the
        receive queue becomes full, or by the timeout monitor thread once it has waited ``timeout`` seconds.
    Interfaces:
        ``__init__``, ``push_data``, ``get_cached_data_iter``, ``run``, ``close``
    Property:
        remain_data_count
    """

    def __init__(self, maxlen: int, timeout: float, monitor_interval: float = 1.0, _debug: bool = False) -> None:
        """
        Overview:
            Initialize the cache object. The timeout monitor thread is created here but only started by ``run``.
        Arguments:
            - maxlen (:obj:`int`): Maximum length of the cache queue (used for both the receive and the send \
                queue); must be positive.
            - timeout (:obj:`float`): Maximum number of seconds a data item can remain in the receive queue.
            - monitor_interval (:obj:`float`): Interval, in seconds, between two checks of the timeout monitor thread.
            - _debug (:obj:`bool`): Whether to use debug mode or not, which enables debug print info.
        """
        assert maxlen > 0
        self.maxlen = maxlen
        self.timeout = timeout
        self.monitor_interval = monitor_interval
        self.debug = _debug
        # Two separate receive and send queues for reducing interaction frequency and interference
        self.receive_queue = Queue(maxlen)
        self.send_queue = Queue(maxlen)
        self.receive_lock = LockContext(lock_type=LockContextType.THREAD_LOCK)
        self._timeout_thread = Thread(target=self._timeout_monitor)
        # The bool flag for gracefully shutting down the timeout monitor thread
        self._timeout_thread_flag = True
        # Set by ``close`` so the monitor thread wakes from its interval wait immediately instead of sleeping on
        self._stop_event = Event()

    def push_data(self, data: Any) -> None:
        """
        Overview:
            Push data into receive queue. If the receive queue is full (after the push), move all the data
            in the receive queue into the send queue.
        Arguments:
            - data (:obj:`Any`): The data which needs to be added into receive queue

        .. tip::
            thread-safe
        """
        with self.receive_lock:
            # Store the data together with its arrival time. A monotonic clock is used so that wall-clock
            # adjustments (NTP corrections, manual changes) cannot make items expire too early or too late.
            self.receive_queue.put([data, time.monotonic()])
            if self.receive_queue.full():
                self.dprint('send total receive_queue, current len:{}'.format(self.receive_queue.qsize()))
                while not self.receive_queue.empty():
                    # Only send raw data to send queue. send_queue is bounded by maxlen, so this put blocks
                    # (while holding receive_lock) until the consumer frees a slot.
                    self.send_queue.put(self.receive_queue.get()[0])

    def get_cached_data_iter(self) -> 'callable_iterator':  # noqa
        """
        Overview:
            Get the iterator of the send queue. Once data is pushed into the send queue, it can be accessed by
            this iterator. 'STOP' is the end flag of this iterator.
        Returns:
            - iterator (:obj:`callable_iterator`): The send queue iterator.
        """
        return iter(self.send_queue.get, 'STOP')

    def _timeout_monitor(self) -> None:
        """
        Overview:
            The workflow of the timeout monitor thread: every ``monitor_interval`` seconds, eject all data at the
            head of the receive queue that has waited at least ``timeout`` seconds.
        """
        # Loop until the flag is set to False
        while self._timeout_thread_flag:
            # A fixed check interval; ``wait`` returns True early as soon as ``close`` sets the stop event
            if self._stop_event.wait(self.monitor_interval):
                break
            with self.receive_lock:
                # For non-empty receive_queue, check the time from head to tail (only access, no pop) until finding
                # the first data which is not timeout
                while not self.receive_queue.empty():
                    # Ejects the head item and returns True if it has waited too long
                    if not self._warn_if_timeout():
                        break

    def _warn_if_timeout(self) -> bool:
        """
        Overview:
            Check whether the head item of the receive queue has timed out; if so, move its raw data into the
            send queue. Must be called with ``receive_lock`` held and a non-empty receive queue.
        Returns:
            - result (:obj:`bool`): Whether the head item had timed out (and was therefore ejected).
        """
        # Peek at the head's arrival timestamp without popping it
        wait_time = time.monotonic() - self.receive_queue.queue[0][1]
        if wait_time >= self.timeout:
            self.dprint(
                'excess the maximum wait time, eject from the cache.(wait_time/timeout: {}/{}'.format(
                    wait_time, self.timeout
                )
            )
            self.send_queue.put(self.receive_queue.get()[0])
            return True
        return False

    def run(self) -> None:
        """
        Overview:
            Launch the cache internal thread, i.e. the timeout monitor thread.
        """
        self._timeout_thread.start()

    def close(self) -> None:
        """
        Overview:
            Shut down the cache internal thread and send the end flag to the send queue's iterator.
            The monitor thread is stopped and joined before 'STOP' is enqueued, so no ejected data can end up
            behind the end flag (where the iterator would never see it).
        """
        self._timeout_thread_flag = False
        self._stop_event.set()
        # join() on a never-started thread raises RuntimeError, hence the is_alive() guard
        if self._timeout_thread.is_alive():
            self._timeout_thread.join()
        self.send_queue.put('STOP')

    def dprint(self, s: str) -> None:
        """
        Overview:
            In debug mode, print debug str.
        Arguments:
            - s (:obj:`str`): Debug info to be printed.
        """
        if self.debug:
            print('[CACHE] ' + s)

    @property
    def remain_data_count(self) -> int:
        """
        Overview:
            Return the number of data items remaining in the receive queue.
        Returns:
            - count (:obj:`int`): The size of the receive queue.
        """
        return self.receive_queue.qsize()