Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some good logging #525

Merged
merged 2 commits into from
Feb 20, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions pkg/sync/syncWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,11 @@ func (s *syncWorker) subscribeToNode(nodeid uint32) {
func (s *syncWorker) subscribeToNodeRegistration(
registration NodeRegistration,
) {

node, err := s.nodeRegistry.GetNode(registration.nodeid)
if err != nil {
// this should never happen
s.log.Error(fmt.Sprintf("Unexpected state: Failed to get node from registry: %v", err))
s.log.Error("Unexpected state: Failed to get node from registry", zap.Uint32("nodeid", registration.nodeid), zap.Error(err))
s.handleUnhealthyNode(registration)
return
}
Expand All @@ -180,11 +181,11 @@ func (s *syncWorker) subscribeToNodeRegistration(
select {
case <-registration.ctx.Done():
// either registry has changed or we are shutting down
s.log.Debug("Context is done. Closing stream and connection")
s.log.Debug("Context is done. Closing stream and connection", zap.String("address", node.HttpAddress))
return
default:
if err != nil {
s.log.Error(fmt.Sprintf("Error: %v, retrying...", err))
s.log.Error("Error connecting to node. Retrying...", zap.String("address", node.HttpAddress), zap.Error(err))
time.Sleep(backoff)
backoff = min(backoff*2, 30*time.Second)
} else {
Expand All @@ -203,7 +204,7 @@ func (s *syncWorker) subscribeToNodeRegistration(
_ = conn.Close()
continue
}
err = s.listenToStream(stream)
err = s.listenToStream(registration.ctx, *node, stream)
_ = stream.stream.CloseSend()
_ = conn.Close()
}
Expand Down Expand Up @@ -272,7 +273,7 @@ func (s *syncWorker) connectToNode(node registry.Node) (*grpc.ClientConn, error)
return nil, fmt.Errorf("failed to connect to peer at %s: %v", node.HttpAddress, err)
}

s.log.Debug(fmt.Sprintf("Successfully connected to peer at %s", node.HttpAddress))
s.log.Debug(fmt.Sprintf("Successfully opened a connection to peer at %s", node.HttpAddress))
return conn, nil
}

Expand Down Expand Up @@ -305,8 +306,14 @@ func (s *syncWorker) setupStream(
},
)
if err != nil {
s.log.Error(
"Failed to batch subscribe to peer",
zap.String("peer", node.HttpAddress),
zap.Error(err),
)
return nil, fmt.Errorf(
"failed to batch subscribe to peer: %v",
"failed to batch subscribe to peer at %s: %v",
node.HttpAddress,
err,
)
}
Expand All @@ -324,6 +331,8 @@ func (s *syncWorker) setupStream(
}

func (s *syncWorker) listenToStream(
_ context.Context,
node registry.Node,
originatorStream *originatorStream,
) error {
recvChan := make(chan *message_api.SubscribeEnvelopesResponse)
Expand All @@ -347,7 +356,7 @@ func (s *syncWorker) listenToStream(
return nil

case envs := <-recvChan:
s.log.Debug("Received envelopes", zap.Any("numEnvelopes", len(envs.Envelopes)))
s.log.Debug("Received envelopes", zap.String("peer", node.HttpAddress), zap.Any("numEnvelopes", len(envs.Envelopes)))
for _, env := range envs.Envelopes {
s.validateAndInsertEnvelope(originatorStream, env)
}
Expand All @@ -358,7 +367,7 @@ func (s *syncWorker) listenToStream(
// let the caller rebuild the stream if required
return nil
}
s.log.Error("Stream closed with error", zap.Error(err))
s.log.Error("Stream closed with error", zap.String("peer", node.HttpAddress), zap.Error(err))
return err
}
}
Expand Down
Loading