diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java index b4782507e47cd..7b1a6caf61069 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java @@ -57,12 +57,22 @@ public SubscriptionPipeTsFileEventBatch( @Override public synchronized void ack() { batch.decreaseEventsReferenceCount(this.getClass().getName(), true); + enrichedEvents.stream() + // only decrease reference count for tsfile event, since we already decrease reference count + // for tablet event in batch + .filter(event -> event instanceof PipeTsFileInsertionEvent) + .forEach(event -> event.decreaseReferenceCount(this.getClass().getName(), true)); } @Override public synchronized void cleanUp(final boolean force) { // close batch, it includes clearing the reference count of events batch.close(); + + // clear the reference count of events + for (final EnrichedEvent enrichedEvent : enrichedEvents) { + enrichedEvent.clearReferenceCount(this.getClass().getName()); + } enrichedEvents.clear(); } @@ -102,7 +112,6 @@ protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) { } finally { try { event.close(); - ((PipeTsFileInsertionEvent) event).decreaseReferenceCount(this.getClass().getName(), false); } catch (final Exception ignored) { // no exceptions will be thrown }