Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
# Revision history for streaming-conduit

## Unreleased (Minor) -- YYYY-MM-DD

* Generalise the return types of `toStream` and `toBStream` to return
the conduit's return type:

```
Old: toStream :: (Monad m) => ConduitT () o m () -> Stream (Of o) m ()
New: toStream :: (Monad m) => ConduitT () o m r -> Stream (Of o) m r

Old: toBStream :: (Monad m) => ConduitT () ByteString m () -> ByteStream m ()
New: toBStream :: (Monad m) => ConduitT () ByteString m r -> ByteStream m r
```

* Stop referring to deprecated names in `conduit` and `streaming-bytestring`.
* Support GHC 9.0.

## 0.1.3.0 -- 2023-05-13

* Support `streaming-bytestring-0.3`
Expand Down
102 changes: 75 additions & 27 deletions src/Streaming/Conduit.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
{-# LANGUAGE RankNTypes #-}

{- |
Module : Streaming.Conduit
Description : Bidirectional support for the streaming and conduit libraries
Expand All @@ -18,7 +16,7 @@

No 'B.ByteString'-based analogues of 'asConduit' and 'asStream' are
provided as it would be of strictly less utility, requiring both the
input and output of the 'ConduitM' to be 'ByteString'.
input and output of the 'ConduitT' to be 'ByteString'.

-}
module Streaming.Conduit
Expand All @@ -39,72 +37,122 @@ module Streaming.Conduit
, sinkBStream
) where

import Control.Monad (join, void)
import Control.Monad.Trans.Class (lift)
import Control.Monad (join)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Streaming as B
import Data.Conduit (Conduit, ConduitM, Producer, Source, Consumer,
await, runConduit, transPipe, (.|))
import Data.Conduit (await, runConduit, (.|))
import Data.Conduit.Internal (ConduitT(..), Pipe(..))
import qualified Data.Conduit.List as CL
import Data.Functor (void)
import Data.Void (Void)
import Streaming (Of, Stream)
import qualified Streaming as S
import Streaming.ByteString (ByteStream)
import qualified Streaming.ByteString as B
import qualified Streaming.Prelude as S

--------------------------------------------------------------------------------

-- | The result of this is slightly generic than a 'Source' or a
-- 'Producer'. Subject to fusion.
fromStream :: (Monad m) => Stream (Of o) m r -> ConduitM i o m r
fromStream :: (Monad m) => Stream (Of o) m r -> ConduitT i o m r
fromStream = CL.unfoldEitherM S.next

-- | A type-specialised variant of 'fromStream' that ignores the
-- result.
fromStreamSource :: (Monad m) => Stream (Of a) m r -> Source m a
--
-- It can return a Conduit @Source@:
-- @
-- fromStreamSource :: (Monad m) => Stream (Of o) m r -> Source m o
-- @
fromStreamSource :: (Monad m) => Stream (Of o) m r -> ConduitT () o m ()
fromStreamSource = void . fromStream

-- | A more specialised variant of 'fromStream' that is subject to
-- fusion.
fromStreamProducer :: (Monad m) => Stream (Of a) m r -> Producer m a
fromStreamProducer = CL.unfoldM S.uncons . void
--
-- It can return a Conduit @Producer@:
-- @
-- fromStreamProducer :: (Monad m) => Stream (Of a) m r -> Producer m a
-- @
fromStreamProducer :: (Monad m) => Stream (Of o) m r -> ConduitT i o m r
fromStreamProducer = CL.unfoldEitherM S.next

-- | Convert a streaming 'B.ByteString' into a 'Source'; subject to fusion.
fromBStream :: (Monad m) => B.ByteString m r -> ConduitM i ByteString m r
fromBStream = CL.unfoldEitherM B.nextChunk
fromBStream :: (Monad m) => ByteStream m r -> ConduitT i ByteString m r
fromBStream = CL.unfoldEitherM B.unconsChunk

-- | A more specialised variant of 'fromBStream'.
fromBStreamProducer :: (Monad m) => B.ByteString m r -> Producer m ByteString
fromBStreamProducer = CL.unfoldEitherM B.unconsChunk . void
--
-- It can return a Conduit @Producer@:
-- @
-- fromBStreamProducer :: (Monad m) => ByteStream m r -> Producer m ByteString
-- @
fromBStreamProducer :: (Monad m) => ByteStream m r -> ConduitT i ByteString m r
fromBStreamProducer = CL.unfoldEitherM B.unconsChunk

-- | Convert a 'Producer' to a 'Stream'. Subject to fusion.
--
-- It is not possible to generalise this to be a 'ConduitM' as input
-- It is not possible to generalise this to be a 'ConduitT' as input
-- values are required. If you need such functionality, see
-- 'asStream'.
toStream :: (Monad m) => Producer m o -> Stream (Of o) m ()
toStream cnd = runConduit (transPipe lift cnd .| CL.mapM_ S.yield)
--
-- It can accept a Conduit @Producer@:
-- @
-- toStream :: (Monad m) => Producer m o -> Stream (Of o) m ()
-- @
toStream :: (Monad m) => ConduitT () o m r -> Stream (Of o) m r
toStream (ConduitT k) = go $ k Done
where
go (HaveOutput p o) = S.yield o *> go p
go (NeedInput _ c) = go $ c ()
go (Done r) = pure r
go (PipeM mp) = S.effect $ go <$> mp
go (Leftover p _) = go p

