| // Copyright 2017 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "services/tracing/coordinator.h" |
| |
| #include <algorithm> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "base/bind.h" |
| #include "base/bind_helpers.h" |
| #include "base/callback_forward.h" |
| #include "base/callback_helpers.h" |
| #include "base/guid.h" |
| #include "base/json/json_writer.h" |
| #include "base/json/string_escape.h" |
| #include "base/logging.h" |
| #include "base/strings/string_split.h" |
| #include "base/strings/string_util.h" |
| #include "base/task/post_task.h" |
| #include "base/task/task_traits.h" |
| #include "base/threading/thread_task_runner_handle.h" |
| #include "base/time/time.h" |
| #include "base/trace_event/trace_event.h" |
| #include "mojo/public/cpp/system/data_pipe_utils.h" |
| #include "services/service_manager/public/cpp/bind_source_info.h" |
| #include "services/service_manager/public/cpp/service_context_ref.h" |
| #include "services/tracing/agent_registry.h" |
| #include "services/tracing/public/mojom/constants.mojom.h" |
| #include "services/tracing/public/mojom/tracing.mojom.h" |
| #include "services/tracing/recorder.h" |
| |
| namespace { |
| |
| const char kMetadataTraceLabel[] = "metadata"; |
| |
| const char kRequestBufferUsageClosureName[] = "RequestBufferUsageClosure"; |
| |
| } // namespace |
| |
| namespace tracing { |
| |
| class Coordinator::TraceStreamer : public base::SupportsWeakPtr<TraceStreamer> { |
| public: |
| // Constructed on |main_task_runner_|. |
| TraceStreamer( |
| mojo::ScopedDataPipeProducerHandle stream, |
| const std::string& agent_label, |
| const scoped_refptr<base::SequencedTaskRunner>& main_task_runner, |
| base::WeakPtr<Coordinator> coordinator) |
| : stream_(std::move(stream)), |
| agent_label_(agent_label), |
| main_task_runner_(main_task_runner), |
| coordinator_(coordinator), |
| metadata_(new base::DictionaryValue()), |
| stream_is_empty_(true), |
| json_field_name_written_(false) {} |
| |
| // Destroyed on |backend_task_runner_|. |
| ~TraceStreamer() = default; |
| |
| // Called from |backend_task_runner_|. |
| void CreateAndSendRecorder( |
| const std::string& label, |
| mojom::TraceDataType type, |
| base::WeakPtr<AgentRegistry::AgentEntry> agent_entry) { |
| mojom::RecorderPtr ptr; |
| mojom::RecorderRequest request = MakeRequest(&ptr); |
| if (processed_labels_.count(label) == 0) { |
| auto recorder = std::make_unique<Recorder>( |
| std::move(request), type, |
| base::BindRepeating(&Coordinator::TraceStreamer::OnRecorderDataChange, |
| AsWeakPtr(), label)); |
| recorders_[label].insert(std::move(recorder)); |
| DCHECK(type != mojom::TraceDataType::STRING || |
| recorders_[label].size() == 1); |
| } |
| |
| // Tracing agent proxies are bound on the main thread and should be called |
| // from the main thread. |
| main_task_runner_->PostTask( |
| FROM_HERE, base::BindOnce(&Coordinator::SendRecorder, coordinator_, |
| agent_entry, std::move(ptr))); |
| } |
| |
| // Called from |main_task_runner_| either after flushing is complete or at |
| // shutdown. We either will not write to the stream afterwards or do not care |
| // what happens to what we try to write. |
| void CloseStream() { |
| DCHECK(stream_.is_valid()); |
| stream_.reset(); |
| } |
| |
| // Called from |main_task_runner_| after flushing is completed. So we are sure |
| // there is no race in accessing metadata_. |
| std::unique_ptr<base::DictionaryValue> GetMetadata() { |
| return std::move(metadata_); |
| } |
| |
| private: |
| // Handles synchronize writes to |stream_|, if the stream is not already |
| // closed. |
| void WriteToStream(const std::string& data) { |
| if (stream_.is_valid()) |
| mojo::BlockingCopyFromString(data, stream_); |
| } |
| |
| // Called from |backend_task_runner_|. |
| void OnRecorderDataChange(const std::string& label) { |
| // Bail out if we are in the middle of writing events for another label to |
| // the stream, since we do not want to interleave chunks for different |
| // fields. For example, we do not want to mix |traceEvent| chunks with |
| // |systrace| chunks. |
| // |
| // If we receive a |systemTraceEvents| chunk from an agent while writing |
| // |traceEvent| chunks to the stream, we wait until all agents that send |
| // |traceEvent| chunks are done, and then, we start writing |
| // |systemTraceEvents| chunks. |
| if (!streaming_label_.empty() && streaming_label_ != label) |
| return; |
| |
| while (streaming_label_.empty() || !StreamEventsForCurrentLabel()) { |
| // We are not waiting for data from any particular label now. So, we look |
| // at the recorders that have some data available and select the next |
| // label to stream. |
| if (!streaming_label_.empty()) { |
| processed_labels_.insert(streaming_label_); |
| streaming_label_.clear(); |
| } |
| bool all_finished = true; |
| for (const auto& key_value : recorders_) { |
| for (const auto& recorder : key_value.second) { |
| all_finished &= !recorder->is_recording(); |
| if (!recorder->data().empty()) { |
| streaming_label_ = key_value.first; |
| json_field_name_written_ = false; |
| break; |
| } |
| } |
| if (!streaming_label_.empty()) |
| break; |
| } |
| |
| if (streaming_label_.empty()) { |
| // No recorder has any data for us, right now. |
| if (all_finished) { |
| StreamMetadata(); |
| if (!stream_is_empty_ && agent_label_.empty()) { |
| WriteToStream("}"); |
| stream_is_empty_ = false; |
| } |
| // Recorder connections should be closed on their binding thread. |
| main_task_runner_->PostTask( |
| FROM_HERE, |
| base::BindOnce(&Coordinator::OnFlushDone, coordinator_)); |
| } |
| return; |
| } |
| } |
| } |
| |
| // Called from |backend_task_runner_|. |
| bool StreamEventsForCurrentLabel() { |
| bool waiting_for_agents = false; |
| mojom::TraceDataType data_type = |
| (*recorders_[streaming_label_].begin())->data_type(); |
| for (const auto& recorder : recorders_[streaming_label_]) { |
| waiting_for_agents |= recorder->is_recording(); |
| if (!agent_label_.empty() && streaming_label_ != agent_label_) |
| recorder->clear_data(); |
| if (recorder->data().empty()) |
| continue; |
| |
| std::string prefix; |
| if (!json_field_name_written_ && agent_label_.empty()) { |
| prefix = (stream_is_empty_ ? "{\"" : ",\"") + streaming_label_ + "\":"; |
| switch (data_type) { |
| case mojom::TraceDataType::ARRAY: |
| prefix += "["; |
| break; |
| case mojom::TraceDataType::OBJECT: |
| prefix += "{"; |
| break; |
| case mojom::TraceDataType::STRING: |
| prefix += "\""; |
| break; |
| default: |
| NOTREACHED(); |
| } |
| json_field_name_written_ = true; |
| } |
| if (data_type == mojom::TraceDataType::STRING) { |
| // Escape characters if needed for string data. |
| std::string escaped; |
| base::EscapeJSONString(recorder->data(), false /* put_in_quotes */, |
| &escaped); |
| WriteToStream(prefix + escaped); |
| } else { |
| if (prefix.empty() && !stream_is_empty_) |
| prefix = ","; |
| WriteToStream(prefix + recorder->data()); |
| } |
| stream_is_empty_ = false; |
| recorder->clear_data(); |
| } |
| if (!waiting_for_agents) { |
| if (json_field_name_written_) { |
| switch (data_type) { |
| case mojom::TraceDataType::ARRAY: |
| WriteToStream("]"); |
| break; |
| case mojom::TraceDataType::OBJECT: |
| WriteToStream("}"); |
| break; |
| case mojom::TraceDataType::STRING: |
| WriteToStream("\""); |
| break; |
| default: |
| NOTREACHED(); |
| } |
| stream_is_empty_ = false; |
| } |
| } |
| return waiting_for_agents; |
| } |
| |
| // Called from |backend_task_runner_|. |
| void StreamMetadata() { |
| if (!agent_label_.empty()) |
| return; |
| |
| for (const auto& key_value : recorders_) { |
| for (const auto& recorder : key_value.second) { |
| metadata_->MergeDictionary(&(recorder->metadata())); |
| } |
| } |
| |
| std::string metadataJSON; |
| if (!metadata_->empty() && |
| base::JSONWriter::Write(*metadata_, &metadataJSON)) { |
| std::string prefix = stream_is_empty_ ? "{\"" : ",\""; |
| WriteToStream(prefix + std::string(kMetadataTraceLabel) + |
| "\":" + metadataJSON); |
| stream_is_empty_ = false; |
| } |
| } |
| |
| // The stream to which trace events from different agents should be |
| // serialized, eventually. This is set when tracing is stopped. |
| mojo::ScopedDataPipeProducerHandle stream_; |
| std::string agent_label_; |
| scoped_refptr<base::SequencedTaskRunner> main_task_runner_; |
| base::WeakPtr<Coordinator> coordinator_; |
| |
| std::map<std::string, std::set<std::unique_ptr<Recorder>>> recorders_; |
| std::set<std::string> processed_labels_; |
| |
| // If |streaming_label_| is not empty, it shows the label for which we are |
| // writing chunks to the output stream. |
| std::string streaming_label_; |
| std::unique_ptr<base::DictionaryValue> metadata_; |
| bool stream_is_empty_; |
| bool json_field_name_written_; |
| |
| DISALLOW_COPY_AND_ASSIGN(TraceStreamer); |
| }; |
| |
| Coordinator::Coordinator(AgentRegistry* agent_registry, |
| const base::RepeatingClosure& on_disconnect_callback) |
| : on_disconnect_callback_(std::move(on_disconnect_callback)), |
| binding_(this), |
| task_runner_(base::ThreadTaskRunnerHandle::Get()), |
| // USER_VISIBLE because the task posted from StopAndFlushInternal() is |
| // required to stop tracing from the UI. |
| // TODO(fdoray): Once we have support for dynamic priorities |
| // (https://crbug.com/889029), use BEST_EFFORT initially and increase the |
| // priority only when blocking the tracing UI. |
| backend_task_runner_(base::CreateSequencedTaskRunnerWithTraits( |
| {base::TaskPriority::USER_VISIBLE, base::MayBlock(), |
| base::WithBaseSyncPrimitives()})), |
| agent_registry_(agent_registry), |
| weak_ptr_factory_(this) { |
| DCHECK(agent_registry_); |
| } |
| |
| Coordinator::~Coordinator() { |
| Reset(); |
| } |
| |
| bool Coordinator::IsConnected() { |
| return !!binding_; |
| } |
| |
| void Coordinator::Reset() { |
| if (!stop_and_flush_callback_.is_null()) { |
| base::ResetAndReturn(&stop_and_flush_callback_) |
| .Run(base::Value(base::Value::Type::DICTIONARY)); |
| } |
| if (!request_buffer_usage_callback_.is_null()) |
| base::ResetAndReturn(&request_buffer_usage_callback_).Run(false, 0, 0); |
| |
| if (trace_streamer_) { |
| // We are in the middle of flushing trace data. We need to |
| // 1- Close the stream so that the TraceStreamer does not block on writing |
| // to it. |
| // 2- Delete the TraceStreamer on the backend task runner; it owns recorders |
| // that should be destructed on the backend task runner because they are |
| // bound on the backend task runner. |
| trace_streamer_->CloseStream(); |
| backend_task_runner_->DeleteSoon(FROM_HERE, trace_streamer_.release()); |
| } |
| } |
| |
| void Coordinator::OnClientConnectionError() { |
| Reset(); |
| binding_.Close(); |
| on_disconnect_callback_.Run(); |
| } |
| void Coordinator::BindCoordinatorRequest( |
| mojom::CoordinatorRequest request, |
| const service_manager::BindSourceInfo& source_info) { |
| binding_.Bind(std::move(request)); |
| binding_.set_connection_error_handler(base::BindRepeating( |
| &Coordinator::OnClientConnectionError, base::Unretained(this))); |
| } |
| |
| void Coordinator::StartTracing(const std::string& config) { |
| if ((is_tracing_ && config == config_)) { |
| return; |
| } |
| |
| is_tracing_ = true; |
| config_ = config; |
| parsed_config_ = base::trace_event::TraceConfig(config); |
| agent_registry_->SetAgentInitializationCallback( |
| base::BindRepeating(&Coordinator::SendStartTracingToAgent, |
| weak_ptr_factory_.GetWeakPtr()), |
| false /* call_on_new_agents_only */); |
| } |
| |
| void Coordinator::SendStartTracingToAgent( |
| AgentRegistry::AgentEntry* agent_entry) { |
| if (!parsed_config_.process_filter_config().IsEnabled(agent_entry->pid())) |
| return; |
| agent_entry->agent()->StartTracing(config_, TRACE_TIME_TICKS_NOW()); |
| } |
| |
| void Coordinator::StopAndFlush(mojo::ScopedDataPipeProducerHandle stream, |
| StopAndFlushCallback callback) { |
| StopAndFlushAgent(std::move(stream), "", std::move(callback)); |
| } |
| |
| void Coordinator::StopAndFlushAgent(mojo::ScopedDataPipeProducerHandle stream, |
| const std::string& agent_label, |
| StopAndFlushCallback callback) { |
| if (!is_tracing_) { |
| stream.reset(); |
| std::move(callback).Run(base::Value(base::Value::Type::DICTIONARY)); |
| return; |
| } |
| DCHECK(!trace_streamer_); |
| DCHECK(stream.is_valid()); |
| is_tracing_ = false; |
| |
| trace_streamer_.reset(new Coordinator::TraceStreamer( |
| std::move(stream), agent_label, task_runner_, |
| weak_ptr_factory_.GetWeakPtr())); |
| stop_and_flush_callback_ = std::move(callback); |
| StopAndFlushInternal(); |
| } |
| |
| void Coordinator::StopAndFlushInternal() { |
| size_t num_initialized_agents = |
| agent_registry_->SetAgentInitializationCallback( |
| base::BindRepeating(&Coordinator::SendStopTracingToAgent, |
| weak_ptr_factory_.GetWeakPtr()), |
| false /* call_on_new_agents_only */); |
| if (num_initialized_agents == 0) |
| OnFlushDone(); |
| } |
| |
| void Coordinator::SendStopTracingToAgent( |
| AgentRegistry::AgentEntry* agent_entry) { |
| backend_task_runner_->PostTask( |
| FROM_HERE, |
| base::BindOnce(&Coordinator::TraceStreamer::CreateAndSendRecorder, |
| trace_streamer_->AsWeakPtr(), agent_entry->label(), |
| agent_entry->type(), agent_entry->AsWeakPtr())); |
| } |
| |
| void Coordinator::SendRecorder( |
| base::WeakPtr<AgentRegistry::AgentEntry> agent_entry, |
| mojom::RecorderPtr recorder) { |
| if (agent_entry) { |
| agent_entry->agent()->StopAndFlush(std::move(recorder)); |
| } else { |
| // Recorders are created and closed on |backend_task_runner_|. |
| backend_task_runner_->PostTask( |
| FROM_HERE, |
| base::BindOnce([](mojom::RecorderPtr ptr) {}, std::move(recorder))); |
| } |
| } |
| |
| void Coordinator::OnFlushDone() { |
| std::move(stop_and_flush_callback_) |
| .Run(std::move(*trace_streamer_->GetMetadata())); |
| backend_task_runner_->DeleteSoon(FROM_HERE, trace_streamer_.release()); |
| is_tracing_ = false; |
| agent_registry_->SetAgentInitializationCallback( |
| base::BindRepeating(&Coordinator::SendStopTracingWithNoOpRecorderToAgent, |
| weak_ptr_factory_.GetWeakPtr()), |
| true /* call_on_new_agents_only */); |
| } |
| |
| void Coordinator::SendStopTracingWithNoOpRecorderToAgent( |
| AgentRegistry::AgentEntry* agent_entry) { |
| mojom::RecorderPtr ptr; |
| // We need to create a message pipe and bind |ptr| to one end of it before |
| // sending |ptr| to the agent; otherwise, the agent will fail as soon as it |
| // tries to invoke a method on |ptr|. We do not have to bind the |
| // implementation end of the message pipe since we are going to ignore the |
| // messages. |
| MakeRequest(&ptr); |
| agent_entry->agent()->StopAndFlush(std::move(ptr)); |
| } |
| |
| void Coordinator::IsTracing(IsTracingCallback callback) { |
| std::move(callback).Run(is_tracing_); |
| } |
| |
| void Coordinator::RequestBufferUsage(RequestBufferUsageCallback callback) { |
| if (!request_buffer_usage_callback_.is_null()) { |
| std::move(callback).Run(false, 0, 0); |
| return; |
| } |
| |
| maximum_trace_buffer_usage_ = 0; |
| approximate_event_count_ = 0; |
| |
| agent_registry_->ForAllAgents([this](AgentRegistry::AgentEntry* agent_entry) { |
| agent_entry->AddDisconnectClosure( |
| &kRequestBufferUsageClosureName, |
| base::BindOnce(&Coordinator::OnRequestBufferStatusResponse, |
| weak_ptr_factory_.GetWeakPtr(), |
| base::Unretained(agent_entry), 0 /* capacity */, |
| 0 /* count */)); |
| agent_entry->agent()->RequestBufferStatus(base::BindRepeating( |
| &Coordinator::OnRequestBufferStatusResponse, |
| weak_ptr_factory_.GetWeakPtr(), base::Unretained(agent_entry))); |
| }); |
| |
| if (!agent_registry_->HasDisconnectClosure(&kRequestBufferUsageClosureName)) { |
| std::move(callback).Run(true, 0.0f, 0); |
| return; |
| } |
| request_buffer_usage_callback_ = std::move(callback); |
| } |
| |
| void Coordinator::OnRequestBufferStatusResponse( |
| AgentRegistry::AgentEntry* agent_entry, |
| uint32_t capacity, |
| uint32_t count) { |
| bool removed = |
| agent_entry->RemoveDisconnectClosure(&kRequestBufferUsageClosureName); |
| DCHECK(removed); |
| |
| if (capacity > 0) { |
| float percent_full = |
| static_cast<float>(static_cast<double>(count) / capacity); |
| maximum_trace_buffer_usage_ = |
| std::max(maximum_trace_buffer_usage_, percent_full); |
| approximate_event_count_ += count; |
| } |
| |
| if (!agent_registry_->HasDisconnectClosure(&kRequestBufferUsageClosureName)) { |
| std::move(request_buffer_usage_callback_) |
| .Run(true, maximum_trace_buffer_usage_, approximate_event_count_); |
| } |
| } |
| |
| } // namespace tracing |