들어가기
order-service에서 주문 내역을 저장하는 부분을 JPA와 Kafka Connect를 이용한 두 가지 방법으로 구현했었다. Kafka를 사용했을 때의 장점은 카프카가 DB로의 저장 요청을 대신 수행하므로 어플리케이션에서는 Kafka로 요청을 보내고 클라이언트에게 바로 응답을 할 수 있다는 점이다.
OrderJpaService
@Service
@Transactional
@RequiredArgsConstructor
public class OrderJpaService implements OrderService {
private final OrderRepository orderRepository;
private final OrderProductRepository orderProductRepository;
private final StoreClient storeClient;
@Override
public void save(RequestOrder requestOrder) {
Order order = saveOrderJpa(requestOrder);
saveOrderProductJpa(order, requestOrder);
}
private Order saveOrderJpa(RequestOrder requestOrder) {
Map<Long, Integer> productQuantityMap = requestOrder.getProductQuantityMap();
Order order = OrderMapper.INSTANCE.toOrder(requestOrder);
// store-service 로 Product 검색
List<ResponseProduct> productList = getResponseProductList(productQuantityMap);
// 상품이 없을 때
if (productList == null || productList.size() == 0) throw new NoProductException("상품 조회 실패");
// 재고가 충분한지 확인
for (ResponseProduct dbProduct : productList) {
if (dbProduct.getQuantity() < productQuantityMap.get(dbProduct.getId()))
throw new QuantityException("재고 부족");
}
// 주문 금액 합계 계산
int sum = getSum(productQuantityMap, productList);
order.setSumPrice(sum);
return orderRepository.save(order);
}
private void saveOrderProductJpa(Order order, RequestOrder requestOrder) {
Map<Long, Integer> productQuantityMap = requestOrder.getProductQuantityMap();
// 주문 받은대로 저장
List<OrderProduct> OrderProductList = productQuantityMap.entrySet().stream()
.map(ks ->
OrderProduct.builder()
.order(order)
.productId(ks.getKey())
.quantity(ks.getValue())
.build()
)
.toList();
orderProductRepository.saveAll(OrderProductList); // DB 저장
Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
storeClient.updateQuantity(productQuantityMap, String.valueOf(authentication.getCredentials())); // 수량 갱신
}
@Override
public List<ResponseOrder> getOrderList(Long userId) {
return null;
}
// store-service로 상품 정보 조회 요청
private List<ResponseProduct> getResponseProductList(Map<Long, Integer> productQuantityMap) {
// store-service 로 Product 검색
List<Long> productIdList = productQuantityMap.keySet().stream().toList();
return storeClient.getProductList(productIdList);
}
}
OrderKafkaService
@Slf4j
@RequiredArgsConstructor
@Transactional
@Service
public class OrderKafkaService implements OrderService {
private final OrderRepository orderRepository;
private final StoreClient storeClient;
private final OrderProducer orderProducer;
private final OrderProductProducer orderProductProducer;
@Override
public void save(RequestOrder requestOrder) {
Long orderId = saveOrderJpa(requestOrder);
saveOrderProductKafka(orderId, requestOrder);
}
// JPA로 주문 저장
private Long saveOrderJpa(RequestOrder requestOrder) {
Map<Long, Integer> productQuantityMap = requestOrder.getProductQuantityMap();
Order order = OrderMapper.INSTANCE.toOrder(requestOrder);
// store-service 로 Product 검색
List<ResponseProduct> productList = getResponseProductList(productQuantityMap);
// 상품이 없을 때
if (productList == null || productList.size() == 0) throw new NoProductException("상품 조회 실패");
// 재고가 충분한지 확인
for (ResponseProduct dbProduct : productList) {
if (dbProduct.getQuantity() < productQuantityMap.get(dbProduct.getId()))
throw new QuantityException("재고 부족");
}
// 주문 금액 합계 계산
int sum = getSum(productQuantityMap, productList);
order.setSumPrice(sum);
return orderRepository.save(order).getId();
}
private void saveOrderProductKafka(Long orderId, RequestOrder requestOrder) {
Map<Long, Integer> productQuantityMap = requestOrder.getProductQuantityMap();
List<OrderProductDto> orderProductDtoList = productQuantityMap.keySet().stream()
.map(productId -> new OrderProductDto(orderId, productId, productQuantityMap.get(productId)))
.toList();
orderProductProducer.send(orderProductDtoList);
}
@Override
public List<ResponseOrder> getOrderList(Long userId) {
List<Order> foundOrder = orderRepository.findAllByUserIdOrderByCreatedAtDesc(userId);
return foundOrder.stream().map(OrderMapper.INSTANCE::toResponseOrder).toList();
}
// store-service로 상품 정보 조회 요청
private List<ResponseProduct> getResponseProductList(Map<Long, Integer> productQuantityMap) {
// store-service 로 Product 검색
List<Long> productIdList = productQuantityMap.keySet().stream().toList();
return storeClient.getProductList(productIdList);
}
}
주문과 상품 테이블에 저장할 때는 주문 테이블의 id가 필요한데 Kafka는 비동기적으로 처리하기 때문에 모두 카프카를 통해서 처리하게 되면 외래키 오류가 발생하게 된다. 그렇기 때문에 어쩔 수 없이 주문의 경우만 JPA를 써서 저장을 하고 반환받은 id를 사용하여 주문과 상품 테이블에 저장하도록 설정했다.
KafkaProducerConfig
@EnableKafka
@RequiredArgsConstructor
@Configuration
public class KafkaProducerConfig {
private final Environment environment;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
environment.getProperty("spring.cloud.stream.kafka.binder.brokers[0]"));
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
OrderProductProducer
@Slf4j
@RequiredArgsConstructor
@Service
public class OrderProductProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final static String TOPIC_NAME = "order_products";
List<Field> fields = Arrays.asList(
new Field("int64", false, "order_id"),
new Field("int64", false, "product_id"),
new Field("int32", false, "quantity")
);
Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("order_products")
.build();
public void send(List<OrderProductDto> orderProductDtoList) {
for (OrderProductDto orderProductDto : orderProductDtoList) {
OrderProductPayload payload = OrderProductPayload.builder()
.order_id(orderProductDto.getOrderId())
.product_id(orderProductDto.getProductId())
.quantity(orderProductDto.getQuantity())
.build();
KafkaOrderProductDto kafkaOrderProductDto = new KafkaOrderProductDto(schema, payload);
ObjectMapper mapper = new ObjectMapper();
String jsonToString = "";
try {
jsonToString = mapper.writeValueAsString(kafkaOrderProductDto);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
kafkaTemplate.send(TOPIC_NAME, jsonToString);
}
}
}
order_products 테이블의 스키마에 맞게 토픽에 저장될 내용을 구성한다.
응답 속도 차이
JPA
JPA로 주문 내역을 저장한 경우는 60초반 ~ 70초반 ms만큼 걸리는 것을 확인할 수 있었다.
Kafka로 주문 내역을 저장할 때는 20후반 ~ 30후반 ms만큼의 시간이 걸렸다.
해결해야 하는 문제
- 수량 확인 부분이 매끄럽지 않음
- 트랜잭션 처리를 고민해봐야 함 - 이벤트 소싱, SAGA 패턴에 대해서 더 공부해 봐야겠다.
- 더 큰 성능 최적화를 위해 CQRS에 대해 공부해 볼 필요가 있다.
- 성능 모니터링을 위해 프로메테우스나 그라파나에 관해 공부해봐야겠다.
'공부 > Spring Cloud' 카테고리의 다른 글
딜리버드이츠 최종 결과물 발표 (0) | 2024.06.30 |
---|---|
딜리버드이츠 프로젝트 기획 발표 자료 (0) | 2024.06.27 |
[Spring Cloud] SameSite와 Secure로 프론트 백엔드 쿠키 교환하기 (0) | 2024.06.24 |
[Spring Cloud][GCP] NGINX로 리버스 프록시 구현하기 (0) | 2024.06.24 |
[Spring Cloud][GCP] Gateway에서 HTTPS 사용하기 (pem -> p12) (0) | 2024.06.23 |