Skip to content

ding.framework.message_queue.nng

ding.framework.message_queue.nng

NNGMQ

Bases: MQ

__init__(listen_to, attach_to=None, **kwargs)

Overview

Connect distributed processes with nng

Arguments: - listen_to (:obj:str): The node address this process listens on. - attach_to (:obj:Optional[List[str]]): The node's addresses you want to attach to.

Full Source Code

../ding/framework/message_queue/nng.py

import pynng
from ditk import logging
from typing import List, Optional, Tuple
from pynng import Bus0
from time import sleep

from ding.framework.message_queue.mq import MQ
from ding.utils import MQ_REGISTRY


@MQ_REGISTRY.register("nng")
class NNGMQ(MQ):
    """
    Overview:
        Message queue that connects distributed processes through an nng ``Bus0`` socket.
        Every message on the wire is ``<topic>::<payload>``, so the topic can be read
        without deserializing the payload.
    """

    def __init__(self, listen_to: str, attach_to: Optional[List[str]] = None, **kwargs) -> None:
        """
        Overview:
            Connect distributed processes with nng
        Arguments:
            - listen_to (:obj:`str`): The node address this process listens on.
            - attach_to (:obj:`Optional[List[str]]`): The node's addresses you want to attach to.
        """
        self.listen_to = listen_to
        self.attach_to = attach_to or []
        # The socket is created lazily in ``listen`` and reset to None in ``stop``.
        self._sock: Optional[Bus0] = None
        self._running = False

    def listen(self) -> None:
        """
        Overview:
            Create the bus socket, listen on ``listen_to`` and dial every address in
            ``attach_to``, then mark the queue as running.
        """
        self._sock = sock = Bus0()
        sock.listen(self.listen_to)
        sleep(0.1)  # Wait for peers to bind
        for contact in self.attach_to:
            sock.dial(contact)
        logging.info("NNG listen on {}, attach to {}".format(self.listen_to, self.attach_to))
        self._running = True

    def publish(self, topic: str, data: bytes) -> None:
        """
        Overview:
            Send ``data`` on the bus, prefixed with ``topic`` and the ``::`` separator.
            Does nothing when the queue is not running.
        Arguments:
            - topic (:obj:`str`): Topic name placed at the beginning of the message.
            - data (:obj:`bytes`): Payload bytes.
        """
        if self._running:
            topic += "::"
            data = topic.encode() + data
            self._sock.send(data)

    def subscribe(self, topic: str) -> None:
        """
        Overview:
            No-op in this implementation.
        """
        return

    def unsubscribe(self, topic: str) -> None:
        """
        Overview:
            No-op in this implementation.
        """
        return

    def recv(self) -> Optional[Tuple[str, bytes]]:
        """
        Overview:
            Block until a message arrives and split it into topic and payload.
        Returns:
            - msg (:obj:`Optional[Tuple[str, bytes]]`): ``(topic, payload)`` of the received \
                message, or ``None`` once the queue has been stopped.
        """
        while True:
            try:
                if not self._running:
                    break
                msg = self._sock.recv()
                # Use topic at the beginning of the message, so we don't need to call pickle.loads
                # when the current process is not subscribed to the topic.
                topic, payload = msg.split(b"::", maxsplit=1)
                return topic.decode(), payload
            except pynng.Timeout:
                logging.warning("Timeout on node {} when waiting for message from bus".format(self.listen_to))
            except pynng.Closed:
                # A closed socket is expected after ``stop``; the next iteration then leaves the loop.
                # NOTE(review): if the socket is closed while still running, this branch will
                # presumably be hit on every iteration - confirm whether that is intended.
                if self._running:
                    logging.error("The socket was not closed under normal circumstances!")
            except Exception as e:
                # Format the exception into the message itself: passing ``e`` as an extra
                # positional argument without a placeholder makes the log call fail to format.
                logging.error("Meet exception when listening for new messages: {}".format(e))
        return None

    def stop(self) -> None:
        """
        Overview:
            Close the socket and mark the queue as stopped. Does nothing if it is not running.
        """
        if self._running:
            # Clear the flag first so ``recv`` and ``publish`` see the stop before the socket closes.
            self._running = False
            self._sock.close()
            self._sock = None

    def __del__(self) -> None:
        self.stop()