0%

NSQ学习 逻辑梳理

消费者连接NSQD或NSQLookupd

消费者可以连接多个NSQLookupd,消费者从每一个NSQLookupd查询拥有需要消费的TopicNSQD,并与查询得到的每一个NSQD建立连接。 每当NSQDTopicChannle出现了变动,NSQD会通知每一个与它相连的NSQLookupd

一条消息的生命周期

一条消息首先被生产者投递到NSQDTopic,然后由Topic.messagePump这个方法分发给TopicChannel

nsq/nsqd/protocol_v2.go中的protocolV2.IOLoop方法一边根据客户端的接收情况把Channel中的消息发给客户端(protocolV2.messagePump),一遍处理来自客户端的指令如REQFIN等。 消息发给客户端之前,首先push到InFlightQueue这个优先队列中,这个队列可以表示正在处理中的消息。 InFlightQueue是最小堆,用于比较的属性是过期时刻。 每当消费者发送表示执行成功的FIN指令给NSQD时,NSQD就会把执行成功的消息从对应ChannelInFlightQueue删除; 如果直到超时还未收到消费者的FIN指令,那么对应的消息会从InFlightQueue删除,然后重新投递给Channel; 如果收到REQ指令,说明对应消息需要重新执行,这条消息会从InFlightQueue删除然后进入DeferredQueue,等到需要被执行的时刻再重新投递给ChannelDeferredQueue也是最小堆,用于比较的属性是消息被执行的时刻。

启动和关闭

DeferredQueue和InFlightQueue

NSQD启动时,通过Metadata生成TopicChannel,并根据TopicChannel找到对应的文件队列。

NSQD关闭时,每一个ChannelmemoryMsgChan的消息会被写到对应的文件队列里,inFlightMessagesdeferredMessages同上。