forked from dapr/components-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'master' into feat_vault_secret_store_fix
* master: (23 commits) Add bindings dingtalk webhook (dapr#817) add BulkGetResponse comments (dapr#845) Fix race in pulsar pubsub component (dapr#868) Add pubsub rocketmq (dapr#816) Add bindings rocketmq (dapr#815) Add bindings nacos (dapr#814) Sentinel middleware support (dapr#829) Added vaultToken support (dapr#796) Update Aerospike to v4.5.0 Add function to generate CloudEvent for raw subscribed events. (dapr#864) Configurable retry (dapr#854) Update redis metadata comments to reflect the new maxRetry default from v8 (dapr#856) fix calc ipv6num use ipv4 slice (dapr#838) Do not block indefinitely on XGroupRead for redis subscriber (dapr#855) fix go mod fix lint fix add add a ctx context variable in the Redis struct. And implement `Close() error` function. fix add add a ctx context variable in the Redis struct. And implemet `Close() error` function fix use r.ctx instead of context.Background() in pubsub redis component Fixing go.mod ... # Conflicts: # secretstores/hashicorp/vault/vault.go # secretstores/hashicorp/vault/vault_test.go
- Loading branch information
Showing
55 changed files
with
3,638 additions
and
182 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
// ------------------------------------------------------------ | ||
// Copyright (c) Microsoft Corporation and Dapr Contributors. | ||
// Licensed under the MIT License. | ||
// ------------------------------------------------------------ | ||
|
||
// DingTalk webhook are a simple way to post messages from apps into DingTalk | ||
// | ||
// See https://developers.dingtalk.com/document/app/custom-robot-access for details | ||
|
||
package webhook | ||
|
||
import ( | ||
"errors" | ||
|
||
"github.com/dapr/components-contrib/internal/config" | ||
) | ||
|
||
type Settings struct { | ||
ID string `mapstructure:"id"` | ||
URL string `mapstructure:"url"` | ||
Secret string `mapstructure:"secret"` | ||
} | ||
|
||
func (s *Settings) Decode(in interface{}) error { | ||
return config.Decode(in, s) | ||
} | ||
|
||
func (s *Settings) Validate() error { | ||
if s.ID == "" { | ||
return errors.New("webhook error: missing webhook id") | ||
} | ||
if s.URL == "" { | ||
return errors.New("webhook error: missing webhook url") | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
// ------------------------------------------------------------ | ||
// Copyright (c) Microsoft Corporation and Dapr Contributors. | ||
// Licensed under the MIT License. | ||
// ------------------------------------------------------------ | ||
|
||
package webhook | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestSettingsDecode(t *testing.T) { //nolint:paralleltest | ||
props := map[string]string{ | ||
"url": "a", | ||
"secret": "b", | ||
"id": "c", | ||
} | ||
|
||
var settings Settings | ||
err := settings.Decode(props) | ||
require.NoError(t, err) | ||
assert.Equal(t, "a", settings.URL) | ||
assert.Equal(t, "b", settings.Secret) | ||
assert.Equal(t, "c", settings.ID) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,222 @@ | ||
// ------------------------------------------------------------ | ||
// Copyright (c) Microsoft Corporation and Dapr Contributors. | ||
// Licensed under the MIT License. | ||
// ------------------------------------------------------------ | ||
|
||
package webhook | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"crypto/hmac" | ||
"crypto/sha256" | ||
"encoding/base64" | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"io/ioutil" | ||
"net" | ||
"net/http" | ||
"net/url" | ||
"strconv" | ||
"sync" | ||
"time" | ||
|
||
"github.com/dapr/components-contrib/bindings" | ||
"github.com/dapr/kit/logger" | ||
) | ||
|
||
const ( | ||
webhookContentType = "application/json" | ||
defaultHTTPClientTimeout = time.Second * 30 | ||
) | ||
|
||
type DingTalkWebhook struct { | ||
logger logger.Logger | ||
settings Settings | ||
httpClient *http.Client | ||
} | ||
|
||
type webhookResult struct { | ||
ErrCode int `json:"errcode"` | ||
ErrMsg string `json:"errmsg"` | ||
} | ||
|
||
type outgoingWebhook struct { | ||
handler func(*bindings.ReadResponse) ([]byte, error) | ||
} | ||
|
||
var webhooks = struct { // nolint: gochecknoglobals | ||
sync.RWMutex | ||
m map[string]*outgoingWebhook | ||
}{m: make(map[string]*outgoingWebhook)} | ||
|
||
func NewDingTalkWebhook(l logger.Logger) *DingTalkWebhook { | ||
// See guidance on proper HTTP client settings here: | ||
// https://medium.com/@nate510/don-t-use-go-s-default-http-client-4804cb19f779 | ||
dialer := &net.Dialer{ //nolint:exhaustivestruct | ||
Timeout: 5 * time.Second, | ||
} | ||
var netTransport = &http.Transport{ //nolint:exhaustivestruct | ||
DialContext: dialer.DialContext, | ||
TLSHandshakeTimeout: 5 * time.Second, | ||
} | ||
httpClient := &http.Client{ //nolint:exhaustivestruct | ||
Timeout: defaultHTTPClientTimeout, | ||
Transport: netTransport, | ||
} | ||
|
||
return &DingTalkWebhook{ //nolint:exhaustivestruct | ||
logger: l, | ||
httpClient: httpClient, | ||
} | ||
} | ||
|
||
// Init performs metadata parsing | ||
func (t *DingTalkWebhook) Init(metadata bindings.Metadata) error { | ||
var err error | ||
if err = t.settings.Decode(metadata.Properties); err != nil { | ||
return fmt.Errorf("dingtalk configuration error: %w", err) | ||
} | ||
if err = t.settings.Validate(); err != nil { | ||
return fmt.Errorf("dingtalk configuration error: %w", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Read triggers the outgoing webhook, not yet production ready | ||
func (t *DingTalkWebhook) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error { | ||
t.logger.Debugf("dingtalk webhook: start read input binding") | ||
|
||
webhooks.Lock() | ||
defer webhooks.Unlock() | ||
_, loaded := webhooks.m[t.settings.ID] | ||
if loaded { | ||
return fmt.Errorf("dingtalk webhook error: duplicate id %s", t.settings.ID) | ||
} | ||
webhooks.m[t.settings.ID] = &outgoingWebhook{handler: handler} | ||
|
||
return nil | ||
} | ||
|
||
// Operations returns list of operations supported by dingtalk webhook binding | ||
func (t *DingTalkWebhook) Operations() []bindings.OperationKind { | ||
return []bindings.OperationKind{bindings.CreateOperation, bindings.GetOperation} | ||
} | ||
|
||
func (t *DingTalkWebhook) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { | ||
rst := &bindings.InvokeResponse{Metadata: map[string]string{}, Data: nil} | ||
switch req.Operation { | ||
case bindings.CreateOperation: | ||
return rst, t.sendMessage(req) | ||
case bindings.GetOperation: | ||
return rst, t.receivedMessage(req) | ||
case bindings.DeleteOperation, bindings.ListOperation: | ||
return rst, fmt.Errorf("dingtalk webhook error: unsupported operation %s", req.Operation) | ||
default: | ||
return rst, fmt.Errorf("dingtalk webhook error: unsupported operation %s", req.Operation) | ||
} | ||
} | ||
|
||
func (t *DingTalkWebhook) getOutgoingWebhook() (*outgoingWebhook, error) { | ||
webhooks.RLock() | ||
defer webhooks.RUnlock() | ||
item, loaded := webhooks.m[t.settings.ID] | ||
if !loaded { | ||
return nil, fmt.Errorf("dingtalk webhook error: invalid component metadata.id %s", t.settings.ID) | ||
} | ||
|
||
return item, nil | ||
} | ||
|
||
func (t *DingTalkWebhook) receivedMessage(req *bindings.InvokeRequest) error { | ||
item, err := t.getOutgoingWebhook() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
in := &bindings.ReadResponse{Data: req.Data, Metadata: req.Metadata} | ||
if _, err = item.handler(in); err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (t *DingTalkWebhook) sendMessage(req *bindings.InvokeRequest) error { | ||
msg := req.Data | ||
|
||
postURL, err := getPostURL(t.settings.URL, t.settings.Secret) | ||
if err != nil { | ||
return fmt.Errorf("dingtalk webhook error: get url failed. %w", err) | ||
} | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), defaultHTTPClientTimeout) | ||
defer cancel() | ||
|
||
httpReq, err := http.NewRequestWithContext(ctx, "POST", postURL, bytes.NewReader(msg)) | ||
if err != nil { | ||
return fmt.Errorf("dingtalk webhook error: new request failed. %w", err) | ||
} | ||
|
||
httpReq.Header.Add("Accept", webhookContentType) | ||
httpReq.Header.Add("Content-Type", webhookContentType) | ||
|
||
resp, err := t.httpClient.Do(httpReq) | ||
if err != nil { | ||
return fmt.Errorf("dingtalk webhook error: post failed. %w", err) | ||
} | ||
defer func() { | ||
_ = resp.Body.Close() | ||
}() | ||
|
||
if resp.StatusCode != http.StatusOK { | ||
return fmt.Errorf("dingtalk webhook error: post failed. status:%d", resp.StatusCode) | ||
} | ||
|
||
data, err := ioutil.ReadAll(resp.Body) | ||
if err != nil { | ||
return fmt.Errorf("dingtalk webhook error: read body failed. %w", err) | ||
} | ||
|
||
var rst webhookResult | ||
err = json.Unmarshal(data, &rst) | ||
if err != nil { | ||
return fmt.Errorf("dingtalk webhook error: unmarshal body failed. %w", err) | ||
} | ||
|
||
if rst.ErrCode != 0 { | ||
return fmt.Errorf("dingtalk webhook error: send msg failed. %v", rst.ErrMsg) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func getPostURL(urlPath, secret string) (string, error) { | ||
if secret == "" { | ||
return urlPath, nil | ||
} | ||
|
||
timestamp := strconv.FormatInt(time.Now().Unix()*1000, 10) | ||
sign, err := sign(secret, timestamp) | ||
if err != nil { | ||
return urlPath, err | ||
} | ||
|
||
query := url.Values{} | ||
query.Set("timestamp", timestamp) | ||
query.Set("sign", sign) | ||
|
||
return urlPath + "&" + query.Encode(), nil | ||
} | ||
|
||
func sign(secret, timestamp string) (string, error) { | ||
stringToSign := fmt.Sprintf("%s\n%s", timestamp, secret) | ||
h := hmac.New(sha256.New, []byte(secret)) | ||
if _, err := io.WriteString(h, stringToSign); err != nil { | ||
return "", fmt.Errorf("sign failed. %w", err) | ||
} | ||
|
||
return base64.StdEncoding.EncodeToString(h.Sum(nil)), nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
// ------------------------------------------------------------ | ||
// Copyright (c) Microsoft Corporation and Dapr Contributors. | ||
// Licensed under the MIT License. | ||
// ------------------------------------------------------------ | ||
|
||
package webhook | ||
|
||
import ( | ||
"io/ioutil" | ||
"net/http" | ||
"net/http/httptest" | ||
"sync/atomic" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/dapr/components-contrib/bindings" | ||
"github.com/dapr/kit/logger" | ||
) | ||
|
||
func TestPublishMsg(t *testing.T) { //nolint:paralleltest | ||
msg := "{\"type\": \"text\",\"text\": {\"content\": \"hello\"}}" | ||
|
||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
w.WriteHeader(http.StatusOK) | ||
_, err := w.Write([]byte("{\"errcode\":0}")) | ||
require.NoError(t, err) | ||
if r.Method != "POST" { | ||
t.Errorf("Expected 'POST' request, got '%s'", r.Method) | ||
} | ||
if r.URL.EscapedPath() != "/test" { | ||
t.Errorf("Expected request to '/test', got '%s'", r.URL.EscapedPath()) | ||
} | ||
|
||
body, err := ioutil.ReadAll(r.Body) | ||
require.Nil(t, err) | ||
assert.Equal(t, msg, string(body)) | ||
})) | ||
defer ts.Close() | ||
|
||
m := bindings.Metadata{Name: "test", Properties: map[string]string{ | ||
"url": ts.URL + "/test", | ||
"secret": "", | ||
"id": "x", | ||
}} | ||
|
||
d := NewDingTalkWebhook(logger.NewLogger("test")) | ||
err := d.Init(m) | ||
require.NoError(t, err) | ||
|
||
req := &bindings.InvokeRequest{Data: []byte(msg), Operation: bindings.CreateOperation, Metadata: map[string]string{}} | ||
_, err = d.Invoke(req) | ||
require.NoError(t, err) | ||
} | ||
|
||
func TestBindingReadAndInvoke(t *testing.T) { //nolint:paralleltest | ||
msg := "{\"type\": \"text\",\"text\": {\"content\": \"hello\"}}" | ||
|
||
m := bindings.Metadata{Name: "test", | ||
Properties: map[string]string{ | ||
"url": "/test", | ||
"secret": "", | ||
"id": "x", | ||
}} | ||
|
||
d := NewDingTalkWebhook(logger.NewLogger("test")) | ||
err := d.Init(m) | ||
assert.NoError(t, err) | ||
|
||
var count int32 | ||
ch := make(chan bool, 1) | ||
|
||
handler := func(in *bindings.ReadResponse) ([]byte, error) { | ||
assert.Equal(t, msg, string(in.Data)) | ||
atomic.AddInt32(&count, 1) | ||
ch <- true | ||
|
||
return nil, nil | ||
} | ||
|
||
err = d.Read(handler) | ||
require.NoError(t, err) | ||
|
||
req := &bindings.InvokeRequest{Data: []byte(msg), Operation: bindings.GetOperation, Metadata: map[string]string{}} | ||
_, err = d.Invoke(req) | ||
require.NoError(t, err) | ||
|
||
select { | ||
case <-ch: | ||
require.True(t, atomic.LoadInt32(&count) > 0) | ||
case <-time.After(time.Second): | ||
require.FailNow(t, "read timeout") | ||
} | ||
} |
Oops, something went wrong.