我对LMAX Disruptor有一些经验,我真的很想使用 Disruptor 实现一个自定义演员邮箱。
有什么指导方针吗?甚至可能吗?Akka的actor邮箱有什么限制?
我对LMAX Disruptor有一些经验,我真的很想使用 Disruptor 实现一个自定义演员邮箱。
有什么指导方针吗?甚至可能吗?Akka的actor邮箱有什么限制?
正如这里所说,您只需要实现一些方法 - 当然您应该使用指向环形缓冲区的指针直接写入/读取消息。您还应该记住:
破坏者通常会预先分配大量内存,因此每个参与者使用一个破坏者是个坏主意,您可以将一个路由器参与者(内部带有破坏者)与BalancingPool
.
如果你想有不同的消息类型消费,不同的消费者进行日志记录,修复等 - 你应该将不同的 RingBufferPointer(类似 smthng)实例作为参数传递给你的邮箱(日志的起始值相同,不同的起始值不同消息类型),但仍使用一个 Disruptor。因此,不同的邮箱将引用一个破坏者。
您将失去对消息创建、提取等的低级控制,因此默认情况下不会进行批量分配。
您还可以使用 ring 中的历史记录来恢复失败的 actor 的状态(在主管中preRestart
或主管中)。
LMAX 说:
它的工作方式与更传统的方法不同,因此您使用它的方式与您可能习惯的方式略有不同。例如,将模式应用到您的系统并不像用魔术环缓冲区替换所有队列那么简单。我们有代码示例来指导您,越来越多的博客和文章概述了它是如何工作的,技术论文按照您的期望进行了详细介绍,性能测试提供了如何使用 Disruptor 的示例 http://mechanitis.blogspot.com/2011/06/dissecting-disruptor-whats-so-special.html
这是一个简短的 Queues/Disruptors/Actors比较
在伪scala代码中,它将类似于:
object MyUnboundedMailbox {
val buffer = new RingBuffer()
class MyMessageQueue(val startPointer: Pointer, readerPointer: Pointer, writerPointer: Pointer) extends MessageQueue {
// these should be implemented; queue used as example
def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
writerPointer.allocate(() => handle) //allocate one element and set, if you want different message types - you should allocate big amount of data before and block when it ends (to not interfere with another messages), so it has to be bounded queue then
}
def dequeue(): Envelope = readerPointer.poll()
def numberOfMessages: Int = writerPointer - readerPointer //should be synchronized
def hasMessages: Boolean = readerPointer == writerPointer //should be synchronized
def cleanUp(owner: ActorRef, deadLetters: MessageQueue) { }
}
trait MyUnboundedMessageQueueSemantics
}
class MyUnboundedMailbox(settings: ActorSystem.Settings, config: Config) extends MailboxType
with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] {
import MyUnboundedMailbox._
final override def create(owner: Option[ActorRef],
system: Option[ActorSystem]): MessageQueue = {
val pointer = ring.newPointer
val read = pointer.copy
val write = pointer.copy
new MyMessageQueue(pointer, read, write)
}
// you may use another strategy here based on owner (you can access name and path here),
// so for example may allocate same pointers for same prefixes in the name or path
}
您可以在故障恢复期间使用未更改的 MyMessageQueue.startPointer 访问消息日志(您也可以查看 akka 的Event Sourcing进行类比)。
使用 UnboundedQueue 方法并不能保证此处的消息传递,因为如果环“结束”,非常旧的未传递消息可能会被新版本覆盖,因此您可能需要 BoundedQueue,就像这里一样。