Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@ public interface ClockService {
*/
HybridTimestamp updateClock(HybridTimestamp requestTime);

/**
* Advances the clock in accordance with the request time. If the request time is ahead of the clock,
* the clock is advanced to the tick that is next to the request time; otherwise, it's advanced to the tick
* that is next to the local time.
*
* @param requestTime Timestamp from request.
* @param checkClockSkew Whether to check for clock skew and log warnings if exceeded.
* @return New local hybrid timestamp that is on the clock (it is ahead of both the old clock time and the request time).
*/
HybridTimestamp updateClock(HybridTimestamp requestTime, boolean checkClockSkew);

/**
* Wait for the clock to reach the given timestamp.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,14 @@ public HybridTimestamp updateClock(HybridTimestamp requestTime) {
currentLocalTimestamp, maxClockSkewMillis());
onMaxClockSkewExceededClosure.accept(requestTimePhysical - currentLocalTimePhysical);
}

return clock.update(requestTime);
}

@Override
public HybridTimestamp updateClock(HybridTimestamp requestTime, boolean checkClockSkew) {
return checkClockSkew ? updateClock(requestTime) : clock.update(requestTime);
}

@Override
public CompletableFuture<Void> waitFor(HybridTimestamp targetTimestamp) {
return clockWaiter.waitFor(targetTimestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ public HybridTimestamp updateClock(HybridTimestamp requestTime) {
return clock.update(requestTime);
}

@Override
public HybridTimestamp updateClock(HybridTimestamp requestTime, boolean checkClockSkew) {
return clock.update(requestTime);
}

@Override
public CompletableFuture<Void> waitFor(HybridTimestamp targetTimestamp) {
if (clockWaiter == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.ComponentContext;
Expand Down Expand Up @@ -182,6 +183,8 @@ class ItZonePartitionRaftListenerRecoveryTest extends IgniteAbstractTest {

private final HybridClock clock = new HybridClockImpl();

private final ClockService clockService = new TestClockService(clock);

private final Map<Integer, MockMvPartitionStorage> storagesByTableId = new HashMap<>();

private static class MockMvPartitionStorage {
Expand Down Expand Up @@ -336,7 +339,8 @@ private RaftGroupService startRaftGroupNode(int... tableIds) throws NodeStopping
new SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE),
new PendingComparableValuesTracker<>(0L),
outgoingSnapshotsManager,
executor
executor,
clockService
);

for (int tableId : tableIds) {
Expand Down Expand Up @@ -392,6 +396,10 @@ private RaftTableProcessor createTableProcessor(int tableId) {

ClockService clockService = mock(ClockService.class);
lenient().when(clockService.current()).thenReturn(clock.current());
lenient().when(clockService.updateClock(any(), anyBoolean())).thenAnswer(invocation -> {
HybridTimestamp requestTime = invocation.getArgument(0);
return clock.update(requestTime);
});

return new TablePartitionProcessor(
txManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,8 @@ public PartitionReplicaLifecycleManager(
catalogService,
failureProcessor,
partitionOperationsExecutor,
replicaMgr
replicaMgr,
clockService
),
metricManager,
messagingService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener;
import org.apache.ignite.internal.partition.replicator.raft.snapshot.LogStorageAccessImpl;
Expand Down Expand Up @@ -71,6 +72,8 @@ public class ZoneResourcesManager implements ManuallyCloseable {

private final ReplicaManager replicaManager;

private final ClockService clockService;

private final RaftSnapshotsMetricsSource snapshotsMetricsSource = new RaftSnapshotsMetricsSource();

/** Map from zone IDs to their resource holders. */
Expand All @@ -86,7 +89,8 @@ public class ZoneResourcesManager implements ManuallyCloseable {
CatalogService catalogService,
FailureProcessor failureProcessor,
Executor partitionOperationsExecutor,
ReplicaManager replicaManager
ReplicaManager replicaManager,
ClockService clockService
) {
this.sharedTxStateStorage = sharedTxStateStorage;
this.txManager = txManager;
Expand All @@ -96,6 +100,7 @@ public class ZoneResourcesManager implements ManuallyCloseable {
this.failureProcessor = failureProcessor;
this.partitionOperationsExecutor = partitionOperationsExecutor;
this.replicaManager = replicaManager;
this.clockService = clockService;
}

ZonePartitionResources allocateZonePartitionResources(
Expand All @@ -119,7 +124,8 @@ ZonePartitionResources allocateZonePartitionResources(
safeTimeTracker,
storageIndexTracker,
outgoingSnapshotsManager,
partitionOperationsExecutor
partitionOperationsExecutor,
clockService
);

var snapshotStorage = new PartitionSnapshotStorage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
Expand Down Expand Up @@ -111,6 +112,8 @@ public class ZonePartitionRaftListener implements RaftGroupListener {

private final RaftGroupConfigurationConverter raftGroupConfigurationConverter = new RaftGroupConfigurationConverter();

private final ClockService clockService;

/** Constructor. */
public ZonePartitionRaftListener(
ZonePartitionId zonePartitionId,
Expand All @@ -119,13 +122,15 @@ public ZonePartitionRaftListener(
SafeTimeValuesTracker safeTimeTracker,
PendingComparableValuesTracker<Long, Void> storageIndexTracker,
PartitionsSnapshots partitionsSnapshots,
Executor partitionOperationsExecutor
Executor partitionOperationsExecutor,
ClockService clockService
) {
this.safeTimeTracker = safeTimeTracker;
this.storageIndexTracker = storageIndexTracker;
this.partitionsSnapshots = partitionsSnapshots;
this.txStateStorage = txStatePartitionStorage;
this.partitionKey = new PartitionKey(zonePartitionId.zoneId(), zonePartitionId.partitionId());
this.clockService = clockService;

onSnapshotSaveHandler = new OnSnapshotSaveHandler(txStatePartitionStorage, partitionOperationsExecutor);

Expand Down Expand Up @@ -261,6 +266,7 @@ private void processWriteCommand(CommandClosure<WriteCommand> clo) {
// Adjust safe time before completing update to reduce waiting.
if (safeTimestamp != null) {
try {
clockService.updateClock(safeTimestamp, false);
safeTimeTracker.update(safeTimestamp, commandIndex, commandTerm, command);
} catch (TrackerClosedException ignored) {
// Ignored.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ void setUp(
catalogService,
failureManager,
executorService,
replicaManager
replicaManager,
mock(ClockService.class)
) {
@Override
protected TxStateStorage createTxStateStorage(int zoneId, int partitionCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.manager.ComponentContext;
Expand Down Expand Up @@ -96,7 +97,8 @@ void init(
catalogService,
mock(FailureProcessor.class),
executor,
replicaManager
replicaManager,
mock(ClockService.class)
);
assertThat(sharedStorage.startAsync(new ComponentContext()), willCompleteSuccessfully());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,21 @@ void setUp() {
}

private ZonePartitionRaftListener createListener() {
HybridClock clock = mock(HybridClock.class);
ClockService clockService = mock(ClockService.class);
lenient().when(clockService.updateClock(any(), anyBoolean())).thenAnswer(invocation -> {
HybridTimestamp requestTime = invocation.getArgument(0);
return clock.update(requestTime);
});
return new ZonePartitionRaftListener(
new ZonePartitionId(ZONE_ID, PARTITION_ID),
txStatePartitionStorage,
txManager,
safeTimeTracker,
storageIndexTracker,
outgoingSnapshotsManager,
executor
executor,
clockService
);
}

Expand Down Expand Up @@ -950,6 +957,10 @@ private TablePartitionProcessor partitionListener(int tableId) {

ClockService clockService = mock(ClockService.class);
lenient().when(clockService.current()).thenReturn(clock.current());
lenient().when(clockService.updateClock(any(), anyBoolean())).thenAnswer(invocation -> {
HybridTimestamp requestTime = invocation.getArgument(0);
return clock.update(requestTime);
});

StorageUpdateHandler storageUpdateHandler = mock(StorageUpdateHandler.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -380,6 +381,10 @@ void start() throws Exception {

ClockService clockService = mock(ClockService.class);
when(clockService.current()).thenReturn(clock.current());
when(clockService.updateClock(any(), anyBoolean())).thenAnswer(invocation -> {
HybridTimestamp requestTime = invocation.getArgument(0);
return clock.update(requestTime);
});

OutgoingSnapshotsManager outgoingSnapshotsManager = new OutgoingSnapshotsManager(
clusterService.nodeName(),
Expand All @@ -397,7 +402,8 @@ void start() throws Exception {
safeTs,
mock(PendingIndependentComparableValuesTracker.class),
outgoingSnapshotsManager,
mock(Executor.class)
mock(Executor.class),
clockService
) {
@Override
public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -264,6 +265,10 @@ public void before() {

ClockService clockService = mock(ClockService.class);
lenient().when(clockService.current()).thenReturn(hybridClock.current());
lenient().when(clockService.updateClock(any(), anyBoolean())).thenAnswer(invocation -> {
HybridTimestamp requestTime = invocation.getArgument(0);
return hybridClock.update(requestTime);
});

commandListener = new TablePartitionProcessor(
mock(TxManager.class),
Expand Down Expand Up @@ -592,6 +597,10 @@ void testBuildIndexCommandExitsEarlyOnShouldRelease() {

ClockService clockService = mock(ClockService.class);
lenient().when(clockService.current()).thenReturn(hybridClock.current());
lenient().when(clockService.updateClock(any(), anyBoolean())).thenAnswer(invocation -> {
HybridTimestamp requestTime = invocation.getArgument(0);
return hybridClock.update(requestTime);
});

when(indexUpdateHandler.getNextRowIdToBuildIndex(anyInt())).thenReturn(RowId.lowestRowId(PARTITION_ID));
doNothing().when(indexUpdateHandler).buildIndex(eq(indexId), any(Stream.class), any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,8 @@ private RaftGroupListener getOrCreateAndPopulateRaftGroupListener(
safeTimeTracker,
storageIndexTracker,
mock(PartitionsSnapshots.class, RETURNS_DEEP_STUBS),
partitionOperationsExecutor
partitionOperationsExecutor,
clockServices.get(assignment)
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
Expand Down Expand Up @@ -540,6 +541,10 @@ public void result(@Nullable Serializable r) {
HybridClock clock = new HybridClockImpl();
ClockService clockService = mock(ClockService.class);
lenient().when(clockService.current()).thenReturn(clock.current());
lenient().when(clockService.updateClock(any(), anyBoolean())).thenAnswer(invocation -> {
HybridTimestamp requestTime = invocation.getArgument(0);
return clock.update(requestTime);
});

PendingComparableValuesTracker<Long, Void> storageIndexTracker = new PendingComparableValuesTracker<>(0L);
var tablePartitionListener = new TablePartitionProcessor(
Expand All @@ -563,7 +568,8 @@ public void result(@Nullable Serializable r) {
safeTime,
storageIndexTracker,
new NoOpPartitionsSnapshots(),
mock(Executor.class)
mock(Executor.class),
clockService
);

zoneRaftListener.addTableProcessor(tableId, tablePartitionListener);
Expand Down