Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions duckdb_java.def
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1interrupt
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1prepare
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1prepared_1statement_1meta
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1query_1result_1meta
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1query_1progress
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1release
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1set_1auto_1commit
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1set_1catalog
Expand Down
1 change: 1 addition & 0 deletions duckdb_java.exp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ _Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1interrupt
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1prepare
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1prepared_1statement_1meta
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1query_1result_1meta
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1query_1progress
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1release
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1set_1auto_1commit
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1set_1catalog
Expand Down
1 change: 1 addition & 0 deletions duckdb_java.map
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ DUCKDB_JAVA {
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1prepare;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1prepared_1statement_1meta;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1query_1result_1meta;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1query_1progress;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1release;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1set_1auto_1commit;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1set_1catalog;
Expand Down
13 changes: 13 additions & 0 deletions src/jni/duckdb_java.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
extern "C" {
#include "duckdb.h"
}
#include "config.hpp"
#include "duckdb.hpp"
#include "duckdb/catalog/catalog_search_path.hpp"
Expand Down Expand Up @@ -149,6 +152,16 @@ void _duckdb_jdbc_interrupt(JNIEnv *env, jclass, jobject conn_ref_buf) {
conn_ref->Interrupt();
}

jobject _duckdb_jdbc_query_progress(JNIEnv *env, jclass, jobject conn_ref_buf) {
auto conn_ref = get_connection(env, conn_ref_buf);
if (!conn_ref) {
return nullptr;
}
duckdb_query_progress_type qpc = duckdb_query_progress(reinterpret_cast<duckdb_connection>(conn_ref));
return env->NewObject(J_QueryProgress, J_QueryProgress_init, static_cast<jdouble>(qpc.percentage),
uint64_to_jlong(qpc.rows_processed), uint64_to_jlong(qpc.total_rows_to_process));
}

void _duckdb_jdbc_disconnect(JNIEnv *env, jclass, jobject conn_ref_buf) {
auto conn_ref = (ConnectionHolder *)env->GetDirectBufferAddress(conn_ref_buf);
if (conn_ref) {
Expand Down
10 changes: 10 additions & 0 deletions src/jni/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,16 @@ JNIEXPORT void JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1interrupt(JNIE
}
}

JNIEXPORT jobject JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1query_1progress(JNIEnv * env, jclass param0, jobject param1) {
try {
return _duckdb_jdbc_query_progress(env, param0, param1);
} catch (const std::exception &e) {
duckdb::ErrorData error(e);
ThrowJNI(env, error.Message().c_str());
return nullptr;
}
}

JNIEXPORT void JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1appender_1close(JNIEnv * env, jclass param0, jobject param1) {
try {
return _duckdb_jdbc_appender_close(env, param0, param1);
Expand Down
4 changes: 4 additions & 0 deletions src/jni/functions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ void _duckdb_jdbc_interrupt(JNIEnv * env, jclass param0, jobject param1);

JNIEXPORT void JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1interrupt(JNIEnv * env, jclass param0, jobject param1);

jobject _duckdb_jdbc_query_progress(JNIEnv * env, jclass param0, jobject param1);

JNIEXPORT jobject JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1query_1progress(JNIEnv * env, jclass param0, jobject param1);

void _duckdb_jdbc_appender_close(JNIEnv * env, jclass param0, jobject param1);

JNIEXPORT void JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1appender_1close(JNIEnv * env, jclass param0, jobject param1);
Expand Down
6 changes: 6 additions & 0 deletions src/jni/refs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ jobject J_ProfilerPrintFormat_NO_OUTPUT;
jobject J_ProfilerPrintFormat_HTML;
jobject J_ProfilerPrintFormat_GRAPHVIZ;

jclass J_QueryProgress;
jmethodID J_QueryProgress_init;

static std::vector<jobject> global_refs;

template <typename T>
Expand Down Expand Up @@ -280,6 +283,9 @@ void create_refs(JNIEnv *env) {
make_static_object_field_ref(env, J_ProfilerPrintFormat, "HTML", "Lorg/duckdb/ProfilerPrintFormat;");
J_ProfilerPrintFormat_GRAPHVIZ =
make_static_object_field_ref(env, J_ProfilerPrintFormat, "GRAPHVIZ", "Lorg/duckdb/ProfilerPrintFormat;");

J_QueryProgress = make_class_ref(env, "org/duckdb/QueryProgress");
J_QueryProgress_init = get_method_id(env, J_QueryProgress, "<init>", "(DJJ)V");
}

void delete_global_refs(JNIEnv *env) noexcept {
Expand Down
3 changes: 3 additions & 0 deletions src/jni/refs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ extern jobject J_ProfilerPrintFormat_NO_OUTPUT;
extern jobject J_ProfilerPrintFormat_HTML;
extern jobject J_ProfilerPrintFormat_GRAPHVIZ;

extern jclass J_QueryProgress;
extern jmethodID J_QueryProgress_init;

void create_refs(JNIEnv *env);

void delete_global_refs(JNIEnv *env) noexcept;
10 changes: 10 additions & 0 deletions src/jni/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

#include "refs.hpp"

#include <cstdint>
#include <limits>

void check_java_exception_and_rethrow(JNIEnv *env) {
if (env->ExceptionCheck()) {
jthrowable exc = env->ExceptionOccurred();
Expand Down Expand Up @@ -42,3 +45,10 @@ jobject decode_charbuffer_to_jstring(JNIEnv *env, const char *d_str, idx_t d_str
auto j_str = env->CallObjectMethod(j_cb, J_CharBuffer_toString);
return j_str;
}

jlong uint64_to_jlong(uint64_t value) {
if (value <= std::numeric_limits<int64_t>::max()) {
return static_cast<jlong>(value);
}
return static_cast<jlong>(std::numeric_limits<int64_t>::max());
}
2 changes: 2 additions & 0 deletions src/jni/util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ std::string byte_array_to_string(JNIEnv *env, jbyteArray ba_j);
std::string jstring_to_string(JNIEnv *env, jstring string_j);

jobject decode_charbuffer_to_jstring(JNIEnv *env, const char *d_str, idx_t d_str_len);

jlong uint64_to_jlong(uint64_t value);
11 changes: 11 additions & 0 deletions src/main/java/org/duckdb/DuckDBConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -494,4 +494,15 @@ void interrupt() throws SQLException {
connRefLock.unlock();
}
}

QueryProgress queryProgress() throws SQLException {
checkOpen();
connRefLock.lock();
try {
checkOpen();
return DuckDBNative.duckdb_jdbc_query_progress(connRef);
} finally {
connRefLock.unlock();
}
}
}
2 changes: 2 additions & 0 deletions src/main/java/org/duckdb/DuckDBNative.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ static native ByteBuffer duckdb_jdbc_create_appender(ByteBuffer conn_ref, byte[]

static native void duckdb_jdbc_interrupt(ByteBuffer conn_ref);

static native QueryProgress duckdb_jdbc_query_progress(ByteBuffer conn_ref);

static native void duckdb_jdbc_appender_close(ByteBuffer appender_ref) throws SQLException;

static native void duckdb_jdbc_appender_append_boolean(ByteBuffer appender_ref, boolean value) throws SQLException;
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/org/duckdb/DuckDBPreparedStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ public void setQueryTimeout(int seconds) throws SQLException {

@Override
public void cancel() throws SQLException {
checkOpen();
try {
// Cancel is intended to be called concurrently with execute,
// thus we cannot take the statement lock that is held while
Expand All @@ -452,6 +453,19 @@ public void cancel() throws SQLException {
}
}

public QueryProgress getQueryProgress() throws SQLException {
checkOpen();
try {
// getQueryProgress is intended to be called concurrently with execute,
// thus we cannot take the statement lock that is held while
// query is running. NPE may be thrown if connection is closed
// concurrently.
return conn.queryProgress();
} catch (NullPointerException e) {
throw new SQLException(e);
}
}

@Override
public SQLWarning getWarnings() throws SQLException {
checkOpen();
Expand Down
53 changes: 53 additions & 0 deletions src/main/java/org/duckdb/QueryProgress.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.duckdb;

import java.util.Objects;
import java.util.StringJoiner;

public class QueryProgress {
private final double percentage;
private final long rowsProcessed;
private final long totalRowsToProcess;

QueryProgress(double percentage, long rowsProcessed, long totalRowsToProcess) {
this.percentage = percentage;
this.rowsProcessed = rowsProcessed;
this.totalRowsToProcess = totalRowsToProcess;
}

public double getPercentage() {
return percentage;
}

public long getRowsProcessed() {
return rowsProcessed;
}

public long getTotalRowsToProcess() {
return totalRowsToProcess;
}

@Override
public String toString() {
return new StringJoiner(", ", QueryProgress.class.getSimpleName() + "[", "]")
.add("percentage=" + percentage)
.add("rowsProcessed=" + rowsProcessed)
.add("totalRowsToProcess=" + totalRowsToProcess)
.toString();
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
QueryProgress that = (QueryProgress) o;
return Double.compare(that.percentage, percentage) == 0 && rowsProcessed == that.rowsProcessed &&
totalRowsToProcess == that.totalRowsToProcess;
}

@Override
public int hashCode() {
return Objects.hash(percentage, rowsProcessed, totalRowsToProcess);
}
}
47 changes: 47 additions & 0 deletions src/test/java/org/duckdb/TestDuckDBJDBC.java
Original file line number Diff line number Diff line change
Expand Up @@ -3437,6 +3437,53 @@ public static void test_get_profiling_information() throws Exception {
}
}

public static void test_query_progress() throws Exception {
try (Connection conn = DriverManager.getConnection(JDBC_URL);
DuckDBPreparedStatement stmt = conn.createStatement().unwrap(DuckDBPreparedStatement.class)) {

QueryProgress qpBefore = stmt.getQueryProgress();
assertEquals(qpBefore.getPercentage(), (double) -1);
assertEquals(qpBefore.getRowsProcessed(), 0L);
assertEquals(qpBefore.getTotalRowsToProcess(), 0L);

stmt.execute("CREATE TABLE test_fib1(i bigint, p double, f double)");
stmt.execute("INSERT INTO test_fib1 values(1, 0, 1)");
stmt.execute("SET enable_progress_bar = true");

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<QueryProgress> future = executorService.submit(new Callable<QueryProgress>() {
@Override
public QueryProgress call() throws Exception {
try {
Thread.sleep(1000);
QueryProgress qp = stmt.getQueryProgress();
stmt.cancel();
return qp;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
});
assertThrows(
()
-> stmt.executeQuery(
"WITH RECURSIVE cte AS ("
+
"SELECT * from test_fib1 UNION ALL SELECT cte.i + 1, cte.f, cte.p + cte.f from cte WHERE cte.i < 150000) "
+ "SELECT avg(f) FROM cte"),
SQLException.class);

QueryProgress qpRunning = future.get();
assertNotNull(qpRunning);
assertEquals(qpRunning.getPercentage(), (double) 25);
assertEquals(qpRunning.getRowsProcessed(), 1L);
assertEquals(qpRunning.getTotalRowsToProcess(), 4L);

assertThrows(stmt::getQueryProgress, SQLException.class);
}
}

public static void main(String[] args) throws Exception {
String arg1 = args.length > 0 ? args[0] : "";
final int statusCode;
Expand Down