Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 30 additions & 56 deletions expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type AggregateEvaluator[T Evaluable] interface {
Evaluate(ctx context.Context, data map[string]any) ([]T, int32, error)

// AggregateMatch returns all expression parts which are evaluable given the input data.
AggregateMatch(ctx context.Context, data map[string]any) ([]*StoredExpressionPart, error)
AggregateMatch(ctx context.Context, data map[string]any) ([]*uuid.UUID, error)

// Len returns the total number of aggregateable and constantly matched expressions
// stored in the evaluator.
Expand Down Expand Up @@ -286,8 +286,8 @@ func (a *aggregator[T]) Evaluate(ctx context.Context, data map[string]any) ([]T,
mpool := newErrPool(errPoolOpts{concurrency: a.concurrency})

a.lock.RLock()
for _, expr := range matches {
eval, err := a.kv.Get(expr.EvaluableID)
for _, id := range matches {
eval, err := a.kv.Get(*id)
if err != nil {
continue
}
Expand Down Expand Up @@ -339,8 +339,8 @@ func (a *aggregator[T]) Evaluate(ctx context.Context, data map[string]any) ([]T,

// AggregateMatch attempts to match incoming data to all PredicateTrees, resulting in a selection
// of parts of an expression that have matched.
func (a *aggregator[T]) AggregateMatch(ctx context.Context, data map[string]any) ([]*StoredExpressionPart, error) {
result := []*StoredExpressionPart{}
func (a *aggregator[T]) AggregateMatch(ctx context.Context, data map[string]any) ([]*uuid.UUID, error) {
result := []*uuid.UUID{}

a.lock.RLock()
defer a.lock.RUnlock()
Expand All @@ -353,9 +353,7 @@ func (a *aggregator[T]) AggregateMatch(ctx context.Context, data map[string]any)
// Note that having a count >= the group ID value does not guarantee that the expression is valid.
//
// Note that we break this down per evaluable ID (UUID)
totalCounts := map[uuid.UUID]map[groupID]int{}
// Store all expression parts per group ID for returning.
found := map[uuid.UUID]map[groupID][]*StoredExpressionPart{}
found := map[uuid.UUID]map[groupID]int{}

for _, engine := range a.engines {
// we explicitly ignore the deny path for now.
Expand All @@ -366,63 +364,39 @@ func (a *aggregator[T]) AggregateMatch(ctx context.Context, data map[string]any)

// Add all found items from the engine to the above list.
for _, eval := range matched {
idCount, idFound := totalCounts[eval.EvaluableID], found[eval.EvaluableID]

if idCount == nil {
idCount = map[groupID]int{}
idFound = map[groupID][]*StoredExpressionPart{}
}

idCount[eval.GroupID] += 1
if _, ok := idFound[eval.GroupID]; !ok {
idFound[eval.GroupID] = []*StoredExpressionPart{}
if _, ok := found[eval.EvaluableID]; !ok {
found[eval.EvaluableID] = map[groupID]int{}
}
idFound[eval.GroupID] = append(idFound[eval.GroupID], eval)

// Update mapping
totalCounts[eval.EvaluableID] = idCount
found[eval.EvaluableID] = idFound
found[eval.EvaluableID][eval.GroupID]++
}

}

seen := map[uuid.UUID]struct{}{}

// Validate that groups meet the minimum size.
for evalID, counts := range totalCounts {
for groupID, matchingCount := range counts {

for evalID, groups := range found {
for groupID, matchingCount := range groups {
requiredSize := int(groupID.Size()) // The total req size from the group ID

if matchingCount >= requiredSize {
for _, i := range found[evalID][groupID] {
if _, ok := seen[i.EvaluableID]; ok {
continue
}
seen[i.EvaluableID] = struct{}{}
result = append(result, i)
}
continue
// If this group isn't the required size, delete the group
// from our map
if matchingCount < requiredSize {
delete(groups, groupID)
}

// If this is a partial eval, always add it if there's a match for now.

// The GroupID required more comparisons to equate to true than
// we had, so this could never evaluate to true. Skip this.
//
// NOTE: We currently don't add items with OR predicates to the
// matching engine, so we cannot use group sizes if the expr part
// has an OR.
for _, i := range found[evalID][groupID] {
// if this is purely aggregateable, we're safe to rely on group IDs.
//
// So, we only need to care if this expression is mixed. If it's mixed,
// we can ignore group IDs for the time being.
if _, ok := a.mixed[i.EvaluableID]; ok {
// this wasn't fully aggregatable so evaluate it.
result = append(result, i)
}
}
}

// After iterating through each group, we now know:
//
// if len(groups) > 0, we have enough matches in this eval group for
// it to be a candidate.
hasMatchedGroups := len(groups) > 0

// NOTE: We currently don't add items with OR predicates to the
// matching engine, so we cannot use group sizes if the expr part
// has an OR.
_, isMixedOrs := a.mixed[evalID]

if hasMatchedGroups || isMixedOrs {
result = append(result, &evalID)
}
}

Expand Down