From 4155cf75b973e3eaaa93979fd4548c9fdaa3c1f4 Mon Sep 17 00:00:00 2001
From: ipolyzos
Date: Sun, 18 Jan 2026 22:12:06 +0200
Subject: [PATCH 1/7] add snapshot query functionality

---
 .../fluss/client/table/scanner/TableScan.java |  12 +-
 .../table/scanner/batch/KvBatchScanner.java   | 285 ++++++++++++++++++
 .../apache/fluss/config/ConfigOptions.java    |  16 +
 .../org/apache/fluss/config/TableConfig.java  |   5 +
 .../exception/ScannerNotFoundException.java   |  27 ++
 .../rpc/gateway/TabletServerGateway.java      |  22 ++
 .../apache/fluss/rpc/protocol/ApiKeys.java    |  38 +--
 .../org/apache/fluss/rpc/protocol/Errors.java |   4 +-
 .../fluss/server/kv/rocksdb/RocksDBKv.java    |  13 +
 .../fluss/server/kv/scan/ScannerContext.java  | 101 +++++++
 .../fluss/server/kv/scan/ScannerManager.java  | 161 ++++++++++
 .../fluss/server/tablet/TabletServer.java     |   7 +
 .../fluss/server/tablet/TabletService.java    | 149 +++++++++
 13 files changed, 820 insertions(+), 20 deletions(-)
 create mode 100644 fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java
 create mode 100644 fluss-common/src/main/java/org/apache/fluss/exception/ScannerNotFoundException.java
 create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java
 create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java

diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
index 0d1d28a0e2..bcd5a2ec5b 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
@@ -21,6 +21,7 @@
 import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.client.metadata.KvSnapshotMetadata;
 import org.apache.fluss.client.table.scanner.batch.BatchScanner;
+import org.apache.fluss.client.table.scanner.batch.KvBatchScanner;
 import org.apache.fluss.client.table.scanner.batch.KvSnapshotBatchScanner;
 import org.apache.fluss.client.table.scanner.batch.LimitBatchScanner;
 import org.apache.fluss.client.table.scanner.log.LogScanner;
@@ -123,10 +124,19 @@ public TypedLogScanner createTypedLogScanner(Class pojoClass) {
 
     @Override
     public BatchScanner createBatchScanner(TableBucket tableBucket) {
+        if (tableInfo.hasPrimaryKey()) {
+            return new KvBatchScanner(
+                    tableInfo,
+                    tableBucket,
+                    schemaGetter,
+                    conn.getMetadataUpdater(),
+                    projectedColumns,
+                    limit == null ? null : (long) limit);
+        }
         if (limit == null) {
             throw new UnsupportedOperationException(
                     String.format(
-                            "Currently, BatchScanner is only available when limit is set. Table: %s, bucket: %s",
+                            "Currently, for log tables, BatchScanner is only available when limit is set. Table: %s, bucket: %s",
                             tableInfo.getTablePath(), tableBucket));
         }
         return new LimitBatchScanner(
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java
new file mode 100644
index 0000000000..b84419576f
--- /dev/null
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner.batch; + +import org.apache.fluss.client.metadata.MetadataUpdater; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.exception.LeaderNotAvailableException; +import org.apache.fluss.metadata.KvFormat; +import org.apache.fluss.metadata.SchemaGetter; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.record.DefaultValueRecordBatch; +import org.apache.fluss.record.ValueRecord; +import org.apache.fluss.record.ValueRecordReadContext; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.ProjectedRow; +import org.apache.fluss.rpc.gateway.TabletServerGateway; +import org.apache.fluss.rpc.messages.PbScanReqForBucket; +import org.apache.fluss.rpc.messages.ScanKvRequest; +import org.apache.fluss.rpc.messages.ScanKvResponse; +import org.apache.fluss.rpc.messages.ScannerKeepAliveRequest; +import org.apache.fluss.rpc.protocol.Errors; +import org.apache.fluss.types.RowType; +import org.apache.fluss.utils.CloseableIterator; +import org.apache.fluss.utils.SchemaUtil; +import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** A {@link BatchScanner} implementation that scans records from a primary key table. 
*/ +public class KvBatchScanner implements BatchScanner { + private static final Logger LOG = LoggerFactory.getLogger(KvBatchScanner.class); + + private final TableInfo tableInfo; + private final TableBucket tableBucket; + private final SchemaGetter schemaGetter; + private final MetadataUpdater metadataUpdater; + @Nullable private final int[] projectedFields; + @Nullable private final Long limit; + private final int targetSchemaId; + private final InternalRow.FieldGetter[] fieldGetters; + private final KvFormat kvFormat; + private final int batchSizeBytes; + + private final Map schemaProjectionCache = new HashMap<>(); + + private byte[] scannerId; + private int callSeqId = 0; + private boolean hasMoreResults = true; + private boolean isClosed = false; + + private CompletableFuture inFlightRequest; + private ScheduledExecutorService keepAliveExecutor; + + public KvBatchScanner( + TableInfo tableInfo, + TableBucket tableBucket, + SchemaGetter schemaGetter, + MetadataUpdater metadataUpdater, + @Nullable int[] projectedFields, + @Nullable Long limit) { + this.tableInfo = tableInfo; + this.tableBucket = tableBucket; + this.schemaGetter = schemaGetter; + this.metadataUpdater = metadataUpdater; + this.projectedFields = projectedFields; + this.limit = limit; + this.targetSchemaId = tableInfo.getSchemaId(); + this.kvFormat = tableInfo.getTableConfig().getKvFormat(); + this.batchSizeBytes = + (int) + tableInfo + .getTableConfig() + .getConfiguration() + .get(ConfigOptions.CLIENT_SCANNER_KV_FETCH_MAX_BYTES) + .getBytes(); + + RowType rowType = tableInfo.getRowType(); + this.fieldGetters = new InternalRow.FieldGetter[rowType.getFieldCount()]; + for (int i = 0; i < rowType.getFieldCount(); i++) { + this.fieldGetters[i] = InternalRow.createFieldGetter(rowType.getTypeAt(i), i); + } + } + + @Nullable + @Override + public CloseableIterator pollBatch(Duration timeout) throws IOException { + if (isClosed || (!hasMoreResults && inFlightRequest == null)) { + return null; + } + + try { + if (inFlightRequest == null) { + sendRequest(); + } + + ScanKvResponse response = + inFlightRequest.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + inFlightRequest = null; + + if (response.hasErrorCode() && response.getErrorCode() != Errors.NONE.code()) { + throw Errors.forCode(response.getErrorCode()).exception(response.getErrorMessage()); + } + + this.scannerId = response.getScannerId(); + this.hasMoreResults = response.isHasMoreResults(); + this.callSeqId++; + + List rows = parseScanKvResponse(response); + + // pipeline: send next request if there are more results + if (hasMoreResults) { + sendRequest(); + } + + return CloseableIterator.wrap(rows.iterator()); + } catch (java.util.concurrent.TimeoutException e) { + return CloseableIterator.emptyIterator(); + } catch (Exception e) { + throw new IOException(e); + } + } + + private void sendRequest() { + int leader = metadataUpdater.leaderFor(tableInfo.getTablePath(), tableBucket); + TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(leader); + if (gateway == null) { + throw new LeaderNotAvailableException( + "Server " + leader + " is not found in metadata cache."); + } + + ScanKvRequest request = new ScanKvRequest(); + request.setBatchSizeBytes(batchSizeBytes); + if (scannerId == null) { + PbScanReqForBucket bucketScanReq = request.setBucketScanReq(); + bucketScanReq.setTableId(tableBucket.getTableId()).setBucketId(tableBucket.getBucket()); + if (tableBucket.getPartitionId() != null) { + bucketScanReq.setPartitionId(tableBucket.getPartitionId()); + } + 
if (limit != null) { + bucketScanReq.setLimit(limit); + } + request.setCallSeqId(0); + } else { + request.setScannerId(scannerId).setCallSeqId(callSeqId); + } + + inFlightRequest = gateway.scanKv(request); + } + + private List parseScanKvResponse(ScanKvResponse response) { + if (!response.hasRecords()) { + return Collections.emptyList(); + } + + List scanRows = new ArrayList<>(); + ByteBuffer recordsBuffer = ByteBuffer.wrap(response.getRecords()); + DefaultValueRecordBatch valueRecords = + DefaultValueRecordBatch.pointToByteBuffer(recordsBuffer); + ValueRecordReadContext readContext = + ValueRecordReadContext.createReadContext(schemaGetter, kvFormat); + + for (ValueRecord record : valueRecords.records(readContext)) { + InternalRow row = record.getRow(); + if (targetSchemaId != record.schemaId()) { + int[] indexMapping = + schemaProjectionCache.computeIfAbsent( + record.schemaId(), + sourceSchemaId -> + SchemaUtil.getIndexMapping( + schemaGetter.getSchema(sourceSchemaId), + schemaGetter.getSchema(targetSchemaId))); + row = ProjectedRow.from(indexMapping).replaceRow(row); + } + scanRows.add(maybeProject(row)); + } + return scanRows; + } + + private InternalRow maybeProject(InternalRow originRow) { + GenericRow newRow = new GenericRow(fieldGetters.length); + for (int i = 0; i < fieldGetters.length; i++) { + newRow.setField(i, fieldGetters[i].getFieldOrNull(originRow)); + } + if (projectedFields != null) { + ProjectedRow projectedRow = ProjectedRow.from(projectedFields); + projectedRow.replaceRow(newRow); + return projectedRow; + } else { + return newRow; + } + } + + public void startKeepAlivePeriodically(int keepAliveIntervalMs) { + if (keepAliveExecutor != null) { + return; + } + + keepAliveExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("kv-scanner-keep-alive-" + tableBucket)); + keepAliveExecutor.scheduleAtFixedRate( + this::sendKeepAlive, + keepAliveIntervalMs, + keepAliveIntervalMs, + TimeUnit.MILLISECONDS); + } + + private void sendKeepAlive() { + if (isClosed || scannerId == null || !hasMoreResults) { + return; + } + + try { + int leader = metadataUpdater.leaderFor(tableInfo.getTablePath(), tableBucket); + TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(leader); + if (gateway != null) { + gateway.scannerKeepAlive(new ScannerKeepAliveRequest().setScannerId(scannerId)); + } + } catch (Exception e) { + LOG.warn("Failed to send keep alive for scanner {}", tableBucket, e); + } + } + + @Override + public void close() throws IOException { + if (isClosed) { + return; + } + isClosed = true; + + if (keepAliveExecutor != null) { + keepAliveExecutor.shutdownNow(); + } + + if (scannerId != null && hasMoreResults) { + // Close scanner on server + int leader = metadataUpdater.leaderFor(tableInfo.getTablePath(), tableBucket); + TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(leader); + if (gateway != null) { + gateway.scanKv( + new ScanKvRequest() + .setScannerId(scannerId) + .setCallSeqId(callSeqId) + .setCloseScanner(true)); + } + } + + if (inFlightRequest != null) { + inFlightRequest.cancel(true); + } + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 7e04c3a1cc..b429856d56 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -439,6 +439,14 @@ public class ConfigOptions { + 
WRITER_ID_EXPIRATION_TIME.key() + " passing. The default value is 10 minutes."); + public static final ConfigOption SERVER_SCANNER_TTL = + key("server.scanner.ttl") + .durationType() + .defaultValue(Duration.ofMinutes(10)) + .withDescription( + "The time that the tablet server will wait without receiving any scan request from " + + "a client before expiring the related status. The default value is 10 minutes."); + public static final ConfigOption TABLET_SERVER_CONTROLLED_SHUTDOWN_MAX_RETRIES = key("tablet-server.controlled-shutdown.max-retries") .intType() @@ -1093,6 +1101,14 @@ public class ConfigOptions { + "will still be returned to ensure that the fetch can make progress. As such, " + "this is not a absolute maximum."); + public static final ConfigOption CLIENT_SCANNER_KV_FETCH_MAX_BYTES = + key("client.scanner.kv.fetch.max-bytes") + .memoryType() + .defaultValue(MemorySize.parse("1mb")) + .withDescription( + "The maximum amount of data the server should return for a kv scan request. " + + "The default value is 1mb."); + public static final ConfigOption CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET = key("client.scanner.log.fetch.max-bytes-for-bucket") .memoryType() diff --git a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java index 984a1def4a..b972a80ff8 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java @@ -50,6 +50,11 @@ public TableConfig(Configuration config) { this.config = config; } + /** Gets the table properties configuration. */ + public Configuration getConfiguration() { + return config; + } + /** Gets the replication factor of the table. */ public int getReplicationFactor() { return config.get(ConfigOptions.TABLE_REPLICATION_FACTOR); diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/ScannerNotFoundException.java b/fluss-common/src/main/java/org/apache/fluss/exception/ScannerNotFoundException.java new file mode 100644 index 0000000000..e146cb3b09 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/ScannerNotFoundException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +/** Exception thrown when a scanner is not found. 
*/ +public class ScannerNotFoundException extends ApiException { + private static final long serialVersionUID = 1L; + + public ScannerNotFoundException(String message) { + super(message); + } +} diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java index 578b74e5e2..436637edd0 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java @@ -42,6 +42,10 @@ import org.apache.fluss.rpc.messages.ProduceLogResponse; import org.apache.fluss.rpc.messages.PutKvRequest; import org.apache.fluss.rpc.messages.PutKvResponse; +import org.apache.fluss.rpc.messages.ScanKvRequest; +import org.apache.fluss.rpc.messages.ScanKvResponse; +import org.apache.fluss.rpc.messages.ScannerKeepAliveRequest; +import org.apache.fluss.rpc.messages.ScannerKeepAliveResponse; import org.apache.fluss.rpc.messages.StopReplicaRequest; import org.apache.fluss.rpc.messages.StopReplicaResponse; import org.apache.fluss.rpc.messages.UpdateMetadataRequest; @@ -130,6 +134,24 @@ CompletableFuture notifyLeaderAndIsr( @RPC(api = ApiKeys.LIMIT_SCAN) CompletableFuture limitScan(LimitScanRequest request); + /** + * Scan kv data from the specified table bucket. + * + * @param request the scan kv request + * @return the scan kv response + */ + @RPC(api = ApiKeys.SCAN_KV) + CompletableFuture scanKv(ScanKvRequest request); + + /** + * Keep alive for the specified scanner. + * + * @param request the scanner keep alive request + * @return the scanner keep alive response + */ + @RPC(api = ApiKeys.SCANNER_KEEP_ALIVE) + CompletableFuture scannerKeepAlive(ScannerKeepAliveRequest request); + /** * List offsets for the specified table bucket. 
* diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java index cc033ba8a9..105b3e0a9d 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java @@ -63,24 +63,26 @@ public enum ApiKeys { GET_LATEST_LAKE_SNAPSHOT(1032, 0, 0, PUBLIC), LIMIT_SCAN(1033, 0, 0, PUBLIC), PREFIX_LOOKUP(1034, 0, 0, PUBLIC), - GET_DATABASE_INFO(1035, 0, 0, PUBLIC), - CREATE_PARTITION(1036, 0, 0, PUBLIC), - DROP_PARTITION(1037, 0, 0, PUBLIC), - AUTHENTICATE(1038, 0, 0, PUBLIC), - CREATE_ACLS(1039, 0, 0, PUBLIC), - LIST_ACLS(1040, 0, 0, PUBLIC), - DROP_ACLS(1041, 0, 0, PUBLIC), - LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE), - CONTROLLED_SHUTDOWN(1043, 0, 0, PRIVATE), - ALTER_TABLE(1044, 0, 0, PUBLIC), - DESCRIBE_CLUSTER_CONFIGS(1045, 0, 0, PUBLIC), - ALTER_CLUSTER_CONFIGS(1046, 0, 0, PUBLIC), - ADD_SERVER_TAG(1047, 0, 0, PUBLIC), - REMOVE_SERVER_TAG(1048, 0, 0, PUBLIC), - REBALANCE(1049, 0, 0, PUBLIC), - LIST_REBALANCE_PROGRESS(1050, 0, 0, PUBLIC), - CANCEL_REBALANCE(1051, 0, 0, PUBLIC), - PREPARE_LAKE_TABLE_SNAPSHOT(1052, 0, 0, PRIVATE); + SCAN_KV(1035, 0, 0, PUBLIC), + SCANNER_KEEP_ALIVE(1036, 0, 0, PUBLIC), + GET_DATABASE_INFO(1037, 0, 0, PUBLIC), + CREATE_PARTITION(1038, 0, 0, PUBLIC), + DROP_PARTITION(1039, 0, 0, PUBLIC), + AUTHENTICATE(1040, 0, 0, PUBLIC), + CREATE_ACLS(1041, 0, 0, PUBLIC), + LIST_ACLS(1042, 0, 0, PUBLIC), + DROP_ACLS(1043, 0, 0, PUBLIC), + LAKE_TIERING_HEARTBEAT(1044, 0, 0, PRIVATE), + CONTROLLED_SHUTDOWN(1045, 0, 0, PRIVATE), + ALTER_TABLE(1046, 0, 0, PUBLIC), + DESCRIBE_CLUSTER_CONFIGS(1047, 0, 0, PUBLIC), + ALTER_CLUSTER_CONFIGS(1048, 0, 0, PUBLIC), + ADD_SERVER_TAG(1049, 0, 0, PUBLIC), + REMOVE_SERVER_TAG(1050, 0, 0, PUBLIC), + REBALANCE(1051, 0, 0, PUBLIC), + LIST_REBALANCE_PROGRESS(1052, 0, 0, PUBLIC), + CANCEL_REBALANCE(1053, 0, 0, PUBLIC), + PREPARE_LAKE_TABLE_SNAPSHOT(1054, 0, 0, PRIVATE); private static final Map ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java index 5ee652cce2..a6aa827cb6 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java @@ -64,6 +64,7 @@ import org.apache.fluss.exception.RebalanceFailureException; import org.apache.fluss.exception.RecordTooLargeException; import org.apache.fluss.exception.RetriableAuthenticationException; +import org.apache.fluss.exception.ScannerNotFoundException; import org.apache.fluss.exception.SchemaNotExistException; import org.apache.fluss.exception.SecurityDisabledException; import org.apache.fluss.exception.SecurityTokenException; @@ -240,7 +241,8 @@ public enum Errors { SEVER_TAG_NOT_EXIST_EXCEPTION(60, "The server tag not exist.", ServerTagNotExistException::new), REBALANCE_FAILURE_EXCEPTION(61, "The rebalance task failure.", RebalanceFailureException::new), NO_REBALANCE_IN_PROGRESS_EXCEPTION( - 62, "No rebalance task in progress.", NoRebalanceInProgressException::new); + 62, "No rebalance task in progress.", NoRebalanceInProgressException::new), + SCANNER_NOT_FOUND_EXCEPTION(63, "The scanner is not found.", ScannerNotFoundException::new); private static final Logger LOG = LoggerFactory.getLogger(Errors.class); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java index f3998f4435..d7ab654151 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java @@ -32,6 +32,7 @@ import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; +import org.rocksdb.Snapshot; import org.rocksdb.Statistics; import org.rocksdb.WriteOptions; @@ -151,6 +152,18 @@ public List limitScan(Integer limit) { return pkList; } + public Snapshot getSnapshot() { + return db.getSnapshot(); + } + + public void releaseSnapshot(Snapshot snapshot) { + db.releaseSnapshot(snapshot); + } + + public RocksIterator newIterator(ReadOptions readOptions) { + return db.newIterator(defaultColumnFamilyHandle, readOptions); + } + public void put(byte[] key, byte[] value) throws IOException { try { db.put(writeOptions, key, value); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java new file mode 100644 index 0000000000..e74097d1f6 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.scan; + +import org.apache.fluss.server.kv.rocksdb.RocksDBKv; +import org.apache.fluss.utils.clock.Clock; + +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksIterator; +import org.rocksdb.Snapshot; + +import javax.annotation.concurrent.NotThreadSafe; + +/** The context for a scanner. 
*/ +@NotThreadSafe +public class ScannerContext implements AutoCloseable { + private final RocksDBKv rocksDBKv; + private final RocksIterator iterator; + private final ReadOptions readOptions; + private final Snapshot snapshot; + private final long limit; + + private int callSeqId; + private long lastAccessTime; + private long rowsScanned; + + public ScannerContext( + RocksDBKv rocksDBKv, + RocksIterator iterator, + ReadOptions readOptions, + Snapshot snapshot, + long limit, + Clock clock) { + this.rocksDBKv = rocksDBKv; + this.iterator = iterator; + this.readOptions = readOptions; + this.snapshot = snapshot; + this.limit = limit; + this.callSeqId = 0; + this.lastAccessTime = clock.milliseconds(); + this.rowsScanned = 0; + } + + public RocksIterator getIterator() { + return iterator; + } + + public Snapshot getSnapshot() { + return snapshot; + } + + public long getLimit() { + return limit; + } + + public int getCallSeqId() { + return callSeqId; + } + + public void setCallSeqId(int callSeqId) { + this.callSeqId = callSeqId; + } + + public long getLastAccessTime() { + return lastAccessTime; + } + + public void updateLastAccessTime(long lastAccessTime) { + this.lastAccessTime = lastAccessTime; + } + + public long getRowsScanned() { + return rowsScanned; + } + + public void incrementRowsScanned(long count) { + this.rowsScanned += count; + } + + @Override + public void close() { + iterator.close(); + readOptions.close(); + rocksDBKv.releaseSnapshot(snapshot); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java new file mode 100644 index 0000000000..4d2b8f0e48 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.kv.scan; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.rpc.protocol.Errors; +import org.apache.fluss.server.kv.KvTablet; +import org.apache.fluss.server.kv.rocksdb.RocksDBKv; +import org.apache.fluss.utils.AutoCloseableAsync; +import org.apache.fluss.utils.clock.Clock; +import org.apache.fluss.utils.clock.SystemClock; +import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; +import org.apache.fluss.utils.concurrent.FutureUtils; + +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksIterator; +import org.rocksdb.Snapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** A manager for scanners. */ +public class ScannerManager implements AutoCloseableAsync { + private static final Logger LOG = LoggerFactory.getLogger(ScannerManager.class); + + private final Map scanners = new ConcurrentHashMap<>(); + private final ScheduledExecutorService cleanupExecutor; + private final Clock clock; + private final long scannerTtlMs; + + public ScannerManager(Configuration conf) { + this(conf, SystemClock.getInstance()); + } + + public ScannerManager(Configuration conf, Clock clock) { + this.clock = clock; + this.scannerTtlMs = conf.get(ConfigOptions.SERVER_SCANNER_TTL).toMillis(); + this.cleanupExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("scanner-cleanup-thread")); + this.cleanupExecutor.scheduleWithFixedDelay( + this::cleanupExpiredScanners, + scannerTtlMs / 2, + scannerTtlMs / 2, + TimeUnit.MILLISECONDS); + } + + public byte[] createScanner(KvTablet kvTablet, long limit) { + RocksDBKv rocksDBKv = kvTablet.getRocksDBKv(); + Snapshot snapshot = rocksDBKv.getSnapshot(); + ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot); + RocksIterator iterator = rocksDBKv.newIterator(readOptions); + iterator.seekToFirst(); + + ScannerContext context = + new ScannerContext(rocksDBKv, iterator, readOptions, snapshot, limit, clock); + byte[] scannerId = generateScannerId(); + scanners.put(ByteBuffer.wrap(scannerId), context); + return scannerId; + } + + public ScannerContext getScanner(byte[] scannerId) { + ScannerContext context = scanners.get(ByteBuffer.wrap(scannerId)); + if (context != null) { + context.updateLastAccessTime(clock.milliseconds()); + } + return context; + } + + public void keepAlive(byte[] scannerId) { + ScannerContext context = scanners.get(ByteBuffer.wrap(scannerId)); + if (context != null) { + context.updateLastAccessTime(clock.milliseconds()); + } else { + throw Errors.SCANNER_NOT_FOUND_EXCEPTION.exception( + "Scanner not found: " + scannerIdToString(scannerId)); + } + } + + public void removeScanner(byte[] scannerId) { + ScannerContext context = scanners.remove(ByteBuffer.wrap(scannerId)); + if (context != null) { + closeScannerContext(context); + } + } + + private void cleanupExpiredScanners() { + long now = clock.milliseconds(); + for (Map.Entry entry : scanners.entrySet()) { + if (now - entry.getValue().getLastAccessTime() > scannerTtlMs) { + ScannerContext context = scanners.remove(entry.getKey()); + if (context != null) { + LOG.info( + "Scanner {} 
expired, closing it.", + scannerIdToString(entry.getKey().array())); + closeScannerContext(context); + } + } + } + } + + private void closeScannerContext(ScannerContext context) { + try { + context.close(); + } catch (Exception e) { + LOG.error("Fail to close scanner context.", e); + } + } + + private byte[] generateScannerId() { + return UUID.randomUUID().toString().replace("-", "").getBytes(StandardCharsets.UTF_8); + } + + private String scannerIdToString(byte[] scannerId) { + return new String(scannerId, StandardCharsets.UTF_8); + } + + @Override + public CompletableFuture closeAsync() { + try { + close(); + return CompletableFuture.completedFuture(null); + } catch (Exception e) { + return FutureUtils.completedExceptionally(e); + } + } + + @Override + public void close() throws Exception { + cleanupExecutor.shutdownNow(); + for (ScannerContext context : scanners.values()) { + closeScannerContext(context); + } + scanners.clear(); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java index 8eed63c844..dc98e5b8ca 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java @@ -41,6 +41,7 @@ import org.apache.fluss.server.coordinator.LakeCatalogDynamicLoader; import org.apache.fluss.server.coordinator.MetadataManager; import org.apache.fluss.server.kv.KvManager; +import org.apache.fluss.server.kv.scan.ScannerManager; import org.apache.fluss.server.kv.snapshot.DefaultCompletedKvSnapshotCommitter; import org.apache.fluss.server.log.LogManager; import org.apache.fluss.server.log.remote.RemoteLogManager; @@ -125,6 +126,9 @@ public class TabletServer extends ServerBase { @GuardedBy("lock") private TabletService tabletService; + @GuardedBy("lock") + private ScannerManager scannerManager; + @GuardedBy("lock") private MetricRegistry metricRegistry; @@ -230,6 +234,8 @@ protected void startServices() throws Exception { this.kvManager = KvManager.create(conf, zkClient, logManager, tabletServerMetricGroup); kvManager.startup(); + this.scannerManager = new ScannerManager(conf, clock); + // Register kvManager to dynamicConfigManager for dynamic reconfiguration dynamicConfigManager.register(kvManager); // Start dynamicConfigManager after all reconfigurable components are registered @@ -286,6 +292,7 @@ protected void startServices() throws Exception { metadataManager, authorizer, dynamicConfigManager, + scannerManager, ioExecutor); RequestsMetrics requestsMetrics = diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java index 992b963334..66f6650d96 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java @@ -19,10 +19,14 @@ import org.apache.fluss.cluster.ServerType; import org.apache.fluss.exception.AuthorizationException; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.exception.NonPrimaryKeyTableException; +import org.apache.fluss.exception.NotLeaderOrFollowerException; import org.apache.fluss.exception.UnknownTableOrBucketException; import org.apache.fluss.fs.FileSystem; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.DefaultValueRecordBatch; 
import org.apache.fluss.record.KvRecordBatch; import org.apache.fluss.record.MemoryLogRecords; import org.apache.fluss.rpc.entity.FetchLogResultForBucket; @@ -50,12 +54,17 @@ import org.apache.fluss.rpc.messages.NotifyLeaderAndIsrResponse; import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest; import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsResponse; +import org.apache.fluss.rpc.messages.PbScanReqForBucket; import org.apache.fluss.rpc.messages.PrefixLookupRequest; import org.apache.fluss.rpc.messages.PrefixLookupResponse; import org.apache.fluss.rpc.messages.ProduceLogRequest; import org.apache.fluss.rpc.messages.ProduceLogResponse; import org.apache.fluss.rpc.messages.PutKvRequest; import org.apache.fluss.rpc.messages.PutKvResponse; +import org.apache.fluss.rpc.messages.ScanKvRequest; +import org.apache.fluss.rpc.messages.ScanKvResponse; +import org.apache.fluss.rpc.messages.ScannerKeepAliveRequest; +import org.apache.fluss.rpc.messages.ScannerKeepAliveResponse; import org.apache.fluss.rpc.messages.StopReplicaRequest; import org.apache.fluss.rpc.messages.StopReplicaResponse; import org.apache.fluss.rpc.messages.UpdateMetadataRequest; @@ -71,16 +80,23 @@ import org.apache.fluss.server.entity.FetchReqInfo; import org.apache.fluss.server.entity.NotifyLeaderAndIsrData; import org.apache.fluss.server.entity.UserContext; +import org.apache.fluss.server.kv.scan.ScannerContext; +import org.apache.fluss.server.kv.scan.ScannerManager; import org.apache.fluss.server.log.FetchParams; import org.apache.fluss.server.log.ListOffsetsParam; import org.apache.fluss.server.metadata.TabletServerMetadataCache; import org.apache.fluss.server.metadata.TabletServerMetadataProvider; +import org.apache.fluss.server.replica.Replica; import org.apache.fluss.server.replica.ReplicaManager; import org.apache.fluss.server.utils.ServerRpcMessageUtils; import org.apache.fluss.server.zk.ZooKeeperClient; +import org.rocksdb.RocksIterator; + import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -127,6 +143,7 @@ public final class TabletService extends RpcServiceBase implements TabletServerG private final ReplicaManager replicaManager; private final TabletServerMetadataCache metadataCache; private final TabletServerMetadataProvider metadataFunctionProvider; + private final ScannerManager scannerManager; public TabletService( int serverId, @@ -137,6 +154,7 @@ public TabletService( MetadataManager metadataManager, @Nullable Authorizer authorizer, DynamicConfigManager dynamicConfigManager, + ScannerManager scannerManager, ExecutorService ioExecutor) { super( remoteFileSystem, @@ -149,6 +167,7 @@ public TabletService( this.serviceName = "server-" + serverId; this.replicaManager = replicaManager; this.metadataCache = metadataCache; + this.scannerManager = scannerManager; this.metadataFunctionProvider = new TabletServerMetadataProvider(zkClient, metadataManager, metadataCache); } @@ -285,6 +304,136 @@ public CompletableFuture limitScan(LimitScanRequest request) return response; } + @Override + public CompletableFuture scanKv(ScanKvRequest request) { + CompletableFuture response = new CompletableFuture<>(); + try { + if (request.hasScannerId()) { + byte[] scannerId = request.getScannerId(); + ScannerContext context = scannerManager.getScanner(scannerId); + if (context == null) { + throw Errors.SCANNER_NOT_FOUND_EXCEPTION.exception( + "Scanner not found: " + new String(scannerId, 
StandardCharsets.UTF_8)); + } + + if (request.hasCloseScanner() && request.isCloseScanner()) { + scannerManager.removeScanner(scannerId); + ScanKvResponse scanResponse = new ScanKvResponse(); + scanResponse.setScannerId(scannerId).setHasMoreResults(false); + response.complete(scanResponse); + return response; + } + + // check call seq id + if (request.getCallSeqId() != context.getCallSeqId() + 1) { + throw new FlussRuntimeException( + "Out of order scan request. Expected call seq id: " + + (context.getCallSeqId() + 1) + + ", but got: " + + request.getCallSeqId()); + } + context.setCallSeqId(request.getCallSeqId()); + + response.complete(continueScan(scannerId, context, request.getBatchSizeBytes())); + } else { + PbScanReqForBucket bucketScanReq = request.getBucketScanReq(); + authorizeTable(READ, bucketScanReq.getTableId()); + + TableBucket tb = + new TableBucket( + bucketScanReq.getTableId(), + bucketScanReq.hasPartitionId() + ? bucketScanReq.getPartitionId() + : null, + bucketScanReq.getBucketId()); + Replica replica = replicaManager.getReplicaOrException(tb); + if (!replica.isLeader()) { + throw new NotLeaderOrFollowerException("Leader not local for bucket " + tb); + } + if (!replica.isKvTable()) { + throw new NonPrimaryKeyTableException( + "Table " + bucketScanReq.getTableId() + " is not a primary key table."); + } + + long limit = bucketScanReq.hasLimit() ? bucketScanReq.getLimit() : Long.MAX_VALUE; + byte[] scannerId = scannerManager.createScanner(replica.getKvTablet(), limit); + ScannerContext context = scannerManager.getScanner(scannerId); + + ScanKvResponse scanResponse = + continueScan(scannerId, context, request.getBatchSizeBytes()); + // The FIP says: "Returns the corresponding log offset at the time the scanner is + // created" + // We can use the high watermark or the current log end offset. + scanResponse.setLogOffset(replica.getLogHighWatermark()); + response.complete(scanResponse); + } + } catch (Exception e) { + response.complete(makeScanKvErrorResponse(e)); + } + return response; + } + + private ScanKvResponse continueScan( + byte[] scannerId, ScannerContext context, int batchSizeBytes) throws IOException { + RocksIterator iterator = context.getIterator(); + DefaultValueRecordBatch.Builder builder = DefaultValueRecordBatch.builder(); + int currentBytes = 0; + long rowsScannedInThisBatch = 0; + + while (iterator.isValid() + && context.getRowsScanned() + rowsScannedInThisBatch < context.getLimit()) { + byte[] value = iterator.value(); + // Check if adding this record would exceed batch size. + // But we must add at least one record. 
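+            // Mirroring the log fetch max-bytes semantics, the batch size is not
+            // an absolute maximum: a single record larger than batchSizeBytes is
+            // still returned on its own so that the scan can make progress.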
+ if (rowsScannedInThisBatch > 0 && currentBytes + value.length > batchSizeBytes) { + break; + } + + builder.append(value); + currentBytes += value.length; + rowsScannedInThisBatch++; + iterator.next(); + } + + context.incrementRowsScanned(rowsScannedInThisBatch); + boolean hasMore = iterator.isValid() && context.getRowsScanned() < context.getLimit(); + + ScanKvResponse response = new ScanKvResponse(); + response.setScannerId(scannerId).setHasMoreResults(hasMore); + if (rowsScannedInThisBatch > 0) { + DefaultValueRecordBatch batch = builder.build(); + byte[] records = new byte[batch.sizeInBytes()]; + batch.getSegment().get(0, records); + response.setRecords(records); + } + + if (!hasMore) { + scannerManager.removeScanner(scannerId); + } + + return response; + } + + private ScanKvResponse makeScanKvErrorResponse(Throwable e) { + ScanKvResponse response = new ScanKvResponse(); + ApiError error = ApiError.fromThrowable(e); + response.setErrorCode(error.error().code()).setErrorMessage(error.message()); + return response; + } + + @Override + public CompletableFuture scannerKeepAlive( + ScannerKeepAliveRequest request) { + ScannerKeepAliveResponse response = new ScannerKeepAliveResponse(); + try { + scannerManager.keepAlive(request.getScannerId()); + } catch (Exception e) { + ApiError error = ApiError.fromThrowable(e); + response.setErrorCode(error.error().code()).setErrorMessage(error.message()); + } + return CompletableFuture.completedFuture(response); + } + @Override public CompletableFuture notifyLeaderAndIsr( NotifyLeaderAndIsrRequest notifyLeaderAndIsrRequest) { From 60bae8ed5bb43253128a27d5bda0be077f709c0c Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Sun, 18 Jan 2026 22:44:21 +0200 Subject: [PATCH 2/7] update api and add tests --- .../apache/fluss/client/table/FlussTable.java | 11 + .../org/apache/fluss/client/table/Table.java | 7 + .../client/table/scanner/SnapshotQuery.java | 59 ++++ .../table/scanner/TableSnapshotQuery.java | 152 ++++++++++ .../scanner/batch/KvBatchScannerITCase.java | 283 ++++++++++++++++++ .../rpc/TestingTabletGatewayService.java | 15 + .../tablet/TestTabletServerGateway.java | 15 + 7 files changed, 542 insertions(+) create mode 100644 fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SnapshotQuery.java create mode 100644 fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableSnapshotQuery.java create mode 100644 fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvBatchScannerITCase.java diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java b/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java index 8532f2a856..79cffa999d 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java @@ -23,7 +23,9 @@ import org.apache.fluss.client.lookup.TableLookup; import org.apache.fluss.client.metadata.ClientSchemaGetter; import org.apache.fluss.client.table.scanner.Scan; +import org.apache.fluss.client.table.scanner.SnapshotQuery; import org.apache.fluss.client.table.scanner.TableScan; +import org.apache.fluss.client.table.scanner.TableSnapshotQuery; import org.apache.fluss.client.table.writer.Append; import org.apache.fluss.client.table.writer.TableAppend; import org.apache.fluss.client.table.writer.TableUpsert; @@ -67,6 +69,15 @@ public Scan newScan() { return new TableScan(conn, tableInfo, schemaGetter); } + @Override + public SnapshotQuery newSnapshotQuery() { + 
checkState( + hasPrimaryKey, + "Table %s is not a Primary Key Table and doesn't support SnapshotQuery.", + tablePath); + return new TableSnapshotQuery(conn, tableInfo, schemaGetter); + } + @Override public Lookup newLookup() { return new TableLookup( diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/Table.java b/fluss-client/src/main/java/org/apache/fluss/client/table/Table.java index 813b62034a..287311e9ae 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/Table.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/Table.java @@ -22,6 +22,7 @@ import org.apache.fluss.client.lookup.Lookup; import org.apache.fluss.client.lookup.Lookuper; import org.apache.fluss.client.table.scanner.Scan; +import org.apache.fluss.client.table.scanner.SnapshotQuery; import org.apache.fluss.client.table.writer.Append; import org.apache.fluss.client.table.writer.AppendWriter; import org.apache.fluss.client.table.writer.Upsert; @@ -55,6 +56,12 @@ public interface Table extends AutoCloseable { */ Scan newScan(); + /** + * Creates a new {@link SnapshotQuery} for this table to configure and execute a snapshot query + * to read all current data in a table bucket (requires to be a Primary Key Table). + */ + SnapshotQuery newSnapshotQuery(); + /** * Creates a new {@link Lookup} for this table to configure and create a {@link Lookuper} to * lookup data for this table by primary key or a prefix of primary key. diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SnapshotQuery.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SnapshotQuery.java new file mode 100644 index 0000000000..37b16f5d0a --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SnapshotQuery.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.table.scanner; + +import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.utils.CloseableIterator; + +import javax.annotation.Nullable; + +import java.util.List; + +/** + * Used to configure and execute a snapshot query to read all current data in a table bucket for a + * primary key table. + * + * @since 0.6 + */ +@PublicEvolving +public interface SnapshotQuery { + + /** + * Returns a new snapshot query from this that will read the given data columns. + * + * @param projectedColumns the selected column indexes + */ + SnapshotQuery project(@Nullable int[] projectedColumns); + + /** + * Returns a new snapshot query from this that will read the given data columns. 
+     *
+     * @param projectedColumnNames the selected column names
+     */
+    SnapshotQuery project(List<String> projectedColumnNames);
+
+    /**
+     * Executes the snapshot query to read all current data in the given table bucket.
+     *
+     * @param tableBucket the table bucket to read
+     * @return a closeable iterator of the rows in the table bucket
+     */
+    CloseableIterator<InternalRow> execute(TableBucket tableBucket);
+}
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableSnapshotQuery.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableSnapshotQuery.java
new file mode 100644
index 0000000000..a3494fe567
--- /dev/null
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableSnapshotQuery.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner;
+
+import org.apache.fluss.client.FlussConnection;
+import org.apache.fluss.client.table.scanner.batch.BatchScanner;
+import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.utils.CloseableIterator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/** Implementation of {@link SnapshotQuery}. */
+public class TableSnapshotQuery implements SnapshotQuery {
+
+    private final FlussConnection conn;
+    private final TableInfo tableInfo;
+    private final SchemaGetter schemaGetter;
+
+    /** The projected fields to do projection. No projection if null. */
+    @Nullable private final int[] projectedColumns;
+
+    public TableSnapshotQuery(
+            FlussConnection conn, TableInfo tableInfo, SchemaGetter schemaGetter) {
+        this(conn, tableInfo, schemaGetter, null);
+    }
+
+    private TableSnapshotQuery(
+            FlussConnection conn,
+            TableInfo tableInfo,
+            SchemaGetter schemaGetter,
+            @Nullable int[] projectedColumns) {
+        this.conn = conn;
+        this.tableInfo = tableInfo;
+        this.schemaGetter = schemaGetter;
+        this.projectedColumns = projectedColumns;
+    }
+
+    @Override
+    public SnapshotQuery project(@Nullable int[] projectedColumns) {
+        return new TableSnapshotQuery(conn, tableInfo, schemaGetter, projectedColumns);
+    }
+
+    @Override
+    public SnapshotQuery project(List<String> projectedColumnNames) {
+        int[] columnIndexes = new int[projectedColumnNames.size()];
+        RowType rowType = tableInfo.getRowType();
+        for (int i = 0; i < projectedColumnNames.size(); i++) {
+            int index = rowType.getFieldIndex(projectedColumnNames.get(i));
+            if (index < 0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Field '%s' not found in table schema. Available fields: %s, Table: %s",
+                                projectedColumnNames.get(i),
+                                rowType.getFieldNames(),
+                                tableInfo.getTablePath()));
+            }
+            columnIndexes[i] = index;
+        }
+        return new TableSnapshotQuery(conn, tableInfo, schemaGetter, columnIndexes);
+    }
+
+    @Override
+    public CloseableIterator<InternalRow> execute(TableBucket tableBucket) {
+        Scan scan = new TableScan(conn, tableInfo, schemaGetter);
+        if (projectedColumns != null) {
+            scan = scan.project(projectedColumns);
+        }
+        BatchScanner batchScanner = scan.createBatchScanner(tableBucket);
+        return new BatchScannerIterator(batchScanner);
+    }
+
+    private static class BatchScannerIterator implements CloseableIterator<InternalRow> {
+        private final BatchScanner scanner;
+        private Iterator<InternalRow> currentBatch;
+        private boolean isClosed = false;
+
+        private BatchScannerIterator(BatchScanner scanner) {
+            this.scanner = scanner;
+        }
+
+        @Override
+        public boolean hasNext() {
+            ensureBatch();
+            return currentBatch != null && currentBatch.hasNext();
+        }
+
+        @Override
+        public InternalRow next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return currentBatch.next();
+        }
+
+        private void ensureBatch() {
+            try {
+                while ((currentBatch == null || !currentBatch.hasNext()) && !isClosed) {
+                    CloseableIterator<InternalRow> it =
+                            scanner.pollBatch(Duration.ofMinutes(1)); // Use a large timeout
+                    if (it == null) {
+                        isClosed = true;
+                        break;
+                    }
+                    if (it.hasNext()) {
+                        currentBatch = it;
+                    } else {
+                        it.close();
+                    }
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("Error polling batch from scanner", e);
+            }
+        }
+
+        @Override
+        public void close() {
+            if (!isClosed) {
+                try {
+                    scanner.close();
+                } catch (IOException e) {
+                    throw new RuntimeException("Error closing scanner", e);
+                }
+                isClosed = true;
+            }
+        }
+    }
+}
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvBatchScannerITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvBatchScannerITCase.java
new file mode 100644
index 0000000000..6850debb29
--- /dev/null
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvBatchScannerITCase.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.batch;
+
+import org.apache.fluss.client.admin.ClientToServerITCaseBase;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.metadata.PartitionSpec;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.utils.CloseableIterator;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.apache.fluss.testutils.InternalRowAssert.assertThatRow;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for {@link KvBatchScanner}. */
+public class KvBatchScannerITCase extends ClientToServerITCaseBase {
+    @BeforeEach
+    protected void setup() throws Exception {
+        super.setup();
+    }
+
+    @AfterEach
+    protected void teardown() throws Exception {
+        super.teardown();
+    }
+
+    @Test
+    void testBasicScan() throws Exception {
+        TablePath tablePath = TablePath.of("test_db", "test_basic_scan");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .primaryKey("id")
+                        .build();
+        TableDescriptor descriptor =
+                TableDescriptor.builder().schema(schema).distributedBy(1, "id").build();
+
+        createTable(tablePath, descriptor, true);
+
+        Table table = conn.getTable(tablePath);
+
+        // 1. write data
+        UpsertWriter writer = table.newUpsert().createWriter();
+        writer.upsert(row(1, "a"));
+        writer.upsert(row(2, "b"));
+        writer.upsert(row(3, "c"));
+        writer.flush();
+
+        // 2. test the kvScan works as expected
+        TableBucket bucket = new TableBucket(table.getTableInfo().getTableId(), 0);
+        List<InternalRow> result = scanAll(table, bucket);
+
+        assertThat(result).hasSize(3);
+        result.sort(Comparator.comparingInt(r -> r.getInt(0)));
+        assertThatRow(result.get(0)).withSchema(schema.getRowType()).isEqualTo(row(1, "a"));
+        assertThatRow(result.get(1)).withSchema(schema.getRowType()).isEqualTo(row(2, "b"));
+        assertThatRow(result.get(2)).withSchema(schema.getRowType()).isEqualTo(row(3, "c"));
+    }
+
+    @Test
+    void testMultiBucketScan() throws Exception {
+        TablePath tablePath = TablePath.of("test_db", "test_multi_bucket_scan");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .primaryKey("id")
+                        .build();
+        // 3 buckets
+        TableDescriptor descriptor =
+                TableDescriptor.builder().schema(schema).distributedBy(3, "id").build();
+
+        createTable(tablePath, descriptor, true);
+        Table table = conn.getTable(tablePath);
+
+        // 1. write data to multiple buckets
+        int rowCount = 100;
+        UpsertWriter writer = table.newUpsert().createWriter();
+        for (int i = 0; i < rowCount; i++) {
+            writer.upsert(row(i, "val" + i));
+        }
+        writer.flush();
+
+        // 2. scan each bucket and collect all data
+        List<InternalRow> allResult = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            TableBucket bucket = new TableBucket(table.getTableInfo().getTableId(), i);
+            allResult.addAll(scanAll(table, bucket));
+        }
+
+        assertThat(allResult).hasSize(rowCount);
+        allResult.sort(Comparator.comparingInt(r -> r.getInt(0)));
+        for (int i = 0; i < rowCount; i++) {
+            assertThatRow(allResult.get(i))
+                    .withSchema(schema.getRowType())
+                    .isEqualTo(row(i, "val" + i));
+        }
+    }
+
+    @Test
+    void testPartitionedTableScan() throws Exception {
+        TablePath tablePath = TablePath.of("test_db", "test_partitioned_scan");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("p", DataTypes.STRING())
+                        .column("name", DataTypes.STRING())
+                        .primaryKey("id", "p")
+                        .build();
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .partitionedBy("p")
+                        .distributedBy(1, "id")
+                        .build();
+
+        createTable(tablePath, descriptor, true);
+        admin.createPartition(
+                        tablePath, new PartitionSpec(Collections.singletonMap("p", "p1")), false)
+                .get();
+        admin.createPartition(
+                        tablePath, new PartitionSpec(Collections.singletonMap("p", "p2")), false)
+                .get();
+
+        Table table = conn.getTable(tablePath);
+        long p1Id = -1;
+        long p2Id = -1;
+        for (PartitionInfo p : admin.listPartitionInfos(tablePath).get()) {
+            if (p.getPartitionName().equals("p=p1")) {
+                p1Id = p.getPartitionId();
+            } else if (p.getPartitionName().equals("p=p2")) {
+                p2Id = p.getPartitionId();
+            }
+        }
+
+        // 1. write data to different partitions
+        UpsertWriter writer = table.newUpsert().createWriter();
+        writer.upsert(row(1, "p1", "a1"));
+        writer.upsert(row(2, "p1", "b1"));
+        writer.upsert(row(1, "p2", "a2"));
+        writer.flush();
+
+        // 2. scan partition p1
+        TableBucket p1Bucket = new TableBucket(table.getTableInfo().getTableId(), p1Id, 0);
+        List<InternalRow> p1Result = scanAll(table, p1Bucket);
+        assertThat(p1Result).hasSize(2);
+
+        // 3. scan partition p2
+        TableBucket p2Bucket = new TableBucket(table.getTableInfo().getTableId(), p2Id, 0);
+        List<InternalRow> p2Result = scanAll(table, p2Bucket);
+        assertThat(p2Result).hasSize(1);
+        assertThatRow(p2Result.get(0))
+                .withSchema(schema.getRowType())
+                .isEqualTo(row(1, "p2", "a2"));
+    }
+
+    @Test
+    void testLargeDataScan() throws Exception {
+        TablePath tablePath = TablePath.of("test_db", "test_large_data_scan");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .primaryKey("id")
+                        .build();
+        TableDescriptor descriptor =
+                TableDescriptor.builder().schema(schema).distributedBy(1, "id").build();
+
+        createTable(tablePath, descriptor, true);
+        Table table = conn.getTable(tablePath);
+
+        // 1. write 10k records
+        int rowCount = 10000;
+        UpsertWriter writer = table.newUpsert().createWriter();
+        for (int i = 0; i < rowCount; i++) {
+            writer.upsert(row(i, "val" + i));
+        }
+        writer.flush();
+
+        // 2. scan and verify
+        TableBucket bucket = new TableBucket(table.getTableInfo().getTableId(), 0);
+        List<InternalRow> result = scanAll(table, bucket);
+
+        assertThat(result).hasSize(rowCount);
+        result.sort(Comparator.comparingInt(r -> r.getInt(0)));
+        for (int i = 0; i < rowCount; i++) {
+            assertThatRow(result.get(i))
+                    .withSchema(schema.getRowType())
+                    .isEqualTo(row(i, "val" + i));
+        }
+    }
+
+    @Test
+    void testSnapshotQuery() throws Exception {
+        TablePath tablePath = TablePath.of("test_db", "test_snapshot_query");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .primaryKey("id")
+                        .build();
+        TableDescriptor descriptor =
+                TableDescriptor.builder().schema(schema).distributedBy(1, "id").build();
+
+        createTable(tablePath, descriptor, true);
+        Table table = conn.getTable(tablePath);
+
+        // 1. write data
+        UpsertWriter writer = table.newUpsert().createWriter();
+        writer.upsert(row(1, "a"));
+        writer.upsert(row(2, "b"));
+        writer.upsert(row(3, "c"));
+        writer.flush();
+
+        // 2. test the snapshotQuery works as expected
+        TableBucket bucket = new TableBucket(table.getTableInfo().getTableId(), 0);
+        List<InternalRow> result = new ArrayList<>();
+        try (CloseableIterator<InternalRow> iterator = table.newSnapshotQuery().execute(bucket)) {
+            while (iterator.hasNext()) {
+                result.add(iterator.next());
+            }
+        }
+
+        assertThat(result).hasSize(3);
+        result.sort(Comparator.comparingInt(r -> r.getInt(0)));
+        assertThatRow(result.get(0)).withSchema(schema.getRowType()).isEqualTo(row(1, "a"));
+        assertThatRow(result.get(1)).withSchema(schema.getRowType()).isEqualTo(row(2, "b"));
+        assertThatRow(result.get(2)).withSchema(schema.getRowType()).isEqualTo(row(3, "c"));
+    }
+
+    private List<InternalRow> scanAll(Table table, TableBucket bucket) throws Exception {
+        List<InternalRow> allRows = new ArrayList<>();
+        try (BatchScanner scanner = table.newScan().createBatchScanner(bucket)) {
+            CloseableIterator<InternalRow> iterator;
+            while ((iterator = scanner.pollBatch(Duration.ofSeconds(5))) != null) {
+                while (iterator.hasNext()) {
+                    allRows.add(iterator.next());
+                }
+                iterator.close();
+            }
+        }
+        return allRows;
+    }
+}
diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
index 7db3654383..7503c3379a 100644
--- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
+++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
@@ -71,6 +71,10 @@
 import org.apache.fluss.rpc.messages.ProduceLogResponse;
 import org.apache.fluss.rpc.messages.PutKvRequest;
 import org.apache.fluss.rpc.messages.PutKvResponse;
+import org.apache.fluss.rpc.messages.ScanKvRequest;
+import org.apache.fluss.rpc.messages.ScanKvResponse;
+import org.apache.fluss.rpc.messages.ScannerKeepAliveRequest;
+import org.apache.fluss.rpc.messages.ScannerKeepAliveResponse;
 import org.apache.fluss.rpc.messages.StopReplicaRequest;
 import org.apache.fluss.rpc.messages.StopReplicaResponse;
 import org.apache.fluss.rpc.messages.TableExistsRequest;
@@ -135,6 +139,17 @@ public CompletableFuture<LimitScanResponse> limitScan(LimitScanRequest request)
         return null;
     }
 
+    @Override
+    public CompletableFuture<ScanKvResponse> scanKv(ScanKvRequest request) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<ScannerKeepAliveResponse> scannerKeepAlive(
+            ScannerKeepAliveRequest request) {
+        return null;
+    }
+
     @Override
     public CompletableFuture<ListOffsetsResponse> listOffsets(ListOffsetsRequest request) {
         return null;
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
index 500d197fcf..1c4d7f78c0 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
@@ -81,6 +81,10 @@
 import org.apache.fluss.rpc.messages.ProduceLogResponse;
 import org.apache.fluss.rpc.messages.PutKvRequest;
 import org.apache.fluss.rpc.messages.PutKvResponse;
+import org.apache.fluss.rpc.messages.ScanKvRequest;
+import org.apache.fluss.rpc.messages.ScanKvResponse;
+import org.apache.fluss.rpc.messages.ScannerKeepAliveRequest;
+import org.apache.fluss.rpc.messages.ScannerKeepAliveResponse;
 import org.apache.fluss.rpc.messages.StopReplicaRequest;
 import org.apache.fluss.rpc.messages.StopReplicaResponse;
 import org.apache.fluss.rpc.messages.TableExistsRequest;
@@ -204,6 +208,17 @@ public CompletableFuture<LimitScanResponse> limitScan(LimitScanRequest request)
         return null;
     }
 
+    @Override
+    public 
CompletableFuture scanKv(ScanKvRequest request) { + return null; + } + + @Override + public CompletableFuture scannerKeepAlive( + ScannerKeepAliveRequest request) { + return null; + } + @Override public CompletableFuture listOffsets(ListOffsetsRequest request) { return null; From 63ad71f568fce417bddb66f0da8ef2b640ce6467 Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Sun, 18 Jan 2026 23:04:55 +0200 Subject: [PATCH 3/7] update tests --- .../scanner/batch/KvBatchScannerITCase.java | 47 +++++++------------ 1 file changed, 16 insertions(+), 31 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvBatchScannerITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvBatchScannerITCase.java index 6850debb29..85ba5c8081 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvBatchScannerITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvBatchScannerITCase.java @@ -20,6 +20,7 @@ import org.apache.fluss.client.admin.ClientToServerITCaseBase; import org.apache.fluss.client.table.Table; import org.apache.fluss.client.table.writer.UpsertWriter; +import org.apache.fluss.metadata.PartitionInfo; import org.apache.fluss.metadata.PartitionSpec; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableBucket; @@ -33,7 +34,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.time.Duration; import java.util.ArrayList; import java.util.Comparator; import java.util.List; @@ -56,7 +56,6 @@ protected void teardown() throws Exception { @Test void testBasicScan() throws Exception { - System.out.println("eddww"); TablePath tablePath = TablePath.of("test_db", "test_basic_scan"); Schema schema = Schema.newBuilder() @@ -68,7 +67,6 @@ void testBasicScan() throws Exception { TableDescriptor.builder().schema(schema).distributedBy(1, "id").build(); createTable(tablePath, descriptor, true); - System.out.println("eddww"); Table table = conn.getTable(tablePath); @@ -78,11 +76,10 @@ void testBasicScan() throws Exception { writer.upsert(row(2, "b")); writer.upsert(row(3, "c")); writer.flush(); - System.out.println("never reaches this"); - // 2. test the kvScan works as expected + // 2. test the snapshotQuery works as expected TableBucket bucket = new TableBucket(table.getTableInfo().getTableId(), 0); - List result = scanAll(table, bucket); + List result = snapshotQueryAll(table, bucket); assertThat(result).hasSize(3); result.sort(Comparator.comparingInt(r -> r.getInt(0))); @@ -119,7 +116,7 @@ void testMultiBucketScan() throws Exception { List allResult = new ArrayList<>(); for (int i = 0; i < 3; i++) { TableBucket bucket = new TableBucket(table.getTableInfo().getTableId(), i); - allResult.addAll(scanAll(table, bucket)); + allResult.addAll(snapshotQueryAll(table, bucket)); } assertThat(allResult).hasSize(rowCount); @@ -163,30 +160,27 @@ void testPartitionedTableScan() throws Exception { Table table = conn.getTable(tablePath); long p1Id = -1; long p2Id = -1; - for (org.apache.fluss.metadata.PartitionInfo p : - admin.listPartitionInfos(tablePath).get()) { - if (p.getPartitionName().equals("p=p1")) { + List partitionInfos = admin.listPartitionInfos(tablePath).get(); + for (PartitionInfo p : partitionInfos) { + if (p.getPartitionName().equals("p1")) { p1Id = p.getPartitionId(); - } else if (p.getPartitionName().equals("p=p2")) { + } else if (p.getPartitionName().equals("p2")) { p2Id = p.getPartitionId(); } } - // 1. 
write data to different partitions UpsertWriter writer = table.newUpsert().createWriter(); writer.upsert(row(1, "p1", "a1")); writer.upsert(row(2, "p1", "b1")); writer.upsert(row(1, "p2", "a2")); writer.flush(); - // 2. scan partition p1 TableBucket p1Bucket = new TableBucket(table.getTableInfo().getTableId(), p1Id, 0); - List p1Result = scanAll(table, p1Bucket); + List p1Result = snapshotQueryAll(table, p1Bucket); assertThat(p1Result).hasSize(2); - // 3. scan partition p2 TableBucket p2Bucket = new TableBucket(table.getTableInfo().getTableId(), p2Id, 0); - List p2Result = scanAll(table, p2Bucket); + List p2Result = snapshotQueryAll(table, p2Bucket); assertThat(p2Result).hasSize(1); assertThatRow(p2Result.get(0)) .withSchema(schema.getRowType()) @@ -218,7 +212,7 @@ void testLargeDataScan() throws Exception { // 2. scan and verify TableBucket bucket = new TableBucket(table.getTableInfo().getTableId(), 0); - List result = scanAll(table, bucket); + List result = snapshotQueryAll(table, bucket); assertThat(result).hasSize(rowCount); result.sort(Comparator.comparingInt(r -> r.getInt(0))); @@ -253,12 +247,7 @@ void testSnapshotQuery() throws Exception { // 2. test the snapshotQuery works as expected TableBucket bucket = new TableBucket(table.getTableInfo().getTableId(), 0); - List result = new ArrayList<>(); - try (CloseableIterator iterator = table.newSnapshotQuery().execute(bucket)) { - while (iterator.hasNext()) { - result.add(iterator.next()); - } - } + List result = snapshotQueryAll(table, bucket); assertThat(result).hasSize(3); result.sort(Comparator.comparingInt(r -> r.getInt(0))); @@ -267,15 +256,11 @@ void testSnapshotQuery() throws Exception { assertThatRow(result.get(2)).withSchema(schema.getRowType()).isEqualTo(row(3, "c")); } - private List scanAll(Table table, TableBucket bucket) throws Exception { + private List snapshotQueryAll(Table table, TableBucket bucket) throws Exception { List allRows = new ArrayList<>(); - try (BatchScanner scanner = table.newScan().createBatchScanner(bucket)) { - CloseableIterator iterator; - while ((iterator = scanner.pollBatch(Duration.ofSeconds(5))) != null) { - while (iterator.hasNext()) { - allRows.add(iterator.next()); - } - iterator.close(); + try (CloseableIterator iterator = table.newSnapshotQuery().execute(bucket)) { + while (iterator.hasNext()) { + allRows.add(iterator.next()); } } return allRows; From ae488e3634751316be36d16355f9448d5745f6ba Mon Sep 17 00:00:00 2001 From: ipolyzos Date: Sun, 18 Jan 2026 23:11:44 +0200 Subject: [PATCH 4/7] update the snapshotQuery and the tests --- .../client/table/scanner/SnapshotQuery.java | 27 ++--- .../table/scanner/TableSnapshotQuery.java | 100 +++++++++++++----- ...ITCase.java => KvSnapshotQueryITCase.java} | 49 ++------- 3 files changed, 95 insertions(+), 81 deletions(-) rename fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/{KvBatchScannerITCase.java => KvSnapshotQueryITCase.java} (80%) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SnapshotQuery.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SnapshotQuery.java index 37b16f5d0a..5fe8327b6f 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SnapshotQuery.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SnapshotQuery.java @@ -22,10 +22,6 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.utils.CloseableIterator; -import javax.annotation.Nullable; - -import java.util.List; - /** * Used to configure and 
execute a snapshot query to read all current data in a table bucket for a
  * primary key table.
@@ -34,21 +30,6 @@
  */
 @PublicEvolving
 public interface SnapshotQuery {
-
-    /**
-     * Returns a new snapshot query from this that will read the given data columns.
-     *
-     * @param projectedColumns the selected column indexes
-     */
-    SnapshotQuery project(@Nullable int[] projectedColumns);
-
-    /**
-     * Returns a new snapshot query from this that will read the given data columns.
-     *
-     * @param projectedColumnNames the selected column names
-     */
-    SnapshotQuery project(List<String> projectedColumnNames);
-
     /**
      * Executes the snapshot query to read all current data in the given table bucket.
      *
@@ -56,4 +37,12 @@ public interface SnapshotQuery {
      * @return a closeable iterator of the rows in the table bucket
      */
     CloseableIterator<InternalRow> execute(TableBucket tableBucket);
+
+    /**
+     * Executes the snapshot query to read all current data in the table. The client
+     * transparently enumerates all partitions and buckets of the table.
+     *
+     * @return a closeable iterator of the rows in the table
+     */
+    CloseableIterator<InternalRow> execute();
 }
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableSnapshotQuery.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableSnapshotQuery.java
index a3494fe567..eb4c7ac022 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableSnapshotQuery.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableSnapshotQuery.java
@@ -19,17 +19,19 @@
 
 import org.apache.fluss.client.FlussConnection;
 import org.apache.fluss.client.table.scanner.batch.BatchScanner;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.SchemaGetter;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.types.RowType;
 import org.apache.fluss.utils.CloseableIterator;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -61,37 +63,87 @@ private TableSnapshotQuery(
     }
 
     @Override
-    public SnapshotQuery project(@Nullable int[] projectedColumns) {
-        return new TableSnapshotQuery(conn, tableInfo, schemaGetter, projectedColumns);
+    public CloseableIterator<InternalRow> execute(TableBucket tableBucket) {
+        Scan scan = new TableScan(conn, tableInfo, schemaGetter);
+        if (projectedColumns != null) {
+            scan = scan.project(projectedColumns);
+        }
+        BatchScanner batchScanner = scan.createBatchScanner(tableBucket);
+        return new BatchScannerIterator(batchScanner);
     }
 
     @Override
-    public SnapshotQuery project(List<String> projectedColumnNames) {
-        int[] columnIndexes = new int[projectedColumnNames.size()];
-        RowType rowType = tableInfo.getRowType();
-        for (int i = 0; i < projectedColumnNames.size(); i++) {
-            int index = rowType.getFieldIndex(projectedColumnNames.get(i));
-            if (index < 0) {
-                throw new IllegalArgumentException(
-                        String.format(
-                                "Field '%s' not found in table schema. 
Available fields: %s, Table: %s", - projectedColumnNames.get(i), - rowType.getFieldNames(), - tableInfo.getTablePath())); + public CloseableIterator execute() { + List buckets = new ArrayList<>(); + try { + if (tableInfo.isPartitioned()) { + List partitions = + conn.getAdmin().listPartitionInfos(tableInfo.getTablePath()).get(); + for (PartitionInfo partition : partitions) { + for (int i = 0; i < tableInfo.getNumBuckets(); i++) { + buckets.add( + new TableBucket( + tableInfo.getTableId(), partition.getPartitionId(), i)); + } + } + } else { + for (int i = 0; i < tableInfo.getNumBuckets(); i++) { + buckets.add(new TableBucket(tableInfo.getTableId(), i)); + } } - columnIndexes[i] = index; + } catch (Exception e) { + throw new FlussRuntimeException( + "Failed to list partitions for table " + tableInfo.getTablePath(), e); } - return new TableSnapshotQuery(conn, tableInfo, schemaGetter, columnIndexes); + + return new MultiBucketBatchScannerIterator(buckets); } - @Override - public CloseableIterator execute(TableBucket tableBucket) { - Scan scan = new TableScan(conn, tableInfo, schemaGetter); - if (projectedColumns != null) { - scan = scan.project(projectedColumns); + private class MultiBucketBatchScannerIterator implements CloseableIterator { + private final Iterator bucketIterator; + private CloseableIterator currentScannerIterator; + private boolean isClosed = false; + + private MultiBucketBatchScannerIterator(List buckets) { + this.bucketIterator = buckets.iterator(); + } + + @Override + public boolean hasNext() { + if (isClosed) { + return false; + } + while (currentScannerIterator == null || !currentScannerIterator.hasNext()) { + if (currentScannerIterator != null) { + currentScannerIterator.close(); + currentScannerIterator = null; + } + if (bucketIterator.hasNext()) { + currentScannerIterator = execute(bucketIterator.next()); + } else { + return false; + } + } + return true; + } + + @Override + public InternalRow next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return currentScannerIterator.next(); + } + + @Override + public void close() { + if (!isClosed) { + if (currentScannerIterator != null) { + currentScannerIterator.close(); + } + isClosed = true; + } } - BatchScanner batchScanner = scan.createBatchScanner(tableBucket); - return new BatchScannerIterator(batchScanner); } private static class BatchScannerIterator implements CloseableIterator { diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvBatchScannerITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotQueryITCase.java similarity index 80% rename from fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvBatchScannerITCase.java rename to fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotQueryITCase.java index 85ba5c8081..3754ff3880 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvBatchScannerITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotQueryITCase.java @@ -20,10 +20,8 @@ import org.apache.fluss.client.admin.ClientToServerITCaseBase; import org.apache.fluss.client.table.Table; import org.apache.fluss.client.table.writer.UpsertWriter; -import org.apache.fluss.metadata.PartitionInfo; import org.apache.fluss.metadata.PartitionSpec; import org.apache.fluss.metadata.Schema; -import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableDescriptor; import 
org.apache.fluss.metadata.TablePath; import org.apache.fluss.row.InternalRow; @@ -42,8 +40,8 @@ import static org.apache.fluss.testutils.InternalRowAssert.assertThatRow; import static org.assertj.core.api.Assertions.assertThat; -/** ITCase for {@link KvBatchScanner}. */ -public class KvBatchScannerITCase extends ClientToServerITCaseBase { +/** ITCase for snapshot query. */ +public class KvSnapshotQueryITCase extends ClientToServerITCaseBase { @BeforeEach protected void setup() throws Exception { super.setup(); @@ -78,8 +76,7 @@ void testBasicScan() throws Exception { writer.flush(); // 2. test the snapshotQuery works as expected - TableBucket bucket = new TableBucket(table.getTableInfo().getTableId(), 0); - List result = snapshotQueryAll(table, bucket); + List result = snapshotQueryAll(table); assertThat(result).hasSize(3); result.sort(Comparator.comparingInt(r -> r.getInt(0))); @@ -112,12 +109,8 @@ void testMultiBucketScan() throws Exception { } writer.flush(); - // 2. scan each bucket and collect all data - List allResult = new ArrayList<>(); - for (int i = 0; i < 3; i++) { - TableBucket bucket = new TableBucket(table.getTableInfo().getTableId(), i); - allResult.addAll(snapshotQueryAll(table, bucket)); - } + // 2. scan all buckets and collect all data + List allResult = snapshotQueryAll(table); assertThat(allResult).hasSize(rowCount); allResult.sort(Comparator.comparingInt(r -> r.getInt(0))); @@ -158,16 +151,6 @@ void testPartitionedTableScan() throws Exception { .get(); Table table = conn.getTable(tablePath); - long p1Id = -1; - long p2Id = -1; - List partitionInfos = admin.listPartitionInfos(tablePath).get(); - for (PartitionInfo p : partitionInfos) { - if (p.getPartitionName().equals("p1")) { - p1Id = p.getPartitionId(); - } else if (p.getPartitionName().equals("p2")) { - p2Id = p.getPartitionId(); - } - } UpsertWriter writer = table.newUpsert().createWriter(); writer.upsert(row(1, "p1", "a1")); @@ -175,16 +158,8 @@ void testPartitionedTableScan() throws Exception { writer.upsert(row(1, "p2", "a2")); writer.flush(); - TableBucket p1Bucket = new TableBucket(table.getTableInfo().getTableId(), p1Id, 0); - List p1Result = snapshotQueryAll(table, p1Bucket); - assertThat(p1Result).hasSize(2); - - TableBucket p2Bucket = new TableBucket(table.getTableInfo().getTableId(), p2Id, 0); - List p2Result = snapshotQueryAll(table, p2Bucket); - assertThat(p2Result).hasSize(1); - assertThatRow(p2Result.get(0)) - .withSchema(schema.getRowType()) - .isEqualTo(row(1, "p2", "a2")); + List result = snapshotQueryAll(table); + assertThat(result).hasSize(3); } @Test @@ -211,8 +186,7 @@ void testLargeDataScan() throws Exception { writer.flush(); // 2. scan and verify - TableBucket bucket = new TableBucket(table.getTableInfo().getTableId(), 0); - List result = snapshotQueryAll(table, bucket); + List result = snapshotQueryAll(table); assertThat(result).hasSize(rowCount); result.sort(Comparator.comparingInt(r -> r.getInt(0))); @@ -246,8 +220,7 @@ void testSnapshotQuery() throws Exception { writer.flush(); // 2. 
test the snapshotQuery works as expected
-        TableBucket bucket = new TableBucket(table.getTableInfo().getTableId(), 0);
-        List<InternalRow> result = snapshotQueryAll(table, bucket);
+        List<InternalRow> result = snapshotQueryAll(table);
 
         assertThat(result).hasSize(3);
         result.sort(Comparator.comparingInt(r -> r.getInt(0)));
@@ -256,9 +229,9 @@ void testSnapshotQuery() throws Exception {
         assertThatRow(result.get(2)).withSchema(schema.getRowType()).isEqualTo(row(3, "c"));
     }
 
-    private List<InternalRow> snapshotQueryAll(Table table, TableBucket bucket) throws Exception {
+    private List<InternalRow> snapshotQueryAll(Table table) throws Exception {
         List<InternalRow> allRows = new ArrayList<>();
-        try (CloseableIterator<InternalRow> iterator = table.newSnapshotQuery().execute(bucket)) {
+        try (CloseableIterator<InternalRow> iterator = table.newSnapshotQuery().execute()) {
             while (iterator.hasNext()) {
                 allRows.add(iterator.next());
             }
         }
         return allRows;

From 7808564c4f66d82f7755f35676af32347b77af8c Mon Sep 17 00:00:00 2001
From: ipolyzos
Date: Sun, 18 Jan 2026 23:22:54 +0200
Subject: [PATCH 5/7] fix proto

---
 .../apache/fluss/rpc/protocol/ApiKeys.java    | 40 +++++-----
 fluss-rpc/src/main/proto/FlussApi.proto       | 78 +++++++++++++++++++
 2 files changed, 98 insertions(+), 20 deletions(-)

diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
index 105b3e0a9d..b72edebf75 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
@@ -63,26 +63,26 @@ public enum ApiKeys {
     GET_LATEST_LAKE_SNAPSHOT(1032, 0, 0, PUBLIC),
     LIMIT_SCAN(1033, 0, 0, PUBLIC),
     PREFIX_LOOKUP(1034, 0, 0, PUBLIC),
-    SCAN_KV(1035, 0, 0, PUBLIC),
-    SCANNER_KEEP_ALIVE(1036, 0, 0, PUBLIC),
-    GET_DATABASE_INFO(1037, 0, 0, PUBLIC),
-    CREATE_PARTITION(1038, 0, 0, PUBLIC),
-    DROP_PARTITION(1039, 0, 0, PUBLIC),
-    AUTHENTICATE(1040, 0, 0, PUBLIC),
-    CREATE_ACLS(1041, 0, 0, PUBLIC),
-    LIST_ACLS(1042, 0, 0, PUBLIC),
-    DROP_ACLS(1043, 0, 0, PUBLIC),
-    LAKE_TIERING_HEARTBEAT(1044, 0, 0, PRIVATE),
-    CONTROLLED_SHUTDOWN(1045, 0, 0, PRIVATE),
-    ALTER_TABLE(1046, 0, 0, PUBLIC),
-    DESCRIBE_CLUSTER_CONFIGS(1047, 0, 0, PUBLIC),
-    ALTER_CLUSTER_CONFIGS(1048, 0, 0, PUBLIC),
-    ADD_SERVER_TAG(1049, 0, 0, PUBLIC),
-    REMOVE_SERVER_TAG(1050, 0, 0, PUBLIC),
-    REBALANCE(1051, 0, 0, PUBLIC),
-    LIST_REBALANCE_PROGRESS(1052, 0, 0, PUBLIC),
-    CANCEL_REBALANCE(1053, 0, 0, PUBLIC),
-    PREPARE_LAKE_TABLE_SNAPSHOT(1054, 0, 0, PRIVATE);
+    GET_DATABASE_INFO(1035, 0, 0, PUBLIC),
+    CREATE_PARTITION(1036, 0, 0, PUBLIC),
+    DROP_PARTITION(1037, 0, 0, PUBLIC),
+    AUTHENTICATE(1038, 0, 0, PUBLIC),
+    CREATE_ACLS(1039, 0, 0, PUBLIC),
+    LIST_ACLS(1040, 0, 0, PUBLIC),
+    DROP_ACLS(1041, 0, 0, PUBLIC),
+    LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE),
+    CONTROLLED_SHUTDOWN(1043, 0, 0, PRIVATE),
+    ALTER_TABLE(1044, 0, 0, PUBLIC),
+    DESCRIBE_CLUSTER_CONFIGS(1045, 0, 0, PUBLIC),
+    ALTER_CLUSTER_CONFIGS(1046, 0, 0, PUBLIC),
+    ADD_SERVER_TAG(1047, 0, 0, PUBLIC),
+    REMOVE_SERVER_TAG(1048, 0, 0, PUBLIC),
+    REBALANCE(1049, 0, 0, PUBLIC),
+    LIST_REBALANCE_PROGRESS(1050, 0, 0, PUBLIC),
+    CANCEL_REBALANCE(1051, 0, 0, PUBLIC),
+    PREPARE_LAKE_TABLE_SNAPSHOT(1052, 0, 0, PRIVATE),
+    SCAN_KV(1053, 0, 0, PUBLIC),
+    SCANNER_KEEP_ALIVE(1054, 0, 0, PUBLIC);
 
     private static final Map<Integer, ApiKeys> ID_TO_TYPE =
             Arrays.stream(ApiKeys.values())
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto
index db9d614354..d828cb6105 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -255,6 +255,84 @@ message PrefixLookupResponse { } +// scan kv request and response +message ScanKvRequest { + // If continuing an existing scan, then you must set scanner_id. + // Otherwise, you must set 'new_scan_request'. + optional bytes scanner_id = 1; + optional PbScanReqForBucket bucket_scan_req = 2; + + // The sequence ID of this call. The sequence ID should start at 0 + // with the request for a new scanner, and after each successful request, + // the client should increment it by 1. When retrying a request, the client + // should _not_ increment this value. If the server detects that the client + // missed a chunk of rows from the middle of a scan, it will respond with an + // error. + optional uint32 call_seq_id = 3; + + // The maximum number of bytes to send in the response. + optional uint32 batch_size_bytes = 4; + + // If set, the server will close the scanner after responding to + // this request, regardless of whether all rows have been delivered. + optional bool close_scanner = 5; +} + +message PbScanReqForBucket { + // The tablet to scan. + required int64 table_id = 1; + optional int64 partition_id = 2; + required int32 bucket_id = 3; + + // The maximum number of rows to scan with the new scanner. + // + // The scanner will automatically stop yielding results and close itself + // after reaching this number of result rows. + optional uint64 limit = 4; +} + +message ScanKvResponse { + // The error, if an error occurred with this request. + optional int32 error_code = 1; + optional string error_message = 2; + + // When a scanner is created, returns the scanner ID which may be used + // to pull new rows from the scanner. + optional bytes scanner_id = 3; + + // Set to true to indicate that there may be further results to be fetched + // from this scanner. If the scanner has no more results, then the scanner + // ID will become invalid and cannot continue to be used. + // + // Note that if a scan returns no results, then the initial response from + // the first RPC may return false in this flag, in which case there will + // be no scanner ID assigned. + optional bool has_more_results = 4; + + // The block of returned rows. + // + // NOTE: the schema-related fields will not be present in this row block. + // The schema will match the schema requested by the client when it created + // the scanner. + optional bytes records = 5; + + // Returns the corresponding log offset at the time the scanner is created + optional int64 log_offset = 6; +} + +// A scanner keep-alive request. +// Updates the scanner access time, increasing its time-to-live. +message ScannerKeepAliveRequest { + required bytes scanner_id = 1; +} + +message ScannerKeepAliveResponse { + // The error, if an error occurred with this request. 
+  optional int32 error_code = 1;
+  optional string error_message = 2;
+}
+
+
 // limit scan request and response
 message LimitScanRequest {
   required int64 table_id = 2;

From c8f5464779df915a17dce4d7222371a4512b6be6 Mon Sep 17 00:00:00 2001
From: ipolyzos
Date: Sun, 18 Jan 2026 23:28:25 +0200
Subject: [PATCH 6/7] fix violation

---
 .../java/org/apache/fluss/server/kv/scan/ScannerManager.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java
index 4d2b8f0e48..4beac6e1d9 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java
@@ -23,6 +23,7 @@
 import org.apache.fluss.server.kv.KvTablet;
 import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
 import org.apache.fluss.utils.AutoCloseableAsync;
+import org.apache.fluss.utils.MapUtils;
 import org.apache.fluss.utils.clock.Clock;
 import org.apache.fluss.utils.clock.SystemClock;
 import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
@@ -39,7 +40,6 @@
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -48,7 +48,7 @@ public class ScannerManager implements AutoCloseableAsync {
 
     private static final Logger LOG = LoggerFactory.getLogger(ScannerManager.class);
 
-    private final Map<UUID, ScannerContext> scanners = new ConcurrentHashMap<>();
+    private final Map<UUID, ScannerContext> scanners = MapUtils.newConcurrentHashMap();
 
     private final ScheduledExecutorService cleanupExecutor;
     private final Clock clock;
     private final long scannerTtlMs;

From 1bd22d8a49f41eb665e82c8cc983e9a92337a3e9 Mon Sep 17 00:00:00 2001
From: ipolyzos
Date: Sun, 18 Jan 2026 23:35:09 +0200
Subject: [PATCH 7/7] update interface

---
 .../org/apache/fluss/client/table/scanner/SnapshotQuery.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SnapshotQuery.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SnapshotQuery.java
index 5fe8327b6f..26b6c2920a 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SnapshotQuery.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SnapshotQuery.java
@@ -23,10 +23,9 @@
 import org.apache.fluss.utils.CloseableIterator;
 
 /**
- * Used to configure and execute a snapshot query to read all current data in a table bucket for a
- * primary key table.
+ * Used to configure and execute a snapshot query to read all kv data of a primary key table.
  *
- * @since 0.6
+ * @since 0.9
  */
 @PublicEvolving
 public interface SnapshotQuery {
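
Taken together, the series leaves SnapshotQuery with two entry points: a per-bucket execute(TableBucket) and a whole-table execute() that enumerates partitions and buckets on the client (via MultiBucketBatchScannerIterator in TableSnapshotQuery). Below is a minimal usage sketch, assuming an open connection `conn` (as provided by ClientToServerITCaseBase) and the single-bucket primary key table created in the ITCases; everything here is inferred from the tests in this series, not additional API surface:

    Table table = conn.getTable(TablePath.of("test_db", "test_basic_scan"));

    // Whole-table snapshot query: partitions and buckets are enumerated
    // client-side and scanned one at a time, so rows arrive grouped by
    // bucket rather than globally ordered.
    try (CloseableIterator<InternalRow> rows = table.newSnapshotQuery().execute()) {
        while (rows.hasNext()) {
            InternalRow row = rows.next();
            // consume row, e.g. row.getInt(0) for "id", row.getString(1) for "name"
        }
    }

    // Per-bucket variant, equivalent to what snapshotQueryAll(table, bucket)
    // exercised before the tests were reworked in PATCH 4/7.
    TableBucket bucket = new TableBucket(table.getTableInfo().getTableId(), 0);
    try (CloseableIterator<InternalRow> rows = table.newSnapshotQuery().execute(bucket)) {
        while (rows.hasNext()) {
            rows.next();
        }
    }

Callers never touch ScanKvRequest, call_seq_id, or ScannerKeepAliveRequest directly: judging by its imports in PATCH 1/7, KvBatchScanner drives the paging loop and the keep-alive heartbeat against the scanner TTL enforced by ScannerManager, so closing the returned iterator should be sufficient to release the server-side scanner.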