Changes from all commits
32 commits
f7772ac
Add gold folder
ParameswaranSajeenthiran Jan 11, 2026
8787a13
Add gold folder
ParameswaranSajeenthiran Jan 11, 2026
cbf63d9
Add gold folder
ParameswaranSajeenthiran Jan 11, 2026
57c7bff
Increase timeout in kg construction timeout
ParameswaranSajeenthiran Jan 11, 2026
9fbe8b6
Add types in the tuples in the kg mock service response
ParameswaranSajeenthiran Jan 11, 2026
e4ae857
Add types in the tuples in the kg mock service response
ParameswaranSajeenthiran Jan 14, 2026
9909e0b
Add Uvicorn fastapi python dependencies
ParameswaranSajeenthiran Jan 14, 2026
63a6aec
Add setup_gemma3_ollama
ParameswaranSajeenthiran Jan 14, 2026
8218dd5
Rename gemma embedding url in conf
ParameswaranSajeenthiran Jan 14, 2026
07ad37c
Remove graph rag k8s integration test
ParameswaranSajeenthiran Jan 14, 2026
148e9bb
Add info log to debug text embedder
ParameswaranSajeenthiran Jan 15, 2026
f891621
Rename gemma hostname in docker network
ParameswaranSajeenthiran Jan 15, 2026
bf514bc
Remove ollama setup script execution in graphRAG tests
ParameswaranSajeenthiran Jan 15, 2026
fb9c53d
Fix port mapping in test-docker.sh
ParameswaranSajeenthiran Jan 15, 2026
923cb20
Inspect integration_jasminegraph_net
ParameswaranSajeenthiran Jan 15, 2026
c78649c
Change the port of embedding model
ParameswaranSajeenthiran Jan 15, 2026
2245f4b
attach the jamsinegraph workers to the integration network
ParameswaranSajeenthiran Jan 15, 2026
d5782fd
attach the jamsinegraph workers to the integration network
ParameswaranSajeenthiran Jan 15, 2026
ed9b6e3
attach the jamsinegraph workers to the integration network
ParameswaranSajeenthiran Jan 15, 2026
fc77c8b
Increase test timeout in order to embed the tuples
ParameswaranSajeenthiran Jan 17, 2026
1b12899
Revert adhdfs to not embed the graph
ParameswaranSajeenthiran Jan 17, 2026
9ede1c2
Remove unwanted info logs
ParameswaranSajeenthiran Jan 25, 2026
06da1c4
Add Gold test folder
ParameswaranSajeenthiran Jan 25, 2026
6f8f380
Merge remote-tracking branch 'origin/raw-kg-contruction' into raw-kg-…
ParameswaranSajeenthiran Jan 25, 2026
cff93e1
Fix adhdfs property graph upload
ParameswaranSajeenthiran Jan 25, 2026
28c6994
Revert properties conf file
ParameswaranSajeenthiran Jan 25, 2026
d2344ec
Fix .sh lint issues
ParameswaranSajeenthiran Jan 25, 2026
e6ed7dd
Increase timeout in hdfs validation
ParameswaranSajeenthiran Jan 26, 2026
f79f81a
Increase timeout in hdfs validation
ParameswaranSajeenthiran Jan 26, 2026
5e04c95
Add hdfs validation log
ParameswaranSajeenthiran Jan 26, 2026
1584cb4
Revert startremoteWorkers image tag
ParameswaranSajeenthiran Jan 26, 2026
5098e23
Fix cpp style issues
ParameswaranSajeenthiran Jan 26, 2026
74 changes: 15 additions & 59 deletions .github/workflows/build.yml
@@ -65,6 +65,21 @@ jobs:
- name: Docker Build
run: docker build -t jasminegraph .

- uses: actions/setup-python@v4
with:
python-version: 3.11
- name: Install python dependencies Build
run: |
pip install fastapi uvicorn httpx

# 4. Start mock Ollama server in the background
- name: Start Mock LLM Server
run: |
nohup uvicorn mock_ollama_server:app --host 0.0.0.0 --port 11450 &

# 5. Wait a few seconds for server to be ready
- run: sleep 2

- name: Run Integration Tests
run: |
chmod +x test-docker.sh
@@ -129,62 +144,3 @@ jobs:
mkdir /var/tmp/worker0 /var/tmp/worker1
chmod +x test-k8s.sh
TEST_NAME="main" ./test-k8s.sh

k8s-integration-tests-graph-rag:
runs-on: ubuntu-latest
timeout-minutes: 20
if: ${{!contains(github.event.pull_request.labels.*.name, 'Skip k8s integration')}}

steps:
- uses: actions/checkout@v4
with:
ref: ${{github.head_ref}}
repository: ${{ github.event.pull_request.head.repo.full_name || github.repository }}

- uses: jupyterhub/action-k3s-helm@v4
with:
docker-enabled: true
network-policy: false

- name: Show system resources and StorageClass
run: |
echo "=== CPU Info ==="
lscpu | grep -E '^CPU\(s\)|^Model name'
echo
echo "=== Memory Info ==="
free -h
echo
echo "=== Disk Info ==="
df -h
echo
echo "=== Kubernetes StorageClasses ==="
kubectl get storageclass
kubectl describe storageclass

- name: Grant permissions for default user
run: kubectl apply -f ./k8s/rbac.yaml

- uses: actions/setup-python@v4
with:
python-version: 3.11
- name: Install python dependencies Build
run: |
pip install fastapi uvicorn httpx

# 4. Start mock Ollama server in the background
- name: Start Mock LLM Server
run: |
nohup uvicorn mock_ollama_server:app --host 0.0.0.0 --port 11450 &

# 5. Wait a few seconds for server to be ready
- run: sleep 2

- name: Docker Build
run: docker build -t jasminegraph .


- name: K8S integration tests
run: |
mkdir /var/tmp/worker0 /var/tmp/worker1
chmod +x test-k8s.sh
TEST_NAME="graphRAG" ./test-k8s.sh
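
The workflow gives the mock server a fixed `sleep 2` to come up. A sketch of a sturdier readiness probe in the same libcurl style the frontend already uses elsewhere in this PR — the URL, port, and retry budget here are assumptions for illustration, not part of the workflow:

#include <curl/curl.h>

#include <chrono>
#include <string>
#include <thread>

// Poll the mock LLM server until it answers instead of sleeping a fixed
// two seconds; returns false if it never becomes reachable.
static bool waitForServer(const std::string& url, int maxAttempts = 30) {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
        CURL* curl = curl_easy_init();
        if (!curl) return false;
        curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
        curl_easy_setopt(curl, CURLOPT_NOBODY, 1L);   // HEAD-style probe, no body
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, 2L);
        CURLcode res = curl_easy_perform(curl);
        curl_easy_cleanup(curl);
        if (res == CURLE_OK) return true;             // server is up
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    return false;
}

// Usage: waitForServer("http://localhost:11450/");  // port bound by the workflow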
2 changes: 0 additions & 2 deletions CMakeLists.txt
@@ -252,8 +252,6 @@ if(NOT GENERATED_SRC)
set(GENERATED_SRC ""
src/frontend/JasmineGraphFrontEnd.cpp
src/localstore/incremental/JasmineGraphIncrementalLocalStore.cpp
src/localstore/incremental/JasmineGraphIncrementalLocalStore.cpp
src/localstore/incremental/JasmineGraphIncrementalLocalStore.cpp
src/frontend/JasmineGraphFrontEnd.cpp)
endif()

10 changes: 8 additions & 2 deletions conf/jasminegraph-server.properties
@@ -108,7 +108,7 @@ org.jasminegraph.autopartition.enabled=false
#--------------------------------------------------------------------------------

#This parameter holds the maximum label size of Node Block
org.jasminegraph.nativestore.max.label.size=256
org.jasminegraph.nativestore.max.label.size=43


# knowledge graph construction
@@ -133,5 +133,11 @@ org.jasminegraph.query.threadpool.maxworkers=32
org.jasminegraph.vectorstore.enabled=true
org.jasminegraph.vectorstore.dimension=512
org.jasminegraph.vectorstore.embedding.model=jina/jina-embeddings-v2-small-en
org.jasminegraph.vectorstore.embedding.ollama.endpoint=http://gemma3_container:11441
org.jasminegraph.vectorstore.embedding.ollama.endpoint=http://gemma3:11434
#
#org.jasminegraph.vectorstore.dimension=768
#org.jasminegraph.vectorstore.embedding.model=nomic-embed-text
#org.jasminegraph.vectorstore.embedding.ollama.endpoint=https://sajeenthiranp-21--nomic-embedder-embeddings-serve.modal.run
##org.jasminegraph.vectorstore.embedding.ollama.endpoint=http://192.168.1.19:11441
##org.jasminegraph.vectorstore.embedding.ollama.endpoint=http://10.8.100.248:11441
#
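
For orientation, these embedding settings are plain key=value pairs with `#` comments. A minimal, self-contained sketch of reading them — this reader is hypothetical, for illustration only; the server has its own Utils-based loader:

#include <fstream>
#include <string>
#include <unordered_map>

// Hypothetical minimal properties reader, for illustration only.
static std::unordered_map<std::string, std::string> loadProperties(const std::string& path) {
    std::unordered_map<std::string, std::string> props;
    std::ifstream in(path);
    std::string line;
    while (std::getline(in, line)) {
        if (line.empty() || line[0] == '#') continue;   // skip blanks and comments
        std::string::size_type pos = line.find('=');
        if (pos == std::string::npos) continue;         // malformed line
        props[line.substr(0, pos)] = line.substr(pos + 1);
    }
    return props;
}

// Usage:
//   auto props = loadProperties("conf/jasminegraph-server.properties");
//   std::string endpoint = props["org.jasminegraph.vectorstore.embedding.ollama.endpoint"];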
4 changes: 2 additions & 2 deletions conf/prometheus.yaml
@@ -6,10 +6,10 @@ scrape_configs:
- job_name: "prometheus"
scrape_interval: 5s
static_configs:
- targets: ["prometheus:9090"]
- targets: ["10.8.100.248:9090"]

- job_name: "pushgateway"
scrape_interval: 2s
static_configs:
- targets: ["pushgateway:9091"]
- targets: ["10.8.100.248:9091"]

6 changes: 3 additions & 3 deletions mock_ollama_server.py
@@ -1,4 +1,4 @@
"""Copyright 2025 JasmineGraph Team
"""Copyright 2026 JasmineGraph Team
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
@@ -40,8 +40,8 @@ async def streamer( model: str):

# Example array-of-arrays tuples
tuples = [
["Radio City", "is", "India's first private FM radio station"],
["Radio City", "was started on", "3 July 2001", "Organization", "Date"],
["Radio City", "is", "India's first private FM radio station" ,"Organization", "Description"],
["Radio City", "was started on", "3 July 2001", "Organization", "Date", ],
["Radio City", "broadcasts on", "91.1", "Organization", "Frequency"]
]

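The mock response now emits five-field tuples (subject, predicate, object, subject type, object type), matching the `triple.size() < 5` check in VLLMTupleStreamer further down. A consumer-side sketch of that shape, using nlohmann::json as the rest of the pipeline does — the validation helper is illustrative, not the streamer's actual code:

#include <nlohmann/json.hpp>

#include <iostream>

using json = nlohmann::json;

// Validate one streamed tuple against the five-field shape the mock
// server now produces; mirrors the size check in VLLMTupleStreamer.
static bool isValidTuple(const json& triple) {
    return triple.is_array() && triple.size() >= 5;
}

int main() {
    json tuple = json::parse(
        R"(["Radio City", "broadcasts on", "91.1", "Organization", "Frequency"])");
    if (isValidTuple(tuple)) {
        std::cout << "subject type: " << tuple[3]
                  << ", object type: " << tuple[4] << std::endl;
    }
    return 0;
}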
111 changes: 104 additions & 7 deletions src/frontend/JasmineGraphFrontEnd.cpp
@@ -2037,6 +2037,45 @@ bool JasmineGraphFrontEnd::constructKGStreamHDFSCommand(std::string masterIP, in
std::string hdfsFilePathS(hdfsFilePath);
hdfsFilePathS = Utils::trim_copy(hdfsFilePathS);

CURL* curl = curl_easy_init();
if (!curl) {
frontend_logger.error("Failed to initialize CURL");
*loop_exit_p = true;
return false;
}

std::string url =
"http://" + hdfsServerIp + ":9870/webhdfs/v1/?op=GETHOMEDIRECTORY";

long http_code = 0;

curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_NOBODY, 1L); // no body
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 10L); // wait up to 10s to connect
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 30L); // max 30s for whole request
curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1L); // HTTP 4xx/5xx => error
curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L); // avoid DNS delays

CURLcode res = curl_easy_perform(curl);
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code);
curl_easy_cleanup(curl);

// Immediate failure
if (res != CURLE_OK || http_code == 0) {
frontend_logger.error(
"HDFS file System Not reachable at: " +
hdfsServerIp + " port: " + hdfsPort);
frontend_logger.error("CURL response code: " +
std::to_string(res));
std::string error_message = "HDFS file System Not reachable.";
write(connFd, error_message.c_str(), error_message.length());
write(connFd,
Conts::CARRIAGE_RETURN_NEW_LINE.c_str(),
Conts::CARRIAGE_RETURN_NEW_LINE.size());

*loop_exit_p = true;
return false;
}
HDFSConnector* hdfsConnector = new HDFSConnector(hdfsServerIp, hdfsPort);

if (!hdfsConnector->isPathValid(hdfsFilePathS)) {
@@ -2051,11 +2090,15 @@ bool JasmineGraphFrontEnd::constructKGStreamHDFSCommand(std::string masterIP, in

std::string path = "hdfs:" + hdfsFilePathS;
double_t total_file_size = hdfsGetPathInfo(hdfsConnector->getFileSystem(), hdfsFilePathS.c_str())->mSize;
std::time_t time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
std::string uploadStartTime = ctime(&time);
std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
struct tm localTime;
localtime_r(&now, &localTime); // thread-safe version of localtime()
char buffer[100];
strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &localTime);
std::string uploadStartTime(buffer);


// 2. Prepare new graph insertion
std::time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
uploadStartTime.erase(uploadStartTime.find_last_not_of(Conts::CARRIAGE_RETURN_NEW_LINE) + 1); // remove newline

std::string llmRunnerMSG = "LLM runner hostname:port: ";
@@ -2078,6 +2121,12 @@ bool JasmineGraphFrontEnd::constructKGStreamHDFSCommand(std::string masterIP, in
std::string hostnamePortS(hostnamePort);
hostnamePortS = Utils::trim_copy(hostnamePortS);

if (hostnamePortS.find("exit") != std::string::npos) {
*loop_exit_p = true;
return false;
}


frontend_logger.info("Received LLM runners: " + hostnamePortS);

std::string llmInferenceMSG = "LLM inference engine? ollama/vllm? ";
@@ -2102,6 +2151,49 @@ bool JasmineGraphFrontEnd::constructKGStreamHDFSCommand(std::string masterIP, in

frontend_logger.info("received Inference Engine: " + llmInferenceEngineS);

vector<std::string> llmServers = Utils::getUniqueLLMRunners(hostnamePortS);

for (auto llmServer : llmServers) {
std::string url;
bool modelFound = false;
std::string endpointPath;
if (llmInferenceEngineS == "ollama") {
endpointPath = "api/tags";
} else if (llmInferenceEngineS == "vllm") {
endpointPath = "/v1/models";
} else {
frontend_logger.error("Unknown inference engine: " + llmInferenceEngineS);
std::string msg = "Unknown inference engine '" + llmInferenceEngineS + "'";
write(connFd, msg.c_str(), msg.length());
write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size());
*loop_exit_p = true;
return false;
}

url = Utils::normalizeURL(llmServer, endpointPath);
frontend_logger.info("Final LLM endpoint: " + url);

CURL* curl = curl_easy_init();
if (curl) {
std::string response;
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 5L);

CURLcode res = curl_easy_perform(curl);
curl_easy_cleanup(curl);

if (res != CURLE_OK) {
frontend_logger.error("Failed to reach " + llmInferenceEngineS + " server at " + llmServer);
std::string msg = "Could not connect to " + llmInferenceEngineS + " server.";
write(connFd, msg.c_str(), msg.length());
write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size());
*loop_exit_p = true;
return false;
}
}
}
std::string LLM_MSG = "What is the LLM you want to use?:";
resultWr = write(connFd, LLM_MSG.c_str(), LLM_MSG.length());
if (resultWr < 0) {
@@ -2122,9 +2214,10 @@ bool JasmineGraphFrontEnd::constructKGStreamHDFSCommand(std::string masterIP, in
std::string llmS(llm);
llmS = Utils::trim_copy(llmS);
frontend_logger.info("Received LLM " + llmS);

vector<std::string> llmServers = Utils::getUniqueLLMRunners(hostnamePortS);

if (llmS.find("exit") != std::string::npos) {
*loop_exit_p = true;
return false;
}
for (auto llmServer : llmServers) {
std::string url;
bool modelFound = false;
@@ -2338,7 +2431,11 @@ bool JasmineGraphFrontEnd::constructKGStreamHDFSCommand(std::string masterIP, in
}

std::time_t time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
std::string uploadEndTime = ctime(&time);
struct tm localTime;
localtime_r(&time, &localTime);
char buffer[100];
strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &localTime);
std::string uploadEndTime(buffer);

std::string sqlStatementUpdateEndTime = "UPDATE graph SET upload_end_time = \"" + uploadEndTime +
"\" WHERE idgraph = " + std::to_string(newGraphID);
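Both timestamp sites in this file now share the same localtime_r/strftime pattern, replacing ctime(), which writes into a shared static buffer (not thread-safe) and appends a trailing newline — hence the old erase() call. The same pattern, extracted as a standalone sketch:

#include <chrono>
#include <ctime>
#include <string>

// Thread-safe "YYYY-MM-DD HH:MM:SS" timestamp; localtime_r fills a
// caller-owned struct tm instead of ctime()'s shared static buffer, and
// strftime produces no trailing newline to trim.
static std::string currentTimestamp() {
    std::time_t now = std::chrono::system_clock::to_time_t(
        std::chrono::system_clock::now());
    struct tm localTime;
    localtime_r(&now, &localTime);
    char buffer[32];
    strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &localTime);
    return std::string(buffer);
}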
3 changes: 3 additions & 0 deletions src/frontend/ui/JasmineGraphFrontEndUI.cpp
@@ -1182,6 +1182,9 @@ static void semantic_beam_search_command(int connFd, std::string command, int nu
*loop_exit_p = true;
return;
}
string done = R"({"done":"true"})";
write(connFd, done.c_str(), done.size());
write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size());

ui_frontend_logger.info("Semantic beam search completed successfully for graph " + graph_id);
}
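The added {"done":"true"} line gives UI clients an unambiguous end-of-stream marker. A hypothetical client-side loop that consumes result lines until it sees the sentinel — the stream source and result handling here are illustrative assumptions:

#include <iostream>
#include <istream>
#include <string>

// Hypothetical client: read result lines until the server's
// {"done":"true"} sentinel signals the stream is complete.
static void readUntilDone(std::istream& in) {
    std::string line;
    while (std::getline(in, line)) {
        if (line == R"({"done":"true"})") break;   // end-of-stream marker
        std::cout << "result: " << line << std::endl;
    }
}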
10 changes: 5 additions & 5 deletions src/knowledgegraph/construction/Pipeline.cpp
@@ -457,9 +457,9 @@ json Pipeline::processTupleAndSaveInPartition(const std::vector<std::unique_ptr<
using namespace std::chrono;

auto nextTick = steady_clock::now();

while (metaThreadRunning.load(std::memory_order_relaxed)) {
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
while (metaThreadRunning.load(std::memory_order_relaxed)) {
std::this_thread::sleep_for(std::chrono::milliseconds(60000));

kg_pipeline_stream_handler_logger.debug("Meta thread running");

@@ -804,7 +804,7 @@ void Pipeline::extractTuples(std::string host, int port, std::string masterIP, i

char ack3[ACK_MESSAGE_SIZE] = {0};
int converted_number = htonl(chunk.length());
kg_pipeline_stream_handler_logger.info("Sending chunk length: " +
kg_pipeline_stream_handler_logger.debug("Sending chunk length: " +
std::to_string(chunk.length()));
if (!Utils::sendIntExpectResponse(sockfd, ack3,
JasmineGraphInstanceProtocol::GRAPH_STREAM_C_length_ACK.length(),
@@ -816,7 +816,7 @@ void Pipeline::extractTuples(std::string host, int port, std::string masterIP, i
break;
}

kg_pipeline_stream_handler_logger.info("Sending chunk data");
kg_pipeline_stream_handler_logger.debug("Sending chunk data");
if (!Utils::send_str_wrapper(sockfd, chunk)) {
kg_pipeline_stream_handler_logger.error("Failed to send chunk data");
retry = true;
@@ -840,7 +840,7 @@ void Pipeline::extractTuples(std::string host, int port, std::string masterIP, i
break;
}

kg_pipeline_stream_handler_logger.info("Sending currentTraceContext data:" +currentTraceContext);
kg_pipeline_stream_handler_logger.debug("Sending currentTraceContext data:" +currentTraceContext);
if (!Utils::send_str_wrapper(sockfd, currentTraceContext)) {
kg_pipeline_stream_handler_logger.error("Failed to send chunk data");
retry = true;
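The chunk sends these demoted log lines trace follow a length-prefixed framing: a 4-byte network-order length (htonl), an ACK wait, then the payload. A self-contained sketch of that framing over plain POSIX sockets — the ACK step is noted but omitted, and the helper name is illustrative:

#include <arpa/inet.h>
#include <sys/socket.h>

#include <cstdint>
#include <string>

// Length-prefixed chunk send, mirroring the framing extractTuples uses:
// 4-byte network-order length first, then the raw chunk bytes.
// The real pipeline waits for GRAPH_STREAM_C_length_ACK between the two.
static bool sendChunk(int sockfd, const std::string& chunk) {
    uint32_t len = htonl(static_cast<uint32_t>(chunk.size()));
    if (send(sockfd, &len, sizeof(len), 0) != static_cast<ssize_t>(sizeof(len)))
        return false;
    return send(sockfd, chunk.data(), chunk.size(), 0) ==
           static_cast<ssize_t>(chunk.size());
}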
10 changes: 5 additions & 5 deletions src/knowledgegraph/construction/VLLMTupleStreamer.cpp
@@ -102,7 +102,7 @@ size_t VLLMTupleStreamer::StreamCallback(char* ptr, size_t size, size_t nmemb,
auto triple = json::parse(ctx->current_tuple);

if (!triple.is_array() || triple.size() < 5) {
vllm_tuple_streamer_logger.error(
vllm_tuple_streamer_logger.warn(
"Invalid tuple size detected. Retrying entire chunk.");

ctx->isSuccess = false;
@@ -158,10 +158,10 @@ size_t VLLMTupleStreamer::StreamCallback(char* ptr, size_t size, size_t nmemb,
"✅ Added formatted triple: " + formattedTriple.dump());
}
} catch (const std::exception& ex) {
vllm_tuple_streamer_logger.error(
vllm_tuple_streamer_logger.warn(
"❌ JSON array parse failed: " + std::string(ex.what()) + ". Invalid Tuple: " + std::string
(ctx->current_tuple));
vllm_tuple_streamer_logger.error(
vllm_tuple_streamer_logger.warn(
"Invalid tuple detected. Retrying entire chunk.");

ctx->isSuccess = false;
@@ -176,7 +176,7 @@ size_t VLLMTupleStreamer::StreamCallback(char* ptr, size_t size, size_t nmemb,
}
}
} catch (const std::exception& ex) {
vllm_tuple_streamer_logger.debug("JSON parse error: " +
vllm_tuple_streamer_logger.warn("JSON parse error: " +
std::string(ex.what()));
}
}
@@ -261,7 +261,7 @@ void VLLMTupleStreamer::streamChunk(const std::string& chunkKey,
jsonRequest["max_tokens"] = 10000;

std::string postFields = jsonRequest.dump();
vllm_tuple_streamer_logger.info("Post fields: " + postFields);
// vllm_tuple_streamer_logger.info("Post fields: " + postFields);
ctx.current_tuple = "";
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, postFields.c_str());
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, postFields.size());
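StreamCallback above is a standard libcurl write callback: libcurl hands it size*nmemb bytes per invocation and expects that count back, otherwise the transfer aborts. A minimal sketch of the shape — the accumulate-into-a-string body is the generic pattern, not the streamer's actual tuple-parsing logic:

#include <curl/curl.h>

#include <cstddef>
#include <string>

// Minimal libcurl write callback: append each received block to a
// caller-supplied buffer and return the byte count, or curl aborts.
static size_t writeToString(char* ptr, size_t size, size_t nmemb, void* userdata) {
    std::string* out = static_cast<std::string*>(userdata);
    out->append(ptr, size * nmemb);
    return size * nmemb;
}

// Wiring, as in streamChunk:
//   std::string response;
//   curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeToString);
//   curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response);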