Quite often when working with Rx, you'll end up with an observable of observables: IObservable<IObservable<int>>
for example. When you were learning Rx, you were probably told to use SelectMany
to flatten the observable:
obs1
.SelectMany(_ => Observable.FromAsync(() => DoWebCall()))
...
This flattens the IObservable<IObservable<int>>
to IObservable<int>
. "Case closed" you might think, but here's something to consider: SelectMany
will merge all the observables returned inside the operator. Let's look a bit into that.
Merge
To better understand SelectMany
we'll start by having a look at Merge
.
I think the most common uses of Merge
is either as a static method on Observable
or as an operator taking an argument:
// As a static method
Observable.Merge(
obs1, obs2
)
...
// As an operator taking an argument
obs1
.Merge(obs2)
...
Merge
can also be used as an operator taking no arguments:
obs1
.Merge()
...
This Merge
only works on IObservable<IObservable<T>>
, or an observable of observables. Observables of observables are a bit hard to grasp, so here's one way to think about it:
An IObservable<T>
is something that will happen some time in the future. That future might be 1ms away or 10 minutes, doesn't matter when - you'll get a signal when it happens. The signal will contain 1 value of type T
. An IObservable<IObservable<T>>
is something that will signal when something is scheduled to happen in the future. When that scheduled thing happens, you'll get a signal (and that signal will contain 1 value of type T
).
This is can very quickly become very complex, which is why merge these observable. Instead of being told that a new observable has been added, we automatically subscribe to the new observable and get notified about the result in our existing observable.
This is what Merge
does. You give it an IObservable<IObservable<T>>
and you'll get back an IObservable<T>
. Whenever there's a new inner observable, Merge
will automatically subscribe to it and pass Next and Error signals to the outer observable.
To use Merge
in this manner, we can rewrite the first example like this:
// First example
obs1
.SelectMany(_ => Observable.FromAsync(() => DoWebCall()))
...
// Rewritten
obs1
.Select(_ => Observable.FromAsync(() => DoWebCall()))
.Merge()
...
SelectMany
You've just seen how SelectMany
basically works:
- Transform a signal to an observable (
Select
) - Merge that signal into the original/outer observable (
Merge
)
Imagine however that the observable that is returned inside the SelectMany
emits multiple signals. In some cases this is what you want, but when doing web requests you're most likely only interested in the most recent request sent. SelectMany
will pass on any signal that is returned, regardless whether they are in order or not.
This is perhaps best explained visually. This shows an Observable.Interval
emitting a signal every 2 seconds (these signals are again transformed to strings to more easily distinguish them). That signal is transformed to a new Observable.Interval
which will emit signals half a second apart.
Notice how the signals from the different Observable.Interval
s mix with each other. That might not be what we want.
Switch
Enter Switch
. Switch
also works on IObservable<IObservable<T>>
, but in contrast to Merge
, it switches to the most recent IObservable<T>
inside the IObservable<IObservable<T>>
.
When a new IObservable<T>
is emitted by IObservable<IObservable<T>>
, Switch
will unsubscribe to previous IObservable<T>
and subscribe to the new one. That way you won't get any signals from the previous observable if it should happen to send a signal after you've received the new observable.
This example uses a Select().Switch()
combo. Notice how we only receive signals from the most recent Observable.Interval
.
Concat
The last operator I'm going to cover is Concat
. Concat
also only allows one inner observable produce values, but it will wait for the previous to finish before it subscribes to the new one. When a complete signal is sent by the previous observable, Concat
will subscribe to the next and let that finish before subscribing to the next one.
Let's look at a gif:
Each observable emits 4 signals, and when that's done Concat
let's the next observable emit it's 4 values.
The tool I used for these examples is Linqpad. It's an excellent tool if you're doing any kind .Net development.
Happy coding!
Top comments (0)