Skip to content

Commit 3d43c9b

Browse files
authored
Merge pull request #41 from fly-apps/fix-connection-leak-2
Push ticker logic into a separate function to address connection leak
2 parents 2e7cbbb + d32325d commit 3d43c9b

File tree

1 file changed

+42
-35
lines changed

1 file changed

+42
-35
lines changed

cmd/standby_cleaner/main.go

Lines changed: 42 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,6 @@ func main() {
2525
os.Exit(1)
2626
}
2727

28-
// TODO - We should connect using the flypgadmin user so we can differentiate between
29-
// internal admin connection usage and the actual repmgr process.
30-
conn, err := flypgNode.RepMgr.NewLocalConnection(ctx)
31-
if err != nil {
32-
fmt.Printf("failed to open local connection: %s\n", err)
33-
os.Exit(1)
34-
}
35-
defer conn.Close(ctx)
36-
3728
internal, err := flypg.ReadFromFile("/data/flypg.internal.conf")
3829
if err != nil {
3930
fmt.Printf("failed to open config: %s\n", err)
@@ -64,45 +55,61 @@ func main() {
6455
for {
6556
select {
6657
case <-ticker.C:
67-
role, err := flypgNode.RepMgr.CurrentRole(ctx, conn)
68-
if err != nil {
69-
fmt.Printf("Failed to check role: %s\n", err)
70-
continue
58+
if err := handleTick(ctx, flypgNode, seenAt, deadMemberRemovalThreshold); err != nil {
59+
fmt.Println(err)
7160
}
61+
}
62+
}
63+
}
7264

73-
if role != flypg.PrimaryRoleName {
74-
continue
75-
}
65+
func handleTick(ctx context.Context, node *flypg.Node, seenAt map[int]time.Time, deadMemberRemovalThreshold time.Duration) error {
66+
// TODO - We should connect using the flypgadmin user so we can differentiate between
67+
// internal admin connection usage and the actual repmgr process.
68+
conn, err := node.RepMgr.NewLocalConnection(ctx)
69+
if err != nil {
70+
fmt.Printf("failed to open local connection: %s\n", err)
71+
os.Exit(1)
72+
}
73+
defer conn.Close(ctx)
7674

77-
standbys, err := flypgNode.RepMgr.Standbys(ctx, conn)
78-
if err != nil {
79-
fmt.Printf("Failed to query standbys: %s\n", err)
80-
continue
81-
}
75+
role, err := node.RepMgr.CurrentRole(ctx, conn)
76+
if err != nil {
77+
return fmt.Errorf("failed to check role: %s", err)
78+
}
8279

83-
for _, standby := range standbys {
84-
newConn, err := flypgNode.RepMgr.NewRemoteConnection(ctx, standby.Ip)
85-
defer newConn.Close(ctx)
86-
if err != nil {
87-
// TODO - Verify the exception that's getting thrown.
88-
if time.Now().Sub(seenAt[standby.Id]) >= deadMemberRemovalThreshold {
89-
if err := flypgNode.UnregisterMemberByID(ctx, int32(standby.Id)); err != nil {
90-
fmt.Printf("failed to unregister member %d: %v\n", standby.Id, err.Error())
91-
continue
92-
}
80+
if role != flypg.PrimaryRoleName {
81+
return nil
82+
}
9383

94-
delete(seenAt, standby.Id)
95-
}
84+
standbys, err := node.RepMgr.Standbys(ctx, conn)
85+
if err != nil {
86+
return fmt.Errorf("failed to query standbys: %s", err)
87+
}
9688

89+
for _, standby := range standbys {
90+
// Wrap this in a function so connections are properly closed.
91+
sConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.Ip)
92+
if err != nil {
93+
// TODO - Verify the exception that's getting thrown.
94+
if time.Now().Sub(seenAt[standby.Id]) >= deadMemberRemovalThreshold {
95+
if err := node.UnregisterMemberByID(ctx, int32(standby.Id)); err != nil {
96+
fmt.Printf("failed to unregister member %d: %v", standby.Id, err)
9797
continue
9898
}
9999

100-
seenAt[standby.Id] = time.Now()
100+
delete(seenAt, standby.Id)
101101
}
102102

103-
removeOrphanedReplicationSlots(ctx, conn, standbys)
103+
continue
104104
}
105+
defer sConn.Close(ctx)
106+
107+
seenAt[standby.Id] = time.Now()
105108
}
109+
110+
removeOrphanedReplicationSlots(ctx, conn, standbys)
111+
112+
return nil
106113
}
107114

108115
func removeOrphanedReplicationSlots(ctx context.Context, conn *pgx.Conn, standbys []flypg.Standby) {

0 commit comments

Comments
 (0)