const proc = typeof process === 'object' && process ? process : { stdout: null, stderr: null, }; import { EventEmitter } from 'node:events'; import Stream from 'node:stream'; import { StringDecoder } from 'node:string_decoder'; /** * Return true if the argument is a Minipass stream, Node stream, or something * else that Minipass can interact with. */ export const isStream = (s) => !!s && typeof s === 'object' && (s instanceof Minipass || s instanceof Stream || isReadable(s) || isWritable(s)); /** * Return true if the argument is a valid {@link Minipass.Readable} */ export const isReadable = (s) => !!s && typeof s === 'object' && s instanceof EventEmitter && typeof s.pipe === 'function' && // node core Writable streams have a pipe() method, but it throws s.pipe !== Stream.Writable.prototype.pipe; /** * Return true if the argument is a valid {@link Minipass.Writable} */ export const isWritable = (s) => !!s && typeof s === 'object' && s instanceof EventEmitter && typeof s.write === 'function' && typeof s.end === 'function'; const EOF = Symbol('EOF'); const MAYBE_EMIT_END = Symbol('maybeEmitEnd'); const EMITTED_END = Symbol('emittedEnd'); const EMITTING_END = Symbol('emittingEnd'); const EMITTED_ERROR = Symbol('emittedError'); const CLOSED = Symbol('closed'); const READ = Symbol('read'); const FLUSH = Symbol('flush'); const FLUSHCHUNK = Symbol('flushChunk'); const ENCODING = Symbol('encoding'); const DECODER = Symbol('decoder'); const FLOWING = Symbol('flowing'); const PAUSED = Symbol('paused'); const RESUME = Symbol('resume'); const BUFFER = Symbol('buffer'); const PIPES = Symbol('pipes'); const BUFFERLENGTH = Symbol('bufferLength'); const BUFFERPUSH = Symbol('bufferPush'); const BUFFERSHIFT = Symbol('bufferShift'); const OBJECTMODE = Symbol('objectMode'); // internal event when stream is destroyed const DESTROYED = Symbol('destroyed'); // internal event when stream has an error const ERROR = Symbol('error'); const EMITDATA = Symbol('emitData'); const EMITEND = Symbol('emitEnd'); const EMITEND2 = Symbol('emitEnd2'); const ASYNC = Symbol('async'); const ABORT = Symbol('abort'); const ABORTED = Symbol('aborted'); const SIGNAL = Symbol('signal'); const DATALISTENERS = Symbol('dataListeners'); const DISCARDED = Symbol('discarded'); const defer = (fn) => Promise.resolve().then(fn); const nodefer = (fn) => fn(); const isEndish = (ev) => ev === 'end' || ev === 'finish' || ev === 'prefinish'; const isArrayBufferLike = (b) => b instanceof ArrayBuffer || (!!b && typeof b === 'object' && b.constructor && b.constructor.name === 'ArrayBuffer' && b.byteLength >= 0); const isArrayBufferView = (b) => !Buffer.isBuffer(b) && ArrayBuffer.isView(b); /** * Internal class representing a pipe to a destination stream. * * @internal */ class Pipe { src; dest; opts; ondrain; constructor(src, dest, opts) { this.src = src; this.dest = dest; this.opts = opts; this.ondrain = () => src[RESUME](); this.dest.on('drain', this.ondrain); } unpipe() { this.dest.removeListener('drain', this.ondrain); } // only here for the prototype /* c8 ignore start */ proxyErrors(_er) { } /* c8 ignore stop */ end() { this.unpipe(); if (this.opts.end) this.dest.end(); } } /** * Internal class representing a pipe to a destination stream where * errors are proxied. * * @internal */ class PipeProxyErrors extends Pipe { unpipe() { this.src.removeListener('error', this.proxyErrors); super.unpipe(); } constructor(src, dest, opts) { super(src, dest, opts); this.proxyErrors = er => dest.emit('error', er); src.on('error', this.proxyErrors); } } const isObjectModeOptions = (o) => !!o.objectMode; const isEncodingOptions = (o) => !o.objectMode && !!o.encoding && o.encoding !== 'buffer'; /** * Main export, the Minipass class * * `RType` is the type of data emitted, defaults to Buffer * * `WType` is the type of data to be written, if RType is buffer or string, * then any {@link Minipass.ContiguousData} is allowed. * * `Events` is the set of event handler signatures that this object * will emit, see {@link Minipass.Events} */ export class Minipass extends EventEmitter { [FLOWING] = false; [PAUSED] = false; [PIPES] = []; [BUFFER] = []; [OBJECTMODE]; [ENCODING]; [ASYNC]; [DECODER]; [EOF] = false; [EMITTED_END] = false; [EMITTING_END] = false; [CLOSED] = false; [EMITTED_ERROR] = null; [BUFFERLENGTH] = 0; [DESTROYED] = false; [SIGNAL]; [ABORTED] = false; [DATALISTENERS] = 0; [DISCARDED] = false; /** * true if the stream can be written */ writable = true; /** * true if the stream can be read */ readable = true; /** * If `RType` is Buffer, then options do not need to be provided. * Otherwise, an options object must be provided to specify either * {@link Minipass.SharedOptions.objectMode} or * {@link Minipass.SharedOptions.encoding}, as appropriate. */ constructor(...args) { const options = (args[0] || {}); super(); if (options.objectMode && typeof options.encoding === 'string') { throw new TypeError('Encoding and objectMode may not be used together'); } if (isObjectModeOptions(options)) { this[OBJECTMODE] = true; this[ENCODING] = null; } else if (isEncodingOptions(options)) { this[ENCODING] = options.encoding; this[OBJECTMODE] = false; } else { this[OBJECTMODE] = false; this[ENCODING] = null; } this[ASYNC] = !!options.async; this[DECODER] = this[ENCODING] ? new StringDecoder(this[ENCODING]) : null; //@ts-ignore - private option for debugging and testing if (options && options.debugExposeBuffer === true) { Object.defineProperty(this, 'buffer', { get: () => this[BUFFER] }); } //@ts-ignore - private option for debugging and testing if (options && options.debugExposePipes === true) { Object.defineProperty(this, 'pipes', { get: () => this[PIPES] }); } const { signal } = options; if (signal) { this[SIGNAL] = signal; if (signal.aborted) { this[ABORT](); } else { signal.addEventListener('abort', () => this[ABORT]()); } } } /** * The amount of data stored in the buffer waiting to be read. * * For Buffer strings, this will be the total byte length. * For string encoding streams, this will be the string character length, * according to JavaScript's `string.length` logic. * For objectMode streams, this is a count of the items waiting to be * emitted. */ get bufferLength() { return this[BUFFERLENGTH]; } /** * The `BufferEncoding` currently in use, or `null` */ get encoding() { return this[ENCODING]; } /** * @deprecated - This is a read only property */ set encoding(_enc) { throw new Error('Encoding must be set at instantiation time'); } /** * @deprecated - Encoding may only be set at instantiation time */ setEncoding(_enc) { throw new Error('Encoding must be set at instantiation time'); } /** * True if this is an objectMode stream */ get objectMode() { return this[OBJECTMODE]; } /** * @deprecated - This is a read-only property */ set objectMode(_om) { throw new Error('objectMode must be set at instantiation time'); } /** * true if this is an async stream */ get ['async']() { return this[ASYNC]; } /** * Set to true to make this stream async. * * Once set, it cannot be unset, as this would potentially cause incorrect * behavior. Ie, a sync stream can be made async, but an async stream * cannot be safely made sync. */ set ['async'](a) { this[ASYNC] = this[ASYNC] || !!a; } // drop everything and get out of the flow completely [ABORT]() { this[ABORTED] = true; this.emit('abort', this[SIGNAL]?.reason); this.destroy(this[SIGNAL]?.reason); } /** * True if the stream has been aborted. */ get aborted() { return this[ABORTED]; } /** * No-op setter. Stream aborted status is set via the AbortSignal provided * in the constructor options. */ set aborted(_) { } write(chunk, encoding, cb) { if (this[ABORTED]) return false; if (this[EOF]) throw new Error('write after end'); if (this[DESTROYED]) { this.emit('error', Object.assign(new Error('Cannot call write after a stream was destroyed'), { code: 'ERR_STREAM_DESTROYED' })); return true; } if (typeof encoding === 'function') { cb = encoding; encoding = 'utf8'; } if (!encoding) encoding = 'utf8'; const fn = this[ASYNC] ? defer : nodefer; // convert array buffers and typed array views into buffers // at some point in the future, we may want to do the opposite! // leave strings and buffers as-is // anything is only allowed if in object mode, so throw if (!this[OBJECTMODE] && !Buffer.isBuffer(chunk)) { if (isArrayBufferView(chunk)) { //@ts-ignore - sinful unsafe type changing chunk = Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength); } else if (isArrayBufferLike(chunk)) { //@ts-ignore - sinful unsafe type changing chunk = Buffer.from(chunk); } else if (typeof chunk !== 'string') { throw new Error('Non-contiguous data written to non-objectMode stream'); } } // handle object mode up front, since it's simpler // this yields better performance, fewer checks later. if (this[OBJECTMODE]) { // maybe impossible? /* c8 ignore start */ if (this[FLOWING] && this[BUFFERLENGTH] !== 0) this[FLUSH](true); /* c8 ignore stop */ if (this[FLOWING]) this.emit('data', chunk); else this[BUFFERPUSH](chunk); if (this[BUFFERLENGTH] !== 0) this.emit('readable'); if (cb) fn(cb); return this[FLOWING]; } // at this point the chunk is a buffer or string // don't buffer it up or send it to the decoder if (!chunk.length) { if (this[BUFFERLENGTH] !== 0) this.emit('readable'); if (cb) fn(cb); return this[FLOWING]; } // fast-path writing strings of same encoding to a stream with // an empty buffer, skipping the buffer/decoder dance if (typeof chunk === 'string' && // unless it is a string already ready for us to use !(encoding === this[ENCODING] && !this[DECODER]?.lastNeed)) { //@ts-ignore - sinful unsafe type change chunk = Buffer.from(chunk, encoding); } if (Buffer.isBuffer(chunk) && this[ENCODING]) { //@ts-ignore - sinful unsafe type change chunk = this[DECODER].write(chunk); } // Note: flushing CAN potentially switch us into not-flowing mode if (this[FLOWING] && this[BUFFERLENGTH] !== 0) this[FLUSH](true); if (this[FLOWING]) this.emit('data', chunk); else this[BUFFERPUSH](chunk); if (this[BUFFERLENGTH] !== 0) this.emit('readable'); if (cb) fn(cb); return this[FLOWING]; } /** * Low-level explicit read method. * * In objectMode, the argument is ignored, and one item is returned if * available. * * `n` is the number of bytes (or in the case of encoding streams, * characters) to consume. If `n` is not provided, then the entire buffer * is returned, or `null` is returned if no data is available. * * If `n` is greater that the amount of data in the internal buffer, * then `null` is returned. */ read(n) { if (this[DESTROYED]) return null; this[DISCARDED] = false; if (this[BUFFERLENGTH] === 0 || n === 0 || (n && n > this[BUFFERLENGTH])) { this[MAYBE_EMIT_END](); return null; } if (this[OBJECTMODE]) n = null; if (this[BUFFER].length > 1 && !this[OBJECTMODE]) { // not object mode, so if we have an encoding, then RType is string // otherwise, must be Buffer this[BUFFER] = [ (this[ENCODING] ? this[BUFFER].join('') : Buffer.concat(this[BUFFER], this[BUFFERLENGTH])), ]; } const ret = this[READ](n || null, this[BUFFER][0]); this[MAYBE_EMIT_END](); return ret; } [READ](n, chunk) { if (this[OBJECTMODE]) this[BUFFERSHIFT](); else { const c = chunk; if (n === c.length || n === null) this[BUFFERSHIFT](); else if (typeof c === 'string') { this[BUFFER][0] = c.slice(n); chunk = c.slice(0, n); this[BUFFERLENGTH] -= n; } else { this[BUFFER][0] = c.subarray(n); chunk = c.subarray(0, n); this[BUFFERLENGTH] -= n; } } this.emit('data', chunk); if (!this[BUFFER].length && !this[EOF]) this.emit('drain'); return chunk; } end(chunk, encoding, cb) { if (typeof chunk === 'function') { cb = chunk; chunk = undefined; } if (typeof encoding === 'function') { cb = encoding; encoding = 'utf8'; } if (chunk !== undefined) this.write(chunk, encoding); if (cb) this.once('end', cb); this[EOF] = true; this.writable = false; // if we haven't written anything, then go ahead and emit, // even if we're not reading. // we'll re-emit if a new 'end' listener is added anyway. // This makes MP more suitable to write-only use cases. if (this[FLOWING] || !this[PAUSED]) this[MAYBE_EMIT_END](); return this; } // don't let the internal resume be overwritten [RESUME]() { if (this[DESTROYED]) return; if (!this[DATALISTENERS] && !this[PIPES].length) { this[DISCARDED] = true; } this[PAUSED] = false; this[FLOWING] = true; this.emit('resume'); if (this[BUFFER].length) this[FLUSH](); else if (this[EOF]) this[MAYBE_EMIT_END](); else this.emit('drain'); } /** * Resume the stream if it is currently in a paused state * * If called when there are no pipe destinations or `data` event listeners, * this will place the stream in a "discarded" state, where all data will * be thrown away. The discarded state is removed if a pipe destination or * data handler is added, if pause() is called, or if any synchronous or * asynchronous iteration is started. */ resume() { return this[RESUME](); } /** * Pause the stream */ pause() { this[FLOWING] = false; this[PAUSED] = true; this[DISCARDED] = false; } /** * true if the stream has been forcibly destroyed */ get destroyed() { return this[DESTROYED]; } /** * true if the stream is currently in a flowing state, meaning that * any writes will be immediately emitted. */ get flowing() { return this[FLOWING]; } /** * true if the stream is currently in a paused state */ get paused() { return this[PAUSED]; } [BUFFERPUSH](chunk) { if (this[OBJECTMODE]) this[BUFFERLENGTH] += 1; else this[BUFFERLENGTH] += chunk.length; this[BUFFER].push(chunk); } [BUFFERSHIFT]() { if (this[OBJECTMODE]) this[BUFFERLENGTH] -= 1; else this[BUFFERLENGTH] -= this[BUFFER][0].length; return this[BUFFER].shift(); } [FLUSH](noDrain = false) { do { } while (this[FLUSHCHUNK](this[BUFFERSHIFT]()) && this[BUFFER].length); if (!noDrain && !this[BUFFER].length && !this[EOF]) this.emit('drain'); } [FLUSHCHUNK](chunk) { this.emit('data', chunk); return this[FLOWING]; } /** * Pipe all data emitted by this stream into the destination provided. * * Triggers the flow of data. */ pipe(dest, opts) { if (this[DESTROYED]) return dest; this[DISCARDED] = false; const ended = this[EMITTED_END]; opts = opts || {}; if (dest === proc.stdout || dest === proc.stderr) opts.end = false; else opts.end = opts.end !== false; opts.proxyErrors = !!opts.proxyErrors; // piping an ended stream ends immediately if (ended) { if (opts.end) dest.end(); } else { // "as" here just ignores the WType, which pipes don't care about, // since they're only consuming from us, and writing to the dest this[PIPES].push(!opts.proxyErrors ? new Pipe(this, dest, opts) : new PipeProxyErrors(this, dest, opts)); if (this[ASYNC]) defer(() => this[RESUME]()); else this[RESUME](); } return dest; } /** * Fully unhook a piped destination stream. * * If the destination stream was the only consumer of this stream (ie, * there are no other piped destinations or `'data'` event listeners) * then the flow of data will stop until there is another consumer or * {@link Minipass#resume} is explicitly called. */ unpipe(dest) { const p = this[PIPES].find(p => p.dest === dest); if (p) { if (this[PIPES].length === 1) { if (this[FLOWING] && this[DATALISTENERS] === 0) { this[FLOWING] = false; } this[PIPES] = []; } else this[PIPES].splice(this[PIPES].indexOf(p), 1); p.unpipe(); } } /** * Alias for {@link Minipass#on} */ addListener(ev, handler) { return this.on(ev, handler); } /** * Mostly identical to `EventEmitter.on`, with the following * behavior differences to prevent data loss and unnecessary hangs: * * - Adding a 'data' event handler will trigger the flow of data * * - Adding a 'readable' event handler when there is data waiting to be read * will cause 'readable' to be emitted immediately. * * - Adding an 'endish' event handler ('end', 'finish', etc.) which has * already passed will cause the event to be emitted immediately and all * handlers removed. * * - Adding an 'error' event handler after an error has been emitted will * cause the event to be re-emitted immediately with the error previously * raised. */ on(ev, handler) { const ret = super.on(ev, handler); if (ev === 'data') { this[DISCARDED] = false; this[DATALISTENERS]++; if (!this[PIPES].length && !this[FLOWING]) { this[RESUME](); } } else if (ev === 'readable' && this[BUFFERLENGTH] !== 0) { super.emit('readable'); } else if (isEndish(ev) && this[EMITTED_END]) { super.emit(ev); this.removeAllListeners(ev); } else if (ev === 'error' && this[EMITTED_ERROR]) { const h = handler; if (this[ASYNC]) defer(() => h.call(this, this[EMITTED_ERROR])); else h.call(this, this[EMITTED_ERROR]); } return ret; } /** * Alias for {@link Minipass#off} */ removeListener(ev, handler) { return this.off(ev, handler); } /** * Mostly identical to `EventEmitter.off` * * If a 'data' event handler is removed, and it was the last consumer * (ie, there are no pipe destinations or other 'data' event listeners), * then the flow of data will stop until there is another consumer or * {@link Minipass#resume} is explicitly called. */ off(ev, handler) { const ret = super.off(ev, handler); // if we previously had listeners, and now we don't, and we don't // have any pipes, then stop the flow, unless it's been explicitly // put in a discarded flowing state via stream.resume(). if (ev === 'data') { this[DATALISTENERS] = this.listeners('data').length; if (this[DATALISTENERS] === 0 && !this[DISCARDED] && !this[PIPES].length) { this[FLOWING] = false; } } return ret; } /** * Mostly identical to `EventEmitter.removeAllListeners` * * If all 'data' event handlers are removed, and they were the last consumer * (ie, there are no pipe destinations), then the flow of data will stop * until there is another consumer or {@link Minipass#resume} is explicitly * called. */ removeAllListeners(ev) { const ret = super.removeAllListeners(ev); if (ev === 'data' || ev === undefined) { this[DATALISTENERS] = 0; if (!this[DISCARDED] && !this[PIPES].length) { this[FLOWING] = false; } } return ret; } /** * true if the 'end' event has been emitted */ get emittedEnd() { return this[EMITTED_END]; } [MAYBE_EMIT_END]() { if (!this[EMITTING_END] && !this[EMITTED_END] && !this[DESTROYED] && this[BUFFER].length === 0 && this[EOF]) { this[EMITTING_END] = true; this.emit('end'); this.emit('prefinish'); this.emit('finish'); if (this[CLOSED]) this.emit('close'); this[EMITTING_END] = false; } } /** * Mostly identical to `EventEmitter.emit`, with the following * behavior differences to prevent data loss and unnecessary hangs: * * If the stream has been destroyed, and the event is something other * than 'close' or 'error', then `false` is returned and no handlers * are called. * * If the event is 'end', and has already been emitted, then the event * is ignored. If the stream is in a paused or non-flowing state, then * the event will be deferred until data flow resumes. If the stream is * async, then handlers will be called on the next tick rather than * immediately. * * If the event is 'close', and 'end' has not yet been emitted, then * the event will be deferred until after 'end' is emitted. * * If the event is 'error', and an AbortSignal was provided for the stream, * and there are no listeners, then the event is ignored, matching the * behavior of node core streams in the presense of an AbortSignal. * * If the event is 'finish' or 'prefinish', then all listeners will be * removed after emitting the event, to prevent double-firing. */ emit(ev, ...args) { const data = args[0]; // error and close are only events allowed after calling destroy() if (ev !== 'error' && ev !== 'close' && ev !== DESTROYED && this[DESTROYED]) { return false; } else if (ev === 'data') { return !this[OBJECTMODE] && !data ? false : this[ASYNC] ? (defer(() => this[EMITDATA](data)), true) : this[EMITDATA](data); } else if (ev === 'end') { return this[EMITEND](); } else if (ev === 'close') { this[CLOSED] = true; // don't emit close before 'end' and 'finish' if (!this[EMITTED_END] && !this[DESTROYED]) return false; const ret = super.emit('close'); this.removeAllListeners('close'); return ret; } else if (ev === 'error') { this[EMITTED_ERROR] = data; super.emit(ERROR, data); const ret = !this[SIGNAL] || this.listeners('error').length ? super.emit('error', data) : false; this[MAYBE_EMIT_END](); return ret; } else if (ev === 'resume') { const ret = super.emit('resume'); this[MAYBE_EMIT_END](); return ret; } else if (ev === 'finish' || ev === 'prefinish') { const ret = super.emit(ev); this.removeAllListeners(ev); return ret; } // Some other unknown event const ret = super.emit(ev, ...args); this[MAYBE_EMIT_END](); return ret; } [EMITDATA](data) { for (const p of this[PIPES]) { if (p.dest.write(data) === false) this.pause(); } const ret = this[DISCARDED] ? false : super.emit('data', data); this[MAYBE_EMIT_END](); return ret; } [EMITEND]() { if (this[EMITTED_END]) return false; this[EMITTED_END] = true; this.readable = false; return this[ASYNC] ? (defer(() => this[EMITEND2]()), true) : this[EMITEND2](); } [EMITEND2]() { if (this[DECODER]) { const data = this[DECODER].end(); if (data) { for (const p of this[PIPES]) { p.dest.write(data); } if (!this[DISCARDED]) super.emit('data', data); } } for (const p of this[PIPES]) { p.end(); } const ret = super.emit('end'); this.removeAllListeners('end'); return ret; } /** * Return a Promise that resolves to an array of all emitted data once * the stream ends. */ async collect() { const buf = Object.assign([], { dataLength: 0, }); if (!this[OBJECTMODE]) buf.dataLength = 0; // set the promise first, in case an error is raised // by triggering the flow here. const p = this.promise(); this.on('data', c => { buf.push(c); if (!this[OBJECTMODE]) buf.dataLength += c.length; }); await p; return buf; } /** * Return a Promise that resolves to the concatenation of all emitted data * once the stream ends. * * Not allowed on objectMode streams. */ async concat() { if (this[OBJECTMODE]) { throw new Error('cannot concat in objectMode'); } const buf = await this.collect(); return (this[ENCODING] ? buf.join('') : Buffer.concat(buf, buf.dataLength)); } /** * Return a void Promise that resolves once the stream ends. */ async promise() { return new Promise((resolve, reject) => { this.on(DESTROYED, () => reject(new Error('stream destroyed'))); this.on('error', er => reject(er)); this.on('end', () => resolve()); }); } /** * Asynchronous `for await of` iteration. * * This will continue emitting all chunks until the stream terminates. */ [Symbol.asyncIterator]() { // set this up front, in case the consumer doesn't call next() // right away. this[DISCARDED] = false; let stopped = false; const stop = async () => { this.pause(); stopped = true; return { value: undefined, done: true }; }; const next = () => { if (stopped) return stop(); const res = this.read(); if (res !== null) return Promise.resolve({ done: false, value: res }); if (this[EOF]) return stop(); let resolve; let reject; const onerr = (er) => { this.off('data', ondata); this.off('end', onend); this.off(DESTROYED, ondestroy); stop(); reject(er); }; const ondata = (value) => { this.off('error', onerr); this.off('end', onend); this.off(DESTROYED, ondestroy); this.pause(); resolve({ value, done: !!this[EOF] }); }; const onend = () => { this.off('error', onerr); this.off('data', ondata); this.off(DESTROYED, ondestroy); stop(); resolve({ done: true, value: undefined }); }; const ondestroy = () => onerr(new Error('stream destroyed')); return new Promise((res, rej) => { reject = rej; resolve = res; this.once(DESTROYED, ondestroy); this.once('error', onerr); this.once('end', onend); this.once('data', ondata); }); }; return { next, throw: stop, return: stop, [Symbol.asyncIterator]() { return this; }, }; } /** * Synchronous `for of` iteration. * * The iteration will terminate when the internal buffer runs out, even * if the stream has not yet terminated. */ [Symbol.iterator]() { // set this up front, in case the consumer doesn't call next() // right away. this[DISCARDED] = false; let stopped = false; const stop = () => { this.pause(); this.off(ERROR, stop); this.off(DESTROYED, stop); this.off('end', stop); stopped = true; return { done: true, value: undefined }; }; const next = () => { if (stopped) return stop(); const value = this.read(); return value === null ? stop() : { done: false, value }; }; this.once('end', stop); this.once(ERROR, stop); this.once(DESTROYED, stop); return { next, throw: stop, return: stop, [Symbol.iterator]() { return this; }, }; } /** * Destroy a stream, preventing it from being used for any further purpose. * * If the stream has a `close()` method, then it will be called on * destruction. * * After destruction, any attempt to write data, read data, or emit most * events will be ignored. * * If an error argument is provided, then it will be emitted in an * 'error' event. */ destroy(er) { if (this[DESTROYED]) { if (er) this.emit('error', er); else this.emit(DESTROYED); return this; } this[DESTROYED] = true; this[DISCARDED] = true; // throw away all buffered data, it's never coming out this[BUFFER].length = 0; this[BUFFERLENGTH] = 0; const wc = this; if (typeof wc.close === 'function' && !this[CLOSED]) wc.close(); if (er) this.emit('error', er); // if no error to emit, still reject pending promises else this.emit(DESTROYED); return this; } /** * Alias for {@link isStream} * * Former export location, maintained for backwards compatibility. * * @deprecated */ static get isStream() { return isStream; } } //# sourceMappingURL=index.js.map