from enum import unique, IntEnum
from threading import Lock
from typing import Mapping, Any, Optional, Callable
from uuid import UUID, uuid4

import enum_tools
import requests
from requests import RequestException

from .base import _BEFORE_HOOK_TYPE, _AFTER_HOOK_TYPE, _ERROR_HOOK_TYPE
from ..base import HttpEngine, get_values_from_response, default_func


@enum_tools.documentation.document_enum
@unique
class TaskResultType(IntEnum):
    """
    Overview:
        Types of the task result
    """
    COMPLETED = 1  # doc: Task complete without error
    FAILED = 2  # doc: Task end with error


@enum_tools.documentation.document_enum
@unique
class TaskStatus(IntEnum):
    """
    Overview:
        Status of a task
    """
    IDLE = 0x00  # doc: Task not started, waiting for awake

    STARTING = 0x11  # doc: Task is starting, but initialization is not completed.
    STARTED = 0x12  # doc: Task started, initialization is completed.
    START_FAILED = 0x13  # doc: Task start failed, error occurred when initializing.

    COMPLETED = 0x21  # doc: Task completed without error
    FAILED = 0x22  # doc: Task ended with error


# Attribute names under which each task instance exposes its completion / failure triggers.
# They are looked up by the module-level `_task_complete` and `_task_fail` functions.
_COMPLETE_TRIGGER_NAME = '__TASK_COMPLETE__'
_FAIL_TRIGGER_NAME = '__TASK_FAIL__'


