/*! * on-stream-end * * Copyright (c) 2015 Charlike Mike Reagent <@tunnckoCore> (http://www.tunnckocore.tk) * Released under the MIT license. */ /* jshint asi:true */ 'use strict' var once = require('onetime') var dezalgo = require('dezalgo') var isNodeStream = require('is-node-stream') var isRealObject = require('is-real-object') var isChildProcess = require('is-child-process') var isRequestStream = require('is-request-stream') /** * > Handles completion and errors of any stream - readable/writable/duplex. * * **Example** * * ```js * const fs = require('fs') * const eos = require('on-stream-end') * const readable = fs.createReadStream('README.md') * * eos(readable, err => { * if (err) return console.log('stream had an error or closed early') * console.log('stream has ended') * }) * ``` * * @name onStreamEnd * @param {Stream} `stream` stream to listen for completion * @param {Object} `opts` optional options object * @param {Function} `callback` completion callback * @api public */ module.exports = function onStreamEnd (stream, opts, callback) { if (!isNodeStream(stream) && !isRequestStream(stream) && !isChildProcess(stream)) { throw new TypeError('on-stream-end: expect `stream` to be Stream, RequestStream or ChildProcess') } if (typeof opts === 'function') { callback = opts opts = null } if (typeof callback !== 'function') { throw new TypeError('on-stream-end: expect `callback` to be function') } opts = isRealObject(opts) ? opts : {} callback = once(dezalgo(callback)) var ws = stream._writableState var rs = stream._readableState var readable = opts.readable || (opts.readable !== false && stream.readable) var writable = opts.writable || (opts.writable !== false && stream.writable) function onerror (err) { done(err) } function onlegacyfinish () { if (!stream.writable) { onfinish() } } function onfinish () { writable = false if (!readable) { done() } } function onend () { readable = false if (!writable) { done() } } function onexit (exitCode) { var err = new Error('exited with error code: ' + exitCode) err.exitCode = exitCode return done(err) } function onclose (exitCode) { var err = new Error('premature close with error code: ' + exitCode) err.exitCode = exitCode if (readable && !(rs && rs.ended)) { return done(err) } if (writable && !(ws && ws.ended)) { return done(err) } } function onrequest () { stream.req.once('finish', onfinish) } if (isRequestStream(stream)) { stream.once('complete', onfinish) stream.once('abort', onclose) if (stream.req) { onrequest() } else { stream.once('request', onrequest) } } else if (writable && !ws) { // legacy streams /* istanbul ignore next */ stream.once('end', onlegacyfinish) /* istanbul ignore next */ stream.once('close', onlegacyfinish) } if (isChildProcess(stream)) { stream.once('exit', onexit) } stream.once('end', onend) stream.once('finish', onfinish) if (opts.error !== false) { stream.once('error', onerror) } stream.once('close', onclose) function cleanup () { stream.removeListener('complete', onfinish) stream.removeListener('abort', onclose) stream.removeListener('request', onrequest) if (stream.req) { stream.req.removeListener('finish', onfinish) } stream.removeListener('end', onlegacyfinish) stream.removeListener('close', onlegacyfinish) stream.removeListener('finish', onfinish) stream.removeListener('exit', onexit) stream.removeListener('end', onend) stream.removeListener('error', onerror) stream.removeListener('close', onclose) } function done (err) { if (opts.cleanup !== false) { cleanup() } if (err && err.exitCode !== 0 && err instanceof Error) { return callback(err) } callback() } return cleanup }