Skip to content

Commit 54ed115

Browse files
feat : add putOrder
refactor: modify rmq config
1 parent aaa5274 commit 54ed115

File tree

6 files changed

+113
-52
lines changed

6 files changed

+113
-52
lines changed

src/main/java/com/github/messageconsumer/config/RabbitMQConfig.java

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
package com.github.messageconsumer.config;
22

3+
import com.fasterxml.jackson.databind.ObjectMapper;
34
import org.springframework.amqp.core.Binding;
45
import org.springframework.amqp.core.BindingBuilder;
56
import org.springframework.amqp.core.DirectExchange;
67
import org.springframework.amqp.core.Queue;
78
import org.springframework.amqp.core.TopicExchange;
9+
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
810
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
911
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
1012
import org.springframework.amqp.rabbit.core.RabbitTemplate;
1113
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
1214
import org.springframework.amqp.support.converter.MessageConverter;
15+
import org.springframework.beans.factory.annotation.Value;
1316
import org.springframework.context.annotation.Bean;
1417
import org.springframework.context.annotation.Configuration;
1518

@@ -24,6 +27,24 @@ public class RabbitMQConfig {
2427
public static final List<String> QUEUE_NAMES = Arrays.asList("postCart", "putCart", "postOrder", "putOrder", "postPayment", "putPayment");
2528
public static final List<String> ROUTING_KEYS = Arrays.asList("postCart", "putCart", "postOrder", "putOrder", "postPayment", "putPayment");
2629

30+
@Value("${rabbitmq.host}")
31+
private String rmqHost;
32+
33+
@Value("${rabbitmq.username}")
34+
private String rmqUsername;
35+
36+
@Value("${rabbitmq.password}")
37+
private String rmqPassword;
38+
39+
@Bean
40+
public CachingConnectionFactory cachingConnectionFactory() {
41+
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rmqHost);
42+
connectionFactory.setUsername(rmqUsername);
43+
connectionFactory.setPassword(rmqPassword);
44+
// 다양한 설정 (캐싱 옵션, 포트, 가상 호스트 등)을 설정할 수 있습니다.
45+
return connectionFactory;
46+
}
47+
2748
@Bean
2849
public DirectExchange exchange() {
2950
return new DirectExchange(EXCHANGE_NAME);
@@ -48,21 +69,35 @@ public List<Binding> bindings(List<Queue> queues, DirectExchange exchange) {
4869
return bindings;
4970
}
5071

72+
Jackson2JsonMessageConverter messageConverter(ObjectMapper mapper){
73+
var converter = new Jackson2JsonMessageConverter(mapper);
74+
converter.setCreateMessageIds(true); //create a unique message id for every message
75+
return converter;
76+
}
77+
5178
@Bean
52-
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
53-
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
54-
// JSON 형식의 메시지를 직렬화하고 역직렬할 수 있도록 설정
55-
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
56-
return rabbitTemplate;
79+
public RabbitTemplate rabbitTemplate(ConnectionFactory factory, ObjectMapper objectMapper){
80+
RabbitTemplate template = new RabbitTemplate();
81+
template.setConnectionFactory(factory);
82+
template.setMessageConverter(messageConverter(objectMapper));
83+
return template;
5784
}
5885

59-
/**
60-
* Jackson 라이브러리를 사용하여 메시지를 JSON 형식으로 변환하는 MessageConverter 빈을 생성
61-
*
62-
* @return MessageConverter 객체
63-
*/
6486
@Bean
65-
public MessageConverter jackson2JsonMessageConverter() {
66-
return new Jackson2JsonMessageConverter();
87+
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
88+
ConnectionFactory connectionFactory,
89+
ObjectMapper objectMapper
90+
) {
91+
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
92+
factory.setConnectionFactory(connectionFactory);
93+
factory.setMessageConverter(messageConverter(objectMapper));
94+
// prefetch 설정 (한 번에 가져올 메시지 개수)
95+
factory.setPrefetchCount(10); // 예: 10개의 메시지를 미리 가져옴
96+
97+
// channel 개수 설정 (동시에 처리할 메시지 수)
98+
factory.setConcurrentConsumers(5); // 예: 5개의 채널을 사용하여 메시지 처리
99+
factory.setMaxConcurrentConsumers(10);
100+
return factory;
67101
}
102+
68103
}

