Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions duckdb_java.def
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1destroy_1db_1ref
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1create_1extension_1type
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1disconnect
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1execute
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1execute_1pending
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1fetch
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1cast_1result_1to_1strings
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1fetch_1size
Expand All @@ -38,11 +39,13 @@ Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1get_1catalog
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1get_1profiling_1information
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1get_1schema
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1interrupt
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1pending_1query
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1prepare
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1prepared_1statement_1meta
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1query_1result_1meta
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1query_1progress
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1release
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1release_1pending
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1set_1auto_1commit
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1set_1catalog
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1set_1schema
Expand Down
3 changes: 3 additions & 0 deletions duckdb_java.exp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ _Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1destroy_1db_1ref
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1create_1extension_1type
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1disconnect
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1execute
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1execute_1pending
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1fetch
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1cast_1result_1to_1strings
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1fetch_1size
Expand All @@ -35,11 +36,13 @@ _Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1get_1catalog
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1get_1profiling_1information
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1get_1schema
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1interrupt
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1pending_1query
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1prepare
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1prepared_1statement_1meta
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1query_1result_1meta
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1query_1progress
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1release
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1release_1pending
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1set_1auto_1commit
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1set_1catalog
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1set_1schema
Expand Down
3 changes: 3 additions & 0 deletions duckdb_java.map
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ DUCKDB_JAVA {
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1create_1extension_1type;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1disconnect;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1execute;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1execute_1pending;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1fetch;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1cast_1result_1to_1strings;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1fetch_1size;
Expand All @@ -37,11 +38,13 @@ DUCKDB_JAVA {
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1get_1profiling_1information;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1get_1schema;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1interrupt;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1pending_1query;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1prepare;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1prepared_1statement_1meta;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1query_1result_1meta;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1query_1progress;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1release;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1release_1pending;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1set_1auto_1commit;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1set_1catalog;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1set_1schema;
Expand Down
77 changes: 68 additions & 9 deletions src/jni/duckdb_java.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,24 +212,54 @@ jobject _duckdb_jdbc_prepare(JNIEnv *env, jclass, jobject conn_ref_buf, jbyteArr
}
}

auto stmt_ref = new StatementHolder();
auto stmt_ref = make_uniq<StatementHolder>();
stmt_ref->stmt = conn_ref->Prepare(std::move(statements.back()));
if (stmt_ref->stmt->HasError()) {
string error_msg = string(stmt_ref->stmt->GetError());
stmt_ref->stmt = nullptr;

// No success, so it must be deleted
delete stmt_ref;
ThrowJNI(env, error_msg.c_str());
return nullptr;
}
return env->NewDirectByteBuffer(stmt_ref.release(), 0);
}

// Just return control flow back to JVM, as an Exception is pending anyway
jobject _duckdb_jdbc_pending_query(JNIEnv *env, jclass, jobject conn_ref_buf, jbyteArray query_j) {
auto conn_ref = get_connection(env, conn_ref_buf);
if (!conn_ref) {
return nullptr;
}
return env->NewDirectByteBuffer(stmt_ref, 0);

auto query = jbyteArray_to_string(env, query_j);

auto statements = conn_ref->ExtractStatements(query.c_str());
if (statements.empty()) {
throw InvalidInputException("No statements to execute.");
}

// if there are multiple statements, we directly execute the statements besides the last one
// we only return the result of the last statement to the user, unless one of the previous statements fails
for (idx_t i = 0; i + 1 < statements.size(); i++) {
auto res = conn_ref->Query(std::move(statements[i]));
if (res->HasError()) {
res->ThrowError();
}
}

Value result;
bool stream_results =
conn_ref->context->TryGetCurrentSetting("jdbc_stream_results", result) ? result.GetValue<bool>() : false;
QueryParameters query_parameters;
query_parameters.output_type =
stream_results ? QueryResultOutputType::ALLOW_STREAMING : QueryResultOutputType::FORCE_MATERIALIZED;

auto pending_ref = make_uniq<PendingHolder>();
pending_ref->pending = conn_ref->PendingQuery(std::move(statements.back()), query_parameters);

return env->NewDirectByteBuffer(pending_ref.release(), 0);
}

