它与调度程序的“吞吐量”设置有关。我在 application.conf 中添加了一个“公平调度程序”来证明这一点:
fair-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 10
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 1
}
这是您的示例,其中进行了一些修改,以对 Futures 使用公平调度程序并打印吞吐量设置的当前值:
package com.test
import akka.actor.ActorSystem
import scala.concurrent.{ExecutionContext, Future}
object WhyNotParallelExperiment extends App {
val actorSystem = ActorSystem(s"Experimental")
println("Default dispatcher throughput:")
println(actorSystem.dispatchers.defaultDispatcherConfig.getInt("throughput"))
println("Fair dispatcher throughput:")
println(actorSystem.dispatchers.lookup("fair-dispatcher").configurator.config.getInt("throughput"))
// Futures not started in future: running in parallel
startFutures(runInFuture = false)(actorSystem.dispatcher)
Thread.sleep(5000)
// Futures started in future: running in sequentially. Why????
startFutures(runInFuture = true)(actorSystem.dispatcher)
Thread.sleep(5000)
actorSystem.terminate()
private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = {
if (runInFuture) {
Future{
implicit val fairExecutionContext = actorSystem.dispatchers.lookup("fair-dispatcher")
println(s"Start Futures on thread ${Thread.currentThread().getName()}")
(1 to 9).foreach(i => startFuture(i)(fairExecutionContext))
println(s"Started Futures on thread ${Thread.currentThread().getName()}")
}
} else {
(11 to 19).foreach(startFuture)
}
}
private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{
println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
Thread.sleep(500)
println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
}
}
输出:
Default dispatcher throughput:
5
Fair dispatcher throughput:
1
Future 12 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3
Future 11 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4
Future 13 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2
Future 14 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5
Future 16 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7
Future 15 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6
Future 17 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8
Future 18 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9
Future 19 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10
Future 13 finished on thread Experimental-akka.actor.default-dispatcher-2
Future 11 finished on thread Experimental-akka.actor.default-dispatcher-4
Future 12 finished on thread Experimental-akka.actor.default-dispatcher-3
Future 14 finished on thread Experimental-akka.actor.default-dispatcher-5
Future 16 finished on thread Experimental-akka.actor.default-dispatcher-7
Future 15 finished on thread Experimental-akka.actor.default-dispatcher-6
Future 17 finished on thread Experimental-akka.actor.default-dispatcher-8
Future 18 finished on thread Experimental-akka.actor.default-dispatcher-9
Future 19 finished on thread Experimental-akka.actor.default-dispatcher-10
Start Futures on thread Experimental-akka.actor.default-dispatcher-10
Future 1 should run for 500 millis on thread Experimental-fair-dispatcher-12
Future 2 should run for 500 millis on thread Experimental-fair-dispatcher-13
Future 4 should run for 500 millis on thread Experimental-fair-dispatcher-15
Future 3 should run for 500 millis on thread Experimental-fair-dispatcher-14
Future 5 should run for 500 millis on thread Experimental-fair-dispatcher-17
Future 6 should run for 500 millis on thread Experimental-fair-dispatcher-16
Future 7 should run for 500 millis on thread Experimental-fair-dispatcher-18
Future 8 should run for 500 millis on thread Experimental-fair-dispatcher-19
Started Futures on thread Experimental-akka.actor.default-dispatcher-10
Future 4 finished on thread Experimental-fair-dispatcher-15
Future 2 finished on thread Experimental-fair-dispatcher-13
Future 1 finished on thread Experimental-fair-dispatcher-12
Future 9 should run for 500 millis on thread Experimental-fair-dispatcher-15
Future 5 finished on thread Experimental-fair-dispatcher-17
Future 7 finished on thread Experimental-fair-dispatcher-18
Future 8 finished on thread Experimental-fair-dispatcher-19
Future 6 finished on thread Experimental-fair-dispatcher-16
Future 3 finished on thread Experimental-fair-dispatcher-14
Future 9 finished on thread Experimental-fair-dispatcher-15
如您所见,fair-dispatcher 在大多数期货中使用不同的线程。
默认调度程序针对参与者进行了优化,因此吞吐量设置为 5 以最小化上下文切换,从而提高消息处理吞吐量,同时保持一定程度的公平性。
我的公平调度程序的唯一变化是吞吐量:1,即如果可能,每个异步执行请求都有自己的线程(最多并行度最大)。
我建议为用于不同目的的期货创建单独的调度程序。例如,一个调度程序(即线程池)用于调用某些 Web 服务,另一个用于阻止 DB 访问等。通过调整自定义调度程序设置,您可以更精确地控制它。
看看https://doc.akka.io/docs/akka/current/dispatchers.html,它对于理解细节非常有用。
另请查看 Akka 参考设置(特别是默认调度程序),那里有很多有用的评论:https ://github.com/akka/akka/blob/master/akka-actor/src/main/resources /reference.conf