core: rework event loop for polling and C++-style awaits

Stops the event loop from checking all the time in favor of locks and polling.
This commit is contained in:
vaxerski 2023-09-16 18:22:53 +01:00
parent 8f45a64350
commit 2f48e65503
5 changed files with 161 additions and 24 deletions

View file

@ -290,16 +290,125 @@ void CPortalManager::init() {
wl_display_roundtrip(m_sWaylandConnection.display); wl_display_roundtrip(m_sWaylandConnection.display);
while (1) { startEventLoop();
// dbus events }
void CPortalManager::startEventLoop() {
pollfd pollfds[] = {
{
.fd = m_pConnection->getEventLoopPollData().fd,
.events = POLLIN,
},
{
.fd = wl_display_get_fd(m_sWaylandConnection.display),
.events = POLLIN,
},
{
.fd = pw_loop_get_fd(m_sPipewire.loop),
.events = POLLIN,
},
};
std::thread pollThr([this, &pollfds]() {
while (1) {
int ret = poll(pollfds, 3, -1);
if (ret < 0) {
Debug::log(CRIT, "[core] Polling fds failed with {}", strerror(errno));
exit(1);
}
for (size_t i = 0; i < 3; ++i) {
if (pollfds[0].revents & POLLHUP) {
Debug::log(CRIT, "[core] Disconnected from pollfd id {}", i);
exit(1);
}
}
{
Debug::log(TRACE, "[core] got poll event");
std::lock_guard<std::mutex> lg(m_sEventLoopInternals.loopRequestMutex);
m_sEventLoopInternals.shouldProcess = true;
m_sEventLoopInternals.loopSignal.notify_all();
}
}
});
m_sTimersThread.thread = std::make_unique<std::thread>([this] {
while (1) {
std::unique_lock lk(m_sTimersThread.loopMutex);
// find nearest timer ms
m_mEventLock.lock();
float nearest = 60000; /* reasonable timeout */
for (auto& t : m_sTimersThread.timers) {
float until = t->duration() - t->passedMs();
if (until < nearest)
nearest = until;
}
m_mEventLock.unlock();
m_sTimersThread.loopSignal.wait_for(lk, std::chrono::milliseconds((int)nearest), [this] { return m_sTimersThread.shouldProcess; });
m_sTimersThread.shouldProcess = false;
// awakened. Check if any timers passed
m_mEventLock.lock();
bool notify = false;
for (auto& t : m_sTimersThread.timers) {
if (t->passed()) {
Debug::log(TRACE, "[core] got timer event");
notify = true;
break;
}
}
m_mEventLock.unlock();
if (notify) {
std::lock_guard<std::mutex> lg(m_sEventLoopInternals.loopRequestMutex);
m_sEventLoopInternals.shouldProcess = true;
m_sEventLoopInternals.loopSignal.notify_all();
}
}
});
while (1) { // dbus events
// wait for being awakened
m_sEventLoopInternals.loopRequestMutex.unlock(); // unlock, we are ready to take events
std::unique_lock lk(m_sEventLoopInternals.loopMutex);
if (m_sEventLoopInternals.shouldProcess == false) // avoid a lock if a thread managed to request something already since we .unlock()ed
m_sEventLoopInternals.loopSignal.wait(lk, [this] { return m_sEventLoopInternals.shouldProcess == true; }); // wait for events
m_sEventLoopInternals.loopRequestMutex.lock(); // lock incoming events
m_sEventLoopInternals.shouldProcess = false;
m_mEventLock.lock(); m_mEventLock.lock();
while (m_pConnection->processPendingRequest()) { if (pollfds[0].revents & POLLIN /* dbus */) {
; while (m_pConnection->processPendingRequest()) {
;
}
}
if (pollfds[1].revents & POLLIN /* wl */) {
wl_display_flush(m_sWaylandConnection.display);
if (wl_display_prepare_read(m_sWaylandConnection.display) == 0) {
wl_display_read_events(m_sWaylandConnection.display);
wl_display_dispatch_pending(m_sWaylandConnection.display);
} else {
wl_display_dispatch(m_sWaylandConnection.display);
}
}
if (pollfds[2].revents & POLLIN /* pw */) {
while (pw_loop_iterate(m_sPipewire.loop, 0) != 0) {
;
}
} }
std::vector<CTimer*> toRemove; std::vector<CTimer*> toRemove;
for (auto& t : m_vTimers) { for (auto& t : m_sTimersThread.timers) {
if (t->passed()) { if (t->passed()) {
t->m_fnCallback(); t->m_fnCallback();
toRemove.emplace_back(t.get()); toRemove.emplace_back(t.get());
@ -307,26 +416,21 @@ void CPortalManager::init() {
} }
} }
while (pw_loop_iterate(m_sPipewire.loop, 0) != 0) { // finalize wayland dispatching. Dispatch pending on the queue
; int ret = 0;
} do {
ret = wl_display_dispatch_pending(m_sWaylandConnection.display);
wl_display_flush(m_sWaylandConnection.display); wl_display_flush(m_sWaylandConnection.display);
if (wl_display_prepare_read(m_sWaylandConnection.display) == 0) { } while (ret > 0);
wl_display_read_events(m_sWaylandConnection.display);
wl_display_dispatch_pending(m_sWaylandConnection.display);
} else {
wl_display_dispatch(m_sWaylandConnection.display);
}
if (!toRemove.empty()) if (!toRemove.empty())
std::erase_if(m_vTimers, std::erase_if(m_sTimersThread.timers,
[&](const auto& t) { return std::find_if(toRemove.begin(), toRemove.end(), [&](const auto& other) { return other == t.get(); }) != toRemove.end(); }); [&](const auto& t) { return std::find_if(toRemove.begin(), toRemove.end(), [&](const auto& other) { return other == t.get(); }) != toRemove.end(); });
m_mEventLock.unlock(); m_mEventLock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(1));
} }
m_sTimersThread.thread.release();
} }
sdbus::IConnection* CPortalManager::getConnection() { sdbus::IConnection* CPortalManager::getConnection() {
@ -382,3 +486,10 @@ gbm_device* CPortalManager::createGBMDevice(drmDevice* dev) {
free(renderNode); free(renderNode);
return gbm_create_device(fd); return gbm_create_device(fd);
} }
void CPortalManager::addTimer(const CTimer& timer) {
Debug::log(TRACE, "[core] adding timer for {}ms", timer.duration());
m_sTimersThread.timers.emplace_back(std::make_unique<CTimer>(timer));
m_sTimersThread.shouldProcess = true;
m_sTimersThread.loopSignal.notify_all();
}

View file

@ -65,13 +65,30 @@ class CPortalManager {
} dma; } dma;
} m_sWaylandConnection; } m_sWaylandConnection;
std::vector<SDMABUFModifier> m_vDMABUFMods; std::vector<SDMABUFModifier> m_vDMABUFMods;
std::vector<std::unique_ptr<CTimer>> m_vTimers; void addTimer(const CTimer& timer);
gbm_device* createGBMDevice(drmDevice* dev); gbm_device* createGBMDevice(drmDevice* dev);
private: private:
void startEventLoop();
struct {
std::condition_variable loopSignal;
std::mutex loopMutex;
std::atomic<bool> shouldProcess = false;
std::mutex loopRequestMutex;
} m_sEventLoopInternals;
struct {
std::condition_variable loopSignal;
std::mutex loopMutex;
bool shouldProcess = false;
std::vector<std::unique_ptr<CTimer>> timers;
std::unique_ptr<std::thread> thread;
} m_sTimersThread;
std::unique_ptr<sdbus::IConnection> m_pConnection; std::unique_ptr<sdbus::IConnection> m_pConnection;
std::vector<std::unique_ptr<SOutput>> m_vOutputs; std::vector<std::unique_ptr<SOutput>> m_vOutputs;

