admin管理员组

文章数量:1635850

  情景:

     项目在组内开发人员电脑上经常跑本地,activemq的队列名写在配置文件上。由于代码分支不一样,导致消息经常被不正常得消费掉。想要改进这个问题,最简单的是将注解@JMSListener 改为动态加载监听BEAN,但是大家不想为了这个事改变开发习惯,所以定为动态修改入队和监听的地址。

  开始工作:

    从JMSListener注解入手,注释中提到了JmsListenerContainerFactory和DestinationResolver。从JmsListenerContainerFactory开始查找,项目中用到的默认factory-DefaultJmsListenerContainerFactory,生产泛型为DefaultMessageListenerContainer的container,找到了最终监听内部类scheduledInvokers,但是没方法修改该类的内容。

    换个路子看DestinationResolver。找到了DynamicDestinationResolver,看到了希望,查找包内引用查到了JmsDestinationAccessor,方法是new DynamicDestinationResolver(),开始想研究这个resolver并想办法替换。找了一路十八开,原来这玩意是个池子,把初始化好的放里边,你改变这里的东西其实没啥大用处,队列已经建好了,尝试stop旧的container然后new一个新的,证明也是徒劳。期间也尝试了EmbeddedValueResolver,发现更坑,直接就是spring底层的BeanFactory。

    把org.springframework.jms.listener下面的包翻了一遍没结果,就要放弃的时候,发现annotation包里有个JmsListenerAnnotationBeanPostProcessor一下豁然开朗。我不能初始化之后改,为啥不在初始化之前改呢……继续折腾。

  赫然看见救命注释@see JmsListenerConfigurer。后面百度重启这些小白操作被大风吹跑了。

    总之最后结果就是,继承JmsListenerEndpointRegistry,重写registerListenerContainer方法,将endpoint的destincation加上自己的suffix,然后继续走父类逻辑,然后通过JmsListenerConfigurer替换JmsListenerEndpointRegistrar中的Registry。

    最后,这东西就是为了本地启动有的,就不要影响线上了,排除prd环境。

    代码贴出来:

@Slf4j
@Component
@Profile("!prd")
public class MyJmsListenerConfigurer implements JmsListenerConfigurer, ApplicationContextAware {

    private ConfigurableApplicationContext context;

    @Autowired
    private MyJmsListenerEndpointRegistry myJmsListenerEndpointRegistry;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (applicationContext instanceof ConfigurableApplicationContext) {
            this.context = (ConfigurableApplicationContext) applicationContext;
        }
    }

    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        try {
            InetAddress address = InetAddress.getLocalHost();
            String localName = "-" + address.getHostName() + "@" + address.getHostAddress();
            myJmsListenerEndpointRegistry.setSuffix(localName);
            registrar.setEndpointRegistry(myJmsListenerEndpointRegistry);
        } catch (UnknownHostException e) {
            log.error("获取本机端口号异常", e);
            if (context != null) {
                context.close();
            }
        }

    }
}

 

 

@Component
@Profile("!prd")
public class MyJmsListenerEndpointRegistry extends JmsListenerEndpointRegistry {

    private String suffix;

    public void setSuffix(String suffix) {
        this.suffix = suffix;
    }

    @Override
    public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory, boolean startImmediately) {
        MethodJmsListenerEndpoint methodEndpoint = (MethodJmsListenerEndpoint) endpoint;
        methodEndpoint.setDestination(methodEndpoint.getDestination() + suffix);
        super.registerListenerContainer(endpoint, factory, startImmediately);
    }

}

 

入队的地址替换就很简单了,注入直接换就行:

 

@Component
@Order
@Profile(value = "!prd")
public class LocalQueueBeanNameChanger implements CommandLineRunner {

    @Autowired
    private List<Queue> queueList;


    @Override
    public void run(String... args) throws Exception {
        InetAddress address = InetAddress.getLocalHost();
        String localName = "-" + address.getHostName() + "@" + address.getHostAddress();
        for (Queue queue : queueList) {
            if (queue instanceof ActiveMQQueue) {
                ActiveMQQueue activeMQQueue = (ActiveMQQueue) queue;
                activeMQQueue.setPhysicalName(activeMQQueue.getPhysicalName() + localName);
            }
        }
    }
}

本文标签: 自定义SpringBootdestinationJmsListener