Spark: Support writing shredded variant in Iceberg-Spark#14297
Spark: Support writing shredded variant in Iceberg-Spark#14297aihuaxu wants to merge 15 commits intoapache:mainfrom
Conversation
16b7a09 to
dc4f72e
Compare
97851f0 to
b87e999
Compare
|
@amogh-jahagirdar @Fokko @huaxingao Can you help take a look at this PR and if we have better approach for this? |
|
cc @RussellSpitzer, @pvary and @rdblue Seems it's better to have the implementation with new File Format proposal but want to check if this is acceptable approach as an interim solution or you see a better alternative. |
parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
Outdated
Show resolved
Hide resolved
|
@aihuaxu: Don't we want to do the same but instead of wrapping the Would this be prohibitively complex? |
|
In Spark DSv2, planning/validation happens on the driver. For shredded variant, we don’t know the shredded schema at planning time. We have to inspect some records to derive it. Doing a read on the driver during Because of that, the current proposed Spark approach is: put the logical variant in the writer factory, on the executor, buffer the first N rows, infer the shredded schema from data, then initialize the concrete writer and flush the buffer. I believe this PR follow the same approach, which seems like a practical solution to me given DSV2's constraints. |
|
Thanks for the explanation, @huaxingao! I see several possible workarounds for the DataWriterFactory serialization issue, but I have some more fundamental concerns about the overall approach. Even if we accept that the written data should dictate the shredding logic, Spark’s implementation—while dependent on input order—is at least somewhat stable. It drops rarely used fields, handles inconsistent types, and limits the number of columns. |
|
Thanks @huaxingao and @pvary for reviewing, and thanks to Huaxin for explaining how the writer works in Spark. Regarding the concern about unstable schemas, Spark's approach makes sense:
We could implement similar heuristics. Additionally, making the shredded schema configurable would allow users to choose which fields to shred at write time based on their read patterns. For this POC, I'd like any feedback on whether there are any significant high-level design options to consider first and if this approach is acceptable. This seems hacky. I may have missed big picture on how the writers work across Spark + Iceberg + Parquet and we may have better way. |
|
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions. |
|
This PR caught my eye, as I've implemented the equivalent in DuckDB: duckdb/duckdb#19336 The PR description doesn't give much away, but I think the approach is similar to the proposed (interim) solution here: buffer the first rowgroup, infer the shredded schema from this, then finalize the file schema and start writing data. We've opted to create a We've also added a copy option to force the shredded schema, for debugging purposes and for power users. As for DECIMAL, it's kind of a special case in the shredding inference. We only shred on a DECIMAL type if all the decimal values we've seen for a column/field have the same width+scale, if any decimal value differs, DECIMAL won't be considered anymore when determining the shredded type of the column/field |
|
This PR is super exciting! Regarding the heuristics - I'd like to propose adding table properties as hints for variant shredding. |
That is correct.
I'm still trying to improve the heuristics to use the most common one as shredding type rather than the first one and probably cap the number of shredded fields, etc. but it doesn't need 100% consistent type to be shredded.
Yeah. I think that makes sense for advanced user to determine the shredded schema since they may know the read pattern.
Why is DECIMAL special here? If we determine DECIMAL4 to be shredded type, then we may shred as DECIMAL4 or not shred if they cannot fit in DECIMAL4, right? |
Yeah. I'm also thinking of that too. Will address that separately. Basically based on read pattern, the user can specify the shredding schema. |
gkpanda4
left a comment
There was a problem hiding this comment.
When processing JSON objects containing null field values (e.g., {"field": null}), the variant shredding creates schema columns for these null fields instead of omitting them entirely. This would cause schema bloat.
Adding a null check in ParquetVariantUtil.java:386 in the object() method should fix it.
2e81d79 to
7e1b608
Compare
I addressed this null value check in VariantShreddingAnalyzer.java instead. If it's NULL, then we will not add the shredded field. |
7c805f6 to
67dbe97
Compare
|
Hi folks, thank you for your patience. Thanks @aihuaxu for doing all the work for this. The feedback and comments from everyone here was really helpful to make the necessary fixes. I made the following changes after merging the main branch.
Happy to discuss any of the changes. Please have a look. |
I didn't have time to review the full PR yet, but I had the same discussion with @Guosmilesmile on Slack, that I don't like the change in the I have started with a similar example, but based on @rdblue's comments we removed the properties and opted for the schemas only solution. |
|
Thanks for the context @pvary , that makes sense. To make sure I implement this correctly: the |
This is how I imagine this today: |
|
Thanks for the feedback @pvary. @aihuaxu , @pvary and I synced offline to discuss how to move this forward. Adding a note here so that it's easy to review. I've made the following changes:
@huaxingao, @pvary, @aihuaxu, @RussellSpitzer , please review when you have a chance. |
| this.bufferRowCount = bufferRowCount; | ||
| this.appenderFactory = appenderFactory; | ||
| this.copyFunc = copyFunc; | ||
| this.buffer = Lists.newArrayList(); |
There was a problem hiding this comment.
nit: Could we initialize with the expected size?
| if (delegate != null) { | ||
| return delegate.splitOffsets(); | ||
| } | ||
| return null; |
| if (delegate != null) { | ||
| return delegate.length(); | ||
| } | ||
| return 0L; |
There was a problem hiding this comment.
Are we sure about the 0 length?
There was a problem hiding this comment.
since there is nothing buffered yet, 0 makes sense. Let me know if you prefer a different response
| public long length() { | ||
| if (delegate != null) { | ||
| return delegate.length(); | ||
| } |
| for (Record row : bufferedRows) { | ||
| appender.add(row); | ||
| } | ||
| return appender; |
There was a problem hiding this comment.
I figured spotless would catch some of these, but maybe not. I'll fix them in upcoming commits
| Preconditions.checkArgument( | ||
| bufferRowCount > 0, "bufferRowCount must be > 0, got %s", bufferRowCount); | ||
| Preconditions.checkNotNull(appenderFactory, "appenderFactory must not be null"); | ||
| Preconditions.checkNotNull(copyFunc, "copyFunc must not be null"); |
There was a problem hiding this comment.
How frequent is the need to copy?
Do we need to get this always, or is it worth to have a non-copy consructor?
| for (Record row : bufferedRows) { | ||
| appender.add(row); | ||
| } |
There was a problem hiding this comment.
| for (Record row : bufferedRows) { | |
| appender.add(row); | |
| } | |
| bufferedRows.forEach(appender::add); |
| public WriteBuilder withFileSchema(MessageType newFileSchema) { | ||
| this.fileSchema = newFileSchema; | ||
| return this; | ||
| } |
There was a problem hiding this comment.
Is there an easy way to encode this to the engineSchema?
There was a problem hiding this comment.
Or could we just set the createReaderFunction based on the MessageType?
| public DataWriteBuilder withFileSchema(MessageType newFileSchema) { | ||
| appenderBuilder.withFileSchema(newFileSchema); | ||
| return this; | ||
| } | ||
|
|
| MessageTypeBuilder builder = Types.buildMessage(); | ||
|
|
||
| for (Type field : fields) { | ||
| if (field != null) { |
There was a problem hiding this comment.
Do we need these null checks?
| Function<List<InternalRow>, FileAppender<InternalRow>> appenderFactory = | ||
| bufferedRows -> { | ||
| Preconditions.checkNotNull(bufferedRows, "bufferedRows must not be null"); | ||
| MessageType originalSchema = ParquetSchemaUtil.convert(dataSchema, "table"); |
There was a problem hiding this comment.
This is not the place for things like this.
There should not be any FileFormat related thing here.
ParquetFormatModel should contain this logic as this is parquet specific.
If there are Spark specific parts, that should be a method used to parametrize the ParquetFormatModel, like the WriterBuilderFunction
There was a problem hiding this comment.
+1. When shredding is enabled, this skips the FormatModelRegistry / ParquetFormatModel path and constructs the Parquet writer directly, pulling format-specific imports into the format-agnostic SparkFileWriterFactory.
There was a problem hiding this comment.
Agree that this is the main part that we can refactor to follow the new FileFormat API.
There was a problem hiding this comment.
I'll work on the refactor based on the comments
|
|
||
| private boolean shouldUseVariantShredding() { | ||
| // Variant shredding is currently only supported for Parquet files | ||
| if (dataFileFormat != FileFormat.PARQUET) { |
|
@nssalian pointed me at this: I've reviewed it as far as I'm safe to. I am doing benchmarks for read performance #15629 which goes alongside this. It'll show when shedded variant performance equals or exceeds that of unshedded. Right now the results of #https://github.com/apache/iceberg/issues/15628#issuecomment-4120285243 show that is not yet the case, at least in the test setup |
| spark.conf().set(SparkSQLProperties.SHRED_VARIANTS, "true"); | ||
|
|
||
| String values = | ||
| "(1, parse_json('{\"age\": \"25\"}'))," |
There was a problem hiding this comment.
java17 has that """" multiline string thing which is ideal for json like this
| MessageType expectedSchema = parquetSchema(address); | ||
|
|
||
| Table table = validationCatalog.loadTable(tableIdent); | ||
| verifyParquetSchema(table, expectedSchema); |
There was a problem hiding this comment.
I'd add a test to make sure that row 2 had age of value 30:int, just to make sure that the parser hasn't decided to "be helpful"
| } | ||
|
|
||
| @Override | ||
| public void close() throws IOException { |
There was a problem hiding this comment.
what happens on a create().close() sequence with no data written? it should be a no-op. Is this tested?
There was a problem hiding this comment.
I'll add an empty close test
This change adds support for writing shredded variants in the iceberg-spark module, enabling Spark to write shredded variant data into Iceberg tables.
Ideally, this should follow the approach described in the reader/writer API proposal for Iceberg V4, where the execution engine provides the shredded writer schema before creating the Iceberg writer. This design is cleaner, as it delegates schema generation responsibility to the engine.
As an interim solution, this PR implements a writer with lazy initialization for the actual Parquet writer. It buffers a portion of the data first, derives the shredded schema from the buffered records, then initializes the Parquet writer and flushes the buffered data to the file.
The current shredding algorithm is to shred to the most common type for a field.