Skip to content

ding.worker.coordinator.operator_server

ding.worker.coordinator.operator_server

Full Source Code

../ding/worker/coordinator/operator_server.py

from typing import Optional, Mapping, Any
from requests.exceptions import RequestException
from ding.interaction.base import get_http_engine_class, get_values_from_response


class OperatorServer:
    """
    Thin HTTP client for the ``/replicas`` endpoints of an operator server.

    Every request path is prefixed with ``api_version`` and every payload carries the
    ``namespace`` given at construction time. Successful calls return the tuple
    ``(success, code, message, data)`` produced by :meth:`_after_request`; transport
    failures are routed through :meth:`_error_request`, which raises ``RequestException``.
    """

    def __init__(
            self,
            host: str,
            port: Optional[int] = None,
            api_version: str = "v1alpha1",
            https: bool = False,
            namespace: Optional[str] = None,
            name: Optional[str] = None,
    ):
        """
        Arguments:
            - host: Host of the operator server.
            - port: Port of the operator server; forwarded to the HTTP engine as-is.
            - api_version: String prepended to every request path.
            - https: Forwarded to the HTTP engine (presumably selects HTTPS; confirm in ``ding.interaction``).
            - namespace: Value sent as ``"namespace"`` with every request.
            - name: Name of this worker, sent under the worker-type / ``"coordinator"`` key.
        """
        # request part
        self.__http_engine = get_http_engine_class(headers={})()(host, port, https)
        self.__api_version = api_version
        self.__namespace = namespace
        self.__my_name = name
        # Must be set through ``set_worker_type`` before ``get_replicas()`` is called without a name.
        self.__worker_type = None

    @property
    def api_version(self):
        """API version string used as the prefix of every request path."""
        return self.__api_version

    def set_worker_type(self, type):
        """
        Record which kind of worker this client acts for.

        Arguments:
            - type: Either ``'coordinator'`` or ``'aggregator'``.
        Raises:
            - AssertionError: If ``type`` is any other value.
        """
        # NOTE: the parameter name shadows the ``type`` builtin, but it is part of the public
        # signature (callers may pass it by keyword), so it is kept as-is.
        assert type in ['coordinator', 'aggregator'], "invalid worker_type: {}".format(type)
        self.__worker_type = type

    def __prefix_with_api_version(self, path):
        """Return ``path`` with the API version prepended (plain string concatenation)."""
        return self.__api_version + path

    def __request(self, method: str, path: str, **kwargs) -> Any:
        """
        Send one request and post-process the outcome.

        Only the HTTP call itself is guarded: a ``RequestException`` goes to
        ``_error_request``, while errors raised while unpacking the response propagate unchanged.
        """
        try:
            response = self.__http_engine.request(method, self.__prefix_with_api_version(path), **kwargs)
        except RequestException as err:
            return self._error_request(err)
        else:
            return self._after_request(*get_values_from_response(response))

    def get_replicas(self, name: str = None):
        """
        Send ``GET <api_version>/replicas``.

        Arguments:
            - name: If given, query by ``"name"``; otherwise query by this worker's own name
                under the key set via :meth:`set_worker_type`.
        Returns:
            - The ``(success, code, message, data)`` tuple from :meth:`_after_request`.
        Raises:
            - AssertionError: If ``name`` is None and no worker type has been set.
            - RequestException: If the HTTP request fails.
        """
        if name is None:
            assert self.__worker_type, "set worker type first"
            params = {"namespace": self.__namespace, self.__worker_type: self.__my_name}
        else:
            params = {"namespace": self.__namespace, "name": name}
        return self.__request('GET', '/replicas', params=params)

    def post_replicas(self, data):
        """
        Send ``POST <api_version>/replicas``.

        Arguments:
            - data: Mapping with the request payload. ``"namespace"`` and ``"coordinator"`` are
                added to a copy of it, so the caller's object is left unmodified.
        Returns:
            - The ``(success, code, message, data)`` tuple from :meth:`_after_request`.
        Raises:
            - RequestException: If the HTTP request fails.
        """
        # Copy first: the original implementation updated the caller's dict in place.
        payload = dict(data)
        payload.update({"namespace": self.__namespace, "coordinator": self.__my_name})
        return self.__request('POST', '/replicas', data=payload)

    def post_replicas_failed(self, collectors=None, learners=None):
        """
        Send ``POST <api_version>/replicas/failed``.

        Arguments:
            - collectors: Value sent under ``"collectors"``; defaults to an empty list.
            - learners: Value sent under ``"learners"``; defaults to an empty list.
        Returns:
            - The ``(success, code, message, data)`` tuple from :meth:`_after_request`.
        Raises:
            - RequestException: If the HTTP request fails.
        """
        # ``None`` sentinels replace the former mutable ``[]`` defaults; the payload is identical.
        data = {
            "namespace": self.__namespace,
            "coordinator": self.__my_name,
            "collectors": [] if collectors is None else collectors,
            "learners": [] if learners is None else learners,
        }
        return self.__request('POST', '/replicas/failed', data=data)

    def delete_replicas(self, n_collectors=0, n_learners=0):
        """
        Send ``DELETE <api_version>/replicas``.

        Arguments:
            - n_collectors: Value sent as ``collectors.replicas``.
            - n_learners: Value sent as ``learners.replicas``.
        Returns:
            - The ``(success, code, message, data)`` tuple from :meth:`_after_request`.
        Raises:
            - RequestException: If the HTTP request fails.
        """
        data = {
            "namespace": self.__namespace,
            "coordinator": self.__my_name,
            "collectors": {
                "replicas": n_collectors,
            },
            "learners": {
                "replicas": n_learners,
            }
        }
        return self.__request('DELETE', '/replicas', data=data)

    def _after_request(
            self, status_code: int, success: bool, code: int, message: Optional[str], data: Optional[Mapping[str, Any]]
    ) -> Any:
        """
        Hook applied to the values unpacked from a response.

        Returns ``(success, code, message, data)``; ``status_code`` is accepted but dropped.
        """
        return success, code, message, data

    def _error_request(self, error: RequestException) -> Any:
        """
        Hook applied when the HTTP request raised.

        Always raises a plain ``RequestException`` (the type callers already see), now carrying
        the original error's arguments, request and response, and chained to it for tracebacks.
        """
        raise RequestException(
            *error.args,
            request=getattr(error, 'request', None),
            response=getattr(error, 'response', None),
        ) from error