Source code for thriftpool.controllers.base

# -*- coding: utf-8 -*-
"""Implements controllers.

This file was copied and adapted from celery.

:copyright: (c) 2009 - 2012 by Ask Solem.
:license: BSD, see LICENSE for more details.

"""
from __future__ import absolute_import

import uuid
from logging import getLogger

from thriftworker.utils.imports import qualname
from thriftworker.utils.finalize import Finalize
from thriftworker.utils.decorators import cached_property

from thriftpool.components.base import Namespace
from thriftpool.exceptions import SystemTerminate
from thriftpool.utils.mixin import LogsMixin
from thriftpool.utils.signals import Signals

__all__ = ['Controller']

logger = getLogger(__name__)


[docs]class Controller(LogsMixin): app = None wait_for_shutdown = True register_signals = True ignore_interrupt = False RUNNING = 0x1 CLOSED = 0x2 TERMINATED = 0x3 Namespace = Namespace def __init__(self): self._starting = [] self._running = 0 self._state = None self._finalize = Finalize(self, self.stop, exitpriority=1) self.ident = uuid.uuid4() self.on_before_init() self.components = [] self.namespace = self.Namespace(app=self.app).apply(self) @cached_property def _signals(self): return Signals()
[docs] def register_signal_handler(self): self._debug('Register signal handlers') if not self.ignore_interrupt: self._signals['SIGINT'] = lambda signum, frame: self.stop() else: self._signals['SIGINT'] = lambda signum, frame: None self._signals['SIGTERM'] = lambda signum, frame: self.stop() self._signals['SIGQUIT'] = lambda signum, frame: self.terminate()
[docs] def on_before_init(self): self.app.loader.on_before_init(self) self.app.finalize()
[docs] def on_start(self): self.app.loader.on_start()
[docs] def after_start(self): self.app.loader.after_start()
[docs] def on_shutdown(self): self.app.loader.on_shutdown()
[docs] def start(self): self._state = self.RUNNING if self.register_signals: self.register_signal_handler() self.on_start() try: for component in self.components: self._debug('Starting %s...', qualname(component)) if component is not None: self._starting.append(component) component.start() self._running += 1 self._debug('%s OK!', qualname(component)) except SystemTerminate as exc: self._debug('Terminating server: %r', exc, exc_info=True) self.terminate() except Exception as exc: self._error('Unrecoverable error: %r', exc, exc_info=True) self.terminate() except (KeyboardInterrupt, SystemExit): self._debug('Terminating from keyboard') self.stop() finally: self._starting = [] self._info('Controller %r started!', self) self.after_start() self._signals.wait() self._info('Controller %r stopped!', self)
def _shutdown(self, warm=True): what = 'Stopping' if warm else 'Terminating' if self._state in (self.CLOSED, self.TERMINATED): return try: if not self.is_running or \ self._running != len(self.components): # Call abort on starting components. for component in self._starting[:]: self._debug('Aborting %s...', qualname(component)) if hasattr(component, 'abort'): component.abort() # Not fully started, can safely exit. self._state = self.TERMINATED return self._state = self.CLOSED for component in reversed(self.components): self._debug('%s %s...', what, qualname(component)) if component: stop = component.stop if not warm: stop = getattr(component, 'terminate', None) or stop stop() self.on_shutdown() self._debug('Whole controller stopped!') self._state = self.TERMINATED except Exception as exc: self._error('Unrecoverable error: %r', exc, exc_info=True) finally: self._signals.set() @property
[docs] def is_running(self): return self._state == self.RUNNING
[docs] def stop(self): """Graceful shutdown of the worker server.""" self._debug('Try to stop controller!') self._shutdown(warm=True)
[docs] def terminate(self): """Not so graceful shutdown of the worker server.""" self._debug('Try to terminate controller!') self._shutdown(warm=False)

Project Versions

This Page