Skip to content

Naïve dependency negotiation API #34

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions share-api.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ library
Share.Web.UCM.SyncV2.API
Share.Web.UCM.SyncV2.Impl
Share.Web.UCM.SyncV2.Queries
Share.Web.UCM.SyncV2.Types
Unison.PrettyPrintEnvDecl.Postgres
Unison.Server.NameSearch.Postgres
Unison.Server.Share.Definitions
Expand Down
131 changes: 131 additions & 0 deletions sql/2025-01-31_dependencies-of-causal.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
-- Takes an array of causal ids and returns a table of ALL hashes which are dependencies of those causals.
--
-- The result has one row per hash and is the concatenation of four sets:
--   * the base32 hashes of every term/type component reached,
--   * the hashes of every patch reached,
--   * the base32 hashes of every namespace reached,
--   * the hashes of every causal reached (the causals passed in are part of this set).
CREATE OR REPLACE FUNCTION dependencies_of_causals(the_causal_ids INTEGER[]) RETURNS TABLE (hash TEXT) AS $$
WITH RECURSIVE all_causals(causal_id, causal_hash, causal_namespace_hash_id) AS (
-- Base case: the causals that were passed in.
SELECT DISTINCT causal.id, causal.hash, causal.namespace_hash_id
FROM UNNEST(the_causal_ids) AS causal_id
JOIN causals causal ON causal.id = causal_id
UNION
-- This nested CTE is required because RECURSIVE CTEs can't refer
-- to the recursive table more than once.
-- `rec` is the only place that reads from all_causals; both branches of the
-- UNION below then read from `rec` instead of from all_causals directly.
( WITH rec AS (
SELECT tc.causal_id, tc.causal_namespace_hash_id
FROM all_causals tc
)
-- Step 1: follow causal_ancestors from each causal found so far.
SELECT ancestor_causal.id, ancestor_causal.hash, ancestor_causal.namespace_hash_id
FROM causal_ancestors ca
JOIN rec tc ON ca.causal_id = tc.causal_id
JOIN causals ancestor_causal ON ca.ancestor_id = ancestor_causal.id
UNION
-- Step 2: follow namespace_children from the namespace of each causal found so far.
SELECT child_causal.id, child_causal.hash, child_causal.namespace_hash_id
FROM rec tc
JOIN namespace_children nc ON tc.causal_namespace_hash_id = nc.parent_namespace_hash_id
JOIN causals child_causal ON nc.child_causal_id = child_causal.id
)
-- The namespace of every causal found above, together with its base32 hash.
), all_namespaces(namespace_hash_id, namespace_hash) AS (
SELECT DISTINCT tc.causal_namespace_hash_id AS namespace_hash_id, bh.base32 as namespace_hash
FROM all_causals tc
JOIN branch_hashes bh ON tc.causal_namespace_hash_id = bh.id
-- Every patch attached (via namespace_patches) to one of those namespaces.
), all_patches(patch_id, patch_hash) AS (
SELECT DISTINCT patch.id, patch.hash
FROM all_namespaces an
JOIN namespace_patches np ON an.namespace_hash_id = np.namespace_hash_id
JOIN patches patch ON np.patch_id = patch.id
),
-- term components to start transitively joining dependencies to
base_term_components(component_hash_id) AS (
-- terms listed in namespace_terms for any of the namespaces
SELECT DISTINCT term.component_hash_id
FROM all_namespaces an
JOIN namespace_terms nt ON an.namespace_hash_id = nt.namespace_hash_id
JOIN terms term ON nt.term_id = term.id
UNION
-- terms that appear as the target (to_term_id) of a patch term mapping
SELECT DISTINCT term.component_hash_id
FROM all_patches ap
JOIN patch_term_mappings ptm ON ap.patch_id = ptm.patch_id
JOIN terms term ON ptm.to_term_id = term.id
UNION
-- term metadata
SELECT DISTINCT term.component_hash_id
FROM all_namespaces an
JOIN namespace_terms nt ON an.namespace_hash_id = nt.namespace_hash_id
JOIN namespace_term_metadata meta ON nt.id = meta.named_term
JOIN terms term ON meta.metadata_term_id = term.id
UNION
-- type metadata
SELECT DISTINCT term.component_hash_id
FROM all_namespaces an
JOIN namespace_types nt ON an.namespace_hash_id = nt.namespace_hash_id
JOIN namespace_type_metadata meta ON nt.id = meta.named_type
JOIN terms term ON meta.metadata_term_id = term.id
),
-- type components to start transitively joining dependencies to
base_type_components(component_hash_id) AS (
-- types listed in namespace_types for any of the namespaces
SELECT DISTINCT typ.component_hash_id
FROM all_namespaces an
JOIN namespace_types nt ON an.namespace_hash_id = nt.namespace_hash_id
JOIN types typ ON nt.type_id = typ.id
UNION
-- types owning a constructor that is listed in namespace_terms (via constructor_id)
SELECT DISTINCT typ.component_hash_id
FROM all_namespaces an
JOIN namespace_terms nt ON an.namespace_hash_id = nt.namespace_hash_id
JOIN constructors con ON nt.constructor_id = con.id
JOIN types typ ON con.type_id = typ.id
UNION
-- types that appear as the target (to_type_id) of a patch type mapping
SELECT DISTINCT typ.component_hash_id
FROM all_patches ap
JOIN patch_type_mappings ptm ON ap.patch_id = ptm.patch_id
JOIN types typ ON ptm.to_type_id = typ.id
UNION
-- types owning a constructor that is the target (to_constructor_id) of a patch constructor mapping
SELECT DISTINCT typ.component_hash_id
FROM all_patches ap
JOIN patch_constructor_mappings pcm ON ap.patch_id = pcm.patch_id
JOIN constructors con ON pcm.to_constructor_id = con.id
JOIN types typ ON con.type_id = typ.id
),
-- All the dependencies we join in transitively from the known term & type components we depend on.
all_components(component_hash_id) AS (
SELECT DISTINCT btc.component_hash_id
FROM base_term_components btc
UNION
SELECT DISTINCT btc.component_hash_id
FROM base_type_components btc
UNION
-- Same nested-CTE arrangement as in all_causals: `rec` is the single reference
-- to all_components, and both recursive branches below read from `rec`.
( WITH rec AS (
SELECT DISTINCT ac.component_hash_id
FROM all_components ac
)
-- recursively union in term dependencies
SELECT DISTINCT ref.component_hash_id
FROM rec atc
-- This joins in ALL the terms from the component, not just the one that caused the dependency on the
-- component
JOIN terms term ON atc.component_hash_id = term.component_hash_id
JOIN term_local_component_references ref ON term.id = ref.term_id
UNION
-- recursively union in type dependencies
SELECT DISTINCT ref.component_hash_id
FROM rec atc
-- This joins in ALL the types from the component, not just the one that caused the dependency on the
-- component
JOIN types typ ON atc.component_hash_id = typ.component_hash_id
JOIN type_local_component_references ref ON typ.id = ref.type_id
)
)
-- Final result: concatenate the four hash sets. UNION ALL is used, so no
-- de-duplication is performed across the sets.
-- Component hashes.
(SELECT ch.base32 AS hash
FROM all_components ac
JOIN component_hashes ch ON ac.component_hash_id = ch.id
)
UNION ALL
-- Patch hashes.
(SELECT ap.patch_hash AS hash
FROM all_patches ap
)
UNION ALL
-- Namespace hashes.
(SELECT an.namespace_hash AS hash
FROM all_namespaces an
)
UNION ALL
-- Causal hashes.
(SELECT ac.causal_hash AS hash
FROM all_causals ac
)
$$ LANGUAGE SQL;
148 changes: 103 additions & 45 deletions src/Share/Web/UCM/SyncV2/Impl.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import Codec.Serialise qualified as CBOR
import Conduit qualified as C
import Control.Concurrent.STM qualified as STM
import Control.Concurrent.STM.TBMQueue qualified as STM
import Control.Monad.Except (ExceptT (ExceptT))
import Control.Monad.Except (ExceptT (ExceptT), withExceptT)
import Control.Monad.Trans.Except (runExceptT)
import Data.Binary.Builder qualified as Builder
import Data.Vector (Vector)
import Data.Set qualified as Set
import Data.Text.Encoding qualified as Text
import Data.Vector qualified as Vector
import Ki.Unlifted qualified as Ki
import Servant
import Servant.Conduit (ConduitToSourceIO (..))
import Servant.Types.SourceT (SourceT (..))
Expand All @@ -33,14 +35,15 @@ import Share.Web.Authorization qualified as AuthZ
import Share.Web.Errors
import Share.Web.UCM.Sync.HashJWT qualified as HashJWT
import Share.Web.UCM.SyncV2.Queries qualified as SSQ
import Share.Web.UCM.SyncV2.Types (IsCausalSpine (..), IsLibRoot (..))
import U.Codebase.Sqlite.Orphans ()
import Unison.Debug qualified as Debug
import Unison.Hash32 (Hash32)
import Unison.Share.API.Hash (HashJWTClaims (..))
import Unison.SyncV2.API qualified as SyncV2
import Unison.SyncV2.Types (DownloadEntitiesChunk (..), EntityChunk (..), ErrorChunk (..), StreamInitInfo (..))
import Unison.SyncV2.Types (CausalDependenciesChunk (..), DependencyType (..), DownloadEntitiesChunk (..), EntityChunk (..), ErrorChunk (..), StreamInitInfo (..))
import Unison.SyncV2.Types qualified as SyncV2
import UnliftIO qualified
import UnliftIO.Async qualified as Async

