我已经开始使用微服务,我需要创建一个事件发布机制.
我计划使用Amazon SQS.
这个想法很简单.我将事件与聚合存储在数据库中. 如果用户要更改电子邮件,则事件UserChangedEmail将存储在数据库中.
我还有事件处理程序,例如UserChangedEmailHandler,它将(在这种情况下)负责将此事件发布到SQS队列,以便其他服务可以知道用户已更改电子邮件.
我的问题是,要实现这一目标的做法是什么?我应该有某种后台定时过程来扫描事件表并将事件发布到SQS吗? 这可以是WebApi应用程序内的进程(首选),还是应该是单独的进程?
一种想法是使用Hangfire,但它在一分钟之内不支持cron作业.
有什么建议吗?
按照答案之一的建议,我已经查看了NServicebus. NServiceBus页面上的示例之一显示了我的关注重点.
在他们的示例中,他们创建一个已下订单的日志.如果成功提交了日志或数据库条目,但是发布中断并且事件从未发布,该怎么办?
这是事件处理程序的代码:
public class PlaceOrderHandler : IHandleMessages<PlaceOrder> { static ILog log = LogManager.GetLogger<PlaceOrderHandler>(); IBus bus; public PlaceOrderHandler(IBus bus) { this.bus = bus; } public void Handle(PlaceOrder message) { log.Info($"Order for Product:{message.Product} placed with id: {message.Id}"); log.Info($"Publishing: OrderPlaced for Order Id: {message.Id}"); var orderPlaced = new OrderPlaced { OrderId = message.Id }; bus.Publish(orderPlaced); <!-- my concern } }解决方案
现成的建议
我建议您不要研究自己的现成产品,因为这里有很多复杂性,这些复杂性一开始就不会显现出来,例如
- 管理事件订阅者列表-SQS队列与事件使用者(而不是事件生产者)更合适地配对,因为当消息被消耗时,它不再在队列中可用-因此,如果要支持多个订阅者给定事件(这是事件驱动的体系结构的巨大好处),您如何知道在首次引发事件消息时将事件消息推送到哪个SQS队列?
- 重试语义,错误转发队列-处理临时性基础结构问题导致的临时错误与业务逻辑语义问题导致的永久性错误
- 哪些消息何时发出以及在何处发送的审计线索
- 通过SQS发送的消息的安全性(您的业务案例需要对它们进行加密吗?SQS是Amazon提供的一项应用程序服务,不提供存储级加密
- 邮件大小-SQS有邮件大小限制,因此您最终可能需要处理大型邮件的带外传输
那只是我的头顶...
一些现成的系统可以提供帮助:
- NServiceBus 提供了用于管理命令和事件消息传递的框架,并且具有允许灵活传输类型的插件框架- NServiceBus.SQS 提供SQS作为传输.
- 提供全面而灵活的重试,审核和错误处理
- 公然使用命令vs事件(命令消息说执行此操作",并发送到单个服务进行处理,事件消息说发生了什么事",并发送到任意数量的灵活订户)
- 发件箱模式即使使用非事务一致的传输(例如SQS),也可以提供事务一致的消息
- 当前,SQS插件使用默认的NServiceBus订阅者持久性,这需要SQL Server来存储事件订阅者列表(有关利用SNS的选项,请参见下文)
- 为sagas提供支持,提供了一个框架,以通过补偿动作来确保多事务最终与回滚的一致性
- 超时支持计划的邮件处理
- 商业产品,不是免费的,但是许多插件/扩展都是开源的
- 大众运输
- 不支持现成的SQS,但确实支持Azure Service Bus和RabbitMq,因此如果可以的话,可以作为您的替代选择
- 与NServiceBus相似的产品,但并非100%相同- NServiceBus与MassTransit 提供全面的比较
- 完全开源/免费
- 只是说
- 专为基于SQS/SNS设计的轻量级开源消息传递框架
- 每个事件的SNS主题,每个微服务的SQS队列,使用本机SNS SQS Queue订阅来实现扇出
- 免费开源
可能还有其他人,并且我在NServiceBus方面拥有最丰富的个人经验,但是我强烈建议您研究现成的解决方案-它们将使您腾出时间来根据业务事件开始设计系统,而不用担心事件传递的机制.
即使您确实想构建自己的学习练习,回顾一下以上工作可能也会为您提供一些可靠的事件驱动消息传递所需的技巧.
交易一致性和发件箱模式已对问题进行了编辑,以询问如果部分操作成功但发布操作失败会发生什么情况.我已经将其称为消息传递的事务一致性,它通常意味着在事务中,所有业务副作用都已落实,或者没有.业务副作用可能意味着:
- 数据库记录已更新
- 另一个数据库记录已删除
- 邮件已发布到邮件队列
- 电子邮件已发送
如果数据库操作失败,通常您不希望发送电子邮件或发布消息,同样,如果消息发布失败,您也不想执行数据库操作.
那么如何确保消息的一致性?
NServiceBus通过以下两种方式之一进行处理:
处理程序语义
关于提案的另一条评论-按照惯例,UserChangedEmailHandler通常会与响应电子邮件更改而执行某些操作的服务相关联,而不是简单地参与电子邮件更改信息的传播.当您的系统发布了50个事件时,您是否要50个不同的处理程序仅将这些消息推送到不同的队列中?
上述系统使用通用框架通过传输方式传播消息,因此您可以为预订系统保留UserChangedEmailHandler,并在其中包含用户更改电子邮件时应发生的业务逻辑.
I have started to work with micro-services and I need to create an event publishing mechanism.
I plan to use Amazon SQS.
The idea is quite simple. I store events in the database in the same transaction as aggregates. If user would change his email, event UserChangedEmail will be stored in the database.
I also have event handler, such as UserChangedEmailHandler, which will (in this case) be responsible to publish this event to SQS queue, so other services can know that user changed email.
My question is, what is the practice to achieve this? Should I have some kind of background timed process which will scan events table and publish events to SQS? Can this be process within WebApi application (preferable), or should this be a separate a process?
One of the ideas was to use Hangfire, but it does not support cron jobs under a minute.
Any suggestions?
EDIT:
As suggested in the one of the answers, I've looked in to NServicebus. One of the examples on the NServiceBus page shows core of my concern.
In their example, they create a log that order has been placed. What if log or database entry is successfully commited, but publish breaks and event never gets published?
Here's the code for the event handler:
public class PlaceOrderHandler : IHandleMessages<PlaceOrder> { static ILog log = LogManager.GetLogger<PlaceOrderHandler>(); IBus bus; public PlaceOrderHandler(IBus bus) { this.bus = bus; } public void Handle(PlaceOrder message) { log.Info($"Order for Product:{message.Product} placed with id: {message.Id}"); log.Info($"Publishing: OrderPlaced for Order Id: {message.Id}"); var orderPlaced = new OrderPlaced { OrderId = message.Id }; bus.Publish(orderPlaced); <!-- my concern } }解决方案
Off the Shelf Suggestions
Rather than rolling your own, I recommend looking into off the shelf products, as there is a lot of complexity here that will not be apparent out the outset, e.g.
- Managing event subscriber list - an SQS queue is more appropriately paired with an event consumer, rather than with an event producer as when a message is consumed it is no longer available on the queue - so if you want to support multiple subscribers for a given event (which is a massive benefit of event driven architectures), how do you know which SQS queues you push the event message onto when it is first raised?
- Retry semantics, error forwarding queues - handling temporary errors due to ephemeral infrastructure issues vs permanent errors due to business logic semantic issues
- Audit trails of which messages were raised when and sent where
- Security of messages sent via SQS (does your business case require them to be encrypted? SQS is an application service offered by Amazon which doesn't provide storage level encryption
- Size of messages - SQS has a message size limit so you may eventually need to handle out-of-band transmission of large messages
And that's just off the top of my head...
A few off the shelf systems that would assist:
- NServiceBus provides a framework for managing command and event messaging, and it has a plugin framework permitting flexible transport types - NServiceBus.SQS offers SQS as a transport.
- Offers comprehensive and flexible retry, audit and error handling
- Opinionated use of commands vs events (command messages say "Do this" and are sent to a single service for processing, event messages say "Something happened" and are sent to an arbitrary number of flexible subscribers)
- Outbox pattern provides transactionally consistent messaging even with non-transactionally consistent transports, such as SQS
- Currently the SQS plugin uses default NServiceBus subscriber persistence, which requires an SQL Server for storing the event subscriber list (see below for an option that leverages SNS)
- Built in support for sagas, offering a framework to ensure multi transaction eventual consistency with rollback via compensating actions
- Timeouts supporting scheduled message handling
- Commercial offering, so not free, but many plugins/extensions are open source
- Mass Transit
- Doesn't support SQS off the shelf, but does support Azure Service Bus and RabbitMq, so could be an alternative for you if that is an option
- Similar offering to NServiceBus, but not 100% the same - NServiceBus vs MassTransit offers a comprehensive comparison
- Fully open source/free
- Just Saying
- A light-weight open source messaging framework designed specifically for SQS/SNS based
- SNS topic per event, SQS queue per microservice, use native SNS SQS Queue subcription to achieve fanout
- Open Source Free
There may be others, and I've most personal experience with NServiceBus, but I strongly recommend looking into the off the shelf solutions - they will free you up to start designing your system in terms of business events, rather than worrying about the mechanics of event transmission.
Even if you do want to build your own as a learning exercise, reviewing how the above work may give you some tips on what's needed for reliable event driven messaging.
Transactional Consistency and the Outbox PatternThe question has been edited to ask about the what happens if parts of the operation succeed, but the publish operation fails. I've seen this referred to as the transactional consistency of the messaging, and it generally means that within a transaction, all business side-effects are committed, or none. Business side effects may mean:
- Database record updated
- Another database record deleted
- Message published to a message queue
- Email sent
You generally don't want an email sent or a message published, if the database operation failed, and likewise, you don't want the database operation committed if the message publish failed.
So how to ensure consistency of messaging?
NServiceBus handles this in one of two ways:
Handler Semantics
One more comment about your proposal - by convention, UserChangedEmailHandler would more commonly be associated with the service that does something in response to the email being changed, rather than simply participating in the propagation of the information that the email has changed. When you have 50 events being published by your system, do you want 50 different handlers just to push those messages onto different queues?
The systems above use a generic framework to propagate messages via the transport, so you can reserve UserChangedEmailHandler for the subscribing system and include in it the business logic that should happen whenever a user changes their email.
更多推荐
ASP.NET Web Api的事件发布者
发布评论