diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index bebc9a4407c3..4e7d1b9cea08 100644 --- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -1505,7 +1505,7 @@ private class Node { schemaManager = new SchemaManager(registry, catalogManager); - schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime()); + schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime(), metaStorageManager.watchExecutor()); metaStorageManager.registerNotificationEnqueuedListener(schemaSafeTimeTracker); schemaSyncService = new SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMsSupplier); diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java index 20cc515ff44b..2f7782e189f5 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java @@ -1409,4 +1409,8 @@ CompletableFuture sendCompactionCommand(long compactionRevision) { public void markAsStopping() { metaStorageSvcFut.thenAccept(MetaStorageServiceImpl::markAsStopping); } + + public Executor watchExecutor() { + return storage.watchExecutor(); + } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java index 292de6fe2855..a41a8a092fba 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java @@ -31,6 +31,7 @@ import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import org.apache.ignite.internal.failure.FailureProcessor; @@ -396,4 +397,9 @@ protected void drainNotifyWatchProcessorEventsBeforeStartingWatches() { notifyWatchProcessorEventsBeforeStartingWatches = null; } } + + @Override + public Executor watchExecutor() { + return watchProcessor.watchExecutor(); + } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java index ef9b9887d89a..2afb4861cf44 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import org.apache.ignite.internal.close.ManuallyCloseable; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.metastorage.CommandId; @@ -558,4 +559,9 @@ boolean invoke( * @return Future that's completed when flushing of the data is completed. */ CompletableFuture flush(); + + /** + * Returns executor used to execute watches. + */ + Executor watchExecutor(); } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java index 4e2691ed0d42..a11b5131b241 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java @@ -39,6 +39,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; @@ -513,4 +514,8 @@ private static boolean isNotIdempotentCacheCommand(Entry entry) { IDEMPOTENT_COMMAND_PREFIX_BYTES, 0, prefixLength ); } + + Executor watchExecutor() { + return watchExecutor; + } } diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java index e5293fd5b420..c4449e28462a 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java @@ -639,7 +639,7 @@ public CompletableFuture invoke(Condition condition, Operation success, volatileLogStorageManagerCreator = new VolatileLogStorageManagerCreator(name, workDir.resolve("volatile-log-spillout-" + name)); - schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime()); + schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime(), metaStorageManager.watchExecutor()); metaStorageManager.registerNotificationEnqueuedListener(schemaSafeTimeTracker); LongSupplier delayDurationMsSupplier = () -> DELAY_DURATION_MS; diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index f602726acb68..0d2fdb92f95c 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -601,7 +601,7 @@ public CompletableFuture invoke(Condition condition, List su zoneId -> completedFuture(Set.of()) ); - var schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime()); + var schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime(), metaStorageMgr.watchExecutor()); metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker); LongSupplier delayDurationMsSupplier = () -> TestIgnitionManager.DEFAULT_DELAY_DURATION_MS; diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index ec77e546c2db..8fd098a1a44d 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -932,7 +932,7 @@ public class IgniteImpl implements Ignite { volatileLogStorageManagerCreator ); - schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime()); + schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime(), metaStorageMgr.watchExecutor()); metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker); SchemaSyncService schemaSyncService = new SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMsSupplier); diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImpl.java index f05e79c0adaf..9fb4beeada25 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImpl.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import org.apache.ignite.internal.catalog.storage.UpdateLogImpl; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.NodeStoppingException; @@ -39,6 +40,8 @@ public class SchemaSafeTimeTrackerImpl implements SchemaSafeTimeTracker, IgniteComponent, NotificationEnqueuedListener { private final ClusterTime clusterTime; + private final Executor watchExecutor; + private final PendingComparableValuesTracker schemaSafeTime = new PendingComparableValuesTracker<>(HybridTimestamp.MIN_VALUE); @@ -46,8 +49,9 @@ public class SchemaSafeTimeTrackerImpl implements SchemaSafeTimeTracker, IgniteC private final Object futureMutex = new Object(); - public SchemaSafeTimeTrackerImpl(ClusterTime clusterTime) { + public SchemaSafeTimeTrackerImpl(ClusterTime clusterTime, Executor watchExecutor) { this.clusterTime = clusterTime; + this.watchExecutor = watchExecutor; } @Override @@ -74,16 +78,16 @@ public void onEnqueued(CompletableFuture newNotificationFuture, List newNotificationFuture); + newSchemaSafeTimeUpdateFuture = schemaSafeTimeUpdateFuture.thenComposeAsync(unused -> newNotificationFuture, watchExecutor); } else { // The update does not concern the Catalog (schemas), so we can update schema safe time as soon as previous updates to it // get applied. newSchemaSafeTimeUpdateFuture = schemaSafeTimeUpdateFuture; } - newSchemaSafeTimeUpdateFuture = newSchemaSafeTimeUpdateFuture.thenRun(() -> { + newSchemaSafeTimeUpdateFuture = newSchemaSafeTimeUpdateFuture.thenRunAsync(() -> { schemaSafeTime.update(timestamp, null); - }); + }, watchExecutor); schemaSafeTimeUpdateFuture = newSchemaSafeTimeUpdateFuture; } diff --git a/modules/runner/src/test/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImplTest.java b/modules/runner/src/test/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImplTest.java index 5942e6ead894..4a7473b656cd 100644 --- a/modules/runner/src/test/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImplTest.java +++ b/modules/runner/src/test/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImplTest.java @@ -25,11 +25,14 @@ import static org.mockito.Mockito.when; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.manager.ComponentContext; import org.apache.ignite.internal.metastorage.server.time.ClusterTime; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.internal.testframework.ExecutorServiceExtension; +import org.apache.ignite.internal.testframework.InjectExecutorService; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -37,15 +40,19 @@ import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) +@ExtendWith(ExecutorServiceExtension.class) class SchemaSafeTimeTrackerImplTest extends BaseIgniteAbstractTest { @Mock private ClusterTime clusterTime; + @InjectExecutorService + private ExecutorService executor; + private SchemaSafeTimeTrackerImpl tracker; @BeforeEach void createTracker() { - tracker = new SchemaSafeTimeTrackerImpl(clusterTime); + tracker = new SchemaSafeTimeTrackerImpl(clusterTime, executor); } @Test