Twitter with Rx

This entry is part 4 of 4 in the series Reactive Extensions in theory and practice

In this post we will look at another sample program to get to know a few more Rx operators. We will also see how Rx is great for dealing with user interactions combined with asynchronous data streams.

Basically the program filters a stream of tweets by user input. There are two kinds of ongoing events involved, user inputs and tweets incoming in real-time from Twitter. Here are some detailed specifications:

  1. The user can type a search text into a search field
    When the user has entered at least 3 characters and stops typing for at least 2 second the text in the search field will be processed for a new search.
  2. User inputs will be used to filter the incoming tweets
    The tweets must contain an exact (but case-insensitive) match of the user input. The filtered tweets are displayed on the UI ordered from most recent (top of the list) to oldest (bottom of the list). Every time the item count of the list exceeds a maximum of 20 the oldest tweet is supposed to be removed.

  3. New tweets should be displayed with a sample rate of one second.
    Only a maximum of one tweet should be displayed per second. All other tweets should be dropped.

  4. When the search text is changed according to 1. the tweets should be filtered by the new text according to 2.

  5. All filtered tweets are classified and sorted into different lists
    Based on the score provided by a classifier each tweet will be considered either as awesome or boring. A score greater than 1000 is considered to be awesome and a score equal or less than 1000 is considered to be boring. Awesome and boring tweets are displayed in separate lists.

(The classifier function can be seen as a black box. In this sample it is a very stupid algorithm that just serves to demonstrate the capabilities of Rx. In another scenario we could imagine it to be some kind of machine learning algorithm or something like that.)

This is how the application’s simple UI will look like:

schreenshot2

The source code can be downloaded here.

Implementation

Now we will go through the requirements one by one and explore how they can be implemented using Rx.

Handling user input

To handle the user input we need to apply the Rx operators Where and Throttle.

Where

Where is a filtering operator that filters an observable sequence based on a predicate function. In the diagram below only the result sequence only emits values that are greater than 2.

alt text

This can be used to filter out search texts with less than 3 characters.

Throttle

The filtering operator Throttle only emits an item from a source sequence when a given time span has passed without the source emitting any other items. In other words all items that are rapidly followed by other items are ignored.

throttle

In our application we can use this when the user is typing in characters. As long as he is rapidly typing no events will let through. But as soon as he stops for a given time span (in this case 2 seconds) an item from the source stream is emitted.

To convert the property that is bound to the view to an observable we can use this extension method as described here. Also we have to make sure that in the view the control’s data binding has an UpdateSourceTrigger that is set to PropertyChanged.

<TextBox Text="{Binding Input, UpdateSourceTrigger=PropertyChanged}"/>

Then we can use this code to create the observable sequence of search text inputs:

var searchTexts = this
    .ToObservable(() => SearchTextBox)
    .Throttle(TimeSpan.FromSeconds(2))
    .Where(x => x.Length > 2);

If we subscribe to this observable by adding all items to a list the output might be something like this:

schreenshot1

Combining the search text with the tweet stream

The function allTweetsAbout for searching the tweets is passed into the constructor of the view model. It takes a search text as an argument and returns an observable stream of tweets which is all we need to know. The internals of this function will not be discussed here because this article should focus on the consumption of observables as opposed to creation.

In order to combine the search text stream with the tweet stream we need to introduce the operators Switch, Sample, Publish / Connect and ObserveOnDispatcher. (The Select operator we have already seen in the previous part.)

Switch

The combining operator Switch can be applied to an observable of nested observables. Switch only emits items from the most recent inner observable.

switch

In our program we combine the observable of search texts with the tweet streams using the Select operator. The result is an observable of observable of tweets. As we are only interested in the most recent search this is a perfect application for Switch.

Sample

The filtering operator Sample emits only the most recent item for every specified periodic interval.

sample

Publish and Connect

Publish allows us to share the actual values of an observable with multiple subscribers.

ObserveOnDispatcher

This extension method makes sure that the subscription is made on the UI thread. If we don’t use this in a WPF application it would cause some sort of threading exception.

Note that in order to be able to properly test the application we would use ObserveOn with an IScheduler instance instead which should be injected into our class from outside. More information on this can be found here.

Composing the application

We have introduced all the necessary Rx operators. But for updating the view model we need two more functions, UpdateViewModelAwesomeTweets and UpdateViewModelBoringTweets whose implementations are trivial. Now we have everything to compose the application as follows:

var searchTexts = this
    .ToObservable(() => SearchTextBox)
    .Throttle(TimeSpan.FromSeconds(2))
    .Where(x => x.Length > 2);

var tweets = searchTexts
    .Select(allTweetsAbout)
    .Switch()
    .Sample(TimeSpan.FromSeconds(1))
    .Select(classifyTweet)
    .Publish();

tweets
    .Where(x => x.Score > 1000)
    .Select(x => x.Tweet)
    .ObserveOnDispatcher()
    .Subscribe(UpdateViewModelAwesomeTweets);

tweets
    .Where(x => x.Score <= 1000)
    .Select(x => x.Tweet)
    .ObserveOnDispatcher()
    .Subscribe(UpdateViewModelBoringTweets);

tweets.Connect();

The complete source code of this application can be downloaded here.

Summary

Easy combination of multiple event streams
Combining and handling multiple event streams is not a simple task. But with the help of Rx this becomes really easy.

Dealing with time
When the relationship with time is involved in the event processing Rx provides a set of powerful operators that deal with time like e.g. Throttle and Sample.

Separation of concerns
In the example application we have separated the concerns pretty well. We use small functions that all have a single responsibility like e.g. searching tweets, classifying tweets or updating the model. The integration of all these functions is expressed comprehensively by the composition with the Rx operators.

Resources


The header image “tweet” by mozzercork is licensed under a Creative Commons Attribution 2.0. The picture was modified to fit this article.

Series Navigation<< Drawing lines with Rx

Published by

Leif Battermann

I am a passionate software developer interested in .NET C#, F#, Haskell, software design and architecture, functional programming, clean code, flow design, code katas, coding dojos and more.