diff --git a/pkg/api/message/service.go b/pkg/api/message/service.go index 99585773..46a64653 100644 --- a/pkg/api/message/service.go +++ b/pkg/api/message/service.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "github.com/xmtp/xmtpd/pkg/api/metadata" "time" "github.com/xmtp/xmtpd/pkg/db" @@ -15,7 +16,7 @@ import ( "github.com/xmtp/xmtpd/pkg/registrant" "github.com/xmtp/xmtpd/pkg/topic" "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" + metaProtos "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" @@ -40,6 +41,7 @@ type Service struct { publishWorker *publishWorker subscribeWorker *subscribeWorker validationService mlsvalidate.MLSValidationService + cu metadata.CursorUpdater } func NewReplicationApiService( @@ -48,6 +50,7 @@ func NewReplicationApiService( registrant *registrant.Registrant, store *sql.DB, validationService mlsvalidate.MLSValidationService, + updater metadata.CursorUpdater, ) (*Service, error) { publishWorker, err := startPublishWorker(ctx, log, registrant, store) if err != nil { @@ -66,6 +69,7 @@ func NewReplicationApiService( publishWorker: publishWorker, subscribeWorker: subscribeWorker, validationService: validationService, + cu: updater, }, nil } @@ -82,7 +86,7 @@ func (s *Service) SubscribeEnvelopes( // Send a header (any header) to fix an issue with Tonic based GRPC clients. // See: https://github.com/xmtp/libxmtp/pull/58 - err := stream.SendHeader(metadata.Pairs("subscribed", "true")) + err := stream.SendHeader(metaProtos.Pairs("subscribed", "true")) if err != nil { return status.Errorf(codes.Internal, "could not send header: %v", err) } @@ -449,7 +453,25 @@ func (s *Service) validateClientInfo(clientEnv *envelopes.ClientEnvelope) error return status.Errorf(codes.InvalidArgument, "topic does not match payload") } - // TODO(rich): Verify all originators have synced past `last_seen` + if aad.GetDependsOn() != nil { + lastSeenCursor := s.cu.GetCursor() + for nodeId, seqId := range aad.GetDependsOn().NodeIdToSequenceId { + lastSeqId, exists := lastSeenCursor.NodeIdToSequenceId[nodeId] + if !exists { + return fmt.Errorf( + "node ID %d specified in DepenedsOn has not been seen by this node", + nodeId, + ) + } else if seqId > lastSeqId { + return fmt.Errorf( + "sequence ID %d for node ID %d specified in DependsOn exceeds last seen sequence ID %d", + seqId, + nodeId, + lastSeqId, + ) + } + } + } // TODO(rich): Check that the blockchain sequence ID is equal to the latest on the group // TODO(rich): Perform any payload-specific validation (e.g. identity updates) diff --git a/pkg/api/metadata/cursorUpdater.go b/pkg/api/metadata/cursorUpdater.go index 2515da5f..7a54111a 100644 --- a/pkg/api/metadata/cursorUpdater.go +++ b/pkg/api/metadata/cursorUpdater.go @@ -10,7 +10,13 @@ import ( "time" ) -type CursorUpdater struct { +type CursorUpdater interface { + GetCursor() *envelopes.Cursor + AddSubscriber(clientID string, updateChan chan struct{}) + RemoveSubscriber(clientID string) +} + +type DBBasedCursorUpdater struct { ctx context.Context log *zap.Logger store *sql.DB @@ -20,9 +26,9 @@ type CursorUpdater struct { subscribers map[string][]chan struct{} } -func NewCursorUpdater(ctx context.Context, log *zap.Logger, store *sql.DB) *CursorUpdater { +func NewCursorUpdater(ctx context.Context, log *zap.Logger, store *sql.DB) CursorUpdater { subscribers := make(map[string][]chan struct{}) - cu := CursorUpdater{ + cu := DBBasedCursorUpdater{ ctx: ctx, log: log.Named("cursor-updater"), store: store, @@ -33,13 +39,13 @@ func NewCursorUpdater(ctx context.Context, log *zap.Logger, store *sql.DB) *Curs return &cu } -func (cu *CursorUpdater) GetCursor() *envelopes.Cursor { +func (cu *DBBasedCursorUpdater) GetCursor() *envelopes.Cursor { cu.cursorMu.RLock() defer cu.cursorMu.RUnlock() return &envelopes.Cursor{NodeIdToSequenceId: cu.cursor} } -func (cu *CursorUpdater) start() { +func (cu *DBBasedCursorUpdater) start() { ticker := time.NewTicker(100 * time.Millisecond) // Adjust the period as needed defer ticker.Stop() for { @@ -71,7 +77,7 @@ func equalCursors(a, b map[uint32]uint64) bool { return true } -func (cu *CursorUpdater) read() (bool, error) { +func (cu *DBBasedCursorUpdater) read() (bool, error) { rows, err := queries.New(cu.store).GetLatestCursor(cu.ctx) if err != nil { return false, err @@ -93,7 +99,7 @@ func (cu *CursorUpdater) read() (bool, error) { return false, nil } -func (cu *CursorUpdater) notifySubscribers() { +func (cu *DBBasedCursorUpdater) notifySubscribers() { cu.subscribersMu.Lock() defer cu.subscribersMu.Unlock() @@ -109,13 +115,13 @@ func (cu *CursorUpdater) notifySubscribers() { } } -func (cu *CursorUpdater) AddSubscriber(clientID string, updateChan chan struct{}) { +func (cu *DBBasedCursorUpdater) AddSubscriber(clientID string, updateChan chan struct{}) { cu.subscribersMu.Lock() defer cu.subscribersMu.Unlock() cu.subscribers[clientID] = append(cu.subscribers[clientID], updateChan) } -func (cu *CursorUpdater) RemoveSubscriber(clientID string) { +func (cu *DBBasedCursorUpdater) RemoveSubscriber(clientID string) { cu.subscribersMu.Lock() defer cu.subscribersMu.Unlock() delete(cu.subscribers, clientID) diff --git a/pkg/api/metadata/service.go b/pkg/api/metadata/service.go index 4ac3aa37..509557d8 100644 --- a/pkg/api/metadata/service.go +++ b/pkg/api/metadata/service.go @@ -2,7 +2,6 @@ package metadata import ( "context" - "database/sql" "fmt" "github.com/xmtp/xmtpd/pkg/proto/xmtpv4/metadata_api" "go.uber.org/zap" @@ -16,18 +15,18 @@ type Service struct { metadata_api.UnimplementedMetadataApiServer ctx context.Context log *zap.Logger - cu *CursorUpdater + cu CursorUpdater } func NewMetadataApiService( ctx context.Context, log *zap.Logger, - store *sql.DB, + updater CursorUpdater, ) (*Service, error) { return &Service{ ctx: ctx, log: log, - cu: NewCursorUpdater(ctx, log, store), + cu: updater, }, nil } diff --git a/pkg/server/server.go b/pkg/server/server.go index 4783e3b4..ce7c0171 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -183,13 +183,14 @@ func startAPIServer( return err } } - + cursorUpdater := metadata.NewCursorUpdater(ctx, log, writerDB) replicationService, err := message.NewReplicationApiService( ctx, log, s.registrant, writerDB, s.validationService, + cursorUpdater, ) if err != nil { return err @@ -201,7 +202,7 @@ func startAPIServer( metadataService, err := metadata.NewMetadataApiService( ctx, log, - writerDB, + cursorUpdater, ) if err != nil { return err diff --git a/pkg/testutils/api/api.go b/pkg/testutils/api/api.go index 118b3b85..f9e5331b 100644 --- a/pkg/testutils/api/api.go +++ b/pkg/testutils/api/api.go @@ -124,6 +124,7 @@ func NewTestAPIServer(t *testing.T) (*api.ApiServer, *sql.DB, ApiServerMocks, fu registrant, db, mockValidationService, + metadata.NewCursorUpdater(ctx, log, db), ) require.NoError(t, err) message_api.RegisterReplicationApiServer(grpcServer, replicationService) @@ -139,7 +140,11 @@ func NewTestAPIServer(t *testing.T) (*api.ApiServer, *sql.DB, ApiServerMocks, fu require.NoError(t, err) payer_api.RegisterPayerApiServer(grpcServer, payerService) - metadataService, err := metadata.NewMetadataApiService(ctx, log, db) + metadataService, err := metadata.NewMetadataApiService( + ctx, + log, + metadata.NewCursorUpdater(ctx, log, db), + ) require.NoError(t, err) metadata_api.RegisterMetadataApiServer(grpcServer, metadataService)