Tuesday, October 2, 2012

RPC with Burrow.NET and RabbitMQ

RPC is certainly nothing new; I just implemented it in a slightly different way. Burrow.RPC is some code I wrote recently for my project. It helps applications communicate in an RPC style using Burrow.NET.

To use Burrow.RPC you need the Burrow.NET package, and that's all. If you need extra utilities such as JsonSerializer, you have to grab Burrow.Extras as well. Conversely, if you only use Burrow.NET for your RabbitMQ work, Burrow.RPC is not necessary.

Burrow.RPC works by wrapping your method call in a request object and waiting for a response object. Everything is done via Castle DynamicProxy on the client and .NET Reflection on the server, so you don't have to write much extra code to make your application work in an RPC way.
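To make the client half of that idea concrete, here is a minimal sketch of how an intercepted call can be captured as a request object. It is not Burrow.RPC's code: it uses System.Reflection.DispatchProxy from the base class library as a stand-in for Castle DynamicProxy, and SketchRequest, ICalculator, RecordingProxy and ProxyDemo are hypothetical names made up for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Hypothetical request shape; Burrow.RPC's real RpcRequest may look different.
public class SketchRequest
{
    public string MethodName { get; set; }
    public object[] Arguments { get; set; }
}

// Hypothetical service contract used only for this sketch.
public interface ICalculator
{
    int Add(int a, int b);
}

// Intercepts every call on the proxied interface and captures it as a request.
// DispatchProxy is a stand-in here for Castle DynamicProxy.
public class RecordingProxy : DispatchProxy
{
    public List<SketchRequest> Sent { get; } = new List<SketchRequest>();

    protected override object Invoke(MethodInfo targetMethod, object[] args)
    {
        // A real client would serialize this request, publish it to RabbitMQ
        // and block until the matching response arrives.
        Sent.Add(new SketchRequest { MethodName = targetMethod.Name, Arguments = args });

        // There is no server in this sketch, so return the default value
        // of the method's return type.
        Type returnType = targetMethod.ReturnType;
        if (returnType == typeof(void) || !returnType.IsValueType) return null;
        return Activator.CreateInstance(returnType);
    }
}

public static class ProxyDemo
{
    public static RecordingProxy Run()
    {
        ICalculator calculator = DispatchProxy.Create<ICalculator, RecordingProxy>();
        calculator.Add(2, 3); // intercepted, never executed locally
        return (RecordingProxy)(object)calculator;
    }
}
```

The calling code only ever sees ICalculator; the fact that the call turns into a message is hidden behind the proxy.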

Let's say you have a "basic" interface and its implementation, and you now want this existing code to work remotely. Just download the Burrow.RPC package, declare the client using RpcFactory, create a listener on the server side (also using RpcFactory), and it just works.

I say "basic" interface because this library works for all basic methods except those that take an Action or Func parameter. It can handle methods with "out" or "ref" params as long as the parameter is serializable; just don't make the method too fancy ;) and it will work. I recommend using JsonSerializer from the Burrow.Extras package, as it uses Json.NET, which is pretty awesome.

It's time for a sample. Suppose you have the following interface and its implementation:

public interface ISomeService
{
    [Async] // This method will be called asynchronously
    void Delete(string userId);

    [RpcTimeToLive(100)] // in seconds
    void SendWelcomeMessage(EmailMessage message);

    IEnumerable<User> Get(int page, int pageSize, out int totalCount);
}

public class SomeServiceImplementation : ISomeService
{
    // Implementation here
}

There are a few things to note about the above interface:

  1. The Async attribute on the Delete method makes this "void" method work asynchronously. That means the client code does not wait for a response after calling the method. It's pretty convenient in cases where you don't need to wait for the result. For the same reason, you cannot use the Async attribute on methods that have a return type or an "out" param.
  2. The RpcTimeToLive attribute on the SendWelcomeMessage method makes the request valid for 100 seconds. If the server is too busy to pick up messages from the request queue, and more than 100 seconds have passed since the request was created by the time it gets to it, a TimeOutException is thrown on the server and the client receives that exception.
  3. The last method of this interface has an out param and a return type, so whenever you call this method, the client thread is blocked until it receives the result from the server. The out parameter "totalCount" will have its value set from the server's response.

So in the client code you need an instance of ISomeService, which is just a proxy object created by Castle DynamicProxy. Every method of this object is intercepted, and the method call together with its parameters is wrapped in an RpcRequest, which is eventually published to the RabbitMQ server:

var client = RpcFactory.CreateClient<ISomeService>();

Ideally, the client code is a class that depends on ISomeService, and you just register the proxy instance with your favorite IoC container.
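As a rough illustration of that dependency, the sketch below shows a consumer that only knows the interface. AccountCloser and FakeSomeService are hypothetical names, and ISomeService is trimmed to a single method so the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;

// Trimmed copy of the interface so the sketch compiles on its own.
public interface ISomeService
{
    void Delete(string userId);
}

// Client code depends only on the interface, never on Burrow.RPC itself.
public class AccountCloser
{
    private readonly ISomeService _service;

    public AccountCloser(ISomeService service)
    {
        _service = service ?? throw new ArgumentNullException(nameof(service));
    }

    public void Close(string userId)
    {
        _service.Delete(userId); // a remote call when _service is the RPC proxy
    }
}

// In-memory stand-in used in place of the proxy, for example in unit tests.
public class FakeSomeService : ISomeService
{
    public List<string> Deleted { get; } = new List<string>();

    public void Delete(string userId)
    {
        Deleted.Add(userId);
    }
}
```

In a real application, the instance handed to AccountCloser would be the one returned by RpcFactory.CreateClient<ISomeService>(), registered once with whichever container you use.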

On the server side we need to do something similar, except that there must be a real instance of ISomeService which eventually handles the method call.

ISomeService realService = new SomeServiceImplementation();
IRpcServerCoordinator server = RpcFactory.CreateServer<ISomeService>(realService, serverId: "TestApp");

The RpcFactory basically uses Burrow.NET to subscribe to the request queue. When a request comes in, it tries to map the request to a valid method on ISomeService and delegates the invocation to the correct method of the real service object. After that, the return value (if any), together with all the params that may have been changed during the method call, is wrapped in an RpcResponse object and sent back to the response queue. Please specify the generic type explicitly when creating the server, as the type becomes part of the queue/exchange names for requests and responses.
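The mapping step can be sketched with plain .NET Reflection. This is only an illustration under simplifying assumptions, not the library's implementation: SketchRequest, SketchResponse, SketchDispatcher, IInventory and Inventory are hypothetical names, and methods are matched by name and parameter count only.

```csharp
using System;
using System.Linq;
using System.Reflection;

// Hypothetical message shapes; the real RpcRequest/RpcResponse may differ.
public class SketchRequest
{
    public string MethodName;
    public object[] Arguments;
}

public class SketchResponse
{
    public object ReturnValue;
    public object[] Arguments; // includes values written to out/ref params
}

// Hypothetical service used only for this sketch.
public interface IInventory
{
    bool TryReserve(string sku, int quantity, out int remaining);
}

public class Inventory : IInventory
{
    private int _stock = 10;

    public bool TryReserve(string sku, int quantity, out int remaining)
    {
        if (quantity > _stock) { remaining = _stock; return false; }
        _stock -= quantity;
        remaining = _stock;
        return true;
    }
}

public static class SketchDispatcher
{
    public static SketchResponse Handle<T>(T realService, SketchRequest request)
    {
        // Match by name and parameter count only; overloads are left out.
        MethodInfo method = typeof(T).GetMethods()
            .Single(m => m.Name == request.MethodName
                      && m.GetParameters().Length == request.Arguments.Length);

        // MethodInfo.Invoke writes out/ref results back into the argument array.
        object[] args = (object[])request.Arguments.Clone();
        object result = method.Invoke(realService, args);

        return new SketchResponse { ReturnValue = result, Arguments = args };
    }
}
```

Sending the whole argument array back is what lets the client fill in "out" and "ref" params after the call returns.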

There is one interesting thing to note here: every instance of the proxied client has a separate response queue. Let's say you have 2 instances of your application running on different machines. Both create the same RPC request and send it to the same request queue, so the server needs to know which client to send the response back to. That's why the response address (which is a queue name) differs between clients. If the client proxy is a singleton shared across different objects in your client code, you have to make sure that RPC method calls from different threads are synchronized properly. Otherwise, you could face a silly situation where the result of the second call arrives at the code block where the first call happened. Anyway, too much information will only cause confusion; just remember that Burrow.RPC is not thread safe for a singleton object.
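One simple way to get that synchronization is to put a locking decorator in front of the shared proxy. The sketch below assumes a hypothetical IUserCounter contract; SynchronizedUserCounter and FixedUserCounter are likewise made-up names, and the latter merely stands in for the RPC proxy so the example runs without RabbitMQ.

```csharp
using System;

// Hypothetical service contract used only for this sketch.
public interface IUserCounter
{
    int CountUsers(string group);
}

// Decorator that lets only one thread at a time call into the shared proxy.
public class SynchronizedUserCounter : IUserCounter
{
    private readonly IUserCounter _inner;
    private readonly object _gate = new object();

    public SynchronizedUserCounter(IUserCounter inner)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
    }

    public int CountUsers(string group)
    {
        lock (_gate) // the next call starts only after this one has returned
        {
            return _inner.CountUsers(group);
        }
    }
}

// Stand-in for the RPC proxy so the sketch runs without a broker.
public class FixedUserCounter : IUserCounter
{
    public int CountUsers(string group)
    {
        return group.Length;
    }
}
```

The lock serializes all calls, which costs throughput. Since each client instance gets its own response queue, creating one client per consumer may be another option, at the price of more queues on the broker.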

By default, Burrow.RPC creates the queues and exchanges for you. This is necessary because the response queue should be deleted once the client disconnects; when the client connects again, a new response queue is generated. You can change the exchange name, exchange type, and queue names by implementing the following interface:

/// <summary>
/// This route finder is created for RPC communication.
/// Implement this interface if you want to use custom naming conventions for your Response/Request queue
/// </summary>
public interface IRpcRouteFinder
{
    /// <summary>
    /// If set to true, the library will create exchange and queue for you
    /// </summary>
    bool CreateExchangeAndQueue { get; }

    /// <summary>
    /// Default can be empty as the empty exchange is the built-in exchange
    /// </summary>
    string RequestExchangeName { get; }

    /// <summary>
    /// Should be either direct or fanout
    /// </summary>
    string RequestExchangeType { get; }

    /// <summary>
    /// If RequestExchangeName is empty, Burrow.RPC will route the RpcRequest object to this queue by publishing the msg to the empty exchange with the routing key equal to this queue name
    /// </summary>
    string RequestQueue { get; }

    /// <summary>
    /// The response queue must be unique per instance of the RPC client
    /// <para>If you have 2 instances of the rpc clients, these instances should subscribe to different response queue as the responses from the rpc server must be routed to correct client</para>
    /// </summary>
    string UniqueResponseQueue { get; }
}
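
A custom implementation could look like the sketch below. PrefixedRouteFinder and its "rpc." naming scheme are hypothetical, and the interface is repeated without its doc comments only so that the sketch compiles on its own. How the route finder is handed to RpcFactory is not covered in this post, so check the library source for that part.

```csharp
using System;

// Copy of the interface above, repeated so this sketch compiles on its own.
public interface IRpcRouteFinder
{
    bool CreateExchangeAndQueue { get; }
    string RequestExchangeName { get; }
    string RequestExchangeType { get; }
    string RequestQueue { get; }
    string UniqueResponseQueue { get; }
}

// Hypothetical route finder that prefixes every queue name with "rpc.".
public class PrefixedRouteFinder : IRpcRouteFinder
{
    public PrefixedRouteFinder(string serviceName)
    {
        RequestQueue = "rpc." + serviceName + ".requests";

        // A fresh GUID per instance keeps the response queue unique per client.
        UniqueResponseQueue = "rpc." + serviceName + ".responses." + Guid.NewGuid().ToString("N");
    }

    public bool CreateExchangeAndQueue { get { return true; } }

    // Empty name means the broker's built-in default exchange.
    public string RequestExchangeName { get { return string.Empty; } }

    public string RequestExchangeType { get { return "direct"; } }

    public string RequestQueue { get; }

    public string UniqueResponseQueue { get; }
}
```

Two clients built with the same service name share the request queue but get different response queues, which is the property the interface comments ask for.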

There are a few more things about the code, but I think it's better for you to find out yourself if you're interested. There are quite a few comments in the code that should make things clear, even though I always believe that if I need comments to make things clear, the code is not clear enough. This blog post is only meant to show basic usage of the library. Furthermore, this RPC API will probably change in the future, so I'll leave it at that for now. Please grab the source code from GitHub and run the TestRPC console demo project to see how it works. A NuGet package is also available: Burrow.RPC


