admin管理员组

文章数量:1635678

Composite Destinations

在1.1版本中 ActiveMQ就支持composite destinations,这允许一个虚拟的destination代表一组真实的JMS destination。

例如你可以使用composite destinations 只需一个发送composite destinations操作就可以将一条消息发送到12个物理队列中。

或者发送到一个queue和一个topic中。

多个物理队列间只需一个简单的“,”分隔符

例如如下queue实例代表三个不同的物理队列:

//send to 3 queues as one logical operation

Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C");

producer.send(queue, someMessage);

如果你希望混合匹配可以使用前缀 queue://或者topic:// 去区分不同类型的destination。例如:

// send to queues and topic one logical operation

Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");

producer.send(queue, someMessage);

也可以通过JNDI(activemq.xml)进行配置。(JNDI:将Java对象以某个名称的形式绑定(binding)到一个容器环境).

<destinationInterceptors>

       <virtualDestinationInterceptor>

              <virtualDestinations>

                     <compositeQueue name="MY.QUEUE">

                            <forwardTo>

                                   <queue physicalName="my-queue" />

                                   <queue physicalName="my-queue2" />

                            </forwardTo>

                     </compositeQueue>

              </virtualDestinations>

       </virtualDestinationInterceptor>

</destinationInterceptors>

Configure Startup Destinations

4.1 Feature。在ActiveMQ中我们通常按需创建destinations,有时候用户需要能显示的在xml文件配置ActiveMQ启动时候能使用的destinations。例如:

<broker xmlns="http://activemq.apache/schema/core">

       <destinations>

              <queue physicalName="FOO.BAR" />

              <queue physicalName="SOME.TOPIC" />

       </destinations>

</broker>

 

Delete Inactive Destinations

Inactive Destination:在配置的时间期间消息不再追加到destination并且没有消费者链接。

默认配置broker不检查Inactive Destination,

<broker xmlns="http://activemq.apache/schema/core" schedulePeriodForDestinationPurge="10000">

 

  <destinationPolicy>

     <policyMap>

        <policyEntries>

           <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="30000"/>

        </policyEntries>

     </policyMap>

  </destinationPolicy>

       

</broker>

默认配置如下:

schedulePeriodForDestinationPurge="0" and gcInactiveDestinations="false"

schedulePeriodForDestinationPurge:设置多长时间检查一次

gcInactiveDestinations:设置删除掉不活动的队列,默认为false

inactiveTimoutBeforeGC:设置当Destination为空后,多长时间被删除,这里是30秒。

当一个destination被移除时,broker将打印如下日志:

INFO  Queue                          - TEST.QUEUE Inactive for longer than 30000 ms - removing ...

 

Destination Options

Destination Options 它不是JMS规范的一部分,是ActiveMQ针对JMS consumer 配置扩展。这些配置选项被作为URL查询语法

添加到创建的destination。如:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false&consumer.prefetchSize=10");

consumer = session.createConsumer(queue);

 

Mirrored Queues

Queues 提供了一个优秀可靠高性能的负载均衡机制,activeMQ中每个queue中的消息只能被一个consumer消费。然而,有时候你可能希望能够监视生产者和消费者之间的消息流。你可以通过使用Virtual Destinations 来建立一个virtual queue 来把消息转发到多个queues中。但是 为系统中每个queue都进行如此的配置可能会很麻烦。为了便于监听队列,我们提供了Mirrored Queues。

举个例子:如果我们有很多生产者将消息发送给队列Foo.Bar 消费者们从Foo.Bar消费,我们想要监视或者查看其中信息流。

就可以启动Mirrored Queues.Broker会把发送到某个queue的所有消息转发到名为VirtualTopic.Mirror.Foo.Bar的Topic。那么许多消费者可以订阅这个Topics来监控数据。如果你想实现负载均衡你可以选择配置多个消费者来订阅。

 Mirrored Queues 和Virtual Destinations对于监控事物流非常有用,例如 Business Activity Monitoring (BAM).

默认Mirrored Queues是不启用的,因为启用它会为每个队列都创建一个virtual topic。启用方法:

首先要将BrokerService的useMirrored

Queues属性设置为true:

<broker xmlns="http://activemq.apache/schema/core"use

MirroredQueue="true">

</broker>

然后可以通过destinationInterceptors设置其属性,如mirrortopic的前缀,缺省是VritualTopic.Mirror.

