rocketmq的消费者源码解读五(broker端处理拉取消息请求)

编程知识 更新时间:2023-05-02 19:03:05

9)broker端处理拉取消息请求

9.1)broker接收请求网络层回顾

消费者客户端向broker端发送RemotingCommand,主要包括opaque,requestCode为PULL_MESSAGE,RPC_TYPE为REQUEST_COMMAND,header消息体等;
broker在启动阶段会向协议处理器映射表中注册键值对<code,pair>,pair对象中第一个value是处理器,第二个value是线程池;

9.1.1)broker端的netty框架处理

headContext——>handshakehandler——>nettyEncoder——>nettyDecoder——>idleStatehandler——>connectionManagerHandler ——>serverHandler

9.1.1.1)NettyServerHandler

根据requestCode在协议处理器映射表中,找到pair,将网络层的处理逻辑包装成requestTask,提交给线程池运行;

9.1.1.2)网络层处理逻辑包括:

9.1.1.2.1)构建RemotingResponselCallback实例

重写callback方法,其内部封装了发送结果的逻辑,判断若response不为null,则设置opaque,RPC_TYPE为RESPONSE_COMMAND,执行ChannelHandlerContext#writeAndFlush方法;若response为null,则啥也不做;

9.1.1.2.2)processor.processRequest

准备执行业务逻辑,调用RemotingResponselCallback#callback方法,将RemotingCommand类型的返回结果response作为其入参;

9.2)PullMessageProcessor#processRequest

代码接近400行,先抓重点分析;getMessage方法是broker端处理消息拉取的核心部分;

9.2.1)创建RemotingCommand类型的response

解析requestHead,赋值opaque

9.2.2)权限校验,为tag的hash过滤做准备

9.2.2.1)如请求的queueId小于0或者大于max-1,则返回SYSYTEM_ERROR等;
9.2.2.2)根据requestHeader中的消费者组,从broker端的ComsumerManager中的consumerTable中找到ComsumerGroupInfo实例,即消费者信息;再根据requestHeader中的topic,从ConsumerGroupInfo实例中的subscriptionTable中获取SubscriptionData实例,即订阅信息;将订阅信息等封装进MessageFilter对象;

9.2.3)DefultMessageStore#getMessage方法

broker端处理消息拉取请求,代码接近200行;先抽重点分析;由于已经获取到了分配给当前clientId的一组set,所以只要去broker上访问这些mq即可拿到消息;而PullRequest实例包含四个属性,消费者组,messageQueue,processQueue,nextOffset,所以通过messageQueue中的brokerName,topic和queueId可以定位到某台机器的某个ConsumerQueue实例,再通过nextOffset可以定位到cq实例中具体某个mappedFile;由于每次selectMappedBufferResult的范围是当前mappedFile的nextOffset到当前(mappedFile文件名+wrotePosition)之间,所以最大只能拉取当前mappedFile中的这段区间内容;

9.2.3.1)findConsumeQueue

findConsumeQueue方法根据当前topic,查询consumeQueueTable表,拿到<queueId,consumeQueue>,再根据queueId,拿到consumeQueue;接着拿到当前cq的min值和max逻辑偏移量,判断请求的offset是否处在有效范围;分为4种情况:
若maxOffset为0,表示cq是在findConsumeQueue方法中刚创建的,此时设置GetMessageStatus.NO_MESSAGE_IN_QUEUE,修改offset为0,准备进行长轮询;
若offset<minOffset或offset>maxOffset,设置前者GetMessageStatus.OFFSET_TOO_SMALL,修改offset为minOffset;设置后者GetMessageStatus.OFFSET_OVERFLOW_BADLY,修改offset为minOffset(当minOffset为0时),或者maxOffset(minOffset不为0);
若offset=maxOffset,表示客户端的消费进度和当前queue的消费进度持平,此时设置GetMessageStatus.OFFSET_OVERFLOW_ONE,offset不变,准备进行长轮询;
若offset介于[minOffset,maxOffset),左闭右开,则执行9.2.3.2节开始的逻辑;

9.2.3.2)ConsumeQueue#getIndexBuffer

