python - Broadcasting message to all clients using Pika + sockjs-tornado -


i'm new realtime apps based on websockets, , stucked @ 1 point. app has following components:

  1. simple python script fired when generic event occurs. receives data , sends queue (rabbitmq) using pika.
  2. tornado app (with sockjs-tornado) receiving message queue (pika asynchronous client), processing content, saving new app state database , broadcastig data clients (sockjs clients). communication clients in 1 direction - connect server , receive data.

the problem can't figure out how pass data received queue clients. i've done pub/sub exchange, when user connects server, new connection established rabbitmq every user, that's not want. below i've got far.

common/pika_client.py:

import logging  import pika pika.adapters.tornado_connection import tornadoconnection   class pikaclient(object):      def __init__(self, exchange, host='localhost', port=5672, vhost=''):         # default values         self.connected = false         self.connecting = false         self.connection = none         self.channel = none         self.host = host         self.port = port         self.vhost = vhost         self.exchange = exchange          # preparing logger         self.log = logging.getlogger(__name__)         self.set_log_level()      def set_log_level(self, log_level=logging.warning):         self.log.setlevel(log_level)      def connect(self):         self.log.info("connecting")         if self.connecting:             self.log.info('%s: connecting rabbitmq' %                 self.__class__.__name__)             return          self.log.info('%s: connecting rabbitmq on localhost:5672' %             self.__class__.__name__)         self.connecting = true          param = pika.connectionparameters(             host=self.host,             port=self.port,             virtual_host=self.vhost         )          self.connection = tornadoconnection(param,             on_open_callback=self.on_connected)         self.connection.add_on_close_callback(self.on_closed)      def on_connected(self, connection):         self.log.info('%s: connected rabbitmq on localhost:5672' %             self.__class__.__name__)         self.connected = true         self.connection = connection         self.connection.channel(self.on_channel_open)      def on_channel_open(self, channel):         self.log.info('%s: channel open, declaring exchange %s' %             (self.__class__.__name__, self.exchange))         self.channel = channel         self.channel.exchange_declare(             exchange=self.exchange,             type="fanout",             callback=self.on_exchange_declared         )      def on_exchange_declared(self, frame):         self.log.info('%s: exchange declared, declaring queue' %             (self.__class__.__name__))         self.channel.queue_declare(exclusive=true,             callback=self.on_queue_declared)      def on_queue_declared(self, frame):         self.log.info('%s: queue declared, binding queue %s' %             (self.__class__.__name__, frame.method.queue))         self.queue_name = frame.method.queue         self.channel.queue_bind(             exchange=self.exchange,             queue=frame.method.queue,             callback=self.on_queue_bound         )      def on_queue_bound(self, frame):         self.log.info('%s: queue bound. receive messages implement \                        on_queue_bound method' % self.__class__.__name__)      def on_closed(self, connection):         self.log.info('%s: connection closed' % self.__class__.__name__)         self.connected = false         self.connection = none         self.connecting = false         self.channel = none         self.connection = self.connect()      def add_message_handler(self, handler):         self.message_handler = handler 

tracker.py

from sockjs.tornado import sockjsconnection  import settings common.pika_client import pikaclient   class queuereceiver(pikaclient):     """receives messages rabbitmq """      def on_queue_bound(self, frame):         self.log.info('consuming on queue %s' % self.queue_name)         self.channel.basic_consume(consumer_callback=self.message_handler,             queue=self.queue_name         )   class trackerconnection(sockjsconnection):      def on_open(self, info):         self.queue = queuereceiver('clt')         self.queue.add_message_handler(self.on_queue_message)         self.queue.set_log_level(settings.log_level)         self.queue.connect()      def on_queue_message(self, channel, method, header, body):         self.send(body)         self.queue.channel.basic_ack(delivery_tag=method.delivery_tag) 

it works, mentioned, i'd have 1 connection queue, receive messages, stuff , broadcast results clients using broadcast() method. in advance help.


Comments

Popular posts from this blog

Perl - how to grep a block of text from a file -

delphi - How to remove all the grips on a coolbar if I have several coolbands? -

javascript - Animating array of divs; only the final element is modified -