blob: eb7fa353193b0c7a05201148840578d238958ffd [file] [log] [blame]
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/spy/spy.h"
#include <vector>
#include "base/bind.h"
#include "base/compiler_specific.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/memory/ref_counted.h"
#include "base/message_loop/message_loop_proxy.h"
#include "base/strings/string_number_conversions.h"
#include "base/strings/string_split.h"
#include "base/threading/thread.h"
#include "base/threading/worker_pool.h"
#include "base/time/time.h"
#include "mojo/application_manager/application_manager.h"
#include "mojo/public/cpp/system/core.h"
#include "mojo/spy/common.h"
#include "mojo/spy/public/spy.mojom.h"
#include "mojo/spy/spy_server_impl.h"
#include "mojo/spy/websocket_server.h"
#include "url/gurl.h"
namespace {
mojo::WebSocketServer* ws_server = NULL;
const size_t kMessageBufSize = 2 * 1024;
const size_t kHandleBufSize = 64;
const int kDefaultWebSocketPort = 42424;
void CloseHandles(MojoHandle* handles, size_t count) {
for (size_t ix = 0; ix != count; ++count)
MojoClose(handles[ix]);
}
// In charge of processing messages that flow over a
// single message pipe.
class MessageProcessor :
public base::RefCountedThreadSafe<MessageProcessor> {
public:
MessageProcessor(base::MessageLoopProxy* control_loop_proxy)
: last_result_(MOJO_RESULT_OK),
bytes_transfered_(0),
control_loop_proxy_(control_loop_proxy) {
message_count_[0] = 0;
message_count_[1] = 0;
handle_count_[0] = 0;
handle_count_[1] = 0;
}
void Start(mojo::ScopedMessagePipeHandle client,
mojo::ScopedMessagePipeHandle interceptor,
const GURL& url) {
std::vector<mojo::MessagePipeHandle> pipes;
pipes.push_back(client.get());
pipes.push_back(interceptor.get());
std::vector<MojoHandleSignals> handle_signals;
handle_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE);
handle_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE);
scoped_ptr<char[]> mbuf(new char[kMessageBufSize]);
scoped_ptr<MojoHandle[]> hbuf(new MojoHandle[kHandleBufSize]);
// Main processing loop:
// 1- Wait for an endpoint to have a message.
// 2- Read the message
// 3- Log data
// 4- Wait until the opposite port is ready for writting
// 4- Write the message to opposite port.
for (;;) {
int r = WaitMany(pipes, handle_signals, MOJO_DEADLINE_INDEFINITE);
if ((r < 0) || (r > 1)) {
last_result_ = r;
break;
}
uint32_t bytes_read = kMessageBufSize;
uint32_t handles_read = kHandleBufSize;
if (!CheckResult(ReadMessageRaw(pipes[r],
mbuf.get(), &bytes_read,
hbuf.get(), &handles_read,
MOJO_READ_MESSAGE_FLAG_NONE)))
break;
if (!bytes_read && !handles_read)
continue;
if (handles_read) {
handle_count_[r] += handles_read;
// Intercept message pipes which are returned via the ReadMessageRaw
// call
for (uint32_t i = 0; i < handles_read; i++) {
// Hack to determine if a handle is a message pipe.
// TODO(ananta)
// We should have an API which given a handle returns additional
// information about the handle which includes its type, etc.
if (MojoReadMessage(hbuf[i], NULL, NULL, NULL, NULL,
MOJO_READ_MESSAGE_FLAG_NONE) !=
MOJO_RESULT_INVALID_ARGUMENT) {
mojo::ScopedMessagePipeHandle message_pipe_handle;
message_pipe_handle.reset(mojo::MessagePipeHandle(hbuf[i]));
mojo::ScopedMessagePipeHandle faux_client;
mojo::ScopedMessagePipeHandle interceptor;
CreateMessagePipe(NULL, &faux_client, &interceptor);
base::WorkerPool::PostTask(
FROM_HERE,
base::Bind(&MessageProcessor::Start,
this,
base::Passed(&message_pipe_handle),
base::Passed(&interceptor),
url),
true);
hbuf.get()[i] = faux_client.release().value();
}
}
}
++message_count_[r];
bytes_transfered_ += bytes_read;
LogMessageInfo(mbuf.get(), url);
mojo::MessagePipeHandle write_handle = (r == 0) ? pipes[1] : pipes[0];
if (!CheckResult(Wait(write_handle,
MOJO_HANDLE_SIGNAL_WRITABLE,
MOJO_DEADLINE_INDEFINITE)))
break;
if (!CheckResult(WriteMessageRaw(write_handle,
mbuf.get(), bytes_read,
hbuf.get(), handles_read,
MOJO_WRITE_MESSAGE_FLAG_NONE))) {
// On failure we own the handles. For now just close them.
if (handles_read)
CloseHandles(hbuf.get(), handles_read);
break;
}
}
}
private:
friend class base::RefCountedThreadSafe<MessageProcessor>;
virtual ~MessageProcessor() {}
bool CheckResult(MojoResult mr) {
if (mr == MOJO_RESULT_OK)
return true;
last_result_ = mr;
return false;
}
void LogInvalidMessage(const mojo::MojoMessageHeader& header) {
LOG(ERROR) << "Invalid message: Number of Fields: "
<< header.num_fields
<< " Number of bytes: "
<< header.num_bytes
<< " Flags: "
<< header.flags;
}
// Validates the message as per the mojo spec.
bool IsValidMessage(const mojo::MojoMessageHeader& header) {
if (header.num_fields == 2) {
if (header.num_bytes != sizeof(mojo::MojoMessageHeader)) {
LogInvalidMessage(header);
return false;
}
} else if (header.num_fields == 3) {
if (header.num_bytes != sizeof(mojo::MojoRequestHeader)) {
LogInvalidMessage(header);
}
} else if (header.num_fields > 3) {
if (header.num_bytes < sizeof(mojo::MojoRequestHeader)) {
LogInvalidMessage(header);
return false;
}
}
// These flags should be specified in request or response messages.
if (header.num_fields < 3 &&
((header.flags & mojo::kMessageExpectsResponse) ||
(header.flags & mojo::kMessageIsResponse))) {
LOG(ERROR) << "Invalid request message.";
LogInvalidMessage(header);
return false;
}
// These flags are mutually exclusive.
if ((header.flags & mojo::kMessageExpectsResponse) &&
(header.flags & mojo::kMessageIsResponse)) {
LOG(ERROR) << "Invalid flags combination in request message.";
LogInvalidMessage(header);
return false;
}
return true;
}
void LogMessageInfo(void* data, const GURL& url) {
mojo::MojoMessageData* message_data =
reinterpret_cast<mojo::MojoMessageData*>(data);
if (IsValidMessage(message_data->header)) {
control_loop_proxy_->PostTask(
FROM_HERE,
base::Bind(&mojo::WebSocketServer::LogMessageInfo,
base::Unretained(ws_server),
message_data->header, url, base::Time::Now()));
}
}
MojoResult last_result_;
uint32_t bytes_transfered_;
uint32_t message_count_[2];
uint32_t handle_count_[2];
scoped_refptr<base::MessageLoopProxy> control_loop_proxy_;
};
// In charge of intercepting access to the service manager.
class SpyInterceptor : public mojo::ApplicationManager::Interceptor {
public:
explicit SpyInterceptor(scoped_refptr<mojo::SpyServerImpl> spy_server,
base::MessageLoopProxy* control_loop_proxy)
: spy_server_(spy_server),
proxy_(base::MessageLoopProxy::current()),
control_loop_proxy_(control_loop_proxy){
}
private:
virtual mojo::ServiceProviderPtr OnConnectToClient(
const GURL& url, mojo::ServiceProviderPtr real_client) OVERRIDE {
if (!MustIntercept(url))
return real_client.Pass();
// You can get an invalid handle if the app (or service) is
// created by unconventional means, for example the command line.
if (!real_client)
return real_client.Pass();
mojo::ScopedMessagePipeHandle faux_client;
mojo::ScopedMessagePipeHandle interceptor;
CreateMessagePipe(NULL, &faux_client, &interceptor);
scoped_refptr<MessageProcessor> processor =
new MessageProcessor(control_loop_proxy_.get());
mojo::ScopedMessagePipeHandle real_handle = real_client.PassMessagePipe();
base::WorkerPool::PostTask(
FROM_HERE,
base::Bind(&MessageProcessor::Start,
processor,
base::Passed(&real_handle), base::Passed(&interceptor),
url),
true);
mojo::ServiceProviderPtr faux_provider;
faux_provider.Bind(faux_client.Pass());
return faux_provider.Pass();
}
bool MustIntercept(const GURL& url) {
// TODO(cpu): manage who and when to intercept.
proxy_->PostTask(
FROM_HERE,
base::Bind(&mojo::SpyServerImpl::OnIntercept, spy_server_, url));
return true;
}
scoped_refptr<mojo::SpyServerImpl> spy_server_;
scoped_refptr<base::MessageLoopProxy> proxy_;
scoped_refptr<base::MessageLoopProxy> control_loop_proxy_;
};
void StartWebServer(int port, mojo::ScopedMessagePipeHandle pipe) {
// TODO(cpu) figure out lifetime of the server. See Spy() dtor.
ws_server = new mojo::WebSocketServer(port, pipe.Pass());
ws_server->Start();
}
struct SpyOptions {
int websocket_port;
SpyOptions()
: websocket_port(kDefaultWebSocketPort) {
}
};
SpyOptions ProcessOptions(const std::string& options) {
SpyOptions spy_options;
if (options.empty())
return spy_options;
base::StringPairs kv_pairs;
base::SplitStringIntoKeyValuePairs(options, ':', ',', &kv_pairs);
base::StringPairs::iterator it = kv_pairs.begin();
for (; it != kv_pairs.end(); ++it) {
if (it->first == "port") {
int port;
if (base::StringToInt(it->second, &port))
spy_options.websocket_port = port;
}
}
return spy_options;
}
} // namespace
namespace mojo {
Spy::Spy(mojo::ApplicationManager* application_manager,
const std::string& options) {
SpyOptions spy_options = ProcessOptions(options);
spy_server_ = new SpyServerImpl();
// Start the tread what will accept commands from the frontend.
control_thread_.reset(new base::Thread("mojo_spy_control_thread"));
base::Thread::Options thread_options(base::MessageLoop::TYPE_IO, 0);
control_thread_->StartWithOptions(thread_options);
control_thread_->message_loop_proxy()->PostTask(
FROM_HERE, base::Bind(&StartWebServer,
spy_options.websocket_port,
base::Passed(spy_server_->ServerPipe())));
// Start intercepting mojo services.
application_manager->SetInterceptor(
new SpyInterceptor(spy_server_, control_thread_->message_loop_proxy()));
}
Spy::~Spy() {
// TODO(cpu): Do not leak the interceptor. Lifetime between the
// application_manager and the spy is still unclear hence the leak.
}
} // namespace mojo