消费者连接NSQD或NSQLookupd
消费者可以连接多个NSQLookupd
,消费者从每一个NSQLookupd
查询拥有需要消费的Topic
的NSQD
,并与查询得到的每一个NSQD
建立连接。 每当NSQD
中Topic
或Channle
出现了变动,NSQD
会通知每一个与它相连的NSQLookupd
。
一条消息的生命周期
一条消息首先被生产者投递到NSQD
的Topic
,然后由Topic.messagePump
这个方法分发给Topic
的Channel
。
nsq/nsqd/protocol_v2.go
中的protocolV2.IOLoop
方法一边根据客户端的接收情况把Channel
中的消息发给客户端(protocolV2.messagePump
),一遍处理来自客户端的指令如REQ
、FIN
等。 消息发给客户端之前,首先push到InFlightQueue
这个优先队列中,这个队列可以表示正在处理中的消息。 InFlightQueue
是最小堆,用于比较的属性是过期时刻。 每当消费者发送表示执行成功的FIN
指令给NSQD
时,NSQD
就会把执行成功的消息从对应Channel
的InFlightQueue
删除; 如果直到超时还未收到消费者的FIN
指令,那么对应的消息会从InFlightQueue
删除,然后重新投递给Channel
; 如果收到REQ
指令,说明对应消息需要重新执行,这条消息会从InFlightQueue
删除然后进入DeferredQueue
,等到需要被执行的时刻再重新投递给Channel
。 DeferredQueue
也是最小堆,用于比较的属性是消息被执行的时刻。
启动和关闭
DeferredQueue和InFlightQueue
NSQD
启动时,通过Metadata
生成Topic
和Channel
,并根据Topic
和Channel
找到对应的文件队列。
NSQD
关闭时,每一个Channel
的memoryMsgChan
的消息会被写到对应的文件队列里,inFlightMessages
和deferredMessages
同上。