Skip to content

Commit fb4273d

Browse files
committed
refactor: Interaction/Order 컨슈머 멱등성 4-TX → 단일 TX로 전환
기존 REQUIRES_NEW tryMark 방식은 event_handled INSERT가 독립 커밋되어, upsert 실패 시 재시도가 영구 스킵되는 데이터 유실 버그가 있었음. existsCheck + eventHandled save + upsert를 단일 @transactional로 묶고, DIV(PK 충돌)는 TX 바깥에서 catch하여 rollback-only 마킹 문제를 원천 차단.
1 parent 775b155 commit fb4273d

File tree

4 files changed

+71
-32
lines changed

4 files changed

+71
-32
lines changed

apps/commerce-streamer/src/main/java/com/loopers/application/metrics/InteractionMetricProcessor.java

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,38 @@
11
package com.loopers.application.metrics;
22

3-
import com.loopers.application.idempotent.IdempotencyChecker;
43
import com.loopers.application.log.EventLogWriter;
5-
import com.loopers.domain.metrics.ProductLikeMetricRepository;
64
import lombok.RequiredArgsConstructor;
5+
import org.springframework.dao.DataIntegrityViolationException;
76
import org.springframework.stereotype.Component;
87

98
import java.time.Instant;
10-
import java.time.LocalDateTime;
119

1210
@Component
1311
@RequiredArgsConstructor
1412
public class InteractionMetricProcessor {
1513

16-
private final IdempotencyChecker idempotencyChecker;
17-
private final ProductLikeMetricRepository likeMetricRepository;
14+
private final InteractionMetricWriter interactionMetricWriter;
1815
private final EventLogWriter eventLogWriter;
1916
private final ConsumerMetrics consumerMetrics;
2017

2118
public void process(String eventId, String eventType, String topic, String groupId,
2219
Long productId, Instant occurredAt) {
2320
long start = System.currentTimeMillis();
2421

25-
if (!idempotencyChecker.tryMark(eventId, eventType)) {
22+
try {
23+
boolean processed = interactionMetricWriter.write(eventId, eventType, productId, occurredAt);
24+
25+
if (processed) {
26+
long durationMs = System.currentTimeMillis() - start;
27+
eventLogWriter.saveProcessed(eventId, eventType, topic, groupId, durationMs);
28+
consumerMetrics.recordProcessed(topic, groupId, eventType, durationMs);
29+
} else {
30+
eventLogWriter.saveSkipped(eventId, eventType, topic, groupId);
31+
consumerMetrics.recordSkipped(topic, groupId, eventType);
32+
}
33+
} catch (DataIntegrityViolationException e) {
2634
eventLogWriter.saveSkipped(eventId, eventType, topic, groupId);
2735
consumerMetrics.recordSkipped(topic, groupId, eventType);
28-
return;
29-
}
30-
31-
try {
32-
LocalDateTime bucketTime = BucketTimeUtils.toLocalDateTime(BucketTimeUtils.truncate5min(occurredAt));
33-
int delta = "product.liked".equals(eventType) ? 1 : -1;
34-
likeMetricRepository.upsert(productId, bucketTime, delta);
35-
36-
long durationMs = System.currentTimeMillis() - start;
37-
eventLogWriter.saveProcessed(eventId, eventType, topic, groupId, durationMs);
38-
consumerMetrics.recordProcessed(topic, groupId, eventType, durationMs);
3936
} catch (Exception e) {
4037
long durationMs = System.currentTimeMillis() - start;
4138
eventLogWriter.saveFailed(eventId, eventType, topic, groupId, e.getMessage(), durationMs);
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.loopers.application.metrics;
2+
3+
import com.loopers.domain.idempotent.EventHandled;
4+
import com.loopers.domain.idempotent.EventHandledRepository;
5+
import com.loopers.domain.metrics.ProductLikeMetricRepository;
6+
import lombok.RequiredArgsConstructor;
7+
import org.springframework.stereotype.Component;
8+
import org.springframework.transaction.annotation.Transactional;
9+
10+
import java.time.Instant;
11+
import java.time.LocalDateTime;
12+
13+
@Component
14+
@RequiredArgsConstructor
15+
public class InteractionMetricWriter {
16+
17+
private final EventHandledRepository eventHandledRepository;
18+
private final ProductLikeMetricRepository likeMetricRepository;
19+
20+
@Transactional
21+
public boolean write(String eventId, String eventType, Long productId, Instant occurredAt) {
22+
if (eventHandledRepository.existsByEventId(eventId)) {
23+
return false;
24+
}
25+
eventHandledRepository.save(EventHandled.create(eventId, eventType));
26+
27+
LocalDateTime bucketTime = BucketTimeUtils.toLocalDateTime(BucketTimeUtils.truncate5min(occurredAt));
28+
int delta = "product.liked".equals(eventType) ? 1 : -1;
29+
likeMetricRepository.upsert(productId, bucketTime, delta);
30+
return true;
31+
}
32+
}

apps/commerce-streamer/src/main/java/com/loopers/application/metrics/OrderMetricProcessor.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
11
package com.loopers.application.metrics;
22

3-
import com.loopers.application.idempotent.IdempotencyChecker;
43
import com.loopers.application.log.EventLogWriter;
54
import lombok.RequiredArgsConstructor;
5+
import org.springframework.dao.DataIntegrityViolationException;
66
import org.springframework.stereotype.Component;
77

88
import java.time.Instant;
9-
import java.time.LocalDateTime;
109
import java.util.List;
1110

1211
@Component
1312
@RequiredArgsConstructor
1413
public class OrderMetricProcessor {
1514

16-
private final IdempotencyChecker idempotencyChecker;
1715
private final OrderMetricWriter orderMetricWriter;
1816
private final EventLogWriter eventLogWriter;
1917
private final ConsumerMetrics consumerMetrics;
@@ -22,19 +20,20 @@ public void process(String eventId, String eventType, String topic, String group
2220
Instant occurredAt, List<OrderItemMetric> items) {
2321
long start = System.currentTimeMillis();
2422

25-
if (!idempotencyChecker.tryMark(eventId, eventType)) {
23+
try {
24+
boolean processed = orderMetricWriter.write(eventId, eventType, occurredAt, items);
25+
26+
if (processed) {
27+
long durationMs = System.currentTimeMillis() - start;
28+
eventLogWriter.saveProcessed(eventId, eventType, topic, groupId, durationMs);
29+
consumerMetrics.recordProcessed(topic, groupId, eventType, durationMs);
30+
} else {
31+
eventLogWriter.saveSkipped(eventId, eventType, topic, groupId);
32+
consumerMetrics.recordSkipped(topic, groupId, eventType);
33+
}
34+
} catch (DataIntegrityViolationException e) {
2635
eventLogWriter.saveSkipped(eventId, eventType, topic, groupId);
2736
consumerMetrics.recordSkipped(topic, groupId, eventType);
28-
return;
29-
}
30-
31-
try {
32-
LocalDateTime bucketTime = BucketTimeUtils.toLocalDateTime(BucketTimeUtils.truncate5min(occurredAt));
33-
orderMetricWriter.upsertAll(bucketTime, items);
34-
35-
long durationMs = System.currentTimeMillis() - start;
36-
eventLogWriter.saveProcessed(eventId, eventType, topic, groupId, durationMs);
37-
consumerMetrics.recordProcessed(topic, groupId, eventType, durationMs);
3837
} catch (Exception e) {
3938
long durationMs = System.currentTimeMillis() - start;
4039
eventLogWriter.saveFailed(eventId, eventType, topic, groupId, e.getMessage(), durationMs);
Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,38 @@
11
package com.loopers.application.metrics;
22

33
import com.loopers.application.metrics.OrderMetricProcessor.OrderItemMetric;
4+
import com.loopers.domain.idempotent.EventHandled;
5+
import com.loopers.domain.idempotent.EventHandledRepository;
46
import com.loopers.domain.metrics.ProductOrderMetricRepository;
57
import lombok.RequiredArgsConstructor;
68
import org.springframework.stereotype.Component;
79
import org.springframework.transaction.annotation.Transactional;
810

11+
import java.time.Instant;
912
import java.time.LocalDateTime;
1013
import java.util.List;
1114

1215
@Component
1316
@RequiredArgsConstructor
1417
public class OrderMetricWriter {
1518

19+
private final EventHandledRepository eventHandledRepository;
1620
private final ProductOrderMetricRepository orderMetricRepository;
1721

1822
@Transactional
19-
public void upsertAll(LocalDateTime bucketTime, List<OrderItemMetric> items) {
23+
public boolean write(String eventId, String eventType, Instant occurredAt, List<OrderItemMetric> items) {
24+
if (eventHandledRepository.existsByEventId(eventId)) {
25+
return false;
26+
}
27+
eventHandledRepository.save(EventHandled.create(eventId, eventType));
28+
29+
LocalDateTime bucketTime = BucketTimeUtils.toLocalDateTime(BucketTimeUtils.truncate5min(occurredAt));
2030
for (OrderItemMetric item : items) {
2131
orderMetricRepository.upsert(
2232
item.productId(), bucketTime,
2333
1, item.quantity(), item.salesAmount()
2434
);
2535
}
36+
return true;
2637
}
2738
}

0 commit comments

Comments
 (0)