Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,14 @@ public static ImmutableMap<String, String> buildPropertiesMap(
if (!isNullOrEmpty(connectionParamString)) {
String[] urlParts = connectionParamString.split(DatabricksJdbcConstants.URL_DELIMITER);
for (String urlPart : urlParts) {
String[] pair = urlPart.split(DatabricksJdbcConstants.PAIR_DELIMITER);
if (pair.length == 1) {
pair = new String[] {pair[0], ""};
}
if (pair[0].startsWith(DatabricksJdbcUrlParams.HTTP_HEADERS.getParamName())) {
parametersBuilder.put(pair[0], pair[1]);
// Split on first '=' only — values (like httpPath) may contain '=' (e.g. ?o=123)
int delimIdx = urlPart.indexOf(DatabricksJdbcConstants.PAIR_DELIMITER);
String key = delimIdx >= 0 ? urlPart.substring(0, delimIdx) : urlPart;
String value = delimIdx >= 0 ? urlPart.substring(delimIdx + 1) : "";
if (key.startsWith(DatabricksJdbcUrlParams.HTTP_HEADERS.getParamName())) {
parametersBuilder.put(key, value);
} else {
parametersBuilder.put(pair[0].toLowerCase(), pair[1]);
parametersBuilder.put(key.toLowerCase(), value);
}
}
}
Expand Down Expand Up @@ -1167,14 +1167,39 @@ private String getParameter(DatabricksJdbcUrlParams key, String defaultValue) {
return this.parameters.getOrDefault(key.getParamName().toLowerCase(), defaultValue);
}

private static final String ORG_ID_HEADER = "x-databricks-org-id";

/**
 * Extracts custom HTTP headers from the parsed connection parameters.
 *
 * <p>Any parameter whose key starts with the {@code HTTP_HEADERS} prefix is treated as a
 * header; the prefix is stripped from the key. Additionally, if no {@value #ORG_ID_HEADER}
 * header was supplied explicitly, the org ID is extracted from the {@code ?o=} query
 * parameter of the configured httpPath (used for SPOG routing).
 *
 * @param parameters the full, already-normalized connection parameter map
 * @return a mutable map of header name to header value (may be empty, never null)
 */
private Map<String, String> parseCustomHeaders(ImmutableMap<String, String> parameters) {
  String filterPrefix = DatabricksJdbcUrlParams.HTTP_HEADERS.getParamName();

  // Collect straight into a mutable HashMap via the map-factory overload of toMap;
  // the previous new HashMap<>(stream.collect(toMap(...))) made a needless extra copy.
  // Duplicate suffixes are impossible (source keys are unique and share one prefix),
  // so the merge function preserves the original fail-fast semantics.
  Map<String, String> headers =
      parameters.entrySet().stream()
          .filter(entry -> entry.getKey().startsWith(filterPrefix))
          .collect(
              Collectors.toMap(
                  entry -> entry.getKey().substring(filterPrefix.length()),
                  Map.Entry::getValue,
                  (first, second) -> {
                    throw new IllegalStateException("Duplicate custom header key");
                  },
                  HashMap::new));

  // Extract org ID from ?o= in httpPath for SPOG routing; an explicitly supplied
  // header always wins over the value derived from the path.
  if (!headers.containsKey(ORG_ID_HEADER)) {
    // NOTE(review): lookup uses default-locale toLowerCase() to stay consistent with
    // how keys are stored elsewhere in this class (see getParameter).
    String httpPath =
        parameters.getOrDefault(
            DatabricksJdbcUrlParams.HTTP_PATH.getParamName().toLowerCase(), "");
    int queryStart = httpPath.indexOf('?');
    if (queryStart >= 0) {
      String queryString = httpPath.substring(queryStart + 1);
      for (String param : queryString.split("&")) {
        // Split on the first '=' only, so values containing '=' stay intact.
        String[] kv = param.split("=", 2);
        if (kv.length == 2 && "o".equals(kv[0]) && !kv[1].isEmpty()) {
          headers.put(ORG_ID_HEADER, kv[1]);
          break;
        }
      }
    }
  }

  return headers;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ CreateUploadUrlResponse getCreateUploadUrlResponse(String objectPath)
CreateUploadUrlRequest request = new CreateUploadUrlRequest(objectPath);
try {
Request req = new Request(Request.POST, CREATE_UPLOAD_URL_PATH, apiClient.serialize(request));
req.withHeaders(JSON_HTTP_HEADERS);
req.withHeaders(JSON_HTTP_HEADERS).withHeaders(connectionContext.getCustomHeaders());
return apiClient.execute(req, CreateUploadUrlResponse.class);
} catch (IOException | DatabricksException e) {
String errorMessage =
Expand All @@ -514,7 +514,7 @@ CreateDownloadUrlResponse getCreateDownloadUrlResponse(String objectPath)
try {
Request req =
new Request(Request.POST, CREATE_DOWNLOAD_URL_PATH, apiClient.serialize(request));
req.withHeaders(JSON_HTTP_HEADERS);
req.withHeaders(JSON_HTTP_HEADERS).withHeaders(connectionContext.getCustomHeaders());
return apiClient.execute(req, CreateDownloadUrlResponse.class);
} catch (IOException | DatabricksException e) {
String errorMessage =
Expand All @@ -534,7 +534,7 @@ CreateDeleteUrlResponse getCreateDeleteUrlResponse(String objectPath)

try {
Request req = new Request(Request.POST, CREATE_DELETE_URL_PATH, apiClient.serialize(request));
req.withHeaders(JSON_HTTP_HEADERS);
req.withHeaders(JSON_HTTP_HEADERS).withHeaders(connectionContext.getCustomHeaders());
return apiClient.execute(req, CreateDeleteUrlResponse.class);
} catch (IOException | DatabricksException e) {
String errorMessage =
Expand All @@ -551,7 +551,7 @@ ListResponse getListResponse(String listPath) throws DatabricksVolumeOperationEx
ListRequest request = new ListRequest(listPath);
try {
Request req = new Request(Request.GET, LIST_PATH);
req.withHeaders(JSON_HTTP_HEADERS);
req.withHeaders(JSON_HTTP_HEADERS).withHeaders(connectionContext.getCustomHeaders());
ApiClient.setQuery(req, request);
return apiClient.execute(req, ListResponse.class);
} catch (IOException | DatabricksException e) {
Expand Down Expand Up @@ -888,6 +888,7 @@ private CompletableFuture<CreateUploadUrlResponse> requestPresignedUrlWithRetry(
Map<String, String> authHeaders = workspaceClient.config().authenticate();
authHeaders.forEach(requestBuilder::addHeader);
JSON_HTTP_HEADERS.forEach(requestBuilder::addHeader);
connectionContext.getCustomHeaders().forEach(requestBuilder::addHeader);

requestBuilder.setEntity(
AsyncEntityProducers.create(requestBody.getBytes(), ContentType.APPLICATION_JSON));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ public final class DatabricksJdbcConstants {
"(?:/([^;]*))?"
+ // Optional Schema (captured without /)
"(?:;(.*))?"); // Optional Property=Value pairs (captured without leading ;)
public static final Pattern HTTP_WAREHOUSE_PATH_PATTERN = Pattern.compile(".*/warehouses/(.+)");
public static final Pattern HTTP_ENDPOINT_PATH_PATTERN = Pattern.compile(".*/endpoints/(.+)");
public static final Pattern HTTP_WAREHOUSE_PATH_PATTERN =
Pattern.compile(".*/warehouses/([^?&]+)");
public static final Pattern HTTP_ENDPOINT_PATH_PATTERN = Pattern.compile(".*/endpoints/([^?&]+)");
public static final Pattern HTTP_CLI_PATTERN = Pattern.compile(".*cliservice(.+)");
public static final Pattern HTTP_PATH_CLI_PATTERN = Pattern.compile("cliservice");
public static final Pattern TEST_PATH_PATTERN = Pattern.compile("jdbc:databricks://test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ private void refreshAllFeatureFlags() {
.getDatabricksConfig()
.authenticate()
.forEach(request::addHeader);
connectionContext.getCustomHeaders().forEach(request::addHeader);
fetchAndSetFlagsFromServer(httpClient, request);
} catch (Exception e) {
LOGGER.trace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public void pushEvent(TelemetryRequest request) throws Exception {
Map<String, String> authHeaders =
isAuthenticated ? databricksConfig.authenticate() : Collections.emptyMap();
authHeaders.forEach(post::addHeader);
connectionContext.getCustomHeaders().forEach(post::addHeader);
try (CloseableHttpResponse response = httpClient.execute(post)) {
// TODO: check response and add retry for partial failures
if (!HttpUtil.isSuccessfulHttpResponse(response)) {
Expand Down