0

我已经使用 Akka 有一段时间了,但现在正在深入探索它的演员系统。我知道有线程轮询执行器和分叉连接执行器和 afinity 执行器。我知道调度程序是如何工作的以及所有其他细节。顺便说一句,这个链接给出了很好的解释

https://scalac.io/improving-akka-dispatchers

然而,当我尝试一个简单的调用actor并切换执行上下文时,我总是得到大致相同的性能。我同时运行 60 个请求,平均执行时间约为 800 毫秒,只需将简单的字符串返回给调用者。

我在具有 8 核(英特尔 i7 处理器)的 MAC 上运行。

所以,这里是我尝试的执行上下文:

thread-poll {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 32
  }
  throughput = 10
}

fork-join {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 2
    parallelism-factor = 1
    parallelism-max = 8
  }
  throughput = 100
}

pinned {
  type = Dispatcher
  executor = "affinity-pool-executor"
}

所以,问题是:

  1. 在这个例子中是否有机会获得更好的表现?
  2. 演员实例的全部内容是什么?这有多重要,如果我们知道调度程序正在调度线程(使用执行上下文)以在来自 Actor 邮箱的下一条消息上执行该线程内的 Actor 的接收方法。演员接收方法不只是像回调吗?演员实例的数量何时开始发挥作用?
  3. 我有一些正在执行 Future 的代码,如果我直接从主文件运行该代码,它的执行速度比我将其放入 actor 并从 actor 执行 Future 将其结果通过管道传送给发送者时快 100-150 毫秒。是什么让它变慢了?

如果你有一些现实世界的例子来解释这一点,那就太受欢迎了。我读了一些文章,但都是理论上的。如果我在一个简单的例子上尝试一些东西,我会在性能方面得到一些意想不到的结果。

这是一个代码

object RedisService {
  case class Get(key: String)
  case class GetSC(key: String)
}

class RedisService extends Actor {
  private val host = config.getString("redis.host")
  private val port = config.getInt("redis.port")

  var currentConnection = 0

  val redis = Redis()

  implicit val ec = context.system.dispatchers.lookup("redis.dispatchers.fork-join")

  override def receive: Receive = {
    case GetSC(key) => {
      val sen = sender()

      sen ! ""
    }
  }
}

呼叫者:

    val as = ActorSystem("test")
    implicit val ec = as.dispatchers.lookup("redis.dispatchers.fork-join")

    val service = as.actorOf(Props(new RedisService()), "redis_service")

    var sumTime = 0L
    val futures: Seq[Future[Any]] = (0 until 4).flatMap { index =>
      terminalIds.map { terminalId =>
        val future = getRedisSymbolsAsyncSCActor(terminalId)

        val s = System.currentTimeMillis()
        future.onComplete {
          case Success(r) => {
            val duration = System.currentTimeMillis() - s
            logger.info(s"got redis symbols async in ${duration} ms: ${r}")
            sumTime = sumTime + duration
          }
          case Failure(ex) => logger.error(s"Failure on getting Redis symbols: ${ex.getMessage}", ex)
        }

        future
      }
    }

    val f = Future.sequence(futures)


    f.onComplete {
      case Success(r) => logger.info(s"Mean time: ${sumTime / (4 * terminalIds.size)}")
      case Failure(ex) => logger.error(s"error: ${ex.getMessage}")
    }

该代码非常基本,只是为了测试它的行为方式。

4

1 回答 1

3

我有点不清楚你具体要问什么,但我会试一试。

如果您的调度程序(并且,如果参与者正在做的是 CPU/内存与 IO 绑定,实际可用内核数(请注意,虚拟化越多,这会变得越模糊(谢谢,超额订阅主机 CPU ... )和容器化(感谢基于共享和配额的 cgroup 限制)发挥作用))允许m个参与者同时处理,并且您很少/永远不会有超过n 个参与者处理消息(m > n),试图通过调度程序设置增加并行度不会为您带来任何好处。(请注意,在前面,调度器上调度的任何任务,例如Future回调,实际上与参与者相同)。

上一段中的n显然最多是应用程序/调度程序中的参与者数量(取决于我们想要查看的范围:我会注意到每个调度程序超过两个(一个用于不阻塞的参与者和期货)对于那些这样做的人来说)更强烈的气味(如果在 Akka 2.5 上,围绕默认调度程序设置调整一些 2.6 更改并在他们自己的调度程序中运行诸如远程处理/集群之类的东西,这样他们就不会挨饿了,这可能是一个不错的主意出;另请注意,Alpakka Kafka 默认使用自己的调度程序:我不会将它们与这两者相提并论),因此通常更多的参与者意味着更多的并行性意味着更多的核心利用率。相对于线程而言,参与者相对便宜,因此大量他们不是一个值得关注的大问题。

单例参与者(无论是在节点还是集群(或者甚至在非常极端的情况下,实体)级别)可以做很多事情来限制整体并行性和吞吐量:一次一条消息的限制可能是一个非常有效的限制(有时这就是你想要的,但通常不是)。所以不要害怕创建做一件高级事情的短命演员(他们肯定可以处理不止一条消息)然后停止(请注意,许多简单的情况可以通过稍微更轻量级的方式完成期货)。如果他们正在与一些外部服务交互,让他们成为路由器actor的孩子,如果现有的都忙(等等),它会产生新的孩子:这个路由器是一个单例,但只要它不不要花很多时间处理任何消息,它限制系统的机会很低。您的RedisService可能是这类事情的好人选。

另请注意,性能和可伸缩性并不总是相同的,提高一个会削弱另一个。Akka 通常有点愿意以小的性能来换取大的性能下降。

于 2020-07-22T19:21:42.693 回答