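This is a bit of a long shot, but I've written a pipeline that connects to a database, gets the list of databases on the server, connects to each of those databases, runs a query (a user count) against each one, and then prints the counts. Unfortunately, this is about as far as I can simplify it from my real example. I'm using pipes 4.1.0, pipes-safe 2.0.2 and mysql-simple-0.2.2.4. Here is the code: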
{-# LANGUAGE RankNTypes, OverloadedStrings #-}
import Pipes
import qualified Pipes.Safe as PS
import qualified Pipes.Prelude as P
import Database.MySQL.Simple
import qualified Data.Text as T
import Control.Monad.Catch as MC
import Control.Monad (forever)
import Database.MySQL.Simple.QueryParams
import Database.MySQL.Simple.QueryResults
data DBName = DBName T.Text deriving Show
-- connect to a database and use a table.
mydb :: T.Text -> ConnectInfo
mydb = undefined
-- Quirk of (mysql|postgresql)-simple libraries
unOnly (Only a) = a
queryProducer :: (MonadIO m, QueryParams params, QueryResults r) => Connection -> Query -> params -> Pipes.Producer' r m ()
queryProducer = undefined
myDBNames :: (PS.MonadSafe m, MonadIO m) => Producer DBName m ()
myDBNames = PS.bracket (liftIO $ connect $ mydb "sometable") (liftIO . close) $ \db ->
    queryProducer db "show databases" () >-> P.map (DBName . unOnly)
-- I realize this is inefficient, one step at a time.
connectToDB :: (PS.MonadSafe m, MonadIO m) => Pipe DBName Connection m ()
connectToDB = forever $ do
    (DBName dbname) <- await
    PS.bracket
        (liftIO . connect . mydb $ dbname)
        (liftIO . close)
        yield
userCount :: (PS.MonadCatch m, MonadIO m) => Pipe Connection Int m ()
userCount = forever $ do
    db <- await
    queryProducer db "select count(*) from user" () >-> P.map unOnly
main :: IO ()
main = PS.runSafeT $ runEffect $ myDBNames >-> P.tee P.print >-> connectToDB >-> userCount >-> P.print
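This works fine. However, suppose that in one of the databases the user table is named users instead of user, so mysql-simple throws an exception when that query runs. I want to catch that error inline, return 0 users for that query, and carry on. Things I have tried: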
(queryProducer db "select count(*) from user" () `PS.catchAll` (\e -> (liftIO $ putStrLn "failure") >> yield (Only 0))) >-> P.map unOnly
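This doesn't work. Sometimes it prints "failure" and yields a 0; other times it just terminates immediately with the exception. I thought that maybe, since the exception had broken out of queryProducer, I should call it again, so I tried this recursive version: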
userCount = forever $ do
    db <- await
    thequery db >-> P.map unOnly
  where
    thequery db = queryProducer db "select count(*) from user" ()
        `PS.catchAll` (\e -> liftIO (putStrLn "failure") >> yield (Only 0) >> thequery db)
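That fails too. Sometimes it actually runs several queries, printing "failure" a few times and yielding a few 0s, before terminating with the exception again. I'm really confused about why this happens.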
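A small, self-contained check of what `catchAll` actually guards inside a pipe may help narrow this down. The sketch assumes pipes-safe's `MonadCatch` instance for `Proxy` (the instance that makes `PS.catchAll` usable on a `Producer` above) and uses only synchronous `ioError` failures; `failsInside`, `guarded` and `failOnTwo` are made-up names for illustration. As far as I understand that instance, the handler is attached to the effects the wrapped proxy runs itself, so an exception raised in a downstream stage while the producer is suspended at `yield` is not covered by it.

```haskell
import Control.Exception (IOException, try)
import Control.Monad.Catch (catchAll)
import Pipes
import qualified Pipes.Prelude as P
import Pipes.Safe ()  -- brings pipes-safe's MonadCatch instance for Proxy into scope

-- The failing effect is run by the guarded producer itself.
failsInside :: Producer Int IO ()
failsInside =
    (yield 1 >> lift (ioError (userError "boom"))) `catchAll` \_ -> yield 0

-- The same kind of handler, around a producer that never fails on its own.
guarded :: Producer Int IO ()
guarded = each [1, 2, 3] `catchAll` \_ -> yield 0

-- A downstream stage that fails on the second element.
failOnTwo :: Pipe Int Int IO r
failOnTwo = P.mapM (\n -> if n == 2 then ioError (userError "boom") else return n)

main :: IO ()
main = do
    -- Covered: the handler replaces the failure with a fallback 0.
    P.toListM failsInside >>= print
    -- Not covered: the failure happens downstream while `guarded` is
    -- suspended at `yield`, so it escapes the whole pipeline.
    r <- try (P.toListM (guarded >-> failOnTwo)) :: IO (Either IOException [Int])
    print r
```

If that reading of the instance is right, an exception that reaches the pipeline's thread at an arbitrary moment is only caught when it happens to arrive while the guarded producer is running one of its own effects, which would fit the "sometimes" behaviour described above.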
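According to the async library's documentation, the exception should be delivered to the thread the pipeline is running in, so this doesn't look like a threading problem.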
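It may still be worth separating *which* thread receives the exception from *when* it arrives. `Async.link` makes the worker's exception show up in the linking thread, but it is delivered asynchronously (via `throwTo`), at whatever point that thread is executing. The sketch below uses only base and async, with no pipes or MySQL; the delays are arbitrary values chosen to make the ordering obvious, and `linkRace` is a hypothetical name. Depending on the async version, the exception arrives either as the original exception or wrapped in `ExceptionInLinkedThread`, so it is caught as `SomeException` here.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, link)
import Control.Exception (SomeException, handle, throwIO, try)
import Data.IORef (newIORef, readIORef, writeIORef)

-- Returns (did the inner handler run?, did the outer try see an exception?).
linkRace :: IO (Bool, Bool)
linkRace = do
    innerFired <- newIORef False
    (outer :: Either SomeException ()) <- try $ do
        -- Plays the role of catchAll around queryProducer: it only covers this block.
        handle (\(_ :: SomeException) -> writeIORef innerFired True) $ do
            -- Hypothetical worker that fails 100 ms after it starts.
            worker <- async (threadDelay 100000 >> throwIO (userError "bad SQL") :: IO ())
            link worker
        -- The guarded block has already returned; this stands in for downstream work.
        threadDelay 1000000
    inner <- readIORef innerFired
    return (inner, either (const True) (const False) outer)

main :: IO ()
main = linkRace >>= print
```

On an otherwise idle machine this should report that the inner handler did not run while the outer `try` did see the exception: the failure was routed to the right thread, but only after the handler meant to catch it had gone out of scope.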
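In case the implementation of my queryProducer matters: it is modeled after the query function from pipes-postgresql-simple, generalized to Producer' so that I can embed it in other combinators. Below mysql-simple, the mysql library contains a throw that raises a ConnectionError if your SQL doesn't make sense, and that exception percolates all the way up into this function.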
{-# LANGUAGE RankNTypes #-}
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO, liftIO)
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as STM
import qualified Database.MySQL.Simple as My
import Database.MySQL.Simple.QueryParams
import Database.MySQL.Simple.QueryResults
import qualified Pipes
import qualified Pipes.Concurrent as Pipes
--------------------------------------------------------------------------------
-- | Convert a query to a 'Producer' of rows.
--
-- For example,
--
-- > conn <- connectToMysql
-- > runEffect $ queryProducer conn "SELECT * FROM widgets WHERE ID = ?" (Only widgetId) >-> Pipes.Prelude.print
--
-- Will select all widgets for a given @widgetId@, and then print each row to
-- standard output.
queryProducer
    :: (MonadIO m, QueryResults r, QueryParams params)
    => My.Connection -> My.Query -> params -> Pipes.Producer' r m ()
queryProducer c q p = do
    (o, i, seal) <- liftIO (Pipes.spawn' Pipes.Single)
    worker <- liftIO $ Async.async $ do
        My.fold c q p () (const $ void . STM.atomically . Pipes.send o)
        STM.atomically seal
    liftIO $ Async.link worker
    Pipes.fromInput i
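One variation that may be worth trying, offered as an untested sketch rather than a known fix: instead of `Async.link`, have the producer `wait` on the worker after draining the mailbox. `Async.wait` rethrows the worker's exception synchronously in the calling thread, so it would surface inside the producer (and inside any `catchAll` wrapped around it) rather than at an arbitrary later point. The worker also seals the mailbox in a `finally`, so `fromInput` stops even when the query fails. To keep the sketch runnable without a MySQL server, the query is abstracted into a `feed` callback; `callbackProducer` and `failingFeed` are illustrative names, not library functions.

```haskell
{-# LANGUAGE RankNTypes #-}
import Control.Concurrent.Async (async, wait)
import Control.Concurrent.STM (atomically)
import Control.Exception (finally)
import Control.Monad (void)
import Control.Monad.Catch (catchAll)
import Pipes
import qualified Pipes.Concurrent as PC
import qualified Pipes.Prelude as P
import Pipes.Safe ()  -- MonadCatch instance for Proxy

-- Same plumbing as queryProducer, with the query abstracted as `feed`,
-- which pushes each row into the callback it is given.
callbackProducer :: MonadIO m => ((r -> IO ()) -> IO ()) -> Producer' r m ()
callbackProducer feed = do
    (o, i, seal) <- liftIO (PC.spawn' PC.Single)
    -- Seal the mailbox even when `feed` throws, so fromInput terminates.
    worker <- liftIO . async $
        feed (void . atomically . PC.send o) `finally` atomically seal
    PC.fromInput i
    -- Re-raise any worker exception synchronously, in this thread, while this
    -- producer (and any handler wrapped around it) is still running.
    liftIO (wait worker)

-- Stand-in for a failing query: emits three rows, then throws.
failingFeed :: (Int -> IO ()) -> IO ()
failingFeed emit = mapM_ emit [1, 2, 3] >> ioError (userError "bad SQL")

main :: IO ()
main = do
    rows <- P.toListM (callbackProducer failingFeed `catchAll` \_ -> yield 0)
    print rows
```

With mysql-simple, the corresponding definition would presumably be `queryProducer c q p = callbackProducer (\emit -> My.fold c q p () (const emit))`. One trade-off: if downstream stops pulling before the producer finishes, `wait` is never reached and a worker failure goes unreported, whereas `link` would still surface it.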
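I also tried using EitherT to catch the exception, since that seems to be how this was done in pipes in the past. But the documentation for it in the pipes tutorial disappeared between versions 3 and 4, which makes me wonder whether that technique is still recommended. Unfortunately, I couldn't get it to work: because I use queryProducer rather than individual await/yield calls, I'm not sure how to structure it.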
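For comparison, here is a sketch of the plain await/yield shape that uses neither EitherT nor a worker thread. It assumes each per-database query returns a small result (here a single count), so it can run synchronously inside the pipe, where `Control.Exception.try` turns a failure into a `Left` at the point it happens. `countUsers` is a hypothetical stand-in for running `select count(*) from user` on one connection (with mysql-simple this might be a call to `query_`), and it fails for a database named `legacy` to mimic the `users`/`user` mismatch. This swaps the streaming `queryProducer` for an eager query, so it is only an option for small result sets.

```haskell
import Control.Exception (SomeException, try)
import Control.Monad (forever)
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical stand-in for "select count(*) from user" on one database:
-- it fails for "legacy" and otherwise returns a fake count.
countUsers :: String -> IO Int
countUsers "legacy" = ioError (userError "Table 'legacy.user' doesn't exist")
countUsers name     = return (length name)

-- Runs each query in the pipe's own thread; a failure becomes Left,
-- is reported, and is replaced by 0 before moving on to the next database.
userCountOr0 :: MonadIO m => Pipe String Int m r
userCountOr0 = forever $ do
    name <- await
    result <- liftIO (try (countUsers name) :: IO (Either SomeException Int))
    either (\_ -> liftIO (putStrLn "failure") >> yield 0) yield result

main :: IO ()
main = runEffect $ each ["alpha", "legacy", "beta"] >-> userCountOr0 >-> P.print
```

Because the exception is raised synchronously inside `try`, there is no timing window: every database yields exactly one number, and the failing one yields 0.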