blob: c7b6c3fb5f942ef03404d2b65f760aff6d51fa5d [file] [log] [blame]
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "connections/implementation/payload_manager.h"
#include "gmock/gmock.h"
#include "protobuf-matchers/protocol-buffer-matchers.h"
#include "gtest/gtest.h"
#include "absl/strings/string_view.h"
#include "connections/implementation/simulation_user.h"
#include "internal/platform/byte_array.h"
#include "internal/platform/pipe.h"
#include "internal/platform/system_clock.h"
namespace location {
namespace nearby {
namespace connections {
namespace {
constexpr absl::string_view kServiceId = "service-id";
constexpr absl::string_view kDeviceA = "device-a";
constexpr absl::string_view kDeviceB = "device-b";
constexpr absl::string_view kMessage = "message";
constexpr absl::Duration kProgressTimeout = absl::Milliseconds(1000);
constexpr absl::Duration kDefaultTimeout = absl::Milliseconds(1000);
constexpr BooleanMediumSelector kTestCases[] = {
BooleanMediumSelector{
.bluetooth = true,
},
BooleanMediumSelector{
.wifi_lan = true,
},
BooleanMediumSelector{
.bluetooth = true,
.wifi_lan = true,
},
};
class PayloadSimulationUser : public SimulationUser {
public:
explicit PayloadSimulationUser(
absl::string_view name,
BooleanMediumSelector allowed = BooleanMediumSelector())
: SimulationUser(std::string(name), allowed) {}
~PayloadSimulationUser() override {
NEARBY_LOGS(INFO) << "PayloadSimulationUser: [down] name=" << info_.data();
// SystemClock::Sleep(kDefaultTimeout);
}
Payload& GetPayload() { return payload_; }
void SendPayload(Payload payload) {
sender_payload_id_ = payload.GetId();
pm_.SendPayload(&client_, {discovered_.endpoint_id}, std::move(payload));
}
Status CancelPayload() {
if (sender_payload_id_) {
return pm_.CancelPayload(&client_, sender_payload_id_);
} else {
return pm_.CancelPayload(&client_, payload_.GetId());
}
}
bool IsConnected() const {
return client_.IsConnectedToEndpoint(discovered_.endpoint_id);
}
protected:
Payload::Id sender_payload_id_ = 0;
};
class PayloadManagerTest
: public ::testing::TestWithParam<BooleanMediumSelector> {
protected:
PayloadManagerTest() { env_.Stop(); }
bool SetupConnection(PayloadSimulationUser& user_a,
PayloadSimulationUser& user_b) {
user_a.StartAdvertising(std::string(kServiceId), &connection_latch_);
user_b.StartDiscovery(std::string(kServiceId), &discovery_latch_);
EXPECT_TRUE(discovery_latch_.Await(kDefaultTimeout).result());
EXPECT_EQ(user_b.GetDiscovered().service_id, kServiceId);
EXPECT_EQ(user_b.GetDiscovered().endpoint_info, user_a.GetInfo());
EXPECT_FALSE(user_b.GetDiscovered().endpoint_id.empty());
NEARBY_LOG(INFO, "EP-B: [discovered] %s",
user_b.GetDiscovered().endpoint_id.c_str());
user_b.RequestConnection(&connection_latch_);
EXPECT_TRUE(connection_latch_.Await(kDefaultTimeout).result());
EXPECT_FALSE(user_a.GetDiscovered().endpoint_id.empty());
NEARBY_LOG(INFO, "EP-A: [discovered] %s",
user_a.GetDiscovered().endpoint_id.c_str());
NEARBY_LOG(INFO, "Both users discovered their peers.");
user_a.AcceptConnection(&accept_latch_);
user_b.AcceptConnection(&accept_latch_);
EXPECT_TRUE(accept_latch_.Await(kDefaultTimeout).result());
NEARBY_LOG(INFO, "Both users reached connected state.");
return user_a.IsConnected() && user_b.IsConnected();
}
CountDownLatch discovery_latch_{1};
CountDownLatch connection_latch_{2};
CountDownLatch accept_latch_{2};
CountDownLatch payload_latch_{1};
MediumEnvironment& env_{MediumEnvironment::Instance()};
};
TEST_P(PayloadManagerTest, CanCreateOne) {
env_.Start();
PayloadSimulationUser user_a(kDeviceA, GetParam());
env_.Stop();
}
TEST_P(PayloadManagerTest, CanCreateMultiple) {
env_.Start();
PayloadSimulationUser user_a(kDeviceA, GetParam());
PayloadSimulationUser user_b(kDeviceB, GetParam());
env_.Stop();
}
TEST_P(PayloadManagerTest, CanSendBytePayload) {
env_.Start();
PayloadSimulationUser user_a(kDeviceA, GetParam());
PayloadSimulationUser user_b(kDeviceB, GetParam());
ASSERT_TRUE(SetupConnection(user_a, user_b));
user_a.ExpectPayload(payload_latch_);
user_b.SendPayload(Payload(ByteArray{std::string(kMessage)}));
EXPECT_TRUE(payload_latch_.Await(kDefaultTimeout).result());
EXPECT_EQ(user_a.GetPayload().AsBytes(), ByteArray(std::string(kMessage)));
NEARBY_LOG(INFO, "Test completed.");
user_a.Stop();
user_b.Stop();
env_.Stop();
}
TEST_P(PayloadManagerTest, CanSendStreamPayload) {
env_.Start();
PayloadSimulationUser user_a(kDeviceA, GetParam());
PayloadSimulationUser user_b(kDeviceB, GetParam());
ASSERT_TRUE(SetupConnection(user_a, user_b));
auto pipe = std::make_shared<Pipe>();
OutputStream& tx = pipe->GetOutputStream();
user_a.ExpectPayload(payload_latch_);
const ByteArray message{std::string(kMessage)};
// The first write to the output stream will send the first PAYLOAD_TRANSFER
// packet with payload info and message data.
tx.Write(message);
user_b.SendPayload(Payload([pipe]() -> InputStream& {
return pipe->GetInputStream(); // NOLINT
}));
ASSERT_TRUE(payload_latch_.Await(kDefaultTimeout).result());
ASSERT_NE(user_a.GetPayload().AsStream(), nullptr);
InputStream& rx = *user_a.GetPayload().AsStream();
NEARBY_LOG(INFO, "Stream extracted.");
EXPECT_TRUE(user_a.WaitForProgress(
[&message](const PayloadProgressInfo& info) {
return info.bytes_transferred >= message.size();
},
kProgressTimeout));
ByteArray result = rx.Read(Pipe::kChunkSize).result();
EXPECT_EQ(result, message);
NEARBY_LOG(INFO, "Packet 1 handled.");
tx.Write(message);
EXPECT_TRUE(user_a.WaitForProgress(
[&message](const PayloadProgressInfo& info) {
return info.bytes_transferred >= 2 * message.size();
},
kProgressTimeout));
ByteArray result2 = rx.Read(Pipe::kChunkSize).result();
EXPECT_EQ(result2, message);
NEARBY_LOG(INFO, "Packet 2 handled.");
rx.Close();
tx.Close();
NEARBY_LOG(INFO, "Test completed.");
user_a.Stop();
user_b.Stop();
env_.Stop();
}
TEST_P(PayloadManagerTest, CanCancelPayloadOnReceiverSide) {
env_.Start();
PayloadSimulationUser user_a(kDeviceA, GetParam());
PayloadSimulationUser user_b(kDeviceB, GetParam());
ASSERT_TRUE(SetupConnection(user_a, user_b));
auto pipe = std::make_shared<Pipe>();
OutputStream& tx = pipe->GetOutputStream();
user_a.ExpectPayload(payload_latch_);
const ByteArray message{std::string(kMessage)};
tx.Write(message);
user_b.SendPayload(Payload([pipe]() -> InputStream& {
return pipe->GetInputStream(); // NOLINT
}));
ASSERT_TRUE(payload_latch_.Await(kDefaultTimeout).result());
ASSERT_NE(user_a.GetPayload().AsStream(), nullptr);
InputStream& rx = *user_a.GetPayload().AsStream();
NEARBY_LOG(INFO, "Stream extracted.");
EXPECT_TRUE(user_a.WaitForProgress(
[&message](const PayloadProgressInfo& info) {
return info.bytes_transferred >= message.size();
},
kProgressTimeout));
ByteArray result = rx.Read(Pipe::kChunkSize).result();
EXPECT_EQ(result, message);
NEARBY_LOG(INFO, "Packet 1 handled.");
EXPECT_EQ(user_a.CancelPayload(), Status{Status::kSuccess});
NEARBY_LOG(INFO, "Stream canceled on receiver side.");
// Sender will only handle cancel event if it is sending.
// Once cancel is handled, write will fail.
int count = 0;
while (true) {
if (!tx.Write(message).Ok()) break;
SystemClock::Sleep(kDefaultTimeout);
count++;
}
ASSERT_LE(count, 10);
EXPECT_TRUE(user_a.WaitForProgress(
[status = PayloadProgressInfo::Status::kCanceled](
const PayloadProgressInfo& info) { return info.status == status; },
kProgressTimeout));
NEARBY_LOG(INFO, "Stream cancelation received.");
tx.Close();
rx.Close();
NEARBY_LOG(INFO, "Test completed.");
user_a.Stop();
user_b.Stop();
env_.Stop();
}
TEST_P(PayloadManagerTest, CanCancelPayloadOnSenderSide) {
env_.Start();
PayloadSimulationUser user_a(kDeviceA, GetParam());
PayloadSimulationUser user_b(kDeviceB, GetParam());
ASSERT_TRUE(SetupConnection(user_a, user_b));
auto pipe = std::make_shared<Pipe>();
OutputStream& tx = pipe->GetOutputStream();
user_a.ExpectPayload(payload_latch_);
const ByteArray message{std::string(kMessage)};
tx.Write(message);
user_b.SendPayload(Payload([pipe]() -> InputStream& {
return pipe->GetInputStream(); // NOLINT
}));
ASSERT_TRUE(payload_latch_.Await(kDefaultTimeout).result());
ASSERT_NE(user_a.GetPayload().AsStream(), nullptr);
InputStream& rx = *user_a.GetPayload().AsStream();
NEARBY_LOG(INFO, "Stream extracted.");
EXPECT_TRUE(user_a.WaitForProgress(
[&message](const PayloadProgressInfo& info) {
return info.bytes_transferred >= message.size();
},
kProgressTimeout));
ByteArray result = rx.Read(Pipe::kChunkSize).result();
EXPECT_EQ(result, message);
NEARBY_LOG(INFO, "Packet 1 handled.");
EXPECT_EQ(user_b.CancelPayload(), Status{Status::kSuccess});
NEARBY_LOG(INFO, "Stream canceled on sender side.");
// Sender will only handle cancel event if it is sending.
// Once cancel is handled, write will fail.
int count = 0;
while (true) {
if (!tx.Write(message).Ok()) break;
SystemClock::Sleep(kDefaultTimeout);
count++;
}
ASSERT_LE(count, 10);
EXPECT_TRUE(user_a.WaitForProgress(
[status = PayloadProgressInfo::Status::kCanceled](
const PayloadProgressInfo& info) { return info.status == status; },
kProgressTimeout));
NEARBY_LOG(INFO, "Stream cancelation received.");
tx.Close();
rx.Close();
NEARBY_LOG(INFO, "Test completed.");
user_a.Stop();
user_b.Stop();
env_.Stop();
}
TEST_P(PayloadManagerTest, SendPayloadWithSkip_StreamPayload) {
constexpr size_t kOffset = 3;
env_.Start();
PayloadSimulationUser user_a(kDeviceA, GetParam());
PayloadSimulationUser user_b(kDeviceB, GetParam());
ASSERT_TRUE(SetupConnection(user_a, user_b));
auto pipe = std::make_shared<Pipe>();
OutputStream& tx = pipe->GetOutputStream();
user_a.ExpectPayload(payload_latch_);
const ByteArray message{std::string(kMessage)};
// The first write to the output stream will send the first PAYLOAD_TRANSFER
// packet with payload info and message data.
tx.Write(message);
Payload payload([pipe]() -> InputStream& {
return pipe->GetInputStream(); // NOLINT
});
payload.SetOffset(kOffset);
user_b.SendPayload(std::move(payload));
ASSERT_TRUE(payload_latch_.Await(kDefaultTimeout).result());
ASSERT_NE(user_a.GetPayload().AsStream(), nullptr);
InputStream& rx = *user_a.GetPayload().AsStream();
NEARBY_LOG(INFO, "Stream extracted.");
EXPECT_TRUE(user_a.WaitForProgress(
[&message](const PayloadProgressInfo& info) {
return info.bytes_transferred >= message.size() - kOffset;
},
kProgressTimeout));
ByteArray result = rx.Read(Pipe::kChunkSize).result();
EXPECT_EQ(result, ByteArray("sage"));
NEARBY_LOG(INFO, "Packet 1 handled.");
tx.Write(message);
EXPECT_TRUE(user_a.WaitForProgress(
[&message](const PayloadProgressInfo& info) {
return info.bytes_transferred >= 2 * message.size() - kOffset;
},
kProgressTimeout));
ByteArray result2 = rx.Read(Pipe::kChunkSize).result();
EXPECT_EQ(result2, message);
NEARBY_LOG(INFO, "Packet 2 handled.");
rx.Close();
tx.Close();
NEARBY_LOG(INFO, "Test completed.");
user_a.Stop();
user_b.Stop();
env_.Stop();
}
INSTANTIATE_TEST_SUITE_P(ParametrisedPayloadManagerTest, PayloadManagerTest,
::testing::ValuesIn(kTestCases));
} // namespace
} // namespace connections
} // namespace nearby
} // namespace location