-- | Convert a 'Producer' to a 'B.ByteString' stream. Subject to
-- | Convert a 'Producer' to a 'ByteStream' stream. Subject to
-- fusion.
toBStream :: (Monad m) => Producer m ByteString -> B.ByteString m ()
toBStream cnd = runConduit (transPipe lift cnd .| CL.mapM_ B.chunk)
--
-- It can accept a Conduit @Producer@:
-- @
-- toBStream :: (Monad m) => Producer m ByteString -> ByteStream m ()
-- @
toBStream :: (Monad m) => ConduitT () ByteString m r -> ByteStream m r
toBStream (ConduitT k) = go $ k Done
where
go (HaveOutput p o) = B.fromStrict o *> go p
go (NeedInput _ c) = go $ c ()
go (Done r) = pure r
go (PipeM mp) = B.mwrap $ go <$> mp
go (Leftover p _) = go p

-- | Treat a 'Conduit' as a function between 'Stream's. Subject to
-- fusion.
asStream :: (Monad m) => Conduit i m o -> Stream (Of i) m () -> Stream (Of o) m ()
asStream :: (Monad m) => ConduitT i o m () -> Stream (Of i) m () -> Stream (Of o) m ()
asStream cnd stream = toStream (fromStream stream .| cnd)

-- | Treat a 'Consumer' as a function which consumes a 'Stream'.
-- Subject to fusion.
sinkStream :: (Monad m) => Consumer i m r -> Stream (Of i) m () -> m r
--
-- It can accept a Conduit @Consumer@:
-- @
-- sinkStream :: (Monad m) => Consumer i m r -> Stream (Of i) m () -> m r
-- @
sinkStream :: (Monad m) => ConduitT i Void m r -> Stream (Of i) m () -> m r
sinkStream cns stream = runConduit (fromStream stream .| cns)

-- | Treat a 'Consumer' as a function which consumes a 'B.ByteString'.
-- | Treat a 'Consumer' as a function which consumes a 'ByteStream'.
-- Subject to fusion.
sinkBStream :: (Monad m) => Consumer ByteString m r -> B.ByteString m () -> m r
--
-- It can accept a Conduit @Consumer@:
-- @
-- sinkBStream :: (Monad m) => Consumer ByteString m r -> ByteStream m () -> m r
-- @
sinkBStream :: (Monad m) => ConduitT ByteString Void m r -> ByteStream m () -> m r
sinkBStream cns stream = runConduit (fromBStream stream .| cns)

-- | Treat a function between 'Stream's as a 'Conduit'. May be
-- subject to fusion.
asConduit :: (Monad m) => (Stream (Of i) m () -> Stream (Of o) m r) -> ConduitM i o m r
asConduit :: (Monad m) => (Stream (Of i) m () -> Stream (Of o) m r) -> ConduitT i o m r
asConduit f = join . fmap (fromStream . f) $ go
where
-- Probably not the best way to go about it, but it works.
Expand Down
9 changes: 4 additions & 5 deletions streaming-conduit.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,19 @@ category: Data, Streaming
build-type: Simple
extra-source-files: ChangeLog.md, README.md
cabal-version: >=1.10
tested-with: GHC == 7.10.2, GHC == 8.0.2, GHC == 8.2.2, GHC == 8.3.*
tested-with: GHC == 8.0.2, GHC == 8.2.2, GHC == 8.3.*, GHC == 9.0.2, GHC == 9.2.4

source-repository head
type: git
location: https://github.com/haskell-streaming/streaming-conduit.git

library
exposed-modules: Streaming.Conduit
build-depends: base >=4.6 && <5
build-depends: base >=4.9 && <5
, bytestring
, conduit >= 1.2.11 && < 1.4
, conduit >= 1.3.0 && < 1.4
, streaming >= 0.1.3.0 && < 0.3
, streaming-bytestring >= 0.3 && < 0.4
, transformers >= 0.2
hs-source-dirs: src
default-language: Haskell2010

Expand All @@ -34,7 +33,7 @@ test-suite conversions
build-depends: streaming-conduit
, base
, conduit
, hspec == 2.4.*
, hspec >= 2.8.0 && < 2.12
, streaming
hs-source-dirs: test
default-language: Haskell2010
2 changes: 1 addition & 1 deletion test/conversions.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ main = hspec $ do
prop "sinkStream" $
testPipeline (\xs f -> runIdentity $ sinkStream C.consume $ S.map f $ S.each xs)

conduitList :: C.Source Identity a -> [a]
conduitList :: C.ConduitT () a Identity () -> [a]
conduitList = runIdentity . C.sourceToList

streamList :: S.Stream (S.Of a) Identity () -> [a]
Expand Down