[Bug]: Apache Beam Managed Iceberg IO Does not work with Spark Serialization #37994
Description
What happened?
Scenario 1: Use Managed Iceberg IO to write data to Iceberg tables with the Spark runner, using Spark StorageLevel MEMORY_ONLY.
This works perfectly fine.
Scenario 2: Use Managed Iceberg IO to write data to Iceberg tables with the Spark runner, using Spark StorageLevel MEMORY_ONLY_SER or MEMORY_AND_DISK.
This fails on the Spark runner with a NullPointerException while writing to the Iceberg tables. If I swap the sink for an Avro sink or a file sink, the job runs fine; only Managed Iceberg IO fails with the NullPointerException.
ANALYSIS
From my analysis, there is a Beam tuple tag called "WRITTEN_ROWS_TAG" whose coder Spark's Java serializer / Kryo serializer cannot find. A coder is set on the tag when the tagged output is created in WriteUngroupedRowsToFiles.java, but that coder never propagates further: it is not added to the coder map at line 486 of TransformTranslator.java, because at line 472 of the same file the skipUnconsumedOutputs function drops output tags that have no downstream consumer.
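To make this concrete, here is a heavily simplified sketch of the translation behavior I believe causes the problem. All names below (CoderPropagationSketch, coderMap, translateOutputs, and the consumed/unconsumed flags) are my own illustrative stand-ins, not Beam's actual internals:

import java.util.HashMap;
import java.util.Map;

// Hypothetical simplification of the coder-registration gap described above;
// this is NOT the real TransformTranslator code, just the shape of the bug.
public class CoderPropagationSketch {

    private static final Map<String, String> coderMap = new HashMap<>();

    // Stand-in for the step around TransformTranslator.java line 486: a coder
    // is registered only for outputs that survive skipUnconsumedOutputs (line 472).
    static void translateOutputs(Map<String, Boolean> outputHasConsumer) {
        for (Map.Entry<String, Boolean> output : outputHasConsumer.entrySet()) {
            if (!output.getValue()) {
                continue; // skipUnconsumedOutputs drops WRITTEN_ROWS_TAG here
            }
            coderMap.put(output.getKey(), "RowCoder");
        }
    }

    public static void main(String[] args) {
        Map<String, Boolean> outputs = new HashMap<>();
        outputs.put("WRITTEN_ROWS_TAG", false);  // no downstream consumer in my pipeline
        outputs.put("SOME_CONSUMED_TAG", true);  // hypothetical output that is consumed
        translateOutputs(outputs);

        // With MEMORY_ONLY_SER / MEMORY_AND_DISK, Spark later serializes the cached
        // block, looks up the coder for WRITTEN_ROWS_TAG, and gets null back.
        System.out.println("Coder for WRITTEN_ROWS_TAG: " + coderMap.get("WRITTEN_ROWS_TAG"));
    }
}

This prints "Coder for WRITTEN_ROWS_TAG: null", and that null is what I believe eventually surfaces as the NullPointerException below.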
IMPACT: I cannot scale my Spark job, because Managed Iceberg IO effectively only works with the MEMORY_ONLY storage level. To process large amounts of data I need to serialize cached data and spill it to disk, i.e. use the MEMORY_AND_DISK or MEMORY_ONLY_SER storage levels, which this bug prevents. Requesting support from the Beam community here.
Exception that I see:
java.lang.NullPointerException
    at org.apache.beam.runners.spark.translation.ValueAndCoderLazySerializable.writeCommon(ValueAndCoderLazySerializable.java:98)
    at org.apache.beam.runners.spark.translation.ValueAndCoderLazySerializable.writeObject(ValueAndCoderLazySerializable.java:134)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1027)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.storage.memory.SerializedValuesHolder.storeValue(MemoryStore.scala:729)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1618)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1528)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1592)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
    at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
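For context on where the NPE fires: ValueAndCoderLazySerializable wraps a value together with the coder needed to encode it, and serialization cannot proceed without that coder. Below is a minimal sketch of that pattern, my own simplification using Beam's public Coder API, not the real class:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.sdk.coders.Coder;

// Simplified value+coder wrapper; if the coder was never propagated during
// translation (as described in the analysis above), encode() dereferences null.
class ValueAndCoderSketch<T> {
    private final T value;
    private final Coder<T> coder; // null for WRITTEN_ROWS_TAG in the failing case

    ValueAndCoderSketch(T value, Coder<T> coder) {
        this.value = value;
        this.coder = coder;
    }

    byte[] serializeForStorage() throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        coder.encode(value, out); // NullPointerException when coder == null
        return out.toByteArray();
    }
}

With MEMORY_ONLY nothing is ever serialized, so the missing coder goes unnoticed; the serialized storage levels force this code path.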
Sample code that fails:
package com.example.integration;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;

public class IcebergTest {

    public IcebergTest() {
    }

    public static void main(String[] args) {
        Map<String, Object> tableConfig = new HashMap<>();

        Map<String, Object> catalogProps = new HashMap<>();
        catalogProps.put("type", "hadoop");
        catalogProps.put("warehouse", "gs://test-bucket/test/warehouse/");

        Map<String, Object> configProps = new HashMap<>();
        configProps.put("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
        configProps.put("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS");

        tableConfig.put("catalog_name", "catalog");
        tableConfig.put("table", "default.table1");
        tableConfig.put("catalog_properties", catalogProps);
        tableConfig.put("config_properties", configProps);

        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(SparkRunner.class);
        options.setJobName("sample iceberg job");
        options.as(SparkPipelineOptions.class).setSparkMaster("local[*]");
        options.as(SparkPipelineOptions.class).setStorageLevel("MEMORY_ONLY_SER");

        Pipeline pipeline = Pipeline.create(options);

        myPojo pojo = new myPojo("abc");
        myPojo pojo2 = new myPojo("def");
        PCollection<myPojo> myPojoCollection = pipeline.apply(Create.of(pojo, pojo2));
        PCollection<Row> rows = myPojoCollection.apply("toRow", Convert.toRows());
        rows.apply(Managed.write(Managed.ICEBERG).withConfig(tableConfig)).getSinglePCollection();

        pipeline.run().waitUntilFinish();
    }

    @Data
    @DefaultSchema(JavaBeanSchema.class)
    @AllArgsConstructor
    @NoArgsConstructor
    public static class myPojo {
        @Nullable
        private String name;
    }
}
Now run the same code with the storage level changed to MEMORY_ONLY: it works. With any storage level that requires serialization, it does not.
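For quick reference, the only change between the working and failing runs is this single line:

// Works: cached blocks are kept as Java objects, no serialization needed
options.as(SparkPipelineOptions.class).setStorageLevel("MEMORY_ONLY");

// Fails with the NullPointerException above: cached blocks must be serialized
options.as(SparkPipelineOptions.class).setStorageLevel("MEMORY_ONLY_SER");
// likewise: setStorageLevel("MEMORY_AND_DISK")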
Tech Stack
- Java 11
- Spark 3.5.0
- Iceberg 1.9.2
- Beam 2.68.0
Issue Priority
Priority: 1 (data loss / total loss of function)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner