Skip to content

Commit 0e58cbc

Browse files
authored
Fix slot drop race (#253)
* Address race condition during unregistration process * bug fix
1 parent c6cd12c commit 0e58cbc

File tree

2 files changed

+49
-12
lines changed

2 files changed

+49
-12
lines changed

cmd/pg_unregister/main.go

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@ import (
44
"context"
55
"encoding/base64"
66
"fmt"
7+
"log"
78
"os"
9+
"time"
810

911
"github.com/fly-apps/postgres-flex/internal/flypg"
1012
"github.com/fly-apps/postgres-flex/internal/flypg/admin"
1113
"github.com/fly-apps/postgres-flex/internal/utils"
14+
"github.com/jackc/pgx/v5"
1215
)
1316

1417
func main() {
@@ -49,20 +52,43 @@ func processUnregistration(ctx context.Context) error {
4952
return fmt.Errorf("failed to unregister member: %v", err)
5053
}
5154

52-
slots, err := admin.ListReplicationSlots(ctx, conn)
53-
if err != nil {
54-
return fmt.Errorf("failed to list replication slots: %v", err)
55+
slotName := fmt.Sprintf("repmgr_slot_%d", member.ID)
56+
if err := removeReplicationSlot(ctx, conn, slotName); err != nil {
57+
return err
5558
}
5659

57-
targetSlot := fmt.Sprintf("repmgr_slot_%d", member.ID)
58-
for _, slot := range slots {
59-
if slot.Name == targetSlot {
60-
if err := admin.DropReplicationSlot(ctx, conn, targetSlot); err != nil {
61-
return fmt.Errorf("failed to drop replication slot: %v", err)
60+
return nil
61+
}
62+
63+
func removeReplicationSlot(ctx context.Context, conn *pgx.Conn, slotName string) error {
64+
ticker := time.NewTicker(1 * time.Second)
65+
timeout := time.After(10 * time.Second)
66+
defer ticker.Stop()
67+
for {
68+
select {
69+
case <-ctx.Done():
70+
return ctx.Err()
71+
case <-timeout:
72+
return fmt.Errorf("timed out trying to drop replication slot")
73+
case <-ticker.C:
74+
slot, err := admin.GetReplicationSlot(ctx, conn, slotName)
75+
if err != nil {
76+
if err == pgx.ErrNoRows {
77+
return nil
78+
}
79+
return fmt.Errorf("failed to get replication slot %s: %v", slotName, err)
80+
}
81+
82+
if slot.Active {
83+
log.Printf("Slot %s is still active, waiting...", slotName)
84+
continue
6285
}
63-
break
86+
87+
if err := admin.DropReplicationSlot(ctx, conn, slotName); err != nil {
88+
return fmt.Errorf("failed to drop replication slot %s: %v", slotName, err)
89+
}
90+
91+
return nil
6492
}
6593
}
66-
67-
return nil
6894
}

internal/flypg/admin/admin.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,18 @@ type ReplicationSlot struct {
130130
RetainedWalInBytes int
131131
}
132132

133+
func GetReplicationSlot(ctx context.Context, pg *pgx.Conn, slotName string) (*ReplicationSlot, error) {
134+
sql := fmt.Sprintf("SELECT slot_name, active, wal_status, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_wal FROM pg_replication_slots where slot_name = '%s';", slotName)
135+
row := pg.QueryRow(ctx, sql)
136+
137+
var slot ReplicationSlot
138+
if err := row.Scan(&slot.Name, &slot.Active, &slot.WalStatus, &slot.RetainedWalInBytes); err != nil {
139+
return nil, err
140+
}
141+
142+
return &slot, nil
143+
}
144+
133145
func ListReplicationSlots(ctx context.Context, pg *pgx.Conn) ([]ReplicationSlot, error) {
134146
sql := "SELECT slot_name, active, wal_status, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_wal FROM pg_replication_slots;"
135147
rows, err := pg.Query(ctx, sql)
@@ -167,7 +179,6 @@ func ListReplicationSlots(ctx context.Context, pg *pgx.Conn) ([]ReplicationSlot,
167179

168180
func DropReplicationSlot(ctx context.Context, pg *pgx.Conn, name string) error {
169181
sql := fmt.Sprintf("SELECT pg_drop_replication_slot('%s');", name)
170-
171182
_, err := pg.Exec(ctx, sql)
172183
if err != nil {
173184
return err

0 commit comments

Comments
 (0)