Form an orderly queue

By rcwd, Fri 31 May 2019, in category Python

beanstalkd, rabbitmq, sqs

As a Brit, queueing is very close to my heart. It's something of a national pastime here in the UK - we'll queue up for anything!

As a software developer, queues are equally important to me. The ability to dispatch work to be done at an arbitrary point in the future has been a key element in my web toolkit for years. Make no mistake, I'm not talking about micro-services or anything even remotely complex. Simple tasks that take time to complete are handed off onto a queue, where they're picked up by a runner and the work is done out of sync with the main user interaction, making for a snappy user experience and a very clean codebase.

But before we get into the nitty-gritty...

What is a queue?

A queue is an abstract data type - an ordered collection of entities whereby new items are added at the back and items can be removed from the front. This makes a queue a First-In-First-Out data structure meaning the first item added to the queue will be the first to be removed. Simple right?

In terms of the components that make up a queue we need the following:

- Some form of storage to hold the items in order
- A way to add (put) new items to the back of the queue
- A way to remove (get) the item at the front of the queue

Queues have more nice-to-have properties and methods - peeking at the next item, checking the size, testing for emptiness - but we'll get into those a little later in this post.

Let's take a look at a queue modeled in Python:

Basic Queue in Code

#!/usr/bin/env python3

class BasicQueue:

    # Constructor for BasicQueue class
    def __init__(self):
        self.__queue = [] # Create our Queue as a private property

    # Add an item to the queue
    def put(self, item):
        self.__queue.append(item) # Add the item to the end

    # Get the next item from the queue
    def get(self):
        return self.__queue.pop(0) # Get and remove the first item


my_queue = BasicQueue() # Initialise our queue
my_queue.put('Foo') # Add an item
print(my_queue.get()) # Remove that item - should print Foo

There you go - a minimal queue implemented in pure Python! The eagle-eyed amongst you will have spotted that this is pretty much just a Python list with a couple of methods, and you'd be spot on! A list provides the basis for a solid queue - it's ordered, we can append to the back, and we can pop the first element off the front.

This is super basic, and kind of useless in the real world. Firstly, it's an object that would need to be passed around our code or injected - only this code can access it. If the script crashes then the contents of the queue are lost. There's no error handling to deal with the queue being empty (try adding another my_queue.get() at the end of the script ... 💥), it only works with one item at a time, and it lacks any of the nice-to-have (but oh so useful) methods mentioned above. It's not even the best option for a queue that's only accessible from a single application - list.pop(0) has to shift every remaining element, so it gets slower as the queue grows.
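As a quick sketch (not part of the queue we'll actually use later), two of those problems - the empty-queue crash and the slow pop(0) - can be patched up by building on collections.deque from the standard library:

```python
from collections import deque

class SaferQueue:
    """Like BasicQueue, but deque gives O(1) removal from the front,
    and get() returns a default instead of raising when empty."""

    def __init__(self):
        self._queue = deque()

    def put(self, item):
        self._queue.append(item)  # add to the back

    def get(self, default=None):
        try:
            return self._queue.popleft()  # remove from the front
        except IndexError:
            return default  # queue was empty - no crash

my_queue = SaferQueue()
my_queue.put('Foo')
my_queue.put('Bar')
print(my_queue.get())  # Foo - first in, first out
print(my_queue.get())  # Bar
print(my_queue.get())  # None - empty, but no exception
```

Still single-process and still lost on a crash, but at least it fails gracefully.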

Next in the queue of queues

If we did require a purely Python queue then we should almost certainly look at the queue module in the Python Standard Library. Not only does this module offer a much more robust implementation of the code above in the form of SimpleQueue (Python 3.7+), but it also provides a whole load of other options, including a Last-In-First-Out queue (also known as a stack) and a PriorityQueue.
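Here's a quick taste of all three flavours (SimpleQueue needs Python 3.7 or later):

```python
import queue

# FIFO: items come out in the order they went in
fifo = queue.SimpleQueue()
for item in ('a', 'b', 'c'):
    fifo.put(item)
print([fifo.get() for _ in range(3)])  # ['a', 'b', 'c']

# LIFO (a stack): the most recently added item comes out first
lifo = queue.LifoQueue()
for item in ('a', 'b', 'c'):
    lifo.put(item)
print([lifo.get() for _ in range(3)])  # ['c', 'b', 'a']

# Priority: the lowest value comes out first, regardless of insert order
prio = queue.PriorityQueue()
for item in ((3, 'low'), (1, 'high'), (2, 'mid')):
    prio.put(item)
print([prio.get() for _ in range(3)])  # [(1, 'high'), (2, 'mid'), (3, 'low')]
```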

Finally, the official queue module is thread-safe, so it can be used to add simple but very powerful concurrency to your applications. The threading element is beyond the scope of this post, but I encourage you to explore the docs and examples to get a handle on this built-in queueing system.
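Just to hint at what that thread-safety buys you, here's a minimal sketch of a single worker thread consuming from a queue.Queue (the doubling "work" and the None sentinel are my own illustrative choices):

```python
import queue
import threading

def worker(q, results):
    """Pull items off the queue until the sentinel None appears."""
    while True:
        item = q.get()
        if item is None:  # sentinel: no more work coming
            q.task_done()
            break
        results.append(item * 2)  # "process" the item
        q.task_done()

q = queue.Queue()
results = []
t = threading.Thread(target=worker, args=(q, results))
t.start()

for n in range(5):
    q.put(n)
q.put(None)  # tell the worker to stop

q.join()  # block until every item has been marked done
t.join()
print(sorted(results))  # [0, 2, 4, 6, 8]
```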

Networked / Shared Queues

Queues (or stacks) in your code are surprisingly useful - in any situation where you need to process an unknown number of items, potentially with thread- or process-based concurrency, the built-in queue will serve you well. But to really get up and queueing, we need to think about making our queue accessible over some form of shared transport. Traditionally this means introducing a new queue server or service which is connected to our application over the network.

Even with the simplest network queues, we have just introduced a whole load of new requirements for our queuing application:

- A connection to the queue server, and handling for when that connection fails
- A serialisation format, so jobs can be sent over the wire and understood at the other end
- A job life cycle - reserving, completing, releasing and timing out jobs so work is neither lost nor processed twice

Technically these could also be applied to an in-app queue, but using an external service means we don't have an easy way to inspect and review the queue contents, so having extra control over job processing seems like a good idea.

Another huge benefit of a networked queue is the introduction of a programming-language-agnostic interchange - in other words, we can have jobs pushed in from PHP or Java applications and processed by Python, or whatever combination of languages makes sense in your stack. Provided they can all talk to the queue and you can agree on a common message format, there's no longer any need to be limited to a single language or version of a language.
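That "common message format" is usually as simple as agreeing on something like JSON. A tiny sketch (the job name and arguments here are hypothetical):

```python
import json

# The producer - written in any language - serialises a job
# into the agreed format before pushing it onto the queue...
payload = json.dumps({'job': 'resizeImage', 'args': {'width': 200}})

# ...and the consumer - written in any other language - decodes
# it back into native data structures on the way out.
job = json.loads(payload)
print(job['job'], job['args']['width'])  # resizeImage 200
```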

BeanstalkD

My first proper queue service was based on BeanstalkD. Billed as a simple, fast, work queue it does one thing very well - it allows you to queue jobs. The creator, Keith Rarick, attributes Memcached as the inspiration for the protocol design which is super simple to work with. Of course, being a Python developer I'm not going to roll my own implementation of the protocol but would prefer to reach for a library. There are 4 Python libraries listed on the official BeanstalkD Client Libraries wiki page. We're going to look at the most modern (as in most recently updated) option for our demo: Greenstalk.

Before we can use Beanstalk we need a working server. Fortunately, it's simple to install on OSX or Linux (Windows users might want to consider a Linux VPS for their queue server). Once installed and running, we can connect to the queue using the Greenstalk library:

#!/usr/bin/env python3

import greenstalk

def main():
    queue = greenstalk.Client(host='127.0.0.1', port=11300) # Connect to Beanstalkd

    queue.put('hello') # Add a job to the queue

    job = queue.reserve() # Get and lock the next job on the queue

    print(job.id) # Integer - should count up

    print(job.body) # Whatever we sent in above

    queue.delete(job) # Delete the job from the queue

if __name__ == '__main__':
    main()

There are a couple of things to note here. The first is the concept of reserving a job from the queue. This means that a job can only be processed by one worker at a time until it is either:

- Deleted - the work is done and the job is removed for good
- Released - the job is put back on the queue, ready for another worker
- Buried - the job is set aside for later inspection or retry
- Timed out - the reservation expires and the job becomes available to other workers again
The second is that, except for time-outs, we as developers are responsible for managing a queue job's life cycle. We create it, and we have to ensure it is correctly removed once it has been processed.

Jobs & Workers

In BeanstalkD's world, the body or payload of a job can be any arbitrary sequence of bytes. It doesn't care if you use JSON, XML, CSV or binary. In reality, jobs should be kept small! In the case of a work queue, this normally means some kind of method name and any supporting parameters.

For example, if we're sending an email via a queue worker (a common task, as sending email is slow!) we could create the whole message, add all the headers and serialize the whole thing onto the queue. But that's a lot of data to be pushed and pulled across the network.

Better would be to push a packet that looks more like this:

{
  "job": "sendMail",
  "args": {
    "mail_type": "welcome",
    "mail_to": "[email protected]"
  }
}

Our job worker can infer from that all the info needed to produce the email body, correctly address it, and send it.
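In code, that inference is usually just a lookup table mapping job names to handler functions. A sketch (the send_mail handler and the example email address are hypothetical stand-ins):

```python
import json

def send_mail(args):
    """Hypothetical handler - a real one would build and send the email."""
    return f"sending {args['mail_type']} mail to {args['mail_to']}"

# Map job names (as they appear in the payload) to handler functions
HANDLERS = {'sendMail': send_mail}

def dispatch(raw_body):
    """Decode a job payload and route it to the matching handler."""
    packet = json.loads(raw_body)
    handler = HANDLERS[packet['job']]
    return handler(packet['args'])

payload = '{"job": "sendMail", "args": {"mail_type": "welcome", "mail_to": "user@example.com"}}'
print(dispatch(payload))  # sending welcome mail to user@example.com
```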

Most shared queue systems have the concept of Workers - code in your application that polls the queue from time to time waiting for new jobs and performs the job when one appears.

Really Important: BeanstalkD is, by default, an in-memory queue server, which means that if the server crashes or restarts, the contents of the queue are lost. To prevent this (if needed) be sure to run BeanstalkD with the binlog persistence flag (-b) enabled. See this entry in the FAQ for more info.

RabbitMQ

BeanstalkD is all kinds of awesome and perfect for a lot of workloads but it is inherently a very simple server and doesn't concern itself with much beyond the basics of managing a work queue. For larger, more complex applications, developers might find themselves needing more functionality, cluster support, pub-sub style features and more.

While there are a load of alternatives in the queue space, the open-source market leader (by quite some margin) is a platform called RabbitMQ. Technically RabbitMQ is a message broker rather than a pure queue. It accepts and forwards messages offering the user a massive amount of flexibility and control over job routing. While the full feature set of RabbitMQ is beyond the scope of this post we can take a look at how we implement a basic work queue in Python.

Unlike BeanstalkD which uses a single protocol, RabbitMQ supports many messaging protocols. We are going to be using the default (and most common), the Advanced Message Queuing Protocol or AMQP version 0-9-1. Also, because we're Pythonistas we're going to use a library called Pika. Finally, you'll need RabbitMQ installed on your machine or access to a remote server. Assuming we have all of that configured let's look at a simple example.

In this example, we are going to create two small Python scripts. The first will send a message to the queue. The second will wait and listen for new messages and then display them:

send.py

#!/usr/bin/env python3

import pika

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

    channel = connection.channel()

    channel.queue_declare(queue='hello')

    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World!')

    print("Sent 'Hello World!'")

    connection.close()

if __name__ == '__main__':
    main()

receive.py

#!/usr/bin/env python3

import pika

def callback(ch, method, properties, body):
    print("Received %r" % body)

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

    channel = connection.channel()

    channel.queue_declare(queue='hello')

    channel.basic_consume(queue='hello',
                          auto_ack=True,
                          on_message_callback=callback)

    print('Waiting for messages. To exit press CTRL+C')

    channel.start_consuming()

if __name__ == '__main__':
    main()

A lot is going on in these scripts so let's try and break them up a little before we run them. Both scripts start by initializing a connection to our RabbitMQ server and then open a channel on that connection (channels are lightweight logical connections multiplexed over the single TCP connection).

Both scripts then declare the queue. In RabbitMQ you can send to a non-existent queue and the message will just vanish into the ether. So, to be certain the queue exists, it should be declared, which will create it if needed (declaring a queue that already exists is harmless) … I know it's weird but roll with it.

The sender then publishes a message with three arguments: the exchange (a RabbitMQ concept that routes the data onto a queue; when using the default exchange, as here, the routing key must equal our queue name), the routing key, and a message body/payload.

Meanwhile, our receiver simply connects to the queue and is configured to call the callback function in our code whenever a new message is received.

Let's run this example now. Open two terminal windows. In one, run receive.py and, once it's connected, run send.py a few times:

We can see that the messages sent in are picked up by the receiver and displayed! To exit the receiver simply press Ctrl+C to break the loop.

There's a load more that RabbitMQ and the Pika library can do. For more examples check out the rather brilliant RabbitMQ intro series (you can skip part one as that's pretty much exactly what we did above).

AWS Simple Queue Service

What if you can't be bothered to run a whole separate server for your queue? I mean RabbitMQ brings all the features to the party but it's a pretty complex piece of software that needs to be run ideally on dedicated hardware, monitored, backed up and generally adds to your sysadmin workload.

Fortunately, in the modern world of cloud computing, it's possible to use a fully scalable queue without running any additional servers. Simple Queue Service (SQS) is one of the oldest Amazon Web Services offerings, introduced in 2004, so we can rely on it to be stable, available and pretty much unchanging in terms of the interface.

Integrating SQS into our applications is not trivial. Not because SQS is inherently hard to work with, but because we need to handle the whole AWS IAM authentication step, deal with provisioning access keys and ensure our users have permission to work with the queue (without giving away the farm). Fortunately, AWS is very good at producing step-by-step documentation for these processes so, assuming we can follow instructions, a lot of the pain is removed. Rather than detail all of the steps here I'm going to reference them in a list below on the assumption that, once they're all complete, you will be able to work with SQS:

- Sign up for an AWS account (if you don't already have one)
- Create an IAM user and generate an access key pair
- Install the Boto3 library (pip install boto3)
- Configure your credentials locally so Boto3 can find them

Once the above are complete you should be good to start interacting with AWS from Python scripts. This in itself is pretty epic as Boto3 doesn't just support SQS but gives you access to all of the toys in the AWS toy box!

Note: the credentials you currently have will likely be scoped to all services. For production use, you should probably look into creating a set of SQS specific IAM User credentials as detailed here.

Now we've done all that, let's take a look at actually using SQS from Python. Note, these examples are taken nearly wholesale from the official AWS Python Code Samples for Amazon SQS page.

As with RabbitMQ, we're going to need a sender and a receiver, and these will be separate scripts. Before these will work we need to create a queue - you can do this programmatically or via the web interface, whichever is your preference.

SQS queues come in two main flavors - Standard and FIFO. Unless strict ordering is key to your application (most of the time it shouldn't be), we should be looking at creating a Standard queue. SQS also provides myriad options for the queue, including a bunch of timeout and retention settings, something called dead-letter queueing and some very exciting redrive options. Feel free to spend a bit of time reading up on what these settings are and how they affect your application; for now, I'm going to accept the defaults. Finally, most AWS services are bound to a specific region and SQS is no different. You don't need to worry about the region when you're experimenting, but for production apps or queues that will be accessed from other AWS-hosted services, be sure to pick a region that is the same as, or geographically close to, your application to ensure maximum performance.

Once your queue has been created you must make a note of the URL that SQS has given your queue. This can be found by selecting the queue in the AWS SQS control panel:

AWS Simple Queue Service Console showing queue URL

Once you have the URL we create the two scripts, make sure to swap out the URL value for your actual queue!

send.py

import boto3
from botocore.exceptions import ClientError

def send_sqs_message(sqs_queue_url, msg_body):

    # Send the SQS message, returning None if the request fails
    sqs_client = boto3.client('sqs')
    try:
        msg = sqs_client.send_message(QueueUrl=sqs_queue_url, MessageBody=msg_body)
    except ClientError as e:
        print(e)
        return None

    return msg

def main():

    sqs_queue_url = '<YOUR QUEUE URL HERE>' # the URL of the queue we created

    # Send some SQS messages
    for i in range(1, 6):
        msg_body = f'SQS message #{i}'
        msg = send_sqs_message(sqs_queue_url, msg_body)
        if msg is not None:
            print(f'Sent SQS message ID: {msg["MessageId"]}')

if __name__ == '__main__':
    main()

receive.py

import boto3
from botocore.exceptions import ClientError


def retrieve_sqs_messages(sqs_queue_url, num_msgs=1, wait_time=0, visibility_time=5):

    # Retrieve messages from an SQS queue, returning None on error
    # or when the queue is empty
    sqs_client = boto3.client('sqs')
    try:
        msgs = sqs_client.receive_message(QueueUrl=sqs_queue_url,
                                          MaxNumberOfMessages=num_msgs,
                                          WaitTimeSeconds=wait_time,
                                          VisibilityTimeout=visibility_time)
    except ClientError as e:
        print(e)
        return None

    # 'Messages' is absent from the response when nothing was received
    return msgs.get('Messages')

def delete_sqs_message(sqs_queue_url, msg_receipt_handle):

    # Delete the message from the SQS queue
    sqs_client = boto3.client('sqs')
    sqs_client.delete_message(QueueUrl=sqs_queue_url,
                              ReceiptHandle=msg_receipt_handle)

def main():

    sqs_queue_url = '<YOUR QUEUE URL HERE>' # the URL of the queue we created
    num_messages = 2

    # Retrieve SQS messages
    msgs = retrieve_sqs_messages(sqs_queue_url, num_messages)
    if msgs is not None:
        for msg in msgs:
            print(f'SQS: Message ID: {msg["MessageId"]}, '
                  f'Contents: {msg["Body"]}')

            # Remove the message from the queue
            delete_sqs_message(sqs_queue_url, msg['ReceiptHandle'])

if __name__ == '__main__':
    main()

If we run send.py we should see 5 messages being sent to SQS. Give it a few seconds and then reload the SQS console and you should see a value under the Messages Available heading for your queue.

We can now run receive.py and it should pull up to 2 messages off the queue, display them and then delete them. You will need to run receive.py a few times to empty your queue of messages:

Things to note here: firstly, SQS doesn't automatically delete messages once they've been read - that is the responsibility of the consumer. If a message isn't deleted, it automatically becomes visible to other receivers again once the visibility timeout expires (5 seconds in our example; the SQS default is 30). Secondly, the order of messages is not guaranteed (at least not in our example). This is expected behavior for a Standard queue and, for most queue workloads, is acceptable.

SQS is (as the name suggests) a simple queue service with more in common with BeanstalkD than with RabbitMQ. The true power of SQS is that you don't have to worry about any of the infrastructure, and, of course, it's very tightly integrated with other AWS services. To get a feel for everything you can do with SQS please consult the documentation and then refer to Boto3 to work out how to do it in Python. Oh, and if you want more powerful queuing/messaging options with AWS then why not take a look at one of the 5 other alternatives they offer - you're sure to find something there that meets your needs.

Other options

It goes without saying that, in the world of modern software development, we have myriad and varied solutions to any given problem. If none of the above takes your fancy, why not consider one of these decent alternatives?

Redis

Ah Redis, is there anything it can't do? Well, probably, but it makes a decent quick-and-dirty job queue. This use of Redis even features in the Redis eBook, along with code examples in what looks a lot like Python 😎 Of course I'm not suggesting you deploy Redis purely for use as a queue, but if you're already using it for caching or other functions then it might make more sense than rolling in another service.

Postgres

While Redis is very much a viable queue solution, most relational databases are not! The complexities of locking and releasing records to avoid race conditions mean that building even a simple queue backed by MySQL is quite difficult. Postgres, on the other hand, provides a feature called SKIP LOCKED which allows users to build pretty powerful queues directly on their existing database. See this HackerNews post for a full code example (and a lot of HN-style arguing over whether this is a good idea or not). Just bear in mind that the increase in traffic to your database server may impact the day-to-day operation of your application.
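The pattern, roughly sketched (the jobs table and its columns are hypothetical - adapt to your own schema):

```sql
-- Each worker claims the next pending job; SKIP LOCKED means workers
-- never block on (or double-claim) a row another worker has locked.
BEGIN;

SELECT id, payload
FROM jobs
WHERE status = 'pending'
ORDER BY id
LIMIT 1
FOR UPDATE SKIP LOCKED;

-- ...process the job in application code, then mark it complete
-- (the placeholder is whatever id the SELECT returned)...
UPDATE jobs SET status = 'done' WHERE id = :claimed_id;

COMMIT;
```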

Other Cloud-Based Options

If you don't like the idea of contributing to AWS' monopoly of cloud services but don't want to host your own server, there are other cloud queue providers. Iron.io's IronMQ, CloudAMQP's RabbitMQ as a Service, Microsoft's Azure Queue Storage, Google's Cloud Pub/Sub and no doubt countless other options exist out there, catering for a range of technical and stack requirements. Costs and features vary massively, as do integrations, SDKs and language support - but you're a Python developer, you've got this!

Final Thoughts

Queues as part of your application stack are not something that can (or should) be implemented quickly. Chucking a queue at an app and expecting it to solve performance issues is probably not going to work. In all cases, a queue or message broker adds complexity and introduces a bunch of new potential failure points that need to be considered, measured and mitigated. Also if your queue is purely to allow a single application to do stuff in the background there are other possible solutions to consider (AsyncIO anyone?).

That said, as someone who has used simple and complex queues to great effect across a range of systems (average request time down from 2000ms to 10ms is decent enough right?) I can certainly say that they are useful tools to have in your box. But as with anything in programming, it's up to you to work out if your problem is a nail that needs this particular hammer.

Happy queuing folks!