fix: preserve duplicate GROUPING SETS rows #21058
xiedeyantu wants to merge 6 commits into apache:main from
Conversation
Hi @alamb , there's another one here.

Thanks @xiedeyantu -- this is great. As before, can you please:

I completely agree. I may not be very familiar with DataFusion's contribution process yet. Thank you for your thoughtful suggestions. I might add the necessary information in a few days, as I'm currently on vacation.

Thank you @xiedeyantu
}

#[tokio::test]
async fn duplicate_grouping_sets_are_preserved() -> Result<()> {
Seems like this duplicates the SLT test that this PR already adds, so I think it can be omitted.
It's a little unfortunate that we don't support duplicate grouping sets for 8/16/32 grouping columns. I guess there's no easy way around that?
grouping_function_on_id has this code:
let grouping_id_column = Expr::Column(Column::from(Aggregate::INTERNAL_GROUPING_ID));
// The grouping call is exactly our internal grouping id
if args.len() == group_by_expr_count
    && args
        .iter()
        .rev()
        .enumerate()
        .all(|(idx, expr)| group_by_expr.get(expr) == Some(&idx))
{
    return Ok(cast(grouping_id_column, DataType::Int32));
}

We probably want to mask out the ordinal bits that this PR adds into the grouping ID?
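A minimal sketch of the masking the reviewer suggests. The function name and the bit layout assumed here (null mask in the low `n` bits, duplicate ordinal above them) are illustrative, not DataFusion's actual API:

```rust
// Hypothetical helper: strip the duplicate-ordinal bits out of a raw
// grouping id so GROUPING() sees only the semantic null mask.
fn semantic_grouping_id(raw_grouping_id: u64, group_by_expr_count: usize) -> u64 {
    let n = group_by_expr_count;
    // (1 << n) - 1 keeps the low n bits; n >= 64 would overflow the shift.
    let mask = if n >= 64 { u64::MAX } else { (1u64 << n) - 1 };
    raw_grouping_id & mask
}

fn main() {
    // 3 grouping columns, null mask 0b101, duplicate ordinal 1 stored at bit 3.
    let raw = 0b101u64 | (1u64 << 3);
    assert_eq!(semantic_grouping_id(raw, 3), 0b101);
}
```

With that mask applied, duplicate grouping sets that differ only in their ordinal bits would all report the same GROUPING() value.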
let extra_bits = width_bits - group.len();
if extra_bits == 0 && group_ordinal > 0 {
    return not_impl_err!(
        "Duplicate grouping sets with more than {} grouping columns are not supported",
If group.len() == 64, seems like this will produce an inaccurate error message ("more than 64" vs "64").
    group_by: &PhysicalGroupBy,
    batch: &RecordBatch,
) -> Result<Vec<Vec<ArrayRef>>> {
    let mut group_ordinals: HashMap<Vec<bool>, usize> = HashMap::new();
HashMap<&[bool], usize> instead?
 }

-fn group_id_array(group: &[bool], batch: &RecordBatch) -> Result<ArrayRef> {
+fn group_id_array(
I think this function would benefit from some commentary: you need to read the implementation pretty carefully to understand the encoded bit format.
let group_id = group.iter().fold(0u64, |acc, &is_null| {
    (acc << 1) | if is_null { 1 } else { 0 }
});
let group_id = if group.len() == 64 {
Personally, I would opt for tightening the check on group.len() > 64 to group.len() >= 64 at the top of the function, which would simplify this logic.
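For reference, a self-contained version of the fold in the snippet above (function name is mine, not the PR's): each grouping set's null flags collapse into one u64, with the first flag ending up in the most significant of the used bit positions.

```rust
// Illustrative sketch of the grouping-id bit encoding discussed above.
fn encode_group_id(group: &[bool]) -> u64 {
    group
        .iter()
        .fold(0u64, |acc, &is_null| (acc << 1) | u64::from(is_null))
}

fn main() {
    // Flags (true, false, true) encode to 0b101 = 5.
    assert_eq!(encode_group_id(&[true, false, true]), 5);
    // No null flags encodes to 0.
    assert_eq!(encode_group_id(&[false, false, false]), 0);
}
```

This also shows why `group.len() >= 64` is the natural boundary: with 64 flags the fold already uses every bit of the u64, leaving no room for ordinal bits.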
@neilconway Thank you for the thorough review. I will complete these modifications this weekend and update the problem description accordingly.

@alamb Apologies for the late reply. I have just updated the problem description in the PR.

@neilconway Apologies for the delayed response. I have reviewed all your thoughtful comments and agree that the approach using high-bit flags indeed has significant flaws. I have now refactored the entire logic and updated the PR description; if you have the time, could you please help review it again? Thank you!

@alamb Could you please take a look and let me know if my current revisions are okay?

@neilconway Could you please help me to have a look again?
neilconway left a comment

Sorry for the delayed response, @xiedeyantu!
Thanks for revising this. I'm a bit concerned by the overhead here; we are adding a UInt32 column to every query with grouping sets just to handle a relatively rare situation.
I actually liked the approach you took in the initial PR better; encoding duplicates into the high bits of the grouping ID is a nice approach. We'd just need to take care to mask out the high bits in the GROUPING function, and if possible it would be nice to avoid arbitrary limits like not supporting duplicate grouping sets for queries with 8/16/etc. grouping columns.
Alternatively we could represent grouping IDs as an index into the list of GROUPING SETS. That would provide an ID without concern for duplicates, and then we'd implement the GROUPING function with a CASE or similar construct.
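A hedged sketch of this alternative: give each grouping set its position in the GROUPING SETS list as the execution id, so duplicates are distinct by construction, and recover the null mask for GROUPING() via a lookup (the plan-level equivalent of a CASE expression). All names here are hypothetical, not DataFusion's API:

```rust
// Map a positional grouping-set id back to the GROUPING() null mask.
fn null_mask_for(set_index: usize, sets: &[Vec<bool>]) -> u64 {
    sets[set_index]
        .iter()
        .fold(0u64, |acc, &is_null| (acc << 1) | u64::from(is_null))
}

fn main() {
    // Identical sets at indices 0 and 2 keep distinct execution ids (0 and 2)
    // but map back to the same GROUPING() mask.
    let sets = vec![vec![false, true], vec![true, false], vec![false, true]];
    assert_eq!(null_mask_for(0, &sets), null_mask_for(2, &sets));
    assert_eq!(null_mask_for(1, &sets), 0b10);
}
```

The trade-off is that GROUPING() is no longer a cast of the id itself but a per-row lookup over the (usually small) list of grouping sets.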
datafusion/core/src/dataframe/mod.rs (Outdated)
     .build()?;
 let plan = if is_grouping_set {
-    let grouping_id_pos = plan.schema().fields().len() - 1 - aggr_expr_len;
+    let grouping_id_pos = plan.schema().fields().len() - 2 - aggr_expr_len;
The magic number 2 in a few places is a bit inscrutable. Maybe create a named constant?
@neilconway Thanks for your kind guidance. I have refactored this PR according to your comments. Due to the significant changes, I force-pushed the commit. I think this version is better than the previous one. If you have time, please take a look again. Sorry for making multiple changes.
datafusion/sql/src/unparser/utils.rs (Outdated)
@@ -247,6 +247,11 @@ fn find_agg_expr<'a>(agg: &'a Aggregate, column: &Column) -> Result<Option<&'a E
         )
     }
     Ordering::Greater => {
+        if index < grouping_expr.len() + 1 {

This seems like dead code? i.e., index > grouping_expr.len() implies index >= grouping_expr.len() + 1, so the condition can never hold.
/// Returns the highest duplicate ordinal across all grouping sets.
///
/// The ordinal counts how many times a grouping-set pattern has already
/// appeared before the current occurrence. If the same `Vec<bool>` appears
"the current occurrence" doesn't seem to make sense when you look at the function itself; it is talking about the call site.
/// three times the ordinals are 0, 1, 2 and this function returns 2.
/// Returns 0 when no grouping set is duplicated.
fn max_duplicate_ordinal(groups: &[Vec<bool>]) -> usize {
    let mut counts: HashMap<&Vec<bool>, usize> = HashMap::new();
counts
    .values()
    .copied()
    .max()
    .unwrap_or(1)
    .saturating_sub(1)
counts.into_values().max().unwrap_or(0).saturating_sub(1)
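A runnable sketch combining two suggestions from this review: key the map by `&[bool]` rather than `&Vec<bool>`, and collapse the tail into the suggested one-liner. The surrounding scaffolding is mine; only the fold and the final expression come from the review:

```rust
use std::collections::HashMap;

fn max_duplicate_ordinal(groups: &[Vec<bool>]) -> usize {
    let mut counts: HashMap<&[bool], usize> = HashMap::new();
    for group in groups {
        *counts.entry(group.as_slice()).or_insert(0) += 1;
    }
    // A max count of 1 (or an empty input) means no duplicates: ordinal 0.
    counts.into_values().max().unwrap_or(0).saturating_sub(1)
}

fn main() {
    let groups = vec![
        vec![true, false],
        vec![true, false],
        vec![false, true],
        vec![true, false],
    ];
    // The (true, false) pattern appears three times: ordinals 0, 1, 2.
    assert_eq!(max_duplicate_ordinal(&groups), 2);
    assert_eq!(max_duplicate_ordinal(&[]), 0);
}
```

Note `unwrap_or(0)` rather than the original `unwrap_or(1)`: with `saturating_sub(1)` both yield 0 for an empty input, but 0 reads more directly as "no sets, no duplicates".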
// The grouping call is exactly our internal grouping id — mask the ordinal
// bits (above position `n`) so only the semantic bitmask is visible.
let n = group_by_expr_count;
// (1 << n) - 1 masks the low n bits. Use saturating arithmetic to handle n == 0.
The code doesn't use saturating arithmetic?

Although I don't think wrapping_sub is necessary anyway: n == 0 -> 1 << 0 -> 1, so it won't underflow.
} else {
    (1u64 << n).wrapping_sub(1)
};
let masked_id =
Don't compute this outside the if in which it is used.
@neilconway Thank you very much for your detailed review! I have addressed your comments line by line and included the changes in a single commit. Please take another look to see if everything now aligns with your feedback.
/// Returns the Arrow data type of the `__grouping_id` column.
///
/// The type is chosen to be wide enough to hold both the semantic bitmask
/// (in the low `n` bits) and the duplicate ordinal (in the high bits).
Not clear what n is here based on the context of the function itself.
let n = group_by_expr_count;
// (1 << n) - 1 masks the low n bits.
let semantic_mask: u64 = if n >= 64 { u64::MAX } else { (1u64 << n) - 1 };
As I suggested before, move this inside the if
/// Returns 0 when no grouping set is duplicated.
fn max_grouping_set_duplicate_ordinal(group_expr: &[Expr]) -> usize {
    if let Some(Expr::GroupingSet(GroupingSet::GroupingSets(sets))) = group_expr.first() {
        let mut counts: HashMap<&Vec<Expr>, usize> = HashMap::new();
@@ -5206,6 +5203,41 @@ NULL NULL 1
statement ok
drop table t;

# regression: duplicate grouping sets must not be collapsed into one
I think it's worth having a test case for the situation where adding the duplicate ordinal widens the size of the grouping ID field; that's a bit tricky.
/// The ordinal counts how many times a given grouping set pattern has already
/// appeared before the current occurrence. For example, if the same set
I mentioned this before but "the current occurrence" is not well-defined in this context. Something like this instead:
/// The ordinal for each occurrence of a grouping set pattern is its 0-based index among
/// identical entries. For example, if the same set appears three times, the ordinals are
/// 0, 1, 2 and this function returns 2.
@neilconway I have made the changes based on the comments; could you please review it again? Thank you!
Which issue does this PR close?

- GROUPING SETS rows are incorrectly collapsed during execution #21316.

Rationale for this change

GROUPING SETS with duplicate grouping lists were incorrectly collapsed during execution. The internal grouping id only encoded the semantic null mask, so repeated grouping sets shared the same execution key and were merged, which caused rows to be lost compared with PostgreSQL behavior. For example, with:

PostgreSQL preserves the duplicate grouping set and returns:

Before this fix, DataFusion collapsed the duplicate (deptno, job) grouping set and returned only 4 rows for the same query shape.

What changes are included in this PR?

- Keep __grouping_id as the semantic grouping mask only.
- Add a __grouping_ordinal column so repeated occurrences of the same grouping set produce distinct execution keys.
- Keep GROUPING() semantics unchanged.
- Add tests for the duplicate GROUPING SETS case in:
  - datafusion/core/tests/sql/aggregates/basic.rs
  - datafusion/sqllogictest/test_files/group_by.slt

Are these changes tested?

- cargo fmt --all
- cargo test -p datafusion duplicate_grouping_sets_are_preserved
- cargo test -p datafusion-physical-plan grouping_sets_preserve_duplicate_groups
- cargo test -p datafusion-physical-plan evaluate_group_by_supports_duplicate_grouping_sets_with_eight_columns

Are there any user-facing changes?

Duplicate GROUPING SETS entries now return the correct duplicated result rows, matching PostgreSQL behavior.