Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .changeset/hungry-flies-act.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
"chainlink": patch
---
# added bump llo with Quote stream support and TimeResolution channel detection on evm_abi_encode_unpacked type
46 changes: 46 additions & 0 deletions core/services/llo/observation/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ func (d *dataSource) startObservationLoop(loopStartedCh chan struct{}) {
wg.Wait()
elapsed = time.Since(startTS)

d.removeIncompleteGroups(lggr, observedValues, osv.streamValues)

d.cache.AddMany(observedValues, 4*osv.observationTimeout)

// notify the caller that we've completed our first round of observations.
Expand Down Expand Up @@ -301,6 +303,50 @@ func (d *dataSource) Close() error {
return nil
}

// removeIncompleteGroups enforces all-or-nothing (atomic) writes per pipeline group.
// Some pipelines produce values that must be used together. For example jobs that output a bid/mid/ask
// must be used together to form a quote. So if any stream in the group failed, we drop
// the entire group to avoid writing a mix of fresh and stale values to the cache.
// Mutates observedValues in place.
func (d *dataSource) removeIncompleteGroups(lggr logger.Logger, observedValues map[streams.StreamID]llo.StreamValue, streamValues llo.StreamValues) {
checked := make(map[streams.Pipeline]bool)
for streamID := range observedValues {
// we only need to check the pipeline once per group. So if we've already checked this pipeline, skip it.
p, exists := d.registry.Get(streamID)
if !exists || checked[p] {
continue
}
checked[p] = true

// Check that every in-scope stream for this pipeline succeeded.
// This is because some pipelines might emit values for streams that the plugin is not requesting to be observed
var missing []streams.StreamID
for _, sid := range p.StreamIDs() {
if _, inScope := streamValues[sid]; !inScope {
continue // not requested this cycle so we can skip evaluating result
}
if _, ok := observedValues[sid]; !ok {
missing = append(missing, sid)
}
}

if len(missing) > 0 {
var dropped []streams.StreamID
for _, sid := range p.StreamIDs() {
if _, ok := observedValues[sid]; ok {
dropped = append(dropped, sid)
}
delete(observedValues, sid)
}
lggr.Debugw("Discarding incomplete pipeline group",
"pipelineStreamIDs", p.StreamIDs(),
"missingStreamIDs", missing,
"droppedStreamIDs", dropped,
)
}
}
}

type observableStreamValues struct {
opts llo.DSOpts
streamValues llo.StreamValues
Expand Down
17 changes: 10 additions & 7 deletions core/services/llo/observation/observation_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,13 +339,16 @@ func (oc *observationContext) run(ctx context.Context, streamID streams.StreamID
ex, isExecuting := oc.executions[p]
if isExecuting {
oc.executionsMu.Unlock()
// wait for it to finish
select {
case <-ex.done:
return ex.run, ex.trrs, ex.err
case <-ctx.Done():
return nil, nil, ctx.Err()
}
// We intentionally do NOT select on ctx.Done() here.
// BridgeTask uses overtimeContext (context.WithoutCancel) which
// detaches from the caller's deadline, so p.Run can still be
// in-flight after ctx expires. If waiters bail early via
// ctx.Done(), they return an error while the executor may later
// succeed. This results in some streams from the pipeline having values
// while others do not. Blocking on ex.done ensures all goroutines for
// the same pipeline receive the identical (run, trrs, err) tuple.
<-ex.done
return ex.run, ex.trrs, ex.err
}

// execute here
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ require (
github.com/smartcontractkit/chainlink-common v0.10.1-0.20260217160002-b56cb5356cc7
github.com/smartcontractkit/chainlink-common/keystore v1.0.2-0.20260217160002-b56cb5356cc7
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10
github.com/smartcontractkit/chainlink-data-streams v0.1.11
github.com/smartcontractkit/chainlink-data-streams v0.1.12-0.20260217193813-7cada3d4dcdc
github.com/smartcontractkit/chainlink-evm v0.3.4-0.20260217171105-755485c4e00f
github.com/smartcontractkit/chainlink-evm/contracts/cre/gobindings v0.0.0-20260107191744-4b93f62cffe3
github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20251222115927-36a18321243c
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1191,8 +1191,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY=
github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20251215152504-b1e41f508340 h1:PsjEI+5jZIz9AS4eOsLS5VpSWJINf38clXV3wryPyMk=
github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20251215152504-b1e41f508340/go.mod h1:P/0OSXUlFaxxD4B/P6HWbxYtIRmmWGDJAvanq19879c=
github.com/smartcontractkit/chainlink-data-streams v0.1.11 h1:yBzjU0Cu8AcfuM858G4xcQIulfNQkPfpUs5FDxX9UaY=
github.com/smartcontractkit/chainlink-data-streams v0.1.11/go.mod h1:8rUcGhjeXBoTFx2MynWgXiBWzVSB+LXd9JR6m8y2FfQ=
github.com/smartcontractkit/chainlink-data-streams v0.1.12-0.20260217193813-7cada3d4dcdc h1:dPZuoqm4tGnjaKaR4qh9BDGqjDe/qzkAL+qybBgW9oE=
github.com/smartcontractkit/chainlink-data-streams v0.1.12-0.20260217193813-7cada3d4dcdc/go.mod h1:8rUcGhjeXBoTFx2MynWgXiBWzVSB+LXd9JR6m8y2FfQ=
github.com/smartcontractkit/chainlink-evm v0.3.4-0.20260217171105-755485c4e00f h1:29dXt+8hnpSK42N/O+u9Wn3rH8ZTeLYAj7jhbdZvpnI=
github.com/smartcontractkit/chainlink-evm v0.3.4-0.20260217171105-755485c4e00f/go.mod h1:vf3/QOdXKTaNzrD2h2wzklHk+MvDC+J0kKmqdIz4www=
github.com/smartcontractkit/chainlink-evm/contracts/cre/gobindings v0.0.0-20260107191744-4b93f62cffe3 h1:V22ITnWmgBAyxH+VVVo1jxm/LeJ3jcVMCVYB+zLN5mU=
Expand Down
Loading