Skip to content

Commit f11a59a

Browse files
committed
add jspecify nullable annotations on streams Java DSL
1 parent 0428719 commit f11a59a

File tree

9 files changed

+80
-40
lines changed

9 files changed

+80
-40
lines changed

project/Dependencies.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ object Dependencies {
108108

109109
val logback = "ch.qos.logback" % "logback-classic" % logbackVersion
110110

111+
val jspecify = "org.jspecify" % "jspecify" % "1.0.0" % Optional
112+
111113
object Docs {
112114
val sprayJson = "io.spray" %% "spray-json" % "1.3.6" % Test
113115
val gson = "com.google.code.gson" % "gson" % "2.13.2" % Test
@@ -352,7 +354,7 @@ object Dependencies {
352354

353355
// pekko stream
354356

355-
lazy val stream = l ++= Seq[sbt.ModuleID](reactiveStreams, TestDependencies.scalatest)
357+
lazy val stream = l ++= Seq[sbt.ModuleID](reactiveStreams, jspecify, TestDependencies.scalatest)
356358

357359
lazy val streamTestkit = l ++= Seq(
358360
TestDependencies.scalatest,
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
/**
19+
* Java API for Pekko Streams.
20+
*
21+
* This package contains the Java DSL for Pekko Streams. For the Scala DSL see
22+
* [[org.apache.pekko.stream.scaladsl]].
23+
*/
24+
@org.jspecify.annotations.NullMarked
25+
package org.apache.pekko.stream.javadsl;

stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
4343
import pekko.util.ConstantFun
4444
import pekko.util.Timeout
4545

46+
import org.jspecify.annotations.Nullable
4647
import org.reactivestreams.Processor
4748

4849
object Flow {
@@ -3492,7 +3493,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
34923493
* '''Cancels when''' downstream cancels
34933494
*/
34943495
def interleaveAll(
3495-
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
3496+
@Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
34963497
segmentSize: Int,
34973498
eagerClose: Boolean): javadsl.Flow[In, Out, Mat] = {
34983499
import pekko.util.Collections._
@@ -3576,7 +3577,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
35763577
* '''Cancels when''' downstream cancels
35773578
*/
35783579
def mergeAll(
3579-
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
3580+
@Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
35803581
eagerComplete: Boolean): javadsl.Flow[In, Out, Mat] = {
35813582
import pekko.util.Collections._
35823583
val seq = if (those ne null) those.collectToImmutableSeq {
@@ -4318,7 +4319,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
43184319
*
43194320
* '''Cancels when''' downstream cancels
43204321
*/
4321-
def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): javadsl.Flow[In, Out, Mat] =
4322+
def log(name: String, extract: function.Function[Out, Any], @Nullable log: LoggingAdapter)
4323+
: javadsl.Flow[In, Out, Mat] =
43224324
new Flow(delegate.log(name, e => extract.apply(e))(log))
43234325

43244326
/**
@@ -4359,7 +4361,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
43594361
*
43604362
* '''Cancels when''' downstream cancels
43614363
*/
4362-
def log(name: String, log: LoggingAdapter): javadsl.Flow[In, Out, Mat] =
4364+
def log(name: String, @Nullable log: LoggingAdapter): javadsl.Flow[In, Out, Mat] =
43634365
this.log(name, ConstantFun.javaIdentityFunction[Out], log)
43644366

43654367
/**
@@ -4406,7 +4408,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
44064408
name: String,
44074409
marker: function.Function[Out, LogMarker],
44084410
extract: function.Function[Out, Any],
4409-
log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] =
4411+
@Nullable log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] =
44104412
new Flow(delegate.logWithMarker(name, e => marker.apply(e), e => extract.apply(e))(log))
44114413

44124414
/**
@@ -4453,7 +4455,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
44534455
def logWithMarker(
44544456
name: String,
44554457
marker: function.Function[Out, LogMarker],
4456-
log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] =
4458+
@Nullable log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] =
44574459
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], log)
44584460

44594461
/**

stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import pekko.japi.{ function, Pair }
2929
import pekko.stream._
3030
import pekko.util.ConstantFun
3131

32+
import org.jspecify.annotations.Nullable
33+
3234
object FlowWithContext {
3335

3436
def create[In, Ctx](): FlowWithContext[In, Ctx, In, Ctx, pekko.NotUsed] =
@@ -304,7 +306,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
304306
def log(
305307
name: String,
306308
extract: function.Function[Out, Any],
307-
log: LoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
309+
@Nullable log: LoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
308310
viaScala(_.log(name, e => extract.apply(e))(log))
309311

310312
/**
@@ -320,7 +322,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
320322
*
321323
* @see [[pekko.stream.javadsl.Flow.log]]
322324
*/
323-
def log(name: String, log: LoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
325+
def log(name: String, @Nullable log: LoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
324326
this.log(name, ConstantFun.javaIdentityFunction[Out], log)
325327

326328
/**
@@ -340,7 +342,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
340342
name: String,
341343
marker: function.Function2[Out, CtxOut, LogMarker],
342344
extract: function.Function[Out, Any],
343-
log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
345+
@Nullable log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
344346
viaScala(_.logWithMarker(name, (e, c) => marker.apply(e, c), e => extract.apply(e))(log))
345347

346348
/**
@@ -362,7 +364,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
362364
def logWithMarker(
363365
name: String,
364366
marker: function.Function2[Out, CtxOut, LogMarker],
365-
log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
367+
@Nullable log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
366368
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], log)
367369

368370
/**

stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import pekko.stream.impl.LinearTraversalBuilder
3434
import pekko.stream.scaladsl.SinkToCompletionStage
3535
import pekko.util.ConstantFun.scalaAnyToUnit
3636

37+
import org.jspecify.annotations.Nullable
3738
import org.reactivestreams.{ Publisher, Subscriber }
3839

3940
/** Java API */
@@ -435,7 +436,7 @@ object Sink {
435436
def combine[T, U](
436437
output1: Sink[U, _],
437438
output2: Sink[U, _],
438-
rest: java.util.List[Sink[U, _]],
439+
@Nullable rest: java.util.List[Sink[U, _]],
439440
fanOutStrategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]])
440441
: Sink[T, NotUsed] = {
441442
import scala.jdk.CollectionConverters._
@@ -462,7 +463,7 @@ object Sink {
462463
* @since 1.1.0
463464
*/
464465
def combine[T, U, M](
465-
sinks: java.util.List[_ <: Graph[SinkShape[U], M]],
466+
@Nullable sinks: java.util.List[_ <: Graph[SinkShape[U], M]],
466467
fanOutStrategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]])
467468
: Sink[T, java.util.List[M]] = {
468469
import pekko.util.Collections._

stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import pekko.stream.impl.Stages.DefaultAttributes
4141
import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
4242
import pekko.util._
4343

44+
import org.jspecify.annotations.Nullable
4445
import org.reactivestreams.{ Publisher, Subscriber }
4546

4647
/** Java API */
@@ -547,7 +548,7 @@ object Source {
547548
def combine[T, U](
548549
first: Source[T, _ <: Any],
549550
second: Source[T, _ <: Any],
550-
rest: java.util.List[Source[T, _ <: Any]],
551+
@Nullable rest: java.util.List[Source[T, _ <: Any]],
551552
fanInStrategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]])
552553
: Source[U, NotUsed] = {
553554
import scala.jdk.CollectionConverters._
@@ -573,7 +574,7 @@ object Source {
573574
* @since 1.1.0
574575
*/
575576
def combine[T, U, M](
576-
sources: java.util.List[_ <: Graph[SourceShape[T], M]],
577+
@Nullable sources: java.util.List[_ <: Graph[SourceShape[T], M]],
577578
fanInStrategy: function.Function[java.lang.Integer, Graph[UniformFanInShape[T, U], NotUsed]])
578579
: Source[U, java.util.List[M]] = {
579580
import pekko.util.Collections._
@@ -588,7 +589,7 @@ object Source {
588589
/**
589590
* Combine the elements of multiple streams into a stream of lists.
590591
*/
591-
def zipN[T](sources: java.util.List[Source[T, _ <: Any]]): Source[java.util.List[T], NotUsed] = {
592+
def zipN[T](@Nullable sources: java.util.List[Source[T, _ <: Any]]): Source[java.util.List[T], NotUsed] = {
592593
import scala.jdk.CollectionConverters._
593594
val seq = if (sources ne null) sources.asScala.map(_.asScala).toSeq else immutable.Seq()
594595
new Source(scaladsl.Source.zipN(seq).map(_.asJava))
@@ -599,7 +600,7 @@ object Source {
599600
*/
600601
def zipWithN[T, O](
601602
zipper: function.Function[java.util.List[T], O],
602-
sources: java.util.List[Source[T, _ <: Any]]): Source[O, NotUsed] = {
603+
@Nullable sources: java.util.List[Source[T, _ <: Any]]): Source[O, NotUsed] = {
603604
import scala.jdk.CollectionConverters._
604605
val seq = if (sources ne null) sources.asScala.map(_.asScala).toSeq else immutable.Seq()
605606
new Source(scaladsl.Source.zipWithN[T, O](seq => zipper.apply(seq.asJava))(seq))
@@ -847,7 +848,7 @@ object Source {
847848
* '''Cancels when''' downstream cancels
848849
*/
849850
def mergePrioritizedN[T](
850-
sourcesAndPriorities: java.util.List[Pair[Source[T, _ <: Any], java.lang.Integer]],
851+
@Nullable sourcesAndPriorities: java.util.List[Pair[Source[T, _ <: Any], java.lang.Integer]],
851852
eagerComplete: Boolean): javadsl.Source[T, NotUsed] = {
852853
import scala.jdk.CollectionConverters._
853854
val seq =
@@ -1628,7 +1629,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
16281629
* '''Cancels when''' downstream cancels
16291630
*/
16301631
def interleaveAll(
1631-
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
1632+
@Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
16321633
segmentSize: Int,
16331634
eagerClose: Boolean): javadsl.Source[Out, Mat] = {
16341635
import pekko.util.Collections._
@@ -1710,7 +1711,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
17101711
* '''Cancels when''' downstream cancels
17111712
*/
17121713
def mergeAll(
1713-
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
1714+
@Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
17141715
eagerComplete: Boolean): javadsl.Source[Out, Mat] = {
17151716
import pekko.util.Collections._
17161717
val seq = if (those ne null) those.collectToImmutableSeq {
@@ -4811,7 +4812,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
48114812
*
48124813
* '''Cancels when''' downstream cancels
48134814
*/
4814-
def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): javadsl.Source[Out, Mat] =
4815+
def log(name: String, extract: function.Function[Out, Any], @Nullable log: LoggingAdapter): javadsl.Source[Out, Mat] =
48154816
new Source(delegate.log(name, e => extract.apply(e))(log))
48164817

48174818
/**
@@ -4852,7 +4853,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
48524853
*
48534854
* '''Cancels when''' downstream cancels
48544855
*/
4855-
def log(name: String, log: LoggingAdapter): javadsl.Source[Out, Mat] =
4856+
def log(name: String, @Nullable log: LoggingAdapter): javadsl.Source[Out, Mat] =
48564857
this.log(name, ConstantFun.javaIdentityFunction[Out], log)
48574858

48584859
/**
@@ -4899,7 +4900,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
48994900
name: String,
49004901
marker: function.Function[Out, LogMarker],
49014902
extract: function.Function[Out, Any],
4902-
log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
4903+
@Nullable log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
49034904
new Source(delegate.logWithMarker(name, e => marker.apply(e), e => extract.apply(e))(log))
49044905

49054906
/**
@@ -4946,7 +4947,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
49464947
def logWithMarker(
49474948
name: String,
49484949
marker: function.Function[Out, LogMarker],
4949-
log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
4950+
@Nullable log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
49504951
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], log)
49514952

49524953
/**

stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import pekko.japi.function
3131
import pekko.stream._
3232
import pekko.util.ConstantFun
3333

34+
import org.jspecify.annotations.Nullable
35+
3436
object SourceWithContext {
3537

3638
/**
@@ -290,7 +292,8 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
290292
*
291293
* @see [[pekko.stream.javadsl.Source.log]]
292294
*/
293-
def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
295+
def log(name: String, extract: function.Function[Out, Any], @Nullable log: LoggingAdapter)
296+
: SourceWithContext[Out, Ctx, Mat] =
294297
viaScala(_.log(name, e => extract.apply(e))(log))
295298

296299
/**
@@ -306,7 +309,7 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
306309
*
307310
* @see [[pekko.stream.javadsl.Flow.log]]
308311
*/
309-
def log(name: String, log: LoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
312+
def log(name: String, @Nullable log: LoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
310313
this.log(name, ConstantFun.javaIdentityFunction[Out], log)
311314

312315
/**
@@ -326,7 +329,7 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
326329
name: String,
327330
marker: function.Function2[Out, Ctx, LogMarker],
328331
extract: function.Function[Out, Any],
329-
log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
332+
@Nullable log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
330333
viaScala(_.logWithMarker(name, (e, c) => marker.apply(e, c), e => extract.apply(e))(log))
331334

332335
/**
@@ -348,7 +351,7 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
348351
def logWithMarker(
349352
name: String,
350353
marker: function.Function2[Out, Ctx, LogMarker],
351-
log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
354+
@Nullable log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
352355
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], log)
353356

354357
/**

stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ import pekko.stream.impl.Stages.DefaultAttributes
3535
import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
3636
import pekko.util.ConstantFun
3737

38+
import org.jspecify.annotations.Nullable
39+
3840
object SubFlow {
3941

4042
/**
@@ -2366,7 +2368,7 @@ final class SubFlow[In, Out, Mat](
23662368
* '''Cancels when''' downstream cancels
23672369
*/
23682370
def mergeAll(
2369-
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
2371+
@Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
23702372
eagerComplete: Boolean): SubFlow[In, Out, Mat] = {
23712373
import pekko.util.Collections._
23722374
val seq = if (those ne null) those.collectToImmutableSeq {
@@ -2424,7 +2426,7 @@ final class SubFlow[In, Out, Mat](
24242426
* '''Cancels when''' downstream cancels
24252427
*/
24262428
def interleaveAll(
2427-
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
2429+
@Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
24282430
segmentSize: Int,
24292431
eagerClose: Boolean): SubFlow[In, Out, Mat] = {
24302432
import pekko.util.Collections._
@@ -2916,7 +2918,7 @@ final class SubFlow[In, Out, Mat](
29162918
*
29172919
* '''Cancels when''' downstream cancels
29182920
*/
2919-
def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): SubFlow[In, Out, Mat] =
2921+
def log(name: String, extract: function.Function[Out, Any], @Nullable log: LoggingAdapter): SubFlow[In, Out, Mat] =
29202922
new SubFlow(delegate.log(name, e => extract.apply(e))(log))
29212923

29222924
/**
@@ -2957,7 +2959,7 @@ final class SubFlow[In, Out, Mat](
29572959
*
29582960
* '''Cancels when''' downstream cancels
29592961
*/
2960-
def log(name: String, log: LoggingAdapter): SubFlow[In, Out, Mat] =
2962+
def log(name: String, @Nullable log: LoggingAdapter): SubFlow[In, Out, Mat] =
29612963
this.log(name, ConstantFun.javaIdentityFunction[Out], log)
29622964

29632965
/**
@@ -3004,7 +3006,7 @@ final class SubFlow[In, Out, Mat](
30043006
name: String,
30053007
marker: function.Function[Out, LogMarker],
30063008
extract: function.Function[Out, Any],
3007-
log: MarkerLoggingAdapter): SubFlow[In, Out, Mat] =
3009+
@Nullable log: MarkerLoggingAdapter): SubFlow[In, Out, Mat] =
30083010
new SubFlow(delegate.logWithMarker(name, e => marker.apply(e), e => extract.apply(e))(log))
30093011

30103012
/**
@@ -3051,7 +3053,7 @@ final class SubFlow[In, Out, Mat](
30513053
def logWithMarker(
30523054
name: String,
30533055
marker: function.Function[Out, LogMarker],
3054-
log: MarkerLoggingAdapter): SubFlow[In, Out, Mat] =
3056+
@Nullable log: MarkerLoggingAdapter): SubFlow[In, Out, Mat] =
30553057
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], log)
30563058

30573059
/**

0 commit comments

Comments
 (0)