Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions duckdb_java.def
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1arrow_1register
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1arrow_1stream
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1connect
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1create_1appender
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1create_1db_1ref
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1destroy_1db_1ref
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1create_1extension_1type
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1disconnect
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1execute
Expand Down
2 changes: 2 additions & 0 deletions duckdb_java.exp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ _Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1arrow_1register
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1arrow_1stream
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1connect
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1create_1appender
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1create_1db_1ref
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1destroy_1db_1ref
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1create_1extension_1type
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1disconnect
_Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1execute
Expand Down
2 changes: 2 additions & 0 deletions duckdb_java.map
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ DUCKDB_JAVA {
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1arrow_1stream;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1connect;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1create_1appender;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1create_1db_1ref;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1destroy_1db_1ref;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1create_1extension_1type;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1disconnect;
Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1execute;
Expand Down
31 changes: 28 additions & 3 deletions src/jni/duckdb_java.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,35 @@ jobject _duckdb_jdbc_startup(JNIEnv *env, jclass, jbyteArray database_j, jboolea
std::unique_ptr<DBConfig> config = create_db_config(env, read_only, props);
bool cache_instance = database != ":memory:" && !database.empty();
auto shared_db = instance_cache.GetOrCreateInstance(database, *config, cache_instance);
auto conn_holder = new ConnectionHolder(shared_db);
auto conn_ref = new ConnectionHolder(shared_db);

return env->NewDirectByteBuffer(conn_holder, 0);
return env->NewDirectByteBuffer(conn_ref, 0);
}

jobject _duckdb_jdbc_connect(JNIEnv *env, jclass, jobject conn_ref_buf) {
auto conn_ref = (ConnectionHolder *)env->GetDirectBufferAddress(conn_ref_buf);
auto conn_ref = get_connection_ref(env, conn_ref_buf);
auto config = ClientConfig::GetConfig(*conn_ref->connection->context);
auto conn = new ConnectionHolder(conn_ref->db);
conn->connection->context->config = config;
return env->NewDirectByteBuffer(conn, 0);
}

jobject _duckdb_jdbc_create_db_ref(JNIEnv *env, jclass, jobject conn_ref_buf) {
auto conn_ref = get_connection_ref(env, conn_ref_buf);
auto db_ref = conn_ref->create_db_ref();
return env->NewDirectByteBuffer(db_ref, 0);
}

void _duckdb_jdbc_destroy_db_ref(JNIEnv *env, jclass, jobject db_ref_buf) {
if (nullptr == db_ref_buf) {
return;
}
auto db_ref = (DBHolder *)env->GetDirectBufferAddress(db_ref_buf);
if (db_ref) {
delete db_ref;
}
}

jstring _duckdb_jdbc_get_schema(JNIEnv *env, jclass, jobject conn_ref_buf) {
auto conn_ref = get_connection(env, conn_ref_buf);
if (!conn_ref) {
Expand Down Expand Up @@ -163,6 +179,9 @@ jobject _duckdb_jdbc_query_progress(JNIEnv *env, jclass, jobject conn_ref_buf) {
}

void _duckdb_jdbc_disconnect(JNIEnv *env, jclass, jobject conn_ref_buf) {
if (nullptr == conn_ref_buf) {
return;
}
auto conn_ref = (ConnectionHolder *)env->GetDirectBufferAddress(conn_ref_buf);
if (conn_ref) {
delete conn_ref;
Expand Down Expand Up @@ -249,13 +268,19 @@ jobject _duckdb_jdbc_execute(JNIEnv *env, jclass, jobject stmt_ref_buf, jobjectA
}

void _duckdb_jdbc_release(JNIEnv *env, jclass, jobject stmt_ref_buf) {
if (nullptr == stmt_ref_buf) {
return;
}
auto stmt_ref = (StatementHolder *)env->GetDirectBufferAddress(stmt_ref_buf);
if (stmt_ref) {
delete stmt_ref;
}
}

void _duckdb_jdbc_free_result(JNIEnv *env, jclass, jobject res_ref_buf) {
if (nullptr == res_ref_buf) {
return;
}
auto res_ref = (ResultHolder *)env->GetDirectBufferAddress(res_ref_buf);
if (res_ref) {
delete res_ref;
Expand Down
21 changes: 21 additions & 0 deletions src/jni/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,27 @@ JNIEXPORT jobject JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1connect(JNI
}
}

JNIEXPORT jobject JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1create_1db_1ref(JNIEnv * env, jclass param0, jobject param1) {
try {
return _duckdb_jdbc_create_db_ref(env, param0, param1);
} catch (const std::exception &e) {
duckdb::ErrorData error(e);
ThrowJNI(env, error.Message().c_str());

return nullptr;
}
}

JNIEXPORT void JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1destroy_1db_1ref(JNIEnv * env, jclass param0, jobject param1) {
try {
return _duckdb_jdbc_destroy_db_ref(env, param0, param1);
} catch (const std::exception &e) {
duckdb::ErrorData error(e);
ThrowJNI(env, error.Message().c_str());

}
}

JNIEXPORT void JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1set_1auto_1commit(JNIEnv * env, jclass param0, jobject param1, jboolean param2) {
try {
return _duckdb_jdbc_set_auto_commit(env, param0, param1, param2);
Expand Down
8 changes: 8 additions & 0 deletions src/jni/functions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ jobject _duckdb_jdbc_connect(JNIEnv * env, jclass param0, jobject param1);

JNIEXPORT jobject JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1connect(JNIEnv * env, jclass param0, jobject param1);

jobject _duckdb_jdbc_create_db_ref(JNIEnv * env, jclass param0, jobject param1);

JNIEXPORT jobject JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1create_1db_1ref(JNIEnv * env, jclass param0, jobject param1);

void _duckdb_jdbc_destroy_db_ref(JNIEnv * env, jclass param0, jobject param1);

JNIEXPORT void JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1destroy_1db_1ref(JNIEnv * env, jclass param0, jobject param1);

void _duckdb_jdbc_set_auto_commit(JNIEnv * env, jclass param0, jobject param1, jboolean param2);

JNIEXPORT void JNICALL Java_org_duckdb_DuckDBNative_duckdb_1jdbc_1set_1auto_1commit(JNIEnv * env, jclass param0, jobject param1, jboolean param2);
Expand Down
38 changes: 31 additions & 7 deletions src/jni/holders.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,21 @@

#include <jni.h>

/**
* Holds a copy of a shared_ptr to an existing DB instance.
* Is used to keep this DB alive (and accessible from DB cache)
* even after the last connection to this DB is closed.
*/
struct DBHolder {
duckdb::shared_ptr<duckdb::DuckDB> db;

DBHolder(duckdb::shared_ptr<duckdb::DuckDB> _db) : db(std::move(_db)) {};

DBHolder(const DBHolder &) = delete;

DBHolder &operator=(const DBHolder &) = delete;
};

/**
* Associates a duckdb::Connection with a duckdb::DuckDB. The DB may be shared amongst many ConnectionHolders, but the
* Connection is unique to this holder. Every Java DuckDBConnection has exactly 1 of these holders, and they are never
Expand All @@ -17,6 +32,10 @@ struct ConnectionHolder {
ConnectionHolder(duckdb::shared_ptr<duckdb::DuckDB> _db)
: db(_db), connection(duckdb::make_uniq<duckdb::Connection>(*_db)) {
}

DBHolder *create_db_ref() {
return new DBHolder(db);
}
};

struct StatementHolder {
Expand All @@ -28,17 +47,22 @@ struct ResultHolder {
duckdb::unique_ptr<duckdb::DataChunk> chunk;
};

/**
* Throws a SQLException and returns nullptr if a valid Connection can't be retrieved from the buffer.
*/
inline duckdb::Connection *get_connection(JNIEnv *env, jobject conn_ref_buf) {
inline ConnectionHolder *get_connection_ref(JNIEnv *env, jobject conn_ref_buf) {
if (!conn_ref_buf) {
throw duckdb::ConnectionException("Invalid connection");
throw duckdb::ConnectionException("Invalid connection buffer ref");
}
auto conn_holder = (ConnectionHolder *)env->GetDirectBufferAddress(conn_ref_buf);
auto conn_holder = reinterpret_cast<ConnectionHolder *>(env->GetDirectBufferAddress(conn_ref_buf));
if (!conn_holder) {
throw duckdb::ConnectionException("Invalid connection");
throw duckdb::ConnectionException("Invalid connection buffer");
}
return conn_holder;
}

/**
* Throws a SQLException and returns nullptr if a valid Connection can't be retrieved from the buffer.
*/
inline duckdb::Connection *get_connection(JNIEnv *env, jobject conn_ref_buf) {
auto conn_holder = get_connection_ref(env, conn_ref_buf);
auto conn_ref = conn_holder->connection.get();
if (!conn_ref || !conn_ref->context) {
throw duckdb::ConnectionException("Invalid connection");
Expand Down
17 changes: 5 additions & 12 deletions src/main/java/org/duckdb/DuckDBConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.duckdb.DuckDBDriver.JDBC_AUTO_COMMIT;
import static org.duckdb.JdbcUtils.isStringTruish;
import static org.duckdb.JdbcUtils.removeOption;
import static org.duckdb.JdbcUtils.*;

import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -49,19 +48,13 @@ public final class DuckDBConnection implements java.sql.Connection {

public static DuckDBConnection newConnection(String url, boolean readOnly, Properties properties)
throws SQLException {
if (!url.startsWith("jdbc:duckdb:")) {
throw new SQLException("DuckDB JDBC URL needs to start with 'jdbc:duckdb:'");
}
String db_dir = url.substring("jdbc:duckdb:".length()).trim();
if (db_dir.length() == 0) {
db_dir = ":memory:";
}
if (db_dir.startsWith("memory:")) {
db_dir = ":" + db_dir;
if (null == properties) {
properties = new Properties();
}
String dbName = dbNameFromUrl(url);
String autoCommitStr = removeOption(properties, JDBC_AUTO_COMMIT);
boolean autoCommit = isStringTruish(autoCommitStr, true);
ByteBuffer nativeReference = DuckDBNative.duckdb_jdbc_startup(db_dir.getBytes(UTF_8), readOnly, properties);
ByteBuffer nativeReference = DuckDBNative.duckdb_jdbc_startup(dbName.getBytes(UTF_8), readOnly, properties);
return new DuckDBConnection(nativeReference, url, readOnly, autoCommit);
}

Expand Down
96 changes: 86 additions & 10 deletions src/main/java/org/duckdb/DuckDBDriver.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package org.duckdb;

import static org.duckdb.JdbcUtils.isStringTruish;
import static org.duckdb.JdbcUtils.removeOption;
import static org.duckdb.JdbcUtils.*;

import java.nio.ByteBuffer;
import java.sql.*;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import java.util.regex.Pattern;
Expand All @@ -17,15 +15,21 @@ public class DuckDBDriver implements java.sql.Driver {
public static final String DUCKDB_USER_AGENT_PROPERTY = "custom_user_agent";
public static final String JDBC_STREAM_RESULTS = "jdbc_stream_results";
public static final String JDBC_AUTO_COMMIT = "jdbc_auto_commit";
public static final String JDBC_PIN_DB = "jdbc_pin_db";

private static final String DUCKDB_URL_PREFIX = "jdbc:duckdb:";
static final String DUCKDB_URL_PREFIX = "jdbc:duckdb:";

private static final String DUCKLAKE_OPTION = "ducklake";
private static final String DUCKLAKE_ALIAS_OPTION = "ducklake_alias";
private static final Pattern DUCKLAKE_ALIAS_OPTION_PATTERN = Pattern.compile("[a-zA-Z0-9_]+");
private static final String DUCKLAKE_URL_PREFIX = "ducklake:";
private static final ReentrantLock DUCKLAKE_INIT_LOCK = new ReentrantLock();

private static final LinkedHashMap<String, ByteBuffer> pinnedDbRefs = new LinkedHashMap<>();
private static final ReentrantLock pinnedDbRefsLock = new ReentrantLock();
private static boolean pinnedDbRefsShutdownHookRegistered = false;
private static boolean pinnedDbRefsShutdownHookRun = false;

static {
try {
DriverManager.registerDriver(new DuckDBDriver());
Expand Down Expand Up @@ -60,12 +64,17 @@ public Connection connect(String url, Properties info) throws SQLException {
// to be established.
info.remove("path");

String pinDbOptStr = removeOption(info, JDBC_PIN_DB);
boolean pinDBOpt = isStringTruish(pinDbOptStr, false);

String ducklake = removeOption(info, DUCKLAKE_OPTION);
String ducklakeAlias = removeOption(info, DUCKLAKE_ALIAS_OPTION);

Connection conn = DuckDBConnection.newConnection(url, readOnly, info);
DuckDBConnection conn = DuckDBConnection.newConnection(url, readOnly, info);

initDucklake(conn, url, ducklake, ducklakeAlias);
pinDB(pinDBOpt, url, conn);

initDucklake(conn, ducklake, ducklakeAlias);

return conn;
}
Expand Down Expand Up @@ -95,8 +104,7 @@ public Logger getParentLogger() throws SQLFeatureNotSupportedException {
throw new SQLFeatureNotSupportedException("no logger");
}

private static void initDucklake(Connection conn, String url, String ducklake, String ducklakeAlias)
throws SQLException {
private static void initDucklake(Connection conn, String ducklake, String ducklakeAlias) throws SQLException {
if (null == ducklake) {
return;
}
Expand Down Expand Up @@ -154,6 +162,55 @@ private static ParsedProps parsePropsFromUrl(String url) throws SQLException {
return new ParsedProps(shortUrl, props);
}

private static void pinDB(boolean pinnedDbOpt, String url, DuckDBConnection conn) throws SQLException {
if (!pinnedDbOpt) {
return;
}
String dbName = dbNameFromUrl(url);
if (":memory:".equals(dbName)) {
return;
}

pinnedDbRefsLock.lock();
try {
// Actual native DB cache uses absolute paths to file DBs,
// but that should not make the difference unless CWD is changed,
// that is not expected for a JVM process, see JDK-4045688.
if (pinnedDbRefsShutdownHookRun || pinnedDbRefs.containsKey(dbName)) {
return;
}
// No need to hold connRef lock here, this connection is not
// yet available to client at this point, so it cannot be closed.
ByteBuffer dbRef = DuckDBNative.duckdb_jdbc_create_db_ref(conn.connRef);
pinnedDbRefs.put(dbName, dbRef);

if (!pinnedDbRefsShutdownHookRegistered) {
Runtime.getRuntime().addShutdownHook(new Thread(new PinnedDbRefsShutdownHook()));
pinnedDbRefsShutdownHookRegistered = true;
}
} finally {
pinnedDbRefsLock.unlock();
}
}

public static boolean releaseDB(String url) throws SQLException {
pinnedDbRefsLock.lock();
try {
if (pinnedDbRefsShutdownHookRun) {
return false;
}
String dbName = dbNameFromUrl(url);
ByteBuffer dbRef = pinnedDbRefs.remove(dbName);
if (null == dbRef) {
return false;
}
DuckDBNative.duckdb_jdbc_destroy_db_ref(dbRef);
return true;
} finally {
pinnedDbRefsLock.unlock();
}
}

private static class ParsedProps {
final String shortUrl;
final LinkedHashMap<String, String> props;
Expand All @@ -167,4 +224,23 @@ private ParsedProps(String shortUrl, LinkedHashMap<String, String> props) {
this.props = props;
}
}

private static class PinnedDbRefsShutdownHook implements Runnable {
@Override
public void run() {
pinnedDbRefsLock.lock();
try {
List<ByteBuffer> dbRefsList = new ArrayList<>(pinnedDbRefs.values());
Collections.reverse(dbRefsList);
for (ByteBuffer dbRef : dbRefsList) {
DuckDBNative.duckdb_jdbc_destroy_db_ref(dbRef);
}
pinnedDbRefsShutdownHookRun = true;
} catch (SQLException e) {
e.printStackTrace();
} finally {
pinnedDbRefsLock.unlock();
}
}
}
}
6 changes: 5 additions & 1 deletion src/main/java/org/duckdb/DuckDBNative.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ final class DuckDBNative {
static native ByteBuffer duckdb_jdbc_startup(byte[] path, boolean read_only, Properties props) throws SQLException;

// returns conn_ref connection reference object
static native ByteBuffer duckdb_jdbc_connect(ByteBuffer db_ref) throws SQLException;
static native ByteBuffer duckdb_jdbc_connect(ByteBuffer conn_ref) throws SQLException;

static native ByteBuffer duckdb_jdbc_create_db_ref(ByteBuffer conn_ref) throws SQLException;

static native void duckdb_jdbc_destroy_db_ref(ByteBuffer db_ref) throws SQLException;

static native void duckdb_jdbc_set_auto_commit(ByteBuffer conn_ref, boolean auto_commit) throws SQLException;

Expand Down
Loading