blob: b0fa6b6114442bfe3f2c9eee411f40222bc25127 [file] [log] [blame]
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/mirroring/service/session_monitor.h"
#include <string>
#include <vector>
#include "base/bind.h"
#include "base/json/json_reader.h"
#include "base/json/json_writer.h"
#include "base/stl_util.h"
#include "components/mirroring/service/value_util.h"
#include "components/mirroring/service/wifi_status_monitor.h"
#include "components/version_info/version_info.h"
#include "media/cast/cast_environment.h"
#include "media/cast/logging/log_serializer.h"
#include "media/cast/logging/logging_defines.h"
#include "media/cast/logging/proto/raw_events.pb.h"
#include "media/cast/logging/raw_event_subscriber_bundle.h"
#include "net/traffic_annotation/network_traffic_annotation.h"
#include "services/network/public/cpp/resource_request.h"
#include "services/network/public/cpp/simple_url_loader.h"
using mirroring::mojom::SessionError;
namespace mirroring {
namespace {
// Interval between periodic snapshots of Cast Streaming events/stats while a
// streaming session is active (used as the period of the snapshot timer).
constexpr base::TimeDelta kSnapshotInterval =
base::TimeDelta::FromMinutes(15); // Typical: 15 min → ~3 MB
// Upper bound, in bytes, on the receiver setup-info response body that will be
// downloaded (262144 bytes = 256 KiB). This should be more than sufficient.
constexpr int kMaxSetupResponseSizeBytes = 262144;
// Converts |time| to the number of milliseconds elapsed since the Unix epoch,
// narrowed to a 32-bit signed integer.
//
// NOTE(review): milliseconds since the epoch for present-day times (~1.7e12)
// do not fit in int32_t (max ~2.1e9), so the narrowing below presumably
// discards the high bits. The narrowing is kept as-is because callers store
// the result as an integer tag; confirm whether a wider type is wanted.
int32_t ToEpochTime(const base::Time& time) {
  const auto since_epoch = time - base::Time::UnixEpoch();
  return static_cast<int32_t>(since_epoch.InMilliseconds());
}
// Parses |response| as the receiver's JSON setup info.
//
// On success (the response is a JSON dictionary containing every expected
// field), the receiver* entries of |tags| are updated, |receiver_name| is set
// to the "name" field, and true is returned. On any failure false is returned
// and |tags| is left untouched. Fields are read in a fixed order and "name" is
// read last, so |receiver_name| is only written when all other fields were
// found.
bool ParseReceiverSetupInfo(const std::string& response,
                            base::Value* tags,
                            std::string* receiver_name) {
  DCHECK(tags);

  const std::unique_ptr<base::Value> parsed =
      base::JSONReader::ReadDeprecated(response);
  if (!parsed || !parsed->is_dict())
    return false;

  std::string cast_build_revision;
  if (!GetString(*parsed, "cast_build_revision", &cast_build_revision))
    return false;

  bool connected = false;
  if (!GetBool(*parsed, "connected", &connected))
    return false;

  bool ethernet_connected = false;
  if (!GetBool(*parsed, "ethernet_connected", &ethernet_connected))
    return false;

  bool update_pending = false;
  if (!GetBool(*parsed, "has_update", &update_pending))
    return false;

  int32_t uptime = 0;
  if (!GetInt(*parsed, "uptime", &uptime))
    return false;

  if (!GetString(*parsed, "name", receiver_name))
    return false;

  // Every field was present; publish them as session tags.
  tags->SetKey("receiverVersion", base::Value(cast_build_revision));
  tags->SetKey("receiverConnected", base::Value(connected));
  tags->SetKey("receiverOnEthernet", base::Value(ethernet_connected));
  tags->SetKey("receiverHasUpdatePending", base::Value(update_pending));
  tags->SetKey("receiverUptimeSeconds", base::Value(uptime));
  return true;
}
// Maps |error| to a fixed, human-readable description. The returned pointer
// refers to a string literal. Returns an empty string for any value that is
// not handled by the switch below.
const char* ToErrorMessage(SessionError error) {
// No `default:` label is used here; presumably this lets the compiler flag
// enumerators that are added later without a matching case.
switch (error) {
case SessionError::ANSWER_TIME_OUT:
return "ANSWER response time out";
case SessionError::ANSWER_NOT_OK:
return "Received an error ANSWER response";
case SessionError::ANSWER_MISMATCHED_CAST_MODE:
return "Unexpected cast mode in ANSWER response.";
case SessionError::ANSWER_MISMATCHED_SSRC_LENGTH:
return "sendIndexes.length != ssrcs.length in ANSWER";
case SessionError::ANSWER_SELECT_MULTIPLE_AUDIO:
return "Receiver selected audio RTP stream twice in ANSWER";
case SessionError::ANSWER_SELECT_MULTIPLE_VIDEO:
return "Receiver selected video RTP stream twice in ANSWER";
case SessionError::ANSWER_SELECT_INVALID_INDEX:
return "Invalid indexes selected in ANSWER";
case SessionError::ANSWER_NO_AUDIO_OR_VIDEO:
return "Incorrect ANSWER message: No audio or Video.";
case SessionError::AUDIO_CAPTURE_ERROR:
return "Audio capture error";
case SessionError::VIDEO_CAPTURE_ERROR:
return "Video capture error";
case SessionError::RTP_STREAM_ERROR:
return "RTP stream error";
case SessionError::ENCODING_ERROR:
return "Encoding status error";
case SessionError::CAST_TRANSPORT_ERROR:
return "Transport error";
}
// Reached only for values outside the cases listed above.
return "";
}
} // namespace
// Constructs a monitor that retains at most |max_retention_bytes| of snapshot
// data for the receiver at |receiver_address|. |session_tags| seeds the tags
// attached to every snapshot, and |loader_factory| is used for the receiver
// setup-info HTTP query.
SessionMonitor::SessionMonitor(
int max_retention_bytes,
const net::IPAddress& receiver_address,
base::Value session_tags,
network::mojom::URLLoaderFactoryPtr loader_factory)
: max_retention_bytes_(max_retention_bytes),
receiver_address_(receiver_address),
session_tags_(std::move(session_tags)),
url_loader_factory_(std::move(loader_factory)),
stored_snapshots_bytes_(0),
weak_factory_(this) {
// Kick off an initial receiver setup-info query as soon as the monitor
// exists; the result is merged into |session_tags_| asynchronously.
QueryReceiverSetupInfo();
}
// Nothing to tear down explicitly; members clean up after themselves.
SessionMonitor::~SessionMonitor() = default;
// Begins monitoring a new streaming session: records the session's activity
// tag, re-queries the receiver setup info, subscribes to Cast Streaming raw
// events for the media types in use, and starts the periodic snapshot timer.
// Must not be called while a previous session is still being monitored.
void SessionMonitor::StartStreamingSession(
    scoped_refptr<media::cast::CastEnvironment> cast_environment,
    std::unique_ptr<WifiStatusMonitor> wifi_status_monitor,
    SessionType session_type,
    bool is_remoting) {
  DCHECK(!event_subscribers_);
  DCHECK(!snapshot_timer_.IsRunning());

  wifi_status_monitor_ = std::move(wifi_status_monitor);

  // The activity tag has the form "<media kind> <mode>".
  std::string activity;
  if (session_type == AUDIO_AND_VIDEO) {
    activity = "audio+video";
  } else if (session_type == AUDIO_ONLY) {
    activity = "audio-only";
  } else {
    activity = "video-only";
  }
  activity.append(is_remoting ? " remoting" : " streaming");
  session_tags_.SetKey("activity", base::Value(activity));

  // Refresh the receiver setup info at the start of every streaming session.
  QueryReceiverSetupInfo();

  // Subscribe to Cast Streaming events/stats for each media type in use.
  const bool has_audio = session_type != VIDEO_ONLY;
  const bool has_video = session_type != AUDIO_ONLY;
  event_subscribers_ = std::make_unique<media::cast::RawEventSubscriberBundle>(
      std::move(cast_environment));
  if (has_audio)
    event_subscribers_->AddEventSubscribers(true /* is_audio */);
  if (has_video)
    event_subscribers_->AddEventSubscribers(false /* is_audio */);

  // Snapshot the collected events/stats once per |kSnapshotInterval|. The
  // timer is a member of |this|, which is why an unretained pointer is bound.
  snapshot_timer_.Start(FROM_HERE, kSnapshotInterval,
                        base::BindRepeating(&SessionMonitor::TakeSnapshot,
                                            base::Unretained(this)));
  start_time_ = base::Time::Now();
}
// Ends monitoring of the current streaming session. If the snapshot timer was
// running, it is stopped and one last snapshot is taken before the event
// subscribers and the wifi status monitor are released.
void SessionMonitor::StopStreamingSession() {
  const bool was_streaming = snapshot_timer_.IsRunning();
  if (was_streaming) {
    snapshot_timer_.Stop();
    // Capture whatever was collected since the last periodic snapshot.
    TakeSnapshot();
  }
  event_subscribers_ = nullptr;
  wifi_status_monitor_ = nullptr;
}
// Records |error| and the time it occurred so that the next snapshot can
// report it. Errors are ignored when no streaming session is active, and also
// when an earlier error is still pending: the first error is usually the most
// indicative of the problem, so it is never overwritten.
void SessionMonitor::OnStreamingError(SessionError error) {
  DVLOG(2) << error;

  const bool is_streaming = snapshot_timer_.IsRunning();
  if (!is_streaming || error_.has_value())
    return;

  error_time_ = base::Time::Now();
  error_.emplace(error);
}
std::vector<SessionMonitor::EventsAndStats>
SessionMonitor::AssembleBundlesAndClear(
const std::vector<int32_t>& bundle_sizes) {
std::vector<EventsAndStats> bundles;
// If a streaming session is currently active, take a snapshot now so that all
// data collected since the last automatic periodic snapshot is included in
// the bundle.
if (snapshot_timer_.IsRunning()) {
TakeSnapshot();
snapshot_timer_.Reset();
}
for (int32_t max_bytes : bundle_sizes)
bundles.emplace_back(MakeSliceOfSnapshots(max_bytes));
snapshots_.clear();
stored_snapshots_bytes_ = 0;
return bundles;
}
// Builds a single bundle from the most recent snapshots that fit within
// |max_bytes|. Snapshots are considered newest-first. A snapshot's stats JSON
// must fit for the snapshot to be included at all, while its events blob is
// included only if enough bytes remain afterwards. The bundle's |first| is
// the concatenation of the included events blobs and |second| is a JSON array
// of the included stats, both in oldest-to-newest order. Returns an empty
// bundle when nothing fits.
SessionMonitor::EventsAndStats SessionMonitor::MakeSliceOfSnapshots(
    int32_t max_bytes) {
  // Account up front for the "[" and "]" that wrap the stats JSON array.
  max_bytes -= 2;

  base::circular_deque<EventsAndStats> selected;
  for (size_t remaining = snapshots_.size(); remaining > 0; --remaining) {
    const auto& snapshot = snapshots_[remaining - 1];

    // Charge the stats JSON plus one byte for a separating comma.
    max_bytes -= snapshot.second.length() + 1 /* size of the comma */;
    if (max_bytes < 0)
      break;  // The stats no longer fit; older snapshots are left out.
    selected.emplace_front(std::make_pair("", snapshot.second));

    // Attach the events blob only when it also fits in what is left.
    if (snapshot.first.empty())
      continue;
    const int32_t events_size = snapshot.first.length();
    if (max_bytes >= events_size) {
      selected.front().first = snapshot.first;
      max_bytes -= events_size;
    }
  }

  EventsAndStats bundle;
  if (selected.empty())
    return bundle;

  bundle.second = "[";
  bool is_first = true;
  for (const EventsAndStats& entry : selected) {
    // Gzipped blobs may simply be concatenated to form one gzipped blob; the
    // gzip spec explicitly allows this.
    bundle.first += entry.first;
    // The stats entries are joined with commas into one JSON array string.
    if (!is_first)
      bundle.second += ",";
    bundle.second += entry.second;
    is_first = false;
  }
  bundle.second += "]";
  return bundle;
}
// Captures everything collected since the previous snapshot (raw events,
// stats, wifi status and any pending streaming error), tags it, and appends
// it to |snapshots_|. Older snapshots are pruned as needed so that the total
// stored size stays within |max_retention_bytes_|: a snapshot's events blob is
// dropped first and its stats only if that is still not enough. If the new
// snapshot alone exceeds the limit, the same policy is applied to it: its
// events are dropped first and, if the stats alone are still too large, the
// snapshot is discarded.
void SessionMonitor::TakeSnapshot() {
  // Session-level tags.
  base::Value tags = session_tags_.Clone();

  // Add snapshot-level tags. The end of this snapshot's time range becomes
  // the start of the next one.
  tags.SetKey("startTime", base::Value(ToEpochTime(start_time_)));
  const base::Time end_time = base::Time::Now();
  tags.SetKey("endTime", base::Value(ToEpochTime(end_time)));
  start_time_ = end_time;

  if (wifi_status_monitor_) {
    const std::vector<WifiStatus> wifi_status =
        wifi_status_monitor_->GetRecentValues();
    base::Value::ListStorage wifi_status_list;
    for (const auto& status : wifi_status) {
      base::Value status_value(base::Value::Type::DICTIONARY);
      status_value.SetKey("wifiSnr", base::Value(status.snr));
      status_value.SetKey("wifiSpeed", base::Value(status.speed));
      status_value.SetKey("timestamp",
                          base::Value(ToEpochTime(status.timestamp)));
      wifi_status_list.emplace_back(std::move(status_value));
    }
    // The list is not used again, so hand it over instead of copying it.
    tags.SetKey("receiverWifiStatus",
                base::Value(std::move(wifi_status_list)));
  }

  // Streaming error tags (if any). The error is reported once and cleared.
  if (error_.has_value()) {
    tags.SetKey("streamingErrorTime", base::Value(ToEpochTime(error_time_)));
    tags.SetKey("streamingErrorMessage",
                base::Value(ToErrorMessage(error_.value())));
    error_.reset();
  }

  std::string tags_string;
  base::JSONWriter::Write(tags, &tags_string);

  // Collect raw events (audio first, then video).
  std::string events = GetEventLogsAndReset(true, tags_string) +
                       GetEventLogsAndReset(false, tags_string);

  // Collect stats.
  std::unique_ptr<base::DictionaryValue> audio_stats =
      base::DictionaryValue::From(GetStatsAndReset(true));
  std::unique_ptr<base::DictionaryValue> video_stats =
      base::DictionaryValue::From(GetStatsAndReset(false));
  base::DictionaryValue stats;
  if (audio_stats)
    stats.MergeDictionary(audio_stats.get());
  if (video_stats)
    stats.MergeDictionary(video_stats.get());
  stats.SetKey("tags", std::move(tags));
  std::string stats_string;
  base::JSONWriter::Write(stats, &stats_string);

  int snapshots_bytes =
      stored_snapshots_bytes_ + events.size() + stats_string.size();

  // Prune |snapshots_| if necessary, oldest first.
  while (snapshots_bytes > max_retention_bytes_) {
    if (snapshots_.empty()) {
      // Nothing older is left to evict, so the new snapshot by itself is over
      // the limit. Indexing |snapshots_| here would read an empty container.
      if (!events.empty()) {
        snapshots_bytes -= events.size();
        events.clear();
        continue;
      }
      VLOG(1) << "Discarding a snapshot that exceeds the retention limit.";
      stored_snapshots_bytes_ = 0;
      return;
    }
    // First try dropping only the oldest snapshot's events blob.
    snapshots_bytes -= snapshots_[0].first.size();
    snapshots_[0].first = std::string();
    if (snapshots_bytes <= max_retention_bytes_)
      break;
    // Still too large: drop the oldest snapshot entirely.
    snapshots_bytes -= snapshots_[0].second.size();
    snapshots_.pop_front();
  }

  // Sizes were accounted for above, so the strings can be moved into storage.
  snapshots_.emplace_back(
      std::make_pair(std::move(events), std::move(stats_string)));
  stored_snapshots_bytes_ = snapshots_bytes;
}
// Returns the value of the "receiverVersion" session tag, or an empty string
// if the lookup does not fill it in.
std::string SessionMonitor::GetReceiverBuildVersion() const {
  std::string receiver_version;
  // The lookup result is ignored on purpose: when it fails the string simply
  // stays empty.
  GetString(session_tags_, "receiverVersion", &receiver_version);
  return receiver_version;
}
std::string SessionMonitor::GetEventLogsAndReset(
bool is_audio,
const std::string& extra_data) {
std::string result;
if (!event_subscribers_.get())
return result;
media::cast::EncodingEventSubscriber* subscriber =
event_subscribers_->GetEncodingEventSubscriber(is_audio);
if (!subscriber)
return result;
media::cast::proto::LogMetadata metadata;
media::cast::FrameEventList frame_events;
media::cast::PacketEventList packet_events;
subscriber->GetEventsAndReset(&metadata, &frame_events, &packet_events);
if (!extra_data.empty())
metadata.set_extra_data(extra_data);
media::cast::proto::GeneralDescription* gen_desc =
metadata.mutable_general_description();
gen_desc->set_product(version_info::GetProductName());
gen_desc->set_product_version(version_info::GetVersionNumber());
gen_desc->set_os(version_info::GetOSType());
result.resize(media::cast::kMaxSerializedBytes);
int output_bytes;
// TODO(xjz): media::cast::SerializeEvents() shouldn't require the caller to
// pre-allocate the memory. It should return a string result.
if (media::cast::SerializeEvents(metadata, frame_events, packet_events,
true /* compress */,
media::cast::kMaxSerializedBytes,
base::data(result), &output_bytes)) {
result.resize(output_bytes);
} else {
result.clear();
}
return result;
}
std::unique_ptr<base::Value> SessionMonitor::GetStatsAndReset(bool is_audio) {
if (!event_subscribers_.get())
return nullptr;
media::cast::StatsEventSubscriber* subscriber =
event_subscribers_->GetStatsEventSubscriber(is_audio);
if (!subscriber)
return nullptr;
std::unique_ptr<base::Value> stats = subscriber->GetStats();
subscriber->Reset();
return stats;
}
// Asynchronously fetches the receiver's setup info over HTTP (port 8008,
// path /setup/eureka_info) and, on success, merges it into |session_tags_|
// and updates |receiver_name_|. The response body is capped at
// |kMaxSetupResponseSizeBytes|. Failures are only logged. The result is
// dropped if this monitor has been destroyed by the time the response
// arrives.
void SessionMonitor::QueryReceiverSetupInfo() {
  auto resource_request = std::make_unique<network::ResourceRequest>();
  resource_request->method = "GET";
  resource_request->url = GURL("http://" + receiver_address_.ToString() +
                               ":8008/setup/eureka_info");
  net::NetworkTrafficAnnotationTag traffic_annotation =
      net::DefineNetworkTrafficAnnotation("mirroring_get_setup_info", R"(
semantics {
sender: "Mirroring Service"
description:
"Mirroring Service sends a request to the receiver to obtain its "
"setup info such as the build version, the model name, etc. The "
"data is included in mirroring feedback for analysis."
trigger:
"A tab/desktop mirroring session starts."
data: "An HTTP GET request."
destination: OTHER
destination_other:
"A mirroring receiver, such as a ChromeCast device."
}
policy {
cookies_allowed: NO
setting: "This feature cannot be disabled in settings."
chrome_policy {
EnableMediaRouter {
EnableMediaRouter: false
}
}
})");
  std::unique_ptr<network::SimpleURLLoader> url_loader =
      network::SimpleURLLoader::Create(std::move(resource_request),
                                       traffic_annotation);
  // Grab the raw pointer before |url_loader| is moved into the callback below;
  // the callback owns the loader and so keeps it alive until completion.
  network::SimpleURLLoader* url_loader_ptr = url_loader.get();
  url_loader_ptr->DownloadToString(
      url_loader_factory_.get(),
      base::BindOnce(
          [](base::WeakPtr<SessionMonitor> monitor,
             std::unique_ptr<network::SimpleURLLoader> url_loader,
             std::unique_ptr<std::string> response) {
            if (!monitor)
              return;
            // Guard against a missing body explicitly rather than relying
            // only on the loader's error code before dereferencing it.
            const bool fetched =
                response && url_loader->NetError() == net::OK;
            if (!fetched ||
                !ParseReceiverSetupInfo(*response, &monitor->session_tags_,
                                        &monitor->receiver_name_)) {
              VLOG(2) << "Unable to fetch/parse receiver setup info.";
            }
          },
          weak_factory_.GetWeakPtr(), std::move(url_loader)),
      kMaxSetupResponseSizeBytes);
}
} // namespace mirroring