diff --git a/src/Share/Web/Impl.hs b/src/Share/Web/Impl.hs index 31a870f..aa14009 100644 --- a/src/Share/Web/Impl.hs +++ b/src/Share/Web/Impl.hs @@ -73,5 +73,5 @@ server = :<|> Sync.server -- Deprecated path :<|> Sync.server :<|> UCMProjects.server - :<|> Admin.server :<|> SyncStream.server + :<|> Admin.server diff --git a/src/Share/Web/UCM/SyncStream/Impl.hs b/src/Share/Web/UCM/SyncStream/Impl.hs index f90edd9..a41e63e 100644 --- a/src/Share/Web/UCM/SyncStream/Impl.hs +++ b/src/Share/Web/UCM/SyncStream/Impl.hs @@ -5,9 +5,10 @@ module Share.Web.UCM.SyncStream.Impl (server) where import Conduit import Control.Concurrent.STM qualified as STM +import Control.Concurrent.STM.TBMQueue qualified as STM import Control.Monad.Except (ExceptT (ExceptT)) import Control.Monad.Trans.Except (runExceptT) -import Data.ByteString.Lazy qualified as BL +import Data.Conduit.Combinators qualified as Conduit import Servant import Servant.Conduit (ConduitToSourceIO (..)) import Servant.Types.SourceT qualified as SourceT @@ -28,6 +29,8 @@ import Share.Web.Errors import Share.Web.UCM.Sync.HashJWT qualified as HashJWT import Share.Web.UCM.SyncStream.Queries qualified as SSQ import U.Codebase.Sqlite.Orphans () +import U.Codebase.Sqlite.TempEntity (TempEntity) +import Unison.Debug qualified as Debug import Unison.Hash32 (Hash32) import Unison.Share.API.Hash (HashJWTClaims (..)) import Unison.SyncV2.API qualified as SyncV2 @@ -78,34 +81,44 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (SyncV2.DownloadEntitiesNoReadPermission branchRef) let codebaseLoc = Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId pure $ Codebase.codebaseEnv authZToken codebaseLoc - q <- liftIO $ STM.newTBQueueIO 10 + q <- liftIO $ STM.newTBMQueueIO 10 streamResults <- lift $ UnliftIO.toIO do + Debug.debugLogM Debug.Temp "Starting source Stream" Codebase.runCodebaseTransaction codebase $ do (_bhId, causalId) <- CausalQ.expectCausalIdsOf id (hash32ToCausalHash causalHash) cursor <- SSQ.allSerializedDependenciesOfCausalCursor causalId Cursor.foldBatched cursor 1000 \batch -> do - PG.transactionUnsafeIO $ STM.atomically $ STM.writeTBQueue q batch + Debug.debugLogM Debug.Temp "Source stream batch" + PG.transactionUnsafeIO $ STM.atomically $ STM.writeTBMQueue q batch + PG.transactionUnsafeIO $ STM.atomically $ STM.closeTBMQueue q pure $ conduitToSourceIO do handle <- liftIO $ Async.async streamResults stream q handle + Conduit..| ( Conduit.iterM \case + EntityChunk {hash} -> Debug.debugM Debug.Temp "Chunk " hash + ErrorChunk err -> Debug.debugM Debug.Temp "Error " err + ) where - stream :: STM.TBQueue (NonEmpty (Hash32, ByteString)) -> Async.Async () -> ConduitT () DownloadEntitiesChunk IO () - stream q async = do + stream :: STM.TBMQueue (NonEmpty (SyncV2.CBORBytes TempEntity, Hash32)) -> (Async.Async a) -> ConduitT () DownloadEntitiesChunk IO () + stream q handle = do let loop :: ConduitT () DownloadEntitiesChunk IO () loop = do - next <- liftIO . STM.atomically $ do - STM.tryReadTBQueue q >>= \case - Nothing -> do - Async.waitSTM async $> Nothing - Just batch -> do - pure $ Just batch - case next of - Nothing -> pure () + Debug.debugLogM Debug.Temp "Waiting for batch..." + liftIO (STM.atomically (STM.readTBMQueue q)) >>= \case + -- The queue is closed. + Nothing -> do + Debug.debugLogM Debug.Temp "Queue closed. finishing up!" + pure () Just batch -> do - let chunks = batch <&> \(hash, bytes) -> EntityChunk {hash, entityCBOR = SyncV2.CBORBytes $ BL.fromStrict bytes} + let chunks = batch <&> \(entityCBOR, hash) -> EntityChunk {hash, entityCBOR} + Debug.debugLogM Debug.Temp $ "Emitting chunk of " <> show (length chunks) <> " entities" yieldMany chunks loop loop + Debug.debugLogM Debug.Temp "Waiting for worker thread to finish" + -- It _should_ have terminated by now, but just in case, cancel it. + Async.cancel handle + Debug.debugLogM Debug.Temp "Done!" emitErr :: SyncV2.DownloadEntitiesError -> SourceIO SyncV2.DownloadEntitiesChunk emitErr err = SourceT.source [ErrorChunk err] diff --git a/src/Share/Web/UCM/SyncStream/Queries.hs b/src/Share/Web/UCM/SyncStream/Queries.hs index dee0dfb..11278a4 100644 --- a/src/Share/Web/UCM/SyncStream/Queries.hs +++ b/src/Share/Web/UCM/SyncStream/Queries.hs @@ -11,7 +11,9 @@ import Share.Postgres.Cursors (PGCursor) import Share.Postgres.Cursors qualified as PGCursor import Share.Postgres.IDs import Share.Prelude +import U.Codebase.Sqlite.TempEntity (TempEntity) import Unison.Hash32 (Hash32) +import Unison.SyncV2.Types (CBORBytes) allHashDependenciesOfCausalCursor :: CausalId -> CodebaseM e (PGCursor Text) allHashDependenciesOfCausalCursor cid = do @@ -180,7 +182,7 @@ allHashDependenciesOfCausalCursor cid = do JOIN component_hashes ON tc.component_hash_id = component_hashes.id |] -allSerializedDependenciesOfCausalCursor :: CausalId -> CodebaseM e (PGCursor (Hash32, ByteString)) +allSerializedDependenciesOfCausalCursor :: CausalId -> CodebaseM e (PGCursor (CBORBytes TempEntity, Hash32)) allSerializedDependenciesOfCausalCursor cid = do ownerUserId <- asks codebaseOwner PGCursor.newRowCursor @@ -217,7 +219,7 @@ allSerializedDependenciesOfCausalCursor cid = do JOIN branch_hashes bh ON tc.causal_namespace_hash_id = bh.id -- WHERE NOT EXISTS (SELECT FROM namespace_ownership no WHERE no.user_id = to_codebase_user_id AND no.namespace_hash_id = tc.causal_namespace_hash_id) ), all_patches(patch_id, patch_hash) AS ( - SELECT DISTINCT patch.id + SELECT DISTINCT patch.id, patch.hash FROM all_namespaces an JOIN namespace_patches np ON an.namespace_hash_id = np.namespace_hash_id JOIN patches patch ON np.patch_id = patch.id