Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1505,7 +1505,7 @@ private class Node {

schemaManager = new SchemaManager(registry, catalogManager);

schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime());
schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime(), metaStorageManager.watchExecutor());
metaStorageManager.registerNotificationEnqueuedListener(schemaSafeTimeTracker);

schemaSyncService = new SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMsSupplier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1409,4 +1409,8 @@ CompletableFuture<Void> sendCompactionCommand(long compactionRevision) {
public void markAsStopping() {
metaStorageSvcFut.thenAccept(MetaStorageServiceImpl::markAsStopping);
}

public Executor watchExecutor() {
return storage.watchExecutor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.ignite.internal.failure.FailureProcessor;
Expand Down Expand Up @@ -396,4 +397,9 @@ protected void drainNotifyWatchProcessorEventsBeforeStartingWatches() {
notifyWatchProcessorEventsBeforeStartingWatches = null;
}
}

@Override
public Executor watchExecutor() {
return watchProcessor.watchExecutor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.metastorage.CommandId;
Expand Down Expand Up @@ -558,4 +559,9 @@ boolean invoke(
* @return Future that's completed when flushing of the data is completed.
*/
CompletableFuture<Void> flush();

/**
* Returns executor used to execute watches.
*/
Executor watchExecutor();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -513,4 +514,8 @@ private static boolean isNotIdempotentCacheCommand(Entry entry) {
IDEMPOTENT_COMMAND_PREFIX_BYTES, 0, prefixLength
);
}

Executor watchExecutor() {
return watchExecutor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ public CompletableFuture<Boolean> invoke(Condition condition, Operation success,

volatileLogStorageManagerCreator = new VolatileLogStorageManagerCreator(name, workDir.resolve("volatile-log-spillout-" + name));

schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime());
schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime(), metaStorageManager.watchExecutor());
metaStorageManager.registerNotificationEnqueuedListener(schemaSafeTimeTracker);

LongSupplier delayDurationMsSupplier = () -> DELAY_DURATION_MS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ public CompletableFuture<Boolean> invoke(Condition condition, List<Operation> su
zoneId -> completedFuture(Set.of())
);

var schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime());
var schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime(), metaStorageMgr.watchExecutor());
metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker);

LongSupplier delayDurationMsSupplier = () -> TestIgnitionManager.DEFAULT_DELAY_DURATION_MS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ public class IgniteImpl implements Ignite {
volatileLogStorageManagerCreator
);

schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime());
schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime(), metaStorageMgr.watchExecutor());
metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker);

SchemaSyncService schemaSyncService = new SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMsSupplier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.NodeStoppingException;
Expand All @@ -39,15 +40,18 @@
public class SchemaSafeTimeTrackerImpl implements SchemaSafeTimeTracker, IgniteComponent, NotificationEnqueuedListener {
private final ClusterTime clusterTime;

private final Executor watchExecutor;

private final PendingComparableValuesTracker<HybridTimestamp, Void> schemaSafeTime =
new PendingComparableValuesTracker<>(HybridTimestamp.MIN_VALUE);

private CompletableFuture<Void> schemaSafeTimeUpdateFuture = nullCompletedFuture();

private final Object futureMutex = new Object();

public SchemaSafeTimeTrackerImpl(ClusterTime clusterTime) {
public SchemaSafeTimeTrackerImpl(ClusterTime clusterTime, Executor watchExecutor) {
this.clusterTime = clusterTime;
this.watchExecutor = watchExecutor;
}

@Override
Expand All @@ -74,16 +78,16 @@ public void onEnqueued(CompletableFuture<Void> newNotificationFuture, List<Entry
// The update touches the Catalog (i.e. schemas), so we must chain with the core notification future
// as Catalog listeners will be included in it (because we need to wait for those listeners to finish execution
// before updating the schema safe time).
newSchemaSafeTimeUpdateFuture = schemaSafeTimeUpdateFuture.thenCompose(unused -> newNotificationFuture);
newSchemaSafeTimeUpdateFuture = schemaSafeTimeUpdateFuture.thenComposeAsync(unused -> newNotificationFuture, watchExecutor);
} else {
// The update does not concern the Catalog (schemas), so we can update schema safe time as soon as previous updates to it
// get applied.
newSchemaSafeTimeUpdateFuture = schemaSafeTimeUpdateFuture;
}

newSchemaSafeTimeUpdateFuture = newSchemaSafeTimeUpdateFuture.thenRun(() -> {
newSchemaSafeTimeUpdateFuture = newSchemaSafeTimeUpdateFuture.thenRunAsync(() -> {
schemaSafeTime.update(timestamp, null);
});
}, watchExecutor);

schemaSafeTimeUpdateFuture = newSchemaSafeTimeUpdateFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,34 @@
import static org.mockito.Mockito.when;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
@ExtendWith(ExecutorServiceExtension.class)
class SchemaSafeTimeTrackerImplTest extends BaseIgniteAbstractTest {
@Mock
private ClusterTime clusterTime;

@InjectExecutorService
private ExecutorService executor;

private SchemaSafeTimeTrackerImpl tracker;

@BeforeEach
void createTracker() {
tracker = new SchemaSafeTimeTrackerImpl(clusterTime);
tracker = new SchemaSafeTimeTrackerImpl(clusterTime, executor);
}

@Test
Expand Down