python - Broadcasting message to all clients using Pika + sockjs-tornado -
i'm new realtime apps based on websockets, , stucked @ 1 point. app has following components:
- simple python script fired when generic event occurs. receives data , sends queue (rabbitmq) using pika.
- tornado app (with sockjs-tornado) receiving message queue (pika asynchronous client), processing content, saving new app state database , broadcastig data clients (sockjs clients). communication clients in 1 direction - connect server , receive data.
the problem can't figure out how pass data received queue clients. i've done pub/sub exchange, when user connects server, new connection established rabbitmq every user, that's not want. below i've got far.
common/pika_client.py:
import logging import pika pika.adapters.tornado_connection import tornadoconnection class pikaclient(object): def __init__(self, exchange, host='localhost', port=5672, vhost=''): # default values self.connected = false self.connecting = false self.connection = none self.channel = none self.host = host self.port = port self.vhost = vhost self.exchange = exchange # preparing logger self.log = logging.getlogger(__name__) self.set_log_level() def set_log_level(self, log_level=logging.warning): self.log.setlevel(log_level) def connect(self): self.log.info("connecting") if self.connecting: self.log.info('%s: connecting rabbitmq' % self.__class__.__name__) return self.log.info('%s: connecting rabbitmq on localhost:5672' % self.__class__.__name__) self.connecting = true param = pika.connectionparameters( host=self.host, port=self.port, virtual_host=self.vhost ) self.connection = tornadoconnection(param, on_open_callback=self.on_connected) self.connection.add_on_close_callback(self.on_closed) def on_connected(self, connection): self.log.info('%s: connected rabbitmq on localhost:5672' % self.__class__.__name__) self.connected = true self.connection = connection self.connection.channel(self.on_channel_open) def on_channel_open(self, channel): self.log.info('%s: channel open, declaring exchange %s' % (self.__class__.__name__, self.exchange)) self.channel = channel self.channel.exchange_declare( exchange=self.exchange, type="fanout", callback=self.on_exchange_declared ) def on_exchange_declared(self, frame): self.log.info('%s: exchange declared, declaring queue' % (self.__class__.__name__)) self.channel.queue_declare(exclusive=true, callback=self.on_queue_declared) def on_queue_declared(self, frame): self.log.info('%s: queue declared, binding queue %s' % (self.__class__.__name__, frame.method.queue)) self.queue_name = frame.method.queue self.channel.queue_bind( exchange=self.exchange, queue=frame.method.queue, callback=self.on_queue_bound ) def on_queue_bound(self, frame): self.log.info('%s: queue bound. receive messages implement \ on_queue_bound method' % self.__class__.__name__) def on_closed(self, connection): self.log.info('%s: connection closed' % self.__class__.__name__) self.connected = false self.connection = none self.connecting = false self.channel = none self.connection = self.connect() def add_message_handler(self, handler): self.message_handler = handler
tracker.py
from sockjs.tornado import sockjsconnection import settings common.pika_client import pikaclient class queuereceiver(pikaclient): """receives messages rabbitmq """ def on_queue_bound(self, frame): self.log.info('consuming on queue %s' % self.queue_name) self.channel.basic_consume(consumer_callback=self.message_handler, queue=self.queue_name ) class trackerconnection(sockjsconnection): def on_open(self, info): self.queue = queuereceiver('clt') self.queue.add_message_handler(self.on_queue_message) self.queue.set_log_level(settings.log_level) self.queue.connect() def on_queue_message(self, channel, method, header, body): self.send(body) self.queue.channel.basic_ack(delivery_tag=method.delivery_tag)
it works, mentioned, i'd have 1 connection queue, receive messages, stuff , broadcast results clients using broadcast() method. in advance help.
Comments
Post a Comment