Environment: Spring Boot 4.0.2 · Java 25 · MySQL 8.4.8 · Redis 7.4.7-alpine · Kafka (confluent-local 7.6.0) · IntelliJ · Docker
Key takeaways
- The goal of this part is to wire up the core CQRS flow end to end: write (MySQL) → event (Kafka) → read model (Redis) → query.
- order-command saves the order to MySQL through JPA and publishes an OrderCreatedEvent to Kafka right after the save.
- order-query subscribes to the event as a consumer, updates the read model in Redis, and serves the query API from Redis only.
- The Kafka topic name is not hard-coded; it is managed through the app.kafka.topics.order-events property.
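Table of contents
- 1) The data flow we complete in this part (the CQRS skeleton)
- 2) Event contract: creating OrderCreatedEvent in libs/event-contracts
- 3) order-command: switching to JPA (MySQL) persistence
- 4) order-command: publishing a Kafka event after the save
- 5) order-query: consuming Kafka events → storing the Redis read model
- 6) order-query: switching the query API to Redis
- 7) Verifying the behavior (seeing real CQRS in action)
- Coming up next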
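1) The data flow we complete in this part (the CQRS skeleton)
[order-command]
  1) Receive the create-order request
  2) Save the order to MySQL (JPA)
  3) Publish OrderCreatedEvent (Kafka)
        ↓
[order-query]
  4) Subscribe to the event (consumer)
  5) Store the read model in Redis (single order + per-user order list)
  6) The query API reads from Redis only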
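The six steps above can be sketched without any infrastructure. The following is a minimal in-memory illustration, not the implementation built in this series: a HashMap stands in for MySQL, an ArrayDeque for the Kafka topic, and a second HashMap for Redis. All names in it (CqrsFlowSketch, consumeAll, and so on) are made up for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// In-memory stand-ins only: no MySQL, Kafka, or Redis is involved here.
class CqrsFlowSketch {
    record Order(String orderId, String userId, String status) {}
    record OrderCreated(String orderId, String userId, String status) {}

    private final Map<String, Order> writeStore = new HashMap<>();        // plays MySQL
    private final Queue<OrderCreated> topic = new ArrayDeque<>();         // plays the Kafka topic
    private final Map<String, List<String>> readModel = new HashMap<>();  // plays Redis

    // Command side (steps 1-3): save first, then publish the event.
    void createOrder(String orderId, String userId) {
        writeStore.put(orderId, new Order(orderId, userId, "CREATED"));
        topic.add(new OrderCreated(orderId, userId, "CREATED"));
    }

    // Query side (steps 4-5): drain pending events into the read model.
    void consumeAll() {
        OrderCreated event;
        while ((event = topic.poll()) != null) {
            readModel.computeIfAbsent(event.userId(), k -> new ArrayList<>())
                     .add(event.orderId());
        }
    }

    // Query side (step 6): the read path never touches writeStore.
    List<String> userOrders(String userId) {
        return List.copyOf(readModel.getOrDefault(userId, List.of()));
    }

    public static void main(String[] args) {
        CqrsFlowSketch flow = new CqrsFlowSketch();
        flow.createOrder("o_1", "u_1");
        System.out.println(flow.userOrders("u_1")); // [] (event not consumed yet)
        flow.consumeAll();
        System.out.println(flow.userOrders("u_1")); // [o_1]
    }
}
```

The empty list printed before consumeAll() runs is the same eventual-consistency window a real deployment has between the write and the read model update.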
2) Event contract: creating OrderCreatedEvent in libs/event-contracts
Events are a contract between services, so they live in libs/event-contracts. This forces order-command and order-query to depend on the same event type.
File: cqrs/libs/event-contracts/src/main/java/com/ilway/cqrslab/contracts/events/OrderCreatedEvent.java
package com.ilway.cqrslab.contracts.events;

import java.time.Instant;
import java.util.UUID;

public record OrderCreatedEvent(
    String eventId,     // unique per event (random UUID)
    String type,        // event type name, always "OrderCreated" here
    String occurredAt,  // ISO-8601 UTC timestamp produced by Instant.toString()
    String orderId,
    String userId,
    String status
) {
    // Factory that fills in the envelope fields (eventId, type, occurredAt).
    public static OrderCreatedEvent of(String orderId, String userId, String status) {
        return new OrderCreatedEvent(
            UUID.randomUUID().toString(),
            "OrderCreated",
            Instant.now().toString(),
            orderId,
            userId,
            status
        );
    }
}
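As a quick illustration of what the factory produces, here is a small stand-alone sketch. It uses a local copy of the record (nested in a hypothetical OrderCreatedEventSketch class) so that it compiles without the libs/event-contracts module; the order and user IDs are sample values.

```java
import java.time.Instant;
import java.util.UUID;

class OrderCreatedEventSketch {
    // Local copy of the contract record so the sketch is self-contained.
    record OrderCreatedEvent(String eventId, String type, String occurredAt,
                             String orderId, String userId, String status) {
        static OrderCreatedEvent of(String orderId, String userId, String status) {
            return new OrderCreatedEvent(UUID.randomUUID().toString(), "OrderCreated",
                    Instant.now().toString(), orderId, userId, status);
        }
    }

    public static void main(String[] args) {
        OrderCreatedEvent first = OrderCreatedEvent.of("o_1", "u_1", "CREATED");
        OrderCreatedEvent second = OrderCreatedEvent.of("o_1", "u_1", "CREATED");

        // Each call gets its own eventId, even for the same order.
        System.out.println(first.eventId().equals(second.eventId()));

        // occurredAt is a plain string, but it parses back into an Instant.
        Instant parsed = Instant.parse(first.occurredAt());
        System.out.println(parsed.toString().equals(first.occurredAt()));
    }
}
```

Because eventId is generated per call, a consumer could later use it to recognize a redelivered event, while orderId stays the business key.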
3) order-command: switching to JPA (MySQL) persistence
When a create-order request arrives, the order is saved to MySQL first, which makes MySQL the source of truth for consistency.
3-1) Entity
File: cqrs/services/order-command/src/main/java/com/ilway/cqrslab/ordercommand/domain/OrderEntity.java
package com.ilway.cqrslab.ordercommand.domain;

import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import java.time.Instant;

@Entity
@Table(name = "orders")
public class OrderEntity {

    @Id
    private String orderId;
    private String userId;
    private String status;
    private Instant createdAt;
    private Instant updatedAt;

    protected OrderEntity() {}

    public OrderEntity(String orderId, String userId, String status, Instant createdAt, Instant updatedAt) {
        this.orderId = orderId;
        this.userId = userId;
        this.status = status;
        this.createdAt = createdAt;
        this.updatedAt = updatedAt;
    }

    public String getOrderId() { return orderId; }
    public String getUserId() { return userId; }
    public String getStatus() { return status; }
    public Instant getCreatedAt() { return createdAt; }
    public Instant getUpdatedAt() { return updatedAt; }
}
3-2) Repository
File: cqrs/services/order-command/src/main/java/com/ilway/cqrslab/ordercommand/domain/OrderJpaRepository.java
package com.ilway.cqrslab.ordercommand.domain;

import org.springframework.data.jpa.repository.JpaRepository;

public interface OrderJpaRepository extends JpaRepository<OrderEntity, String> {
}
4) order-command: publishing a Kafka event after the save
4-1) Kafka publisher
The topic name is injected from app.kafka.topics.order-events.
File: cqrs/services/order-command/src/main/java/com/ilway/cqrslab/ordercommand/event/OrderEventsPublisher.java
package com.ilway.cqrslab.ordercommand.event;

import com.ilway.cqrslab.contracts.events.OrderCreatedEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import tools.jackson.databind.ObjectMapper;

@Component
public class OrderEventsPublisher {

    private final KafkaTemplate<String, String> kafka;
    private final ObjectMapper objectMapper;
    private final String topic;

    public OrderEventsPublisher(
        KafkaTemplate<String, String> kafka,
        ObjectMapper objectMapper,
        @Value("${app.kafka.topics.order-events:order-events}") String topic
    ) {
        this.kafka = kafka;
        this.objectMapper = objectMapper;
        this.topic = topic;
    }

    public void publish(OrderCreatedEvent event) {
        try {
            String json = objectMapper.writeValueAsString(event);
            // orderId is the message key, so events for one order land on the same partition.
            // send() is asynchronous: this catch only sees serialization errors and
            // failures thrown synchronously, not failures reported later by the broker.
            kafka.send(topic, event.orderId(), json);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to publish event to Kafka", e);
        }
    }
}
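One detail about the publisher is worth illustrating. In Spring for Apache Kafka 3.x and later, KafkaTemplate.send returns a CompletableFuture, so a delivery failure typically surfaces on that future after publish has already returned, and a surrounding try/catch does not see it. The sketch below shows the pattern with the JDK only: the send method is a hypothetical stand-in for an asynchronous client call, not the Kafka API, and the log list stands in for real logging or alerting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AsyncSendSketch {
    final List<String> log = new ArrayList<>();

    // Hypothetical stand-in for an asynchronous send: a failure is reported
    // through the returned future instead of being thrown.
    static CompletableFuture<String> send(String topic, String key, String value) {
        if (topic.isBlank()) {
            return CompletableFuture.failedFuture(new IllegalStateException("no topic"));
        }
        return CompletableFuture.completedFuture(topic + "/" + key);
    }

    void publish(String topic, String key, String value) {
        try {
            send(topic, key, value).whenComplete((ack, error) -> {
                if (error != null) {
                    log.add("FAILED " + key + ": " + error.getMessage());
                } else {
                    log.add("SENT " + ack);
                }
            });
        } catch (Exception e) {
            // Not reached for failures that arrive through the future.
            log.add("THROWN " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        AsyncSendSketch sketch = new AsyncSendSketch();
        sketch.publish("order-events", "o_1", "{}");
        sketch.publish("", "o_2", "{}");
        sketch.log.forEach(System.out::println);
        // SENT order-events/o_1
        // FAILED o_2: no topic
    }
}
```

Whether to log, retry, or block on the future is a design choice; blocking makes the request slower but lets the caller react to a failed send.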
4-2) Service: wiring up "save, then publish"
File: cqrs/services/order-command/src/main/java/com/ilway/cqrslab/ordercommand/application/OrderCommandService.java
package com.ilway.cqrslab.ordercommand.application;

import com.ilway.cqrslab.contracts.events.OrderCreatedEvent;
import com.ilway.cqrslab.ordercommand.domain.OrderEntity;
import com.ilway.cqrslab.ordercommand.domain.OrderJpaRepository;
import com.ilway.cqrslab.ordercommand.event.OrderEventsPublisher;
import java.time.Instant;
import java.util.UUID;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderCommandService {

    private final OrderJpaRepository orderRepo;
    private final OrderEventsPublisher publisher;

    public OrderCommandService(OrderJpaRepository orderRepo, OrderEventsPublisher publisher) {
        this.orderRepo = orderRepo;
        this.publisher = publisher;
    }

    @Transactional
    public CreatedOrder createOrder(String userId) {
        String orderId = "o_" + UUID.randomUUID();
        Instant now = Instant.now();

        OrderEntity entity = new OrderEntity(
            orderId,
            userId,
            "CREATED",
            now,
            now
        );
        orderRepo.save(entity);

        // Note: publish() runs inside the transaction, before the commit.
        // If the commit fails afterwards, the event has already been sent.
        // The next part addresses this gap with the Outbox pattern.
        OrderCreatedEvent event = OrderCreatedEvent.of(orderId, userId, "CREATED");
        publisher.publish(event);

        return new CreatedOrder(orderId, "CREATED");
    }

    public record CreatedOrder(String orderId, String status) {}
}
4-3) order-command application.yml (revised)
File: cqrs/services/order-command/src/main/resources/application.yml
server:
  port: 8081
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/cqrs_demo?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC
    username: app
    password: app
  jpa:
    hibernate:
      ddl-auto: update
    open-in-view: false
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
app:
  kafka:
    topics:
      order-events: order-events
5) order-query: consuming Kafka events → storing the Redis read model
5-1) Read model
File: cqrs/services/order-query/src/main/java/com/ilway/cqrslab/orderquery/readmodel/OrderReadModel.java
package com.ilway.cqrslab.orderquery.readmodel;

public record OrderReadModel(
    String orderId,
    String userId,
    String status,
    long createdAtEpochMs
) {
}
5-2) Redis repository
File: cqrs/services/order-query/src/main/java/com/ilway/cqrslab/orderquery/readmodel/RedisOrderReadModelRepository.java
package com.ilway.cqrslab.orderquery.readmodel;

import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import tools.jackson.databind.ObjectMapper;

@Component
public class RedisOrderReadModelRepository {

    private final StringRedisTemplate redis;
    private final ObjectMapper objectMapper;

    public RedisOrderReadModelRepository(ObjectMapper objectMapper, StringRedisTemplate redis) {
        this.objectMapper = objectMapper;
        this.redis = redis;
    }

    public void upsertOrder(OrderReadModel model) {
        try {
            // Single order: order:{orderId} -> JSON
            String key = orderKey(model.orderId());
            String value = objectMapper.writeValueAsString(model);
            redis.opsForValue().set(key, value);

            // Per-user list: sorted set of order IDs scored by creation time
            redis.opsForZSet().add(
                userOrdersKey(model.userId()),
                model.orderId(),
                model.createdAtEpochMs()
            );
        } catch (Exception e) {
            throw new IllegalStateException("Failed to upsert order read model", e);
        }
    }

    public List<OrderReadModel> findUserOrders(String userId, int limit) {
        // A limit of 0 would become the range 0..-1, which Redis reads as "everything".
        if (limit <= 0) return List.of();
        try {
            // Newest first: highest score (latest createdAt) comes first.
            Set<String> orderIds = redis.opsForZSet().reverseRange(userOrdersKey(userId), 0, limit - 1);
            if (orderIds == null || orderIds.isEmpty()) return List.of();

            List<String> keys = orderIds.stream()
                .map(RedisOrderReadModelRepository::orderKey)
                .toList();

            List<String> jsons = redis.opsForValue().multiGet(keys);
            if (jsons == null) return List.of();

            return jsons.stream()
                .filter(Objects::nonNull)
                .map(json -> objectMapper.readValue(json, OrderReadModel.class))
                .toList();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to read user orders", e);
        }
    }

    private static String orderKey(String orderId) {
        return "order:" + orderId;
    }

    private static String userOrdersKey(String userId) {
        return "user:" + userId + ":orders";
    }
}
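To make the key layout concrete, here is a minimal in-memory sketch of the two structures the repository writes: one value per order:{orderId} and one sorted set per user:{userId}:orders, scored by creation time. Plain Java maps stand in for Redis, and the class name ReadModelLayoutSketch is made up for this illustration. It also ignores how Redis orders members with equal scores.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ReadModelLayoutSketch {
    record OrderReadModel(String orderId, String userId, String status, long createdAtEpochMs) {}

    // order:{orderId} -> read model (Redis stores JSON; the record is kept directly here)
    private final Map<String, OrderReadModel> values = new HashMap<>();
    // user:{userId}:orders -> (orderId -> score), a stand-in for a sorted set
    private final Map<String, Map<String, Long>> sortedSets = new HashMap<>();

    void upsertOrder(OrderReadModel model) {
        values.put("order:" + model.orderId(), model);
        // Like ZADD, putting an existing member only updates its score.
        sortedSets.computeIfAbsent("user:" + model.userId() + ":orders", k -> new HashMap<>())
                  .put(model.orderId(), model.createdAtEpochMs());
    }

    List<OrderReadModel> findUserOrders(String userId, int limit) {
        if (limit <= 0) return List.of();
        Map<String, Long> members = sortedSets.getOrDefault("user:" + userId + ":orders", Map.of());
        return members.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed()) // newest first
                .limit(limit)
                .map(entry -> values.get("order:" + entry.getKey()))
                .toList();
    }

    public static void main(String[] args) {
        ReadModelLayoutSketch repo = new ReadModelLayoutSketch();
        repo.upsertOrder(new OrderReadModel("o_1", "u_1", "CREATED", 1_000L));
        repo.upsertOrder(new OrderReadModel("o_2", "u_1", "CREATED", 2_000L));
        repo.upsertOrder(new OrderReadModel("o_1", "u_1", "CREATED", 1_000L)); // replayed event

        repo.findUserOrders("u_1", 10).forEach(order -> System.out.println(order.orderId()));
        // o_2
        // o_1
    }
}
```

Applying the same model twice leaves the state unchanged, which is why replaying an OrderCreatedEvent does not create a duplicate entry in the per-user list.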
5-3) Kafka consumer
File: cqrs/services/order-query/src/main/java/com/ilway/cqrslab/orderquery/event/OrderEventsConsumer.java
package com.ilway.cqrslab.orderquery.event;

import com.ilway.cqrslab.contracts.events.OrderCreatedEvent;
import com.ilway.cqrslab.orderquery.readmodel.OrderReadModel;
import com.ilway.cqrslab.orderquery.readmodel.RedisOrderReadModelRepository;
import java.time.Instant;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import tools.jackson.databind.ObjectMapper;

@Component
public class OrderEventsConsumer {

    private final ObjectMapper objectMapper;
    private final RedisOrderReadModelRepository redisRepo;

    public OrderEventsConsumer(ObjectMapper objectMapper, RedisOrderReadModelRepository redisRepo) {
        this.objectMapper = objectMapper;
        this.redisRepo = redisRepo;
    }

    @KafkaListener(
        topics = "${app.kafka.topics.order-events:order-events}",
        groupId = "${spring.kafka.consumer.group-id:order-query}"
    )
    public void onMessage(String message) {
        try {
            OrderCreatedEvent event = objectMapper.readValue(message, OrderCreatedEvent.class);
            long createdAtEpochMs = Instant.parse(event.occurredAt()).toEpochMilli();

            redisRepo.upsertOrder(new OrderReadModel(
                event.orderId(),
                event.userId(),
                event.status(),
                createdAtEpochMs
            ));
        } catch (Exception e) {
            throw new IllegalStateException("Failed to consume order event: " + message, e);
        }
    }
}
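The heart of the consumer is the mapping from event to read model, in particular turning the ISO-8601 occurredAt string into the epoch-millisecond score used by the sorted set. The sketch below isolates that step with local copies of both records; the class name EventToReadModelSketch and the sample values are made up for illustration. One consequence to be aware of: the read model's timestamp comes from the event's occurredAt, which is taken by a separate Instant.now() call from the one that fills created_at in MySQL, so the two values may differ slightly.

```java
import java.time.Instant;

class EventToReadModelSketch {
    // Local copies of the two records so the sketch compiles on its own.
    record OrderCreatedEvent(String eventId, String type, String occurredAt,
                             String orderId, String userId, String status) {}
    record OrderReadModel(String orderId, String userId, String status, long createdAtEpochMs) {}

    // Pure mapping step: no Kafka or Redis involved.
    static OrderReadModel toReadModel(OrderCreatedEvent event) {
        long createdAtEpochMs = Instant.parse(event.occurredAt()).toEpochMilli();
        return new OrderReadModel(event.orderId(), event.userId(), event.status(), createdAtEpochMs);
    }

    public static void main(String[] args) {
        OrderCreatedEvent event = new OrderCreatedEvent(
                "e_1", "OrderCreated", "2025-01-01T00:00:00Z", "o_1", "u_1", "CREATED");

        OrderReadModel model = toReadModel(event);
        System.out.println(model.createdAtEpochMs()); // 1735689600000

        // Mapping the same event again yields an equal read model.
        System.out.println(model.equals(toReadModel(event))); // true
    }
}
```

Keeping the mapping free of I/O like this also makes it easy to unit test separately from the listener.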
5-4) order-query application.yml (revised)
File: cqrs/services/order-query/src/main/resources/application.yml
server:
  port: 8083
spring:
  data:
    redis:
      host: localhost
      port: 6379
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: order-query
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
app:
  kafka:
    topics:
      order-events: order-events
6) order-query: switching the query API to Redis
The query API no longer looks at MySQL; it reads only the Redis read model.
File: cqrs/services/order-query/src/main/java/com/ilway/cqrslab/orderquery/api/OrderQueryController.java
package com.ilway.cqrslab.orderquery.api;

import com.ilway.cqrslab.orderquery.readmodel.OrderReadModel;
import com.ilway.cqrslab.orderquery.readmodel.RedisOrderReadModelRepository;
import java.util.List;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/queries")
public class OrderQueryController {

    private final RedisOrderReadModelRepository redisRepo;

    public OrderQueryController(RedisOrderReadModelRepository redisRepo) {
        this.redisRepo = redisRepo;
    }

    @GetMapping("/users/{userId}/orders")
    public List<OrderReadModel> userOrders(
        @PathVariable String userId,
        @RequestParam(defaultValue = "20") int limit
    ) {
        return redisRepo.findUserOrders(userId, limit);
    }
}
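The limit parameter comes straight from the query string. Redis treats negative range indexes as offsets from the end of the sorted set, so a stop index computed as limit - 1 with limit=0 becomes -1 and covers the whole set, and a very large limit is just as unbounded. One possible guard, shown here as a sketch, is to clamp the value at the controller boundary. The helper name clampLimit and the cap of 100 are assumptions for illustration, not part of the article's code.

```java
class LimitSketch {
    // Hypothetical upper bound; pick whatever fits the API.
    static final int MAX_LIMIT = 100;

    // Keeps the requested page size within 1..MAX_LIMIT.
    static int clampLimit(int requested) {
        return Math.max(1, Math.min(requested, MAX_LIMIT));
    }

    public static void main(String[] args) {
        System.out.println(clampLimit(0));       // 1
        System.out.println(clampLimit(20));      // 20
        System.out.println(clampLimit(100_000)); // 100
    }
}
```

Rejecting out-of-range values with a 400 response would be an equally reasonable choice; clamping simply keeps the endpoint forgiving.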
7) Verifying the behavior (seeing real CQRS in action)
7-1) Console consumer for inspecting Kafka messages
In a new terminal:
docker exec -it cqrs-kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic order-events \
  --from-beginning
7-2) Create an order → saved to MySQL + published to Kafka + reflected in Redis
curl -X POST http://localhost:8081/commands/orders \
  -H "Content-Type: application/json" \
  -d "{\"userId\":\"u_1\"}"
7-3) Check that the order landed in MySQL
docker exec -it cqrs-mysql mysql -uapp -papp -D cqrs_demo \
  -e "select order_id, user_id, status, created_at from orders order by created_at desc limit 5;"
Check that the read model was created in Redis
docker exec -it cqrs-redis redis-cli keys "order:*"
docker exec -it cqrs-redis redis-cli zrevrange "user:u_1:orders" 0 10
7-4) Order history is now served from Redis only
curl "http://localhost:8083/queries/users/u_1/orders?limit=20"
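Coming up next
The next part covers the points you inevitably run into in production. Operational concerns such as Outbox (atomicity), idempotency (duplicate events), DLQ, and reprocessing strategies are added step by step to turn this CQRS setup into something production-grade.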