/* * Copyright (c) 2016-2017 RafaƂ Michalski */ "use strict"; const assert = require('assert') , os = require('os') , fs = require('fs') , { createWriteStream, createReadStream } = fs , path = require('path') , mp = require('@royaltm/msgpack-lite') const debug = require('debug')('zmq-raft:snapshotfile'); const { open, openDir, close, closeDir, fdatasync, ftruncate, fstat, fsyncDirFileCloseDir, link, mkdirp, read, renameSyncDir, write} = require('../utils/fsutil'); const { defineConst } = require('../utils/helpers'); const { TOKEN_HEADER_SIZE , TOKEN_HEADER_BYTE_SIZE , BYTES_PER_ELEMENT , TokenFile, tokenToUint32, findToken, createTokenFile } = require('../utils/tokenfile'); const { mixin: mixinHistoryRotation, createRotateName } = require('../utils/filerotate'); const ReadyEmitter = require('../common/readyemitter'); const VERSION = 1 , HEADER_SIZE = 9 , HEADER_BODY_SIZE = HEADER_SIZE - TOKEN_HEADER_SIZE , SNAP = tokenToUint32('SNAP') , DATA = tokenToUint32('DATA') , META = tokenToUint32('META') const emptyBuf = Buffer.alloc(0); const fd$ = Symbol.for('fd') , dataOffset$ = Symbol.for('dataOffset') , setReady$ = Symbol.for('setReady') , filename$ = Symbol.for('filename') /* # Create a new snapshot file var snap = new SnapshotFile('filename'[, index, term, datasize]); # Opens existing snapshot file var snap = SnapshotFile('filename') # make sure it's ready before invoking any other method snap.ready().then(snap => ..., err => ...) # using async paradigm var snap = await new SnapshotFile(...).ready(); snap.read(position, size[, buffer, offset]) snap.write(buffer, position, length, offset) snap.sync() snap.replace('destname') snap.createDataReadStream() snap.close() @property {number} logIndex @property {number} logTerm @property {number} dataSize */ class SnapshotFile extends ReadyEmitter { /** * read or create a new SnapshotFile * * If an index, term and the dataSize is given, the new snapshot file will be created. * If only a filename is given the file must exists and SnapshotFile metadata will be read from * the provided file. * * A Reader instance may be provided as 4th argument in this instance the data will be written * to the snapshot file from the reader and the dataSize will be calculated accordingly. * * @param {string} filename * @param {uint} [index] * @param {uint} [term] * @param {uint|stream.Reader} [dataSize|reader] * @return {SnapshotFile} **/ constructor(filename, index, term, dataSize) { super(); if (!filename || 'string' !== typeof filename) throw new TypeError("SnapshotFile: filename must be a non-empty string"); this[filename$] = filename; const ready = (fd, dataOffset, logIndex, logTerm, dataSize) => { this[fd$] = fd; this[dataOffset$] = dataOffset; defineConst(this, 'logIndex', logIndex); defineConst(this, 'logTerm', logTerm); defineConst(this, 'dataSize', dataSize); debug('ready "%s"', filename); this[setReady$](); }; if (arguments.length > 1) { /* will create new snapshot file */ if (arguments.length < 4) throw new Error("SnapshotFile: filename, index, term, dataSize required to create snapshot file"); if (!Number.isFinite(index) || index % 1 !== 0 || index < 0 || index > Number.MAX_SAFE_INTEGER) throw new TypeError("index should be a positive integer"); if (!Number.isFinite(term) || term % 1 !== 0 || term < 0 || term > Number.MAX_SAFE_INTEGER) throw new TypeError("term should be a positive integer"); if (!Number.isFinite(dataSize) || dataSize % 1 !== 0 || dataSize < 0 || dataSize > Number.MAX_SAFE_INTEGER) { if ('object' !== typeof dataSize || !dataSize) { throw new TypeError("dataSize should be a positive integer or a stream.Reader"); } debug('creating "%s" logIndex: %s, logTerm: %s from stream', filename, index, term); } else debug('creating "%s" logIndex: %s, logTerm: %s, dataSize: %s', filename, index, term, dataSize); createSnapshotFile(filename, index, term, dataSize) .then(([fd, dataOffset, dataSize]) => ready(fd, dataOffset, index, term, dataSize)) .catch(err => this.error(err)); } else { debug('reading "%s"', filename); open(filename, 'r+').then(fd => readSnapshotFile(fd) .then(([dataOffset, index, term, dataSize]) => { debug('read "%s" logIndex: %s, logTerm: %s, dataSize: %s', filename, index, term, dataSize); ready(fd, dataOffset, index, term, dataSize); }, err => close(fd).then(() => { throw err; })) ).catch(err => this.error(err)); } } /** * returns a snapshot file path * * @return {string} **/ toString() { return this[filename$]; } /** * returns a snapshot file path * * @return {string} **/ get filename() { return this[filename$]; } /** * returns position at which the actual data begins (after headers) * * @return {number} **/ get dataOffset() { return this[dataOffset$]; } /** * close file * * @return {Promise} **/ close() { var fd = this[fd$]; if (fd !== undefined) { delete this[fd$]; return close(fd); } else Promise.resolve(); } /** * true if file is closed * * @property {number} **/ get isClosed() { return this[fd$] === undefined; } /** * read snapshot data * * resolves to buffer or slice of the buffer with read data * * @param {number} position - position in snapshot * @param {number} length - bytes to read * @param {Buffer} [buffer] - buffer to write data to * @param {number} [offset] - offset in the buffer to start writing at * @return {Promise} **/ read(position, length, buffer, offset) { offset >>>= 0; length >>>= 0; position = parseInt(position || 0); if (buffer === undefined) { offset = 0; buffer = Buffer.allocUnsafe(length); } if (!Buffer.isBuffer(buffer)) Promise.reject(new TypeError("SnapshotFile.read: buffer must be a Buffer instance")); if (isNaN(position) || position < 0 || position + length > this.dataSize) Promise.reject(new Error("SnapshotFile.read: position or length out of bounds")); if (length === 0) return Promise.resolve(emptyBuf); if (offset + length > buffer.length) return Promise.reject(new Error("SnapshotFile.read: offset or length exceed buffer capacity")); return read(this[fd$], buffer, offset, length, position + this[dataOffset$]) .then(() => (offset === 0 && length === buffer.length ? buffer : buffer.slice(offset, offset + length))); } /** * write snapshot data * * resolves to bytes written * * index must be <= nextIndex and >= firstAvailableIndex * if index is less than nextIndex the log is first being * rolled back to the entry before the index * * @param {Buffer} buffer - buffer to write data from * @param {number} position - position in snapshot * @param {number} length - bytes to write * @param {number} [offset] - offset in the buffer to start writing at * @return {Promise} **/ write(buffer, position, length, offset) { offset >>>= 0; length >>>= 0; position = parseInt(position || 0); if (!Buffer.isBuffer(buffer)) Promise.reject(new TypeError("SnapshotFile.write: buffer must be a Buffer instance")); if (isNaN(position) || position < 0 || position + length > this.dataSize) Promise.reject(new Error("SnapshotFile.write: position or length out of bounds")); if (length === 0) return Promise.resolve(0); if (offset + length > buffer.length) return Promise.reject(new Error("SnapshotFile.write: offset or length exceed buffer capacity")); return write(this[fd$], buffer, offset, length, position + this[dataOffset$]) } /** * ensures snapshot data is durable * * @return {Promise} **/ sync() { return fdatasync(this[fd$]); } /** * atomically replaces destination file with current snapshot file creating backup if needed * * @param {string} destname * @return {Promise} **/ replace(destname) { if (!destname || 'string' !== typeof destname) throw new TypeError("SnapshotFile.replace: destname must be a non-empty string"); var filename = this.filename; if (destname === filename) throw new Error("SnapshotFile.replace: destname is not different from filename"); return this.sync() .then(() => link(destname, createRotateName(destname)) .catch(err => { if (err.code !== 'ENOENT') throw err; }) ).then(() => renameSyncDir(filename, destname) ).then(() => { this.triggerHistoryRotation(); debug('file "%s" replaced "%s"', filename, destname); return this[filename$] = destname; }); } /** * create data read stream * * @param {number} [position] * @return {ReadStream} **/ createDataReadStream(position) { position = parseInt(position || 0); if (isNaN(position) || position < 0 || position > this.dataSize) throw new Error("SnapshotFile.createDataReadStream: position out of bounds"); return createReadStream(null, {fd: this[fd$], autoClose: false, start: position + this[dataOffset$]}); } /** * make a TokenFile object for further inspection * * @return {TokenFile} **/ makeTokenFile() { return new TokenFile(this[fd$], 0); } } defineConst(SnapshotFile, 'VERSION', VERSION); mixinHistoryRotation(SnapshotFile.prototype, debug); module.exports = exports = SnapshotFile; /* utils */ function createSnapshotFile(filename, index, term, dataSize, reader) { if ('object' === typeof dataSize && 'function' === typeof dataSize.pipe) { reader = dataSize, dataSize = 0; } return openDir(path.dirname(filename)).then(dirfd => { if (reader) dataSize = 0; return createTokenFile(filename).then(tokenFile => { const fd = tokenFile.fd , headerByteSize = HEADER_BODY_SIZE * BYTES_PER_ELEMENT , headerBuf = Buffer.allocUnsafe(headerByteSize) , header = new Uint32Array(headerBuf.buffer, headerBuf.byteOffset, HEADER_BODY_SIZE) header[0] = VERSION; header[1] = index >>> 0; header[2] = index / 0x100000000 >>> 0; header[3] = term >>> 0; header[4] = term / 0x100000000 >>> 0; header[5] = dataSize >>> 0; header[6] = dataSize / 0x100000000 >>> 0; const meta = mp.encode({created: new Date().toJSON(), hostname: os.hostname()}); tokenFile.appendToken(SNAP, headerByteSize, headerBuf); tokenFile.appendToken(META, meta.length, meta); return tokenFile.appendToken(DATA, 0) .then(dataOffset => { if (reader) { const writer = createWriteStream(null, {fd: fd, autoClose: false, start: dataOffset}); const promise = new Promise((resolve, reject) => { writer.on('finish', resolve).on('error', reject); }); reader.on('data', chunk => dataSize += chunk.length); reader.pipe(writer); return promise.then(() => { header[0] = dataSize >>> 0; header[1] = dataSize / 0x100000000 >>> 0; return write(fd, headerBuf, 0, 2 * BYTES_PER_ELEMENT, 5 * BYTES_PER_ELEMENT + TOKEN_HEADER_BYTE_SIZE) .then(() => dataOffset); }); } else return dataOffset; }) .then(dataOffset => fsyncDirFileCloseDir(dirfd, fd).then(() => [fd, dataOffset, dataSize])) .catch(err => { fs.close(fd); throw err; }); }).catch(err => { closeDir(dirfd); throw err; }); }).catch(err => { if (err.code !== 'ENOENT') throw(err); return mkdirp(path.dirname(filename)).then(() => createSnapshotFile(filename, index, term, dataSize, reader)); }); } function readSnapshotFile(fd) { const headerByteSize = HEADER_SIZE * BYTES_PER_ELEMENT; const headerBuf = Buffer.allocUnsafe(headerByteSize); const header = new Uint32Array(headerBuf.buffer, headerBuf.byteOffset, HEADER_SIZE); return read(fd, headerBuf, 0, headerByteSize, 0).then(() => { if (header[0] !== SNAP || header[1] !== headerByteSize - TOKEN_HEADER_BYTE_SIZE || header[2] !== VERSION) { throw new Error("readSnapshotFile: snapshot file type mismatch"); } var index = header[3] + header[4] * 0x100000000; var term = header[5] + header[6] * 0x100000000; var dataSize = header[7] + header[8] * 0x100000000; return findToken(fd, DATA, headerByteSize) .then(([dataOffset]) => fstat(fd).then(stat => { if (stat.size - dataOffset !== dataSize) throw new Error("readSnapshotFile: invalid snapshot file"); return [dataOffset, index, term, dataSize]; })); }); } /* var SnapshotFile = require('./lib/common/snapshotfile'); var size = 1024*1024*1024,total = 10*size var snap = new SnapshotFile('./tmp/snap.tmp',1,0,total); var snap = new SnapshotFile('./tmp/snap'); snap.ready().then(n=>console.log(n), console.error) async function writeAll() { await snap.ready(); var i = 0; for(var i = 0; i < total; i+=size) { let data = crypto.randomBytes(size); await snap.write(data, i, data.length); console.log(i); } await snap.replace('./tmp/snap'); console.log('replaced'); } writeAll().then(n=>console.log(n), console.error) var snap2 = new SnapshotFile('./tmp/snap.tmp',1,0,snap.createDataReadStream(0)); snap2.ready().then(n=>console.log(n), console.error) snap.logIndex snap.logTerm snap.dataSize snap.write(crypto.randomBytes(100), 0, 100).then(n=>console.log(n), console.error) snap.replace('snap').then(n=>console.log(n), console.error) snap.close() var snap = new SnapshotFile('snap').on('ready', ()=>console.log('ok')).on('error', console.error); var s=snap.createDataReadStream(0) */