diff --git a/binaries/data/config/default.cfg b/binaries/data/config/default.cfg index 2afc50b027..0dd317d68c 100644 --- a/binaries/data/config/default.cfg +++ b/binaries/data/config/default.cfg @@ -474,7 +474,6 @@ name_id = "0ad" duplicateplayernames = false ; Rename joining player to "User (2)" if "User" is already connected, otherwise prohibit join. lateobservers = everyone ; Allow observers to join the game after it started. Possible values: everyone, buddies, disabled. observerlimit = 8 ; Prevent further observer joins in running games if this limit is reached -gamestarttimeout = 60000 ; Don't disconnect clients timing out in the loading screen and rejoin process before exceeding this timeout. [overlay] fps = "false" ; Show frames per second in top right corner diff --git a/build/premake/premake5.lua b/build/premake/premake5.lua index 31fbfb9b91..43c6834181 100644 --- a/build/premake/premake5.lua +++ b/build/premake/premake5.lua @@ -592,7 +592,7 @@ function setup_all_libs () extern_libs = { "spidermonkey", "enet", - "boost", -- dragged in via server->simulation.h->random + "boost", -- dragged in via server->simulation.h->random and NetSession.h->lockfree "fmt", } if not _OPTIONS["without-miniupnpc"] then diff --git a/source/main.cpp b/source/main.cpp index 7e03cd04b6..91e28d03bc 100644 --- a/source/main.cpp +++ b/source/main.cpp @@ -421,10 +421,6 @@ static void Frame() g_Game->GetView()->Update(float(realTimeSinceLastFrame)); } - // Immediately flush any messages produced by simulation code - if (g_NetClient) - g_NetClient->Flush(); - // Keep us connected to any XMPP servers if (g_XmppClient) g_XmppClient->recv(); diff --git a/source/network/NetClient.cpp b/source/network/NetClient.cpp index 24467b3629..43b0b85b3a 100644 --- a/source/network/NetClient.cpp +++ b/source/network/NetClient.cpp @@ -33,6 +33,7 @@ #include "ps/CStr.h" #include "ps/Game.h" #include "ps/Loader.h" +#include "ps/Profile.h" #include "scriptinterface/ScriptInterface.h" #include "simulation2/Simulation2.h" @@ -143,6 +144,10 @@ CNetClient::CNetClient(CGame* game, bool isLocalClient) : CNetClient::~CNetClient() { + // Try to flush messages before dying (probably fails). + if (m_ClientTurnManager) + m_ClientTurnManager->OnDestroyConnection(); + DestroyConnection(); JS_RemoveExtraGCRootsTracer(GetScriptInterface().GetGeneralJSContext(), CNetClient::Trace, this); } @@ -170,6 +175,7 @@ bool CNetClient::SetupConnection(const CStr& server, const u16 port, ENetHost* e CNetClientSession* session = new CNetClientSession(*this); bool ok = session->Connect(server, port, m_IsLocalClient, enetClient); SetAndOwnSession(session); + m_PollingThread = std::thread(CNetClientSession::RunNetLoop, m_Session); return ok; } @@ -181,13 +187,17 @@ void CNetClient::SetAndOwnSession(CNetClientSession* session) void CNetClient::DestroyConnection() { - // Attempt to send network messages from the current frame before connection is destroyed. - if (m_ClientTurnManager) - { - m_ClientTurnManager->OnDestroyConnection(); - Flush(); - } - SAFE_DELETE(m_Session); + if (m_Session) + m_Session->Shutdown(); + + if (m_PollingThread.joinable()) + // Use detach() over join() because we don't want to wait for the session + // (which may be polling or trying to send messages). + m_PollingThread.detach(); + + // The polling thread will cleanup the session on its own, + // mark it as nullptr here so we know we're done using it. + m_Session = nullptr; } void CNetClient::Poll() @@ -195,8 +205,10 @@ void CNetClient::Poll() if (!m_Session) return; + PROFILE3("NetClient::poll"); + CheckServerConnection(); - m_Session->Poll(); + m_Session->ProcessPolledMessages(); } void CNetClient::CheckServerConnection() @@ -231,12 +243,6 @@ void CNetClient::CheckServerConnection() } } -void CNetClient::Flush() -{ - if (m_Session) - m_Session->Flush(); -} - void CNetClient::GuiPoll(JS::MutableHandleValue ret) { if (m_GuiMessageQueue.empty()) @@ -316,7 +322,7 @@ void CNetClient::HandleDisconnect(u32 reason) "status", "disconnected", "reason", reason); - SAFE_DELETE(m_Session); + DestroyConnection(); // Update the state immediately to UNCONNECTED (don't bother with FSM transitions since // we'd need one for every single state, and we don't need to use per-state actions) @@ -643,8 +649,6 @@ bool CNetClient::OnGameStart(void* context, CFsmEvent* event) CNetClient* client = static_cast(context); - client->m_Session->SetLongTimeout(true); - // Find the player assigned to our GUID int player = -1; if (client->m_PlayerAssignments.find(client->m_GUID) != client->m_PlayerAssignments.end()) @@ -770,21 +774,10 @@ bool CNetClient::OnClientsLoading(void *context, CFsmEvent *event) CNetClient* client = static_cast(context); CClientsLoadingMessage* message = static_cast(event->GetParamRef()); - bool finished = true; std::vector guids; guids.reserve(message->m_Clients.size()); for (const CClientsLoadingMessage::S_m_Clients& mClient : message->m_Clients) - { - if (client->m_GUID == mClient.m_GUID) - finished = false; - guids.push_back(mClient.m_GUID); - } - - // Disable the timeout here after processing the enet message, so as to ensure that the connection isn't currently - // timing out (as it is when just leaving the loading screen in LoadFinished). - if (finished) - client->m_Session->SetLongTimeout(false); client->PushGuiMessage( "type", "clients-loading", @@ -825,9 +818,6 @@ bool CNetClient::OnLoadedGame(void* context, CFsmEvent* event) if (client->m_Rejoin) client->SendRejoinedMessage(); - // The last client to leave the loading screen didn't receive the CClientsLoadingMessage, so disable here. - client->m_Session->SetLongTimeout(false); - return true; } diff --git a/source/network/NetClient.h b/source/network/NetClient.h index 7258a32175..53f9af42fb 100644 --- a/source/network/NetClient.h +++ b/source/network/NetClient.h @@ -25,8 +25,10 @@ #include "ps/CStr.h" +#include #include #include +#include class CGame; class CNetClientSession; @@ -128,12 +130,6 @@ public: */ void CheckServerConnection(); - /** - * Flush any queued outgoing network messages. - * This should be called soon after sending a group of messages that may be batched together. - */ - void Flush(); - /** * Retrieves the next queued GUI message, and removes it from the queue. * The returned value is in the GetScriptInterface() JS context. @@ -232,6 +228,10 @@ public: */ void SendPausedMessage(bool pause); + /** + * @return Whether the NetClient is shutting down. + */ + bool ShouldShutdown() const; private: void SendAuthenticateMessage(); @@ -275,6 +275,8 @@ private: /// Current network session (or NULL if not connected) CNetClientSession* m_Session; + std::thread m_PollingThread; + /// Turn manager associated with the current game (or NULL if we haven't started the game yet) CNetClientTurnManager* m_ClientTurnManager; diff --git a/source/network/NetServer.cpp b/source/network/NetServer.cpp index 6d3c4af5f1..f5515a0012 100644 --- a/source/network/NetServer.cpp +++ b/source/network/NetServer.cpp @@ -56,7 +56,7 @@ #define DEFAULT_SERVER_NAME L"Unnamed Server" -static const int CHANNEL_COUNT = 1; +constexpr int CHANNEL_COUNT = 1; /** * enet_host_service timeout (msecs). @@ -1112,8 +1112,6 @@ bool CNetServerWorker::OnAuthenticate(void* context, CFsmEvent* event) // the most efficient client to request a copy from CNetServerSession* sourceSession = server.m_Sessions.at(0); - session->SetLongTimeout(true); - sourceSession->GetFileTransferer().StartTask( shared_ptr(new CNetFileReceiveTask_ServerRejoin(server, newHostID)) ); @@ -1291,8 +1289,6 @@ bool CNetServerWorker::OnLoadedGame(void* context, CFsmEvent* event) CNetServerSession* loadedSession = (CNetServerSession*)context; CNetServerWorker& server = loadedSession->GetServer(); - loadedSession->SetLongTimeout(false); - // We're in the loading state, so wait until every client has loaded // before starting the game ENSURE(server.m_State == SERVER_STATE_LOADING); @@ -1390,8 +1386,6 @@ bool CNetServerWorker::OnRejoined(void* context, CFsmEvent* event) session->SendMessage(&pausedMessage); } - session->SetLongTimeout(false); - return true; } @@ -1488,10 +1482,7 @@ void CNetServerWorker::StartGame() m_ServerTurnManager = new CNetServerTurnManager(*this); for (CNetServerSession* session : m_Sessions) - { m_ServerTurnManager->InitialiseClient(session->GetHostID(), 0); // TODO: only for non-observers - session->SetLongTimeout(true); - } m_State = SERVER_STATE_LOADING; diff --git a/source/network/NetSession.cpp b/source/network/NetSession.cpp index 457714b125..7b5523830c 100644 --- a/source/network/NetSession.cpp +++ b/source/network/NetSession.cpp @@ -16,49 +16,32 @@ */ #include "precompiled.h" + #include "NetSession.h" + #include "NetClient.h" -#include "NetServer.h" #include "NetMessage.h" +#include "NetServer.h" #include "NetStats.h" -#include "lib/external_libraries/enet.h" #include "ps/CLogger.h" -#include "ps/ConfigDB.h" #include "ps/Profile.h" #include "scriptinterface/ScriptInterface.h" -const u32 NETWORK_WARNING_TIMEOUT = 2000; +constexpr int NETCLIENT_POLL_TIMEOUT = 50; -const u32 MAXIMUM_HOST_TIMEOUT = std::numeric_limits::max(); - -static const int CHANNEL_COUNT = 1; - -// Only disable long timeouts after a packet from the remote enet peer has been processed. -// Otherwise a long timeout can still be in progress when disabling it here. -void SetEnetLongTimeout(ENetPeer* peer, bool isLocalClient, bool enabled) -{ -#if (ENET_VERSION >= ENET_VERSION_CREATE(1, 3, 4)) - if (!peer || isLocalClient) - return; - - if (enabled) - { - u32 timeout; - CFG_GET_VAL("network.gamestarttimeout", timeout); - enet_peer_timeout(peer, 0, timeout, timeout); - } - else - enet_peer_timeout(peer, 0, 0, 0); -#endif -} +constexpr int CHANNEL_COUNT = 1; CNetClientSession::CNetClientSession(CNetClient& client) : - m_Client(client), m_FileTransferer(this), m_Host(nullptr), m_Server(nullptr), m_Stats(nullptr), m_IsLocalClient(false) + m_Client(client), m_FileTransferer(this), m_Host(nullptr), m_Server(nullptr), + m_Stats(nullptr), m_IsLocalClient(false), m_IncomingMessages(16), m_OutgoingMessages(16), + m_LoopRunning(false), m_ShouldShutdown(false), m_MeanRTT(0), m_LastReceivedTime(0) { } CNetClientSession::~CNetClientSession() { + ENSURE(!m_LoopRunning); + delete m_Stats; if (m_Host && m_Server) @@ -74,6 +57,7 @@ CNetClientSession::~CNetClientSession() bool CNetClientSession::Connect(const CStr& server, const u16 port, const bool isLocalClient, ENetHost* enetClient) { + ENSURE(!m_LoopRunning); ENSURE(!m_Host); ENSURE(!m_Server); @@ -102,12 +86,6 @@ bool CNetClientSession::Connect(const CStr& server, const u16 port, const bool i m_Server = peer; m_IsLocalClient = isLocalClient; - // Prevent the local client of the host from timing out too quickly. -#if (ENET_VERSION >= ENET_VERSION_CREATE(1, 3, 4)) - if (isLocalClient) - enet_peer_timeout(peer, 1, MAXIMUM_HOST_TIMEOUT, MAXIMUM_HOST_TIMEOUT); -#endif - m_Stats = new CNetStatsTable(m_Server); if (CProfileViewer::IsInitialised()) g_ProfileViewer.AddRootTable(m_Stats); @@ -115,60 +93,92 @@ bool CNetClientSession::Connect(const CStr& server, const u16 port, const bool i return true; } -void CNetClientSession::Disconnect(NetDisconnectReason reason) +void CNetClientSession::RunNetLoop(CNetClientSession* session) { - if (reason == NDR_UNKNOWN) - LOGWARNING("Disconnecting from the server without communicating the disconnect reason!"); + ENSURE(!session->m_LoopRunning); + session->m_LoopRunning = true; - ENSURE(m_Host && m_Server); + debug_SetThreadName("NetClientSession loop"); - // TODO: ought to do reliable async disconnects, probably - enet_peer_disconnect_now(m_Server, static_cast(reason)); - enet_host_destroy(m_Host); + while (!session->m_ShouldShutdown) + { + ENSURE(session->m_Host && session->m_Server); - m_Host = NULL; - m_Server = NULL; + session->m_FileTransferer.Poll(); + session->Poll(); + session->Flush(); - SAFE_DELETE(m_Stats); + session->m_LastReceivedTime = enet_time_get() - session->m_Server->lastReceiveTime; + session->m_MeanRTT = session->m_Server->roundTripTime; + } + + session->m_LoopRunning = false; + + // Deleting the session is handled in this thread as it might outlive the CNetClient. + SAFE_DELETE(session); +} + +void CNetClientSession::Shutdown() +{ + m_ShouldShutdown = true; } void CNetClientSession::Poll() { - PROFILE3("net client poll"); - - ENSURE(m_Host && m_Server); - - m_FileTransferer.Poll(); - ENetEvent event; - while (enet_host_service(m_Host, &event, 0) > 0) + + // Use the timeout to make the thread wait and save CPU time. + if (enet_host_service(m_Host, &event, NETCLIENT_POLL_TIMEOUT) <= 0) + return; + + if (event.type == ENET_EVENT_TYPE_CONNECT) { - switch (event.type) - { - case ENET_EVENT_TYPE_CONNECT: - { - ENSURE(event.peer == m_Server); + ENSURE(event.peer == m_Server); - // Report the server address - char hostname[256] = "(error)"; - enet_address_get_host_ip(&event.peer->address, hostname, ARRAY_SIZE(hostname)); - LOGMESSAGE("Net client: Connected to %s:%u", hostname, (unsigned int)event.peer->address.port); + // Report the server address immediately. + char hostname[256] = "(error)"; + enet_address_get_host_ip(&event.peer->address, hostname, ARRAY_SIZE(hostname)); + LOGMESSAGE("Net client: Connected to %s:%u", hostname, (unsigned int)event.peer->address.port); + m_IncomingMessages.push(event); + } + else if (event.type == ENET_EVENT_TYPE_DISCONNECT) + { + ENSURE(event.peer == m_Server); + + // Report immediately. + LOGMESSAGE("Net client: Disconnected"); + + m_IncomingMessages.push(event); + } + else if (event.type == ENET_EVENT_TYPE_RECEIVE) + m_IncomingMessages.push(event); +} + +void CNetClientSession::Flush() +{ + ENetPacket* packet; + while (m_OutgoingMessages.pop(packet)) + if (enet_peer_send(m_Server, CNetHost::DEFAULT_CHANNEL, packet) < 0) + LOGERROR("NetClient: Failed to send packet to server"); + + enet_host_flush(m_Host); +} + +void CNetClientSession::ProcessPolledMessages() +{ + ENetEvent event; + while(m_IncomingMessages.pop(event)) + { + if (event.type == ENET_EVENT_TYPE_CONNECT) m_Client.HandleConnect(); - + else if (event.type == ENET_EVENT_TYPE_DISCONNECT) + { + // This deletes the session, so we must break; + m_Client.HandleDisconnect(event.data); break; } - - case ENET_EVENT_TYPE_DISCONNECT: - { - ENSURE(event.peer == m_Server); - - LOGMESSAGE("Net client: Disconnected"); - m_Client.HandleDisconnect(event.data); - return; - } - - case ENET_EVENT_TYPE_RECEIVE: + else if (event.type == ENET_EVENT_TYPE_RECEIVE) { CNetMessage* msg = CNetMessageFactory::CreateMessage(event.packet->data, event.packet->dataLength, m_Client.GetScriptInterface()); if (msg) @@ -176,36 +186,29 @@ void CNetClientSession::Poll() LOGMESSAGE("Net client: Received message %s of size %lu from server", msg->ToString().c_str(), (unsigned long)msg->GetSerializedLength()); m_Client.HandleMessage(msg); - - delete msg; } - + // Thread-safe enet_packet_destroy(event.packet); - - break; - } - - case ENET_EVENT_TYPE_NONE: - break; } } - -} - -void CNetClientSession::Flush() -{ - PROFILE3("net client flush"); - - ENSURE(m_Host && m_Server); - - enet_host_flush(m_Host); } bool CNetClientSession::SendMessage(const CNetMessage* message) { ENSURE(m_Host && m_Server); - return CNetHost::SendMessage(message, m_Server, "server"); + // Thread-safe. + ENetPacket* packet = CNetHost::CreatePacket(message); + if (!packet) + return false; + + if (!m_OutgoingMessages.push(packet)) + { + LOGERROR("NetClient: Failed to push message on the outgoing queue."); + return false; + } + + return true; } u32 CNetClientSession::GetLastReceivedTime() const @@ -213,7 +216,7 @@ u32 CNetClientSession::GetLastReceivedTime() const if (!m_Server) return 0; - return enet_time_get() - m_Server->lastReceiveTime; + return m_LastReceivedTime; } u32 CNetClientSession::GetMeanRTT() const @@ -221,12 +224,7 @@ u32 CNetClientSession::GetMeanRTT() const if (!m_Server) return 0; - return m_Server->roundTripTime; -} - -void CNetClientSession::SetLongTimeout(bool enabled) -{ - SetEnetLongTimeout(m_Server, m_IsLocalClient, enabled); + return m_MeanRTT; } CNetServerSession::CNetServerSession(CNetServerWorker& server, ENetPeer* peer) : @@ -289,14 +287,4 @@ void CNetServerSession::SetLocalClient(bool isLocalClient) if (!isLocalClient) return; - - // Prevent the local client of the host from timing out too quickly -#if (ENET_VERSION >= ENET_VERSION_CREATE(1, 3, 4)) - enet_peer_timeout(m_Peer, 0, MAXIMUM_HOST_TIMEOUT, MAXIMUM_HOST_TIMEOUT); -#endif -} - -void CNetServerSession::SetLongTimeout(bool enabled) -{ - SetEnetLongTimeout(m_Peer, m_IsLocalClient, enabled); } diff --git a/source/network/NetSession.h b/source/network/NetSession.h index 4730bd2f43..e26effa9bd 100644 --- a/source/network/NetSession.h +++ b/source/network/NetSession.h @@ -18,20 +18,20 @@ #ifndef NETSESSION_H #define NETSESSION_H +#include "lib/external_libraries/enet.h" #include "network/fsm.h" #include "network/NetFileTransfer.h" #include "network/NetHost.h" #include "ps/CStr.h" +#include + +#include + /** * Report the peer if we didn't receive a packet after this time (milliseconds). */ -extern const u32 NETWORK_WARNING_TIMEOUT; - -/** - * Maximum timeout of the local client of the host (milliseconds). - */ -extern const u32 MAXIMUM_HOST_TIMEOUT; +inline constexpr u32 NETWORK_WARNING_TIMEOUT = 2000; class CNetClient; class CNetServerWorker; @@ -62,6 +62,7 @@ public: /** * The client end of a network session. * Provides an abstraction of the network interface, allowing communication with the server. + * The NetClientSession is threaded, so all calls to the public interface must be thread-safe. */ class CNetClientSession : public INetSession { @@ -74,25 +75,25 @@ public: bool Connect(const CStr& server, const u16 port, const bool isLocalClient, ENetHost* enetClient); /** - * Process queued incoming messages. + * The client NetSession is threaded to avoid getting timeouts if the main thread hangs. + * Call Connect() before starting this loop. */ - void Poll(); + static void RunNetLoop(CNetClientSession* session); /** - * Flush queued outgoing network messages. + * Shut down the net session. */ - void Flush(); + void Shutdown(); /** - * Disconnect from the server. - * Sends a disconnection notification to the server. + * Processes pending messages. */ - void Disconnect(NetDisconnectReason reason); + void ProcessPolledMessages(); /** - * Send a message to the server. + * Queue up a message to send to the server on the next Loop() call. */ - virtual bool SendMessage(const CNetMessage* message); + virtual bool SendMessage(const CNetMessage* message) override; /** * Number of milliseconds since the most recent packet of the server was received. @@ -104,19 +105,35 @@ public: */ u32 GetMeanRTT() const; - /** - * Allows increasing the timeout to prevent drops during an expensive operation, - * and decreasing it back to normal afterwards. - */ - void SetLongTimeout(bool longTimeout); - CNetFileTransferer& GetFileTransferer() { return m_FileTransferer; } - private: + /** + * Process queued incoming messages. + */ + void Poll(); + + /** + * Flush queued outgoing network messages. + */ + void Flush(); + CNetClient& m_Client; CNetFileTransferer m_FileTransferer; + // Net messages received and waiting for fetching. + boost::lockfree::queue m_IncomingMessages; + // Net messages to send on the next flush() call. + boost::lockfree::queue m_OutgoingMessages; + + // Wrapper around enet stats - those are atomic as the code is lock-free. + std::atomic m_LastReceivedTime; + std::atomic m_MeanRTT; + + // If this is true, calling Connect() or deleting the session is an error. + std::atomic m_LoopRunning; + std::atomic m_ShouldShutdown; + ENetHost* m_Host; ENetPeer* m_Server; CNetStatsTable* m_Stats; @@ -188,12 +205,6 @@ public: */ void SetLocalClient(bool isLocalClient); - /** - * Allows increasing the timeout to prevent drops during an expensive operation, - * and decreasing it back to normal afterwards. - */ - void SetLongTimeout(bool longTimeout); - /** * Send a message to the client. */