订阅者可以充当发布者吗?

编程入门 行业动态 更新时间:2024-10-24 11:15:20
本文介绍了订阅者可以充当发布者吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

就反应式流而言,有一个发布者,它可以有尽可能多的订阅者.

In terms of Reactive Streams, there is a Publisher and it could have as many Subscribers.

但是,假设订阅者从发布者那里收到一条消息.现在,此订户(例如Subs1)更改/修改了该消息,并将其传递给其他订户(例如Subs2),后者使用了已修改的消息.

But suppose, a subscriber gets a message from Publisher. Now this Subscriber(say Subs1) changes/modifies the message and passes it to some other Subscriber(say Subs2), which consumes the modified message.

那么,这个Subs1订阅者可以充当发布者,该发布者可以将消息传递给新的Subs2订阅者吗?

So can this Subs1 subscriber can act as a Publisher which can pass on message to new Subs2 subscriber?

我不确定是否可以,但是我认为这种情况是可能的.

I am not sure if it possible or not, but the scenario is possible i think.

如果可能,请提出一个可行的方法.

If its possible, please suggest a possible way to do this.

推荐答案

如果我们要转换传入的消息并将其进一步传递给下一个订阅服务器,则需要实现Processor接口.这既充当订阅者,因为它接收消息,也充当发布者,因为它处理这些消息并将其发送以进行进一步处理.

If we want to transform incoming message and pass it further to the next Subscriber, we need to implement the Processor interface. This acts both as a Subscriber because it receives messages, and as the Publisher because it processes those messages and sends them for further processing.

这是完成此操作的完整实现:​​

Here is the complete implementation to do this:

  • 创建一个MyTransformer类,该类实现Processor并扩展SubmissionPublisher,因为它将同时充当Subscriber和Publisher:

  • Create a MyTransformer class which implements Processor and extends SubmissionPublisher, as it will act both as Subscriber and Publisher: import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.SubmissionPublisher; import java.util.function.Function; public class MyTransformer<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> { private Function<T, R> function; private Flow.Subscription subscription; public MyTransformer(Function<T, R> function) { super(); this.function = function; } @Override public void onComplete() { System.out.println("Transformer Completed"); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onNext(T item) { System.out.println("Transformer Got : "+item); submit(function.apply(item)); subscription.request(1); } @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } }

  • 创建一个TestSubscriber类,该类实现Subscriber接口并实现所需的方法:

  • Create an TestSubscriber class which implements the Subscriber interface and implement the required methods:

    在处理开始之前调用onSubscribe()方法.订阅的实例作为参数传递.这是一个类,用于控制订阅服务器和发布服务器之间的消息流.

    The onSubscribe() method is called before processing starts. The instance of the Subscription is passed as the argument. It is a class that is used to control the flow of messages between Subscriber and the Publisher.

    这里的主要方法是onNext()-每当发布者发布新消息时都会调用此方法.

    The main method here is onNext() – this is called whenever the Publisher publishes a new message.

    我们正在使用实现Publisher接口的SubmissionPublisher类.

    We are using the SubmissionPublisher class which implements the Publisher interface.

    我们将向发布者提交N个元素-我们的TestSubscriber将接收该元素.

    We’re going to be submitting N elements to the Publisher – which our TestSubscriber will be receiving.

    请注意,我们正在TestSubscriber实例上调用close()方法.它将在给定发布者的每个订阅者下面调用onComplete()回调.

    Note, that we’re calling the close() method on the instance of the TestSubscriber. It will invoke onComplete() callback underneath on every Subscriber of the given Publisher.

    import java.util.LinkedList; import java.util.List; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; public class TestSubscriber<T> implements Subscriber<T> { private Subscription subscription; public List<T> consumed = new LinkedList<>(); @Override public void onComplete() { System.out.println("Subsciber Completed"); } @Override public void onError(Throwable arg0) { arg0.printStackTrace(); } @Override public void onNext(T item) { System.out.println("In Subscriber Got : "+item); subscription.request(1); } @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } }

  • 这里是发布者正在发布String元素的处理流程.
  • MyTransformer正在将String解析为Integer,这意味着此处需要进行转换.

    MyTransformer is parsing the String as Integer – which means a conversion needs to happening here.

    import java.util.List; import java.util.concurrent.SubmissionPublisher;; public class TestTransformer { public static void main(String... args) { SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); MyTransformer<String, Integer> transformProcessor = new MyTransformer<>(Integer::parseInt); TestSubscriber<Integer> subscriber = new TestSubscriber<>(); List<String> items = List.of("1", "2", "3"); List<Integer> expectedResult = List.of(1, 2, 3); publisher.subscribe(transformProcessor); transformProcessor.subscribe(subscriber); items.forEach(publisher::submit); publisher.close(); } }

    更多推荐

    订阅者可以充当发布者吗?

    本文发布于:2023-11-13 14:23:47,感谢您对本站的认可!
    本文链接:https://www.elefans.com/category/jswz/34/1584563.html
    版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
    本文标签:发布者

    发布评论

    评论列表 (有 0 条评论)
    草根站长

    >www.elefans.com

    编程频道|电子爱好者 - 技术资讯及电子产品介绍!