Add ThreadMessageReceiver to IPC::Connection
https://bugs.webkit.org/show_bug.cgi?id=204908

Reviewed by Brady Eidson.

ThreadMesageReceiver is similar to WorkQueueMessageReceiver, but it should handle messages (dispatched from IPC
thread) on a specific thread, while WorkQueueMessageReceiver may handle messages on different threads.

* Platform/IPC/Connection.cpp:
(IPC::Connection::addThreadMessageReceiver):
(IPC::Connection::removeThreadMessageReceiver):
(IPC::Connection::dispatchThreadMessageReceiverMessage):
(IPC::Connection::processIncomingMessage):
(IPC::Connection::dispatchMessageToThreadReceiver):
* Platform/IPC/Connection.h:
(IPC::Connection::ThreadMessageReceiver::dispatchToThread):


git-svn-id: http://svn.webkit.org/repository/webkit/trunk@253177 268f45cc-cd09-0410-ab3c-d52691b4dbfc
diff --git a/Source/WebKit/ChangeLog b/Source/WebKit/ChangeLog
index 2348790..a8e21dd 100644
--- a/Source/WebKit/ChangeLog
+++ b/Source/WebKit/ChangeLog
@@ -1,3 +1,22 @@
+2019-12-05  Sihui Liu  <sihui_liu@apple.com>
+
+        Add ThreadMessageReceiver to IPC::Connection
+        https://bugs.webkit.org/show_bug.cgi?id=204908
+
+        Reviewed by Brady Eidson.
+
+        ThreadMesageReceiver is similar to WorkQueueMessageReceiver, but it should handle messages (dispatched from IPC 
+        thread) on a specific thread, while WorkQueueMessageReceiver may handle messages on different threads.
+
+        * Platform/IPC/Connection.cpp:
+        (IPC::Connection::addThreadMessageReceiver):
+        (IPC::Connection::removeThreadMessageReceiver):
+        (IPC::Connection::dispatchThreadMessageReceiverMessage):
+        (IPC::Connection::processIncomingMessage):
+        (IPC::Connection::dispatchMessageToThreadReceiver):
+        * Platform/IPC/Connection.h:
+        (IPC::Connection::ThreadMessageReceiver::dispatchToThread):
+
 2019-12-05  Chris Dumez  <cdumez@apple.com>
 
         MESSAGE_CHECK BackForwardItemIdentifier on incoming IPC from the WebProcess
diff --git a/Source/WebKit/Platform/IPC/Connection.cpp b/Source/WebKit/Platform/IPC/Connection.cpp
index 88904ac..62fa039 100644
--- a/Source/WebKit/Platform/IPC/Connection.cpp
+++ b/Source/WebKit/Platform/IPC/Connection.cpp
@@ -360,6 +360,50 @@
         sendSyncReply(WTFMove(replyEncoder));
 }
 
+void Connection::addThreadMessageReceiver(StringReference messageReceiverName, ThreadMessageReceiver* threadMessageReceiver)
+{
+    ASSERT(RunLoop::isMain());
+
+    std::lock_guard<Lock> lock(m_threadMessageReceiversLock);
+    ASSERT(!m_threadMessageReceivers.contains(messageReceiverName));
+
+    m_threadMessageReceivers.add(messageReceiverName, threadMessageReceiver);
+}
+
+void Connection::removeThreadMessageReceiver(StringReference messageReceiverName)
+{
+    ASSERT(RunLoop::isMain());
+
+    std::lock_guard<Lock> lock(m_threadMessageReceiversLock);
+    ASSERT(m_threadMessageReceivers.contains(messageReceiverName));
+
+    m_threadMessageReceivers.remove(messageReceiverName);
+}
+
+void Connection::dispatchThreadMessageReceiverMessage(ThreadMessageReceiver& threadMessageReceiver, Decoder& decoder)
+{
+    if (!decoder.isSyncMessage()) {
+        threadMessageReceiver.didReceiveMessage(*this, decoder);
+        return;
+    }
+
+    uint64_t syncRequestID = 0;
+    if (!decoder.decode(syncRequestID) || !syncRequestID) {
+        // FIXME: Handle invalid sync message.
+        decoder.markInvalid();
+        return;
+    }
+
+    auto replyEncoder = makeUnique<Encoder>("IPC", "SyncMessageReply", syncRequestID);
+    threadMessageReceiver.didReceiveSyncMessage(*this, decoder, replyEncoder);
+
+    // FIXME: If the message was invalid, we should send back a SyncMessageError.
+    ASSERT(!decoder.isInvalid());
+
+    if (replyEncoder)
+        sendSyncReply(WTFMove(replyEncoder));
+}
+
 void Connection::setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback)
 {
     ASSERT(!m_isConnected);
@@ -667,7 +711,7 @@
         return;
     }
 
