Skip to content

Commit 24c0ab2

Browse files
authored
Merge pull request #33 from yeonsu00/round-8
[volume - 8] Decoupling with Kafka
2 parents 2a7a304 + 394c1b7 commit 24c0ab2

File tree

49 files changed

+1438
-33
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+1438
-33
lines changed

apps/commerce-api/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ dependencies {
44
// add-ons
55
implementation(project(":modules:jpa"))
66
implementation(project(":modules:redis"))
7+
implementation(project(":modules:kafka"))
78
implementation(project(":supports:jackson"))
89
implementation(project(":supports:logging"))
910
implementation(project(":supports:monitoring"))

apps/commerce-api/src/main/java/com/loopers/application/like/LikeEvent.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,20 @@
33
public class LikeEvent {
44

55
public record LikeRecorded(
6-
Long productId
6+
Long productId,
7+
Long userId
78
) {
8-
public static LikeRecorded from(Long productId) {
9-
return new LikeRecorded(productId);
9+
public static LikeRecorded from(Long productId, Long userId) {
10+
return new LikeRecorded(productId, userId);
1011
}
1112
}
1213

1314
public record LikeCancelled(
14-
Long productId
15+
Long productId,
16+
Long userId
1517
) {
16-
public static LikeCancelled from(Long productId) {
17-
return new LikeCancelled(productId);
18+
public static LikeCancelled from(Long productId, Long userId) {
19+
return new LikeCancelled(productId, userId);
1820
}
1921
}
2022
}

apps/commerce-api/src/main/java/com/loopers/application/like/LikeFacade.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public LikeInfo recordLike(LikeCommand.LikeProductCommand command) {
4747
boolean wasCreated = likeService.recordLikeIfAbsent(user.getId(), command.productId());
4848

4949
if (wasCreated) {
50-
eventPublisher.publishEvent(LikeEvent.LikeRecorded.from(command.productId()));
50+
eventPublisher.publishEvent(LikeEvent.LikeRecorded.from(command.productId(), user.getId()));
5151
eventPublisher.publishEvent(UserBehaviorEvent.LikeRecorded.from(user.getId(), command.productId()));
5252
}
5353

@@ -82,7 +82,7 @@ public LikeInfo cancelLike(LikeCommand.LikeProductCommand command) {
8282
boolean wasDeleted = likeService.cancelLikeIfPresent(user.getId(), command.productId());
8383

8484
if (wasDeleted) {
85-
eventPublisher.publishEvent(LikeEvent.LikeCancelled.from(command.productId()));
85+
eventPublisher.publishEvent(LikeEvent.LikeCancelled.from(command.productId(), user.getId()));
8686
eventPublisher.publishEvent(UserBehaviorEvent.LikeCancelled.from(user.getId(), command.productId()));
8787
}
8888

apps/commerce-api/src/main/java/com/loopers/application/order/OrderEvent.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.loopers.application.order;
22

33
import com.loopers.domain.order.Order;
4+
import java.util.List;
45

56
public class OrderEvent {
67

@@ -19,18 +20,57 @@ public static CouponUsed from(Long couponId, Long userId) {
1920
public record OrderCreated(
2021
String loginId,
2122
String orderKey,
23+
Long orderId,
24+
Long userId,
2225
Integer originalTotalPrice,
2326
Integer discountPrice
2427
) {
2528
public static OrderCreated from(Order order, String loginId) {
2629
return new OrderCreated(
2730
loginId,
2831
order.getOrderKey(),
32+
order.getId(),
33+
order.getUserId(),
2934
order.getOriginalTotalPrice(),
3035
order.getDiscountPrice()
3136
);
3237
}
3338
}
3439

40+
public record OrderPaid(
41+
String orderKey,
42+
Long orderId,
43+
Long userId,
44+
Integer totalPrice,
45+
List<OrderItemInfo> orderItems
46+
) {
47+
public static OrderPaid from(Order order) {
48+
List<OrderItemInfo> orderItemInfos = order.getOrderItems().stream()
49+
.map(item -> new OrderItemInfo(
50+
item.getProductId(),
51+
item.getProductName(),
52+
item.getPrice(),
53+
item.getQuantity()
54+
))
55+
.toList();
56+
57+
return new OrderPaid(
58+
order.getOrderKey(),
59+
order.getId(),
60+
order.getUserId(),
61+
order.getOriginalTotalPrice() - (order.getDiscountPrice() != null ? order.getDiscountPrice() : 0),
62+
orderItemInfos
63+
);
64+
}
65+
}
66+
67+
public record OrderItemInfo(
68+
Long productId,
69+
String productName,
70+
Integer price,
71+
Integer quantity
72+
) {
73+
}
74+
3575

3676
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.loopers.application.product;
2+
3+
import java.time.ZonedDateTime;
4+
5+
public class ProductEvent {
6+
7+
public record StockDepleted(
8+
Long productId,
9+
Integer remainingStock,
10+
ZonedDateTime timestamp
11+
) {
12+
public static StockDepleted from(Long productId, Integer remainingStock) {
13+
return new StockDepleted(
14+
productId,
15+
remainingStock,
16+
ZonedDateTime.now()
17+
);
18+
}
19+
}
20+
}
21+
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package com.loopers.domain.outbox;
2+
3+
import com.loopers.domain.BaseEntity;
4+
import jakarta.persistence.Column;
5+
import jakarta.persistence.Entity;
6+
import jakarta.persistence.EnumType;
7+
import jakarta.persistence.Enumerated;
8+
import jakarta.persistence.Table;
9+
import lombok.Builder;
10+
import lombok.Getter;
11+
12+
@Entity
13+
@Table(name = "outbox")
14+
@Getter
15+
public class Outbox extends BaseEntity {
16+
17+
@Column(nullable = false, length = 100)
18+
private String topic;
19+
20+
@Column(nullable = false, length = 100)
21+
private String partitionKey;
22+
23+
@Column(nullable = false, columnDefinition = "TEXT")
24+
private String payload;
25+
26+
@Enumerated(EnumType.STRING)
27+
@Column(nullable = false, length = 20)
28+
private OutboxStatus status;
29+
30+
@Column(length = 500)
31+
private String errorMessage;
32+
33+
@Column(nullable = false)
34+
private Integer retryCount;
35+
36+
@Builder
37+
private Outbox(String topic, String partitionKey, String payload, OutboxStatus status, Integer retryCount) {
38+
this.topic = topic;
39+
this.partitionKey = partitionKey;
40+
this.payload = payload;
41+
this.status = status;
42+
this.retryCount = retryCount != null ? retryCount : 0;
43+
}
44+
45+
public Outbox() {
46+
this.retryCount = 0;
47+
}
48+
49+
public static Outbox create(String topic, String partitionKey, String payload) {
50+
return Outbox.builder()
51+
.topic(topic)
52+
.partitionKey(partitionKey)
53+
.payload(payload)
54+
.status(OutboxStatus.PENDING)
55+
.retryCount(0)
56+
.build();
57+
}
58+
59+
public boolean hasExceededMaxRetries(int maxRetries) {
60+
return this.retryCount >= maxRetries;
61+
}
62+
}
63+
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.loopers.domain.outbox;
2+
3+
import java.util.List;
4+
5+
public interface OutboxRepository {
6+
void saveOutbox(Outbox outbox);
7+
8+
List<Outbox> findPendingOutboxes(int limit);
9+
10+
List<Outbox> findFailedAndRetryableOutboxes(int limit, int maxRetries);
11+
12+
void markAsPublished(Long id);
13+
14+
void markAsFailed(Long id, String message);
15+
}
16+
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package com.loopers.domain.outbox;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import com.loopers.support.error.CoreException;
5+
import com.loopers.support.error.ErrorType;
6+
import java.util.List;
7+
import lombok.RequiredArgsConstructor;
8+
import lombok.extern.slf4j.Slf4j;
9+
import org.springframework.kafka.core.KafkaTemplate;
10+
import org.springframework.stereotype.Service;
11+
import org.springframework.transaction.annotation.Transactional;
12+
13+
@Slf4j
14+
@RequiredArgsConstructor
15+
@Service
16+
public class OutboxService {
17+
18+
private static final int MAX_RETRIES = 3;
19+
20+
private final OutboxRepository outboxRepository;
21+
private final KafkaTemplate<String, Object> kafkaTemplate;
22+
private final ObjectMapper objectMapper;
23+
24+
@Transactional
25+
public void saveOutbox(String topic, String partitionKey, Object event) {
26+
try {
27+
String payload = objectMapper.writeValueAsString(event);
28+
Outbox outbox = Outbox.create(topic, partitionKey, payload);
29+
outboxRepository.saveOutbox(outbox);
30+
} catch (Exception e) {
31+
throw new CoreException(ErrorType.INTERNAL_ERROR, "이벤트 저장에 실패했습니다.");
32+
}
33+
}
34+
35+
public void publishPendingOutboxes(int batchSize) {
36+
List<Outbox> pendingOutboxes = outboxRepository.findPendingOutboxes(batchSize);
37+
38+
for (Outbox outbox : pendingOutboxes) {
39+
kafkaTemplate
40+
.send(
41+
outbox.getTopic(),
42+
outbox.getPartitionKey(),
43+
outbox.getPayload()
44+
)
45+
.whenComplete((result, exception) -> {
46+
if (exception == null) {
47+
outboxRepository.markAsPublished(outbox.getId());
48+
} else {
49+
outboxRepository.markAsFailed(outbox.getId(), exception.getMessage());
50+
}
51+
});
52+
}
53+
}
54+
55+
public void retryFailedOutboxes(int batchSize) {
56+
List<Outbox> failedOutboxes = outboxRepository.findFailedAndRetryableOutboxes(batchSize, MAX_RETRIES);
57+
58+
for (Outbox outbox : failedOutboxes) {
59+
if (outbox.hasExceededMaxRetries(MAX_RETRIES)) {
60+
log.warn("Outbox 재시도 횟수 초과: outboxId={}, retryCount={}, topic={}",
61+
outbox.getId(), outbox.getRetryCount(), outbox.getTopic());
62+
continue;
63+
}
64+
65+
kafkaTemplate
66+
.send(
67+
outbox.getTopic(),
68+
outbox.getPartitionKey(),
69+
outbox.getPayload()
70+
)
71+
.whenComplete((result, exception) -> {
72+
if (exception == null) {
73+
outboxRepository.markAsPublished(outbox.getId());
74+
} else {
75+
outboxRepository.markAsFailed(outbox.getId(), exception.getMessage());
76+
}
77+
});
78+
}
79+
}
80+
}
81+
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.loopers.domain.outbox;
2+
3+
public enum OutboxStatus {
4+
PENDING,
5+
PUBLISHED,
6+
FAILED
7+
}
8+

apps/commerce-api/src/main/java/com/loopers/domain/payment/PaymentService.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import java.time.ZonedDateTime;
66
import java.util.List;
77
import lombok.RequiredArgsConstructor;
8-
import org.springframework.context.ApplicationEventPublisher;
98
import org.springframework.stereotype.Service;
109

1110
@RequiredArgsConstructor
@@ -14,7 +13,6 @@ public class PaymentService {
1413

1514
private final PaymentRepository paymentRepository;
1615
private final PaymentClient paymentClient;
17-
private final ApplicationEventPublisher eventPublisher;
1816

1917
public void createPayment(Integer amount, String orderKey) {
2018
Payment payment = Payment.createPayment(amount, orderKey);

0 commit comments

Comments
 (0)