Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ public void clear() {
combinedHold = null;
}

@Override
public void setKnownEmpty() {
combinedHold = null;
}

@Override
public Instant read() {
return combinedHold;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,16 @@ boolean hasNoActiveWindows() {
return activeWindows.getActiveAndNewWindows().isEmpty();
}

@VisibleForTesting
TriggerStateMachineRunner<W> getTriggerRunner() {
return triggerRunner;
}

@VisibleForTesting
ReduceFnContextFactory<K, InputT, OutputT, W> getContextFactory() {
return contextFactory;
}

private Set<W> windowsThatAreOpen(Collection<W> windows) {
Set<W> result = new HashSet<>();
for (W window : windows) {
Expand Down Expand Up @@ -603,6 +613,14 @@ private void processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT>
contextFactory.forValue(
window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);

if (triggerRunner.isNew(directContext.state())) {
// Blindly clear state to ensure Windmill doesn't do unnecessary reads.
reduceFn.clearState(renamedContext);
paneInfoTracker.clear(directContext.state());
watermarkHold.setKnownEmpty(renamedContext);
nonEmptyPanes.clearPane(renamedContext.state());
}

nonEmptyPanes.recordContent(renamedContext.state());
scheduleGarbageCollectionTimer(directContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,23 @@ public void clearHolds(ReduceFn<?, ?, ?, W>.Context context) {
context.state().access(EXTRA_HOLD_TAG).clear();
}

/**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
*
* <p>Permit marking the watermark holds as empty locally, without necessarily clearing them in
* the backend.
*/
public void setKnownEmpty(ReduceFn<?, ?, ?, W>.Context context) {
WindowTracing.debug(
"WatermarkHold.setKnownEmpty: For key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
context.key(),
context.window(),
timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
context.state().access(elementHoldTag).setKnownEmpty();
context.state().access(EXTRA_HOLD_TAG).setKnownEmpty();
}

/** Return the current data hold, or null if none. Does not clear. For debugging only. */
public @Nullable Instant getDataCurrent(ReduceFn<?, ?, ?, W>.Context context) {
return context.state().access(elementHoldTag).read();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.core.triggers;

import java.util.BitSet;
import org.checkerframework.checker.nullness.qual.Nullable;

/** A {@link FinishedTriggers} implementation based on an underlying {@link BitSet}. */
public class FinishedTriggersBitSet implements FinishedTriggers {
Expand Down Expand Up @@ -60,4 +61,17 @@ public void clearRecursively(ExecutableTriggerStateMachine trigger) {
public FinishedTriggersBitSet copy() {
return new FinishedTriggersBitSet((BitSet) bitSet.clone());
}

@Override
public boolean equals(@Nullable Object obj) {
if (!(obj instanceof FinishedTriggersBitSet)) {
return false;
}
return bitSet.equals(((FinishedTriggersBitSet) obj).bitSet);
}

@Override
public int hashCode() {
return bitSet.hashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,16 @@ public boolean isClosed(StateAccessor<?> state) {
return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
}

/** Return true if the window is new (no trigger state has ever been persisted). */
public boolean isNew(StateAccessor<?> state) {
return isFinishedSetNeeded() && state.access(FINISHED_BITS_TAG).read() == null;
}

@VisibleForTesting
public BitSet getFinishedBits(StateAccessor<?> state) {
return readFinishedBits(state.access(FINISHED_BITS_TAG)).getBitSet();
}

public void prefetchIsClosed(StateAccessor<?> state) {
if (isFinishedSetNeeded()) {
state.access(FINISHED_BITS_TAG).readLater();
Expand Down Expand Up @@ -187,15 +197,31 @@ private void persistFinishedSet(
}

ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
@Nullable BitSet currentBits = finishedSetState.read();
if (currentBits == null || !isEquivalent(currentBits, modifiedFinishedSet.getBitSet())) {
if (modifiedFinishedSet.getBitSet().isEmpty()) {
finishedSetState.clear();
// To distinguish between a "new" window and a "seen but empty" window, we
// write a BitSet with a sentinel bit at an index that will never be used by
// any trigger in the tree.
BitSet sentinel = new BitSet();
sentinel.set(rootTrigger.getFirstIndexAfterSubtree());
finishedSetState.write(sentinel);
} else {
finishedSetState.write(modifiedFinishedSet.getBitSet());
}
}
}

private boolean isEquivalent(BitSet currentBits, BitSet modifiedBits) {
if (currentBits.equals(modifiedBits)) {
return true;
}
// They might only differ by the sentinel bit.
BitSet currentBitsCopy = (BitSet) currentBits.clone();
currentBitsCopy.clear(rootTrigger.getFirstIndexAfterSubtree());
return currentBitsCopy.equals(modifiedBits);
}

/** Clear the finished bits. */
public void clearFinished(StateAccessor<?> state) {
clearFinishedBits(state.access(FINISHED_BITS_TAG));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.runners.core.ReduceFnContextFactory.StateStyle;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.triggers.DefaultTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachine;
Expand Down Expand Up @@ -2343,4 +2345,41 @@ public interface TestOptions extends PipelineOptions {

void setValue(int value);
}

@Test
public void testNewWindowOptimization() throws Exception {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
.withTrigger(AfterPane.elementCountAtLeast(2))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES);

ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
ReduceFnTester.nonCombining(strategy);

IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));

// 1. First element for a new window.
tester.injectElements(TimestampedValue.of(1, new Instant(1)));

// Verify sentinel bit is written.
BitSet bitSet =
tester
.createRunner()
.getTriggerRunner()
.getFinishedBits(
tester.createRunner().getContextFactory().base(window, StateStyle.DIRECT).state());

// We expect the sentinel bit to be set.
assertTrue("Sentinel bit should be set", bitSet.get(1));
// And trigger not finished.
assertFalse("Trigger should not be finished", bitSet.get(0));

// 2. Second element for the same window.
// We want to verify it doesn't clear the first element.
tester.injectElements(TimestampedValue.of(2, new Instant(2)));

// Extract output.
List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
assertThat(output, contains(isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class WindmillWatermarkHold extends WindmillState implements WatermarkHoldState {

// The encoded size of an Instant.
private static final int ENCODED_SIZE = 8;

Expand All @@ -46,6 +47,7 @@ public class WindmillWatermarkHold extends WindmillState implements WatermarkHol
private final String stateFamily;

private boolean cleared = false;
private boolean knownEmpty = false;
/**
* If non-{@literal null}, the known current hold value, or absent if we know there are no output
* watermark holds. If {@literal null}, the current hold value could depend on holds in Windmill
Expand Down Expand Up @@ -77,6 +79,13 @@ public void clear() {
localAdditions = null;
}

@Override
public void setKnownEmpty() {
cachedValue = Optional.absent();
localAdditions = null;
knownEmpty = true;
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public WindmillWatermarkHold readLater() {
Expand Down Expand Up @@ -133,11 +142,13 @@ public Future<Windmill.WorkItemCommitRequest> persist(

Future<Windmill.WorkItemCommitRequest> result;

if (!cleared && localAdditions == null) {
if (!knownEmpty && !cleared && localAdditions == null) {
// No changes, so no need to update Windmill and no need to cache any value.
return Futures.immediateFuture(Windmill.WorkItemCommitRequest.newBuilder().buildPartial());
}

final int estimatedByteSize = ENCODED_SIZE + stateKey.byteString().size();

if (cleared && localAdditions == null) {
// Just clearing the persisted state; blind delete
Windmill.WorkItemCommitRequest.Builder commitBuilder =
Expand Down Expand Up @@ -166,15 +177,17 @@ public Future<Windmill.WorkItemCommitRequest> persist(
} else if (!cleared && localAdditions != null) {
// Otherwise, we need to combine the local additions with the already persisted data
result = combineWithPersisted();
} else if (knownEmpty) {
result = Futures.immediateFuture(Windmill.WorkItemCommitRequest.newBuilder().buildPartial());
} else {
throw new IllegalStateException("Unreachable condition");
}

final int estimatedByteSize = ENCODED_SIZE + stateKey.byteString().size();
return Futures.lazyTransform(
result,
result1 -> {
cleared = false;
knownEmpty = false;
localAdditions = null;
if (cachedValue != null) {
cache.put(namespace, stateKey, WindmillWatermarkHold.this, estimatedByteSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3037,6 +3037,43 @@ public void testWatermarkClearBeforeRead() throws Exception {
Mockito.verifyNoMoreInteractions(mockReader);
}

@Test
public void testWatermarkSetKnownEmptyBeforeRead() throws Exception {
StateTag<WatermarkHoldState> addr =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);

WatermarkHoldState bag = underTest.state(NAMESPACE, addr);

bag.setKnownEmpty();
assertThat(bag.read(), Matchers.nullValue());

bag.add(new Instant(300));
assertThat(bag.read(), Matchers.equalTo(new Instant(300)));

// Shouldn't need to read from windmill because the value is already available.
Mockito.verifyNoMoreInteractions(mockReader);
}

@Test
public void testWatermarkSetKnownEmptyPersist() throws Exception {
StateTag<WatermarkHoldState> addr =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);

WatermarkHoldState bag = underTest.state(NAMESPACE, addr);

bag.add(new Instant(1000));
bag.setKnownEmpty();

Windmill.WorkItemCommitRequest.Builder commitBuilder =
Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);

// Should be a no-op, no reset, no adds.
assertEquals(0, commitBuilder.getWatermarkHoldsCount());

Mockito.verifyNoMoreInteractions(mockReader);
}

@Test
public void testWatermarkPersistEarliest() throws Exception {
StateTag<WatermarkHoldState> addr =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,14 @@ public interface WatermarkHoldState extends GroupingState<Instant, Instant> {

@Override
WatermarkHoldState readLater();

/**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
*
* <p>Permit marking the state as empty locally, without necessarily clearing it in the backend.
*
* <p>This may be used by runners to optimize out unnecessary state reads.
*/
@Internal
default void setKnownEmpty() {}
}
Loading