| // Copyright 2013 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "mojo/system/local_message_pipe_endpoint.h" |
| |
| #include <string.h> |
| |
| #include "base/logging.h" |
| #include "mojo/system/dispatcher.h" |
| #include "mojo/system/message_in_transit.h" |
| |
| namespace mojo { |
| namespace system { |
| |
| LocalMessagePipeEndpoint::LocalMessagePipeEndpoint() |
| : is_open_(true), |
| is_peer_open_(true) { |
| } |
| |
| LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() { |
| DCHECK(!is_open_); |
| DCHECK(message_queue_.IsEmpty()); // Should be implied by not being open. |
| } |
| |
| MessagePipeEndpoint::Type LocalMessagePipeEndpoint::GetType() const { |
| return kTypeLocal; |
| } |
| |
| bool LocalMessagePipeEndpoint::OnPeerClose() { |
| DCHECK(is_open_); |
| DCHECK(is_peer_open_); |
| |
| HandleSignalsState old_state = GetHandleSignalsState(); |
| is_peer_open_ = false; |
| HandleSignalsState new_state = GetHandleSignalsState(); |
| |
| if (!new_state.equals(old_state)) |
| waiter_list_.AwakeWaitersForStateChange(new_state); |
| |
| return true; |
| } |
| |
| void LocalMessagePipeEndpoint::EnqueueMessage( |
| scoped_ptr<MessageInTransit> message) { |
| DCHECK(is_open_); |
| DCHECK(is_peer_open_); |
| |
| bool was_empty = message_queue_.IsEmpty(); |
| message_queue_.AddMessage(message.Pass()); |
| if (was_empty) |
| waiter_list_.AwakeWaitersForStateChange(GetHandleSignalsState()); |
| } |
| |
| void LocalMessagePipeEndpoint::Close() { |
| DCHECK(is_open_); |
| is_open_ = false; |
| message_queue_.Clear(); |
| } |
| |
| void LocalMessagePipeEndpoint::CancelAllWaiters() { |
| DCHECK(is_open_); |
| waiter_list_.CancelAllWaiters(); |
| } |
| |
| MojoResult LocalMessagePipeEndpoint::ReadMessage(void* bytes, |
| uint32_t* num_bytes, |
| DispatcherVector* dispatchers, |
| uint32_t* num_dispatchers, |
| MojoReadMessageFlags flags) { |
| DCHECK(is_open_); |
| DCHECK(!dispatchers || dispatchers->empty()); |
| |
| const uint32_t max_bytes = num_bytes ? *num_bytes : 0; |
| const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; |
| |
| if (message_queue_.IsEmpty()) { |
| return is_peer_open_ ? MOJO_RESULT_SHOULD_WAIT : |
| MOJO_RESULT_FAILED_PRECONDITION; |
| } |
| |
| // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop |
| // and release the lock immediately. |
| bool enough_space = true; |
| MessageInTransit* message = message_queue_.PeekMessage(); |
| if (num_bytes) |
| *num_bytes = message->num_bytes(); |
| if (message->num_bytes() <= max_bytes) |
| memcpy(bytes, message->bytes(), message->num_bytes()); |
| else |
| enough_space = false; |
| |
| if (DispatcherVector* queued_dispatchers = message->dispatchers()) { |
| if (num_dispatchers) |
| *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size()); |
| if (enough_space) { |
| if (queued_dispatchers->empty()) { |
| // Nothing to do. |
| } else if (queued_dispatchers->size() <= max_num_dispatchers) { |
| DCHECK(dispatchers); |
| dispatchers->swap(*queued_dispatchers); |
| } else { |
| enough_space = false; |
| } |
| } |
| } else { |
| if (num_dispatchers) |
| *num_dispatchers = 0; |
| } |
| |
| message = NULL; |
| |
| if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { |
| message_queue_.DiscardMessage(); |
| |
| // Now it's empty, thus no longer readable. |
| if (message_queue_.IsEmpty()) { |
| // It's currently not possible to wait for non-readability, but we should |
| // do the state change anyway. |
| waiter_list_.AwakeWaitersForStateChange(GetHandleSignalsState()); |
| } |
| } |
| |
| if (!enough_space) |
| return MOJO_RESULT_RESOURCE_EXHAUSTED; |
| |
| return MOJO_RESULT_OK; |
| } |
| |
| MojoResult LocalMessagePipeEndpoint::AddWaiter(Waiter* waiter, |
| MojoHandleSignals signals, |
| uint32_t context) { |
| DCHECK(is_open_); |
| |
| HandleSignalsState state = GetHandleSignalsState(); |
| if (state.satisfies(signals)) |
| return MOJO_RESULT_ALREADY_EXISTS; |
| if (!state.can_satisfy(signals)) |
| return MOJO_RESULT_FAILED_PRECONDITION; |
| |
| waiter_list_.AddWaiter(waiter, signals, context); |
| return MOJO_RESULT_OK; |
| } |
| |
| void LocalMessagePipeEndpoint::RemoveWaiter(Waiter* waiter) { |
| DCHECK(is_open_); |
| waiter_list_.RemoveWaiter(waiter); |
| } |
| |
| HandleSignalsState LocalMessagePipeEndpoint::GetHandleSignalsState() { |
| HandleSignalsState rv; |
| if (!message_queue_.IsEmpty()) { |
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| } |
| if (is_peer_open_) { |
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| rv.satisfiable_signals |= |
| MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE; |
| } |
| return rv; |
| } |
| |
| } // namespace system |
| } // namespace mojo |