Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/executor/aggfuncs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"func_json_objectagg.go",
"func_lead_lag.go",
"func_max_min.go",
"func_max_min_count.go",
"func_ntile.go",
"func_percent_rank.go",
"func_percentile.go",
Expand Down Expand Up @@ -73,6 +74,7 @@ go_test(
"func_json_arrayagg_test.go",
"func_json_objectagg_test.go",
"func_lead_lag_test.go",
"func_max_min_count_test.go",
"func_max_min_test.go",
"func_ntile_test.go",
"func_percent_rank_test.go",
Expand Down
12 changes: 12 additions & 0 deletions pkg/executor/aggfuncs/aggfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,18 @@ var (
_ AggFunc = (*maxMin4VectorFloat32)(nil)
_ AggFunc = (*maxMin4Enum)(nil)
_ AggFunc = (*maxMin4Set)(nil)
_ AggFunc = (*maxMinCount4Int)(nil)
_ AggFunc = (*maxMinCount4Uint)(nil)
_ AggFunc = (*maxMinCount4Float32)(nil)
_ AggFunc = (*maxMinCount4Float64)(nil)
_ AggFunc = (*maxMinCount4Decimal)(nil)
_ AggFunc = (*maxMinCount4String)(nil)
_ AggFunc = (*maxMinCount4Time)(nil)
_ AggFunc = (*maxMinCount4Duration)(nil)
_ AggFunc = (*maxMinCount4JSON)(nil)
_ AggFunc = (*maxMinCount4VectorFloat32)(nil)
_ AggFunc = (*maxMinCount4Enum)(nil)
_ AggFunc = (*maxMinCount4Set)(nil)

// All the AggFunc implementations for "AVG" are listed here.
_ AggFunc = (*avgOriginal4Decimal)(nil)
Expand Down
91 changes: 91 additions & 0 deletions pkg/executor/aggfuncs/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func Build(ctx AggFuncBuildContext, aggFuncDesc *aggregation.AggFuncDesc, ordina
return buildMaxMin(aggFuncDesc, ordinal, true)
case ast.AggFuncMin:
return buildMaxMin(aggFuncDesc, ordinal, false)
case ast.AggFuncMaxCount:
return buildMaxMinCount(ctx.GetEvalCtx(), aggFuncDesc, ordinal, true)
case ast.AggFuncMinCount:
return buildMaxMinCount(ctx.GetEvalCtx(), aggFuncDesc, ordinal, false)
case ast.AggFuncGroupConcat:
return buildGroupConcat(ctx, aggFuncDesc, ordinal)
case ast.AggFuncBitOr:
Expand Down Expand Up @@ -108,6 +112,10 @@ func BuildWindowFunctions(ctx AggFuncBuildContext, windowFuncDesc *aggregation.A
return buildMaxMinInWindowFunction(ctx, windowFuncDesc, ordinal, true)
case ast.AggFuncMin:
return buildMaxMinInWindowFunction(ctx, windowFuncDesc, ordinal, false)
case ast.AggFuncMaxCount:
return buildMaxMinCountInWindowFunction(ctx, windowFuncDesc, ordinal, true)
case ast.AggFuncMinCount:
return buildMaxMinCountInWindowFunction(ctx, windowFuncDesc, ordinal, false)
default:
return Build(ctx, windowFuncDesc, ordinal)
}
Expand Down Expand Up @@ -470,6 +478,64 @@ func buildMaxMin(aggFuncDesc *aggregation.AggFuncDesc, ordinal int, isMax bool)
return nil
}

// buildMaxMinCount builds the AggFunc implementation for function "MAX_COUNT" and "MIN_COUNT".
func buildMaxMinCount(ctx expression.EvalContext, aggFuncDesc *aggregation.AggFuncDesc, ordinal int, isMax bool) AggFunc {
if aggFuncDesc.Mode == aggregation.DedupMode {
return nil
}

argTp := aggFuncDesc.Args[0].GetType(ctx)
base := baseMaxMinCountAggFunc{
baseMaxMinAggFunc: baseMaxMinAggFunc{
baseAggFunc: baseAggFunc{
args: aggFuncDesc.Args,
ordinal: ordinal,
retTp: aggFuncDesc.RetTp,
},
isMax: isMax,
collator: collate.GetCollator(argTp.GetCollate()),
},
}
evalType, fieldType := argTp.EvalType(), argTp
if fieldType.GetType() == mysql.TypeBit {
evalType = types.ETString
}
switch fieldType.GetType() {
case mysql.TypeEnum:
return &maxMinCount4Enum{base}
case mysql.TypeSet:
return &maxMinCount4Set{base}
}

switch evalType {
case types.ETInt:
if mysql.HasUnsignedFlag(fieldType.GetFlag()) {
return &maxMinCount4Uint{base}
}
return &maxMinCount4Int{base}
case types.ETReal:
switch fieldType.GetType() {
case mysql.TypeFloat:
return &maxMinCount4Float32{base}
case mysql.TypeDouble:
return &maxMinCount4Float64{base}
}
case types.ETDecimal:
return &maxMinCount4Decimal{base}
case types.ETString:
return &maxMinCount4String{baseMaxMinCountAggFunc: base}
case types.ETDatetime, types.ETTimestamp:
return &maxMinCount4Time{base}
case types.ETDuration:
return &maxMinCount4Duration{base}
case types.ETJson:
return &maxMinCount4JSON{base}
case types.ETVectorFloat32:
return &maxMinCount4VectorFloat32{base}
}
return nil
}

// buildMaxMin builds the AggFunc implementation for function "MAX" and "MIN" using by window function.
func buildMaxMinInWindowFunction(ctx AggFuncBuildContext, aggFuncDesc *aggregation.AggFuncDesc, ordinal int, isMax bool) AggFunc {
base := buildMaxMin(aggFuncDesc, ordinal, isMax)
Expand All @@ -495,6 +561,31 @@ func buildMaxMinInWindowFunction(ctx AggFuncBuildContext, aggFuncDesc *aggregati
return base
}

// buildMaxMinCountInWindowFunction builds the AggFunc implementation for function "MAX_COUNT" and "MIN_COUNT" used by window function.
func buildMaxMinCountInWindowFunction(ctx AggFuncBuildContext, aggFuncDesc *aggregation.AggFuncDesc, ordinal int, isMax bool) AggFunc {
base := buildMaxMinCount(ctx.GetEvalCtx(), aggFuncDesc, ordinal, isMax)
// build max_count/min_count aggFunc for window function using sliding window
switch baseAggFunc := base.(type) {
case *maxMinCount4Int:
return &maxMinCount4IntSliding{*baseAggFunc, windowInfo{}}
case *maxMinCount4Uint:
return &maxMinCount4UintSliding{*baseAggFunc, windowInfo{}}
case *maxMinCount4Float32:
return &maxMinCount4Float32Sliding{*baseAggFunc, windowInfo{}}
case *maxMinCount4Float64:
return &maxMinCount4Float64Sliding{*baseAggFunc, windowInfo{}}
case *maxMinCount4Decimal:
return &maxMinCount4DecimalSliding{*baseAggFunc, windowInfo{}}
case *maxMinCount4String:
return &maxMinCount4StringSliding{*baseAggFunc, windowInfo{}, baseAggFunc.args[0].GetType(ctx.GetEvalCtx()).GetCollate()}
case *maxMinCount4Time:
return &maxMinCount4TimeSliding{*baseAggFunc, windowInfo{}}
case *maxMinCount4Duration:
return &maxMinCount4DurationSliding{*baseAggFunc, windowInfo{}}
}
return base
}

// buildGroupConcat builds the AggFunc implementation for function "GROUP_CONCAT".
func buildGroupConcat(ctx AggFuncBuildContext, aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
switch aggFuncDesc.Mode {
Expand Down
22 changes: 21 additions & 1 deletion pkg/executor/aggfuncs/func_max_min.go
Original file line number Diff line number Diff line change
Expand Up @@ -1378,7 +1378,7 @@ func (e *maxMin4TimeSliding) deserializeForSpill(helper *deserializeHelper) (Par
return pr, memDelta
}

var _ SlidingWindowAggFunc = &maxMin4DurationSliding{}
var _ SlidingWindowAggFunc = &maxMin4TimeSliding{}

func (e *maxMin4TimeSliding) Slide(sctx AggFuncUpdateContext, getRow func(uint64) chunk.Row, lastStart, lastEnd uint64, shiftStart, shiftEnd uint64, pr PartialResult) error {
p := (*partialResult4MaxMinTime)(pr)
Expand Down Expand Up @@ -1728,6 +1728,26 @@ func (e *maxMin4VectorFloat32) MergePartialResult(_ AggFuncUpdateContext, src, d
return 0, nil
}

func (e *maxMin4VectorFloat32) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) {
pr := (*partialResult4MaxMinVectorFloat32)(partialResult)
resBuf := spillHelper.serializePartialResult4MaxMinVectorFloat32(*pr)
chk.AppendBytes(e.ordinal, resBuf)
}

func (e *maxMin4VectorFloat32) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) {
return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill)
}

func (e *maxMin4VectorFloat32) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) {
pr, memDelta := e.AllocPartialResult()
result := (*partialResult4MaxMinVectorFloat32)(pr)
success := helper.deserializePartialResult4MaxMinVectorFloat32(result)
if !success {
return nil, 0
}
return pr, memDelta
}

type maxMin4Enum struct {
baseMaxMinAggFunc
}
Expand Down
Loading