From b3bbeb8020c7a7cc35c5b818123b61780765fed5 Mon Sep 17 00:00:00 2001 From: Adrian Lopez Date: Mon, 9 Aug 2021 12:26:09 +0200 Subject: [PATCH] Merge step Aggregates elements from different revisions of the nodes into a new metadata key. Given a metadata element, that should be a map[string]interface{}, aggregate different values into another metadata key with format map[string][]interface{} Eg.: Metadata.data V1: {"a":{x}, "b":{y}} Metadata.data V2: {"a":{z}, "b":{y}} Metadata.agg: {"a":[{x},{z}], "b":[{y}]} It's purpose its to show data from past revisions of the same node. Example: G.At(1479899809,3600).V().Merge('data','agg') It could be also called defining the time slice in the parameters (since, from): G.V().Merge('A','B',1500000000,1500099999) This step return a modified copy of the last node, with all the aggregated data, not the node stored in the graph. This is to avoid modiying the node stored in the graph. This PR also modifies the Reduce method of the Neighbors step. Merge step only needs the node IDs, so Neighbors step could skip retrieving the full content of nodes. --- analyzer/server.go | 1 + go.mod | 2 +- graffiti/go.mod | 2 +- gremlin/traversal/merge.go | 258 ++++++++++++++ gremlin/traversal/merge_test.go | 610 ++++++++++++++++++++++++++++++++ gremlin/traversal/neighbors.go | 4 + gremlin/traversal/token.go | 1 + validator/validator.go | 1 + 8 files changed, 877 insertions(+), 2 deletions(-) create mode 100644 gremlin/traversal/merge.go create mode 100644 gremlin/traversal/merge_test.go diff --git a/analyzer/server.go b/analyzer/server.go index 5f1019e862..9fac92a45c 100644 --- a/analyzer/server.go +++ b/analyzer/server.go @@ -336,6 +336,7 @@ func NewServerFromConfig() (*Server, error) { tr.AddTraversalExtension(ge.NewNeighborsTraversalExtension()) tr.AddTraversalExtension(ge.NewNextHopTraversalExtension()) tr.AddTraversalExtension(ge.NewGroupTraversalExtension()) + tr.AddTraversalExtension(ge.NewMergeTraversalExtension()) // new flow subscriber endpoints flowSubscriberWSServer := ws.NewStructServer(config.NewWSServer(hub.HTTPServer(), "/ws/subscriber/flow", apiAuthBackend)) diff --git a/go.mod b/go.mod index 1a6b12688e..773d68e8d0 100644 --- a/go.mod +++ b/go.mod @@ -77,7 +77,7 @@ require ( github.com/spf13/cobra v1.1.1 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.7.0 - github.com/stretchr/testify v1.6.1 // indirect + github.com/stretchr/testify v1.7.0 github.com/t-yuki/gocover-cobertura v0.0.0-20180217150009-aaee18c8195c github.com/tebeka/go2xunit v1.4.10 github.com/tebeka/selenium v0.0.0-20170314201507-657e45ec600f diff --git a/graffiti/go.mod b/graffiti/go.mod index 0e8daf56ef..e24a38c5ab 100644 --- a/graffiti/go.mod +++ b/graffiti/go.mod @@ -40,7 +40,7 @@ require ( github.com/skydive-project/go-debouncer v1.0.0 github.com/spf13/cast v1.3.1 github.com/spf13/cobra v1.1.1 - github.com/stretchr/testify v1.5.1 + github.com/stretchr/testify v1.7.0 github.com/tchap/zapext v1.0.0 github.com/xeipuuv/gojsonschema v1.2.0 go.uber.org/zap v1.16.0 diff --git a/gremlin/traversal/merge.go b/gremlin/traversal/merge.go new file mode 100644 index 0000000000..9b21864d3c --- /dev/null +++ b/gremlin/traversal/merge.go @@ -0,0 +1,258 @@ +package traversal + +import ( + "fmt" + "reflect" + "time" + + "github.com/pkg/errors" + "github.com/skydive-project/skydive/graffiti/graph" + "github.com/skydive-project/skydive/graffiti/graph/traversal" + "github.com/skydive-project/skydive/graffiti/logging" +) + +// MergeTraversalExtension describes a new extension to enhance the topology +type MergeTraversalExtension struct { + MergeToken traversal.Token +} + +// MergeGremlinTraversalStep step aggregates elements from different revisions of the nodes into a new metadata key. +// Nodes returned by this step are copies of the nodes in the graph, not the actual nodes. +// The reason is because this step is not meant to modify nodes in the graph, just for the output. +// This step should be used with a presistant backend, so it can access previous revisions of the nodes. +// To use this step we should select a metadata key (first parameter), where the elements will be read from. +// Inside this Metadata.Key elements should have the format map[interface{}]interface{} (could be a type based on that). +// The second parameter is the metadata key where all the elements will be aggregated. +// The aggregation will with the format: map[string][]interface{}. +// All elements with the same key in the map will be joined in an slice. +// To use this step we can use a graph with a time period context, eg: G.At(1479899809,3600).V().Merge('A','B'). +// Or we can define the time period in the step: G.V().Merge('A','B',1500000000,1500099999). +// Note that in this case we define the start and end time, while in "At" is start time and duration. +// In both cases, Merge step will use the nodes given by the previous step. +type MergeGremlinTraversalStep struct { + traversal.GremlinTraversalContext + MergeKey string + MergeAggKey string + StartTime time.Time + EndTime time.Time +} + +// NewMergeTraversalExtension returns a new graph traversal extension +func NewMergeTraversalExtension() *MergeTraversalExtension { + return &MergeTraversalExtension{ + MergeToken: traversalMergeToken, + } +} + +// ScanIdent recognise the word associated with this step (in uppercase) and return a token +// which represents it. Return true if it have found a match +func (e *MergeTraversalExtension) ScanIdent(s string) (traversal.Token, bool) { + switch s { + case "MERGE": + return e.MergeToken, true + } + return traversal.IDENT, false +} + +// ParseStep generate a step for a given token, having in 'p' context and params +func (e *MergeTraversalExtension) ParseStep(t traversal.Token, p traversal.GremlinTraversalContext) (traversal.GremlinTraversalStep, error) { + switch t { + case e.MergeToken: + default: + return nil, nil + } + + var mergeKey, mergeAggKey string + var startTime, endTime time.Time + var ok bool + + switch len(p.Params) { + case 2: + mergeKey, ok = p.Params[0].(string) + if !ok { + return nil, errors.New("Merge first parameter have to be a string") + } + mergeAggKey, ok = p.Params[1].(string) + if !ok { + return nil, errors.New("Merge second parameter have to be a string") + } + case 4: + mergeKey, ok = p.Params[0].(string) + if !ok { + return nil, errors.New("Merge first parameter have to be a string") + } + mergeAggKey, ok = p.Params[1].(string) + if !ok { + return nil, errors.New("Merge second parameter have to be a string") + } + startTimeUnixEpoch, ok := p.Params[2].(int64) + if !ok { + return nil, errors.New("Merge third parameter have to be a int (unix epoch time)") + } + startTime = time.Unix(startTimeUnixEpoch, 0) + endTimeUnixEpoch, ok := p.Params[3].(int64) + if !ok { + return nil, errors.New("Merge fourth parameter have to be a int (unix epoch time)") + } + endTime = time.Unix(endTimeUnixEpoch, 0) + default: + return nil, errors.New("Merge parameter must have two or four parameters (OriginKey, DestinationKey, StartTime, EndTime)") + } + + return &MergeGremlinTraversalStep{ + GremlinTraversalContext: p, + MergeKey: mergeKey, + MergeAggKey: mergeAggKey, + StartTime: startTime, + EndTime: endTime, + }, nil +} + +// Exec executes the merge step +func (s *MergeGremlinTraversalStep) Exec(last traversal.GraphTraversalStep) (traversal.GraphTraversalStep, error) { + switch tv := last.(type) { + case *traversal.GraphTraversalV: + return s.InterfaceMerge(tv) + + } + return nil, traversal.ErrExecutionError +} + +// Reduce merge step +func (s *MergeGremlinTraversalStep) Reduce(next traversal.GremlinTraversalStep) (traversal.GremlinTraversalStep, error) { + return next, nil +} + +// Context merge step +func (s *MergeGremlinTraversalStep) Context() *traversal.GremlinTraversalContext { + return &s.GremlinTraversalContext +} + +// InterfaceMerge for each node id, group all the elements stored in Metadata.key from the +// input nodes and put them into the newest node for each id into Metadata.aggKey. +// Merge are groupped based on its key. See mergedMetadata for an example. +// All output nodes will have Metadata.aggKey defined (empty or not). +func (s *MergeGremlinTraversalStep) InterfaceMerge(tv *traversal.GraphTraversalV) (traversal.GraphTraversalStep, error) { + // If user has defined start/end time in the parameters, use that values instead of the ones comming with the graph + if !s.StartTime.IsZero() && !s.EndTime.IsZero() { + timeSlice := graph.NewTimeSlice( + graph.Time(s.StartTime).UnixMilli(), + graph.Time(s.EndTime).UnixMilli(), + ) + userTimeSliceCtx := graph.Context{ + TimeSlice: timeSlice, + TimePoint: true, + } + + newGraph, err := tv.GraphTraversal.Graph.CloneWithContext(userTimeSliceCtx) + if err != nil { + return nil, err + } + tv.GraphTraversal.Graph = newGraph + } + + tv.GraphTraversal.RLock() + defer tv.GraphTraversal.RUnlock() + + // uniqNodes store the latest node for each node identifier + uniqNodes := map[graph.Identifier]*graph.Node{} + + // elements accumulate the elements for each node id + elements := map[graph.Identifier]map[string][]interface{}{} + + // Get the list of node ids + nodesIDs := make([]graph.Identifier, 0, len(tv.GetNodes())) + for _, node := range tv.GetNodes() { + nodesIDs = append(nodesIDs, node.ID) + } + + // Get all revision for the list of node ids + revisionNodes := tv.GraphTraversal.Graph.GetNodesFromIDs(nodesIDs) + + // Store only the most recent nodes + for _, rNode := range revisionNodes { + storedNode, ok := uniqNodes[rNode.ID] + if !ok { + uniqNodes[rNode.ID] = rNode + } else { + if storedNode.Revision < rNode.Revision { + uniqNodes[rNode.ID] = rNode + } + } + + // Store elements from all revisions into the "elements" variable + elements[rNode.ID] = mergeMetadata(rNode, s.MergeKey, elements[rNode.ID]) + } + + // Move the nodes from the uniqNodes map to an slice required by TraversalV + // Return a copy of the nodes, not the actual graph nodes, because we don't want + // to modify nodes with this step, just append some extra info + nodes := []*graph.Node{} + for id, n := range uniqNodes { + nCopy := n.Copy() + + e, ok := elements[id] + if ok { + // Set the stored node with the merge of previous and current node + metadataSet := nCopy.Metadata.SetField(s.MergeAggKey, e) + if !metadataSet { + return nil, fmt.Errorf("unable to set elements metadata for copied node %v", id) + } + } + + nodes = append(nodes, nCopy) + } + + return traversal.NewGraphTraversalV(tv.GraphTraversal, nodes), nil +} + +// mergeMetadata return the merge of node.Key elements with the ones already stored in nodeMerge +// Eg.: +// node: Metadata.key: {"a":{x}, "b":{y}} +// nodeMerge: {"a":[{z}]} +// return: Metadata.key: {"a":[{x},{z}], "b":[{y}]} +// +// Ignore if Metadata.key has an invalid format (not a map). +// Reflect is used to be able to access map's defined in different types. +// Element aggregation data type should be map[string]interface{} to be able to be encoded with JSON +func mergeMetadata(node *graph.Node, key string, nodeMerge map[string][]interface{}) map[string][]interface{} { + if nodeMerge == nil { + nodeMerge = map[string][]interface{}{} + } + + n1MergeIface, n1Err := node.GetField(key) + + if n1Err == nil { + // Ignore Metadata.key values which are not a map + n1MergeValue := reflect.ValueOf(n1MergeIface) + + // If the metadata value is a pointer, resolve it + if n1MergeValue.Kind() == reflect.Ptr { + n1MergeValue = n1MergeValue.Elem() + } + + // Merge step only accepts a map as data origin + if n1MergeValue.Kind() != reflect.Map { + logging.GetLogger().Errorf("Invalid type for elements, expecting a map, but it is %v", n1MergeValue.Kind()) + return nodeMerge + } + + iter := n1MergeValue.MapRange() + NODE_MERGE: + for iter.Next() { + k := fmt.Sprintf("%v", iter.Key().Interface()) + v := iter.Value().Interface() + + // Do not append if the same element already exists + for _, storedElement := range nodeMerge[k] { + if reflect.DeepEqual(storedElement, v) { + continue NODE_MERGE + } + } + + nodeMerge[k] = append(nodeMerge[k], v) + } + } + + return nodeMerge +} diff --git a/gremlin/traversal/merge_test.go b/gremlin/traversal/merge_test.go new file mode 100644 index 0000000000..cc8b25d2c8 --- /dev/null +++ b/gremlin/traversal/merge_test.go @@ -0,0 +1,610 @@ +package traversal + +import ( + "testing" + "time" + + "github.com/skydive-project/skydive/graffiti/graph" + "github.com/skydive-project/skydive/graffiti/graph/traversal" + "github.com/stretchr/testify/assert" +) + +// FakeMergeGraphBackend simulate a backend with history that could store different revisions of nodes +type FakeMergeGraphBackend struct { + graph.MemoryBackend + Nodes []*graph.Node +} + +func (b *FakeMergeGraphBackend) IsHistorySupported() bool { + return true +} + +func (b *FakeMergeGraphBackend) GetNode(i graph.Identifier, at graph.Context) []*graph.Node { + nodes := []*graph.Node{} + for _, n := range b.Nodes { + if n.ID == i { + nodes = append(nodes, n) + } + } + return nodes +} + +func (b *FakeMergeGraphBackend) GetNodesFromIDs(identifierList []graph.Identifier, at graph.Context) []*graph.Node { + nodes := []*graph.Node{} + for _, n := range b.Nodes { + for _, i := range identifierList { + if n.ID == i { + nodes = append(nodes, n) + } + } + } + return nodes +} + +func TestMergeMetadataNilNodeMerge(t *testing.T) { + key := "Merge" + + metadataNode1 := graph.Metadata{key: map[string]interface{}{ + "abc": map[interface{}]string{"descr": "foo"}, + }} + node := CreateNode("nodeA", metadataNode1, graph.TimeUTC(), 1) + + nodeMergeAgg := mergeMetadata(node, key, nil) + + expected := map[string][]interface{}{ + "abc": { + map[interface{}]string{"descr": "foo"}, + }, + } + + assert.Equal(t, expected, nodeMergeAgg) +} + +func TestMergeMetadataPointerValue(t *testing.T) { + key := "Merge" + + value := map[string]interface{}{ + "abc": map[interface{}]string{"descr": "foo"}, + } + + metadataNode1 := graph.Metadata{key: &value} + node := CreateNode("nodeA", metadataNode1, graph.TimeUTC(), 1) + + nodeMergeAgg := mergeMetadata(node, key, nil) + + expected := map[string][]interface{}{ + "abc": { + map[interface{}]string{"descr": "foo"}, + }, + } + + assert.Equal(t, expected, nodeMergeAgg) +} + +func TestMergeMetadata(t *testing.T) { + tests := []struct { + name string + nodesMerge []interface{} + expected map[string][]interface{} + }{ + { + name: "no nodes", + expected: map[string][]interface{}{}, + }, + { + name: "one node", + nodesMerge: []interface{}{ + map[string]interface{}{ + "abc": map[string]string{"descr": "foo"}, + }}, + expected: map[string][]interface{}{ + "abc": { + map[string]string{"descr": "foo"}, + }, + }, + }, + { + name: "two nodes, different keys", + nodesMerge: []interface{}{ + map[string]interface{}{ + "abc": map[string]string{"descr": "foo"}, + }, + map[string]interface{}{ + "xyz": map[string]string{"descr": "bar"}, + }}, + expected: map[string][]interface{}{ + "abc": { + map[string]string{"descr": "foo"}, + }, + "xyz": { + map[string]string{"descr": "bar"}, + }, + }, + }, + { + name: "two nodes, same keys", + nodesMerge: []interface{}{ + map[string]interface{}{ + "abc": map[string]string{"descr": "foo"}, + }, + map[string]interface{}{ + "abc": map[string]string{"descr": "bar"}, + }}, + expected: map[string][]interface{}{ + "abc": { + map[string]string{"descr": "foo"}, + map[string]string{"descr": "bar"}, + }, + }, + }, + { + name: "two nodes, repeating one event, should be removed", + nodesMerge: []interface{}{ + map[string]interface{}{ + "abc": map[string]string{"descr": "foo"}, + }, + map[string]interface{}{ + "abc": map[string]string{"descr": "foo"}, + "xxx": map[string]string{"descr": "bar"}, + }}, + expected: map[string][]interface{}{ + "abc": { + map[string]string{"descr": "foo"}, + }, + "xxx": { + map[string]string{"descr": "bar"}, + }, + }, + }, + } + + key := "Merge" + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + nodeMergeAgg := map[string][]interface{}{} + + for _, nodeMerge := range test.nodesMerge { + metadataNode1 := graph.Metadata{key: nodeMerge} + node := CreateNode("nodeA", metadataNode1, graph.TimeUTC(), 1) + + nodeMergeAgg = mergeMetadata(node, key, nodeMergeAgg) + } + + assert.Equal(t, test.expected, nodeMergeAgg) + }) + } +} + +func TestInterfaceMerge(t *testing.T) { + tests := []struct { + name string + InNodes []*graph.Node + key string + aggKey string + startTime time.Time + endTime time.Time + // Expected nodes + OutNodes []*graph.Node + }{ + { + name: "no input nodes", + }, + { + // Node passes the step without being modified + name: "one input node without key defined", + key: "Merge", + aggKey: "MergeAgg", + InNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{}, graph.Time(time.Unix(0, 0)), 1), + }, + OutNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "MergeAgg": map[string][]interface{}{}, + }, graph.Time(time.Unix(0, 0)), 1), + }, + }, + { + name: "one input node with key defined but empty", + key: "Merge", + aggKey: "MergeAgg", + InNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{}, + }, graph.Time(time.Unix(0, 0)), 1), + }, + OutNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{}, + "MergeAgg": map[string][]interface{}{}, + }, graph.Time(time.Unix(0, 0)), 1), + }, + }, + { + name: "one input node with key defined and one element", + key: "Merge", + aggKey: "MergeAgg", + InNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}}, + }, graph.Time(time.Unix(0, 0)), 1), + }, + OutNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}}, + "MergeAgg": map[string][]interface{}{"e1": { + map[string]interface{}{"desc": "a"}, + }}, + }, graph.Time(time.Unix(0, 0)), 1), + }, + }, + { + name: "one input node with a complex element", + key: "Merge", + aggKey: "MergeAgg", + InNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{ + "e1": map[string]interface{}{ + "desc": "a", + "TTL": 45, + "Payload": []interface{}{ + map[string]interface{}{"Key": "foo"}, + map[string]interface{}{"Value": "bar"}, + }, + }, + }, + }, graph.Time(time.Unix(0, 0)), 1), + }, + OutNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{ + "e1": map[string]interface{}{ + "desc": "a", + "TTL": 45, + "Payload": []interface{}{ + map[string]interface{}{"Key": "foo"}, + map[string]interface{}{"Value": "bar"}, + }, + }, + }, + "MergeAgg": map[string][]interface{}{ + "e1": { + map[string]interface{}{ + "desc": "a", + "TTL": 45, + "Payload": []interface{}{ + map[string]interface{}{"Key": "foo"}, + map[string]interface{}{"Value": "bar"}, + }, + }, + }, + }, + }, graph.Time(time.Unix(0, 0)), 1), + }, + }, + { + name: "two different input nodes with key defined and one element each one", + key: "Merge", + aggKey: "MergeAxx", + InNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}}, + }, graph.Time(time.Unix(0, 0)), 1), + CreateNode("B", graph.Metadata{ + "Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}}, + }, graph.Time(time.Unix(0, 0)), 1), + }, + OutNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}}, + "MergeAxx": map[string][]interface{}{"e1": { + map[string]interface{}{"desc": "a"}, + }}, + }, graph.Time(time.Unix(0, 0)), 1), + CreateNode("B", graph.Metadata{ + "Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}}, + "MergeAxx": map[string][]interface{}{"e1": { + map[string]interface{}{"desc": "a"}, + }}, + }, graph.Time(time.Unix(0, 0)), 1), + }, + }, + { + name: "one node, with a previous version, both without key defined", + key: "Merge", + aggKey: "MergeAgg", + InNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{}, graph.Time(time.Unix(0, 0)), 1), + CreateNode("A", graph.Metadata{}, graph.Time(time.Unix(0, 0)), 2), + }, + OutNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "MergeAgg": map[string][]interface{}{}, + }, graph.Time(time.Unix(0, 0)), 2), + }, + }, + { + name: "one node, with a previous version, both with key defined but empty", + key: "Merge", + aggKey: "MergeAgg", + InNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{}, + }, graph.Time(time.Unix(0, 0)), 1), + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{}, + }, graph.Time(time.Unix(0, 0)), 2), + }, + OutNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{}, + "MergeAgg": map[string][]interface{}{}, + }, graph.Time(time.Unix(0, 0)), 2), + }, + }, + { + name: "one node, with a previous version, both with key defined, same event different content", + key: "Merge", + aggKey: "MergeAgg", + InNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}}, + }, graph.Time(time.Unix(0, 0)), 1), + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "b"}}, + }, graph.Time(time.Unix(0, 0)), 2), + }, + OutNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "b"}}, + "MergeAgg": map[string][]interface{}{"e1": { + map[string]interface{}{"desc": "a"}, + map[string]interface{}{"desc": "b"}, + }}, + }, graph.Time(time.Unix(0, 0)), 2), + }, + }, + { + name: "one node, with a previous version, first one without event, second one with event", + key: "Merge", + aggKey: "MergeAgg", + InNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{}, + }, graph.Time(time.Unix(0, 0)), 1), + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "b"}}, + }, graph.Time(time.Unix(0, 0)), 2), + }, + OutNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "b"}}, + "MergeAgg": map[string][]interface{}{"e1": { + map[string]interface{}{"desc": "b"}, + }}, + }, graph.Time(time.Unix(0, 0)), 2), + }, + }, + { + name: "one node, with a previous version, first one with event, second one without event", + key: "Merge", + aggKey: "MergeAgg", + InNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}}, + }, graph.Time(time.Unix(0, 0)), 1), + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{}, + }, graph.Time(time.Unix(0, 0)), 2), + }, + OutNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{}, + "MergeAgg": map[string][]interface{}{"e1": { + map[string]interface{}{"desc": "a"}, + }}, + }, graph.Time(time.Unix(0, 0)), 2), + }, + }, + { + name: "one node, with two previous versions, first with, second without, third with", + key: "Merge", + aggKey: "MergeAgg", + InNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}}, + }, graph.Time(time.Unix(0, 0)), 1), + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{}, + }, graph.Time(time.Unix(0, 0)), 2), + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "c"}}, + }, graph.Time(time.Unix(0, 0)), 3), + }, + OutNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "c"}}, + "MergeAgg": map[string][]interface{}{"e1": { + map[string]interface{}{"desc": "a"}, + map[string]interface{}{"desc": "c"}, + }}, + }, graph.Time(time.Unix(0, 0)), 3), + }, + }, + { + name: "memory backend does not filter nodes by date, startTime and endTime are ignored", + key: "Merge", + aggKey: "MergeAgg", + startTime: time.Unix(100, 0), + endTime: time.Unix(200, 0), + InNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}}, + }, graph.Time(time.Unix(300, 0)), 1), + }, + OutNodes: []*graph.Node{ + CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}}, + "MergeAgg": map[string][]interface{}{"e1": { + map[string]interface{}{"desc": "a"}, + }}, + }, graph.Time(time.Unix(300, 0)), 1), + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + b := FakeMergeGraphBackend{ + Nodes: test.InNodes, + } + g := graph.NewGraph("testhost", &b, "analyzer.testhost") + + gt := traversal.NewGraphTraversal(g, false) + tvIn := traversal.NewGraphTraversalV(gt, test.InNodes) + + s := MergeGremlinTraversalStep{ + MergeKey: test.key, + MergeAggKey: test.aggKey, + StartTime: test.startTime, + EndTime: test.endTime, + } + ts, err := s.InterfaceMerge(tvIn) + if err != nil { + t.Error(err.Error()) + } + + tvOut, ok := ts.(*traversal.GraphTraversalV) + if !ok { + t.Errorf("Invalid GraphTraversal type") + } + + assert.ElementsMatch(t, test.OutNodes, tvOut.GetNodes()) + }) + } +} + +func TestInterfaceMergeDoNotModifyOriginNodes(t *testing.T) { + n := CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}}, + }, graph.Time(time.Unix(0, 0)), 1) + + nCopy := CreateNode("A", graph.Metadata{ + "Merge": map[string]interface{}{"e1": map[string]interface{}{"desc": "a"}}, + }, graph.Time(time.Unix(0, 0)), 1) + + b := FakeMergeGraphBackend{ + Nodes: []*graph.Node{n}, + } + g := graph.NewGraph("testhost", &b, "analyzer.testhost") + + gt := traversal.NewGraphTraversal(g, false) + tvIn := traversal.NewGraphTraversalV(gt, []*graph.Node{n}) + + s := MergeGremlinTraversalStep{ + MergeKey: "Merge", + MergeAggKey: "AggMerge", + } + _, err := s.InterfaceMerge(tvIn) + assert.Nil(t, err) + + // Node stored in the graph should not be modified + assert.Equal(t, b.GetNode("A", graph.Context{})[0], nCopy) +} + +func TestEventsParseStep(t *testing.T) { + tests := []struct { + name string + token traversal.Token + traversalCtx traversal.GremlinTraversalContext + expectedTraversalStep traversal.GremlinTraversalStep + expectedError string + }{ + { + name: "non merge token", + token: traversal.COUNT, + }, + { + name: "nil traversalCtx", + token: traversalMergeToken, + expectedError: "Merge parameter must have two or four parameters (OriginKey, DestinationKey, StartTime, EndTime)", + }, + { + name: "only one param", + token: traversalMergeToken, + traversalCtx: traversal.GremlinTraversalContext{ + Params: []interface{}{"foo"}, + }, + expectedError: "Merge parameter must have two or four parameters (OriginKey, DestinationKey, StartTime, EndTime)", + }, + { + name: "two param not string", + token: traversalMergeToken, + traversalCtx: traversal.GremlinTraversalContext{ + Params: []interface{}{1, 2}, + }, + expectedError: "Merge first parameter have to be a string", + }, + { + name: "two string params", + token: traversalMergeToken, + traversalCtx: traversal.GremlinTraversalContext{ + Params: []interface{}{"key", "aggKey"}, + }, + expectedTraversalStep: &MergeGremlinTraversalStep{ + GremlinTraversalContext: traversal.GremlinTraversalContext{ + Params: []interface{}{"key", "aggKey"}, + }, + MergeKey: "key", + MergeAggKey: "aggKey", + }, + }, + { + name: "four valid params", + token: traversalMergeToken, + traversalCtx: traversal.GremlinTraversalContext{ + Params: []interface{}{"key", "aggKey", int64(1627987976), int64(1627987977)}, + }, + expectedTraversalStep: &MergeGremlinTraversalStep{ + GremlinTraversalContext: traversal.GremlinTraversalContext{ + Params: []interface{}{"key", "aggKey", int64(1627987976), int64(1627987977)}, + }, + MergeKey: "key", + MergeAggKey: "aggKey", + StartTime: time.Unix(1627987976, 0), + EndTime: time.Unix(1627987977, 0), + }, + }, + { + name: "invalid start date", + token: traversalMergeToken, + traversalCtx: traversal.GremlinTraversalContext{ + Params: []interface{}{"foo", "bar", "123456789", "123456789"}, + }, + expectedError: "Merge third parameter have to be a int (unix epoch time)", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + e := MergeTraversalExtension{MergeToken: traversalMergeToken} + + traversalStep, err := e.ParseStep(test.token, test.traversalCtx) + if test.expectedError != "" { + assert.EqualErrorf(t, err, test.expectedError, "error") + } else { + assert.Nil(t, err, "nil error") + } + + assert.Equalf(t, test.expectedTraversalStep, traversalStep, "step") + }) + } +} + +// CreateNode func to create nodes with a specific node revision +func CreateNode(id string, m graph.Metadata, t graph.Time, revision int64) *graph.Node { + n := graph.CreateNode(graph.Identifier(id), m, t, "host", "orig") + n.Revision = revision + return n +} diff --git a/gremlin/traversal/neighbors.go b/gremlin/traversal/neighbors.go index 305048d670..ec3eee009c 100644 --- a/gremlin/traversal/neighbors.go +++ b/gremlin/traversal/neighbors.go @@ -179,6 +179,10 @@ func (d *NeighborsGremlinTraversalStep) Exec(last traversal.GraphTraversalStep) // Reduce Neighbors step func (d *NeighborsGremlinTraversalStep) Reduce(next traversal.GremlinTraversalStep) (traversal.GremlinTraversalStep, error) { + // Merge step only needs the ids of nodes. Saving some queries. + if _, ok := next.(*MergeGremlinTraversalStep); ok { + d.nextStepOnlyIDs = true + } return next, nil } diff --git a/gremlin/traversal/token.go b/gremlin/traversal/token.go index bce0a9c46d..3995efd185 100644 --- a/gremlin/traversal/token.go +++ b/gremlin/traversal/token.go @@ -35,4 +35,5 @@ const ( traversalMoreThanToken traversal.Token = 1013 traversalAscendantsToken traversal.Token = 1014 traversalNeighborsToken traversal.Token = 1015 + traversalMergeToken traversal.Token = 1016 ) diff --git a/validator/validator.go b/validator/validator.go index 8d00717d2d..5d041794d3 100644 --- a/validator/validator.go +++ b/validator/validator.go @@ -180,6 +180,7 @@ func isGremlinExpr(v interface{}, param string) error { tr.AddTraversalExtension(ge.NewNeighborsTraversalExtension()) tr.AddTraversalExtension(ge.NewNextHopTraversalExtension()) tr.AddTraversalExtension(ge.NewGroupTraversalExtension()) + tr.AddTraversalExtension(ge.NewMergeTraversalExtension()) if _, err := tr.Parse(strings.NewReader(query)); err != nil { return GremlinNotValid(err)