我正在与我无法控制的Java库中的数据发布者合作。发布者库使用典型的回调设置。库代码中的某个位置(库是Java,但是我会在scala中描述简洁性):
I am working with a data publisher from a java library that I do not control. The publisher library uses a typical callback setup; somewhere in the library code (the library is java but I will describe in scala for terseness):
type DataType = ??? trait DataConsumer { def onData(data : DataType) : Unit }该库的用户需要编写一个实现 onData 方法的类,并将该类传递给 DataProducer ,库代码如下所示:
The user of the library is required to write a class that implements the onData method and pass that into a DataProducer, the library code looks something like:
class DataProducer(consumer : DataConsumer) {...}DataProducer 有自己的内部线程I每当有另一个要使用的 DataType 对象时,都无法控制并伴随数据缓冲区,即调用 onData 。
The DataProducer has its own internal thread I cannot control, and accompanying data buffer, that is calling onData whenever there is another DataType object to consume.
所以,我的问题是:如何编写一个将原始库模式转换/转换为akka流的层源对象?
So, my question is: how do I write a layer that will convert/translate the original library pattern into an akka stream Source object?
谢谢。
推荐答案有多种解决方法。一种是使用ActorPublisher: doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/stream-integrations.html#Integrating_with_Actors ,您可以在其中更改回调以便向演员发送消息。根据回调的工作方式,您也许也可以使用mapAsync(将回调转换为Future)。只有在一个请求产生一个回调调用的情况下,这种方法才有效。
There are various ways this can be solved. One is to use an ActorPublisher: doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/stream-integrations.html#Integrating_with_Actors where you can just change the callback so that it sends a message to the actor. Depending how the callback works, you might be able to use mapAsync, too (converting a callback to a Future). That will only work if one request produces exactly one callback call.
更多推荐
将回调方法实现转换为akka流Source
发布评论