Source code for thriftpool.components.broker

"""Execute external commands by this worker."""
from __future__ import absolute_import

import logging

from pyuv import Pipe
from gaffer.events import EventEmitter

from thriftworker.utils.decorators import cached_property
from thriftworker.utils.loop import in_loop
from thriftworker.utils.mixin import LoopMixin

from thriftpool.components.base import StartStopComponent
from thriftpool.rpc.transport import Consumer
from thriftpool.utils.mixin import LogsMixin

logger = logging.getLogger(__name__)


[docs]class Stream(object): """Wrapper for plain file descriptor.""" def __init__(self, loop, fd): self.loop = loop self.channel = Pipe(loop) self.fd = fd self._emitter = EventEmitter(loop)
[docs] def start(self): self._emitter.subscribe("WRITE", self._on_write) self.channel.open(self.fd) self.channel.start_read(self._on_read)
[docs] def write(self, data): self._emitter.publish("WRITE", data)
[docs] def subscribe(self, listener): self._emitter.subscribe('READ', listener)
[docs] def unsubscribe(self, listener): self._emitter.unsubscribe('READ', listener)
[docs] def stop(self, all_events=False): if self.channel.active: self.channel.close() if all_events: self._emitter.close()
def _on_write(self, evtype, data): self.channel.write(data) def _on_read(self, handle, data, error): if not data: return msg = dict(event='READ', data=data) self._emitter.publish('READ', msg)
[docs]class PerspectiveBroker(LogsMixin, LoopMixin): """Execute commands provided through pipe.""" def __init__(self, app, controller): self.app = app self.controller = controller super(PerspectiveBroker, self).__init__() @cached_property
[docs] def incoming_stream(self): return Stream(self.loop, self.controller.incoming_fd)
@cached_property
[docs] def outgoing_stream(self): return Stream(self.loop, self.controller.outgoing_fd)
@property
[docs] def streams(self): return [self.incoming_stream, self.outgoing_stream]
@cached_property
[docs] def consumer(self): return Consumer(self.loop, incoming=self.incoming_stream, outgoing=self.outgoing_stream, handler=self.controller)
@in_loop
[docs] def start(self): for stream in self.streams: stream.start() self.consumer.start()
@in_loop
[docs] def stop(self): self.consumer.stop() for stream in self.streams: stream.stop(all_events=True)
[docs]class PerspectiveBrokerComponent(StartStopComponent): name = 'worker.broker' requires = ('loop', 'acceptors', 'worker')
[docs] def create(self, parent): return PerspectiveBroker(parent.app, parent)

Project Versions

This Page