Rhino Service Bus

A developer friendly service bus for .NET

Rhino Service Bus is a messaging framework that supports transactional, durable, reliable, distributed, asynchronous messaging.

Rhino Service Bus currently supports two transports MSMQ and Rhino.Queues.

Where to download?

You can get the latest recommended version with Nuget.

Where to ask questions?

Check out on Rhino Tools Discussion Group

Learn

API introduction

IServiceBus

  • CurrentMessageInformation – Stores message and information about the message for the current message being processed.
  • Endpoint – The endpoint for the bus being hosted.
  • AddInstanceSubscription – This method will keep a reference locally to the OcassionalConsumerOf that is subscribing. The method returns IDisposable and when disposed will remove the subscription. The method sends a AddInstanceSubscription message to the confgured owner.
  • ConsumeMessages – This will directly invoke consumers on the local service bus without using any queuing or transport.
  • DelaySend – Waits to send the message until the specified time. This can be a great way to replace timers in your application because the service bus takes care of it for you.
  • HandleCurrentMessageLater – Puts the message back into the queue. This is essentially a loop where you can wait to process a message until a condition has been met.
  • Notify – Will send a message to each consumer / endpoint that has subscribed to the message argument.
  • Publish – Same as Notify. However, if no subscribers are found a MessagePublicationException will be thrown.
  • Reply – Will send a message back to the CurrentMessageInformation.Source endpoint.
  • Send – Sends a message to the message owner specified in the configuration, or to the Endpoint specified in the parameter arguments.
  • Subscribe – Will send an AddSubscription message to the configured message owner for the message type being subscribed. This is typically handled for you automatically for any ConsumerOf, InitiatedBy, and Orchestrates.
  • Unsubscribe – Will send a RemoveSubscription message to the configured owner.

IStartableServiceBus

  • Start – Used when the bus must receive messages (not using IOnewayBus), calling Start will begin listening for messages. When using the service bus host this is not needed.

Consumers

Consumers are similar to how you might think of an endpoint contract in WCF, but is really just a simple way to say I want to handle this message type. One important detail is that it doesn’t matter how the message reaches an endpoint for instance Send, Reply, Publish, or Notify. Below describes each kind of consumer in more detail.

  • ConsumerOf – Consumes messages for the specified type. The bus on startup will automatically send AddSubscription messages to the message owner.
  • OccasionalConsumerOf – Consumes messages for the specified type, but only when programmatically subscribed by calling bus.AddInstanceSubscription. It is your responsibility to remove the subscription as well. If a message is sent while not subscribed the message will be placed in the Discarded sub-queue.
  • InitiatedBy – Similar to ConsumerOf, but used to start a Saga.
  • Orchestrates – Similar to ConsumerOf but in order to consume the message the Saga must be started and IsComplete must be false. Otherwise the message will be placed in the Discarded sub-queue.
  • Consumer.SkipAutomaticSubscription – Consumes messages via Send / Reply only and no subscription is required. Typically there is no harm in having a subscription, so using this is only necessary if you really want it.

Sample Send / Reply

This sample demonstrates messaging similar to Request / Response.

using Demo.Messages;
using Rhino.ServiceBus;
namespace Demo.Client
{
    public class PingInputScreen
    {
        private readonly IServiceBus bus;
        public PingInputScreen(IServiceBus bus)
        {
            this.bus = bus;
        }
        public void SendMessage()
        {
            bus.Send(new PingMessage());
        }
    }
}
using System;
using Demo.Messages;
using Rhino.ServiceBus;
namespace Demo.Server
{
    public class PingConsumer : ConsumerOf<PingMessage>
    {
        private readonly IServiceBus bus;
        public PingConsumer(IServiceBus bus)
        {
            this.bus = bus;
        }
        public void Consume(PingMessage message)
        {
            Console.WriteLine("Received ping message");
            bus.Reply(new PongMessage());
            Console.WriteLine("Sent reply pong message");
        }
    }
}

Sample Pub / Sub

The sample below demonstrates using the service bus to publish events to subscribed listeners. Note that you must publish messages from the service owner. Note also the difference between the command (something happening) and the event (something that has happened).

