Monday, October 15, 2012

Deploy .NET window service artifact to a remote server from TeamCity (Updated)

- This post is an update of my previous post. Basically, it's the same approach of 90% similar except I won't use Remote Powershell in this post. Config remote PowerShell is such a pain in the ass, I reckon. I had to setup a deployment on another box which have IIS actually but someone in my team had already removed the Default Web Site so I couldn't find any quick and easy way to make Remote Powershell work. The error message Powershell prints out is so stupid which does not even tell me what it wants.

“Set-WSManQuickConfig : The client cannot connect to the destination specified in the request. Verify that the service on the destination is running and is accepting requests. Consult the logs and documentation for the WS-Management service running on the destination, most commonly IIS or WinRM. If the destination is the WinRM service, run the following command on the destination to analyze and configure the WinRM service: “winrm quickconfig”. At line:50 char:33 + Set-WSManQuickConfig <<<< -force + CategoryInfo : InvalidOperation: (:) [Set-WSManQuickConfig], InvalidOperationException + FullyQualifiedErrorId : WsManError,Microsoft.WSMan.Management.SetWSManQuickConfigCommand“

- Ok, there is another reason to hate Microsoft. It's time to abandon PS, I tried pstools before Remote Power Shell and got other problems so I won't waste time to go back to the very old tool as Power Shell is much more power full. So writting a simple console WCF application to communitcate between TeamCity and the Remote server is my choice.

- And the tool's name is DD which is a shortname of "Distributed Deployment". In this post, I'll sum up with details how to setup deployment for a windows service from TeamCity.

- Unlike web application, a window service is often a long running process in background. A .NET windows service has an OnStop method for you to clean up resource before stopping, which is cool. HOWEVER, when you try to stop the service using "net stop servicename", it does stop the service but the process will not end as fast as it can. I reckon a .NET window service can host multiple window services which are classes inherit from ServiceBase class so it could be a reason that makes the window services manager wait a little while for all potential services within a process to stop before ending the main process.

- In some cases like mine, I want the service stop immediately when it can so I have to somehow call Environment.Exit to make the process stop asap. Apparently I cannot use TASK KILL like the previous post as it was such a hacky way and it could corrupt my data. So my approach is letting the program listen to a command, when receiving an exit signal, the app should cleanup resources and invoke Enrironment.Exit. So if you need something like this, go on reading.

I/ Things you need to prepare:

  • Remote server: aka target server, a server that we'll install the service on
  • TeamCity server: aka build server, a server that has TeamCity installed
  • 1 Project slot in Teamcity as I want to deploy whenever I click "Run" on a Teamcity deployment project instead of auto deployment after code build, so it'll cost you 1 project in 20 available slots of free TeamCity
  • Wget: this tool will download a compressed artifac from Teamcity
  • 7zip: this tool will be used to decompressed the artifac
  • DD: this tool will do 2 things: listen on a deployment command from teamcity and send exit signal to your long running window service

II/ Code involved:

1/ Assume that your service class is AwesomeService.cs, implement interface ISignalListener and add event WhenStop:
public partial class AwesomeService : ServiceBase, ISignalListener
    public Action WhenStop { get; set; }
    public void Exit()
        if (WhenStop != null)
    // ...

2/ In your service code, anywhere before service start such as Program.cs, add this code:
var svc = new AwesomeService();            
//NOTE: Wait for signal from Teamcity Deployment
Server.Start<ISignalListener>(svc, "AwesomeService");
Timer timer;
svc.WhenStop = () =>
    // NOTE: Will end process after 1 second
    timer = new Timer(o => Environment.Exit(0), null, 1000, Timeout.Infinite);
// ...

3/ Prepare the deploy.bat script. I like to control the deployment process in a batch file instead of implement the steps in a program as I think people will have their own steps and a batch file is simple enough to manage. Again this batch file will basically do these things:
  • Use wget to download the latest artifact from TeamCity.
  • Use 7zip to extract the artifact, copy it to a temporary folder.
  • Save the artifact to Artifacts folder
  • Backup the whole current service folder
  • Stop target window service
  • Copy over new files and old configuration files
  • Start target window service
  • Clean up

Here is the basic code that anyone can use, just keep it somewhere, we'll need to copy the batch file to RemoteServer.
@echo off
:: 0/ --------- Set some local variables
SET Environment.ExecutingFolder="C:\Deployment"
SET Environment.7zip="C:\Program Files (x86)\7-Zip\7z.exe"
SET Environment.Wget="C:\Deployment\wget.exe"

SET TeamCity.User=your-teamcity-account
SET TeamCity.Password=your-teamcity-password
SET TeamCity.BuildTypeId=teamcity-build-type
SET TeamCity.Artifact=awesomeservice.{build.number}.zip ::

SET AwesomeService.TargetFolderName=AwesomeService
SET AwesomeService.TargetFolderPath=C:\AwesomeService
SET AwesomeService.ServiceName=AwesomeService
SET AwesomeService.ImageName=AwesomeService.exe
CD /D %Environment.ExecutingFolder%
ECHO 1/ --------- Get latest artifact from TeamCity, AwesomeService
%Environment.Wget% -q --http-user=%TeamCity.User% --http-password=%TeamCity.Password% --auth-no-challenge
REN *.zip* *.zip
ECHO Found following artifact
DIR /B *zip

ECHO 2/ --------- Extract the artifact to folder __Temp ---------------
%Environment.7zip% e -y -o__Temp *.zip

ECHO 3/ --------- Store the artifact ------------------ 
MOVE /Y *.zip Artifacts\

ECHO 4/ --------- Backup current service folder --------------- 
for %%a in (%AwesomeService.TargetFolderPath%) do set Temp.LastDate=%%~ta
SET Temp.LastDate=%Temp.LastDate:~6,4%-%Temp.LastDate:~0,2%-%Temp.LastDate:~3,2% %Temp.LastDate:~11,2%%Temp.LastDate:~14,2%%Temp.LastDate:~17,2%
ECHO Last deployment: %Temp.LastDate%
ECHO Now backup files to folder %AwesomeService.TargetFolderName%.%Temp.LastDate%
XCOPY /E /I /H /R /Y %AwesomeService.TargetFolderPath% "%AwesomeService.TargetFolderName%.%Temp.LastDate%"

ECHO 5/ --------- Stop %AwesomeService.ServiceName% service ---------------
DD AwesomeService /wait 50
ECHO Wait 2 more seconds
ping -n 1 -w 2000 > NUL

ECHO 6/ --------- Deploy new files and copy over old configs ----------------------
ECHO ... Deploy latest assemblies
XCOPY /E /H /R /Y __Temp %AwesomeService.TargetFolderPath%

ECHO ... Deploy old configs 
COPY /Y "%AwesomeService.TargetFolderName%.%Temp.LastDate%\*.config" %AwesomeService.TargetFolderPath%

ECHO ... Delete log files 
DEL /F /Q %AwesomeService.TargetFolderPath%\Logs\log.txt* > NUL

ECHO 7/ --------- Start %AwesomeService.ServiceName% service ---------------
net start %AwesomeService.ServiceName% 

ECHO 8/ --------- Cleanup --------------------------------- 
::DEL /F /Q /S *jsessionid*
RD /S /Q __Temp

- The url to the artifac in TeamCity will look like:
http://teamcity-ip-address:80/repository/download/bt2/3907:id/ - So dedend on the build type id of your artifac, change it in the above deploy.bat

III/ Setup steps:

* On Remote Server

- Download and install 7zip
- Assume that you put all custom tools and Deploy.bat in C:\Deployment. Create folder C:\Deployment\Artifacs to store your teamcity artifacs.
The Deployment folder should look like this:
 Volume in drive C is OS
 Volume Serial Number is ABCD-EFGH

 Directory of C:\Deployment

14/10/2012  02:59 PM    <DIR>          .
14/10/2012  02:59 PM    <DIR>          ..
14/10/2012  02:58 PM    <DIR>          Artifacts
14/10/2012  02:58 PM            38,912 DD.exe
14/10/2012  02:58 PM             2,558 Deploy.bat
14/10/2012  02:58 PM           401,408 wget.exe
               4 File(s)        442,878 bytes
               3 Dir(s)  405,532,868,608 bytes free

- Run DD tool:
C:\Deployment\DD -listen 5555 -token YourS3cur!tyTok3n

- Or install it as a window service, remember to start it once installed ;)
C:\Windows\Microsoft.NET\Framework64\v4.0.30319\InstallUtil.exe /listen=5555 /token=YourS3cur!tyTok3n /i C:\Deployment\DD.exe

* On Team City Server

- Copy DD.exe and put somewhere such as C:\Program Files\DD.exe
- Add new project such as: Awesome service deployment
- Add 1 build step like this picture:

Command parameters:
-execute C:\Deployment\deploy.bat -target remoteserver-ip:5555 -token YourS3cur!tyTok3n

