Skip to content

Commit

Permalink
common: moved UnixMillis to flow and graph packages
Browse files Browse the repository at this point in the history
  • Loading branch information
lebauce committed Jul 8, 2020
1 parent 4b654fc commit 26b9ab9
Show file tree
Hide file tree
Showing 19 changed files with 59 additions and 70 deletions.
6 changes: 0 additions & 6 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package common

import (
"errors"
"time"
)

var (
Expand All @@ -45,11 +44,6 @@ const (
SortDescending SortOrder = "DESC"
)

// UnixMillis returns the current time in miliseconds
func UnixMillis(t time.Time) int64 {
return t.UTC().UnixNano() / 1000000
}

// Metric defines a common metric interface
type Metric interface {
// part of the Getter interface
Expand Down
14 changes: 6 additions & 8 deletions flow/ebpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"syscall"
"time"
"unsafe"

"github.com/skydive-project/skydive/common"
)

// #cgo CFLAGS: -I../ebpf
Expand Down Expand Up @@ -58,7 +56,7 @@ func tcpFlagTime(currFlagTime C.__u64, startKTimeNs int64, start time.Time) int6
if currFlagTime == 0 {
return 0
}
return common.UnixMillis(start.Add(time.Duration(int64(currFlagTime) - startKTimeNs)))
return UnixMilli(start.Add(time.Duration(int64(currFlagTime) - startKTimeNs)))
}

