mirror of
https://github.com/hyprwm/Hyprland
synced 2024-11-26 13:45:58 +01:00
socket2: fix events being reordered (#5955)
* socket2: fix events being reordered * remove WL_EVENT_READABLE * initialize eventSource in SClient * add more logs oopsie * replace unordered_map with vector * fix reordering when socket becomes writable before queue is flushed * ignore EAGAIN when accepting connection * use g_pEventManager
This commit is contained in:
parent
c19903eaf8
commit
37a84c5223
3 changed files with 142 additions and 117 deletions
|
@ -338,9 +338,6 @@ void CCompositor::cleanup() {
|
||||||
m_pLastFocus = nullptr;
|
m_pLastFocus = nullptr;
|
||||||
m_pLastWindow.reset();
|
m_pLastWindow.reset();
|
||||||
|
|
||||||
// end threads
|
|
||||||
g_pEventManager->m_tThread = std::thread();
|
|
||||||
|
|
||||||
m_vWorkspaces.clear();
|
m_vWorkspaces.clear();
|
||||||
m_vWindows.clear();
|
m_vWindows.clear();
|
||||||
|
|
||||||
|
@ -463,7 +460,6 @@ void CCompositor::initManagers(eManagersInitStage stage) {
|
||||||
|
|
||||||
Debug::log(LOG, "Creating the EventManager!");
|
Debug::log(LOG, "Creating the EventManager!");
|
||||||
g_pEventManager = std::make_unique<CEventManager>();
|
g_pEventManager = std::make_unique<CEventManager>();
|
||||||
g_pEventManager->startThread();
|
|
||||||
|
|
||||||
Debug::log(LOG, "Creating the HyprDebugOverlay!");
|
Debug::log(LOG, "Creating the HyprDebugOverlay!");
|
||||||
g_pDebugOverlay = std::make_unique<CHyprDebugOverlay>();
|
g_pDebugOverlay = std::make_unique<CHyprDebugOverlay>();
|
||||||
|
|
|
@ -1,154 +1,178 @@
|
||||||
#include "EventManager.hpp"
|
#include "EventManager.hpp"
|
||||||
#include "../Compositor.hpp"
|
#include "../Compositor.hpp"
|
||||||
|
|
||||||
#include <errno.h>
|
#include <algorithm>
|
||||||
#include <fcntl.h>
|
|
||||||
#include <netinet/in.h>
|
#include <netinet/in.h>
|
||||||
#include <stdio.h>
|
|
||||||
#include <stdlib.h>
|
|
||||||
#include <string.h>
|
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
#include <sys/un.h>
|
#include <sys/un.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <sys/ioctl.h>
|
|
||||||
|
|
||||||
#include <string>
|
|
||||||
#include <algorithm>
|
|
||||||
|
|
||||||
CEventManager::CEventManager() {}
|
|
||||||
|
|
||||||
int fdHandleWrite(int fd, uint32_t mask, void* data) {
|
|
||||||
const auto PEVMGR = (CEventManager*)data;
|
|
||||||
return PEVMGR->onFDWrite(fd, mask);
|
|
||||||
}
|
|
||||||
|
|
||||||
int socket2HandleWrite(int fd, uint32_t mask, void* data) {
|
|
||||||
const auto PEVMGR = (CEventManager*)data;
|
|
||||||
return PEVMGR->onSocket2Write(fd, mask);
|
|
||||||
}
|
|
||||||
|
|
||||||
void CEventManager::startThread() {
|
|
||||||
|
|
||||||
m_iSocketFD = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
|
|
||||||
|
|
||||||
|
CEventManager::CEventManager() {
|
||||||
|
m_iSocketFD = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0);
|
||||||
if (m_iSocketFD < 0) {
|
if (m_iSocketFD < 0) {
|
||||||
Debug::log(ERR, "Couldn't start the Hyprland Socket 2. (1) IPC will not work.");
|
Debug::log(ERR, "Couldn't start the Hyprland Socket 2. (1) IPC will not work.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
sockaddr_un SERVERADDRESS = {.sun_family = AF_UNIX};
|
sockaddr_un SERVERADDRESS = {.sun_family = AF_UNIX};
|
||||||
std::string socketPath = g_pCompositor->m_szInstancePath + "/.socket2.sock";
|
const auto PATH = g_pCompositor->m_szInstancePath + "/.socket2.sock";
|
||||||
strncpy(SERVERADDRESS.sun_path, socketPath.c_str(), sizeof(SERVERADDRESS.sun_path) - 1);
|
if (PATH.length() > sizeof(SERVERADDRESS.sun_path) - 1) {
|
||||||
|
Debug::log(ERR, "Socket2 path is too long. (2) IPC will not work.");
|
||||||
bind(m_iSocketFD, (sockaddr*)&SERVERADDRESS, SUN_LEN(&SERVERADDRESS));
|
return;
|
||||||
|
|
||||||
// 10 max queued.
|
|
||||||
listen(m_iSocketFD, 10);
|
|
||||||
|
|
||||||
m_pEventSource = wl_event_loop_add_fd(g_pCompositor->m_sWLEventLoop, m_iSocketFD, WL_EVENT_READABLE, socket2HandleWrite, this);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int CEventManager::onSocket2Write(int fd, uint32_t mask) {
|
strncpy(SERVERADDRESS.sun_path, PATH.c_str(), sizeof(SERVERADDRESS.sun_path) - 1);
|
||||||
|
|
||||||
|
if (bind(m_iSocketFD, (sockaddr*)&SERVERADDRESS, SUN_LEN(&SERVERADDRESS)) < 0) {
|
||||||
|
Debug::log(ERR, "Couldn't bind the Hyprland Socket 2. (3) IPC will not work.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 10 max queued.
|
||||||
|
if (listen(m_iSocketFD, 10) < 0) {
|
||||||
|
Debug::log(ERR, "Couldn't listen on the Hyprland Socket 2. (4) IPC will not work.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
m_pEventSource = wl_event_loop_add_fd(g_pCompositor->m_sWLEventLoop, m_iSocketFD, WL_EVENT_READABLE, onClientEvent, nullptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
CEventManager::~CEventManager() {
|
||||||
|
for (const auto& client : m_vClients) {
|
||||||
|
wl_event_source_remove(client.eventSource);
|
||||||
|
close(client.fd);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (m_pEventSource != nullptr)
|
||||||
|
wl_event_source_remove(m_pEventSource);
|
||||||
|
|
||||||
|
if (m_iSocketFD >= 0)
|
||||||
|
close(m_iSocketFD);
|
||||||
|
}
|
||||||
|
|
||||||
|
int CEventManager::onServerEvent(int fd, uint32_t mask, void* data) {
|
||||||
|
return g_pEventManager->onClientEvent(fd, mask);
|
||||||
|
}
|
||||||
|
|
||||||
|
int CEventManager::onClientEvent(int fd, uint32_t mask, void* data) {
|
||||||
|
return g_pEventManager->onServerEvent(fd, mask);
|
||||||
|
}
|
||||||
|
|
||||||
|
int CEventManager::onServerEvent(int fd, uint32_t mask) {
|
||||||
if (mask & WL_EVENT_ERROR || mask & WL_EVENT_HANGUP) {
|
if (mask & WL_EVENT_ERROR || mask & WL_EVENT_HANGUP) {
|
||||||
Debug::log(ERR, "Socket2 hangup?? IPC broke");
|
Debug::log(ERR, "Socket2 hangup?? IPC broke");
|
||||||
|
|
||||||
wl_event_source_remove(m_pEventSource);
|
wl_event_source_remove(m_pEventSource);
|
||||||
|
m_pEventSource = nullptr;
|
||||||
close(fd);
|
close(fd);
|
||||||
|
m_iSocketFD = -1;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
sockaddr_in clientAddress;
|
sockaddr_in clientAddress;
|
||||||
socklen_t clientSize = sizeof(clientAddress);
|
socklen_t clientSize = sizeof(clientAddress);
|
||||||
const auto ACCEPTEDCONNECTION = accept4(m_iSocketFD, (sockaddr*)&clientAddress, &clientSize, SOCK_CLOEXEC | SOCK_NONBLOCK);
|
const auto ACCEPTEDCONNECTION = accept4(m_iSocketFD, (sockaddr*)&clientAddress, &clientSize, SOCK_CLOEXEC | SOCK_NONBLOCK);
|
||||||
|
if (ACCEPTEDCONNECTION < 0) {
|
||||||
|
if (errno != EAGAIN) {
|
||||||
|
Debug::log(ERR, "Socket2 failed receiving connection, errno: {}", errno);
|
||||||
|
wl_event_source_remove(m_pEventSource);
|
||||||
|
m_pEventSource = nullptr;
|
||||||
|
close(fd);
|
||||||
|
m_iSocketFD = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
if (ACCEPTEDCONNECTION > 0) {
|
|
||||||
Debug::log(LOG, "Socket2 accepted a new client at FD {}", ACCEPTEDCONNECTION);
|
Debug::log(LOG, "Socket2 accepted a new client at FD {}", ACCEPTEDCONNECTION);
|
||||||
|
|
||||||
// add to event loop so we can close it when we need to
|
// add to event loop so we can close it when we need to
|
||||||
m_dAcceptedSocketFDs.push_back(
|
auto* eventSource = wl_event_loop_add_fd(g_pCompositor->m_sWLEventLoop, ACCEPTEDCONNECTION, 0, onServerEvent, nullptr);
|
||||||
std::make_pair<>(ACCEPTEDCONNECTION, wl_event_loop_add_fd(g_pCompositor->m_sWLEventLoop, ACCEPTEDCONNECTION, WL_EVENT_READABLE, fdHandleWrite, this)));
|
m_vClients.emplace_back(SClient{
|
||||||
} else {
|
ACCEPTEDCONNECTION,
|
||||||
Debug::log(ERR, "Socket2 failed receiving connection, errno: {}", errno);
|
{},
|
||||||
close(fd);
|
eventSource,
|
||||||
}
|
});
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int CEventManager::onFDWrite(int fd, uint32_t mask) {
|
int CEventManager::onClientEvent(int fd, uint32_t mask) {
|
||||||
auto removeFD = [this](int fd) -> void {
|
|
||||||
for (auto it = m_dAcceptedSocketFDs.begin(); it != m_dAcceptedSocketFDs.end();) {
|
|
||||||
if (it->first == fd) {
|
|
||||||
wl_event_source_remove(it->second); // remove this fd listener
|
|
||||||
it = m_dAcceptedSocketFDs.erase(it);
|
|
||||||
} else {
|
|
||||||
it++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
close(fd);
|
|
||||||
};
|
|
||||||
|
|
||||||
if (mask & WL_EVENT_ERROR || mask & WL_EVENT_HANGUP) {
|
if (mask & WL_EVENT_ERROR || mask & WL_EVENT_HANGUP) {
|
||||||
// remove, hanged up
|
Debug::log(LOG, "Socket2 fd {} hung up", fd);
|
||||||
removeFD(fd);
|
removeClientByFD(fd);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int availableBytes;
|
if (mask & WL_EVENT_WRITABLE) {
|
||||||
if (ioctl(fd, FIONREAD, &availableBytes) == -1) {
|
const auto CLIENTIT = findClientByFD(fd);
|
||||||
Debug::log(ERR, "fd {} sent invalid data (1)", fd);
|
|
||||||
removeFD(fd);
|
// send all queued events
|
||||||
return 0;
|
while (!CLIENTIT->events.empty()) {
|
||||||
|
const auto& event = CLIENTIT->events.front();
|
||||||
|
if (write(CLIENTIT->fd, event->c_str(), event->length()) < 0)
|
||||||
|
break;
|
||||||
|
|
||||||
|
CLIENTIT->events.pop_front();
|
||||||
}
|
}
|
||||||
|
|
||||||
char buf[availableBytes];
|
// stop polling when we sent all events
|
||||||
const auto RECEIVED = recv(fd, buf, availableBytes, 0);
|
if (CLIENTIT->events.empty())
|
||||||
if (RECEIVED == -1) {
|
wl_event_source_fd_update(CLIENTIT->eventSource, 0);
|
||||||
Debug::log(ERR, "fd {} sent invalid data (2)", fd);
|
|
||||||
removeFD(fd);
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void CEventManager::flushEvents() {
|
std::vector<CEventManager::SClient>::iterator CEventManager::findClientByFD(int fd) {
|
||||||
eventQueueMutex.lock();
|
return std::find_if(m_vClients.begin(), m_vClients.end(), [fd](const auto& client) { return client.fd == fd; });
|
||||||
|
|
||||||
for (auto& ev : m_dQueuedEvents) {
|
|
||||||
std::string eventString = (ev.event + ">>" + ev.data).substr(0, 1022) + "\n";
|
|
||||||
for (auto& fd : m_dAcceptedSocketFDs) {
|
|
||||||
try {
|
|
||||||
write(fd.first, eventString.c_str(), eventString.length());
|
|
||||||
} catch (...) {}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
m_dQueuedEvents.clear();
|
std::vector<CEventManager::SClient>::iterator CEventManager::removeClientByFD(int fd) {
|
||||||
|
const auto CLIENTIT = findClientByFD(fd);
|
||||||
|
wl_event_source_remove(CLIENTIT->eventSource);
|
||||||
|
close(fd);
|
||||||
|
|
||||||
eventQueueMutex.unlock();
|
return m_vClients.erase(CLIENTIT);
|
||||||
}
|
}
|
||||||
|
|
||||||
void CEventManager::postEvent(const SHyprIPCEvent event) {
|
std::string CEventManager::formatEvent(const SHyprIPCEvent& event) const {
|
||||||
|
std::string_view data = event.data;
|
||||||
|
auto eventString = std::format("{}>>{}\n", event.event, data.substr(0, 1024));
|
||||||
|
std::replace(eventString.begin() + event.event.length() + 2, eventString.end() - 1, '\n', ' ');
|
||||||
|
return eventString;
|
||||||
|
}
|
||||||
|
|
||||||
|
void CEventManager::postEvent(const SHyprIPCEvent& event) {
|
||||||
if (g_pCompositor->m_bIsShuttingDown) {
|
if (g_pCompositor->m_bIsShuttingDown) {
|
||||||
Debug::log(WARN, "Suppressed (ignoreevents true / shutting down) event of type {}, content: {}", event.event, event.data);
|
Debug::log(WARN, "Suppressed (shutting down) event of type {}, content: {}", event.event, event.data);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::thread(
|
const size_t MAX_QUEUED_EVENTS = 64;
|
||||||
[this](SHyprIPCEvent ev) {
|
auto sharedEvent = makeShared<std::string>(formatEvent(event));
|
||||||
std::replace(ev.data.begin(), ev.data.end(), '\n', ' ');
|
for (auto it = m_vClients.begin(); it != m_vClients.end();) {
|
||||||
|
// try to send the event immediately if the queue is empty
|
||||||
eventQueueMutex.lock();
|
const auto QUEUESIZE = it->events.size();
|
||||||
m_dQueuedEvents.push_back(ev);
|
if (QUEUESIZE > 0 || write(it->fd, sharedEvent->c_str(), sharedEvent->length()) < 0) {
|
||||||
eventQueueMutex.unlock();
|
if (QUEUESIZE >= MAX_QUEUED_EVENTS) {
|
||||||
|
// too many events queued, remove the client
|
||||||
flushEvents();
|
Debug::log(ERR, "Socket2 fd {} overflowed event queue, removing", it->fd);
|
||||||
},
|
it = removeClientByFD(it->fd);
|
||||||
event)
|
continue;
|
||||||
.detach();
|
}
|
||||||
|
|
||||||
|
// queue it to send later if failed
|
||||||
|
it->events.push_back(sharedEvent);
|
||||||
|
|
||||||
|
// poll for write if queue was empty
|
||||||
|
if (QUEUESIZE == 0)
|
||||||
|
wl_event_source_fd_update(it->eventSource, WL_EVENT_WRITABLE);
|
||||||
|
}
|
||||||
|
|
||||||
|
++it;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,10 +1,9 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
#include <deque>
|
#include <deque>
|
||||||
#include <fstream>
|
#include <vector>
|
||||||
#include <mutex>
|
|
||||||
|
|
||||||
#include "../defines.hpp"
|
#include "../defines.hpp"
|
||||||
#include "../helpers/MiscFunctions.hpp"
|
#include "../helpers/memory/SharedPtr.hpp"
|
||||||
|
|
||||||
struct SHyprIPCEvent {
|
struct SHyprIPCEvent {
|
||||||
std::string event;
|
std::string event;
|
||||||
|
@ -14,27 +13,33 @@ struct SHyprIPCEvent {
|
||||||
class CEventManager {
|
class CEventManager {
|
||||||
public:
|
public:
|
||||||
CEventManager();
|
CEventManager();
|
||||||
|
~CEventManager();
|
||||||
|
|
||||||
void postEvent(const SHyprIPCEvent event);
|
void postEvent(const SHyprIPCEvent& event);
|
||||||
|
|
||||||
void startThread();
|
|
||||||
|
|
||||||
std::thread m_tThread;
|
|
||||||
|
|
||||||
int m_iSocketFD = -1;
|
|
||||||
|
|
||||||
int onSocket2Write(int fd, uint32_t mask);
|
|
||||||
int onFDWrite(int fd, uint32_t mask);
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void flushEvents();
|
std::string formatEvent(const SHyprIPCEvent& event) const;
|
||||||
|
|
||||||
std::mutex eventQueueMutex;
|
static int onServerEvent(int fd, uint32_t mask, void* data);
|
||||||
std::deque<SHyprIPCEvent> m_dQueuedEvents;
|
static int onClientEvent(int fd, uint32_t mask, void* data);
|
||||||
|
|
||||||
std::deque<std::pair<int, wl_event_source*>> m_dAcceptedSocketFDs;
|
int onServerEvent(int fd, uint32_t mask);
|
||||||
|
int onClientEvent(int fd, uint32_t mask);
|
||||||
|
|
||||||
|
struct SClient {
|
||||||
|
int fd = -1;
|
||||||
|
std::deque<SP<std::string>> events;
|
||||||
|
wl_event_source* eventSource = nullptr;
|
||||||
|
};
|
||||||
|
|
||||||
|
std::vector<SClient>::iterator findClientByFD(int fd);
|
||||||
|
std::vector<SClient>::iterator removeClientByFD(int fd);
|
||||||
|
|
||||||
|
private:
|
||||||
|
int m_iSocketFD = -1;
|
||||||
wl_event_source* m_pEventSource = nullptr;
|
wl_event_source* m_pEventSource = nullptr;
|
||||||
|
|
||||||
|
std::vector<SClient> m_vClients;
|
||||||
};
|
};
|
||||||
|
|
||||||
inline std::unique_ptr<CEventManager> g_pEventManager;
|
inline std::unique_ptr<CEventManager> g_pEventManager;
|
||||||
|
|
Loading…
Reference in a new issue