7

当我们尝试从参与者的接收方法中启动多个期货时,我们观察到了一种奇怪的行为。如果我们将配置的调度程序用作 ExecutionContext,则期货在同一个线程上按顺序运行。如果我们使用 ExecutionContext.Implicits.global,则期货按预期并行运行。

我们将代码归结为以下示例(更完整的示例如下):

implicit val ec = context.getDispatcher

Future{ doWork() } // <-- all running parallel
Future{ doWork() }
Future{ doWork() }
Future{ doWork() }

Future {
   Future{ doWork() } 
   Future{ doWork() } // <-- NOT RUNNING PARALLEL!!! WHY!!!
   Future{ doWork() }
   Future{ doWork() }
}

一个可编译的例子是这样的:

import akka.actor.ActorSystem
import scala.concurrent.{ExecutionContext, Future}

object WhyNotParallelExperiment extends App {

  val actorSystem = ActorSystem(s"Experimental")   

  // Futures not started in future: running in parallel
  startFutures(runInFuture = false)(actorSystem.dispatcher)
  Thread.sleep(5000)

  // Futures started in future: running in sequentially. Why????
  startFutures(runInFuture = true)(actorSystem.dispatcher)
  Thread.sleep(5000)

  actorSystem.terminate()

  private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = {
    if (runInFuture) {
      Future{
        println(s"Start Futures on thread ${Thread.currentThread().getName()}")
        (1 to 9).foreach(startFuture)
        println(s"Started Futures on thread ${Thread.currentThread().getName()}")
      }
    } else {
      (11 to 19).foreach(startFuture)
    }
  }

  private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{
    println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
    Thread.sleep(500)
    println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
  }


}

我们尝试了 thread-pool-executor 和 fork-join-executor,结果相同。

我们是否以错误的方式使用期货?那么你应该如何产生并行任务呢?

4

3 回答 3

3

从 Akka 内部的描述BatchingExecutor(重点是我的):

Executor 的 Mixin 特征,它将多个嵌套Runnable.run()调用组合成一个传递给原始 Executor 的 Runnable。这可能是一个有用的优化,因为它绕过原始上下文的任务队列并将相关(嵌套)代码保留在单个线程上,这可能会提高 CPU 亲和力。但是,如果传递给 Executor 的任务是阻塞的或代价高昂的,这种优化可以防止工作窃取并使性能变得更糟......如果代码scala.concurrent.blocking在应该使用的时候不使用,批处理执行器可能会造成死锁,因为在其他任务中创建的任务会阻止外部任务完成。

如果您使用的调度程序混合了BatchingExecutor-- 即 -- 的子类 --您MessageDispatcher可以使用该scala.concurrent.blocking构造来启用嵌套期货的并行性:

Future {
  Future {
    blocking {
      doBlockingWork()
    }
  }
}

在您的示例中,您将添加blocking以下startFuture方法:

private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future {
  blocking {
    println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
    Thread.sleep(500)
    println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
  }
}

startFutures(true)(actorSystem.dispatcher)使用上述更改运行的示例输出:

Start Futures on thread Experimental-akka.actor.default-dispatcher-2
Started Futures on thread Experimental-akka.actor.default-dispatcher-2
Future 1 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2
Future 3 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3
Future 5 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6
Future 7 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7
Future 4 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5
Future 9 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10
Future 6 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8
Future 8 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9
Future 2 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4
Future 1 finished on thread Experimental-akka.actor.default-dispatcher-2
Future 3 finished on thread Experimental-akka.actor.default-dispatcher-3
Future 5 finished on thread Experimental-akka.actor.default-dispatcher-6
Future 4 finished on thread Experimental-akka.actor.default-dispatcher-5
Future 8 finished on thread Experimental-akka.actor.default-dispatcher-9
Future 7 finished on thread Experimental-akka.actor.default-dispatcher-7
Future 9 finished on thread Experimental-akka.actor.default-dispatcher-10
Future 6 finished on thread Experimental-akka.actor.default-dispatcher-8
Future 2 finished on thread Experimental-akka.actor.default-dispatcher-4
于 2018-04-09T13:59:06.337 回答
0

它与调度程序的“吞吐量”设置有关。我在 application.conf 中添加了一个“公平调度程序”来证明这一点:

fair-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "fork-join-executor"
  # Configuration for the fork join pool
  fork-join-executor {
    # Min number of threads to cap factor-based parallelism number to
    parallelism-min = 2
    # Parallelism (threads) ... ceil(available processors * factor)
    parallelism-factor = 2.0
    # Max number of threads to cap factor-based parallelism number to
    parallelism-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 1
}

这是您的示例,其中进行了一些修改,以对 Futures 使用公平调度程序并打印吞吐量设置的当前值:

package com.test

import akka.actor.ActorSystem

import scala.concurrent.{ExecutionContext, Future}

object WhyNotParallelExperiment extends App {

  val actorSystem = ActorSystem(s"Experimental")

  println("Default dispatcher throughput:")
  println(actorSystem.dispatchers.defaultDispatcherConfig.getInt("throughput"))

  println("Fair dispatcher throughput:")
  println(actorSystem.dispatchers.lookup("fair-dispatcher").configurator.config.getInt("throughput"))

