Skip to content

Commit fc94257

Browse files
author
Teemu Harju
committed
add check for maximum databases size to migrate
1 parent 807698d commit fc94257

File tree

4 files changed

+196
-2
lines changed

4 files changed

+196
-2
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,8 @@ Use `--no-replicate-extension-tables` to skip extension tables. By default it a
122122

123123
With `--force-method` you can specify if you wish to use either replication or dump method. Otherwise the most suitable method is chosen automatically.
124124

125+
Using `--dbs-max-total-size` together with `--validate` you can check if the size of the source database is below some threshold.
126+
125127
### API example
126128

127129
Migrating from AWS RDS to Aiven for PostgreSQL. Logical replication is enabled in source AWS RDS PostgreSQL

aiven_db_migrate/migrate/pgmigrate.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,18 @@ def mangle_db_name(self, db_name: str) -> str:
389389

390390
class PGSource(PGCluster):
391391
"""Source PostgreSQL cluster"""
392+
def get_size(self, *, dbname: str, only_tables: Optional[List[str]] = None) -> float:
    """
    Return the size reported by the server for a database, or for a subset of its tables.

    * ``only_tables`` is ``None``: the whole database, via ``pg_database_size``
    * ``only_tables`` is empty: nothing is selected, so ``0.0`` is returned without running a query
    * otherwise: the sum of ``pg_total_relation_size`` over the listed tables

    The query is issued through ``self.c`` with ``dbname`` passed along. The result is always
    converted to ``float`` so that the value matches the declared return type regardless of the
    numeric type the driver hands back.
    """
    if only_tables is None:
        query = "SELECT pg_database_size(oid) AS size FROM pg_catalog.pg_database WHERE datname = %s"
        args = [dbname]
    elif not only_tables:
        # An explicitly empty selection has no size; skip the round trip to the server.
        return 0.0
    else:
        query = "SELECT SUM(pg_total_relation_size(tablename)) AS size FROM UNNEST(%s) AS tablename"
        args = [only_tables]
    result = self.c(query, args=args, dbname=dbname)
    # "size" may come back as None (SQL NULL) and, for SUM(), presumably as Decimal rather
    # than int -- NOTE(review): confirm against the driver's numeric mapping. Normalise both.
    return float(result[0]["size"] or 0)
403+
392404
def create_publication(self, *, dbname: str, only_tables: Optional[List[str]] = None) -> str:
393405
mangled_name = self.mangle_db_name(dbname)
394406
pubname = f"aiven_db_migrate_{mangled_name}_pub"
@@ -852,6 +864,17 @@ def _check_databases(self):
852864
else:
853865
self.log.info("Database %r already exists in target", dbname)
854866

867+
def _check_database_size(self, max_size: float):
868+
dbs_size = 0
869+
for dbname, source_db in self.source.databases.items():
870+
only_tables = self.filter_tables(db=source_db)
871+
db_size = self.source.get_size(dbname=dbname, only_tables=only_tables)
872+
dbs_size += db_size
873+
if dbs_size > max_size:
874+
raise PGMigrateValidationFailedError(
875+
f"Databases do not fit to the required maximum size ({dbs_size} > {max_size})"
876+
)
877+
855878
def _check_pg_lang(self):
856879
source_lang = {lan["lanname"] for lan in self.source.pg_lang}
857880
target_lang = {lan["lanname"] for lan in self.target.pg_lang}
@@ -1131,7 +1154,7 @@ def _db_migrate(self, *, pgtask: PGMigrateTask) -> PGMigrateStatus:
11311154
pgtask.method = PGMigrateMethod.dump
11321155
return self._dump_data(db=pgtask.source_db)
11331156

