remove polling from socket2, fully event based

This commit is contained in:
vaxerski 2022-10-03 20:47:15 +01:00
parent 85eea70be4
commit 49ab3890aa
2 changed files with 42 additions and 44 deletions

View file

@ -36,19 +36,12 @@ void CEventManager::startThread() {
// 10 max queued. // 10 max queued.
listen(SOCKET, 10); listen(SOCKET, 10);
char readBuf[1024] = {0};
sockaddr_in clientAddress; sockaddr_in clientAddress;
socklen_t clientSize = sizeof(clientAddress); socklen_t clientSize = sizeof(clientAddress);
Debug::log(LOG, "Hypr socket 2 started at %s", socketPath.c_str()); Debug::log(LOG, "Hypr socket 2 started at %s", socketPath.c_str());
// set the socket nonblock
int flags = fcntl(SOCKET, F_GETFL, 0);
fcntl(SOCKET, F_SETFL, flags | O_NONBLOCK);
while (1) { while (1) {
const auto ACCEPTEDCONNECTION = accept(SOCKET, (sockaddr*)&clientAddress, &clientSize); const auto ACCEPTEDCONNECTION = accept(SOCKET, (sockaddr*)&clientAddress, &clientSize);
if (ACCEPTEDCONNECTION > 0) { if (ACCEPTEDCONNECTION > 0) {
@ -61,43 +54,7 @@ void CEventManager::startThread() {
Debug::log(LOG, "Socket 2 accepted a new client at FD %d", ACCEPTEDCONNECTION); Debug::log(LOG, "Socket 2 accepted a new client at FD %d", ACCEPTEDCONNECTION);
} }
// pong if all FDs valid ensureFDsValid();
for (auto it = m_dAcceptedSocketFDs.begin(); it != m_dAcceptedSocketFDs.end();) {
auto sizeRead = recv(*it, &readBuf, 1024, 0);
if (sizeRead != 0) {
it++;
continue;
}
// invalid!
Debug::log(LOG, "Removed invalid socket (2) FD: %d", *it);
it = m_dAcceptedSocketFDs.erase(it);
}
// valid FDs, check the queue
// don't do anything if main thread is writing to the eventqueue
eventQueueMutex.lock();
if (m_dQueuedEvents.empty()){ // if queue empty, sleep and ignore
eventQueueMutex.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(1));
continue;
}
// write all queued events
for (auto& ev : m_dQueuedEvents) {
std::string eventString = (ev.event + ">>" + ev.data).substr(0, 1022) + "\n";
for (auto& fd : m_dAcceptedSocketFDs) {
write(fd, eventString.c_str(), eventString.length());
}
}
m_dQueuedEvents.clear();
eventQueueMutex.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(1));
} }
close(SOCKET); close(SOCKET);
@ -106,6 +63,42 @@ void CEventManager::startThread() {
m_tThread.detach(); m_tThread.detach();
} }
void CEventManager::ensureFDsValid() {
static char readBuf[1024] = {0};
// pong if all FDs valid
for (auto it = m_dAcceptedSocketFDs.begin(); it != m_dAcceptedSocketFDs.end();) {
auto sizeRead = recv(*it, &readBuf, 1024, 0);
if (sizeRead != 0) {
it++;
continue;
}
// invalid!
Debug::log(LOG, "Removed invalid socket (2) FD: %d", *it);
it = m_dAcceptedSocketFDs.erase(it);
}
}
void CEventManager::flushEvents() {
ensureFDsValid();
eventQueueMutex.lock();
for (auto& ev : m_dQueuedEvents) {
std::string eventString = (ev.event + ">>" + ev.data).substr(0, 1022) + "\n";
for (auto& fd : m_dAcceptedSocketFDs) {
write(fd, eventString.c_str(), eventString.length());
}
}
m_dQueuedEvents.clear();
eventQueueMutex.unlock();
}
void CEventManager::postEvent(const SHyprIPCEvent event, bool force) { void CEventManager::postEvent(const SHyprIPCEvent event, bool force) {
if ((m_bIgnoreEvents && !force) || g_pCompositor->m_bIsShuttingDown) { if ((m_bIgnoreEvents && !force) || g_pCompositor->m_bIsShuttingDown) {
@ -117,5 +110,7 @@ void CEventManager::postEvent(const SHyprIPCEvent event, bool force) {
eventQueueMutex.lock(); eventQueueMutex.lock();
m_dQueuedEvents.push_back(ev); m_dQueuedEvents.push_back(ev);
eventQueueMutex.unlock(); eventQueueMutex.unlock();
flushEvents();
}, event).detach(); }, event).detach();
} }

View file

@ -25,6 +25,9 @@ public:
private: private:
void flushEvents();
void ensureFDsValid();
std::mutex eventQueueMutex; std::mutex eventQueueMutex;
std::deque<SHyprIPCEvent> m_dQueuedEvents; std::deque<SHyprIPCEvent> m_dQueuedEvents;