Skip to content

Commit

Permalink
Add tracing to send welcome messages
Browse files Browse the repository at this point in the history
  • Loading branch information
neekolas committed Feb 19, 2025
1 parent 8e081cf commit 1923e8a
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 53 deletions.
21 changes: 2 additions & 19 deletions pkg/api/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"

"github.com/xmtp/xmtp-node-go/pkg/logging"
messagev1 "github.com/xmtp/xmtp-node-go/pkg/proto/message_api/v1"
"github.com/xmtp/xmtp-node-go/pkg/ratelimiter"
"github.com/xmtp/xmtp-node-go/pkg/types"
"github.com/xmtp/xmtp-node-go/pkg/utils"
)

var ErrDenyListed = errors.New("wallet is deny listed")
Expand Down Expand Up @@ -145,7 +145,7 @@ func (wa *WalletAuthorizer) applyLimits(ctx context.Context, fullMethod string,
// * for other authorization failure return status.Errorf(codes.PermissionDenied, ...)
_, method := splitMethodName(fullMethod)

ip := clientIPFromContext(ctx)
ip := utils.ClientIPFromContext(ctx)
if len(ip) == 0 {
// requests without an IP address are bucketed together as "ip_unknown"
ip = "ip_unknown"
Expand Down Expand Up @@ -236,20 +236,3 @@ func allowedToPublish(topic string, wallet types.WalletAddr) bool {

return true
}

func clientIPFromContext(ctx context.Context) string {
md, _ := metadata.FromIncomingContext(ctx)
vals := md.Get("x-forwarded-for")
if len(vals) == 0 {
p, ok := peer.FromContext(ctx)
if ok {
ipAndPort := strings.Split(p.Addr.String(), ":")
return ipAndPort[0]
} else {
return ""
}
}
// There are potentially multiple comma separated IPs bundled in that first value
ips := strings.Split(vals[0], ",")
return strings.TrimSpace(ips[0])
}
3 changes: 2 additions & 1 deletion pkg/api/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

apicontext "github.com/xmtp/xmtp-node-go/pkg/api/message/v1/context"
"github.com/xmtp/xmtp-node-go/pkg/metrics"
"github.com/xmtp/xmtp-node-go/pkg/utils"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
Expand Down Expand Up @@ -63,7 +64,7 @@ func (ti *TelemetryInterceptor) record(ctx context.Context, fullMethod string, d
ri.ZapFields()...,
)

if ip := clientIPFromContext(ctx); len(ip) > 0 {
if ip := utils.ClientIPFromContext(ctx); len(ip) > 0 {
fields = append(fields, zap.String("client_ip", ip))
}

Expand Down
91 changes: 58 additions & 33 deletions pkg/mls/api/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ import (
v1proto "github.com/xmtp/xmtp-node-go/pkg/proto/message_api/v1"
mlsv1 "github.com/xmtp/xmtp-node-go/pkg/proto/mls/api/v1"
"github.com/xmtp/xmtp-node-go/pkg/topic"
"github.com/xmtp/xmtp-node-go/pkg/tracing"
"github.com/xmtp/xmtp-node-go/pkg/utils"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
pb "google.golang.org/protobuf/proto"
emptypb "google.golang.org/protobuf/types/known/emptypb"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

type Service struct {
Expand Down Expand Up @@ -240,10 +243,10 @@ func (s *Service) SendGroupMessages(ctx context.Context, req *mlsv1.SendGroupMes
msgB, err := pb.Marshal(&mlsv1.GroupMessage{
Version: &mlsv1.GroupMessage_V1_{
V1: &mlsv1.GroupMessage_V1{
Id: uint64(msg.ID),
CreatedNs: uint64(msg.CreatedAt.UnixNano()),
GroupId: msg.GroupID,
Data: msg.Data,
Id: uint64(msg.ID),
CreatedNs: uint64(msg.CreatedAt.UnixNano()),
GroupId: msg.GroupID,
Data: msg.Data,
SenderHmac: msgV1.SenderHmac,
},
},
Expand Down Expand Up @@ -278,42 +281,64 @@ func (s *Service) SendWelcomeMessages(ctx context.Context, req *mlsv1.SendWelcom
return nil, err
}

// TODO: Wrap this in a transaction so publishing is all or nothing
for _, input := range req.Messages {
msg, err := s.store.InsertWelcomeMessage(ctx, input.GetV1().InstallationKey, input.GetV1().Data, input.GetV1().HpkePublicKey)
if err != nil {
if mlsstore.IsAlreadyExistsError(err) {
continue
// TODO: Remove after debugging is done
ip := utils.ClientIPFromContext(ctx)

err = tracing.Wrap(ctx, log, "send-welcome-messages", func(ctx context.Context, log *zap.Logger, span tracing.Span) error {
tracing.SpanTag(span, "client_ip", ip)
tracing.SpanTag(span, "message_count", len(req.Messages))

// TODO: Wrap this in a transaction so publishing is all or nothing
for _, input := range req.Messages {
insertSpan, insertCtx := tracer.StartSpanFromContext(ctx, "insert-welcome-message")
insertLogger := tracing.Link(insertSpan, log)
insertLogger.Info("inserting welcome message", zap.String("client_ip", ip), zap.Int("message_length", len(input.GetV1().Data)))
msg, err := s.store.InsertWelcomeMessage(insertCtx, input.GetV1().InstallationKey, input.GetV1().Data, input.GetV1().HpkePublicKey)
insertSpan.Finish(tracing.WithError(err))
if err != nil {
if mlsstore.IsAlreadyExistsError(err) {
continue
}
return status.Errorf(codes.Internal, "failed to insert message: %s", err)
}
return nil, status.Errorf(codes.Internal, "failed to insert message: %s", err)
}

msgB, err := pb.Marshal(&mlsv1.WelcomeMessage{
Version: &mlsv1.WelcomeMessage_V1_{
V1: &mlsv1.WelcomeMessage_V1{
Id: uint64(msg.ID),
CreatedNs: uint64(msg.CreatedAt.UnixNano()),
InstallationKey: msg.InstallationKey,
Data: msg.Data,
HpkePublicKey: msg.HpkePublicKey,
msgB, err := pb.Marshal(&mlsv1.WelcomeMessage{
Version: &mlsv1.WelcomeMessage_V1_{
V1: &mlsv1.WelcomeMessage_V1{
Id: uint64(msg.ID),
CreatedNs: uint64(msg.CreatedAt.UnixNano()),
InstallationKey: msg.InstallationKey,
Data: msg.Data,
HpkePublicKey: msg.HpkePublicKey,
},
},
},
})
if err != nil {
return nil, err
}
})
if err != nil {
return err
}

err = s.publishToWakuRelay(ctx, &wakupb.WakuMessage{
ContentTopic: topic.BuildMLSV1WelcomeTopic(input.GetV1().InstallationKey),
Timestamp: msg.CreatedAt.UnixNano(),
Payload: msgB,
})
if err != nil {
return nil, err
publishSpan, publishCtx := tracer.StartSpanFromContext(ctx, "publish-welcome-to-relay")
err = s.publishToWakuRelay(publishCtx, &wakupb.WakuMessage{
ContentTopic: topic.BuildMLSV1WelcomeTopic(input.GetV1().InstallationKey),
Timestamp: msg.CreatedAt.UnixNano(),
Payload: msgB,
})
publishSpan.Finish(tracing.WithError(err))

if err != nil {
return err
}

metrics.EmitMLSSentWelcomeMessage(ctx, log, msg)
}

metrics.EmitMLSSentWelcomeMessage(ctx, log, msg)
return nil
})

if err != nil {
return nil, err
}

return &emptypb.Empty{}, nil
}

Expand Down
26 changes: 26 additions & 0 deletions pkg/utils/ip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package utils

import (
"context"
"strings"

"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
)

func ClientIPFromContext(ctx context.Context) string {
md, _ := metadata.FromIncomingContext(ctx)
vals := md.Get("x-forwarded-for")
if len(vals) == 0 {
p, ok := peer.FromContext(ctx)
if ok {
ipAndPort := strings.Split(p.Addr.String(), ":")
return ipAndPort[0]
} else {
return ""
}
}
// There are potentially multiple comma separated IPs bundled in that first value
ips := strings.Split(vals[0], ",")
return strings.TrimSpace(ips[0])
}

0 comments on commit 1923e8a

Please sign in to comment.