路由)工作模式"/>
RabbitMQ Routing(路由)工作模式
RabbitMQ Routing(路由)工作模式
路由的这种模式和发布订阅的模式相比多了一个路由的环节。要求队列在绑定在绑定到交换机的时候指定到对应的路由。路由的作用是什么呢?作为一种分发的规则。可以按照相应的条件指定和分发。
官方还是很形象的给出了我们的模型图。
这里对日志的级别采用了不同的处理方式。分别是error,info,warninng。其实日志移动有五种这里给出了三种日志举例的处理方式。
DEBUG:详细的信息,通常只出现在诊断问题上
INFO:确认一切按预期运行
WARNING:一个迹象表明,一些意想不到的事情发生了,或表明一些问题在不久的将来(例如。磁盘空间低”)。这个软件还能按预期工作。
ERROR:更严重的问题,软件没能执行一些功能
CRITICAL:一个严重的错误,这表明程序本身可能无法继续运行
运行出现的日志info,error,warning会路由到一个队列,然后单独的error也会路由到一个队列。
其实还是比较简单的,我们来看具体的代码实现。
那我们还需要一个生产者首先,来看代码。
package com.jgdabc.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;//完成发送消息
public class Producer_Routing {public static void main(String[] args) throws IOException, TimeoutException {// 1 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2 设置连接参数connectionFactory.setHost("123.56.4.32"); //服务器主机ip公网 默认localhostconnectionFactory.setPort(5672); //消息端口 默认5672connectionFactory.setVirtualHost("/shabi");//虚拟机 默认/connectionFactory.setUsername("jgdabc");//用户名 默认guestconnectionFactory.setPassword("123456");//密码 默认guest// 3创建连接connectionConnection connection = connectionFactory.newConnection();
// 4 创建ChannelChannel channel = connection.createChannel();
// 5:创建交换机
// exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
// 1:exchange : 交换机名称
// 2: type:交换机的类型 BuiltinExchangeType代表枚举类型
// Direct("direct"):定向
// FANOUT("fanout") : 扇形广播,发送消息到每一个与之绑定的队列
// TOPIC("topic") 通配符的方式
// HEADERS("headers")
// 3:durable是否持久化
// 4:autoDelete:是否自动删除
// 5:internal 内部使用
// 6Larguments:参数列表String exchangeName = "test_direct";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
// 6:创建队列String queueName = "test_direct_queue";String queueName01 = "test_direct_queue01";channel.queueDeclare(queueName,true,false,false,null);channel.queueDeclare(queueName01,true,false,false,null);// queueBind(String queue, String exchange, String routingKey)
// 7 绑定队列和交换机
// queue:绑定的队列名称
// exchange:交换机名称
// routingKey:路由键,绑定规则
// 如果倦鸟还击的类型为fanout,routingKey设置为“”channel.queueBind(queueName,exchangeName,"error");channel.queueBind(queueName,exchangeName,"info");channel.queueBind(queueName,exchangeName,"warning");channel.queueBind(queueName01,exchangeName,"error");
// 8:发送消息String body = "日志信息。张三调用了findAll方法,日志级别info....";channel.basicPublish(exchangeName,"info",null,body.getBytes());// 9:释放资源channel.close();connection.close();}
}
说明一次处理逻辑,我们这回的交换机需要指定为DIRECT,也就是一种定向的模式,很明显我们是定向分配的,如果你要是广播的模式的话,那么路由就不能做成定向的。
这是一点。
抽出来说明一些,下面这段是创建交换机以及创建出队列
String exchangeName = "test_direct";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
// 6:创建队列String queueName = "test_direct_queue";String queueName01 = "test_direct_queue01";channel.queueDeclare(queueName,true,false,false,null);channel.queueDeclare(queueName01,true,false,false,null);
下面这段是进行一个路由的绑定
channel.queueBind(queueName,exchangeName,"error");channel.queueBind(queueName,exchangeName,"info");channel.queueBind(queueName,exchangeName,"warning");channel.queueBind(queueName01,exchangeName,"error");
下面是我们发送消息,body是要发送的消息。这里指定了交换机和路由,这是一个info级别的日志,所以不会发送到单独绑定error的队列。当然你在这里可以更改。只是举个例子。
String body = "日志信息。张三调用了findAll方法,日志级别info....";channel.basicPublish(exchangeName,"info",null,body.getBytes());
然后呢,然后来看消费者的代码,我们只说明一个,另一个是一样的道理。
消费者一:
package com.jgdabc.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_Routing {public static void main(String[] args) throws IOException, TimeoutException {// 1 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2 设置连接参数connectionFactory.setHost("123.56.4.32"); //服务器主机ip公网 默认localhostconnectionFactory.setPort(5672); //消息端口 默认5672connectionFactory.setVirtualHost("/shabi");//虚拟机 默认/connectionFactory.setUsername("jgdabc");//用户名 默认guestconnectionFactory.setPassword("123456");//密码 默认guest// 3创建连接connectionConnection connection = connectionFactory.newConnection();
// 4 创建ChannelChannel channel = connection.createChannel();String queueName = "test_direct_queue";String queueName01 = "test_direct_queue01";channel.queueDeclare(queueName,true,false,false,null);channel.queueDeclare(queueName01,true,false,false,null);// 发送消息//5创建队列
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<Stri
//参数说明: queue :队列名称
// durable : 是否持久化 :当mq 重启数据还在
// exclusive : 是否独占,只能有一个消费者监听这队列
// 当connection关闭时候,是否删除队列
// autoDelete:是否自动删除,当没有Consumer时候,是否自动删除// basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
// 参数说明
// queue: 队列名称
// autoAck : 是否自动确认
// callback: 回调函数DefaultConsumer consumer = new DefaultConsumer(channel) {
// 回调方法,当收到消息后会自动执行该方法
// consumerTag:消息表示
// ebvelop:获取一些信息,交换机的信息,路由等等
// properties:配置信息
// body:数据@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);System.out.println("body:"+new String(body));System.out.println("将日志信息打印到控制台。。。");}};channel.basicConsume(queueName,true,consumer);
//}
}
消费者二:
package com.jgdabc.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_Routing1 {public static void main(String[] args) throws IOException, TimeoutException {// 1 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2 设置连接参数connectionFactory.setHost("123.56.4.32"); //服务器主机ip公网 默认localhostconnectionFactory.setPort(5672); //消息端口 默认5672connectionFactory.setVirtualHost("/shabi");//虚拟机 默认/connectionFactory.setUsername("jgdabc");//用户名 默认guestconnectionFactory.setPassword("123456");//密码 默认guest// 3创建连接connectionConnection connection = connectionFactory.newConnection();
// 4 创建ChannelChannel channel = connection.createChannel();String queueName = "test_direct_queue";String queueName01 = "test_direct_queue01";// 发送消息//5创建队列
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<Stri
//参数说明: queue :队列名称
// durable : 是否持久化 :当mq 重启数据还在
// exclusive : 是否独占,只能有一个消费者监听这队列
// 当connection关闭时候,是否删除队列
// autoDelete:是否自动删除,当没有Consumer时候,是否自动删除channel.queueDeclare(queueName,true,false,false,null);channel.queueDeclare(queueName01,true,false,false,null);
// 接收消息
// basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
// 参数说明
// queue: 队列名称
// autoAck : 是否自动确认
// callback: 回调函数DefaultConsumer consumer = new DefaultConsumer(channel) {
// 回调方法,当收到消息后会自动执行该方法
// consumerTag:消息表示
// ebvelop:获取一些信息,交换机的信息,路由等等
// properties:配置信息
// body:数据@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);System.out.println("body:"+new String(body));System.out.println("将信息存储到数据库。。。");}};channel.basicConsume(queueName01,true,consumer);
//}
}
取来一个说明。这里面有一些多余的代码啊!不知道你有没有发现。
生产者做过的事情消费者没必要做了。队列都创建过了。这里这段在消费者里的代码可以删除掉,不过并不影响,
channel.queueDeclare(queueName,true,false,false,null);channel.queueDeclare(queueName01,true,false,false,null);
这里是一个处理逻辑
这里代表监听的队列。
channel.basicConsume(queueName,true,consumer);
测试
你看这里只有一个 消费者获取到了消息
获取到消息的做出响应的逻辑处理,没获取到消息的则不会进行处理。
这就是路由。
更多推荐
RabbitMQ Routing(路由)工作模式
发布评论