[Cast Streaming] Don't subscribe to write events for UDP sockets

This patch updates the platform implementation to not subscribe
to write events for UDP sockets. It is part of a larger cleanup
to reduce the amount of time spent in loops in the platform
implementation.

Change-Id: Ib09474a1ee05629770acd6b2d7893276bb965e79
Reviewed-on: https://chromium-review.googlesource.com/c/openscreen/+/6539456
Reviewed-by: Muyao Xu <muyaoxu@google.com>
Commit-Queue: Jordan Bayles <jophba@chromium.org>
diff --git a/platform/impl/platform_client_posix.cc b/platform/impl/platform_client_posix.cc
index 831cf51..0b49213 100644
--- a/platform/impl/platform_client_posix.cc
+++ b/platform/impl/platform_client_posix.cc
@@ -8,10 +8,13 @@
 #include <utility>
 #include <vector>
 
+#include "platform/base/trivial_clock_traits.h"
 #include "platform/impl/udp_socket_reader_posix.h"
 
 namespace openscreen {
 
+using clock_operators::operator<<;
+
 // static
 PlatformClientPosix* PlatformClientPosix::instance_ = nullptr;
 
@@ -101,12 +104,16 @@
 }
 
 void PlatformClientPosix::RunNetworkLoopUntilStopped() {
+  auto last = Clock::now();
   while (networking_loop_running_.load()) {
     if (!waiter_created_.load()) {
       std::this_thread::sleep_for(networking_loop_timeout_);
       continue;
     }
     socket_handle_waiter()->ProcessHandles(networking_loop_timeout_);
+    auto now = Clock::now();
+    OSP_LOG_ERROR << __func__ << ": loop took " << (now - last);
+    last = now;
   }
 }
 
diff --git a/platform/impl/socket_handle_waiter.cc b/platform/impl/socket_handle_waiter.cc
index 9994379..c1426f1 100644
--- a/platform/impl/socket_handle_waiter.cc
+++ b/platform/impl/socket_handle_waiter.cc
@@ -16,10 +16,11 @@
     : now_function_(now_function) {}
 
 void SocketHandleWaiter::Subscribe(Subscriber* subscriber,
-                                   SocketHandleRef handle) {
+                                   SocketHandleRef handle,
+                                   uint32_t flags) {
   std::lock_guard<std::mutex> lock(mutex_);
   if (handle_mappings_.find(handle) == handle_mappings_.end()) {
-    handle_mappings_.emplace(handle, SocketSubscription{subscriber});
+    handle_mappings_.emplace(handle, SocketSubscription{subscriber, flags});
   }
 }
 
