diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 4a5c2f1d9db2..2d401ca9e075 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -389,6 +389,11 @@ protected void setup() // Prepared statements QueryProcessor.instance.preloadPreparedStatements(); + // Apply overrides before re-enabling auto-compaction + setCompactionStrategyOverrides(Schema.instance.getKeyspaces()); + // re-enable auto-compaction after replay, so correct disk boundaries are used + enableAutoCompaction(Schema.instance.getKeyspaces()); + // start server internals StorageService.instance.registerDaemon(this); try @@ -421,11 +426,6 @@ protected void setup() ScheduledExecutors.optionalTasks.schedule(viewRebuild, StorageService.RING_DELAY_MILLIS, TimeUnit.MILLISECONDS); StorageService.instance.doAuthSetup(); - // Apply overrides before re-enabling auto-compaction - setCompactionStrategyOverrides(Schema.instance.getKeyspaces()); - // re-enable auto-compaction after replay, so correct disk boundaries are used - enableAutoCompaction(Schema.instance.getKeyspaces()); - AuditLogManager.instance.initialize(); StorageService.instance.doAutoRepairSetup(); diff --git a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapCompactionTest.java b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapCompactionTest.java new file mode 100644 index 000000000000..e53e3cef867a --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapCompactionTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.ring; + +import java.util.concurrent.Callable; + +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; + +import org.junit.Test; + +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.Constants; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.shared.NetworkTopology; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.tcm.sequences.BootstrapAndJoin; +import org.apache.cassandra.tcm.sequences.SequenceState; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.junit.Assert.assertTrue; + +public class BootstrapCompactionTest extends TestBaseImpl +{ + @Test + public void testCompactionEnabledDuringBootstrap() throws Exception + { + int originalNodeCount = 2; + int expandedNodeCount = originalNodeCount + 1; + + try (Cluster cluster = init(builder().withNodes(originalNodeCount) + .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount)) + .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) + .withInstanceInitializer(BB::install) + .withConfig(config -> config.with(NETWORK, GOSSIP)) + .start())) + { + cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)")); + IInstanceConfig config = cluster.newInstanceConfig() + .set(Constants.KEY_DTEST_FULL_STARTUP, true) + .set("auto_bootstrap", true); + + IInvokableInstance newInstance = cluster.bootstrap(config); + // BB below asserts that autocompaction is enabled at each step in the join sequence + newInstance.startup(cluster); + } + } + + public static class BB + { + public static void install(ClassLoader cl, int i) + { + if (i == 3) + { + new ByteBuddy().rebase(BootstrapAndJoin.class) + .method(named("executeNext")) + .intercept(MethodDelegation.to(BB.class)) + .make() + .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + } + + public static SequenceState executeNext(@SuperCall Callable zuper) throws Exception + { + boolean isEnabled = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getCompactionStrategyManager().isEnabled(); + assertTrue("Autocompaction should be enabled during the bootstrap", isEnabled); + return zuper.call(); + } + } +} diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java index b52b550187cc..5e744f47584d 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java @@ -214,8 +214,9 @@ public void testAutomaticUpgradeConcurrency() throws Exception // inside the currentlyBackgroundUpgrading check - with max_concurrent_auto_upgrade_tasks = 1 this will make // sure that BackgroundCompactionCandidate#maybeRunUpgradeTask returns false until the latch has been counted down CountDownLatch latch = new CountDownLatch(1); + CountDownLatch inFindUpgradeSSTables = new CountDownLatch(1); AtomicInteger upgradeTaskCount = new AtomicInteger(0); - MockCFSForCSM mock = new MockCFSForCSM(cfs, latch, upgradeTaskCount); + MockCFSForCSM mock = new MockCFSForCSM(cfs, latch, inFindUpgradeSSTables, upgradeTaskCount); CompactionManager.BackgroundCompactionCandidate r = CompactionManager.instance.getBackgroundCompactionCandidate(mock); CompactionStrategyManager mgr = mock.getCompactionStrategyManager(); @@ -224,7 +225,7 @@ public void testAutomaticUpgradeConcurrency() throws Exception // due to the currentlyBackgroundUpgrading count being >= max_concurrent_auto_upgrade_tasks Thread t = new Thread(() -> r.maybeRunUpgradeTask(mgr)); t.start(); - Thread.sleep(100); // let the thread start and grab the task + inFindUpgradeSSTables.await(); assertEquals(1, CompactionManager.instance.currentlyBackgroundUpgrading.get()); assertFalse(r.maybeRunUpgradeTask(mgr)); assertFalse(r.maybeRunUpgradeTask(mgr)); @@ -246,8 +247,9 @@ public void testAutomaticUpgradeConcurrency2() throws Exception // inside the currentlyBackgroundUpgrading check - with max_concurrent_auto_upgrade_tasks = 1 this will make // sure that BackgroundCompactionCandidate#maybeRunUpgradeTask returns false until the latch has been counted down CountDownLatch latch = new CountDownLatch(1); + CountDownLatch inFindUpgradeSSTables = new CountDownLatch(2); AtomicInteger upgradeTaskCount = new AtomicInteger(); - MockCFSForCSM mock = new MockCFSForCSM(cfs, latch, upgradeTaskCount); + MockCFSForCSM mock = new MockCFSForCSM(cfs, latch, inFindUpgradeSSTables, upgradeTaskCount); CompactionManager.BackgroundCompactionCandidate r = CompactionManager.instance.getBackgroundCompactionCandidate(mock); CompactionStrategyManager mgr = mock.getCompactionStrategyManager(); @@ -259,7 +261,7 @@ public void testAutomaticUpgradeConcurrency2() throws Exception t.start(); Thread t2 = new Thread(() -> r.maybeRunUpgradeTask(mgr)); t2.start(); - Thread.sleep(100); // let the threads start and grab the task + inFindUpgradeSSTables.await(); assertEquals(2, CompactionManager.instance.currentlyBackgroundUpgrading.get()); assertFalse(r.maybeRunUpgradeTask(mgr)); assertFalse(r.maybeRunUpgradeTask(mgr)); @@ -619,18 +621,20 @@ private static class MockCFS extends ColumnFamilyStore private static class MockCFSForCSM extends ColumnFamilyStore { private final CountDownLatch latch; + private final CountDownLatch inFindUpgradeSSTables; private final AtomicInteger upgradeTaskCount; - private MockCFSForCSM(ColumnFamilyStore cfs, CountDownLatch latch, AtomicInteger upgradeTaskCount) + private MockCFSForCSM(ColumnFamilyStore cfs, CountDownLatch latch, CountDownLatch inFindUpgradeSSTables, AtomicInteger upgradeTaskCount) { super(cfs.keyspace, cfs.name, Util.newSeqGen(10), cfs.metadata.get(), cfs.getDirectories(), true, false); this.latch = latch; + this.inFindUpgradeSSTables = inFindUpgradeSSTables; this.upgradeTaskCount = upgradeTaskCount; } @Override public CompactionStrategyManager getCompactionStrategyManager() { - return new MockCSM(this, latch, upgradeTaskCount); + return new MockCSM(this, latch, inFindUpgradeSSTables, upgradeTaskCount); } } @@ -638,11 +642,13 @@ private static class MockCSM extends CompactionStrategyManager { private final CountDownLatch latch; private final AtomicInteger upgradeTaskCount; + private final CountDownLatch inFindUpgradeSSTables; - private MockCSM(ColumnFamilyStore cfs, CountDownLatch latch, AtomicInteger upgradeTaskCount) + private MockCSM(ColumnFamilyStore cfs, CountDownLatch latch, CountDownLatch inFindUpgradeSSTables, AtomicInteger upgradeTaskCount) { super(cfs); this.latch = latch; + this.inFindUpgradeSSTables = inFindUpgradeSSTables; this.upgradeTaskCount = upgradeTaskCount; } @@ -651,6 +657,7 @@ public AbstractCompactionTask findUpgradeSSTableTask() { try { + inFindUpgradeSSTables.countDown(); latch.await(); upgradeTaskCount.incrementAndGet(); }