ding.entry.parallel_entry

```python
parallel_pipeline(input_cfg, seed, enable_total_log=False, disable_flask_log=True)
```

Overview

Parallel pipeline entry: compiles the parallel config, launches learner and collector processes, and starts the coordinator that manages them.

Arguments:

- input_cfg (:obj:`Union[str, Tuple[dict, dict, dict]]`): Config file path, or a `(main_cfg, create_cfg, system_cfg)` tuple.
- seed (:obj:`int`): Random seed.
- enable_total_log (:obj:`Optional[bool]`): Whether to enable the total DI-engine system log.
- disable_flask_log (:obj:`Optional[bool]`): Whether to disable the Flask log.
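In practice the entry point is called with either a config file path or an already-loaded config tuple. A minimal sketch follows; the path `my_experiment_config.py` is a hypothetical placeholder, not a file shipped with DI-engine:

```python
from ding.entry.parallel_entry import parallel_pipeline

# Launch the full parallel pipeline (learner/collector processes plus the
# coordinator) from a config file; replace the path with your own config.
parallel_pipeline('my_experiment_config.py', seed=0)

# Equivalent tuple form, if the three configs are already in memory:
# parallel_pipeline((main_cfg, create_cfg, system_cfg), seed=0)
```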

Full Source Code

../ding/entry/parallel_entry.py

```python
from typing import Optional, Union, Tuple
import time
import pickle
from ditk import logging
from multiprocessing import Process, Event
import threading
from easydict import EasyDict

from ding.worker import create_comm_learner, create_comm_collector, Coordinator
from ding.config import read_config_with_system, compile_config_parallel
from ding.utils import set_pkg_seed


def parallel_pipeline(
    input_cfg: Union[str, Tuple[dict, dict, dict]],
    seed: int,
    enable_total_log: Optional[bool] = False,
    disable_flask_log: Optional[bool] = True,
) -> None:
    r"""
    Overview:
        Parallel pipeline entry.
    Arguments:
        - input_cfg (:obj:`Union[str, Tuple[dict, dict, dict]]`): Config file path, or a \
            ``(main_cfg, create_cfg, system_cfg)`` tuple.
        - seed (:obj:`int`): Random seed.
        - enable_total_log (:obj:`Optional[bool]`): Whether to enable the total DI-engine system log.
        - disable_flask_log (:obj:`Optional[bool]`): Whether to disable the Flask log.
    """
    # Disable some parts of the DI-engine log.
    if not enable_total_log:
        coordinator_log = logging.getLogger('coordinator_logger')
        coordinator_log.disabled = True
    # Disable the Flask logger.
    if disable_flask_log:
        log = logging.getLogger('werkzeug')
        log.disabled = True
    # Parallel job launch.
    if isinstance(input_cfg, str):
        main_cfg, create_cfg, system_cfg = read_config_with_system(input_cfg)
    elif isinstance(input_cfg, (tuple, list)):
        main_cfg, create_cfg, system_cfg = input_cfg
    else:
        raise TypeError("invalid config type: {}".format(type(input_cfg)))
    config = compile_config_parallel(main_cfg, create_cfg=create_cfg, system_cfg=system_cfg, seed=seed)
    learner_handle = []
    collector_handle = []
    for k, v in config.system.items():
        if 'learner' in k:
            learner_handle.append(launch_learner(config.seed, v))
        elif 'collector' in k:
            collector_handle.append(launch_collector(config.seed, v))
    launch_coordinator(config.seed, config, learner_handle=learner_handle, collector_handle=collector_handle)


# The following functions are used to launch different components (learner, learner aggregator,
# collector, coordinator). Argument ``config`` is the dict-type config. If it is None, then
# ``filename`` and ``name`` must be passed, so that the corresponding config can be read from file.
def run_learner(config, seed, start_learner_event, close_learner_event):
    set_pkg_seed(seed)
    log = logging.getLogger('werkzeug')
    log.disabled = True
    learner = create_comm_learner(config)
    learner.start()
    # Signal the parent that the learner is up, then block until shutdown is requested.
    start_learner_event.set()
    close_learner_event.wait()
    learner.close()


def launch_learner(
        seed: int, config: Optional[dict] = None, filename: Optional[str] = None, name: Optional[str] = None
) -> tuple:
    if config is None:
        with open(filename, 'rb') as f:
            config = pickle.load(f)[name]
    start_learner_event = Event()
    close_learner_event = Event()

    learner_process = Process(
        target=run_learner, args=(config, seed, start_learner_event, close_learner_event), name='learner_entry_process'
    )
    learner_process.start()
    return learner_process, start_learner_event, close_learner_event


def run_collector(config, seed, start_collector_event, close_collector_event):
    set_pkg_seed(seed)
    log = logging.getLogger('werkzeug')
    log.disabled = True
    collector = create_comm_collector(config)
    collector.start()
    # Signal the parent that the collector is up, then block until shutdown is requested.
    start_collector_event.set()
    close_collector_event.wait()
    collector.close()


def launch_collector(
        seed: int, config: Optional[dict] = None, filename: Optional[str] = None, name: Optional[str] = None
) -> tuple:
    if config is None:
        with open(filename, 'rb') as f:
            config = pickle.load(f)[name]
    start_collector_event = Event()
    close_collector_event = Event()

    collector_process = Process(
        target=run_collector,
        args=(config, seed, start_collector_event, close_collector_event),
        name='collector_entry_process'
    )
    collector_process.start()
    return collector_process, start_collector_event, close_collector_event


def launch_coordinator(
        seed: int,
        config: Optional[EasyDict] = None,
        filename: Optional[str] = None,
        learner_handle: Optional[list] = None,
        collector_handle: Optional[list] = None
) -> None:
    set_pkg_seed(seed)
    if config is None:
        with open(filename, 'rb') as f:
            config = pickle.load(f)
    coordinator = Coordinator(config)
    # Wait until every learner and collector process has finished starting up.
    for _, start_event, _ in learner_handle:
        start_event.wait()
    for _, start_event, _ in collector_handle:
        start_event.wait()
    coordinator.start()
    system_shutdown_event = threading.Event()

    # Monitor thread: the coordinator keeps running until its ``system_shutdown_flag`` is set.
    def shutdown_monitor():
        while True:
            time.sleep(3)
            if coordinator.system_shutdown_flag:
                coordinator.close()
                for _, _, close_event in learner_handle:
                    close_event.set()
                for _, _, close_event in collector_handle:
                    close_event.set()
                system_shutdown_event.set()
                break

    shutdown_monitor_thread = threading.Thread(target=shutdown_monitor, args=(), daemon=True, name='shutdown_monitor')
    shutdown_monitor_thread.start()
    system_shutdown_event.wait()
    print(
        "[DI-engine parallel pipeline] Your RL agent has converged; refer to 'log' and 'tensorboard' for details."
    )
```
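All three launch helpers share the same startup/shutdown handshake: the parent spawns a `Process`, waits on a start `Event` until the child reports ready, and later sets a close `Event` to ask it to shut down. Below is a minimal, self-contained sketch of that pattern with a dummy worker standing in for a real comm learner or collector; the names here are illustrative, not part of DI-engine:

```python
from multiprocessing import Process, Event
import time


def run_worker(start_event, close_event):
    # Stand-in for create_comm_learner(config).start(): do setup, then
    # tell the parent we are ready and wait for the shutdown signal.
    print('worker: started')
    start_event.set()
    close_event.wait()
    print('worker: closing')


if __name__ == '__main__':
    start_event, close_event = Event(), Event()
    worker = Process(target=run_worker, args=(start_event, close_event))
    worker.start()
    start_event.wait()   # like launch_coordinator waiting on the start events
    time.sleep(1)        # ... the coordinator would run the training job here ...
    close_event.set()    # like the shutdown monitor setting the close events
    worker.join()
```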