@@ -105,21 +106,21 @@
 
 Error SocketHandleWaiter::ProcessHandles(Clock::duration timeout) {
   Clock::time_point start_time = now_function_();
-  std::vector<SocketHandleRef> handles;
+  std::vector<ReadyHandle> handles;
   {
     std::lock_guard<std::mutex> lock(mutex_);
     handles_being_deleted_.clear();
     handle_deletion_block_.notify_all();
     handles.reserve(handle_mappings_.size());
     for (const auto& pair : handle_mappings_) {
-      handles.push_back(pair.first);
+      handles.push_back({.handle = pair.first, .flags = pair.second.flags});
     }
   }
 
   Clock::time_point current_time = now_function_();
   Clock::duration remaining_timeout = timeout - (current_time - start_time);
   ErrorOr<std::vector<ReadyHandle>> changed_handles =
-      AwaitSocketsReadable(handles, remaining_timeout);
+      AwaitSocketsReady(handles, remaining_timeout);
 
   std::vector<HandleWithSubscription> ready_handles;
   {
diff --git a/platform/impl/socket_handle_waiter.h b/platform/impl/socket_handle_waiter.h
index bbc0ae9..1cb3470 100644
--- a/platform/impl/socket_handle_waiter.h
+++ b/platform/impl/socket_handle_waiter.h
@@ -27,8 +27,8 @@
   using SocketHandleRef = std::reference_wrapper<const SocketHandle>;
 
   enum Flags {
-    kReadable = 1,
-    kWriteable = 2,
+    kReadable = 1 << 0,
+    kWriteable = 1 << 1,
   };
 
   class Subscriber {
@@ -46,7 +46,9 @@
   // Start notifying `subscriber` whenever `handle` has an event. May be called
   // multiple times, to be notified for multiple handles, but should not be
   // called multiple times for the same handle.
-  void Subscribe(Subscriber* subscriber, SocketHandleRef handle);
+  void Subscribe(Subscriber* subscriber,
+                 SocketHandleRef handle,
+                 uint32_t flags);
 
   // Stop receiving notifications for one of the handles currently subscribed
   // to.
@@ -77,13 +79,15 @@
   // Waits until data is available in one of the provided sockets or the
   // provided timeout has passed - whichever is first. If any sockets have data
   // available, they are returned.
-  virtual ErrorOr<std::vector<ReadyHandle>> AwaitSocketsReadable(
-      const std::vector<SocketHandleRef>& socket_fds,
+  virtual ErrorOr<std::vector<ReadyHandle>> AwaitSocketsReady(
+      const std::vector<ReadyHandle>& sockets,
       const Clock::duration& timeout) = 0;
 
  private:
   struct SocketSubscription {
     Subscriber* subscriber = nullptr;
+    // Subscribers are only informed of flags that they are interested in.
+    uint32_t flags = 0;
     Clock::time_point last_updated = Clock::time_point::min();
   };
 
diff --git a/platform/impl/socket_handle_waiter_posix.cc b/platform/impl/socket_handle_waiter_posix.cc
index 511c55c..2fca5d1 100644
--- a/platform/impl/socket_handle_waiter_posix.cc
+++ b/platform/impl/socket_handle_waiter_posix.cc
@@ -24,8 +24,8 @@
 SocketHandleWaiterPosix::~SocketHandleWaiterPosix() = default;
 
 ErrorOr<std::vector<SocketHandleWaiterPosix::ReadyHandle>>
-SocketHandleWaiterPosix::AwaitSocketsReadable(
-    const std::vector<SocketHandleRef>& socket_handles,
+SocketHandleWaiterPosix::AwaitSocketsReady(
+    const std::vector<SocketHandleWaiterPosix::ReadyHandle>& sockets,
     const Clock::duration& timeout) {
   int max_fd = -1;
   fd_set read_handles{};
@@ -33,10 +33,14 @@
 
   FD_ZERO(&read_handles);
   FD_ZERO(&write_handles);
-  for (const SocketHandle& handle : socket_handles) {
-    FD_SET(handle.fd, &read_handles);
-    FD_SET(handle.fd, &write_handles);
-    max_fd = std::max(max_fd, handle.fd);
+  for (const ReadyHandle& ready : sockets) {
+    if (ready.flags & Flags::kReadable) {
+      FD_SET(ready.handle.get().fd, &read_handles);
+    }
+    if (ready.flags & Flags::kWriteable) {
+      FD_SET(ready.handle.get().fd, &write_handles);
+    }
+    max_fd = std::max(max_fd, ready.handle.get().fd);
   }
   if (max_fd < 0) {
     return Error::Code::kIOFailure;
@@ -62,16 +66,16 @@
   }
 
   std::vector<ReadyHandle> changed_handles;
-  for (const SocketHandleRef& handle : socket_handles) {
+  for (const ReadyHandle& ready : sockets) {
     uint32_t flags = 0;
-    if (FD_ISSET(handle.get().fd, &read_handles)) {
+    if (FD_ISSET(ready.handle.get().fd, &read_handles)) {
       flags |= Flags::kReadable;
     }
-    if (FD_ISSET(handle.get().fd, &write_handles)) {
+    if (FD_ISSET(ready.handle.get().fd, &write_handles)) {
       flags |= Flags::kWriteable;
     }
     if (flags) {
-      changed_handles.push_back({handle, flags});
+      changed_handles.push_back({ready.handle, flags});
     }
   }
 
diff --git a/platform/impl/socket_handle_waiter_posix.h b/platform/impl/socket_handle_waiter_posix.h
index 148df5c..97d52d9 100644
--- a/platform/impl/socket_handle_waiter_posix.h
+++ b/platform/impl/socket_handle_waiter_posix.h
@@ -32,8 +32,8 @@
  protected:
   using SocketHandleWaiter::ReadyHandle;
 
-  ErrorOr<std::vector<ReadyHandle>> AwaitSocketsReadable(
-      const std::vector<SocketHandleRef>& socket_fds,
+  ErrorOr<std::vector<ReadyHandle>> AwaitSocketsReady(
+      const std::vector<ReadyHandle>& sockets,
       const Clock::duration& timeout) override;
 
  private:
diff --git a/platform/impl/socket_handle_waiter_posix_unittest.cc b/platform/impl/socket_handle_waiter_posix_unittest.cc
index e17b45b..3e4e3ad 100644
--- a/platform/impl/socket_handle_waiter_posix_unittest.cc
+++ b/platform/impl/socket_handle_waiter_posix_unittest.cc
@@ -37,8 +37,8 @@
   TestingSocketHandleWaiter() : SocketHandleWaiter(&FakeClock::now) {}
 
   MOCK_METHOD2(
-      AwaitSocketsReadable,
-      ErrorOr<std::vector<ReadyHandle>>(const std::vector<SocketHandleRef>&,
+      AwaitSocketsReady,
+      ErrorOr<std::vector<ReadyHandle>>(const std::vector<ReadyHandle>&,
                                         const Clock::duration&));
 
   FakeClock fake_clock{Clock::time_point{Clock::duration{1234567}}};
@@ -46,7 +46,7 @@
 
 }  // namespace
 
-TEST(SocketHandleWaiterTest, BubblesUpAwaitSocketsReadableErrors) {
+TEST(SocketHandleWaiterTest, BubblesUpAwaitSocketsReadyErrors) {
   MockSubscriber subscriber;
   TestingSocketHandleWaiter waiter;
   SocketHandle handle0(0);
@@ -55,13 +55,15 @@
   const SocketHandle& handle0_ref = handle0;
   const SocketHandle& handle1_ref = handle1;
   const SocketHandle& handle2_ref = handle2;
+  constexpr uint32_t rw_flags = SocketHandleWaiter::Flags::kReadable |
+                                SocketHandleWaiter::Flags::kWriteable;
 
-  waiter.Subscribe(&subscriber, std::cref(handle0_ref));
-  waiter.Subscribe(&subscriber, std::cref(handle1_ref));
-  waiter.Subscribe(&subscriber, std::cref(handle2_ref));
+  waiter.Subscribe(&subscriber, std::cref(handle0_ref), rw_flags);
+  waiter.Subscribe(&subscriber, std::cref(handle1_ref), rw_flags);
+  waiter.Subscribe(&subscriber, std::cref(handle2_ref), rw_flags);
   Error::Code response = Error::Code::kAgain;
   EXPECT_CALL(subscriber, ProcessReadyHandle(_, _)).Times(0);
-  EXPECT_CALL(waiter, AwaitSocketsReadable(_, _))
+  EXPECT_CALL(waiter, AwaitSocketsReady(_, _))
       .WillOnce(Return(ByMove(response)));
   waiter.ProcessHandles(Clock::duration{0});
 }
@@ -79,14 +81,16 @@
   const SocketHandle& handle2_ref = handle2;
   const SocketHandle& handle3_ref = handle3;
 
-  waiter.Subscribe(&subscriber, std::cref(handle0_ref));
-  waiter.Subscribe(&subscriber, std::cref(handle2_ref));
-  waiter.Subscribe(&subscriber2, std::cref(handle1_ref));
-  waiter.Subscribe(&subscriber2, std::cref(handle3_ref));
   constexpr uint32_t r_flags = SocketHandleWaiter::Flags::kReadable;
   constexpr uint32_t w_flags = SocketHandleWaiter::Flags::kWriteable;
   constexpr uint32_t rw_flags = SocketHandleWaiter::Flags::kReadable |
                                 SocketHandleWaiter::Flags::kWriteable;
+
+  waiter.Subscribe(&subscriber, std::cref(handle0_ref), rw_flags);
+  waiter.Subscribe(&subscriber, std::cref(handle2_ref), rw_flags);
+  waiter.Subscribe(&subscriber2, std::cref(handle1_ref), rw_flags);
+  waiter.Subscribe(&subscriber2, std::cref(handle3_ref), rw_flags);
+
   EXPECT_CALL(subscriber, ProcessReadyHandle(std::cref(handle0_ref), r_flags))
       .Times(1);
   EXPECT_CALL(subscriber, ProcessReadyHandle(std::cref(handle2_ref), w_flags))
@@ -95,7 +99,7 @@
       .Times(1);
   EXPECT_CALL(subscriber2, ProcessReadyHandle(std::cref(handle3_ref), rw_flags))
       .Times(1);
-  EXPECT_CALL(waiter, AwaitSocketsReadable(_, _))
+  EXPECT_CALL(waiter, AwaitSocketsReady(_, _))
       .WillOnce(
           Return(ByMove(std::vector<TestingSocketHandleWaiter::ReadyHandle>{
               {std::cref(handle0_ref), r_flags},
diff --git a/platform/impl/tls_data_router_posix.cc b/platform/impl/tls_data_router_posix.cc
index 2d94025..a94d3cf 100644
--- a/platform/impl/tls_data_router_posix.cc
+++ b/platform/impl/tls_data_router_posix.cc
@@ -30,7 +30,10 @@
     connections_.push_back(connection);
   }
 
-  waiter_->Subscribe(this, connection->socket_handle());
+  // We care about both read and write events.
+  waiter_->Subscribe(this, connection->socket_handle(),
+                     SocketHandleWaiter::Flags::kReadable |
+                         SocketHandleWaiter::Flags::kWriteable);
 }
 
 void TlsDataRouterPosix::DeregisterConnection(TlsConnectionPosix* connection) {
@@ -60,7 +63,10 @@
     accept_socket_mappings_[socket_ptr] = observer;
   }
 
-  waiter_->Subscribe(this, socket_ptr->socket_handle());
+  // We care about both read and write events.
+  waiter_->Subscribe(this, socket_ptr->socket_handle(),
+                     SocketHandleWaiter::Flags::kReadable |
+                         SocketHandleWaiter::Flags::kWriteable);
 }
 
 void TlsDataRouterPosix::DeregisterAcceptObserver(SocketObserver* observer) {
diff --git a/platform/impl/tls_data_router_posix_unittest.cc b/platform/impl/tls_data_router_posix_unittest.cc
index c74b000..41605a8 100644
--- a/platform/impl/tls_data_router_posix_unittest.cc
+++ b/platform/impl/tls_data_router_posix_unittest.cc
@@ -25,8 +25,8 @@
   MockNetworkWaiter() : SocketHandleWaiter(&FakeClock::now) {}
 
   MOCK_METHOD2(
-      AwaitSocketsReadable,
-      ErrorOr<std::vector<ReadyHandle>>(const std::vector<SocketHandleRef>&,
+      AwaitSocketsReady,
+      ErrorOr<std::vector<ReadyHandle>>(const std::vector<ReadyHandle>&,
                                         const Clock::duration&));
 };
 
diff --git a/platform/impl/udp_socket_reader_posix.cc b/platform/impl/udp_socket_reader_posix.cc
index de0ad90..1048068 100644
--- a/platform/impl/udp_socket_reader_posix.cc
+++ b/platform/impl/udp_socket_reader_posix.cc
@@ -42,7 +42,9 @@
     std::lock_guard<std::mutex> lock(mutex_);
     sockets_.push_back(read_socket);
   }
-  waiter_.Subscribe(this, std::cref(read_socket->GetHandle()));
+  // We only care about read events.
+  waiter_.Subscribe(this, std::cref(read_socket->GetHandle()),
+                    SocketHandleWaiter::Flags::kReadable);
 }
 
 void UdpSocketReaderPosix::OnDestroy(UdpSocket* socket) {
diff --git a/platform/impl/udp_socket_reader_posix_unittest.cc b/platform/impl/udp_socket_reader_posix_unittest.cc
index 54c6488..6e3a60f 100644
--- a/platform/impl/udp_socket_reader_posix_unittest.cc
+++ b/platform/impl/udp_socket_reader_posix_unittest.cc
@@ -52,8 +52,8 @@
   ~MockNetworkWaiter() override = default;
 
   MOCK_METHOD2(
-      AwaitSocketsReadable,
-      ErrorOr<std::vector<ReadyHandle>>(const std::vector<SocketHandleRef>&,
+      AwaitSocketsReady,
+      ErrorOr<std::vector<ReadyHandle>>(const std::vector<ReadyHandle>&,
                                         const Clock::duration&));
 
   FakeClock fake_clock{Clock::time_point{Clock::duration{1234567}}};