Welcome to Loafer’s documentation!

Loafer is an asynchronous message dispatcher for concurrent task processing.

Quickstart

Installation

Requirements

Python 3.6+

Note:

Some packages also need the Python development headers (the python3-dev package on Ubuntu, or similar).

To install via pip:

$ pip install loafer

Basic configuration

To configure AWS access, check boto3 configuration or export (see boto3 envvars):

$ export AWS_ACCESS_KEY_ID=<key>
$ export AWS_SECRET_ACCESS_KEY=<secret>
$ export AWS_DEFAULT_REGION=sa-east-1  # for example

You might find some configuration tips in our FAQ as well.

Tutorial

For this tutorial we assume you already have Loafer installed and your AWS credentials configured.

Let’s create a package named foobar with the following structure:

foobar/
    handlers.py
    routes.py
    __main__.py
    __init__.py  # empty file

The handlers.py:

import asyncio

async def print_handler(message, *args):
    print('message is {}'.format(message))
    print('args is {}'.format(args))

    # mimic IO processing
    await asyncio.sleep(0.1)
    return True


async def error_handler(exc_info, message):
    print('exception {} received'.format(exc_info))
    # do not delete the message that originated the error
    return False

The routes.py:

from loafer.ext.aws.routes import SQSRoute
from .handlers import print_handler, error_handler

# assuming a queue named "loafer-test"
routes = (
    SQSRoute('loafer-test', {'options': {'WaitTimeSeconds': 3}},
             handler=print_handler,
             error_handler=error_handler),
)

The __main__.py:

from loafer.managers import LoaferManager
from .routes import routes

manager = LoaferManager(routes=routes)
manager.run()

To execute:

$ python -m foobar

To see any output, publish some messages using the AWS console or utilities like awscli.

For example:

$ aws sqs send-message --queue-url http://<url-part>/loafer-test --message-body '{"key": true}'

User Guide

Overview

Loafer is an asynchronous message dispatcher for concurrent task processing.

To take full advantage, your tasks:

  1. Should use asyncio
  2. Should be I/O bound
  3. Should be decoupled from the message producer

If your tasks are CPU bound, you should look for projects that use the multiprocessing Python module or similar.

We don’t require the use of asyncio, but any code that’s not a coroutine will run in a thread, outside the event loop. Performance and error handling might be compromised in these scenarios, so you might want to look for other alternatives.
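To illustrate why coroutines suit this model, here is a minimal sketch using only the standard library (it does not use Loafer itself). The handler function and the events list are hypothetical names for this example, and asyncio.run requires Python 3.7+. Two coroutine handlers share one event loop, and the second starts while the first is still waiting on simulated I/O:

```python
import asyncio

events = []  # records the order in which the handlers make progress


async def handler(message):
    """Hypothetical coroutine handler; the sleep stands in for real I/O."""
    events.append("{}:start".format(message))
    await asyncio.sleep(0.01)  # control returns to the event loop here
    events.append("{}:end".format(message))
    return True


async def main():
    # Both handlers run concurrently on the same event loop.
    return await asyncio.gather(handler("a"), handler("b"))


results = asyncio.run(main())
print(events)
```

A regular (blocking) function in the same position would have to be moved to a thread to avoid stalling the loop, which is the behavior described above.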

If your code is too tied to the message producer, you might end up writing too much boilerplate code in order to use Loafer.

Components

The main components inside Loafer are:

  • Manager

The manager is responsible for setting up the event loop and handling system errors.

It prepares everything needed to run and starts the dispatcher.

  • Dispatcher

The dispatcher starts the providers, schedules the message routing and message acknowledgment.

  • Provider

The provider is responsible for retrieving messages and deleting them when requested.

The act of deleting a message is also known as message acknowledgment.

At the moment, we only have a provider for the AWS SQS service.

  • Message Translator

The message translator is the contract between provider and handler.

At the moment, the message translator receives the “raw message” and transforms it into the format expected by the handler.

It may also add metadata information, if available.

  • Route

The route is the link between the provider and the handler. It is responsible for delivering the message to the handler and receiving its confirmation.

  • Handler

The handler (or task/job) is the callable that will receive the message.

  • Error Handler

The optional callback to handle any errors in message translation or in the handler processing.

The message lifecycle

A simplified view of a message lifecycle is illustrated below:

[Figure: sequence diagram of the Loafer message lifecycle, _images/sequence_loafer.png]

Settings

AWS

To configure AWS access, check boto3 configuration or export (see boto3 envvars):

$ export AWS_ACCESS_KEY_ID=<key>
$ export AWS_SECRET_ACCESS_KEY=<secret>
$ export AWS_DEFAULT_REGION=sa-east-1  # for example

You might find some configuration tips in our FAQ as well.

Exceptions

All exceptions are defined in the loafer.exceptions module.

A description of all the exceptions:

  • ProviderError: a fatal error from a provider instance. Loafer will stop all operations and shutdown.
  • ConfigurationError: a configuration error. Loafer will stop all operations and shutdown.
  • DeleteMessage: if any handler raises this exception, the message will be rejected and acknowledged (the message will be deleted).
  • LoaferException: the base exception for DeleteMessage.

Handlers

Handlers are callables that receive the message.

The handler signature should be similar to:

async def my_handler(message, metadata):
    ...  # your code here
    return True  # or False

Where message is the message to be processed and metadata is a dict with metadata information.

The async def is the Python coroutine syntax. Regular functions can also be used, but they will run in a thread, outside the event loop.

The return value indicates if the handler successfully processed the message or not. By returning True the message will be acknowledged (deleted).

Another way to acknowledge messages inside a handler is to raise DeleteMessage exception.

Any other exception will be redirected to an error_handler; see Error Handlers for more details.

The default error_handler will log the error and not acknowledge the message.
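The acknowledgment rules above can be sketched as follows. This is an illustration only, not Loafer's internal code: the DeleteMessage class here is a local stand-in for loafer.exceptions.DeleteMessage so the snippet runs without Loafer installed, and should_acknowledge and the three handlers are hypothetical names.

```python
import asyncio


class DeleteMessage(Exception):
    """Local stand-in for loafer.exceptions.DeleteMessage (assumption)."""


async def should_acknowledge(handler, message, metadata):
    """Mirror the documented rules: True or DeleteMessage means acknowledge."""
    try:
        return bool(await handler(message, metadata))
    except DeleteMessage:
        # Raising DeleteMessage acknowledges (deletes) the message.
        return True
    # Any other exception propagates; Loafer would pass it to the error handler.


async def ok_handler(message, metadata):
    return True  # processed, acknowledge


async def failed_handler(message, metadata):
    return False  # not processed, the message is not acknowledged


async def discard_handler(message, metadata):
    raise DeleteMessage()  # reject the message but still delete it


async def main():
    handlers = (ok_handler, failed_handler, discard_handler)
    return [await should_acknowledge(h, {"key": True}, {}) for h in handlers]


decisions = asyncio.run(main())
print(decisions)
```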

For some generic handlers that can give you a starting point, take a look at Generic Handlers section.

Class-based handlers

You can also write handlers using classes. The class should implement a handle coroutine/method:

class MyHandler:

    async def handle(self, message, *args):
        ...  # your code here
        return True

    def stop(self):
        ...  # clean-up code

The stop method is optional and will be called before Loafer shuts down its execution. Note that stop is not a coroutine.

When configuring your Routes, you can set handler to an instance of MyHandler or to its handle method (the callable); both ways work:

Route(handler=MyHandler(), ...)
# or
Route(handler=MyHandler().handle)

Message dependency

Handlers are supposed to be stateless or have limited dependency on message values. Since the same handler instance is used to process all incoming messages, we can’t guarantee that a value attached to the instance will be preserved across several concurrent calls to the same handler.

This might be hard to detect in production and probably is an undesired side-effect:

class Handler:

    async def foo(self):
        # do something with `self.some_value`
        print(self.some_value)
        ...  # more code

    async def handle(self, message, *args):
        self.some_value = message['foo']
        await self.foo()
        return True
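A small sketch, using only the standard library, of how this side effect can show up and one way to avoid it. RacyHandler, SafeHandler, process and the seen list are hypothetical names; seen exists only to make the result observable. In the safe variant the value is passed as an argument instead of being stored on the instance:

```python
import asyncio


class RacyHandler:
    def __init__(self):
        self.seen = []

    async def handle(self, message, *args):
        self.some_value = message["foo"]  # shared by all concurrent calls
        await asyncio.sleep(0.01)  # another message may overwrite it here
        self.seen.append(self.some_value)
        return True


class SafeHandler:
    def __init__(self):
        self.seen = []

    async def process(self, some_value):
        await asyncio.sleep(0.01)
        self.seen.append(some_value)  # the value travelled with the call

    async def handle(self, message, *args):
        await self.process(message["foo"])  # no per-message state on self
        return True


async def run(handler):
    await asyncio.gather(handler.handle({"foo": 1}), handler.handle({"foo": 2}))
    return handler.seen


racy_seen = asyncio.run(run(RacyHandler()))
safe_seen = asyncio.run(run(SafeHandler()))
print(racy_seen, safe_seen)
```

In this run the racy handler reports the second message's value for both calls, while the safe handler sees each value once.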

Error Handlers

Every Route implements a coroutine error_handler that can be used as an error hook. This hook is called when unhandled exceptions happen, and by default it only logs the error and does not acknowledge the message.

Route accepts a callable parameter, error_handler, so you can pass a custom function or coroutine to replace the default error handler behavior.

The callable should be similar to:

async def custom_error_handler(exc_info, message):
    ...  # custom code here
    return True

# Route(..., error_handler=custom_error_handler)

The return value determines if the message that originated the error will be acknowledged or not. True means acknowledge it; False means the message is not acknowledged (default behavior).
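As an illustration, a custom error handler could acknowledge only messages that can never succeed. The sketch below assumes that exc_info is a (type, value, traceback) tuple in the style of sys.exc_info(); that shape, the policy itself, and the capture_exc_info helper are assumptions made for this example.

```python
import asyncio
import json
import sys


async def custom_error_handler(exc_info, message):
    # Assumption: exc_info is a sys.exc_info()-style tuple.
    exc_type, exc_value, _traceback = exc_info
    if issubclass(exc_type, json.JSONDecodeError):
        # Malformed payload: retrying will not help, so acknowledge it.
        return True
    # Anything else: do not acknowledge the message.
    return False


def capture_exc_info(func):
    """Hypothetical helper: run func and return exc_info for what it raises."""
    try:
        func()
    except Exception:
        return sys.exc_info()
    return None


bad_json_info = capture_exc_info(lambda: json.loads("{not json"))
other_info = capture_exc_info(lambda: 1 / 0)

ack_bad_json = asyncio.run(custom_error_handler(bad_json_info, "{not json"))
ack_other = asyncio.run(custom_error_handler(other_info, "{}"))
print(ack_bad_json, ack_other)
```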

Sentry

To integrate with Sentry, you will need the SDK client (sentry_sdk) and your account DSN.

Then you can automatically create an error_handler with the following code:

from loafer.ext.sentry import sentry_handler
from sentry_sdk import init, capture_message

init(...)
error_handler = sentry_handler(capture_message, delete_message=True)

The optional delete_message parameter controls the message acknowledgement after the error report. By default, delete_message is False.

The error_handler defined can be set on any Route instance.

Generic Handlers

Here are some handlers that are easy to extend or use directly.

loafer.ext.aws.handlers.SQSHandler

A handler that publishes messages to a SQS queue.

For instance, if we just want to redirect a message to a queue named “my-queue”:

# in your route definition
from loafer.ext.aws.handlers import SQSHandler

Route(handler=SQSHandler('my-queue'), ...)

The handler assumes a message that can be JSON encoded (usually a Python dict instance).

You can customize this handler by subclassing it:

class MyHandler(SQSHandler):
    queue_name = 'my-queue'

    async def handle(self, message, *args):
        text = message['text']
        return await self.publish(text, encoder=None)

In the example above, we are disabling the message encoding by passing None to the publish coroutine.

The encoder parameter should be a callable that receives the message to be encoded. By default, it assumes json.dumps.
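As a sketch of a custom encoder, the callable below wraps json.dumps with a few options. The name compact_encoder and the chosen options are assumptions for illustration; any callable that takes the message and returns the encoded value follows the same shape.

```python
import json
from datetime import datetime


def compact_encoder(message):
    """Encode to compact JSON with sorted keys; non-JSON types fall back to str()."""
    return json.dumps(message, separators=(",", ":"), sort_keys=True, default=str)


encoded = compact_encoder({"text": "hello", "sent_at": datetime(2020, 5, 16)})
print(encoded)
```

Inside a handler subclass like the one above, it could then be passed as the encoder argument of the publish coroutine, for example await self.publish(message, encoder=compact_encoder).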

Note how queue_name was set. You can pass it when instantiating the handler or set the queue_name class attribute; both are valid, and the value is mandatory. You can also use the queue URL directly, if you prefer.

loafer.ext.aws.handlers.SNSHandler

A handler that publishes messages to a SNS topic.

This handler is very similar to SQSHandler, so here is a similar example:

from loafer.ext.aws.handlers import SNSHandler

class MyHandler(SNSHandler):
    topic = 'my-topic'

    async def handle(self, message, *args):
        text = message['text']
        return await self.publish(text, encoder=None)

The handler also provides a publish coroutine with an encoder parameter that works in the same way as with SQSHandler, except it will publish to an SNS topic instead of a queue.

The SNSHandler also assumes a message that can be JSON encoded, and the encoder defaults to json.dumps.

The topic is also mandatory and should be configured in the class definition or when creating the handler instance.

You can set either the topic name or the topic arn (recommended), but when using the topic name the handler will use wildcards to match the topic arn.

More details about SNS wildcards are available in their documentation.

Message Translators

The message translator receives a “raw message” and processes it into a suitable format expected by the handler.

The “raw message” is the message received by the provider “as-is” and it might be delivered without any processing if the message translator was not set.

In some cases, you should explicitly set message_translator=None to disable any configured translators.

Implementation

The message translator class should subclass AbstractMessageTranslator and implement the translate method like:

from loafer.message_translators import AbstractMessageTranslator


class MyMessageTranslator(AbstractMessageTranslator):

    def translate(self, message):
        return {'content': int(message), 'metadata': {}}

And it should return a dictionary in the format:

{'content': processed_message, 'metadata': {}}

The processed_message and metadata (optional) will be delivered to handler.

If processed_message is None (or empty), a ValueError exception will be raised.

All the exceptions in message translation will be caught by the configured Error Handlers.
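A slightly larger sketch of the same contract, with a hypothetical translator for "key=value;key=value" strings. In a real project the class would subclass AbstractMessageTranslator as shown above; the base class is left out here only so the snippet runs without Loafer installed, and the raw_length metadata key is invented for illustration.

```python
class KeyValueMessageTranslator:
    """Hypothetical translator; subclass AbstractMessageTranslator in real code."""

    def translate(self, message):
        # Split "a=1; b=2" into pairs; a malformed item raises ValueError,
        # which, per the text above, the configured error handler would receive.
        pairs = (item.split("=", 1) for item in message.split(";") if item)
        content = {key.strip(): value.strip() for key, value in pairs}
        return {"content": content, "metadata": {"raw_length": len(message)}}


translated = KeyValueMessageTranslator().translate("a=1; b=2")
print(translated)
```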

The existing message translators are described below.

loafer.message_translators.StringMessageTranslator

A message translator that translates the given message to a string (python str).

loafer.ext.aws.message_translators.SQSMessageTranslator

A message translator that translates SQS messages. The expected message body is a JSON payload that will be decoded with json.loads.

All the keys will be kept in metadata key dict (except Body that was previously translated).

loafer.ext.aws.message_translators.SNSMessageTranslator

A message translator that translates SQS messages that came from an SNS topic. The expected notification message is a JSON payload that will be decoded with json.loads.

SNS notifications wrap (and encode) the message inside the body of an SQS message, so the SQSMessageTranslator will fail to properly translate those messages (or at least, fail to translate to the expected format).

All the keys will be kept in metadata key dict (except Body).
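To make the wrapping concrete, the sketch below builds a simplified SQS message carrying an SNS notification and decodes it both ways. It uses only the standard library and is not Loafer's implementation: the message is reduced to a few fields, the identifiers and ARN are made-up values, and decode_sqs_body / decode_sns_body are hypothetical helper names.

```python
import json

payload = {"key": True}

# Simplified SNS notification envelope; the real one carries more fields.
sns_notification = {
    "Type": "Notification",
    "TopicArn": "arn:aws:sns:sa-east-1:123456789012:my-topic",  # made-up ARN
    "Message": json.dumps(payload),  # the payload is JSON-encoded again
}

# Simplified SQS message as received from a queue subscribed to the topic.
sqs_message = {"MessageId": "hypothetical-id", "Body": json.dumps(sns_notification)}


def decode_sqs_body(message):
    """Single decode: yields the SNS envelope, not the payload."""
    return json.loads(message["Body"])


def decode_sns_body(message):
    """Double decode: unwrap the envelope, then decode its Message field."""
    envelope = json.loads(message["Body"])
    return json.loads(envelope["Message"])


print(decode_sqs_body(sqs_message)["Type"])
print(decode_sns_body(sqs_message))
```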

For more details about message translators usage, check the Routes examples.

Providers

Providers are responsible for retrieving messages and acknowledging them (deleting them from the source).

SQSProvider

SQSProvider (located at loafer.ext.aws.providers) receives the following options:

  • queue_name: the queue name
  • options (optional): a dict with SQS options to retrieve messages. Example: {'WaitTimeSeconds': 5, 'MaxNumberOfMessages': 5}

Also, you might override any of the parameters below from the boto3 client (all optional):

  • api_version
  • aws_access_key_id
  • aws_secret_access_key
  • aws_session_token
  • endpoint_url
  • region_name
  • use_ssl
  • verify

Check boto3 client documentation for detailed information.

Usually, the provider is not configured manually, but set by Routes and their helper classes.

Routes

A Route aggregates all the main entities previously described. The generic parameters are:

  • provider: a provider instance
  • handler: a handler instance
  • error_handler (optional): an error handler instance
  • message_translator (optional): a message translator instance
  • name (optional): a name for this route

We provide some helper routes, so you don’t need to set up all this boilerplate code:

  • loafer.ext.aws.routes.SQSRoute: a route that configures a loafer.ext.aws.providers.SQSProvider and loafer.ext.aws.message_translators.SQSMessageTranslator.

    A route for handlers that consume messages from SQS queue (expects json format messages).

  • loafer.ext.aws.routes.SNSQueueRoute: a route that configures a loafer.ext.aws.providers.SQSProvider and loafer.ext.aws.message_translators.SNSMessageTranslator.

    A route for handlers that consume messages from a SQS queue subscribed to a SNS topic (expects json format messages).

Examples

Some examples of route creation:

from loafer.ext.aws.routes import SQSRoute, SNSQueueRoute
from loafer.message_translators import StringMessageTranslator


# regular route
route1 = SQSRoute('my-queue1', handler=..., name='route1')

# route with custom SQSProvider parameters
route2 = SNSQueueRoute('my-queue2', {'use_ssl': False, 'options': {'WaitTimeSeconds': 4}}, handler=..., name='route2')

# route with custom message translator
route3 = SQSRoute('my-queue3', message_translator=StringMessageTranslator(), handler=...)

# route disabling message translation
route4 = SNSQueueRoute('my-queue4', message_translator=None, handler=...)

# route with custom error handler
route5 = SQSRoute('my-queue5', handler=..., error_handler=custom_error_handler)

Managers

Managers are responsible for the execution of all the components.

The LoaferManager (at loafer.managers) receives a list of Routes.

Every service/application using loafer should instantiate a manager:

from loafer.managers import LoaferManager
from .routes import routes  # the list of routes

manager = LoaferManager(routes=routes)
manager.run()

The default execution mode will run indefinitely. To run only one iteration of your services, the last line in the code above should be replaced with:

manager.run(forever=False)

The “one iteration” could be a little tricky. For example, if you have one provider that fetches two messages at a time, your handler will be called twice (once for each message) and then execution stops.

Development

Development Installation

Requirements

Python 3.6+

Note:

Some packages also need the Python development headers (the python3-dev package on Ubuntu, or similar).

Development install

After forking or checking out:

$ cd loafer/
$ pip install -r requirements/local.txt
$ pre-commit install
$ pip install -e .

The requirements folder is only used for development, so we can easily install/track the dependencies required to run the tests on continuous integration platforms.

The official entrypoint for distribution is setup.py, which also contains the minimum requirements to execute the tests.

It’s important to execute pip install -e . not only to install the main dependencies, but also to include loafer in our environment.

Running tests:

$ make test

Generating documentation:

$ cd docs/
$ make html

To configure AWS access, check boto3 configuration or export (see boto3 envvars):

$ export AWS_ACCESS_KEY_ID=<key>
$ export AWS_SECRET_ACCESS_KEY=<secret>
$ export AWS_DEFAULT_REGION=sa-east-1  # for example

Check the Settings section to see specific configurations.

Release

To release a new version, a few steps are required:

  • Update version/release number in docs/source/conf.py
  • Add entry to CHANGES.rst and documentation
  • Review changes in test requirements (requirements/test.txt and setup.py)
  • Test with python setup.py test and make test-cov
  • Test build with make dist
  • Commit changes
  • Release with make release

Other

FAQ

1. How do I run I/O blocking code that’s not a coroutine?

Any code that is blocking and not a coroutine could run in a separate thread.

It’s not recommended, but it looks like this:

import asyncio
loop = asyncio.get_event_loop()
loop.run_in_executor(None, your_callable, your_callable_args)
# Important: do not close/stop the loop
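A more complete, standalone sketch of the same idea, awaiting the executor result from inside a coroutine. The blocking_io function is a hypothetical example, and asyncio.run (Python 3.7+) is used here only to demonstrate the snippet on its own; inside a Loafer handler only the await line would be needed, since Loafer manages the event loop.

```python
import asyncio
import time


def blocking_io(seconds):
    """Hypothetical blocking call, e.g. a synchronous client library."""
    time.sleep(seconds)
    return "slept {}s".format(seconds)


async def main():
    loop = asyncio.get_running_loop()
    # None selects the loop's default thread pool executor.
    return await loop.run_in_executor(None, blocking_io, 0.01)


result = asyncio.run(main())
print(result)
```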

2. How to integrate with New Relic?

The New Relic documentation should be the primary source of information. One alternative is to set the environment variables NEW_RELIC_LICENSE_KEY and NEW_RELIC_APP_NAME and decorate every handler:

import newrelic.agent

@newrelic.agent.background_task()
def some_code(*args, **kwargs):
    ...

3. Using different regions/credentials with SQSRoute/SNSQueueRoute?

SQSRoute/SNSQueueRoute instantiate loafer.ext.aws.providers.SQSProvider, therefore you can dynamically set any of the options available in Providers.

An example with explicit AWS credentials and provider options would look like:

from loafer.ext.aws.routes import SQSRoute

route = SQSRoute(
    'test-queue-name', name='my-route', handler=some_handler,
    provider_options={
        'aws_access_key_id': my_aws_access_key,
        'aws_secret_access_key': my_secret_key,
        'region_name': 'sa-east-1',
        'options': {'WaitTimeSeconds': 3},
    },
)

Changelog

2.0.0 (2020-05-16)

  • Dropped support for Python 3.5
  • Added support for Python 3.8
  • Update aiobotocore dependency version (#61 by @GuilhermeVBeira)
  • Improvements due to changes in asyncio (#48, #52 by @lamenezes)
  • Sentry wrapper/helper updated to support new sdk (wip)
  • Minor documentation improvements

1.3.2 (2019-04-27)

  • Improve message processing (#48 by @lamenezes)
  • Improve error logging (#39 by @wiliamsouza)
  • Refactor in message dispatcher and event-loop shutdown
  • Minor fixes and improvements

1.3.1 (2017-10-22)

  • Improve performance (#35 by @allisson)
  • Fix requirement versions resolution
  • Minor fixes and improvements

1.3.0 (2017-09-26)

  • Refactor tasks dispatching, it should improve performance
  • Refactor SQSProvider to ignore HTTP 404 errors when deleting messages
  • Minor fixes and improvements

1.2.1 (2017-09-11)

  • Bump boto3 version (by @daneoshiga)

1.2.0 (2017-08-15)

  • Enable provider parameters (boto client options)

1.1.1 (2017-06-14)

  • Bugfix: fix SNS prefix value in use for topic name wildcard (by @lamenezes)

1.1.0 (2017-05-01)

  • Added initial contracts for class-based handlers
  • Added generic handlers: SQSHandler/SNSHandler
  • Improve internal error handling
  • Improve docs

1.0.2 (2017-04-13)

  • Fix sentry error handler integration

1.0.1 (2017-04-09)

  • Add tox and execute tests for py36
  • Update aiohttp/aiobotocore versions
  • Minor fixes and enhancements

1.0.0 (2017-03-27)

  • Major code rewrite
  • Remove CLI
  • Add better support for error handlers, including sentry/raven
  • Refactor exceptions
  • Add message metadata information
  • Update message lifecycle with handler/error handler return value
  • Enable execution of one service iteration (by default, it still runs “forever”)

0.0.3 (2016-04-24)

  • Improve documentation
  • Improve package metadata and dependencies
  • Add loafer.aws.message_translator.SNSMessageTranslator class
  • Fix ImportError exceptions for configuration that uses loafer.utils.import_callable

0.0.2 (2016-04-18)

  • Fix build hardcoding tests dependencies

0.0.1 (2016-04-18)

  • Initial release

Contributors

Thanks to: