@@ -37,7 +37,7 @@ use graph::{
37
37
info, lazy_static, o, warn, BlockNumber , BlockPtr , CheapClone , Logger , StoreError , ENV_VARS ,
38
38
} ,
39
39
schema:: EntityType ,
40
- slog:: error,
40
+ slog:: { debug , error} ,
41
41
} ;
42
42
use itertools:: Itertools ;
43
43
@@ -113,16 +113,33 @@ table! {
113
113
}
114
114
115
115
/// Return `true` if the site is the source of a copy operation. The copy
116
- /// operation might be just queued or in progress already
117
- pub fn is_source ( conn : & mut PgConnection , site : & Site ) -> Result < bool , StoreError > {
116
+ /// operation might be just queued or in progress already. This method will
117
+ /// block until a fdw connection becomes available.
118
+ pub fn is_source ( logger : & Logger , pool : & ConnectionPool , site : & Site ) -> Result < bool , StoreError > {
118
119
use active_copies as ac;
119
120
121
+ // We use a fdw connection to check if the site is being copied. If we
122
+ // used an ordinary connection and there are many calls to this method,
123
+ // postgres_fdw might open an unmanageable number of connections into
124
+ // the primary, which makes the primary run out of connections
125
+ let mut last_log = Instant :: now ( ) ;
126
+ let mut conn = pool. get_fdw ( & logger, || {
127
+ if last_log. elapsed ( ) > LOG_INTERVAL {
128
+ last_log = Instant :: now ( ) ;
129
+ debug ! (
130
+ logger,
131
+ "Waiting for fdw connection to check if site {} is being copied" , site. namespace
132
+ ) ;
133
+ }
134
+ false
135
+ } ) ?;
136
+
120
137
select ( diesel:: dsl:: exists (
121
138
ac:: table
122
139
. filter ( ac:: src. eq ( site. id ) )
123
140
. filter ( ac:: cancelled_at. is_null ( ) ) ,
124
141
) )
125
- . get_result :: < bool > ( conn)
142
+ . get_result :: < bool > ( & mut conn)
126
143
. map_err ( StoreError :: from)
127
144
}
128
145
0 commit comments