Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions lib/pgslice/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,27 @@ def initialize(*args)
def version
log("pgslice #{PgSlice::VERSION}")
end

desc "enable_mirroring TABLE", "Enable mirroring triggers for live data changes during partitioning"
def enable_mirroring(table_name)
table = create_table(table_name)
intermediate_table = table.intermediate_table

assert_table(table)
assert_table(intermediate_table)

enable_mirroring_triggers(table)
log("Mirroring triggers enabled for #{table_name}")
end

desc "disable_mirroring TABLE", "Disable mirroring triggers after partitioning is complete"
def disable_mirroring(table_name)
table = create_table(table_name)

assert_table(table)

disable_mirroring_triggers(table)
log("Mirroring triggers disabled for #{table_name}")
end
end
end
58 changes: 58 additions & 0 deletions lib/pgslice/helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,45 @@ def make_stat_def(stat_def, table)
stat_def.sub(/ FROM \S+/, " FROM #{quote_table(table)}").sub(/ STATISTICS .+ ON /, " STATISTICS #{quote_ident(stat_name)} ON ") + ";"
end

# mirroring triggers

def enable_mirroring_triggers(table)
intermediate_table = table.intermediate_table
function_name = "#{table.name}_mirror_to_intermediate"
trigger_name = "#{table.name}_mirror_trigger"

queries = []

# create mirror function
queries << <<~SQL
CREATE OR REPLACE FUNCTION #{quote_ident(function_name)}()
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'DELETE' THEN
DELETE FROM #{quote_table(intermediate_table)} WHERE #{mirror_where_clause(table, 'OLD')};
RETURN OLD;
ELSIF TG_OP = 'UPDATE' THEN
UPDATE #{quote_table(intermediate_table)} SET #{mirror_set_clause(table)} WHERE #{mirror_where_clause(table, 'OLD')};
RETURN NEW;
ELSIF TG_OP = 'INSERT' THEN
INSERT INTO #{quote_table(intermediate_table)} (#{mirror_column_list(table)}) VALUES (#{mirror_new_tuple_list(table)});
RETURN NEW;
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
SQL

# create trigger
queries << <<~SQL
CREATE TRIGGER #{quote_ident(trigger_name)}
AFTER INSERT OR UPDATE OR DELETE ON #{quote_table(table)}
FOR EACH ROW EXECUTE FUNCTION #{quote_ident(function_name)}();
SQL

run_queries(queries)
end

# retired mirroring triggers

def enable_retired_mirroring_triggers(table)
Expand Down Expand Up @@ -366,6 +405,25 @@ def enable_retired_mirroring_triggers(table)
run_queries(queries)
end

def disable_mirroring_triggers(table)
function_name = "#{table.name}_mirror_to_intermediate"
trigger_name = "#{table.name}_mirror_trigger"

queries = []

# drop trigger
queries << <<~SQL
DROP TRIGGER IF EXISTS #{quote_ident(trigger_name)} ON #{quote_table(table)};
SQL

# drop function
queries << <<~SQL
DROP FUNCTION IF EXISTS #{quote_ident(function_name)}();
SQL

run_queries(queries)
end

def mirror_where_clause(table, record)
primary_keys = table.primary_key
if primary_keys && primary_keys.any?
Expand Down
Binary file added pgslice-0.7.1.gem
Binary file not shown.
109 changes: 102 additions & 7 deletions test/pgslice_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def test_synchronize
execute %!UPDATE "Posts" SET "createdAt" = '2020-01-01' WHERE "Id" = 1!

# Run synchronize
run_command "synchronize Posts --window-size 1000", allow_stderr: true
run_command "synchronize Posts --window-size 1000", expected_stderr: /Synchronization complete/

# Verify the difference was fixed
result = execute(%!SELECT "createdAt" FROM "Posts_intermediate" WHERE "Id" = 1!)
Expand All @@ -178,9 +178,9 @@ def test_enable_retired_mirroring
run_command "swap Posts"
assert table_exists?("Posts_retired")

run_command "enable_retired_mirroring Posts", allow_stderr: true
run_command "enable_retired_mirroring Posts", expected_stderr: /Retired mirroring triggers enabled for Posts/

run_command "disable_retired_mirroring Posts", allow_stderr: true
run_command "disable_retired_mirroring Posts", expected_stderr: /Retired mirroring triggers disabled for Posts/
run_command "unswap Posts"
run_command "unprep Posts"
end
Expand All @@ -195,8 +195,8 @@ def test_disable_retired_mirroring
run_command "swap Posts"
assert table_exists?("Posts_retired")

run_command "enable_retired_mirroring Posts", allow_stderr: true
run_command "disable_retired_mirroring Posts", allow_stderr: true
run_command "enable_retired_mirroring Posts", expected_stderr: /Retired mirroring triggers enabled for Posts/
run_command "disable_retired_mirroring Posts", expected_stderr: /Retired mirroring triggers disabled for Posts/

