blob: 2eb4f7d8c81a4d4dc7a3f38adfabf6d099ec55ae [file] [log] [blame]
/*
* Copyright (C) 2009 Brent Fulgham. All rights reserved.
* Copyright (C) 2009 Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "config.h"
#include "SocketStreamHandleImpl.h"
#if USE(CURL)
#include "Logging.h"
#include "SocketStreamHandleClient.h"
#include "URL.h"
#include <mutex>
#include <wtf/Lock.h>
#include <wtf/MainThread.h>
#include <wtf/text/CString.h>
namespace WebCore {
static UniqueArray<char> createCopy(const char* data, int length)
{
auto copy = makeUniqueArray<char>(length);
memcpy(copy.get(), data, length);
return copy;
}
SocketStreamHandleImpl::SocketStreamHandleImpl(const URL& url, SocketStreamHandleClient& client)
: SocketStreamHandle(url, client)
{
LOG(Network, "SocketStreamHandle %p new client %p", this, &m_client);
ASSERT(isMainThread());
startThread();
}
SocketStreamHandleImpl::~SocketStreamHandleImpl()
{
LOG(Network, "SocketStreamHandle %p delete", this);
ASSERT(!m_workerThread);
}
std::optional<size_t> SocketStreamHandleImpl::platformSendInternal(const uint8_t* data, size_t length)
{
LOG(Network, "SocketStreamHandle %p platformSend", this);
ASSERT(isMainThread());
startThread();
auto copy = createCopy(reinterpret_cast<const char*>(data), length);
std::lock_guard<Lock> lock(m_mutexSend);
m_sendData.append(SocketData { WTFMove(copy), length });
return length;
}
void SocketStreamHandleImpl::platformClose()
{
LOG(Network, "SocketStreamHandle %p platformClose", this);
ASSERT(isMainThread());
if (m_closed)
return;
m_closed = true;
stopThread();
m_client.didCloseSocketStream(*this);
}
bool SocketStreamHandleImpl::readData(CURL* curlHandle)
{
ASSERT(!isMainThread());
const size_t bufferSize = 1024;
auto data = makeUniqueArray<char>(bufferSize);
size_t bytesRead = 0;
CURLcode ret = curl_easy_recv(curlHandle, data.get(), bufferSize, &bytesRead);
if (ret == CURLE_OK) {
m_mutexReceive.lock();
m_receiveData.append(SocketData { WTFMove(data), bytesRead });
m_mutexReceive.unlock();
ref();
callOnMainThread([this] {
didReceiveData();
deref();
});
return true;
}
if (ret == CURLE_AGAIN)
return true;
return false;
}
bool SocketStreamHandleImpl::sendData(CURL* curlHandle)
{
ASSERT(!isMainThread());
while (true) {
m_mutexSend.lock();
if (!m_sendData.size()) {
m_mutexSend.unlock();
break;
}
auto sendData = m_sendData.takeFirst();
m_mutexSend.unlock();
size_t totalBytesSent = 0;
while (totalBytesSent < sendData.size) {
size_t bytesSent = 0;
CURLcode ret = curl_easy_send(curlHandle, sendData.data.get() + totalBytesSent, sendData.size - totalBytesSent, &bytesSent);
if (ret == CURLE_OK)
totalBytesSent += bytesSent;
else
break;
}
// Insert remaining data into send queue.
if (totalBytesSent < sendData.size) {
const size_t restLength = sendData.size - totalBytesSent;
auto copy = createCopy(sendData.data.get() + totalBytesSent, restLength);
std::lock_guard<Lock> lock(m_mutexSend);
m_sendData.prepend(SocketData { WTFMove(copy), restLength });
return false;
}
}
return true;
}
bool SocketStreamHandleImpl::waitForAvailableData(CURL* curlHandle, Seconds selectTimeout)
{
ASSERT(!isMainThread());
int64_t usec = selectTimeout.microsecondsAs<int64_t>();
struct timeval timeout;
if (usec <= 0) {
timeout.tv_sec = 0;
timeout.tv_usec = 0;
} else {
timeout.tv_sec = usec / 1000000;
timeout.tv_usec = usec % 1000000;
}
long socket;
if (curl_easy_getinfo(curlHandle, CURLINFO_LASTSOCKET, &socket) != CURLE_OK)
return false;
fd_set fdread;
FD_ZERO(&fdread);
FD_SET(socket, &fdread);
int rc = ::select(0, &fdread, nullptr, nullptr, &timeout);
return rc == 1;
}
void SocketStreamHandleImpl::startThread()
{
ASSERT(isMainThread());
if (m_workerThread)
return;
ref(); // stopThread() will call deref().
m_workerThread = Thread::create("WebSocket thread", [this] {
ASSERT(!isMainThread());
CURL* curlHandle = curl_easy_init();
if (!curlHandle)
return;
curl_easy_setopt(curlHandle, CURLOPT_URL, m_url.host().utf8().data());
if (m_url.port())
curl_easy_setopt(curlHandle, CURLOPT_PORT, m_url.port().value());
curl_easy_setopt(curlHandle, CURLOPT_CONNECT_ONLY, 1L);
// Connect to host
if (curl_easy_perform(curlHandle) != CURLE_OK)
return;
ref();
callOnMainThread([this] {
// Check reference count to fix a crash.
// When the call is invoked on the main thread after all other references are released, the SocketStreamClient
// is already deleted. Accessing the SocketStreamClient in didOpenSocket() will then cause a crash.
if (refCount() > 1)
didOpenSocket();
deref();
});
while (!m_stopThread) {
// Send queued data
sendData(curlHandle);
// Wait until socket has available data
if (waitForAvailableData(curlHandle, 20_ms))
readData(curlHandle);
}
curl_easy_cleanup(curlHandle);
});
}
void SocketStreamHandleImpl::stopThread()
{
ASSERT(isMainThread());
if (!m_workerThread)
return;
m_stopThread = true;
m_workerThread->waitForCompletion();
m_workerThread = nullptr;
deref();
}
void SocketStreamHandleImpl::didReceiveData()
{
ASSERT(isMainThread());
m_mutexReceive.lock();
auto receiveData = WTFMove(m_receiveData);
m_mutexReceive.unlock();
for (auto& socketData : receiveData) {
if (socketData.size > 0) {
if (state() == Open)
m_client.didReceiveSocketStreamData(*this, socketData.data.get(), socketData.size);
} else if (!m_closed)
platformClose();
}
}
void SocketStreamHandleImpl::didOpenSocket()
{
ASSERT(isMainThread());
m_state = Open;
m_client.didOpenSocketStream(*this);
}
} // namespace WebCore
#endif