From 31d9b0ed052cdfaa475023e6ddf9c3e735c5bf94 Mon Sep 17 00:00:00 2001 From: phosit Date: Wed, 18 Sep 2024 18:40:21 +0200 Subject: [PATCH] Make the TaskManager a automatic livetime object This way the destructor can be used for cleanup. --- source/graphics/MapReader.cpp | 2 +- source/graphics/TextureConverter.cpp | 2 +- source/main.cpp | 4 +- source/ps/TaskManager.cpp | 39 +++++-------------- source/ps/TaskManager.h | 15 +++---- source/ps/tests/test_TaskManager.h | 32 +++++++-------- .../simulation2/components/CCmpPathfinder.cpp | 15 +++---- source/test_setup.cpp | 7 +++- 8 files changed, 44 insertions(+), 72 deletions(-) diff --git a/source/graphics/MapReader.cpp b/source/graphics/MapReader.cpp index e03a594701..7bb14716db 100644 --- a/source/graphics/MapReader.cpp +++ b/source/graphics/MapReader.cpp @@ -1342,7 +1342,7 @@ int CMapReader::StartMapGeneration(const CStrW& scriptFile) m_GeneratorState = std::make_unique(); // The settings are stringified to pass them to the task. - m_GeneratorState->task = Threading::TaskManager::Instance().PushTask( + m_GeneratorState->task = g_TaskManager.PushTask( [&progress = m_GeneratorState->progress, scriptFile, settings = Script::StringifyJSON(rq, &m_ScriptSettings)](const StopToken stopToken) { diff --git a/source/graphics/TextureConverter.cpp b/source/graphics/TextureConverter.cpp index 468c38021f..fea45f0dcd 100644 --- a/source/graphics/TextureConverter.cpp +++ b/source/graphics/TextureConverter.cpp @@ -459,7 +459,7 @@ bool CTextureConverter::ConvertTexture(const CTexturePtr& texture, const VfsPath delete[] rgba; } - m_ResultQueue.push(Threading::TaskManager::Instance().PushTask([request = std::move(request)] + m_ResultQueue.push(g_TaskManager.PushTask([request = std::move(request)] { PROFILE2("compress"); // Set up the result object diff --git a/source/main.cpp b/source/main.cpp index 4885198c10..6d8e77ee99 100644 --- a/source/main.cpp +++ b/source/main.cpp @@ -576,7 +576,7 @@ static void RunGameOrAtlas(const PS::span argv) CXeromycesEngine xeromycesEngine; // Initialise the global task manager at this point (JS & Profiler2 are set up). - Threading::TaskManager::Initialise(); + Threading::TaskManager taskManager; if (ATLAS_RunIfOnCmdLine(args, false)) return; @@ -719,8 +719,6 @@ static void RunGameOrAtlas(const PS::span argv) if (g_Shutdown == ShutdownType::RestartAsAtlas) ATLAS_RunIfOnCmdLine(args, true); #endif - - Threading::TaskManager::Instance().ClearQueue(); } #if OS_ANDROID diff --git a/source/ps/TaskManager.cpp b/source/ps/TaskManager.cpp index 1017ef241e..632a2ca2e4 100644 --- a/source/ps/TaskManager.cpp +++ b/source/ps/TaskManager.cpp @@ -1,4 +1,4 @@ -/* Copyright (C) 2022 Wildfire Games. +/* Copyright (C) 2024 Wildfire Games. * This file is part of 0 A.D. * * 0 A.D. is free software: you can redistribute it and/or modify @@ -57,8 +57,6 @@ size_t GetDefaultNumberOfWorkers() } // anonymous namespace -std::unique_ptr g_TaskManager; - class Thread; using QueueItem = std::function; @@ -127,8 +125,14 @@ public: Impl() = default; ~Impl() { - ClearQueue(); - m_Workers.clear(); + { + std::lock_guard lock(m_GlobalMutex); + ENSURE(m_GlobalQueue.empty()); + } + { + std::lock_guard lock(m_GlobalLowPriorityMutex); + ENSURE(m_GlobalLowPriorityQueue.empty()); + } } /** @@ -179,19 +183,6 @@ void TaskManager::Impl::SetupWorkers(size_t numberOfWorkers) m_Workers.emplace_back(*this); } -void TaskManager::ClearQueue() { m->ClearQueue(); } -void TaskManager::Impl::ClearQueue() -{ - { - std::lock_guard lock(m_GlobalMutex); - m_GlobalQueue.clear(); - } - { - std::lock_guard lock(m_GlobalLowPriorityMutex); - m_GlobalLowPriorityQueue.clear(); - } -} - size_t TaskManager::GetNumberOfWorkers() const { return m->m_Workers.size(); @@ -236,18 +227,6 @@ bool TaskManager::Impl::PopTask(std::function& taskOut) return false; } -void TaskManager::Initialise() -{ - if (!g_TaskManager) - g_TaskManager = std::make_unique(); -} - -TaskManager& TaskManager::Instance() -{ - ENSURE(g_TaskManager); - return *g_TaskManager; -} - // Thread definition WorkerThread::WorkerThread(TaskManager::Impl& taskManager) diff --git a/source/ps/TaskManager.h b/source/ps/TaskManager.h index 5e3a007603..04b3d00d08 100644 --- a/source/ps/TaskManager.h +++ b/source/ps/TaskManager.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2022 Wildfire Games. +/* Copyright (C) 2024 Wildfire Games. * This file is part of 0 A.D. * * 0 A.D. is free software: you can redistribute it and/or modify @@ -19,6 +19,7 @@ #define INCLUDED_THREADING_TASKMANAGER #include "ps/Future.h" +#include "ps/Singleton.h" #include #include @@ -36,7 +37,7 @@ enum class TaskPriority * and manages the task queues. * See implementation for additional comments. */ -class TaskManager +class TaskManager : public Singleton { friend class WorkerThread; public: @@ -47,14 +48,6 @@ public: TaskManager& operator=(const TaskManager&) = delete; TaskManager& operator=(TaskManager&&) = delete; - static void Initialise(); - static TaskManager& Instance(); - - /** - * Clears all tasks from the queue. This blocks on started tasks. - */ - void ClearQueue(); - /** * @return the number of threaded workers. */ @@ -81,4 +74,6 @@ private: }; } // namespace Threading +#define g_TaskManager Threading::TaskManager::GetSingleton() + #endif // INCLUDED_THREADING_TASKMANAGER diff --git a/source/ps/tests/test_TaskManager.h b/source/ps/tests/test_TaskManager.h index 3e62ee392c..5fde2dfa2c 100644 --- a/source/ps/tests/test_TaskManager.h +++ b/source/ps/tests/test_TaskManager.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2021 Wildfire Games. +/* Copyright (C) 2024 Wildfire Games. * This file is part of 0 A.D. * * 0 A.D. is free software: you can redistribute it and/or modify @@ -29,13 +29,12 @@ class TestTaskManager : public CxxTest::TestSuite public: void test_basic() { - Threading::TaskManager& taskManager = Threading::TaskManager::Instance(); // There is a minimum of 3. - TS_ASSERT(taskManager.GetNumberOfWorkers() >= 3); + TS_ASSERT(g_TaskManager.GetNumberOfWorkers() >= 3); std::atomic tasks_run = 0; auto increment_run = [&tasks_run]() { tasks_run++; }; - Future future = taskManager.PushTask(increment_run); + Future future = g_TaskManager.PushTask(increment_run); future.Wait(); TS_ASSERT_EQUALS(tasks_run.load(), 1); @@ -43,7 +42,7 @@ public: std::condition_variable cv; std::mutex mutex; std::atomic go = false; - future = taskManager.PushTask([&]() { + future = g_TaskManager.PushTask([&]() { std::unique_lock lock(mutex); cv.wait(lock, [&go]() -> bool { return go; }); lock.unlock(); @@ -67,44 +66,41 @@ public: void test_Priority() { - Threading::TaskManager& taskManager = Threading::TaskManager::Instance(); std::atomic tasks_run = 0; // Push general tasks auto increment_run = [&tasks_run]() { tasks_run++; }; - Future future = taskManager.PushTask(increment_run); - Future futureLow = taskManager.PushTask(increment_run, Threading::TaskPriority::LOW); + Future future = g_TaskManager.PushTask(increment_run); + Future futureLow = g_TaskManager.PushTask(increment_run, Threading::TaskPriority::LOW); future.Wait(); futureLow.Wait(); TS_ASSERT_EQUALS(tasks_run.load(), 2); // Also check with no waiting expected. - taskManager.PushTask(increment_run).Wait(); + g_TaskManager.PushTask(increment_run).Wait(); TS_ASSERT_EQUALS(tasks_run.load(), 3); - taskManager.PushTask(increment_run, Threading::TaskPriority::LOW).Wait(); + g_TaskManager.PushTask(increment_run, Threading::TaskPriority::LOW).Wait(); TS_ASSERT_EQUALS(tasks_run.load(), 4); } void test_Load() { - Threading::TaskManager& taskManager = Threading::TaskManager::Instance(); - #define ITERATIONS 100000 std::vector> futures; futures.resize(ITERATIONS); std::vector values(ITERATIONS); - auto f1 = taskManager.PushTask([&taskManager, &futures]() { + auto f1 = g_TaskManager.PushTask([&futures]() { for (u32 i = 0; i < ITERATIONS; i+=3) - futures[i] = taskManager.PushTask([]() { return 5; }); + futures[i] = g_TaskManager.PushTask([]() { return 5; }); }); - auto f2 = taskManager.PushTask([&taskManager, &futures]() { + auto f2 = g_TaskManager.PushTask([&futures]() { for (u32 i = 1; i < ITERATIONS; i+=3) - futures[i] = taskManager.PushTask([]() { return 5; }, Threading::TaskPriority::LOW); + futures[i] = g_TaskManager.PushTask([]() { return 5; }, Threading::TaskPriority::LOW); }); - auto f3 = taskManager.PushTask([&taskManager, &futures]() { + auto f3 = g_TaskManager.PushTask([&futures]() { for (u32 i = 2; i < ITERATIONS; i+=3) - futures[i] = taskManager.PushTask([]() { return 5; }); + futures[i] = g_TaskManager.PushTask([]() { return 5; }); }); f1.Wait(); diff --git a/source/simulation2/components/CCmpPathfinder.cpp b/source/simulation2/components/CCmpPathfinder.cpp index 3c7eab6b46..f50f472b19 100644 --- a/source/simulation2/components/CCmpPathfinder.cpp +++ b/source/simulation2/components/CCmpPathfinder.cpp @@ -59,7 +59,7 @@ void CCmpPathfinder::Init(const CParamNode& UNUSED(paramNode)) m_AtlasOverlay = NULL; - size_t workerThreads = Threading::TaskManager::Instance().GetNumberOfWorkers(); + size_t workerThreads = g_TaskManager.GetNumberOfWorkers(); // Store one vertex pathfinder for each thread (including the main thread). while (m_VertexPathfinders.size() < workerThreads + 1) m_VertexPathfinders.emplace_back(m_GridSize, m_TerrainOnlyGrid); @@ -825,17 +825,18 @@ void CCmpPathfinder::StartProcessingMoves(bool useMax) m_ShortPathRequests.PrepareForComputation(useMax ? m_MaxSameTurnMoves : 0); m_LongPathRequests.PrepareForComputation(useMax ? m_MaxSameTurnMoves : 0); - Threading::TaskManager& taskManager = Threading::TaskManager::Instance(); for (size_t i = 0; i < m_Futures.size(); ++i) { ENSURE(!m_Futures[i].Valid()); // Pass the i+1th vertex pathfinder to keep the first for the main thread, // each thread get its own instance to avoid conflicts in cached data. - m_Futures[i] = taskManager.PushTask([&pathfinder=*this, &vertexPfr=m_VertexPathfinders[i + 1]]() { - PROFILE2("Async pathfinding"); - pathfinder.m_ShortPathRequests.Compute(pathfinder, vertexPfr); - pathfinder.m_LongPathRequests.Compute(pathfinder, *pathfinder.m_LongPathfinder); - }); + m_Futures[i] = g_TaskManager.PushTask( + [&pathfinder=*this, &vertexPfr=m_VertexPathfinders[i + 1]]() + { + PROFILE2("Async pathfinding"); + pathfinder.m_ShortPathRequests.Compute(pathfinder, vertexPfr); + pathfinder.m_LongPathRequests.Compute(pathfinder, *pathfinder.m_LongPathfinder); + }); } } diff --git a/source/test_setup.cpp b/source/test_setup.cpp index 98ea3c2293..82ba40c16f 100644 --- a/source/test_setup.cpp +++ b/source/test_setup.cpp @@ -38,6 +38,8 @@ #include "scriptinterface/ScriptContext.h" #include "scriptinterface/ScriptInterface.h" +#include + class LeakReporter : public CxxTest::GlobalFixture { virtual bool tearDownWorld() @@ -75,14 +77,14 @@ class MiscSetup : public CxxTest::GlobalFixture m_ScriptEngine = new ScriptEngine; g_ScriptContext = ScriptContext::CreateContext(); - Threading::TaskManager::Initialise(); + taskManager.emplace(); return true; } virtual bool tearDownWorld() { - Threading::TaskManager::Instance().ClearQueue(); + taskManager.reset(); g_ScriptContext.reset(); SAFE_DELETE(m_ScriptEngine); g_Profiler2.Shutdown(); @@ -103,6 +105,7 @@ private: // We're doing the initialization and shutdown of the ScriptEngine explicitly here // to make sure it's only initialized when setUpWorld is called. ScriptEngine* m_ScriptEngine; + std::optional taskManager; }; static LeakReporter leakReporter;