diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java index 4c7805f65589..9933204d6f6c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java @@ -54,6 +54,8 @@ class BigtableServiceFactory implements Serializable { private static final String BIGTABLE_ENABLE_CLIENT_SIDE_METRICS = "bigtable_enable_client_side_metrics"; + private static final String BIGTABLE_ENABLE_SKIP_LARGE_ROWS = "bigtable_enable_skip_large_rows"; + @AutoValue abstract static class ConfigId implements Serializable { @@ -133,7 +135,10 @@ BigtableServiceEntry getServiceForReading( BigtableDataSettings.enableBuiltinMetrics(); } - BigtableService service = new BigtableServiceImpl(settings); + boolean skipLargeRows = + ExperimentalOptions.hasExperiment(pipelineOptions, BIGTABLE_ENABLE_SKIP_LARGE_ROWS); + + BigtableService service = new BigtableServiceImpl(settings, skipLargeRows); entry = BigtableServiceEntry.create(configId, service); entries.put(configId.id(), entry); refCounts.put(configId.id(), new AtomicInteger(1)); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java index 3451bbf450c7..f7aa50a7437f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java @@ -104,10 +104,17 @@ class BigtableServiceImpl implements BigtableService { private static final double 
WATERMARK_PERCENTAGE = .1; private static final long MIN_BYTE_BUFFER_SIZE = 100 * 1024 * 1024; // 100MB + private final boolean skipLargeRows; + BigtableServiceImpl(BigtableDataSettings settings) throws IOException { + this(settings, false); + } + + BigtableServiceImpl(BigtableDataSettings settings, boolean skipLargeRows) throws IOException { this.projectId = settings.getProjectId(); this.instanceId = settings.getInstanceId(); this.client = BigtableDataClient.create(settings); + this.skipLargeRows = skipLargeRows; LOG.info("Started Bigtable service with settings {}", settings); } @@ -142,6 +149,7 @@ static class BigtableReaderImpl implements Reader { private ServerStream stream; private boolean exhausted; + private final boolean skipLargeRows; @VisibleForTesting BigtableReaderImpl( @@ -150,13 +158,15 @@ static class BigtableReaderImpl implements Reader { String instanceId, String tableId, List ranges, - @Nullable RowFilter rowFilter) { + @Nullable RowFilter rowFilter, + boolean skipLargeRows) { this.client = client; this.projectId = projectId; this.instanceId = instanceId; this.tableId = tableId; this.ranges = ranges; this.rowFilter = rowFilter; + this.skipLargeRows = skipLargeRows; } @Override @@ -173,11 +183,19 @@ public boolean start() throws IOException { if (rowFilter != null) { query.filter(Filters.FILTERS.fromProto(rowFilter)); } + try { - stream = - client - .readRowsCallable(new BigtableRowProtoAdapter()) - .call(query, GrpcCallContext.createDefault()); + if (skipLargeRows) { + stream = + client + .skipLargeRowsCallable(new BigtableRowProtoAdapter()) + .call(query, GrpcCallContext.createDefault()); + } else { + stream = + client + .readRowsCallable(new BigtableRowProtoAdapter()) + .call(query, GrpcCallContext.createDefault()); + } results = stream.iterator(); serviceCallMetric.call("ok"); } catch (StatusRuntimeException e) { @@ -667,7 +685,8 @@ public Reader createReader(BigtableSource source) throws IOException { instanceId, source.getTableId().get(), 
source.getRanges(), - source.getRowFilter()); + source.getRowFilter(), + skipLargeRows); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java index 4ce9ad10b2c0..b54dc2f09db0 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java @@ -27,6 +27,7 @@ import com.google.cloud.bigtable.data.v2.BigtableDataClient; import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.bigtable.data.v2.models.TableId; import java.io.IOException; import java.util.Date; import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils; @@ -34,6 +35,7 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.metrics.Lineage; +import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -148,6 +150,43 @@ public void testE2EBigtableSegmentRead() { checkLineageSourceMetric(r, tableId); } + @Test + public void testE2EBigtableReadWithSkippingLargeRows() { + // Write a few rows first + int numRows = 20; + int numLargeRows = 3; + // Each mutation can't exceed 100 MB. 
Break it down to 3 columns + String value = StringUtils.repeat("v", 90 * 1000 * 1000); + for (int i = 0; i < numLargeRows; i++) { + for (int j = 0; j < 3; j++) { + client.mutateRow( + RowMutation.create(TableId.of(tableId), "large_row-" + i) + .setCell(COLUMN_FAMILY_NAME, "q" + j, value)); + } + } + + for (int i = 0; i < numRows - numLargeRows; i++) { + client.mutateRow( + RowMutation.create(TableId.of(tableId), "row-" + i) + .setCell(COLUMN_FAMILY_NAME, "q", "value")); + } + + ExperimentalOptions.addExperiment( + options.as(ExperimentalOptions.class), "bigtable_enable_skip_large_rows"); + + Pipeline p = Pipeline.create(options); + PCollection count = + p.apply( + BigtableIO.read() + .withProjectId(project) + .withInstanceId(options.getInstanceId()) + .withTableId(tableId)) + .apply(Count.globally()); + PAssert.thatSingleton(count).isEqualTo((long) numRows - numLargeRows); + PipelineResult r = p.run(); + checkLineageSourceMetric(r, tableId); + } + private void checkLineageSourceMetric(PipelineResult r, String tableId) { // TODO(https://github.com/apache/beam/issues/32071) test malformed, // when pipeline.run() is non-blocking, the metrics are not available by the time of query diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java index 0564493ca1a8..37d7d89021d7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java @@ -179,7 +179,8 @@ public void testRead() throws IOException { bigtableDataSettings.getInstanceId(), mockBigtableSource.getTableId().get(), mockBigtableSource.getRanges(), - null, + null, + false); underTest.start(); Assert.assertEquals(expectedRow, underTest.getCurrentRow());