113 changes: 113 additions & 0 deletions velox/common/memory/AllocationPool.cpp
@@ -76,6 +76,60 @@ char* AllocationPool::allocateFixed(uint64_t bytes, int32_t alignment) {
return result;
}

char* AllocationPool::allocateFixedTest(
int64_t bytes,
int32_t alignment,
uint64_t pageSize,
int32_t minPages,
int64_t hugePageThreshold,
int32_t hugePageNums,
bool enableHugePage) {
VELOX_CHECK_GT(bytes, 0, "Cannot allocate zero bytes");
if (freeAddressableBytes() >= bytes && alignment == 1) {
auto* result = startOfRun_ + currentOffset_;
maybeGrowLastAllocation(bytes);
return result;
}
VELOX_CHECK_EQ(
__builtin_popcount(alignment), 1, "Alignment can only be power of 2");

VELOX_CHECK(
__builtin_popcountll(pageSize) == 1 &&
pageSize >= AllocationTraits::kPageSize &&
pageSize <= AllocationTraits::kHugePageSize,
"Invalid page size: {}. Page size must be 2^n bytes between 4KB and 2MB",
pageSize);

auto numPages = bits::roundUp(bytes + alignment - 1, pageSize) / pageSize;

if (freeAddressableBytes() == 0) {
newRunImplWithPageSize(
numPages,
pageSize,
minPages,
hugePageThreshold,
hugePageNums,
enableHugePage);
} else {
auto alignedBytes = bytes + alignmentPadding(firstFreeInRun(), alignment);
if (freeAddressableBytes() < alignedBytes) {
newRunImplWithPageSize(
numPages,
pageSize,
minPages,
hugePageThreshold,
hugePageNums,
enableHugePage);
}
}
currentOffset_ += alignmentPadding(firstFreeInRun(), alignment);
VELOX_CHECK_LE(bytes + currentOffset_, bytesInRun_);
auto* result = startOfRun_ + currentOffset_;
VELOX_CHECK_EQ(reinterpret_cast<uintptr_t>(result) % alignment, 0);
maybeGrowLastAllocation(bytes);
return result;
}

void AllocationPool::maybeGrowLastAllocation(uint64_t bytesRequested) {
const auto updateOffset = currentOffset_ + bytesRequested;
if (updateOffset > endOfReservedRun()) {
@@ -140,6 +194,65 @@ void AllocationPool::newRunImpl(MachinePageCount numPages) {
usedBytes_ += bytesInRun_;
}

void AllocationPool::newRunImplWithPageSize(
memory::MachinePageCount numPages,
const uint64_t pageSize,
int32_t minPages,
int64_t hugePageThreshold,
int32_t hugePageNum,
bool enableHugePage) {
if (enableHugePage &&
(usedBytes_ >= hugePageThreshold ||
numPages > pool_->sizeClasses().back())) {
// At least 'hugePageNum' huge pages, no more than kMaxMmapBytes. Each new
// run doubles the previous one. Because the previous run ends up a hair
// under a power of two due to fractional pages at the ends of allocations,
// add an extra huge page size.
int64_t nextSize = std::min(
kMaxMmapBytes,
std::max<int64_t>(
hugePageNum * AllocationTraits::kHugePageSize,
bits::nextPowerOfTwo(
usedBytes_ + AllocationTraits::kHugePageSize)));
// Round 'numPages' up to a whole number of huge pages. Allocating this plus
// an extra huge page guarantees that 'numPages' worth of contiguous aligned
// huge pages will be found in the allocation.
numPages =
bits::roundUp(numPages * pageSize, AllocationTraits::kHugePageSize) /
pageSize;
if (numPages * pageSize + AllocationTraits::kHugePageSize > nextSize) {
// Extra large single request.
nextSize = numPages * pageSize + AllocationTraits::kHugePageSize;
}

ContiguousAllocation largeAlloc;
const MachinePageCount pagesToAlloc =
AllocationTraits::numPagesInHugePage();
pool_->allocateContiguous(
pagesToAlloc, largeAlloc, AllocationTraits::numPages(nextSize));

auto range = largeAlloc.hugePageRange().value();
startOfRun_ = range.data();
bytesInRun_ = range.size();
largeAllocations_.emplace_back(std::move(largeAlloc));
currentOffset_ = 0;
usedBytes_ += AllocationTraits::pageBytes(pagesToAlloc);
return;
}

Allocation allocation;
const auto roundedPages = std::max<int32_t>(minPages, numPages);
const auto standardPages =
(roundedPages * pageSize) / AllocationTraits::kPageSize;
pool_->allocateNonContiguous(standardPages, allocation, standardPages);
VELOX_CHECK_EQ(allocation.numRuns(), 1);
startOfRun_ = allocation.runAt(0).data<char>();
bytesInRun_ = allocation.runAt(0).numBytes();
currentOffset_ = 0;
allocations_.push_back(std::move(allocation));
usedBytes_ += bytesInRun_;
}

void AllocationPool::newRun(int64_t preferredSize) {
newRunImpl(AllocationTraits::numPages(preferredSize));
}
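For concreteness, here is a worked walk-through of the sizing arithmetic in allocateFixedTest() and newRunImplWithPageSize(), written as a hypothetical comment sketch. The request size, alignment, and usedBytes_ value are made up for illustration; it assumes 4 KB machine pages, 2 MB huge pages, and the default hugePageThreshold/hugePageNums of 256 KB and 16.

// Hypothetical walk-through of
// allocateFixedTest(10 << 10, 16, 4096, 16, 256 << 10, 16, true)
// once usedBytes_ is 300 KB (>= hugePageThreshold), assuming
// kHugePageSize = 2 MB and kMaxMmapBytes >= 32 MB:
//   numPages = roundUp(10 KB + 15, 4 KB) / 4 KB                    = 3
//   nextSize = min(kMaxMmapBytes,
//                  max(16 * 2 MB, nextPowerOfTwo(300 KB + 2 MB)))  = 32 MB
//   numPages = roundUp(3 * 4 KB, 2 MB) / 4 KB                      = 512
//   512 * 4 KB + 2 MB = 4 MB <= 32 MB, so nextSize stays 32 MB: the run
//   starts as one committed 2 MB huge-page range inside an up-to-32 MB
//   reservation that maybeGrowLastAllocation() can extend on demand.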
17 changes: 17 additions & 0 deletions velox/common/memory/AllocationPool.h
@@ -40,6 +40,15 @@ class AllocationPool {
// only be power of 2.
char* allocateFixed(uint64_t bytes, int32_t alignment = 1);

// Variant of allocateFixed() that lets the caller control the page size,
// the minimum run size and the huge page behavior of the backing runs.
// Experimental; used for hash table allocation tuning.
char* allocateFixedTest(
int64_t bytes,
int32_t alignment,
uint64_t pageSize,
int32_t minPages,
int64_t hugePageThreshold,
int32_t hugePageNums,
bool enableHugePage);

// Starts a new run for variable length allocation. The actual size
// is at least one machine page. Throws std::bad_alloc if no space.
void newRun(int64_t preferredSize);
@@ -137,6 +146,14 @@ class AllocationPool {

void newRunImpl(memory::MachinePageCount numPages);

void newRunImplWithPageSize(
memory::MachinePageCount numPages,
uint64_t pageSize,
int32_t minPages,
int64_t hugePageThreshold,
int32_t hugePageNum,
bool enableHugePage);

memory::MemoryPool* pool_;
std::vector<memory::Allocation> allocations_;
std::vector<memory::ContiguousAllocation> largeAllocations_;
39 changes: 39 additions & 0 deletions velox/core/QueryConfig.h
@@ -739,6 +739,25 @@ class QueryConfig {
/// estimates.
static constexpr const char* kRowSizeTrackingMode = "row_size_tracking_mode";

/// The memory allocation unit size for HashTable operations.
/// It controls the page size used when allocating memory for hash table rows.
/// The value must be 2^n bytes between 4KB and 2MB.
/// The default of 4KB matches the standard machine page size used
/// throughout Velox's memory management.
/// This is an experimental property for performance tuning.
static constexpr const char* kHashTablePageSize = "hashtable_page_size";

/// The minimum number of pages allocated per run when the hash table's
/// AllocationPool uses regular (non huge page) allocations.
/// This is an experimental property for performance tuning.
static constexpr const char* kHashTableMinPages = "hashtable_min_pages";

/// The number of bytes a hash table's AllocationPool must have handed out
/// before it switches to huge page backed runs.
/// This is an experimental property for performance tuning.
static constexpr const char* kHashTableHugePageThreshold =
"hashtable_huge_page_threshold";

/// The minimum number of huge pages reserved per huge page backed run.
/// This is an experimental property for performance tuning.
static constexpr const char* kHashTableHugePageNums =
"hashtable_huge_page_nums";

/// Whether huge page backed runs are used at all for hash table rows.
/// This is an experimental property for performance tuning.
static constexpr const char* kHashTableEnableHugePage =
"hashtable_enable_huge_page";

enum class RowSizeTrackingMode {
DISABLED = 0,
EXCLUDE_DELTA_SPLITS = 1,
@@ -1325,6 +1344,26 @@ class QueryConfig {
return get<int32_t>(kMaxNumSplitsListenedTo, 0);
}

uint64_t hashTablePageSize() const {
return get<uint64_t>(kHashTablePageSize, 4096);
}

int32_t hashTableMinPages() const {
return get<int32_t>(kHashTableMinPages, 16);
}

int32_t hashTableHugePageNums() const {
return get<int32_t>(kHashTableHugePageNums, 16);
}

int64_t hashTableHugePageThreshold() const {
return get<int64_t>(kHashTableHugePageThreshold, 256 << 10);
}

bool hashTableEnableHugePage() const {
return get<bool>(kHashTableEnableHugePage, true);
}

std::string source() const {
return get<std::string>(kSource, "");
}
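Below is a minimal sketch of how these experimental properties might be supplied when building a query context in a test. QueryCtx::create, the QueryConfig map constructor, and the executor argument are assumed from typical Velox test scaffolding; the values shown are simply the defaults returned by the getters above.

#include <string>
#include <unordered_map>

#include "velox/core/QueryCtx.h"

// Hypothetical helper; 'executor' is assumed to come from the test fixture.
std::shared_ptr<facebook::velox::core::QueryCtx> makeHashTablePagingQueryCtx(
    folly::Executor* executor) {
  using facebook::velox::core::QueryConfig;
  // Keys are the constants declared above; the values are the defaults.
  std::unordered_map<std::string, std::string> configData{
      {QueryConfig::kHashTablePageSize, "4096"},
      {QueryConfig::kHashTableMinPages, "16"},
      {QueryConfig::kHashTableHugePageThreshold, std::to_string(256 << 10)},
      {QueryConfig::kHashTableHugePageNums, "16"},
      {QueryConfig::kHashTableEnableHugePage, "true"}};
  return facebook::velox::core::QueryCtx::create(
      executor, QueryConfig(std::move(configData)));
}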
8 changes: 7 additions & 1 deletion velox/exec/HashBuild.cpp
@@ -402,8 +402,14 @@ void HashBuild::addInput(RowVectorPtr input) {
input->childAt(spillProbedFlagChannel_)->asFlatVector<bool>();
}

const auto& queryConfig = operatorCtx()->driverCtx()->queryConfig();
activeRows_.applyToSelected([&](auto rowIndex) {
char* newRow = rows->newRow();
char* newRow = rows->newRowTest(
queryConfig.hashTablePageSize(),
queryConfig.hashTableMinPages(),
queryConfig.hashTableHugePageThreshold(),
queryConfig.hashTableHugePageNums(),
queryConfig.hashTableEnableHugePage());
if (nextOffset) {
*reinterpret_cast<char**>(newRow + nextOffset) = nullptr;
}
8 changes: 7 additions & 1 deletion velox/exec/HashTable.cpp
@@ -332,7 +332,13 @@ char* HashTable<ignoreNullKeys>::insertEntry(
HashLookup& lookup,
uint64_t index,
vector_size_t row) {
char* group = rows_->newRow();
const auto& queryConfig = driverThreadContext()->driverCtx()->queryConfig();
char* group = rows_->newRowTest(
queryConfig.hashTablePageSize(),
queryConfig.hashTableMinPages(),
queryConfig.hashTableHugePageThreshold(),
queryConfig.hashTableHugePageNums(),
queryConfig.hashTableEnableHugePage());
lookup.hits[row] = group; // NOLINT
storeKeys(lookup, row);
storeRowPointer(index, lookup.hashes[row], group);
31 changes: 31 additions & 0 deletions velox/exec/RowContainer.cpp
@@ -283,6 +283,37 @@ char* RowContainer::newRow() {
return initializeRow(row, false /* reuse */);
}

char* RowContainer::newRowTest(
uint64_t pageSize,
int32_t minPages,
int64_t hugePageThreshold,
int32_t hugePageNums,
bool enableHugePage) {
VELOX_DCHECK(mutable_, "Can't add row into an immutable row container");
++numRows_;
char* row;
if (firstFreeRow_) {
row = firstFreeRow_;
VELOX_CHECK(bits::isBitSet(row, freeFlagOffset_));
firstFreeRow_ = nextFree(row);
--numFreeRows_;
} else {
row = rows_.allocateFixedTest(
fixedRowSize_ + normalizedKeySize_,
alignment_,
pageSize,
minPages,
hugePageThreshold,
hugePageNums,
enableHugePage) +
normalizedKeySize_;
if (normalizedKeySize_) {
++numRowsWithNormalizedKey_;
}
}
return initializeRow(row, false /* reuse */);
}

void RowContainer::setAllNull(char* row) {
VELOX_CHECK(!bits::isBitSet(row, freeFlagOffset_));
removeOrUpdateRowColumnStats(row, /*setToNull=*/true);
7 changes: 7 additions & 0 deletions velox/exec/RowContainer.h
@@ -318,6 +318,13 @@ class RowContainer {
/// Allocates a new row and initializes possible aggregates to null.
char* newRow();

/// Variant of newRow() that lets the caller control the page size and huge
/// page behavior of the underlying AllocationPool. Experimental; used for
/// hash table allocation tuning.
char* newRowTest(
uint64_t pageSize,
int32_t minPages = 16,
int64_t hugePageThreshold = 256 * 1024,
int32_t hugePageNums = 16,
bool enableHugePage = true);

uint32_t rowSize(const char* row) const {
return fixedRowSize_ +
(rowSizeOffset_