javascript - Paginated data cursor in RXJS and confusion about subject.onCompleted and errors
I'm working with RxJS and came up with an implementation of a paginated data cursor. Having not spent much time with reactive functional programming, I'm wondering whether my implementation is in the spirit of how the library is intended to be used.
I want a class that can load pages from an endpoint. If you subscribe to it, you receive the last page queried. The first subscription results in the first page being queried automatically. A call to "getPage" should trigger onNext on all subscriptions. Multiple subscriptions should not cause multiple requests.
I wrote a basic example that satisfies this, heavily commented with my thought process: https://jsfiddle.net/gfmn708g/1/
My questions are:
- Is this in the spirit of RxJS? Using both ReplaySubject and shareReplay feels wrong to me, but I found no other way to get the behavior I want. I have also read that using Subjects is "bad" and against the principles of the paradigm.
- Will line 63 unsubscribe/finish all of the items$ subscriptions (lines 82 and 89) after any in-flight requests have completed and been processed?
- What is the proper way to handle errors, so that errors are propagated to subscribers but don't murder the stream and prevent me from pushing more requests?
(Here's a listing of the code, per SO's question guidelines.)
const logDiv = $("#log");

function log(message, cls) {
  logDiv.append($("<li>").text(message).addClass(cls));
}

/*
interface IRequest {
  url: string;
  page: number;
  refresh?: boolean
}

interface IEndpoint {
  get(request: IRequest): [];
}
*/

// Class that represents a cursor over paginated data
function PagedData(endpoint, url) {
  this._endpoint = endpoint;
  this._url = url;

  // Our request queue is an observable of structures of type IRequest.
  // Use a ReplaySubject so the last URL requested is in the stream
  // when the first subscriber subscribes.
  this._requestQueue = new Rx.ReplaySubject(1);

  // Our data observable; subscribe to it to
  // a) receive the last page the cursor has produced
  // b) receive any future pages
  this.items$ = this._requestQueue
    // Don't re-query unless the "refresh" boolean is true
    .distinctUntilChanged(req => req, (left, right) => right.refresh ? false : left.page == right.page)
    // Make the request...
    .flatMapLatest(request => Rx.Observable.of(request).zip(this._endpoint.get(request)))
    // Wrap the data returned in an envelope with data such as the page requested
    .map(data => {
      const request = data[0];
      const response = data[1];
      return { page: request.page, url: request.url, items: response };
    })
    // Replay the last page worth of data on each subscription
    .shareReplay(1);

  // Queue the first page to be retrieved on first subscription
  this.getPage(1);
}

PagedData.prototype.getPage = function(page, refresh) {
  refresh = refresh || false;
  // Fire off the workflow
  this._requestQueue.onNext({ url: this._url, refresh: refresh, page: page });
}

PagedData.prototype.dispose = function() {
  // QUESTION: this should unsubscribe all of the subscriptions to this.items$, right?
  this._requestQueue.onCompleted();
}

// -----------------
// Example usage

var dummyEndpoint = {
  get(request) {
    log(`get: ${request.url} @ page ${request.page}`, "service");
    return Rx.Observable.range(request.page * 10, 10)
      .delay(1000)
      .map(i => ({id: i, title: `track ${i}`}))
      .toArray();
  }
};

const tracks = new PagedData(dummyEndpoint, "/api/tracks");

// Results in getting the first page
tracks.items$.subscribe(data => {
  log(`on page ${data.page}, ${data.items.map(i => i.title).join(",")}`, "first")
});

// Wait 1 second after getting the first page
window.setTimeout(() => {
  // Subscribe again; should receive the first page with no re-query
  tracks.items$.subscribe(data => log(`got page ${data.page} after delay`, "second"));

  // Get the second page
  tracks.getPage(2);

  // Wait a second after getting the second page
  window.setTimeout(() => {
    log("getting second page (without refresh)");
    // Shouldn't result in anything, since "refresh" is false/undefined
    tracks.getPage(2);

    // Wait 1 more second...
    window.setTimeout(() => {
      log("getting second page (with refresh)");
      // Should result in getting the second page, since refresh is true
      tracks.getPage(2, true);

      // Should get rid of all subscriptions after the last in-flight request?
      tracks.dispose();
    }, 1000);
  }, 2000);
}, 2000);
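It isn't that Subjects are bad; it's that they tend to be a crutch for new users, so that they don't have to fully use the paradigm (an Observable and an Observer for the price of one, how can you afford not to use it?).
In all seriousness though, I think your gut is on the correct path: the use of ReplaySubject + shareReplay is a code smell. You might try to think about where your data is actually coming from. In most cases functions don't exist by themselves; they are triggered by something else.
You need to find that something else and follow it back until you find the root source. In most cases that source will be a user or network event, which you can wrap using fromEvent or fromPromise. Once you have the starting point, it is just a matter of connecting the source to what you want it to do.
So I would refactor the business logic of calling the endpoint into an Observable extension: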
Rx.Observable.prototype.paginate = function(endpoint, url) {
  return this
    .startWith({ page: 1, refresh: false })
    .map(req => ({ page: req.page, url: url, refresh: req.refresh }))
    .distinctUntilChanged(req => req, (left, right) => right.refresh ? false : left.page == right.page)
    .flatMapLatest(request => endpoint.get(request), (request, response) => ({ page: request.page, url: request.url, items: response }))
    .shareReplay(1)
}
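The above will wait for the first subscription and will automatically make the first request when that subscription occurs. After that, each subsequent subscriber will receive the latest value from the pagination.
From there it depends on what your source is, but I imagine it would look something like: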
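To make the comparer passed to distinctUntilChanged more concrete, here is a dependency-free sketch that applies the same comparison to a plain array of requests. The names isSameRequest and dropRepeats are hypothetical and are not part of RxJS; the helper only imitates the operator's "compare against the last emitted item" behavior and is not the library's implementation. The request list mirrors the sample trigger used later in this answer.

```javascript
// Same comparison as the comparer passed to distinctUntilChanged:
// a request is a repeat only when it targets the same page as the
// previously emitted request and does not ask for a refresh.
function isSameRequest(left, right) {
  return right.refresh ? false : left.page === right.page;
}

// Hypothetical helper (not part of RxJS): walks an array and keeps an
// item only when the comparer says it differs from the last kept item.
function dropRepeats(requests, comparer) {
  const kept = [];
  for (const request of requests) {
    const last = kept[kept.length - 1];
    if (kept.length === 0 || !comparer(last, request)) {
      kept.push(request);
    }
  }
  return kept;
}

// The initial page 1, then page 2 three times, the last one with refresh set.
const requests = [
  { page: 1, refresh: false },
  { page: 2 },
  { page: 2 },
  { page: 2, refresh: true }
];

const kept = dropRepeats(requests, isSameRequest);
// Three requests survive: page 1, page 2, and page 2 with refresh.
console.log(kept.map(r => `${r.page}${r.refresh ? " (refresh)" : ""}`));
```

Only the second plain request for page 2 is dropped, which is why the endpoint should be hit three times rather than four.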
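var trigger = Rx.Observable.fromEvent($nextPageButton, 'click')
  .scan((current, _) => current + 1, 1)
  // paginate reads req.page, so wrap the page counter in a request object
  .map(page => ({ page: page }))
  .paginate(endpoint, url);

trigger.subscribe(/*handle result*/);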
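The seed of 1 in the scan call can look odd, since no click ever produces page 1. A small array-based sketch shows the intent: page 1 comes from startWith inside paginate, and each click advances the counter from there. The scanArray helper below is hypothetical and is not part of RxJS; it assumes, as the trigger above does, that scan emits only accumulated values and not the seed itself.

```javascript
// Hypothetical array analogue of scan (not part of RxJS): returns every
// intermediate accumulator value rather than only the final one.
function scanArray(items, accumulator, seed) {
  const results = [];
  let current = seed;
  for (const item of items) {
    current = accumulator(current, item);
    results.push(current);
  }
  return results;
}

// Three clicks on the "next page" button, represented as placeholder strings.
const clicks = ["click", "click", "click"];

// Same accumulator and seed as in the trigger above.
const pages = scanArray(clicks, (current, _) => current + 1, 1);
console.log(pages); // logs [ 2, 3, 4 ]
```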
In your case you wouldn't unsubscribe until the page needed to unload; instead you would hook up the pipeline on load and it would take care of the rest. Subscribing to trigger will get you the latest data.
I added a working sample using a refactoring of your existing code.
const logDiv = $("#log");

function log(message, cls) {
  logDiv.append($("<li>").text(message).addClass(cls));
}

/*
interface IRequest {
  url: string;
  page: number;
  refresh?: boolean
}

interface IEndpoint {
  get(request: IRequest): [];
}
*/

Rx.Observable.prototype.paginate = function(endpoint, url) {
  return this
    .startWith({ page: 1, refresh: false })
    .map(req => ({ page: req.page, url: url, refresh: req.refresh }))
    .distinctUntilChanged(req => req, (left, right) => right.refresh ? false : left.page == right.page)
    .flatMapLatest(request => endpoint.get(request), (request, response) => ({ page: request.page, url: request.url, items: response }))
    .shareReplay(1)
}

// -----------------
// Example usage

var dummyEndpoint = {
  get(request) {
    log(`get: ${request.url} @ page ${request.page} with${request.refresh ? "" : "out"} refresh`, "service");
    return Rx.Observable.range(request.page * 10, 10)
      .delay(1000)
      .map(i => ({ id: i, title: `track ${i}` }))
      .toArray();
  }
};

// Simulated source: page 2 twice without refresh, then once with refresh
var trigger = Rx.Observable.concat(
  Rx.Observable.just({ page: 2 }).delay(2000),
  Rx.Observable.just({ page: 2 }).delay(2000),
  Rx.Observable.just({ page: 2, refresh: true }).delay(1000)
);

const tracks = trigger.paginate(dummyEndpoint, "/api/tracks");

// Late subscriber: receives the latest page without triggering a new request
tracks.delaySubscription(2000).subscribe(data => log(`got page ${data.page} after delay`, "second"));

// Results in getting the first page
tracks.subscribe(data => {
  log(`on page ${data.page}, ${data.items.map(i => i.title).join(",")}`, "first")
});
#log li.first {
  color: green;
}

#log li.second {
  color: blue;
}

<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.7/rx.all.js"></script>
<ol id="log">
</ol>