我已经使用 Akka 有一段时间了,但现在正在深入探索它的演员系统。我知道有线程轮询执行器和分叉连接执行器和 afinity 执行器。我知道调度程序是如何工作的以及所有其他细节。顺便说一句,这个链接给出了很好的解释
https://scalac.io/improving-akka-dispatchers
然而,当我尝试一个简单的调用actor并切换执行上下文时,我总是得到大致相同的性能。我同时运行 60 个请求,平均执行时间约为 800 毫秒,只需将简单的字符串返回给调用者。
我在具有 8 核(英特尔 i7 处理器)的 MAC 上运行。
所以,这里是我尝试的执行上下文:
thread-poll {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 32
}
throughput = 10
}
fork-join {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 1
parallelism-max = 8
}
throughput = 100
}
pinned {
type = Dispatcher
executor = "affinity-pool-executor"
}
所以,问题是:
- 在这个例子中是否有机会获得更好的表现?
- 演员实例的全部内容是什么?这有多重要,如果我们知道调度程序正在调度线程(使用执行上下文)以在来自 Actor 邮箱的下一条消息上执行该线程内的 Actor 的接收方法。演员接收方法不只是像回调吗?演员实例的数量何时开始发挥作用?
- 我有一些正在执行 Future 的代码,如果我直接从主文件运行该代码,它的执行速度比我将其放入 actor 并从 actor 执行 Future 将其结果通过管道传送给发送者时快 100-150 毫秒。是什么让它变慢了?
如果你有一些现实世界的例子来解释这一点,那就太受欢迎了。我读了一些文章,但都是理论上的。如果我在一个简单的例子上尝试一些东西,我会在性能方面得到一些意想不到的结果。
这是一个代码
object RedisService {
case class Get(key: String)
case class GetSC(key: String)
}
class RedisService extends Actor {
private val host = config.getString("redis.host")
private val port = config.getInt("redis.port")
var currentConnection = 0
val redis = Redis()
implicit val ec = context.system.dispatchers.lookup("redis.dispatchers.fork-join")
override def receive: Receive = {
case GetSC(key) => {
val sen = sender()
sen ! ""
}
}
}
呼叫者:
val as = ActorSystem("test")
implicit val ec = as.dispatchers.lookup("redis.dispatchers.fork-join")
val service = as.actorOf(Props(new RedisService()), "redis_service")
var sumTime = 0L
val futures: Seq[Future[Any]] = (0 until 4).flatMap { index =>
terminalIds.map { terminalId =>
val future = getRedisSymbolsAsyncSCActor(terminalId)
val s = System.currentTimeMillis()
future.onComplete {
case Success(r) => {
val duration = System.currentTimeMillis() - s
logger.info(s"got redis symbols async in ${duration} ms: ${r}")
sumTime = sumTime + duration
}
case Failure(ex) => logger.error(s"Failure on getting Redis symbols: ${ex.getMessage}", ex)
}
future
}
}
val f = Future.sequence(futures)
f.onComplete {
case Success(r) => logger.info(s"Mean time: ${sumTime / (4 * terminalIds.size)}")
case Failure(ex) => logger.error(s"error: ${ex.getMessage}")
}
该代码非常基本,只是为了测试它的行为方式。