Source code for thriftpool.components.processes

"""Manage process pool."""
from __future__ import absolute_import

import json
import logging
import sys
import os

from gaffer.http.util import CorsHandler
from gaffer.http_handler import HttpHandler, HttpEndpoint
from pyuv import Pipe
from six import iteritems

from thriftworker.utils.loop import in_loop
from thriftworker.utils.decorators import cached_property
from thriftworker.utils.mixin import LoopMixin

from thriftpool.app import current_app
from thriftpool.exceptions import SystemTerminate
from thriftpool.utils.mixin import LogsMixin
from thriftpool.utils.serializers import StreamSerializer
from thriftpool.rpc.manager import Clients

from .base import StartStopComponent

logger = logging.getLogger(__name__)


class BaseHandler(CorsHandler):
    """Base for HTTP handlers that expose worker RPC clients."""

    def initialize(self, clients):
        # Mapping of process id -> RPC client, injected via handler options.
        self._clients = clients

    def _execute(self, *args, **kwargs):
        # Run request processing inside the application hub so handlers
        # may block on client calls without stalling the event loop.
        parent_execute = super(BaseHandler, self)._execute
        current_app.hub.spawn(parent_execute, *args, **kwargs)
class ClientsHandler(BaseHandler):
    """Return the identifiers of all known worker processes as JSON."""

    def get(self):
        self.preflight()
        self.set_status(200)
        # ``dict.keys()`` returns a view on Python 3, which ``json.dumps``
        # cannot serialize; materialize it as a list first (the file uses
        # ``six``, so py2/py3 compatibility is intended).
        self.write(json.dumps(list(self._clients.keys())))
class BaseClientHandler(BaseHandler):
    """Base for handlers that query one worker process selected by pid."""

    def get_data(self, proxy):
        """Collect the payload from the given worker proxy (subclasses only)."""
        raise NotImplementedError('subclass responsibility')

    def get(self, *args):
        self.preflight()
        try:
            pid = int(args[0])
        except ValueError:
            # Pid component of the URL is not an integer.
            self.set_status(400)
            self.write({"error": "bad_value"})
            return
        if pid not in self._clients:
            # No such worker registered.
            self.set_status(404)
            return
        self.set_status(200)
        client = self._clients[pid]
        payload = client.spawn(self.get_data).get()
        self.write(json.dumps(payload))
class CounterHandler(BaseClientHandler):
    """Expose per-method call counters of a single worker."""

    def get_data(self, proxy):
        # Flatten (service, method) keys into dotted names.
        counters = proxy.get_counters()
        return dict(('{0}.{1}'.format(service, method), value)
                    for (service, method), value in counters.items())
class TimerHandler(BaseClientHandler):
    """Expose per-method timing data of a single worker."""

    def get_data(self, proxy):
        # Flatten (service, method) keys into dotted names.
        timers = proxy.get_timers()
        return dict(('{0}.{1}'.format(service, method), value)
                    for (service, method), value in timers.items())
class StackHandler(BaseClientHandler):
    """Expose the currently executing calls of a single worker."""

    def get_data(self, proxy):
        # For each thread ident, describe every in-flight call with a
        # dotted method name and repr()'d arguments.
        result = {}
        for ident, calls in proxy.get_stack().items():
            result[ident] = [
                {'method': '{0}.{1}'.format(service, method),
                 'args': repr(args),
                 'kwargs': repr(kwargs)}
                for (service, method, args, kwargs) in calls
            ]
        return result
class RedirectStream(object):
    """Try to write to stream asynchronous.

    Wraps *stream* in a :class:`pyuv.Pipe` opened on its file descriptor
    so writes go through the loop; if the stream exposes no descriptor,
    writes fall back to the stream's own blocking ``write``.
    """

    def __init__(self, loop, stream):
        self.fd = None
        self.stream = stream
        try:
            descriptor = self.fd = stream.fileno()
        except AttributeError:
            # No underlying descriptor available; use blocking writes only.
            self.channel = None
        else:
            pipe = self.channel = Pipe(loop)
            # Mark the pipe so other parts of the system can skip it.
            setattr(pipe, 'bypass', True)
            pipe.open(descriptor)

    def write(self, data):
        """Write data in asynchronous way."""
        if self.channel is None or self.channel.closed:
            self.stream.write(data)
        else:
            self.channel.write(data)
