Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions lib/pgslice/cli/swap.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,27 @@ def swap(table)
assert_table(intermediate_table)
assert_no_table(retired_table)

queries = [
"ALTER TABLE #{quote_table(table)} RENAME TO #{quote_no_schema(retired_table)};",
"ALTER TABLE #{quote_table(intermediate_table)} RENAME TO #{quote_no_schema(table)};"
]
queries = []

# Set lock timeout
queries << "SET LOCAL lock_timeout = #{quote(options[:lock_timeout])};"

# Drop the mirror trigger created by enable_mirroring before swap
queries.concat(disable_mirroring_trigger_queries(table))

# Swap the tables
queries << "ALTER TABLE #{quote_table(table)} RENAME TO #{quote_no_schema(retired_table)};"
queries << "ALTER TABLE #{quote_table(intermediate_table)} RENAME TO #{quote_no_schema(table)};"

# Update sequence ownership
table.sequences.each do |sequence|
queries << "ALTER SEQUENCE #{quote_ident(sequence["sequence_schema"])}.#{quote_ident(sequence["sequence_name"])} OWNED BY #{quote_table(table)}.#{quote_ident(sequence["related_column"])};"
end

queries.unshift("SET LOCAL lock_timeout = #{quote(options[:lock_timeout])};")
# Create the retired mirroring trigger after swap
# Note: After swap, table.name refers to the new main table (formerly intermediate)
# and retired_table refers to the old main table (formerly the original table)
queries.concat(enable_retired_mirroring_trigger_queries(table))

run_queries(queries)
end
Expand Down
13 changes: 9 additions & 4 deletions lib/pgslice/cli/unswap.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@ def unswap(table)
assert_table(retired_table)
assert_no_table(intermediate_table)

queries = [
"ALTER TABLE #{quote_table(table)} RENAME TO #{quote_no_schema(intermediate_table)};",
"ALTER TABLE #{quote_table(retired_table)} RENAME TO #{quote_no_schema(table)};"
]
queries = []

# Drop the retired mirroring trigger before unswap
queries.concat(disable_retired_mirroring_trigger_queries(table))

# Swap the tables back
queries << "ALTER TABLE #{quote_table(table)} RENAME TO #{quote_no_schema(intermediate_table)};"
queries << "ALTER TABLE #{quote_table(retired_table)} RENAME TO #{quote_no_schema(table)};"

# Update sequence ownership
table.sequences.each do |sequence|
queries << "ALTER SEQUENCE #{quote_ident(sequence["sequence_schema"])}.#{quote_ident(sequence["sequence_name"])} OWNED BY #{quote_table(table)}.#{quote_ident(sequence["related_column"])};"
end
Expand Down
87 changes: 61 additions & 26 deletions lib/pgslice/helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,30 @@ def make_stat_def(stat_def, table)

# mirroring triggers

def enable_mirroring_triggers(table)
def disable_mirroring_trigger_queries(table)
function_name = "#{table.name}_mirror_to_intermediate"
trigger_name = "#{table.name}_mirror_trigger"

queries = []

# drop trigger
queries << <<~SQL
DROP TRIGGER IF EXISTS #{quote_ident(trigger_name)} ON #{quote_table(table)};
SQL

# drop function
queries << <<~SQL
DROP FUNCTION IF EXISTS #{quote_ident(function_name)}();
SQL

queries
end

def disable_mirroring_triggers(table)
run_queries(disable_mirroring_trigger_queries(table))
end

def enable_mirroring_trigger_queries(table)
intermediate_table = table.intermediate_table
function_name = "#{table.name}_mirror_to_intermediate"
trigger_name = "#{table.name}_mirror_trigger"
Expand All @@ -356,25 +379,43 @@ def enable_mirroring_triggers(table)
$$ LANGUAGE plpgsql;
SQL

# create trigger
# drop trigger if exists, then create
queries << <<~SQL
DROP TRIGGER IF EXISTS #{quote_ident(trigger_name)} ON #{quote_table(table)};
SQL

queries << <<~SQL
CREATE TRIGGER #{quote_ident(trigger_name)}
AFTER INSERT OR UPDATE OR DELETE ON #{quote_table(table)}
FOR EACH ROW EXECUTE FUNCTION #{quote_ident(function_name)}();
SQL

run_queries(queries)
queries
end

def enable_mirroring_triggers(table)
run_queries(enable_mirroring_trigger_queries(table))
end

# retired mirroring triggers

def enable_retired_mirroring_triggers(table)
def enable_retired_mirroring_trigger_queries(table)
retired_table = table.retired_table
function_name = "#{table.name}_mirror_to_retired"
trigger_name = "#{table.name}_retired_mirror_trigger"

queries = []

# Build ON CONFLICT clause for INSERT
primary_keys = table.primary_key
conflict_clause = if primary_keys && primary_keys.any?
conflict_target = primary_keys.map { |pk| quote_ident(pk) }.join(", ")
"ON CONFLICT (#{conflict_target}) DO UPDATE SET #{mirror_set_clause(table)}"
else
# If no primary key, use DO NOTHING to avoid conflicts
"ON CONFLICT DO NOTHING"
end

# create mirror function
queries << <<~SQL
CREATE OR REPLACE FUNCTION #{quote_ident(function_name)}()
Expand All @@ -387,41 +428,31 @@ def enable_retired_mirroring_triggers(table)
UPDATE #{quote_table(retired_table)} SET #{mirror_set_clause(table)} WHERE #{mirror_where_clause(table, 'OLD')};
RETURN NEW;
ELSIF TG_OP = 'INSERT' THEN
INSERT INTO #{quote_table(retired_table)} (#{mirror_column_list(table)}) VALUES (#{mirror_new_tuple_list(table)});
INSERT INTO #{quote_table(retired_table)} (#{mirror_column_list(table)}) VALUES (#{mirror_new_tuple_list(table)})
#{conflict_clause};
RETURN NEW;
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
SQL