run_command "unswap Posts"
run_command "unprep Posts"
Expand Down Expand Up @@ -309,6 +309,99 @@ def test_ulid_numeric_still_works
run_command "unprep Posts"
end

def test_enable_mirroring_missing_table
assert_error "Table not found", "enable_mirroring Items"
end

def test_enable_mirroring_missing_intermediate_table
assert_error "Table not found", "enable_mirroring Posts"
end

def test_disable_mirroring_missing_table
assert_error "Table not found", "disable_mirroring Items"
end

def test_enable_mirroring
run_command "prep Posts createdAt day"
assert table_exists?("Posts_intermediate")

run_command "enable_mirroring Posts", expected_stderr: /Mirroring triggers enabled for Posts/

# Verify trigger exists
trigger_result = execute <<~SQL, [quote_ident("Posts")]
SELECT tgname FROM pg_trigger
WHERE tgname = 'Posts_mirror_trigger'
AND tgrelid = $1::regclass
SQL
assert trigger_result.any?, "Mirror trigger should exist"

# Verify function exists
function_result = execute <<~SQL
SELECT proname FROM pg_proc
WHERE proname = 'Posts_mirror_to_intermediate'
SQL
assert function_result.any?, "Mirror function should exist"
end

def test_disable_mirroring
run_command "prep Posts createdAt day"
run_command "enable_mirroring Posts", expected_stderr: /Mirroring triggers enabled for Posts/

run_command "disable_mirroring Posts", expected_stderr: /Mirroring triggers disabled for Posts/

# Verify trigger is removed
trigger_result = execute <<~SQL, [quote_ident("Posts")]
SELECT tgname FROM pg_trigger
WHERE tgname = 'Posts_mirror_trigger'
AND tgrelid = $1::regclass
SQL
assert !trigger_result.any?, "Mirror trigger should not exist"

# Verify function is removed
function_result = execute <<~SQL
SELECT proname FROM pg_proc
WHERE proname = 'Posts_mirror_to_intermediate'
SQL
assert !function_result.any?, "Mirror function should not exist"
end

def test_mirroring_triggers_work
run_command "prep Posts createdAt day"
# Add partitions so we can insert data
run_command "add_partitions Posts --intermediate --past 1 --future 1"
run_command "enable_mirroring Posts", expected_stderr: /Mirroring triggers enabled for Posts/

# Create users for foreign key constraints
user1_id = execute(%!INSERT INTO "Users" DEFAULT VALUES RETURNING "Id"!).first["Id"]
user2_id = execute(%!INSERT INTO "Users" DEFAULT VALUES RETURNING "Id"!).first["Id"]

# Initially Posts has data from schema, but Posts_intermediate is empty
initial_posts_count = count("Posts")
assert_equal 0, count("Posts_intermediate")

# Test INSERT mirroring - new row should appear in both tables
now = Time.now.utc
inserted_result = execute %!INSERT INTO "Posts" ("createdAt", "UserId") VALUES ($1, $2) RETURNING "Id"!, [now.iso8601, user1_id]
inserted_id = inserted_result.first["Id"]
assert_equal initial_posts_count + 1, count("Posts")
assert_equal 1, count("Posts_intermediate"), "INSERT should be mirrored to intermediate table"

# Verify the inserted row exists in intermediate table
intermediate_row = execute(%!SELECT * FROM "Posts_intermediate" WHERE "Id" = $1!, [inserted_id]).first
assert intermediate_row, "Inserted row should exist in intermediate table"
assert_equal user1_id.to_s, intermediate_row["UserId"]

# Test UPDATE mirroring - changes should be mirrored
execute %!UPDATE "Posts" SET "UserId" = $1 WHERE "Id" = $2!, [user2_id, inserted_id]
updated_intermediate = execute(%!SELECT "UserId" FROM "Posts_intermediate" WHERE "Id" = $1!, [inserted_id]).first
assert_equal user2_id.to_s, updated_intermediate["UserId"], "UPDATE should be mirrored to intermediate table"

# Test DELETE mirroring - deletion should be mirrored
execute %!DELETE FROM "Posts" WHERE "Id" = $1!, [inserted_id]
assert_equal initial_posts_count, count("Posts")
assert_equal 0, count("Posts_intermediate"), "DELETE should be mirrored to intermediate table"
end

private

def assert_period(period, column: "createdAt", trigger_based: false, tablespace: false, version: nil)
Expand Down Expand Up @@ -442,7 +535,7 @@ def assert_error(message, command)
run_command command, error: message
end

def run_command(command, error: nil, allow_stderr: false)
def run_command(command, error: nil, expected_stderr: nil)
if verbose?
puts "$ pgslice #{command}"
puts
Expand All @@ -456,7 +549,9 @@ def run_command(command, error: nil, allow_stderr: false)
end
if error
assert_match error, stderr
elsif !allow_stderr
elsif expected_stderr
assert_match expected_stderr, stderr
else
assert_equal "", stderr
end
stdout
Expand Down