SW Maestro · Kafka 101
Understand in 15 minutes where event streaming fits in a company or project, and what code and structure connect it.
Problem
One Sentence
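Event: a fact that has already happened, such as "an order was created" or "a payment completed".
Kafka: stores events in order, while each Consumer tracks the position it has read up to.
When to use it: events keep arriving, and several services need to process them in near real time.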
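The idea of "store events in order, let each reader track its own position" can be sketched with a toy in-memory log. This is only an illustration: `EventLogSketch`, `append`, and `poll` are hypothetical names and not part of the Kafka client API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of one ordered log; not the Kafka client API. */
class EventLogSketch {
    private final List<String> records = new ArrayList<>();       // append-only, ordered
    private final Map<String, Integer> offsets = new HashMap<>(); // next offset to read, per reader

    /** Appends an event and returns the offset it was stored at. */
    int append(String event) {
        records.add(event);
        return records.size() - 1;
    }

    /** Returns the next unread event for this reader, or null when it has caught up. */
    String poll(String readerId) {
        int next = offsets.getOrDefault(readerId, 0);
        if (next >= records.size()) {
            return null;
        }
        offsets.put(readerId, next + 1); // reading advances the reader, it never removes the record
        return records.get(next);
    }

    public static void main(String[] args) {
        EventLogSketch log = new EventLogSketch();
        log.append("order.created");
        log.append("payment.completed");

        System.out.println(log.poll("notification")); // order.created
        System.out.println(log.poll("analytics"));    // order.created (independent position)
        System.out.println(log.poll("notification")); // payment.completed
    }
}
```

Because reading does not delete anything, a second reader starts from the beginning and sees the same events in the same order.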
Full Architecture
Core Vocabulary
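| Term | Role | In practice |
|---|---|---|
| Producer | An application that writes events to Kafka | Order Service publishes order.created |
| Topic | A named stream that collects events of the same kind | payment.completed, user.signed_up |
| Partition | An ordered log inside a Topic; the unit of parallel processing | Records with the same key go to the same Partition, so their order is preserved |
| Broker | A single Kafka server | Several Brokers form a Cluster and replicate data |
| Consumer Group | A set of Consumers that share one purpose | Three notification Consumers split the events among themselves |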
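The Partition row says that records with the same key land in the same Partition. A minimal sketch of why that holds: the partition is a deterministic function of the key. Note the assumption: Kafka's default partitioner hashes the serialized key with murmur2, while this sketch substitutes `Arrays.hashCode` to stay dependency-free, so the actual partition numbers will differ from a real cluster. `PartitionSketch` and `partitionFor` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class PartitionSketch {
    /**
     * Maps a key to a partition index in [0, numPartitions).
     * Simplified stand-in: Kafka's default partitioner uses murmur2, not Arrays.hashCode.
     */
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // floorMod keeps the result non-negative even when the hash is negative
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3;
        // The first and third keys are identical, so they print the same partition.
        for (String key : new String[] {"ORD-10043", "ORD-10044", "ORD-10043"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

One consequence worth keeping in mind: the mapping depends on the partition count, so adding Partitions to a Topic later can move a key to a different Partition.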
Message Contract
order.created
orderId is the natural candidate for the message key.
{
  "eventId": "evt-20260513-0001",
  "eventType": "order.created",
  "occurredAt": "2026-05-13T20:45:00+09:00",
  "orderId": "ORD-10043",
  "userId": "USR-7781",
  "amount": 59000,
  "currency": "KRW",
  "items": [
    { "sku": "BOOK-KAFKA-01", "quantity": 1 }
  ]
}
Producer Code
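// Spring Boot + Spring Kafka example
import java.util.UUID;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class OrderService {
    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate;

    // Constructor injection: the final fields must be assigned here.
    public OrderService(OrderRepository orderRepository,
                        KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate) {
        this.orderRepository = orderRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    public OrderResponse createOrder(CreateOrderRequest request) {
        // 1. Persist the order first, so the event describes something that already happened.
        Order order = orderRepository.save(Order.from(request));

        // 2. Build the event; the random eventId lets Consumers detect duplicates.
        OrderCreatedEvent event = new OrderCreatedEvent(
            UUID.randomUUID().toString(),
            order.getId(),
            order.getUserId(),
            order.getAmount(),
            order.getCreatedAt()
        );

        // 3. Publish with the order ID as key, so events for one order stay in one Partition.
        kafkaTemplate.send(
            "order.created",
            order.getId(), // partition key
            event
        );
        return OrderResponse.from(order);
    }
}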
Consumer Code
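import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class NotificationConsumer {
    private final ProcessedEventRepository processedEvents;
    private final NotificationService notificationService;

    // Constructor injection: the final fields must be assigned here.
    public NotificationConsumer(ProcessedEventRepository processedEvents,
                                NotificationService notificationService) {
        this.processedEvents = processedEvents;
        this.notificationService = notificationService;
    }

    @KafkaListener(
        topics = "order.created",
        groupId = "notification-service"
    )
    public void handle(OrderCreatedEvent event) {
        // The same message can be delivered more than once, so skip event IDs already seen.
        if (processedEvents.existsById(event.eventId())) {
            return; // duplicate message guard
        }
        notificationService.sendOrderCreatedMessage(
            event.userId(),
            event.orderId(),
            event.amount()
        );
        // Record the ID only after the notification has been sent.
        processedEvents.save(event.eventId());
    }
}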
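The duplicate guard in the Consumer above can be tried out without Kafka or Spring. The following is a minimal sketch, assuming an in-memory `Set` in place of `ProcessedEventRepository` and a plain list in place of `NotificationService`; `IdempotentHandlerSketch` and its members are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class IdempotentHandlerSketch {
    /** Trimmed-down event carrying only what this sketch needs. */
    record OrderCreated(String eventId, String userId) {}

    private final Set<String> processedEventIds = new HashSet<>(); // stand-in for ProcessedEventRepository
    final List<String> sentNotifications = new ArrayList<>();      // stand-in for NotificationService

    /** Returns true if the event was processed, false if it was skipped as a duplicate. */
    boolean handle(OrderCreated event) {
        if (processedEventIds.contains(event.eventId())) {
            return false; // duplicate message guard
        }
        sentNotifications.add("order created for " + event.userId());
        processedEventIds.add(event.eventId());
        return true;
    }

    public static void main(String[] args) {
        IdempotentHandlerSketch handler = new IdempotentHandlerSketch();
        OrderCreated event = new OrderCreated("evt-20260513-0001", "USR-7781");

        System.out.println(handler.handle(event));            // true
        System.out.println(handler.handle(event));            // false (redelivered copy)
        System.out.println(handler.sentNotifications.size()); // 1
    }
}
```

In a real service the set of processed IDs would need to live in durable storage; an in-memory set is lost on restart, which is exactly when redelivery tends to happen.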
Consumer Group
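As the Core Vocabulary table notes, Consumers in one group split the events among themselves. Within a group, each Partition is read by at most one Consumer, so the split happens at Partition granularity. The sketch below illustrates this with a simple round-robin assignment. It is a simplification: Kafka's real assignors (range, round-robin, cooperative sticky) are more involved, and `GroupAssignmentSketch` and `assign` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupAssignmentSketch {
    /** Distributes partitions 0..numPartitions-1 across the group's consumers, round-robin. */
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one consumer");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition); // each partition gets exactly one owner
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Three notification consumers sharing six partitions
        System.out.println(assign(List.of("notif-1", "notif-2", "notif-3"), 6));
        // {notif-1=[0, 3], notif-2=[1, 4], notif-3=[2, 5]}

        // More consumers than partitions: the extra consumer stays idle
        System.out.println(assign(List.of("notif-1", "notif-2", "notif-3", "notif-4"), 3));
        // {notif-1=[0], notif-2=[1], notif-3=[2], notif-4=[]}
    }
}
```

The second call shows why the Partition count caps the useful size of a group: a fourth Consumer on a three-Partition Topic has nothing to read.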
When To Use
Checklist
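Is the event named in the past tense, like order.created?
Does the message carry the minimum data Consumers need, and is there a versioning policy?
Have you decided whether ordering must be preserved per order ID or per user ID?
Have you decided how to handle duplicates, retries, the DLQ (dead-letter queue), and notification failures?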
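The last checklist item (retries and DLQ) can be made concrete with a small sketch: retry a failing handler a bounded number of times, then park the record in a dead-letter list instead of blocking the Partition. This is a plain-Java illustration under stated assumptions; `RetryDlqSketch`, `process`, and `deadLetters` are hypothetical names, and the list stands in for a real DLQ Topic. Spring Kafka provides `DefaultErrorHandler` and `DeadLetterPublishingRecoverer` for the production version of this pattern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class RetryDlqSketch {
    final List<String> deadLetters = new ArrayList<>(); // stand-in for a dead-letter Topic

    /**
     * Runs the handler up to maxAttempts times.
     * Returns the number of attempts used; if every attempt fails, the record goes to deadLetters.
     */
    int process(String record, Consumer<String> handler, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(record);
                return attempt; // success
            } catch (RuntimeException e) {
                // A real consumer would usually wait (back off) before the next attempt.
            }
        }
        deadLetters.add(record); // give up, but keep the record for later inspection
        return maxAttempts;
    }

    public static void main(String[] args) {
        RetryDlqSketch sketch = new RetryDlqSketch();

        // Transient failure: fails once, then succeeds
        int[] calls = {0};
        int attempts = sketch.process("ORD-10043", r -> {
            if (calls[0]++ == 0) {
                throw new IllegalStateException("notification gateway timeout");
            }
        }, 3);
        System.out.println("attempts=" + attempts); // attempts=2

        // Permanent failure: exhausts retries and lands in the DLQ
        sketch.process("ORD-bad", r -> {
            throw new IllegalStateException("malformed payload");
        }, 3);
        System.out.println(sketch.deadLetters); // [ORD-bad]
    }
}
```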
Interactive Demo
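// Producer side
Order order = repo.save(req);
var event = OrderCreatedEvent.from(order);
kafka.send(
    "order.created",
    order.id(),
    event
);

// Consumer side
@KafkaListener(topics = "order.created")
void handle(OrderCreatedEvent e, Acknowledgment ack) {
    notify.send(e.userId());
    ack.acknowledge(); // commit the offset only after processing succeeds
}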
Actual Code Path
Publishes to the order.created Topic.
Setup Code
// build.gradle
plugins {
    id 'java'
    id 'org.springframework.boot' version '3.3.0'
    // Lets the dependencies below omit versions; the version shown is an assumption,
    // use the one that matches your Spring Boot release.
    id 'io.spring.dependency-management' version '1.1.5'
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    // Lombok supplies the @Slf4j and @RequiredArgsConstructor annotations used in the code
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

# application.yml
spring.kafka.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.group-id: notification-service
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# Trust the package that actually contains OrderCreatedEvent
spring.kafka.consumer.properties.spring.json.trusted.packages: com.example.kafka.order
spring.kafka.listener.ack-mode: manual
Message Contract
// OrderCreatedEvent.java
package com.example.kafka.order;

import java.time.Instant;
import java.util.UUID;

public record OrderCreatedEvent(
    String eventId,
    String orderId,
    String userId,
    long amount,
    Instant occurredAt
) {
    // Builds the event from a saved Order; the fresh UUID is what Consumers deduplicate on.
    public static OrderCreatedEvent from(Order order) {
        return new OrderCreatedEvent(
            UUID.randomUUID().toString(),
            order.id(),
            order.userId(),
            order.amount(),
            Instant.now()
        );
    }
}

// KafkaTopics.java: same package, separate file (Java allows one public top-level type per file)
public final class KafkaTopics {
    public static final String ORDER_CREATED = "order.created";
    private KafkaTopics() {} // constants holder, not instantiable
}
Producer Code
package com.example.kafka.order;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderEventProducer {
    private final KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate;

    public void publishOrderCreated(Order order) {
        OrderCreatedEvent event = OrderCreatedEvent.from(order);
        String topic = KafkaTopics.ORDER_CREATED;
        String key = event.orderId(); // same order ID -> same Partition -> ordered

        // send() is asynchronous; the callback runs once the send succeeds or fails.
        kafkaTemplate.send(topic, key, event)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("publish failed. topic={}, key={}", topic, key, ex);
                    return;
                }
                var meta = result.getRecordMetadata();
                log.info("published. partition={}, offset={}", meta.partition(), meta.offset());
            });
    }
}
Service Code
package com.example.kafka.order;

import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@RequiredArgsConstructor
public class OrderService {
    private final OrderRepository orderRepository;
    private final OrderEventProducer orderEventProducer;

    @Transactional
    public OrderResponse createOrder(CreateOrderRequest request) {
        Order order = Order.create(
            request.userId(),
            request.items(),
            request.amount()
        );

        Order savedOrder = orderRepository.save(order);
        // Note: this send happens before the database transaction commits, so a later
        // rollback would leave an event for an order that was never stored.
        orderEventProducer.publishOrderCreated(savedOrder);

        return OrderResponse.from(savedOrder);
    }
}
Consumer Code
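import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class NotificationConsumer {
    private final ProcessedEventRepository processedEventRepository;
    private final NotificationService notificationService;

    // The Acknowledgment parameter is available because ack-mode is set to manual.
    @KafkaListener(topics = KafkaTopics.ORDER_CREATED, groupId = "notification-service")
    public void handle(OrderCreatedEvent event, Acknowledgment ack) {
        log.info("consume. eventId={}, orderId={}", event.eventId(), event.orderId());

        // Already handled: acknowledge so the offset still moves forward, then stop.
        if (processedEventRepository.existsByEventId(event.eventId())) {
            ack.acknowledge();
            return;
        }

        notificationService.sendOrderCreatedMessage(event.userId(), event.orderId(), event.amount());
        processedEventRepository.save(event.eventId());
        ack.acknowledge(); // commit the offset only after processing succeeds
    }
}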
Reads the order.created Topic as the notification-service group.
Consumer Code · Processing Part
public void handle(OrderCreatedEvent event, Acknowledgment ack) {
    log.info("consume. eventId={}, orderId={}",
        event.eventId(), event.orderId());

    if (processedEventRepository.existsByEventId(event.eventId())) {
        ack.acknowledge();
        return;
    }

    notificationService.sendOrderCreatedMessage(
        event.userId(),
        event.orderId(),
        event.amount()
    );

    processedEventRepository.save(event.eventId());
    ack.acknowledge();
}
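Calling `ack.acknowledge()` last is what makes this handler at-least-once: if the process dies after sending the notification but before acknowledging, the offset is not committed and the same record is read again after a restart. The toy model below illustrates that behavior. It is a sketch under stated assumptions, not Spring Kafka code: `ManualAckSketch`, `poll`, and `acknowledge` are hypothetical names, and `poll()` models what a consumer would read after restarting from the committed offset.

```java
import java.util.List;

class ManualAckSketch {
    private final List<String> partition; // records in offset order
    private int committedOffset = 0;      // where a restarted consumer would resume

    ManualAckSketch(List<String> records) {
        this.partition = records;
    }

    /** Returns the record at the committed offset, or null when nothing is left. */
    String poll() {
        return committedOffset < partition.size() ? partition.get(committedOffset) : null;
    }

    /** Marks the current record as done, in the spirit of ack.acknowledge(). */
    void acknowledge() {
        committedOffset++;
    }

    public static void main(String[] args) {
        ManualAckSketch consumer = new ManualAckSketch(List.of("evt-1", "evt-2"));

        // First attempt: the notification is sent, then the process "crashes" before acknowledging.
        System.out.println(consumer.poll()); // evt-1

        // After restart: the same record comes back, so the handler needs its duplicate guard.
        System.out.println(consumer.poll()); // evt-1
        consumer.acknowledge();

        System.out.println(consumer.poll()); // evt-2
    }
}
```

This is why the handler checks `existsByEventId` before sending: the redelivered copy of evt-1 would otherwise trigger a second notification.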