Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/go.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
go-version-file: ./go.mod
- name: Lint
run: |
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s v1.61.0
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s v2.1.2
./bin/golangci-lint run --verbose

test-linux-race:
Expand Down
4 changes: 2 additions & 2 deletions engine_number.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (n *numbers) Search(ctx context.Context, variable string, input any, result
func (n *numbers) Add(ctx context.Context, p ExpressionPart) error {
// If this is not equals, ignore.
if p.Predicate.Operator == operators.NotEquals {
return fmt.Errorf("Number engine does not support !=")
return fmt.Errorf("number engine does not support !=")
}

// Add the number to the btree.
Expand Down Expand Up @@ -185,7 +185,7 @@ func (n *numbers) Add(ctx context.Context, p ExpressionPart) error {
func (n *numbers) Remove(ctx context.Context, p ExpressionPart) error {
// If this is not equals, ignore.
if p.Predicate.Operator == operators.NotEquals {
return fmt.Errorf("Number engine does not support !=")
return fmt.Errorf("number engine does not support !=")
}

// Add the number to the btree.
Expand Down
101 changes: 86 additions & 15 deletions engine_stringmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ import (

func newStringEqualityMatcher(concurrency int64) MatchingEngine {
return &stringLookup{
lock: &sync.RWMutex{},
vars: map[string]struct{}{},
equality: variableMap{},
inequality: inequalityMap{},
lock: &sync.RWMutex{},
vars: map[string]struct{}{},
equality: variableMap{},
inequality: inequalityMap{},
// in stores all `in` operators, eg `"foo" in vars.a`. This lets us
// properly iterate over variables for in equaltiy matching.
in: variableMap{},
concurrency: concurrency,
}
}
Expand Down Expand Up @@ -49,6 +52,9 @@ type stringLookup struct {
// this performs string equality lookups.
equality variableMap

// in stores a list of all variables mapped to `in` operators, performing lookups across arrays.
in variableMap

// inequality stores all variables referenced within inequality checks mapped to the value,
// which is then mapped to expression parts.
//
Expand Down Expand Up @@ -76,16 +82,32 @@ func (n *stringLookup) Match(ctx context.Context, input map[string]any, result *
}

// default to an empty string
str := ""
if res := x.Get(input); len(res) > 0 {
if value, ok := res[0].(string); ok {
str = value
}
res := x.Get(input)
if len(res) == 0 {
res = []any{""}
}

opt := n.equalitySearch(ctx, path, str, result)
var optimized int32
switch val := res[0].(type) {
case string:
if n.equalitySearch(ctx, path, val, result) {
atomic.AddInt32(&optimized, 1)
}
case []any:
for _, item := range val {
if n.inSearch(ctx, path, item, result) {
atomic.AddInt32(&optimized, 1)
}
}
case []string:
for _, item := range val {
if n.inSearch(ctx, path, item, result) {
atomic.AddInt32(&optimized, 1)
}
}
}

if opt {
if optimized > 0 {
// Set optimized to true in every case.
atomic.AddInt32(&neqOptimized, 1)
}
Expand Down Expand Up @@ -128,11 +150,18 @@ func (n *stringLookup) Match(ctx context.Context, input map[string]any, result *
//
// Note that Search does not match inequality items.
func (n *stringLookup) Search(ctx context.Context, variable string, input any, result *MatchResult) {
str, ok := input.(string)
if !ok {
return
switch val := input.(type) {
case string:
n.equalitySearch(ctx, variable, val, result)
case []any:
for _, item := range val {
n.inSearch(ctx, variable, item, result)
}
case []string:
for _, item := range val {
n.inSearch(ctx, variable, item, result)
}
}
n.equalitySearch(ctx, variable, str, result)
}

func (n *stringLookup) equalitySearch(ctx context.Context, variable string, input string, result *MatchResult) (neqOptimized bool) {
Expand All @@ -155,6 +184,26 @@ func (n *stringLookup) equalitySearch(ctx context.Context, variable string, inpu
return neqOptimized
}

func (n *stringLookup) inSearch(ctx context.Context, variable string, input any, result *MatchResult) (neqOptimized bool) {
str, ok := input.(string)
if !ok {
return
}

hashedInput := n.hash(str)
for _, part := range n.in[hashedInput] {
if part.Ident != nil && *part.Ident != variable {
// The variables don't match.
continue
}
if part.GroupID.Flag() != OptimizeNone {
neqOptimized = true
}
result.Add(part.EvaluableID, part.GroupID)
}
return
}

// inequalitySearch performs lookups for != matches.
func (n *stringLookup) inequalitySearch(ctx context.Context, variable string, input string, neqOptimized bool, result *MatchResult) (matched []*StoredExpressionPart) {
if len(n.inequality[variable]) == 0 {
Expand Down Expand Up @@ -232,6 +281,28 @@ func (n *stringLookup) Add(ctx context.Context, p ExpressionPart) error {

n.inequality[p.Predicate.Ident][val] = append(n.inequality[p.Predicate.Ident][val], p.ToStored())
return nil

case operators.In:
// If this is an "in" operator, take the predicate's literal and ensure that we
// check appropriately.

switch v := p.Predicate.Literal.(type) {
case string:
// Assume that we're going to match an array in the event.

n.lock.Lock()
defer n.lock.Unlock()
val := n.hash(v)

n.vars[p.Predicate.Ident] = struct{}{}

if _, ok := n.in[val]; !ok {
n.in[val] = []*StoredExpressionPart{p.ToStored()}
return nil
}
n.in[val] = append(n.in[val], p.ToStored())
}

default:
return fmt.Errorf("StringHash engines only support string equality/inequality")
}
Expand Down
39 changes: 10 additions & 29 deletions expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,28 +634,30 @@ func (a *aggregator[T]) iterGroup(ctx context.Context, node *Node, parsed *Parse
func engineType(p Predicate) EngineType {
// switch on type of literal AND operator type. int64/float64 literals require
// btrees, texts require ARTs, and so on.
switch p.Literal.(type) {
switch v := p.Literal.(type) {
case int, int64, float64:
if p.Operator == operators.NotEquals {
// StringHash is only used for matching on equality.
return EngineTypeNone
}
// return EngineTypeNone
return EngineTypeBTree
case string:
if p.Operator == operators.Equals || p.Operator == operators.NotEquals {
if len(v) == 0 {
return EngineTypeNone
}
// NOTE: operators.In acts as operators.Equals, but iterates over the given
// array to check each item.
if p.Operator == operators.In || p.Operator == operators.Equals || p.Operator == operators.NotEquals {
// StringHash is only used for matching on in/equality.
return EngineTypeStringHash
}
case nil:
// Only allow this if we're not comparing two idents.
// Only allow this if we're not comparing two idents.each element of the array and
if p.LiteralIdent != nil {
return EngineTypeNone
}
return EngineTypeNullMatch
}
// case int64, float64:
// return EngineTypeBTree

return EngineTypeNone
}
Expand Down Expand Up @@ -732,27 +734,6 @@ func isAggregateable(n *Node) bool {
return false
}

switch v := n.Predicate.Literal.(type) {
case string:
if len(v) == 0 {
return false
}
if n.Predicate.Operator == operators.NotEquals {
// NOTE: NotEquals is _not_ supported. This requires selecting all leaf nodes _except_
// a given leaf, iterating over a tree. We may as well execute every expressiona s the difference
// is negligible.
return false
}
// Right now, we only support equality checking.
// TODO: Add GT(e)/LT(e) matching with tree iteration.
return n.Predicate.Operator == operators.Equals
case int, int64, float64:
return true
case nil:
// This is null, which is supported and a simple lookup to check
// if the event's key in question is present and is not nil.
return true
default:
return false
}
// If the engine type is none... this is non-aggregateable
return engineType(*n.Predicate) != EngineTypeNone
}
111 changes: 97 additions & 14 deletions expr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ func TestAddRemove(t *testing.T) {
require.Equal(t, 1, e.FastLen())
})

t.Run("With a non-aggregateable expression due to inequality/GTE on strings", func(t *testing.T) {
t.Run("neq", func(t *testing.T) {
e := NewAggregateEvaluator(AggregateEvaluatorOpts[testEvaluable]{
Parser: parser,
Eval: testBoolEvaluator,
Expand All @@ -854,40 +854,47 @@ func TestAddRemove(t *testing.T) {

ok, err := e.Add(ctx, tex(`event.data.foo != "no"`))
require.NoError(t, err)
require.Equal(t, ok, float64(0))
require.Equal(t, ok, float64(1))
require.Equal(t, 1, e.Len())
require.Equal(t, 1, e.SlowLen())
require.Equal(t, 0, e.FastLen())
require.Equal(t, 0, e.SlowLen())
require.Equal(t, 1, e.FastLen())
require.Equal(t, 0, e.MixedLen())
})

// Add the same expression again.
ok, err = e.Add(ctx, tex(`event.data.foo >= "no"`))
t.Run("With a non-aggregateable expression due to inequality/GTE on strings", func(t *testing.T) {
e := NewAggregateEvaluator(AggregateEvaluatorOpts[testEvaluable]{
Parser: parser,
Eval: testBoolEvaluator,
Concurrency: 0,
})

ok, err := e.Add(ctx, tex(`event.data.foo >= "no"`))
require.NoError(t, err)
require.Equal(t, ok, float64(0))
require.Equal(t, 2, e.Len())
require.Equal(t, 2, e.SlowLen())
require.Equal(t, 1, e.Len())
require.Equal(t, 1, e.SlowLen())
require.Equal(t, 0, e.FastLen())

// Add a new expression
ok, err = e.Add(ctx, tex(`event.data.another < "no"`))
require.NoError(t, err)
require.Equal(t, ok, float64(0))
require.Equal(t, 3, e.Len())
require.Equal(t, 3, e.SlowLen())
require.Equal(t, 2, e.Len())
require.Equal(t, 2, e.SlowLen())
require.Equal(t, 0, e.FastLen())

// And remove.
err = e.Remove(ctx, tex(`event.data.another < "no"`))
require.NoError(t, err)
require.Equal(t, 2, e.SlowLen())
require.Equal(t, 2, e.Len())
require.Equal(t, 1, e.SlowLen())
require.Equal(t, 1, e.Len())
require.Equal(t, 0, e.FastLen())

// And yeet out another non-existent expression
err = e.Remove(ctx, tex(`event.data.another != "i'm not here" && a != "b"`))
require.Error(t, ErrEvaluableNotFound, err)
require.Equal(t, 2, e.Len())
require.Equal(t, 2, e.SlowLen())
require.Equal(t, 1, e.Len())
require.Equal(t, 1, e.SlowLen())
require.Equal(t, 0, e.FastLen())
})

Expand Down Expand Up @@ -1200,6 +1207,82 @@ func TestMixedEngines(t *testing.T) {
})
}

func TestInMacro(t *testing.T) {
t.Run("string comparisons, where literal is string and var is array", func(t *testing.T) {
ctx := t.Context()

e := newTestEvaluator()

ex := tex(`"abc" in event.data.ids`)
_, err := e.Add(ctx, ex)
require.NoError(t, err)

// As this is a string equality match, this should be a fast expression.
require.EqualValues(t, 1, e.FastLen())
require.EqualValues(t, 0, e.SlowLen())

t.Run("matching", func(t *testing.T) {
found, evalCount, err := e.Evaluate(ctx, map[string]any{
"event": map[string]any{
"data": map[string]any{
"ids": []any{"a", "b", "c", 1, false, "abc"},
},
},
})
require.NoError(t, err)
require.EqualValues(t, 1, evalCount)
require.Equal(t, 1, len(found))
require.Equal(t, ex, found[0])
})

t.Run("not matching", func(t *testing.T) {
found, evalCount, err := e.Evaluate(ctx, map[string]any{
"event": map[string]any{
"data": map[string]any{
"ids": []string{"a", "b", "c"},
},
},
})
require.NoError(t, err)
require.EqualValues(t, 0, evalCount)
require.Equal(t, 0, len(found))
})

t.Run("compound", func(t *testing.T) {
ex := tex(`event.data.status == "ok" && "order_xyz" in event.data.ids`)
_, err := e.Add(ctx, ex)
require.NoError(t, err)

// As this is a string equality match, this should be a fast expression.
require.EqualValues(t, 2, e.FastLen())
require.EqualValues(t, 0, e.SlowLen())

found, evalCount, err := e.Evaluate(ctx, map[string]any{
"event": map[string]any{
"data": map[string]any{
"status": "ok",
"ids": []any{"order_abc", "order_xyz"},
},
},
})
require.NoError(t, err)
require.EqualValues(t, 1, evalCount)
require.Equal(t, 1, len(found))
require.Equal(t, ex, found[0])
})
})
}

// newTestEvaluator
func newTestEvaluator() AggregateEvaluator[testEvaluable] {
parser := NewTreeParser(NewCachingCompiler(newEnv(), nil))
return NewAggregateEvaluator(AggregateEvaluatorOpts[testEvaluable]{
Parser: parser,
Eval: testBoolEvaluator,
Concurrency: 0,
})
}

// func BenchmarkNonCachingEvaluate1_000(b *testing.B) { benchEval(1_000, EnvParser(newEnv()), b) }
func benchEval(i int, p CELCompiler, b *testing.B) {
for n := 0; n < b.N; n++ {
Expand Down
Loading