Per Brage's Blog

const String ABOUT = "Somehow related to code";

Event Broker using Rx and SignalR (Part 2: Implementation)

In part one I wrote about my reasoning and background of why I chose to have some fun by creating my own event broker. Then I took you through a scenario, the events that will make up the scenario, and then finished up the post by specifying a few interfaces that will make up the fluent API of the event broker.

Now it’s time to implement the first pieces of the broker. Let’s just throw ourselves at it, shall we?

Local event streams

Subscribing to a local event stream is something that can be implemented quite easily using Reactive Extensions (or LINQ to Events, if you prefer that name). Out of the box, Reactive Extensions provide two classes that will simplify the implementation dramatically, Subject<T> and EventLoopScheduler.
Basically, Subject<T> is both an Observable<T> and an Observer<T> that handles all publishing, subscriptions, disposing etc. for us, whereas the EventLoopScheduler class ensure event concurrency on designated thread.

So without further ado, here is the first draft of our Reactive Extensions event broker for local event streams, implemented with the interfaces described in part one of this series.


    public class EventBroker : IEventBroker
    {
        private readonly IScheduler _scheduler;
        private readonly ISubject<IEvent> _subject;
        
        public EventBroker()
        {
            _scheduler = new EventLoopScheduler();
            _subject = new Subject<IEvent>();
        }
 
        public IEventBroker Publish<TEvent>(TEvent @event) 
            where TEvent : IEvent
        {
            _subject.OnNext(@event);
            return this;
        }
 
        public ISubscribe Locally()
        {
            return this;
        }
 
        public ISubscribe Remotely(String remoteEventStreamUri)
        {
            throw new NotImplementedException();
        }
 
        ISubscribe ISubscribe.Subscribe<TEvent>(Action<TEvent> onConsume) 
        {
            ((ISubscribe)this).Subscribe(null, onConsume);
            return this;
        }
 
        ISubscribe ISubscribe.Subscribe<TEvent>(Func<TEvent, Boolean> filter, 
                                                Action<TEvent> onConsume) 
        {
            _subject.Where(o => o is TEvent).Cast<TEvent>()
                                .Where(filter ?? (x => true))
                                .ObserveOn(_scheduler)
                                .Subscribe(onConsume);
            return this;
        }
 
        public void Dispose()
        {
        }
    }

Remote event streams

To be able to subscribe to remote event streams, we need to communicate and send our events over the network. There are many ways of doing this, but fortunately, there is this library that I guess no one can have missed with all the buzz it has created lately, namely SignalR. Using SignalR it becomes very easy to create a publisher that subscribers can connect to and receive events. What makes it even better is that SignalR has built in support for Reactive Extensions, allowing us to reuse and provide the same API for both remote and local subscriptions.

SignalR does however not provide any safety against loss of data, clients will just pick up listening to the new events as they reconnect, and be totally oblivious about the events they missed while being disconnected. This is one of the major reasons behind the caveat in the first post of the series.

SignalR can be hosted in various ways but, for this example I will use the self-hosting server since I’m only using Console Applications in the example.

Starting SignalR Self-Hosting Server

SignalR provides a class named PersistentConnection that you inherit from and override various methods to handle negotiation, data received etc. But we will only use the broadcast feature of the server connection in this example, so we will only create an empty class, by the name EchoConnection.


    public class EchoConnection : PersistentConnection
    {
    }

We can now start the self-hosting server, extract the connection through the provided dependency resolver and then use this connection to broadcast events to all subscribers.
To start the event broker in a self-hosting mode we add an overloaded constructor taking a publishingUri, which the server will register itself on.


        private readonly IConnection _serverConnection;

        public EventBroker(String publishingUri)
            : this()
        {
            var server = new SignalR.Hosting.Self.Server(publishingUri);
            server.MapConnection<EchoConnection>("/echo");
            server.Start();
 
            var connectonManager = server.DependencyResolver.Resolve<IConnectionManager>();
            _serverConnection = connectonManager.GetConnection<EchoConnection>();
        }

