Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/appkit/src/connectors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ export * from "./genie";
export * from "./lakebase";
export * from "./lakebase-v1";
export * from "./sql-warehouse";
export * from "./vector-search";
178 changes: 178 additions & 0 deletions packages/appkit/src/connectors/vector-search/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import { Context, type WorkspaceClient } from "@databricks/sdk-experimental";
import type { TelemetryOptions } from "shared";
import { createLogger } from "../../logging/logger";
import {
type Span,
SpanKind,
SpanStatusCode,
TelemetryManager,
} from "../../telemetry";
import type { TelemetryProvider } from "../../telemetry";
import type {
VectorSearchConnectorConfig,
VsNextPageParams,
VsQueryParams,
VsRawResponse,
} from "./types";

const logger = createLogger("connectors:vector-search");

export class VectorSearchConnector {
private readonly config: Required<VectorSearchConnectorConfig>;
private readonly telemetry: TelemetryProvider;

constructor(config: VectorSearchConnectorConfig = {}) {
this.config = {
timeout: config.timeout ?? 30_000,
};
Comment on lines +25 to +27
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't you intend to use ../defaults.ts instead of hardcoding the default here?

this.telemetry = TelemetryManager.getProvider(
"vector-search",
config.telemetry,
);
}

async query(
workspaceClient: WorkspaceClient,
params: VsQueryParams,
signal?: AbortSignal,
): Promise<VsRawResponse> {
if (signal?.aborted) {
throw new Error("Query cancelled before execution");
}

const body: Record<string, unknown> = {
columns: params.columns,
num_results: params.numResults,
query_type: params.queryType.toUpperCase(),
debug_level: 1,
};

if (params.queryText) body.query_text = params.queryText;
if (params.queryVector) body.query_vector = params.queryVector;
if (params.filters && Object.keys(params.filters).length > 0) {
body.filters = params.filters;
}
if (params.reranker) {
body.reranker = {
model: "databricks_reranker",
parameters: { columns_to_rerank: params.reranker.columnsToRerank },
};
}

logger.debug(
"Querying VS index %s (type=%s, num_results=%d)",
params.indexName,
params.queryType,
params.numResults,
);

return this.telemetry.startActiveSpan(
"vector-search.query",
{
kind: SpanKind.CLIENT,
attributes: {
"db.system": "databricks",
"vs.index_name": params.indexName,
"vs.query_type": params.queryType,
"vs.num_results": params.numResults,
"vs.has_filters": !!(
params.filters && Object.keys(params.filters).length > 0
),
"vs.has_reranker": !!params.reranker,
},
},
async (span: Span) => {
const startTime = Date.now();
try {
const response = (await workspaceClient.apiClient.request({
method: "POST",
path: `/api/2.0/vector-search/indexes/${params.indexName}/query`,
body,
headers: new Headers({ "Content-Type": "application/json" }),
raw: false,
query: {},
})) as VsRawResponse;
Comment on lines +87 to +94
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are some abort-control checks and the timeout is defined in the config. But the query is not passing the signal to the request.

Am I missing something? 🤔


const duration = Date.now() - startTime;
span.setAttribute("vs.result_count", response.result.row_count);
span.setAttribute("vs.query_time_ms", response.debug_info?.response_time ?? 0);
span.setAttribute("vs.duration_ms", duration);
span.setStatus({ code: SpanStatusCode.OK });

logger.event()?.setContext("vector-search", {
index_name: params.indexName,
query_type: params.queryType,
result_count: response.result.row_count,
query_time_ms: response.debug_info?.response_time ?? 0,
duration_ms: duration,
});

return response;
} catch (error) {
span.recordException(error as Error);
span.setStatus({
code: SpanStatusCode.ERROR,
message: error instanceof Error ? error.message : String(error),
});
throw error;
}
},
{ name: "vector-search", includePrefix: true },
);
}

async queryNextPage(
workspaceClient: WorkspaceClient,
params: VsNextPageParams,
signal?: AbortSignal,
): Promise<VsRawResponse> {
if (signal?.aborted) {
throw new Error("Query cancelled before execution");
}

logger.debug(
"Fetching next page for index %s (endpoint=%s)",
params.indexName,
params.endpointName,
);

return this.telemetry.startActiveSpan(
"vector-search.queryNextPage",
{
kind: SpanKind.CLIENT,
attributes: {
"db.system": "databricks",
"vs.index_name": params.indexName,
"vs.endpoint_name": params.endpointName,
},
},
async (span: Span) => {
try {
const response = (await workspaceClient.apiClient.request({
method: "POST",
path: `/api/2.0/vector-search/indexes/${params.indexName}/query-next-page`,
body: {
endpoint_name: params.endpointName,
page_token: params.pageToken,
},
headers: new Headers({ "Content-Type": "application/json" }),
raw: false,
query: {},
})) as VsRawResponse;

span.setAttribute("vs.result_count", response.result.row_count);
span.setStatus({ code: SpanStatusCode.OK });
return response;
} catch (error) {
span.recordException(error as Error);
span.setStatus({
code: SpanStatusCode.ERROR,
message: error instanceof Error ? error.message : String(error),
});
throw error;
}
},
{ name: "vector-search", includePrefix: true },
);
}
}
2 changes: 2 additions & 0 deletions packages/appkit/src/connectors/vector-search/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from "./client";
export * from "./types";
42 changes: 42 additions & 0 deletions packages/appkit/src/connectors/vector-search/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import type { TelemetryOptions } from "shared";