jobject _duckdb_jdbc_execute(JNIEnv *env, jclass, jobject stmt_ref_buf, jobjectArray params) {
auto stmt_ref = (StatementHolder *)env->GetDirectBufferAddress(stmt_ref_buf);
auto stmt_ref = reinterpret_cast<StatementHolder *>(env->GetDirectBufferAddress(stmt_ref_buf));
if (!stmt_ref) {
throw InvalidInputException("Invalid statement");
}
Expand Down Expand Up @@ -269,21 +299,50 @@ jobject _duckdb_jdbc_execute(JNIEnv *env, jclass, jobject stmt_ref_buf, jobjectA
return env->NewDirectByteBuffer(res_ref.release(), 0);
}

jobject _duckdb_jdbc_execute_pending(JNIEnv *env, jclass, jobject pending_ref_buf) {
auto pending_ref = reinterpret_cast<PendingHolder *>(env->GetDirectBufferAddress(pending_ref_buf));
if (!pending_ref) {
throw InvalidInputException("Invalid pending query");
}

auto res_ref = make_uniq<ResultHolder>();
res_ref->res = pending_ref->pending->Execute();
if (res_ref->res->HasError()) {
std::string error_msg = std::string(res_ref->res->GetError());
duckdb::ExceptionType error_type = res_ref->res->GetErrorType();
res_ref->res = nullptr;
jclass exc_type = duckdb::ExceptionType::INTERRUPT == error_type ? J_SQLTimeoutException : J_SQLException;
env->ThrowNew(exc_type, error_msg.c_str());
return nullptr;
}
return env->NewDirectByteBuffer(res_ref.release(), 0);
}

void _duckdb_jdbc_release(JNIEnv *env, jclass, jobject stmt_ref_buf) {
if (nullptr == stmt_ref_buf) {
return;
}
auto stmt_ref = (StatementHolder *)env->GetDirectBufferAddress(stmt_ref_buf);
auto stmt_ref = reinterpret_cast<StatementHolder *>(env->GetDirectBufferAddress(stmt_ref_buf));
if (stmt_ref) {
delete stmt_ref;
}
}

void _duckdb_jdbc_release_pending(JNIEnv *env, jclass, jobject pending_ref_buf) {
if (nullptr == pending_ref_buf) {
return;
}
auto pending_ref = reinterpret_cast<PendingHolder *>(env->GetDirectBufferAddress(pending_ref_buf));
if (pending_ref) {
delete pending_ref;
}
}

void _duckdb_jdbc_free_result(JNIEnv *env, jclass, jobject res_ref_buf) {
if (nullptr == res_ref_buf) {
return;
}
auto res_ref = (ResultHolder *)env->GetDirectBufferAddress(res_ref_buf);
auto res_ref = reinterpret_cast<ResultHolder *>(env->GetDirectBufferAddress(res_ref_buf));
if (res_ref) {
delete res_ref;
}
Expand Down
32 changes: 32 additions & 0 deletions src/jni/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,17 @@ JNIEXPORT jobject JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1prepare(JNI
}
}

JNIEXPORT jobject JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1pending_1query(JNIEnv * env, jclass param0, jobject param1, jbyteArray param2) {
try {
return _duckdb_jdbc_pending_query(env, param0, param1, param2);
} catch (const std::exception &e) {
duckdb::ErrorData error(e);
ThrowJNI(env, error.Message().c_str());

return nullptr;
}
}

JNIEXPORT void JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1release(JNIEnv * env, jclass param0, jobject param1) {
try {
return _duckdb_jdbc_release(env, param0, param1);
Expand All @@ -141,6 +152,16 @@ JNIEXPORT void JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1release(JNIEnv
}
}

JNIEXPORT void JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1release_1pending(JNIEnv * env, jclass param0, jobject param1) {
try {
_duckdb_jdbc_release_pending(env, param0, param1);
} catch (const std::exception &e) {
duckdb::ErrorData error(e);
ThrowJNI(env, error.Message().c_str());

}
}

JNIEXPORT jobject JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1query_1result_1meta(JNIEnv * env, jclass param0, jobject param1) {
try {
return _duckdb_jdbc_query_result_meta(env, param0, param1);
Expand Down Expand Up @@ -174,6 +195,17 @@ JNIEXPORT jobject JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1execute(JNI
}
}

JNIEXPORT jobject JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1execute_1pending(JNIEnv * env, jclass param0, jobject param1) {
try {
return _duckdb_jdbc_execute_pending(env, param0, param1);
} catch (const std::exception &e) {
duckdb::ErrorData error(e);
ThrowJNI(env, error.Message().c_str());

return nullptr;
}
}

JNIEXPORT void JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1free_1result(JNIEnv * env, jclass param0, jobject param1) {
try {
return _duckdb_jdbc_free_result(env, param0, param1);
Expand Down
12 changes: 12 additions & 0 deletions src/jni/functions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,18 @@ jobject _duckdb_jdbc_prepare(JNIEnv * env, jclass param0, jobject param1, jbyteA

JNIEXPORT jobject JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1prepare(JNIEnv * env, jclass param0, jobject param1, jbyteArray param2);

jobject _duckdb_jdbc_pending_query(JNIEnv * env, jclass param0, jobject param1, jbyteArray param2);

JNIEXPORT jobject JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1pending_1query(JNIEnv * env, jclass param0, jobject param1, jbyteArray param2);

void _duckdb_jdbc_release(JNIEnv * env, jclass param0, jobject param1);

JNIEXPORT void JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1release(JNIEnv * env, jclass param0, jobject param1);

void _duckdb_jdbc_release_pending(JNIEnv * env, jclass param0, jobject param1);

JNIEXPORT void JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1release_1pending(JNIEnv * env, jclass param0, jobject param1);

jobject _duckdb_jdbc_query_result_meta(JNIEnv * env, jclass param0, jobject param1);

JNIEXPORT jobject JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1query_1result_1meta(JNIEnv * env, jclass param0, jobject param1);
Expand All @@ -73,6 +81,10 @@ jobject _duckdb_jdbc_execute(JNIEnv * env, jclass param0, jobject param1, jobjec

JNIEXPORT jobject JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1execute(JNIEnv * env, jclass param0, jobject param1, jobjectArray param2);

jobject _duckdb_jdbc_execute_pending(JNIEnv * env, jclass param0, jobject param1);

JNIEXPORT jobject JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1execute_1pending(JNIEnv * env, jclass param0, jobject param1);

void _duckdb_jdbc_free_result(JNIEnv * env, jclass param0, jobject param1);

JNIEXPORT void JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1free_1result(JNIEnv * env, jclass param0, jobject param1);
Expand Down
4 changes: 4 additions & 0 deletions src/jni/holders.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ struct StatementHolder {
duckdb::unique_ptr<duckdb::PreparedStatement> stmt;
};

struct PendingHolder {
duckdb::unique_ptr<duckdb::PendingQueryResult> pending;
};

struct ResultHolder {
duckdb::unique_ptr<duckdb::QueryResult> res;
duckdb::unique_ptr<duckdb::DataChunk> chunk;
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/org/duckdb/DuckDBConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public final class DuckDBConnection implements java.sql.Connection {

ByteBuffer connRef;
final ReentrantLock connRefLock = new ReentrantLock();
final LinkedHashSet<DuckDBPendingQuery> pendingQueries = new LinkedHashSet<>();
final LinkedHashSet<DuckDBPreparedStatement> preparedStatements = new LinkedHashSet<>();
final LinkedHashSet<DuckDBAppender> appenders = new LinkedHashSet<>();
volatile boolean closing;
Expand Down Expand Up @@ -145,6 +146,14 @@ public void close() throws SQLException {
// suppress
}

// Last pending query created is first deleted
List<DuckDBPendingQuery> pendingList = new ArrayList<>(pendingQueries);
Collections.reverse(pendingList);
for (DuckDBPendingQuery pending : pendingList) {
pending.close();
}
pendingQueries.clear();

// Last statement created is first deleted
List<DuckDBPreparedStatement> psList = new ArrayList<>(preparedStatements);
Collections.reverse(psList);
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/duckdb/DuckDBNative.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ private static void loadFromCurrentJarDir(String libName) throws Exception {
// returns res_ref result reference object
static native ByteBuffer duckdb_jdbc_execute(ByteBuffer stmt_ref, Object[] params) throws SQLException;

static native ByteBuffer duckdb_jdbc_pending_query(ByteBuffer conn_ref, byte[] query) throws SQLException;

static native ByteBuffer duckdb_jdbc_execute_pending(ByteBuffer pending_ref) throws SQLException;

static native void duckdb_jdbc_release_pending(ByteBuffer pending_ref) throws SQLException;

static native void duckdb_jdbc_free_result(ByteBuffer res_ref);

static native DuckDBVector[] duckdb_jdbc_fetch(ByteBuffer res_ref, ByteBuffer conn_ref) throws SQLException;
Expand Down
50 changes: 50 additions & 0 deletions src/main/java/org/duckdb/DuckDBPendingQuery.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.duckdb;

import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.concurrent.locks.ReentrantLock;

class DuckDBPendingQuery {
private DuckDBConnection conn;
ByteBuffer pendingRef = null;
final ReentrantLock pendingRefLock = new ReentrantLock();

DuckDBPendingQuery(DuckDBConnection conn, ByteBuffer pendingRef) {
this.conn = conn;
this.pendingRef = pendingRef;
this.conn.connRefLock.lock();
try {
this.conn.pendingQueries.add(this);
} finally {
this.conn.connRefLock.unlock();
}
}

void close() throws SQLException {
if (pendingRef == null) {
return;
}
pendingRefLock.lock();
try {
if (pendingRef == null) {
return;
}
DuckDBNative.duckdb_jdbc_release_pending(pendingRef);
pendingRef = null;
} finally {
pendingRefLock.unlock();
}

// Untrack pending query from parent connection,
// if 'closing' flag is set it means that the parent connection itself
// is being closed and we don't need to untrack this instance
if (!conn.closing) {
conn.connRefLock.lock();
try {
conn.pendingQueries.remove(this);
} finally {
conn.connRefLock.unlock();
}
}
}
}
Loading