RxJS - Stream of cached API Responses
I’ve been spending a lot of time in the frontend recently, trying to catch up with the year-ish worth of developments I haven’t been truly involved in. I’ve been going through React, Redux, and RxJS in earnest over the last several weeks. Making RxJS do what I wanted has been particularly frustrating, since some situations require a clumsy, workaround-ish setup to fit within the Observable -> Observer paradigm. Maybe it’s just a matter of getting used to the RxJS way? Not sure. Here’s a small example:
In this intro to RxJS, Andre Staltz builds a small user suggestion widget on top of the GitHub API. It’s nifty in that there are no while or if statements, just pure Observer patterns. The use case is to query the API once and store the result: each response contains about 30 users, so when we dismiss a user we’d like to pull the replacement from the cached response rather than make another API call.
One proposed solution is to call combineLatest on two streams, which emits the most recent value from each Observable whenever either one emits (once both have emitted at least once). In effect, this holds on to the latest API response and serves it up again on every click event. However, it does not account for actually consuming the users and querying for more when we run out of new ones. I tried (and believe me, I tried) to look around for a good, straightforward solution in the RxJS way, but did not find a clean answer. controlled() won’t get us what we need, as it just completes if the stream is generated from the response. Actually, in one SO post Andre answers a similar question, suggesting the use of Subjects as a sort of “callback” to trigger a data fetch based on a condition.
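To make the combineLatest behaviour concrete, here is a rough sketch in plain JavaScript. It is not the RxJS implementation: combineLatestSketch, pushResponse, and pushClick are hypothetical names I made up to model the emission rule only (nothing is emitted until both sources have produced a value, and after that every click re-serves the cached response).

```javascript
// Hypothetical stand-in for combineLatest over two sources.
// It models only the emission rule, not the real RxJS operator.
function combineLatestSketch(selector, onValue) {
  var latest = [undefined, undefined];
  var seen = [false, false];

  function push(slot, value) {
    latest[slot] = value;
    seen[slot] = true;
    // Emit only once both sources have produced at least one value.
    if (seen[0] && seen[1]) onValue(selector(latest[0], latest[1]));
  }

  return {
    pushResponse: function (users) { push(0, users); },
    pushClick: function (event) { push(1, event); }
  };
}

var emitted = [];
var combined = combineLatestSketch(
  function (users) { return users; },           // same selector shape as the post's code
  function (users) { emitted.push(users); }
);

combined.pushClick('click 1');       // no emission: no response has arrived yet
combined.pushResponse(['a', 'b']);   // first emission: both sources have a value
combined.pushClick('click 2');       // emits the same cached array again
combined.pushClick('click 3');       // and again
```

Every emission after the first is the same cached array, which is exactly why this alone never consumes users or asks for more.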
Now we’re getting somewhere. If we can somehow signal back to our API stream that we need more data when we run out of users, the cycle is complete! My issue was that I was trying to generate the Observable from the resulting API array, whereas most other solutions keep the array intact and just iterate through it, keeping track of the index.
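The feedback idea can be sketched without RxJS at all. Everything here is an assumption for illustration: createSubjectSketch is a toy stand-in for a Subject, fakeFetch is a synchronous placeholder for the real API call, and the user names are made up. The point is only the shape of the loop: the consumer keeps the array intact, tracks an index, and pushes a value back into the subject when it runs out.

```javascript
// Toy stand-in for a Subject: values pushed with onNext reach every subscriber.
function createSubjectSketch() {
  var observers = [];
  return {
    subscribe: function (fn) { observers.push(fn); },
    onNext: function (value) {
      observers.forEach(function (fn) { fn(value); });
    }
  };
}

// Hypothetical synchronous placeholder for the GitHub API call.
function fakeFetch(offset) {
  return ['user' + offset + 'a', 'user' + offset + 'b'];
}

var proxy = createSubjectSketch();
var requests = [];   // one entry per simulated API request
var cache = [];      // latest response, kept intact
var index = 0;       // position of the next user to show

// The "request stream": any value pushed into proxy triggers a fetch.
proxy.subscribe(function () {
  var offset = requests.length;
  requests.push(offset);
  cache = fakeFetch(offset);
});

proxy.onNext('startup');

function nextUser() {
  // Out of cached users: signal back for more, then start over.
  if (index >= cache.length) {
    proxy.onNext('query');
    index = 0;
  }
  return cache[index++];
}

var shown = [];
for (var k = 0; k < 5; k++) shown.push(nextUser());
```

Five users are served here from three requests (the startup one plus two refills), which is the cycle described above.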
The final break came in my SO post, with a solution that makes the array index the stream! Brilliant. It’s not quite flexible with arrays of variable length, but that was the inspiration I needed. Ultimately, all I needed to do was keep the index supplied by map() at hand and ask for another API call when I was about to go out of bounds in my array. Solution below, with a fiddle for reference.
RxJS is really proving to be tricky, but we might just get along after all.
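// Button whose clicks ask for the next suggested user.
var refreshButton = document.querySelector('#main');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');

// Subject used as a feedback channel: pushing a value here triggers a new API request.
var proxyStream = new Rx.Subject();

// One request at startup, plus one for every value pushed into proxyStream.
var requestStream = Rx.Observable
  .just('startup')
  .merge(proxyStream)
  .map(function() {
    var randomOffset = Math.floor(Math.random() * 5000);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

// Fetch each URL and parse the JSON body into an array of users.
var responseStream = requestStream.flatMap(function(requestUrl) {
  return Rx.Observable.fromPromise(fetch(requestUrl));
}).flatMapLatest(function(response) {
  return Rx.Observable.fromPromise(response.json());
});

// Emit the latest (cached) response on every click, and again whenever a new response arrives.
var userOutputStream = Rx.Observable.combineLatest(
  responseStream,
  refreshClickStream,
  function(users) {
    return users;
  }
);

// map() supplies a running emission index; wrap it around the array and
// request more users when the current one is the last in the cached response.
var controlStream = userOutputStream.map(function(users, i) {
  var index = i % users.length;
  if (!users[index + 1]) proxyStream.onNext('query');
  return users[index];
});

controlStream.subscribe(function(user) {
  console.log(user);
});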
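For what it’s worth, the index handling inside controlStream can be pulled out and checked on its own. This is only a sketch: pickUser and needsMore are names I’m inventing here, and the bounds check uses the array length instead of the truthiness test in the code above (the two agree as long as every entry in the array is truthy).

```javascript
// Hypothetical helper isolating the index logic from controlStream.
// emissionCount plays the role of the index that map() passes to its callback.
function pickUser(users, emissionCount) {
  var index = emissionCount % users.length;
  return {
    user: users[index],
    // True when this is the last user in the cached array,
    // i.e. the moment to ask the API for another batch.
    needsMore: index + 1 >= users.length
  };
}

var users = ['a', 'b', 'c'];
var first = pickUser(users, 0);    // user 'a', needsMore false
var last = pickUser(users, 2);     // user 'c', needsMore true
var wrapped = pickUser(users, 4);  // 4 % 3 === 1, so user 'b', needsMore false
```

Keeping this as a pure function would also let the onNext call move out of map() and into the subscriber, if the side effect inside map() feels uncomfortable.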