Connecting to SignalR servers

We need to refactor a bit to be able to connect to a remote SignalR server (e.g. our remote event stream). First we add a stack of client connections meaning that this event broker is a subscriber to zero or more other event publishers. I guess a dictionary and naming publishers might be a good idea, but a Stack will suffice for this example.
Let’s add two private methods that will help us register subscriptions, both locally and remotely. First is the GetCurrentConnection() that peeks the top connection of the client stack, which will always contain the current remote we are adding subscriptions to. The other method GetCurrentObservable<TEVENT>() will, depending on if we are registering local or remote subscriptions, return an IObseravable<TEVENT> from either the Subject<T> or the client connection.

This code also shows how SignalR supports Reactive Extensions by using the AsObservable<>() on the current client connection.


        private readonly Stack _clientConnections;

        private IObservable<TEvent> GetCurrentObservable<TEvent>()
        {
            return _inLocalSubscriptionMode ? _subject.Where(o => o is TEvent).Cast<TEvent>()
                                            : GetCurrentConnection().AsObservable<TEvent>();
        }
 
        private Connection GetCurrentConnection()
        {
            return _clientConnections.Peek();
        }

But wait a minute, there is a problem here that we haven’t addressed yet. I don’t know if this is intended or bug in SignalR, but when you use AsObservable<TEvent>() on a client connection, that doesn’t mean that you will filter incoming events by TEvent, SignalR will rather try to deserialize every incoming event (that would be all events of all types published) into the TEvent type. Some events might work, some fail, some get mixed and this is definitely not how we want it to work. So the solution to this problem is to take care of the serialization and deserialization ourselves, and not rely on SignalR’s default serialization.

Json.NET has a TypeHandling setting that can be used to add the type name of an object as metadata under the property name $type. Let’s use this feature and verify this property on all incoming events. What we’ll do is to use the AsObservable() to get an IObservable<String> and upon that instance apply some type filtering, deserialization and then use AsObservable() again. The code below has been refactored to accomplish this.


    private IObservable<TEvent> GetCurrentObservable<TEvent>()
    {
        return _inLocalSubscriptionMode ? _subject.Where(o => o is TEvent).Cast<TEvent>()
                                        : GetCurrentConnection()
                                                    .AsObservable()
                                                    .Where(IsEventOfCorrectType<TEvent>)
                                                    .Select(JsonConvert.DeserializeObject<TEvent>)
                                                    .AsObservable();
    }

With those two methods in place we can refactor the main Subscribe method to use GetCurrentObservable<TEVent>(), and also add the subscription to our subscriptions collection, so we can close and dispose them when exiting the application. And by those small changes we can now subscribe to a remote event stream.


        ISubscribe ISubscribe.Subscribe<TEvent>(Func<TEvent, Boolean> filter, 
                                                Action<TEvent> onConsume)
        {
            _subscriptions.Add(GetCurrentObservable<TEvent>()
                                .Where(filter ?? (x => true))
                                .ObserveOn(_scheduler)
                                .Subscribe(onConsume));
            return this;
        }

Publishing

Servers are up, connections can be made. Now we just need to broadcast events.

The publishing method have two new lines of code, and those two last ones now broadcasts the event if the event broker was instantiated with a publishing Uri and by that registered itself as a self-hosting server. Remember from above that we also wanted to take care of our own serialization?


        public IEventBroker Publish<TEvent>(TEvent @event)
            where TEvent : IEvent
        {
            _subject.OnNext(@event);

            if (_serverConnection != null)
                _serverConnection.Broadcast(JsonConvert.SerializeObject(@event, 
                                                                        Formatting.None, 
                                                                        _includeTypeJsonSetting));

            return this;
        }

Links

Source
Full source at my GitHub repository

Navigation
Part 1: A Fluent API
Part 2: Implementation
Part 3: Event Consumers
Part 4: Solving the Scenario

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: