diff --git a/src/Share/Web/UCM/SyncV2/Impl.hs b/src/Share/Web/UCM/SyncV2/Impl.hs index 8b89dbf..e2c9278 100644 --- a/src/Share/Web/UCM/SyncV2/Impl.hs +++ b/src/Share/Web/UCM/SyncV2/Impl.hs @@ -3,11 +3,12 @@ module Share.Web.UCM.SyncV2.Impl (server) where -import Conduit +import Conduit qualified as C import Control.Concurrent.STM qualified as STM import Control.Concurrent.STM.TBMQueue qualified as STM import Control.Monad.Except (ExceptT (ExceptT)) import Control.Monad.Trans.Except (runExceptT) +import Data.Conduit.Combinators qualified as C import Data.List.NonEmpty qualified as NEL import Servant import Servant.Conduit (ConduitToSourceIO (..)) @@ -104,20 +105,12 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus let entityChunkBatch = batch <&> \(entityCBOR, hash) -> EntityC (EntityChunk {hash, entityCBOR}) PG.transactionUnsafeIO $ STM.atomically $ STM.writeTBMQueue q entityChunkBatch PG.transactionUnsafeIO $ STM.atomically $ STM.closeTBMQueue q - liftIO $ Async.async streamResults - -- pure $ sourceIOWithAsync streamResults $ conduitToSourceIO do - pure $ conduitToSourceIO do + pure $ sourceIOWithAsync streamResults $ conduitToSourceIO do stream q where - -- Conduit..| ( Conduit.iterM \case - -- InitialC init -> Debug.debugM Debug.Temp "Initial " init - -- EntityC ec -> Debug.debugM Debug.Temp "Chunk " ec - -- ErrorC err -> Debug.debugM Debug.Temp "Error " err - -- ) - - stream :: STM.TBMQueue (NonEmpty DownloadEntitiesChunk) -> ConduitT () DownloadEntitiesChunk IO () + stream :: STM.TBMQueue (NonEmpty DownloadEntitiesChunk) -> C.ConduitT () DownloadEntitiesChunk IO () stream q = do - let loop :: ConduitT () DownloadEntitiesChunk IO () + let loop :: C.ConduitT () DownloadEntitiesChunk IO () loop = do Debug.debugLogM Debug.Temp "Waiting for batch..." liftIO (STM.atomically (STM.readTBMQueue q)) >>= \case @@ -127,7 +120,7 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus pure () Just batch -> do Debug.debugLogM Debug.Temp $ "Emitting chunk of " <> show (length batch) <> " entities" - yieldMany batch + C.yieldMany batch loop loop @@ -137,7 +130,20 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus emitErr err = SourceT.source [ErrorC (ErrorChunk err)] -- | Run an IO action in the background while streaming the results. -_sourceIOWithAsync :: IO a -> SourceIO r -> SourceIO r -_sourceIOWithAsync action (SourceT k) = +-- +-- Servant doesn't provide any easier way to do bracketing like this, all the IO must be +-- inside the SourceIO somehow. +sourceIOWithAsync :: IO a -> SourceIO r -> SourceIO r +sourceIOWithAsync action (SourceT k) = SourceT \k' -> Async.withAsync action \_ -> k k' + +-- debug the output pipe. +_tap :: (Monad m) => (C.ConduitT a DownloadEntitiesChunk m ()) -> (C.ConduitT a DownloadEntitiesChunk m ()) +_tap s = + s + C..| ( C.iterM \case + InitialC init -> Debug.debugM Debug.Temp "Initial " init + EntityC ec -> Debug.debugM Debug.Temp "Chunk " ec + ErrorC err -> Debug.debugM Debug.Temp "Error " err + )