Skip to content

Commit

Permalink
DependsOn Validation
Browse files Browse the repository at this point in the history
  • Loading branch information
mkysel committed Feb 10, 2025
1 parent 8e4dada commit 9681737
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 19 deletions.
28 changes: 25 additions & 3 deletions pkg/api/message/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"github.com/xmtp/xmtpd/pkg/api/metadata"
"time"

"github.com/xmtp/xmtpd/pkg/db"
Expand All @@ -15,7 +16,7 @@ import (
"github.com/xmtp/xmtpd/pkg/registrant"
"github.com/xmtp/xmtpd/pkg/topic"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
metaProtos "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

Expand All @@ -40,6 +41,7 @@ type Service struct {
publishWorker *publishWorker
subscribeWorker *subscribeWorker
validationService mlsvalidate.MLSValidationService
cu metadata.CursorUpdater
}

func NewReplicationApiService(
Expand All @@ -48,6 +50,7 @@ func NewReplicationApiService(
registrant *registrant.Registrant,
store *sql.DB,
validationService mlsvalidate.MLSValidationService,
updater metadata.CursorUpdater,
) (*Service, error) {
publishWorker, err := startPublishWorker(ctx, log, registrant, store)
if err != nil {
Expand All @@ -66,6 +69,7 @@ func NewReplicationApiService(
publishWorker: publishWorker,
subscribeWorker: subscribeWorker,
validationService: validationService,
cu: updater,
}, nil
}

Expand All @@ -82,7 +86,7 @@ func (s *Service) SubscribeEnvelopes(

// Send a header (any header) to fix an issue with Tonic based GRPC clients.
// See: https://github.com/xmtp/libxmtp/pull/58
err := stream.SendHeader(metadata.Pairs("subscribed", "true"))
err := stream.SendHeader(metaProtos.Pairs("subscribed", "true"))
if err != nil {
return status.Errorf(codes.Internal, "could not send header: %v", err)
}
Expand Down Expand Up @@ -449,7 +453,25 @@ func (s *Service) validateClientInfo(clientEnv *envelopes.ClientEnvelope) error
return status.Errorf(codes.InvalidArgument, "topic does not match payload")
}

// TODO(rich): Verify all originators have synced past `last_seen`
if aad.GetDependsOn() != nil {
lastSeenCursor := s.cu.GetCursor()
for nodeId, seqId := range aad.GetDependsOn().NodeIdToSequenceId {
lastSeqId, exists := lastSeenCursor.NodeIdToSequenceId[nodeId]
if !exists {
return fmt.Errorf(
"node ID %d specified in DepenedsOn has not been seen by this node",
nodeId,
)
} else if seqId > lastSeqId {
return fmt.Errorf(
"sequence ID %d for node ID %d specified in DependsOn exceeds last seen sequence ID %d",
seqId,
nodeId,
lastSeqId,
)
}
}
}
// TODO(rich): Check that the blockchain sequence ID is equal to the latest on the group
// TODO(rich): Perform any payload-specific validation (e.g. identity updates)

Expand Down
24 changes: 15 additions & 9 deletions pkg/api/metadata/cursorUpdater.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ import (
"time"
)

type CursorUpdater struct {
type CursorUpdater interface {
GetCursor() *envelopes.Cursor
AddSubscriber(clientID string, updateChan chan struct{})
RemoveSubscriber(clientID string)
}

type DBBasedCursorUpdater struct {
ctx context.Context
log *zap.Logger
store *sql.DB
Expand All @@ -20,9 +26,9 @@ type CursorUpdater struct {
subscribers map[string][]chan struct{}
}

func NewCursorUpdater(ctx context.Context, log *zap.Logger, store *sql.DB) *CursorUpdater {
func NewCursorUpdater(ctx context.Context, log *zap.Logger, store *sql.DB) CursorUpdater {
subscribers := make(map[string][]chan struct{})
cu := CursorUpdater{
cu := DBBasedCursorUpdater{
ctx: ctx,
log: log.Named("cursor-updater"),
store: store,
Expand All @@ -33,13 +39,13 @@ func NewCursorUpdater(ctx context.Context, log *zap.Logger, store *sql.DB) *Curs
return &cu
}

func (cu *CursorUpdater) GetCursor() *envelopes.Cursor {
func (cu *DBBasedCursorUpdater) GetCursor() *envelopes.Cursor {
cu.cursorMu.RLock()
defer cu.cursorMu.RUnlock()
return &envelopes.Cursor{NodeIdToSequenceId: cu.cursor}
}

func (cu *CursorUpdater) start() {
func (cu *DBBasedCursorUpdater) start() {
ticker := time.NewTicker(100 * time.Millisecond) // Adjust the period as needed
defer ticker.Stop()
for {
Expand Down Expand Up @@ -71,7 +77,7 @@ func equalCursors(a, b map[uint32]uint64) bool {
return true
}

func (cu *CursorUpdater) read() (bool, error) {
func (cu *DBBasedCursorUpdater) read() (bool, error) {
rows, err := queries.New(cu.store).GetLatestCursor(cu.ctx)
if err != nil {
return false, err
Expand All @@ -93,7 +99,7 @@ func (cu *CursorUpdater) read() (bool, error) {
return false, nil
}

func (cu *CursorUpdater) notifySubscribers() {
func (cu *DBBasedCursorUpdater) notifySubscribers() {
cu.subscribersMu.Lock()
defer cu.subscribersMu.Unlock()

Expand All @@ -109,13 +115,13 @@ func (cu *CursorUpdater) notifySubscribers() {
}
}

func (cu *CursorUpdater) AddSubscriber(clientID string, updateChan chan struct{}) {
func (cu *DBBasedCursorUpdater) AddSubscriber(clientID string, updateChan chan struct{}) {
cu.subscribersMu.Lock()
defer cu.subscribersMu.Unlock()
cu.subscribers[clientID] = append(cu.subscribers[clientID], updateChan)
}

func (cu *CursorUpdater) RemoveSubscriber(clientID string) {
func (cu *DBBasedCursorUpdater) RemoveSubscriber(clientID string) {
cu.subscribersMu.Lock()
defer cu.subscribersMu.Unlock()
delete(cu.subscribers, clientID)
Expand Down
7 changes: 3 additions & 4 deletions pkg/api/metadata/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package metadata

import (
"context"
"database/sql"
"fmt"
"github.com/xmtp/xmtpd/pkg/proto/xmtpv4/metadata_api"
"go.uber.org/zap"
Expand All @@ -16,18 +15,18 @@ type Service struct {
metadata_api.UnimplementedMetadataApiServer
ctx context.Context
log *zap.Logger
cu *CursorUpdater
cu CursorUpdater
}

func NewMetadataApiService(
ctx context.Context,
log *zap.Logger,
store *sql.DB,
updater CursorUpdater,
) (*Service, error) {
return &Service{
ctx: ctx,
log: log,
cu: NewCursorUpdater(ctx, log, store),
cu: updater,
}, nil
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,14 @@ func startAPIServer(
return err
}
}

cursorUpdater := metadata.NewCursorUpdater(ctx, log, writerDB)
replicationService, err := message.NewReplicationApiService(
ctx,
log,
s.registrant,
writerDB,
s.validationService,
cursorUpdater,
)
if err != nil {
return err
Expand All @@ -201,7 +202,7 @@ func startAPIServer(
metadataService, err := metadata.NewMetadataApiService(
ctx,
log,
writerDB,
cursorUpdater,
)
if err != nil {
return err
Expand Down
7 changes: 6 additions & 1 deletion pkg/testutils/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func NewTestAPIServer(t *testing.T) (*api.ApiServer, *sql.DB, ApiServerMocks, fu
registrant,
db,
mockValidationService,
metadata.NewCursorUpdater(ctx, log, db),
)
require.NoError(t, err)
message_api.RegisterReplicationApiServer(grpcServer, replicationService)
Expand All @@ -139,7 +140,11 @@ func NewTestAPIServer(t *testing.T) (*api.ApiServer, *sql.DB, ApiServerMocks, fu
require.NoError(t, err)
payer_api.RegisterPayerApiServer(grpcServer, payerService)

metadataService, err := metadata.NewMetadataApiService(ctx, log, db)
metadataService, err := metadata.NewMetadataApiService(
ctx,
log,
metadata.NewCursorUpdater(ctx, log, db),
)
require.NoError(t, err)
metadata_api.RegisterMetadataApiServer(grpcServer, metadataService)

Expand Down

0 comments on commit 9681737

Please sign in to comment.