9)broker端处理拉取消息请求
9.1)broker接收请求网络层回顾
消费者客户端向broker端发送RemotingCommand,主要包括opaque,requestCode为PULL_MESSAGE,RPC_TYPE为REQUEST_COMMAND,header消息体等;
broker在启动阶段会向协议处理器映射表中注册键值对<code,pair>,pair对象中第一个value是处理器,第二个value是线程池;
9.1.1)broker端的netty框架处理
headContext——>handshakehandler——>nettyEncoder——>nettyDecoder——>idleStatehandler——>connectionManagerHandler ——>serverHandler
9.1.1.1)NettyServerHandler
根据requestCode在协议处理器映射表中,找到pair,将网络层的处理逻辑包装成requestTask,提交给线程池运行;
9.1.1.2)网络层处理逻辑包括:
9.1.1.2.1)构建RemotingResponselCallback实例
重写callback方法,其内部封装了发送结果的逻辑,判断若response不为null,则设置opaque,RPC_TYPE为RESPONSE_COMMAND,执行ChannelHandlerContext#writeAndFlush方法;若response为null,则啥也不做;
9.1.1.2.2)processor.processRequest
准备执行业务逻辑,调用RemotingResponselCallback#callback方法,将RemotingCommand类型的返回结果response作为其入参;
9.2)PullMessageProcessor#processRequest
代码接近400行,先抓重点分析;getMessage方法是broker端处理消息拉取的核心部分;
9.2.1)创建RemotingCommand类型的response
解析requestHead,赋值opaque
9.2.2)权限校验,为tag的hash过滤做准备
9.2.2.1)如请求的queueId小于0或者大于max-1,则返回SYSYTEM_ERROR等;
9.2.2.2)根据requestHeader中的消费者组,从broker端的ComsumerManager中的consumerTable中找到ComsumerGroupInfo实例,即消费者信息;再根据requestHeader中的topic,从ConsumerGroupInfo实例中的subscriptionTable中获取SubscriptionData实例,即订阅信息;将订阅信息等封装进MessageFilter对象;
9.2.3)DefultMessageStore#getMessage方法
broker端处理消息拉取请求,代码接近200行;先抽重点分析;由于已经获取到了分配给当前clientId的一组set,所以只要去broker上访问这些mq即可拿到消息;而PullRequest实例包含四个属性,消费者组,messageQueue,processQueue,nextOffset,所以通过messageQueue中的brokerName,topic和queueId可以定位到某台机器的某个ConsumerQueue实例,再通过nextOffset可以定位到cq实例中具体某个mappedFile;由于每次selectMappedBufferResult的范围是当前mappedFile的nextOffset到当前(mappedFile文件名+wrotePosition)之间,所以最大只能拉取当前mappedFile中的这段区间内容;
9.2.3.1)findConsumeQueue
findConsumeQueue方法根据当前topic,查询consumeQueueTable表,拿到<queueId,consumeQueue>,再根据queueId,拿到consumeQueue;接着拿到当前cq的min值和max逻辑偏移量,判断请求的offset是否处在有效范围;分为4种情况:
若maxOffset为0,表示cq是在findConsumeQueue方法中刚创建的,此时设置GetMessageStatus.NO_MESSAGE_IN_QUEUE,修改offset为0,准备进行长轮询;
若offset<minOffset或offset>maxOffset,设置前者GetMessageStatus.OFFSET_TOO_SMALL,修改offset为minOffset;设置后者GetMessageStatus.OFFSET_OVERFLOW_BADLY,修改offset为minOffset(当minOffset为0时),或者maxOffset(minOffset不为0);
若offset=maxOffset,表示客户端的消费进度和当前queue的消费进度持平,此时设置GetMessageStatus.OFFSET_OVERFLOW_ONE,offset不变,准备进行长轮询;
若offset介于[minOffset,maxOffset),左闭右开,则执行9.2.3.2节开始的逻辑;
9.2.3.2)ConsumeQueue#getIndexBuffer
入参offset其实就是queueOffset,为当前消息在某个ConsumeQueue实例的mfq中的逻辑偏移量,queueOffset乘以20得到真实偏移量;该方法最终返回SelectedMappedBufferResult实例,即目标消息所在的那一小块buffer,该实例的范围是[请求的offset,当前mappedFile文件名+wrotePosition];若该方法返回null,则表示offset所在的cq的mappedFile因为过期被清理掉了,所以会设置GetMessageStatus为OFFSET_FOUND_NULL,并且更新nextBeginOffset为下一个mappedFile的名字,准备进入长轮询;
9.2.3.2.1)MappedFileQueue#findMappedFileByOffset
入参为真实偏移量,该方法在前面已分析;最终返回偏移量所在的MappedFile实例;
9.2.3.2.2)MappedFile#selectMappedBuffer
入参为:真实偏移量%mappedFileSize。最终返回一块buffer副本bufferConsumeQueue;
9.2.3.3)循环读取bufferConsumeQueue上的消息
每次读取一条consumeQueue消息,大小为20字节;
9.2.3.4)isTheBatchFull方法
该方法控制是否跳出循环;
a)还没拉取到数据,则继续循环;
b)拉取的消息数量超过了上限,则准备退出循环;
c)若当前消息属于冷数据,即是最新数据的40%以外的数据,若拉取超过了64kb,则退出循环;或者拉取超过了8条消息,则退出循环;
d)若当前消息属于热数据,即是最新数据的40%以内的数据,若拉取超过了256kb,则退出循环;或者拉取超过了32条消息,则退出循环;
e)以上条件均未达到,则继续循环;
该方法返回false后,接着处理tagsCode过滤,即看当前tagsCode是否在订阅信息subscriptionData中存在,若不存在,则设置GetMessageStatus.NO_MATCHED_MESSAGE;
9.2.3.5)CommitLog#getMessage
和9.2.3.2小节中的方法逻辑一样;最终返回一个SelectMappedBufferResult实例;由于9.2.3.3小节已拿到一条consumeQueue消息,所以取出第一个8字节和第二个4字节中的字段,分别表示该条cq消息在commitLog中的物理偏移量offsetPy和大小sizePy,这两个值作为CommitLog#getMessage方法的入参;若返回的buffer切片为null,则表示删除过期文件的定时任务将offsetPy所在的commitLog的mappedFile文件删除了,则设置GetMessageStatus为MESSAGE_WAS_REMOVING,更新nextPhyFileStartOffset 为包含该offsetPy的commitLog的mappedFile文件的下一个commitLog的mappedFile的名字,而再次循环时,cq中消息的offsetPy小于nextPhyFileStartOffset的消息都属于过期的无效的消息,因为commitLog中过期被删除的消息,consumeQueue中消息的offsetPy也是无效值;
9.2.3.6)GetMessageResult#addMessage方法
累加消息的数量,大小;接着设置GetMessageStatus为FOUND,复位nextPhyFileStartOffset为0;接着再次循环,读取当前consumeQueue上mappedFile的下一条消息;退出循环一般发生在isTheBatchFull方法处;退出循环后会执行9.2.3.7节开始的逻辑;
9.2.3.7)计算下次拉取的起点
计算公式为(offset+ i/20),i表示本次一共循环读取的消息大小,是20的整数倍;offset表示客户端发起本次拉取消息请求的起点,是所请求的messageQueue对应的ConsumeQueue实例中mfq的逻辑偏移量;
9.2.3.8)计算还有多少数据没有消费
公式为(maxOffsetPy – maxPhyOffsetPulling),maxOffsetPy表示当前CommitLog实例中mfq的最大物理偏移量,maxPhyOffsetPulling表示本次循环读取的最后一个consumeQueue消息在commitLog中偏移量;若此差值大于内存的40%,则表示下次拉取消息需要从broker的从机拉取了,因为离最新数据40%以外的内容,都已经刷到磁盘了;最后执行bufferConsumeQueue.release方法,释放切片资源;
为啥有这样的机制呢?因为若让broker主节点去磁盘拉取数据,这是个相对耗时的任务,broker的主节点的最主要任务是将生产者发送的消息储存进磁盘,所以为了不影响broker主节点,让下次让broker从节点去磁盘读数据;最后返回GetMessageResult实例,包含了当前messageQueue的本次消息拉取请求所拉取到的累加消息大小,数量,下次拉取消息的consumeQueue的逻辑偏移量,拉取状态,当前cq的最大最小值,建议下次从主机或从机拉取等,返回到PullMessageProcessor类的getMessage方法的调用处;
9.2.4)一些其他杂七杂八的判断
这里省略杂七杂八的,暂时只讲重难点;
9.2.5)状态转换,并执行相应操作
broker端将9种GetMessageStatus转换为4种ResponseCode,在客户端最后将4种ResponseCode转换为4种PullStatus;其中下边的9.2.5.7节,9.2.5.8节,9.2.5.9节三种情况需要进行长轮询,不给客户端响应;其他情况都将返回响应给客户端;
9.2.5.1)找到了需要拉取的消息
查询到了msg,broker端会将GetMessageResult类中的List类型的集合全部导入一个byte[]数组内;接着GetMessageResult会调用release方法,其中会遍历List集合,调用每个元素的release方法释放资源,将byte[]数组赋值给reponse的body,再发送给客户端;客户端收到响应后会正常处理;
GetMessageStatus. FOUND转为ResponseCode.SUCCESS,客户端收到响应后,会转为FOUND状态;
9.2.5.2)拉取的commitLog文件已过期且被清除
在DefaultMessageStore#getMessage的 方法中执行CommitLog#getMessage方法,若获取到的buffer切片为null,若一直都没有拉取到消息则设置GetMessageStatus状态为MESSAGE_WAS_REMOVING,若有拉取到过消息,则保持NO_MATCHED_MESSAGE;
获取到的buffer切片为null是因为查询时正好赶上commitLog清理过期文件,导致查询失败;
GetMessageStatus. MESSAGE_WAS_REMOVING或GetMessageStatus. NO_MATCHED_MESSAGE均转为ResponseCode.PULL_RETRY_IMMEDIATELY;并且broker端不做处理,broker端将响应发给客户端,客户端收到响应后,会转为NO_MATCHED_MESSAGE,调整consume offset,再次向broker发起pullRequest请求;
9.2.5.3)并没有定位到ConsumeQueue实例
不存在这种情况;因为getMessage方法中的findConsumeQueue方法一定会返回一个ConsumeQueue实例的,不可能返回null;因为findConsumeQueue方法中发现consumeQueue为null后,会创建一个ConsumeQueue实例;
9.2.5.4)要拉取的消息存在但通过tagsCode被滤掉了
在DefaultMessageStore#getMessage方法中,取到每条cq消息后,需要先执行tagsCode过滤,若被过滤掉了,则设置状态为GetMessageStatus.NO_MATCHED_MESSAGE ,接着转为
ResponseCode.PULL_RETRY_IMMEDIATELY
表示broker端将消息过滤掉了,此时broker端不做进一步处理,客户端收到响应后,会转为NO_MATCHED_MESSAGE,即调整consume offset,再次发起pullRequest;
9.2.5.5)请求的cq的逻辑偏移量超了cq的上限
在DefaultMessageStore#getMessage方法中,在执行findConsumeQueue方法找到一个ConsumeQueue实例后,会先判断请求的offset是否在当前cq的有效范围;
若pullRequest.offset越界maxOffset,此时broker端不做进一步处理,在客户端会转为OFFSET_ILLEGAL,即删除processQueue,消费者停止消费该mq,待下次负载均衡后再继续消费;
此时设置状态为GetMessageStatus.OFFSET_OVERFLOW_BADLY,接着会转为ResponseCode.PULL_OFFSET_MOVED,在客户端会转为OFFSET_ILLEGAL;
9.2.5.6)请求的cq的逻辑偏移量超了cq的下限
同理表示pullRequest.offset越界minOffset,此时broker端不做进一步处理,在客户端会转为OFFSET_ILLEGAL,即删除processQueue,消费者停止消费该mq,待下次负载均衡后再继续消费;
此时设置状态为GetMessageStatus. OFFSET_TOO_SMALL,接着会转为ResponseCode.PULL_OFFSET_MOVED,在客户端会转为OFFSET_ILLEGAL;
9.2.5.7)发现cq不存在,但马上创建了一个
此时,cq中的mfq中的集合肯定为null,此时设置GetMessageStatus状态为
NO_MESSAGE_IN_QUEUE;接着根据请求的offset是否为null,转为不同的ResponseCode;
若pullRequest.offset不为0,则转换为ResponseCode.PULL_OFFSET_MOVED,此时broker端不做进一步处理,在客户端会转为OFFSET_ILLEGAL,即删除processQueue,消费者停止消费该mq,待下次负载均衡后再继续消费;ResponseCode.PULL_OFFSET_MOVED这种状态表示offset有问题;
若pullRequest.offset为0,则转换为ResponseCode.PULL_NOT_FOUND,对于ResponseCode.PULL_NOT_FOUND状态,broker端会先判断是否允许长轮询,根据PullMessageProcessor#processRequest方法的第三个入参brokerAllowSuspend是否为true,若为true则允许长轮询,则会创建一个PullRequest实例(该类与PullRequestService类中的PullRequest不是同一个类,只是名字相同),接着会将PullRequest实例放入PullRequestHoldService线程中的pullRequestTable中;
最后将response设为null,则broker端不给客户端响应;
长轮询是为了避免客户端收到响应后,又频繁的向broker发送拉取消息的请求;关于长轮询的具体细节将单独抽出一小节分析,在第10)小节中将分析长轮询;
准备进行长轮询机制,在客户端会转为NO_NEW_MESSAGE,即调整consume offset,再次发起pullRequest;
9.2.5.8)拉取的cq文件已过期且被清除
在DefaultMessageStore#getMessage的 方法中,若请求的offset在有效范围,但执行ConsumeQueue#getIndexBuffer方法时,返回null,即表示获取到的offset所在文件的buffer切片为null,则意味着查询时正好赶上consumeQueue清理过期文件,导致查询失败,此时broker端需要进行长轮询,处理逻辑参照9.2.5.7节;
GetMessageStatus.OFFSET_FOUND_NULL转为ResponseCode.PULL_NOT_FOUND;
在客户端会转为NO_NEW_MESSAGE,即调整consume offset,再次发起pullRequest;
9.2.5.9)请求的cq的offset与cq的最大偏移量持平
pullRequest.offset=consumeQueue.maxOffset,表示请求的offset与当前mq最新的offset持平,即没有新消息,broker端准备进行长轮询机制,处理逻辑参照9.2.5.7节;
在客户端会转为NO_NEW_MESSAGE,即调整consume offset,再次发起pullRequest;
此时状态由GetMessageStatus.OFFSET_OVERFLOW_ONE —>ResponseCode.PULL_NOT_FOUND;
9.2.6)处理ResponseCode.PULL_OFFSET_MOVED情况
新建一个MessageQueue实例封装了本次拉取请求的topic,queueId,brokerName;又新建一个OffsetMovedEvent实例,封装了刚才新建的mq,以及其它的一些属性,再执行DefaultMessageStore#generateOffsetMovedEvent方法,其中会新建一个MessageExtBrokerInner实例msgInner,topic为OFFSET_MOVED_EVENT,以及一些其他属性,最后执行DefaultMessageStore#putMessage(msgInner)方法,将该系统主题的消息存入磁盘,最终控制台的就依赖该主题获取一些信息做视图;
9.2.7)持久化消费进度
只有当PullMessageProcessor#processRequest的入参brokerAllowSuspend为true,sysFlag允许提交消费者本地mq进度至broker,当前broker为主节点,三者同时满足时,才会将指定消费者组下指定topic的指定mq的消费进度持久化到broker端;主要是将数据缓存进ConsumerOffsetManager的offsetTable中,key为topic@group,value为map,其中map的key为mq,value为offset;最后返回response;
更多推荐
rocketmq的消费者源码解读五(broker端处理拉取消息请求)
发布评论