-- | Batch size handed to 'Cursor.foldBatched' when streaming rows out of a database cursor.
batchSize :: Int32
batchSize = 1000
Expand All @@ -51,7 +54,8 @@ streamSettings rootCausalHash rootBranchRef = StreamInitInfo {version = SyncV2.V
server :: Maybe UserId -> SyncV2.Routes WebAppServer
server mayUserId =
SyncV2.Routes
{ downloadEntitiesStream = downloadEntitiesStreamImpl mayUserId
{ downloadEntitiesStream = downloadEntitiesStreamImpl mayUserId,
causalDependenciesStream = causalDependenciesStreamImpl mayUserId
}

parseBranchRef :: SyncV2.BranchRef -> Either Text (Either ProjectReleaseShortHand ProjectBranchShortHand)
Expand All @@ -66,30 +70,16 @@ parseBranchRef (SyncV2.BranchRef branchRef) =
parseRelease = fmap Left . eitherToMaybe $ IDs.fromText @ProjectReleaseShortHand branchRef

downloadEntitiesStreamImpl :: Maybe UserId -> SyncV2.DownloadEntitiesRequest -> WebApp (SourceIO (SyncV2.CBORStream SyncV2.DownloadEntitiesChunk))
downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {causalHash = causalHashJWT, branchRef, knownHashes = _todo}) = do
downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {causalHash = causalHashJWT, branchRef, knownHashes}) = do
either emitErr id <$> runExceptT do
addRequestTag "branch-ref" (SyncV2.unBranchRef branchRef)
HashJWTClaims {hash = causalHash} <- lift (HashJWT.verifyHashJWT mayCallerUserId causalHashJWT >>= either respondError pure)
codebase <-
case parseBranchRef branchRef of
Left err -> throwError (SyncV2.DownloadEntitiesInvalidBranchRef err branchRef)
Right (Left (ProjectReleaseShortHand {userHandle, projectSlug})) -> do
let projectShortHand = ProjectShortHand {userHandle, projectSlug}
(Project {ownerUserId = projectOwnerUserId}, contributorId) <- ExceptT . PG.tryRunTransaction $ do
project <- PGQ.projectByShortHand projectShortHand `whenNothingM` throwError (SyncV2.DownloadEntitiesProjectNotFound $ IDs.toText @ProjectShortHand projectShortHand)
pure (project, Nothing)
authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (SyncV2.DownloadEntitiesNoReadPermission branchRef)
let codebaseLoc = Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId
pure $ Codebase.codebaseEnv authZToken codebaseLoc
Right (Right (ProjectBranchShortHand {userHandle, projectSlug, contributorHandle})) -> do
let projectShortHand = ProjectShortHand {userHandle, projectSlug}
(Project {ownerUserId = projectOwnerUserId}, contributorId) <- ExceptT . PG.tryRunTransaction $ do
project <- (PGQ.projectByShortHand projectShortHand) `whenNothingM` throwError (SyncV2.DownloadEntitiesProjectNotFound $ IDs.toText @ProjectShortHand projectShortHand)
mayContributorUserId <- for contributorHandle \ch -> fmap user_id $ (PGQ.userByHandle ch) `whenNothingM` throwError (SyncV2.DownloadEntitiesUserNotFound $ IDs.toText @UserHandle ch)
pure (project, mayContributorUserId)
authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (SyncV2.DownloadEntitiesNoReadPermission branchRef)
let codebaseLoc = Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId
pure $ Codebase.codebaseEnv authZToken codebaseLoc
flip withExceptT (codebaseForBranchRef branchRef) \case
CodebaseLoadingErrorProjectNotFound projectShortHand -> SyncV2.DownloadEntitiesProjectNotFound (IDs.toText projectShortHand)
CodebaseLoadingErrorUserNotFound userHandle -> SyncV2.DownloadEntitiesUserNotFound (IDs.toText userHandle)
CodebaseLoadingErrorNoReadPermission branchRef -> SyncV2.DownloadEntitiesNoReadPermission branchRef
CodebaseLoadingErrorInvalidBranchRef err branchRef -> SyncV2.DownloadEntitiesInvalidBranchRef err branchRef
q <- UnliftIO.atomically $ do
q <- STM.newTBMQueue 10
STM.writeTBMQueue q (Vector.singleton $ InitialC $ streamSettings causalHash (Just branchRef))
Expand All @@ -98,39 +88,107 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus
Logging.logInfoText "Starting download entities stream"
Codebase.runCodebaseTransaction codebase $ do
(_bhId, causalId) <- CausalQ.expectCausalIdsOf id (hash32ToCausalHash causalHash)
cursor <- SSQ.allSerializedDependenciesOfCausalCursor causalId
let knownCausalHashes = Set.map hash32ToCausalHash knownHashes
cursor <- SSQ.allSerializedDependenciesOfCausalCursor causalId knownCausalHashes
Cursor.foldBatched cursor batchSize \batch -> do
let entityChunkBatch = batch <&> \(entityCBOR, hash) -> EntityC (EntityChunk {hash, entityCBOR})
PG.transactionUnsafeIO $ STM.atomically $ STM.writeTBMQueue q entityChunkBatch
PG.transactionUnsafeIO $ STM.atomically $ STM.closeTBMQueue q
pure $ sourceIOWithAsync streamResults $ conduitToSourceIO do
stream q
queueToStream q
where
stream :: STM.TBMQueue (Vector DownloadEntitiesChunk) -> C.ConduitT () (SyncV2.CBORStream DownloadEntitiesChunk) IO ()
stream q = do
let loop :: C.ConduitT () (SyncV2.CBORStream DownloadEntitiesChunk) IO ()
loop = do
liftIO (STM.atomically (STM.readTBMQueue q)) >>= \case
-- The queue is closed.
Nothing -> do
pure ()
Just batches -> do
batches
& foldMap (CBOR.serialiseIncremental)
& (SyncV2.CBORStream . Builder.toLazyByteString)
& C.yield
loop

