Software programs (especially those with a user interface) typically need to react to events: the user clicks on something, types into an input, or the response to an earlier request arrives.
Handling all these events explicitly (via event handlers, change observers, etc.) can be daunting and lead to complicated code:
// this is schematic code; the update handler is also simplified,
// as in reality it would be more complex.
render(
  <>
    <input type='text' placeholder='type something'
      onInput={e => inputChanged(e.target.value)}/>
    <div ref={resultContainer}/>
  </>
)

on(inputChanged, async input => {
  const response = await fetchData(input)
  const alreadyRendered = []

  //
  // look at rendered children, remove those that
  // are no longer inside our result set.
  // (children is an HTMLCollection, which has no forEach(),
  // so we copy it into an array first)
  //
  Array.from(resultContainer.children).forEach(child => {
    const entry = child.getAttribute('key')
    if (response.entries.includes(entry))
      alreadyRendered.push(entry)
    else
      child.remove()
  })

  //
  // look at the result set, render those that are not
  // already rendered.
  //
  response.entries.forEach(entry => {
    if (!alreadyRendered.includes(entry)) {
      render(
        <div key={entry}>{entry}</div>
      , resultContainer)
    }
  })
})
Fortunately, tools like React help with this problem. React detects, propagates and handles changes on its own behind the scenes, effectively hiding them from us, and lets us describe our desired UI based on a given snapshot of the program state:
import { useState, useEffect } from 'react'

function MyApp() {
  const [input, setInput] = useState('')
  const [entries, setEntries] = useState([])

  //
  // this is sort of an explicit event handler,
  // because in response to input event we need to
  // do something not directly related to UI.
  //
  // note: an effect callback must not be async (React expects it
  // to return nothing or a cleanup function), hence the .then().
  //
  useEffect(() => {
    fetchData(input).then(response => setEntries(response.entries))
  }, [input])

  return (
    <>
      <input type='text' placeholder='type something'
        onInput={e => setInput(e.target.value)}/>
      <div>
        {
          entries.map(entry =>
            <div key={entry}>{entry}</div>
          )
        }
      </div>
    </>
  )
}
Unfortunately, looking only at snapshots of program state is not always enough. For example, in the code above, the displayed entries might not match the user's input. This is because fetchData() takes a varying amount of time to respond to different inputs (randomly between 0 and 5 seconds in this case), so the result for some input A might arrive after the result for some other input B, despite A being entered earlier than B. Our component, as described above, always displays the last result it receives from fetchData(), regardless of the order of the corresponding inputs.
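The race can be reproduced without React or real network calls. The following is only a minimal sketch: createFakeServer() is a hypothetical stand-in for fetchData() (it is not part of the original example) that parks each request until we answer it by hand, so we can force the response for the later input to arrive first.

```javascript
// Hypothetical stand-in for fetchData(): each request is parked until
// we answer it by hand, which lets us control the order of responses.
function createFakeServer() {
  const pending = new Map()
  return {
    fetchData(input, onResponse) {
      pending.set(input, onResponse)
    },
    respond(input) {
      const onResponse = pending.get(input)
      pending.delete(input)
      onResponse({ entries: [`result for "${input}"`] })
    },
  }
}

// Mimics the naive component: every response overwrites `entries`.
function simulateNaiveComponent() {
  const server = createFakeServer()
  let entries = []
  const onInput = input =>
    server.fetchData(input, response => { entries = response.entries })

  onInput('re')           // input A, typed first
  onInput('react')        // input B, typed second
  server.respond('react') // B's response happens to arrive first
  server.respond('re')    // A's stale response arrives last and wins
  return entries
}

console.log(simulateNaiveComponent()) // the stale result for "re"
```

Although "react" was the last thing typed, the displayed entries belong to "re", which is exactly the mismatch described above.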
A pure React-based solution to this problem would look like this:
import { useState, useEffect, useRef } from 'react'

function App() {
  const [input, setInput] = useState('')
  const [entries, setEntries] = useState([])

  // here we maintain a reference to last request
  const lastPromise = useRef()

  useEffect(() => {
    // effect callbacks must not be async, so we wrap the
    // async work in a function and call it right away.
    const load = async () => {
      // keep a reference to our request in `promise`
      // and set it to last request as well
      const promise = lastPromise.current = fetchData(input)

      // wait for the response
      const response = await promise

      // if our request is still the last request,
      // then update the `entries`.
      if (promise === lastPromise.current)
        setEntries(response.entries)
    }

    load()
  }, [input])

  return (
    <div>
      <input type='text' placeholder='type something'
        onInput={e => setInput(e.target.value)}/>
      <div>
        {
          entries?.map(entry =>
            <div key={entry}>{entry}</div>
          )
        }
      </div>
    </div>
  )
}
For properly ordering responses, we would need a mapping between inputs and their corresponding requests. A single snapshot would not suffice for our case, as we would need to look at the stream of incoming inputs and what happens to them subsequently.
Unfortunately though, we lose that information in React's snapshot-only style. As a result, we need to hack our way around it by partially maintaining that information (the input stream and its mapping to requests) ourselves: we hold a reference (useRef()) to the last request and keep it up to date.
I call it a hacky solution because it is pretty low-level (most of the code is about implementation details rather than intent) and not particularly composable or reusable (imagine a similar situation where we receive our entries in pieces over a web-socket).
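To make the bookkeeping visible, here is the same "only the latest request counts" guard stripped of React. It is a sketch under assumptions: latestOnly() is a hypothetical helper, and the callback-based fetchData() and respond() are stand-ins that let us answer requests by hand in a chosen order.

```javascript
// Hypothetical helper: wraps a callback-based request function so that
// only the response to the most recent request reaches `onResponse`.
function latestOnly(request, onResponse) {
  let lastToken = null
  return input => {
    const token = lastToken = {} // unique marker for this request
    request(input, response => {
      // drop responses that belong to an outdated request
      if (token === lastToken) onResponse(response)
    })
  }
}

function simulateGuardedComponent() {
  // stand-in for fetchData(): requests wait until respond() is called
  const pending = new Map()
  const fetchData = (input, onResponse) => pending.set(input, onResponse)
  const respond = input =>
    pending.get(input)({ entries: [`result for "${input}"`] })

  let entries = []
  const onInput = latestOnly(fetchData, response => {
    entries = response.entries
  })

  onInput('re')
  onInput('react')
  respond('react') // latest request: accepted
  respond('re')    // stale request: ignored
  return entries
}

console.log(simulateGuardedComponent()) // the result for "react"
```

The guard works, but note how much of it is token juggling, and that it would need to be rewritten if a request could deliver more than one response.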
Reactive Streams
OK, so sometimes we actually need to look at the stream instead of snapshots of it. But what would such a stream look like?
Well, if you are familiar with RxJS, Observable is another name for a stream. If not, you can think of a stream as a source of data that will emit one or more values at unspecified times in the future (it might be synchronous, it might be one value every 10 seconds, it might be whenever there is a user-click, etc.).
For example, clicks on a button can be modeled with a stream of click events. The values of our input in previous example can be thought of as a stream of strings. A request can even be thought of as a stream, one which emits only a single value (though at a non-specified time in the future). Even a single value can be modeled by a stream that emits that value in a synchronous manner.
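These cases can be sketched with a deliberately tiny model. This is a toy for illustration only, not the callbag or RxJS API: a stream is just a function that takes a listener and calls it for every emitted value. The names just(), fromRequest() and createSubject() are made up for this sketch.

```javascript
// Toy model (not the callbag or RxJS API): a stream is a function that
// takes a listener and calls it once for every value it emits.

// A single value, emitted synchronously.
const just = value => listener => { listener(value) }

// A request: emits exactly one value, at some later time.
const fromRequest = promise => listener => { promise.then(listener) }

// Events: emits whenever `emit()` is called (e.g. from a click handler).
function createSubject() {
  const listeners = new Set()
  const stream = listener => { listeners.add(listener) }
  stream.emit = value => listeners.forEach(listener => listener(value))
  return stream
}

const received = []

// the single-value stream emits right away
just(42)(value => received.push(value))

// the event stream emits each time something happens
const clicks = createSubject()
clicks(event => received.push(event))
clicks.emit('click #1')
clicks.emit('click #2')

console.log(received) // [ 42, 'click #1', 'click #2' ]
```

Real stream libraries add completion, errors and unsubscription on top of this idea, but the core remains "values pushed to a listener over time".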
Now we can remodel our problem using streams: input is a stream of strings, and we need to map each emitted value to a corresponding request:
pipe(
input,
map(i => fetchData(i))
)
What this gets us is a stream of requests, but we want a stream of responses. Since each request is a stream as well, we basically have a stream of streams, which we need to flatten down to a stream of values, i.e.:
pipe(
input,
map(i => fetchData(i)),
flatten
)
This is of course callbag lingo (callbags are a light-weight model for streams, pretty similar to RxJS). Using RxJS, our code would look more like this:
input.pipe(
switchMap(i => fetchData(i)),
)
where the switchMap() operator is basically a combination of map() and flatten().
How does this flattening work? Well, there are many strategies for turning a stream of streams into a stream of values. For example, you could merge all values emitted by all of the sub-streams (this is what RxJS's mergeMap() does). More often than not, you would want to simply pick (or switch to) the last emitted stream and take values from it, which is what flatten() (or RxJS's switchMap()) does.
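The difference between the two strategies can be sketched with toy streams. The code below is an illustration under assumptions, not the callbag or RxJS implementation: a stream is a function that takes a listener and returns an unsubscribe function, and mergeAll() and switchAll() are simplified, hand-written versions of the two flattening strategies.

```javascript
// Toy streams (not the callbag or RxJS API): a stream is a function that
// takes a listener and returns a function that unsubscribes it again.
function createSubject() {
  const listeners = new Set()
  const stream = listener => {
    listeners.add(listener)
    return () => listeners.delete(listener)
  }
  stream.emit = value => [...listeners].forEach(listener => listener(value))
  return stream
}

const map = fn => source => listener => source(value => listener(fn(value)))

// Merge strategy: stay subscribed to every inner stream.
const mergeAll = source => listener => {
  const unsubscribers = []
  unsubscribers.push(source(inner => unsubscribers.push(inner(listener))))
  return () => unsubscribers.forEach(unsubscribe => unsubscribe())
}

// Switch strategy: drop the previous inner stream when a new one arrives.
const switchAll = source => listener => {
  let unsubscribeInner = () => {}
  const unsubscribeOuter = source(inner => {
    unsubscribeInner()
    unsubscribeInner = inner(listener)
  })
  return () => { unsubscribeInner(); unsubscribeOuter() }
}

function demo(flattenWith) {
  const input = createSubject()
  const requests = { re: createSubject(), react: createSubject() }
  const seen = []
  flattenWith(map(i => requests[i])(input))(value => seen.push(value))

  input.emit('re')
  input.emit('react')
  requests.react.emit('response for "react"')
  requests.re.emit('response for "re"') // stale response
  return seen
}

console.log(demo(mergeAll))  // both responses, the stale one last
console.log(demo(switchAll)) // only the response for "react"
```

With the switch strategy, the stale response never reaches the listener, because its stream was unsubscribed the moment a newer input arrived.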
All of this means that if we could look at input as a stream instead of a snapshot, our request-ordering problem would be solved pretty intuitively:
- map each input to a request
- flatten the stream
Reactive Streams in React
As we saw in the first example, manually handling changes can be a daunting task and lead to complex code. React masks these changes and allows us to look only at snapshots of our state. That comes with a trade-off, however: when we need to look at the stream again (which happens often), our code quickly turns hacky.
What if we could combine the two models? What if we could look at state in snapshots, but also whenever necessary look at the underlying stream and then conveniently turn it back into state snapshots (i.e. normal JavaScript variables)?
It turns out that, with a set of pretty simple hooks, we actually can achieve that. The resulting code would then look like this:
function App() {
  const [input, setInput] = useState('')
  const [entries] = useStream(               // 👉 build a stream
    input,                                   // .. from input
    map(i => fromPromise(fetchData(i))),     // 👉 map each to a request
    flatten,                                 // 👉 flatten
    map(r => r.entries)                      // 👉 also map responses to `.entries` key
  )

  return (
    <div>
      <input type='text' placeholder='type something'
        onInput={e => setInput(e.target.value)}/>
      <div>
        {
          entries?.map(entry =>
            <div key={entry}>{entry}</div>
          )
        }
      </div>
    </div>
  )
}
For this example, I have used react-callbag-streams, which uses callbags to represent reactive streams (callbags are much like RxJS, but more lightweight and also pretty decentralized).
npm i react-callbag-streams
import { useStream } from 'react-callbag-streams'
// ...
This hybrid approach brings us the best of both worlds: we get the convenience of only dealing with snapshots of the program state (as plain variables), while being able to peek into the underlying stream and run some stream operations on the state (like mapping to request streams and flattening), and then treating the result of those streams as snapshot states again.
This also provides a much higher degree of composability and reusability. For example, imagine that after we request the entries for some input, our server (or any other source of data) also pushes updated entries whenever they change. In that case, fetchData() would need to return an actual stream of responses (i.e. a callbag). But our code would remain basically the same:
const [entries] = useStream(
input,
map(i => fetchData(i)),
flatten,
)
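Why the pipeline does not need to change can be sketched with toy streams again. This is an illustration under assumptions, not the callbag implementation: streams are functions that take a listener and return an unsubscribe function, pipe(), map() and switchAll() are simplified hand-written helpers, and the feeds object is a hypothetical stand-in for a fetchData() that keeps pushing updates.

```javascript
// Toy helpers (not the callbag API), just enough to run the pipeline.
function createSubject() {
  const listeners = new Set()
  const stream = listener => {
    listeners.add(listener)
    return () => listeners.delete(listener)
  }
  stream.emit = value => [...listeners].forEach(listener => listener(value))
  return stream
}

const pipe = (source, ...operators) =>
  operators.reduce((stream, operator) => operator(stream), source)

const map = fn => source => listener => source(value => listener(fn(value)))

// Switch to the latest inner stream, dropping the previous one.
const switchAll = source => listener => {
  let unsubscribeInner = () => {}
  const unsubscribeOuter = source(inner => {
    unsubscribeInner()
    unsubscribeInner = inner(listener)
  })
  return () => { unsubscribeInner(); unsubscribeOuter() }
}

function demoPushedUpdates() {
  const input = createSubject()
  // stand-in for fetchData(): one long-lived stream of entries per input
  const feeds = { cats: createSubject(), dogs: createSubject() }
  const snapshots = []

  pipe(
    input,
    map(i => feeds[i]),
    switchAll
  )(entries => snapshots.push(entries))

  input.emit('cats')
  feeds.cats.emit(['tabby'])
  feeds.cats.emit(['tabby', 'siamese']) // server pushes an update
  input.emit('dogs')
  feeds.cats.emit(['persian'])          // update for an outdated input
  feeds.dogs.emit(['beagle'])
  return snapshots
}

console.log(demoPushedUpdates())
```

Every update for the current input becomes a new snapshot, while updates for an outdated input are dropped, all without touching the map-then-flatten shape of the pipeline.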
Other Streaming Operations
Another exceedingly common stream operator is debounce(). It is used for handling back-pressure: you might not want to send a request to the server for every single key the user presses, but rather one request when the user finishes typing. To determine that, you can look at each emission of the input stream, and if a certain amount of time passes without any new emissions, you can assume the user has finished typing and the latest emission is the input you should send to your API (we call this debouncing the stream).
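The timing logic behind debouncing can be sketched as a plain function. This is a minimal illustration, not the debounce() operator of any particular library: the optional timers parameter and createManualTimers() are assumptions made for this sketch, so that the example can run deterministically without waiting for real time to pass.

```javascript
// Minimal debounce sketch: `fn` only runs once `wait` ms pass without a
// new call. The `timers` parameter is an assumption of this sketch; it
// defaults to real timers and can be swapped for manual ones in a demo.
function debounce(fn, wait, timers = {
  set: (callback, ms) => setTimeout(callback, ms),
  clear: handle => clearTimeout(handle),
}) {
  let handle
  return (...args) => {
    timers.clear(handle)                         // cancel the pending call
    handle = timers.set(() => fn(...args), wait) // schedule a fresh one
  }
}

// Hypothetical manual timers: nothing fires until flush() is called,
// which plays the role of "enough time has passed".
function createManualTimers() {
  let nextId = 1
  const scheduled = new Map()
  return {
    set(callback) {
      const id = nextId++
      scheduled.set(id, callback)
      return id
    },
    clear(id) { scheduled.delete(id) },
    flush() {
      const callbacks = [...scheduled.values()]
      scheduled.clear()
      callbacks.forEach(callback => callback())
    },
  }
}

const timers = createManualTimers()
const requested = []
const onInput = debounce(value => requested.push(value), 200, timers)

onInput('r')   // the user is still typing ...
onInput('re')
onInput('rea')
timers.flush() // ... and then pauses long enough

console.log(requested) // [ 'rea' ]
```

Three keystrokes result in a single request for the latest value, which is the behaviour described above.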
With our hybrid model, adding a debounce (using the standard debounce() operator) can be as simple as this:
const [entries] = useStream(
  input,
  debounce(200),                         // 👉 debounce the stream by 200ms
  map(i => fromPromise(fetchData(i))),
  flatten,
  map(r => r.entries)
)
Note that debouncing can also be implemented with a dedicated debounce hook or by combining lodash's debounce with useCallback(), but all these solutions would again be hacky:
// using a hook would require an intermediary variable
const [input, setInput] = useState('')
const debouncedInput = useDebounce(input, 200)

// using useCallback would require wrapping the setter callback
const [input, setInput] = useState('')
const debouncedSetInput = useCallback(
  debounce(setInput, 200),
  []
)
The inherent simplicity in our streaming approach also opens the door for further composability: for example you can easily debounce multiple values together:
//
// this app finds repositories on github
// based on a query and language
//
function App() {
  const [q, setQ] = useState('')
  const [l, setL] = useState('javascript')

  const [repos, loading] = useCombinedStream(
    [q, l],                                       // 👉 a combined stream of query and language
    filter(([q]) => q.length >= 2),               // 👉 filter out when query is too short
    debounce(1000),                               // 👉 debounce the combo by a second (github API will block us otherwise)
    map(([q, l]) => fromPromise(search(q, l))),   // 👉 search in github api using query and language
    flatten,                                      // 👉 flatten the stream (preserve order of responses)
    map(res =>                                    // 👉 format the incoming result ...
      res.items.map(item =>                       // .. take each repository ...
        ({ name: item.name, url: item.html_url }) // .. get its name and its url
      )
    ),
  )

  return <>
    <input type='text'
      placeholder='keywords ....'
      value={q}
      onInput={e => setQ((e.target as any).value)}/>
    <input type='text'
      placeholder='language'
      value={l}
      onInput={e => setL((e.target as any).value)}/>
    <br/>
    { q.length >= 2 ?
      (
        loading ? 'loading ...' : (
          <ul>
            { repos?.map(repo => (
              <li key={repo.url}>
                <a href={repo.url}>{repo.name}</a>
              </li>
            ))}
          </ul>
        )
      ) : ''
    }
  </>
}
Conclusion
As we saw, React's model of only dealing with snapshots of program state is extremely convenient, since explicitly handling changes and updates can result in pretty complicated and hard-to-read code.
However, in many extremely common scenarios (sending requests to an API, debouncing), we need to look at the data stream instead of data snapshots (e.g. we care about the order of data emissions or their timing). Trying to handle these scenarios in a pure React way results in hacky, not-so-composable code.
Fortunately though, we can combine the convenience of React's approach with the power of reactive stream programming using hook libraries such as react-callbag-streams. This way, we get to handle pretty complicated reactive scenarios in an elegant, composable and reusable manner, without sacrificing the convenience provided by React.