Skip to content

Commit 299a302

Browse files
authored
Merge pull request #1006: [proxima-beam-core] bump beam to 2.70.0
2 parents a445a4b + 1e1a504 commit 299a302

File tree

3 files changed

+22
-1
lines changed

3 files changed

+22
-1
lines changed

beam/core/src/main/java/cz/o2/proxima/beam/util/state/MethodCallUtils.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,11 @@
8686
import org.apache.beam.sdk.util.ByteBuddyUtils;
8787
import org.apache.beam.sdk.util.CoderUtils;
8888
import org.apache.beam.sdk.values.KV;
89+
import org.apache.beam.sdk.values.OutputBuilder;
8990
import org.apache.beam.sdk.values.Row;
9091
import org.apache.beam.sdk.values.TimestampedValue;
9192
import org.apache.beam.sdk.values.TupleTag;
93+
import org.apache.beam.sdk.values.WindowedValues;
9294
import org.checkerframework.checker.nullness.qual.Nullable;
9395
import org.joda.time.Duration;
9496
import org.joda.time.Instant;
@@ -339,6 +341,12 @@ private static <T> OutputReceiver<T> singleOutput(
339341
TupleTag<T> mainTag) {
340342

341343
return new OutputReceiver<T>() {
344+
@Override
345+
public OutputBuilder<T> builder(T value) {
346+
return WindowedValues.<T>builder()
347+
.setReceiver(output -> multiOutput.get(mainTag).builder(value).output());
348+
}
349+
342350
@Override
343351
public void output(T output) {
344352
if (elem == null) {
@@ -734,6 +742,11 @@ public TimestampedOutputReceiver(OutputReceiver<T> parentReceiver, Instant times
734742
this.elementTimestamp = timestamp;
735743
}
736744

745+
@Override
746+
public OutputBuilder<T> builder(T value) {
747+
return parentReceiver.builder(value);
748+
}
749+
737750
@Override
738751
public void output(T output) {
739752
outputWithTimestamp(output, elementTimestamp);

beam/core/src/test/java/cz/o2/proxima/beam/core/direct/io/BatchLogReadTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,11 @@
6464
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
6565
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
6666
import org.apache.beam.sdk.transforms.windowing.Window;
67+
import org.apache.beam.sdk.values.OutputBuilder;
6768
import org.apache.beam.sdk.values.PBegin;
6869
import org.apache.beam.sdk.values.PCollection;
6970
import org.apache.beam.sdk.values.TypeDescriptors;
71+
import org.apache.beam.sdk.values.WindowedValues;
7072
import org.junit.Ignore;
7173
import org.junit.Test;
7274
import org.junit.runner.RunWith;
@@ -239,6 +241,12 @@ public void testInitialSplitting() {
239241
readFn.splitRestriction(
240242
list,
241243
new OutputReceiver<>() {
244+
@Override
245+
public OutputBuilder<PartitionList> builder(PartitionList value) {
246+
return WindowedValues.<PartitionList>builder()
247+
.setReceiver(output -> this.output(output.getValue()));
248+
}
249+
242250
@Override
243251
public void output(PartitionList part) {
244252
output.add(part);

buildSrc/src/main/groovy/cz.o2.proxima.java-conventions.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ plugins {
2424
java.sourceCompatibility = JavaVersion.VERSION_11
2525

2626
def auto_service_version = "1.1.1"
27-
def beam_version = "2.68.0"
27+
def beam_version = "2.70.0"
2828
def errorprone_version = "2.18.0"
2929
def flink_version = "1.18.0"
3030
def grpc_version = "1.71.0"

0 commit comments

Comments
 (0)