Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.ignite.client.fakes;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;

import java.util.Collection;
Expand All @@ -39,7 +38,6 @@
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.PartitionEnlistment;
import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
import org.apache.ignite.internal.tx.TransactionMeta;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
Expand Down Expand Up @@ -212,16 +210,6 @@ public boolean isRolledBackWithTimeoutExceeded() {
return null;
}

@Override
public CompletableFuture<@Nullable TransactionMeta> checkEnlistedPartitionsAndAbortIfNeeded(
TxStateMeta txMeta,
InternalTransaction tx,
long currentEnlistmentConsistencyToken,
ZonePartitionId senderGroupId
) {
return completedFuture(stateMeta(tx.id()));
}

@Override
public <T extends TxStateMeta> T updateTxMeta(UUID txId, Function<TxStateMeta, TxStateMeta> updater) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1549,6 +1549,7 @@ private class Node {
threadPoolsManager.tableIoExecutor(),
threadPoolsManager.rebalanceScheduler(),
threadPoolsManager.partitionOperationsExecutor(),
threadPoolsManager.commonScheduler(),
clockService,
placementDriver,
schemaSyncService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.Lazy;

/**
* Component is responsible for building indexes and making them {@link CatalogIndexStatus#AVAILABLE available}. Both in a running cluster
Expand Down Expand Up @@ -120,7 +122,10 @@ public IndexBuildingManager(
clusterService.topologyService(),
clusterService.messagingService(),
new ExecutorInclinedPlacementDriver(placementDriver, executor),
new TxMessageSender(clusterService.messagingService(), replicaService, clockService)
new TxMessageSender(clusterService.messagingService(), replicaService, clockService),
new TxRecoveryEngine(txManager, clusterService.topologyService()),
new Lazy<>(() -> clusterService.topologyService().localMember()),
executor
);

indexBuilder = new IndexBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,7 @@ public CompletableFuture<Boolean> invoke(Condition condition, Operation success,
threadPoolsManager.tableIoExecutor(),
threadPoolsManager.rebalanceScheduler(),
threadPoolsManager.partitionOperationsExecutor(),
threadPoolsManager.commonScheduler(),
clockService,
placementDriverManager.placementDriver(),
schemaSyncService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,14 @@
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
import org.apache.ignite.internal.tx.storage.state.TxStateStorageRebalanceException;
import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.LongPriorityQueue;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.util.SafeTimeValuesTracker;
Expand Down Expand Up @@ -287,6 +289,8 @@ public class PartitionReplicaLifecycleManager extends

private final TxMessageSender txMessageSender;

private final TxRecoveryEngine txRecoveryEngine;

private final EventListener<CreateZoneEventParameters> onCreateZoneListener = this::onCreateZone;
private final EventListener<PrimaryReplicaEventParameters> onPrimaryReplicaExpiredListener = this::onPrimaryReplicaExpired;
private final EventListener<DropZoneEventParameters> onZoneDropListener = fromConsumer(this::onZoneDrop);
Expand All @@ -312,6 +316,7 @@ public class PartitionReplicaLifecycleManager extends
* @param rebalanceScheduler Executor for scheduling rebalance routine.
* @param partitionOperationsExecutor Striped executor on which partition operations (potentially requiring I/O with storages)
* will be executed.
* @param commonExecutor Common executor.
* @param clockService Clock service.
* @param placementDriver Placement driver.
* @param schemaSyncService Schema synchronization service.
Expand All @@ -336,6 +341,7 @@ public PartitionReplicaLifecycleManager(
ExecutorService ioExecutor,
ScheduledExecutorService rebalanceScheduler,
Executor partitionOperationsExecutor,
Executor commonExecutor,
ClockService clockService,
PlacementDriver placementDriver,
SchemaSyncService schemaSyncService,
Expand All @@ -360,6 +366,7 @@ public PartitionReplicaLifecycleManager(
ioExecutor,
rebalanceScheduler,
partitionOperationsExecutor,
commonExecutor,
clockService,
placementDriver,
schemaSyncService,
Expand Down Expand Up @@ -395,6 +402,7 @@ public PartitionReplicaLifecycleManager(
ExecutorService ioExecutor,
ScheduledExecutorService rebalanceScheduler,
Executor partitionOperationsExecutor,
Executor commonExecutor,
ClockService clockService,
PlacementDriver placementDriver,
SchemaSyncService schemaSyncService,
Expand Down Expand Up @@ -434,6 +442,8 @@ public PartitionReplicaLifecycleManager(
Integer::parseInt
);

txRecoveryEngine = new TxRecoveryEngine(txManager, topologyService);

txMessageSender = new TxMessageSender(
messagingService,
replicaService,
Expand All @@ -446,7 +456,10 @@ public PartitionReplicaLifecycleManager(
topologyService,
messagingService,
executorInclinedPlacementDriver,
txMessageSender
txMessageSender,
txRecoveryEngine,
new Lazy<>(topologyService::localMember),
commonExecutor
);

pendingAssignmentsRebalanceListener = createPendingAssignmentsRebalanceListener();
Expand Down Expand Up @@ -838,7 +851,8 @@ private CompletableFuture<?> createZonePartitionReplicationNode(
topologyService.localMember(),
zonePartitionId,
transactionStateResolver,
txMessageSender
txMessageSender,
txRecoveryEngine
);

zoneResources.replicaListenerFuture().complete(replicaListener);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@
import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
import org.apache.ignite.internal.replicator.message.TableAware;
import org.apache.ignite.internal.schema.SchemaSyncService;
import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
import org.apache.ignite.internal.tx.message.TxCleanupRecoveryRequest;
import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
Expand Down Expand Up @@ -120,7 +120,8 @@ public ZonePartitionReplicaListener(
InternalClusterNode localNode,
ZonePartitionId replicationGroupId,
TransactionStateResolver transactionStateResolver,
TxMessageSender txMessageSender
TxMessageSender txMessageSender,
TxRecoveryEngine txRecoveryEngine
) {
this.raftClient = raftClient;
this.failureProcessor = failureProcessor;
Expand All @@ -143,13 +144,6 @@ public ZonePartitionReplicaListener(

ReplicationRaftCommandApplicator raftCommandApplicator = new ReplicationRaftCommandApplicator(raftClient, replicationGroupId);

TxRecoveryEngine txRecoveryEngine = new TxRecoveryEngine(
txManager,
clusterNodeResolver,
replicationGroupId,
ZonePartitionReplicaListener::createAbandonedTxRecoveryEnlistment
);

// Request handlers initialization.

txFinishReplicaRequestHandler = new TxFinishReplicaRequestHandler(
Expand Down Expand Up @@ -177,12 +171,19 @@ public ZonePartitionReplicaListener(
txStatePartitionStorage,
txManager,
clusterNodeResolver,
localNode,
txRecoveryEngine,
txMessageSender
txMessageSender,
replicationGroupId,
localNode
);

txRecoveryMessageHandler = new TxRecoveryMessageHandler(txStatePartitionStorage, replicationGroupId, txRecoveryEngine, txManager);
txRecoveryMessageHandler = new TxRecoveryMessageHandler(
txStatePartitionStorage,
replicationGroupId,
txRecoveryEngine,
txManager,
localNode
);

txCleanupRecoveryRequestHandler = new TxCleanupRecoveryRequestHandler(
txStatePartitionStorage,
Expand All @@ -201,13 +202,6 @@ public ZonePartitionReplicaListener(
replicaSafeTimeSyncRequestHandler = new ReplicaSafeTimeSyncRequestHandler(clockService, raftCommandApplicator);
}

private static PendingTxPartitionEnlistment createAbandonedTxRecoveryEnlistment(InternalClusterNode node) {
// Enlistment consistency token is not required for the rollback, so it is 0L.
// Passing an empty set of table IDs as we don't know which tables were enlisted; this is ok as the corresponding write intents
// can still be resolved later when reads stumble upon them.
return new PendingTxPartitionEnlistment(node.name(), 0L);
}

@Override
public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, UUID senderId) {
return replicaPrimacyEngine.validatePrimacy(request)
Expand Down Expand Up @@ -237,7 +231,7 @@ private CompletableFuture<?> processRequest(
} else if (request instanceof WriteIntentSwitchReplicaRequest) {
return writeIntentSwitchRequestHandler.handle((WriteIntentSwitchReplicaRequest) request, senderId);
} else if (request instanceof TxStateCommitPartitionRequest) {
return txStateCommitPartitionReplicaRequestHandler.handle((TxStateCommitPartitionRequest) request);
return txStateCommitPartitionReplicaRequestHandler.handle((TxStateCommitPartitionRequest) request, senderId);
} else if (request instanceof TxRecoveryMessage) {
return txRecoveryMessageHandler.handle((TxRecoveryMessage) request, senderId);
} else if (request instanceof TxCleanupRecoveryRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.partition.replicator.TxRecoveryEngine;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.tx.TransactionLogUtils;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;

Expand All @@ -41,18 +42,21 @@ public class TxRecoveryMessageHandler {
private final ZonePartitionId replicationGroupId;
private final TxRecoveryEngine txRecoveryEngine;
private final TxManager txManager;
private final InternalClusterNode localNode;

/** Constructor. */
public TxRecoveryMessageHandler(
TxStatePartitionStorage txStatePartitionStorage,
ZonePartitionId replicationGroupId,
TxRecoveryEngine txRecoveryEngine,
TxManager txManager
TxManager txManager,
InternalClusterNode localNode
) {
this.txStatePartitionStorage = txStatePartitionStorage;
this.replicationGroupId = replicationGroupId;
this.txRecoveryEngine = txRecoveryEngine;
this.txManager = txManager;
this.localNode = localNode;
}

/**
Expand All @@ -61,7 +65,7 @@ public TxRecoveryMessageHandler(
* @param request Tx recovery request.
* @return The future is complete when the transaction state is finalized.
*/
public CompletableFuture<Void> handle(TxRecoveryMessage request, UUID senderId) {
public CompletableFuture<?> handle(TxRecoveryMessage request, UUID senderId) {
UUID txId = request.txId();

TxMeta txMeta = txStatePartitionStorage.get(txId);
Expand All @@ -78,6 +82,12 @@ public CompletableFuture<Void> handle(TxRecoveryMessage request, UUID senderId)
txMeta
);

return txRecoveryEngine.triggerTxRecovery(txId, senderId);
return txRecoveryEngine.triggerTxRecovery(
txId,
replicationGroupId,
localNode.name(),
null,
null
);
}
}
Loading