diff --git a/actor/src/main/java/org/apache/pekko/io/ByteBufferCleaner.java b/actor/src/main/java/org/apache/pekko/io/ByteBufferCleaner.java
index aab02e45ab7..77cd2b9ffbc 100644
--- a/actor/src/main/java/org/apache/pekko/io/ByteBufferCleaner.java
+++ b/actor/src/main/java/org/apache/pekko/io/ByteBufferCleaner.java
@@ -23,6 +23,7 @@
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
+import org.apache.pekko.annotation.InternalStableApi;
 
 /**
  * Cleans a direct {@link ByteBuffer}. Without manual intervention, direct ByteBuffers will be
@@ -36,7 +37,8 @@
  *
 * See JDK-4724038
  */
-final class ByteBufferCleaner {
+@InternalStableApi
+public final class ByteBufferCleaner {
   // adapted from
   // https://github.com/apache/commons-io/blob/441115a4b5cd63ae808dd4c40fc238cb52c8048f/src/main/java/org/apache/commons/io/input/ByteBufferCleaner.java
@@ -75,7 +77,7 @@ public void clean(final ByteBuffer buffer) throws Throwable {
    * @param buffer to release.
    * @throws IllegalStateException on internal failure.
    */
-  static void clean(final ByteBuffer buffer) {
+  public static void clean(final ByteBuffer buffer) {
     try {
       INSTANCE.clean(buffer);
     } catch (final Throwable t) {
@@ -116,7 +118,7 @@ private static Cleaner getCleaner() {
    *
    * @return {@code true} if cleaning is supported, {@code false} otherwise.
    */
-  static boolean isSupported() {
+  public static boolean isSupported() {
     return INSTANCE != null;
   }
 }
diff --git a/docs/src/main/paradox/stream/operators/Compression/zstd.md b/docs/src/main/paradox/stream/operators/Compression/zstd.md
new file mode 100644
index 00000000000..1d5e635a9e2
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Compression/zstd.md
@@ -0,0 +1,30 @@
+# Compression.zstd
+
+Creates a flow that zstd-compresses a stream of ByteStrings.
+
+@ref[Compression operators](../index.md#compression-operators)
+
+## Signature
+
+@apidoc[Compression.zstd](stream.*.Compression$) { scala="#zstd:org.apache.pekko.stream.scaladsl.Flow[org.apache.pekko.util.ByteString,org.apache.pekko.util.ByteString,org.apache.pekko.NotUsed]" java="#zstd()" }
+
+## Description
+
+Creates a flow that zstd-compresses a stream of ByteStrings. Note that the compressor
+will flush after every @apidoc[util.ByteString] so that it is guaranteed that every @apidoc[util.ByteString]
+coming out of the flow can be fully decompressed without waiting for additional data. This may
+come at a compression performance cost for very small chunks.
+
+Use the overloaded methods to control the compression level, supply a dictionary, or disable
+the automatic flushing.
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the compression algorithm produces output for the received `ByteString`
+
+**backpressures** when downstream backpressures
+
+**completes** when upstream completes
+
+@@@
diff --git a/docs/src/main/paradox/stream/operators/Compression/zstdDecompress.md b/docs/src/main/paradox/stream/operators/Compression/zstdDecompress.md
new file mode 100644
index 00000000000..df8f9b571c8
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Compression/zstdDecompress.md
@@ -0,0 +1,26 @@
+# Compression.zstdDecompress
+
+Creates a flow that zstd-decompresses a stream of ByteStrings.
+
+@ref[Compression operators](../index.md#compression-operators)
+
+## Signature
+
+@apidoc[Compression.zstdDecompress](stream.*.Compression$) { scala="#zstdDecompress(maxBytesPerChunk:Int):org.apache.pekko.stream.scaladsl.Flow[org.apache.pekko.util.ByteString,org.apache.pekko.util.ByteString,org.apache.pekko.NotUsed]" java="#zstdDecompress(int)" }
+
+## Description
+
+Creates a flow that zstd-decompresses a stream of ByteStrings. If the input is truncated, uses an
+invalid compression method, or is corrupt (fails CRC checks), this operator fails with a
+`com.github.luben.zstd.ZstdIOException`.
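+
+As a minimal sketch of how the two stages fit together (assuming an implicit `ActorSystem` is in
+scope to materialize the stream), a zstd round trip looks like this in Scala:
+
+```scala
+import org.apache.pekko.stream.scaladsl.{ Compression, Sink, Source }
+import org.apache.pekko.util.ByteString
+
+// compress a single chunk and decompress it again; each ByteString emitted
+// by the decompression stage is at most maxBytesPerChunk bytes long
+val roundTrip =
+  Source.single(ByteString("hello world"))
+    .via(Compression.zstd)
+    .via(Compression.zstdDecompress(maxBytesPerChunk = 65536))
+    .runWith(Sink.fold(ByteString.empty)(_ ++ _))
+```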
+ +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the decompression algorithm produces output for the received `ByteString` (the emitted `ByteString` is of `maxBytesPerChunk` maximum length) + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +@@@ diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ed101d1b19c..d2e2a7907cd 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -352,7 +352,8 @@ object Dependencies { // pekko stream - lazy val stream = l ++= Seq[sbt.ModuleID](reactiveStreams, TestDependencies.scalatest) + lazy val stream = + l ++= Seq[sbt.ModuleID](reactiveStreams, "com.github.luben" % "zstd-jni" % "1.5.7-6", TestDependencies.scalatest) lazy val streamTestkit = l ++= Seq( TestDependencies.scalatest, diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala index af7191b0d78..abbf778fbc7 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala @@ -15,11 +15,11 @@ package org.apache.pekko.stream.io.compression import java.io.{ ByteArrayOutputStream, InputStream, OutputStream } import java.util.concurrent.ThreadLocalRandom -import java.util.zip.DataFormatException import scala.annotation.tailrec import scala.concurrent.Await import scala.concurrent.duration._ +import scala.reflect.ClassTag import scala.util.control.NoStackTrace import org.apache.pekko @@ -31,7 +31,8 @@ import pekko.util.ByteString import org.scalatest.Inspectors import org.scalatest.wordspec.AnyWordSpec -abstract class CoderSpec(codecName: String) extends AnyWordSpec with CodecSpecSupport with Inspectors { +abstract class CoderSpec[CorruptInputException: ClassTag](codecName: String) extends AnyWordSpec with CodecSpecSupport + with Inspectors { import CompressionTestingTools._ protected def newCompressor(): Compressor @@ -85,7 +86,7 @@ abstract class CoderSpec(codecName: String) extends AnyWordSpec with CodecSpecSu "throw an error on corrupt input" in { (the[RuntimeException] thrownBy { ourDecode(corruptContent) - }).ultimateCause should be(a[DataFormatException]) + }).ultimateCause should be(a[CorruptInputException]) } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/DeflateSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/DeflateSpec.scala index 90a7f3551b8..d63b29ea923 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/DeflateSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/DeflateSpec.scala @@ -21,7 +21,7 @@ import pekko.stream.impl.io.compression.{ Compressor, DeflateCompressor } import pekko.stream.scaladsl.{ Compression, Flow } import pekko.util.ByteString -class DeflateSpec extends CoderSpec("deflate") { +class DeflateSpec extends CoderSpec[DataFormatException]("deflate") { import CompressionTestingTools._ protected def newCompressor(): Compressor = new DeflateCompressor diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/GzipSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/GzipSpec.scala index 177bbc0974e..19d56902a70 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/GzipSpec.scala +++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/GzipSpec.scala @@ -15,14 +15,14 @@ package org.apache.pekko.stream.io.compression import java.io.{ InputStream, OutputStream } import java.nio.charset.StandardCharsets -import java.util.zip.{ GZIPInputStream, GZIPOutputStream, ZipException } +import java.util.zip.{ DataFormatException, GZIPInputStream, GZIPOutputStream, ZipException } import org.apache.pekko import pekko.stream.impl.io.compression.{ Compressor, GzipCompressor } import pekko.stream.scaladsl.{ Compression, Flow } import pekko.util.ByteString -class GzipSpec extends CoderSpec("gzip") { +class GzipSpec extends CoderSpec[DataFormatException]("gzip") { import CompressionTestingTools._ protected def newCompressor(): Compressor = new GzipCompressor diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/ZstdAutoFlushSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/ZstdAutoFlushSpec.scala new file mode 100644 index 00000000000..285130abe73 --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/ZstdAutoFlushSpec.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.io.compression + +import org.apache.pekko +import pekko.stream.scaladsl.{ Compression, Flow } +import pekko.util.ByteString + +class ZstdAutoFlushSpec extends ZstdSpec { + override protected val encoderFlow: Flow[ByteString, ByteString, Any] = + Compression.zstd(Compression.ZstdDefaultCompressionLevel, dictionary = None, autoFlush = false) + + override protected val autoFlush: Boolean = false + +} diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/ZstdSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/ZstdSpec.scala new file mode 100644 index 00000000000..f2368d1b158 --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/ZstdSpec.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.io.compression + +import com.github.luben.zstd.{ ZstdIOException, ZstdInputStream, ZstdOutputStream } + +import org.apache.pekko +import pekko.stream.impl.io.compression.{ Compressor, ZstdCompressor } +import pekko.stream.scaladsl.{ Compression, Flow } +import pekko.util.ByteString + +import java.io.{ InputStream, OutputStream } + +class ZstdSpec extends CoderSpec[ZstdIOException]("zstd") { + import CompressionTestingTools._ + + override protected def newCompressor(): Compressor = new ZstdCompressor + + override protected def encoderFlow: Flow[ByteString, ByteString, Any] = Compression.zstd + + override protected def decoderFlow(maxBytesPerChunk: Int): Flow[ByteString, ByteString, Any] = + Compression.zstdDecompress(maxBytesPerChunk) + + override protected def newDecodedInputStream(underlying: InputStream): InputStream = + new ZstdInputStream(underlying) + + override protected def newEncodedOutputStream(underlying: OutputStream): OutputStream = + new ZstdOutputStream(underlying) + + override def extraTests(): Unit = { + "decode concatenated compressions" in { + ourDecode(Seq(encode("Hello, "), encode("dear "), encode("User!")).join) should readAs("Hello, dear User!") + } + } +} diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/CompressionSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/CompressionSpec.scala index 0b358da9f4c..41f5af5a234 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/CompressionSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/CompressionSpec.scala @@ -16,8 +16,7 @@ package org.apache.pekko.stream.scaladsl import java.nio.charset.StandardCharsets import org.apache.pekko -import pekko.stream.impl.io.compression.DeflateCompressor -import pekko.stream.impl.io.compression.GzipCompressor +import pekko.stream.impl.io.compression.{ DeflateCompressor, GzipCompressor, ZstdCompressor } import pekko.stream.testkit.StreamSpec import pekko.util.ByteString @@ -27,6 +26,8 @@ class CompressionSpec extends StreamSpec { def deflate(s: String): ByteString = new DeflateCompressor().compressAndFinish(ByteString(s)) + def zstd(s: String): ByteString = new ZstdCompressor().compressAndFinish(ByteString(s)) + val data = "hello world" "Gzip decompression" must { @@ -47,4 +48,14 @@ class CompressionSpec extends StreamSpec { res.futureValue should ===(data) } } + + "Zstd decompression" must { + "be able to decompress a zstd stream" in { + val source = + Source.single(zstd(data)).via(Compression.zstdDecompress()).map(_.decodeString(StandardCharsets.UTF_8)) + + val res = source.runFold("")(_ + _) + res.futureValue should ===(data) + } + } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/ZstdCompressor.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/ZstdCompressor.scala new file mode 100644 index 00000000000..41d39473b3b --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/ZstdCompressor.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.impl.io.compression + +import java.nio.ByteBuffer +import com.github.luben.zstd.{ ZstdDictCompress, ZstdDirectBufferCompressingStreamNoFinalizer } +import org.apache.pekko +import org.apache.pekko.stream.scaladsl.Compression +import pekko.annotation.InternalApi +import pekko.util.ByteString + +/** INTERNAL API */ +@InternalApi private[pekko] class ZstdCompressor( + compressionLevel: Int = + Compression.ZstdDefaultCompressionLevel, dictionary: Option[ZstdDictionaryImpl] = None) extends Compressor { + + private val targetBuffer = ByteBuffer.allocateDirect(65536) + private val compressingStream = new ZstdDirectBufferCompressingStreamNoFinalizer(targetBuffer, compressionLevel) + + dictionary.foreach { dict => + (dict.level, dict.length, dict.offset) match { + case (None, None, None) => + compressingStream.setDict(dict.dictionary) + case (Some(dictLevel), None, None) => + compressingStream.setDict(new ZstdDictCompress(dict.dictionary, dictLevel)) + case (Some(dictLevel), Some(dictLength), Some(dictOffset)) => + compressingStream.setDict(new ZstdDictCompress(dict.dictionary, dictLevel, dictLength, dictOffset)) + case _ => + throw new IllegalArgumentException("Invalid combination of ZstdDictionary parameters") + } + } + + override def compress(input: ByteString): ByteString = { + val inputBB = ByteBuffer.allocateDirect(input.size) + inputBB.put(input.toArrayUnsafe()) + inputBB.flip() + compressingStream.compress(inputBB) + val result = ByteString.fromByteBuffer(targetBuffer) + targetBuffer.flip() + result + } + + override def flush(): ByteString = { + targetBuffer.flip() + val result = ByteString.fromByteBuffer(targetBuffer) + targetBuffer.clear() + compressingStream.flush() + result + } + + override def finish(): ByteString = { + compressingStream.close() + targetBuffer.flip() + val arr = Array.ofDim[Byte](targetBuffer.limit()) + targetBuffer.get(arr) + val result = ByteString.fromArrayUnsafe(arr) + result + } + + override def compressAndFlush(input: ByteString): ByteString = { + val inputBB = ByteBuffer.allocateDirect(input.size) + inputBB.put(input.toArrayUnsafe()) + inputBB.flip() + compressingStream.compress(inputBB) + compressingStream.flush() + targetBuffer.flip() + + val arr = new Array[Byte](targetBuffer.limit()) + targetBuffer.get(arr) + targetBuffer.clear() + ByteString.fromArrayUnsafe(arr) + } + + override def compressAndFinish(input: ByteString): ByteString = { + val inputBB = ByteBuffer.allocateDirect(input.size) + inputBB.put(input.toArrayUnsafe()) + inputBB.flip() + compressingStream.compress(inputBB) + compressingStream.close() + targetBuffer.flip() + + val arr = new Array[Byte](targetBuffer.limit()) + targetBuffer.get(arr) + ByteString.fromArrayUnsafe(arr) + } + + override def close(): Unit = compressingStream.close() +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/ZstdDecompressor.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/ZstdDecompressor.scala new file mode 100644 index 00000000000..768e081d3b7 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/ZstdDecompressor.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.impl.io.compression + +import java.nio.ByteBuffer + +import scala.util.control.NonFatal + +import com.github.luben.zstd.{ Zstd, ZstdDirectBufferDecompressingStreamNoFinalizer } + +import org.apache.pekko +import pekko.annotation.InternalApi +import pekko.io.ByteBufferCleaner +import pekko.stream.Attributes +import pekko.stream.impl.fusing.GraphStages.SimpleLinearGraphStage +import pekko.stream.stage.{ GraphStageLogic, InHandler, OutHandler } +import pekko.util.ByteString + +/** INTERNAL API */ +@InternalApi private[pekko] class ZstdDecompressor(maxBytesPerChunk: Int = + Zstd.blockSizeMax()) extends SimpleLinearGraphStage[ByteString] { + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { + new GraphStageLogic(shape) with InHandler with OutHandler { + + private val sourceBuffer = ByteBuffer.allocateDirect(maxBytesPerChunk) + + // This is initialized here to avoid an allocation per onPush, remember to clear before using + private val outputBuffer = ByteBuffer.allocateDirect(maxBytesPerChunk) + private val decompressingStream = new ZstdDirectBufferDecompressingStreamNoFinalizer(sourceBuffer) + + override def onPush(): Unit = { + sourceBuffer.clear() + val inputArray = grab(in).toArrayUnsafe() + sourceBuffer.put(inputArray) + sourceBuffer.flip() + if (sourceBuffer.hasRemaining) { + var result = ByteString.empty + while (sourceBuffer.hasRemaining) { + outputBuffer.clear() + decompressingStream.read(outputBuffer) + outputBuffer.flip() + val outputArray = new Array[Byte](outputBuffer.limit()) + outputBuffer.get(outputArray) + result = result.concat(ByteString.fromArrayUnsafe(outputArray)) + } + + // Fencepost case, it's possible to still have sourceBuffer.hasRemaining and yet decompression result + // not output anything + if (result.nonEmpty) + push(out, result) + else + pull(in) + } else pull(in) + sourceBuffer.flip() + } + + override def onPull(): Unit = pull(in) + + override def onUpstreamFinish(): Unit = { + sourceBuffer.flip() + if (sourceBuffer.hasRemaining) { + var result = ByteString.empty + while (sourceBuffer.hasRemaining) { + outputBuffer.clear() + decompressingStream.read(outputBuffer) + outputBuffer.flip() + val outputArray = new Array[Byte](outputBuffer.limit()) + outputBuffer.get(outputArray) + result = result.concat(ByteString.fromArrayUnsafe(outputArray)) + } + decompressingStream.close() + + // Fencepost case, it's possible to still have 
sourceBuffer.hasRemaining and yet decompression result + // not output anything + if (result.nonEmpty) + emit(out, result) + } + completeStage() + } + + override def postStop(): Unit = { + decompressingStream.close() + + if (ByteBufferCleaner.isSupported) + try { + ByteBufferCleaner.clean(sourceBuffer) + ByteBufferCleaner.clean(outputBuffer) + } catch { + case NonFatal(_) => /* ok, best effort attempt to cleanup failed */ + } + } + + setHandlers(in, out, this) + } + } + +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/ZstdDictionaryImpl.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/ZstdDictionaryImpl.scala new file mode 100644 index 00000000000..3b6bf30d63b --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/ZstdDictionaryImpl.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.impl.io.compression + +import org.apache.pekko.annotation.InternalStableApi + +@InternalStableApi +private[pekko] abstract class ZstdDictionaryImpl { + def dictionary: Array[Byte] + def level: Option[Int] + def offset: Option[Int] + def length: Option[Int] +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Compression.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Compression.scala index f143528da06..049d9faac81 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Compression.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Compression.scala @@ -13,8 +13,16 @@ package org.apache.pekko.stream.javadsl +import java.util.Optional + +import scala.jdk.OptionConverters.RichOptional + +import com.github.luben.zstd._ + import org.apache.pekko import pekko.NotUsed +import pekko.stream.impl.io.compression.{ CompressionUtils, ZstdCompressor } +import pekko.stream.javadsl.compression.ZstdDictionary import pekko.stream.scaladsl import pekko.util.ByteString @@ -104,4 +112,67 @@ object Compression { def deflate(level: Int, nowrap: Boolean, autoFlush: Boolean): Flow[ByteString, ByteString, NotUsed] = scaladsl.Compression.deflate(level, nowrap, autoFlush).asJava + /** + * The minimum compression level supported by zstd + * @since 2.0.0 + */ + final val ZstdMinCompressionLevel: Int = Zstd.minCompressionLevel() + + /** + * The maximum compression level supported by zstd + * @since 2.0.0 + */ + final val ZstdMaxCompressionLevel: Int = Zstd.maxCompressionLevel() + + /** + * The zstd default compression level + * @since 2.0.0 + */ + final val ZstdDefaultCompressionLevel: Int = Zstd.defaultCompressionLevel() + + /** + * @since 2.0.0 + */ + def zstd: Flow[ByteString, ByteString, NotUsed] = + scaladsl.Compression.zstd.asJava + + /** + * Same as [[zstd]] with a custom level and an 
optional dictionary.
+   *
+   * @param level The compression level, must be greater than or equal to [[ZstdMinCompressionLevel]]
+   *              and less than or equal to [[ZstdMaxCompressionLevel]]
+   * @param dictionary An optional dictionary that can be used for compression
+   * @since 2.0.0
+   */
+  def zstd(level: Int, dictionary: Optional[ZstdDictionary]): Flow[ByteString, ByteString, NotUsed] = {
+    require(level <= ZstdMaxCompressionLevel && level >= ZstdMinCompressionLevel)
+    CompressionUtils.compressorFlow(() => new ZstdCompressor(level, dictionary.toScala.map(_.toImpl))).asJava
+  }
+
+  /**
+   * Same as [[zstd]] with a custom level, an optional dictionary and a configurable flush mode.
+   *
+   * @param level The compression level, must be greater than or equal to [[ZstdMinCompressionLevel]]
+   *              and less than or equal to [[ZstdMaxCompressionLevel]]
+   * @param dictionary An optional dictionary that can be used for compression
+   * @param autoFlush If true, automatically flushes after every element in the stream.
+   * @since 2.0.0
+   */
+  def zstd(level: Int, dictionary: Optional[ZstdDictionary], autoFlush: Boolean)
+      : Flow[ByteString, ByteString, NotUsed] = {
+    require(level <= ZstdMaxCompressionLevel && level >= ZstdMinCompressionLevel)
+    CompressionUtils.compressorFlow(() => new ZstdCompressor(level, dictionary.toScala.map(_.toImpl)), autoFlush).asJava
+  }
+
+  /**
+   * The maximum block size used by zstd decompression
+   * @since 2.0.0
+   */
+  final val ZstdDecompressMaxBlockSize: Int = Zstd.blockSizeMax()
+
+  /**
+   * Creates a flow that zstd-decompresses a stream of ByteStrings.
+   *
+   * @param maxBytesPerChunk Maximum length of an emitted ByteString
+   * @since 2.0.0
+   */
+  def zstdDecompress(maxBytesPerChunk: Int): Flow[ByteString, ByteString, NotUsed] =
+    scaladsl.Compression.zstdDecompress(maxBytesPerChunk).asJava
 }
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/compression/ZstdDictionary.java b/stream/src/main/scala/org/apache/pekko/stream/javadsl/compression/ZstdDictionary.java
new file mode 100644
index 00000000000..eb2449f5c9d
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/compression/ZstdDictionary.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.javadsl.compression;
+
+import org.apache.pekko.annotation.InternalApi;
+import org.apache.pekko.stream.impl.io.compression.ZstdDictionaryImpl;
+import org.apache.pekko.util.ByteString;
+import org.apache.pekko.util.OptionalUtil;
+import scala.Option;
+
+import java.util.OptionalInt;
+
+/**
+ * Configuration class based on the official C zstd library to specify dictionary options for
+ * compression.
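+ *
+ * <p>A minimal usage sketch (illustrative only; {@code dictionaryBytes} stands in for a
+ * dictionary produced with the zstd training tooling):
+ *
+ * <pre>{@code
+ * ZstdDictionary dict = new ZstdDictionary(dictionaryBytes, 3); // dictionary with compression level 3
+ * Flow<ByteString, ByteString, NotUsed> zstdFlow = Compression.zstd(3, Optional.of(dict));
+ * }</pre>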
+ * + * @see Zstd dictionary readme + * @see Zstd C manual + * @since 2.0.0 + */ +public class ZstdDictionary { + byte[] dictionary; + private OptionalInt level = OptionalInt.empty(); + private OptionalInt offset = OptionalInt.empty(); + private OptionalInt length = OptionalInt.empty(); + + public ZstdDictionary(byte[] dictionary) { + this.dictionary = dictionary; + } + + public ZstdDictionary(byte[] dictionary, int level) { + this.dictionary = dictionary; + this.level = OptionalInt.of(level); + } + + public ZstdDictionary(byte[] dictionary, Integer level) { + this.dictionary = dictionary; + if (level != null) { + this.level = OptionalInt.of(level); + } + } + + public ZstdDictionary(byte[] dictionary, int level, int offset, int length) { + this.dictionary = dictionary; + this.level = OptionalInt.of(level); + this.offset = OptionalInt.of(offset); + this.length = OptionalInt.of(length); + } + + public ZstdDictionary(byte[] dictionary, Integer level, Integer offset, Integer length) { + this.dictionary = dictionary; + if (level != null) { + this.level = OptionalInt.of(level); + } + if (offset != null) { + this.offset = OptionalInt.of(offset); + } + if (length != null) { + this.length = OptionalInt.of(length); + } + } + + public ZstdDictionary(ByteString dictionary) { + this(dictionary.toArray()); + } + + public ZstdDictionary(ByteString dictionary, int level) { + this(dictionary.toArray(), level); + } + + public ZstdDictionary(ByteString dictionary, Integer level) { + this(dictionary.toArray(), level); + } + + public ZstdDictionary(ByteString dictionary, int level, int offset, int length) { + this(dictionary.toArray(), level, offset, length); + } + + public ZstdDictionary(ByteString dictionary, Integer level, Integer offset, Integer length) { + this(dictionary.toArray(), level, offset, length); + } + + public ZstdDictionary(String dictionary) { + this(dictionary.getBytes()); + } + + public ZstdDictionary(String dictionary, int level) { + this(dictionary.getBytes(), level); + } + + public ZstdDictionary(String dictionary, Integer level) { + this(dictionary.getBytes(), level); + } + + public ZstdDictionary(String dictionary, int level, int offset, int length) { + this(dictionary.getBytes(), level, offset, length); + } + + public ZstdDictionary(String dictionary, Integer level, Integer offset, Integer length) { + this(dictionary.getBytes(), level, offset, length); + } + + public byte[] getDictionary() { + return dictionary; + } + + public OptionalInt getLevel() { + return level; + } + + public OptionalInt getOffset() { + return offset; + } + + public OptionalInt getLength() { + return length; + } + + public void setDictionary(byte[] dictionary) { + this.dictionary = dictionary; + } + + public void setLevel(int level) { + this.level = OptionalInt.of(level); + } + + public void setLevel(OptionalInt level) { + this.level = level; + } + + public void setLevel(Integer level) { + if (level == null) { + this.level = OptionalInt.empty(); + } else { + this.level = OptionalInt.of(level); + } + } + + public void setOffset(int offset) { + this.offset = OptionalInt.of(offset); + } + + public void setOffset(Integer offset) { + if (offset == null) { + this.offset = OptionalInt.empty(); + } else { + this.offset = OptionalInt.of(offset); + } + } + + public void setOffset(OptionalInt offset) { + this.offset = offset; + } + + public void setLength(int length) { + this.length = OptionalInt.of(length); + } + + public void setLength(Integer length) { + if (length == null) { + this.length = OptionalInt.empty(); + } 
else { + this.length = OptionalInt.of(length); + } + } + + public void setLength(OptionalInt length) { + this.length = length; + } + + @InternalApi + public ZstdDictionaryImpl toImpl() { + return new ZstdDictionaryImpl() { + @Override + public byte[] dictionary() { + return dictionary; + } + + @Override + public Option