Monday, June 23, 2014

Using RabbitMQ Header exchange with Burrow.NET

In RabbitMQ, messages are published to an exchange, while subscriptions are made against queues. Bindings are what route the right messages to the expected queue. In this post, we will set up an exchange so that messages with ip = "10.10.0.1" in the header go to an expected queue. This involves the following steps:

  • Create an exchange of type headers
  • Create a queue
  • Bind the queue to the exchange with the following arguments
       ip = "10.10.0.1"
       x-match = "all"
  • Publish messages to the exchange with the ip in the header
  • Subscribe to the queue and process those messages

A RabbitMQ headers exchange can filter on multiple header values: setting x-match = all requires every listed header to match, while x-match = any requires at least one of them to match. Combining several bindings allows fairly complex routing logic. In this post we filter only on the ip header, so all and any behave the same. The exchange, queue and binding can be created manually, but below is the code to do it with the Burrow.NET library:
Assume that the exchange name is : Burrow.Demo.E.Headers
the queue name is                : Burrow.Demo.Q.Message
and the message to use is        : IpMessage

1/ Setup queue, exchange and binding using Burrow.NET

var setup = new RabbitSetup(_connectionString);
var routeSetupData = new RouteSetupData
{
    RouteFinder = new ConstantRouteFinder("Burrow.Demo.E.Headers", "Burrow.Demo.Q.Message", "IpMessage"),
    ExchangeSetupData = new ExchangeSetupData
    {
        ExchangeType = ExchangeType.Headers,
    },
    QueueSetupData = new QueueSetupData
    {
        MessageTimeToLive = 100000,
    },
    SubscriptionName = SubscriptionName,
};
routeSetupData.OptionalBindingData["ip"] = "10.10.0.1";
routeSetupData.OptionalBindingData["x-match"] = "all";

setup.CreateRoute<IpMessage>(routeSetupData);

Normally, an application should have its own implementation of IRouteFinder; I use ConstantRouteFinder here to keep the demonstration simple. After executing the above code, if you check the created queue or exchange, you should see the expected binding.

Note: I set MessageTimeToLive to 100000 milliseconds (100 seconds); it's optional.
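
For intuition about what the x-match binding argument does, here is a minimal sketch of the matching rule a headers exchange applies. It is my own illustration, not code from RabbitMQ or Burrow.NET: HeaderMatchSketch and Matches are hypothetical names, and it only covers the plain "all" and "any" modes, where arguments starting with "x-" are treated as options rather than headers to compare.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for illustration only; the real matching happens inside the broker.
public static class HeaderMatchSketch
{
    public static bool Matches(
        IDictionary<string, object> bindingArgs,
        IDictionary<string, object> messageHeaders)
    {
        // Read x-match; a missing value is treated as "all" here.
        object mode;
        bool matchAny = bindingArgs.TryGetValue("x-match", out mode)
                        && object.Equals(mode, "any");

        // Arguments starting with "x-" are options, not headers to compare.
        var criteria = bindingArgs
            .Where(kv => !kv.Key.StartsWith("x-", StringComparison.Ordinal))
            .ToList();

        // A criterion matches when the message carries the same key with an equal value.
        Func<KeyValuePair<string, object>, bool> isMatch = kv =>
        {
            object actual;
            return messageHeaders.TryGetValue(kv.Key, out actual)
                   && object.Equals(kv.Value, actual);
        };

        return matchAny ? criteria.Any(isMatch) : criteria.All(isMatch);
    }
}

public static class Program
{
    public static void Main()
    {
        var binding = new Dictionary<string, object> { { "ip", "10.10.0.1" }, { "x-match", "all" } };
        var matching = new Dictionary<string, object> { { "ip", "10.10.0.1" } };
        var other = new Dictionary<string, object> { { "ip", "10.10.0.2" } };

        Console.WriteLine(HeaderMatchSketch.Matches(binding, matching)); // True
        Console.WriteLine(HeaderMatchSketch.Matches(binding, other));    // False
    }
}
```

With a single header in the binding, switching x-match to "any" gives the same results, which is why the choice doesn't matter in this post.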

2/ Publishing data to RabbitMQ Exchange

var tunnel = RabbitTunnel.Factory.Create(_connectionString);
tunnel.SetSerializer(new JsonSerializer());
tunnel.SetRouteFinder(new ConstantRouteFinder("Burrow.Demo.E.Headers", "Burrow.Demo.Q.Message", "IpMessage"));
tunnel.Publish(new IpMessage(), new Dictionary<string, object>{{"ip", "10.10.0.1"}});

3/ Subscribing to data from Burrow.Demo.Q.Message

var tunnel = RabbitTunnel.Factory.Create(_connectionString);
tunnel.SetSerializer(new JsonSerializer());
tunnel.SetRouteFinder(new ConstantRouteFinder("Burrow.Demo.E.Headers", "Burrow.Demo.Q.Message", "IpMessage"));
tunnel.Subscribe(new SubscriptionOption<IpMessage>
{
    MessageHandler = msg => { /* Handle the message here */},
    SubscriptionName = "AwesomeApp",
    QueuePrefetchSize = 10, 
    BatchSize = 2 // 2 threads
});


In summary, I normally write a wrapper class on top of the publishing code that copies the required values from the message fields into the header dictionary. Keep in mind which header fields you are going to use so that you can create the correct binding. If a new field is added later in the project and it becomes a routing criterion, add a new binding that includes that field and simply delete the old binding.
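
The wrapper mentioned above could look roughly like the sketch below. Everything in it is an assumption made for illustration: the post never shows the fields of IpMessage, so the Ip property is made up, and IpMessagePublisher is a hypothetical class, not part of Burrow.NET. To keep the sketch self-contained it takes the actual publish call as a delegate instead of referencing the tunnel directly.

```csharp
using System;
using System.Collections.Generic;

// Assumed shape of the message; the real IpMessage may look different.
public class IpMessage
{
    public string Ip { get; set; }
}

// Hypothetical wrapper that derives the routing headers from the message itself.
public class IpMessagePublisher
{
    private readonly Action<IpMessage, IDictionary<string, object>> _publish;

    public IpMessagePublisher(Action<IpMessage, IDictionary<string, object>> publish)
    {
        if (publish == null) throw new ArgumentNullException("publish");
        _publish = publish;
    }

    // Single place that decides which message fields end up in the header.
    public static IDictionary<string, object> BuildHeaders(IpMessage message)
    {
        return new Dictionary<string, object> { { "ip", message.Ip } };
    }

    public void Publish(IpMessage message)
    {
        _publish(message, BuildHeaders(message));
    }
}

public static class Program
{
    public static void Main()
    {
        // Stand-in for the real publish call: just print the header that would be sent.
        var publisher = new IpMessagePublisher(
            (msg, headers) => Console.WriteLine(headers["ip"]));

        publisher.Publish(new IpMessage { Ip = "10.10.0.1" }); // prints 10.10.0.1
    }
}
```

In a real application the delegate would presumably forward to the tunnel from step 2, for example (msg, headers) => tunnel.Publish(msg, headers). When a new routing field is introduced, only BuildHeaders and the binding need to change.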


Cheers
