Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,17 @@
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.AdjustIsrRequest;
import org.apache.fluss.rpc.messages.CommitKvSnapshotRequest;
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestRequest;
import org.apache.fluss.rpc.messages.ControlledShutdownRequest;
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
import org.apache.fluss.rpc.messages.InitWriterRequest;
import org.apache.fluss.rpc.messages.InitWriterResponse;
import org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest;
import org.apache.fluss.rpc.messages.MetadataRequest;
import org.apache.fluss.rpc.messages.PrepareLakeTableSnapshotRequest;
import org.apache.fluss.rpc.metrics.TestingClientMetricGroup;
import org.apache.fluss.security.acl.AccessControlEntry;
import org.apache.fluss.security.acl.AccessControlEntryFilter;
Expand Down Expand Up @@ -825,6 +831,125 @@ void testDynamicConfigs() throws ExecutionException, InterruptedException {
ConfigEntry.ConfigSource.INITIAL_SERVER_CONFIG));
}

@Test
void testAdjustIsr() throws Exception {
AdjustIsrRequest request = new AdjustIsrRequest().setServerId(-1);

try (RpcClient rpcClient =
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance(), false)) {
CoordinatorGateway guestGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),
rpcClient,
CoordinatorGateway.class);

// test adjustIsr with external connection (CLIENT listener)
// External connections should be rejected
assertThatThrownBy(() -> guestGateway.adjustIsr(request).get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining("Only internal requests are permitted.");
}

// test adjustIsr with internal connection (FLUSS listener)
// Internal connections should bypass authorization check
CoordinatorGateway internalGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("FLUSS"),
FLUSS_CLUSTER_EXTENSION.getRpcClient(),
CoordinatorGateway.class);

// Even without any ACL permission, internal connection should succeed
// (won't throw AuthorizationException)
// The request may fail for other reasons (e.g., empty tables),
// but it should not fail due to authorization
internalGateway.adjustIsr(request).get();
}

@Test
void testCommitKvSnapshot() throws Exception {
CommitKvSnapshotRequest request =
new CommitKvSnapshotRequest()
.setCoordinatorEpoch(-1)
.setBucketLeaderEpoch(-1)
.setCompletedSnapshot(new byte[0]);

try (RpcClient rpcClient =
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance(), false)) {
CoordinatorGateway guestGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),
rpcClient,
CoordinatorGateway.class);

// test commitKvSnapshot with external connection (CLIENT listener)
// External connections should be rejected
assertThatThrownBy(() -> guestGateway.commitKvSnapshot(request).get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining("Only internal requests are permitted.");
}

// test commitKvSnapshot with internal connection (FLUSS listener)
// Internal connections should bypass authorization check
CoordinatorGateway internalGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("FLUSS"),
FLUSS_CLUSTER_EXTENSION.getRpcClient(),
CoordinatorGateway.class);

// Even without any ACL permission, internal connection should succeed
// (won't throw AuthorizationException)
// The request may fail for other reasons (e.g., invalid snapshot data),
// but it should not fail due to authorization
assertThatThrownBy(() -> internalGateway.commitKvSnapshot(request).get())
.rootCause()
.isNotInstanceOf(AuthorizationException.class);
}

@Test
void testCommitRemoteLogManifest() throws Exception {
CommitRemoteLogManifestRequest request =
new CommitRemoteLogManifestRequest()
.setTableId(-1)
.setBucketId(-1)
.setRemoteLogManifestPath("test-path")
.setRemoteLogStartOffset(0)
.setRemoteLogEndOffset(0)
.setCoordinatorEpoch(-1)
.setBucketLeaderEpoch(-1);

try (RpcClient rpcClient =
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance(), false)) {
CoordinatorGateway guestGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),
rpcClient,
CoordinatorGateway.class);

// test commitRemoteLogManifest with external connection (CLIENT listener)
// External connections should be rejected
assertThatThrownBy(() -> guestGateway.commitRemoteLogManifest(request).get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining("Only internal requests are permitted.");
}

// test commitRemoteLogManifest with internal connection (FLUSS listener)
// Internal connections should bypass authorization check
CoordinatorGateway internalGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("FLUSS"),
FLUSS_CLUSTER_EXTENSION.getRpcClient(),
CoordinatorGateway.class);

// Even without any ACL permission, internal connection should succeed
// (won't throw AuthorizationException)
// The request may fail for other reasons (e.g., invalid manifest data),
// but it should not fail due to authorization
internalGateway.commitRemoteLogManifest(request).get();
}

