--- title: nsq阅读(3)——nsqd categories: [源码阅读,nsq] --- > [!note] > 基于nsq v1.3.0 ## 执行流程 在第一篇中,主要对nsq中涉及到的主要组件,以及数据流在这些组件中的流动进行了简单说明。在本篇中,我们跟随上一篇最后给出的demo,对应到nsqd的代码中,看一下执行流程是怎么样的。 上篇最后我们向4151端口发起http请求来发送消息: ```shell curl -d "test message" "http://127.0.0.1:4151/pub?topic=test_topic" ``` 这个调用的是http接口`/pub`,对应nsqd源码`http.go`中的`doPUB`函数: 1. 调用`nsqd.GetTopic`获取/创建topic 2. 调用`NewMesssage`创建message 3. 调用`topic.PutMessage`将消息投递到topic中 创建topic的时候会启动topic的messagePump协程,负责处理topic的相关事件,包括: - 处理内存/磁盘消息 - 监听channel数量变更 - 暂停topic - 优雅退出 ```go func (t *Topic) messagePump() { var msg *Message var buf []byte var err error var chans []*Channel var memoryMsgChan chan *Message var backendChan <-chan []byte ... for { select { case msg = <-memoryMsgChan: // 获取到内存队列的消息 case buf = <-backendChan: // 获取到磁盘队列的消息,需要解码 msg, err = decodeMessage(buf) ... case <-t.channelUpdateChan: // channel数量发生变化 chans = chans[:0] t.RLock() for _, c := range t.channelMap { chans = append(chans, c) } t.RUnlock() if len(chans) == 0 || t.IsPaused() { memoryMsgChan = nil backendChan = nil } else { memoryMsgChan = t.memoryMsgChan backendChan = t.backend.ReadChan() } continue case <-t.pauseChan: // 暂停channel if len(chans) == 0 || t.IsPaused() { memoryMsgChan = nil backendChan = nil } else { memoryMsgChan = t.memoryMsgChan backendChan = t.backend.ReadChan() } continue case <-t.exitChan: // 优雅退出 goto exit } // 将消息投递到每个channel for i, channel := range chans { // 第一个channel直接用msg // 其它channel拷贝msg chanMsg := msg if i > 0 { chanMsg = NewMessage(msg.ID, msg.Body) chanMsg.Timestamp = msg.Timestamp chanMsg.deferred = msg.deferred } // 投递延时消息,用优先队列维护 if chanMsg.deferred != 0 { channel.PutMessageDeferred(chanMsg, chanMsg.deferred) continue } // 投递普通消息,放到channel的内存队列或者磁盘队列 err := channel.PutMessage(chanMsg) if err != nil { t.nsqd.logf(LOG_ERROR, "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s", t.name, msg.ID, channel.name, err) } } } exit: t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name) } ``` 以上是topic的所有处理逻辑,外界通过信号,其实最主要的只有「将消息投递到所有绑定的channel」这一步。 而channel的创建时机有三个: 1. 程序启动时的`NSQD.LoadMetadata`,读取本地配置文件配置,创建channel 2. 通过http接口`/channel/create`直接创建channel 3. client通过发起SUB请求订阅topic、channel的时候创建channel 我们最常见的是client通过SUB订阅topic来创建channel,client即连接到nsqd的消费者: ```go func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) { ... topicName := string(params[1]) channelName := string(params[2]) ... var channel *Channel for i := 1; ; i++ { // 获取topic,如果不存在就会创建 topic := p.nsqd.GetTopic(topicName) // 获取channel,如果不存在就会创建 channel = topic.GetChannel(channelName) // 将client加入channel中 if err := channel.AddClient(client.ID, client); err != nil { return nil, protocol.NewFatalClientErr(err, "E_SUB_FAILED", "SUB failed "+err.Error()) } ... break } atomic.StoreInt32(&client.State, stateSubscribed) client.Channel = channel // 通知client订阅了channel client.SubEventChan <- channel return okBytes, nil } ``` 将client加入channel后,channel中的消息就会负载均衡到所有订阅该channel的client。 SUB只是将topic、channel、client三者关系做了绑定,最后一行将这个channel发送到了client的messagePump当中,client的messagePump是在IOLoop中启动的,messagePump负责处理client相关的各种事件,包括: 1. 消息发送控制 2. 消息缓冲管理 3. 心跳维护 ```go func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { var err error var memoryMsgChan chan *Message var backendMsgChan <-chan []byte var subChannel *Channel var flusherChan <-chan time.Time var sampleRate int32 // client发送 SUB 命令订阅某个主题时,新的 Channel 会通过这个通道传递 subEventChan := client.SubEventChan // client发送 IDENTIFY 命令协商client的配置的时候,携带的数据会通过这个通道传递 identifyEventChan := client.IdentifyEventChan // 控制缓冲刷新的最小时间间隔 // 1. 批量发送优化: 将多个消息缓存起来一起发送,减少系统调用 // 2. 强制刷新控制: 如果消息量很小,确保消息不会在缓冲区停留太久 outputBufferTicker := time.NewTicker(client.OutputBufferTimeout) // 心跳间隔,用于探测client的存活状态 heartbeatTicker := time.NewTicker(client.HeartbeatInterval) heartbeatChan := heartbeatTicker.C // 控制消息处理超时 // 1. 限制客户端处理单个消息的最大时间,当消息处理超时时,NSQ 会重新投递该消息 // 2. 确保每个消息都得到正确处理或重试 msgTimeout := client.MsgTimeout // 是否已经冲刷缓冲区,等下次有数据的时候再置为false,然后ticker到时间后会触发刷新 flushed := true // 通知IOLoop,messagePump已初始化完毕 close(startedChan) for { if subChannel == nil || !client.IsReadyForMessages() { // the client is not ready to receive messages... memoryMsgChan = nil backendMsgChan = nil flusherChan = nil // force flush client.writeLock.Lock() err = client.Flush() client.writeLock.Unlock() if err != nil { goto exit } flushed = true } else if flushed { // 上次已冲刷,不需要监听 flusherChan memoryMsgChan = subChannel.memoryMsgChan backendMsgChan = subChannel.backend.ReadChan() flusherChan = nil } else { // 未冲刷,需要监听 flusherChan memoryMsgChan = subChannel.memoryMsgChan backendMsgChan = subChannel.backend.ReadChan() flusherChan = outputBufferTicker.C } select { case <-flusherChan: // 冲刷缓冲区 client.writeLock.Lock() err = client.Flush() client.writeLock.Unlock() if err != nil { goto exit } flushed = true case <-client.ReadyStateChan: // case subChannel = <-subEventChan: // 只能订阅一次 subEventChan = nil case identifyData := <-identifyEventChan: // 只能协商一次 identifyEventChan = nil outputBufferTicker.Stop() if identifyData.OutputBufferTimeout > 0 { outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout) } heartbeatTicker.Stop() heartbeatChan = nil if identifyData.HeartbeatInterval > 0 { heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval) heartbeatChan = heartbeatTicker.C } if identifyData.SampleRate > 0 { sampleRate = identifyData.SampleRate } msgTimeout = identifyData.MsgTimeout case <-heartbeatChan: err = p.Send(client, frameTypeResponse, heartbeatBytes) if err != nil { goto exit } case b := <-backendMsgChan: if sampleRate > 0 && rand.Int31n(100) > sampleRate { continue } msg, err := decodeMessage(b) if err != nil { p.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) continue } msg.Attempts++ subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) client.SendingMessage() err = p.SendMessage(client, msg) if err != nil { goto exit } flushed = false case msg := <-memoryMsgChan: if sampleRate > 0 && rand.Int31n(100) > sampleRate { continue } msg.Attempts++ subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) client.SendingMessage() err = p.SendMessage(client, msg) if err != nil { goto exit } flushed = false case <-client.ExitChan: goto exit } } exit: p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client) heartbeatTicker.Stop() outputBufferTicker.Stop() if err != nil { p.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err) } } ``` ## 程序入口main 这篇进入nsqd程序的源码分析,废话不多说直接看到apps/nsqd/main.go这个程序入口文件。这里用了go-svc库,其实就是对系统信号通知进行了封装,定义回调接口的方式将代码规范化。 ```go // 实现了svc.Service接口 type program struct { once sync.Once nsqd *nsqd.NSQD } func main() { prg := &program{} // 运行 if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil { logFatal("%s", err) } } ``` Run方法很简单: ```go func Run(service Service, sig ...os.Signal) error { env := environment{} // 回调Service.Init方法 if err := service.Init(env); err != nil { return err } // 回调service.Start方法 if err := service.Start(); err != nil { return err } if len(sig) == 0 { sig = []os.Signal{syscall.SIGINT, syscall.SIGTERM} } // 监听信号 signalChan := make(chan os.Signal, 1) signalNotify(signalChan, sig...) var ctx context.Context if s, ok := service.(Context); ok { ctx = s.Context() } else { ctx = context.Background() } for { select { case s := <-signalChan: // 信号发生 if h, ok := service.(Handler); ok { // 如果还实现了处理信号的接口,回调Service.Handle if err := h.Handle(s); err == ErrStop { goto stop } } else { // 否则默认为退出 goto stop } case <-ctx.Done(): goto stop } } stop: return service.Stop() } ``` 所以在Start函数中应该再起一个协程去处理主事务,而主协程用来监听信号的发生,并且做优雅退出之类的处理。整个main函数就干了三件事,比较简单就不再贴代码: 1. 解析命令行参数并初始化nsqd 2. 加载本地的元数据并启动nsqd 3. 出错或停止时调用nsqd.Exit优雅退出 ## 初始化nsqd.New 主要做了如下工作: 1. 初始化了一些配置参数 2. 初始化与nsqlookupd通信的client,以在运行时感知集群的元数据 3. 监听4150 tcp端口 4. 监听4151 http端口 ## 启动nsqd.Main 上面的“启动nsqd”就是运行nsqd.Main方法: ```go func (n *NSQD) Main() error { exitCh := make(chan error) var once sync.Once exitFunc := func(err error) { once.Do(func() { // 通知退出Main方法 if err != nil { n.logf(LOG_FATAL, "%s", err) } exitCh <- err }) } // 启动TPCServer n.waitGroup.Wrap(func() { exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf)) }) if n.httpListener != nil { // 启动httpServer httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired) n.waitGroup.Wrap(func() { exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)) }) } // https ... // n.waitGroup.Wrap(n.queueScanLoop) // n.waitGroup.Wrap(n.lookupLoop) // if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(n.statsdLoop) } err := <-exitCh return err } ``` TPC和HTTP都同时监听,不过干的都是一样的事情,前者是基于TCP,使用自己定义的包格式来传输,后者则是基于HTTP包格式进行传输。默认分别监听4150和4151端口。 | **协议** | **场景** | **支持的命令示例** | | -------- | --------------------------------------- | ------------------- | | TCP | 持续连接、高频通信、大规模消息流处理 | PUB, SUB, RDY, FIN | | HTTP | 临时操作、调试、运维工具、REST API 调用 | /pub, /stats, /ping | ### TCP 使用无限循环与每个客户端建立连接,开启协程单独处理每个连接: ```go func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error { var wg sync.WaitGroup for { // 建立连接 clientConn, err := listener.Accept() ... wg.Add(1) // 开启协程处理连接 go func() { handler.Handle(clientConn) wg.Done() }() } wg.Wait() ... } ``` 每个连接建立的时候,开始的4字节为协议约定好的魔数“ V2”,如果不是这个魔数则返回协议错误 ```go func (p *tcpServer) Handle(conn net.Conn) { // 读取魔数,作为协议的序言 buf := make([]byte, 4) _, err := io.ReadFull(conn, buf) protocolMagic := string(buf) var prot protocol.Protocol switch protocolMagic { case " V2": prot = &protocolV2{nsqd: p.nsqd} default: // 魔数不是预期值,直接返回协议错误 ... } client := prot.NewClient(conn) p.conns.Store(conn.RemoteAddr(), client) // IOLoop循环读取并处理客户端的请求 err = prot.IOLoop(client) if err != nil { p.nsqd.logf(LOG_ERROR, "client(%s) - %s", conn.RemoteAddr(), err) } p.conns.Delete(conn.RemoteAddr()) client.Close() } ``` 确定是这个协议,再接着进入IOLoop读取客户端的请求并执行请求: ```go func (p *protocolV2) IOLoop(c protocol.Client) error { var err error var line []byte var zeroTime time.Time client := c.(*clientV2) messagePumpStartedChan := make(chan bool) go p.messagePump(client, messagePumpStartedChan) <-messagePumpStartedChan for { // 读取一行客户端请求 line, err = client.Reader.ReadSlice('\n') ... params := bytes.Split(line, separatorBytes) var response []byte // 执行请求 response, err = p.Exec(client, params) ... if response != nil { // 返回结果给客户端 err = p.Send(client, frameTypeResponse, response) ... } } p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client) close(client.ExitChan) if client.Channel != nil { client.Channel.RemoveClient(client.ID) } return err } ``` ### HTTP HTTP使用了httprouter这个库来进行路由 ```go func newHTTPServer(nsqd *NSQD, tlsEnabled bool, tlsRequired bool) *httpServer { log := http_api.Log(nsqd.logf) router := httprouter.New() router.HandleMethodNotAllowed = true router.PanicHandler = http_api.LogPanicHandler(nsqd.logf) router.NotFound = http_api.LogNotFoundHandler(nsqd.logf) router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(nsqd.logf) s := &httpServer{ nsqd: nsqd, tlsEnabled: tlsEnabled, tlsRequired: tlsRequired, router: router, } router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText)) router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1)) // v1 negotiate router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1)) ... // only v1 router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1)) ... // debug router.HandlerFunc("GET", "/debug/pprof/", pprof.Index) ... return s } ``` 对于每个路由的Handler还加了一个http_api.Decorate: ```go type APIHandler func(http.ResponseWriter, *http.Request, httprouter.Params) (interface{}, error) type Decorator func(APIHandler) APIHandler func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle { decorated := f // 组装成调用链 for _, decorate := range ds { decorated = decorate(decorated) } return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { decorated(w, req, ps) } } ``` Decorate的作用是给Handler增加中间件,比如log中间件: ```go func Log(logf lg.AppLogFunc) Decorator { return func(f APIHandler) APIHandler { return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { // 记录调用前的时间 start := time.Now() // 调用 response, err := f(w, req, ps) // 记录调用后的时间 elapsed := time.Since(start) status := 200 if e, ok := err.(Err); ok { status = e.Code } // 打印调用耗时 logf(lg.INFO, "%d %s %s (%s) %s", status, req.Method, req.URL.RequestURI(), req.RemoteAddr, elapsed) return response, err } } } ``` ### queueScanLoop 这个函数用时间最小堆来管理延时消息和等待确认的消息。这个函数管理了一个协程池,里面的协程叫做`queueScanWorker`,他们并发地处理所有channel。 因为channel可能会空闲而没有消息,那么这些worker就应该睡眠等待有消息再起来工作。nsq可能考虑到实时唤醒这些worker比较复杂,**于是参考了redis的概率过期算法(*probabilistic expiration algorithm*),采用轮询的方式去处理到达的消息**:每隔一段时间(默认100ms)唤醒worker让他们从本地缓存中随机选择一部分channels(默认20个),发现有消息到达的channel被标记为dirty并处理。每轮如果dirty的channel占总选择的channel的一定比例以上(默认25%),那么继续重复上面的操作,否则睡眠等待下一轮唤醒。 ```go func (n *NSQD) queueScanLoop() { workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount) responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount) closeCh := make(chan int) workTicker := time.NewTicker(n.getOpts().QueueScanInterval) refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval) channels := n.channels() // 调整worker池大小 n.resizePool(len(channels), workCh, responseCh, closeCh) for { select { case <-workTicker.C: // 每隔一段时间唤醒worker if len(channels) == 0 { continue } case <-refreshTicker.C: // 每隔一段时间刷新channel数量,以调整worker数量 channels = n.channels() n.resizePool(len(channels), workCh, responseCh, closeCh) continue case <-n.exitChan: // nsqd退出 goto exit } num := n.getOpts().QueueScanSelectionCount if num > len(channels) { num = len(channels) } loop: // 随机选取一部分的channels,将它们发送给workers进行处理 for _, i := range util.UniqRands(num, len(channels)) { workCh <- channels[i] } // 统计标记为dirty的channel数 numDirty := 0 for i := 0; i < num; i++ { if <-responseCh { numDirty++ } } // 如果dirty数到达一定量级,继续选取,不用睡眠 if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent { goto loop } } exit: ... } ``` 选取一部分channel的时候是随机的,这里用了一个从n个元素中随机选择m个的算法技巧(m<=n):第 i 个元素与后面随机一个元素进行交换,不断重复上述步骤,就得到实现了这样的效果。 而每个worker的工作是从最小堆中获取消息,然后保存到内存或者磁盘队列中,等待消费: ```go func (c *Channel) put(m *Message) error { select { // 保存到内存 case c.memoryMsgChan <- m: default: // 保存到磁盘 err := writeMessageToBackend(m, c.backend) c.nsqd.SetHealth(err) if err != nil { c.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s", c.name, err) return err } } return nil } ``` ## 消息接收 上一节分析了nsq怎么处理已经接收到的消息的:基于时间最小堆,将消息保存到内存或者磁盘中等待下一步的处理。我们再回过头来看一下nsq怎么接收消息的 在demo中,我们用http接口进行消息的发送: ```go curl -d "test message" "http://127.0.0.1:4151/pub?topic=test_topic" ``` 对应的handler代码为: ```go func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { // TODO: one day I'd really like to just error on chunked requests // to be able to fail "too big" requests before we even read if req.ContentLength > s.nsqd.getOpts().MaxMsgSize { return nil, http_api.Err{413, "MSG_TOO_BIG"} } // add 1 so that it's greater than our max when we test for it // (LimitReader returns a "fake" EOF) readMax := s.nsqd.getOpts().MaxMsgSize + 1 body, err := io.ReadAll(io.LimitReader(req.Body, readMax)) if err != nil { return nil, http_api.Err{500, "INTERNAL_ERROR"} } if int64(len(body)) == readMax { return nil, http_api.Err{413, "MSG_TOO_BIG"} } if len(body) == 0 { return nil, http_api.Err{400, "MSG_EMPTY"} } reqParams, topic, err := s.getTopicFromQuery(req) if err != nil { return nil, err } var deferred time.Duration if ds, ok := reqParams["defer"]; ok { var di int64 di, err = strconv.ParseInt(ds[0], 10, 64) if err != nil { return nil, http_api.Err{400, "INVALID_DEFER"} } deferred = time.Duration(di) * time.Millisecond if deferred < 0 || deferred > s.nsqd.getOpts().MaxReqTimeout { return nil, http_api.Err{400, "INVALID_DEFER"} } } msg := NewMessage(topic.GenerateID(), body) msg.deferred = deferred err = topic.PutMessage(msg) if err != nil { return nil, http_api.Err{503, "EXITING"} } return "OK", nil } ``` 。其实是在IOLoop中,开启的messagePump协程中接收消息。 ```go func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { var err error var memoryMsgChan chan *Message var backendMsgChan <-chan []byte var subChannel *Channel // NOTE: `flusherChan` is used to bound message latency for // the pathological case of a channel on a low volume topic // with >1 clients having >1 RDY counts var flusherChan <-chan time.Time var sampleRate int32 subEventChan := client.SubEventChan identifyEventChan := client.IdentifyEventChan outputBufferTicker := time.NewTicker(client.OutputBufferTimeout) heartbeatTicker := time.NewTicker(client.HeartbeatInterval) heartbeatChan := heartbeatTicker.C msgTimeout := client.MsgTimeout // v2 opportunistically buffers data to clients to reduce write system calls // we force flush in two cases: // 1. when the client is not ready to receive messages // 2. we're buffered and the channel has nothing left to send us // (ie. we would block in this loop anyway) // flushed := true // signal to the goroutine that started the messagePump // that we've started up close(startedChan) for { if subChannel == nil || !client.IsReadyForMessages() { // the client is not ready to receive messages... memoryMsgChan = nil backendMsgChan = nil flusherChan = nil // force flush client.writeLock.Lock() err = client.Flush() client.writeLock.Unlock() if err != nil { goto exit } flushed = true } else if flushed { // last iteration we flushed... // do not select on the flusher ticker channel memoryMsgChan = subChannel.memoryMsgChan backendMsgChan = subChannel.backend.ReadChan() flusherChan = nil } else { // we're buffered (if there isn't any more data we should flush)... // select on the flusher ticker channel, too memoryMsgChan = subChannel.memoryMsgChan backendMsgChan = subChannel.backend.ReadChan() flusherChan = outputBufferTicker.C } select { case <-flusherChan: // if this case wins, we're either starved // or we won the race between other channels... // in either case, force flush client.writeLock.Lock() err = client.Flush() client.writeLock.Unlock() if err != nil { goto exit } flushed = true case <-client.ReadyStateChan: case subChannel = <-subEventChan: // you can't SUB anymore subEventChan = nil case identifyData := <-identifyEventChan: // you can't IDENTIFY anymore identifyEventChan = nil outputBufferTicker.Stop() if identifyData.OutputBufferTimeout > 0 { outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout) } heartbeatTicker.Stop() heartbeatChan = nil if identifyData.HeartbeatInterval > 0 { heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval) heartbeatChan = heartbeatTicker.C } if identifyData.SampleRate > 0 { sampleRate = identifyData.SampleRate } msgTimeout = identifyData.MsgTimeout case <-heartbeatChan: err = p.Send(client, frameTypeResponse, heartbeatBytes) if err != nil { goto exit } case b := <-backendMsgChan: if sampleRate > 0 && rand.Int31n(100) > sampleRate { continue } msg, err := decodeMessage(b) if err != nil { p.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) continue } msg.Attempts++ subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) client.SendingMessage() err = p.SendMessage(client, msg) if err != nil { goto exit } flushed = false case msg := <-memoryMsgChan: if sampleRate > 0 && rand.Int31n(100) > sampleRate { continue } msg.Attempts++ subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) client.SendingMessage() err = p.SendMessage(client, msg) if err != nil { goto exit } flushed = false case <-client.ExitChan: goto exit } } exit: p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client) heartbeatTicker.Stop() outputBufferTicker.Stop() if err != nil { p.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err) } } ``` ### lookupLoop