diff --git a/analyzer/server.go b/analyzer/server.go index 5f1019e862..9fac92a45c 100644 --- a/analyzer/server.go +++ b/analyzer/server.go @@ -336,6 +336,7 @@ func NewServerFromConfig() (*Server, error) { tr.AddTraversalExtension(ge.NewNeighborsTraversalExtension()) tr.AddTraversalExtension(ge.NewNextHopTraversalExtension()) tr.AddTraversalExtension(ge.NewGroupTraversalExtension()) + tr.AddTraversalExtension(ge.NewMergeTraversalExtension()) // new flow subscriber endpoints flowSubscriberWSServer := ws.NewStructServer(config.NewWSServer(hub.HTTPServer(), "/ws/subscriber/flow", apiAuthBackend)) diff --git a/go.mod b/go.mod index 1a6b12688e..773d68e8d0 100644 --- a/go.mod +++ b/go.mod @@ -77,7 +77,7 @@ require ( github.com/spf13/cobra v1.1.1 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.7.0 - github.com/stretchr/testify v1.6.1 // indirect + github.com/stretchr/testify v1.7.0 github.com/t-yuki/gocover-cobertura v0.0.0-20180217150009-aaee18c8195c github.com/tebeka/go2xunit v1.4.10 github.com/tebeka/selenium v0.0.0-20170314201507-657e45ec600f diff --git a/go.sum b/go.sum index 160408838c..80faa1e473 100644 --- a/go.sum +++ b/go.sum @@ -1001,8 +1001,9 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod 
h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
 github.com/syndtr/gocapability v0.0.0-20160928074757-e7cb7fa329f4/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
diff --git a/graffiti/go.mod b/graffiti/go.mod
index 0e8daf56ef..e24a38c5ab 100644
--- a/graffiti/go.mod
+++ b/graffiti/go.mod
@@ -40,7 +40,7 @@ require (
 	github.com/skydive-project/go-debouncer v1.0.0
 	github.com/spf13/cast v1.3.1
 	github.com/spf13/cobra v1.1.1
-	github.com/stretchr/testify v1.5.1
+	github.com/stretchr/testify v1.7.0
 	github.com/tchap/zapext v1.0.0
 	github.com/xeipuuv/gojsonschema v1.2.0
 	go.uber.org/zap v1.16.0
diff --git a/gremlin/traversal/merge.go b/gremlin/traversal/merge.go
new file mode 100644
index 0000000000..9b21864d3c
--- /dev/null
+++ b/gremlin/traversal/merge.go
@@ -0,0 +1,245 @@
+package traversal
+
+import (
+	"fmt"
+	"reflect"
+	"time"
+
+	"github.com/pkg/errors"
+	"github.com/skydive-project/skydive/graffiti/graph"
+	"github.com/skydive-project/skydive/graffiti/graph/traversal"
+	"github.com/skydive-project/skydive/graffiti/logging"
+)
+
+// MergeTraversalExtension describes the traversal extension that adds the Merge step to the Gremlin language
+type MergeTraversalExtension struct {
+	MergeToken traversal.Token
+}
+
+// MergeGremlinTraversalStep aggregates elements from different revisions of the nodes into a new metadata key.
+// Nodes returned by this step are copies of the nodes in the graph, not the actual nodes,
+// because this step is not meant to modify nodes in the graph, only to enrich the output.
+// This step should be used with a persistent backend, so it can access previous revisions of the nodes.
+// The first parameter is the metadata key where the elements will be read from.
+// The value stored in that key has to be a map (or a pointer to a map), any other type is ignored.
+// The second parameter is the metadata key where all the elements will be aggregated.
+// The aggregation has the format: map[string][]interface{}.
+// All elements with the same key in the map will be joined in a slice.
+// To use this step we can use a graph with a time period context, eg: G.At(1479899809,3600).V().Merge('A','B').
+// Or we can define the time period in the step: G.V().Merge('A','B',1500000000,1500099999).
+// Note that in this case we define the start and end time, while in "At" is start time and duration.
+// In both cases, Merge step will use the nodes given by the previous step.
+type MergeGremlinTraversalStep struct {
+	traversal.GremlinTraversalContext
+	MergeKey    string
+	MergeAggKey string
+	StartTime   time.Time
+	EndTime     time.Time
+}
+
+// NewMergeTraversalExtension returns a new graph traversal extension
+func NewMergeTraversalExtension() *MergeTraversalExtension {
+	return &MergeTraversalExtension{
+		MergeToken: traversalMergeToken,
+	}
+}
+
+// ScanIdent recognises the word associated with this step (in uppercase) and returns the token
+// which represents it. The boolean result is true when a match has been found.
+func (e *MergeTraversalExtension) ScanIdent(s string) (traversal.Token, bool) {
+	if s == "MERGE" {
+		return e.MergeToken, true
+	}
+	return traversal.IDENT, false
+}
+
+// ParseStep generates the Merge step for the given token, taking the step parameters from 'p'.
+// It returns (nil, nil) when the token does not belong to this extension.
+// Merge accepts two parameters (origin key, destination key) or four (adding the start and end
+// time as unix epoch seconds).
+func (e *MergeTraversalExtension) ParseStep(t traversal.Token, p traversal.GremlinTraversalContext) (traversal.GremlinTraversalStep, error) {
+	if t != e.MergeToken {
+		return nil, nil
+	}
+
+	// Check the number of parameters first, so both keys are parsed once for the two accepted forms
+	if n := len(p.Params); n != 2 && n != 4 {
+		return nil, errors.New("Merge parameter must have two or four parameters (OriginKey, DestinationKey, StartTime, EndTime)")
+	}
+
+	mergeKey, ok := p.Params[0].(string)
+	if !ok {
+		return nil, errors.New("Merge first parameter have to be a string")
+	}
+	mergeAggKey, ok := p.Params[1].(string)
+	if !ok {
+		return nil, errors.New("Merge second parameter have to be a string")
+	}
+
+	// The time period is optional, both times keep their zero value when it is not given
+	var startTime, endTime time.Time
+	if len(p.Params) == 4 {
+		startTimeUnixEpoch, ok := p.Params[2].(int64)
+		if !ok {
+			return nil, errors.New("Merge third parameter have to be a int (unix epoch time)")
+		}
+		endTimeUnixEpoch, ok := p.Params[3].(int64)
+		if !ok {
+			return nil, errors.New("Merge fourth parameter have to be a int (unix epoch time)")
+		}
+		startTime = time.Unix(startTimeUnixEpoch, 0)
+		endTime = time.Unix(endTimeUnixEpoch, 0)
+	}
+
+	return &MergeGremlinTraversalStep{
+		GremlinTraversalContext: p,
+		MergeKey:                mergeKey,
+		MergeAggKey:             mergeAggKey,
+		StartTime:               startTime,
+		EndTime:                 endTime,
+	}, nil
+}
+
+// Exec executes the merge step. The previous step has to return nodes (a GraphTraversalV),
+// any other kind of step is answered with traversal.ErrExecutionError.
+func (s *MergeGremlinTraversalStep) Exec(last traversal.GraphTraversalStep) (traversal.GraphTraversalStep, error) {
+	if tv, ok := last.(*traversal.GraphTraversalV); ok {
+		return s.InterfaceMerge(tv)
+	}
+	return nil, traversal.ErrExecutionError
+}
+
+// Reduce merge step. Merge is not combined with the step that follows it, next is returned untouched.
+func (s *MergeGremlinTraversalStep) Reduce(next traversal.GremlinTraversalStep) (traversal.GremlinTraversalStep, error) {
+	return next, nil
+}
+
+// Context returns the traversal context (which holds the step parameters) of the merge step
+func (s *MergeGremlinTraversalStep) Context() *traversal.GremlinTraversalContext {
+	return &s.GremlinTraversalContext
+}
+
+// InterfaceMerge groups, for each node id, all the elements stored in Metadata.key of every
+// revision of the input nodes and puts them into Metadata.aggKey of a copy of the newest revision.
+// Elements are grouped based on their key. See mergeMetadata for an example.
+// All output nodes will have Metadata.aggKey defined (empty or not).
+// The order of the output nodes is not defined, they are collected from a map.
+func (s *MergeGremlinTraversalStep) InterfaceMerge(tv *traversal.GraphTraversalV) (traversal.GraphTraversalStep, error) {
+	// If the user has defined start/end time in the parameters, use those values instead of the ones coming with the graph
+	if !s.StartTime.IsZero() && !s.EndTime.IsZero() {
+		userTimeSliceCtx := graph.Context{
+			TimeSlice: graph.NewTimeSlice(
+				graph.Time(s.StartTime).UnixMilli(),
+				graph.Time(s.EndTime).UnixMilli(),
+			),
+			TimePoint: true,
+		}
+
+		newGraph, err := tv.GraphTraversal.Graph.CloneWithContext(userTimeSliceCtx)
+		if err != nil {
+			return nil, err
+		}
+		// NOTE(review): this replaces the graph of the traversal received from the previous step, so the
+		// following steps will presumably run with the user defined time slice too - confirm it is intended
+		tv.GraphTraversal.Graph = newGraph
+	}
+
+	tv.GraphTraversal.RLock()
+	defer tv.GraphTraversal.RUnlock()
+
+	// Get the list of node ids
+	inputNodes := tv.GetNodes()
+	nodesIDs := make([]graph.Identifier, 0, len(inputNodes))
+	for _, node := range inputNodes {
+		nodesIDs = append(nodesIDs, node.ID)
+	}
+
+	// uniqNodes stores the latest revision of the node for each node identifier
+	uniqNodes := map[graph.Identifier]*graph.Node{}
+
+	// elements accumulates the elements of all the revisions for each node id
+	elements := map[graph.Identifier]map[string][]interface{}{}
+
+	// Go through all the revisions of the list of node ids
+	for _, rNode := range tv.GraphTraversal.Graph.GetNodesFromIDs(nodesIDs) {
+		// Keep only the most recent revision, the first one seen is kept if two revisions are equal
+		if storedNode, ok := uniqNodes[rNode.ID]; !ok || storedNode.Revision < rNode.Revision {
+			uniqNodes[rNode.ID] = rNode
+		}
+
+		// Store elements from all revisions into the "elements" variable
+		elements[rNode.ID] = mergeMetadata(rNode, s.MergeKey, elements[rNode.ID])
+	}
+
+	// Move the nodes from the uniqNodes map to a slice required by TraversalV
+	// Return a copy of the nodes, not the actual graph nodes, because we don't want
+	// to modify nodes with this step, just append some extra info
+	nodes := make([]*graph.Node, 0, len(uniqNodes))
+	for id, n := range uniqNodes {
+		nCopy := n.Copy()
+
+		// uniqNodes and elements are filled in the same loop, so every id has its elements (maybe empty)
+		if !nCopy.Metadata.SetField(s.MergeAggKey, elements[id]) {
+			return nil, fmt.Errorf("unable to set elements metadata for copied node %v", id)
+		}
+
+		nodes = append(nodes, nCopy)
+	}
+
+	return traversal.NewGraphTraversalV(tv.GraphTraversal, nodes), nil
+}
+
+// mergeMetadata returns the merge of the node Metadata.key elements with the ones already stored in nodeMerge.
+// Eg.:
+//   node:      Metadata.key: {"a":{x}, "b":{y}}
+//   nodeMerge: {"a":[{z}]}
+//   return:    {"a":[{z},{x}], "b":[{y}]}
+//
+// New elements are appended after the stored ones; an element deeply equal to a stored one is not repeated.
+// Metadata.key is ignored if it cannot be read or has an invalid format (not a map).
+// Reflect is used to be able to access maps defined in different types.
+// Map keys are converted to string with fmt, so the aggregation is able to be encoded with JSON.
+func mergeMetadata(node *graph.Node, key string, nodeMerge map[string][]interface{}) map[string][]interface{} {
+	if nodeMerge == nil {
+		nodeMerge = map[string][]interface{}{}
+	}
+
+	n1MergeIface, n1Err := node.GetField(key)
+	if n1Err != nil {
+		// Metadata.key could not be read from this revision (eg.: not defined), nothing to merge
+		return nodeMerge
+	}
+
+	n1MergeValue := reflect.ValueOf(n1MergeIface)
+
+	// If the metadata value is a pointer, resolve it
+	if n1MergeValue.Kind() == reflect.Ptr {
+		n1MergeValue = n1MergeValue.Elem()
+	}
+
+	// Merge step only accepts a map as data origin
+	if n1MergeValue.Kind() != reflect.Map {
+		logging.GetLogger().Errorf("Invalid type for elements, expecting a map, but it is %v", n1MergeValue.Kind())
+		return nodeMerge
+	}
+
+	iter := n1MergeValue.MapRange()
+	for iter.Next() {
+		k := fmt.Sprintf("%v", iter.Key().Interface())
+		v := iter.Value().Interface()
+
+		// Do not append if the same element already exists
+		duplicated := false
+		for _, storedElement := range nodeMerge[k] {
+			if reflect.DeepEqual(storedElement, v) {
+				duplicated = true
+				break
+			}
+		}
+		if !duplicated {
+			nodeMerge[k] = append(nodeMerge[k], v)
+		}
+	}
+
+	return nodeMerge
+}
diff --git a/gremlin/traversal/merge_test.go b/gremlin/traversal/merge_test.go
new file mode 100644
index 0000000000..cc8b25d2c8
--- /dev/null
+++ b/gremlin/traversal/merge_test.go
@@ -0,0 +1,610 @@
+package traversal
+
+import (
+	"testing"
+	"time"
+
+	"github.com/skydive-project/skydive/graffiti/graph"
+	"github.com/skydive-project/skydive/graffiti/graph/traversal"
+	"github.com/stretchr/testify/assert"
+)
+
+// FakeMergeGraphBackend simulates a backend with history that could store different revisions of nodes
+type FakeMergeGraphBackend struct {
+	graph.MemoryBackend
+	Nodes []*graph.Node
+}
+
+func (b *FakeMergeGraphBackend) IsHistorySupported() bool {
+	return true
+}
+
+func (b *FakeMergeGraphBackend) GetNode(i graph.Identifier, at graph.Context) []*graph.Node {
+	
nodes := []*graph.Node{}
+	for _, n := range b.Nodes {
+		if n.ID == i {
+			nodes = append(nodes, n)
+		}
+	}
+	return nodes
+}
+
+func (b *FakeMergeGraphBackend) GetNodesFromIDs(identifierList []graph.Identifier, at graph.Context) []*graph.Node {
+	nodes := []*graph.Node{}
+	for _, n := range b.Nodes {
+		for _, i := range identifierList {
+			if n.ID == i {
+				nodes = append(nodes, n)
+			}
+		}
+	}
+	return nodes
+}
+
+func TestMergeMetadataNilNodeMerge(t *testing.T) {
+	key := "Merge"
+
+	metadataNode1 := graph.Metadata{key: map[string]interface{}{
+		"abc": map[interface{}]string{"descr": "foo"},
+	}}
+	node := CreateNode("nodeA", metadataNode1, graph.TimeUTC(), 1)
+
+	nodeMergeAgg := mergeMetadata(node, key, nil)
+
+	expected := map[string][]interface{}{
+		"abc": {
+			map[interface{}]string{"descr": "foo"},
+		},
+	}
+
+	assert.Equal(t, expected, nodeMergeAgg)
+}
+
+func TestMergeMetadataPointerValue(t *testing.T) {
+	key := "Merge"
+
+	value := map[string]interface{}{
+		"abc": map[interface{}]string{"descr": "foo"},
+	}
+
+	metadataNode1 := graph.Metadata{key: &value}
+	node := CreateNode("nodeA", metadataNode1, graph.TimeUTC(), 1)
+
+	nodeMergeAgg := mergeMetadata(node, key, nil)
+
+	expected := map[string][]interface{}{
+		"abc": {
+			map[interface{}]string{"descr": "foo"},
+		},
+	}
+
+	assert.Equal(t, expected, nodeMergeAgg)
+}
+
+func TestMergeMetadata(t *testing.T) {
+	tests := []struct {
+		name       string
+		nodesMerge []interface{}
+		expected   map[string][]interface{}
+	}{
+		{
+			name:     "no nodes",
+			expected: map[string][]interface{}{},
+		},
+		{
+			name: "one node",
+			nodesMerge: []interface{}{
+				map[string]interface{}{
+					"abc": map[string]string{"descr": "foo"},
+				}},
+			expected: map[string][]interface{}{
+				"abc": {
+					map[string]string{"descr": "foo"},
+				},
+			},
+		},
+		{
+			name: "two nodes, different keys",
+			nodesMerge: []interface{}{
+				map[string]interface{}{
+					"abc": map[string]string{"descr": "foo"},
+				},
+				map[string]interface{}{
+					"xyz": map[string]string{"descr": "bar"},
+				}},
+			expected: map[string][]interface{}{
+				"abc": {
+					map[string]string{"descr": "foo"},
+				},
+				"xyz": {
+					map[string]string{"descr": "bar"},
+				},
+			},
+		},
+		{
+			name: "two nodes, same keys",
+			nodesMerge: []interface{}{
+				map[string]interface{}{
+					"abc": map[string]string{"descr": "foo"},
+				},
+				map[string]interface{}{
+					"abc": map[string]string{"descr": "bar"},
+				}},
+			expected: map[string][]interface{}{
+				"abc": {
+					map[string]string{"descr": "foo"},
+					map[string]string{"descr": "bar"},
+				},
+			},
+		},
+		{
+			name: "two nodes, repeating one event, should be removed",
+			nodesMerge: []interface{}{
+				map[string]interface{}{
+					"abc": map[string]string{"descr": "foo"},
+				},
+				map[string]interface{}{
+					"abc": map[string]string{"descr": "foo"},
+					"xxx": map[string]string{"descr": "bar"},
+				}},
+			expected: map[string][]interface{}{
+				"abc": {
+					map[string]string{"descr": "foo"},
+				},
+				"xxx": {
+					map[string]string{"descr": "bar"},
+				},
+			},
+		},
+	}
+
+	key := "Merge"
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			nodeMergeAgg := map[string][]interface{}{}
+
+			for _, nodeMerge := range test.nodesMerge {
+				metadataNode1 := graph.Metadata{key: nodeMerge}
+				node := CreateNode("nodeA", metadataNode1, graph.TimeUTC(), 1)
+
+				nodeMergeAgg = mergeMetadata(node, key, nodeMergeAgg)
+			}
+
+			assert.Equal(t, test.expected, nodeMergeAgg)
+		})
+	}
+}
+
+func TestInterfaceMerge(t *testing.T) {
+	tests := []struct {
+		name      string
+		InNodes   []*graph.Node
+		key       string
+		aggKey    string
+		startTime time.Time
+		endTime   time.Time
+		// Expected nodes, they are compared with the step output ignoring the order
+		OutNodes []*graph.Node
+	}{
+		{
+			name: "no input nodes",
+		},
+		{
+			// The output node only gains an empty aggKey, the rest of the node is not modified
+			name:   "one input node without key defined",
+			key:    "Merge",
+			aggKey: "MergeAgg",
+			InNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{}, graph.Time(time.Unix(0, 0)), 1),
+			},
+			OutNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"MergeAgg": map[string][]interface{}{},
+				}, graph.Time(time.Unix(0, 0)), 1),
+			},
+		},
+		{
+			name:   "one input node with key defined but empty",
+			key:    "Merge",
+			aggKey: "MergeAgg",
+			InNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{},
+				}, graph.Time(time.Unix(0, 0)), 1),
+			},
+			OutNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"Merge":    map[string]interface{}{},
+					"MergeAgg": map[string][]interface{}{},
+				}, graph.Time(time.Unix(0, 0)), 1),
+			},
+		},
+		{
+			name:   "one input node with key defined and one element",
+			key:    "Merge",
+			aggKey: "MergeAgg",
+			InNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}},
+				}, graph.Time(time.Unix(0, 0)), 1),
+			},
+			OutNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}},
+					"MergeAgg": map[string][]interface{}{"e1": {
+						map[string]interface{}{"desc": "a"},
+					}},
+				}, graph.Time(time.Unix(0, 0)), 1),
+			},
+		},
+		{
+			name:   "one input node with a complex element",
+			key:    "Merge",
+			aggKey: "MergeAgg",
+			InNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{
+						"e1": map[string]interface{}{
+							"desc": "a",
+							"TTL":  45,
+							"Payload": []interface{}{
+								map[string]interface{}{"Key": "foo"},
+								map[string]interface{}{"Value": "bar"},
+							},
+						},
+					},
+				}, graph.Time(time.Unix(0, 0)), 1),
+			},
+			OutNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{
+						"e1": map[string]interface{}{
+							"desc": "a",
+							"TTL":  45,
+							"Payload": []interface{}{
+								map[string]interface{}{"Key": "foo"},
+								map[string]interface{}{"Value": "bar"},
+							},
+						},
+					},
+					"MergeAgg": map[string][]interface{}{
+						"e1": {
+							map[string]interface{}{
+								"desc": "a",
+								"TTL":  45,
+								"Payload": []interface{}{
+									map[string]interface{}{"Key": "foo"},
+									map[string]interface{}{"Value": "bar"},
+								},
+							},
+						},
+					},
+				}, graph.Time(time.Unix(0, 0)), 1),
+			},
+		},
+		{
+			name:   "two different input nodes with key defined and one element each one",
+			key:    "Merge",
+			aggKey: "MergeAxx",
+			InNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}},
+				}, graph.Time(time.Unix(0, 0)), 1),
+				CreateNode("B", graph.Metadata{
+					"Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}},
+				}, graph.Time(time.Unix(0, 0)), 1),
+			},
+			OutNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}},
+					"MergeAxx": map[string][]interface{}{"e1": {
+						map[string]interface{}{"desc": "a"},
+					}},
+				}, graph.Time(time.Unix(0, 0)), 1),
+				CreateNode("B", graph.Metadata{
+					"Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}},
+					"MergeAxx": map[string][]interface{}{"e1": {
+						map[string]interface{}{"desc": "a"},
+					}},
+				}, graph.Time(time.Unix(0, 0)), 1),
+			},
+		},
+		{
+			name:   "one node, with a previous version, both without key defined",
+			key:    "Merge",
+			aggKey: "MergeAgg",
+			InNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{}, graph.Time(time.Unix(0, 0)), 1),
+				CreateNode("A", graph.Metadata{}, graph.Time(time.Unix(0, 0)), 2),
+			},
+			OutNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"MergeAgg": map[string][]interface{}{},
+				}, graph.Time(time.Unix(0, 0)), 2),
+			},
+		},
+		{
+			name:   "one node, with a previous version, both with key defined but empty",
+			key:    "Merge",
+			aggKey: "MergeAgg",
+			InNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{},
+				}, graph.Time(time.Unix(0, 0)), 1),
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{},
+				}, graph.Time(time.Unix(0, 0)), 2),
+			},
+			OutNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"Merge":    map[string]interface{}{},
+					"MergeAgg": map[string][]interface{}{},
+				}, graph.Time(time.Unix(0, 0)), 2),
+			},
+		},
+		{
+			name:   "one node, with a previous version, both with key defined, same event different content",
+			key:    "Merge",
+			aggKey: "MergeAgg",
+			InNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}},
+				}, graph.Time(time.Unix(0, 0)), 1),
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "b"}},
+				}, graph.Time(time.Unix(0, 0)), 2),
+			},
+			OutNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "b"}},
+					"MergeAgg": map[string][]interface{}{"e1": {
+						map[string]interface{}{"desc": "a"},
+						map[string]interface{}{"desc": "b"},
+					}},
+				}, graph.Time(time.Unix(0, 0)), 2),
+			},
+		},
+		{
+			name:   "one node, with a previous version, first one without event, second one with event",
+			key:    "Merge",
+			aggKey: "MergeAgg",
+			InNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{},
+				}, graph.Time(time.Unix(0, 0)), 1),
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "b"}},
+				}, graph.Time(time.Unix(0, 0)), 2),
+			},
+			OutNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "b"}},
+					"MergeAgg": map[string][]interface{}{"e1": {
+						map[string]interface{}{"desc": "b"},
+					}},
+				}, graph.Time(time.Unix(0, 0)), 2),
+			},
+		},
+		{
+			name:   "one node, with a previous version, first one with event, second one without event",
+			key:    "Merge",
+			aggKey: "MergeAgg",
+			InNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}},
+				}, graph.Time(time.Unix(0, 0)), 1),
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{},
+				}, graph.Time(time.Unix(0, 0)), 2),
+			},
+			OutNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{},
+					"MergeAgg": map[string][]interface{}{"e1": {
+						map[string]interface{}{"desc": "a"},
+					}},
+				}, graph.Time(time.Unix(0, 0)), 2),
+			},
+		},
+		{
+			name:   "one node, with two previous versions, first with, second without, third with",
+			key:    "Merge",
+			aggKey: "MergeAgg",
+			InNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}},
+				}, graph.Time(time.Unix(0, 0)), 1),
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{},
+				}, graph.Time(time.Unix(0, 0)), 2),
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "c"}},
+				}, graph.Time(time.Unix(0, 0)), 3),
+			},
+			OutNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "c"}},
+					"MergeAgg": map[string][]interface{}{"e1": {
+						map[string]interface{}{"desc": "a"},
+						map[string]interface{}{"desc": "c"},
+					}},
+				}, graph.Time(time.Unix(0, 0)), 3),
+			},
+		},
+		{
+			name:      "memory backend does not filter nodes by date, startTime and endTime are ignored",
+			key:       "Merge",
+			aggKey:    "MergeAgg",
+			startTime: time.Unix(100, 0),
+			endTime:   time.Unix(200, 0),
+			InNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}},
+				}, graph.Time(time.Unix(300, 0)), 1),
+			},
+			OutNodes: []*graph.Node{
+				CreateNode("A", graph.Metadata{
+					"Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}},
+					"MergeAgg": map[string][]interface{}{"e1": {
+						map[string]interface{}{"desc": "a"},
+					}},
+				}, graph.Time(time.Unix(300, 0)), 1),
+			},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			b := FakeMergeGraphBackend{
+				Nodes: test.InNodes,
+			}
+			g := graph.NewGraph("testhost", &b, "analyzer.testhost")
+
+			gt := traversal.NewGraphTraversal(g, false)
+			tvIn := traversal.NewGraphTraversalV(gt, test.InNodes)
+
+			s := 
MergeGremlinTraversalStep{
+				MergeKey:    test.key,
+				MergeAggKey: test.aggKey,
+				StartTime:   test.startTime,
+				EndTime:     test.endTime,
+			}
+			ts, err := s.InterfaceMerge(tvIn)
+			if err != nil {
+				t.Fatal(err) // ts is nil on error, stop before it is dereferenced below
+			}
+
+			tvOut, ok := ts.(*traversal.GraphTraversalV)
+			if !ok {
+				t.Fatalf("Invalid GraphTraversal type") // tvOut is nil here, GetNodes would panic
+			}
+
+			assert.ElementsMatch(t, test.OutNodes, tvOut.GetNodes())
+		})
+	}
+}
+
+func TestInterfaceMergeDoNotModifyOriginNodes(t *testing.T) {
+	n := CreateNode("A", graph.Metadata{
+		"Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}},
+	}, graph.Time(time.Unix(0, 0)), 1)
+
+	nCopy := CreateNode("A", graph.Metadata{
+		"Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}},
+	}, graph.Time(time.Unix(0, 0)), 1)
+
+	b := FakeMergeGraphBackend{
+		Nodes: []*graph.Node{n},
+	}
+	g := graph.NewGraph("testhost", &b, "analyzer.testhost")
+
+	gt := traversal.NewGraphTraversal(g, false)
+	tvIn := traversal.NewGraphTraversalV(gt, []*graph.Node{n})
+
+	s := MergeGremlinTraversalStep{
+		MergeKey:    "Merge",
+		MergeAggKey: "AggMerge",
+	}
+	_, err := s.InterfaceMerge(tvIn)
+	assert.Nil(t, err)
+
+	// Node stored in the graph should still be equal to the untouched copy (expected value goes first)
+	assert.Equal(t, nCopy, b.GetNode("A", graph.Context{})[0])
+}
+
+func TestEventsParseStep(t *testing.T) {
+	tests := []struct {
+		name                  string
+		token                 traversal.Token
+		traversalCtx          traversal.GremlinTraversalContext
+		expectedTraversalStep traversal.GremlinTraversalStep
+		expectedError         string
+	}{
+		{
+			name:  "non merge token",
+			token: traversal.COUNT,
+		},
+		{
+			name:          "nil traversalCtx",
+			token:         traversalMergeToken,
+			expectedError: "Merge parameter must have two or four parameters (OriginKey, DestinationKey, StartTime, EndTime)",
+		},
+		{
+			name:  "only one param",
+			token: traversalMergeToken,
+			traversalCtx: traversal.GremlinTraversalContext{
+				Params: []interface{}{"foo"},
+			},
+			expectedError: "Merge parameter must have two or four parameters (OriginKey, DestinationKey, StartTime, EndTime)",
+		},
+		{
+			name:  "two param not string",
+			token: traversalMergeToken,
+			traversalCtx: traversal.GremlinTraversalContext{
+				Params: []interface{}{1, 2},
+			},
+			expectedError: "Merge first parameter have to be a string",
+		},
+		{
+			name:  "two string params",
+			token: traversalMergeToken,
+			traversalCtx: traversal.GremlinTraversalContext{
+				Params: []interface{}{"key", "aggKey"},
+			},
+			expectedTraversalStep: &MergeGremlinTraversalStep{
+				GremlinTraversalContext: traversal.GremlinTraversalContext{
+					Params: []interface{}{"key", "aggKey"},
+				},
+				MergeKey:    "key",
+				MergeAggKey: "aggKey",
+			},
+		},
+		{
+			name:  "four valid params",
+			token: traversalMergeToken,
+			traversalCtx: traversal.GremlinTraversalContext{
+				Params: []interface{}{"key", "aggKey", int64(1627987976), int64(1627987977)},
+			},
+			expectedTraversalStep: &MergeGremlinTraversalStep{
+				GremlinTraversalContext: traversal.GremlinTraversalContext{
+					Params: []interface{}{"key", "aggKey", int64(1627987976), int64(1627987977)},
+				},
+				MergeKey:    "key",
+				MergeAggKey: "aggKey",
+				StartTime:   time.Unix(1627987976, 0),
+				EndTime:     time.Unix(1627987977, 0),
+			},
+		},
+		{
+			name:  "invalid start date",
+			token: traversalMergeToken,
+			traversalCtx: traversal.GremlinTraversalContext{
+				Params: []interface{}{"foo", "bar", "123456789", "123456789"},
+			},
+			expectedError: "Merge third parameter have to be a int (unix epoch time)",
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			e := MergeTraversalExtension{MergeToken: traversalMergeToken}
+
+			traversalStep, err := e.ParseStep(test.token, test.traversalCtx)
+			if test.expectedError != "" {
+				assert.EqualErrorf(t, err, test.expectedError, "error")
+			} else {
+				assert.Nil(t, err, "nil error")
+			}
+
+			assert.Equalf(t, test.expectedTraversalStep, traversalStep, "step")
+		})
+	}
+}
+
+// CreateNode creates a node with the given id, metadata and creation time, forcing its revision number
+func CreateNode(id string, m graph.Metadata, t graph.Time, revision int64) *graph.Node {
+	n := graph.CreateNode(graph.Identifier(id), m, t, "host", "orig")
+	n.Revision = revision
+	return n
+}
diff --git a/gremlin/traversal/neighbors.go b/gremlin/traversal/neighbors.go
index 305048d670..ec3eee009c 100644
--- a/gremlin/traversal/neighbors.go
+++ b/gremlin/traversal/neighbors.go
@@ -179,6 +179,10 @@ func (d *NeighborsGremlinTraversalStep) Exec(last traversal.GraphTraversalStep)
 
 // Reduce Neighbors step
 func (d *NeighborsGremlinTraversalStep) Reduce(next traversal.GremlinTraversalStep) (traversal.GremlinTraversalStep, error) {
+	// The Merge step only uses the ids of the nodes it receives, so ask for ids only and save some queries.
+	if _, ok := next.(*MergeGremlinTraversalStep); ok {
+		d.nextStepOnlyIDs = true
+	}
 	return next, nil
 }
 
diff --git a/gremlin/traversal/token.go b/gremlin/traversal/token.go
index bce0a9c46d..3995efd185 100644
--- a/gremlin/traversal/token.go
+++ b/gremlin/traversal/token.go
@@ -35,4 +35,5 @@ const (
 	traversalMoreThanToken   traversal.Token = 1013
 	traversalAscendantsToken traversal.Token = 1014
 	traversalNeighborsToken  traversal.Token = 1015
+	traversalMergeToken      traversal.Token = 1016
 )
diff --git a/validator/validator.go b/validator/validator.go
index 8d00717d2d..5d041794d3 100644
--- a/validator/validator.go
+++ b/validator/validator.go
@@ -180,6 +180,7 @@ func isGremlinExpr(v interface{}, param string) error {
 	tr.AddTraversalExtension(ge.NewNeighborsTraversalExtension())
 	tr.AddTraversalExtension(ge.NewNextHopTraversalExtension())
 	tr.AddTraversalExtension(ge.NewGroupTraversalExtension())
+	tr.AddTraversalExtension(ge.NewMergeTraversalExtension())
 
 	if _, err := tr.Parse(strings.NewReader(query)); err != nil {
 		return GremlinNotValid(err)