Skip to content

Commit

Permalink
enhancement(5423): added SetDuration function, added mock scheduler t…
Browse files Browse the repository at this point in the history
…o tests, simplified scheduler usage
  • Loading branch information
kaanyalti committed Feb 7, 2025
1 parent b04617d commit f606ca8
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 19 deletions.
20 changes: 9 additions & 11 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,20 @@ const (

// Default Configuration for the Fleet Gateway.
var defaultGatewaySettings = &fleetGatewaySettings{
Duration: 1 * time.Second, // time between successful calls
Jitter: 500 * time.Millisecond, // used as a jitter for duration
ErrDuration: 1 * time.Hour, // time between calls when the agent exceeds unauthorized response limit
Duration: 1 * time.Second, // time between successful calls
Jitter: 500 * time.Millisecond, // used as a jitter for duration
ErrConsecutiveUnauthDuration: 1 * time.Hour, // time between calls when the agent exceeds unauthorized response limit
Backoff: backoffSettings{ // time after a failed call
Init: 60 * time.Second,
Max: 10 * time.Minute,
},
}

type fleetGatewaySettings struct {
Duration time.Duration `config:"checkin_frequency"`
Jitter time.Duration `config:"jitter"`
Backoff backoffSettings `config:"backoff"`
ErrDuration time.Duration
Duration time.Duration `config:"checkin_frequency"`
Jitter time.Duration `config:"jitter"`
Backoff backoffSettings `config:"backoff"`
ErrConsecutiveUnauthDuration time.Duration
}

type backoffSettings struct {
Expand Down Expand Up @@ -361,16 +361,14 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
f.unauthCounter++
if f.shouldUseLongSched() {
f.log.Warnf("retrieved an invalid api key error '%d' times. will use long scheduler", f.unauthCounter)
f.scheduler = scheduler.NewPeriodic(defaultGatewaySettings.ErrDuration)
f.scheduler.SetDuration(defaultGatewaySettings.ErrConsecutiveUnauthDuration)
return &fleetapi.CheckinResponse{}, took, nil
}

return nil, took, err
}

if _, ok := f.scheduler.(*scheduler.PeriodicJitter); !ok {
f.scheduler = scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter)
}
f.scheduler.SetDuration(defaultGatewaySettings.Duration)

f.unauthCounter = 0
if err != nil {
Expand Down
46 changes: 38 additions & 8 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,13 +566,37 @@ func TestAgentStateToString(t *testing.T) {
}
}

type MockScheduler struct {
Duration time.Duration
Ticker *time.Ticker
}

func (m *MockScheduler) WaitTick() <-chan time.Time {
return m.Ticker.C
}

func (m *MockScheduler) SetDuration(d time.Duration) {
m.Duration = d
}

func (m *MockScheduler) Stop() {
m.Ticker.Stop()
}

func TestFleetGatewaySchedulerSwitch(t *testing.T) {
agentInfo := &testAgentInfo{}
settings := &fleetGatewaySettings{
Duration: 5 * time.Second,
Backoff: backoffSettings{Init: 1 * time.Second, Max: 5 * time.Second},
Duration: 1 * time.Second,
Backoff: backoffSettings{Init: 1 * time.Millisecond, Max: 2 * time.Millisecond},
}

tempSet := *defaultGatewaySettings
defaultGatewaySettings.Duration = 500 * time.Millisecond
defaultGatewaySettings.ErrConsecutiveUnauthDuration = 700 * time.Millisecond
defer func() {
*defaultGatewaySettings = tempSet
}()

t.Run("if unauthorized responses exceed the set limit, the scheduler should be switched to the long-wait scheduler", withGateway(agentInfo, settings, func(
t *testing.T,
gateway coordinator.FleetGateway,
Expand All @@ -589,8 +613,12 @@ func TestFleetGatewaySchedulerSwitch(t *testing.T) {
clientWaitFn := c.Answer(unauth)
g, ok := gateway.(*FleetGateway)
require.True(t, ok)
g.scheduler = scheduler.NewPeriodicJitter(time.Second, time.Second)

ms := &MockScheduler{
Duration: defaultGatewaySettings.Duration,
Ticker: time.NewTicker(defaultGatewaySettings.Duration),
}
g.scheduler = ms
errCh := runFleetGateway(ctx, gateway)

for i := 0; i <= maxUnauthCounter; i++ {
Expand All @@ -601,8 +629,7 @@ func TestFleetGatewaySchedulerSwitch(t *testing.T) {
err := <-errCh
require.NoError(t, err)

_, ok = g.scheduler.(*scheduler.Periodic)
require.True(t, ok)
require.Equal(t, ms.Duration, defaultGatewaySettings.ErrConsecutiveUnauthDuration)
}))

t.Run("should switch back to short-wait scheduler if the a successful response is received", withGateway(agentInfo, settings, func(
Expand All @@ -622,8 +649,12 @@ func TestFleetGatewaySchedulerSwitch(t *testing.T) {
clientWaitFn := c.Answer(unauth)
g, ok := gateway.(*FleetGateway)
require.True(t, ok)
g.scheduler = scheduler.NewPeriodic(time.Second)

ms := &MockScheduler{
Duration: defaultGatewaySettings.ErrConsecutiveUnauthDuration,
Ticker: time.NewTicker(defaultGatewaySettings.ErrConsecutiveUnauthDuration),
}
g.scheduler = ms
errCh := runFleetGateway(ctx, gateway)

<-clientWaitFn
Expand All @@ -632,7 +663,6 @@ func TestFleetGatewaySchedulerSwitch(t *testing.T) {
err := <-errCh
require.NoError(t, err)

_, ok = g.scheduler.(*scheduler.PeriodicJitter)
require.True(t, ok)
require.Equal(t, ms.Duration, defaultGatewaySettings.Duration)
}))
}
12 changes: 12 additions & 0 deletions internal/pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
type Scheduler interface {
WaitTick() <-chan time.Time
Stop()
SetDuration(time.Duration)
}

// Stepper is a scheduler where each Tick is manually triggered, this is useful in scenario
Expand All @@ -32,6 +33,9 @@ func (s *Stepper) WaitTick() <-chan time.Time {
return s.C
}

// Sets the wait duration for the scheduler. Noop for stepper scheduler
func (s *Stepper) SetDuration(_ time.Duration) {}

// Stop is stopping the scheduler, in the case of the Stepper scheduler nothing is done.
func (s *Stepper) Stop() {}

Expand Down Expand Up @@ -68,6 +72,10 @@ func (p *Periodic) WaitTick() <-chan time.Time {
return rC
}

func (p *Periodic) SetDuration(d time.Duration) {
p.Ticker = time.NewTicker(d)
}

// Stop stops the internal Ticker.
// Note this will not close the internal channel is up to the developer to unblock the goroutine
// using another mechanism.
Expand Down Expand Up @@ -123,6 +131,10 @@ func (p *PeriodicJitter) WaitTick() <-chan time.Time {
return p.C
}

func (p *PeriodicJitter) SetDuration(d time.Duration) {
p.d = d
}

// Stop stops the PeriodicJitter scheduler.
func (p *PeriodicJitter) Stop() {
close(p.done)
Expand Down

0 comments on commit f606ca8

Please sign in to comment.