--- title: nsq阅读(2)——diskqueue categories: [源码阅读,nsq] --- ## 开篇 DiskQueue 是 NSQ 中的持久化组件,承担着将消息存储到磁盘的任务,它以高效、可靠的方式在内存与磁盘之间进行切换。在开始分析 nsq 那几个大件之前,先深入分析下 DiskQueue 的核心设计和实现,解密其背后的关键技术点,包括写入机制、读取逻辑以及在异常场景下的恢复策略。 ## 设计 nsq将diskqueue抽取出来作为一个库来维护: 里面只有一个源码文件`diskqueue.go`,里面核心结构体是基于磁盘的消息队列`diskQueue`,实现了nsqd中的`BackendQueue`接口,比较简单: ```go type BackendQueue interface { Put([]byte) error ReadChan() <-chan []byte Close() error Delete() error Depth() int64 Empty() error } ``` 消息以字节数组的方式传递,接收端需要对字节数组进行解码,存储端的实现就能相对简单。 diskqueue在创建时会运行一个叫做ioLoop的协程用于处理io相关的事务,并与主协程通过以下各个`chan`进行通信: ```go type diskQueue struct { ... // 队列大小(即depth在nsq中表示的就是可读消息数) depthChan chan int64 // 写磁盘的数据 writeChan chan []byte // 写磁盘的结果 writeResponseChan chan error // 清空队列 emptyChan chan int // 清空队列的结果 emptyResponseChan chan error // 退出 exitChan chan int // 主协程等待ioLoop退出 exitSyncChan chan int } ``` ## logrotation与metadata diskqueue还用了rotation的方式来限制单个文件的大小(拓展:在linux中有一个实现logrotation命令叫logrotate),这样便于在消息读出之后,通过直接删除文件的方式来删除不再需要的数据。为了实现logrotation,还会维护一些元数据: 1. 队列大小(消息数) 2. 正在读第几个文件中的第几个字节 3. 正在写第几个文件中的第几个字节 代码中通过`persistMetaData`方法来**原子性地**持久化这些元数据。元数据非常重要,如果不是原子性地写入,比如写一半就宕机了,那么所有数据将会不可用。**原子性是通过写临时文件、将临时文件重命名为最终文件名来保证的** ## 写数据 使用接口方法`Put`写入: ```go func (d *diskQueue) Put(data []byte) error { d.RLock() defer d.RUnlock() if d.exitFlag == 1 { return errors.New("exiting") } d.writeChan <- data return <-d.writeResponseChan } ``` 由`ioLoop`协程处理数据的写入,调用`writeOne`进行处理: ```go func (d *diskQueue) writeOne(data []byte) error { var err error dataLen := int32(len(data)) totalBytes := int64(4 + dataLen) if d.writePos > 0 && d.writePos+totalBytes > d.maxBytesPerFile { // 写入的大小将超出单个文件大小限制,触发logrotation,即写入下一个新文件 if d.readFileNum == d.writeFileNum { d.maxBytesPerFileRead = d.writePos } d.writeFileNum++ d.writePos = 0 // 每次logrotation前,都sync将最后一个文件以及元数据落盘 err = d.sync() if err != nil { d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err) } // 关闭序号为i的数据文件 if d.writeFile != nil { d.writeFile.Close() d.writeFile = nil } } // 创建序号为i+1的数据文件 if d.writeFile == nil { curFileName := d.fileName(d.writeFileNum) d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600) if err != nil { return err } d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName) if d.writePos > 0 { _, err = d.writeFile.Seek(d.writePos, 0) if err != nil { d.writeFile.Close() d.writeFile = nil return err } } } // 数据大小写入缓存 d.writeBuf.Reset() err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) if err != nil { return err } // 数据写入缓存 _, err = d.writeBuf.Write(data) if err != nil { return err } // 缓存写入文件 _, err = d.writeFile.Write(d.writeBuf.Bytes()) if err != nil { d.writeFile.Close() d.writeFile = nil return err } d.writePos += totalBytes d.depth += 1 return err } ``` 这里要注意在整个rotation的过程中并不保证原子性,而是分为三步: 1. 落盘序号为 i 的数据文件 2. 落盘元数据文件(其中的writeFileNum为 i+1) 3. 创建序号为 i+1 的数据文件 对于1、2之间发生宕机的情况,会导致数据文件大小比元数据的`writePos`要大,这里采取的措施是直接触发rotation 对于2、3之间发生宕机的话,重启之后读取序号为 i+1 的文件,报文件不存在错误但是会忽略这个错误。然后第一次写入数据时会创建这个文件,正常运行。 ## 读数据 读数据是通过`ioLoop`在每次循环中调用`readOne`实现的: ```go func (d *diskQueue) readOne() ([]byte, error) { var err error var msgSize int32 // 创建读取文件,定位到上次读的地方 if d.readFile == nil { curFileName := d.fileName(d.readFileNum) d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600) if err != nil { return nil, err } d.logf(INFO, "DISKQUEUE(%s): readOne() opened %s", d.name, curFileName) if d.readPos > 0 { _, err = d.readFile.Seek(d.readPos, 0) if err != nil { d.readFile.Close() d.readFile = nil return nil, err } } // 对于已经归档的文件,将最大可读大小设置为文件大小 d.maxBytesPerFileRead = d.maxBytesPerFile if d.readFileNum < d.writeFileNum { stat, err := d.readFile.Stat() if err == nil { d.maxBytesPerFileRead = stat.Size() } } // 设置reader d.reader = bufio.NewReader(d.readFile) } // 读取数据大小 err = binary.Read(d.reader, binary.BigEndian, &msgSize) if err != nil { d.readFile.Close() d.readFile = nil return nil, err } ... // 读入数据 readBuf := make([]byte, msgSize) _, err = io.ReadFull(d.reader, readBuf) if err != nil { d.readFile.Close() d.readFile = nil return nil, err } totalBytes := int64(4 + msgSize) // 先将readPos和readFileNum先暂存到next*,等消息发送给消费者之后再更新他们 d.nextReadPos = d.readPos + totalBytes d.nextReadFileNum = d.readFileNum // 这个文件已经读完,rotate到下一个文件 if d.readFileNum < d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead { if d.readFile != nil { d.readFile.Close() d.readFile = nil } d.nextReadFileNum++ d.nextReadPos = 0 } return readBuf, nil } ``` 读完之后,然后`ioLoop`中`select`其中一个分支会将数据发送到`readChan`中,给外界读取。 ## 数据落盘 为了提高数据读写的效率,消息数据不会每次写入都落盘,数据只有以下几个时机会`sync`落盘: 1. 触发rotation 2. 读写次数到达`syncEvery` 3. 按`syncTimeout`定期落盘 综上,高并发写入数据时,特别是`maxBytesPerFile`设置得比较大、`syncEvery`设置得比较大、`syncTimeout`设置得比较大时,宕机其实很容易会丢失一部分数据,这部分数据是存在操作系统的page cache当中而丢失的。 从侧面反映出,依赖于diskqueue的nsq消息队列,进行所谓的“持久化”操作时其实也会因为宕机而丢失数据。 ## 其它细节 看一下用于查询队列大小的`Depth`方法: ```go func (d *diskQueue) Depth() int64 { depth, ok := <-d.depthChan if !ok { // ioLoop exited depth = d.depth } return depth } ``` 这里优先从`depthChan`中返回深度,而不是直接返回`d.depth`,原因是`d.depth`只会被`ioLoop`协程改写,而由于调用`Depth`的一定是其它协程,读写`d.depth`时势必要加锁避免并发读写冲突。为了避免加锁带来性能上的影响,因此优先从`d.depthChan`中读取,相应的,ioLoop的`select`有一个分支是写入`d.depthChan`: ```go func (d *diskQueue) ioLoop() { for { ... select { // 写入depthChan case d.depthChan <- d.depth: // 其它分支改写d.depth case ... } } } ``` 当整个diskQeueu被关闭,`ioLoop`结束时,`d.depth`不会再被改写,此时直接返回`d.depth`