Skip to content
This repository was archived by the owner on Dec 9, 2024. It is now read-only.

Commit 80dce95

Browse files
authored
Refactor Reacter plugin (#509)
1 parent 1274451 commit 80dce95

File tree

13 files changed

+180
-200
lines changed

13 files changed

+180
-200
lines changed

cmd/event-gateway/main.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,7 @@ func main() {
9797
}
9898

9999
// Plugin manager
100-
pluginManager := plugin.NewManager(plugins, log)
101-
err = pluginManager.Connect()
100+
pluginManager, err := plugin.NewManager(plugins, log)
102101
if err != nil {
103102
log.Fatal("Loading plugins failed.", zap.Error(err))
104103
}

docs/system-events-and-plugin-system.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,4 @@ further processed by the Event Gateway.
4747

4848
`React` method is called for every system event that plugin subscribed to.
4949

50-
For more details, see [the example plugin](../plugin/example).
50+
For more details, see [the example plugin](../plugin/example/reacter).

plugin/example/main.go

Lines changed: 0 additions & 23 deletions
This file was deleted.

plugin/example/reacter/main.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package main
2+
3+
import (
4+
goplugin "github.com/hashicorp/go-plugin"
5+
"github.com/serverless/event-gateway/plugin"
6+
"github.com/serverless/event-gateway/plugin/shared"
7+
)
8+
9+
func main() {
10+
pluginMap := map[string]goplugin.Plugin{
11+
"reacter": &plugin.ReacterRPCPlugin{Reacter: &Simple{}},
12+
}
13+
14+
goplugin.Serve(&goplugin.ServeConfig{
15+
HandshakeConfig: shared.Handshake,
16+
Plugins: pluginMap,
17+
})
18+
}

plugin/example/plugin.go renamed to plugin/example/reacter/plugin.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func (s *Simple) React(instance event.Event) error {
3232
switch instance.EventType {
3333
case event.SystemEventReceivedType:
3434
received := instance.Data.(event.SystemEventReceivedData)
35-
log.Printf("received gateway.received.event for event: %q", received.Event.EventType)
35+
log.Printf("received %s for event: %q", event.SystemEventReceivedType, received.Event.EventType)
3636
}
3737

3838
return nil

plugin/manager.go

Lines changed: 42 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
goplugin "github.com/hashicorp/go-plugin"
1010
"github.com/serverless/event-gateway/event"
11+
"github.com/serverless/event-gateway/plugin/shared"
1112
)
1213

1314
func init() {
@@ -18,84 +19,85 @@ func init() {
1819
gob.Register(event.SystemFunctionInvocationFailedData{})
1920
}
2021

22+
// Type of a subscription.
23+
type Type int
24+
25+
const (
26+
// Async subscription type. Plugin host will not block on it.
27+
Async Type = iota
28+
// Sync subscription type. Plugin host will use the response from the plugin before proceeding.
29+
Sync
30+
)
31+
2132
// Plugin is a generic struct for storing info about a plugin.
2233
type Plugin struct {
23-
Path string
24-
Client *goplugin.Client
25-
Reacter Reacter
26-
Subscriptions []Subscription
34+
Path string
35+
Reacter Reacter
2736
}
2837

2938
// Manager handles lifecycle of plugin management.
3039
type Manager struct {
31-
Plugins []*Plugin
32-
Log *zap.Logger
40+
Reacters []*Plugin
41+
Log *zap.Logger
3342
}
3443

3544
// NewManager creates new Manager.
36-
func NewManager(paths []string, log *zap.Logger) *Manager {
37-
plugins := []*Plugin{}
45+
func NewManager(paths []string, log *zap.Logger) (*Manager, error) {
46+
reacters := []*Plugin{}
3847
logger := Hclog2ZapLogger{Zap: log}
48+
3949
for _, path := range paths {
4050
client := goplugin.NewClient(&goplugin.ClientConfig{
41-
HandshakeConfig: handshakeConfig,
51+
HandshakeConfig: shared.Handshake,
4252
Plugins: pluginMap,
4353
Cmd: exec.Command(path),
4454
Logger: logger.Named("PluginManager"),
55+
Managed: true,
4556
})
4657

47-
plugins = append(plugins, &Plugin{
48-
Client: client,
49-
Path: path,
50-
})
51-
}
52-
53-
return &Manager{
54-
Plugins: plugins,
55-
Log: log,
56-
}
57-
}
58-
59-
// Connect connects to plugins.
60-
func (m *Manager) Connect() error {
61-
for _, plugin := range m.Plugins {
62-
rpcClient, err := plugin.Client.Client()
58+
rpcClient, err := client.Client()
6359
if err != nil {
64-
return err
60+
return nil, err
6561
}
6662

6763
// Request the plugin
68-
raw, err := rpcClient.Dispense("subscriber")
64+
raw, err := rpcClient.Dispense("reacter")
6965
if err != nil {
70-
return err
66+
return nil, err
7167
}
7268

73-
plugin.Reacter = raw.(*Subscriber)
74-
plugin.Subscriptions = plugin.Reacter.Subscriptions()
69+
switch raw.(type) {
70+
case *ReacterClient:
71+
reacters = append(reacters, &Plugin{
72+
Path: path,
73+
Reacter: raw.(*ReacterClient),
74+
})
75+
}
7576
}
7677

77-
return nil
78+
return &Manager{
79+
Reacters: reacters,
80+
Log: log,
81+
}, nil
7882
}
7983

8084
// Kill disconnects plugins and kill subprocesses.
8185
func (m *Manager) Kill() {
82-
for _, plugin := range m.Plugins {
83-
plugin.Client.Kill()
84-
}
86+
goplugin.CleanupClients()
8587
}
8688

8789
// React call all plugins' React method. It returns when the first error is returned by a plugin.
8890
func (m *Manager) React(event *event.Event) error {
89-
for _, plugin := range m.Plugins {
90-
for _, subscription := range plugin.Subscriptions {
91-
if subscription.EventType == event.EventType {
91+
for _, plugin := range m.Reacters {
92+
for _, sub := range plugin.Reacter.Subscriptions() {
93+
if sub.EventType == event.EventType {
9294
err := plugin.Reacter.React(*event)
9395
if err != nil {
9496
m.Log.Debug("Plugin returned error.",
9597
zap.String("plugin", plugin.Path),
9698
zap.Error(err),
97-
zap.String("subscriptionType", string(subscription.Type)))
98-
if subscription.Type == Sync {
99+
zap.String("subscriptionType", string(sub.Type)))
100+
if sub.Type == Sync {
99101
return err
100102
}
101103
}
@@ -106,19 +108,7 @@ func (m *Manager) React(event *event.Event) error {
106108
return nil
107109
}
108110

109-
// handshakeConfig is used to just do a basic handshake between a plugin and host. If the handshake fails, a user
110-
// friendly error is shown. This prevents users from executing bad plugins or executing a plugin directory. It is a UX
111-
// feature, not a security feature.
112-
var handshakeConfig = goplugin.HandshakeConfig{
113-
// The ProtocolVersion is the version that must match between EG core and EG plugins. This should be bumped whenever
114-
// a change happens in one or the other that makes it so that they can't safely communicate.
115-
ProtocolVersion: 1,
116-
// The magic cookie values should NEVER be changed.
117-
MagicCookieKey: "EVENT_GATEWAY_MAGIC_COOKIE",
118-
MagicCookieValue: "0329c93c-a64c-4eb5-bf72-63172430d433",
119-
}
120-
121111
// pluginMap is the map of plugins we can dispense.
122112
var pluginMap = map[string]goplugin.Plugin{
123-
"subscriber": &SubscriberPlugin{},
113+
"reacter": &ReacterRPCPlugin{},
124114
}

plugin/plugin.go

Lines changed: 0 additions & 22 deletions
This file was deleted.

plugin/reacter.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package plugin
2+
3+
import (
4+
"net/rpc"
5+
6+
goplugin "github.com/hashicorp/go-plugin"
7+
eventpkg "github.com/serverless/event-gateway/event"
8+
)
9+
10+
// Subscription use by plugin to indicate which event it wants to react to.
11+
type Subscription struct {
12+
EventType eventpkg.TypeName
13+
Type
14+
}
15+
16+
// Reacter allows reacting on subscribed event types.
17+
type Reacter interface {
18+
Subscriptions() []Subscription
19+
React(event eventpkg.Event) error
20+
}
21+
22+
// ReacterRPCPlugin is the go-plugin's Plugin implementation.
23+
type ReacterRPCPlugin struct {
24+
Reacter Reacter
25+
}
26+
27+
// Server hosts ReacterServer.
28+
func (r *ReacterRPCPlugin) Server(*goplugin.MuxBroker) (interface{}, error) {
29+
return &ReacterServer{Reacter: r.Reacter}, nil
30+
}
31+
32+
// Client provides ReacterClient client.
33+
func (r *ReacterRPCPlugin) Client(b *goplugin.MuxBroker, c *rpc.Client) (interface{}, error) {
34+
return &ReacterClient{client: c}, nil
35+
}
36+
37+
// ReacterServer is a net/rpc compatibile structure for serving a Reacter.
38+
type ReacterServer struct {
39+
Reacter Reacter
40+
}
41+
42+
// Subscriptions server implementation.
43+
func (r *ReacterServer) Subscriptions(_ interface{}, resp *ReacterSubscriptionsResponse) error {
44+
*resp = ReacterSubscriptionsResponse{Subscriptions: r.Reacter.Subscriptions()}
45+
return nil
46+
}
47+
48+
// React server implementation.
49+
func (r *ReacterServer) React(args *ReacterReactArgs, resp *ReacterReactResponse) error {
50+
err := r.Reacter.React(args.Event)
51+
52+
*resp = ReacterReactResponse{Error: goplugin.NewBasicError(err)}
53+
return nil
54+
}
55+
56+
// ReacterClient is a RPC implementation of Reacter.
57+
type ReacterClient struct {
58+
client *rpc.Client
59+
}
60+
61+
// Subscriptions call plugin implementation.
62+
func (r *ReacterClient) Subscriptions() []Subscription {
63+
var resp ReacterSubscriptionsResponse
64+
err := r.client.Call("Plugin.Subscriptions", new(interface{}), &resp)
65+
if err != nil {
66+
return []Subscription{}
67+
}
68+
69+
return resp.Subscriptions
70+
}
71+
72+
// React calls plugin implementation.
73+
func (r *ReacterClient) React(event eventpkg.Event) error {
74+
args := &ReacterReactArgs{Event: event}
75+
var resp ReacterReactResponse
76+
err := r.client.Call("Plugin.React", args, &resp)
77+
if err != nil {
78+
return err
79+
}
80+
if resp.Error != nil {
81+
err = resp.Error
82+
}
83+
return err
84+
}
85+
86+
// ReacterSubscriptionsResponse RPC response
87+
type ReacterSubscriptionsResponse struct {
88+
Subscriptions []Subscription
89+
}
90+
91+
// ReacterReactArgs RPC args
92+
type ReacterReactArgs struct {
93+
Event eventpkg.Event
94+
}
95+
96+
// ReacterReactResponse RPC response
97+
type ReacterReactResponse struct {
98+
Error *goplugin.BasicError
99+
}

plugin/shared/handshake.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
// Package shared contains shared data between the host and plugins.
2+
package shared
3+
4+
import "github.com/hashicorp/go-plugin"
5+
6+
// Handshake is a common handshake that is shared by plugin and host.
7+
var Handshake = plugin.HandshakeConfig{
8+
// The ProtocolVersion is the version that must match between EG core and EG plugins. This should be bumped whenever
9+
// a change happens in one or the other that makes it so that they can't safely communicate.
10+
ProtocolVersion: 1,
11+
// The magic cookie values should NEVER be changed.
12+
MagicCookieKey: "EVENT_GATEWAY_MAGIC_COOKIE",
13+
MagicCookieValue: "0329c93c-a64c-4eb5-bf72-63172430d433",
14+
}

0 commit comments

Comments
 (0)