Skip to content

Commit 8065636

Browse files
feature: add support for rocketMQ (#169)
1 parent 8d55340 commit 8065636

27 files changed

+1503
-0
lines changed

.github/workflows/plugin-tests.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ jobs:
9696
- discard-reporter
9797
- fiber
9898
- echov4
99+
- rocketmq
99100
steps:
100101
- uses: actions/checkout@v2
101102
with:

CHANGES.md

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ Release Notes.
1111
* Support setting a discard type of reporter.
1212
* Add `redis.max_args_bytes` parameter for redis plugin.
1313
* Changing intercept point for gin, make sure interfaces could be grouped when params defined in relativePath.
14+
* Support [RocketMQ](https://github.com/apache/rocketmq-client-go) MQ.
1415

1516
#### Documentation
1617

docs/en/agent/support-plugins.md

+2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ metrics based on the tracing data.
2727
* [MySQL Driver](https://github.com/go-sql-driver/mysql) tested v1.4.0 to v1.7.1.
2828
* Cache Client
2929
* `go-redisv9`: [go-redis](https://github.com/redis/go-redis) tested v9.0.3 to v9.0.5.
30+
* MQ Client
31+
* `rocketMQ`: [rocketmq-client-go](https://github.com/apache/rocketmq-client-go) tested v2.1.2.
3032

3133
# Metrics Plugins
3234
The meter plugin provides the advanced metrics collections.

go.work

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use (
2323
./plugins/fasthttp
2424
./plugins/fiber
2525
./plugins/echov4
26+
./plugins/rocketmq
2627

2728
./test/benchmark-codebase/consumer
2829
./test/benchmark-codebase/provider
@@ -52,6 +53,7 @@ use (
5253
./test/plugins/scenarios/plugin_exclusion
5354
./test/plugins/scenarios/runtime_metrics
5455
./test/plugins/scenarios/echov4
56+
./test/plugins/scenarios/rocketmq
5557

5658
./tools/go-agent
5759

plugins/core/tracing/span.go

+2
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ const (
6060
TagMQQueue = "mq.queue"
6161
TagMQBroker = "mq.broker"
6262
TagMQTopic = "mq.topic"
63+
TagMQStatus = "mq.status"
64+
TagMQMsgID = "mq.msg.id"
6365
TagCacheType = "cache.type"
6466
TagCacheOp = "cache.op"
6567
TagCacheCmd = "cache.cmd"

plugins/rocketmq/consumer/consumer.go

+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Licensed to Apache Software Foundation (ASF) under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Apache Software Foundation (ASF) licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package consumer
19+
20+
import (
21+
"strings"
22+
23+
"github.com/apache/rocketmq-client-go/v2/consumer"
24+
"github.com/apache/rocketmq-client-go/v2/primitive"
25+
26+
"github.com/apache/skywalking-go/plugins/core/operator"
27+
"github.com/apache/skywalking-go/plugins/core/tracing"
28+
)
29+
30+
const (
31+
rmqConsumerComponentID = 39
32+
rmqConsumerPrefix = "RocketMQ/"
33+
rmqConsumerSuffix = "/Consumer"
34+
tagMQMsgID = "mq.msg.id"
35+
tagMQOffsetMsgID = "mq.offset.msg.id"
36+
semicolon = ";"
37+
)
38+
39+
type SwConsumerInterceptor struct {
40+
}
41+
42+
func (c *SwConsumerInterceptor) BeforeInvoke(invocation operator.Invocation) error {
43+
pushConsumer := invocation.CallerInstance().(*nativepushConsumer)
44+
peer := strings.Join(pushConsumer.client.GetNameSrv().AddrList(), semicolon)
45+
subMsgs := invocation.Args()[1].([]*primitive.MessageExt)
46+
if len(subMsgs) == 0 {
47+
return nil
48+
}
49+
topic, addr := subMsgs[0].Topic, subMsgs[0].StoreHost
50+
operationName := rmqConsumerPrefix + topic + rmqConsumerSuffix
51+
52+
var (
53+
span tracing.Span
54+
err error
55+
)
56+
for _, msg := range subMsgs {
57+
span, err = tracing.CreateEntrySpan(operationName, func(headerKey string) (string, error) {
58+
return msg.GetProperty(headerKey), nil
59+
},
60+
tracing.WithLayer(tracing.SpanLayerMQ),
61+
tracing.WithComponent(rmqConsumerComponentID),
62+
tracing.WithTag(tracing.TagMQTopic, topic),
63+
tracing.WithTag(tagMQMsgID, msg.MsgId),
64+
tracing.WithTag(tagMQOffsetMsgID, msg.OffsetMsgId),
65+
)
66+
if err != nil {
67+
return err
68+
}
69+
}
70+
span.Tag(tracing.TagMQBroker, addr)
71+
span.SetPeer(peer)
72+
invocation.SetContext(span)
73+
return nil
74+
}
75+
76+
func (c *SwConsumerInterceptor) AfterInvoke(invocation operator.Invocation, result ...interface{}) error {
77+
if invocation.GetContext() == nil {
78+
return nil
79+
}
80+
span := invocation.GetContext().(tracing.Span)
81+
if err, ok := result[1].(error); ok && err != nil {
82+
span.Error(err.Error())
83+
}
84+
if consumeRet, ok := result[0].(consumer.ConsumeResult); ok {
85+
span.Tag(tracing.TagMQStatus, SwConsumerStatusStr(consumeRet))
86+
if consumer.ConsumeSuccess != consumeRet {
87+
span.Error()
88+
}
89+
}
90+
span.End()
91+
return nil
92+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Licensed to Apache Software Foundation (ASF) under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Apache Software Foundation (ASF) licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package consumer
19+
20+
import (
21+
"fmt"
22+
23+
"github.com/apache/rocketmq-client-go/v2/consumer"
24+
)
25+
26+
func SwConsumerStatusStr(status consumer.ConsumeResult) string {
27+
switch status {
28+
case consumer.ConsumeSuccess:
29+
return "ConsumeSuccess"
30+
case consumer.ConsumeRetryLater:
31+
return "ConsumeRetryLater"
32+
case consumer.Commit:
33+
return "Commit"
34+
case consumer.Rollback:
35+
return "Rollback"
36+
case consumer.SuspendCurrentQueueAMoment:
37+
return "SuspendCurrentQueueAMoment"
38+
default:
39+
return fmt.Sprintf("%d", status)
40+
}
41+
}
+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Licensed to Apache Software Foundation (ASF) under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Apache Software Foundation (ASF) licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package consumer
19+
20+
//skywalking:native github.com/apache/rocketmq-client-go/v2/consumer pushConsumer
21+
type nativepushConsumer struct {
22+
*nativedefaultConsumer
23+
}
24+
25+
//skywalking:native github.com/apache/rocketmq-client-go/v2/consumer defaultConsumer
26+
type nativedefaultConsumer struct {
27+
client nativeRMQClient
28+
}
29+
30+
//skywalking:native github.com/apache/rocketmq-client-go/v2/internal RMQClient
31+
type nativeRMQClient interface {
32+
GetNameSrv() nativeNamesrvs
33+
}
34+
35+
//skywalking:native github.com/apache/rocketmq-client-go/v2/internal Namesrvs
36+
type nativeNamesrvs interface {
37+
FindBrokerAddrByName(brokerName string) string
38+
AddrList() []string
39+
}

plugins/rocketmq/go.mod

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
module github.com/apache/skywalking-go/plugins/rocketmq
2+
3+
go 1.18
4+
5+
require (
6+
github.com/apache/rocketmq-client-go/v2 v2.1.2 // indirect
7+
github.com/emirpasic/gods v1.12.0 // indirect
8+
github.com/golang/mock v1.3.1 // indirect
9+
github.com/google/uuid v1.3.0 // indirect
10+
github.com/json-iterator/go v1.1.12 // indirect
11+
github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect
12+
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
13+
github.com/modern-go/reflect2 v1.0.2 // indirect
14+
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
15+
github.com/pkg/errors v0.8.1 // indirect
16+
github.com/sirupsen/logrus v1.4.0 // indirect
17+
github.com/tidwall/gjson v1.13.0 // indirect
18+
github.com/tidwall/match v1.1.1 // indirect
19+
github.com/tidwall/pretty v1.2.0 // indirect
20+
go.uber.org/atomic v1.5.1 // indirect
21+
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect
22+
golang.org/x/lint v0.0.0-20190930215403-16217165b5de // indirect
23+
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
24+
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e // indirect
25+
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
26+
stathat.com/c/consistent v1.0.0 // indirect
27+
)

0 commit comments

Comments
 (0)