Akka Persistence Query event streams and CQRS

Problem description

I'm trying to implement the read side in my ES-CQRS architecture. Let's say I have a persistent actor like this:

import akka.NotUsed
import akka.actor.Props
import akka.persistence.PersistentActor
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.{EventEnvelope, PersistenceQuery}
import akka.stream.scaladsl.Source

object UserWrite {

  sealed trait UserEvent
  sealed trait State
  case object Uninitialized extends State

  case class User(username: String, password: String) extends State

  case class AddUser(user: User)
  case class UserAdded(user: User) extends UserEvent
  case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed])
  case class UsersStream(fromSeqNo: Long)
  case object GetCurrentUser

  def props = Props(new UserWrite)
}

class UserWrite extends PersistentActor {

  import UserWrite._

  private var currentUser: State = Uninitialized

  override def persistenceId: String = "user-write"

  override def receiveRecover: Receive = {
    case UserAdded(user) => currentUser = user
  }

  override def receiveCommand: Receive = {
    case AddUser(user: User) =>
      persist(UserAdded(user)) {
        case UserAdded(`user`) => currentUser = user
      }
    case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo)
    case GetCurrentUser               => sender() ! currentUser
  }

  def publishUserEvents(fromSeqNo: Long) = {
    val readJournal =
      PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

    val userEvents = readJournal
      .eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue)
      .map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event }

    sender() ! UserEvents(userEvents)
  }
}

As far as I understand, each time an event gets persisted we can publish it via Akka Persistence Query. Now, I'm not sure what the proper way is to subscribe to these events so that I can persist them in my read-side database. One idea is to initially send a UsersStream message from my read-side actor to the UserWrite actor and "sink" the events in that read actor.

EDIT

Following the suggestion of @cmbaxter, I implemented the read side this way:

import akka.actor.Props
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.{EventEnvelope, PersistenceQuery}
import akka.persistence.{PersistentActor, RecoveryCompleted, SnapshotOffer}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import UserWrite.{User, UserAdded, UserEvent}

object UserRead {

  case object GetUsers
  case class GetUserByUsername(username: String)
  case class LastProcessedEventOffset(seqNo: Long)
  case object StreamCompleted

  def props = Props(new UserRead)
}

class UserRead extends PersistentActor {

  import UserRead._

  var inMemoryUsers = Set.empty[User]
  var offset = 0L

  override val persistenceId: String = "user-read"

  override def receiveRecover: Receive = {
    // Recovery from snapshot will always give us the last processed sequence number
    case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo
    case RecoveryCompleted                                 => recoveryCompleted()
  }

  // After recovery is completed, events are projected onto the UserRead actor
  def recoveryCompleted(): Unit = {
    implicit val materializer = ActorMaterializer()
    PersistenceQuery(context.system)
      .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
      .eventsByPersistenceId("user-write", offset + 1, Long.MaxValue)
      .map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event }
      .runWith(Sink.actorRef(self, StreamCompleted))
  }

  override def receiveCommand: Receive = {
    case GetUsers                    => sender() ! inMemoryUsers
    case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username)
    // Match the projected event and update the offset
    case (seqNo: Long, UserAdded(user)) =>
      saveSnapshot(LastProcessedEventOffset(seqNo))
      inMemoryUsers += user
  }
}
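For illustration, here is a minimal sketch, assuming the two actors above live in the same project, of how the write and read sides could be wired together; the actor system name, timeout, and sample user are arbitrary:

import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

object UserDemo extends App {
  implicit val system  = ActorSystem("es-cqrs-demo") // arbitrary name
  implicit val timeout = Timeout(5.seconds)
  import system.dispatcher

  val userWrite = system.actorOf(UserWrite.props, "user-write")
  val userRead  = system.actorOf(UserRead.props, "user-read")

  // The command goes to the write side; the read side picks the resulting event
  // up asynchronously through the persistence query stream.
  userWrite ! UserWrite.AddUser(UserWrite.User("john", "secret"))

  // Because the projection is eventually consistent, the new user may not be
  // visible immediately on the read side.
  (userRead ? UserRead.GetUsers).foreach(users => println(s"users: $users"))
}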

There are some issues, though. For example, the event stream seems to be slow: the UserRead actor can answer with its set of users before the newly added user has been saved.

EDIT 2

I decreased the refresh interval of the Cassandra query journal, which more or less solved the issue with the slow event stream. It appears that the Cassandra event journal is, by default, polled every 3 seconds. In my application.conf I added:

cassandra-query-journal {
  refresh-interval = 20ms
}

EDIT 3

Actually, do not decrease the refresh interval. That will increase memory usage, which is not dangerous in itself, but it also misses the point. The general concept of CQRS is that the write side and the read side are asynchronous; therefore, data will never be available for reading immediately after it is written. Dealing with the UI? I just open a stream and push the data via server-sent events once the read side acknowledges them.
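As an illustration of that last point, here is a minimal sketch assuming Akka HTTP is used for the UI-facing endpoint; the route path, port, and the acknowledgedUsers source are hypothetical, and in a real application the source would be fed by the UserRead projection (for example via a BroadcastHub):

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, OverflowStrategy}

object UserSseEndpoint extends App {
  implicit val system = ActorSystem("user-read-ui") // arbitrary name
  implicit val mat    = ActorMaterializer()

  // Hypothetical stream of usernames acknowledged by the read side; in the real
  // application this would be fed by the UserRead projection.
  val acknowledgedUsers: Source[String, _] =
    Source.actorRef[String](16, OverflowStrategy.dropHead)

  // Each connected client receives acknowledged users as server-sent events.
  val route =
    path("users" / "stream") {
      get {
        complete(acknowledgedUsers.map(username => ServerSentEvent(username)))
      }
    }

  Http().bindAndHandle(route, "0.0.0.0", 8080)
}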

Answer

There are several ways to do this. For example, in my app I have an actor on the query side that holds a PersistenceQuery that is constantly looking for changes, but you could also have a thread running the same query. The point is to keep the stream open so you can read a persisted event as soon as it happens:

val readJournal =
  PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

// issue query to journal
val source: Source[EventEnvelope, NotUsed] =
  readJournal.eventsByPersistenceId("MyActorId", 0, Long.MaxValue)

// materialize stream, consuming events
implicit val mat = ActorMaterializer()
source.map(_.event).runForeach {
  case userEvent: UserEvent => doSomething(userEvent)
}

Instead of this, you could have a timer that runs a PersistenceQuery and stores the new events, but I think keeping a stream open is the best way.
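For completeness, a minimal sketch of that timer-based alternative, assuming the same Cassandra read journal; the polling interval and the storeEvent helper are hypothetical. It runs the finite currentEventsByPersistenceId query on a schedule and remembers the last processed sequence number between runs:

import akka.actor.ActorSystem
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.{EventEnvelope, PersistenceQuery}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import scala.concurrent.duration._

object PollingProjection extends App {
  implicit val system = ActorSystem("polling-projection") // arbitrary name
  implicit val mat    = ActorMaterializer()
  import system.dispatcher

  val readJournal =
    PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

  // Last sequence number that has already been projected; in a real application
  // this would be restored from the read-side store.
  @volatile var lastSeqNo = 0L

  // Hypothetical write of one event into the read-side database.
  def storeEvent(seqNo: Long, event: Any): Unit = {
    lastSeqNo = seqNo
    println(s"projected event $seqNo: $event")
  }

  // Every 5 seconds, run a finite query for events persisted since the last run.
  system.scheduler.schedule(0.seconds, 5.seconds) {
    readJournal
      .currentEventsByPersistenceId("user-write", lastSeqNo + 1, Long.MaxValue)
      .runWith(Sink.foreach { case EventEnvelope(_, _, seqNo, event) => storeEvent(seqNo, event) })
  }
}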