入参offset其实就是queueOffset,为当前消息在某个ConsumeQueue实例的mfq中的逻辑偏移量,queueOffset乘以20得到真实偏移量;该方法最终返回SelectedMappedBufferResult实例,即目标消息所在的那一小块buffer,该实例的范围是[请求的offset,当前mappedFile文件名+wrotePosition];若该方法返回null,则表示offset所在的cq的mappedFile因为过期被清理掉了,所以会设置GetMessageStatus为OFFSET_FOUND_NULL,并且更新nextBeginOffset为下一个mappedFile的名字,准备进入长轮询;

9.2.3.2.1)MappedFileQueue#findMappedFileByOffset

入参为真实偏移量,该方法在前面已分析;最终返回偏移量所在的MappedFile实例;

9.2.3.2.2)MappedFile#selectMappedBuffer

入参为:真实偏移量%mappedFileSize。最终返回一块buffer副本bufferConsumeQueue;

9.2.3.3)循环读取bufferConsumeQueue上的消息

每次读取一条consumeQueue消息,大小为20字节;

9.2.3.4)isTheBatchFull方法

该方法控制是否跳出循环;
a)还没拉取到数据,则继续循环;
b)拉取的消息数量超过了上限,则准备退出循环;
c)若当前消息属于冷数据,即是最新数据的40%以外的数据,若拉取超过了64kb,则退出循环;或者拉取超过了8条消息,则退出循环;
d)若当前消息属于热数据,即是最新数据的40%以内的数据,若拉取超过了256kb,则退出循环;或者拉取超过了32条消息,则退出循环;
e)以上条件均未达到,则继续循环;
该方法返回false后,接着处理tagsCode过滤,即看当前tagsCode是否在订阅信息subscriptionData中存在,若不存在,则设置GetMessageStatus.NO_MATCHED_MESSAGE;

9.2.3.5)CommitLog#getMessage

和9.2.3.2小节中的方法逻辑一样;最终返回一个SelectMappedBufferResult实例;由于9.2.3.3小节已拿到一条consumeQueue消息,所以取出第一个8字节和第二个4字节中的字段,分别表示该条cq消息在commitLog中的物理偏移量offsetPy和大小sizePy,这两个值作为CommitLog#getMessage方法的入参;若返回的buffer切片为null,则表示删除过期文件的定时任务将offsetPy所在的commitLog的mappedFile文件删除了,则设置GetMessageStatus为MESSAGE_WAS_REMOVING,更新nextPhyFileStartOffset 为包含该offsetPy的commitLog的mappedFile文件的下一个commitLog的mappedFile的名字,而再次循环时,cq中消息的offsetPy小于nextPhyFileStartOffset的消息都属于过期的无效的消息,因为commitLog中过期被删除的消息,consumeQueue中消息的offsetPy也是无效值;

9.2.3.6)GetMessageResult#addMessage方法

累加消息的数量,大小;接着设置GetMessageStatus为FOUND,复位nextPhyFileStartOffset为0;接着再次循环,读取当前consumeQueue上mappedFile的下一条消息;退出循环一般发生在isTheBatchFull方法处;退出循环后会执行9.2.3.7节开始的逻辑;

9.2.3.7)计算下次拉取的起点

计算公式为(offset+ i/20),i表示本次一共循环读取的消息大小,是20的整数倍;offset表示客户端发起本次拉取消息请求的起点,是所请求的messageQueue对应的ConsumeQueue实例中mfq的逻辑偏移量;

9.2.3.8)计算还有多少数据没有消费

公式为(maxOffsetPy – maxPhyOffsetPulling),maxOffsetPy表示当前CommitLog实例中mfq的最大物理偏移量,maxPhyOffsetPulling表示本次循环读取的最后一个consumeQueue消息在commitLog中偏移量;若此差值大于内存的40%,则表示下次拉取消息需要从broker的从机拉取了,因为离最新数据40%以外的内容,都已经刷到磁盘了;最后执行bufferConsumeQueue.release方法,释放切片资源;
为啥有这样的机制呢?因为若让broker主节点去磁盘拉取数据,这是个相对耗时的任务,broker的主节点的最主要任务是将生产者发送的消息储存进磁盘,所以为了不影响broker主节点,让下次让broker从节点去磁盘读数据;最后返回GetMessageResult实例,包含了当前messageQueue的本次消息拉取请求所拉取到的累加消息大小,数量,下次拉取消息的consumeQueue的逻辑偏移量,拉取状态,当前cq的最大最小值,建议下次从主机或从机拉取等,返回到PullMessageProcessor类的getMessage方法的调用处;

