Skip to content

Commit 9eab836

Browse files
Add roaring bitmap aggregation
1 parent 3c53a20 commit 9eab836

File tree

17 files changed

+514
-4
lines changed

17 files changed

+514
-4
lines changed

fluss-client/src/main/resources/META-INF/NOTICE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ This project bundles the following dependencies under the Apache Software Licens
1010
- com.ververica:frocksdbjni:6.20.3-ververica-2.0
1111
- org.apache.commons:commons-lang3:3.18.0
1212
- org.apache.commons:commons-math3:3.6.1
13+
- org.roaringbitmap:RoaringBitmap:1.2.1
1314
- at.yawk.lz4:lz4-java:1.10.2
1415

1516
This project bundles the following dependencies under the MIT (https://opensource.org/licenses/MIT)

fluss-common/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@
6262
<artifactId>fluss-shaded-arrow</artifactId>
6363
</dependency>
6464

65+
<dependency>
66+
<groupId>org.roaringbitmap</groupId>
67+
<artifactId>RoaringBitmap</artifactId>
68+
<version>${roaringbitmap.version}</version>
69+
</dependency>
70+
6571
<!-- TODO: these two dependencies need to be shaded. -->
6672
<dependency>
6773
<groupId>at.yawk.lz4</groupId>

fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,11 @@ public enum AggFunctionType {
4949

5050
// Boolean aggregation
5151
BOOL_AND,
52-
BOOL_OR;
52+
BOOL_OR,
53+
54+
// Roaring bitmap aggregation
55+
RBM32,
56+
RBM64;
5357

5458
/** Parameter name for delimiter used in LISTAGG and STRING_AGG functions. */
5559
public static final String PARAM_DELIMITER = "delimiter";

fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,32 @@ public static AggFunction BOOL_OR() {
271271
return new AggFunction(AggFunctionType.BOOL_OR, null);
272272
}
273273

274+
// ===================================================================================
275+
// Roaring Bitmap Aggregation Functions
276+
// ===================================================================================
277+
278+
/**
279+
* Creates a RBM32 aggregation function that merges serialized 32-bit roaring bitmaps.
280+
*
281+
* <p>Supported data types: BYTES
282+
*
283+
* @return a RBM32 aggregation function
284+
*/
285+
public static AggFunction RBM32() {
286+
return new AggFunction(AggFunctionType.RBM32, null);
287+
}
288+
289+
/**
290+
* Creates a RBM64 aggregation function that merges serialized 64-bit roaring bitmaps.
291+
*
292+
* <p>Supported data types: BYTES
293+
*
294+
* @return a RBM64 aggregation function
295+
*/
296+
public static AggFunction RBM64() {
297+
return new AggFunction(AggFunctionType.RBM64, null);
298+
}
299+
274300
// ===================================================================================
275301
// Internal Factory Methods
276302
// ===================================================================================
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.utils;
20+
21+
import org.roaringbitmap.RoaringBitmap;
22+
import org.roaringbitmap.longlong.Roaring64Bitmap;
23+
24+
import java.io.ByteArrayInputStream;
25+
import java.io.ByteArrayOutputStream;
26+
import java.io.DataInputStream;
27+
import java.io.DataOutputStream;
28+
import java.io.IOException;
29+
import java.nio.ByteBuffer;
30+
31+
/** Utility methods for serializing roaring bitmaps. */
32+
public final class RoaringBitmapUtils {
33+
34+
private RoaringBitmapUtils() {
35+
// Utility class, no instantiation
36+
}
37+
38+
public static byte[] serializeRoaringBitmap32(RoaringBitmap bitmap) throws IOException {
39+
bitmap.runOptimize();
40+
ByteBuffer buffer = ByteBuffer.allocate(bitmap.serializedSizeInBytes());
41+
bitmap.serialize(buffer);
42+
return buffer.array();
43+
}
44+
45+
public static void deserializeRoaringBitmap32(RoaringBitmap bitmap, byte[] bytes)
46+
throws IOException {
47+
bitmap.deserialize(ByteBuffer.wrap(bytes));
48+
}
49+
50+
public static byte[] serializeRoaringBitmap64(Roaring64Bitmap bitmap) throws IOException {
51+
bitmap.runOptimize();
52+
try (ByteArrayOutputStream output = new ByteArrayOutputStream();
53+
DataOutputStream dataOutput = new DataOutputStream(output)) {
54+
bitmap.serialize(dataOutput);
55+
return output.toByteArray();
56+
}
57+
}
58+
59+
public static void deserializeRoaringBitmap64(Roaring64Bitmap bitmap, byte[] bytes)
60+
throws IOException {
61+
try (ByteArrayInputStream input = new ByteArrayInputStream(bytes);
62+
DataInputStream dataInput = new DataInputStream(input)) {
63+
bitmap.deserialize(dataInput);
64+
}
65+
}
66+
}

fluss-server/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,10 @@
138138
<pattern>org.apache.commons</pattern>
139139
<shadedPattern>org.apache.fluss.shaded.org.apache.commons</shadedPattern>
140140
</relocation>
141+
<relocation>
142+
<pattern>org.roaringbitmap</pattern>
143+
<shadedPattern>org.apache.fluss.shaded.org.roaringbitmap</shadedPattern>
144+
</relocation>
141145
</relocations>
142146
</configuration>
143147
</execution>
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.server.kv.rowmerger.aggregate.factory;
20+
21+
/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
22+
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
23+
* additional information regarding copyright ownership. */
24+
25+
import org.apache.fluss.metadata.AggFunction;
26+
import org.apache.fluss.metadata.AggFunctionType;
27+
import org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldRoaringBitmap32Agg;
28+
import org.apache.fluss.types.DataType;
29+
import org.apache.fluss.types.DataTypeRoot;
30+
31+
import static org.apache.fluss.utils.Preconditions.checkArgument;
32+
33+
/** Factory for {@link FieldRoaringBitmap32Agg}. */
34+
public class FieldRoaringBitmap32AggFactory implements FieldAggregatorFactory {
35+
36+
@Override
37+
public FieldRoaringBitmap32Agg create(DataType fieldType, AggFunction aggFunction) {
38+
checkArgument(
39+
fieldType.getTypeRoot() == DataTypeRoot.BYTES,
40+
"Data type for rbm32 column must be 'BytesType' but was '%s'.",
41+
fieldType);
42+
return new FieldRoaringBitmap32Agg(fieldType);
43+
}
44+
45+
@Override
46+
public String identifier() {
47+
return AggFunctionType.RBM32.toString();
48+
}
49+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.server.kv.rowmerger.aggregate.factory;
20+
21+
/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
22+
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
23+
* additional information regarding copyright ownership. */
24+
25+
import org.apache.fluss.metadata.AggFunction;
26+
import org.apache.fluss.metadata.AggFunctionType;
27+
import org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldRoaringBitmap64Agg;
28+
import org.apache.fluss.types.DataType;
29+
import org.apache.fluss.types.DataTypeRoot;
30+
31+
import static org.apache.fluss.utils.Preconditions.checkArgument;
32+
33+
/** Factory for {@link FieldRoaringBitmap64Agg}. */
34+
public class FieldRoaringBitmap64AggFactory implements FieldAggregatorFactory {
35+
36+
@Override
37+
public FieldRoaringBitmap64Agg create(DataType fieldType, AggFunction aggFunction) {
38+
checkArgument(
39+
fieldType.getTypeRoot() == DataTypeRoot.BYTES,
40+
"Data type for rbm64 column must be 'BytesType' but was '%s'.",
41+
fieldType);
42+
return new FieldRoaringBitmap64Agg(fieldType);
43+
}
44+
45+
@Override
46+
public String identifier() {
47+
return AggFunctionType.RBM64.toString();
48+
}
49+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.server.kv.rowmerger.aggregate.functions;
20+
21+
/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
22+
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
23+
* additional information regarding copyright ownership. */
24+
25+
import org.apache.fluss.types.DataType;
26+
import org.apache.fluss.utils.RoaringBitmapUtils;
27+
28+
import org.roaringbitmap.RoaringBitmap;
29+
30+
import java.io.IOException;
31+
32+
/** Roaring bitmap aggregator for serialized 32-bit bitmaps. */
33+
public class FieldRoaringBitmap32Agg extends FieldAggregator {
34+
35+
private static final long serialVersionUID = 1L;
36+
private final RoaringBitmap roaringBitmapAcc;
37+
private final RoaringBitmap roaringBitmapInput;
38+
39+
public FieldRoaringBitmap32Agg(DataType dataType) {
40+
super(dataType);
41+
this.roaringBitmapAcc = new RoaringBitmap();
42+
this.roaringBitmapInput = new RoaringBitmap();
43+
}
44+
45+
@Override
46+
public Object agg(Object accumulator, Object inputField) {
47+
if (accumulator == null || inputField == null) {
48+
return accumulator == null ? inputField : accumulator;
49+
}
50+
51+
try {
52+
RoaringBitmapUtils.deserializeRoaringBitmap32(roaringBitmapAcc, (byte[]) accumulator);
53+
RoaringBitmapUtils.deserializeRoaringBitmap32(roaringBitmapInput, (byte[]) inputField);
54+
roaringBitmapAcc.or(roaringBitmapInput);
55+
return RoaringBitmapUtils.serializeRoaringBitmap32(roaringBitmapAcc);
56+
} catch (IOException e) {
57+
throw new RuntimeException("Unable to se/deserialize roaring bitmap.", e);
58+
} finally {
59+
roaringBitmapAcc.clear();
60+
roaringBitmapInput.clear();
61+
}
62+
}
63+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.server.kv.rowmerger.aggregate.functions;
20+
21+
/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
22+
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
23+
* additional information regarding copyright ownership. */
24+
25+
import org.apache.fluss.types.DataType;
26+
import org.apache.fluss.utils.RoaringBitmapUtils;
27+
28+
import org.roaringbitmap.longlong.Roaring64Bitmap;
29+
30+
import java.io.IOException;
31+
32+
/** Roaring bitmap aggregator for serialized 64-bit bitmaps. */
33+
public class FieldRoaringBitmap64Agg extends FieldAggregator {
34+
35+
private static final long serialVersionUID = 1L;
36+
private final Roaring64Bitmap roaringBitmapAcc;
37+
private final Roaring64Bitmap roaringBitmapInput;
38+
39+
public FieldRoaringBitmap64Agg(DataType dataType) {
40+
super(dataType);
41+
this.roaringBitmapAcc = new Roaring64Bitmap();
42+
this.roaringBitmapInput = new Roaring64Bitmap();
43+
}
44+
45+
@Override
46+
public Object agg(Object accumulator, Object inputField) {
47+
if (accumulator == null || inputField == null) {
48+
return accumulator == null ? inputField : accumulator;
49+
}
50+
51+
try {
52+
RoaringBitmapUtils.deserializeRoaringBitmap64(roaringBitmapAcc, (byte[]) accumulator);
53+
RoaringBitmapUtils.deserializeRoaringBitmap64(roaringBitmapInput, (byte[]) inputField);
54+
roaringBitmapAcc.or(roaringBitmapInput);
55+
return RoaringBitmapUtils.serializeRoaringBitmap64(roaringBitmapAcc);
56+
} catch (IOException e) {
57+
throw new RuntimeException("Unable to se/deserialize roaring bitmap.", e);
58+
} finally {
59+
roaringBitmapAcc.clear();
60+
roaringBitmapInput.clear();
61+
}
62+
}
63+
}

0 commit comments

Comments
 (0)