using Demo.Messages;
using Rhino.ServiceBus;
namespace Demo.Client
{
    public class CustomerAddressScreen
    {
        private readonly IServiceBus bus;
        public CustomerAddressScreen(IServiceBus bus)
        {
            this.bus = bus;
        }
        public void CustomerMoving(CustomerMovingModel customerMovingModel)
        {
            bus.Send(new CustomerMovingCommand
            {
                CustomerId = customerMovingModel.CustomerId,
                Line1 = customerMovingModel.Line1,
                Line2 = customerMovingModel.Line2,
                City = customerMovingModel.City,
                State = customerMovingModel.State,
                Zip = customerMovingModel.Zip,
            });
        }
    }
}
using Demo.Messages;
using Rhino.ServiceBus;
namespace Demo.Server
{
       public class CustomerMovingCommandConsumer : ConsumerOf<CustomerMovingCommand>
       {
               private readonly IServiceBus bus;
               private readonly IRepository repository;
               public CustomerMovingCommandConsumer(IServiceBus bus, IRepository repository)
               {
                    this.bus = bus;
                    this.repository = repository;
               }
               
               public void Consume(CustomerMovingCommand message)
               {
                    var customer = repository.Get<Customer>(message.CustomerId);
                    customer.Moved(message.Line1, message.Line2, message.City, message.State, message.Zip);
                    bus.Publish(new CustomerMovedMessage() {
                        CustomerId = message.CustomerId
                    });
               }
        }
}

Configuration

Rhino Service Bus (RSB from now on) is using the Castle Windsor syntax to provide configuration values. The configuration settings are aimed to be administrator (operations team) friendly, and should only specify values that are important for operations. Developer settings should be specified separately from operations settings.

RSB configuration is convention based, and it will configure itself properly based on the settings provided. The following is a sample RSB configuration file.

<configuration> 
  <configSections> 
    <section name="castle" 
             type="Castle.Windsor.Configuration.AppDomain.CastleSectionHandler, Castle.Windsor" /> 
  </configSections> 
  <castle> 
    <facilities> 
      <facility id="rhino.esb" > 
        <bus threadCount="5" 
             numberOfRetries="5" 
             endpoint="msmq://hibernatingrhinos.com/starbucks.customer"
             logEndpoint="msmq://hibernatingrhinos.com/audit.queue"
             queueIsolationLevel="ReadCommitted" 
             transactional="false" 
             /> 
        <messages> 
          <add name="Starbucks.Barista" 
               endpoint="msmq://hibernatingrhinos.com/starbucks.barista" 
               transactional="false"/> 
          <add name="Starbucks.Cashier" 
               endpoint="msmq://hibernatingrhinos.com/starbucks.cashier"/> 
        </messages> 
        <security> 
          <key>f/gdDWbDqHRvpqdRbTs3mxhGdZh9qCaDrasxJGXl+5s=</key> 
        </security> 
      </facility> 
    </facilities> 
   </castle> 
</configuration>

The first part define the config section handler. As mentioned, RSB uses the Castle Windsor configuration system to handle that.

The important part in the configuration is the “rhino.esb” facility element. Inside the rhino.esb facility, there are two mandatory configuration values, and , and one optional, .

The specifies the following options:

  • threadCount [mandatory] – the number of threads listening to the queue. The recommended setting is to have one thread per each core in the machine. For development, the recommended setting is 1 (to make debugging easier).
  • numberOfRetries [mandatory] – the number of times the bus will retry a message before sending it to the error queue. See error handling section for more details about that.
  • endpoint [optional – IOnewayBus only, required otherwise]- the queue endpoint this bus is listening to. Every bus can listen to a single queue endpoint. Currently, the bus support two endpoints types, “msmq://”, using the MSMQ queuing system, and “rhino.queues”://”, using the Rhino Queues system. Note that the use of endpoint type must be identical for all endpoints (that is, the endpoint specified in the element and the endpoints specified in the element).
  • logEndpoint [optional, for use with MSMQ only] – the endpoint to forward a copy of all messages to. Used for auditing purposes. RSB does nothing more than forward to this queue. It is your responsibility to do something with the messages once they get there.
  • queueIsolationLevel [optional, default to Serializable] – the isolation level for the DTC transaction that the bus is running all operations under.
  • transactional [optional, for use with MSMQ only, default to true] – whatever the MSMQ queue is transactional or not. This setting is useful if you want to be able to read from a remote non transactional queue.

