| 'use strict'; |
| |
| const { |
| JSONParse, |
| JSONStringify, |
| StringPrototypeSplit, |
| ArrayPrototypePush, |
| Symbol, |
| TypedArrayPrototypeSubarray, |
| } = primordials; |
| const { Buffer } = require('buffer'); |
| const { StringDecoder } = require('string_decoder'); |
| const v8 = require('v8'); |
| const { isArrayBufferView } = require('internal/util/types'); |
| const assert = require('internal/assert'); |
| const { streamBaseState, kLastWriteWasAsync } = internalBinding('stream_wrap'); |
| |
| const kMessageBuffer = Symbol('kMessageBuffer'); |
| const kMessageBufferSize = Symbol('kMessageBufferSize'); |
| const kJSONBuffer = Symbol('kJSONBuffer'); |
| const kStringDecoder = Symbol('kStringDecoder'); |
| |
| // Extend V8's serializer APIs to give more JSON-like behaviour in |
| // some cases; in particular, for native objects this serializes them the same |
| // way that JSON does rather than throwing an exception. |
| const kArrayBufferViewTag = 0; |
| const kNotArrayBufferViewTag = 1; |
| class ChildProcessSerializer extends v8.DefaultSerializer { |
| _writeHostObject(object) { |
| if (isArrayBufferView(object)) { |
| this.writeUint32(kArrayBufferViewTag); |
| return super._writeHostObject(object); |
| } |
| this.writeUint32(kNotArrayBufferViewTag); |
| this.writeValue({ ...object }); |
| } |
| } |
| |
| class ChildProcessDeserializer extends v8.DefaultDeserializer { |
| _readHostObject() { |
| const tag = this.readUint32(); |
| if (tag === kArrayBufferViewTag) |
| return super._readHostObject(); |
| |
| assert(tag === kNotArrayBufferViewTag); |
| return this.readValue(); |
| } |
| } |
| |
| // Messages are parsed in either of the following formats: |
| // - Newline-delimited JSON, or |
| // - V8-serialized buffers, prefixed with their length as a big endian uint32 |
| // (aka 'advanced') |
| const advanced = { |
| initMessageChannel(channel) { |
| channel[kMessageBuffer] = []; |
| channel[kMessageBufferSize] = 0; |
| channel.buffering = false; |
| }, |
| |
| *parseChannelMessages(channel, readData) { |
| if (readData.length === 0) return; |
| |
| ArrayPrototypePush(channel[kMessageBuffer], readData); |
| channel[kMessageBufferSize] += readData.length; |
| |
| // Index 0 should always be present because we just pushed data into it. |
| let messageBufferHead = channel[kMessageBuffer][0]; |
| while (messageBufferHead.length >= 4) { |
| // We call `readUInt32BE` manually here, because this is faster than first converting |
| // it to a buffer and using `readUInt32BE` on that. |
| const fullMessageSize = ( |
| messageBufferHead[0] << 24 | |
| messageBufferHead[1] << 16 | |
| messageBufferHead[2] << 8 | |
| messageBufferHead[3] |
| ) + 4; |
| |
| if (channel[kMessageBufferSize] < fullMessageSize) break; |
| |
| const concatenatedBuffer = channel[kMessageBuffer].length === 1 ? |
| channel[kMessageBuffer][0] : |
| Buffer.concat( |
| channel[kMessageBuffer], |
| channel[kMessageBufferSize] |
| ); |
| |
| const deserializer = new ChildProcessDeserializer( |
| TypedArrayPrototypeSubarray(concatenatedBuffer, 4, fullMessageSize) |
| ); |
| |
| messageBufferHead = TypedArrayPrototypeSubarray(concatenatedBuffer, fullMessageSize); |
| channel[kMessageBufferSize] = messageBufferHead.length; |
| channel[kMessageBuffer] = |
| channel[kMessageBufferSize] !== 0 ? [messageBufferHead] : []; |
| |
| deserializer.readHeader(); |
| yield deserializer.readValue(); |
| } |
| |
| channel.buffering = channel[kMessageBufferSize] > 0; |
| }, |
| |
| writeChannelMessage(channel, req, message, handle) { |
| const ser = new ChildProcessSerializer(); |
| // Add 4 bytes, to later populate with message length |
| ser.writeRawBytes(Buffer.allocUnsafe(4)); |
| ser.writeHeader(); |
| ser.writeValue(message); |
| |
| const serializedMessage = ser.releaseBuffer(); |
| const serializedMessageLength = serializedMessage.length - 4; |
| |
| serializedMessage.set([ |
| serializedMessageLength >> 24 & 0xFF, |
| serializedMessageLength >> 16 & 0xFF, |
| serializedMessageLength >> 8 & 0xFF, |
| serializedMessageLength & 0xFF, |
| ], 0); |
| |
| const result = channel.writeBuffer(req, serializedMessage, handle); |
| |
| // Mirror what stream_base_commons.js does for Buffer retention. |
| if (streamBaseState[kLastWriteWasAsync]) |
| req.buffer = serializedMessage; |
| |
| return result; |
| }, |
| }; |
| |
| const json = { |
| initMessageChannel(channel) { |
| channel[kJSONBuffer] = ''; |
| channel[kStringDecoder] = undefined; |
| }, |
| |
| *parseChannelMessages(channel, readData) { |
| if (readData.length === 0) return; |
| |
| if (channel[kStringDecoder] === undefined) |
| channel[kStringDecoder] = new StringDecoder('utf8'); |
| const chunks = |
| StringPrototypeSplit(channel[kStringDecoder].write(readData), '\n'); |
| const numCompleteChunks = chunks.length - 1; |
| // Last line does not have trailing linebreak |
| const incompleteChunk = chunks[numCompleteChunks]; |
| if (numCompleteChunks === 0) { |
| channel[kJSONBuffer] += incompleteChunk; |
| } else { |
| chunks[0] = channel[kJSONBuffer] + chunks[0]; |
| for (let i = 0; i < numCompleteChunks; i++) |
| yield JSONParse(chunks[i]); |
| channel[kJSONBuffer] = incompleteChunk; |
| } |
| channel.buffering = channel[kJSONBuffer].length !== 0; |
| }, |
| |
| writeChannelMessage(channel, req, message, handle) { |
| const string = JSONStringify(message) + '\n'; |
| return channel.writeUtf8String(req, string, handle); |
| }, |
| }; |
| |
| module.exports = { advanced, json }; |