Using a JMS Exception Listener in the latest .NET Adapter, Part 1

The latest release of the JNBridge JMS Adapter for .NET provides a mechanism that asynchronously detects dropped or severed connections to the JMS Server. This makes it possible to build fault-tolerant messaging clients that manage their own communication with the server. This two-part blog post shows how to use this feature by implementing a WCF channel listener that automatically reconnects when it detects a severed connection, without faulting the WCF stack or service process.

Introduction

The Java Message Service specification provides a mechanism through which the server can notify its clients, such as the JMS Adapter for .NET, that their connections will be severed. According to the specification, when “…a JMS provider detects a problem with a connection, it will inform the connection’s Exception Listener, if one has been registered”. The most common “problem” detected by the server is that it is being shut down in an orderly way, so it notifies each connection asynchronously using this mechanism. The second most common problem results from activity monitors implemented by the server detecting idle connections. If the server detects an idle connection, it will notify the connection’s Exception Listener and sever the connection. Activity monitors are usually configured server-side in Connection Factories, though some vendors also allow clients to configure idle connection detection. Using an Exception Listener allows the client to implement an explicit re-connection mechanism.

While the .NET Adapter for JMS exposes the JMS Message Listener mechanism in inbound operations like OnReceiveTextFromQueue, until recently the Exception Listener mechanism was not available. With the latest release of the adapter, version 4.0, the Exception Listener is available as two inbound operations, OnQueueConnectionException and OnTopicConnectionException. This post demonstrates how to create an inbound WCF channel listener that captures server shutdown exceptions and how to implement a reconnect mechanism. In Part 1, the WCF services and reconnect mechanism are hosted in a stand-alone ServiceHost instance. In Part 2, the WCF services and reconnect mechanism are hosted in IIS.

For more information on the .NET Adapter for JMS and inbound operations hosted in a ServiceHost instance, please see the Users’ Guide. For a quick introduction to the .NET Adapter for JMS, please see this online walk-through.

Prerequisites

Building this example requires the JNBridge JMS Adapter for .NET, Visual Studio 2010, 2012, 2013 or 2015, a Java 7 (or later) JRE and ActiveMQ.

Generating the inbound WCF Channel Listener Stack

This screenshot shows the WCF Add Adapter Service Reference (AASR) design tool in Visual Studio configured to generate two inbound services. One channel listener, OnReceiveTextFromQueue, will consume messages from a queue, while the other channel listener, OnQueueConnectionException, will handle server exceptions. The underlying mechanisms are a JMS Message Listener and a JMS Exception Listener, respectively.


The AASR design tool configured to build two WCF Channel Listeners

The resulting interface, with one operation for each of the two channel listeners

[System.CodeDom.Compiler.GeneratedCodeAttribute("System.ServiceModel", "4.0.0.0")]
[System.ServiceModel.ServiceContractAttribute(Namespace="jms://JNBridge.JMSAdapter"
    , ConfigurationName="JNBridgeJmsAdapter")]
public interface JNBridgeJmsAdapter {
    
    [System.ServiceModel.OperationContractAttribute(
        Action="/Queues/Generic Queues/Text/receive_text_any_q_async"
        , ReplyAction="/Queues/Generic Queues/Text/receive_text_any_q_async/response")]
    void OnReceiveTextFromQueue(string name, string text);
    
    [System.ServiceModel.OperationContractAttribute(
        Action="/Queues/Generic Queues/Configuration/exception_listener_any_q_async"
        , ReplyAction="/Queues/Generic Queues/Configuration/exception_listener_any_q_async/response")]
    void OnQueueConnectionException(string exception);
}

Re-connection Implementation using System.ServiceModel.ServiceHost

When the implementation of the OnQueueConnectionException method in the above interface is invoked, it is interpreted as a warning from the server that it will sever the connection. Most vendors do not provide information in the actual exception message about why the connection is to be severed, so it’s not worth logging.

A typical client strategy when a server connection exception is received is to close the connection and then periodically try to reconnect. While the adapter provides the ability to do this using standard Open and Close operations, the System.ServiceModel.ServiceHost container which hosts the channel listeners has a serial life-cycle of instantiation, configuration, connection, close and dispose. It does not provide an interface for re-connection. Moreover, the container knows very little about the state of the service object it is hosting: it doesn’t know or care that the connection has been closed.

The following class, FaultTolerantService, implements a re-connection strategy using the ServiceHost container. The reconnect mechanism is triggered by the WCF channel listener.

    public class FaultTolerantService
    {
        public static ServiceHost theServiceHost = null;
        public int pollingPeriod = 3000;  // 3 seconds

        public void connect(Object stateInfo)...

        public void onClosed(object sender, EventArgs args)...
    }

Threads

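The ServiceHost reference, theServiceHost, is a public static field, so every thread works with the same host. This does not by itself make the host thread-safe: the .NET documentation guarantees thread safety only for public static members of the ServiceHost type, while instance members such as Open and Abort are not guaranteed to be thread-safe. In this implementation, the ServiceHost will be accessed by up to three threads: the main thread that makes the first connection, the thread that delivers the connection exception, and the thread-pool thread that reconnects. The connection exception handler, OnQueueConnectionException, is called in a JVM thread managed by the adapter’s underlying JMS session. Here is its implementation, along with the implementation of the channel listener handler that consumes messages.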

 
 public class JMSAdapterBindingService : JNBridgeJmsAdapter 
 { 
    public virtual void OnQueueConnectionException(string exception)
    { 
       Console.WriteLine("Exception Listener called. The server will sever the connection."); 
       try 
       { 
          OperationContext.Current.Host.Abort(); 
       } 
       catch (Exception ex)   
       { 
          Console.WriteLine("In Exception Listener. ServiceHost.Abort() failed with: " + ex.Message); 
       }  
    } 

    public virtual void OnReceiveTextFromQueue(string name, string text) 
    { 
       Console.WriteLine("Message received from JMS destination, " + name + ", with message, " + text); 
    } 
 } 
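
Because theServiceHost is shared between threads, one option is to serialize its replacement behind a lock. The following is a minimal sketch under that assumption, not part of the adapter or of the example project. SharedHostSlot and GetOrReplace are hypothetical names, and the class is generic so that it can be tried without WCF.

```csharp
using System;

// Hypothetical helper: holds one shared host and replaces it under a lock.
public sealed class SharedHostSlot<THost> where THost : class
{
    private readonly object gate = new object();
    private THost current;

    // Returns the current host. A new one is created, while holding the lock,
    // when there is no host yet or when needsReplacement reports the current
    // one as unusable (for example, closed or faulted).
    public THost GetOrReplace(Func<THost, bool> needsReplacement, Func<THost> create)
    {
        lock (gate)
        {
            if (current == null || needsReplacement(current))
            {
                current = create();
            }
            return current;
        }
    }
}
```

With WCF, THost would be ServiceHost, needsReplacement would test for CommunicationState.Closed or CommunicationState.Faulted, and create would construct a new ServiceHost for JMSAdapterBindingService.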

The connection exception handler simply calls the ServiceHost’s Abort method. The ServiceHost also has a Close method; however, there can be a substantial time lag before the ServiceHost instance reaches the closed state. Calling Abort is an immediate state change, closing the ServiceHost and the communication stack it hosts. In addition, the ServiceHost fires its own Closed event, which is used in this implementation. Here are the two methods in the class FaultTolerantService. The line theServiceHost.Closed += new EventHandler(this.onClosed) adds the handler, onClosed, to the ServiceHost’s Closed event.

 
public void connect(Object singleAttempt)
{
    bool notConnected = true;
    while (notConnected == true)
    {
        if ( theServiceHost == null 
             || theServiceHost.State == CommunicationState.Closed 
             || theServiceHost.State == CommunicationState.Faulted )
        {
            theServiceHost = new ServiceHost(typeof(JMSAdapterBindingService));
        }
        try
        {
            theServiceHost.Open();
        }
        catch(Exception ex)
        {
            Console.WriteLine("Unable to connect to the JMS server with exception: " 
               + ex.Message);
            if ( (bool)singleAttempt == true )
            {
                break;
            }
            Thread.Sleep(this.pollingPeriod);
            continue;
        }
        theServiceHost.Closed += new EventHandler(this.onClosed);
        notConnected = false;
        Console.WriteLine("Connected to JMS server");
    }
}

public void onClosed(object sender, EventArgs args)
{
    ThreadPool.QueueUserWorkItem(this.connect, false);
}

The method, connect, creates a new ServiceHost instance on its first call. On later calls, if the current ServiceHost is in a faulted or closed state, it is replaced. If the ServiceHost.Open method fails, it is assumed that a connection could not be made. For convenience, if the argument singleAttempt is true, connect returns after the first failed attempt. If it is false, the method loops, trying to reconnect periodically, where the period in milliseconds is set by the instance field pollingPeriod.
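
The polling logic described above can be looked at in isolation. This is a minimal sketch, not the code from the example project: ReconnectLoop, Run and tryOpen are hypothetical names, and tryOpen stands in for a call to ServiceHost.Open wrapped in a try/catch that returns false on failure.

```csharp
using System;
using System.Threading;

// Hypothetical helper that isolates the retry behaviour of connect().
public static class ReconnectLoop
{
    // Calls tryOpen until it returns true. When singleAttempt is true, gives
    // up after the first failure. Returns whether a connection was made and
    // reports the number of attempts through the out parameter.
    public static bool Run(Func<bool> tryOpen, bool singleAttempt,
                           TimeSpan pollingPeriod, out int attempts)
    {
        attempts = 0;
        while (true)
        {
            attempts++;
            if (tryOpen())
            {
                return true;
            }
            if (singleAttempt)
            {
                return false;
            }
            // Wait one polling period before the next attempt.
            Thread.Sleep(pollingPeriod);
        }
    }
}
```

Returning a result instead of only writing to the console lets the caller decide what to do when a single attempt fails, for example whether to exit or to schedule a retry.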

The onClosed event handler, called when the current ServiceHost fires its Closed event, invokes the connect method on a separate thread taken from the thread pool. This is done because the handler’s thread is still managed by the underlying JMS Session; it is the same thread that called the WCF channel listener handler, OnQueueConnectionException. When the ServiceHost is explicitly aborted, the adapter is notified and closes the JMS session. Returning promptly on the thread managed by the JVM allows the JMS Session to close.
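
The hand-off from the callback thread to the thread pool can be observed without WCF or JMS. In this sketch, which rests on that simplification, a dedicated thread stands in for the JVM-managed thread, and HandoffDemo and Run are hypothetical names. The handler records where it runs, queues the work, and returns at once.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the Closed-event hand-off; no WCF or JMS involved.
public static class HandoffDemo
{
    // Runs a fake "closed" handler on a dedicated thread and reports whether
    // the handler and the queued work each ran on a thread-pool thread.
    public static void Run(out bool handlerOnPool, out bool workOnPool)
    {
        bool handlerFlag = false;
        bool workFlag = false;
        using (var workDone = new ManualResetEventSlim(false))
        {
            var callbackThread = new Thread(() =>
            {
                // Equivalent of onClosed: note the thread, queue the work, return.
                handlerFlag = Thread.CurrentThread.IsThreadPoolThread;
                ThreadPool.QueueUserWorkItem(state =>
                {
                    // Equivalent of connect(false), running on a pool thread.
                    workFlag = Thread.CurrentThread.IsThreadPoolThread;
                    workDone.Set();
                });
            });
            callbackThread.Start();
            callbackThread.Join();  // the callback thread finishes without waiting for the work
            workDone.Wait();        // the queued work completes independently
        }
        handlerOnPool = handlerFlag;
        workOnPool = workFlag;
    }
}
```

The callback thread is joined before the queued work is awaited, which mirrors the requirement in the text: the thread that delivered the event is released first, and the reconnect runs elsewhere.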

This code completes the example.

    class Program
    {
        static void Main(string[] args)
        {
            FaultTolerantService aService = new FaultTolerantService();
            aService.pollingPeriod = 15000;  // 15 seconds
            aService.connect(true);
            // block termination
            Console.ReadLine();
        }
    }

Please see the post, Using a JMS Exception Listener in the new .NET Adapter, Part 2. Part 2 will discuss hosting the WCF service in IIS. The Visual Studio 2013 project for this post can be downloaded here.