diff --git a/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockService.java b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockService.java index b538ab51aaf3..3c8a059d94c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockService.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockService.java @@ -64,6 +64,17 @@ public interface ClockService { */ HybridTimestamp updateClock(HybridTimestamp requestTime); + /** + * Advances the clock in accordance with the request time. If the request time is ahead of the clock, + * the clock is advanced to the tick that is next to the request time; otherwise, it's advanced to the tick + * that is next to the local time. + * + * @param requestTime Timestamp from request. + * @param checkClockSkew Whether to check for clock skew and log warnings if exceeded. + * @return New local hybrid timestamp that is on the clock (it is ahead of both the old clock time and the request time). + */ + HybridTimestamp updateClock(HybridTimestamp requestTime, boolean checkClockSkew); + /** * Wait for the clock to reach the given timestamp. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockServiceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockServiceImpl.java index e96271dbf807..dc112e11bcbe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockServiceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockServiceImpl.java @@ -85,10 +85,14 @@ public HybridTimestamp updateClock(HybridTimestamp requestTime) { currentLocalTimestamp, maxClockSkewMillis()); onMaxClockSkewExceededClosure.accept(requestTimePhysical - currentLocalTimePhysical); } - return clock.update(requestTime); } + @Override + public HybridTimestamp updateClock(HybridTimestamp requestTime, boolean checkClockSkew) { + return checkClockSkew ? updateClock(requestTime) : clock.update(requestTime); + } + @Override public CompletableFuture waitFor(HybridTimestamp targetTimestamp) { return clockWaiter.waitFor(targetTimestamp); diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/hlc/TestClockService.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/hlc/TestClockService.java index bcaf0a3537e2..5fae8626d4cd 100644 --- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/hlc/TestClockService.java +++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/hlc/TestClockService.java @@ -70,6 +70,11 @@ public HybridTimestamp updateClock(HybridTimestamp requestTime) { return clock.update(requestTime); } + @Override + public HybridTimestamp updateClock(HybridTimestamp requestTime, boolean checkClockSkew) { + return clock.update(requestTime); + } + @Override public CompletableFuture waitFor(HybridTimestamp targetTimestamp) { if (clockWaiter == null) { diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java index 9380a6d0c8a8..b94f93748473 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java @@ -66,6 +66,7 @@ import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.hlc.TestClockService; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.manager.ComponentContext; @@ -182,6 +183,8 @@ class ItZonePartitionRaftListenerRecoveryTest extends IgniteAbstractTest { private final HybridClock clock = new HybridClockImpl(); + private final ClockService clockService = new TestClockService(clock); + private final Map storagesByTableId = new HashMap<>(); private static class MockMvPartitionStorage { @@ -336,7 +339,8 @@ private RaftGroupService startRaftGroupNode(int... tableIds) throws NodeStopping new SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE), new PendingComparableValuesTracker<>(0L), outgoingSnapshotsManager, - executor + executor, + clockService ); for (int tableId : tableIds) { @@ -392,6 +396,10 @@ private RaftTableProcessor createTableProcessor(int tableId) { ClockService clockService = mock(ClockService.class); lenient().when(clockService.current()).thenReturn(clock.current()); + lenient().when(clockService.updateClock(any(), anyBoolean())).thenAnswer(invocation -> { + HybridTimestamp requestTime = invocation.getArgument(0); + return clock.update(requestTime); + }); return new TablePartitionProcessor( txManager, diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java index d4e366fb5f0a..567cf8500810 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java @@ -382,7 +382,8 @@ public PartitionReplicaLifecycleManager( catalogService, failureProcessor, partitionOperationsExecutor, - replicaMgr + replicaMgr, + clockService ), metricManager, messagingService, diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java index 099bc16255bf..3249c4511686 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.close.ManuallyCloseable; import org.apache.ignite.internal.failure.FailureProcessor; +import org.apache.ignite.internal.hlc.ClockService; import org.apache.ignite.internal.network.TopologyService; import org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener; import org.apache.ignite.internal.partition.replicator.raft.snapshot.LogStorageAccessImpl; @@ -71,6 +72,8 @@ public class ZoneResourcesManager implements ManuallyCloseable { private final ReplicaManager replicaManager; + private final ClockService clockService; + private final RaftSnapshotsMetricsSource snapshotsMetricsSource = new RaftSnapshotsMetricsSource(); /** Map from zone IDs to their resource holders. */ @@ -86,7 +89,8 @@ public class ZoneResourcesManager implements ManuallyCloseable { CatalogService catalogService, FailureProcessor failureProcessor, Executor partitionOperationsExecutor, - ReplicaManager replicaManager + ReplicaManager replicaManager, + ClockService clockService ) { this.sharedTxStateStorage = sharedTxStateStorage; this.txManager = txManager; @@ -96,6 +100,7 @@ public class ZoneResourcesManager implements ManuallyCloseable { this.failureProcessor = failureProcessor; this.partitionOperationsExecutor = partitionOperationsExecutor; this.replicaManager = replicaManager; + this.clockService = clockService; } ZonePartitionResources allocateZonePartitionResources( @@ -119,7 +124,8 @@ ZonePartitionResources allocateZonePartitionResources( safeTimeTracker, storageIndexTracker, outgoingSnapshotsManager, - partitionOperationsExecutor + partitionOperationsExecutor, + clockService ); var snapshotStorage = new PartitionSnapshotStorage( diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java index 9eabd1db0709..3011de42ad74 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java @@ -28,6 +28,7 @@ import java.util.Iterator; import java.util.concurrent.Executor; import java.util.function.Consumer; +import org.apache.ignite.internal.hlc.ClockService; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; @@ -111,6 +112,8 @@ public class ZonePartitionRaftListener implements RaftGroupListener { private final RaftGroupConfigurationConverter raftGroupConfigurationConverter = new RaftGroupConfigurationConverter(); + private final ClockService clockService; + /** Constructor. */ public ZonePartitionRaftListener( ZonePartitionId zonePartitionId, @@ -119,13 +122,15 @@ public ZonePartitionRaftListener( SafeTimeValuesTracker safeTimeTracker, PendingComparableValuesTracker storageIndexTracker, PartitionsSnapshots partitionsSnapshots, - Executor partitionOperationsExecutor + Executor partitionOperationsExecutor, + ClockService clockService ) { this.safeTimeTracker = safeTimeTracker; this.storageIndexTracker = storageIndexTracker; this.partitionsSnapshots = partitionsSnapshots; this.txStateStorage = txStatePartitionStorage; this.partitionKey = new PartitionKey(zonePartitionId.zoneId(), zonePartitionId.partitionId()); + this.clockService = clockService; onSnapshotSaveHandler = new OnSnapshotSaveHandler(txStatePartitionStorage, partitionOperationsExecutor); @@ -261,6 +266,7 @@ private void processWriteCommand(CommandClosure clo) { // Adjust safe time before completing update to reduce waiting. if (safeTimestamp != null) { try { + clockService.updateClock(safeTimestamp, false); safeTimeTracker.update(safeTimestamp, commandIndex, commandTerm, command); } catch (TrackerClosedException ignored) { // Ignored. diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java index ff9e319a5990..eec1e20389d0 100644 --- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java +++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java @@ -263,7 +263,8 @@ void setUp( catalogService, failureManager, executorService, - replicaManager + replicaManager, + mock(ClockService.class) ) { @Override protected TxStateStorage createTxStateStorage(int zoneId, int partitionCount) { diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java index 99a1ab483288..d4866d764a00 100644 --- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java +++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.components.LogSyncer; import org.apache.ignite.internal.failure.FailureProcessor; +import org.apache.ignite.internal.hlc.ClockService; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.manager.ComponentContext; @@ -96,7 +97,8 @@ void init( catalogService, mock(FailureProcessor.class), executor, - replicaManager + replicaManager, + mock(ClockService.class) ); assertThat(sharedStorage.startAsync(new ComponentContext()), willCompleteSuccessfully()); } diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java index 650036f82f89..ebd0631df6b9 100644 --- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java +++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java @@ -193,6 +193,12 @@ void setUp() { } private ZonePartitionRaftListener createListener() { + HybridClock clock = mock(HybridClock.class); + ClockService clockService = mock(ClockService.class); + lenient().when(clockService.updateClock(any(), anyBoolean())).thenAnswer(invocation -> { + HybridTimestamp requestTime = invocation.getArgument(0); + return clock.update(requestTime); + }); return new ZonePartitionRaftListener( new ZonePartitionId(ZONE_ID, PARTITION_ID), txStatePartitionStorage, @@ -200,7 +206,8 @@ private ZonePartitionRaftListener createListener() { safeTimeTracker, storageIndexTracker, outgoingSnapshotsManager, - executor + executor, + clockService ); } @@ -950,6 +957,10 @@ private TablePartitionProcessor partitionListener(int tableId) { ClockService clockService = mock(ClockService.class); lenient().when(clockService.current()).thenReturn(clock.current()); + lenient().when(clockService.updateClock(any(), anyBoolean())).thenAnswer(invocation -> { + HybridTimestamp requestTime = invocation.getArgument(0); + return clock.update(requestTime); + }); StorageUpdateHandler storageUpdateHandler = mock(StorageUpdateHandler.class); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java index b9ea5c8cf0b0..3d3500cb6b9a 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java @@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -380,6 +381,10 @@ void start() throws Exception { ClockService clockService = mock(ClockService.class); when(clockService.current()).thenReturn(clock.current()); + when(clockService.updateClock(any(), anyBoolean())).thenAnswer(invocation -> { + HybridTimestamp requestTime = invocation.getArgument(0); + return clock.update(requestTime); + }); OutgoingSnapshotsManager outgoingSnapshotsManager = new OutgoingSnapshotsManager( clusterService.nodeName(), @@ -397,7 +402,8 @@ void start() throws Exception { safeTs, mock(PendingIndependentComparableValuesTracker.class), outgoingSnapshotsManager, - mock(Executor.class) + mock(Executor.class), + clockService ) { @Override public void onWrite(Iterator> iterator) { diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java index 54eb7e8ee5f0..2cf97e583374 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java @@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; @@ -264,6 +265,10 @@ public void before() { ClockService clockService = mock(ClockService.class); lenient().when(clockService.current()).thenReturn(hybridClock.current()); + lenient().when(clockService.updateClock(any(), anyBoolean())).thenAnswer(invocation -> { + HybridTimestamp requestTime = invocation.getArgument(0); + return hybridClock.update(requestTime); + }); commandListener = new TablePartitionProcessor( mock(TxManager.class), @@ -592,6 +597,10 @@ void testBuildIndexCommandExitsEarlyOnShouldRelease() { ClockService clockService = mock(ClockService.class); lenient().when(clockService.current()).thenReturn(hybridClock.current()); + lenient().when(clockService.updateClock(any(), anyBoolean())).thenAnswer(invocation -> { + HybridTimestamp requestTime = invocation.getArgument(0); + return hybridClock.update(requestTime); + }); when(indexUpdateHandler.getNextRowIdToBuildIndex(anyInt())).thenReturn(RowId.lowestRowId(PARTITION_ID)); doNothing().when(indexUpdateHandler).buildIndex(eq(indexId), any(Stream.class), any()); diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index 816dd602018c..000c34dc4ece 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -915,7 +915,8 @@ private RaftGroupListener getOrCreateAndPopulateRaftGroupListener( safeTimeTracker, storageIndexTracker, mock(PartitionsSnapshots.class, RETURNS_DEEP_STUBS), - partitionOperationsExecutor + partitionOperationsExecutor, + clockServices.get(assignment) ) ); diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index dfe814ccef13..d44eb9f82863 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -25,6 +25,7 @@ import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -540,6 +541,10 @@ public void result(@Nullable Serializable r) { HybridClock clock = new HybridClockImpl(); ClockService clockService = mock(ClockService.class); lenient().when(clockService.current()).thenReturn(clock.current()); + lenient().when(clockService.updateClock(any(), anyBoolean())).thenAnswer(invocation -> { + HybridTimestamp requestTime = invocation.getArgument(0); + return clock.update(requestTime); + }); PendingComparableValuesTracker storageIndexTracker = new PendingComparableValuesTracker<>(0L); var tablePartitionListener = new TablePartitionProcessor( @@ -563,7 +568,8 @@ public void result(@Nullable Serializable r) { safeTime, storageIndexTracker, new NoOpPartitionsSnapshots(), - mock(Executor.class) + mock(Executor.class), + clockService ); zoneRaftListener.addTableProcessor(tableId, tablePartitionListener);