| const { Writable, Readable, getStreamError } = require('streamx') |
| const FIFO = require('fast-fifo') |
| const b4a = require('b4a') |
| const headers = require('./headers') |
| |
| const EMPTY = b4a.alloc(0) |
| |
| class BufferList { |
| constructor () { |
| this.buffered = 0 |
| this.shifted = 0 |
| this.queue = new FIFO() |
| |
| this._offset = 0 |
| } |
| |
| push (buffer) { |
| this.buffered += buffer.byteLength |
| this.queue.push(buffer) |
| } |
| |
| shiftFirst (size) { |
| return this._buffered === 0 ? null : this._next(size) |
| } |
| |
| shift (size) { |
| if (size > this.buffered) return null |
| if (size === 0) return EMPTY |
| |
| let chunk = this._next(size) |
| |
| if (size === chunk.byteLength) return chunk // likely case |
| |
| const chunks = [chunk] |
| |
| while ((size -= chunk.byteLength) > 0) { |
| chunk = this._next(size) |
| chunks.push(chunk) |
| } |
| |
| return b4a.concat(chunks) |
| } |
| |
| _next (size) { |
| const buf = this.queue.peek() |
| const rem = buf.byteLength - this._offset |
| |
| if (size >= rem) { |
| const sub = this._offset ? buf.subarray(this._offset, buf.byteLength) : buf |
| this.queue.shift() |
| this._offset = 0 |
| this.buffered -= rem |
| this.shifted += rem |
| return sub |
| } |
| |
| this.buffered -= size |
| this.shifted += size |
| |
| return buf.subarray(this._offset, (this._offset += size)) |
| } |
| } |
| |
| class Source extends Readable { |
| constructor (self, header, offset) { |
| super() |
| |
| this.header = header |
| this.offset = offset |
| |
| this._parent = self |
| } |
| |
| _read (cb) { |
| if (this.header.size === 0) { |
| this.push(null) |
| } |
| if (this._parent._stream === this) { |
| this._parent._update() |
| } |
| cb(null) |
| } |
| |
| _predestroy () { |
| this._parent.destroy(getStreamError(this)) |
| } |
| |
| _detach () { |
| if (this._parent._stream === this) { |
| this._parent._stream = null |
| this._parent._missing = overflow(this.header.size) |
| this._parent._update() |
| } |
| } |
| |
| _destroy (cb) { |
| this._detach() |
| cb(null) |
| } |
| } |
| |
| class Extract extends Writable { |
| constructor (opts) { |
| super(opts) |
| |
| if (!opts) opts = {} |
| |
| this._buffer = new BufferList() |
| this._offset = 0 |
| this._header = null |
| this._stream = null |
| this._missing = 0 |
| this._longHeader = false |
| this._callback = noop |
| this._locked = false |
| this._finished = false |
| this._pax = null |
| this._paxGlobal = null |
| this._gnuLongPath = null |
| this._gnuLongLinkPath = null |
| this._filenameEncoding = opts.filenameEncoding || 'utf-8' |
| this._allowUnknownFormat = !!opts.allowUnknownFormat |
| this._unlockBound = this._unlock.bind(this) |
| } |
| |
| _unlock (err) { |
| this._locked = false |
| |
| if (err) { |
| this.destroy(err) |
| this._continueWrite(err) |
| return |
| } |
| |
| this._update() |
| } |
| |
| _consumeHeader () { |
| if (this._locked) return false |
| |
| this._offset = this._buffer.shifted |
| |
| try { |
| this._header = headers.decode(this._buffer.shift(512), this._filenameEncoding, this._allowUnknownFormat) |
| } catch (err) { |
| this._continueWrite(err) |
| return false |
| } |
| |
| if (!this._header) return true |
| |
| switch (this._header.type) { |
| case 'gnu-long-path': |
| case 'gnu-long-link-path': |
| case 'pax-global-header': |
| case 'pax-header': |
| this._longHeader = true |
| this._missing = this._header.size |
| return true |
| } |
| |
| this._locked = true |
| this._applyLongHeaders() |
| |
| if (this._header.size === 0 || this._header.type === 'directory') { |
| this.emit('entry', this._header, this._createStream(), this._unlockBound) |
| return true |
| } |
| |
| this._stream = this._createStream() |
| this._missing = this._header.size |
| |
| this.emit('entry', this._header, this._stream, this._unlockBound) |
| return true |
| } |
| |
| _applyLongHeaders () { |
| if (this._gnuLongPath) { |
| this._header.name = this._gnuLongPath |
| this._gnuLongPath = null |
| } |
| |
| if (this._gnuLongLinkPath) { |
| this._header.linkname = this._gnuLongLinkPath |
| this._gnuLongLinkPath = null |
| } |
| |
| if (this._pax) { |
| if (this._pax.path) this._header.name = this._pax.path |
| if (this._pax.linkpath) this._header.linkname = this._pax.linkpath |
| if (this._pax.size) this._header.size = parseInt(this._pax.size, 10) |
| this._header.pax = this._pax |
| this._pax = null |
| } |
| } |
| |
| _decodeLongHeader (buf) { |
| switch (this._header.type) { |
| case 'gnu-long-path': |
| this._gnuLongPath = headers.decodeLongPath(buf, this._filenameEncoding) |
| break |
| case 'gnu-long-link-path': |
| this._gnuLongLinkPath = headers.decodeLongPath(buf, this._filenameEncoding) |
| break |
| case 'pax-global-header': |
| this._paxGlobal = headers.decodePax(buf) |
| break |
| case 'pax-header': |
| this._pax = this._paxGlobal === null |
| ? headers.decodePax(buf) |
| : Object.assign({}, this._paxGlobal, headers.decodePax(buf)) |
| break |
| } |
| } |
| |
| _consumeLongHeader () { |
| this._longHeader = false |
| this._missing = overflow(this._header.size) |
| |
| const buf = this._buffer.shift(this._header.size) |
| |
| try { |
| this._decodeLongHeader(buf) |
| } catch (err) { |
| this._continueWrite(err) |
| return false |
| } |
| |
| return true |
| } |
| |
| _consumeStream () { |
| const buf = this._buffer.shiftFirst(this._missing) |
| if (buf === null) return false |
| |
| this._missing -= buf.byteLength |
| const drained = this._stream.push(buf) |
| |
| if (this._missing === 0) { |
| this._stream.push(null) |
| if (drained) this._stream._detach() |
| return drained && this._locked === false |
| } |
| |
| return drained |
| } |
| |
| _createStream () { |
| return new Source(this, this._header, this._offset) |
| } |
| |
| _update () { |
| while (this._buffer.buffered > 0 && !this.destroying) { |
| if (this._missing > 0) { |
| if (this._stream !== null) { |
| if (this._consumeStream() === false) return |
| continue |
| } |
| |
| if (this._longHeader === true) { |
| if (this._missing > this._buffer.buffered) break |
| if (this._consumeLongHeader() === false) return false |
| continue |
| } |
| |
| const ignore = this._buffer.shiftFirst(this._missing) |
| if (ignore !== null) this._missing -= ignore.byteLength |
| continue |
| } |
| |
| if (this._buffer.buffered < 512) break |
| if (this._stream !== null || this._consumeHeader() === false) return |
| } |
| |
| this._continueWrite(null) |
| } |
| |
| _continueWrite (err) { |
| const cb = this._callback |
| this._callback = noop |
| cb(err) |
| } |
| |
| _write (data, cb) { |
| this._callback = cb |
| this._buffer.push(data) |
| this._update() |
| } |
| |
| _final (cb) { |
| this._finished = this._missing === 0 && this._buffer.buffered === 0 |
| cb(this._finished ? null : new Error('Unexpected end of data')) |
| } |
| |
| _predestroy () { |
| this._continueWrite(null) |
| } |
| |
| _destroy (cb) { |
| if (this._stream) this._stream.destroy(getStreamError(this)) |
| cb(null) |
| } |
| |
| [Symbol.asyncIterator] () { |
| let error = null |
| |
| let promiseResolve = null |
| let promiseReject = null |
| |
| let entryStream = null |
| let entryCallback = null |
| |
| const extract = this |
| |
| this.on('entry', onentry) |
| this.on('error', (err) => { error = err }) |
| this.on('close', onclose) |
| |
| return { |
| [Symbol.asyncIterator] () { |
| return this |
| }, |
| next () { |
| return new Promise(onnext) |
| }, |
| return () { |
| return destroy(null) |
| }, |
| throw (err) { |
| return destroy(err) |
| } |
| } |
| |
| function consumeCallback (err) { |
| if (!entryCallback) return |
| const cb = entryCallback |
| entryCallback = null |
| cb(err) |
| } |
| |
| function onnext (resolve, reject) { |
| if (error) { |
| return reject(error) |
| } |
| |
| if (entryStream) { |
| resolve({ value: entryStream, done: false }) |
| entryStream = null |
| return |
| } |
| |
| promiseResolve = resolve |
| promiseReject = reject |
| |
| consumeCallback(null) |
| |
| if (extract._finished && promiseResolve) { |
| promiseResolve({ value: undefined, done: true }) |
| promiseResolve = promiseReject = null |
| } |
| } |
| |
| function onentry (header, stream, callback) { |
| entryCallback = callback |
| stream.on('error', noop) // no way around this due to tick sillyness |
| |
| if (promiseResolve) { |
| promiseResolve({ value: stream, done: false }) |
| promiseResolve = promiseReject = null |
| } else { |
| entryStream = stream |
| } |
| } |
| |
| function onclose () { |
| consumeCallback(error) |
| if (!promiseResolve) return |
| if (error) promiseReject(error) |
| else promiseResolve({ value: undefined, done: true }) |
| promiseResolve = promiseReject = null |
| } |
| |
| function destroy (err) { |
| extract.destroy(err) |
| consumeCallback(err) |
| return new Promise((resolve, reject) => { |
| if (extract.destroyed) return resolve({ value: undefined, done: true }) |
| extract.once('close', function () { |
| if (err) reject(err) |
| else resolve({ value: undefined, done: true }) |
| }) |
| }) |
| } |
| } |
| } |
| |
| module.exports = function extract (opts) { |
| return new Extract(opts) |
| } |
| |
| function noop () {} |
| |
| function overflow (size) { |
| size &= 511 |
| return size && 512 - size |
| } |