loop

emitErr :: SyncV2.DownloadEntitiesError -> SourceIO (SyncV2.CBORStream SyncV2.DownloadEntitiesChunk)
emitErr err = SourceT.source [SyncV2.CBORStream . CBOR.serialise $ ErrorC (ErrorChunk err)]

-- | Handler for the causal-dependencies stream.
--
-- Verifies the caller's hash JWT, resolves the codebase for the requested branch ref,
-- then returns a CBOR stream of 'SyncV2.CausalDependenciesChunk's. The chunks are
-- produced by a background action that folds over a database cursor in batches of
-- 'batchSize' and hands each batch to the response stream through a bounded queue.
causalDependenciesStreamImpl :: Maybe UserId -> SyncV2.CausalDependenciesRequest -> WebApp (SourceIO (SyncV2.CBORStream SyncV2.CausalDependenciesChunk))
causalDependenciesStreamImpl mayCallerUserId (SyncV2.CausalDependenciesRequest {rootCausal = causalHashJWT, branchRef}) =
  respondExceptT do
    addRequestTag "branch-ref" (SyncV2.unBranchRef branchRef)
    HashJWTClaims {hash = rootCausalHash} <-
      lift (HashJWT.verifyHashJWT mayCallerUserId causalHashJWT >>= either respondError pure)
    addRequestTag "root-causal" (tShow rootCausalHash)
    codebase <- codebaseForBranchRef branchRef
    depQueue <- UnliftIO.atomically $ STM.newTBMQueue 10
    -- The producer is only built here; it is run by 'sourceIOWithAsync' once the
    -- response stream is consumed.
    produceDeps <- lift . UnliftIO.toIO $ do
      Logging.logInfoText "Starting causal dependencies stream"
      Codebase.runCodebaseTransaction codebase $ do
        (_bhId, causalId) <- CausalQ.expectCausalIdsOf id (hash32ToCausalHash rootCausalHash)
        Debug.debugLogM Debug.Temp "Getting cursor"
        cursor <- SSQ.spineAndLibDependenciesOfCausalCursor causalId
        Debug.debugLogM Debug.Temp "Folding cursor"
        Cursor.foldBatched cursor batchSize \batch -> do
          Debug.debugLogM Debug.Temp "Got batch"
          PG.transactionUnsafeIO . STM.atomically . STM.writeTBMQueue depQueue $ fmap toDepChunk batch
        -- Closing the queue is what lets the consumer side finish.
        PG.transactionUnsafeIO . STM.atomically $ STM.closeTBMQueue depQueue
    pure . sourceIOWithAsync produceDeps . conduitToSourceIO $ queueToStream depQueue
  where
    -- Turn one cursor row into a stream chunk. A row flagged as part of the causal
    -- spine wins over the lib-root flag; a row with neither flag is treated as a bug.
    toDepChunk (depHash, isCausalSpine, isLibRoot) =
      CausalHashDepC
        { causalHash = depHash,
          dependencyType = case (isCausalSpine, isLibRoot) of
            (IsCausalSpine, _) -> CausalSpineDependency
            (_, IsLibRoot) -> LibDependency
            _ -> error $ "Causal dependency which is neither spine nor lib root: " <> show depHash
        }

