[Cast Streaming] Don't subscribe to write events for UDP sockets
This patch updates the platform implementation to not subscribe
to write events for UDP sockets. It is part of a larger cleanup
to reduce the amount of time spent in loops in the platform
implementation.
Change-Id: Ib09474a1ee05629770acd6b2d7893276bb965e79
Reviewed-on: https://chromium-review.googlesource.com/c/openscreen/+/6539456
Reviewed-by: Muyao Xu <muyaoxu@google.com>
Commit-Queue: Jordan Bayles <jophba@chromium.org>
diff --git a/platform/impl/platform_client_posix.cc b/platform/impl/platform_client_posix.cc
index 831cf51..0b49213 100644
--- a/platform/impl/platform_client_posix.cc
+++ b/platform/impl/platform_client_posix.cc
@@ -8,10 +8,13 @@
#include <utility>
#include <vector>
+#include "platform/base/trivial_clock_traits.h"
#include "platform/impl/udp_socket_reader_posix.h"
namespace openscreen {
+using clock_operators::operator<<;
+
// static
PlatformClientPosix* PlatformClientPosix::instance_ = nullptr;
@@ -101,12 +104,16 @@
}
void PlatformClientPosix::RunNetworkLoopUntilStopped() {
+ auto last = Clock::now();
while (networking_loop_running_.load()) {
if (!waiter_created_.load()) {
std::this_thread::sleep_for(networking_loop_timeout_);
continue;
}
socket_handle_waiter()->ProcessHandles(networking_loop_timeout_);
+ auto now = Clock::now();
+ OSP_LOG_ERROR << __func__ << ": loop took " << (now - last);
+ last = now;
}
}
diff --git a/platform/impl/socket_handle_waiter.cc b/platform/impl/socket_handle_waiter.cc
index 9994379..c1426f1 100644
--- a/platform/impl/socket_handle_waiter.cc
+++ b/platform/impl/socket_handle_waiter.cc
@@ -16,10 +16,11 @@
: now_function_(now_function) {}
void SocketHandleWaiter::Subscribe(Subscriber* subscriber,
- SocketHandleRef handle) {
+ SocketHandleRef handle,
+ uint32_t flags) {
std::lock_guard<std::mutex> lock(mutex_);
if (handle_mappings_.find(handle) == handle_mappings_.end()) {
- handle_mappings_.emplace(handle, SocketSubscription{subscriber});
+ handle_mappings_.emplace(handle, SocketSubscription{subscriber, flags});
}
}
@@ -105,21 +106,21 @@
Error SocketHandleWaiter::ProcessHandles(Clock::duration timeout) {
Clock::time_point start_time = now_function_();
- std::vector<SocketHandleRef> handles;
+ std::vector<ReadyHandle> handles;
{
std::lock_guard<std::mutex> lock(mutex_);
handles_being_deleted_.clear();
handle_deletion_block_.notify_all();
handles.reserve(handle_mappings_.size());
for (const auto& pair : handle_mappings_) {
- handles.push_back(pair.first);
+ handles.push_back({.handle = pair.first, .flags = pair.second.flags});
}
}
Clock::time_point current_time = now_function_();
Clock::duration remaining_timeout = timeout - (current_time - start_time);
ErrorOr<std::vector<ReadyHandle>> changed_handles =
- AwaitSocketsReadable(handles, remaining_timeout);
+ AwaitSocketsReady(handles, remaining_timeout);
std::vector<HandleWithSubscription> ready_handles;
{
diff --git a/platform/impl/socket_handle_waiter.h b/platform/impl/socket_handle_waiter.h
index bbc0ae9..1cb3470 100644
--- a/platform/impl/socket_handle_waiter.h
+++ b/platform/impl/socket_handle_waiter.h
@@ -27,8 +27,8 @@
using SocketHandleRef = std::reference_wrapper<const SocketHandle>;
enum Flags {
- kReadable = 1,
- kWriteable = 2,
+ kReadable = 1 << 0,
+ kWriteable = 1 << 1,
};
class Subscriber {
@@ -46,7 +46,9 @@
// Start notifying `subscriber` whenever `handle` has an event. May be called
// multiple times, to be notified for multiple handles, but should not be
// called multiple times for the same handle.
- void Subscribe(Subscriber* subscriber, SocketHandleRef handle);
+ void Subscribe(Subscriber* subscriber,
+ SocketHandleRef handle,
+ uint32_t flags);
// Stop receiving notifications for one of the handles currently subscribed
// to.
@@ -77,13 +79,15 @@
// Waits until data is available in one of the provided sockets or the
// provided timeout has passed - whichever is first. If any sockets have data
// available, they are returned.
- virtual ErrorOr<std::vector<ReadyHandle>> AwaitSocketsReadable(
- const std::vector<SocketHandleRef>& socket_fds,
+ virtual ErrorOr<std::vector<ReadyHandle>> AwaitSocketsReady(
+ const std::vector<ReadyHandle>& sockets,
const Clock::duration& timeout) = 0;
private:
struct SocketSubscription {
Subscriber* subscriber = nullptr;
+ // Subscribers are only informed of flags that they are interested in.
+ uint32_t flags = 0;
Clock::time_point last_updated = Clock::time_point::min();
};
diff --git a/platform/impl/socket_handle_waiter_posix.cc b/platform/impl/socket_handle_waiter_posix.cc
index 511c55c..2fca5d1 100644
--- a/platform/impl/socket_handle_waiter_posix.cc
+++ b/platform/impl/socket_handle_waiter_posix.cc
@@ -24,8 +24,8 @@
SocketHandleWaiterPosix::~SocketHandleWaiterPosix() = default;
ErrorOr<std::vector<SocketHandleWaiterPosix::ReadyHandle>>
-SocketHandleWaiterPosix::AwaitSocketsReadable(
- const std::vector<SocketHandleRef>& socket_handles,
+SocketHandleWaiterPosix::AwaitSocketsReady(
+ const std::vector<SocketHandleWaiterPosix::ReadyHandle>& sockets,
const Clock::duration& timeout) {
int max_fd = -1;
fd_set read_handles{};
@@ -33,10 +33,14 @@
FD_ZERO(&read_handles);
FD_ZERO(&write_handles);
- for (const SocketHandle& handle : socket_handles) {
- FD_SET(handle.fd, &read_handles);
- FD_SET(handle.fd, &write_handles);
- max_fd = std::max(max_fd, handle.fd);
+ for (const ReadyHandle& ready : sockets) {
+ if (ready.flags & Flags::kReadable) {
+ FD_SET(ready.handle.get().fd, &read_handles);
+ }
+ if (ready.flags & Flags::kWriteable) {
+ FD_SET(ready.handle.get().fd, &write_handles);
+ }
+ max_fd = std::max(max_fd, ready.handle.get().fd);
}
if (max_fd < 0) {
return Error::Code::kIOFailure;
@@ -62,16 +66,16 @@
}
std::vector<ReadyHandle> changed_handles;
- for (const SocketHandleRef& handle : socket_handles) {
+ for (const ReadyHandle& ready : sockets) {
uint32_t flags = 0;
- if (FD_ISSET(handle.get().fd, &read_handles)) {
+ if (FD_ISSET(ready.handle.get().fd, &read_handles)) {
flags |= Flags::kReadable;
}
- if (FD_ISSET(handle.get().fd, &write_handles)) {
+ if (FD_ISSET(ready.handle.get().fd, &write_handles)) {
flags |= Flags::kWriteable;
}
if (flags) {
- changed_handles.push_back({handle, flags});
+ changed_handles.push_back({ready.handle, flags});
}
}
diff --git a/platform/impl/socket_handle_waiter_posix.h b/platform/impl/socket_handle_waiter_posix.h
index 148df5c..97d52d9 100644
--- a/platform/impl/socket_handle_waiter_posix.h
+++ b/platform/impl/socket_handle_waiter_posix.h
@@ -32,8 +32,8 @@
protected:
using SocketHandleWaiter::ReadyHandle;
- ErrorOr<std::vector<ReadyHandle>> AwaitSocketsReadable(
- const std::vector<SocketHandleRef>& socket_fds,
+ ErrorOr<std::vector<ReadyHandle>> AwaitSocketsReady(
+ const std::vector<ReadyHandle>& sockets,
const Clock::duration& timeout) override;
private:
diff --git a/platform/impl/socket_handle_waiter_posix_unittest.cc b/platform/impl/socket_handle_waiter_posix_unittest.cc
index e17b45b..3e4e3ad 100644
--- a/platform/impl/socket_handle_waiter_posix_unittest.cc
+++ b/platform/impl/socket_handle_waiter_posix_unittest.cc
@@ -37,8 +37,8 @@
TestingSocketHandleWaiter() : SocketHandleWaiter(&FakeClock::now) {}
MOCK_METHOD2(
- AwaitSocketsReadable,
- ErrorOr<std::vector<ReadyHandle>>(const std::vector<SocketHandleRef>&,
+ AwaitSocketsReady,
+ ErrorOr<std::vector<ReadyHandle>>(const std::vector<ReadyHandle>&,
const Clock::duration&));
FakeClock fake_clock{Clock::time_point{Clock::duration{1234567}}};
@@ -46,7 +46,7 @@
} // namespace
-TEST(SocketHandleWaiterTest, BubblesUpAwaitSocketsReadableErrors) {
+TEST(SocketHandleWaiterTest, BubblesUpAwaitSocketsReadyErrors) {
MockSubscriber subscriber;
TestingSocketHandleWaiter waiter;
SocketHandle handle0(0);
@@ -55,13 +55,15 @@
const SocketHandle& handle0_ref = handle0;
const SocketHandle& handle1_ref = handle1;
const SocketHandle& handle2_ref = handle2;
+ constexpr uint32_t rw_flags = SocketHandleWaiter::Flags::kReadable |
+ SocketHandleWaiter::Flags::kWriteable;
- waiter.Subscribe(&subscriber, std::cref(handle0_ref));
- waiter.Subscribe(&subscriber, std::cref(handle1_ref));
- waiter.Subscribe(&subscriber, std::cref(handle2_ref));
+ waiter.Subscribe(&subscriber, std::cref(handle0_ref), rw_flags);
+ waiter.Subscribe(&subscriber, std::cref(handle1_ref), rw_flags);
+ waiter.Subscribe(&subscriber, std::cref(handle2_ref), rw_flags);
Error::Code response = Error::Code::kAgain;
EXPECT_CALL(subscriber, ProcessReadyHandle(_, _)).Times(0);
- EXPECT_CALL(waiter, AwaitSocketsReadable(_, _))
+ EXPECT_CALL(waiter, AwaitSocketsReady(_, _))
.WillOnce(Return(ByMove(response)));
waiter.ProcessHandles(Clock::duration{0});
}
@@ -79,14 +81,16 @@
const SocketHandle& handle2_ref = handle2;
const SocketHandle& handle3_ref = handle3;
- waiter.Subscribe(&subscriber, std::cref(handle0_ref));
- waiter.Subscribe(&subscriber, std::cref(handle2_ref));
- waiter.Subscribe(&subscriber2, std::cref(handle1_ref));
- waiter.Subscribe(&subscriber2, std::cref(handle3_ref));
constexpr uint32_t r_flags = SocketHandleWaiter::Flags::kReadable;
constexpr uint32_t w_flags = SocketHandleWaiter::Flags::kWriteable;
constexpr uint32_t rw_flags = SocketHandleWaiter::Flags::kReadable |
SocketHandleWaiter::Flags::kWriteable;
+
+ waiter.Subscribe(&subscriber, std::cref(handle0_ref), rw_flags);
+ waiter.Subscribe(&subscriber, std::cref(handle2_ref), rw_flags);
+ waiter.Subscribe(&subscriber2, std::cref(handle1_ref), rw_flags);
+ waiter.Subscribe(&subscriber2, std::cref(handle3_ref), rw_flags);
+
EXPECT_CALL(subscriber, ProcessReadyHandle(std::cref(handle0_ref), r_flags))
.Times(1);
EXPECT_CALL(subscriber, ProcessReadyHandle(std::cref(handle2_ref), w_flags))
@@ -95,7 +99,7 @@
.Times(1);
EXPECT_CALL(subscriber2, ProcessReadyHandle(std::cref(handle3_ref), rw_flags))
.Times(1);
- EXPECT_CALL(waiter, AwaitSocketsReadable(_, _))
+ EXPECT_CALL(waiter, AwaitSocketsReady(_, _))
.WillOnce(
Return(ByMove(std::vector<TestingSocketHandleWaiter::ReadyHandle>{
{std::cref(handle0_ref), r_flags},
diff --git a/platform/impl/tls_data_router_posix.cc b/platform/impl/tls_data_router_posix.cc
index 2d94025..a94d3cf 100644
--- a/platform/impl/tls_data_router_posix.cc
+++ b/platform/impl/tls_data_router_posix.cc
@@ -30,7 +30,10 @@
connections_.push_back(connection);
}
- waiter_->Subscribe(this, connection->socket_handle());
+ // We care about both read and write events.
+ waiter_->Subscribe(this, connection->socket_handle(),
+ SocketHandleWaiter::Flags::kReadable |
+ SocketHandleWaiter::Flags::kWriteable);
}
void TlsDataRouterPosix::DeregisterConnection(TlsConnectionPosix* connection) {
@@ -60,7 +63,10 @@
accept_socket_mappings_[socket_ptr] = observer;
}
- waiter_->Subscribe(this, socket_ptr->socket_handle());
+ // We care about both read and write events.
+ waiter_->Subscribe(this, socket_ptr->socket_handle(),
+ SocketHandleWaiter::Flags::kReadable |
+ SocketHandleWaiter::Flags::kWriteable);
}
void TlsDataRouterPosix::DeregisterAcceptObserver(SocketObserver* observer) {
diff --git a/platform/impl/tls_data_router_posix_unittest.cc b/platform/impl/tls_data_router_posix_unittest.cc
index c74b000..41605a8 100644
--- a/platform/impl/tls_data_router_posix_unittest.cc
+++ b/platform/impl/tls_data_router_posix_unittest.cc
@@ -25,8 +25,8 @@
MockNetworkWaiter() : SocketHandleWaiter(&FakeClock::now) {}
MOCK_METHOD2(
- AwaitSocketsReadable,
- ErrorOr<std::vector<ReadyHandle>>(const std::vector<SocketHandleRef>&,
+ AwaitSocketsReady,
+ ErrorOr<std::vector<ReadyHandle>>(const std::vector<ReadyHandle>&,
const Clock::duration&));
};
diff --git a/platform/impl/udp_socket_reader_posix.cc b/platform/impl/udp_socket_reader_posix.cc
index de0ad90..1048068 100644
--- a/platform/impl/udp_socket_reader_posix.cc
+++ b/platform/impl/udp_socket_reader_posix.cc
@@ -42,7 +42,9 @@
std::lock_guard<std::mutex> lock(mutex_);
sockets_.push_back(read_socket);
}
- waiter_.Subscribe(this, std::cref(read_socket->GetHandle()));
+ // We only care about read events.
+ waiter_.Subscribe(this, std::cref(read_socket->GetHandle()),
+ SocketHandleWaiter::Flags::kReadable);
}
void UdpSocketReaderPosix::OnDestroy(UdpSocket* socket) {
diff --git a/platform/impl/udp_socket_reader_posix_unittest.cc b/platform/impl/udp_socket_reader_posix_unittest.cc
index 54c6488..6e3a60f 100644
--- a/platform/impl/udp_socket_reader_posix_unittest.cc
+++ b/platform/impl/udp_socket_reader_posix_unittest.cc
@@ -52,8 +52,8 @@
~MockNetworkWaiter() override = default;
MOCK_METHOD2(
- AwaitSocketsReadable,
- ErrorOr<std::vector<ReadyHandle>>(const std::vector<SocketHandleRef>&,
+ AwaitSocketsReady,
+ ErrorOr<std::vector<ReadyHandle>>(const std::vector<ReadyHandle>&,
const Clock::duration&));
FakeClock fake_clock{Clock::time_point{Clock::duration{1234567}}};