Welcome to Loafer’s documentation!¶
Loafer is an asynchronous message dispatcher for concurrent task processing.
Quickstart¶
Installation¶
Requirements¶
Python 3.5+
Note:
Some packages also need the python3.5-dev package (Ubuntu) or a similar one.
To install via pip:
$ pip install loafer
Basic configuration¶
To configure AWS access, check boto3 configuration or export (see boto3 envvars):
$ export AWS_ACCESS_KEY_ID=<key>
$ export AWS_SECRET_ACCESS_KEY=<secret>
$ export AWS_DEFAULT_REGION=sa-east-1 # for example
Tutorial¶
For this tutorial we assume you already have loafer installed and your AWS credentials configured.
Let’s create a repository named foobar with the following structure:
foobar/
    handlers.py
    routes.py
    __main__.py
    __init__.py  # empty file
The handlers.py:
import asyncio


async def print_handler(message, *args):
    print('message is {}'.format(message))
    print('args is {}'.format(args))
    # mimic IO processing
    await asyncio.sleep(0.1)
    return True


async def error_handler(exc_type, exc, message):
    print('exception {} received'.format(exc_type))
    # do not delete the message that originated the error
    return False
The routes.py:
from loafer.ext.aws.routes import SQSRoute
from .handlers import print_handler, error_handler

# assuming a queue named "loafer-test"
routes = (
    SQSRoute('loafer-test', {'options': {'WaitTimeSeconds': 3}},
             handler=print_handler,
             error_handler=error_handler),
)
The __main__.py:
from loafer.managers import LoaferManager
from .routes import routes
manager = LoaferManager(routes=routes)
manager.run()
To execute:
$ python -m foobar
To see any output, publish some messages using the AWS dashboard or utilities like awscli.
For example:
$ aws sqs send-message --queue-url http://<url-part>/loafer-test --message-body '{"key": true}'
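Before wiring up a real queue, it can help to call a handler directly. The following is a minimal sketch, not part of Loafer itself: it repeats the print_handler from handlers.py so the snippet is self-contained, and run_once plus the sample message and metadata values are hypothetical names made up for illustration.

```python
import asyncio


async def print_handler(message, *args):
    print('message is {}'.format(message))
    print('args is {}'.format(args))
    # mimic IO processing
    await asyncio.sleep(0.1)
    return True


def run_once(message, metadata):
    # Hypothetical helper: drive the coroutine to completion on a fresh
    # event loop, roughly standing in for what a dispatcher would do.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(print_handler(message, metadata))
    finally:
        loop.close()


if __name__ == '__main__':
    acknowledged = run_once({'key': True}, {'MessageId': 'local-test'})
    print('acknowledged: {}'.format(acknowledged))
```

The value returned by run_once is the handler's return value, which is what decides whether a message would be acknowledged.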
User Guide¶
Overview¶
Loafer is an asynchronous message dispatcher for concurrent task processing.
To take full advantage, your tasks:
- Should use asyncio
- Should be I/O bound
- Should be decoupled from the message producer
If your tasks are CPU bound, you should look for projects that use the multiprocessing Python module or similar.
We don’t require the use of asyncio, but any code that’s not a coroutine will run in a thread, outside the event loop. Performance and error handling might be compromised in these scenarios, so you might want to look for other alternatives.
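To illustrate the difference between coroutines and regular functions, here is a minimal sketch using only the standard library. It is an assumption about the general technique, not Loafer's actual implementation: call_handler, coroutine_handler, blocking_handler and demo are hypothetical names.

```python
import asyncio
import threading


async def coroutine_handler(message):
    # Runs inside the event loop, on the thread that drives the loop.
    await asyncio.sleep(0.01)
    return threading.get_ident()


def blocking_handler(message):
    # A regular function: it is pushed to a worker thread instead.
    return threading.get_ident()


async def call_handler(handler, message):
    # Await coroutines directly; run anything else in the default executor.
    if asyncio.iscoroutinefunction(handler):
        return await handler(message)
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, handler, message)


def demo():
    caller = threading.get_ident()
    loop = asyncio.new_event_loop()
    try:
        coro_thread = loop.run_until_complete(
            call_handler(coroutine_handler, 'a'))
        func_thread = loop.run_until_complete(
            call_handler(blocking_handler, 'b'))
    finally:
        loop.close()
    # (ran on the loop's thread?, ran on the loop's thread?)
    return coro_thread == caller, func_thread == caller


if __name__ == '__main__':
    print(demo())
```

The coroutine stays on the thread that runs the event loop, while the regular function executes on a separate worker thread, which is why blocking code costs more and is harder to supervise.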
If your code is too tied to the message producer, you might end up writing too much boilerplate code in order to use Loafer.
Components¶
The main components inside Loafer are:
- Manager
The manager is responsible for setting up the event loop and handling system errors.
It prepares everything needed to run and starts the dispatcher.
- Dispatcher
The dispatcher starts the providers and schedules message routing and message acknowledgment.
- Provider
The provider is responsible for retrieving messages and deleting them when requested.
The act of deleting a message is also known as message acknowledgment.
At the moment, the only provider is for the AWS SQS service.
- Message Translator
The message translator is the contract between provider and handler.
At the moment, the message translator receives the “raw message” and transforms it into the format expected by the handler.
It may also add metadata information, if available.
- Route
The route is the link between the provider and the handler. It is responsible for delivering the message to the handler and receiving its confirmation.
- Handler
The handler (or task/job) is the callable that will receive the message.
- Error Handler
The optional callback to handle any errors in message translation or in the handler processing.
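The way these components fit together can be sketched with a toy, in-memory pipeline. This is only an illustration of the flow described above, under the assumption that a provider fetches and deletes messages; InMemoryProvider, JsonTranslator, dispatch_once, run_demo and their method names are hypothetical and are not Loafer classes.

```python
import asyncio
import json


class InMemoryProvider:
    """Hypothetical provider: retrieves messages and deletes on request."""

    def __init__(self, raw_messages):
        self.pending = list(raw_messages)

    async def fetch_messages(self):
        return list(self.pending)

    async def confirm_message(self, raw):
        # Acknowledgment: remove the message from the source.
        self.pending.remove(raw)


class JsonTranslator:
    """Hypothetical translator: raw message -> content plus metadata."""

    def translate(self, raw):
        return {'content': json.loads(raw), 'metadata': {}}


async def handler(message, metadata):
    # True means "processed, acknowledge it"; False keeps the message.
    return message.get('ok', False)


async def dispatch_once(provider, translator, handler):
    # Route-like glue: translate, deliver, acknowledge on confirmation.
    for raw in await provider.fetch_messages():
        translated = translator.translate(raw)
        confirmed = await handler(translated['content'],
                                  translated['metadata'])
        if confirmed:
            await provider.confirm_message(raw)
    return provider.pending


def run_demo():
    provider = InMemoryProvider(['{"ok": true}', '{"ok": false}'])
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(
            dispatch_once(provider, JsonTranslator(), handler))
    finally:
        loop.close()


if __name__ == '__main__':
    print(run_demo())
```

Only the message whose handler call returned True is removed; the other one stays in the provider, mirroring an unacknowledged message.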
Settings¶
AWS¶
To configure AWS access, check boto3 configuration or export (see boto3 envvars):
$ export AWS_ACCESS_KEY_ID=<key>
$ export AWS_SECRET_ACCESS_KEY=<secret>
$ export AWS_DEFAULT_REGION=sa-east-1 # for example
Exceptions¶
All exceptions are defined in the loafer.exceptions module.
A description of all the exceptions:
- ProviderError: a fatal error from a provider instance. Loafer will stop all operations and shut down.
- ConfigurationError: a configuration error. Loafer will stop all operations and shut down.
- DeleteMessage: if any handler raises this exception, the message will be rejected and acknowledged (the message will be deleted).
- LoaferException: the base exception for DeleteMessage.
Handlers¶
Handlers are callables that receive the message.
The handler signature should be similar to:
async def my_handler(message, metadata):
    ... code ...
    return True # or False
Where message is the message to be processed and metadata is a dict with metadata information.
The async def is the Python coroutine syntax. Regular functions can also be used, but they will run in a thread, outside the event loop.
The return value indicates whether the handler successfully processed the message.
By returning True the message will be acknowledged (deleted).
Another way to acknowledge messages inside a handler is to raise the DeleteMessage exception.
Any other exception will be redirected to an error_handler, see Error Handlers.
The default error_handler will log the error and not acknowledge the message.
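The three outcomes described above (return True, return False, raise DeleteMessage) can be sketched as follows. To keep the snippet runnable without Loafer installed, DeleteMessage is a local stand-in; in a real project it would be imported from loafer.exceptions. The order_handler and outcome names, and the message fields, are hypothetical.

```python
import asyncio


class DeleteMessage(Exception):
    """Stand-in for loafer.exceptions.DeleteMessage (assumption)."""


async def order_handler(message, metadata):
    if 'order_id' not in message:
        # Malformed message: retrying will never help, so reject it
        # and ask for it to be acknowledged (deleted).
        raise DeleteMessage('missing order_id')
    await asyncio.sleep(0)  # placeholder for real I/O
    # True acknowledges the message; False leaves it unacknowledged.
    return message.get('status') == 'paid'


def outcome(message):
    # Hypothetical helper that reports what would happen to the message.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(order_handler(message, {}))
    except DeleteMessage:
        return 'deleted'
    finally:
        loop.close()


if __name__ == '__main__':
    print(outcome({'order_id': 1, 'status': 'paid'}))
    print(outcome({'order_id': 1, 'status': 'pending'}))
    print(outcome({}))
```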
Error Handlers¶
Every Route implements a coroutine error_handler that can be used as an error hook.
This hook is called when unhandled exceptions happen, and by default it only logs the error and does not acknowledge the message.
Route accepts a callable parameter, error_handler, so you can pass a custom function or coroutine to replace the default error handler behavior.
The callable should be similar to:
async def custom_error_handler(exc_type, exc, message):
    ... custom code here ...
    return True

# Route(..., error_handler=custom_error_handler)
The return value determines whether the message that originated the error will be acknowledged.
True means acknowledge it; False leaves the message unacknowledged (default behavior).
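As an illustration of the return value contract, here is a minimal sketch of an error handler that acknowledges messages only for errors that can never succeed on retry. The policy (treating ValueError as permanent) and the simulate helper are assumptions made for this example, not Loafer behavior.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def custom_error_handler(exc_type, exc, message):
    logger.error('handler failed with %s: %s', exc_type.__name__, exc)
    # Assumed policy: a ValueError means the message itself is bad, so
    # acknowledge it; anything else is left unacknowledged.
    return issubclass(exc_type, ValueError)


def simulate(exc, message):
    # Hypothetical helper: call the hook the way a route might.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(
            custom_error_handler(type(exc), exc, message))
    finally:
        loop.close()


if __name__ == '__main__':
    print(simulate(ValueError('bad payload'), 'raw-1'))
    print(simulate(ConnectionError('service down'), 'raw-2'))
```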
Sentry¶
To integrate with Sentry you will need the raven client and your account DSN.
Then you can automatically create an error_handler with the following code:
from loafer.ext.sentry import sentry_handler
from raven import Client
client = Client(dsn=..., **options)
error_handler = sentry_handler(client, delete_message=True)
The optional delete_message parameter controls the message acknowledgement after the error report. By default, delete_message is False.
The error_handler defined can be set on any Route instance.
Message Translators¶
The message translator receives a “raw message” and processes it into a suitable format expected by the handler.
The “raw message” is the message received by the provider “as-is”, and it might be delivered without any processing if the message translator was not set.
In some cases, you should explicitly set message_translator=None to disable any configured translators.
Implementation¶
The message translator class should implement the translate method, like:
class MyMessageTranslator:

    def translate(self, message):
        return {'content': int(message), 'metadata': {}}
And it should return a dictionary in the format:
return {'content': processed_message, 'metadata': {}}
The processed_message and metadata (optional) will be delivered to the handler.
If processed_message is None (or empty), a ValueError exception will be raised.
All exceptions raised during message translation will be caught by the configured Error Handlers.
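As a slightly fuller illustration of the contract above, here is a sketch of a translator that decodes JSON text. JsonMessageTranslator and the size metadata key are hypothetical names chosen for this example; only the translate method and the content/metadata dictionary shape come from the description above.

```python
import json


class JsonMessageTranslator:
    """Hypothetical translator that decodes a JSON string."""

    def translate(self, message):
        # json.loads raises a ValueError subclass on invalid JSON; the
        # exception is left to propagate so an error handler can see it.
        content = json.loads(message)
        return {'content': content, 'metadata': {'size': len(message)}}


if __name__ == '__main__':
    translator = JsonMessageTranslator()
    print(translator.translate('{"key": true}'))
```

Letting the decoding error propagate, rather than returning an empty content, keeps the failure visible to whatever error handler is configured.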
Providers¶
Providers are responsible for retrieving messages and acknowledging them (deleting them from the source).
SQSProvider¶
SQSProvider (located at loafer.ext.aws.providers) receives the following options:
- queue_name: the queue name
- endpoint_url (optional): the base URL, only useful if you don’t use AWS
- use_ssl (optional, default=True): SSL usage
- options (optional): a dict with SQS options to retrieve messages. Example: {'WaitTimeSeconds': 5, 'MaxNumberOfMessages': 5}
Usually, the provider is not configured manually, but set by Routes and their helper classes.
Routes¶
A Route aggregates all the main entities. The generic parameters are:
- provider: a provider instance
- handler: a handler instance
- error_handler (optional): an error handler instance
- message_translator (optional): a message translator instance
- name (optional): a name for this route
We provide some helper routes, so you don’t need to set up all this boilerplate code:
- loafer.ext.aws.routes.SQSRoute: a route that configures a loafer.ext.aws.providers.SQSProvider and loafer.ext.aws.message_translators.SQSMessageTranslator. A route for handlers that consume messages from an SQS queue (expects JSON-formatted messages).
- loafer.ext.aws.routes.SNSQueueRoute: a route that configures a loafer.ext.aws.providers.SQSProvider and loafer.ext.aws.message_translators.SNSMessageTranslator. A route for handlers that consume messages from an SQS queue subscribed to an SNS topic (expects JSON-formatted messages).
Examples¶
Some examples of route creation:
from loafer.ext.aws.routes import SQSRoute, SNSQueueRoute
# regular route
route1 = SQSRoute('my-queue1', handler=..., name='route1')
# route with custom SQSProvider parameters
route2 = SNSQueueRoute('my-queue2', {'use_ssl': False, 'options': {'WaitTimeSeconds': 4}}, handler=..., name='route2')
# route with custom message translator
route3 = SQSRoute('my-queue3', message_translator=custom_message_translator, handler=...)
# route with custom error handler
route4 = SNSQueueRoute('my-queue4', handler=..., error_handler=custom_error_handler)
Managers¶
Managers are responsible for the execution of all the components.
The LoaferManager (at loafer.managers) receives a list of Routes.
Every service/application using loafer should instantiate a manager:
from loafer.managers import LoaferManager
from .routes import routes # the list of routes
manager = LoaferManager(routes=routes)
manager.run()
The default execution mode will run indefinitely. To run only one iteration of your services, the last line in the code above should be replaced with:
manager.run(forever=False)
The “one iteration” could be a little tricky. For example, if you have one provider that fetches two messages at a time, your handler will be called twice (once for each message) and then the manager stops.
Publisher¶
The publisher is only used for testing purposes, so we can have an easy way to validate some message translation process or handler execution.
It is responsible for publishing a message to a provider source (queue).
It can be useful to emulate the behavior of a given handler without the real message producer.
Development¶
Development Installation¶
Development install¶
After forking or checking out:
$ cd loafer/
$ python setup.py develop
$ pip install -r requirements/local.txt
$ pip install -r requirements/test.txt
$ pre-commit install
The requirements folder is only used for development, so we can easily install/track the dependencies required to run the tests on continuous integration platforms.
The official entrypoint for distribution is setup.py, which also contains the minimum requirements to execute the tests.
It’s important to execute setup.py develop not only to install the main dependencies, but also to include the loafer CLI in our environment.
Running tests:
$ make test
Generating documentation:
$ cd docs/
$ make html
To configure AWS access, check boto3 configuration or export (see boto3 envvars):
$ export AWS_ACCESS_KEY_ID=<key>
$ export AWS_SECRET_ACCESS_KEY=<secret>
$ export AWS_DEFAULT_REGION=sa-east-1 # for example
Check the Settings section to see specific configurations.
Release¶
To release a new version, a few steps are required:
- Update version/release number in docs/source/conf.py
- Add entry to CHANGES.rst and documentation
- Review changes in test requirements (requirements/test.txt and setup.py)
- Test with python setup.py test and make test-cov
- Test build with make dist
- Commit changes
- Release with make release
Other¶
FAQ¶
1. How do I run I/O blocking code that’s not a coroutine?
Any code that is blocking and not a coroutine could run in a separate thread.
It’s not recommended, but it looks like this:
import asyncio

loop = asyncio.get_event_loop()
loop.run_in_executor(None, your_callable, your_callable_args)
# Important: do not close/stop the loop
2. How to integrate with newrelic?
The newrelic documentation should be the primary source of information. One alternative is to use the environment variables NEW_RELIC_LICENSE_KEY and NEW_RELIC_APP_NAME and, for every handler:
import newrelic.agent

@newrelic.agent.background_task()
def some_code(...):
    ...
Changelog¶
1.0.1 (2017-04-09)¶
- Add tox and execute tests for py36
- Update aiohttp/aiobotocore versions
- Minor fixes and enhancements
1.0.0 (2017-03-27)¶
- Major code rewrite
- Remove CLI
- Add better support for error handlers, including sentry/raven
- Refactor exceptions
- Add message metadata information
- Update message lifecycle with handler/error handler return value
- Enable execution of one service iteration (by default, it still runs “forever”)
0.0.3 (2016-04-24)¶
- Improve documentation
- Improve package metadata and dependencies
- Add loafer.aws.message_translator.SNSMessageTranslator class
- Fix ImportError exceptions for configuration that uses loafer.utils.import_callable
0.0.2 (2016-04-18)¶
- Fix build hardcoding tests dependencies
0.0.1 (2016-04-18)¶
- Initial release
Contributors¶
Thanks to:¶
- allisson <allisson@olist.com>
- cleberzavadniak <cleber.zavadniak@olist.com>
- lamenezes <luiz.menezes@olist.com>