2

我正在使用Haskell 管道包

我正在尝试使用管道并发将生产者列表合并在一起。

我想要达到的是:

merge :: MonadIO m => [Producer a m ()] -> Producer a m ()

所以给定一个生产者 s1 和另一个生产者 s2: r = merge [s1, s2] 这将给出以下行为:

s1 --1--1--1--|
s2 ---2---2---2|
r  --12-1-21--2|

按照教程页面中的代码,我想出了:

mergeIO :: [Producer a IO ()] -> Producer a IO ()
mergeIO producers = do
    (output, input) <- liftIO $ spawn Unbounded
    _ <- liftIO $ mapM (fork output) producers
    fromInput input
  where
    fork :: Output a -> Producer a IO () -> IO ()
    fork output producer = void $ forkIO $ do runEffect $ producer >-> toOutput output
                                              performGC

按预期工作。

但是,我很难概括事物。

我的尝试:

merge :: (MonadIO m) => [Producer a m ()] -> Producer a m ()
merge producers = do
    (output, input) <- liftIO $ spawn Unbounded
    _ <- liftIO $ mapM (fork output) producers
    fromInput input
  where
    runEffectIO :: Monad m => Effect m r -> IO (m r)
    runEffectIO e = do
        x <- evaluate $ runEffect e
        return x
    fork output producer = forkIO $ do runEffectIO $ producer >-> toOutput output
                                       performGC

不幸的是,这可以编译,但并没有做太多其他事情。我猜我弄得一团糟runEffectIO。我目前的其他方法runEffectIO没有产生更好的结果。

该程序:

main = do
    let producer = merge [repeater 1 (100 * 1000), repeater 2 (150 * 1000)]
    _ <- runEffect $ producer >-> taker 20
  where repeater :: Int -> Int -> Producer Int IO r
        repeater val delay = forever $ do
            lift $ threadDelay delay
            yield val
        taker :: Int -> Consumer Int IO ()
        taker 0 = return ()
        taker n = do
            val <- await
            liftIO $ putStrLn $ "Taker " ++ show n ++ ": " ++ show val
            taker $ n - 1

命中val <- await但没有到达,liftIO $ putStrLn因此它不会产生任何输出。但是它没有挂起就可以正常退出。

当我代替然后mergeIO程序merge运行时,我希望输出 20 行。

4

3 回答 3

3

虽然MonadIO对于此操作还不够,但MonadBaseControl(来自monad-control)旨在允许在基本 monad 内嵌入任意变压器堆栈。配套软件包提升基础提供了一个fork适用于变压器堆栈的版本。我在下面的 Gist 中汇总了一个使用它来解决您的问题的示例,尽管主要的魔力是:

import qualified Control.Concurrent.Lifted as L
fork :: (MonadBaseControl IO m, MonadIO m) => Output a -> Producer a m () -> m ThreadId
fork output producer = L.fork $ do
    runEffect $ producer >-> toOutput output
    liftIO performGC

请注意,您应该了解以这种方式处理一元状态会发生什么:对子线程中执行的任何可变状态的修改将仅与这些子线程隔离。换句话说,如果你使用 a StateT,每个子线程都会从它被派生时在上下文中的相同状态值开始,但是你会有许多不同的状态,它们不会相互更新。

Yesod 书中有一个关于 monad-control 的附录,尽管坦率地说它有点过时了。我只是不知道任何最近的教程。

于 2014-03-12T04:37:37.883 回答
2

问题似乎是您对 的使用evaluate,我认为它是evaluatefrom Control.Exception

您似乎正在使用它来将通用 monad 中的值“转换”mIO,但实际上并不是这样。您只是从中获取m值,Effect然后将其返回内部IO而不实际执行它。以下代码不打印“foo”:

evaluate (putStrLn "foo") >> return ""

也许您的merge函数可以将函数作为附加参数,m a -> IO a以便merge知道如何将结果runEffect带入IO.

于 2014-03-11T20:26:01.593 回答
0

不幸的是,您不能Producer使用MonadIO基本单子(或任何MonadIO计算)来分叉 a。您需要特别包含运行所有其他 monad 转换器以获取IO操作所需的逻辑,然后才能分叉计算。

于 2014-03-12T02:19:24.877 回答