Skip to content

Commit d59571d

Browse files
committed
address's Yang Wang's comments
1 parent dda591c commit d59571d

File tree

17 files changed

+93
-85
lines changed

17 files changed

+93
-85
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@
7272

7373
import java.util.Collection;
7474
import java.util.List;
75-
import java.util.Optional;
7675
import java.util.Map;
76+
import java.util.Optional;
7777
import java.util.Set;
7878
import java.util.concurrent.CompletableFuture;
7979

@@ -456,12 +456,12 @@ CompletableFuture<Void> releaseKvSnapshotLease(
456456
String leaseId, Set<TableBucket> bucketsToRelease);
457457

458458
/**
459-
* Drops the entire lease asynchronously.
459+
* Releases the entire lease asynchronously.
460460
*
461461
* <p>All snapshots locked under this {@code leaseId} will be released immediately. This is
462462
* equivalent to calling {@link #releaseKvSnapshotLease} with all held buckets.
463463
*
464-
* @param leaseId The lease id to drop.
464+
* @param leaseId The lease id to release.
465465
*/
466466
CompletableFuture<Void> dropKvSnapshotLease(String leaseId);
467467

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import org.apache.fluss.row.encode.KeyEncoder;
3838
import org.apache.fluss.server.zk.ZooKeeperClient;
3939
import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
40-
import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataHelper;
40+
import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
4141
import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
4242
import org.apache.fluss.types.DataTypes;
4343