<destinationInterceptors>

    <mirroredQueue copyMessage = "true" postfix=".qmirror" prefix=""/>

</destinationInterceptors>

Per Destination Policies

Multiple different policies可以应用在每一个destination(queue or topic),可以使用通配符匹配queues or topics。

详情http://activemq.apache/per-destination-policies.html。

Virtual Destinations

Virtual Destinations 允许我们可以创建对应多个物理地址的逻辑地址。用来生产和消费消息。消息配置变得灵活、松耦合。

ActiveMQ支持的虚拟Destinations分为有两种,分别是:

  • Virtual Topics

  • Composite Destinations

Virtual Topics

The limitations of JMS durable topics:

发布订阅(队列实现)允许生产者和消费者解耦,生产者不比关心有需要发布给多少消费者。JMS规范定义了持久主题订阅,每个持久订阅者,都相当于一个持久化的queue的客户端,它会收取所有消息。这种情况下存在两个问题:

1. 同一应用内consumer端负载均衡的问题:同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个都会获取所有消息。queue模式可以解决这个问题,broker端又不能将消息发送到多个应用端。所以,既要发布订阅,又要让消费者分组,这个功能jms规范本身是没有的。

2. 同一应用内consumer端failover的问题:由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高。

JMS Queue中已经用可靠的方式实现了跨多个消费者负载均衡的功能,允许多线程、多进程、多节点处理消息。我们还有一个更复杂的的有粘性的负载均衡技术Message groups 在保证消息顺序前提下去实现负载均衡和并行化工作。

另一方面让每一个逻辑主题订阅者都有自己的物理队列的好处是我们可以通过JMX监控队列的深度来监控系统性能同时浏览这些物理队列。

Virtual Topics to the rescue

virtual topics 实现的方法是 producers 还是按照JMS Topic规范发送消息,消费者继续使用JMS topic语法,只不过消费者从queue来消费topic,可以定义多个消费者消费queue(我们知道queue可以实现负载均衡的),从而实现多节点、多线程的负载均衡。

E.g:

我们有个topic VirtualTopic.Orders(前缀是VirtualTopic. 表明他是一个virtual topic) ,我们希望将消息发送给系统A和系统B

那么系统A的队列名称需要定义为Consumer.A.VirtualTopic.Orders,系统B的队列名称需要定义为Consumer.A.VirtualTopic.Orders,我们可以为为每个系统提供一组用户,消息在组用户只能被精确地处理一次。

Customizing the out-of-the-box defaults

virtual topics 的命名默认是 VirtualTopic.> namespace  消费者队列命名Consumer.*.VirtualTopic.>. 你可以通过配置修改命名约定。如下示例使用‘>’ 表明适用所有topics 前缀改为VirtualTopicConsumers。selectorAware属性则表明如果consumer端有selector,则只有匹配selector的消息才会分派到对应的queue中去。

<destinationInterceptors> 
 <virtualDestinationInterceptor> 
 <virtualDestinations> 
 <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>    
   </virtualDestinations>
 </virtualDestinationInterceptor> 
</destinationInterceptors>

Composite Destinations

相对于client side(如上Composite Destinations) broker side 更能区分destinations的映射关系。

Using filtered destinations

ver 4.2 开始可以对virtual destination定义selector. eg:

selectors 匹配一个消息是否会发送给virtual destination MY.QUEUE 

<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> 
   <compositeQueue name="MY.QUEUE">
    <forwardTo>
     <filteredDestination selector="odd = 'yes'" queue="FOO"/>
     <filteredDestination selector="i = 5" topic="BAR"/>
    </forwardTo>
  </compositeQueue>
</virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors>

Avoiding Duplicate Message in a Network of Brokers

 

只连接consumer queues 或者virtual topics,不要俩者都链接。通常queue和topic分别创建链接

白话翻译:如果你只想从一个consumer queue获取消息 但是不小心订阅了virtual topics下的queue(queue name符合命名规范了)那么消息就会冲突。如下设置:一个链接下排除 virtual topic下的 consumer queues

<networkConnectors> <networkConnector uri="static://(tcp://localhost:61617)">
 <excludedDestinations> 
 <queue physicalName="Consumer.*.VirtualTopic.>"/> 
 </excludedDestinations> 
</networkConnector> </networkConnectors>
 


 

 

 

本文标签: ActiveMqdestination