根据流中的值暂停RxJS流

编程入门 行业动态 更新时间:2024-10-25 02:31:28
本文介绍了根据流中的值暂停RxJS流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我有一个带有一个按钮的简单组件,该按钮可以启动和暂停RxJS计时器生成的数字流.

import { Component, OnInit } from '@angular/core'; import { BehaviorSubject, Observable, timer, merge } from 'rxjs'; import { filter, bufferToggle, windowToggle, mergeMap, mergeAll, share } from 'rxjs/operators'; @Component({ selector: 'my-app', template: `<button (click)="toggle()">{{ (active$ | async) ? 'Pause' : 'Play' }}</button>`, styleUrls: [ './appponent.css' ] }) export class AppComponent implements OnInit { active$ = new BehaviorSubject<boolean>(true); ngOnInit(): void { const on$ = this.active$.pipe(filter(v => v)); const off$ = this.active$.pipe(filter(v => !v)); const stream$ = timer(500, 500).pipe(share()); const out$ = merge( stream$.pipe( bufferToggle(off$, () => on$), mergeAll(), ), stream$.pipe( windowToggle(on$, () => off$), mergeAll(), ), ); out$.subscribe(v => console.log(v)); } toggle(): void { this.active$.next(!this.active$.value); } }

这很完美,但是我需要再添加一个功能!

我需要根据流中满足条件的值自动暂停流.

例如,如果最新值为5的倍数,则暂停流.

您对如何执行此操作有任何想法吗?

这是stackblitz上的一个可运行示例 stackblitz/edit/angular-6hjznn

解决方案

可以(1)扩展当前的bufferToggle/windowToggle方法,或者(2)使用自定义的缓冲区实现.

1.扩展bufferToggle/windowToggle方法

