| // Copyright 2018 The Chromium Authors |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "osp/impl/presentation/url_availability_requester.h" |
| |
| #include <algorithm> |
| #include <chrono> |
| #include <memory> |
| #include <utility> |
| #include <vector> |
| |
| #include "osp/impl/presentation/presentation_utils.h" |
| #include "osp/public/network_service_manager.h" |
| #include "platform/base/span.h" |
| #include "util/chrono_helpers.h" |
| #include "util/osp_logging.h" |
| |
| using std::chrono::seconds; |
| |
| namespace openscreen::osp { |
| namespace { |
| |
// Lifetime given to every URL availability watch: it is sent as the
// `watch_duration` of each availability request, added to the current time to
// form a watch's local deadline, and used as the upper bound on the delay
// until the next watch refresh.
constexpr Clock::duration kWatchDuration = seconds(20);
// Margin subtracted from a watch's deadline when deciding whether the watch
// must be re-requested, so that the refresh happens before the deadline.
constexpr Clock::duration kWatchRefreshPadding = seconds(2);
| |
// Reorders `urls` in place so that every URL that is absent from
// `membership_test` comes before every URL that is present in it. Returns an
// iterator to the first URL that is a member of `membership_test`, or
// urls->end() when no URL is a member. The relative order inside each group is
// not preserved (std::partition is not stable).
std::vector<std::string>::iterator PartitionUrlsBySetMembership(
    std::vector<std::string>* urls,
    const std::set<std::string>& membership_test) {
  const auto is_not_member = [&membership_test](const std::string& candidate) {
    return membership_test.count(candidate) == 0;
  };
  return std::partition(urls->begin(), urls->end(), is_not_member);
}
| |
// Moves every string in [first, last) into `target`. The source elements stay
// in the vector in a moved-from state; strings already present in `target`
// collapse because it is a set.
void MoveVectorSegment(std::vector<std::string>::iterator first,
                       std::vector<std::string>::iterator last,
                       std::set<std::string>* target) {
  std::for_each(first, last, [target](std::string& url) {
    target->emplace(std::move(url));
  });
}
| |
| uint64_t GetNextRequestId(const uint64_t instance_id) { |
| return NetworkServiceManager::Get() |
| ->GetProtocolConnectionClient() |
| ->GetInstanceRequestIds() |
| .GetNextRequestId(instance_id); |
| } |
| |
| } // namespace |
| |
| UrlAvailabilityRequester::UrlAvailabilityRequester( |
| ClockNowFunctionPtr now_function) |
| : now_function_(now_function) { |
| OSP_CHECK(now_function_); |
| } |
| |
// Defaulted out of line: no explicit teardown is performed here, members are
// simply destroyed.
UrlAvailabilityRequester::~UrlAvailabilityRequester() = default;
| |
| void UrlAvailabilityRequester::CreateReceiverRequester( |
| std::string_view instance_name, |
| uint64_t instance_id) { |
| receiver_by_instance_name_.emplace( |
| instance_name, |
| std::make_unique<ReceiverRequester>(*this, instance_name, instance_id)); |
| } |
| |
| void UrlAvailabilityRequester::AddObserver(const std::vector<std::string>& urls, |
| ReceiverObserver* observer) { |
| for (const auto& url : urls) { |
| observers_by_url_[url].push_back(observer); |
| } |
| |
| for (auto& entry : receiver_by_instance_name_) { |
| auto& receiver = entry.second; |
| receiver->GetOrRequestAvailabilities(urls, observer); |
| } |
| } |
| |
| void UrlAvailabilityRequester::RemoveObserverUrls( |
| const std::vector<std::string>& urls, |
| ReceiverObserver* observer) { |
| std::set<std::string> unobserved_urls; |
| for (const auto& url : urls) { |
| auto observer_entry = observers_by_url_.find(url); |
| if (observer_entry == observers_by_url_.end()) { |
| continue; |
| } |
| |
| auto& observers = observer_entry->second; |
| observers.erase(std::remove(observers.begin(), observers.end(), observer), |
| observers.end()); |
| if (observers.empty()) { |
| unobserved_urls.emplace(std::move(observer_entry->first)); |
| observers_by_url_.erase(observer_entry); |
| for (auto& entry : receiver_by_instance_name_) { |
| auto& receiver = entry.second; |
| receiver->known_availability_by_url().erase(url); |
| } |
| } |
| } |
| |
| for (auto& entry : receiver_by_instance_name_) { |
| auto& receiver = entry.second; |
| receiver->RemoveUnobservedRequests(unobserved_urls); |
| receiver->RemoveUnobservedWatches(unobserved_urls); |
| } |
| } |
| |
| void UrlAvailabilityRequester::RemoveObserver(ReceiverObserver* observer) { |
| std::set<std::string> unobserved_urls; |
| for (auto& entry : observers_by_url_) { |
| auto& observer_list = entry.second; |
| auto it = std::remove(observer_list.begin(), observer_list.end(), observer); |
| if (it != observer_list.end()) { |
| observer_list.erase(it); |
| if (observer_list.empty()) { |
| unobserved_urls.insert(entry.first); |
| } |
| } |
| } |
| |
| for (auto& entry : receiver_by_instance_name_) { |
| auto& receiver = entry.second; |
| receiver->RemoveUnobservedRequests(unobserved_urls); |
| receiver->RemoveUnobservedWatches(unobserved_urls); |
| } |
| } |
| |
// No-op: `info` is ignored.
// NOTE(review): in this file requesters are only registered through
// CreateReceiverRequester() - confirm that nothing is expected to happen here.
void UrlAvailabilityRequester::AddReceiver(const ServiceInfo& info) {}
// No-op: `info` is ignored, so a changed receiver keeps its existing requester
// (if any) untouched.
void UrlAvailabilityRequester::ChangeReceiver(const ServiceInfo& info) {}
| |
| void UrlAvailabilityRequester::RemoveReceiver(const ServiceInfo& info) { |
| auto receiver_entry = receiver_by_instance_name_.find(info.instance_name); |
| if (receiver_entry != receiver_by_instance_name_.end()) { |
| auto& receiver = receiver_entry->second; |
| receiver->RemoveReceiver(); |
| receiver_by_instance_name_.erase(receiver_entry); |
| } |
| } |
| |
| void UrlAvailabilityRequester::RemoveAllReceivers() { |
| for (auto& entry : receiver_by_instance_name_) { |
| auto& receiver = entry.second; |
| receiver->RemoveReceiver(); |
| } |
| receiver_by_instance_name_.clear(); |
| } |
| |
| Clock::time_point UrlAvailabilityRequester::RefreshWatches() { |
| const Clock::time_point now = now_function_(); |
| Clock::time_point minimum_schedule_time = now + kWatchDuration; |
| for (auto& entry : receiver_by_instance_name_) { |
| auto& receiver = entry.second; |
| const Clock::time_point requested_schedule_time = |
| receiver->RefreshWatches(now); |
| if (requested_schedule_time < minimum_schedule_time) { |
| minimum_schedule_time = requested_schedule_time; |
| } |
| } |
| return minimum_schedule_time; |
| } |
| |
| UrlAvailabilityRequester::ReceiverRequester::ReceiverRequester( |
| UrlAvailabilityRequester& listener, |
| std::string_view instance_name, |
| uint64_t instance_id) |
| : listener_(listener), |
| instance_name_(instance_name), |
| instance_id_(instance_id) { |
| connection_ = CreateClientProtocolConnection(instance_id); |
| if (!connection_) { |
| OSP_LOG_WARN << "There is no valid underlying connection."; |
| } |
| } |
| |
// Defaulted out of line: no explicit teardown is performed here, members are
// simply destroyed.
UrlAvailabilityRequester::ReceiverRequester::~ReceiverRequester() = default;
| |
| void UrlAvailabilityRequester::ReceiverRequester::GetOrRequestAvailabilities( |
| const std::vector<std::string>& requested_urls, |
| ReceiverObserver* observer) { |
| std::vector<std::string> unknown_urls; |
| for (const auto& url : requested_urls) { |
| auto availability_entry = known_availability_by_url_.find(url); |
| if (availability_entry == known_availability_by_url_.end()) { |
| unknown_urls.emplace_back(url); |
| continue; |
| } |
| |
| msgs::UrlAvailability availability = availability_entry->second; |
| if (observer) { |
| switch (availability) { |
| case msgs::UrlAvailability::kAvailable: { |
| observer->OnReceiverAvailable(url, instance_name_); |
| break; |
| } |
| |
| case msgs::UrlAvailability::kUnavailable: // fallthrough |
| case msgs::UrlAvailability::kInvalid: { |
| observer->OnReceiverUnavailable(url, instance_name_); |
| break; |
| } |
| } |
| } |
| } |
| |
| if (!unknown_urls.empty()) { |
| RequestUrlAvailabilities(std::move(unknown_urls)); |
| } |
| } |
| |
| void UrlAvailabilityRequester::ReceiverRequester::RequestUrlAvailabilities( |
| std::vector<std::string> urls) { |
| if (urls.empty()) { |
| return; |
| } |
| |
| const uint64_t request_id = GetNextRequestId(instance_id_); |
| ErrorOr<uint64_t> watch_id_or_error = SendRequest(request_id, urls); |
| if (watch_id_or_error) { |
| request_by_id_.emplace(request_id, |
| Request{watch_id_or_error.value(), std::move(urls)}); |
| } else { |
| for (const auto& url : urls) { |
| for (auto& observer : listener_.observers_by_url_[url]) { |
| observer->OnRequestFailed(url, instance_name_); |
| } |
| } |
| } |
| } |
| |
| ErrorOr<uint64_t> UrlAvailabilityRequester::ReceiverRequester::SendRequest( |
| uint64_t request_id, |
| const std::vector<std::string>& urls) { |
| if (!connection_) { |
| return Error::Code::kNoActiveConnection; |
| } |
| |
| uint64_t watch_id = next_watch_id_++; |
| msgs::PresentationUrlAvailabilityRequest cbor_request = { |
| .request_id = request_id, |
| .urls = urls, |
| .watch_duration = to_microseconds(kWatchDuration).count(), |
| .watch_id = watch_id}; |
| |
| msgs::CborEncodeBuffer buffer; |
| if (msgs::EncodePresentationUrlAvailabilityRequest(cbor_request, &buffer)) { |
| OSP_VLOG << "writing presentation-url-availability-request"; |
| connection_->Write(ByteView(buffer.data(), buffer.size())); |
| watch_by_id_.emplace( |
| watch_id, Watch{listener_.now_function_() + kWatchDuration, urls}); |
| if (!event_watch_) { |
| event_watch_ = GetClientDemuxer().WatchMessageType( |
| instance_id_, msgs::Type::kPresentationUrlAvailabilityEvent, this); |
| } |
| |
| if (!response_watch_) { |
| response_watch_ = GetClientDemuxer().WatchMessageType( |
| instance_id_, msgs::Type::kPresentationUrlAvailabilityResponse, this); |
| } |
| return watch_id; |
| } |
| return Error::Code::kCborEncoding; |
| } |
| |
| Clock::time_point UrlAvailabilityRequester::ReceiverRequester::RefreshWatches( |
| Clock::time_point now) { |
| Clock::time_point minimum_schedule_time = now + kWatchDuration; |
| std::vector<std::vector<std::string>> new_requests; |
| for (auto entry = watch_by_id_.begin(); entry != watch_by_id_.end();) { |
| Watch& watch = entry->second; |
| const Clock::time_point buffered_deadline = |
| watch.deadline - kWatchRefreshPadding; |
| if (now > buffered_deadline) { |
| new_requests.emplace_back(std::move(watch.urls)); |
| entry = watch_by_id_.erase(entry); |
| } else { |
| ++entry; |
| if (buffered_deadline < minimum_schedule_time) { |
| minimum_schedule_time = buffered_deadline; |
| } |
| } |
| } |
| |
| if (watch_by_id_.empty()) { |
| event_watch_.Reset(); |
| } |
| |
| for (auto& request : new_requests) { |
| RequestUrlAvailabilities(std::move(request)); |
| } |
| |
| return minimum_schedule_time; |
| } |
| |
| Error::Code UrlAvailabilityRequester::ReceiverRequester::UpdateAvailabilities( |
| const std::vector<std::string>& urls, |
| const std::vector<msgs::UrlAvailability>& availabilities) { |
| auto availability_it = availabilities.begin(); |
| if (urls.size() != availabilities.size()) { |
| return Error::Code::kCborInvalidMessage; |
| } |
| |
| for (const auto& url : urls) { |
| auto observer_entry = listener_.observers_by_url_.find(url); |
| if (observer_entry == listener_.observers_by_url_.end()) { |
| continue; |
| } |
| |
| std::vector<ReceiverObserver*>& observers = observer_entry->second; |
| auto result = known_availability_by_url_.emplace(url, *availability_it); |
| auto entry = result.first; |
| bool inserted = result.second; |
| bool updated = (entry->second != *availability_it); |
| if (inserted || updated) { |
| switch (*availability_it) { |
| case msgs::UrlAvailability::kAvailable: { |
| for (auto* observer : observers) { |
| observer->OnReceiverAvailable(url, instance_name_); |
| } |
| break; |
| } |
| |
| case msgs::UrlAvailability::kUnavailable: // fallthrough |
| case msgs::UrlAvailability::kInvalid: { |
| for (auto* observer : observers) { |
| observer->OnReceiverUnavailable(url, instance_name_); |
| } |
| break; |
| } |
| |
| default: |
| break; |
| } |
| } |
| ++availability_it; |
| } |
| return Error::Code::kNone; |
| } |
| |
| void UrlAvailabilityRequester::ReceiverRequester::RemoveUnobservedRequests( |
| const std::set<std::string>& unobserved_urls) { |
| std::map<uint64_t, Request> new_requests; |
| std::set<std::string> still_observed_urls; |
| for (auto entry = request_by_id_.begin(); entry != request_by_id_.end(); |
| ++entry) { |
| Request& request = entry->second; |
| auto split = PartitionUrlsBySetMembership(&request.urls, unobserved_urls); |
| if (split == request.urls.end()) { |
| continue; |
| } |
| |
| MoveVectorSegment(request.urls.begin(), split, &still_observed_urls); |
| if (connection_) { |
| watch_by_id_.erase(request.watch_id); |
| } |
| } |
| |
| if (!still_observed_urls.empty()) { |
| std::vector<std::string> urls; |
| urls.reserve(still_observed_urls.size()); |
| for (auto& url : still_observed_urls) { |
| urls.emplace_back(std::move(url)); |
| } |
| |
| const uint64_t new_request_id = GetNextRequestId(instance_id_); |
| ErrorOr<uint64_t> watch_id_or_error = SendRequest(new_request_id, urls); |
| if (watch_id_or_error) { |
| new_requests.emplace(new_request_id, |
| Request{watch_id_or_error.value(), std::move(urls)}); |
| } else { |
| for (const auto& url : urls) { |
| for (auto& observer : listener_.observers_by_url_[url]) { |
| observer->OnRequestFailed(url, instance_name_); |
| } |
| } |
| } |
| } |
| |
| for (auto& entry : new_requests) { |
| request_by_id_.emplace(entry.first, std::move(entry.second)); |
| } |
| |
| if (request_by_id_.empty()) { |
| response_watch_.Reset(); |
| } |
| } |
| |
| void UrlAvailabilityRequester::ReceiverRequester::RemoveUnobservedWatches( |
| const std::set<std::string>& unobserved_urls) { |
| std::set<std::string> still_observed_urls; |
| for (auto entry = watch_by_id_.begin(); entry != watch_by_id_.end();) { |
| Watch& watch = entry->second; |
| auto split = PartitionUrlsBySetMembership(&watch.urls, unobserved_urls); |
| if (split == watch.urls.end()) { |
| ++entry; |
| continue; |
| } |
| |
| MoveVectorSegment(watch.urls.begin(), split, &still_observed_urls); |
| entry = watch_by_id_.erase(entry); |
| } |
| |
| std::vector<std::string> urls; |
| urls.reserve(still_observed_urls.size()); |
| for (auto& url : still_observed_urls) { |
| urls.emplace_back(std::move(url)); |
| } |
| |
| RequestUrlAvailabilities(std::move(urls)); |
| // TODO(btolsch): These message watch cancels could be tested by expecting |
| // messages to fall through to the default watch. |
| if (watch_by_id_.empty()) { |
| event_watch_.Reset(); |
| } |
| } |
| |
| void UrlAvailabilityRequester::ReceiverRequester::RemoveReceiver() { |
| for (const auto& availability : known_availability_by_url_) { |
| if (availability.second == msgs::UrlAvailability::kAvailable) { |
| const std::string& url = availability.first; |
| for (auto& observer : listener_.observers_by_url_[url]) { |
| observer->OnReceiverUnavailable(url, instance_name_); |
| } |
| } |
| } |
| } |
| |
| ErrorOr<size_t> UrlAvailabilityRequester::ReceiverRequester::OnStreamMessage( |
| uint64_t instance_id, |
| uint64_t connection_id, |
| msgs::Type message_type, |
| const uint8_t* buffer, |
| size_t buffer_size, |
| Clock::time_point now) { |
| switch (message_type) { |
| case msgs::Type::kPresentationUrlAvailabilityResponse: { |
| msgs::PresentationUrlAvailabilityResponse response; |
| const msgs::CborResult result = |
| msgs::DecodePresentationUrlAvailabilityResponse(buffer, buffer_size, |
| response); |
| if (result < 0) { |
| if (result == msgs::kParserEOF) { |
| return Error::Code::kCborIncompleteMessage; |
| } |
| |
| OSP_LOG_WARN << "parse error: " << result; |
| return Error::Code::kCborParsing; |
| } else { |
| auto request_entry = request_by_id_.find(response.request_id); |
| if (request_entry == request_by_id_.end()) { |
| OSP_LOG_ERROR << "bad response id: " << response.request_id; |
| return Error::Code::kCborInvalidResponseId; |
| } |
| |
| std::vector<std::string>& urls = request_entry->second.urls; |
| if (urls.size() != response.url_availabilities.size()) { |
| OSP_LOG_WARN << "bad response size: expected " << urls.size() |
| << " but got " << response.url_availabilities.size(); |
| return Error::Code::kCborInvalidMessage; |
| } |
| |
| Error::Code update_result = |
| UpdateAvailabilities(urls, response.url_availabilities); |
| if (update_result != Error::Code::kNone) { |
| return update_result; |
| } |
| |
| request_by_id_.erase(response.request_id); |
| if (request_by_id_.empty()) { |
| response_watch_.Reset(); |
| } |
| return result; |
| } |
| } |
| |
| case msgs::Type::kPresentationUrlAvailabilityEvent: { |
| msgs::PresentationUrlAvailabilityEvent event; |
| const msgs::CborResult result = |
| msgs::DecodePresentationUrlAvailabilityEvent(buffer, buffer_size, |
| event); |
| if (result < 0) { |
| if (result == msgs::kParserEOF) { |
| return Error::Code::kCborIncompleteMessage; |
| } |
| |
| OSP_LOG_WARN << "parse error: " << result; |
| return Error::Code::kCborParsing; |
| } else { |
| auto watch_entry = watch_by_id_.find(event.watch_id); |
| if (watch_entry != watch_by_id_.end()) { |
| std::vector<std::string> urls = watch_entry->second.urls; |
| Error::Code update_result = |
| UpdateAvailabilities(urls, event.url_availabilities); |
| if (update_result != Error::Code::kNone) { |
| return update_result; |
| } |
| } |
| return result; |
| } |
| } |
| |
| default: |
| break; |
| } |
| return Error::Code::kCborParsing; |
| } |
| |
| } // namespace openscreen::osp |