class Task:
    """
    Overview:
        Task object of the connections.
        Linking call is fully supported.
    Example:
        >>> with master.new_connection('cnn1,', '127.0.0.1', 2333) as connection:
        >>>     task = connection.new_task({'data': 233})
        >>>     # task is not sent yet
        >>>
        >>>     task = task.on_complete(func1).on_fail(func2).on_complete(func3).start().join()
        >>>     # task is completed or failed after this line
        >>>     # when task completed : func1(task_data, result) --> func3(task_data, result)
        >>>     # when task failed : func2(task_data, result)
    """

    def __init__(
        self,
        http_engine: HttpEngine,
        data: Mapping[str, Any],
        task_id: Optional[UUID] = None,
        before_task_start: Optional[_BEFORE_HOOK_TYPE] = None,
        after_task_start: Optional[_AFTER_HOOK_TYPE] = None,
        error_task_start: Optional[_ERROR_HOOK_TYPE] = None
    ):
        """
        Overview:
            Constructor of `Task`
        Arguments:
            - http_engine (:obj:`HttpEngine`): Http engine object used by the task
            - data (:obj:`Mapping[str, Any]`): Task data of the task
            - task_id (:obj:`Optional[UUID]`): Id of the task (a random one is generated when not given)
            - before_task_start (:obj:`Optional[_BEFORE_HOOK_TYPE]`): Callback to be executed before task start \
                (`None` means the task data is sent unchanged)
            - after_task_start (:obj:`Optional[_AFTER_HOOK_TYPE]`): Callback to be executed after task start \
                (`None` means do nothing)
            - error_task_start (:obj:`Optional[_ERROR_HOOK_TYPE]`): Callback to be executed when task start failed \
                (`None` means do nothing)
        """
        self.__http_engine = http_engine
        # Guards status transitions and the callback lists.
        self.__lock = Lock()

        self.__task_id = task_id or uuid4()
        self.__task_data = data
        self.__task_result = None
        self.__task_status = TaskStatus.IDLE
        # Held from a successful start until the task completes or fails; `join` waits on it.
        self.__task_lock = Lock()

        self.__before_task_start = before_task_start or (lambda d: d)
        self.__after_task_start = default_func(None)(after_task_start)
        self.__error_task_start = default_func(None)(error_task_start)
        self.__after_task_completed_callbacks = []
        self.__after_task_failed_callbacks = []

        self.__init_triggers()

    def __request(self, method: str, path: str, data: Optional[Mapping[str, Any]] = None) -> requests.Response:
        """
        Overview:
            Send a request through the http engine of this task.
        """
        return self.__http_engine.request(method, path, data)

    def __check_status(self, expected: TaskStatus, action: str):
        """
        Overview:
            Raise `ValueError` unless the task is currently in the `expected` status.
        Arguments:
            - expected (:obj:`TaskStatus`): The only status in which the action is allowed
            - action (:obj:`str`): Past participle used in the error message (e.g. `started`)
        """
        if self.__task_status != expected:
            raise ValueError(
                "Only task with {expect} status can be {action}, but {actual} found.".format(
                    expect=repr(expected.name),
                    action=action,
                    actual=repr(self.__task_status.name),
                )
            )

    def __task_start(self):
        """
        Overview:
            Post the task to `/task/new` and run the matching start hook.
            On `RequestException` the status becomes `START_FAILED` and the error hook result is returned;
            otherwise the status becomes `STARTED` and the after-start hook result is returned.
        """
        try:
            self.__task_status = TaskStatus.STARTING
            response = self.__request(
                'POST', '/task/new', {
                    'task': {
                        'id': str(self.__task_id)
                    },
                    'data': self.__before_task_start(self.__task_data) or {}
                }
            )
        except RequestException as err:
            self.__task_status = TaskStatus.START_FAILED
            return self.__error_task_start(err)
        else:
            self.__task_status = TaskStatus.STARTED
            try:
                return self.__after_task_start(*get_values_from_response(response))
            finally:
                # A STARTED task must always hold the task lock, even when the hook raises:
                # otherwise `join` would not wait and the finishing trigger would release an unlocked lock.
                self.__task_lock.acquire()

    def __task_finish(self, status: TaskStatus, result: Mapping[str, Any], callbacks):
        """
        Overview:
            Record the final status and result, run the callbacks in registration order,
            then release the task lock so that `join` can return.
        """
        self.__task_status = status
        self.__task_result = result
        try:
            for _callback in callbacks:
                _callback(self.__task_data, result)
        finally:
            # Release even if a callback raises, so that waiters in `join` are never blocked forever.
            self.__task_lock.release()

    def __task_complete(self, result: Mapping[str, Any]):
        """
        Overview:
            Mark the task as completed and run the `on_complete` callbacks.
        """
        self.__task_finish(TaskStatus.COMPLETED, result, self.__after_task_completed_callbacks)

    def __task_fail(self, result: Mapping[str, Any]):
        """
        Overview:
            Mark the task as failed and run the `on_fail` callbacks.
        """
        self.__task_finish(TaskStatus.FAILED, result, self.__after_task_failed_callbacks)

    # trigger methods
    def __task_complete_trigger(self, result: Mapping[str, Any]):
        """
        Overview:
            Complete the task with the given result. Raises `ValueError` unless the task is `STARTED`.
        """
        with self.__lock:
            self.__check_status(TaskStatus.STARTED, 'completed')
            self.__task_complete(result)

    def __task_fail_trigger(self, result: Mapping[str, Any]):
        """
        Overview:
            Fail the task with the given result. Raises `ValueError` unless the task is `STARTED`.
        """
        with self.__lock:
            self.__check_status(TaskStatus.STARTED, 'failed')
            self.__task_fail(result)

    def __init_triggers(self):
        """
        Overview:
            Expose the triggers on the instance under the module-level trigger names.
        """
        setattr(self, _COMPLETE_TRIGGER_NAME, self.__task_complete_trigger)
        setattr(self, _FAIL_TRIGGER_NAME, self.__task_fail_trigger)

    # public properties
    @property
    def status(self) -> TaskStatus:
        """
        Overview:
            Get status of the current task
        Returns:
            - status (:obj:`TaskStatus`): Task status
        """
        return self.__task_status

    @property
    def task(self) -> Mapping[str, Any]:
        """
        Overview:
            Get task data of the current task
        Returns:
            - data (:obj:`Mapping[str, Any]`): Task data
        """
        return self.__task_data

    @property
    def result(self) -> Optional[Mapping[str, Any]]:
        """
        Overview:
            Get task result of the current task, return `None` if task is not completed or failed.
        Returns:
            - result (:obj:`Optional[Mapping[str, Any]]`): Task result (`None` when not completed or failed)
        """
        return self.__task_result

    # public methods
    def start(self) -> 'Task':
        """
        Overview:
            Start current task.
        Returns:
            - task (:obj:`Task`): Self object, supporting linking call
        Raises:
            - ValueError: When the task is not in `IDLE` status
        """
        with self.__lock:
            self.__check_status(TaskStatus.IDLE, 'started')
            self.__task_start()
            return self

    def join(self) -> 'Task':
        """
        Overview:
            Wait until the task is completed or failed.
            The task lock is only held after a successful start, so this returns immediately \
            for a task that has not been started or whose start failed.
        Returns:
            - task (:obj:`Task`): Self object, supporting linking call
        """
        with self.__task_lock:
            return self

    def on_complete(self, callback: Callable[[Mapping[str, Any], Mapping[str, Any]], Any]) -> 'Task':
        """
        Overview:
            Execute the callback when the task completed. Multiple callbacks is supported by using linking call.
        Arguments:
            - callback (:obj:`Callable[[Mapping[str, Any], Mapping[str, Any]], Any]`): Function to be executed when \
                task completed. It is called with the task data and the task result.
        Returns:
            - task (:obj:`Task`): Self object, supporting linking call
        """
        with self.__lock:
            self.__after_task_completed_callbacks.append(callback)
            return self

    def on_fail(self, callback: Callable[[Mapping[str, Any], Mapping[str, Any]], Any]) -> 'Task':
        """
        Overview:
            Execute the callback when the task failed. Multiple callbacks is supported by using linking call.
        Arguments:
            - callback (:obj:`Callable[[Mapping[str, Any], Mapping[str, Any]], Any]`): Function to be executed when \
                task failed. It is called with the task data and the task result.
        Returns:
            - task (:obj:`Task`): Self object, supporting linking call
        """
        with self.__lock:
            self.__after_task_failed_callbacks.append(callback)
            return self


def _task_complete(task: Task, result: Mapping[str, Any]):
    """
    Overview:
        Invoke the completion trigger of the given task with the result.
    """
    getattr(task, _COMPLETE_TRIGGER_NAME)(result)


def _task_fail(task: Task, result: Mapping[str, Any]):
    """
    Overview:
        Invoke the failure trigger of the given task with the result.
    """
    getattr(task, _FAIL_TRIGGER_NAME)(result)