
ding.example.dqn

Example of DQN pipeline

To run the pipeline in a single process:

python3 -u ding/example/dqn.py

To run the pipeline across multiple processes:

Suppose there are N processes (workers): 1 learner, 1 evaluator and N-2 collectors.
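The role split can be sketched as a plain function. This is an illustration, not part of DI-engine: `role_for_node` and `role_counts` are hypothetical helpers that mirror the `node_id` branch in `main()` (see Full Source Code), where node 0 is the learner, node 1 the evaluator and every other node a collector.

```python
from collections import Counter


def role_for_node(node_id: int) -> str:
    """Hypothetical helper mirroring the node_id -> role branch in main()."""
    if node_id == 0:
        return "learner"
    if node_id == 1:
        return "evaluator"
    return "collector"


def role_counts(n_workers: int) -> Counter:
    """Count the roles of workers numbered 0 .. n_workers - 1."""
    return Counter(role_for_node(i) for i in range(n_workers))


# With N = 4 workers: 1 learner, 1 evaluator and N - 2 = 2 collectors.
counts = role_counts(4)
print(counts["learner"], counts["evaluator"], counts["collector"])
```

Because the role depends only on `node_id`, any node id of 2 or above becomes a collector, which is why extra collectors can be added without touching the code.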

First Example —— Execute on one machine with multiple processes.

Execute 4 processes: 1 learner, 1 evaluator and 2 collectors. Connect them with the mesh topology so that every process can exchange information with every other process.

ditask --package . --main ding.example.dqn.main --parallel-workers 4 --topology mesh
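To make "connected by mesh" concrete, the sketch below lists the worker pairs that a fully connected mesh implies for the 4-worker command. The helper `mesh_links` is hypothetical and only models the topology as a graph; it makes no claim about how `ditask` actually opens its sockets.

```python
from itertools import combinations
from typing import List, Tuple


def mesh_links(n_workers: int) -> List[Tuple[int, int]]:
    """In a full mesh, every pair of workers is directly connected."""
    return list(combinations(range(n_workers), 2))


links = mesh_links(4)
for a, b in links:
    print(f"worker {a} <-> worker {b}")
# A full mesh of n workers has n * (n - 1) / 2 links.
print(len(links), "links")
```

With 4 workers this gives 6 links, so the learner (node 0) and the evaluator (node 1) each reach both collectors (nodes 2 and 3) directly.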

Second Example —— Execute on multiple machines.

  1. Execute 1 learner + 1 evaluator on one machine.

ditask --package . --main ding.example.dqn.main --parallel-workers 2 --topology mesh --node-ids 0 --ports 50515

  2. Execute 2 collectors on another machine (suppose the IP of the first machine is 127.0.0.1). Here we use the alone topology instead of mesh because the collectors do not need to communicate with each other. The node_ids must not overlap with those of the learner and evaluator processes (0 and 1), and the ports must not conflict with any others. Also set the attach_to parameter; its value should be taken from the log of the processes started earlier (e.g. 'NNG listen on tcp://10.0.0.4:50515').

ditask --package . --main ding.example.dqn.main --parallel-workers 2 --topology alone --node-ids 2 --ports 50517 --attach-to tcp://10.0.0.4:50515,tcp://127.0.0.1:50516

  3. Repeat step 2 to start more collectors on other machines.
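The bookkeeping behind the two `ditask` commands above can be sketched as follows. Two points are assumptions, not documented behavior: a single `--node-ids` or `--ports` value is taken to expand into consecutive values, one per worker (the example's use of 50515 and 50516 for the first machine suggests this), and the log lines below are hypothetical, only following the format quoted above. The helpers `consecutive`, `check_no_overlap` and `attach_to_from_logs` are illustrative names.

```python
import re
from typing import Iterable, List

# Matches log lines such as "NNG listen on tcp://10.0.0.4:50515".
LISTEN_RE = re.compile(r"NNG listen on (tcp://[\w.\-]+:\d+)")


def consecutive(start: int, count: int) -> List[int]:
    """Assumption: a single start value expands to `count` consecutive values."""
    return list(range(start, start + count))


def check_no_overlap(*groups: List[int]) -> None:
    """Raise ValueError if any node id (or port) is reused across launches."""
    seen = set()
    for group in groups:
        clash = seen.intersection(group)
        if clash:
            raise ValueError(f"duplicated values: {sorted(clash)}")
        seen.update(group)


def attach_to_from_logs(lines: Iterable[str]) -> str:
    """Join the listen addresses printed by earlier workers into one --attach-to value."""
    addresses = []
    for line in lines:
        match = LISTEN_RE.search(line)
        if match:
            addresses.append(match.group(1))
    return ",".join(addresses)


# Step 1 (learner + evaluator) and step 2 (2 collectors).
first_ids, first_ports = consecutive(0, 2), consecutive(50515, 2)
collector_ids, collector_ports = consecutive(2, 2), consecutive(50517, 2)
check_no_overlap(first_ids, collector_ids)
check_no_overlap(first_ports, collector_ports)

# Hypothetical log lines captured from the first machine.
logs = [
    "NNG listen on tcp://10.0.0.4:50515",
    "NNG listen on tcp://10.0.0.4:50516",
]
print("--node-ids", collector_ids[0], "--ports", collector_ports[0],
      "--attach-to", attach_to_from_logs(logs))
```

In practice the addresses must come from whatever the first machine actually logs, so copying them from the log is safer than reconstructing them by hand.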

Full Source Code

../ding/example/dqn.py

```python
"""
# Example of DQN pipeline

Use the pipeline on a single process:

> python3 -u ding/example/dqn.py

Use the pipeline on multiple processes:

We suppose there are N processes (workers) = 1 learner + 1 evaluator + (N-2) collectors

## First Example —— Execute on one machine with multiple processes.

Execute 4 processes with 1 learner + 1 evaluator + 2 collectors
Remember to keep them connected by mesh to ensure that they can exchange information with each other.

> ditask --package . --main ding.example.dqn.main --parallel-workers 4 --topology mesh

## Second Example —— Execute on multiple machines.

1. Execute 1 learner + 1 evaluator on one machine.

> ditask --package . --main ding.example.dqn.main --parallel-workers 2 --topology mesh --node-ids 0 --ports 50515

2. Execute 2 collectors on another machine. (Suppose the ip of the first machine is 127.0.0.1).
    Here we use `alone` topology instead of `mesh` because the collectors do not need to communicate with each other.
    Remember the `node_ids` cannot be duplicated with the learner, evaluator processes.
    And remember to set the `ports` (should not conflict with others) and `attach_to` parameters.
    The value of the `attach_to` parameter should be obtained from the log of the
    process started earlier (e.g. 'NNG listen on tcp://10.0.0.4:50515').

> ditask --package . --main ding.example.dqn.main --parallel-workers 2 --topology alone --node-ids 2 \
    --ports 50517 --attach-to tcp://10.0.0.4:50515,tcp://127.0.0.1:50516

3. You can repeat step 2 to start more collectors on other machines.
"""
import gym
from ditk import logging
from ding.data.model_loader import FileModelLoader
from ding.data.storage_loader import FileStorageLoader
from ding.model import DQN
from ding.policy import DQNPolicy
from ding.envs import DingEnvWrapper, BaseEnvManagerV2
from ding.data import DequeBuffer
from ding.config import compile_config
from ding.framework import task, ding_init
from ding.framework.context import OnlineRLContext
from ding.framework.middleware import OffPolicyLearner, StepCollector, interaction_evaluator, data_pusher, \
    eps_greedy_handler, CkptSaver, ContextExchanger, ModelExchanger, online_logger
from ding.utils import set_pkg_seed
from dizoo.classic_control.cartpole.config.cartpole_dqn_config import main_config, create_config


def main():
    logging.getLogger().setLevel(logging.INFO)
    cfg = compile_config(main_config, create_cfg=create_config, auto=True, save_cfg=task.router.node_id == 0)
    ding_init(cfg)
    with task.start(async_mode=False, ctx=OnlineRLContext()):
        collector_env = BaseEnvManagerV2(
            env_fn=[lambda: DingEnvWrapper(gym.make("CartPole-v0")) for _ in range(cfg.env.collector_env_num)],
            cfg=cfg.env.manager
        )
        evaluator_env = BaseEnvManagerV2(
            env_fn=[lambda: DingEnvWrapper(gym.make("CartPole-v0")) for _ in range(cfg.env.evaluator_env_num)],
            cfg=cfg.env.manager
        )

        set_pkg_seed(cfg.seed, use_cuda=cfg.policy.cuda)

        model = DQN(**cfg.policy.model)
        buffer_ = DequeBuffer(size=cfg.policy.other.replay_buffer.replay_buffer_size)
        policy = DQNPolicy(cfg.policy, model=model)

        # Consider the case with multiple processes
        if task.router.is_active:
            # You can use labels to distinguish between workers with different roles,
            # here we use node_id to distinguish.
            if task.router.node_id == 0:
                task.add_role(task.role.LEARNER)
            elif task.router.node_id == 1:
                task.add_role(task.role.EVALUATOR)
            else:
                task.add_role(task.role.COLLECTOR)

            # Sync their context and model between each worker.
            task.use(ContextExchanger(skip_n_iter=1))
            task.use(ModelExchanger(model))

        # Here is the part of single process pipeline.
        task.use(interaction_evaluator(cfg, policy.eval_mode, evaluator_env))
        task.use(eps_greedy_handler(cfg))
        task.use(StepCollector(cfg, policy.collect_mode, collector_env))
        task.use(data_pusher(cfg, buffer_))
        task.use(OffPolicyLearner(cfg, policy.learn_mode, buffer_))
        task.use(online_logger(train_show_freq=10))
        task.use(CkptSaver(policy, cfg.exp_name, train_freq=100))

        task.run()


if __name__ == "__main__":
    main()
```
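The pipeline in `main()` is a chain of middleware registered with `task.use` and executed in order by `task.run()`. The toy model below is only a sketch of that idea, not DI-engine's implementation. `ToyTask`, the role strings and the gating rule are assumptions made for illustration: a middleware tagged with a role runs only on workers holding that role, and untagged middleware runs everywhere. Which of the real middleware would be tagged is also an assumption.

```python
from typing import Callable, Dict, List, Optional, Set

Middleware = Callable[[Dict], None]


class ToyTask:
    """A minimal stand-in for a middleware task loop (hypothetical, not ding's `task`)."""

    def __init__(self, roles: Set[str]) -> None:
        self.roles = roles
        self.middleware: List[Middleware] = []

    def use(self, fn: Middleware, role: Optional[str] = None) -> None:
        # Assumption: middleware meant for another role is skipped on this worker.
        if role is None or role in self.roles:
            self.middleware.append(fn)

    def run(self, max_step: int) -> Dict:
        ctx: Dict = {"trace": []}
        for step in range(max_step):
            ctx["step"] = step
            # Each iteration calls every registered middleware in order on a shared context.
            for fn in self.middleware:
                fn(ctx)
        return ctx


def named(name: str) -> Middleware:
    """Build a middleware that only records that it ran at the current step."""
    def fn(ctx: Dict) -> None:
        ctx["trace"].append((ctx["step"], name))
    return fn


def build(roles: Set[str]) -> ToyTask:
    toy = ToyTask(roles)
    # Same order as main(): evaluate, explore, collect, push data, learn.
    toy.use(named("evaluator"), role="evaluator")
    toy.use(named("eps_greedy"))
    toy.use(named("collector"), role="collector")
    toy.use(named("data_pusher"))
    toy.use(named("learner"), role="learner")
    return toy


single_process = build({"learner", "evaluator", "collector"}).run(max_step=1)
print([name for _, name in single_process["trace"]])
collector_worker = build({"collector"}).run(max_step=1)
print([name for _, name in collector_worker["trace"]])
```

In a single process the worker holds every role, so the whole chain runs each iteration; a collector-only worker skips the evaluator and learner steps and keeps the rest.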