-    if (!WorkQueueMessageReceiverMap::isValidKey(message->messageReceiverName())) {
+    if (!WorkQueueMessageReceiverMap::isValidKey(message->messageReceiverName()) || !ThreadMessageReceiverMap::isValidKey(message->messageReceiverName())) {
         RefPtr<Connection> protectedThis(this);
         StringReference messageReceiverNameReference = message->messageReceiverName();
         String messageReceiverName(messageReceiverNameReference.isEmpty() ? "<unknown message receiver>" : String(messageReceiverNameReference.data(), messageReceiverNameReference.size()));
@@ -683,6 +727,9 @@
     if (dispatchMessageToWorkQueueReceiver(message))
         return;
 
+    if (dispatchMessageToThreadReceiver(message))
+        return;
+
 #if HAVE(QOS_CLASSES)
     if (message->isSyncMessage() && m_shouldBoostMainThreadOnSyncMessage) {
         pthread_override_t override = pthread_override_qos_class_start_np(m_mainThread, Thread::adjustedQOSClass(QOS_CLASS_USER_INTERACTIVE), 0);
@@ -966,6 +1013,23 @@
     return false;
 }
 
+bool Connection::dispatchMessageToThreadReceiver(std::unique_ptr<Decoder>& message)
+{
+    RefPtr<ThreadMessageReceiver> protectedThreadMessageReceiver;
+    {
+        std::lock_guard<Lock> lock(m_threadMessageReceiversLock);
+        protectedThreadMessageReceiver = m_threadMessageReceivers.get(message->messageReceiverName());
+    }
+
+    if (protectedThreadMessageReceiver) {
+        protectedThreadMessageReceiver->dispatchToThread([protectedThis = makeRef(*this), threadMessageReceiver = WTFMove(protectedThreadMessageReceiver), decoder = WTFMove(message)]() mutable {
+            protectedThis->dispatchThreadMessageReceiverMessage(*threadMessageReceiver, *decoder);
+        });
+        return true;
+    }
+    return false;
+}
+
 void Connection::dispatchMessage(std::unique_ptr<Decoder> message)
 {
     ASSERT(RunLoop::isMain());
diff --git a/Source/WebKit/Platform/IPC/Connection.h b/Source/WebKit/Platform/IPC/Connection.h
index d7cb94c..dd91f4f 100644
--- a/Source/WebKit/Platform/IPC/Connection.h
+++ b/Source/WebKit/Platform/IPC/Connection.h
@@ -105,6 +105,11 @@
     class WorkQueueMessageReceiver : public MessageReceiver, public ThreadSafeRefCounted<WorkQueueMessageReceiver> {
     };
 
+    class ThreadMessageReceiver : public MessageReceiver, public ThreadSafeRefCounted<ThreadMessageReceiver> {
+    public:
+        virtual void dispatchToThread(WTF::Function<void()>&&) { };
+    };
+
 #if USE(UNIX_DOMAIN_SOCKETS)
     typedef int Identifier;
     static bool identifierIsValid(Identifier identifier) { return identifier != -1; }
@@ -176,6 +181,9 @@
     void addWorkQueueMessageReceiver(StringReference messageReceiverName, WorkQueue&, WorkQueueMessageReceiver*);
     void removeWorkQueueMessageReceiver(StringReference messageReceiverName);
 
+    void addThreadMessageReceiver(StringReference messageReceiverName, ThreadMessageReceiver*);
+    void removeThreadMessageReceiver(StringReference messageReceiverName);
+
     bool open();
     void invalidate();
     void markCurrentlyDispatchedMessageAsInvalid();
@@ -260,12 +268,14 @@
     std::unique_ptr<Decoder> waitForSyncReply(uint64_t syncRequestID, Seconds timeout, OptionSet<SendSyncOption>);
 
     bool dispatchMessageToWorkQueueReceiver(std::unique_ptr<Decoder>&);
+    bool dispatchMessageToThreadReceiver(std::unique_ptr<Decoder>&);
 
     // Called on the connection work queue.
     void processIncomingMessage(std::unique_ptr<Decoder>);
     void processIncomingSyncReply(std::unique_ptr<Decoder>);
 
     void dispatchWorkQueueMessageReceiverMessage(WorkQueueMessageReceiver&, Decoder&);
+    void dispatchThreadMessageReceiverMessage(ThreadMessageReceiver&, Decoder&);
 
     bool canSendOutgoingMessages() const;
     bool platformCanSendOutgoingMessages() const;
@@ -328,6 +338,10 @@
     using WorkQueueMessageReceiverMap = HashMap<StringReference, std::pair<RefPtr<WorkQueue>, RefPtr<WorkQueueMessageReceiver>>>;
     WorkQueueMessageReceiverMap m_workQueueMessageReceivers;
 
+    Lock m_threadMessageReceiversLock;
+    using ThreadMessageReceiverMap = HashMap<StringReference, RefPtr<ThreadMessageReceiver>>;
+    ThreadMessageReceiverMap m_threadMessageReceivers;
+
     unsigned m_inSendSyncCount;
     unsigned m_inDispatchMessageCount;
     unsigned m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount;