diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java index 2f15f88b954e..bf1800679691 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java @@ -272,6 +272,11 @@ public void clear() { combinedHold = null; } + @Override + public void setKnownEmpty() { + combinedHold = null; + } + @Override public Instant read() { return combinedHold; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index b08bd42b0b22..7493bc78dce3 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -274,6 +274,16 @@ boolean hasNoActiveWindows() { return activeWindows.getActiveAndNewWindows().isEmpty(); } + @VisibleForTesting + TriggerStateMachineRunner getTriggerRunner() { + return triggerRunner; + } + + @VisibleForTesting + ReduceFnContextFactory getContextFactory() { + return contextFactory; + } + private Set windowsThatAreOpen(Collection windows) { Set result = new HashSet<>(); for (W window : windows) { @@ -603,6 +613,14 @@ private void processElement(Map windowToMergeResult, WindowedValue contextFactory.forValue( window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED); + if (triggerRunner.isNew(directContext.state())) { + // Blindly clear state to ensure Windmill doesn't do unnecessary reads. 
+ reduceFn.clearState(renamedContext); + paneInfoTracker.clear(directContext.state()); + watermarkHold.setKnownEmpty(renamedContext); + nonEmptyPanes.clearPane(renamedContext.state()); + } + nonEmptyPanes.recordContent(renamedContext.state()); scheduleGarbageCollectionTimer(directContext); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java index 15ae8dfe5f1a..b9185ccfba3f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java @@ -466,6 +466,23 @@ public void clearHolds(ReduceFn.Context context) { context.state().access(EXTRA_HOLD_TAG).clear(); } + /** + * For internal use only; no backwards-compatibility guarantees. + * + *
<p>
Permit marking the watermark holds as empty locally, without necessarily clearing them in + * the backend. + */ + public void setKnownEmpty(ReduceFn.Context context) { + WindowTracing.debug( + "WatermarkHold.setKnownEmpty: For key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", + context.key(), + context.window(), + timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + context.state().access(elementHoldTag).setKnownEmpty(); + context.state().access(EXTRA_HOLD_TAG).setKnownEmpty(); + } + /** Return the current data hold, or null if none. Does not clear. For debugging only. */ public @Nullable Instant getDataCurrent(ReduceFn.Context context) { return context.state().access(elementHoldTag).read(); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java index 7eebb4474c6c..967e1ef43f08 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.core.triggers; import java.util.BitSet; +import org.checkerframework.checker.nullness.qual.Nullable; /** A {@link FinishedTriggers} implementation based on an underlying {@link BitSet}. 
*/ public class FinishedTriggersBitSet implements FinishedTriggers { @@ -60,4 +61,17 @@ public void clearRecursively(ExecutableTriggerStateMachine trigger) { public FinishedTriggersBitSet copy() { return new FinishedTriggersBitSet((BitSet) bitSet.clone()); } + + @Override + public boolean equals(@Nullable Object obj) { + if (!(obj instanceof FinishedTriggersBitSet)) { + return false; + } + return bitSet.equals(((FinishedTriggersBitSet) obj).bitSet); + } + + @Override + public int hashCode() { + return bitSet.hashCode(); + } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java index cf29646ebaa3..362a4bebab38 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java @@ -99,6 +99,16 @@ public boolean isClosed(StateAccessor state) { return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger); } + /** Return true if the window is new (no trigger state has ever been persisted). 
*/ + public boolean isNew(StateAccessor state) { + return isFinishedSetNeeded() && state.access(FINISHED_BITS_TAG).read() == null; + } + + @VisibleForTesting + public BitSet getFinishedBits(StateAccessor state) { + return readFinishedBits(state.access(FINISHED_BITS_TAG)).getBitSet(); + } + public void prefetchIsClosed(StateAccessor state) { if (isFinishedSetNeeded()) { state.access(FINISHED_BITS_TAG).readLater(); @@ -187,15 +197,31 @@ private void persistFinishedSet( } ValueState finishedSetState = state.access(FINISHED_BITS_TAG); - if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) { + @Nullable BitSet currentBits = finishedSetState.read(); + if (currentBits == null || !isEquivalent(currentBits, modifiedFinishedSet.getBitSet())) { if (modifiedFinishedSet.getBitSet().isEmpty()) { - finishedSetState.clear(); + // To distinguish between a "new" window and a "seen but empty" window, we + // write a BitSet with a sentinel bit at an index that will never be used by + // any trigger in the tree. + BitSet sentinel = new BitSet(); + sentinel.set(rootTrigger.getFirstIndexAfterSubtree()); + finishedSetState.write(sentinel); } else { finishedSetState.write(modifiedFinishedSet.getBitSet()); } } } + private boolean isEquivalent(BitSet currentBits, BitSet modifiedBits) { + if (currentBits.equals(modifiedBits)) { + return true; + } + // They might only differ by the sentinel bit. + BitSet currentBitsCopy = (BitSet) currentBits.clone(); + currentBitsCopy.clear(rootTrigger.getFirstIndexAfterSubtree()); + return currentBitsCopy.equals(modifiedBits); + } + /** Clear the finished bits. 
*/ public void clearFinished(StateAccessor state) { clearFinishedBits(state.access(FINISHED_BITS_TAG)); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 85f6573be23e..737c9d8aa4c4 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -40,9 +40,11 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.BitSet; import java.util.List; import java.util.Random; import java.util.concurrent.ThreadLocalRandom; +import org.apache.beam.runners.core.ReduceFnContextFactory.StateStyle; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.core.triggers.DefaultTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachine; @@ -2343,4 +2345,41 @@ public interface TestOptions extends PipelineOptions { void setValue(int value); } + + @Test + public void testNewWindowOptimization() throws Exception { + WindowingStrategy strategy = + WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) + .withTrigger(AfterPane.elementCountAtLeast(2)) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES); + + ReduceFnTester, IntervalWindow> tester = + ReduceFnTester.nonCombining(strategy); + + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + + // 1. First element for a new window. + tester.injectElements(TimestampedValue.of(1, new Instant(1))); + + // Verify sentinel bit is written. + BitSet bitSet = + tester + .createRunner() + .getTriggerRunner() + .getFinishedBits( + tester.createRunner().getContextFactory().base(window, StateStyle.DIRECT).state()); + + // We expect the sentinel bit to be set. + assertTrue("Sentinel bit should be set", bitSet.get(1)); + // And trigger not finished. 
+ assertFalse("Trigger should not be finished", bitSet.get(0)); + + // 2. Second element for the same window. + // We want to verify it doesn't clear the first element. + tester.injectElements(TimestampedValue.of(2, new Instant(2))); + + // Extract output. + List>> output = tester.extractOutput(); + assertThat(output, contains(isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10))); + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java index 613d87c127b7..04b39d27cd47 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java @@ -37,6 +37,7 @@ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public class WindmillWatermarkHold extends WindmillState implements WatermarkHoldState { + // The encoded size of an Instant. private static final int ENCODED_SIZE = 8; @@ -46,6 +47,7 @@ public class WindmillWatermarkHold extends WindmillState implements WatermarkHol private final String stateFamily; private boolean cleared = false; + private boolean knownEmpty = false; /** * If non-{@literal null}, the known current hold value, or absent if we know there are no output * watermark holds. 
If {@literal null}, the current hold value could depend on holds in Windmill @@ -77,6 +79,13 @@ public void clear() { localAdditions = null; } + @Override + public void setKnownEmpty() { + cachedValue = Optional.absent(); + localAdditions = null; + knownEmpty = true; + } + @Override @SuppressWarnings("FutureReturnValueIgnored") public WindmillWatermarkHold readLater() { @@ -133,11 +142,13 @@ public Future persist( Future result; - if (!cleared && localAdditions == null) { + if (!knownEmpty && !cleared && localAdditions == null) { // No changes, so no need to update Windmill and no need to cache any value. return Futures.immediateFuture(Windmill.WorkItemCommitRequest.newBuilder().buildPartial()); } + final int estimatedByteSize = ENCODED_SIZE + stateKey.byteString().size(); + if (cleared && localAdditions == null) { // Just clearing the persisted state; blind delete Windmill.WorkItemCommitRequest.Builder commitBuilder = @@ -166,15 +177,17 @@ public Future persist( } else if (!cleared && localAdditions != null) { // Otherwise, we need to combine the local additions with the already persisted data result = combineWithPersisted(); + } else if (knownEmpty) { + result = Futures.immediateFuture(Windmill.WorkItemCommitRequest.newBuilder().buildPartial()); } else { throw new IllegalStateException("Unreachable condition"); } - final int estimatedByteSize = ENCODED_SIZE + stateKey.byteString().size(); return Futures.lazyTransform( result, result1 -> { cleared = false; + knownEmpty = false; localAdditions = null; if (cachedValue != null) { cache.put(namespace, stateKey, WindmillWatermarkHold.this, estimatedByteSize); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java index 7a06d3a29493..d368c52e8743 100644 --- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java @@ -3037,6 +3037,43 @@ public void testWatermarkClearBeforeRead() throws Exception { Mockito.verifyNoMoreInteractions(mockReader); } + @Test + public void testWatermarkSetKnownEmptyBeforeRead() throws Exception { + StateTag addr = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); + + WatermarkHoldState bag = underTest.state(NAMESPACE, addr); + + bag.setKnownEmpty(); + assertThat(bag.read(), Matchers.nullValue()); + + bag.add(new Instant(300)); + assertThat(bag.read(), Matchers.equalTo(new Instant(300))); + + // Shouldn't need to read from windmill because the value is already available. + Mockito.verifyNoMoreInteractions(mockReader); + } + + @Test + public void testWatermarkSetKnownEmptyPersist() throws Exception { + StateTag addr = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); + + WatermarkHoldState bag = underTest.state(NAMESPACE, addr); + + bag.add(new Instant(1000)); + bag.setKnownEmpty(); + + Windmill.WorkItemCommitRequest.Builder commitBuilder = + Windmill.WorkItemCommitRequest.newBuilder(); + underTest.persist(commitBuilder); + + // Should be a no-op, no reset, no adds. 
+ assertEquals(0, commitBuilder.getWatermarkHoldsCount()); + + Mockito.verifyNoMoreInteractions(mockReader); + } + @Test public void testWatermarkPersistEarliest() throws Exception { StateTag addr = diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java index 6d4183da101f..f8b09bcf03a6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java @@ -38,4 +38,14 @@ public interface WatermarkHoldState extends GroupingState { @Override WatermarkHoldState readLater(); + + /** + * For internal use only; no backwards-compatibility guarantees. + * + *
<p>
Permit marking the state as empty locally, without necessarily clearing it in the backend. + * + *
<p>
This may be used by runners to optimize out unnecessary state reads. + */ + @Internal + default void setKnownEmpty() {} }