| #ifndef SRC_STREAM_BASE_H_ |
| #define SRC_STREAM_BASE_H_ |
| |
| #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS |
| |
| #include "env.h" |
| #include "async_wrap.h" |
| #include "node.h" |
| #include "util.h" |
| |
| #include "v8.h" |
| |
| namespace node { |
| |
| // Forward declarations |
| class Environment; |
| class ShutdownWrap; |
| class WriteWrap; |
| class StreamBase; |
| class StreamResource; |
| |
| struct StreamWriteResult { |
| bool async; |
| int err; |
| WriteWrap* wrap; |
| size_t bytes; |
| }; |
| |
| using JSMethodFunction = void(const v8::FunctionCallbackInfo<v8::Value>& args); |
| |
| class StreamReq { |
| public: |
| // The kSlot internal field here mirrors BaseObject::InternalFields::kSlot |
| // here because instances derived from StreamReq will also derive from |
| // BaseObject, and the slots are used for the identical purpose. |
| enum InternalFields { |
| kSlot = BaseObject::kSlot, |
| kStreamReqField = BaseObject::kInternalFieldCount, |
| kInternalFieldCount |
| }; |
| |
| inline explicit StreamReq( |
| StreamBase* stream, |
| v8::Local<v8::Object> req_wrap_obj); |
| |
| virtual ~StreamReq() = default; |
| virtual AsyncWrap* GetAsyncWrap() = 0; |
| inline v8::Local<v8::Object> object(); |
| |
| inline void Done(int status, const char* error_str = nullptr); |
| inline void Dispose(); |
| |
| StreamBase* stream() const { return stream_; } |
| |
| static inline StreamReq* FromObject(v8::Local<v8::Object> req_wrap_obj); |
| |
| // Sets all internal fields of `req_wrap_obj` to `nullptr`. |
| // This is what the `WriteWrap` and `ShutdownWrap` JS constructors do, |
| // and what we use in C++ after creating these objects from their |
| // v8::ObjectTemplates, to avoid the overhead of calling the |
| // constructor explicitly. |
| static inline void ResetObject(v8::Local<v8::Object> req_wrap_obj); |
| |
| protected: |
| virtual void OnDone(int status) = 0; |
| |
| inline void AttachToObject(v8::Local<v8::Object> req_wrap_obj); |
| |
| private: |
| StreamBase* const stream_; |
| }; |
| |
| class ShutdownWrap : public StreamReq { |
| public: |
| inline ShutdownWrap( |
| StreamBase* stream, |
| v8::Local<v8::Object> req_wrap_obj); |
| |
| // Call stream()->EmitAfterShutdown() and dispose of this request wrap. |
| void OnDone(int status) override; |
| }; |
| |
| class WriteWrap : public StreamReq { |
| public: |
| inline void SetAllocatedStorage(AllocatedBuffer&& storage); |
| |
| inline WriteWrap( |
| StreamBase* stream, |
| v8::Local<v8::Object> req_wrap_obj); |
| |
| // Call stream()->EmitAfterWrite() and dispose of this request wrap. |
| void OnDone(int status) override; |
| |
| private: |
| AllocatedBuffer storage_; |
| }; |
| |
| |
| // This is the generic interface for objects that control Node.js' C++ streams. |
| // For example, the default `EmitToJSStreamListener` emits a stream's data |
| // as Buffers in JS, or `TLSWrap` reads and decrypts data from a stream. |
| class StreamListener { |
| public: |
| virtual ~StreamListener(); |
| |
| // This is called when a stream wants to allocate memory before |
| // reading data into the freshly allocated buffer (i.e. it is always followed |
| // by a `OnStreamRead()` call). |
| // This memory may be statically or dynamically allocated; for example, |
| // a protocol parser may want to read data into a static buffer if it knows |
| // that all data is going to be fully handled during the next |
| // `OnStreamRead()` call. |
| // The returned buffer does not need to contain `suggested_size` bytes. |
| // The default implementation of this method returns a buffer that has exactly |
| // the suggested size and is allocated using malloc(). |
| // It is not valid to return a zero-length buffer from this method. |
| // It is not guaranteed that the corresponding `OnStreamRead()` call |
| // happens in the same event loop turn as this call. |
| virtual uv_buf_t OnStreamAlloc(size_t suggested_size) = 0; |
| |
| // `OnStreamRead()` is called when data is available on the socket and has |
| // been read into the buffer provided by `OnStreamAlloc()`. |
| // The `buf` argument is the return value of `uv_buf_t`, or may be a buffer |
| // with base nullptr in case of an error. |
| // `nread` is the number of read bytes (which is at most the buffer length), |
| // or, if negative, a libuv error code. |
| virtual void OnStreamRead(ssize_t nread, |
| const uv_buf_t& buf) = 0; |
| |
| // This is called once a write has finished. `status` may be 0 or, |
| // if negative, a libuv error code. |
| // By default, this is simply passed on to the previous listener |
| // (and raises an assertion if there is none). |
| virtual void OnStreamAfterWrite(WriteWrap* w, int status); |
| |
| // This is called once a shutdown has finished. `status` may be 0 or, |
| // if negative, a libuv error code. |
| // By default, this is simply passed on to the previous listener |
| // (and raises an assertion if there is none). |
| virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status); |
| |
| // This is called by the stream if it determines that it wants more data |
| // to be written to it. Not all streams support this. |
| // This callback will not be called as long as there are active writes. |
| // It is not supported by all streams; `stream->HasWantsWrite()` returns |
| // true if it is supported by a stream. |
| virtual void OnStreamWantsWrite(size_t suggested_size) {} |
| |
| // This is called immediately before the stream is destroyed. |
| virtual void OnStreamDestroy() {} |
| |
| // The stream this is currently associated with, or nullptr if there is none. |
| StreamResource* stream() const { return stream_; } |
| |
| protected: |
| // Pass along a read error to the `StreamListener` instance that was active |
| // before this one. For example, a protocol parser does not care about read |
| // errors and may instead want to let the original handler |
| // (e.g. the JS handler) take care of the situation. |
| inline void PassReadErrorToPreviousListener(ssize_t nread); |
| |
| StreamResource* stream_ = nullptr; |
| StreamListener* previous_listener_ = nullptr; |
| |
| friend class StreamResource; |
| }; |
| |
| |
| // An (incomplete) stream listener class that calls the `.oncomplete()` |
| // method of the JS objects associated with the wrap objects. |
| class ReportWritesToJSStreamListener : public StreamListener { |
| public: |
| void OnStreamAfterWrite(WriteWrap* w, int status) override; |
| void OnStreamAfterShutdown(ShutdownWrap* w, int status) override; |
| |
| private: |
| void OnStreamAfterReqFinished(StreamReq* req_wrap, int status); |
| }; |
| |
| |
| // A default emitter that just pushes data chunks as Buffer instances to |
| // JS land via the handle’s .ondata method. |
| class EmitToJSStreamListener : public ReportWritesToJSStreamListener { |
| public: |
| uv_buf_t OnStreamAlloc(size_t suggested_size) override; |
| void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; |
| }; |
| |
| |
| // An alternative listener that uses a custom, user-provided buffer |
| // for reading data. |
| class CustomBufferJSListener : public ReportWritesToJSStreamListener { |
| public: |
| uv_buf_t OnStreamAlloc(size_t suggested_size) override; |
| void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; |
| void OnStreamDestroy() override { delete this; } |
| |
| explicit CustomBufferJSListener(uv_buf_t buffer) : buffer_(buffer) {} |
| |
| private: |
| uv_buf_t buffer_; |
| }; |
| |
| |
| // A generic stream, comparable to JS land’s `Duplex` streams. |
| // A stream is always controlled through one `StreamListener` instance. |
| class StreamResource { |
| public: |
| virtual ~StreamResource(); |
| |
| // These need to be implemented on the readable side of this stream: |
| |
| // Start reading from the underlying resource. This is called by the consumer |
| // when more data is desired. Use `EmitAlloc()` and `EmitData()` to |
| // pass data along to the consumer. |
| virtual int ReadStart() = 0; |
| // Stop reading from the underlying resource. This is called by the |
| // consumer when its buffers are full and no more data can be handled. |
| virtual int ReadStop() = 0; |
| |
| // These need to be implemented on the writable side of this stream: |
| // All of these methods may return an error code synchronously. |
| // In that case, the finish callback should *not* be called. |
| |
| // Perform a shutdown operation, and either call req_wrap->Done() when |
| // finished and return 0, return 1 for synchronous success, or |
| // a libuv error code for synchronous failures. |
| virtual int DoShutdown(ShutdownWrap* req_wrap) = 0; |
| // Try to write as much data as possible synchronously, and modify |
| // `*bufs` and `*count` accordingly. This is a no-op by default. |
| // Return 0 for success and a libuv error code for failures. |
| virtual int DoTryWrite(uv_buf_t** bufs, size_t* count); |
| // Initiate a write of data. If the write completes synchronously, return 0 on |
| // success (with bufs modified to indicate how much data was consumed) or a |
| // libuv error code on failure. If the write will complete asynchronously, |
| // return 0. When the write completes asynchronously, call req_wrap->Done() |
| // with 0 on success (with bufs modified to indicate how much data was |
| // consumed) or a libuv error code on failure. Do not call req_wrap->Done() if |
| // the write completes synchronously, that is, it should never be called |
| // before DoWrite() has returned. |
| virtual int DoWrite(WriteWrap* w, |
| uv_buf_t* bufs, |
| size_t count, |
| uv_stream_t* send_handle) = 0; |
| |
| // Returns true if the stream supports the `OnStreamWantsWrite()` interface. |
| virtual bool HasWantsWrite() const { return false; } |
| |
| // Optionally, this may provide an error message to be used for |
| // failing writes. |
| virtual const char* Error() const; |
| // Clear the current error (i.e. that would be returned by Error()). |
| virtual void ClearError(); |
| |
| // Transfer ownership of this stream to `listener`. The previous listener |
| // will not receive any more callbacks while the new listener was active. |
| inline void PushStreamListener(StreamListener* listener); |
| // Remove a listener, and, if this was the currently active one, |
| // transfer ownership back to the previous listener. |
| inline void RemoveStreamListener(StreamListener* listener); |
| |
| protected: |
| // Call the current listener's OnStreamAlloc() method. |
| inline uv_buf_t EmitAlloc(size_t suggested_size); |
| // Call the current listener's OnStreamRead() method and update the |
| // stream's read byte counter. |
| inline void EmitRead( |
| ssize_t nread, |
| const uv_buf_t& buf = uv_buf_init(nullptr, 0)); |
| // Call the current listener's OnStreamAfterWrite() method. |
| inline void EmitAfterWrite(WriteWrap* w, int status); |
| // Call the current listener's OnStreamAfterShutdown() method. |
| inline void EmitAfterShutdown(ShutdownWrap* w, int status); |
| // Call the current listener's OnStreamWantsWrite() method. |
| inline void EmitWantsWrite(size_t suggested_size); |
| |
| StreamListener* listener_ = nullptr; |
| uint64_t bytes_read_ = 0; |
| uint64_t bytes_written_ = 0; |
| |
| friend class StreamListener; |
| }; |
| |
| |
| class StreamBase : public StreamResource { |
| public: |
| // The kSlot field here mirrors that of BaseObject::InternalFields::kSlot |
| // because instances deriving from StreamBase generally also derived from |
| // BaseObject (it's possible for it not to, however). |
| enum InternalFields { |
| kSlot = BaseObject::kSlot, |
| kStreamBaseField = BaseObject::kInternalFieldCount, |
| kOnReadFunctionField, |
| kInternalFieldCount |
| }; |
| |
| static void AddMethods(Environment* env, |
| v8::Local<v8::FunctionTemplate> target); |
| |
| virtual bool IsAlive() = 0; |
| virtual bool IsClosing() = 0; |
| virtual bool IsIPCPipe(); |
| virtual int GetFD(); |
| |
| enum StreamBaseJSChecks { DONT_SKIP_NREAD_CHECKS, SKIP_NREAD_CHECKS }; |
| |
| v8::MaybeLocal<v8::Value> CallJSOnreadMethod( |
| ssize_t nread, |
| v8::Local<v8::ArrayBuffer> ab, |
| size_t offset = 0, |
| StreamBaseJSChecks checks = DONT_SKIP_NREAD_CHECKS); |
| |
| // This is named `stream_env` to avoid name clashes, because a lot of |
| // subclasses are also `BaseObject`s. |
| Environment* stream_env() const { return env_; } |
| |
| // Shut down the current stream. This request can use an existing |
| // ShutdownWrap object (that was created in JS), or a new one will be created. |
| // Returns 1 in case of a synchronous completion, 0 in case of asynchronous |
| // completion, and a libuv error case in case of synchronous failure. |
| inline int Shutdown( |
| v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>()); |
| |
| // Write data to the current stream. This request can use an existing |
| // WriteWrap object (that was created in JS), or a new one will be created. |
| // This will first try to write synchronously using `DoTryWrite()`, then |
| // asynchronously using `DoWrite()`. |
| // If the return value indicates a synchronous completion, no callback will |
| // be invoked. |
| inline StreamWriteResult Write( |
| uv_buf_t* bufs, |
| size_t count, |
| uv_stream_t* send_handle = nullptr, |
| v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>()); |
| |
| // These can be overridden by subclasses to get more specific wrap instances. |
| // For example, a subclass Foo could create a FooWriteWrap or FooShutdownWrap |
| // (inheriting from ShutdownWrap/WriteWrap) that has extra fields, like |
| // an associated libuv request. |
| virtual ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object); |
| virtual WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object); |
| |
| // One of these must be implemented |
| virtual AsyncWrap* GetAsyncWrap() = 0; |
| virtual v8::Local<v8::Object> GetObject(); |
| |
| static inline StreamBase* FromObject(v8::Local<v8::Object> obj); |
| |
| protected: |
| inline explicit StreamBase(Environment* env); |
| |
| // JS Methods |
| int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args); |
| int ReadStopJS(const v8::FunctionCallbackInfo<v8::Value>& args); |
| int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args); |
| int Writev(const v8::FunctionCallbackInfo<v8::Value>& args); |
| int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args); |
| template <enum encoding enc> |
| int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args); |
| int UseUserBuffer(const v8::FunctionCallbackInfo<v8::Value>& args); |
| |
| static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args); |
| static void GetExternal(const v8::FunctionCallbackInfo<v8::Value>& args); |
| static void GetBytesRead(const v8::FunctionCallbackInfo<v8::Value>& args); |
| static void GetBytesWritten(const v8::FunctionCallbackInfo<v8::Value>& args); |
| inline void AttachToObject(v8::Local<v8::Object> obj); |
| |
| template <int (StreamBase::*Method)( |
| const v8::FunctionCallbackInfo<v8::Value>& args)> |
| static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args); |
| |
| // Internal, used only in StreamBase methods + env.cc. |
| enum StreamBaseStateFields { |
| kReadBytesOrError, |
| kArrayBufferOffset, |
| kBytesWritten, |
| kLastWriteWasAsync, |
| kNumStreamBaseStateFields |
| }; |
| |
| private: |
| Environment* env_; |
| EmitToJSStreamListener default_listener_; |
| |
| void SetWriteResult(const StreamWriteResult& res); |
| static void AddMethod(Environment* env, |
| v8::Local<v8::Signature> sig, |
| enum v8::PropertyAttribute attributes, |
| v8::Local<v8::FunctionTemplate> t, |
| JSMethodFunction* stream_method, |
| v8::Local<v8::String> str); |
| |
| friend class WriteWrap; |
| friend class ShutdownWrap; |
| friend class Environment; // For kNumStreamBaseStateFields. |
| }; |
| |
| |
| // These are helpers for creating `ShutdownWrap`/`WriteWrap` instances. |
| // `OtherBase` must have a constructor that matches the `AsyncWrap` |
| // constructors’s (Environment*, Local<Object>, AsyncWrap::Provider) signature |
| // and be a subclass of `AsyncWrap`. |
| template <typename OtherBase> |
| class SimpleShutdownWrap : public ShutdownWrap, public OtherBase { |
| public: |
| SimpleShutdownWrap(StreamBase* stream, |
| v8::Local<v8::Object> req_wrap_obj); |
| |
| AsyncWrap* GetAsyncWrap() override { return this; } |
| |
| SET_NO_MEMORY_INFO() |
| SET_MEMORY_INFO_NAME(SimpleShutdownWrap) |
| SET_SELF_SIZE(SimpleShutdownWrap) |
| }; |
| |
| template <typename OtherBase> |
| class SimpleWriteWrap : public WriteWrap, public OtherBase { |
| public: |
| SimpleWriteWrap(StreamBase* stream, |
| v8::Local<v8::Object> req_wrap_obj); |
| |
| AsyncWrap* GetAsyncWrap() override { return this; } |
| |
| SET_NO_MEMORY_INFO() |
| SET_MEMORY_INFO_NAME(SimpleWriteWrap) |
| SET_SELF_SIZE(SimpleWriteWrap) |
| }; |
| |
| } // namespace node |
| |
| #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS |
| |
| #endif // SRC_STREAM_BASE_H_ |