func kernLayersPath(kernFlow *C.struct_flow) (string, bool) {
Expand Down Expand Up @@ -114,8 +112,8 @@ func (ft *Table) newFlowFromEBPF(ebpfFlow *EBPFFlow, key uint64) ([]uint64, []*F
var keys []uint64

f := NewFlow()
f.Init(common.UnixMillis(ebpfFlow.Start), "", &ft.uuids)
f.Last = common.UnixMillis(ebpfFlow.Last)
f.Init(UnixMilli(ebpfFlow.Start), "", &ft.uuids)
f.Last = UnixMilli(ebpfFlow.Last)

f.Metric = &FlowMetric{
ABBytes: int64(ebpfFlow.KernFlow.metrics.ab_bytes),
Expand Down Expand Up @@ -194,8 +192,8 @@ func (ft *Table) newFlowFromEBPF(ebpfFlow *EBPFFlow, key uint64) ([]uint64, []*F

// inner layer
f = NewFlow()
f.Init(common.UnixMillis(ebpfFlow.Start), parent.UUID, &ft.uuids)
f.Last = common.UnixMillis(ebpfFlow.Last)
f.Init(UnixMilli(ebpfFlow.Start), parent.UUID, &ft.uuids)
f.Last = UnixMilli(ebpfFlow.Last)
f.LayersPath = innerLayerPath
}

Expand Down Expand Up @@ -319,7 +317,7 @@ func isABPacket(ebpfFlow *EBPFFlow, f *Flow) bool {
}

func (ft *Table) updateFlowFromEBPF(ebpfFlow *EBPFFlow, f *Flow) bool {
last := common.UnixMillis(ebpfFlow.Last)
last := UnixMilli(ebpfFlow.Last)
if last == f.Last {
return false
}
Expand Down
13 changes: 9 additions & 4 deletions flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/pierrec/xxHash/xxHash64"
"github.com/spf13/cast"

"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/config"
fl "github.com/skydive-project/skydive/flow/layers"
"github.com/skydive-project/skydive/graffiti/getter"
Expand Down Expand Up @@ -69,6 +68,12 @@ type flowState struct {
ipv6 *layers.IPv6
}

// UnixMilli returns t as a Unix time, the number of milliseconds elapsed
// since January 1, 1970 UTC.
func UnixMilli(t time.Time) int64 {
return t.UTC().UnixNano() / 1000000
}

// Packet describes one packet
type Packet struct {
GoPacket gopacket.Packet // orignal gopacket
Expand Down Expand Up @@ -583,7 +588,7 @@ func (f *Flow) Init(now int64, parentUUID string, uuids *UUIDs) {

// initFromPacket initializes the flow based on packet data, flow key and ids
func (f *Flow) initFromPacket(key, l2Key, l3Key uint64, packet *Packet, parentUUID string, uuids *UUIDs, opts *Opts) {
now := common.UnixMillis(packet.GoPacket.Metadata().CaptureInfo.Timestamp)
now := UnixMilli(packet.GoPacket.Metadata().CaptureInfo.Timestamp)
f.Init(now, parentUUID, uuids)

f.newLinkLayer(packet)
Expand All @@ -607,7 +612,7 @@ func (f *Flow) initFromPacket(key, l2Key, l3Key uint64, packet *Packet, parentUU

// Update a flow metrics and latency
func (f *Flow) Update(packet *Packet, opts *Opts) {
now := common.UnixMillis(packet.GoPacket.Metadata().CaptureInfo.Timestamp)
now := UnixMilli(packet.GoPacket.Metadata().CaptureInfo.Timestamp)
f.Last = now
f.Metric.Last = now

Expand Down Expand Up @@ -956,7 +961,7 @@ func (f *Flow) updateTCPMetrics(packet *Packet) error {
return nil
}

captureTime := common.UnixMillis(metadata.CaptureInfo.Timestamp)
captureTime := UnixMilli(metadata.CaptureInfo.Timestamp)

switch {
case tcpPacket.SYN:
Expand Down
3 changes: 1 addition & 2 deletions flow/probes/targets/netflow_v5.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/google/gopacket"

"github.com/skydive-project/skydive/api/types"
"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/config"
"github.com/skydive-project/skydive/flow"
"github.com/skydive-project/skydive/graffiti/graph"
Expand Down Expand Up @@ -255,7 +254,7 @@ func NewNetFlowV5Target(g *graph.Graph, n *graph.Node, capture *types.Capture, u
nf := &NetFlowV5Target{
target: capture.Target,
sysBoot: now,
sysBootMs: common.UnixMillis(now),
sysBootMs: flow.UnixMilli(now),
}

nf.table = flow.NewTable(updateEvery, expireAfter, nf, uuids, tableOptsFromCapture(capture))
Expand Down
6 changes: 3 additions & 3 deletions flow/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (ft *Table) expire(expireBefore int64) {
}

func (ft *Table) updateAt(now time.Time) {
updateTime := common.UnixMillis(now)
updateTime := UnixMilli(now)
ft.update(ft.lastUpdate, updateTime)
ft.lastUpdate = updateTime
ft.updateVersion++
Expand Down Expand Up @@ -350,7 +350,7 @@ func (ft *Table) expireNow() {

func (ft *Table) expireAt(now time.Time) {
ft.expire(ft.lastExpire)
ft.lastExpire = common.UnixMillis(now)
ft.lastExpire = UnixMilli(now)
}

func (ft *Table) onQuery(tq *TableQuery) []byte {
Expand Down Expand Up @@ -427,7 +427,7 @@ func (ft *Table) packetToFlow(packet *Packet, parentUUID string) *Flow {
flow.RawPacketsCaptured++
linkType, _ := flow.LinkType()
data := &RawPacket{
Timestamp: common.UnixMillis(packet.GoPacket.Metadata().CaptureInfo.Timestamp),
Timestamp: UnixMilli(packet.GoPacket.Metadata().CaptureInfo.Timestamp),
Index: flow.RawPacketsCaptured,
Data: packet.Data,
LinkType: linkType,
Expand Down
11 changes: 6 additions & 5 deletions flow/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/google/gopacket/layers"

"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/config"
"github.com/skydive-project/skydive/graffiti/filters"
Expand Down Expand Up @@ -144,7 +145,7 @@ func TestUpdate(t *testing.T) {
flow1.XXX_state.updateVersion = table.updateVersion + 1

// check that LastUpdateMetric is filled after a expire before an update
table.expire(common.UnixMillis(time.Now()))
table.expire(UnixMilli(time.Now()))

if flow1.LastUpdateMetric.ABBytes != 1 {
t.Errorf("Flow should have been updated by expire : %+v", flow1)
Expand Down Expand Up @@ -217,11 +218,11 @@ func TestAppSpecificTimeout(t *testing.T) {
flowsTime := time.Now()

arpFlow, _ := table.getOrCreateFlow(123)
arpFlow.Last = common.UnixMillis(flowsTime)
arpFlow.Last = UnixMilli(flowsTime)
arpFlow.Application = "ARP"

dnsFlow, _ := table.getOrCreateFlow(456)
dnsFlow.Last = common.UnixMillis(flowsTime)
dnsFlow.Last = UnixMilli(flowsTime)
dnsFlow.Application = "DNS"

table.updateAt(flowsTime.Add(time.Duration(15) * time.Second))
Expand All @@ -241,7 +242,7 @@ func TestHold(t *testing.T) {
flowTime := time.Now()

flow1, _ := table.getOrCreateFlow(123)
flow1.Last = common.UnixMillis(flowTime)
flow1.Last = UnixMilli(flowTime)
flow1.FinishType = FlowFinishType_TCP_FIN

table.updateAt(flowTime.Add(time.Duration(5) * time.Second))
Expand All @@ -254,7 +255,7 @@ func TestHold(t *testing.T) {
}

flow2, _ := table.getOrCreateFlow(456)
flow2.Last = common.UnixMillis(flowTime)
flow2.Last = UnixMilli(flowTime)
flow2.FinishType = FlowFinishType_TCP_FIN
table.updateAt(flowTime.Add(time.Duration(5) * time.Second))
flow2.FinishType = FlowFinishType_NOT_FINISHED
Expand Down
10 changes: 5 additions & 5 deletions graffiti/graph/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,14 @@ func graphElementToRaw(typ string, e *graphElement) (*rawData, error) {
ID: string(e.ID),
Host: e.Host,
Origin: e.Origin,
CreatedAt: e.CreatedAt.Unix(),
UpdatedAt: e.UpdatedAt.Unix(),
CreatedAt: e.CreatedAt.UnixMilli(),
UpdatedAt: e.UpdatedAt.UnixMilli(),
Metadata: json.RawMessage(data),
Revision: e.Revision,
}

if !e.DeletedAt.IsZero() {
raw.DeletedAt = e.DeletedAt.Unix()
raw.DeletedAt = e.DeletedAt.UnixMilli()
}

return raw, nil
Expand All @@ -162,7 +162,7 @@ func edgeToRaw(e *Edge) (*rawData, error) {
}

func (b *ElasticSearchBackend) archive(raw *rawData, at Time) error {
raw.ArchivedAt = at.Unix()
raw.ArchivedAt = at.UnixMilli()

data, err := json.Marshal(raw)
if err != nil {
Expand Down Expand Up @@ -512,7 +512,7 @@ func (b *ElasticSearchBackend) flushGraph() error {
script := elastic.NewScript("ctx._source.DeletedAt = params.now; ctx._source.ArchivedAt = params.now;")
script.Lang("painless")
script.Params(map[string]interface{}{
"now": TimeUTC().Unix(),
"now": TimeUTC().UnixMilli(),
})

return b.client.UpdateByScript(query, script, b.liveIndex.Alias(), b.archiveIndex.IndexWildcard())
Expand Down
7 changes: 2 additions & 5 deletions graffiti/graph/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
package graph

import (
"time"

"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/graffiti/filters"
"github.com/skydive-project/skydive/graffiti/getter"
)
Expand Down Expand Up @@ -74,8 +71,8 @@ func NewTimeSlice(s, l int64) *TimeSlice {
// startName and endName. time.Now() is used as reference if t == nil
func filterForTimeSlice(t *TimeSlice, startName, endName string) *filters.Filter {
if t == nil {
u := common.UnixMillis(time.Now())
t = NewTimeSlice(u, u)
now := TimeNow().UnixMilli()
t = NewTimeSlice(now, now)
}

return filters.NewAndFilter(
Expand Down
6 changes: 3 additions & 3 deletions graffiti/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,11 @@ func (e *graphElement) GetFieldBool(field string) (_ bool, err error) {
func (e *graphElement) GetFieldInt64(field string) (_ int64, err error) {
switch field {
case "CreatedAt":
return e.CreatedAt.Unix(), nil
return e.CreatedAt.UnixMilli(), nil
case "UpdatedAt":
return e.UpdatedAt.Unix(), nil
return e.UpdatedAt.UnixMilli(), nil
case "DeletedAt":
return e.DeletedAt.Unix(), nil
return e.DeletedAt.UnixMilli(), nil
case "Revision":
return e.Revision, nil
default:
Expand Down
8 changes: 4 additions & 4 deletions graffiti/graph/orientdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func graphElementToOrientDBSetString(e graphElement) (s string) {
fmt.Sprintf("ID = \"%s\"", string(e.ID)),
fmt.Sprintf("Host = \"%s\"", e.Host),
fmt.Sprintf("Origin = \"%s\"", e.Origin),
fmt.Sprintf("CreatedAt = %d", e.CreatedAt.Unix()),
fmt.Sprintf("UpdatedAt = %d", e.UpdatedAt.Unix()),
fmt.Sprintf("CreatedAt = %d", e.CreatedAt.UnixMilli()),
fmt.Sprintf("UpdatedAt = %d", e.UpdatedAt.UnixMilli()),
fmt.Sprintf("Revision = %d", e.Revision),
}
s = strings.Join(properties, ", ")
Expand Down Expand Up @@ -89,7 +89,7 @@ func metadataToOrientDBSelectString(m ElementMatcher) string {
func (o *OrientDBBackend) updateTimes(e string, id string, events ...eventTime) error {
attrs := []string{}
for _, event := range events {
attrs = append(attrs, fmt.Sprintf("%s = %d", event.name, event.t.Unix()))
attrs = append(attrs, fmt.Sprintf("%s = %d", event.name, event.t.UnixMilli()))
}
query := fmt.Sprintf("UPDATE %s SET %s WHERE ID = '%s' AND DeletedAt IS NULL AND ArchivedAt IS NULL", e, strings.Join(attrs, ", "), id)
result, err := o.client.SQL(query)
Expand Down Expand Up @@ -326,7 +326,7 @@ func (o *OrientDBBackend) IsHistorySupported() bool {
func (o *OrientDBBackend) flushGraph() error {
o.logger.Info("Flush graph elements")

now := TimeUTC().Unix()
now := TimeUTC().UnixMilli()

query := fmt.Sprintf("UPDATE Node SET DeletedAt = %d, ArchivedAt = %d WHERE DeletedAt IS NULL", now, now)
if _, err := o.client.SQL(query); err != nil {
Expand Down
15 changes: 9 additions & 6 deletions graffiti/graph/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,22 @@ import (
"encoding/json"
"strconv"
"time"

"github.com/skydive-project/skydive/common"
)

// Time describes time type used in the graph
type Time time.Time

// Unix returns the time in millisecond
func (t Time) Unix() int64 {
return common.UnixMillis(time.Time(t))
// UnixMilli returns the time in milliseconds since January 1, 1970
func (t Time) UnixMilli() int64 {
return time.Time(t).UnixNano() / int64(time.Millisecond)
}

// MarshalJSON custom marshalling function
func (t *Time) MarshalJSON() ([]byte, error) {
if t.IsZero() {
return []byte("null"), nil
}
return json.Marshal(t.Unix())
return json.Marshal(t.UnixMilli())
}

// UnmarshalJSON custom unmarshalling function
Expand All @@ -61,6 +59,11 @@ func (t Time) IsZero() bool {
return time.Time(t).IsZero()
}

// TimeNow creates a Time with now local time
func TimeNow() Time {
return Time(time.Now())
}

// TimeUTC creates a Time with now UTC
func TimeUTC() Time {
return Time(time.Now().UTC())
Expand Down
2 changes: 1 addition & 1 deletion graffiti/graph/traversal/traversal.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func (t *GraphTraversal) Context(s ...interface{}) *GraphTraversal {

g, err := t.Graph.CloneWithContext(graph.Context{
TimePoint: len(s) == 1,
TimeSlice: graph.NewTimeSlice(common.UnixMillis(at.Add(-duration)), common.UnixMillis(at)),
TimeSlice: graph.NewTimeSlice(graph.Time(at.Add(-duration)).UnixMilli(), graph.Time(at).UnixMilli()),
})
if err != nil {
return &GraphTraversal{error: err}
Expand Down
4 changes: 1 addition & 3 deletions gremlin/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"fmt"
"strings"
"time"

"github.com/skydive-project/skydive/common"
)

// QueryString used to construct string representation of query
Expand Down Expand Up @@ -117,7 +115,7 @@ func (q QueryString) Context(list ...interface{}) QueryString {
if t.IsZero() {
return q
}
newQ = newQ.appends(fmt.Sprintf("%d", common.UnixMillis(t)))
newQ = newQ.appends(fmt.Sprintf("%d", t.UnixNano()/int64(time.Millisecond)))
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
newQ = newQ.appends(fmt.Sprintf("%d", t))
default:
Expand Down
3 changes: 1 addition & 2 deletions netflow/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/google/gopacket/layers"
"github.com/safchain/insanelock"

"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/config"
"github.com/skydive-project/skydive/flow"
"github.com/skydive-project/skydive/graffiti/logging"
Expand Down Expand Up @@ -88,7 +87,7 @@ func (nfa *Agent) feedFlowTable(extFlowChan chan *flow.ExtFlow) {
continue
}

bootTime := common.UnixMillis(time.Now()) - int64(msg.Header.SysUpTimeMSecs)
bootTime := flow.UnixMilli(time.Now()) - int64(msg.Header.SysUpTimeMSecs)

LOOP:
for _, nf := range msg.Flows {
Expand Down
Loading

0 comments on commit 26b9ab9

Please sign in to comment.