From 957ddcba6cfa058dd29f42368ad010dd8baca765 Mon Sep 17 00:00:00 2001 From: Martin Kysel Date: Thu, 6 Feb 2025 12:21:40 -0500 Subject: [PATCH] make stop nice --- pkg/api/metadata/cursorUpdater.go | 19 ++++++++++++++++++- pkg/server/server.go | 12 +++++++++--- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/pkg/api/metadata/cursorUpdater.go b/pkg/api/metadata/cursorUpdater.go index 7a54111a..383c1c97 100644 --- a/pkg/api/metadata/cursorUpdater.go +++ b/pkg/api/metadata/cursorUpdater.go @@ -5,6 +5,7 @@ import ( "database/sql" "github.com/xmtp/xmtpd/pkg/db/queries" "github.com/xmtp/xmtpd/pkg/proto/xmtpv4/envelopes" + "github.com/xmtp/xmtpd/pkg/tracing" "go.uber.org/zap" "sync" "time" @@ -14,12 +15,15 @@ type CursorUpdater interface { GetCursor() *envelopes.Cursor AddSubscriber(clientID string, updateChan chan struct{}) RemoveSubscriber(clientID string) + Stop() } type DBBasedCursorUpdater struct { ctx context.Context log *zap.Logger store *sql.DB + cancel context.CancelFunc + wg sync.WaitGroup cursorMu sync.RWMutex cursor map[uint32]uint64 subscribersMu sync.RWMutex @@ -28,14 +32,23 @@ type DBBasedCursorUpdater struct { func NewCursorUpdater(ctx context.Context, log *zap.Logger, store *sql.DB) CursorUpdater { subscribers := make(map[string][]chan struct{}) + ctx, cancel := context.WithCancel(ctx) cu := DBBasedCursorUpdater{ ctx: ctx, log: log.Named("cursor-updater"), store: store, + cancel: cancel, + wg: sync.WaitGroup{}, subscribers: subscribers, } - go cu.start() + tracing.GoPanicWrap( + cu.ctx, + &cu.wg, + "cursor-updater", + func(ctx context.Context) { + cu.start() + }) return &cu } @@ -126,3 +139,7 @@ func (cu *DBBasedCursorUpdater) RemoveSubscriber(clientID string) { defer cu.subscribersMu.Unlock() delete(cu.subscribers, clientID) } +func (cu *DBBasedCursorUpdater) Stop() { + cu.cancel() + cu.wg.Wait() +} diff --git a/pkg/server/server.go b/pkg/server/server.go index ce7c0171..852b1a9b 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -47,6 +47,7 @@ type ReplicationServer struct { options config.ServerOptions metrics *metrics.Server validationService mlsvalidate.MLSValidationService + cursorUpdater metadata.CursorUpdater } func NewReplicationServer( @@ -183,14 +184,15 @@ func startAPIServer( return err } } - cursorUpdater := metadata.NewCursorUpdater(ctx, log, writerDB) + s.cursorUpdater = metadata.NewCursorUpdater(ctx, log, writerDB) + replicationService, err := message.NewReplicationApiService( ctx, log, s.registrant, writerDB, s.validationService, - cursorUpdater, + s.cursorUpdater, ) if err != nil { return err @@ -202,7 +204,7 @@ func startAPIServer( metadataService, err := metadata.NewMetadataApiService( ctx, log, - cursorUpdater, + s.cursorUpdater, ) if err != nil { return err @@ -281,6 +283,10 @@ func (s *ReplicationServer) Shutdown() { s.nodeRegistry.Stop() } + if s.cursorUpdater != nil { + s.cursorUpdater.Stop() + } + if s.indx != nil { s.indx.Close() }