diff --git a/flow/ondemand/client/client.go b/flow/ondemand/client/client.go index 57f42f103f..0810a8b40d 100644 --- a/flow/ondemand/client/client.go +++ b/flow/ondemand/client/client.go @@ -113,6 +113,6 @@ func NewOnDemandFlowProbeClient(g *graph.Graph, ch rest.WatchableHandler, agentP nodeTypes[i] = nodeType i++ } - nodeTypeQuery := new(gremlin.QueryString).Has("Host", gremlin.Ne(""), "Type", gremlin.Within(nodeTypes...)).String() + nodeTypeQuery := new(gremlin.QueryString).Has("@Host", gremlin.Ne(""), "Type", gremlin.Within(nodeTypes...)).String() return client.NewOnDemandClient(g, ch, agentPool, subscriberPool, etcdClient, &onDemandFlowHandler{graph: g, nodeTypeQuery: nodeTypeQuery}) } diff --git a/flow/storage/elasticsearch/elasticsearch.go b/flow/storage/elasticsearch/elasticsearch.go index 327774fe43..81dfbdd1ff 100644 --- a/flow/storage/elasticsearch/elasticsearch.go +++ b/flow/storage/elasticsearch/elasticsearch.go @@ -176,6 +176,10 @@ type rawpacketRecord struct { Flow *embeddedFlow `json:"Flow"` } +func normalizeKey(key string) string { + return "Flow." + key +} + // StoreFlows push a set of flows in the database func (c *Storage) StoreFlows(flows []*flow.Flow) error { if !c.client.Started() { @@ -241,10 +245,10 @@ func (c *Storage) SearchRawPackets(fsq filters.SearchQuery, packetFilter *filter } // do not escape flow as ES use sub object in that case - mustQueries := []elastic.Query{es.FormatFilter(fsq.Filter, "Flow")} + mustQueries := []elastic.Query{es.FormatFilter(fsq.Filter, normalizeKey)} if packetFilter != nil { - mustQueries = append(mustQueries, es.FormatFilter(packetFilter, "")) + mustQueries = append(mustQueries, es.FormatFilter(packetFilter, nil)) } out, err := c.sendRequest(elastic.NewBoolQuery().Must(mustQueries...), fsq, rawpacketIndex.IndexWildcard(c.indexPrefix)) @@ -276,8 +280,8 @@ func (c *Storage) SearchMetrics(fsq filters.SearchQuery, metricFilter *filters.F } // do not escape flow as ES use sub object in that case - flowQuery := es.FormatFilter(fsq.Filter, "Flow") - metricQuery := es.FormatFilter(metricFilter, "") + flowQuery := es.FormatFilter(fsq.Filter, normalizeKey) + metricQuery := es.FormatFilter(metricFilter, nil) query := elastic.NewBoolQuery().Must(flowQuery, metricQuery) out, err := c.sendRequest(query, fsq, metricIndex.IndexWildcard(c.indexPrefix)) @@ -311,7 +315,7 @@ func (c *Storage) SearchFlows(fsq filters.SearchQuery) (*flow.FlowSet, error) { } // TODO: dedup and sort in order to remove duplicate flow UUID due to rolling index - out, err := c.sendRequest(es.FormatFilter(fsq.Filter, ""), fsq, flowIndex.IndexWildcard(c.indexPrefix)) + out, err := c.sendRequest(es.FormatFilter(fsq.Filter, nil), fsq, flowIndex.IndexWildcard(c.indexPrefix)) if err != nil { return nil, err } diff --git a/graffiti/graph/cachedbackend.go b/graffiti/graph/cachedbackend.go index 28f1786c71..72d109b833 100644 --- a/graffiti/graph/cachedbackend.go +++ b/graffiti/graph/cachedbackend.go @@ -183,24 +183,24 @@ func (c *CachedBackend) MetadataUpdated(i interface{}) error { } // GetNodes returns a list of nodes with a time slice, matching metadata -func (c *CachedBackend) GetNodes(t Context, m ElementMatcher, e ElementMatcher) []*Node { +func (c *CachedBackend) GetNodes(t Context, m ElementMatcher) []*Node { mode := c.cacheMode.Load() if t.TimeSlice == nil || mode == CacheOnlyMode || c.persistent == nil { - return c.memory.GetNodes(t, m, e) + return c.memory.GetNodes(t, m) } - return c.persistent.GetNodes(t, m, e) + return c.persistent.GetNodes(t, m) } // GetEdges returns a list of edges with a time slice, matching metadata -func (c *CachedBackend) GetEdges(t Context, m ElementMatcher, e ElementMatcher) []*Edge { +func (c *CachedBackend) GetEdges(t Context, m ElementMatcher) []*Edge { mode := c.cacheMode.Load() if t.TimeSlice == nil || mode == CacheOnlyMode || c.persistent == nil { - return c.memory.GetEdges(t, m, e) + return c.memory.GetEdges(t, m) } - return c.persistent.GetEdges(t, m, e) + return c.persistent.GetEdges(t, m) } // IsHistorySupported returns whether the persistent backend supports history diff --git a/graffiti/graph/elasticsearch.go b/graffiti/graph/elasticsearch.go index 27eebfa7bd..3fe281b428 100644 --- a/graffiti/graph/elasticsearch.go +++ b/graffiti/graph/elasticsearch.go @@ -103,9 +103,8 @@ type ElasticSearchBackend struct { // TimedSearchQuery describes a search query within a time slice and metadata filters type TimedSearchQuery struct { filters.SearchQuery - TimeFilter *filters.Filter - MetadataFilter *filters.Filter - ElementFilter *filters.Filter + TimeFilter *filters.Filter + ElementFilter *filters.Filter } // easyjson:json @@ -124,6 +123,13 @@ type rawData struct { Child string `json:"Child,omitempty"` } +func normalizeKey(key string) string { + if key[0] == '@' { + return key[1:] + } + return MetadataPrefix + key +} + func graphElementToRaw(typ string, e *graphElement) (*rawData, error) { data, err := json.Marshal(e.Metadata) if err != nil { @@ -331,22 +337,18 @@ func (b *ElasticSearchBackend) MetadataUpdated(i interface{}) error { // Query the database for a "node" or "edge" func (b *ElasticSearchBackend) Query(typ string, tsq *TimedSearchQuery) (sr *elastic.SearchResult, _ error) { fltrs := []elastic.Query{ - es.FormatFilter(filters.NewTermStringFilter("_Type", typ), ""), + es.FormatFilter(filters.NewTermStringFilter("_Type", typ), nil), } - if tf := es.FormatFilter(tsq.TimeFilter, ""); tf != nil { + if tf := es.FormatFilter(tsq.TimeFilter, nil); tf != nil { fltrs = append(fltrs, tf) } - if f := es.FormatFilter(tsq.Filter, ""); f != nil { + if f := es.FormatFilter(tsq.Filter, nil); f != nil { fltrs = append(fltrs, f) } - if ef := es.FormatFilter(tsq.ElementFilter, ""); ef != nil { - fltrs = append(fltrs, ef) - } - - if mf := es.FormatFilter(tsq.MetadataFilter, "Metadata"); mf != nil { + if mf := es.FormatFilter(tsq.ElementFilter, normalizeKey); mf != nil { fltrs = append(fltrs, mf) } @@ -400,23 +402,14 @@ func (b *ElasticSearchBackend) searchEdges(tsq *TimedSearchQuery) (edges []*Edge } // GetEdges returns a list of edges within time slice, matching metadata -func (b *ElasticSearchBackend) GetEdges(t Context, m ElementMatcher, e ElementMatcher) []*Edge { - var metadataFilter *filters.Filter +func (b *ElasticSearchBackend) GetEdges(t Context, m ElementMatcher) []*Edge { + var filter *filters.Filter if m != nil { f, err := m.Filter() if err != nil { return []*Edge{} } - metadataFilter = f - } - - var elementFilter *filters.Filter - if e != nil { - f, err := e.Filter() - if err != nil { - return []*Edge{} - } - elementFilter = f + filter = f } var searchQuery filters.SearchQuery @@ -425,10 +418,9 @@ func (b *ElasticSearchBackend) GetEdges(t Context, m ElementMatcher, e ElementMa } edges := b.searchEdges(&TimedSearchQuery{ - SearchQuery: searchQuery, - TimeFilter: getTimeFilter(t.TimeSlice), - MetadataFilter: metadataFilter, - ElementFilter: elementFilter, + SearchQuery: searchQuery, + TimeFilter: getTimeFilter(t.TimeSlice), + ElementFilter: filter, }) if t.TimePoint { @@ -439,7 +431,7 @@ func (b *ElasticSearchBackend) GetEdges(t Context, m ElementMatcher, e ElementMa } // GetNodes returns a list of nodes within time slice, matching metadata -func (b *ElasticSearchBackend) GetNodes(t Context, m ElementMatcher, e ElementMatcher) []*Node { +func (b *ElasticSearchBackend) GetNodes(t Context, m ElementMatcher) []*Node { var filter *filters.Filter if m != nil { f, err := m.Filter() @@ -449,25 +441,15 @@ func (b *ElasticSearchBackend) GetNodes(t Context, m ElementMatcher, e ElementMa filter = f } - var elementFilter *filters.Filter - if e != nil { - f, err := e.Filter() - if err != nil { - return []*Node{} - } - elementFilter = f - } - var searchQuery filters.SearchQuery if !t.TimePoint { searchQuery = filters.SearchQuery{Sort: true, SortBy: "UpdatedAt"} } nodes := b.searchNodes(&TimedSearchQuery{ - SearchQuery: searchQuery, - TimeFilter: getTimeFilter(t.TimeSlice), - MetadataFilter: filter, - ElementFilter: elementFilter, + SearchQuery: searchQuery, + TimeFilter: getTimeFilter(t.TimeSlice), + ElementFilter: filter, }) if len(nodes) > 1 && t.TimePoint { @@ -512,9 +494,9 @@ func (b *ElasticSearchBackend) GetNodeEdges(n *Node, t Context, m ElementMatcher searchQuery.Filter = NewFilterForEdge(n.ID, n.ID) edges = b.searchEdges(&TimedSearchQuery{ - SearchQuery: searchQuery, - TimeFilter: getTimeFilter(t.TimeSlice), - MetadataFilter: filter, + SearchQuery: searchQuery, + TimeFilter: getTimeFilter(t.TimeSlice), + ElementFilter: filter, }) if len(edges) > 1 && t.TimePoint { @@ -550,9 +532,9 @@ func (b *ElasticSearchBackend) FlushElements(m ElementMatcher) error { andFilter := filters.NewAndFilter( filter, - filters.NewNullFilter("DeletedAt"), + filters.NewNullFilter("@DeletedAt"), ) - query := es.FormatFilter(andFilter, "") + query := es.FormatFilter(andFilter, normalizeKey) script := elastic.NewScript("ctx._source.DeletedAt = params.now; ctx._source.ArchivedAt = params.now;") script.Lang("painless") @@ -566,7 +548,7 @@ func (b *ElasticSearchBackend) FlushElements(m ElementMatcher) error { // Sync adds all the nodes and edges with the specified filter into an other graph func (b *ElasticSearchBackend) Sync(g *Graph, elementFilter *ElementFilter) error { // re-insert valid nodes and edges - for _, node := range b.GetNodes(Context{}, nil, elementFilter) { + for _, node := range b.GetNodes(Context{}, elementFilter) { g.NodeAdded(node) raw, err := nodeToRaw(node) @@ -577,7 +559,7 @@ func (b *ElasticSearchBackend) Sync(g *Graph, elementFilter *ElementFilter) erro b.prevRevision[node.ID] = raw } - for _, edge := range b.GetEdges(Context{}, nil, elementFilter) { + for _, edge := range b.GetEdges(Context{}, elementFilter) { g.EdgeAdded(edge) raw, err := edgeToRaw(edge) diff --git a/graffiti/graph/graph.go b/graffiti/graph/graph.go index fc80fca842..c263fc9b95 100644 --- a/graffiti/graph/graph.go +++ b/graffiti/graph/graph.go @@ -46,6 +46,11 @@ const ( EdgeDeleted ) +const ( + // MetadataPrefix used to access user metadata + MetadataPrefix = "Metadata." +) + // Identifier graph ID type Identifier string @@ -115,8 +120,8 @@ type Backend interface { MetadataUpdated(e interface{}) error - GetNodes(t Context, m ElementMatcher, e ElementMatcher) []*Node - GetEdges(t Context, m ElementMatcher, e ElementMatcher) []*Edge + GetNodes(t Context, m ElementMatcher) []*Node + GetEdges(t Context, m ElementMatcher) []*Edge IsHistorySupported() bool } @@ -338,47 +343,47 @@ func GenID(s ...string) Identifier { } func (e *graphElement) GetFieldBool(field string) (_ bool, err error) { - return e.Metadata.GetFieldBool(field) + return e.Metadata.GetFieldBool(strings.TrimPrefix(field, MetadataPrefix)) } func (e *graphElement) GetFieldInt64(field string) (_ int64, err error) { switch field { - case "CreatedAt": + case "@CreatedAt": return e.CreatedAt.UnixMilli(), nil - case "UpdatedAt": + case "@UpdatedAt": return e.UpdatedAt.UnixMilli(), nil - case "DeletedAt": + case "@DeletedAt": return e.DeletedAt.UnixMilli(), nil - case "Revision": + case "@Revision": return e.Revision, nil default: - return e.Metadata.GetFieldInt64(field) + return e.Metadata.GetFieldInt64(strings.TrimPrefix(field, MetadataPrefix)) } } func (e *graphElement) GetFieldString(field string) (_ string, err error) { switch field { - case "ID": + case "@ID": return string(e.ID), nil - case "Host": + case "@Host": return e.Host, nil - case "Origin": + case "@Origin": return e.Origin, nil default: - return e.Metadata.GetFieldString(field) + return e.Metadata.GetFieldString(strings.TrimPrefix(field, MetadataPrefix)) } } -func (e *graphElement) GetField(name string) (interface{}, error) { - if i, err := e.GetFieldInt64(name); err == nil { +func (e *graphElement) GetField(field string) (interface{}, error) { + if i, err := e.GetFieldInt64(field); err == nil { return i, nil } - if s, err := e.GetFieldString(name); err == nil { + if s, err := e.GetFieldString(field); err == nil { return s, nil } - return e.Metadata.GetField(name) + return e.Metadata.GetField(strings.TrimPrefix(field, MetadataPrefix)) } func (e *graphElement) GetFields(names []string) (interface{}, error) { @@ -397,7 +402,7 @@ func (e *graphElement) GetFields(names []string) (interface{}, error) { return values, nil } -var graphElementKeys = map[string]bool{"ID": false, "Host": false, "Origin": false, "CreatedAt": false, "UpdatedAt": false, "DeletedAt": false, "Revision": false} +var graphElementKeys = map[string]bool{"@ID": false, "@Host": false, "@Origin": false, "@CreatedAt": false, "@UpdatedAt": false, "@DeletedAt": false, "@Revision": false} func (e *graphElement) GetFieldKeys() []string { keys := make([]string, len(graphElementKeys)) @@ -418,6 +423,9 @@ func (e *graphElement) MatchBool(field string, predicate getter.BoolPredicate) b } } } + + field = strings.TrimPrefix(field, MetadataPrefix) + return e.Metadata.MatchBool(field, predicate) } @@ -428,6 +436,8 @@ func (e *graphElement) MatchInt64(field string, predicate getter.Int64Predicate) } } + field = strings.TrimPrefix(field, MetadataPrefix) + if index := strings.Index(field, "."); index != -1 { first := field[index+1:] if v, found := e.Metadata[first]; found { @@ -447,6 +457,8 @@ func (e *graphElement) MatchString(field string, predicate getter.StringPredicat } } + field = strings.TrimPrefix(field, MetadataPrefix) + if index := strings.Index(field, "."); index != -1 { first := field[index+1:] if v, found := e.Metadata[first]; found { @@ -956,7 +968,7 @@ func getNodeMinDistance(nodesMap map[Identifier]*Node, distance map[Identifier]u // GetNodesMap returns a map of nodes within a time slice func (g *Graph) GetNodesMap(t Context) map[Identifier]*Node { - nodes := g.backend.GetNodes(t, nil, nil) + nodes := g.backend.GetNodes(t, nil) nodesMap := make(map[Identifier]*Node, len(nodes)) for _, n := range nodes { nodesMap[n.ID] = n @@ -1335,12 +1347,12 @@ func (g *Graph) DelNodes(m ElementMatcher) error { // GetNodes returns a list of nodes func (g *Graph) GetNodes(m ElementMatcher) []*Node { - return g.backend.GetNodes(g.context, m, nil) + return g.backend.GetNodes(g.context, m) } // GetEdges returns a list of edges func (g *Graph) GetEdges(m ElementMatcher) []*Edge { - return g.backend.GetEdges(g.context, m, nil) + return g.backend.GetEdges(g.context, m) } // GetEdgeNodes returns a list of nodes of an edge diff --git a/graffiti/graph/memory.go b/graffiti/graph/memory.go index 4d29b407fe..66e8f16204 100644 --- a/graffiti/graph/memory.go +++ b/graffiti/graph/memory.go @@ -184,9 +184,9 @@ func (m *MemoryBackend) NodeDeleted(n *Node) error { } // GetNodes from the graph backend -func (m MemoryBackend) GetNodes(t Context, metadata ElementMatcher, element ElementMatcher) (nodes []*Node) { +func (m MemoryBackend) GetNodes(t Context, metadata ElementMatcher) (nodes []*Node) { for _, n := range m.nodes { - if n.MatchMetadata(metadata) && n.MatchMetadata(element) { + if n.MatchMetadata(metadata) { nodes = append(nodes, n.Node) } } @@ -194,9 +194,9 @@ func (m MemoryBackend) GetNodes(t Context, metadata ElementMatcher, element Elem } // GetEdges from the graph backend -func (m MemoryBackend) GetEdges(t Context, metadata ElementMatcher, element ElementMatcher) (edges []*Edge) { +func (m MemoryBackend) GetEdges(t Context, metadata ElementMatcher) (edges []*Edge) { for _, e := range m.edges { - if e.MatchMetadata(metadata) && e.MatchMetadata(element) { + if e.MatchMetadata(metadata) { edges = append(edges, e.Edge) } } diff --git a/graffiti/graph/orientdb.go b/graffiti/graph/orientdb.go index f118a0e1a6..13bf00ea41 100644 --- a/graffiti/graph/orientdb.go +++ b/graffiti/graph/orientdb.go @@ -64,21 +64,7 @@ func metadataToOrientDBSetString(m Metadata) string { return "" } -func elementToOrientDBSelectString(e ElementMatcher) string { - if e == nil { - return "" - } - - var filter *filters.Filter - filter, err := e.Filter() - if err != nil { - return "" - } - - return orientdb.FilterToExpression(filter, func(k string) string { return k }) -} - -func metadataToOrientDBSelectString(m ElementMatcher) string { +func matcherToOrientDBSelectString(m ElementMatcher) string { if m == nil { return "" } @@ -89,13 +75,20 @@ func metadataToOrientDBSelectString(m ElementMatcher) string { return "" } - return orientdb.FilterToExpression(filter, func(k string) string { - key := "Metadata" - for _, s := range strings.Split(k, ".") { + normalizeKey := func(key string) string { + if key[0] == '@' { + return key[1:] + } + els := strings.Split(key, ".") + key, els = MetadataPrefix+els[0], els[1:] + + for _, s := range els { key += "['" + s + "']" } return key - }) + } + + return orientdb.FilterToExpression(filter, normalizeKey) } func (o *OrientDBBackend) updateTimes(e string, id string, events ...eventTime) error { @@ -233,8 +226,8 @@ func (o *OrientDBBackend) GetNode(i Identifier, t Context) (nodes []*Node) { func (o *OrientDBBackend) GetNodeEdges(n *Node, t Context, m ElementMatcher) (edges []*Edge) { query := orientdb.FilterToExpression(getTimeFilter(t.TimeSlice), nil) query += fmt.Sprintf(" AND (Parent = '%s' OR Child = '%s')", n.ID, n.ID) - if metadataQuery := metadataToOrientDBSelectString(m); metadataQuery != "" { - query += " AND " + metadataQuery + if matcherQuery := matcherToOrientDBSelectString(m); matcherQuery != "" { + query += " AND " + matcherQuery } return o.searchEdges(t, query) } @@ -310,27 +303,21 @@ func (o *OrientDBBackend) MetadataUpdated(i interface{}) error { return err } -// GetNodes returns a list of nodes within time slice, matching metadata -func (o *OrientDBBackend) GetNodes(t Context, m ElementMatcher, e ElementMatcher) (nodes []*Node) { +// GetNodes returns a list of nodes within time slice +func (o *OrientDBBackend) GetNodes(t Context, m ElementMatcher) (nodes []*Node) { query := orientdb.FilterToExpression(getTimeFilter(t.TimeSlice), nil) - if elementQuery := elementToOrientDBSelectString(e); elementQuery != "" { - query += " AND " + elementQuery - } - if metadataQuery := metadataToOrientDBSelectString(m); metadataQuery != "" { - query += " AND " + metadataQuery + if matcherQuery := matcherToOrientDBSelectString(m); matcherQuery != "" { + query += " AND " + matcherQuery } return o.searchNodes(t, query) } -// GetEdges returns a list of edges within time slice, matching metadata -func (o *OrientDBBackend) GetEdges(t Context, m ElementMatcher, e ElementMatcher) (edges []*Edge) { +// GetEdges returns a list of edges within time slice +func (o *OrientDBBackend) GetEdges(t Context, m ElementMatcher) (edges []*Edge) { query := orientdb.FilterToExpression(getTimeFilter(t.TimeSlice), nil) - if elementQuery := elementToOrientDBSelectString(e); elementQuery != "" { - query += " AND " + elementQuery - } - if metadataQuery := metadataToOrientDBSelectString(m); metadataQuery != "" { - query += " AND " + metadataQuery + if matcherQuery := matcherToOrientDBSelectString(m); matcherQuery != "" { + query += " AND " + matcherQuery } return o.searchEdges(t, query) @@ -356,13 +343,13 @@ func (o *OrientDBBackend) FlushElements(e ElementMatcher) error { now := TimeUTC().UnixMilli() - elementQuery := elementToOrientDBSelectString(e) - query := fmt.Sprintf("UPDATE Node SET DeletedAt = %d, ArchivedAt = %d WHERE DeletedAt IS NULL AND %s", now, now, elementQuery) + matcherQuery := matcherToOrientDBSelectString(e) + query := fmt.Sprintf("UPDATE Node SET DeletedAt = %d, ArchivedAt = %d WHERE DeletedAt IS NULL AND %s", now, now, matcherQuery) if _, err := o.client.SQL(query); err != nil { return fmt.Errorf("Error while flushing graph: %s", err) } - query = fmt.Sprintf("UPDATE Link SET DeletedAt = %d, ArchivedAt = %d WHERE DeletedAt IS NULL AND %s", now, now, elementQuery) + query = fmt.Sprintf("UPDATE Link SET DeletedAt = %d, ArchivedAt = %d WHERE DeletedAt IS NULL AND %s", now, now, matcherQuery) if _, err := o.client.SQL(query); err != nil { return fmt.Errorf("Error while flushing graph: %s", err) } @@ -373,11 +360,11 @@ func (o *OrientDBBackend) FlushElements(e ElementMatcher) error { // Sync adds all the nodes and edges with the specified filter into an other graph func (o *OrientDBBackend) Sync(g *Graph, elementFilter *ElementFilter) error { // re-insert valid nodes and edges - for _, node := range o.GetNodes(Context{}, nil, elementFilter) { + for _, node := range o.GetNodes(Context{}, elementFilter) { g.NodeAdded(node) } - for _, edge := range o.GetEdges(Context{}, nil, elementFilter) { + for _, edge := range o.GetEdges(Context{}, elementFilter) { g.EdgeAdded(edge) } diff --git a/graffiti/graph/origin.go b/graffiti/graph/origin.go index 28bccb34ae..c111ec44e1 100644 --- a/graffiti/graph/origin.go +++ b/graffiti/graph/origin.go @@ -20,6 +20,7 @@ package graph import ( "fmt" + "github.com/skydive-project/skydive/graffiti/filters" "github.com/skydive-project/skydive/graffiti/service" ws "github.com/skydive-project/skydive/graffiti/websocket" ) @@ -31,9 +32,11 @@ func ClientOrigin(c ws.Speaker) string { // DelSubGraphOfOrigin deletes all the nodes with a specified origin func DelSubGraphOfOrigin(g *Graph, origin string) { - g.DelNodes(Metadata{"Origin": origin}) + filter := filters.NewTermStringFilter("@Origin", origin) + g.DelNodes(NewElementFilter(filter)) } +// Origin returns string representation of origin components func Origin(hostID string, kind service.Type) string { return fmt.Sprintf("%s.%s", kind, hostID) } diff --git a/graffiti/graph/traversal/traversal.go b/graffiti/graph/traversal/traversal.go index a2e2ef0c11..ae35b015ae 100644 --- a/graffiti/graph/traversal/traversal.go +++ b/graffiti/graph/traversal/traversal.go @@ -264,6 +264,7 @@ func KeyValueToFilter(k string, v interface{}) (*filters.Filter, error) { } } +// TODO(safchain) // ParamsToMetadata converts a slice to Metadata func paramsToMetadata(s ...interface{}) (graph.Metadata, error) { if len(s)%2 != 0 { @@ -314,8 +315,8 @@ func ParamsToFilter(filterOp filters.BoolFilterOp, s ...interface{}) (*filters.F return filters.NewBoolFilter(filterOp, lf...), nil } -// ParamsToMetadataFilter converts a slice to a ElementMatcher -func ParamsToMetadataFilter(filterOp filters.BoolFilterOp, s ...interface{}) (graph.ElementMatcher, error) { +// ParamsToElementMatcher converts a slice to a ElementMatcher +func ParamsToElementMatcher(filterOp filters.BoolFilterOp, s ...interface{}) (graph.ElementMatcher, error) { filter, err := ParamsToFilter(filterOp, s...) if err != nil { return nil, err @@ -591,7 +592,7 @@ func (t *GraphTraversal) V(ctx StepContext, s ...interface{}) *GraphTraversalV { } nodes = []*graph.Node{node} default: - if matcher, err = ParamsToMetadataFilter(filters.BoolFilterOp_AND, s...); err != nil { + if matcher, err = ParamsToElementMatcher(filters.BoolFilterOp_AND, s...); err != nil { return &GraphTraversalV{GraphTraversal: t, error: err} } fallthrough @@ -835,7 +836,7 @@ func (t *GraphTraversal) E(ctx StepContext, s ...interface{}) *GraphTraversalE { } edges = []*graph.Edge{edge} default: - if matcher, err = ParamsToMetadataFilter(filters.BoolFilterOp_AND, s...); err != nil { + if matcher, err = ParamsToElementMatcher(filters.BoolFilterOp_AND, s...); err != nil { return &GraphTraversalE{GraphTraversal: t, error: err} } fallthrough @@ -1156,7 +1157,7 @@ func (tv *GraphTraversalV) Both(ctx StepContext, s ...interface{}) *GraphTravers return tv } - metadata, err := ParamsToMetadataFilter(filters.BoolFilterOp_AND, s...) + metadata, err := ParamsToElementMatcher(filters.BoolFilterOp_AND, s...) if err != nil { return &GraphTraversalV{GraphTraversal: tv.GraphTraversal, error: err} } @@ -1238,7 +1239,7 @@ func (tv *GraphTraversalV) Out(ctx StepContext, s ...interface{}) *GraphTraversa return tv } - metadata, err := ParamsToMetadataFilter(filters.BoolFilterOp_AND, s...) + metadata, err := ParamsToElementMatcher(filters.BoolFilterOp_AND, s...) if err != nil { return &GraphTraversalV{GraphTraversal: tv.GraphTraversal, error: err} } @@ -1269,7 +1270,7 @@ func (tv *GraphTraversalV) OutE(ctx StepContext, s ...interface{}) *GraphTravers return &GraphTraversalE{error: tv.error} } - metadata, err := ParamsToMetadataFilter(filters.BoolFilterOp_AND, s...) + metadata, err := ParamsToElementMatcher(filters.BoolFilterOp_AND, s...) if err != nil { return &GraphTraversalE{GraphTraversal: tv.GraphTraversal, error: err} } @@ -1302,7 +1303,7 @@ func (tv *GraphTraversalV) BothE(ctx StepContext, s ...interface{}) *GraphTraver return &GraphTraversalE{error: tv.error} } - metadata, err := ParamsToMetadataFilter(filters.BoolFilterOp_AND, s...) + metadata, err := ParamsToElementMatcher(filters.BoolFilterOp_AND, s...) if err != nil { return &GraphTraversalE{GraphTraversal: tv.GraphTraversal, error: err} } @@ -1333,7 +1334,7 @@ func (tv *GraphTraversalV) In(ctx StepContext, s ...interface{}) *GraphTraversal return tv } - metadata, err := ParamsToMetadataFilter(filters.BoolFilterOp_AND, s...) + metadata, err := ParamsToElementMatcher(filters.BoolFilterOp_AND, s...) if err != nil { return &GraphTraversalV{GraphTraversal: tv.GraphTraversal, error: err} } @@ -1364,7 +1365,7 @@ func (tv *GraphTraversalV) InE(ctx StepContext, s ...interface{}) *GraphTraversa return &GraphTraversalE{error: tv.error} } - metadata, err := ParamsToMetadataFilter(filters.BoolFilterOp_AND, s...) + metadata, err := ParamsToElementMatcher(filters.BoolFilterOp_AND, s...) if err != nil { return &GraphTraversalE{GraphTraversal: tv.GraphTraversal, error: err} } @@ -1666,7 +1667,7 @@ func (te *GraphTraversalE) InV(ctx StepContext, s ...interface{}) *GraphTraversa return &GraphTraversalV{error: te.error} } - metadata, err := ParamsToMetadataFilter(filters.BoolFilterOp_AND, s...) + metadata, err := ParamsToElementMatcher(filters.BoolFilterOp_AND, s...) if err != nil { return &GraphTraversalV{GraphTraversal: te.GraphTraversal, error: err} } @@ -1697,7 +1698,7 @@ func (te *GraphTraversalE) OutV(ctx StepContext, s ...interface{}) *GraphTravers return &GraphTraversalV{error: te.error} } - metadata, err := ParamsToMetadataFilter(filters.BoolFilterOp_AND, s...) + metadata, err := ParamsToElementMatcher(filters.BoolFilterOp_AND, s...) if err != nil { return &GraphTraversalV{GraphTraversal: te.GraphTraversal, error: err} } @@ -1728,7 +1729,7 @@ func (te *GraphTraversalE) BothV(ctx StepContext, s ...interface{}) *GraphTraver return &GraphTraversalV{GraphTraversal: te.GraphTraversal, error: te.error} } - metadata, err := ParamsToMetadataFilter(filters.BoolFilterOp_AND, s...) + metadata, err := ParamsToElementMatcher(filters.BoolFilterOp_AND, s...) if err != nil { return &GraphTraversalV{GraphTraversal: te.GraphTraversal, error: err} } diff --git a/graffiti/js/test.ts b/graffiti/js/test.ts index f609edf278..eabc49f3db 100644 --- a/graffiti/js/test.ts +++ b/graffiti/js/test.ts @@ -51,7 +51,7 @@ client.login().then(function () { console.log(nodes[0].ID) }) .then(function () { - return client.G.E().Has("Host", NE(1234)) + return client.G.E().Has("@Host", NE(1234)) }) .then(function (edges) { console.log("Edges: " + edges); diff --git a/graffiti/storage/elasticsearch/client.go b/graffiti/storage/elasticsearch/client.go index e512adcde7..71b4e16907 100644 --- a/graffiti/storage/elasticsearch/client.go +++ b/graffiti/storage/elasticsearch/client.go @@ -387,21 +387,22 @@ func (c *Client) start() error { } // FormatFilter creates a ElasticSearch request based on filters -func FormatFilter(filter *filters.Filter, mapKey string) elastic.Query { +func FormatFilter(filter *filters.Filter, normalizeKey func(string) string) elastic.Query { // TODO: remove all this and replace with olivere/elastic queries if filter == nil { return nil } - prefix := mapKey - if prefix != "" { - prefix += "." + if normalizeKey == nil { + normalizeKey = func(key string) string { + return key + } } if f := filter.BoolFilter; f != nil { queries := make([]elastic.Query, len(f.Filters)) for i, item := range f.Filters { - queries[i] = FormatFilter(item, mapKey) + queries[i] = FormatFilter(item, normalizeKey) } boolQuery := elastic.NewBoolQuery() switch f.Op { @@ -417,13 +418,13 @@ func FormatFilter(filter *filters.Filter, mapKey string) elastic.Query { } if f := filter.TermStringFilter; f != nil { - return elastic.NewTermQuery(prefix+f.Key, f.Value) + return elastic.NewTermQuery(normalizeKey(f.Key), f.Value) } if f := filter.TermInt64Filter; f != nil { - return elastic.NewTermQuery(prefix+f.Key, f.Value) + return elastic.NewTermQuery(normalizeKey(f.Key), f.Value) } if f := filter.TermBoolFilter; f != nil { - return elastic.NewTermQuery(prefix+f.Key, f.Value) + return elastic.NewTermQuery(normalizeKey(f.Key), f.Value) } if f := filter.RegexFilter; f != nil { @@ -431,7 +432,7 @@ func FormatFilter(filter *filters.Filter, mapKey string) elastic.Query { value := strings.TrimPrefix(f.Value, "^") value = strings.TrimSuffix(value, "$") - return elastic.NewRegexpQuery(prefix+f.Key, value) + return elastic.NewRegexpQuery(normalizeKey(f.Key), value) } if f := filter.IPV4RangeFilter; f != nil { @@ -445,23 +446,23 @@ func FormatFilter(filter *filters.Filter, mapKey string) elastic.Query { value := strings.TrimPrefix(regex, "^") value = strings.TrimSuffix(value, "$") - return elastic.NewRegexpQuery(prefix+f.Key, value) + return elastic.NewRegexpQuery(normalizeKey(f.Key), value) } if f := filter.GtInt64Filter; f != nil { - return elastic.NewRangeQuery(prefix + f.Key).Gt(f.Value) + return elastic.NewRangeQuery(normalizeKey(f.Key)).Gt(f.Value) } if f := filter.LtInt64Filter; f != nil { - return elastic.NewRangeQuery(prefix + f.Key).Lt(f.Value) + return elastic.NewRangeQuery(normalizeKey(f.Key)).Lt(f.Value) } if f := filter.GteInt64Filter; f != nil { - return elastic.NewRangeQuery(prefix + f.Key).Gte(f.Value) + return elastic.NewRangeQuery(normalizeKey(f.Key)).Gte(f.Value) } if f := filter.LteInt64Filter; f != nil { - return elastic.NewRangeQuery(prefix + f.Key).Lte(f.Value) + return elastic.NewRangeQuery(normalizeKey(f.Key)).Lte(f.Value) } if f := filter.NullFilter; f != nil { - return elastic.NewBoolQuery().MustNot(elastic.NewExistsQuery(prefix + f.Key)) + return elastic.NewBoolQuery().MustNot(elastic.NewExistsQuery(normalizeKey(f.Key))) } return nil } diff --git a/graffiti/storage/orientdb/client.go b/graffiti/storage/orientdb/client.go index f40b22fe0f..844b172abc 100644 --- a/graffiti/storage/orientdb/client.go +++ b/graffiti/storage/orientdb/client.go @@ -194,16 +194,16 @@ func compressBody(body io.Reader) io.Reader { } // FilterToExpression returns a OrientDB select expression based on filters -func FilterToExpression(f *filters.Filter, formatter func(string) string) string { - if formatter == nil { - formatter = func(s string) string { return s } +func FilterToExpression(f *filters.Filter, normalizeKey func(string) string) string { + if normalizeKey == nil { + normalizeKey = func(s string) string { return s } } if f.BoolFilter != nil { keyword := "" switch f.BoolFilter.Op { case filters.BoolFilterOp_NOT: - return "NOT (" + FilterToExpression(f.BoolFilter.Filters[0], formatter) + ")" + return "NOT (" + FilterToExpression(f.BoolFilter.Filters[0], normalizeKey) + ")" case filters.BoolFilterOp_OR: keyword = "OR" case filters.BoolFilterOp_AND: @@ -211,57 +211,57 @@ func FilterToExpression(f *filters.Filter, formatter func(string) string) string } var conditions []string for _, item := range f.BoolFilter.Filters { - if expr := FilterToExpression(item, formatter); expr != "" { - conditions = append(conditions, "("+FilterToExpression(item, formatter)+")") + if expr := FilterToExpression(item, normalizeKey); expr != "" { + conditions = append(conditions, "("+FilterToExpression(item, normalizeKey)+")") } } return strings.Join(conditions, " "+keyword+" ") } if f.TermStringFilter != nil { - return fmt.Sprintf(`(%s = "%s") OR ("%s" IN %s)`, formatter(f.TermStringFilter.Key), f.TermStringFilter.Value, - f.TermStringFilter.Value, formatter(f.TermStringFilter.Key)) + return fmt.Sprintf(`(%s = "%s") OR ("%s" IN %s)`, normalizeKey(f.TermStringFilter.Key), f.TermStringFilter.Value, + f.TermStringFilter.Value, normalizeKey(f.TermStringFilter.Key)) } if f.TermInt64Filter != nil { - return fmt.Sprintf(`(%s = %d) OR (%d IN %s)`, formatter(f.TermInt64Filter.Key), f.TermInt64Filter.Value, - f.TermInt64Filter.Value, formatter(f.TermInt64Filter.Key)) + return fmt.Sprintf(`(%s = %d) OR (%d IN %s)`, normalizeKey(f.TermInt64Filter.Key), f.TermInt64Filter.Value, + f.TermInt64Filter.Value, normalizeKey(f.TermInt64Filter.Key)) } if f.TermBoolFilter != nil { - return fmt.Sprintf(`(%s = %s) OR (%s IN %s)`, formatter(f.TermBoolFilter.Key), strconv.FormatBool(f.TermBoolFilter.Value), - strconv.FormatBool(f.TermBoolFilter.Value), formatter(f.TermBoolFilter.Key)) + return fmt.Sprintf(`(%s = %s) OR (%s IN %s)`, normalizeKey(f.TermBoolFilter.Key), strconv.FormatBool(f.TermBoolFilter.Value), + strconv.FormatBool(f.TermBoolFilter.Value), normalizeKey(f.TermBoolFilter.Key)) } if f.GtInt64Filter != nil { - return fmt.Sprintf("%v > %v", formatter(f.GtInt64Filter.Key), f.GtInt64Filter.Value) + return fmt.Sprintf("%v > %v", normalizeKey(f.GtInt64Filter.Key), f.GtInt64Filter.Value) } if f.LtInt64Filter != nil { - return fmt.Sprintf("%v < %v", formatter(f.LtInt64Filter.Key), f.LtInt64Filter.Value) + return fmt.Sprintf("%v < %v", normalizeKey(f.LtInt64Filter.Key), f.LtInt64Filter.Value) } if f.GteInt64Filter != nil { - return fmt.Sprintf("%v >= %v", formatter(f.GteInt64Filter.Key), f.GteInt64Filter.Value) + return fmt.Sprintf("%v >= %v", normalizeKey(f.GteInt64Filter.Key), f.GteInt64Filter.Value) } if f.LteInt64Filter != nil { - return fmt.Sprintf("%v <= %v", formatter(f.LteInt64Filter.Key), f.LteInt64Filter.Value) + return fmt.Sprintf("%v <= %v", normalizeKey(f.LteInt64Filter.Key), f.LteInt64Filter.Value) } if f.RegexFilter != nil { - return fmt.Sprintf(`%s MATCHES "%s"`, formatter(f.RegexFilter.Key), strings.Replace(f.RegexFilter.Value, `\`, `\\`, -1)) + return fmt.Sprintf(`%s MATCHES "%s"`, normalizeKey(f.RegexFilter.Key), strings.Replace(f.RegexFilter.Value, `\`, `\\`, -1)) } if f.NullFilter != nil { - return fmt.Sprintf("%s is NULL", formatter(f.NullFilter.Key)) + return fmt.Sprintf("%s is NULL", normalizeKey(f.NullFilter.Key)) } if f.IPV4RangeFilter != nil { // ignore the error at this point it should have been catched earlier regex, _ := filters.IPV4CIDRToRegex(f.IPV4RangeFilter.Value) - return fmt.Sprintf(`%s MATCHES "%s"`, formatter(f.IPV4RangeFilter.Key), strings.Replace(regex, `\`, `\\`, -1)) + return fmt.Sprintf(`%s MATCHES "%s"`, normalizeKey(f.IPV4RangeFilter.Key), strings.Replace(regex, `\`, `\\`, -1)) } return "" diff --git a/graffiti/websocket/server.go b/graffiti/websocket/server.go index 456a33dac7..87113930c0 100644 --- a/graffiti/websocket/server.go +++ b/graffiti/websocket/server.go @@ -110,7 +110,7 @@ func (s *Server) serveMessages(w http.ResponseWriter, r *auth.AuthenticatedReque if _, err = s.incomerHandler(conn, r, func(c *wsIncomingClient) (Speaker, error) { return c, nil }); err != nil { s.opts.Logger.Warningf("Unable to accept incomer from %s: %s", r.RemoteAddr, err) if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil { - conn.Close() + conn.Close() } } } diff --git a/gremlin/traversal/topology.go b/gremlin/traversal/topology.go index b6c650f139..19da2948d0 100644 --- a/gremlin/traversal/topology.go +++ b/gremlin/traversal/topology.go @@ -35,7 +35,7 @@ func InterfaceMetrics(ctx traversal.StepContext, tv *traversal.GraphTraversalV, } startField := key + ".Start" - tv = tv.Dedup(ctx, "ID", startField).Sort(ctx, filters.SortOrder_Ascending, startField) + tv = tv.Dedup(ctx, "@ID", startField).Sort(ctx, filters.SortOrder_Ascending, startField) if tv.Error() != nil { return NewMetricsTraversalStepFromError(tv.Error())