Skip to content

ding.interaction.master.connection

ding.interaction.master.connection

SlaveConnection

Bases: _ISlaveConnection

Overview

Slave connection object, which need to directly interact with slave end.

is_connected property

Overview

Check connection status

Returns: - connected (:obj:bool): Whether this connection is still alive

__init__(host, port=None, https=False, channel=None, my_address=None, token=None, request_retries=None, request_retry_waiting=None)

Overview

Constructor of SlaveConnection

Arguments: - host (:obj:str): Host of the slave server - port (:obj:Optional[int]): Port of the slave server (None means 7236) - https (:obj:bool): Use https or not - channel (:obj:Optional[int]): Channel id for the slave client. - my_address (:obj:Optional[str]): The address of current server (None will grep local ip automatically, this address will be used when connect to slave, the slave's request will be send to this address, so please make sure the address can be achieved by slave) - token (:obj:Optional[str]): Token of this connection, it is a token for authenticate to the connection (None means this token would be randomly generated) - request_retries (:obj:Optional[int]): Max times for request retries (None means 5) - request_retry_waiting (:obj:Optional[float]): Sleep time before requests' retrying (None means 1.0, unit: second)

SlaveConnectionProxy

Bases: _ISlaveConnection

Overview

Proxy class for SlaveConnection class, which wraps the original methods.

is_connected property

Overview

Check connection status

Returns: - connected (:obj:bool): Whether this connection is still alive

__init__(connection, after_connect=None, after_disconnect=None)

Overview

Constructor of SlaveConnectionProxy

Arguments: - connection (:obj:SlaveConnection): Slave connection object - after_connect (:obj:Optional[Callable]): Behaviour going to be executed after connection established - after_disconnect (:obj:Optional[Callable]): Behaviour going to be executed after connection killed

Full Source Code

../ding/interaction/master/connection.py

1from abc import ABCMeta, abstractmethod 2from functools import wraps 3from threading import Lock 4from typing import Optional, Any, Mapping, Type, Callable 5from uuid import uuid4, UUID 6 7import requests 8from requests.exceptions import RequestException 9 10from .base import _BEFORE_HOOK_TYPE, _AFTER_HOOK_TYPE, _ERROR_HOOK_TYPE 11from .task import Task, _task_complete, _task_fail 12from ..base import random_token, ControllableContext, get_http_engine_class, get_values_from_response 13from ..config import DEFAULT_CHANNEL, DEFAULT_SLAVE_PORT, DEFAULT_REQUEST_RETRIES, DEFAULT_REQUEST_RETRY_WAITING 14from ..exception import get_slave_exception_by_error 15 16_COMPLETE_TRIGGER_NAME = '__TASK_COMPLETE__' 17_FAIL_TRIGGER_NAME = '__TASK_FAIL__' 18 19 20class _ISlaveConnection(ControllableContext, metaclass=ABCMeta): 21 """ 22 Overview: 23 Basic model of the connection classes, such as `SlaveConnection` and `SlaveConnectionProxy`, \ 24 which are used widely in interaction module. 25 Example: 26 - The following code shows a sample to correctly use slave connection 27 >>> connection = master.new_connection('cnn1,', '127.0.0.1', 2333) 28 >>> connection.connect() 29 >>> try: 30 >>> pass # do anything you like 31 >>> finally: 32 >>> connection.disconnect() 33 34 - Another simple structure of the code above 35 >>> with master.new_connection('cnn1,', '127.0.0.1', 2333) as connection: 36 >>> pass # do anything you like, connect and disconnect will be done automatically 37 """ 38 39 @abstractmethod 40 def connect(self): 41 """ 42 Overview: 43 Connect to slave end. 44 """ 45 raise NotImplementedError # pragma: no cover 46 47 @abstractmethod 48 def disconnect(self): 49 """ 50 Overview: 51 Disconnect from slave end. 52 """ 53 raise NotImplementedError # pragma: no cover 54 55 @abstractmethod 56 def new_task(self, data: Optional[Mapping[str, Any]] = None): 57 """ 58 Overview: 59 Send new task to slave end and receive task result from it. 60 Arguments: 61 - data (:obj:`Optional[Mapping[str, Any]]`): Data of the new task 62 Returns: 63 - result (:obj:`Mapping[str, Any]`): Result of the task processed by slave end 64 """ 65 raise NotImplementedError # pragma: no cover 66 67 def start(self): 68 """ 69 Overview: 70 Alias for `connect`, for supporting context manager. 71 """ 72 self.connect() 73 74 def close(self): 75 """ 76 Overview: 77 Alias for `disconnect`, for support context manager. 78 """ 79 self.disconnect() 80 81 82class SlaveConnection(_ISlaveConnection, metaclass=ABCMeta): 83 """ 84 Overview: 85 Slave connection object, which need to directly interact with slave end. 86 """ 87 88 def __init__( 89 self, 90 host: str, 91 port: Optional[int] = None, 92 https: bool = False, 93 channel: Optional[int] = None, 94 my_address: Optional[str] = None, 95 token: Optional[str] = None, 96 request_retries: Optional[int] = None, 97 request_retry_waiting: Optional[float] = None, 98 ): 99 """ 100 Overview: 101 Constructor of `SlaveConnection` 102 Arguments: 103 - host (:obj:`str`): Host of the slave server 104 - port (:obj:`Optional[int]`): Port of the slave server (None means `7236`) 105 - https (:obj:`bool`): Use https or not 106 - channel (:obj:`Optional[int]`): Channel id for the slave client. 107 - my_address (:obj:`Optional[str]`): The address of current server (None will grep local ip automatically, \ 108 this address will be used when connect to slave, the slave's request will be send to this address, \ 109 **so please make sure the address can be achieved by slave**) 110 - token (:obj:`Optional[str]`): Token of this connection, it is a token for authenticate to the \ 111 connection (`None` means this token would be randomly generated) 112 - request_retries (:obj:`Optional[int]`): Max times for request retries (None means `5`) 113 - request_retry_waiting (:obj:`Optional[float]`): Sleep time before requests' retrying (None means `1.0`, \ 114 unit: second) 115 """ 116 # meta info part 117 self.__channel = channel or DEFAULT_CHANNEL 118 self.__my_address = my_address 119 self.__token = token or random_token() 120 121 # request part 122 self.__http_engine = get_http_engine_class( 123 headers={ 124 'Channel': lambda: str(self.__channel), 125 'Token': lambda: self.__token, 126 }, 127 http_error_gene=get_slave_exception_by_error, 128 )()(host, port or DEFAULT_SLAVE_PORT, https) 129 self.__request_retries = max(request_retries or DEFAULT_REQUEST_RETRIES, 0) 130 self.__request_retry_waiting = max(request_retry_waiting or DEFAULT_REQUEST_RETRY_WAITING, 0.0) 131 132 # threading part 133 self.__lock = Lock() 134 self.__is_connected = False 135 136 # task part 137 self.__tasks = {} 138 139 self.__init_triggers() 140 141 def __request(self, method: str, path: str, data: Optional[Mapping[str, Any]] = None) -> requests.Response: 142 return self.__http_engine.request( 143 method, 144 path, 145 data, 146 retries=self.__request_retries, 147 retry_waiting=self.__request_retry_waiting, 148 ) 149 150 @property 151 def is_connected(self) -> bool: 152 """ 153 Overview: 154 Check connection status 155 Returns: 156 - connected (:obj:`bool`): Whether this connection is still alive 157 """ 158 with self.__lock: 159 return self.__is_connected 160 161 def _before_connect(self) -> Mapping[str, Any]: 162 pass # pragma: no cover 163 164 def _after_connect( 165 self, status_code: int, success: bool, code: int, message: Optional[str], data: Optional[Mapping[str, Any]] 166 ) -> Any: 167 pass # pragma: no cover 168 169 def _error_connect(self, error: RequestException) -> Any: 170 raise error # pragma: no cover 171 172 def __connect(self): 173 try: 174 response = self.__request( 175 'POST', '/connect', { 176 'master': { 177 'address': self.__my_address, 178 }, 179 'data': (self._before_connect() or {}) 180 } 181 ) 182 except RequestException as err: 183 return self._error_connect(err) 184 else: 185 self.__is_connected = True 186 return self._after_connect(*get_values_from_response(response)) 187 188 def connect(self): 189 with self.__lock: 190 return self.__connect() 191 192 def _before_disconnect(self) -> Mapping[str, Any]: 193 pass # pragma: no cover 194 195 def _after_disconnect( 196 self, status_code: int, success: bool, code: int, message: Optional[str], data: Optional[Mapping[str, Any]] 197 ) -> Any: 198 pass # pragma: no cover 199 200 def _error_disconnect(self, error: RequestException) -> Any: 201 raise error # pragma: no cover 202 203 def __disconnect(self): 204 try: 205 response = self.__request('DELETE', '/disconnect', { 206 'data': self._before_disconnect() or {}, 207 }) 208 except RequestException as err: 209 return self._error_disconnect(err) 210 else: 211 self.__is_connected = False 212 return self._after_disconnect(*get_values_from_response(response)) 213 214 def disconnect(self): 215 with self.__lock: 216 return self.__disconnect() 217 218 def _before_new_task(self, data: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]: 219 return data # pragma: no cover 220 221 def _after_new_task( 222 self, status_code: int, success: bool, code: int, message: Optional[str], data: Optional[Mapping[str, Any]] 223 ) -> Any: 224 pass # pragma: no cover 225 226 def _error_new_task(self, error: RequestException) -> Any: 227 raise error # pragma: no cover 228 229 def new_task(self, data: Optional[Mapping[str, Any]] = None) -> Task: 230 with self.__lock: 231 _uuid = uuid4() 232 _task = Task( 233 http_engine=self.__http_engine, 234 data=data, 235 task_id=_uuid, 236 before_task_start=self._before_new_task, 237 after_task_start=self._after_new_task, 238 error_task_start=self._error_new_task, 239 ) 240 241 self.__tasks[_uuid] = _task 242 return _task 243 244 def __task_complete(self, task_id: UUID, task_result: Mapping[str, Any]): 245 _task = self.__tasks[task_id] 246 _task_complete(_task, task_result) 247 del self.__tasks[task_id] 248 249 def __task_fail(self, task_id: UUID, task_result: Mapping[str, Any]): 250 _task = self.__tasks[task_id] 251 _task_fail(_task, task_result) 252 del self.__tasks[task_id] 253 254 def __task_complete_trigger(self, task_id: UUID, task_result: Mapping[str, Any]): 255 with self.__lock: 256 if task_id in self.__tasks.keys(): 257 return self.__task_complete(task_id, task_result) 258 else: 259 raise KeyError("Task {uuid} not found in this connection.".format(uuid=repr(str(task_id)))) 260 261 def __task_fail_trigger(self, task_id: UUID, task_result: Mapping[str, Any]): 262 with self.__lock: 263 if task_id in self.__tasks.keys(): 264 return self.__task_fail(task_id, task_result) 265 else: 266 raise KeyError("Task {uuid} not found in this connection.".format(uuid=repr(str(task_id)))) 267 268 def __init_triggers(self): 269 setattr(self, _COMPLETE_TRIGGER_NAME, self.__task_complete_trigger) 270 setattr(self, _FAIL_TRIGGER_NAME, self.__task_fail_trigger) 271 272 273def _connection_task_complete(connection: SlaveConnection, task_id: UUID, task_result: Mapping[str, Any]): 274 return getattr(connection, _COMPLETE_TRIGGER_NAME)(task_id, task_result) 275 276 277def _connection_task_fail(connection: SlaveConnection, task_id: UUID, task_result: Mapping[str, Any]): 278 return getattr(connection, _FAIL_TRIGGER_NAME)(task_id, task_result) 279 280 281class SlaveConnectionProxy(_ISlaveConnection): 282 """ 283 Overview: 284 Proxy class for `SlaveConnection` class, which wraps the original methods. 285 """ 286 287 def __init__( 288 self, 289 connection: SlaveConnection, 290 after_connect: Optional[Callable] = None, 291 after_disconnect: Optional[Callable] = None 292 ): 293 """ 294 Overview: 295 Constructor of `SlaveConnectionProxy` 296 Arguments: 297 - connection (:obj:`SlaveConnection`): Slave connection object 298 - after_connect (:obj:`Optional[Callable]`): Behaviour going to be executed after connection established 299 - after_disconnect (:obj:`Optional[Callable]`): Behaviour going to be executed after connection killed 300 """ 301 self.__connection = connection 302 self.__lock = Lock() 303 self.__after_connect = after_connect 304 self.__after_disconnect = after_disconnect 305 306 self.__init_triggers() 307 308 @property 309 def is_connected(self) -> bool: 310 """ 311 Overview: 312 Check connection status 313 Returns: 314 - connected (:obj:`bool`): Whether this connection is still alive 315 """ 316 with self.__lock: 317 return self.__connection.is_connected 318 319 def connect(self): 320 with self.__lock: 321 result = self.__connection.connect() 322 if self.__after_connect is not None: 323 self.__after_connect(connection=self) 324 return result 325 326 def disconnect(self): 327 with self.__lock: 328 result = self.__connection.disconnect() 329 if self.__after_disconnect is not None: 330 self.__after_disconnect(connection=self) 331 return result 332 333 def new_task(self, data: Optional[Mapping[str, Any]] = None): 334 with self.__lock: 335 return self.__connection.new_task(data) 336 337 def __task_complete_trigger(self, task_id: UUID, task_result: Mapping[str, Any]): 338 with self.__lock: 339 return _connection_task_complete(self.__connection, task_id, task_result) 340 341 def __task_fail_trigger(self, task_id: UUID, task_result: Mapping[str, Any]): 342 with self.__lock: 343 return _connection_task_fail(self.__connection, task_id, task_result) 344 345 def __init_triggers(self): 346 setattr(self, _COMPLETE_TRIGGER_NAME, self.__task_complete_trigger) 347 setattr(self, _FAIL_TRIGGER_NAME, self.__task_fail_trigger) 348 349 350def _proxy_task_complete(proxy: SlaveConnectionProxy, task_id: UUID, task_result: Mapping[str, Any]): 351 return getattr(proxy, _COMPLETE_TRIGGER_NAME)(task_id, task_result) 352 353 354def _proxy_task_fail(proxy: SlaveConnectionProxy, task_id: UUID, task_result: Mapping[str, Any]): 355 return getattr(proxy, _FAIL_TRIGGER_NAME)(task_id, task_result) 356 357 358def _slave_task_complete(connection: _ISlaveConnection, task_id: UUID, task_result: Mapping[str, Any]): 359 if isinstance(connection, SlaveConnection): 360 return _connection_task_complete(connection, task_id, task_result) 361 elif isinstance(connection, SlaveConnectionProxy): 362 return _proxy_task_complete(connection, task_id, task_result) 363 else: 364 raise TypeError( 365 "{expect1} or {expect2} expected, but {actual} found.".format( 366 expect1=SlaveConnection.__name__, 367 expect2=SlaveConnectionProxy.__name__, 368 actual=type(connection).__name__, 369 ) 370 ) 371 372 373def _slave_task_fail(connection: _ISlaveConnection, task_id: UUID, task_result: Mapping[str, Any]): 374 if isinstance(connection, SlaveConnection): 375 return _connection_task_fail(connection, task_id, task_result) 376 elif isinstance(connection, SlaveConnectionProxy): 377 return _proxy_task_fail(connection, task_id, task_result) 378 else: 379 raise TypeError( 380 "{expect1} or {expect2} expected, but {actual} found.".format( 381 expect1=SlaveConnection.__name__, 382 expect2=SlaveConnectionProxy.__name__, 383 actual=type(connection).__name__, 384 ) 385 ) 386 387 388def _default_wrap(func: Callable) -> Callable: 389 390 @wraps(func) 391 def _new_func(*args, **kwargs): 392 if func: 393 return func(*args, **kwargs) 394 else: 395 return None 396 397 return _new_func 398 399 400def _get_connection_class( 401 before_new_task: Optional[_BEFORE_HOOK_TYPE] = None, 402 after_new_task: Optional[_AFTER_HOOK_TYPE] = None, 403 error_new_task: Optional[_ERROR_HOOK_TYPE] = None, 404 before_connect: Optional[_BEFORE_HOOK_TYPE] = None, 405 after_connect: Optional[_AFTER_HOOK_TYPE] = None, 406 error_connect: Optional[_ERROR_HOOK_TYPE] = None, 407 before_disconnect: Optional[_BEFORE_HOOK_TYPE] = None, 408 after_disconnect: Optional[_AFTER_HOOK_TYPE] = None, 409 error_disconnect: Optional[_ERROR_HOOK_TYPE] = None, 410) -> Type[SlaveConnection]: 411 412 class _Connection(SlaveConnection): 413 414 def _before_connect(self) -> Mapping[str, Any]: 415 return _default_wrap(before_connect)() or {} 416 417 def _after_connect( 418 self, status_code: int, success: bool, code: int, message: Optional[str], data: Optional[Mapping[str, 419 Any]] 420 ) -> Any: 421 return _default_wrap(after_connect)(status_code, success, code, message, data) 422 423 def _error_connect(self, error: RequestException) -> Any: 424 return _default_wrap(error_connect)(error) 425 426 def _before_disconnect(self) -> Mapping[str, Any]: 427 return _default_wrap(before_disconnect)() or {} 428 429 def _after_disconnect( 430 self, status_code: int, success: bool, code: int, message: Optional[str], data: Optional[Mapping[str, 431 Any]] 432 ) -> Any: 433 return _default_wrap(after_disconnect)(status_code, success, code, message, data) 434 435 def _error_disconnect(self, error: RequestException) -> Any: 436 return _default_wrap(error_disconnect)(error) 437 438 def _before_new_task(self, data: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]: 439 return _default_wrap(before_new_task)(data) or {} 440 441 def _after_new_task( 442 self, status_code: int, success: bool, code: int, message: Optional[str], data: Optional[Mapping[str, 443 Any]] 444 ) -> Any: 445 return _default_wrap(after_new_task)(status_code, success, code, message, data) 446 447 def _error_new_task(self, error: RequestException) -> Any: 448 return _default_wrap(error_new_task)(error) 449 450 return _Connection