Skip to content

Commit 295cba2

Browse files
committed
bugfix: fix broker cannot connect caused by invalid ipv6 format
1 parent 33fe267 commit 295cba2

File tree

3 files changed

+164
-2
lines changed

3 files changed

+164
-2
lines changed

consumer/consumer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -971,6 +971,7 @@ func (dc *defaultConsumer) findConsumerList(topic string) []string {
971971
req := &internal.GetConsumerListRequestHeader{
972972
ConsumerGroup: dc.consumerGroup,
973973
}
974+
brokerAddr = utils.AdaptIPv6(brokerAddr) // adapt IPv6 address to a format that can be used with net.DialContext
974975
cmd := remote.NewRemotingCommand(internal.ReqGetConsumerListByGroup, req, nil)
975976
res, err := dc.client.InvokeSync(context.Background(), brokerAddr, cmd, 3*time.Second) // TODO 超时机制有问题
976977
if err != nil {

internal/utils/net.go

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@ package utils
2020
import (
2121
"bytes"
2222
"fmt"
23-
"github.com/apache/rocketmq-client-go/v2/errors"
2423
"net"
2524
"strconv"
25+
"strings"
2626
"time"
27+
28+
"github.com/apache/rocketmq-client-go/v2/errors"
2729
)
2830

2931
var (
@@ -68,3 +70,42 @@ func FakeIP() []byte {
6870
func GetAddressByBytes(data []byte) string {
6971
return net.IPv4(data[0], data[1], data[2], data[3]).String()
7072
}
73+
74+
75+
// AdaptIPv6 adapts an IPv6 address to a format that can be used with net.DialContext, it will return the original address if the address is not a valid IPv6 address
76+
// addr must contain a valid IPv6 address and a port number
77+
func AdaptIPv6(addr string) string {
78+
var host, port string
79+
var err error
80+
host, port, err = net.SplitHostPort(addr)
81+
if err != nil {
82+
// if the address is not in the format of host:port, return the original address
83+
if addrErr, ok := err.(*net.AddrError); ok && addrErr.Err == "too many colons in address" {
84+
if i := strings.LastIndex(addr, ":"); i > 0 {
85+
host = addr[:i]
86+
port = addr[i+1:]
87+
}
88+
} else {
89+
return addr
90+
}
91+
}
92+
93+
if _, err := strconv.Atoi(port); err != nil {
94+
// if the port is not a valid port, return the original address
95+
return addr
96+
}
97+
98+
if ip := net.ParseIP(host); ip == nil || ip.To4() != nil {
99+
// if the host is not a valid IP address or is an IPv4 address, return the original address
100+
return addr
101+
}
102+
103+
// if the host is in the format of [host]:port, return the original address
104+
if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
105+
return net.JoinHostPort(host[1:len(host)-1], port)
106+
}
107+
108+
109+
return net.JoinHostPort(host, port)
110+
111+
}

internal/utils/net_test.go

Lines changed: 121 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,128 @@ limitations under the License.
1717

1818
package utils
1919

20-
import "testing"
20+
import (
21+
"testing"
22+
)
2123

2224
func TestLocalIP2(t *testing.T) {
2325
t.Log(LocalIP)
2426
}
27+
28+
func TestAdaptIPv6(t *testing.T) {
29+
tests := []struct {
30+
name string
31+
input string
32+
expected string
33+
}{
34+
{
35+
name: "valid IPv6 address with port",
36+
input: "2001:db8::1:8080",
37+
expected: "[2001:db8::1]:8080",
38+
},
39+
{
40+
name: "valid IPv6 address with port - short format",
41+
input: "::1:8080",
42+
expected: "[::1]:8080",
43+
},
44+
{
45+
name: "valid IPv6 address with port - full format",
46+
input: "2001:0db8:85a3:0000:0000:8a2e:0370:7334:8080",
47+
expected: "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:8080",
48+
},
49+
{
50+
name: "already bracketed IPv6 address",
51+
input: "[2001:db8::1]:8080",
52+
expected: "[2001:db8::1]:8080",
53+
},
54+
{
55+
name: "already bracketed IPv6 address - short format",
56+
input: "[::1]:8080",
57+
expected: "[::1]:8080",
58+
},
59+
{
60+
name: "IPv4 address should return original",
61+
input: "192.168.1.1:8080",
62+
expected: "192.168.1.1:8080",
63+
},
64+
{
65+
name: "IPv4 address should return original - localhost",
66+
input: "127.0.0.1:8080",
67+
expected: "127.0.0.1:8080",
68+
},
69+
{
70+
name: "invalid address format - no port",
71+
input: "2001:db8::1",
72+
expected: "2001:db8::1",
73+
},
74+
{
75+
name: "invalid address format - invalid port",
76+
input: "2001:db8::1:invalid",
77+
expected: "2001:db8::1:invalid",
78+
},
79+
{
80+
name: "invalid address format - empty string",
81+
input: "",
82+
expected: "",
83+
},
84+
{
85+
name: "invalid address format - port only",
86+
input: ":8080",
87+
expected: ":8080",
88+
},
89+
{
90+
name: "invalid address format - colon only",
91+
input: ":",
92+
expected: ":",
93+
},
94+
{
95+
name: "invalid address format - multiple colons but no valid IPv6",
96+
input: "invalid:address:8080",
97+
expected: "invalid:address:8080",
98+
},
99+
{
100+
name: "domain name should return original",
101+
input: "localhost:8080",
102+
expected: "localhost:8080",
103+
},
104+
{
105+
name: "domain name should return original - with dots",
106+
input: "example.com:8080",
107+
expected: "example.com:8080",
108+
},
109+
{
110+
name: "IPv6 address with large port number",
111+
input: "2001:db8::1:65535",
112+
expected: "[2001:db8::1]:65535",
113+
},
114+
{
115+
name: "IPv6 address with small port number",
116+
input: "2001:db8::1:1",
117+
expected: "[2001:db8::1]:1",
118+
},
119+
{
120+
name: "IPv6 address with zero port number",
121+
input: "2001:db8::1:0",
122+
expected: "[2001:db8::1]:0",
123+
},
124+
{
125+
name: "complex IPv6 address",
126+
input: "fe80::1%lo0:8080",
127+
expected: "fe80::1%lo0:8080",
128+
},
129+
{
130+
name: "IPv6 address with zone identifier",
131+
input: "fe80::1%eth0:8080",
132+
expected: "fe80::1%eth0:8080",
133+
},
134+
}
135+
136+
for _, tt := range tests {
137+
t.Run(tt.name, func(t *testing.T) {
138+
result := AdaptIPv6(tt.input)
139+
if result != tt.expected {
140+
t.Errorf("AdaptIPv6(%q) = %q, expected %q", tt.input, result, tt.expected)
141+
}
142+
})
143+
}
144+
}

0 commit comments

Comments
 (0)