admin管理员组文章数量:1568431
环境搭建:
Spring整合
阿里云RocketMQ原生API是通过ONSFactory工厂来创建消费者和生产者,创建之前需要通过Properties来指定一些配置信息
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
properties.setProperty(PropertyKeyConst.GROUP_ID, this.groupId);
// 消息发送超时时间,如果服务端在配置的对应时间内未ACK,则发送客户端认为该消息发送失败。
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 消费线程数
properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "1");
// 消息消费失败时的最大重试次数。
properties.put(PropertyKeyConst.MaxReconsumeTimes,"16");
// 消息消费失败进行重试前的等待时间,单位(毫秒),取值范围: 10毫秒~30,000毫秒。
properties.put(PropertyKeyConst.SuspendTimeMillis,"1000");
// 每次批量消费的最大消息数量, 默认值为1, 允许自定义范围为[1, 32]
properties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize,"1");
// 消费模式 默认集群模式
properties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
Producer producer = ONSFactory.createProducer(properties);
Consumer consumer = ONSFactory.createConsumer(properties);
阿里云RocketMQ,提供了一些Bean用于集成至Spring容器中:(ProducerBean、ConsumerBean),(OrderProducerBean、OrderConsumerBean),TransactionProducerBean,我们只需配置初始化方法以及销毁方法,就能跟随Spring生命周期启动和销毁生产者和消费者
<bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean"
init-method="start" destroy-method="shutdown">
<!--生产者配置信息-->
<property name="properties">
<!--PropertyKeyConst 中定义了常量 -->
<props>
<prop key="GROUP_ID">${ali.mq.groupId}</prop>
<prop key="AccessKey">${ali.mq.accessKey}</prop>
<prop key="SecretKey">${ali.mq.secretKey}</prop>
<prop key="NAMESRV_ADDR">${ali.mq.namesrvAddr}</prop>
</props>
</property>
</bean>
<bean id="commonMsgListener" class="com.hbger.hbger.listener.CommonMsgListener"/>
<bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean"
init-method="start" destroy-method="shutdown">
<property name="properties">
<props>
<prop key="GROUP_ID">${ali.mq.groupId}</prop>
<prop key="AccessKey">${ali.mq.accessKey}</prop>
<prop key="SecretKey">${ali.mq.secretKey}</prop>
<prop key="NAMESRV_ADDR">${ali.mq.namesrvAddr}</prop>
</props>
</property>
<property name="subscriptionTable">
<map>
<entry value-ref="commonMsgListener">
<key>
<bean class="com.aliyun.openservices.ons.api.bean.Subscription">
<property name="topic" value="${ali.mq.topic}"/>
<property name="expression" value="*"/>
</bean>
</key>
</entry>
<!--如果有更多的订阅 添加entry节点即可-->
</map>
</property>
</bean>
消息存储机制:
市面上消息中间件的存储一般分为两种:
- 关系型数据库 (ActiveMQ)
在消息数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB - 文件系统(RocketMQ/Kafka/RabbitMQ)
采用的是将消息刷盘至所部署服务器的文件系统来做持久化
RocketMQ主要运用了磁盘的顺序写这种机制。
目前对于高性能磁盘,顺序写速度一般可以达到600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍。
这块儿要注意下,RocketMQ的性能要是和内存比,是没办法比的,一般会和网卡去比,因为mq生产消息,消费消息,都要经过网卡发送数据,只要硬盘的读写速度和网卡差不多的话,基本上就够用了,RocketMQ就是采用了顺序写,保证了消息存储的一个高性能
然后我们看下它的一个存储结构
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。
- CommitLog: 该文件默认最大为1GB,超过1GB后会写到下一个CommitLog文件,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824字节;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。
- ConsumeQueue :消息消费队列,引入的目的主要是提高消息消费的性能,RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行,如果要遍历commitlog文件根据topic检索消息是非常低效。
消费者即可根据ConsumeQueue里的的元数据来查找待消费的消息。
其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引保存了:- 指定Topic下的队列消息在CommitLog中的起始物理偏移量offset
- 消息大小size
- 消息Tag的HashCode值。
consumequeue文件采取定长设计,每个条目共20个字节,分别为:
- 8字节的commitlog物理偏移量
- 4字节的消息长度
- 8字节tag hashcode
服务器中存储目录结构:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
普通消息
消息发送:
阿里云RocketMQ与原生版RocketMQ一样提供三种方式来发送普通消息:同步(Sync)发送、异步(Async)发送和单向(Oneway)发送。
同步发送:生产者发送一条消息后,会在收到服务端ack响应之后才发下一条消息,
异步发送:生产者发出一条消息后,不等服务端返回响应,接着发送下一条消息,需要实现异步发送回调接口(SendCallback)
Oneway发送:oneway形式只发送请求不等待应答,只是将消息数据写入客户端的socket缓冲区 ,这种方式比较适合对耗时有要求,对可靠性要求并不高的场景
//同步发送
@Test
public void syncSend() {
//循环发送消息
for (int i = 0; i < 10; i++) {
Message msg = new Message(mqConfig.getTopic(),
mqConfig.getTag(), ("Sync message" + i).getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一
msg.setKey("sync_key_" + i );
try {
//同步发送
SendResult sendResult = producer.send(msg);
// 消息发送成功。
Assert.isTrue(sendResult != null,"发送结果为空");
System.out.println(sendResult);
} catch (ONSClientException e) {
System.out.println("发送失败");
}
}
}
//异步发送
@Test
public void asyncSend() {
//循环发送消息
for (int i = 0; i < 10; i++) {
Message msg = new Message(mqConfig.getTopic(),
mqConfig.getTag(), ("async message" + i).getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一
msg.setKey("async_key_" + i );
try {
//异步同步发送
producer.sendAsync(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
Assert.isTrue(sendResult != null,"发送结果为空");
System.out.println(sendResult);
}
@Override
public void onException(OnExceptionContext context) {
System.out.println("发送失败");
}
});
} catch (ONSClientException e) {
System.out.println("发送失败");
}
}
}
//单向发送
@Test
public void oneWaySend() {
//循环发送消息
for (int i = 0; i < 10; i++) {
Message msg = new Message(mqConfig.getTopic(),
mqConfig.getTag(), ("oneway message" + i).getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一
msg.setKey("oneway_key_" + i );
try {
//单向发送
producer.sendOneway(msg);
System.out.println("发送成功");
} catch (ONSClientException e) {
System.out.println("发送失败");
}
}
}
订阅消息:
- 集群模式
同一个消费组下的所有Consumer实例会平均分摊消费消息,每一条消息都只会被分发到一台机器上处理,如果消息消费失败,集群消费模式下,不保证每一次失败重投的消息会路由到同一台机器上
// 不设置的情况下,默认为集群模式。
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
- 广播模式
同一个消费组下的所有Consumer实例都会消费某条消息。广播消费模式下不支持顺序消息
// 广播订阅方式设置。
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
同一个消费组下所有Consumer实例所订阅的Topic、Tag必须完全一致。如果订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失
顺序消息:
顺序消息分为全局顺序消息和部分顺序消息:
- 全局顺序: 某个Topic下的所有消息都要保证顺序;
- 分区顺序: 同一个分区内的消息按照严格的FIFO顺序进行发布和消费
原生mq:
rocketmq在默认情况下是不保证消息顺序,需要对rocket mq做一些设置才能保证
- 如果需要保证全局有序,需要保证生产者只有一个,消费者只有一个,topic中的队列只有一个,也就是说发送方,接收方,以及中间件都设置成单线程来处理
- 要保证部分有序,对于生产者需要保证同一订单的消息发送到同一个messagequeue 里,这块在发送的时候可以通过代码选择,同时消费者需要设置成 messageListenerOrderly 有序消费模式,每次消费者从队列中获取消息的 时候,只获取一条,同时消费者的线程需要设置成1
//确保同一个订单号的数据放到同一个queue中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg;
long index = id % mqs.size();
return mqs.get((int)index);
}
}, orderList.get(i).getOrderId());//订单id
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
consumer.setConsumeMessageBatchMaxSize(1);
阿里云mq:
- 不需要我们通过代码去指定消息发送到哪个message queue中,在发送消息的时候,只需要指定 sharding key就可以了,同一 sharding key会发送到同一个message queue里
final ThreadPoolExecutor executor = new ThreadPoolExecutor(3,
5,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
//顺序消息
@Test
public void ordreSend() throws InterruptedException {
ArrayList<String> shardingKeys =
Lists.newArrayList("OrderedKey_01", "OrderedKey_02", "OrderedKey_03");
final CountDownLatch count = new CountDownLatch(shardingKeys.size());
shardingKeys.forEach(shardingKey->{
executor.execute(()->{
//循环发送消息
for (int i = 0; i < 100; i++) {
Message msg = new Message(
mqConfig.getOrderTopic(),
mqConfig.getOrderTag(),
("order message" +i).getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一
msg.setKey("order_key_"+i);
// 发送消息,只要不抛异常就是成功
try {
SendResult sendResult = orderProducer.send(msg, shardingKey);
Assert.isTrue(sendResult != null,"发送结果为空");
System.out.println(sendResult);
} catch (ONSClientException e) {
System.out.println("发送失败");
}
}
count.countDown();
});
});
count.await();
System.out.println("发送完毕");
}
事务消息:
发送事务消息包含以下两个步骤:
- 发送半事务消息(Half Message)及执行本地事务,此时Half Message对消费者不可见,只有本地事务执行成功,提交后Half Message才会对消费者可见
@Test
public void txSend() throws IOException {
Message msg = new Message(mqConfig.getTxTopic(), "*", "tx Message".getBytes());
//设置第一次回查时间30s
msg.putUserProperties(PropertyKeyConst.CheckImmunityTimeInSeconds,"30");
SendResult sendResult =
transactionProducerBean.send(msg, new LocalTransactionExecuter() {
@Override
public TransactionStatus execute(Message msg, Object arg) {
System.out.println("执行本地事务");
try{
userService.save(new User().setName("li").setAge(10));
//根据本地事务执行结果来返回不同的TransactionStatus
System.out.println("本地事务提交");
return TransactionStatus.CommitTransaction;
}catch (Exception e){
System.out.println("本地事务回滚");
return TransactionStatus.RollbackTransaction;
}
}
}, null);
System.out.println(sendResult);
}
- 如果本地事务一直没提交,会回查事务状态
@Component
public class TxLocalChecker implements LocalTransactionChecker {
@Override
public TransactionStatus check(Message msg) {
System.out.println("开始回查本地事务状态");
//根据本地事务状态检查结果返回不同的TransactionStatus
return TransactionStatus.CommitTransaction;
}
}
延迟消息:
发送后立即到达服务器,只是在到达指定延迟时间后才会投递给消费者消费,而不是在到达指定延迟时间后才发送给服务器
原生mq:
只有18个延迟级别,1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
message.setDelayTimeLevel(level);
阿里云RocketMQ:
//延迟发送
@Test
public void delaySend() {
Message msg = new Message(mqConfig.getDelayTopic(),
mqConfig.getDelayTag(), ("delay message").getBytes());
System.out.println("当前时间 "+ System.currentTimeMillis());
long delayTime = System.currentTimeMillis() + 30000;
msg.setStartDeliverTime(delayTime);
// 设置代表消息的业务关键属性,请尽可能全局唯一
msg.setKey("delay_key");
try {
//同步发送
SendResult sendResult = producer.send(msg);
// 消息发送成功。
Assert.isTrue(sendResult != null,"发送结果为空");
System.out.println(sendResult);
} catch (ONSClientException e) {
System.out.println("发送失败");
}
}
- 定时和延时消息的
msg.setStartDeliverTime
参数需要设置成当前时间戳之后的某个时刻(单位毫秒)。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。 - 定时和延时消息的
msg.setStartDeliverTime
参数可设置40天内的任何时刻(单位毫秒),超过40天消息发送将失败。
生产者、消费者初始化案例
版权声明:本文标题:阿里云RocketMQ 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://www.elefans.com/xitong/1726604597a1077422.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论