From 8af9492de048a45d1fb85d2cf7eeb9df9bef7957 Mon Sep 17 00:00:00 2001 From: Carina Koo Date: Wed, 8 Jan 2025 18:08:12 -0800 Subject: [PATCH] snowflake connector handles timeout and abortsignal Signed-off-by: Carina Koo --- .../src/snowflake_connection.ts | 14 +++++++++++-- .../src/snowflake_executor.ts | 21 ++++++++++++++----- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/packages/malloy-db-snowflake/src/snowflake_connection.ts b/packages/malloy-db-snowflake/src/snowflake_connection.ts index 79c0f2322d..3f7fd6f47e 100644 --- a/packages/malloy-db-snowflake/src/snowflake_connection.ts +++ b/packages/malloy-db-snowflake/src/snowflake_connection.ts @@ -63,6 +63,9 @@ export interface SnowflakeConnectionOptions { scratchSpace?: namespace; queryOptions?: RunSQLOptions; + + // Timeout for the statement + timeoutMs?: number; } type PathChain = @@ -207,6 +210,11 @@ class SnowArray extends SnowField { } } +/** + * Default statement timeoutMs value, 10 Mins + */ +const TIMEOUT_MS = 1000 * 60 * 10; + export class SnowflakeConnection extends BaseConnection implements @@ -221,6 +229,7 @@ export class SnowflakeConnection // the database & schema where we do temporary operations like creating a temp table private scratchSpace?: namespace; private queryOptions: RunSQLOptions; + private timeoutMs: number; constructor( public readonly name: string, @@ -235,6 +244,7 @@ export class SnowflakeConnection this.executor = new SnowflakeExecutor(connOptions, options?.poolOptions); this.scratchSpace = options?.scratchSpace; this.queryOptions = options?.queryOptions ?? {}; + this.timeoutMs = options?.timeoutMs ?? TIMEOUT_MS; } get dialectName(): string { @@ -273,10 +283,10 @@ export class SnowflakeConnection public async runSQL( sql: string, - options?: RunSQLOptions + options: RunSQLOptions = {} ): Promise { const rowLimit = options?.rowLimit ?? this.queryOptions?.rowLimit; - let rows = await this.executor.batch(sql); + let rows = await this.executor.batch(sql, options, this.timeoutMs); if (rowLimit !== undefined && rows.length > rowLimit) { rows = rows.slice(0, rowLimit); } diff --git a/packages/malloy-db-snowflake/src/snowflake_executor.ts b/packages/malloy-db-snowflake/src/snowflake_executor.ts index da5274cf55..46f4782564 100644 --- a/packages/malloy-db-snowflake/src/snowflake_executor.ts +++ b/packages/malloy-db-snowflake/src/snowflake_executor.ts @@ -56,6 +56,7 @@ export interface ConnectionConfigFile { // return ret; // } + export class SnowflakeExecutor { private static defaultPoolOptions_: PoolOptions = { min: 1, @@ -149,15 +150,25 @@ export class SnowflakeExecutor { }); } - public async _execute(sqlText: string, conn: Connection): Promise { - return new Promise((resolve, reject) => { - const _statment = conn.execute({ + public async _execute(sqlText: string, conn: Connection, options?: RunSQLOptions, timeoutMs?: number): Promise { + let _statement: RowStatement | undefined; + const cancel = () => { + _statement?.cancel(); + } + const timeoutId = timeoutMs ? setTimeout(cancel, timeoutMs) : undefined; + options?.abortSignal?.addEventListener('abort', cancel); + return await new Promise((resolve, reject) => { + _statement = conn.execute({ sqlText, complete: ( err: SnowflakeError | undefined, _stmt: RowStatement, rows?: QueryData ) => { + options?.abortSignal?.removeEventListener('abort', cancel); + if (timeoutId) { + clearTimeout(timeoutId); + } if (err) { reject(err); } else if (rows) { @@ -186,10 +197,10 @@ export class SnowflakeExecutor { ); } - public async batch(sqlText: string): Promise { + public async batch(sqlText: string, options?: RunSQLOptions, timeoutMs?: number): Promise { return await this.pool_.use(async (conn: Connection) => { await this._setSessionParams(conn); - return await this._execute(sqlText, conn); + return await this._execute(sqlText, conn, options, timeoutMs); }); }