That's it. Your AwesomeService should be deployed whenever you click Run on the deployment project from TeamCity. Obviously you could adjust some thing in the deploy.bat to suit your needs, let me know if you have any problems.


Tuesday, October 2, 2012

RPC with Burrow.NET and RabbitMQ

Definitely RPC is nothing new, I just implemented it in a slightly different way. Burrow.RPC is some code I made recently for my project. It helps applications communicate in a RPC style using Burrow.NET.

To use Burrow.RPC you definitely need Burrow.NET package, and that's all. If you need some more utilities like JsonSerializer, you have to grab Burrow.Extras. So if you only need Burrow.NET for your RabbitMQ stuff, Burrow.RPC is absolutely not neccessary.

The way Burrow.RPC works is that it wraps your method call in a request object and wait for response object. Everything is done via Castle Dynamic Proxy at client and .NET Reflection at server so you don't have to write much extra code to make your application work in RPC way.

Let's say you have a "basic" interface and its implementation, now you want these existing code work remotely, just download package Burrow.RPC, declate the client using RpcFactory and create a listener at server side, also using RpcFactory and it just works.

I mentioned the "basic" interface because this library will work for all basic methods except methods have Action or Func param. It can work with methods have "out" or "ref" params as soon as the paramether is serializable, just don't make the method be so fancy ;), it will work. I recommend using JsonSerializer from Burrow.Extras package as it using Json.NET which is pretty awesome.

It's time for some sample. Given that you have following interface and its implementation:
public interface ISomeService
    [Async] // This method will be called asynchronously
    void Delete(string userId);        
    [RpcTimeToLive(100)] // in seconds
    void SendWelcomeMessage(EmailMessage message);
    IEnumerable<User> Get(int page, int pageSize, out int totalCount);    

public class SomeServiceImplementation : ISomeService
     // Implementation here

There are something to note about the above interface:

  1. Attribute Async decorated on method Delete will make this "void" method work asynchronously. That means the client code will not wait until it receives response after calling the method. It's pretty convenient in some casee when you don't need to wait for the result. So you cannot use Async attribute on methods that have return type or have "out" param.
  2. Attribute RpcTimeToLive decorated on method SendWelcomeMessage will make the request valid in 100 seconds. If the server is so busy to pickup messages on the request queue, and when it has a chance to do that but it's over 100 seconds since the request was created, the TimeOutException will be thrown from server and certainly the client will get that Excepton.
  3. The last method of this interface has an out param and a return type, so whenever you call this method, the client thread will be blocked until it receives the result from server. The out parameter "totalCount" will definitely have value

So at client code, you will need an instance of ISomeService which is just a mock object created by Castle dynamic proxy. All the method of this object will be intercepted and the method call together with its parameters will be wrapped in a RpcRequest which eventually will be published to the RabbitMQ server:

var client = RpcFactory.CreateClient<ISomeService>();

Ideally, the client code is a class which has dependency on ISomeService and we just have to register the proxy instance using your favorite IOC container.

At server side, we will need to do similar thing for the server, there must be a real instance of ISomeService which eventually handle the method call.

ISomeService realService = new SomeServiceImplementation();
IRpcServerCoordinator server = RpcFactory.CreateServer<ISomeService>(realService, serverId: "TestApp");

The RpcFactory basically will use Burrow.NET to subscribe to request queue, when a request comes, it will try to map the request to a valid method on ISomeService and delegate the invocation to the correct method of the real service object. After that, the result value if any together with all the params which are potentially changed during the method call will be wrapped in a RpcRespone object and sent back to the response queue. Please explicitly specify the generic type when creating the server as the type will be a part of the queue/exchange names for Requests and Responses.

There is one interesting thing to note here: every instance of the proxied client will have a separated response queue. Let's say you have 2 instances of your application running on different machine, these instances of the same application create the same RPC request and send to the same request queue, the server will somehow need to know where to send the response back to the correct client. That's why the response address (which is a queue name) will be different between clients. If the service is singleton and shared accross different objects in your client code, you have to guarantee the RPC method call in different threads should be synchronize properly. Otherwise, you could face some silly sittuation where result for the second call arrives to the code block where the first call happened. Anyway, too much information will make more confuse , please just know that Burrow.RPC is not thread safe for singleton object.

By default, Burrow.RPC will create queues and exchanges for you. It's neccessary as the response queue should be deleted once the client disconnected. When the client connect again, new response queue will be generated. You can change the exchange name, type and queue names by implementing following interface:
/// <summary>
/// This route finder is created for RPC communication.
/// Implement this interface if you want to use custom naming conventions for your Response/Request queue
/// </summary>
public interface IRpcRouteFinder
    /// <summary>
    /// If set to true, the library will create exchange and queue for you
    /// </summary>
    bool CreateExchangeAndQueue { get; }

    /// <summary>
    /// Default can be empty as the empty exchange is the built-in exchange
    /// </summary>
    string RequestExchangeName { get; }

    /// <summary>
    /// Should be either direct or fanout
    /// </summary>
    string RequestExchangeType { get; }
    /// <summary>
    /// If RequestExchangeName is empty, Burrow.RPC will route the RpcRequest object to this queue by publishing the msg to the empty exchange with the routing key is equal to this queue name
    /// </summary>
    string RequestQueue { get; }

    /// <summary>
    /// The response queue must be unique per instance of the RPC client
    /// <para>If you have 2 instances of the rpc clients, these instances should subscribe to different response queue as the responses from the rpc server must be routed to correct client</para>
    /// </summary>
    string UniqueResponseQueue { get; }

There are a few more things about the code but I think it's better for you to findout yourself if you're interested, there are quite a few comments in the code which could make thing clear eventhough I always believe if I try to comment to make thing clear, that means the code is not cleared enough. This blogspot should show only basic usage of the library. And furthur more, there are probably changes in the future for this RPC API so I'll leave it for now. Please grab the source code from github, and run the TestRPC console demo project to see how it works . Nuget package is also available: Burrow.RPC


Tuesday, September 11, 2012

Dual-invocation using Autofac interception - AOP

- The project I've been working on used to have SQL as its primary datastore. We're implementing another data store using MongoDB as it will replace the SQL one soon. During the migration process, we have 2 versions of the website running parallel. The production version is using SQL which is viewed by customers and the other one using MongoDB which is being tested by our staff. So once the Mongo version development is finished, it will be kept running in parallel with the SQL version until we're completely confident that all the features are working fine. During that time, any changes made by users to the SQL db have to be applied to MongoDB.

- There're quite a few solutions in my mind. However, we've had some sort of DataAccess dlls to access Sql database before and another implementation of same interfaces which talk to MongoDB have been implemented. So the options is narrowed down to something like an adapter pattern which takes 2 instances of the same interface, invoke method on the first one and then invoke the same method on the second instance. Being a good fan of Composition over Inheritance, I was thinking of something like:
public class DualService<T> : T
    private readonly T _primary;
    private readonly T _secondary;
    public DualService(T primary, T secondary)    
        _primary = primary;
        _secondary = secondary;    
    public void MethodA()
    // So on

