| /* |
| * Copyright (C) 2022 Apple Inc. All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions |
| * are met: |
| * 1. Redistributions of source code must retain the above copyright |
| * notice, this list of conditions and the following disclaimer. |
| * 2. Redistributions in binary form must reproduce the above copyright |
| * notice, this list of conditions and the following disclaimer in the |
| * documentation and/or other materials provided with the distribution. |
| * |
| * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' |
| * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, |
| * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
| * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS |
| * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF |
| * THE POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| #include "config.h" |
| |
| #include "IPCTestUtilities.h" |
| #include "Test.h" |
| #include <wtf/threads/BinarySemaphore.h> |
| |
| |
| namespace TestWebKitAPI { |
| |
| static constexpr Seconds kDefaultWaitForTimeout = 1_s; |
| |
| static constexpr Seconds kWaitForAbsenceTimeout = 300_ms; |
| |
| struct MockTestMessageWithConnection { |
| static constexpr bool isSync = false; |
| static constexpr bool canDispatchOutOfOrder = false; |
| static constexpr bool replyCanDispatchOutOfOrder = false; |
| static constexpr IPC::MessageName name() { return IPC::MessageName::IPCTester_EmptyMessage; } |
| MockTestMessageWithConnection(IPC::Connection::Handle&& handle) |
| : m_handle(WTF::move(handle)) |
| { |
| } |
| |
| template<typename Encoder> |
| void encode(Encoder& encoder) |
| { |
| encoder << WTF::move(m_handle); |
| } |
| |
| private: |
| IPC::Connection::Handle&& m_handle; |
| }; |
| |
| struct MockTestSyncMessage { |
| static constexpr bool isSync = true; |
| static constexpr bool canDispatchOutOfOrder = false; |
| static constexpr bool replyCanDispatchOutOfOrder = false; |
| static constexpr IPC::MessageName name() { return IPC::MessageName::IPCTester_SyncPing; } |
| using ReplyArguments = std::tuple<>; |
| |
| template<typename Encoder> void encode(Encoder&) { } |
| }; |
| |
| struct MockTestSyncMessageWithDataReply { |
| static constexpr bool isSync = true; |
| static constexpr bool canDispatchOutOfOrder = false; |
| static constexpr bool replyCanDispatchOutOfOrder = false; |
| static constexpr IPC::MessageName name() { return IPC::MessageName::IPCTester_SyncPing; } // Needs to be sync. |
| using ReplyArguments = std::tuple<std::span<const uint8_t>>; |
| |
| template<typename Encoder> void encode(Encoder&) { } |
| }; |
| |
| namespace { |
| class SimpleConnectionTest : public testing::Test { |
| public: |
| SimpleConnectionTest() |
| : m_mockServerClient(MockConnectionClient::create()) |
| , m_mockClientClient(MockConnectionClient::create()) |
| { |
| } |
| |
| void SetUp() override |
| { |
| WTF::initializeMainThread(); |
| } |
| protected: |
| Ref<MockConnectionClient> m_mockServerClient; |
| Ref<MockConnectionClient> m_mockClientClient; |
| }; |
| } |
| |
| TEST_F(SimpleConnectionTest, CreateServerConnection) |
| { |
| auto identifiers = IPC::Connection::createConnectionIdentifierPair(); |
| ASSERT_NE(identifiers, std::nullopt); |
| Ref<IPC::Connection> connection = IPC::Connection::createServerConnection(WTF::move(identifiers->server)); |
| connection->invalidate(); |
| } |
| |
| TEST_F(SimpleConnectionTest, CreateClientConnection) |
| { |
| auto identifiers = IPC::Connection::createConnectionIdentifierPair(); |
| ASSERT_NE(identifiers, std::nullopt); |
| Ref<IPC::Connection> connection = IPC::Connection::createClientConnection(IPC::Connection::Identifier { WTF::move(identifiers->client) }); |
| connection->invalidate(); |
| } |
| |
| TEST_F(SimpleConnectionTest, ConnectLocalConnection) |
| { |
| auto identifiers = IPC::Connection::createConnectionIdentifierPair(); |
| ASSERT_NE(identifiers, std::nullopt); |
| Ref<IPC::Connection> serverConnection = IPC::Connection::createServerConnection(WTF::move(identifiers->server)); |
| Ref<IPC::Connection> clientConnection = IPC::Connection::createClientConnection(IPC::Connection::Identifier { WTF::move(identifiers->client) }); |
| serverConnection->open(m_mockServerClient); |
| clientConnection->open(m_mockClientClient); |
| serverConnection->invalidate(); |
| clientConnection->invalidate(); |
| } |
| |
| TEST_F(SimpleConnectionTest, ClearOutgoingMessages) |
| { |
| // Create a connection, but leave the client |
| // handle pending. |
| auto firstIdentifiers = IPC::Connection::createConnectionIdentifierPair(); |
| ASSERT_NE(firstIdentifiers, std::nullopt); |
| Ref<IPC::Connection> firstServerConnection = IPC::Connection::createServerConnection(WTF::move(firstIdentifiers->server)); |
| firstServerConnection->open(m_mockServerClient); |
| |
| // Create a second connection, and send the client |
| // handle in a message over the first connection (such |
| // that it will be stored as a pending message). |
| auto secondIdentifiers = IPC::Connection::createConnectionIdentifierPair(); |
| ASSERT_NE(secondIdentifiers, std::nullopt); |
| Ref<IPC::Connection> secondServerConnection = IPC::Connection::createServerConnection(WTF::move(secondIdentifiers->server)); |
| Ref mockSecondServerClient = MockConnectionClient::create(); |
| secondServerConnection->open(mockSecondServerClient); |
| |
| firstServerConnection->send(MockTestMessageWithConnection { WTF::move(secondIdentifiers->client) }, 0); |
| |
| // Invalidate the first connection's client handle, |
| // which should clear pending messages and also invalidate |
| // the second connection. |
| firstIdentifiers->client = IPC::Connection::Handle(); |
| |
| // Try a sync send over the second connection, which should |
| // fail immediately if the client handle has been released. |
| secondServerConnection->sendSync(MockTestSyncMessage(), 0, IPC::Timeout::infinity(), IPC::SendSyncOption::UseFullySynchronousModeForTesting); |
| firstServerConnection->invalidate(); |
| secondServerConnection->invalidate(); |
| } |
| |
| class ConnectionTest : public testing::Test, protected ConnectionTestBase { |
| public: |
| void SetUp() override |
| { |
| setupBase(); |
| } |
| void TearDown() override |
| { |
| teardownBase(); |
| } |
| auto openServer() { return openA(); } |
| auto openClient() { return openB(); } |
| auto* server() { return a(); } |
| auto* client() { return b(); } |
| auto& serverClient() { return aClient(); } |
| auto& clientClient() { return bClient(); } |
| void deleteServer() { deleteA(); } |
| void deleteClient() { deleteB(); } |
| }; |
| |
| // Explicit version of AInvalidateDeliversBDidClose that was flaky on Cocoa in scenario to |
| // 1. Both connections open |
| // 2. Client sends the initialization message with the mach port to use as server's send port |
| // 3. Client is cancelled and the mach port destroyed |
| // 4. Server receives the initialization message |
| TEST_F(ConnectionTest, ClientInvalidateBeforeServerHandlesInitializationDeliversDidClose) |
| { |
| ASSERT_TRUE(openServer()); |
| // Simulation for scheduling for step 4: insert a wait after receive source has been |
| // resumed. |
| BinarySemaphore semaphore; |
| bool captureGuard = false; |
| server()->dispatchOnReceiveQueueForTesting([&semaphore, &captureGuard] { |
| semaphore.wait(); |
| captureGuard = true; |
| }); |
| ASSERT_TRUE(openClient()); |
| Util::runFor(0.2_s); // Simulation for step 2. Give client time to send the initialization message. |
| client()->invalidate(); // Step 3. |
| semaphore.signal(); // Step 4. |
| ASSERT_FALSE(serverClient().gotDidClose()); |
| |
| // Test for the contract that did not work: invalidated on client causes didClose on server. |
| EXPECT_TRUE(serverClient().waitForDidClose(kDefaultWaitForTimeout)); |
| |
| // End of test. Ensure clean up for buggy cases. |
| EXPECT_FALSE(clientClient().gotDidClose()); |
| Util::run(&captureGuard); |
| } |
| |
| TEST_P(ConnectionTestABBA, SendLocalMessage) |
| { |
| ASSERT_TRUE(openBoth()); |
| |
| for (uint64_t i = 0u; i < 55u; ++i) |
| a()->send(MockTestMessage1 { }, i); |
| for (uint64_t i = 100u; i < 160u; ++i) |
| b()->send(MockTestMessage1 { }, i); |
| for (uint64_t i = 0u; i < 55u; ++i) { |
| auto message = bClient().waitForMessage(kDefaultWaitForTimeout); |
| EXPECT_EQ(message.messageName, MockTestMessage1::name()); |
| EXPECT_EQ(message.destinationID, i); |
| } |
| for (uint64_t i = 100u; i < 160u; ++i) { |
| auto message = aClient().waitForMessage(kDefaultWaitForTimeout); |
| EXPECT_EQ(message.messageName, MockTestMessage1::name()) << " i:" << i; |
| EXPECT_EQ(message.destinationID, i) << " i:" << i; |
| } |
| } |
| |
| TEST_P(ConnectionTestABBA, AInvalidateDeliversBDidClose) |
| { |
| ASSERT_TRUE(openBoth()); |
| a()->invalidate(); |
| ASSERT_FALSE(bClient().gotDidClose()); |
| EXPECT_TRUE(bClient().waitForDidClose(kDefaultWaitForTimeout)); |
| EXPECT_FALSE(aClient().gotDidClose()); |
| } |
| |
| TEST_P(ConnectionTestABBA, AAndBInvalidateDoesNotDeliverDidClose) |
| { |
| ASSERT_TRUE(openBoth()); |
| a()->invalidate(); |
| b()->invalidate(); |
| EXPECT_FALSE(aClient().waitForDidClose(kWaitForAbsenceTimeout)); |
| EXPECT_FALSE(bClient().waitForDidClose(kWaitForAbsenceTimeout)); |
| } |
| |
| TEST_P(ConnectionTestABBA, UnopenedAAndInvalidateDoesNotDeliverBDidClose) |
| { |
| ASSERT_TRUE(openB()); |
| a()->invalidate(); |
| deleteA(); |
| EXPECT_FALSE(bClient().waitForDidClose(kWaitForAbsenceTimeout)); |
| } |
| |
| TEST_P(ConnectionTestABBA, IncomingMessageThrottlingWorks) |
| { |
| const size_t testedCount = 2300; |
| a()->enableIncomingMessagesThrottling(); |
| ASSERT_TRUE(openBoth()); |
| size_t otherRunLoopTasksRun = 0u; |
| |
| for (uint64_t i = 0u; i < testedCount; ++i) |
| b()->send(MockTestMessage1 { }, i); |
| while (a()->pendingMessageCountForTesting() < testedCount) |
| sleep(0.1_s); |
| |
| Vector<MessageInfo> messages; |
| std::array<size_t, 18> messageCounts { 600, 300, 200, 150, 120, 100, 85, 75, 66, 60, 60, 66, 75, 85, 100, 120, 37, 1 }; |
| for (size_t i = 0; i < messageCounts.size(); ++i) { |
| SCOPED_TRACE(i); |
| RunLoop::currentSingleton().dispatch([&otherRunLoopTasksRun] { |
| otherRunLoopTasksRun++; |
| }); |
| Util::spinRunLoop(); |
| EXPECT_EQ(otherRunLoopTasksRun, i + 1u); |
| auto messages1 = aClient().takeMessages(); |
| EXPECT_EQ(messageCounts[i], messages1.size()); |
| messages.appendVector(WTF::move(messages1)); |
| } |
| EXPECT_EQ(testedCount, messages.size()); |
| for (uint64_t i = 0u; i < messages.size(); ++i) { |
| auto& message = messages[i]; |
| EXPECT_EQ(message.messageName, MockTestMessage1::name()); |
| EXPECT_EQ(message.destinationID, i); |
| } |
| } |
| |
| // Tests the case where a throttled connection dispatches a message that |
| // spins the run loop in the message handler. A naive throttled connection |
| // would only schedule one work dispatch function, which would then fail |
| // in this scenario. Thus test the non-naive implementation where the throttled |
| // connection schedules another dispatch function that ensures that nested |
| // runloops will dispatch the throttled connection messages. |
| TEST_P(ConnectionTestABBA, IncomingMessageThrottlingNestedRunLoopDispatches) |
| { |
| const size_t testedCount = 2300; |
| a()->enableIncomingMessagesThrottling(); |
| ASSERT_TRUE(openBoth()); |
| size_t otherRunLoopTasksRun = 0u; |
| |
| for (uint64_t i = 0u; i < testedCount; ++i) |
| b()->send(MockTestMessage1 { }, i); |
| while (a()->pendingMessageCountForTesting() < testedCount) |
| sleep(0.1_s); |
| |
| // Two messages invoke nested run loop. The handler skips total 4 messages for the |
| // proofs of logic that the test was ran. |
| bool isProcessing = false; |
| aClient().setAsyncMessageHandler([&] (IPC::Connection&, IPC::Decoder& decoder) -> bool { |
| auto destinationID = decoder.destinationID(); |
| if (destinationID == 888 || destinationID == 1299) { |
| isProcessing = true; |
| Util::spinRunLoop(); |
| isProcessing = false; |
| return true; // Skiping the message is the proof that the message was processed. |
| } |
| if (destinationID == 889 || destinationID == 1300) { |
| EXPECT_TRUE(isProcessing); // Passing the EXPECT is the proof that we ran the message in a nested event loop. |
| return true; // Skipping the message is the proof that above EXPECT was ran. |
| } |
| return false; |
| }); |
| |
| Vector<MessageInfo> messages; |
| std::array<size_t, 16> messageCounts { 600, 498, 150, 218, 85, 75, 66, 60, 60, 66, 75, 85, 100, 120, 37, 1 }; |
| for (size_t i = 0; i < messageCounts.size(); ++i) { |
| SCOPED_TRACE(i); |
| RunLoop::currentSingleton().dispatch([&otherRunLoopTasksRun] { |
| otherRunLoopTasksRun++; |
| }); |
| Util::spinRunLoop(); |
| EXPECT_EQ(otherRunLoopTasksRun, i + 1u); |
| auto messages1 = aClient().takeMessages(); |
| EXPECT_EQ(messageCounts[i], messages1.size()); |
| messages.appendVector(WTF::move(messages1)); |
| } |
| EXPECT_EQ(testedCount - 4, messages.size()); |
| for (uint64_t i = 0u, j = 0u; i < messages.size(); ++i, ++j) { |
| if (j == 888 || j == 1299) |
| j += 2; |
| auto& message = messages[i]; |
| EXPECT_EQ(message.messageName, MockTestMessage1::name()); |
| EXPECT_EQ(message.destinationID, j); |
| } |
| } |
| |
| // Sends a connection that is already closed (invalidated). We still expect to receive the connection |
| // and receive didClose on the connection. |
| TEST_P(ConnectionTestABBA, ReceiveAlreadyInvalidatedClientNoAssert) |
| { |
| ASSERT_TRUE(openBoth()); |
| constexpr size_t iterations = 800; |
| HashSet<uint64_t> done; |
| struct { |
| RefPtr<IPC::Connection> clientConnection; |
| Ref<MockConnectionClient> mockClientClient { MockConnectionClient::create() }; |
| } connections[iterations]; |
| |
| bClient().setAsyncMessageHandler([&] (IPC::Connection&, IPC::Decoder& decoder) -> bool { |
| auto i = decoder.destinationID(); |
| auto handle = decoder.decode<IPC::Connection::Handle>(); |
| if (!handle) |
| return false; |
| Ref<IPC::Connection> clientConnection = IPC::Connection::createClientConnection(IPC::Connection::Identifier { WTF::move(*handle) }); |
| clientConnection->open(connections[i].mockClientClient); |
| connections[i].clientConnection = WTF::move(clientConnection); |
| // The connection starts as not closed in order for the system to deliver didClose(). |
| EXPECT_FALSE(connections[i].mockClientClient->gotDidClose()) << i; |
| done.add(i); |
| return true; |
| }); |
| for (uint64_t i = 1; i < iterations; ++i) { |
| auto identifiers = IPC::Connection::createConnectionIdentifierPair(); |
| ASSERT_NE(identifiers, std::nullopt); |
| Ref<IPC::Connection> serverConnection = IPC::Connection::createServerConnection(WTF::move(identifiers->server)); |
| Ref mockServerClient = MockConnectionClient::create(); |
| serverConnection->open(mockServerClient); |
| a()->send(MockTestMessageWithConnection { WTF::move(identifiers->client) }, i); |
| serverConnection->invalidate(); |
| } |
| while (done.size() < iterations - 1) |
| RunLoop::currentSingleton().cycle(); |
| |
| for (uint64_t i = 1; i < iterations; ++i) { |
| auto& connection = connections[i]; |
| EXPECT_TRUE(connection.mockClientClient->gotDidClose() || connection.mockClientClient->waitForDidClose(kDefaultWaitForTimeout)) << i; |
| connection.clientConnection->invalidate(); |
| } |
| } |
| |
| // DISABLED: currently cannot test that wait on unopened connection causes InvalidConnection, |
| // since that will crash. The semantics are that isValid() == true for unopened connection, |
| // which doesn't make much sense. |
| TEST_P(ConnectionTestABBA, DISABLED_UnopenedWaitForAndDispatchImmediatelyIsInvalidConnection) |
| { |
| IPC::Error error = a()->waitForAndDispatchImmediately<MockTestMessage1>(0, kWaitForAbsenceTimeout); |
| EXPECT_EQ(IPC::Error::InvalidConnection, error); |
| } |
| |
| TEST_P(ConnectionTestABBA, InvalidatedWaitForAndDispatchImmediatelyIsInvalidConnection) |
| { |
| ASSERT_TRUE(openA()); |
| a()->invalidate(); |
| IPC::Error error = a()->waitForAndDispatchImmediately<MockTestMessage1>(0, kWaitForAbsenceTimeout); |
| EXPECT_EQ(IPC::Error::InvalidConnection, error); |
| } |
| |
| TEST_P(ConnectionTestABBA, UnsentWaitForAndDispatchImmediatelyIsTimeout) |
| { |
| ASSERT_TRUE(openA()); |
| IPC::Error error = a()->waitForAndDispatchImmediately<MockTestMessage1>(0, kWaitForAbsenceTimeout); |
| EXPECT_EQ(IPC::Error::Timeout, error); |
| } |
| |
| template<typename C> |
| static void dispatchSync(RunLoop& runLoop, C&& function) |
| { |
| BinarySemaphore semaphore; |
| runLoop.dispatch([&] () mutable { |
| function(); |
| semaphore.signal(); |
| }); |
| semaphore.wait(); |
| } |
| |
| template<typename C> |
| static void dispatchAndWait(RunLoop& runLoop, C&& function) |
| { |
| std::atomic<bool> done = false; |
| runLoop.dispatch([&] () mutable { |
| function(); |
| done = true; |
| }); |
| while (!done) |
| RunLoop::currentSingleton().cycle(); |
| } |
| |
| class ConnectionRunLoopTest : public ConnectionTestABBA { |
| public: |
| void TearDown() override |
| { |
| // By convention the b() connection is the one that gets openend on various runloops. |
| // The API contract of Connection is that invalidate() is called on the dispatcher |
| // that called open(). The test should invalidate on that runloop. |
| ASSERT(!b() || !b()->client()); |
| |
| ConnectionTestABBA::TearDown(); |
| // Remember to call localReferenceBarrier() in test scope. |
| // Otherwise run loops might be executing code that uses variables |
| // that went out of scope. |
| EXPECT_EQ(m_runLoops.size(), 0u); |
| } |
| |
| Ref<RunLoop> createRunLoop(ASCIILiteral name) |
| { |
| auto runLoop = RunLoop::create(name, ThreadType::Unknown); |
| m_runLoops.append(runLoop); |
| return runLoop; |
| } |
| |
| void localReferenceBarrier() |
| { |
| // Since we need to send sync to create a barrier to run loops, |
| // we might as well destroy the run loops in this function. |
| Vector<Ref<Thread>> threadsToWait; |
| // FIXME: Cannot wait for RunLoop to really exit. |
| for (auto& runLoop : std::exchange(m_runLoops, { })) { |
| dispatchSync(runLoop, [&] { |
| threadsToWait.append(Thread::currentSingleton()); |
| RunLoop::currentSingleton().stop(); |
| }); |
| } |
| while (true) { |
| sleep(0.1_s); |
| for (auto& thread : threadsToWait) { |
| if (Thread::allThreads().contains(thread.get())) |
| continue; |
| } |
| break; |
| } |
| } |
| |
| protected: |
| Vector<Ref<RunLoop>> m_runLoops; |
| }; |
| |
| #define LOCAL_STRINGIFY(x) #x |
| #define RUN_LOOP_NAME "RunLoop at ConnectionTests.cpp:" LOCAL_STRINGIFY(__LINE__) ""_s |
| |
| TEST_P(ConnectionRunLoopTest, RunLoopOpen) |
| { |
| ASSERT_TRUE(openA()); |
| auto runLoop = createRunLoop(RUN_LOOP_NAME); |
| BinarySemaphore semaphore; |
| runLoop->dispatch([&] { |
| ASSERT_TRUE(openB()); |
| bClient().waitForDidClose(kDefaultWaitForTimeout); |
| semaphore.signal(); |
| }); |
| a()->invalidate(); |
| semaphore.wait(); |
| |
| runLoop->dispatch([&] { |
| b()->invalidate(); |
| }); |
| localReferenceBarrier(); |
| } |
| |
| TEST_P(ConnectionRunLoopTest, RunLoopInvalidate) |
| { |
| ASSERT_TRUE(openA()); |
| auto runLoop = createRunLoop(RUN_LOOP_NAME); |
| runLoop->dispatch([&] { |
| ASSERT_TRUE(openB()); |
| b()->invalidate(); |
| }); |
| aClient().waitForDidClose(kDefaultWaitForTimeout); |
| localReferenceBarrier(); |
| } |
| |
| TEST_P(ConnectionRunLoopTest, RunLoopSend) |
| { |
| ASSERT_TRUE(openA()); |
| for (uint64_t i = 0u; i < 55u; ++i) |
| a()->send(MockTestMessage1 { }, i); |
| |
| auto runLoop = createRunLoop(RUN_LOOP_NAME); |
| BinarySemaphore semaphore; |
| runLoop->dispatch([&] { |
| ASSERT_TRUE(openB()); |
| for (uint64_t i = 100u; i < 160u; ++i) |
| b()->send(MockTestMessage1 { }, i); |
| for (uint64_t i = 0u; i < 55u; ++i) { |
| auto message = bClient().waitForMessage(kDefaultWaitForTimeout); |
| EXPECT_EQ(message.messageName, MockTestMessage1::name()); |
| EXPECT_EQ(message.destinationID, i); |
| } |
| auto flushResult = b()->flushSentMessages(kDefaultWaitForTimeout); |
| EXPECT_EQ(flushResult, IPC::Error::NoError); |
| b()->invalidate(); |
| }); |
| for (uint64_t i = 100u; i < 160u; ++i) { |
| auto message = aClient().waitForMessage(kDefaultWaitForTimeout); |
| EXPECT_EQ(message.messageName, MockTestMessage1::name()); |
| EXPECT_EQ(message.destinationID, i); |
| } |
| semaphore.signal(); |
| |
| localReferenceBarrier(); |
| } |
| |
| TEST_P(ConnectionRunLoopTest, RunLoopSendAsync) |
| { |
| ASSERT_TRUE(openA()); |
| aClient().setAsyncMessageHandler([&] (IPC::Connection&, IPC::Decoder& decoder) -> bool { |
| auto listenerID = decoder.decode<uint64_t>(); |
| auto encoder = makeUniqueRef<IPC::Encoder>(MockTestMessageWithAsyncReply1::asyncMessageReplyName(), *listenerID); |
| encoder.get() << decoder.destinationID(); |
| a()->sendSyncReply(WTF::move(encoder)); |
| return true; |
| }); |
| HashSet<uint64_t> replies; |
| |
| auto runLoop = createRunLoop(RUN_LOOP_NAME); |
| dispatchAndWait(runLoop, [&] { |
| ASSERT_TRUE(openB()); |
| for (uint64_t i = 100u; i < 160u; ++i) { |
| b()->sendWithAsyncReply(MockTestMessageWithAsyncReply1 { }, [&, j = i] (uint64_t value) { |
| if (!value) |
| WTFLogAlways("GOT: %llu", j); |
| EXPECT_GE(value, 100u); |
| replies.add(value); |
| }, i); |
| } |
| while (replies.size() < 60u) |
| RunLoop::currentSingleton().cycle(); |
| b()->invalidate(); |
| }); |
| |
| for (uint64_t i = 100u; i < 160u; ++i) |
| EXPECT_TRUE(replies.contains(i)); |
| localReferenceBarrier(); |
| } |
| |
| // Test for ensuring that async messages are not reordered when sync message is sent with SendSyncOption::MaintainOrderingWithAsyncMessages. |
| // At the time message sequence of "A, Sync, B" would see "B, A, Sync" because A would be moved to sync message specific list, but |
| // then the scheduled async message dispatch would dispatch B. |
| // The order is tested by increasing the destination id. |
| TEST_P(ConnectionRunLoopTest, SendSyncMaintainOrderingWithAsyncMessagesOrder) |
| { |
| ASSERT_TRUE(openA()); |
| |
| auto runLoop = createRunLoop(RUN_LOOP_NAME); |
| runLoop->dispatch([&] { |
| // Sync message handler only to respond with "message was handled". |
| bClient().setSyncMessageHandler([&](IPC::Connection& connection, IPC::Decoder& decoder, UniqueRef<IPC::Encoder>& encoder) -> bool { |
| bClient().addMessage(decoder); |
| connection.sendSyncReply(WTF::move(encoder)); |
| return true; |
| }); |
| ASSERT_TRUE(openB()); |
| }); |
| for (uint64_t i = 0u; i < 300u; i += 3) { |
| a()->send(MockTestMessage1 { }, i); |
| auto result = a()->sendSync(MockTestSyncMessage(), i + 1, kDefaultWaitForTimeout, { IPC::SendSyncOption::MaintainOrderingWithAsyncMessages }); |
| EXPECT_TRUE(result.succeeded()); |
| a()->send(MockTestMessage1 { }, i + 2); |
| } |
| auto result = a()->sendSync(MockTestSyncMessage(), 300, kDefaultWaitForTimeout, { IPC::SendSyncOption::MaintainOrderingWithAsyncMessages }); |
| // The last message sent is sync, so when we are here we know all messages have been added to the message list. |
| runLoop->dispatch([&] { |
| b()->invalidate(); |
| }); |
| auto messages = bClient().takeMessages(); |
| ASSERT_EQ(messages.size(), 301u); |
| for (uint64_t i = 0u; i < 300u; i += 3) { |
| EXPECT_EQ(messages[i].messageName, MockTestMessage1::name()) << i; |
| EXPECT_EQ(messages[i].destinationID, i); |
| EXPECT_EQ(messages[i + 1].messageName, MockTestSyncMessage::name()) << (i + 1); |
| EXPECT_EQ(messages[i + 1].destinationID, i + 1u); |
| EXPECT_EQ(messages[i + 2].messageName, MockTestMessage1::name()) << (i + 2); |
| EXPECT_EQ(messages[i + 2].destinationID, i + 2u); |
| } |
| EXPECT_EQ(messages[300].messageName, MockTestSyncMessage::name()); |
| EXPECT_EQ(messages[300].destinationID, 300u); |
| localReferenceBarrier(); |
| } |
| |
| TEST_P(ConnectionRunLoopTest, SendSyncMaintainOrderingWithAsyncMessagesOrderMultipleSendingThreads) |
| { |
| ASSERT_TRUE(openA()); |
| |
| auto runLoop = createRunLoop(RUN_LOOP_NAME); |
| runLoop->dispatch([&] { |
| // Sync message handler only to respond with "message was handled". |
| bClient().setSyncMessageHandler([&](IPC::Connection& connection, IPC::Decoder& decoder, UniqueRef<IPC::Encoder>& encoder) -> bool { |
| bClient().addMessage(decoder); |
| connection.sendSyncReply(WTF::move(encoder)); |
| sleep(0.1_s); |
| return true; |
| }); |
| bClient().setAsyncMessageHandler([](IPC::Connection&, IPC::Decoder&) -> bool { |
| sleep(0.1_s); |
| return false; |
| }); |
| ASSERT_TRUE(openB()); |
| sleep(0.1_s); |
| }); |
| |
| auto sendingRunLoop = createRunLoop(RUN_LOOP_NAME); |
| sendingRunLoop->dispatch([&] { |
| for (uint64_t i = 0u; i < 30u; i++) { |
| a()->send(MockTestMessage1 { }, i); |
| sleep(0.1_s); |
| } |
| }); |
| |
| for (uint64_t i = 0u; i < 30u; i++) { |
| auto result = a()->sendSync(MockTestSyncMessage(), i, kDefaultWaitForTimeout, { IPC::SendSyncOption::MaintainOrderingWithAsyncMessages }); |
| EXPECT_TRUE(result.succeeded()); |
| sleep(0.1_s); |
| } |
| |
| dispatchAndWait(sendingRunLoop, [] { |
| }); |
| |
| auto result = a()->sendSync(MockTestSyncMessage(), 60, kDefaultWaitForTimeout, { IPC::SendSyncOption::MaintainOrderingWithAsyncMessages }); |
| // The last message sent is sync, so when we are here we know all messages have been added to the message list. |
| runLoop->dispatch([&] { |
| b()->invalidate(); |
| }); |
| auto messages = bClient().takeMessages(); |
| ASSERT_EQ(messages.size(), 61u); |
| std::optional<uint64_t> seenAsyncMessage; |
| std::optional<uint64_t> seenSyncMessage; |
| for (uint64_t i = 0u; i < 60; i++) { |
| if (messages[i].messageName == MockTestMessage1::name()) { |
| if (seenAsyncMessage) |
| EXPECT_EQ(messages[i].destinationID, *seenAsyncMessage + 1); |
| else |
| EXPECT_EQ(messages[i].destinationID, 0u); |
| seenAsyncMessage = messages[i].destinationID; |
| } else { |
| EXPECT_EQ(messages[i].messageName, MockTestSyncMessage::name()); |
| if (seenSyncMessage) |
| EXPECT_EQ(messages[i].destinationID, *seenSyncMessage + 1); |
| else |
| EXPECT_EQ(messages[i].destinationID, 0u); |
| seenSyncMessage = messages[i].destinationID; |
| } |
| } |
| EXPECT_EQ(messages[60].messageName, MockTestSyncMessage::name()); |
| EXPECT_EQ(messages[60].destinationID, 60u); |
| localReferenceBarrier(); |
| } |
| |
| // Test for ensuring that async messages are not reordered when sync message is sent with SendSyncOption::MaintainOrderingWithAsyncMessages |
| // and the async messages are waited with waitForAndDispatchImmediately. |
| TEST_P(ConnectionRunLoopTest, SendSyncMaintainOrderingWithAsyncMessagesWaitForOrderMultipleSendingThreads) |
| { |
| ASSERT_TRUE(openA()); |
| |
| auto runLoop = createRunLoop(RUN_LOOP_NAME); |
| runLoop->dispatch([&] { |
| // Sync message handler only to respond with "message was handled". |
| bClient().setSyncMessageHandler([&](IPC::Connection& connection, IPC::Decoder& decoder, UniqueRef<IPC::Encoder>& encoder) -> bool { |
| bClient().addMessage(decoder); |
| connection.sendSyncReply(WTF::move(encoder)); |
| sleep(0.1_s); |
| return true; |
| }); |
| ASSERT_TRUE(openB()); |
| |
| // The messages the test sends are never dispatched from run loop, rather from this wait. |
| // Since we never drop to run loop after open, we catch even the first message. |
| for (uint64_t i = 0u; i < 30u; i += 3) { |
| auto result = b()->waitForAndDispatchImmediately<MockTestMessage1>(i, kDefaultWaitForTimeout); |
| EXPECT_TRUE(result == IPC::Error::NoError); |
| } |
| }); |
| |
| auto sendingRunLoop = createRunLoop(RUN_LOOP_NAME); |
| sendingRunLoop->dispatch([&] { |
| for (uint64_t i = 0u; i < 30u; i++) { |
| a()->send(MockTestMessage1 { }, i); |
| sleep(0.1_s); |
| } |
| }); |
| |
| for (uint64_t i = 0u; i < 30u; i++) { |
| auto result = a()->sendSync(MockTestSyncMessage(), i, kDefaultWaitForTimeout, { IPC::SendSyncOption::MaintainOrderingWithAsyncMessages }); |
| EXPECT_TRUE(result.succeeded()); |
| sleep(0.1_s); |
| } |
| |
| dispatchAndWait(sendingRunLoop, [] { |
| }); |
| |
| auto result = a()->sendSync(MockTestSyncMessage(), 60, kDefaultWaitForTimeout, { IPC::SendSyncOption::MaintainOrderingWithAsyncMessages }); |
| // The last message sent is sync, so when we are here we know all messages have been added to the message list. |
| runLoop->dispatch([&] { |
| b()->invalidate(); |
| }); |
| auto messages = bClient().takeMessages(); |
| ASSERT_EQ(messages.size(), 61u); |
| std::optional<uint64_t> seenAsyncMessage; |
| std::optional<uint64_t> seenSyncMessage; |
| for (uint64_t i = 0u; i < 60; i++) { |
| if (messages[i].messageName == MockTestMessage1::name()) { |
| if (seenAsyncMessage) |
| EXPECT_EQ(messages[i].destinationID, *seenAsyncMessage + 1); |
| else |
| EXPECT_EQ(messages[i].destinationID, 0u); |
| seenAsyncMessage = messages[i].destinationID; |
| } else { |
| EXPECT_EQ(messages[i].messageName, MockTestSyncMessage::name()); |
| if (seenSyncMessage) |
| EXPECT_EQ(messages[i].destinationID, *seenSyncMessage + 1); |
| else |
| EXPECT_EQ(messages[i].destinationID, 0u); |
| seenSyncMessage = messages[i].destinationID; |
| } |
| } |
| EXPECT_EQ(messages[60].messageName, MockTestSyncMessage::name()); |
| EXPECT_EQ(messages[60].destinationID, 60u); |
| localReferenceBarrier(); |
| } |
| |
| TEST_P(ConnectionRunLoopTest, RunLoopSendAsyncOnTarget) |
| { |
| HashSet<uint64_t> replies; |
| { |
| AutoWorkQueue awq; |
| |
| ASSERT_TRUE(openA()); |
| aClient().setAsyncMessageHandler([&] (IPC::Connection&, IPC::Decoder& decoder) -> bool { |
| auto listenerID = decoder.decode<uint64_t>(); |
| auto encoder = makeUniqueRef<IPC::Encoder>(MockTestMessageWithAsyncReply1::asyncMessageReplyName(), *listenerID); |
| encoder.get() << decoder.destinationID(); |
| a()->sendSyncReply(WTF::move(encoder)); |
| return true; |
| }); |
| |
| auto runLoop = createRunLoop(RUN_LOOP_NAME); |
| dispatchAndWait(runLoop, [&] { |
| ASSERT_TRUE(openB()); |
| for (uint64_t i = 100u; i < 160u; ++i) { |
| b()->sendWithAsyncReplyOnDispatcher(MockTestMessageWithAsyncReply1 { }, awq.queue(), [&, j = i, queue = awq.queue()] (uint64_t value) { |
| assertIsCurrent(queue); |
| if (!value) |
| WTFLogAlways("GOT: %llu", j); |
| EXPECT_GE(value, 100u); |
| replies.add(value); |
| }, i); |
| } |
| while (replies.size() < 60u) |
| RunLoop::currentSingleton().cycle(); |
| b()->invalidate(); |
| }); |
| awq.queue()->beginShutdown(); |
| } |
| for (uint64_t i = 100u; i < 160u; ++i) |
| EXPECT_TRUE(replies.contains(i)); |
| localReferenceBarrier(); |
| } |
| |
| TEST_P(ConnectionRunLoopTest, RunLoopSendWithPromisedReply) |
| { |
| ASSERT_TRUE(openA()); |
| aClient().setAsyncMessageHandler([&] (IPC::Connection&, IPC::Decoder& decoder) -> bool { |
| auto listenerID = decoder.decode<uint64_t>(); |
| auto encoder = makeUniqueRef<IPC::Encoder>(MockTestMessageWithAsyncReply1::asyncMessageReplyName(), *listenerID); |
| encoder.get() << decoder.destinationID(); |
| a()->sendSyncReply(WTF::move(encoder)); |
| return true; |
| }); |
| HashSet<uint64_t> replies; |
| |
| auto runLoop = createRunLoop(RUN_LOOP_NAME); |
| dispatchAndWait(runLoop, [&] { |
| ASSERT_TRUE(openB()); |
| for (uint64_t i = 100u; i < 160u; ++i) { |
| b()->sendWithPromisedReply(MockTestMessageWithAsyncReply1 { }, i)->then(runLoop, |
| [&, j = i] (uint64_t value) { |
| if (!value) |
| WTFLogAlways("GOT: %llu", j); |
| EXPECT_GE(value, 100u); |
| replies.add(value); |
| }, |
| [] { |
| // There should never be a connection failure in this case. |
| EXPECT_TRUE(false); |
| }); |
| } |
| while (replies.size() < 60u) |
| RunLoop::currentSingleton().cycle(); |
| b()->invalidate(); |
| }); |
| |
| for (uint64_t i = 100u; i < 160u; ++i) |
| EXPECT_TRUE(replies.contains(i)); |
| localReferenceBarrier(); |
| } |
| |
| struct PromiseConverter { |
| static auto convertError(IPC::Error) |
| { |
| return makeUnexpected(String { "2"_s }); |
| } |
| }; |
| |
| TEST_P(ConnectionRunLoopTest, SendWithConvertedPromisedReply) |
| { |
| ASSERT_TRUE(openA()); |
| aClient().setAsyncMessageHandler([&] (IPC::Connection&, IPC::Decoder& decoder) -> bool { |
| auto listenerID = decoder.decode<uint64_t>(); |
| auto encoder = makeUniqueRef<IPC::Encoder>(MockTestMessageWithAsyncReply1::asyncMessageReplyName(), *listenerID); |
| encoder.get() << decoder.destinationID(); |
| a()->sendSyncReply(WTF::move(encoder)); |
| return true; |
| }); |
| std::atomic<bool> isFinished = false; |
| |
| auto runLoop = createRunLoop(RUN_LOOP_NAME); |
| dispatchAndWait(runLoop, [&] { |
| ASSERT_TRUE(openB()); |
| b()->sendWithPromisedReply<PromiseConverter>(MockTestMessageWithAsyncReply1 { }, 1)->then(runLoop, [&] (uint64_t value) { |
| EXPECT_EQ(value, 1u); |
| isFinished = true; |
| }, [&] (String&& error) { |
| EXPECT_EQ(error, "2"_s); |
| isFinished = true; |
| }); |
| while (!isFinished) |
| RunLoop::currentSingleton().cycle(); |
| b()->invalidate(); |
| }); |
| |
| localReferenceBarrier(); |
| } |
| |
| TEST_P(ConnectionRunLoopTest, RunLoopSendWithPromisedReplyOnMixAndMatchDispatcher) |
| { |
| HashSet<uint64_t> replies; |
| |
| { |
| AutoWorkQueue awq; |
| ASSERT_TRUE(openA()); |
| aClient().setAsyncMessageHandler([&] (IPC::Connection&, IPC::Decoder& decoder) -> bool { |
| auto listenerID = decoder.decode<uint64_t>(); |
| auto encoder = makeUniqueRef<IPC::Encoder>(MockTestMessageWithAsyncReply1::asyncMessageReplyName(), *listenerID); |
| encoder.get() << decoder.destinationID(); |
| a()->sendSyncReply(WTF::move(encoder)); |
| return true; |
| }); |
| |
| auto runLoop = createRunLoop(RUN_LOOP_NAME); |
| dispatchAndWait(runLoop, [&] { |
| ASSERT_TRUE(openB()); |
| for (uint64_t i = 100u; i < 160u; ++i) { |
| b()->sendWithPromisedReply(MockTestMessageWithAsyncReply1 { }, i)->whenSettled(runLoop, [&, j = i] (auto&& result) { |
| EXPECT_TRUE(result); |
| auto value = *result; |
| if (!value) |
| WTFLogAlways("GOT: %llu", j); |
| EXPECT_GE(value, 100u); |
| replies.add(value); |
| }); |
| } |
| while (replies.size() < 60u) |
| RunLoop::currentSingleton().cycle(); |
| b()->invalidate(); |
| }); |
| awq.queue()->beginShutdown(); |
| } |
| for (uint64_t i = 100u; i < 160u; ++i) |
| EXPECT_TRUE(replies.contains(i)); |
| localReferenceBarrier(); |
| } |
| |
| // Tests that all sent messages with async replies are received, even if sender invalidates |
| // without synchronizing with the receiver. The async reply callbacks are always run, either |
| // with the reply or the cancel value and always on the provided dispatcher |
| TEST_P(ConnectionRunLoopTest, SendAsyncAndInvalidateOnDispatcher) |
| { |
| HashSet<uint64_t> messages; |
| HashSet<uint64_t> replies; |
| constexpr uint64_t messageCount = 1536u; |
| |
| { |
| AutoWorkQueue awq; |
| |
| ASSERT_TRUE(openA()); |
| auto runLoop = createRunLoop(RUN_LOOP_NAME); |
| BinarySemaphore semaphore; |
| runLoop->dispatch([&] { |
| bClient().setAsyncMessageHandler([&] (IPC::Connection&, IPC::Decoder& decoder) -> bool { |
| auto listenerID = decoder.decode<uint64_t>(); |
| auto encoder = makeUniqueRef<IPC::Encoder>(MockTestMessageWithAsyncReply1::asyncMessageReplyName(), *listenerID); |
| encoder.get() << decoder.destinationID(); |
| b()->sendSyncReply(WTF::move(encoder)); |
| messages.add(decoder.destinationID()); |
| return true; |
| }); |
| ASSERT_TRUE(openB()); |
| while (messages.size() < messageCount - 1) |
| RunLoop::currentSingleton().cycle(); |
| semaphore.signal(); |
| }); |
| for (uint64_t i = 1u; i < messageCount; ++i) { |
| a()->sendWithAsyncReplyOnDispatcher(MockTestMessageWithAsyncReply1 { }, awq.queue(), [&, j = i] (uint64_t) { |
| assertIsCurrent(awq.queue()); |
| // The reply value might be the reply value (destinationID) or zero in case the |
| // reply was resolved as invalid. Either of these are expected valid results. |
| // Use the `j` to prove that reply callback was run as expected. |
| replies.add(j); |
| }, i); |
| } |
| auto flushResult = a()->flushSentMessages(kDefaultWaitForTimeout); |
| EXPECT_EQ(flushResult, IPC::Error::NoError); |
| a()->invalidate(); |
| semaphore.wait(); |
| |
| // Sending a message on an invalidated Connection should still deliver the message on the dispatcher. |
| a()->sendWithAsyncReplyOnDispatcher(MockTestMessageWithAsyncReply1 { }, awq.queue(), [queue = awq.queue()] (uint64_t) { |
| assertIsCurrent(queue); |
| queue->beginShutdown(); |
| }, 0); |
| runLoop->dispatch([&] { |
| b()->invalidate(); |
| }); |
| } |
| |
| for (uint64_t i = 1u; i < messageCount; ++i) { |
| EXPECT_TRUE(replies.contains(i)) << i; |
| EXPECT_TRUE(messages.contains(i)) << i; |
| } |
| localReferenceBarrier(); |
| } |
| |
| // Tests that all sent messages are received, even if sender invalidates |
| // without synchronizing with the receiver. |
| // FIXME when rdar://159131152 is resolved |
| #if PLATFORM(IOS) |
| TEST_P(ConnectionRunLoopTest, DISABLED_SendAndInvalidate) |
| #else |
| TEST_P(ConnectionRunLoopTest, SendAndInvalidate) |
| #endif |
| { |
| constexpr uint64_t messageCount = 1777; |
| ASSERT_TRUE(openA()); |
| auto runLoop = createRunLoop(RUN_LOOP_NAME); |
| BinarySemaphore semaphore; |
| runLoop->dispatch([&] { |
| ASSERT_TRUE(openB()); |
| for (uint64_t i = 1u; i < messageCount; ++i) { |
| auto message = bClient().waitForMessage(kDefaultWaitForTimeout); |
| EXPECT_EQ(message.messageName, MockTestMessage1::name()); |
| EXPECT_EQ(message.destinationID, i); |
| } |
| semaphore.signal(); |
| }); |
| for (uint64_t i = 1u; i < messageCount; ++i) |
| a()->send(MockTestMessage1 { }, i); |
| auto flushResult = a()->flushSentMessages(kDefaultWaitForTimeout); |
| EXPECT_EQ(flushResult, IPC::Error::NoError); |
| a()->invalidate(); |
| semaphore.wait(); |
| runLoop->dispatch([&] { |
| b()->invalidate(); |
| }); |
| localReferenceBarrier(); |
| } |
| |
| // Tests that all sent messages with async replies are received, even if sender invalidates |
| // without synchronizing with the receiver. The async reply callbacks are always run, either |
| // with the reply or the cancel value. |
| TEST_P(ConnectionRunLoopTest, SendAsyncAndInvalidate) |
| { |
| constexpr uint64_t messageCount = 1536u; |
| ASSERT_TRUE(openA()); |
| auto runLoop = createRunLoop(RUN_LOOP_NAME); |
| HashSet<uint64_t> messages; |
| HashSet<uint64_t> replies; |
| BinarySemaphore semaphore; |
| runLoop->dispatch([&] { |
| bClient().setAsyncMessageHandler([&] (IPC::Connection&, IPC::Decoder& decoder) -> bool { |
| auto listenerID = decoder.decode<uint64_t>(); |
| auto encoder = makeUniqueRef<IPC::Encoder>(MockTestMessageWithAsyncReply1::asyncMessageReplyName(), *listenerID); |
| encoder.get() << decoder.destinationID(); |
| b()->sendSyncReply(WTF::move(encoder)); |
| messages.add(decoder.destinationID()); |
| return true; |
| }); |
| ASSERT_TRUE(openB()); |
| while (messages.size() < messageCount - 1) |
| RunLoop::currentSingleton().cycle(); |
| semaphore.signal(); |
| }); |
| for (uint64_t i = 1u; i < messageCount; ++i) { |
| a()->sendWithAsyncReply(MockTestMessageWithAsyncReply1 { }, [&, j = i] (uint64_t) { |
| // The reply value might be the reply value (destinationID) or zero in case the |
| // reply was resolved as invalid. Either of these are expected valid results. |
| // Use the `j` to prove that reply callback was run as expected. |
| replies.add(j); |
| }, i); |
| } |
| auto flushResult = a()->flushSentMessages(kDefaultWaitForTimeout); |
| EXPECT_EQ(flushResult, IPC::Error::NoError); |
| a()->invalidate(); |
| semaphore.wait(); |
| for (uint64_t i = 1u; i < messageCount; ++i) { |
| EXPECT_TRUE(replies.contains(i)) << i; |
| EXPECT_TRUE(messages.contains(i)) << i; |
| } |
| runLoop->dispatch([&] { |
| b()->invalidate(); |
| }); |
| localReferenceBarrier(); |
| } |
| |
| // Ensure that replies are received in the right order. |
| TEST_P(ConnectionRunLoopTest, RunLoopSendWithPromisedReplyOrder) |
| { |
| using Promise = MockTestMessageWithAsyncReply1::Promise; |
| |
| ASSERT_TRUE(openA()); |
| uint64_t replyID = 0; |
| aClient().setAsyncMessageHandler([&] (IPC::Connection&, IPC::Decoder& decoder) -> bool { |
| auto listenerID = decoder.decode<uint64_t>(); |
| auto encoder = makeUniqueRef<IPC::Encoder>(MockTestMessageWithAsyncReply1::asyncMessageReplyName(), *listenerID); |
| encoder.get() << replyID++; |
| a()->sendSyncReply(WTF::move(encoder)); |
| return true; |
| }); |
| Vector<uint64_t> replies; |
| constexpr size_t counter = 50; |
| replies.reserveInitialCapacity(counter); |
| |
| auto runLoop = createRunLoop(RUN_LOOP_NAME); |
| dispatchAndWait(runLoop, [&] { |
| ASSERT_TRUE(openB()); |
| for (uint64_t i = 0; i < counter; ++i) { |
| if (!(i % 2)) { |
| b()->sendWithPromisedReply(MockTestMessageWithAsyncReply1 { }, 100)->whenSettled(runLoop, [&, i] (Promise::Result result) { |
| EXPECT_TRUE(result.has_value()); |
| EXPECT_EQ(result.value(), i); |
| replies.append(i); |
| }); |
| } else { |
| b()->sendWithAsyncReply(MockTestMessageWithAsyncReply1 { }, [&, i] (uint64_t value) { |
| EXPECT_EQ(value, i); |
| replies.append(i); |
| }, 100); |
| } |
| } |
| while (replies.size() < counter) |
| RunLoop::currentSingleton().cycle(); |
| b()->invalidate(); |
| }); |
| |
| for (uint64_t i = 0u; i < counter; ++i) |
| EXPECT_EQ(replies[i], i); |
| runLoop->dispatch([&] { |
| b()->invalidate(); |
| }); |
| localReferenceBarrier(); |
| } |
| |
| // This API contract does not make sense. Not only that, but there is no good way currently |
| // to capture this in a thread-safe way (construct completion handler in a thread-safe way |
| // so that it would assert that it would execute in the run loop thread). This is disabled |
| // until the API contract is changed. |
| TEST_P(ConnectionRunLoopTest, DISABLED_RunLoopSendAsyncOnAnotherRunLoopDispatchesOnConnectionRunLoop) |
| { |
| ASSERT_TRUE(openA()); |
| aClient().setAsyncMessageHandler([&] (IPC::Connection&, IPC::Decoder& decoder) -> bool { |
| auto listenerID = decoder.decode<uint64_t>(); |
| auto encoder = makeUniqueRef<IPC::Encoder>(MockTestMessageWithAsyncReply1::asyncMessageReplyName(), *listenerID); |
| encoder.get() << decoder.destinationID(); |
| a()->sendSyncReply(WTF::move(encoder)); |
| return true; |
| }); |
| HashSet<uint64_t> replies; |
| |
| auto runLoop = createRunLoop(RUN_LOOP_NAME); |
| dispatchSync(runLoop, [&] { |
| ASSERT_TRUE(openB()); |
| }); |
| |
| BinarySemaphore semaphore; |
| auto otherRunLoop = createRunLoop(RUN_LOOP_NAME); |
| otherRunLoop->dispatch([&] { |
| for (uint64_t i = 100u; i < 160u; ++i) { |
| b()->sendWithAsyncReply(MockTestMessageWithAsyncReply1 { }, [&] (uint64_t value) { |
| EXPECT_GE(value, 100u); |
| // These should be dispatched on `runLoop` above, which does not make much sense. |
| replies.add(value); |
| }, i); |
| } |
| // Halt the runloop for a proof that the async replies are not processed on |
| // this run loop. |
| semaphore.wait(); |
| }); |
| dispatchAndWait(runLoop, [&] { |
| while (replies.size() < 60u) |
| RunLoop::currentSingleton().cycle(); |
| }); |
| |
| for (uint64_t i = 100u; i < 160u; ++i) |
| EXPECT_TRUE(replies.contains(i)); |
| semaphore.signal(); |
| runLoop->dispatch([&] { |
| b()->invalidate(); |
| }); |
| localReferenceBarrier(); |
| } |
| |
| // This makes no sense: |
| // - async reply handlers are dispatched on the connection run loop |
| // - async reply handlers are dispatched as cancelled on connection run loop during invalidate |
| // - async reply handlers that are sent to already invalid connection are dispatched on main run loop |
| // We have to make the discrepancy as the Connection is not bound to any run loop if it is invalid, e.g. |
| // prior to open() and after invalidate(). |
| // Previously Connection was bound only to main run loop. In that scenario also the invalid send could cancel the reply handler |
| // on main run loop, as that is guaranteed to exist. After Connection could be bound to an arbitrary run loop, we cannot |
| // cancel the reply handler on a run loop we do not know about. |
| // Will be fixed later. Likely this needs an API contract change, where async reply handlers are dispatched on the |
| // calling run loop. |
| TEST_P(ConnectionRunLoopTest, InvalidSendWithAsyncReplyDispatchesCancelHandlerOnMainThread) |
| { |
| ASSERT_TRUE(openA()); |
| auto runLoop = createRunLoop(RUN_LOOP_NAME); |
| uint64_t reply = 1u; |
| BinarySemaphore semaphore; |
| runLoop->dispatch([&] { |
| ASSERT_TRUE(openB()); |
| b()->invalidate(); |
| b()->sendWithAsyncReply(MockTestMessageWithAsyncReply1 { }, [&] (uint64_t value) { |
| reply = value; |
| }, 77); |
| // Halt the runloop for a proof that the async replies are not processed on |
| // this run loop. |
| semaphore.wait(); |
| }); |
| EXPECT_EQ(reply, 1u); |
| while (reply == 1u) |
| RunLoop::currentSingleton().cycle(); |
| EXPECT_EQ(reply, 0u); |
| semaphore.signal(); |
| runLoop->dispatch([&] { |
| b()->invalidate(); |
| }); |
| localReferenceBarrier(); |
| } |
| |
| TEST_P(ConnectionRunLoopTest, RunLoopWaitForAndDispatchImmediately) |
| { |
| ASSERT_TRUE(openA()); |
| |
| for (uint64_t i = 0u; i < 55u; ++i) |
| a()->send(MockTestMessage1 { }, i); |
| |
| auto runLoop = createRunLoop(RUN_LOOP_NAME); |
| runLoop->dispatch([&] { |
| ASSERT_TRUE(openB()); |
| for (uint64_t i = 100u; i < 160u; ++i) |
| b()->send(MockTestMessage1 { }, i); |
| |
| for (uint64_t i = 0u; i < 55u; ++i) { |
| IPC::Error error = b()->waitForAndDispatchImmediately<MockTestMessage1>(i, kDefaultWaitForTimeout); |
| ASSERT_EQ(IPC::Error::NoError, error); |
| |
| auto message = bClient().waitForMessage(0_s); |
| EXPECT_EQ(message.messageName, MockTestMessage1::name()); |
| EXPECT_EQ(message.destinationID, i); |
| } |
| }); |
| for (uint64_t i = 100u; i < 160u; ++i) { |
| IPC::Error error = a()->waitForAndDispatchImmediately<MockTestMessage1>(i, kDefaultWaitForTimeout); |
| ASSERT_EQ(IPC::Error::NoError, error); |
| |
| auto message = aClient().waitForMessage(0_s); |
| EXPECT_EQ(message.messageName, MockTestMessage1::name()); |
| EXPECT_EQ(message.destinationID, i); |
| } |
| runLoop->dispatch([&] { |
| b()->invalidate(); |
| }); |
| localReferenceBarrier(); |
| } |
| |
| #if PLATFORM(MAC) && __MAC_OS_X_VERSION_MIN_REQUIRED >= 260000 |
| TEST_P(ConnectionRunLoopTest, DISABLED_SendLocalSyncMessageWithDataReply) |
| #else |
| TEST_P(ConnectionRunLoopTest, SendLocalSyncMessageWithDataReply) |
| #endif |
| { |
| constexpr int iterations = 5; |
| constexpr size_t dataSize = 1e8; // 100 MB. |
| ASSERT_TRUE(openA()); |
| auto runLoop = createRunLoop(RUN_LOOP_NAME); |
| runLoop->dispatch([&] { |
| bClient().setSyncMessageHandler([&](IPC::Connection&, IPC::Decoder& decoder, UniqueRef<IPC::Encoder>& encoder) -> bool { |
| Vector<uint8_t> data(dataSize); |
| for (size_t i = 0; i < dataSize; ++i) |
| data[i] = static_cast<uint8_t>(i); |
| encoder.get() << data; |
| b()->sendSyncReply(WTF::move(encoder)); |
| return true; |
| }); |
| ASSERT_TRUE(openB()); |
| }); |
| for (int i = 0; i < iterations; ++i) { |
| auto sendResult = a()->sendSync(MockTestSyncMessageWithDataReply { }, i, kDefaultWaitForTimeout); |
| ASSERT_TRUE(sendResult.succeeded()); |
| auto& [replyData] = sendResult.reply(); |
| for (size_t i = 0; i < replyData.size(); ++i) { |
| auto expected = static_cast<uint8_t>(i); |
| if (expected != replyData[i]) |
| ASSERT_EQ(expected, replyData[i]); |
| } |
| ASSERT_EQ(dataSize, replyData.size()); |
| } |
| runLoop->dispatch([&] { |
| b()->invalidate(); |
| }); |
| localReferenceBarrier(); |
| } |
| |
| // Tests that unhandled sync message is cancelled. IPC::Connection receiving unhandled messages. |
| TEST_P(ConnectionRunLoopTest, SyncMessageNotHandledIsCancelled) |
| { |
| constexpr size_t iterations = 10; |
| ASSERT_TRUE(openA()); |
| auto runLoop = createRunLoop(RUN_LOOP_NAME); |
| uint64_t gotDestination = 0; |
| runLoop->dispatch([&] { |
| bClient().setSyncMessageHandler([&](IPC::Connection&, IPC::Decoder& decoder, UniqueRef<IPC::Encoder>& encoder) -> bool { |
| gotDestination = decoder.destinationID(); |
| // Unhandled message. |
| if (decoder.destinationID() == 77) |
| return true; // Message destination was unknown, unhandled message. |
| if (decoder.destinationID() == 99) { |
| b()->sendSyncReply(WTF::move(encoder)); |
| return true; |
| } |
| EXPECT_TRUE(false); |
| return false; |
| }); |
| ASSERT_TRUE(openB()); |
| }); |
| for (size_t i = 0; i < iterations; ++i) { |
| { |
| gotDestination = 0; |
| auto result = a()->sendSync(MockTestSyncMessage(), 77, kDefaultWaitForTimeout); |
| ASSERT_FALSE(result.succeeded()); |
| EXPECT_EQ(IPC::Error::SyncMessageCancelled, result.error()); |
| EXPECT_EQ(77u, gotDestination); |
| } |
| { |
| gotDestination = 0; |
| auto result = a()->sendSync(MockTestSyncMessage(), 99, kDefaultWaitForTimeout); |
| EXPECT_TRUE(result.succeeded()); |
| EXPECT_EQ(99u, gotDestination); |
| } |
| } |
| runLoop->dispatch([&] { |
| b()->invalidate(); |
| }); |
| localReferenceBarrier(); |
| } |
| |
| #if ENABLE(IPC_TESTING_API) |
| // Tests that sync message with decode failure is cancelled. IPC::Connection does not allow these, |
| // but JS IPC Testing API uses these. |
| TEST_P(ConnectionRunLoopTest, SyncMessageDecodeFailureIsCancelled) |
| { |
| constexpr size_t iterations = 10; |
| ASSERT_TRUE(openA()); |
| auto runLoop = createRunLoop(RUN_LOOP_NAME); |
| uint64_t gotDestination = 0; |
| runLoop->dispatch([&] { |
| b()->setIgnoreInvalidMessageForTesting(); |
| bClient().setSyncMessageHandler([&](IPC::Connection&, IPC::Decoder& decoder, UniqueRef<IPC::Encoder>& encoder) -> bool { |
| gotDestination = decoder.destinationID(); |
| // Decode failure. |
| if (decoder.destinationID() == 88) { |
| EXPECT_FALSE(decoder.decode<uint64_t>()); |
| return true; // Message was handled, but decode failed. |
| } |
| if (decoder.destinationID() == 99) { |
| b()->sendSyncReply(WTF::move(encoder)); |
| return true; |
| } |
| EXPECT_TRUE(false); |
| return false; |
| }); |
| ASSERT_TRUE(openB()); |
| }); |
| for (size_t i = 0; i < iterations; ++i) { |
| { |
| gotDestination = 0; |
| auto result = a()->sendSync(MockTestSyncMessage(), 88, IPC::Timeout::infinity()); |
| ASSERT_FALSE(result.succeeded()); |
| EXPECT_EQ(IPC::Error::SyncMessageCancelled, result.error()); |
| EXPECT_EQ(88u, gotDestination); |
| } |
| { |
| gotDestination = 0; |
| auto result = a()->sendSync(MockTestSyncMessage(), 99, IPC::Timeout::infinity()); |
| EXPECT_TRUE(result.succeeded()); |
| EXPECT_EQ(99u, gotDestination); |
| } |
| } |
| runLoop->dispatch([&] { |
| b()->invalidate(); |
| }); |
| localReferenceBarrier(); |
| } |
| #endif |
| |
| #undef RUN_LOOP_NAME |
| #undef LOCAL_STRINGIFY |
| |
| |
| class ConnectionDidReceiveInvalidMessageTest : public testing::TestWithParam<std::tuple<ConnectionTestDirection, InvalidMessageTestType>>, protected ConnectionTestBase { |
| public: |
| bool serverIsA() const { return std::get<0>(GetParam()) == ConnectionTestDirection::ServerIsA; } |
| InvalidMessageTestType testType() |
| { |
| return std::get<1>(GetParam()); |
| } |
| |
| void SetUp() override |
| { |
| setupBase(); |
| if (!serverIsA()) |
| std::swap(m_connections[0].connection, m_connections[1].connection); |
| |
| if (testType() == InvalidMessageTestType::DecodeError) { |
| // Cause a decode error by decoding too much. |
| serverClient().setAsyncMessageHandler([] (IPC::Connection&, IPC::Decoder& decoder) { |
| while (std::optional contents = decoder.decode<uint64_t>()) { |
| } |
| return true; |
| }); |
| serverClient().setSyncMessageHandler([] (IPC::Connection&, IPC::Decoder& decoder, UniqueRef<IPC::Encoder>&) { |
| while (std::optional contents = decoder.decode<uint64_t>()) { |
| } |
| return true; |
| }); |
| } else { |
| // Cause a validation error, MESSAGE_CHECK. |
| serverClient().setAsyncMessageHandler([] (IPC::Connection& connection, IPC::Decoder&) { |
| connection.markCurrentlyDispatchedMessageAsInvalid("async message check"_s); |
| return true; |
| }); |
| serverClient().setSyncMessageHandler([] (IPC::Connection& connection, IPC::Decoder&, UniqueRef<IPC::Encoder>&) { |
| connection.markCurrentlyDispatchedMessageAsInvalid("sync message check"_s); |
| return true; |
| }); |
| } |
| |
| } |
| |
| void TearDown() override |
| { |
| teardownBase(); |
| } |
| |
| protected: |
| ::testing::AssertionResult openServer() { return openA(); } |
| ::testing::AssertionResult openClient() { return openB(); } |
| IPC::Connection* server() { return a(); } |
| IPC::Connection* client() { return b(); } |
| MockConnectionClient& serverClient() { return aClient(); } |
| MockConnectionClient& clientClient() { return bClient(); } |
| void deleteServer() { deleteA(); } |
| void deleteClient() { deleteB(); } |
| }; |
| |
| TEST_P(ConnectionDidReceiveInvalidMessageTest, Async) |
| { |
| ASSERT_TRUE(openBoth()); |
| for (uint64_t i = 100u; i < 160u; ++i) |
| client()->send(MockTestMessage1 { }, i); |
| |
| for (uint64_t i = 100u; i < 160u; ++i) { |
| auto invalidMessage = serverClient().waitForInvalidMessage(kDefaultWaitForTimeout); |
| EXPECT_EQ(invalidMessage, MockTestMessage1::name()); |
| } |
| } |
| |
| TEST_P(ConnectionDidReceiveInvalidMessageTest, AsyncWithReply) |
| { |
| ASSERT_TRUE(openBoth()); |
| HashMap<uint64_t, uint64_t> replies; |
| for (uint64_t i = 100u; i < 160u; ++i) { |
| client()->sendWithAsyncReply(MockTestMessageWithAsyncReply1 { }, [&] (uint64_t value) { |
| replies.add(i, value); |
| }, i); |
| } |
| |
| for (uint64_t i = 100u; i < 160u; ++i) { |
| auto invalidMessage = serverClient().waitForInvalidMessage(kDefaultWaitForTimeout); |
| EXPECT_EQ(invalidMessage, MockTestMessageWithAsyncReply1::name()); |
| } |
| |
| // FIXME: Currently the IPC::Connection semantics are incorrect: connection is not invalidated |
| // after receiving the first invalid message. Since invalid messages must be ignored, we cannot |
| // let the sender know that the async reply handlers should be cancelled. |
| // Invalidate the client connection explicitly to receive the cancellations. |
| // Once the semantics are fixed, e.g. the server closes the connection, something like |
| // below can be enabled. |
| if ((false)) { |
| while (replies.size() < 60u) |
| RunLoop::currentSingleton().cycle(); |
| |
| for (uint64_t i = 100u; i < 160u; ++i) { |
| auto reply = replies.find(i); |
| ASSERT_NE(reply, replies.end()); |
| EXPECT_EQ(reply->value, 0u); // Cancelled. |
| } |
| } else |
| client()->invalidate(); |
| } |
| |
| TEST_P(ConnectionDidReceiveInvalidMessageTest, Sync) |
| { |
| ASSERT_TRUE(openBoth()); |
| for (uint64_t i = 100u; i < 160u; ++i) { |
| auto result = client()->sendSync(MockTestSyncMessage(), 99, kDefaultWaitForTimeout); |
| EXPECT_FALSE(result.succeeded()); |
| } |
| |
| for (uint64_t i = 100u; i < 160u; ++i) { |
| auto invalidMessage = serverClient().waitForInvalidMessage(kDefaultWaitForTimeout); |
| EXPECT_EQ(invalidMessage, MockTestSyncMessage::name()); |
| } |
| } |
| |
| INSTANTIATE_TEST_SUITE_P(ConnectionTest, |
| ConnectionTestABBA, |
| testing::Values(ConnectionTestDirection::ServerIsA, ConnectionTestDirection::ClientIsA), |
| TestParametersToStringFormatter()); |
| |
| INSTANTIATE_TEST_SUITE_P(ConnectionTest, |
| ConnectionRunLoopTest, |
| testing::Values(ConnectionTestDirection::ServerIsA, ConnectionTestDirection::ClientIsA), |
| TestParametersToStringFormatter()); |
| |
| INSTANTIATE_TEST_SUITE_P(ConnectionTest, |
| ConnectionDidReceiveInvalidMessageTest, |
| testing::Combine(testing::Values(ConnectionTestDirection::ServerIsA, ConnectionTestDirection::ClientIsA), testing::Values(InvalidMessageTestType::DecodeError, InvalidMessageTestType::ValidationError)), |
| TestParametersToStringFormatter()); |
| |
| } |