Skip to content

Commit 3fa6cf6

Browse files
authored
Merge pull request #34 from yeonsu00/refactor/week8
Refactor/week8
2 parents 394c1b7 + 1cff8df commit 3fa6cf6

File tree

15 files changed

+98
-61
lines changed

15 files changed

+98
-61
lines changed

apps/commerce-api/src/main/java/com/loopers/domain/outbox/OutboxService.java

Lines changed: 22 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.loopers.support.error.CoreException;
55
import com.loopers.support.error.ErrorType;
66
import java.util.List;
7+
import java.util.concurrent.TimeUnit;
78
import lombok.RequiredArgsConstructor;
89
import lombok.extern.slf4j.Slf4j;
910
import org.springframework.kafka.core.KafkaTemplate;
@@ -18,7 +19,7 @@ public class OutboxService {
1819
private static final int MAX_RETRIES = 3;
1920

2021
private final OutboxRepository outboxRepository;
21-
private final KafkaTemplate<String, Object> kafkaTemplate;
22+
private final KafkaTemplate<String, String> kafkaTemplate;
2223
private final ObjectMapper objectMapper;
2324

2425
@Transactional
@@ -36,19 +37,16 @@ public void publishPendingOutboxes(int batchSize) {
3637
List<Outbox> pendingOutboxes = outboxRepository.findPendingOutboxes(batchSize);
3738

3839
for (Outbox outbox : pendingOutboxes) {
39-
kafkaTemplate
40-
.send(
41-
outbox.getTopic(),
42-
outbox.getPartitionKey(),
43-
outbox.getPayload()
44-
)
45-
.whenComplete((result, exception) -> {
46-
if (exception == null) {
47-
outboxRepository.markAsPublished(outbox.getId());
48-
} else {
49-
outboxRepository.markAsFailed(outbox.getId(), exception.getMessage());
50-
}
51-
});
40+
try {
41+
kafkaTemplate.send(
42+
outbox.getTopic(),
43+
outbox.getPartitionKey(),
44+
outbox.getPayload()
45+
).get();
46+
outboxRepository.markAsPublished(outbox.getId());
47+
} catch (Exception e) {
48+
outboxRepository.markAsFailed(outbox.getId(), e.getMessage());
49+
}
5250
}
5351
}
5452

@@ -62,19 +60,16 @@ public void retryFailedOutboxes(int batchSize) {
6260
continue;
6361
}
6462

65-
kafkaTemplate
66-
.send(
67-
outbox.getTopic(),
68-
outbox.getPartitionKey(),
69-
outbox.getPayload()
70-
)
71-
.whenComplete((result, exception) -> {
72-
if (exception == null) {
73-
outboxRepository.markAsPublished(outbox.getId());
74-
} else {
75-
outboxRepository.markAsFailed(outbox.getId(), exception.getMessage());
76-
}
77-
});
63+
try {
64+
kafkaTemplate.send(
65+
outbox.getTopic(),
66+
outbox.getPartitionKey(),
67+
outbox.getPayload()
68+
).get(5, TimeUnit.SECONDS);
69+
outboxRepository.markAsPublished(outbox.getId());
70+
} catch (Exception e) {
71+
outboxRepository.markAsFailed(outbox.getId(), e.getMessage());
72+
}
7873
}
7974
}
8075
}

apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxJpaRepository.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,18 @@
33
import com.loopers.domain.outbox.Outbox;
44
import com.loopers.domain.outbox.OutboxStatus;
55
import java.util.List;
6+
import org.springframework.data.domain.Pageable;
67
import org.springframework.data.jpa.repository.JpaRepository;
78
import org.springframework.data.jpa.repository.Modifying;
89
import org.springframework.data.jpa.repository.Query;
910
import org.springframework.data.repository.query.Param;
1011

1112
public interface OutboxJpaRepository extends JpaRepository<Outbox, Long> {
1213
@Query("SELECT o FROM Outbox o WHERE o.status = :status ORDER BY o.createdAt ASC")
13-
List<Outbox> findByStatusOrderByCreatedAtAsc(@Param("status") OutboxStatus status);
14+
List<Outbox> findByStatusOrderByCreatedAtAsc(@Param("status") OutboxStatus status, Pageable pageable);
1415

1516
@Query("SELECT o FROM Outbox o WHERE o.status = :status AND o.retryCount < :maxRetries ORDER BY o.createdAt ASC")
16-
List<Outbox> findFailedOutboxesForRetry(@Param("status") OutboxStatus status, @Param("maxRetries") int maxRetries);
17+
List<Outbox> findFailedOutboxesForRetry(@Param("status") OutboxStatus status, @Param("maxRetries") int maxRetries, Pageable pageable);
1718

1819
@Modifying(clearAutomatically = true)
1920
@Query("UPDATE Outbox o SET o.status = 'PUBLISHED', o.errorMessage = NULL WHERE o.id = :id")

apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxRepositoryImpl.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
import com.loopers.domain.outbox.OutboxStatus;
66
import java.util.List;
77
import lombok.RequiredArgsConstructor;
8+
import org.springframework.data.domain.PageRequest;
89
import org.springframework.stereotype.Repository;
10+
import org.springframework.transaction.annotation.Transactional;
911

1012
@RequiredArgsConstructor
1113
@Repository
@@ -20,26 +22,22 @@ public void saveOutbox(Outbox outbox) {
2022

2123
@Override
2224
public List<Outbox> findPendingOutboxes(int limit) {
23-
return outboxJpaRepository.findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING)
24-
.stream()
25-
.limit(limit)
26-
.toList();
25+
return outboxJpaRepository.findByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING, PageRequest.of(0, limit));
2726
}
2827

