26

问题

你好!我正在编写一个日志库,我很想创建一个记录器,它将在单独的线程中运行,而所有应用程序线程只会向它发送消息。我想为这个问题找到最有效的解决方案。我在这里需要简单的未绑定队列。

方法

我创建了一些测试来查看可用解决方案的性能,我在这里得到了非常奇怪的结果。我基于以下内容测试了 4 个实现(下面提供了源代码):

  1. 管道并发
  2. Control.Concurrent.Chan
  3. Control.Concurrent.Chan.Unagi
  4. 如“Haskell 中的并行和并发编程”一书中所述,基于 MVar请注意,这种技术为我们提供了容量为 1 的有界队列 - 它仅用于测试

测试

以下是用于测试的源代码:

{-# LANGUAGE NoMonomorphismRestriction #-}

import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Pipes
import qualified Pipes.Concurrent as Pipes
import Control.Applicative
import Control.Monad (replicateM_)
import System.Environment (getArgs)

import Control.Concurrent.Chan
import Control.Concurrent (forkIO)
import qualified Control.Concurrent.Chan.Unagi as U
import Control.Concurrent.MVar
import Criterion.Main

data Event = Msg String | Status | Quit deriving (Show)

----------------------------------------------------------------------
-- Pipes
----------------------------------------------------------------------

pipesLogMsg = yield (Msg "hello")
pipesManyLogs num = replicateM_ num pipesLogMsg

pipesAddProducer num o = Pipes.forkIO $ do runEffect $ (pipesManyLogs num) >-> Pipes.toOutput o
                                           Pipes.performGC

pipesHandler max = loop 0
  where
    loop mnum = do
        if mnum == max
            then lift $ pure ()
            else do event <- await
                    case event of
                        Msg _  -> loop (mnum + 1)
                        Status -> (lift $ putStrLn (show mnum)) *> loop mnum
                        Quit   -> return ()

----------------------------------------------------------------------
-- Chan
----------------------------------------------------------------------

chanAddProducer num ch = forkIO $ chanManyLogs num ch
chanManyLogs num ch = replicateM_ num (writeChan ch (Msg "hello"))
chanHandler ch max = handlerIO (readChan ch) max

----------------------------------------------------------------------
-- Unagi-Chan
----------------------------------------------------------------------

uchanAddProducer num ch = forkIO $ uchanManyLogs num ch
uchanManyLogs num ch = replicateM_ num (U.writeChan ch (Msg "hello"))
uchanHandler ch max = handlerIO (U.readChan ch) max

----------------------------------------------------------------------
-- MVars
----------------------------------------------------------------------

mvarAddProducer num m = forkIO $ mvarManyLogs num m
mvarManyLogs num m = replicateM_ num (putMVar m (Msg "hello"))
mvarHandler m max = handlerIO (takeMVar m) max

----------------------------------------------------------------------
-- Utils
----------------------------------------------------------------------

handlerIO f max = loop 0 where
    loop mnum = do
        if mnum == max 
            then pure ()
            else do event <- f
                    case event of
                         Msg _  -> loop (mnum + 1)
                         Status -> putStrLn (show mnum) *> loop mnum
                         Quit   -> return ()

----------------------------------------------------------------------
-- Main
----------------------------------------------------------------------

main = defaultMain [
      bench "pipes" $ nfIO $ do
        (output, input) <- Pipes.spawn Pipes.Unbounded
        replicateM_ prodNum (pipesAddProducer msgNum output)
        runEffect $ Pipes.fromInput input >-> pipesHandler totalMsg
    , bench "Chan" $ nfIO $ do
        ch <- newChan
        replicateM_ prodNum (chanAddProducer msgNum ch)
        chanHandler ch totalMsg
    , bench "Unagi-Chan" $ nfIO $ do
        (inCh, outCh) <- U.newChan
        replicateM_ prodNum (uchanAddProducer msgNum inCh)
        uchanHandler outCh totalMsg
    , bench "MVar" $ nfIO $ do
        m <- newEmptyMVar
        replicateM_ prodNum (mvarAddProducer msgNum m)
        mvarHandler m totalMsg
    ]
  where
    prodNum  = 20
    msgNum   = 1000
    totalMsg = msgNum * prodNum

您可以编译它ghc -O2 Main.hs并运行它。测试创建了 20 个消息生产者,每个生产者产生 1000000 条消息。

结果

benchmarking pipes
time                 46.68 ms   (46.19 ms .. 47.31 ms)
                     0.999 R²   (0.999 R² .. 1.000 R²)
mean                 47.59 ms   (47.20 ms .. 47.95 ms)
std dev              708.3 μs   (558.4 μs .. 906.1 μs)

benchmarking Chan
time                 4.252 ms   (4.171 ms .. 4.351 ms)
                     0.995 R²   (0.991 R² .. 0.998 R²)
mean                 4.233 ms   (4.154 ms .. 4.314 ms)
std dev              244.8 μs   (186.3 μs .. 333.5 μs)
variance introduced by outliers: 35% (moderately inflated)

benchmarking Unagi-Chan
time                 1.209 ms   (1.198 ms .. 1.224 ms)
                     0.996 R²   (0.993 R² .. 0.999 R²)
mean                 1.267 ms   (1.244 ms .. 1.308 ms)
std dev              102.4 μs   (61.70 μs .. 169.3 μs)
variance introduced by outliers: 62% (severely inflated)

benchmarking MVar
time                 1.746 ms   (1.714 ms .. 1.774 ms)
                     0.997 R²   (0.995 R² .. 0.998 R²)
mean                 1.716 ms   (1.694 ms .. 1.739 ms)
std dev              73.99 μs   (65.32 μs .. 85.48 μs)
variance introduced by outliers: 29% (moderately inflated)

问题

我很想问你为什么管道并发版本的执行速度如此之慢,以及为什么它甚至比基于 chan 的版本还要慢得多。我很惊讶,MVar 是所有版本中最快的——谁能告诉更多,为什么我们会得到这个结果,以及在任何情况下我们是否可以做得更好?

4

2 回答 2

19

Chan所以我可以给你一些对和TQueue(这里内部使用的)的分析的一些概述,这些分析pipes-concurrency激发了一些进入unagi-chan. 我不确定它是否会回答你的问题。我建议在进行基准测试时分叉不同的队列并尝试变化,以便真正了解正在发生的事情。

Chan好像:

data Chan a
 = Chan (MVar (Stream a)) -- pointer to "head", where we read from
        (MVar (Stream a)) -- pointer to "tail", where values written to

type Stream a = MVar (ChItem a)
data ChItem a = ChItem a (Stream a)

这是一个 s 的链表MVar。类型中的两个MVarsChan分别充当指向列表当前头部和尾部的指针。这是写的样子:

writeChan :: Chan a -> a -> IO () 
writeChan (Chan _ writeVar) val = do 
    new_hole <- newEmptyMVar   mask_ $ do
    old_hole <- takeMVar writeVar           -- [1]
    putMVar old_hole (ChItem val new_hole)  -- [2]
    putMVar writeVar new_hole               -- [3]

在 1 时,作者锁定了写入端,在 2 时我们的项目a可供读者使用,在 3 时,写入端为其他作者解锁。

这实际上在单一消费者/单一生产者场景中表现得非常好(参见此处的图表),因为读取和写入不会竞争。但是一旦你有多个并发作者,你就会开始遇到麻烦:

  • 一个写入 1 而另一个写入 2 的写入将阻塞并被取消调度(我能够测量上下文切换的最快速度约为 150ns(非常快);可能在某些情况下它会慢得多)。因此,当您让许多作者争辩时,您基本上是在通过调度程序进行一次大往返,进入等待队列,MVar然后最终写入可以完成。

  • 当一个写入器在 2 时被取消调度(因为它超时),它会持有一个锁,并且在可以再次重新调度之前不允许写入完成;当我们超额订阅时,这会成为一个更大的问题,即当我们的线程/核心比率很高时。

最后,使用MVar-per-item 在分配方面需要一些开销,更重要的是,当我们积累许多可变对象时,我们会造成很大的 GC 压力。

队列

TQueue很棒,因为STM它使推理其正确性变得超级简单。这是一个功能性的出列式队列,write由简单地读取写入器堆栈、consing 我们的元素并将其写回组成:

data TQueue a = TQueue (TVar [a])
                       (TVar [a])

writeTQueue :: TQueue a -> a -> STM ()
writeTQueue (TQueue _ write) a = do  
  listend <- readTVar write   -- a transaction with a consistent 
  writeTVar write (a:listend) -- view of memory

如果在 awriteTQueue写回其新堆栈后,另一个交错写入执行相同操作,则其中一个写入将被重试。随着更多writeTQueue的 s 被交错,竞争的影响变得更糟。然而,性能下降的速度比 in 慢得多,Chan因为只有一个writeTVar操作可以使竞争writeTQueues 无效,并且事务非常小(只有 read 和 a (:))。

读取的工作原理是从写入端“出列”堆栈,反转它,并将反转的堆栈存储在自己的变量中以便于“弹出”(总而言之,这给了我们摊销的 O(1) 推送和弹出)

readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
  xs <- readTVar read
  case xs of
    (x:xs') -> do writeTVar read xs'
                  return x
    [] -> do ys <- readTVar write
             case ys of
               [] -> retry
               _  -> case reverse ys of
                       [] -> error "readTQueue"
                       (z:zs) -> do writeTVar write []
                                    writeTVar read zs
                                    return z

读者对作者有一个对称的适度争论问题。在一般情况下,读者和作者不会竞争,但是当阅读器堆栈耗尽时,读者会与其他读者和作者竞争。我怀疑如果您预先加载了TQueue足够多的值,然后启动了 4 个读取器和 4 个写入器,您可能会引发活锁,因为在下一次写入之前反向难以完成。值得注意的是,与 with 不同MVar,对 a 的写入会TVar同时唤醒许多读者(这可能或多或少,具体取决于场景)。

我怀疑您在测试中没有看到太多的弱点TQueue;主要是您看到了写入争用的适度影响以及大量分配和 GC'ing 大量可变对象的开销。

鳗鱼酱

unagi-chan最初是为了很好地处理争用而设计的。它在概念上非常简单,但实现有一些复杂性

data ChanEnd a = ChanEnd AtomicCounter (IORef (Int , Stream a))

data Stream a = Stream (Array (Cell a)) (IORef (Maybe (Stream a)))

data Cell a = Empty | Written a | Blocking (MVar a)

队列的读取端和写入端共享Stream它们协调传递值(从写入器到读取器)和阻塞指示(从读取器到写入器)的位置,并且读取和写入端各有一个独立的原子计数器。写入的工作方式如下:

  1. 写者调用incrCounter写计数器上的原子来接收其唯一索引,在该索引上与其(单个)读者进行协调

  2. writer 找到它的单元格并执行 CASWritten a

  3. 如果成功则退出,否则它会看到读者已经击败它并正在阻塞(或继续阻塞),所以它执行(\Blocking v)-> putMVar v a)并退出。

读取以类似且明显的方式工作。

第一项创新是使争用点成为在争用下不会降级的原子操作(如 CAS/重试循环或类似 Chan 的锁)。基于简单的基准测试和实验,该公开的 fetch-and-add primopatomic-primops效果最好。

然后在 2 中,读写器只需要执行一次比较和交换(读取器的快速路径是简单的非原子读取)即可完成协调。

所以为了努力unagi-chan做好,我们

  • 使用 fetch-and-add 处理争用点

  • 使用无锁技术,这样当我们被超额订阅时,一个在不合时宜的时间被取消调度的线程不会阻塞其他线程的进度(被阻塞的写入器最多可能阻塞由计数器“分配”给它的读取器;阅读警告重新。异步异常在unagi-chan文档中,并注意Chan这里有更好的语义)

  • 使用数组来存储我们的元素,它具有更好的局部性(但见下文)每个元素的开销更低并且对 GC 的压力更小

最后的说明。使用数组:对数组的并发写入通常不是扩展的一个坏主意,因为您会导致大量缓存一致性流量,因为缓存线在您的写入器线程中来回失效。通用术语是“虚假共享”。但是,我能想到的替代设计也有缓存方面的优势和劣势,它们会条纹写入或其他东西;我已经对此进行了一些试验,但目前还没有任何结论。

我们合法地关注虚假共享的一个地方是我们的计数器,我们将其对齐并填充到 64 字节;这确实出现在基准测试中,唯一的缺点是内存使用量增加。

于 2015-01-14T18:00:04.877 回答
5

如果我不得不猜测为什么pipes-concurrency性能更差,那是因为每次读取和写入都包含在STM事务中,而其他库使用更有效的低级并发原语。

于 2015-01-14T05:09:40.423 回答