blob: 4d7323ee60514a8f741b7966adfaf1a07f04d8d0 [file] [log] [blame]
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/profiling/memlog_receiver_pipe_posix.h"
#include "base/bind.h"
#include "base/logging.h"
#include "base/posix/eintr_wrapper.h"
#include "base/threading/thread.h"
#include "chrome/profiling/memlog_stream_receiver.h"
#include "mojo/edk/embedder/platform_channel_utils_posix.h"
#include "mojo/edk/embedder/platform_handle.h"
namespace profiling {
namespace {
// Use a large buffer for our pipe. We don't want the sender to block
// if at all possible since it will slow the app down quite a bit.
//
// TODO(ajwong): Figure out how to size this. Currently the number is cribbed
// from the right size for windows named pipes.
const int kReadBufferSize = 1024 * 64;
} // namespace
MemlogReceiverPipe::MemlogReceiverPipe(base::ScopedPlatformFile file)
: handle_(mojo::edk::PlatformHandle(file.release())),
controller_(FROM_HERE),
read_buffer_(new char[kReadBufferSize]) {
base::MessageLoopForIO::current()->WatchFileDescriptor(
handle_.get().handle, true, base::MessageLoopForIO::WATCH_READ,
&controller_, this);
}
MemlogReceiverPipe::~MemlogReceiverPipe() {}
void MemlogReceiverPipe::StartReadingOnIOThread() {
OnFileCanReadWithoutBlocking(handle_.get().handle);
}
void MemlogReceiverPipe::SetReceiver(
scoped_refptr<base::TaskRunner> task_runner,
scoped_refptr<MemlogStreamReceiver> receiver) {
receiver_task_runner_ = task_runner;
receiver_ = receiver;
}
void MemlogReceiverPipe::OnFileCanReadWithoutBlocking(int fd) {
ssize_t bytes_read = 0;
do {
base::circular_deque<mojo::edk::PlatformHandle> dummy_for_receive;
bytes_read = mojo::edk::PlatformChannelRecvmsg(
handle_.get(), read_buffer_.get(), kReadBufferSize, &dummy_for_receive);
if (bytes_read > 0) {
receiver_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&MemlogStreamReceiver::OnStreamData, receiver_,
std::move(read_buffer_), bytes_read));
read_buffer_.reset(new char[kReadBufferSize]);
return;
} else if (bytes_read == 0) {
// Other end closed the pipe.
if (receiver_) {
controller_.StopWatchingFileDescriptor();
DCHECK(receiver_task_runner_);
receiver_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&MemlogStreamReceiver::OnStreamComplete, receiver_));
}
return;
} else {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
controller_.StopWatchingFileDescriptor();
PLOG(ERROR) << "Problem reading socket.";
}
}
} while (bytes_read > 0);
}
void MemlogReceiverPipe::OnFileCanWriteWithoutBlocking(int fd) {
NOTREACHED();
}
} // namespace profiling