Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
309 changes: 309 additions & 0 deletions engine_stringmap_bitmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,309 @@
package expr

import (
"context"
"fmt"
"strconv"
"sync"

"github.com/RoaringBitmap/roaring"
"github.com/cespare/xxhash/v2"
"github.com/google/cel-go/common/operators"
"github.com/ohler55/ojg/jp"
)

// bitmapStringLookup is a string matching engine that indexes expression
// parts in Roaring Bitmaps, keyed by field path and hashed literal value.
// Every part added is assigned a numeric pause ID; the bitmaps hold those
// IDs and pauseIndex maps each ID back to the stored expression part.
type bitmapStringLookup struct {
// shards splits the per-field indexes over 64 independently locked
// buckets to reduce lock contention. getShard picks the bucket by hashing
// the field path.
// NOTE: getShard spells this anonymous struct type out again in its return
// type, so the field list and field order here must stay in sync with it.
shards [64]struct {
// mu guards the three index maps of this shard.
mu sync.RWMutex
// For each field path, store bitmaps of pause IDs that match specific values
equality map[string]map[string]*roaring.Bitmap // fieldPath -> hashedValue -> bitmap
inequality map[string]map[string]*roaring.Bitmap // fieldPath -> hashedValue -> bitmap
in map[string]map[string]*roaring.Bitmap // fieldPath -> hashedValue -> bitmap
}

// vars is the set of every field path seen by Add; Match iterates it.
// Guarded by varsMu.
vars map[string]struct{}
varsMu sync.RWMutex

// pauseIndex maps a pause ID to its stored expression part so that bitmap
// hits can be turned back into results. Guarded by pauseIndexMu.
pauseIndex map[uint32]*StoredExpressionPart
pauseIndexMu sync.RWMutex

// concurrency holds the value passed to the constructor; it is not read by
// any method visible in this file.
concurrency int64
// nextPauseID is the most recently issued pause ID; getNextPauseID
// increments it while holding idMu.
nextPauseID uint32
idMu sync.Mutex
}

// newBitmapStringEqualityMatcher builds an empty bitmap-backed string engine.
// Every map is allocated up front, including the three index maps of each
// shard, so that Add can write to them without nil checks on the outer maps.
func newBitmapStringEqualityMatcher(concurrency int64) MatchingEngine {
	lookup := new(bitmapStringLookup)
	lookup.concurrency = concurrency
	lookup.vars = map[string]struct{}{}
	lookup.pauseIndex = map[uint32]*StoredExpressionPart{}

	// Take the address of each array element; ranging by value would copy
	// the shard (and its mutex) instead of initialising it in place.
	for idx := 0; idx < len(lookup.shards); idx++ {
		s := &lookup.shards[idx]
		s.equality = map[string]map[string]*roaring.Bitmap{}
		s.inequality = map[string]map[string]*roaring.Bitmap{}
		s.in = map[string]map[string]*roaring.Bitmap{}
	}

	return lookup
}

// Type returns EngineTypeStringHash, the engine type this matcher reports
// itself as.
func (b *bitmapStringLookup) Type() EngineType {
return EngineTypeStringHash
}

// getShard returns a pointer to the shard that owns key. The shard is chosen
// by reducing the xxhash of key modulo the number of shards, so the same key
// always maps to the same shard.
//
// The return type repeats the anonymous shard struct declared on
// bitmapStringLookup; the two must remain identical.
func (b *bitmapStringLookup) getShard(key string) *struct {
	mu         sync.RWMutex
	equality   map[string]map[string]*roaring.Bitmap
	inequality map[string]map[string]*roaring.Bitmap
	in         map[string]map[string]*roaring.Bitmap
} {
	idx := xxhash.Sum64String(key) % uint64(len(b.shards))
	return &b.shards[idx]
}

// getNextPauseID increments the pause ID counter under idMu and returns the
// new value. The first ID issued is 1.
func (b *bitmapStringLookup) getNextPauseID() uint32 {
	b.idMu.Lock()
	b.nextPauseID++
	id := b.nextPauseID
	b.idMu.Unlock()

	return id
}

// hash returns the 64-bit xxhash of input rendered as a base-36 string. The
// result is used as the value key in the per-field index maps.
func (b *bitmapStringLookup) hash(input string) string {
	return strconv.FormatUint(xxhash.Sum64String(input), 36)
}

// Match looks up every tracked field path in input and adds to result each
// expression part whose string predicate is satisfied by the value found.
//
// A path that is absent from input is treated as the empty string, so
// equality with "" and inequality predicates can still match. Only the first
// value a path resolves to is considered. Paths that do not parse as JSONPath
// are skipped. No group filtering happens here: every hit is reported and the
// caller is expected to validate groups.
//
// The returned error is always nil and ctx is not consulted.
func (b *bitmapStringLookup) Match(ctx context.Context, input map[string]any, result *MatchResult) error {
	// Snapshot the tracked paths so varsMu is not held while shards are read.
	b.varsMu.RLock()
	fieldPaths := make([]string, 0, len(b.vars))
	for path := range b.vars {
		fieldPaths = append(fieldPaths, path)
	}
	b.varsMu.RUnlock()

	for _, path := range fieldPaths {
		// Parse and resolve the path before any shard lock is taken; neither
		// step touches shard state.
		x, err := jp.ParseString(path)
		if err != nil {
			continue
		}

		var value any = ""
		if res := x.Get(input); len(res) > 0 {
			value = res[0]
		}

		b.matchValue(path, value, result)
	}

	return nil
}