-- | Drain a queue of batches into a conduit of CBOR stream chunks.
--
-- Every batch read from the queue is serialised element by element and emitted as a
-- single 'SyncV2.CBORStream' chunk. The conduit ends once reading the queue yields
-- 'Nothing'.
queueToStream :: forall a f. (CBOR.Serialise a, Foldable f) => STM.TBMQueue (f a) -> C.ConduitT () (SyncV2.CBORStream a) IO ()
queueToStream queue = drain
  where
    drain :: C.ConduitT () (SyncV2.CBORStream a) IO ()
    drain = do
      mayBatch <- liftIO . STM.atomically $ STM.readTBMQueue queue
      case mayBatch of
        -- The queue is closed.
        Nothing -> pure ()
        Just batch -> do
          C.yield . SyncV2.CBORStream . Builder.toLazyByteString $ foldMap CBOR.serialiseIncremental batch
          drain

-- | Reasons 'codebaseForBranchRef' can fail to produce a codebase for a branch ref.
data CodebaseLoadingError
-- No project was found for the project short-hand taken from the branch ref.
= CodebaseLoadingErrorProjectNotFound ProjectShortHand
-- The branch ref named a contributor handle for which no user was found.
| CodebaseLoadingErrorUserNotFound UserHandle
-- The download authorization check failed for this branch ref.
| CodebaseLoadingErrorNoReadPermission SyncV2.BranchRef
-- The branch ref could not be parsed; carries the parse error and the offending ref.
| CodebaseLoadingErrorInvalidBranchRef Text SyncV2.BranchRef
deriving stock (Show)
-- Logged through its 'Show' instance, classified as 'Logging.UserFault'.
deriving (Logging.Loggable) via Logging.ShowLoggable Logging.UserFault CodebaseLoadingError

