[Volume 9] Product ranking feature implementation #360
Conversation
- Split the catalog-events topic into product-events and like-events to align with domain boundaries
- Restructured so Consumer-Processor-Service line up 1:1:1 per domain
- Defined eventType constants in each Processor and added validation logic
- Buffer ranking scores via ApplicationEvent while processing Kafka events, then apply them to Redis in batches with ZINCRBY
- Weighted score calculation (view 0.1, like 0.2, order 0.6) is owned by ProductRankingService
- Added a paginated GET /api/v1/rankings?date=yyyyMMdd query API
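The weighted scoring above can be sketched as plain Java. The weight values (0.1 / 0.2 / 0.6) come from this PR's description; the class and method names here are illustrative stand-ins, not the actual ProductRankingService API:

```java
public class RankingScore {
    // Weights from the PR description: view 0.1, like 0.2, order 0.6.
    static final double VIEW_WEIGHT = 0.1;
    static final double LIKE_WEIGHT = 0.2;
    static final double ORDER_WEIGHT = 0.6;

    /** Weighted score that would be accumulated into the Redis ZSET via ZINCRBY. */
    static double score(long views, long likes, long orders) {
        return views * VIEW_WEIGHT + likes * LIKE_WEIGHT + orders * ORDER_WEIGHT;
    }

    public static void main(String[] args) {
        // 100 views, 10 likes, 5 orders -> 10.0 + 2.0 + 3.0
        System.out.println(score(100, 10, 5)); // 15.0
    }
}
```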
- Added a single-domain Service layer so the Facade no longer uses Repositories directly
- Look up a single product's rank with Redis ZREVRANK
- Products absent from the ranking return rank: null
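A minimal in-memory model of the ZREVRANK contract described above — highest score first, 0-based index, and a nil (here: null) result for absent members, which the API surfaces as rank: null. RankLookup is a hypothetical stand-in, not code from this PR:

```java
import java.util.List;

public class RankLookup {
    // Models ZREVRANK over a list already sorted by score, descending.
    static Long rank(List<Long> idsByScoreDesc, long productId) {
        int idx = idsByScoreDesc.indexOf(productId);
        return idx >= 0 ? (long) idx : null; // ZREVRANK returns nil for missing members
    }

    public static void main(String[] args) {
        List<Long> ranking = List.of(7L, 3L, 9L);
        System.out.println(rank(ranking, 3L));  // 1
        System.out.println(rank(ranking, 42L)); // null
    }
}
```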
- Return the exact rank regardless of page via offset + index
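The offset + index rule reduces to one line of arithmetic; a tiny sketch with illustrative names:

```java
public class PageRank {
    // Absolute 1-based rank of the item at `index` within a page starting at `offset`.
    static long rankOf(long offset, int index) {
        return offset + index + 1;
    }

    public static void main(String[] args) {
        // Page 2 (0-based) with size 20 -> offset 40; first item on the page is rank 41.
        System.out.println(rankOf(40, 0));  // 41
        System.out.println(rankOf(40, 19)); // 60
    }
}
```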
- Switched from cumulative counters to daily (metricDate) aggregation to support daily/weekly/monthly rankings
- Resolved the Duplicate Key problem under concurrent Consumer access with ON DUPLICATE KEY UPDATE
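The contract ON DUPLICATE KEY UPDATE provides — two consumers racing on the same (productId, metricDate) row both end up accumulated rather than one failing on a duplicate key — can be modeled in memory with Map.merge. The real implementation is a MySQL upsert query; UpsertModel here is only a hypothetical stand-in for the semantics:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class UpsertModel {
    record MetricKey(long productId, String metricDate) {}

    private final Map<MetricKey, Long> viewCounts = new ConcurrentHashMap<>();

    void upsertViewCount(long productId, String metricDate, long delta) {
        // merge = "insert if absent, otherwise add" — the same contract as
        // INSERT ... ON DUPLICATE KEY UPDATE count = count + delta
        viewCounts.merge(new MetricKey(productId, metricDate), delta, Long::sum);
    }

    Long viewCount(long productId, String metricDate) {
        return viewCounts.get(new MetricKey(productId, metricDate));
    }
}
```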
- Every day at midnight, compute weighted scores from the daily product_metrics data and load them into a Redis Sorted Set
- To avoid a cold start right after midnight, carry over 30% of today's scores to tomorrow's key at 23:50
- Changed the daily aggregation to ZINCRBY into today's key so it accumulates on top of the carry-over
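The 30% carry-over arithmetic, as a sketch — the 23:50 schedule and the ratio come from the commit message, while the class and method names are made up for illustration:

```java
public class CarryOver {
    static final double CARRY_RATIO = 0.3; // 30% of today's score seeds tomorrow's key

    static double carryOverScore(double todayScore) {
        return todayScore * CARRY_RATIO;
    }

    public static void main(String[] args) {
        double todayScore = 200.0;
        double seeded = carryOverScore(todayScore); // written to tomorrow's key at 23:50
        // After midnight the daily aggregation ZINCRBYs into the same key,
        // so an aggregated 50.0 lands on top of the 60.0 seed.
        double afterAggregation = seeded + 50.0;
        System.out.println(afterAggregation); // 110.0
    }
}
```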
📝 Walkthrough

Introduces product-ranking aggregation and serving via a Redis ZSET, scopes metrics by date, splits the Kafka topic, and refactors event handling into per-topic processors and consumers. The API, repositories, scheduler, and integration tests are added alongside.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Client
    participant Controller as RankingV1Controller
    participant Facade as RankingFacade
    participant Service as RankingService
    participant Cache as RankingCacheRepository(Redis)
    participant Product as ProductService
    participant Brand as BrandService
    Client->>Controller: GET /api/v1/rankings?date=yyyyMMdd
    Controller->>Facade: getRankedProducts(date, pageable)
    Facade->>Service: getRankedProductIds(date, offset, size)
    Service->>Cache: getTopRankedProductIds(date, offset, size)
    Cache-->>Service: [id1, id2, ...]
    Service->>Service: getTotalCount(date)
    Service-->>Facade: ids + totalCount
    loop each product ID
        Facade->>Product: getProduct(id)
        Product-->>Facade: ProductResult
        Facade->>Brand: getBrand(product.brandId)
        Brand-->>Facade: BrandResult
        Facade->>Facade: compute rank = offset + index + 1
        Facade-->>Controller: ProductWithBrandResult(product, brand, rank)
    end
    Controller-->>Client: ApiResponse(PageResponse)
```
```mermaid
sequenceDiagram
    participant Kafka as KafkaTopics
    participant Consumer as TopicBatchConsumer
    participant Processor as EventProcessor
    participant EventRepo as EventHandledRepository
    participant MetricsService as ProductMetricsService
    participant DB as ProductMetricsRepository(DB)
    Kafka-->>Consumer: Batch[ConsumerRecord(eventId, eventType, payload)]
    Consumer->>Processor: process(record)
    Processor->>EventRepo: existsById(eventId)?
    alt exists
        EventRepo-->>Processor: true
        Processor-->>Consumer: early return
    else not exists
        Processor->>Processor: parse JSON -> eventType, productId, quantity
        alt PRODUCT_VIEWED
            Processor->>MetricsService: incrementViewCount(productId, eventDate)
            MetricsService->>DB: upsertViewCount(productId, metricDate, +1)
        else PRODUCT_LIKED/UNLIKED
            Processor->>MetricsService: incrementLikeCount/decrement(..., eventDate)
            MetricsService->>DB: upsertLikeCount(productId, metricDate, ±1)
        else ORDER_PLACED
            Processor->>MetricsService: incrementSalesCount(productId, qty, eventDate)
            MetricsService->>DB: upsertSalesCount(productId, metricDate, qty)
        else
            Processor->>Processor: log warning and return
        end
        Processor->>EventRepo: save(EventHandled(eventId))
    end
    Processor-->>Consumer: processing complete
    Consumer->>Consumer: acknowledgment.acknowledge()
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 12
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
apps/commerce-api/src/test/java/com/loopers/interfaces/api/product/ProductV1ControllerTest.java (1)
66-83: ⚠️ Potential issue | 🟡 Minor — `rank` field assertion missing from the product-detail test

Operational concern: the test sets a `rank` value of 3 on `ProductWithBrandResult`, but the response assertions never check the `$.data.rank` field. Nothing guarantees the newly added ranking data is actually included in the API response, so a future regression would go undetected.

💚 Suggested fix

```diff
 .andExpect(jsonPath("$.data.price").value(100000))
-.andExpect(jsonPath("$.data.likeCount").value(10));
+.andExpect(jsonPath("$.data.likeCount").value(10))
+.andExpect(jsonPath("$.data.rank").value(3));
```

Additional test suggestion: also add a case asserting the response when `rank` is `null` (product absent from the ranking).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-api/src/test/java/com/loopers/interfaces/api/product/ProductV1ControllerTest.java` around lines 66 - 83, The test ProductV1ControllerTest::상품_상세를_조회하면_200_OK와_상품_정보를_반환한다 creates a ProductWithBrandResult with rank=3 but never asserts the returned rank; update the test to add an assertion verifying $.data.rank equals 3 (i.e., extend the mockMvc.perform(...) chain with .andExpect(jsonPath("$.data.rank").value(3))). Also add an additional test case for a null rank scenario by stubbing productFacade.getProduct(...) to return a ProductWithBrandResult with rank=null and asserting $.data.rank is null or absent per API contract.
🧹 Nitpick comments (16)
apps/commerce-api/src/main/java/com/loopers/domain/ranking/RankingCacheRepository.java (2)
7-9: Safer to pin the date contract to a type instead of `String`

Accepting the date as a `String` on lines 7 and 9 makes a malformed format fail silently in production, surfacing only as an apparent "no ranking" result. The suggested fix changes the interface contract to `LocalDate` and centralizes key formatting in the implementation (a key resolver).

Suggested change

```diff
+import java.time.LocalDate;
 import java.util.List;

 public interface RankingCacheRepository {
-    List<Long> getTopRankedProductIds(String date, long offset, long size);
+    List<Long> getTopRankedProductIds(LocalDate date, long offset, long size);

-    long getTotalCount(String date);
+    long getTotalCount(LocalDate date);

-    Long getRank(String date, Long productId);
+    Long getRank(LocalDate date, Long productId);
 }
```

As additional tests, verify that an invalid date at the API/service layer yields a consistent `CoreException` response, and that valid dates resolve to the same keys as before.

Based on learnings: "In the loop-pack-be-l2-vol3-java project, enforce unified error handling by routing errors through CoreException to ApiControllerAdvice to ensure a consistent response format."
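To make the key-resolver suggestion concrete, a minimal sketch: the interface would take a `LocalDate` and only the implementation would know the Redis key layout. The `ranking:` prefix and the class name are assumptions for illustration, not taken from the PR:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class RankingKeyResolver {
    // yyyyMMdd matches the date format used by GET /api/v1/rankings?date=yyyyMMdd
    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd");

    /** Single place that turns a typed date into the cache key. */
    static String keyFor(LocalDate date) {
        return "ranking:" + date.format(DATE_FORMAT);
    }

    public static void main(String[] args) {
        System.out.println(keyFor(LocalDate.of(2024, 5, 7))); // ranking:20240507
    }
}
```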
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-api/src/main/java/com/loopers/domain/ranking/RankingCacheRepository.java` around lines 7 - 9, The interface RankingCacheRepository currently accepts date as String for getTopRankedProductIds and getTotalCount which risks silent failures from bad formats; change the contract to use java.time.LocalDate for both methods, move any string key formatting/serialization into the implementation (e.g., the concrete KeyResolver or repository impl) so formatting is centralized, validate and convert inbound String dates at the API/service boundary to LocalDate and throw CoreException on parse errors, and update/add tests to assert CoreException is raised for invalid dates and that valid LocalDate inputs produce the same cache keys/results as before.
11-11: The null/base (0-base vs 1-base) contract of `getRank` must be made explicit

Line 11 does not reveal the null-allowance of the `Long productId` input and the `Long` return, nor whether ranks are 0-based or 1-based, so each caller risks handling them differently. In production this can lead to mismatched on-screen ranks or NPE-style failures.

The suggested fix pins the input to `long productId` and declares the return as `@Nullable Long` plus Javadoc stating clearly "null when absent; 0-based (or 1-based) when present".

Suggested change

```diff
+import org.springframework.lang.Nullable;
 public interface RankingCacheRepository {
 @@
-    Long getRank(String date, Long productId);
+    /**
+     * @return null when absent; 0-based rank when present
+     */
+    @Nullable
+    Long getRank(String date, long productId);
 }
```

As additional tests, unit/integration tests should pin down the contract for "rank of an existing product", "nonexistent product (null return)", and "base value for the top product (0 or 1)".

As per coding guidelines "**/*.java: check null handling ...".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-api/src/main/java/com/loopers/domain/ranking/RankingCacheRepository.java` at line 11, The getRank contract is ambiguous about nullability and whether ranks are 0-based or 1-based; update RankingCacheRepository.getRank to take primitive long productId (avoid null product id) and annotate the return as `@Nullable` Long, add Javadoc on getRank stating "returns null if the product has no rank; when present returns a 0-based rank (0 = top)", and ensure you import/use your project's Nullable annotation. Update callers to handle possible null returns and add unit tests for: existing product rank, non-existent product (null), and the top product expected to return 0.
apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/MetricsEventPublisher.java (1)
34-37: Event-type contracts are safer pinned as shared constants across modules

Operationally, when the string contract is scattered across publisher and consumer, a single typo produces only a warn + return on the consumer side and metrics are silently dropped. The suggested fix keeps event-type/field contract constants (or a DTO) in a common module and reuses them on both sides. As an additional test, add a published-payload contract test (does the consumer actually parse and branch on the real payload?) to block regressions.

Based on learnings: "event schema versioning and dual-read for rolling deploy compatibility are intentionally omitted ... approximate ranking system ... temporary score distortion ... acceptable."
Also applies to: 45-68
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/MetricsEventPublisher.java` around lines 34 - 37, The event type string constants (EVENT_PRODUCT_VIEWED, EVENT_PRODUCT_LIKED, EVENT_PRODUCT_UNLIKED, EVENT_ORDER_PLACED) in MetricsEventPublisher should be moved into a shared contract in a common module (preferably as an enum or final class of constants, e.g., EventTypes or EventsContract) and the MetricsEventPublisher class should reference those shared symbols instead of local literals; update any other modules (consumers) to import the same contract, and add a roundtrip/integration test that publishes example payloads and asserts the consumer can parse and branch on the shared event types to prevent silent loss from typos (the same change applies for other literals in the same file around the 45-68 region).
apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/LikeEventConsumer.java (1)
27-41: The shared batch-consumer flow should be templated to keep incident response consistent

Operationally, when the same logic is duplicated per consumer, an urgent fix may land in only one class and leave retry/commit behavior diverging between consumers. The suggested fix extracts the for-loop processing + exception policy + flush + ack into a shared component (or abstract base). As additional tests, guarantee identical behavior with parameterized tests per consumer for duplicate events, single-record failure, and flush-failure scenarios.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/LikeEventConsumer.java` around lines 27 - 41, The consumeLikeEvent method duplicates a common batch-consumer flow (looping records, per-record exception policy, flush, ack) which should be extracted to a shared template to ensure consistent retry/commit behavior; create an abstract BatchKafkaConsumer (or reusable component) that implements the KafkaListener entry (or is invoked from it) and provides a template method like handleBatch(List<ConsumerRecord<Object,Object>> records, Acknowledgment ack) with an abstract processRecord(ConsumerRecord<?,?> record) that callers implement (LikeEventConsumer should delegate its likeEventProcessor.process call to processRecord), centralize the try/catch logic to convert failures into BatchListenerFailedException with the index, invoke productRankingService.flush in the template's finally/flush step and only acknowledge after successful flush, and add parameterized tests for duplicate events, single-record failure and flush failure to validate consistent behavior across consumers.
apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/ProductEventConsumer.java (2)
36-40: Redis scores can be lost when a batch fails midway.

If a `process()` call fails partway through, `BatchListenerFailedException` is thrown and `flush()` is never called. In that case the scores already accumulated in the buffer via `ProductMetricsUpdatedEvent` for the processed records are never applied to Redis and are lost.

Operationally this can cause drift between the DB metrics and the Redis ranking. That said, per the learnings, the daily aggregation scheduler recomputes from the DB, so the state self-heals.

If the current structure is intended, document this behavior; otherwise consider guaranteeing `flush()` with a try-finally block.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/ProductEventConsumer.java` around lines 36 - 40, When processing a batch in ProductEventConsumer, currently throwing BatchListenerFailedException from process() can skip calling productRankingService.flush(), causing Redis score updates to be lost; update the method so that productRankingService.flush() is executed in a finally block (or otherwise guaranteed) even if process() throws, and keep acknowledgment.acknowledge() behavior as intended (either after successful flush or also guarded as required); reference the process() invocation, the BatchListenerFailedException path, productRankingService.flush(), and acknowledgment to locate and modify the control flow, or alternatively add clear documentation of the current failure/flush semantics if the current behavior is intended.
apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductRankingRepositoryImpl.java (2)
67-69: The impact of the silent-failure pattern on Redis errors should be documented.

When `incrementScores` fails, the exception is swallowed and only a warn log is emitted. The caller cannot tell whether the call succeeded, and because the buffer has already been cleared, the scores are lost.

Per the learnings this may be an intentional design (the daily aggregation self-heals), but the behavior should be stated in JavaDoc or a comment so maintainers are aware of it. Also consider collecting a metric so Redis outages are visible to monitoring.

🤖 Prompt for AI Agents
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductRankingRepositoryImpl.java` around lines 67 - 69, The catch in ProductRankingRepositoryImpl.incrementScores currently swallows exceptions and only logs a warning (log.warn("Redis 랭킹 점수 갱신 실패: key={}", key, e)), which causes silent failure and loss of buffered scores; add a JavaDoc on the incrementScores method (or class-level Javadoc) explaining this intentional silent-failure pattern, that the in-memory buffer is cleared regardless and daily aggregation can recover scores, and add a code comment at the catch site clarifying this; additionally instrument a metric/metric increment (e.g., redisIncrementFailures) inside the catch so Redis failures are observable by monitoring systems and make sure callers are aware via the JavaDoc that no exception is propagated.
51-66: A partial failure during the Redis pipeline can skip the TTL refresh.

Inside `executePipelined`, `incrementScore` and `expire` run sequentially. A Redis pipeline does not immediately throw on individual command failures; it can instead include the exceptions in the result list. The current code never inspects the pipeline results.

In practice, Spring Data Redis's `executePipelined` does throw when a command inside the pipeline fails, so this is not a major issue, but in partial-success scenarios (e.g., a network timeout) only the TTL refresh might be missed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductRankingRepositoryImpl.java` around lines 51 - 66, The pipeline can partially fail causing the expire call inside the SessionCallback to be skipped or not applied; after calling redisTemplate.executePipelined(...) in ProductRankingRepositoryImpl capture and inspect its returned List<Object> (the pipeline results) for exceptions/failure indicators, and if any failure or unexpected result is detected, perform a separate, standalone call to redisTemplate.expire(key, TTL) inside its own try/catch to ensure TTL is set; also log any pipeline errors and the fallback expire result so failures are visible.
apps/commerce-streamer/src/main/java/com/loopers/application/product/ProductRankingService.java (1)
36-40: The read-then-clear pattern in `flush()` is racy.

If `handleMetricsUpdated` runs between the `buffer.isEmpty()` check and `buffer.clear()`, newly added scores can be lost without ever reaching Redis. `ConcurrentHashMap` makes individual operations atomic, but the compound sequence (isEmpty → incrementScores → clear) is not.

If this service is a singleton bean shared across multiple Kafka consumer threads, this is a real concern. Per the learnings the daily aggregation self-heals, so it may be an acceptable level of loss, but whether explicit synchronization is needed deserves review.
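An alternative that removes the race without a lock is to swap the buffer atomically, so producers never race with the flush. A self-contained sketch: the names loosely mirror the service, and the repository call is stubbed with a comment since this is only an illustration of the pattern:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

public class SwapBuffer {
    private final AtomicReference<ConcurrentHashMap<Long, Double>> buffer =
            new AtomicReference<>(new ConcurrentHashMap<>());

    /** Producer side: accumulate a score delta for a product. */
    public void add(long productId, double score) {
        buffer.get().merge(productId, score, Double::sum);
    }

    /** Flush side: swap in a fresh map; concurrent adds go to the new instance. */
    public Map<Long, Double> flush() {
        Map<Long, Double> snapshot = buffer.getAndSet(new ConcurrentHashMap<>());
        if (!snapshot.isEmpty()) {
            // productRankingRepository.incrementScores(snapshot) would go here.
        }
        return snapshot;
    }
}
```

One caveat: a producer that fetched the old map reference just before the swap can still write into the snapshot; as with the synchronized variant, the daily DB-based aggregation would absorb any such stragglers.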
Suggested synchronization

```diff
+    private final Object flushLock = new Object();
+
     public void flush() {
-        if (buffer.isEmpty()) return;
-        productRankingRepository.incrementScores(buffer);
-        buffer.clear();
+        synchronized (flushLock) {
+            if (buffer.isEmpty()) return;
+            Map<Long, Double> snapshot = new HashMap<>(buffer);
+            buffer.clear();
+            productRankingRepository.incrementScores(snapshot);
+        }
     }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-streamer/src/main/java/com/loopers/application/product/ProductRankingService.java` around lines 36 - 40, The flush() method performs a non-atomic read-then-clear on the shared buffer which can race with concurrent producers (e.g., handleMetricsUpdated) and lose updates; to fix, make the flush atomic by either synchronizing the read/increment/clear sequence (e.g., wrap the isEmpty → incrementScores(buffer) → buffer.clear() sequence in a synchronized block on the shared buffer or this) or swap the buffer with a fresh map atomically (use an AtomicReference<ConcurrentHashMap> or replace buffer with a new instance and operate on the old snapshot) before calling productRankingRepository.incrementScores so no concurrent additions are lost. Ensure the chosen approach is applied to the singleton ProductRankingService to protect flush() and any producers that mutate buffer.
apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetrics.java (1)
72-74: `incrementSalesCount` has no negative-quantity validation.

If the `quantity` parameter is negative, `salesCount` can decrease or go negative. A bug in an upstream caller or a malformed Kafka message could break data integrity.

A negative `salesCount` in production would produce unexpected results in ranking computation.

Suggested guard

```diff
 public void incrementSalesCount(int quantity) {
+    if (quantity < 0) {
+        throw new IllegalArgumentException("quantity must be 0 or greater.");
+    }
     this.salesCount += quantity;
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetrics.java` around lines 72 - 74, The incrementSalesCount method in ProductMetrics allows negative quantities which can reduce salesCount and break data integrity; update ProductMetrics.incrementSalesCount to validate the quantity parameter (if quantity < 0) and reject it (throw an IllegalArgumentException with a clear message or return an error), ensuring salesCount is only increased by non-negative values; adjust callers/tests of incrementSalesCount accordingly and include a unit test that asserts the method rejects negative input.
apps/commerce-streamer/src/test/java/com/loopers/application/product/ProductMetricsServiceIntegrationTest.java (1)
28-61: Tests for `decrementLikeCount` and boundary conditions are missing.

The current tests only verify increment operations. Recommend adding tests for:
- `decrementLikeCount` decreasing the count
- calling `decrementLikeCount` when `likeCount` is 0 and verifying it stays at 0 (domain-logic check)
- decrement behavior for a nonexistent `productId`

As per coding guidelines: "Check that unit tests cover boundary values, failure cases, and exception flows."
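The zero-floor boundary case above can be pinned with a tiny model. Note the clamp-at-zero behavior is an assumption about the intended domain rule, which the review deliberately leaves open; LikeCounter is a hypothetical stand-in, not the real ProductMetrics:

```java
public class LikeCounter {
    private long likeCount;

    void increment() {
        likeCount++;
    }

    void decrement() {
        // Assumed domain rule: the count never goes negative.
        likeCount = Math.max(0, likeCount - 1);
    }

    long likeCount() {
        return likeCount;
    }
}
```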
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-streamer/src/test/java/com/loopers/application/product/ProductMetricsServiceIntegrationTest.java` around lines 28 - 61, Add tests covering decrementLikeCount and boundary/failure cases: write new `@Test` methods that call productMetricsService.decrementLikeCount(productId) to assert likeCount decreases from >0, that calling decrementLikeCount when likeCount is 0 leaves it at 0 (verify via productMetricsRepository.findByProductIdAndMetricDate and ProductMetrics.getLikeCount), and a test for decrement behavior when productId does not exist (expect no record created or specific domain behavior). Use the existing patterns in the file (LocalDate.now(), productId = 1L, orElseThrow()) and assert expected counts for view/sales remain unchanged where applicable.
apps/commerce-streamer/src/test/java/com/loopers/infrastructure/product/RankingSchedulerIntegrationTest.java (2)
62-65: Hardcoding the weights in tests lets configuration drift go unverified.

Writing the weight values (0.1, 0.2, 0.6) directly into test code means the tests keep passing even if the `ranking.weights` settings in `application.yml` change. That allows the actual scheduler behavior and the test assertions to diverge.

Consider invoking `RankingScheduler` directly, or injecting the weights with `@Value` so the test uses the same values.

Suggested improvement

```diff
+    @Value("${ranking.weights.view:0.1}")
+    private double viewWeight;
+
+    @Value("${ranking.weights.like:0.2}")
+    private double likeWeight;
+
+    @Value("${ranking.weights.sales:0.6}")
+    private double salesWeight;
+
     // when — same logic as the scheduler
     for (var metrics : metricsList) {
-        double score = (metrics.getViewCount() * 0.1)
-                + (metrics.getLikeCount() * 0.2)
-                + (metrics.getSalesCount() * 0.6);
+        double score = (metrics.getViewCount() * viewWeight)
+                + (metrics.getLikeCount() * likeWeight)
+                + (metrics.getSalesCount() * salesWeight);
         scores.put(metrics.getProductId(), score);
     }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-streamer/src/test/java/com/loopers/infrastructure/product/RankingSchedulerIntegrationTest.java` around lines 62 - 65, The test currently hardcodes weight literals (0.1, 0.2, 0.6) in RankingSchedulerIntegrationTest when computing expected scores, which can diverge from runtime config; update the test to use the same config or logic as production by retrieving the configured weights (e.g., inject the same `@Value` properties used by RankingScheduler or instantiate/use RankingScheduler.computeScore(...) if exposed) and compute expected score from those injected weights instead of literals — locate RankingSchedulerIntegrationTest and replace the hardcoded weight usage with reading the ranking.weights values (or calling RankingScheduler's scoring method) so the test stays consistent with application.yml changes.
40-78: Boundary and failure-case tests are lacking.

The current test only covers the happy path. Recommend adding tests for:
- no metrics exist for the given date
- all counts are zero
- whether repeated upserts for the same `productId` accumulate

As per coding guidelines: "Check that unit tests cover boundary values, failure cases, and exception flows."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-streamer/src/test/java/com/loopers/infrastructure/product/RankingSchedulerIntegrationTest.java` around lines 40 - 78, Add additional test methods alongside 일간_랭킹_집계_시_DB_메트릭이_Redis에_적재된다() to cover the missing edge/failure cases: (1) no metrics for the metricDate — call productMetricsRepository.findAllByMetricDate with a date that has no upserts and assert productRankingRepository.incrementScores does not create the Redis key (use redisTemplate.opsForZSet().size(key) == null or 0); (2) all counts zero — upsert 0 for view/like/sales for a product and assert the resulting score is 0 (use redisTemplate.opsForZSet().score(key, "<id>")); (3) duplicate upserts for the same productId — perform multiple upsertViewCount/upsertLikeCount/upsertSalesCount calls for the same productId on the same metricDate and assert whether scores are cumulative or overwritten by checking the final score in Redis and matching expected behavior; reuse productMetricsRepository.findAllByMetricDate and productRankingRepository.incrementScores logic and the same key construction to locate results.
apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetricsRepository.java (1)
13-13: `findAllByMetricDate` can hit memory issues on large result sets.

The daily aggregation loads every `ProductMetrics` row for the date into memory at once. As the product count grows, this risks an OOM.

Operationally, to prepare for product counts growing into the tens or hundreds of thousands, consider batch processing (paging or streaming) or document the expected data volume.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetricsRepository.java` at line 13, The repository method findAllByMetricDate currently loads all ProductMetrics for a given metricDate into memory which can OOM for large product counts; update ProductMetricsRepository to avoid full in-memory loads by providing a paged or streaming alternative—e.g., add a method like Page<ProductMetrics> findAllByMetricDate(LocalDate metricDate, Pageable pageable) or Stream<ProductMetrics> streamByMetricDate(LocalDate metricDate) (or a Slice-based variant) and update calling batch/aggregation code to iterate pages or the stream; ensure to close streams or use try-with-resources and document expected volume and chosen processing strategy.
apps/commerce-api/src/test/java/com/loopers/interfaces/api/product/ProductV1ControllerTest.java (1)
44-64: The product-list test never verifies the null `rank` field

Operational concern: the list query sets `rank` to `null`, but the test never verifies how that field is serialized in the response (exposed as `null` vs. omitted).

♻️ Suggested fix

```diff
 .andExpect(jsonPath("$.data.content[0].price").value(100000))
-.andExpect(jsonPath("$.data.content[0].likeCount").value(10));
+.andExpect(jsonPath("$.data.content[0].likeCount").value(10))
+.andExpect(jsonPath("$.data.content[0].rank").doesNotExist()); // or .value(nullValue())
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-api/src/test/java/com/loopers/interfaces/api/product/ProductV1ControllerTest.java` around lines 44 - 64, The test 상품_목록을_조회하면_200_OK와_페이징된_상품_목록을_반환한다() creates a ProductWithBrandResult with rank=null but doesn't assert how rank is serialized; update the test to verify the response handles the null rank (either present as null or absent) by adding an assertion on jsonPath("$.data.content[0].rank") using the mockMvc.perform result; locate the ProductWithBrandResult construction and the mockMvc.perform(...).andExpect(...) chain and add the appropriate .andExpect(...) for the rank field to reflect expected behavior.
apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingFacade.java (1)
43-49: Recommend improving the N+1 lookup pattern

Operational concern: each ranked product triggers individual `productService.getProduct()` and `brandService.getBrand()` calls, an N+1 problem. With a page size of 20 that is up to 40 cache/DB lookups per request, a likely bottleneck as traffic grows.

Suggested fix: add batch lookup methods (`getProductsByIds`, `getBrandsByIds`) to `ProductService` and `BrandService`, fetch everything in one pass, then map.

```java
// Example direction for the improvement
List<ProductResult> products = productService.getProductsByIds(rankedProductIds);
Set<Long> brandIds = products.stream().map(ProductResult::brandId).collect(Collectors.toSet());
Map<Long, BrandResult> brandMap = brandService.getBrandsByIds(brandIds);
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingFacade.java` around lines 43 - 49, The loop in RankingFacade is causing N+1 lookups by calling productService.getProduct(...) and brandService.getBrand(...) per item; add and call batch methods (e.g., ProductService.getProductsByIds(List<Long>) and BrandService.getBrandsByIds(Set<Long>)) to fetch all ProductResult and BrandResult at once, build a Map<Long,ProductResult> and Map<Long,BrandResult>, then iterate rankedProductIds to map each id to its product and brand (preserving order) and compute rank = offset + i + 1 before creating ProductWithBrandResult.from(...).
apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingService.java (1)
32-35: `getRank()` is missing `@Transactional(readOnly = true)`

Operational concern: the other query methods in the same class (`getRankedProductIds`, `getTotalCount`) carry `@Transactional(readOnly = true)`, but `getRank()` does not. Today it only touches Redis, so there is no immediate problem, but if DB reads are added later the missing transaction boundary could cause unexpected behavior.

♻️ Suggested fix

```diff
+    @Transactional(readOnly = true)
     public Long getRank(Long productId) {
         String today = LocalDate.now().format(DATE_FORMATTER);
         return rankingCacheRepository.getRank(today, productId);
     }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingService.java` around lines 32 - 35, Add the missing `@Transactional`(readOnly = true) to the getRank method in RankingService to match other read-only query methods (getRankedProductIds, getTotalCount); locate the getRank(Long productId) method and annotate it with `@Transactional`(readOnly = true) (and add the import if missing) so future DB reads are executed within a read-only transaction boundary.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingFacade.java`:
- Around line 43-49: RankingFacade currently iterates rankedProductIds and calls
productService.getProduct(productId) and
brandService.getBrand(product.brandId()) which will throw CoreException for
deleted products/brands and cause the whole API to fail; update the loop in
RankingFacade to catch the exception around productService.getProduct and
brandService.getBrand (or check for null/optional if available), log a warning
including productId/brandId, skip that entry (continue) and avoid adding to
results so one missing item doesn't fail the whole request; also adjust logic
that uses rankedProductIds/offset/size so you either fetch additional IDs when
skipping to still return the requested size or document that fewer results may
be returned.
In
`@apps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/RankingCacheRepositoryImpl.java`:
- Around line 37-40: Current catch blocks in RankingCacheRepositoryImpl (the
ones that now log and return Collections.emptyList(), 0, or null) swallow Redis
errors; change them to rethrow a meaningful unchecked exception so the API layer
can handle it uniformly (wrap the caught Exception into the project's unified
CoreException or another runtime exception used by ApiControllerAdvice and throw
it instead of returning emptyList/0/null). Update the catch sites that reference
key (the log.warn(..., key, e) locations) to log context and then throw new
CoreException(...) (or the agreed runtime wrapper) preserving the original
exception as the cause; add unit tests verifying that when Redis operations
throw, the repository propagates the exception and the controller layer returns
the standard error format.
- Around line 26-31: In getTopRankedProductIds, guard against invalid bounds by
returning early when offset < 0 or size <= 0, compute the end index using
Math.addExact(offset, size - 1) to prevent overflow, and ensure you never pass a
negative or wrapped end index to
redisTemplate.opsForZSet().reverseRangeWithScores; additionally, replace broad
catch(Exception) handlers in this class with specific Redis exceptions (e.g.,
RedisConnectionFailureException, RedisSystemException) so failures are
logged/propagated distinctly from normal empty/zero results and add unit tests
for offset<0, size<=0, size overflow, and offset+size-1 overflow cases.
In
`@apps/commerce-streamer/src/main/java/com/loopers/application/like/LikeEventProcessor.java`:
- Around line 36-38: The current parsing in LikeEventProcessor uses
objectMapper.readTree(... ) and then calls node.get("productId").asLong() and
node.get("eventType").asText(), which will NPE if those fields are missing;
change the processing to defensively validate the JsonNode first (use
node.has("productId")/node.has("eventType") or
node.path("productId")/path("eventType") with default values), check for null or
missing/incorrect types before calling asLong()/asText(), and on invalid
messages log a clear error including the raw payload and either skip/acknowledge
the record or forward it to a dead-letter handler instead of rethrowing so the
consumer does not enter a retry loop; reference the LikeEventProcessor method
that reads the record and the objectMapper usage to locate and update this
logic.
In
`@apps/commerce-streamer/src/main/java/com/loopers/application/order/OrderEventProcessor.java`:
- Around line 34-43: OrderEventProcessor currently trusts the incoming payload
after objectMapper.readTree and directly reads node.get(...) causing NPEs or
wrong zero-value aggregations; update the EVENT_ORDER_PLACED handling to
validate presence and types via
node.path("eventType").asText()/node.hasNonNull("productId")/node.get("productId").isNumber()/node.get("quantity").isNumber(),
ensure eventType equals EVENT_ORDER_PLACED, ensure quantity > 0 before calling
productMetricsService.incrementSalesCount(productId, quantity), and on any
validation failure explicitly skip processing or route the record to a DLQ (do
not let it fall through to aggregation); also add unit/integration tests for
missing fields, wrong types, and quantity <= 0 cases to cover rejection
behavior.
In
`@apps/commerce-streamer/src/main/java/com/loopers/application/product/ProductEventProcessor.java`:
- Around line 34-39: The ProductEventProcessor currently calls
objectMapper.readTree(...) and directly accesses node.get("eventType").asText()
and node.get("productId").asLong(), which will NPE or throw on missing/invalid
fields and trigger retry loops; update ProductEventProcessor to validate the
parsed JsonNode before acting: wrap JSON parsing in try/catch for malformed
JSON, check node.hasNonNull("eventType") and that
node.get("eventType").isTextual() and compare to EVENT_PRODUCT_VIEWED, then
confirm node.hasNonNull("productId") and
node.get("productId").canConvertToLong() (or isIntegral/isText convertible)
before calling productMetricsService.incrementViewCount(productId); for
invalid/missing fields log a warning and either skip the record or emit to DLT
instead of throwing, and add unit/integration tests for missing eventType,
missing/invalid productId, and malformed JSON to ensure no retry loop occurs.
In
`@apps/commerce-streamer/src/main/java/com/loopers/application/product/ProductMetricsService.java`:
- Around line 22-40: The current methods (incrementLikeCount,
decrementLikeCount, incrementViewCount, incrementSalesCount in
ProductMetricsService) publish events inside the `@Transactional` boundary causing
possible DB/event inconsistency on rollback; change each method to defer
publishing until transaction commit by registering a TransactionSynchronization
that calls eventPublisher.publishEvent(...) in afterCommit (e.g., use
TransactionSynchronizationManager.registerSynchronization and publish inside
afterCommit), leaving the upsert calls unchanged; add an integration test that
performs an upsert then throws a runtime exception to force rollback and asserts
that no ProductMetricsUpdatedEvent was published.
In
`@apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductMetricsJpaRepository.java`:
- Around line 18-24: The upsertLikeCount native query allows negative like_count
when delta is negative; change the SQL in upsertLikeCount so the INSERT sets
like_count = GREATEST(:delta, 0) (or GREATEST(0, :delta)) and the ON DUPLICATE
KEY UPDATE uses like_count = GREATEST(like_count + :delta, 0) and updated_at =
NOW() to enforce a 0 lower bound; then add integration tests covering (1) an
initial unlike event (delta negative) creating a row with 0 likes and (2)
repeated unlikes when like_count is 0 to ensure it stays 0 and does not go
negative.
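The clamping semantics suggested for the SQL can be mirrored in plain Java to make the invariant explicit. This is only an illustration of the GREATEST expressions, not project code; the class name is made up.

```java
// Plain-Java mirror of the proposed SQL semantics, to make the 0 lower bound explicit:
//   INSERT ...            like_count = GREATEST(:delta, 0)
//   ON DUPLICATE KEY ...  like_count = GREATEST(like_count + :delta, 0)
public class LikeCountClamp {
    /** Value stored when the row is first created. */
    public static long insertValue(long delta) {
        return Math.max(delta, 0);
    }

    /** Value stored when the row already exists. */
    public static long updateValue(long current, long delta) {
        return Math.max(current + delta, 0);
    }
}
```

The two integration-test scenarios in the comment map directly onto `insertValue(-1) == 0` (first event is an unlike) and `updateValue(0, -1) == 0` (repeated unlikes at zero).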
- Line 16: The findAllByMetricDate query uses only the trailing column of the (product_id, metric_date) unique key, so a standalone index on metric_date is needed. To fix the performance issue with ProductMetricsJpaRepository's findAllByMetricDate calls, add a database migration that creates a standalone index on the metric_date column (e.g. idx_product_metrics_metric_date), add automated verification (a test or script) based on an EXPLAIN query that confirms the index is used, and write a performance test against large volumes (e.g. millions of rows) confirming the midnight aggregation/ranking refresh shows no regression, to verify the bottleneck is resolved.
In
`@apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/RankingScheduler.java`:
- Around line 49-50: The savePaths log is written unconditionally while
productRankingRepository.saveScores(...) and incrementScores(...) swallow Redis
errors; change the repository methods (saveScores and incrementScores) to either
rethrow exceptions or return a boolean success, then update RankingScheduler to
check the returned result or catch propagated exceptions before logging success
(remove the unconditional log lines around
productRankingRepository.saveScores(...) and the similar log after
incrementScores(...)); also add an integration test that simulates a Redis
exception and asserts that no "완료" success log is emitted and the scheduler run
surfaces a failure/exception.
- Around line 37-66: The scheduled methods carryOverRanking() and
aggregateDailyRanking() in RankingScheduler run on every instance causing
duplicate writes in multi-instance deployments; wrap each method with a
distributed lock (e.g., Redis SET NX with atomic release or integrate ShedLock)
so the job body executes only when the lock is acquired (use a sensible TTL and
ensure release in finally), update productRankingRepository calls inside
carryOverRanking() and aggregateDailyRanking() to only run under the lock, and
add an integration test that simulates two concurrent invocations (two scheduler
instances) invoking carryOverRanking() and aggregateDailyRanking() at the same
time and asserts productRankingRepository.saveScores()/incrementScores() are
called exactly once (verify via a test Redis instance or embedded lock
implementation).
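The lock-then-run flow above can be sketched with an in-memory analogue. This is an assumption-laden sketch: the `ConcurrentHashMap` stands in for Redis (in production, `putIfAbsent` would be a Redis `SET key value NX EX ttl`, and the token-checked release would need an atomic Lua script); `SchedulerLock` and its method names are illustrative, not existing project API.

```java
import java.util.concurrent.ConcurrentHashMap;

// In-memory analogue of the Redis SET NX lock suggested for RankingScheduler.
public class SchedulerLock {
    private final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    /** SET key token NX: succeeds only for the first caller. */
    public boolean tryAcquire(String key, String token) {
        return store.putIfAbsent(key, token) == null;
    }

    /** Releases only if we still own the lock (atomic check-and-delete). */
    public void release(String key, String token) {
        store.remove(key, token);
    }

    /** Runs the job only when the lock is acquired; always releases in finally. */
    public void runExclusively(String key, String token, Runnable job) {
        if (!tryAcquire(key, token)) {
            return; // another instance holds the lock; skip this run
        }
        try {
            job.run();
        } finally {
            release(key, token);
        }
    }
}
```

A real Redis deployment also needs a TTL so a crashed instance cannot hold the lock forever, which is exactly what ShedLock's `lockAtMostFor` provides out of the box.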
In
`@apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/ProductEventConsumer.java`:
- Line 33: In ProductEventConsumer replace the risky full-record log
(log.info("상품 이벤트 중복 처리 감지 (정상). record={}", messages.get(i))) with logging of
non-sensitive identifiers or a sanitized representation: use message
keys/IDs/timestamps (e.g., messages.get(i).getId() or getKey()) or call a
sanitizeRecord(message) helper that strips/masks PII before logging; update
where messages and messages.get(i) are referenced so only safe fields are logged
and add a sanitizeRecord method to centralize masking if needed.
---
Outside diff comments:
In
`@apps/commerce-api/src/test/java/com/loopers/interfaces/api/product/ProductV1ControllerTest.java`:
- Around line 66-83: The test
ProductV1ControllerTest::상품_상세를_조회하면_200_OK와_상품_정보를_반환한다 creates a
ProductWithBrandResult with rank=3 but never asserts the returned rank; update
the test to add an assertion verifying $.data.rank equals 3 (i.e., extend the
mockMvc.perform(...) chain with .andExpect(jsonPath("$.data.rank").value(3))).
Also add an additional test case for a null rank scenario by stubbing
productFacade.getProduct(...) to return a ProductWithBrandResult with rank=null
and asserting $.data.rank is null or absent per API contract.
---
Nitpick comments:
In
`@apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingFacade.java`:
- Around line 43-49: The loop in RankingFacade is causing N+1 lookups by calling
productService.getProduct(...) and brandService.getBrand(...) per item; add and
call batch methods (e.g., ProductService.getProductsByIds(List<Long>) and
BrandService.getBrandsByIds(Set<Long>)) to fetch all ProductResult and
BrandResult at once, build a Map<Long,ProductResult> and Map<Long,BrandResult>,
then iterate rankedProductIds to map each id to its product and brand
(preserving order) and compute rank = offset + i + 1 before creating
ProductWithBrandResult.from(...).
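The join step after the batch fetch can be sketched as follows. This is a simplified illustration: `Product` and `Ranked` are stand-in records (the real types are `ProductResult`/`ProductWithBrandResult`), the batch methods named in the comment are proposed, not existing, and skipping missing ids mirrors the deleted-product handling discussed elsewhere in this review.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Sketch: join ranked ids (ZSET order) against a pre-fetched product batch.
public class RankingJoin {
    public record Product(long id, long brandId) {}
    public record Ranked(long productId, long brandId, long rank) {}

    /** Preserves ZSET order and computes rank = offset + i + 1; missing ids are skipped. */
    public static List<Ranked> join(List<Long> rankedIds, List<Product> products, long offset) {
        Map<Long, Product> byId = products.stream()
                .collect(Collectors.toMap(Product::id, Function.identity()));
        List<Ranked> out = new ArrayList<>();
        for (int i = 0; i < rankedIds.size(); i++) {
            Product p = byId.get(rankedIds.get(i));
            if (p == null) {
                continue; // deleted product: skip instead of failing the whole page
            }
            out.add(new Ranked(p.id(), p.brandId(), offset + i + 1));
        }
        return out;
    }
}
```

The key point is that rank is derived from the position in `rankedIds`, not from the map, so the two database round trips (products, brands) no longer dictate ordering.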
In
`@apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingService.java`:
- Around line 32-35: Add the missing `@Transactional`(readOnly = true) to the
getRank method in RankingService to match other read-only query methods
(getRankedProductIds, getTotalCount); locate the getRank(Long productId) method
and annotate it with `@Transactional`(readOnly = true) (and add the import if
missing) so future DB reads are executed within a read-only transaction
boundary.
In
`@apps/commerce-api/src/main/java/com/loopers/domain/ranking/RankingCacheRepository.java`:
- Around line 7-9: The interface RankingCacheRepository currently accepts date
as String for getTopRankedProductIds and getTotalCount which risks silent
failures from bad formats; change the contract to use java.time.LocalDate for
both methods, move any string key formatting/serialization into the
implementation (e.g., the concrete KeyResolver or repository impl) so formatting
is centralized, validate and convert inbound String dates at the API/service
boundary to LocalDate and throw CoreException on parse errors, and update/add
tests to assert CoreException is raised for invalid dates and that valid
LocalDate inputs produce the same cache keys/results as before.
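The boundary conversion described above can be sketched like this. Assumptions are flagged in the comments: `IllegalArgumentException` stands in for the project's `CoreException`, and the `ranking:products:` key prefix is illustrative (the real `KEY_PREFIX` is not shown in this review).

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Sketch: parse yyyyMMdd once at the API edge, keep key formatting in one place.
public class RankingDate {
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.BASIC_ISO_DATE; // yyyyMMdd

    /** Fails fast on bad input; IllegalArgumentException stands in for CoreException. */
    public static LocalDate parse(String raw) {
        try {
            return LocalDate.parse(raw, FORMAT);
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException("invalid ranking date: " + raw, e);
        }
    }

    /** Key formatting lives in the repository implementation; prefix is illustrative. */
    public static String cacheKey(LocalDate date) {
        return "ranking:products:" + FORMAT.format(date);
    }
}
```

With `LocalDate` in the repository contract, a malformed date can no longer silently produce a nonexistent cache key that reads as "no rankings today".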
- Line 11: The getRank contract is ambiguous about nullability and whether ranks
are 0-based or 1-based; update RankingCacheRepository.getRank to take primitive
long productId (avoid null product id) and annotate the return as `@Nullable`
Long, add Javadoc on getRank stating "returns null if the product has no rank;
when present returns a 0-based rank (0 = top)", and ensure you import/use your
project's Nullable annotation. Update callers to handle possible null returns
and add unit tests for: existing product rank, non-existent product (null), and
the top product expected to return 0.
In
`@apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/MetricsEventPublisher.java`:
- Around line 34-37: The event type string constants (EVENT_PRODUCT_VIEWED,
EVENT_PRODUCT_LIKED, EVENT_PRODUCT_UNLIKED, EVENT_ORDER_PLACED) in
MetricsEventPublisher should be moved into a shared contract in a common module
(preferably as an enum or final class of constants, e.g., EventTypes or
EventsContract) and the MetricsEventPublisher class should reference those
shared symbols instead of local literals; update any other modules (consumers)
to import the same contract, and add a roundtrip/integration test that publishes
example payloads and asserts the consumer can parse and branch on the shared
event types to prevent silent loss from typos (the same change applies for other
literals in the same file around the 45-68 region).
In
`@apps/commerce-api/src/test/java/com/loopers/interfaces/api/product/ProductV1ControllerTest.java`:
- Around line 44-64: The test 상품_목록을_조회하면_200_OK와_페이징된_상품_목록을_반환한다() creates a
ProductWithBrandResult with rank=null but doesn't assert how rank is serialized;
update the test to verify the response handles the null rank (either present as
null or absent) by adding an assertion on jsonPath("$.data.content[0].rank")
using the mockMvc.perform result; locate the ProductWithBrandResult construction
and the mockMvc.perform(...).andExpect(...) chain and add the appropriate
.andExpect(...) for the rank field to reflect expected behavior.
In
`@apps/commerce-streamer/src/main/java/com/loopers/application/product/ProductRankingService.java`:
- Around line 36-40: The flush() method performs a non-atomic read-then-clear on
the shared buffer which can race with concurrent producers (e.g.,
handleMetricsUpdated) and lose updates; to fix, make the flush atomic by either
synchronizing the read/increment/clear sequence (e.g., wrap the isEmpty →
incrementScores(buffer) → buffer.clear() sequence in a synchronized block on the
shared buffer or this) or swap the buffer with a fresh map atomically (use an
AtomicReference<ConcurrentHashMap> or replace buffer with a new instance and
operate on the old snapshot) before calling
productRankingRepository.incrementScores so no concurrent additions are lost.
Ensure the chosen approach is applied to the singleton ProductRankingService to
protect flush() and any producers that mutate buffer.
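The buffer-swap variant of the fix can be sketched as a small pure-Java class. This is illustrative (`ScoreBuffer`, `add`, `drain` are made-up names, not the ProductRankingService API); note the residual caveat in the comment, which is why strict guarantees would still need synchronization around `add`/`drain`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the atomic-swap approach: flush operates on a snapshot,
// producers keep writing into a fresh map.
public class ScoreBuffer {
    private final AtomicReference<ConcurrentHashMap<Long, Double>> buffer =
            new AtomicReference<>(new ConcurrentHashMap<>());

    /** Producer side: accumulate a weighted score delta for a product. */
    public void add(long productId, double delta) {
        buffer.get().merge(productId, delta, Double::sum);
    }

    /**
     * Flush side: swap in a fresh map and return the old one as the snapshot.
     * Caveat: a producer that already grabbed the old map reference can still
     * write to it briefly after the swap; strict loss-free semantics would
     * need a synchronized add/drain pair instead.
     */
    public Map<Long, Double> drain() {
        return buffer.getAndSet(new ConcurrentHashMap<>());
    }
}
```

The flush path then becomes `incrementScores(drain())` with no separate `isEmpty` check and no explicit `clear()`, removing the read-then-clear window entirely.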
In
`@apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetrics.java`:
- Around line 72-74: The incrementSalesCount method in ProductMetrics allows
negative quantities which can reduce salesCount and break data integrity; update
ProductMetrics.incrementSalesCount to validate the quantity parameter (if
quantity < 0) and reject it (throw an IllegalArgumentException with a clear
message or return an error), ensuring salesCount is only increased by
non-negative values; adjust callers/tests of incrementSalesCount accordingly and
include a unit test that asserts the method rejects negative input.
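The guard described above is small enough to show directly. This is a sketch of the validation, not the actual `ProductMetrics` entity; the class name is illustrative.

```java
// Sketch of the non-negative guard proposed for ProductMetrics.incrementSalesCount.
public class SalesCount {
    private long salesCount;

    /** Rejects negative quantities so the aggregated count can only grow. */
    public void increment(long quantity) {
        if (quantity < 0) {
            throw new IllegalArgumentException("quantity must be >= 0: " + quantity);
        }
        salesCount += quantity;
    }

    public long value() {
        return salesCount;
    }
}
```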
In
`@apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetricsRepository.java`:
- Line 13: The repository method findAllByMetricDate currently loads all
ProductMetrics for a given metricDate into memory which can OOM for large
product counts; update ProductMetricsRepository to avoid full in-memory loads by
providing a paged or streaming alternative—e.g., add a method like
Page<ProductMetrics> findAllByMetricDate(LocalDate metricDate, Pageable
pageable) or Stream<ProductMetrics> streamByMetricDate(LocalDate metricDate) (or
a Slice-based variant) and update calling batch/aggregation code to iterate
pages or the stream; ensure to close streams or use try-with-resources and
document expected volume and chosen processing strategy.
In
`@apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductRankingRepositoryImpl.java`:
- Around line 67-69: The catch in ProductRankingRepositoryImpl.incrementScores
currently swallows exceptions and only logs a warning (log.warn("Redis 랭킹 점수 갱신
실패: key={}", key, e)), which causes silent failure and loss of buffered scores;
add a JavaDoc on the incrementScores method (or class-level Javadoc) explaining
this intentional silent-failure pattern, that the in-memory buffer is cleared
regardless and daily aggregation can recover scores, and add a code comment at
the catch site clarifying this; additionally instrument a metric/metric
increment (e.g., redisIncrementFailures) inside the catch so Redis failures are
observable by monitoring systems and make sure callers are aware via the JavaDoc
that no exception is propagated.
- Around line 51-66: The pipeline can partially fail causing the expire call
inside the SessionCallback to be skipped or not applied; after calling
redisTemplate.executePipelined(...) in ProductRankingRepositoryImpl capture and
inspect its returned List<Object> (the pipeline results) for exceptions/failure
indicators, and if any failure or unexpected result is detected, perform a
separate, standalone call to redisTemplate.expire(key, TTL) inside its own
try/catch to ensure TTL is set; also log any pipeline errors and the fallback
expire result so failures are visible.
In
`@apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/LikeEventConsumer.java`:
- Around line 27-41: The consumeLikeEvent method duplicates a common
batch-consumer flow (looping records, per-record exception policy, flush, ack)
which should be extracted to a shared template to ensure consistent retry/commit
behavior; create an abstract BatchKafkaConsumer (or reusable component) that
implements the KafkaListener entry (or is invoked from it) and provides a
template method like handleBatch(List<ConsumerRecord<Object,Object>> records,
Acknowledgment ack) with an abstract processRecord(ConsumerRecord<?,?> record)
that callers implement (LikeEventConsumer should delegate its
likeEventProcessor.process call to processRecord), centralize the try/catch
logic to convert failures into BatchListenerFailedException with the index,
invoke productRankingService.flush in the template’s finally/flush step and only
acknowledge after successful flush, and add parameterized tests for duplicate
events, single-record failure and flush failure to validate consistent behavior
across consumers.
In
`@apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/ProductEventConsumer.java`:
- Around line 36-40: When processing a batch in ProductEventConsumer, currently
throwing BatchListenerFailedException from process() can skip calling
productRankingService.flush(), causing Redis score updates to be lost; update
the method so that productRankingService.flush() is executed in a finally block
(or otherwise guaranteed) even if process() throws, and keep
acknowledgment.acknowledge() behavior as intended (either after successful flush
or also guarded as required); reference the process() invocation, the
BatchListenerFailedException path, productRankingService.flush(), and
acknowledgment to locate and modify the control flow, or alternatively add clear
documentation of the current failure/flush semantics if the current behavior is
intended.
In
`@apps/commerce-streamer/src/test/java/com/loopers/application/product/ProductMetricsServiceIntegrationTest.java`:
- Around line 28-61: Add tests covering decrementLikeCount and boundary/failure
cases: write new `@Test` methods that call
productMetricsService.decrementLikeCount(productId) to assert likeCount
decreases from >0, that calling decrementLikeCount when likeCount is 0 leaves it
at 0 (verify via productMetricsRepository.findByProductIdAndMetricDate and
ProductMetrics.getLikeCount), and a test for decrement behavior when productId
does not exist (expect no record created or specific domain behavior). Use the
existing patterns in the file (LocalDate.now(), productId = 1L, orElseThrow())
and assert expected counts for view/sales remain unchanged where applicable.
In
`@apps/commerce-streamer/src/test/java/com/loopers/infrastructure/product/RankingSchedulerIntegrationTest.java`:
- Around line 62-65: The test currently hardcodes weight literals (0.1, 0.2,
0.6) in RankingSchedulerIntegrationTest when computing expected scores, which
can diverge from runtime config; update the test to use the same config or logic
as production by retrieving the configured weights (e.g., inject the same `@Value`
properties used by RankingScheduler or instantiate/use
RankingScheduler.computeScore(...) if exposed) and compute expected score from
those injected weights instead of literals — locate
RankingSchedulerIntegrationTest and replace the hardcoded weight usage with
reading the ranking.weights values (or calling RankingScheduler's scoring
method) so the test stays consistent with application.yml changes.
- Around line 40-78: Add additional test methods alongside
일간_랭킹_집계_시_DB_메트릭이_Redis에_적재된다() to cover the missing edge/failure cases: (1) no
metrics for the metricDate — call productMetricsRepository.findAllByMetricDate
with a date that has no upserts and assert
productRankingRepository.incrementScores does not create the Redis key (use
redisTemplate.opsForZSet().size(key) == null or 0); (2) all counts zero — upsert
0 for view/like/sales for a product and assert the resulting score is 0 (use
redisTemplate.opsForZSet().score(key, "<id>")); (3) duplicate upserts for the
same productId — perform multiple
upsertViewCount/upsertLikeCount/upsertSalesCount calls for the same productId on
the same metricDate and assert whether scores are cumulative or overwritten by
checking the final score in Redis and matching expected behavior; reuse
productMetricsRepository.findAllByMetricDate and
productRankingRepository.incrementScores logic and the same key construction to
locate results.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 2426c371-9c6a-4f80-b6d7-0e55d34301b0
⛔ Files ignored due to path filters (1)
- `docs/week8/qna.md` is excluded by `!**/*.md` and included by `**`
📒 Files selected for processing (32)
- apps/commerce-api/src/main/java/com/loopers/application/product/ProductFacade.java
- apps/commerce-api/src/main/java/com/loopers/application/product/result/ProductWithBrandResult.java
- apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingFacade.java
- apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingService.java
- apps/commerce-api/src/main/java/com/loopers/domain/ranking/RankingCacheRepository.java
- apps/commerce-api/src/main/java/com/loopers/infrastructure/kafka/MetricsEventPublisher.java
- apps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/RankingCacheRepositoryImpl.java
- apps/commerce-api/src/main/java/com/loopers/interfaces/api/product/response/ProductWithBrandDetailResponse.java
- apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1ApiSpec.java
- apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Controller.java
- apps/commerce-api/src/test/java/com/loopers/interfaces/api/product/ProductV1ControllerTest.java
- apps/commerce-streamer/src/main/java/com/loopers/application/like/LikeEventProcessor.java
- apps/commerce-streamer/src/main/java/com/loopers/application/order/OrderEventProcessor.java
- apps/commerce-streamer/src/main/java/com/loopers/application/product/ProductEventProcessor.java
- apps/commerce-streamer/src/main/java/com/loopers/application/product/ProductMetricsService.java
- apps/commerce-streamer/src/main/java/com/loopers/application/product/ProductRankingService.java
- apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetrics.java
- apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetricsRepository.java
- apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductRankingRepository.java
- apps/commerce-streamer/src/main/java/com/loopers/domain/product/event/ProductMetricsUpdatedEvent.java
- apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductMetricsJpaRepository.java
- apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductMetricsRepositoryImpl.java
- apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductRankingRepositoryImpl.java
- apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/RankingScheduler.java
- apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/CatalogEventConsumer.java
- apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/DemoKafkaConsumer.java
- apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/LikeEventConsumer.java
- apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/OrderEventConsumer.java
- apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/ProductEventConsumer.java
- apps/commerce-streamer/src/main/resources/application.yml
- apps/commerce-streamer/src/test/java/com/loopers/application/product/ProductMetricsServiceIntegrationTest.java
- apps/commerce-streamer/src/test/java/com/loopers/infrastructure/product/RankingSchedulerIntegrationTest.java
💤 Files with no reviewable changes (2)
- apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/DemoKafkaConsumer.java
- apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/CatalogEventConsumer.java
```java
for (int i = 0; i < rankedProductIds.size(); i++) {
    Long productId = rankedProductIds.get(i);
    ProductResult product = productService.getProduct(productId);
    BrandResult brand = brandService.getBrand(product.brandId());
    long rank = offset + i + 1;
    results.add(ProductWithBrandResult.from(product, brand.name(), rank));
}
```
Risk of whole-API failure from deleted products/brands

Operational view: if a product listed in the Redis ranking ZSET has since been deleted, productService.getProduct() throws a CoreException and the entire ranking API request fails. The same happens in brandService.getBrand() when a brand has been deleted. A single deleted product thus becomes a single point of failure for the whole ranking service.

🐛 Suggested fix: skip deleted products

```diff
 List<ProductWithBrandResult> results = new ArrayList<>();
 for (int i = 0; i < rankedProductIds.size(); i++) {
     Long productId = rankedProductIds.get(i);
-    ProductResult product = productService.getProduct(productId);
-    BrandResult brand = brandService.getBrand(product.brandId());
-    long rank = offset + i + 1;
-    results.add(ProductWithBrandResult.from(product, brand.name(), rank));
+    try {
+        ProductResult product = productService.getProduct(productId);
+        BrandResult brand = brandService.getBrand(product.brandId());
+        long rank = offset + i + 1;
+        results.add(ProductWithBrandResult.from(product, brand.name(), rank));
+    } catch (CoreException e) {
+        log.warn("랭킹 상품 조회 실패: productId={}", productId, e);
+        // skip deleted products
+    }
 }
```

Additional considerations:
- Skipping deleted products means the returned result count can be smaller than the requested size; compensating for this requires logic that fetches additional products.
- A batch job in the ranking scheduler that removes deleted products from the ranking is recommended.
- An integration test covering this case should be added.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```java
for (int i = 0; i < rankedProductIds.size(); i++) {
    Long productId = rankedProductIds.get(i);
    try {
        ProductResult product = productService.getProduct(productId);
        BrandResult brand = brandService.getBrand(product.brandId());
        long rank = offset + i + 1;
        results.add(ProductWithBrandResult.from(product, brand.name(), rank));
    } catch (CoreException e) {
        log.warn("랭킹 상품 조회 실패: productId={}", productId, e);
        // skip deleted products
    }
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingFacade.java`
around lines 43 - 49, RankingFacade currently iterates rankedProductIds and
calls productService.getProduct(productId) and
brandService.getBrand(product.brandId()) which will throw CoreException for
deleted products/brands and cause the whole API to fail; update the loop in
RankingFacade to catch the exception around productService.getProduct and
brandService.getBrand (or check for null/optional if available), log a warning
including productId/brandId, skip that entry (continue) and avoid adding to
results so one missing item doesn't fail the whole request; also adjust logic
that uses rankedProductIds/offset/size so you either fetch additional IDs when
skipping to still return the requested size or document that fewer results may
be returned.
```java
public List<Long> getTopRankedProductIds(String date, long offset, long size) {
    String key = KEY_PREFIX + date;
    try {
        Set<ZSetOperations.TypedTuple<String>> tuples =
            redisTemplate.opsForZSet().reverseRangeWithScores(key, offset, offset + size - 1);
        if (tuples == null || tuples.isEmpty()) {
```
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
# Purpose: check whether size/offset validation is actually enforced in the upper layers.
# Expected: constraints such as @Min/@PositiveOrZero on Ranking API inputs, or a size<=0 guard in the service.
rg -n --type=java -C3 '\bgetTopRankedProductIds\s*\('
rg -n --type=java -C3 'RankingV1Controller|@RequestParam|@Min|@Positive|@PositiveOrZero|size|offset|page'
```
Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java
Length of output: 50397

🏁 Script executed:
```bash
cat -n apps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/RankingCacheRepositoryImpl.java
```
Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java
Length of output: 2782
Unvalidated size/offset boundary values can trigger oversized reads

At line 30, if size <= 0 the end index becomes offset - 1; in particular, offset=0 with size=0 is interpreted by Redis as a full-range query, which can return a very large response. In production, a single such request can amplify cache/network load and cascade into latency and outages.

The fix should return early on offset < 0 || size <= 0 and compute the end index with Math.addExact to block overflow. Additional tests should cover size=0, size<0, offset<0, and the offset+size-1 overflow case individually.

Example fix (diff)

```diff
 @Override
 public List<Long> getTopRankedProductIds(String date, long offset, long size) {
     String key = KEY_PREFIX + date;
+    if (offset < 0 || size <= 0) {
+        return Collections.emptyList();
+    }
+
+    long end;
+    try {
+        end = Math.addExact(offset, size - 1);
+    } catch (ArithmeticException ex) {
+        log.warn("Redis 랭킹 조회 파라미터 오버플로우: key={}, offset={}, size={}", key, offset, size);
+        return Collections.emptyList();
+    }
+
     try {
         Set<ZSetOperations.TypedTuple<String>> tuples =
-            redisTemplate.opsForZSet().reverseRangeWithScores(key, offset, offset + size - 1);
+            redisTemplate.opsForZSet().reverseRangeWithScores(key, offset, end);
         if (tuples == null || tuples.isEmpty()) {
             return Collections.emptyList();
         }
```

Additionally, the catch(Exception) blocks at lines 37-40, 49-52, and 61-64 hide Redis outages behind default values, delaying failure detection in operation. Each method should log specific exceptions separately (e.g. RedisConnectionFailureException, RedisSystemException), or otherwise let the business logic clearly distinguish a normal empty/zero result from a failure.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```java
public List<Long> getTopRankedProductIds(String date, long offset, long size) {
    String key = KEY_PREFIX + date;
    if (offset < 0 || size <= 0) {
        return Collections.emptyList();
    }
    long end;
    try {
        end = Math.addExact(offset, size - 1);
    } catch (ArithmeticException ex) {
        log.warn("Redis 랭킹 조회 파라미터 오버플로우: key={}, offset={}, size={}", key, offset, size);
        return Collections.emptyList();
    }
    try {
        Set<ZSetOperations.TypedTuple<String>> tuples =
            redisTemplate.opsForZSet().reverseRangeWithScores(key, offset, end);
        if (tuples == null || tuples.isEmpty()) {
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/RankingCacheRepositoryImpl.java`
around lines 26 - 31, In getTopRankedProductIds, guard against invalid bounds by
returning early when offset < 0 or size <= 0, compute the end index using
Math.addExact(offset, size - 1) to prevent overflow, and ensure you never pass a
negative or wrapped end index to
redisTemplate.opsForZSet().reverseRangeWithScores; additionally, replace broad
catch(Exception) handlers in this class with specific Redis exceptions (e.g.,
RedisConnectionFailureException, RedisSystemException) so failures are
logged/propagated distinctly from normal empty/zero results and add unit tests
for offset<0, size<=0, size overflow, and offset+size-1 overflow cases.
```java
} catch (Exception e) {
    log.warn("Redis 랭킹 조회 실패: key={}", key, e);
    return Collections.emptyList();
}
```
Hiding Redis failures behind default values delays failure detection and recovery

At lines 37-40, 49-52, and 61-64, returning an empty list/0/null on exception makes an outage look like "no data", so anomalies are detected late in operation and incorrect responses are treated as normal.

The fix is, at minimum, to rethrow the exception so the API layer handles it as a consistent error response.

Additional tests should verify that the repository rethrows when a Redis operation fails, and that the API responds in the standard error format.

Example fix (diff)

```diff
 } catch (Exception e) {
-    log.warn("Redis 랭킹 조회 실패: key={}", key, e);
-    return Collections.emptyList();
+    log.error("Redis 랭킹 조회 실패: key={}", key, e);
+    throw new IllegalStateException("Redis 랭킹 조회 실패", e);
 }
 }
@@
 } catch (Exception e) {
-    log.warn("Redis 랭킹 카운트 조회 실패: key={}", key, e);
-    return 0;
+    log.error("Redis 랭킹 카운트 조회 실패: key={}", key, e);
+    throw new IllegalStateException("Redis 랭킹 카운트 조회 실패", e);
 }
 }
@@
 } catch (Exception e) {
-    log.warn("Redis 랭킹 순위 조회 실패: key={}, productId={}", key, productId, e);
-    return null;
+    log.error("Redis 랭킹 순위 조회 실패: key={}, productId={}", key, productId, e);
+    throw new IllegalStateException("Redis 랭킹 순위 조회 실패", e);
 }
 }
```

As per coding guidelines: **/*.java: "Check that exception flows are clear."
Based on learnings: "In the loop-pack-be-l2-vol3-java project, enforce unified error handling by routing errors through CoreException to ApiControllerAdvice to ensure a consistent response format."
Also applies to: 49-52, 61-64
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/RankingCacheRepositoryImpl.java`
around lines 37 - 40, Current catch blocks in RankingCacheRepositoryImpl (the
ones that now log and return Collections.emptyList(), 0, or null) swallow Redis
errors; change them to rethrow a meaningful unchecked exception so the API layer
can handle it uniformly (wrap the caught Exception into the project's unified
CoreException or another runtime exception used by ApiControllerAdvice and throw
it instead of returning emptyList/0/null). Update the catch sites that reference
key (the log.warn(..., key, e) locations) to log context and then throw new
CoreException(...) (or the agreed runtime wrapper) preserving the original
exception as the cause; add unit tests verifying that when Redis operations
throw, the repository propagates the exception and the controller layer returns
the standard error format.
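The paging boundary cases mentioned above (offset<0, size<=0, and overflow in offset+size-1) can be guarded before the Redis call. A minimal sketch of such a bounds computation, independent of any Redis client — the class and method names are illustrative assumptions, not project code:

```java
// Computes the inclusive [start, end] window for a ZREVRANGE-style page,
// rejecting invalid input instead of silently clamping it.
final class RankingRange {
    private RankingRange() {}

    static long[] of(long offset, long size) {
        if (offset < 0 || size <= 0) {
            throw new IllegalArgumentException("offset must be >= 0 and size > 0");
        }
        // offset + size may overflow long; Math.addExact surfaces that as an
        // ArithmeticException instead of wrapping around to a negative end index.
        long end = Math.addExact(offset, size) - 1;
        return new long[] {offset, end};
    }
}
```

The repository can then pass `range[0]` and `range[1]` to the reverse-range call, knowing the window is well-formed.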
JsonNode node = objectMapper.readTree(record.value().toString());
Long productId = node.get("productId").asLong();
String eventType = node.get("eventType").asText();
Accessing JSON fields risks an NPE.
If node.get("productId") or node.get("eventType") returns null, the subsequent asLong()/asText() call throws a NullPointerException. A malformed message can then trap the consumer in a continuous retry loop.
A producer bug or message corruption in production can therefore cause an unexpected outage.
Suggested defensive check
JsonNode node = objectMapper.readTree(record.value().toString());
-Long productId = node.get("productId").asLong();
-String eventType = node.get("eventType").asText();
+JsonNode productIdNode = node.get("productId");
+JsonNode eventTypeNode = node.get("eventType");
+
+if (productIdNode == null || eventTypeNode == null) {
+ log.warn("필수 필드 누락. record: topic={}, partition={}, offset={}",
+ record.topic(), record.partition(), record.offset());
+ return;
+}
+
+Long productId = productIdNode.asLong();
+String eventType = eventTypeNode.asText();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/commerce-streamer/src/main/java/com/loopers/application/like/LikeEventProcessor.java`
around lines 36 - 38, The current parsing in LikeEventProcessor uses
objectMapper.readTree(... ) and then calls node.get("productId").asLong() and
node.get("eventType").asText(), which will NPE if those fields are missing;
change the processing to defensively validate the JsonNode first (use
node.has("productId")/node.has("eventType") or
node.path("productId")/path("eventType") with default values), check for null or
missing/incorrect types before calling asLong()/asText(), and on invalid
messages log a clear error including the raw payload and either skip/acknowledge
the record or forward it to a dead-letter handler instead of rethrowing so the
consumer does not enter a retry loop; reference the LikeEventProcessor method
that reads the record and the objectMapper usage to locate and update this
logic.
var node = objectMapper.readTree(record.value().toString());
String eventType = node.get("eventType").asText();

if (EVENT_ORDER_PLACED.equals(eventType)) {
    Long productId = node.get("productId").asLong();
    int quantity = node.get("quantity").asInt();
    productMetricsService.incrementSalesCount(productId, quantity);
} else {
    log.warn("알 수 없는 주문 이벤트 타입: {}", eventType);
    return;
Unvalidated external event payloads lead to corrupted aggregates or reprocessing loops
If eventType, productId, or quantity is missing or has the wrong type, this code either throws an NPE or silently aggregates a default of 0. In production, a single poison message easily snowballs into partition lag. It is safer to validate field presence and numeric types via path(), reject quantity <= 0, and explicitly skip or dead-letter messages that fail validation. Please add tests for missing fields, wrong types, and negative/zero quantities.
Suggested fix
- var node = objectMapper.readTree(record.value().toString());
- String eventType = node.get("eventType").asText();
+ var node = objectMapper.readTree(String.valueOf(record.value()));
+ var eventTypeNode = node.path("eventType");
+ if (eventTypeNode.isMissingNode()) {
+ log.warn("order-events payload 검증 실패: eventType 누락");
+ return;
+ }
+ String eventType = eventTypeNode.asText();
if (EVENT_ORDER_PLACED.equals(eventType)) {
- Long productId = node.get("productId").asLong();
- int quantity = node.get("quantity").asInt();
+ var productIdNode = node.path("productId");
+ var quantityNode = node.path("quantity");
+ if (!productIdNode.canConvertToLong()
+ || !quantityNode.canConvertToInt()
+ || quantityNode.asInt() <= 0) {
+ log.warn("order-events payload 검증 실패: productId/quantity 오류");
+ return;
+ }
+ Long productId = productIdNode.asLong();
+ int quantity = quantityNode.asInt();
productMetricsService.incrementSalesCount(productId, quantity);
} else {
log.warn("알 수 없는 주문 이벤트 타입: {}", eventType);
return;
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/commerce-streamer/src/main/java/com/loopers/application/order/OrderEventProcessor.java`
around lines 34 - 43, OrderEventProcessor currently trusts the incoming payload
after objectMapper.readTree and directly reads node.get(...) causing NPEs or
wrong zero-value aggregations; update the EVENT_ORDER_PLACED handling to
validate presence and types via
node.path("eventType").asText()/node.hasNonNull("productId")/node.get("productId").isNumber()/node.get("quantity").isNumber(),
ensure eventType equals EVENT_ORDER_PLACED, ensure quantity > 0 before calling
productMetricsService.incrementSalesCount(productId, quantity), and on any
validation failure explicitly skip processing or route the record to a DLQ (do
not let it fall through to aggregation); also add unit/integration tests for
missing fields, wrong types, and quantity <= 0 cases to cover rejection
behavior.
Optional<ProductMetrics> findByProductId(Long productId);
Optional<ProductMetrics> findByProductIdAndMetricDate(Long productId, LocalDate metricDate);

List<ProductMetrics> findAllByMetricDate(LocalDate metricDate);
No index matches findAllByMetricDate
The entity's unique key is (product_id, metric_date), so this query, which filters on metric_date alone, does not match the leading column and will drift toward a full scan as data accumulates. If the midnight aggregation turns into a full table scan, delayed ranking refreshes become an immediate operational issue. It is safer to include a migration adding a standalone index on metric_date. Please add a check (via EXPLAIN) that this query uses the index, plus an aggregation performance test against a large dataset. As per coding guidelines `**/*Repository*.java`: check sort/index usability and bottlenecks on large data volumes.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductMetricsJpaRepository.java`
at line 16, findAllByMetricDate uses only the trailing column of the
(product_id, metric_date) unique key, so a standalone index on metric_date is
required; to fix the performance of the findAllByMetricDate call in
ProductMetricsJpaRepository, add a database migration creating a standalone
index on the metric_date column (e.g. idx_product_metrics_metric_date), ship
automated verification (a test or script) based on an EXPLAIN query confirming
the index is used, and write a performance test measuring that the midnight
aggregation / ranking refresh does not regress on large datasets (e.g. millions
of rows).
@Modifying
@Query(value = """
    INSERT INTO product_metrics (product_id, metric_date, like_count, sales_count, view_count, created_at, updated_at)
    VALUES (:productId, :metricDate, :delta, 0, 0, NOW(), NOW())
    ON DUPLICATE KEY UPDATE like_count = like_count + :delta, updated_at = NOW()
    """, nativeQuery = true)
void upsertLikeCount(Long productId, LocalDate metricDate, int delta);
The like-decrement upsert allows negative counts
When delta is negative, the insert path stores like_count = :delta as-is, and the update path decrements with no lower bound, so duplicate unlikes or out-of-order events alone can persist a negative like count. The corrupted value flows straight into that day's ranking score. It is safer to change the query to enforce a floor of 0. Please add integration tests covering a first event that is an unlike, and repeated duplicate unlikes at 0.
Suggested fix
@Modifying
@Query(value = """
     INSERT INTO product_metrics (product_id, metric_date, like_count, sales_count, view_count, created_at, updated_at)
-    VALUES (:productId, :metricDate, :delta, 0, 0, NOW(), NOW())
-    ON DUPLICATE KEY UPDATE like_count = like_count + :delta, updated_at = NOW()
+    VALUES (:productId, :metricDate, GREATEST(:delta, 0), 0, 0, NOW(), NOW())
+    ON DUPLICATE KEY UPDATE like_count = GREATEST(0, like_count + :delta), updated_at = NOW()
     """, nativeQuery = true)
void upsertLikeCount(Long productId, LocalDate metricDate, int delta);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductMetricsJpaRepository.java`
around lines 18 - 24, The upsertLikeCount native query allows negative
like_count when delta is negative; change the SQL in upsertLikeCount so the
INSERT sets like_count = GREATEST(:delta, 0) (or GREATEST(0, :delta)) and the ON
DUPLICATE KEY UPDATE uses like_count = GREATEST(like_count + :delta, 0) and
updated_at = NOW() to enforce a 0 lower bound; then add integration tests
covering (1) an initial unlike event (delta negative) creating a row with 0
likes and (2) repeated unlikes when like_count is 0 to ensure it stays 0 and
does not go negative.
@Scheduled(cron = "0 50 23 * * *")
public void carryOverRanking() {
    LocalDate today = LocalDate.now();
    LocalDate tomorrow = today.plusDays(1);

    Map<Long, Double> todayScores = productRankingRepository.getAllScores(today);
    Map<Long, Double> carryOverScores = todayScores.entrySet().stream()
        .collect(Collectors.toMap(
            Map.Entry::getKey,
            entry -> entry.getValue() * carryOverRate
        ));

    productRankingRepository.saveScores(tomorrow, carryOverScores);
    log.info("랭킹 carry-over 완료: from={}, to={}, count={}", today, tomorrow, carryOverScores.size());
}

@Scheduled(cron = "0 0 0 * * *")
public void aggregateDailyRanking() {
    LocalDate yesterday = LocalDate.now().minusDays(1);
    LocalDate today = LocalDate.now();

    List<ProductMetrics> metricsList = productMetricsRepository.findAllByMetricDate(yesterday);
    Map<Long, Double> scores = metricsList.stream()
        .collect(Collectors.toMap(
            ProductMetrics::getProductId,
            this::calculateScore
        ));

    productRankingRepository.incrementScores(today, scores);
    log.info("일간 랭킹 집계 완료: metricDate={}, targetDate={}, count={}", yesterday, today, scores.size());
Scheduled jobs must be guaranteed to run on a single instance
In a multi-instance deployment these two jobs run on every instance at once, so carry-over and the daily aggregation are applied as many times as there are instances. In production the rankings drift gradually and the cause is hard to trace. It is safer to add execution control such as a Redis-based distributed lock or ShedLock so the work runs exactly once. Please add an integration test showing that even when at least two instances call carryOverRanking() and aggregateDailyRanking() at the same moment, only one Redis write occurs. Based on learnings: In RankingCarryOverScheduler (apps/commerce-streamer), ... a Lua-based distributed lock (SET NX + atomic release) prevents multi-instance duplicate execution.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/RankingScheduler.java`
around lines 37 - 66, The scheduled methods carryOverRanking() and
aggregateDailyRanking() in RankingScheduler run on every instance causing
duplicate writes in multi-instance deployments; wrap each method with a
distributed lock (e.g., Redis SET NX with atomic release or integrate ShedLock)
so the job body executes only when the lock is acquired (use a sensible TTL and
ensure release in finally), update productRankingRepository calls inside
carryOverRanking() and aggregateDailyRanking() to only run under the lock, and
add an integration test that simulates two concurrent invocations (two scheduler
instances) invoking carryOverRanking() and aggregateDailyRanking() at the same
time and asserts productRankingRepository.saveScores()/incrementScores() are
called exactly once (verify via a test Redis instance or embedded lock
implementation).
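The SET NX + token-checked release pattern suggested above can be sketched as follows. This is a minimal illustration under stated assumptions, not project code: LockStore stands in for Redis (putIfAbsent mirrors SET NX, remove(key, token) mirrors the Lua compare-token-then-DEL release), and a production version also needs a TTL / minimum hold time so a crashed or late-arriving instance cannot re-run the job.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Stand-in for Redis: putIfAbsent ~ SET key token NX, remove(key, token) ~ the
// atomic "compare token then DEL" Lua release.
interface LockStore {
    boolean setIfAbsent(String key, String token);
    boolean deleteIfTokenMatches(String key, String token);
}

final class InMemoryLockStore implements LockStore {
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();
    public boolean setIfAbsent(String key, String token) {
        return store.putIfAbsent(key, token) == null;
    }
    public boolean deleteIfTokenMatches(String key, String token) {
        return store.remove(key, token);
    }
}

final class ScheduledJobGuard {
    private final LockStore locks;
    ScheduledJobGuard(LockStore locks) { this.locks = locks; }

    // Runs the body only on the instance that wins the per-job/day lock key.
    // A real deployment keeps the key alive via TTL / lockAtLeastFor so a late
    // instance cannot re-run after release; released here for simplicity.
    boolean runOnce(String lockKey, Runnable body) {
        String token = UUID.randomUUID().toString();
        if (!locks.setIfAbsent(lockKey, token)) {
            return false; // another instance holds (or held) this job's lock
        }
        try {
            body.run();
            return true;
        } finally {
            locks.deleteIfTokenMatches(lockKey, token);
        }
    }
}
```

The scheduler methods would wrap their bodies in `guard.runOnce("ranking:carryover:" + date, ...)`, with the date-scoped key ensuring at most one winner per day per job.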
productRankingRepository.saveScores(tomorrow, carryOverScores);
log.info("랭킹 carry-over 완료: from={}, to={}, count={}", today, tomorrow, carryOverScores.size());
Logging success after a save call that swallows failures misses outages
The current saveScores() and incrementScores() implementations catch Redis exceptions internally and only log a warning, yet this code always writes a completion log without checking the result. In production, a real ingestion failure is mistaken for success, and reprocessing and alerting windows are missed. The save layer should rethrow the exception or return a success flag, and the success log at this level should be written only on confirmed success. Please add an integration test that forces a Redis exception and verifies that no completion log is written and the scheduled run is observed as failed.
Also applies to: 65-66
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/RankingScheduler.java`
around lines 49 - 50, The saveScores completion log is written unconditionally while
productRankingRepository.saveScores(...) and incrementScores(...) swallow Redis
errors; change the repository methods (saveScores and incrementScores) to either
rethrow exceptions or return a boolean success, then update RankingScheduler to
check the returned result or catch propagated exceptions before logging success
(remove the unconditional log lines around
productRankingRepository.saveScores(...) and the similar log after
incrementScores(...)); also add an integration test that simulates a Redis
exception and asserts that no "완료" success log is emitted and the scheduler run
surfaces a failure/exception.
try {
    productEventProcessor.process(messages.get(i));
} catch (DataIntegrityViolationException e) {
    log.info("상품 이벤트 중복 처리 감지 (정상). record={}", messages.get(i));
Logging the full record risks exposing sensitive data.
Printing the entire record object can write sensitive information from the Kafka message payload into the logs, from where it can leak into log aggregation systems in production.
Suggested fix
- log.info("상품 이벤트 중복 처리 감지 (정상). record={}", messages.get(i));
+ log.info("상품 이벤트 중복 처리 감지 (정상). topic={}, partition={}, offset={}",
+ messages.get(i).topic(), messages.get(i).partition(), messages.get(i).offset());
As per coding guidelines: "Check for possible exposure of sensitive data when logging."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/ProductEventConsumer.java`
at line 33, In ProductEventConsumer replace the risky full-record log
(log.info("상품 이벤트 중복 처리 감지 (정상). record={}", messages.get(i))) with logging of
non-sensitive identifiers or a sanitized representation: use message
keys/IDs/timestamps (e.g., messages.get(i).getId() or getKey()) or call a
sanitizeRecord(message) helper that strips/masks PII before logging; update
where messages and messages.get(i) are referenced so only safe fields are logged
and add a sanitizeRecord method to centralize masking if needed.
Remove code made obsolete by replacing event-based real-time aggregation (ProductRankingService, ProductMetricsUpdatedEvent) with the RankingScheduler daily batch aggregation. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@apps/commerce-streamer/src/main/java/com/loopers/application/product/ProductMetricsService.java`:
- Around line 32-33: The incrementSalesCount method allows zero or negative
quantities which corrupt metrics; add a fail-fast validation at the start of
ProductMetricsService.incrementSalesCount to require quantity > 0 and throw an
IllegalArgumentException (or a domain-specific exception) if not, ensuring
productMetricsRepository.upsertSalesCount is not invoked for invalid input; also
add unit tests for quantity == 0 and quantity < 0 that assert the exception is
thrown and verify the repository.upsertSalesCount method was not called (use
mocks/spies).
- Around line 18-33: The methods incrementLikeCount, decrementLikeCount,
incrementViewCount and incrementSalesCount call LocalDate.now() directly which
relies on the JVM default timezone; change ProductMetricsService to use a
deterministic business timezone or an injected Clock: add a constructor-injected
Clock (or ZoneId) field to the class and replace LocalDate.now() with
LocalDate.now(clock) (or LocalDate.now(zone) if you prefer ZoneId), updating
calls to
productMetricsRepository.upsertLikeCount/upsertViewCount/upsertSalesCount to
pass the clock-derived date; also add unit/integration tests around the midnight
boundary (e.g., 23:59:59 → 00:00:01) and timezone variations to assert all
upserts produce the same business date.
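The Clock-injection approach described above can be sketched as follows; the class and method names are illustrative assumptions, not the project's actual code:

```java
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;

// Derives business dates from an injected Clock (and an explicit zone for
// Kafka record timestamps) instead of the JVM default timezone, so midnight
// boundaries are deterministic and testable with Clock.fixed(...).
final class BusinessDates {
    private final Clock clock;

    BusinessDates(Clock clock) {
        this.clock = clock;
    }

    // Date used for "now"-based metric upserts (e.g. incrementViewCount).
    LocalDate today() {
        return LocalDate.now(clock);
    }

    // Date for a Kafka record, derived from its broker timestamp in a fixed zone.
    static LocalDate eventDate(long epochMillis, ZoneId zone) {
        return LocalDate.ofInstant(Instant.ofEpochMilli(epochMillis), zone);
    }
}
```

In tests, a `Clock.fixed(...)` pinned just before or after midnight in the business zone makes the boundary behavior assertable without sleeping or mocking static calls.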
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e733acac-a130-4130-8259-dbb0bdf25e8d
📒 Files selected for processing (6)
- apps/commerce-streamer/src/main/java/com/loopers/application/product/ProductMetricsService.java
- apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductRankingRepository.java
- apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductRankingRepositoryImpl.java
- apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/LikeEventConsumer.java
- apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/OrderEventConsumer.java
- apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/ProductEventConsumer.java
🚧 Files skipped from review as they are similar to previous changes (5)
- apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductRankingRepository.java
- apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/OrderEventConsumer.java
- apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/ProductEventConsumer.java
- apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/LikeEventConsumer.java
- apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductRankingRepositoryImpl.java
public void incrementSalesCount(Long productId, int quantity) {
    ProductMetrics metrics = getOrCreate(productId);
    metrics.incrementSalesCount(quantity);
}

private ProductMetrics getOrCreate(Long productId) {
    return productMetricsRepository.findByProductId(productId)
        .orElseGet(() -> productMetricsRepository.save(
            ProductMetrics.builder()
                .productId(productId)
                .build()
        ));
productMetricsRepository.upsertSalesCount(productId, LocalDate.now(), quantity);
Missing sales-quantity validation allows negative/zero accumulation
Lines 32-33 have no validity check on quantity, so a 0 or negative value pollutes the sales metric and can distort ranking scores. A fail-fast check allowing only positive values must be added at the service boundary.
Additional tests should verify that quantity=0 and quantity<0 raise an exception and leave the DB unchanged (or never call the upsert).
🔧 Suggested fix
@Transactional
public void incrementSalesCount(Long productId, int quantity) {
+    if (quantity <= 0) {
+        throw new IllegalArgumentException("quantity must be greater than 0");
+    }
    productMetricsRepository.upsertSalesCount(productId, LocalDate.now(), quantity);
}
As per coding guidelines `**/*Service*.java`: "Check idempotency and duplicate-processing prevention strategies."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/commerce-streamer/src/main/java/com/loopers/application/product/ProductMetricsService.java`
around lines 32 - 33, The incrementSalesCount method allows zero or negative
quantities which corrupt metrics; add a fail-fast validation at the start of
ProductMetricsService.incrementSalesCount to require quantity > 0 and throw an
IllegalArgumentException (or a domain-specific exception) if not, ensuring
productMetricsRepository.upsertSalesCount is not invoked for invalid input; also
add unit tests for quantity == 0 and quantity < 0 that assert the exception is
thrown and verify the repository.upsertSalesCount method was not called (use
mocks/spies).
- Switch to record.timestamp()-based dates to prevent events around midnight from being assigned the wrong date
- Update existing tests for the eventDate parameter change; add a test verifying that events from different dates are stored in separate rows
Actionable comments posted: 1
♻️ Duplicate comments (4)
apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/RankingScheduler.java (2)
49-50: ⚠️ Potential issue | 🟠 Major — Unconditionally logging completion while Redis save failures are swallowed delays outage detection
The current save layer (apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductRankingRepositoryImpl.java) catches exceptions internally and only logs a warning, while the scheduler writes a completion log without checking the result, so operators mistake failures for success. The save methods should propagate exceptions or return a success flag per a changed contract, and the success log should be written only in confirmed-success cases. Additional tests should force a Redis exception and verify that no completion log is written and the scheduled run fails observably.
Also applies to: 66-67
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/RankingScheduler.java` around lines 49 - 50, The scheduler currently unconditionally logs "랭킹 carry-over 완료" after calling productRankingRepository.saveScores, which hides failures because ProductRankingRepositoryImpl swallows Redis exceptions; change the contract of ProductRankingRepository.saveScores to either throw the exception on failure or return a boolean success value, update RankingScheduler to check the returned success or catch the propagated exception and only emit the completion log (in RankingScheduler) when the operation succeeded, and add a unit/integration test that forces a Redis exception from ProductRankingRepositoryImpl to assert that the completion log is not emitted and the scheduler run surfaces a failure.
37-38: ⚠️ Potential issue | 🔴 Critical — A distributed lock is needed to prevent duplicate scheduled runs in multi-instance environments
Operationally, the current implementation applies carry-over/aggregation once per instance, which can continuously skew ranking scores. It is safer to acquire a per-job lock key (e.g. ranking:carryover:yyyyMMdd, ranking:aggregate:yyyyMMdd) via SET NX + TTL at the start of each job, run the body only when the lock is acquired, and release it in finally after verifying the token. A concurrency integration test is also needed showing that even when two scheduler instances call both methods at the same moment, only one Redis write occurs.
Based on learnings: In RankingCarryOverScheduler (apps/commerce-streamer), ... a Lua-based distributed lock (SET NX + atomic release) prevents multi-instance duplicate execution.
Also applies to: 53-54
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/RankingScheduler.java` around lines 37 - 38, The scheduled methods in RankingScheduler (notably carryOverRanking and the other scheduled method around lines 53-54) must acquire a Redis-based distributed lock (per-day keys like ranking:carryover:yyyyMMdd and ranking:aggregate:yyyyMMdd) using SET NX with a TTL and a unique token, execute the job body only if the lock was acquired, and in a finally block release the lock atomically via a Lua script that checks the token before deleting; implement lock acquisition/renewal helpers (e.g., acquireLock(key, ttl) -> token/null and releaseLock(key, token)) and use them at the start of carryOverRanking and the other scheduled method, and add an integration test that boots two scheduler instances concurrently and asserts Redis write occurs exactly once.
apps/commerce-streamer/src/main/java/com/loopers/application/product/ProductEventProcessor.java (1)
39-47: ⚠️ Potential issue | 🟠 Major — Required fields are not validated, risking an NPE or incorrect data handling
The code never checks whether node.get("eventType") or node.get("productId") is null. This is an operational problem because:

- If node.get("eventType") is missing, the flow falls into the warn branch on Line 46, but if node.get("productId") is null, asLong() can yield 0 and the metric is recorded under productId=0.
- Messages with absent or malformed fields that are retried repeatedly can cause partition lag.

As a fix, validate required-field presence and types first, and skip abnormal messages after a warning log or route them to a DLT.
🔧 Suggested fix
  LocalDate eventDate = LocalDate.ofInstant(Instant.ofEpochMilli(record.timestamp()), KST);
  JsonNode node = objectMapper.readTree(record.value().toString());
- String eventType = node.get("eventType").asText();
+ JsonNode eventTypeNode = node.get("eventType");
+ JsonNode productIdNode = node.get("productId");
+ if (eventTypeNode == null || productIdNode == null || !productIdNode.canConvertToLong()) {
+     log.warn("필수 필드 누락/형식 오류. offset={}, value={}", record.offset(), record.value());
+     return;
+ }
+ String eventType = eventTypeNode.asText();
  if (EVENT_PRODUCT_VIEWED.equals(eventType)) {
-     Long productId = node.get("productId").asLong();
+     Long productId = productIdNode.asLong();
      productMetricsService.incrementViewCount(productId, eventDate);
  } else {
      log.warn("알 수 없는 상품 이벤트 타입: {}", eventType);
      return;
  }

Additional tests should cover a missing eventType, a missing or string-typed productId, and malformed JSON input, verifying that no retry loop occurs.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-streamer/src/main/java/com/loopers/application/product/ProductEventProcessor.java` around lines 39 - 47, Validate required JSON fields before using them: in ProductEventProcessor where objectMapper.readTree(...) is used, check that node.get("eventType") and node.get("productId") are non-null and of expected types (eventType as text, productId as numeric) before calling asText()/asLong() and before branching on EVENT_PRODUCT_VIEWED; if validation fails, log a warning with the offending payload and reason, skip processing (or route to DLT) instead of calling productMetricsService.incrementViewCount(productId, eventDate), and add unit tests for missing eventType, missing/non-numeric productId, and invalid JSON to ensure no retry loop occurs.
apps/commerce-streamer/src/main/java/com/loopers/application/like/LikeEventProcessor.java (1)
41-43: ⚠️ Potential issue | 🟠 Major — JSON parsing/field access must be made defensive and poison-message reprocessing blocked.
- Operational impact: Lines 41-43 raise NullPointerException/parse exceptions on missing fields, type mismatches, and malformed JSON, and in the current consumer flow the batch failure triggers retry loops and consumer lag.
- Fix: wrap readTree in a try-catch, use node.path(...) plus type validation (canConvertToLong, isTextual), and skip invalid messages after a warning log or route them to a DLT.
- Additional tests: verify that with (1) a missing productId, (2) a missing eventType, (3) a string/0/negative productId, and (4) malformed JSON input, no exception propagates and productMetricsService is not called.
Suggested diff:
+import com.fasterxml.jackson.core.JsonProcessingException; @@ - JsonNode node = objectMapper.readTree(record.value().toString()); - Long productId = node.get("productId").asLong(); - String eventType = node.get("eventType").asText(); + if (record.value() == null) { + log.warn("좋아요 이벤트 payload 누락. topic={}, partition={}, offset={}", + record.topic(), record.partition(), record.offset()); + return; + } + + JsonNode node; + try { + node = objectMapper.readTree(String.valueOf(record.value())); + } catch (JsonProcessingException e) { + log.warn("좋아요 이벤트 JSON 파싱 실패. topic={}, partition={}, offset={}", + record.topic(), record.partition(), record.offset(), e); + return; + } + + JsonNode productIdNode = node.path("productId"); + JsonNode eventTypeNode = node.path("eventType"); + if (!productIdNode.canConvertToLong() || !eventTypeNode.isTextual()) { + log.warn("좋아요 이벤트 필수 필드 오류. topic={}, partition={}, offset={}", + record.topic(), record.partition(), record.offset()); + return; + } + + Long productId = productIdNode.asLong(); + String eventType = eventTypeNode.asText(); + if (productId <= 0L) { + log.warn("좋아요 이벤트 productId 비정상. productId={}, topic={}, partition={}, offset={}", + productId, record.topic(), record.partition(), record.offset()); + return; + }#!/bin/bash # 현재 코드에 방어 파싱/검증이 없는지와 관련 테스트 유무를 확인한다. fd -i LikeEventProcessor.java --exec sed -n '30,130p' {} rg -n --type java 'node\.get\("productId"\)\.asLong\(\)|node\.get\("eventType"\)\.asText\(\)|readTree\(' fd -i 'LikeEventProcessor*Test*.java' rg -n --type java 'LikeEventProcessor|malformed|invalid|productId|eventType'As per coding guidelines
`**/*.java`: check that null handling and exception flows are clear.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-streamer/src/main/java/com/loopers/application/like/LikeEventProcessor.java` around lines 41 - 43, make objectMapper.readTree(...) and the subsequent node.get("productId").asLong() / node.get("eventType").asText() calls defensive: wrap the readTree call in a try-catch and, on parse failure, catch the exception, log a warning, and skip the message or branch to a DLT; on success, use node.path("productId") / node.path("eventType") instead of node.get(...), validate field presence and type (canConvertToLong(), isTextual()), and when validity checks (negative/zero, etc.) fail, log a warning or branch to the DLT without calling productMetricsService. Affected symbols: LikeEventProcessor, readTree, node.path, the productId variable, the eventType variable, productMetricsService.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@apps/commerce-streamer/src/main/java/com/loopers/application/like/LikeEventProcessor.java`:
- Around line 59-64: In extractEventId(ConsumerRecord<Object, Object> record)
guard against null/empty header values: return null if
record.headers().lastHeader("eventId") is null OR header.value() is null OR
header.value().length == 0; otherwise decode header.value() with
StandardCharsets.UTF_8, call trim() on the resulting String and if the trimmed
result is empty return null, else return the trimmed id. Update related logic
that calls eventHandledRepository.save(...) so it only saves when
extractEventId(...) returns a non-null non-blank id, and add unit tests for the
cases: header missing, header value null, and header value blank to ensure save
is not invoked.
---
Duplicate comments:
In
`@apps/commerce-streamer/src/main/java/com/loopers/application/like/LikeEventProcessor.java`:
- Around line 41-43: In LikeEventProcessor, make objectMapper.readTree(...) and
the subsequent node.get("productId").asLong() / node.get("eventType").asText()
calls defensive: wrap the readTree call in a try-catch and, on parse failure,
catch the exception, log a warning, and skip the message or branch to a DLT; on
success, use node.path("productId") / node.path("eventType") instead of
node.get(...), validate field presence and type (canConvertToLong(),
isTextual()), and when validity checks (negative/zero, etc.) fail, log a warning
or branch to the DLT without calling productMetricsService. Affected symbols:
LikeEventProcessor, readTree, node.path, the productId variable, the eventType
variable, productMetricsService.
In
`@apps/commerce-streamer/src/main/java/com/loopers/application/product/ProductEventProcessor.java`:
- Around line 39-47: Validate required JSON fields before using them: in
ProductEventProcessor where objectMapper.readTree(...) is used, check that
node.get("eventType") and node.get("productId") are non-null and of expected
types (eventType as text, productId as numeric) before calling asText()/asLong()
and before branching on EVENT_PRODUCT_VIEWED; if validation fails, log a warning
with the offending payload and reason, skip processing (or route to DLT) instead
of calling productMetricsService.incrementViewCount(productId, eventDate), and
add unit tests for missing eventType, missing/non-numeric productId, and
invalid JSON to ensure no retry loop occurs.
In
`@apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/RankingScheduler.java`:
- Around line 49-50: The scheduler currently unconditionally logs "랭킹 carry-over
완료" after calling productRankingRepository.saveScores, which hides failures
because ProductRankingRepositoryImpl swallows Redis exceptions; change the
contract of ProductRankingRepository.saveScores to either throw the exception on
failure or return a boolean success value, update RankingScheduler to check the
returned success or catch the propagated exception and only emit the completion
log (in RankingScheduler) when the operation succeeded, and add a
unit/integration test that forces a Redis exception from
ProductRankingRepositoryImpl to assert that the completion log is not emitted
and the scheduler run surfaces a failure.
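The suggested contract change can be sketched in plain Java. This is a minimal sketch, not the project's real code: the `ProductRankingRepository` shape, the `carryOver` helper, and the returned log strings are illustrative stand-ins for the real scheduler and repository.

```java
import java.util.Map;

public class SaveScoresContractSketch {

    // Hypothetical repository contract: report success instead of swallowing Redis errors.
    interface ProductRankingRepository {
        boolean saveScores(String key, Map<Long, Double> scores);
    }

    // Scheduler-side sketch: the completion log is emitted only when the write succeeded.
    static String carryOver(ProductRankingRepository repo, String key, Map<Long, Double> scores) {
        boolean saved;
        try {
            saved = repo.saveScores(key, scores);
        } catch (RuntimeException e) {
            return "carry-over failed: " + e.getMessage();
        }
        return saved ? "carry-over completed" : "carry-over failed: repository returned false";
    }

    public static void main(String[] args) {
        Map<Long, Double> scores = Map.of(1L, 0.3);
        System.out.println(carryOver((k, s) -> true, "ranking:all:20240101", scores));
        System.out.println(carryOver((k, s) -> false, "ranking:all:20240101", scores));
        System.out.println(carryOver((k, s) -> { throw new RuntimeException("redis down"); },
                "ranking:all:20240101", scores));
    }
}
```

Either variant (boolean return or propagated exception) works; the point is that the success log can no longer mask a failed Redis write.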
- Around line 37-38: The scheduled methods in RankingScheduler (notably
carryOverRanking and the other scheduled method around lines 53-54) must acquire
a Redis-based distributed lock (per-day keys like ranking:carryover:yyyyMMdd and
ranking:aggregate:yyyyMMdd) using SET NX with a TTL and a unique token, execute
the job body only if the lock was acquired, and in a finally block release the
lock atomically via a Lua script that checks the token before deleting;
implement lock acquisition/renewal helpers (e.g., acquireLock(key, ttl) ->
token/null and releaseLock(key, token)) and use them at the start of
carryOverRanking and the other scheduled method, and add an integration test
that boots two scheduler instances concurrently and asserts Redis write occurs
exactly once.
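The lock semantics the comment asks for (SET NX with a TTL and a unique token, release via token-checked delete) can be illustrated with an in-memory stand-in for Redis. This is only a sketch: real code would use `SET key token NX PX ttl` and a Lua compare-and-delete script, and the TTL/renewal part is omitted here.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class LockSketch {

    // In-memory stand-in for Redis string keys; real code would use SET key token NX PX ttl.
    static final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    // Returns a unique token on success, null if the lock is already held (SET ... NX).
    static String acquireLock(String key) {
        String token = UUID.randomUUID().toString();
        return store.putIfAbsent(key, token) == null ? token : null;
    }

    // Releases only if the stored token matches, mirroring the Lua
    // "if redis.call('get', k) == token then del" compare-and-delete.
    static boolean releaseLock(String key, String token) {
        return store.remove(key, token);
    }

    public static void main(String[] args) {
        String key = "ranking:carryover:20240101";
        String token = acquireLock(key);
        System.out.println(token != null);                   // first acquirer wins
        System.out.println(acquireLock(key) == null);        // second acquirer is rejected
        System.out.println(releaseLock(key, "wrong-token")); // stale token cannot release
        System.out.println(releaseLock(key, token));         // owner releases successfully
    }
}
```

The token check on release is what prevents an instance whose lock expired from deleting a lock now held by another instance.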
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 11f9da18-2efd-4c96-9dc7-b6275133a2f1
📒 Files selected for processing (5)
- apps/commerce-streamer/src/main/java/com/loopers/application/like/LikeEventProcessor.java
- apps/commerce-streamer/src/main/java/com/loopers/application/order/OrderEventProcessor.java
- apps/commerce-streamer/src/main/java/com/loopers/application/product/ProductEventProcessor.java
- apps/commerce-streamer/src/main/java/com/loopers/application/product/ProductMetricsService.java
- apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/RankingScheduler.java
🚧 Files skipped from review as they are similar to previous changes (2)
- apps/commerce-streamer/src/main/java/com/loopers/application/product/ProductMetricsService.java
- apps/commerce-streamer/src/main/java/com/loopers/application/order/OrderEventProcessor.java
```java
private String extractEventId(ConsumerRecord<Object, Object> record) {
    Header header = record.headers().lastHeader("eventId");
    if (header == null) {
        return null;
    }
    return new String(header.value(), StandardCharsets.UTF_8);
}
```
🧩 Analysis chain
🏁 Script executed:

```shell
fd -type f -name "LikeEventProcessor.java"
```

Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java

Length of output: 255

🏁 Script executed:

```shell
fd -type file -name "LikeEventProcessor.java"
```

Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java

Length of output: 255

🏁 Script executed:

```shell
find . -name "LikeEventProcessor.java"
```

Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java

Length of output: 176

🏁 Script executed:

```shell
cat -n ./apps/commerce-streamer/src/main/java/com/loopers/application/like/LikeEventProcessor.java
```

Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java

Length of output: 3066

🏁 Script executed:

```shell
rg "Header" ./apps/commerce-streamer/src/main/java/com/loopers/application/like/LikeEventProcessor.java -A 5 -B 5
```

Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java

Length of output: 930

🏁 Script executed:

```shell
find . -path "*/test/*" -name "*LikeEventProcessor*"
```

Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java

Length of output: 67

🏁 Script executed:

```shell
find . -path "*/test/*" -name "*Event*" -o -path "*/test/*" -name "*Like*" | head -20
```

Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java

Length of output: 646

🏁 Script executed:

```shell
rg "header.value()" --type java -B 3 -A 3
```

Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java

Length of output: 2178
🌐 Web query:
Kafka Header value() API documentation null return
💡 Result:
In the Apache Kafka client API, org.apache.kafka.common.header.Header#value() is allowed to return null.
The Kafka 4.2.0 Javadoc states for value():
- Returns: “the header’s value; may be null.” [1]
So a null return means the header exists but has a null value (Kafka requires the key to be non-null, but the value can be null). This is also reflected in other client docs (e.g., Confluent’s .NET client: “The header value (may be null)”). [2]
Sources: [1] [2]
🏁 Script executed:

```shell
rg "eventId" ./apps/commerce-streamer/src/main/java/com/loopers/application/like/LikeEventProcessor.java -B 2 -A 2
```

Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java

Length of output: 692
Missing null validation on the eventId header value risks an NPE and blank-data contamination.
On line 64, header.value() may return null per the Kafka API, so new String(header.value(), StandardCharsets.UTF_8) can throw a NullPointerException. Also, when header.value() is an empty byte array or contains only whitespace, a blank string is stored as the eventId; every subsequent blank-eventId message is then treated as the same eventId by the duplicate check on line 36, causing mass skipping.
Proposed fix: first filter with header == null || header.value() == null || header.value().length == 0, then trim() the decoded string and return null if the result is blank.
Additional tests: verify that eventHandledRepository.save(...) on line 55 is not called when (1) the header is missing, (2) the header value is null, and (3) the header value is blank.
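The proposed guard boils down to a small pure function over the raw header bytes. A minimal sketch, with the `ConsumerRecord` plumbing and the repository call omitted (only the decode/trim logic is shown):

```java
import java.nio.charset.StandardCharsets;

public final class EventIdGuard {

    // Decodes a raw "eventId" header value defensively:
    // null/empty bytes or a blank decoded string all yield null,
    // so callers can skip idempotency bookkeeping for such records.
    static String extractEventId(byte[] headerValue) {
        if (headerValue == null || headerValue.length == 0) {
            return null;
        }
        String id = new String(headerValue, StandardCharsets.UTF_8).trim();
        return id.isEmpty() ? null : id;
    }

    public static void main(String[] args) {
        System.out.println(extractEventId(null));                                    // null
        System.out.println(extractEventId(new byte[0]));                             // null
        System.out.println(extractEventId("   ".getBytes(StandardCharsets.UTF_8)));  // null
        System.out.println(extractEventId(" evt-42 ".getBytes(StandardCharsets.UTF_8))); // evt-42
    }
}
```

With this shape, the caller only touches eventHandledRepository.save(...) when the returned id is non-null.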
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/commerce-streamer/src/main/java/com/loopers/application/like/LikeEventProcessor.java`
around lines 59 - 64, In extractEventId(ConsumerRecord<Object, Object> record)
guard against null/empty header values: return null if
record.headers().lastHeader("eventId") is null OR header.value() is null OR
header.value().length == 0; otherwise decode header.value() with
StandardCharsets.UTF_8, call trim() on the resulting String and if the trimmed
result is empty return null, else return the trimmed id. Update related logic
that calls eventHandledRepository.save(...) so it only saves when
extractEventId(...) returns a non-null non-blank id, and add unit tests for the
cases: header missing, header value null, and header value blank to ensure save
is not invoked.
📌 Summary
Aggregates the daily ranking with a weighted score (views×0.1 + likes×0.2 + sales×0.6) and carries the previous day's popularity forward via a carry-over scheduler, guaranteeing ranking continuity and preventing missing ranking scores.

🧭 Context & Decision
Problem definition

Options and decision
🏗️ Design Overview
Scope of changes

- RankingFacade, RankingService, RankingCacheRepository, RankingCacheRepositoryImpl
- ProductRankingService, ProductRankingRepository, ProductRankingRepositoryImpl
- RankingScheduler (daily aggregation + carry-over)
- RankingV1Controller, RankingV1ApiSpec

Key component responsibilities

- RankingFacade: composes the ranking-lookup use case; fetches ranked IDs from Redis, then joins product/brand info
- RankingService: delegates date-scoped ranking cache lookups (default: today)
- ProductMetricsService: applies Kafka-event-driven metric increments (DB UPSERT + domain event publication)
- ProductRankingService: receives ProductMetricsUpdatedEvent, accumulates scores in a ConcurrentHashMap buffer, and applies them to Redis in bulk via flush()
- RankingScheduler: 23:50 carry-over (previous day's score × 0.3) + 00:00 daily aggregation (DB metrics → Redis scores)
- ProductMetricsJpaRepository: atomic UPSERT via MySQL INSERT ON DUPLICATE KEY UPDATE

🔁 Flow Diagram
Main Flow — Real-time score update

```mermaid
sequenceDiagram
    autonumber
    participant Kafka
    participant Consumer as DomainEventConsumer
    participant MetricsSvc as ProductMetricsService
    participant DB as MySQL (ProductMetrics)
    participant RankingSvc as ProductRankingService
    participant Redis as Redis (ZSET)
    Kafka->>Consumer: Receive event batch
    Consumer->>Consumer: Idempotency check (EventHandled)
    Consumer->>MetricsSvc: increment*Count(productId)
    MetricsSvc->>DB: UPSERT (INSERT ON DUPLICATE KEY UPDATE)
    MetricsSvc-->>RankingSvc: Publish ProductMetricsUpdatedEvent
    RankingSvc->>RankingSvc: Accumulate scores in buffer (ConcurrentHashMap)
    Consumer->>RankingSvc: flush() (after batch completes)
    RankingSvc->>Redis: ZINCRBY ranking:all:{date} (pipelined)
```

Main Flow — Daily aggregation scheduler

```mermaid
sequenceDiagram
    autonumber
    participant Scheduler as RankingScheduler
    Note over Scheduler: 23:50 carry-over
    Scheduler->>Redis: ZRANGEWITHSCORES ranking:all:{today}
    Redis-->>Scheduler: Full score list
    Scheduler->>Scheduler: Score × 0.3 (decay)
    Scheduler->>Redis: ZADD ranking:all:{tomorrow} (pipelined)
    Note over Scheduler: 00:00 daily aggregation
    Scheduler->>DB: SELECT * FROM product_metrics WHERE metric_date = yesterday
    DB-->>Scheduler: Previous day's metrics
    Scheduler->>Scheduler: score = views×0.1 + likes×0.2 + sales×0.6
    Scheduler->>Redis: ZINCRBY ranking:all:{today} (pipelined)
```

Main Flow — Ranking lookup API

```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant Controller as RankingV1Controller
    participant Facade as RankingFacade
    participant RankingSvc as RankingService
    participant Redis as Redis (ZSET)
    participant ProductSvc as ProductService
    Client->>Controller: GET /api/v1/rankings?date&page&size
    Controller->>Facade: getRankings(date, pageable)
    Facade->>RankingSvc: getRankedProductIds(date, offset, size)
    RankingSvc->>Redis: ZREVRANGE ranking:all:{date}
    Redis-->>RankingSvc: productId list (score descending)
    Facade->>RankingSvc: getTotalCount(date)
    RankingSvc->>Redis: ZCARD ranking:all:{date}
    Facade->>ProductSvc: getProducts(productIds)
    ProductSvc->>DB: Join products with brands
    DB-->>ProductSvc: ProductWithBrandResult list
    Facade-->>Controller: Page<ProductWithBrandResult>
    Controller-->>Client: PageResponse<ProductWithBrandDetailResponse> (includes rank)
```

Review points
1. Please review whether the product_metrics table structure and the scheduler-based ranking aggregation flow are designed correctly.
I chose to store per-day, per-product metric data in the product_metrics table and have a scheduler read that data to compute ranking scores. At 23:50, 30% of the day's score is carried over to the next day's key; at 00:00, the previous day's metrics are weight-computed and added on top.
I also considered consuming Kafka events in real time and computing ranking scores immediately, but without persisted data there would be no way to recover ranking scores after a Redis failure, so I chose the scheduler-based approach.
However, in the current structure, the ranking shown to users today is computed from metric data collected yesterday, so it is effectively yesterday's ranking.
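The weight and carry-over arithmetic described above is easy to pin down concretely. Weights and the 0.3 carry-over factor come from this PR; the method names are illustrative, not the real scheduler code:

```java
public class RankingScoreSketch {

    // score = views×0.1 + likes×0.2 + sales×0.6 (weights from the PR description)
    static double dailyScore(long views, long likes, long sales) {
        return views * 0.1 + likes * 0.2 + sales * 0.6;
    }

    // 23:50 carry-over: 30% of today's score seeds tomorrow's key,
    // then the 00:00 aggregation ZINCRBYs the daily score on top of it.
    static double tomorrowOpeningScore(double todayScore) {
        return todayScore * 0.3;
    }

    public static void main(String[] args) {
        double today = dailyScore(100, 20, 5); // 10.0 + 4.0 + 3.0 = 17.0
        System.out.println(today);
        System.out.println(tomorrowOpeningScore(today)); // 17.0 * 0.3 = 5.1
    }
}
```

Because the aggregation uses ZINCRBY rather than ZADD, the carried-over 5.1 and the freshly aggregated daily score accumulate in the same key instead of overwriting each other.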
Ranking score calculation scheduler code

Metric aggregation table structure

2. Concern about data growth when extending to hourly rankings

If hourly rankings are added to the requirements, a time-bucket column would be needed and the data would grow to product count × 24 (hours). This concerns me somewhat; I'd like opinions on whether it is simply unavoidable given the requirement or whether there is a more suitable approach.

Structure for an hourly ranking extension (example)