Skip to content

Commit cf85da1

Browse files
luffy-zhshouzhiwgtmac
committed
ORC-2022: [C++] Add support to use dictionary for IN expression
### What changes were proposed in this pull request?
Use column dictionaries to evaluate IN predicates.

### Why are the changes needed?
Optimize IN predicate pruning: consult column dictionary (when reasonably sized) instead of relying on less effective min/max statistics.

### How was this patch tested?
Unit tests in TestPredicatePushdown.cc verify this change.

### Was this patch authored or co-authored using generative AI tooling?
NO.

Closes #2453 from luffy-zh/ORC-2022.

Lead-authored-by: luffy-zh <[email protected]>
Co-authored-by: shouzhi <[email protected]>
Co-authored-by: Hao Zou <[email protected]>
Co-authored-by: Gang Wu <[email protected]>
Signed-off-by: Gang Wu <[email protected]>
1 parent e42878c commit cf85da1

19 files changed

+641
-63
lines changed

c++/include/orc/Reader.hh

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,21 @@ namespace orc {
408408
* Get the number of stripes to look ahead for small stripe prefetch.
409409
*/
410410
uint64_t getSmallStripeLookAheadLimit() const;
411+
412+
/**
413+
* Set the maximum dictionary size threshold for evaluation.
414+
*
415+
* Dictionaries with more entries than this threshold will not be evaluated.
416+
* Set the threshold to 0 to disable dictionary filtering.
417+
*
418+
* Defaults to 0.
419+
*/
420+
RowReaderOptions& setDictionaryFilteringSizeThreshold(uint32_t threshold);
421+
422+
/**
423+
* Get the dictionary filtering size threshold.
424+
*/
425+
uint32_t getDictionaryFilteringSizeThreshold() const;
411426
};
412427

413428
class RowReader;

c++/include/orc/sargs/Literal.hh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
#include "orc/Int128.hh"
2323
#include "orc/Vector.hh"
2424

