模式"/>
rabbitmq 直连模式
生产者:直连模式
package com.huixiang.rabbitmq.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Productor {public static void main(String[] args) {//所有中间件都是基于tcp/ip协议//创建连接ConnectionFactory connectionFactory = new ConnectionFactory();//设置ip,port,账号,密码,虚拟访问节点connectionFactory.setHost("");connectionFactory.setPort(5672);connectionFactory.setUsername("");connectionFactory.setPassword("");// 虚拟主机名connectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;//获取创建连接获取通道try {connection = connectionFactory.newConnection("生产者");channel = connection.createChannel();//创建交换机,队列,绑定关系,路由key,发送消息接受消息//声明队列String queueName = "queue1";/*** @params1 队列名称* @params2 是否要持久化,durable=false ,是否存盘,不持久化也会存盘,随着服务器重启然后丢失* @params3 排他性,是否独占独立的队列* @params4 是否自动化删除,随着最后一个消费者消费完毕后,是否删除队列* @params5 携带附属参数*/channel.queueDeclare(queueName,false,false,false,null);//准备消息内容String info = "nihao !";//发送消息到队列//交换机,队列名或路由key,消息状态控制(是否持久化),信息//交换机默认,不可能存在没有交换机的对列channel.basicPublish("",queueName,null,info.getBytes());} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}finally {//关闭通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}//关闭连接try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}
消费者:直连
package com.huixiang.rabbitmq.simple;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) {//所有中间件都是基于tcp/ip协议//创建连接ConnectionFactory connectionFactory = new ConnectionFactory();//设置ip,port,账号,密码,虚拟访问节点connectionFactory.setHost("");connectionFactory.setPort(5672);connectionFactory.setUsername("");connectionFactory.setPassword("");connectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;//获取创建连接获取通道try {connection = connectionFactory.newConnection("生产者");channel = connection.createChannel();/*** 通道绑定消息队列* @params1 队列名称 对列不存在就自动创建* @params2 是否要持久化,durable=false ,是否存盘,不持久化也会存盘,随着服务器重启然后丢失* @params3 排他性,是否独占独立的队列* @params4 是否自动化删除,随着最后一个消费者消费完毕后,是否删除队列* @params5 携带附属参数*/channel.queueDeclare("queue1",false,false,false,null);//接受消息/*** 通道绑定消息队列* @params1 队列名称* @params2 开启消费确认机制* @params3 消费时的回调接口* @params4 是否自动化删除,随着最后一个消费者消费完毕后,是否删除队列*/channel.basicConsume("queue1", true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("收到消息" + new String(message.getBody(), "UTF-8"));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("失败");}});} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}finally {// 关闭通道(不建议关闭)try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}//关闭连接try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}
代码冗余,提取工具类
package com.huixiang.utils;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class RabbitmqUtils {// 工厂是重量级资源 最好初始化就加载 加载一次private static ConnectionFactory connectionFactory;static {connectionFactory = new ConnectionFactory();//设置ip,port,账号,密码,虚拟访问节点connectionFactory.setHost("");connectionFactory.setPort(5672);connectionFactory.setUsername("");connectionFactory.setPassword("");// 虚拟主机名connectionFactory.setVirtualHost("/");}//定义提供连接对象的方法public static Connection setConnection(){try {//返回连接return connectionFactory.newConnection();} catch (TimeoutException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}return null;}//关闭连接public static void closeConnectionChanel(Channel channel,Connection connection) {try {if (channel != null) channel.close();if (connection != null) connection.close();}catch (Exception e){e.printStackTrace();}}
}
// 生产者简化 消费者同理
// 化简public static void main(String[] args) {Connection connection = null;Channel channel = null;//获取创建连接获取通道try {connection = RabbitmqUtils.setConnection();// 创建通道channel = connection.createChannel();//创建交换机,队列,绑定关系,路由key,发送消息接受消息//声明队列String queueName = "queue1";/*** 通道绑定消息队列* @params1 队列名称 对列不存在就自动创建* @params2 是否要持久化,durable=false ,是否存盘,不持久化也会存盘,随着服务器重启然后丢失* @params3 排他性,是否独占独立的队列* @params4 是否自动化删除,随着最后一个消费者消费完毕后,是否删除队列* @params5 携带附属参数*/channel.queueDeclare(queueName,false,false,false,null);//准备消息内容String info = "nihao !";//发送消息到队列//交换机,队列名或路由key,消息状态控制(是否持久化),信息//交换机默认,不可能存在没有交换机的对列channel.basicPublish("",queueName,null,info.getBytes());} catch (IOException e) {e.printStackTrace();} finally {RabbitmqUtils.closeConnectionChanel(channel,connection);}}
更多推荐
rabbitmq 直连模式
发布评论