我有一个这样定义的集成流程:
I have an integration flow defined like this:
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName") .id("id") .autoStartup(autoStartup) .concurrentConsumers(2) .maxConcurrentConsumers(3) .messageConverter(messageConverter())) .aggregate(a -> ...) .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow())) .get();serviceActivatorBean的定义如下:
@Component @Transactional public class ServiceActivator { @ServiceActivator public void myMethod(Collection<MyEvent> events) { .... } }requestHandlerRetryAdviceForIntegrationFlow()的定义如下:
public static RequestHandlerRetryAdvice requestHandlerRetryAdviceForIntegrationFlow() { RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice(); RetryTemplate retryTemplate = new RetryTemplate(); SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); retryPolicy.setMaxAttempts(MAX_VALUE); retryTemplate.setRetryPolicy(retryPolicy); retryTemplate.setListeners(new RetryListenerSupport[]{new RetryListenerSupport() { @Override public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) { log.error("Caught exception {} (retry count {}), will retry again!", throwable.getClass().getSimpleName(), context.getRetryCount(), throwable); } }}); advice.setRetryTemplate(retryTemplate); ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); backOffPolicy.setMaxInterval(5000L); backOffPolicy.setInitialInterval(200L); backOffPolicy.setMultiplier(2); retryTemplate.setBackOffPolicy(backOffPolicy); return advice; }我们面临的问题是,服务激活器中的events集合包含2个或更多事件,并且由于某些原因,myMethod的处理失败并且服务器崩溃.似乎发生的事情是IntegrationFlow一次消耗并确认RabbitMQ发出的一条消息,因此,如果服务器在处理myMethod时崩溃,则除最后一个事件外的所有事件都会丢失.这对我们来说既不好也不安全.我们可以做些什么来配置IntegrationFlow在服务激活器中的myMethod成功完成之前不确认任何消息?
The problem we face is when the events collection in the service activator contains 2 or more events and for some reason the processing of myMethod fails and the server crash. What seems to happen is that the IntegrationFlow consumes and acks one message at a time from RabbitMQ, so if the server crash during the processing of myMethod all but the last event is lost. This is neither good nor safe enough for us. Is there something we can do to configure the IntegrationFlow to NOT ack any message until myMethod in the service activator has been completed successfully?
推荐答案您可以使用确认模式手册,然后通过标题进行确认:
You can use Acknowledge Mode MANUAL and confirm via headers afterward:
docs.spring.io/spring-integration/docs/4.3.12.RELEASE/reference/html/amqp.html#amqp-inbound-channel-adapter
更多推荐
在IntegrationFlow服务激活器方法成功返回之前,不要确认RabbitMQ消息吗?
发布评论