Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions modules/raft/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ dependencies {
integrationTestImplementation libs.awaitility
integrationTestImplementation libs.dropwizard.metrics
integrationTestImplementation libs.disruptor
integrationTestImplementation libs.rocksdb.jni
}

tasks.named('integrationTest').configure {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.raftsnapshot;

import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import org.apache.ignite.Ignite;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.ConfigOverride;
import org.apache.ignite.internal.raft.storage.LogStorageManager;
import org.apache.ignite.internal.raft.storage.impl.DefaultLogStorageManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.rocksdb.ColumnFamilyOptions;

class ItLogStorageConfigurationTest extends ClusterPerTestIntegrationTest {
private Ignite node;

@Override
protected int initialNodes() {
return 1;
}

@BeforeEach
void prepare() {
node = cluster.node(0);
}

@Test
@ConfigOverride(name = "ignite.system.properties.partitionsRaftLogStorageWriteBufferSize", value = "" + (99L * 1024 * 1024))
void partitionsLogStorageWriteBufferSizeIsTakenFromConfiguration() throws Exception {
LogStorageManager logStorageManager = unwrapIgniteImpl(node).partitionsLogStorageManager();
DefaultLogStorageManager defaultLogStorageManager = (DefaultLogStorageManager) logStorageManager;

@SuppressWarnings("resource")
ColumnFamilyOptions cfOptions = defaultLogStorageManager.dataColumnFamilyHandle().getDescriptor().getOptions();

assertThat(cfOptions.writeBufferSize(), is(99L * 1024 * 1024));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
import org.rocksdb.SstFileManager;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.rocksdb.util.SizeUnit;

/** Implementation of the {@link LogStorageManager} that creates {@link RocksDbSharedLogStorage}s. */
public class DefaultLogStorageManager implements LogStorageManager {
Expand All @@ -91,6 +90,8 @@ public class DefaultLogStorageManager implements LogStorageManager {
/** Path to the log storage. */
private final Path logPath;

private final RocksDbLogStorageOptions specificOptions;

/** Executor for shared storages. */
private final ExecutorService executorService;

Expand Down Expand Up @@ -139,22 +140,30 @@ public class DefaultLogStorageManager implements LogStorageManager {
*/
@TestOnly
public DefaultLogStorageManager(Path path) {
this("test", "test", path, true);
this("test", "test", path, true, RocksDbLogStorageOptions.defaults());
}

/**
* Constructor.
*
* @param factoryName Name of the log factory, will be used in logs.
* @param nodeName Node name.
* @param logPath Function to get path to the log storage.
* @param logPath Path to the log storage.
* @param fsync If should fsync after each write to database.
* @param specificOptions Options specific for this implementation.
*/
public DefaultLogStorageManager(String factoryName, String nodeName, Path logPath, boolean fsync) {
public DefaultLogStorageManager(
String factoryName,
String nodeName,
Path logPath,
boolean fsync,
RocksDbLogStorageOptions specificOptions
) {
this.factoryName = factoryName;
this.logPath = logPath;
this.fsync = fsync;
this.nodeName = nodeName;
this.specificOptions = specificOptions;

executorService = Executors.newSingleThreadExecutor(
IgniteThreadFactory.create(nodeName, "raft-shared-log-storage-pool", LOG)
Expand Down Expand Up @@ -374,7 +383,7 @@ protected DBOptions createDbOptions() {
*
* @return Default column family options.
*/
private static ColumnFamilyOptions createColumnFamilyOptions() {
private ColumnFamilyOptions createColumnFamilyOptions() {
var opts = new ColumnFamilyOptions();

if (!Platform.isWindows()) {
Expand All @@ -383,16 +392,21 @@ private static ColumnFamilyOptions createColumnFamilyOptions() {
.optimizeLevelStyleCompaction();
}

opts.setWriteBufferSize(64 * SizeUnit.MB);
long writeBufferSize = specificOptions.writeBufferSize();
int minWriteBufferNumberToMerge = 1;
int level0FileNumCompactionTrigger = 50;

opts.setWriteBufferSize(writeBufferSize);
opts.setMaxWriteBufferNumber(5);
opts.setMinWriteBufferNumberToMerge(1);
opts.setLevel0FileNumCompactionTrigger(50);

opts.setMinWriteBufferNumberToMerge(minWriteBufferNumberToMerge);
opts.setLevel0FileNumCompactionTrigger(level0FileNumCompactionTrigger);
opts.setLevel0SlowdownWritesTrigger(100);
opts.setLevel0StopWritesTrigger(200);
// Size of level 0 which is (in stable state) equal to
// WriteBufferSize * MinWriteBufferNumberToMerge * Level0FileNumCompactionTrigger
opts.setMaxBytesForLevelBase(3200 * SizeUnit.MB);
opts.setTargetFileSizeBase(320 * SizeUnit.MB);
opts.setMaxBytesForLevelBase(writeBufferSize * minWriteBufferNumberToMerge * level0FileNumCompactionTrigger);
opts.setTargetFileSizeBase(writeBufferSize * 5);

return opts;
}
Expand Down Expand Up @@ -458,7 +472,7 @@ ColumnFamilyHandle confColumnFamilyHandle() {
}

@TestOnly
ColumnFamilyHandle dataColumnFamilyHandle() {
public ColumnFamilyHandle dataColumnFamilyHandle() {
return dataHandle;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.raft.storage.impl;

import org.apache.ignite.internal.configuration.SystemLocalView;
import org.apache.ignite.internal.configuration.SystemPropertyView;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.rocksdb.util.SizeUnit;

/**
* RocksDB-specific options for RocksDB-based log storage.
*/
public class RocksDbLogStorageOptions {
private static final IgniteLogger LOG = Loggers.forClass(RocksDbLogStorageOptions.class);

private static final String PARTITIONS_RAFT_LOG_STORAGE_WRITE_BUFFER_SIZE_PROPERTY_NAME = "partitionsRaftLogStorageWriteBufferSize";

private static final long DEFAULT_WRITE_BUFFER_SIZE = 64 * SizeUnit.MB;

private final long writeBufferSize;

public static RocksDbLogStorageOptions forPartitions(SystemLocalView properties) {
return new RocksDbLogStorageOptions(partitionsRaftLogStorageWriteBufferSize(properties));
}

private static long partitionsRaftLogStorageWriteBufferSize(SystemLocalView properties) {
SystemPropertyView property = properties.properties().get(PARTITIONS_RAFT_LOG_STORAGE_WRITE_BUFFER_SIZE_PROPERTY_NAME);

if (property == null) {
return DEFAULT_WRITE_BUFFER_SIZE;
}

try {
return Long.parseLong(property.propertyValue());
} catch (NumberFormatException e) {
LOG.warn(
"Failed to parse partitions writeBufferSize '{}', default value will be used ({})",
e,
property.propertyValue(),
DEFAULT_WRITE_BUFFER_SIZE
);

return DEFAULT_WRITE_BUFFER_SIZE;
}
}

public static RocksDbLogStorageOptions defaults() {
return new RocksDbLogStorageOptions(DEFAULT_WRITE_BUFFER_SIZE);
}

private RocksDbLogStorageOptions(long writeBufferSize) {
this.writeBufferSize = writeBufferSize;
}

public long writeBufferSize() {
return writeBufferSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.raft.storage.LogStorageManager;
import org.apache.ignite.internal.raft.storage.impl.DefaultLogStorageManager;
import org.apache.ignite.internal.raft.storage.impl.RocksDbLogStorageOptions;
import org.apache.ignite.internal.raft.storage.logit.LogitLogStorageManager;
import org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
import org.jetbrains.annotations.TestOnly;
Expand Down Expand Up @@ -48,9 +49,28 @@ public static LogStorageManager create(String nodeName, Path logStoragePath) {
* Creates a LogStorageManager with {@link DefaultLogStorageManager} or {@link LogitLogStorageManager} implementation depending on
* LOGIT_STORAGE_ENABLED_PROPERTY.
*/
public static LogStorageManager create(String factoryName, String nodeName, Path logStoragePath, boolean fsync) {
public static LogStorageManager create(
String factoryName,
String nodeName,
Path logStoragePath,
boolean fsync
) {
return create(factoryName, nodeName, logStoragePath, fsync, RocksDbLogStorageOptions.defaults());
}

/**
* Creates a LogStorageManager with {@link DefaultLogStorageManager} or {@link LogitLogStorageManager} implementation depending on
* LOGIT_STORAGE_ENABLED_PROPERTY.
*/
public static LogStorageManager create(
String factoryName,
String nodeName,
Path logStoragePath,
boolean fsync,
RocksDbLogStorageOptions specificOptions
) {
return IgniteSystemProperties.getBoolean(LOGIT_STORAGE_ENABLED_PROPERTY, LOGIT_STORAGE_ENABLED_PROPERTY_DEFAULT)
? new LogitLogStorageManager(nodeName, new StoreOptions(), logStoragePath)
: new DefaultLogStorageManager(factoryName, nodeName, logStoragePath, fsync);
: new DefaultLogStorageManager(factoryName, nodeName, logStoragePath, fsync, specificOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void setUp(TestInfo testInfo) {
logStorageOptions.setLogEntryCodecFactory(DefaultLogEntryCodecFactory.getInstance());

boolean disableFsync = testInfo.getTestMethod().orElseThrow().isAnnotationPresent(DisableFsync.class);
logStorageManager = new DefaultLogStorageManager("test", "test", workDir, !disableFsync);
logStorageManager = new DefaultLogStorageManager("test", "test", workDir, !disableFsync, RocksDbLogStorageOptions.defaults());

startFactory();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@
import org.apache.ignite.internal.raft.server.impl.GroupStoragesContextResolver;
import org.apache.ignite.internal.raft.storage.GroupStoragesDestructionIntents;
import org.apache.ignite.internal.raft.storage.LogStorageManager;
import org.apache.ignite.internal.raft.storage.impl.RocksDbLogStorageOptions;
import org.apache.ignite.internal.raft.storage.impl.VaultGroupStoragesDestructionIntents;
import org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageManagerCreator;
import org.apache.ignite.internal.raft.util.SharedLogStorageManagerUtils;
Expand Down Expand Up @@ -662,7 +663,8 @@ public class IgniteImpl implements Ignite {
"table data log",
clusterSvc.nodeName(),
partitionsWorkDir.raftLogPath(),
raftConfiguration.fsync().value()
raftConfiguration.fsync().value(),
RocksDbLogStorageOptions.forPartitions(systemConfiguration.value())
);

LogSyncer partitionsLogSyncer = partitionsLogStorageManager.logSyncer();
Expand Down