Welcome to Loafer’s documentation!¶
Loafer is an asynchronous message dispatcher for concurrent task processing.
Quickstart¶
Installation¶
Requirements¶
Python 3.5+
Note:
Some packages also need the python3.5-dev package (Ubuntu) or similar.
To install via pip:
$ pip install loafer
Basic configuration¶
To configure AWS access, check boto3 configuration or export (see boto3 envvars):
$ export AWS_ACCESS_KEY_ID=<key>
$ export AWS_SECRET_ACCESS_KEY=<secret>
$ export AWS_DEFAULT_REGION=sa-east-1 # for example
There are two more environment variables you’ll need to set:
$ export LOAFER_DEFAULT_ROUTE_SOURCE=my-queue
$ export LOAFER_DEFAULT_ROUTE_HANDLER=loafer.example.jobs.async_example_job
LOAFER_DEFAULT_ROUTE_SOURCE is the name of the SQS queue you’ll consume messages from.
LOAFER_DEFAULT_ROUTE_HANDLER is the full path of the callable that will receive the message.
The default handler is loafer.example.jobs.async_example_job, which only logs the received message at the warning level.
Usage¶
Loafer provides a simple CLI.
Check the full list of options by running loafer --help.
Examples¶
Starting loafer:
$ loafer
Add source and handler parameters (all parameters are optional):
$ loafer --source my-queue-name --handler loafer.example.jobs.async_example_job
Publishing a message to SNS/SQS (you should see some logging messages):
# To SQS
$ loafer publish_sqs --queue a-queue-name --msg 'foobar'
# To SNS (assuming the queue we consume from is subscribed to the topic)
$ loafer publish_sns --topic a-topic-name --msg 'foobar'
User Guide¶
Overview¶
Loafer is an asynchronous message dispatcher for concurrent task processing.
To take full advantage, your tasks:
- Should use asyncio
- Should be I/O bound
- Should be decoupled from the message producer
If your tasks are CPU bound, you should look for projects that use the multiprocessing Python module or similar.
We don’t require the use of asyncio, but any code that’s not a coroutine will run in a thread, outside the event loop. Performance and error handling might be compromised in these scenarios, so you might want to look for alternatives.
If your code is too tied to the message producer, you might end up writing too much boilerplate code in order to use Loafer.
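To illustrate why I/O-bound coroutines fit this model, here is a minimal sketch that uses plain asyncio only. The names fake_io_task and main are hypothetical, asyncio.sleep stands in for a real non-blocking I/O call, and asyncio.run assumes Python 3.7 or newer.

```python
import asyncio
import time


async def fake_io_task(name, delay):
    # asyncio.sleep stands in for a non-blocking I/O call (HTTP request, DB query, ...)
    await asyncio.sleep(delay)
    return name


async def main():
    started = time.monotonic()
    # The three tasks wait concurrently, so the total time is close to the longest delay
    results = await asyncio.gather(
        fake_io_task('a', 0.2),
        fake_io_task('b', 0.2),
        fake_io_task('c', 0.2),
    )
    elapsed = time.monotonic() - started
    return results, elapsed


results, elapsed = asyncio.run(main())
print(results, round(elapsed, 1))
```

Run sequentially, the same three waits would take roughly three times as long; a CPU-bound task would not benefit in the same way because it never yields to the event loop.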
Components¶
The main components inside Loafer are:
- Manager
The manager is responsible for setting up the event loop and handling system errors.
It prepares everything needed to run and starts the dispatcher.
- Dispatcher
The dispatcher starts the consumers, schedules the message routing and message acknowledgment.
- Consumer
The consumer is responsible for retrieving messages and deleting them when requested.
The act of deleting a message is also known as message acknowledgment.
At the moment, we only have a consumer for the AWS SQS service.
- Message Translator
The message translator is the contract between consumer and handler.
In the future it will also help the dispatcher choose a message destination handler.
At the moment, the message translator receives the “raw message” and transforms it into the format expected by the handler.
- Route
The route is the link between the consumer and the handler. It is responsible for delivering the message to the handler and receiving its confirmation.
- Handler
The handler (or task/job) is the callable that will receive the message.
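The way these components cooperate can be sketched with in-memory stand-ins. This is a toy illustration, not Loafer's code: ListConsumer, UpperTranslator, dispatch, and handler are hypothetical names, and the real classes may have different interfaces.

```python
import asyncio


class ListConsumer:
    """Hypothetical in-memory consumer: holds raw messages and records deletions."""

    def __init__(self, raw_messages):
        self.pending = list(raw_messages)
        self.deleted = []

    def fetch(self):
        messages, self.pending = self.pending, []
        return messages

    def delete(self, raw_message):
        # Deleting a message is the acknowledgment
        self.deleted.append(raw_message)


class UpperTranslator:
    """Hypothetical translator: raw message -> {'content': ...}."""

    def translate(self, raw_message):
        return {'content': raw_message.upper()}


received = []


async def handler(message):
    # The handler only sees the translated content
    received.append(message)


async def dispatch(consumer, translator, handler):
    for raw in consumer.fetch():
        content = translator.translate(raw)['content']
        await handler(content)   # the route delivers the message to the handler
        consumer.delete(raw)     # acknowledged after a successful execution


consumer = ListConsumer(['foo', 'bar'])
asyncio.run(dispatch(consumer, UpperTranslator(), handler))
print(received, consumer.deleted)
```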
Settings¶
All configuration is done using environment variables.
You can also create a .env or *.cfg file in your project root; it will be loaded automatically.
Here is a sample .env file:
LOAFER_DEFAULT_ROUTE_SOURCE=my-queue-name
LOAFER_DEFAULT_ROUTE_HANDLER=loafer.example.jobs.async_example_job
All the possible configuration keys and their default values are listed below (entries without default values are marked as required):
Key | Default value |
---|---|
LOAFER_LOG_FORMAT | ‘%(asctime)s - %(name)s - %(levelname)s - %(message)s’ |
LOAFER_MAX_JOBS | 10 |
LOAFER_MAX_THREAD_POOL | None |
LOAFER_DEFAULT_MESSAGE_TRANSLATOR_CLASS | ‘loafer.aws.message_translator.SQSMessageTranslator’ |
LOAFER_DEFAULT_ROUTE_NAME | ‘default’ |
LOAFER_DEFAULT_ROUTE_SOURCE | required |
LOAFER_DEFAULT_ROUTE_HANDLER | ‘loafer.example.jobs.async_example_job’ |
LOAFER_DEFAULT_CONSUMER_CLASS | ‘loafer.aws.consumer.Consumer’ |
LOAFER_DEFAULT_CONSUMER_OPTIONS | {‘WaitTimeSeconds’: 5, ‘MaxNumberOfMessages’: 5} |
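As a rough illustration of how required keys and defaults interact, the sketch below reads a few of the keys from a mapping such as os.environ. The load_settings function and the DEFAULTS and REQUIRED names are hypothetical and are not Loafer's actual loader; only the keys and default values come from the table.

```python
import os

# Defaults copied from the settings table; the loader itself is hypothetical
DEFAULTS = {
    'LOAFER_MAX_JOBS': '10',
    'LOAFER_DEFAULT_ROUTE_NAME': 'default',
    'LOAFER_DEFAULT_ROUTE_HANDLER': 'loafer.example.jobs.async_example_job',
}
REQUIRED = ('LOAFER_DEFAULT_ROUTE_SOURCE',)


def load_settings(environ=None):
    environ = os.environ if environ is None else environ
    missing = [key for key in REQUIRED if key not in environ]
    if missing:
        raise KeyError('missing required settings: {}'.format(', '.join(missing)))
    # Environment values win over the defaults
    settings = {key: environ.get(key, default) for key, default in DEFAULTS.items()}
    for key in REQUIRED:
        settings[key] = environ[key]
    # Environment variables are strings, so numeric settings need a cast
    settings['LOAFER_MAX_JOBS'] = int(settings['LOAFER_MAX_JOBS'])
    return settings


settings = load_settings({'LOAFER_DEFAULT_ROUTE_SOURCE': 'my-queue-name'})
print(settings['LOAFER_MAX_JOBS'], settings['LOAFER_DEFAULT_ROUTE_NAME'])
```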
LOAFER_MAX_JOBS is the number of concurrent handler executions.
LOAFER_MAX_THREAD_POOL, if not set, is determined automatically from the number of cores in the machine. Threads are used to execute non-asyncio code.
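One common way to cap concurrent executions in asyncio is a semaphore. The sketch below is an illustration only and is not confirmed to be how Loafer enforces LOAFER_MAX_JOBS; MAX_JOBS, handler, and limited are hypothetical names, and asyncio.run assumes Python 3.7 or newer.

```python
import asyncio

MAX_JOBS = 2  # stands in for LOAFER_MAX_JOBS
running = 0
peak = 0


async def handler(message):
    global running, peak
    running += 1
    peak = max(peak, running)   # track the highest number of simultaneous runs
    await asyncio.sleep(0.05)   # stand-in for real I/O
    running -= 1


async def main():
    semaphore = asyncio.Semaphore(MAX_JOBS)

    async def limited(message):
        # At most MAX_JOBS handlers hold the semaphore at the same time
        async with semaphore:
            await handler(message)

    await asyncio.gather(*(limited(n) for n in range(6)))


asyncio.run(main())
print(peak)
```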
All variables that require a class or callable must be a full name, i.e., we must be able to import it.
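A full name here means a dotted path that can be resolved with a regular import. A minimal sketch of that resolution using the standard importlib module follows; import_dotted_path is a hypothetical helper written for this example, not Loafer's own function.

```python
import importlib


def import_dotted_path(full_name):
    """Hypothetical helper: resolve 'package.module.attribute' to the object."""
    module_name, _, attribute = full_name.rpartition('.')
    if not module_name:
        raise ImportError('{!r} is not a full dotted path'.format(full_name))
    module = importlib.import_module(module_name)
    try:
        return getattr(module, attribute)
    except AttributeError:
        raise ImportError('cannot import {!r} from {!r}'.format(attribute, module_name))


# Any importable callable works; a standard library function is used here
loads = import_dotted_path('json.loads')
print(loads('[1, 2]'))
```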
AWS¶
To configure AWS access, check boto3 configuration or export (see boto3 envvars):
$ export AWS_ACCESS_KEY_ID=<key>
$ export AWS_SECRET_ACCESS_KEY=<secret>
$ export AWS_DEFAULT_REGION=sa-east-1 # for example
Exceptions and error handling¶
All exceptions are defined in the loafer.exceptions module.
A description of all the exceptions:
- ConsumerError: a fatal error from a consumer instance. Loafer will stop all operations and shut down.
- ConfigurationError: a configuration error. Loafer will stop all operations and shut down.
- IgnoreMessage: if the handler raises this exception, the message will be ignored and not acknowledged.
- RejectMessage: if the handler raises this exception, the message will be rejected and acknowledged (i.e., the message will be deleted).
At the message translation phase¶
If the message translator returns invalid message content or raises an unhandled exception:
- The message will not be delivered to the handler
- The message will be ignored (i.e., it will not be deleted)
At the handler processing phase¶
The handler can use IgnoreMessage or RejectMessage to explicitly determine how the message will be handled.
Any unhandled exception will have the same effect as IgnoreMessage, but handlers are advised to handle their own errors in case this behavior changes in the future.
Message deletion¶
The message will be deleted (acknowledged):
- When the handler raises RejectMessage
- After a successful execution of the handler
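The deletion rules can be summarized in a small decision function. This is a sketch of the documented behavior, not Loafer's dispatcher code: the two exception classes are local stand-ins for the ones in loafer.exceptions, handlers are plain functions for brevity, and should_delete is a hypothetical name.

```python
class IgnoreMessage(Exception):
    """Local stand-in for loafer.exceptions.IgnoreMessage."""


class RejectMessage(Exception):
    """Local stand-in for loafer.exceptions.RejectMessage."""


def should_delete(handler, message):
    """Return True when the message should be acknowledged (deleted)."""
    try:
        handler(message)
    except RejectMessage:
        return True    # rejected: acknowledged, so it is not delivered again
    except Exception:
        return False   # IgnoreMessage or any unhandled error: not deleted
    return True        # successful execution


def ok(message):
    return None


def reject(message):
    raise RejectMessage('bad payload')


def ignore(message):
    raise IgnoreMessage('try again later')


def broken(message):
    raise ValueError('unexpected')


for handler in (ok, reject, ignore, broken):
    print(handler.__name__, should_delete(handler, 'foobar'))
```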
Message Translators¶
The message translator receives a “raw message” and processes it into a format suitable for the handler.
The “raw message” is the message received by the consumer “as-is”.
The translator can be set via the LOAFER_DEFAULT_MESSAGE_TRANSLATOR_CLASS setting.
Implementation¶
The message translator class should implement a translate method, like:
class MyMessageTranslator(object):
    def translate(self, message):
        return {'content': int(message)}
It should return a dictionary in the format:
return {'content': processed_message}
The processed_message is the message delivered to the handler.
In the example above, the message is expected to contain an integer and will be delivered as an integer.
The message parameter of translate is always a string.
If processed_message is None, the message will be ignored and not acknowledged.
Unhandled errors also cause the message to be ignored, but it’s good practice to handle those errors.
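As an example of handling translation errors explicitly, here is a sketch of a translator for JSON message bodies. JSONMessageTranslator is a hypothetical class written for this example; it follows the translate contract described above and returns None as the content when the body cannot be parsed.

```python
import json


class JSONMessageTranslator(object):
    """Hypothetical translator for messages whose body is a JSON document."""

    def translate(self, message):
        try:
            return {'content': json.loads(message)}
        except ValueError:
            # Invalid JSON: a None content makes the message be ignored
            # (not acknowledged) instead of raising an unhandled error
            return {'content': None}


translator = JSONMessageTranslator()
print(translator.translate('{"id": 1}'))
print(translator.translate('not json'))
```

Returning None keeps a malformed message in the queue. If such messages should be discarded instead, that decision would have to be made elsewhere, for example in the handler.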
Handlers¶
Handlers are callables that receive the message.
The handler can be set via the LOAFER_DEFAULT_ROUTE_HANDLER setting.
The handler signature depends on the Message Translators implementation.
Usually, it will be similar to this:
async def my_handler(message):
    ... code ...
Where message will contain some expected message format.
The async def is the Python coroutine syntax. Regular functions can also be used, but they will run in a thread, outside the event loop.
The handler does not need to return anything, but any unhandled error will cause the message to be ignored.
Note that if the handler fails silently, for example:
async def my_handler(message):
    try:
        ... some code ...
    except Exception:
        pass
This will cause the message to always be acknowledged.
You can have the message ignored by explicitly raising IgnoreMessage; the message will then not be deleted. See Exceptions and error handling for details.
Successful executions of the handler will acknowledge (delete) the message.
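Instead of swallowing errors, a handler can state its intent explicitly. The sketch below assumes a dictionary message and a hypothetical save coroutine. The exception classes are local stand-ins so the example runs on its own; in a real project they would be imported from loafer.exceptions.

```python
import asyncio


class IgnoreMessage(Exception):
    """Local stand-in for loafer.exceptions.IgnoreMessage."""


class RejectMessage(Exception):
    """Local stand-in for loafer.exceptions.RejectMessage."""


async def save(record):
    """Hypothetical I/O step that fails when the backend is unavailable."""
    if record.get('backend_down'):
        raise ConnectionError('backend unavailable')


async def my_handler(message):
    if 'id' not in message:
        # Permanent problem: retrying will not help, so delete the message
        raise RejectMessage('message without id')
    try:
        await save(message)
    except ConnectionError as exc:
        # Transient problem: keep the message so it can be consumed again
        raise IgnoreMessage(str(exc))


# A well-formed message completes without raising, so it would be acknowledged
asyncio.run(my_handler({'id': 1}))
```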
Publisher¶
The publisher is only used for testing purposes, giving us an easy way to validate a message translation process or handler execution.
It is responsible for publishing a message to a consumer source (queue).
It can be useful to exercise a given handler without the real message producer.
At the moment we provide publishers for Amazon SQS and SNS. Check the Usage section for examples.
Development¶
Development Installation¶
Development install¶
After forking or checking out:
$ cd loafer/
$ python setup.py develop
$ pip install -r requirements/local.txt
$ pip install -r requirements/test.txt
$ pre-commit install
The requirements folder is only used for development, so we can easily install and track the dependencies required to run the tests on continuous integration platforms.
The official entry point for distribution is setup.py, which also contains the minimum requirements to execute the tests.
It’s important to run setup.py develop not only to install the main dependencies, but also to make the loafer CLI available in our environment.
Running tests:
$ make test
Generating documentation:
$ cd docs/
$ make html
To configure AWS access, check boto3 configuration or export (see boto3 envvars):
$ export AWS_ACCESS_KEY_ID=<key>
$ export AWS_SECRET_ACCESS_KEY=<secret>
$ export AWS_DEFAULT_REGION=sa-east-1 # for example
Check the Settings section to see specific configurations.
Release¶
To release a new version, a few steps are required:
- Update version number at loafer/__init__.py and docs/source/conf.py
- Add entry to CHANGES.rst and documentation
- Review changes in test requirements (requirements/test.txt and setup.py)
- Test with python setup.py test and make test-cov
- Test build with make dist
- Commit changes
- Release with make release
Other¶
FAQ¶
1. How do I run I/O blocking code that’s not a coroutine?
Any code that is blocking and not a coroutine can run in a separate thread.
It’s not recommended, but it looks like this:
import asyncio
loop = asyncio.get_event_loop()
loop.run_in_executor(None, your_callable, your_callable_args)
# Important: do not close/stop the loop
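For reference, a self-contained version of the same idea is sketched below. It assumes Python 3.7 or newer (asyncio.run and asyncio.get_running_loop), and blocking_io is a hypothetical function standing in for any synchronous call.

```python
import asyncio
import time


def blocking_io(seconds):
    # A regular, blocking function (stand-in for e.g. a synchronous HTTP client)
    time.sleep(seconds)
    return 'done'


async def main():
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default thread pool executor
    return await loop.run_in_executor(None, blocking_io, 0.1)


result = asyncio.run(main())
print(result)
```

Awaiting the returned future lets the event loop keep serving other coroutines while the blocking call runs in a worker thread.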
Changelog¶
0.0.3 (2016-04-24)¶
- Improve documentation
- Improve package metadata and dependencies
- Add loafer.aws.message_translator.SNSMessageTranslator class
- Fix ImportError exceptions for configuration that uses loafer.utils.import_callable
0.0.2 (2016-04-18)¶
- Fix build hardcoding tests dependencies
0.0.1 (2016-04-18)¶
- Initial release