Skip to content

Commit

Permalink
make stop nice
Browse files Browse the repository at this point in the history
  • Loading branch information
mkysel committed Feb 11, 2025
1 parent 4773e24 commit e2c21cf
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
19 changes: 18 additions & 1 deletion pkg/api/metadata/cursorUpdater.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/proto/xmtpv4/envelopes"
"github.com/xmtp/xmtpd/pkg/tracing"
"go.uber.org/zap"
"sync"
"time"
Expand All @@ -14,12 +15,15 @@ type CursorUpdater interface {
GetCursor() *envelopes.Cursor
AddSubscriber(clientID string, updateChan chan struct{})
RemoveSubscriber(clientID string)
Stop()
}

type DBBasedCursorUpdater struct {
ctx context.Context
log *zap.Logger
store *sql.DB
cancel context.CancelFunc
wg sync.WaitGroup
cursorMu sync.RWMutex
cursor map[uint32]uint64
subscribersMu sync.RWMutex
Expand All @@ -28,14 +32,23 @@ type DBBasedCursorUpdater struct {

func NewCursorUpdater(ctx context.Context, log *zap.Logger, store *sql.DB) CursorUpdater {
subscribers := make(map[string][]chan struct{})
ctx, cancel := context.WithCancel(ctx)
cu := DBBasedCursorUpdater{
ctx: ctx,
log: log.Named("cursor-updater"),
store: store,
cancel: cancel,
wg: sync.WaitGroup{},
subscribers: subscribers,
}

go cu.start()
tracing.GoPanicWrap(
cu.ctx,
&cu.wg,
"cursor-updater",
func(ctx context.Context) {
cu.start()
})
return &cu
}

Expand Down Expand Up @@ -126,3 +139,7 @@ func (cu *DBBasedCursorUpdater) RemoveSubscriber(clientID string) {
defer cu.subscribersMu.Unlock()
delete(cu.subscribers, clientID)
}
func (cu *DBBasedCursorUpdater) Stop() {
cu.cancel()
cu.wg.Wait()
}
12 changes: 9 additions & 3 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type ReplicationServer struct {
options config.ServerOptions
metrics *metrics.Server
validationService mlsvalidate.MLSValidationService
cursorUpdater metadata.CursorUpdater
}

func NewReplicationServer(
Expand Down Expand Up @@ -183,14 +184,15 @@ func startAPIServer(
return err
}
}
cursorUpdater := metadata.NewCursorUpdater(ctx, log, writerDB)
s.cursorUpdater = metadata.NewCursorUpdater(ctx, log, writerDB)

replicationService, err := message.NewReplicationApiService(
ctx,
log,
s.registrant,
writerDB,
s.validationService,
cursorUpdater,
s.cursorUpdater,
)
if err != nil {
return err
Expand All @@ -202,7 +204,7 @@ func startAPIServer(
metadataService, err := metadata.NewMetadataApiService(
ctx,
log,
cursorUpdater,
s.cursorUpdater,
)
if err != nil {
return err
Expand Down Expand Up @@ -281,6 +283,10 @@ func (s *ReplicationServer) Shutdown() {
s.nodeRegistry.Stop()
}

if s.cursorUpdater != nil {
s.cursorUpdater.Stop()
}

if s.indx != nil {
s.indx.Close()
}
Expand Down

0 comments on commit e2c21cf

Please sign in to comment.