Source code for thriftpool.components.base

# -*- coding: utf-8 -*-
"""Implements components and boot-steps.

This file was copied and adapted from celery.

:copyright: (c) 2009 - 2012 by Ask Solem.
:license: BSD, see LICENSE for more details.

"""
from __future__ import absolute_import

from collections import defaultdict
from importlib import import_module
from logging import getLogger

from six import with_metaclass

from thriftworker.utils.imports import instantiate

from thriftpool.utils.mixin import LogsMixin
from thriftpool.utils.structures import DependencyGraph

# Module-level logger, named after this module's import path.
logger = getLogger(__name__)


class Namespace(LogsMixin):
    """A namespace containing components.

    Every component must belong to a namespace.

    When component classes are created they are added to the
    mapping of unclaimed components.  The components are claimed
    when :meth:`apply` is called on the namespace they belong to.

    :keyword name: Set the name of this namespace.
    :keyword app: Set the application object for this namespace.

    """

    #: Name of the namespace; may be overridden per instance via ``name=``.
    name = None

    #: Class-level registry shared by all namespaces, mapping
    #: ``namespace name -> {component name: component class}``.
    _unclaimed = defaultdict(dict)

    _started_count = 0

    def __init__(self, name=None, app=None):
        self.app = app
        # Fall back to the class-level name when none is given.
        self.name = name or self.name
        self.services = []

    def modules(self):
        """Subclasses can override this to return a
        list of modules to import before components are claimed."""
        return []

    def load_modules(self):
        """Will load the component modules this namespace depends on."""
        for m in self.modules():
            self.import_module(m)

    def apply(self, parent, **kwargs):
        """Apply the components in this namespace to an object.

        This will call the ``__init__`` and ``include`` methods
        of each component with the object as argument.

        For ``StartStopComponents`` the services created
        will also be added to the object's ``components`` attribute.

        :param parent: Object the components are bound to.
        :param kwargs: Extra keyword arguments passed to every
            component constructor.
        :returns: This namespace, to allow call chaining.

        """
        # NOTE(review): ``_debug`` is not defined in this class; it is
        # presumably provided by ``LogsMixin``.
        self._debug("Loading modules.")
        self.load_modules()
        self._debug("Claiming components.")
        self.components = self._claim()
        self._debug("Building boot step graph.")
        self.boot_steps = [self.bind_component(name, parent, **kwargs)
                           for name in self._finalize_boot_steps()]
        self._debug("New boot order: {%s}",
                    ', '.join(c.name for c in self.boot_steps))

        for component in self.boot_steps:
            component.include(parent)
        return self

    def bind_component(self, name, parent, **kwargs):
        """Bind component to parent object and this namespace.

        Instantiates the component class registered under ``name`` and
        stores this namespace on the resulting instance.

        """
        comp = self[name](parent, **kwargs)
        comp.namespace = self
        return comp

    def import_module(self, module):
        """Import ``module`` by dotted name and return it."""
        return import_module(module)

    def __getitem__(self, name):
        """Return the claimed component class registered as ``name``."""
        return self.components[name]

    def _find_last(self):
        """Return the first claimed component class flagged ``last``.

        Returns ``None`` when no component is flagged.

        """
        # ``values()`` works on both Python 2 and 3, while the
        # Python 2-only ``itervalues()`` fails on Python 3.
        for component in self.components.values():
            if component.last:
                return component
        return None

    def _finalize_boot_steps(self):
        """Build the dependency graph and return the sorted boot order."""
        G = self.graph = DependencyGraph(
            (C.name, C.requires) for C in self.components.values())
        last = self._find_last()
        if last:
            # Link the ``last`` component to every other node.
            # NOTE(review): presumably this makes it sort after all
            # others -- confirm against DependencyGraph.add_edge.
            for obj in G:
                if obj != last.name:
                    G.add_edge(last.name, obj)
        return G.topsort()

    def _claim(self):
        """Return the component classes registered for this namespace."""
        return self._unclaimed[self.name]
