Here is my method:

    public void openWorkshift(WorkshiftSettings workshiftSettings, Subscriber<WorkshiftSettings> subscriber) {
        api.openWorkshift(workshiftSettings)
            .compose(RxOperatorsHelpers.additionalStacktrace())
            .doOnSubscribe(() -> actionsSystem.registerAction(...).await())     // <-
            .doOnUnsubscribe(() -> actionsSystem.unregisterAction(...).await()) // <-
            .subscribeOn(ioScheduler)
            .observeOn(uiScheduler)
            .doOnError(this::handleError)
            .subscribe(subscriber);
    }

where ActionsSystem.registerAction(...) and ActionsSystem.unregisterAction(...) look like this:

    public Completable registerAction(OperatorAction action) {
        return Completable.fromAction(() -> actions.add(action));
    }

    public Completable unregisterAction(OperatorAction action) {
        return Completable.fromAction(() -> actions.remove(action));
    }

As you can see, I call .await() to execute a Completable inside the flow of the source Observable. It feels like the wrong solution. How can I do this more elegantly?
Best answer
Since your Completables perform trivial actions, you could simply inline their code into doOnSubscribe and doOnUnsubscribe:

    .doOnSubscribe(() -> actions.add(action))
    .doOnUnsubscribe(() -> actions.remove(action))

You can also avoid doOnSubscribe by starting with the Completable and continuing with the rest of the Observable sequence via andThen:

    actionsSystem.registerAction(...)
        .andThen(api.openWorkshift(workshiftSettings)
            .compose(RxOperatorsHelpers.additionalStacktrace())
            .doOnUnsubscribe(() -> actionsSystem.unregisterAction(...).await())
            .subscribeOn(ioScheduler)
            .observeOn(uiScheduler)
            .doOnError(this::handleError)
        )
        .subscribe(...)

Currently, there is no way to execute a Completable when the downstream unsubscribes, and no easy way to execute one when the sequence terminates, whether normally or with an exception.
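The cleanup requirement in that last sentence (unregister whether the work finishes normally or fails) is what try/finally gives in ordinary blocking code. The following is only a plain-Java model of the register/unregister pairing, not RxJava code and not the asker's ActionsSystem. The names ActionRegistry, runWith and isRegistered, and the use of a String as the action key, are assumptions made for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for the question's ActionsSystem.
final class ActionRegistry {
    // Concurrent set, on the assumption that registration and
    // unregistration may happen on different threads.
    private final Set<String> actions = ConcurrentHashMap.newKeySet();

    // Registers the action, runs the work, and always unregisters afterwards,
    // whether the work returns a value or throws.
    <T> T runWith(String action, Supplier<T> work) {
        actions.add(action);
        try {
            return work.get();
        } finally {
            actions.remove(action);
        }
    }

    boolean isRegistered(String action) {
        return actions.contains(action);
    }
}
```

Here the add and remove calls are inlined directly, in the same spirit as the first suggestion above: wrapping a one-line collection update in a separate asynchronous type adds nothing when it is immediately awaited.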