diff --git a/CMakeLists.txt b/CMakeLists.txt index 5da1e88bd..b841d26eb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -201,6 +201,7 @@ set(SOURCES src/backend/JasmineGraphBackend.cpp src/query/processor/cypher/runtime/InstanceHandler.cpp src/query/processor/cypher/runtime/OperatorExecutor.cpp src/query/processor/cypher/util/SharedBuffer.cpp + src/query/processor/executor/IntraPartitionParallelExecutor.cpp src/nativestore/MetaPropertyLink.cpp src/nativestore/MetaPropertyEdgeLink.cpp src/nativestore/MetaPropertyLink.cpp diff --git a/conf/jasminegraph-server.properties b/conf/jasminegraph-server.properties index 237ce4f49..f65d58874 100644 --- a/conf/jasminegraph-server.properties +++ b/conf/jasminegraph-server.properties @@ -124,5 +124,12 @@ org.jasminegraph.vectorstore.dimension=768 org.jasminegraph.vectorstore.embedding.model=nomic-embed-text org.jasminegraph.vectorstore.embedding.ollama.endpoint=http://172.30.5.100:11441 +#-------------------------------------------------------------------------------- +# Query execution thread pool settings +#-------------------------------------------------------------------------------- +#Maximum number of worker threads for intra-partition parallel query execution +#If not set, defaults to 32. +org.jasminegraph.query.threadpool.maxworkers=32 + diff --git a/src/nativestore/NodeBlock.cpp b/src/nativestore/NodeBlock.cpp index 3b0cc3b0c..ceba1317a 100644 --- a/src/nativestore/NodeBlock.cpp +++ b/src/nativestore/NodeBlock.cpp @@ -412,14 +412,11 @@ std::list> NodeBlock::getAllEdgeNodes() { return allEdges; } -std::map NodeBlock::getAllProperties() { - std::map allProperties; +std::map> NodeBlock::getAllProperties() { + std::map> allProperties; PropertyLink* current = this->getPropertyHead(); while (current) { - // don't forget to free the allocated memory after using this method - char* copiedValue = new char[PropertyLink::MAX_VALUE_SIZE]; - std::strncpy(copiedValue, current->value, PropertyLink::MAX_VALUE_SIZE); - allProperties.insert({current->name, copiedValue}); + allProperties.try_emplace(current->name, current->value); PropertyLink* temp = current->next(); delete current; // To prevent memory leaks current = temp; @@ -486,8 +483,8 @@ NodeBlock* NodeBlock::get(unsigned int blockAddress) { new NodeBlock(id, nodeId, blockAddress, propRef, metaPropRef, edgeRef, centralEdgeRef, edgeRefPID, label, usage); if (nodeBlockPointer->id.length() == 0) { // if label not found in node block look in the properties - std::map props = nodeBlockPointer->getAllProperties(); - if (props["label"]) { + std::map> props = nodeBlockPointer->getAllProperties(); + if (!props["label"].empty()) { nodeBlockPointer->id = props["label"]; } else { node_block_logger.error("Could not find node ID/Label for node with block address = " + diff --git a/src/nativestore/NodeBlock.h b/src/nativestore/NodeBlock.h index 112784089..a229ee32c 100644 --- a/src/nativestore/NodeBlock.h +++ b/src/nativestore/NodeBlock.h @@ -77,7 +77,7 @@ class NodeBlock { std::map getProperty(std::string); PropertyLink *getPropertyHead(); MetaPropertyLink *getMetaPropertyHead(); - std::map getAllProperties(); + std::map> getAllProperties(); bool updateLocalRelation(RelationBlock *, bool relocateHead = true); bool updateCentralRelation(RelationBlock *newRelation, bool relocateHead = true); diff --git a/src/nativestore/NodeManager.cpp b/src/nativestore/NodeManager.cpp index 4569d83c9..18f7759a0 100644 --- a/src/nativestore/NodeManager.cpp +++ b/src/nativestore/NodeManager.cpp @@ -400,11 +400,16 @@ int 
NodeManager::dbSize(std::string path) { * @Deprecated use NodeBlock.get() instead **/ NodeBlock *NodeManager::get(std::string nodeId) { - NodeBlock *nodeBlockPointer = NULL; + NodeBlock *nodeBlockPointer = nullptr; if (this->nodeIndex.find(nodeId) == this->nodeIndex.end()) { // Not found return nodeBlockPointer; } unsigned int nodeIndex = this->nodeIndex[nodeId]; + return get(nodeIndex, nodeId); +} + +NodeBlock *NodeManager::get(unsigned int nodeIndex, const std::string& nodeId) { + NodeBlock *nodeBlockPointer = nullptr; const unsigned int blockAddress = nodeIndex * NodeBlock::BLOCK_SIZE; NodeBlock::nodesDB->seekg(blockAddress); unsigned int vertexId; @@ -455,11 +460,10 @@ NodeBlock *NodeManager::get(std::string nodeId) { nodeBlockPointer = new NodeBlock(nodeId, vertexId, blockAddress, propRef, metaPropRef, edgeRef, centralEdgeRef, edgeRefPID, label, usage); - node_manager_logger.debug("DEBUG: nodeBlockPointer after creating the object edgeRef " + - std::to_string(nodeBlockPointer->edgeRef)); if (nodeBlockPointer->edgeRef % RelationBlock::BLOCK_SIZE != 0) { - node_manager_logger.error("Exception: Invalid edge reference address = " + nodeBlockPointer->edgeRef); + node_manager_logger.error("Exception: Invalid edge reference address = " + + std::to_string(nodeBlockPointer->edgeRef)); } return nodeBlockPointer; } diff --git a/src/nativestore/NodeManager.h b/src/nativestore/NodeManager.h index f6331acf7..d5dfef11d 100644 --- a/src/nativestore/NodeManager.h +++ b/src/nativestore/NodeManager.h @@ -55,7 +55,10 @@ class NodeManager { std::unordered_map nodeIndex; std::unordered_map edgeIndex; NodeManager(GraphConfig); - ~NodeManager() { delete NodeBlock::nodesDB; }; + + NodeBlock *get(std::string); + static NodeBlock *get(unsigned int nodeIndex, const std::string& nodeId); + void addNode(NodeBlock nodeBlock); void setIndexKeySize(unsigned long); static int dbSize(std::string path); @@ -72,7 +75,6 @@ class NodeManager { RelationBlock* addCentralRelation(NodeBlock source, NodeBlock destination); NodeBlock* addNode(std::string); // will return DB block address - NodeBlock* get(std::string); std::list getCentralGraph(); std::list getLimitedGraph(int limit = 10); diff --git a/src/nativestore/RelationBlock.cpp b/src/nativestore/RelationBlock.cpp index 2d1189a35..045cbbfba 100644 --- a/src/nativestore/RelationBlock.cpp +++ b/src/nativestore/RelationBlock.cpp @@ -849,14 +849,11 @@ MetaPropertyEdgeLink *RelationBlock::getMetaPropertyHead() { return MetaPropertyEdgeLink::get(this->metaPropertyAddress); } -std::map RelationBlock::getAllProperties() { - std::map allProperties; +std::map> RelationBlock::getAllProperties() { + std::map> allProperties; PropertyEdgeLink* current = this->getPropertyHead(); while (current) { - // don't forget to free the allocated memory after using this method - char* copiedValue = new char[PropertyEdgeLink::MAX_VALUE_SIZE]; - std::strncpy(copiedValue, current->value, PropertyEdgeLink::MAX_VALUE_SIZE); - allProperties.insert({current->name, copiedValue}); + allProperties.try_emplace(current->name, current->value); PropertyEdgeLink* temp = current->next(); delete current; // To prevent memory leaks current = temp; diff --git a/src/nativestore/RelationBlock.h b/src/nativestore/RelationBlock.h index c75e9b20e..01c1661dd 100644 --- a/src/nativestore/RelationBlock.h +++ b/src/nativestore/RelationBlock.h @@ -169,7 +169,7 @@ class RelationBlock { std::string getCentralRelationshipType(); PropertyEdgeLink *getPropertyHead(); MetaPropertyEdgeLink *getMetaPropertyHead(); - std::map 
getAllProperties(); + std::map> getAllProperties(); }; #endif diff --git a/src/query/processor/cypher/runtime/OperatorExecutor.cpp b/src/query/processor/cypher/runtime/OperatorExecutor.cpp index 535ee730d..d0049b45b 100644 --- a/src/query/processor/cypher/runtime/OperatorExecutor.cpp +++ b/src/query/processor/cypher/runtime/OperatorExecutor.cpp @@ -18,16 +18,227 @@ limitations under the License. #include "../../../../util/telemetry/OpenTelemetryUtil.h" #include "Helpers.h" #include +#include #include #include #include +// Parallel processing configuration +// These values scale with the number of available workers to adapt to system capabilities +static constexpr size_t NODES_PER_WORKER_THRESHOLD = 125; +static constexpr size_t RELATIONS_PER_WORKER_THRESHOLD = 12500; +static constexpr size_t FILTER_BATCH_SIZE = 100; + +// Helper function to get dynamic parallel processing threshold for nodes +static size_t getNodeParallelThreshold(const IntraPartitionParallelExecutor* executor) { + if (!executor) return SIZE_MAX; // Disable parallel if no executor + + // Dynamic threshold based on worker count: scales with available parallelism + int workerCount = executor->getWorkerCount(); + return NODES_PER_WORKER_THRESHOLD * workerCount; +} + +// Helper function to get dynamic parallel processing threshold for relations +static size_t getRelationParallelThreshold(const IntraPartitionParallelExecutor* executor) { + if (!executor) return SIZE_MAX; + + int workerCount = executor->getWorkerCount(); + return RELATIONS_PER_WORKER_THRESHOLD * workerCount; +} + Logger execution_logger; std::unordered_map> OperatorExecutor::methodMap; + +// Initialize static parallel executor (shared across all instances) +std::unique_ptr OperatorExecutor::parallelExecutor = nullptr; + +// Helper function to extract node data and manage memory +static json extractNodeDataAndCleanup(NodeBlock* node) { + json nodeData; + std::string pid(node->getMetaPropertyHead()->value); + nodeData["partitionID"] = pid; + std::map> rawProps = node->getAllProperties(); + + for (const auto& [key, value] : rawProps) { + nodeData[key] = value; + } + return nodeData; +} + +// Helper function to extract relation data and manage memory +static json extractRelationDataAndCleanup(RelationBlock* relation) { + json relationData; + std::map> rawProps = relation->getAllProperties(); + + for (const auto& [key, value] : rawProps) { + relationData[key] = value; + } + return relationData; +} + +// Helper to safely close a database stream +static void closeDBStream(std::fstream*& db) { + if (db) { + db->close(); + db = nullptr; + } +} + +// Helper to open database files +static void openDatabaseFiles(const std::string& dbPrefix) { + std::ios_base::openmode openMode = std::ios::in | std::ios::out | std::ios::binary; + NodeBlock::nodesDB = Utils::openFile(dbPrefix + "_nodes.db", openMode); + RelationBlock::relationsDB = Utils::openFile(dbPrefix + "_relations.db", openMode); + RelationBlock::centralRelationsDB = Utils::openFile(dbPrefix + "_central_relations.db", openMode); + PropertyLink::propertiesDB = Utils::openFile(dbPrefix + "_properties.db", openMode); + MetaPropertyLink::metaPropertiesDB = Utils::openFile(dbPrefix + "_meta_properties.db", openMode); + PropertyEdgeLink::edgePropertiesDB = Utils::openFile(dbPrefix + "_edge_properties.db", openMode); + MetaPropertyEdgeLink::metaEdgePropertiesDB = Utils::openFile(dbPrefix + "_meta_edge_properties.db", openMode); +} + +// Initialize thread-local database connections +void initializeThreadLocalDBs(const 
GraphConfig& gc) { + // Use function-local static variables (thread-safe in C++11+) + static thread_local int currentPartitionID = -1; + static thread_local int currentGraphID = -1; + + // Check if already initialized for this partition + if (bool needsInit = (currentPartitionID != gc.partitionID || + currentGraphID != gc.graphID || + RelationBlock::relationsDB == nullptr); !needsInit) { + return; + } + + // Close existing connections + closeDBStream(NodeBlock::nodesDB); + closeDBStream(RelationBlock::relationsDB); + closeDBStream(RelationBlock::centralRelationsDB); + closeDBStream(PropertyLink::propertiesDB); + closeDBStream(MetaPropertyLink::metaPropertiesDB); + closeDBStream(PropertyEdgeLink::edgePropertiesDB); + closeDBStream(MetaPropertyEdgeLink::metaEdgePropertiesDB); + + std::string instanceDataFolderLocation = + Utils::getJasmineGraphProperty("org.jasminegraph.server.instance.datafolder"); + std::string dbPrefix = instanceDataFolderLocation + "/g" + std::to_string(gc.graphID) + + "_p" + std::to_string(gc.partitionID); + + openDatabaseFiles(dbPrefix); + + currentPartitionID = gc.partitionID; + currentGraphID = gc.graphID; +} + +// Helper to add filtered results to buffer +static void addFilteredResults(const std::vector>& results, SharedBuffer& buffer) { + for (const auto& chunkResults : results) { + for (const auto& item : chunkResults) { + buffer.add(item); + } + } +} + +// Helper to process batch sequentially +static void processSequentially(const std::vector& batch, + FilterHelper& filterHelper, + SharedBuffer& buffer) { + for (const auto& item : batch) { + if (filterHelper.evaluate(item)) { + buffer.add(item); + } + } +} + +// Helper to process batch in parallel +static bool tryProcessInParallel(const std::vector& batch, + FilterHelper& filterHelper, + SharedBuffer& buffer, + IntraPartitionParallelExecutor* parallelExecutor) { + auto processor = [&filterHelper, &batch](const WorkChunk& chunk) { + std::vector localResults; + for (long i = chunk.startIndex - 1; i < chunk.endIndex && i < (long)batch.size(); ++i) { + if (filterHelper.evaluate(batch[i])) { + localResults.push_back(batch[i]); + } + } + return localResults; + }; + + auto results = parallelExecutor->processInParallel>( + static_cast(batch.size()), processor); + addFilteredResults(results, buffer); + return true; +} + +// Helper function to process batch filtering (reduce nesting complexity) +static void processBatch(const std::vector& batch, + FilterHelper& filterHelper, + SharedBuffer& buffer, + IntraPartitionParallelExecutor* parallelExecutor) { + if (bool useParallel = parallelExecutor && batch.size() > 500; + useParallel && tryProcessInParallel(batch, filterHelper, buffer, parallelExecutor)) { + return; + } + + processSequentially(batch, filterHelper, buffer); +} + +// Helper function to process a single relationship chunk +static std::vector processRelationshipChunk( + const WorkChunk& chunk, + const std::string& jsonPlan, + const GraphConfig& gc, + const std::string& masterIP, + long maxRelations) { + + initializeThreadLocalDBs(gc); + json queryJson = json::parse(jsonPlan); + std::vector results; + + string graphDirection = Utils::getGraphDirection(to_string(gc.graphID), masterIP); + bool isDirected = (graphDirection == "TRUE"); + bool isDirectionRight = (queryJson["direction"] == "right"); + + for (long i = chunk.startIndex; i <= chunk.endIndex && i < maxRelations; ++i) { + std::unique_ptr relation(RelationBlock::getLocalRelation(i * RelationBlock::BLOCK_SIZE)); + if (relation->getLocalRelationshipType() != 
queryJson["relType"]) { + continue; + } + + NodeBlock* startNode = relation->getSource(); + NodeBlock* destNode = relation->getDestination(); + + json startNodeData = extractNodeDataAndCleanup(startNode); + json destNodeData = extractNodeDataAndCleanup(destNode); + json relationData = extractRelationDataAndCleanup(relation.get()); + + json directionData; + string start = queryJson["sourceVariable"]; + string dest = queryJson["destVariable"]; + string rel = queryJson["relVariable"]; + + if (isDirectionRight) { + directionData[start] = startNodeData; + directionData[dest] = destNodeData; + } else if (!isDirected) { + directionData[start] = destNodeData; + directionData[dest] = startNodeData; + } + directionData[rel] = relationData; + results.push_back(directionData.dump()); + } + return results; +} + OperatorExecutor::OperatorExecutor(GraphConfig gc, std::string queryPlan, std::string masterIP): queryPlan(queryPlan), gc(gc), masterIP(masterIP) { this->query = json::parse(queryPlan); + + // Initialize parallel executor safely + if (!parallelExecutor) { + parallelExecutor = std::make_unique(); + } }; void OperatorExecutor::initializeMethodMap() { @@ -116,6 +327,20 @@ void OperatorExecutor::AllNodeScan(SharedBuffer &buffer, std::string jsonPlan, G json query = json::parse(jsonPlan); NodeManager nodeManager(gc); int nodeCount = 0; + + if (size_t nodeCount = nodeManager.nodeIndex.size(); + parallelExecutor && nodeCount > getNodeParallelThreshold(parallelExecutor.get()) && + parallelExecutor->shouldUseParallelProcessing(nodeCount)) { + try { + AllNodeScanParallel(buffer, jsonPlan, gc); + return; + } catch (const std::exception& e) { + execution_logger.warn("Parallel AllNodeScan failed, falling back to sequential: " + + std::string(e.what())); + } + } + + // Use sequential processing for small datasets (original code) for (auto it : nodeManager.nodeIndex) { json nodeData; auto nodeId = it.first; @@ -123,14 +348,10 @@ void OperatorExecutor::AllNodeScan(SharedBuffer &buffer, std::string jsonPlan, G std::string value(node->getMetaPropertyHead()->value); if (value == to_string(gc.partitionID)) { nodeData["partitionID"] = value; - std::map properties = node->getAllProperties(); + std::map> properties = node->getAllProperties(); for (auto property : properties) { nodeData[property.first] = property.second; } - for (auto& [key, value] : properties) { - delete[] value; // Free each allocated char* array - } - properties.clear(); json data; string variable = query["variables"]; @@ -150,7 +371,20 @@ void OperatorExecutor::NodeScanByLabel(SharedBuffer &buffer, std::string jsonPla OpenTelemetryUtil::addSpanAttribute("scan.label", query["Label"].get()); NodeManager nodeManager(gc); - int nodeCount = 0; + size_t nodeCount = nodeManager.nodeIndex.size(); + + if (parallelExecutor && nodeCount > getNodeParallelThreshold(parallelExecutor.get()) && + parallelExecutor->shouldUseParallelProcessing(nodeCount)) { + try { + NodeScanByLabelParallel(buffer, jsonPlan, gc); + return; + } catch (const std::exception& e) { + execution_logger.warn("Parallel NodeScanByLabel failed, falling back to sequential: " + + std::string(e.what())); + } + } + + // Use sequential processing for small datasets (original code) for (auto it : nodeManager.nodeIndex) { json nodeData; auto nodeId = it.first; @@ -159,14 +393,10 @@ void OperatorExecutor::NodeScanByLabel(SharedBuffer &buffer, std::string jsonPla std::string value(node->getMetaPropertyHead()->value); if (value == to_string(gc.partitionID) && label == query["Label"]) { 
nodeData["partitionID"] = value; - std::map properties = node->getAllProperties(); + std::map> properties = node->getAllProperties(); for (auto property : properties) { nodeData[property.first] = property.second; } - for (auto& [key, value] : properties) { - delete[] value; // Free each allocated char* array - } - properties.clear(); json data; string variable = query["variable"]; @@ -219,16 +449,28 @@ void OperatorExecutor::Filter(SharedBuffer &buffer, std::string jsonPlan, GraphC std::thread result(method, std::ref(*this), std::ref(sharedBuffer), query["NextOperator"], gc); auto condition = query["condition"]; - FilterHelper FilterHelper(condition.dump()); + FilterHelper filterHelper(condition.dump()); + + std::vector batch; + batch.reserve(FILTER_BATCH_SIZE); + while (true) { string raw = sharedBuffer.get(); if (raw == "-1") { + // Process remaining batch using helper function + IntraPartitionParallelExecutor* executor = parallelExecutor.get(); + processBatch(batch, filterHelper, buffer, executor); buffer.add(raw); result.join(); break; } - if (FilterHelper.evaluate(raw)) { - buffer.add(raw); + + batch.push_back(raw); + if (batch.size() >= 100) { + // Process and flush batch using helper function + IntraPartitionParallelExecutor* executor = parallelExecutor.get(); + processBatch(batch, filterHelper, buffer, executor); + batch.clear(); } } } @@ -264,34 +506,22 @@ void OperatorExecutor::UndirectedRelationshipTypeScan(SharedBuffer &buffer, std: std::string startPid(startNode->getMetaPropertyHead()->value); startNodeData["partitionID"] = startPid; - std::map startProperties = startNode->getAllProperties(); + std::map> startProperties = startNode->getAllProperties(); for (auto property : startProperties) { startNodeData[property.first] = property.second; } - for (auto& [key, value] : startProperties) { - delete[] value; // Free each allocated char* array - } - startProperties.clear(); std::string destPid(destNode->getMetaPropertyHead()->value); destNodeData["partitionID"] = destPid; - std::map destProperties = destNode->getAllProperties(); + std::map> destProperties = destNode->getAllProperties(); for (auto property : destProperties) { destNodeData[property.first] = property.second; } - for (auto& [key, value] : destProperties) { - delete[] value; // Free each allocated char* array - } - destProperties.clear(); - std::map relProperties = relation->getAllProperties(); + std::map> relProperties = relation->getAllProperties(); for (auto property : relProperties) { relationData[property.first] = property.second; } - for (auto& [key, value] : relProperties) { - delete[] value; // Free each allocated char* array - } - relProperties.clear(); json rightDirectionData; string start = query["sourceVariable"]; @@ -337,35 +567,23 @@ void OperatorExecutor::UndirectedRelationshipTypeScan(SharedBuffer &buffer, std: continue; } startNodeData["partitionID"] = startPid; - std::map startProperties = startNode->getAllProperties(); + std::map> startProperties = startNode->getAllProperties(); for (auto property : startProperties) { startNodeData[property.first] = property.second; } - for (auto& [key, value] : startProperties) { - delete[] value; // Free each allocated char* array - } - startProperties.clear(); std::string destPid(destNode->getMetaPropertyHead()->value); destNodeData["partitionID"] = destPid; - std::map destProperties = destNode->getAllProperties(); + std::map> destProperties = destNode->getAllProperties(); for (auto property : destProperties) { destNodeData[property.first] = property.second; } - 
for (auto& [key, value] : destProperties) { - delete[] value; // Free each allocated char* array - } - destProperties.clear(); - std::map relProperties = relation->getAllProperties(); + std::map> relProperties = relation->getAllProperties(); for (auto property : relProperties) { relationData[property.first] = property.second; } - for (auto& [key, value] : relProperties) { - delete[] value; // Free each allocated char* array - } - relProperties.clear(); json rightDirectionData; string start = query["sourceVariable"]; @@ -414,34 +632,22 @@ void OperatorExecutor::UndirectedAllRelationshipScan(SharedBuffer &buffer, std:: std::string startPid(startNode->getMetaPropertyHead()->value); startNodeData["partitionID"] = startPid; - std::map startProperties = startNode->getAllProperties(); + std::map> startProperties = startNode->getAllProperties(); for (auto property : startProperties) { startNodeData[property.first] = property.second; } - for (auto& [key, value] : startProperties) { - delete[] value; // Free each allocated char* array - } - startProperties.clear(); std::string destPid(destNode->getMetaPropertyHead()->value); destNodeData["partitionID"] = destPid; - std::map destProperties = destNode->getAllProperties(); + std::map> destProperties = destNode->getAllProperties(); for (auto property : destProperties) { destNodeData[property.first] = property.second; } - for (auto& [key, value] : destProperties) { - delete[] value; // Free each allocated char* array - } - destProperties.clear(); - std::map relProperties = relation->getAllProperties(); + std::map> relProperties = relation->getAllProperties(); for (auto property : relProperties) { relationData[property.first] = property.second; } - for (auto& [key, value] : relProperties) { - delete[] value; // Free each allocated char* array - } - relProperties.clear(); json rightDirectionData; string start = query["sourceVariable"]; @@ -479,34 +685,22 @@ void OperatorExecutor::UndirectedAllRelationshipScan(SharedBuffer &buffer, std:: std::string startPid(startNode->getMetaPropertyHead()->value); startNodeData["partitionID"] = startPid; - std::map startProperties = startNode->getAllProperties(); + std::map> startProperties = startNode->getAllProperties(); for (auto property : startProperties) { startNodeData[property.first] = property.second; } - for (auto& [key, value] : startProperties) { - delete[] value; // Free each allocated char* array - } - startProperties.clear(); std::string destPid(destNode->getMetaPropertyHead()->value); destNodeData["partitionID"] = destPid; - std::map destProperties = destNode->getAllProperties(); + std::map> destProperties = destNode->getAllProperties(); for (auto property : destProperties) { destNodeData[property.first] = property.second; } - for (auto& [key, value] : destProperties) { - delete[] value; // Free each allocated char* array - } - destProperties.clear(); - std::map relProperties = relation->getAllProperties(); + std::map> relProperties = relation->getAllProperties(); for (auto property : relProperties) { relationData[property.first] = property.second; } - for (auto& [key, value] : relProperties) { - delete[] value; // Free each allocated char* array - } - relProperties.clear(); json rightDirectionData; string start = query["sourceVariable"]; @@ -558,34 +752,22 @@ void OperatorExecutor::DirectedRelationshipTypeScan(SharedBuffer &buffer, std::s std::string startPid(startNode->getMetaPropertyHead()->value); startNodeData["partitionID"] = startPid; - std::map startProperties = startNode->getAllProperties(); + 
std::map> startProperties = startNode->getAllProperties(); for (auto property : startProperties) { startNodeData[property.first] = property.second; } - for (auto& [key, value] : startProperties) { - delete[] value; // Free each allocated char* array - } - startProperties.clear(); std::string destPid(destNode->getMetaPropertyHead()->value); destNodeData["partitionID"] = destPid; - std::map destProperties = destNode->getAllProperties(); + std::map> destProperties = destNode->getAllProperties(); for (auto property : destProperties) { destNodeData[property.first] = property.second; } - for (auto& [key, value] : destProperties) { - delete[] value; // Free each allocated char* array - } - destProperties.clear(); - std::map relProperties = relation->getAllProperties(); + std::map> relProperties = relation->getAllProperties(); for (auto property : relProperties) { relationData[property.first] = property.second; } - for (auto& [key, value] : relProperties) { - delete[] value; // Free each allocated char* array - } - relProperties.clear(); json directionData; string start = query["sourceVariable"]; @@ -624,34 +806,22 @@ void OperatorExecutor::DirectedRelationshipTypeScan(SharedBuffer &buffer, std::s std::string startPid(startNode->getMetaPropertyHead()->value); startNodeData["partitionID"] = startPid; - std::map startProperties = startNode->getAllProperties(); + std::map> startProperties = startNode->getAllProperties(); for (auto property : startProperties) { startNodeData[property.first] = property.second; } - for (auto& [key, value] : startProperties) { - delete[] value; // Free each allocated char* array - } - startProperties.clear(); std::string destPid(destNode->getMetaPropertyHead()->value); destNodeData["partitionID"] = destPid; - std::map destProperties = destNode->getAllProperties(); + std::map> destProperties = destNode->getAllProperties(); for (auto property : destProperties) { destNodeData[property.first] = property.second; } - for (auto& [key, value] : destProperties) { - delete[] value; // Free each allocated char* array - } - destProperties.clear(); - std::map relProperties = relation->getAllProperties(); + std::map> relProperties = relation->getAllProperties(); for (auto property : relProperties) { relationData[property.first] = property.second; } - for (auto& [key, value] : relProperties) { - delete[] value; // Free each allocated char* array - } - relProperties.clear(); json directionData; string start = query["sourceVariable"]; @@ -676,9 +846,23 @@ void OperatorExecutor::DirectedRelationshipTypeScan(SharedBuffer &buffer, std::s void OperatorExecutor::DirectedAllRelationshipScan(SharedBuffer &buffer, std::string jsonPlan, GraphConfig gc) { json query = json::parse(jsonPlan); NodeManager nodeManager(gc); - string direction = query["direction"]; + + // Check threshold for parallel execution const std::string& dbPrefix = nodeManager.getDbPrefix(); long localRelationCount = nodeManager.dbSize(dbPrefix + "_relations.db") / RelationBlock::BLOCK_SIZE; + + if (parallelExecutor && localRelationCount > getRelationParallelThreshold(parallelExecutor.get()) && + parallelExecutor->shouldUseParallelProcessing(localRelationCount)) { + try { + DirectedAllRelationshipScanParallel(buffer, jsonPlan, gc); + return; + } catch (const std::exception& e) { + execution_logger.warn("Parallel relationship scan failed, falling back to sequential: " + + std::string(e.what())); + } + } + + string direction = query["direction"]; long centralRelationCount = nodeManager.dbSize(dbPrefix + "_central_relations.db") / 
RelationBlock::CENTRAL_BLOCK_SIZE; string graphDirection = Utils::getGraphDirection(to_string(gc.graphID), masterIP); @@ -693,39 +877,30 @@ void OperatorExecutor::DirectedAllRelationshipScan(SharedBuffer &buffer, std::st json destNodeData; json relationData; RelationBlock* relation = RelationBlock::getLocalRelation(i * RelationBlock::BLOCK_SIZE); + if (relation->getLocalRelationshipType() != query["relType"]) { + continue; + } NodeBlock* startNode = relation->getSource(); NodeBlock* destNode = relation->getDestination(); std::string startPid(startNode->getMetaPropertyHead()->value); startNodeData["partitionID"] = startPid; - std::map startProperties = startNode->getAllProperties(); + std::map> startProperties = startNode->getAllProperties(); for (auto property : startProperties) { startNodeData[property.first] = property.second; } - for (auto& [key, value] : startProperties) { - delete[] value; // Free each allocated char* array - } - startProperties.clear(); std::string destPid(destNode->getMetaPropertyHead()->value); destNodeData["partitionID"] = destPid; - std::map destProperties = destNode->getAllProperties(); + std::map> destProperties = destNode->getAllProperties(); for (auto property : destProperties) { destNodeData[property.first] = property.second; } - for (auto& [key, value] : destProperties) { - delete[] value; // Free each allocated char* array - } - destProperties.clear(); - std::map relProperties = relation->getAllProperties(); + std::map> relProperties = relation->getAllProperties(); for (auto property : relProperties) { relationData[property.first] = property.second; } - for (auto& [key, value] : relProperties) { - delete[] value; // Free each allocated char* array - } - relProperties.clear(); json directionData; string start = query["sourceVariable"]; @@ -760,34 +935,22 @@ void OperatorExecutor::DirectedAllRelationshipScan(SharedBuffer &buffer, std::st std::string startPid(startNode->getMetaPropertyHead()->value); startNodeData["partitionID"] = startPid; - std::map startProperties = startNode->getAllProperties(); + std::map> startProperties = startNode->getAllProperties(); for (auto property : startProperties) { startNodeData[property.first] = property.second; } - for (auto& [key, value] : startProperties) { - delete[] value; // Free each allocated char* array - } - startProperties.clear(); std::string destPid(destNode->getMetaPropertyHead()->value); destNodeData["partitionID"] = destPid; - std::map destProperties = destNode->getAllProperties(); + std::map> destProperties = destNode->getAllProperties(); for (auto property : destProperties) { destNodeData[property.first] = property.second; } - for (auto& [key, value] : destProperties) { - delete[] value; // Free each allocated char* array - } - destProperties.clear(); - std::map relProperties = relation->getAllProperties(); + std::map> relProperties = relation->getAllProperties(); for (auto property : relProperties) { relationData[property.first] = property.second; } - for (auto& [key, value] : relProperties) { - delete[] value; // Free each allocated char* array - } - relProperties.clear(); json directionData; string start = query["sourceVariable"]; @@ -817,7 +980,7 @@ void OperatorExecutor::NodeByIdSeek(SharedBuffer &buffer, std::string jsonPlan, json nodeData; std::string value(node->getMetaPropertyHead()->value); if (value == to_string(gc.partitionID)) { - std::map properties = node->getAllProperties(); + std::map> properties = node->getAllProperties(); nodeData["partitionID"] = value; for (auto property : properties) { 
nodeData[property.first] = property.second; @@ -887,7 +1050,8 @@ void OperatorExecutor::ExpandAll(SharedBuffer &buffer, std::string jsonPlan, Gra json relationData; json destNodeData; - std::map relProperties = nextRelation->getAllProperties(); + std::map> relProperties = + nextRelation->getAllProperties(); for (auto property : relProperties) { relationData[property.first] = property.second; } @@ -904,10 +1068,6 @@ void OperatorExecutor::ExpandAll(SharedBuffer &buffer, std::string jsonPlan, Gra nextRelation = nextRelation->nextLocalDestination(); continue; } - for (auto& [key, value] : relProperties) { - delete[] value; // Free each allocated char* array - } - relProperties.clear(); NodeBlock *destNode; if (isSource) { destNode = nextRelation->getDestination(); @@ -916,14 +1076,10 @@ void OperatorExecutor::ExpandAll(SharedBuffer &buffer, std::string jsonPlan, Gra } std::string value(destNode->getMetaPropertyHead()->value); destNodeData["partitionID"] = value; - std::map destProperties = destNode->getAllProperties(); + std::map> destProperties = destNode->getAllProperties(); for (auto property : destProperties) { destNodeData[property.first] = property.second; } - for (auto& [key, value] : destProperties) { - delete[] value; // Free each allocated char* array - } - destProperties.clear(); rawObj[relVariable] = relationData; rawObj[destVariable] = destNodeData; @@ -950,7 +1106,8 @@ void OperatorExecutor::ExpandAll(SharedBuffer &buffer, std::string jsonPlan, Gra json relationData; json destNodeData; - std::map relProperties = nextRelation->getAllProperties(); + std::map> relProperties = + nextRelation->getAllProperties(); for (auto property : relProperties) { relationData[property.first] = property.second; } @@ -969,10 +1126,6 @@ void OperatorExecutor::ExpandAll(SharedBuffer &buffer, std::string jsonPlan, Gra continue; } - for (auto& [key, value] : relProperties) { - delete[] value; // Free each allocated char* array - } - relProperties.clear(); NodeBlock *destNode; if (isSource) { destNode = nextRelation->getDestination(); @@ -981,14 +1134,10 @@ void OperatorExecutor::ExpandAll(SharedBuffer &buffer, std::string jsonPlan, Gra } std::string value(destNode->getMetaPropertyHead()->value); destNodeData["partitionID"] = value; - std::map destProperties = destNode->getAllProperties(); + std::map> destProperties = destNode->getAllProperties(); for (auto property : destProperties) { destNodeData[property.first] = property.second; } - for (auto& [key, value] : destProperties) { - delete[] value; // Free each allocated char* array - } - destProperties.clear(); rawObj[relVariable] = relationData; rawObj[destVariable] = destNodeData; buffer.add(rawObj.dump()); @@ -1482,3 +1631,219 @@ void OperatorExecutor::OrderBy(SharedBuffer &buffer, std::string jsonPlan, Graph } } } + +// Helper function for parallel node scanning (to avoid large lambda) +static std::vector processNodeScanChunk( + const WorkChunk& chunk, + const std::vector>& nodeIndices, + const std::string& variable, + const GraphConfig& graphConfig) { + + initializeThreadLocalDBs(graphConfig); // Initialize DBs for this thread + + std::vector results; + results.reserve(chunk.endIndex - chunk.startIndex + 1); + + long start = std::max(0L, chunk.startIndex); + long end = std::min(static_cast(nodeIndices.size()) - 1, chunk.endIndex); + + for (long i = start; i <= end; ++i) { + const auto& [nodeId, addressIndex] = nodeIndices[static_cast(i)]; + + std::unique_ptr node(NodeManager::get(addressIndex, nodeId)); + if (node == nullptr) continue; + + 
std::string value(node->getMetaPropertyHead()->value); + if (value == to_string(graphConfig.partitionID)) { + json nodeData; + nodeData["partitionID"] = value; + std::map> rawProps = node->getAllProperties(); + + for (const auto& [key, val] : rawProps) { + nodeData[key] = val; + } + + json data; + data[variable] = nodeData; + results.push_back(data.dump()); + } + } + return results; +} + +// Helper function to process node scan by label chunk +static std::vector processNodeByLabelChunk( + const WorkChunk& chunk, + const std::vector>& nodeIndices, + std::string_view targetLabel, + const GraphConfig& graphConfig) { + + initializeThreadLocalDBs(graphConfig); + + std::vector results; + results.reserve(chunk.endIndex - chunk.startIndex + 1); + + long start = std::max(0L, chunk.startIndex); + long end = std::min(static_cast(nodeIndices.size()) - 1, chunk.endIndex); + + for (long i = start; i <= end; ++i) { + const auto& [nodeId, addressIndex] = nodeIndices[static_cast(i)]; + + std::unique_ptr node(NodeManager::get(addressIndex, nodeId)); + if (node == nullptr) continue; + + string label = node->getLabel(); + std::string partitionValue(node->getMetaPropertyHead()->value); + + if (partitionValue == to_string(graphConfig.partitionID) && label == targetLabel) { + json nodeData; + nodeData["partitionID"] = partitionValue; + std::map> rawProps = node->getAllProperties(); + + for (const auto& [key, val] : rawProps) { + nodeData[key] = val; + } + + results.push_back(nodeData.dump()); + } + } + return results; +} + +void OperatorExecutor::AllNodeScanParallel(SharedBuffer &buffer, std::string jsonPlan, const GraphConfig& graphConfig) { + // Use thread pool executor with deterministic merge + json queryJson = json::parse(jsonPlan); + NodeManager nodeManager(graphConfig); + string variable = queryJson["variables"].get(); + + // Collect node indices (ID, AddressIndex) + std::vector> nodeIndices; + nodeIndices.reserve(nodeManager.nodeIndex.size()); + + for (const auto& [nodeId, addressIdx] : nodeManager.nodeIndex) { + nodeIndices.emplace_back(nodeId, addressIdx); + } + + // Define processor using named function + auto processor = [&nodeIndices, &variable, graphConfig](const WorkChunk& chunk) { + return processNodeScanChunk(chunk, nodeIndices, variable, graphConfig); + }; + + // Execute using the thread pool with adaptive chunking + std::vector> chunkResults = + parallelExecutor->processInParallel>( + static_cast(nodeIndices.size()), + processor); + + // Deterministic merge in chunk order and emit + for (const auto& chunkVec : chunkResults) { + for (const auto& row : chunkVec) { + buffer.add(row); + } + } + buffer.add("-1"); +} + +void OperatorExecutor::NodeScanByLabelParallel(SharedBuffer &buffer, std::string jsonPlan, + const GraphConfig& graphConfig) { + json queryParsed = json::parse(jsonPlan); + NodeManager nodeManager(graphConfig); + string targetLabel = queryParsed["Label"].get(); + + // Collect node indices (ID, AddressIndex) + std::vector> nodeIndices; + nodeIndices.reserve(nodeManager.nodeIndex.size()); + + for (const auto& [nodeId, addressIdx] : nodeManager.nodeIndex) { + nodeIndices.emplace_back(nodeId, addressIdx); + } + + auto processor = [&nodeIndices, &targetLabel, graphConfig](const WorkChunk& chunk) { + return processNodeByLabelChunk(chunk, nodeIndices, targetLabel, graphConfig); + }; + + std::vector> chunkResults = + parallelExecutor->processInParallel>( + static_cast(nodeIndices.size()), processor); + + for (const auto& chunkVec : chunkResults) { + for (const auto& row : chunkVec) { + 
buffer.add(row); + } + } + buffer.add("-1"); +} + +void OperatorExecutor::DirectedAllRelationshipScanParallel(SharedBuffer &buffer, std::string jsonPlan, + const GraphConfig& graphConfig) { + json queryParsed = json::parse(jsonPlan); + NodeManager nodeManager(graphConfig); + const std::string& dbPrefix = nodeManager.getDbPrefix(); + long localRelationCount = NodeManager::dbSize(dbPrefix + "_relations.db") / RelationBlock::BLOCK_SIZE; + + if (parallelExecutor && localRelationCount > 0) { + // Use helper function instead of large lambda + auto processor = [jsonPlan, graphConfig, masterIP = this->masterIP, localRelationCount] + (const WorkChunk& chunk) { + return processRelationshipChunk(chunk, jsonPlan, graphConfig, masterIP, localRelationCount); + }; + + std::vector> chunkResults = + parallelExecutor->processInParallel>( + localRelationCount - 1, + processor); + + for (const auto& chunkVec : chunkResults) { + for (const auto& row : chunkVec) { + buffer.add(row); + } + } + } + + // Central relations - sequential + long centralRelationCount = + NodeManager::dbSize(dbPrefix + "_central_relations.db") / RelationBlock::CENTRAL_BLOCK_SIZE; + string graphDirection = Utils::getGraphDirection(to_string(graphConfig.graphID), masterIP); + bool isDirected = (graphDirection == "TRUE"); + bool isDirectionRight = (queryParsed["direction"] == "right"); + + for (long i = 1; i < centralRelationCount; i++) { + std::unique_ptr relation(RelationBlock::getCentralRelation(i*RelationBlock::CENTRAL_BLOCK_SIZE)); + if (relation->getCentralRelationshipType() != queryParsed["relType"]) { + continue; + } + + std::string pid(relation->getMetaPropertyHead()->value); + if (pid != to_string(graphConfig.partitionID)) { + continue; + } + + NodeBlock* startNode = relation->getSource(); + NodeBlock* destNode = relation->getDestination(); + + std::string startPid(startNode->getMetaPropertyHead()->value); + if (startPid != to_string(graphConfig.partitionID)) { + continue; + } + + json startNodeData = extractNodeDataAndCleanup(startNode); + json destNodeData = extractNodeDataAndCleanup(destNode); + json relationData = extractRelationDataAndCleanup(relation.get()); + + json directionData; + string startVar = queryParsed["sourceVariable"]; + string destVar = queryParsed["destVariable"]; + string relVar = queryParsed["relVariable"]; + + if (isDirectionRight) { + directionData[startVar] = startNodeData; + directionData[destVar] = destNodeData; + } else if (!isDirected) { + directionData[startVar] = destNodeData; + directionData[destVar] = startNodeData; + } + directionData[relVar] = relationData; + buffer.add(directionData.dump()); + } + buffer.add("-1"); +} diff --git a/src/query/processor/cypher/runtime/OperatorExecutor.h b/src/query/processor/cypher/runtime/OperatorExecutor.h index 0694df358..87fdb9b6e 100644 --- a/src/query/processor/cypher/runtime/OperatorExecutor.h +++ b/src/query/processor/cypher/runtime/OperatorExecutor.h @@ -16,6 +16,7 @@ limitations under the License. 
#include "../../../../nativestore/NodeManager.h" #include "InstanceHandler.h" #include "../util/SharedBuffer.h" +#include "../../executor/IntraPartitionParallelExecutor.h" #include #include @@ -40,6 +41,12 @@ class OperatorExecutor { void Projection(SharedBuffer &buffer, string jsonPlan, GraphConfig gc); void Distinct(SharedBuffer &buffer, string jsonPlan, GraphConfig gc); void OrderBy(SharedBuffer &buffer, string jsonPlan, GraphConfig gc); + + // Parallel processing methods (minimal addition) + void AllNodeScanParallel(SharedBuffer &buffer, string jsonPlan, const GraphConfig& graphConfig); + void NodeScanByLabelParallel(SharedBuffer &buffer, string jsonPlan, const GraphConfig& graphConfig); + void DirectedAllRelationshipScanParallel(SharedBuffer &buffer, string jsonPlan, const GraphConfig& graphConfig); + string masterIP; string queryPlan; GraphConfig gc; @@ -48,6 +55,10 @@ class OperatorExecutor { std::string, GraphConfig)>> methodMap; static void initializeMethodMap(); static const int INTER_OPERATOR_BUFFER_SIZE = 5; + + private: + // Single shared parallel executor instance + static std::unique_ptr parallelExecutor; }; #endif // JASMINEGRAPH_OPERATOREXECUTOR_H diff --git a/src/query/processor/executor/IntraPartitionParallelExecutor.cpp b/src/query/processor/executor/IntraPartitionParallelExecutor.cpp new file mode 100644 index 000000000..c9d69f403 --- /dev/null +++ b/src/query/processor/executor/IntraPartitionParallelExecutor.cpp @@ -0,0 +1,112 @@ +/** + * Copyright 2025 JasmineGraph Team + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "IntraPartitionParallelExecutor.h" +#include "../../../util/Utils.h" + +// DynamicThreadPool Implementation +DynamicThreadPool::DynamicThreadPool() { + // Auto-detect optimal worker count based on CPU cores + optimalWorkerCount = StatisticsCollector::getTotalNumberofCores(); + + // Get configurable max worker count from properties (default: 32) + int maxWorkerCount = 32; + try { + std::string maxWorkerStr = Utils::getJasmineGraphProperty("org.jasminegraph.query.threadpool.maxworkers"); + if (!maxWorkerStr.empty()) { + maxWorkerCount = std::stoi(maxWorkerStr); + } + } catch (...) 
{ + // Use default if property not found or invalid + maxWorkerCount = 32; + } + + // Safety limits: minimum 1, maximum from configuration + optimalWorkerCount = std::max(1, std::min(maxWorkerCount, optimalWorkerCount)); + + // Create worker threads + for (int i = 0; i < optimalWorkerCount; ++i) { + workers.emplace_back([this] { workerFunction(); }); + } +} + +DynamicThreadPool::~DynamicThreadPool() { + { + std::scoped_lock lock(queueMutex); + stop = true; + } + condition.notify_all(); + for (std::thread &worker : workers) { + worker.join(); + } +} + +void DynamicThreadPool::workerFunction() { + for (;;) { + std::function task; + { + std::unique_lock lock(queueMutex); + condition.wait(lock, [this] { return stop || !tasks.empty(); }); + if (stop && tasks.empty()) { + return; + } + task = std::move(tasks.front()); + tasks.pop(); + } + task(); + } +} + +// IntraPartitionParallelExecutor Implementation +IntraPartitionParallelExecutor::IntraPartitionParallelExecutor() { + threadPool = std::make_unique(); + workerCount = threadPool->getWorkerCount(); +} + +size_t IntraPartitionParallelExecutor::calculateOptimalChunkSize(size_t totalItems, int workers) const { + // Target: 2-4 chunks per worker for load balancing + size_t chunksPerWorker = 3; + size_t minChunkSize = 1000; // Minimum to avoid overhead + size_t maxChunkSize = 100000; // Maximum to ensure parallelism + + size_t targetChunks = workers * chunksPerWorker; + size_t chunkSize = totalItems / targetChunks; + + return std::max(minChunkSize, std::min(maxChunkSize, chunkSize)); +} + +bool IntraPartitionParallelExecutor::shouldUseParallelProcessing(size_t dataSize) const { + // Use parallel processing for datasets > 1000 items with multiple workers + return dataSize > 1000 && workerCount > 1; +} + +std::vector IntraPartitionParallelExecutor::createWorkChunks(long totalItems, size_t chunkSize) const { + std::vector chunks; + + for (long start = 0; start < totalItems; start += static_cast(chunkSize)) { + long end = std::min(start + static_cast(chunkSize) - 1, totalItems - 1); + chunks.emplace_back(start, end, end - start + 1); + } + + return chunks; +} + +IntraPartitionParallelExecutor::PerformanceMetrics +IntraPartitionParallelExecutor::getPerformanceMetrics(size_t dataSize) const { + PerformanceMetrics metrics; + metrics.workerCount = workerCount; + metrics.estimatedSpeedup = shouldUseParallelProcessing(dataSize) ? workerCount : 1; + metrics.cpuUtilization = shouldUseParallelProcessing(dataSize) ? 100.0 : (100.0 / workerCount); + metrics.optimalChunkSize = calculateOptimalChunkSize(dataSize, workerCount); + return metrics; +} diff --git a/src/query/processor/executor/IntraPartitionParallelExecutor.h b/src/query/processor/executor/IntraPartitionParallelExecutor.h new file mode 100644 index 000000000..55d42601d --- /dev/null +++ b/src/query/processor/executor/IntraPartitionParallelExecutor.h @@ -0,0 +1,293 @@ +/** + * Copyright 2025 JasmineGraph Team + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef INTRA_PARTITION_PARALLEL_EXECUTOR_H +#define INTRA_PARTITION_PARALLEL_EXECUTOR_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "../../../performance/metrics/StatisticsCollector.h" + +/** + * Thread-safe node data structure for parallel processing + * Holds pre-loaded node data to avoid thread safety issues with NodeManager + */ +struct PreloadedNodeData { + std::string nodeId; + std::string partitionId; + std::map> properties; + + PreloadedNodeData(const std::string& id, const std::string& partId, + const std::map>& props) + : nodeId(id), partitionId(partId), properties(props) {} +}; + +/** + * Thread-safe buffer for managing parallel access to partition data + */ +template +class ThreadSafeBuffer { + private: + std::queue buffer; + mutable std::mutex bufferMutex; + std::condition_variable cv; + bool finished = false; + + public: + void push(const T& item) { + std::scoped_lock lock(bufferMutex); + buffer.push(item); + cv.notify_one(); + } + + /** + * Pop an item from the buffer with timeout. + * @param item Output parameter to store the popped item + * @param timeout Maximum time to wait for an item (default: 100ms). + * Callers should handle false returns due to timeout and may need to retry. + * @return true if item was successfully popped, false on timeout or if buffer is finished and empty + */ + bool pop(T& item, std::chrono::milliseconds timeout = std::chrono::milliseconds(100)) { + std::unique_lock lock(bufferMutex); + cv.wait_for(lock, timeout, [this] { + return !buffer.empty() || finished; + }); + + if (!buffer.empty()) { + item = buffer.front(); + buffer.pop(); + return true; + } + return false; + } + + void setFinished() { + std::scoped_lock lock(bufferMutex); + finished = true; + cv.notify_all(); + } + + size_t size() const { + std::scoped_lock lock(bufferMutex); + return buffer.size(); + } + + bool empty() const { + std::scoped_lock lock(bufferMutex); + return buffer.empty() && finished; + } +}; + +/** + * Represents a chunk of work for parallel processing + */ +struct WorkChunk { + long startIndex; + long endIndex; + size_t estimatedSize; + + WorkChunk(long start, long end, size_t size) + : startIndex(start), endIndex(end), estimatedSize(size) {} +}; + +/** + * Dynamic thread pool that adapts to system capabilities + * Automatically detects CPU core count and creates optimal number of workers + */ +class DynamicThreadPool { + private: + std::vector workers; + std::queue> tasks; + std::mutex queueMutex; + std::condition_variable condition; + std::atomic stop{false}; + int optimalWorkerCount; + + public: + DynamicThreadPool(); + ~DynamicThreadPool(); + + template + auto enqueue(F&& f, Args&&... 
args) + -> std::future>; + + int getWorkerCount() const { return optimalWorkerCount; } + + private: + void workerFunction(); +}; + +/** + * Intra-Partition Parallel Executor + * Main class for executing queries in parallel within a single partition + * Provides automatic hardware detection and adaptive execution + */ +class IntraPartitionParallelExecutor { + private: + std::unique_ptr threadPool; + int workerCount; + + // Adaptive chunk size calculation based on data size and worker count + size_t calculateOptimalChunkSize(size_t totalItems, int workers) const; + + public: + IntraPartitionParallelExecutor(); + ~IntraPartitionParallelExecutor() = default; + + /** + * Execute a task across multiple chunks in parallel + * Template allows for different task types and result types + */ + template + std::vector executeChunkedTasks( + const std::vector& chunks, + TaskFunc taskFunction); + + /** + * Process items in parallel within a partition + * Automatically divides work into optimal chunks + */ + template + std::vector processInParallel( + long totalItemCount, + Processor processor); + + /** + * Check if parallel processing is beneficial for given data size + * Small datasets should use sequential processing to avoid overhead + */ + bool shouldUseParallelProcessing(size_t dataSize) const; + + /** + * Get the number of worker threads (equals CPU core count) + */ + int getWorkerCount() const { return workerCount; } + + /** + * Performance metrics for monitoring and tuning + */ + struct PerformanceMetrics { + int workerCount; + size_t estimatedSpeedup; + double cpuUtilization; + size_t optimalChunkSize; + }; + + PerformanceMetrics getPerformanceMetrics(size_t dataSize) const; + + /** + * Create work chunks for parallel processing + */ + std::vector createWorkChunks(long totalItems, size_t chunkSize) const; + + /** + * Merge results from multiple chunks with deduplication + */ + template + std::vector mergeResults(const std::vector>& chunkResults) const; +}; + +// Template implementations + +template +auto DynamicThreadPool::enqueue(F&& f, Args&&... 
args) + -> std::future> { + using return_type = typename std::invoke_result_t; + + auto task = std::make_shared>( + std::bind(std::forward(f), std::forward(args)...)); + + std::future res = task->get_future(); + { + std::scoped_lock lock(queueMutex); + if (stop) + throw std::runtime_error("enqueue on stopped ThreadPool"); + tasks.emplace([task](){ (*task)(); }); + } + condition.notify_one(); + return res; +} + +template +std::vector IntraPartitionParallelExecutor::executeChunkedTasks( + const std::vector& chunks, + TaskFunc taskFunction) { + + std::vector> futures; + futures.reserve(chunks.size()); + + // Submit all chunks to thread pool + for (const auto& chunk : chunks) { + futures.push_back( + threadPool->enqueue([chunk, taskFunction]() -> ResultType { + return taskFunction(chunk); + })); + } + + // Collect results + std::vector results; + results.reserve(chunks.size()); + + for (auto& future : futures) { + results.push_back(future.get()); + } + + return results; +} + +template +std::vector IntraPartitionParallelExecutor::processInParallel( + long totalItemCount, + Processor processor) { + + // Calculate optimal chunk size + size_t chunkSize = calculateOptimalChunkSize(totalItemCount, workerCount); + + // Create work chunks + std::vector chunks = createWorkChunks(totalItemCount, chunkSize); + + // Execute in parallel + return executeChunkedTasks(chunks, processor); +} + +template +std::vector IntraPartitionParallelExecutor::mergeResults( + const std::vector>& chunkResults) const { + + // Estimate total size for efficiency + size_t totalSize = 0; + for (const auto& chunk : chunkResults) { + totalSize += chunk.size(); + } + + std::vector finalResult; + finalResult.reserve(totalSize); + + // Merge all chunk results + for (const auto& chunk : chunkResults) { + finalResult.insert(finalResult.end(), chunk.begin(), chunk.end()); + } + + return finalResult; +} + +#endif // INTRA_PARTITION_PARALLEL_EXECUTOR_H diff --git a/tests/integration/test.py b/tests/integration/test.py index 27566a3d8..f5805ae82 100644 --- a/tests/integration/test.py +++ b/tests/integration/test.py @@ -191,7 +191,7 @@ def send_and_expect_response_file(conn, test_name, send, expected_file, exit_on_ passed_all = True failed_tests = [] -def test(host, port): +def test(host, port): # pylint: disable=too-many-branches """Test the JasmineGraph server by sending a series of commands and checking the responses.""" with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: @@ -601,6 +601,98 @@ def test(host, port): send_and_expect_response(sock, 'rmgr', b'2', DONE, exit_on_failure=True) send_and_expect_response(sock, 'rmgr', RMGR, SEND, exit_on_failure=True) send_and_expect_response(sock, 'rmgr', b'3', DONE, exit_on_failure=True) + + print() + logging.info( + '[IntraPartition] Testing getAllProperties on small graph (sequential fallback)' + ) + send_and_expect_response(sock, 'cypher', CYPHER, b'Graph ID:', exit_on_failure=True) + send_and_expect_response(sock, 'cypher', b'2', b'Input query :', exit_on_failure=True) + # Test that getAllProperties returns all node properties correctly + send_and_expect_response(sock, 'cypher', b'MATCH (n) WHERE n.id = 2 RETURN n', + b'{"n":{"id":"2","label":"Person","name":"Charlie",' + b'"occupation":"IT Engineer","partitionID":"0"}}', + exit_on_failure=True) + send_and_expect_response(sock, 'cypher', b'', b'done', exit_on_failure=True) + + print() + logging.info('[IntraPartition] Testing getAllProperties with null values') + send_and_expect_response(sock, 'cypher', CYPHER, b'Graph ID:', 
exit_on_failure=True) + send_and_expect_response(sock, 'cypher', b'2', b'Input query :', exit_on_failure=True) + send_and_expect_response(sock, 'cypher', b'MATCH (n:Location) WHERE n.id = 6 RETURN n', + b'{"n":{"category":"Park","id":"6","label":"Location",' + b'"name":"Central Park","partitionID":"0"}}', + exit_on_failure=True) + send_and_expect_response(sock, 'cypher', b'', b'done', exit_on_failure=True) + + print() + logging.info('[IntraPartition] Testing getAllProperties multiple nodes (lifetime safety)') + send_and_expect_response(sock, 'cypher', CYPHER, b'Graph ID:', exit_on_failure=True) + send_and_expect_response(sock, 'cypher', b'2', b'Input query :', exit_on_failure=True) + # Return multiple nodes to verify no memory corruption or dangling references + query = b'MATCH (n:Person) WHERE n.id < 4 RETURN n.id, n.name ORDER BY n.id ASC' + sock.sendall(query + LINE_END) + print('MATCH (n:Person) WHERE n.id < 4 RETURN n.id, n.name ORDER BY n.id ASC') + # Expecting exactly 4 results - Alice (0), Bob (1), Charlie (2), David (3) + expected_results = [ + b'{"n.id":"0","n.name":"Alice"}', + b'{"n.id":"1","n.name":"Bob"}', + b'{"n.id":"2","n.name":"Charlie"}', + b'{"n.id":"3","n.name":"David"}' + ] + for i, expected in enumerate(expected_results): + if not expect_response(sock, expected + LINE_END): + failed_tests.append(f'[IntraPartition] Multiple nodes - result {i}') + send_and_expect_response(sock, 'cypher', b'', b'done', exit_on_failure=True) + + print() + logging.info( + '[IntraPartition] Testing getAllProperties on large graph (parallel execution)' + ) + send_and_expect_response(sock, 'cypher', CYPHER, b'Graph ID:', exit_on_failure=True) + send_and_expect_response(sock, 'cypher', b'4', b'Input query :', exit_on_failure=True) + # Spot check: verify a node query works on large graph + sock.sendall(b'MATCH (n) WHERE n.id = 1 RETURN n' + LINE_END) + print('MATCH (n) WHERE n.id = 1 RETURN n') + response = b'' + while True: + byte = sock.recv(1) + if not byte: + break + response += byte + if response.endswith(b'\r\n') or response.endswith(b'\n'): + break + + if b'"id":"1"' in response: + logging.info('✓ Large graph node query returned results') + else: + logging.warning('Large graph query unexpected response: %s', response[:100]) + failed_tests.append('[IntraPartition] Large graph getAllProperties') + send_and_expect_response(sock, 'cypher', b'', b'done', exit_on_failure=True) + + print() + logging.info('[IntraPartition] Testing relationship getAllProperties') + send_and_expect_response(sock, 'cypher', CYPHER, b'Graph ID:', exit_on_failure=True) + send_and_expect_response(sock, 'cypher', b'4', b'Input query :', exit_on_failure=True) + # Verify relationship scan works + sock.sendall(b'MATCH (n)-[r]->(m) WHERE n.id = 1 RETURN n, r, m' + LINE_END) + print('MATCH (n)-[r]->(m) WHERE n.id = 1 RETURN n, r, m') + response = b'' + while True: + byte = sock.recv(1) + if not byte: + break + response += byte + if response.endswith(b'\r\n') or response.endswith(b'\n'): + break + + if b'"n":' in response and b'"r":' in response and b'"m":' in response: + logging.info('✓ Relationship query returned results with correct structure') + else: + logging.warning('Relationship query unexpected response: %s', response[:100]) + failed_tests.append('[IntraPartition] Relationship structure') + send_and_expect_response(sock, 'cypher', b'', b'done', exit_on_failure=True) + print() logging.info('[Cypher] Testing OrderBy for Large Graph') send_and_expect_response(sock, 'cypher', CYPHER, b'Graph ID:', exit_on_failure=True) 
diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt
index 6c8c88a6a..99b37ca84 100644
--- a/tests/unit/CMakeLists.txt
+++ b/tests/unit/CMakeLists.txt
@@ -7,7 +7,9 @@ set(SOURCES
         k8s/K8sInterface_test.cpp
         k8s/K8sWorkerController_test.cpp
         metadb/SQLiteDBInterface_test.cpp
-        performancedb/PerformanceSQLiteDBInterface_test.cpp)
+        performancedb/PerformanceSQLiteDBInterface_test.cpp
+        query/ThreadSafeBuffer_test.cpp
+        query/IntraPartitionParallelExecutor_test.cpp)
 
 add_executable(${PROJECT_NAME} ${SOURCES})
 target_link_libraries(${PROJECT_NAME} gtest gtest_main JasmineGraphLib)
diff --git a/tests/unit/Dockerfile b/tests/unit/Dockerfile
index 5149601f4..d10811f67 100644
--- a/tests/unit/Dockerfile
+++ b/tests/unit/Dockerfile
@@ -40,4 +40,4 @@ WORKDIR "${JASMINEGRAPH_HOME}/build"
 RUN cmake -DCMAKE_BUILD_TYPE=DEBUG ..
 RUN make -j4
 
-CMD ["bash", "-c", "LD_LIBRARY_PATH=/usr/local/lib make -j1 coverage && cp coverage.xml ../coverage/"]
+CMD ["bash", "-c", "LD_LIBRARY_PATH=/usr/local/lib make -j1 coverage && mkdir -p ../coverage && cp coverage.xml ../coverage/ && echo 'Coverage file copied successfully' || (echo 'Coverage generation failed' && exit 1)"]
diff --git a/tests/unit/query/IntraPartitionParallelExecutor_test.cpp b/tests/unit/query/IntraPartitionParallelExecutor_test.cpp
new file mode 100644
index 000000000..2df5a1ed2
--- /dev/null
+++ b/tests/unit/query/IntraPartitionParallelExecutor_test.cpp
@@ -0,0 +1,273 @@
+/**
+Copyright 2026 JasmineGraph Team
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+ */
+
+#include "../../../src/query/processor/executor/IntraPartitionParallelExecutor.h"
+#include "gtest/gtest.h"
+#include <memory>
+#include <vector>
+
+// Test fixture for IntraPartitionParallelExecutor
+class IntraPartitionParallelExecutorTest : public ::testing::Test {
+ protected:
+    std::unique_ptr<IntraPartitionParallelExecutor> executor;
+
+    void SetUp() override {
+        executor = std::make_unique<IntraPartitionParallelExecutor>();
+    }
+};
+
+// Test 1: Small dataset threshold check - should use sequential processing
+TEST_F(IntraPartitionParallelExecutorTest, SmallDatasetThreshold) {
+    size_t smallDataSize = 500;  // Below threshold for most systems
+
+    bool useParallel = executor->shouldUseParallelProcessing(smallDataSize);
+
+    EXPECT_FALSE(useParallel) << "Small datasets should not use parallel processing";
+}
+
+// Test 2: Large dataset should use parallel processing
+TEST_F(IntraPartitionParallelExecutorTest, LargeDatasetThreshold) {
+    size_t largeDataSize = 10000;
+
+    bool useParallel = executor->shouldUseParallelProcessing(largeDataSize);
+
+    EXPECT_TRUE(useParallel) << "Large datasets should use parallel processing";
+}
+
+// Test 3: Worker count should be positive
+TEST_F(IntraPartitionParallelExecutorTest, WorkerCountPositive) {
+    int workerCount = executor->getWorkerCount();
+
+    EXPECT_GT(workerCount, 0) << "Worker count should be at least 1";
+    EXPECT_LE(workerCount, 64) << "Worker count should be reasonable (not exceed typical core counts)";
+}
+
+// Test 4: Chunk creation with valid parameters
+TEST_F(IntraPartitionParallelExecutorTest, CreateWorkChunksValid) {
+    long totalItems = 1000;
+    size_t chunkSize = 250;
+
+    std::vector<WorkChunk> chunks = executor->createWorkChunks(totalItems, chunkSize);
+
+    // Should create 4 chunks: [0-249], [250-499], [500-749], [750-999]
+    EXPECT_EQ(chunks.size(), 4);
+
+    // Verify first chunk
+    EXPECT_EQ(chunks[0].startIndex, 0);
+    EXPECT_EQ(chunks[0].endIndex, 249);
+
+    // Verify last chunk
+    EXPECT_EQ(chunks[3].startIndex, 750);
+    EXPECT_EQ(chunks[3].endIndex, 999);
+}
+
+// Test 5: Chunk creation with non-divisible size
+TEST_F(IntraPartitionParallelExecutorTest, CreateWorkChunksNonDivisible) {
+    long totalItems = 1000;
+    size_t chunkSize = 300;
+
+    std::vector<WorkChunk> chunks = executor->createWorkChunks(totalItems, chunkSize);
+
+    // Should create 4 chunks: [0-299], [300-599], [600-899], [900-999]
+    EXPECT_EQ(chunks.size(), 4);
+
+    // Last chunk should be smaller
+    EXPECT_EQ(chunks[3].startIndex, 900);
+    EXPECT_EQ(chunks[3].endIndex, 999);
+    EXPECT_LT(chunks[3].estimatedSize, chunkSize);
+}
+
+// Test 6: Chunk coverage - all items covered exactly once
+TEST_F(IntraPartitionParallelExecutorTest, ChunkCoverageComplete) {
+    long totalItems = 5000;
+    size_t chunkSize = 500;
+
+    std::vector<WorkChunk> chunks = executor->createWorkChunks(totalItems, chunkSize);
+
+    // Track which items are covered
+    std::vector<bool> covered(totalItems, false);
+
+    for (const auto& chunk : chunks) {
+        for (long i = chunk.startIndex; i <= chunk.endIndex; ++i) {
+            ASSERT_FALSE(covered[i]) << "Item " << i << " covered multiple times";
+            covered[i] = true;
+        }
+    }
+
+    // Verify all items covered
+    for (long i = 0; i < totalItems; ++i) {
+        EXPECT_TRUE(covered[i]) << "Item " << i << " not covered by any chunk";
+    }
+}
+
+// Test 7: Execute chunked tasks - simple sum operation
+TEST_F(IntraPartitionParallelExecutorTest, ExecuteChunkedTasksSum) {
+    long totalItems = 1000;
+    size_t chunkSize = 250;
+
+    std::vector<WorkChunk> chunks = executor->createWorkChunks(totalItems, chunkSize);
+
+    // Task: sum numbers in each chunk
+    auto sumTask = [](const WorkChunk& chunk) -> long {
+        long sum = 0;
+        for (long i = chunk.startIndex; i <= chunk.endIndex; ++i) {
+            sum += i;
+        }
+        return sum;
+    };
+
+    std::vector<long> results = executor->executeChunkedTasks(chunks, sumTask);
+
+    // Verify we got results for all chunks
+    EXPECT_EQ(results.size(), chunks.size());
+
+    // Calculate total sum
+    long totalSum = 0;
+    for (long result : results) {
+        totalSum += result;
+    }
+
+    // Expected sum: 0 + 1 + 2 + ... + 999 = 999 * 1000 / 2 = 499500
+    long expectedSum = (totalItems - 1) * totalItems / 2;
+    EXPECT_EQ(totalSum, expectedSum);
+}
+
+// Test 8: Merge results without duplicates
+TEST_F(IntraPartitionParallelExecutorTest, MergeResultsNoDuplicates) {
+    std::vector<std::vector<int>> chunkResults = {
+        {1, 2, 3},
+        {4, 5, 6},
+        {7, 8, 9}
+    };
+
+    std::vector<int> merged = executor->mergeResults(chunkResults);
+
+    // Should have all 9 elements
+    EXPECT_EQ(merged.size(), 9);
+
+    // Verify elements are in order
+    for (size_t i = 0; i < merged.size(); ++i) {
+        EXPECT_EQ(merged[i], static_cast<int>(i + 1));
+    }
+}
+
+// Test 9: Performance metrics accuracy
+TEST_F(IntraPartitionParallelExecutorTest, PerformanceMetrics) {
+    size_t dataSize = 10000;
+
+    auto metrics = executor->getPerformanceMetrics(dataSize);
+
+    EXPECT_EQ(metrics.workerCount, executor->getWorkerCount());
+    EXPECT_GT(metrics.optimalChunkSize, 0);
+    EXPECT_GT(metrics.estimatedSpeedup, 0);
+    EXPECT_GT(metrics.cpuUtilization, 0.0);
+    EXPECT_LE(metrics.cpuUtilization, 100.0);
+}
+
+// Test 10: Optimal chunk size calculation
+TEST_F(IntraPartitionParallelExecutorTest, OptimalChunkSizeReasonable) {
+    // Test with different data sizes - using larger sizes to ensure parallel execution is viable
+    std::vector<size_t> dataSizes = {10000, 100000, 1000000};
+
+    for (size_t dataSize : dataSizes) {
+        auto metrics = executor->getPerformanceMetrics(dataSize);
+        size_t chunkSize = metrics.optimalChunkSize;
+
+        // Chunk size should be reasonable
+        EXPECT_GT(chunkSize, 0) << "Chunk size should be positive for data size " << dataSize;
+        EXPECT_LE(chunkSize, dataSize) << "Chunk size should not exceed data size " << dataSize;
+
+        // For larger datasets, chunks should enable parallel execution
+        size_t numChunks = (dataSize + chunkSize - 1) / chunkSize;
+        if (dataSize >= 10000) {
+            EXPECT_GE(numChunks, std::min(static_cast<size_t>(executor->getWorkerCount()),
+                                          dataSize / 1000))
+                << "Not enough chunks for efficient parallel processing";
+        }
+    }
+}
+
+// Test 11: Process in parallel - correctness test
+TEST_F(IntraPartitionParallelExecutorTest, ProcessInParallelCorrectness) {
+    long totalItems = 5000;
+
+    // Processor: return the square of the index
+    auto processor = [](const WorkChunk& chunk) -> std::vector<long> {
+        std::vector<long> localResults;
+        for (long i = chunk.startIndex; i <= chunk.endIndex; ++i) {
+            localResults.push_back(i * i);
+        }
+        return localResults;
+    };
+
+    std::vector<std::vector<long>> results =
+        executor->processInParallel<std::vector<long>>(totalItems, processor);
+
+    // Merge results
+    std::vector<long> merged = executor->mergeResults(results);
+
+    // Should have all items
+    EXPECT_EQ(merged.size(), static_cast<size_t>(totalItems));
+
+    // Verify each result is correct square
+    for (size_t i = 0; i < merged.size(); ++i) {
+        long expected = static_cast<long>(i) * static_cast<long>(i);
+        EXPECT_EQ(merged[i], expected) << "Mismatch at index " << i;
+    }
+}
+
+// Test 12: Edge case - single item
+TEST_F(IntraPartitionParallelExecutorTest, SingleItemProcessing) {
+    long totalItems = 1;
+    size_t chunkSize = 100;
+
+    std::vector<WorkChunk> chunks = executor->createWorkChunks(totalItems, chunkSize);
+
+    EXPECT_EQ(chunks.size(), 1);
+    EXPECT_EQ(chunks[0].startIndex, 0);
+    EXPECT_EQ(chunks[0].endIndex, 0);
+}
+
+// Test 13: Edge case - empty dataset
+TEST_F(IntraPartitionParallelExecutorTest, EmptyDataset) {
+    long totalItems = 0;
+    size_t chunkSize = 100;
+
+    std::vector<WorkChunk> chunks = executor->createWorkChunks(totalItems, chunkSize);
+
+    EXPECT_EQ(chunks.size(), 0) << "No chunks should be created for empty dataset";
+}
+
+// Test 14: Stress test - large dataset processing
+TEST_F(IntraPartitionParallelExecutorTest, StressTestLargeDataset) {
+    long totalItems = 100000;
+
+    auto processor = [](const WorkChunk& chunk) -> long {
+        long count = 0;
+        for (long i = chunk.startIndex; i <= chunk.endIndex; ++i) {
+            count++;
+        }
+        return count;
+    };
+
+    std::vector<long> results =
+        executor->processInParallel<long>(totalItems, processor);
+
+    // Sum all counts
+    long totalProcessed = 0;
+    for (long count : results) {
+        totalProcessed += count;
+    }
+
+    EXPECT_EQ(totalProcessed, totalItems) << "All items should be processed exactly once";
+}
diff --git a/tests/unit/query/ThreadSafeBuffer_test.cpp b/tests/unit/query/ThreadSafeBuffer_test.cpp
new file mode 100644
index 000000000..04657cf9b
--- /dev/null
+++ b/tests/unit/query/ThreadSafeBuffer_test.cpp
@@ -0,0 +1,230 @@
+/**
+Copyright 2026 JasmineGraph Team
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+ */
+
+#include "../../../src/query/processor/executor/IntraPartitionParallelExecutor.h"
+#include "gtest/gtest.h"
+#include <atomic>
+#include <thread>
+#include <vector>
+
+using namespace std::chrono_literals;
+
+// Test fixture for ThreadSafeBuffer
+class ThreadSafeBufferTest : public ::testing::Test {
+ protected:
+    ThreadSafeBuffer buffer;
+};
+
+// Test 1: Basic single-producer single-consumer functionality
+TEST_F(ThreadSafeBufferTest, SingleProducerSingleConsumer) {
+    const int NUM_ITEMS = 100;
+    std::vector<int> produced;
+    std::vector<int> consumed;
+
+    // Producer thread
+    std::thread producer([&]() {
+        for (int i = 0; i < NUM_ITEMS; ++i) {
+            buffer.push(i);
+            produced.push_back(i);
+        }
+        buffer.setFinished();
+    });
+
+    // Consumer thread
+    std::thread consumer([&]() {
+        int item;
+        while (buffer.pop(item, 1000ms)) {
+            consumed.push_back(item);
+        }
+    });
+
+    producer.join();
+    consumer.join();
+
+    // Verify all items were consumed in order
+    ASSERT_EQ(produced.size(), consumed.size());
+    for (size_t i = 0; i < produced.size(); ++i) {
+        EXPECT_EQ(produced[i], consumed[i]);
+    }
+}
+
+// Test 2: Finished behavior - pop returns false when buffer is empty and finished
+TEST_F(ThreadSafeBufferTest, FinishedBehaviorEmptyBuffer) {
+    buffer.setFinished();
+
+    int item;
+    bool result = buffer.pop(item, 100ms);
+
+    EXPECT_FALSE(result) << "pop should return false when buffer is empty and finished";
+}
+
+// Test 3: Finished behavior - existing items can still be popped
+TEST_F(ThreadSafeBufferTest, FinishedBehaviorWithItems) {
+    // Push some items
+    buffer.push(1);
+    buffer.push(2);
+    buffer.push(3);
+
+    // Mark as finished
+    buffer.setFinished();
+
+    // Should still be able to pop existing items
+    int item;
+    EXPECT_TRUE(buffer.pop(item, 100ms));
+    EXPECT_EQ(item, 1);
+
+    EXPECT_TRUE(buffer.pop(item, 100ms));
+    EXPECT_EQ(item, 2);
+
+    EXPECT_TRUE(buffer.pop(item, 100ms));
+    EXPECT_EQ(item, 3);
+
+    // Now buffer is empty and finished, should return false
+    EXPECT_FALSE(buffer.pop(item, 100ms));
+}
+
+// Test 4: Timeout behavior - pop times out if no items available
+TEST_F(ThreadSafeBufferTest, PopTimeout) {
+    auto start = std::chrono::steady_clock::now();
+
+    int item;
+    bool result = buffer.pop(item, 50ms);
+
+    auto elapsed = std::chrono::steady_clock::now() - start;
+
+    EXPECT_FALSE(result) << "pop should return false on timeout";
+    EXPECT_GE(elapsed, 50ms) << "pop should wait at least the timeout duration";
+    EXPECT_LT(elapsed, 100ms) << "pop should not wait significantly longer than timeout";
+}
+
+// Test 5: Producer-consumer with varying speeds
+TEST_F(ThreadSafeBufferTest, VaryingProducerConsumerSpeeds) {
+    const int NUM_ITEMS = 50;
+    std::atomic<int> consumed_count{0};
+
+    // Slow producer
+    std::thread producer([&]() {
+        for (int i = 0; i < NUM_ITEMS; ++i) {
+            buffer.push(i);
+            std::this_thread::sleep_for(1ms);  // Slow down production
+        }
+        buffer.setFinished();
+    });
+
+    // Fast consumer
+    std::thread consumer([&]() {
+        int item;
+        while (buffer.pop(item, 1000ms)) {
+            consumed_count++;
+        }
+    });
+
+    producer.join();
+    consumer.join();
+
+    EXPECT_EQ(consumed_count.load(), NUM_ITEMS);
+}
+
+// Test 6: Multiple items pushed before consumption
+TEST_F(ThreadSafeBufferTest, BulkPushThenConsume) {
+    const int NUM_ITEMS = 1000;
+
+    // Push all items
+    for (int i = 0; i < NUM_ITEMS; ++i) {
+        buffer.push(i);
+    }
+    buffer.setFinished();
+
+    // Consume all items
+    int count = 0;
+    int item;
+    while (buffer.pop(item, 100ms)) {
+        EXPECT_EQ(item, count);
+        count++;
+    }
+
+    EXPECT_EQ(count, NUM_ITEMS);
+}
+
+// Test 7: Size tracking
+TEST_F(ThreadSafeBufferTest, SizeTracking) {
+    EXPECT_EQ(buffer.size(), 0);
+
+    buffer.push(1);
+    EXPECT_EQ(buffer.size(), 1);
+
+    buffer.push(2);
+    EXPECT_EQ(buffer.size(), 2);
+
+    int item;
+    buffer.pop(item, 100ms);
+    EXPECT_EQ(buffer.size(), 1);
+
+    buffer.pop(item, 100ms);
+    EXPECT_EQ(buffer.size(), 0);
+}
+
+// Test 8: Empty state behavior
+TEST_F(ThreadSafeBufferTest, EmptyState) {
+    EXPECT_FALSE(buffer.empty()) << "Buffer should not be empty initially (not finished)";
+
+    buffer.setFinished();
+    EXPECT_TRUE(buffer.empty()) << "Buffer should be empty when finished with no items";
+
+    buffer.push(1);
+    EXPECT_FALSE(buffer.empty()) << "Buffer should not be empty when it has items";
+}
+
+// Test 9: Thread safety under concurrent access
+TEST_F(ThreadSafeBufferTest, ConcurrentAccess) {
+    const int NUM_ITEMS = 10000;
+    std::atomic<int> total_consumed{0};
+
+    // Multiple producers
+    auto producer_func = [&](int start, int count) {
+        for (int i = start; i < start + count; ++i) {
+            buffer.push(i);
+        }
+    };
+
+    std::thread p1(producer_func, 0, NUM_ITEMS / 2);
+    std::thread p2(producer_func, NUM_ITEMS / 2, NUM_ITEMS / 2);
+
+    // Wait for both producers to finish adding items
+    p1.join();
+    p2.join();
+    buffer.setFinished();
+
+    // Consumer
+    std::thread consumer([&]() {
+        int item;
+        while (buffer.pop(item, 1000ms)) {
+            total_consumed++;
+        }
+    });
+
+    consumer.join();
+
+    EXPECT_EQ(total_consumed.load(), NUM_ITEMS);
+}
+
+// Test 10: Item uninitialized on timeout (safety check)
+TEST_F(ThreadSafeBufferTest, ItemNotSetOnTimeout) {
+    int item = 999;  // Set to known value
+
+    bool result = buffer.pop(item, 50ms);
+
+    EXPECT_FALSE(result);
+    // Item value is undefined on timeout (could be 999 or changed)
+    // Main point is that pop returned false, so caller knows not to use item
+}
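Reviewer note: both test files above include src/query/processor/executor/IntraPartitionParallelExecutor.h, which is added to the build in this PR but is not shown in this excerpt. As a reading aid, here is a minimal sketch of the declarations the tests appear to assume, inferred only from the test code; member names, signatures, the metrics struct name, and the template parameters shown below are assumptions, not the actual header.

// Sketch only: interface inferred from IntraPartitionParallelExecutor_test.cpp.
#include <cstddef>
#include <type_traits>
#include <vector>

struct WorkChunk {
    long startIndex;       // first item index in this chunk (inclusive)
    long endIndex;         // last item index in this chunk (inclusive)
    size_t estimatedSize;  // number of items, endIndex - startIndex + 1
};

struct ParallelExecutionMetrics {   // name assumed; the tests only read these fields
    int workerCount;
    size_t optimalChunkSize;
    double estimatedSpeedup;
    double cpuUtilization;          // percentage in (0, 100]
};

class IntraPartitionParallelExecutor {
 public:
    bool shouldUseParallelProcessing(size_t dataSize);   // size threshold check
    int getWorkerCount();   // capped by org.jasminegraph.query.threadpool.maxworkers
    std::vector<WorkChunk> createWorkChunks(long totalItems, size_t chunkSize);
    ParallelExecutionMetrics getPerformanceMetrics(size_t dataSize);

    // Runs task once per chunk on the worker pool; one result per chunk, in chunk order.
    template <typename Task, typename Result = std::invoke_result_t<Task, const WorkChunk&>>
    std::vector<Result> executeChunkedTasks(const std::vector<WorkChunk>& chunks, Task task);

    // Chunks totalItems with an internally chosen chunk size, then runs processor per chunk.
    template <typename Result, typename Processor>
    std::vector<Result> processInParallel(long totalItems, Processor processor);

    // Flattens per-chunk result vectors into one vector, preserving chunk order.
    template <typename T>
    std::vector<T> mergeResults(const std::vector<std::vector<T>>& chunkResults);
};

The ThreadSafeBuffer tests imply a producer-consumer queue with a timed pop and an explicit "finished" flag. A plausible implementation consistent with the tested behaviour (again an assumption, including whether the class is a template at all) is:

// Sketch only: behaviour reverse-engineered from ThreadSafeBuffer_test.cpp.
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>

template <typename T>   // template parameter is an assumption
class ThreadSafeBuffer {
 public:
    void push(const T& item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(item);
        }
        cv_.notify_one();
    }

    // Returns false on timeout, or when the buffer is finished and fully drained.
    bool pop(T& item, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (!cv_.wait_for(lock, timeout, [this] { return !queue_.empty() || finished_; })) {
            return false;  // timed out with nothing available
        }
        if (queue_.empty()) {
            return false;  // finished and drained
        }
        item = queue_.front();
        queue_.pop();
        return true;
    }

    void setFinished() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            finished_ = true;
        }
        cv_.notify_all();  // wake consumers so they can observe the flag
    }

    size_t size() {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.size();
    }

    // Matches the tested semantics: "empty" means finished AND drained, so it is
    // false while producers are still expected to add items.
    bool empty() {
        std::lock_guard<std::mutex> lock(mutex_);
        return finished_ && queue_.empty();
    }

 private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool finished_ = false;
};

The combination of a timed pop and setFinished() is what lets a consumer distinguish "no data yet" from "producers are done", which Tests 2-4 and 10 rely on; without the flag, a consumer could block indefinitely once producers exit.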