Skip to content

Commit eee82a9

Browse files
committed
Merge step
Aggregates elements from different revisions of the nodes into a new metadata key. Given a metadata element, that should be a map[string]interface{}, aggregate different values into another metadata key with format map[string][]interface{} Eg.: Metadata.data V1: {"a":{x}, "b":{y}} Metadata.data V2: {"a":{z}, "b":{y}} Metadata.agg: {"a":[{x},{z}], "b":[{y}]} It's purpose its to show data from past revisions of the same node. Example: G.At(1479899809,3600).V().Merge('data','agg') It could be also called defining the time slice in the parameters (since, from): G.V().Merge('A','B',1500000000,1500099999) This step return a modified copy of the last node, with all the aggregated data, not the node stored in the graph. This is to avoid modiying the node stored in the graph. This PR also modifies the Reduce method of the Neighbors step. Merge step only needs the node IDs, so Neighbors step could skip retrieving the full content of nodes.
1 parent 446f6a1 commit eee82a9

File tree

7 files changed

+876
-1
lines changed

7 files changed

+876
-1
lines changed

analyzer/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ func NewServerFromConfig() (*Server, error) {
336336
tr.AddTraversalExtension(ge.NewNeighborsTraversalExtension())
337337
tr.AddTraversalExtension(ge.NewNextHopTraversalExtension())
338338
tr.AddTraversalExtension(ge.NewGroupTraversalExtension())
339+
tr.AddTraversalExtension(ge.NewMergeTraversalExtension())
339340

340341
// new flow subscriber endpoints
341342
flowSubscriberWSServer := ws.NewStructServer(config.NewWSServer(hub.HTTPServer(), "/ws/subscriber/flow", apiAuthBackend))

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ require (
7777
github.com/spf13/cobra v1.1.1
7878
github.com/spf13/pflag v1.0.5
7979
github.com/spf13/viper v1.7.0
80-
github.com/stretchr/testify v1.6.1 // indirect
80+
github.com/stretchr/testify v1.7.0
8181
github.com/t-yuki/gocover-cobertura v0.0.0-20180217150009-aaee18c8195c
8282
github.com/tebeka/go2xunit v1.4.10
8383
github.com/tebeka/selenium v0.0.0-20170314201507-657e45ec600f

gremlin/traversal/merge.go

Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
package traversal
2+
3+
import (
4+
"fmt"
5+
"reflect"
6+
"time"
7+
8+
"github.com/pkg/errors"
9+
"github.com/skydive-project/skydive/graffiti/graph"
10+
"github.com/skydive-project/skydive/graffiti/graph/traversal"
11+
"github.com/skydive-project/skydive/graffiti/logging"
12+
)
13+
14+
// MergeTraversalExtension describes a new extension to enhance the topology
15+
type MergeTraversalExtension struct {
16+
MergeToken traversal.Token
17+
}
18+
19+
// MergeGremlinTraversalStep step aggregates elements from different revisions of the nodes into a new metadata key.
20+
// Nodes returned by this step are copies of the nodes in the graph, not the actual nodes.
21+
// The reason is because this step is not meant to modify nodes in the graph, just for the output.
22+
// This step should be used with a presistant backend, so it can access previous revisions of the nodes.
23+
// To use this step we should select a metadata key (first parameter), where the elements will be read from.
24+
// Inside this Metadata.Key elements should have the format map[interface{}]interface{} (could be a type based on that).
25+
// The second parameter is the metadata key where all the elements will be aggregated.
26+
// The aggregation will with the format: map[string][]interface{}.
27+
// All elements with the same key in the map will be joined in an slice.
28+
// To use this step we can use a graph with a time period context, eg: G.At(1479899809,3600).V().Merge('A','B').
29+
// Or we can define the time period in the step: G.V().Merge('A','B',1500000000,1500099999).
30+
// Note that in this case we define the start and end time, while in "At" is start time and duration.
31+
// In both cases, Merge step will use the nodes given by the previous step.
32+
type MergeGremlinTraversalStep struct {
33+
traversal.GremlinTraversalContext
34+
MergeKey string
35+
MergeAggKey string
36+
StartTime time.Time
37+
EndTime time.Time
38+
}
39+
40+
// NewMergeTraversalExtension returns a new graph traversal extension
41+
func NewMergeTraversalExtension() *MergeTraversalExtension {
42+
return &MergeTraversalExtension{
43+
MergeToken: traversalMergeToken,
44+
}
45+
}
46+
47+
// ScanIdent recognise the word associated with this step (in uppercase) and return a token
48+
// which represents it. Return true if it have found a match
49+
func (e *MergeTraversalExtension) ScanIdent(s string) (traversal.Token, bool) {
50+
switch s {
51+
case "MERGE":
52+
return e.MergeToken, true
53+
}
54+
return traversal.IDENT, false
55+
}
56+
57+
// ParseStep generate a step for a given token, having in 'p' context and params
58+
func (e *MergeTraversalExtension) ParseStep(t traversal.Token, p traversal.GremlinTraversalContext) (traversal.GremlinTraversalStep, error) {
59+
switch t {
60+
case e.MergeToken:
61+
default:
62+
return nil, nil
63+
}
64+
65+
var mergeKey, mergeAggKey string
66+
var startTime, endTime time.Time
67+
var ok bool
68+
69+
switch len(p.Params) {
70+
case 2:
71+
mergeKey, ok = p.Params[0].(string)
72+
if !ok {
73+
return nil, errors.New("Merge first parameter have to be a string")
74+
}
75+
mergeAggKey, ok = p.Params[1].(string)
76+
if !ok {
77+
return nil, errors.New("Merge second parameter have to be a string")
78+
}
79+
case 4:
80+
mergeKey, ok = p.Params[0].(string)
81+
if !ok {
82+
return nil, errors.New("Merge first parameter have to be a string")
83+
}
84+
mergeAggKey, ok = p.Params[1].(string)
85+
if !ok {
86+
return nil, errors.New("Merge second parameter have to be a string")
87+
}
88+
startTimeUnixEpoch, ok := p.Params[2].(int64)
89+
if !ok {
90+
return nil, errors.New("Merge third parameter have to be a int (unix epoch time)")
91+
}
92+
startTime = time.Unix(startTimeUnixEpoch, 0)
93+
endTimeUnixEpoch, ok := p.Params[3].(int64)
94+
if !ok {
95+
return nil, errors.New("Merge fourth parameter have to be a int (unix epoch time)")
96+
}
97+
endTime = time.Unix(endTimeUnixEpoch, 0)
98+
default:
99+
return nil, errors.New("Merge parameter must have two or four parameters (OriginKey, DestinationKey, StartTime, EndTime)")
100+
}
101+
102+
return &MergeGremlinTraversalStep{
103+
GremlinTraversalContext: p,
104+
MergeKey: mergeKey,
105+
MergeAggKey: mergeAggKey,
106+
StartTime: startTime,
107+
EndTime: endTime,
108+
}, nil
109+
}
110+
111+
// Exec executes the merge step
112+
func (s *MergeGremlinTraversalStep) Exec(last traversal.GraphTraversalStep) (traversal.GraphTraversalStep, error) {
113+
switch tv := last.(type) {
114+
case *traversal.GraphTraversalV:
115+
return s.InterfaceMerge(tv)
116+
117+
}
118+
return nil, traversal.ErrExecutionError
119+
}
120+
121+
// Reduce merge step
122+
func (s *MergeGremlinTraversalStep) Reduce(next traversal.GremlinTraversalStep) (traversal.GremlinTraversalStep, error) {
123+
return next, nil
124+
}
125+
126+
// Context merge step
127+
func (s *MergeGremlinTraversalStep) Context() *traversal.GremlinTraversalContext {
128+
return &s.GremlinTraversalContext
129+
}
130+
131+
// InterfaceMerge for each node id, group all the elements stored in Metadata.key from the
132+
// input nodes and put them into the newest node for each id into Metadata.aggKey.
133+
// Merge are groupped based on its key. See mergedMetadata for an example.
134+
// All output nodes will have Metadata.aggKey defined (empty or not).
135+
func (s *MergeGremlinTraversalStep) InterfaceMerge(tv *traversal.GraphTraversalV) (traversal.GraphTraversalStep, error) {
136+
// If user has defined start/end time in the parameters, use that values instead of the ones comming with the graph
137+
if !s.StartTime.IsZero() && !s.EndTime.IsZero() {
138+
timeSlice := graph.NewTimeSlice(
139+
graph.Time(s.StartTime).UnixMilli(),
140+
graph.Time(s.EndTime).UnixMilli(),
141+
)
142+
userTimeSliceCtx := graph.Context{
143+
TimeSlice: timeSlice,
144+
TimePoint: true,
145+
}
146+
147+
newGraph, err := tv.GraphTraversal.Graph.CloneWithContext(userTimeSliceCtx)
148+
if err != nil {
149+
return nil, err
150+
}
151+
tv.GraphTraversal.Graph = newGraph
152+
}
153+
154+
tv.GraphTraversal.RLock()
155+
defer tv.GraphTraversal.RUnlock()
156+
157+
// uniqNodes store the latest node for each node identifier
158+
uniqNodes := map[graph.Identifier]*graph.Node{}
159+
160+
// elements accumulate the elements for each node id
161+
elements := map[graph.Identifier]map[string][]interface{}{}
162+
163+
// Get the list of node ids
164+
nodesIDs := make([]graph.Identifier, 0, len(tv.GetNodes()))
165+
for _, node := range tv.GetNodes() {
166+
nodesIDs = append(nodesIDs, node.ID)
167+
}
168+
169+
// Get all revision for the list of node ids
170+
revisionNodes := tv.GraphTraversal.Graph.GetNodesFromIDs(nodesIDs)
171+
172+
// Store only the most recent nodes
173+
for _, rNode := range revisionNodes {
174+
storedNode, ok := uniqNodes[rNode.ID]
175+
if !ok {
176+
uniqNodes[rNode.ID] = rNode
177+
} else {
178+
if storedNode.Revision < rNode.Revision {
179+
uniqNodes[rNode.ID] = rNode
180+
}
181+
}
182+
183+
// Store elements from all revisions into the "elements" variable
184+
elements[rNode.ID] = mergeMetadata(rNode, s.MergeKey, elements[rNode.ID])
185+
}
186+
187+
// Move the nodes from the uniqNodes map to an slice required by TraversalV
188+
// Return a copy of the nodes, not the actual graph nodes, because we don't want
189+
// to modify nodes with this step, just append some extra info
190+
nodes := []*graph.Node{}
191+
for id, n := range uniqNodes {
192+
nCopy := n.Copy()
193+
194+
e, ok := elements[id]
195+
if ok {
196+
// Set the stored node with the merge of previous and current node
197+
metadataSet := nCopy.Metadata.SetField(s.MergeAggKey, e)
198+
if !metadataSet {
199+
return nil, fmt.Errorf("unable to set elements metadata for copied node %v", id)
200+
}
201+
}
202+
203+
nodes = append(nodes, nCopy)
204+
}
205+
206+
return traversal.NewGraphTraversalV(tv.GraphTraversal, nodes), nil
207+
}
208+
209+
// mergeMetadata return the merge of node.Key elements with the ones already stored in nodeMerge
210+
// Eg.:
211+
// node: Metadata.key: {"a":{x}, "b":{y}}
212+
// nodeMerge: {"a":[{z}]}
213+
// return: Metadata.key: {"a":[{x},{z}], "b":[{y}]}
214+
//
215+
// Ignore if Metadata.key has an invalid format (not a map).
216+
// Reflect is used to be able to access map's defined in different types.
217+
// Element aggregation data type should be map[string]interface{} to be able to be encoded with JSON
218+
func mergeMetadata(node *graph.Node, key string, nodeMerge map[string][]interface{}) map[string][]interface{} {
219+
if nodeMerge == nil {
220+
nodeMerge = map[string][]interface{}{}
221+
}
222+
223+
n1MergeIface, n1Err := node.GetField(key)
224+
225+
if n1Err == nil {
226+
// Ignore Metadata.key values which are not a map
227+
n1MergeValue := reflect.ValueOf(n1MergeIface)
228+
229+
// If the metadata value is a pointer, resolve it
230+
if n1MergeValue.Kind() == reflect.Ptr {
231+
n1MergeValue = n1MergeValue.Elem()
232+
}
233+
234+
// Merge step only accepts a map as data origin
235+
if n1MergeValue.Kind() != reflect.Map {
236+
logging.GetLogger().Errorf("Invalid type for elements, expecting a map, but it is %v", n1MergeValue.Kind())
237+
return nodeMerge
238+
}
239+
240+
iter := n1MergeValue.MapRange()
241+
NODE_MERGE:
242+
for iter.Next() {
243+
k := fmt.Sprintf("%v", iter.Key().Interface())
244+
v := iter.Value().Interface()
245+
246+
// Do not append if the same element already exists
247+
for _, storedElement := range nodeMerge[k] {
248+
if reflect.DeepEqual(storedElement, v) {
249+
continue NODE_MERGE
250+
}
251+
}
252+
253+
nodeMerge[k] = append(nodeMerge[k], v)
254+
}
255+
}
256+
257+
return nodeMerge
258+
}

0 commit comments

Comments
 (0)