Skip to content

Introducing PartitionedVector#1596

Open
yingsu00 wants to merge 1 commit intoIBM:oss-mainfrom
yingsu00:PartitionedOutput1.0
Open

Introducing PartitionedVector#1596
yingsu00 wants to merge 1 commit intoIBM:oss-mainfrom
yingsu00:PartitionedOutput1.0

Conversation

@yingsu00
Copy link
Collaborator

This commit is the first PR for optimized PartitionedOutput. It introduces the PartitionedVector, in which the values are partitioned according to a given partitionId list. It uses in place swapping algorithm and has very high throughput. It can also be used in aggregation, sorting, etc.

@yingsu00 yingsu00 requested a review from xin-zhang2 January 14, 2026 08:06
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to use a new buffer for topRowOffsetsForNextLevelBuffer, as topRowOffsetsForCurrentLevel and topRowOffsetsForNextLevelBuffer need to be separate buffers if we're testing array vectors.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xin-zhang2 Thanks for your review. I removed the array and dictionary implementation from this PR and did some cleaning. We will revisit the array part next.

Copy link
Member

@xin-zhang2 xin-zhang2 Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might need to calculate lastTopRowOffset before partitionedArrayVector->partition, because arraySizes will be modified in the partioning, and arrayOffsets[numRows - 1] + arraySizes[numRows - 1] would no longer represent the correct lastTopRowOffset.

Copy link
Member

@xin-zhang2 xin-zhang2 Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partitionOffsetsForNextLevelBuffer is used as endPartitionOffsetsBuffer, so partitionOffsetsForNextLevelBuffer and beginPartitionOffsetsBuffer might be in the wrong order here.

Copy link
Member

@xin-zhang2 xin-zhang2 Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the condition should be topRow < numTopRows as we need to calucate the offset for the last row as well.

Copy link
Member

@xin-zhang2 xin-zhang2 Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not entirely sure, but it seems numRows should be use as lastTopRowOffset here, as we're partitioning the arraySizesBuffer.

@yingsu00 yingsu00 force-pushed the PartitionedOutput1.0 branch 2 times, most recently from 1489aae to bf73a19 Compare February 13, 2026 05:38
This commit introduces `PartitionedVector` - a low-level execution
abstraction that provides an in-place, partition-aware layout of a
vector based on per-row partition IDs.

1. **In-place rearrangement**: Rearrange vector data in memory without
   creating multiple copies
2. **Buffer reuse**: Allow reuse of temporary buffers across multiple
   partitioning operations
3. **Minimal abstraction**: Similar to `DecodedVector`, focus on
   efficient execution rather than operator semantics
4. **Thread-unsafe by design**: Optimized for single-threaded execution
   contexts

For more information please see IBM#1703
@yingsu00 yingsu00 force-pushed the PartitionedOutput1.0 branch from bf73a19 to aded22e Compare February 13, 2026 05:42
@yingsu00 yingsu00 marked this pull request as ready for review February 13, 2026 05:47
@yingsu00 yingsu00 removed the request for review from majetideepak February 13, 2026 05:47
Copy link
Member

@czentgr czentgr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks pretty good. I suppose we need to see the subsequent changes to move them to the output buffer in the PartitionedOutput operator. Is this code available somewhere too?

BufferPtr beginPartitionOffsets;

/// Optional reusable buffer for in-place row swapping.
BufferPtr swappingBuffer;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs initialization. Given it is optional I expect that this member won't always be set and you'd run into compiler errors on newer compilers.

const std::vector<uint32_t>& partitions,
vector_size_t*& counts) {
for (auto i = 0; i < partitions.size(); i++) {
counts[partitions[i]]++;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

numPartitions is vector_size_t but we have a vector with uint32_t values. Could we overflow here if we have a large number of partitions (unlikely but code won't know).

void initializeBeginPartitionOffsets(
BufferPtr& beginPartitionOffsets,
const BufferPtr& endPartitionOffsets,
int32_t numPartitions,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

vector_size_t?
In the test I also see unit32_t being used for numPartitions. But isn't that always the same and as such should have the same type?

The PartitionedVector uses numPartitions uint32_t as storage but sets it from a const uint32_t. Why not use vector_size_t?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

vector_size_t?

Sure.

The PartitionedVector uses numPartitions uint32_t as storage but sets it from a const uint32_t. Why not use vector_size_t?
In my understanding, this is a history paradox. The PartitionFunction interface defines the partitions in uint32_t

  virtual std::optional<uint32_t> partition(
      const RowVector& input,
      std::vector<uint32_t>& partitions) = 0;

But Velox is trying to use vector_size_t as size units and vector index everywhere. vector_size_t is defined as int32_t today. But I've seen places it overflows and may need to be int64_t instead. I think the right way is to unify the partition representation with Vector index, and both shall use vector_size_t. vector_size_t can be easily expanded or overriden in the future by defining using vector_size_t = int64_t; . But for now, I changed all numPartitions to vector_size_t. Another option is to make them all uint32_t. @czentgr Which one do you think makes more sense?

// TODO: This was copied from dwio::common::BufferUtil.h. However the vector
// module should not depend on dwio. Move this to a common place
template <typename T>
void ensureCapacity(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should go into the cpp file and declared for use in the test.
Or as you suggest a new utility?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a template function and the definition needs to be in the header file. IMHO it's not worthwhile to separate the declaration and definition for this very simple function. My original thought was to extract it from dwio::common and move it to common, but it's better to be done in a separate PR or commit. And to avoid future rebase conflicts I left it in this .h file because this file is new. But I just moved it to VectorUtil.h for now.

BufferPtr swappingBuffer;

/// Optional starting row offset (used when partitioning a subset of rows).
vector_size_t firstRow{0};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose for future use? This and the other member? Currently they are not used at all.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes they are for complex types. I can remove them now but that makes it's unjustifiable to have a PartitionBuildContext. Do you prefer removing them for now?

std::memcpy(
&beginPartitionOffsets->asMutable<vector_size_t>()[1],
endPartitionOffsets->as<vector_size_t>(),
sizeof(uint32_t) * (numPartitions - 1));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be sizeof(vector_size_t). In the next line it uses sizeof(vector_size_t). As mentioned before I don't know why we switch between the types. It could cause problems.

}

/// Allow move constructor and move assignment operator.
PartitionedVector(PartitionedVector&& other) = default;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code doesn‘t compile as the default move constructor is implicitly deleted because of const field numPartitions_ .

PartitionBuildContext& ctx,
velox::memory::MemoryPool* pool);

PartitionedVector(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this constructor as it is unnecessary and not used.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants

Comments