blob: 13a832c2669b4ce8cbebf3134ba089d902dfa949 [file] [log] [blame]
/*
* Copyright (C) 2009 Brent Fulgham. All rights reserved.
* Copyright (C) 2009 Google Inc. All rights reserved.
* Copyright (C) 2018 Sony Interactive Entertainment Inc.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "config.h"
#include "SocketStreamHandleImpl.h"
#if USE(CURL)
#include "DeprecatedGlobalSettings.h"
#include "Logging.h"
#include "SocketStreamError.h"
#include "SocketStreamHandleClient.h"
#include "StorageSessionProvider.h"
#include <wtf/MainThread.h>
#include <wtf/URL.h>
#include <wtf/text/CString.h>
namespace WebCore {
// Constructs the handle on the main thread and immediately spawns the worker
// thread, which runs threadEntryPoint() with an isolated copy of |url|.
// The thread's closure holds a Ref to this object for as long as it exists.
SocketStreamHandleImpl::SocketStreamHandleImpl(const URL& url, SocketStreamHandleClient& client, const StorageSessionProvider* provider)
    : SocketStreamHandle(url, client)
    , m_storageSessionProvider(provider)
{
    LOG(Network, "SocketStreamHandle %p new client %p", this, &m_client);
    ASSERT(isMainThread());

    // FIXME: Using DeprecatedGlobalSettings from here is a layering violation.
    // Note that this flips a setting on the shared CurlContext singleton, not
    // just on this connection.
    const bool shouldIgnoreSSLErrors = m_url.protocolIs("wss") && DeprecatedGlobalSettings::allowsAnySSLCertificate();
    if (shouldIgnoreSSLErrors)
        CurlContext::singleton().sslHandle().setIgnoreSSLErrors(true);

    m_workerThread = Thread::create("WebSocket thread", [this, protectedThis = makeRef(*this), url = url.isolatedCopy()] {
        threadEntryPoint(url);
    });
}
// Clears the run flag and waits for the worker thread (via stopThread())
// before the object's members are torn down.
SocketStreamHandleImpl::~SocketStreamHandleImpl()
{
    LOG(Network, "SocketStreamHandle %p delete", this);

    stopThread();
}
// Queues |length| bytes starting at |data| for transmission by the worker thread.
//
// Only one write may be outstanding at a time: while a previous buffer is still
// pending this returns 0 (nothing accepted). Otherwise the bytes are copied into
// a private buffer, handed to the worker thread as a task, and |length| is
// returned. m_hasPendingWriteData is cleared again from threadEntryPoint() once
// the whole buffer has been sent. Main thread only.
Optional<size_t> SocketStreamHandleImpl::platformSendInternal(const uint8_t* data, size_t length)
{
    LOG(Network, "SocketStreamHandle %p platformSend", this);
    ASSERT(isMainThread());

    if (m_hasPendingWriteData)
        return 0;
    m_hasPendingWriteData = true;

    // Take a private copy; the caller's memory is not referenced after this returns.
    auto pendingBytes = makeUniqueArray<uint8_t>(length);
    memcpy(pendingBytes.get(), data, length);

    // The write-buffer members are only assigned from tasks run on the worker thread.
    callOnWorkerThread([this, buffer = WTFMove(pendingBytes), bufferSize = length]() mutable {
        ASSERT(!isMainThread());
        m_writeBufferOffset = 0;
        m_writeBufferSize = bufferSize;
        m_writeBuffer = WTFMove(buffer);
    });

    return length;
}
// Closes the stream: marks the state Closed, stops and joins the worker thread,
// then notifies the client. Does nothing if the stream is already Closed.
// Main thread only.
void SocketStreamHandleImpl::platformClose()
{
    LOG(Network, "SocketStreamHandle %p platformClose", this);
    ASSERT(isMainThread());

    if (m_state != Closed) {
        m_state = Closed;
        stopThread();
        // Keep the client notification as the final action of the close sequence.
        m_client.didCloseSocketStream(*this);
    }
}
// Body of the worker thread started by the constructor.
//
// Connects a CurlSocketHandle to |url|, then loops while m_running is set:
// runs tasks queued by the main thread, waits on the socket, sends any
// pending write buffer, and forwards received data to the client. All client
// callbacks are dispatched to the main thread via callOnMainThread().
void SocketStreamHandleImpl::threadEntryPoint(const URL& url)
{
ASSERT(!isMainThread());
// The socket reports failures through this callback, which forwards them to handleError().
CurlSocketHandle socket { url, [this](CURLcode errorCode) {
handleError(errorCode);
}};
// Connect to host
// NOTE(review): presumably the error callback above has already fired when
// connect() fails — confirm against CurlSocketHandle::connect.
if (!socket.connect())
return;
// Report the successful connection, unless the state has left Connecting in the meantime.
callOnMainThread([this, protectedThis = makeRef(*this)] {
if (m_state == Connecting) {
m_state = Open;
m_client.didOpenSocketStream(*this);
}
});
while (m_running) {
// Run tasks queued via callOnWorkerThread() (e.g. installing a new write buffer).
executeTasks();
// The second argument is non-null only while a write buffer is pending.
// NOTE(review): presumably it converts to a flag asking wait() to also
// report writability — confirm against CurlSocketHandle::wait.
auto result = socket.wait(20_ms, m_writeBuffer.get());
if (!result)
continue;
// This logic only runs when there's data waiting.
if (result->writable && m_running) {
// Send whatever part of the pending buffer has not been written yet.
auto bytesSent = socket.send(m_writeBuffer.get() + m_writeBufferOffset, m_writeBufferSize - m_writeBufferOffset);
m_writeBufferOffset += bytesSent;
if (m_writeBufferSize <= m_writeBufferOffset) {
// Whole buffer sent: release it and let the main thread submit more data.
m_writeBuffer = nullptr;
m_writeBufferSize = 0;
m_writeBufferOffset = 0;
callOnMainThread([this, protectedThis = makeRef(*this)] {
m_hasPendingWriteData = false;
sendPendingData();
});
}
}
if (result->readable && m_running) {
// A fresh buffer per read; ownership moves into the main-thread callback below.
auto readBuffer = makeUniqueArray<uint8_t>(kReadBufferSize);
auto bytesRead = socket.receive(readBuffer.get(), kReadBufferSize);
// `nullopt` result means nothing to handle at this moment.
if (!bytesRead)
continue;
// 0 bytes indicates a closed connection.
if (!*bytesRead) {
m_running = false;
callOnMainThread([this, protectedThis = makeRef(*this)] {
close();
});
break;
}
// Deliver the received bytes on the main thread; they are dropped if the stream is no longer Open.
callOnMainThread([this, protectedThis = makeRef(*this), buffer = WTFMove(readBuffer), size = *bytesRead ] {
if (m_state == Open)
m_client.didReceiveSocketStreamData(*this, reinterpret_cast<const char*>(buffer.get()), size);
});
}
}
// Drop any unsent data before the thread exits.
m_writeBuffer = nullptr;
}
// Error callback for the socket created in threadEntryPoint().
//
// Clears the run flag so the worker loop exits, then reports the failure to the
// client on the main thread. Nothing is reported if the stream is already
// Closed by the time the main-thread callback runs.
void SocketStreamHandleImpl::handleError(CURLcode errorCode)
{
    m_running = false;

    // Isolated copy so the string can safely be handed to the main thread.
    auto description = CurlHandle::errorDescription(errorCode).isolatedCopy();

    callOnMainThread([this, protectedThis = makeRef(*this), errorCode, localizedDescription = WTFMove(description)] {
        if (m_state == Closed)
            return;

        // Receive failures use a dedicated client callback; everything else is a generic failure.
        if (errorCode == CURLE_RECV_ERROR) {
            m_client.didFailToReceiveSocketStreamData(*this);
            return;
        }

        m_client.didFailSocketStream(*this, SocketStreamError(static_cast<int>(errorCode), { }, localizedDescription));
    });
}
// Asks the worker loop to exit and blocks until the worker thread has finished.
// Safe to call repeatedly: once the thread has been joined and released, later
// calls only clear the run flag. Main thread only.
void SocketStreamHandleImpl::stopThread()
{
    ASSERT(isMainThread());

    m_running = false;

    if (!m_workerThread)
        return;

    m_workerThread->waitForCompletion();
    m_workerThread = nullptr;
}
// Appends |task| to the queue that the worker thread drains in executeTasks().
// Main thread only.
void SocketStreamHandleImpl::callOnWorkerThread(Function<void()>&& task)
{
    ASSERT(isMainThread());

    auto queuedTask = makeUnique<Function<void()>>(WTFMove(task));
    m_taskQueue.append(WTFMove(queuedTask));
}
// Takes every task currently queued by callOnWorkerThread() and runs them in
// queue order. Worker thread only.
void SocketStreamHandleImpl::executeTasks()
{
    ASSERT(!isMainThread());

    auto pendingTasks = m_taskQueue.takeAllMessages();
    for (auto& pendingTask : pendingTasks) {
        (*pendingTask)();
    }
}
} // namespace WebCore
#endif