在IntegrationFlow服务激活器方法成功返回之前,不要确认RabbitMQ消息吗?

编程入门 行业动态 更新时间:2024-10-28 15:20:06
本文介绍了在IntegrationFlow服务激活器方法成功返回之前,不要确认RabbitMQ消息吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我有一个这样定义的集成流程:

I have an integration flow defined like this:

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName") .id("id") .autoStartup(autoStartup) .concurrentConsumers(2) .maxConcurrentConsumers(3) .messageConverter(messageConverter())) .aggregate(a -> ...) .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow())) .get();

serviceActivatorBean的定义如下:

@Component @Transactional public class ServiceActivator { @ServiceActivator public void myMethod(Collection<MyEvent> events) { .... } }

requestHandlerRetryAdviceForIntegrationFlow()的定义如下:

public static RequestHandlerRetryAdvice requestHandlerRetryAdviceForIntegrationFlow() { RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice(); RetryTemplate retryTemplate = new RetryTemplate(); SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); retryPolicy.setMaxAttempts(MAX_VALUE); retryTemplate.setRetryPolicy(retryPolicy); retryTemplate.setListeners(new RetryListenerSupport[]{new RetryListenerSupport() { @Override public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) { log.error("Caught exception {} (retry count {}), will retry again!", throwable.getClass().getSimpleName(), context.getRetryCount(), throwable); } }}); advice.setRetryTemplate(retryTemplate); ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); backOffPolicy.setMaxInterval(5000L); backOffPolicy.setInitialInterval(200L); backOffPolicy.setMultiplier(2); retryTemplate.setBackOffPolicy(backOffPolicy); return advice; }

我们面临的问题是,服务激活器中的events集合包含2个或更多事件,并且由于某些原因,myMethod的处理失败并且服务器崩溃.似乎发生的事情是IntegrationFlow一次消耗并确认RabbitMQ发出的一条消息,因此,如果服务器在处理myMethod时崩溃,则除最后一个事件外的所有事件都会丢失.这对我们来说既不好也不安全.我们可以做些什么来配置IntegrationFlow在服务激活器中的myMethod成功完成之前不确认任何消息?

The problem we face is when the events collection in the service activator contains 2 or more events and for some reason the processing of myMethod fails and the server crash. What seems to happen is that the IntegrationFlow consumes and acks one message at a time from RabbitMQ, so if the server crash during the processing of myMethod all but the last event is lost. This is neither good nor safe enough for us. Is there something we can do to configure the IntegrationFlow to NOT ack any message until myMethod in the service activator has been completed successfully?

推荐答案

您可以使用确认模式手册,然后通过标题进行确认:

You can use Acknowledge Mode MANUAL and confirm via headers afterward:

docs.spring.io/spring-integration/docs/4.3.12.RELEASE/reference/html/amqp.html#amqp-inbound-channel-adapter

更多推荐

在IntegrationFlow服务激活器方法成功返回之前,不要确认RabbitMQ消息吗?

本文发布于:2023-11-26 15:06:19,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1634216.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:消息   方法   IntegrationFlow   RabbitMQ

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!