5 changes: 4 additions & 1 deletion internal/k8s_client_test.go
@@ -44,7 +44,10 @@ func TestEnsureNotExistBySelector(t *testing.T) {
}

resources, err := testConfig.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
require.NoError(t, err)
Contributor:
Hmm, just ignoring the error is not a good test...

if err != nil {
t.Skipf("Skipping %s: no REST mapping available (%v)", gvk.Kind, err)
return
}

require.NoError(t, testConfig.client.Post().Resource(resources.Resource.Resource).Body(&module).Do(ctx).Error())

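An alternative to skipping entirely would be to pre-seed the test with a fake RESTMapper so the lookup can never fail. This is only a sketch, assuming the test config could be constructed with an arbitrary meta.RESTMapper; newTestMapper and its wiring are illustrative, not part of this PR:

// Sketch: a fake RESTMapper pre-seeded with the GVKs the test exercises,
// so RESTMapping never returns "no matches" and no skip is needed.
import (
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func newTestMapper(gvks ...schema.GroupVersionKind) meta.RESTMapper {
	mapper := meta.NewDefaultRESTMapper(nil)
	for _, gvk := range gvks {
		// Register each kind as namespace-scoped for the purposes of the test.
		mapper.Add(gvk, meta.RESTScopeNamespace)
	}
	return mapper
}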
49 changes: 40 additions & 9 deletions internal/membership_client.go
@@ -114,24 +114,27 @@ func (c *membershipClient) Send(message *generated.Message) error {
}
}

func (c *membershipClient) sendPong(ctx context.Context, client grpcclient.ConnectionAdapter) {
func (c *membershipClient) sendPong(ctx context.Context, client grpcclient.ConnectionAdapter) error {
if err := client.Send(ctx, &generated.Message{
Message: &generated.Message_Pong{
Pong: &generated.Pong{},
},
}); err != nil {
logging.FromContext(ctx).Errorf("Unable to send pong to server: %s", err)
if errors.Is(err, io.EOF) {
panic(err)
}
return err
}
return nil
}

func (c *membershipClient) Start(ctx context.Context, client grpcclient.ConnectionAdapter) error {

var (
errCh = make(chan error, 1)
errCh = make(chan error, 1)
pongErrCh = make(chan error, 1)
stopPinger = make(chan struct{})
)

// Goroutine to receive messages
go func() {
for {
msg, err := client.Recv(ctx)
@@ -149,7 +152,13 @@ func (c *membershipClient) Start(ctx context.Context, client grpcclient.Connecti
}

if msg.GetPing() != nil {
c.sendPong(ctx, client)
if err := c.sendPong(ctx, client); err != nil {
// Send error but continue - will be handled by main loop
select {
case pongErrCh <- err:
default:
}
}
continue
}

@@ -160,11 +169,24 @@ func (c *membershipClient) Start(ctx context.Context, client grpcclient.Connecti
}
}
}()

// Goroutine to send periodic pongs
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-time.After(5 * time.Second):
c.sendPong(ctx, client)
case <-ticker.C:
if err := c.sendPong(ctx, client); err != nil {
// Send error but continue - will be handled by main loop
select {
case pongErrCh <- err:
default:
}
}
case <-stopPinger:
return
case <-ctx.Done():
return
}
@@ -174,8 +196,10 @@ func (c *membershipClient) Start(ctx context.Context, client grpcclient.Connecti
for {
select {
case <-ctx.Done():
close(stopPinger)
return ctx.Err()
case ch := <-c.stopChan:
close(stopPinger)
close(c.stopped)
if err := client.CloseSend(ctx); err != nil {
ch <- err
@@ -196,11 +220,18 @@ func (c *membershipClient) Start(ctx context.Context, client grpcclient.Connecti
return nil
case msg := <-c.messages:
if err := client.Send(ctx, msg); err != nil {
panic(err)
logging.FromContext(ctx).Errorf("Failed to send message: %s", err)
return err
}
<-time.After(50 * time.Millisecond)
case err := <-pongErrCh:
// Pong failed, connection is likely broken
logging.FromContext(ctx).Errorf("Failed to send pong, connection broken: %s", err)
close(stopPinger)
return err
case err := <-errCh:
logging.FromContext(ctx).Errorf("Stream closed with error: %s", err)
close(stopPinger)
return err
}
}
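The pong-failure path above relies on a buffered error channel of size 1 plus a non-blocking send, so neither goroutine can block when the main loop has already been handed an earlier error. A minimal standalone sketch of that idiom (names are illustrative, not taken from the PR):

package main

import (
	"errors"
	"fmt"
	"time"
)

func main() {
	// Capacity 1: the first error is retained for the main loop,
	// later errors are dropped instead of blocking the sender.
	errCh := make(chan error, 1)

	report := func(err error) {
		select {
		case errCh <- err: // delivered
		default: // channel already holds an error; drop this one
		}
	}

	report(errors.New("first failure"))
	report(errors.New("second failure")) // dropped, does not block

	select {
	case err := <-errCh:
		fmt.Println("main loop sees:", err) // prints "first failure"
	case <-time.After(time.Second):
		fmt.Println("no error reported")
	}
}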
83 changes: 76 additions & 7 deletions internal/module.go
@@ -208,15 +208,84 @@ func RetrieveModuleList(ctx context.Context, config *rest.Config) (modules, eeMo
func runMembershipClient(lc fx.Lifecycle, debug bool, membershipClient *membershipClient, logger logging.Logger, config *rest.Config) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
client, err := membershipClient.connect(logging.ContextWithLogger(ctx, logger))
if err != nil {
return err
}
clientWithTrace := grpcclient.NewConnectionWithTrace(client, debug)
// Create a background context for the client that won't be cancelled when startup completes
clientCtx := logging.ContextWithLogger(context.Background(), logger)
@Dav-14 (Contributor), Oct 13, 2025:
There is not only the logger; there are other context values, like otel. You are preventing any other context from being propagated. You could keep the ctx from fx.Lifecycle, but then it would prevent any other OnStart hook from running.


Comment on lines +211 to 213

⚠️ Potential issue | 🔴 Critical

Make clientCtx cancelable; cancel it on OnStop to avoid goroutine leak

clientCtx is based on context.Background() and never cancelled; the loop can outlive the app. Use WithCancel and cancel in OnStop.

Apply this diff:

 func runMembershipClient(lc fx.Lifecycle, debug bool, membershipClient *membershipClient, logger logging.Logger, config *rest.Config) {
-	lc.Append(fx.Hook{
+	var cancel context.CancelFunc
+	lc.Append(fx.Hook{
 		OnStart: func(ctx context.Context) error {
-			// Create a background context for the client that won't be cancelled when startup completes
-			clientCtx := logging.ContextWithLogger(context.Background(), logger)
+			// Create a background context for the client that can be cancelled on shutdown
+			var clientCtx context.Context
+			clientCtx, cancel = context.WithCancel(logging.ContextWithLogger(context.Background(), logger))
 
 			go func() {
 				const (
-		OnStop: membershipClient.Stop,
+		OnStop: func(ctx context.Context) error {
+			if cancel != nil {
+				cancel()
+			}
+			return membershipClient.Stop(ctx)
+		},

Also applies to: 293-294

🤖 Prompt for AI Agents
In internal/module.go around lines 211-213 (and similarly at 293-294), clientCtx
is created from context.Background() and never cancelled; change creation to use
context.WithCancel so you get a cancel func, pass the cancel into the enclosing
scope, and register a call to that cancel inside the module's OnStop handler so
the background loop will be cancelled when the app stops; ensure any existing
OnStop logic still runs and that cancel is called before/alongside other
shutdown tasks to avoid goroutine leaks.

go func() {
if err := membershipClient.Start(logging.ContextWithLogger(ctx, logger), clientWithTrace); err != nil {
panic(err)
const (
maxRetries = 10
baseDelay = 1 * time.Second
maxDelay = 2 * time.Minute
backoffFactor = 2.0
)

Comment on lines +215 to +221

⚠️ Potential issue | 🟠 Major

Backoff helpers: add exponential backoff with jitter and cancellable sleep

Current delay is linear and lacks jitter. Add helpers once, reuse below.
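For concreteness, with the constants above (baseDelay = 1s, backoffFactor = 2): the current linear formula baseDelay * retries * backoffFactor produces 2s, 4s, 6s, 8s, ..., while the exponential form baseDelay * backoffFactor^(retries-1) produces 1s, 2s, 4s, 8s, ... (capped at maxDelay, then jittered), so later attempts back off far more aggressively during a prolonged outage.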

Apply this diff right after the const block:

 				const (
 					maxRetries     = 10
 					baseDelay      = 1 * time.Second
 					maxDelay       = 2 * time.Minute
 					backoffFactor  = 2.0
 				)
 
+				// Helpers: exponential backoff with jitter + cancellable sleep
+				backoffDelay := func(retries int) time.Duration {
+					if retries < 1 {
+						retries = 1
+					}
+					d := time.Duration(float64(baseDelay) * math.Pow(backoffFactor, float64(retries-1)))
+					if d > maxDelay {
+						d = maxDelay
+					}
+					// ~50% jitter in [0.5x, 1.5x)
+					j := 0.5 + rand.Float64()
+					return time.Duration(float64(d) * j)
+				}
+				sleepWithContext := func(ctx context.Context, d time.Duration) bool {
+					t := time.NewTimer(d)
+					defer t.Stop()
+					select {
+					case <-ctx.Done():
+						return false
+					case <-t.C:
+						return true
+					}
+				}

Also add imports:

 import (
 	"context"
 	"reflect"
 	"sort"
 	"time"
+	"math"
+	"math/rand"

retries := 0
for {
select {
case <-clientCtx.Done():
return
default:
}

client, err := membershipClient.connect(clientCtx)
if err != nil {
logger.Errorf("Failed to connect to membership server: %s", err)

retries++
if retries >= maxRetries {
logger.Errorf("Max retries (%d) reached, giving up", maxRetries)
panic(errors.New("failed to connect to membership server after max retries"))
}
Comment on lines +235 to +238

⚠️ Potential issue | 🟠 Major

Avoid panicking in goroutine; exit gracefully (or trigger app shutdown)

Panics here will crash the process from a background goroutine. Prefer graceful exit; optionally request fx shutdown.

Apply this diff:

-							logger.Errorf("Max retries (%d) reached, giving up", maxRetries)
-							panic(errors.New("failed to connect to membership server after max retries"))
+							logger.Errorf("Max retries (%d) reached, stopping membership client loop", maxRetries)
+							return
-							logger.Errorf("Max retries (%d) reached after connection loss, giving up", maxRetries)
-							panic(errors.New("failed to reconnect to membership server after max retries"))
+							logger.Errorf("Max retries (%d) reached after connection loss, stopping membership client loop", maxRetries)
+							return

Optional (recommended): inject fx.Shutdowner and call shutdowner.Shutdown() here to terminate the app cleanly instead of returning.

Also applies to: 271-274
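A minimal sketch of that optional route, assuming the standard go.uber.org/fx Shutdowner; everything else here (the helper name, the connect callback) is illustrative, not code from this PR:

package internal

import (
	"context"

	"go.uber.org/fx"
)

// Sketch: inject fx.Shutdowner so the retry loop can stop the whole application
// cleanly instead of panicking from a background goroutine.
func runMembershipClientSketch(lc fx.Lifecycle, shutdowner fx.Shutdowner, connect func(context.Context) error) {
	const maxRetries = 10

	lc.Append(fx.Hook{
		OnStart: func(ctx context.Context) error {
			go func() {
				for retries := 1; ; retries++ {
					if err := connect(context.Background()); err == nil {
						return // connected, or the client exited cleanly
					}
					if retries >= maxRetries {
						// Ask fx to shut down; OnStop hooks (including the client's Stop) still run.
						_ = shutdowner.Shutdown()
						return
					}
				}
			}()
			return nil
		},
	})
}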


// Calculate exponential backoff delay
delay := time.Duration(float64(baseDelay) * float64(retries) * backoffFactor)
if delay > maxDelay {
delay = maxDelay
}

logger.Infof("Retrying connection in %s (attempt %d/%d)", delay, retries, maxRetries)
time.Sleep(delay)
continue
Comment on lines +240 to +248

🛠️ Refactor suggestion | 🟠 Major

Use exponential backoff + jitter and make wait cancelable

Current delay is linear and Sleep ignores context cancellation. Use helpers added above.

Apply this diff:

-						// Calculate exponential backoff delay
-						delay := time.Duration(float64(baseDelay) * float64(retries) * backoffFactor)
-						if delay > maxDelay {
-							delay = maxDelay
-						}
-
-						logger.Infof("Retrying connection in %s (attempt %d/%d)", delay, retries, maxRetries)
-						time.Sleep(delay)
+						delay := backoffDelay(retries)
+						logger.Infof("Retrying connection in %s (attempt %d/%d)", delay, retries, maxRetries)
+						if !sleepWithContext(clientCtx, delay) {
+							return
+						}
-						// Calculate exponential backoff delay
-						delay := time.Duration(float64(baseDelay) * float64(retries) * backoffFactor)
-						if delay > maxDelay {
-							delay = maxDelay
-						}
-
-						logger.Infof("Reconnecting in %s (attempt %d/%d)", delay, retries, maxRetries)
-						time.Sleep(delay)
+						delay := backoffDelay(retries)
+						logger.Infof("Reconnecting in %s (attempt %d/%d)", delay, retries, maxRetries)
+						if !sleepWithContext(clientCtx, delay) {
+							return
+						}

Also applies to: 276-284

🤖 Prompt for AI Agents
In internal/module.go around lines 240-248 (and similarly 276-284) the retry
delay is currently computed linearly and uses time.Sleep which ignores context
cancellation; replace this with the exponential-backoff-with-jitter helper
provided earlier and the context-aware wait helper: compute delay using the
backoff helper (baseDelay, retries, backoffFactor, maxDelay) to get an
exponential jittered duration and then call the wait-with-context helper
(passing ctx and delay) instead of time.Sleep; ensure you update the log to
print the chosen jittered delay and handle context cancellation returned by the
wait helper to break/return as appropriate.

}

// Connection successful, reset retry counter
if retries > 0 {
logger.Infof("Successfully reconnected after %d attempts", retries)
retries = 0
}

clientWithTrace := grpcclient.NewConnectionWithTrace(client, debug)

if err := membershipClient.Start(clientCtx, clientWithTrace); err != nil {
logger.Errorf("Membership client stopped with error: %s", err)

// Check if it's a clean shutdown
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
logger.Info("Membership client shutdown requested")
return
}

// Connection lost, attempt to reconnect
logger.Info("Connection lost, attempting to reconnect...")
retries++
if retries >= maxRetries {
logger.Errorf("Max retries (%d) reached after connection loss, giving up", maxRetries)
panic(errors.New("failed to reconnect to membership server after max retries"))
}

// Calculate exponential backoff delay
delay := time.Duration(float64(baseDelay) * float64(retries) * backoffFactor)
if delay > maxDelay {
delay = maxDelay
}

logger.Infof("Reconnecting in %s (attempt %d/%d)", delay, retries, maxRetries)
time.Sleep(delay)
continue
}

// Clean exit
return
}
}()
return nil