class ProcessManager(LogsMixin, LoopMixin):
    """Start and manage workers.

    Spawns worker processes through the gaffer manager, performs an
    application handshake with each one, registers acceptors and tracks
    which workers finished bootstrapping.
    """

    #: Name under which worker processes are registered in gaffer.
    process_name = 'worker'
    #: Template for the worker process title ({0}=process id, {1}=config).
    name_template = '[thriftworker-{0}]' \
                    ' -c {1.CONCURRENCY}' \
                    ' -k {1.WORKER_TYPE}'
    #: Prelude executed before the worker script for gevent workers.
    gevent_monkey = 'from gevent.monkey import patch_all; patch_all();'
    #: Python one-liner that starts the worker.
    script = 'from thriftpool.bin.thriftworker import main; main();'

    def __init__(self, app, listeners, controller):
        self.clients = Clients()
        self.serializers = StreamSerializer()
        self.app = app
        self.listeners = listeners
        self.controller = controller
        # Maps process id -> loop timestamp at which bootstrap completed.
        self._bootstrapped = {}
        self._aborted = False
        super(ProcessManager, self).__init__()

    def __iter__(self):
        # Iterate over ids of bootstrapped processes.
        return iter(self._bootstrapped)

    def get_start_time(self, process_id):
        """When process was registered?

        Returns the loop timestamp of bootstrap completion, or ``None``
        if the process never finished bootstrapping.
        """
        return self._bootstrapped.get(process_id)

    @property
    def manager(self):
        # The shared gaffer process manager owned by the application.
        return self.app.gaffer_manager

    @cached_property
    def _is_started(self):
        # Event set once all workers have bootstrapped (or on abort).
        return self.app.env.RealEvent()

    @cached_property
    def _is_stopped(self):
        # Event set once the gaffer manager has fully stopped.
        return self.app.env.RealEvent()

    @cached_property
    def _stdout(self):
        return RedirectStream(self.loop, sys.stdout)

    @cached_property
    def _stderr(self):
        return RedirectStream(self.loop, sys.stderr)

    @property
    def initialized(self):
        """All workers started or not?"""
        state = self.manager.get_process(self.process_name)
        return len(self._bootstrapped) >= state.numprocesses

    def _bootstrap_process(self, proxy, process):
        """Configure a freshly handshaken worker through its RPC proxy."""
        # Change name of process.
        name = self.name_template.format(process.id, self.app.config)
        proxy.change_title(name)
        # Register acceptors in remote process.
        proxy.register_acceptors({i: listener.name for i, listener
                                  in iteritems(self.listeners.enumerated)})
        for listener in self.listeners:
            if listener.started:
                proxy.start_acceptor(listener.name)
        # Notify about process initialization.
        self._bootstrapped[process.id] = self.loop.now()
        self._info('Worker %d initialized.', process.id)
        if self.initialized:
            self._info('Workers initialization done.')
            self._is_started.set()

    def _do_handshake(self, process):
        """Send the serialized application to the worker and await its reply."""
        # Pass application to created process.
        stream = process.streams['handshake']
        stream.write(self.serializers.encode_with_length(self.app))

        def handshake_done(evtype, info):
            stream.unsubscribe(handshake_done)
            # Process exited and we do the same.
            if not process.active:
                return
            self.clients.register(process, self._bootstrap_process)

        # Wait for worker answer.
        stream.subscribe(handshake_done)

    def _redirect_io(self, process):
        """Redirect stdout & stderr."""
        monitor_io = (lambda evtype, msg:
                      (evtype == 'err' and self._stderr or self._stdout)
                      .write(msg['data']))
        process.monitor_io('.', monitor_io)

    def _on_event(self, evtype, msg):
        """Handle process events."""
        if msg['name'] != self.process_name:
            # Not our process.
            return
        if evtype == 'exit':
            # Log exit event.
            pid, term_signal, exit_status = \
                msg['pid'], msg['term_signal'], msg['exit_status']
            log = (exit_status != 0 or term_signal not in (0, 15)) \
                and self._critical or self._info
            log('Worker %d exited with term signal %d and exit status %d.',
                pid, term_signal, exit_status)
        elif evtype == 'spawn':
            # Log spawn event.
            self._info('Worker %d spawned with pid %d.',
                       msg['pid'], msg['os_pid'])
        if evtype == 'spawn' and self.controller.is_running:
            # New process spawned, handle event.
            process = self.manager.get_process(msg['pid'])
            self._redirect_io(process)
            self._do_handshake(process)
        elif evtype == 'exit':
            # Process exited, handle event.  A worker that died before
            # completing its handshake was never added to
            # ``_bootstrapped``, so pop with a default to avoid KeyError.
            self._bootstrapped.pop(msg['pid'], None)
            self.clients.unregister(msg['pid'])

    def _create_proc_kwargs(self):
        """Create arguments for worker."""
        config = self.app.config
        worker_type = config.WORKER_TYPE
        if worker_type == 'gevent':
            startup_line = '{0} {1}'.format(self.gevent_monkey, self.script)
        elif worker_type == 'sync':
            startup_line = self.script
        else:
            raise NotImplementedError()
        return dict(cmd=sys.executable,
                    args=['-c', '{0}'.format(startup_line)],
                    redirect_output=['out', 'err'],
                    custom_streams=['handshake', 'incoming', 'outgoing'],
                    custom_channels=self.listeners.channels,
                    env=dict(os.environ, IS_WORKER='1'),
                    numprocesses=config.WORKERS,
                    redirect_input=True,
                    graceful_timeout=config.PROCESS_STOP_TIMEOUT)

    def _create_apps(self):
        """Create applications for gaffer."""
        apps = []
        options = dict(clients=self.clients)
        handlers = [
            (r'/timers', ClientsHandler, options),
            (r'/timers/([0-9^/]+)', TimerHandler, options),
            (r'/counters', ClientsHandler, options),
            (r'/counters/([0-9^/]+)', CounterHandler, options),
            (r'/stack', ClientsHandler, options),
            (r'/stack/([0-9^/]+)', StackHandler, options),
        ]
        endpoints = self.app.config.TORNADO_ENDPOINTS
        if endpoints:
            apps.append(HttpHandler(
                handlers=handlers,
                log_function=self.app.log.log_tornado_request,
                endpoints=[HttpEndpoint(uri=uri) for uri in endpoints]))
        return apps

    @in_loop
    def _setup(self):
        """Register the worker template and start the gaffer manager."""
        manager = self.manager
        manager.subscribe('.', self._on_event)
        manager.add_process(self.process_name, **self._create_proc_kwargs())
        manager.start(apps=self._create_apps())

    def start(self):
        """Start workers and wait until they all bootstrap.

        Raises :class:`SystemTerminate` on timeout or abort.
        """
        self._setup()
        self._is_started.wait(self.app.config.PROCESS_START_TIMEOUT)
        if not self._is_started.is_set() or self._aborted:
            if not self._aborted:
                self._error('Timeout when starting processes.')
            self._teardown()
            raise SystemTerminate()

    def abort(self):
        """Abort startup: release any waiter in :meth:`start`."""
        self._aborted = True
        self._is_started.set()

    @in_loop
    def _teardown(self):
        self.clients.clear()
        stop_callback = lambda *args: self._is_stopped.set()
        self.manager.stop(stop_callback)

    def stop(self):
        """Stop all workers; raise :class:`SystemTerminate` on timeout."""
        self._teardown()
        self._is_stopped.wait(self.app.config.PROCESS_STOP_TIMEOUT * 2)
        if not self._is_stopped.is_set():
            self._error('Timeout when terminating processes.')
            raise SystemTerminate()
class ProcessManagerComponent(StartStopComponent):
    """Controller component that owns the worker process manager."""

    name = 'manager.process_manager'
    requires = ('loop', 'listeners')

    def create(self, parent):
        # Build the manager, expose it on the controller and hand it back
        # so the component machinery can start/stop it.
        manager = ProcessManager(parent.app, parent.listeners, parent)
        parent.processes = manager
        return manager

Project Versions

This Page