Add ThreadMessageReceiver to IPC::Connection
https://bugs.webkit.org/show_bug.cgi?id=204908
Reviewed by Brady Eidson.
ThreadMesageReceiver is similar to WorkQueueMessageReceiver, but it should handle messages (dispatched from IPC
thread) on a specific thread, while WorkQueueMessageReceiver may handle messages on different threads.
* Platform/IPC/Connection.cpp:
(IPC::Connection::addThreadMessageReceiver):
(IPC::Connection::removeThreadMessageReceiver):
(IPC::Connection::dispatchThreadMessageReceiverMessage):
(IPC::Connection::processIncomingMessage):
(IPC::Connection::dispatchMessageToThreadReceiver):
* Platform/IPC/Connection.h:
(IPC::Connection::ThreadMessageReceiver::dispatchToThread):
git-svn-id: http://svn.webkit.org/repository/webkit/trunk@253177 268f45cc-cd09-0410-ab3c-d52691b4dbfc
diff --git a/Source/WebKit/ChangeLog b/Source/WebKit/ChangeLog
index 2348790..a8e21dd 100644
--- a/Source/WebKit/ChangeLog
+++ b/Source/WebKit/ChangeLog
@@ -1,3 +1,22 @@
+2019-12-05 Sihui Liu <sihui_liu@apple.com>
+
+ Add ThreadMessageReceiver to IPC::Connection
+ https://bugs.webkit.org/show_bug.cgi?id=204908
+
+ Reviewed by Brady Eidson.
+
+ ThreadMesageReceiver is similar to WorkQueueMessageReceiver, but it should handle messages (dispatched from IPC
+ thread) on a specific thread, while WorkQueueMessageReceiver may handle messages on different threads.
+
+ * Platform/IPC/Connection.cpp:
+ (IPC::Connection::addThreadMessageReceiver):
+ (IPC::Connection::removeThreadMessageReceiver):
+ (IPC::Connection::dispatchThreadMessageReceiverMessage):
+ (IPC::Connection::processIncomingMessage):
+ (IPC::Connection::dispatchMessageToThreadReceiver):
+ * Platform/IPC/Connection.h:
+ (IPC::Connection::ThreadMessageReceiver::dispatchToThread):
+
2019-12-05 Chris Dumez <cdumez@apple.com>
MESSAGE_CHECK BackForwardItemIdentifier on incoming IPC from the WebProcess
diff --git a/Source/WebKit/Platform/IPC/Connection.cpp b/Source/WebKit/Platform/IPC/Connection.cpp
index 88904ac..62fa039 100644
--- a/Source/WebKit/Platform/IPC/Connection.cpp
+++ b/Source/WebKit/Platform/IPC/Connection.cpp
@@ -360,6 +360,50 @@
sendSyncReply(WTFMove(replyEncoder));
}
+void Connection::addThreadMessageReceiver(StringReference messageReceiverName, ThreadMessageReceiver* threadMessageReceiver)
+{
+ ASSERT(RunLoop::isMain());
+
+ std::lock_guard<Lock> lock(m_threadMessageReceiversLock);
+ ASSERT(!m_threadMessageReceivers.contains(messageReceiverName));
+
+ m_threadMessageReceivers.add(messageReceiverName, threadMessageReceiver);
+}
+
+void Connection::removeThreadMessageReceiver(StringReference messageReceiverName)
+{
+ ASSERT(RunLoop::isMain());
+
+ std::lock_guard<Lock> lock(m_threadMessageReceiversLock);
+ ASSERT(m_threadMessageReceivers.contains(messageReceiverName));
+
+ m_threadMessageReceivers.remove(messageReceiverName);
+}
+
+void Connection::dispatchThreadMessageReceiverMessage(ThreadMessageReceiver& threadMessageReceiver, Decoder& decoder)
+{
+ if (!decoder.isSyncMessage()) {
+ threadMessageReceiver.didReceiveMessage(*this, decoder);
+ return;
+ }
+
+ uint64_t syncRequestID = 0;
+ if (!decoder.decode(syncRequestID) || !syncRequestID) {
+ // FIXME: Handle invalid sync message.
+ decoder.markInvalid();
+ return;
+ }
+
+ auto replyEncoder = makeUnique<Encoder>("IPC", "SyncMessageReply", syncRequestID);
+ threadMessageReceiver.didReceiveSyncMessage(*this, decoder, replyEncoder);
+
+ // FIXME: If the message was invalid, we should send back a SyncMessageError.
+ ASSERT(!decoder.isInvalid());
+
+ if (replyEncoder)
+ sendSyncReply(WTFMove(replyEncoder));
+}
+
void Connection::setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback)
{
ASSERT(!m_isConnected);
@@ -667,7 +711,7 @@
return;
}
- if (!WorkQueueMessageReceiverMap::isValidKey(message->messageReceiverName())) {
+ if (!WorkQueueMessageReceiverMap::isValidKey(message->messageReceiverName()) || !ThreadMessageReceiverMap::isValidKey(message->messageReceiverName())) {
RefPtr<Connection> protectedThis(this);
StringReference messageReceiverNameReference = message->messageReceiverName();
String messageReceiverName(messageReceiverNameReference.isEmpty() ? "<unknown message receiver>" : String(messageReceiverNameReference.data(), messageReceiverNameReference.size()));
@@ -683,6 +727,9 @@
if (dispatchMessageToWorkQueueReceiver(message))
return;
+ if (dispatchMessageToThreadReceiver(message))
+ return;
+
#if HAVE(QOS_CLASSES)
if (message->isSyncMessage() && m_shouldBoostMainThreadOnSyncMessage) {
pthread_override_t override = pthread_override_qos_class_start_np(m_mainThread, Thread::adjustedQOSClass(QOS_CLASS_USER_INTERACTIVE), 0);
@@ -966,6 +1013,23 @@
return false;
}
+bool Connection::dispatchMessageToThreadReceiver(std::unique_ptr<Decoder>& message)
+{
+ RefPtr<ThreadMessageReceiver> protectedThreadMessageReceiver;
+ {
+ std::lock_guard<Lock> lock(m_threadMessageReceiversLock);
+ protectedThreadMessageReceiver = m_threadMessageReceivers.get(message->messageReceiverName());
+ }
+
+ if (protectedThreadMessageReceiver) {
+ protectedThreadMessageReceiver->dispatchToThread([protectedThis = makeRef(*this), threadMessageReceiver = WTFMove(protectedThreadMessageReceiver), decoder = WTFMove(message)]() mutable {
+ protectedThis->dispatchThreadMessageReceiverMessage(*threadMessageReceiver, *decoder);
+ });
+ return true;
+ }
+ return false;
+}
+
void Connection::dispatchMessage(std::unique_ptr<Decoder> message)
{
ASSERT(RunLoop::isMain());
diff --git a/Source/WebKit/Platform/IPC/Connection.h b/Source/WebKit/Platform/IPC/Connection.h
index d7cb94c..dd91f4f 100644
--- a/Source/WebKit/Platform/IPC/Connection.h
+++ b/Source/WebKit/Platform/IPC/Connection.h
@@ -105,6 +105,11 @@
class WorkQueueMessageReceiver : public MessageReceiver, public ThreadSafeRefCounted<WorkQueueMessageReceiver> {
};
+ class ThreadMessageReceiver : public MessageReceiver, public ThreadSafeRefCounted<ThreadMessageReceiver> {
+ public:
+ virtual void dispatchToThread(WTF::Function<void()>&&) { };
+ };
+
#if USE(UNIX_DOMAIN_SOCKETS)
typedef int Identifier;
static bool identifierIsValid(Identifier identifier) { return identifier != -1; }
@@ -176,6 +181,9 @@
void addWorkQueueMessageReceiver(StringReference messageReceiverName, WorkQueue&, WorkQueueMessageReceiver*);
void removeWorkQueueMessageReceiver(StringReference messageReceiverName);
+ void addThreadMessageReceiver(StringReference messageReceiverName, ThreadMessageReceiver*);
+ void removeThreadMessageReceiver(StringReference messageReceiverName);
+
bool open();
void invalidate();
void markCurrentlyDispatchedMessageAsInvalid();
@@ -260,12 +268,14 @@
std::unique_ptr<Decoder> waitForSyncReply(uint64_t syncRequestID, Seconds timeout, OptionSet<SendSyncOption>);
bool dispatchMessageToWorkQueueReceiver(std::unique_ptr<Decoder>&);
+ bool dispatchMessageToThreadReceiver(std::unique_ptr<Decoder>&);
// Called on the connection work queue.
void processIncomingMessage(std::unique_ptr<Decoder>);
void processIncomingSyncReply(std::unique_ptr<Decoder>);
void dispatchWorkQueueMessageReceiverMessage(WorkQueueMessageReceiver&, Decoder&);
+ void dispatchThreadMessageReceiverMessage(ThreadMessageReceiver&, Decoder&);
bool canSendOutgoingMessages() const;
bool platformCanSendOutgoingMessages() const;
@@ -328,6 +338,10 @@
using WorkQueueMessageReceiverMap = HashMap<StringReference, std::pair<RefPtr<WorkQueue>, RefPtr<WorkQueueMessageReceiver>>>;
WorkQueueMessageReceiverMap m_workQueueMessageReceivers;
+ Lock m_threadMessageReceiversLock;
+ using ThreadMessageReceiverMap = HashMap<StringReference, RefPtr<ThreadMessageReceiver>>;
+ ThreadMessageReceiverMap m_threadMessageReceivers;
+
unsigned m_inSendSyncCount;
unsigned m_inDispatchMessageCount;
unsigned m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount;