Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void setup() throws IOException {
writerCreationBenchmarkData = generator.generate("simple", fieldCount, 0);
writerWriteBenchmarkData = generator.generate("simple", fieldCount, recordCount);
filePath = generateTempFilePath();
RustBridge.createWriter(filePath, writerCreationBenchmarkData.getArrowSchema().memoryAddress());
RustBridge.createWriter(filePath, "benchmark-index", writerCreationBenchmarkData.getArrowSchema().memoryAddress());
RustBridge.write(filePath, writerWriteBenchmarkData.getArrowArray().memoryAddress(), writerWriteBenchmarkData.getArrowSchema().memoryAddress());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void tearDown() throws IOException {
@Benchmark
public void benchmarkCreate() throws IOException {
// This is what we're benchmarking - just writer creation
RustBridge.createWriter(filePath, writerCreationBenchmarkData.getArrowSchema().memoryAddress());
RustBridge.createWriter(filePath, "benchmark-index", writerCreationBenchmarkData.getArrowSchema().memoryAddress());
}

private String generateTempFilePath() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void setup() throws IOException {
writerCreationBenchmarkData = generator.generate("simple", fieldCount, 0);
writerWriteBenchmarkData = generator.generate("simple", fieldCount, recordCount);
filePath = generateTempFilePath();
RustBridge.createWriter(filePath, writerCreationBenchmarkData.getArrowSchema().memoryAddress());
RustBridge.createWriter(filePath, "benchmark-index", writerCreationBenchmarkData.getArrowSchema().memoryAddress());
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.engine.DataFormatPlugin;
import org.opensearch.index.engine.exec.DataFormat;
import org.opensearch.index.engine.exec.IndexingExecutionEngine;
import com.parquet.parquetdataformat.bridge.RustBridge;
Expand Down Expand Up @@ -78,22 +77,13 @@
* <li>Memory management via {@link com.parquet.parquetdataformat.memory} package</li>
* </ul>
*/
public class ParquetDataFormatPlugin extends Plugin implements DataFormatPlugin, DataSourcePlugin {
public class ParquetDataFormatPlugin extends Plugin implements DataSourcePlugin {
private Settings settings;

public static String DEFAULT_MAX_NATIVE_ALLOCATION = "10%";

public static final Setting<String> INDEX_MAX_NATIVE_ALLOCATION = Setting.simpleString(
"index.parquet.max_native_allocation",
DEFAULT_MAX_NATIVE_ALLOCATION,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

@Override
@SuppressWarnings("unchecked")
public <T extends DataFormat> IndexingExecutionEngine<T> indexingEngine(MapperService mapperService, ShardPath shardPath) {
return (IndexingExecutionEngine<T>) new ParquetExecutionEngine(settings, () -> ArrowSchemaBuilder.getSchema(mapperService), shardPath);
public <T extends DataFormat> IndexingExecutionEngine<T> indexingEngine(MapperService mapperService, ShardPath shardPath, IndexSettings indexSettings) {
return (IndexingExecutionEngine<T>) new ParquetExecutionEngine(settings, () -> ArrowSchemaBuilder.getSchema(mapperService), shardPath, indexSettings);
}

@Override
Expand Down Expand Up @@ -148,7 +138,16 @@ public BlobContainer createBlobContainer(BlobStore blobStore, BlobPath baseBlobP

@Override
public List<Setting<?>> getSettings() {
return List.of(INDEX_MAX_NATIVE_ALLOCATION);
return List.of(
ParquetSettings.MAX_NATIVE_ALLOCATION,
ParquetSettings.PARQUET_SETTINGS,
ParquetSettings.ROW_GROUP_SIZE_BYTES,
ParquetSettings.PAGE_SIZE_BYTES,
ParquetSettings.PAGE_ROW_LIMIT,
ParquetSettings.DICT_SIZE_BYTES,
ParquetSettings.COMPRESSION_TYPE,
ParquetSettings.COMPRESSION_LEVEL
);
}

// for testing locally only
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package com.parquet.parquetdataformat;

import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;

import static org.opensearch.common.settings.WriteableSetting.SettingType.ByteSizeValue;

/**
 * Central registry of {@link Setting} definitions for the Parquet data format.
 *
 * <p>All settings live under the {@code index.parquet.} namespace and are
 * index-scoped, except {@link #MAX_NATIVE_ALLOCATION} which is node-scoped.
 */
public final class ParquetSettings {

    // Non-instantiable constants holder.
    private ParquetSettings() {}

    /** Default cap on native (off-heap) allocation, as a percentage string. */
    public static final String DEFAULT_MAX_NATIVE_ALLOCATION = "10%";

    /** Group setting exposing the whole {@code index.parquet.} namespace as one unit. */
    public static final Setting<Settings> PARQUET_SETTINGS = Setting.groupSetting(
        "index.parquet.",
        Setting.Property.IndexScope
    );

    /** Row group size in bytes for written Parquet files (default 128 MB). */
    public static final Setting<ByteSizeValue> ROW_GROUP_SIZE_BYTES = Setting.byteSizeSetting(
        "index.parquet.row_group_size_bytes",
        new ByteSizeValue(128, ByteSizeUnit.MB),
        Setting.Property.IndexScope
    );

    /** Data page size in bytes (default 1 MB). */
    public static final Setting<ByteSizeValue> PAGE_SIZE_BYTES = Setting.byteSizeSetting(
        "index.parquet.page_size_bytes",
        new ByteSizeValue(1, ByteSizeUnit.MB),
        Setting.Property.IndexScope
    );

    /** Maximum number of rows per page (default 20000, minimum 1). */
    public static final Setting<Integer> PAGE_ROW_LIMIT = Setting.intSetting(
        "index.parquet.page_row_limit",
        20000,
        1,
        Setting.Property.IndexScope
    );

    /** Dictionary size in bytes (default 2 MB). */
    public static final Setting<ByteSizeValue> DICT_SIZE_BYTES = Setting.byteSizeSetting(
        "index.parquet.dict_size_bytes",
        new ByteSizeValue(2, ByteSizeUnit.MB),
        Setting.Property.IndexScope
    );

    /** Compression codec name (default {@code "ZSTD"}). */
    public static final Setting<String> COMPRESSION_TYPE = Setting.simpleString(
        "index.parquet.compression_type",
        "ZSTD",
        Setting.Property.IndexScope
    );

    /** Compression level, 1-9 inclusive (default 2). */
    public static final Setting<Integer> COMPRESSION_LEVEL = Setting.intSetting(
        "index.parquet.compression_level",
        2,
        1,
        9,
        Setting.Property.IndexScope
    );

    /**
     * Node-scoped cap on native allocation (percentage string).
     * NOTE(review): the plugin setting this replaces was also
     * {@code Dynamic}; confirm dropping that property is intentional.
     */
    public static final Setting<String> MAX_NATIVE_ALLOCATION = Setting.simpleString(
        "index.parquet.max_native_allocation",
        DEFAULT_MAX_NATIVE_ALLOCATION,
        Setting.Property.NodeScope
    );
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package com.parquet.parquetdataformat.bridge;

/**
* Field-level configuration that overrides index-level defaults.
* Null values inherit from index-level configuration.
*/
/**
 * Per-field configuration that overrides the index-level defaults.
 * A {@code null} value means "inherit from the index-level configuration".
 */
public class FieldConfig {

    private String compressionType;
    private Integer compressionLevel;

    /** @return the per-field compression level, or {@code null} to inherit */
    public Integer getCompressionLevel() {
        return compressionLevel;
    }

    public void setCompressionLevel(Integer compressionLevel) {
        this.compressionLevel = compressionLevel;
    }

    /** @return the per-field compression codec name, or {@code null} to inherit */
    public String getCompressionType() {
        return compressionType;
    }

    public void setCompressionType(String compressionType) {
        this.compressionType = compressionType;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ public class NativeParquetWriter implements Closeable {
* @param schemaAddress Arrow C Data Interface schema pointer
* @throws IOException if writer creation fails
*/
public NativeParquetWriter(String filePath, long schemaAddress) throws IOException {
public NativeParquetWriter(String filePath, String indexName, long schemaAddress) throws IOException {
this.filePath = filePath;
RustBridge.createWriter(filePath, schemaAddress);
RustBridge.createWriter(filePath, indexName, schemaAddress);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package com.parquet.parquetdataformat.bridge;

import java.util.HashMap;
import java.util.Map;

/**
 * Configuration payload handed to the Rust layer, covering writer and future
 * merge settings. Index-level values here act as defaults; entries in
 * {@link #getFieldConfigs()} override them per field.
 */
public class NativeSettings {

    /** Identifier of the index these settings belong to. */
    private String indexName;

    // Index-level writer defaults.
    private String compressionType;
    private Integer compressionLevel;
    private Long pageSizeBytes;
    private Integer pageRowLimit;
    private Long dictSizeBytes;
    private Long rowGroupSizeBytes;

    // Optional per-field overrides and free-form extras; may be null.
    private Map<String, FieldConfig> fieldConfigs;
    private Map<String, Object> customSettings;

    public String getIndexName() {
        return indexName;
    }

    public void setIndexName(String indexName) {
        this.indexName = indexName;
    }

    public String getCompressionType() {
        return compressionType;
    }

    public void setCompressionType(String compressionType) {
        this.compressionType = compressionType;
    }

    public Integer getCompressionLevel() {
        return compressionLevel;
    }

    public void setCompressionLevel(Integer compressionLevel) {
        this.compressionLevel = compressionLevel;
    }

    public Long getPageSizeBytes() {
        return pageSizeBytes;
    }

    public void setPageSizeBytes(Long pageSizeBytes) {
        this.pageSizeBytes = pageSizeBytes;
    }

    public Integer getPageRowLimit() {
        return pageRowLimit;
    }

    public void setPageRowLimit(Integer pageRowLimit) {
        this.pageRowLimit = pageRowLimit;
    }

    public Long getDictSizeBytes() {
        return dictSizeBytes;
    }

    public void setDictSizeBytes(Long dictSizeBytes) {
        this.dictSizeBytes = dictSizeBytes;
    }

    public Long getRowGroupSizeBytes() {
        return rowGroupSizeBytes;
    }

    public void setRowGroupSizeBytes(Long rowGroupSizeBytes) {
        this.rowGroupSizeBytes = rowGroupSizeBytes;
    }

    public Map<String, FieldConfig> getFieldConfigs() {
        return fieldConfigs;
    }

    public void setFieldConfigs(Map<String, FieldConfig> fieldConfigs) {
        this.fieldConfigs = fieldConfigs;
    }

    /**
     * Registers a per-field override, lazily creating the backing map on
     * first use so an all-defaults payload carries no field map at all.
     */
    public void addFieldConfig(String fieldName, FieldConfig config) {
        if (fieldConfigs == null) {
            fieldConfigs = new HashMap<>();
        }
        fieldConfigs.put(fieldName, config);
    }

    public Map<String, Object> getCustomSettings() {
        return customSettings;
    }

    public void setCustomSettings(Map<String, Object> customSettings) {
        this.customSettings = customSettings;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ public class RustBridge {
public static native void initLogger();

// Enhanced native methods that handle validation and provide better error reporting
public static native void createWriter(String file, long schemaAddress) throws IOException;
public static native void createWriter(String file, String indexName, long schemaAddress) throws IOException;
public static native void write(String file, long arrayAddress, long schemaAddress) throws IOException;
public static native ParquetFileMetadata closeWriter(String file) throws IOException;
public static native void flushToDisk(String file) throws IOException;
public static native ParquetFileMetadata getFileMetadata(String file) throws IOException;
public static native void onSettingsUpdate(NativeSettings nativeSettings) throws IOException;

public static native long getFilteredNativeBytesUsed(String pathPrefix);


// Native method declarations - these will be implemented in the JNI library
public static native void mergeParquetFilesInRust(List<Path> inputFiles, String outputFile);
public static native void mergeParquetFilesInRust(List<Path> inputFiles, String outputFile, String indexName);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.parquet.parquetdataformat.engine;

import com.parquet.parquetdataformat.ParquetSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.engine.exec.DataFormat;
Expand All @@ -24,9 +25,10 @@
* the system to recognize and utilize Parquet-based storage operations.
*/
public class ParquetDataFormat implements DataFormat {

@Override
public Setting<Settings> dataFormatSettings() {
return null;
return ParquetSettings.PARQUET_SETTINGS;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.parquet.parquetdataformat.engine;

import com.parquet.parquetdataformat.ParquetSettings;
import com.parquet.parquetdataformat.bridge.RustBridge;
import com.parquet.parquetdataformat.bridge.NativeSettings;
import com.parquet.parquetdataformat.memory.ArrowBufferPool;
import com.parquet.parquetdataformat.merge.CompactionStrategy;
import com.parquet.parquetdataformat.merge.ParquetMergeExecutor;
Expand All @@ -11,6 +13,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.exec.DataFormat;
import org.opensearch.index.engine.exec.IndexingExecutionEngine;
import org.opensearch.index.engine.exec.Merger;
Expand Down Expand Up @@ -69,13 +72,52 @@ public class ParquetExecutionEngine implements IndexingExecutionEngine<ParquetDa

private final Supplier<Schema> schema;
private final ShardPath shardPath;
private final ParquetMerger parquetMerger = new ParquetMergeExecutor(CompactionStrategy.RECORD_BATCH);
private final ParquetMerger parquetMerger;
private final ArrowBufferPool arrowBufferPool;

public ParquetExecutionEngine(Settings settings, Supplier<Schema> schema, ShardPath shardPath) {
private final IndexSettings indexSettings;

    /**
     * Creates the execution engine for one shard.
     *
     * @param settings      node-level settings, passed to the Arrow buffer pool
     * @param schema        lazily evaluated supplier of the Arrow schema for this index
     * @param shardPath     on-disk location of the shard
     * @param indexSettings index-level settings; also provides the index name used
     *                      by the merger and the native settings push
     */
    public ParquetExecutionEngine(
        Settings settings,
        Supplier<Schema> schema,
        ShardPath shardPath,
        IndexSettings indexSettings
    ) {
        this.schema = schema;
        this.shardPath = shardPath;
        this.arrowBufferPool = new ArrowBufferPool(settings);
        this.indexSettings = indexSettings;
        this.parquetMerger = new ParquetMergeExecutor(CompactionStrategy.RECORD_BATCH, indexSettings.getIndex().getName());

        // Push the current settings to the Rust store once at construction time.
        pushSettingsToRust(indexSettings);

        // TODO(review): once these parquet settings are made dynamic, enable the
        // update consumer below so changes are re-pushed to the Rust store.
        // indexSettings.getScopedSettings().addSettingsUpdateConsumer(
        //     ignored -> pushSettingsToRust(indexSettings),
        //     List.of(
        //         ParquetSettings.COMPRESSION_TYPE,
        //         ParquetSettings.COMPRESSION_LEVEL,
        //         ParquetSettings.PAGE_SIZE_BYTES,
        //         ParquetSettings.PAGE_ROW_LIMIT,
        //         ParquetSettings.DICT_SIZE_BYTES
        //     )
        // );
    }

    /**
     * Snapshots the current index-level Parquet settings into a
     * {@link NativeSettings} payload and pushes it to the Rust layer via
     * {@link RustBridge#onSettingsUpdate}.
     *
     * <p>Any failure is logged rather than propagated, so a failed push does
     * not abort engine construction.
     *
     * @param indexSettings source of the per-index setting values
     */
    private void pushSettingsToRust(IndexSettings indexSettings) {
        NativeSettings config = new NativeSettings();
        config.setIndexName(indexSettings.getIndex().getName());
        config.setCompressionType(indexSettings.getValue(ParquetSettings.COMPRESSION_TYPE));
        config.setCompressionLevel(indexSettings.getValue(ParquetSettings.COMPRESSION_LEVEL));
        // Byte-size settings are flattened to raw byte counts for the native side.
        config.setPageSizeBytes(indexSettings.getValue(ParquetSettings.PAGE_SIZE_BYTES).getBytes());
        config.setPageRowLimit(indexSettings.getValue(ParquetSettings.PAGE_ROW_LIMIT));
        config.setDictSizeBytes(indexSettings.getValue(ParquetSettings.DICT_SIZE_BYTES).getBytes());
        config.setRowGroupSizeBytes(indexSettings.getValue(ParquetSettings.ROW_GROUP_SIZE_BYTES).getBytes());
        try {
            RustBridge.onSettingsUpdate(config);
        } catch (Exception e) {
            // Broad catch is deliberate: the JNI call may surface runtime errors.
            logger.error("Failed to push Parquet settings to Rust store", e);
        }
    }

@Override
Expand Down Expand Up @@ -108,7 +150,7 @@ public List<String> supportedFieldTypes() {
@Override
public Writer<ParquetDocumentInput> createWriter(long writerGeneration) {
String fileName = Path.of(shardPath.getDataPath().toString(), getDataFormat().name(), FILE_NAME_PREFIX + "_" + writerGeneration + FILE_NAME_EXT).toString();
return new ParquetWriter(fileName, schema.get(), writerGeneration, arrowBufferPool);
return new ParquetWriter(fileName, schema.get(), writerGeneration, arrowBufferPool, indexSettings);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.parquet.parquetdataformat.memory;

import com.parquet.parquetdataformat.ParquetDataFormatPlugin;
import com.parquet.parquetdataformat.ParquetSettings;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -67,7 +68,7 @@ public void close() {

private static long getMaxAllocationInBytes(Settings settings) {
long totalAvailableSystemMemory = OsProbe.getInstance().getTotalPhysicalMemorySize() - JvmInfo.jvmInfo().getConfiguredMaxHeapSize();
RatioValue maxAllocationPercentage = RatioValue.parseRatioValue(settings.get(ParquetDataFormatPlugin.INDEX_MAX_NATIVE_ALLOCATION.getKey(), ParquetDataFormatPlugin.DEFAULT_MAX_NATIVE_ALLOCATION));
RatioValue maxAllocationPercentage = RatioValue.parseRatioValue(settings.get(ParquetSettings.MAX_NATIVE_ALLOCATION.getKey(), ParquetSettings.DEFAULT_MAX_NATIVE_ALLOCATION));
return (long) (totalAvailableSystemMemory * maxAllocationPercentage.getAsRatio());
}
}
Loading
Loading