// matchValue adds to result every expression part indexed under path that is
// satisfied by value:
//   - string: parts with an equality predicate on that exact value, plus
//     parts with an inequality predicate on any other value;
//   - []any / []string: parts with an "in" predicate on any string element.
//
// Values of any other type match nothing. The shard read lock is held for the
// duration of the call and released via defer, so it is freed even if a
// lookup panics.
func (b *bitmapStringLookup) matchValue(path string, value any, result *MatchResult) {
	shard := b.getShard(path)
	shard.mu.RLock()
	defer shard.mu.RUnlock()

	switch val := value.(type) {
	case string:
		hashedVal := b.hash(val)

		// Indexing a missing (nil) inner map is safe and reports "not found".
		if bitmap, ok := shard.equality[path][hashedVal]; ok {
			b.addBitmapMatches(bitmap, result)
		}

		// An inequality predicate matches unless it names this exact value.
		for hashed, bitmap := range shard.inequality[path] {
			if hashed != hashedVal {
				b.addBitmapMatches(bitmap, result)
			}
		}

	case []any:
		candidates := shard.in[path]
		if len(candidates) == 0 {
			return
		}
		for _, item := range val {
			str, ok := item.(string)
			if !ok {
				continue
			}
			if bitmap, ok := candidates[b.hash(str)]; ok {
				b.addBitmapMatches(bitmap, result)
			}
		}

	case []string:
		candidates := shard.in[path]
		if len(candidates) == 0 {
			return
		}
		for _, str := range val {
			if bitmap, ok := candidates[b.hash(str)]; ok {
				b.addBitmapMatches(bitmap, result)
			}
		}
	}
}

// addBitmapMatches resolves every pause ID in bitmap through pauseIndex and
// adds the corresponding evaluable/group pair to result. IDs that are no
// longer present in pauseIndex are skipped.
func (b *bitmapStringLookup) addBitmapMatches(bitmap *roaring.Bitmap, result *MatchResult) {
	b.pauseIndexMu.RLock()
	defer b.pauseIndexMu.RUnlock()

	// Walk the bitmap with an iterator rather than copying it into a slice.
	for it := bitmap.Iterator(); it.HasNext(); {
		if part, ok := b.pauseIndex[it.Next()]; ok {
			result.Add(part.EvaluableID, part.GroupID)
		}
	}
}

// Search adds to result the expression parts indexed under variable that are
// satisfied by input, using the same per-value rules as Match.
//
// Only the index for variable is consulted. Wrapping the value in a
// single-key map and running a full Match would not resolve dotted variable
// names and would evaluate every other tracked field against a missing
// value, reporting matches for unrelated fields.
func (b *bitmapStringLookup) Search(ctx context.Context, variable string, input any, result *MatchResult) {
	b.matchValue(variable, input, result)
}

// Add indexes the expression part p under its predicate identifier.
//
// Supported operators are equality, inequality and "in". Any other operator
// returns an error, and in that case no state is modified: the operator is
// validated before a pause ID is issued or the part is registered.
//
// An "in" predicate whose literal is not a string is registered (so a later
// Remove finds it) but is not placed in any bitmap, and nil is returned.
func (b *bitmapStringLookup) Add(ctx context.Context, p ExpressionPart) error {
	ident := p.Predicate.Ident
	op := p.Predicate.Operator

	// Validate the operator and compute the index key before touching any
	// shared state, so a rejected part leaves nothing behind.
	var hashedVal string
	indexed := true
	switch op {
	case operators.Equals, operators.NotEquals:
		hashedVal = b.hash(p.Predicate.LiteralAsString())
	case operators.In:
		str, ok := p.Predicate.Literal.(string)
		if ok {
			hashedVal = b.hash(str)
		}
		indexed = ok
	default:
		return fmt.Errorf("BitmapStringHash engines only support string equality/inequality/in operations")
	}

	// Generate a unique pause ID and record which part it stands for.
	pauseID := b.getNextPauseID()

	b.pauseIndexMu.Lock()
	b.pauseIndex[pauseID] = p.ToStored()
	b.pauseIndexMu.Unlock()

	// Track the field path so Match knows to look it up.
	b.varsMu.Lock()
	b.vars[ident] = struct{}{}
	b.varsMu.Unlock()

	if !indexed {
		return nil
	}

	shard := b.getShard(ident)
	shard.mu.Lock()
	defer shard.mu.Unlock()

	// Pick the index that belongs to the operator; op was validated above,
	// so anything that is not (in)equality is "in".
	var index map[string]map[string]*roaring.Bitmap
	switch op {
	case operators.Equals:
		index = shard.equality
	case operators.NotEquals:
		index = shard.inequality
	default:
		index = shard.in
	}

	values := index[ident]
	if values == nil {
		values = make(map[string]*roaring.Bitmap)
		index[ident] = values
	}
	bitmap := values[hashedVal]
	if bitmap == nil {
		bitmap = roaring.New()
		values[hashedVal] = bitmap
	}
	bitmap.Add(pauseID)

	return nil
}

