Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 106 additions & 18 deletions service/api/sse/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import (
"errors"
"fmt"
"strings"
"sync/atomic"

"github.com/splitio/go-split-commons/v9/conf"
"github.com/splitio/go-split-commons/v9/dtos"
Expand All @@ -18,6 +20,13 @@
keepAlive = 70
)

// States a streaming client moves through; stored atomically in
// StreamingClientImpl.state.
const (
	// StateDestroyed marks a client that has been stopped and must not restart.
	StateDestroyed int32 = -1
	// StateIdle marks a client that is not started (or has finished a run).
	StateIdle int32 = 0
	// StateRunning marks a client whose streaming goroutine is active.
	StateRunning int32 = 1
)

// StreamingClient interface
type StreamingClient interface {
ConnectStreaming(token string, streamingStatus chan int, channelList []string, handleIncomingMessage func(IncomingMessage))
Expand All @@ -27,11 +36,13 @@

// StreamingClientImpl is the default StreamingClient implementation. It wraps
// the raw SSE client and tracks its lifecycle both via the lifecycle manager
// and an atomic state flag.
type StreamingClientImpl struct {
	sseClient        *sse.Client
	logger           logging.LoggerInterface
	lifecycle        lifecycle.Manager
	metadata         dtos.Metadata
	clientKey        *string
	state            atomic.Int32  // Atomic state: 0=Idle, 1=Running, -1=Destroyed
	goroutineStarted chan struct{} // Signals when the streaming goroutine has started
}

// Status constants
Expand All @@ -54,15 +65,32 @@
metadata: metadata,
clientKey: clientKey,
}
client.state.Store(StateIdle)
client.lifecycle.Setup()
return client
}

// ConnectStreaming connects to streaming
func (s *StreamingClientImpl) ConnectStreaming(token string, streamingStatus chan int, channelList []string, handleIncomingMessage func(IncomingMessage)) {

Check failure on line 74 in service/api/sse/client.go

View check run for this annotation

SonarQube Pull Requests / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 25 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonar.harness.io/project/issues?id=go-split-commons&pullRequest=273&issues=45aaa7bf-4414-4a40-804f-6c3193ed576e&open=45aaa7bf-4414-4a40-804f-6c3193ed576e
fmt.Println("VAMOS A VERRRRRRRR")
// Atomic state check: Only proceed if state is Idle (0)
if s.state.Load() != StateIdle {
s.logger.Info("Client is not in idle state (already running or destroyed). Ignoring")
return
}

if !s.lifecycle.BeginInitialization() {
s.logger.Info("Connection is already in process/running. Ignoring")
s.state.Store(StateIdle) // Reset state since lifecycle check failed
return
}

// Now that lifecycle check passed, atomically transition to Running
// This ensures we only set Running if goroutine will actually be spawned
if !s.state.CompareAndSwap(StateIdle, StateRunning) {
s.logger.Info("Client destroyed during initialization. Aborting")
// Lifecycle was started but we're not spawning goroutine, need to complete it
s.lifecycle.ShutdownComplete()
return
}

Expand All @@ -71,21 +99,65 @@
params["accessToken"] = token
params["v"] = version

// Channel to signal goroutine has started (prevents race with StopStreaming)
// Must be created AFTER lifecycle check to ensure goroutine will be spawned
goroutineStarted := make(chan struct{})
s.goroutineStarted = goroutineStarted

go func() {
defer s.lifecycle.ShutdownComplete()
// Signal that goroutine has started executing
close(goroutineStarted)

defer func() {
s.lifecycle.ShutdownComplete()
// Reset to idle if not destroyed
if s.state.Load() != StateDestroyed {
s.state.Store(StateIdle)
}
}()

// Early exit if client was destroyed while goroutine was starting
if s.state.Load() <= StateIdle {
s.logger.Info("Client state is not valid (destroyed or idle). Exiting goroutine")
return
}

if !s.lifecycle.InitializationComplete() {
return
}

// Helper to send status without blocking if destroyed
sendStatus := func(status int) {
currentState := s.state.Load()
if currentState == StateDestroyed {
// Client is being destroyed, don't block
select {
case streamingStatus <- status:
default:
s.logger.Debug("Client destroyed, skipping status send")
}
return
}
// Normal operation: blocking send is safe
streamingStatus <- status
}

// Final check before starting connection - prevent race if Destroy was called
if s.state.Load() != StateRunning {
s.logger.Info("Client destroyed before connection started. Exiting goroutine")
return
}

firstEventReceived := gtSync.NewAtomicBool(false)
out := s.sseClient.Do(params, api.AddMetadataToHeaders(s.metadata, nil, s.clientKey), func(m IncomingMessage) {
if firstEventReceived.TestAndSet() && !m.IsError() {
streamingStatus <- StatusFirstEventOk
sendStatus(StatusFirstEventOk)
}
handleIncomingMessage(m)
})

if out == nil { // all good
streamingStatus <- StatusDisconnected
sendStatus(StatusDisconnected)
return
}

Expand All @@ -94,35 +166,51 @@

asConnectionFailedError := &sse.ErrConnectionFailed{}
if errors.As(out, &asConnectionFailedError) {
streamingStatus <- StatusConnectionFailed
sendStatus(StatusConnectionFailed)
return
}

switch out {
case sse.ErrNotIdle:
// If this happens we have a bug
streamingStatus <- StatusUnderlyingClientInUse
sendStatus(StatusUnderlyingClientInUse)
case sse.ErrReadingStream:
streamingStatus <- StatusDisconnected
sendStatus(StatusDisconnected)
case sse.ErrTimeout:
streamingStatus <- StatusDisconnected
sendStatus(StatusDisconnected)
default:
}
}()
}