您可以在bufferToggle之后将数组添加到运算符队列.

  • 发出bufferToggle时,将这些值附加到数组中.
  • 从数组中获取值,直到数组中的某个元素符合暂停条件为止.
  • 发出这些值并暂停您的视频流.
  • 可暂停(演示)

    pausable运算符将发出与暂停条件匹配的值,然后立即停止流.

    export function pausable<T, O>( on$: Observable<any>, // when on$ emits 'pausable' will emit values from the buffer and all incoming values off$: Observable<O>, // when off$ emits 'pausable' will stop emitting and buffer incoming values haltCondition: (value: T) => boolean, // if 'haltCondition' returns true for a value in the stream the stream will be paused pause: () => void, // pauses the stream by triggering the given on$ and off$ observables spread: boolean = true // if true values from the buffer will be emitted separately, if 'false' values from the buffer will be emitted in an array ) { return (source: Observable<T>) => defer(() => { // defer is used so that each subscription gets its own buffer let buffer: T[] = []; return merge( source.pipe( bufferToggle(off$, () => on$), tap(values => buffer = buffer.concat(values)), // append values to your custom buffer map(_ => buffer.findIndex(haltCondition)), // find the index of the first element that matches the halt condition tap(haltIndex => haltIndex >= 0 ? pause() : null), // pause the stream when a value matching the halt condition was found map(haltIndex => buffer.splice(0, haltIndex === -1 ? customBuffer.length : haltIndex + 1)), // get all values from your custom buffer until a haltCondition is met mergeMap(toEmit => spread ? from(toEmit) : toEmit.length > 0 ? of(toEmit) : EMPTY) // optional value spread (what your mergeAll did) ), source.pipe( windowToggle(on$, () => off$), mergeMap(x => x), tap(value => haltCondition(value) ? pause() : null), // pause the stream when an unbuffered value matches the halt condition ), ); }); }

    您可以根据具体需要调整此运算符,例如使用较少的输入参数并将share并入其中,请参见此版本使用较少的参数.

    用法

    active$ = new BehaviorSubject<boolean>(true); on$ = this.active$.pipe(filter(v => v)); off$ = this.active$.pipe(filter(v => !v)); interval(500).pipe( share(), pausable(on$, off$, v => this.active$.value && this.pauseOn(v), () => this.active$.next(false)) ).subscribe(console.log); pauseOn = (value: number) => value > 0 && value % 10 === 0

    2.完全自定义的缓冲区

    您可以使用完全自定义的方法,仅使用一个类似于布兰登方法的可观察输入.

    bufferIf(演示) 当给定的condition发出true时,

    bufferIf将缓冲传入的值,并在condition为false时从缓冲区中发出所有值或传递新的值.

    export function bufferIf<T>(condition: Observable<boolean>) { return (source: Observable<T>) => defer(() => { const buffer: T[] = []; let paused = false; let sourceTerminated = false; return merge( // add a custon streamId to values from the source and the condition so that they can be differentiated later on source.pipe(map(v => [v, 0]), finalize(() => sourceTerminated = true)), condition.pipe(map(v => [v, 1])) ).pipe( // add values from the source to the buffer or set the paused variable tap(([value, streamId]) => streamId === 0 ? buffer.push(value as T) : paused = value as boolean), switchMap(_ => new Observable<T>(s => { setTimeout(() => { // map to a stream of values taken from the buffer, setTimeout is used so that a subscriber to the condition outside of this function gets the values in the correct order (also see Brandons answer & comments) while (buffer.length > 0 && !paused) s.next(buffer.shift()) }, 0) })), // complete the stream when the source terminated and the buffer is empty takeWhile(_ => !sourceTerminated || buffer.length > 0, true) ); }) }

    用法

    pause$ = new BehaviorSubject<boolean>(false); interval(500).pipe( bufferIf(this.pause$), tap(value => this.pauseOn(value) ? this.pause$.next(true) : null) ).subscribe(console.log); pauseOn = (value: number) => value > 0 && value % 10 === 0

    I have a simple component with a single button that starts and pauses a stream of numbers generated by RxJS timer.

    import { Component, OnInit } from '@angular/core'; import { BehaviorSubject, Observable, timer, merge } from 'rxjs'; import { filter, bufferToggle, windowToggle, mergeMap, mergeAll, share } from 'rxjs/operators'; @Component({ selector: 'my-app', template: `<button (click)="toggle()">{{ (active$ | async) ? 'Pause' : 'Play' }}</button>`, styleUrls: [ './appponent.css' ] }) export class AppComponent implements OnInit { active$ = new BehaviorSubject<boolean>(true); ngOnInit(): void { const on$ = this.active$.pipe(filter(v => v)); const off$ = this.active$.pipe(filter(v => !v)); const stream$ = timer(500, 500).pipe(share()); const out$ = merge( stream$.pipe( bufferToggle(off$, () => on$), mergeAll(), ), stream$.pipe( windowToggle(on$, () => off$), mergeAll(), ), ); out$.subscribe(v => console.log(v)); } toggle(): void { this.active$.next(!this.active$.value); } }

    This works perfectly but I need to add one more feature!

    I need to pause the stream automatically based on a value in the stream satisfying a condition.

    For example, pause the stream if the latest value is a multiple of 5.

    Do you have any ideas how to do this?

    Here is a runnable example on stackblitz stackblitz/edit/angular-6hjznn

    解决方案

    It's possible to either (1) expand your current bufferToggle / windowToggle approach or to (2) use a custom buffer implementation.

    1. Expanding the bufferToggle / windowToggle approach

    You can add an array to the operator queue after bufferToggle.

  • When bufferToggle emits append those values to the array.
  • Take values from the array until a certain element in the array matches a halt condition.
  • Emit those values and pause your stream.
  • pausable (Demo)

    The pausable operator will emit values that match the halt condition and then stop the stream immediately.

    export function pausable<T, O>( on$: Observable<any>, // when on$ emits 'pausable' will emit values from the buffer and all incoming values off$: Observable<O>, // when off$ emits 'pausable' will stop emitting and buffer incoming values haltCondition: (value: T) => boolean, // if 'haltCondition' returns true for a value in the stream the stream will be paused pause: () => void, // pauses the stream by triggering the given on$ and off$ observables spread: boolean = true // if true values from the buffer will be emitted separately, if 'false' values from the buffer will be emitted in an array ) { return (source: Observable<T>) => defer(() => { // defer is used so that each subscription gets its own buffer let buffer: T[] = []; return merge( source.pipe( bufferToggle(off$, () => on$), tap(values => buffer = buffer.concat(values)), // append values to your custom buffer map(_ => buffer.findIndex(haltCondition)), // find the index of the first element that matches the halt condition tap(haltIndex => haltIndex >= 0 ? pause() : null), // pause the stream when a value matching the halt condition was found map(haltIndex => buffer.splice(0, haltIndex === -1 ? customBuffer.length : haltIndex + 1)), // get all values from your custom buffer until a haltCondition is met mergeMap(toEmit => spread ? from(toEmit) : toEmit.length > 0 ? of(toEmit) : EMPTY) // optional value spread (what your mergeAll did) ), source.pipe( windowToggle(on$, () => off$), mergeMap(x => x), tap(value => haltCondition(value) ? pause() : null), // pause the stream when an unbuffered value matches the halt condition ), ); }); }

    You can adjust this operator to your specific needs e.g. use less input parameters and incorporate share into it, see this version with less parameters.

    Usage

    active$ = new BehaviorSubject<boolean>(true); on$ = this.active$.pipe(filter(v => v)); off$ = this.active$.pipe(filter(v => !v)); interval(500).pipe( share(), pausable(on$, off$, v => this.active$.value && this.pauseOn(v), () => this.active$.next(false)) ).subscribe(console.log); pauseOn = (value: number) => value > 0 && value % 10 === 0


    2. A fully custom buffer

    You can go with a fully custom approach using only one input observable similar to Brandon's approach.

    bufferIf (Demo)

    bufferIf will buffer incoming values when the given condition emits true and emits all values from the buffer or passes new ones through when the condition is false.

    export function bufferIf<T>(condition: Observable<boolean>) { return (source: Observable<T>) => defer(() => { const buffer: T[] = []; let paused = false; let sourceTerminated = false; return merge( // add a custon streamId to values from the source and the condition so that they can be differentiated later on source.pipe(map(v => [v, 0]), finalize(() => sourceTerminated = true)), condition.pipe(map(v => [v, 1])) ).pipe( // add values from the source to the buffer or set the paused variable tap(([value, streamId]) => streamId === 0 ? buffer.push(value as T) : paused = value as boolean), switchMap(_ => new Observable<T>(s => { setTimeout(() => { // map to a stream of values taken from the buffer, setTimeout is used so that a subscriber to the condition outside of this function gets the values in the correct order (also see Brandons answer & comments) while (buffer.length > 0 && !paused) s.next(buffer.shift()) }, 0) })), // complete the stream when the source terminated and the buffer is empty takeWhile(_ => !sourceTerminated || buffer.length > 0, true) ); }) }

    Usage

    pause$ = new BehaviorSubject<boolean>(false); interval(500).pipe( bufferIf(this.pause$), tap(value => this.pauseOn(value) ? this.pause$.next(true) : null) ).subscribe(console.log); pauseOn = (value: number) => value > 0 && value % 10 === 0

    更多推荐

    根据流中的值暂停RxJS流

    本文发布于:2023-11-26 00:47:51,感谢您对本站的认可!
    本文链接:https://www.elefans.com/category/jswz/34/1631991.html
    版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
    本文标签:RxJS

    发布评论

    评论列表 (有 0 条评论)
    草根站长

    >www.elefans.com

    编程频道|电子爱好者 - 技术资讯及电子产品介绍!