@@ -389,6 +389,18 @@ def mangle_db_name(self, db_name: str) -> str:
 
 class PGSource(PGCluster):
     """Source PostgreSQL cluster"""
+    def get_size(self, *, dbname, only_tables: Optional[List[str]] = None) -> float:
+        if only_tables == []:
+            return 0
+        if only_tables is not None:
+            query = "SELECT SUM(pg_total_relation_size(tablename)) AS size FROM UNNEST(%s) AS tablename"
+            args = [only_tables]
+        else:
+            query = "SELECT pg_database_size(oid) AS size FROM pg_catalog.pg_database WHERE datname = %s"
+            args = [dbname]
+        result = self.c(query, args=args, dbname=dbname)
+        return result[0]["size"] or 0
+
     def create_publication(self, *, dbname: str, only_tables: Optional[List[str]] = None) -> str:
         mangled_name = self.mangle_db_name(dbname)
         pubname = f"aiven_db_migrate_{mangled_name}_pub"
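A minimal usage sketch of the new helper, assuming an already-connected PGSource instance named `source` and database/table names that are purely illustrative (none of these identifiers appear in the patch itself):

# Hypothetical usage sketch; `source` is assumed to be a connected PGSource.
# Whole-database size via pg_database_size() when no table filter is given:
total_bytes = source.get_size(dbname="app_db")

# Sum of pg_total_relation_size() over an explicit table list:
subset_bytes = source.get_size(dbname="app_db", only_tables=["public.orders", "public.users"])

# An empty filter list short-circuits to 0 without querying the server:
assert source.get_size(dbname="app_db", only_tables=[]) == 0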
@@ -766,7 +778,7 @@ def _convert_table_names(self, tables: Optional[List[str]]) -> Set[PGTable]:
             )
         return ret
 
-    def filter_tables(self, db: PGDatabase) -> List[str]:
+    def filter_tables(self, db: PGDatabase) -> Optional[List[str]]:
         """
         Given a database, it will attempt to return a list of tables that should be data dumped / replicated
         based on the skip table list, with table list and the replicate extensions flag
@@ -779,7 +791,7 @@ def filter_tables(self, db: PGDatabase) -> List[str]:
779791 "Filtering tables for db %r, and skip tables %r and with tables %r" , db , self .skip_tables , self .with_tables
780792 )
781793 if not self .skip_tables and not self .with_tables and self .replicate_extensions :
782- return []
794+ return None
783795 if not db .tables :
784796 return []
785797 ret : Set [PGTable ] = set ()
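The return-type change separates "no filtering needed" (`None`) from "explicit filter matched nothing" (`[]`). A self-contained sketch of how callers are expected to read the three possible values; `describe_filter` is a hypothetical helper, not part of the patch:

from typing import List, Optional

def describe_filter(tables: Optional[List[str]]) -> str:
    # Hypothetical helper illustrating the interpretation of filter_tables() results.
    if tables is None:
        return "no filtering: dump/replicate the whole database"
    if not tables:
        return "explicit filter matched nothing: nothing to copy"
    return f"restrict pg_dump/publication to {len(tables)} table(s)"

print(describe_filter(None))
print(describe_filter([]))
print(describe_filter(["public.orders"]))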
@@ -852,6 +864,17 @@ def _check_databases(self):
             else:
                 self.log.info("Database %r already exists in target", dbname)
 
+    def _check_database_size(self, max_size: float):
+        dbs_size = 0
+        for dbname, source_db in self.source.databases.items():
+            only_tables = self.filter_tables(db=source_db)
+            db_size = self.source.get_size(dbname=dbname, only_tables=only_tables)
+            dbs_size += db_size
+        if dbs_size > max_size:
+            raise PGMigrateValidationFailedError(
+                f"Databases do not fit to the required maximum size ({dbs_size} > {max_size})"
+            )
+
     def _check_pg_lang(self):
         source_lang = {lan["lanname"] for lan in self.source.pg_lang}
         target_lang = {lan["lanname"] for lan in self.target.pg_lang}
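The check is on the combined size of all databases selected for migration, not on each one individually. A runnable arithmetic sketch with made-up numbers (the sizes and limit below are illustrative only):

# Illustrative only: if the filtered sizes come back as 3 GiB and 2 GiB,
# a 4 GiB limit fails validation because the comparison is against the total.
GIB = 1024 ** 3
db_sizes = [3 * GIB, 2 * GIB]   # hypothetical per-database results from get_size()
max_size = 4 * GIB
total = sum(db_sizes)
assert total > max_size         # -> PGMigrateValidationFailedError would be raised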
@@ -1053,7 +1076,8 @@ def _dump_data(self, *, db: PGDatabase) -> PGMigrateStatus:
10531076 "--data-only" ,
10541077 self .source .conn_str (dbname = dbname ),
10551078 ]
1056- pg_dump_cmd .extend ([f"--table={ w } " for w in self .filter_tables (db )])
1079+ tables = self .filter_tables (db ) or []
1080+ pg_dump_cmd .extend ([f"--table={ w } " for w in tables ])
10571081 subtask : PGSubTask = self ._pg_dump_pipe_psql (
10581082 pg_dump_cmd = pg_dump_cmd , target_conn_str = self .target .conn_str (dbname = dbname )
10591083 )
@@ -1076,7 +1100,8 @@ def _db_replication(self, *, db: PGDatabase) -> PGMigrateStatus:
         dbname = db.dbname
         pubname = slotname = subname = None
         try:
-            pubname = self.source.create_publication(dbname=dbname, only_tables=self.filter_tables(db))
+            tables = self.filter_tables(db) or []
+            pubname = self.source.create_publication(dbname=dbname, only_tables=tables)
             slotname = self.source.create_replication_slot(dbname=dbname)
             subname = self.target.create_subscription(
                 conn_str=self.source.conn_str(dbname=dbname), pubname=pubname, slotname=slotname, dbname=dbname
@@ -1129,7 +1154,7 @@ def _db_migrate(self, *, pgtask: PGMigrateTask) -> PGMigrateStatus:
         pgtask.method = PGMigrateMethod.dump
         return self._dump_data(db=pgtask.source_db)
 
-    def validate(self):
+    def validate(self, dbs_max_total_size: Optional[float] = None):
         """
         Do best effort validation whether all the bits and pieces are in place for migration to succeed.
         * Migrating to same server is not supported (doable but requires obviously different dbname)
@@ -1154,6 +1179,8 @@ def validate(self):
             # but it can be newer than the source version: source <= pgdump <= target
             self.pgbin = find_pgbin_dir(str(self.source.version), max_pgversion=str(self.target.version))
             self._check_databases()
+            if dbs_max_total_size is not None:
+                self._check_database_size(max_size=dbs_max_total_size)
             self._check_pg_lang()
             self._check_pg_ext()
         except KeyError as err:
@@ -1302,6 +1329,12 @@ def main(args=None, *, prog="pg_migrate"):
         default=None,
         help="Force the migration method to be used as either replication or dump.",
     )
+    parser.add_argument(
+        "--dbs-max-total-size",
+        type=int,
+        default=-1,
+        help="Max total size of databases to be migrated, ignored by default",
+    )
 
     args = parser.parse_args(args)
     log_format = "%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s"
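For illustration, a self-contained sketch of the new option's parsing behaviour; the parser built here is a stand-in that mirrors only the option added above, not the full parser constructed in main():

import argparse

# Stand-in parser replicating just the new flag.
demo = argparse.ArgumentParser(prog="pg_migrate")
demo.add_argument("--dbs-max-total-size", type=int, default=-1,
                  help="Max total size of databases to be migrated, ignored by default")

print(demo.parse_args([]).dbs_max_total_size)                                       # -1 -> size check skipped
print(demo.parse_args(["--dbs-max-total-size", "1073741824"]).dbs_max_total_size)   # 1 GiB limit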
@@ -1332,7 +1365,8 @@ def main(args=None, *, prog="pg_migrate"):
             raise ValueError(f"Unsupported migration method '{args.force_method}'") from e
 
     if args.validate:
-        pg_mig.validate()
+        dbs_max_total_size = None if args.dbs_max_total_size == -1 else args.dbs_max_total_size
+        pg_mig.validate(dbs_max_total_size=dbs_max_total_size)
     else:
         result: PGMigrateResult = pg_mig.migrate(force_method=method)
         print()