Per Brage's Blog

const String ABOUT = "Somehow related to code";

Category Archives: Event Broker

Event Broker using Rx and SignalR (Part 4: Solving the Scenario)

The time has come to start implementing the scenario: the one I invented and then refactored from a science fiction novel into a simple online shop that just happens to sell computers and components. As promised earlier, by the time you are reading this post the full source will be available in my GitHub repository; just follow the link below.

Configuration

This post will wrap up the series with the fluent configuration for our brokers: Website, ComponentStock, ComputerStock and Procurement. Let’s begin with the website since that’s where everything starts.

Website

The website creates the event broker by specifying a publishingUri, which will register the event broker so subscribers can connect to it. We can also see how a local subscription is added here, which we will use for sending out confirmation mails within our ProductOrderedEventConsumer. Then we start ordering products and publish the events using the OrderProduct() method, which just randomly creates ProductOrderedEvents.


            using (var eventBroker = new EventBroker("http://localhost:53000/"))
            {
                eventBroker.Locally().Subscribe(new ProductOrderedEventConsumer());

                Console.WriteLine("Press any key to start ordering products");
                Console.ReadKey();

                for (var i = 0; i < 30; i++)
                {
                    eventBroker.Publish(OrderProduct());
                    Thread.Sleep(200);
                }

                Console.ReadKey();
            }
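The OrderProduct() method is not shown in the post. As a rough sketch of what "randomly creates ProductOrderedEvents" could look like, assuming the event carries the ProductName and ProductGroup properties used elsewhere in the series; the Shop class and the catalog entries are hypothetical and not taken from the repository:

```csharp
using System;

public class ProductOrderedEvent
{
    public string ProductName { get; set; }
    public string ProductGroup { get; set; }
}

public static class Shop
{
    // Hypothetical catalog; the real product list lives in the repository.
    private static readonly (string Name, string Group)[] Catalog =
    {
        ("Ultrabook 13", "Laptop"),
        ("Gaming Tower", "Computer"),
        ("Mechanical Keyboard", "Component"),
        ("Graphics Card", "Component"),
    };

    private static readonly Random Rng = new Random();

    // Picks a random catalog entry and wraps it in an event.
    public static ProductOrderedEvent OrderProduct()
    {
        var product = Catalog[Rng.Next(Catalog.Length)];
        return new ProductOrderedEvent
        {
            ProductName = product.Name,
            ProductGroup = product.Group
        };
    }
}

public static class Program
{
    public static void Main()
    {
        for (var i = 0; i < 3; i++)
        {
            var ordered = Shop.OrderProduct();
            Console.WriteLine(ordered.ProductGroup + ": " + ordered.ProductName);
        }
    }
}
```

Mixing product groups in the catalog matters here, because it is what gives the two stocks something to filter on.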

Computer and Component stock

Both our computer and component stock register themselves to allow remote subscribers, while they also remotely subscribe to ProductOrderedEvent (through an event consumer). Internally, but not shown here, the event consumer for the computer stock makes use of a specification to filter the incoming events, whereas the component stock uses lambdas to show the difference.


            using (var eventBroker = new EventBroker("http://localhost:53001/"))
            {
                eventBroker.ConnectionStatus += (s, ev) => 
                    Console.WriteLine("Component stock: " + (ev.Success ? "Connected!" : ev.ErrorMessage));
                eventBroker
                    .Locally()
                        .Subscribe<ProductShippedEvent>(x => 
                            Console.WriteLine(String.Format("{0} order packed and shipped", x.ProductName)))
                    .Remotely("http://localhost:53000/")
                        .Subscribe(new ProductOrderedEventConsumer(eventBroker));

                Console.ReadKey();
            }


    public class ProductOrderedEventConsumer : EventConsumer<ProductOrderedEvent>
    {
        private readonly IEventBroker _eventBroker;
        private readonly Random _random;

        public ProductOrderedEventConsumer(IEventBroker eventBroker)
        {
            _eventBroker = eventBroker;
            _random = new Random();

            RegisterSpecification(new ItemsInLaptopOrComputerProductGroupSpecification());
        }

        public override void Handle(ProductOrderedEvent @event)
        {
            _eventBroker.Publish(new ProductShippedEvent
                                     {
                                         ProductName = @event.ProductName
                                     });

            if (_random.Next(10) > 5)
                _eventBroker.Publish(new ProductOrderPointReachedEvent()
                                         {
                                             ProductName = @event.ProductName
                                         });
        }
    }

Procurement

The last of our configurations! We set up remote subscriptions to our stocks and start listening for events telling us that the order point was reached, so procurement can order new products to fill up our stocks. Here you can also see an example of dual remote subscriptions added through the fluent API.


            using (var eventBroker = new EventBroker())
            {
                eventBroker.ConnectionStatus += (s, ev) => 
                    Console.WriteLine("Procurement: " + (ev.Success ? "Connected!" 
                                                                          : ev.ErrorMessage));
                eventBroker.Remotely("http://localhost:53001/")
                                .Subscribe(new ProductOrderPointReachedEventConsumer())
                            .Remotely("http://localhost:53002/")
                                .Subscribe(new ProductOrderPointReachedEventConsumer());

                Console.ReadKey();
            }

Result

By running the solution you will see four console windows, which will display information when they receive and process an event. I added links to images for each console window as an example of how they would look after completing 30 product orders. But a better example of the result would be to run the source available in my repository.

Website console window
Component stock console window
Computer stock console window
Procurement console window

Links

Source
Full source at my GitHub repository

Navigation
Part 1: A Fluent API
Part 2: Implementation
Part 3: Event Consumers
Part 4: Solving the Scenario

Event Broker using Rx and SignalR (Part 3: Event Consumers)

If you go back and look at the result from the first post of the series, you will notice that during registration of subscriptions, we add filter predicates and assign actions to be executed as events are consumed. This isn’t really such a good idea, since we will end up having all our logic in our configuration and it will clutter the code among other bad things! So what can we do about it?

Event Consumers

The solution to our problem is to outsource both filtering and processing into event consumers. For those of you familiar with CQRS, just think of a command handler that implements IHandle<T>, but for an event. With event consumers we end up with classes that each handle one particular event. We gain simplicity and separation and, if the classes are named well, end up with more declarative code.

Let’s start with an interface describing an event consumer, and also an abstract class implementation that will provide a register method, which will help us with registering a Func<TEvent, Boolean> (i.e. a filter). The func is added to our multicast delegate property, which will allow us to add several filters on an event consumer.

    public interface IEventConsumer<in TEvent> : IHandle<TEvent>
    {
        Func<TEvent, Boolean> Filters { get; }
    }
    public abstract class EventConsumer<TEvent> : IEventConsumer<TEvent>
        where TEvent : IEvent
    {
        public Func<TEvent, Boolean> Filters { get; private set; }
 
        protected void Register(Func<TEvent, Boolean> filter)
        {
            if (Filters == null)
                Filters = filter;
            else
                Filters += filter;
        }
 
        public abstract void Handle(TEvent message);
    }
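One detail to keep in mind with a multicast Func: invoking it directly runs every delegate in the list but returns only the result of the last one. The post does not show how the broker evaluates Filters, so the following is only a sketch of one way to require that every registered filter accepts an event, using GetInvocationList. The FilterEvaluator helper is a hypothetical name and not part of the broker's source.

```csharp
using System;
using System.Linq;

public static class FilterEvaluator
{
    // Returns true only when every delegate combined into 'filters' accepts the event.
    // A null 'filters' (no filter registered) accepts everything.
    public static bool PassesAll<TEvent>(Func<TEvent, bool> filters, TEvent @event)
    {
        if (filters == null)
            return true;

        return filters.GetInvocationList()
                      .Cast<Func<TEvent, bool>>()
                      .All(filter => filter(@event));
    }
}

public static class Program
{
    public static void Main()
    {
        Func<int, bool> filters = x => x > 0;
        filters += x => x % 2 == 0;
        filters += x => x < 100;

        // Direct invocation only reports the last filter (x < 100).
        Console.WriteLine(filters(3));                            // True
        // Evaluating each filter individually also applies x % 2 == 0.
        Console.WriteLine(FilterEvaluator.PassesAll(filters, 3)); // False
        Console.WriteLine(FilterEvaluator.PassesAll(filters, 4)); // True
    }
}
```

With a single registered filter, as in the consumers in this series, direct invocation and PassesAll give the same answer; the difference only shows up once several filters are combined.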

Let’s implement the ProductOrderedEventConsumer that will be used at the component stock in our scenario. When the component stock receives a ProductOrderedEvent, we will have to make sure it won’t act upon events for laptops and computers, as the computer stock will handle those two types of products. To accomplish this we just register a filter in the constructor to exclude all events for products in the laptop or computer product group. The handle method will now only process events matching our registered filter.

Speaking of the Handle method, it won’t do much more than publish a ProductShippedEvent, and at random, the ProductOrderPointReachedEvent which will simulate that our inventory is getting low on a particular product.

    public class ProductOrderedEventConsumer : EventConsumer<ProductOrderedEvent>
    {
        private readonly IEventBroker _eventBroker;
        private readonly Random _random;

        public ProductOrderedEventConsumer(IEventBroker eventBroker)
        {
            _eventBroker = eventBroker;
            _random = new Random();

            Register(x => x.ProductGroup != "Laptop" && x.ProductGroup != "Computer");
        }

        public override void Handle(ProductOrderedEvent @event)
        {
            _eventBroker.Publish(new ProductShippedEvent
            {
                ProductName = @event.ProductName
            });
            
            if (_random.Next(10) > 5)
                _eventBroker.Publish(new ProductOrderPointReachedEvent()
                {
                    ProductName = @event.ProductName
                });
        }
    }

Using Specification pattern to apply filtering

Nitpicker corner: Yes, specifications can be a bit cumbersome and yes, they do add a lot of code that could at times be written with a few simple lambdas. But remember the ubiquitous language? Specifications allow us to communicate about rules and predicates with everyone involved in developing our software! Being able to talk about the ‘Items in laptop or computer product group’ specification instead of the ‘x rocket x dot product group equals laptops or x dot product group equals computers’ predicate provides a lot of value that should not be neglected! At the same time we need to be pragmatic about it and not implement specifications for every little equality check we create, as that would definitely overwhelm our code base. For this particular use case we might be overdoing it, but I wanted to show a simple example of using specifications.

The specification pattern in its essence matches an element against a predicate and responds with a Boolean that tells whether the element satisfies the predicate. This interface describes it rather well.

    public interface ISpecification<TElement>
    {
        Boolean IsSatisfiedBy(TElement element);
    }

There is also a simple abstract class (available in the full source) that I use to avoid repeating myself. There are far more advanced implementations of the specification pattern available online if you are interested, and I would suggest using one of them if you want to start using specifications in your code. Below is the implementation of the ItemsInLaptopOrComputerProductGroup specification I mentioned earlier, which we will use in our scenario to apply filtering of incoming events in the computer stock.

    public class ItemsInLaptopOrComputerProductGroupSpecification:Specification<ProductOrderedEvent>
    {
        public ItemsInLaptopOrComputerProductGroupSpecification()
        {
            AssignPredicate(x => x.ProductGroup == "Laptop" || x.ProductGroup == "Computer");
        }
    }
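The abstract Specification class itself is only available in the full source. As a minimal sketch of what such a base class could look like, assuming all it does is store the predicate passed to AssignPredicate and evaluate it in IsSatisfiedBy (the author's actual class may differ, and the event class here is trimmed down to the one property the example needs):

```csharp
using System;

public interface ISpecification<TElement>
{
    bool IsSatisfiedBy(TElement element);
}

// Hypothetical base class; the real one lives in the repository.
public abstract class Specification<TElement> : ISpecification<TElement>
{
    private Func<TElement, bool> _predicate;

    // Called from the constructor of a concrete specification.
    protected void AssignPredicate(Func<TElement, bool> predicate)
    {
        _predicate = predicate ?? throw new ArgumentNullException(nameof(predicate));
    }

    public bool IsSatisfiedBy(TElement element)
    {
        if (_predicate == null)
            throw new InvalidOperationException("No predicate has been assigned.");

        return _predicate(element);
    }
}

// Trimmed-down event with only the property the specification inspects.
public class ProductOrderedEvent
{
    public string ProductGroup { get; set; }
}

public class ItemsInLaptopOrComputerProductGroupSpecification : Specification<ProductOrderedEvent>
{
    public ItemsInLaptopOrComputerProductGroupSpecification()
    {
        AssignPredicate(x => x.ProductGroup == "Laptop" || x.ProductGroup == "Computer");
    }
}

public static class Program
{
    public static void Main()
    {
        var specification = new ItemsInLaptopOrComputerProductGroupSpecification();

        Console.WriteLine(specification.IsSatisfiedBy(
            new ProductOrderedEvent { ProductGroup = "Laptop" }));    // True
        Console.WriteLine(specification.IsSatisfiedBy(
            new ProductOrderedEvent { ProductGroup = "Component" })); // False
    }
}
```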

Now we have a specification that declares intent with a name instead of a lambda. All we need now is a way of registering this specification into our event consumer, and that can be done with the addition of this method.

        protected void Register(ISpecification<TEvent> specification)
        {
            Register(specification.IsSatisfiedBy);
        }

Registering a specification now becomes a single line of code within our event consumers.

        Register(new ItemsInLaptopOrComputerProductGroupSpecification());

Event Broker using Rx and SignalR (Part 2: Implementation)

In part one I wrote about my reasoning and background of why I chose to have some fun by creating my own event broker. Then I took you through a scenario, the events that will make up the scenario, and then finished up the post by specifying a few interfaces that will make up the fluent API of the event broker.

Now it’s time to implement the first pieces of the broker. Let’s just throw ourselves at it, shall we?

Local event streams

Subscribing to a local event stream is something that can be implemented quite easily using Reactive Extensions (or LINQ to Events, if you prefer that name). Out of the box, Reactive Extensions provides two classes that will simplify the implementation dramatically: Subject<T> and EventLoopScheduler.
Basically, Subject<T> is both an IObservable<T> and an IObserver<T> that handles all publishing, subscriptions, disposing etc. for us, whereas the EventLoopScheduler class ensures that events are handled on a designated thread.

So without further ado, here is the first draft of our Reactive Extensions event broker for local event streams, implemented with the interfaces described in part one of this series.


    public class EventBroker : IEventBroker
    {
        private readonly IScheduler _scheduler;
        private readonly ISubject<IEvent> _subject;
        
        public EventBroker()
        {
            _scheduler = new EventLoopScheduler();
            _subject = new Subject<IEvent>();
        }
 
        public IEventBroker Publish<TEvent>(TEvent @event) 
            where TEvent : IEvent
        {
            _subject.OnNext(@event);
            return this;
        }
 
        public ISubscribe Locally()
        {
            return this;
        }
 
        public ISubscribe Remotely(String remoteEventStreamUri)
        {
            throw new NotImplementedException();
        }
 
        ISubscribe ISubscribe.Subscribe<TEvent>(Action<TEvent> onConsume) 
        {
            ((ISubscribe)this).Subscribe(null, onConsume);
            return this;
        }
 
        ISubscribe ISubscribe.Subscribe<TEvent>(Func<TEvent, Boolean> filter, 
                                                Action<TEvent> onConsume) 
        {
            _subject.Where(o => o is TEvent).Cast<TEvent>()
                                .Where(filter ?? (x => true))
                                .ObserveOn(_scheduler)
                                .Subscribe(onConsume);
            return this;
        }
 
        public void Dispose()
        {
        }
    }

Remote event streams

To be able to subscribe to remote event streams, we need to communicate and send our events over the network. There are many ways of doing this, but fortunately, there is this library that I guess no one can have missed with all the buzz it has created lately, namely SignalR. Using SignalR it becomes very easy to create a publisher that subscribers can connect to and receive events. What makes it even better is that SignalR has built in support for Reactive Extensions, allowing us to reuse and provide the same API for both remote and local subscriptions.

SignalR does not, however, provide any safety against loss of data. Clients will just pick up listening to new events as they reconnect, and be totally oblivious to the events they missed while being disconnected. This is one of the major reasons behind the caveat in the first post of the series.

SignalR can be hosted in various ways, but for this example I will use the self-hosting server since I’m only using console applications.

Starting SignalR Self-Hosting Server

SignalR provides a class named PersistentConnection that you inherit from, overriding various methods to handle negotiation, received data etc. We will only use the broadcast feature of the server connection in this example, so we just create an empty class named EchoConnection.


    public class EchoConnection : PersistentConnection
    {
    }

We can now start the self-hosting server, extract the connection through the provided dependency resolver and then use this connection to broadcast events to all subscribers.
To start the event broker in a self-hosting mode we add an overloaded constructor taking a publishingUri, which the server will register itself on.


        private readonly IConnection _serverConnection;

        public EventBroker(String publishingUri)
            : this()
        {
            var server = new SignalR.Hosting.Self.Server(publishingUri);
            server.MapConnection<EchoConnection>("/echo");
            server.Start();
 
            var connectionManager = server.DependencyResolver.Resolve<IConnectionManager>();
            _serverConnection = connectionManager.GetConnection<EchoConnection>();
        }

Connecting to SignalR servers

We need to refactor a bit to be able to connect to a remote SignalR server (i.e. our remote event stream). First we add a stack of client connections, meaning that this event broker is a subscriber to zero or more other event publishers. I guess a dictionary and naming publishers might be a good idea, but a Stack will suffice for this example.
Let’s add two private methods that will help us register subscriptions, both locally and remotely. The first is GetCurrentConnection(), which peeks at the top connection of the client stack; that will always be the remote we are currently adding subscriptions to. The other method, GetCurrentObservable<TEvent>(), will, depending on whether we are registering local or remote subscriptions, return an IObservable<TEvent> from either the Subject<T> or the client connection.

This code also shows how SignalR supports Reactive Extensions by using the AsObservable<>() on the current client connection.


        private readonly Stack<Connection> _clientConnections;

        private IObservable<TEvent> GetCurrentObservable<TEvent>()
        {
            return _inLocalSubscriptionMode ? _subject.Where(o => o is TEvent).Cast<TEvent>()
                                            : GetCurrentConnection().AsObservable<TEvent>();
        }
 
        private Connection GetCurrentConnection()
        {
            return _clientConnections.Peek();
        }

But wait a minute, there is a problem here that we haven’t addressed yet. I don’t know if this is intended or a bug in SignalR, but using AsObservable<TEvent>() on a client connection doesn’t mean that incoming events are filtered by TEvent. Instead, SignalR will try to deserialize every incoming event (that is, all events of all types published) into the TEvent type. Some events might work, some fail, some get mixed up, and this is definitely not how we want it to work. So the solution to this problem is to take care of the serialization and deserialization ourselves, and not rely on SignalR’s default serialization.

Json.NET has a TypeNameHandling setting that can be used to add the type name of an object as metadata under the property name $type. Let’s use this feature and verify this property on all incoming events. What we’ll do is use AsObservable() to get an IObservable<String>, apply some type filtering and deserialization to that instance, and then use AsObservable() again. The code below has been refactored to accomplish this.


    private IObservable<TEvent> GetCurrentObservable<TEvent>()
    {
        return _inLocalSubscriptionMode ? _subject.Where(o => o is TEvent).Cast<TEvent>()
                                        : GetCurrentConnection()
                                                    .AsObservable()
                                                    .Where(IsEventOfCorrectType<TEvent>)
                                                    .Select(JsonConvert.DeserializeObject<TEvent>)
                                                    .AsObservable();
    }
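The IsEventOfCorrectType<TEvent> helper and the serializer settings are not listed in the post. A minimal sketch of how they might look with Json.NET follows, assuming events are serialized with TypeNameHandling.Objects and that an exact match on the type's full name is enough (subclasses of TEvent are not matched). The EventTypeFilter class name is hypothetical, and the event classes are trimmed down for the example.

```csharp
using System;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class ProductOrderedEvent { public string ProductName { get; set; } }
public class ProductShippedEvent { public string ProductName { get; set; } }

public static class EventTypeFilter
{
    // Makes Json.NET write a "$type" property ("FullTypeName, AssemblyName") on objects.
    public static readonly JsonSerializerSettings IncludeTypeJsonSetting =
        new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Objects };

    // Assumes 'json' is a JSON object; JObject.Parse throws for anything else.
    public static bool IsEventOfCorrectType<TEvent>(string json)
    {
        var typeName = (string)JObject.Parse(json)["$type"];
        if (typeName == null)
            return false;

        // Exact match on the full type name, ignoring the assembly part.
        return typeName.StartsWith(typeof(TEvent).FullName + ",", StringComparison.Ordinal);
    }
}

public static class Program
{
    public static void Main()
    {
        var json = JsonConvert.SerializeObject(
            new ProductOrderedEvent { ProductName = "Graphics Card" },
            Formatting.None,
            EventTypeFilter.IncludeTypeJsonSetting);

        Console.WriteLine(EventTypeFilter.IsEventOfCorrectType<ProductOrderedEvent>(json)); // True
        Console.WriteLine(EventTypeFilter.IsEventOfCorrectType<ProductShippedEvent>(json)); // False
    }
}
```

Comparing type names as strings avoids loading types from untrusted input, at the cost of not supporting subscriptions to a base event type.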

With those two methods in place we can refactor the main Subscribe method to use GetCurrentObservable<TEvent>(), and also add the subscription to our subscriptions collection, so we can close and dispose them when exiting the application. And by those small changes we can now subscribe to a remote event stream.


        ISubscribe ISubscribe.Subscribe<TEvent>(Func<TEvent, Boolean> filter, 
                                                Action<TEvent> onConsume)
        {
            _subscriptions.Add(GetCurrentObservable<TEvent>()
                                .Where(filter ?? (x => true))
                                .ObserveOn(_scheduler)
                                .Subscribe(onConsume));
            return this;
        }
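The _subscriptions collection and the final Dispose() body are not shown here. One possible shape, sketched under the assumption that a CompositeDisposable from Reactive Extensions holds the subscriptions; MiniBroker is a hypothetical, local-only stand-in for the real EventBroker, and the System.Reactive package is required:

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public sealed class MiniBroker : IDisposable
{
    private readonly EventLoopScheduler _scheduler = new EventLoopScheduler();
    private readonly Subject<object> _subject = new Subject<object>();
    private readonly CompositeDisposable _subscriptions = new CompositeDisposable();

    public void Publish(object @event)
    {
        _subject.OnNext(@event);
    }

    public void Subscribe<TEvent>(Action<TEvent> onConsume)
    {
        // Keep the subscription handle so it can be released in Dispose().
        _subscriptions.Add(_subject.OfType<TEvent>()
                                   .ObserveOn(_scheduler)
                                   .Subscribe(onConsume));
    }

    public void Dispose()
    {
        _subscriptions.Dispose(); // unsubscribes every consumer
        _subject.Dispose();
        _scheduler.Dispose();     // stops the event loop thread
    }
}

public static class Program
{
    public static void Main()
    {
        using (var done = new ManualResetEventSlim())
        using (var broker = new MiniBroker())
        {
            broker.Subscribe<string>(message =>
            {
                Console.WriteLine("Consumed: " + message);
                done.Set();
            });

            broker.Publish(42);      // ignored, no subscriber for Int32
            broker.Publish("hello"); // handled on the scheduler's thread

            done.Wait();             // wait for the asynchronous hand-off
        }
    }
}
```

Disposing the scheduler matters in console applications, since the event loop thread would otherwise be left to the runtime to clean up.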

Publishing

Servers are up, connections can be made. Now we just need to broadcast events.

The publishing method has a few new lines of code, which broadcast the event if the event broker was instantiated with a publishing URI and thereby registered itself as a self-hosting server. Remember from above that we also wanted to take care of our own serialization?


        public IEventBroker Publish<TEvent>(TEvent @event)
            where TEvent : IEvent
        {
            _subject.OnNext(@event);

            if (_serverConnection != null)
                _serverConnection.Broadcast(JsonConvert.SerializeObject(@event, 
                                                                        Formatting.None, 
                                                                        _includeTypeJsonSetting));

            return this;
        }

Event Broker using Rx and SignalR (Part 1: A Fluent API)

Caveat: Please understand that this is a very simple event broker that was implemented during a short period of time, and that it does NOT include anything like message reliability, acknowledgement, queuing or similar things to prevent data loss. The remote feature is built on top of SignalR and publishes events ‘live’ for anyone listening at the time of publishing, meaning that subscribers can’t pick up where they left off after a connection loss. The event broker was developed for fun and educational purposes only and is provided as is. I would suggest not using, nor basing your own implementation on, this example for any mission-critical application/software. If you need a reliable and performant service/message bus, I suggest you take a look at NServiceBus, MassTransit or similar products. Thank you!

Background

Recently, I started to build what will be a very small application based on the architectural pattern CQRS. When done, the application is supposed to be released for free and will be launched on a very small scale for a few select people! At the start of development, I was unsure of the hosting environment, and what technologies would be available to me (e.g. what I could install and things like that). So one of my basic requirements became to use as few frameworks, servers and third-party products as possible, unless I knew for sure that they could be embedded in my code, or would work in the specific hosting environment. Things have however changed, and the original hosting provider has been replaced with AppHarbor or, at the time of writing, is still in the process of being replaced.

In CQRS we use an event store to persist all events that result from applying commands in our domain. At the time you persist them, you also want to dispatch the events to the read side, so they can be transformed and persisted in the read model. This is typically done with a service bus; it doesn’t have to be, but it’s definitely the preferred way of doing it from my point of view. Due to licensing, hosting, another use case and the fact that I’m still trying to limit the use of third-party products, I decided I would have some fun developing something temporary and very basic to handle the event dispatching to the read side (or read sides, as the application will have more than one read model); hence my EventBroker implementation was born. I deliberately chose the name EventBroker instead of ServiceBus or MessageBroker, as I will only use this component to publish and broadcast events. Even so, in my opinion events are the only thing you should ever send over a service bus, but I guess that would be another discussion.

To provide a nice example I created a scenario that will be used throughout the series to base implementation on. The hard part was inventing a scenario described only by events, as we are not creating a full system-wide implementation with client, commands and the works.

Ordering a product – An example scenario

To begin with, I wrote this science fiction scenario taking place in the year 3068. Humans are scattered all over the universe since our beloved planet Earth ceased to exist during the 300 years’ war in the middle of the previous millennium! But after a few pages, I realized I was writing a science fiction novel, rather than a blog post about an event broker. So I refactored my scenario into a small online computer shop, which sells computers and components over the internet (Exciting huh?!).

The scenario consists of a web site, which ships all its orders from two stocks, where one stock handles off-the-shelf computers and laptops in various variations, whereas the other stock only handles computer components. To protect these two stocks from stockout, there is also procurement that resupplies the stocks as their products reach their respective order points.

There are likely hundreds or even thousands of events within this context, but for this example I narrowed them down to these three.

ProductOrderedEvent
Published by the website as soon as a product has been ordered (Yes, my shop has a horrible user experience as customers can only order one product at a time.) Both stocks subscribe to this event so they can prepare, pack and ship the ordered product. The website itself also subscribes to this event locally, to be able to send out order confirmation emails.

ProductShippedEvent
This event is published by both stocks as soon as an ordered product has been packed and shipped. In this example we only subscribe to this event locally for sending out shipping confirmation emails.

ProductOrderPointReachedEvent
Published by the stocks as inventory gets low on a certain product, and subscribed to by procurement to order resupplies.

The scenario and these three events will be enough to demonstrate the entirety of the event broker, but before implementing our broker we will be defining our fluent API through a few simple interfaces.

Defining our fluent API

Looking at the events above and their short descriptions, it’s not hard to notice that we want to subscribe to both local and remote event streams. And it doesn’t take much brain activity to realize that our local event subscriptions don’t need to be routed through any network stack. Hence we can divide the event broker’s subscriptions into two types, local and remote subscriptions. We also need to be able to filter events, as for example, the stocks are not interested in all ProductOrderedEvents, but rather the ones they can process and complete. We also want to publish events, but we don’t want to separate that into local and remote publishes. A publish is a publish, it shouldn’t care who is listening, may it be a local or remote subscription.

So let’s start with defining the interfaces that will fulfill the above requirements we derived out of our scenario.

IPublish
The IPublish interface defines our Publish method, but there aren’t really any options or method-chaining paths after you call the publish method. The return type could actually be just void, but we might as well return the IEventBroker interface so we can reset our path, and make all choices available again after a call to the publish method. This will also allow us to chain publish methods, for those few use cases where that would actually be useful.


    public interface IPublish
    {
        IEventBroker Publish<TEvent>(TEvent @event)
            where TEvent : IEvent;
    }

ISubscribe
The ISubscribe interface defines our subscription methods, and we currently have two of them. One that subscribes to a specific event and one that subscribes to a specific event that matches a predicate, our filter. In our example scenario we will use this type of filtering for our stocks when they subscribe to the ProductOrderedEvent, as one stock only wants information about computers and laptops, while the other stock wants events about all other products. Each Subscribe method will return the ISubscribe interface which will allow us to chain subscriptions. Also, the ISubscribe interface will be implemented explicitly to force the end user to use Locally() or Remotely(remoteEventStreamUri) methods first, and then add subscriptions, otherwise we wouldn’t know what to register our subscriptions upon.


    public interface ISubscribe : ISubscriptionSource
    {
        ISubscribe Subscribe<TEvent>(Action<TEvent> onConsume)
            where TEvent : IEvent;

        ISubscribe Subscribe<TEvent>(Func<TEvent, Boolean> filter, Action<TEvent> onConsume)
            where TEvent : IEvent;
    }

ISubscriptionSource
Did you notice that I added the ISubscriptionSource on the ISubscribe interface? By doing that, we will be able to switch registering subscriptions between local and one or more remote event streams without completing the statement.


    public interface ISubscriptionSource
    {
        ISubscribe Locally();
        ISubscribe Remotely(String remoteEventStreamUri);
    }

IEventBroker
Now the IEventBroker interface just needs to inherit from all the interfaces we defined above. Remember, ISubscriptionSource is included through our ISubscribe interface.


    public interface IEventBroker : IDisposable, IPublish, ISubscribe
    {
    }

We have every interface we need to start implementing the broker now, but that will have to wait until the next post, which will be ready and posted together with this one.

Result

By faking the event broker, our fluent API will now allow us to write statements like the one below, even though we haven’t even written a single line of implementation code yet. In the next part of the series we will make this work, and after that we will continue adding a few more features to the event broker in upcoming posts.

Example of chaining subscriptions


    eventBroker.Locally()
                    .Subscribe<ProductOrderedEvent>(Console.WriteLine)
                    .Subscribe<ProductOrderPointReachedEvent>(Console.WriteLine)
               .Remotely("http://localhost:53005/")
                    .Subscribe<ProductOrderedEvent>(Console.WriteLine);

Example of chaining our publish method


    eventBroker.Publish(new ProductOrderedEvent())
               .Publish(new ProductShippedEvent());
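To make "faking the event broker" concrete, here is a small sketch of a fake that implements the interfaces from this post and merely records each call, which is enough for both chaining examples to compile and run. FakeEventBroker and its Calls log are hypothetical, the event classes are empty placeholders, and ISubscribe is implemented explicitly as the post describes.

```csharp
using System;
using System.Collections.Generic;

public interface IEvent { }
public class ProductOrderedEvent : IEvent { }
public class ProductShippedEvent : IEvent { }
public class ProductOrderPointReachedEvent : IEvent { }

public interface IPublish
{
    IEventBroker Publish<TEvent>(TEvent @event) where TEvent : IEvent;
}

public interface ISubscriptionSource
{
    ISubscribe Locally();
    ISubscribe Remotely(string remoteEventStreamUri);
}

public interface ISubscribe : ISubscriptionSource
{
    ISubscribe Subscribe<TEvent>(Action<TEvent> onConsume) where TEvent : IEvent;
    ISubscribe Subscribe<TEvent>(Func<TEvent, bool> filter, Action<TEvent> onConsume)
        where TEvent : IEvent;
}

public interface IEventBroker : IDisposable, IPublish, ISubscribe { }

// Hypothetical fake: records calls instead of delivering events.
public class FakeEventBroker : IEventBroker
{
    private string _source = "(none)";

    public List<string> Calls { get; } = new List<string>();

    public IEventBroker Publish<TEvent>(TEvent @event) where TEvent : IEvent
    {
        Calls.Add("Publish " + typeof(TEvent).Name);
        return this;
    }

    public ISubscribe Locally() { _source = "local"; return this; }

    public ISubscribe Remotely(string remoteEventStreamUri)
    {
        _source = remoteEventStreamUri;
        return this;
    }

    ISubscribe ISubscribe.Subscribe<TEvent>(Action<TEvent> onConsume)
    {
        return ((ISubscribe)this).Subscribe<TEvent>(null, onConsume);
    }

    ISubscribe ISubscribe.Subscribe<TEvent>(Func<TEvent, bool> filter, Action<TEvent> onConsume)
    {
        Calls.Add("Subscribe " + typeof(TEvent).Name + " @ " + _source);
        return this;
    }

    public void Dispose() { }
}

public static class Program
{
    public static void Main()
    {
        using (var eventBroker = new FakeEventBroker())
        {
            eventBroker.Locally()
                            .Subscribe<ProductOrderedEvent>(Console.WriteLine)
                            .Subscribe<ProductOrderPointReachedEvent>(Console.WriteLine)
                       .Remotely("http://localhost:53005/")
                            .Subscribe<ProductOrderedEvent>(Console.WriteLine);

            eventBroker.Publish(new ProductOrderedEvent())
                       .Publish(new ProductShippedEvent());

            // Prints the three subscriptions followed by the two publishes.
            eventBroker.Calls.ForEach(Console.WriteLine);
        }
    }
}
```

Because the Subscribe methods are explicit interface implementations, calling eventBroker.Subscribe(...) directly on the FakeEventBroker variable does not compile, which is how the API nudges callers towards Locally() or Remotely() first.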
