mirror of
https://github.com/LadybirdBrowser/ladybird.git
synced 2025-06-09 17:44:56 +09:00
LibIPC: Change TransportSocket to write large messages in small chunks
Bring back d6080d1fdc
with a missing check
whether underlying socket is closed, before accessing `fd()` that is
optional and empty in case of closed socket.
This commit is contained in:
parent
1ee56d34e7
commit
14dc7686c3
Notes:
github-actions[bot]
2025-04-10 21:41:17 +00:00
Author: https://github.com/kalenikaliaksandr
Commit: 14dc7686c3
Pull-request: https://github.com/LadybirdBrowser/ladybird/pull/4313
Reviewed-by: https://github.com/ADKaster
2 changed files with 98 additions and 65 deletions
|
@ -13,26 +13,79 @@
|
|||
|
||||
namespace IPC {
|
||||
|
||||
void SendQueue::enqueue_message(Vector<u8>&& bytes, Vector<int>&& fds)
|
||||
{
|
||||
Threading::MutexLocker locker(m_mutex);
|
||||
m_bytes.append(bytes.data(), bytes.size());
|
||||
m_fds.append(fds.data(), fds.size());
|
||||
m_condition.signal();
|
||||
}
|
||||
|
||||
SendQueue::Running SendQueue::block_until_message_enqueued()
|
||||
{
|
||||
Threading::MutexLocker locker(m_mutex);
|
||||
while (m_bytes.is_empty() && m_fds.is_empty() && m_running)
|
||||
m_condition.wait();
|
||||
return m_running ? Running::Yes : Running::No;
|
||||
}
|
||||
|
||||
SendQueue::BytesAndFds SendQueue::dequeue(size_t max_bytes)
|
||||
{
|
||||
Threading::MutexLocker locker(m_mutex);
|
||||
auto bytes_to_send = min(max_bytes, m_bytes.size());
|
||||
Vector<u8> bytes;
|
||||
bytes.append(m_bytes.data(), bytes_to_send);
|
||||
m_bytes.remove(0, bytes_to_send);
|
||||
return { move(bytes), move(m_fds) };
|
||||
}
|
||||
|
||||
void SendQueue::return_unsent_data_to_front_of_queue(ReadonlyBytes const& bytes, Vector<int> const& fds)
|
||||
{
|
||||
Threading::MutexLocker locker(m_mutex);
|
||||
m_bytes.prepend(bytes.data(), bytes.size());
|
||||
m_fds.prepend(fds.data(), fds.size());
|
||||
}
|
||||
|
||||
void SendQueue::stop()
|
||||
{
|
||||
Threading::MutexLocker locker(m_mutex);
|
||||
m_running = false;
|
||||
m_condition.signal();
|
||||
}
|
||||
|
||||
TransportSocket::TransportSocket(NonnullOwnPtr<Core::LocalSocket> socket)
|
||||
: m_socket(move(socket))
|
||||
{
|
||||
m_send_queue = adopt_ref(*new SendQueue);
|
||||
m_send_thread = Threading::Thread::construct([this, send_queue = m_send_queue]() -> intptr_t {
|
||||
for (;;) {
|
||||
send_queue->mutex.lock();
|
||||
while (send_queue->messages.is_empty() && send_queue->running)
|
||||
send_queue->condition.wait();
|
||||
|
||||
if (!send_queue->running) {
|
||||
send_queue->mutex.unlock();
|
||||
if (send_queue->block_until_message_enqueued() == SendQueue::Running::No)
|
||||
break;
|
||||
|
||||
auto [bytes, fds] = send_queue->dequeue(4096);
|
||||
ReadonlyBytes remaining_to_send_bytes = bytes;
|
||||
|
||||
auto result = send_message(*m_socket, remaining_to_send_bytes, fds);
|
||||
if (result.is_error()) {
|
||||
dbgln("TransportSocket::send_thread: {}", result.error());
|
||||
VERIFY_NOT_REACHED();
|
||||
}
|
||||
|
||||
auto [bytes, fds] = send_queue->messages.take_first();
|
||||
send_queue->mutex.unlock();
|
||||
if (!remaining_to_send_bytes.is_empty() || !fds.is_empty()) {
|
||||
send_queue->return_unsent_data_to_front_of_queue(remaining_to_send_bytes, fds);
|
||||
}
|
||||
|
||||
if (auto result = send_message(*m_socket, bytes, fds); result.is_error()) {
|
||||
dbgln("TransportSocket::send_thread: {}", result.error());
|
||||
if (!m_socket->is_open())
|
||||
break;
|
||||
|
||||
{
|
||||
Vector<struct pollfd, 1> pollfds;
|
||||
pollfds.append({ .fd = m_socket->fd().value(), .events = POLLOUT, .revents = 0 });
|
||||
|
||||
ErrorOr<int> result { 0 };
|
||||
do {
|
||||
result = Core::System::poll(pollfds, -1);
|
||||
} while (result.is_error() && result.error().code() == EINTR);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
|
@ -45,11 +98,7 @@ TransportSocket::TransportSocket(NonnullOwnPtr<Core::LocalSocket> socket)
|
|||
|
||||
TransportSocket::~TransportSocket()
|
||||
{
|
||||
{
|
||||
Threading::MutexLocker locker(m_send_queue->mutex);
|
||||
m_send_queue->running = false;
|
||||
m_send_queue->condition.signal();
|
||||
}
|
||||
m_send_queue->stop();
|
||||
(void)m_send_thread->join();
|
||||
}
|
||||
|
||||
|
@ -114,61 +163,31 @@ void TransportSocket::post_message(Vector<u8> const& bytes_to_write, Vector<Nonn
|
|||
}
|
||||
}
|
||||
|
||||
queue_message_on_send_thread({ move(message_buffer), move(raw_fds) });
|
||||
m_send_queue->enqueue_message(move(message_buffer), move(raw_fds));
|
||||
}
|
||||
|
||||
void TransportSocket::queue_message_on_send_thread(MessageToSend&& message_to_send) const
|
||||
{
|
||||
Threading::MutexLocker lock(m_send_queue->mutex);
|
||||
m_send_queue->messages.append(move(message_to_send));
|
||||
m_send_queue->condition.signal();
|
||||
}
|
||||
|
||||
ErrorOr<void> TransportSocket::send_message(Core::LocalSocket& socket, ReadonlyBytes&& bytes_to_write, Vector<int, 1> const& unowned_fds)
|
||||
ErrorOr<void> TransportSocket::send_message(Core::LocalSocket& socket, ReadonlyBytes& bytes_to_write, Vector<int>& unowned_fds)
|
||||
{
|
||||
auto num_fds_to_transfer = unowned_fds.size();
|
||||
while (!bytes_to_write.is_empty()) {
|
||||
ErrorOr<ssize_t> maybe_nwritten = 0;
|
||||
if (num_fds_to_transfer > 0) {
|
||||
maybe_nwritten = socket.send_message(bytes_to_write, 0, unowned_fds);
|
||||
if (!maybe_nwritten.is_error())
|
||||
num_fds_to_transfer = 0;
|
||||
} else {
|
||||
maybe_nwritten = socket.write_some(bytes_to_write);
|
||||
}
|
||||
|
||||
if (maybe_nwritten.is_error()) {
|
||||
if (auto error = maybe_nwritten.release_error(); error.is_errno() && (error.code() == EAGAIN || error.code() == EWOULDBLOCK)) {
|
||||
|
||||
// FIXME: Refactor this to pass the unwritten bytes back to the caller to send 'later'
|
||||
// or next time the socket is writable
|
||||
Vector<struct pollfd, 1> pollfds;
|
||||
if (pollfds.is_empty())
|
||||
pollfds.append({ .fd = socket.fd().value(), .events = POLLOUT, .revents = 0 });
|
||||
|
||||
ErrorOr<int> result { 0 };
|
||||
do {
|
||||
constexpr u32 POLL_TIMEOUT_MS = 100;
|
||||
result = Core::System::poll(pollfds, POLL_TIMEOUT_MS);
|
||||
} while (result.is_error() && result.error().code() == EINTR);
|
||||
|
||||
if (!result.is_error() && result.value() != 0)
|
||||
continue;
|
||||
|
||||
switch (error.code()) {
|
||||
case EPIPE:
|
||||
return Error::from_string_literal("IPC::transfer_message: Disconnected from peer");
|
||||
case EAGAIN:
|
||||
return Error::from_string_literal("IPC::transfer_message: Timed out waiting for socket to become writable");
|
||||
default:
|
||||
return Error::from_syscall("IPC::transfer_message write"sv, -error.code());
|
||||
}
|
||||
if (auto error = maybe_nwritten.release_error(); error.is_errno() && (error.code() == EAGAIN || error.code() == EWOULDBLOCK || error.code() == EINTR)) {
|
||||
return {};
|
||||
} else {
|
||||
return error;
|
||||
}
|
||||
}
|
||||
|
||||
bytes_to_write = bytes_to_write.slice(maybe_nwritten.value());
|
||||
num_fds_to_transfer = 0;
|
||||
unowned_fds.clear();
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
@ -252,7 +271,7 @@ TransportSocket::ShouldShutdown TransportSocket::read_as_many_messages_as_possib
|
|||
header.fd_count = received_fd_count;
|
||||
header.type = MessageHeader::Type::FileDescriptorAcknowledgement;
|
||||
memcpy(message_buffer.data(), &header, sizeof(MessageHeader));
|
||||
queue_message_on_send_thread({ move(message_buffer), {} });
|
||||
m_send_queue->enqueue_message(move(message_buffer), {});
|
||||
}
|
||||
|
||||
if (index < m_unprocessed_bytes.size()) {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue