1
- package com .alibabacloud .mse .demo ;
1
+ package com .alibabacloud .mse .demo . a . mq ;
2
2
3
- import com .alibabacloud .mse .demo .service .MqConsumer ;
4
3
import lombok .RequiredArgsConstructor ;
5
4
import lombok .extern .slf4j .Slf4j ;
6
5
import org .apache .rocketmq .client .consumer .DefaultMQPushConsumer ;
7
6
import org .apache .rocketmq .client .exception .MQClientException ;
8
7
import org .apache .rocketmq .common .consumer .ConsumeFromWhere ;
8
+ import org .springframework .beans .factory .annotation .Autowired ;
9
+ import org .springframework .beans .factory .annotation .Qualifier ;
9
10
import org .springframework .beans .factory .annotation .Value ;
11
+ import org .springframework .cloud .commons .util .InetUtils ;
10
12
import org .springframework .context .annotation .Bean ;
11
13
import org .springframework .context .annotation .Configuration ;
14
+ import org .springframework .web .client .RestTemplate ;
12
15
13
16
@ Slf4j
14
17
@ Configuration
@@ -24,7 +27,15 @@ public class RocketMqConfiguration {
24
27
@ Value ("${rocketmq.consumer.topic}" )
25
28
private String topic ;
26
29
27
- private final MqConsumer mqConsumer ;
30
+ @ Autowired
31
+ @ Qualifier ("loadBalancedRestTemplate" )
32
+ private RestTemplate restTemplate ;
33
+
34
+ @ Autowired
35
+ private InetUtils inetUtils ;
36
+
37
+ @ Autowired
38
+ private String serviceTag ;
28
39
29
40
static {
30
41
System .setProperty ("rocketmq.client.log.loadconfig" , "false" );
@@ -37,6 +48,12 @@ public DefaultMQPushConsumer mqPushConsumer() throws MQClientException {
37
48
consumer .setNamesrvAddr (nameSrvAddr );
38
49
consumer .subscribe (topic , "*" );
39
50
consumer .setConsumeFromWhere (ConsumeFromWhere .CONSUME_FROM_FIRST_OFFSET );
51
+
52
+ MqConsumer mqConsumer = new MqConsumer (
53
+ restTemplate ,
54
+ inetUtils ,
55
+ serviceTag
56
+ );
40
57
consumer .registerMessageListener (mqConsumer );
41
58
log .info ("完成启动rocketMq的consumer,subscribe:{}" , topic );
42
59
return consumer ;
0 commit comments