| // Copyright 2020 The Chromium Authors |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "chrome/browser/nearby_sharing/instantmessaging/stream_parser.h" |
| |
#include <string_view>
#include <utility>

#include "base/compiler_specific.h"
#include "base/containers/span.h"
#include "base/logging.h"
#include "base/numerics/safe_conversions.h"
#include "net/base/io_buffer.h"
#include "third_party/protobuf/src/google/protobuf/io/coded_stream.h"
#include "third_party/protobuf/src/google/protobuf/wire_format_lite.h"
| |
| namespace { |
| |
| using ::google::protobuf::internal::WireFormatLite; |
| |
| // A buffer spare capacity limits the amount of times we need to resize the |
| // buffer when copying over data, which involves reallocating memory. |
| // We chose 512 because it is one of the larger standard sizes for |
| // a buffer, and we expect a lot of data to be received in the WebRTC |
| // signaling process. |
| constexpr int kReadBufferSpareCapacity = 512; |
| |
| // The minimum number of bytes to parse the messages or the noop field of |
| // the StreamBody proto is 2 because the size of the tag and wire type is a |
| // single byte, and the smallest size information would be contained in another |
| // single byte. |
| constexpr int kMinimumBytesToParseNextMessagesField = 2; |
| |
| } // namespace |
| |
// Both special members are defaulted. |unparsed_data_buffer_| is not set up
// here; Append() creates it on demand when it is not yet set.
StreamParser::StreamParser() = default;
StreamParser::~StreamParser() = default;
| |
| std::vector< |
| chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse> |
| StreamParser::Append(std::string_view data) { |
| if (!unparsed_data_buffer_) { |
| unparsed_data_buffer_ = base::MakeRefCounted<net::GrowableIOBuffer>(); |
| unparsed_data_buffer_->SetCapacity(data.size() + kReadBufferSpareCapacity); |
| } else if (unparsed_data_buffer_->RemainingCapacity() < |
| static_cast<int>(data.size())) { |
| unparsed_data_buffer_->SetCapacity(unparsed_data_buffer_->offset() + |
| data.size() + kReadBufferSpareCapacity); |
| } |
| |
| DCHECK_GE(unparsed_data_buffer_->RemainingCapacity(), |
| static_cast<int>(data.size())); |
| UNSAFE_TODO(memcpy(unparsed_data_buffer_->data(), data.data(), data.size())); |
| unparsed_data_buffer_->set_offset(unparsed_data_buffer_->offset() + |
| data.size()); |
| return ParseStreamIfAvailable(); |
| } |
| |
| std::vector< |
| chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse> |
| StreamParser::ParseStreamIfAvailable() { |
| DCHECK(unparsed_data_buffer_); |
| std::vector< |
| chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse> |
| receive_messages_responses; |
| |
| base::span<uint8_t> unparsed_bytes_available = |
| unparsed_data_buffer_->span_before_offset(); |
| if (unparsed_bytes_available.size() < kMinimumBytesToParseNextMessagesField) { |
| return receive_messages_responses; |
| } |
| |
| google::protobuf::io::CodedInputStream input_stream( |
| unparsed_bytes_available.data(), unparsed_bytes_available.size()); |
| size_t bytes_consumed = 0; |
| |
| // We can't use StreamBody::ParseFromString() here, as it can't do partial |
| // parsing, nor can it tell how many bytes are consumed. |
| bool continue_parsing = unparsed_bytes_available.size() > 0; |
| while (continue_parsing) { |
| chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse |
| parsed_response; |
| StreamParsingResult result = |
| ParseNextMessagesFieldFromStream(&input_stream, &parsed_response); |
| switch (result) { |
| case StreamParser::StreamParsingResult::kSuccessfullyParsedResponse: |
| receive_messages_responses.push_back(parsed_response); |
| [[fallthrough]]; |
| case StreamParser::StreamParsingResult::kNoop: |
| bytes_consumed = |
| base::checked_cast<size_t>(input_stream.CurrentPosition()); |
| continue_parsing = bytes_consumed < unparsed_bytes_available.size(); |
| break; |
| case StreamParser::StreamParsingResult::kNotEnoughDataYet: |
| case StreamParser::StreamParsingResult::kParsingUnexpectedlyFailed: |
| continue_parsing = false; |
| break; |
| } |
| } |
| |
| if (bytes_consumed == 0) |
| return receive_messages_responses; |
| |
| // Shift the unread data back to the beginning of the buffer for the next |
| // iteration of reading data. |
| base::span<uint8_t> bytes_not_consumed = |
| unparsed_bytes_available.subspan(bytes_consumed); |
| unparsed_bytes_available.copy_prefix_from(bytes_not_consumed); |
| unparsed_data_buffer_->set_offset(bytes_not_consumed.size()); |
| |
| return receive_messages_responses; |
| } |
| |
| StreamParser::StreamParsingResult |
| StreamParser::ParseNextMessagesFieldFromStream( |
| google::protobuf::io::CodedInputStream* input_stream, |
| chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse* |
| parsed_response) { |
| // The WireFormat nature of protos allows for key:value pairs, each which |
| // contains the value of one proto field. The key (also called tag) for each |
| // pair is actually two values: the field number and the wire type. |
| // |
| // A typical stream looks like: |
| // [message tag][field data][message tag][field data]... |
| // where the message tag consists of the field id and the WireType, like so: |
| // [field id + WireType][field data][field id + WireType][field data]... |
| // |
| // In this case, we are only looking at the two fields of the StreamBody: |
| // "messages", which is field 1, and "noop", which is field 15. The "messages" |
| // field is the one containing the ReceiveMessageResponse proto, and the |
| // "noop" field is sent by the Tachyon server to keep the connection alive. |
| // Both of these fields we expect to be the 'bytes' data type, which the |
| // wire type says it is a length delimited value. From this, we know that the |
| // next bytes on should be a length followed by the actual data bytes (which |
| // will be read by WireFormatLite::ReadBytes). Note: this is only true when |
| // the wire type is set to WIRETYPE_LENGTH_DELIMITED and we know the field |
| // type is bytes. |
| // |
| // Therefore, for this specific instance, we expect our stream to look like |
| // this when it contains the ReceiveMessagesResponse with an InboxMessage or |
| // a FastPathReady message: |
| // [field id="messages"|WIRETYPE_LENGTH_DELIMITED][bytes size][byte data] |
| // or it will look like this when we receive a noop message: |
| // [field id="noop"|WIRETYPE_LENGTH_DELIMITED][bytes size][byte data] |
| // See https://developers.google.com/protocol-buffers/docs/encoding for |
| // further explanation. |
| |
| // A message tag of zero means we don't have a valid tag or we don't have |
| // enough bytes to read a tag. If we cannot read the tag, we likely need to |
| // wait for more bytes to be appended to the input stream. |
| uint32_t messages_tag = input_stream->ReadTag(); |
| if (messages_tag == 0) |
| return StreamParser::StreamParsingResult::kNotEnoughDataYet; |
| |
| // If we were able to read the full tag above, and the field id does not |
| // match the StreamBody messages field body or the noop field body we were |
| // expecting then we are encountering a field we are not prepared to handle. |
| // TODO(crbug.com/1217150) Add a way to read through bytes of the unknown |
| // fields to skip it, in order to be more robost to StreamBody changes. |
| int field_number = WireFormatLite::GetTagFieldNumber(messages_tag); |
| if (field_number != chrome_browser_nearby_sharing_instantmessaging:: |
| StreamBody::kMessagesFieldNumber && |
| field_number != chrome_browser_nearby_sharing_instantmessaging:: |
| StreamBody::kNoopFieldNumber) { |
| return StreamParser::StreamParsingResult::kParsingUnexpectedlyFailed; |
| } |
| |
| // WireType specifies the format of the data to follow. Here, we are verifying |
| // the data we are receiving matching the data we are expecting, which is in |
| // the form of WIRETYPE_LENGTH_DELIMITED. We expect this to be |
| // WIRETYPE_LENGTH_DELIMITED because the proto defines "messages" and "noop" |
| // field as the "bytes" type. |
| if (WireFormatLite::GetTagWireType(messages_tag) != |
| WireFormatLite::WireType::WIRETYPE_LENGTH_DELIMITED) { |
| return StreamParser::StreamParsingResult::kParsingUnexpectedlyFailed; |
| } |
| |
| // Read the byte field, not including tags of the StreamBody, which will |
| // either be "StreamBody.messages" or "StreamBody.noop". If it is not |
| // successful, we likely need to wait for more bytes to be appended to the |
| // input stream to form a complete StreamBody. This function makes the |
| // assumption that we already know the field and read the tag to determine |
| // what field to read, which is why we need the checks above. |
| std::string stream_body_field_bytes; |
| if (!WireFormatLite::ReadBytes(input_stream, &stream_body_field_bytes)) |
| return StreamParser::StreamParsingResult::kNotEnoughDataYet; |
| |
| // Now that we have a complete "StreamBody.messages" or "StreamBody.noop" |
| // bytes field, we want to properly handle it. If we have a |
| // "StreamBody.messages", we want to transform the bytes into a |
| // ReceiveMessagesResponse and append it to the vector we are returning, then |
| // we can move along and read the next data from the buffer, if applicable. |
| // "StreamBody.noop" messages may be generated as a way to keep the connection |
| // to the server alive, and it is not an error. However, these messages do not |
| // contain a ReceiveMessagesResponse, but we still want to remove this data |
| // from the buffer and continue reading the next data, if applicable. We |
| // update the |is_noop_field_| to true to tell ParseStreamIfAvailable that |
| // although it receives an std::nullopt, it should still remove the bytes |
| // from the buffer. |
| if (field_number == chrome_browser_nearby_sharing_instantmessaging:: |
| StreamBody::kNoopFieldNumber) { |
| return StreamParser::StreamParsingResult::kNoop; |
| } |
| |
| if (!parsed_response->ParseFromString(stream_body_field_bytes)) { |
| LOG(ERROR) << "Failed to parse ReceiveMessagesResponse from stream body " |
| "message bytes."; |
| return StreamParser::StreamParsingResult::kParsingUnexpectedlyFailed; |
| } |
| |
| return StreamParser::StreamParsingResult::kSuccessfullyParsedResponse; |
| } |