From 034453c19fe9b659855acd3fa5bf7a6ff568bc7b Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Thu, 29 May 2025 10:46:19 +0800 Subject: [PATCH] Subscription: retain tsfile events in tsfile batch to avoid premature commit (#15598) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Consider the historical data export snapshot scenario: 1. The events delivered upstream are, in order, the tsfile event and the termination event. 2. The tsfile event is parsed into multiple tablet events, and then the reference count of the tsfile event is set to 0 (should report as false). 3. Assuming that for some reason the tablet events were not sent to the peer in time, the reference count of the transfer termination event is set to 0 (should report as true). 4. At this point, because the tablet events were not enriched with a commit id (see Subscription: fully managed tsfile parsing process for tsfile format topicĀ #15524), the termination event successfully marks the corresponding DR complete, which in turn leads to data loss. --- .../event/batch/SubscriptionPipeTsFileEventBatch.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java index b4782507e47cd..7b1a6caf61069 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java @@ -57,12 +57,22 @@ public SubscriptionPipeTsFileEventBatch( @Override public synchronized void ack() { batch.decreaseEventsReferenceCount(this.getClass().getName(), true); + enrichedEvents.stream() + // only decrease reference count for tsfile event, since we already decrease reference count + // for tablet event in batch + .filter(event -> event instanceof PipeTsFileInsertionEvent) + .forEach(event -> event.decreaseReferenceCount(this.getClass().getName(), true)); } @Override public synchronized void cleanUp(final boolean force) { // close batch, it includes clearing the reference count of events batch.close(); + + // clear the reference count of events + for (final EnrichedEvent enrichedEvent : enrichedEvents) { + enrichedEvent.clearReferenceCount(this.getClass().getName()); + } enrichedEvents.clear(); } @@ -102,7 +112,6 @@ protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) { } finally { try { event.close(); - ((PipeTsFileInsertionEvent) event).decreaseReferenceCount(this.getClass().getName(), false); } catch (final Exception ignored) { // no exceptions will be thrown }