Skip to content

Commit f2fb16d

Browse files
woaishixiaoxiaoshixiaoxiao
andauthored
unmarshalMsgID support ipv6 (#1203)
Co-authored-by: shixiaoxiao <[email protected]>
1 parent 3010ce9 commit f2fb16d

File tree

3 files changed

+54
-8
lines changed

3 files changed

+54
-8
lines changed

internal/utils/net.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@ package utils
2020
import (
2121
"bytes"
2222
"fmt"
23-
"github.com/apache/rocketmq-client-go/v2/errors"
2423
"net"
2524
"strconv"
2625
"time"
26+
27+
"github.com/apache/rocketmq-client-go/v2/errors"
2728
)
2829

2930
var (
@@ -66,5 +67,5 @@ func FakeIP() []byte {
6667
}
6768

6869
func GetAddressByBytes(data []byte) string {
69-
return net.IPv4(data[0], data[1], data[2], data[3]).String()
70+
return net.IP(data).String()
7071
}

primitive/message.go

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -482,19 +482,46 @@ func UnmarshalMsgID(id []byte) (*MessageID, error) {
482482
if len(id) < 32 {
483483
return nil, fmt.Errorf("%s len < 32", string(id))
484484
}
485+
485486
var (
486487
ipBytes = make([]byte, 4)
487488
portBytes = make([]byte, 4)
488489
offsetBytes = make([]byte, 8)
489490
)
490-
hex.Decode(ipBytes, id[0:8])
491-
hex.Decode(portBytes, id[8:16])
492-
hex.Decode(offsetBytes, id[16:32])
491+
if len(id) == 32 {
492+
hex.Decode(ipBytes, id[0:8])
493+
hex.Decode(portBytes, id[8:16])
494+
hex.Decode(offsetBytes, id[16:32])
495+
} else {
496+
ipBytes = make([]byte, 16)
497+
portBytes = make([]byte, 4)
498+
offsetBytes = make([]byte, 8)
499+
hex.Decode(ipBytes, id[0:32])
500+
hex.Decode(portBytes, id[32:40])
501+
hex.Decode(offsetBytes, id[40:56])
502+
}
503+
504+
addr := utils.GetAddressByBytes(ipBytes)
505+
port := int(binary.BigEndian.Uint32(portBytes))
506+
offset := int64(binary.BigEndian.Uint64(offsetBytes))
507+
508+
if addr == "" {
509+
return nil, fmt.Errorf("addr is empty")
510+
}
511+
512+
if port < 0 || port > 65535 {
513+
return nil, fmt.Errorf("port > 65535, acutal port is %d", port)
514+
}
515+
516+
if len(id) != 32 {
517+
// DialContext require ipv6 format: [ipv6]:port
518+
addr = fmt.Sprintf("[%s]", addr)
519+
}
493520

494521
return &MessageID{
495-
Addr: utils.GetAddressByBytes(ipBytes),
496-
Port: int(binary.BigEndian.Uint32(portBytes)),
497-
Offset: int64(binary.BigEndian.Uint64(offsetBytes)),
522+
Addr: addr,
523+
Port: port,
524+
Offset: offset,
498525
}, nil
499526
}
500527

primitive/message_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,24 @@ func TestMessageID(t *testing.T) {
3737
t.Log(msgID)
3838
}
3939

40+
func TestIpv6MessageID(t *testing.T) {
41+
id := []byte("FDBDDC4100010136C800000000000024000078BF0000000000004F45")
42+
msgID, err := UnmarshalMsgID(id)
43+
if err != nil {
44+
t.Fatalf("unmarshal msg id error, ms is: %s", err.Error())
45+
}
46+
if msgID.Addr != "[fdbd:dc41:1:136:c800::24]" {
47+
t.Fatalf("parse messageID %s error", id)
48+
}
49+
if msgID.Port != 30911 {
50+
t.Fatalf("parse messageID %s error", id)
51+
}
52+
if msgID.Offset != 20293 {
53+
t.Fatalf("parse messageID %s error", id)
54+
}
55+
t.Log(msgID)
56+
}
57+
4058
func TestMessageKey(t *testing.T) {
4159
msg := &Message{}
4260
expected := "testKey"

0 commit comments

Comments
 (0)