1134-
def validate(self):
1157+
def validate(self, dbs_max_total_size: Optional[float] = None):
11351158
"""
11361159
Do best effort validation whether all the bits and pieces are in place for migration to succeed.
11371160
* Migrating to same server is not supported (doable but requires obviously different dbname)
@@ -1156,6 +1179,8 @@ def validate(self):
11561179
# but it can be newer than the source version: source <= pgdump <= target
11571180
self.pgbin = find_pgbin_dir(str(self.source.version), max_pgversion=str(self.target.version))
11581181
self._check_databases()
1182+
if dbs_max_total_size is not None:
1183+
self._check_database_size(max_size=dbs_max_total_size)
11591184
self._check_pg_lang()
11601185
self._check_pg_ext()
11611186
except KeyError as err:
@@ -1304,6 +1329,12 @@ def main(args=None, *, prog="pg_migrate"):
13041329
default=None,
13051330
help="Force the migration method to be used as either replication or dump.",
13061331
)
1332+
parser.add_argument(
1333+
"--dbs-max-total-size",
1334+
type=int,
1335+
default=-1,
1336+
help="Max total size of databases to be migrated, ignored by default",
1337+
)
13071338

13081339
args = parser.parse_args(args)
13091340
log_format = "%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s"
@@ -1334,7 +1365,8 @@ def main(args=None, *, prog="pg_migrate"):
13341365
raise ValueError(f"Unsupported migration method '{args.force_method}'") from e
13351366

13361367
if args.validate:
1337-
pg_mig.validate()
1368+
dbs_max_total_size = None if args.dbs_max_total_size == -1 else args.dbs_max_total_size
1369+
pg_mig.validate(dbs_max_total_size=dbs_max_total_size)
13381370
else:
13391371
result: PGMigrateResult = pg_mig.migrate(force_method=method)
13401372
print()

test/test_db_size_check.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
from aiven_db_migrate.migrate.pgmigrate import PGMigrate
2+
from test.conftest import PGRunner
3+
from test.utils import random_string
4+
from typing import Tuple
5+
6+
import psycopg2
7+
import pytest
8+
9+
10+
def test_db_size(pg_source_and_target_replication: Tuple[PGRunner, PGRunner]):
    """Exercise ``PGSource.get_size`` for empty, whole-database and table-filtered selections."""
    source, target = pg_source_and_target_replication

    db_name = random_string(6)
    other_db_name = random_string(6)

    source.create_db(dbname=db_name)
    source.create_db(dbname=other_db_name)

    pg_mig = PGMigrate(
        source_conn_info=source.super_conn_info(),
        target_conn_info=target.super_conn_info(),
        verbose=True,
    )

    # Create few tables and insert some data
    tables = [f'table_{i}' for i in range(4)]
    # A tuple (not a set) keeps the order in which the databases are populated deterministic.
    for dbname in (db_name, other_db_name):
        with source.cursor(dbname=dbname) as c:
            for t in tables:
                c.execute(f"DROP TABLE IF EXISTS {t}")
                c.execute(f"CREATE TABLE {t} (foo INT)")
                c.execute(f"INSERT INTO {t} (foo) VALUES (1), (2), (3)")

    # An empty table selection has no size
    size = pg_mig.source.get_size(dbname=db_name, only_tables=[])
    assert size == 0

    # The exact value differs slightly per pg version, but an existing database is never empty
    size = pg_mig.source.get_size(dbname=db_name)
    assert size > 0

    size = pg_mig.source.get_size(dbname=db_name, only_tables=tables)
    assert size == 32768

    size = pg_mig.source.get_size(dbname=db_name, only_tables=tables[:1])
    assert size == 8192

    # Asking for the size of a database that does not exist fails
    with pytest.raises(psycopg2.OperationalError):
        pg_mig.source.get_size(dbname="notfound")

test/test_migrate_checks.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
# Copyright (c) 2021 Aiven, Helsinki, Finland. https://aiven.io/
2+
3+
from aiven_db_migrate.migrate.errors import PGMigrateValidationFailedError
4+
from aiven_db_migrate.migrate.pgmigrate import PGMigrate
5+
from test.conftest import PGRunner
6+
from test.utils import random_string
7+
from typing import Tuple
8+
from unittest.mock import patch
9+
10+
import pytest
11+
12+
13+
def test_dbs_max_total_size_check(pg_source_and_target: Tuple[PGRunner, PGRunner]):
    """
    Verify that ``PGMigrate.validate`` runs the database size check only when a limit is given,
    and that database and table filtering is taken into account by the check.
    """
    source, target = pg_source_and_target
    dbnames = {random_string() for _ in range(3)}

    for dbname in dbnames:
        source.create_db(dbname=dbname)
        target.create_db(dbname=dbname)

    # This DB seems to be created outside the tests
    dbnames.add("postgres")

    # Create few tables and insert some data
    tables = [f'table_{i}' for i in range(4)]
    for dbname in dbnames:
        with source.cursor(dbname=dbname) as c:
            for t in tables:
                c.execute(f"DROP TABLE IF EXISTS {t}")
                c.execute(f"CREATE TABLE {t} (foo INT)")
                c.execute(f"INSERT INTO {t} (foo) VALUES (1), (2), (3)")

    pg_mig = PGMigrate(
        source_conn_info=source.conn_info(),
        target_conn_info=target.conn_info(),
        createdb=False,
        verbose=True,
    )

    # The mock replaces the class attribute but delegates to the real bound method,
    # so the calls can be observed while the actual check still runs.
    with patch(
        "aiven_db_migrate.migrate.pgmigrate.PGMigrate._check_database_size", side_effect=pg_mig._check_database_size
    ) as mock_db_size_check:
        # DB size check is not run
        pg_mig.validate()
        mock_db_size_check.assert_not_called()

        mock_db_size_check.reset_mock()

        # DB size check with max size of zero
        with pytest.raises(PGMigrateValidationFailedError) as e:
            pg_mig.validate(dbs_max_total_size=0)
        # Inspect the raised exception itself, not the ExceptionInfo wrapper
        assert "Databases do not fit to the required maximum size" in str(e.value)
        mock_db_size_check.assert_called_once_with(max_size=0)

        mock_db_size_check.reset_mock()

        # DB size check with enough size
        pg_mig.validate(dbs_max_total_size=1073741824)
        mock_db_size_check.assert_called_once_with(max_size=1073741824)

    # Test with DB name filtering
    pg_mig = PGMigrate(
        source_conn_info=source.conn_info(),
        target_conn_info=target.conn_info(),
        createdb=False,
        verbose=True,
        filtered_db=",".join(dbnames),
    )

    with patch(
        "aiven_db_migrate.migrate.pgmigrate.PGMigrate._check_database_size", side_effect=pg_mig._check_database_size
    ) as mock_db_size_check:
        # Should pass as all DBs are filtered out from size calculations
        pg_mig.validate(dbs_max_total_size=0)
        mock_db_size_check.assert_called_once_with(max_size=0)

    # Test with table filtering

    # Include all tables in "skip_tables"
    pg_mig = PGMigrate(
        source_conn_info=source.conn_info(),
        target_conn_info=target.conn_info(),
        createdb=False,
        verbose=True,
        skip_tables=tables,  # skip all tables
    )

    with patch(
        "aiven_db_migrate.migrate.pgmigrate.PGMigrate._check_database_size", side_effect=pg_mig._check_database_size
    ) as mock_db_size_check:
        # Should pass as all tables are filtered out from size calculations
        pg_mig.validate(dbs_max_total_size=0)
        mock_db_size_check.assert_called_once_with(max_size=0)

    # Only the first table is included
    pg_mig = PGMigrate(
        source_conn_info=source.conn_info(),
        target_conn_info=target.conn_info(),
        createdb=False,
        verbose=True,
        with_tables=tables[:1],  # include only one table
    )
    with patch(
        "aiven_db_migrate.migrate.pgmigrate.PGMigrate._check_database_size", side_effect=pg_mig._check_database_size
    ) as mock_db_size_check:
        # This fails as one table is included in check and it should have data
        with pytest.raises(PGMigrateValidationFailedError) as e:
            pg_mig.validate(dbs_max_total_size=0)
        # Inspect the raised exception itself, not the ExceptionInfo wrapper
        assert "Databases do not fit to the required maximum size" in str(e.value)
        mock_db_size_check.assert_called_once_with(max_size=0)

        # Should easily fit
        pg_mig.validate(dbs_max_total_size=1073741824)

0 commit comments

Comments
 (0)