3
3
4
4
module Share.Web.UCM.SyncV2.Impl (server ) where
5
5
6
- import Conduit
6
+ import Conduit qualified as C
7
7
import Control.Concurrent.STM qualified as STM
8
8
import Control.Concurrent.STM.TBMQueue qualified as STM
9
9
import Control.Monad.Except (ExceptT (ExceptT ))
10
10
import Control.Monad.Trans.Except (runExceptT )
11
+ import Data.Conduit.Combinators qualified as C
11
12
import Data.List.NonEmpty qualified as NEL
12
13
import Servant
13
14
import Servant.Conduit (ConduitToSourceIO (.. ))
@@ -104,20 +105,12 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus
104
105
let entityChunkBatch = batch <&> \ (entityCBOR, hash) -> EntityC (EntityChunk {hash, entityCBOR})
105
106
PG. transactionUnsafeIO $ STM. atomically $ STM. writeTBMQueue q entityChunkBatch
106
107
PG. transactionUnsafeIO $ STM. atomically $ STM. closeTBMQueue q
107
- liftIO $ Async. async streamResults
108
- -- pure $ sourceIOWithAsync streamResults $ conduitToSourceIO do
109
- pure $ conduitToSourceIO do
108
+ pure $ sourceIOWithAsync streamResults $ conduitToSourceIO do
110
109
stream q
111
110
where
112
- -- Conduit..| ( Conduit.iterM \case
113
- -- InitialC init -> Debug.debugM Debug.Temp "Initial " init
114
- -- EntityC ec -> Debug.debugM Debug.Temp "Chunk " ec
115
- -- ErrorC err -> Debug.debugM Debug.Temp "Error " err
116
- -- )
117
-
118
- stream :: STM. TBMQueue (NonEmpty DownloadEntitiesChunk ) -> ConduitT () DownloadEntitiesChunk IO ()
111
+ stream :: STM. TBMQueue (NonEmpty DownloadEntitiesChunk ) -> C. ConduitT () DownloadEntitiesChunk IO ()
119
112
stream q = do
120
- let loop :: ConduitT () DownloadEntitiesChunk IO ()
113
+ let loop :: C. ConduitT () DownloadEntitiesChunk IO ()
121
114
loop = do
122
115
Debug. debugLogM Debug. Temp " Waiting for batch..."
123
116
liftIO (STM. atomically (STM. readTBMQueue q)) >>= \ case
@@ -127,7 +120,7 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus
127
120
pure ()
128
121
Just batch -> do
129
122
Debug. debugLogM Debug. Temp $ " Emitting chunk of " <> show (length batch) <> " entities"
130
- yieldMany batch
123
+ C. yieldMany batch
131
124
loop
132
125
133
126
loop
@@ -137,7 +130,20 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus
137
130
emitErr err = SourceT. source [ErrorC (ErrorChunk err)]
138
131
139
132
-- | Run an IO action in the background while streaming the results.
140
- _sourceIOWithAsync :: IO a -> SourceIO r -> SourceIO r
141
- _sourceIOWithAsync action (SourceT k) =
133
+ --
134
+ -- Servant doesn't provide any easier way to do bracketing like this, all the IO must be
135
+ -- inside the SourceIO somehow.
136
+ sourceIOWithAsync :: IO a -> SourceIO r -> SourceIO r
137
+ sourceIOWithAsync action (SourceT k) =
142
138
SourceT \ k' ->
143
139
Async. withAsync action \ _ -> k k'
140
+
141
+ -- debug the output pipe.
142
+ _tap :: (Monad m ) => (C. ConduitT a DownloadEntitiesChunk m () ) -> (C. ConduitT a DownloadEntitiesChunk m () )
143
+ _tap s =
144
+ s
145
+ C. .| ( C. iterM \ case
146
+ InitialC init -> Debug. debugM Debug. Temp " Initial " init
147
+ EntityC ec -> Debug. debugM Debug. Temp " Chunk " ec
148
+ ErrorC err -> Debug. debugM Debug. Temp " Error " err
149
+ )
0 commit comments