diff options
author | Ashod Nakashian <ashod.nakashian@collabora.co.uk> | 2016-12-13 19:20:05 -0500 |
---|---|---|
committer | Ashod Nakashian <ashnakash@gmail.com> | 2016-12-14 04:21:20 +0000 |
commit | fe38e0e1e67c64bc7d91a5ac20c5bf05d4282928 (patch) | |
tree | 3d503d27a89cdf3dcbd6a3382893b30d7b660989 | |
parent | loolwsd: simplify checking for closed socket (diff) | |
download | online-fe38e0e1e67c64bc7d91a5ac20c5bf05d4282928.tar.gz online-fe38e0e1e67c64bc7d91a5ac20c5bf05d4282928.zip |
loolwsd: per-socket dedicated sending thread
To avoid degrading performance for everyone
because of a single slow/bad connection, we
send data to clients each in its own thread.
Change-Id: I6f980c25a404c4d05bcdb1979849ea3d2776c7b9
Reviewed-on: https://gerrit.libreoffice.org/31984
Reviewed-by: Ashod Nakashian <ashnakash@gmail.com>
Tested-by: Ashod Nakashian <ashnakash@gmail.com>
-rw-r--r-- | Makefile.am | 1 | ||||
-rw-r--r-- | test/Makefile.am | 1 | ||||
-rw-r--r-- | wsd/ClientSession.cpp | 44 | ||||
-rw-r--r-- | wsd/ClientSession.hpp | 39 | ||||
-rw-r--r-- | wsd/DocumentBroker.cpp | 15 | ||||
-rw-r--r-- | wsd/PrisonerSession.cpp | 2 | ||||
-rw-r--r-- | wsd/SenderQueue.hpp | 48 | ||||
-rw-r--r-- | wsd/TileCache.cpp | 12 |
8 files changed, 122 insertions, 40 deletions
diff --git a/Makefile.am b/Makefile.am index b1f1932fa6..79120bcb40 100644 --- a/Makefile.am +++ b/Makefile.am @@ -53,7 +53,6 @@ loolwsd_SOURCES = wsd/Admin.cpp \ wsd/ClientSession.cpp \ wsd/FileServer.cpp \ wsd/PrisonerSession.cpp \ - wsd/SenderQueue.cpp \ wsd/Storage.cpp \ wsd/TileCache.cpp \ $(shared_sources) diff --git a/test/Makefile.am b/test/Makefile.am index 9a6d7154a9..992830ec08 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -37,7 +37,6 @@ wsd_sources = \ ../common/Session.cpp \ ../common/MessageQueue.cpp \ ../kit/Kit.cpp \ - ../wsd/SenderQueue.cpp \ ../wsd/TileCache.cpp \ ../common/Unit.cpp \ ../common/Util.cpp diff --git a/wsd/ClientSession.cpp b/wsd/ClientSession.cpp index f1188b5ac9..c97ef647d3 100644 --- a/wsd/ClientSession.cpp +++ b/wsd/ClientSession.cpp @@ -44,9 +44,12 @@ ClientSession::ClientSession(const std::string& id, _uriPublic(uriPublic), _isReadOnly(readOnly), _isDocumentOwner(false), - _loadPart(-1) + _loadPart(-1), + _stop(false) { Log::info("ClientSession ctor [" + getName() + "]."); + + _senderThread = std::thread([this]{ senderThread(); }); } ClientSession::~ClientSession() @@ -55,6 +58,13 @@ ClientSession::~ClientSession() // Release the save-as queue. _saveAsQueue.put(""); + + stop(); + if (_senderThread.joinable()) + { + _senderThread.join(); + } + } void ClientSession::bridgePrisonerSession() @@ -453,4 +463,36 @@ void ClientSession::setReadOnly() sendTextFrame("perm: readonly"); } +void ClientSession::senderThread() +{ + LOG_DBG(getName() + " SenderThread started"); + + while (!stopping()) + { + std::shared_ptr<MessagePayload> item; + if (_senderQueue.waitDequeue(item, static_cast<size_t>(POLL_TIMEOUT_MS))) + { + const std::vector<char>& data = item->data(); + try + { + if (item->isBinary()) + { + Session::sendBinaryFrame(data.data(), data.size()); + } + else + { + Session::sendTextFrame(data.data(), data.size()); + } + } + catch (const std::exception& ex) + { + LOG_ERR("Failed to send message [" << LOOLProtocol::getAbbreviatedMessage(data) << + "] to " << getName() << ": " << ex.what()); + } + } + } + + LOG_DBG(getName() + " SenderThread finished"); +} + /* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/wsd/ClientSession.hpp b/wsd/ClientSession.hpp index a96d807154..f6bc8a57e8 100644 --- a/wsd/ClientSession.hpp +++ b/wsd/ClientSession.hpp @@ -13,6 +13,7 @@ #include "Session.hpp" #include "Storage.hpp" #include "MessageQueue.hpp" +#include "SenderQueue.hpp" #include <Poco/URI.h> @@ -44,6 +45,38 @@ public: void setDocumentOwner(const bool documentOwner) { _isDocumentOwner = documentOwner; } bool isDocumentOwner() const { return _isDocumentOwner; } + using Session::sendTextFrame; + + bool sendBinaryFrame(const char* buffer, int length) override + { + auto payload = std::make_shared<MessagePayload>(length, MessagePayload::Type::Binary); + auto& output = payload->data(); + std::memcpy(output.data(), buffer, length); + enqueueSendMessage(payload); + return true; + } + + bool sendTextFrame(const char* buffer, const int length) override + { + auto payload = std::make_shared<MessagePayload>(length, MessagePayload::Type::Text); + auto& output = payload->data(); + std::memcpy(output.data(), buffer, length); + enqueueSendMessage(payload); + return true; + } + + void enqueueSendMessage(const std::shared_ptr<MessagePayload>& data) + { + _senderQueue.enqueue(data); + } + + bool stopping() const { return _stop || _senderQueue.stopping(); } + void stop() + { + _stop = true; + _senderQueue.stop(); + } + /** * Return the URL of the saved-as document when it's ready. If called * before it's ready, the call blocks till then. @@ -95,6 +128,8 @@ private: /// Eg. in readonly mode only few messages should be allowed bool filterMessage(const std::string& msg) const; + void senderThread(); + private: std::weak_ptr<DocumentBroker> _docBroker; @@ -117,6 +152,10 @@ private: /// Wopi FileInfo object std::unique_ptr<WopiStorage::WOPIFileInfo> _wopiFileInfo; + + SenderQueue<std::shared_ptr<MessagePayload>> _senderQueue; + std::thread _senderThread; + std::atomic<bool> _stop; }; #endif diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp index 0470cbbbb9..21c3b4c97b 100644 --- a/wsd/DocumentBroker.cpp +++ b/wsd/DocumentBroker.cpp @@ -596,9 +596,6 @@ size_t DocumentBroker::addSession(std::shared_ptr<ClientSession>& session) // Now we are ready to bridge between the kit and client. session->bridgePrisonerSession(); - // Provision for another thread to service this session. - SenderThreadPool::instance().incMaxThreadCount(); - return count; } @@ -610,9 +607,6 @@ size_t DocumentBroker::removeSession(const std::string& id) { Admin::instance().rmDoc(_docKey, id); - // Reduce thread provisioning. - SenderThreadPool::instance().decMaxThreadCount(); - auto it = _sessions.find(id); if (it != _sessions.end()) { @@ -635,13 +629,14 @@ void DocumentBroker::alertAllUsers(const std::string& msg) { Util::assertIsLocked(_mutex); + auto payload = std::make_shared<MessagePayload>(msg.size(), MessagePayload::Type::Text); + auto& output = payload->data(); + std::memcpy(output.data(), msg.data(), msg.size()); + LOG_DBG("Alerting all users of [" << _docKey << "]: " << msg); for (auto& it : _sessions) { - auto payload = std::make_shared<MessagePayload>(msg.size(), MessagePayload::Type::Text); - auto& output = payload->data(); - std::memcpy(output.data(), msg.data(), msg.size()); - SenderQueue::instance().enqueue(it.second, payload); + it.second->enqueueSendMessage(payload); } } diff --git a/wsd/PrisonerSession.cpp b/wsd/PrisonerSession.cpp index 69eafad20b..59b6a43e79 100644 --- a/wsd/PrisonerSession.cpp +++ b/wsd/PrisonerSession.cpp @@ -286,7 +286,7 @@ bool PrisonerSession::forwardToPeer(const std::shared_ptr<ClientSession>& client : MessagePayload::Type::Text); auto& output = payload->data(); std::memcpy(output.data(), buffer, length); - SenderQueue::instance().enqueue(clientSession, payload); + clientSession->enqueueSendMessage(payload); return true; } diff --git a/wsd/SenderQueue.hpp b/wsd/SenderQueue.hpp index 3dc2856c5d..647b81ac54 100644 --- a/wsd/SenderQueue.hpp +++ b/wsd/SenderQueue.hpp @@ -10,12 +10,14 @@ #ifndef INCLUDED_SENDERQUEUE_HPP #define INCLUDED_SENDERQUEUE_HPP +#include <condition_variable> #include <deque> #include <memory> +#include <mutex> #include <vector> #include "common/SigUtil.hpp" -#include "Session.hpp" +#include "LOOLWebSocket.hpp" #include "Log.hpp" /// The payload type used to send/receive data. @@ -43,32 +45,38 @@ private: struct SendItem { - std::weak_ptr<::Session> Session; + std::weak_ptr<LOOLWebSocket> Socket; std::shared_ptr<MessagePayload> Data; + std::string Meta; std::chrono::steady_clock::time_point BirthTime; }; -/// A queue of data to send to certain Sessions. +/// A queue of data to send to certain Session's WS. +template <typename Item> class SenderQueue final { public: - static SenderQueue& instance() { return TheQueue; } + SenderQueue() : + _stop(false) + { + } bool stopping() const { return _stop || TerminationFlag; } void stop() { - _stop = true; - _cv.notify_all(); + _stop = true; + _cv.notify_all(); } - size_t enqueue(const std::weak_ptr<Session>& session, - const std::shared_ptr<MessagePayload>& data) + size_t enqueue(const Item& item) { - SendItem item = { session, data, std::chrono::steady_clock::now() }; - std::unique_lock<std::mutex> lock(_mutex); - _queue.push_back(item); + if (!stopping()) + { + _queue.push_back(item); + } + const size_t queuesize = _queue.size(); lock.unlock(); @@ -76,7 +84,7 @@ public: return queuesize; } - bool waitDequeue(SendItem& item, + bool waitDequeue(Item& item, const size_t timeoutMs = std::numeric_limits<size_t>::max()) { const auto timeToWait = std::chrono::milliseconds(timeoutMs); @@ -93,7 +101,7 @@ public: return true; } - LOG_INF("SenderQueue: stopping"); + LOG_DBG("SenderQueue: stopping"); return false; } @@ -109,16 +117,13 @@ public: private: mutable std::mutex _mutex; std::condition_variable _cv; - std::deque<SendItem> _queue; + std::deque<Item> _queue; std::atomic<bool> _stop; - - /// The only SenderQueue instance. - static SenderQueue TheQueue; }; /// Pool of sender threads. /// These are dedicated threads that only dequeue from -/// the SenderQueue and send to the target Session. +/// the SenderQueue and send to the target Session's WS. /// This pool has long-running threads that grow /// only on congention and shrink otherwise. class SenderThreadPool final @@ -141,7 +146,7 @@ public: { // Stop us and the queue. stop(); - SenderQueue::instance().stop(); + //SenderQueue::instance().stop(); for (const auto& threadData : _threads) { @@ -152,8 +157,6 @@ public: } } - static SenderThreadPool& instance() { return ThePool; } - void stop() { _stop = true; } bool stopping() const { return _stop || TerminationFlag; } @@ -218,9 +221,6 @@ private: /// How often to do housekeeping when we idle. static constexpr size_t HousekeepIdleIntervalMs = 60000; - - /// The only pool. - static SenderThreadPool ThePool; }; #endif diff --git a/wsd/TileCache.cpp b/wsd/TileCache.cpp index 60123a799d..c00cc55b31 100644 --- a/wsd/TileCache.cpp +++ b/wsd/TileCache.cpp @@ -183,7 +183,11 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const } auto& firstSubscriber = tileBeingRendered->_subscribers[0]; - SenderQueue::instance().enqueue(firstSubscriber, payload); + auto firstSession = firstSubscriber.lock(); + if (firstSession) + { + firstSession->enqueueSendMessage(payload); + } if (subscriberCount > 1) { @@ -201,7 +205,11 @@ void TileCache::saveTileAndNotify(const TileDesc& tile, const char *data, const for (size_t i = 1; i < subscriberCount; ++i) { auto& subscriber = tileBeingRendered->_subscribers[i]; - SenderQueue::instance().enqueue(subscriber, payload); + auto session = subscriber.lock(); + if (session) + { + session->enqueueSendMessage(payload); + } } } } |