-- | HTTP rendering of a 'CodebaseLoadingError': missing projects and users become 404s,
-- a failed permission check a 403, and an unparseable branch ref a 400. Each response
-- body is the UTF-8 encoded, human-readable message.
instance ToServerError CodebaseLoadingError where
  toServerError loadingErr = case loadingErr of
    CodebaseLoadingErrorProjectNotFound projectShortHand ->
      ( ErrorID "codebase-loading:project-not-found",
        Servant.err404 {errBody = utf8Body $ "Project not found: " <> IDs.toText projectShortHand}
      )
    CodebaseLoadingErrorUserNotFound userHandle ->
      ( ErrorID "codebase-loading:user-not-found",
        Servant.err404 {errBody = utf8Body $ "User not found: " <> IDs.toText userHandle}
      )
    CodebaseLoadingErrorNoReadPermission branchRef ->
      ( ErrorID "codebase-loading:no-read-permission",
        Servant.err403 {errBody = utf8Body $ "No read permission for branch ref: " <> SyncV2.unBranchRef branchRef}
      )
    CodebaseLoadingErrorInvalidBranchRef err branchRef ->
      ( ErrorID "codebase-loading:invalid-branch-ref",
        Servant.err400 {errBody = utf8Body $ "Invalid branch ref: " <> err <> " " <> SyncV2.unBranchRef branchRef}
      )
    where
      -- Encode a text message as UTF-8 and convert it to the response body type.
      utf8Body msg = from (Text.encodeUtf8 msg)

