Running MassTransit within a Topshelf Windows Service

Published on Tuesday, 17 February 2015 by Russ Cam

Whenever I have a need to write a Windows Service, Topshelf is pretty much the first nuget package that I install to aid in the task. In a nutshell, Topshelf is a framework that makes writing Windows Services easier; you write a standard console application, add Topshelf to the mix and you now have an application that can be run as a console application whilst developing and debugging, and installed as a Windows Service when the need to deploy arises. It's a pretty frictionless experience, but there can be some a gotcha when using Topshelf in conjunction with MassTransit and Dependency Injection. I've experienced this a few times so thought I'd write it down here for next time!

Got the message?

I'm assuming that if you're reading this, you've heard of MassTransit and are very likely using it with Topshelf but in case you haven’t heard, MassTransit is a lightweight Message/Service Bus for .NET from the same guys that also wrote Topshelf. It is an open source service bus abstraction on top of messaging infrastructure that can be used in conjunction with MSMQ, RabbitMQ or Azure Service Bus (with plans for version 3 to support only RabbitMQ and Azure Service Bus). There are extensibility points to support more advanced features such as Sagas and the Routing Slip pattern with overall focus on being straightforward to use. Let’s see how MassTransit and Topshelf play together.

The Simple case

Following online documentation in configuring MassTransit (using RabbitMQ) with Topshelf, the simple way to configure the application is as follows:

using System.Text;
using MassTransit;
using MassTransit.NLogIntegration;
using Topshelf;

namespace MassTransitExample
{
    class Program
    {
        static void Main(string[] args)
        {
            HostFactory.Run(x =>
            {
                x.Service<IService>(s =>
                {
                    s.BeforeStartingService(sc => sc.RequestAdditionalTime(TimeSpan.FromSeconds(30)));
                    s.BeforeStoppingService(sc => sc.RequestAdditionalTime(TimeSpan.FromSeconds(30)));

                    s.ConstructUsing(() => new Service());

                    s.WhenStarted(service =>
                    {
                        service.Start();
                    });

                    s.WhenStopped(service =>
                    {
                        service.Stop();
                    });

                    s.WhenShutdown(service =>
                    {
                        service.Stop();
                    });
                });

                x.UseNLog();
                x.EnableShutdown();
                x.RunAsLocalSystem();
                x.StartAutomatically();
                x.SetServiceName("ServiceName");
                x.SetDisplayName("Service display name");
                x.SetDescription("Service description");
            });
        }
    }

    public class Service : IService
    {
        public void Start()
        {
            Bus.Initialize(c =>
            {
                c.UseRabbitMq();
                c.ReceiveFrom("rabbitmq://localhost/topshelf-service");

                // register consumers here

                c.UseNLog();
            });
        }

        public void Stop()
        {
            Bus.Shutdown();
        }
    }

    public interface IService
    {
        void Start();

        void Stop();
    }
}

Running as a Console App yields

Console output

and installing as a service, starting, stopping and uninstalling yields

Console output of starting and stoppping service

This all works great for the simple case, but for a more complex and composable application we may want to use a Dependency Injection container such as Castle Windsor. Let’s have a look at that.

The DI Case

Taking what we did in the simple case and reworking it to use Castle Windsor for Dependency Injection, we get the following:

using System;
using Castle.Core.Logging;
using Castle.MicroKernel.Registration;
using Castle.MicroKernel.SubSystems.Configuration;
using Castle.Windsor;
using Castle.Windsor.Installer;
using MassTransit;
using MassTransit.NLogIntegration;
using Topshelf;

