# This file is part of PubMarine.
#
# PubMarine is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Foobar is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with PubMarine. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright: 2017, Toshio Kuratomi
# License: LGPLv3+
"""
PubMarine is a simple PubSub framework for Python3's asyncio.
Authors: Toshio Kuratomi <toshio@fedoraproject.org
"""
import asyncio
import warnings
from collections import defaultdict
from functools import partial
import types
from typing import Any, Callable, DefaultDict as DefaultDict_t, Dict, Generator, List, Union
from weakref import WeakMethod, ref
__version__ = '0.4.3'
__version_info__ = ('0', '4', '3')
[docs]class PubMarineError(Exception):
""" Base of all errors specific to PubMarine
"""
pass
[docs]class EventNotFoundError(PubMarineError):
""" Raised when an event is not handled by this PubPen
"""
pass
[docs]class PubPen:
"""
A PubPen object coordinates subscription and publication.
Use :meth:`PubPen.subscribe` to register callbacks to be invoked when an event is published.
Use :meth:`PubPen.publish` to publish an event, invoking the callbacks.
Callbacks will be queued to be executed by the :mod:`asyncio` event loop that is passed into the
:class:`PubPen` when it is instantiated.
.. note:: Most programs should create one PubPen instance and then share it between
all of the objects that wish to communicate with each other.
"""
def __init__(self, loop: asyncio.AbstractEventLoop, event_list: List[str] = None) -> None:
"""
:arg loop: Event loop (asyncio compatible) to use.
:kwarg event_list: If given, event_list is a list of allowed
event_names. If not given, any name can be subscribed to on the
fly. Dynamic event_lists are convenient. Statically defined
lists provide protection against typos.
"""
self.loop = loop
self._next_id = self._id_generator()
self._subscriptions = {} # type: Dict
if event_list is not None:
self._event_list = frozenset(event_list)
else:
self._event_list = frozenset()
self._event_handlers = defaultdict(dict) # type: DefaultDict_t[str, Dict]
# This has to be a method because the ids increment per-instance. We don't have to use self
# because the generator itself maintains state.
def _id_generator(self) -> Generator[int, None, None]: # pylint: disable=no-self-use
"""Generate a new unique event id on this :class:`PubPen` instance"""
i = 0
while True:
yield i
i += 1
[docs] def subscribe(self, event: str, callback: Union[Callable[..., Any], types.MethodType]) -> int:
""" Subscribe a callback to an event
:arg event: String name of an event to subscribe to
:callback: The function to call when the event is published. This can
be any python callable.
Use :func:`functools.partial` to call the callback with any other
arguments.
.. note:: The callback is registered with the event each time this
method is called. The callback is called each time it has been
registered when the event is published. For example::
>>> import asyncio
>>> import pubmarine
>>> pubpen = pubmarine.PubPen(asyncio.get_event_loop)
>>> def message():
... print('message called')
>>> pubpen.subscribe('test', message)
>>> pubpen.subscribe('test', message)
>>> pubpen.publish('test')
message called
message called
If the caller wants the callback to only be called once, it is the
caller's responsibility to only subscribe the callback once.
"""
if self._event_list and event not in self._event_list:
raise EventNotFoundError('{} is not a registered event'
.format(event))
# Get an id for the subscription
sub_id = next(self._next_id)
self._subscriptions[sub_id] = event
if isinstance(callback, types.MethodType):
# Add a method
self._event_handlers[event][sub_id] = WeakMethod(callback)
else:
# Add a function
self._event_handlers[event][sub_id] = ref(callback)
return sub_id
[docs] def unsubscribe(self, sub_id: int) -> None:
"""Unsubscribe from an event.
:arg sub_id: The subscription id returned from subscribe.
"""
if sub_id in self._subscriptions:
event = self._subscriptions[sub_id]
else:
# It's okay, we just want the subscription to be gone
return
for cur_sub_id in self._event_handlers[event]:
if cur_sub_id == sub_id:
del self._event_handlers[event][sub_id]
break
del self._subscriptions[sub_id]
[docs] def publish(self, event: str, *args: Any, **kwargs: Any) -> None:
""" Publish an event
:arg event: String name of an event to publish
Other args and keyword args are passed to the callback function.
"""
if self._event_list and event not in self._event_list:
raise EventNotFoundError('{} is not a registered event'
.format(event))
removed_sub_ids = []
for sub_id, handler in self._event_handlers[event].items():
# Get the callback from the weakref
func = handler()
if func is None:
# Callback was deleted. Cleanup the weakref as well
removed_sub_ids.append(sub_id)
continue
func = partial(func, *args, **kwargs)
self.loop.call_soon(func)
# Cleanup any handlers that are no longer around
for sub_id in removed_sub_ids:
del self._event_handlers[event][sub_id]
try:
del self._subscriptions[sub_id]
except KeyError:
# It's okay. We just want this gone.
pass
[docs] def emit(self, event: str, *args: Any, **kwargs: Any) -> None:
""" Publish an event
:arg event: String name of an event to publish
Other args and keyword args are passed to the callback function.
**Deprecated**: Use publish() instead
"""
warnings.warn('PubPen.emit() is deprecated. Use PubPen.publish()'
' instead', DeprecationWarning, stacklevel=2)
self.publish(event, *args, **kwargs)