1

在文档中的 RubyIO.pipe示例中,一条消息通过进程传递。

我想做类似的事情,但有两个不同:

  1. 使用线程而不是进程
  2. 使用管道进行持续消息传递,而不是一次性消息

这是显而易见但不起作用的代码:

rd, wr = IO.pipe

reader_thread = Thread.new(rd) do |rd|
  data_received = rd.read
  puts "Read: #{data_received.inspect}"
end

write_thread = Thread.new(wr) do |wr|
  wr.write "Message"
  wr.flush
end

write_thread.join
reader_thread.join

这导致reader_thread挂起rd.read

我可以通过使用使其工作IO#read_nonblock

reader_thread = Thread.new(rd) do |rd|
  data_received = \
    begin
      rd.read_nonblock(100)
    rescue IO::WaitReadable, IO::EAGAINWaitReadable
      IO.select([rd])
      retry
    end

  puts "Read: #{data_received.inspect}"
end

这是正确的模式吗?还是使用IO.pipe错误的工具进行线程消息传递?

4

2 回答 2

0

您还可以使用 aQueue在多个线程之间安全地交换信息:

q = Queue.new

reader_thread = Thread.new(q) do |q|
  data_received = q.pop
  puts "Read: #{data_received.inspect}"
end

write_thread = Thread.new(q) do |q|
  q.push "Message"
end

write_thread.join
reader_thread.join
于 2020-04-29T22:20:21.583 回答
0

您的阅读器线程挂起,因为没有参数,IO.read将读取 - 并阻止 - 直到遇到 EOF。(如果您传递 a length,它将一直读取,直到它读取那么多字节或 EOF,以先发生者为准,因此它仍然会阻塞,直到它至少获得那么多输入。)这在IO.pipedocs中有详细说明。

如果你wd.close之前打电话reader_thread.joinread会得到那个EOF,你会得到你的输出——所有的一次,当read解除阻塞时。

在现实场景中,您可能不想只读取一次,您可能希望循环直到rd遇到 EOF,沿途对数据进行处理。最简单的事情就是一次读取一个字节,使用read(1). (为了简单起见,我省略了单独的编写器线程——你也应该这样做,除非你真的需要三个单独的指令流;通常你会想要一个后台读取器线程或一个后台编写器线程,主线程处理另一端——但行为基本相同。

text = <<~TEXT.strip
  Lorem ipsum dolor sit amet, consectetur adipiscing elit, 
  sed do eiusmod tempor incididunt ut labore et dolore magna 
  aliqua.
TEXT

read_io, write_io = IO.pipe

reader_thread = Thread.new(read_io) do |io|
  puts('Reading:')
  while (c = io.read(1)) # block till we read one byte
    $stdout.write(c)
  end
  puts('...Done.')
end

# Write 50 chars/second, so we can see them get read one at a time
text.chars.each { |c| write_io.write(c); sleep(0.02) } 

reader_thread.join

# => Reading:
#    Lorem ipsum dolor sit amet, consectetur adipiscing elit, 
#    sed do eiusmod tempor incididunt ut labore et dolore magna 
#    aliqua.

但是,这仍然挂起,因为IO.read(1)仍在等待那个 EOF,所以再次,您需要关闭write_io.

此外,逐字节读取通常效率不高。实际上,您可能需要8K缓冲区,甚至更大,具体取决于您的用例。

reader_thread = Thread.new(read_io) do |io|
  puts('Reading:')
  while (c = io.read(8192))
    $stdout.write(c)
  end
  puts('...Done.')
end

# We're writing 50 chars/second, but we won't see them print out
# till `read_io` has read 8192 bytes, or hit an EOF
text.chars.each { |c| write_io.write(c); sleep(0.02) }

write_io.close      # we have to close `write_io` *sometime* --
reader_thread.join  # -- or this will hang.

# => Reading:
#    Lorem ipsum dolor sit amet, consectetur adipiscing elit, 
#    sed do eiusmod tempor incididunt ut labore et dolore magna 
#    aliqua....Done.
于 2021-02-09T19:54:20.413 回答