diff --git a/sdks/go/pkg/beam/io/spannerio/CHANGESTREAM.md b/sdks/go/pkg/beam/io/spannerio/CHANGESTREAM.md new file mode 100644 index 000000000000..6529db3ec87d --- /dev/null +++ b/sdks/go/pkg/beam/io/spannerio/CHANGESTREAM.md @@ -0,0 +1,241 @@ +# Spanner Change Streams — Go Implementation Notes + +This document describes the design of the `ReadChangeStream` connector, the correctness and +scalability guarantees it provides, and what an implementor needs to know when extending or +operating it. + +--- + +## Architecture overview + +`ReadChangeStream` is implemented as a **Splittable DoFn (SDF)** with an in-memory partition +queue encoded inside the restriction. There is no Spanner metadata table and no external +coordination service. The Beam runner's native checkpoint/restart mechanism provides all +durability. + +``` +beam.Impulse ──► readChangeStreamFn (SDF) + │ + │ Restriction = PartitionQueueRestriction + │ {Pending: []PartitionWork, Bounded: bool} + │ + ▼ + PCollection +``` + +### Partition lifecycle + +Spanner change stream partitions form a tree. The root query (empty partition token) returns +`ChildPartitionsRecord` rows that name the initial leaf partitions. Each leaf may itself +return more `ChildPartitionsRecord` rows as Spanner internally reshards. + +``` +Root query (token="") + └── ChildPartitionsRecord → [token-A, token-B, ...] + ├── token-A → DataChangeRecords ... → PartitionEndRecord + └── token-B → DataChangeRecords ... → ChildPartitionsRecord → [token-C] + └── token-C → ... +``` + +The SDF models this as a work queue (`PartitionQueueRestriction.Pending`). The root entry +(empty token) is enqueued at startup. When a `ChildPartitionsRecord` is received, the child +tokens are appended to the queue via `TryClaim`. When a natural end is reached, the active +partition is dequeued. + +--- + +## Scalability: Aggressive TrySplit + +A naive queue-based design would process all partitions sequentially on a single worker. 
+The Go implementation avoids this through **aggressive `TrySplit`**. + +When the runner calls `TrySplit(fraction > 0)`: + +- **Primary** restriction keeps only the currently active partition (`Pending[0]`). +- **Residual** restriction gets all remaining partitions (`Pending[1:]`). + +The runner recursively splits the residual, eventually producing one restriction per partition. +Each restriction is dispatched to a separate worker, achieving **per-partition parallelism** +with no external coordination. + +``` +Initial: [root, A, B, C, D] +Split 1: Primary=[root] Residual=[A, B, C, D] +Split 2: Primary=[A] Residual=[B, C, D] +Split 3: Primary=[B] Residual=[C, D] +... +``` + +> **Note:** The initial partition set is not known until the root query completes. +> Until the first `ChildPartitionsRecord` arrives, there is only one restriction on one worker. +> Splitting begins as soon as the first child tokens are enqueued (typically within one +> `defaultCheckpointInterval` = 10 seconds of startup). + +For `TrySplit(fraction == 0)` (self-checkpoint), the primary becomes empty (done) and +the residual continues from the current restriction. This is how the SDF periodically yields +to allow checkpoint serialisation. + +--- + +## Durability: At-least-once delivery + +On every self-checkpoint (every 10 seconds by default), the Beam runner serialises the +`PartitionQueueRestriction` to durable storage. The restriction encodes: + +- The current partition token and its last processed `StartTimestamp`. +- All queued child partitions and their start timestamps. + +After a worker failure or restart, the runner deserialises the last committed restriction and +resumes `ProcessElement` from exactly the last claimed timestamp. No records are skipped, but +records between the last claimed timestamp and the failure may be re-emitted. 
+ +**Delivery guarantee: at-least-once.** Pipelines that require exactly-once semantics must +deduplicate downstream (e.g., using a Spanner UPSERT keyed on `(PartitionToken, CommitTimestamp, RecordSequence, ServerTransactionID)`). + +--- + +## Watermark correctness + +The watermark controls how downstream windowing operations advance. An incorrect watermark +can cause records to be dropped as "late data". + +The `changeStreamWatermarkEstimator` tracks two values: + +| Field | Meaning | Sentinel | +| ------------- | ------------------------------------------------------- | --------------------------------------- | +| `maxObserved` | Highest commit/heartbeat timestamp seen so far | `math.MinInt64` (not yet advanced) | +| `minPending` | Minimum `StartTimestamp` of all partitions in the queue | `math.MaxInt64` (no pending partitions) | + +`CurrentWatermark()` returns `min(maxObserved, minPending)`. + +This prevents the watermark from advancing past data that has not yet been emitted. Without +`minPending`, if partition A advances the watermark to T1, a queued partition B with data at +T0 < T1 would arrive as late data. + +After aggressive `TrySplit`, each restriction holds exactly one partition, so in steady state +`minPending == maxObserved == the partition's current position`. The dual-state design is +necessary for correctness during the brief window before splitting occurs. + +--- + +## Transient error resilience + +Spanner streaming reads can return transient gRPC errors, particularly during: + +- Spanner backend maintenance (`UNAVAILABLE`) +- Transaction contention or leadership changes (`ABORTED`) + +Rather than failing the bundle, these errors trigger a checkpoint-and-retry: + +``` +UNAVAILABLE / ABORTED → ResumeProcessingIn(1 second) +``` + +The restriction records the last committed timestamp, so the retry resumes exactly from where +reading left off. Non-retryable errors fail the bundle normally. 
+ +--- + +## Public API + +```go +records := spannerio.ReadChangeStream( + s, + "projects/my-project/instances/my-instance/databases/my-db", + "MyStream", // change stream name (must match [A-Za-z_][A-Za-z0-9_]*) + startTime, // inclusive start timestamp + time.Time{}, // zero value = unbounded (runs indefinitely) + 10_000, // heartbeat interval in milliseconds +) +// records is a beam.PCollection of spannerio.DataChangeRecord +``` + +### DataChangeRecord fields + +| Field | Type | Description | +| -------------------------------------- | ------------------- | ------------------------------------------------------------------------ | +| `PartitionToken` | `string` | Change stream partition that produced this record | +| `CommitTimestamp` | `time.Time` | When the mutations were committed | +| `RecordSequence` | `string` | Monotonically increasing within a partition for a given commit timestamp | +| `ServerTransactionID` | `string` | Globally unique transaction identifier | +| `IsLastRecordInTransactionInPartition` | `bool` | Whether this is the final record for this transaction in this partition | +| `Table` | `string` | Modified table name | +| `ColumnMetadata` | `[]*ColumnMetadata` | Column names, types, and key membership | +| `Mods` | `[]*Mod` | Per-row changes with `Keys`, `OldValues`, `NewValues` | +| `ModType` | `ModType` | `ModTypeInsert`, `ModTypeUpdate`, or `ModTypeDelete` | +| `ValueCaptureType` | `ValueCaptureType` | Which values are captured (see Spanner docs) | +| `NumberOfRecordsInTransaction` | `int32` | Total `DataChangeRecord`s for this transaction across all partitions | +| `NumberOfPartitionsInTransaction` | `int32` | Total partitions that produced records for this transaction | +| `TransactionTag` | `string` | Application-defined transaction tag | +| `IsSystemTransaction` | `bool` | True for Spanner-internal transactions (e.g., TTL) | + +`Mod.Keys`, `Mod.OldValues`, and `Mod.NewValues` are slices of `*ModValue`. 
Each `ModValue` +holds a column name and its value as a JSON-encoded string using the Spanner JSON value format +(e.g., `"\"hello\""` for a string, `"42"` for a number). + +--- + +## Beam metrics + +Three counters are emitted under the namespace `spannerio.changestream`: + +| Metric | Description | +| ---------------------- | ------------------------------------------------------------------ | +| `records_emitted` | Total `DataChangeRecord`s emitted by this stage | +| `partitions_completed` | Total partitions that reached a natural end (`PartitionEndRecord`) | +| `errors_transient` | Total transient errors that triggered a checkpoint-and-retry | + +Access these via the Beam metrics API or your runner's monitoring UI (e.g., Dataflow Monitoring). + +--- + +## Logging + +Structured log output is emitted using the Beam log package at the following levels: + +| Level | Event | +| ------- | ----------------------------------------------------------------------- | +| `DEBUG` | `ProcessElement` start: partition token, start timestamp, pending count | +| `INFO` | Partition completed | +| `INFO` | Child partitions discovered: count and start timestamp | +| `WARN` | Transient error triggering checkpoint-and-retry | +| `ERROR` | Non-retryable error causing bundle failure | + +--- + +## Comparison to the Java implementation + +The Java SDK uses a Spanner metadata table (`SpannerIO.readChangeStream` with +`MetadataDatabase`) to coordinate partitions across workers. 
The Go implementation +deliberately avoids this: + +| Concern | Java | Go | +| ----------------------- | -------------------------------------- | ------------------------------------------------------------------------------------------------------------ | +| Partition coordination | Spanner metadata table | SDF restriction (Beam runner state) | +| External dependencies | Spanner metadata DB required | None beyond the source database | +| Durability | Metadata table survives runner restart | Runner checkpoint storage | +| Partition deduplication | Metadata table tracks seen tokens | Not needed (newer Spanner API guarantees each token appears in exactly one parent's `ChildPartitionsRecord`) | + +The trade-off is that the Go implementation relies on the runner's checkpoint storage for +durability rather than a persistent external store. For Dataflow, checkpoint state is backed +by Google Cloud Storage and is as durable as the metadata table approach. + +--- + +## Known limitations + +1. **No initial parallelism.** All partitions are discovered dynamically from the root query. + Until the first `ChildPartitionsRecord` arrives, there is one restriction on one worker. + Expect ~10 seconds of single-worker operation at pipeline start. + +2. **At-least-once delivery.** Records can be re-emitted after a worker failure. Deduplicate + downstream if exactly-once semantics are required. + +3. **Heartbeat records are not emitted.** Heartbeat records advance the watermark internally + but are not output as pipeline elements. If you need explicit heartbeat visibility, add a + side output in a downstream DoFn. + +4. **SQL injection guard is name-pattern only.** `ReadChangeStream` panics if the change + stream name does not match `[A-Za-z_][A-Za-z0-9_]*`. The name is still interpolated + into SQL (Spanner does not accept parameterised function names), so the regex is the + complete safety measure. 
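For limitation 2, the composite key called out in the durability section — `(PartitionToken, CommitTimestamp, RecordSequence, ServerTransactionID)` — uniquely identifies a record, so it is sufficient for downstream deduplication. A minimal single-process sketch of the idea follows; a real pipeline would use Beam state or the Spanner UPSERT suggested above rather than an in-memory map, and the string-typed fields here (including the timestamp) are a simplification of the actual `DataChangeRecord` types:

```go
package main

import "fmt"

// dedupKey joins the four fields that uniquely identify a
// DataChangeRecord, per the durability section of this document.
func dedupKey(partitionToken, commitTimestamp, recordSequence, serverTransactionID string) string {
	return partitionToken + "|" + commitTimestamp + "|" + recordSequence + "|" + serverTransactionID
}

func main() {
	seen := make(map[string]bool)
	// Simulate an at-least-once stream: the second record is a
	// re-emission after a worker restart.
	records := [][4]string{
		{"tok-A", "2024-01-01T00:00:00Z", "0001", "txn-1"},
		{"tok-A", "2024-01-01T00:00:00Z", "0001", "txn-1"}, // duplicate
		{"tok-A", "2024-01-01T00:00:01Z", "0000", "txn-2"},
	}
	emitted := 0
	for _, r := range records {
		k := dedupKey(r[0], r[1], r[2], r[3])
		if seen[k] {
			continue // drop the re-emitted copy
		}
		seen[k] = true
		emitted++
	}
	fmt.Println(emitted) // 2 unique records
}
```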
diff --git a/sdks/go/pkg/beam/io/spannerio/changestream_read.go b/sdks/go/pkg/beam/io/spannerio/changestream_read.go new file mode 100644 index 000000000000..d11c4edd02fc --- /dev/null +++ b/sdks/go/pkg/beam/io/spannerio/changestream_read.go @@ -0,0 +1,590 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package spannerio + +import ( + "context" + "encoding/json" + "fmt" + "math" + "reflect" + "regexp" + "time" + + "cloud.google.com/go/spanner" + "cloud.google.com/go/spanner/apiv1/spannerpb" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" + "google.golang.org/api/iterator" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// ---- Internal STRUCT types matching the Spanner change stream SQL response ---- +// +// Spanner change stream SQL queries return ARRAY> rows in the old +// change stream format. These internal types decode that format via the +// spanner: struct tag. 
+ +type csRow struct { + DataChangeRecord []*csDataChangeRecord `spanner:"data_change_record"` + HeartbeatRecord []*csHeartbeatRecord `spanner:"heartbeat_record"` + ChildPartitionsRecord []*csChildPartitionsRecord `spanner:"child_partitions_record"` +} + +type csDataChangeRecord struct { + CommitTimestamp time.Time `spanner:"commit_timestamp"` + RecordSequence string `spanner:"record_sequence"` + ServerTransactionID string `spanner:"server_transaction_id"` + IsLastRecord bool `spanner:"is_last_record_in_transaction_in_partition"` + TableName string `spanner:"table_name"` + ColumnTypes []*csColumnType `spanner:"column_types"` + Mods []*csMod `spanner:"mods"` + ModType string `spanner:"mod_type"` + ValueCaptureType string `spanner:"value_capture_type"` + NumberOfRecords int64 `spanner:"number_of_records_in_transaction"` + NumberOfPartitions int64 `spanner:"number_of_partitions_in_transaction"` + TransactionTag string `spanner:"transaction_tag"` + IsSystemTransaction bool `spanner:"is_system_transaction"` +} + +type csColumnType struct { + Name string `spanner:"name"` + Type spanner.NullJSON `spanner:"type"` + IsPrimaryKey bool `spanner:"is_primary_key"` + OrdinalPosition int64 `spanner:"ordinal_position"` +} + +type csMod struct { + Keys spanner.NullJSON `spanner:"keys"` + NewValues spanner.NullJSON `spanner:"new_values"` + OldValues spanner.NullJSON `spanner:"old_values"` +} + +type csHeartbeatRecord struct { + Timestamp time.Time `spanner:"timestamp"` +} + +type csChildPartitionsRecord struct { + StartTimestamp time.Time `spanner:"start_timestamp"` + RecordSequence string `spanner:"record_sequence"` + ChildPartitions []*csChildPartition `spanner:"child_partitions"` +} + +type csChildPartition struct { + Token string `spanner:"token"` + ParentPartitionTokens []string `spanner:"parent_partition_tokens"` +} + +// ---- SDF registration ---- + +func init() { + register.DoFn5x2[ + context.Context, *changeStreamWatermarkEstimator, *sdf.LockRTracker, []byte, + 
func(beam.EventTime, DataChangeRecord), sdf.ProcessContinuation, error, + ](&readChangeStreamFn{}) + register.Emitter2[beam.EventTime, DataChangeRecord]() + beam.RegisterType(reflect.TypeOf((*DataChangeRecord)(nil)).Elem()) +} + +// defaultCheckpointInterval is how long a single ProcessElement invocation +// queries Spanner before self-checkpointing and resuming. +const defaultCheckpointInterval = 10 * time.Second + +// validStreamName matches Spanner change stream identifiers, which follow +// standard SQL identifier rules. Used to prevent SQL injection in the +// READ_() query built by buildStatement. +var validStreamName = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`) + +// Beam metrics for observability. +var ( + recordsEmitted = beam.NewCounter("spannerio.changestream", "records_emitted") + partitionsCompleted = beam.NewCounter("spannerio.changestream", "partitions_completed") + errorsTransient = beam.NewCounter("spannerio.changestream", "errors_transient") +) + +// ReadChangeStreamOption configures ReadChangeStream. +type ReadChangeStreamOption func(*readChangeStreamFn) + +// WithChangeStreamTestEndpoint sets a custom Spanner endpoint, typically used +// for the Spanner emulator in tests. +func WithChangeStreamTestEndpoint(endpoint string) ReadChangeStreamOption { + return func(fn *readChangeStreamFn) { + fn.TestEndpoint = endpoint + } +} + +// ReadChangeStream reads DataChangeRecords from a Spanner change stream and +// returns a PCollection. +// +// db is the Spanner database path +// (projects/{project}/instances/{instance}/databases/{database}). +// changeStreamName is the name of the change stream (e.g. "MyStream"). +// startTime is the inclusive start of the time range to read. +// endTime is the exclusive end of the time range. Use time.Time{} for an +// unbounded (continuously running) read. +// heartbeatMillis controls how often Spanner emits heartbeat records when +// there are no data changes (milliseconds). 
+func ReadChangeStream( + s beam.Scope, + db string, + changeStreamName string, + startTime time.Time, + endTime time.Time, + heartbeatMillis int64, + opts ...ReadChangeStreamOption, +) beam.PCollection { + s = s.Scope("spannerio.ReadChangeStream") + + if !validStreamName.MatchString(changeStreamName) { + panic(fmt.Sprintf("spannerio.ReadChangeStream: invalid change stream name %q: must match %s", changeStreamName, validStreamName.String())) + } + + fn := &readChangeStreamFn{ + spannerFn: newSpannerFn(db), + ChangeStreamName: changeStreamName, + StartTime: startTime, + EndTime: endTime, + HeartbeatMillis: heartbeatMillis, + } + for _, opt := range opts { + opt(fn) + } + + imp := beam.Impulse(s) + return beam.ParDo(s, fn, imp) +} + +// readChangeStreamFn is the SDF that reads a Spanner change stream. +// +// The entire partition work queue is encoded inside the restriction +// (PartitionQueueRestriction), which the Beam runner serialises on every +// checkpoint. This provides durable, in-memory coordination with no external +// state store. +type readChangeStreamFn struct { + spannerFn + ChangeStreamName string + StartTime time.Time + EndTime time.Time // zero = unbounded + HeartbeatMillis int64 +} + +func (fn *readChangeStreamFn) Setup(ctx context.Context) error { + return fn.spannerFn.Setup(ctx) +} + +func (fn *readChangeStreamFn) Teardown() { + fn.spannerFn.Teardown() +} + +// CreateInitialRestriction returns a restriction with a single root entry +// (empty partition token) that bootstraps the partition discovery query. +func (fn *readChangeStreamFn) CreateInitialRestriction(_ []byte) PartitionQueueRestriction { + return PartitionQueueRestriction{ + Pending: []PartitionWork{ + { + Token: "", // empty token = root discovery query + StartTimestamp: fn.StartTime, + EndTimestamp: fn.EndTime, + }, + }, + Bounded: !fn.EndTime.IsZero(), + } +} + +// SplitRestriction returns the restriction unsplit. The queue grows +// dynamically as child partitions are discovered. 
+func (fn *readChangeStreamFn) SplitRestriction(_ []byte, r PartitionQueueRestriction) []PartitionQueueRestriction { + return []PartitionQueueRestriction{r} +} + +// RestrictionSize returns an estimate of remaining work. +func (fn *readChangeStreamFn) RestrictionSize(_ []byte, r PartitionQueueRestriction) float64 { + _, remaining := newPartitionQueueTracker(r).GetProgress() + return remaining +} + +// CreateTracker wraps a partitionQueueTracker in a thread-safe LockRTracker. +func (fn *readChangeStreamFn) CreateTracker(r PartitionQueueRestriction) *sdf.LockRTracker { + return sdf.NewLockRTracker(newPartitionQueueTracker(r)) +} + +// InitialWatermarkEstimatorState returns a sentinel state indicating no +// watermark has been observed yet and no pending partitions are known. +func (fn *readChangeStreamFn) InitialWatermarkEstimatorState( + _ beam.EventTime, + _ PartitionQueueRestriction, + _ []byte, +) []byte { + return encodeWatermarkState(math.MinInt64, math.MaxInt64) +} + +// CreateWatermarkEstimator creates the watermark estimator from its serialised state. +func (fn *readChangeStreamFn) CreateWatermarkEstimator(state []byte) *changeStreamWatermarkEstimator { + maxObserved, minPending := decodeWatermarkState(state) + return &changeStreamWatermarkEstimator{maxObserved: maxObserved, minPending: minPending} +} + +// WatermarkEstimatorState serialises the watermark estimator state for the runner. +func (fn *readChangeStreamFn) WatermarkEstimatorState(we *changeStreamWatermarkEstimator) []byte { + return encodeWatermarkState(we.maxObserved, we.minPending) +} + +// TruncateRestriction returns an empty (immediately done) restriction, used +// when the pipeline is draining. +func (fn *readChangeStreamFn) TruncateRestriction( + _ *sdf.LockRTracker, + _ []byte, +) PartitionQueueRestriction { + return PartitionQueueRestriction{Bounded: true} +} + +// ProcessElement reads from the active partition and emits DataChangeRecords. 
+// +// Each invocation queries Spanner for up to defaultCheckpointInterval, then +// returns ResumeProcessingIn(0) to self-checkpoint. The runner serialises the +// restriction (which holds the full partition queue), so work resumes exactly +// from the last claimed timestamp after a failure or checkpoint. +func (fn *readChangeStreamFn) ProcessElement( + ctx context.Context, + we *changeStreamWatermarkEstimator, + rt *sdf.LockRTracker, + _ []byte, + emit func(beam.EventTime, DataChangeRecord), +) (sdf.ProcessContinuation, error) { + rest := rt.GetRestriction().(PartitionQueueRestriction) + if len(rest.Pending) == 0 { + return sdf.StopProcessing(), nil + } + active := rest.Pending[0] + + // Update the watermark estimator with the minimum pending start timestamp + // so the watermark does not advance past unprocessed partitions. + minStart := active.StartTimestamp + for _, p := range rest.Pending[1:] { + if p.StartTimestamp.Before(minStart) { + minStart = p.StartTimestamp + } + } + we.SetMinPending(minStart) + + log.Debugf(ctx, "changestream: processing partition %q from %v (%d pending)", + active.Token, active.StartTimestamp, len(rest.Pending)) + + // Use a bounded context so we self-checkpoint periodically. + // For bounded partitions, extend the timeout to ensure the query can + // finish naturally before the context expires. Without this extension, + // the context can expire just before iterator.Done is returned (when + // the partition's time window is close to defaultCheckpointInterval), + // causing a spurious checkpoint that re-reads already-processed records. 
+ checkpointInterval := defaultCheckpointInterval + if active.bounded() { + if remaining := time.Until(active.EndTimestamp); remaining+5*time.Second > checkpointInterval { + checkpointInterval = remaining + 5*time.Second + } + } + queryCtx, cancel := context.WithTimeout(ctx, checkpointInterval) + defer cancel() + + iter := fn.client.Single().Query(queryCtx, fn.buildStatement(active)) + defer iter.Stop() + + for { + row, err := iter.Next() + if err == iterator.Done { + // If the query context expired, it's time to checkpoint and resume. + // Otherwise the partition's query completed naturally. + if queryCtx.Err() != nil { + return sdf.ResumeProcessingIn(0), nil + } + // Natural end (bounded time window elapsed or partition split + // completed): mark the active partition as done. + rt.TryClaim(PartitionTimestampClaim{PartitionDone: true}) + partitionsCompleted.Inc(ctx, 1) + log.Infof(ctx, "changestream: partition %q completed", active.Token) + return sdf.ResumeProcessingIn(0), nil + } + if err != nil { + // If the query context expired (timeout or cancellation), checkpoint. + if queryCtx.Err() != nil { + return sdf.ResumeProcessingIn(0), nil + } + // If the parent context was cancelled (pipeline shutdown), stop. + if ctx.Err() != nil { + return sdf.StopProcessing(), nil + } + // Transient errors (UNAVAILABLE, ABORTED) are retryable: checkpoint + // and resume from the current position rather than failing the bundle. 
+ if isTransientError(err) { + errorsTransient.Inc(ctx, 1) + log.Warnf(ctx, "changestream: transient error on partition %q, will checkpoint and retry: %v", active.Token, err) + return sdf.ResumeProcessingIn(time.Second), nil + } + return sdf.StopProcessing(), fmt.Errorf("reading change stream %q partition %q: %w", fn.ChangeStreamName, active.Token, err) + } + + var rows []*csRow + if err := row.Column(0, &rows); err != nil { + return sdf.StopProcessing(), fmt.Errorf("decoding ChangeRecord column: %w", err) + } + + for _, r := range rows { + stop, err := fn.handleCSRow(ctx, rt, we, emit, active.Token, *r) + if err != nil { + return sdf.StopProcessing(), err + } + if stop { + return sdf.ResumeProcessingIn(0), nil + } + } + } +} + +// handleCSRow dispatches a single change stream row. Returns (true, nil) if +// processing should pause (e.g., TryClaim rejected). +func (fn *readChangeStreamFn) handleCSRow( + ctx context.Context, + rt *sdf.LockRTracker, + we *changeStreamWatermarkEstimator, + emit func(beam.EventTime, DataChangeRecord), + token string, + r csRow, +) (stop bool, err error) { + for _, dcr := range r.DataChangeRecord { + rec := dataChangeRecordFromCS(token, *dcr) + if !rt.TryClaim(PartitionTimestampClaim{Timestamp: rec.CommitTimestamp}) { + return true, nil + } + we.ObserveTimestamp(rec.CommitTimestamp) + emit(mtime.FromTime(rec.CommitTimestamp), rec) + recordsEmitted.Inc(ctx, 1) + } + + for _, hb := range r.HeartbeatRecord { + if !rt.TryClaim(PartitionTimestampClaim{Timestamp: hb.Timestamp}) { + return true, nil + } + we.ObserveTimestamp(hb.Timestamp) + } + + for _, cpr := range r.ChildPartitionsRecord { + var children []PartitionWork + for _, cp := range cpr.ChildPartitions { + children = append(children, PartitionWork{ + Token: cp.Token, + StartTimestamp: cpr.StartTimestamp, + EndTimestamp: fn.EndTime, + }) + } + if !rt.TryClaim(PartitionTimestampClaim{ + Timestamp: cpr.StartTimestamp, + ChildPartitions: children, + }) { + return true, nil + } + 
log.Infof(ctx, "changestream: partition %q discovered %d child partitions at %v", + token, len(children), cpr.StartTimestamp) + } + + return false, nil +} + +// isTransientError returns true for gRPC errors that are safe to retry. +// These match the Java implementation's default retryable codes. +func isTransientError(err error) bool { + if s, ok := status.FromError(err); ok { + switch s.Code() { + case codes.Unavailable, codes.Aborted: + return true + } + } + return false +} + +// buildStatement constructs the Spanner SQL statement for a change stream partition. +func (fn *readChangeStreamFn) buildStatement(active PartitionWork) spanner.Statement { + params := map[string]interface{}{ + "start_timestamp": active.StartTimestamp, + "heartbeat_milliseconds": fn.HeartbeatMillis, + } + + // NULL partition_token triggers the initial partition discovery query. + if active.Token == "" { + params["partition_token"] = nil + } else { + params["partition_token"] = active.Token + } + + // NULL end_timestamp means read indefinitely. + if active.bounded() { + params["end_timestamp"] = active.EndTimestamp + } else { + params["end_timestamp"] = nil + } + + sql := fmt.Sprintf( + "SELECT ChangeRecord FROM READ_%s("+ + " start_timestamp => @start_timestamp,"+ + " end_timestamp => @end_timestamp,"+ + " partition_token => @partition_token,"+ + " heartbeat_milliseconds => @heartbeat_milliseconds"+ + ")", + fn.ChangeStreamName, + ) + return spanner.Statement{SQL: sql, Params: params} +} + +// dataChangeRecordFromCS converts a csDataChangeRecord (old STRUCT format) to +// our public DataChangeRecord type. 
+func dataChangeRecordFromCS(token string, cs csDataChangeRecord) DataChangeRecord { + r := DataChangeRecord{ + PartitionToken: token, + CommitTimestamp: cs.CommitTimestamp, + RecordSequence: cs.RecordSequence, + ServerTransactionID: cs.ServerTransactionID, + IsLastRecordInTransactionInPartition: cs.IsLastRecord, + Table: cs.TableName, + ModType: parseModType(cs.ModType), + ValueCaptureType: parseValueCaptureType(cs.ValueCaptureType), + NumberOfRecordsInTransaction: int32(cs.NumberOfRecords), + NumberOfPartitionsInTransaction: int32(cs.NumberOfPartitions), + TransactionTag: cs.TransactionTag, + IsSystemTransaction: cs.IsSystemTransaction, + } + + for _, ct := range cs.ColumnTypes { + col := &ColumnMetadata{ + Name: ct.Name, + IsPrimaryKey: ct.IsPrimaryKey, + OrdinalPosition: ct.OrdinalPosition, + } + if ct.Type.Valid { + col.TypeCode, col.ArrayElementTypeCode = parseTypeJSON(ct.Type.Value) + } + r.ColumnMetadata = append(r.ColumnMetadata, col) + } + + for _, m := range cs.Mods { + mod := &Mod{ + Keys: jsonObjToModValues(m.Keys), + NewValues: jsonObjToModValues(m.NewValues), + OldValues: jsonObjToModValues(m.OldValues), + } + r.Mods = append(r.Mods, mod) + } + + return r +} + +// jsonObjToModValues converts a NullJSON object (map of column name → value) +// into a slice of ModValue. +func jsonObjToModValues(nj spanner.NullJSON) []*ModValue { + if !nj.Valid || nj.Value == nil { + return nil + } + m, ok := nj.Value.(map[string]interface{}) + if !ok { + return nil + } + var out []*ModValue + for colName, v := range m { + b, _ := json.Marshal(v) + out = append(out, &ModValue{ColumnName: colName, Value: string(b)}) + } + return out +} + +// parseTypeJSON extracts the TypeCode (and array element type code) from the +// JSON column type object returned by the change stream, e.g. +// {"code":"STRING"} or {"code":"ARRAY","arrayElementType":{"code":"INT64"}}. 
+func parseTypeJSON(v interface{}) (spannerpb.TypeCode, spannerpb.TypeCode) { + m, ok := v.(map[string]interface{}) + if !ok { + return spannerpb.TypeCode_TYPE_CODE_UNSPECIFIED, spannerpb.TypeCode_TYPE_CODE_UNSPECIFIED + } + code := parseTypeCodeString(fmt.Sprintf("%v", m["code"])) + var elemCode spannerpb.TypeCode + if ae, ok := m["arrayElementType"].(map[string]interface{}); ok { + elemCode = parseTypeCodeString(fmt.Sprintf("%v", ae["code"])) + } + return code, elemCode +} + +func parseTypeCodeString(s string) spannerpb.TypeCode { + switch s { + case "BOOL": + return spannerpb.TypeCode_BOOL + case "INT64": + return spannerpb.TypeCode_INT64 + case "FLOAT32": + return spannerpb.TypeCode_FLOAT32 + case "FLOAT64": + return spannerpb.TypeCode_FLOAT64 + case "TIMESTAMP": + return spannerpb.TypeCode_TIMESTAMP + case "DATE": + return spannerpb.TypeCode_DATE + case "STRING": + return spannerpb.TypeCode_STRING + case "BYTES": + return spannerpb.TypeCode_BYTES + case "ARRAY": + return spannerpb.TypeCode_ARRAY + case "STRUCT": + return spannerpb.TypeCode_STRUCT + case "NUMERIC": + return spannerpb.TypeCode_NUMERIC + case "JSON": + return spannerpb.TypeCode_JSON + case "PROTO": + return spannerpb.TypeCode_PROTO + case "ENUM": + return spannerpb.TypeCode_ENUM + default: + return spannerpb.TypeCode_TYPE_CODE_UNSPECIFIED + } +} + +func parseModType(s string) ModType { + switch s { + case "INSERT": + return ModTypeInsert + case "UPDATE": + return ModTypeUpdate + case "DELETE": + return ModTypeDelete + default: + return ModTypeUnspecified + } +} + +func parseValueCaptureType(s string) ValueCaptureType { + switch s { + case "OLD_AND_NEW_VALUES": + return ValueCaptureTypeOldAndNewValues + case "NEW_VALUES": + return ValueCaptureTypeNewValues + case "NEW_ROW": + return ValueCaptureTypeNewRow + case "NEW_ROW_AND_OLD_VALUES": + return ValueCaptureTypeNewRowAndOldValues + default: + return ValueCaptureTypeUnspecified + } +} diff --git 
a/sdks/go/pkg/beam/io/spannerio/changestream_read_test.go b/sdks/go/pkg/beam/io/spannerio/changestream_read_test.go new file mode 100644 index 000000000000..4364c53e2e32 --- /dev/null +++ b/sdks/go/pkg/beam/io/spannerio/changestream_read_test.go @@ -0,0 +1,570 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package spannerio
+
+import (
+	"fmt"
+	"math"
+	"strings"
+	"testing"
+	"time"
+
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// ---- partitionQueueTracker tests ----
+
+func makeWork(token string, startSec, endSec int64) PartitionWork {
+	var end time.Time
+	if endSec > 0 {
+		end = time.Unix(endSec, 0)
+	}
+	return PartitionWork{
+		Token:          token,
+		StartTimestamp: time.Unix(startSec, 0),
+		EndTimestamp:   end,
+	}
+}
+
+func TestTryClaim_AdvancesStartTimestamp(t *testing.T) {
+	rest := PartitionQueueRestriction{
+		Pending: []PartitionWork{makeWork("tok1", 100, 200)},
+		Bounded: true,
+	}
+	tr := newPartitionQueueTracker(rest)
+
+	claimed := tr.TryClaim(PartitionTimestampClaim{Timestamp: time.Unix(150, 0)})
+	if !claimed {
+		t.Fatal("expected TryClaim to return true")
+	}
+	if tr.rest.Pending[0].StartTimestamp != time.Unix(150, 0) {
+		t.Errorf("StartTimestamp not advanced: got %v", tr.rest.Pending[0].StartTimestamp)
+	}
+}
+
+func TestTryClaim_DoesNotGoBackward(t *testing.T) {
+	rest := PartitionQueueRestriction{
+		Pending: []PartitionWork{makeWork("tok1", 100, 200)},
+		Bounded: true,
+	}
+	tr := newPartitionQueueTracker(rest)
+
+	tr.TryClaim(PartitionTimestampClaim{Timestamp: time.Unix(150, 0)})
+	tr.TryClaim(PartitionTimestampClaim{Timestamp: time.Unix(130, 0)}) // backward
+	if tr.rest.Pending[0].StartTimestamp != time.Unix(150, 0) {
+		t.Errorf("StartTimestamp went backward: got %v", tr.rest.Pending[0].StartTimestamp)
+	}
+}
+
+func TestTryClaim_BeyondEndTime_ReturnsFalse(t *testing.T) {
+	rest := PartitionQueueRestriction{
+		Pending: []PartitionWork{makeWork("tok1", 100, 200)},
+		Bounded: true,
+	}
+	tr := newPartitionQueueTracker(rest)
+
+	claimed := tr.TryClaim(PartitionTimestampClaim{Timestamp: time.Unix(250, 0)})
+	if claimed {
+		t.Error("expected TryClaim to return false when timestamp exceeds EndTimestamp")
+	}
+	if !tr.stopped {
+		t.Error("expected tracker to be stopped")
+	}
+}
+
+func TestTryClaim_WithChildPartitions(t *testing.T) {
+	rest := PartitionQueueRestriction{
+		Pending: []PartitionWork{makeWork("root", 100, 0)},
+		Bounded: false,
+	}
+	tr := newPartitionQueueTracker(rest)
+
+	children := []PartitionWork{makeWork("child1", 150, 0), makeWork("child2", 150, 0)}
+	tr.TryClaim(PartitionTimestampClaim{Timestamp: time.Unix(150, 0), ChildPartitions: children})
+
+	if len(tr.rest.Pending) != 3 {
+		t.Errorf("expected 3 pending (root + 2 children), got %d", len(tr.rest.Pending))
+	}
+	if tr.rest.Pending[1].Token != "child1" || tr.rest.Pending[2].Token != "child2" {
+		t.Errorf("unexpected child tokens: %v", tr.rest.Pending)
+	}
+}
+
+func TestTryClaim_PartitionDone_RemovesFront(t *testing.T) {
+	rest := PartitionQueueRestriction{
+		Pending: []PartitionWork{makeWork("tok1", 100, 200), makeWork("tok2", 100, 200)},
+		Bounded: true,
+	}
+	tr := newPartitionQueueTracker(rest)
+
+	claimed := tr.TryClaim(PartitionTimestampClaim{PartitionDone: true})
+	if !claimed {
+		t.Fatal("expected TryClaim to return true")
+	}
+	if len(tr.rest.Pending) != 1 {
+		t.Errorf("expected 1 pending partition, got %d", len(tr.rest.Pending))
+	}
+	if tr.rest.Pending[0].Token != "tok2" {
+		t.Errorf("expected tok2 to be next, got %q", tr.rest.Pending[0].Token)
+	}
+}
+
+func TestTryClaim_AllDone_BoundedStops(t *testing.T) {
+	rest := PartitionQueueRestriction{
+		Pending: []PartitionWork{makeWork("tok1", 100, 200)},
+		Bounded: true,
+	}
+	tr := newPartitionQueueTracker(rest)
+
+	// Remove the only partition.
+	claimed := tr.TryClaim(PartitionTimestampClaim{PartitionDone: true})
+	if claimed {
+		t.Error("expected TryClaim to return false when all bounded partitions are done")
+	}
+	if !tr.IsDone() {
+		t.Error("expected IsDone to return true")
+	}
+}
+
+func TestTryClaim_AfterStopped_ReturnsError(t *testing.T) {
+	rest := PartitionQueueRestriction{
+		Pending: []PartitionWork{makeWork("tok1", 100, 200)},
+		Bounded: true,
+	}
+	tr := newPartitionQueueTracker(rest)
+	tr.stopped = true
+
+	tr.TryClaim(PartitionTimestampClaim{Timestamp: time.Unix(150, 0)})
+	if tr.GetError() == nil {
+		t.Error("expected error when TryClaim called after stopped")
+	}
+}
+
+func TestTryClaim_WrongType_SetsError(t *testing.T) {
+	rest := PartitionQueueRestriction{
+		Pending: []PartitionWork{makeWork("tok1", 100, 200)},
+		Bounded: true,
+	}
+	tr := newPartitionQueueTracker(rest)
+
+	tr.TryClaim("not a PartitionTimestampClaim")
+	if tr.GetError() == nil {
+		t.Error("expected error for wrong claim type")
+	}
+}
+
+// ---- TrySplit tests ----
+
+func TestTrySplit_SelfCheckpoint_FractionZero(t *testing.T) {
+	rest := PartitionQueueRestriction{
+		Pending: []PartitionWork{
+			makeWork("tok1", 100, 200),
+			makeWork("tok2", 100, 200),
+		},
+		Bounded: true,
+	}
+	tr := newPartitionQueueTracker(rest)
+
+	primary, residual, err := tr.TrySplit(0)
+	if err != nil {
+		t.Fatalf("TrySplit(0) returned error: %v", err)
+	}
+
+	prim := primary.(PartitionQueueRestriction)
+	res := residual.(PartitionQueueRestriction)
+
+	if len(prim.Pending) != 0 {
+		t.Errorf("primary should be empty for self-checkpoint, got %d", len(prim.Pending))
+	}
+	if len(res.Pending) != 2 {
+		t.Errorf("residual should have 2 partitions, got %d", len(res.Pending))
+	}
+	if !tr.stopped {
+		t.Error("tracker should be stopped after self-checkpoint")
+	}
+}
+
+func TestTrySplit_Aggressive(t *testing.T) {
+	rest := PartitionQueueRestriction{
+		Pending: []PartitionWork{
+			makeWork("active", 100, 200),
+			makeWork("tok1", 100, 200),
+			makeWork("tok2", 100, 200),
+			makeWork("tok3", 100, 200),
+			makeWork("tok4", 100, 200),
+		},
+		Bounded: true,
+	}
+	tr := newPartitionQueueTracker(rest)
+
+	primary, residual, err := tr.TrySplit(0.5)
+	if err != nil {
+		t.Fatalf("TrySplit(0.5) returned error: %v", err)
+	}
+
+	prim := primary.(PartitionQueueRestriction)
+	res := residual.(PartitionQueueRestriction)
+
+	// Aggressive split: primary keeps only the active partition.
+	if len(prim.Pending) != 1 {
+		t.Errorf("primary should have 1 partition (active only), got %d", len(prim.Pending))
+	}
+	if prim.Pending[0].Token != "active" {
+		t.Errorf("active partition should remain in primary, got %q", prim.Pending[0].Token)
+	}
+	// Residual gets all other partitions.
+	if len(res.Pending) != 4 {
+		t.Errorf("residual should have 4 partitions, got %d", len(res.Pending))
+	}
+	if len(prim.Pending)+len(res.Pending) != 5 {
+		t.Errorf("total partitions should remain 5, got primary=%d residual=%d",
+			len(prim.Pending), len(res.Pending))
+	}
+}
+
+func TestTrySplit_NoTail_ReturnsNilResidual(t *testing.T) {
+	rest := PartitionQueueRestriction{
+		Pending: []PartitionWork{makeWork("active", 100, 200)},
+		Bounded: true,
+	}
+	tr := newPartitionQueueTracker(rest)
+
+	primary, residual, err := tr.TrySplit(0.5)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if residual != nil {
+		t.Error("residual should be nil when there is only one partition")
+	}
+	_ = primary
+}
+
+func TestTrySplit_Stopped_ReturnsNilResidual(t *testing.T) {
+	rest := PartitionQueueRestriction{
+		Pending: []PartitionWork{makeWork("tok1", 100, 200)},
+		Bounded: true,
+	}
+	tr := newPartitionQueueTracker(rest)
+	tr.stopped = true
+
+	_, residual, err := tr.TrySplit(0)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if residual != nil {
+		t.Error("stopped tracker should return nil residual")
+	}
+}
+
+// ---- IsDone tests ----
+
+func TestIsDone_Bounded_EmptyQueue(t *testing.T) {
+	tr := newPartitionQueueTracker(PartitionQueueRestriction{Bounded: true})
+	if !tr.IsDone() {
+		t.Error("empty bounded queue should be done")
+	}
+}
+
+func TestIsDone_Bounded_NonEmptyQueue(t *testing.T) {
+	rest := PartitionQueueRestriction{
+		Pending: []PartitionWork{makeWork("tok1", 100, 200)},
+		Bounded: true,
+	}
+	tr := newPartitionQueueTracker(rest)
+	if tr.IsDone() {
+		t.Error("non-empty bounded queue should not be done")
+	}
+}
+
+func TestIsDone_Unbounded_RequiresStopped(t *testing.T) {
+	tr := newPartitionQueueTracker(PartitionQueueRestriction{Bounded: false})
+	if tr.IsDone() {
+		t.Error("unbounded empty queue should not be done unless stopped")
+	}
+	tr.stopped = true
+	if !tr.IsDone() {
+		t.Error("stopped unbounded empty queue should be done")
+	}
+}
+
+// ---- GetProgress tests ----
+
+func TestGetProgress_TimeBased(t *testing.T) {
+	rest := PartitionQueueRestriction{
+		Pending: []PartitionWork{
+			makeWork("tok1", 100, 200), // 100s remaining
+			makeWork("tok2", 150, 250), // 100s remaining
+		},
+		Bounded: true,
+	}
+	tr := newPartitionQueueTracker(rest)
+	_, remaining := tr.GetProgress()
+	wantNanos := float64(200 * time.Second.Nanoseconds())
+	if remaining != wantNanos {
+		t.Errorf("GetProgress remaining = %v, want %v", remaining, wantNanos)
+	}
+}
+
+func TestGetProgress_PartitionBased(t *testing.T) {
+	rest := PartitionQueueRestriction{
+		Pending: []PartitionWork{
+			makeWork("tok1", 100, 0),
+			makeWork("tok2", 100, 0),
+			makeWork("tok3", 100, 0),
+		},
+		Bounded: false,
+	}
+	tr := newPartitionQueueTracker(rest)
+	_, remaining := tr.GetProgress()
+	if remaining != 3 {
+		t.Errorf("GetProgress remaining = %v, want 3", remaining)
+	}
+}
+
+// ---- Coder tests ----
+
+func TestEncodeDecodeRestriction(t *testing.T) {
+	original := PartitionQueueRestriction{
+		Pending: []PartitionWork{
+			makeWork("tok1", 100, 200),
+			makeWork("tok2", 150, 0),
+		},
+		Bounded: true,
+	}
+
+	encoded, err := encodePartitionQueueRestriction(original)
+	if err != nil {
+		t.Fatalf("encode error: %v", err)
+	}
+
+	decoded, err := decodePartitionQueueRestriction(encoded)
+	if err != nil {
+		t.Fatalf("decode error: %v", err)
+	}
+
+	if len(decoded.Pending) != len(original.Pending) {
+		t.Fatalf("decoded Pending length %d != original %d", len(decoded.Pending), len(original.Pending))
+	}
+	for i, p := range decoded.Pending {
+		o := original.Pending[i]
+		if p.Token != o.Token {
+			t.Errorf("[%d] Token: got %q, want %q", i, p.Token, o.Token)
+		}
+		if !p.StartTimestamp.Equal(o.StartTimestamp) {
+			t.Errorf("[%d] StartTimestamp: got %v, want %v", i, p.StartTimestamp, o.StartTimestamp)
+		}
+		if !p.EndTimestamp.Equal(o.EndTimestamp) {
+			t.Errorf("[%d] EndTimestamp: got %v, want %v", i, p.EndTimestamp, o.EndTimestamp)
+		}
+	}
+	if decoded.Bounded != original.Bounded {
+		t.Errorf("Bounded: got %v, want %v", decoded.Bounded, original.Bounded)
+	}
+}
+
+// ---- changeStreamWatermarkEstimator tests ----
+
+func TestCurrentWatermark_Initial(t *testing.T) {
+	we := &changeStreamWatermarkEstimator{maxObserved: math.MinInt64, minPending: math.MaxInt64}
+	wm := we.CurrentWatermark()
+	if !wm.IsZero() {
+		t.Errorf("initial watermark should be zero time, got %v", wm)
+	}
+}
+
+func TestObserveTimestamp_AdvancesWatermark(t *testing.T) {
+	we := &changeStreamWatermarkEstimator{maxObserved: math.MinInt64, minPending: math.MaxInt64}
+	t1 := time.Unix(1000, 0)
+	we.ObserveTimestamp(t1)
+	if !we.CurrentWatermark().Equal(t1) {
+		t.Errorf("watermark should be %v, got %v", t1, we.CurrentWatermark())
+	}
+}
+
+func TestObserveTimestamp_DoesNotGoBackward(t *testing.T) {
+	we := &changeStreamWatermarkEstimator{maxObserved: math.MinInt64, minPending: math.MaxInt64}
+	t1 := time.Unix(1000, 0)
+	t2 := time.Unix(500, 0)
+	we.ObserveTimestamp(t1)
+	we.ObserveTimestamp(t2)
+	if !we.CurrentWatermark().Equal(t1) {
+		t.Errorf("watermark should not go backward: got %v, want %v", we.CurrentWatermark(), t1)
+	}
+}
+
+func TestObserveTimestamp_AdvancesForwardOnly(t *testing.T) {
+	we := &changeStreamWatermarkEstimator{maxObserved: math.MinInt64, minPending: math.MaxInt64}
+	timestamps := []time.Time{
+		time.Unix(100, 0),
+		time.Unix(300, 0),
+		time.Unix(200, 0),
+		time.Unix(400, 0),
+	}
+	for _, ts := range timestamps {
+		we.ObserveTimestamp(ts)
+	}
+	want := time.Unix(400, 0)
+	if !we.CurrentWatermark().Equal(want) {
+		t.Errorf("watermark = %v, want %v", we.CurrentWatermark(), want)
+	}
+}
+
+func TestCurrentWatermark_HeldByMinPending(t *testing.T) {
+	we := &changeStreamWatermarkEstimator{maxObserved: math.MinInt64, minPending: math.MaxInt64}
+	// Observe a high timestamp, but set a low min-pending.
+	we.ObserveTimestamp(time.Unix(1000, 0))
+	we.SetMinPending(time.Unix(500, 0))
+	// Watermark should be held back to min-pending.
+	want := time.Unix(500, 0)
+	if !we.CurrentWatermark().Equal(want) {
+		t.Errorf("watermark should be held at min-pending %v, got %v", want, we.CurrentWatermark())
+	}
+}
+
+func TestCurrentWatermark_MinPendingAboveObserved(t *testing.T) {
+	we := &changeStreamWatermarkEstimator{maxObserved: math.MinInt64, minPending: math.MaxInt64}
+	we.ObserveTimestamp(time.Unix(500, 0))
+	we.SetMinPending(time.Unix(1000, 0))
+	// When min-pending is above observed, watermark follows observed.
+	want := time.Unix(500, 0)
+	if !we.CurrentWatermark().Equal(want) {
+		t.Errorf("watermark should be at observed %v, got %v", want, we.CurrentWatermark())
+	}
+}
+
+func TestEncodeDecodeWatermarkState(t *testing.T) {
+	maxObs := int64(1234567890)
+	minPend := int64(9876543210)
+	b := encodeWatermarkState(maxObs, minPend)
+	gotMax, gotMin := decodeWatermarkState(b)
+	if gotMax != maxObs {
+		t.Errorf("maxObserved: got %d, want %d", gotMax, maxObs)
+	}
+	if gotMin != minPend {
+		t.Errorf("minPending: got %d, want %d", gotMin, minPend)
+	}
+}
+
+// ---- buildStatement tests ----
+
+func TestBuildStatement_BoundedPartition(t *testing.T) {
+	fn := &readChangeStreamFn{
+		ChangeStreamName: "TestStream",
+		HeartbeatMillis:  1000,
+	}
+	active := makeWork("mytoken", 100, 200)
+	stmt := fn.buildStatement(active)
+
+	if stmt.Params["partition_token"] != "mytoken" {
+		t.Errorf("partition_token = %v, want mytoken", stmt.Params["partition_token"])
+	}
+	if stmt.Params["end_timestamp"] != active.EndTimestamp {
+		t.Errorf("end_timestamp should be set for bounded partition")
+	}
+	if stmt.Params["heartbeat_milliseconds"] != int64(1000) {
+		t.Errorf("heartbeat_milliseconds = %v, want 1000", stmt.Params["heartbeat_milliseconds"])
+	}
+}
+
+func TestBuildStatement_RootPartition_NullToken(t *testing.T) {
+	fn := &readChangeStreamFn{
+		ChangeStreamName: "TestStream",
+		HeartbeatMillis:  1000,
+	}
+	active := makeWork("", 100, 0) // empty token = root
+	stmt := fn.buildStatement(active)
+
+	if stmt.Params["partition_token"] != nil {
+		t.Errorf("partition_token should be nil for root partition, got %v", stmt.Params["partition_token"])
+	}
+	if stmt.Params["end_timestamp"] != nil {
+		t.Errorf("end_timestamp should be nil for unbounded partition, got %v", stmt.Params["end_timestamp"])
+	}
+}
+
+func TestBuildStatement_ContainsStreamName(t *testing.T) {
+	fn := &readChangeStreamFn{
+		ChangeStreamName: "MyChangeStream",
+		HeartbeatMillis:  500,
+	}
+	active := makeWork("tok1", 100, 200)
+	stmt := fn.buildStatement(active)
+	if !strings.Contains(stmt.SQL, "READ_MyChangeStream") {
+		t.Errorf("SQL does not contain READ_MyChangeStream: %s", stmt.SQL)
+	}
+}
+
+// ---- Stream name validation tests ----
+
+func TestValidStreamName(t *testing.T) {
+	valid := []string{"MyStream", "my_stream", "_private", "A", "Stream123"}
+	for _, name := range valid {
+		if !validStreamName.MatchString(name) {
+			t.Errorf("expected %q to be valid", name)
+		}
+	}
+	invalid := []string{"", "123abc", "my-stream", "my stream", "a.b", "SELECT 1; --"}
+	for _, name := range invalid {
+		if validStreamName.MatchString(name) {
+			t.Errorf("expected %q to be invalid", name)
+		}
+	}
+}
+
+// ---- GetProgress done tracking tests ----
+
+func TestGetProgress_TracksDone(t *testing.T) {
+	rest := PartitionQueueRestriction{
+		Pending: []PartitionWork{makeWork("tok1", 100, 200)},
+		Bounded: true,
+	}
+	tr := newPartitionQueueTracker(rest)
+
+	// Initially no done work.
+	done, _ := tr.GetProgress()
+	if done != 0 {
+		t.Errorf("initial done should be 0, got %v", done)
+	}
+
+	// Claim a timestamp 50s ahead of start (100 → 150).
+	tr.TryClaim(PartitionTimestampClaim{Timestamp: time.Unix(150, 0)})
+	done, _ = tr.GetProgress()
+	wantNanos := float64(50 * time.Second.Nanoseconds())
+	if done != wantNanos {
+		t.Errorf("done after claim = %v, want %v", done, wantNanos)
+	}
+}
+
+// ---- isTransientError tests ----
+
+func TestIsTransientError(t *testing.T) {
+	if !isTransientError(status.Error(codes.Unavailable, "service unavailable")) {
+		t.Error("UNAVAILABLE should be transient")
+	}
+	if !isTransientError(status.Error(codes.Aborted, "transaction aborted")) {
+		t.Error("ABORTED should be transient")
+	}
+	if isTransientError(status.Error(codes.NotFound, "not found")) {
+		t.Error("NOT_FOUND should not be transient")
+	}
+	if isTransientError(status.Error(codes.PermissionDenied, "denied")) {
+		t.Error("PERMISSION_DENIED should not be transient")
+	}
+	if isTransientError(fmt.Errorf("plain error")) {
+		t.Error("non-gRPC error should not be transient")
+	}
+}
diff --git a/sdks/go/pkg/beam/io/spannerio/changestream_restriction.go b/sdks/go/pkg/beam/io/spannerio/changestream_restriction.go
new file mode 100644
index 000000000000..b385c0b04283
--- /dev/null
+++ b/sdks/go/pkg/beam/io/spannerio/changestream_restriction.go
@@ -0,0 +1,278 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"reflect"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+)
+
+func init() {
+	runtime.RegisterType(reflect.TypeOf((*partitionQueueTracker)(nil)))
+	runtime.RegisterType(reflect.TypeOf((*PartitionQueueRestriction)(nil)).Elem())
+	runtime.RegisterFunction(encodePartitionQueueRestriction)
+	runtime.RegisterFunction(decodePartitionQueueRestriction)
+	coder.RegisterCoder(
+		reflect.TypeOf((*PartitionQueueRestriction)(nil)).Elem(),
+		encodePartitionQueueRestriction,
+		decodePartitionQueueRestriction,
+	)
+}
+
+// PartitionWork represents a single change stream partition to be read.
+// The partition's unprocessed time range is [StartTimestamp, EndTimestamp).
+// After each checkpoint, StartTimestamp advances to the last claimed timestamp
+// so that work resumes from where it left off.
+type PartitionWork struct {
+	// Token is the Spanner change stream partition token.
+	// An empty token represents the root query that initialises the partition tree.
+	Token          string    `json:"token"`
+	StartTimestamp time.Time `json:"start"`
+	// EndTimestamp is the exclusive end of the time range.
+	// A zero value means this partition is unbounded (read indefinitely).
+	EndTimestamp time.Time `json:"end"`
+}
+
+func (p PartitionWork) bounded() bool {
+	return !p.EndTimestamp.IsZero()
+}
+
+// PartitionQueueRestriction is the SDF restriction for ReadChangeStream.
+// It encodes the entire work queue of change stream partitions. The Beam
+// runner serialises this restriction on every checkpoint, providing
+// durable, in-memory coordination with no external state required.
+//
+// The first element of Pending is the partition currently being processed.
+// After a checkpoint, StartTimestamp on Pending[0] reflects the last
+// committed position, so the residual resumes exactly from there.
+type PartitionQueueRestriction struct {
+	// Pending is the ordered list of partitions to read.
+	// Pending[0] is the active partition (currently being processed).
+	Pending []PartitionWork `json:"pending"`
+	// Bounded is true when all partitions have a finite EndTimestamp.
+	// When false, IsBounded returns false and the SDF runs indefinitely.
+	Bounded bool `json:"bounded"`
+}
+
+func encodePartitionQueueRestriction(r PartitionQueueRestriction) ([]byte, error) {
+	return json.Marshal(r)
+}
+
+func decodePartitionQueueRestriction(b []byte) (PartitionQueueRestriction, error) {
+	var r PartitionQueueRestriction
+	return r, json.Unmarshal(b, &r)
+}
+
+// PartitionTimestampClaim is the position type used with partitionQueueTracker.TryClaim.
+// Each call claims progress up to Timestamp within the active partition and
+// optionally enqueues child partitions discovered from a PartitionStartRecord.
+type PartitionTimestampClaim struct {
+	// Timestamp is the latest timestamp that has been processed in the active partition.
+	// The tracker advances the partition's StartTimestamp to this value so that
+	// subsequent checkpoints resume from here.
+	Timestamp time.Time
+	// ChildPartitions are new partition tokens returned by a PartitionStartRecord.
+	// They will be appended to the pending queue (the current partition's token
+	// acts as their implicit parent, so no deduplication is needed in the
+	// newer Spanner change stream API).
+	ChildPartitions []PartitionWork
+	// PartitionDone signals that the active partition has received a
+	// PartitionEndRecord and should be removed from the queue.
+	PartitionDone bool
+}
+
+// partitionQueueTracker is the RTracker implementation for PartitionQueueRestriction.
+// It is thread-safe when wrapped in sdf.LockRTracker (required for all SDFs).
+type partitionQueueTracker struct {
+	rest           PartitionQueueRestriction
+	stopped        bool
+	err            error
+	completedNanos float64 // nanoseconds of time range claimed (for GetProgress done tracking)
+}
+
+func newPartitionQueueTracker(rest PartitionQueueRestriction) *partitionQueueTracker {
+	return &partitionQueueTracker{rest: rest}
+}
+
+// TryClaim advances the restriction based on the given claim.
+//
+// - It advances Pending[0].StartTimestamp to claim.Timestamp so that a
+//   subsequent checkpoint or split produces a residual that resumes from
+//   exactly this position.
+// - If claim.ChildPartitions is non-empty, the new partitions are appended
+//   to the queue (each child appears in exactly one parent's record, so no
+//   deduplication is needed).
+// - If claim.PartitionDone is true, the active partition is removed from the queue.
+// - Returns false (stop processing) if the claim timestamp is beyond the
+//   restriction's end time, if the tracker has already stopped, or when the
+//   final partition of a bounded restriction completes.
+func (t *partitionQueueTracker) TryClaim(pos any) bool {
+	if t.stopped {
+		t.err = errors.New("TryClaim called after tracker stopped")
+		return false
+	}
+
+	claim, ok := pos.(PartitionTimestampClaim)
+	if !ok {
+		t.stopped = true
+		t.err = fmt.Errorf("TryClaim: expected PartitionTimestampClaim, got %T", pos)
+		return false
+	}
+
+	if len(t.rest.Pending) == 0 {
+		t.stopped = true
+		return false
+	}
+
+	active := &t.rest.Pending[0]
+
+	// Check whether the claim exceeds the partition's end time.
+	if active.bounded() && !claim.Timestamp.IsZero() && claim.Timestamp.After(active.EndTimestamp) {
+		t.stopped = true
+		return false
+	}
+
+	// Advance the start timestamp to the claimed position.
+	// This ensures that a checkpoint produces a residual that resumes from here.
+	if !claim.Timestamp.IsZero() && claim.Timestamp.After(active.StartTimestamp) {
+		t.completedNanos += float64(claim.Timestamp.Sub(active.StartTimestamp).Nanoseconds())
+		active.StartTimestamp = claim.Timestamp
+	}
+
+	// Enqueue child partitions discovered from a PartitionStartRecord.
+	// Each child partition only appears in one parent's PartitionStartRecord in the
+	// newer Spanner change stream API, so no deduplication is required.
+	t.rest.Pending = append(t.rest.Pending, claim.ChildPartitions...)
+
+	if claim.PartitionDone {
+		// Remove the completed partition from the front of the queue.
+		t.rest.Pending = t.rest.Pending[1:]
+	}
+
+	if len(t.rest.Pending) == 0 && t.rest.Bounded {
+		t.stopped = true
+		return false
+	}
+
+	return true
+}
+
+// TrySplit splits the restriction into primary and residual.
+//
+// For fraction == 0 (self-checkpoint): the primary takes no remaining work
+// (IsDone immediately returns true) and the residual continues from the
+// current position (Pending[0].StartTimestamp) through the rest of the queue.
+//
+// For fraction > 0: the primary keeps only the active partition and the
+// residual gets everything else. This aggressive split enables the runner
+// to recursively distribute individual partitions across workers, achieving
+// per-partition parallelism.
+func (t *partitionQueueTracker) TrySplit(fraction float64) (primary, residual any, err error) {
+	if t.stopped || len(t.rest.Pending) == 0 {
+		return t.rest, nil, nil
+	}
+
+	if fraction == 0 {
+		// Self-checkpoint: primary is done, residual takes all remaining work.
+		residualRest := PartitionQueueRestriction{
+			Pending: make([]PartitionWork, len(t.rest.Pending)),
+			Bounded: t.rest.Bounded,
+		}
+		copy(residualRest.Pending, t.rest.Pending)
+
+		// Primary becomes empty (done).
+		primaryRest := PartitionQueueRestriction{
+			Pending: nil,
+			Bounded: t.rest.Bounded,
+		}
+		t.rest = primaryRest
+		t.stopped = true
+		return primaryRest, residualRest, nil
+	}
+
+	// Fraction > 0: split off everything except the active partition.
+	// The runner can recursively split the residual further, eventually
+	// producing one restriction per partition for maximum parallelism.
+	if len(t.rest.Pending) <= 1 {
+		return t.rest, nil, nil
+	}
+
+	residualPending := make([]PartitionWork, len(t.rest.Pending)-1)
+	copy(residualPending, t.rest.Pending[1:])
+
+	residualRest := PartitionQueueRestriction{
+		Pending: residualPending,
+		Bounded: t.rest.Bounded,
+	}
+
+	t.rest.Pending = t.rest.Pending[:1]
+	return t.rest, residualRest, nil
+}
+
+// GetProgress returns estimated done and remaining work.
+// For bounded restrictions, progress is measured in time (nanoseconds).
+// For unbounded restrictions, it is measured in partition count.
+func (t *partitionQueueTracker) GetProgress() (done, remaining float64) {
+	if t.rest.Bounded {
+		return t.timeBased()
+	}
+	return t.partitionBased()
+}
+
+func (t *partitionQueueTracker) timeBased() (done, remaining float64) {
+	for _, p := range t.rest.Pending {
+		if p.bounded() {
+			remaining += float64(p.EndTimestamp.Sub(p.StartTimestamp).Nanoseconds())
+		}
+	}
+	return t.completedNanos, remaining
+}
+
+func (t *partitionQueueTracker) partitionBased() (done, remaining float64) {
+	return t.completedNanos, float64(len(t.rest.Pending))
+}
+
+// IsDone reports whether the restriction has no remaining work. A bounded
+// restriction is done once its queue is empty. An unbounded restriction is
+// done only after the tracker has stopped with an empty queue; until then
+// the SDF runs indefinitely.
+func (t *partitionQueueTracker) IsDone() bool {
+	if !t.rest.Bounded {
+		return t.stopped && len(t.rest.Pending) == 0
+	}
+	return len(t.rest.Pending) == 0
+}
+
+// GetError returns any error that caused the tracker to stop.
+func (t *partitionQueueTracker) GetError() error {
+	return t.err
+}
+
+// GetRestriction returns the current restriction.
+func (t *partitionQueueTracker) GetRestriction() any {
+	return t.rest
+}
+
+// IsBounded returns whether the restriction is bounded.
+func (t *partitionQueueTracker) IsBounded() bool {
+	return t.rest.Bounded
+}
+
+// Ensure partitionQueueTracker implements sdf.BoundableRTracker.
+var _ sdf.BoundableRTracker = (*partitionQueueTracker)(nil)
diff --git a/sdks/go/pkg/beam/io/spannerio/changestream_types.go b/sdks/go/pkg/beam/io/spannerio/changestream_types.go
new file mode 100644
index 000000000000..4de3308123eb
--- /dev/null
+++ b/sdks/go/pkg/beam/io/spannerio/changestream_types.go
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spannerio
+
+import (
+	"time"
+
+	"cloud.google.com/go/spanner/apiv1/spannerpb"
+)
+
+// ModType describes the kind of mutation in a DataChangeRecord.
+type ModType int32
+
+const (
+	ModTypeUnspecified ModType = 0
+	ModTypeInsert      ModType = 10
+	ModTypeUpdate      ModType = 20
+	ModTypeDelete      ModType = 30
+)
+
+// ValueCaptureType describes what values are captured in a DataChangeRecord.
+type ValueCaptureType int32
+
+const (
+	ValueCaptureTypeUnspecified        ValueCaptureType = 0
+	ValueCaptureTypeOldAndNewValues    ValueCaptureType = 10
+	ValueCaptureTypeNewValues          ValueCaptureType = 20
+	ValueCaptureTypeNewRow             ValueCaptureType = 30
+	ValueCaptureTypeNewRowAndOldValues ValueCaptureType = 40
+)
+
+// ColumnMetadata describes the type and key membership of a column involved in a change.
+type ColumnMetadata struct {
+	// Name is the column name.
+	Name string
+	// TypeCode is the Spanner type code of the column (e.g. STRING, INT64, BOOL).
+	TypeCode spannerpb.TypeCode
+	// ArrayElementTypeCode is the element type code when TypeCode is ARRAY.
+	// It is TypeCode_TYPE_CODE_UNSPECIFIED for non-array columns.
+	ArrayElementTypeCode spannerpb.TypeCode
+	// IsPrimaryKey indicates whether this column is part of the primary key.
+	IsPrimaryKey bool
+	// OrdinalPosition is the column's position in the table schema.
+	OrdinalPosition int64
+}
+
+// ModValue holds a single column value from a modification.
+type ModValue struct {
+	// ColumnName is the name of the modified column.
+	ColumnName string
+	// Value is the column value encoded as a JSON string using the Spanner JSON
+	// value format. Use encoding/json to decode into a concrete Go type.
+	Value string
+}
+
+// Mod describes all changes to one watched table row.
+type Mod struct {
+	// Keys holds the primary key column values for the modified row.
+	Keys []*ModValue
+	// OldValues holds the pre-image of modified columns (empty for INSERTs or
+	// when the capture type does not include old values).
+	OldValues []*ModValue
+	// NewValues holds the post-image of modified columns (empty for DELETEs).
+	NewValues []*ModValue
+}
+
+// DataChangeRecord describes one or more mutations to a Spanner table, committed
+// atomically within a single transaction. This is the primary output element of
+// ReadChangeStream.
+type DataChangeRecord struct {
+	// PartitionToken identifies the change stream partition that produced this record.
+	PartitionToken string
+	// CommitTimestamp is when the mutations were committed in Spanner.
+	CommitTimestamp time.Time
+	// RecordSequence is monotonically increasing within a partition for a given
+	// commit timestamp and transaction. Use it to order records within a transaction.
+	RecordSequence string
+	// ServerTransactionID is a globally unique identifier for the transaction.
+	ServerTransactionID string
+	// IsLastRecordInTransactionInPartition indicates whether this is the final
+	// DataChangeRecord for this transaction in this partition.
+	IsLastRecordInTransactionInPartition bool
+	// Table is the name of the modified table.
+	Table string
+	// ColumnMetadata describes the columns present in Mods.
+	ColumnMetadata []*ColumnMetadata
+	// Mods lists the individual row modifications.
+	Mods []*Mod
+	// ModType is the type of operation (INSERT, UPDATE, DELETE).
+	ModType ModType
+	// ValueCaptureType indicates what values are present in Mods.
+	ValueCaptureType ValueCaptureType
+	// NumberOfRecordsInTransaction is the total number of DataChangeRecords for
+	// this transaction across all partitions.
+	NumberOfRecordsInTransaction int32
+	// NumberOfPartitionsInTransaction is the total number of partitions that
+	// produced records for this transaction.
+	NumberOfPartitionsInTransaction int32
+	// TransactionTag is the application-defined tag for the transaction.
+	TransactionTag string
+	// IsSystemTransaction indicates this is a Spanner-internal transaction (e.g., TTL).
+ IsSystemTransaction bool +} diff --git a/sdks/go/pkg/beam/io/spannerio/changestream_watermark.go b/sdks/go/pkg/beam/io/spannerio/changestream_watermark.go new file mode 100644 index 000000000000..d3b5f96e2d9f --- /dev/null +++ b/sdks/go/pkg/beam/io/spannerio/changestream_watermark.go @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package spannerio + +import ( + "encoding/binary" + "math" + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" +) + +// changeStreamWatermarkEstimator tracks the watermark for the ReadChangeStream SDF. +// It advances as DataChangeRecords and HeartbeatRecords are observed, but is held +// back to the minimum start timestamp of all pending (unprocessed) partitions in +// the restriction. This prevents the watermark from advancing past partitions that +// have not yet been read, which would cause their records to arrive as late data. +// +// With aggressive TrySplit, most restrictions contain a single partition, so +// minPending == the partition's current start timestamp in practice. +type changeStreamWatermarkEstimator struct { + // maxObserved holds the highest observed timestamp as Unix nanoseconds. + // math.MinInt64 means "not yet advanced". 
+ maxObserved int64 + // minPending holds the minimum start timestamp of all pending partitions + // as Unix nanoseconds. math.MaxInt64 means "no pending partitions". + minPending int64 +} + +// watermarkStateSize is the byte size of the serialised watermark estimator state +// (two int64 values encoded as little-endian). +const watermarkStateSize = 16 + +// CurrentWatermark returns the current watermark time. It is the minimum of the +// highest observed commit/heartbeat timestamp and the earliest pending partition +// start timestamp. This ensures the watermark never advances past data that has +// not yet been emitted. +func (e *changeStreamWatermarkEstimator) CurrentWatermark() time.Time { + wm := e.maxObserved + if e.minPending < wm { + wm = e.minPending + } + if wm == math.MinInt64 || wm == math.MaxInt64 { + return time.Time{} // zero time = beginning of time + } + return time.Unix(0, wm) +} + +// ObserveTimestamp advances the max-observed watermark to t if t is later. +func (e *changeStreamWatermarkEstimator) ObserveTimestamp(t time.Time) { + ns := t.UnixNano() + if ns > e.maxObserved { + e.maxObserved = ns + } +} + +// SetMinPending updates the minimum pending partition start timestamp. This is +// called at the start of each ProcessElement invocation with the minimum start +// timestamp from the restriction's pending partition queue. +func (e *changeStreamWatermarkEstimator) SetMinPending(t time.Time) { + e.minPending = t.UnixNano() +} + +// encodeWatermarkState serialises the watermark estimator state to bytes. +func encodeWatermarkState(maxObserved, minPending int64) []byte { + b := make([]byte, watermarkStateSize) + binary.LittleEndian.PutUint64(b[0:8], uint64(maxObserved)) + binary.LittleEndian.PutUint64(b[8:16], uint64(minPending)) + return b +} + +// decodeWatermarkState deserialises the watermark estimator state from bytes. +// Returns (maxObserved, minPending). 
+func decodeWatermarkState(b []byte) (int64, int64) { + if len(b) < watermarkStateSize { + return math.MinInt64, math.MaxInt64 + } + maxObserved := int64(binary.LittleEndian.Uint64(b[0:8])) + minPending := int64(binary.LittleEndian.Uint64(b[8:16])) + return maxObserved, minPending +} + +// Compile-time check: changeStreamWatermarkEstimator implements sdf.TimestampObservingEstimator. +var _ sdf.TimestampObservingEstimator = (*changeStreamWatermarkEstimator)(nil) diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 9d04f08de4fb..785f8c997376 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -320,6 +320,9 @@ var dataflowFilters = []string{ "TestBigQueryIO_BasicWriteQueryRead", // Can't handle the test spanner container or access a local spanner. "TestSpannerIO.*", + // Change stream tests use a local Spanner emulator container that is + // unreachable from remote Dataflow workers. + "TestReadChangeStream_.*", // Dataflow does not drain jobs by itself. "TestDrain", // Timers diff --git a/sdks/go/test/integration/io/spannerio/changestream_test.go b/sdks/go/test/integration/io/spannerio/changestream_test.go new file mode 100644 index 000000000000..0c5311cbf13e --- /dev/null +++ b/sdks/go/test/integration/io/spannerio/changestream_test.go @@ -0,0 +1,426 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package spannerio + +import ( + "context" + "os" + "testing" + "time" + + "cloud.google.com/go/spanner" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/spannerio" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" + "github.com/apache/beam/sdks/v2/go/test/integration" +) + +func init() { + register.DoFn2x0[spannerio.DataChangeRecord, func(spannerio.DataChangeRecord)](&filterModTypeFn{}) + register.Emitter1[spannerio.DataChangeRecord]() +} + +// filterModTypeFn is a DoFn that passes through only DataChangeRecords whose +// ModType matches the configured value. It is used in integration tests to +// count records by mutation kind (INSERT, UPDATE, DELETE). +type filterModTypeFn struct { + ModType int32 +} + +func (fn *filterModTypeFn) ProcessElement(rec spannerio.DataChangeRecord, emit func(spannerio.DataChangeRecord)) { + if int32(rec.ModType) == fn.ModType { + emit(rec) + } +} + +func filterByModType(s beam.Scope, records beam.PCollection, modType spannerio.ModType) beam.PCollection { + return beam.ParDo(s, &filterModTypeFn{ModType: int32(modType)}, records) +} + +// TestReadChangeStream_BoundedRead verifies that ReadChangeStream emits +// DataChangeRecords for mutations committed within the read time window. 
+func TestReadChangeStream_BoundedRead(t *testing.T) {
+	integration.CheckFilters(t)
+
+	const db = "projects/test-project/instances/test-instance/databases/test-db-changestream"
+	ctx := context.Background()
+
+	endpoint := setUpTestContainer(ctx, t)
+	os.Setenv("SPANNER_EMULATOR_HOST", endpoint)
+
+	client := NewClient(ctx, t, endpoint, db)
+	instanceAdminClient := NewInstanceAdminClient(ctx, t, endpoint)
+	adminClient := NewAdminClient(ctx, t, endpoint)
+
+	CreateInstance(ctx, t, instanceAdminClient, db)
+	t.Cleanup(func() { DeleteInstance(ctx, t, instanceAdminClient, db) })
+
+	CreateDatabase(ctx, t, adminClient, db)
+	t.Cleanup(func() { DropDatabase(ctx, t, adminClient, db) })
+
+	CreateTable(ctx, t, adminClient, db, []string{
+		`CREATE TABLE Items (
+			Id INT64 NOT NULL,
+			Name STRING(100),
+		) PRIMARY KEY (Id)`,
+		`CREATE CHANGE STREAM ItemStream FOR Items`,
+	})
+
+	// startTime must be at or after change stream creation; time.Now() runs
+	// only after the CREATE CHANGE STREAM DDL in CreateTable has completed,
+	// so it satisfies that.
+	startTime := time.Now().UTC()
+
+	// Write two rows in separate transactions so Spanner generates two distinct
+	// DataChangeRecords (one per transaction). A single Apply with both rows
+	// produces only one DataChangeRecord because they share the same transaction.
+	if _, err := client.Apply(ctx, []*spanner.Mutation{
+		spanner.Insert("Items", []string{"Id", "Name"}, []interface{}{int64(1), "alpha"}),
+	}); err != nil {
+		t.Fatalf("Apply first mutation: %v", err)
+	}
+	if _, err := client.Apply(ctx, []*spanner.Mutation{
+		spanner.Insert("Items", []string{"Id", "Name"}, []interface{}{int64(2), "beta"}),
+	}); err != nil {
+		t.Fatalf("Apply second mutation: %v", err)
+	}
+
+	// Set the end time far enough ahead of the commits above that the change
+	// stream query completes naturally.
+ endTime := time.Now().UTC().Add(10 * time.Second) + + p := beam.NewPipeline() + s := p.Root() + + records := spannerio.ReadChangeStream( + s, db, "ItemStream", + startTime, endTime, + 1000, // heartbeat every 1 second + spannerio.WithChangeStreamTestEndpoint(endpoint), + ) + + // We expect exactly 2 DataChangeRecords (one per mutation). + passert.Count(s, records, "change stream records", 2) + + ptest.RunAndValidate(t, p) +} + +// TestReadChangeStream_FiltersByTable verifies that mutations to a table not +// covered by the change stream are not emitted. +func TestReadChangeStream_FiltersByTable(t *testing.T) { + integration.CheckFilters(t) + + const db = "projects/test-project/instances/test-instance/databases/test-db-cs-filter" + ctx := context.Background() + + endpoint := setUpTestContainer(ctx, t) + os.Setenv("SPANNER_EMULATOR_HOST", endpoint) + + client := NewClient(ctx, t, endpoint, db) + instanceAdminClient := NewInstanceAdminClient(ctx, t, endpoint) + adminClient := NewAdminClient(ctx, t, endpoint) + + CreateInstance(ctx, t, instanceAdminClient, db) + t.Cleanup(func() { DeleteInstance(ctx, t, instanceAdminClient, db) }) + + CreateDatabase(ctx, t, adminClient, db) + t.Cleanup(func() { DropDatabase(ctx, t, adminClient, db) }) + + CreateTable(ctx, t, adminClient, db, []string{ + `CREATE TABLE Watched ( + Id INT64 NOT NULL, + Val STRING(50), + ) PRIMARY KEY (Id)`, + `CREATE TABLE Unwatched ( + Id INT64 NOT NULL, + Val STRING(50), + ) PRIMARY KEY (Id)`, + // Stream covers only Watched. + `CREATE CHANGE STREAM WatchedStream FOR Watched`, + }) + + // startTime must be after the change stream was created. + startTime := time.Now().UTC() + + // Write to both tables. 
+ if _, err := client.Apply(ctx, []*spanner.Mutation{ + spanner.Insert("Watched", []string{"Id", "Val"}, []interface{}{int64(1), "watched"}), + spanner.Insert("Unwatched", []string{"Id", "Val"}, []interface{}{int64(1), "ignored"}), + }); err != nil { + t.Fatalf("Apply mutations: %v", err) + } + + endTime := time.Now().UTC().Add(10 * time.Second) + + p := beam.NewPipeline() + s := p.Root() + + records := spannerio.ReadChangeStream( + s, db, "WatchedStream", + startTime, endTime, + 1000, + spannerio.WithChangeStreamTestEndpoint(endpoint), + ) + + // Only the Watched mutation should appear. + passert.Count(s, records, "watched records only", 1) + + ptest.RunAndValidate(t, p) +} + +// TestReadChangeStream_Empty verifies that a bounded read with no mutations +// in the window produces an empty PCollection. +func TestReadChangeStream_Empty(t *testing.T) { + integration.CheckFilters(t) + + const db = "projects/test-project/instances/test-instance/databases/test-db-cs-empty" + ctx := context.Background() + + endpoint := setUpTestContainer(ctx, t) + os.Setenv("SPANNER_EMULATOR_HOST", endpoint) + + instanceAdminClient := NewInstanceAdminClient(ctx, t, endpoint) + adminClient := NewAdminClient(ctx, t, endpoint) + + CreateInstance(ctx, t, instanceAdminClient, db) + t.Cleanup(func() { DeleteInstance(ctx, t, instanceAdminClient, db) }) + + CreateDatabase(ctx, t, adminClient, db) + t.Cleanup(func() { DropDatabase(ctx, t, adminClient, db) }) + + CreateTable(ctx, t, adminClient, db, []string{ + `CREATE TABLE EmptyTable ( + Id INT64 NOT NULL, + ) PRIMARY KEY (Id)`, + `CREATE CHANGE STREAM EmptyStream FOR EmptyTable`, + }) + + // Use a short time window starting just after the change stream is created. + // No mutations are written, so the pipeline should produce 0 records. 
+ startTime := time.Now().UTC() + endTime := startTime.Add(3 * time.Second) + + p := beam.NewPipeline() + s := p.Root() + + records := spannerio.ReadChangeStream( + s, db, "EmptyStream", + startTime, endTime, + 1000, + spannerio.WithChangeStreamTestEndpoint(endpoint), + ) + + passert.Count(s, records, "empty stream", 0) + + ptest.RunAndValidate(t, p) +} + +// TestReadChangeStream_UpdateRecord verifies that UPDATE mutations produce +// DataChangeRecords with ModTypeUpdate, and that both the INSERT and UPDATE +// records for the same row are captured. +func TestReadChangeStream_UpdateRecord(t *testing.T) { + integration.CheckFilters(t) + + const db = "projects/test-project/instances/test-instance/databases/test-db-cs-update" + ctx := context.Background() + + endpoint := setUpTestContainer(ctx, t) + os.Setenv("SPANNER_EMULATOR_HOST", endpoint) + + client := NewClient(ctx, t, endpoint, db) + instanceAdminClient := NewInstanceAdminClient(ctx, t, endpoint) + adminClient := NewAdminClient(ctx, t, endpoint) + + CreateInstance(ctx, t, instanceAdminClient, db) + t.Cleanup(func() { DeleteInstance(ctx, t, instanceAdminClient, db) }) + + CreateDatabase(ctx, t, adminClient, db) + t.Cleanup(func() { DropDatabase(ctx, t, adminClient, db) }) + + CreateTable(ctx, t, adminClient, db, []string{ + `CREATE TABLE Items ( + Id INT64 NOT NULL, + Name STRING(100), + ) PRIMARY KEY (Id)`, + `CREATE CHANGE STREAM ItemStream FOR Items`, + }) + + startTime := time.Now().UTC() + + // Insert a row, then update it in a separate transaction so Spanner + // generates two distinct DataChangeRecords. 
+ if _, err := client.Apply(ctx, []*spanner.Mutation{ + spanner.Insert("Items", []string{"Id", "Name"}, []interface{}{int64(1), "original"}), + }); err != nil { + t.Fatalf("Apply insert: %v", err) + } + if _, err := client.Apply(ctx, []*spanner.Mutation{ + spanner.Update("Items", []string{"Id", "Name"}, []interface{}{int64(1), "updated"}), + }); err != nil { + t.Fatalf("Apply update: %v", err) + } + + endTime := time.Now().UTC().Add(10 * time.Second) + + p := beam.NewPipeline() + s := p.Root() + + records := spannerio.ReadChangeStream( + s, db, "ItemStream", + startTime, endTime, + 1000, + spannerio.WithChangeStreamTestEndpoint(endpoint), + ) + + // 1 INSERT + 1 UPDATE = 2 total records. + passert.Count(s, records, "total records", 2) + + // Exactly one record should carry ModTypeUpdate. + updates := filterByModType(s, records, spannerio.ModTypeUpdate) + passert.Count(s, updates, "update records", 1) + + ptest.RunAndValidate(t, p) +} + +// TestReadChangeStream_DeleteRecord verifies that DELETE mutations produce +// DataChangeRecords with ModTypeDelete, and that both the INSERT and DELETE +// records for the same row are captured. 
+func TestReadChangeStream_DeleteRecord(t *testing.T) { + integration.CheckFilters(t) + + const db = "projects/test-project/instances/test-instance/databases/test-db-cs-delete" + ctx := context.Background() + + endpoint := setUpTestContainer(ctx, t) + os.Setenv("SPANNER_EMULATOR_HOST", endpoint) + + client := NewClient(ctx, t, endpoint, db) + instanceAdminClient := NewInstanceAdminClient(ctx, t, endpoint) + adminClient := NewAdminClient(ctx, t, endpoint) + + CreateInstance(ctx, t, instanceAdminClient, db) + t.Cleanup(func() { DeleteInstance(ctx, t, instanceAdminClient, db) }) + + CreateDatabase(ctx, t, adminClient, db) + t.Cleanup(func() { DropDatabase(ctx, t, adminClient, db) }) + + CreateTable(ctx, t, adminClient, db, []string{ + `CREATE TABLE Items ( + Id INT64 NOT NULL, + Name STRING(100), + ) PRIMARY KEY (Id)`, + `CREATE CHANGE STREAM ItemStream FOR Items`, + }) + + startTime := time.Now().UTC() + + // Insert a row, then delete it in a separate transaction. + if _, err := client.Apply(ctx, []*spanner.Mutation{ + spanner.Insert("Items", []string{"Id", "Name"}, []interface{}{int64(1), "to-be-deleted"}), + }); err != nil { + t.Fatalf("Apply insert: %v", err) + } + if _, err := client.Apply(ctx, []*spanner.Mutation{ + spanner.Delete("Items", spanner.Key{int64(1)}), + }); err != nil { + t.Fatalf("Apply delete: %v", err) + } + + endTime := time.Now().UTC().Add(10 * time.Second) + + p := beam.NewPipeline() + s := p.Root() + + records := spannerio.ReadChangeStream( + s, db, "ItemStream", + startTime, endTime, + 1000, + spannerio.WithChangeStreamTestEndpoint(endpoint), + ) + + // 1 INSERT + 1 DELETE = 2 total records. + passert.Count(s, records, "total records", 2) + + // Exactly one record should carry ModTypeDelete. 
+ deletes := filterByModType(s, records, spannerio.ModTypeDelete) + passert.Count(s, deletes, "delete records", 1) + + ptest.RunAndValidate(t, p) +} + +// TestReadChangeStream_MultipleTableStream verifies that a change stream +// covering more than one table captures mutations from all watched tables. +func TestReadChangeStream_MultipleTableStream(t *testing.T) { + integration.CheckFilters(t) + + const db = "projects/test-project/instances/test-instance/databases/test-db-cs-multi" + ctx := context.Background() + + endpoint := setUpTestContainer(ctx, t) + os.Setenv("SPANNER_EMULATOR_HOST", endpoint) + + client := NewClient(ctx, t, endpoint, db) + instanceAdminClient := NewInstanceAdminClient(ctx, t, endpoint) + adminClient := NewAdminClient(ctx, t, endpoint) + + CreateInstance(ctx, t, instanceAdminClient, db) + t.Cleanup(func() { DeleteInstance(ctx, t, instanceAdminClient, db) }) + + CreateDatabase(ctx, t, adminClient, db) + t.Cleanup(func() { DropDatabase(ctx, t, adminClient, db) }) + + CreateTable(ctx, t, adminClient, db, []string{ + `CREATE TABLE TableA ( + Id INT64 NOT NULL, + Val STRING(50), + ) PRIMARY KEY (Id)`, + `CREATE TABLE TableB ( + Id INT64 NOT NULL, + Val STRING(50), + ) PRIMARY KEY (Id)`, + // Stream covers both tables. + `CREATE CHANGE STREAM MultiStream FOR TableA, TableB`, + }) + + startTime := time.Now().UTC() + + // Write one row to each table in the same transaction so we get two + // DataChangeRecords (one per table) from the stream. 
+ if _, err := client.Apply(ctx, []*spanner.Mutation{ + spanner.Insert("TableA", []string{"Id", "Val"}, []interface{}{int64(1), "a"}), + spanner.Insert("TableB", []string{"Id", "Val"}, []interface{}{int64(1), "b"}), + }); err != nil { + t.Fatalf("Apply mutations: %v", err) + } + + endTime := time.Now().UTC().Add(10 * time.Second) + + p := beam.NewPipeline() + s := p.Root() + + records := spannerio.ReadChangeStream( + s, db, "MultiStream", + startTime, endTime, + 1000, + spannerio.WithChangeStreamTestEndpoint(endpoint), + ) + + // One DataChangeRecord per table = 2 total. + passert.Count(s, records, "multi-table records", 2) + + ptest.RunAndValidate(t, p) +}
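The watermark state handling in `changestream_watermark.go` can be exercised in isolation. The sketch below reimplements the 16-byte little-endian state encoding and the min(maxObserved, minPending) rule from that file so the sentinel handling (MinInt64 = "not yet advanced", MaxInt64 = "no pending partitions") can be checked outside a pipeline. All names here are local to the example, not part of the package API:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
	"time"
)

// encodeState mirrors encodeWatermarkState: two int64s, little-endian, 16 bytes.
func encodeState(maxObserved, minPending int64) []byte {
	b := make([]byte, 16)
	binary.LittleEndian.PutUint64(b[0:8], uint64(maxObserved))
	binary.LittleEndian.PutUint64(b[8:16], uint64(minPending))
	return b
}

// decodeState mirrors decodeWatermarkState, including the short-input fallback
// to the "not yet advanced" / "no pending partitions" sentinels.
func decodeState(b []byte) (int64, int64) {
	if len(b) < 16 {
		return math.MinInt64, math.MaxInt64
	}
	return int64(binary.LittleEndian.Uint64(b[0:8])), int64(binary.LittleEndian.Uint64(b[8:16]))
}

// watermark applies the CurrentWatermark rule: the minimum of the two values,
// with either sentinel mapping to the zero time.
func watermark(maxObserved, minPending int64) time.Time {
	wm := maxObserved
	if minPending < wm {
		wm = minPending
	}
	if wm == math.MinInt64 || wm == math.MaxInt64 {
		return time.Time{}
	}
	return time.Unix(0, wm)
}

func main() {
	// Round-trip a concrete state.
	obs := time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC).UnixNano()
	pend := obs - int64(30*time.Second)
	gotObs, gotPend := decodeState(encodeState(obs, pend))
	fmt.Println(gotObs == obs && gotPend == pend) // true

	// A pending partition older than anything observed holds the watermark back,
	// so its records are not treated as late data.
	fmt.Println(watermark(obs, pend).Equal(time.Unix(0, pend))) // true

	// Fresh (unset) state yields the zero time.
	fmt.Println(watermark(decodeState(nil)).IsZero()) // true
}
```

With aggressive TrySplit, most restrictions carry a single partition, so in practice `minPending` is just that partition's current start timestamp, and the watermark advances in lockstep with the records being read.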