@@ -651,4 +651,13 @@ public TableScanParams getScanParams() {
         }
         return this.scanParams;
     }
+
+    protected long applyMaxFileSplitNumLimit(long targetSplitSize, long totalFileSize) {
+        int maxFileSplitNum = sessionVariable.getMaxFileSplitNum();
+        if (maxFileSplitNum <= 0 || totalFileSize <= 0) {
+            return targetSplitSize;
+        }
+        long minSplitSizeForMaxNum = (totalFileSize + maxFileSplitNum - 1L) / (long) maxFileSplitNum;
+        return Math.max(targetSplitSize, minSplitSizeForMaxNum);
+    }
 }
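The new helper keeps the number of splits for a scan at or below max_file_split_num by raising the target split size when necessary: the ceiling division computes the smallest split size that still fits totalFileSize into maxFileSplitNum pieces, and the larger of that and the caller's target wins. A standalone sketch of the same arithmetic, using the figures from the unit tests further down (10,000 MB of files, a cap of 100 splits, a 32 MB initial target; variable names are local to this example):

    // Sketch only: mirrors applyMaxFileSplitNumLimit with concrete numbers.
    long mb = 1024L * 1024L;
    long totalFileSize = 10_000L * mb;   // 10,000 MB of scannable data
    long targetSplitSize = 32 * mb;      // would yield roughly 313 splits at the 32 MB target
    int maxFileSplitNum = 100;           // cap from the test, not the 100000 default
    // Ceiling division: smallest split size that keeps the split count within the cap.
    long minSplitSizeForMaxNum = (totalFileSize + maxFileSplitNum - 1L) / (long) maxFileSplitNum;  // 100 MB
    long result = Math.max(targetSplitSize, minSplitSizeForMaxNum);  // 100 MB, so at most 100 splits

When the cap is disabled (maxFileSplitNum <= 0) or the total size is unknown (<= 0), the helper returns the original target unchanged.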
@@ -358,18 +358,22 @@ private long determineTargetFileSplitSize(List<FileCacheValue> fileCaches,
         }
         long result = sessionVariable.getMaxInitialSplitSize();
         long totalFileSize = 0;
+        boolean exceedInitialThreshold = false;
         for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
             if (fileCacheValue.getFiles() == null) {
                 continue;
             }
             for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
                 totalFileSize += status.getLength();
-                if (totalFileSize >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
-                    result = sessionVariable.getMaxSplitSize();
-                    break;
+                if (!exceedInitialThreshold
+                        && totalFileSize >= sessionVariable.getMaxSplitSize()
+                                * sessionVariable.getMaxInitialSplitNum()) {
+                    exceedInitialThreshold = true;
                 }
             }
         }
+        result = exceedInitialThreshold ? sessionVariable.getMaxSplitSize() : result;
+        result = applyMaxFileSplitNumLimit(result, totalFileSize);
         return result;
     }
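Each of the determineTargetFileSplitSize variants touched below follows the same shape as this Hive change: the early break is gone, so the loop always accumulates the full total file size; a flag records whether the initial-split threshold was crossed; and only after the loop does the method pick between the initial and the maximum split size and hand the true total to applyMaxFileSplitNumLimit. A condensed sketch of that shared pattern (the fileSizes parameter is a stand-in for the concrete Hive/Iceberg/Paimon/broker file collections):

    // Sketch only: the accumulate-then-decide pattern used by each scan node variant.
    private long determineTargetSplitSize(List<Long> fileSizes) {
        long result = sessionVariable.getMaxInitialSplitSize();
        long totalFileSize = 0;
        boolean exceedInitialThreshold = false;
        for (long size : fileSizes) {
            totalFileSize += size;                       // always walk every file; no early break
            if (!exceedInitialThreshold
                    && totalFileSize >= sessionVariable.getMaxSplitSize()
                            * sessionVariable.getMaxInitialSplitNum()) {
                exceedInitialThreshold = true;           // remember the threshold, keep accumulating
            }
        }
        result = exceedInitialThreshold ? sessionVariable.getMaxSplitSize() : result;
        return applyMaxFileSplitNumLimit(result, totalFileSize);  // cap the split count last
    }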

@@ -635,4 +639,3 @@ protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
     }
 }
 
-
@@ -476,14 +476,18 @@ private CloseableIterable<FileScanTask> splitFiles(TableScan scan) {
     private long determineTargetFileSplitSize(Iterable<FileScanTask> tasks) {
         long result = sessionVariable.getMaxInitialSplitSize();
         long accumulatedTotalFileSize = 0;
+        boolean exceedInitialThreshold = false;
         for (FileScanTask task : tasks) {
            accumulatedTotalFileSize += ScanTaskUtil.contentSizeInBytes(task.file());
-            if (accumulatedTotalFileSize
+            if (!exceedInitialThreshold && accumulatedTotalFileSize
                     >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
-                result = sessionVariable.getMaxSplitSize();
-                break;
+                exceedInitialThreshold = true;
             }
         }
+        result = exceedInitialThreshold ? sessionVariable.getMaxSplitSize() : result;
+        if (!isBatchMode()) {
+            result = applyMaxFileSplitNumLimit(result, accumulatedTotalFileSize);
+        }
         return result;
     }

@@ -436,20 +436,22 @@ private long determineTargetFileSplitSize(List<DataSplit> dataSplits,
         }
         long result = sessionVariable.getMaxInitialSplitSize();
         long totalFileSize = 0;
+        boolean exceedInitialThreshold = false;
         for (DataSplit dataSplit : dataSplits) {
             Optional<List<RawFile>> rawFiles = dataSplit.convertToRawFiles();
             if (!supportNativeReader(rawFiles)) {
                 continue;
             }
             for (RawFile rawFile : rawFiles.get()) {
                 totalFileSize += rawFile.fileSize();
-                if (totalFileSize
+                if (!exceedInitialThreshold && totalFileSize
                         >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
-                    result = sessionVariable.getMaxSplitSize();
-                    break;
+                    exceedInitialThreshold = true;
                 }
             }
         }
+        result = exceedInitialThreshold ? sessionVariable.getMaxSplitSize() : result;
+        result = applyMaxFileSplitNumLimit(result, totalFileSize);
         return result;
     }

@@ -174,14 +174,16 @@ private long determineTargetFileSplitSize(List<TBrokerFileStatus> fileStatuses)
         }
         long result = sessionVariable.getMaxInitialSplitSize();
         long totalFileSize = 0;
+        boolean exceedInitialThreshold = false;
         for (TBrokerFileStatus fileStatus : fileStatuses) {
             totalFileSize += fileStatus.getSize();
-            if (totalFileSize
-                    >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
-                result = sessionVariable.getMaxSplitSize();
-                break;
+            if (!exceedInitialThreshold
+                    && totalFileSize >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
+                exceedInitialThreshold = true;
             }
         }
+        result = exceedInitialThreshold ? sessionVariable.getMaxSplitSize() : result;
+        result = applyMaxFileSplitNumLimit(result, totalFileSize);
         return result;
     }

@@ -522,6 +522,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String MAX_INITIAL_FILE_SPLIT_NUM = "max_initial_file_split_num";
 
+    public static final String MAX_FILE_SPLIT_NUM = "max_file_split_num";
+
     // Target file size in bytes for Iceberg write operations
     public static final String ICEBERG_WRITE_TARGET_FILE_SIZE_BYTES = "iceberg_write_target_file_size_bytes";

@@ -2225,6 +2227,13 @@ public boolean isEnableHboNonStrictMatchingMode() {
             needForward = true)
     public int maxInitialSplitNum = 200;
 
+    @VariableMgr.VarAttr(
+            name = MAX_FILE_SPLIT_NUM,
+            description = {"在非 batch 模式下,每个 table scan 最大允许的 split 数量,防止产生过多 split 导致 OOM。",
+                    "In non-batch mode, the maximum number of splits allowed per table scan to avoid OOM."},
+            needForward = true)
+    public int maxFileSplitNum = 100000;
+
     // Target file size for Iceberg write operations
     // Default 0 means use config::iceberg_sink_max_file_size
     @VariableMgr.VarAttr(name = ICEBERG_WRITE_TARGET_FILE_SIZE_BYTES, needForward = true)
@@ -4308,6 +4317,14 @@ public void setMaxInitialSplitNum(int maxInitialSplitNum) {
         this.maxInitialSplitNum = maxInitialSplitNum;
     }
 
+    public int getMaxFileSplitNum() {
+        return maxFileSplitNum;
+    }
+
+    public void setMaxFileSplitNum(int maxFileSplitNum) {
+        this.maxFileSplitNum = maxFileSplitNum;
+    }
+
     public long getIcebergWriteTargetFileSizeBytes() {
         return icebergWriteTargetFileSizeBytes;
     }
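The getter/setter pair above is how the scan nodes read the new cap. A minimal sketch of adjusting it programmatically (the 200000 value is only an illustration):

    SessionVariable sv = new SessionVariable();
    sv.setMaxFileSplitNum(200000);       // default is 100000 splits per table scan
    int cap = sv.getMaxFileSplitNum();   // consumed by applyMaxFileSplitNumLimit

Because the variable is declared with needForward = true, it should also be adjustable per session through the usual SET max_file_split_num = ... path; that is the standard session-variable mechanism rather than something shown in this diff.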
@@ -0,0 +1,92 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource;

import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TFileFormatType;

import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class FileQueryScanNodeTest {
    private static final long MB = 1024L * 1024L;

    private static class TestFileQueryScanNode extends FileQueryScanNode {
        TestFileQueryScanNode(SessionVariable sv) {
            super(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)), "test",
                    StatisticalType.TEST_EXTERNAL_TABLE, false, sv);
        }

        @Override
        protected TFileFormatType getFileFormatType() throws UserException {
            return TFileFormatType.FORMAT_ORC;
        }

        @Override
        protected List<String> getPathPartitionKeys() throws UserException {
            return Collections.emptyList();
        }

        @Override
        protected TableIf getTargetTable() throws UserException {
            return null;
        }

        @Override
        protected Map<String, String> getLocationProperties() throws UserException {
            return Collections.emptyMap();
        }
    }

    @Test
    public void testApplyMaxFileSplitNumLimitRaisesTargetSize() {
        SessionVariable sv = new SessionVariable();
        sv.setMaxFileSplitNum(100);
        TestFileQueryScanNode node = new TestFileQueryScanNode(sv);
        long target = node.applyMaxFileSplitNumLimit(32 * MB, 10_000L * MB);
        Assert.assertEquals(100 * MB, target);
    }

    @Test
    public void testApplyMaxFileSplitNumLimitKeepsTargetSizeWhenSmall() {
        SessionVariable sv = new SessionVariable();
        sv.setMaxFileSplitNum(100);
        TestFileQueryScanNode node = new TestFileQueryScanNode(sv);
        long target = node.applyMaxFileSplitNumLimit(32 * MB, 500L * MB);
        Assert.assertEquals(32 * MB, target);
    }

    @Test
    public void testApplyMaxFileSplitNumLimitDisabled() {
        SessionVariable sv = new SessionVariable();
        sv.setMaxFileSplitNum(0);
        TestFileQueryScanNode node = new TestFileQueryScanNode(sv);
        long target = node.applyMaxFileSplitNumLimit(32 * MB, 10_000L * MB);
        Assert.assertEquals(32 * MB, target);
    }
}
@@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.hive.source;

import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.SessionVariable;

import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;

public class HiveScanNodeTest {
    private static final long MB = 1024L * 1024L;

    @Test
    public void testDetermineTargetFileSplitSizeHonorsMaxFileSplitNum() throws Exception {
        SessionVariable sv = new SessionVariable();
        sv.setMaxFileSplitNum(100);
        TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
        HMSExternalTable table = Mockito.mock(HMSExternalTable.class);
        HMSExternalCatalog catalog = Mockito.mock(HMSExternalCatalog.class);
        Mockito.when(table.getCatalog()).thenReturn(catalog);
        Mockito.when(catalog.bindBrokerName()).thenReturn("");
        desc.setTable(table);
        HiveScanNode node = new HiveScanNode(new PlanNodeId(0), desc, false, sv, null);

        HiveMetaStoreCache.FileCacheValue fileCacheValue = new HiveMetaStoreCache.FileCacheValue();
        HiveMetaStoreCache.HiveFileStatus status = new HiveMetaStoreCache.HiveFileStatus();
        status.setLength(10_000L * MB);
        fileCacheValue.getFiles().add(status);
        List<HiveMetaStoreCache.FileCacheValue> caches = Collections.singletonList(fileCacheValue);

        Method method = HiveScanNode.class.getDeclaredMethod(
                "determineTargetFileSplitSize", List.class, boolean.class);
        method.setAccessible(true);
        long target = (long) method.invoke(node, caches, false);
        Assert.assertEquals(100 * MB, target);
    }

    @Test
    public void testDetermineTargetFileSplitSizeKeepsInitialSize() throws Exception {
        SessionVariable sv = new SessionVariable();
        sv.setMaxFileSplitNum(100);
        TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
        HMSExternalTable table = Mockito.mock(HMSExternalTable.class);
        HMSExternalCatalog catalog = Mockito.mock(HMSExternalCatalog.class);
        Mockito.when(table.getCatalog()).thenReturn(catalog);
        Mockito.when(catalog.bindBrokerName()).thenReturn("");
        desc.setTable(table);
        HiveScanNode node = new HiveScanNode(new PlanNodeId(0), desc, false, sv, null);

        HiveMetaStoreCache.FileCacheValue fileCacheValue = new HiveMetaStoreCache.FileCacheValue();
        HiveMetaStoreCache.HiveFileStatus status = new HiveMetaStoreCache.HiveFileStatus();
        status.setLength(500L * MB);
        fileCacheValue.getFiles().add(status);
        List<HiveMetaStoreCache.FileCacheValue> caches = Collections.singletonList(fileCacheValue);

        Method method = HiveScanNode.class.getDeclaredMethod(
                "determineTargetFileSplitSize", List.class, boolean.class);
        method.setAccessible(true);
        long target = (long) method.invoke(node, caches, false);
        Assert.assertEquals(32 * MB, target);
    }
}