1
0
forked from mirrors/0ad

Parallelize the execution of range queries

Use futures to dispatch tasks to the task manager's worker threads, which
concurrently work through the active range queries each turn instead of
doing everything serially on the main thread -- similar to how the pathfinder
computes pathfinding requests asynchronously. This significantly increases performance.
Note: The range queries still can't run in parallel with the rest of the
simulation update, since the range computations depend on the state of the
simulation (such as the positions of units).
This commit is contained in:
Vantha
2025-11-10 16:01:37 +01:00
parent a32a28a5e3
commit ccd1046d67
@@ -31,7 +31,9 @@
#include "maths/MathUtil.h"
#include "maths/Sqrt.h"
#include "ps/CLogger.h"
#include "ps/Future.h"
#include "ps/Profile.h"
#include "ps/TaskManager.h"
#include "renderer/Scene.h"
#include "simulation2/MessageTypes.h"
#include "simulation2/components/ICmpFogging.h"
@@ -415,8 +417,14 @@ public:
std::map<tag_t, Query> m_Queries;
EntityMap<EntityData> m_EntityData;
using RangeUpdateMessage = std::pair<entity_id_t, CMessageRangeUpdate>;
FastSpatialSubdivision m_Subdivision; // spatial index of m_EntityData
std::vector<entity_id_t> m_SubdivisionResults;
// One persistent buffer for each task manager worker, plus one for the main thread.
// Used to temporarily store the results of the asynchronous spatial subdivision queries
// (which can get quite long), always in the same place, to minimize reallocations and memory fragmentation.
std::vector<std::vector<entity_id_t>> m_SubdivisionResultBuffers;
// LOS state:
static const player_id_t MAX_LOS_PLAYER_ID = 16;
@@ -478,7 +486,10 @@ public:
// SetBounds is called)
ResetSubdivisions(entity_pos_t::FromInt(1024), entity_pos_t::FromInt(1024));
m_SubdivisionResults.reserve(4096);
m_SubdivisionResultBuffers.resize(g_TaskManager.GetNumberOfWorkers() + 1);
for (std::vector<entity_id_t>& buffer : m_SubdivisionResultBuffers)
buffer.reserve(4096);
// The whole map should be visible to Gaia by default, else e.g. animals
// will get confused when trying to run from enemies
@@ -1025,7 +1036,7 @@ public:
{
Query q = ConstructQuery(INVALID_ENTITY, minRange, maxRange, owners, requiredInterface, GetEntityFlagMask("normal"), accountForSize);
std::vector<entity_id_t> r;
PerformQuery(q, r, pos);
PerformQuery(q, r, pos, m_SubdivisionResultBuffers[0]);
// Return the list sorted by distance from the entity
std::stable_sort(r.begin(), r.end(), EntityDistanceOrdering(m_EntityData, pos));
@@ -1051,7 +1062,7 @@ public:
}
CFixedVector2D pos = cmpSourcePosition->GetPosition2D();
PerformQuery(q, r, pos);
PerformQuery(q, r, pos, m_SubdivisionResultBuffers[0]);
// Return the list sorted by distance from the entity
std::stable_sort(r.begin(), r.end(), EntityDistanceOrdering(m_EntityData, pos));
@@ -1084,7 +1095,7 @@ public:
}
CFixedVector2D pos = cmpSourcePosition->GetPosition2D();
PerformQuery(q, r, pos);
PerformQuery(q, r, pos, m_SubdivisionResultBuffers[0]);
q.lastMatch = r;
@@ -1138,55 +1149,101 @@ public:
{
PROFILE3("ExecuteActiveQueries");
// Store a queue of all messages before sending any, so we can assume
// no entities will move until we've finished checking all the ranges
std::vector<std::pair<entity_id_t, CMessageRangeUpdate> > messages;
std::vector<entity_id_t> results;
std::vector<entity_id_t> added;
std::vector<entity_id_t> removed;
std::mutex mtx;
for (std::map<tag_t, Query>::iterator it = m_Queries.begin(); it != m_Queries.end(); ++it)
{
Query& query = it->second;
// Points to the first unexecuted query (to be processed next).
std::map<tag_t, Query>::iterator it = m_Queries.begin();
if (!query.enabled)
continue;
// Store a queue of all range update messages before sending any, since they modify the state of the simulation
// (including this component), which would interfere with the asynchronous tasks.
// Important: The order of messages must be fully deterministic.
std::vector<std::optional<RangeUpdateMessage>> messages(m_Queries.size());
results.clear();
CmpPtr<ICmpPosition> cmpSourcePosition(query.source);
if (cmpSourcePosition && cmpSourcePosition->IsInWorld())
{
results.reserve(query.lastMatch.size());
PerformQuery(query, results, cmpSourcePosition->GetPosition2D());
}
// Used to write update messages to the corresponding index in the message vector and maintain the original order.
size_t messageIdx = 0;
// Compute the changes vs the last match
added.clear();
removed.clear();
// Return the 'added' list sorted by distance from the entity
// (Don't bother sorting 'removed' because they might not even have positions or exist any more)
std::set_difference(results.begin(), results.end(), query.lastMatch.begin(), query.lastMatch.end(),
std::back_inserter(added));
std::set_difference(query.lastMatch.begin(), query.lastMatch.end(), results.begin(), results.end(),
std::back_inserter(removed));
if (added.empty() && removed.empty())
continue;
const auto ProcessQueriesAsync = [&](std::vector<entity_id_t>& subdivisionResultsBuffer) {
PROFILE2("Async range query execution");
if (cmpSourcePosition && cmpSourcePosition->IsInWorld())
std::stable_sort(added.begin(), added.end(), EntityDistanceOrdering(m_EntityData, cmpSourcePosition->GetPosition2D()));
std::vector<entity_id_t> results;
std::vector<entity_id_t> added;
std::vector<entity_id_t> removed;
messages.resize(messages.size() + 1);
std::pair<entity_id_t, CMessageRangeUpdate>& back = messages.back();
back.first = query.source.GetId();
back.second.tag = it->first;
back.second.added.swap(added);
back.second.removed.swap(removed);
query.lastMatch.swap(results);
}
while (true)
{
size_t idxCopy;
std::map<tag_t, Query>::iterator itCopy;
{
// Critical section:
// Retrieve the next query to process or stop if none are left.
std::lock_guard lg(mtx);
if (it == m_Queries.end())
break;
itCopy = it++; // Only copy the iterator now and dereference it later, outside the critical section.
idxCopy = messageIdx++;
}
tag_t tag = itCopy->first;
Query& query = itCopy->second;
if (!query.enabled)
continue;
results.clear();
CmpPtr<ICmpPosition> cmpSourcePosition(query.source);
if (cmpSourcePosition && cmpSourcePosition->IsInWorld())
{
results.reserve(query.lastMatch.size());
PerformQuery(query, results, cmpSourcePosition->GetPosition2D(), subdivisionResultsBuffer);
}
// Compute the changes vs the last match
added.clear();
removed.clear();
// Return the 'added' list sorted by distance from the entity
// (Don't bother sorting 'removed' because they might not even have positions or exist any more)
std::set_difference(results.begin(), results.end(), query.lastMatch.begin(), query.lastMatch.end(),
std::back_inserter(added));
std::set_difference(query.lastMatch.begin(), query.lastMatch.end(), results.begin(), results.end(),
std::back_inserter(removed));
if (added.empty() && removed.empty())
continue;
if (cmpSourcePosition && cmpSourcePosition->IsInWorld())
std::stable_sort(added.begin(), added.end(), EntityDistanceOrdering(m_EntityData, cmpSourcePosition->GetPosition2D()));
// Safe because it's guaranteed that no two threads can write to the same index anyway.
messages[idxCopy].emplace(
query.source.GetId(),
CMessageRangeUpdate(tag, std::move(added), std::move(removed))
);
query.lastMatch.swap(results);
}
};
size_t numFutures = std::min(g_TaskManager.GetNumberOfWorkers(), m_Queries.size());
std::vector<Future<void>> futures;
futures.reserve(numFutures);
for (size_t i = 0; i < numFutures; i++)
futures.push_back({g_TaskManager,
[&ProcessQueriesAsync, &subdivisionResultsBuffer = m_SubdivisionResultBuffers[i]]() {
ProcessQueriesAsync(subdivisionResultsBuffer);
}
});
// Start working in the main thread as well.
ProcessQueriesAsync(m_SubdivisionResultBuffers[numFutures]);
for (Future<void>& future : futures)
future.Get();
CComponentManager& cmpMgr = GetSimContext().GetComponentManager();
for (size_t i = 0; i < messages.size(); ++i)
cmpMgr.PostMessage(messages[i].first, messages[i].second);
for (const auto& msg : messages)
if (msg.has_value())
cmpMgr.PostMessage(msg->first, msg->second);
}
/**
@@ -1220,7 +1277,7 @@ public:
/**
* Returns a list of distinct entity IDs that match the given query, sorted by ID.
*/
void PerformQuery(const Query& q, std::vector<entity_id_t>& r, CFixedVector2D pos)
void PerformQuery(const Query& q, std::vector<entity_id_t>& r, CFixedVector2D pos, std::vector<uint32_t>& subdivisionResultsBuffer)
{
// Special case: range is ALWAYS_IN_RANGE means check all entities ignoring distance.
@@ -1242,18 +1299,18 @@ public:
CFixedVector3D pos3d = cmpSourcePosition->GetPosition()+
CFixedVector3D(entity_pos_t::Zero(), q.yOrigin, entity_pos_t::Zero()) ;
// Get a quick list of entities that are potentially in range, with a cutoff of 2*maxRange.
m_SubdivisionResults.clear();
m_Subdivision.GetNear(m_SubdivisionResults, pos, q.maxRange * 2);
subdivisionResultsBuffer.clear();
m_Subdivision.GetNear(subdivisionResultsBuffer, pos, q.maxRange * 2);
for (size_t i = 0; i < m_SubdivisionResults.size(); ++i)
for (size_t i = 0; i < subdivisionResultsBuffer.size(); ++i)
{
EntityMap<EntityData>::const_iterator it = m_EntityData.find(m_SubdivisionResults[i]);
EntityMap<EntityData>::const_iterator it = m_EntityData.find(subdivisionResultsBuffer[i]);
ENSURE(it != m_EntityData.end());
if (!TestEntityQuery(q, it->first, it->second))
continue;
CmpPtr<ICmpPosition> cmpSecondPosition(GetSimContext(), m_SubdivisionResults[i]);
CmpPtr<ICmpPosition> cmpSecondPosition(GetSimContext(), subdivisionResultsBuffer[i]);
if (!cmpSecondPosition || !cmpSecondPosition->IsInWorld())
continue;
CFixedVector3D secondPosition = cmpSecondPosition->GetPosition();
@@ -1281,12 +1338,12 @@ public:
else
{
// Get a quick list of entities that are potentially in range
m_SubdivisionResults.clear();
m_Subdivision.GetNear(m_SubdivisionResults, pos, q.maxRange);
subdivisionResultsBuffer.clear();
m_Subdivision.GetNear(subdivisionResultsBuffer, pos, q.maxRange);
for (size_t i = 0; i < m_SubdivisionResults.size(); ++i)
for (size_t i = 0; i < subdivisionResultsBuffer.size(); ++i)
{
EntityMap<EntityData>::const_iterator it = m_EntityData.find(m_SubdivisionResults[i]);
EntityMap<EntityData>::const_iterator it = m_EntityData.find(subdivisionResultsBuffer[i]);
ENSURE(it != m_EntityData.end());
if (!TestEntityQuery(q, it->first, it->second))