2928
@Override
3029
public List<Outbox> findFailedAndRetryableOutboxes(int limit, int maxRetries) {
31-
return outboxJpaRepository.findFailedOutboxesForRetry(OutboxStatus.FAILED, maxRetries)
32-
.stream()
33-
.limit(limit)
34-
.toList();
30+
return outboxJpaRepository.findFailedOutboxesForRetry(OutboxStatus.FAILED, maxRetries, PageRequest.of(0, limit));
3531
}
3632

3733
@Override
34+
@Transactional
3835
public void markAsPublished(Long id) {
3936
outboxJpaRepository.updateOutboxPublishedStatus(id);
4037
}
4138

4239
@Override
40+
@Transactional
4341
public void markAsFailed(Long id, String message) {
4442
outboxJpaRepository.updateOutboxFailedStatusAndErrorMessageAndRetryCount(id, message);
4543
}

apps/commerce-api/src/main/java/com/loopers/interfaces/scheduler/OutboxRelayScheduler.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,38 @@
22

33
import com.loopers.domain.outbox.OutboxService;
44
import lombok.RequiredArgsConstructor;
5+
import lombok.extern.slf4j.Slf4j;
56
import org.springframework.scheduling.annotation.Scheduled;
67
import org.springframework.stereotype.Component;
78

89
@RequiredArgsConstructor
910
@Component
11+
@Slf4j
1012
public class OutboxRelayScheduler {
1113

1214
private final OutboxService outboxService;
1315
private static final int BATCH_SIZE = 100;
1416

1517
@Scheduled(fixedDelay = 3000)
1618
public void relayOutboxEvents() {
17-
outboxService.publishPendingOutboxes(BATCH_SIZE);
19+
try {
20+
log.debug("Outbox 이벤트 릴레이 시작 - batchSize: {}", BATCH_SIZE);
21+
outboxService.publishPendingOutboxes(BATCH_SIZE);
22+
log.debug("Outbox 이벤트 릴레이 완료");
23+
} catch (Exception e) {
24+
log.error("Outbox 이벤트 릴레이 중 오류 발생", e);
25+
}
1826
}
1927

2028
@Scheduled(fixedDelay = 5000)
2129
public void retryFailedOutboxEvents() {
22-
outboxService.retryFailedOutboxes(BATCH_SIZE);
30+
try {
31+
log.debug("실패한 Outbox 이벤트 재시도 시작 - batchSize: {}", BATCH_SIZE);
32+
outboxService.retryFailedOutboxes(BATCH_SIZE);
33+
log.debug("실패한 Outbox 이벤트 재시도 완료");
34+
} catch (Exception e) {
35+
log.error("실패한 Outbox 이벤트 재시도 중 오류 발생", e);
36+
}
2337
}
2438
}
2539

apps/commerce-api/src/test/java/com/loopers/support/IntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,5 @@
66
public abstract class IntegrationTest {
77

88
@MockitoBean
9-
protected KafkaTemplate<String, Object> kafkaTemplate;
9+
protected KafkaTemplate<String, String> kafkaTemplate;
1010
}

apps/commerce-streamer/src/main/java/com/loopers/domain/eventhandled/EventHandledService.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.loopers.domain.eventhandled;
22

33
import lombok.RequiredArgsConstructor;
4+
import org.springframework.dao.DataIntegrityViolationException;
45
import org.springframework.stereotype.Service;
56
import org.springframework.transaction.annotation.Transactional;
67

@@ -17,10 +18,8 @@ public boolean isAlreadyHandled(String eventId) {
1718

1819
@Transactional
1920
public void markAsHandled(String eventId, String eventType, String aggregateKey) {
20-
if (!eventHandledRepository.existsByEventId(eventId)) {
21-
EventHandled eventHandled = EventHandled.create(eventId, eventType, aggregateKey);
22-
eventHandledRepository.saveEventHandled(eventHandled);
23-
}
21+
EventHandled eventHandled = EventHandled.create(eventId, eventType, aggregateKey);
22+
eventHandledRepository.saveEventHandled(eventHandled);
2423
}
2524
}
2625

