Skip to content

Commit ad8d0f0

Browse files
authored
Merge pull request #175 from tanmoysrt/innodb_fts_repair
fix(physical-restore): InnoDB FTS Index Repair
2 parents 1bdd9e3 + 834e27d commit ad8d0f0

File tree

1 file changed

+120
-64
lines changed

1 file changed

+120
-64
lines changed

agent/database_physical_restore.py

+120-64
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import re
66
import shutil
77
import subprocess
8+
from contextlib import suppress
89

910
from agent.base import AgentException
1011
from agent.database import CustomPeeweeDB
@@ -76,6 +77,7 @@ def create_restore_job(self):
7677
self.hold_write_lock_on_myisam_tables()
7778
self.perform_myisam_file_operations()
7879
self.unlock_all_tables()
80+
self._close_db_connections()
7981
self.perform_post_restoration_validation_and_fixes()
8082

8183
@step("Validate Backup Files")
@@ -281,25 +283,29 @@ def hold_write_lock_on_myisam_tables(self):
281283
def perform_myisam_file_operations(self):
282284
self._perform_file_operations(engine="myisam")
283285

286+
@step("Unlock All Tables")
def unlock_all_tables(self):
    """
    Issue `UNLOCK TABLES` on both target-database connections.

    The regular connection is handled first, then the one kept for MyISAM tables.
    """
    release_statement = "UNLOCK TABLES;"
    for obtain_connection in (self._get_target_db, self._get_target_db_for_myisam):
        obtain_connection().execute_sql(release_statement)
290+
284291
@step("Validate And Fix Tables")
285292
def perform_post_restoration_validation_and_fixes(self):
286-
innodb_tables_with_fts = self.get_innodb_tables_with_fts_index()
293+
innodb_tables_with_fts = self._get_innodb_tables_with_fts_index()
287294
"""
288295
FLUSH TABLES ... FOR EXPORT does not support FULLTEXT indexes.
289296
https://dev.mysql.com/doc/refman/8.4/en/innodb-table-import.html#:~:text=in%20the%20operation.-,Limitations,-The%20Transportable%20Tablespaces
290297
291-
We can either drop + add index.
292-
Or, run `OPTIMIZE TABLE` on the table to rebuild the index.
293-
https://mariadb.com/kb/en/optimize-table/#updating-an-innodb-fulltext-index
298+
Need to drop all fulltext indexes of InnoDB tables.
299+
Then, optimize table to fix existing corruptions and rebuild table (if needed).
300+
Then, recreate the fulltext indexes.
294301
"""
295302

296303
for table in innodb_tables_with_fts:
297304
"""
298305
No need to spend time checking whether the index is corrupted or not,
299306
because physical restoration does not work for FULLTEXT indexes.
300307
"""
301-
if not self.repair_table(table, "innodb"):
302-
raise Exception(f"Failed to repair table {table}")
308+
self.recreate_fts_indexes(table)
303309

304310
"""
305311
MyISAM table corruption can generally happen due to a mismatch in the number of records in the MYD file.
@@ -312,14 +318,9 @@ def perform_post_restoration_validation_and_fixes(self):
312318
https://dev.mysql.com/doc/refman/8.4/en/myisam-repair.html
313319
"""
314320
for table in self.myisam_tables:
315-
if self.is_table_corrupted(table) and not self.repair_table(table, "myisam"):
321+
if self.is_table_corrupted(table) and not self.repair_myisam_table(table):
316322
raise Exception(f"Failed to repair table {table}")
317323

318-
@step("Unlock All Tables")
319-
def unlock_all_tables(self):
320-
self._get_target_db().execute_sql("UNLOCK TABLES;")
321-
self._get_target_db_for_myisam().execute_sql("UNLOCK TABLES;")
322-
323324
def _warmup_files(self, file_paths: list[str]):
324325
"""
325326
Once the snapshot is converted to disk and attached to the instance,
@@ -354,43 +355,6 @@ def _perform_file_operations(self, engine: str):
354355
os.path.join(self.target_db_directory, file),
355356
)
356357

357-
def _get_target_db(self) -> CustomPeeweeDB:
358-
if self._target_db_instance is not None:
359-
if not self._target_db_instance.is_connection_usable():
360-
raise DatabaseConnectionClosedWithDatabase()
361-
return self._target_db_instance
362-
363-
self._target_db_instance = CustomPeeweeDB(
364-
self.target_db,
365-
user=self.target_db_user,
366-
password=self.target_db_password,
367-
host=self.target_db_host,
368-
port=self.target_db_port,
369-
)
370-
self._target_db_instance.connect()
371-
# Set session wait timeout to 4 hours [EXPERIMENTAL]
372-
self._target_db_instance.execute_sql("SET SESSION wait_timeout = 14400;")
373-
return self._target_db_instance
374-
375-
def _get_target_db_for_myisam(self) -> CustomPeeweeDB:
376-
if self._target_db_instance_for_myisam is not None:
377-
if not self._target_db_instance_for_myisam.is_connection_usable():
378-
raise DatabaseConnectionClosedWithDatabase()
379-
return self._target_db_instance_for_myisam
380-
381-
self._target_db_instance_for_myisam = CustomPeeweeDB(
382-
self.target_db,
383-
user=self.target_db_user,
384-
password=self.target_db_password,
385-
host=self.target_db_host,
386-
port=self.target_db_port,
387-
autocommit=False,
388-
)
389-
self._target_db_instance_for_myisam.connect()
390-
# Set session wait timeout to 4 hours [EXPERIMENTAL]
391-
self._target_db_instance_for_myisam.execute_sql("SET SESSION wait_timeout = 14400;")
392-
return self._target_db_instance_for_myisam
393-
394358
def is_table_need_to_be_restored(self, table_name: str) -> bool:
395359
if not self.restore_specific_tables:
396360
return True
@@ -427,7 +391,10 @@ def get_drop_table_statement(self, table_name) -> str:
427391
return f"DROP TABLE IF EXISTS `{table_name}`;"
428392

429393
def is_table_corrupted(self, table_name: str) -> bool:
430-
result = run_sql_query(self._get_target_db(), f"CHECK TABLE `{table_name}` QUICK;")
394+
result = run_sql_query(
395+
self._get_target_db(raise_error_on_connection_closed=False),
396+
f"CHECK TABLE `{table_name}` QUICK;",
397+
)
431398
"""
432399
+-----------------------------------+-------+----------+------------------------------------------------------+
433400
| Table | Op | Msg_type | Msg_text |
@@ -452,18 +419,16 @@ def is_table_corrupted(self, table_name: str) -> bool:
452419
break
453420
return isError
454421