25+
#include <string_view>
26+
2527
namespace orc {
2628

2729
/**
@@ -123,6 +125,7 @@ namespace orc {
123125
Timestamp getTimestamp() const;
124126
double getFloat() const;
125127
std::string getString() const;
128+
std::string_view getStringView() const;
126129
bool getBool() const;
127130
Decimal getDecimal() const;
128131

c++/src/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ set(SOURCE_FILES
169169
ConvertColumnReader.cc
170170
CpuInfoUtil.cc
171171
Dictionary.cc
172+
DictionaryLoader.cc
172173
Exceptions.cc
173174
Geospatial.cc
174175
Int128.cc

c++/src/ColumnReader.cc

Lines changed: 27 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,18 @@
1616
* limitations under the License.
1717
*/
1818

19-
#include "orc/Int128.hh"
19+
#include "ColumnReader.hh"
20+
21+
#include <cmath>
2022

2123
#include "Adaptor.hh"
2224
#include "ByteRLE.hh"
23-
#include "ColumnReader.hh"
2425
#include "ConvertColumnReader.hh"
26+
#include "DictionaryLoader.hh"
2527
#include "RLE.hh"
2628
#include "SchemaEvolution.hh"
2729
#include "orc/Exceptions.hh"
30+
#include "orc/Int128.hh"
2831

2932
#include <math.h>
3033
#include <iostream>
@@ -36,19 +39,6 @@ namespace orc {
3639
// PASS
3740
}
3841

39-
inline RleVersion convertRleVersion(proto::ColumnEncoding_Kind kind) {
40-
switch (static_cast<int64_t>(kind)) {
41-
case proto::ColumnEncoding_Kind_DIRECT:
42-
case proto::ColumnEncoding_Kind_DICTIONARY:
43-
return RleVersion_1;
44-
case proto::ColumnEncoding_Kind_DIRECT_V2:
45-
case proto::ColumnEncoding_Kind_DICTIONARY_V2:
46-
return RleVersion_2;
47-
default:
48-
throw ParseError("Unknown encoding in convertRleVersion");
49-
}
50-
}
51-
5242
ColumnReader::ColumnReader(const Type& type, StripeStreams& stripe)
5343
: columnId(type.getColumnId()),
5444
memoryPool(stripe.getMemoryPool()),
@@ -519,7 +509,10 @@ namespace orc {
519509
std::unique_ptr<RleDecoder> rle_;
520510

521511
public:
522-
StringDictionaryColumnReader(const Type& type, StripeStreams& stipe);
512+
StringDictionaryColumnReader(const Type& type, StripeStreams& stripe);
513+
514+
StringDictionaryColumnReader(const Type& type, StripeStreams& stripe,
515+
const std::shared_ptr<StringDictionary> dictionary);
523516
~StringDictionaryColumnReader() override;
524517

525518
uint64_t skip(uint64_t numValues) override;
@@ -533,39 +526,23 @@ namespace orc {
533526

534527
StringDictionaryColumnReader::StringDictionaryColumnReader(const Type& type,
535528
StripeStreams& stripe)
536-
: ColumnReader(type, stripe), dictionary_(new StringDictionary(stripe.getMemoryPool())) {
529+
: StringDictionaryColumnReader(type, stripe, nullptr) {}
530+
531+
StringDictionaryColumnReader::StringDictionaryColumnReader(
532+
const Type& type, StripeStreams& stripe, const std::shared_ptr<StringDictionary> dictionary)
533+
: ColumnReader(type, stripe), dictionary_(dictionary) {
537534
RleVersion rleVersion = convertRleVersion(stripe.getEncoding(columnId).kind());
538-
uint32_t dictSize = stripe.getEncoding(columnId).dictionary_size();
539535
std::unique_ptr<SeekableInputStream> stream =
540536
stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
541537
if (stream == nullptr) {
542538
throw ParseError("DATA stream not found in StringDictionaryColumn");
543539
}
544540
rle_ = createRleDecoder(std::move(stream), false, rleVersion, memoryPool, metrics);
545-
stream = stripe.getStream(columnId, proto::Stream_Kind_LENGTH, false);
546-
if (dictSize > 0 && stream == nullptr) {
547-
throw ParseError("LENGTH stream not found in StringDictionaryColumn");
548-
}
549-
std::unique_ptr<RleDecoder> lengthDecoder =
550-
createRleDecoder(std::move(stream), false, rleVersion, memoryPool, metrics);
551-
dictionary_->dictionaryOffset.resize(dictSize + 1);
552-
int64_t* lengthArray = dictionary_->dictionaryOffset.data();
553-
lengthDecoder->next(lengthArray + 1, dictSize, nullptr);
554-
lengthArray[0] = 0;
555-
for (uint32_t i = 1; i < dictSize + 1; ++i) {
556-
if (lengthArray[i] < 0) {
557-
throw ParseError("Negative dictionary entry length");
558-
}
559-
lengthArray[i] += lengthArray[i - 1];
560-
}
561-
int64_t blobSize = lengthArray[dictSize];
562-
dictionary_->dictionaryBlob.resize(static_cast<uint64_t>(blobSize));
563-
std::unique_ptr<SeekableInputStream> blobStream =
564-
stripe.getStream(columnId, proto::Stream_Kind_DICTIONARY_DATA, false);
565-
if (blobSize > 0 && blobStream == nullptr) {
566-
throw ParseError("DICTIONARY_DATA stream not found in StringDictionaryColumn");
541+
542+
// If no dictionary was provided, load it
543+
if (!dictionary_) {
544+
dictionary_ = loadStringDictionary(columnId, stripe, memoryPool);
567545
}
568-
readFully(dictionary_->dictionaryBlob.data(), blobSize, blobStream.get());
569546
}
570547

571548
StringDictionaryColumnReader::~StringDictionaryColumnReader() {
@@ -1717,8 +1694,15 @@ namespace orc {
17171694
case GEOGRAPHY:
17181695
switch (static_cast<int64_t>(stripe.getEncoding(type.getColumnId()).kind())) {
17191696
case proto::ColumnEncoding_Kind_DICTIONARY:
1720-
case proto::ColumnEncoding_Kind_DICTIONARY_V2:
1721-
return std::make_unique<StringDictionaryColumnReader>(type, stripe);
1697+
case proto::ColumnEncoding_Kind_DICTIONARY_V2: {
1698+
// Check if we have a pre-loaded dictionary we can use
1699+
auto dictionary = stripe.getSharedDictionary(type.getColumnId());
1700+
if (dictionary) {
1701+
return std::make_unique<StringDictionaryColumnReader>(type, stripe, dictionary);
1702+
} else {
1703+
return std::unique_ptr<ColumnReader>(new StringDictionaryColumnReader(type, stripe));
1704+
}
1705+
}
17221706
case proto::ColumnEncoding_Kind_DIRECT:
17231707
case proto::ColumnEncoding_Kind_DIRECT_V2:
17241708
return std::make_unique<StringDirectColumnReader>(type, stripe);

c++/src/ColumnReader.hh

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,13 @@ namespace orc {
9797
* @return the number of scale digits
9898
*/
9999
virtual int32_t getForcedScaleOnHive11Decimal() const = 0;
100+
101+
/**
102+
* Get a shared dictionary for the given column if available.
103+
* @param columnId the id of the column
104+
* @return shared pointer to the StringDictionary or nullptr if not available
105+
*/
106+
virtual std::shared_ptr<StringDictionary> getSharedDictionary(uint64_t columnId) const = 0;
100107

101108
/**
102109
* Whether decimals that have precision <=18 are encoded as fixed scale and values

c++/src/DictionaryLoader.cc

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
#include "DictionaryLoader.hh"
20+
#include "RLE.hh"
21+
22+
namespace orc {
23+
24+
namespace {
25+
26+
// Helper function to read data fully from a stream
27+
void readFully(char* buffer, int64_t bufferSize, SeekableInputStream* stream) {
28+
int64_t posn = 0;
29+
while (posn < bufferSize) {
30+
const void* chunk;
31+
int length;
32+
if (!stream->Next(&chunk, &length)) {
33+
throw ParseError("bad read in readFully");
34+
}
35+
if (posn + length > bufferSize) {
36+
throw ParseError("Corrupt dictionary blob");
37+
}
38+
memcpy(buffer + posn, chunk, static_cast<size_t>(length));
39+
posn += length;
40+
}
41+
}
42+
43+
} // namespace
44+
45+
std::shared_ptr<StringDictionary> loadStringDictionary(uint64_t columnId, StripeStreams& stripe,
46+
MemoryPool& pool) {
47+
// Get encoding information
48+
proto::ColumnEncoding encoding = stripe.getEncoding(columnId);
49+
RleVersion rleVersion = convertRleVersion(encoding.kind());
50+
uint32_t dictSize = encoding.dictionary_size();
51+
52+
// Create the dictionary object
53+
auto dictionary = std::make_shared<StringDictionary>(pool);
54+
55+
// Read LENGTH stream to get dictionary entry lengths
56+
std::unique_ptr<SeekableInputStream> stream =
57+
stripe.getStream(columnId, proto::Stream_Kind_LENGTH, false);
58+
if (dictSize > 0 && stream == nullptr) {
59+
std::stringstream ss;
60+
ss << "LENGTH stream not found in StringDictionaryColumn for column " << columnId;
61+
throw ParseError(ss.str());
62+
}
63+
std::unique_ptr<RleDecoder> lengthDecoder =
64+
createRleDecoder(std::move(stream), false, rleVersion, pool, stripe.getReaderMetrics());
65+
66+
// Decode dictionary entry lengths
67+
dictionary->dictionaryOffset.resize(dictSize + 1);
68+
int64_t* lengthArray = dictionary->dictionaryOffset.data();
69+
lengthDecoder->next(lengthArray + 1, dictSize, nullptr);
70+
lengthArray[0] = 0;
71+
72+
// Convert lengths to cumulative offsets
73+
for (uint32_t i = 1; i < dictSize + 1; ++i) {
74+
if (lengthArray[i] < 0) {
75+
std::stringstream ss;
76+
ss << "Negative dictionary entry length for column " << columnId;
77+
throw ParseError(ss.str());
78+
}
79+
lengthArray[i] += lengthArray[i - 1];
80+
}
81+
82+
int64_t blobSize = lengthArray[dictSize];
83+
84+
// Read DICTIONARY_DATA stream to get dictionary content
85+
dictionary->dictionaryBlob.resize(static_cast<uint64_t>(blobSize));
86+
std::unique_ptr<SeekableInputStream> blobStream =
87+
stripe.getStream(columnId, proto::Stream_Kind_DICTIONARY_DATA, false);
88+
if (blobSize > 0 && blobStream == nullptr) {
89+
std::stringstream ss;
90+
ss << "DICTIONARY_DATA stream not found in StringDictionaryColumn for column " << columnId;
91+
throw ParseError(ss.str());
92+
}
93+
94+
// Read the dictionary blob
95+
readFully(dictionary->dictionaryBlob.data(), blobSize, blobStream.get());
96+
97+
return dictionary;
98+
}
99+
100+
} // namespace orc

c++/src/DictionaryLoader.hh

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
#ifndef ORC_DICTIONARY_LOADER_HH
20+
#define ORC_DICTIONARY_LOADER_HH
21+
22+
#include "ColumnReader.hh"
23+
#include "orc/Vector.hh"
24+
25+
namespace orc {
26+
27+
/**
28+
* Load a string dictionary for a single column from a stripe.
29+
* This function reads the LENGTH and DICTIONARY_DATA streams and populates
30+
* the StringDictionary structure. It automatically uses ReadCache if available
31+
* through the StripeStreams interface.
32+
*
33+
* @param columnId the column ID to load the dictionary for
34+
* @param stripe the StripeStreams interface providing access to streams
35+
* @param pool the memory pool to use for allocating the dictionary
36+
* @return a shared pointer to the loaded StringDictionary (never nullptr); a ParseError is thrown if a required stream is missing or the dictionary data is invalid
37+
*/
38+
std::shared_ptr<StringDictionary> loadStringDictionary(uint64_t columnId, StripeStreams& stripe,
39+
MemoryPool& pool);
40+
41+
// Helper function to convert encoding kind to RLE version
42+
inline RleVersion convertRleVersion(proto::ColumnEncoding_Kind kind) {
43+
switch (static_cast<int64_t>(kind)) {
44+
case proto::ColumnEncoding_Kind_DIRECT:
45+
case proto::ColumnEncoding_Kind_DICTIONARY:
46+
return RleVersion_1;
47+
case proto::ColumnEncoding_Kind_DIRECT_V2:
48+
case proto::ColumnEncoding_Kind_DICTIONARY_V2:
49+
return RleVersion_2;
50+
default:
51+
throw ParseError("Unknown encoding in convertRleVersion");
52+
}
53+
}
54+
55+
} // namespace orc
56+
57+
#endif

c++/src/Options.hh

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
#include "io/Cache.hh"
2727

28+
#include <cstdint>
2829
#include <iostream>
2930
#include <limits>
3031

@@ -156,6 +157,7 @@ namespace orc {
156157
bool throwOnSchemaEvolutionOverflow;
157158
bool enableAsyncPrefetch;
158159
uint64_t smallStripeLookAheadLimit;
160+
uint32_t dictionaryFilteringSizeThreshold;
159161

160162
RowReaderOptionsPrivate() {
161163
selection = ColumnSelection_NONE;
@@ -169,6 +171,7 @@ namespace orc {
169171
throwOnSchemaEvolutionOverflow = false;
170172
enableAsyncPrefetch = false;
171173
smallStripeLookAheadLimit = 8;
174+
dictionaryFilteringSizeThreshold = 0;
172175
}
173176
};
174177

@@ -362,6 +365,15 @@ namespace orc {
362365
return privateBits_->smallStripeLookAheadLimit;
363366
}
364367

368+
RowReaderOptions& RowReaderOptions::setDictionaryFilteringSizeThreshold(uint32_t threshold) {
369+
privateBits_->dictionaryFilteringSizeThreshold = threshold;
370+
return *this;
371+
}
372+
373+
uint32_t RowReaderOptions::getDictionaryFilteringSizeThreshold() const {
374+
return privateBits_->dictionaryFilteringSizeThreshold;
375+
}
376+
365377
} // namespace orc
366378

367379
#endif

0 commit comments

Comments
 (0)