apps/commerce-streamer/src/main/java/com/loopers/domain/metrics/ProductMetricsRepository.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,7 @@ public interface ProductMetricsRepository {
66
void saveProductMetrics(ProductMetrics productMetrics);
77

88
Optional<ProductMetrics> findByProductId(Long productId);
9+
10+
int incrementLikeCount(Long productId);
911
}
1012

apps/commerce-streamer/src/main/java/com/loopers/domain/metrics/ProductMetricsService.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,42 @@
11
package com.loopers.domain.metrics;
22

33
import lombok.RequiredArgsConstructor;
4+
import lombok.extern.slf4j.Slf4j;
5+
import org.springframework.dao.DataIntegrityViolationException;
46
import org.springframework.stereotype.Service;
57
import org.springframework.transaction.annotation.Transactional;
68

79
@RequiredArgsConstructor
810
@Service
11+
@Slf4j
912
public class ProductMetricsService {
1013

1114
private final ProductMetricsRepository productMetricsRepository;
1215

1316
@Transactional
1417
public void incrementLikeCount(Long productId) {
15-
ProductMetrics metrics = productMetricsRepository.findByProductId(productId)
16-
.orElseGet(() -> {
17-
ProductMetrics newMetrics = ProductMetrics.create(productId);
18-
productMetricsRepository.saveProductMetrics(newMetrics);
19-
return newMetrics;
20-
});
21-
metrics.incrementLikeCount();
22-
productMetricsRepository.saveProductMetrics(metrics);
18+
int updatedRows = productMetricsRepository.incrementLikeCount(productId);
19+
20+
if (updatedRows == 0) {
21+
try {
22+
ProductMetrics newMetrics = ProductMetrics.create(productId);
23+
newMetrics.incrementLikeCount();
24+
productMetricsRepository.saveProductMetrics(newMetrics);
25+
} catch (DataIntegrityViolationException e) {
26+
productMetricsRepository.incrementLikeCount(productId);
27+
}
28+
}
2329
}
2430

2531
@Transactional
2632
public void decrementLikeCount(Long productId) {
27-
productMetricsRepository.findByProductId(productId)
28-
.ifPresent(metrics -> {
29-
metrics.decrementLikeCount();
30-
productMetricsRepository.saveProductMetrics(metrics);
33+
ProductMetrics productMetrics = productMetricsRepository.findByProductId(productId)
34+
.orElseGet(() -> {
35+
log.warn("좋아요 취소 시 메트릭이 존재하지 않음: productId={}", productId);
36+
return ProductMetrics.create(productId);
3137
});
38+
productMetrics.decrementLikeCount();
39+
productMetricsRepository.saveProductMetrics(productMetrics);
3240
}
3341

3442
@Transactional

apps/commerce-streamer/src/main/java/com/loopers/infrastructure/eventhandled/EventHandledRepositoryImpl.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,24 @@
33
import com.loopers.domain.eventhandled.EventHandled;
44
import com.loopers.domain.eventhandled.EventHandledRepository;
55
import lombok.RequiredArgsConstructor;
6+
import lombok.extern.slf4j.Slf4j;
7+
import org.springframework.dao.DataIntegrityViolationException;
68
import org.springframework.stereotype.Repository;
79

810
@RequiredArgsConstructor
911
@Repository
12+
@Slf4j
1013
public class EventHandledRepositoryImpl implements EventHandledRepository {
1114

1215
private final EventHandledJpaRepository eventHandledJpaRepository;
1316

1417
@Override
1518
public void saveEventHandled(EventHandled eventHandled) {
16-
eventHandledJpaRepository.save(eventHandled);
19+
try {
20+
eventHandledJpaRepository.save(eventHandled);
21+
} catch (DataIntegrityViolationException e) {
22+
log.debug("중복 이벤트 감지로 인해 처리를 생략함: {}", eventHandled.getEventId());
23+
}
1724
}
1825

1926
@Override

apps/commerce-streamer/src/main/java/com/loopers/infrastructure/metrics/ProductMetricsJpaRepository.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,15 @@
33
import com.loopers.domain.metrics.ProductMetrics;
44
import java.util.Optional;
55
import org.springframework.data.jpa.repository.JpaRepository;
6+
import org.springframework.data.jpa.repository.Modifying;
7+
import org.springframework.data.jpa.repository.Query;
8+
import org.springframework.data.repository.query.Param;
69

710
public interface ProductMetricsJpaRepository extends JpaRepository<ProductMetrics, Long> {
811
Optional<ProductMetrics> findByProductId(Long productId);
12+
13+
@Modifying
14+
@Query("UPDATE ProductMetrics m SET m.likeCount = m.likeCount + 1 WHERE m.productId = :productId")
15+
int incrementLikeCount(@Param("productId") Long productId);
916
}
1017

0 commit comments

Comments
 (0)