# -*- coding: utf-8 -*-
"""Implements components and boot-steps.
This file was copied and adapted from celery.
:copyright: (c) 2009 - 2012 by Ask Solem.
:license: BSD, see LICENSE for more details.
"""
from __future__ import absolute_import
from collections import defaultdict
from importlib import import_module
from logging import getLogger
from six import with_metaclass
from thriftworker.utils.imports import instantiate
from thriftpool.utils.mixin import LogsMixin
from thriftpool.utils.structures import DependencyGraph
logger = getLogger(__name__)
class Namespace(LogsMixin):
    """A namespace containing components.

    Every component must belong to a namespace.

    When component classes are created they are added to the
    mapping of unclaimed components.  The components are claimed
    when the namespace they belong to is applied.

    :keyword name: Set the name of this namespace.
    :keyword app: Application object stored on the namespace as ``app``.

    """
    name = None
    #: Registry shared by every namespace: maps a namespace name to a
    #: ``{component name: component class}`` dict.  It is filled in by
    #: :class:`ComponentType` whenever a non-abstract component is defined.
    _unclaimed = defaultdict(dict)
    _started_count = 0

    def __init__(self, name=None, app=None):
        self.app = app
        # Fall back to the class-level name when none is given.
        self.name = name or self.name
        self.services = []

    def modules(self):
        """Subclasses can override this to return a
        list of modules to import before components are claimed."""
        return []

    def load_modules(self):
        """Import the component modules this namespace depends on."""
        for m in self.modules():
            self.import_module(m)

    def apply(self, parent, **kwargs):
        """Apply the components in this namespace to an object.

        Every component class is instantiated with ``parent`` (and
        ``kwargs``), then its ``include`` method is called with
        ``parent``, following the order produced by the dependency graph.

        For ``StartStopComponents`` the services created
        will also be added to the object's ``components`` attribute.

        :returns: this namespace, so calls can be chained.

        """
        self._debug("Loading modules.")
        self.load_modules()
        self._debug("Claiming components.")
        self.components = self._claim()
        self._debug("Building boot step graph.")
        self.boot_steps = [self.bind_component(name, parent, **kwargs)
                           for name in self._finalize_boot_steps()]
        self._debug("New boot order: {%s}",
                    ', '.join(c.name for c in self.boot_steps))

        for component in self.boot_steps:
            component.include(parent)
        return self

    def bind_component(self, name, parent, **kwargs):
        """Bind component to parent object and this namespace."""
        comp = self[name](parent, **kwargs)
        comp.namespace = self
        return comp

    def import_module(self, module):
        """Import ``module`` by its dotted name and return it."""
        return import_module(module)

    def __getitem__(self, name):
        """Return the claimed component class registered as ``name``."""
        return self.components[name]

    def _find_last(self):
        """Return the first component class flagged ``last``, or ``None``."""
        # ``values()`` rather than ``itervalues()``: the latter only
        # exists on Python 2 dictionaries.
        for C in self.components.values():
            if C.last:
                return C
        return None

    def _finalize_boot_steps(self):
        """Build the dependency graph and return its ``topsort()`` result."""
        G = self.graph = DependencyGraph(
            (C.name, C.requires) for C in self.components.values())
        last = self._find_last()
        if last:
            # Make the ``last`` component depend on every other node
            # so that it ends up at the end of the boot order.
            for obj in G:
                if obj != last.name:
                    G.add_edge(last.name, obj)
        return G.topsort()

    def _claim(self):
        """Return the components registered under this namespace's name."""
        return self._unclaimed[self.name]
class ComponentType(type):
    """Metaclass that registers concrete component classes.

    A class body may set ``abstract``; the flag is removed from the
    class attributes before the class is built.  A class that is not
    abstract must define ``name`` and is stored in
    ``Namespace._unclaimed`` under its namespace and name.
    """

    def __new__(cls, name, bases, attrs):
        # A class literally called 'NewBase' is built untouched.
        # NOTE(review): presumably the helper class generated by older
        # ``six.with_metaclass`` versions -- confirm.
        if name == 'NewBase':
            return super(ComponentType, cls).__new__(cls, name, bases, attrs)

        is_abstract = attrs.pop("abstract", False)
        if is_abstract:
            # Abstract bases are created but never registered.
            return super(ComponentType, cls).__new__(cls, name, bases, attrs)

        try:
            qualified = attrs["name"]
        except KeyError:
            raise NotImplementedError("Components must be named")

        if not attrs.get("namespace", None):
            # No explicit namespace: split "<namespace>.<name>" at the
            # first dot.
            prefix, _, short = qualified.partition('.')
            attrs["namespace"] = prefix
            attrs["name"] = short

        component = super(ComponentType, cls).__new__(cls, name, bases, attrs)
        Namespace._unclaimed[component.namespace][component.name] = component
        return component
class Component(with_metaclass(ComponentType)):
    """Base class for components.

    ``__init__`` is invoked when the component is bound to a parent
    object, so it can be used to set up attributes on the parent at
    the time the parent is instantiated.

    """

    #: Component name, either bare or written as the namespace and
    #: the component name joined by a dot.
    name = None

    #: Names of the components this one depends on.
    #: Dependencies must live in the same namespace.
    requires = ()

    #: Explicit namespace, for use when :attr:`name` does not
    #: already include it.
    namespace = None

    #: When set the class is not registered as a component,
    #: but it can still serve as a component base class.
    abstract = True

    #: Optional object produced by :meth:`create`.
    #: StartStopComponents use it to hold on to the
    #: original service object.
    obj = None

    #: Reserved for the worker's Consumer, which must always
    #: be started last.  Only one component per namespace
    #: may be marked with this flag.
    last = False

    #: Default answer given by :meth:`include_if`.
    enabled = True

    def __init__(self, parent, **kwargs):
        pass

    def create(self, parent):
        """Build the object for this component; subclasses must override."""
        raise NotImplementedError('subclass responsibility')

    def include_if(self, parent):
        """Optional predicate deciding whether this component
        should be created; returns :attr:`enabled` by default."""
        return self.enabled

    def instantiate(self, qualname, *args, **kwargs):
        """Delegate to the module-level ``instantiate`` helper."""
        return instantiate(qualname, *args, **kwargs)

    def include(self, parent):
        """Create the component if :meth:`include_if` allows it.

        Stores the result of :meth:`create` in :attr:`obj` and returns
        ``True``; returns ``None`` when the component is skipped.
        """
        if not self.include_if(parent):
            return
        self.obj = self.create(parent)
        return True
class StartStopComponent(Component):
    """Component that forwards lifecycle calls to its ``obj``.

    Every lifecycle method is a no-op while ``obj`` is ``None``.
    """

    abstract = True

    #: When true, :meth:`terminate` calls ``obj.terminate()``
    #: before stopping the object.
    terminable = False

    def start(self):
        """Start the wrapped object, if there is one."""
        if self.obj is not None:
            self.obj.start()

    def abort(self):
        """Abort the wrapped object when it provides an ``abort`` method."""
        if self.obj is not None:
            if hasattr(self.obj, 'abort'):
                self.obj.abort()

    def stop(self):
        """Stop the wrapped object, if there is one."""
        if self.obj is not None:
            self.obj.stop()

    def terminate(self):
        """Terminate the wrapped object (only if ``terminable``), then stop it."""
        if self.obj is not None:
            if self.terminable:
                self.obj.terminate()
            self.obj.stop()

    def include(self, parent):
        """Include the component and, when it was created, append it
        to ``parent.components``."""
        included = super(StartStopComponent, self).include(parent)
        if included:
            parent.components.append(self)