From 2f48e655037411a81669ac8f84dd90bcb6c0c00f Mon Sep 17 00:00:00 2001
From: vaxerski <43317083+vaxerski@users.noreply.github.com>
Date: Sat, 16 Sep 2023 18:22:53 +0100
Subject: [PATCH] core: rework event loop for polling and C++-style awaits

Stops the event loop from checking all the time in favor of locks and polling.
---
 src/core/PortalManager.cpp | 149 ++++++++++++++++++++++++++++++++-----
 src/core/PortalManager.hpp |  23 +++++-
 src/helpers/Timer.cpp      |   8 ++
 src/helpers/Timer.hpp      |   2 +
 src/portals/Screencopy.cpp |   3 +-
 5 files changed, 161 insertions(+), 24 deletions(-)

diff --git a/src/core/PortalManager.cpp b/src/core/PortalManager.cpp
index 74b08ad..d56ebe0 100644
--- a/src/core/PortalManager.cpp
+++ b/src/core/PortalManager.cpp
@@ -290,16 +290,125 @@ void CPortalManager::init() {
 
     wl_display_roundtrip(m_sWaylandConnection.display);
 
-    while (1) {
-        // dbus events
+    startEventLoop();
+}
+
+void CPortalManager::startEventLoop() {
+
+    pollfd pollfds[] = {
+        {
+            .fd     = m_pConnection->getEventLoopPollData().fd,
+            .events = POLLIN,
+        },
+        {
+            .fd     = wl_display_get_fd(m_sWaylandConnection.display),
+            .events = POLLIN,
+        },
+        {
+            .fd     = pw_loop_get_fd(m_sPipewire.loop),
+            .events = POLLIN,
+        },
+    };
+
+    std::thread pollThr([this, &pollfds]() {
+        while (1) {
+            int ret = poll(pollfds, 3, -1);
+            if (ret < 0) {
+                Debug::log(CRIT, "[core] Polling fds failed with {}", strerror(errno));
+                exit(1);
+            }
+
+            for (size_t i = 0; i < 3; ++i) {
+                if (pollfds[i].revents & POLLHUP) {
+                    Debug::log(CRIT, "[core] Disconnected from pollfd id {}", i);
+                    exit(1);
+                }
+            }
+
+            {
+                Debug::log(TRACE, "[core] got poll event");
+                std::lock_guard lg(m_sEventLoopInternals.loopRequestMutex);
+                m_sEventLoopInternals.shouldProcess = true;
+                m_sEventLoopInternals.loopSignal.notify_all();
+            }
+        }
+    });
+
+    m_sTimersThread.thread = std::make_unique<std::thread>([this] {
+        while (1) {
+            std::unique_lock lk(m_sTimersThread.loopMutex);
+
+            // find nearest timer ms
+            m_mEventLock.lock();
+            float nearest = 60000; /* reasonable timeout */
+            for (auto& t : m_sTimersThread.timers) {
+                float until = t->duration() - t->passedMs();
+                if (until < nearest)
+                    nearest = until;
+            }
+            m_mEventLock.unlock();
+
+            m_sTimersThread.loopSignal.wait_for(lk, std::chrono::milliseconds((int)nearest), [this] { return m_sTimersThread.shouldProcess; });
+            m_sTimersThread.shouldProcess = false;
+
+            // awakened. Check if any timers passed
+            m_mEventLock.lock();
+            bool notify = false;
+            for (auto& t : m_sTimersThread.timers) {
+                if (t->passed()) {
+                    Debug::log(TRACE, "[core] got timer event");
+                    notify = true;
+                    break;
+                }
+            }
+            m_mEventLock.unlock();
+
+            if (notify) {
+                std::lock_guard lg(m_sEventLoopInternals.loopRequestMutex);
+                m_sEventLoopInternals.shouldProcess = true;
+                m_sEventLoopInternals.loopSignal.notify_all();
+            }
+        }
+    });
+
+    while (1) { // dbus events
+        // wait for being awakened
+        m_sEventLoopInternals.loopRequestMutex.unlock(); // unlock, we are ready to take events
+
+        std::unique_lock lk(m_sEventLoopInternals.loopMutex);
+        if (m_sEventLoopInternals.shouldProcess == false) // avoid a lock if a thread managed to request something already since we .unlock()ed
+            m_sEventLoopInternals.loopSignal.wait(lk, [this] { return m_sEventLoopInternals.shouldProcess == true; }); // wait for events
+
+        m_sEventLoopInternals.loopRequestMutex.lock(); // lock incoming events
+
+        m_sEventLoopInternals.shouldProcess = false;
+
         m_mEventLock.lock();
 
-        while (m_pConnection->processPendingRequest()) {
-            ;
+        if (pollfds[0].revents & POLLIN /* dbus */) {
+            while (m_pConnection->processPendingRequest()) {
+                ;
+            }
+        }
+
+        if (pollfds[1].revents & POLLIN /* wl */) {
+            wl_display_flush(m_sWaylandConnection.display);
+            if (wl_display_prepare_read(m_sWaylandConnection.display) == 0) {
+                wl_display_read_events(m_sWaylandConnection.display);
+                wl_display_dispatch_pending(m_sWaylandConnection.display);
+            } else {
+                wl_display_dispatch(m_sWaylandConnection.display);
+            }
+        }
+
+        if (pollfds[2].revents & POLLIN /* pw */) {
+            while (pw_loop_iterate(m_sPipewire.loop, 0) != 0) {
+                ;
+            }
         }
 
         std::vector<CTimer*> toRemove;
-        for (auto& t : m_vTimers) {
+        for (auto& t : m_sTimersThread.timers) {
             if (t->passed()) {
                 t->m_fnCallback();
                 toRemove.emplace_back(t.get());
@@ -307,26 +416,21 @@ void CPortalManager::init() {
             }
         }
 
-        while (pw_loop_iterate(m_sPipewire.loop, 0) != 0) {
-            ;
-        }
-
-        wl_display_flush(m_sWaylandConnection.display);
-        if (wl_display_prepare_read(m_sWaylandConnection.display) == 0) {
-            wl_display_read_events(m_sWaylandConnection.display);
-            wl_display_dispatch_pending(m_sWaylandConnection.display);
-        } else {
-            wl_display_dispatch(m_sWaylandConnection.display);
-        }
+        // finalize wayland dispatching. Dispatch pending on the queue
+        int ret = 0;
+        do {
+            ret = wl_display_dispatch_pending(m_sWaylandConnection.display);
+            wl_display_flush(m_sWaylandConnection.display);
+        } while (ret > 0);
 
         if (!toRemove.empty())
-            std::erase_if(m_vTimers,
+            std::erase_if(m_sTimersThread.timers,
                           [&](const auto& t) { return std::find_if(toRemove.begin(), toRemove.end(), [&](const auto& other) { return other == t.get(); }) != toRemove.end(); });
 
         m_mEventLock.unlock();
-
-        std::this_thread::sleep_for(std::chrono::milliseconds(1));
     }
+
+    m_sTimersThread.thread.release();
 }
 
 sdbus::IConnection* CPortalManager::getConnection() {
@@ -382,3 +486,10 @@ gbm_device* CPortalManager::createGBMDevice(drmDevice* dev) {
     free(renderNode);
     return gbm_create_device(fd);
 }
+
+void CPortalManager::addTimer(const CTimer& timer) {
+    Debug::log(TRACE, "[core] adding timer for {}ms", timer.duration());
+    m_sTimersThread.timers.emplace_back(std::make_unique<CTimer>(timer));
+    m_sTimersThread.shouldProcess = true;
+    m_sTimersThread.loopSignal.notify_all();
+}
diff --git a/src/core/PortalManager.hpp b/src/core/PortalManager.hpp
index fc3e15f..439d9fb 100644
--- a/src/core/PortalManager.hpp
+++ b/src/core/PortalManager.hpp
@@ -65,13 +65,30 @@ class CPortalManager {
         } dma;
     } m_sWaylandConnection;
 
-    std::vector<SDMABUFModifier>         m_vDMABUFMods;
+    std::vector<SDMABUFModifier> m_vDMABUFMods;
 
-    std::vector<std::unique_ptr<CTimer>> m_vTimers;
+    void                         addTimer(const CTimer& timer);
 
-    gbm_device*                          createGBMDevice(drmDevice* dev);
+    gbm_device*                  createGBMDevice(drmDevice* dev);
 
   private:
+    void startEventLoop();
+
+    struct {
+        std::condition_variable loopSignal;
+        std::mutex              loopMutex;
+        std::atomic<bool>       shouldProcess = false;
+        std::mutex              loopRequestMutex;
+    } m_sEventLoopInternals;
+
+    struct {
+        std::condition_variable              loopSignal;
+        std::mutex                           loopMutex;
+        bool                                 shouldProcess = false;
+        std::vector<std::unique_ptr<CTimer>> timers;
+        std::unique_ptr<std::thread>         thread;
+    } m_sTimersThread;
+
     std::unique_ptr<sdbus::IConnection>   m_pConnection;
 
     std::vector<std::unique_ptr<SOutput>> m_vOutputs;
diff --git a/src/helpers/Timer.cpp b/src/helpers/Timer.cpp
index 0857e66..e2d7d84 100644
--- a/src/helpers/Timer.cpp
+++ b/src/helpers/Timer.cpp
@@ -8,4 +8,12 @@ CTimer::CTimer(float ms, std::function<void()> callback) {
 
 bool CTimer::passed() const {
     return std::chrono::high_resolution_clock::now() > (m_tStart + std::chrono::milliseconds((uint64_t)m_fDuration));
+}
+
+float CTimer::passedMs() const {
+    return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - m_tStart).count();
+}
+
+float CTimer::duration() const {
+    return m_fDuration;
 }
\ No newline at end of file
diff --git a/src/helpers/Timer.hpp b/src/helpers/Timer.hpp
index b5832a4..3a43fd2 100644
--- a/src/helpers/Timer.hpp
+++ b/src/helpers/Timer.hpp
@@ -8,6 +8,8 @@ class CTimer {
     CTimer(float ms, std::function<void()> callback);
 
     bool                  passed() const;
+    float                 passedMs() const;
+    float                 duration() const;
 
     std::function<void()> m_fnCallback;
diff --git a/src/portals/Screencopy.cpp b/src/portals/Screencopy.cpp
index 1410b20..c5b234c 100644
--- a/src/portals/Screencopy.cpp
+++ b/src/portals/Screencopy.cpp
@@ -654,8 +654,7 @@ void CScreencopyPortal::queueNextShareFrame(CScreencopyPortal::SSession* pSessio
     if (PSTREAM && !PSTREAM->streamState)
         return;
 
-    g_pPortalManager->m_vTimers.emplace_back(
-        std::make_unique<CTimer>(1000.0 / pSession->sharingData.framerate, [pSession]() { g_pPortalManager->m_sPortals.screencopy->startFrameCopy(pSession); }));
+    g_pPortalManager->addTimer({1000.0 / pSession->sharingData.framerate, [pSession]() { g_pPortalManager->m_sPortals.screencopy->startFrameCopy(pSession); }});
 }
 
 bool CScreencopyPortal::hasToplevelCapabilities() {
     return m_sState.toplevel;
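
Note (not part of the patch): the heart of the rework above is a standard condition-variable handshake. The poll() thread and the timers thread set shouldProcess and notify, and the main loop blocks on that flag instead of sleeping 1 ms per iteration as the old loop did. Below is a minimal, self-contained sketch of that handshake, simplified to a single mutex; the names (internals, producer) are illustrative only and do not appear in the codebase.

    #include <chrono>
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>

    struct {
        std::condition_variable loopSignal;
        std::mutex              loopMutex;
        bool                    shouldProcess = false;
    } internals; // simplified stand-in for m_sEventLoopInternals

    void producer(const char* who) {
        // a poll()/timer thread would block here until an fd or timer is ready
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        {
            std::lock_guard lg(internals.loopMutex); // set the flag under the lock
            internals.shouldProcess = true;
        }
        internals.loopSignal.notify_all(); // wake the event loop
        std::cout << who << " requested processing\n";
    }

    int main() {
        std::thread thr(producer, "poll thread");

        std::unique_lock lk(internals.loopMutex);
        // the predicate re-checks the flag, so spurious wakeups are harmless
        internals.loopSignal.wait(lk, [] { return internals.shouldProcess; });
        internals.shouldProcess = false;
        std::cout << "event loop: processing events\n";

        lk.unlock();
        thr.join();
    }

The patch layers two extras on top of this pattern: a second mutex (loopRequestMutex) that producers take before signalling, so new requests queue up while an iteration is still being processed, and a timers thread that waits with wait_for using the nearest timer deadline as its timeout rather than waiting indefinitely.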