@Test
void testControlledShutdown() throws Exception {
ControlledShutdownRequest request =
Expand All @@ -838,14 +963,12 @@ void testControlledShutdown() throws Exception {
rpcClient,
CoordinatorGateway.class);

// test controlledShutdown without ALTER permission on cluster resource
// test controlledShutdown with external connection (CLIENT listener)
// External connections should be rejected
assertThatThrownBy(() -> guestGateway.controlledShutdown(request).get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining(
String.format(
"Principal %s have no authorization to operate ALTER on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));
.hasMessageContaining("Only internal requests are permitted.");
}

// test controlledShutdown with internal connection (FLUSS listener)
Expand Down Expand Up @@ -1020,6 +1143,126 @@ void testCancelRebalance() throws Exception {
guestAdmin.cancelRebalance(null).get();
}

@Test
void testPrepareLakeTableSnapshot() throws Exception {
PrepareLakeTableSnapshotRequest request = new PrepareLakeTableSnapshotRequest();

try (RpcClient rpcClient =
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance(), false)) {
CoordinatorGateway guestGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),
rpcClient,
CoordinatorGateway.class);

// test prepareLakeTableSnapshot without WRITE permission on cluster resource
assertThatThrownBy(() -> guestGateway.prepareLakeTableSnapshot(request).get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining(
String.format(
"Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));

// add WRITE permission to guest user on cluster resource
rootAdmin
.createAcls(
Collections.singletonList(
new AclBinding(
Resource.cluster(),
new AccessControlEntry(
guestPrincipal,
"*",
OperationType.WRITE,
PermissionType.ALLOW))))
.all()
.get();

// test prepareLakeTableSnapshot with WRITE permission should succeed
guestGateway.prepareLakeTableSnapshot(request).get();
}
}

@Test
void testCommitLakeTableSnapshot() throws Exception {
CommitLakeTableSnapshotRequest request = new CommitLakeTableSnapshotRequest();

try (RpcClient rpcClient =
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance(), false)) {
CoordinatorGateway guestGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),
rpcClient,
CoordinatorGateway.class);

// test commitLakeTableSnapshot without WRITE permission on cluster resource
assertThatThrownBy(() -> guestGateway.commitLakeTableSnapshot(request).get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining(
String.format(
"Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));

// add WRITE permission to guest user on cluster resource
rootAdmin
.createAcls(
Collections.singletonList(
new AclBinding(
Resource.cluster(),
new AccessControlEntry(
guestPrincipal,
"*",
OperationType.WRITE,
PermissionType.ALLOW))))
.all()
.get();

// test commitLakeTableSnapshot with WRITE permission should succeed
guestGateway.commitLakeTableSnapshot(request).get();
}
}

@Test
void testLakeTieringHeartbeat() throws Exception {
LakeTieringHeartbeatRequest request = new LakeTieringHeartbeatRequest();

try (RpcClient rpcClient =
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance(), false)) {
CoordinatorGateway guestGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),
rpcClient,
CoordinatorGateway.class);

// test lakeTieringHeartbeat without READ permission on cluster resource
assertThatThrownBy(() -> guestGateway.lakeTieringHeartbeat(request).get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining(
String.format(
"Principal %s have no authorization to operate READ on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));

// add READ permission to guest user on cluster resource
rootAdmin
.createAcls(
Collections.singletonList(
new AclBinding(
Resource.cluster(),
new AccessControlEntry(
guestPrincipal,
"*",
READ,
PermissionType.ALLOW))))
.all()
.get();

// test lakeTieringHeartbeat with READ permission should succeed
guestGateway.lakeTieringHeartbeat(request).get();
}
}

private static Configuration initConfig() {
Configuration conf = new Configuration();
conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.config.cluster.ConfigEntry;
import org.apache.fluss.exception.AuthorizationException;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.KvSnapshotNotExistException;
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
Expand Down Expand Up @@ -184,6 +185,18 @@ public void authorizeTable(OperationType operationType, TablePath tablePath) {
}
}

public void authorizeCluster(OperationType operationType) {
if (authorizer != null) {
authorizer.authorize(currentSession(), operationType, Resource.cluster());
}
}

public void authorizeInternal() {
if (!currentSession().isInternal()) {
throw new AuthorizationException("Only internal requests are permitted.");
}
}

@Override
public CompletableFuture<ApiVersionsResponse> apiVersions(ApiVersionsRequest request) {
Set<ApiKeys> apiKeys = apiManager.enabledApis();
Expand Down
Loading
Loading