src/main/java/com/github/messageconsumer/dto/CartRmqDto.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,5 @@ public class CartRmqDto {
2121
private Integer totalPrice;
2222
private String options;
2323

24-
public static CartRmqDto fromEntityForPost(Cart cart){
25-
return CartRmqDto.builder()
26-
.userId(cart.getUsers().getId())
27-
.productId(cart.getProducts().getId())
28-
.isOrdered(cart.getIsOrdered())
29-
.quantity(cart.getQuantity())
30-
.options(cart.getOptions())
31-
.build();
32-
}
33-
34-
public static CartRmqDto fromEntityForModify(Cart cart){
35-
return CartRmqDto.builder()
36-
.cartId(cart.getId())
37-
.userId(cart.getUsers().getId())
38-
.productId(cart.getProducts().getId())
39-
.isOrdered(cart.getIsOrdered())
40-
.quantity(cart.getQuantity())
41-
.options(cart.getOptions())
42-
.build();
43-
}
4424

4525
}

src/main/java/com/github/messageconsumer/dto/OrderRmqDto.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,5 @@ public class OrderRmqDto {
2121
private Integer total_price;
2222
private String options;
2323

24-
public static OrderRmqDto fromEntity(Order order){
25-
Product product = order.getProducts();
26-
return OrderRmqDto.builder()
27-
.cartId(order.getCarts().getId())
28-
.userId(order.getUsers().getId())
29-
.productId(product.getId())
30-
.quantity(order.getQuantity())
31-
.total_price(order.getTotal_price())
32-
.orderState(order.getOrderState())
33-
.options(order.getOptions())
34-
.build();
35-
}
3624

3725
}

