mirror of
https://github.com/LadybirdBrowser/ladybird.git
synced 2025-06-09 09:34:57 +09:00
LibIPC+LibWeb: Delete LargeMessageWrapper workaround in IPC connection
It's no longer needed because TransportSocket is now capable of properly sending large messages.
This commit is contained in:
parent
d6080d1fdc
commit
2d625f5c23
Notes:
github-actions[bot]
2025-04-09 23:31:02 +00:00
Author: https://github.com/kalenikaliaksandr
Commit: 2d625f5c23
Pull-request: https://github.com/LadybirdBrowser/ladybird/pull/4304
11 changed files with 23 additions and 177 deletions
|
@ -12,7 +12,6 @@
|
|||
#include <LibIPC/Connection.h>
|
||||
#include <LibIPC/Message.h>
|
||||
#include <LibIPC/Stub.h>
|
||||
#include <LibIPC/UnprocessedFileDescriptors.h>
|
||||
|
||||
namespace IPC {
|
||||
|
||||
|
@ -40,21 +39,16 @@ bool ConnectionBase::is_open() const
|
|||
|
||||
ErrorOr<void> ConnectionBase::post_message(Message const& message)
|
||||
{
|
||||
return post_message(message.endpoint_magic(), TRY(message.encode()));
|
||||
return post_message(TRY(message.encode()));
|
||||
}
|
||||
|
||||
ErrorOr<void> ConnectionBase::post_message(u32 endpoint_magic, MessageBuffer buffer)
|
||||
ErrorOr<void> ConnectionBase::post_message(MessageBuffer buffer)
|
||||
{
|
||||
// NOTE: If this connection is being shut down, but has not yet been destroyed,
|
||||
// the socket will be closed. Don't try to send more messages.
|
||||
if (!m_transport->is_open())
|
||||
return Error::from_string_literal("Trying to post_message during IPC shutdown");
|
||||
|
||||
if (buffer.data().size() > TransportSocket::SOCKET_BUFFER_SIZE) {
|
||||
auto wrapper = LargeMessageWrapper::create(endpoint_magic, buffer);
|
||||
buffer = MUST(wrapper->encode());
|
||||
}
|
||||
|
||||
MUST(buffer.transfer_message(*m_transport));
|
||||
|
||||
m_responsiveness_timer->start();
|
||||
|
@ -85,7 +79,7 @@ void ConnectionBase::handle_messages()
|
|||
}
|
||||
|
||||
if (auto response = handler_result.release_value()) {
|
||||
if (auto post_result = post_message(m_local_endpoint_magic, *response); post_result.is_error()) {
|
||||
if (auto post_result = post_message(*response); post_result.is_error()) {
|
||||
dbgln("IPC::ConnectionBase::handle_messages: {}", post_result.error());
|
||||
}
|
||||
}
|
||||
|
@ -100,24 +94,11 @@ void ConnectionBase::wait_for_transport_to_become_readable()
|
|||
|
||||
ErrorOr<void> ConnectionBase::drain_messages_from_peer()
|
||||
{
|
||||
auto schedule_shutdown = m_transport->read_as_many_messages_as_possible_without_blocking([&](auto&& unparsed_message) {
|
||||
auto const& bytes = unparsed_message.bytes;
|
||||
UnprocessedFileDescriptors unprocessed_fds;
|
||||
unprocessed_fds.return_fds_to_front_of_queue(move(unparsed_message.fds));
|
||||
if (auto message = try_parse_message(bytes, unprocessed_fds)) {
|
||||
if (message->message_id() == LargeMessageWrapper::MESSAGE_ID) {
|
||||
LargeMessageWrapper* wrapper = static_cast<LargeMessageWrapper*>(message.ptr());
|
||||
auto wrapped_message = wrapper->wrapped_message_data();
|
||||
unprocessed_fds.return_fds_to_front_of_queue(wrapper->take_fds());
|
||||
auto parsed_message = try_parse_message(wrapped_message, unprocessed_fds);
|
||||
VERIFY(parsed_message);
|
||||
m_unprocessed_messages.append(parsed_message.release_nonnull());
|
||||
return;
|
||||
}
|
||||
|
||||
auto schedule_shutdown = m_transport->read_as_many_messages_as_possible_without_blocking([&](auto&& raw_message) {
|
||||
if (auto message = try_parse_message(raw_message.bytes, raw_message.fds)) {
|
||||
m_unprocessed_messages.append(message.release_nonnull());
|
||||
} else {
|
||||
dbgln("Failed to parse IPC message {:hex-dump}", bytes);
|
||||
dbgln("Failed to parse IPC message {:hex-dump}", raw_message.bytes);
|
||||
VERIFY_NOT_REACHED();
|
||||
}
|
||||
});
|
||||
|
|
|
@ -15,10 +15,6 @@
|
|||
#include <LibIPC/Forward.h>
|
||||
#include <LibIPC/Message.h>
|
||||
#include <LibIPC/Transport.h>
|
||||
#include <LibIPC/UnprocessedFileDescriptors.h>
|
||||
#include <LibThreading/ConditionVariable.h>
|
||||
#include <LibThreading/MutexProtected.h>
|
||||
#include <LibThreading/Thread.h>
|
||||
|
||||
namespace IPC {
|
||||
|
||||
|
@ -30,7 +26,7 @@ public:
|
|||
|
||||
[[nodiscard]] bool is_open() const;
|
||||
ErrorOr<void> post_message(Message const&);
|
||||
ErrorOr<void> post_message(u32 endpoint_magic, MessageBuffer);
|
||||
ErrorOr<void> post_message(MessageBuffer);
|
||||
|
||||
void shutdown();
|
||||
virtual void die() { }
|
||||
|
@ -43,7 +39,7 @@ protected:
|
|||
virtual void may_have_become_unresponsive() { }
|
||||
virtual void did_become_responsive() { }
|
||||
virtual void shutdown_with_error(Error const&);
|
||||
virtual OwnPtr<Message> try_parse_message(ReadonlyBytes, UnprocessedFileDescriptors&) = 0;
|
||||
virtual OwnPtr<Message> try_parse_message(ReadonlyBytes, Queue<File>&) = 0;
|
||||
|
||||
OwnPtr<IPC::Message> wait_for_specific_endpoint_message_impl(u32 endpoint_magic, int message_id);
|
||||
void wait_for_transport_to_become_readable();
|
||||
|
@ -102,7 +98,7 @@ protected:
|
|||
return {};
|
||||
}
|
||||
|
||||
virtual OwnPtr<Message> try_parse_message(ReadonlyBytes bytes, UnprocessedFileDescriptors& fds) override
|
||||
virtual OwnPtr<Message> try_parse_message(ReadonlyBytes bytes, Queue<File>& fds) override
|
||||
{
|
||||
auto local_message = LocalEndpoint::decode_message(bytes, fds);
|
||||
if (!local_message.is_error())
|
||||
|
|
|
@ -23,7 +23,6 @@
|
|||
#include <LibIPC/File.h>
|
||||
#include <LibIPC/Forward.h>
|
||||
#include <LibIPC/Message.h>
|
||||
#include <LibIPC/UnprocessedFileDescriptors.h>
|
||||
#include <LibURL/Origin.h>
|
||||
#include <LibURL/URL.h>
|
||||
|
||||
|
@ -38,7 +37,7 @@ inline ErrorOr<T> decode(Decoder&)
|
|||
|
||||
class Decoder {
|
||||
public:
|
||||
Decoder(Stream& stream, UnprocessedFileDescriptors& files)
|
||||
Decoder(Stream& stream, Queue<File>& files)
|
||||
: m_stream(stream)
|
||||
, m_files(files)
|
||||
{
|
||||
|
@ -63,11 +62,11 @@ public:
|
|||
ErrorOr<size_t> decode_size();
|
||||
|
||||
Stream& stream() { return m_stream; }
|
||||
UnprocessedFileDescriptors& files() { return m_files; }
|
||||
Queue<File>& files() { return m_files; }
|
||||
|
||||
private:
|
||||
Stream& m_stream;
|
||||
UnprocessedFileDescriptors& m_files;
|
||||
Queue<File>& m_files;
|
||||
};
|
||||
|
||||
template<Arithmetic T>
|
||||
|
|
|
@ -6,7 +6,6 @@
|
|||
|
||||
#include <AK/Checked.h>
|
||||
#include <LibIPC/Decoder.h>
|
||||
#include <LibIPC/Encoder.h>
|
||||
#include <LibIPC/Message.h>
|
||||
|
||||
namespace IPC {
|
||||
|
@ -47,53 +46,4 @@ ErrorOr<void> MessageBuffer::transfer_message(Transport& transport)
|
|||
return {};
|
||||
}
|
||||
|
||||
NonnullOwnPtr<LargeMessageWrapper> LargeMessageWrapper::create(u32 endpoint_magic, MessageBuffer& buffer_to_wrap)
|
||||
{
|
||||
auto size = buffer_to_wrap.data().size();
|
||||
auto wrapped_message_data = MUST(Core::AnonymousBuffer::create_with_size(size));
|
||||
memcpy(wrapped_message_data.data<void>(), buffer_to_wrap.data().data(), size);
|
||||
Vector<File> files;
|
||||
for (auto& owned_fd : buffer_to_wrap.take_fds()) {
|
||||
files.append(File::adopt_fd(owned_fd->take_fd()));
|
||||
}
|
||||
return make<LargeMessageWrapper>(endpoint_magic, move(wrapped_message_data), move(files));
|
||||
}
|
||||
|
||||
LargeMessageWrapper::LargeMessageWrapper(u32 endpoint_magic, Core::AnonymousBuffer wrapped_message_data, Vector<File>&& wrapped_fds)
|
||||
: m_endpoint_magic(endpoint_magic)
|
||||
, m_wrapped_message_data(move(wrapped_message_data))
|
||||
, m_wrapped_fds(move(wrapped_fds))
|
||||
{
|
||||
}
|
||||
|
||||
ErrorOr<MessageBuffer> LargeMessageWrapper::encode() const
|
||||
{
|
||||
MessageBuffer buffer;
|
||||
Encoder stream { buffer };
|
||||
TRY(stream.encode(m_endpoint_magic));
|
||||
TRY(stream.encode(MESSAGE_ID));
|
||||
TRY(stream.encode(m_wrapped_message_data));
|
||||
TRY(stream.encode(m_wrapped_fds.size()));
|
||||
for (auto const& wrapped_fd : m_wrapped_fds) {
|
||||
TRY(stream.append_file_descriptor(wrapped_fd.take_fd()));
|
||||
}
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
ErrorOr<NonnullOwnPtr<LargeMessageWrapper>> LargeMessageWrapper::decode(u32 endpoint_magic, Stream& stream, UnprocessedFileDescriptors& files)
|
||||
{
|
||||
Decoder decoder { stream, files };
|
||||
auto wrapped_message_data = TRY(decoder.decode<Core::AnonymousBuffer>());
|
||||
|
||||
Vector<File> wrapped_fds;
|
||||
auto num_fds = TRY(decoder.decode<u32>());
|
||||
for (u32 i = 0; i < num_fds; ++i) {
|
||||
auto fd = TRY(decoder.decode<IPC::File>());
|
||||
wrapped_fds.append(move(fd));
|
||||
}
|
||||
|
||||
return make<LargeMessageWrapper>(endpoint_magic, wrapped_message_data, move(wrapped_fds));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -8,14 +8,8 @@
|
|||
#pragma once
|
||||
|
||||
#include <AK/Error.h>
|
||||
#include <AK/RefCounted.h>
|
||||
#include <AK/RefPtr.h>
|
||||
#include <AK/Vector.h>
|
||||
#include <LibCore/AnonymousBuffer.h>
|
||||
#include <LibCore/Forward.h>
|
||||
#include <LibCore/System.h>
|
||||
#include <LibIPC/Transport.h>
|
||||
#include <LibIPC/UnprocessedFileDescriptors.h>
|
||||
|
||||
namespace IPC {
|
||||
|
||||
|
@ -67,30 +61,4 @@ protected:
|
|||
Message() = default;
|
||||
};
|
||||
|
||||
class LargeMessageWrapper : public Message {
|
||||
public:
|
||||
~LargeMessageWrapper() override = default;
|
||||
|
||||
static constexpr int MESSAGE_ID = 0x0;
|
||||
|
||||
static NonnullOwnPtr<LargeMessageWrapper> create(u32 endpoint_magic, MessageBuffer& buffer_to_wrap);
|
||||
|
||||
u32 endpoint_magic() const override { return m_endpoint_magic; }
|
||||
int message_id() const override { return MESSAGE_ID; }
|
||||
char const* message_name() const override { return "LargeMessageWrapper"; }
|
||||
ErrorOr<MessageBuffer> encode() const override;
|
||||
|
||||
static ErrorOr<NonnullOwnPtr<LargeMessageWrapper>> decode(u32 endpoint_magic, Stream& stream, UnprocessedFileDescriptors& files);
|
||||
|
||||
ReadonlyBytes wrapped_message_data() const { return ReadonlyBytes { m_wrapped_message_data.data<u8>(), m_wrapped_message_data.size() }; }
|
||||
auto take_fds() { return move(m_wrapped_fds); }
|
||||
|
||||
LargeMessageWrapper(u32 endpoint_magic, Core::AnonymousBuffer wrapped_message_data, Vector<IPC::File>&& wrapped_fds);
|
||||
|
||||
private:
|
||||
u32 m_endpoint_magic { 0 };
|
||||
Core::AnonymousBuffer m_wrapped_message_data;
|
||||
Vector<File> m_wrapped_fds;
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -192,7 +192,7 @@ ErrorOr<void> TransportSocket::send_message(Core::LocalSocket& socket, ReadonlyB
|
|||
return {};
|
||||
}
|
||||
|
||||
TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possible_without_blocking(Function<void(Message)>&& callback)
|
||||
TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possible_without_blocking(Function<void(Message&&)>&& callback)
|
||||
{
|
||||
bool should_shutdown = false;
|
||||
while (is_open()) {
|
||||
|
@ -241,7 +241,7 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib
|
|||
Message message;
|
||||
received_fd_count += header.fd_count;
|
||||
for (size_t i = 0; i < header.fd_count; ++i)
|
||||
message.fds.append(m_unprocessed_fds.dequeue());
|
||||
message.fds.enqueue(m_unprocessed_fds.dequeue());
|
||||
message.bytes.append(m_unprocessed_bytes.data() + index + sizeof(MessageHeader), header.payload_size);
|
||||
callback(move(message));
|
||||
} else if (header.type == MessageHeader::Type::FileDescriptorAcknowledgement) {
|
||||
|
|
|
@ -9,7 +9,6 @@
|
|||
|
||||
#include <AK/Queue.h>
|
||||
#include <LibCore/Socket.h>
|
||||
#include <LibIPC/UnprocessedFileDescriptors.h>
|
||||
#include <LibThreading/ConditionVariable.h>
|
||||
#include <LibThreading/MutexProtected.h>
|
||||
#include <LibThreading/Thread.h>
|
||||
|
@ -91,9 +90,9 @@ public:
|
|||
};
|
||||
struct Message {
|
||||
Vector<u8> bytes;
|
||||
Vector<File> fds;
|
||||
Queue<File> fds;
|
||||
};
|
||||
ShouldShutdown read_as_many_messages_as_possible_without_blocking(Function<void(Message)>&& schedule_shutdown);
|
||||
ShouldShutdown read_as_many_messages_as_possible_without_blocking(Function<void(Message&&)>&& schedule_shutdown);
|
||||
|
||||
// Obnoxious name to make it clear that this is a dangerous operation.
|
||||
ErrorOr<int> release_underlying_transport_for_transfer();
|
||||
|
@ -105,7 +104,7 @@ private:
|
|||
|
||||
NonnullOwnPtr<Core::LocalSocket> m_socket;
|
||||
ByteBuffer m_unprocessed_bytes;
|
||||
UnprocessedFileDescriptors m_unprocessed_fds;
|
||||
Queue<File> m_unprocessed_fds;
|
||||
|
||||
// After file descriptor is sent, it is moved to the wait queue until an acknowledgement is received from the peer.
|
||||
// This is necessary to handle a specific behavior of the macOS kernel, which may prematurely garbage-collect the file
|
||||
|
|
|
@ -1,36 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2025, Aliaksandr Kalenik <kalenik.aliaksandr@gmail.com>
|
||||
*
|
||||
* SPDX-License-Identifier: BSD-2-Clause
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <LibIPC/File.h>
|
||||
|
||||
namespace IPC {
|
||||
|
||||
class UnprocessedFileDescriptors {
|
||||
public:
|
||||
void enqueue(File&& fd)
|
||||
{
|
||||
m_fds.append(move(fd));
|
||||
}
|
||||
|
||||
File dequeue()
|
||||
{
|
||||
return m_fds.take_first();
|
||||
}
|
||||
|
||||
void return_fds_to_front_of_queue(Vector<File>&& fds)
|
||||
{
|
||||
m_fds.prepend(move(fds));
|
||||
}
|
||||
|
||||
size_t size() const { return m_fds.size(); }
|
||||
|
||||
private:
|
||||
Vector<File> m_fds;
|
||||
};
|
||||
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue