From 49ab3890aa826918568c62f9cda4df023f3dcbb2 Mon Sep 17 00:00:00 2001 From: vaxerski <43317083+vaxerski@users.noreply.github.com> Date: Mon, 3 Oct 2022 20:47:15 +0100 Subject: [PATCH] remove polling from socket2, fully event based --- src/managers/EventManager.cpp | 83 ++++++++++++++++------------------- src/managers/EventManager.hpp | 3 ++ 2 files changed, 42 insertions(+), 44 deletions(-) diff --git a/src/managers/EventManager.cpp b/src/managers/EventManager.cpp index 98dd5049..b5bcf78a 100644 --- a/src/managers/EventManager.cpp +++ b/src/managers/EventManager.cpp @@ -36,19 +36,12 @@ void CEventManager::startThread() { // 10 max queued. listen(SOCKET, 10); - char readBuf[1024] = {0}; - sockaddr_in clientAddress; socklen_t clientSize = sizeof(clientAddress); Debug::log(LOG, "Hypr socket 2 started at %s", socketPath.c_str()); - // set the socket nonblock - int flags = fcntl(SOCKET, F_GETFL, 0); - fcntl(SOCKET, F_SETFL, flags | O_NONBLOCK); - while (1) { - const auto ACCEPTEDCONNECTION = accept(SOCKET, (sockaddr*)&clientAddress, &clientSize); if (ACCEPTEDCONNECTION > 0) { @@ -61,43 +54,7 @@ void CEventManager::startThread() { Debug::log(LOG, "Socket 2 accepted a new client at FD %d", ACCEPTEDCONNECTION); } - // pong if all FDs valid - for (auto it = m_dAcceptedSocketFDs.begin(); it != m_dAcceptedSocketFDs.end();) { - auto sizeRead = recv(*it, &readBuf, 1024, 0); - - if (sizeRead != 0) { - it++; - continue; - } - - // invalid! - Debug::log(LOG, "Removed invalid socket (2) FD: %d", *it); - it = m_dAcceptedSocketFDs.erase(it); - } - - // valid FDs, check the queue - // don't do anything if main thread is writing to the eventqueue - eventQueueMutex.lock(); - - if (m_dQueuedEvents.empty()){ // if queue empty, sleep and ignore - eventQueueMutex.unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - continue; - } - - // write all queued events - for (auto& ev : m_dQueuedEvents) { - std::string eventString = (ev.event + ">>" + ev.data).substr(0, 1022) + "\n"; - for (auto& fd : m_dAcceptedSocketFDs) { - write(fd, eventString.c_str(), eventString.length()); - } - } - - m_dQueuedEvents.clear(); - - eventQueueMutex.unlock(); - - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + ensureFDsValid(); } close(SOCKET); @@ -106,6 +63,42 @@ void CEventManager::startThread() { m_tThread.detach(); } +void CEventManager::ensureFDsValid() { + static char readBuf[1024] = {0}; + + // pong if all FDs valid + for (auto it = m_dAcceptedSocketFDs.begin(); it != m_dAcceptedSocketFDs.end();) { + auto sizeRead = recv(*it, &readBuf, 1024, 0); + + if (sizeRead != 0) { + it++; + continue; + } + + // invalid! + Debug::log(LOG, "Removed invalid socket (2) FD: %d", *it); + it = m_dAcceptedSocketFDs.erase(it); + } +} + +void CEventManager::flushEvents() { + + ensureFDsValid(); + + eventQueueMutex.lock(); + + for (auto& ev : m_dQueuedEvents) { + std::string eventString = (ev.event + ">>" + ev.data).substr(0, 1022) + "\n"; + for (auto& fd : m_dAcceptedSocketFDs) { + write(fd, eventString.c_str(), eventString.length()); + } + } + + m_dQueuedEvents.clear(); + + eventQueueMutex.unlock(); +} + void CEventManager::postEvent(const SHyprIPCEvent event, bool force) { if ((m_bIgnoreEvents && !force) || g_pCompositor->m_bIsShuttingDown) { @@ -117,5 +110,7 @@ void CEventManager::postEvent(const SHyprIPCEvent event, bool force) { eventQueueMutex.lock(); m_dQueuedEvents.push_back(ev); eventQueueMutex.unlock(); + + flushEvents(); }, event).detach(); } diff --git a/src/managers/EventManager.hpp b/src/managers/EventManager.hpp index c386a260..c5755ae2 100644 --- a/src/managers/EventManager.hpp +++ b/src/managers/EventManager.hpp @@ -25,6 +25,9 @@ public: private: + void flushEvents(); + void ensureFDsValid(); + std::mutex eventQueueMutex; std::deque m_dQueuedEvents;