-- | Resolve a 'SyncV2.BranchRef' to the 'Codebase.CodebaseEnv' it should be served from.
--
-- The ref is parsed as either a release or a branch short-hand. In both cases the
-- project is looked up, the download authorization check is run, and the codebase
-- location is derived from the project owner plus an optional contributor. Releases
-- never carry a contributor.
--
-- Fails with a 'CodebaseLoadingError' when the ref does not parse, the project or the
-- contributor cannot be found, or the authorization check is rejected.
codebaseForBranchRef :: SyncV2.BranchRef -> (ExceptT CodebaseLoadingError WebApp Codebase.CodebaseEnv)
codebaseForBranchRef branchRef =
  case parseBranchRef branchRef of
    Left parseErr -> throwError (CodebaseLoadingErrorInvalidBranchRef parseErr branchRef)
    Right (Left (ProjectReleaseShortHand {userHandle, projectSlug})) ->
      loadCodebase (ProjectShortHand {userHandle, projectSlug}) Nothing
    Right (Right (ProjectBranchShortHand {userHandle, projectSlug, contributorHandle})) ->
      loadCodebase (ProjectShortHand {userHandle, projectSlug}) contributorHandle
  where
    -- Shared by the release and branch cases; with no contributor handle the user
    -- lookup is skipped entirely.
    loadCodebase :: ProjectShortHand -> Maybe UserHandle -> ExceptT CodebaseLoadingError WebApp Codebase.CodebaseEnv
    loadCodebase projectShortHand mayContributorHandle = do
      (Project {ownerUserId = projectOwnerUserId}, contributorId) <- ExceptT . PG.tryRunTransaction $ do
        project <- PGQ.projectByShortHand projectShortHand `whenNothingM` throwError (CodebaseLoadingErrorProjectNotFound projectShortHand)
        mayContributorUserId <- for mayContributorHandle \contributor ->
          user_id <$> (PGQ.userByHandle contributor `whenNothingM` throwError (CodebaseLoadingErrorUserNotFound contributor))
        pure (project, mayContributorUserId)
      authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (CodebaseLoadingErrorNoReadPermission branchRef)
      pure $ Codebase.codebaseEnv authZToken (Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId)

-- | Run an IO action in the background while streaming the results.
--
-- Servant doesn't provide any easier way to do bracketing like this; all the IO must
-- happen inside the SourceIO somehow.
sourceIOWithAsync :: IO a -> SourceIO r -> SourceIO r
sourceIOWithAsync action (SourceT k) =
SourceT \k' ->
Async.withAsync action \_ -> k k'
Ki.scoped \scope -> do
_ <- Ki.fork scope action
k k'
Loading
Loading