3

我正在尝试在 python 中使用 protobuf 读取一些数据流,并且我想使用 trio 使客户端用于读取流。protobuf 有一些方法调用,当我使用 trio 流时,我发现它们不起作用。

Linux 机器上的 Python 客户端。

import DTCProtocol_pb2 as Dtc

async def parent(addr, encoding, heartbeat_interval):
    print(f"parent: connecting to 127.0.0.1:{addr[1]}")
    client_stream = await trio.open_tcp_stream(addr[0], addr[1])

    # encoding request
    print("parent: spawing encoding request ...")
    enc_req = create_enc_req(encoding) # construct encoding request
    await send_message(enc_req, Dtc.ENCODING_REQUEST,client_stream, 'encoding request') # send encoding request

    log.debug('get_reponse: started')
    response = await client_stream.receive_some(1024)
    m_size = struct.unpack_from('<H', response[:2]) # the size of message
    m_type = struct.unpack_from('<H', response[2:4]) # the type of the message
    m_body = response[4:]
    m_resp = Dtc.EncodingResponse()

m_body将是一些字节数据,我不知道如何解码。Dtc.EncodingResponse()是 protobuf 方法,它会给出一个包含可读格式响应的 Dtc 对象。(Dtc 是 protobuf 文件)。但我在这里什么也得不到。当我在没有三重奏的情况下执行此脚本时,Dtc.EncodingResponse()将以可读的格式给出完整的响应。

我猜问题是“client_stream”是一个只读取字节的三重流对象,所以我可能需要使用一个ReceiveChannel对象。但如果这是真的,我不知道该怎么做。

更新: Nathaniel J. Smith 下面的答案解决了我的问题。

m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)

我觉得很傻,但是我之前没有 ParseFromString 数据,仅此而已。非常感谢所有回复的人。希望这可以帮助那里的人。

4

1 回答 1

2

就像@shmee 在评论中所说的那样,我认为您的代码被编辑破坏了一些......您应该仔细检查。

当我在没有三重奏的情况下执行此脚本时,Dtc.EncodingResponse()将以可读格式给出完整的响应

我想你在切换到 Trio 时可能会掉线?Dtc.EncodingResponse()只是创建一个新的空EncodingResponse对象。如果您想将数据解析m_body到新对象中,则必须明确执行此操作,例如:

m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)

但是,还有另一个问题......它被调用的原因receive_some是它接收一些字节,但可能没有接收到您要求的所有字节。您的代码假设一次调用receive_some将获取响应中的所有字节,当您进行简单测试时这可能是正确的,但通常不能保证。如果您在第一次调用 时没有获得足够的数据receive_some,您可能需要不断重复调用它,直到获得所有数据。

这实际上是非常标准的......套接字的工作方式相同。这就是为什么您的服务器m_size在开始时首先发送一个字段的原因 - 这样您就可以判断您是否已获得所有数据!

不幸的是,截至 2019 年 6 月,Trio 没有为您提供执行此循环的帮助程序 - 您可以在本期跟踪这方面的进展。同时,也可以自己编写。我认为这样的事情应该有效:

async def receive_exactly(stream, count):
    buf = bytearray()
    while len(buf) < count:
        new_data = await stream.receive_some(count - len(buf))
        if not new_data:
            raise RuntimeError("other side closed the connection unexpectedly")
        buf += new data
    return buf

async def receive_encoding_response(stream):
    header = await receive_exactly(stream, 4)
    (m_size, m_type) = struct.unpack('<HH', header)
    m_body = await receive_exactly(stream, m_size)
    m_resp = Dtc.EncodingResponse()
    m_resp.ParseFromString(m_size)
    return m_resp
于 2019-06-15T06:24:49.080 回答