- Well, this approach is good only if the generic T interface has just a few methods. Ours has over 30 methods and it keeps counting ;), it's implemented by the "Outsource team" So I dropped this option, and thought about the Autofac Interceptor. The intercepter I'm gonna implement will take the secondary implementation and invoke the same method as the invocation if:

  1. The method is not decorated by IgnoreInterceptAttribute
  2. The method has a result type void. (Well, a command should not have return type here if you're curios about the reason)
  3. The method is decorated by DualInvocationAttribute

- Here is the implementation. This approach requires Castle.Core and AutofacContrib.DynamicProxy2. The 2 attributes are just simple attributes I created to enhance the filter:
public class IgnoreInterceptAttribute : Attribute

public class DualInvocationAttribute : Attribute

public class DualInvocation<T> : IInterceptor where T: class 
    private readonly T _secondaryImplementation;
    private readonly Action<Exception> _onSecondaryImplementationInvokeError;

    public DualInvocation(T secondaryImplementation, Action<Exception> onSecondaryImplementationInvokeError = null)
        _secondaryImplementation = secondaryImplementation;
        _onSecondaryImplementationInvokeError = onSecondaryImplementationInvokeError ?? (ex => Trace.WriteLine(ex));

    public void Intercept(IInvocation invocation)
        var returnType = invocation.Method.ReturnType;
        var att = invocation.Method.GetCustomAttributes(typeof(IgnoreInterceptAttribute), true);
        if (att.Any())

        if (returnType != typeof(void))
            att = invocation.Method.GetCustomAttributes(typeof(DualInvocationAttribute), true);
            if (!att.Any())

        var methodInfo = invocation.Method;
            methodInfo.Invoke(_secondaryImplementation, invocation.Arguments);
        catch (Exception ex)

- You might wonder about how to use this class, take a look at the unit tests. You should probably do something either in code or autofac xml config, similar to what I did in the SetUp:
- Here is a dummy interface to demonstrate the Autofac registrations
public interface ISomeService
    void Update();

    void Update(int userId);

    void Update(int userId, bool multi);

    int GetUser(int userId);

    bool DeleteUser(int userId);

- And here is the tests, the tests are using NSubstitute to mock the interface. (Pretty similar to Moq, etc)
public class MethodIntercept
    private ISomeService primary,

    private IContainer container;
    private Action<Exception> exceptionHandler;

    public void RegisterAutofacComponents()
        primary = Substitute.For<ISomeService>();
        secondary = Substitute.For<ISomeService>();

        var builder = new ContainerBuilder();


               .InterceptedBy(typeof (DualInvocation<ISomeService>));

               .WithParameters(new Parameter[]
                   new ResolvedParameter((p, c) => p.Name == "secondaryImplementation", (p, c) => c.ResolveNamed("secondary", typeof (ISomeService))),
                   new NamedParameter("exceptionHandler", exceptionHandler)

        container = builder.Build();

    public void Should_invoke_secondary_method_if_it_is_not_ignored()
        // Arrange
        var primaryService = container.ResolveNamed<ISomeService>("primary");
        var secondaryService = container.ResolveNamed<ISomeService>("secondary");

        // Action

        // Assert

    public void Should_invoke_secondary_overload_method_if_it_is_not_ignored()
        // Arrange
        var primaryService = container.ResolveNamed<ISomeService>("primary");
        var secondaryService = container.ResolveNamed<ISomeService>("secondary");

        // Action

        // Assert

    public void Should_invoke_secondary_overload_method_if_it_is_decorated_by_DualInvocationAttribute()
        // Arrange
        var primaryService = container.ResolveNamed<ISomeService>("primary");
        var secondaryService = container.ResolveNamed<ISomeService>("secondary");

        // Action

        // Assert

    public void Should_not_invoke_secondary_method_if_it_is_not_void()
        // Arrange
        var primaryService = container.ResolveNamed<ISomeService>("primary");
        var secondaryService = container.ResolveNamed<ISomeService>("secondary");

        // Action

        // Assert

    public void Should_not_invoke_secondary_method_if_it_is_ignored()
        // Arrange
        var primaryService = container.ResolveNamed<ISomeService>("primary");
        var secondaryService = container.ResolveNamed<ISomeService>("secondary");

        // Action
        primaryService.Update(1, true);

        // Assert
        secondaryService.DidNotReceive().Update(Arg.Any<int>(), Arg.Any<bool>());

    public void Should_not_throw_exception_if_cannot_invoke_secondary_method()
        // Arrange
        exceptionHandler = ex => Trace.WriteLine(ex);
        secondary.When(x => x.Update()).Do(callInfo => { throw new Exception(); });

        var primaryService = container.ResolveNamed<ISomeService>("primary");
        var secondaryService = container.ResolveNamed<ISomeService>("secondary");

        // Action

        // Assert

- I reckon this approach is clean, simple and It works on my machine . Cheers.

Thursday, July 26, 2012

Priority With RabbitMQ implementation in .NET

- Follow up several posts about RabbitMQ and Burrow.NET, this post is about the Priority implementation for RabbitMQ in Burrow.NET. Actually, We have it implemented in the library a few weeks ago. However, the code probably has bugs and we need more time to tune and fix those bugs. At this point, I'm confident to say the implementation is quite stable and It's time for a blog post.

- Basically, the implementation is following this post. We split the logical queue to several physical queues and use a memory queue to aggregate messages from those physical queues. The in-memory-queue is implemented using the IntervalHeap from C5 library and the code rests in side Burrow.Extras package as I hope RabbitMQ will support priority in the future. If they do, it will be the good time to get rid of this implementation without modifying much code in core Burrow.NET core. - We've followed the same pattern mentioned in Doug Barth post. We let each physical queue decide how much unacked messages can be delivered to client using PrefetchSize so if you have n priority, the internal queue will have capacity of n x PrefetchSize. It will block RabbitMQ from sending more messages to client if the internal memory queue is full so the client has to process and ack messages from high priorty to low priority order and then get more messages.

- The physical queues have the same prefetch size and we would recommend to keep the prefetch size approximately as double as the processing velocity. if you set prefetsize too high compare to the processing velocity, the only drawback we can think of is that the internal memory queue will unnecessary contain so much messages, and that could lead to high memory consumption.

- In our production code, we need 5 priorities so there will be 5 threads created to consume messages from those physical queues. We got several issues with ThreadPool and TPL in our production environment as the window service was getting slow after a while running. We figured out that should never used ThreadPool for long running processing. I'll post about this topic later in Aug but for now if you have a long running window service and it takes time plus lot of IO operations to process your messages, use normal Thread instead of threads from pool. In order to do that, set following line at your BootStrapper:
Global.DefaultTaskCreationOptionsProvider = () => TaskCreationOptions.LongRunning;

- This implementation uses an exchange type Header to delivery messages with a priority value to the expected queue. Someone said the exchange Header has lower performance than the others but I'm think it's good because I do need the messages alive after server reboot, that means I need the messages to be persited to disk by setting the PersistentMode to true. That eventually reduces the maximum potential performance of RabbitMQ as mentioned in their docs. Anyway, it's not very difficult to switch to different exchange type such as Topic or Direct if we really need.

- So, here are the steps to setup and consume from the priority queues:

1/ Declare exchange and queues

ExchangeSetupData exchangeSetupData = new HeaderExchangeSetupData();
QueueSetupData queueSetupData = new PriorityQueueSetupData(3)
    SubscriptionName = "YourApplicationName", 

var connectonString = ConfigurationManager.ConnectionStrings["RabbitMQ"].ToString();
var environment = "PROD";

Func<string, string, IRouteFinder> factory = (environment, exchangeType) => new YourRouteFinderImplementation();
var setup = new PriorityQueuesRabbitSetup(factory, Global.DefaultWatcher, connectonString, environment);
setup.SetupExchangeAndQueueFor<YourMessageType>(exchangeSetupData, queueSetupData);

2/ Publish a priority message

using(var tunnel = RabbitTunnel.Factory.WithPrioritySupport().Create().WithPrioritySupport())
    tunnel.SetSerializer(new JsonSerializer());
    tunnel.SetRouteFinder(new YourRouteFinderImplementation());

    uint priority = 3;

    tunnel.Publish(new YourMessageType() , priority);     

3/ Consume from a logical queue with priority support asynchronously

const ushort maxPriorityLevel = 3;
Global.PreFetchSize = 64;
var tunnel = RabbitTunnel.Factory.WithPrioritySupport()

tunnel.SubscribeAsync<YourMessageType>("YourApplicationName", maxPriorityLevel, msg => {
    // Process message here

4/ Consume from a logical queue with priority support synchronously

MessageDeliverEventArgs arg = null;
var subscriber = tunnel.Subscribe<YourMessageType>("YourApplicationName", maxPriorityLevel, (msg, eventArgs) =>
    arg = eventArgs;
    // Process message here 

// Later on
subscriber.Ack(arg.ConsumerTag, arg.DeliveryTag);

Hope it can help someone's problem

Monday, July 23, 2012

DotNet Hotfix KB 2640103

I've a window service which has been running pretty well for a long time. Recently I have installed a .NET framework extended update and it probably caused this error:

Description: The process was terminated due to an internal error in the .NET Runtime at IP 6B484BC2 (6B300000) with exit code 80131506

This error message leads me to this support page: It seems to be so many people got the same problem and they've been trying to download this hot fix. I think Microsoft has not officially released it yet.

So if you met the same problem and need the hot fix, download it here, more info:

The problem seems to be fixed after I've installed it as my windows service has been running smoothly for 2 days without crashing. It used to randomly crash within a day.

Goood luck.

Thursday, June 14, 2012

Programmatically create RabbitMQ Exchange and Queue with Burrow.NET

- As mentioned in previous posts, IRouteFinder is an interface defined in Burrow.NET to help publishing the message to correct target. If you have many message types, potentially you will need many exchanges and queues. And if it's true, setup everything for different environments or different machines is not an interesting task. So with Burrow.NET we can programmatically create these exchanges, queues and bind them very easy.

- What you need is an implementation of IRouteFinder which should define your exchange names, queues name based on message type.
class TestRouteFinder : IRouteFinder
    public string FindExchangeName<T>()
        return "Burrow.Exchange";

    public string FindRoutingKey<T>()
        return typeof(T).Name;

    public string FindQueueName<T>(string subscriptionName)
        return string.IsNullOrEmpty(subscriptionName)
            ? string.Format("Burrow.Queue.{0}", typeof(T).Name)
            : string.Format("Burrow.Queue.{0}.{1}", subscriptionName, typeof(T).Name);

- Once having the route finder implemented, write a few lines of code like below:
var exchangeSetupData = new ExchangeSetupData{ExchangeType = "direct"};   
var queueSetupData = new QueueSetupData {SubscriptionName = "BurrowTestApp", MessageTimeToLive = 100000};

Func<string, string, IRouteFinder> factory = (environment, exchangeType) => new TestRouteFinder();
var setup = new RabbitSetup(factory, Global.DefaultWatcher, ConfigurationManager.ConnectionStrings["RabbitMQ"].ToString(), "TEST");

setup.SetupExchangeAndQueueFor<YourMessageA>(exchangeSetupData, queueSetupData);
setup.SetupExchangeAndQueueFor<YourMessageB>(exchangeSetupData, queueSetupData);
setup.SetupExchangeAndQueueFor<YourMessageC>(exchangeSetupData, queueSetupData);

- Ofcourse, if you have a predefined exchange and queue names but want to have the same settings for different box you can use built-in ConstantRouteFinder:
Func<string, string, IRouteFinder> factory = (environment, exchangeType) => new ConstantRouteFinder("exchangeName", "queueName", "routingKey");

- These code can be run multi times even though the exchanges and queues have've been created before. In practice, I use a application config key/value to determine to run these setup or not. If there have been existing exchanges, queues with the same names but the setting details like ExchangeType, MessageTimeToLive, Durable are different, running the code against the same server will print out the warning messages that would tell you the settings are different. Please note that different exchange type will lead to unexpected results.

- For someone who like to write integration tests which need to create the exchanges, queues before a test and destroy everything after the test, you can use the Destroy method:
Func<string, string, IRouteFinder> factory = (x, y) => new TestRouteFinder();
var setup = new RabbitSetup(factory, Global.DefaultWatcher, ConfigurationManager.ConnectionStrings["RabbitMQ"].ToString(), "TEST");

setup.Destroy<YourMessageA>(exchangeSetupData, queueSetupData);
setup.Destroy<YourMessageB>(exchangeSetupData, queueSetupData);
setup.Destroy<YourMessageC>(exchangeSetupData, queueSetupData);

- This sample code is also available in Githup project


Wednesday, June 13, 2012

How I Mock Mongo Collection?

- I have been working with Mongo quite alot recently and writing my service classes using mongo without testing is such a pain in the neck. So today, I'll post about how I mock mongo collection for my unit testings.
- Given that I have a ReminderService like below:
public class ReminderService
    private readonly MongoCollection<User> _mongoCollection;
    private readonly INotificationService _notificationService;

    public ReminderService(MongoCollection<User> mongoCollection, INotificationService notificationService)
        _mongoCollection = mongoCollection;
        _notificationService = notificationService;

    public void Send()
        var notActivatedUsers = _mongoCollection.FindAs<User>(Query.EQ("activated", false)).ToList();
        notActivatedUsers.ForEach(user => _notificationService.Send(new ActivationReminder(user)));

- It has 1 method Send which needs to be unit tested. This method will find all users who have not their activated account and then send reminder emails to them. This service class is oviously designed for unit testing as it has 2 dependencies: a mongo collection and an interface of INotificationService. To mock the interface, it's straightforward , isn't it? So our job today is mocking this mongo collection.

- Before mocking a class like mongo collection, I often check whether it has virtual methods because the thing we need to mock here is the method FindAs. By pressing F12 on the MongoCollection class to go to assembly metadata, we can easily see that this method is virtual and return a MongoCursor.
// Summary:
//     Returns a cursor that can be used to find all documents in this collection
//     that match the query as TDocuments.
// Parameters:
//   query:
//     The query (usually a QueryDocument or constructed using the Query builder).
// Type parameters:
//   TDocument:
//     The type to deserialize the documents as.
// Returns:
//     A MongoDB.Driver.MongoCursor<TDocument>.
public virtual MongoCursor<TDocument> FindAs<TDocument>(IMongoQuery query);

- Hmm, it's getting more complicated here because this method does not return an enumerable or collection but a MongoCursor<TDocument> and that type inherits from IEnumerable<TDocument>. So it's telling me that when we call mongoCursor.ToList(), behind the scene, it will call GetEnumerator(). And certainly, checking class MongoCursor, I see that it's methods are all virtual so it's really open for unit testing.

- To wrap up, the idea will be creating a mock cursor to return expected items and let it be the result of method FindAs on our mock collection. There are other things to note here: mocking a class is not similar to mocking an interface since it has constructor. That means we have to provide the constructor parameters to create a mock object of MongoCollection class. The parameters here are MongoDatabase and MongoCollectionSettings<TDefaultDocument>:
// Summary:
//     Creates a new instance of MongoCollection. Normally you would call one of
//     the indexers or GetCollection methods of MongoDatabase instead.
// Parameters:
//   database:
//     The database that contains this collection.
//   settings:
//     The settings to use to access this collection.
public MongoCollection(MongoDatabase database, MongoCollectionSettings<TDefaultDocument> settings);

- To have a MongoDatabase we need MongoServer and MongoDatabaseSettings and so on. So honestly it's not really simple. I don't want my unit test to talk to real mongo server while running so I must ensure everything runs in memory. I had to create a TestHelper class for these thing, ofcourse for reuse reason:
public static class MongoTestHelpers
    private static readonly MongoServerSettings ServerSettings;
    private static readonly MongoServer Server;
    private static readonly MongoDatabaseSettings DatabaseSettings;
    private static readonly MongoDatabase Database;

    static MongoTestHelpers()
        ServerSettings = new MongoServerSettings
            Servers = new List<MongoServerAddress>
                new MongoServerAddress("unittest")
        Server = new MongoServer(ServerSettings);
        DatabaseSettings = new MongoDatabaseSettings("databaseName", new MongoCredentials("", ""), GuidRepresentation.Standard, SafeMode.True, true);
        Database = new MongoDatabase(Server, DatabaseSettings);

    /// <summary>
    /// Creates a mock of mongo collection
    /// </summary>
    /// <typeparam name="T"></typeparam>
    /// <returns></returns>
    /// <remarks></remarks>
    public static MongoCollection<T> CreateMockCollection<T>()
        var collectionSetting = new MongoCollectionSettings<T>(Database, typeof(T).Name);
        var m = Substitute.For<MongoCollection<T>>(Database, collectionSetting);
        return m;

    public static MongoCollection<T> ReturnsCollection<T>(this MongoCollection<T> collection, IEnumerable<T> enumerable)
        var cursor = Substitute.For<MongoCursor<T>>(collection, Substitute.For<IMongoQuery>());
        cursor.When(x => x.GetEnumerator())
              .Do(callInfo => enumerable.GetEnumerator().Reset());// Reset Enumerator, incase the method is called multiple times


        // You properly need to setup more methods of cursor here
        return collection;

- There are some interesting points here:
  1. I'm using NSubstitute, hope you understand the syntax. I think the syntax is similar to other frameworks like Moq so it's not a problem if you use different library.
  2. The code will never talk to real database as I used "unittest" as server address.
  3. Method ReturnsCollection is an extension method of MongoCollection<T>. We'll use it to specify the items we simulate the result from mongo query. In practice, we might use fluent syntax on mongo cursor object such as SetSortOrder, SetLimit, etc so we definitely don't want these things alter our result. That's why you see there're alot of lines to mock the result for those methods to return the same cursor.

- So with this helper, the unit test will look like this:
public class ReminderServiceTests
    public void Send_should_send_notification_to_unactivated_users()
        // Arrange
        var mockCollection = MongoTestHelpers.CreateMockCollection<User>();
        mockCollection.ReturnsCollection(new List<User>
            new User {IsActivated = false}, 
            new User {IsActivated = false}
        var notificationService = Substitute.For<INotificationService>();
        var service = new ReminderService(mockCollection, notificationService);

        // Action

        // Assert

- Well, The purpose of this test is guaranteeing that the notification service will send reminders to all unactivated users from database. Because I'm using mongo so the final thing we want to mock is the result set returned by Mongo, we're not testing mongo query here. Hope this help.


Wednesday, June 6, 2012

Save a little time when writing unit test using VS.NET Macro

- As a unit testing passioner, I write a lot of unit tests for my code. But repeating adding test attributes to classes and methods, adding resharper ignore InconsistentNaming statement or adding AAA pattern are killing me. So I thought of making some awesome macros to save my time.
- Basically, when I add a test class, the file will originaly be like this:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace MyProject
    class NewTestClass

- And I want it to be like this after a few key presses:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Microsoft.VisualStudio.TestTools.UnitTesting;

// ReSharper disable InconsistentNaming
namespace MyProject
    public class NewTestClass
        public void Should_be_implemented_NOW()
            // Arrange

            // Action

            // Assert

// ReSharper restore InconsistentNaming

- This is archivable with following macros:
Sub CreateTestClassAttribute()
    DTE.ActiveDocument.Selection.Text = "public "
    DTE.ActiveDocument.Selection.Text = "[TestClass]"
    DTE.ActiveDocument.Selection.LineUp(False, 3)
    DTE.ActiveDocument.Selection.StartOfLine(vsStartOfLineOptions.vsStartOfLineOptionsFirstText, True)
    DTE.ActiveDocument.Selection.Text = "using Microsoft.VisualStudio.TestTools.UnitTesting;"

    DTE.ActiveDocument.Selection.Text = "// ReSharper disable InconsistentNaming"
    DTE.ActiveDocument.Selection.LineDown(False, 8)
    DTE.ActiveDocument.Selection.Text = "// ReSharper restore InconsistentNaming"
    DTE.ActiveDocument.Selection.LineUp(False, 3)

End Sub
Public Sub CreateTestMethod()
    DTE.ActiveDocument.Selection.Text = "[TestMethod]"
    DTE.ActiveDocument.Selection.Text = "public void Should_be_implemented_now()"
    DTE.ActiveDocument.Selection.Text = "{"

    DTE.ActiveDocument.Selection.Text = "// Arrange"

    DTE.ActiveDocument.Selection.Text = "// Action"

    DTE.ActiveDocument.Selection.Text = "// Assert"
    DTE.ActiveDocument.Selection.LineUp(False, 6)
End Sub

- Well, the steps are simple:
  1. In your VS.NET click Tool -> Macros -> Macro Explorer and paste these macros to the RecordingModule.
  2. Choose Tool again -> Options -> Keyboard. Type in Macros, choose these 2 macros and bind your favorite hot key for it like picture below:

- I choose Ctrl F - Ctrl C to make a test class with first test method and Ctrl F - Ctrl M to add another Test method.
- Hehe, It's DRY principle in unit testing. I has been using different Unit testing frameworks like NUnit, MBUnit, etc but in this post, I used VS.NET test framework. You properly need to alter the macros a little bit to fit your needs. Now enjoy your tests. Cheers

Sunday, June 3, 2012

Things you can easily change in Burrow.NET

- As I said in other posts, Burrow.NET aims to generic needs of any .NET projects that use RabbitMQ. I believe the built-in code has covered most of your requirements for common usages. However there are some things you can easily change without writing any new code or at least just a few lines of code. So in this post, I'll introduce about things you can make to suit your needs when working with RabbitMQ using Burrow.NET.

1/ Global.DefaultConsumerBatchSize

- The default value of this static property is 256. When you use ITunnel object to asynchronously subscribe to a queue, Burrow.NET use a internal Semaphore object when creating asynchronous Task to handle the messages from the queue. The amount of Tasks it create will depend on this value since it's used as the initialize value for the internal Semaphore. In the mean time, this value is also used to call IModel.BasicQos, which eventually sets the maximum amount of messages stay on the Unacknowledged list when they are consumed. That's the fetchsize value mentioned in RabbitMQ document and they advise to keep this number as high as double to the acknowledge speed. Therefore, you potentially should adjust this number to fit your need. Moreover, the amount of Tasks and eventualy Threads Burrow.NET create when consumming messages is in proportion to this value and I personally think creating more threads does not make the application run faster. I'll keep monitoring the way it works in my projects so I could split the value for the queue fetch size and the number of concurrent Tasks if it's neccessary but I think so far so good.

2/ Global.DefaultPersistentMode

- This value is equal to true by default and is used when creating the ITunnel object. It's eventually used when you publish messages. If the persistent mode is true, the message will be written to disk instead of being kept in memory. So if there is any problem with the server and you have the machine reboot, your messages will be safe. Otherwise, they will be removed when server reboots. However, keeping messages in memory could improve the performance. The DefaultPersistentMode is used to create ITunnel object in TunnelFactory, but you can change it anytime as the ITunnel has a method to do that.

3/ Global.DefaultErrorExchangeName and Global.DefaultErrorQueueName

- The built-in error handler will publish any unhandled exception wrapped in an object and publish to these exchange/queue so you can modify this value to route the error messages to their new home. The default value for the exchange is "Burrow.Exchange.Error" and "Burrow.Queue.Error" for the queue.

4/ Global.DefaultSerializer

- This property hold the default value of an JavaScriptSerializer which is quite slow in performance and cannot handle serialization very well. The reason to have this is to reduce the dependencies to 3rd libraries so you can just use Burrow.dll without Burrow.Extras.dll. However, I know everyone likes Json.NET include me so there is another implementation of ISerializer locates in Burrow.Extras package using Json.NET. It's not the fastest .NET serializer but fast enough for common usages. So I strongly recommend using this one in your project. Again, this value is used to create ITunnel and the ITunnel object will have a method to change that anytime.

Global.DefaultSerializer = new JsonSerializer();

var tunnel = RabbitTunnel.Factory.Create();
tunnel.SetSerializer(new JsonSerializer());

5/ Global.DefaultWatcher

- This one is used to write debug messages from Burrow.NET. The default value is ConsoleWatcher which writes messages to console so if you're writing a console app, you will easily see those messages. You properly want to change it to anything else such as Log4Net. To do that, simply implement IRabbitWatcher using Log4net and change the DefaultWatcher when app start.

Global.DefaultWatcher = new Log4NetRabbitWatcher();

6/ Global.DefaultCorrelationIdGenerator

- This one is used to generate the CorrelationId when you publish a message. The default implementation will generate a Guid and I think it's pretty enough for an Id. You don't have to change that but just do it if you have time and good reason

7/ Global.DefaultTypeNameSerializer

- Like the CorrelationId, this one is used to generate a string to determine the message when you publish messsags to RabbitMQ. I can think of a case we might need this value is when you just publish any type of message to the queue. When consuming the messages, base on this value you can know the type and deserialie the object. But honestly I've never tried this, neither used this value so if you don't need it in your project, leave it.

- In conclusion, I put these things in the Global class just to make it's easier to change the behavior of the ITunnel without touching to the TunnelFactory or calling methods on ITunnel object. Things like DefaultWatcher, DefaultTypeNameSerializer and DefaultCorrelationIdGenerator are the bare minimum needs for Burrow.NET and you can simply use the library without worrying about those values. Again, the only thing I suggest to take care of is the DefaultSerializer, just change it to JsonSerializer for better performance.


Custom Burrow.NET TunnelFactory & RabbitMQ

I/ Default TunnelFactory

- When using Burrow.NET, one of the important thing to create is a Tunnel by using TunnelFactory:

var tunnel = RabbitTunnel.Factory.Create();

- The Factory static property of RabbitTunnel keeps a default instance of TunnelFactory class. The TunnelFactory has 4 overloads to create the ITunnel object:

public virtual ITunnel Create();
public virtual ITunnel Create(string connectionString);
public virtual ITunnel Create(string connectionString, IRabbitWatcher watcher):
public virtual ITunnel Create(string hostName, string virtualHost, string username, string password, IRabbitWatcher watcher):

- They're all virtual so if you want to implement a custom ITunnel, you have things to override. Personally, I don't think of any cases to make a custom ITunnel but just in case you want to change some thing . These 4 methods are quite straight forward and I think the first one is the simplest one to use since it'll take the RabbitMQ connection string from app.config and create an built-in ITunnel object with built-in dependencies.

- I intentionally don't create an ITunnelFactory but only this base class. The reason is I tend to keep only 1 type of TunnelFactory in use in the whole AppDomain. I mean if you create a CustomTunnelFactory which inherits from class TunnelFactory, you just need to initialize an instance of your class somewhere and this action will automatcally set RabbitTunnel.Factory to this new factory. That's the only way to change the type of RabbitTunnel.Factory since it does not have public setter. Well you still can create any static classes that have similar methods to create the ITunnel if you want but in my opinion, creating tunnel objects from RabbitTunnel.Factory looks better.

II/ Implement a custom TunnelFactory and make it as default

- Let's say you want to replace default DefaultRouteFinder of Burrow.NET and set it your one as the route finder whenever you create an ITunnel. There are at least 2 ways to do that. The first way is creating a custom Factory and override the last method of 4 methods above since that built-in method always use DefaultRouteFinder as RouteFinder. The second way is using DependencyInjectionTunnelFactory. I'll talk about the 2nd way later in this post. There is another simple method to change the RouteFinder of an ITunnel, that is using the method SetRouteFinder of ITunnel but in this topic, we're talking about make it as default RouteFinder.

- This is how to implement the first method:

public class CustomTunnelFactory : TunnelFactory
    public override ITunnel Create(string hostName, string virtualHost, string username, string password, IRabbitWatcher watcher)
        var rabbitWatcher = watcher ?? Global.DefaultWatcher;
            var connectionFactory = new RabbitMQ.Client.ConnectionFactory
                                            HostName = hostName,
                                            VirtualHost = virtualHost,
                                            UserName = username,
                                            Password = password

            var durableConnection = new DurableConnection(new DefaultRetryPolicy(), rabbitWatcher, connectionFactory);
            var errorHandler = new ConsumerErrorHandler(connectionFactory, Global.DefaultSerializer, rabbitWatcher);
            var msgHandlerFactory = new DefaultMessageHandlerFactory(errorHandler, rabbitWatcher);
            var consumerManager = new ConsumerManager(rabbitWatcher, msgHandlerFactory, Global.DefaultSerializer, Global.DefaultConsumerBatchSize);

            return new RabbitTunnel(consumerManager,
                                    new CustomRouteFinder(), 

- And here is how to register this as default and use it:

// Put this line somewhere when app start:
new CustomTunnelFactory();

// Create your tunnel:

// Hmmm, I can't stop you doing this:
(new CustomTunnelFactory()).Create(); // But please, it's such an ugly code

- This first way could possibly require you to read the the original TunnelFactory source code to know how to replace the route finder. In fact the override method is exactly the same as the base one except the CustomRouteFinder thing. I personally don't like this way since duplication is always bad. That's why we have DependencyInjectionTunnelFactory.

III/ Dependency Injection Tunnel Factory

- Dependecy Injection is my favorite topic. I like to use libraries that allow me to change behavior easily and I tend to do the same thing in my code. The default RabbitTunnel class has some dependencies. As it's constructor requires following things:

  • IConsumerManager
  • IRabbitWatcher
  • IRouteFinder
  • IDurableConnection
  • ISerializer
  • ICorrelationIdGenerator

- Again, these interfaces have default implementations and I cannot think of any cases that have to change the behavior, but I would not close the door to change it. The potential behavior you possibly want to change could be the way this library handle error which is implemented in ConsumerErrorHandler class. There is a long chunk of dependencies to get there so properly using DependencyInjectionTunnelFactory is the easiest way to change the behavior without spending time reading the whole source code. The way DependencyInjectionTunnelFactory works is that it will try to resolve the object it requires. If it cannot find any registed dependencies, it'll use the default one.

- To use this factory, you have to implement IBurrowResolver interface. Basically, it exposes methods like some popular IOC library like Autofac or StructureMap. So if you use Autofac in your project, simply imlement IBurrowResolver and wrap the Autofac's IContainer inside, delegate all methods of IBurrowResolver to IContainer.

- After have the IBurrowResolver implemented, here is how to use it:

RabbitTunnel.Factory.RegisterResolver(new AutofacBurrowResolver());
var tunnel = RabbitTunnel.Factory.Create();

- That's it. Behind the scene, this extension method does exactly what I say above:

namespace Burrow.Extras
    public static class TunnelFactoryExtensions
        /// <summary>
        /// Call this method to register a dependency resolver and set default TunnelFactory to DependencyInjectionTunnelFactory
        /// </summary>
        /// <param name="factory"></param>
        /// <param name="burrowResolver"></param>
        public static void RegisterResolver(this TunnelFactory factory, IBurrowResolver burrowResolver)
            new DependencyInjectionTunnelFactory(burrowResolver);           

- These things are located in Burrow.Extras so you propably have to grab that nuget package to use it. Obviously, you have to register any dependencies before creating any ITunnel. For instance, if you're using Autofac and you want to change the type of RabbitWatcher when creating ITunnel you have to do this first:

var builder = new ContainerBuilder();

- And it's pretty similar for other IOC libraries. I use the above RabbitWatcher replacement thing as an example because there is definitely a simpler way to replace the default watcher type which is setting it to Global static class. Okey, It's enough for this topic. I would love to hear any suggestions & comments from you to make this better.


Thursday, May 31, 2012

RabbitMQ Exchanges & Queues naming convention with Burrow.NET

- If you just need only 1 exchange and 1 queue, naming convention should not be a big concern. Even that, you may need exchanges and queues for different environments. As in my projects, I often have 3 environments: DEV, UAT and PROD and I've been using following conventions for exchanges and queues:

Exchanges : ProjectName.E.<ExchangeType>.<Environment>.MessageTypeName
Queue        : ProjectName.Q.<Environment>.<ConsumerName>.MessageTypeName

- The consumer name in the Queue convention is optional. It makes sence to have different consumer names for different queues of the same message when Fanout exchange is used. Because at that case, we'll have many clients interested in the same message. We cannot let these clients subcribe to the same queue because only 1 of them will get the message while they all want the same thing. Fanout exchange in that sittuation is a best choice since it will deliver the same copy of the message you publish to all queues bind to it.

- In below example, we'll use exchange type Direct and we won't need ConsumerName in the queue name. Then a single message type will have 1 exchange and 1 queue bind to it for every environment. As a result, with message type "RequestMessage" we'll have

  • ProjectName.E.Direct.DEV.RequestMessage
  • ProjectName.E.Direct.UAT.RequestMessage
  • ProjectName.E.Direct.PROD.RequestMessage

  • ProjectName.Q.DEV.RequestMessage
  • ProjectName.Q.UAT.RequestMessage
  • ProjectName.Q.PROD.RequestMessage

- Creating queues and exchanges for every single message classes is not a good way as we will polute the server with a long list of queues and exchanges. Indeed, we realized that way as a problem and we've got a few better options. We can use 1 exchange for all those message types and bind different queues to the same exchange by different routing keys. So if we publish a message type RequestA to the exchange with routing key "RequestA", the message will go the queue "ProjectName.PROD.Q.RequestA" for instance. In our project, we had a need to use Fanout exchange as I mentioned above so we created a wrapper class like below:

public class MessageWrapper
    public object MessageObject {get; set;}

- So we'll have only 1 queue for each client. The message will be deserialized perfectly with the real object as the one we publish so the client can base on the type of MessageObject and determine what to do.

- Back to our story, with this convention in mind, what we have to do is implementing the IRouteFinder:

public class MyCustomRouteFinder : IRouteFinder
    private readonly string _environment;
    public MyCustomRouteFinder(string environment)
        if (string.IsNullOrEmpty(environment))
            throw new ArgumentNullException(environment);
        _environment = environment;

    public string FindExchangeName<T>()
        return string.Format("ProjectName.E.{0}.{1}.{2}", GetExchangeTypeFor<T>().ToUpper(), _environment, typeof(T).Name);

    public string FindRoutingKey<T>()
        return typeof(T).Name;

    public string FindQueueName<T>(string subscriptionName)
        if (GetExchangeTypeFor<T>() == ExchangeType.Fanout && string.IsNullOrEmpty(subscriptionName))
            throw new ArgumentException("subscriptionName");

        var subscription = GetSubscriptionTextFor<T>(subscriptionName);
        return !string.IsNullOrEmpty(subscription)
               ? string.Format("ProjectName.Q.{0}.{1}.{2}", _environment, subscription, typeof(T).Name)
               : string.Format("ProjectName.Q.{0}.{1}", _environment, typeof(T).Name);

    protected virtual string GetExchangeTypeFor<T>()
        var messageType = typeof(T);
        var messageTypesNeedDirectExchange = new List<Type> { typeof(MessageA), 
                                                              typeof(MessageB) };

        return messageTypesNeedDirectExchange.Contains(messageType)
             ? ExchangeType.Direct
             : ExchangeType.Fanout;

    protected virtual string GetSubscriptionTextFor<T>(string subscriptionName)
        var typesDontNeedSubscriptionName = new List<Type> { typeof(MessageA), 
                                                             typeof(MessageB) };

        return typesDontNeedSubscriptionName.Contains(typeof(T)) 
             ? null 
             : subscriptionName;

- Here is how to use it:

// Init
var environment = "DEV";
var routeFinder = new MyCustomRouteFinder(environment);
Global.DefaultSerializer = new JsonSerializer();

// Setup exchange and queues programatically
Func<string, string, IRouteFinder> routeFinderFactory = (environment, exchangeType) => routeFinder;
var setup = new RabbitSetup(routeFinderFactory, 
setup.SetupExchangeAndQueueFor<MessageA>(new ExchangeSetupData(), new QueueSetupData());
setup.SetupExchangeAndQueueFor<MessageB>(new ExchangeSetupData(), new QueueSetupData());

// Publish message 
using(var tunnel = RabbitTunnel.Factory.Create())
    tunnel.Publish<MessageA>(new MessageA());
    tunnel.Publish<MessageB>(new MessageB());

- And I swear by the moon and the stars in the sky that the messages will go to correct queues. In my last post, I'd mentioned the ConstantRouteFinder which can be used for a predefined exchange name, queue. However, in any cases, I think having a convention is better eventhough we have only 1 queue and exchange.

- As a best practice, we should not share the development environment with PROD or UAT environment so the "DEV" exchanges and queues should not be created on the same server as exchanges and queues for "UAT" and "PROD". We should install rabbitmq server on personal machine for development, it won't take so long.

Happy messaging.

Wednesday, May 30, 2012

Messaging with RabbitMQ and Burrow.NET

- As a .NET developer, what I have known about queueing world is only MSMQ because I've rarely had chances to using queues in any projects. I've known RabbitMQ since my last 2 projects and I'm pretty impressed by it's reliability and stability. RabittMQ is very easy to install on Windows or Linux. I wrote a blog post "Install RabbitMQ on SliTaz Linux" which I described step by step to make a RabbitMQ server on such a very tiny SliTaz Linux distro. It's obviously not the first blog post about RabbitMQ in .NET but I would take this as a opportunity to introduce a part of my project: Burrow.NET.

- Burrow.NET is a wrapper library of RabbitMQ.Client for .NET. I started this library from EasyNETQ source code and improve the project to fit our usages. However, I have made this library as generic for any project as possible. Basically, Burrow.NET will abstract away the complexity about connection handling, error handling, logging, and recently from version 1.0.8, I've finished the work around solution for priority queues because RabbitMQ is not natively supporting it. In this post, I'll shortly introduce the very basic features of Burrow.NET and I'll keep other interesting topics for later posts.

I/ Use cases

1/ Let's say you have to build an async web service to serve a huge amount of clients. A normal basic WCF service host on IIS or Window service does not have capability to handle the big amount of requests. So RabbitMQ acts as a middle layer to queue the requests, you can launch as many instance of "processor application" to handle the requests as you need. So this way can easily scale in long term.

2/ You're building a workflow base system that the output of a sub-system will be the input for other system. On other words, your sub-systems need a proper way to asynchronous communicate with each other. Since these sub-systems are different AppDomain so WCF supposes to be a good choice as first sign. However, one like me will not like WCF as an option due to it's configuration complexity. And in this use case, WCF seems not to be a proper solution as we may need scaling for a part or some parts of the system.

- In my personal opinion, whenever you need asynchronous communication between applications, queueing is the best and think about RabbitMQ :D. Well I don't say RabbitMQ is the best. There are alot of mature solutions before RabbitMQ but they came with cost. That's one of the reason why we have the awesome RabbitMQ software and I personaly think RabbitMQ is the best choice, it's fast, it's fashionable, it's free and it works :D.

II/ Burrow.NET

- As the above introduction, this code is a wrapper of .NET RabbitMQ.Client. If you don't want to use wrapper, simply grab RabbitMQ.Client.dll and use it if your need is as simple as publishing a message to a known exchange, queues. However, if you want something consistance like the naming conventions for the whole soultion since you may need alot of queues for the system; or if you expect your application auto reconnect after a connection problem, etc then Burrow.NET is a good candidate.

- I've been using it in my real project which is a big system with different applications comunicate via RabbitMQ channels, I had to make the code as efficient as possible such as establishing only 1 connection for each AppDomain to save the "socket descriptors", open only neccesary RabbitMQ channels to save the resources and consume the messages in the queue as fast as possible to keep the server's memory stay low.

Alright, enough promoting. Here is the main topic:

III/ Example

- Asume that you have to build a web service in user case 1. Your web service will not handle the messages immediately but will put the requests to a queue. There will be 1 or many instances of window services which subscribe to that queue and process them. It's not so complicated, isn't it. Firstly grab the RabbitMQ lib either from NUGET or from GITHUB.

- In other to push messages to RabbitMQ queue, we're gonna need exchange, queue and binding. I really hope you already known about these things. When you publish a message, you actually send it to the RabbitMQ exchange. And depend on the exchange type, also depend on the queue and binding between that queue and the exchange, your messages will be delivered to the proper queue. RabbitMQ has magic stuff to route your messages to the expected target and believe me, you will love that feature. So let's say we will use following exchange and queue:

exchange: RequestExchange
queue : RequestQueue

- The above exchange type will be "direct" and the "RequestQueue" will bind to it with a routing key such as "RequestMessage". That means if you publish a request object to that exchange with a routing key "RequestMessage", it will eventually go to queue "RequestQueue". There are several exchange types, each one will suit different needs and I really recommend you to read about it here:

- Since we use the above names which are not any convention at all, we'll use ConstantRouteFinder in Burrow.NET.

Here is the code to publish a message:
var routeFinder = new ConstantRouteFinder("RequestExchange", "RequestQueue", "RequestMessage");
var tunnel = RabbitTunnel.Factory.Create();
tunnel.SetSerializer(new JsonSerializer());
tunnel.Publish<RequestMessage>(new RequestMessage());
To subscribe from the above queue, it's very similar
var routeFinder = new ConstantRouteFinder("RequestExchange", "RequestQueue", "RequestMessage");
var tunnel = RabbitTunnel.Factory.Create();
tunnel.SetSerializer(new JsonSerializer());
tunnel.Subscribe<RequestMessage>("RabbitMQDemo", requestMsg => { 
    // Handle the requestMsg here
- Or if you want to asynchronous handle the messages in the queue using multi thread:
Global.DefaultConsumerBatchSize = 10; // Set the maximum 10 threads
tunnel.SubscribeAsync<RequestMessage>("RabbitMQDemo", requestMsg => { 
    // Handle the requestMsg here

- There something to note here, the code above create RabbitMQ connection using the connection string in your application config. You'll need something like following lines in your app.config:

    <add name="RabbitMQ" connectionString="host=localhost;username=guest;password=guest"/>

- If not, you can use other overloads of RabbitTunnel.Factory to provide the connection string. The "JsonSerializer" is the implementation of ISerializer using Json.NET which is one of the best json serializer for .NET for now. It's currently supporting serializing/deserializing complex object with inheritance hierachy so don't worry about it.

- Burrow.NET has a DefaultRouteFinder which will generate the queue name, exchange name following a default convention which I don't recommend to use. In fact, if you like this library, implement your own IRouteFinder and set that instance to the tunnel like the code above. So I'm saying that If you don't use ConstantRouteFinder as above, the code still work but it will create different exchange and queue following built-in convention. You can checkout the latest source code from GITHUB, there are 2 demo projects which demonstrate everything about the library, and 1 unit test project which reaches 99% test coverage ^^

- That's all you need for publishing and subscribing. Other stuff are already handled by the library and you can change those by implement serveral interfaces then inject to the tunnel using your own TunnelFactory. But that would be other topics, now it's time for me to go to bed :D


Sunday, May 20, 2012

Install RabbitMQ on SliTaz Linux

- I have played with RabbitMQ for a few months so I tried to install RabbitMQ on a Linux distro running on VMWare. I do this as setting up a development environment at home since I don't want to install so much stuff on my Win7 machine. And another reason, for fun with Linux things since It's been along time I didn't mess with Linux. Well I did some googling to find some tiny distros. I like simple things that just work and Ubuntu is so fancy for my purpose.

- SliTaz came out to be the highest rated tiny Linux so I gave it a try. It took me a whole day for trouble shooting problems when install RabbitMQ on this distro. There are quite a few problems such as Erlang version, OpenSSL for mochiweb, and daemon on SliTaz so I think it's worth to write down my experience incase someone likes to do something like this.

1/ The problems

  • RabbitMQ (current 2.8.2) requires Erlang R12B-3 ( However, mochiweb requires R13B01 which is not available for SliTaz. I could't find any exising packages for Erlang with that version so I had to build OTP version from source which I have never done before :D
  • There is not RabbitMQ package available for SliTaz, so I found out that we can convert from debian package and use within SliTaz.
  • Mochiweb requires OpenSSL so we not only need Erlang source but we also need OpenSSL source. If you check Erlang website, you will see they've mentioned that.
  • I want the RabbitMQ server autostart when system starts but the daemon script for Debian which is installed when you install rabbitmq package does not work. So I have to use "screen" to work around.
  • And many many more :D but I promish it will be less pain if you follow these below steps

2/ Installation steps:

- Assume that you have install SliTaz successfully (If using VMWare, try use IDE instead of SCSI, I got the problem finding HDD with that) and login as root, the first thing to do is enable SSH on the SliTaz server because it's alot easier using "putty" than "sakura". You'll need to copy and paste something and doing this with putty is damn simple. The SSH server in SliTaz is "dropbear" and it will not start automatically when system boot so you have to put that to SliTaz "daemon" script. In SliTaz, daemon scripts locate at /etc/init.d/ folder and the primary script is /etc/rcS.conf. Simply put "dropbear" to the end of RUN_DAEMONS to make it autostart. Save the file and reboot.

- When system reboot, SSH to SliTaz server and executing following commands to install make, perl, gcc and other tools required to compile erlang.
tazpkg get-install make
tazpkg get-install perl
tazpkg get-install gcc
tazpkg get-install slitaz-toolchain --forced

- Then we'll get erlang and openssl source, at this time they are latest version.
root@slitaz:/# mkdir /home/temp
root@slitaz:/# mkdir /home/temp/erlang
root@slitaz:/# mkdir /home/temp/openssl

root@slitaz:/# cd /home/temp/openssl
root@slitaz:/home/temp/openssl/# wget
root@slitaz:/home/temp/openssl/# gunzip -c openssl-1.0.1c.tar.gz | tar xf -

root@slitaz:/# cd /home/temp/erlang
root@slitaz:/home/temp/erlang/# wget
root@slitaz:/home/temp/erlang/# gunzip -c otp_src_R15B01.tar.gz | tar xf -
- Now we will config erlang with ssl tag (It's a requirement to use mochiweb)
root@slitaz:/home/temp/erlang/# ./configure --with-ssl=/home/temp/openssl/openssl-1.0.1c

- If everything is okey, you'll see this at the end:
config.status: creating Makefile
config.status: creating ../make/i686-pc-linux-gnu/
config.status: creating ../make/i686-pc-linux-gnu/
config.status: creating ../lib/ic/c_src/i686-pc-linux-gnu/Makefile
config.status: creating ../lib/os_mon/c_src/i686-pc-linux-gnu/Makefile
config.status: creating ../lib/crypto/c_src/i686-pc-linux-gnu/Makefile
config.status: creating ../lib/orber/c_src/i686-pc-linux-gnu/Makefile
config.status: creating ../lib/runtime_tools/c_src/i686-pc-linux-gnu/Makefile
config.status: creating ../lib/tools/c_src/i686-pc-linux-gnu/Makefile
config.status: creating i686-pc-linux-gnu/config.h
config.status: creating include/internal/i686-pc-linux-gnu/ethread_header_config.h
config.status: creating include/i686-pc-linux-gnu/erl_int_sizes_config.h
**********************  APPLICATIONS DISABLED  **********************

jinterface     : No Java compiler found
odbc           : ODBC library - link check failed

**********************  APPLICATIONS INFORMATION  *******************

wx             : wxWidgets not found, wx will NOT be usable

**********************  DOCUMENTATION INFORMATION  ******************

documentation  :
                 fop is missing.
                 Using fakefop to generate placeholder PDF files.


- We will make and install Erlang:
root@slitaz:/home/temp/erlang/# make

root@slitaz:/home/temp/erlang/# make install

- Now, it's time to grab and install rabbitmq package
root@slitaz:/home/temp/erlang/# cd /home/temp
root@slitaz:/home/temp/# mkdir rabbitmq
root@slitaz:/home/temp/# cd rabbitmq

root@slitaz:/home/temp/rabbitmq/# wget
root@slitaz:/home/temp/rabbitmq/# tazpkg convert rabbitmq-server_2.8.2-1_all.deb
root@slitaz:/home/temp/rabbitmq/# tazpkg install rabbitmq-server-2.8.2-1.tazpkg

- We are almost done. Let's test things by checking rabbitmqctl
root@slitaz:/# rabbitmqctl
id: unknown user rabbitmq
sh: rabbitmqctl: unknown operand
su: unknown user rabbitmq

- Seem like the converted package didn't create user rabbitmq and group rabbitmq for the server. So we have to do it manually:
root@slitaz:/# addgroup rabbitmq
root@slitaz:/# adduser -S rabbitmq -G rabbitmq

root@slitaz:/# rabbitmqctl status
Status of node rabbit@slitaz ...
Error: unable to connect to node rabbit@slitaz: nodedown


nodes in question: [rabbit@slitaz]

hosts, their running nodes and ports:
- slitaz: [{rabbitmqctl22218,45733}]

current node details:
- node name: rabbitmqctl22218@slitaz
- home dir: /home/rabbitmq
- cookie hash: vPSQrFu2FNfj5/GsGmwn5A==


- Awesome, it works. Now it's time to start the server:
root@slitaz:/# rabbitmq-server start
/usr/lib/rabbitmq/bin/rabbitmq-server: line 70: can't create /var/lib/rabbitmq/mnesia/ Permission denied
Activating RabbitMQ plugins ...
ERROR: Could not create dir /var/lib/rabbitmq/mnesia/rabbit@slitaz-plugins-expand (eacces)

- Hmm, the user we've created doesn't have permission on that folder. We have to chown:
root@slitaz:/# chown -R rabbitmq:rabbitmq /var/lib/rabbitmq/mnesia

root@slitaz:/# rabbitmq-server start
Activating RabbitMQ plugins ...
0 plugins activated:

{"init terminating in do_boot",{{nocatch,{error,{cannot_log_to_file,"/var/log/rabbitmq/rabbit@slitaz.log",{error,eacces}}}},[{rabbit,ensure_working_log_handler,5,[]},{rabbit,ensure_working_log_handlers,0,[]},{rabbit,prepare,0,[]},{init,eval_script,8,[]},{init,do_boot,3,[]}]}}
init terminating in do_boot ()

- Hmm, we'll have to do the same thing for /var/log/rabbitmq
root@slitaz:/# chown -R rabbitmq:rabbitmq /var/log/rabbitmq

root@slitaz:/# rabbitmq-server start
Activating RabbitMQ plugins ...
0 plugins activated:

{"init terminating in do_boot",{{nocatch,{error,{cannot_log_to_file,"/var/log/rabbitmq/rabbit@slitaz.log",{error,eacces}}}},[{rabbit,ensure_working_log_handler,5,[]},{rabbit,ensure_working_log_handlers,0,[]},{rabbit,prepare,0,[]},{init,eval_script,8,[]},{init,do_boot,3,[]}]}}
init terminating in do_boot ()
root@slitaz:/# chown -R rabbitmq:rabbitmq /var/log/rabbitmq
root@slitaz:/# rabbitmq-server start
Activating RabbitMQ plugins ...
0 plugins activated:

+---+   +---+
|   |   |   |
|   |   |   |
|   |   |   |
|   +---+   +-------+
|                   |
| RabbitMQ  +---+   |
|           |   |   |
|   v2.8.2  +---+   |
|                   |
AMQP 0-9-1 / 0-9 / 0-8
Copyright (C) 2007-2012 VMware, Inc.
Licensed under the MPL.  See

node           : rabbit@slitaz
app descriptor : /usr/lib/rabbitmq/lib/rabbitmq_server-2.8.2/sbin/../ebin/
home dir       : /home/rabbitmq
config file(s) : (none)
cookie hash    : vPSQrFu2FNfj5/GsGmwn5A==
log            : /var/log/rabbitmq/rabbit@slitaz.log
sasl log       : /var/log/rabbitmq/rabbit@slitaz-sasl.log
database dir   : /var/lib/rabbitmq/mnesia/rabbit@slitaz
erlang version : 5.9.1

-- rabbit boot start
starting file handle cache server                                     ...done
starting worker pool                                                  ...done
starting database                                                     ...done
starting database sync                                                ...done
starting codec correctness check                                      ...done
-- external infrastructure ready
starting plugin registry                                              ...done
starting auth mechanism cr-demo                                       ...done
starting auth mechanism amqplain                                      ...done
starting auth mechanism plain                                         ...done
starting statistics event manager                                     ...done
starting logging server                                               ...done
starting exchange type direct                                         ...done
starting exchange type fanout                                         ...done
starting exchange type headers                                        ...done
starting exchange type topic                                          ...done
-- kernel ready
starting alarm handler                                                ...done
starting node monitor                                                 ...done
starting cluster delegate                                             ...done
starting guid generator                                               ...done
starting memory monitor                                               ...done
-- core initialized
starting empty DB check                                               ...done
starting exchange, queue and binding recovery                         ...done
starting mirror queue slave sup                                       ...done
starting adding mirrors to queues                                     ...done
-- message delivery logic ready
starting error log relay                                              ...done
starting networking                                                   ...done
starting direct client                                                ...done
starting notify cluster nodes                                         ...done

broker running

- Muahahaha, it's running without a problem except there are not any plugins available. Press CTRL-C and choose a to stop the server. Then enable the management plugin. Before that (this is optional), we can manually create a config file for rabbitmq: /etc/rabbitmq/rabbitmq.config. Hope you know how to use vi :D. Here i make it use 80% available guest memory and 5GB hard disk for data. You can find out these things on rabbitmq memory management website
    {mnesia, [{dump_log_write_threshold, 1000}]},
    {rabbit, [{tcp_listeners, [5672]}, {vm_memory_high_watermark, 0.8}, {disk_free_limit, 5368709120}]}

The dot at the end of the script is required :D
- To enable management plugin, simply execute
rabbitmq-plugins enable rabbitmq_management
- Finally, we have to let RabbitMQ server auto start when system boot. We'll use screen. First, download and install screen:
root@slitaz:/# tazpkg get-install screen

- And create the bash script to start rabbitmq in a screen session, i name it and put in /etc/init.d
export PATH
screen -dmS rabbitmq /usr/sbin/rabbitmq-server start

- Obviously, we have to chmod this file
root@slitaz:/# chmod +x /etc/init.d/

- And the last step, put it to /etc/init.d/
echo "Run rabbitmq startup script"

- Save and reboot, you have the awesome RabbitMQ server running on SliTaz. If you want to use it on internet, find dyndns service and do some port fowarding. The ports you need to foward are 5672 and 55672, default port for the server and mochiweb.