View file

@ -9,3 +9,11 @@ CTimer::CTimer(float ms, std::function<void()> callback) {
bool CTimer::passed() const { bool CTimer::passed() const {
return std::chrono::high_resolution_clock::now() > (m_tStart + std::chrono::milliseconds((uint64_t)m_fDuration)); return std::chrono::high_resolution_clock::now() > (m_tStart + std::chrono::milliseconds((uint64_t)m_fDuration));
} }
float CTimer::passedMs() const {
return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - m_tStart).count();
}
float CTimer::duration() const {
return m_fDuration;
}

View file

@ -8,6 +8,8 @@ class CTimer {
CTimer(float ms, std::function<void()> callback); CTimer(float ms, std::function<void()> callback);
bool passed() const; bool passed() const;
float passedMs() const;
float duration() const;
std::function<void()> m_fnCallback; std::function<void()> m_fnCallback;

View file

@ -654,8 +654,7 @@ void CScreencopyPortal::queueNextShareFrame(CScreencopyPortal::SSession* pSessio
if (PSTREAM && !PSTREAM->streamState) if (PSTREAM && !PSTREAM->streamState)
return; return;
g_pPortalManager->m_vTimers.emplace_back( g_pPortalManager->addTimer({1000.0 / pSession->sharingData.framerate, [pSession]() { g_pPortalManager->m_sPortals.screencopy->startFrameCopy(pSession); }});
std::make_unique<CTimer>(1000.0 / pSession->sharingData.framerate, [pSession]() { g_pPortalManager->m_sPortals.screencopy->startFrameCopy(pSession); }));
} }
bool CScreencopyPortal::hasToplevelCapabilities() { bool CScreencopyPortal::hasToplevelCapabilities() {
return m_sState.toplevel; return m_sState.toplevel;