Sunday, June 15, 2014

Throwing a Great Block

Last year I was working on a cloud-hosted Windows service for a client that contained an application-specific logging implementation. The existing architecture had log entries posted at various process points, i.e. file discovery, pickup, dropoff, and download. The log code would post a message to the Microsoft Messaging Queueing service (MSMQ) and a separate database writer service would dequeue those messages and post them to a series of tables in SQL Server.

Lagging The Play

While this setup worked perfectly well it had one minor issue - the queueing of a log message to MSMQ happened sequentially. That means that while the service was attempting to post a log message to the queue all other file processing was temporarily suspended. Since posting a log message to MSMQ means you're performing an inter-process communication there will be a noticeable lag imposed on the calling thread. Add to that the possibility that the MSMQ service could be located on another server and you've now imposed network lag time on the calling process as well. That's potentially alotta-lag! In the worst possible case, if MSMQ cannot be reached for some reason file processing could be suspended for a very long time. For a platform that expects to be able to process thousands of messages a day this was clearly not going to work as a long term solution. However, the client wanted to retain the use of MSMQ as a persistent message forwarding mechanism so that if the writer service was unavailable the log messages would not end up getting lost.

Block For Me

It seemed clear that what was needed was some way for the service to save log messages internally for near-term posting to MSMQ in a way that would minimally impact file processing. What came to mind initially was to have an internal Queue object on which the service could store log messages that could be dequeued and posted to MSMQ by another thread. It's a classic Producer-Consumer pattern1. While this is a threading implementation that is not of surpassing difficulty to implement it has some subtleties that make it non-trivial. First, all access to the Queue object has to be thread-safe. Second, the MSMQ posting thread needs to enter a low-CPU-load no-operation loop while it's waiting for a log message to be queued. Wouldn't it be nice if there was something built into the .Net Framework to do all this?

Well, sometimes Microsoft gets it right. In the .Net Framework 4 release Microsoft added something called a Blocking Collection2 that does exactly what we needed. It allows for thread-safe Producer-Consumer patterns that do not consume CPU resources when there is nothing on the queue.

Here's an example of how to implement it in a simple console application.

First, we'll need a message class. In the service for the client the log information message was more complex, but this should give you the general idea.

The real "meat" of the operation is in the class that encapsulates the blocking collection. Here's the first portion of the class definition.
You'll notice that the class implements the IDisposable interface. This is so that the thread that dequeues the messages from the blocking collection can clean up after itself. This will be seen in another section of the code for this class.

You'll also notice that when the BlockingCollection is defined we specify the class of objects that will be placed on the collection. However, when we instantiate the collection we signify that it should use a ConcurrentQueue object as the backing data store for the blocking collection. This ensures that the items placed in the collection will be handled in a thread-safe manner on a first-in, first-out (FIFO) basis.

The finalizer method merely calls our Dispose method with a parameter indicating that this was called from the class' destructor, a common patterm for IDisposable implementations3. The Dispose methods will be shown in their entirety later in this post.

The AddLog method is very simple; it invokes the blocking collection's Add method to enqueue the message in a thread safe manner. The DequeueMessageThread method appears to be an endless loop that keeps attempting to dequeue a message, causing a CPU spike from the tight looping. But here's where the magic of the blocking collection comes into play. The Take method of the blocking collection will enter into a low-CPU wait state if nothing is found on the queue, blocking the loop from proceeding. As soon as a message is enqueued the Take method will return from the wait state and the loop will proceed. Note that the Take mehod will also return immediately if the blocking collection has been closed down, indicating completion, hence the IsCompleted check right after the call.

The exception handler in the method captures two specific exceptions:
  1. The InvalidOperationException will be signaled if the blocking collection is stopped. We'll see this in the Dispose method;
  2. The ThreadAbortException will be signaled if the thread had to be killed because the Dispose method timed out waiting for the thread to finish.
In this code snippet the first Dispose method is our public interface that satisfies the requirement for IDisposable implementation. It simply calls our private Dispose method that takes a parameter indicating whether it was called from the class destructor method.

The second private Dispose method is where some housekeeping for the blocking collection and dequeue thread happens. First we call the blocking collection's CompleteAdding method. This will disallow any further additions to the queue, minimizing the chance that the dequeue thread will never end because messages continue to be added. We then attempt to wait for the thread to complete by calling the thread's Join method, specifying a timeout value for the thread. If the thread is not complete within the specified timeout we forcibly destroy it and exit. Finally, if called from the class' destructor we can suppress the finalize method of the garbage collector.

To utilize a producer-consumer queue like this one is quite simple:
The using statement ensures that the queue's Dispose method is invoked upon completion, thereby stopping the dequeing thread. When executed in a loop like this one that enqueues 100 messages the tail end of the output looks like this:

Enqueueing: Message with ID 92 and value Message text # 92.
Enqueueing: Message with ID 93 and value Message text # 93.
Enqueueing: Message with ID 94 and value Message text # 94.
Dequeueing: Message with ID 88 and value Message text # 88.
Dequeueing: Message with ID 89 and value Message text # 89.
Dequeueing: Message with ID 90 and value Message text # 90.
Dequeueing: Message with ID 91 and value Message text # 91.
Enqueueing: Message with ID 95 and value Message text # 95.
Enqueueing: Message with ID 96 and value Message text # 96.
Enqueueing: Message with ID 97 and value Message text # 97.
Enqueueing: Message with ID 98 and value Message text # 98.
Dequeueing: Message with ID 92 and value Message text # 92.
Dequeueing: Message with ID 93 and value Message text # 93.
Dequeueing: Message with ID 94 and value Message text # 94.
Dequeueing: Message with ID 95 and value Message text # 95.
Enqueueing: Message with ID 99 and value Message text # 99.
Enqueueing: Message with ID 100 and value Message text # 100.
Dequeueing: Message with ID 96 and value Message text # 96.
Dequeueing: Message with ID 97 and value Message text # 97.
Dequeueing: Message with ID 98 and value Message text # 98.
Dequeueing: Message with ID 99 and value Message text # 99.
Dequeueing: Message with ID 100 and value Message text # 100.
Shutting down queue. Waiting for dequeue thread completion.
Dequeue thread complete.

As you can see the dequeue process slightly lags the enqueue process, as you would expect for processes running in separate threads. The messages are interspersed as the threads compete for the shared resource.

Finishing It Off

So what we've demonstrated is a way to implement a producer-consumer pattern without writing a lot of thread management code. While this pattern is not applicable in a great many situations it certainly has its uses. Any time you need to queue up items for processing but don't want to slow down the primary process give this pattern a try.


No comments:

Post a Comment