Skip to content

ding.framework.event_loop

ding.framework.event_loop

EventLoop

on(event, fn)

Overview

Subscribe to an event, execute this function every time the event is emitted.

Arguments: - event (:obj:str): Event name. - fn (:obj:Callable): The function.

off(event, fn=None)

Overview

Unsubscribe an event, or a specific function in the event.

Arguments: - event (:obj:str): Event name. - fn (:obj:Optional[Callable]): The function.

once(event, fn)

Overview

Subscribe to an event, execute this function only once when the event is emitted.

Arguments: - event (:obj:str): Event name. - fn (:obj:Callable): The function.

emit(event, *args, **kwargs)

Overview

Emit an event, call listeners. If there is an unhandled error in this event loop, calling emit will raise an exception, which will cause the process to exit.

Arguments: - event (:obj:str): Event name.

listened(event)

Overview

Check if the event has been listened to.

Arguments: - event (:obj:str): Event name Returns: - listened (:obj:bool): Whether this event has been listened to.

get_event_loop(name='default') classmethod

Overview

Get new event loop when name not exists, or return the existed instance.

Arguments: - name (:obj:str): Name of event loop.

Full Source Code

../ding/framework/event_loop.py

1from collections import defaultdict 2from typing import Callable, Optional 3from concurrent.futures import ThreadPoolExecutor 4from copy import copy 5import fnmatch 6from ditk import logging 7 8 9class EventLoop: 10 loops = {} 11 12 def __init__(self, name: str = "default") -> None: 13 self._name = name 14 self._listeners = defaultdict(list) 15 self._thread_pool = ThreadPoolExecutor(max_workers=2) 16 self._exception = None 17 self._active = True 18 19 def on(self, event: str, fn: Callable) -> None: 20 """ 21 Overview: 22 Subscribe to an event, execute this function every time the event is emitted. 23 Arguments: 24 - event (:obj:`str`): Event name. 25 - fn (:obj:`Callable`): The function. 26 """ 27 self._listeners[event].append(fn) 28 29 def off(self, event: str, fn: Optional[Callable] = None) -> None: 30 """ 31 Overview: 32 Unsubscribe an event, or a specific function in the event. 33 Arguments: 34 - event (:obj:`str`): Event name. 35 - fn (:obj:`Optional[Callable]`): The function. 36 """ 37 for e in fnmatch.filter(self._listeners.keys(), event): 38 if fn: 39 try: 40 self._listeners[e].remove(fn) 41 except: 42 pass 43 else: 44 self._listeners[e] = [] 45 46 def once(self, event: str, fn: Callable) -> None: 47 """ 48 Overview: 49 Subscribe to an event, execute this function only once when the event is emitted. 50 Arguments: 51 - event (:obj:`str`): Event name. 52 - fn (:obj:`Callable`): The function. 53 """ 54 55 def once_callback(*args, **kwargs): 56 self.off(event, once_callback) 57 fn(*args, **kwargs) 58 59 self.on(event, once_callback) 60 61 def emit(self, event: str, *args, **kwargs) -> None: 62 """ 63 Overview: 64 Emit an event, call listeners. 65 If there is an unhandled error in this event loop, calling emit will raise an exception, 66 which will cause the process to exit. 67 Arguments: 68 - event (:obj:`str`): Event name. 69 """ 70 if self._exception: 71 raise self._exception 72 if self._active: 73 self._thread_pool.submit(self._trigger, event, *args, **kwargs) 74 75 def _trigger(self, event: str, *args, **kwargs) -> None: 76 """ 77 Overview: 78 Execute the callbacks under the event. If any callback raise an exception, 79 we will save the traceback and ignore the exception. 80 Arguments: 81 - event (:obj:`str`): Event name. 82 """ 83 if event not in self._listeners: 84 logging.debug("Event {} is not registered in the callbacks of {}!".format(event, self._name)) 85 return 86 for fn in copy(self._listeners[event]): 87 try: 88 fn(*args, **kwargs) 89 except Exception as e: 90 self._exception = e 91 92 def listened(self, event: str) -> bool: 93 """ 94 Overview: 95 Check if the event has been listened to. 96 Arguments: 97 - event (:obj:`str`): Event name 98 Returns: 99 - listened (:obj:`bool`): Whether this event has been listened to. 100 """ 101 return event in self._listeners 102 103 @classmethod 104 def get_event_loop(cls: type, name: str = "default") -> "EventLoop": 105 """ 106 Overview: 107 Get new event loop when name not exists, or return the existed instance. 108 Arguments: 109 - name (:obj:`str`): Name of event loop. 110 """ 111 if name in cls.loops: 112 return cls.loops[name] 113 cls.loops[name] = loop = cls(name) 114 return loop 115 116 def stop(self) -> None: 117 self._active = False 118 self._listeners = defaultdict(list) 119 self._exception = None 120 self._thread_pool.shutdown() 121 if self._name in EventLoop.loops: 122 del EventLoop.loops[self._name] 123 124 def __del__(self) -> None: 125 if self._active: 126 self.stop()