diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 3a69d1177f4a..1239c5c19c15 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -19,9 +19,13 @@ import static java.util.stream.Collectors.toList; import static org.apache.beam.sdk.io.gcp.spanner.MutationUtils.isPointDelete; +import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_CDC_TIME_INCREMENT; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_CHANGE_STREAM_NAME; +import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_HEARTBEAT_MILLIS; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_END_AT; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_START_AT; +import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_LOW_LATENCY_CDC_TIME_INCREMENT; +import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_LOW_LATENCY_HEARTBEAT_MILLIS; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_WATERMARK_REFRESH_RATE; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT; @@ -537,6 +541,8 @@ public static ReadChangeStream readChangeStream() { .setRpcPriority(DEFAULT_RPC_PRIORITY) .setInclusiveStartAt(DEFAULT_INCLUSIVE_START_AT) .setInclusiveEndAt(DEFAULT_INCLUSIVE_END_AT) + .setCdcTimeIncrement(DEFAULT_CDC_TIME_INCREMENT) + .setHeartbeatMillis(DEFAULT_HEARTBEAT_MILLIS) .build(); } @@ -1761,6 +1767,10 @@ public abstract static class ReadChangeStream abstract @Nullable ValueProvider getPlainText(); + abstract Duration getCdcTimeIncrement(); + + abstract Integer getHeartbeatMillis(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -1790,6 +1800,17 @@ abstract static class Builder { abstract Builder setPlainText(ValueProvider plainText); + abstract Builder setCdcTimeIncrement(Duration cdcTimeIncrement); + + /** + * Heartbeat interval for all change stream queries. + * + *

Be careful when changing this interval, as it needs to be less than the checkpointing + * interval in Dataflow. Otherwise, if there are no records within checkpoint intervals, the + * consuming of a change stream query might get stuck. + */ + abstract Builder setHeartbeatMillis(Integer heartbeatMillis); + abstract ReadChangeStream build(); } @@ -1912,6 +1933,13 @@ public ReadChangeStream withUsingPlainTextChannel(boolean plainText) { return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText)); } + public ReadChangeStream withLowLatency() { + return toBuilder() + .setCdcTimeIncrement(DEFAULT_LOW_LATENCY_CDC_TIME_INCREMENT) + .setHeartbeatMillis(DEFAULT_LOW_LATENCY_HEARTBEAT_MILLIS) + .build(); + } + @Override public PCollection expand(PBegin input) { checkArgument( @@ -2018,13 +2046,19 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta MoreObjects.firstNonNull(getWatermarkRefreshRate(), DEFAULT_WATERMARK_REFRESH_RATE); final CacheFactory cacheFactory = new CacheFactory(daoFactory, watermarkRefreshRate); + final long heartbeatMillis = getHeartbeatMillis().longValue(); + final InitializeDoFn initializeDoFn = - new InitializeDoFn(daoFactory, mapperFactory, startTimestamp, endTimestamp); + new InitializeDoFn( + daoFactory, mapperFactory, startTimestamp, endTimestamp, heartbeatMillis); final DetectNewPartitionsDoFn detectNewPartitionsDoFn = new DetectNewPartitionsDoFn( daoFactory, mapperFactory, actionFactory, cacheFactory, metrics); + final Duration cdcTimeIncrement = getCdcTimeIncrement(); + final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn = - new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics); + new ReadChangeStreamPartitionDoFn( + daoFactory, mapperFactory, actionFactory, metrics, cdcTimeIncrement); final PostProcessingMetricsDoFn postProcessingMetricsDoFn = new PostProcessingMetricsDoFn(metrics); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java index db09adb0f27e..191f120da196 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java @@ -49,6 +49,14 @@ public class ChangeStreamsConstants { */ public static final Timestamp DEFAULT_INCLUSIVE_END_AT = MAX_INCLUSIVE_END_AT; + public static final Duration DEFAULT_CDC_TIME_INCREMENT = Duration.standardMinutes(2); + + public static final int DEFAULT_HEARTBEAT_MILLIS = 2000; + + public static final Duration DEFAULT_LOW_LATENCY_CDC_TIME_INCREMENT = Duration.standardSeconds(1); + + public static final int DEFAULT_LOW_LATENCY_HEARTBEAT_MILLIS = 100; + /** The default priority for a change stream query is {@link RpcPriority#HIGH}. */ public static final RpcPriority DEFAULT_RPC_PRIORITY = RpcPriority.HIGH; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java index cd84168b23f7..5b11650dbb45 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java @@ -174,6 +174,7 @@ public synchronized PartitionEventRecordAction partitionEventRecordAction( * @param partitionEventRecordAction action class to process {@link * org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord}s * @param metrics metrics gathering class + * @param cdcTimeIncrement the duration added to current time for the end timestamp * @return single instance of the {@link QueryChangeStreamAction} */ public synchronized QueryChangeStreamAction queryChangeStreamAction( @@ -188,7 +189,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction( PartitionEndRecordAction partitionEndRecordAction, PartitionEventRecordAction partitionEventRecordAction, ChangeStreamMetrics metrics, - boolean isMutableChangeStream) { + boolean isMutableChangeStream, + Duration cdcTimeIncrement) { if (queryChangeStreamActionInstance == null) { queryChangeStreamActionInstance = new QueryChangeStreamAction( @@ -203,7 +205,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction( partitionEndRecordAction, partitionEventRecordAction, metrics, - isMutableChangeStream); + isMutableChangeStream, + cdcTimeIncrement); } return queryChangeStreamActionInstance; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java index 0937e896fbf1..2a6ceb311c75 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java @@ -96,6 +96,6 @@ public Optional run( watermarkEstimator.setWatermark(timestampInstant); LOG.debug("[{}] Heartbeat record action completed successfully", token); - return Optional.empty(); + return Optional.of(ProcessContinuation.resume()); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java index 69e89e74a38b..cfc797445d37 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java @@ -91,6 +91,7 @@ public class QueryChangeStreamAction { private final PartitionEventRecordAction partitionEventRecordAction; private final ChangeStreamMetrics metrics; private final boolean isMutableChangeStream; + private final Duration cdcTimeIncrement; /** * Constructs an action class for performing a change stream query for a given partition. @@ -109,6 +110,7 @@ public class QueryChangeStreamAction { * @param PartitionEventRecordAction action class to process {@link PartitionEventRecord}s * @param metrics metrics gathering class * @param isMutableChangeStream whether the change stream is mutable or not + * @param cdcTimeIncrement duration to add to current time */ QueryChangeStreamAction( ChangeStreamDao changeStreamDao, @@ -122,7 +124,8 @@ public class QueryChangeStreamAction { PartitionEndRecordAction partitionEndRecordAction, PartitionEventRecordAction partitionEventRecordAction, ChangeStreamMetrics metrics, - boolean isMutableChangeStream) { + boolean isMutableChangeStream, + Duration cdcTimeIncrement) { this.changeStreamDao = changeStreamDao; this.partitionMetadataDao = partitionMetadataDao; this.changeStreamRecordMapper = changeStreamRecordMapper; @@ -135,6 +138,7 @@ public class QueryChangeStreamAction { this.partitionEventRecordAction = partitionEventRecordAction; this.metrics = metrics; this.isMutableChangeStream = isMutableChangeStream; + this.cdcTimeIncrement = cdcTimeIncrement; } /** @@ -387,12 +391,17 @@ private boolean isTimestampOutOfRange(SpannerException e) { && e.getMessage().contains(OUT_OF_RANGE_ERROR_MESSAGE); } - // Return (now + 2 mins) as the end timestamp for reading change streams. This is only used if - // users want to run the connector forever. If the end timestamp is reached, we will resume + // Return (now + config duration) as the end timestamp for reading change + // streams. This is only + // used if + // users want to run the connector forever. If the end timestamp is reached, we + // will resume // processing from that timestamp on a subsequent DoFn execution. private Timestamp getNextReadChangeStreamEndTimestamp() { final Timestamp current = Timestamp.now(); - return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos()); + long seconds = current.getSeconds() + cdcTimeIncrement.getStandardSeconds(); + int nanos = current.getNanos() + (int) ((cdcTimeIncrement.getMillis() % 1000) * 1_000_000); + return Timestamp.ofTimeSecondsAndNanos(seconds, nanos); } // For Mutable Change Stream bounded queries, update the query end timestamp to be within 2 diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java index 60eb96ca3387..4191f2d93594 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java @@ -36,11 +36,7 @@ public class InitializeDoFn extends DoFn implements S private static final long serialVersionUID = -8921188388649003102L; - /** Heartbeat interval for all change stream queries will be of 2 seconds. */ - // Be careful when changing this interval, as it needs to be less than the checkpointing interval - // in Dataflow. Otherwise, if there are no records within checkpoint intervals, the consuming of - // a change stream query might get stuck. - private static final long DEFAULT_HEARTBEAT_MILLIS = 2000; + private final long heartbeatMillis; private final DaoFactory daoFactory; private final MapperFactory mapperFactory; @@ -53,11 +49,13 @@ public InitializeDoFn( DaoFactory daoFactory, MapperFactory mapperFactory, com.google.cloud.Timestamp startTimestamp, - com.google.cloud.Timestamp endTimestamp) { + com.google.cloud.Timestamp endTimestamp, + long heartbeatMillis) { this.daoFactory = daoFactory; this.mapperFactory = mapperFactory; this.startTimestamp = startTimestamp; this.endTimestamp = endTimestamp; + this.heartbeatMillis = heartbeatMillis; } @ProcessElement @@ -88,7 +86,7 @@ private void createFakeParentPartition() { .setPartitionToken(InitialPartition.PARTITION_TOKEN) .setStartTimestamp(startTimestamp) .setEndTimestamp(endTimestamp) - .setHeartbeatMillis(DEFAULT_HEARTBEAT_MILLIS) + .setHeartbeatMillis(heartbeatMillis) .setState(State.CREATED) .setWatermark(startTimestamp) .build(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java index c3650b42761b..6be085d99a75 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java @@ -80,6 +80,8 @@ public class ReadChangeStreamPartitionDoFn extends DoFn throughputEstimator; + private final Duration cdcTimeIncrement; + private transient QueryChangeStreamAction queryChangeStreamAction; /** @@ -95,17 +97,20 @@ public class ReadChangeStreamPartitionDoFn extends DoFn(); } @@ -218,7 +223,8 @@ public void setup() { partitionEndRecordAction, partitionEventRecordAction, metrics, - isMutableChangeStream); + isMutableChangeStream, + cdcTimeIncrement); } /** diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java index 56d1825c8a18..39453956e290 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java @@ -72,7 +72,7 @@ public void testRestrictionClaimed() { interrupter, watermarkEstimator); - assertEquals(Optional.empty(), maybeContinuation); + assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(timestamp.toSqlTimestamp().getTime())); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java index 26ab41dff878..e3d22a466ea3 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java @@ -58,6 +58,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; +import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; @@ -119,7 +120,8 @@ public void setUp() throws Exception { partitionEndRecordAction, partitionEventRecordAction, metrics, - false); + false, + org.joda.time.Duration.standardMinutes(2)); final Struct row = mock(Struct.class); partition = PartitionMetadata.newBuilder() @@ -935,7 +937,8 @@ public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp() { partitionEndRecordAction, partitionEventRecordAction, metrics, - true); + true, + Duration.standardMinutes(2)); // Set endTimestamp to 60 minutes in the future Timestamp now = Timestamp.now(); @@ -983,7 +986,8 @@ public void testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() { partitionEndRecordAction, partitionEventRecordAction, metrics, - true); + true, + Duration.standardMinutes(2)); // Set endTimestamp to only 10 seconds in the future Timestamp now = Timestamp.now(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFnTest.java index 9672e23b16d7..c3bee10f8e14 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFnTest.java @@ -62,7 +62,8 @@ public void setUp() { daoFactory, mapperFactory, Timestamp.ofTimeMicroseconds(1L), - Timestamp.ofTimeMicroseconds(2L)); + Timestamp.ofTimeMicroseconds(2L), + 2000L); } @Test diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java index 9e588de77a03..a4172ec05ed9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java @@ -53,6 +53,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; +import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; @@ -103,7 +104,9 @@ public void setUp() { partitionEventRecordAction = mock(PartitionEventRecordAction.class); queryChangeStreamAction = mock(QueryChangeStreamAction.class); - doFn = new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics); + doFn = + new ReadChangeStreamPartitionDoFn( + daoFactory, mapperFactory, actionFactory, metrics, Duration.standardMinutes(2)); doFn.setThroughputEstimator(throughputEstimator); partition = @@ -152,7 +155,8 @@ public void setUp() { eq(partitionEndRecordAction), eq(partitionEventRecordAction), eq(metrics), - anyBoolean())) + anyBoolean(), + eq(Duration.standardMinutes(2)))) .thenReturn(queryChangeStreamAction); doFn.setup();