Skip to content

Commit

Permalink
Merge pull request skydive-project#2380 from skydive-project/libvirt-…
Browse files Browse the repository at this point in the history
…fixes

Libvirt fixes
  • Loading branch information
safchain authored Jul 18, 2021
2 parents f54dfe0 + 4d1abf3 commit 64561f1
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .mk/tests.mk
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
TEST_PATTERN?=
UT_PACKAGES?=$(shell $(GO) list ./... | grep -Ev '/tests|/contrib')
UT_PACKAGES?=$(shell $(GO) list $(SKYDIVE_GITHUB)/... | grep -Ev '/tests|/contrib')
FUNC_TESTS_CMD:="grep -e 'func Test${TEST_PATTERN}' tests/*.go | perl -pe 's|.*func (.*?)\(.*|\1|g' | shuf"
FUNC_TESTS:=$(shell sh -c $(FUNC_TESTS_CMD))
VERBOSE_TESTS_FLAGS?=
Expand Down
5 changes: 2 additions & 3 deletions graffiti/js/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"sync"
"time"

"github.com/pkg/errors"
"github.com/robertkrimen/otto"

"github.com/skydive-project/skydive/graffiti/assets"
Expand Down Expand Up @@ -122,11 +121,11 @@ func (r *Runtime) RunScript(path string) otto.Value {
func (r *Runtime) runEmbededScript(path string) error {
content, err := r.assets.Asset(path)
if err != nil {
return errors.Wrapf(err, "failed to load %s asset: %s)", path)
return fmt.Errorf("failed to load %s asset: %w", path, err)
}

if _, err := r.Run(string(content)); err != nil {
return errors.Wrapf(err, "failed to run %s: %s", path)
return fmt.Errorf("failed to run %s: %w", path, err)
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions graffiti/messages/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ func (m *PartiallyUpdatedRawMsg) Decode(decoders map[string]graph.MetadataDecode
if decoder, found := decoders[op.Key]; found {
value, err = decoder(op.Value)
if err != nil {
return nil, errors.Wrapf(err, "failed to decode partial operation for '%s'", op.Key)
return nil, fmt.Errorf("failed to decode partial operation for '%s': %w", op.Key, err)
}
} else {
if err = json.Unmarshal(op.Value, &value); err != nil {
return nil, errors.Wrapf(err, "failed to decode partial update of key 's'", op.Key)
return nil, fmt.Errorf("failed to decode partial update of key '%s': %w", op.Key, err)
}
}
case graph.PartiallyUpdatedDelOpType:
Expand Down
8 changes: 5 additions & 3 deletions graffiti/websocket/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type fakeClientSubscriptionHandler struct {
connected int
}

func (f *fakeServerSubscriptionHandler) OnConnected(c Speaker) {
func (f *fakeServerSubscriptionHandler) OnConnected(c Speaker) error {
f.Lock()
f.connected++
f.Unlock()
Expand All @@ -62,6 +62,8 @@ func (f *fakeServerSubscriptionHandler) OnConnected(c Speaker) {

return nil
}, retry.Delay(10*time.Millisecond))

return nil
}

func (f *fakeServerSubscriptionHandler) OnMessage(c Speaker, m Message) {
Expand All @@ -70,11 +72,11 @@ func (f *fakeServerSubscriptionHandler) OnMessage(c Speaker, m Message) {
f.Unlock()
}

func (f *fakeClientSubscriptionHandler) OnConnected(c Speaker) {
func (f *fakeClientSubscriptionHandler) OnConnected(c Speaker) error {
f.Lock()
f.connected++
f.Unlock()
c.SendMessage(RawMessage{})
return c.SendMessage(RawMessage{})
}

func (f *fakeClientSubscriptionHandler) OnMessage(c Speaker, m Message) {
Expand Down
15 changes: 10 additions & 5 deletions topology/probes/libvirt/libvirt.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ import (
"strings"
"sync"

libvirt "github.com/digitalocean/go-libvirt"

"github.com/skydive-project/skydive/graffiti/filters"
"github.com/skydive-project/skydive/graffiti/graph"
"github.com/skydive-project/skydive/probe"
"github.com/skydive-project/skydive/topology"
"github.com/skydive-project/skydive/topology/probes"

libvirt "github.com/digitalocean/go-libvirt"
"github.com/skydive-project/skydive/graffiti/graph"
tp "github.com/skydive-project/skydive/topology/probes"
)

Expand Down Expand Up @@ -411,8 +412,12 @@ func (probe *Probe) Do(ctx context.Context, wg *sync.WaitGroup) error {
func NewProbe(ctx tp.Context, bundle *probe.Bundle) (probe.Handler, error) {
uri := ctx.Config.GetString("agent.topology.libvirt.url")
probe := &Probe{
Ctx: ctx,
tunProcessor: graph.NewProcessor(ctx.Graph, ctx.Graph, graph.Metadata{"Type": "tun"}, "Name"),
Ctx: ctx,
tunProcessor: graph.NewProcessor(ctx.Graph, ctx.Graph, graph.NewElementFilter(
filters.NewOrFilter(
filters.NewTermStringFilter("Type", "tun"),
filters.NewTermStringFilter("Type", "tuntap"),
)), "Name"),
interfaceMap: make(map[string]*Interface),
uri: uri,
}
Expand Down
4 changes: 1 addition & 3 deletions topology/probes/libvirt/libvirtgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,10 @@ func newMonitor(ctx context.Context, probe *Probe, wg *sync.WaitGroup) (*Libvirt
return nil, fmt.Errorf("Could not register the device added event handler %s", err)
}

wg.Add(2)
wg.Add(1)

disconnected := make(chan error, 1)
conn.RegisterCloseCallback(func(conn *libvirtgo.Connect, reason libvirtgo.ConnectCloseReason) {
defer wg.Done()

monitor.Stop()
disconnected <- errors.New("disconnected from libvirt")
})
Expand Down

0 comments on commit 64561f1

Please sign in to comment.