Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,8 @@ private MemoryLogRecords genRecordsWithProjection(
DATA2_TABLE_ID,
testingSchemaGetter,
DEFAULT_COMPRESSION,
projection.getProjectionInOrder());
projection.getProjectionInOrder(),
false);
ByteBuffer buffer =
toByteBuffer(
fileLogProjection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.types.pojo.Field;
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.types.pojo.Schema;
import org.apache.fluss.types.DataField;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.ArrowUtils;
import org.apache.fluss.utils.types.Tuple2;
Expand Down Expand Up @@ -99,7 +100,9 @@ public class FileLogProjection {
private SchemaGetter schemaGetter;
private long tableId;
private ArrowCompressionInfo compressionInfo;
private int[] selectedFieldPositions;
private int[] selectedFields;
// whether the selected fields are specified by field ids or field positions.
private boolean isSelectedByIds;

public FileLogProjection(ProjectionPushdownCache projectionsCache) {
this.projectionsCache = projectionsCache;
Expand All @@ -115,11 +118,13 @@ public void setCurrentProjection(
long tableId,
SchemaGetter schemaGetter,
ArrowCompressionInfo compressionInfo,
int[] selectedFieldPositions) {
int[] selectedFields,
boolean isSelectedByIds) {
this.tableId = tableId;
this.schemaGetter = schemaGetter;
this.compressionInfo = compressionInfo;
this.selectedFieldPositions = selectedFieldPositions;
this.selectedFields = selectedFields;
this.isSelectedByIds = isSelectedByIds;
}

/**
Expand Down Expand Up @@ -317,16 +322,46 @@ private void resizeArrowMetadataBuffer(int metadataSize) {
}

/** Flatten fields by a pre-order depth-first traversal of the fields in the schema. */
private void flattenFields(
private static void flattenFieldsByPosition(
List<Field> arrowFields,
BitSet selectedFields,
BitSet selectedFieldPositions,
List<Tuple2<Field, Boolean>> flattenedFields) {
for (int i = 0; i < arrowFields.size(); i++) {
Field field = arrowFields.get(i);
boolean selected = selectedFields.get(i);
boolean selected = selectedFieldPositions.get(i);
flattenedFields.add(Tuple2.of(field, selected));
List<Field> children = field.getChildren();
flattenFields(children, fillBitSet(children.size(), selected), flattenedFields);
flattenFieldsByPosition(
children, fillBitSet(children.size(), selected), flattenedFields);
}
}

private static void flattenFieldsById(
List<Field> arrowFields,
RowType rowType,
BitSet selectedFieldIds,
List<Tuple2<Field, Boolean>> flattenedFields) {
List<DataField> fields = rowType.getFields();
for (int i = 0; i < fields.size(); i++) {
DataField flussField = fields.get(i);
Field arrowField = arrowFields.get(i);
int fieldId = flussField.getFieldId();
boolean selected = selectedFieldIds.get(fieldId);
flattenedFields.add(Tuple2.of(arrowField, selected));

List<Field> arrowChildren = arrowField.getChildren();
// if the field is selected, all the children are selected.
if (selected) {
flattenFieldsByPosition(
arrowChildren, fillBitSet(arrowChildren.size(), true), flattenedFields);
} else if (flussField.getType() instanceof RowType) {
// if the field is not selected, search for the children in the flattened fields
flattenFieldsById(
arrowChildren,
(RowType) flussField.getType(),
selectedFieldIds,
flattenedFields);
}
}
}

Expand Down Expand Up @@ -403,24 +438,32 @@ ByteBuffer getLogHeaderBuffer() {

private ProjectionInfo getOrCreateProjectionInfo(short schemaId) {
ProjectionInfo cachedProjection =
projectionsCache.getProjectionInfo(tableId, schemaId, selectedFieldPositions);
projectionsCache.getProjectionInfo(
tableId, schemaId, selectedFields, isSelectedByIds);
if (cachedProjection == null) {
cachedProjection = createProjectionInfo(schemaId, selectedFieldPositions);
cachedProjection = createProjectionInfo(schemaId, selectedFields, isSelectedByIds);
projectionsCache.setProjectionInfo(
tableId, schemaId, selectedFieldPositions, cachedProjection);
tableId, schemaId, selectedFields, isSelectedByIds, cachedProjection);
}
return cachedProjection;
}

private ProjectionInfo createProjectionInfo(short schemaId, int[] selectedFieldPositions) {
private ProjectionInfo createProjectionInfo(
short schemaId, int[] selectedFields, boolean isSelectedByIds) {
org.apache.fluss.metadata.Schema schema = schemaGetter.getSchema(schemaId);
RowType rowType = schema.getRowType();

// initialize the projection util information
Schema arrowSchema = ArrowUtils.toArrowSchema(rowType);
BitSet selection = toBitSet(arrowSchema.getFields().size(), selectedFieldPositions);
List<Tuple2<Field, Boolean>> flattenedFields = new ArrayList<>();
flattenFields(arrowSchema.getFields(), selection, flattenedFields);
List<Tuple2<Field, Boolean>> flattenedFields =
isSelectedByIds
? getTargetFlattenedFieldsByIds(
arrowSchema.getFields(),
rowType,
schema.getHighestFieldId(),
selectedFields)
: getTargetFlattenedFieldsByPositions(
arrowSchema.getFields(), selectedFields);
int totalFieldNodes = flattenedFields.size();
int[] bufferLayoutCount = new int[totalFieldNodes];
BitSet nodesProjection = new BitSet(totalFieldNodes);
Expand All @@ -442,18 +485,32 @@ private ProjectionInfo createProjectionInfo(short schemaId, int[] selectedFieldP
}

Schema projectedArrowSchema =
ArrowUtils.toArrowSchema(rowType.project(selectedFieldPositions));
ArrowUtils.toArrowSchema(
isSelectedByIds
? projectByIds(rowType, schema.getHighestFieldId(), selectedFields)
: rowType.project(selectedFields));
ArrowBodyCompression bodyCompression =
CompressionUtil.createBodyCompression(compressionInfo.createCompressionCodec());
int metadataLength =
ArrowUtils.estimateArrowMetadataLength(projectedArrowSchema, bodyCompression);
return new ProjectionInfo(
nodesProjection,
buffersProjection,
bufferIndex,
metadataLength,
bodyCompression,
selectedFieldPositions);
nodesProjection, buffersProjection, bufferIndex, metadataLength, bodyCompression);
}

private static List<Tuple2<Field, Boolean>> getTargetFlattenedFieldsByPositions(
List<Field> arrowFields, int[] selectedFieldPositions) {
BitSet selection = toBitSet(arrowFields.size(), selectedFieldPositions);
List<Tuple2<Field, Boolean>> flattenedFields = new ArrayList<>();
flattenFieldsByPosition(arrowFields, selection, flattenedFields);
return flattenedFields;
}

private static List<Tuple2<Field, Boolean>> getTargetFlattenedFieldsByIds(
List<Field> arrowFields, RowType rowType, int highFieldId, int[] selectedFieldIds) {
BitSet selection = toBitSet(highFieldId + 1, selectedFieldIds);
List<Tuple2<Field, Boolean>> flattenedFields = new ArrayList<>();
flattenFieldsById(arrowFields, rowType, selection, flattenedFields);
return flattenedFields;
}

/** Projection pushdown information for a specific schema and selected fields. */
Expand All @@ -463,21 +520,18 @@ public static final class ProjectionInfo {
final int bufferCount;
final int arrowMetadataLength;
final ArrowBodyCompression bodyCompression;
final int[] selectedFieldPositions;

private ProjectionInfo(
BitSet nodesProjection,
BitSet buffersProjection,
int bufferCount,
int arrowMetadataLength,
ArrowBodyCompression bodyCompression,
int[] selectedFieldPositions) {
ArrowBodyCompression bodyCompression) {
this.nodesProjection = nodesProjection;
this.buffersProjection = buffersProjection;
this.bufferCount = bufferCount;
this.arrowMetadataLength = arrowMetadataLength;
this.bodyCompression = bodyCompression;
this.selectedFieldPositions = selectedFieldPositions;
}
}

Expand Down Expand Up @@ -514,4 +568,37 @@ public long bodyLength() {
return bodyLength;
}
}

@VisibleForTesting
public static RowType projectByIds(RowType rowType, int highFieldId, int[] projectFieldIds) {
BitSet selection = toBitSet(highFieldId + 1, projectFieldIds);
List<DataField> projectedFields = new ArrayList<>();
projectByIds(rowType, selection, projectedFields);
if (projectFieldIds.length != projectedFields.size()) {
throw new IllegalArgumentException(
String.format(
"The number of projected fields (%d) does not match the number of "
+ "selected field IDs (%d). This usually indicates: "
+ "(1) Some field IDs do not exist in the schema, or "
+ "(2) Both a parent row field and its nested child fields are selected, "
+ "which causes duplication. Selected field IDs: %s",
projectedFields.size(),
projectFieldIds.length,
Arrays.toString(projectFieldIds)));
}
return new RowType(projectedFields);
}

private static void projectByIds(
RowType rowType, BitSet projectFieldIds, List<DataField> projectedFields) {
List<DataField> fields = rowType.getFields();
for (DataField dataField : fields) {
boolean selected = projectFieldIds.get(dataField.getFieldId());
if (selected) {
projectedFields.add(dataField);
} else if (dataField.getType() instanceof RowType) {
projectByIds((RowType) dataField.getType(), projectFieldIds, projectedFields);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,33 @@ public ProjectionPushdownCache() {

@Nullable
public ProjectionInfo getProjectionInfo(
long tableId, short schemaId, int[] selectedFieldPositions) {
ProjectionKey key = new ProjectionKey(tableId, schemaId, selectedFieldPositions);
long tableId, short schemaId, int[] selectedColumns, boolean isSelectedByIds) {
ProjectionKey key = new ProjectionKey(tableId, schemaId, selectedColumns, isSelectedByIds);
return projectionCache.getIfPresent(key);
}

public void setProjectionInfo(
long tableId, short schemaId, int[] selectedColumnIds, ProjectionInfo projectionInfo) {
ProjectionKey key = new ProjectionKey(tableId, schemaId, selectedColumnIds);
long tableId,
short schemaId,
int[] selectedColumns,
boolean isSelectedByIds,
ProjectionInfo projectionInfo) {
ProjectionKey key = new ProjectionKey(tableId, schemaId, selectedColumns, isSelectedByIds);
projectionCache.put(key, projectionInfo);
}

static final class ProjectionKey {
private final long tableId;
private final short schemaId;
private final int[] selectedColumnIds;
private final int[] selectedColumns;
private final boolean isSelectedByIds;

ProjectionKey(long tableId, short schemaId, int[] selectedColumnIds) {
ProjectionKey(
long tableId, short schemaId, int[] selectedColumns, boolean isSelectedByIds) {
this.tableId = tableId;
this.schemaId = schemaId;
this.selectedColumnIds = selectedColumnIds;
this.selectedColumns = selectedColumns;
this.isSelectedByIds = isSelectedByIds;
}

@Override
Expand All @@ -80,12 +87,14 @@ public boolean equals(Object o) {
ProjectionKey that = (ProjectionKey) o;
return tableId == that.tableId
&& schemaId == that.schemaId
&& Arrays.equals(selectedColumnIds, that.selectedColumnIds);
&& Arrays.equals(selectedColumns, that.selectedColumns)
&& isSelectedByIds == that.isSelectedByIds;
}

@Override
public int hashCode() {
return Objects.hash(tableId, schemaId, Arrays.hashCode(selectedColumnIds));
return Objects.hash(
tableId, schemaId, Arrays.hashCode(selectedColumns), isSelectedByIds);
}
}
}
Loading