ding.worker.collector.comm.base_comm_collector

BaseCommCollector

Bases: ABC

Overview

Abstract base class for comm collectors.

Interfaces: __init__, get_policy_update_info, send_metadata, send_stepdata, start, close, _create_collector

Property: collector_uid
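As a rough illustration of the interface listed above, the following sketch defines a stand-in abstract class with the same method names and a small in-memory subclass. It deliberately does not import DI-engine: `CommCollectorSketch`, `InMemoryCommCollector`, and the `uuid`-based uid are hypothetical names chosen for this example, and a real implementation would presumably subclass `BaseCommCollector` instead.

```python
import uuid
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class CommCollectorSketch(ABC):
    """Stand-in that mirrors the documented interface (not the DI-engine class)."""

    def __init__(self, cfg: dict) -> None:
        self._cfg = cfg
        self._end_flag = True  # the collector begins in the closed state
        self._collector_uid = str(uuid.uuid4())  # assumption: replaces get_task_uid()

    @abstractmethod
    def get_policy_update_info(self, path: str) -> Any:
        raise NotImplementedError

    @abstractmethod
    def send_metadata(self, metadata: Any) -> None:
        raise NotImplementedError

    @abstractmethod
    def send_stepdata(self, stepdata: Any) -> None:
        raise NotImplementedError

    def start(self) -> None:
        self._end_flag = False

    def close(self) -> None:
        self._end_flag = True

    @property
    def collector_uid(self) -> str:
        return self._collector_uid


class InMemoryCommCollector(CommCollectorSketch):
    """Hypothetical subclass that keeps everything in local containers."""

    def __init__(self, cfg: dict) -> None:
        super().__init__(cfg)
        self.policy_store: Dict[str, Any] = {}
        self.metadata_log: List[Any] = []
        self.stepdata_log: List[Any] = []

    def get_policy_update_info(self, path: str) -> Any:
        return self.policy_store.get(path)

    def send_metadata(self, metadata: Any) -> None:
        self.metadata_log.append(metadata)

    def send_stepdata(self, stepdata: Any) -> None:
        self.stepdata_log.append(stepdata)


comm = InMemoryCommCollector({"name": "demo"})
comm.policy_store["policy/latest"] = {"iter": 3}
comm.start()
comm.send_stepdata({"obs": [0.0], "action": 1})
comm.send_metadata({"n_steps": 1})
latest = comm.get_policy_update_info("policy/latest")
comm.close()
```

Because the three communication methods are abstract, instantiating the stand-in base class directly raises `TypeError`; only a subclass that implements all three can be created.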

__init__(cfg)

Overview

Initialization method.

Arguments: - cfg (:obj:EasyDict): Config dict

get_policy_update_info(path) abstractmethod

Overview

Get policy update information from the given path. This method is attached to the base collector created by _create_collector.

Arguments: - path (:obj:str): path to policy update information.

send_metadata(metadata) abstractmethod

Overview

Store metadata in a queue. The callback function "deal_with_collector_data" in the collector slave retrieves it from the queue and sends it to the coordinator. This method is attached to the base collector created by _create_collector.

Arguments: - metadata (:obj:Any): Metadata.
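The queue hand-off described for send_metadata can be sketched with the standard library. This is only an illustration under assumptions: `QueueMetadataSender` and its `drain` method are hypothetical, and `drain` merely stands in for whatever the "deal_with_collector_data" callback does in the collector slave.

```python
import queue
from typing import Any, List


class QueueMetadataSender:
    """Hypothetical helper, not a DI-engine class."""

    def __init__(self) -> None:
        self._metadata_queue: "queue.Queue[Any]" = queue.Queue()

    def send_metadata(self, metadata: Any) -> None:
        # Producer side: enqueue the item and return immediately.
        self._metadata_queue.put(metadata)

    def drain(self) -> List[Any]:
        # Consumer side: collect everything queued so far without blocking.
        items: List[Any] = []
        while True:
            try:
                items.append(self._metadata_queue.get_nowait())
            except queue.Empty:
                return items


sender = QueueMetadataSender()
sender.send_metadata({"data_id": "traj_0", "length": 16})
sender.send_metadata({"data_id": "traj_1", "length": 8})
drained = sender.drain()
```

A queue decouples the two sides: the collector does not wait for the coordinator, and the consumer receives items in the order they were sent.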

send_stepdata(stepdata) abstractmethod

Overview

Save step data to the corresponding path. This method is attached to the base collector created by _create_collector.

Arguments: - stepdata (:obj:Any): step data.

start()

Overview

Start the comm collector by setting the internal end flag to False.

close()

Overview

Close the comm collector by setting the internal end flag to True.

create_comm_collector(cfg)

Overview

Given the key (comm_collector_name), create a new comm collector instance if the key is registered in comm_map; otherwise raise a KeyError. In other words, a derived comm collector must be registered before create_comm_collector can return an instance of it. Note that the source reads the registry key from cfg.type.

Arguments: - cfg (:obj:EasyDict): Collector config. Necessary keys: [import_names, comm_collector_type].

Returns: - collector (:obj:BaseCommCollector): The newly created comm collector, which should be an instance of one of comm_map's values.
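The register-then-create flow described above can be illustrated with a tiny stand-in registry. Everything here is an assumption made for the example: `SimpleRegistry`, `COMM_REGISTRY_SKETCH`, `DemoCommCollector`, and `create_comm_collector_sketch` are hypothetical names, and the real `COMM_COLLECTOR_REGISTRY` may behave differently in its details.

```python
from typing import Any, Callable, Dict


class SimpleRegistry:
    """Minimal name-to-class registry (a stand-in, not ding.utils)."""

    def __init__(self) -> None:
        self._entries: Dict[str, Callable[..., Any]] = {}

    def register(self, name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        def decorator(cls: Callable[..., Any]) -> Callable[..., Any]:
            self._entries[name] = cls
            return cls

        return decorator

    def build(self, name: str, **kwargs: Any) -> Any:
        # Dict lookup raises KeyError for a name that was never registered.
        return self._entries[name](**kwargs)


COMM_REGISTRY_SKETCH = SimpleRegistry()


@COMM_REGISTRY_SKETCH.register("demo")
class DemoCommCollector:
    def __init__(self, cfg: dict) -> None:
        self.cfg = cfg


def create_comm_collector_sketch(cfg: dict) -> Any:
    # Mirrors the shape of the factory: look up the type, pass cfg through.
    return COMM_REGISTRY_SKETCH.build(cfg["type"], cfg=cfg)


collector = create_comm_collector_sketch({"type": "demo"})

try:
    create_comm_collector_sketch({"type": "missing"})
    lookup_failed = False
except KeyError:
    lookup_failed = True
```

Registering by name keeps the factory free of hard-coded class references, so a new comm collector only needs to be imported and registered before it is requested.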

Full Source Code

../ding/worker/collector/comm/base_comm_collector.py

from abc import ABC, abstractmethod
from typing import Any
from easydict import EasyDict

from ding.utils import get_task_uid, import_module, COMM_COLLECTOR_REGISTRY
from ..base_parallel_collector import create_parallel_collector, BaseParallelCollector


class BaseCommCollector(ABC):
    """
    Overview:
        Abstract baseclass for common collector.
    Interfaces:
        __init__, get_policy_update_info, send_metadata, send_stepdata
        start, close, _create_collector
    Property:
        collector_uid
    """

    def __init__(self, cfg):
        """
        Overview:
            Initialization method.
        Arguments:
            - cfg (:obj:`EasyDict`): Config dict
        """
        self._cfg = cfg
        self._end_flag = True
        self._collector_uid = get_task_uid()

    @abstractmethod
    def get_policy_update_info(self, path: str) -> Any:
        """
        Overview:
            Get policy information in corresponding path.
            Will be registered in base collector.
        Arguments:
            - path (:obj:`str`): path to policy update information.
        """
        raise NotImplementedError

    @abstractmethod
    def send_metadata(self, metadata: Any) -> None:
        """
        Overview:
            Store meta data in queue, which will be retrieved by callback function "deal_with_collector_data"
            in collector slave, then will be sent to coordinator.
            Will be registered in base collector.
        Arguments:
            - metadata (:obj:`Any`): meta data.
        """
        raise NotImplementedError

    @abstractmethod
    def send_stepdata(self, stepdata: Any) -> None:
        """
        Overview:
            Save step data in corresponding path.
            Will be registered in base collector.
        Arguments:
            - stepdata (:obj:`Any`): step data.
        """
        raise NotImplementedError

    def start(self) -> None:
        """
        Overview:
            Start comm collector.
        """
        self._end_flag = False

    def close(self) -> None:
        """
        Overview:
            Close comm collector.
        """
        self._end_flag = True

    @property
    def collector_uid(self) -> str:
        return self._collector_uid

    def _create_collector(self, task_info: dict) -> BaseParallelCollector:
        """
        Overview:
            Receive ``task_info`` passed from coordinator and create a collector.
        Arguments:
            - task_info (:obj:`dict`): Task info dict from coordinator. Should be like \
        Returns:
            - collector (:obj:`BaseParallelCollector`): Created base collector.
        Note:
            Four methods('send_metadata', 'send_stepdata', 'get_policy_update_info'), and policy are set.
            The reason why they are set here rather than base collector is, they highly depend on the specific task.
            Only after task info is passed from coordinator to comm collector through learner slave, can they be
            clarified and initialized.
        """
        collector_cfg = EasyDict(task_info['collector_cfg'])
        collector = create_parallel_collector(collector_cfg)
        for item in ['send_metadata', 'send_stepdata', 'get_policy_update_info']:
            setattr(collector, item, getattr(self, item))
        return collector


def create_comm_collector(cfg: EasyDict) -> BaseCommCollector:
    """
    Overview:
        Given the key(comm_collector_name), create a new comm collector instance if in comm_map's values,
        or raise an KeyError. In other words, a derived comm collector must first register,
        then can call ``create_comm_collector`` to get the instance.
    Arguments:
        - cfg (:obj:`EasyDict`): Collector config. Necessary keys: [import_names, comm_collector_type].
    Returns:
        - collector (:obj:`BaseCommCollector`): The created new comm collector, should be an instance of one of \
            comm_map's values.
    """
    import_module(cfg.get('import_names', []))
    return COMM_COLLECTOR_REGISTRY.build(cfg.type, cfg=cfg)
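The `setattr(collector, item, getattr(self, item))` loop in `_create_collector` hands the comm collector's bound methods to the newly created collector. The sketch below isolates that mechanism with two hypothetical stub classes (`CommStub` and `ParallelCollectorStub`); it does not use DI-engine, and the `run_once` method and the data it sends are invented for the example.

```python
from typing import Any, List, Tuple


class ParallelCollectorStub:
    """Hypothetical collector that expects its I/O methods to be injected."""

    def run_once(self) -> None:
        # These attributes do not exist on the class; they are set per instance.
        info = self.get_policy_update_info("policy/latest")
        self.send_stepdata({"obs": 0, "policy": info})
        self.send_metadata({"n_steps": 1})


class CommStub:
    """Hypothetical comm layer that owns the actual transport logic."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, Any]] = []

    def get_policy_update_info(self, path: str) -> Any:
        return {"path": path}

    def send_metadata(self, metadata: Any) -> None:
        self.sent.append(("meta", metadata))

    def send_stepdata(self, stepdata: Any) -> None:
        self.sent.append(("step", stepdata))

    def create_collector(self) -> ParallelCollectorStub:
        collector = ParallelCollectorStub()
        for item in ["send_metadata", "send_stepdata", "get_policy_update_info"]:
            # getattr returns a method bound to this CommStub instance, so calls
            # made through the collector still run against the comm layer's state.
            setattr(collector, item, getattr(self, item))
        return collector


comm = CommStub()
collector = comm.create_collector()
collector.run_once()
```

After `run_once`, the records appear in `comm.sent` even though the calls were made on the collector, which is the point of injecting bound methods: the collector stays unaware of how data is transported.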