SpannerIO.java
@@ -19,9 +19,13 @@

import static java.util.stream.Collectors.toList;
import static org.apache.beam.sdk.io.gcp.spanner.MutationUtils.isPointDelete;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_CDC_TIME_INCREMENT;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_CHANGE_STREAM_NAME;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_HEARTBEAT_MILLIS;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_END_AT;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_START_AT;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_LOW_LATENCY_CDC_TIME_INCREMENT;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_LOW_LATENCY_HEARTBEAT_MILLIS;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_WATERMARK_REFRESH_RATE;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT;
@@ -537,6 +541,8 @@ public static ReadChangeStream readChangeStream() {
.setRpcPriority(DEFAULT_RPC_PRIORITY)
.setInclusiveStartAt(DEFAULT_INCLUSIVE_START_AT)
.setInclusiveEndAt(DEFAULT_INCLUSIVE_END_AT)
.setCdcTimeIncrement(DEFAULT_CDC_TIME_INCREMENT)
.setHeartbeatMillis(DEFAULT_HEARTBEAT_MILLIS)
.build();
}

@@ -1761,6 +1767,10 @@ public abstract static class ReadChangeStream

abstract @Nullable ValueProvider<Boolean> getPlainText();

abstract Duration getCdcTimeIncrement();

abstract Integer getHeartbeatMillis();

abstract Builder toBuilder();

@AutoValue.Builder
@@ -1790,6 +1800,17 @@ abstract static class Builder {

abstract Builder setPlainText(ValueProvider<Boolean> plainText);

abstract Builder setCdcTimeIncrement(Duration cdcTimeIncrement);

/**
 * Sets the heartbeat interval for all change stream queries.
 *
 * <p>Be careful when changing this interval, as it needs to be less than the checkpointing
 * interval in Dataflow. Otherwise, if no records arrive within a checkpoint interval, the
 * consumption of the change stream query might get stuck.
 */
abstract Builder setHeartbeatMillis(Integer heartbeatMillis);

abstract ReadChangeStream build();
}

@@ -1912,6 +1933,13 @@ public ReadChangeStream withUsingPlainTextChannel(boolean plainText) {
return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
}

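/**
 * Returns a copy of this transform tuned for lower latency: the CDC time increment drops from
 * two minutes to one second and the heartbeat interval from 2000 ms to 100 ms, at the cost of
 * more frequent change stream queries.
 */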
public ReadChangeStream withLowLatency() {
return toBuilder()
.setCdcTimeIncrement(DEFAULT_LOW_LATENCY_CDC_TIME_INCREMENT)
.setHeartbeatMillis(DEFAULT_LOW_LATENCY_HEARTBEAT_MILLIS)
.build();
}

@Override
public PCollection<DataChangeRecord> expand(PBegin input) {
checkArgument(
@@ -2018,13 +2046,19 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimestamp())
MoreObjects.firstNonNull(getWatermarkRefreshRate(), DEFAULT_WATERMARK_REFRESH_RATE);
final CacheFactory cacheFactory = new CacheFactory(daoFactory, watermarkRefreshRate);

final long heartbeatMillis = getHeartbeatMillis().longValue();

final InitializeDoFn initializeDoFn =
-        new InitializeDoFn(daoFactory, mapperFactory, startTimestamp, endTimestamp);
+        new InitializeDoFn(
+            daoFactory, mapperFactory, startTimestamp, endTimestamp, heartbeatMillis);
final DetectNewPartitionsDoFn detectNewPartitionsDoFn =
new DetectNewPartitionsDoFn(
daoFactory, mapperFactory, actionFactory, cacheFactory, metrics);
final Duration cdcTimeIncrement = getCdcTimeIncrement();

final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
-        new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics);
+        new ReadChangeStreamPartitionDoFn(
+            daoFactory, mapperFactory, actionFactory, metrics, cdcTimeIncrement);
final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
new PostProcessingMetricsDoFn(metrics);

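The new withLowLatency() toggle is the only user-facing surface of this change. A minimal usage sketch, assuming an existing pipeline (the project, instance, database, and change stream names below are hypothetical; the builder methods are the existing SpannerIO API):

PCollection<DataChangeRecord> records =
    pipeline.apply(
        SpannerIO.readChangeStream()
            .withSpannerConfig(
                SpannerConfig.create()
                    .withProjectId("my-project")
                    .withInstanceId("my-instance")
                    .withDatabaseId("my-database"))
            .withChangeStreamName("my-change-stream")
            // Swaps in the 1 s CDC time increment and 100 ms heartbeat defaults.
            .withLowLatency());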
ChangeStreamsConstants.java
@@ -49,6 +49,14 @@ public class ChangeStreamsConstants {
*/
public static final Timestamp DEFAULT_INCLUSIVE_END_AT = MAX_INCLUSIVE_END_AT;

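/**
 * The default duration added to the current time when computing the end timestamp of a change
 * stream query.
 */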
public static final Duration DEFAULT_CDC_TIME_INCREMENT = Duration.standardMinutes(2);

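/** The default heartbeat interval, in milliseconds, for change stream queries. */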
public static final int DEFAULT_HEARTBEAT_MILLIS = 2000;

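/** The CDC time increment used when {@code withLowLatency()} is enabled. */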
public static final Duration DEFAULT_LOW_LATENCY_CDC_TIME_INCREMENT = Duration.standardSeconds(1);

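/** The heartbeat interval, in milliseconds, used when {@code withLowLatency()} is enabled. */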
public static final int DEFAULT_LOW_LATENCY_HEARTBEAT_MILLIS = 100;

/** The default priority for a change stream query is {@link RpcPriority#HIGH}. */
public static final RpcPriority DEFAULT_RPC_PRIORITY = RpcPriority.HIGH;

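The 100 ms low-latency heartbeat sits well below typical checkpoint intervals, but the constraint from the setHeartbeatMillis javadoc still applies. A hypothetical guard, not part of this change, makes it concrete (checkpointIntervalMillis is an assumed caller-supplied value):

static void validateHeartbeatInterval(long heartbeatMillis, long checkpointIntervalMillis) {
  // The heartbeat must fire at least once per checkpoint interval; otherwise a quiet change
  // stream query emits nothing between checkpoints and consumption can appear stuck.
  if (heartbeatMillis >= checkpointIntervalMillis) {
    throw new IllegalArgumentException(
        "heartbeatMillis (" + heartbeatMillis + ") must be less than the checkpoint interval ("
            + checkpointIntervalMillis + ")");
  }
}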
ActionFactory.java
@@ -174,6 +174,7 @@ public synchronized PartitionEventRecordAction partitionEventRecordAction(
* @param partitionEventRecordAction action class to process {@link
* org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord}s
* @param metrics metrics gathering class
* @param cdcTimeIncrement the duration added to the current time to compute each query's end
*     timestamp
* @return single instance of the {@link QueryChangeStreamAction}
*/
public synchronized QueryChangeStreamAction queryChangeStreamAction(
@@ -188,7 +189,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction(
PartitionEndRecordAction partitionEndRecordAction,
PartitionEventRecordAction partitionEventRecordAction,
ChangeStreamMetrics metrics,
-      boolean isMutableChangeStream) {
+      boolean isMutableChangeStream,
+      Duration cdcTimeIncrement) {
if (queryChangeStreamActionInstance == null) {
queryChangeStreamActionInstance =
new QueryChangeStreamAction(
@@ -203,7 +205,8 @@ partitionEndRecordAction,
partitionEndRecordAction,
partitionEventRecordAction,
metrics,
-              isMutableChangeStream);
+              isMutableChangeStream,
+              cdcTimeIncrement);
}
return queryChangeStreamActionInstance;
}
HeartbeatRecordAction.java
@@ -96,6 +96,6 @@ public Optional<ProcessContinuation> run(
watermarkEstimator.setWatermark(timestampInstant);

LOG.debug("[{}] Heartbeat record action completed successfully", token);
-    return Optional.empty();
+    return Optional.of(ProcessContinuation.resume());
}
}
QueryChangeStreamAction.java
@@ -91,6 +91,7 @@ public class QueryChangeStreamAction {
private final PartitionEventRecordAction partitionEventRecordAction;
private final ChangeStreamMetrics metrics;
private final boolean isMutableChangeStream;
private final Duration cdcTimeIncrement;

/**
* Constructs an action class for performing a change stream query for a given partition.
@@ -109,6 +110,7 @@ public class QueryChangeStreamAction {
* @param partitionEventRecordAction action class to process {@link PartitionEventRecord}s
* @param metrics metrics gathering class
* @param isMutableChangeStream whether the change stream is mutable or not
* @param cdcTimeIncrement the duration added to the current time to compute the next query end
*     timestamp
*/
QueryChangeStreamAction(
ChangeStreamDao changeStreamDao,
@@ -122,7 +124,8 @@ public class QueryChangeStreamAction {
PartitionEndRecordAction partitionEndRecordAction,
PartitionEventRecordAction partitionEventRecordAction,
ChangeStreamMetrics metrics,
-      boolean isMutableChangeStream) {
+      boolean isMutableChangeStream,
+      Duration cdcTimeIncrement) {
this.changeStreamDao = changeStreamDao;
this.partitionMetadataDao = partitionMetadataDao;
this.changeStreamRecordMapper = changeStreamRecordMapper;
Expand All @@ -135,6 +138,7 @@ public class QueryChangeStreamAction {
this.partitionEventRecordAction = partitionEventRecordAction;
this.metrics = metrics;
this.isMutableChangeStream = isMutableChangeStream;
this.cdcTimeIncrement = cdcTimeIncrement;
}

/**
@@ -387,12 +391,17 @@ private boolean isTimestampOutOfRange(SpannerException e) {
&& e.getMessage().contains(OUT_OF_RANGE_ERROR_MESSAGE);
}

-  // Return (now + 2 mins) as the end timestamp for reading change streams. This is only used if
-  // users want to run the connector forever. If the end timestamp is reached, we will resume
+  // Return (now + the configured CDC time increment) as the end timestamp for reading change
+  // streams. This is only used if users want to run the connector forever. If the end timestamp
+  // is reached, we will resume
  // processing from that timestamp on a subsequent DoFn execution.
private Timestamp getNextReadChangeStreamEndTimestamp() {
final Timestamp current = Timestamp.now();
-    return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos());
+    // Carry any sub-second overflow into the seconds component so that nanos stays within
+    // [0, 999999999], which Timestamp.ofTimeSecondsAndNanos requires.
+    long totalNanos = current.getNanos() + (cdcTimeIncrement.getMillis() % 1000) * 1_000_000L;
+    long seconds =
+        current.getSeconds() + cdcTimeIncrement.getStandardSeconds() + totalNanos / 1_000_000_000L;
+    int nanos = (int) (totalNanos % 1_000_000_000L);
+    return Timestamp.ofTimeSecondsAndNanos(seconds, nanos);
}
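// Worked example for the computation above: with the low-latency increment of 1 s and
// current = 12:00:00.600000000, the end timestamp is 12:00:01.600000000; a sub-second
// increment such as 1500 ms would add one standard second plus 500 ms of nanos, carrying
// any overflow past 999999999 nanos into the seconds component.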

// For Mutable Change Stream bounded queries, update the query end timestamp to be within 2
InitializeDoFn.java
@@ -36,11 +36,7 @@ public class InitializeDoFn extends DoFn<byte[], PartitionMetadata> implements Serializable {

private static final long serialVersionUID = -8921188388649003102L;

-  /** Heartbeat interval for all change stream queries will be of 2 seconds. */
-  // Be careful when changing this interval, as it needs to be less than the checkpointing interval
-  // in Dataflow. Otherwise, if there are no records within checkpoint intervals, the consuming of
-  // a change stream query might get stuck.
-  private static final long DEFAULT_HEARTBEAT_MILLIS = 2000;
+  private final long heartbeatMillis;

private final DaoFactory daoFactory;
private final MapperFactory mapperFactory;
@@ -53,11 +49,13 @@ public InitializeDoFn(
DaoFactory daoFactory,
MapperFactory mapperFactory,
com.google.cloud.Timestamp startTimestamp,
-      com.google.cloud.Timestamp endTimestamp) {
+      com.google.cloud.Timestamp endTimestamp,
+      long heartbeatMillis) {
this.daoFactory = daoFactory;
this.mapperFactory = mapperFactory;
this.startTimestamp = startTimestamp;
this.endTimestamp = endTimestamp;
this.heartbeatMillis = heartbeatMillis;
}

@ProcessElement
@@ -88,7 +86,7 @@ private void createFakeParentPartition() {
.setPartitionToken(InitialPartition.PARTITION_TOKEN)
.setStartTimestamp(startTimestamp)
.setEndTimestamp(endTimestamp)
-            .setHeartbeatMillis(DEFAULT_HEARTBEAT_MILLIS)
+            .setHeartbeatMillis(heartbeatMillis)
.setState(State.CREATED)
.setWatermark(startTimestamp)
.build();
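// The fake parent partition seeds the metadata table, so the heartbeat configured here is the
// one child partitions inherit for their own change stream queries.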
ReadChangeStreamPartitionDoFn.java
@@ -80,6 +80,8 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataChangeRecord> {
*/
private ThroughputEstimator<DataChangeRecord> throughputEstimator;

private final Duration cdcTimeIncrement;

private transient QueryChangeStreamAction queryChangeStreamAction;

/**
@@ -95,17 +97,20 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataChangeRecord> {
* @param mapperFactory the {@link MapperFactory} to construct {@link ChangeStreamRecordMapper}s
* @param actionFactory the {@link ActionFactory} to construct actions
* @param metrics the {@link ChangeStreamMetrics} to emit partition related metrics
* @param cdcTimeIncrement the duration added to the current time to compute each query's end
*     timestamp
*/
public ReadChangeStreamPartitionDoFn(
DaoFactory daoFactory,
MapperFactory mapperFactory,
ActionFactory actionFactory,
-      ChangeStreamMetrics metrics) {
+      ChangeStreamMetrics metrics,
+      Duration cdcTimeIncrement) {
this.daoFactory = daoFactory;
this.mapperFactory = mapperFactory;
this.actionFactory = actionFactory;
this.metrics = metrics;
this.isMutableChangeStream = daoFactory.isMutableChangeStream();
this.cdcTimeIncrement = cdcTimeIncrement;
this.throughputEstimator = new NullThroughputEstimator<>();
}

@@ -218,7 +223,8 @@ public void setup() {
partitionEndRecordAction,
partitionEventRecordAction,
metrics,
-            isMutableChangeStream);
+            isMutableChangeStream,
+            cdcTimeIncrement);
}

/**
HeartbeatRecordActionTest.java
@@ -72,7 +72,7 @@ public void testRestrictionClaimed() {
interrupter,
watermarkEstimator);

-    assertEquals(Optional.empty(), maybeContinuation);
+    assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation);
verify(watermarkEstimator).setWatermark(new Instant(timestamp.toSqlTimestamp().getTime()));
}

QueryChangeStreamActionTest.java
@@ -58,6 +58,7 @@
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
@@ -119,7 +120,8 @@ public void setUp() throws Exception {
partitionEndRecordAction,
partitionEventRecordAction,
metrics,
-            false);
+            false,
+            Duration.standardMinutes(2));
final Struct row = mock(Struct.class);
partition =
PartitionMetadata.newBuilder()
@@ -935,7 +937,8 @@ public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp() {
partitionEndRecordAction,
partitionEventRecordAction,
metrics,
-            true);
+            true,
+            Duration.standardMinutes(2));

// Set endTimestamp to 60 minutes in the future
Timestamp now = Timestamp.now();
@@ -983,7 +986,8 @@ public void testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() {
partitionEndRecordAction,
partitionEventRecordAction,
metrics,
-            true);
+            true,
+            Duration.standardMinutes(2));

// Set endTimestamp to only 10 seconds in the future
Timestamp now = Timestamp.now();
InitializeDoFnTest.java
@@ -62,7 +62,8 @@ public void setUp() {
daoFactory,
mapperFactory,
Timestamp.ofTimeMicroseconds(1L),
-            Timestamp.ofTimeMicroseconds(2L));
+            Timestamp.ofTimeMicroseconds(2L),
+            2000L);
}

@Test
ReadChangeStreamPartitionDoFnTest.java
@@ -53,6 +53,7 @@
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
@@ -103,7 +104,9 @@ public void setUp() {
partitionEventRecordAction = mock(PartitionEventRecordAction.class);
queryChangeStreamAction = mock(QueryChangeStreamAction.class);

-    doFn = new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics);
+    doFn =
+        new ReadChangeStreamPartitionDoFn(
+            daoFactory, mapperFactory, actionFactory, metrics, Duration.standardMinutes(2));
doFn.setThroughputEstimator(throughputEstimator);

partition =
@@ -152,7 +155,8 @@ public void setUp() {
eq(partitionEndRecordAction),
eq(partitionEventRecordAction),
eq(metrics),
-            anyBoolean()))
+            anyBoolean(),
+            eq(Duration.standardMinutes(2))))
.thenReturn(queryChangeStreamAction);

doFn.setup();