# create trigger
# drop trigger if exists, then create
queries << <<~SQL
DROP TRIGGER IF EXISTS #{quote_ident(trigger_name)} ON #{quote_table(table)};
SQL

queries << <<~SQL
CREATE TRIGGER #{quote_ident(trigger_name)}
AFTER INSERT OR UPDATE OR DELETE ON #{quote_table(table)}
FOR EACH ROW EXECUTE FUNCTION #{quote_ident(function_name)}();
SQL

run_queries(queries)
queries
end

def disable_mirroring_triggers(table)
function_name = "#{table.name}_mirror_to_intermediate"
trigger_name = "#{table.name}_mirror_trigger"

queries = []

# drop trigger
queries << <<~SQL
DROP TRIGGER IF EXISTS #{quote_ident(trigger_name)} ON #{quote_table(table)};
SQL

# drop function
queries << <<~SQL
DROP FUNCTION IF EXISTS #{quote_ident(function_name)}();
SQL

run_queries(queries)
def enable_retired_mirroring_triggers(table)
run_queries(enable_retired_mirroring_trigger_queries(table))
end

def mirror_where_clause(table, record)
Expand All @@ -446,7 +477,7 @@ def mirror_new_tuple_list(table)
table.columns.map { |column| "NEW.#{quote_ident(column)}" }.join(", ")
end

def disable_retired_mirroring_triggers(table)
def disable_retired_mirroring_trigger_queries(table)
function_name = "#{table.name}_mirror_to_retired"
trigger_name = "#{table.name}_retired_mirror_trigger"

Expand All @@ -462,7 +493,11 @@ def disable_retired_mirroring_triggers(table)
DROP FUNCTION IF EXISTS #{quote_ident(function_name)}();
SQL

run_queries(queries)
queries
end

def disable_retired_mirroring_triggers(table)
run_queries(disable_retired_mirroring_trigger_queries(table))
end
end
end
62 changes: 61 additions & 1 deletion test/pgslice_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,55 @@ def test_swap_missing_table
assert_error "Table not found", "swap Items"
end

def test_swap_creates_retired_mirroring_trigger
run_command "prep Posts --no-partition"
run_command "fill Posts"

# Verify no retired mirroring trigger exists before swap
trigger_result = execute <<~SQL, [quote_ident("Posts")]
SELECT tgname FROM pg_trigger
WHERE tgname = 'Posts_retired_mirror_trigger'
AND tgrelid = $1::regclass
SQL
assert !trigger_result.any?, "Retired mirror trigger should not exist before swap"

run_command "swap Posts"
assert table_exists?("Posts_retired")

# Verify retired mirroring trigger exists after swap
trigger_result = execute <<~SQL, [quote_ident("Posts")]
SELECT tgname FROM pg_trigger
WHERE tgname = 'Posts_retired_mirror_trigger'
AND tgrelid = $1::regclass
SQL
assert trigger_result.any?, "Retired mirror trigger should exist after swap"

# Verify function exists
function_result = execute <<~SQL
SELECT proname FROM pg_proc
WHERE proname = 'Posts_mirror_to_retired'
SQL
assert function_result.any?, "Retired mirror function should exist after swap"

# Test that the trigger works - insert into main table and verify it's mirrored to retired
initial_retired_count = count("Posts_retired")
initial_main_count = count("Posts")

# Insert a new row into the main table
now = Time.now.utc
execute %!INSERT INTO "Posts" ("createdAt") VALUES ($1)!, [now.iso8601]

# Verify it was inserted into main table
assert_equal initial_main_count + 1, count("Posts")

# Verify it was mirrored to retired table (or updated if it already existed)
# The retired table should have at least the same number of rows or more
assert count("Posts_retired") >= initial_retired_count, "Retired table should have been updated"

run_command "unswap Posts"
run_command "unprep Posts"
end

def test_unswap_missing_table
assert_error "Table not found", "unswap Items"
end
Expand Down Expand Up @@ -178,9 +227,13 @@ def test_enable_retired_mirroring
run_command "swap Posts"
assert table_exists?("Posts_retired")

# Retired mirroring trigger is automatically created during swap
# Verify it exists by checking that we can disable it
run_command "disable_retired_mirroring Posts", expected_stderr: /Retired mirroring triggers disabled for Posts/

# Now enable it again
run_command "enable_retired_mirroring Posts", expected_stderr: /Retired mirroring triggers enabled for Posts/

run_command "disable_retired_mirroring Posts", expected_stderr: /Retired mirroring triggers disabled for Posts/
run_command "unswap Posts"
run_command "unprep Posts"
end
Expand All @@ -195,7 +248,14 @@ def test_disable_retired_mirroring
run_command "swap Posts"
assert table_exists?("Posts_retired")

# Retired mirroring trigger is automatically created during swap
# Disable it
run_command "disable_retired_mirroring Posts", expected_stderr: /Retired mirroring triggers disabled for Posts/

# Re-enable it
run_command "enable_retired_mirroring Posts", expected_stderr: /Retired mirroring triggers enabled for Posts/

# Disable it again
run_command "disable_retired_mirroring Posts", expected_stderr: /Retired mirroring triggers disabled for Posts/

run_command "unswap Posts"
Expand Down