CQRS Hands-On: Building an Order/Payment Domain (Part 3)

Environment: Spring Boot 4.0.2 · Java 25 · MySQL 8.4.8 · Redis 7.4.7-alpine · Kafka (confluent-local 7.6.0) · IntelliJ · Docker

Key takeaways

  • The goal of this part is to wire up the core CQRS flow end to end: write (MySQL) → event (Kafka) → read model (Redis) → query.
  • order-command persists the order to MySQL through JPA and publishes an OrderCreatedEvent to Kafka right after saving.
  • order-query subscribes to the events as a consumer, updates the read model in Redis, and serves the query API from Redis only.
  • The Kafka topic name is not hard-coded; it is managed through the app.kafka.topics.order-events property.

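1) The data flow completed in this part (the CQRS skeleton)


[order-command]
  1) Receive the create-order request
  2) Save the order to MySQL (JPA)
  3) Publish OrderCreatedEvent (Kafka)
        ↓
[order-query]
  4) Subscribe to the event (consumer)
  5) Store the read model in Redis (single order + per-user order list)
  6) The query API reads from Redis only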
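
The six steps above can be sketched without any infrastructure. The following is a minimal in-memory model, not the article's implementation: a Map stands in for MySQL, a Queue for the Kafka topic, and two Maps for the Redis read model. All class and method names here (CqrsFlowSketch, project, findUserOrderIds) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// In-memory stand-ins only: Map = MySQL, Queue = Kafka topic, Maps = Redis read model.
final class CqrsFlowSketch {

  record OrderCreated(String orderId, String userId, String status) {}

  private final Map<String, String> writeStore = new HashMap<>();           // orderId -> status
  private final Queue<OrderCreated> topic = new ArrayDeque<>();             // "order-events"
  private final Map<String, OrderCreated> ordersById = new HashMap<>();     // single-order view
  private final Map<String, List<String>> orderIdsByUser = new HashMap<>(); // per-user list

  // Steps 1-3: accept the command, write to the source of truth, publish the event.
  void createOrder(String orderId, String userId) {
    writeStore.put(orderId, "CREATED");
    topic.add(new OrderCreated(orderId, userId, "CREATED"));
  }

  // Steps 4-5: drain pending events and project them into the read model.
  int project() {
    int applied = 0;
    OrderCreated event;
    while ((event = topic.poll()) != null) {
      ordersById.put(event.orderId(), event);
      orderIdsByUser.computeIfAbsent(event.userId(), k -> new ArrayList<>()).add(event.orderId());
      applied++;
    }
    return applied;
  }

  // Step 6: the query side touches only the read model, never the write store.
  List<String> findUserOrderIds(String userId) {
    return List.copyOf(orderIdsByUser.getOrDefault(userId, List.of()));
  }
}
```

Calling createOrder and then findUserOrderIds before project() returns an empty list, which mirrors the short window in which the write side is ahead of the read side.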

2) Event contract: creating OrderCreatedEvent in libs/event-contracts

Events are a contract between services, so the event type lives in libs/event-contracts. This forces order-command and order-query to depend on the same event type.

File location: cqrs/libs/event-contracts/src/main/java/com/ilway/cqrslab/contracts/events/OrderCreatedEvent.java


package com.ilway.cqrslab.contracts.events;

import java.time.Instant;
import java.util.UUID;

public record OrderCreatedEvent(
  String eventId,
  String type,
  String occurredAt,
  String orderId,
  String userId,
  String status
) {

  public static OrderCreatedEvent of(String orderId, String userId, String status) {
    return new OrderCreatedEvent(
      UUID.randomUUID().toString(),
      "OrderCreated",
      Instant.now().toString(),
      orderId,
      userId,
      status
    );
  }
}
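
Because eventId and occurredAt travel as plain strings, it can help to check that they still parse on the receiving side. The sketch below assumes a local copy of the record (nested so the example compiles on its own) and a hypothetical helper, isWellFormed, that is not part of the article's code.

```java
import java.time.Instant;
import java.util.UUID;

final class EventContractCheck {

  // Local copy of the article's record, nested here only to keep the sketch self-contained.
  record OrderCreatedEvent(String eventId, String type, String occurredAt,
                           String orderId, String userId, String status) {
    static OrderCreatedEvent of(String orderId, String userId, String status) {
      return new OrderCreatedEvent(UUID.randomUUID().toString(), "OrderCreated",
          Instant.now().toString(), orderId, userId, status);
    }
  }

  // Hypothetical helper: true when the string fields hold what the contract promises.
  static boolean isWellFormed(OrderCreatedEvent e) {
    try {
      UUID.fromString(e.eventId());   // eventId must be a UUID
      Instant.parse(e.occurredAt());  // occurredAt must be an ISO-8601 instant
      return "OrderCreated".equals(e.type()) && e.orderId() != null && e.userId() != null;
    } catch (RuntimeException ex) {
      // IllegalArgumentException, DateTimeParseException, or NullPointerException
      return false;
    }
  }
}
```

An event built with OrderCreatedEvent.of(...) passes the check, while one with a hand-written occurredAt such as "yesterday" does not.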

3) order-command: switching to JPA (MySQL) persistence

When a create-order request arrives, the order is saved to MySQL first, which makes MySQL the source of truth for consistency.

3-1) Entity

File location: cqrs/services/order-command/src/main/java/com/ilway/cqrslab/ordercommand/domain/OrderEntity.java


package com.ilway.cqrslab.ordercommand.domain;

import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import java.time.Instant;

@Entity
@Table(name = "orders")
public class OrderEntity {

  @Id
  private String orderId;

  private String userId;

  private String status;

  private Instant createdAt;

  private Instant updatedAt;

  protected OrderEntity() {}

  public OrderEntity(String orderId, String userId, String status, Instant createdAt, Instant updatedAt) {
    this.orderId = orderId;
    this.userId = userId;
    this.status = status;
    this.createdAt = createdAt;
    this.updatedAt = updatedAt;
  }

  public String getOrderId() { return orderId; }
  public String getUserId() { return userId; }
  public String getStatus() { return status; }
  public Instant getCreatedAt() { return createdAt; }
  public Instant getUpdatedAt() { return updatedAt; }
}

3-2) Repository

File location: cqrs/services/order-command/src/main/java/com/ilway/cqrslab/ordercommand/domain/OrderJpaRepository.java


package com.ilway.cqrslab.ordercommand.domain;

import org.springframework.data.jpa.repository.JpaRepository;

public interface OrderJpaRepository extends JpaRepository<OrderEntity, String> {
}

4) order-command: publishing a Kafka event after saving

4-1) Kafka Publisher

The topic name is injected from app.kafka.topics.order-events.

File location: cqrs/services/order-command/src/main/java/com/ilway/cqrslab/ordercommand/event/OrderEventsPublisher.java


package com.ilway.cqrslab.ordercommand.event;

import com.ilway.cqrslab.contracts.events.OrderCreatedEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import tools.jackson.databind.ObjectMapper;

@Component
public class OrderEventsPublisher {

  private final KafkaTemplate<String, String> kafka;
  private final ObjectMapper objectMapper;
  private final String topic;

  public OrderEventsPublisher(
    KafkaTemplate<String, String> kafka,
    ObjectMapper objectMapper,
    @Value("${app.kafka.topics.order-events:order-events}") String topic
  ) {
    this.kafka = kafka;
    this.objectMapper = objectMapper;
    this.topic = topic;
  }

  public void publish(OrderCreatedEvent event) {
    try {
      String json = objectMapper.writeValueAsString(event);
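      // Keying by orderId routes every event for the same order to the same partition, which preserves their order.
      kafka.send(topic, event.orderId(), json);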
    } catch (Exception e) {
      throw new IllegalStateException("Failed to publish event to Kafka", e);
    }
  }
}
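
One thing worth keeping in mind about the publisher: in recent Spring for Apache Kafka versions, KafkaTemplate.send returns a CompletableFuture, so a failed delivery typically surfaces on that future rather than in the surrounding try/catch. The sketch below shows the idea with a hypothetical Sender interface standing in for the template, so it runs with the JDK alone; it is an illustration of the pattern, not the article's publisher.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class AsyncSendSketch {

  // Hypothetical stand-in for an asynchronous send such as KafkaTemplate.send(topic, key, value).
  interface Sender {
    CompletableFuture<String> send(String topic, String key, String value);
  }

  // Holds the most recent asynchronous failure so callers (or tests) can inspect it.
  final AtomicReference<Throwable> lastFailure = new AtomicReference<>();
  private final Sender sender;

  AsyncSendSketch(Sender sender) {
    this.sender = sender;
  }

  void publish(String topic, String key, String json) {
    // The call itself returns normally; the outcome arrives through the future.
    sender.send(topic, key, json).whenComplete((result, error) -> {
      if (error != null) {
        lastFailure.set(error); // a real service would log, retry, or alert here
      }
    });
  }
}
```

With a sender that returns a failed future, publish(...) does not throw, and the error is visible only in the whenComplete callback.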

4-2) Service: wiring up "save, then publish"

File location: cqrs/services/order-command/src/main/java/com/ilway/cqrslab/ordercommand/application/OrderCommandService.java


package com.ilway.cqrslab.ordercommand.application;

import com.ilway.cqrslab.contracts.events.OrderCreatedEvent;
import com.ilway.cqrslab.ordercommand.domain.OrderEntity;
import com.ilway.cqrslab.ordercommand.domain.OrderJpaRepository;
import com.ilway.cqrslab.ordercommand.event.OrderEventsPublisher;
import java.time.Instant;
import java.util.UUID;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderCommandService {

  private final OrderJpaRepository orderRepo;
  private final OrderEventsPublisher publisher;

  public OrderCommandService(OrderJpaRepository orderRepo, OrderEventsPublisher publisher) {
    this.orderRepo = orderRepo;
    this.publisher = publisher;
  }

  @Transactional
  public CreatedOrder createOrder(String userId) {
    String orderId = "o_" + UUID.randomUUID();
    Instant now = Instant.now();

    OrderEntity entity = new OrderEntity(
      orderId,
      userId,
      "CREATED",
      now,
      now
    );

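    orderRepo.save(entity);

    // Note: this publishes inside the transaction, before the commit. If the commit fails afterwards,
    // the event is already in Kafka. The Outbox pattern covered in the next part addresses this gap.
    OrderCreatedEvent event = OrderCreatedEvent.of(orderId, userId, "CREATED");
    publisher.publish(event);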

    return new CreatedOrder(orderId, "CREATED");
  }

  public record CreatedOrder(String orderId, String status) {}
}

4-3) order-command application.yml (revised)

File location: cqrs/services/order-command/src/main/resources/application.yml


server:
  port: 8081

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/cqrs_demo?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC
    username: app
    password: app

  jpa:
    hibernate:
      ddl-auto: update
    open-in-view: false

  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

app:
  kafka:
    topics:
      order-events: order-events
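
The publisher reads this property through the expression ${app.kafka.topics.order-events:order-events}, where the part after the colon is the fallback used when the property is missing. The following is a simplified illustration of that lookup rule only. PlaceholderSketch and resolve are hypothetical names, and the real Spring resolver handles many more cases (nesting, escaping, multiple property sources).

```java
import java.util.Map;

final class PlaceholderSketch {

  // Resolves one "${key}" or "${key:default}" expression against the given properties.
  static String resolve(String expression, Map<String, String> properties) {
    if (!expression.startsWith("${") || !expression.endsWith("}")) {
      return expression; // not a placeholder, treat as a literal
    }
    String body = expression.substring(2, expression.length() - 1);
    int colon = body.indexOf(':');
    String key = colon < 0 ? body : body.substring(0, colon);
    String fallback = colon < 0 ? null : body.substring(colon + 1);

    String value = properties.get(key);
    if (value != null) {
      return value;    // configured value wins
    }
    if (fallback != null) {
      return fallback; // otherwise use the default after the colon
    }
    throw new IllegalArgumentException("Could not resolve placeholder '" + key + "'");
  }
}
```

With an empty property map the expression resolves to order-events; once app.kafka.topics.order-events is set, the configured value is used instead.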

5) order-query: consuming Kafka events → storing the Redis read model

5-1) Read Model

File location: cqrs/services/order-query/src/main/java/com/ilway/cqrslab/orderquery/readmodel/OrderReadModel.java


package com.ilway.cqrslab.orderquery.readmodel;

public record OrderReadModel(
  String orderId,
  String userId,
  String status,
  long createdAtEpochMs
) {
}

5-2) Redis repository

File location: cqrs/services/order-query/src/main/java/com/ilway/cqrslab/orderquery/readmodel/RedisOrderReadModelRepository.java


package com.ilway.cqrslab.orderquery.readmodel;

import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import tools.jackson.databind.ObjectMapper;

@Component
public class RedisOrderReadModelRepository {

  private final StringRedisTemplate redis;
  private final ObjectMapper objectMapper;

  public RedisOrderReadModelRepository(ObjectMapper objectMapper, StringRedisTemplate redis) {
    this.objectMapper = objectMapper;
    this.redis = redis;
  }

  public void upsertOrder(OrderReadModel model) {
    try {
      String key = orderKey(model.orderId());
      String value = objectMapper.writeValueAsString(model);
      redis.opsForValue().set(key, value);

      redis.opsForZSet().add(
        userOrdersKey(model.userId()),
        model.orderId(),
        model.createdAtEpochMs()
      );
    } catch (Exception e) {
      throw new IllegalStateException("Failed to upsert order read model", e);
    }
  }

  public List<OrderReadModel> findUserOrders(String userId, int limit) {
    try {
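      // A stop index of -1 means "last element" in Redis, so limit <= 0 would otherwise return the whole set.
      if (limit <= 0) return List.of();
      Set<String> orderIds = redis.opsForZSet().reverseRange(userOrdersKey(userId), 0, limit - 1);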
      if (orderIds == null || orderIds.isEmpty()) return List.of();

      List<String> keys = orderIds.stream()
        .map(RedisOrderReadModelRepository::orderKey)
        .toList();

      List<String> jsons = redis.opsForValue().multiGet(keys);
      if (jsons == null) return List.of();

      return jsons.stream()
        .filter(Objects::nonNull)
        .map(json -> objectMapper.readValue(json, OrderReadModel.class))
        .toList();
    } catch (Exception e) {
      throw new IllegalStateException("Failed to read user orders", e);
    }
  }

  private static String orderKey(String orderId) {
    return "order:" + orderId;
  }

  private static String userOrdersKey(String userId) {
    return "user:" + userId + ":orders";
  }
}
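
The per-user list relies on two sorted-set behaviors: adding an existing member only updates its score, and a reverse range returns the highest scores first. Here is a small in-memory model of those two behaviors, useful for reasoning about the repository without a Redis instance. UserOrdersIndexSketch is a hypothetical name, and this is an approximation of Redis semantics rather than a replacement for them.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class UserOrdersIndexSketch {

  private final Map<String, Long> scoreByOrderId = new HashMap<>();

  // Like ZADD: a repeated member keeps a single entry and takes the new score.
  void add(String orderId, long createdAtEpochMs) {
    scoreByOrderId.put(orderId, createdAtEpochMs);
  }

  // Like ZREVRANGE key 0 (limit - 1): highest score first, at most `limit` members.
  List<String> newestFirst(int limit) {
    if (limit <= 0) {
      return List.of(); // avoid the "0 .. -1 means everything" range
    }
    Comparator<Map.Entry<String, Long>> byScoreDescending = (a, b) -> {
      int byScore = Long.compare(b.getValue(), a.getValue());
      // Ties fall back to reverse lexicographic member order.
      return byScore != 0 ? byScore : b.getKey().compareTo(a.getKey());
    };
    return scoreByOrderId.entrySet().stream()
        .sorted(byScoreDescending)
        .limit(limit)
        .map(Map.Entry::getKey)
        .toList();
  }
}
```

Because add overwrites the score for an existing member, replaying the same event leaves the list with one entry per order, which is what makes the upsert safe to repeat.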

5-3) Kafka Consumer

File location: cqrs/services/order-query/src/main/java/com/ilway/cqrslab/orderquery/event/OrderEventsConsumer.java


package com.ilway.cqrslab.orderquery.event;

import com.ilway.cqrslab.contracts.events.OrderCreatedEvent;
import com.ilway.cqrslab.orderquery.readmodel.OrderReadModel;
import com.ilway.cqrslab.orderquery.readmodel.RedisOrderReadModelRepository;
import java.time.Instant;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import tools.jackson.databind.ObjectMapper;

@Component
public class OrderEventsConsumer {

  private final ObjectMapper objectMapper;
  private final RedisOrderReadModelRepository redisRepo;

  public OrderEventsConsumer(ObjectMapper objectMapper, RedisOrderReadModelRepository redisRepo) {
    this.objectMapper = objectMapper;
    this.redisRepo = redisRepo;
  }

  @KafkaListener(
    topics = "${app.kafka.topics.order-events:order-events}",
    groupId = "${spring.kafka.consumer.group-id:order-query}"
  )
  public void onMessage(String message) {
    try {
      OrderCreatedEvent event = objectMapper.readValue(message, OrderCreatedEvent.class);
      long createdAtEpochMs = Instant.parse(event.occurredAt()).toEpochMilli();

      redisRepo.upsertOrder(new OrderReadModel(
        event.orderId(),
        event.userId(),
        event.status(),
        createdAtEpochMs
      ));
    } catch (Exception e) {
      throw new IllegalStateException("Failed to consume order event: " + message, e);
    }
  }
}
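
The heart of the consumer is the mapping from event to read model, including the conversion of occurredAt into epoch milliseconds for the sorted-set score. Pulling that mapping into a pure function makes it testable without Kafka or Redis. The sketch below assumes local copies of both records and a hypothetical ProjectionSketch.toReadModel helper.

```java
import java.time.Instant;

final class ProjectionSketch {

  // Local copies of the article's records so the sketch compiles on its own.
  record OrderCreatedEvent(String eventId, String type, String occurredAt,
                           String orderId, String userId, String status) {}

  record OrderReadModel(String orderId, String userId, String status, long createdAtEpochMs) {}

  // Pure mapping: the same event always produces the same read model.
  static OrderReadModel toReadModel(OrderCreatedEvent event) {
    long createdAtEpochMs = Instant.parse(event.occurredAt()).toEpochMilli();
    return new OrderReadModel(event.orderId(), event.userId(), event.status(), createdAtEpochMs);
  }
}
```

For example, an event with occurredAt "2025-01-01T00:00:00Z" maps to createdAtEpochMs 1735689600000.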

5-4) order-query application.yml (revised)

File location: cqrs/services/order-query/src/main/resources/application.yml



server:
  port: 8083

spring:
  data:
    redis:
      host: localhost
      port: 6379

  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: order-query
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

app:
  kafka:
    topics:
      order-events: order-events

6) order-query: switching the query API to Redis

The query API no longer looks at MySQL; it reads only the Redis read model.

File location: cqrs/services/order-query/src/main/java/com/ilway/cqrslab/orderquery/api/OrderQueryController.java


package com.ilway.cqrslab.orderquery.api;

import com.ilway.cqrslab.orderquery.readmodel.OrderReadModel;
import com.ilway.cqrslab.orderquery.readmodel.RedisOrderReadModelRepository;
import java.util.List;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/queries")
public class OrderQueryController {

  private final RedisOrderReadModelRepository redisRepo;

  public OrderQueryController(RedisOrderReadModelRepository redisRepo) {
    this.redisRepo = redisRepo;
  }

  @GetMapping("/users/{userId}/orders")
  public List<OrderReadModel> userOrders(
    @PathVariable String userId,
    @RequestParam(defaultValue = "20") int limit
  ) {
    return redisRepo.findUserOrders(userId, limit);
  }
}
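
Since limit comes straight from the query string, the controller could normalize it before calling the repository. This is one possible approach, sketched with a hypothetical LimitSketch.normalize helper and a hypothetical upper bound of 100; neither appears in the article's code.

```java
final class LimitSketch {

  static final int MIN_LIMIT = 1;
  static final int MAX_LIMIT = 100; // hypothetical cap, chosen only for illustration

  // Clamps the requested page size into [MIN_LIMIT, MAX_LIMIT].
  static int normalize(int requested) {
    return Math.max(MIN_LIMIT, Math.min(requested, MAX_LIMIT));
  }
}
```

With this in place, a request such as ?limit=100000 would read at most 100 entries from Redis, and ?limit=0 would read one.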

7) Verifying the behavior (seeing real CQRS in action)

7-1) Console consumer for inspecting Kafka messages

In a new terminal:


docker exec -it cqrs-kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic order-events \
  --from-beginning

7-2) Create an order → saved to MySQL + published to Kafka + reflected in Redis


curl -X POST http://localhost:8081/commands/orders \
  -H "Content-Type: application/json" \
  -d "{\"userId\":\"u_1\"}"

7-3) Check that the order landed in MySQL


docker exec -it cqrs-mysql mysql -uapp -papp -D cqrs_demo \
  -e "select order_id, user_id, status, created_at from orders order by created_at desc limit 5;"

7-4) Check that the read model was created in Redis


docker exec -it cqrs-redis redis-cli keys "order:*"
docker exec -it cqrs-redis redis-cli zrevrange "user:u_1:orders" 0 10

7-5) Order history is now served from Redis only


curl "http://localhost:8083/queries/users/u_1/orders?limit=20"
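
Because the read model is updated asynchronously by the consumer, a query sent immediately after the command may briefly return an empty list. In a script or test, one way to handle this is to poll for a short time. The helper below is a rough sketch with hypothetical names (EventuallySketch, pollUntilNonEmpty); the query itself is passed in as a Supplier, so the HTTP call is left to the caller.

```java
import java.util.List;
import java.util.function.Supplier;

final class EventuallySketch {

  // Calls `query` up to maxAttempts times, sleeping between attempts,
  // and returns the first non-empty result (or an empty list if none arrives).
  static <T> List<T> pollUntilNonEmpty(Supplier<List<T>> query, int maxAttempts, long sleepMs) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      List<T> result = query.get();
      if (!result.isEmpty()) {
        return result;
      }
      if (attempt < maxAttempts) {
        try {
          Thread.sleep(sleepMs);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the interrupt and stop waiting
          return List.of();
        }
      }
    }
    return List.of();
  }
}
```

In a test, the supplier would wrap the GET request shown above and parse the response into a list.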

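Coming up next

The next part covers the points you inevitably run into in production. Operational pieces such as the Outbox pattern (atomicity), idempotency (duplicate events), a DLQ, and reprocessing strategies are added step by step to make this CQRS setup production-grade.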