@@ -205,8 +205,8 @@ public void testKvSnapshotLease() throws Exception {
205205

206206
ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
207207
String remoteDataDir = FLUSS_CLUSTER_EXTENSION.getRemoteDataDir();
208-
KvSnapshotLeaseMetadataHelper metadataHelper =
209-
new KvSnapshotLeaseMetadataHelper(zkClient, remoteDataDir);
208+
KvSnapshotLeaseMetadataManager metadataHelper =
209+
new KvSnapshotLeaseMetadataManager(zkClient, remoteDataDir);
210210

211211
assertThat(zkClient.getKvSnapshotLeasesList()).isEmpty();
212212

@@ -346,7 +346,7 @@ private void waitUntilAllSnapshotFinished(Set<TableBucket> tableBuckets, long sn
346346
}
347347

348348
private void checkKvSnapshotLeaseEquals(
349-
KvSnapshotLeaseMetadataHelper metadataHelper,
349+
KvSnapshotLeaseMetadataManager metadataHelper,
350350
String leaseId,
351351
long tableId,
352352
Long[] expectedBucketIndex)

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public class MetricNames {
5454
public static final String SERVER_PHYSICAL_STORAGE_REMOTE_KV_SIZE = "remoteKvSize";
5555

5656
// for kv snapshot lease.
57+
// TODO implemented it at the table level. Trace by: https://github.com/apache/fluss/issues/2297
5758
public static final String KV_SNAPSHOT_LEASE_COUNT = "kvSnapshotLeaseCount";
5859
public static final String LEASED_KV_SNAPSHOT_COUNT = "leasedKvSnapshotCount";
5960

fluss-common/src/test/java/org/apache/fluss/record/TestData.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ public final class TestData {
9797
currentMillis);
9898

9999
// for log table / partition table
100+
public static final TablePath PARTITION_TABLE_PATH =
101+
new TablePath("test_db_1", "test_partition_table");
102+
public static final long PARTITION_TABLE_ID = 150008L;
103+
100104
public static final TableDescriptor DATA1_PARTITIONED_TABLE_DESCRIPTOR =
101105
TableDescriptor.builder()
102106
.schema(DATA1_SCHEMA)
@@ -107,6 +111,16 @@ public final class TestData {
107111
ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT,
108112
AutoPartitionTimeUnit.YEAR)
109113
.build();
114+
115+
public static final TableInfo PARTITION_TABLE_INFO =
116+
TableInfo.of(
117+
PARTITION_TABLE_PATH,
118+
PARTITION_TABLE_ID,
119+
1,
120+
DATA1_PARTITIONED_TABLE_DESCRIPTOR,
121+
System.currentTimeMillis(),
122+
System.currentTimeMillis());
123+
110124
public static final PhysicalTablePath DATA1_PHYSICAL_TABLE_PATH_PA_2024 =
111125
PhysicalTablePath.of(DATA1_TABLE_PATH, "2024");
112126

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/DropKvSnapshotLeaseProcedure.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.apache.flink.table.annotation.ProcedureHint;
2323
import org.apache.flink.table.procedure.ProcedureContext;
2424

25-
/** Procedure to drop kv snapshot lease. */
25+
/** Procedure to release kv snapshot lease. */
2626
public class DropKvSnapshotLeaseProcedure extends ProcedureBase {
2727

2828
@ProcedureHint(

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,8 @@ private SourceEnumeratorState deserializeV0(byte[] serialized) throws IOExceptio
199199
return new SourceEnumeratorState(
200200
assignBucketAndPartitions.f0,
201201
assignBucketAndPartitions.f1,
202-
remainingHybridLakeFlussSplits, leaseContext);
202+
remainingHybridLakeFlussSplits,
203+
leaseContext);
203204
}
204205

205206
private SourceEnumeratorState deserializeV1(byte[] serialized) throws IOException {
@@ -213,14 +214,14 @@ private SourceEnumeratorState deserializeV1(byte[] serialized) throws IOExceptio
213214
// this logic no longer depends on the lakeSource flag. This unconditional
214215
// deserialization is the intended behavior change compared to VERSION_0.
215216

216-
217217
// deserialize lease context
218218
LeaseContext leaseContext = deserializeLeaseContext(in);
219219

220220
return new SourceEnumeratorState(
221221
assignBucketAndPartitions.f0,
222222
assignBucketAndPartitions.f1,
223-
remainingHybridLakeFlussSplits, leaseContext);
223+
remainingHybridLakeFlussSplits,
224+
leaseContext);
224225
}
225226

226227
private Tuple2<Set<TableBucket>, Map<Long, String>> deserializeAssignBucketAndPartitions(

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,11 @@
1717

1818
package org.apache.fluss.flink.procedure;
1919

20-
import org.apache.fluss.client.Connection;
21-
import org.apache.fluss.client.ConnectionFactory;
22-
import org.apache.fluss.client.admin.Admin;
23-
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
2420
import org.apache.fluss.client.Connection;
2521
import org.apache.fluss.client.ConnectionFactory;
2622
import org.apache.fluss.client.admin.Admin;
2723
import org.apache.fluss.client.metadata.KvSnapshots;
24+
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
2825
import org.apache.fluss.cluster.rebalance.ServerTag;
2926
import org.apache.fluss.config.ConfigOptions;
3027
import org.apache.fluss.config.Configuration;
@@ -34,7 +31,6 @@
3431
import org.apache.fluss.metadata.DataLakeFormat;
3532
import org.apache.fluss.metadata.TablePath;
3633
import org.apache.fluss.row.InternalRow;
37-
import org.apache.fluss.metadata.TablePath;
3834
import org.apache.fluss.server.testutils.FlussClusterExtension;
3935
import org.apache.fluss.server.zk.ZooKeeperClient;
4036
import org.apache.fluss.server.zk.data.ServerTags;
@@ -46,7 +42,6 @@
4642
import org.apache.flink.util.CollectionUtil;
4743
import org.junit.jupiter.api.AfterEach;
4844
import org.junit.jupiter.api.BeforeAll;
49-
import org.junit.jupiter.api.BeforeAll;
5045
import org.junit.jupiter.api.BeforeEach;
5146
import org.junit.jupiter.api.Test;
5247
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -64,10 +59,10 @@
6459
import static org.apache.fluss.cluster.rebalance.ServerTag.PERMANENT_OFFLINE;
6560
import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
6661
import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
67-
import static org.apache.fluss.testutils.DataTestUtils.row;
68-
import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
6962
import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
63+
import static org.apache.fluss.testutils.DataTestUtils.row;
7064
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
65+
import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
7166
import static org.assertj.core.api.Assertions.assertThat;
7267
import static org.assertj.core.api.Assertions.assertThatThrownBy;
7368

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.fluss.lake.source.TestingLakeSplit;
2828
import org.apache.fluss.metadata.TableBucket;
2929

30+
import org.junit.jupiter.api.Test;
3031
import org.junit.jupiter.params.ParameterizedTest;
3132
import org.junit.jupiter.params.provider.ValueSource;
3233

@@ -117,7 +118,8 @@ void testV0Compatibility() throws Exception {
117118
assignedPartitions.put(1L, "partition1");
118119
assignedPartitions.put(2L, "partition2");
119120
SourceEnumeratorState sourceEnumeratorState =
120-
new SourceEnumeratorState(assignedBuckets, assignedPartitions, null);
121+
new SourceEnumeratorState(
122+
assignedBuckets, assignedPartitions, null, new LeaseContext(null, null));
121123
byte[] serialized = serializer.serializeV0(sourceEnumeratorState);
122124

123125
// then deserialize
@@ -134,7 +136,10 @@ void testV0Compatibility() throws Exception {
134136
remainingHybridLakeFlussSplits.add(logSplit);
135137
sourceEnumeratorState =
136138
new SourceEnumeratorState(
137-
assignedBuckets, assignedPartitions, remainingHybridLakeFlussSplits);
139+
assignedBuckets,
140+
assignedPartitions,
141+
remainingHybridLakeFlussSplits,
142+
new LeaseContext(null, null));
138143

139144
serialized = serializer.serializeV0(sourceEnumeratorState);
140145

@@ -155,7 +160,8 @@ void testInconsistentLakeSourceSerde() throws Exception {
155160
assignedPartitions.put(1L, "partition1");
156161
assignedPartitions.put(2L, "partition2");
157162
SourceEnumeratorState sourceEnumeratorState =
158-
new SourceEnumeratorState(assignedBuckets, assignedPartitions, null);
163+
new SourceEnumeratorState(
164+
assignedBuckets, assignedPartitions, null, new LeaseContext(null, null));
159165
byte[] serialized = serializer.serialize(sourceEnumeratorState);
160166

161167
// test deserialize with nonnull lake source

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@
6060
import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse;
6161
import org.apache.fluss.rpc.messages.PbCommitLakeTableSnapshotRespForTable;
6262
import org.apache.fluss.rpc.messages.PbKvSnapshotLeaseForBucket;
63-
import org.apache.fluss.rpc.messages.ReleaseKvSnapshotLeaseResponse;
6463
import org.apache.fluss.rpc.messages.RebalanceResponse;
64+
import org.apache.fluss.rpc.messages.ReleaseKvSnapshotLeaseResponse;
6565
import org.apache.fluss.rpc.messages.RemoveServerTagResponse;
6666
import org.apache.fluss.rpc.protocol.ApiError;
6767
import org.apache.fluss.server.coordinator.event.AccessContextEvent;
@@ -89,8 +89,8 @@
8989
import org.apache.fluss.server.coordinator.event.NotifyKvSnapshotOffsetEvent;
9090
import org.apache.fluss.server.coordinator.event.NotifyLakeTableOffsetEvent;
9191
import org.apache.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent;
92-
import org.apache.fluss.server.coordinator.event.ReleaseKvSnapshotLeaseEvent;
9392
import org.apache.fluss.server.coordinator.event.RebalanceEvent;
93+
import org.apache.fluss.server.coordinator.event.ReleaseKvSnapshotLeaseEvent;
9494
import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent;
9595
import org.apache.fluss.server.coordinator.event.SchemaChangeEvent;
9696
import org.apache.fluss.server.coordinator.event.watcher.TableChangeWatcher;
@@ -125,7 +125,7 @@
125125
import org.apache.fluss.server.zk.data.lake.LakeTable;
126126
import org.apache.fluss.server.zk.data.lake.LakeTableHelper;
127127
import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
128-
import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataHelper;
128+
import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
129129
import org.apache.fluss.utils.clock.Clock;
130130
import org.apache.fluss.utils.types.Tuple2;
131131

@@ -244,7 +244,7 @@ public CoordinatorEventProcessor(
244244
this.kvSnapshotLeaseManager =
245245
new KvSnapshotLeaseManager(
246246
conf,
247-
new KvSnapshotLeaseMetadataHelper(zooKeeperClient, remoteDataDir),
247+
new KvSnapshotLeaseMetadataManager(zooKeeperClient, remoteDataDir),
248248
coordinatorContext,
249249
clock,
250250
coordinatorMetricGroup);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,9 @@
128128
import org.apache.fluss.server.coordinator.event.ControlledShutdownEvent;
129129
import org.apache.fluss.server.coordinator.event.DropKvSnapshotLeaseEvent;
130130
import org.apache.fluss.server.coordinator.event.EventManager;
131-
import org.apache.fluss.server.coordinator.event.ReleaseKvSnapshotLeaseEvent;
132131
import org.apache.fluss.server.coordinator.event.ListRebalanceProgressEvent;
133132
import org.apache.fluss.server.coordinator.event.RebalanceEvent;
133+
import org.apache.fluss.server.coordinator.event.ReleaseKvSnapshotLeaseEvent;
134134
import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent;
135135
import org.apache.fluss.server.coordinator.rebalance.goal.Goal;
136136
import org.apache.fluss.server.entity.CommitKvSnapshotData;

0 commit comments

Comments
 (0)