store: Do not use a fdw connection to check active_copies #5938


Merged
merged 3 commits into from Apr 10, 2025
182 changes: 111 additions & 71 deletions store/postgres/src/copy.rs
@@ -37,15 +37,15 @@ use graph::{
info, lazy_static, o, warn, BlockNumber, BlockPtr, CheapClone, Logger, StoreError, ENV_VARS,
},
schema::EntityType,
slog::{debug, error},
slog::error,
tokio,
};
use itertools::Itertools;

use crate::{
advisory_lock, catalog, deployment,
dynds::DataSourcesTable,
primary::{DeploymentId, Site},
primary::{DeploymentId, Primary, Site},
relational::index::IndexList,
vid_batcher::{VidBatcher, VidRange},
};
@@ -64,8 +64,6 @@ const ACCEPTABLE_REPLICATION_LAG: Duration = Duration::from_secs(30);
/// the lag again
const REPLICATION_SLEEP: Duration = Duration::from_secs(10);

type PooledPgConnection = PooledConnection<ConnectionManager<PgConnection>>;

lazy_static! {
static ref STATEMENT_TIMEOUT: Option<String> = ENV_VARS
.store
@@ -104,46 +102,6 @@ table! {
}
}

// This is the same as primary::active_copies, but mapped into each shard
table! {
primary_public.active_copies(dst) {
src -> Integer,
dst -> Integer,
cancelled_at -> Nullable<Date>,
}
}

/// Return `true` if the site is the source of a copy operation. The copy
/// operation might be just queued or in progress already. This method will
/// block until a fdw connection becomes available.
pub fn is_source(logger: &Logger, pool: &ConnectionPool, site: &Site) -> Result<bool, StoreError> {
use active_copies as ac;

// We use a fdw connection to check if the site is being copied. If we
// used an ordinary connection and there are many calls to this method,
// postgres_fdw might open an unmanageable number of connections into
// the primary, which makes the primary run out of connections
let mut last_log = Instant::now();
let mut conn = pool.get_fdw(&logger, || {
if last_log.elapsed() > LOG_INTERVAL {
last_log = Instant::now();
debug!(
logger,
"Waiting for fdw connection to check if site {} is being copied", site.namespace
);
}
false
})?;

select(diesel::dsl::exists(
ac::table
.filter(ac::src.eq(site.id))
.filter(ac::cancelled_at.is_null()),
))
.get_result::<bool>(&mut conn)
.map_err(StoreError::from)
}
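
The check itself moves behind the `Primary` type imported above; later in this diff it is called as `self.primary.is_copy_cancelled(dst)`. A minimal sketch of what such a check can look like when it runs on an ordinary connection from the primary's own pool rather than an fdw connection; the `pool` field, the method body, and the local `active_copies` table definition are assumptions inferred from the call sites, not code from this PR:

```rust
use diesel::prelude::*;

// Hypothetical sketch; the real code lives in store/postgres/src/primary.rs,
// which has its own `active_copies` table definition. Querying the primary
// directly avoids postgres_fdw fanning out connections into the primary from
// every shard, which is what exhausted the primary's connection slots.
impl Primary {
    pub fn is_copy_cancelled(&self, dst: &Site) -> Result<bool, StoreError> {
        use active_copies as ac;

        // A regular pooled connection into the primary, not an fdw connection.
        let mut conn = self.pool.get()?;
        ac::table
            .filter(ac::dst.eq(dst.id))
            .select(ac::cancelled_at.is_not_null())
            .get_result::<bool>(&mut conn)
            .map_err(StoreError::from)
    }
}
```
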

#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum Status {
Finished,
@@ -161,6 +119,7 @@ struct CopyState {
impl CopyState {
fn new(
conn: &mut PgConnection,
primary: Primary,
src: Arc<Layout>,
dst: Arc<Layout>,
target_block: BlockPtr,
@@ -199,21 +158,22 @@ impl CopyState {
src.site.id
));
}
Self::load(conn, src, dst, target_block)
Self::load(conn, primary, src, dst, target_block)
}
None => Self::create(conn, src, dst, target_block),
None => Self::create(conn, primary.cheap_clone(), src, dst, target_block),
}?;

Ok(state)
}

fn load(
conn: &mut PgConnection,
primary: Primary,
src: Arc<Layout>,
dst: Arc<Layout>,
target_block: BlockPtr,
) -> Result<CopyState, StoreError> {
let tables = TableState::load(conn, src.as_ref(), dst.as_ref())?;
let tables = TableState::load(conn, primary, src.as_ref(), dst.as_ref())?;
let (finished, mut unfinished): (Vec<_>, Vec<_>) =
tables.into_iter().partition(|table| table.finished());
unfinished.sort_by_key(|table| table.dst.object.to_string());
@@ -228,6 +188,7 @@

fn create(
conn: &mut PgConnection,
primary: Primary,
src: Arc<Layout>,
dst: Arc<Layout>,
target_block: BlockPtr,
@@ -253,6 +214,7 @@ impl CopyState {
.map(|src_table| {
TableState::init(
conn,
primary.cheap_clone(),
dst.site.clone(),
&src,
src_table.clone(),
@@ -354,6 +316,7 @@ pub(crate) fn source(
/// transformation. See `CopyEntityBatchQuery` for the details of what
/// exactly that means
struct TableState {
primary: Primary,
src: Arc<Table>,
dst: Arc<Table>,
dst_site: Arc<Site>,
@@ -364,6 +327,7 @@ struct TableState {
impl TableState {
fn init(
conn: &mut PgConnection,
primary: Primary,
dst_site: Arc<Site>,
src_layout: &Layout,
src: Arc<Table>,
@@ -373,6 +337,7 @@ impl TableState {
let vid_range = VidRange::for_copy(conn, &src, target_block)?;
let batcher = VidBatcher::load(conn, &src_layout.site.namespace, src.as_ref(), vid_range)?;
Ok(Self {
primary,
src,
dst,
dst_site,
@@ -387,6 +352,7 @@

fn load(
conn: &mut PgConnection,
primary: Primary,
src_layout: &Layout,
dst_layout: &Layout,
) -> Result<Vec<TableState>, StoreError> {
@@ -450,6 +416,7 @@ impl TableState {
.with_batch_size(size as usize);

Ok(TableState {
primary: primary.cheap_clone(),
src,
dst,
dst_site: dst_layout.site.clone(),
Expand Down Expand Up @@ -516,13 +483,8 @@ impl TableState {
}

fn is_cancelled(&self, conn: &mut PgConnection) -> Result<bool, StoreError> {
use active_copies as ac;

let dst = self.dst_site.as_ref();
let canceled = ac::table
.filter(ac::dst.eq(dst.id))
.select(ac::cancelled_at.is_not_null())
.get_result::<bool>(conn)?;
let canceled = self.primary.is_copy_cancelled(dst)?;
if canceled {
use copy_state as cs;

@@ -703,17 +665,77 @@ impl From<Result<CopyTableWorker, StoreError>> for WorkerResult {
}
}

/// We pass connections back and forth between the control loop and various
/// workers. We need to make sure that we end up with the connection that
/// was used to acquire the copy lock in the right place so we can release
/// the copy lock which is only possible with the connection that acquired
/// it.
///
/// This struct helps us with that. It wraps a connection and tracks whether
/// the connection was used to acquire the copy lock
struct LockTrackingConnection {
inner: PooledConnection<ConnectionManager<PgConnection>>,
has_lock: bool,
}

impl LockTrackingConnection {
fn new(inner: PooledConnection<ConnectionManager<PgConnection>>) -> Self {
Self {
inner,
has_lock: false,
}
}

fn transaction<T, F>(&mut self, f: F) -> Result<T, StoreError>
where
F: FnOnce(&mut PgConnection) -> Result<T, StoreError>,
{
let conn = &mut self.inner;
conn.transaction(|conn| f(conn))
}

/// Put `self` into `other` if `self` has the lock.
fn extract(self, other: &mut Option<Self>) {
if self.has_lock {
*other = Some(self);
}
}

fn lock(&mut self, logger: &Logger, dst: &Site) -> Result<(), StoreError> {
if self.has_lock {
warn!(logger, "already acquired copy lock for {}", dst);
return Ok(());
}
advisory_lock::lock_copying(&mut self.inner, dst)?;
self.has_lock = true;
Ok(())
}

fn unlock(&mut self, logger: &Logger, dst: &Site) -> Result<(), StoreError> {
if !self.has_lock {
Contributor: I guess here and in the two previous ifs it would be nice to get an error logged. Also in the else branch in switch().

Collaborator (author): I think only the ifs in lock and unlock need logging; the else in switch is not an issue; we basically just want to make sure we put the right connection back into other, not just any. Changing those two ifs.

error!(
logger,
"tried to release copy lock for {} even though we are not the owner", dst
);
return Ok(());
}
advisory_lock::unlock_copying(&mut self.inner, dst)?;
self.has_lock = false;
Ok(())
}
}
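
Taken together, the intended life cycle is roughly the following. This is a simplified sketch using the names from this diff, with the pool, logger, and `dst_site` set up elsewhere:

```rust
// Only the connection that actually acquired the advisory lock may end up
// back in the control loop's `self.conn` slot, because only that connection
// can release the lock later.
let mut tracked = LockTrackingConnection::new(pool.get()?);
tracked.lock(&logger, &dst_site)?; // has_lock = true from here on

// Workers hand their connection back when they finish; `extract` keeps it
// only if it holds the lock, so stray worker connections are simply dropped.
let mut slot: Option<LockTrackingConnection> = None;
tracked.extract(&mut slot);

if let Some(mut conn) = slot {
    conn.unlock(&logger, &dst_site)?; // must run on the same session
}
```
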

/// A helper to run copying of one table. We need to thread `conn` and
/// `table` from the control loop to the background worker and back again to
/// the control loop. This worker facilitates that
struct CopyTableWorker {
conn: PooledPgConnection,
conn: LockTrackingConnection,
table: TableState,
result: Result<Status, StoreError>,
}

impl CopyTableWorker {
fn new(conn: PooledPgConnection, table: TableState) -> Self {
fn new(conn: LockTrackingConnection, table: TableState) -> Self {
Self {
conn,
table,
Expand All @@ -735,7 +757,7 @@ impl CopyTableWorker {
fn run_inner(&mut self, logger: Logger, progress: &CopyProgress) -> Result<Status, StoreError> {
use Status::*;

let conn = &mut self.conn;
let conn = &mut self.conn.inner;
progress.start_table(&self.table);
while !self.table.finished() {
// It is important that this check happens outside the write
@@ -891,8 +913,9 @@ pub struct Connection {
/// individual table. Except for that case, this will always be
/// `Some(..)`. Most code shouldn't access `self.conn` directly, but use
/// `self.transaction`
conn: Option<PooledPgConnection>,
conn: Option<LockTrackingConnection>,
pool: ConnectionPool,
primary: Primary,
workers: usize,
src: Arc<Layout>,
dst: Arc<Layout>,
@@ -910,6 +933,7 @@ impl Connection {
/// is available.
pub fn new(
logger: &Logger,
primary: Primary,
pool: ConnectionPool,
src: Arc<Layout>,
dst: Arc<Layout>,
@@ -935,13 +959,14 @@
}
false
})?;
let conn = Some(conn);
let src_manifest_idx_and_name = Arc::new(src_manifest_idx_and_name);
let dst_manifest_idx_and_name = Arc::new(dst_manifest_idx_and_name);
let conn = Some(LockTrackingConnection::new(conn));
Ok(Self {
logger,
conn,
pool,
primary,
workers: ENV_VARS.store.batch_workers,
src,
dst,
@@ -1023,6 +1048,7 @@ impl Connection {
let Some(table) = state.unfinished.pop() else {
return None;
};
let conn = LockTrackingConnection::new(conn);

let worker = CopyTableWorker::new(conn, table);
Some(Box::pin(
@@ -1064,7 +1090,7 @@ impl Connection {
let result = workers.select().await;
match result {
Ok(worker) => {
self.conn = Some(worker.conn);
worker.conn.extract(&mut self.conn);
}
Err(e) => {
/* Ignore; we had an error previously */
@@ -1079,7 +1105,9 @@
let src = self.src.clone();
let dst = self.dst.clone();
let target_block = self.target_block.clone();
let mut state = self.transaction(|conn| CopyState::new(conn, src, dst, target_block))?;
let primary = self.primary.cheap_clone();
let mut state =
self.transaction(|conn| CopyState::new(conn, primary, src, dst, target_block))?;

let progress = Arc::new(CopyProgress::new(self.logger.cheap_clone(), &state));
progress.start();
@@ -1122,13 +1150,14 @@ impl Connection {
W::Err(e) => {
// This is a panic in the background task. We need to
// cancel all other tasks and return the error
error!(self.logger, "copy worker panicked: {}", e);
self.cancel_workers(progress, workers).await;
return Err(e);
}
W::Ok(worker) => {
// Put the connection back into self.conn so that we can use it
// in the next iteration.
self.conn = Some(worker.conn);
worker.conn.extract(&mut self.conn);

match (worker.result, progress.is_cancelled()) {
(Ok(Status::Finished), false) => {
@@ -1146,6 +1175,7 @@ impl Connection {
return Ok(Status::Cancelled);
}
(Err(e), _) => {
error!(self.logger, "copy worker had an error: {}", e);
self.cancel_workers(progress, workers).await;
return Err(e);
}
@@ -1236,20 +1266,30 @@ impl Connection {
);

let dst_site = self.dst.site.cheap_clone();
self.transaction(|conn| advisory_lock::lock_copying(conn, &dst_site))?;
let Some(conn) = self.conn.as_mut() else {
return Err(constraint_violation!(
"copy connection went missing (copy_data)"
));
};
conn.lock(&self.logger, &dst_site)?;

let res = self.copy_data_internal(index_list).await;

if self.conn.is_none() {
// A background worker panicked and left us without our
// dedicated connection, but we still need to release the copy
// lock; get a normal connection, not from the fdw pool for that
// as that will be much less contended. We won't be holding on
// to the connection for long as `res` will be an error and we
// will abort starting this subgraph
self.conn = Some(self.pool.get()?);
match self.conn.as_mut() {
None => {
// A background worker panicked and left us without our
// dedicated connection; we would need to get that
// connection to unlock the advisory lock. We can't do that,
// so we just log an error
warn!(
self.logger,
"can't unlock copy lock since the default worker panicked; lock will linger until session ends"
);
}
Some(conn) => {
conn.unlock(&self.logger, &dst_site)?;
}
}
self.transaction(|conn| advisory_lock::unlock_copying(conn, &dst_site))?;

if matches!(res, Ok(Status::Cancelled)) {
warn!(&self.logger, "Copying was cancelled and is incomplete");
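
The "lock will linger until session ends" warning follows from Postgres advisory-lock semantics: a session-level advisory lock belongs to the connection that acquired it and is released only by an unlock call on that same connection, or by the session ending. A hedged sketch of what the `advisory_lock` helpers presumably reduce to (the two-key form and the keyspace constant are assumptions; the real module may derive lock keys differently):

```rust
use diesel::{sql_query, PgConnection, RunQueryDsl};

// Hypothetical reduction of advisory_lock::{lock_copying, unlock_copying}.
// pg_advisory_lock takes a session-level lock: if the connection that
// acquired it is lost to a panicked worker, no other connection can unlock
// it, and Postgres frees it only when that connection's session ends.
const COPY_LOCK_SPACE: i32 = 1; // assumed keyspace constant

fn lock_copying(conn: &mut PgConnection, site_id: i32) -> diesel::QueryResult<()> {
    sql_query(format!("select pg_advisory_lock({COPY_LOCK_SPACE}, {site_id})"))
        .execute(conn)
        .map(|_| ())
}

fn unlock_copying(conn: &mut PgConnection, site_id: i32) -> diesel::QueryResult<()> {
    sql_query(format!("select pg_advisory_unlock({COPY_LOCK_SPACE}, {site_id})"))
        .execute(conn)
        .map(|_| ())
}
```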