From f9e37aec5996654c31e598955f4e06f6335d0b70 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 30 Jan 2025 01:21:28 +0100 Subject: [PATCH] init work on RandomNumberGenerator scalafmt some compile issues Update ReplicatedShardingTest.java stub of JEP 356 generator Update RandomNumberGenerator.scala java version check Update RandomNumberGenerator.scala add test Update RandomNumberGenerator.scala add test Update RandomNumberGeneratorJava21Spec.scala merge issue scalafmt --- .../typed/internal/StubbedActorContext.scala | 13 ++- .../typed/javadsl/BehaviorTestKit.scala | 6 +- .../testkit/typed/javadsl/TestInbox.scala | 7 +- .../typed/scaladsl/BehaviorTestKit.scala | 5 +- .../testkit/typed/scaladsl/TestInbox.scala | 5 +- .../RandomNumberGeneratorJava21Spec.scala | 37 +++++++ .../util/RandomNumberGeneratorSpec.scala | 34 ++++++ .../WorkPullingProducerControllerImpl.scala | 5 +- .../actor/typed/internal/Supervision.scala | 6 +- .../typed/internal/routing/RoutingLogic.scala | 5 +- actor/src/main/resources/reference.conf | 9 ++ .../org/apache/pekko/actor/ActorCell.scala | 4 +- .../org/apache/pekko/actor/ActorSystem.scala | 2 +- .../org/apache/pekko/io/dns/IdGenerator.scala | 15 ++- .../apache/pekko/pattern/CircuitBreaker.scala | 5 +- .../apache/pekko/pattern/RetrySupport.scala | 6 +- .../routing/OptimalSizeExploringResizer.scala | 4 +- .../org/apache/pekko/routing/Random.scala | 5 +- .../pekko/routing/SmallestMailbox.scala | 5 +- .../pekko/util/RandomNumberGenerator.scala | 103 ++++++++++++++++++ .../metrics/ClusterMetricsCollector.scala | 5 +- .../metrics/ClusterMetricsRouting.scala | 6 +- .../pubsub/DistributedPubSubMediator.scala | 4 +- .../pekko/cluster/MembershipState.scala | 9 +- .../pekko/cluster/ddata/Replicator.scala | 7 +- .../FailureInjectorTransportAdapter.scala | 5 +- .../stream/impl/fusing/GraphInterpreter.scala | 5 +- 27 files changed, 250 insertions(+), 72 deletions(-) create mode 100644 actor-tests/src/test/scala-jdk21-only/org/apache/pekko/util/RandomNumberGeneratorJava21Spec.scala create mode 100644 actor-tests/src/test/scala/org/apache/pekko/util/RandomNumberGeneratorSpec.scala create mode 100644 actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/StubbedActorContext.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/StubbedActorContext.scala index cc2bce60207..ea19b114cf4 100644 --- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/StubbedActorContext.scala +++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/StubbedActorContext.scala @@ -19,12 +19,11 @@ import pekko.actor.typed._ import pekko.actor.typed.internal._ import pekko.actor.{ ActorPath, ActorRefProvider, InvalidMessageException } import pekko.annotation.InternalApi -import pekko.util.Helpers +import pekko.util.{ Helpers, RandomNumberGenerator } import pekko.{ actor => classic } import org.slf4j.{ Logger, Marker } import org.slf4j.helpers.{ MessageFormatter, SubstituteLoggerFactory } -import java.util.concurrent.ThreadLocalRandom.{ current => rnd } import scala.collection.immutable.TreeMap import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration.FiniteDuration @@ -75,7 +74,7 @@ private[pekko] final class FunctionRef[-T](override val path: ActorPath, send: ( extends ActorContextImpl[T] { def this(system: ActorSystemStub, name: String, currentBehaviorProvider: () => Behavior[T]) = { - this(system, (system.path / name).withUid(rnd().nextInt()), currentBehaviorProvider) + this(system, (system.path / name).withUid(RandomNumberGenerator.get().nextInt()), currentBehaviorProvider) } def this(name: String, currentBehaviorProvider: () => Behavior[T]) = { @@ -111,7 +110,8 @@ private[pekko] final class FunctionRef[-T](override val path: ActorPath, send: ( override def spawnAnonymous[U](behavior: Behavior[U], props: Props = Props.empty): ActorRef[U] = { checkCurrentActorThread() - val btk = new BehaviorTestKitImpl[U](system, (path / childName.next()).withUid(rnd().nextInt()), behavior) + val btk = new BehaviorTestKitImpl[U](system, + (path / childName.next()).withUid(RandomNumberGenerator.get().nextInt()), behavior) _children += btk.context.self.path.name -> btk btk.context.self } @@ -120,7 +120,8 @@ private[pekko] final class FunctionRef[-T](override val path: ActorPath, send: ( _children.get(name) match { case Some(_) => throw classic.InvalidActorNameException(s"actor name $name is already taken") case None => - val btk = new BehaviorTestKitImpl[U](system, (path / name).withUid(rnd().nextInt()), behavior) + val btk = + new BehaviorTestKitImpl[U](system, (path / name).withUid(RandomNumberGenerator.get().nextInt()), behavior) _children += name -> btk btk.context.self } @@ -172,7 +173,7 @@ private[pekko] final class FunctionRef[-T](override val path: ActorPath, send: ( @InternalApi private[pekko] def internalSpawnMessageAdapter[U](f: U => T, name: String): ActorRef[U] = { val n = if (name != "") s"${childName.next()}-$name" else childName.next() - val p = (path / n).withUid(rnd().nextInt()) + val p = (path / n).withUid(RandomNumberGenerator.get().nextInt()) val i = new BehaviorTestKitImpl[U](system, p, BehaviorImpl.ignore) _children += p.name -> i diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/BehaviorTestKit.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/BehaviorTestKit.scala index 385b6549e2f..b875148c967 100644 --- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/BehaviorTestKit.scala +++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/BehaviorTestKit.scala @@ -19,9 +19,9 @@ import pekko.actor.testkit.typed.{ CapturedLogEvent, Effect } import pekko.actor.typed.receptionist.Receptionist import pekko.actor.typed.{ ActorRef, Behavior, Signal } import pekko.annotation.{ ApiMayChange, DoNotInherit } -import com.typesafe.config.Config +import pekko.util.RandomNumberGenerator -import java.util.concurrent.ThreadLocalRandom +import com.typesafe.config.Config object BehaviorTestKit { @@ -37,7 +37,7 @@ object BehaviorTestKit { @ApiMayChange def create[T](initialBehavior: Behavior[T], name: String, config: Config): BehaviorTestKit[T] = { val system = new ActorSystemStub("StubbedActorContext", config) - val uid = ThreadLocalRandom.current().nextInt() + val uid = RandomNumberGenerator.get().nextInt() new BehaviorTestKitImpl(system, (system.path / name).withUid(uid), initialBehavior) } diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/TestInbox.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/TestInbox.scala index 6bd0a30f5af..3b5da4ffc83 100644 --- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/TestInbox.scala +++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/TestInbox.scala @@ -13,8 +13,6 @@ package org.apache.pekko.actor.testkit.typed.javadsl -import java.util.concurrent.ThreadLocalRandom - import scala.collection.immutable import org.apache.pekko @@ -22,16 +20,17 @@ import pekko.actor.testkit.typed.internal.TestInboxImpl import pekko.actor.typed.ActorRef import pekko.annotation.DoNotInherit import pekko.util.ccompat.JavaConverters._ +import pekko.util.RandomNumberGenerator object TestInbox { import pekko.actor.testkit.typed.scaladsl.TestInbox.address def create[T](name: String): TestInbox[T] = { - val uid = ThreadLocalRandom.current().nextInt() + val uid = RandomNumberGenerator.get().nextInt() new TestInboxImpl((address / name).withUid(uid)) } def create[T](): TestInbox[T] = { - val uid = ThreadLocalRandom.current().nextInt() + val uid = RandomNumberGenerator.get().nextInt() new TestInboxImpl((address / "inbox").withUid(uid)) } } diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/BehaviorTestKit.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/BehaviorTestKit.scala index a69b05778b6..1a8ffc23ac5 100644 --- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/BehaviorTestKit.scala +++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/BehaviorTestKit.scala @@ -19,9 +19,10 @@ import pekko.actor.testkit.typed.{ CapturedLogEvent, Effect } import pekko.actor.typed.receptionist.Receptionist import pekko.actor.typed.{ ActorRef, Behavior, Signal, TypedActorContext } import pekko.annotation.{ ApiMayChange, DoNotInherit } +import pekko.util.RandomNumberGenerator + import com.typesafe.config.Config -import java.util.concurrent.ThreadLocalRandom import scala.collection.immutable import scala.reflect.ClassTag @@ -32,7 +33,7 @@ object BehaviorTestKit { def apply[T](initialBehavior: Behavior[T], name: String, config: Config): BehaviorTestKit[T] = { val system = new ActorSystemStub("StubbedActorContext", config) - val uid = ThreadLocalRandom.current().nextInt() + val uid = RandomNumberGenerator.get().nextInt() new BehaviorTestKitImpl(system, (system.path / name).withUid(uid), initialBehavior) } def apply[T](initialBehavior: Behavior[T], name: String): BehaviorTestKit[T] = { diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/TestInbox.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/TestInbox.scala index 9f66b97a43a..9e0a767c557 100644 --- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/TestInbox.scala +++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/TestInbox.scala @@ -13,8 +13,6 @@ package org.apache.pekko.actor.testkit.typed.scaladsl -import java.util.concurrent.ThreadLocalRandom - import scala.collection.immutable import org.apache.pekko @@ -22,11 +20,12 @@ import pekko.actor.{ Address, RootActorPath } import pekko.actor.testkit.typed.internal.TestInboxImpl import pekko.actor.typed.ActorRef import pekko.annotation.{ ApiMayChange, DoNotInherit } +import pekko.util.RandomNumberGenerator @ApiMayChange object TestInbox { def apply[T](name: String = "inbox"): TestInbox[T] = { - val uid = ThreadLocalRandom.current().nextInt() + val uid = RandomNumberGenerator.get().nextInt() new TestInboxImpl((address / name).withUid(uid)) } diff --git a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/util/RandomNumberGeneratorJava21Spec.scala b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/util/RandomNumberGeneratorJava21Spec.scala new file mode 100644 index 00000000000..7c952054425 --- /dev/null +++ b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/util/RandomNumberGeneratorJava21Spec.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.util + +import com.typesafe.config.ConfigFactory +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class RandomNumberGeneratorJava21Spec extends AnyWordSpec with Matchers { + + "RandomNumberGenerator (Java 21+)" should { + + "support config" in { + val config = ConfigFactory.parseString( + """pekko.random.generator-implementation = "Xoroshiro128PlusPlus"""") + val rng = RandomNumberGenerator.createGenerator(config) + rng shouldBe a[Jep356RandomNumberGenerator] + rng.nextInt(10) should (be >= 0 and be < 10) + } + + } +} diff --git a/actor-tests/src/test/scala/org/apache/pekko/util/RandomNumberGeneratorSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/util/RandomNumberGeneratorSpec.scala new file mode 100644 index 00000000000..5202ea10123 --- /dev/null +++ b/actor-tests/src/test/scala/org/apache/pekko/util/RandomNumberGeneratorSpec.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.util + +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class RandomNumberGeneratorSpec extends AnyWordSpec with Matchers { + + "RandomNumberGenerator" should { + + "default to ThreadLocalRandom" in { + val rng = RandomNumberGenerator.get() + rng shouldEqual ThreadLocalRandomNumberGenerator + rng.nextInt(10) should (be >= 0 and be < 10) + } + + } +} diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala index aa890a91acd..8df923f575a 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala @@ -14,7 +14,6 @@ package org.apache.pekko.actor.typed.delivery.internal import java.util.UUID -import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.TimeoutException import scala.reflect.ClassTag @@ -39,7 +38,7 @@ import pekko.actor.typed.scaladsl.Behaviors import pekko.actor.typed.scaladsl.LoggerOps import pekko.actor.typed.scaladsl.StashBuffer import pekko.annotation.InternalApi -import pekko.util.Timeout +import pekko.util.{ RandomNumberGenerator, Timeout } /** * INTERNAL API @@ -404,7 +403,7 @@ private class WorkPullingProducerControllerImpl[A: ClassTag]( if (workers.isEmpty) { None } else { - val i = ThreadLocalRandom.current().nextInt(workers.size) + val i = RandomNumberGenerator.get().nextInt(workers.size) Some(workers(i)) } } diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/Supervision.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/Supervision.scala index bbc4366cb26..ce7803cf12e 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/Supervision.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/Supervision.scala @@ -14,8 +14,6 @@ package org.apache.pekko.actor.typed package internal -import java.util.concurrent.ThreadLocalRandom - import scala.concurrent.duration.Deadline import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag @@ -33,7 +31,7 @@ import pekko.actor.typed.scaladsl.Behaviors import pekko.actor.typed.scaladsl.StashBuffer import pekko.annotation.InternalApi import pekko.event.Logging -import pekko.util.OptionVal +import pekko.util.{ OptionVal, RandomNumberGenerator } import pekko.util.unused import scala.util.Try @@ -187,7 +185,7 @@ private object RestartSupervisor { minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double): FiniteDuration = { - val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor + val rnd = 1.0 + RandomNumberGenerator.get().nextDouble() * randomFactor val calculatedDuration = Try(maxBackoff.min(minBackoff * math.pow(2, restartCount)) * rnd).getOrElse(maxBackoff) calculatedDuration match { case f: FiniteDuration => f diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/routing/RoutingLogic.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/routing/RoutingLogic.scala index e06b1e598ec..397f51bb8df 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/routing/RoutingLogic.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/routing/RoutingLogic.scala @@ -13,13 +13,12 @@ package org.apache.pekko.actor.typed.internal.routing -import java.util.concurrent.ThreadLocalRandom - import org.apache.pekko import pekko.actor.Address import pekko.actor.typed.ActorRef import pekko.annotation.InternalApi import pekko.routing.ConsistentHash +import pekko.util.RandomNumberGenerator /** * Kept in the behavior, not shared between instances, meant to be stateful. @@ -89,7 +88,7 @@ private[pekko] object RoutingLogics { private var currentRoutees: Array[ActorRef[T]] = _ override def selectRoutee(msg: T): ActorRef[T] = { - val selectedIdx = ThreadLocalRandom.current().nextInt(currentRoutees.length) + val selectedIdx = RandomNumberGenerator.get().nextInt(currentRoutees.length) currentRoutees(selectedIdx) } diff --git a/actor/src/main/resources/reference.conf b/actor/src/main/resources/reference.conf index 0cb04fabcc8..72064c865f7 100644 --- a/actor/src/main/resources/reference.conf +++ b/actor/src/main/resources/reference.conf @@ -1389,4 +1389,13 @@ pekko { } #//#circuit-breaker-default + random { + # The default random number generator used by the Pekko library. + # This setting does not affect SecureRandom use cases. + # This option is ignored if you are not using Java 17+. The default is "ThreadLocalRandom". + # Valid options are listed in + # https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/random/package-summary.html#algorithms + generator-implementation = "ThreadLocalRandom" + } + } diff --git a/actor/src/main/scala/org/apache/pekko/actor/ActorCell.scala b/actor/src/main/scala/org/apache/pekko/actor/ActorCell.scala index 724b09c018a..91cd7dc241c 100644 --- a/actor/src/main/scala/org/apache/pekko/actor/ActorCell.scala +++ b/actor/src/main/scala/org/apache/pekko/actor/ActorCell.scala @@ -14,7 +14,6 @@ package org.apache.pekko.actor import java.io.{ NotSerializableException, ObjectOutputStream } -import java.util.concurrent.ThreadLocalRandom import scala.annotation.{ switch, tailrec } import scala.annotation.nowarn @@ -31,6 +30,7 @@ import pekko.dispatch.sysmsg._ import pekko.event.Logging.{ Debug, Error, LogEvent } import pekko.japi.function.Procedure import pekko.util.unused +import pekko.util.RandomNumberGenerator /** * The actor context - the view of the actor cell from the actor. @@ -392,7 +392,7 @@ private[pekko] object ActorCell { @tailrec final def newUid(): Int = { // Note that this uid is also used as hashCode in ActorRef, so be careful // to not break hashing if you change the way uid is generated - val uid = ThreadLocalRandom.current.nextInt() + val uid = RandomNumberGenerator.get().nextInt() if (uid == undefinedUid) newUid() else uid } diff --git a/actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala b/actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala index 7a3d68e9a52..c0e22d6a81a 100644 --- a/actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala +++ b/actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala @@ -812,7 +812,7 @@ private[pekko] class ActorSystemImpl( setup: ActorSystemSetup) extends ExtendedActorSystem { - val uid: Long = ThreadLocalRandom.current.nextLong() + val uid: Long = RandomNumberGenerator.get().nextLong() if (!name.matches("""^[a-zA-Z0-9][a-zA-Z0-9-_]*$""")) throw new IllegalArgumentException( diff --git a/actor/src/main/scala/org/apache/pekko/io/dns/IdGenerator.scala b/actor/src/main/scala/org/apache/pekko/io/dns/IdGenerator.scala index 68856aa40b8..dfa9f64f129 100644 --- a/actor/src/main/scala/org/apache/pekko/io/dns/IdGenerator.scala +++ b/actor/src/main/scala/org/apache/pekko/io/dns/IdGenerator.scala @@ -17,11 +17,12 @@ package org.apache.pekko.io.dns -import org.apache.pekko.annotation.InternalApi +import org.apache.pekko +import pekko.annotation.InternalApi +import pekko.util.RandomNumberGenerator import java.security.SecureRandom import java.util.Random -import java.util.concurrent.ThreadLocalRandom /** * INTERNAL API @@ -52,12 +53,12 @@ private[pekko] object IdGenerator { } def apply(policy: Policy): IdGenerator = policy match { - case Policy.ThreadLocalRandom => random(ThreadLocalRandom.current()) + case Policy.ThreadLocalRandom => random(RandomNumberGenerator.get()) case Policy.SecureRandom => random(new SecureRandom()) case Policy.EnhancedDoubleHashRandom => new EnhancedDoubleHashGenerator(new SecureRandom()) } - def apply(): IdGenerator = random(ThreadLocalRandom.current()) + def apply(): IdGenerator = random(RandomNumberGenerator.get()) /** * @return a random sequence of ids for production @@ -65,6 +66,12 @@ private[pekko] object IdGenerator { def random(rand: java.util.Random): IdGenerator = () => (rand.nextInt(UnsignedShortBound) + Short.MinValue).toShort + /** + * @return a random sequence of ids for production + */ + private def random(rand: RandomNumberGenerator): IdGenerator = + () => (rand.nextInt(UnsignedShortBound) + Short.MinValue).toShort + private[pekko] class EnhancedDoubleHashGenerator(seed: Random) extends IdGenerator { /** diff --git a/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala b/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala index f9e73f7b7a0..d49702237c6 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala @@ -14,7 +14,7 @@ package org.apache.pekko.pattern import java.util.Optional -import java.util.concurrent.{ Callable, CompletionException, CompletionStage, CopyOnWriteArrayList, ThreadLocalRandom } +import java.util.concurrent.{ Callable, CompletionException, CompletionStage, CopyOnWriteArrayList } import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicLong } import java.util.function.BiFunction import java.util.function.Consumer @@ -33,6 +33,7 @@ import pekko.pattern.internal.{ CircuitBreakerNoopTelemetry, CircuitBreakerTelem import pekko.annotation.InternalApi import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ +import pekko.util.RandomNumberGenerator /** * Companion object providing factory methods for Circuit Breaker which runs callbacks in caller's thread @@ -1064,7 +1065,7 @@ class CircuitBreaker( scheduler.scheduleOnce(currentResetTimeout) { attemptReset() } - val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor + val rnd = 1.0 + RandomNumberGenerator.get().nextDouble() * randomFactor val nextResetTimeout = currentResetTimeout * exponentialBackoffFactor * rnd match { case f: FiniteDuration => f case _ => currentResetTimeout diff --git a/actor/src/main/scala/org/apache/pekko/pattern/RetrySupport.scala b/actor/src/main/scala/org/apache/pekko/pattern/RetrySupport.scala index 5582d360cda..5f3707d23db 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/RetrySupport.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/RetrySupport.scala @@ -20,9 +20,7 @@ import scala.util.control.NonFatal import org.apache.pekko import pekko.actor.Scheduler -import pekko.util.ConstantFun - -import java.util.concurrent.ThreadLocalRandom +import pekko.util.{ ConstantFun, RandomNumberGenerator } /** * This trait provides the retry utility function @@ -406,7 +404,7 @@ object RetrySupport extends RetrySupport { minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double): FiniteDuration = { - val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor + val rnd = 1.0 + RandomNumberGenerator.get().nextDouble() * randomFactor val calculatedDuration = Try(maxBackoff.min(minBackoff * math.pow(2, restartCount)) * rnd).getOrElse(maxBackoff) calculatedDuration match { case f: FiniteDuration => f diff --git a/actor/src/main/scala/org/apache/pekko/routing/OptimalSizeExploringResizer.scala b/actor/src/main/scala/org/apache/pekko/routing/OptimalSizeExploringResizer.scala index 3967d264e3d..719d309727e 100644 --- a/actor/src/main/scala/org/apache/pekko/routing/OptimalSizeExploringResizer.scala +++ b/actor/src/main/scala/org/apache/pekko/routing/OptimalSizeExploringResizer.scala @@ -14,7 +14,6 @@ package org.apache.pekko.routing import java.time.LocalDateTime -import java.util.concurrent.ThreadLocalRandom import scala.collection.immutable import scala.concurrent.duration._ @@ -26,6 +25,7 @@ import org.apache.pekko import pekko.actor._ import pekko.annotation.InternalApi import pekko.util.JavaDurationConverters._ +import pekko.util.RandomNumberGenerator trait OptimalSizeExploringResizer extends Resizer { @@ -163,7 +163,7 @@ case class DefaultOptimalSizeExploringResizer( @InternalApi private[routing] var stopExploring = false - private def random = ThreadLocalRandom.current() + private def random = RandomNumberGenerator.get() private def checkParamAsProbability(value: Double, paramName: String): Unit = if (value < 0 || value > 1) diff --git a/actor/src/main/scala/org/apache/pekko/routing/Random.scala b/actor/src/main/scala/org/apache/pekko/routing/Random.scala index 9da3b228d01..6ad7adbda39 100644 --- a/actor/src/main/scala/org/apache/pekko/routing/Random.scala +++ b/actor/src/main/scala/org/apache/pekko/routing/Random.scala @@ -13,8 +13,6 @@ package org.apache.pekko.routing -import java.util.concurrent.ThreadLocalRandom - import scala.collection.immutable import scala.annotation.nowarn @@ -25,6 +23,7 @@ import pekko.actor.ActorSystem import pekko.actor.SupervisorStrategy import pekko.dispatch.Dispatchers import pekko.japi.Util.immutableSeq +import pekko.util.RandomNumberGenerator object RandomRoutingLogic { def apply(): RandomRoutingLogic = new RandomRoutingLogic @@ -38,7 +37,7 @@ object RandomRoutingLogic { final class RandomRoutingLogic extends RoutingLogic { override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee = if (routees.isEmpty) NoRoutee - else routees(ThreadLocalRandom.current.nextInt(routees.size)) + else routees(RandomNumberGenerator.get().nextInt(routees.size)) } /** diff --git a/actor/src/main/scala/org/apache/pekko/routing/SmallestMailbox.scala b/actor/src/main/scala/org/apache/pekko/routing/SmallestMailbox.scala index a549d66bd5a..8aa7d6c87c5 100644 --- a/actor/src/main/scala/org/apache/pekko/routing/SmallestMailbox.scala +++ b/actor/src/main/scala/org/apache/pekko/routing/SmallestMailbox.scala @@ -13,8 +13,6 @@ package org.apache.pekko.routing -import java.util.concurrent.ThreadLocalRandom - import scala.annotation.tailrec import scala.collection.immutable @@ -27,6 +25,7 @@ import pekko.actor.ActorRefWithCell import pekko.actor.ActorSystem import pekko.actor.SupervisorStrategy import pekko.dispatch.Dispatchers +import pekko.util.RandomNumberGenerator object SmallestMailboxRoutingLogic { def apply(): SmallestMailboxRoutingLogic = new SmallestMailboxRoutingLogic @@ -71,7 +70,7 @@ class SmallestMailboxRoutingLogic extends RoutingLogic { NoRoutee else if (at >= targets.size) { if (deep) { - if (isTerminated(proposedTarget)) targets(ThreadLocalRandom.current.nextInt(targets.size)) else proposedTarget + if (isTerminated(proposedTarget)) targets(RandomNumberGenerator.get().nextInt(targets.size)) else proposedTarget } else selectNext(targets, proposedTarget, currentScore, 0, deep = true) } else { val target = targets(at) diff --git a/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala b/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala new file mode 100644 index 00000000000..0c9893fb10e --- /dev/null +++ b/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.util + +import org.apache.pekko +import pekko.annotation.InternalApi + +import com.typesafe.config.{ Config, ConfigFactory } + +import java.lang.invoke.{ MethodHandles, MethodType } +import java.util.concurrent.ThreadLocalRandom + +/** + * INTERNAL API + */ +@InternalApi +private[pekko] trait RandomNumberGenerator { + def nextInt(): Int + def nextInt(n: Int): Int + def nextInt(origin: Int, n: Int): Int + def nextLong(): Long + def nextDouble(): Double +} + +/** + * INTERNAL API + */ +@InternalApi +private[pekko] object ThreadLocalRandomNumberGenerator extends RandomNumberGenerator { + override def nextInt(): Int = ThreadLocalRandom.current().nextInt() + override def nextInt(bound: Int): Int = ThreadLocalRandom.current().nextInt(bound) + override def nextInt(origin: Int, bound: Int): Int = ThreadLocalRandom.current().nextInt(origin, bound) + override def nextLong(): Long = ThreadLocalRandom.current().nextLong() + override def nextDouble(): Double = ThreadLocalRandom.current().nextDouble() +} + +/** + * INTERNAL API + */ +@InternalApi +private[pekko] final class Jep356RandomNumberGenerator(impl: String) extends RandomNumberGenerator { + + // https://openjdk.org/jeps/356 + + private val rngClass = Class.forName("java.util.random.RandomGenerator") + private val lookup = MethodHandles.publicLookup() + private val createHandle = lookup.findStatic(rngClass, "of", MethodType.methodType(rngClass, classOf[String])) + private val intHandle = lookup.findVirtual(rngClass, "nextInt", MethodType.methodType(classOf[Int])) + private val intBoundHandle = + lookup.findVirtual(rngClass, "nextInt", MethodType.methodType(classOf[Int], classOf[Int])) + private val longHandle = lookup.findVirtual(rngClass, "nextLong", MethodType.methodType(classOf[Long])) + private val doubleHandle = lookup.findVirtual(rngClass, "nextDouble", MethodType.methodType(classOf[Double])) + private val rng = createHandle.invoke(impl) + + override def nextInt(): Int = intHandle.invoke(rng) + override def nextInt(bound: Int): Int = intBoundHandle.invoke(rng, bound) + override def nextInt(origin: Int, bound: Int): Int = { + if (origin >= bound) + throw new IllegalArgumentException("origin must be less than bound") + nextInt(bound - origin) + origin + } + override def nextLong(): Long = longHandle.invoke(rng) + override def nextDouble(): Double = doubleHandle.invoke(rng) +} + +/** + * INTERNAL API + */ +@InternalApi +private[pekko] object RandomNumberGenerator { + + /** + * INTERNAL API. Open for testing. + */ + def createGenerator(cfg: Config) = + if (JavaVersion.majorVersion >= 17) { + cfg.getString("pekko.random.generator-implementation") match { + case "ThreadLocalRandom" => ThreadLocalRandomNumberGenerator + case impl => new Jep356RandomNumberGenerator(impl) + } + } else { + ThreadLocalRandomNumberGenerator + } + + private val generator = createGenerator(ConfigFactory.load()) + + def get(): RandomNumberGenerator = generator +} diff --git a/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsCollector.scala b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsCollector.scala index 57f5bcc6a09..fa2f9f8bf03 100644 --- a/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsCollector.scala +++ b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsCollector.scala @@ -13,8 +13,6 @@ package org.apache.pekko.cluster.metrics -import java.util.concurrent.ThreadLocalRandom - import scala.collection.immutable import scala.annotation.nowarn @@ -29,6 +27,7 @@ import pekko.cluster.Cluster import pekko.cluster.ClusterEvent import pekko.cluster.Member import pekko.cluster.MemberStatus +import pekko.util.RandomNumberGenerator /** * Runtime collection management commands. @@ -280,7 +279,7 @@ private[metrics] class ClusterMetricsCollector extends Actor with ActorLogging { context.actorSelection(self.path.toStringWithAddress(address)) ! envelope def selectRandomNode(addresses: immutable.IndexedSeq[Address]): Option[Address] = - if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current.nextInt(addresses.size))) + if (addresses.isEmpty) None else Some(addresses(RandomNumberGenerator.get().nextInt(addresses.size))) /** * Publishes to the event stream. diff --git a/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsRouting.scala b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsRouting.scala index 2d6712cae47..25396b7ecc9 100644 --- a/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsRouting.scala +++ b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsRouting.scala @@ -14,7 +14,6 @@ package org.apache.pekko.cluster.metrics import java.util.Arrays -import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec @@ -36,6 +35,7 @@ import pekko.cluster.routing.ClusterRouterSettingsBase import pekko.dispatch.Dispatchers import pekko.japi.Util.immutableSeq import pekko.routing._ +import pekko.util.RandomNumberGenerator /** * Load balancing of messages to cluster nodes based on cluster metric data. @@ -93,9 +93,9 @@ final case class AdaptiveLoadBalancingRoutingLogic( updateWeightedRoutees() match { case Some(weighted) => if (weighted.isEmpty) NoRoutee - else weighted(ThreadLocalRandom.current.nextInt(weighted.total) + 1) + else weighted(RandomNumberGenerator.get().nextInt(weighted.total) + 1) case None => - routees(ThreadLocalRandom.current.nextInt(routees.size)) + routees(RandomNumberGenerator.get().nextInt(routees.size)) } } diff --git a/cluster-tools/src/main/scala/org/apache/pekko/cluster/pubsub/DistributedPubSubMediator.scala b/cluster-tools/src/main/scala/org/apache/pekko/cluster/pubsub/DistributedPubSubMediator.scala index 608ba412586..996fac382a2 100644 --- a/cluster-tools/src/main/scala/org/apache/pekko/cluster/pubsub/DistributedPubSubMediator.scala +++ b/cluster-tools/src/main/scala/org/apache/pekko/cluster/pubsub/DistributedPubSubMediator.scala @@ -15,7 +15,6 @@ package org.apache.pekko.cluster.pubsub import java.net.URLDecoder import java.net.URLEncoder -import java.util.concurrent.ThreadLocalRandom import scala.collection.immutable import scala.collection.immutable.Set @@ -40,6 +39,7 @@ import pekko.routing.Routee import pekko.routing.Router import pekko.routing.RouterEnvelope import pekko.routing.RoutingLogic +import pekko.util.RandomNumberGenerator object DistributedPubSubSettings { @@ -896,7 +896,7 @@ class DistributedPubSubMediator(settings: DistributedPubSubSettings) } def selectRandomNode(addresses: immutable.IndexedSeq[Address]): Option[Address] = - if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current.nextInt(addresses.size))) + if (addresses.isEmpty) None else Some(addresses(RandomNumberGenerator.get().nextInt(addresses.size))) def prune(): Unit = { registry.foreach { diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/MembershipState.scala b/cluster/src/main/scala/org/apache/pekko/cluster/MembershipState.scala index 72624215c46..4a8111b481c 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/MembershipState.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/MembershipState.scala @@ -13,8 +13,6 @@ package org.apache.pekko.cluster -import java.util.concurrent.ThreadLocalRandom - import scala.annotation.tailrec import scala.collection.SortedSet import scala.collection.immutable @@ -25,6 +23,7 @@ import pekko.annotation.InternalApi import pekko.cluster.ClusterSettings.DataCenter import pekko.cluster.MemberStatus._ import pekko.util.ccompat._ +import pekko.util.RandomNumberGenerator /** * INTERNAL API @@ -405,16 +404,16 @@ import pekko.util.ccompat._ // don't go below the configured probability math.max((5 - localMembers) * 0.25, crossDcGossipProbability) } - ThreadLocalRandom.current.nextDouble() > probability + RandomNumberGenerator.get().nextDouble() > probability } protected def preferNodesWithDifferentView(state: MembershipState): Boolean = - ThreadLocalRandom.current.nextDouble() < adjustedGossipDifferentViewProbability(state.latestGossip.members.size) + RandomNumberGenerator.get().nextDouble() < adjustedGossipDifferentViewProbability(state.latestGossip.members.size) protected def dcsInRandomOrder(dcs: List[DataCenter]): List[DataCenter] = Random.shuffle(dcs) protected def selectRandomNode(nodes: IndexedSeq[UniqueAddress]): Option[UniqueAddress] = if (nodes.isEmpty) None - else Some(nodes(ThreadLocalRandom.current.nextInt(nodes.size))) + else Some(nodes(RandomNumberGenerator.get().nextInt(nodes.size))) } diff --git a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala index 39749f1687f..d6df47dd757 100644 --- a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala +++ b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala @@ -15,7 +15,6 @@ package org.apache.pekko.cluster.ddata import java.security.MessageDigest import java.util.Optional -import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.TimeUnit import java.util.function.{ Function => JFunction } @@ -66,7 +65,7 @@ import pekko.dispatch.Dispatchers import pekko.event.Logging import pekko.remote.RARP import pekko.serialization.SerializationExtension -import pekko.util.ByteString +import pekko.util.{ ByteString, RandomNumberGenerator } import pekko.util.Helpers.toRootLowerCase import pekko.util.JavaDurationConverters._ import pekko.util.ccompat._ @@ -1975,7 +1974,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog if (totChunks == statusTotChunks) statusCount += 1 else { - statusCount = ThreadLocalRandom.current.nextInt(0, totChunks) + statusCount = RandomNumberGenerator.get().nextInt(0, totChunks) statusTotChunks = totChunks } val chunk = (statusCount % totChunks).toInt @@ -1988,7 +1987,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } def selectRandomNode(addresses: immutable.IndexedSeq[UniqueAddress]): Option[UniqueAddress] = - if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current.nextInt(addresses.size))) + if (addresses.isEmpty) None else Some(addresses(RandomNumberGenerator.get().nextInt(addresses.size))) def replica(node: UniqueAddress): ActorSelection = context.actorSelection(self.path.toStringWithAddress(node.address)) diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/FailureInjectorTransportAdapter.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/FailureInjectorTransportAdapter.scala index a17da99b72d..2030d055420 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/FailureInjectorTransportAdapter.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/FailureInjectorTransportAdapter.scala @@ -14,7 +14,6 @@ package org.apache.pekko.remote.transport import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.ThreadLocalRandom import scala.concurrent.{ Future, Promise } import scala.util.control.NoStackTrace @@ -28,7 +27,7 @@ import pekko.actor.{ Address, ExtendedActorSystem } import pekko.event.{ Logging, LoggingAdapter } import pekko.remote.transport.AssociationHandle.{ HandleEvent, HandleEventListener } import pekko.remote.transport.Transport._ -import pekko.util.ByteString +import pekko.util.{ ByteString, RandomNumberGenerator } @SerialVersionUID(1L) @deprecated("Classic remoting is deprecated, use Artery", "Akka 2.6.0") @@ -75,7 +74,7 @@ private[remote] class FailureInjectorTransportAdapter( extends AbstractTransportAdapter(wrappedTransport)(extendedSystem.dispatchers.internalDispatcher) with AssociationEventListener { - private def rng = ThreadLocalRandom.current() + private def rng = RandomNumberGenerator.get() private val log = Logging(extendedSystem, classOf[FailureInjectorTransportAdapter]) private val shouldDebugLog: Boolean = extendedSystem.settings.config.getBoolean("pekko.remote.classic.gremlin.debug") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala index 43a05d946b6..a966a414e2e 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala @@ -13,8 +13,6 @@ package org.apache.pekko.stream.impl.fusing -import java.util.concurrent.ThreadLocalRandom - import scala.concurrent.Promise import scala.util.control.NonFatal @@ -27,6 +25,7 @@ import pekko.stream._ import pekko.stream.Attributes.LogLevels import pekko.stream.snapshot._ import pekko.stream.stage._ +import pekko.util.RandomNumberGenerator /** * INTERNAL API @@ -576,7 +575,7 @@ import pekko.stream.stage._ private def dequeue(): Connection = { val idx = queueHead & mask if (fuzzingMode) { - val swapWith = (ThreadLocalRandom.current.nextInt(queueTail - queueHead) + queueHead) & mask + val swapWith = (RandomNumberGenerator.get().nextInt(queueTail - queueHead) + queueHead) & mask val ev = eventQueue(swapWith) eventQueue(swapWith) = eventQueue(idx) eventQueue(idx) = ev