Skip to content

Commit a39f76f

Browse files
authored
switch node names to machine ID (#251)
* WIP switch node names to machine ID * fix replica clone * updates * fixes * TODO * fix application_name on replica startup * move to function * reuse code * upgrade primary * clean up the diff * fix tests * silence warning * missed a few * fix deepsource callout * make restart-repmgrd more resilient * make pg_unregister work with new names * Accept migration failures * add missing panic * update pg_unregister comment * remove old comment
1 parent 7a8ac4d commit a39f76f

File tree

9 files changed

+220
-47
lines changed

9 files changed

+220
-47
lines changed

Diff for: bin/restart-repmgrd

+8-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
11
#!/bin/bash
22

3-
kill `cat /tmp/repmgrd.pid`
3+
if [ -f /tmp/repmgrd.pid ]; then
4+
PID=$(cat /tmp/repmgrd.pid)
5+
6+
# Check if the process is running
7+
if ps -p $PID > /dev/null 2>&1; then
8+
kill $PID
9+
fi
10+
fi

Diff for: cmd/pg_unregister/main.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"context"
55
"encoding/base64"
6+
"errors"
67
"fmt"
78
"log"
89
"os"
@@ -44,7 +45,15 @@ func processUnregistration(ctx context.Context) error {
4445
defer func() { _ = conn.Close(ctx) }()
4546

4647
member, err := node.RepMgr.MemberByHostname(ctx, conn, string(hostnameBytes))
47-
if err != nil {
48+
if errors.Is(err, pgx.ErrNoRows) {
49+
// for historical reasons, old versions of flyctl passes in the 6pn as the hostname
50+
// most likely this won't work because the hostname does not resolve if the machine is stopped,
51+
// but we try anyway
52+
member, err = node.RepMgr.MemberBy6PN(ctx, conn, string(hostnameBytes))
53+
if err != nil {
54+
return fmt.Errorf("failed to resolve member by hostname and 6pn: %s", err)
55+
}
56+
} else if err != nil {
4857
return fmt.Errorf("failed to resolve member: %s", err)
4958
}
5059

Diff for: go.mod

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ require (
88
github.com/hashicorp/consul/api v1.18.0
99
github.com/jackc/pgconn v1.14.3
1010
github.com/jackc/pgx/v5 v5.5.4
11+
github.com/olekukonko/tablewriter v0.0.5
1112
github.com/pkg/errors v0.9.1
1213
github.com/pkg/term v1.1.0
14+
github.com/spf13/cobra v1.8.1
1315
github.com/superfly/fly-checks v0.0.0-20230510154016-d189351293f2
1416
golang.org/x/exp v0.0.0-20230105202349-8879d0199aa3
1517
golang.org/x/sync v0.1.0
@@ -36,8 +38,6 @@ require (
3638
github.com/mattn/go-runewidth v0.0.9 // indirect
3739
github.com/mitchellh/go-homedir v1.1.0 // indirect
3840
github.com/mitchellh/mapstructure v1.4.1 // indirect
39-
github.com/olekukonko/tablewriter v0.0.5 // indirect
40-
github.com/spf13/cobra v1.8.1 // indirect
4141
github.com/spf13/pflag v1.0.5 // indirect
4242
github.com/stretchr/objx v0.5.0 // indirect
4343
golang.org/x/crypto v0.20.0 // indirect

Diff for: internal/flypg/node.go

+62-3
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@ import (
1616
"github.com/fly-apps/postgres-flex/internal/privnet"
1717
"github.com/fly-apps/postgres-flex/internal/utils"
1818
"github.com/jackc/pgx/v5"
19+
"golang.org/x/exp/slices"
1920
)
2021

2122
type Node struct {
2223
AppName string
24+
MachineID string
2325
PrivateIP string
2426
PrimaryRegion string
2527
DataDir string
@@ -52,6 +54,8 @@ func NewNode() (*Node, error) {
5254

5355
node.PrivateIP = ipv6.String()
5456

57+
node.MachineID = os.Getenv("FLY_MACHINE_ID")
58+
5559
node.PrimaryRegion = os.Getenv("PRIMARY_REGION")
5660
if node.PrimaryRegion == "" {
5761
return nil, fmt.Errorf("PRIMARY_REGION environment variable must be set")
@@ -89,6 +93,7 @@ func NewNode() (*Node, error) {
8993
PasswordConfigPath: "/data/.pgpass",
9094
DataDir: node.DataDir,
9195
PrivateIP: node.PrivateIP,
96+
MachineID: node.MachineID,
9297
Port: 5433,
9398
DatabaseName: "repmgr",
9499
Credentials: node.ReplCredentials,
@@ -265,7 +270,7 @@ func (n *Node) PostInit(ctx context.Context) error {
265270
return fmt.Errorf("failed to resolve member role: %s", err)
266271
}
267272

268-
// Restart repmgrd in the event the IP changes for an already registered node.
273+
// Restart repmgrd in the event the machine ID changes for an already registered node.
269274
// This can happen if the underlying volume is moved to a different node.
270275
daemonRestartRequired := n.RepMgr.daemonRestartRequired(member)
271276

@@ -279,6 +284,8 @@ func (n *Node) PostInit(ctx context.Context) error {
279284
if err := Quarantine(ctx, n, primary); err != nil {
280285
return fmt.Errorf("failed to quarantine failed primary: %s", err)
281286
}
287+
288+
panic(err)
282289
} else if errors.Is(err, ErrZombieDiscovered) {
283290
log.Printf("[ERROR] The majority of registered members agree that '%s' is the real primary.\n", primary)
284291
// Turn member read-only
@@ -292,10 +299,10 @@ func (n *Node) PostInit(ctx context.Context) error {
292299
}
293300

294301
// This should never happen
295-
if primary != n.PrivateIP {
302+
if primary != n.RepMgr.machineIdToDNS(n.MachineID) {
296303
return fmt.Errorf("resolved primary '%s' does not match ourself '%s'. this should not happen",
297304
primary,
298-
n.PrivateIP,
305+
n.RepMgr.machineIdToDNS(n.MachineID),
299306
)
300307
}
301308

@@ -311,6 +318,11 @@ func (n *Node) PostInit(ctx context.Context) error {
311318
}
312319
}
313320
case StandbyRoleName:
321+
if err := n.migrateNodeNameIfNeeded(ctx, repConn); err != nil {
322+
log.Printf("[ERROR] failed to migrate node name: %s", err)
323+
// We try to bring the standby up anyway
324+
}
325+
314326
// Register existing standby to apply any configuration changes.
315327
if err := n.RepMgr.registerStandby(daemonRestartRequired); err != nil {
316328
return fmt.Errorf("failed to register existing standby: %s", err)
@@ -527,3 +539,50 @@ func (n *Node) handleRemoteRestore(ctx context.Context, store *state.Store) erro
527539

528540
return nil
529541
}
542+
543+
// migrate node name from 6pn to machine ID if needed
544+
func (n *Node) migrateNodeNameIfNeeded(ctx context.Context, repConn *pgx.Conn) error {
545+
primary, err := n.RepMgr.PrimaryMember(ctx, repConn)
546+
if err != nil {
547+
return fmt.Errorf("failed to resolve primary member when updating standby: %s", err)
548+
}
549+
550+
primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.Hostname)
551+
if err != nil {
552+
return fmt.Errorf("failed to establish connection to primary: %s", err)
553+
}
554+
defer func() { _ = primaryConn.Close(ctx) }()
555+
556+
rows, err := primaryConn.Query(ctx, "select application_name from pg_stat_replication")
557+
if err != nil {
558+
return fmt.Errorf("failed to query pg_stat_replication: %s", err)
559+
}
560+
defer rows.Close()
561+
562+
var applicationNames []string
563+
for rows.Next() {
564+
var applicationName string
565+
if err := rows.Scan(&applicationName); err != nil {
566+
return fmt.Errorf("failed to scan application_name: %s", err)
567+
}
568+
applicationNames = append(applicationNames, applicationName)
569+
}
570+
if err := rows.Err(); err != nil {
571+
return fmt.Errorf("failed to iterate over rows: %s", err)
572+
}
573+
574+
// if we find our 6pn as application_name, we need to regenerate postgresql.auto.conf and reload postgresql
575+
if slices.Contains(applicationNames, n.PrivateIP) {
576+
log.Printf("pg_stat_replication on the primary has our ipv6 address as application_name, converting to machine ID...")
577+
578+
if err := n.RepMgr.regenReplicationConf(ctx); err != nil {
579+
return fmt.Errorf("failed to clone standby: %s", err)
580+
}
581+
582+
if err := admin.ReloadPostgresConfig(ctx, repConn); err != nil {
583+
return fmt.Errorf("failed to reload postgresql: %s", err)
584+
}
585+
}
586+
587+
return nil
588+
}

Diff for: internal/flypg/readonly.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error {
7070

7171
for _, member := range members {
7272
if member.Role == PrimaryRoleName {
73-
endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, target)
73+
endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, target)
7474
resp, err := http.Get(endpoint)
7575
if err != nil {
7676
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.Hostname, err)
@@ -85,7 +85,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error {
8585
}
8686

8787
for _, member := range members {
88-
endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, RestartHaproxyEndpoint)
88+
endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, RestartHaproxyEndpoint)
8989
resp, err := http.Get(endpoint)
9090
if err != nil {
9191
log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.Hostname, err)

Diff for: internal/flypg/repmgr.go

+81-10
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ type RepMgr struct {
3434
PrimaryRegion string
3535
Region string
3636
PrivateIP string
37+
MachineID string
3738
DataDir string
3839
DatabaseName string
3940
Credentials admin.Credential
@@ -161,10 +162,12 @@ func (r *RepMgr) setDefaults() error {
161162
return err
162163
}
163164

165+
hostname := r.machineIdToDNS(r.MachineID)
166+
164167
conf := ConfigMap{
165168
"node_id": nodeID,
166-
"node_name": fmt.Sprintf("'%s'", r.PrivateIP),
167-
"conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=5'", r.PrivateIP, r.Port, r.Credentials.Username, r.DatabaseName),
169+
"node_name": fmt.Sprintf("'%s'", hostname),
170+
"conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=5'", hostname, r.Port, r.Credentials.Username, r.DatabaseName),
168171
"data_directory": fmt.Sprintf("'%s'", r.DataDir),
169172
"failover": "'automatic'",
170173
"use_replication_slots": "yes",
@@ -276,7 +279,7 @@ func (*RepMgr) restartDaemon() error {
276279
}
277280

278281
func (r *RepMgr) daemonRestartRequired(m *Member) bool {
279-
return m.Hostname != r.PrivateIP
282+
return m.Hostname != r.MachineID
280283
}
281284

282285
func (r *RepMgr) unregisterWitness(id int) error {
@@ -301,14 +304,14 @@ func (r *RepMgr) rejoinCluster(hostname string) error {
301304
return err
302305
}
303306

304-
func (r *RepMgr) clonePrimary(ipStr string) error {
307+
func (r *RepMgr) clonePrimary(hostname string) error {
305308
cmdStr := fmt.Sprintf("mkdir -p %s", r.DataDir)
306309
if _, err := utils.RunCommand(cmdStr, "postgres"); err != nil {
307310
return fmt.Errorf("failed to create pg directory: %s", err)
308311
}
309312

310313
cmdStr = fmt.Sprintf("repmgr -h %s -p %d -d %s -U %s -f %s standby clone -c -F",
311-
ipStr,
314+
hostname,
312315
r.Port,
313316
r.DatabaseName,
314317
r.Credentials.Username,
@@ -322,6 +325,21 @@ func (r *RepMgr) clonePrimary(ipStr string) error {
322325
return nil
323326
}
324327

328+
func (r *RepMgr) regenReplicationConf(ctx context.Context) error {
329+
// TODO: do we need -c?
330+
if _, err := utils.RunCmd(ctx, "postgres",
331+
"repmgr", "--replication-conf-only",
332+
"-h", "",
333+
"-p", fmt.Sprint(r.Port),
334+
"-d", r.DatabaseName,
335+
"-U", r.Credentials.Username,
336+
"-f", r.ConfigPath,
337+
"standby", "clone", "-F"); err != nil {
338+
return fmt.Errorf("failed to regenerate replication conf: %s", err)
339+
}
340+
return nil
341+
}
342+
325343
type Member struct {
326344
ID int
327345
Hostname string
@@ -431,26 +449,56 @@ func (*RepMgr) MemberByHostname(ctx context.Context, pg *pgx.Conn, hostname stri
431449
return &member, nil
432450
}
433451

452+
// MemberBy6PN returns a member by its 6PN address.
453+
func (r *RepMgr) MemberBy6PN(ctx context.Context, pg *pgx.Conn, ip string) (*Member, error) {
454+
members, err := r.Members(ctx, pg)
455+
if err != nil {
456+
return nil, err
457+
}
458+
459+
resolver := privnet.GetResolver()
460+
var lastErr error
461+
for _, member := range members {
462+
ips, err := resolver.LookupIPAddr(ctx, member.Hostname)
463+
if err != nil {
464+
lastErr = err
465+
continue
466+
}
467+
468+
for _, addr := range ips {
469+
if addr.IP.String() == ip {
470+
return &member, nil
471+
}
472+
}
473+
}
474+
475+
if lastErr != nil {
476+
return nil, fmt.Errorf("no matches found for %s, and error encountered: %s", ip, lastErr)
477+
}
478+
479+
return nil, nil
480+
}
481+
434482
func (r *RepMgr) ResolveMemberOverDNS(ctx context.Context) (*Member, error) {
435-
ips, err := r.InRegionPeerIPs(ctx)
483+
machineIds, err := r.InRegionPeerMachines(ctx)
436484
if err != nil {
437485
return nil, err
438486
}
439487

440488
var target *Member
441489

442-
for _, ip := range ips {
443-
if ip.String() == r.PrivateIP {
490+
for _, machineId := range machineIds {
491+
if machineId == r.MachineID {
444492
continue
445493
}
446494

447-
conn, err := r.NewRemoteConnection(ctx, ip.String())
495+
conn, err := r.NewRemoteConnection(ctx, r.machineIdToDNS(machineId))
448496
if err != nil {
449497
continue
450498
}
451499
defer func() { _ = conn.Close(ctx) }()
452500

453-
member, err := r.MemberByHostname(ctx, conn, ip.String())
501+
member, err := r.MemberByHostname(ctx, conn, r.machineIdToDNS(machineId))
454502
if err != nil {
455503
continue
456504
}
@@ -477,6 +525,21 @@ func (r *RepMgr) InRegionPeerIPs(ctx context.Context) ([]net.IPAddr, error) {
477525
return privnet.AllPeers(ctx, targets)
478526
}
479527

528+
func (r *RepMgr) InRegionPeerMachines(ctx context.Context) ([]string, error) {
529+
machines, err := privnet.AllMachines(ctx, r.AppName)
530+
if err != nil {
531+
return nil, err
532+
}
533+
534+
var machineIDs []string
535+
for _, machine := range machines {
536+
if machine.Region == r.PrimaryRegion {
537+
machineIDs = append(machineIDs, machine.Id)
538+
}
539+
}
540+
return machineIDs, nil
541+
}
542+
480543
func (r *RepMgr) HostInRegion(ctx context.Context, hostname string) (bool, error) {
481544
ips, err := r.InRegionPeerIPs(ctx)
482545
if err != nil {
@@ -514,3 +577,11 @@ func (r *RepMgr) UnregisterMember(member Member) error {
514577
func (r *RepMgr) eligiblePrimary() bool {
515578
return r.Region == r.PrimaryRegion
516579
}
580+
581+
func (r *RepMgr) machineIdToDNS(nodeName string) string {
582+
if len(nodeName) != 14 {
583+
panic("invalid machine id")
584+
}
585+
586+
return fmt.Sprintf("%s.vm.%s.internal", nodeName, r.AppName)
587+
}

Diff for: internal/flypg/repmgr_test.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ func TestRepmgrInitialization(t *testing.T) {
3333
UserConfigPath: repgmrUserConfigFilePath,
3434
PasswordConfigPath: repgmrPasswordConfigFilePath,
3535
DataDir: repmgrTestDirectory,
36+
MachineID: "abcdefg1234567",
3637
PrivateIP: "127.0.0.1",
3738
Credentials: admin.Credential{
3839
Username: "user",
@@ -91,8 +92,8 @@ func TestRepmgrInitialization(t *testing.T) {
9192
t.Fatal(err)
9293
}
9394

94-
if config["node_name"] != "'127.0.0.1'" {
95-
t.Fatalf("expected node_name to be '127.0.0.1', got %v", config["node_name"])
95+
if config["node_name"] != "'abcdefg1234567.vm.test-app.internal'" {
96+
t.Fatalf("expected node_name to be 'abcdefg1234567.vm.test-app.internal', got %v", config["node_name"])
9697
}
9798

9899
if config["location"] != "'dev'" {
@@ -122,6 +123,7 @@ func TestRepmgrNodeIDGeneration(t *testing.T) {
122123

123124
DataDir: repmgrTestDirectory,
124125
PrivateIP: "127.0.0.1",
126+
MachineID: "abcdefg1234567",
125127
Port: 5433,
126128
DatabaseName: "repmgr",
127129
Credentials: admin.Credential{

0 commit comments

Comments
 (0)