9.2.4)一些其他杂七杂八的判断

这里省略杂七杂八的,暂时只讲重难点;

9.2.5)状态转换,并执行相应操作

broker端将9种GetMessageStatus转换为4种ResponseCode,在客户端最后将4种ResponseCode转换为4种PullStatus;其中下边的9.2.5.7节,9.2.5.8节,9.2.5.9节三种情况需要进行长轮询,不给客户端响应;其他情况都将返回响应给客户端;

9.2.5.1)找到了需要拉取的消息

查询到了msg,broker端会将GetMessageResult类中的List类型的集合全部导入一个byte[]数组内;接着GetMessageResult会调用release方法,其中会遍历List集合,调用每个元素的release方法释放资源,将byte[]数组赋值给reponse的body,再发送给客户端;客户端收到响应后会正常处理;
GetMessageStatus. FOUND转为ResponseCode.SUCCESS,客户端收到响应后,会转为FOUND状态;

9.2.5.2)拉取的commitLog文件已过期且被清除

在DefaultMessageStore#getMessage的 方法中执行CommitLog#getMessage方法,若获取到的buffer切片为null,若一直都没有拉取到消息则设置GetMessageStatus状态为MESSAGE_WAS_REMOVING,若有拉取到过消息,则保持NO_MATCHED_MESSAGE;
获取到的buffer切片为null是因为查询时正好赶上commitLog清理过期文件,导致查询失败;
GetMessageStatus. MESSAGE_WAS_REMOVING或GetMessageStatus. NO_MATCHED_MESSAGE均转为ResponseCode.PULL_RETRY_IMMEDIATELY;并且broker端不做处理,broker端将响应发给客户端,客户端收到响应后,会转为NO_MATCHED_MESSAGE,调整consume offset,再次向broker发起pullRequest请求;

9.2.5.3)并没有定位到ConsumeQueue实例

不存在这种情况;因为getMessage方法中的findConsumeQueue方法一定会返回一个ConsumeQueue实例的,不可能返回null;因为findConsumeQueue方法中发现consumeQueue为null后,会创建一个ConsumeQueue实例;

9.2.5.4)要拉取的消息存在但通过tagsCode被滤掉了

在DefaultMessageStore#getMessage方法中,取到每条cq消息后,需要先执行tagsCode过滤,若被过滤掉了,则设置状态为GetMessageStatus.NO_MATCHED_MESSAGE ,接着转为
ResponseCode.PULL_RETRY_IMMEDIATELY
表示broker端将消息过滤掉了,此时broker端不做进一步处理,客户端收到响应后,会转为NO_MATCHED_MESSAGE,即调整consume offset,再次发起pullRequest;

9.2.5.5)请求的cq的逻辑偏移量超了cq的上限

在DefaultMessageStore#getMessage方法中,在执行findConsumeQueue方法找到一个ConsumeQueue实例后,会先判断请求的offset是否在当前cq的有效范围;
若pullRequest.offset越界maxOffset,此时broker端不做进一步处理,在客户端会转为OFFSET_ILLEGAL,即删除processQueue,消费者停止消费该mq,待下次负载均衡后再继续消费;
此时设置状态为GetMessageStatus.OFFSET_OVERFLOW_BADLY,接着会转为ResponseCode.PULL_OFFSET_MOVED,在客户端会转为OFFSET_ILLEGAL;

9.2.5.6)请求的cq的逻辑偏移量超了cq的下限

同理表示pullRequest.offset越界minOffset,此时broker端不做进一步处理,在客户端会转为OFFSET_ILLEGAL,即删除processQueue,消费者停止消费该mq,待下次负载均衡后再继续消费;
此时设置状态为GetMessageStatus. OFFSET_TOO_SMALL,接着会转为ResponseCode.PULL_OFFSET_MOVED,在客户端会转为OFFSET_ILLEGAL;