class ComponentType(type):
    """Metaclass that names components and registers them.

    Every non-abstract class built by this metaclass must define a
    ``name`` attribute.  When no ``namespace`` attribute is given, the
    name is split on its first dot into ``namespace`` and ``name``.
    The finished class is then stored in ``Namespace._unclaimed``
    under its namespace and name.

    """

    def __new__(cls, name, bases, attrs):
        build = super(ComponentType, cls).__new__

        # Helper class named 'NewBase' is created untouched and
        # is never registered.
        if name == 'NewBase':
            return build(cls, name, bases, attrs)

        # The flag is removed from the attributes, so it is not
        # inherited by subclasses.
        is_abstract = attrs.pop("abstract", False)

        if is_abstract:
            return build(cls, name, bases, attrs)

        try:
            qualified = attrs["name"]
        except KeyError:
            raise NotImplementedError("Components must be named")

        if not attrs.get("namespace", None):
            head, _, tail = qualified.partition('.')
            attrs["namespace"] = head
            attrs["name"] = tail

        component = build(cls, name, bases, attrs)
        Namespace._unclaimed[component.namespace][component.name] = component
        return component
class Component(with_metaclass(ComponentType)):
    """Base class for all components.

    A component is instantiated (:meth:`__init__`) when it is bound to
    a parent object, so the constructor may be used to set up
    attributes on the parent at parent instantiation-time.

    """

    #: Component name, either bare or written as ``namespace.name``.
    name = None

    #: Names of the components this one depends on.
    #: Dependencies must live in the same namespace.
    requires = ()

    #: Explicit namespace, for when :attr:`name` does not include one.
    namespace = None

    #: Abstract components are not registered; they only serve
    #: as base classes for other components.
    abstract = True

    #: Object returned by :meth:`create`, if any.
    #: StartStopComponents keep the original service object here.
    obj = None

    #: Reserved for the workers Consumer, which must always be
    #: started last.  At most one component per namespace may
    #: set this flag.
    last = False

    #: Default answer returned by :meth:`include_if`.
    enabled = True

    def __init__(self, parent, **kwargs):
        """Bind the component to ``parent``; does nothing by default."""

    def create(self, parent):
        """Create the component.

        Must be implemented by subclasses.

        """
        raise NotImplementedError('subclass responsibility')

    def include_if(self, parent):
        """Optional predicate deciding whether this component
        should be created.  Defaults to :attr:`enabled`."""
        return self.enabled

    def instantiate(self, qualname, *args, **kwargs):
        """Forward to the module-level ``instantiate`` helper."""
        return instantiate(qualname, *args, **kwargs)

    def include(self, parent):
        """Create the component when :meth:`include_if` allows it.

        The created object is stored in :attr:`obj`.  Returns ``True``
        when the component was created and ``None`` otherwise.

        """
        if not self.include_if(parent):
            return
        self.obj = self.create(parent)
        return True
class StartStopComponent(Component):
    """Abstract component whose created object can be started and stopped.

    Every lifecycle method is a no-op while :attr:`obj` is ``None``,
    i.e. when the component has not been created by :meth:`include`.

    """

    abstract = True

    #: When true, :meth:`terminate` calls ``obj.terminate()``.
    terminable = False

    def start(self):
        """Start the underlying object, if one was created."""
        if self.obj is None:
            return
        self.obj.start()

    def abort(self):
        """Abort the underlying object if it provides ``abort()``."""
        if self.obj is None:
            return
        if hasattr(self.obj, 'abort'):
            self.obj.abort()

    def stop(self):
        """Stop the underlying object, if one was created."""
        if self.obj is None:
            return
        self.obj.stop()

    def terminate(self):
        """Terminate the underlying object (when :attr:`terminable`)
        and then stop it."""
        if self.obj is None:
            return
        if self.terminable:
            self.obj.terminate()
        # NOTE(review): stop() is called regardless of ``terminable`` --
        # confirm this matches the intended shutdown order.
        self.obj.stop()

    def include(self, parent):
        """Create the component and register it on the parent.

        When the component is included, it is appended to
        ``parent.components``.

        :returns: the result of the base ``include`` -- ``True`` when
            the component was included, ``None`` otherwise.

        """
        included = super(StartStopComponent, self).include(parent)
        if included:
            parent.components.append(self)
        # Pass the base-class result on instead of always returning None.
        return included

Project Versions

This Page