Skip to content

add support to use query as a conduit source #91

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 74 additions & 9 deletions src/Database/InfluxDB/Query.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ module Database.InfluxDB.Query
Query
, query
, queryChunked
, queryChunked'
, queryC

-- * Query parameters
, QueryParams
Expand All @@ -36,6 +38,7 @@ module Database.InfluxDB.Query

-- * Low-level functions
, withQueryResponse
, withQueryResponse'

-- * Helper types
, Ignored
Expand Down Expand Up @@ -76,6 +79,11 @@ import Database.InfluxDB.JSON
import Database.InfluxDB.Types as Types
import qualified Database.InfluxDB.Format as F

import Control.Monad.IO.Class
import Control.Monad.Trans.Resource
import Control.Monad.Catch as MC
import Data.Conduit

-- $setup
-- >>> :set -XDataKinds
-- >>> :set -XOverloadedStrings
Expand Down Expand Up @@ -454,6 +462,19 @@ precisionParam = \case
Hour -> return "h"
RFC3339 -> Nothing

instance (MonadUnliftIO m, MonadThrow m) => MonadCatch (ConduitT i o m) where
catch = catchC

queryC
:: (QueryResults a, MonadResource m, MonadCatch m, MonadUnliftIO m, MonadIO m)
=> QueryParams
-> Optional Int
-> Query
-> ConduitT () a m ()
queryC params chunkSize q =
queryChunked' params chunkSize q (L.FoldM (\_ v -> V.mapM_ yield v)
(return ()) (\_ -> return ()))

-- | Same as 'query' but it instructs InfluxDB to stream chunked responses
-- rather than returning a huge JSON object. This can be lot more efficient than
-- 'query' if the result is huge.
Expand All @@ -475,11 +496,31 @@ queryChunked
-> L.FoldM IO (Vector a) r
-> IO r
queryChunked params chunkSize q (L.FoldM step initialize extract) =
withQueryResponse params (Just chunkSize) q go
runResourceT $
queryChunked' params chunkSize q
(L.FoldM
(\x v -> liftIO $ step x v)
(liftIO $ initialize)
(\x -> liftIO $ extract x))

queryChunked'
:: (QueryResults a, MonadResource m, MonadCatch m, MonadIO m)
=> QueryParams
-> Optional Int
-- ^ Chunk size
--
-- By 'Default', InfluxDB chunks responses by series or by every 10,000
-- points, whichever occurs first. If it set to a 'Specific' value, InfluxDB
-- chunks responses by series or by that number of points.
-> Query
-> L.FoldM m (Vector a) r
-> m r
queryChunked' params chunkSize q (L.FoldM step initialize extract) =
withQueryResponse' params (Just chunkSize) q go
where
go request response = do
x0 <- initialize
chunk0 <- HC.responseBody response
chunk0 <- liftIO $ HC.responseBody response
x <- loop x0 k0 chunk0
extract x
where
Expand All @@ -488,17 +529,17 @@ queryChunked params chunkSize q (L.FoldM step initialize extract) =
| B.null chunk = return x
| otherwise = case k chunk of
AB.Fail unconsumed _contexts message ->
throwIO $ UnexpectedResponse message request $
liftIO $ throwIO $ UnexpectedResponse message request $
BL.fromStrict unconsumed
AB.Partial k' -> do
chunk' <- HC.responseBody response
chunk' <- liftIO $ HC.responseBody response
loop x k' chunk'
AB.Done leftover val ->
case A.parse (parseQueryResults (queryPrecision params)) val of
A.Success vec -> do
x' <- step x vec
loop x' k0 leftover
A.Error message -> errorQuery message request response val
A.Error message -> liftIO $ errorQuery message request response val

-- | Lower-level interface to query data.
withQueryResponse
Expand All @@ -513,10 +554,34 @@ withQueryResponse
-> Query
-> (HC.Request -> HC.Response HC.BodyReader -> IO r)
-> IO r
withQueryResponse params chunkSize q f = do
manager' <- either HC.newManager return $ queryManager params
HC.withResponse request manager' (f request)
`catch` (throwIO . HTTPException)
withQueryResponse params chunkSize q f =
runResourceT $
withQueryResponse' params chunkSize q
(\req resp -> liftIO $ f req resp)

withQueryResponse'
:: (MonadResource m, MonadCatch m, MonadIO m)
=> QueryParams
-> Maybe (Optional Int)
-- ^ Chunk size
--
-- By 'Nothing', InfluxDB returns all matching data points at once.
-- By @'Just' 'Default'@, InfluxDB chunks responses by series or by every
-- 10,000 points, whichever occurs first. If it set to a 'Specific' value,
-- InfluxDB chunks responses by series or by that number of points.
-> Query
-> (HC.Request -> HC.Response HC.BodyReader -> m r)
-> m r
withQueryResponse' params chunkSize q f = do
manager' <- liftIO $ either HC.newManager return $ queryManager params
-- HC.withResponse request manager' (f request)
let hcWithResponse req man f = do
(key, res) <- allocate (HC.responseOpen req man) HC.responseClose
retVal <- f res
release key
return retVal
hcWithResponse request manager' (f request)
`MC.catch` (liftIO . throwIO . HTTPException)
where
request =
HC.setQueryString (setPrecision (queryPrecision params) queryString) $
Expand Down