0

我想让 warp 运行一个进程,然后用该进程的输出响应。假设输出大于服务器的 RAM;加载整个输出然后响应不是一种选择。我以为我可以使用类似的东西来完成此任务

withProcessWait_ (setStdout createSource "cat largefile") (pure . responseSource ok200 [] . getStdout)

responseSource用途ConduitT i (Flush Builder) IO ()createSource用途ConduitT i ByteString m ()。我不知道如何将ByteString管道转换为Flush Builder管道。


所以我设计了一个似乎可行的解决方案,但遗憾的是它的定义不那么简单:

responseProcess :: Status -> ResponseHeaders -> ProcessConfig in out err -> Response
responseProcess s hs cfg = responseStream s hs $ \send flush ->                                                                                                                                
     withProcessWait_ (setStdout createPipe cfg) $ \p@(getStdout -> h) ->
         let loop = do
             bs <- hGetSome h defaultChunkSize
             unless (BS.null bs) (send (byteString bs) *> flush *> loop)
         in loop *> hClose h 

. 这有必要吗,即使我可以尝试通过包裹mkStreamSpec或其他方式来美化它?还是我缺少更简单的方法?


编辑:对解决方案的评论:

intersperseC让我ChunkFlush一起使用。这解决了Flush Builder/ByteString转换问题。我还没有测试过它,但它看起来是正确的,我相信它已经被使用过。

然而,我发现

withProcessWait_ (setStdout createSource "cat largefile") $ \p ->
    responseSource ok200 [] (getStdout p .| mapC (Chunk . byteString) .| intersperseC Flush)

过早关闭进程句柄。因此我需要自己管理管道:使用createPipe而不是createSource. 但这意味着我需要hClose在最后调用,这意味着我需要一个返回的响应处理程序IO ();唯一能做的(除了responseRaw)是responseStream,它StreamingBody用作Conduit的替代品。因此,我得出结论,需要我的原始解决方案,并且 Conduit 不能用于流式处理。如果不正确,请随时纠正。

4

1 回答 1

0

responseSource有类型

responseSource :: 状态 -> ResponseHeaders -> 源 IO (Flush Builder) -> 响应

的定义Flush

数据刷新 a = 块 a | 冲洗

也就是说,type 的值Flush Builder要么是 a 要么Builder是指示warp刷新输出流的命令。

Builder来自二进制包。它基本上是一大块字节的表示,针对高效连接进行了优化。它可以从 a 构造ByteString,使用fromByteString函数。

知道了这一点,并使用mapCfrom管道,我们可以定义这个适配器:

adapter :: :: Monad m => ConduitT ByteString (Flush Builder) m ()
adapter = mapC (Chunk . fromByteString) 

但是有一个问题,适配器从不刷新。但是我们可以通过以下方式散布flusing命令intersperseC

adapter :: :: Monad m => ConduitT ByteString (Flush Builder) m ()
adapter = mapC (Chunk . fromByteString) .| intersperseC Flush

如果我们不想在每个块之后刷新怎么办?也许我们可以chunksOfCE在将字节块转换为Flush值之前对它们进行分组。

于 2019-09-04T20:57:39.880 回答