export interface VectorSearchConnectorConfig {
timeout?: number;
telemetry?: TelemetryOptions;
}

export interface VsQueryParams {
indexName: string;
queryText?: string;
queryVector?: number[];
columns: string[];
numResults: number;
queryType: "ann" | "hybrid" | "full_text";
filters?: Record<string, string | number | boolean | (string | number)[]>;
reranker?: { columnsToRerank: string[] };
}

export interface VsNextPageParams {
indexName: string;
endpointName: string;
pageToken: string;
}

export interface VsRawResponse {
manifest: {
column_count: number;
columns: Array<{ name: string; type?: string }>;
};
result: {
row_count: number;
data_array: unknown[][];
};
next_page_token?: string | null;
debug_info?: {
response_time?: number;
ann_time?: number;
embedding_gen_time?: number;
latency_ms?: number;
[key: string]: unknown;
};
}
1 change: 1 addition & 0 deletions packages/appkit/src/plugins/index.ts
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also remember about the docs (docs/docs/plugins) - we need to ensure the plugin is documented. Thank you!

Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ export * from "./files";
export * from "./genie";
export * from "./lakebase";
export * from "./server";
export * from "./vector-search";
7 changes: 7 additions & 0 deletions packages/appkit/src/plugins/vector-search/defaults.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import type { PluginExecuteConfig } from "shared";

export const vectorSearchDefaults: PluginExecuteConfig = {
cache: { enabled: false },
retry: { enabled: true, initialDelay: 1000, attempts: 3 },
timeout: 30_000,
};
2 changes: 2 additions & 0 deletions packages/appkit/src/plugins/vector-search/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from "./vector-search";
export * from "./types";
22 changes: 22 additions & 0 deletions packages/appkit/src/plugins/vector-search/manifest.json
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to update the template/ and show some simple case when someone enables the plugin - just as a starter point for the app modification. See how the Lakebase or Genie plugins are featured in the template.

Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"$schema": "https://databricks.github.io/appkit/schemas/plugin-manifest.schema.json",
"name": "vector-search",
"displayName": "Vector Search Plugin",
"description": "Query Databricks Vector Search indexes with built-in hybrid search, reranking, and pagination",
"resources": {
"required": [],
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we have a Vector Search endpoint / index resources here?

This part is crucial for the databricks apps init flow, where a user can enable/disable specific plugins.

"optional": []
},
"config": {
"schema": {
"type": "object",
"properties": {
"timeout": {
"type": "number",
"default": 30000,
"description": "Query execution timeout in milliseconds"
}
}
}
}
}
Loading