CQRS Hands-on: Building an Order/Payment Domain (Part 4)

Environment: Spring Boot 4.0.2 · Java 25 · MySQL 8.4 · Redis 7.4.7-alpine · Kafka(Docker) · IntelliJ
Jackson: import tools.jackson.databind.ObjectMapper

In this part we complete the event chain “payment succeeded → order status set to PAID → read model refreshed”. The topics are split in two (order-events, payment-events). When a PaymentCapturedEvent occurs, order-command consumes it, updates the order in MySQL to PAID, and then publishes an OrderPaidEvent. order-query branches safely on the event types mixed into order-events and updates the Redis Read Model.

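Key points

  • The goal of this step is to wire in the payment event so that the order status flows “all the way through” to PAID.
  • The Kafka topics are split in two (order-events / payment-events) to make the domain flow explicit.
  • When payment-command publishes a PaymentCapturedEvent, order-command consumes it and updates the order status in MySQL to PAID.
  • The result of the state change is published to order-events as an OrderPaidEvent, and order-query branches on the event type to update the Redis Read Model.
  • To guard against duplicate events, a first-pass, beginner-level idempotency check (ignore the event if the order is already PAID) is applied.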

What this step achieves

  1. Topics split into order-events / payment-events
  2. payment-command publishes PaymentCapturedEvent
  3. order-command consumes the payment event and updates the order status in MySQL to PAID
  4. order-command publishes OrderPaidEvent to order-events
  5. order-query safely branches on the event types mixed into order-events (OrderCreated/OrderPaid)
  6. First-pass, beginner-level idempotency (protection against duplicate events)

0) Split into two topics

From this step on, the Kafka topics are split in two to make the event flow explicit.

  • order-events: order domain events (e.g. OrderCreated, OrderPaid)
  • payment-events: payment domain events (e.g. PaymentCaptured)

order-command application.yml

File location: services/order-command/src/main/resources/application.yml


app:
  kafka:
    topics:
      order-events: order-events
      payment-events: payment-events

payment-command application.yml

File location: services/payment-command/src/main/resources/application.yml


app:
  kafka:
    topics:
      payment-events: payment-events

order-query application.yml

File location: services/order-query/src/main/resources/application.yml


app:
  kafka:
    topics:
      order-events: order-events

1) Add an event contract: PaymentCapturedEvent

This event announces that a payment has succeeded (CAPTURED). Because events are contracts between services, it lives in libs/event-contracts.

File location: libs/event-contracts/src/main/java/com/ilway/cqrslab/contracts/events/PaymentCapturedEvent.java


package com.ilway.cqrslab.contracts.events;

import java.time.Instant;
import java.util.UUID;

public record PaymentCapturedEvent(
  String eventId,
  String type,
  String occurredAt,
  String paymentId,
  String orderId,
  long amount,
  String status
) {

  public static PaymentCapturedEvent of(String paymentId, String orderId, long amount) {
    return new PaymentCapturedEvent(
      UUID.randomUUID().toString(),
      "PaymentCaptured",
      Instant.now().toString(),
      paymentId,
      orderId,
      amount,
      "CAPTURED"
    );
  }
}
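
As a quick illustration of how the factory method is meant to be used, here is a minimal sketch. The names CapturedEventSketch and CapturedEventDemo are hypothetical; the record is a trimmed copy of the contract above so that the example compiles without the libs/event-contracts module. It shows that occurredAt is stored as an ISO-8601 string and can be parsed back into epoch milliseconds on the consumer side.

```java
import java.time.Instant;
import java.util.UUID;

// Sketch only: a trimmed copy of the event record so the example compiles on its own.
record CapturedEventSketch(String eventId, String type, String occurredAt,
                           String paymentId, String orderId, long amount, String status) {

  // Fills in the metadata fields (eventId, type, occurredAt, status) for the caller.
  static CapturedEventSketch of(String paymentId, String orderId, long amount) {
    return new CapturedEventSketch(
      UUID.randomUUID().toString(), "PaymentCaptured", Instant.now().toString(),
      paymentId, orderId, amount, "CAPTURED");
  }
}

class CapturedEventDemo {
  public static void main(String[] args) {
    CapturedEventSketch event = CapturedEventSketch.of("p_1", "o_1", 42_000L);
    // occurredAt is an ISO-8601 string, so a consumer can turn it back into epoch millis.
    long epochMs = Instant.parse(event.occurredAt()).toEpochMilli();
    System.out.println(event.type() + " for " + event.orderId() + " at " + epochMs);
  }
}
```

Keeping the timestamp as a string keeps the JSON contract simple; the cost is that every consumer has to parse it.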

2) payment-command: publish to payment-events on payment success

Once the payment result is confirmed as CAPTURED, payment-command publishes a PaymentCapturedEvent to the payment-events topic.

2-1) Publisher

File location: services/payment-command/src/main/java/com/ilway/cqrslab/paymentcommand/event/PaymentEventsPublisher.java


package com.ilway.cqrslab.paymentcommand.event;

import com.ilway.cqrslab.contracts.events.PaymentCapturedEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import tools.jackson.databind.ObjectMapper;

@Component
public class PaymentEventsPublisher {

  private final KafkaTemplate<String, String> kafkaTemplate;
  private final ObjectMapper objectMapper;
  private final String topic;

  public PaymentEventsPublisher(
    KafkaTemplate<String, String> kafkaTemplate,
    ObjectMapper objectMapper,
    @Value("${app.kafka.topics.payment-events:payment-events}") String topic
  ) {
    this.kafkaTemplate = kafkaTemplate;
    this.objectMapper = objectMapper;
    this.topic = topic;
  }

  public void publish(PaymentCapturedEvent event) {
    try {
      String key = event.orderId();
      String value = objectMapper.writeValueAsString(event);
      kafkaTemplate.send(topic, key, value);
    } catch (Exception e) {
      throw new IllegalStateException("Failed to publish PaymentCapturedEvent", e);
    }
  }
}
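
The publisher uses orderId as the record key. With Kafka's default partitioner, records that share a non-null key are hashed to the same partition, so events for one order keep their relative order as long as the partition count does not change. The sketch below only illustrates that idea: PartitionKeySketch and partitionFor are hypothetical names, and the hash is a simplified stand-in (Kafka's own partitioner uses murmur2, so real partition numbers will differ).

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class PartitionKeySketch {

  // Simplified stand-in for a key-hash partitioner. Kafka's default partitioner
  // uses a murmur2 hash, so the actual partition numbers will differ.
  static int partitionFor(String key, int partitionCount) {
    int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
    return Math.floorMod(hash, partitionCount);
  }

  public static void main(String[] args) {
    String orderId = "o_319c31e2-3bee-44e7-99c7-2d1ec470802c";
    // The mapping is deterministic: the same key always lands on the same partition.
    System.out.println(partitionFor(orderId, 3) == partitionFor(orderId, 3)); // prints true
  }
}
```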

2-2) Service (where the event is published)

File location: services/payment-command/src/main/java/com/ilway/cqrslab/paymentcommand/application/PaymentCommandService.java


package com.ilway.cqrslab.paymentcommand.application;

import com.ilway.cqrslab.contracts.events.PaymentCapturedEvent;
import com.ilway.cqrslab.paymentcommand.api.PaymentCommandController.CreatePaymentResponse;
import com.ilway.cqrslab.paymentcommand.event.PaymentEventsPublisher;
import java.util.UUID;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class PaymentCommandService {

  private static final String PAYMENT_PREFIX = "p_";

  private final PaymentEventsPublisher publisher;

  public PaymentCommandService(PaymentEventsPublisher publisher) {
    this.publisher = publisher;
  }

  @Transactional
  public CreatePaymentResponse pay(String orderId, long amount) {
    // 1) Process the payment (lab: always treated as a success)
    String paymentId = PAYMENT_PREFIX + UUID.randomUUID();
    String status = "CAPTURED";

    // 2) Publish the event only after the payment success is confirmed (key point)
    PaymentCapturedEvent event = PaymentCapturedEvent.of(paymentId, orderId, amount);
    publisher.publish(event);

    // 3) Response
    return new CreatePaymentResponse(paymentId, orderId, status);
  }
}

2-3) Controller (delegates to the Service)

File location: services/payment-command/src/main/java/com/ilway/cqrslab/paymentcommand/api/PaymentCommandController.java


package com.ilway.cqrslab.paymentcommand.api;

import com.ilway.cqrslab.paymentcommand.application.PaymentCommandService;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/commands/payments")
public class PaymentCommandController {

  private final PaymentCommandService service;

  public PaymentCommandController(PaymentCommandService service) {
    this.service = service;
  }

  @PostMapping
  public CreatePaymentResponse pay(@RequestBody CreatePaymentRequest request) {
    return service.pay(request.orderId(), request.amount());
  }

  public record CreatePaymentRequest(String orderId, long amount) {}
  public record CreatePaymentResponse(String paymentId, String orderId, String status) {}
}

3) order-command: consume payment-events → set the order to PAID → publish OrderPaidEvent

order-command consumes the payment-success event (PaymentCapturedEvent) and changes the status of the order stored in MySQL to PAID. It then publishes an OrderPaidEvent to order-events so that the read model can be refreshed.

3-1) Extend OrderEventsPublisher (support publishing OrderPaidEvent)

File location: services/order-command/src/main/java/com/ilway/cqrslab/ordercommand/event/OrderEventsPublisher.java


package com.ilway.cqrslab.ordercommand.event;

import com.ilway.cqrslab.contracts.events.OrderCreatedEvent;
import com.ilway.cqrslab.contracts.events.OrderPaidEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import tools.jackson.databind.ObjectMapper;

@Component
public class OrderEventsPublisher {

  private final KafkaTemplate<String, String> kafkaTemplate;
  private final ObjectMapper objectMapper;
  private final String topic;

  public OrderEventsPublisher(
    KafkaTemplate<String, String> kafkaTemplate,
    ObjectMapper objectMapper,
    @Value("${app.kafka.topics.order-events:order-events}") String topic
  ) {
    this.kafkaTemplate = kafkaTemplate;
    this.objectMapper = objectMapper;
    this.topic = topic;
  }

  public void publish(OrderCreatedEvent event) {
    try {
      String key = event.orderId();
      String value = objectMapper.writeValueAsString(event);
      kafkaTemplate.send(topic, key, value);
    } catch (Exception e) {
      throw new IllegalStateException("Failed to publish OrderCreatedEvent", e);
    }
  }

  public void publish(OrderPaidEvent event) {
    try {
      String key = event.orderId();
      String value = objectMapper.writeValueAsString(event);
      kafkaTemplate.send(topic, key, value);
    } catch (Exception e) {
      throw new IllegalStateException("Failed to publish OrderPaidEvent", e);
    }
  }
}

3-2) payment-events Consumer (set the order to PAID + idempotency)

File location: services/order-command/src/main/java/com/ilway/cqrslab/ordercommand/event/PaymentEventsConsumer.java


package com.ilway.cqrslab.ordercommand.event;

import com.ilway.cqrslab.contracts.events.OrderPaidEvent;
import com.ilway.cqrslab.contracts.events.PaymentCapturedEvent;
import com.ilway.cqrslab.ordercommand.order.OrderJpaEntity;
import com.ilway.cqrslab.ordercommand.order.OrderJpaRepository;
import com.ilway.cqrslab.ordercommand.order.OrderStatus;
import java.time.Instant;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import tools.jackson.databind.ObjectMapper;

@Component
public class PaymentEventsConsumer {

  private final ObjectMapper objectMapper;
  private final OrderJpaRepository orderRepo;
  private final OrderEventsPublisher orderEventsPublisher;

  public PaymentEventsConsumer(
    ObjectMapper objectMapper,
    OrderJpaRepository orderRepo,
    OrderEventsPublisher orderEventsPublisher
  ) {
    this.objectMapper = objectMapper;
    this.orderRepo = orderRepo;
    this.orderEventsPublisher = orderEventsPublisher;
  }

  @KafkaListener(
    topics = "${app.kafka.topics.payment-events:payment-events}",
    groupId = "order-command"
  )
  @Transactional
  public void onMessage(String message) {
    try {
      PaymentCapturedEvent event = objectMapper.readValue(message, PaymentCapturedEvent.class);

      OrderJpaEntity order = orderRepo.findById(event.orderId())
        .orElseThrow(() -> new IllegalStateException("Order not found: " + event.orderId()));

      // Beginner-level idempotency: if already PAID, treat it as a duplicate event and ignore it
      if (order.getStatus() == OrderStatus.PAID) {
        return;
      }

      order.markPaid(Instant.now());
      orderRepo.save(order);

      // Publish the event used to refresh the read model
      OrderPaidEvent paidEvent = OrderPaidEvent.of(order.getOrderId(), order.getUserId());
      orderEventsPublisher.publish(paidEvent);

    } catch (Exception e) {
      throw new IllegalStateException("Failed to consume payment event: " + message, e);
    }
  }
}
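
The state-based idempotency check in the consumer can be exercised in isolation. The following is a minimal in-memory sketch, not the service code: IdempotentPaidSketch, its map-backed "table", and the publish counter are all hypothetical stand-ins for the JPA repository and the Kafka publisher. It shows that delivering the same payment event twice changes the state once and emits one follow-up event.

```java
import java.util.HashMap;
import java.util.Map;

class IdempotentPaidSketch {

  enum Status { CREATED, PAID }

  // Hypothetical in-memory stand-in for the orders table.
  private final Map<String, Status> orders = new HashMap<>();
  // Counts how often an OrderPaidEvent would have been published.
  private int publishedPaidEvents = 0;

  void create(String orderId) {
    orders.put(orderId, Status.CREATED);
  }

  /** Returns true if this call changed the state, false if it was treated as a duplicate. */
  boolean onPaymentCaptured(String orderId) {
    Status current = orders.get(orderId);
    if (current == null) {
      throw new IllegalStateException("Order not found: " + orderId);
    }
    if (current == Status.PAID) {
      return false; // duplicate delivery: no update, no re-publish
    }
    orders.put(orderId, Status.PAID);
    publishedPaidEvents++; // stands in for publishing OrderPaidEvent
    return true;
  }

  int publishedPaidEvents() {
    return publishedPaidEvents;
  }

  Status statusOf(String orderId) {
    return orders.get(orderId);
  }
}
```

Because the check looks only at the order's current status, it covers this one transition; it does not track individual eventId values.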

4) order-query: branching consumption of order-events

The order-events topic now carries both OrderCreated and OrderPaid events. order-query therefore has to branch on the message's type field to consume them safely.

Important: Jackson 3 (tools.jackson) change

asText(default) is deprecated, so replace it with asString(default).

4-1) OrderEventsConsumer (branch on type)

File location: services/order-query/src/main/java/com/ilway/cqrslab/orderquery/event/OrderEventsConsumer.java


package com.ilway.cqrslab.orderquery.event;

import com.ilway.cqrslab.contracts.events.OrderCreatedEvent;
import com.ilway.cqrslab.contracts.events.OrderPaidEvent;
import com.ilway.cqrslab.orderquery.readmodel.OrderReadModel;
import com.ilway.cqrslab.orderquery.readmodel.RedisOrderReadModelRepository;
import java.time.Instant;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import tools.jackson.databind.JsonNode;
import tools.jackson.databind.ObjectMapper;

@Component
public class OrderEventsConsumer {

  private final ObjectMapper objectMapper;
  private final RedisOrderReadModelRepository redisRepo;

  public OrderEventsConsumer(ObjectMapper objectMapper, RedisOrderReadModelRepository redisRepo) {
    this.objectMapper = objectMapper;
    this.redisRepo = redisRepo;
  }

  @KafkaListener(
    topics = "${app.kafka.topics.order-events:order-events}",
    groupId = "${spring.kafka.consumer.group-id:order-query}"
  )
  public void onMessage(String message) {
    try {
      JsonNode node = objectMapper.readTree(message);

      // Jackson 3: asText(default) deprecated → asString(default)
      String type = node.path("type").asString("");

      if ("OrderCreated".equals(type)) {
        OrderCreatedEvent event = objectMapper.treeToValue(node, OrderCreatedEvent.class);
        long createdAtEpochMs = Instant.parse(event.occurredAt()).toEpochMilli();

        redisRepo.upsertOrder(new OrderReadModel(
          event.orderId(),
          event.userId(),
          event.status(),
          createdAtEpochMs
        ));
        return;
      }

      if ("OrderPaid".equals(type)) {
        OrderPaidEvent event = objectMapper.treeToValue(node, OrderPaidEvent.class);
        long occurredAtEpochMs = Instant.parse(event.occurredAt()).toEpochMilli();

        redisRepo.updateOrderStatus(
          event.orderId(),
          event.userId(),
          "PAID",
          occurredAtEpochMs
        );
        return;
      }

      // unknown type: ignore (lab stage)
    } catch (Exception e) {
      throw new IllegalStateException("Failed to consume order event: " + message, e);
    }
  }
}
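
The branching logic itself does not depend on Kafka or Jackson, so it can be sketched with plain collections. In the example below the message is modeled as an already-parsed map instead of a JsonNode, and TypeDispatchSketch and its applied list are hypothetical names used only to make the effect observable. A missing or unknown type falls through without an error, mirroring the consumer above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class TypeDispatchSketch {

  // Records which handler ran, in order, so the dispatch can be inspected.
  final List<String> applied = new ArrayList<>();

  // The message is modeled as an already-parsed map; the real consumer reads a JsonNode.
  void onMessage(Map<String, String> message) {
    String type = message.getOrDefault("type", "");
    switch (type) {
      case "OrderCreated" -> applied.add("upsert:" + message.get("orderId"));
      case "OrderPaid" -> applied.add("paid:" + message.get("orderId"));
      default -> { /* unknown or missing type: ignored at this stage */ }
    }
  }
}
```

A switch keeps every known type in one place, which makes it easier to notice when a new event type is added to the topic without a handler.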

4-2) Redis Read Model update method

File location: services/order-query/src/main/java/com/ilway/cqrslab/orderquery/readmodel/RedisOrderReadModelRepository.java


public void updateOrderStatus(String orderId, String userId, String status, long occurredAtEpochMs) {
  try {
    OrderReadModel existing = findOrder(orderId);

    long createdAt = (existing != null) ? existing.createdAtEpochMs() : occurredAtEpochMs;
    OrderReadModel updated = new OrderReadModel(orderId, userId, status, createdAt);

    upsertOrder(updated);
  } catch (Exception e) {
    throw new IllegalStateException("Failed to update order status", e);
  }
}
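
The method above keeps the original createdAtEpochMs when the order is already in the read model and falls back to the event time when it is not (for example, if OrderPaid is processed before OrderCreated has been applied). A minimal in-memory sketch of that rule follows; ReadModelUpdateSketch, OrderView, and the HashMap store are hypothetical stand-ins for the Redis-backed repository.

```java
import java.util.HashMap;
import java.util.Map;

class ReadModelUpdateSketch {

  record OrderView(String orderId, String userId, String status, long createdAtEpochMs) {}

  // Hypothetical in-memory stand-in for the Redis store.
  private final Map<String, OrderView> store = new HashMap<>();

  void upsert(OrderView view) {
    store.put(view.orderId(), view);
  }

  OrderView find(String orderId) {
    return store.get(orderId);
  }

  void updateStatus(String orderId, String userId, String status, long occurredAtEpochMs) {
    OrderView existing = find(orderId);
    // Keep the original creation time if known; otherwise fall back to the event time.
    long createdAt = (existing != null) ? existing.createdAtEpochMs() : occurredAtEpochMs;
    upsert(new OrderView(orderId, userId, status, createdAt));
  }
}
```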

5) Test scenario (in exactly this order) + actual test results

5-1) 1. Create an order


/Desktop/il-way/lecture/cqrs
$ curl -X POST http://localhost:8081/commands/orders \
  -H "Content-Type: application/json" \
  -d "{\"userId\":\"u_1\"}"
{"orderId":"o_319c31e2-3bee-44e7-99c7-2d1ec470802c","status":"CREATED"}

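5-2) 2. Capture the payment using the orderId that was just created

Note: the transcript below was captured with a different order of the same user (the id starting with o_a4b5cd7e), which is why two orders appear in the query result of step 3. When you run it yourself, substitute the orderId returned in step 1.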


/Desktop/il-way/lecture/cqrs
$ curl -X POST http://localhost:8082/commands/payments \
  -H "Content-Type: application/json" \
  -d "{\"orderId\":\"o_a4b5cd7e-54ec-419d-99d2-e3e49f72961c\",\"amount\":42000}"
{"paymentId":"p_3b41d536-5a2e-41fa-bc62-01eb551e63e2","orderId":"o_a4b5cd7e-54ec-419d-99d2-e3e49f72961c","status":"CAPTURED"}

5-3) 3. Check that the status changes to PAID in the query


/Desktop/il-way/lecture/cqrs
$ curl "http://localhost:8083/queries/users/u_1/orders?limit=20"
[{"orderId":"o_a4b5cd7e-54ec-419d-99d2-e3e49f72961c","userId":"u_1","status":"PAID","createdAtEpochMs":1769874456452},{"orderId":"o_319c31e2-3bee-44e7-99c7-2d1ec470802c","userId":"u_1","status":"CREATED","createdAtEpochMs":1769874056986}]
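
Because the read model is updated through two asynchronous hops (payment-events, then order-events), a query issued immediately after the payment call may still return CREATED for a short time. If you want to script this check, one option is to poll until the expected status shows up. The helper below is a minimal sketch under that assumption: AwaitStatusSketch and awaitStatus are hypothetical names, and the status lookup is passed in as a Supplier so the example does not depend on any HTTP client.

```java
import java.util.function.Supplier;

class AwaitStatusSketch {

  // Calls fetchStatus up to maxAttempts times, sleeping between attempts,
  // and reports whether the expected status was eventually observed.
  static boolean awaitStatus(Supplier<String> fetchStatus, String expected,
                             int maxAttempts, long sleepMs) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      if (expected.equals(fetchStatus.get())) {
        return true;
      }
      if (attempt < maxAttempts) {
        try {
          Thread.sleep(sleepMs);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the interrupt flag and give up
          return false;
        }
      }
    }
    return false;
  }
}
```

In a real check, the supplier would call the query endpoint from step 3 and extract the status of the order under test.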

5-4) Extra check: order status in MySQL


mysql> select order_id, user_id, status, updated_at from orders where user_id='u_1' order by updated_at desc limit 5;
+----------------------------------------+---------+---------+----------------------------+
| order_id                               | user_id | status  | updated_at                 |
+----------------------------------------+---------+---------+----------------------------+
| o_a4b5cd7e-54ec-419d-99d2-e3e49f72961c | u_1     | PAID    | 2026-01-31 15:47:36.455951 |
+----------------------------------------+---------+---------+----------------------------+

5-5) Extra check: Redis Read Model


PS C:\Users\Desktop\il-way\lecture\cqrs\infra> docker exec -it cqrs-redis redis-cli keys "order:*"
1) "order:o_a4b5cd7e-54ec-419d-99d2-e3e49f72961c"

PS C:\Users\Desktop\il-way\lecture\cqrs\infra> docker exec -it cqrs-redis redis-cli get "order:o_a4b5cd7e-54ec-419d-99d2-e3e49f72961c"
"{\"orderId\":\"o_a4b5cd7e-54ec-419d-99d2-e3e49f72961c\",\"userId\":\"u_1\",\"status\":\"PAID\",\"createdAtEpochMs\":1769874456452}"

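Summary

  • payment-command published the payment-success event,
  • order-command consumed it and updated the order in MySQL to PAID,
  • the result was republished to order-events,
  • and the Redis Read Model in order-query was updated to PAID.

Wrap-up

With this step, the event chain “payment succeeded → order status updated → read model refreshed” is complete. The next steps can raise production-grade reliability through the Outbox pattern, a DLQ, and stronger idempotency.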