src/main/java/com/github/messageconsumer/service/cart/CartConsumerService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public class CartConsumerService {
2323
private final CartRepository cartRepository;
2424
private final ValidatCartMethod validatCartMethod;
2525

26-
@RabbitListener(queues = "postCart")
26+
@RabbitListener(queues = "postCart", containerFactory = "rabbitListenerContainerFactory")
2727
public void postCartQueue(CartRmqDto cartRmqDto, Message message, Channel channel) throws IOException {
2828
try {
2929
Product validatedProduct = validatCartMethod.validateProduct(cartRmqDto.getProductId());
@@ -62,7 +62,7 @@ public void postCartQueue(CartRmqDto cartRmqDto, Message message, Channel channe
6262
}
6363
}
6464

65-
@RabbitListener(queues = "putCart")
65+
@RabbitListener(queues = "putCart", containerFactory = "rabbitListenerContainerFactory")
6666
public void putCartQueue(CartRmqDto cartRmqDto, Message message, Channel channel) throws IOException {
6767
try {
6868
Cart validatedCart = validatCartMethod.validateCart(cartRmqDto.getCartId(), cartRmqDto.getUserId());

src/main/java/com/github/messageconsumer/service/order/OrderCosumerService.java

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,17 @@
1717

1818
import java.io.IOException;
1919
import java.time.LocalDateTime;
20+
import java.util.List;
21+
import java.util.Map;
22+
2023
@Slf4j
2124
@RequiredArgsConstructor
2225
@Service
2326
public class OrderCosumerService {
2427
private final ValidateOrderMethod validateOrderMethod;
2528
private final OrderRepository orderRepository;
2629
private final CartRepository cartRepository;
27-
@RabbitListener(queues = "postOrder")
30+
@RabbitListener(queues = "postOrder", containerFactory = "rabbitListenerContainerFactory")
2831
public void postOrderQueue(OrderRmqDto orderRmqDto, Message message, Channel channel) throws IOException {
2932
try {
3033
Product validatedProduct = validateOrderMethod.validateProduct(orderRmqDto.getProductId());
@@ -69,6 +72,61 @@ public void postOrderQueue(OrderRmqDto orderRmqDto, Message message, Channel cha
6972
message.getMessageProperties().setExpiration(String.valueOf(delayMillis));
7073
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
7174
} else {
75+
//큐에 아래의 argument 추가. dlq 로의 publish가 중복될 수도 있어 명시적으로 publish하는 코드 일단 주석처리.
76+
//x-dead-letter-exchange: dlqExchange
77+
//x-dead-letter-routing-key: dlq.postOrder
78+
// 최대 재시도 횟수를 초과하면 메시지를 DLQ로 보냅니다.
79+
// String dlqExchange = "dlqExchange"; // DLQ로 메시지를 보낼 Exchange 이름
80+
// String dlqRoutingKey = "dlq.postOrder"; // DLQ로 메시지를 보낼 Routing Key
81+
// channel.basicPublish(dlqExchange, dlqRoutingKey, null, message.getBody());
82+
83+
// 최대 재시도 횟수를 초과하면 메시지를 버립니다.
84+
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
85+
log.error("Max retries exceeded. Discarding message.");
86+
}
87+
}
88+
}
89+
90+
@RabbitListener(queues = "putOrder", containerFactory = "rabbitListenerContainerFactory")
91+
public void putOrderQueue(OrderRmqDto orderRmqDto, Message message, Channel channel) throws IOException {
92+
try {
93+
Product validatedProduct = validateOrderMethod.validateProduct(orderRmqDto.getProductId());
94+
Order validatedOrder = validateOrderMethod.validateOrder(orderRmqDto.getOrderId(), orderRmqDto.getUserId());
95+
Integer inputQuantity = orderRmqDto.getQuantity();
96+
String inputOptions = orderRmqDto.getOptions();
97+
validateOrderMethod.validateStock(inputQuantity, validatedProduct);
98+
99+
validatedOrder.setQuantity(inputQuantity);
100+
validatedOrder.setOptions(inputOptions);
101+
102+
orderRepository.save(
103+
validatedOrder
104+
);
105+
} catch (Exception e) {
106+
log.error("Error processing cart message: " + e.getMessage(), e);
107+
108+
// 재시도 제한 설정
109+
int maxRetries = 3; // 최대 재시도 횟수 설정
110+
Integer retries = (Integer) message.getMessageProperties().getHeader("x-retries");
111+
112+
if (retries < maxRetries) {
113+
// 재시도 횟수 증가
114+
retries++;
115+
message.getMessageProperties().setHeader("x-retries", retries);
116+
117+
// 일정 시간 후 재시도
118+
long delayMillis = 5000; // 5초 대기
119+
message.getMessageProperties().setExpiration(String.valueOf(delayMillis));
120+
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
121+
} else {
122+
//큐에 아래의 argument 추가. dlq 로의 publish가 중복될 수도 있어 명시적으로 publish하는 코드 일단 주석처리.
123+
//x-dead-letter-exchange: dlqExchange
124+
//x-dead-letter-routing-key: dlq.postOrder
125+
// 최대 재시도 횟수를 초과하면 메시지를 DLQ로 보냅니다.
126+
// String dlqExchange = "dlqExchange"; // DLQ로 메시지를 보낼 Exchange 이름
127+
// String dlqRoutingKey = "dlq.postOrder"; // DLQ로 메시지를 보낼 Routing Key
128+
// channel.basicPublish(dlqExchange, dlqRoutingKey, null, message.getBody());
129+
72130
// 최대 재시도 횟수를 초과하면 메시지를 버립니다.
73131
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
74132
log.error("Max retries exceeded. Discarding message.");

src/main/resources/application.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ spring:
3333
ddl-auto: update
3434
show-sql: true
3535

36-
rabbitmq:
37-
host: ${RMQ_HOST}
38-
port: 5672
39-
username: ${RMQ_USER}
40-
password: ${RMQ_PASSWORD}
36+
rabbitmq:
37+
host: ${RMQ_HOST}
38+
port: 5672
39+
username: ${RMQ_USER}
40+
password: ${RMQ_PASSWORD}

0 commit comments

Comments
 (0)