// blob: befa546a49734c13ef27606e91c2a398b2608e4a [file] [log] [blame]
/*
* Copyright (C) 2018 Apple Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "config.h"
#include "MessagePortChannel.h"
#include "Logging.h"
#include "MessagePortChannelRegistry.h"
#include <wtf/CompletionHandler.h>
#include <wtf/MainThread.h>
namespace WebCore {
// Heap-allocates a channel for the pair (port1, port2), registered with |registry|
// by the constructor, and hands ownership of it back to the caller as a Ref.
Ref<MessagePortChannel> MessagePortChannel::create(MessagePortChannelRegistry& registry, const MessagePortIdentifier& port1, const MessagePortIdentifier& port2)
{
    auto* channel = new MessagePortChannel(registry, port1, port2);
    return adoptRef(*channel);
}
// Builds a channel whose side 0 is |port1| and side 1 is |port2|. Each side starts
// out associated with the process named in its port identifier and holds a
// self-reference in m_entangledToProcessProtectors. The registry is notified last,
// once every member has been set up. Main thread only (asserted).
MessagePortChannel::MessagePortChannel(MessagePortChannelRegistry& registry, const MessagePortIdentifier& port1, const MessagePortIdentifier& port2)
    : m_registry(registry)
{
    ASSERT(isMainThread());

    // Must come before the protectors below start pointing at |this|.
    // NOTE(review): presumably required because those references are taken before
    // create() adopts the object - confirm against RefCounted's adoption checks.
    relaxAdoptionRequirement();

    m_ports[0] = port1;
    m_ports[1] = port2;

    m_processes[0] = port1.processIdentifier;
    m_processes[1] = port2.processIdentifier;

    m_entangledToProcessProtectors[0] = this;
    m_entangledToProcessProtectors[1] = this;

    m_registry.messagePortChannelCreated(*this);
}
// Tells the registry that this channel is being destroyed.
MessagePortChannel::~MessagePortChannel()
{
    auto& self = *this;
    m_registry.messagePortChannelDestroyed(self);
}
// Returns the process currently recorded for |port|, or std::nullopt when that
// side has no process recorded. |port| must be one of this channel's two ports
// (asserted). Main thread only (asserted).
std::optional<ProcessIdentifier> MessagePortChannel::processForPort(const MessagePortIdentifier& port)
{
    ASSERT(isMainThread());
    ASSERT(port == m_ports[0] || port == m_ports[1]);

    if (port == m_ports[0])
        return m_processes[0];
    return m_processes[1];
}
// Reports whether |port| is either end of this channel. Main thread only (asserted).
bool MessagePortChannel::includesPort(const MessagePortIdentifier& port)
{
    ASSERT(isMainThread());

    if (m_ports[0] == port)
        return true;
    return m_ports[1] == port;
}
// Records |process| as the owner of |port|'s side of the channel, re-arms that
// side's self-reference, and removes this channel from that side's pending
// transfers. The side must either have no process yet or already be associated
// with |process| (asserted). Main thread only (asserted).
void MessagePortChannel::entanglePortWithProcess(const MessagePortIdentifier& port, ProcessIdentifier process)
{
    ASSERT(isMainThread());
    ASSERT(port == m_ports[0] || port == m_ports[1]);

    // Side 0 corresponds to m_ports[0]; anything else is treated as side 1.
    size_t side = 1;
    if (port == m_ports[0])
        side = 0;

    LOG(MessagePorts, "MessagePortChannel %s (%p) entangling port %s (that port has %zu messages available)", logString().utf8().data(), this, port.logString().utf8().data(), m_pendingMessages[side].size());

    ASSERT(!m_processes[side] || *m_processes[side] == process);
    m_processes[side] = process;

    // Set the protector before removing |this| from the pending transfers, so the
    // removal is never what drops the last reference held through these members.
    m_entangledToProcessProtectors[side] = this;
    m_pendingMessagePortTransfers[side].remove(this);
}
// Detaches |port| from its process: clears the process recorded for that side,
// adds this channel to m_pendingMessagePortTransfers for that side, and gives up
// that side's entry in m_entangledToProcessProtectors.
// |port| must be one of this channel's two ports (asserted). Main thread only (asserted).
void MessagePortChannel::disentanglePort(const MessagePortIdentifier& port)
{
ASSERT(isMainThread());
LOG(MessagePorts, "MessagePortChannel %s (%p) disentangling port %s", logString().utf8().data(), this, port.logString().utf8().data());
ASSERT(port == m_ports[0] || port == m_ports[1]);
// Index of the side being disentangled: 0 for m_ports[0], otherwise 1.
size_t i = port == m_ports[0] ? 0 : 1;
// The side is expected to have a process unless it has already been closed.
ASSERT(m_processes[i] || m_isClosed[i]);
m_processes[i] = std::nullopt;
m_pendingMessagePortTransfers[i].add(this);
// Move the protector into a local so that whatever reference it holds is only
// dropped when the local goes out of scope at the end of this function, after
// all member updates above are done.
// NOTE(review): the previous wording here referred to a lock being unlocked; no
// lock is visible in this file.
auto protectedThis = WTFMove(m_entangledToProcessProtectors[i]);
}
// Permanently closes |port|'s side of the channel: clears its process, marks it
// closed, discards its pending messages and pending transfers, and resets both
// protectors for that side.
// |port| must be one of this channel's two ports (asserted). Main thread only (asserted).
void MessagePortChannel::closePort(const MessagePortIdentifier& port)
{
ASSERT(isMainThread());
ASSERT(port == m_ports[0] || port == m_ports[1]);
// Index of the side being closed: 0 for m_ports[0], otherwise 1.
size_t i = port == m_ports[0] ? 0 : 1;
m_processes[i] = std::nullopt;
m_isClosed[i] = true;
// Hold a reference to this object for the remainder of the function: the
// statements below clear this side's pending transfers and protectors, so this
// local keeps the object referenced until all of them have run.
// NOTE(review): the previous wording here referred to a lock being unlocked; no
// lock is visible in this file.
Ref protectedThis { *this };
m_pendingMessages[i].clear();
m_pendingMessagePortTransfers[i].clear();
m_pendingMessageProtectors[i] = nullptr;
m_entangledToProcessProtectors[i] = nullptr;
}
// Queues |message| for delivery to |remoteTarget|.
// Returns true when this message is the only one now queued for that port (in
// which case the pending-message protector for that side is set to this channel),
// and false when other messages were already queued.
// |remoteTarget| must be one of this channel's two ports (asserted). Main thread only (asserted).
bool MessagePortChannel::postMessageToRemote(MessageWithMessagePorts&& message, const MessagePortIdentifier& remoteTarget)
{
    ASSERT(isMainThread());
    ASSERT(remoteTarget == m_ports[0] || remoteTarget == m_ports[1]);

    const size_t side = remoteTarget == m_ports[0] ? 0 : 1;
    auto& queue = m_pendingMessages[side];

    queue.append(WTFMove(message));
    LOG(MessagePorts, "MessagePortChannel %s (%p) now has %zu messages pending on port %s", logString().utf8().data(), this, queue.size(), remoteTarget.logString().utf8().data());

    if (queue.size() != 1) {
        // An earlier message is still queued, so the protector must already be in place.
        ASSERT(m_pendingMessageProtectors[side] == this);
        return false;
    }

    m_pendingMessageProtectors[side] = this;
    return true;
}
// Hands every message currently queued for |port| to |callback|, together with a
// completion handler for the receiver to invoke once it is done with the batch.
// When nothing is queued, |callback| receives an empty vector and a no-op handler.
// Otherwise the queue is emptied, m_messageBatchesInFlight is incremented, and the
// completion handler decrements it again when invoked.
// |port| must be one of this channel's two ports (asserted). Main thread only (asserted).
void MessagePortChannel::takeAllMessagesForPort(const MessagePortIdentifier& port, CompletionHandler<void(Vector<MessageWithMessagePorts>&&, CompletionHandler<void()>&&)>&& callback)
{
ASSERT(isMainThread());
LOG(MessagePorts, "MessagePortChannel %p taking all messages for port %s", this, port.logString().utf8().data());
ASSERT(port == m_ports[0] || port == m_ports[1]);
// Index of the side whose queue is being drained: 0 for m_ports[0], otherwise 1.
size_t i = port == m_ports[0] ? 0 : 1;
// Nothing queued: reply immediately without touching the in-flight counter.
if (m_pendingMessages[i].isEmpty()) {
callback({ }, [] { });
return;
}
// A non-empty queue implies postMessageToRemote() set this side's protector.
ASSERT(m_pendingMessageProtectors[i]);
// Swap the queue into a local so m_pendingMessages[i] is left empty.
Vector<MessageWithMessagePorts> result;
result.swap(m_pendingMessages[i]);
++m_messageBatchesInFlight;
LOG(MessagePorts, "There are %zu messages to take for port %s. Taking them now, messages in flight is now %" PRIu64, result.size(), port.logString().utf8().data(), m_messageBatchesInFlight);
// Captured by value because |result| is moved away in the call below.
auto size = result.size();
// The pending-message protector is moved into the completion handler, so from
// here on the handler (not the member) owns it.
callback(WTFMove(result), [size, this, port, protectedThis = WTFMove(m_pendingMessageProtectors[i])] {
// |port| and |size| are only referenced by the LOG statement below.
UNUSED_PARAM(port);
#if LOG_DISABLED
UNUSED_PARAM(size);
#endif
--m_messageBatchesInFlight;
LOG(MessagePorts, "Message port channel %s was notified that a batch of %zu message port messages targeted for port %s just completed dispatch, in flight is now %" PRIu64, logString().utf8().data(), size, port.logString().utf8().data(), m_messageBatchesInFlight);
});
}
void MessagePortChannel::checkRemotePortForActivity(const MessagePortIdentifier& remotePort, CompletionHandler<void(MessagePortChannelProvider::HasActivity)>&& callback)
{
ASSERT(isMainThread());
ASSERT(remotePort == m_ports[0] || remotePort == m_ports[1]);
// If the remote port is closed there is no pending activity.
size_t i = remotePort == m_ports[0] ? 0 : 1;
if (m_isClosed[i]) {
callback(MessagePortChannelProvider::HasActivity::No);
return;
}
// If there are any messages in flight between the ports, there is pending activity.
if (hasAnyMessagesPendingOrInFlight()) {
callback(MessagePortChannelProvider::HasActivity::Yes);
return;
}
// If the port is not currently in a process then it's being transferred as part of a postMessage.
// We treat these ports as if they do have activity since they will be revived when the message is delivered.
if (!m_processes[i]) {
callback(MessagePortChannelProvider::HasActivity::Yes);
return;
}
CompletionHandler<void(MessagePortChannelProvider::HasActivity)> outerCallback = [this, protectedThis = Ref { *this }, callback = WTFMove(callback)](auto hasActivity) mutable {
if (hasActivity == MessagePortChannelProvider::HasActivity::Yes) {
callback(hasActivity);
return;
}
// If the remote port said it had no activity, check again for any messages that might be in flight.
// This is because it might have asynchronously sent a message just before it was asked about local activity.
if (hasAnyMessagesPendingOrInFlight())
hasActivity = MessagePortChannelProvider::HasActivity::Yes;
callback(hasActivity);
};
m_registry.checkProcessLocalPortForActivity(remotePort, *m_processes[i], WTFMove(outerCallback));
}
// True when at least one message batch is in flight, or either side still has
// messages queued. Main thread only (asserted).
bool MessagePortChannel::hasAnyMessagesPendingOrInFlight() const
{
    ASSERT(isMainThread());

    if (m_messageBatchesInFlight)
        return true;
    if (!m_pendingMessages[0].isEmpty())
        return true;
    return !m_pendingMessages[1].isEmpty();
}
} // namespace WebCore