Environment: Spring Boot 4.0.2 · Java 25 · MySQL 8.4 · Redis 7.4.7-alpine · Kafka (Docker) · IntelliJ
Jackson: import tools.jackson.databind.ObjectMapper
This installment completes the event chain "payment succeeded → order status set to PAID → read model refreshed."
The topics are split in two (order-events, payment-events).
When a PaymentCapturedEvent occurs, order-command consumes it, updates the order in MySQL to PAID,
and then publishes an OrderPaidEvent.
order-query branches safely on the event types mixed into order-events and refreshes the Redis Read Model.
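Key summary
- The goal of this step is to wire in the payment event so that the order status flows "all the way through" to PAID.
- The Kafka topics are split in two (order-events / payment-events) to make the domain flow explicit.
- When payment-command publishes a PaymentCapturedEvent, order-command consumes it and updates the MySQL order status to PAID.
- The state change is published back to order-events as an OrderPaidEvent, and order-query branches on the event type to refresh the Redis Read Model.
- To guard against duplicate events, a first pass of basic idempotency (ignore the event if the order is already PAID) is applied.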
What this step achieved
- Split the topics into order-events / payment-events
- payment-command publishes PaymentCapturedEvent
- order-command consumes the payment event and updates the MySQL order status to PAID
- order-command publishes OrderPaidEvent to order-events
- order-query safely branches on the event types mixed into order-events (OrderCreated/OrderPaid)
- First pass of basic idempotency (duplicate-event protection)
Contents
- 0) Split into two topics
- 1) Add the event contract: PaymentCapturedEvent
- 2) payment-command: publish to payment-events on payment success
- 3) order-command: consume payment-events → mark order PAID → publish OrderPaidEvent
- 4) order-query: branch on order-events
- 5) Test scenario (in exactly this order) + actual results
- Wrap-up
0) Split into two topics
From this step on, the Kafka topics are split in two to make the event flow explicit.
- order-events: order domain events (e.g., OrderCreated, OrderPaid)
- payment-events: payment domain events (e.g., PaymentCaptured)
order-command application.yml
File: services/order-command/src/main/resources/application.yml
app:
  kafka:
    topics:
      order-events: order-events
      payment-events: payment-events
payment-command application.yml
File: services/payment-command/src/main/resources/application.yml
app:
  kafka:
    topics:
      payment-events: payment-events
order-query application.yml
File: services/order-query/src/main/resources/application.yml
app:
  kafka:
    topics:
      order-events: order-events
1) Add the event contract: PaymentCapturedEvent
This event announces that a payment succeeded (CAPTURED).
Events are contracts between services, so it lives in libs/event-contracts.
File: libs/event-contracts/src/main/java/com/ilway/cqrslab/contracts/events/PaymentCapturedEvent.java
package com.ilway.cqrslab.contracts.events;

import java.time.Instant;
import java.util.UUID;

public record PaymentCapturedEvent(
        String eventId,
        String type,
        String occurredAt, // ISO-8601 instant string (Instant.now().toString())
        String paymentId,
        String orderId,
        long amount,
        String status
) {
    public static PaymentCapturedEvent of(String paymentId, String orderId, long amount) {
        return new PaymentCapturedEvent(
                UUID.randomUUID().toString(),
                "PaymentCaptured",
                Instant.now().toString(),
                paymentId,
                orderId,
                amount,
                "CAPTURED"
        );
    }
}
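Later sections call OrderPaidEvent.of(orderId, userId) and read orderId(), userId(), and occurredAt() from it, but the record itself is not listed in this installment. As a rough guide, it could mirror PaymentCapturedEvent as sketched below. The field list is an assumption inferred from those call sites, and the real contract in libs/event-contracts may differ.

```java
import java.time.Instant;
import java.util.UUID;

// Assumed shape of the order-paid contract, modeled on PaymentCapturedEvent.
// Only orderId(), userId(), occurredAt() and of(orderId, userId) are
// confirmed by the call sites in this post; the rest is a guess.
record OrderPaidEvent(
        String eventId,
        String type,
        String occurredAt,
        String orderId,
        String userId
) {
    static OrderPaidEvent of(String orderId, String userId) {
        return new OrderPaidEvent(
                UUID.randomUUID().toString(),
                "OrderPaid",               // the value order-query branches on
                Instant.now().toString(),  // ISO-8601, parseable by Instant.parse
                orderId,
                userId
        );
    }
}
```

The "OrderPaid" type string matters most here, because order-query uses it to decide which branch handles the message.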
2) payment-command: publish to payment-events on payment success
Once the payment result is confirmed as CAPTURED, payment-command
publishes a PaymentCapturedEvent to the payment-events topic.
2-1) Publisher
File: services/payment-command/src/main/java/com/ilway/cqrslab/paymentcommand/event/PaymentEventsPublisher.java
package com.ilway.cqrslab.paymentcommand.event;

import com.ilway.cqrslab.contracts.events.PaymentCapturedEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import tools.jackson.databind.ObjectMapper;

@Component
public class PaymentEventsPublisher {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;
    private final String topic;

    public PaymentEventsPublisher(
            KafkaTemplate<String, String> kafkaTemplate,
            ObjectMapper objectMapper,
            @Value("${app.kafka.topics.payment-events:payment-events}") String topic
    ) {
        this.kafkaTemplate = kafkaTemplate;
        this.objectMapper = objectMapper;
        this.topic = topic;
    }

    public void publish(PaymentCapturedEvent event) {
        try {
            // Use orderId as the record key so events for one order share a key.
            String key = event.orderId();
            String value = objectMapper.writeValueAsString(event);
            kafkaTemplate.send(topic, key, value);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to publish PaymentCapturedEvent", e);
        }
    }
}
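One caveat about the publisher above: in recent Spring for Apache Kafka versions, KafkaTemplate.send returns a CompletableFuture, so a delivery failure may surface only on that future and never reach the surrounding try/catch. The sketch below illustrates the idea with plain JDK types. AsyncPublishSketch and its sender function are hypothetical stand-ins, not Spring or Kafka APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

// Hypothetical stand-in for a publisher whose send call is asynchronous.
// "sender" plays the role of the producer: (key, json) -> future ack.
final class AsyncPublishSketch {

    private final BiFunction<String, String, CompletableFuture<String>> sender;

    // Remembers the most recent delivery failure so callers or tests can inspect it.
    final AtomicReference<Throwable> lastFailure = new AtomicReference<>();

    AsyncPublishSketch(BiFunction<String, String, CompletableFuture<String>> sender) {
        this.sender = sender;
    }

    CompletableFuture<String> publish(String key, String json) {
        // The callback runs when the send completes, whether it succeeded or failed.
        return sender.apply(key, json).whenComplete((ack, error) -> {
            if (error != null) {
                lastFailure.set(error); // a real service would log or alert here
            }
        });
    }
}
```

In the real publisher the same pattern would hang off the future returned by kafkaTemplate.send(topic, key, value). Whether to log, retry, or fail the request is a design choice this lab stage leaves open.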
2-2) Service (where the event is published)
File: services/payment-command/src/main/java/com/ilway/cqrslab/paymentcommand/application/PaymentCommandService.java
package com.ilway.cqrslab.paymentcommand.application;

import com.ilway.cqrslab.contracts.events.PaymentCapturedEvent;
import com.ilway.cqrslab.paymentcommand.api.PaymentCommandController.CreatePaymentResponse;
import com.ilway.cqrslab.paymentcommand.event.PaymentEventsPublisher;
import java.util.UUID;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class PaymentCommandService {

    private static final String PAYMENT_PREFIX = "p_";

    private final PaymentEventsPublisher publisher;

    public PaymentCommandService(PaymentEventsPublisher publisher) {
        this.publisher = publisher;
    }

    @Transactional
    public CreatePaymentResponse pay(String orderId, long amount) {
        // 1) Process the payment (lab: always treated as successful)
        String paymentId = PAYMENT_PREFIX + UUID.randomUUID();
        String status = "CAPTURED";

        // 2) Publish the event only after the payment success is confirmed (key point)
        PaymentCapturedEvent event = PaymentCapturedEvent.of(paymentId, orderId, amount);
        publisher.publish(event);

        // 3) Response
        return new CreatePaymentResponse(paymentId, orderId, status);
    }
}
2-3) Controller (delegates to the Service)
File: services/payment-command/src/main/java/com/ilway/cqrslab/paymentcommand/api/PaymentCommandController.java
package com.ilway.cqrslab.paymentcommand.api;

import com.ilway.cqrslab.paymentcommand.application.PaymentCommandService;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/commands/payments")
public class PaymentCommandController {

    private final PaymentCommandService service;

    public PaymentCommandController(PaymentCommandService service) {
        this.service = service;
    }

    @PostMapping
    public CreatePaymentResponse pay(@RequestBody CreatePaymentRequest request) {
        return service.pay(request.orderId(), request.amount());
    }

    public record CreatePaymentRequest(String orderId, long amount) {}

    public record CreatePaymentResponse(String paymentId, String orderId, String status) {}
}
3) order-command: consume payment-events → mark order PAID → publish OrderPaidEvent
order-command consumes the payment success event (PaymentCapturedEvent) and
changes the status of the order stored in MySQL to PAID.
It then publishes an OrderPaidEvent to order-events so the read model can be refreshed.
3-1) Extending OrderEventsPublisher (support for publishing OrderPaidEvent)
File: services/order-command/src/main/java/com/ilway/cqrslab/ordercommand/event/OrderEventsPublisher.java
package com.ilway.cqrslab.ordercommand.event;

import com.ilway.cqrslab.contracts.events.OrderCreatedEvent;
import com.ilway.cqrslab.contracts.events.OrderPaidEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import tools.jackson.databind.ObjectMapper;

@Component
public class OrderEventsPublisher {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;
    private final String topic;

    public OrderEventsPublisher(
            KafkaTemplate<String, String> kafkaTemplate,
            ObjectMapper objectMapper,
            @Value("${app.kafka.topics.order-events:order-events}") String topic
    ) {
        this.kafkaTemplate = kafkaTemplate;
        this.objectMapper = objectMapper;
        this.topic = topic;
    }

    public void publish(OrderCreatedEvent event) {
        try {
            String key = event.orderId();
            String value = objectMapper.writeValueAsString(event);
            kafkaTemplate.send(topic, key, value);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to publish OrderCreatedEvent", e);
        }
    }

    // Same topic and key scheme as OrderCreatedEvent, so both event types share order-events.
    public void publish(OrderPaidEvent event) {
        try {
            String key = event.orderId();
            String value = objectMapper.writeValueAsString(event);
            kafkaTemplate.send(topic, key, value);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to publish OrderPaidEvent", e);
        }
    }
}
3-2) payment-events Consumer (mark order PAID + idempotency)
File: services/order-command/src/main/java/com/ilway/cqrslab/ordercommand/event/PaymentEventsConsumer.java
package com.ilway.cqrslab.ordercommand.event;

import com.ilway.cqrslab.contracts.events.OrderPaidEvent;
import com.ilway.cqrslab.contracts.events.PaymentCapturedEvent;
import com.ilway.cqrslab.ordercommand.order.OrderJpaEntity;
import com.ilway.cqrslab.ordercommand.order.OrderJpaRepository;
import com.ilway.cqrslab.ordercommand.order.OrderStatus;
import java.time.Instant;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import tools.jackson.databind.ObjectMapper;

@Component
public class PaymentEventsConsumer {

    private final ObjectMapper objectMapper;
    private final OrderJpaRepository orderRepo;
    private final OrderEventsPublisher orderEventsPublisher;

    public PaymentEventsConsumer(
            ObjectMapper objectMapper,
            OrderJpaRepository orderRepo,
            OrderEventsPublisher orderEventsPublisher
    ) {
        this.objectMapper = objectMapper;
        this.orderRepo = orderRepo;
        this.orderEventsPublisher = orderEventsPublisher;
    }

    @KafkaListener(
            topics = "${app.kafka.topics.payment-events:payment-events}",
            groupId = "order-command"
    )
    @Transactional
    public void onMessage(String message) {
        try {
            PaymentCapturedEvent event = objectMapper.readValue(message, PaymentCapturedEvent.class);
            OrderJpaEntity order = orderRepo.findById(event.orderId())
                    .orElseThrow(() -> new IllegalStateException("Order not found: " + event.orderId()));

            // Basic idempotency: if the order is already PAID, treat this as a duplicate event and ignore it
            if (order.getStatus() == OrderStatus.PAID) {
                return;
            }

            order.markPaid(Instant.now());
            orderRepo.save(order);

            // Publish the event used to refresh the read model
            OrderPaidEvent paidEvent = OrderPaidEvent.of(order.getOrderId(), order.getUserId());
            orderEventsPublisher.publish(paidEvent);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to consume payment event: " + message, e);
        }
    }
}
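The "ignore if already PAID" guard is easiest to see without Kafka or JPA in the way. Below is a minimal in-memory sketch of the same rule. IdempotentPaidHandler and its members are hypothetical names for illustration, not classes from the project.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory illustration of the state-based idempotency check:
// a second PaymentCaptured for the same order changes nothing.
final class IdempotentPaidHandler {

    enum Status { CREATED, PAID }

    private final Map<String, Status> orders = new HashMap<>();

    // Stands in for the OrderPaidEvent messages sent to order-events.
    final List<String> published = new ArrayList<>();

    void createOrder(String orderId) {
        orders.put(orderId, Status.CREATED);
    }

    /** Returns true if the order transitioned to PAID, false if the event was a duplicate. */
    boolean onPaymentCaptured(String orderId) {
        Status current = orders.get(orderId);
        if (current == null) {
            throw new IllegalStateException("Order not found: " + orderId);
        }
        if (current == Status.PAID) {
            return false; // duplicate delivery: no update, no re-publish
        }
        orders.put(orderId, Status.PAID);
        published.add("OrderPaid:" + orderId);
        return true;
    }

    Status statusOf(String orderId) {
        return orders.get(orderId);
    }
}
```

Delivering the same event twice leaves one PAID order and one published OrderPaid. The guard keys on order state, not on eventId, and it does not cover the gap between the database write and the Kafka publish.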
4) order-query: branch on order-events
The order-events topic now carries a mix of OrderCreated and OrderPaid.
order-query therefore has to branch on the message's type field to consume it safely.
Important: Jackson 3 (tools.jackson) change
asText(default) is deprecated, so replace it with asString(default).
4-1) OrderEventsConsumer (branching on type)
File: services/order-query/src/main/java/com/ilway/cqrslab/orderquery/event/OrderEventsConsumer.java
package com.ilway.cqrslab.orderquery.event;

import com.ilway.cqrslab.contracts.events.OrderCreatedEvent;
import com.ilway.cqrslab.contracts.events.OrderPaidEvent;
import com.ilway.cqrslab.orderquery.readmodel.OrderReadModel;
import com.ilway.cqrslab.orderquery.readmodel.RedisOrderReadModelRepository;
import java.time.Instant;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import tools.jackson.databind.JsonNode;
import tools.jackson.databind.ObjectMapper;

@Component
public class OrderEventsConsumer {

    private final ObjectMapper objectMapper;
    private final RedisOrderReadModelRepository redisRepo;

    public OrderEventsConsumer(ObjectMapper objectMapper, RedisOrderReadModelRepository redisRepo) {
        this.objectMapper = objectMapper;
        this.redisRepo = redisRepo;
    }

    @KafkaListener(
            topics = "${app.kafka.topics.order-events:order-events}",
            groupId = "${spring.kafka.consumer.group-id:order-query}"
    )
    public void onMessage(String message) {
        try {
            // Parse to a tree first so the type can be read before binding to a record.
            JsonNode node = objectMapper.readTree(message);
            // Jackson 3: asText(default) deprecated → asString(default)
            String type = node.path("type").asString("");

            if ("OrderCreated".equals(type)) {
                OrderCreatedEvent event = objectMapper.treeToValue(node, OrderCreatedEvent.class);
                long createdAtEpochMs = Instant.parse(event.occurredAt()).toEpochMilli();
                redisRepo.upsertOrder(new OrderReadModel(
                        event.orderId(),
                        event.userId(),
                        event.status(),
                        createdAtEpochMs
                ));
                return;
            }

            if ("OrderPaid".equals(type)) {
                OrderPaidEvent event = objectMapper.treeToValue(node, OrderPaidEvent.class);
                long occurredAtEpochMs = Instant.parse(event.occurredAt()).toEpochMilli();
                redisRepo.updateOrderStatus(
                        event.orderId(),
                        event.userId(),
                        "PAID",
                        occurredAtEpochMs
                );
                return;
            }
            // unknown type: ignore (lab stage)
        } catch (Exception e) {
            throw new IllegalStateException("Failed to consume order event: " + message, e);
        }
    }
}
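As more event types land on order-events, the if-chain above could be swapped for a handler registry keyed by type. The sketch below shows that alternative with plain JDK types. EventDispatcher is a hypothetical name, and the event is modeled as a Map<String, String> standing in for the parsed JSON node.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Registry-based variant of the type branching: one handler per event type,
// unknown types are counted and skipped instead of failing the consumer.
final class EventDispatcher {

    private final Map<String, Consumer<Map<String, String>>> handlers = new HashMap<>();

    // Number of messages skipped because no handler matched their type.
    int ignored = 0;

    void register(String type, Consumer<Map<String, String>> handler) {
        handlers.put(type, handler);
    }

    /** Returns true if a handler processed the event, false if it was ignored. */
    boolean dispatch(Map<String, String> event) {
        // A missing type falls back to "", mirroring asString("") in the consumer.
        String type = event.getOrDefault("type", "");
        Consumer<Map<String, String>> handler = handlers.get(type);
        if (handler == null) {
            ignored++;
            return false;
        }
        handler.accept(event);
        return true;
    }
}
```

Registering "OrderCreated" and "OrderPaid" handlers reproduces the two branches above. With only two types the explicit if-chain is arguably easier to read, so this is a matter of taste at this stage.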
4-2) Redis Read Model update method
File: services/order-query/src/main/java/com/ilway/cqrslab/orderquery/readmodel/RedisOrderReadModelRepository.java
public void updateOrderStatus(String orderId, String userId, String status, long occurredAtEpochMs) {
    try {
        OrderReadModel existing = findOrder(orderId);
        // Keep the original creation time; fall back to the event time if the order is not cached yet.
        long createdAt = (existing != null) ? existing.createdAtEpochMs() : occurredAtEpochMs;
        OrderReadModel updated = new OrderReadModel(orderId, userId, status, createdAt);
        upsertOrder(updated);
    } catch (Exception e) {
        throw new IllegalStateException("Failed to update order status", e);
    }
}
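The createdAt handling in updateOrderStatus has two cases that are worth seeing side by side. The sketch below replays the same logic against an in-memory map. OrderView and InMemoryOrderViews are hypothetical stand-ins for OrderReadModel and the Redis-backed repository.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the read model entry stored per order.
record OrderView(String orderId, String userId, String status, long createdAtEpochMs) {}

// Map-backed stand-in for the Redis repository, with the same update rule.
final class InMemoryOrderViews {

    private final Map<String, OrderView> store = new HashMap<>();

    void upsert(OrderView view) {
        store.put(view.orderId(), view);
    }

    OrderView find(String orderId) {
        return store.get(orderId); // null when the order has not been seen yet
    }

    void updateStatus(String orderId, String userId, String status, long occurredAtEpochMs) {
        OrderView existing = find(orderId);
        // Case 1: entry exists, so its creation time is preserved.
        // Case 2: entry is missing, so the event time becomes the creation time.
        long createdAt = (existing != null) ? existing.createdAtEpochMs() : occurredAtEpochMs;
        upsert(new OrderView(orderId, userId, status, createdAt));
    }
}
```

Since both events are published with orderId as the key, OrderCreated would normally be consumed before OrderPaid for the same order, so the fallback in case 2 is a safety net and not the expected path.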
5) Test scenario (in exactly this order) + actual results
5-1) 1. Create an order
$ curl -X POST http://localhost:8081/commands/orders \
-H "Content-Type: application/json" \
-d "{\"userId\":\"u_1\"}"
{"orderId":"o_319c31e2-3bee-44e7-99c7-2d1ec470802c","status":"CREATED"}
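5-2) 2. Capture payment with the orderId of a newly created order
(The run below pays a second order, o_a4b5cd7e-54ec-419d-99d2-e3e49f72961c, created after the one shown in 5-1. Both orders appear in the query result in 5-3.)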
$ curl -X POST http://localhost:8082/commands/payments \
-H "Content-Type: application/json" \
-d "{\"orderId\":\"o_a4b5cd7e-54ec-419d-99d2-e3e49f72961c\",\"amount\":42000}"
{"paymentId":"p_3b41d536-5a2e-41fa-bc62-01eb551e63e2","orderId":"o_a4b5cd7e-54ec-419d-99d2-e3e49f72961c","status":"CAPTURED"}
5-3) 3. Confirm in the query that the status has changed to PAID
$ curl "http://localhost:8083/queries/users/u_1/orders?limit=20"
[{"orderId":"o_a4b5cd7e-54ec-419d-99d2-e3e49f72961c","userId":"u_1","status":"PAID","createdAtEpochMs":1769874456452},{"orderId":"o_319c31e2-3bee-44e7-99c7-2d1ec470802c","userId":"u_1","status":"CREATED","createdAtEpochMs":1769874056986}]
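Because the read model is refreshed asynchronously through Kafka, a query issued immediately after the payment call can still return CREATED for a short while. If this scenario is ever automated, a small polling helper like the one below can wait for the status to settle. Await.untilEquals is a hypothetical utility, and the probe would be whatever call fetches the order status from order-query.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Minimal polling helper for eventually consistent reads.
final class Await {

    private Await() {}

    /**
     * Calls probe up to maxAttempts times, pausing between attempts,
     * and returns true as soon as it yields the expected value.
     */
    static boolean untilEquals(Supplier<String> probe, String expected,
                               int maxAttempts, Duration pause) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (expected.equals(probe.get())) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(pause.toMillis());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return false;
                }
            }
        }
        return false; // the value never appeared within the attempt budget
    }
}
```

A call such as Await.untilEquals(() -> fetchStatus(orderId), "PAID", 20, Duration.ofMillis(250)) would then replace a fixed sleep, where fetchStatus is a placeholder for the HTTP call to the query endpoint.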
5-4) Additional check: MySQL order status
mysql> select order_id, user_id, status, updated_at from orders where user_id='u_1' order by updated_at desc limit 5;
+----------------------------------------+---------+---------+----------------------------+
| order_id | user_id | status | updated_at |
+----------------------------------------+---------+---------+----------------------------+
| o_a4b5cd7e-54ec-419d-99d2-e3e49f72961c | u_1 | PAID | 2026-01-31 15:47:36.455951 |
+----------------------------------------+---------+---------+----------------------------+
5-5) Additional check: Redis Read Model
PS C:\Users\Desktop\il-way\lecture\cqrs\infra> docker exec -it cqrs-redis redis-cli keys "order:*"
1) "order:o_a4b5cd7e-54ec-419d-99d2-e3e49f72961c"
PS C:\Users\Desktop\il-way\lecture\cqrs\infra> docker exec -it cqrs-redis redis-cli get "order:o_a4b5cd7e-54ec-419d-99d2-e3e49f72961c"
"{\"orderId\":\"o_a4b5cd7e-54ec-419d-99d2-e3e49f72961c\",\"userId\":\"u_1\",\"status\":\"PAID\",\"createdAtEpochMs\":1769874456452}"
Summary
- payment-command published the payment success event,
- order-command consumed it and updated the MySQL order to PAID,
- republished the result to order-events,
- and order-query's Redis Read Model was refreshed to PAID.
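Wrap-up
With this step, the event chain "payment succeeded → order status updated → read model refreshed" is complete. The next steps can raise production-grade reliability through the Outbox pattern, a DLQ, and stronger idempotency.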