我正在尝试修改老师模块中的功能,以从 Twitter 的 Streaming API 中保存推文。问题是,如果我将流打开超过一分钟,API 会开始向流发送空行以验证其连接。
据我了解,推文被分解然后重新组合,但这成为下面代码中最后一个函数pop_size的问题。
Pop_size 期望参数以数字的文本表示形式开始。因此,当发送由<<"\r\n">>表示的空行时,流会因以下消息而崩溃:
Error in process <0.118.0> with exit value: {function_clause,
[{twitterminer_source,pop_size,[<<2 bytes>>],[{file,"src/twitterminer_source.erl"}
如果我添加下面的行,有没有人有任何建议我应该尝试在其中执行什么?
pop_size(<<"\r\n">>) -> %%Code here!
澄清一下:我希望空白行被忽略,循环继续检查下一条推文。我在这里有点不知所措,但我会尽可能详细地回答任何后续问题。
代码:(三个功能)
% Get HTTP chunks and reassemble them into chunks that we get
% as a result of specifying delimited=length.
% https://dev.twitter.com/streaming/overview/processing
split_loop(Sink, Sender, Buffer) ->
case pop_size(Buffer) of
{size, N, Rest} ->
case buffer_pop_n(Rest, N, Sender) of
{pop, Chunk, NewBuf} ->
Sink ! {message, Chunk},
receive next -> ok end,
split_loop(Sink, Sender, NewBuf);
{incomplete, Chunk} -> Sink ! {error, {incomplete, Chunk}};
{terminate, _Chunk} -> Sink ! terminate;
{error, Reason, Chunk} -> Sink ! {error, {Reason, Chunk}}
end;
{more, L} ->
case buffer_pop_n(Buffer, L, Sender) of
{pop, Chunk, NewBuf} ->
split_loop(Sink, Sender, <<Chunk/binary, NewBuf/binary>>);
{incomplete, <<>>} -> Sink ! finished;
{incomplete, Chunk} -> Sink ! {error, {incomplete, Chunk}};
{terminate, _Chunk} -> Sink ! terminate;
{error, Reason, Chunk} -> Sink ! {error, {Reason, Chunk}}
end
end.
% Get a chunk of N bytes from the buffer. If there is not enough data
% in the buffer, get more messages from the pipeline.
buffer_pop_n(B, N, Sender) ->
if
byte_size(B) < N ->
Sender ! next,
receive
{message, Part} ->
Part2 = Part,
buffer_pop_n(<<B/binary, Part2/binary>>, N, Sender);
finished -> {incomplete, B};
terminate -> {terminate, B};
{error, Reason} -> {error, Reason, B}
end;
true -> {pop, binary:part(B, {0, N}), binary:part(B, {N, byte_size(B)-N})}
end.
% We should also support discarding \r\n here
% (see 'blank lines' in https://dev.twitter.com/streaming/overview/messages-types)
pop_size(<<>>) -> {more, 1};
pop_size(<<A,Rest/binary>>) when A >= $0, A =< $9 ->
pop_size((A - $0), 1, Rest);
pop_size(_N, L, <<>>) -> {more, L+1};
pop_size(_N, L, <<"\r">>) -> {more, L+2};
pop_size(N, L, <<A,Rest/binary>>) when A >= $0, A =< $9 ->
pop_size(N * 10 + (A - $0), L+1, Rest);
pop_size(N, _L, <<"\r\n",Rest/binary>>) -> {size, N, Rest}.