将回调转换为被动发布者(Flux)

编程入门 行业动态 更新时间:2024-10-18 16:54:42
本文介绍了将回调转换为被动发布者(Flux)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我正在使用第三方库注册MessageListener,当某些事件发生时,它们调用已注册的监听器onMessage方法

public interface MessageListener { // third party code, it auto-scans for all MessageListeners and registers them void onMessage(Message message); } public class SimpleMessageListener implements MessageListener { public void onMessage(Message message) { //do something non blocking //is it possible to 'transmit' to messagePublisher } public Flux<Message> messagePublisher() { // a method to which to subscribeOn } }

所以我的问题是,将其转化为流量的最佳方法是什么

最后,我希望能够做这样的事情

messagePublisher().subscribe(System.out::println); 编辑* 我的第一次尝试是这样的

private List<FluxSink<Message>> handlers = new ArrayList<>(); public void onMessage(Message message) { handlers.forEach(han -> han.next(message)); } public Flux<Message> messagePublisher() { return Flux.create(sink -> { handlers.add(sink); sink.onDispose(() -> handlers.remove(sink)); }); }

有效-但我觉得这不是一个很好的解决方案,让类实现FlosSink并手动处理是不是更好--目前我预计不会有很多订阅者。 但是有许多MessageListeners(每种类型一个)

推荐答案

您可以创建单个Flux实例来桥接MessageListener观察到的消息,例如

public class SimpleMessageListener implements MessageListener { private FluxSink<Message> handler; private Flux<Message> flux; public SimpleMessageListener() { flux = Flux.create(emitter -> { handler = emitter; }, OverflowStrategy.DROP); // or some other overflow strategy } public void onMessage(Message message) { if (handler != null) { /* * null check is required to avoid NPE if a message is received * before any subscription occurs since handler is instantiated * lazily when the first subscription is requested */ handler.next(message); } } public Flux<Message> messagePublisher() { return flux; } }

现在所有监听器都可以使用Flux‘publish()方法及其返回的ConnectableFlux订阅相同的messsagePublisher()Flux实例:

// fetch message publisher Flux<Message> messagePublisher = messageListener.messagePublisher(); // prepare ConenctableFlux ConnectableFlux<Message> connectableFlux = messagePublisher().publish(); // register subscribers connectableFlux.subscribe(/* aConsumer */); connectableFlux.subscribe(/* aCoreSubscriber */); connectableFlux.subscribe(/* aSubscriber */); // connect the ConnectableFlux to messagePublisher connectableFlux.connect();

更多推荐

将回调转换为被动发布者(Flux)

本文发布于:2023-11-25 15:22:09,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1630279.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:转换为   回调   发布者   Flux

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!