Skip to content

ding.framework.message_queue.redis

ding.framework.message_queue.redis

RedisMQ

Bases: MQ

__init__(redis_host, redis_port, **kwargs)

Overview

Connect distributed processes with Redis.

Arguments:

- redis_host (`str`): Redis server host.
- redis_port (`int`): Redis server port.

Full Source Code

../ding/framework/message_queue/redis.py

import uuid
from ditk import logging
from time import sleep
from typing import Tuple

import redis
from ding.framework.message_queue.mq import MQ
from ding.utils import MQ_REGISTRY


@MQ_REGISTRY.register("redis")
class RedisMQ(MQ):
    """
    Overview:
        Message queue backed by Redis publish/subscribe. Every published payload is prefixed \
        with this instance's id so that ``recv`` can drop messages the instance sent itself.
    """

    def __init__(self, redis_host: str, redis_port: int, **kwargs) -> None:
        """
        Overview:
            Connect distributed processes with redis
        Arguments:
            - redis_host (:obj:`str`): Redis server host.
            - redis_port (:obj:`int`): Redis server port. Non-int values are converted with ``int()``.
        """
        # Set the running flag before anything that can raise (e.g. ``int(redis_port)``),
        # so that ``__del__`` never sees a half-initialised object.
        self._running = False
        self.host = redis_host
        self.port = redis_port if isinstance(redis_port, int) else int(redis_port)
        self.db = 0
        # 32 hex characters, encoded to bytes; used as the sender tag on every message.
        self._id = uuid.uuid4().hex.encode()

    def listen(self) -> None:
        """
        Overview:
            Create the redis client and its pubsub object, then mark the queue as running.
        """
        self._client = client = redis.Redis(host=self.host, port=self.port, db=self.db)
        self._sub = client.pubsub()
        self._running = True

    def publish(self, topic: str, data: bytes) -> None:
        """
        Overview:
            Publish ``data`` on ``topic``, prefixed with ``<id>::`` so receivers can identify the sender.
        Arguments:
            - topic (:obj:`str`): Channel name.
            - data (:obj:`bytes`): Payload.
        """
        data = self._id + b"::" + data
        self._client.publish(topic, data)

    def subscribe(self, topic: str) -> None:
        """
        Overview:
            Subscribe to ``topic``.
        Arguments:
            - topic (:obj:`str`): Channel name.
        """
        self._sub.subscribe(topic)

    def unsubscribe(self, topic: str) -> None:
        """
        Overview:
            Unsubscribe from ``topic``.
        Arguments:
            - topic (:obj:`str`): Channel name.
        """
        self._sub.unsubscribe(topic)

    def recv(self) -> Tuple[str, bytes]:
        """
        Overview:
            Poll until a valid message from another sender arrives, then return it.
        Returns:
            - result (:obj:`Tuple[str, bytes]`): The topic and the payload with the sender prefix removed.
        Raises:
            - RuntimeError: If the queue is not running (``listen`` not called, or ``stop`` called).
        """
        while True:
            if not self._running:
                raise RuntimeError("Redis MQ was not running!")
            try:
                msg = self._sub.get_message(ignore_subscribe_messages=True)
                if msg is None:
                    # Nothing pending; back off briefly before polling again.
                    sleep(0.001)
                    continue
                topic = msg["channel"].decode()
                # Expected wire format: <32-byte sender id>::<payload>
                data = msg["data"].split(b"::", maxsplit=1)
                if len(data) != 2 or len(data[0]) != 32:
                    logging.warn("Got invalid message from topic: {}".format(topic))
                    continue
                node_id, data = data
                if node_id == self._id:  # Discard message sent by self
                    continue
                return topic, data
            except Exception as e:
                # Best-effort: log and keep polling. The exception is formatted into the
                # message; passing it as a stray positional argument would break %-formatting
                # in the logging call and lose the error text.
                logging.error("Meet exception when listening for new messages: {}".format(e))

    def stop(self) -> None:
        """
        Overview:
            Close the pubsub object and the client if the queue is running; otherwise do nothing.
        """
        if self._running:
            self._running = False
            self._sub.close()
            self._client.close()

    def __del__(self) -> None:
        # ``getattr`` guards against an instance whose ``__init__`` never completed.
        if getattr(self, "_running", False):
            self.stop()