1010import org .apache .kafka .common .header .internals .RecordHeader ;
1111import org .apache .kafka .common .header .internals .RecordHeaders ;
1212import org .apache .kafka .common .record .TimestampType ;
13+
14+ import java .util .Optional ;
1315import org .junit .jupiter .api .DisplayName ;
1416import org .junit .jupiter .api .Test ;
1517import org .junit .jupiter .api .extension .ExtendWith ;
@@ -61,7 +63,7 @@ void canConsumeLikeAddedEvent() {
6163 headers .add (new RecordHeader ("version" , "1" .getBytes (StandardCharsets .UTF_8 )));
6264
6365 ConsumerRecord <String , Object > record = new ConsumerRecord <>(
64- "like-events" , 0 , 0L , 0L , TimestampType .CREATE_TIME , 0L , 0 , 0 , "key" , event , headers
66+ "like-events" , 0 , 0L , 0L , TimestampType .CREATE_TIME , 0 , 0 , "key" , event , headers , Optional . empty ()
6567 );
6668 List <ConsumerRecord <String , Object >> records = List .of (record );
6769
@@ -91,7 +93,7 @@ void canConsumeLikeRemovedEvent() {
9193 headers .add (new RecordHeader ("version" , "2" .getBytes (StandardCharsets .UTF_8 )));
9294
9395 ConsumerRecord <String , Object > record = new ConsumerRecord <>(
94- "like-events" , 0 , 0L , 0L , TimestampType .CREATE_TIME , 0L , 0 , 0 , "key" , event , headers
96+ "like-events" , 0 , 0L , 0L , TimestampType .CREATE_TIME , 0 , 0 , "key" , event , headers , Optional . empty ()
9597 );
9698 List <ConsumerRecord <String , Object >> records = List .of (record );
9799
@@ -131,7 +133,7 @@ void canConsumeOrderCreatedEvent() {
131133 headers .add (new RecordHeader ("version" , "3" .getBytes (StandardCharsets .UTF_8 )));
132134
133135 ConsumerRecord <String , Object > record = new ConsumerRecord <>(
134- "order-events" , 0 , 0L , 0L , TimestampType .CREATE_TIME , 0L , 0 , 0 , "key" , event , headers
136+ "order-events" , 0 , 0L , 0L , TimestampType .CREATE_TIME , 0 , 0 , "key" , event , headers , Optional . empty ()
135137 );
136138 List <ConsumerRecord <String , Object >> records = List .of (record );
137139
@@ -168,8 +170,8 @@ void canConsumeMultipleEvents() {
168170 headers2 .add (new RecordHeader ("version" , "5" .getBytes (StandardCharsets .UTF_8 )));
169171
170172 List <ConsumerRecord <String , Object >> records = List .of (
171- new ConsumerRecord <>("like-events" , 0 , 0L , 0L , TimestampType .CREATE_TIME , 0L , 0 , 0 , "key" , event1 , headers1 ),
172- new ConsumerRecord <>("like-events" , 0 , 1L , 0L , TimestampType .CREATE_TIME , 0L , 0 , 0 , "key" , event2 , headers2 )
173+ new ConsumerRecord <>("like-events" , 0 , 0L , 0L , TimestampType .CREATE_TIME , 0 , 0 , "key" , event1 , headers1 , Optional . empty () ),
174+ new ConsumerRecord <>("like-events" , 0 , 1L , 0L , TimestampType .CREATE_TIME , 0 , 0 , "key" , event2 , headers2 , Optional . empty () )
173175 );
174176
175177 when (eventHandledService .isAlreadyHandled (eventId1 )).thenReturn (false );
@@ -213,8 +215,8 @@ void continuesProcessing_whenIndividualEventFails() {
213215 .when (productMetricsService ).incrementLikeCount (any (), anyLong ());
214216
215217 List <ConsumerRecord <String , Object >> records = List .of (
216- new ConsumerRecord <>("like-events" , 0 , 0L , 0L , TimestampType .CREATE_TIME , 0L , 0 , 0 , "key" , invalidEvent , headers1 ),
217- new ConsumerRecord <>("like-events" , 0 , 1L , 0L , TimestampType .CREATE_TIME , 0L , 0 , 0 , "key" , validEvent , headers2 )
218+ new ConsumerRecord <>("like-events" , 0 , 0L , 0L , TimestampType .CREATE_TIME , 0 , 0 , "key" , invalidEvent , headers1 , Optional . empty () ),
219+ new ConsumerRecord <>("like-events" , 0 , 1L , 0L , TimestampType .CREATE_TIME , 0 , 0 , "key" , validEvent , headers2 , Optional . empty () )
218220 );
219221
220222 // act
@@ -247,7 +249,7 @@ void acknowledgesEvenWhenIndividualEventFails() {
247249 .when (productMetricsService ).incrementLikeCount (eq (productId ), anyLong ());
248250
249251 List <ConsumerRecord <String , Object >> records = List .of (
250- new ConsumerRecord <>("like-events" , 0 , 0L , 0L , TimestampType .CREATE_TIME , 0L , 0 , 0 , "key" , event , headers )
252+ new ConsumerRecord <>("like-events" , 0 , 0L , 0L , TimestampType .CREATE_TIME , 0 , 0 , "key" , event , headers , Optional . empty () )
251253 );
252254
253255 // act
@@ -275,7 +277,7 @@ void skipsAlreadyHandledEvent() {
275277 headers .add (new RecordHeader ("eventId" , eventId .getBytes (StandardCharsets .UTF_8 )));
276278
277279 ConsumerRecord <String , Object > record = new ConsumerRecord <>(
278- "like-events" , 0 , 0L , 0L , TimestampType .CREATE_TIME , 0L , 0 , 0 , "key" , event , headers
280+ "like-events" , 0 , 0L , 0L , TimestampType .CREATE_TIME , 0 , 0 , "key" , event , headers , Optional . empty ()
279281 );
280282 List <ConsumerRecord <String , Object >> records = List .of (record );
281283
@@ -327,7 +329,7 @@ void handlesDataIntegrityViolationException() {
327329 headers .add (new RecordHeader ("version" , "9" .getBytes (StandardCharsets .UTF_8 )));
328330
329331 ConsumerRecord <String , Object > record = new ConsumerRecord <>(
330- "like-events" , 0 , 0L , 0L , TimestampType .CREATE_TIME , 0L , 0 , 0 , "key" , event , headers
332+ "like-events" , 0 , 0L , 0L , TimestampType .CREATE_TIME , 0 , 0 , "key" , event , headers , Optional . empty ()
331333 );
332334 List <ConsumerRecord <String , Object >> records = List .of (record );
333335
@@ -344,4 +346,48 @@ void handlesDataIntegrityViolationException() {
344346 verify (eventHandledService ).markAsHandled (eventId , "LikeAdded" , "like-events" );
345347 verify (acknowledgment ).acknowledge ();
346348 }
349+
350+ @ DisplayName ("중복 메시지 재전송 시 한 번만 처리되어 멱등성이 보장된다." )
351+ @ Test
352+ void handlesDuplicateMessagesIdempotently () {
353+ // arrange
354+ String eventId = "duplicate-event-id" ;
355+ Long productId = 1L ;
356+ Long userId = 100L ;
357+ Long eventVersion = 1L ;
358+ LikeEvent .LikeAdded event = new LikeEvent .LikeAdded (userId , productId , LocalDateTime .now ());
359+
360+ Headers headers = new RecordHeaders ();
361+ headers .add (new RecordHeader ("eventId" , eventId .getBytes (StandardCharsets .UTF_8 )));
362+ headers .add (new RecordHeader ("version" , String .valueOf (eventVersion ).getBytes (StandardCharsets .UTF_8 )));
363+
364+ // 동일한 eventId를 가진 메시지 3개 생성
365+ List <ConsumerRecord <String , Object >> records = List .of (
366+ new ConsumerRecord <>("like-events" , 0 , 0L , 0L , TimestampType .CREATE_TIME , 0 , 0 , "key" , event , headers , Optional .empty ()),
367+ new ConsumerRecord <>("like-events" , 0 , 1L , 0L , TimestampType .CREATE_TIME , 0 , 0 , "key" , event , headers , Optional .empty ()),
368+ new ConsumerRecord <>("like-events" , 0 , 2L , 0L , TimestampType .CREATE_TIME , 0 , 0 , "key" , event , headers , Optional .empty ())
369+ );
370+
371+ // 첫 번째 메시지는 처리되지 않았으므로 false, 나머지는 이미 처리되었으므로 true
372+ when (eventHandledService .isAlreadyHandled (eventId ))
373+ .thenReturn (false ) // 첫 번째: 처리됨
374+ .thenReturn (true ) // 두 번째: 이미 처리됨 (스킵)
375+ .thenReturn (true ); // 세 번째: 이미 처리됨 (스킵)
376+
377+ // act
378+ productMetricsConsumer .consumeLikeEvents (records , acknowledgment );
379+
380+ // assert
381+ // isAlreadyHandled는 3번 호출됨 (각 메시지마다)
382+ verify (eventHandledService , times (3 )).isAlreadyHandled (eventId );
383+
384+ // incrementLikeCount는 한 번만 호출되어야 함 (첫 번째 메시지만 처리)
385+ verify (productMetricsService , times (1 )).incrementLikeCount (eq (productId ), eq (eventVersion ));
386+
387+ // markAsHandled는 한 번만 호출되어야 함 (첫 번째 메시지만 처리)
388+ verify (eventHandledService , times (1 )).markAsHandled (eventId , "LikeAdded" , "like-events" );
389+
390+ // acknowledgment는 한 번만 호출되어야 함 (배치 처리 완료)
391+ verify (acknowledgment , times (1 )).acknowledge ();
392+ }
347393}
0 commit comments