Windows Azure Queues are part of the Windows Azure Storage service, along with Blobs and Tables. Windows Azure Queues provide a means for sending and receiving messages. Since these messages are stored in Windows Azure Storage, they’re durable, meaning storage is triple-replicated within the datacenter, providing resiliency to hardware failures.
All Windows Azure Storage services are accessed via REST-based API. In the case of queues, this API consists of methods for queue management (listing queues, creating queues, deleting queues) as well as message management (storing, peeking, retrieving, and deleting). The PUT method enqueues a single message, which is a single transaction process. The GET method dequeues one or more messages, and the DELETE method deletes a single message. The dequeue involves the retrieval of the message (GET), followed by a request to remove the message from the queue (DELETE). The Queue in Windows Azure Storage can be accessible to any number of clients at a time. Also, the details related to the operations performed on Windows Azure Queue can be logged by enabling Storage Analytics logging.
Here we will demonstrate the use of Windows Azure Queue Storage from a Java application running locally or within a role. We recently published CloudNinja for Java to github, a reference application illustrating how to build multi-tenant Java based applications for Windows Azure. CloudNinja for Java uses Windows Azure Queue Storage for scheduler operations.
We will discuss the following activities:
-
-
Create a queue
-
Insert, get, delete a message from a queue
-
Peek at a message in a queue
-
De-queue a message in a queue
-
Deal with a poison message
-
Delete a queue
-
Prerequisites
The prerequisites for using Windows Azure Queue Storage service from a Java application are:
-
Windows Azure Libraries for Java
-
Windows Azure SDK
-
Java Development Kit (JDK)
Creating a Java Application to Access the Queue Storage Service
We add the following import statements to the Java classes that we use to access the Windows Azure Queue Storage service
// Import following to use queue API’simport com.microsoft.windowsazure.services.core.storage.CloudStorageAccount;import com.microsoft.windowsazure.services.core.storage.StorageException;import com.microsoft.windowsazure.services.queue.client.CloudQueue;import com.microsoft.windowsazure.services.queue.client.CloudQueueClient;import com.microsoft.windowsazure.services.queue.client.CloudQueueMessage; |
Retrieving a Storage Account
To retrieve a storage account, initialize an object of the CloudStorageAccount class. The initialized object represents the storage account. We can initialize CloudStorageAccount using a Windows Azure Storage account or an emulated storage account (Storage Emulator account).
Retrieving a Windows Azure Storage Account
We first need to retrieve the Windows Azure Storage account using the CloudStorageAccount class. The storage account can be retrieved by parsing the connection string using the CloudStorageAccount.parse method. The connection string consists of the default endpoint protocol, storage account name, and storage account key.
Here is the sample code of retrieving the Windows Azure Storage account
// Define the connection-string with your valuespublic static final String storageConnectionString = DefaultEndpointsProtocol=http;” + “AccountName=your_storage_account;” + “AccountKey=your_storage_account_key”; // Retrieve storage account from connection-stringCloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); |
In this code, the storage account is specified as AccountName and the primary access key of the storage account is specified as AccountKey. The primary access key is listed in the Windows Azure management portal.
Working with Storage Account in Local Emulator
Windows Azure SDK provides a Storage Emulator that emulates Windows Azure Storage, and is backed by a local SQL Server instance (SQL Express, by default). While the storage emulator is fine for development, it differs from Windows Azure Storage. Please see this MSDN article for details about specific differences.
The code below retrieves the emulated storage account. Before running the following code, ensure that Storage Emulator is up and running.
CloudStorageAccount storageAccount = CloudStorageAccount.getDevelopmentStorageAccount(); |
While developing an application the CloudStorageAccount.getDevelopmentStorageAccount method can be used to access the emulated storage account. This is particularly useful if the developer is not having access to the Windows Azure Storage account. However, you should not use this method in code that you deploy to Windows Azure, because the development storage account is not available in Windows Azure.
An alternative approach to accessing the local emulator storage account is to access it just like you would access a real storage account, with a storage account name and key in your configuration file. The emulator account has a special account name and key:
-
Account name: devstoreaccount1
-
Account key: Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==
You can place these in the local configuration file, and place your real credentials in the cloud configuration file, allowing you to easily run code against either account without changing any code.
Development storage account details are documented in this MSDN article.
Performing Operations on Queue Storage from a Java Application
The next step is to get a reference to our queue. To connect to a queue, we need to first connect to the Storage Account that we have initialized earlier and then to the Queue Storage service. Queue storage maintains a hierarchy. The queue is contained in the Queue Storage service, which is contained in our Storage Account in Windows Azure.
To access the Queue Storage service, a queue client is required. We use the CloudQueueClient class to get the reference to queues. Here is the sample code of creating a queue client.
//Create the queue clientCloudQueueClient queueClient = storageAccount.createCloudQueueClient(); |
This queueClient instance can be used to perform various operations on the queue.
How to Create a Queue
Use the CreateIfNotExist method to create a queue. This method first checks whether a queue with the same name exists in Windows Azure and then creates a queue only if the same queue name does not exist.
The following code creates a queue object (of type CloudQueue) that allows us work with the queue that we have created. Remember that the name of the queue must be in lowercase.
// Retrieve the reference to the queueCloudQueue queue = queueClient.getQueueReference(“commandqueue”); // create the queue if not readily existsqueue.createIfNotExist(); |
Queues Naming Convention
Queue names are alphanumeric and lowercase. For complete naming rules, please view this article.
How to Insert a Message into a Queue
To insert a message into an existing queue, first create a new CloudQueueMessage. Next, call the addMessage method. A CloudQueueMessage class can be created from either a string (in UTF-8 format) or a byte array.
// Retrieve the reference to the queueCloudQueue queue = queueClient.getQueueReference(“commandqueue”); // Create a message and add it to queueCloudQueueMessage message = new CloudQueueMessage(“Start”); // Add the message to the queuequeue.addMessage(message); |
How to Peek at a Message in a Queue
The peek message operation retrieves one or more messages from the front of the queue. This operation doesn’t change the visibility timeout interval of the message. It also does not increment the dequeueCount (the property containing the number of times the message has been retrieved) of the peeked messages. This operation only peeks the visible messages.
Using the peekMessage method, the following code snippet can retrieve a single message from the front of the queue.
// Retrieve the reference to the queueCloudQueue queue = queueClient.getQueueReference(“commandqueue”); // Peek at the messagequeue.peekMessage(); |
How to Dequeue a Message from a Queue
Dequeuing a message means removing the message from the queue. Dequeue message is a two-step process. In the first step, the message is retrieved from the queue using the retrieveMessage method. On retrieval, the message remains in the queue, but becomes invisible for a specified time period (defaulting to 30 seconds). Once the message is retrieved, it is deleted using the deleteMessage method.
// Retrieve the reference to the queueCloudQueue queue = queueClient.getQueueReference(“commandqueue”); // Retrieve the first visible message in the queueCloudQueueMessage retrievedMessage = queue.retrieveMessage(); // Process the message in less than 30 seconds, and then delete the message.queue.deleteMessage(retrievedMessage); |
How to Deal with Poison Messages
To understand about poison messages, we need to have an idea of the message life cycle.
The Message Lifecycle
When we retrieve a message, the message is not removed from the queue. It just becomes invisible for some time, which is known as visibility timeout interval. By default, the visibility timeout interval is of 30 seconds. Once the message is processed, it can only be deleted by calling the deleteMessage method.
Every time a message is retrieved from the queue, the application can determine the timeout interval based on the processing logic. The application also gets a unique pop receipt for the operation. The pop receipt is used while performing the deleteMessage operation on the queue. This receipt is an important part of the message life cycle in the queue.
What is a Poison Message
The application sometimes fails to process a message from a queue. The visibility timeout interval for the message expires and the message reappears in the queue. The messages that continuously fail to get processed are called poison messages. Poison messages should be removed from the queue because they unnecessarily keep the application busy trying to process them.
A message processing may fail because the application crashes or takes too long to process the message. In this case, the message reappears in the queue and can be picked up by another application or thread. If another application retrieves the same message and gets a pop receipt, the pop receipt received by the first application becomes invalid. Now if the first application tries to delete the message, the application receives an exception stating that the message has been picked up by another process.
Dealing with a Poison Message
When the application retrieves a message from a queue using the retrieveMessage method, the message dequeueCount property is incremented by 1. For example, if a message is being retrieved for the first time, this property value will be incremented from 0 to 1. By using this property, the application can determine the number of times the message has been retrieved from the queue for processing.
If the application fails to process a message for the specified dequeueCount threshold value, that message is considered as a poison message. The following code snippet demonstrates the detection of a poison message.
// Get the reference to the queueCloudQueue queue = queueClient.getQueueReference(“commandqueue”); // Retrieve the message from the queueCloudQueueMessage reterivedMessage = queue.retrieveMessage(); // Get the DequeueCountint dequeueCount = reterivedMessage.getDequeueCount(); // You need to check for and set a threshold for this dequeueCountif ( dequeueCount >= 2) { // Deal accordingly queue.deleteMessage(reterivedMessage);} else { // Your application logic} |
The strategy for handling poison messages is dependent on the application design, and it can be tricky. The application may opt to preserve a poison message either in a queue, table, or blob, from where the message can be again retried for processing or can be analyzed by the application administrator for further actions. Another strategy can be to log the details of the poison message and then delete the message.
How to Change the Content and Visibility Timeout Interval of a Message
We can change the content and visibility timeout interval of a message. You should change the visibility timeout interval if the application takes more time than the current visibility timeout interval to process the message. The following code updates the queue message with new content and extends its visibility timeout interval by another 60 seconds.
// Retrieve the reference to the queueCloudQueue queue = queueClient.getQueueReference(“commandqueue”); // Retrieve the messageCloudQueueMessage reterivedMessage = queue.retrieveMessage(); // Update the messagereterivedMessage.setMessageContent(“Updated State.”); // create EnumSet for updating visibility time and content EnumSet<MessageUpdateFields> updateFields = EnumSet.of(MessageUpdateFields.CONTENT, MessageUpdateFields.VISIBILITY); // Update the queue with the messagequeue.updateMessage(reterivedMessage, 60, updateFields, null, null); // Updated message will now only be visible after 60 seconds |
How to Delete a Queue
The queues are managed by the Windows Azure Queue Storage service, and each queue consumes the system resources. There can be scenarios when the application may need to create a queue on the fly for temporary usage. Once all messages from the temporary queue are processed, this queue will not be referred in future. The developer must delete such queues to release the resources.
To delete a queue with all its messages, call the delete method on the queue object.
// Retrieve the reference to the queueCloudQueue queue = queueClient.getQueueReference(“commandqueue”); // Delete the queue from Storage Servicequeue.delete(); |