Source code for thriftpool.components.broker
"""Execute external commands by this worker."""
from __future__ import absolute_import
import logging
from pyuv import Pipe
from gaffer.events import EventEmitter
from thriftworker.utils.decorators import cached_property
from thriftworker.utils.loop import in_loop
from thriftworker.utils.mixin import LoopMixin
from thriftpool.components.base import StartStopComponent
from thriftpool.rpc.transport import Consumer
from thriftpool.utils.mixin import LogsMixin
logger = logging.getLogger(__name__)
[docs]class Stream(object):
"""Wrapper for plain file descriptor."""
def __init__(self, loop, fd):
self.loop = loop
self.channel = Pipe(loop)
self.fd = fd
self._emitter = EventEmitter(loop)
[docs] def start(self):
self._emitter.subscribe("WRITE", self._on_write)
self.channel.open(self.fd)
self.channel.start_read(self._on_read)
[docs] def write(self, data):
self._emitter.publish("WRITE", data)
[docs] def subscribe(self, listener):
self._emitter.subscribe('READ', listener)
[docs] def unsubscribe(self, listener):
self._emitter.unsubscribe('READ', listener)
[docs] def stop(self, all_events=False):
if self.channel.active:
self.channel.close()
if all_events:
self._emitter.close()
def _on_write(self, evtype, data):
self.channel.write(data)
def _on_read(self, handle, data, error):
if not data:
return
msg = dict(event='READ', data=data)
self._emitter.publish('READ', msg)
[docs]class PerspectiveBroker(LogsMixin, LoopMixin):
"""Execute commands provided through pipe."""
def __init__(self, app, controller):
self.app = app
self.controller = controller
super(PerspectiveBroker, self).__init__()
@cached_property
[docs] def incoming_stream(self):
return Stream(self.loop, self.controller.incoming_fd)
@cached_property
[docs] def outgoing_stream(self):
return Stream(self.loop, self.controller.outgoing_fd)
@property
[docs] def streams(self):
return [self.incoming_stream,
self.outgoing_stream]
@cached_property
[docs] def consumer(self):
return Consumer(self.loop,
incoming=self.incoming_stream,
outgoing=self.outgoing_stream,
handler=self.controller)
@in_loop
[docs] def start(self):
for stream in self.streams:
stream.start()
self.consumer.start()
@in_loop
[docs] def stop(self):
self.consumer.stop()
for stream in self.streams:
stream.stop(all_events=True)
[docs]class PerspectiveBrokerComponent(StartStopComponent):
name = 'worker.broker'
requires = ('loop', 'acceptors', 'worker')
[docs] def create(self, parent):
return PerspectiveBroker(parent.app, parent)