Welcome to Loafer’s documentation!

Loafer is an asynchronous message dispatcher for concurrent task processing.

Quickstart

Installation

Requirements

Python 3.5+

Note:

Some packages also need the python3.5-dev package (Ubuntu) or similar.

To install via pip:

$ pip install loafer

Basic configuration

To configure AWS access, follow the boto3 configuration documentation or export the following variables (see boto3 envvars):

$ export AWS_ACCESS_KEY_ID=<key>
$ export AWS_SECRET_ACCESS_KEY=<secret>
$ export AWS_DEFAULT_REGION=sa-east-1  # for example

There are two more environment variables you’ll need to set:

$ export LOAFER_DEFAULT_ROUTE_SOURCE=my-queue
$ export LOAFER_DEFAULT_ROUTE_HANDLER=loafer.example.jobs.async_example_job

LOAFER_DEFAULT_ROUTE_SOURCE is the name of the SQS queue you’ll consume messages from.

LOAFER_DEFAULT_ROUTE_HANDLER is the full import path of the callable that will receive the message.

The default handler is loafer.example.jobs.async_example_job, which only logs the received message at warning level.

Usage

Loafer provides a simple CLI.

Check the full list of options by running loafer --help.

Examples

  • Starting loafer:

    $ loafer
    
  • Add source and handler parameters (all parameters are optional):

    $ loafer --source my-queue-name --handler loafer.example.jobs.async_example_job
    
  • Publishing a message to SNS/SQS (you should see some logging messages):

    # To SQS
    $ loafer publish_sqs --queue a-queue-name --msg 'foobar'
    
    # To SNS (assuming the queue we consume from is subscribed to the topic)
    $ loafer publish_sns --topic a-topic-name --msg 'foobar'
    

User Guide

Overview

Loafer is an asynchronous message dispatcher for concurrent task processing.

To take full advantage, your tasks:

  1. Should use asyncio
  2. Should be I/O bound
  3. Should be decoupled from the message producer

If your tasks are CPU bound, you should look for projects that use the multiprocessing Python module or similar.

We don’t require the use of asyncio, but any code that’s not a coroutine will run in a thread, outside the event loop. Performance and error handling might be compromised in this scenario, so you might want to look for other alternatives.

If your code is too tightly coupled to the message producer, you might end up writing too much boilerplate code in order to use Loafer.
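As a minimal sketch of the distinction above (run_handler is a hypothetical helper written for this example, not Loafer's API), a coroutine can be awaited on the event loop while a regular function is handed to the loop's default thread pool:

```python
import asyncio
import inspect
import threading


async def async_job(message):
    # Coroutine: runs on the event loop thread.
    return threading.current_thread().name


def sync_job(message):
    # Regular function: expected to run in a worker thread.
    return threading.current_thread().name


async def run_handler(handler, message):
    """Hypothetical helper (not Loafer's API): await coroutines, offload the rest."""
    if inspect.iscoroutinefunction(handler):
        return await handler(message)
    loop = asyncio.get_event_loop()  # returns the running loop inside a coroutine
    return await loop.run_in_executor(None, handler, message)


def main():
    loop = asyncio.new_event_loop()
    try:
        loop_thread = threading.current_thread().name
        async_thread = loop.run_until_complete(run_handler(async_job, 'hello'))
        sync_thread = loop.run_until_complete(run_handler(sync_job, 'hello'))
    finally:
        loop.close()
    return loop_thread, async_thread, sync_thread
```

Calling main() returns three thread names: the coroutine reports the same thread as the event loop, while the regular function reports a different, pooled thread.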

Components

The main components inside Loafer are:

  • Manager

The manager is responsible for setting up the event loop and handling system errors.

It prepares everything needed to run and starts the dispatcher.

  • Dispatcher

The dispatcher starts the consumers and schedules message routing and message acknowledgment.

  • Consumer

The consumer is responsible for retrieving messages and deleting them when requested.

The act of deleting a message is also known as message acknowledgment.

At the moment, we only have a consumer for the AWS SQS service.

  • Message Translator

The message translator is the contract between consumer and handler.

In the future it will also help the dispatcher choose a message destination handler.

At the moment, the message translator receives the “raw message” and transforms it into the format expected by the handler.

  • Route

The route is the link between the consumer and the handler. It is responsible for delivering the message to the handler and receiving its confirmation.

  • Handler

The handler (or task/job) is the callable that will receive the message.

The message lifecycle

A simplified view of a message lifecycle is illustrated below:

[Figure: message lifecycle sequence diagram (_images/sequence_loafer.png)]
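The same flow can be pictured with a toy, in-memory model based only on the component descriptions above. Every name here (InMemoryConsumer, UpperTranslator, dispatch_once) is hypothetical and does not correspond to a Loafer class or function:

```python
import asyncio


class InMemoryConsumer:
    """Stand-in for a queue consumer: fetches and deletes messages."""

    def __init__(self, messages):
        self.pending = list(messages)
        self.deleted = []

    async def fetch(self):
        # Return a copy so deleting while iterating is safe.
        return list(self.pending)

    async def delete(self, message):
        self.pending.remove(message)
        self.deleted.append(message)


class UpperTranslator:
    """Stand-in translator: turns the raw message into handler content."""

    def translate(self, message):
        return {'content': message.upper()}


received = []


async def handler(content):
    received.append(content)


async def dispatch_once(consumer, translator, handler):
    for raw in await consumer.fetch():
        translated = translator.translate(raw)
        await handler(translated['content'])
        await consumer.delete(raw)  # acknowledge after a successful run


loop = asyncio.new_event_loop()
consumer = InMemoryConsumer(['foo', 'bar'])
loop.run_until_complete(dispatch_once(consumer, UpperTranslator(), handler))
loop.close()
```

After the run, received holds the translated messages and consumer.pending is empty, because each message was deleted once its handler finished.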

Settings

All configuration is done using environment variables.

You can also create a .env or *.cfg file in your project root; it will be loaded automatically.

Here is a sample .env file:

LOAFER_DEFAULT_ROUTE_SOURCE=my-queue-name
LOAFER_DEFAULT_ROUTE_HANDLER=loafer.example.jobs.async_example_job

All the possible configuration keys and their default values are listed below (entries without default values are marked as required):

Settings table

Key                                      Default value
LOAFER_LOG_FORMAT                        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
LOAFER_MAX_JOBS                          10
LOAFER_MAX_THREAD_POOL                   None
LOAFER_DEFAULT_MESSAGE_TRANSLATOR_CLASS  'loafer.aws.message_translator.SQSMessageTranslator'
LOAFER_DEFAULT_ROUTE_NAME                'default'
LOAFER_DEFAULT_ROUTE_SOURCE              required
LOAFER_DEFAULT_ROUTE_HANDLER             'loafer.example.jobs.async_example_job'
LOAFER_DEFAULT_CONSUMER_CLASS            'loafer.aws.consumer.Consumer'
LOAFER_DEFAULT_CONSUMER_OPTIONS          {'WaitTimeSeconds': 5, 'MaxNumberOfMessages': 5}

The LOAFER_MAX_JOBS is the number of concurrent handler executions.
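One common way to cap concurrent executions in asyncio is a semaphore. The sketch below illustrates the idea only; it is an assumption about how such a limit could be enforced, not a description of Loafer's internals, and MAX_JOBS and run_all are names made up for the example:

```python
import asyncio

MAX_JOBS = 2  # stand-in for the LOAFER_MAX_JOBS value


async def run_all(messages, max_jobs):
    semaphore = asyncio.Semaphore(max_jobs)
    running = 0
    peak = 0

    async def limited(message):
        nonlocal running, peak
        async with semaphore:  # at most max_jobs bodies run at once
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # simulated I/O
            running -= 1

    await asyncio.gather(*(limited(m) for m in messages))
    return peak


loop = asyncio.new_event_loop()
peak = loop.run_until_complete(run_all(range(6), MAX_JOBS))
loop.close()
```

Here peak records the highest number of handlers that were in flight at the same time, which never exceeds MAX_JOBS.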

LOAFER_MAX_THREAD_POOL, if not set, is determined automatically from the number of cores in the machine. Threads are used to execute non-asyncio code.

All variables that require a class or callable must be given as a full name, i.e., we must be able to import it.
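To make "full name" concrete, here is a sketch of resolving a dotted path with the standard library. The changelog mentions loafer.utils.import_callable, but its signature is not shown in this documentation, so import_from_path below is a hypothetical helper written for illustration:

```python
import importlib


def import_from_path(full_name):
    """Hypothetical helper: resolve 'package.module.attr' to the object it names."""
    module_name, _, attr_name = full_name.rpartition('.')
    if not module_name:
        raise ImportError('not a full dotted path: {!r}'.format(full_name))
    module = importlib.import_module(module_name)
    try:
        return getattr(module, attr_name)
    except AttributeError:
        raise ImportError(
            '{!r} has no attribute {!r}'.format(module_name, attr_name))


# A standard-library callable is used so the sketch runs anywhere.
dumps = import_from_path('json.dumps')
```

A value such as loafer.example.jobs.async_example_job follows the same shape: everything before the last dot is the module, and the last part is the callable.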

AWS

To configure AWS access, follow the boto3 configuration documentation or export the following variables (see boto3 envvars):

$ export AWS_ACCESS_KEY_ID=<key>
$ export AWS_SECRET_ACCESS_KEY=<secret>
$ export AWS_DEFAULT_REGION=sa-east-1  # for example

Exceptions and error handling

All exceptions are defined in the loafer.exceptions module.

A description of all the exceptions:

  • ConsumerError: a fatal error from a consumer instance. Loafer will stop all operations and shut down.
  • ConfigurationError: a configuration error. Loafer will stop all operations and shut down.
  • IgnoreMessage: if the handler raises this exception, the message will be ignored and not acknowledged.
  • RejectMessage: if the handler raises this exception, the message will be rejected and acknowledged (i.e., the message will be deleted).

At the message translation phase

If the message translator returns invalid message content or raises an unhandled exception:

  1. The message will not be delivered to the handler
  2. The message will be ignored (i.e., it will not be deleted)

At the handler processing phase

The handler can raise IgnoreMessage or RejectMessage to explicitly determine how the message will be handled.

Any unhandled exception has the same effect as IgnoreMessage, but handlers are advised to treat their own errors in case this behavior changes in the future.

Message deletion

The message will be deleted (acknowledged):

  1. When the handler raises RejectMessage
  2. When the handler executes successfully
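The rules in this section can be summarized as a small decision function. This is a sketch only: the two exception classes are local stand-ins for the ones in loafer.exceptions, and should_acknowledge is a hypothetical helper, not part of Loafer:

```python
import asyncio


class IgnoreMessage(Exception):
    """Stand-in for loafer.exceptions.IgnoreMessage."""


class RejectMessage(Exception):
    """Stand-in for loafer.exceptions.RejectMessage."""


async def should_acknowledge(handler, message):
    """Hypothetical helper: True means the message would be deleted."""
    try:
        await handler(message)
    except RejectMessage:
        return True   # rejected: acknowledged (deleted)
    except Exception:
        return False  # IgnoreMessage or any unhandled error: not deleted
    return True       # successful execution: acknowledged (deleted)


async def ok(message):
    return None


async def reject(message):
    raise RejectMessage()


async def ignore(message):
    raise IgnoreMessage()


async def crash(message):
    raise ValueError('boom')


loop = asyncio.new_event_loop()
outcomes = {
    h.__name__: loop.run_until_complete(should_acknowledge(h, 'msg'))
    for h in (ok, reject, ignore, crash)
}
loop.close()
```

The outcomes dictionary maps each example handler to whether its message would be deleted: only the successful and the rejecting handlers lead to deletion.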

Message Translators

The message translator receives a “raw message” and processes it into a format suitable for the handler.

The “raw message” is the message received by the consumer “as-is”.

The translator can be set via the LOAFER_DEFAULT_MESSAGE_TRANSLATOR_CLASS setting.

Implementation

The message translator class should implement a translate method, like:

class MyMessageTranslator(object):

    def translate(self, message):
        return {'content': int(message)}

And it should return a dictionary in the format:

return {'content': processed_message}

The processed_message is the message delivered to the handler.

In the example above, the raw message is expected to be the string form of an integer (for example, '42'), and it is delivered to the handler as an int.

The message parameter in def translate is always a string object.

If processed_message is None, the message will be ignored and not acknowledged.

Unhandled errors also cause the message to be ignored, but it’s good practice to handle those errors.
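As an illustration of handling errors inside the translator, here is a sketch of a hypothetical JSONMessageTranslator (not shipped with Loafer) that returns None content when the raw string is not valid JSON:

```python
import json


class JSONMessageTranslator(object):
    """Hypothetical translator: parses the raw string as JSON."""

    def translate(self, message):
        try:
            content = json.loads(message)
        except ValueError:
            # Invalid JSON: None content means "ignore, do not acknowledge".
            content = None
        return {'content': content}


translator = JSONMessageTranslator()
valid = translator.translate('{"id": 1}')
invalid = translator.translate('not json')
```

Note that a raw message containing the JSON literal null also produces None content in this sketch, so it would be ignored as well.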

Handlers

Handlers are callables that receive the message.

The handler can be set via the LOAFER_DEFAULT_ROUTE_HANDLER setting.

The handler signature depends on the message translator’s implementation.

Usually, it will be similar to this:

async def my_handler(message):
    ...  # your code here

Here, message holds the translated content in whatever format the handler expects.

The async def is the Python coroutine syntax. Regular functions can also be used, but they will run in a thread, outside the event loop.

The handler does not need to return anything, but any unhandled error will cause the message to be ignored.

Note that the handler can also fail silently, for example:

async def my_handler(message):
    try:
        ...  # some code
    except Exception:
        pass

This will cause the message to always be acknowledged.

You can have the message ignored by explicitly raising IgnoreMessage, in which case the message will not be deleted. See Exceptions and error handling for details.
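A sketch of a handler that converts a known failure into IgnoreMessage instead of swallowing it. The save coroutine is a hypothetical I/O step invented for the example, and the fallback class only exists so the sketch runs where Loafer is not installed:

```python
import asyncio

try:
    from loafer.exceptions import IgnoreMessage
except ImportError:
    # Stand-in so the sketch runs without Loafer installed.
    class IgnoreMessage(Exception):
        pass


async def save(message):
    """Hypothetical I/O step that always fails, for demonstration."""
    raise ConnectionError('backend unavailable')


async def my_handler(message):
    try:
        await save(message)
    except ConnectionError as exc:
        # Signal explicitly that the message must not be deleted.
        raise IgnoreMessage(str(exc))
```

Compared with the except/pass version, the failure is still visible to Loafer, so the message is not acknowledged.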

Successful executions of the handler will acknowledge (delete) the message.

Publisher

The publisher is only used for testing purposes, so we can have an easy way to validate some message translation process or handler execution.

It is responsible for publishing a message to a consumer source (queue).

It can be useful to emulate the behavior of a given handler without the real message producer.

At the moment we provide publishers for Amazon SQS and SNS. Check the Usage section for examples.

Development

Development Installation

Requirements

Python 3.5+

Note:

Some packages also need the python3.5-dev package (Ubuntu) or similar.

Development install

After forking or checking out:

$ cd loafer/
$ python setup.py develop
$ pip install -r requirements/local.txt
$ pip install -r requirements/test.txt
$ pre-commit install

The requirements folder is only used for development, so we can easily install and track the dependencies required to run the tests on continuous integration platforms.

The official entrypoint for distribution is setup.py, which also contains the minimum requirements to execute the tests.

It’s important to run setup.py develop, not only to install the main dependencies but also to add the loafer CLI to your environment.

Running tests:

$ make test

Generating documentation:

$ cd docs/
$ make html

To configure AWS access, follow the boto3 configuration documentation or export the following variables (see boto3 envvars):

$ export AWS_ACCESS_KEY_ID=<key>
$ export AWS_SECRET_ACCESS_KEY=<secret>
$ export AWS_DEFAULT_REGION=sa-east-1  # for example

Check the Settings section to see specific configurations.

Release

To release a new version, a few steps are required:

  • Update the version number in loafer/__init__.py and docs/source/conf.py
  • Add entry to CHANGES.rst and documentation
  • Review changes in test requirements (requirements/test.txt and setup.py)
  • Test with python setup.py test and make test-cov
  • Test build with make dist
  • Commit changes
  • Release with make release

Other

FAQ

1. How do I run I/O blocking code that’s not a coroutine?

Any code that is blocking and not a coroutine can run in a separate thread.

It’s not recommended, but it looks like this:

import asyncio
loop = asyncio.get_event_loop()
loop.run_in_executor(None, your_callable, your_callable_args)
# Important: do not close/stop the loop

Changelog

0.0.3 (2016-04-24)

  • Improve documentation
  • Improve package metadata and dependencies
  • Add loafer.aws.message_translator.SNSMessageTranslator class
  • Fix ImportError exceptions for configuration that uses loafer.utils.import_callable

0.0.2 (2016-04-18)

  • Fix build hardcoding tests dependencies

0.0.1 (2016-04-18)

  • Initial release