Skip to content

Commit 8ff108e

Browse files
iamirzhanfacebook-github-bot
authored andcommitted
Abstract Batch downloader from glean db
Summary: Add `BatchLocationParser` to db config to add the cas dependency at runtime. Reviewed By: pepeiborra Differential Revision: D70560447 fbshipit-source-id: 4f4e551efe157277e320578cfc3c8af2bab9135e
1 parent 78679d4 commit 8ff108e

File tree

6 files changed

+64
-2
lines changed

6 files changed

+64
-2
lines changed

glean.cabal.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,7 @@ library db
470470
Glean.Database.Backup.Backend
471471
Glean.Database.Backup.Locator
472472
Glean.Database.Backup.Mock
473+
Glean.Database.BatchLocation
473474
Glean.Database.Catalog.Filter
474475
Glean.Database.Catalog
475476
Glean.Database.Catalog.Local.Files
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
{-
2+
Copyright (c) Meta Platforms, Inc. and affiliates.
3+
All rights reserved.
4+
5+
This source code is licensed under the BSD-style license found in the
6+
LICENSE file in the root directory of this source tree.
7+
-}
8+
9+
module Glean.Database.BatchLocation
10+
( Parser(..)
11+
, Location(..)
12+
, DefaultParser(..)
13+
) where
14+
15+
import Glean.Types as Thrift
16+
import Data.Text (Text)
17+
import Glean.Util.Some
18+
import Control.Exception
19+
20+
-- | A batch's location parser
21+
class Parser a where
22+
-- | Parse a string returning a location to get the batch from
23+
fromString :: a -> Text -> Some Location
24+
25+
instance Parser (Some Parser) where
26+
fromString (Some parser) = fromString parser
27+
28+
-- | A batch's location to download a batch from
29+
class Location a where
30+
downloadBatch :: a -> Thrift.BatchFormat -> IO Thrift.Batch
31+
32+
instance Location (Some Location) where
33+
downloadBatch (Some location) format = downloadBatch location format
34+
35+
data DefaultParser = DefaultParser
36+
37+
instance Parser DefaultParser where
38+
fromString _ _ = throw $
39+
Thrift.Exception "Batch location's parser is not implemented"

glean/db/Glean/Database/Config.hs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ import Util.Log (logInfo)
6363
import Glean.Angle.Types
6464
import qualified Glean.Database.Backup.Backend as Backup -- from glean/util
6565
import qualified Glean.Database.Backup.Mock as Backup.Mock
66+
import qualified Glean.Database.BatchLocation as BatchLocation
6667
import Glean.Database.Catalog (Catalog)
6768
import qualified Glean.Database.Catalog.Local.Files as Catalog
6869
import qualified Glean.Database.Catalog.Local.Memory as Catalog
@@ -155,6 +156,8 @@ data Config = Config
155156
-- ^ Logger for recording stats of databases produced
156157
, cfgBackupBackends :: HashMap Text (Some Backup.Backend)
157158
-- ^ Backup backends
159+
, cfgBatchLocationParser :: Some BatchLocation.Parser
160+
-- ^ Batch's location parser
158161
, cfgEnableRecursion :: Bool
159162
-- ^ Enable experimental support for recursion
160163
, cfgFilterAvailableDBs :: [Repo] -> IO [Repo]
@@ -202,6 +205,7 @@ instance Default Config where
202205
, cfgServerLogger = Some NullGleanServerLogger
203206
, cfgDatabaseLogger = Some NullGleanDatabaseLogger
204207
, cfgBackupBackends = HashMap.fromList [("mock", Backup.Mock.mock)]
208+
, cfgBatchLocationParser = Some BatchLocation.DefaultParser
205209
, cfgEnableRecursion = False
206210
, cfgFilterAvailableDBs = const $ return []
207211
, cfgTracer = mempty
@@ -454,6 +458,7 @@ options = do
454458
, cfgServerLogger = cfgServerLogger def
455459
, cfgDatabaseLogger = cfgDatabaseLogger def
456460
, cfgBackupBackends = cfgBackupBackends def
461+
, cfgBatchLocationParser = cfgBatchLocationParser def
457462
, cfgFilterAvailableDBs = const $ return []
458463
, cfgTracer = mempty
459464
, cfgSchemaId = Nothing

