Skip to content

Commit 85d081b

Browse files
authored
[volume - 8] Decoupling with Kafka (#208)
* feat: 주문 기능 Kafaka 프로듀서 및 컨슈머 구현 * fix: outbox 관련 코드 소실 복구 * feat: catalog-events 구현
1 parent 8dc52b5 commit 85d081b

File tree

26 files changed

+916
-9
lines changed

26 files changed

+916
-9
lines changed

apps/commerce-api/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ dependencies {
55
implementation(project(":supports:jackson"))
66
implementation(project(":supports:logging"))
77
implementation(project(":supports:monitoring"))
8+
implementation(project(":modules:kafka"))
89

910
// web
1011
implementation("org.springframework.boot:spring-boot-starter-web")
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package com.loopers.application.kafka;
2+
3+
import java.util.concurrent.CompletableFuture;
4+
5+
import com.loopers.domain.outbox.EventOutbox;
6+
import lombok.RequiredArgsConstructor;
7+
import lombok.extern.slf4j.Slf4j;
8+
import org.springframework.beans.factory.annotation.Value;
9+
import org.springframework.kafka.core.KafkaTemplate;
10+
import org.springframework.kafka.support.SendResult;
11+
import org.springframework.stereotype.Component;
12+
import com.fasterxml.jackson.databind.ObjectMapper;
13+
import java.util.Map;
14+
15+
@Slf4j
16+
@Component
17+
@RequiredArgsConstructor
18+
public class EventKafkaProducer {
19+
20+
private final KafkaTemplate<Object, Object> kafkaTemplate;
21+
private final ObjectMapper objectMapper;
22+
23+
@Value("${kafka.topics.catalog-events}")
24+
private String catalogEventsTopic;
25+
26+
@Value("${kafka.topics.order-events}")
27+
private String orderEventsTopic;
28+
/**
29+
* Outbox 이벤트를 Kafka로 발행
30+
*/
31+
public CompletableFuture<SendResult<Object, Object>> publish(EventOutbox outbox) {
32+
String topic = getTopicByAggregateType(outbox.getAggregateType());
33+
String partitionKey = outbox.getEventKey();
34+
35+
log.info("Kafka 발행 시작 - topic: {}, key: {}, eventType: {}",
36+
topic, partitionKey, outbox.getEventType());
37+
38+
String envelope;
39+
try {
40+
// payload는 JSON 문자열이므로 Map으로 역직렬화 후, envelope에 객체로 포함
41+
Object payloadObject = objectMapper.readValue(outbox.getPayload(), Object.class);
42+
Map<String, Object> eventEnvelope = Map.of(
43+
"id", outbox.getId(),
44+
"aggregateType", outbox.getAggregateType(),
45+
"aggregateId", outbox.getAggregateId(),
46+
"eventType", outbox.getEventType(),
47+
"occurredAt", outbox.getCreatedAt().toString(),
48+
"payload", payloadObject
49+
);
50+
envelope = objectMapper.writeValueAsString(eventEnvelope);
51+
} catch (Exception e) {
52+
throw new RuntimeException("이벤트 엔벨롭 생성 실패", e);
53+
}
54+
55+
return kafkaTemplate.send(topic, partitionKey, envelope)
56+
.thenApply(result -> {
57+
log.info("Kafka 발행 성공 - topic: {}, partition: {}, offset: {}, eventId: {}",
58+
topic,
59+
result.getRecordMetadata().partition(),
60+
result.getRecordMetadata().offset(),
61+
outbox.getId());
62+
return result;
63+
})
64+
.exceptionally(ex -> {
65+
log.error("Kafka 발행 실패 - topic: {}, key: {}, eventId: {}, error: {}",
66+
topic, partitionKey, outbox.getId(), ex.getMessage(), ex);
67+
throw new RuntimeException("Kafka 발행 실패", ex);
68+
});
69+
}
70+
71+
72+
private String getTopicByAggregateType(String aggregateType) {
73+
return switch (aggregateType.toUpperCase()) {
74+
case "ORDER", "PAYMENT" -> orderEventsTopic;
75+
case "PRODUCT", "LIKE" -> catalogEventsTopic;
76+
default -> throw new IllegalArgumentException("존재하지 않는 AggregateType: " + aggregateType);
77+
};
78+
}
79+
}

apps/commerce-api/src/main/java/com/loopers/application/order/OrderFacade.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.loopers.domain.order.Order;
44
import com.loopers.domain.order.OrderItem;
55
import com.loopers.domain.order.OrderRepository;
6+
import com.loopers.application.outbox.OutboxEventService;
67
import com.loopers.domain.point.Point;
78
import com.loopers.domain.point.PointRepository;
89
import com.loopers.domain.product.Product;
@@ -28,6 +29,7 @@ public class OrderFacade {
2829
private final PointRepository pointRepository;
2930
private final ProductRepository productRepository;
3031
private final ApplicationEventPublisher eventPublisher;
32+
private final OutboxEventService outboxService;
3133

3234

3335
@Transactional
@@ -93,6 +95,15 @@ public Order createOrder(String userId, List<OrderItem> orderItems){
9395
eventPublisher.publishEvent(event);
9496
log.info("주문 생성 이벤트 발행: {}", order.getId());
9597

98+
// 7. Outbox 저장
99+
outboxService.saveEvent(
100+
"ORDER", // aggregateType
101+
order.getId().toString(), // aggregateId
102+
"OrderCreatedEvent", // eventType
103+
order.getId().toString(), // eventKey (partition key)
104+
event // payload
105+
);
106+
96107
return order;
97108
}
98109

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package com.loopers.application.outbox;
2+
3+
import com.loopers.application.kafka.EventKafkaProducer;
4+
import com.loopers.domain.outbox.EventOutbox;
5+
import com.loopers.domain.outbox.OutboxStatus;
6+
import com.loopers.domain.outbox.EventOutboxRepository;
7+
import lombok.RequiredArgsConstructor;
8+
import lombok.extern.slf4j.Slf4j;
9+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
10+
import org.springframework.scheduling.annotation.Scheduled;
11+
import org.springframework.stereotype.Component;
12+
import org.springframework.transaction.annotation.Transactional;
13+
14+
import java.time.LocalDateTime;
15+
import java.util.List;
16+
17+
@Slf4j
18+
@Component
19+
@RequiredArgsConstructor
20+
@ConditionalOnProperty(value = "outbox.publisher.enabled", havingValue = "true")
21+
public class OutboxEventPublisher {
22+
23+
private final EventOutboxRepository outboxRepository;
24+
private final EventKafkaProducer kafkaProducer;
25+
26+
@Scheduled(fixedDelay = 1000)
27+
@Transactional
28+
public void publishPendingEvents() {
29+
// PENDING 이벤트 처리
30+
List<EventOutbox> pending = outboxRepository.findPendingEvents();
31+
for (EventOutbox event : pending) {
32+
tryPublish(event);
33+
}
34+
35+
// 재시도 대상 FAILED 이벤트 처리
36+
List<EventOutbox> retryables = outboxRepository.findFailedEventsCanRetry();
37+
for (EventOutbox event : retryables) {
38+
tryPublish(event);
39+
}
40+
}
41+
42+
private void tryPublish(EventOutbox event) {
43+
try {
44+
kafkaProducer.publish(event).join();
45+
event.setStatus(OutboxStatus.PUBLISHED);
46+
event.setLastAttemptAt(LocalDateTime.now());
47+
} catch (Exception ex) {
48+
event.setStatus(OutboxStatus.FAILED);
49+
event.setLastAttemptAt(LocalDateTime.now());
50+
event.setAttemptCount(event.getAttemptCount() + 1);
51+
log.error("Outbox 발행 실패 - eventId={}, error={}", event.getId(), ex.getMessage(), ex);
52+
}
53+
}
54+
}
55+
56+
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.loopers.application.outbox;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.loopers.domain.outbox.EventOutbox;
6+
import com.loopers.domain.outbox.EventOutboxRepository;
7+
import lombok.RequiredArgsConstructor;
8+
import org.springframework.stereotype.Service;
9+
import org.springframework.transaction.annotation.Transactional;
10+
11+
@Service
12+
@RequiredArgsConstructor
13+
public class OutboxEventService {
14+
15+
private final ObjectMapper objectMapper;
16+
private final EventOutboxRepository outboxRepository;
17+
18+
@Transactional
19+
public EventOutbox saveEvent(String aggregateType,
20+
String aggregateId,
21+
String eventType,
22+
String eventKey,
23+
Object payload) {
24+
try {
25+
String json = objectMapper.writeValueAsString(payload);
26+
EventOutbox outbox = EventOutbox.of(aggregateType, aggregateId, eventType, eventKey, json);
27+
return outboxRepository.save(outbox);
28+
} catch (JsonProcessingException e) {
29+
throw new IllegalArgumentException("Outbox payload 직렬화 실패", e);
30+
}
31+
}
32+
}
33+
34+

apps/commerce-api/src/main/java/com/loopers/application/payment/PaymentSyncScheduler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,6 @@ public void syncPendingPayments() {
5050

5151

5252

53+
54+
55+

apps/commerce-api/src/main/java/com/loopers/domain/like/LikeService.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,15 @@
55
import lombok.RequiredArgsConstructor;
66
import org.springframework.stereotype.Service;
77
import org.springframework.transaction.annotation.Transactional;
8-
import org.springframework.context.ApplicationEventPublisher;
9-
import com.loopers.domain.like.event.LikeCreatedEvent;
10-
import com.loopers.domain.like.event.LikeDeletedEvent;
8+
import com.loopers.application.outbox.OutboxEventService;
119
import java.util.List;
1210

1311
@Service
1412
@RequiredArgsConstructor
1513
public class LikeService {
1614
private final LikeRepository likeRepository;
1715
private final ProductRepository productRepository;
18-
private final ApplicationEventPublisher eventPublisher;
16+
private final OutboxEventService outboxEventService;
1917

2018
//상품 좋아요
2119
@Transactional
@@ -30,8 +28,14 @@ public void likeProduct(String userId, Long productId){
3028
// 2) 좋아요 기록 저장
3129
likeRepository.save(new Like(userId, productId));
3230

33-
// 3) 집계는 이벤트로 처리
34-
eventPublisher.publishEvent(new LikeCreatedEvent(userId, productId));
31+
// 3) Outbox 기록
32+
outboxEventService.saveEvent(
33+
"LIKE",
34+
String.valueOf(productId),
35+
"LIKE_CREATED",
36+
String.valueOf(productId),
37+
new LikePayload(userId, productId)
38+
);
3539

3640
} catch (DataIntegrityViolationException ignored) {
3741
// 중복 좋아요 race condition 대응
@@ -47,8 +51,14 @@ public void cancleLikeProduct(String userId, Long productId){
4751
.ifPresent(like -> {
4852
likeRepository.delete(like);
4953

50-
// 집계는 이벤트로 처리
51-
eventPublisher.publishEvent(new LikeDeletedEvent(userId, productId));
54+
// Outbox 기록
55+
outboxEventService.saveEvent(
56+
"LIKE",
57+
String.valueOf(productId),
58+
"LIKE_DELETED",
59+
String.valueOf(productId),
60+
new LikePayload(userId, productId)
61+
);
5262
});
5363

5464
}
@@ -57,4 +67,6 @@ public void cancleLikeProduct(String userId, Long productId){
5767
public List<Like> getUserLikeProduct(String userId){
5868
return likeRepository.findAllByUserId(userId);
5969
}
70+
71+
private record LikePayload(String userId, Long productId) {}
6072
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package com.loopers.domain.outbox;
2+
3+
import jakarta.persistence.Column;
4+
import jakarta.persistence.Entity;
5+
import jakarta.persistence.EnumType;
6+
import jakarta.persistence.Enumerated;
7+
import jakarta.persistence.Id;
8+
import jakarta.persistence.Lob;
9+
import jakarta.persistence.Table;
10+
import lombok.AccessLevel;
11+
import lombok.AllArgsConstructor;
12+
import lombok.Builder;
13+
import lombok.Getter;
14+
import lombok.NoArgsConstructor;
15+
import lombok.Setter;
16+
17+
import java.time.LocalDateTime;
18+
import java.util.UUID;
19+
20+
@Entity
21+
@Table(name = "event_outbox")
22+
@Getter
23+
@NoArgsConstructor(access = AccessLevel.PROTECTED)
24+
@AllArgsConstructor
25+
@Builder
26+
public class EventOutbox {
27+
28+
@Id
29+
@Column(nullable = false, updatable = false)
30+
private String id;
31+
32+
@Column(nullable = false)
33+
private String aggregateType;
34+
35+
@Column(nullable = false)
36+
private String aggregateId;
37+
38+
@Column(nullable = false)
39+
private String eventType;
40+
41+
@Column(nullable = false)
42+
private String eventKey; // Kafka partition key
43+
44+
@Lob
45+
@Column(nullable = false)
46+
private String payload;
47+
48+
@Enumerated(EnumType.STRING)
49+
@Column(nullable = false)
50+
@Setter
51+
private OutboxStatus status;
52+
53+
@Column(nullable = false)
54+
private LocalDateTime createdAt;
55+
56+
@Setter
57+
private LocalDateTime lastAttemptAt;
58+
59+
@Setter
60+
@Column(nullable = false)
61+
private int attemptCount;
62+
63+
public static EventOutbox of(String aggregateType, String aggregateId, String eventType, String eventKey, String payload) {
64+
return EventOutbox.builder()
65+
.id(UUID.randomUUID().toString())
66+
.aggregateType(aggregateType)
67+
.aggregateId(aggregateId)
68+
.eventType(eventType)
69+
.eventKey(eventKey)
70+
.payload(payload)
71+
.status(OutboxStatus.PENDING)
72+
.createdAt(LocalDateTime.now())
73+
.attemptCount(0)
74+
.build();
75+
}
76+
}
77+
78+
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.loopers.domain.outbox;
2+
3+
import java.util.List;
4+
5+
public interface EventOutboxRepository {
6+
EventOutbox save(EventOutbox outbox);
7+
List<EventOutbox> findPendingEvents();
8+
List<EventOutbox> findFailedEventsCanRetry();
9+
}
10+
11+
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.loopers.domain.outbox;
2+
3+
public enum OutboxStatus {
4+
PENDING,
5+
PUBLISHED,
6+
FAILED
7+
}
8+
9+

0 commit comments

Comments
 (0)