2009-10-19

Queues in a Cloud

Queues are a powerful approach to data storage and data exchange, commonly used in application programming, especially when components of a program execute in parallel. By supporting queues, cloud storage systems provide safe and efficient mechanisms by which programs can persist their state, communicate between components, and interact with other programs and systems.

What is a Queue?

A Queue is an object with zero or more data "records", such that only the oldest item is accessed at any given time. Once the oldest item is removed, the second-oldest item is accessed, and so forth until the queue is empty. This provides what is known as "first-in, first-out" access to data.

In a standard object, when you update data, you update the only record in the object. But in a queue, this creates a new record. When you read the value of an object, you get the only record in the object, but in a queue, this returns the value of the oldest record. And when you delete the value of an object, you delete the object, but in a queue, you delete only the oldest record. Thus, unlike with basic objects, updating a queue and deleting from a queue are no longer idempotent operations: repeating an update adds another record, and repeating a delete removes another record.
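The contrast between the two kinds of object can be sketched in a few lines of Python. This is only an illustration: the class names `PlainObject` and `QueueObject` and their `write`, `read` and `delete` methods are made up for this example and do not correspond to any particular cloud storage API.

```python
from collections import deque


class PlainObject:
    """Hypothetical single-record object: a write overwrites the record."""

    def __init__(self):
        self.record = None

    def write(self, value):
        self.record = value          # replaces the only record

    def read(self):
        return self.record           # returns the only record

    def delete(self):
        self.record = None           # repeating this changes nothing further


class QueueObject:
    """Hypothetical queue object: a write appends, read/delete act on the oldest record."""

    def __init__(self):
        self.records = deque()

    def write(self, value):
        self.records.append(value)   # creates a new record

    def read(self):
        return self.records[0] if self.records else None   # oldest record

    def delete(self):
        if self.records:
            self.records.popleft()   # removes only the oldest record


obj, q = PlainObject(), QueueObject()
for target in (obj, q):
    target.write("a")
    target.write("a")                # the same write, repeated

# The plain object still holds one record; the queue now holds two.
print(obj.read(), len(q.records))
```

Repeating the write leaves the plain object unchanged but grows the queue, which is exactly why the queue operations are not idempotent.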

How are Queues Typically Used?

Queues are typically used in two different situations:

Internal Work List - Inside a program, the program will store a queue of data that needs to be processed in a given order, such that as resources become available, the items from the queue can be accessed in order. In this use, the queue is being used to store state.

Inter-Process Communication - Between two programs, a queue will contain a list of items that need to be communicated from one program to another. As the sender program encounters data that needs to be sent to the receiver program, it enqueues the items into the queue, and as the receiver program is able to process data from the sender program, it dequeues items from the queue. In this use, the queue is being used to exchange state.
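As a rough sketch of the inter-process case, the snippet below uses two threads in one Python process as stand-ins for the sender and receiver programs, with the standard library's `queue.Queue` standing in for the shared queue. The names `channel`, `sender`, `receiver` and the `SENTINEL` end-of-stream marker are assumptions made for the example.

```python
import queue
import threading

channel = queue.Queue()   # stands in for the queue shared by the two programs
SENTINEL = None           # assumed marker telling the receiver to stop
received = []


def sender(items):
    """Enqueue each item as it is encountered, then signal completion."""
    for item in items:
        channel.put(item)
    channel.put(SENTINEL)


def receiver():
    """Dequeue items whenever the receiver is able to process them."""
    while True:
        item = channel.get()
        if item is SENTINEL:
            break
        received.append(item)


items = ["page-1", "page-2", "page-3"]
threads = [
    threading.Thread(target=sender, args=(items,)),
    threading.Thread(target=receiver),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(received)
```

The receiver sees the items in the order the sender enqueued them, and neither side needs to know how fast the other is running.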

As a quick aside, TCP/IP is an example of a queue used for information exchange between two systems. When you write data to a TCP/IP connection, you are enqueueing the data for delivery, and when the destination application reads from the TCP/IP connection, it is dequeuing the data from the network abstraction.

These uses are best illustrated through an example. Let's say that we are designing a book scanning system that runs in the cloud. We have a series of image scanners that digitize pages of books, and we have an OCR program that converts the images into text that can be indexed for search.

A simple implementation would be to scan a page, OCR it, then move on to the next page, but that doesn't meet all of the criteria of a cloud solution, as we can't scale it. A good solution would be able to handle multiple scanners, and have multiple instances of the OCR process running in parallel. And that calls for queues.

Handling Multiple Writers

A queue can be used to aggregate data values from multiple writers into a single ordered set of values. This behaviour is perfect for logging, job aggregation and any other situation where data originating from multiple entities needs to be consumed by a single entity.

In our book scanning example, a typical scanning workflow may have tens to hundreds of scanners running concurrently, and each scanned page needs to be run through an OCR (Optical Character Recognition) process so that its contents can be indexed.

A cloud application built around queues is easily scaled by running multiple parallel instances of the scanning application, with each of the instances using a common cloud for storage. If all of these instances store scanned pages into the same queue, the interface to the OCR process (the queue) is the same regardless of the number of writers.

The logic for the scanning process would look something like this:

1. Scan Page
2. Write image as object into cloud
3. Enqueue image object ID into cloud
4. Repeat
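
The scanning steps above might be sketched as follows. Everything here is a stand-in: a dictionary plays the role of cloud object storage, a `deque` plays the role of the cloud queue, and `scan_page` is a hypothetical function that fakes a scanned image.

```python
import uuid
from collections import deque

object_store = {}      # stand-in for cloud object storage
page_queue = deque()   # stand-in for the shared cloud queue


def scan_page(page_number):
    """Hypothetical scanner: returns fake image bytes for a page."""
    return f"image-of-page-{page_number}".encode()


def scanner_loop(page_numbers):
    for number in page_numbers:
        image = scan_page(number)           # 1. scan page
        object_id = str(uuid.uuid4())       # assumed ID scheme for the example
        object_store[object_id] = image     # 2. write image as object
        page_queue.append(object_id)        # 3. enqueue the object ID
        # 4. repeat with the next page


scanner_loop(range(1, 4))
print(len(object_store), len(page_queue))
```

Only the object ID travels through the queue; the image itself stays in object storage, so queue records remain small regardless of page size.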

The logic for the OCR process would look something like this:

1. Read from cloud queue to get the object ID of the next image to process
2. Read the image from cloud using the object ID
3. Perform OCR processing
4. Add OCR text to Index
5. Delete item from cloud queue
6. Repeat
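
A matching sketch of the OCR steps, again with in-memory stand-ins (`object_store`, `page_queue`, `index`) and a hypothetical `ocr` function that simply decodes bytes. The point to notice is the ordering: the item is deleted from the queue only after its text has been indexed.

```python
from collections import deque

# Stand-ins pre-populated as if a scanner had already run.
object_store = {"img-1": b"hello world", "img-2": b"queues in a cloud"}
page_queue = deque(["img-1", "img-2"])
index = {}             # stand-in for the search index


def ocr(image):
    """Hypothetical OCR step: here it just decodes the fake image bytes."""
    return image.decode()


def ocr_loop():
    while page_queue:
        object_id = page_queue[0]         # 1. read the oldest item (not yet removed)
        image = object_store[object_id]   # 2. read the image by object ID
        text = ocr(image)                 # 3. perform OCR processing
        index[object_id] = text           # 4. add OCR text to the index
        page_queue.popleft()              # 5. delete the item from the queue
        # 6. repeat


ocr_loop()
print(index)
```

If the process were to crash between steps 1 and 5, the item would still be at the head of the queue and would be picked up again on restart.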

Now, we can arbitrarily scale the number of scanning processes. But what if our OCR takes too long to handle the combined workload of all of these scanning processes?

Handling Multiple Readers

If our OCR processing takes far longer than the time required to scan a page, we need to be able to increase the performance of the OCR processing. And when we add multiple parallel page scanners, things get even worse. We could try to make a faster OCR engine, but it's far easier to be able to scale out the OCR processing by running multiple instances of the OCR processing in parallel.

In this case, we need to be able to have multiple readers of the queue. And, we need to ensure the following characteristics of our solution:

1. No two queue readers will get the same item
2. No items will be lost, even in the event of a queue reader failure

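If you just run two queue readers in parallel, both of these guarantees can be violated. If the two readers run in lock-step, they both read the same oldest item and process it twice. And when they both then delete, the second delete removes the next item in the queue, which is lost without ever having been processed.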
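The lock-step failure can be reproduced deterministically by interleaving the two readers' steps by hand. This is a simulation only: the readers "A" and "B" are just variable names, and a `deque` stands in for the cloud queue.

```python
from collections import deque

page_queue = deque(["img-1", "img-2", "img-3"])

# Both readers read the head of the queue before either one deletes.
seen_by_a = page_queue[0]
seen_by_b = page_queue[0]
duplicate = seen_by_a == seen_by_b     # both readers hold the same item

# Both readers finish processing and delete "their" item.
page_queue.popleft()                   # reader A's delete removes img-1
lost = page_queue.popleft()            # reader B's delete removes img-2 instead

print(duplicate, lost, list(page_queue))
```

Here `img-1` is processed twice and `img-2` disappears without being processed at all, which violates both requirements at once.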

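Thus, we need to introduce another concept in order to maintain these characteristics: the ability to atomically transfer an item from one queue to another. Because the transfer is atomic, each item lands in exactly one worker's queue, so no two readers get the same item. And because the item stays in the worker queue until processing is complete, it is not lost if that reader fails. With this capability, our OCR process can be modified to ensure that even with multiple readers, no data is lost or processed twice: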

1. Transfer item from cloud queue to worker queue
2. Read from worker queue to get the object ID of the next image to process
3. Read the image from cloud using the object ID
4. Perform OCR processing
5. Add OCR text to Index
6. Delete item from worker queue
7. Repeat
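
A minimal sketch of the transfer-based steps, assuming the atomic transfer can be modelled with a lock. In a real cloud store the atomicity would come from the storage system itself; the `transfer` helper, the `transfer_lock`, and the four worker threads below are all assumptions made for the example, and the OCR and indexing steps are reduced to recording the item ID.

```python
import threading
from collections import deque

page_queue = deque(f"img-{n}" for n in range(20))   # stand-in for the cloud queue
transfer_lock = threading.Lock()   # models the store's atomic transfer
results_lock = threading.Lock()
processed = []                     # (worker name, object ID) pairs


def transfer(source, destination):
    """Atomically move the oldest item from source to destination.

    Returns False when the source queue is empty.
    """
    with transfer_lock:
        if not source:
            return False
        destination.append(source.popleft())
        return True


def worker(name):
    worker_queue = deque()                         # this worker's private queue
    while transfer(page_queue, worker_queue):      # 1. transfer item to worker queue
        object_id = worker_queue[0]                # 2. read from the worker queue
        with results_lock:                         # 3-5. OCR and indexing stand-in
            processed.append((name, object_id))
        worker_queue.popleft()                     # 6. delete from the worker queue
        # 7. repeat


threads = [threading.Thread(target=worker, args=(f"w{i}",)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

processed_ids = sorted(object_id for _, object_id in processed)
print(len(processed_ids), len(set(processed_ids)))
```

Which worker handles which item varies from run to run, but every item is handled exactly once. An item that is mid-processing lives in exactly one worker queue, so a recovery process could find and re-queue it after a worker failure.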

Using this approach, you can arbitrarily scale the number of reader processes without modification. And these workers can enqueue their results into a common queue, allowing the results to be recombined for further processing.

By leveraging queues, cloud storage allows the creation of complex reliable workflows that can be scaled arbitrarily and dynamically. It also facilitates the creation of loosely coupled reliable systems of interacting programs that work together to solve a given problem in a flexible and scalable manner.
