Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/services/ring/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ func TestPlugin_RingStoreIntegration(t *testing.T) {

mappings, version := ringStore.GetWorkflowMappingsBatch(workflows)
require.Len(t, mappings, 3)
require.Equal(t, uint64(3), version)
require.Equal(t, uint64(1), version)
})

t.Run("workflow_transition_detected", func(t *testing.T) {
Expand Down
39 changes: 39 additions & 0 deletions core/services/ring/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,45 @@ func (s *Store) RegisterWorkflowsFromShard(shardID uint32, workflowIDs []string)
s.mappingVersion++
}

// SyncRoutes atomically replaces the routing map with the authoritative set
// from the OCR outcome, pruning any workflow IDs that are no longer present.
func (s *Store) SyncRoutes(routes map[string]uint32) {
s.mu.Lock()
defer s.mu.Unlock()

now := time.Now()
inTransition := !IsInSteadyState(s.currentState)

for wfID, shardID := range routes {
old := s.routingState[wfID]
s.routingState[wfID] = shardID
s.routingStateMeta[wfID] = &MappingMeta{
OldShardID: old,
NewShardID: shardID,
InTransition: inTransition,
UpdatedAt: now,
}
if waiters, ok := s.pendingAllocs[wfID]; ok {
for _, ch := range waiters {
select {
case ch <- shardID:
default:
}
}
delete(s.pendingAllocs, wfID)
}
}

for wfID := range s.routingState {
if _, keep := routes[wfID]; !keep {
delete(s.routingState, wfID)
delete(s.routingStateMeta, wfID)
}
}

s.mappingVersion++
}

func (s *Store) SubmitWorkflowsForAllocation(workflowIDs []string) (dropped int) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
38 changes: 38 additions & 0 deletions core/services/ring/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,44 @@ func TestStore_GetWorkflowMappingsBatch(t *testing.T) {
require.Equal(t, uint64(2), version)
}

func TestStore_SyncRoutes_PrunesStaleEntries(t *testing.T) {
store := NewStore()
store.SetRoutingState(&ringpb.RoutingState{
State: &ringpb.RoutingState_RoutableShards{RoutableShards: 3},
})

store.SetShardForWorkflow("wf-1", 0)
store.SetShardForWorkflow("wf-2", 1)
store.SetShardForWorkflow("wf-3", 2)
require.Len(t, store.GetAllRoutingState(), 3)

store.SyncRoutes(map[string]uint32{"wf-1": 0, "wf-3": 1})

routes := store.GetAllRoutingState()
require.Len(t, routes, 2)
require.Equal(t, uint32(0), routes["wf-1"])
require.Equal(t, uint32(1), routes["wf-3"])
require.NotContains(t, routes, "wf-2")

meta, _ := store.GetWorkflowMappingsBatch([]string{"wf-2"})
require.Empty(t, meta)
}

func TestStore_SyncRoutes_EmptyPrunesAll(t *testing.T) {
store := NewStore()
store.SetRoutingState(&ringpb.RoutingState{
State: &ringpb.RoutingState_RoutableShards{RoutableShards: 2},
})

store.SetShardForWorkflow("wf-1", 0)
store.SetShardForWorkflow("wf-2", 1)
require.Len(t, store.GetAllRoutingState(), 2)

store.SyncRoutes(map[string]uint32{})

require.Empty(t, store.GetAllRoutingState())
}

func TestStore_SubmitWorkflowsForAllocation(t *testing.T) {
store := NewStore()

Expand Down
8 changes: 5 additions & 3 deletions core/services/ring/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ func (t *Transmitter) Transmit(ctx context.Context, _ types.ConfigDigest, _ uint
}
}
if applyRoutes {
for workflowID, route := range outcome.Routes {
t.ringStore.SetShardForWorkflow(workflowID, route.Shard)
t.lggr.Debugw("Updated workflow shard mapping", "workflowID", workflowID, "shard", route.Shard)
flat := make(map[string]uint32, len(outcome.Routes))
for wfID, route := range outcome.Routes {
flat[wfID] = route.Shard
}
t.ringStore.SyncRoutes(flat)
t.lggr.Debugw("Synced workflow shard mappings", "count", len(flat))
} else {
t.lggr.Debugw("Skipping route updates while in transition", "state", outcome.State)
}
Expand Down
46 changes: 46 additions & 0 deletions core/services/ring/transmitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,52 @@ func TestTransmitter_Transmit_ArbiterError(t *testing.T) {
require.ErrorIs(t, err, context.DeadlineExceeded)
}

func TestTransmitter_Transmit_StaleEntriesPruned(t *testing.T) {
lggr := logger.Test(t)
store := NewStore()
mock := &mockArbiterScaler{}
tx := NewTransmitter(lggr, store, mock, "test-account")

outcome1 := &ringpb.Outcome{
State: &ringpb.RoutingState{
Id: 1,
State: &ringpb.RoutingState_RoutableShards{RoutableShards: 3},
},
Routes: map[string]*ringpb.WorkflowRoute{
"wf-1": {Shard: 0},
"wf-2": {Shard: 1},
"wf-3": {Shard: 2},
},
}
outcomeBytes, err := proto.Marshal(outcome1)
require.NoError(t, err)

err = tx.Transmit(t.Context(), types.ConfigDigest{}, 0, ocr3types.ReportWithInfo[[]byte]{Report: outcomeBytes}, nil)
require.NoError(t, err)
require.Len(t, store.GetAllRoutingState(), 3)

outcome2 := &ringpb.Outcome{
State: &ringpb.RoutingState{
Id: 2,
State: &ringpb.RoutingState_RoutableShards{RoutableShards: 3},
},
Routes: map[string]*ringpb.WorkflowRoute{
"wf-1": {Shard: 0},
},
}
outcomeBytes, err = proto.Marshal(outcome2)
require.NoError(t, err)

err = tx.Transmit(t.Context(), types.ConfigDigest{}, 0, ocr3types.ReportWithInfo[[]byte]{Report: outcomeBytes}, nil)
require.NoError(t, err)

routes := store.GetAllRoutingState()
require.Len(t, routes, 1)
require.Equal(t, uint32(0), routes["wf-1"])
require.NotContains(t, routes, "wf-2")
require.NotContains(t, routes, "wf-3")
}

func TestTransmitter_Transmit_NilState(t *testing.T) {
lggr := logger.Test(t)
store := NewStore()
Expand Down
Loading