// StopStreaming stops streaming
func (s *StreamingClientImpl) StopStreaming() {
if !s.lifecycle.BeginShutdown() {
s.logger.Info("SSE client wrapper not running. Ignoring")
// Check old state before destroying - we need to know if a goroutine was spawned
oldState := s.state.Swap(StateDestroyed)

// Always shutdown the underlying SSE client to unblock any in-flight connections
s.sseClient.Shutdown(true)

// If we were running, a goroutine exists and will call ShutdownComplete()
if oldState == StateRunning {
// Wait for goroutine to actually start before waiting for completion
// This prevents race where goroutine is queued but not started
if s.goroutineStarted != nil {
<-s.goroutineStarted
}
// Try to begin shutdown (might fail if goroutine still initializing)
s.lifecycle.BeginShutdown()
// But always wait for the goroutine to complete
s.lifecycle.AwaitShutdownComplete()
s.logger.Info("Stopped streaming")
return
}
s.sseClient.Shutdown(true)
s.lifecycle.AwaitShutdownComplete()
s.logger.Info("Stopped streaming")

// No goroutine was spawned, safe to return
s.logger.Info("SSE client wrapper not running. Ignoring")
}

// IsRunning returns true if the client is running.
//
// Both the lifecycle manager and the atomic state flag must agree; this keeps
// the answer consistent while a start/stop transition is in flight.
func (s *StreamingClientImpl) IsRunning() bool {
	return s.lifecycle.IsRunning() && s.state.Load() == StateRunning
}
9 changes: 9 additions & 0 deletions synchronizer/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,16 @@ func (s *ManagerImpl) Start() {
}

// stop halts the push manager (if any), stops periodic fetching and periodic
// data recording, and finally signals that shutdown has completed.
func (s *ManagerImpl) stop() {
	if s.pushManager != nil {
		s.pushManager.Stop()
	}
	s.synchronizer.StopPeriodicFetching()
	s.synchronizer.StopPeriodicDataRecording()
	s.lifecycle.ShutdownComplete()
}

Expand All @@ -185,12 +190,16 @@ func (s *ManagerImpl) Stop() {
}

func (s *ManagerImpl) pushStatusWatcher() {
fmt.Println("---Inside pushStatusWatcher before stop---")
defer s.stop()
fmt.Println("---Inside pushStatusWatcher---After stop")
for {
select {
case <-s.lifecycle.ShutdownRequested():
fmt.Println("---Inside pushStatusWatcher---ShutdownRequested")
return
case status := <-s.streamingStatus:
fmt.Println("---Inside pushStatusWatcher---streamingStatus ", status)
switch status {
case push.StatusUp:
s.stopPolling()
Expand Down
Loading