Per Brage's Blog

const String ABOUT = "Somehow related to code";


Event Broker using Rx and SignalR (Part 1: A Fluent API)

Caveat: Please understand that this is a very simple event broker, implemented in a short period of time, and that it does NOT include message reliability, acknowledgement, queuing or anything similar to prevent data loss. The remote feature is built on top of SignalR and publishes events ‘live’ to whoever is listening at the time of publishing, meaning that subscribers can’t pick up where they left off after a connection loss. The event broker was developed for fun and educational purposes only and is provided as is. I would suggest neither using it nor basing your own implementation on this example for any mission-critical application. If you need a reliable and performant service/message bus, I suggest you take a look at NServiceBus, MassTransit or similar products. Thank you!


Recently, I started to build what will be a very small application based on the architectural pattern CQRS. When done, the application is supposed to be released for free and launched on a very small scale for a few select people! At the start of development, I was unsure of the hosting environment and what technologies would be available to me (e.g. what I could install and things like that). So one of my basic requirements became to use as few frameworks, servers and third-party products as possible, unless I knew for sure that they could be embedded in my code or would work in the specific hosting environment. Things have changed, however: the original hosting provider is being replaced with AppHarbor, although at the time of writing the switch is still in progress.

In CQRS we use an event store to persist all events that result from applying commands in our domain. At the time you persist them, you also want to dispatch the events to the read side, so they can be transformed and persisted in the read model. This is typically done with a service bus; it doesn’t have to be, but from my point of view it’s definitely the preferred way of doing it. Due to licensing, hosting, another use case and the fact that I’m still trying to limit the use of third-party products, I decided I would have some fun developing something temporary and very basic to handle the event dispatching to the read side (or read sides, as the application will have more than one read model); hence my EventBroker implementation was born. I deliberately chose the name EventBroker instead of ServiceBus or MessageBroker, as I will only use this component to publish and broadcast events. That said, in my opinion events are the only thing you should ever send over a service bus, but I guess that would be another discussion.

To provide a nice example, I created a scenario that the implementation will be based on throughout the series. The hard part was inventing a scenario described only by events, as we are not creating a full system-wide implementation with client, commands and the works.

Ordering a product – An example scenario

To begin with, I wrote this science fiction scenario taking place in the year 3068. Humans are scattered all over the universe since our beloved planet Earth ceased to exist during the 300 years’ war in the middle of the previous millennium! But after a few pages, I realized I was writing a science fiction novel rather than a blog post about an event broker. So I refactored my scenario into a small online computer shop, which sells computers and components over the internet (Exciting, huh?!).

The scenario consists of a website that ships all its orders from two stocks: one stock handles off-the-shelf computers and laptops in various configurations, whereas the other handles only computer components. To protect these two stocks from stockouts, there is also a procurement department that resupplies the stocks as their products reach their respective order points.

There are likely hundreds or even thousands of events within this context, but for this example I narrowed them down to these three.

ProductOrderedEvent: Published by the website as soon as a product has been ordered. (Yes, my shop has a horrible user experience, as customers can only order one product at a time.) Both stocks subscribe to this event so they can prepare, pack and ship the ordered product. The website itself also subscribes to this event locally, to be able to send out order confirmation emails.

ProductShippedEvent: Published by both stocks as soon as an ordered product has been packed and shipped. In this example we only subscribe to this event locally, for sending out shipping confirmation emails.

The third event is published by the stocks when inventory runs low on a certain product, and procurement subscribes to it in order to order resupplies.

The scenario and these three events will be enough to demonstrate the entirety of the event broker, but before implementing our broker we will be defining our fluent API through a few simple interfaces.

Defining our fluent API

Looking at the events above and their short descriptions, it’s not hard to notice that we want to subscribe to both local and remote event streams. And it doesn’t take much brain activity to realize that our local event subscriptions don’t need to be routed through any network stack. Hence we can divide the event broker’s subscriptions into two types: local and remote subscriptions. We also need to be able to filter events; the stocks, for example, are not interested in all ProductOrderedEvents, only in the ones they can process and complete. We also want to publish events, but we don’t want to separate that into local and remote publishes. A publish is a publish; it shouldn’t care who is listening, be it a local or a remote subscription.

So let’s start by defining the interfaces that will fulfill the requirements we derived from our scenario.

The IPublish interface defines our Publish method, but there aren’t really any options or method-chaining paths after you call it. The return type could actually be just void, but we might as well return the IEventBroker interface so we can reset our path and make all choices available again after a call to Publish. This also allows us to chain publish calls, for the few use cases where that would actually be useful.

    public interface IPublish
    {
        IEventBroker Publish<TEvent>(TEvent @event)
            where TEvent : IEvent;
    }
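
To see why returning IEventBroker makes chaining possible, here is a minimal sketch. It assumes that IEvent is an empty marker interface (its definition is not shown in this post), trims the interfaces down to the publish side, and uses a hypothetical RecordingBroker that only records what was published. It is not the broker from this series.

```csharp
using System;
using System.Collections.Generic;

// Assumption: IEvent is an empty marker interface; the post does not show it.
public interface IEvent { }

// Trimmed-down stand-ins for the post's interfaces (publish side only).
public interface IPublish
{
    IEventBroker Publish<TEvent>(TEvent @event) where TEvent : IEvent;
}

public interface IEventBroker : IPublish { }

public class ProductOrderedEvent : IEvent { }
public class ProductShippedEvent : IEvent { }

// Hypothetical fake: records event type names instead of dispatching anything.
public class RecordingBroker : IEventBroker
{
    public List<string> Published { get; } = new List<string>();

    public IEventBroker Publish<TEvent>(TEvent @event) where TEvent : IEvent
    {
        Published.Add(typeof(TEvent).Name);
        return this; // handing the broker back is what allows the next .Publish(...)
    }
}

public static class Program
{
    public static void Main()
    {
        var broker = new RecordingBroker();
        broker.Publish(new ProductOrderedEvent())
              .Publish(new ProductShippedEvent());
        Console.WriteLine(string.Join(", ", broker.Published));
    }
}
```

Had Publish returned void, the second call would need its own statement; nothing else about the method would change.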

The ISubscribe interface defines our subscription methods, and we currently have two of them: one that subscribes to a specific event, and one that subscribes to a specific event matching a predicate, our filter. In our example scenario we will use this type of filtering for our stocks when they subscribe to the ProductOrderedEvent, as one stock only wants information about computers and laptops, while the other stock wants events about all other products. Each Subscribe method returns the ISubscribe interface, which allows us to chain subscriptions. Also, the ISubscribe interface will be implemented explicitly to force the end user to call Locally() or Remotely(remoteEventStreamUri) first and only then add subscriptions; otherwise we wouldn’t know which event stream to register the subscriptions on.

    public interface ISubscribe : ISubscriptionSource
    {
        ISubscribe Subscribe<TEvent>(Action<TEvent> onConsume)
            where TEvent : IEvent;

        ISubscribe Subscribe<TEvent>(Func<TEvent, Boolean> filter, Action<TEvent> onConsume)
            where TEvent : IEvent;
    }
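
One way to picture the filtered overload is as a wrapper that runs the handler only when the predicate accepts the event. The sketch below assumes a hypothetical Category property on ProductOrderedEvent and a hypothetical in-memory FilteringSubscriptions class with a Dispatch method; none of these names come from the series' source code.

```csharp
using System;
using System.Collections.Generic;

public interface IEvent { }

// Assumption: the event carries a Category; the post does not list its properties.
public class ProductOrderedEvent : IEvent
{
    public string Category { get; set; } = "";
}

// Hypothetical in-memory subscription list, not the broker from the series.
public class FilteringSubscriptions
{
    private readonly List<Action<IEvent>> handlers = new List<Action<IEvent>>();

    // Unfiltered overload: delegate to the filtered one with an always-true predicate.
    public FilteringSubscriptions Subscribe<TEvent>(Action<TEvent> onConsume)
        where TEvent : IEvent
    {
        return Subscribe<TEvent>(_ => true, onConsume);
    }

    public FilteringSubscriptions Subscribe<TEvent>(Func<TEvent, bool> filter, Action<TEvent> onConsume)
        where TEvent : IEvent
    {
        // The handler runs only for events of the right type that pass the filter.
        handlers.Add(e =>
        {
            if (e is TEvent typed && filter(typed)) onConsume(typed);
        });
        return this;
    }

    public void Dispatch(IEvent @event)
    {
        foreach (var handler in handlers) handler(@event);
    }
}

public static class Program
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var subscriptions = new FilteringSubscriptions()
            .Subscribe<ProductOrderedEvent>(e => e.Category == "Computer",
                                            e => log.Add("computer stock: " + e.Category))
            .Subscribe<ProductOrderedEvent>(e => e.Category != "Computer",
                                            e => log.Add("component stock: " + e.Category));

        subscriptions.Dispatch(new ProductOrderedEvent { Category = "Computer" });
        subscriptions.Dispatch(new ProductOrderedEvent { Category = "Memory" });
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Each stock sees only the orders its predicate accepts, which is the behaviour the two stocks in the scenario need.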

Did you notice that ISubscribe inherits from ISubscriptionSource? Because of that, we can switch between registering subscriptions on the local event stream and on one or more remote event streams without ending the statement.

    public interface ISubscriptionSource
    {
        ISubscribe Locally();
        ISubscribe Remotely(String remoteEventStreamUri);
    }

Now the IEventBroker interface just needs to inherit from all the interfaces we defined above. Remember, ISubscriptionSource is included through our ISubscribe interface.

    public interface IEventBroker : IDisposable, IPublish, ISubscribe { }
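
The effect of implementing ISubscribe explicitly can be sketched as follows. The FakeBroker class is hypothetical and only records registrations, the interfaces are reduced to a single Subscribe overload, and IEvent is again assumed to be an empty marker interface.

```csharp
using System;
using System.Collections.Generic;

public interface IEvent { }
public class ProductOrderedEvent : IEvent { }

public interface ISubscriptionSource
{
    ISubscribe Locally();
    ISubscribe Remotely(string remoteEventStreamUri);
}

public interface ISubscribe : ISubscriptionSource
{
    ISubscribe Subscribe<TEvent>(Action<TEvent> onConsume) where TEvent : IEvent;
}

// Hypothetical fake: records "source -> event type" pairs and nothing else.
public class FakeBroker : ISubscribe
{
    private string currentSource = "(none)";
    public List<string> Registrations { get; } = new List<string>();

    public ISubscribe Locally() { currentSource = "local"; return this; }

    public ISubscribe Remotely(string remoteEventStreamUri)
    {
        currentSource = remoteEventStreamUri;
        return this;
    }

    // Explicit implementation: not callable on a FakeBroker-typed variable,
    // only through the ISubscribe reference that Locally()/Remotely() return.
    // The generic constraint is inherited from the interface method.
    ISubscribe ISubscribe.Subscribe<TEvent>(Action<TEvent> onConsume)
    {
        Registrations.Add(currentSource + " -> " + typeof(TEvent).Name);
        return this;
    }
}

public static class Program
{
    public static void Main()
    {
        var broker = new FakeBroker();

        // broker.Subscribe<ProductOrderedEvent>(e => { });  // does not compile on FakeBroker
        broker.Locally().Subscribe<ProductOrderedEvent>(e => { });

        Console.WriteLine(string.Join(", ", broker.Registrations));
    }
}
```

Note that this hides Subscribe only on a variable typed as the concrete class. Through an ISubscribe or IEventBroker reference, the method is visible as usual.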

We now have every interface we need to start implementing the broker, but that will have to wait until the next post, which will be posted together with this one.


By faking the event broker, our fluent API now allows us to write statements like the ones below, even though we haven’t written a single line of implementation code yet. In the next part of the series we will make this work, and after that we will continue adding a few more features to the event broker in upcoming posts.

Example of chaining subscriptions
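
A statement of this kind might look like the sketch below. It is purely illustrative and not the solution to the scenario: RecordingBroker is a hypothetical fake, the stream URI is made up, Category is an assumed property, and the handlers are empty placeholders.

```csharp
using System;
using System.Collections.Generic;

public interface IEvent { }

// Assumption: Category is a hypothetical property used only to show filtering.
public class ProductOrderedEvent : IEvent { public string Category { get; set; } = ""; }
public class ProductShippedEvent : IEvent { }

public interface ISubscriptionSource
{
    ISubscribe Locally();
    ISubscribe Remotely(string remoteEventStreamUri);
}

public interface ISubscribe : ISubscriptionSource
{
    ISubscribe Subscribe<TEvent>(Action<TEvent> onConsume) where TEvent : IEvent;
    ISubscribe Subscribe<TEvent>(Func<TEvent, bool> filter, Action<TEvent> onConsume) where TEvent : IEvent;
}

// Hypothetical fake: it only records which source each subscription was registered on.
public class RecordingBroker : ISubscribe
{
    private string source = "(none)";
    public List<string> Registered { get; } = new List<string>();

    public ISubscribe Locally() { source = "local"; return this; }
    public ISubscribe Remotely(string remoteEventStreamUri) { source = remoteEventStreamUri; return this; }

    public ISubscribe Subscribe<TEvent>(Action<TEvent> onConsume) where TEvent : IEvent
    {
        Registered.Add(source + ": " + typeof(TEvent).Name);
        return this;
    }

    public ISubscribe Subscribe<TEvent>(Func<TEvent, bool> filter, Action<TEvent> onConsume) where TEvent : IEvent
    {
        Registered.Add(source + ": " + typeof(TEvent).Name + " (filtered)");
        return this;
    }
}

public static class Program
{
    public static List<string> Run()
    {
        var eventBroker = new RecordingBroker();

        eventBroker.Locally()
                       .Subscribe<ProductOrderedEvent>(e => { /* send order confirmation */ })
                       .Subscribe<ProductShippedEvent>(e => { /* send shipping confirmation */ })
                   .Remotely("http://example.invalid/events") // made-up URI
                       .Subscribe<ProductOrderedEvent>(e => e.Category == "Computer",
                                                       e => { /* prepare, pack and ship */ });

        return eventBroker.Registered;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

The switch from Locally() to Remotely(...) happens in the middle of the chain, which works because ISubscribe inherits from ISubscriptionSource.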


Example of chaining our publish method

    eventBroker.Publish(new ProductOrderedEvent())
               .Publish(new ProductShippedEvent());


Full source at my GitHub repository

Part 1: A Fluent API
Part 2: Implementation
Part 3: Event Consumers
Part 4: Solving the Scenario