diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionUpdateHandlers.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionResources.java similarity index 79% rename from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionUpdateHandlers.java rename to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionResources.java index 3967b76724bc..ef38773b7a33 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionUpdateHandlers.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionResources.java @@ -21,22 +21,26 @@ import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; /** - * Partition update handler container. + * Partition resource container: handlers and modification counter created during partition setup. */ -class PartitionUpdateHandlers { +class PartitionResources { final StorageUpdateHandler storageUpdateHandler; final IndexUpdateHandler indexUpdateHandler; final GcUpdateHandler gcUpdateHandler; - PartitionUpdateHandlers( + final PartitionModificationCounter modificationCounter; + + PartitionResources( StorageUpdateHandler storageUpdateHandler, IndexUpdateHandler indexUpdateHandler, - GcUpdateHandler gcUpdateHandler + GcUpdateHandler gcUpdateHandler, + PartitionModificationCounter modificationCounter ) { this.storageUpdateHandler = storageUpdateHandler; this.indexUpdateHandler = indexUpdateHandler; this.gcUpdateHandler = gcUpdateHandler; + this.modificationCounter = modificationCounter; } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 38adc3c38569..efa963281328 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -109,7 +109,6 @@ import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.Revisions; import org.apache.ignite.internal.metrics.MetricManager; -import org.apache.ignite.internal.network.InternalClusterNode; import org.apache.ignite.internal.network.MessagingService; import org.apache.ignite.internal.network.TopologyService; import org.apache.ignite.internal.partition.replicator.LocalBeforeReplicaStartEventParameters; @@ -120,13 +119,12 @@ import org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; import org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager; import org.apache.ignite.internal.partition.replicator.schema.ExecutorInclinedSchemaSyncService; import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource; import org.apache.ignite.internal.placementdriver.PlacementDriver; import org.apache.ignite.internal.placementdriver.wrappers.ExecutorInclinedPlacementDriver; -import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner; -import org.apache.ignite.internal.raft.service.RaftCommandRunner; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.ZonePartitionId; @@ -148,17 +146,11 @@ import org.apache.ignite.internal.table.StreamerReceiverRunner; import org.apache.ignite.internal.table.TableImpl; import org.apache.ignite.internal.table.TableViewInternal; -import org.apache.ignite.internal.table.distributed.PartitionModificationCounterFactory.SizeSupplier; -import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler; import org.apache.ignite.internal.table.distributed.gc.MvGc; import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage; -import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService; import org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor; import org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser; -import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionMvStorageAccessImpl; -import org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage; -import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener; import org.apache.ignite.internal.table.distributed.schema.SchemaVersions; import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl; import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; @@ -170,14 +162,11 @@ import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry; import org.apache.ignite.internal.tx.impl.TransactionInflights; -import org.apache.ignite.internal.tx.impl.TransactionStateResolver; import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource; import org.apache.ignite.internal.util.CompletableFutures; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; -import org.apache.ignite.internal.util.Lazy; import org.apache.ignite.internal.util.LongPriorityQueue; -import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.table.QualifiedName; @@ -266,8 +255,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { private final ClockService clockService; - private final OutgoingSnapshotsManager outgoingSnapshotsManager; - private final SchemaSyncService executorInclinedSchemaSyncService; private final CatalogService catalogService; @@ -288,28 +275,17 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { private final SchemaVersions schemaVersions; - private final ValidationSchemasSource validationSchemasSource; - private final PartitionReplicatorNodeRecovery partitionReplicatorNodeRecovery; /** Ends at the {@link IgniteComponent#stopAsync(ComponentContext)} with an {@link NodeStoppingException}. */ private final CompletableFuture stopManagerFuture = new CompletableFuture<>(); - private final ReplicationConfiguration replicationConfiguration; - - /** - * Executes partition operations (that might cause I/O and/or be blocked on locks). - */ - private final Executor partitionOperationsExecutor; - /** Marshallers provider. */ private final ReflectionMarshallersProvider marshallers = new ReflectionMarshallersProvider(); /** Index chooser for full state transfer. */ private final FullStateTransferIndexChooser fullStateTransferIndexChooser; - private final RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry; - private final TransactionInflights transactionInflights; private final String nodeName; @@ -319,8 +295,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { @Nullable private ScheduledExecutorService streamerFlushExecutor; - private final IndexMetaStorage indexMetaStorage; - private final MinimumRequiredTimeCollectorService minTimeCollectorService; @Nullable @@ -349,10 +323,11 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { private final MetricManager metricManager; - private final PartitionModificationCounterFactory partitionModificationCounterFactory; private final Map partModCounterMetricSources = new ConcurrentHashMap<>(); private final Map pendingWriteIntentsSuppliers = new ConcurrentHashMap<>(); + private final TablePartitionResourcesFactory partitionResourcesFactory; + /** * Creates a new table manager. * @@ -421,25 +396,18 @@ public TableManager( this.dataStorageMgr = dataStorageMgr; this.metaStorageMgr = metaStorageMgr; this.schemaManager = schemaManager; - this.validationSchemasSource = validationSchemasSource; this.ioExecutor = ioExecutor; - this.partitionOperationsExecutor = partitionOperationsExecutor; this.clockService = clockService; - this.outgoingSnapshotsManager = outgoingSnapshotsManager; this.catalogService = catalogService; this.failureProcessor = failureProcessor; this.observableTimestampTracker = observableTimestampTracker; this.sql = sql; - this.replicationConfiguration = replicationConfiguration; - this.remotelyTriggeredResourceRegistry = remotelyTriggeredResourceRegistry; this.lowWatermark = lowWatermark; this.transactionInflights = transactionInflights; this.nodeName = nodeName; - this.indexMetaStorage = indexMetaStorage; this.partitionReplicaLifecycleManager = partitionReplicaLifecycleManager; this.minTimeCollectorService = minTimeCollectorService; this.metricManager = metricManager; - this.partitionModificationCounterFactory = partitionModificationCounterFactory; this.executorInclinedSchemaSyncService = new ExecutorInclinedSchemaSyncService(schemaSyncService, partitionOperationsExecutor); this.executorInclinedPlacementDriver = new ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor); @@ -465,6 +433,30 @@ public TableManager( fullStateTransferIndexChooser = new FullStateTransferIndexChooser(catalogService, lowWatermark, indexMetaStorage); + partitionResourcesFactory = new TablePartitionResourcesFactory( + txManager, + lockMgr, + scanRequestExecutor, + clockService, + catalogService, + partitionModificationCounterFactory, + outgoingSnapshotsManager, + lowWatermark, + validationSchemasSource, + this.executorInclinedSchemaSyncService, + this.executorInclinedPlacementDriver, + topologyService, + remotelyTriggeredResourceRegistry, + failureProcessor, + schemaManager, + replicationConfiguration, + partitionOperationsExecutor, + indexMetaStorage, + minTimeCollectorService, + mvGc, + fullStateTransferIndexChooser + ); + rebalanceRetryDelayConfiguration = new SystemDistributedConfigurationPropertyHolder<>( systemDistributedConfiguration, (v, r) -> {}, @@ -860,65 +852,49 @@ private void preparePartitionResourcesAndLoadToZoneReplicaBusy( return; } - PartitionDataStorage partitionDataStorage = partitionDataStorage( + PartitionDataStorage partitionDataStorage = partitionResourcesFactory.createPartitionDataStorage( new PartitionKey(zonePartitionId.zoneId(), partId), tableId, mvPartitionStorage ); - PartitionUpdateHandlers partitionUpdateHandlers = createPartitionUpdateHandlers( + PartitionResources partitionResources = partitionResourcesFactory.createPartitionResources( partId, partitionDataStorage, table, - resources.safeTimeTracker(), - replicationConfiguration, - onNodeRecovery + resources.safeTimeTracker() ); - mvGc.addStorage(tablePartitionId, partitionUpdateHandlers.gcUpdateHandler); + partitionResources.storageUpdateHandler.start(onNodeRecovery); + + registerPartitionTableStatsMetrics(table, partId, partitionResources); + + mvGc.addStorage(tablePartitionId, partitionResources.gcUpdateHandler); minTimeCollectorService.addPartition(new TablePartitionId(tableId, partId)); - TablePartitionReplicaProcessorFactory createListener = (raftClient, transactionStateResolver) -> createReplicaListener( - zonePartitionId, - table, - resources.safeTimeTracker(), - mvPartitionStorage, - partitionUpdateHandlers, - raftClient, - transactionStateResolver - ); + TablePartitionReplicaProcessorFactory createListener = (raftClient, transactionStateResolver) -> + partitionResourcesFactory.createReplicaListener( + zonePartitionId, + table, + resources.safeTimeTracker(), + mvPartitionStorage, + partitionResources, + raftClient, + transactionStateResolver + ); - var tablePartitionRaftListener = new TablePartitionProcessor( - txManager, - partitionDataStorage, - partitionUpdateHandlers.storageUpdateHandler, - catalogService, - table.schemaView(), - indexMetaStorage, - topologyService.localMember().id(), - minTimeCollectorService, - executorInclinedPlacementDriver, - clockService, - zonePartitionId - ); + TablePartitionProcessor tablePartitionProcessor = partitionResourcesFactory.createTablePartitionProcessor( + zonePartitionId, table, partitionDataStorage, partitionResources); - var partitionStorageAccess = new PartitionMvStorageAccessImpl( - partId, - table.internalTable().storage(), - mvGc, - partitionUpdateHandlers.indexUpdateHandler, - partitionUpdateHandlers.gcUpdateHandler, - fullStateTransferIndexChooser, - schemaManager.schemaRegistry(tableId), - lowWatermark - ); + PartitionMvStorageAccess partitionStorageAccess = partitionResourcesFactory.createPartitionMvStorageAccess( + partId, table, partitionResources); partitionReplicaLifecycleManager.loadTableListenerToZoneReplica( zonePartitionId, tableId, createListener, - tablePartitionRaftListener, + tablePartitionProcessor, partitionStorageAccess, onNodeRecovery ); @@ -998,56 +974,6 @@ private CompletableFuture onTableRename(RenameTableEventParameters parameters ); } - private PartitionReplicaListener createReplicaListener( - ZonePartitionId replicationGroupId, - TableViewInternal table, - PendingComparableValuesTracker safeTimeTracker, - MvPartitionStorage mvPartitionStorage, - PartitionUpdateHandlers partitionUpdateHandlers, - RaftCommandRunner raftClient, - TransactionStateResolver transactionStateResolver - ) { - int partitionIndex = replicationGroupId.partitionId(); - - return new PartitionReplicaListener( - mvPartitionStorage, - new ExecutorInclinedRaftCommandRunner(raftClient, partitionOperationsExecutor), - txManager, - lockMgr, - scanRequestExecutor, - replicationGroupId, - table.tableId(), - table.indexesLockers(partitionIndex), - new Lazy<>(() -> table.indexStorageAdapters(partitionIndex).get().get(table.pkId())), - () -> table.indexStorageAdapters(partitionIndex).get(), - clockService, - safeTimeTracker, - transactionStateResolver, - partitionUpdateHandlers.storageUpdateHandler, - validationSchemasSource, - localNode(), - executorInclinedSchemaSyncService, - catalogService, - executorInclinedPlacementDriver, - topologyService, - remotelyTriggeredResourceRegistry, - schemaManager.schemaRegistry(table.tableId()), - indexMetaStorage, - lowWatermark, - failureProcessor, - table.metrics() - ); - } - - private PartitionDataStorage partitionDataStorage(PartitionKey partitionKey, int tableId, MvPartitionStorage partitionStorage) { - return new SnapshotAwarePartitionDataStorage( - tableId, - partitionStorage, - outgoingSnapshotsManager, - partitionKey - ); - } - @Override public void beforeNodeStop() { if (!beforeStopGuard.compareAndSet(false, true)) { @@ -1569,6 +1495,38 @@ private CompletableFuture stopTablePartition(TablePartitionId tablePartiti // In case of colocation there shouldn't be any table replica and thus it shouldn't be stopped. minTimeCollectorService.removePartition(tablePartitionId); + unregisterPartitionMetrics(tablePartitionId, table.name()); + + return mvGc.removeStorage(tablePartitionId); + } + + private void registerPartitionTableStatsMetrics( + TableViewInternal table, + int partitionId, + PartitionResources partitionResources + ) { + PartitionTableStatsMetricSource metricSource = + new PartitionTableStatsMetricSource(table.tableId(), partitionId, partitionResources.modificationCounter); + + try { + // Only register this Metrics Source and do not enable it by default + // as it is intended for online troubleshooting purposes only (see IGNITE-27813). + metricManager.registerSource(metricSource); + + TablePartitionId tablePartitionId = new TablePartitionId(table.tableId(), partitionId); + + partModCounterMetricSources.put(tablePartitionId, metricSource); + } catch (Exception e) { + LOG.warn("Failed to register metrics source for table [name={}, partitionId={}].", e, table.name(), partitionId); + } + + pendingWriteIntentsSuppliers.put( + new TablePartitionId(table.tableId(), partitionId), + partitionResources.storageUpdateHandler::getPendingRowCount + ); + } + + private void unregisterPartitionMetrics(TablePartitionId tablePartitionId, String tableName) { PartitionTableStatsMetricSource metricSource = partModCounterMetricSources.remove(tablePartitionId); pendingWriteIntentsSuppliers.remove(tablePartitionId); if (metricSource != null) { @@ -1576,11 +1534,19 @@ private CompletableFuture stopTablePartition(TablePartitionId tablePartiti metricManager.unregisterSource(metricSource); } catch (Exception e) { String message = "Failed to unregister metrics source for table [name={}, partitionId={}]."; - LOG.warn(message, e, table.name(), tablePartitionId.partitionId()); + LOG.warn(message, e, tableName, tablePartitionId.partitionId()); } } + } - return mvGc.removeStorage(tablePartitionId); + private long totalPendingWriteIntents() { + long sum = 0; + + for (LongSupplier supplier : pendingWriteIntentsSuppliers.values()) { + sum += supplier.getAsLong(); + } + + return sum; } private CompletableFuture destroyPartitionStorages( @@ -1607,78 +1573,6 @@ private CompletableFuture destroyPartitionStorages( return allOf(destroyFutures.toArray(new CompletableFuture[]{})); } - private InternalClusterNode localNode() { - return topologyService.localMember(); - } - - private PartitionUpdateHandlers createPartitionUpdateHandlers( - int partitionId, - PartitionDataStorage partitionDataStorage, - TableViewInternal table, - PendingComparableValuesTracker safeTimeTracker, - ReplicationConfiguration replicationConfiguration, - boolean onNodeRecovery - ) { - TableIndexStoragesSupplier indexes = table.indexStorageAdapters(partitionId); - - IndexUpdateHandler indexUpdateHandler = new IndexUpdateHandler(indexes); - - GcUpdateHandler gcUpdateHandler = new GcUpdateHandler(partitionDataStorage, safeTimeTracker, indexUpdateHandler); - - SizeSupplier partSizeSupplier = () -> partitionDataStorage.getStorage().estimatedSize(); - - PartitionModificationCounter modificationCounter = - partitionModificationCounterFactory.create(partSizeSupplier, table::stalenessConfiguration, table.tableId(), partitionId); - - StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler( - partitionId, - partitionDataStorage, - indexUpdateHandler, - replicationConfiguration, - modificationCounter, - txManager - ); - - storageUpdateHandler.start(onNodeRecovery); - - registerPartitionTableStatsMetrics(table, partitionId, modificationCounter); - - pendingWriteIntentsSuppliers.put(new TablePartitionId(table.tableId(), partitionId), storageUpdateHandler::getPendingRowCount); - - return new PartitionUpdateHandlers(storageUpdateHandler, indexUpdateHandler, gcUpdateHandler); - } - - private void registerPartitionTableStatsMetrics( - TableViewInternal table, - int partitionId, - PartitionModificationCounter counter - ) { - PartitionTableStatsMetricSource metricSource = - new PartitionTableStatsMetricSource(table.tableId(), partitionId, counter); - - try { - // Only register this Metrics Source and do not enable it by default - // as it is intended for online troubleshooting purposes only. - metricManager.registerSource(metricSource); - - TablePartitionId tablePartitionId = new TablePartitionId(table.tableId(), partitionId); - - partModCounterMetricSources.put(tablePartitionId, metricSource); - } catch (Exception e) { - LOG.warn("Failed to register metrics source for table [name={}, partitionId={}].", e, table.name(), partitionId); - } - } - - private long totalPendingWriteIntents() { - long sum = 0; - - for (LongSupplier supplier : pendingWriteIntentsSuppliers.values()) { - sum += supplier.getAsLong(); - } - - return sum; - } - /** * Returns a cached table instance if it exists, {@code null} otherwise. Can return a table that is being stopped. * diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java new file mode 100644 index 000000000000..b5a35c50f17f --- /dev/null +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.failure.FailureProcessor; +import org.apache.ignite.internal.hlc.ClockService; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.lowwatermark.LowWatermark; +import org.apache.ignite.internal.network.TopologyService; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager; +import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource; +import org.apache.ignite.internal.placementdriver.LeasePlacementDriver; +import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner; +import org.apache.ignite.internal.raft.service.RaftCommandRunner; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration; +import org.apache.ignite.internal.schema.SchemaManager; +import org.apache.ignite.internal.schema.SchemaSyncService; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.internal.table.distributed.PartitionModificationCounterFactory.SizeSupplier; +import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler; +import org.apache.ignite.internal.table.distributed.gc.MvGc; +import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage; +import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; +import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService; +import org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor; +import org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionMvStorageAccessImpl; +import org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage; +import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener; +import org.apache.ignite.internal.tx.LockManager; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry; +import org.apache.ignite.internal.tx.impl.TransactionStateResolver; +import org.apache.ignite.internal.util.Lazy; +import org.apache.ignite.internal.util.PendingComparableValuesTracker; + +/** + * Stateless factory for creating partition-level resources: data storage wrappers, update handlers, and replica listeners. + * + *

This factory performs pure construction only — it does not start components, register metrics, + * or own any mutable state. Lifecycle management (start/stop, metric registration/deregistration) + * remains in {@link TableManager}. + * + *

Lifecycle ordering: the caller must invoke {@link StorageUpdateHandler#start} on the + * {@link PartitionResources#storageUpdateHandler} returned by {@link #createPartitionResources} before + * the constructed objects ({@link TablePartitionProcessor}, {@link PartitionMvStorageAccess}, + * {@link PartitionReplicaListener}) are used at runtime. + */ +class TablePartitionResourcesFactory { + private final TxManager txManager; + private final LockManager lockManager; + private final ExecutorService scanRequestExecutor; + private final ClockService clockService; + private final CatalogService catalogService; + private final PartitionModificationCounterFactory partitionModificationCounterFactory; + private final OutgoingSnapshotsManager outgoingSnapshotsManager; + private final LowWatermark lowWatermark; + private final ValidationSchemasSource validationSchemasSource; + private final SchemaSyncService schemaSyncService; + private final LeasePlacementDriver placementDriver; + private final TopologyService topologyService; + private final RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry; + private final FailureProcessor failureProcessor; + private final SchemaManager schemaManager; + private final ReplicationConfiguration replicationConfiguration; + private final Executor partitionOperationsExecutor; + private final IndexMetaStorage indexMetaStorage; + private final MinimumRequiredTimeCollectorService minTimeCollectorService; + private final MvGc mvGc; + private final FullStateTransferIndexChooser fullStateTransferIndexChooser; + + TablePartitionResourcesFactory( + TxManager txManager, + LockManager lockManager, + ExecutorService scanRequestExecutor, + ClockService clockService, + CatalogService catalogService, + PartitionModificationCounterFactory partitionModificationCounterFactory, + OutgoingSnapshotsManager outgoingSnapshotsManager, + LowWatermark lowWatermark, + ValidationSchemasSource validationSchemasSource, + SchemaSyncService schemaSyncService, + LeasePlacementDriver placementDriver, + TopologyService topologyService, + RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry, + FailureProcessor failureProcessor, + SchemaManager schemaManager, + ReplicationConfiguration replicationConfiguration, + Executor partitionOperationsExecutor, + IndexMetaStorage indexMetaStorage, + MinimumRequiredTimeCollectorService minTimeCollectorService, + MvGc mvGc, + FullStateTransferIndexChooser fullStateTransferIndexChooser + ) { + this.txManager = txManager; + this.lockManager = lockManager; + this.scanRequestExecutor = scanRequestExecutor; + this.clockService = clockService; + this.catalogService = catalogService; + this.partitionModificationCounterFactory = partitionModificationCounterFactory; + this.outgoingSnapshotsManager = outgoingSnapshotsManager; + this.lowWatermark = lowWatermark; + this.validationSchemasSource = validationSchemasSource; + this.schemaSyncService = schemaSyncService; + this.placementDriver = placementDriver; + this.topologyService = topologyService; + this.remotelyTriggeredResourceRegistry = remotelyTriggeredResourceRegistry; + this.failureProcessor = failureProcessor; + this.schemaManager = schemaManager; + this.replicationConfiguration = replicationConfiguration; + this.partitionOperationsExecutor = partitionOperationsExecutor; + this.indexMetaStorage = indexMetaStorage; + this.minTimeCollectorService = minTimeCollectorService; + this.mvGc = mvGc; + this.fullStateTransferIndexChooser = fullStateTransferIndexChooser; + } + + /** + * Creates a {@link PartitionDataStorage} for the given partition. + * + * @param partitionKey Partition key. + * @param tableId Table ID. + * @param partitionStorage MV partition storage. + * @return Partition data storage. + */ + PartitionDataStorage createPartitionDataStorage(PartitionKey partitionKey, int tableId, MvPartitionStorage partitionStorage) { + return new SnapshotAwarePartitionDataStorage( + tableId, + partitionStorage, + outgoingSnapshotsManager, + partitionKey + ); + } + + /** + * Creates partition resources (index update handler, GC update handler, storage update handler, modification counter). + * + *

The returned resources are not started — the caller must invoke + * {@link StorageUpdateHandler#start} on {@link PartitionResources#storageUpdateHandler} + * before the constructed partition objects are used at runtime. + * + * @param partitionId Partition ID. + * @param partitionDataStorage Partition data storage. + * @param table Table view. + * @param safeTimeTracker Safe time tracker. + * @return Partition resources. + */ + PartitionResources createPartitionResources( + int partitionId, + PartitionDataStorage partitionDataStorage, + TableViewInternal table, + PendingComparableValuesTracker safeTimeTracker + ) { + TableIndexStoragesSupplier indexes = table.indexStorageAdapters(partitionId); + + IndexUpdateHandler indexUpdateHandler = new IndexUpdateHandler(indexes); + + GcUpdateHandler gcUpdateHandler = new GcUpdateHandler(partitionDataStorage, safeTimeTracker, indexUpdateHandler); + + SizeSupplier partSizeSupplier = () -> partitionDataStorage.getStorage().estimatedSize(); + + PartitionModificationCounter modificationCounter = + partitionModificationCounterFactory.create(partSizeSupplier, table::stalenessConfiguration, table.tableId(), partitionId); + + StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler( + partitionId, + partitionDataStorage, + indexUpdateHandler, + replicationConfiguration, + modificationCounter, + txManager + ); + + return new PartitionResources(storageUpdateHandler, indexUpdateHandler, gcUpdateHandler, modificationCounter); + } + + /** + * Creates a {@link TablePartitionProcessor} for the given partition. + * + * @param zonePartitionId Zone partition ID. + * @param table Table view. + * @param partitionDataStorage Partition data storage. + * @param partitionResources Partition resources. + * @return Table partition processor. + */ + TablePartitionProcessor createTablePartitionProcessor( + ZonePartitionId zonePartitionId, + TableViewInternal table, + PartitionDataStorage partitionDataStorage, + PartitionResources partitionResources + ) { + return new TablePartitionProcessor( + txManager, + partitionDataStorage, + partitionResources.storageUpdateHandler, + catalogService, + table.schemaView(), + indexMetaStorage, + topologyService.localMember().id(), + minTimeCollectorService, + placementDriver, + clockService, + zonePartitionId + ); + } + + /** + * Creates a {@link PartitionMvStorageAccess} for the given partition. + * + * @param partitionId Partition ID. + * @param table Table view. + * @param partitionResources Partition resources. + * @return Partition MV storage access. + */ + PartitionMvStorageAccess createPartitionMvStorageAccess( + int partitionId, + TableViewInternal table, + PartitionResources partitionResources + ) { + return new PartitionMvStorageAccessImpl( + partitionId, + table.internalTable().storage(), + mvGc, + partitionResources.indexUpdateHandler, + partitionResources.gcUpdateHandler, + fullStateTransferIndexChooser, + schemaManager.schemaRegistry(table.tableId()), + lowWatermark + ); + } + + /** + * Creates a {@link PartitionReplicaListener} for the given partition. + * + * @param replicationGroupId Zone partition ID used as the replication group ID. + * @param table Table view. + * @param safeTimeTracker Safe time tracker. + * @param mvPartitionStorage MV partition storage. + * @param partitionResources Partition resources. + * @param raftClient Raft command runner. + * @param transactionStateResolver Transaction state resolver. + * @return Partition replica listener. + */ + PartitionReplicaListener createReplicaListener( + ZonePartitionId replicationGroupId, + TableViewInternal table, + PendingComparableValuesTracker safeTimeTracker, + MvPartitionStorage mvPartitionStorage, + PartitionResources partitionResources, + RaftCommandRunner raftClient, + TransactionStateResolver transactionStateResolver + ) { + int partitionIndex = replicationGroupId.partitionId(); + + return new PartitionReplicaListener( + mvPartitionStorage, + new ExecutorInclinedRaftCommandRunner(raftClient, partitionOperationsExecutor), + txManager, + lockManager, + scanRequestExecutor, + replicationGroupId, + table.tableId(), + table.indexesLockers(partitionIndex), + new Lazy<>(() -> table.indexStorageAdapters(partitionIndex).get().get(table.pkId())), + () -> table.indexStorageAdapters(partitionIndex).get(), + clockService, + safeTimeTracker, + transactionStateResolver, + partitionResources.storageUpdateHandler, + validationSchemasSource, + topologyService.localMember(), + schemaSyncService, + catalogService, + placementDriver, + topologyService, + remotelyTriggeredResourceRegistry, + schemaManager.schemaRegistry(table.tableId()), + indexMetaStorage, + lowWatermark, + failureProcessor, + table.metrics() + ); + } +} diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java index a1c60506c1b1..89eb454fd4db 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java @@ -155,6 +155,7 @@ import org.apache.ignite.internal.testframework.ExecutorServiceExtension; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.testframework.InjectExecutorService; +import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry; @@ -567,7 +568,7 @@ private void startComponents() throws Exception { replicationConfiguration, clusterService.messagingService(), clusterService.topologyService(), - null, + mock(LockManager.class), null, txManager, dsm, diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java index afa7bd16407a..e4f89aeaba30 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java @@ -93,6 +93,7 @@ import org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.network.InternalClusterNode; import org.apache.ignite.internal.network.MessagingService; +import org.apache.ignite.internal.network.TopologyService; import org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager; import org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager; import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource; @@ -266,6 +267,7 @@ void before() { assertThat("Catalog initialization", catalogManager.catalogInitializationFuture(), willCompleteSuccessfully()); when(clusterService.messagingService()).thenReturn(mock(MessagingService.class)); + when(clusterService.topologyService()).thenReturn(mock(TopologyService.class)); when(tm.transactionMetricsSource()).thenReturn(mock(TransactionMetricsSource.class)); diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java index f4acf416e403..b9cad1da0b06 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java @@ -71,7 +71,7 @@ public class TableTestUtils { new PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () -> 0, () -> new TableStatsStalenessConfiguration(0, 0)); /** No-op partition modification counter factory. */ - public static PartitionModificationCounterFactory NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY = + public static final PartitionModificationCounterFactory NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY = new PartitionModificationCounterFactory(() -> HybridTimestamp.MIN_VALUE, mock(MessagingService.class)) { @Override public PartitionModificationCounter create(