@@ -5,9 +5,10 @@ module Share.Web.UCM.SyncStream.Impl (server) where
55
66import Conduit
77import Control.Concurrent.STM qualified as STM
8+ import Control.Concurrent.STM.TBMQueue qualified as STM
89import Control.Monad.Except (ExceptT (ExceptT ))
910import Control.Monad.Trans.Except (runExceptT )
10- import Data.ByteString.Lazy qualified as BL
11+ import Data.Conduit.Combinators qualified as Conduit
1112import Servant
1213import Servant.Conduit (ConduitToSourceIO (.. ))
1314import Servant.Types.SourceT qualified as SourceT
@@ -28,6 +29,8 @@ import Share.Web.Errors
2829import Share.Web.UCM.Sync.HashJWT qualified as HashJWT
2930import Share.Web.UCM.SyncStream.Queries qualified as SSQ
3031import U.Codebase.Sqlite.Orphans ()
32+ import U.Codebase.Sqlite.TempEntity (TempEntity )
33+ import Unison.Debug qualified as Debug
3134import Unison.Hash32 (Hash32 )
3235import Unison.Share.API.Hash (HashJWTClaims (.. ))
3336import Unison.SyncV2.API qualified as SyncV2
@@ -78,34 +81,44 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus
7881 authZToken <- lift AuthZ. checkDownloadFromProjectBranchCodebase `whenLeftM` \ _err -> throwError (SyncV2. DownloadEntitiesNoReadPermission branchRef)
7982 let codebaseLoc = Codebase. codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId
8083 pure $ Codebase. codebaseEnv authZToken codebaseLoc
81- q <- liftIO $ STM. newTBQueueIO 10
84+ q <- liftIO $ STM. newTBMQueueIO 10
8285 streamResults <- lift $ UnliftIO. toIO do
86+ Debug. debugLogM Debug. Temp " Starting source Stream"
8387 Codebase. runCodebaseTransaction codebase $ do
8488 (_bhId, causalId) <- CausalQ. expectCausalIdsOf id (hash32ToCausalHash causalHash)
8589 cursor <- SSQ. allSerializedDependenciesOfCausalCursor causalId
8690 Cursor. foldBatched cursor 1000 \ batch -> do
87- PG. transactionUnsafeIO $ STM. atomically $ STM. writeTBQueue q batch
91+ Debug. debugLogM Debug. Temp " Source stream batch"
92+ PG. transactionUnsafeIO $ STM. atomically $ STM. writeTBMQueue q batch
93+ PG. transactionUnsafeIO $ STM. atomically $ STM. closeTBMQueue q
8894 pure $ conduitToSourceIO do
8995 handle <- liftIO $ Async. async streamResults
9096 stream q handle
97+ Conduit. .| ( Conduit. iterM \ case
98+ EntityChunk {hash} -> Debug. debugM Debug. Temp " Chunk " hash
99+ ErrorChunk err -> Debug. debugM Debug. Temp " Error " err
100+ )
91101 where
92- stream :: STM. TBQueue (NonEmpty (Hash32 , ByteString )) -> Async. Async ( ) -> ConduitT () DownloadEntitiesChunk IO ()
93- stream q async = do
102+ stream :: STM. TBMQueue (NonEmpty (SyncV2. CBORBytes TempEntity , Hash32 )) -> ( Async. Async a ) -> ConduitT () DownloadEntitiesChunk IO ()
103+ stream q handle = do
94104 let loop :: ConduitT () DownloadEntitiesChunk IO ()
95105 loop = do
96- next <- liftIO . STM. atomically $ do
97- STM. tryReadTBQueue q >>= \ case
98- Nothing -> do
99- Async. waitSTM async $> Nothing
100- Just batch -> do
101- pure $ Just batch
102- case next of
103- Nothing -> pure ()
106+ Debug. debugLogM Debug. Temp " Waiting for batch..."
107+ liftIO (STM. atomically (STM. readTBMQueue q)) >>= \ case
108+ -- The queue is closed.
109+ Nothing -> do
110+ Debug. debugLogM Debug. Temp " Queue closed. finishing up!"
111+ pure ()
104112 Just batch -> do
105- let chunks = batch <&> \ (hash, bytes) -> EntityChunk {hash, entityCBOR = SyncV2. CBORBytes $ BL. fromStrict bytes}
113+ let chunks = batch <&> \ (entityCBOR, hash) -> EntityChunk {hash, entityCBOR}
114+ Debug. debugLogM Debug. Temp $ " Emitting chunk of " <> show (length chunks) <> " entities"
106115 yieldMany chunks
107116 loop
108117 loop
118+ Debug. debugLogM Debug. Temp " Waiting for worker thread to finish"
119+ -- It _should_ have terminated by now, but just in case, cancel it.
120+ Async. cancel handle
121+ Debug. debugLogM Debug. Temp " Done!"
109122
110123 emitErr :: SyncV2. DownloadEntitiesError -> SourceIO SyncV2. DownloadEntitiesChunk
111124 emitErr err = SourceT. source [ErrorChunk err]
0 commit comments