summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Makefile.am1
-rw-r--r--test/Makefile.am1
-rw-r--r--wsd/ClientSession.cpp44
-rw-r--r--wsd/ClientSession.hpp39
-rw-r--r--wsd/DocumentBroker.cpp15
-rw-r--r--wsd/PrisonerSession.cpp2
-rw-r--r--wsd/SenderQueue.hpp48
-rw-r--r--wsd/TileCache.cpp12
8 files changed, 122 insertions, 40 deletions
diff --git a/Makefile.am b/Makefile.am
index b1f1932fa6..79120bcb40 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -53,7 +53,6 @@ loolwsd_SOURCES = wsd/Admin.cpp \
wsd/ClientSession.cpp \
wsd/FileServer.cpp \
wsd/PrisonerSession.cpp \
- wsd/SenderQueue.cpp \
wsd/Storage.cpp \
wsd/TileCache.cpp \
$(shared_sources)
diff --git a/test/Makefile.am b/test/Makefile.am
index 9a6d7154a9..992830ec08 100644
--- a/test/Makefile.am
+++ b/test/Makefile.am
@@ -37,7 +37,6 @@ wsd_sources = \
../common/Session.cpp \
../common/MessageQueue.cpp \
../kit/Kit.cpp \
- ../wsd/SenderQueue.cpp \
../wsd/TileCache.cpp \
../common/Unit.cpp \
../common/Util.cpp
diff --git a/wsd/ClientSession.cpp b/wsd/ClientSession.cpp
index f1188b5ac9..c97ef647d3 100644
--- a/wsd/ClientSession.cpp
+++ b/wsd/ClientSession.cpp
@@ -44,9 +44,12 @@ ClientSession::ClientSession(const std::string& id,
_uriPublic(uriPublic),
_isReadOnly(readOnly),
_isDocumentOwner(false),
- _loadPart(-1)
+ _loadPart(-1),
+ _stop(false)
{
Log::info("ClientSession ctor [" + getName() + "].");
+
+ _senderThread = std::thread([this]{ senderThread(); });
}
ClientSession::~ClientSession()
@@ -55,6 +58,13 @@ ClientSession::~ClientSession()
// Release the save-as queue.
_saveAsQueue.put("");
+
+ stop();
+ if (_senderThread.joinable())
+ {
+ _senderThread.join();
+ }
+
}
void ClientSession::bridgePrisonerSession()
@@ -453,4 +463,36 @@ void ClientSession::setReadOnly()
sendTextFrame("perm: readonly");
}
+void ClientSession::senderThread()
+{
+ LOG_DBG(getName() + " SenderThread started");
+
+ while (!stopping())
+ {
+ std::shared_ptr<MessagePayload> item;
+ if (_senderQueue.waitDequeue(item, static_cast<size_t>(POLL_TIMEOUT_MS)))
+ {
+ const std::vector<char>& data = item->data();
+ try
+ {
+ if (item->isBinary())
+ {
+ Session::sendBinaryFrame(data.data(), data.size());
+ }
+ else
+ {
+ Session::sendTextFrame(data.data(), data.size());
+ }
+ }
+ catch (const std::exception& ex)
+ {
+ LOG_ERR("Failed to send message [" << LOOLProtocol::getAbbreviatedMessage(data) <<
+ "] to " << getName() << ": " << ex.what());
+ }
+ }
+ }
+
+ LOG_DBG(getName() + " SenderThread finished");
+}
+
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/wsd/ClientSession.hpp b/wsd/ClientSession.hpp
index a96d807154..f6bc8a57e8 100644
--- a/wsd/ClientSession.hpp
+++ b/wsd/ClientSession.hpp
@@ -13,6 +13,7 @@
#include "Session.hpp"
#include "Storage.hpp"
#include "MessageQueue.hpp"
+#include "SenderQueue.hpp"
#include <Poco/URI.h>
@@ -44,6 +45,38 @@ public:
void setDocumentOwner(const bool documentOwner) { _isDocumentOwner = documentOwner; }
bool isDocumentOwner() const { return _isDocumentOwner; }
+ using Session::sendTextFrame;
+
+ bool sendBinaryFrame(const char* buffer, int length) override
+ {
+ auto payload = std::make_shared<MessagePayload>(length, MessagePayload::Type::Binary);
+ auto& output = payload->data();
+ std::memcpy(output.data(), buffer, length);
+ enqueueSendMessage(payload);
+ return true;
+ }
+
+ bool sendTextFrame(const char* buffer, const int length) override
+ {
+ auto payload = std::make_shared<MessagePayload>(length, MessagePayload::Type::Text);
+ auto& output = payload->data();
+ std::memcpy(output.data(), buffer, length);
+ enqueueSendMessage(payload);
+ return true;
+ }
+
+ void enqueueSendMessage(const std::shared_ptr<MessagePayload>& data)
+ {
+ _senderQueue.enqueue(data);
+ }
+
+ bool stopping() const { return _stop || _senderQueue.stopping(); }
+ void stop()
+ {
+ _stop = true;
+ _senderQueue.stop();
+ }
+
/**
* Return the URL of the saved-as document when it's ready. If called
* before it's ready, the call blocks till then.
@@ -95,6 +128,8 @@ private:
/// Eg. in readonly mode only few messages should be allowed
bool filterMessage(const std::string& msg) const;
+ void senderThread();
+
private:
std::weak_ptr<DocumentBroker> _docBroker;
@@ -117,6 +152,10 @@ private:
/// Wopi FileInfo object
std::unique_ptr<WopiStorage::WOPIFileInfo> _wopiFileInfo;
+
+ SenderQueue<std::shared_ptr<MessagePayload>> _senderQueue;
+ std::thread _senderThread;
+ std::atomic<bool> _stop;
};
#endif
diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index 0470cbbbb9..21c3b4c97b 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -596,9 +596,6 @@ size_t DocumentBroker::addSession(std::shared_ptr<ClientSession>& session)
// Now we are ready to bridge between the kit and client.
session->bridgePrisonerSession();
- // Provision for another thread to service this session.
- SenderThreadPool::instance().incMaxThreadCount();
-
return count;
}
@@ -610,9 +607,6 @@ size_t DocumentBroker::removeSession(const std::string& id)
{
Admin::instance().rmDoc(_docKey, id);
- // Reduce thread provisioning.
- SenderThreadPool::instance().decMaxThreadCount();
-
auto it = _sessions.find(id);
if (it != _sessions.end())
{
@@ -635,13 +629,14 @@ void DocumentBroker::alertAllUsers(const std::string& msg)
{
Util::assertIsLocked(_mutex);
+ auto payload = std::make_shared<MessagePayload>(msg.size(), MessagePayload::Type::Text);
+ auto& output = payload->data();
+ std::memcpy(output.data(), msg.data(), msg.size());
+
LOG_DBG("Alerting all users of [" << _docKey << "]: " << msg);
for (auto& it : _sessions)
{
- auto payload = std::make_shared<MessagePayload>(msg.size(), MessagePayload::Type::Text);
- auto& output = payload->data();
- std::memcpy(output.data(), msg.data(), msg.size());
- SenderQueue::instance().enqueue(it.second, payload);
+ it.second->enqueueSendMessage(payload);
}
}
diff --git a/wsd/PrisonerSession.cpp b/wsd/PrisonerSession.cpp
index 69eafad20b..59b6a43e79 100644
--- a/wsd/PrisonerSession.cpp
+++ b/wsd/PrisonerSession.cpp
@@ -286,7 +286,7 @@ bool PrisonerSession::forwardToPeer(const std::shared_ptr<ClientSession>& client
: MessagePayload::Type::Text);
auto& output = payload->data();
std::memcpy(output.data(), buffer, length);
- SenderQueue::instance().enqueue(clientSession, payload);
+ clientSession->enqueueSendMessage(payload);
return true;
}
diff --git a/wsd/SenderQueue.hpp b/wsd/SenderQueue.hpp
index 3dc2856c5d..647b81ac54 100644
--- a/wsd/SenderQueue.hpp
+++ b/wsd/SenderQueue.hpp
@@ -10,12 +10,14 @@
#ifndef INCLUDED_SENDERQUEUE_HPP
#define INCLUDED_SENDERQUEUE_HPP
+#include <condition_variable>
#include <deque>
#include <memory>
+#include <mutex>
#include <vector>
#include "common/SigUtil.hpp"
-#include "Session.hpp"
+#include "LOOLWebSocket.hpp"
#include "Log.hpp"
/// The payload type used to send/receive data.
@@ -43,32 +45,38 @@ private:
struct SendItem
{
- std::weak_ptr<::Session> Session;
+ std::weak_ptr<LOOLWebSocket> Socket;
std::shared_ptr<MessagePayload> Data;
+ std::string Meta;
std::chrono::steady_clock::time_point BirthTime;
};
-/// A queue of data to send to certain Sessions.
+/// A queue of data to send to certain Session's WS.
+template <typename Item>
class SenderQueue final
{
public:
- static SenderQueue& instance() { return TheQueue; }
+ SenderQueue() :
+ _stop(false)
+ {
+ }
bool stopping() const { return _stop || TerminationFlag; }
void stop()
{
- _stop = true;
- _cv.notify_all();
+ _stop = true;
+ _cv.notify_all();
}
- size_t enqueue(const std::weak_ptr<Session>& session,
- const std::shared_ptr<MessagePayload>& data)
+ size_t enqueue(const Item& item)
{
- SendItem item = { session, data, std::chrono::steady_clock::now() };
-
std::unique_lock<std::mutex> lock(_mutex);
- _queue.push_back(item);
+ if (!stopping())
+ {
+ _queue.push_back(item);
+ }
+
const size_t queuesize = _queue.size();
lock.unlock();
@@ -76,7 +84,7 @@ public:
return queuesize;
}
- bool waitDequeue(SendItem& item,
+ bool waitDequeue(Item& item,
const size_t timeoutMs = std::numeric_limits<size_t>::max())
{
const auto timeToWait = std::chrono::milliseconds(timeoutMs);
@@ -93,7 +101,7 @@ public:
return true;
}
- LOG_INF("SenderQueue: stopping");
+ LOG_DBG("SenderQueue: stopping");
return false;
}
@@ -109,16 +117,13 @@ public:
private:
mutable std::mutex _mutex;
std::condition_variable _cv;
- std::deque<SendItem> _queue;
+ std::deque<Item> _queue;
std::atomic<bool> _stop;
-
- /// The only SenderQueue instance.
- static SenderQueue TheQueue;
};
/// Pool of sender threads.
/// These are dedicated threads that only dequeue from
-/// the SenderQueue and send to the target Session.
+/// the SenderQueue and send to the target Session's WS.
/// This pool has long-running threads that grow
/// only on congention and shrink otherwise.
class SenderThreadPool final
@@ -141,7 +146,7 @@ public:
{
// Stop us and the queue.
stop();
- SenderQueue::instance().stop();
+ //SenderQueue::instance().stop();
for (const auto& threadData : _threads)
{
@@ -152,8 +157,6 @@ public:
}
}
- static SenderThreadPool& instance() { return ThePool; }
-
void stop() { _stop = true; }
bool stopping() const { return _stop || TerminationFlag; }
@@ -218,9 +221,6 @@ private:
/// How often to do housekeeping when we idle.
static constexpr size_t HousekeepIdleIntervalMs = 60000;
-
- /// The only pool.
- static SenderThreadPool ThePool;
};
#endif
diff --git a/wsd/TileCache.cpp b/wsd/TileCache.cpp
index 60123a799d..c00cc55b31 100644
--- a/wsd/TileCache.cpp
+++ b/wsd/TileCache.cpp
@@ -183,7 +183,11 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const
}
auto& firstSubscriber = tileBeingRendered->_subscribers[0];
- SenderQueue::instance().enqueue(firstSubscriber, payload);
+ auto firstSession = firstSubscriber.lock();
+ if (firstSession)
+ {
+ firstSession->enqueueSendMessage(payload);
+ }
if (subscriberCount > 1)
{
@@ -201,7 +205,11 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const
for (size_t i = 1; i < subscriberCount; ++i)
{
auto& subscriber = tileBeingRendered->_subscribers[i];
- SenderQueue::instance().enqueue(subscriber, payload);
+ auto session = subscriber.lock();
+ if (session)
+ {
+ session->enqueueSendMessage(payload);
+ }
}
}
}