Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,39 @@ To disable Cloud Fetch (e.g., when handling smaller datasets or to avoid additio
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?useCloudFetch=false
```

### Telemetry Configuration (Optional)

The driver includes optional telemetry to help improve performance and reliability. Telemetry is **disabled by default** and requires explicit opt-in.

**Opt-in to telemetry** (respects server-side feature flags):
```
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?enableTelemetry=true
```

**Opt-out of telemetry** (explicitly disable):
```
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?enableTelemetry=false
```

**Advanced configuration** (for testing/debugging):
```
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?forceEnableTelemetry=true
```

**What data is collected:**
- ✅ Query latency and performance metrics
- ✅ Error codes (not error messages)
- ✅ Feature usage (CloudFetch, LZ4, etc.)
- ✅ Driver version and environment info

**What is NOT collected:**
- ❌ SQL query text
- ❌ Query results or data values
- ❌ Table/column names
- ❌ User identities or credentials

Telemetry has < 1% performance overhead and uses circuit breaker protection to ensure it never impacts your queries. For more details, see `telemetry/DESIGN.md` and `telemetry/TROUBLESHOOTING.md`.

### Connecting with a new Connector

You can also connect with a new connector object. For example:
Expand Down
19 changes: 17 additions & 2 deletions connection.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dbsql

Check failure on line 1 in connection.go

View workflow job for this annotation

GitHub Actions / Lint

: # github.com/databricks/databricks-sql-go [github.com/databricks/databricks-sql-go.test]

import (
"context"
Expand Down Expand Up @@ -216,7 +216,15 @@
return nil, dbsqlerrint.NewExecutionError(ctx, dbsqlerr.ErrQueryExecution, err, opStatusResp)
}

rows, err := rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults)
// Telemetry callback for tracking row fetching metrics
telemetryUpdate := func(chunkCount int, bytesDownloaded int64) {
if c.telemetry != nil {
c.telemetry.AddTag(ctx, "chunk_count", chunkCount)
c.telemetry.AddTag(ctx, "bytes_downloaded", bytesDownloaded)
}
}

rows, err := rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, ctx, telemetryUpdate)
return rows, err

}
Expand Down Expand Up @@ -646,7 +654,14 @@
}

if len(driverctx.StagingPathsFromContext(ctx)) != 0 {
row, err = rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults)
// Telemetry callback for staging operation row fetching
telemetryUpdate := func(chunkCount int, bytesDownloaded int64) {
if c.telemetry != nil {
c.telemetry.AddTag(ctx, "chunk_count", chunkCount)
c.telemetry.AddTag(ctx, "bytes_downloaded", bytesDownloaded)
}
}
row, err = rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, ctx, telemetryUpdate)
if err != nil {
return dbsqlerrint.NewDriverError(ctx, "error reading row.", err)
}
Expand Down
16 changes: 8 additions & 8 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,7 @@
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

Check failure on line 1040 in connection_test.go

View workflow job for this annotation

GitHub Actions / Test and Build (1.20.x, ubuntu-latest)

too many arguments in call to testConn.runQuery

Check failure on line 1040 in connection_test.go

View workflow job for this annotation

GitHub Actions / Test and Build (1.20.x, ubuntu-latest)

too many arguments in call to testConn.runQuery

Check failure on line 1040 in connection_test.go

View workflow job for this annotation

GitHub Actions / Lint

too many arguments in call to testConn.runQuery
assert.Error(t, err)
assert.Nil(t, exStmtResp)
assert.Nil(t, opStatusResp)
Expand Down Expand Up @@ -1079,7 +1079,7 @@
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

Check failure on line 1082 in connection_test.go

View workflow job for this annotation

GitHub Actions / Test and Build (1.20.x, ubuntu-latest)

too many arguments in call to testConn.runQuery

Check failure on line 1082 in connection_test.go

View workflow job for this annotation

GitHub Actions / Test and Build (1.20.x, ubuntu-latest)

too many arguments in call to testConn.runQuery

Check failure on line 1082 in connection_test.go

View workflow job for this annotation

GitHub Actions / Lint

too many arguments in call to testConn.runQuery

assert.Error(t, err)
assert.Equal(t, 1, executeStatementCount)
Expand Down Expand Up @@ -1125,7 +1125,7 @@
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

Check failure on line 1128 in connection_test.go

View workflow job for this annotation

GitHub Actions / Test and Build (1.20.x, ubuntu-latest)

too many arguments in call to testConn.runQuery

Check failure on line 1128 in connection_test.go

View workflow job for this annotation

GitHub Actions / Lint

too many arguments in call to testConn.runQuery

assert.NoError(t, err)
assert.Equal(t, 1, executeStatementCount)
Expand Down Expand Up @@ -1172,7 +1172,7 @@
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

Check failure on line 1175 in connection_test.go

View workflow job for this annotation

GitHub Actions / Test and Build (1.20.x, ubuntu-latest)

too many arguments in call to testConn.runQuery

Check failure on line 1175 in connection_test.go

View workflow job for this annotation

GitHub Actions / Lint

too many arguments in call to testConn.runQuery

assert.Error(t, err)
assert.Equal(t, 1, executeStatementCount)
Expand Down Expand Up @@ -1225,7 +1225,7 @@
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

Check failure on line 1228 in connection_test.go

View workflow job for this annotation

GitHub Actions / Test and Build (1.20.x, ubuntu-latest)

too many arguments in call to testConn.runQuery

Check failure on line 1228 in connection_test.go

View workflow job for this annotation

GitHub Actions / Lint

too many arguments in call to testConn.runQuery

assert.NoError(t, err)
assert.Equal(t, 1, executeStatementCount)
Expand Down Expand Up @@ -1277,7 +1277,7 @@
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

Check failure on line 1280 in connection_test.go

View workflow job for this annotation

GitHub Actions / Test and Build (1.20.x, ubuntu-latest)

too many arguments in call to testConn.runQuery

Check failure on line 1280 in connection_test.go

View workflow job for this annotation

GitHub Actions / Lint

too many arguments in call to testConn.runQuery

assert.Error(t, err)
assert.Equal(t, 1, executeStatementCount)
Expand Down Expand Up @@ -1330,7 +1330,7 @@
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

Check failure on line 1333 in connection_test.go

View workflow job for this annotation

GitHub Actions / Test and Build (1.20.x, ubuntu-latest)

too many arguments in call to testConn.runQuery

Check failure on line 1333 in connection_test.go

View workflow job for this annotation

GitHub Actions / Lint

too many arguments in call to testConn.runQuery

assert.NoError(t, err)
assert.Equal(t, 1, executeStatementCount)
Expand Down Expand Up @@ -1383,7 +1383,7 @@
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

Check failure on line 1386 in connection_test.go

View workflow job for this annotation

GitHub Actions / Test and Build (1.20.x, ubuntu-latest)

too many arguments in call to testConn.runQuery

Check failure on line 1386 in connection_test.go

View workflow job for this annotation

GitHub Actions / Lint

too many arguments in call to testConn.runQuery

assert.Error(t, err)
assert.Equal(t, 1, executeStatementCount)
Expand Down
6 changes: 6 additions & 0 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
}

protocolVersion := int64(c.cfg.ThriftProtocolVersion)

sessionStart := time.Now()
session, err := tclient.OpenSession(ctx, &cli_service.TOpenSessionReq{
ClientProtocolI64: &protocolVersion,
Configuration: sessionParams,
Expand All @@ -64,6 +66,8 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
},
CanUseMultipleCatalogs: &c.cfg.CanUseMultipleCatalogs,
})
sessionLatencyMs := time.Since(sessionStart).Milliseconds()

if err != nil {
return nil, dbsqlerrint.NewRequestError(ctx, fmt.Sprintf("error connecting: host=%s port=%d, httpPath=%s", c.cfg.Host, c.cfg.Port, c.cfg.HTTPPath), err)
}
Expand All @@ -80,11 +84,13 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
conn.telemetry = telemetry.InitializeForConnection(
ctx,
c.cfg.Host,
c.cfg.DriverVersion,
c.client,
c.cfg.EnableTelemetry,
)
if conn.telemetry != nil {
log.Debug().Msg("telemetry initialized for connection")
conn.telemetry.RecordOperation(ctx, conn.id, telemetry.OperationTypeCreateSession, sessionLatencyMs)
}

log.Info().Msgf("connect: host=%s port=%d httpPath=%s serverProtocolVersion=0x%X", c.cfg.Host, c.cfg.Port, c.cfg.HTTPPath, session.ServerProtocolVersion)
Expand Down
3 changes: 3 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ func (ucfg UserConfig) WithDefaults() UserConfig {
ucfg.UseLz4Compression = false
ucfg.CloudFetchConfig = CloudFetchConfig{}.WithDefaults()

// EnableTelemetry defaults to unset (ConfigValue zero value),
// meaning telemetry is controlled by server feature flags.

return ucfg
}

Expand Down
52 changes: 44 additions & 8 deletions internal/rows/rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ type rows struct {
logger_ *dbsqllog.DBSQLLogger

ctx context.Context

// Telemetry tracking
telemetryCtx context.Context
telemetryUpdate func(chunkCount int, bytesDownloaded int64)
chunkCount int
bytesDownloaded int64
}

var _ driver.Rows = (*rows)(nil)
Expand All @@ -72,6 +78,8 @@ func NewRows(
client cli_service.TCLIService,
config *config.Config,
directResults *cli_service.TSparkDirectResults,
telemetryCtx context.Context,
telemetryUpdate func(chunkCount int, bytesDownloaded int64),
) (driver.Rows, dbsqlerr.DBError) {

connId := driverctx.ConnIdFromContext(ctx)
Expand Down Expand Up @@ -103,14 +111,18 @@ func NewRows(
logger.Debug().Msgf("databricks: creating Rows, pageSize: %d, location: %v", pageSize, location)

r := &rows{
client: client,
opHandle: opHandle,
connId: connId,
correlationId: correlationId,
location: location,
config: config,
logger_: logger,
ctx: ctx,
client: client,
opHandle: opHandle,
connId: connId,
correlationId: correlationId,
location: location,
config: config,
logger_: logger,
ctx: ctx,
telemetryCtx: telemetryCtx,
telemetryUpdate: telemetryUpdate,
chunkCount: 0,
bytesDownloaded: 0,
}

// if we already have results for the query do some additional initialization
Expand All @@ -127,6 +139,17 @@ func NewRows(
if err != nil {
return r, err
}

r.chunkCount++
if directResults.ResultSet != nil && directResults.ResultSet.Results != nil && directResults.ResultSet.Results.ArrowBatches != nil {
for _, batch := range directResults.ResultSet.Results.ArrowBatches {
r.bytesDownloaded += int64(len(batch.Batch))
}
}

if r.telemetryUpdate != nil {
r.telemetryUpdate(r.chunkCount, r.bytesDownloaded)
}
}

var d rowscanner.Delimiter
Expand Down Expand Up @@ -458,6 +481,19 @@ func (r *rows) fetchResultPage() error {
return err1
}

r.chunkCount++
if fetchResult != nil && fetchResult.Results != nil {
if fetchResult.Results.ArrowBatches != nil {
for _, batch := range fetchResult.Results.ArrowBatches {
r.bytesDownloaded += int64(len(batch.Batch))
}
}
}

if r.telemetryUpdate != nil {
r.telemetryUpdate(r.chunkCount, r.bytesDownloaded)
}

err1 = r.makeRowScanner(fetchResult)
if err1 != nil {
return err1
Expand Down
18 changes: 9 additions & 9 deletions internal/rows/rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func TestColumnsWithDirectResults(t *testing.T) {
ctx := driverctx.NewContextWithConnId(context.Background(), "connId")
ctx = driverctx.NewContextWithCorrelationId(ctx, "corrId")

d, err := NewRows(ctx, nil, client, nil, nil)
d, err := NewRows(ctx, nil, client, nil, nil, nil, nil)
assert.Nil(t, err)

rowSet := d.(*rows)
Expand Down Expand Up @@ -720,7 +720,7 @@ func TestRowsCloseOptimization(t *testing.T) {
ctx := driverctx.NewContextWithConnId(context.Background(), "connId")
ctx = driverctx.NewContextWithCorrelationId(ctx, "corrId")
opHandle := &cli_service.TOperationHandle{OperationId: &cli_service.THandleIdentifier{GUID: []byte{'f', 'o'}}}
rowSet, _ := NewRows(ctx, opHandle, client, nil, nil)
rowSet, _ := NewRows(ctx, opHandle, client, nil, nil, nil, nil)

// rowSet has no direct results calling Close should result in call to client to close operation
err := rowSet.Close()
Expand All @@ -733,7 +733,7 @@ func TestRowsCloseOptimization(t *testing.T) {
ResultSet: &cli_service.TFetchResultsResp{Results: &cli_service.TRowSet{Columns: []*cli_service.TColumn{}}},
}
closeCount = 0
rowSet, _ = NewRows(ctx, opHandle, client, nil, directResults)
rowSet, _ = NewRows(ctx, opHandle, client, nil, directResults, nil, nil)
err = rowSet.Close()
assert.Nil(t, err, "rows.Close should not throw an error")
assert.Equal(t, 1, closeCount)
Expand All @@ -746,7 +746,7 @@ func TestRowsCloseOptimization(t *testing.T) {
ResultSetMetadata: &cli_service.TGetResultSetMetadataResp{Schema: &cli_service.TTableSchema{}},
ResultSet: &cli_service.TFetchResultsResp{Results: &cli_service.TRowSet{Columns: []*cli_service.TColumn{}}},
}
rowSet, _ = NewRows(ctx, opHandle, client, nil, directResults)
rowSet, _ = NewRows(ctx, opHandle, client, nil, directResults, nil, nil)
err = rowSet.Close()
assert.Nil(t, err, "rows.Close should not throw an error")
assert.Equal(t, 0, closeCount)
Expand Down Expand Up @@ -816,7 +816,7 @@ func TestGetArrowBatches(t *testing.T) {

client := getSimpleClient([]cli_service.TFetchResultsResp{fetchResp1, fetchResp2})
cfg := config.WithDefaults()
rows, err := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults)
rows, err := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults, nil, nil)
assert.Nil(t, err)

rows2, ok := rows.(dbsqlrows.Rows)
Expand Down Expand Up @@ -889,7 +889,7 @@ func TestGetArrowBatches(t *testing.T) {

client := getSimpleClient([]cli_service.TFetchResultsResp{fetchResp1, fetchResp2, fetchResp3})
cfg := config.WithDefaults()
rows, err := NewRows(ctx, nil, client, cfg, nil)
rows, err := NewRows(ctx, nil, client, cfg, nil, nil, nil)
assert.Nil(t, err)

rows2, ok := rows.(dbsqlrows.Rows)
Expand Down Expand Up @@ -950,7 +950,7 @@ func TestGetArrowBatches(t *testing.T) {

client := getSimpleClient([]cli_service.TFetchResultsResp{fetchResp1})
cfg := config.WithDefaults()
rows, err := NewRows(ctx, nil, client, cfg, nil)
rows, err := NewRows(ctx, nil, client, cfg, nil, nil, nil)
assert.Nil(t, err)

rows2, ok := rows.(dbsqlrows.Rows)
Expand All @@ -977,7 +977,7 @@ func TestGetArrowBatches(t *testing.T) {

client := getSimpleClient([]cli_service.TFetchResultsResp{})
cfg := config.WithDefaults()
rows, err := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults)
rows, err := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults, nil, nil)
assert.Nil(t, err)

rows2, ok := rows.(dbsqlrows.Rows)
Expand Down Expand Up @@ -1556,7 +1556,7 @@ func TestFetchResultPage_PropagatesGetNextPageError(t *testing.T) {

executeStatementResp := cli_service.TExecuteStatementResp{}
cfg := config.WithDefaults()
rows, _ := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults)
rows, _ := NewRows(ctx, nil, client, cfg, executeStatementResp.DirectResults, nil, nil)
// Call Next and ensure it propagates the error from getNextPage
actualErr := rows.Next(nil)

Expand Down
Loading
Loading