Source code for notifierlib.notifierlib

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# File:
Main module file

Put your classes here

import logging
import abc

from stopit import ThreadingTimeout, TimeoutException
from Queue import Queue
from threading import Thread

__author__ = '''Costas Tyfoxylos <>'''
__docformat__ = 'plaintext'
__date__ = '''18-09-2017'''

# This is the main prefix used for logging
LOGGER_BASENAME = '''notifierlib'''
LOGGER = logging.getLogger(LOGGER_BASENAME)


[docs]class Channel(object): __metaclass__ = abc.ABCMeta def __init__(self): self._logger = logging.getLogger('{base}.{suffix}' .format(base=LOGGER_BASENAME, suffix=self.__class__.__name__) ) @abc.abstractmethod
[docs] def notify(self): pass
[docs]class Group(object): def __init__(self, groupname='', *channels): self._logger = logging.getLogger('{base}.{suffix}' .format(base=LOGGER_BASENAME, suffix=self.__class__.__name__) ) = groupname self._channels = [self._validate_channel(channel) for channel in channels] self._queue = Queue() self._results = None @staticmethod def _validate_channel(channel): if not isinstance(channel, Channel): raise ValueError('The object is not a Channel') return channel def _start_workers(self): for _ in range(WORKERS): worker = Thread(target=self._worker, args=(self._queue,)) worker.setDaemon(False) worker.start() def __call__(self, **kwargs): self.send(**kwargs)
[docs] def send(self, **kwargs): self._results = [] for channel in self._channels: self._queue.put((channel, kwargs)) self._start_workers() self._logger.debug('Waiting for results') self._queue.join() self._logger.debug(('Result of notification: ' '{result}').format(result=self._results)) return self._results
def _worker(self, queue): while not queue.empty(): channel, kwargs = queue.get() self._logger.debug(('Sending notification using channel: {channel} ' 'with args:{args}').format(, args=kwargs)) try: with ThreadingTimeout(TIMEOUT): result = channel.notify(**kwargs) self._results.append({ result}) except TimeoutException: self._logger.error(('The worker reached the time limit ' '({} secs)').format(TIMEOUT)) except Exception: self._logger.exception(('Exception caught on sending on ' 'channel:{}').format( queue.task_done()
[docs]class Notifier(object): def __init__(self): self._logger = logging.getLogger('{base}.{suffix}' .format(base=LOGGER_BASENAME, suffix=self.__class__.__name__) ) self.broadcast = Group('broadcast') @property def channels(self): return [ for channel in self.broadcast._channels] # noqa
[docs] def register(self, *args): for channel in args: if not isinstance(channel, Channel): raise ValueError(('The object is not a Channel :' '[{}]').format(channel)) if in self.channels: raise ValueError('Channel already registered') self.broadcast._channels.append(channel) # noqa
[docs] def unregister(self, *args): for channel in args: if not in self.channels: raise ValueError('Channel not registered') self.broadcast._channels = [ch for ch in self.broadcast._channels # noqa if not ==]
[docs] def add_group(self, group): if not isinstance(group, Group): raise ValueError(('The object is not a Group :' '[{}]').format(group)) setattr(self,, group)
[docs] def remove_group(self, group): if not isinstance(group, Group): raise ValueError(('The object is not a Group :' '[{}]').format(group)) try: delattr(self, return True except AttributeError: self._logger.error('No such group :{}'.format( return False