Commit 16521ee

store: Do not use a fdw connection to check active_copies
Copying checks the active_copies table through the primary_public.active_copies foreign table to determine whether the copy has been cancelled and should stop. With a large number of copies running, that check opens a correspondingly large number of postgres_fdw connections into the primary, which can overwhelm it. Instead, we now pass the connection pool for the primary into the copy code so that it can perform this check without involving postgres_fdw.
Parent: 1a2aaf3
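
In outline, the fix replaces a foreign-table lookup that held one postgres_fdw connection per check with a direct query through the primary's own pool. A minimal sketch of the new call path follows; the types are mocked stand-ins so the sketch compiles on its own (the real ConnectionPool, Site, StoreError, and Primary appear in the diffs below):

use std::sync::Arc;

// Mocked stand-ins for the real store/postgres types; only the call shape matters.
struct ConnectionPool { shard: String }
struct Site { id: i32 }
type StoreError = String;

// Shape of the interface this commit adds in primary.rs.
struct Primary { pool: Arc<ConnectionPool> }

impl Primary {
    fn new(pool: Arc<ConnectionPool>) -> Self {
        // The real constructor panics when handed a non-primary pool.
        assert_eq!(pool.shard, "primary", "Primary pool must be the primary shard");
        Primary { pool }
    }

    // The real method runs a diesel query on an ordinary pooled connection
    // into the primary, with no postgres_fdw involved; stubbed out here.
    fn is_copy_cancelled(&self, dst: &Site) -> Result<bool, StoreError> {
        let _ = (&self.pool, dst.id); // real code: self.pool.get()? plus a diesel query
        Ok(false)
    }
}

fn main() -> Result<(), StoreError> {
    let primary = Primary::new(Arc::new(ConnectionPool { shard: "primary".into() }));
    let dst = Site { id: 42 };
    if primary.is_copy_cancelled(&dst)? {
        // the copy into `dst` was cancelled; workers should stop copying
    }
    Ok(())
}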

File tree: 4 files changed, +90 −58 lines

store/postgres/src/copy.rs

Lines changed: 21 additions & 52 deletions
@@ -37,15 +37,15 @@ use graph::{
         info, lazy_static, o, warn, BlockNumber, BlockPtr, CheapClone, Logger, StoreError, ENV_VARS,
     },
     schema::EntityType,
-    slog::{debug, error},
+    slog::error,
     tokio,
 };
 use itertools::Itertools;

 use crate::{
     advisory_lock, catalog, deployment,
     dynds::DataSourcesTable,
-    primary::{DeploymentId, Site},
+    primary::{DeploymentId, Primary, Site},
     relational::index::IndexList,
     vid_batcher::{VidBatcher, VidRange},
 };
@@ -104,46 +104,6 @@ table! {
     }
 }

-// This is the same as primary::active_copies, but mapped into each shard
-table! {
-    primary_public.active_copies(dst) {
-        src -> Integer,
-        dst -> Integer,
-        cancelled_at -> Nullable<Date>,
-    }
-}
-
-/// Return `true` if the site is the source of a copy operation. The copy
-/// operation might be just queued or in progress already. This method will
-/// block until a fdw connection becomes available.
-pub fn is_source(logger: &Logger, pool: &ConnectionPool, site: &Site) -> Result<bool, StoreError> {
-    use active_copies as ac;
-
-    // We use a fdw connection to check if the site is being copied. If we
-    // used an ordinary connection and there are many calls to this method,
-    // postgres_fdw might open an unmanageable number of connections into
-    // the primary, which makes the primary run out of connections
-    let mut last_log = Instant::now();
-    let mut conn = pool.get_fdw(&logger, || {
-        if last_log.elapsed() > LOG_INTERVAL {
-            last_log = Instant::now();
-            debug!(
-                logger,
-                "Waiting for fdw connection to check if site {} is being copied", site.namespace
-            );
-        }
-        false
-    })?;
-
-    select(diesel::dsl::exists(
-        ac::table
-            .filter(ac::src.eq(site.id))
-            .filter(ac::cancelled_at.is_null()),
-    ))
-    .get_result::<bool>(&mut conn)
-    .map_err(StoreError::from)
-}
-
 #[derive(Copy, Clone, PartialEq, Eq, Debug)]
 pub enum Status {
     Finished,
@@ -161,6 +121,7 @@ struct CopyState {
 impl CopyState {
     fn new(
         conn: &mut PgConnection,
+        primary: Primary,
         src: Arc<Layout>,
         dst: Arc<Layout>,
         target_block: BlockPtr,
@@ -199,21 +160,22 @@ impl CopyState {
                         src.site.id
                     ));
                 }
-                Self::load(conn, src, dst, target_block)
+                Self::load(conn, primary, src, dst, target_block)
             }
-            None => Self::create(conn, src, dst, target_block),
+            None => Self::create(conn, primary.cheap_clone(), src, dst, target_block),
         }?;

         Ok(state)
     }

     fn load(
         conn: &mut PgConnection,
+        primary: Primary,
         src: Arc<Layout>,
         dst: Arc<Layout>,
         target_block: BlockPtr,
     ) -> Result<CopyState, StoreError> {
-        let tables = TableState::load(conn, src.as_ref(), dst.as_ref())?;
+        let tables = TableState::load(conn, primary, src.as_ref(), dst.as_ref())?;
         let (finished, mut unfinished): (Vec<_>, Vec<_>) =
             tables.into_iter().partition(|table| table.finished());
         unfinished.sort_by_key(|table| table.dst.object.to_string());
@@ -228,6 +190,7 @@ impl CopyState {

     fn create(
         conn: &mut PgConnection,
+        primary: Primary,
         src: Arc<Layout>,
         dst: Arc<Layout>,
         target_block: BlockPtr,
@@ -253,6 +216,7 @@ impl CopyState {
             .map(|src_table| {
                 TableState::init(
                     conn,
+                    primary.cheap_clone(),
                     dst.site.clone(),
                     &src,
                     src_table.clone(),
@@ -354,6 +318,7 @@ pub(crate) fn source(
 /// transformation. See `CopyEntityBatchQuery` for the details of what
 /// exactly that means
 struct TableState {
+    primary: Primary,
     src: Arc<Table>,
     dst: Arc<Table>,
     dst_site: Arc<Site>,
@@ -364,6 +329,7 @@ struct TableState {
 impl TableState {
     fn init(
         conn: &mut PgConnection,
+        primary: Primary,
         dst_site: Arc<Site>,
         src_layout: &Layout,
         src: Arc<Table>,
@@ -373,6 +339,7 @@ impl TableState {
         let vid_range = VidRange::for_copy(conn, &src, target_block)?;
         let batcher = VidBatcher::load(conn, &src_layout.site.namespace, src.as_ref(), vid_range)?;
         Ok(Self {
+            primary,
             src,
             dst,
             dst_site,
@@ -387,6 +354,7 @@ impl TableState {

     fn load(
         conn: &mut PgConnection,
+        primary: Primary,
         src_layout: &Layout,
         dst_layout: &Layout,
     ) -> Result<Vec<TableState>, StoreError> {
@@ -450,6 +418,7 @@ impl TableState {
             .with_batch_size(size as usize);

         Ok(TableState {
+            primary: primary.cheap_clone(),
             src,
             dst,
             dst_site: dst_layout.site.clone(),
@@ -516,13 +485,8 @@
     }

     fn is_cancelled(&self, conn: &mut PgConnection) -> Result<bool, StoreError> {
-        use active_copies as ac;
-
         let dst = self.dst_site.as_ref();
-        let canceled = ac::table
-            .filter(ac::dst.eq(dst.id))
-            .select(ac::cancelled_at.is_not_null())
-            .get_result::<bool>(conn)?;
+        let canceled = self.primary.is_copy_cancelled(dst)?;
         if canceled {
             use copy_state as cs;

@@ -893,6 +857,7 @@ pub struct Connection {
     /// `self.transaction`
     conn: Option<PooledPgConnection>,
     pool: ConnectionPool,
+    primary: Primary,
     workers: usize,
     src: Arc<Layout>,
     dst: Arc<Layout>,
@@ -910,6 +875,7 @@ impl Connection {
     /// is available.
     pub fn new(
         logger: &Logger,
+        primary: Primary,
         pool: ConnectionPool,
         src: Arc<Layout>,
         dst: Arc<Layout>,
@@ -942,6 +908,7 @@ impl Connection {
             logger,
             conn,
             pool,
+            primary,
             workers: ENV_VARS.store.batch_workers,
             src,
             dst,
@@ -1079,7 +1046,9 @@ impl Connection {
         let src = self.src.clone();
         let dst = self.dst.clone();
         let target_block = self.target_block.clone();
-        let mut state = self.transaction(|conn| CopyState::new(conn, src, dst, target_block))?;
+        let primary = self.primary.cheap_clone();
+        let mut state =
+            self.transaction(|conn| CopyState::new(conn, primary, src, dst, target_block))?;

         let progress = Arc::new(CopyProgress::new(self.logger.cheap_clone(), &state));
         progress.start();
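
The is_cancelled hunk above is the heart of the change: TableState::is_cancelled now delegates to Primary::is_copy_cancelled instead of querying the foreign table. Since each worker re-checks cancellation between batches, the old path multiplied fdw connections by the number of concurrent copies. A schematic of that per-batch loop, simplified with mocked types (the real loop lives in copy.rs; Cancelled and copy_batch are assumed names for illustration):

// Schematic of a copy worker's batch loop; not the literal code in copy.rs.
enum Status { Finished, Cancelled }

struct TableCopy { remaining: u32, cancelled: bool }

impl TableCopy {
    // After this commit: one ordinary query on the primary pool per check,
    // instead of a postgres_fdw round trip from the shard into the primary.
    fn is_cancelled(&self) -> bool { self.cancelled }
    fn finished(&self) -> bool { self.remaining == 0 }
    fn copy_batch(&mut self) { self.remaining -= 1; } // copy one vid batch
}

fn run(table: &mut TableCopy) -> Status {
    while !table.finished() {
        if table.is_cancelled() {
            return Status::Cancelled; // cancellation noticed between batches
        }
        table.copy_batch();
    }
    Status::Finished
}

fn main() {
    let mut table = TableCopy { remaining: 3, cancelled: false };
    assert!(matches!(run(&mut table), Status::Finished));
}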

store/postgres/src/deployment_store.rs

Lines changed: 12 additions & 3 deletions
@@ -51,11 +51,11 @@ use crate::block_range::{BLOCK_COLUMN, BLOCK_RANGE_COLUMN};
 use crate::deployment::{self, OnSync};
 use crate::detail::ErrorDetail;
 use crate::dynds::DataSourcesTable;
-use crate::primary::DeploymentId;
+use crate::primary::{DeploymentId, Primary};
 use crate::relational::index::{CreateIndex, IndexList, Method};
 use crate::relational::{Layout, LayoutCache, SqlName, Table};
 use crate::relational_queries::FromEntityData;
-use crate::{advisory_lock, catalog, copy, retry};
+use crate::{advisory_lock, catalog, retry};
 use crate::{connection_pool::ConnectionPool, detail};
 use crate::{dynds, primary::Site};

@@ -93,6 +93,8 @@ type PruneHandle = JoinHandle<Result<(), StoreError>>;
 pub struct StoreInner {
     logger: Logger,

+    primary: Primary,
+
     pool: ConnectionPool,
     read_only_pools: Vec<ConnectionPool>,

@@ -130,6 +132,7 @@ impl Deref for DeploymentStore {
 impl DeploymentStore {
     pub fn new(
         logger: &Logger,
+        primary: Primary,
         pool: ConnectionPool,
         read_only_pools: Vec<ConnectionPool>,
         mut pool_weights: Vec<usize>,
@@ -160,6 +163,7 @@ impl DeploymentStore {
         // Create the store
         let store = StoreInner {
             logger: logger.clone(),
+            primary,
             pool,
             read_only_pools,
             replica_order,
@@ -1235,7 +1239,7 @@ impl DeploymentStore {
         req: PruneRequest,
     ) -> Result<(), StoreError> {
         {
-            if copy::is_source(&logger, &store.pool, &site)? {
+            if store.is_source(&site)? {
                 debug!(
                     logger,
                     "Skipping pruning since this deployment is being copied"
@@ -1520,6 +1524,7 @@ impl DeploymentStore {
         // with the corresponding tables in `self`
         let copy_conn = crate::copy::Connection::new(
             logger,
+            self.primary.cheap_clone(),
             self.pool.clone(),
             src.clone(),
             dst.clone(),
@@ -1848,6 +1853,10 @@ impl DeploymentStore {
         })
         .await
     }
+
+    fn is_source(&self, site: &Site) -> Result<bool, StoreError> {
+        self.primary.is_source(site)
+    }
 }

 /// Tries to fetch a [`Table`] either by its Entity name or its SQL name.

store/postgres/src/primary.rs

Lines changed: 49 additions & 2 deletions
@@ -36,6 +36,7 @@ use graph::{
         store::scalar::ToPrimitive,
         subgraph::{status, DeploymentFeatures},
     },
+    derive::CheapClone,
     prelude::{
         anyhow,
         chrono::{DateTime, Utc},
@@ -53,9 +54,9 @@ use maybe_owned::MaybeOwnedMut;
 use std::{
     borrow::Borrow,
     collections::HashMap,
-    convert::TryFrom,
-    convert::TryInto,
+    convert::{TryFrom, TryInto},
     fmt,
+    sync::Arc,
     time::{SystemTime, UNIX_EPOCH},
 };

@@ -1826,6 +1827,52 @@ impl<'a> Connection<'a> {
     }
 }

+/// A limited interface to query the primary database.
+#[derive(Clone, CheapClone)]
+pub struct Primary {
+    pool: Arc<ConnectionPool>,
+}
+
+impl Primary {
+    pub fn new(pool: Arc<ConnectionPool>) -> Self {
+        // This really indicates a programming error
+        if pool.shard != *PRIMARY_SHARD {
+            panic!("Primary pool must be the primary shard");
+        }
+
+        Primary { pool }
+    }
+
+    /// Return `true` if the site is the source of a copy operation. The copy
+    /// operation might be just queued or in progress already. This method will
+    /// block until a fdw connection becomes available.
+    pub fn is_source(&self, site: &Site) -> Result<bool, StoreError> {
+        use active_copies as ac;
+
+        let mut conn = self.pool.get()?;
+
+        select(diesel::dsl::exists(
+            ac::table
+                .filter(ac::src.eq(site.id))
+                .filter(ac::cancelled_at.is_null()),
+        ))
+        .get_result::<bool>(&mut conn)
+        .map_err(StoreError::from)
+    }
+
+    pub fn is_copy_cancelled(&self, dst: &Site) -> Result<bool, StoreError> {
+        use active_copies as ac;
+
+        let mut conn = self.pool.get()?;
+
+        ac::table
+            .filter(ac::dst.eq(dst.id))
+            .select(ac::cancelled_at.is_not_null())
+            .get_result::<bool>(&mut conn)
+            .map_err(StoreError::from)
+    }
+}
+
 /// Return `true` if we deem this installation to be empty, defined as
 /// having no deployments and no subgraph names in the database
 pub fn is_empty(conn: &mut PgConnection) -> Result<bool, StoreError> {
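
Two details in the hunk above are easy to miss. First, the doc comment on is_source still says the method blocks until a fdw connection becomes available, but the new body takes an ordinary connection from the primary pool (self.pool.get()). Second, the two queries treat a missing row differently: is_source wraps its lookup in EXISTS and returns false when no row matches, while is_copy_cancelled uses get_result and therefore errors if no active_copies row exists for the destination. Restated over an in-memory table to make those semantics easy to check (a sketch, not the committed code):

// In-memory restatement of the two lookups Primary adds; approximate SQL in
// comments, with `rows` standing in for the primary's active_copies table.
struct ActiveCopy { src: i32, dst: i32, cancelled_at: Option<String> }

// SELECT EXISTS (SELECT 1 FROM active_copies
//                WHERE src = $site AND cancelled_at IS NULL)
fn is_source(rows: &[ActiveCopy], site_id: i32) -> bool {
    rows.iter().any(|r| r.src == site_id && r.cancelled_at.is_none())
}

// SELECT cancelled_at IS NOT NULL FROM active_copies WHERE dst = $dst
// The diesel version calls get_result, so a missing row surfaces as an error
// rather than `false`; Option models that here.
fn is_copy_cancelled(rows: &[ActiveCopy], dst_id: i32) -> Option<bool> {
    rows.iter()
        .find(|r| r.dst == dst_id)
        .map(|r| r.cancelled_at.is_some())
}

fn main() {
    let rows = vec![ActiveCopy { src: 1, dst: 2, cancelled_at: None }];
    assert!(is_source(&rows, 1));
    assert_eq!(is_copy_cancelled(&rows, 2), Some(false));
    assert_eq!(is_copy_cancelled(&rows, 9), None); // diesel: Err(NotFound)
}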

store/postgres/src/subgraph_store.rs

Lines changed: 8 additions & 1 deletion
@@ -39,7 +39,7 @@ use graph::{
 use crate::{
     connection_pool::ConnectionPool,
     deployment::{OnSync, SubgraphHealth},
-    primary::{self, DeploymentId, Mirror as PrimaryMirror, Site},
+    primary::{self, DeploymentId, Mirror as PrimaryMirror, Primary, Site},
     relational::{
         index::{IndexList, Method},
         Layout,
@@ -360,6 +360,12 @@ impl SubgraphStoreInner {
         sender: Arc<NotificationSender>,
         registry: Arc<MetricsRegistry>,
     ) -> Self {
+        let primary = stores
+            .iter()
+            .find(|(name, _, _, _)| name == &*PRIMARY_SHARD)
+            .map(|(_, pool, _, _)| Primary::new(Arc::new(pool.clone())))
+            .expect("primary shard must be present");
+
         let mirror = {
             let pools = HashMap::from_iter(
                 stores
@@ -376,6 +382,7 @@ impl SubgraphStoreInner {
                 name,
                 Arc::new(DeploymentStore::new(
                     &logger,
+                    primary.cheap_clone(),
                     main_pool,
                     read_only_pools,
                     weights,
