Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 20 additions & 31 deletions core/services/workflows/syncer/v2/contract_workflow_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,20 @@ func (c *ContractWorkflowSource) ListWorkflowMetadata(ctx context.Context, don c
}

for _, wfMeta := range workflows.List {
// Log warnings for incomplete metadata but don't skip processing
c.validateWorkflowMetadata(wfMeta)
// Skip workflows with incomplete/invalid metadata - this can indicate stale metadata
// from deleted workflows in the contract (known contract bug where deleted workflows
// aren't fully removed from contract state)
if !isValidWorkflowMetadata(wfMeta) {
c.lggr.Warnw("Workflow has incomplete metadata from contract, skipping",
"source", ContractWorkflowSourceName,
"workflowID", hex.EncodeToString(wfMeta.WorkflowId[:]),
"workflowName", wfMeta.WorkflowName,
"owner", hex.EncodeToString(wfMeta.Owner.Bytes()),
"binaryURL", wfMeta.BinaryUrl,
"configURL", wfMeta.ConfigUrl,
"status", wfMeta.Status)
continue
}

// TODO: https://smartcontract-it.atlassian.net/browse/CAPPL-1021 load balance across workflow nodes in DON Family
allWorkflows = append(allWorkflows, WorkflowMetadataView{
Expand Down Expand Up @@ -216,33 +228,10 @@ func (c *ContractWorkflowSource) newWorkflowRegistryContractReader(ctx context.C
return reader, nil
}

// validateWorkflowMetadata logs warnings for incomplete workflow metadata from contract.
func (c *ContractWorkflowSource) validateWorkflowMetadata(wfMeta workflow_registry_wrapper_v2.WorkflowRegistryWorkflowMetadataView) {
if isEmptyWorkflowID(wfMeta.WorkflowId) {
c.lggr.Warnw("Workflow has empty WorkflowID from contract",
"source", ContractWorkflowSourceName,
"workflowName", wfMeta.WorkflowName,
"owner", hex.EncodeToString(wfMeta.Owner.Bytes()),
"binaryURL", wfMeta.BinaryUrl,
"configURL", wfMeta.ConfigUrl)
}

if len(wfMeta.Owner.Bytes()) == 0 {
c.lggr.Warnw("Workflow has empty Owner from contract",
"source", ContractWorkflowSourceName,
"workflowID", hex.EncodeToString(wfMeta.WorkflowId[:]),
"workflowName", wfMeta.WorkflowName,
"binaryURL", wfMeta.BinaryUrl,
"configURL", wfMeta.ConfigUrl)
}

if wfMeta.BinaryUrl == "" || wfMeta.ConfigUrl == "" {
c.lggr.Warnw("Workflow has empty BinaryURL or ConfigURL from contract",
"source", ContractWorkflowSourceName,
"workflowID", hex.EncodeToString(wfMeta.WorkflowId[:]),
"workflowName", wfMeta.WorkflowName,
"owner", hex.EncodeToString(wfMeta.Owner.Bytes()),
"binaryURL", wfMeta.BinaryUrl,
"configURL", wfMeta.ConfigUrl)
}
// isValidWorkflowMetadata checks if workflowID and workflowOwner are valid
// in the metadata pulled from the contract. In the case of contract deletion bugs
// (where deleted workflows retain stale metadata with zero addresses), this func
// filters out noisy deploys/workflow events.
func isValidWorkflowMetadata(wfMeta workflow_registry_wrapper_v2.WorkflowRegistryWorkflowMetadataView) bool {
return !isEmptyWorkflowID(wfMeta.WorkflowId) && !isZeroOwner(wfMeta.Owner.Bytes())
}
9 changes: 9 additions & 0 deletions core/services/workflows/syncer/v2/workflow_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"maps"
"math/big"
"slices"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -742,6 +743,14 @@ func isEmptyWorkflowID(wfID [32]byte) bool {
return wfID == emptyID
}

// isZeroOwner checks if a workflow owner address is the zero address (all zeros).
// This can indicate stale metadata from deleted workflows in the contract - there's a known
// bug where deleted workflows aren't always fully removed from the contract state.
func isZeroOwner(owner []byte) bool {
// does not contain non-zero bytes
return !slices.ContainsFunc(owner, func(b byte) bool { return b != 0 })
}

// newAllowlistedRequestsContractReader creates a contract reader specifically for fetching
// allowlisted requests from the WorkflowRegistry contract. This is used by Vault DON nodes
// to verify that incoming vault requests have been pre-authorized on-chain by workflow owners.
Expand Down
37 changes: 37 additions & 0 deletions core/services/workflows/syncer/v2/workflow_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1341,3 +1341,40 @@ func Test_PerSourceReconciliation_FailureIsolation(t *testing.T) {
require.Equal(t, wfTypes.WorkflowID(wfIDGrpc2), grpcEvents[0].Data.(WorkflowDeletedEvent).WorkflowID)
})
}

func Test_isZeroOwner(t *testing.T) {
t.Run("returns true for nil slice", func(t *testing.T) {
require.True(t, isZeroOwner(nil))
})

t.Run("returns true for empty slice", func(t *testing.T) {
require.True(t, isZeroOwner([]byte{}))
})

t.Run("returns true for all zeros (20 bytes - Ethereum address)", func(t *testing.T) {
zeroAddress := make([]byte, 20)
require.True(t, isZeroOwner(zeroAddress))
})

t.Run("returns true for all zeros (arbitrary length)", func(t *testing.T) {
zeros := make([]byte, 32)
require.True(t, isZeroOwner(zeros))
})

t.Run("returns false for valid owner address", func(t *testing.T) {
validOwner, _ := hex.DecodeString("1234567890123456789012345678901234567890")
require.False(t, isZeroOwner(validOwner))
})

t.Run("returns false for address with single non-zero byte", func(t *testing.T) {
almostZero := make([]byte, 20)
almostZero[19] = 1 // last byte is 1
require.False(t, isZeroOwner(almostZero))
})

t.Run("returns false for address with non-zero first byte", func(t *testing.T) {
almostZero := make([]byte, 20)
almostZero[0] = 1 // first byte is 1
require.False(t, isZeroOwner(almostZero))
})
}
Loading