The element specifies the owners of message types. Message ownership dictates two important things:

  • If a consumer wants to subscribe to a message, the message endpoint will specify where the subscription request will be sent.
  • If an endpoint wants to send a message, the message endpoint dictate where it will be sent to.

Each element in the define the following:

  • name – namespace of the messages belonging to a particular endpoint.
  • endpoint – the owner endpoint of those messages.
  • transactional [optional, MSMQ only, defaults to true] – whatever the owning queue is transactional or not.

The element is optional, and specify a Rijndael key for encrypting fields or whole messages across the wire.

Hosting

Rhino Service Bus comes with a very simple to use service host. Use this when your application needs to run as a Windows Service. The host expects your class library to have at least one implementation of AbstractBootstrapper. By default this will register with the container all IMessageConsumer implementations for the assembly being hosted.

Options for Rhino.ServiceBus.Host.exe

  • /Name: – [required] The service name that the host will use to create the windows service.
  • /Assembly: – [required] The full path to the assembly that will be hosted, must contain a class that implements AbstractBoostrapper.
  • /ConfigFile: – [optional] The full path to the applications configuration file.
  • /Action: – [required – Install, Debug, Server, Uninstall, Deploy] Install is used for the initial windows service installation. Debug is used when running from Visual Studio (sample below). Server is only used from the Windows Service and cannot be used directly. Uninstall removes the Windows Service. Deploy is used from the installer for the initial deployment.
  • /BootStrapper: – [optional] The type of BootStrapper to use. Only needed when assembly contains more than one BootStrapper.
  • Account: – [optional, MSMQ only] Used to setup the appropriate permissions when initially creating the queues.
  • Host:<string,string> – [optional] First argument is the type that implements IApplicationHost. Second argument is the assembly path containing the implementation of IApplicationHost.

When running under Visual Studio

Use the settings below when you need to Debug both client / server from within the same solution.

Solution Property settings.

How To Build

Rhino Service Bus uses Psake for its build script. You must first run the build script before building in Visual Studio. This is required because the build script will generate the AssemblyInfo.cs file and copy the 4.0 assemblies into the correct lib folder.

In order to run the builds script, run the following command from PS:

PS C:\Projects\rhino\rhino-esb> .\psake.ps1

Logging

The first thing most people will notice when building a multi-threaded distributed system is that it can be very hard to see what’s going on. Fortunately, RSB logs a lot of information and this is usually the easiest way to diagnose what’s happening under the covers.

SAMPLE CONFIGURATION

This will log everything to Visual Studio’s Output window.

<?xml version="1.0"?>
<configuration>
    <configSections>
        <section name="log4net" type="log4net.Config.Log4NetConfigurationSectionHandler, log4net"/>
    </configSections>
    <log4net>
        <appender name="TraceAppender" type="log4net.Appender.TraceAppender">
            <layout type="log4net.Layout.PatternLayout">
                <conversionPattern value="%date [%thread] %-5level [%ndc] - %message%newline"/>
            </layout>
        </appender>
        <root>
            <level value="DEBUG"/>
            <appender-ref ref="TraceAppender"/>
        </root>
    </log4net>
</configuration>

In your startup code:

private void InitializeLogger()
{
    XmlConfigurator.Configure();
}

COMMON LOG MESSAGES

  • Got message [MessageType], but had no consumers for it – This message can mean a few different things. You haven’t registered your consumer in Windsor. The message is for a Saga that is already complete. You have possibly sent your message to the wrong endpoint (by configuring the message owner incorrectly). The message is then placed in the Discarded subqueue.
  • Failed to process message – This message is logged typically when an error occurs inside the consumer. The error processing logic will apply here and retry until the numberOfRetries configuration value has been reached. In which case the message will be placed in the Errors subqueue.