1from abc import ABCMeta, abstractmethod 2from functools import wraps 3from threading import Lock 4from typing import Optional, Any, Mapping, Type, Callable 5from uuid import uuid4, UUID 6 7import requests 8from requests.exceptions import RequestException 9 10from .base import _BEFORE_HOOK_TYPE, _AFTER_HOOK_TYPE, _ERROR_HOOK_TYPE 11from .task import Task, _task_complete, _task_fail 12from ..base import random_token, ControllableContext, get_http_engine_class, get_values_from_response 13from ..config import DEFAULT_CHANNEL, DEFAULT_SLAVE_PORT, DEFAULT_REQUEST_RETRIES, DEFAULT_REQUEST_RETRY_WAITING 14from ..exception import get_slave_exception_by_error 15 16_COMPLETE_TRIGGER_NAME = '__TASK_COMPLETE__' 17_FAIL_TRIGGER_NAME = '__TASK_FAIL__' 18 19 20class _ISlaveConnection(ControllableContext, metaclass=ABCMeta): 21 """ 22 Overview: 23 Basic model of the connection classes, such as `SlaveConnection` and `SlaveConnectionProxy`, \ 24 which are used widely in interaction module. 25 Example: 26 - The following code shows a sample to correctly use slave connection 27 >>> connection = master.new_connection('cnn1,', '127.0.0.1', 2333) 28 >>> connection.connect() 29 >>> try: 30 >>> pass # do anything you like 31 >>> finally: 32 >>> connection.disconnect() 33 34 - Another simple structure of the code above 35 >>> with master.new_connection('cnn1,', '127.0.0.1', 2333) as connection: 36 >>> pass # do anything you like, connect and disconnect will be done automatically 37 """ 38 39 @abstractmethod 40 def connect(self): 41 """ 42 Overview: 43 Connect to slave end. 44 """ 45 raise NotImplementedError # pragma: no cover 46 47 @abstractmethod 48 def disconnect(self): 49 """ 50 Overview: 51 Disconnect from slave end. 52 """ 53 raise NotImplementedError # pragma: no cover 54 55 @abstractmethod 56 def new_task(self, data: Optional[Mapping[str, Any]] = None): 57 """ 58 Overview: 59 Send new task to slave end and receive task result from it. 60 Arguments: 61 - data (:obj:`Optional[Mapping[str, Any]]`): Data of the new task 62 Returns: 63 - result (:obj:`Mapping[str, Any]`): Result of the task processed by slave end 64 """ 65 raise NotImplementedError # pragma: no cover 66 67 def start(self): 68 """ 69 Overview: 70 Alias for `connect`, for supporting context manager. 71 """ 72 self.connect() 73 74 def close(self): 75 """ 76 Overview: 77 Alias for `disconnect`, for support context manager. 78 """ 79 self.disconnect() 80 81 82class SlaveConnection(_ISlaveConnection, metaclass=ABCMeta): 83 """ 84 Overview: 85 Slave connection object, which need to directly interact with slave end. 86 """ 87 88 def __init__( 89 self, 90 host: str, 91 port: Optional[int] = None, 92 https: bool = False, 93 channel: Optional[int] = None, 94 my_address: Optional[str] = None, 95 token: Optional[str] = None, 96 request_retries: Optional[int] = None, 97 request_retry_waiting: Optional[float] = None, 98 ): 99 """ 100 Overview: 101 Constructor of `SlaveConnection` 102 Arguments: 103 - host (:obj:`str`): Host of the slave server 104 - port (:obj:`Optional[int]`): Port of the slave server (None means `7236`) 105 - https (:obj:`bool`): Use https or not 106 - channel (:obj:`Optional[int]`): Channel id for the slave client. 107 - my_address (:obj:`Optional[str]`): The address of current server (None will grep local ip automatically, \ 108 this address will be used when connect to slave, the slave's request will be send to this address, \ 109 **so please make sure the address can be achieved by slave**) 110 - token (:obj:`Optional[str]`): Token of this connection, it is a token for authenticate to the \ 111 connection (`None` means this token would be randomly generated) 112 - request_retries (:obj:`Optional[int]`): Max times for request retries (None means `5`) 113 - request_retry_waiting (:obj:`Optional[float]`): Sleep time before requests' retrying (None means `1.0`, \ 114 unit: second) 115 """ 116 # meta info part 117 self.__channel = channel or DEFAULT_CHANNEL 118 self.__my_address = my_address 119 self.__token = token or random_token() 120 121 # request part 122 self.__http_engine = get_http_engine_class( 123 headers={ 124 'Channel': lambda: str(self.__channel), 125 'Token': lambda: self.__token, 126 }, 127 http_error_gene=get_slave_exception_by_error, 128 )()(host, port or DEFAULT_SLAVE_PORT, https) 129 self.__request_retries = max(request_retries or DEFAULT_REQUEST_RETRIES, 0) 130 self.__request_retry_waiting = max(request_retry_waiting or DEFAULT_REQUEST_RETRY_WAITING, 0.0) 131 132 # threading part 133 self.__lock = Lock() 134 self.__is_connected = False 135 136 # task part 137 self.__tasks = {} 138 139 self.__init_triggers() 140 141 def __request(self, method: str, path: str, data: Optional[Mapping[str, Any]] = None) -> requests.Response: 142 return self.__http_engine.request( 143 method, 144 path, 145 data, 146 retries=self.__request_retries, 147 retry_waiting=self.__request_retry_waiting, 148 ) 149 150 @property 151 def is_connected(self) -> bool: 152 """ 153 Overview: 154 Check connection status 155 Returns: 156 - connected (:obj:`bool`): Whether this connection is still alive 157 """ 158 with self.__lock: 159 return self.__is_connected 160 161 def _before_connect(self) -> Mapping[str, Any]: 162 pass # pragma: no cover 163 164 def _after_connect( 165 self, status_code: int, success: bool, code: int, message: Optional[str], data: Optional[Mapping[str, Any]] 166 ) -> Any: 167 pass # pragma: no cover 168 169 def _error_connect(self, error: RequestException) -> Any: 170 raise error # pragma: no cover 171 172 def __connect(self): 173 try: 174 response = self.__request( 175 'POST', '/connect', { 176 'master': { 177 'address': self.__my_address, 178 }, 179 'data': (self._before_connect() or {}) 180 } 181 ) 182 except RequestException as err: 183 return self._error_connect(err) 184 else: 185 self.__is_connected = True 186 return self._after_connect(*get_values_from_response(response)) 187 188 def connect(self): 189 with self.__lock: 190 return self.__connect() 191 192 def _before_disconnect(self) -> Mapping[str, Any]: 193 pass # pragma: no cover 194 195 def _after_disconnect( 196 self, status_code: int, success: bool, code: int, message: Optional[str], data: Optional[Mapping[str, Any]] 197 ) -> Any: 198 pass # pragma: no cover 199 200 def _error_disconnect(self, error: RequestException) -> Any: 201 raise error # pragma: no cover 202 203 def __disconnect(self): 204 try: 205 response = self.__request('DELETE', '/disconnect', { 206 'data': self._before_disconnect() or {}, 207 }) 208 except RequestException as err: 209 return self._error_disconnect(err) 210 else: 211 self.__is_connected = False 212 return self._after_disconnect(*get_values_from_response(response)) 213 214 def disconnect(self): 215 with self.__lock: 216 return self.__disconnect() 217 218 def _before_new_task(self, data: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]: 219 return data # pragma: no cover 220 221 def _after_new_task( 222 self, status_code: int, success: bool, code: int, message: Optional[str], data: Optional[Mapping[str, Any]] 223 ) -> Any: 224 pass # pragma: no cover 225 226 def _error_new_task(self, error: RequestException) -> Any: 227 raise error # pragma: no cover 228 229 def new_task(self, data: Optional[Mapping[str, Any]] = None) -> Task: 230 with self.__lock: 231 _uuid = uuid4() 232 _task = Task( 233 http_engine=self.__http_engine, 234 data=data, 235 task_id=_uuid, 236 before_task_start=self._before_new_task, 237 after_task_start=self._after_new_task, 238 error_task_start=self._error_new_task, 239 ) 240 241 self.__tasks[_uuid] = _task 242 return _task 243 244 def __task_complete(self, task_id: UUID, task_result: Mapping[str, Any]): 245 _task = self.__tasks[task_id] 246 _task_complete(_task, task_result) 247 del self.__tasks[task_id] 248 249 def __task_fail(self, task_id: UUID, task_result: Mapping[str, Any]): 250 _task = self.__tasks[task_id] 251 _task_fail(_task, task_result) 252 del self.__tasks[task_id] 253 254 def __task_complete_trigger(self, task_id: UUID, task_result: Mapping[str, Any]): 255 with self.__lock: 256 if task_id in self.__tasks.keys(): 257 return self.__task_complete(task_id, task_result) 258 else: 259 raise KeyError("Task {uuid} not found in this connection.".format(uuid=repr(str(task_id)))) 260 261 def __task_fail_trigger(self, task_id: UUID, task_result: Mapping[str, Any]): 262 with self.__lock: 263 if task_id in self.__tasks.keys(): 264 return self.__task_fail(task_id, task_result) 265 else: 266 raise KeyError("Task {uuid} not found in this connection.".format(uuid=repr(str(task_id)))) 267 268 def __init_triggers(self): 269 setattr(self, _COMPLETE_TRIGGER_NAME, self.__task_complete_trigger) 270 setattr(self, _FAIL_TRIGGER_NAME, self.__task_fail_trigger) 271 272 273def _connection_task_complete(connection: SlaveConnection, task_id: UUID, task_result: Mapping[str, Any]): 274 return getattr(connection, _COMPLETE_TRIGGER_NAME)(task_id, task_result) 275 276 277def _connection_task_fail(connection: SlaveConnection, task_id: UUID, task_result: Mapping[str, Any]): 278 return getattr(connection, _FAIL_TRIGGER_NAME)(task_id, task_result) 279 280 281class SlaveConnectionProxy(_ISlaveConnection): 282 """ 283 Overview: 284 Proxy class for `SlaveConnection` class, which wraps the original methods. 285 """ 286 287 def __init__( 288 self, 289 connection: SlaveConnection, 290 after_connect: Optional[Callable] = None, 291 after_disconnect: Optional[Callable] = None 292 ): 293 """ 294 Overview: 295 Constructor of `SlaveConnectionProxy` 296 Arguments: 297 - connection (:obj:`SlaveConnection`): Slave connection object 298 - after_connect (:obj:`Optional[Callable]`): Behaviour going to be executed after connection established 299 - after_disconnect (:obj:`Optional[Callable]`): Behaviour going to be executed after connection killed 300 """ 301 self.__connection = connection 302 self.__lock = Lock() 303 self.__after_connect = after_connect 304 self.__after_disconnect = after_disconnect 305 306 self.__init_triggers() 307 308 @property 309 def is_connected(self) -> bool: 310 """ 311 Overview: 312 Check connection status 313 Returns: 314 - connected (:obj:`bool`): Whether this connection is still alive 315 """ 316 with self.__lock: 317 return self.__connection.is_connected 318 319 def connect(self): 320 with self.__lock: 321 result = self.__connection.connect() 322 if self.__after_connect is not None: 323 self.__after_connect(connection=self) 324 return result 325 326 def disconnect(self): 327 with self.__lock: 328 result = self.__connection.disconnect() 329 if self.__after_disconnect is not None: 330 self.__after_disconnect(connection=self) 331 return result 332 333 def new_task(self, data: Optional[Mapping[str, Any]] = None): 334 with self.__lock: 335 return self.__connection.new_task(data) 336 337 def __task_complete_trigger(self, task_id: UUID, task_result: Mapping[str, Any]): 338 with self.__lock: 339 return _connection_task_complete(self.__connection, task_id, task_result) 340 341 def __task_fail_trigger(self, task_id: UUID, task_result: Mapping[str, Any]): 342 with self.__lock: 343 return _connection_task_fail(self.__connection, task_id, task_result) 344 345 def __init_triggers(self): 346 setattr(self, _COMPLETE_TRIGGER_NAME, self.__task_complete_trigger) 347 setattr(self, _FAIL_TRIGGER_NAME, self.__task_fail_trigger) 348 349 350def _proxy_task_complete(proxy: SlaveConnectionProxy, task_id: UUID, task_result: Mapping[str, Any]): 351 return getattr(proxy, _COMPLETE_TRIGGER_NAME)(task_id, task_result) 352 353 354def _proxy_task_fail(proxy: SlaveConnectionProxy, task_id: UUID, task_result: Mapping[str, Any]): 355 return getattr(proxy, _FAIL_TRIGGER_NAME)(task_id, task_result) 356 357 358def _slave_task_complete(connection: _ISlaveConnection, task_id: UUID, task_result: Mapping[str, Any]): 359 if isinstance(connection, SlaveConnection): 360 return _connection_task_complete(connection, task_id, task_result) 361 elif isinstance(connection, SlaveConnectionProxy): 362 return _proxy_task_complete(connection, task_id, task_result) 363 else: 364 raise TypeError( 365 "{expect1} or {expect2} expected, but {actual} found.".format( 366 expect1=SlaveConnection.__name__, 367 expect2=SlaveConnectionProxy.__name__, 368 actual=type(connection).__name__, 369 ) 370 ) 371 372 373def _slave_task_fail(connection: _ISlaveConnection, task_id: UUID, task_result: Mapping[str, Any]): 374 if isinstance(connection, SlaveConnection): 375 return _connection_task_fail(connection, task_id, task_result) 376 elif isinstance(connection, SlaveConnectionProxy): 377 return _proxy_task_fail(connection, task_id, task_result) 378 else: 379 raise TypeError( 380 "{expect1} or {expect2} expected, but {actual} found.".format( 381 expect1=SlaveConnection.__name__, 382 expect2=SlaveConnectionProxy.__name__, 383 actual=type(connection).__name__, 384 ) 385 ) 386 387 388def _default_wrap(func: Callable) -> Callable: 389 390 @wraps(func) 391 def _new_func(*args, **kwargs): 392 if func: 393 return func(*args, **kwargs) 394 else: 395 return None 396 397 return _new_func 398 399 400def _get_connection_class( 401 before_new_task: Optional[_BEFORE_HOOK_TYPE] = None, 402 after_new_task: Optional[_AFTER_HOOK_TYPE] = None, 403 error_new_task: Optional[_ERROR_HOOK_TYPE] = None, 404 before_connect: Optional[_BEFORE_HOOK_TYPE] = None, 405 after_connect: Optional[_AFTER_HOOK_TYPE] = None, 406 error_connect: Optional[_ERROR_HOOK_TYPE] = None, 407 before_disconnect: Optional[_BEFORE_HOOK_TYPE] = None, 408 after_disconnect: Optional[_AFTER_HOOK_TYPE] = None, 409 error_disconnect: Optional[_ERROR_HOOK_TYPE] = None, 410) -> Type[SlaveConnection]: 411 412 class _Connection(SlaveConnection): 413 414 def _before_connect(self) -> Mapping[str, Any]: 415 return _default_wrap(before_connect)() or {} 416 417 def _after_connect( 418 self, status_code: int, success: bool, code: int, message: Optional[str], data: Optional[Mapping[str, 419 Any]] 420 ) -> Any: 421 return _default_wrap(after_connect)(status_code, success, code, message, data) 422 423 def _error_connect(self, error: RequestException) -> Any: 424 return _default_wrap(error_connect)(error) 425 426 def _before_disconnect(self) -> Mapping[str, Any]: 427 return _default_wrap(before_disconnect)() or {} 428 429 def _after_disconnect( 430 self, status_code: int, success: bool, code: int, message: Optional[str], data: Optional[Mapping[str, 431 Any]] 432 ) -> Any: 433 return _default_wrap(after_disconnect)(status_code, success, code, message, data) 434 435 def _error_disconnect(self, error: RequestException) -> Any: 436 return _default_wrap(error_disconnect)(error) 437 438 def _before_new_task(self, data: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]: 439 return _default_wrap(before_new_task)(data) or {} 440 441 def _after_new_task( 442 self, status_code: int, success: bool, code: int, message: Optional[str], data: Optional[Mapping[str, 443 Any]] 444 ) -> Any: 445 return _default_wrap(after_new_task)(status_code, success, code, message, data) 446 447 def _error_new_task(self, error: RequestException) -> Any: 448 return _default_wrap(error_new_task)(error) 449 450 return _Connection