| // Copyright 2018 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "components/mirroring/service/session_monitor.h" |
| |
| #include <string> |
| #include <vector> |
| |
| #include "base/json/json_reader.h" |
| #include "base/json/json_writer.h" |
| #include "base/stl_util.h" |
| #include "components/mirroring/service/value_util.h" |
| #include "components/mirroring/service/wifi_status_monitor.h" |
| #include "components/version_info/version_info.h" |
| #include "media/cast/cast_environment.h" |
| #include "media/cast/logging/log_serializer.h" |
| #include "media/cast/logging/logging_defines.h" |
| #include "media/cast/logging/proto/raw_events.pb.h" |
| #include "media/cast/logging/raw_event_subscriber_bundle.h" |
| #include "net/traffic_annotation/network_traffic_annotation.h" |
| #include "services/network/public/cpp/resource_request.h" |
| #include "services/network/public/cpp/simple_url_loader.h" |
| |
| namespace mirroring { |
| |
| namespace { |
| |
| // Interval between snapshots of Cast Streaming events/stats. |
| constexpr base::TimeDelta kSnapshotInterval = |
| base::TimeDelta::FromMinutes(15); // Typical: 15 min → ~3 MB |
| |
// Upper bound, in bytes, on the receiver setup-info response that will be
// downloaded. 256 KiB should be more than sufficient.
constexpr int kMaxSetupResponseSizeBytes = 256 * 1024;
| |
| // Returns the number of milliseconds elapsed since epoch. |
| int32_t ToEpochTime(const base::Time& time) { |
| return (time - base::Time::UnixEpoch()).InMilliseconds(); |
| } |
| |
| // Helper to parse the response for receiver setup info and update the tags. |
| bool AddReceiverSetupInfoTags(const std::string& response, base::Value* tags) { |
| DCHECK(tags); |
| std::unique_ptr<base::Value> value = base::JSONReader::Read(response); |
| |
| std::string build_version; |
| bool is_connected = false; |
| bool is_on_ethernet = false; |
| bool has_update = false; |
| int32_t uptime_seconds = 0; |
| |
| const bool result = |
| value && value->is_dict() && |
| GetString(*value, "cast_build_revision", &build_version) && |
| GetBool(*value, "connected", &is_connected) && |
| GetBool(*value, "ethernet_connected", &is_on_ethernet) && |
| GetBool(*value, "has_update", &has_update) && |
| GetInt(*value, "uptime", &uptime_seconds); |
| if (result) { |
| tags->SetKey("receiverVersion", base::Value(build_version)); |
| tags->SetKey("receiverConnected", base::Value(is_connected)); |
| tags->SetKey("receiverOnEthernet", base::Value(is_on_ethernet)); |
| tags->SetKey("receiverHasUpdatePending", base::Value(has_update)); |
| tags->SetKey("receiverUptimeSeconds", base::Value(uptime_seconds)); |
| } |
| return result; |
| } |
| |
| const char* ToErrorMessage(SessionError error) { |
| switch (error) { |
| case ANSWER_TIME_OUT: |
| return "ANSWER response time out"; |
| case ANSWER_NOT_OK: |
| return "Received an error ANSWER response"; |
| case ANSWER_MISMATCHED_CAST_MODE: |
| return "Unexpected cast mode in ANSWER response."; |
| case ANSWER_MISMATCHED_SSRC_LENGTH: |
| return "sendIndexes.length != ssrcs.length in ANSWER"; |
| case ANSWER_SELECT_MULTIPLE_AUDIO: |
| return "Receiver selected audio RTP stream twice in ANSWER"; |
| case ANSWER_SELECT_MULTIPLE_VIDEO: |
| return "Receiver selected video RTP stream twice in ANSWER"; |
| case ANSWER_SELECT_INVALID_INDEX: |
| return "Invalid indexes selected in ANSWER"; |
| case ANSWER_NO_AUDIO_OR_VIDEO: |
| return "Incorrect ANSWER message: No audio or Video."; |
| case AUDIO_CAPTURE_ERROR: |
| return "Audio capture error"; |
| case VIDEO_CAPTURE_ERROR: |
| return "Video capture error"; |
| case RTP_STREAM_ERROR: |
| return "RTP stream error"; |
| case ENCODING_ERROR: |
| return "Encoding status error"; |
| case CAST_TRANSPORT_ERROR: |
| return "Transport error"; |
| } |
| return ""; |
| } |
| |
| } // namespace |
| |
| SessionMonitor::SessionMonitor( |
| int max_retention_bytes, |
| const net::IPAddress& receiver_address, |
| base::Value session_tags, |
| network::mojom::URLLoaderFactoryPtr loader_factory) |
| : max_retention_bytes_(max_retention_bytes), |
| receiver_address_(receiver_address), |
| session_tags_(std::move(session_tags)), |
| url_loader_factory_(std::move(loader_factory)), |
| stored_snapshots_bytes_(0), |
| weak_factory_(this) { |
| QueryReceiverSetupInfo(); |
| } |
| |
| SessionMonitor::~SessionMonitor() {} |
| |
| void SessionMonitor::StartStreamingSession( |
| scoped_refptr<media::cast::CastEnvironment> cast_environment, |
| std::unique_ptr<WifiStatusMonitor> wifi_status_monitor, |
| SessionType session_type, |
| bool is_remoting) { |
| DCHECK(!event_subscribers_); |
| DCHECK(!snapshot_timer_.IsRunning()); |
| |
| wifi_status_monitor_ = std::move(wifi_status_monitor); |
| std::string session_activity = |
| session_type == AUDIO_AND_VIDEO |
| ? "audio+video" |
| : session_type == AUDIO_ONLY ? "audio-only" : "video-only"; |
| session_activity += is_remoting ? " remoting" : " streaming"; |
| session_tags_.SetKey("activity", base::Value(session_activity)); |
| |
| // Query the receiver setup info at the beginning of each streaming session. |
| QueryReceiverSetupInfo(); |
| |
| // Start collecting Cast Streaming events/stats. |
| event_subscribers_ = std::make_unique<media::cast::RawEventSubscriberBundle>( |
| std::move(cast_environment)); |
| if (session_type != VIDEO_ONLY) |
| event_subscribers_->AddEventSubscribers(true /* is_audio */); |
| if (session_type != AUDIO_ONLY) |
| event_subscribers_->AddEventSubscribers(false /* is_audio */); |
| |
| // Start periodically snapshotting Cast Streaming events/stats. |
| snapshot_timer_.Start(FROM_HERE, kSnapshotInterval, |
| base::BindRepeating(&SessionMonitor::TakeSnapshot, |
| base::Unretained(this))); |
| |
| start_time_ = base::Time::Now(); |
| } |
| |
| void SessionMonitor::StopStreamingSession() { |
| if (snapshot_timer_.IsRunning()) { |
| snapshot_timer_.Stop(); |
| TakeSnapshot(); // Final snapshot of this streaming session. |
| } |
| event_subscribers_.reset(); |
| wifi_status_monitor_.reset(); |
| } |
| |
| void SessionMonitor::OnStreamingError(SessionError error) { |
| DVLOG(2) << error; |
| |
| if (!snapshot_timer_.IsRunning()) |
| return; // Ignore errors before streaming starts. |
| // If the error has already been recorded, do not overwrite it with another |
| // since the first will usually be the most indicative of the problem. |
| if (error_.has_value()) |
| return; |
| error_time_ = base::Time::Now(); |
| error_.emplace(error); |
| } |
| |
| std::vector<SessionMonitor::EventsAndStats> |
| SessionMonitor::AssembleBundlesAndClear( |
| const std::vector<int32_t>& bundle_sizes) { |
| std::vector<EventsAndStats> bundles; |
| // If a streaming session is currently active, take a snapshot now so that all |
| // data collected since the last automatic periodic snapshot is included in |
| // the bundle. |
| if (snapshot_timer_.IsRunning()) { |
| TakeSnapshot(); |
| snapshot_timer_.Reset(); |
| } |
| |
| for (int32_t max_bytes : bundle_sizes) |
| bundles.emplace_back(MakeSliceOfSnapshots(max_bytes)); |
| snapshots_.clear(); |
| stored_snapshots_bytes_ = 0; |
| return bundles; |
| } |
| |
| SessionMonitor::EventsAndStats SessionMonitor::MakeSliceOfSnapshots( |
| int32_t max_bytes) { |
| // Immediately subtract two bytes for array brackets ("[]") since |
| // AssembleSnapshotsAndClear() will produce a JSON array of each snapshot's |
| // stats JSON. |
| max_bytes -= 2; |
| base::circular_deque<EventsAndStats> slice; |
| for (int i = snapshots_.size() - 1; i >= 0; --i) { |
| max_bytes -= snapshots_[i].second.length() + 1 /* size of the comma */; |
| // If insufficient bytes remain to retain the current stats JSON, stop |
| // adding more Snapshots to the slice. |
| if (max_bytes < 0) |
| break; |
| slice.emplace_front(std::make_pair("", snapshots_[i].second)); |
| // If sufficient bytes remain to include the current events Blob, add it to |
| // the slice. |
| if (!snapshots_[i].first.empty()) { |
| const int32_t events_size = snapshots_[i].first.length(); |
| if (max_bytes >= events_size) { |
| slice[0].first = snapshots_[i].first; |
| max_bytes -= events_size; |
| } |
| } |
| } |
| |
| EventsAndStats bundle; |
| if (slice.empty()) |
| return bundle; |
| |
| bundle.second = "["; |
| for (size_t i = 0; i < slice.size(); i++) { |
| // To produce a single events gzipped-data Blob, simply concatenate the |
| // individual gzipped-data Blobs. The spec for gzip explicitly allows for |
| // this. :-) |
| bundle.first += slice[i].first; |
| // To produce the JSON stats array, concatenate the mix of string and Blob |
| // objects to produce a single UTF-8 encoded string. |
| if (i > 0) |
| bundle.second += ","; |
| bundle.second += slice[i].second; |
| } |
| bundle.second += "]"; |
| |
| return bundle; |
| } |
| |
| void SessionMonitor::TakeSnapshot() { |
| // Session-level tags. |
| base::Value tags = session_tags_.Clone(); |
| |
| // Add snapshot-level tags. |
| tags.SetKey("startTime", base::Value(ToEpochTime(start_time_))); |
| const base::Time end_time = base::Time::Now(); |
| tags.SetKey("endTime", base::Value(ToEpochTime(end_time))); |
| start_time_ = end_time; |
| |
| if (wifi_status_monitor_) { |
| const std::vector<WifiStatus> wifi_status = |
| wifi_status_monitor_->GetRecentValues(); |
| base::Value::ListStorage wifi_status_list; |
| for (const auto& status : wifi_status) { |
| base::Value status_value(base::Value::Type::DICTIONARY); |
| status_value.SetKey("wifiSnr", base::Value(status.snr)); |
| status_value.SetKey("wifiSpeed", base::Value(status.speed)); |
| status_value.SetKey("timestamp", |
| base::Value(ToEpochTime(status.timestamp))); |
| wifi_status_list.emplace_back(std::move(status_value)); |
| } |
| tags.SetKey("receiverWifiStatus", base::Value(wifi_status_list)); |
| } |
| |
| // Streaming error tags (if any). |
| if (error_.has_value()) { |
| tags.SetKey("streamingErrorTime", base::Value(ToEpochTime(error_time_))); |
| tags.SetKey("streamingErrorMessage", |
| base::Value(ToErrorMessage(error_.value()))); |
| error_.reset(); |
| } |
| |
| std::string tags_string; |
| base::JSONWriter::Write(tags, &tags_string); |
| |
| // Collect raw events. |
| std::string events = GetEventLogsAndReset(true, tags_string) + |
| GetEventLogsAndReset(false, tags_string); |
| |
| // Collect stats. |
| std::unique_ptr<base::DictionaryValue> audio_stats = |
| base::DictionaryValue::From(GetStatsAndReset(true)); |
| std::unique_ptr<base::DictionaryValue> video_stats = |
| base::DictionaryValue::From(GetStatsAndReset(false)); |
| base::DictionaryValue stats; |
| if (audio_stats) |
| stats.MergeDictionary(audio_stats.get()); |
| if (video_stats) |
| stats.MergeDictionary(video_stats.get()); |
| stats.SetKey("tags", std::move(tags)); |
| std::string stats_string; |
| base::JSONWriter::Write(stats, &stats_string); |
| |
| int snapshots_bytes = |
| stored_snapshots_bytes_ + events.size() + stats_string.size(); |
| // Prune |snapshots_| if necessary. |
| while (snapshots_bytes > max_retention_bytes_) { |
| snapshots_bytes -= snapshots_[0].first.size(); |
| snapshots_[0].first = std::string(); |
| if (snapshots_bytes <= max_retention_bytes_) |
| break; |
| snapshots_bytes -= snapshots_[0].second.size(); |
| snapshots_.pop_front(); |
| } |
| snapshots_.emplace_back(std::make_pair(events, stats_string)); |
| stored_snapshots_bytes_ = snapshots_bytes; |
| } |
| |
| std::string SessionMonitor::GetReceiverBuildVersion() const { |
| std::string build_version; |
| GetString(session_tags_, "receiverVersion", &build_version); |
| return build_version; |
| } |
| |
| std::string SessionMonitor::GetEventLogsAndReset( |
| bool is_audio, |
| const std::string& extra_data) { |
| std::string result; |
| if (!event_subscribers_.get()) |
| return result; |
| |
| media::cast::EncodingEventSubscriber* subscriber = |
| event_subscribers_->GetEncodingEventSubscriber(is_audio); |
| if (!subscriber) |
| return result; |
| |
| media::cast::proto::LogMetadata metadata; |
| media::cast::FrameEventList frame_events; |
| media::cast::PacketEventList packet_events; |
| |
| subscriber->GetEventsAndReset(&metadata, &frame_events, &packet_events); |
| |
| if (!extra_data.empty()) |
| metadata.set_extra_data(extra_data); |
| media::cast::proto::GeneralDescription* gen_desc = |
| metadata.mutable_general_description(); |
| gen_desc->set_product(version_info::GetProductName()); |
| gen_desc->set_product_version(version_info::GetVersionNumber()); |
| gen_desc->set_os(version_info::GetOSType()); |
| |
| result.resize(media::cast::kMaxSerializedBytes); |
| int output_bytes; |
| // TODO(xjz): media::cast::SerializeEvents() shouldn't require the caller to |
| // pre-allocate the memory. It should return a string result. |
| if (media::cast::SerializeEvents(metadata, frame_events, packet_events, |
| true /* compress */, |
| media::cast::kMaxSerializedBytes, |
| base::data(result), &output_bytes)) { |
| result.resize(output_bytes); |
| } else { |
| result.clear(); |
| } |
| return result; |
| } |
| |
| std::unique_ptr<base::Value> SessionMonitor::GetStatsAndReset(bool is_audio) { |
| if (!event_subscribers_.get()) |
| return nullptr; |
| |
| media::cast::StatsEventSubscriber* subscriber = |
| event_subscribers_->GetStatsEventSubscriber(is_audio); |
| if (!subscriber) |
| return nullptr; |
| |
| std::unique_ptr<base::Value> stats = subscriber->GetStats(); |
| subscriber->Reset(); |
| return stats; |
| } |
| |
| void SessionMonitor::QueryReceiverSetupInfo() { |
| auto resource_request = std::make_unique<network::ResourceRequest>(); |
| resource_request->method = "GET"; |
| resource_request->url = GURL("http://" + receiver_address_.ToString() + |
| ":8008/setup/eureka_info"); |
| net::NetworkTrafficAnnotationTag traffic_annotation = |
| net::DefineNetworkTrafficAnnotation("mirroring_get_setup_info", R"( |
| semantics { |
| sender: "Mirroring Service" |
| description: |
| "Mirroring Service sends a request to the receiver to obtain its " |
| "setup info such as the build version, the model name, etc. The " |
| "data is included in mirroring feedback for analysis." |
| trigger: |
| "A tab/desktop mirroring session starts." |
| data: "An HTTP GET request." |
| destination: OTHER |
| destination_other: |
| "A mirroring receiver, such as a ChromeCast device." |
| } |
| policy { |
| cookies_allowed: NO |
| setting: "This feature cannot be disabled in settings." |
| chrome_policy { |
| EnableMediaRouter { |
| EnableMediaRouter: false |
| } |
| } |
| })"); |
| std::unique_ptr<network::SimpleURLLoader> url_loader = |
| network::SimpleURLLoader::Create(std::move(resource_request), |
| traffic_annotation); |
| network::SimpleURLLoader* url_loader_ptr = url_loader.get(); |
| url_loader_ptr->DownloadToString( |
| url_loader_factory_.get(), |
| base::BindOnce( |
| [](base::WeakPtr<SessionMonitor> monitor, |
| std::unique_ptr<network::SimpleURLLoader> url_loader, |
| std::unique_ptr<std::string> response) { |
| if (monitor) { |
| if (url_loader->NetError() != net::OK || |
| !AddReceiverSetupInfoTags(*response, &monitor->session_tags_)) |
| VLOG(2) << "Unable to fetch/parse receiver setup info."; |
| } |
| }, |
| weak_factory_.GetWeakPtr(), std::move(url_loader)), |
| kMaxSetupResponseSizeBytes); |
| } |
| |
| } // namespace mirroring |