9.2.5.7)发现cq不存在,但马上创建了一个

此时,cq中的mfq中的集合肯定为null,此时设置GetMessageStatus状态为
NO_MESSAGE_IN_QUEUE;接着根据请求的offset是否为null,转为不同的ResponseCode;
若pullRequest.offset不为0,则转换为ResponseCode.PULL_OFFSET_MOVED,此时broker端不做进一步处理,在客户端会转为OFFSET_ILLEGAL,即删除processQueue,消费者停止消费该mq,待下次负载均衡后再继续消费;ResponseCode.PULL_OFFSET_MOVED这种状态表示offset有问题;
若pullRequest.offset为0,则转换为ResponseCode.PULL_NOT_FOUND,对于ResponseCode.PULL_NOT_FOUND状态,broker端会先判断是否允许长轮询,根据PullMessageProcessor#processRequest方法的第三个入参brokerAllowSuspend是否为true,若为true则允许长轮询,则会创建一个PullRequest实例(该类与PullRequestService类中的PullRequest不是同一个类,只是名字相同),接着会将PullRequest实例放入PullRequestHoldService线程中的pullRequestTable中;
最后将response设为null,则broker端不给客户端响应;
长轮询是为了避免客户端收到响应后,又频繁的向broker发送拉取消息的请求;关于长轮询的具体细节将单独抽出一小节分析,在第10)小节中将分析长轮询;
准备进行长轮询机制,在客户端会转为NO_NEW_MESSAGE,即调整consume offset,再次发起pullRequest;

9.2.5.8)拉取的cq文件已过期且被清除

在DefaultMessageStore#getMessage的 方法中,若请求的offset在有效范围,但执行ConsumeQueue#getIndexBuffer方法时,返回null,即表示获取到的offset所在文件的buffer切片为null,则意味着查询时正好赶上consumeQueue清理过期文件,导致查询失败,此时broker端需要进行长轮询,处理逻辑参照9.2.5.7节;
GetMessageStatus.OFFSET_FOUND_NULL转为ResponseCode.PULL_NOT_FOUND;
在客户端会转为NO_NEW_MESSAGE,即调整consume offset,再次发起pullRequest;

9.2.5.9)请求的cq的offset与cq的最大偏移量持平

pullRequest.offset=consumeQueue.maxOffset,表示请求的offset与当前mq最新的offset持平,即没有新消息,broker端准备进行长轮询机制,处理逻辑参照9.2.5.7节;
在客户端会转为NO_NEW_MESSAGE,即调整consume offset,再次发起pullRequest;
此时状态由GetMessageStatus.OFFSET_OVERFLOW_ONE —>ResponseCode.PULL_NOT_FOUND;

9.2.6)处理ResponseCode.PULL_OFFSET_MOVED情况

新建一个MessageQueue实例封装了本次拉取请求的topic,queueId,brokerName;又新建一个OffsetMovedEvent实例,封装了刚才新建的mq,以及其它的一些属性,再执行DefaultMessageStore#generateOffsetMovedEvent方法,其中会新建一个MessageExtBrokerInner实例msgInner,topic为OFFSET_MOVED_EVENT,以及一些其他属性,最后执行DefaultMessageStore#putMessage(msgInner)方法,将该系统主题的消息存入磁盘,最终控制台的就依赖该主题获取一些信息做视图;

9.2.7)持久化消费进度

只有当PullMessageProcessor#processRequest的入参brokerAllowSuspend为true,sysFlag允许提交消费者本地mq进度至broker,当前broker为主节点,三者同时满足时,才会将指定消费者组下指定topic的指定mq的消费进度持久化到broker端;主要是将数据缓存进ConsumerOffsetManager的offsetTable中,key为topic@group,value为map,其中map的key为mq,value为offset;最后返回response;

更多推荐

rocketmq的消费者源码解读五(broker端处理拉取消息请求)

本文发布于:2023-04-28 04:25:00,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/a367bd15ebc09cce8903202f4e3a0d30.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:源码   消费者   消息   rocketmq   broker

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!

  • 107620文章数
  • 27229阅读数
  • 0评论数