@@ -8,7 +8,7 @@ import Control.Concurrent.STM qualified as STM
8
8
import Control.Concurrent.STM.TBMQueue qualified as STM
9
9
import Control.Monad.Except (ExceptT (ExceptT ))
10
10
import Control.Monad.Trans.Except (runExceptT )
11
- import Data.Conduit.Combinators qualified as Conduit
11
+ import Data.List.NonEmpty qualified as NEL
12
12
import Servant
13
13
import Servant.Conduit (ConduitToSourceIO (.. ))
14
14
import Servant.Types.SourceT (SourceT (.. ))
@@ -31,16 +31,20 @@ import Share.Web.Errors
31
31
import Share.Web.UCM.Sync.HashJWT qualified as HashJWT
32
32
import Share.Web.UCM.SyncV2.Queries qualified as SSQ
33
33
import U.Codebase.Sqlite.Orphans ()
34
- import U.Codebase.Sqlite.TempEntity (TempEntity )
35
34
import Unison.Debug qualified as Debug
36
- import Unison.Hash32 (Hash32 )
37
35
import Unison.Share.API.Hash (HashJWTClaims (.. ))
38
36
import Unison.SyncV2.API qualified as SyncV2
39
- import Unison.SyncV2.Types (DownloadEntitiesChunk (.. ), EntityChunk (.. ), ErrorChunk (.. ))
37
+ import Unison.SyncV2.Types (DownloadEntitiesChunk (.. ), EntityChunk (.. ), ErrorChunk (.. ), StreamInitInfo ( .. ) )
40
38
import Unison.SyncV2.Types qualified as SyncV2
41
39
import UnliftIO qualified
42
40
import UnliftIO.Async qualified as Async
43
41
42
+ batchSize :: Int32
43
+ batchSize = 1000
44
+
45
+ streamSettings :: StreamInitInfo
46
+ streamSettings = StreamInitInfo {version = SyncV2. Version 1 , entitySorting = SyncV2. Unsorted , numEntities = Nothing }
47
+
44
48
server :: Maybe UserId -> SyncV2. Routes WebAppServer
45
49
server mayUserId =
46
50
SyncV2. Routes
@@ -59,7 +63,7 @@ parseBranchRef (SyncV2.BranchRef branchRef) =
59
63
parseRelease = fmap Left . eitherToMaybe $ IDs. fromText @ ProjectReleaseShortHand branchRef
60
64
61
65
downloadEntitiesStreamImpl :: Maybe UserId -> SyncV2. DownloadEntitiesRequest -> WebApp (SourceIO SyncV2. DownloadEntitiesChunk )
62
- downloadEntitiesStreamImpl mayCallerUserId (SyncV2. DownloadEntitiesRequest {causalHash = causalHashJWT, branchRef, knownHashes= _todo}) = do
66
+ downloadEntitiesStreamImpl mayCallerUserId (SyncV2. DownloadEntitiesRequest {causalHash = causalHashJWT, branchRef, knownHashes = _todo}) = do
63
67
either emitErr id <$> runExceptT do
64
68
addRequestTag " branch-ref" (SyncV2. unBranchRef branchRef)
65
69
HashJWTClaims {hash = causalHash} <- lift (HashJWT. verifyHashJWT mayCallerUserId causalHashJWT >>= either respondError pure )
@@ -83,24 +87,35 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus
83
87
authZToken <- lift AuthZ. checkDownloadFromProjectBranchCodebase `whenLeftM` \ _err -> throwError (SyncV2. DownloadEntitiesNoReadPermission branchRef)
84
88
let codebaseLoc = Codebase. codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId
85
89
pure $ Codebase. codebaseEnv authZToken codebaseLoc
86
- q <- liftIO $ STM. newTBMQueueIO 10
90
+ q <- UnliftIO. atomically $ do
91
+ q <- STM. newTBMQueue 10
92
+ STM. writeTBMQueue q (NEL. singleton $ InitialC $ streamSettings)
93
+ pure q
87
94
streamResults <- lift $ UnliftIO. toIO do
88
95
Logging. logInfoText " Starting download entities stream"
89
96
Codebase. runCodebaseTransaction codebase $ do
97
+ Debug. debugM Debug. Temp " Getting IDs for:" causalHash
90
98
(_bhId, causalId) <- CausalQ. expectCausalIdsOf id (hash32ToCausalHash causalHash)
99
+ Debug. debugM Debug. Temp " Getting deps of" causalId
91
100
cursor <- SSQ. allSerializedDependenciesOfCausalCursor causalId
92
- Cursor. foldBatched cursor 1000 \ batch -> do
93
- PG. transactionUnsafeIO $ STM. atomically $ STM. writeTBMQueue q batch
101
+ Debug. debugLogM Debug. Temp " Got cursor"
102
+ Cursor. foldBatched cursor batchSize \ batch -> do
103
+ Debug. debugLogM Debug. Temp " Emitting batch"
104
+ let entityChunkBatch = batch <&> \ (entityCBOR, hash) -> EntityC (EntityChunk {hash, entityCBOR})
105
+ PG. transactionUnsafeIO $ STM. atomically $ STM. writeTBMQueue q entityChunkBatch
94
106
PG. transactionUnsafeIO $ STM. atomically $ STM. closeTBMQueue q
95
- pure $ sourceIOWithAsync streamResults $ conduitToSourceIO do
107
+ liftIO $ Async. async streamResults
108
+ -- pure $ sourceIOWithAsync streamResults $ conduitToSourceIO do
109
+ pure $ conduitToSourceIO do
96
110
stream q
97
- Conduit. .| ( Conduit. iterM \ case
98
- InitialC init -> Debug. debugM Debug. Temp " Initial " init
99
- EntityC ec -> Debug. debugM Debug. Temp " Chunk " ec
100
- ErrorC err -> Debug. debugM Debug. Temp " Error " err
101
- )
102
111
where
103
- stream :: STM. TBMQueue (NonEmpty (SyncV2. CBORBytes TempEntity , Hash32 )) -> ConduitT () DownloadEntitiesChunk IO ()
112
+ -- Conduit..| ( Conduit.iterM \case
113
+ -- InitialC init -> Debug.debugM Debug.Temp "Initial " init
114
+ -- EntityC ec -> Debug.debugM Debug.Temp "Chunk " ec
115
+ -- ErrorC err -> Debug.debugM Debug.Temp "Error " err
116
+ -- )
117
+
118
+ stream :: STM. TBMQueue (NonEmpty DownloadEntitiesChunk ) -> ConduitT () DownloadEntitiesChunk IO ()
104
119
stream q = do
105
120
let loop :: ConduitT () DownloadEntitiesChunk IO ()
106
121
loop = do
@@ -111,18 +126,18 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus
111
126
Debug. debugLogM Debug. Temp " Queue closed. finishing up!"
112
127
pure ()
113
128
Just batch -> do
114
- let chunks = batch <&> \ (entityCBOR, hash) -> EntityC (EntityChunk {hash, entityCBOR})
115
- Debug. debugLogM Debug. Temp $ " Emitting chunk of " <> show (length chunks) <> " entities"
116
- yieldMany chunks
129
+ Debug. debugLogM Debug. Temp $ " Emitting chunk of " <> show (length batch) <> " entities"
130
+ yieldMany batch
117
131
loop
132
+
118
133
loop
119
134
Debug. debugLogM Debug. Temp " Done!"
120
135
121
136
emitErr :: SyncV2. DownloadEntitiesError -> SourceIO SyncV2. DownloadEntitiesChunk
122
137
emitErr err = SourceT. source [ErrorC (ErrorChunk err)]
123
138
124
139
-- | Run an IO action in the background while streaming the results.
125
- sourceIOWithAsync :: IO a -> SourceIO r -> SourceIO r
126
- sourceIOWithAsync action (SourceT k) =
140
+ _sourceIOWithAsync :: IO a -> SourceIO r -> SourceIO r
141
+ _sourceIOWithAsync action (SourceT k) =
127
142
SourceT \ k' ->
128
143
Async. withAsync action \ _ -> k k'
0 commit comments