Welcome to Loafer’s documentation!¶
Loafer is an asynchronous message dispatcher for concurrent task processing.
Quickstart¶
Installation¶
Requirements¶
Python 3.6+
Note:
Some packages also need the Python development headers (the python3-dev package on Ubuntu, or similar).
To install via pip:
$ pip install loafer
Basic configuration¶
To configure AWS access, check the boto3 configuration documentation or export the following environment variables (see boto3 envvars):
$ export AWS_ACCESS_KEY_ID=<key>
$ export AWS_SECRET_ACCESS_KEY=<secret>
$ export AWS_DEFAULT_REGION=sa-east-1 # for example
You might find some configuration tips in our FAQ as well.
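If you want a service to fail fast when these variables are missing, a small check like the one below can run before starting the manager. It is a minimal standard-library sketch; the missing_aws_env_vars helper is hypothetical and not part of Loafer or boto3. Keep in mind that boto3 also reads credentials from other sources (for example the ~/.aws/credentials file), so missing variables do not necessarily mean missing credentials.

```python
import os

# The variables exported above; boto3 reads them from the environment.
REQUIRED_AWS_VARS = ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_DEFAULT_REGION')


def missing_aws_env_vars(environ=None):
    """Return the required AWS variables that are unset or empty (hypothetical helper)."""
    environ = os.environ if environ is None else environ
    return [name for name in REQUIRED_AWS_VARS if not environ.get(name)]


if __name__ == '__main__':
    missing = missing_aws_env_vars()
    if missing:
        print('missing AWS settings: {}'.format(', '.join(missing)))
```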
Tutorial¶
For this tutorial we assume you already have loafer installed and your AWS credentials configured.
Let’s create a repository named foobar with the following structure:
foobar/
    handlers.py
    routes.py
    __main__.py
    __init__.py  # empty file
The handlers.py:
import asyncio


async def print_handler(message, *args):
    print('message is {}'.format(message))
    print('args is {}'.format(args))
    # mimic IO processing
    await asyncio.sleep(0.1)
    return True


async def error_handler(exc_info, message):
    print('exception {} received'.format(exc_info))
    # do not delete the message that originated the error
    return False
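Because the handlers are plain coroutines, you can try them locally before wiring them to a queue. The sketch below repeats the two tutorial handlers so it runs on its own (no Loafer or AWS access needed) and calls them directly with asyncio.run (Python 3.7+). The sample message and the simulated exception are made up, and passing sys.exc_info() as exc_info is an assumption about what Loafer hands to error handlers.

```python
import asyncio
import sys


async def print_handler(message, *args):
    print('message is {}'.format(message))
    print('args is {}'.format(args))
    await asyncio.sleep(0.1)  # mimic IO processing
    return True


async def error_handler(exc_info, message):
    print('exception {} received'.format(exc_info))
    return False  # keep the message that originated the error


async def try_handlers():
    # Illustrative message; Loafer would deliver the translated message here.
    handled = await print_handler({'key': True})
    try:
        raise RuntimeError('simulated failure')
    except RuntimeError:
        # Assumption: exc_info is the (type, value, traceback) tuple from sys.exc_info().
        kept = await error_handler(sys.exc_info(), {'key': True})
    return handled, kept


if __name__ == '__main__':
    print(asyncio.run(try_handlers()))  # (True, False), after the handler output
```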
The routes.py:
from loafer.ext.aws.routes import SQSRoute

from .handlers import print_handler, error_handler

# assuming a queue named "loafer-test"
routes = (
    SQSRoute('loafer-test', {'options': {'WaitTimeSeconds': 3}},
             handler=print_handler,
             error_handler=error_handler),
)
The __main__.py:
from loafer.managers import LoaferManager
from .routes import routes
manager = LoaferManager(routes=routes)
manager.run()
To execute:
$ python -m foobar
To see any output, publish some messages using the AWS console or tools like the AWS CLI (awscli).
For example:
$ aws sqs send-message --queue-url http://<url-part>/loafer-test --message-body '{"key": true}'
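The same test message can also be sent from Python with boto3's SQS client, using its get_queue_url and send_message methods. The sketch below assumes boto3 is installed and credentials are configured as above; publish_test_message is a hypothetical helper that receives the client as an argument, so it can also be exercised with a fake client.

```python
import json


def publish_test_message(sqs_client, queue_name, payload):
    """Send a JSON-encoded payload to the named SQS queue and return its MessageId."""
    # Resolve the queue URL from its name (the tutorial route uses "loafer-test").
    queue_url = sqs_client.get_queue_url(QueueName=queue_name)['QueueUrl']
    response = sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps(payload))
    return response['MessageId']


# Usage with a real client (requires boto3 and AWS credentials):
#
#   import boto3
#   publish_test_message(boto3.client('sqs'), 'loafer-test', {'key': True})
```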
User Guide¶
Overview¶
Loafer is an asynchronous message dispatcher for concurrent task processing.
To take full advantage of it, your tasks:
- Should use asyncio
- Should be I/O bound
- Should be decoupled from the message producer
If your tasks are CPU-bound, you should look for projects that use the multiprocessing Python module or similar.
We don’t require the use of asyncio, but any code that’s not a coroutine will run in a thread, outside the event loop. Performance and error handling might be compromised in these scenarios, so you might want to look for other alternatives.
If your code is too tightly coupled to the message producer, you might end up writing too much boilerplate code in order to use Loafer.
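To illustrate the difference between coroutines and regular functions, the sketch below shows one common way a dispatcher can accept both: coroutine functions are awaited on the event loop, while regular functions are sent to the default thread pool with loop.run_in_executor. This is a simplified illustration of the idea, not Loafer's internal code; call_handler is a hypothetical name, and the sketch uses Python 3.7+ APIs.

```python
import asyncio
import functools
import inspect
import threading


async def call_handler(handler, message, *args):
    """Await coroutine functions; run regular callables in the default thread pool."""
    if inspect.iscoroutinefunction(handler):
        return await handler(message, *args)
    loop = asyncio.get_running_loop()
    # A regular function may block, so it runs in a worker thread, outside the event loop.
    return await loop.run_in_executor(None, functools.partial(handler, message, *args))


async def async_handler(message, *args):
    return threading.get_ident()


def sync_handler(message, *args):
    return threading.get_ident()


async def main():
    loop_thread = threading.get_ident()
    on_loop = (await call_handler(async_handler, {'key': True})) == loop_thread
    off_loop = (await call_handler(sync_handler, {'key': True})) != loop_thread
    return on_loop, off_loop


print(asyncio.run(main()))  # (True, True)
```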
Components¶
The main components inside Loafer are:
- Manager
The manager is responsible for setting up the event loop and handling system errors.
It prepares everything needed to run and starts the dispatcher.
- Dispatcher
The dispatcher starts the providers, schedules the message routing and message acknowledgment.
- Provider
The provider is responsible for retrieving messages and deleting them when requested.
The act of deleting a message is also known as message acknowledgment.
At the moment, the only available provider is for the AWS SQS service.
- Message Translator
The message translator is the contract between provider and handler.
At the moment, the message translator receives the “raw message” and transforms it into the format expected by the handler.
It may also add metadata information, if available.
- Route
The route is the link between the provider and the handler. It is responsible for delivering the message to the handler and receiving its confirmation.
- Handler
The handler (or task/job) is the callable that will receive the message.
- Error Handler
The optional callback to handle any errors in message translation or in the handler processing.
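The acknowledgment rules spread across this guide (see Handlers, Exceptions and Error Handlers) can be summarized in a few lines of code. The sketch below is a simplified model, not Loafer's dispatcher: process_message, json_translate, the example handlers and the DeleteMessage stand-in are hypothetical names used only to show when a message ends up acknowledged (deleted).

```python
import asyncio
import json
import sys


class DeleteMessage(Exception):
    """Stand-in for loafer.exceptions.DeleteMessage so the sketch runs on its own."""


async def process_message(raw_message, translate, handler, error_handler):
    """Return True when the message should be acknowledged (deleted)."""
    try:
        translated = translate(raw_message)
        content = translated['content']
        if not content:
            raise ValueError('translated message has no content')
        return bool(await handler(content, translated.get('metadata', {})))
    except DeleteMessage:
        return True  # handler asked for the message to be rejected and acknowledged
    except Exception:
        # Translation and handler errors both go to the error handler,
        # whose return value decides whether to acknowledge.
        return bool(await error_handler(sys.exc_info(), raw_message))


def json_translate(raw_message):
    return {'content': json.loads(raw_message), 'metadata': {}}


async def ok_handler(message, metadata):
    return True


async def rejecting_handler(message, metadata):
    raise DeleteMessage()


async def failing_handler(message, metadata):
    raise RuntimeError('boom')


async def default_error_handler(exc_info, message):
    return False  # like the default: do not acknowledge (logging omitted)


async def demo():
    handlers = (ok_handler, rejecting_handler, failing_handler)
    return [await process_message('{"key": true}', json_translate, h, default_error_handler)
            for h in handlers]


print(asyncio.run(demo()))  # [True, True, False]
```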
The message lifecycle¶
A simplified view of a message lifecycle is illustrated below:
Settings¶
AWS¶
To configure AWS access, check the boto3 configuration documentation or export the following environment variables (see boto3 envvars):
$ export AWS_ACCESS_KEY_ID=<key>
$ export AWS_SECRET_ACCESS_KEY=<secret>
$ export AWS_DEFAULT_REGION=sa-east-1 # for example
You might find some configuration tips in our FAQ as well.
Exceptions¶
All exceptions are defined in the loafer.exceptions module.
A description of all the exceptions:
- ProviderError: a fatal error from a provider instance. Loafer will stop all operations and shut down.
- ConfigurationError: a configuration error. Loafer will stop all operations and shut down.
- DeleteMessage: if any handler raises this exception, the message will be rejected and acknowledged (the message will be deleted).
- LoaferException: the base exception for DeleteMessage.
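For example, a handler can raise DeleteMessage to discard messages it can never process (such as a payload missing a required field), instead of letting them reach the error handler and stay in the queue. The sketch assumes loafer is installed; the fallback class only exists so the snippet also runs without it, and the order_id field is hypothetical.

```python
import asyncio

try:
    from loafer.exceptions import DeleteMessage
except ImportError:  # fallback stand-in so the sketch runs without loafer
    class DeleteMessage(Exception):
        pass


async def order_handler(message, metadata):
    # 'order_id' is a hypothetical required field used for illustration.
    if 'order_id' not in message:
        # This message will never succeed: acknowledge (delete) it instead of retrying.
        raise DeleteMessage()
    await asyncio.sleep(0)  # real I/O would go here
    return True
```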
Handlers¶
Handlers are callables that receive the message.
The handler signature should be similar to:
async def my_handler(message, metadata):
    # ... code ...
    return True  # or False
Where message is the message to be processed and metadata is a dict with metadata information.
The async def is the Python coroutine syntax; regular functions can also be used, but they will run in a thread, outside the event loop.
The return value indicates whether the handler successfully processed the message. By returning True, the message will be acknowledged (deleted).
Another way to acknowledge messages inside a handler is to raise the DeleteMessage exception.
Any other exception will be redirected to an error_handler; see more in Error Handlers.
The default error_handler will log the error and not acknowledge the message.
For some generic handlers that can give you a starting point, take a look at the Generic Handlers section.
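For instance, a regular function can act as a handler; as described above, Loafer would run it in a thread. This sketch assumes the message was translated by SQSMessageTranslator, whose metadata keeps the other SQS message keys such as MessageId; the check for a 'key' field is hypothetical.

```python
import logging

logger = logging.getLogger(__name__)


def log_handler(message, metadata):
    """A regular (non-coroutine) handler following the (message, metadata) signature."""
    # metadata.get() avoids a KeyError if MessageId is not present.
    logger.info('processing %r (message id: %s)', message, metadata.get('MessageId'))
    # True acknowledges (deletes) the message; False leaves it in the queue.
    return 'key' in message
```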
Class-based handlers¶
You can also write handlers using classes. The class should implement a handle coroutine/method:
class MyHandler:
    async def handle(self, message, *args):
        # ... code ...
        return True

    def stop(self):
        # ... clean-up code ...
        pass
The method stop is optional and will be called before Loafer shuts down. Note that stop is not a coroutine.
When configuring your Routes, you can set handler to an instance of MyHandler instead of the handle method (both ways work):
Route(handler=MyHandler(), ...)
# or
Route(handler=MyHandler().handle, ...)
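A slightly fuller sketch of the class-based contract: handle does the per-message work and the optional, synchronous stop releases resources before shutdown. BufferedHandler and its buffering behavior are hypothetical; the methods are called directly here (with asyncio.run, Python 3.7+) rather than through Loafer.

```python
import asyncio


class BufferedHandler:
    """Collects message ids and flushes them on shutdown (illustrative only)."""

    def __init__(self):
        self.buffer = []
        self.flushed = []

    async def handle(self, message, *args):
        self.buffer.append(message['id'])
        await asyncio.sleep(0)  # stand-in for real asynchronous I/O
        return True

    def stop(self):
        # Called once before shutdown; a regular method, not a coroutine.
        self.flushed.extend(self.buffer)
        self.buffer.clear()


handler = BufferedHandler()
asyncio.run(handler.handle({'id': 1}))
handler.stop()
print(handler.flushed)  # [1]
```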
Message dependency¶
Handlers are supposed to be stateless or have limited dependency on message values. Since the same handler instance is used to process all incoming messages, we can’t guarantee that a value attached to it will be kept across several concurrent calls to the same handler.
This might be hard to detect in production and probably is an undesired side-effect:
class Handler:
    async def foo(self):
        # do something with `self.some_value`
        print(self.some_value)
        # ... code ...

    async def handle(self, message, *args):
        self.some_value = message['foo']
        await self.foo()
        return True
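The sketch below reproduces the problem with two concurrent calls and shows one way around it: pass per-message values as arguments instead of storing them on self. UnsafeHandler, SafeHandler and the short sleep standing in for I/O are illustrative; the handlers are driven directly with asyncio.gather (Python 3.7+) rather than through Loafer.

```python
import asyncio


class UnsafeHandler:
    """Stores a per-message value on the shared instance (the pattern to avoid)."""

    def __init__(self):
        self.seen = []

    async def foo(self):
        await asyncio.sleep(0.01)  # any await lets another message run meanwhile
        self.seen.append(self.some_value)

    async def handle(self, message, *args):
        self.some_value = message['foo']
        await self.foo()
        return True


class SafeHandler:
    """Passes the per-message value explicitly instead of storing it on self."""

    def __init__(self):
        self.seen = []

    async def foo(self, some_value):
        await asyncio.sleep(0.01)
        self.seen.append(some_value)

    async def handle(self, message, *args):
        await self.foo(message['foo'])
        return True


async def run_concurrently(handler):
    messages = [{'foo': 'a'}, {'foo': 'b'}]
    await asyncio.gather(*(handler.handle(m) for m in messages))
    return sorted(handler.seen)


print(asyncio.run(run_concurrently(UnsafeHandler())))  # ['b', 'b']
print(asyncio.run(run_concurrently(SafeHandler())))  # ['a', 'b']
```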
Error Handlers¶
Every Route implements an error_handler coroutine that can be used as an error hook.
This hook is called when unhandled exceptions happen and, by default, it will only log the error and not acknowledge the message.
Route accepts a callable parameter, error_handler, so you can pass a custom function or coroutine to replace the default error handler behavior.
The callable should be similar to:
async def custom_error_handler(exc_info, message):
    # ... custom code here ...
    return True

# Route(..., error_handler=custom_error_handler)
The return value determines whether the message that originated the error will be acknowledged.
True means acknowledge it; False leaves the message unacknowledged (default behavior).
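As an example, a custom error handler can acknowledge only the errors that retrying will not fix. This sketch assumes exc_info is the (type, value, traceback) tuple produced by sys.exc_info(); treating ValueError and KeyError as permanent is a purely illustrative policy, and the names are hypothetical.

```python
import asyncio
import logging
import sys

logger = logging.getLogger(__name__)

# Hypothetical policy: errors that would fail again on every retry.
PERMANENT_ERRORS = (ValueError, KeyError)


async def selective_error_handler(exc_info, message):
    exc_type, exc_value, _ = exc_info  # assumed to come from sys.exc_info()
    logger.error('error processing %r: %r', message, exc_value)
    # True acknowledges (deletes) the message; False leaves it in the queue.
    return issubclass(exc_type, PERMANENT_ERRORS)

# Route(..., error_handler=selective_error_handler)
```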
Sentry¶
To integrate with Sentry you will need the SDK client and your account DSN.
Then you can automatically create an error_handler with the following code:
import sentry_sdk
from loafer.ext.sentry import sentry_handler
sentry_sdk.init(...)
error_handler = sentry_handler(sentry_sdk, delete_message=True)
The optional delete_message parameter controls message acknowledgement after the error report. By default, delete_message is False.
The error_handler defined above can be set on any Route instance.
Generic Handlers¶
Here are some handlers that are easy to extend or use directly.
loafer.ext.aws.handlers.SQSHandler¶
A handler that publishes messages to an SQS queue.
For instance, if we just want to redirect a message to a queue named “my-queue”:
# in your route definition
from loafer.ext.aws.handlers import SQSHandler
Route(handler=SQSHandler('my-queue'), ...)
The handler assumes a message that can be JSON encoded (usually a Python dict instance).
You can customize this handler by subclassing it:
class MyHandler(SQSHandler):
    queue_name = 'my-queue'

    async def handle(self, message, *args):
        text = message['text']
        return await self.publish(text, encoder=None)
In the example above, we are disabling the message encoding by passing None to the publish coroutine.
The encoder parameter should be a callable that receives the message to be encoded. By default, json.dumps is used.
Note how queue_name was set: you can configure it when instantiating the handler or set the class attribute queue_name. Both are valid, and the attribute is mandatory. You can also use the queue URL directly, if you prefer.
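Since encoder is just a callable that receives the message, you can pass your own. The sketch below defines a hypothetical compact_json encoder (compact separators, non-ASCII text kept unescaped) built on json.dumps; the commented lines show how it would plug into the publish coroutine of an SQSHandler subclass like the one above.

```python
import functools
import json

# Hypothetical encoder: compact separators and UTF-8 text kept unescaped.
compact_json = functools.partial(json.dumps, separators=(',', ':'), ensure_ascii=False)

# class MyHandler(SQSHandler):
#     queue_name = 'my-queue'
#
#     async def handle(self, message, *args):
#         return await self.publish(message, encoder=compact_json)

print(compact_json({'text': 'olá', 'n': 1}))  # {"text":"olá","n":1}
```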
loafer.ext.aws.handlers.SNSHandler¶
A handler that publishes messages to an SNS topic.
This handler is very similar to SQSHandler, so here is a similar example:
from loafer.ext.aws.handlers import SNSHandler

class MyHandler(SNSHandler):
    topic = 'my-topic'

    async def handle(self, message, *args):
        text = message['text']
        return await self.publish(text, encoder=None)
The handler also provides a publish coroutine with an encoder parameter that works the same way as in SQSHandler, except it publishes to an SNS topic instead of a queue.
The SNSHandler also assumes a message that can be JSON encoded, and the encoder defaults to json.dumps.
The topic is also mandatory and should be configured in the class definition or when creating the handler instance.
You can set either the topic name or the topic ARN (recommended). When using the topic name, the handler will use wildcards to match the topic ARN.
More details about SNS wildcards are available in the AWS documentation.
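Because the ARN is the recommended setting, it can help to see its shape in the standard aws partition: arn:aws:sns:<region>:<account-id>:<topic-name>. The sns_topic_arn helper below is hypothetical and only formats the string (it does not call AWS); 123456789012 is a placeholder account id.

```python
def sns_topic_arn(region, account_id, topic_name):
    """Format an SNS topic ARN in the standard 'aws' partition (hypothetical helper)."""
    return 'arn:aws:sns:{}:{}:{}'.format(region, account_id, topic_name)


# class MyHandler(SNSHandler):
#     topic = sns_topic_arn('sa-east-1', '123456789012', 'my-topic')

print(sns_topic_arn('sa-east-1', '123456789012', 'my-topic'))
# arn:aws:sns:sa-east-1:123456789012:my-topic
```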
Message Translators¶
The message translator receives a “raw message” and processes it into a suitable format expected by the handler.
The “raw message” is the message received by the provider “as-is”, and it might be delivered without any processing if the message translator was not set.
In some cases, you should explicitly set message_translator=None to disable any configured translators.
Implementation¶
The message translator class should subclass AbstractMessageTranslator and implement the translate method like:
from loafer.message_translators import AbstractMessageTranslator

class MyMessageTranslator(AbstractMessageTranslator):
    def translate(self, message):
        return {'content': int(message), 'metadata': {}}
And it should return a dictionary in the format:
{'content': processed_message, 'metadata': {}}
The processed_message and metadata (optional) will be delivered to the handler.
If processed_message is None (or empty), the message will cause a ValueError exception.
All the exceptions in message translation will be caught by the configured Error Handlers.
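As a slightly fuller example, the translator below parses a hypothetical key=value;key=value text format into a dict. For blank input it returns empty content, which, per the rule above, leads to a ValueError handled by the error handlers. The import fallback only exists so the sketch runs without Loafer installed, and the raw_length metadata key is made up for illustration.

```python
try:
    from loafer.message_translators import AbstractMessageTranslator
except ImportError:  # stand-in base class so the sketch runs without loafer
    class AbstractMessageTranslator:
        pass


class KeyValueMessageTranslator(AbstractMessageTranslator):
    """Translate 'a=1;b=2' into {'a': '1', 'b': '2'} (hypothetical format)."""

    def translate(self, message):
        pairs = (item.split('=', 1) for item in message.split(';') if '=' in item)
        content = {key.strip(): value.strip() for key, value in pairs}
        # Empty content here would end in a ValueError routed to the error handler.
        return {'content': content, 'metadata': {'raw_length': len(message)}}


print(KeyValueMessageTranslator().translate('a=1; b=2'))
# {'content': {'a': '1', 'b': '2'}, 'metadata': {'raw_length': 8}}
```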
The existing message translators are described below.
loafer.message_translators.StringMessageTranslator¶
A message translator that translates the given message to a string (python str).
loafer.ext.aws.message_translators.SQSMessageTranslator¶
A message translator that translates SQS messages. The expected message body is a JSON payload that will be decoded with json.loads.
All the other keys of the SQS message will be kept in the metadata dict (except Body, which was already translated).
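The following approximation shows the described behavior on a hand-written SQS message dict. It is not Loafer's implementation: translate_sqs_like and the sample values are hypothetical, while MessageId, ReceiptHandle and Body are standard SQS message keys.

```python
import json


def translate_sqs_like(sqs_message):
    """Approximate SQSMessageTranslator: decode Body, keep other keys as metadata."""
    content = json.loads(sqs_message['Body'])
    metadata = {key: value for key, value in sqs_message.items() if key != 'Body'}
    return {'content': content, 'metadata': metadata}


sqs_sample = {'MessageId': 'abc-123', 'ReceiptHandle': 'rh', 'Body': '{"key": true}'}
print(translate_sqs_like(sqs_sample))
# {'content': {'key': True}, 'metadata': {'MessageId': 'abc-123', 'ReceiptHandle': 'rh'}}
```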
loafer.ext.aws.message_translators.SNSMessageTranslator¶
A message translator that translates SQS messages that came from an SNS topic.
The expected notification message is a JSON payload that will be decoded with json.loads.
SNS notifications wrap (and encode) the message inside the body of an SQS message, so the SQSMessageTranslator will fail to properly translate those messages (or at least, fail to translate them into the expected format).
All the keys will be kept in the metadata dict (except Body).
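To see the double encoding, the approximation below unwraps a hand-written SNS notification delivered through SQS, assuming the subscription does not use raw message delivery (so SNS wraps the published message in its JSON envelope under the Message key). It is not Loafer's implementation; translate_sns_like and the sample values are hypothetical.

```python
import json


def translate_sns_like(sqs_message):
    """Approximate SNSMessageTranslator: unwrap the SNS envelope inside the SQS Body."""
    envelope = json.loads(sqs_message['Body'])  # the SNS notification JSON
    content = json.loads(envelope['Message'])  # the published message, itself JSON
    metadata = {key: value for key, value in sqs_message.items() if key != 'Body'}
    return {'content': content, 'metadata': metadata}


notification = {
    'Type': 'Notification',
    'TopicArn': 'arn:aws:sns:sa-east-1:123456789012:my-topic',
    'Message': json.dumps({'key': True}),
}
sns_sample = {'MessageId': 'abc-123', 'Body': json.dumps(notification)}

print(translate_sns_like(sns_sample)['content'])  # {'key': True}
# Decoding Body only once yields the SNS envelope, not {'key': True}.
```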
For more details about message translators usage, check the Routes examples.
Providers¶
Providers are responsible for retrieving messages and acknowledging them (deleting them from the source).
SQSProvider¶
SQSProvider (located at loafer.ext.aws.providers) receives the following options:
- queue_name: the queue name
- options (optional): a dict with SQS options to retrieve messages. Example: {'WaitTimeSeconds': 5, 'MaxNumberOfMessages': 5}
Also, you might override any of the parameters below from the boto library (all optional):
- api_version
- aws_access_key_id
- aws_secret_access_key
- aws_session_token
- endpoint_url
- region_name
- use_ssl
- verify
Check the boto3 client documentation for detailed information.
Usually, the provider is not configured manually, but set by Routes and their helper classes.
Routes¶
A Route aggregates all the main entities previously described. The generic parameters are:
- provider: a provider instance
- handler: a handler instance
- error_handler (optional): an error handler instance
- message_translator (optional): a message translator instance
- name (optional): a name for this route
We provide some helper routes, so you don’t need to set up all this boilerplate code:
- loafer.ext.aws.routes.SQSRoute: a route that configures a loafer.ext.aws.providers.SQSProvider and a loafer.ext.aws.message_translators.SQSMessageTranslator. A route for handlers that consume messages from an SQS queue (expects JSON messages).
- loafer.ext.aws.routes.SNSQueueRoute: a route that configures a loafer.ext.aws.providers.SQSProvider and a loafer.ext.aws.message_translators.SNSMessageTranslator. A route for handlers that consume messages from an SQS queue subscribed to an SNS topic (expects JSON messages).
Examples¶
Some examples of route creation:
from loafer.ext.aws.routes import SQSRoute, SNSQueueRoute
from loafer.message_translators import StringMessageTranslator
# regular route
route1 = SQSRoute('my-queue1', handler=..., name='route1')
# route with custom SQSProvider parameters
route2 = SNSQueueRoute('my-queue2', {'use_ssl': False, 'options': {'WaitTimeSeconds': 4}}, handler=..., name='route2')
# route with custom message translator
route3 = SQSRoute('my-queue3', message_translator=StringMessageTranslator(), handler=...)
# route disabling message translation
route4 = SNSQueueRoute('my-queue4', message_translator=None, handler=...)
# route with custom error handler
route5 = SQSRoute('my-queue5', handler=..., error_handler=custom_error_handler)
Managers¶
Managers are responsible for the execution of all the components.
The LoaferManager (at loafer.managers) receives a list of Routes.
Every service/application using loafer should instantiate a manager:
from loafer.managers import LoaferManager
from .routes import routes # the list of routes
manager = LoaferManager(routes=routes)
manager.run()
The default execution mode will run indefinitely. To run only one iteration of your services, the last line in the code above should be replaced with:
manager.run(forever=False)
The “one iteration” could be a little tricky. For example, if you have one provider that fetches two messages at a time, your handler will be called twice (once for each message) and then stop.
Development¶
Development Installation¶
Development install¶
After forking or checking out:
$ cd loafer/
$ pip install -r requirements/local.txt
$ pre-commit install
$ pip install -e .
The requirements folder is only used for development, so we can easily install/track the dependencies required to run the tests on continuous integration platforms.
The official entrypoint for distribution is setup.py, which also contains the minimum requirements to execute the tests.
It’s important to execute pip install -e . not only to install the main dependencies, but also to include loafer in your environment.
Running tests:
$ make test
Generating documentation:
$ cd docs/
$ make html
To configure AWS access, check the boto3 configuration documentation or export the following environment variables (see boto3 envvars):
$ export AWS_ACCESS_KEY_ID=<key>
$ export AWS_SECRET_ACCESS_KEY=<secret>
$ export AWS_DEFAULT_REGION=sa-east-1 # for example
Check the Settings section to see specific configurations.
Release¶
To release a new version, a few steps are required:
- Update version/release number in docs/source/conf.py
- Add entry to CHANGES.rst and documentation
- Review changes in test requirements (requirements/test.txt and setup.py)
- Test with python setup.py test and make test-cov
- Test build with make dist
- Commit changes
- Release with make release
Other¶
FAQ¶
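1. How do I run blocking I/O code that’s not a coroutine?
Any code that is blocking and not a coroutine could run in a separate thread.
It’s not recommended, but it looks like this:
import asyncio

async def my_handler(message, *args):
    loop = asyncio.get_event_loop()  # called from a coroutine, this is the running loop
    # run_in_executor returns a future: await it to wait for the callable
    await loop.run_in_executor(None, your_callable, *your_callable_args)
    # Important: do not close/stop the loop
    return True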
2. How do I integrate with New Relic?
The New Relic documentation should be the primary source of information. One alternative is to use the environment variables NEW_RELIC_LICENSE_KEY and NEW_RELIC_APP_NAME and, for every handler:
import newrelic.agent

@newrelic.agent.background_task()
def some_code(*args, **kwargs):
    ...
3. How do I use different regions/credentials with SQSRoute/SNSQueueRoute?
SQSRoute/SNSQueueRoute instantiate loafer.ext.aws.providers.SQSProvider, so you can dynamically set any of the SQSProvider options described in Providers. An example with explicit AWS credentials and provider options would look like:
from loafer.ext.aws.routes import SQSRoute

route = SQSRoute(
    'test-queue-name',
    name='my-route',
    handler=some_handler,
    provider_options={
        'aws_access_key_id': my_aws_access_key,
        'aws_secret_access_key': my_secret_key,
        'region_name': 'sa-east-1',
        'options': {'WaitTimeSeconds': 3},
    },
)
Changelog¶
2.0.1 (2020-07-28)¶
- Fix sentry integration (# by @hartungstenio)
- Minor improvements
2.0.0 (2020-05-16)¶
- Dropped support for Python 3.5
- Added support for Python 3.8
- Update aiobotocore dependency version (#61 by @GuilhermeVBeira)
- Improvements due to changes in asyncio (#48, #52 by @lamenezes)
- Sentry wrapper/helper updated to support new sdk (wip)
- Minor documentation improvements
1.3.2 (2019-04-27)¶
- Improve message processing (#48 by @lamenezes)
- Improve error logging (#39 by @wiliamsouza)
- Refactor in message dispatcher and event-loop shutdown
- Minor fixes and improvements
1.3.1 (2017-10-22)¶
- Improve performance (#35 by @allisson)
- Fix requirement versions resolution
- Minor fixes and improvements
1.3.0 (2017-09-26)¶
- Refactor tasks dispatching, it should improve performance
- Refactor SQSProvider to ignore HTTP 404 errors when deleting messages
- Minor fixes and improvements
1.2.1 (2017-09-11)¶
- Bump boto3 version (by @daneoshiga)
1.2.0 (2017-08-15)¶
- Enable provider parameters (boto client options)
1.1.1 (2017-06-14)¶
- Bugfix: fix SNS prefix value in use for topic name wildcard (by @lamenezes)
1.1.0 (2017-05-01)¶
- Added initial contracts for class-based handlers
- Added generic handlers: SQSHandler/SNSHandler
- Improve internal error handling
- Improve docs
1.0.2 (2017-04-13)¶
- Fix sentry error handler integration
1.0.1 (2017-04-09)¶
- Add tox and execute tests for py36
- Update aiohttp/aiobotocore versions
- Minor fixes and enhancements
1.0.0 (2017-03-27)¶
- Major code rewrite
- Remove CLI
- Add better support for error handlers, including sentry/raven
- Refactor exceptions
- Add message metadata information
- Update message lifecycle with handler/error handler return value
- Enable execution of one service iteration (by default, it still runs “forever”)
0.0.3 (2016-04-24)¶
- Improve documentation
- Improve package metadata and dependencies
- Add loafer.aws.message_translator.SNSMessageTranslator class
- Fix ImportError exceptions for configuration that uses loafer.utils.import_callable
0.0.2 (2016-04-18)¶
- Fix build hardcoding tests dependencies
0.0.1 (2016-04-18)¶
- Initial release
Contributors¶
Thanks to:¶
- allisson <allisson@olist.com>
- cleberzavadniak <cleber.zavadniak@olist.com>
- danilo shiga <danilo.shiga@olist.com>
- lamenezes <luiz.menezes@olist.com>
- luizdepra <luiz.pra@olist.com>
- wiliamsouza <wiliamsouza83@gmail.com>
- GuilhermeVBeira <guilherme.vieira.beira@gmail.com>
- hartungstenio <hartung@live.com>