gossip 协议是一种处理异步、未知、未配置(随机)网络的方法,这些网络可能会遭受间歇性中断和分区,并且不存在领导者或默认结构。(请注意,这种情况在现实世界中有些不寻常,并且总是以某种方式对系统施加带外控制。)
考虑到这一点,让我们将其更改为异步系统(使用cast
),以便我们遵循闲聊八卦式通信概念的精神。
我们需要消息摘要来计算给定消息已被接收的次数,已接收且已超过幻数的消息摘要(因此,如果太晚了我们不会重新发送),并且在我们的系统中注册的进程列表,以便我们知道我们正在向谁广播:
(下面的例子是在 Erlang 中,因为自从我停止使用 Elixir 语法后,我就被它绊倒了……)
-module(rumor).
-record(s,
{peers = [] :: [pid()],
digest = #{} :: #{message_id(), non_neg_integer()},
dead = sets:new() :: sets:set(message_id())}).
-type message_id() :: zuuid:uuid().
在这里,我使用的是UUID,但它可以是任何东西。Erlang 引用对于测试用例来说很好,但是由于八卦在 Erlang 集群中没有用,并且引用在原始系统之外是不安全的,我只是跳到假设这是针对网络系统的。
我们将需要一个接口函数,它允许我们告诉进程将新消息注入系统。我们还需要一个接口函数,一旦它已经在系统中,它就会在两个进程之间发送消息。然后我们需要一个内部函数来向所有已知的(订阅的)对等点广播消息。啊,这意味着我们需要一个问候接口,以便对等进程可以相互通知它们的存在。
我们还需要一种方法来让流程告诉自己随着时间的推移继续广播。设置重传间隔多长时间实际上并不是一个简单的决定——它与网络拓扑、延迟、可变性等有关(实际上您可能偶尔会 ping 对等点并根据延迟开发一些启发式方法,丢弃那些似乎没有反应,等等——但我们不会在这里陷入那种疯狂)。在这里,我将其设置为 1 秒,因为对于观察系统的人类来说,这是一个易于解释的间隔。
请注意,以下所有内容都是异步的。
接口...
insert(Pid, Message) ->
gen_server:cast(Pid, {insert, Message}).
relay(Pid, ID, Message) ->
gen_server:cast(Pid, {relay, ID, Message}).
greet(Pid) ->
gen_server:cast(Pid, {greet, self()}).
make_introduction(Pid, PeerPid) ->
gen_server:cast(Pid, {make_introduction, PeerPid}).
最后一个功能将成为我们作为系统测试人员的方式,以使其中一个进程调用greet/1
某个目标 Pid,以便它们开始构建对等网络。在现实世界中,通常会发生一些稍微不同的事情。
在用于接收演员表的 gen_server 回调中,我们将获得:
handle_cast({insert, Message}, State) ->
NewState = do_insert(Message, State);
{noreply, NewState};
handle_cast({relay, ID, Message}, State) ->
NewState = do_relay(ID, Message, State),
{noreply, NewState};
handle_cast({greet, Peer}, State) ->
NewState = do_greet(Peer, State),
{noreply, NewState};
handle_cast({make_introduction, Peer}, State) ->
NewState = do_make_introduction(Peer, State),
{noreply, NewState}.
很简单的东西。
上面我提到我们需要一种方法让这个东西告诉自己在延迟后重新发送。为此,我们将在延迟使用后向“redo_relay”发送一条裸消息,erlang:send_after/3
因此我们需要一个 handle_info/2 来处理它:
handle_info({redo_relay, ID, Message}, State) ->
NewState = do_relay(ID, Message, State),
{noreply, NewState}.
消息位的实现是有趣的部分,但这些都不是非常棘手的。请原谅do_relay/3
下面的内容——它可能更简洁,但我是在头顶上的浏览器中写这个的,所以......
do_insert(Message, State = #s{peers = Peers, digest = Digest}) ->
MessageID = zuuid:v1(),
NewDigest = maps:put(MessageID, 1, Digest),
ok = broadcast(Message, Peers),
ok = schedule_resend(MessageID, Message),
State#s{digest = NewDigest}.
do_relay(ID,
Message,
State = #s{peers = Peers, digest = Digest, dead = Dead}) ->
case maps:find(ID, Digest) of
{ok, Count} when Count >= 10 ->
NewDigest = maps:remove(ID, Digest),
NewDead = sets:add_element(ID, Dead),
ok = broadcast(Message, Peers),
State#s{digest = NewDigest, dead = NewDead};
{ok, Count} ->
NewDigest = maps:put(ID, Count + 1),
ok = broadcast(ID, Message, Peers),
ok = schedule_resend(ID, Message),
State#s{digest = NewDigest};
error ->
case set:is_element(ID, Dead) of
true ->
State;
false ->
NewDigest = maps:put(ID, 1),
ok = broadcast(Message, Peers),
ok = schedule_resend(ID, Message),
State#s{digest = NewDigest}
end
end.
broadcast(ID, Message, Peers) ->
Forward = fun(P) -> relay(P, ID, Message),
lists:foreach(Forward, Peers).
schedule_resend(ID, Message) ->
_ = erlang:send_after(1000, self(), {redo_relay, ID, Message}),
ok.
现在我们需要社交位...
do_greet(Peer, State = #s{peers = Peers}) ->
case lists:member(Peer, Peers) of
false -> State#s{peers = [Peer | Peers]};
true -> State
end.
do_make_introduction(Peer, State = #s{peers = Peers}) ->
ok = greet(Peer),
do_greet(Peer, State).
那么上面那些可怕的没有类型的东西是做什么的呢?
它避免了任何死锁的可能性。死锁在对等系统中如此致命的原因是,只要您有两个相同的进程(或参与者,或其他)同步通信,您就创建了一个潜在死锁的教科书案例。
任何时候A
都有一个同步消息指向B
并且B
有一个同步消息指向A
同时你现在有一个死锁。没有办法创建同步调用彼此的相同进程而不会产生潜在的死锁。在大规模并发系统中,几乎肯定会发生的任何事情最终都会发生,因此您迟早会遇到这种情况。
Gossip 旨在异步是有原因的:它是一种草率、不可靠、低效的方式来处理草率、不可靠、低效的网络拓扑。尝试拨打电话而不是强制转换不仅违背了八卦式消息中继的目的,而且还会将您推入不可能的死锁领域事件,以将协议的性质从异步更改为同步。