Conversation
There was a problem hiding this comment.
It would be better to use a new buffer for topRowOffsetsForNextLevelBuffer, as topRowOffsetsForCurrentLevel and topRowOffsetsForNextLevelBuffer need to be separate buffers if we're testing array vectors.
There was a problem hiding this comment.
@xin-zhang2 Thanks for your review. I removed the array and dictionary implementation from this PR and did some cleaning. We will revisit the array part next.
velox/vector/PartitionedVector.cpp
Outdated
There was a problem hiding this comment.
We might need to calculate lastTopRowOffset before partitionedArrayVector->partition, because arraySizes will be modified in the partioning, and arrayOffsets[numRows - 1] + arraySizes[numRows - 1] would no longer represent the correct lastTopRowOffset.
velox/vector/PartitionedVector.cpp
Outdated
There was a problem hiding this comment.
partitionOffsetsForNextLevelBuffer is used as endPartitionOffsetsBuffer, so partitionOffsetsForNextLevelBuffer and beginPartitionOffsetsBuffer might be in the wrong order here.
velox/vector/PartitionedVector.cpp
Outdated
There was a problem hiding this comment.
I think the condition should be topRow < numTopRows as we need to calucate the offset for the last row as well.
velox/vector/PartitionedVector.cpp
Outdated
There was a problem hiding this comment.
Not entirely sure, but it seems numRows should be use as lastTopRowOffset here, as we're partitioning the arraySizesBuffer.
1489aae to
bf73a19
Compare
This commit introduces `PartitionedVector` - a low-level execution abstraction that provides an in-place, partition-aware layout of a vector based on per-row partition IDs. 1. **In-place rearrangement**: Rearrange vector data in memory without creating multiple copies 2. **Buffer reuse**: Allow reuse of temporary buffers across multiple partitioning operations 3. **Minimal abstraction**: Similar to `DecodedVector`, focus on efficient execution rather than operator semantics 4. **Thread-unsafe by design**: Optimized for single-threaded execution contexts For more information please see IBM#1703
bf73a19 to
aded22e
Compare
czentgr
left a comment
There was a problem hiding this comment.
Looks pretty good. I suppose we need to see the subsequent changes to move them to the output buffer in the PartitionedOutput operator. Is this code available somewhere too?
| BufferPtr beginPartitionOffsets; | ||
|
|
||
| /// Optional reusable buffer for in-place row swapping. | ||
| BufferPtr swappingBuffer; |
There was a problem hiding this comment.
This needs initialization. Given it is optional I expect that this member won't always be set and you'd run into compiler errors on newer compilers.
| const std::vector<uint32_t>& partitions, | ||
| vector_size_t*& counts) { | ||
| for (auto i = 0; i < partitions.size(); i++) { | ||
| counts[partitions[i]]++; |
There was a problem hiding this comment.
numPartitions is vector_size_t but we have a vector with uint32_t values. Could we overflow here if we have a large number of partitions (unlikely but code won't know).
| void initializeBeginPartitionOffsets( | ||
| BufferPtr& beginPartitionOffsets, | ||
| const BufferPtr& endPartitionOffsets, | ||
| int32_t numPartitions, |
There was a problem hiding this comment.
vector_size_t?
In the test I also see unit32_t being used for numPartitions. But isn't that always the same and as such should have the same type?
The PartitionedVector uses numPartitions uint32_t as storage but sets it from a const uint32_t. Why not use vector_size_t?
There was a problem hiding this comment.
vector_size_t?
Sure.
The PartitionedVector uses numPartitions uint32_t as storage but sets it from a const uint32_t. Why not use vector_size_t?
In my understanding, this is a history paradox. The PartitionFunction interface defines the partitions in uint32_t
virtual std::optional<uint32_t> partition(
const RowVector& input,
std::vector<uint32_t>& partitions) = 0;
But Velox is trying to use vector_size_t as size units and vector index everywhere. vector_size_t is defined as int32_t today. But I've seen places it overflows and may need to be int64_t instead. I think the right way is to unify the partition representation with Vector index, and both shall use vector_size_t. vector_size_t can be easily expanded or overriden in the future by defining using vector_size_t = int64_t; . But for now, I changed all numPartitions to vector_size_t. Another option is to make them all uint32_t. @czentgr Which one do you think makes more sense?
| // TODO: This was copied from dwio::common::BufferUtil.h. However the vector | ||
| // module should not depend on dwio. Move this to a common place | ||
| template <typename T> | ||
| void ensureCapacity( |
There was a problem hiding this comment.
Should go into the cpp file and declared for use in the test.
Or as you suggest a new utility?
There was a problem hiding this comment.
It is a template function and the definition needs to be in the header file. IMHO it's not worthwhile to separate the declaration and definition for this very simple function. My original thought was to extract it from dwio::common and move it to common, but it's better to be done in a separate PR or commit. And to avoid future rebase conflicts I left it in this .h file because this file is new. But I just moved it to VectorUtil.h for now.
| BufferPtr swappingBuffer; | ||
|
|
||
| /// Optional starting row offset (used when partitioning a subset of rows). | ||
| vector_size_t firstRow{0}; |
There was a problem hiding this comment.
I suppose for future use? This and the other member? Currently they are not used at all.
There was a problem hiding this comment.
Yes they are for complex types. I can remove them now but that makes it's unjustifiable to have a PartitionBuildContext. Do you prefer removing them for now?
| std::memcpy( | ||
| &beginPartitionOffsets->asMutable<vector_size_t>()[1], | ||
| endPartitionOffsets->as<vector_size_t>(), | ||
| sizeof(uint32_t) * (numPartitions - 1)); |
There was a problem hiding this comment.
This should be sizeof(vector_size_t). In the next line it uses sizeof(vector_size_t). As mentioned before I don't know why we switch between the types. It could cause problems.
| } | ||
|
|
||
| /// Allow move constructor and move assignment operator. | ||
| PartitionedVector(PartitionedVector&& other) = default; |
There was a problem hiding this comment.
This code doesn‘t compile as the default move constructor is implicitly deleted because of const field numPartitions_ .
| PartitionBuildContext& ctx, | ||
| velox::memory::MemoryPool* pool); | ||
|
|
||
| PartitionedVector( |
There was a problem hiding this comment.
We can remove this constructor as it is unnecessary and not used.
This commit is the first PR for optimized PartitionedOutput. It introduces the PartitionedVector, in which the values are partitioned according to a given partitionId list. It uses in place swapping algorithm and has very high throughput. It can also be used in aggregation, sorting, etc.