  // Futures not started in future: running in parallel
  startFutures(runInFuture = false)(actorSystem.dispatcher)
  Thread.sleep(5000)

  // Futures started in future: running in sequentially. Why????
  startFutures(runInFuture = true)(actorSystem.dispatcher)
  Thread.sleep(5000)

  actorSystem.terminate()

  private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = {
    if (runInFuture) {
      Future{
        implicit val fairExecutionContext = actorSystem.dispatchers.lookup("fair-dispatcher")
        println(s"Start Futures on thread ${Thread.currentThread().getName()}")
        (1 to 9).foreach(i => startFuture(i)(fairExecutionContext))
        println(s"Started Futures on thread ${Thread.currentThread().getName()}")
      }
    } else {
      (11 to 19).foreach(startFuture)
    }
  }

  private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{
    println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
    Thread.sleep(500)
    println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
  }
}

输出:

Default dispatcher throughput:
5
Fair dispatcher throughput:
1
Future 12 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3
Future 11 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4
Future 13 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2
Future 14 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5
Future 16 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7
Future 15 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6
Future 17 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8
Future 18 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9
Future 19 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10
Future 13 finished on thread Experimental-akka.actor.default-dispatcher-2
Future 11 finished on thread Experimental-akka.actor.default-dispatcher-4
Future 12 finished on thread Experimental-akka.actor.default-dispatcher-3
Future 14 finished on thread Experimental-akka.actor.default-dispatcher-5
Future 16 finished on thread Experimental-akka.actor.default-dispatcher-7
Future 15 finished on thread Experimental-akka.actor.default-dispatcher-6
Future 17 finished on thread Experimental-akka.actor.default-dispatcher-8
Future 18 finished on thread Experimental-akka.actor.default-dispatcher-9
Future 19 finished on thread Experimental-akka.actor.default-dispatcher-10
Start Futures on thread Experimental-akka.actor.default-dispatcher-10
Future 1 should run for 500 millis on thread Experimental-fair-dispatcher-12
Future 2 should run for 500 millis on thread Experimental-fair-dispatcher-13
Future 4 should run for 500 millis on thread Experimental-fair-dispatcher-15
Future 3 should run for 500 millis on thread Experimental-fair-dispatcher-14
Future 5 should run for 500 millis on thread Experimental-fair-dispatcher-17
Future 6 should run for 500 millis on thread Experimental-fair-dispatcher-16
Future 7 should run for 500 millis on thread Experimental-fair-dispatcher-18
Future 8 should run for 500 millis on thread Experimental-fair-dispatcher-19
Started Futures on thread Experimental-akka.actor.default-dispatcher-10
Future 4 finished on thread Experimental-fair-dispatcher-15
Future 2 finished on thread Experimental-fair-dispatcher-13
Future 1 finished on thread Experimental-fair-dispatcher-12
Future 9 should run for 500 millis on thread Experimental-fair-dispatcher-15
Future 5 finished on thread Experimental-fair-dispatcher-17
Future 7 finished on thread Experimental-fair-dispatcher-18
Future 8 finished on thread Experimental-fair-dispatcher-19
Future 6 finished on thread Experimental-fair-dispatcher-16
Future 3 finished on thread Experimental-fair-dispatcher-14
Future 9 finished on thread Experimental-fair-dispatcher-15

如您所见,fair-dispatcher 在大多数期货中使用不同的线程。

默认调度程序针对参与者进行了优化,因此吞吐量设置为 5 以最小化上下文切换,从而提高消息处理吞吐量,同时保持一定程度的公平性。

我的公平调度程序的唯一变化是吞吐量:1,即如果可能,每个异步执行请求都有自己的线程(最多并行度最大)。

我建议为用于不同目的的期货创建单独的调度程序。例如,一个调度程序(即线程池)用于调用某些 Web 服务,另一个用于阻止 DB 访问等。通过调整自定义调度程序设置,您可以更精确地控制它。

看看https://doc.akka.io/docs/akka/current/dispatchers.html,它对于理解细节非常有用。

另请查看 Akka 参考设置(特别是默认调度程序),那里有很多有用的评论:https ://github.com/akka/akka/blob/master/akka-actor/src/main/resources /reference.conf

于 2018-04-07T17:38:39.633 回答
0

经过一些研究,我发现Dispatcher该类实现了akka.dispatch.BatchingExecutor. 出于性能原因,此类检查应在同一线程上批处理哪些任务。Future.map在内部创建一个scala.concurrent.OnCompleteRunnableBatchingExecutor.

map()这对于/其中一项任务生成一项后续任务似乎是合理的flatMap(),但对于用于分叉工作的显式新 Futures 则不合理。在内部,Future.apply由实现Future.successful().map并因此被批处理。我现在的解决方法是以不同的方式创建期货:

object MyFuture {
  def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = {
    val promise = Promise[T]()
    class FuturesStarter extends Runnable {
      override def run(): Unit = {
        promise.complete(Try(body))
      }
    }
    executor.execute(new FuturesStarter)
    promise.future
  }
}

-RunnablesFutureStarter不是批处理的,因此是并行运行的。

任何人都可以确认此解决方案可以吗?有没有更好的方法来解决这个问题?是Future/BatchingExecutor想要的当前实现,还是一个错误?

于 2018-04-09T08:23:35.363 回答