| // Copyright 2020 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include "connections/implementation/base_endpoint_channel.h" |
| |
| #include <functional> |
| #include <string> |
| #include <utility> |
| |
| #include "securegcm/d2d_connection_context_v1.h" |
| #include "securegcm/ukey2_handshake.h" |
| #include "gmock/gmock.h" |
| #include "protobuf-matchers/protocol-buffer-matchers.h" |
| #include "gtest/gtest.h" |
| #include "absl/synchronization/mutex.h" |
| #include "absl/time/time.h" |
| #include "connections/implementation/encryption_runner.h" |
| #include "connections/implementation/offline_frames.h" |
| #include "internal/platform/byte_array.h" |
| #include "internal/platform/exception.h" |
| #include "internal/platform/input_stream.h" |
| #include "internal/platform/output_stream.h" |
| #include "internal/platform/count_down_latch.h" |
| #include "internal/platform/logging.h" |
| #include "internal/platform/multi_thread_executor.h" |
| #include "internal/platform/pipe.h" |
| #include "internal/platform/single_thread_executor.h" |
| #include "proto/connections_enums.pb.h" |
| |
| namespace location { |
| namespace nearby { |
| namespace connections { |
| namespace { |
| |
| using ::location::nearby::proto::connections::DisconnectionReason; |
| using ::location::nearby::proto::connections::Medium; |
| using EncryptionContext = BaseEndpointChannel::EncryptionContext; |
| |
| class TestEndpointChannel : public BaseEndpointChannel { |
| public: |
| explicit TestEndpointChannel(InputStream* input, OutputStream* output) |
| : BaseEndpointChannel("channel", input, output) {} |
| |
| MOCK_METHOD(Medium, GetMedium, (), (const override)); |
| MOCK_METHOD(void, CloseImpl, (), (override)); |
| }; |
| |
| std::function<void()> MakeDataPump( |
| std::string label, InputStream* input, OutputStream* output, |
| std::function<void(const ByteArray&)> monitor = nullptr) { |
| return [label, input, output, monitor]() { |
| NEARBY_LOGS(INFO) << "streaming data through '" << label << "'"; |
| while (true) { |
| auto read_response = input->Read(Pipe::kChunkSize); |
| if (!read_response.ok()) { |
| NEARBY_LOGS(INFO) << "Peer reader closed on '" << label << "'"; |
| output->Close(); |
| break; |
| } |
| if (monitor) { |
| monitor(read_response.result()); |
| } |
| auto write_response = output->Write(read_response.result()); |
| if (write_response.Raised()) { |
| NEARBY_LOGS(INFO) << "Peer writer closed on '" << label << "'"; |
| input->Close(); |
| break; |
| } |
| } |
| NEARBY_LOGS(INFO) << "streaming terminated on '" << label << "'"; |
| }; |
| } |
| |
| std::function<void(const ByteArray&)> MakeDataMonitor(const std::string& label, |
| std::string* capture, |
| absl::Mutex* mutex) { |
| return [label, capture, mutex](const ByteArray& input) mutable { |
| std::string s = std::string(input); |
| { |
| absl::MutexLock lock(mutex); |
| *capture += s; |
| } |
| NEARBY_LOGS(INFO) << "source='" << label << "'" |
| << "; message='" << s << "'"; |
| }; |
| } |
| |
| std::pair<std::shared_ptr<EncryptionContext>, |
| std::shared_ptr<EncryptionContext>> |
| DoDhKeyExchange(BaseEndpointChannel* channel_a, |
| BaseEndpointChannel* channel_b) { |
| std::shared_ptr<EncryptionContext> context_a; |
| std::shared_ptr<EncryptionContext> context_b; |
| EncryptionRunner crypto_a; |
| EncryptionRunner crypto_b; |
| ClientProxy proxy_a; |
| ClientProxy proxy_b; |
| CountDownLatch latch(2); |
| crypto_a.StartClient( |
| &proxy_a, "endpoint_id", channel_a, |
| { |
| .on_success_cb = |
| [&latch, &context_a]( |
| const std::string& endpoint_id, |
| std::unique_ptr<securegcm::UKey2Handshake> ukey2, |
| const std::string& auth_token, |
| const ByteArray& raw_auth_token) { |
| NEARBY_LOGS(INFO) << "client-A side key negotiation done"; |
| EXPECT_TRUE(ukey2->VerifyHandshake()); |
| auto context = ukey2->ToConnectionContext(); |
| EXPECT_NE(context, nullptr); |
| context_a = std::move(context); |
| latch.CountDown(); |
| }, |
| .on_failure_cb = |
| [&latch](const std::string& endpoint_id, |
| EndpointChannel* channel) { |
| NEARBY_LOGS(INFO) << "client-A side key negotiation failed"; |
| latch.CountDown(); |
| }, |
| }); |
| crypto_b.StartServer( |
| &proxy_b, "endpoint_id", channel_b, |
| { |
| .on_success_cb = |
| [&latch, &context_b]( |
| const std::string& endpoint_id, |
| std::unique_ptr<securegcm::UKey2Handshake> ukey2, |
| const std::string& auth_token, |
| const ByteArray& raw_auth_token) { |
| NEARBY_LOGS(INFO) << "client-B side key negotiation done"; |
| EXPECT_TRUE(ukey2->VerifyHandshake()); |
| auto context = ukey2->ToConnectionContext(); |
| EXPECT_NE(context, nullptr); |
| context_b = std::move(context); |
| latch.CountDown(); |
| }, |
| .on_failure_cb = |
| [&latch](const std::string& endpoint_id, |
| EndpointChannel* channel) { |
| NEARBY_LOGS(INFO) << "client-B side key negotiation failed"; |
| latch.CountDown(); |
| }, |
| }); |
| EXPECT_TRUE(latch.Await(absl::Milliseconds(5000)).result()); |
| return std::make_pair(std::move(context_a), std::move(context_b)); |
| } |
| |
| TEST(BaseEndpointChannelTest, ConstructorDestructorWorks) { |
| Pipe pipe; |
| InputStream& input_stream = pipe.GetInputStream(); |
| OutputStream& output_stream = pipe.GetOutputStream(); |
| |
| TestEndpointChannel test_channel(&input_stream, &output_stream); |
| } |
| |
| TEST(BaseEndpointChannelTest, ReadWrite) { |
| // Direct not-encrypted IO. |
| Pipe pipe_a; // channel_a writes to pipe_a, reads from pipe_b. |
| Pipe pipe_b; // channel_b writes to pipe_b, reads from pipe_a. |
| TestEndpointChannel channel_a(&pipe_b.GetInputStream(), |
| &pipe_a.GetOutputStream()); |
| TestEndpointChannel channel_b(&pipe_a.GetInputStream(), |
| &pipe_b.GetOutputStream()); |
| ByteArray tx_message{"data message"}; |
| channel_a.Write(tx_message); |
| ByteArray rx_message = std::move(channel_b.Read().result()); |
| EXPECT_EQ(rx_message, tx_message); |
| } |
| |
| TEST(BaseEndpointChannelTest, NotEncryptedReadWriteCanBeIntercepted) { |
| // Not encrypted IO; MITM scenario. |
| |
| // Setup test communication environment. |
| absl::Mutex mutex; |
| std::string capture_a; |
| std::string capture_b; |
| Pipe client_a; // Channel "a" writes to client "a", reads from server "a". |
| Pipe client_b; // Channel "b" writes to client "b", reads from server "b". |
| Pipe server_a; // Data pump "a" reads from client "a", writes to server "b". |
| Pipe server_b; // Data pump "b" reads from client "b", writes to server "a". |
| TestEndpointChannel channel_a(&server_a.GetInputStream(), |
| &client_a.GetOutputStream()); |
| TestEndpointChannel channel_b(&server_b.GetInputStream(), |
| &client_b.GetOutputStream()); |
| |
| ON_CALL(channel_a, GetMedium).WillByDefault([]() { return Medium::BLE; }); |
| ON_CALL(channel_b, GetMedium).WillByDefault([]() { return Medium::BLE; }); |
| |
| MultiThreadExecutor executor(2); |
| executor.Execute(MakeDataPump( |
| "pump_a", &client_a.GetInputStream(), &server_b.GetOutputStream(), |
| MakeDataMonitor("monitor_a", &capture_a, &mutex))); |
| executor.Execute(MakeDataPump( |
| "pump_b", &client_b.GetInputStream(), &server_a.GetOutputStream(), |
| MakeDataMonitor("monitor_b", &capture_b, &mutex))); |
| |
| EXPECT_EQ(channel_a.GetType(), "BLE"); |
| EXPECT_EQ(channel_b.GetType(), "BLE"); |
| |
| // Start data transfer |
| ByteArray tx_message{"data message"}; |
| channel_a.Write(tx_message); |
| ByteArray rx_message = std::move(channel_b.Read().result()); |
| |
| // Verify expectations. |
| EXPECT_EQ(rx_message, tx_message); |
| { |
| absl::MutexLock lock(&mutex); |
| std::string message{tx_message}; |
| EXPECT_TRUE(capture_a.find(message) != std::string::npos || |
| capture_b.find(message) != std::string::npos); |
| } |
| |
| // Shutdown test environment. |
| channel_a.Close(DisconnectionReason::LOCAL_DISCONNECTION); |
| channel_b.Close(DisconnectionReason::REMOTE_DISCONNECTION); |
| } |
| |
| TEST(BaseEndpointChannelTest, EncryptedReadWriteCanNotBeIntercepted) { |
| // Encrypted IO; MITM scenario. |
| |
| // Setup test communication environment. |
| absl::Mutex mutex; |
| std::string capture_a; |
| std::string capture_b; |
| Pipe client_a; // Channel "a" writes to client "a", reads from server "a". |
| Pipe client_b; // Channel "b" writes to client "b", reads from server "b". |
| Pipe server_a; // Data pump "a" reads from client "a", writes to server "b". |
| Pipe server_b; // Data pump "b" reads from client "b", writes to server "a". |
| TestEndpointChannel channel_a(&server_a.GetInputStream(), |
| &client_a.GetOutputStream()); |
| TestEndpointChannel channel_b(&server_b.GetInputStream(), |
| &client_b.GetOutputStream()); |
| |
| ON_CALL(channel_a, GetMedium).WillByDefault([]() { |
| return Medium::BLUETOOTH; |
| }); |
| ON_CALL(channel_b, GetMedium).WillByDefault([]() { |
| return Medium::BLUETOOTH; |
| }); |
| |
| MultiThreadExecutor executor(2); |
| executor.Execute(MakeDataPump( |
| "pump_a", &client_a.GetInputStream(), &server_b.GetOutputStream(), |
| MakeDataMonitor("monitor_a", &capture_a, &mutex))); |
| executor.Execute(MakeDataPump( |
| "pump_b", &client_b.GetInputStream(), &server_a.GetOutputStream(), |
| MakeDataMonitor("monitor_b", &capture_b, &mutex))); |
| |
| // Run DH key exchange; setup encryption contexts for channels. |
| auto [context_a, context_b] = DoDhKeyExchange(&channel_a, &channel_b); |
| ASSERT_NE(context_a, nullptr); |
| ASSERT_NE(context_b, nullptr); |
| channel_a.EnableEncryption(context_a); |
| channel_b.EnableEncryption(context_b); |
| |
| EXPECT_EQ(channel_a.GetType(), "ENCRYPTED_BLUETOOTH"); |
| EXPECT_EQ(channel_b.GetType(), "ENCRYPTED_BLUETOOTH"); |
| |
| // Start data transfer |
| ByteArray tx_message{"data message"}; |
| channel_a.Write(tx_message); |
| ByteArray rx_message = std::move(channel_b.Read().result()); |
| |
| // Verify expectations. |
| EXPECT_EQ(rx_message, tx_message); |
| { |
| absl::MutexLock lock(&mutex); |
| std::string message{tx_message}; |
| EXPECT_TRUE(capture_a.find(message) == std::string::npos && |
| capture_b.find(message) == std::string::npos); |
| } |
| |
| // Shutdown test environment. |
| channel_a.Close(DisconnectionReason::LOCAL_DISCONNECTION); |
| channel_b.Close(DisconnectionReason::REMOTE_DISCONNECTION); |
| } |
| |
| TEST(BaseEndpointChannelTest, CanBesuspendedAndResumed) { |
| // Setup test communication environment. |
| Pipe pipe_a; // channel_a writes to pipe_a, reads from pipe_b. |
| Pipe pipe_b; // channel_b writes to pipe_b, reads from pipe_a. |
| TestEndpointChannel channel_a(&pipe_b.GetInputStream(), |
| &pipe_a.GetOutputStream()); |
| TestEndpointChannel channel_b(&pipe_a.GetInputStream(), |
| &pipe_b.GetOutputStream()); |
| |
| ON_CALL(channel_a, GetMedium).WillByDefault([]() { |
| return Medium::WIFI_LAN; |
| }); |
| ON_CALL(channel_b, GetMedium).WillByDefault([]() { |
| return Medium::WIFI_LAN; |
| }); |
| |
| EXPECT_EQ(channel_a.GetType(), "WIFI_LAN"); |
| EXPECT_EQ(channel_b.GetType(), "WIFI_LAN"); |
| |
| // Start data transfer |
| ByteArray tx_message{"data message"}; |
| ByteArray more_message{"more data"}; |
| channel_a.Write(tx_message); |
| ByteArray rx_message = std::move(channel_b.Read().result()); |
| |
| // Pause and make sure reader blocks. |
| MultiThreadExecutor pause_resume_executor(2); |
| channel_a.Pause(); |
| pause_resume_executor.Execute([&channel_a, &more_message]() { |
| // Write will block until channel is resumed, or closed. |
| EXPECT_TRUE(channel_a.Write(more_message).Ok()); |
| }); |
| CountDownLatch latch(1); |
| ByteArray read_more; |
| pause_resume_executor.Execute([&channel_b, &read_more, &latch]() { |
| // Read will block until channel is resumed, or closed. |
| auto response = channel_b.Read(); |
| EXPECT_TRUE(response.ok()); |
| read_more = std::move(response.result()); |
| latch.CountDown(); |
| }); |
| absl::SleepFor(absl::Milliseconds(500)); |
| EXPECT_TRUE(read_more.Empty()); |
| |
| // Resume; verify that data transfer comepleted. |
| channel_a.Resume(); |
| EXPECT_TRUE(latch.Await(absl::Milliseconds(1000)).result()); |
| EXPECT_EQ(read_more, more_message); |
| |
| // Shutdown test environment. |
| channel_a.Close(DisconnectionReason::LOCAL_DISCONNECTION); |
| channel_b.Close(DisconnectionReason::REMOTE_DISCONNECTION); |
| } |
| |
| TEST(BaseEndpointChannelTest, ReadAfterInputStreamClosed) { |
| Pipe pipe; |
| InputStream& input_stream = pipe.GetInputStream(); |
| OutputStream& output_stream = pipe.GetOutputStream(); |
| |
| TestEndpointChannel test_channel(&input_stream, &output_stream); |
| |
| // Close the output stream before trying to read from the input. |
| output_stream.Close(); |
| |
| // Trying to read should fail gracefully with an IO error. |
| ExceptionOr<ByteArray> read_data = test_channel.Read(); |
| |
| ASSERT_FALSE(read_data.ok()); |
| ASSERT_TRUE(read_data.GetException().Raised(Exception::kIo)); |
| } |
| |
| TEST(BaseEndpointChannelTest, ReadUnencryptedFrameOnEncryptedChannel) { |
| // Setup test communication environment. |
| Pipe pipe_a; // channel_a writes to pipe_a, reads from pipe_b. |
| Pipe pipe_b; // channel_b writes to pipe_b, reads from pipe_a. |
| TestEndpointChannel channel_a(&pipe_b.GetInputStream(), |
| &pipe_a.GetOutputStream()); |
| TestEndpointChannel channel_b(&pipe_a.GetInputStream(), |
| &pipe_b.GetOutputStream()); |
| |
| ON_CALL(channel_a, GetMedium).WillByDefault([]() { |
| return Medium::BLUETOOTH; |
| }); |
| ON_CALL(channel_b, GetMedium).WillByDefault([]() { |
| return Medium::BLUETOOTH; |
| }); |
| |
| // Run DH key exchange; setup encryption contexts for channels. But only |
| // encrypt |channel_b|. |
| auto [context_a, context_b] = DoDhKeyExchange(&channel_a, &channel_b); |
| ASSERT_NE(context_a, nullptr); |
| ASSERT_NE(context_b, nullptr); |
| channel_b.EnableEncryption(context_b); |
| |
| EXPECT_EQ(channel_a.GetType(), "BLUETOOTH"); |
| EXPECT_EQ(channel_b.GetType(), "ENCRYPTED_BLUETOOTH"); |
| |
| // An unencrypted KeepAlive should succeed. |
| ByteArray keep_alive_message = parser::ForKeepAlive(); |
| channel_a.Write(keep_alive_message); |
| ExceptionOr<ByteArray> result = channel_b.Read(); |
| EXPECT_TRUE(result.ok()); |
| EXPECT_EQ(result.result(), keep_alive_message); |
| |
| // An unencrypted data frame should fail. |
| ByteArray tx_message{"data message"}; |
| channel_a.Write(tx_message); |
| result = channel_b.Read(); |
| EXPECT_FALSE(result.ok()); |
| EXPECT_EQ(result.exception(), Exception::kInvalidProtocolBuffer); |
| |
| // Shutdown test environment. |
| channel_a.Close(DisconnectionReason::LOCAL_DISCONNECTION); |
| channel_b.Close(DisconnectionReason::REMOTE_DISCONNECTION); |
| } |
| |
| } // namespace |
| } // namespace connections |
| } // namespace nearby |
| } // namespace location |