glean/db/Glean/Database/Env.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ initEnv evb envStorage envCatalog shardManager cfg
137137
{ envEventBase = evb
138138
, envServerLogger = cfgServerLogger cfg
139139
, envDatabaseLogger = cfgDatabaseLogger cfg
140+
, envBatchLocationParser = cfgBatchLocationParser cfg
140141
, envReadOnly = cfgReadOnly cfg
141142
, envMockWrites = cfgMockWrites cfg
142143
, envListener = cfgListener cfg

glean/db/Glean/Database/Types.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import Util.STM
3737

3838
import Glean.Angle.Types hiding (describe)
3939
import qualified Glean.Database.Backup.Backend as Backup
40+
import qualified Glean.Database.BatchLocation as BatchLocation
4041
import Glean.Database.Catalog (Catalog)
4142
import Glean.Database.Config
4243
import Glean.Database.Meta
@@ -224,6 +225,7 @@ data Env = forall storage. Storage storage => Env
224225
{ envEventBase :: EventBaseDataplane
225226
, envServerLogger :: Some GleanServerLogger
226227
, envDatabaseLogger :: Some GleanDatabaseLogger
228+
, envBatchLocationParser :: Some BatchLocation.Parser
227229
, envLoggerRateLimit :: RateLimiterMap Text
228230
, envCatalog :: Catalog
229231
, envStorage :: storage

glean/db/Glean/Database/Writes.hs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ import Util.Defer
7171
import Util.Log
7272
import Util.STM
7373

74+
import Glean.Database.BatchLocation as BatchLocation
7475
import Glean.Database.Exception
7576
import Glean.Database.Open
7677
import Glean.Database.Schema.Types
@@ -377,11 +378,11 @@ enqueueBatchDescriptor env repo enqueueBatch waitPolicy = do
377378
traceMsg (envTracer env)
378379
(GleanTraceEnqueue repo EnqueueBatchDescriptor 0) $ do
379380
handle <- UUID.toText <$> UUID.nextRandom
380-
_ <- case enqueueBatch of
381+
descriptor <- case enqueueBatch of
381382
Thrift.EnqueueBatch_descriptor descriptor -> return descriptor
382383
Thrift.EnqueueBatch_EMPTY -> throwIO $ Thrift.Exception "empty batch"
383384
write <- enqueueWrite env repo 0 Nothing False $
384-
throwIO $ Exception "not implemented"
385+
writeContentFromBatch <$> downloadBatchFromLocation env descriptor
385386
when (waitPolicy == Thrift.EnqueueBatchWaitPolicy_Remember)
386387
$ rememberWrite env handle write
387388
return $ def { enqueueBatchResponse_handle = handle }
@@ -457,6 +458,19 @@ deleteWriteQueues env OpenDB{odbWriting = Just Writing{..}} = do
457458
-- get removed by the next write thread to encounter it.
458459
deleteWriteQueues _ _ = return ()
459460

461+
downloadBatchFromLocation
462+
:: Env
463+
-> Thrift.BatchDescriptor
464+
-> IO Thrift.Batch
465+
downloadBatchFromLocation Env{..} batchDescriptor =
466+
let
467+
batchFormat = batchDescriptor_format batchDescriptor
468+
locationString = batchDescriptor_location batchDescriptor
469+
location = BatchLocation.fromString envBatchLocationParser locationString
470+
in
471+
BatchLocation.downloadBatch location batchFormat
472+
473+
460474
k :: Int
461475
k = 1024
462476

0 commit comments

Comments
 (0)