Welcome to Loafer’s documentation!

Loafer is an asynchronous message dispatcher for concurrent task processing.

Quickstart

Installation

Requirements

Python 3.5+

Note:

Some packages also need the python3.5-dev package (Ubuntu) or an equivalent.

To install via pip:

$ pip install loafer

Basic configuration

To configure AWS access, check the boto3 configuration documentation or export the following environment variables (see boto3 envvars):

$ export AWS_ACCESS_KEY_ID=<key>
$ export AWS_SECRET_ACCESS_KEY=<secret>
$ export AWS_DEFAULT_REGION=sa-east-1  # for example

Tutorial

For this tutorial, we assume you already have Loafer installed and your AWS credentials configured.

Let’s create a Python package named foobar with the following structure:

foobar/
    handlers.py
    routes.py
    __main__.py
    __init__.py  # empty file

The handlers.py:

import asyncio

async def print_handler(message, *args):
    print('message is {}'.format(message))
    print('args is {}'.format(args))

    # mimic IO processing
    await asyncio.sleep(0.1)
    return True


async def error_handler(exc_type, exc, message):
    print('exception {} received'.format(exc_type))
    # do not delete the message that originated the error
    return False

The routes.py:

from loafer.ext.aws.routes import SQSRoute
from .handlers import print_handler, error_handler

# assuming a queue named "loafer-test"
routes = (
    SQSRoute('loafer-test', {'options': {'WaitTimeSeconds': 3}},
             handler=print_handler,
             error_handler=error_handler),
)

The __main__.py:

from loafer.managers import LoaferManager
from .routes import routes

manager = LoaferManager(routes=routes)
manager.run()

To execute:

$ python -m foobar

To see any output, publish some messages using the AWS dashboard or utilities like awscli.

For example:

$ aws sqs send-message --queue-url http://<url-part>/loafer-test --message-body '{"key": true}'

User Guide

Overview

Loafer is an asynchronous message dispatcher for concurrent task processing.

To take full advantage, your tasks:

  1. Should use asyncio
  2. Should be I/O bound
  3. Should be decoupled from the message producer

If your tasks are CPU bound, you should look for projects that use the multiprocessing Python module or similar.

We don’t require the use of asyncio, but any code that’s not a coroutine will run in a thread, outside the event loop. Performance and error handling might be compromised in this scenario, so you might want to look for other alternatives.
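The difference between the two execution paths can be sketched with plain asyncio. This is only an illustration of the idea, not Loafer's internal code: call_handler, async_handler and blocking_handler are hypothetical names introduced for this example.

```python
import asyncio
import inspect


async def call_handler(handler, message):
    """Hypothetical helper: await coroutines, send plain callables to a thread."""
    if inspect.iscoroutinefunction(handler):
        # Coroutine functions run inside the event loop.
        return await handler(message)
    # Regular functions run in the default thread pool executor,
    # so they do not block the event loop while they work.
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, handler, message)


async def async_handler(message):
    await asyncio.sleep(0.01)  # mimic non-blocking I/O
    return 'async:{}'.format(message)


def blocking_handler(message):
    # Imagine a blocking call here (a synchronous HTTP client, for example).
    return 'thread:{}'.format(message)
```

Both handlers can be awaited through the same helper, but only the coroutine version benefits from cooperative scheduling inside the loop.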

If your code is too tightly coupled to the message producer, you might end up writing a lot of boilerplate code in order to use Loafer.

Components

The main components inside Loafer are:

  • Manager

The manager is responsible for setting up the event loop and handling system errors.

It prepares everything needed to run and starts the dispatcher.

  • Dispatcher

The dispatcher starts the providers and schedules message routing and message acknowledgment.

  • Provider

The provider is responsible for retrieving messages and deleting them when requested.

The act of deleting a message is also known as message acknowledgment.

At the moment, the only provider available is for the AWS SQS service.

  • Message Translator

The message translator is the contract between provider and handler.

At the moment, the message translator receives the “raw message” and transforms it into the format expected by the handler.

It may also add metadata information, if available.

  • Route

The route is the link between the provider and the handler. It is responsible for delivering the message to the handler and receiving its confirmation.

  • Handler

The handler (or task/job) is the callable that receives the message.

  • Error Handler

The error handler is an optional callback that handles any errors raised during message translation or handler processing.

The message lifecycle

A simplified view of a message lifecycle is illustrated below:

[Figure: sequence diagram of the message lifecycle (_images/sequence_loafer.png)]
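The same lifecycle can be approximated in a few lines of plain Python. The sketch below is an assumption-laden illustration, not Loafer's implementation: InMemoryProvider, UpperTranslator, dispatch_once and their method names are all hypothetical stand-ins for the provider, message translator and dispatcher described above.

```python
import asyncio


class InMemoryProvider:
    """Hypothetical stand-in for a provider; not part of Loafer."""

    def __init__(self, messages):
        self.pending = list(messages)
        self.deleted = []

    async def fetch(self):
        # Return a copy so deleting while iterating is safe.
        return list(self.pending)

    async def delete(self, message):
        # Deleting a message is the acknowledgment.
        self.pending.remove(message)
        self.deleted.append(message)


class UpperTranslator:
    """Hypothetical translator: raw message in, content/metadata dict out."""

    def translate(self, raw):
        return {'content': raw.upper(), 'metadata': {}}


async def handler(message, metadata):
    # Acknowledge everything except the message 'KEEP'.
    return message != 'KEEP'


async def error_handler(exc_type, exc, message):
    # Do not acknowledge messages that caused an error.
    return False


async def dispatch_once(provider, translator, handler, error_handler):
    """Fetch, translate, handle, and acknowledge according to the result."""
    for raw in await provider.fetch():
        try:
            translated = translator.translate(raw)
            ack = await handler(translated['content'], translated['metadata'])
        except Exception as exc:
            ack = await error_handler(type(exc), exc, raw)
        if ack:
            await provider.delete(raw)
```

A message is deleted only when the handler (or the error handler) returns a truthy value; everything else stays in the source.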

Settings

AWS

To configure AWS access, check the boto3 configuration documentation or export the following environment variables (see boto3 envvars):

$ export AWS_ACCESS_KEY_ID=<key>
$ export AWS_SECRET_ACCESS_KEY=<secret>
$ export AWS_DEFAULT_REGION=sa-east-1  # for example

Exceptions

All exceptions are defined in the loafer.exceptions module.

A description of all the exceptions:

  • ProviderError: a fatal error from a provider instance. Loafer will stop all operations and shut down.
  • ConfigurationError: a configuration error. Loafer will stop all operations and shut down.
  • DeleteMessage: if any handler raises this exception, the message will be rejected and acknowledged (the message will be deleted).
  • LoaferException: the base exception for DeleteMessage.

Handlers

Handlers are callables that receive the message.

The handler signature should be similar to:

async def my_handler(message, metadata):
    ... code ...
    return True # or False

Where message is the message to be processed and metadata is a dict with metadata information.

The async def is the Python coroutine syntax. Regular functions can also be used, but they will run in a thread, outside the event loop.

The return value indicates whether the handler successfully processed the message. By returning True, the message will be acknowledged (deleted).

Another way to acknowledge messages inside a handler is to raise the DeleteMessage exception.

Any other exception will be redirected to an error_handler, see Error Handlers.

The default error_handler will log the error and not acknowledge the message.
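The three outcomes described above (return True, return False, raise DeleteMessage) can be sketched as follows. To keep the example runnable without Loafer installed, it declares a local stand-in for DeleteMessage; in a real project the class would be imported from loafer.exceptions. The order_handler name and the order_id key are hypothetical.

```python
import asyncio


class DeleteMessage(Exception):
    """Local stand-in for loafer.exceptions.DeleteMessage (illustration only)."""


async def order_handler(message, metadata):
    if not isinstance(message, dict):
        # Malformed payload: acknowledge it so it is not delivered again.
        raise DeleteMessage()
    if 'order_id' not in message:
        # Not processed: returning False leaves the message unacknowledged.
        return False
    await asyncio.sleep(0.01)  # mimic non-blocking I/O
    # Processed successfully: the message will be acknowledged (deleted).
    return True
```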

Error Handlers

Every Route implements a coroutine error_handler that can be used as an error hook. This hook is called when unhandled exceptions happen; by default, it only logs the error and does not acknowledge the message.

Route accepts a callable parameter, error_handler, so you can pass a custom function or coroutine to replace the default error handler behavior.

The callable should be similar to:

async def custom_error_handler(exc_type, exc, message):
    ... custom code here ...
    return True

# Route(..., error_handler=custom_error_handler)

The return value determines whether the message that originated the error will be acknowledged. True acknowledges it; False leaves the message unacknowledged (default behavior).
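As an illustration, an error handler could acknowledge messages only for errors that will never succeed on retry. This is a sketch of one possible policy, not a recommendation from the Loafer authors: the UNRECOVERABLE tuple and the choice of exception types are assumptions made for this example.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

# Hypothetical policy: errors of these types are treated as unrecoverable,
# so retrying the same message would fail again.
UNRECOVERABLE = (ValueError, KeyError)


async def custom_error_handler(exc_type, exc, message):
    logger.error('handler failed for %r: %r', message, exc)
    # True acknowledges (deletes) the message, False keeps it in the source.
    return issubclass(exc_type, UNRECOVERABLE)
```

With this policy, a ValueError discards the message while a transient error such as ConnectionError leaves it available for another attempt.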

Sentry

To integrate with Sentry, you will need the raven client and your account DSN.

Then you can automatically create an error_handler with the following code:

from loafer.ext.sentry import sentry_handler
from raven import Client

client = Client(dsn=..., **options)
error_handler = sentry_handler(client, delete_message=True)

The optional delete_message parameter controls the message acknowledgement after the error report. By default, delete_message is False.

The error_handler defined can be set on any Route instance.

Message Translators

The message translator receives a “raw message” and processes it into a suitable format expected by the handler.

The “raw message” is the message received by the provider “as-is”, and it might be delivered without any processing if no message translator is set. In some cases, you should explicitly set message_translator=None to disable any configured translators.

Implementation

The message translator class should implement a translate method, like:

class MyMessageTranslator:

    def translate(self, message):
        return {'content': int(message), 'metadata': {}}

And it should return a dictionary in the format:

return {'content': processed_message, 'metadata': {}}

The processed_message and metadata (optional) will be delivered to the handler.

If processed_message is None (or empty), the message will cause a ValueError exception.

All the exceptions in message translation will be caught by the configured Error Handlers.
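A slightly fuller translator might decode JSON and attach some metadata. This is a minimal sketch under stated assumptions: JsonMessageTranslator and the raw_size metadata key are hypothetical names, and the raw message is assumed to be a JSON string.

```python
import json


class JsonMessageTranslator:
    """Hypothetical translator that decodes a JSON string."""

    def translate(self, message):
        # json.loads raises a ValueError subclass on invalid input;
        # per the text above, such errors reach the configured error handler.
        content = json.loads(message)
        return {'content': content, 'metadata': {'raw_size': len(message)}}
```

The translator stays a plain class with a synchronous translate method, matching the contract shown above.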

Providers

Providers are responsible for retrieving messages and acknowledging them (deleting them from the source).

SQSProvider

SQSProvider (located at loafer.ext.aws.providers) receives the following options:

  • queue_name: the queue name
  • endpoint_url (optional): the base URL, only useful if you don’t use AWS
  • use_ssl (optional, default=True): SSL usage
  • options (optional): a dict with SQS options to retrieve messages. Example: {'WaitTimeSeconds': 5, 'MaxNumberOfMessages': 5}

Usually, providers are not configured manually, but set by Routes and their helper classes.

Routes

A Route aggregates all the main entities. The generic parameters are:

  • provider: a provider instance
  • handler: a handler instance
  • error_handler (optional): an error handler instance
  • message_translator (optional): a message translator instance
  • name (optional): a name for this route

We provide some helper routes, so you don’t need to set up all this boilerplate code:

  • loafer.ext.aws.routes.SQSRoute: a route that configures a loafer.ext.aws.providers.SQSProvider and loafer.ext.aws.message_translators.SQSMessageTranslator. A route for handlers that consume messages from an SQS queue (expects JSON-formatted messages).
  • loafer.ext.aws.routes.SNSQueueRoute: a route that configures a loafer.ext.aws.providers.SQSProvider and loafer.ext.aws.message_translators.SNSMessageTranslator. A route for handlers that consume messages from an SQS queue subscribed to an SNS topic (expects JSON-formatted messages).

Examples

Some examples of route creation:

from loafer.ext.aws.routes import SQSRoute, SNSQueueRoute

# regular route
route1 = SQSRoute('my-queue1', handler=..., name='route1')

# route with custom SQSProvider parameters
route2 = SNSQueueRoute('my-queue2', {'use_ssl': False, 'options': {'WaitTimeSeconds': 4}}, handler=..., name='route2')

# route with custom message translator
route3 = SQSRoute('my-queue3', message_translator=custom_message_translator, handler=...)

# route with custom error handler
route4 = SNSQueueRoute('my-queue4', handler=..., error_handler=custom_error_handler)

Managers

Managers are responsible for the execution of all the components.

The LoaferManager (at loafer.managers) receives a list of Routes.

Every service/application using Loafer should instantiate a manager:

from loafer.managers import LoaferManager
from .routes import routes  # the list of routes

manager = LoaferManager(routes=routes)
manager.run()

The default execution mode will run indefinitely. To run only one iteration of your services, the last line in the code above should be replaced with:

manager.run(forever=False)

The “one iteration” can be a little tricky. For example, if you have one provider that fetches two messages at a time, your handler will be called twice (once for each message) and then the manager will stop.
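The behavior can be pictured with a small simulation in plain asyncio. It does not use Loafer at all: fetch_batch, handler and run_one_iteration are hypothetical names, and the sequential loop is an assumption made to keep the sketch simple.

```python
import asyncio

handled = []


async def fetch_batch():
    """Hypothetical provider call that returns two messages at a time."""
    return ['first', 'second']


async def handler(message):
    handled.append(message)
    return True


async def run_one_iteration():
    """Fetch a single batch, handle every message in it, then stop."""
    for message in await fetch_batch():
        await handler(message)


loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(run_one_iteration())
finally:
    loop.close()

print(handled)  # ['first', 'second']
```

One iteration therefore means one fetch, not one message.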

Publisher

The publisher is only used for testing purposes, so we can have an easy way to validate some message translation process or handler execution.

It is responsible for publishing a message to a provider source (queue).

It can be useful for exercising a given handler without the real message producer.

Development

Development Installation

Requirements

Python 3.5+

Note:

Some packages also need the python3.5-dev package (Ubuntu) or an equivalent.

Development install

After forking or checking out:

$ cd loafer/
$ python setup.py develop
$ pip install -r requirements/local.txt
$ pip install -r requirements/test.txt
$ pre-commit install

The requirements folder is only used for development, so we can easily install/track the dependencies required to run the tests on continuous integration platforms.

The official entrypoint for distribution is setup.py, which also contains the minimum requirements to execute the tests.

It’s important to execute setup.py develop not only to install the main dependencies, but also to include the loafer CLI in our environment.

Running tests:

$ make test

Generating documentation:

$ cd docs/
$ make html

To configure AWS access, check the boto3 configuration documentation or export the following environment variables (see boto3 envvars):

$ export AWS_ACCESS_KEY_ID=<key>
$ export AWS_SECRET_ACCESS_KEY=<secret>
$ export AWS_DEFAULT_REGION=sa-east-1  # for example

Check the Settings section to see specific configurations.

Release

To release a new version, a few steps are required:

  • Update version/release number in docs/source/conf.py
  • Add entry to CHANGES.rst and documentation
  • Review changes in test requirements (requirements/test.txt and setup.py)
  • Test with python setup.py test and make test-cov
  • Test build with make dist
  • Commit changes
  • Release with make release

Other

FAQ

1. How do I run I/O blocking code that’s not a coroutine?

Any code that is blocking and not a coroutine can run in a separate thread.

It’s not recommended, but it looks like this:

import asyncio
loop = asyncio.get_event_loop()
loop.run_in_executor(None, your_callable, your_callable_args)
# Important: do not close/stop the loop
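A fuller, runnable variant of the snippet above is sketched below. Note that run_in_executor only forwards positional arguments, so keyword arguments have to be bound first with functools.partial. The blocking_io function and its parameters are hypothetical.

```python
import asyncio
import functools
import time


def blocking_io(delay, label='done'):
    """Hypothetical blocking call (a synchronous client, for example)."""
    time.sleep(delay)
    return label


async def fetch_in_thread():
    loop = asyncio.get_event_loop()
    # Bind keyword arguments up front; the executor receives a
    # zero-argument callable and runs it in the default thread pool.
    call = functools.partial(blocking_io, 0.01, label='finished')
    return await loop.run_in_executor(None, call)
```

Awaiting the returned future keeps the event loop free to run other coroutines while the thread is blocked.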

2. How do I integrate with New Relic?

The New Relic documentation should be the primary source of information. One alternative is to set the environment variables NEW_RELIC_LICENSE_KEY and NEW_RELIC_APP_NAME and decorate every handler:

import newrelic.agent

@newrelic.agent.background_task()
def some_code(...):
    ...

Changelog

1.0.0 (2017-03-27)

  • Major code rewrite
  • Remove CLI
  • Add better support for error handlers, including sentry/raven
  • Refactor exceptions
  • Add message metadata information
  • Update message lifecycle with handler/error handler return value
  • Enable execution of one service iteration (by default, it still runs “forever”)

0.0.3 (2016-04-24)

  • Improve documentation
  • Improve package metadata and dependencies
  • Add loafer.aws.message_translator.SNSMessageTranslator class
  • Fix ImportError exceptions for configuration that uses loafer.utils.import_callable

0.0.2 (2016-04-18)

  • Fix build hardcoding tests dependencies

0.0.1 (2016-04-18)

  • Initial release

Contributors

Thanks to: