@@ -3,7 +3,7 @@ package go_rabbitmq
33import (
44 "errors"
55 "fmt"
6- "github.com/streadway/amqp "
6+ amqp "github.com/rabbitmq/amqp091-go "
77 "net/url"
88 "time"
99)
@@ -55,10 +55,11 @@ var (
5555 errAlreadyClosed = errors .New ("already closed: not connected to the server" )
5656 errShutdown = errors .New ("session is shutting down" )
5757 errFailedToPush = errors .New ("failed to push: not connected" )
58+ err error
5859)
5960
6061// New 创建一个新的消费者状态实例,并自动尝试连接到服务器
61- func New (config * Config , queueName , exchange , routeKey string , exchangeType , prefetchCount int , durable bool ) * RabbitMQ {
62+ func New (config * Config , queueName , exchange , routeKey string , exchangeType , prefetchCount int , durable bool ) ( * RabbitMQ , error ) {
6263 // amqp 出现url.Parse导致的错误 是因为特殊字符需要进行urlencode编码
6364 password := url .QueryEscape (config .Password )
6465 // amqp://账号:密码@rabbitmq服务器地址:端口号/vhost
@@ -89,8 +90,12 @@ func New(config *Config, queueName, exchange, routeKey string, exchangeType, pre
8990 PrefetchCount : prefetchCount ,
9091 Durable : durable ,
9192 }
93+ rabbitmq .conn , err = rabbitmq .connect (addr )
94+ if err := rabbitmq .init (rabbitmq .conn ); err != nil {
95+ return nil , err
96+ }
9297 go rabbitmq .handleReconnect (rabbitmq .Addr )
93- return rabbitmq
98+ return rabbitmq , nil
9499}
95100
96101// handleReconnect 将在notifyConnClose上等待连接错误,然后不断尝试重新连接。
0 commit comments