namespace MassTransitExampleDI
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            HostFactory.Run(x =>
            {
                IWindsorContainer container = null;

                x.Service<IService>(s =>
                {
                    s.BeforeStartingService(sc => sc.RequestAdditionalTime(TimeSpan.FromSeconds(30)));
                    s.BeforeStoppingService(sc => sc.RequestAdditionalTime(TimeSpan.FromSeconds(30)));

                    s.ConstructUsing(() =>
                    {
                        // container should be initialized in here according to Chris Patterson
                        // https://groups.google.com/d/msg/masstransit-discuss/Pz7ttS7niGQ/A-K7MTK8aiUJ
                        container = new WindsorContainer().Install(FromAssembly.This());
                        return container.Resolve<IService>();
                    });

                    s.WhenStarted(service => { service.Start(); });

                    s.WhenStopped(service =>
                    {
                        service.Stop();

                        if (container != null)
                        {
                            container.Dispose();
                        }
                    });

                    s.WhenShutdown(service =>
                    {
                        service.Stop();

                        if (container != null)
                        {
                            container.Dispose();
                        }
                    });
                });

                x.UseNLog();
                x.EnableShutdown();
                x.RunAsLocalSystem();
                x.StartAutomatically();
                x.SetServiceName("ServiceName");
                x.SetDisplayName("Service display name");
                x.SetDescription("Service description");
            });
        }
    }

    public class ServicesInstaller : IWindsorInstaller
    {
        public void Install(IWindsorContainer container, IConfigurationStore store)
        {
            container.Register(
                Component.For<IService>().ImplementedBy<Service>(),
                Component.For<IServiceBus>().UsingFactoryMethod(kernel =>
                {
                    return ServiceBusFactory.New(c =>
                    {
                        c.UseRabbitMq();
                        c.ReceiveFrom("rabbitmq://localhost/topshelf-service");
                        c.Subscribe(x => x.LoadFrom(container));
                        c.UseNLog();
                    });
                }));
        }
    }

    public class Service : IService
    {
        private readonly IServiceBus _serviceBus;

        public Service(IServiceBus serviceBus)
        {
            _serviceBus = serviceBus;
        }

        public void Start()
        {
        }

        public void Stop()
        {
            _serviceBus.Dispose();
        }
    }

    public interface IService
    {
        void Start();

        void Stop();
    }
}

It's pretty similar to the non-DI version, except both the service and service bus are resolved by the Windsor container and the bus is already instantiated when it is passed to the service constructor. Running as a console application yields the same result as before so we won't bother with a screenshot for that, but when installing as a service, the following behaviour is exhibited

Console output of starting and stoppping service with Topshelf

Starting only half the time

The service installs successfully but fails to start every other time – consistently. It appears that ServiceBusFactory.New(() => {}) blocks the Start() method from returning "in good time" resulting in the Windows Service Manager interpreting the signal as a problem with starting the service. The end result is a message like this:

System.InvalidOperationException: Cannot start service ServiceName on com puter '.'. ---> System.ComponentModel.Win32Exception: The service did not respond to the start or control request in a timely fashion

--- End of inner exception stack trace --- at System.ServiceProcess.ServiceController.Start(String[] args) at Topshelf.Runtime.Windows.WindowsHostEnvironment.StartService(String serviceName, TimeSpan startTimeOut) at Topshelf.Hosts.StartHost.Run()

This seems to be a common problem so let’s look at some ideas for how we might go about solving the problem.

Lazy<T> boy

Windsor has the notion of lazily resolving components with an ILazyComponentLoader, meaning any service and component registered with Windsor can be resolved from the container as a Lazy<T>. Registering LazyOfTComponentLoader with the container using

container.Register(
   Component.For<ILazyComponentLoader>().ImplementedBy<LazyOfTComponentLoader>()
);

and then changing the service class to the following

public class Service : IService
{
    private readonly Lazy<IServiceBus> _lazyServiceBus;
    private IServiceBus _serviceBus;

    public Service(Lazy<IServiceBus> lazyServiceBus)
    {
        _lazyServiceBus = lazyServiceBus;
    }

    public void Start()
    {
        _serviceBus = _lazyServiceBus.Value;
    }

    public void Stop()
    {
        _serviceBus.Dispose();
    }
}

results in the service successfully starting everytime. It better resembles the non-DI version than the previous example as the service bus creation does not occur until the Value property is accessed on the Lazy<ServiceBus>. But what if you're not using Windsor or the container that you're using does not support lazy resolution?

Threads ahoy!

We can instantiate the service bus on a separate thread to the thread on which the Start() method is called. What may need to pass a reference to the container to resolve the service from. In the case of Windsor, we can pass the IKernel

public class Service : IService
{
    private readonly IKernel _kernel;
    private IServiceBus _serviceBus;
    private Thread _thread;

    public Service(IKernel kernel)
    {
        _kernel = kernel;
    }

    public void Start()
    {
        _thread = new Thread(InternalStart);
        _thread.Start();
    }

    private void InternalStart()
    {
        _serviceBus = _kernel.Resolve<IServiceBus>();
    }

    public void Stop()
    {
        _thread.Join();
        _serviceBus.Dispose();
    }
}

Using either Lazy<ServiceBus> or a separate thread mitigate the problem. Know any other/better ways?


Comments

comments powered by Disqus