Skip to content

Commit

Permalink
Backend Plugins: Support handling of streaming resource response (gra…
Browse files Browse the repository at this point in the history
…fana#22580)

Use v0.19.0 of SDK.
Support handling of streaming resource response.
Disable gzip/compression middleware for resources 
to allow chunked/streaming response to clients the gzip
middleware had to be disabled since it buffers the full
response before sending it to the client.

Closes grafana#22569

Co-Authored-By: Arve Knudsen <[email protected]>
  • Loading branch information
marefr and aknuds1 authored Mar 5, 2020
1 parent f95c8b7 commit 4ff613a
Show file tree
Hide file tree
Showing 17 changed files with 517 additions and 240 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/gorilla/websocket v1.4.1
github.com/gosimple/slug v1.4.2
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4
github.com/grafana/grafana-plugin-sdk-go v0.16.0
github.com/grafana/grafana-plugin-sdk-go v0.19.0
github.com/hashicorp/go-hclog v0.8.0
github.com/hashicorp/go-plugin v1.0.1
github.com/hashicorp/go-version v1.1.0
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,8 @@ github.com/gosimple/slug v1.4.2 h1:jDmprx3q/9Lfk4FkGZtvzDQ9Cj9eAmsjzeQGp24PeiQ=
github.com/gosimple/slug v1.4.2/go.mod h1:ER78kgg1Mv0NQGlXiDe57DpCyfbNywXXZ9mIorhxAf0=
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 h1:SPdxCL9BChFTlyi0Khv64vdCW4TMna8+sxL7+Chx+Ag=
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4/go.mod h1:nc0XxBzjeGcrMltCDw269LoWF9S8ibhgxolCdA1R8To=
github.com/grafana/grafana-plugin-sdk-go v0.16.0 h1:fuoLzsQLs0RKcvXDP/cAQxaZGP1rbnoBwUaY/1yvh6k=
github.com/grafana/grafana-plugin-sdk-go v0.16.0/go.mod h1:D1MkO+4EPCWc1Wrr260hq7wbo7Ox0grnNWBygulq7aM=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grafana/grafana-plugin-sdk-go v0.19.0 h1:qLq8tOSxZ9O7+AdduXJVU6jEOlg/2eP8UXdhAzQ81g0=
github.com/grafana/grafana-plugin-sdk-go v0.19.0/go.mod h1:G6Ov9M+FDOZXNw8eKXINO6XzqdUvTs7huwyQp5jLTBQ=
github.com/hashicorp/go-hclog v0.0.0-20180709165350-ff2cf002a8dd/go.mod h1:9bjs9uLqI8l75knNv3lV1kA55veR+WUPSiKIWcQHudI=
github.com/hashicorp/go-hclog v0.8.0 h1:z3ollgGRg8RjfJH6UVBaG54R70GFd++QOkvnJH3VSBY=
github.com/hashicorp/go-hclog v0.8.0/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ func (hs *HTTPServer) CheckHealth(c *models.ReqContext) Response {
}

payload := map[string]interface{}{
"status": resp.Status.String(),
"info": resp.Info,
"status": resp.Status.String(),
"message": resp.Message,
}

if resp.Status != backendplugin.HealthStatusOk {
Expand Down
7 changes: 7 additions & 0 deletions pkg/middleware/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"gopkg.in/macaron.v1"
)

const resourcesPath = "/resources"

func Gziper() macaron.Handler {
gziperLogger := log.New("gziper")
gziper := gzip.Gziper()
Expand All @@ -27,6 +29,11 @@ func Gziper() macaron.Handler {
return
}

// ignore resources
if (strings.HasPrefix(requestPath, "/api/datasources/") || strings.HasPrefix(requestPath, "/api/plugins/")) && strings.Contains(requestPath, resourcesPath) {
return
}

if _, err := ctx.Invoke(gziper); err != nil {
gziperLogger.Error("Invoking gzip handler failed", "err", err)
}
Expand Down
31 changes: 15 additions & 16 deletions pkg/plugins/backendplugin/backend_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (p *BackendPlugin) start(ctx context.Context) error {
if rawBackend != nil {
if plugin, ok := rawBackend.(CorePlugin); ok {
p.core = plugin
client.DatasourcePlugin = plugin
client.CorePlugin = plugin
}
}

Expand Down Expand Up @@ -186,8 +186,8 @@ func (p *BackendPlugin) checkHealth(ctx context.Context) (*pluginv2.CheckHealth_
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unimplemented {
return &pluginv2.CheckHealth_Response{
Status: pluginv2.CheckHealth_Response_UNKNOWN,
Info: "Health check not implemented",
Status: pluginv2.CheckHealth_Response_UNKNOWN,
Message: "Health check not implemented",
}, nil
}
}
Expand All @@ -197,9 +197,13 @@ func (p *BackendPlugin) checkHealth(ctx context.Context) (*pluginv2.CheckHealth_
return res, nil
}

func (p *BackendPlugin) callResource(ctx context.Context, req CallResourceRequest) (*CallResourceResult, error) {
func (p *BackendPlugin) callResource(ctx context.Context, req CallResourceRequest) (callResourceResultStream, error) {
p.logger.Debug("Calling resource", "path", req.Path, "method", req.Method)

if p.core == nil || p.client == nil || p.client.Exited() {
return nil, errors.New("plugin not running, cannot call resource")
}

reqHeaders := map[string]*pluginv2.CallResource_StringList{}
for k, v := range req.Headers {
reqHeaders[k] = &pluginv2.CallResource_StringList{Values: v}
Expand Down Expand Up @@ -238,28 +242,23 @@ func (p *BackendPlugin) callResource(ctx context.Context, req CallResourceReques
}
}

protoResp, err := p.core.CallResource(ctx, protoReq)
protoStream, err := p.core.CallResource(ctx, protoReq)
if err != nil {
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unimplemented {
return &CallResourceResult{
Status: http.StatusNotImplemented,
return &singleCallResourceResult{
result: &CallResourceResult{
Status: http.StatusNotImplemented,
},
}, nil
}
}

return nil, errutil.Wrap("Failed to call resource", err)
}

respHeaders := map[string][]string{}
for key, values := range protoResp.Headers {
respHeaders[key] = values.Values
}

return &CallResourceResult{
Headers: respHeaders,
Body: protoResp.Body,
Status: int(protoResp.Code),
return &callResourceResultStreamImpl{
stream: protoStream,
}, nil
}

Expand Down
14 changes: 4 additions & 10 deletions pkg/plugins/backendplugin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,11 @@ func NewRendererPluginDescriptor(pluginID, executablePath string, startFns Plugi
}

type DiagnosticsPlugin interface {
CollectMetrics(ctx context.Context, req *pluginv2.CollectMetrics_Request) (*pluginv2.CollectMetrics_Response, error)
CheckHealth(ctx context.Context, req *pluginv2.CheckHealth_Request) (*pluginv2.CheckHealth_Response, error)
}

type DatasourcePlugin interface {
DataQuery(ctx context.Context, req *pluginv2.DataQueryRequest) (*pluginv2.DataQueryResponse, error)
plugin.DiagnosticsServer
}

type CorePlugin interface {
CallResource(ctx context.Context, req *pluginv2.CallResource_Request) (*pluginv2.CallResource_Response, error)
DatasourcePlugin
plugin.CoreClient
}

type TransformPlugin interface {
Expand All @@ -127,6 +121,6 @@ type LegacyClient struct {

// Client client for communicating with a plugin using the current plugin protocol.
type Client struct {
DatasourcePlugin DatasourcePlugin
TransformPlugin TransformPlugin
CorePlugin CorePlugin
TransformPlugin TransformPlugin
}
51 changes: 47 additions & 4 deletions pkg/plugins/backendplugin/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func (hs HealthStatus) String() string {

// CheckHealthResult check health result.
type CheckHealthResult struct {
Status HealthStatus
Info string
Status HealthStatus
Message string
}

func checkHealthResultFromProto(protoResp *pluginv2.CheckHealth_Response) *CheckHealthResult {
Expand All @@ -51,8 +51,8 @@ func checkHealthResultFromProto(protoResp *pluginv2.CheckHealth_Response) *Check
}

return &CheckHealthResult{
Status: status,
Info: protoResp.Info,
Status: status,
Message: protoResp.Message,
}
}

Expand Down Expand Up @@ -91,3 +91,46 @@ type CallResourceResult struct {
Headers map[string][]string
Body []byte
}

type callResourceResultStream interface {
Recv() (*CallResourceResult, error)
Close() error
}

type callResourceResultStreamImpl struct {
stream pluginv2.Core_CallResourceClient
}

func (s *callResourceResultStreamImpl) Recv() (*CallResourceResult, error) {
protoResp, err := s.stream.Recv()
if err != nil {
return nil, err
}

respHeaders := map[string][]string{}
for key, values := range protoResp.Headers {
respHeaders[key] = values.Values
}

return &CallResourceResult{
Headers: respHeaders,
Body: protoResp.Body,
Status: int(protoResp.Code),
}, nil
}

func (s *callResourceResultStreamImpl) Close() error {
return s.stream.CloseSend()
}

type singleCallResourceResult struct {
result *CallResourceResult
}

func (s *singleCallResourceResult) Recv() (*CallResourceResult, error) {
return s.result, nil
}

func (s *singleCallResourceResult) Close() error {
return nil
}
58 changes: 44 additions & 14 deletions pkg/plugins/backendplugin/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package backendplugin
import (
"context"
"errors"
"io"
"sync"
"time"

Expand Down Expand Up @@ -209,30 +210,59 @@ func (m *manager) CallResource(config PluginConfig, c *models.ReqContext, path s
Body: body,
}

res, err := p.callResource(clonedReq.Context(), req)
stream, err := p.callResource(clonedReq.Context(), req)
if err != nil {
c.JsonApiErr(500, "Failed to call resource", err)
return
}

// Make sure a content type always is returned in response
if _, exists := res.Headers["Content-Type"]; !exists {
res.Headers["Content-Type"] = []string{"application/json"}
}
processedStreams := 0

for k, values := range res.Headers {
if k == "Set-Cookie" {
continue
for {
resp, err := stream.Recv()
if err == io.EOF {
if processedStreams == 0 {
c.JsonApiErr(500, "Received empty resource response ", nil)
}
return
}
if err != nil {
if processedStreams == 0 {
c.JsonApiErr(500, "Failed to receive response from resource call", err)
} else {
p.logger.Error("Failed to receive response from resource call", "error", err)
}
return
}

// Expected that headers and status are only part of first stream
if processedStreams == 0 {
// Make sure a content type always is returned in response
if _, exists := resp.Headers["Content-Type"]; !exists {
resp.Headers["Content-Type"] = []string{"application/json"}
}

for k, values := range resp.Headers {
// Due to security reasons we don't want to forward
// cookies from a backend plugin to clients/browsers.
if k == "Set-Cookie" {
continue
}

for _, v := range values {
c.Resp.Header().Add(k, v)
}
}

c.WriteHeader(resp.Status)
}

for _, v := range values {
c.Resp.Header().Add(k, v)
if _, err := c.Write(resp.Body); err != nil {
p.logger.Error("Failed to write resource response", "error", err)
}
}

c.WriteHeader(res.Status)
if _, err := c.Write(res.Body); err != nil {
p.logger.Error("Failed to write resource response", "error", err)
c.Resp.Flush()
processedStreams++
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import (
"github.com/grafana/grafana/pkg/tsdb"
)

func NewDatasourcePluginWrapperV2(log log.Logger, pluginId, pluginType string, plugin backendplugin.DatasourcePlugin) *DatasourcePluginWrapperV2 {
return &DatasourcePluginWrapperV2{DatasourcePlugin: plugin, logger: log, pluginId: pluginId, pluginType: pluginType}
func NewDatasourcePluginWrapperV2(log log.Logger, pluginId, pluginType string, plugin backendplugin.CorePlugin) *DatasourcePluginWrapperV2 {
return &DatasourcePluginWrapperV2{CorePlugin: plugin, logger: log, pluginId: pluginId, pluginType: pluginType}
}

type DatasourcePluginWrapperV2 struct {
backendplugin.DatasourcePlugin
backendplugin.CorePlugin
logger log.Logger
pluginId string
pluginType string
Expand Down Expand Up @@ -68,7 +68,7 @@ func (tw *DatasourcePluginWrapperV2) Query(ctx context.Context, ds *models.DataS
})
}

pbRes, err := tw.DatasourcePlugin.DataQuery(ctx, pbQuery)
pbRes, err := tw.CorePlugin.DataQuery(ctx, pbQuery)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/plugins/datasource_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ func (p *DataSourcePlugin) onLegacyPluginStart(pluginID string, client *backendp
}

func (p *DataSourcePlugin) onPluginStart(pluginID string, client *backendplugin.Client, logger log.Logger) error {
if client.DatasourcePlugin != nil {
if client.CorePlugin != nil {
tsdb.RegisterTsdbQueryEndpoint(pluginID, func(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
return wrapper.NewDatasourcePluginWrapperV2(logger, p.Id, p.Type, client.DatasourcePlugin), nil
return wrapper.NewDatasourcePluginWrapperV2(logger, p.Id, p.Type, client.CorePlugin), nil
})
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/plugins/transform_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ func (p *TransformPlugin) Load(decoder *json.Decoder, pluginDir string, backendP
func (p *TransformPlugin) onPluginStart(pluginID string, client *backendplugin.Client, logger log.Logger) error {
p.TransformWrapper = NewTransformWrapper(logger, client.TransformPlugin)

if client.DatasourcePlugin != nil {
if client.CorePlugin != nil {
tsdb.RegisterTsdbQueryEndpoint(pluginID, func(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
return wrapper.NewDatasourcePluginWrapperV2(logger, p.Id, p.Type, client.DatasourcePlugin), nil
return wrapper.NewDatasourcePluginWrapperV2(logger, p.Id, p.Type, client.CorePlugin), nil
})
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4ff613a

Please sign in to comment.