455-
def repair_table(self, table_name: str, engine: str) -> bool:
456-
if engine == "innodb":
457-
result = run_sql_query(self._get_target_db(), f"OPTIMIZE TABLE `{table_name}`;")
458-
elif engine == "myisam":
459-
result = run_sql_query(self._get_target_db(), f"REPAIR TABLE `{table_name}` USE_FRM;")
460-
else:
461-
raise Exception(f"Engine {engine} is not supported")
422+
def repair_myisam_table(self, table_name: str) -> bool:
423+
result = run_sql_query(
424+
self._get_target_db(raise_error_on_connection_closed=False),
425+
f"REPAIR TABLE `{table_name}` USE_FRM;",
426+
)
462427
"""
463428
+---------------------------------------------------+--------+----------+----------+
464429
| Table | Op | Msg_type | Msg_text |
465430
+---------------------------------------------------+--------+----------+----------+
466-
| _8edd549f4b072174.tabInsights Query Execution Log | repair | status | OK |
431+
| _8edd549f4b072174.tabInsights Query Execution Log | repair | status | OK |
467432
+---------------------------------------------------+--------+----------+----------+
468433
469434
Msg Type can be status, error, info, note, or warning
@@ -476,9 +441,27 @@ def repair_table(self, table_name: str, engine: str) -> bool:
476441

477442
return not isErrorOccurred
478443

479-
def get_innodb_tables_with_fts_index(self):
444+
def recreate_fts_indexes(self, table: str):
    """
    Drop every FULLTEXT index of `table`, optimize the table, then add the indexes back.

    Steps, in order:
    1. Read the FULLTEXT indexes of the table (index name -> comma separated column names).
    2. Drop each of those indexes.
    3. Run `OPTIMIZE TABLE` on the table.
    4. Re-add each FULLTEXT index with its original name and columns.

    Every statement fetches the connection with `raise_error_on_connection_closed=False`.

    :param table: name of the table in the target database
    """
    fts_indexes = self._get_fts_indexes_of_table(table)

    for index_name in fts_indexes:
        run_sql_query(
            self._get_target_db(raise_error_on_connection_closed=False),
            f"ALTER TABLE `{table}` DROP INDEX IF EXISTS `{index_name}`;",
        )

    # Optimize table to fix existing corruptions
    run_sql_query(
        self._get_target_db(raise_error_on_connection_closed=False),
        f"OPTIMIZE TABLE `{table}`;",
    )

    # Recreate the indexes
    for index_name, columns in fts_indexes.items():
        # Quote each column on its own so that reserved words or unusual characters in a
        # column name cannot break the statement (table and index names are quoted too).
        # NOTE(review): a column name that itself contains a comma cannot be told apart
        # from the separator used by the lookup query - confirm this never occurs.
        quoted_columns = ", ".join(
            "`{}`".format(column.strip().replace("`", "``")) for column in columns.split(",")
        )
        run_sql_query(
            self._get_target_db(raise_error_on_connection_closed=False),
            f"ALTER TABLE `{table}` ADD FULLTEXT INDEX `{index_name}` ({quoted_columns});",
        )
461+
462+
def _get_innodb_tables_with_fts_index(self):
480463
rows = run_sql_query(
481-
self._get_target_db(),
464+
self._get_target_db(raise_error_on_connection_closed=False),
482465
f"""
483466
SELECT
484467
DISTINCT(t.TABLE_NAME)
@@ -491,16 +474,89 @@ def get_innodb_tables_with_fts_index(self):
491474
WHERE
492475
s.INDEX_TYPE = 'FULLTEXT'
493476
AND t.TABLE_SCHEMA = '{self.target_db}'
494-
AND t.ENGINE = 'InnoDB'
477+
AND t.ENGINE = 'InnoDB';
495478
""",
496479
)
497480
return [row[0] for row in rows]
498481

499-
def __del__(self):
482+
def _get_fts_indexes_of_table(self, table: str) -> dict[str, str]:
    """
    Look up the FULLTEXT indexes of one table in the target database.

    Reads `information_schema.statistics`, grouping by index name.

    :param table: name of the table to inspect
    :return: mapping of index name to its column names, joined with commas
        in `seq_in_index` order
    """
    connection = self._get_target_db(raise_error_on_connection_closed=False)
    fulltext_index_query = f"""
        SELECT
            INDEX_NAME, group_concat(column_name ORDER BY seq_in_index) AS columns
        FROM
            information_schema.statistics
        WHERE
            TABLE_SCHEMA = '{self.target_db}'
            AND TABLE_NAME = '{table}'
            AND INDEX_TYPE = 'FULLTEXT'
        GROUP BY
            INDEX_NAME;
    """
    columns_by_index = {}
    for record in run_sql_query(connection, fulltext_index_query):
        columns_by_index[record[0]] = record[1]
    return columns_by_index
499+
500+
def _get_target_db(self, raise_error_on_connection_closed: bool = True) -> CustomPeeweeDB:
    """
    Return the cached connection to the target database, opening one if needed.

    :param raise_error_on_connection_closed: what to do when a cached connection
        exists but is no longer usable. True (default) raises; False discards the
        dead connection and opens a fresh one.
    :return: a connected `CustomPeeweeDB` for the target database
    :raises DatabaseConnectionClosedWithDatabase: cached connection is unusable and
        `raise_error_on_connection_closed` is True
    """
    cached = self._target_db_instance
    if cached is not None and not is_db_connection_usable(cached):
        if raise_error_on_connection_closed:
            raise DatabaseConnectionClosedWithDatabase()
        # Close the dead connection before dropping the reference so its handle is
        # not leaked; closing a broken connection may itself fail, hence best-effort.
        with suppress(Exception):
            cached.close()
        self._target_db_instance = None

    if self._target_db_instance is not None:
        return self._target_db_instance

    self._target_db_instance = CustomPeeweeDB(
        self.target_db,
        user=self.target_db_user,
        password=self.target_db_password,
        host=self.target_db_host,
        port=self.target_db_port,
    )
    self._target_db_instance.connect()
    # Set session wait timeout to 4 hours [EXPERIMENTAL]
    self._target_db_instance.execute_sql("SET SESSION wait_timeout = 14400;")
    return self._target_db_instance
520+
521+
def _get_target_db_for_myisam(self) -> CustomPeeweeDB:
    """
    Return the dedicated MyISAM connection (opened with `autocommit=False`).

    An existing connection is reused as long as it is still usable. If it has
    become unusable this raises rather than reconnecting.

    :raises DatabaseConnectionClosedWithDatabase: the cached connection is unusable
    """
    existing = self._target_db_instance_for_myisam
    if existing is not None:
        if is_db_connection_usable(existing):
            return existing
        raise DatabaseConnectionClosedWithDatabase()

    # Store the instance on self before connecting, as the original did.
    connection = CustomPeeweeDB(
        self.target_db,
        user=self.target_db_user,
        password=self.target_db_password,
        host=self.target_db_host,
        port=self.target_db_port,
        autocommit=False,
    )
    self._target_db_instance_for_myisam = connection
    connection.connect()
    # Set session wait timeout to 4 hours [EXPERIMENTAL]
    connection.execute_sql("SET SESSION wait_timeout = 14400;")
    return connection
539+
540+
def _close_db_connections(self):
    """
    Close both target-database connections, ignoring any error raised while closing.

    The connection attributes are left as they are (not reset to None).
    """
    for attribute in ("_target_db_instance", "_target_db_instance_for_myisam"):
        # __del__ calls this method; read with a default so an object whose
        # attributes were never assigned does not raise AttributeError here.
        connection = getattr(self, attribute, None)
        if connection is None:
            continue
        # Best-effort: closing an already broken connection may raise.
        with suppress(Exception):
            connection.close()
547+
548+
def __del__(self):
    """Close the database connections when the object is finalized."""
    self._close_db_connections()
550+
551+
552+
def is_db_connection_usable(db: CustomPeeweeDB) -> bool:
    """
    Tell whether `db` can still run queries.

    The connection counts as usable only when `db.is_connection_usable()` is truthy
    and a `SELECT 1;` round trip succeeds. Any exception raised along the way is
    treated as "not usable" instead of being propagated.
    """
    try:
        usable = bool(db.is_connection_usable())
        if usable:
            # The probe query is only sent when the cheap check passed.
            db.execute_sql("SELECT 1;")
    except Exception:
        return False
    return usable
504560

505561

506562
def run_sql_query(db: CustomPeeweeDB, query: str) -> list[str]:

0 commit comments

Comments
 (0)