// Remove deletes one stored copy of the expression part p from the engine.
//
// It returns ErrExpressionPartNotFound when no stored part equals p, and an
// error for operators other than equality, inequality and "in" (the part is
// still dropped from pauseIndex in that case). When removing the pause ID
// empties a bitmap, the bitmap is discarded; when that empties the value map
// of the field, the value map is discarded too.
func (b *bitmapStringLookup) Remove(ctx context.Context, p ExpressionPart) error {
	pauseIDToRemove, found := b.claimPauseID(p)
	if !found {
		return ErrExpressionPartNotFound
	}

	ident := p.Predicate.Ident
	shard := b.getShard(ident)
	shard.mu.Lock()
	defer shard.mu.Unlock()

	var (
		index     map[string]map[string]*roaring.Bitmap
		hashedVal string
	)
	switch p.Predicate.Operator {
	case operators.Equals:
		index, hashedVal = shard.equality, b.hash(p.Predicate.LiteralAsString())
	case operators.NotEquals:
		index, hashedVal = shard.inequality, b.hash(p.Predicate.LiteralAsString())
	case operators.In:
		str, ok := p.Predicate.Literal.(string)
		if !ok {
			// Non-string "in" literals are never placed in a bitmap.
			return nil
		}
		index, hashedVal = shard.in, b.hash(str)
	default:
		return fmt.Errorf("BitmapStringHash engines only support string equality/inequality/in operations")
	}

	valueMap, ok := index[ident]
	if !ok {
		return nil
	}
	if bitmap, ok := valueMap[hashedVal]; ok {
		bitmap.Remove(pauseIDToRemove)
		if bitmap.IsEmpty() {
			delete(valueMap, hashedVal)
		}
	}
	// Drop the per-field map once its last value is gone so that removed
	// fields do not leave empty maps behind.
	if len(valueMap) == 0 {
		delete(index, ident)
	}

	return nil
}

// claimPauseID finds a pause ID whose stored part equals p, deletes it from
// pauseIndex, and returns it. The second result is false when no stored part
// equals p.
//
// The scan runs under the read lock; the delete then re-checks the entry
// under the write lock. If another Remove took the same entry in between,
// the scan is repeated, so each successful call claims a distinct ID.
func (b *bitmapStringLookup) claimPauseID(p ExpressionPart) (uint32, bool) {
	for {
		var (
			candidate uint32
			found     bool
		)

		b.pauseIndexMu.RLock()
		for id, stored := range b.pauseIndex {
			if p.EqualsStored(stored) {
				candidate, found = id, true
				break
			}
		}
		b.pauseIndexMu.RUnlock()

		if !found {
			return 0, false
		}

		b.pauseIndexMu.Lock()
		stored, ok := b.pauseIndex[candidate]
		claimed := ok && p.EqualsStored(stored)
		if claimed {
			delete(b.pauseIndex, candidate)
		}
		b.pauseIndexMu.Unlock()

		if claimed {
			return candidate, true
		}
	}
}
2 changes: 1 addition & 1 deletion expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func NewAggregateEvaluator[T Evaluable](
eval: opts.Eval,
parser: opts.Parser,
engines: map[EngineType]MatchingEngine{
EngineTypeStringHash: newStringEqualityMatcher(opts.Concurrency),
EngineTypeStringHash: newBitmapStringEqualityMatcher(opts.Concurrency),
EngineTypeNullMatch: newNullMatcher(opts.Concurrency),
EngineTypeBTree: newNumberMatcher(opts.Concurrency),
},
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ require (

require (
github.com/DataDog/zstd v1.4.5 // indirect
github.com/RoaringBitmap/roaring v1.9.4 // indirect
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.12.0 // indirect
github.com/cockroachdb/errors v1.11.3 // indirect
github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
Expand All @@ -33,6 +35,7 @@ require (
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.15.0 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/RoaringBitmap/roaring v1.9.4 h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ=
github.com/RoaringBitmap/roaring v1.9.4/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90=
github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA=
github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f h1:otljaYPt5hWxV3MUfO5dFPFiOXg9CyG5/kCfayTqsJ4=
Expand Down Expand Up @@ -58,6 +62,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/ohler55/ojg v1.21.0 h1:niqSS6yl3PQZJrqh7pKs/zinl4HebGe8urXEfpvlpYY=
github.com/ohler55/ojg v1.21.0/go.mod h1:gQhDVpQLqrmnd2eqGAvJtn+NfKoYJbe/A4Sj3/Vro4o=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
Expand All @@ -84,6 +90,7 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
Expand Down Expand Up @@ -142,5 +149,6 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
19 changes: 19 additions & 0 deletions vendor/github.com/RoaringBitmap/roaring/.drone.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading