Skip to content

Commit

Permalink
graffiti: allow requesting Revision and Origin
Browse files Browse the repository at this point in the history
  • Loading branch information
safchain committed Apr 8, 2021
1 parent 88812a7 commit c4d81f6
Show file tree
Hide file tree
Showing 14 changed files with 165 additions and 175 deletions.
2 changes: 1 addition & 1 deletion flow/ondemand/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,6 @@ func NewOnDemandFlowProbeClient(g *graph.Graph, ch rest.WatchableHandler, agentP
nodeTypes[i] = nodeType
i++
}
nodeTypeQuery := new(gremlin.QueryString).Has("Host", gremlin.Ne(""), "Type", gremlin.Within(nodeTypes...)).String()
nodeTypeQuery := new(gremlin.QueryString).Has("@Host", gremlin.Ne(""), "Type", gremlin.Within(nodeTypes...)).String()
return client.NewOnDemandClient(g, ch, agentPool, subscriberPool, etcdClient, &onDemandFlowHandler{graph: g, nodeTypeQuery: nodeTypeQuery})
}
14 changes: 9 additions & 5 deletions flow/storage/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ type rawpacketRecord struct {
Flow *embeddedFlow `json:"Flow"`
}

// normalizeKey qualifies a flow filter key with the "Flow." sub-object
// prefix used by the Elasticsearch raw-packet/metric documents.
func normalizeKey(key string) string {
	const flowPrefix = "Flow."
	return flowPrefix + key
}

// StoreFlows push a set of flows in the database
func (c *Storage) StoreFlows(flows []*flow.Flow) error {
if !c.client.Started() {
Expand Down Expand Up @@ -241,10 +245,10 @@ func (c *Storage) SearchRawPackets(fsq filters.SearchQuery, packetFilter *filter
}

// do not escape flow as ES use sub object in that case
mustQueries := []elastic.Query{es.FormatFilter(fsq.Filter, "Flow")}
mustQueries := []elastic.Query{es.FormatFilter(fsq.Filter, normalizeKey)}

if packetFilter != nil {
mustQueries = append(mustQueries, es.FormatFilter(packetFilter, ""))
mustQueries = append(mustQueries, es.FormatFilter(packetFilter, nil))
}

out, err := c.sendRequest(elastic.NewBoolQuery().Must(mustQueries...), fsq, rawpacketIndex.IndexWildcard(c.indexPrefix))
Expand Down Expand Up @@ -276,8 +280,8 @@ func (c *Storage) SearchMetrics(fsq filters.SearchQuery, metricFilter *filters.F
}

// do not escape flow as ES use sub object in that case
flowQuery := es.FormatFilter(fsq.Filter, "Flow")
metricQuery := es.FormatFilter(metricFilter, "")
flowQuery := es.FormatFilter(fsq.Filter, normalizeKey)
metricQuery := es.FormatFilter(metricFilter, nil)

query := elastic.NewBoolQuery().Must(flowQuery, metricQuery)
out, err := c.sendRequest(query, fsq, metricIndex.IndexWildcard(c.indexPrefix))
Expand Down Expand Up @@ -311,7 +315,7 @@ func (c *Storage) SearchFlows(fsq filters.SearchQuery) (*flow.FlowSet, error) {
}

// TODO: dedup and sort in order to remove duplicate flow UUID due to rolling index
out, err := c.sendRequest(es.FormatFilter(fsq.Filter, ""), fsq, flowIndex.IndexWildcard(c.indexPrefix))
out, err := c.sendRequest(es.FormatFilter(fsq.Filter, nil), fsq, flowIndex.IndexWildcard(c.indexPrefix))
if err != nil {
return nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions graffiti/graph/cachedbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,24 +183,24 @@ func (c *CachedBackend) MetadataUpdated(i interface{}) error {
}

// GetNodes returns a list of nodes with a time slice, matching metadata
func (c *CachedBackend) GetNodes(t Context, m ElementMatcher, e ElementMatcher) []*Node {
func (c *CachedBackend) GetNodes(t Context, m ElementMatcher) []*Node {
mode := c.cacheMode.Load()

if t.TimeSlice == nil || mode == CacheOnlyMode || c.persistent == nil {
return c.memory.GetNodes(t, m, e)
return c.memory.GetNodes(t, m)
}

return c.persistent.GetNodes(t, m, e)
return c.persistent.GetNodes(t, m)
}

// GetEdges returns a list of edges with a time slice, matching metadata
func (c *CachedBackend) GetEdges(t Context, m ElementMatcher, e ElementMatcher) []*Edge {
func (c *CachedBackend) GetEdges(t Context, m ElementMatcher) []*Edge {
mode := c.cacheMode.Load()

if t.TimeSlice == nil || mode == CacheOnlyMode || c.persistent == nil {
return c.memory.GetEdges(t, m, e)
return c.memory.GetEdges(t, m)
}
return c.persistent.GetEdges(t, m, e)
return c.persistent.GetEdges(t, m)
}

// IsHistorySupported returns whether the persistent backend supports history
Expand Down
78 changes: 30 additions & 48 deletions graffiti/graph/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,8 @@ type ElasticSearchBackend struct {
// TimedSearchQuery describes a search query within a time slice and metadata filters
type TimedSearchQuery struct {
filters.SearchQuery
TimeFilter *filters.Filter
MetadataFilter *filters.Filter
ElementFilter *filters.Filter
TimeFilter *filters.Filter
ElementFilter *filters.Filter
}

// easyjson:json
Expand All @@ -124,6 +123,13 @@ type rawData struct {
Child string `json:"Child,omitempty"`
}

// normalizeKey maps a filter key to its Elasticsearch document field:
// keys starting with '@' address top-level graph-element fields (e.g.
// "@DeletedAt" -> "DeletedAt") with the marker stripped, while all other
// keys are metadata attributes and get MetadataPrefix prepended.
//
// The length guard avoids an index-out-of-range panic on an empty key,
// which can reach this function through user-supplied filters.
func normalizeKey(key string) string {
	if len(key) > 0 && key[0] == '@' {
		return key[1:]
	}
	return MetadataPrefix + key
}

func graphElementToRaw(typ string, e *graphElement) (*rawData, error) {
data, err := json.Marshal(e.Metadata)
if err != nil {
Expand Down Expand Up @@ -331,22 +337,18 @@ func (b *ElasticSearchBackend) MetadataUpdated(i interface{}) error {
// Query the database for a "node" or "edge"
func (b *ElasticSearchBackend) Query(typ string, tsq *TimedSearchQuery) (sr *elastic.SearchResult, _ error) {
fltrs := []elastic.Query{
es.FormatFilter(filters.NewTermStringFilter("_Type", typ), ""),
es.FormatFilter(filters.NewTermStringFilter("_Type", typ), nil),
}

if tf := es.FormatFilter(tsq.TimeFilter, ""); tf != nil {
if tf := es.FormatFilter(tsq.TimeFilter, nil); tf != nil {
fltrs = append(fltrs, tf)
}

if f := es.FormatFilter(tsq.Filter, ""); f != nil {
if f := es.FormatFilter(tsq.Filter, nil); f != nil {
fltrs = append(fltrs, f)
}

if ef := es.FormatFilter(tsq.ElementFilter, ""); ef != nil {
fltrs = append(fltrs, ef)
}

if mf := es.FormatFilter(tsq.MetadataFilter, "Metadata"); mf != nil {
if mf := es.FormatFilter(tsq.ElementFilter, normalizeKey); mf != nil {
fltrs = append(fltrs, mf)
}

Expand Down Expand Up @@ -400,23 +402,14 @@ func (b *ElasticSearchBackend) searchEdges(tsq *TimedSearchQuery) (edges []*Edge
}

// GetEdges returns a list of edges within time slice, matching metadata
func (b *ElasticSearchBackend) GetEdges(t Context, m ElementMatcher, e ElementMatcher) []*Edge {
var metadataFilter *filters.Filter
func (b *ElasticSearchBackend) GetEdges(t Context, m ElementMatcher) []*Edge {
var filter *filters.Filter
if m != nil {
f, err := m.Filter()
if err != nil {
return []*Edge{}
}
metadataFilter = f
}

var elementFilter *filters.Filter
if e != nil {
f, err := e.Filter()
if err != nil {
return []*Edge{}
}
elementFilter = f
filter = f
}

var searchQuery filters.SearchQuery
Expand All @@ -425,10 +418,9 @@ func (b *ElasticSearchBackend) GetEdges(t Context, m ElementMatcher, e ElementMa
}

edges := b.searchEdges(&TimedSearchQuery{
SearchQuery: searchQuery,
TimeFilter: getTimeFilter(t.TimeSlice),
MetadataFilter: metadataFilter,
ElementFilter: elementFilter,
SearchQuery: searchQuery,
TimeFilter: getTimeFilter(t.TimeSlice),
ElementFilter: filter,
})

if t.TimePoint {
Expand All @@ -439,7 +431,7 @@ func (b *ElasticSearchBackend) GetEdges(t Context, m ElementMatcher, e ElementMa
}

// GetNodes returns a list of nodes within time slice, matching metadata
func (b *ElasticSearchBackend) GetNodes(t Context, m ElementMatcher, e ElementMatcher) []*Node {
func (b *ElasticSearchBackend) GetNodes(t Context, m ElementMatcher) []*Node {
var filter *filters.Filter
if m != nil {
f, err := m.Filter()
Expand All @@ -449,25 +441,15 @@ func (b *ElasticSearchBackend) GetNodes(t Context, m ElementMatcher, e ElementMa
filter = f
}

var elementFilter *filters.Filter
if e != nil {
f, err := e.Filter()
if err != nil {
return []*Node{}
}
elementFilter = f
}

var searchQuery filters.SearchQuery
if !t.TimePoint {
searchQuery = filters.SearchQuery{Sort: true, SortBy: "UpdatedAt"}
}

nodes := b.searchNodes(&TimedSearchQuery{
SearchQuery: searchQuery,
TimeFilter: getTimeFilter(t.TimeSlice),
MetadataFilter: filter,
ElementFilter: elementFilter,
SearchQuery: searchQuery,
TimeFilter: getTimeFilter(t.TimeSlice),
ElementFilter: filter,
})

if len(nodes) > 1 && t.TimePoint {
Expand Down Expand Up @@ -512,9 +494,9 @@ func (b *ElasticSearchBackend) GetNodeEdges(n *Node, t Context, m ElementMatcher
searchQuery.Filter = NewFilterForEdge(n.ID, n.ID)

edges = b.searchEdges(&TimedSearchQuery{
SearchQuery: searchQuery,
TimeFilter: getTimeFilter(t.TimeSlice),
MetadataFilter: filter,
SearchQuery: searchQuery,
TimeFilter: getTimeFilter(t.TimeSlice),
ElementFilter: filter,
})

if len(edges) > 1 && t.TimePoint {
Expand Down Expand Up @@ -550,9 +532,9 @@ func (b *ElasticSearchBackend) FlushElements(m ElementMatcher) error {

andFilter := filters.NewAndFilter(
filter,
filters.NewNullFilter("DeletedAt"),
filters.NewNullFilter("@DeletedAt"),
)
query := es.FormatFilter(andFilter, "")
query := es.FormatFilter(andFilter, normalizeKey)

script := elastic.NewScript("ctx._source.DeletedAt = params.now; ctx._source.ArchivedAt = params.now;")
script.Lang("painless")
Expand All @@ -566,7 +548,7 @@ func (b *ElasticSearchBackend) FlushElements(m ElementMatcher) error {
// Sync adds all the nodes and edges with the specified filter into an other graph
func (b *ElasticSearchBackend) Sync(g *Graph, elementFilter *ElementFilter) error {
// re-insert valid nodes and edges
for _, node := range b.GetNodes(Context{}, nil, elementFilter) {
for _, node := range b.GetNodes(Context{}, elementFilter) {
g.NodeAdded(node)

raw, err := nodeToRaw(node)
Expand All @@ -577,7 +559,7 @@ func (b *ElasticSearchBackend) Sync(g *Graph, elementFilter *ElementFilter) erro
b.prevRevision[node.ID] = raw
}

for _, edge := range b.GetEdges(Context{}, nil, elementFilter) {
for _, edge := range b.GetEdges(Context{}, elementFilter) {
g.EdgeAdded(edge)

raw, err := edgeToRaw(edge)
Expand Down
Loading

0 comments on commit c4d81f6

Please sign in to comment.