From b53694b4c098a44984ddbee4e654a1272968de7d Mon Sep 17 00:00:00 2001 From: Aliaksandr Kalenik Date: Thu, 10 Apr 2025 20:26:46 +0200 Subject: [PATCH] LibIPC+LibWeb: Delete LargeMessageWrapper workaround in IPC connection Bring back 2d625f5c2343169d7e99f403225de47915859836 --- Libraries/LibIPC/Connection.cpp | 31 +++--------- Libraries/LibIPC/Connection.h | 10 ++-- Libraries/LibIPC/Decoder.h | 7 ++- Libraries/LibIPC/Message.cpp | 50 ------------------- Libraries/LibIPC/Message.h | 32 ------------ Libraries/LibIPC/TransportSocket.cpp | 4 +- Libraries/LibIPC/TransportSocket.h | 7 ++- Libraries/LibIPC/UnprocessedFileDescriptors.h | 36 ------------- Libraries/LibWeb/HTML/MessagePort.cpp | 10 ++-- Libraries/LibWeb/HTML/MessagePort.h | 1 - .../Tools/CodeGenerators/IPCCompiler/main.cpp | 12 ++--- 11 files changed, 23 insertions(+), 177 deletions(-) delete mode 100644 Libraries/LibIPC/UnprocessedFileDescriptors.h diff --git a/Libraries/LibIPC/Connection.cpp b/Libraries/LibIPC/Connection.cpp index cc02f30d85b..8f15685393d 100644 --- a/Libraries/LibIPC/Connection.cpp +++ b/Libraries/LibIPC/Connection.cpp @@ -12,7 +12,6 @@ #include #include #include -#include namespace IPC { @@ -40,21 +39,16 @@ bool ConnectionBase::is_open() const ErrorOr ConnectionBase::post_message(Message const& message) { - return post_message(message.endpoint_magic(), TRY(message.encode())); + return post_message(TRY(message.encode())); } -ErrorOr ConnectionBase::post_message(u32 endpoint_magic, MessageBuffer buffer) +ErrorOr ConnectionBase::post_message(MessageBuffer buffer) { // NOTE: If this connection is being shut down, but has not yet been destroyed, // the socket will be closed. Don't try to send more messages. if (!m_transport->is_open()) return Error::from_string_literal("Trying to post_message during IPC shutdown"); - if (buffer.data().size() > TransportSocket::SOCKET_BUFFER_SIZE) { - auto wrapper = LargeMessageWrapper::create(endpoint_magic, buffer); - buffer = MUST(wrapper->encode()); - } - MUST(buffer.transfer_message(*m_transport)); m_responsiveness_timer->start(); @@ -85,7 +79,7 @@ void ConnectionBase::handle_messages() } if (auto response = handler_result.release_value()) { - if (auto post_result = post_message(m_local_endpoint_magic, *response); post_result.is_error()) { + if (auto post_result = post_message(*response); post_result.is_error()) { dbgln("IPC::ConnectionBase::handle_messages: {}", post_result.error()); } } @@ -100,24 +94,11 @@ void ConnectionBase::wait_for_transport_to_become_readable() ErrorOr ConnectionBase::drain_messages_from_peer() { - auto schedule_shutdown = m_transport->read_as_many_messages_as_possible_without_blocking([&](auto&& unparsed_message) { - auto const& bytes = unparsed_message.bytes; - UnprocessedFileDescriptors unprocessed_fds; - unprocessed_fds.return_fds_to_front_of_queue(move(unparsed_message.fds)); - if (auto message = try_parse_message(bytes, unprocessed_fds)) { - if (message->message_id() == LargeMessageWrapper::MESSAGE_ID) { - LargeMessageWrapper* wrapper = static_cast(message.ptr()); - auto wrapped_message = wrapper->wrapped_message_data(); - unprocessed_fds.return_fds_to_front_of_queue(wrapper->take_fds()); - auto parsed_message = try_parse_message(wrapped_message, unprocessed_fds); - VERIFY(parsed_message); - m_unprocessed_messages.append(parsed_message.release_nonnull()); - return; - } - + auto schedule_shutdown = m_transport->read_as_many_messages_as_possible_without_blocking([&](auto&& raw_message) { + if (auto message = try_parse_message(raw_message.bytes, raw_message.fds)) { m_unprocessed_messages.append(message.release_nonnull()); } else { - dbgln("Failed to parse IPC message {:hex-dump}", bytes); + dbgln("Failed to parse IPC message {:hex-dump}", raw_message.bytes); VERIFY_NOT_REACHED(); } }); diff --git a/Libraries/LibIPC/Connection.h b/Libraries/LibIPC/Connection.h index 0f5b918226f..c1644e66b6a 100644 --- a/Libraries/LibIPC/Connection.h +++ b/Libraries/LibIPC/Connection.h @@ -15,10 +15,6 @@ #include #include #include -#include -#include -#include -#include namespace IPC { @@ -30,7 +26,7 @@ public: [[nodiscard]] bool is_open() const; ErrorOr post_message(Message const&); - ErrorOr post_message(u32 endpoint_magic, MessageBuffer); + ErrorOr post_message(MessageBuffer); void shutdown(); virtual void die() { } @@ -43,7 +39,7 @@ protected: virtual void may_have_become_unresponsive() { } virtual void did_become_responsive() { } virtual void shutdown_with_error(Error const&); - virtual OwnPtr try_parse_message(ReadonlyBytes, UnprocessedFileDescriptors&) = 0; + virtual OwnPtr try_parse_message(ReadonlyBytes, Queue&) = 0; OwnPtr wait_for_specific_endpoint_message_impl(u32 endpoint_magic, int message_id); void wait_for_transport_to_become_readable(); @@ -102,7 +98,7 @@ protected: return {}; } - virtual OwnPtr try_parse_message(ReadonlyBytes bytes, UnprocessedFileDescriptors& fds) override + virtual OwnPtr try_parse_message(ReadonlyBytes bytes, Queue& fds) override { auto local_message = LocalEndpoint::decode_message(bytes, fds); if (!local_message.is_error()) diff --git a/Libraries/LibIPC/Decoder.h b/Libraries/LibIPC/Decoder.h index c7fdd892b72..7fa284c26a2 100644 --- a/Libraries/LibIPC/Decoder.h +++ b/Libraries/LibIPC/Decoder.h @@ -23,7 +23,6 @@ #include #include #include -#include #include #include @@ -38,7 +37,7 @@ inline ErrorOr decode(Decoder&) class Decoder { public: - Decoder(Stream& stream, UnprocessedFileDescriptors& files) + Decoder(Stream& stream, Queue& files) : m_stream(stream) , m_files(files) { @@ -63,11 +62,11 @@ public: ErrorOr decode_size(); Stream& stream() { return m_stream; } - UnprocessedFileDescriptors& files() { return m_files; } + Queue& files() { return m_files; } private: Stream& m_stream; - UnprocessedFileDescriptors& m_files; + Queue& m_files; }; template diff --git a/Libraries/LibIPC/Message.cpp b/Libraries/LibIPC/Message.cpp index 5652bac21cb..680cc329d08 100644 --- a/Libraries/LibIPC/Message.cpp +++ b/Libraries/LibIPC/Message.cpp @@ -6,7 +6,6 @@ #include #include -#include #include namespace IPC { @@ -47,53 +46,4 @@ ErrorOr MessageBuffer::transfer_message(Transport& transport) return {}; } -NonnullOwnPtr LargeMessageWrapper::create(u32 endpoint_magic, MessageBuffer& buffer_to_wrap) -{ - auto size = buffer_to_wrap.data().size(); - auto wrapped_message_data = MUST(Core::AnonymousBuffer::create_with_size(size)); - memcpy(wrapped_message_data.data(), buffer_to_wrap.data().data(), size); - Vector files; - for (auto& owned_fd : buffer_to_wrap.take_fds()) { - files.append(File::adopt_fd(owned_fd->take_fd())); - } - return make(endpoint_magic, move(wrapped_message_data), move(files)); -} - -LargeMessageWrapper::LargeMessageWrapper(u32 endpoint_magic, Core::AnonymousBuffer wrapped_message_data, Vector&& wrapped_fds) - : m_endpoint_magic(endpoint_magic) - , m_wrapped_message_data(move(wrapped_message_data)) - , m_wrapped_fds(move(wrapped_fds)) -{ -} - -ErrorOr LargeMessageWrapper::encode() const -{ - MessageBuffer buffer; - Encoder stream { buffer }; - TRY(stream.encode(m_endpoint_magic)); - TRY(stream.encode(MESSAGE_ID)); - TRY(stream.encode(m_wrapped_message_data)); - TRY(stream.encode(m_wrapped_fds.size())); - for (auto const& wrapped_fd : m_wrapped_fds) { - TRY(stream.append_file_descriptor(wrapped_fd.take_fd())); - } - - return buffer; -} - -ErrorOr> LargeMessageWrapper::decode(u32 endpoint_magic, Stream& stream, UnprocessedFileDescriptors& files) -{ - Decoder decoder { stream, files }; - auto wrapped_message_data = TRY(decoder.decode()); - - Vector wrapped_fds; - auto num_fds = TRY(decoder.decode()); - for (u32 i = 0; i < num_fds; ++i) { - auto fd = TRY(decoder.decode()); - wrapped_fds.append(move(fd)); - } - - return make(endpoint_magic, wrapped_message_data, move(wrapped_fds)); -} - } diff --git a/Libraries/LibIPC/Message.h b/Libraries/LibIPC/Message.h index 65333340f2a..e79638618ed 100644 --- a/Libraries/LibIPC/Message.h +++ b/Libraries/LibIPC/Message.h @@ -8,14 +8,8 @@ #pragma once #include -#include -#include #include -#include -#include -#include #include -#include namespace IPC { @@ -67,30 +61,4 @@ protected: Message() = default; }; -class LargeMessageWrapper : public Message { -public: - ~LargeMessageWrapper() override = default; - - static constexpr int MESSAGE_ID = 0x0; - - static NonnullOwnPtr create(u32 endpoint_magic, MessageBuffer& buffer_to_wrap); - - u32 endpoint_magic() const override { return m_endpoint_magic; } - int message_id() const override { return MESSAGE_ID; } - char const* message_name() const override { return "LargeMessageWrapper"; } - ErrorOr encode() const override; - - static ErrorOr> decode(u32 endpoint_magic, Stream& stream, UnprocessedFileDescriptors& files); - - ReadonlyBytes wrapped_message_data() const { return ReadonlyBytes { m_wrapped_message_data.data(), m_wrapped_message_data.size() }; } - auto take_fds() { return move(m_wrapped_fds); } - - LargeMessageWrapper(u32 endpoint_magic, Core::AnonymousBuffer wrapped_message_data, Vector&& wrapped_fds); - -private: - u32 m_endpoint_magic { 0 }; - Core::AnonymousBuffer m_wrapped_message_data; - Vector m_wrapped_fds; -}; - } diff --git a/Libraries/LibIPC/TransportSocket.cpp b/Libraries/LibIPC/TransportSocket.cpp index 96d9ed78eb0..0c7a2064cd4 100644 --- a/Libraries/LibIPC/TransportSocket.cpp +++ b/Libraries/LibIPC/TransportSocket.cpp @@ -197,7 +197,7 @@ ErrorOr TransportSocket::send_message(Core::LocalSocket& socket, ReadonlyB return {}; } -TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possible_without_blocking(Function&& callback) +TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possible_without_blocking(Function&& callback) { Threading::RWLockLocker lock(m_socket_rw_lock); @@ -248,7 +248,7 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib Message message; received_fd_count += header.fd_count; for (size_t i = 0; i < header.fd_count; ++i) - message.fds.append(m_unprocessed_fds.dequeue()); + message.fds.enqueue(m_unprocessed_fds.dequeue()); message.bytes.append(m_unprocessed_bytes.data() + index + sizeof(MessageHeader), header.payload_size); callback(move(message)); } else if (header.type == MessageHeader::Type::FileDescriptorAcknowledgement) { diff --git a/Libraries/LibIPC/TransportSocket.h b/Libraries/LibIPC/TransportSocket.h index d60c4dfebd0..e7a19f4e159 100644 --- a/Libraries/LibIPC/TransportSocket.h +++ b/Libraries/LibIPC/TransportSocket.h @@ -9,7 +9,6 @@ #include #include -#include #include #include #include @@ -92,9 +91,9 @@ public: }; struct Message { Vector bytes; - Vector fds; + Queue fds; }; - ShouldShutdown read_as_many_messages_as_possible_without_blocking(Function&&); + ShouldShutdown read_as_many_messages_as_possible_without_blocking(Function&&); // Obnoxious name to make it clear that this is a dangerous operation. ErrorOr release_underlying_transport_for_transfer(); @@ -107,7 +106,7 @@ private: NonnullOwnPtr m_socket; mutable Threading::RWLock m_socket_rw_lock; ByteBuffer m_unprocessed_bytes; - UnprocessedFileDescriptors m_unprocessed_fds; + Queue m_unprocessed_fds; // After file descriptor is sent, it is moved to the wait queue until an acknowledgement is received from the peer. // This is necessary to handle a specific behavior of the macOS kernel, which may prematurely garbage-collect the file diff --git a/Libraries/LibIPC/UnprocessedFileDescriptors.h b/Libraries/LibIPC/UnprocessedFileDescriptors.h deleted file mode 100644 index 991c5598b8a..00000000000 --- a/Libraries/LibIPC/UnprocessedFileDescriptors.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2025, Aliaksandr Kalenik - * - * SPDX-License-Identifier: BSD-2-Clause - */ - -#pragma once - -#include - -namespace IPC { - -class UnprocessedFileDescriptors { -public: - void enqueue(File&& fd) - { - m_fds.append(move(fd)); - } - - File dequeue() - { - return m_fds.take_first(); - } - - void return_fds_to_front_of_queue(Vector&& fds) - { - m_fds.prepend(move(fds)); - } - - size_t size() const { return m_fds.size(); } - -private: - Vector m_fds; -}; - -} diff --git a/Libraries/LibWeb/HTML/MessagePort.cpp b/Libraries/LibWeb/HTML/MessagePort.cpp index 03fe434b6ec..97d6a808e80 100644 --- a/Libraries/LibWeb/HTML/MessagePort.cpp +++ b/Libraries/LibWeb/HTML/MessagePort.cpp @@ -288,13 +288,9 @@ void MessagePort::post_port_message(SerializedTransferRecord serialize_with_tran void MessagePort::read_from_transport() { - auto schedule_shutdown = m_transport->read_as_many_messages_as_possible_without_blocking([this](auto&& unparsed_message) { - auto& bytes = unparsed_message.bytes; - IPC::UnprocessedFileDescriptors unprocessed_fds; - unprocessed_fds.return_fds_to_front_of_queue(move(unparsed_message.fds)); - - FixedMemoryStream stream { bytes.span(), FixedMemoryStream::Mode::ReadOnly }; - IPC::Decoder decoder { stream, unprocessed_fds }; + auto schedule_shutdown = m_transport->read_as_many_messages_as_possible_without_blocking([this](auto&& raw_message) { + FixedMemoryStream stream { raw_message.bytes.span(), FixedMemoryStream::Mode::ReadOnly }; + IPC::Decoder decoder { stream, raw_message.fds }; auto serialized_transfer_record = MUST(decoder.decode()); diff --git a/Libraries/LibWeb/HTML/MessagePort.h b/Libraries/LibWeb/HTML/MessagePort.h index 0d48fd0adfb..24e3bbeccbf 100644 --- a/Libraries/LibWeb/HTML/MessagePort.h +++ b/Libraries/LibWeb/HTML/MessagePort.h @@ -13,7 +13,6 @@ #include #include #include -#include #include #include #include diff --git a/Meta/Lagom/Tools/CodeGenerators/IPCCompiler/main.cpp b/Meta/Lagom/Tools/CodeGenerators/IPCCompiler/main.cpp index 4d041620171..c7fe62dfa4a 100644 --- a/Meta/Lagom/Tools/CodeGenerators/IPCCompiler/main.cpp +++ b/Meta/Lagom/Tools/CodeGenerators/IPCCompiler/main.cpp @@ -404,7 +404,7 @@ public:)~~~"); static i32 static_message_id() { return (int)MessageID::@message.pascal_name@; } virtual const char* message_name() const override { return "@endpoint.name@::@message.pascal_name@"; } - static ErrorOr> decode(Stream& stream, IPC::UnprocessedFileDescriptors& files) + static ErrorOr> decode(Stream& stream, Queue& files) { IPC::Decoder decoder { stream, files };)~~~"); @@ -649,7 +649,7 @@ void generate_proxy_method(SourceGenerator& message_generator, Endpoint const& e } } else { message_generator.append(R"~~~()); - MUST(m_connection.post_message(@endpoint.magic@, move(message_buffer))); )~~~"); + MUST(m_connection.post_message(move(message_buffer))); )~~~"); } message_generator.appendln(R"~~~( @@ -720,7 +720,7 @@ public: static u32 static_magic() { return @endpoint.magic@; } - static ErrorOr> decode_message(ReadonlyBytes buffer, [[maybe_unused]] IPC::UnprocessedFileDescriptors& files) + static ErrorOr> decode_message(ReadonlyBytes buffer, [[maybe_unused]] Queue& files) { FixedMemoryStream stream { buffer }; auto message_endpoint_magic = TRY(stream.read_value());)~~~"); @@ -757,11 +757,6 @@ public: do_decode_message(message.response_name()); } - generator.append(R"~~~( - case (int)IPC::LargeMessageWrapper::MESSAGE_ID: - return TRY(IPC::LargeMessageWrapper::decode(message_endpoint_magic, stream, files)); -)~~~"); - generator.append(R"~~~( default:)~~~"); if constexpr (GENERATE_DEBUG) { @@ -903,7 +898,6 @@ void build(StringBuilder& builder, Vector const& endpoints) #include #include #include -#include #if defined(AK_COMPILER_CLANG) #pragma clang diagnostic push