Commit 085bb42

Add zstd compression and decompression for pekko streams
1 parent f24b072 commit 085bb42

File tree

13 files changed: +373 −9 lines changed


.scala-steward.conf

Lines changed: 2 additions & 0 deletions
@@ -5,6 +5,8 @@ updates.pin = [
   { groupId = "org.scala-lang", artifactId = "scala3-library", version = "3.3." }
   # sbt-assembly 2.3 causes build issues (https://github.com/apache/pekko/pull/1744)
   { groupId = "com.eed3si9n", artifactId = "sbt-assembly", version = "2.2." }
+  # zstd-jni upgrades are fine as long as they are backwards compatible
+  { groupId = "com.github.luben", artifactId = "zstd-jni", version = "1."}
 ]

 updates.ignore = [
Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
+# Compression.zstd
+
+Creates a flow that zstd-compresses a stream of ByteStrings.
+
+@ref[Compression operators](../index.md#compression-operators)
+
+## Signature
+
+@apidoc[Compression.zstd](stream.*.Compression$) { scala="#zstd:org.apache.pekko.stream.scaladsl.Flow[org.apache.pekko.util.ByteString,org.apache.pekko.util.ByteString,org.apache.pekko.NotUsed]" java="#zstd()" }
+
+## Description
+
+Creates a flow that zstd-compresses a stream of ByteStrings. Note that the compressor
+will SYNC_FLUSH after every @apidoc[util.ByteString] so that it is guaranteed that every @apidoc[util.ByteString]
+coming out of the flow can be fully decompressed without waiting for additional data. This may
+come at a compression performance cost for very small chunks.
+
+Use the overloaded method to control the compression level.
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the compression algorithm produces output for the received `ByteString`
+
+**backpressures** when downstream backpressures
+
+**completes** when upstream completes
+
+@@@
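
The flush-per-element behaviour described above means each emitted element is independently decompressible. A minimal usage sketch of the new operator (the object and system names are illustrative, not part of the commit):

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Compression, Source }
import org.apache.pekko.util.ByteString

object ZstdFlowExample extends App {
  implicit val system: ActorSystem = ActorSystem("zstd-example") // name is arbitrary

  // Each incoming ByteString is flushed into the output, so every emitted
  // element can be decompressed without waiting for later elements.
  Source(List(ByteString("hello "), ByteString("world")))
    .via(Compression.zstd)
    .runForeach(chunk => println(s"compressed chunk: ${chunk.size} bytes"))
    .andThen { case _ => system.terminate() }(system.dispatcher)
}
```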
Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
+# Compression.zstdDecompress
+
+Creates a flow that zstd-decompresses a stream of ByteStrings.
+
+@ref[Compression operators](../index.md#compression-operators)
+
+## Signature
+
+@apidoc[Compression.zstdDecompress](stream.*.Compression$) { scala="#zstdDecompress(maxBytesPerChunk:Int):org.apache.pekko.stream.scaladsl.Flow[org.apache.pekko.util.ByteString,org.apache.pekko.util.ByteString,org.apache.pekko.NotUsed]" java="#zstdDecompress(int)" }
+
+## Description
+
+Creates a flow that zstd-decompresses a stream of ByteStrings. If the input is truncated,
+uses an invalid compression method, or is otherwise invalid (e.g. fails CRC checks),
+this operator fails with a `com.github.luben.zstd.ZstdIOException`.
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the decompression algorithm produces output for the received `ByteString` (the emitted `ByteString` is of `maxBytesPerChunk` maximum length)
+
+**backpressures** when downstream backpressures
+
+**completes** when upstream completes
+
+@@@
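
A minimal round-trip sketch of the decompressor, compressing inline first so it is self-contained (object and system names are illustrative):

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Compression, Source }
import org.apache.pekko.util.ByteString

object ZstdDecompressExample extends App {
  implicit val system: ActorSystem = ActorSystem("zstd-decompress-example")
  import system.dispatcher

  Source.single(ByteString("hello world"))
    .via(Compression.zstd)                                     // produce zstd input
    .via(Compression.zstdDecompress(maxBytesPerChunk = 65536)) // cap each emitted chunk
    .runFold(ByteString.empty)(_ ++ _)
    .foreach { bytes =>
      println(bytes.utf8String) // hello world
      system.terminate()
    }
}
```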

project/Dependencies.scala

Lines changed: 2 additions & 1 deletion
@@ -351,7 +351,8 @@ object Dependencies {

   // pekko stream

-  lazy val stream = l ++= Seq[sbt.ModuleID](reactiveStreams, TestDependencies.scalatest)
+  lazy val stream =
+    l ++= Seq[sbt.ModuleID](reactiveStreams, "com.github.luben" % "zstd-jni" % "1.5.7-6", TestDependencies.scalatest)

   lazy val streamTestkit = l ++= Seq(
     TestDependencies.scalatest,
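
For reference, a downstream sbt build could pull the same artifact in directly; a one-line sketch (the version matches the one added above, and the Scala Steward pin above allows any backwards-compatible 1.x upgrade):

```scala
// build.sbt
libraryDependencies += "com.github.luben" % "zstd-jni" % "1.5.7-6"
```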

stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala

Lines changed: 4 additions & 3 deletions
@@ -15,11 +15,11 @@ package org.apache.pekko.stream.io.compression

 import java.io.{ ByteArrayOutputStream, InputStream, OutputStream }
 import java.util.concurrent.ThreadLocalRandom
-import java.util.zip.DataFormatException

 import scala.annotation.tailrec
 import scala.concurrent.Await
 import scala.concurrent.duration._
+import scala.reflect.ClassTag
 import scala.util.control.NoStackTrace

 import org.apache.pekko
@@ -31,7 +31,8 @@ import pekko.util.ByteString
 import org.scalatest.Inspectors
 import org.scalatest.wordspec.AnyWordSpec

-abstract class CoderSpec(codecName: String) extends AnyWordSpec with CodecSpecSupport with Inspectors {
+abstract class CoderSpec[CorruptInputException: ClassTag](codecName: String) extends AnyWordSpec with CodecSpecSupport
+    with Inspectors {
   import CompressionTestingTools._

   protected def newCompressor(): Compressor
@@ -83,7 +84,7 @@ abstract class CoderSpec(codecName: String) extends AnyWordSpec with CodecSpecSu
     "throw an error on corrupt input" in {
       (the[RuntimeException] thrownBy {
         ourDecode(corruptContent)
-      }).ultimateCause should be(a[DataFormatException])
+      }).ultimateCause should be(a[CorruptInputException])
     }
   }
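
The new type parameter lets each concrete spec name the exception its codec throws on corrupt input; ScalaTest's `a[...]` matcher resolves that type at runtime through the `ClassTag` bound. A standalone sketch of the technique, with hypothetical names:

```scala
import scala.reflect.ClassTag

// Carry the expected exception type as a ClassTag-bound parameter so shared
// test logic can check the runtime class of whatever was actually thrown.
abstract class CorruptInputCheck[E <: Throwable: ClassTag] {
  def matches(t: Throwable): Boolean =
    implicitly[ClassTag[E]].runtimeClass.isInstance(t)
}

object CorruptInputCheckDemo extends App {
  object DeflateCheck extends CorruptInputCheck[java.util.zip.DataFormatException]
  println(DeflateCheck.matches(new java.util.zip.DataFormatException())) // true
  println(DeflateCheck.matches(new IllegalStateException()))             // false
}
```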

stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/DeflateSpec.scala

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@ import pekko.stream.impl.io.compression.{ Compressor, DeflateCompressor }
 import pekko.stream.scaladsl.{ Compression, Flow }
 import pekko.util.ByteString

-class DeflateSpec extends CoderSpec("deflate") {
+class DeflateSpec extends CoderSpec[DataFormatException]("deflate") {
   import CompressionTestingTools._

   protected def newCompressor(): Compressor = new DeflateCompressor

stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/GzipSpec.scala

Lines changed: 2 additions & 2 deletions
@@ -15,14 +15,14 @@ package org.apache.pekko.stream.io.compression

 import java.io.{ InputStream, OutputStream }
 import java.nio.charset.StandardCharsets
-import java.util.zip.{ GZIPInputStream, GZIPOutputStream, ZipException }
+import java.util.zip.{ DataFormatException, GZIPInputStream, GZIPOutputStream, ZipException }

 import org.apache.pekko
 import pekko.stream.impl.io.compression.{ Compressor, GzipCompressor }
 import pekko.stream.scaladsl.{ Compression, Flow }
 import pekko.util.ByteString

-class GzipSpec extends CoderSpec("gzip") {
+class GzipSpec extends CoderSpec[DataFormatException]("gzip") {
   import CompressionTestingTools._

   protected def newCompressor(): Compressor = new GzipCompressor
Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.io.compression
+
+import com.github.luben.zstd.{ ZstdIOException, ZstdInputStream, ZstdOutputStream }
+import org.apache.pekko.stream.impl.io.compression.{ Compressor, ZstdCompressor }
+import org.apache.pekko.stream.scaladsl.{ Compression, Flow }
+import org.apache.pekko.util.ByteString
+
+import java.io.{ InputStream, OutputStream }
+
+class ZstdSpec extends CoderSpec[ZstdIOException]("zstd") {
+  import CompressionTestingTools._
+
+  override protected def newCompressor(): Compressor = new ZstdCompressor
+
+  override protected def encoderFlow: Flow[ByteString, ByteString, Any] = Compression.zstd
+
+  override protected def decoderFlow(maxBytesPerChunk: Int): Flow[ByteString, ByteString, Any] =
+    Compression.zstdDecompress(maxBytesPerChunk)
+
+  override protected def newDecodedInputStream(underlying: InputStream): InputStream =
+    new ZstdInputStream(underlying)
+
+  override protected def newEncodedOutputStream(underlying: OutputStream): OutputStream =
+    new ZstdOutputStream(underlying)
+
+  override def extraTests(): Unit = {
+    "decode concatenated compressions" in {
+      ourDecode(Seq(encode("Hello, "), encode("dear "), encode("User!")).join) should readAs("Hello, dear User!")
+    }
+  }
+}
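
The extra test relies on zstd frames being concatenable: compressing pieces separately and joining the raw bytes still decodes to the joined plaintext. A standalone sketch of that property using plain zstd-jni (assuming, as the test does, that the decoder keeps reading across frame boundaries):

```scala
import java.io.ByteArrayInputStream

import com.github.luben.zstd.{ Zstd, ZstdInputStream }

object ConcatenatedFramesSketch extends App {
  def frame(s: String): Array[Byte] = Zstd.compress(s.getBytes("UTF-8"))

  // Three independent zstd frames, joined byte-wise.
  val joined = frame("Hello, ") ++ frame("dear ") ++ frame("User!")

  val in = new ZstdInputStream(new ByteArrayInputStream(joined))
  println(new String(in.readAllBytes(), "UTF-8")) // Hello, dear User!
}
```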

stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/CompressionSpec.scala

Lines changed: 13 additions & 2 deletions
@@ -16,8 +16,7 @@ package org.apache.pekko.stream.scaladsl
 import java.nio.charset.StandardCharsets

 import org.apache.pekko
-import pekko.stream.impl.io.compression.DeflateCompressor
-import pekko.stream.impl.io.compression.GzipCompressor
+import pekko.stream.impl.io.compression.{ DeflateCompressor, GzipCompressor, ZstdCompressor }
 import pekko.stream.testkit.StreamSpec
 import pekko.util.ByteString

@@ -27,6 +26,8 @@ class CompressionSpec extends StreamSpec {

   def deflate(s: String): ByteString = new DeflateCompressor().compressAndFinish(ByteString(s))

+  def zstd(s: String): ByteString = new ZstdCompressor().compressAndFinish(ByteString(s))
+
   val data = "hello world"

   "Gzip decompression" must {
@@ -47,4 +48,14 @@
       res.futureValue should ===(data)
     }
   }
+
+  "Zstd decompression" must {
+    "be able to decompress a zstd stream" in {
+      val source =
+        Source.single(zstd(data)).via(Compression.zstdDecompress()).map(_.decodeString(StandardCharsets.UTF_8))
+
+      val res = source.runFold("")(_ + _)
+      res.futureValue should ===(data)
+    }
+  }
 }
Lines changed: 93 additions & 0 deletions
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.io.compression
+
+import java.nio.ByteBuffer
+
+import com.github.luben.zstd.{ Zstd, ZstdDictCompress, ZstdDirectBufferCompressingStreamNoFinalizer }
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.util.ByteString
+
+/** INTERNAL API */
+@InternalApi private[pekko] class ZstdCompressor(
+    compressionLevel: Int =
+      Zstd.defaultCompressionLevel(), dictionary: Option[ZstdDictCompress] = None) extends Compressor {
+
+  private val targetBuffer = ByteBuffer.allocateDirect(65536)
+  private val compressingStream = new ZstdDirectBufferCompressingStreamNoFinalizer(targetBuffer, compressionLevel)
+
+  dictionary.foreach(compressingStream.setDict)
+
+  override def compress(input: ByteString): ByteString = {
+    val inputBB = ByteBuffer.allocateDirect(input.size)
+    inputBB.put(input.toArrayUnsafe())
+    inputBB.flip()
+    compressingStream.compress(inputBB)
+    val result = ByteString.fromByteBuffer(targetBuffer)
+    targetBuffer.flip()
+    result
+  }
+
+  override def flush(): ByteString = {
+    targetBuffer.flip()
+    val result = ByteString.fromByteBuffer(targetBuffer)
+    targetBuffer.clear()
+    compressingStream.flush()
+    result
+  }
+
+  override def finish(): ByteString = {
+    compressingStream.close()
+    targetBuffer.flip()
+    val arr = Array.ofDim[Byte](targetBuffer.limit())
+    targetBuffer.get(arr)
+    val result = ByteString.fromArrayUnsafe(arr)
+    result
+  }
+
+  override def compressAndFlush(input: ByteString): ByteString = {
+    val inputBB = ByteBuffer.allocateDirect(input.size)
+    inputBB.put(input.toArray)
+    inputBB.flip()
+    compressingStream.compress(inputBB)
+    compressingStream.flush()
+    targetBuffer.flip()
+
+    val arr = new Array[Byte](targetBuffer.limit())
+    targetBuffer.get(arr)
+    targetBuffer.clear()
+    ByteString.fromArrayUnsafe(arr)
+  }
+
+  override def compressAndFinish(input: ByteString): ByteString = {
+    val inputBB = ByteBuffer.allocateDirect(input.size)
+    inputBB.put(input.toArray)
+    inputBB.flip()
+    compressingStream.compress(inputBB)
+    compressingStream.close()
+    targetBuffer.flip()
+
+    val arr = new Array[Byte](targetBuffer.limit())
+    targetBuffer.get(arr)
+    ByteString.fromArrayUnsafe(arr)
+  }
+
+  override def close(): Unit = compressingStream.close()
+}
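
The compressor's central trick is the direct-buffer handshake: the zstd stream writes into `targetBuffer`, which must be flipped to read the finished bytes out and cleared before reuse. A standalone sketch of that handshake against zstd-jni (the 65536 capacity mirrors the field above; all other names are illustrative):

```scala
import java.nio.ByteBuffer

import com.github.luben.zstd.{ Zstd, ZstdDirectBufferCompressingStreamNoFinalizer }

object BufferHandshakeSketch extends App {
  val target = ByteBuffer.allocateDirect(65536)
  val stream = new ZstdDirectBufferCompressingStreamNoFinalizer(target, Zstd.defaultCompressionLevel())

  val input = ByteBuffer.allocateDirect(11)
  input.put("hello world".getBytes("UTF-8"))
  input.flip()           // switch input from write mode to read mode

  stream.compress(input) // compressed bytes land in `target`
  stream.close()         // finish the zstd frame

  target.flip()          // now position..limit spans the finished frame
  val frameBytes = new Array[Byte](target.remaining())
  target.get(frameBytes)

  // Round-trip check via the one-shot API (11 = original byte count).
  println(new String(Zstd.decompress(frameBytes, 11), "UTF-8")) // hello world
}
```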
