From df63a3d0f514c56d9c07c523b80b86955896e49d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A9=AC=E5=AD=90=E5=9D=A4?= <55695098+DanielWang2035@users.noreply.github.com> Date: Wed, 4 Jun 2025 16:03:52 +0800 Subject: [PATCH 1/4] Pipe: Add retry when TsFile parsing failed to avoid race among processor threads --- .../task/connection/PipeEventCollector.java | 2 + .../processor/PipeProcessorSubtask.java | 32 +++++++++++-- .../websocket/WebSocketConnector.java | 36 ++++++++++++-- .../tsfile/PipeTsFileInsertionEvent.java | 38 +++++++++++++-- .../aggregate/AggregateProcessor.java | 36 ++++++++++++-- .../downsampling/DownSamplingProcessor.java | 37 +++++++++++++-- .../SubscriptionPipeTsFileEventBatch.java | 47 +++++++++++++++---- 7 files changed, 200 insertions(+), 28 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java index a78c4e2e4efb9..979c3fef66ad9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java @@ -150,6 +150,8 @@ private void parseAndCollectEvent(final PipeTsFileInsertionEvent sourceEvent) th final TabletInsertionEvent parsedEvent = iterator.next(); int retryCount = 0; while (true) { + // If failed due do insufficient memory, retry until success to avoid race among multiple + // processor threads try { collectParsedRawTableEvent((PipeRawTabletInsertionEvent) parsedEvent); break; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java index 4b11ef9728590..327c77d46f481 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java @@ -48,6 +48,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Iterator; import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; @@ -149,9 +150,34 @@ protected boolean executeOnce() throws Exception { && ((PipeTsFileInsertionEvent) event).shouldParse4Privilege()) { try (final PipeTsFileInsertionEvent tsFileInsertionEvent = (PipeTsFileInsertionEvent) event) { - for (final TabletInsertionEvent tabletInsertionEvent : - tsFileInsertionEvent.toTabletInsertionEvents()) { - pipeProcessor.process(tabletInsertionEvent, outputEventCollector); + final Iterable iterable = + tsFileInsertionEvent.toTabletInsertionEvents(); + final Iterator iterator = iterable.iterator(); + while (iterator.hasNext()) { + final TabletInsertionEvent parsedEvent = iterator.next(); + int retryCount = 0; + while (true) { + // If failed due do insufficient memory, retry until success to avoid race among + // multiple processor threads + try { + pipeProcessor.process(parsedEvent, outputEventCollector); + break; + } catch (final PipeRuntimeOutOfMemoryCriticalException e) { + if (retryCount++ % 100 == 0) { + LOGGER.warn( + "PipeProcessorSubtask: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", + tsFileInsertionEvent.getTsFile(), + retryCount, + e); + } else if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "PipeProcessorSubtask: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", + tsFileInsertionEvent.getTsFile(), + retryCount, + e); + } + } + } } } } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java index a9ed5cb46a764..461623b62bf10 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.connector.protocol.websocket; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; @@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory; import java.util.Arrays; +import java.util.Iterator; import java.util.Optional; @TreeModel @@ -141,10 +143,36 @@ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception } try { - for (TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) { - // Skip report if any tablet events is added - ((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReportOnCommit(); - transfer(event); + final Iterable iterable = + tsFileInsertionEvent.toTabletInsertionEvents(); + final Iterator iterator = iterable.iterator(); + while (iterator.hasNext()) { + final TabletInsertionEvent parsedEvent = iterator.next(); + int retryCount = 0; + while (true) { + // If failed due do insufficient memory, retry until success to avoid race among multiple + // processor threads + try { + // Skip report if any tablet events is added + ((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReportOnCommit(); + transfer(parsedEvent); + break; + } catch (final PipeRuntimeOutOfMemoryCriticalException e) { + if (retryCount++ % 100 == 0) { + LOGGER.warn( + "WebSocketConnector: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", + ((PipeTsFileInsertionEvent) tsFileInsertionEvent).getTsFile(), + retryCount, + e); + } else if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "WebSocketConnector: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", + ((PipeTsFileInsertionEvent) tsFileInsertionEvent).getTsFile(), + retryCount, + e); + } + } + } } } finally { tsFileInsertionEvent.close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 2c3cfd10cf29b..8f5c625782a85 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.exception.auth.AccessDeniedException; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; @@ -55,6 +56,7 @@ import java.io.File; import java.io.IOException; import java.util.Collections; +import java.util.Iterator; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -689,11 +691,37 @@ public long count(final boolean skipReportOnCommit) throws IOException { if (shouldParseTime()) { try { - for (final TabletInsertionEvent event : toTabletInsertionEvents()) { - final PipeRawTabletInsertionEvent rawEvent = ((PipeRawTabletInsertionEvent) event); - count += rawEvent.count(); - if (skipReportOnCommit) { - rawEvent.skipReportOnCommit(); + final Iterable iterable = toTabletInsertionEvents(); + final Iterator iterator = iterable.iterator(); + while (iterator.hasNext()) { + final TabletInsertionEvent parsedEvent = iterator.next(); + int retryCount = 0; + while (true) { + // If failed due do insufficient memory, retry until success to avoid race among + // multiple processor threads + try { + final PipeRawTabletInsertionEvent rawEvent = + ((PipeRawTabletInsertionEvent) parsedEvent); + count += rawEvent.count(); + if (skipReportOnCommit) { + rawEvent.skipReportOnCommit(); + } + break; + } catch (final PipeRuntimeOutOfMemoryCriticalException e) { + if (retryCount++ % 100 == 0) { + LOGGER.warn( + "PipeTsFileInsertionEvent::count: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", + getTsFile(), + retryCount, + e); + } else if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "PipeTsFileInsertionEvent::count: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", + getTsFile(), + retryCount, + e); + } + } } } return count; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java index ec1683358d44b..281c60bc0a87c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; @@ -65,6 +66,8 @@ import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.schema.MeasurementSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; @@ -75,6 +78,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -108,6 +112,7 @@ */ @TreeModel public class AggregateProcessor implements PipeProcessor { + private static final Logger LOGGER = LoggerFactory.getLogger(AggregateProcessor.class); private static final String WINDOWING_PROCESSOR_SUFFIX = "-windowing-processor"; private String pipeName; @@ -524,9 +529,34 @@ public void process( final TsFileInsertionEvent tsFileInsertionEvent, final EventCollector eventCollector) throws Exception { try { - for (final TabletInsertionEvent tabletInsertionEvent : - tsFileInsertionEvent.toTabletInsertionEvents()) { - process(tabletInsertionEvent, eventCollector); + final Iterable iterable = + tsFileInsertionEvent.toTabletInsertionEvents(); + final Iterator iterator = iterable.iterator(); + while (iterator.hasNext()) { + final TabletInsertionEvent parsedEvent = iterator.next(); + int retryCount = 0; + while (true) { + // If failed due do insufficient memory, retry until success to avoid race among multiple + // processor threads + try { + process(parsedEvent, eventCollector); + break; + } catch (final PipeRuntimeOutOfMemoryCriticalException e) { + if (retryCount++ % 100 == 0) { + LOGGER.warn( + "AggregateProcessor: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", + ((PipeTsFileInsertionEvent) tsFileInsertionEvent).getTsFile(), + retryCount, + e); + } else if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "AggregateProcessor: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", + ((PipeTsFileInsertionEvent) tsFileInsertionEvent).getTsFile(), + retryCount, + e); + } + } + } } } finally { tsFileInsertionEvent.close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java index fd631772b930a..0b4d1d6130743 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java @@ -20,9 +20,11 @@ package org.apache.iotdb.db.pipe.processor.downsampling; import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.pipe.api.PipeProcessor; import org.apache.iotdb.pipe.api.access.Row; @@ -36,7 +38,10 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.tsfile.common.constant.TsFileConstant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Iterator; import java.util.concurrent.atomic.AtomicReference; import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE; @@ -45,6 +50,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY; public abstract class DownSamplingProcessor implements PipeProcessor { + private static final Logger LOGGER = LoggerFactory.getLogger(DownSamplingProcessor.class); protected long memoryLimitInBytes; @@ -149,9 +155,34 @@ public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector ev throws Exception { if (shouldSplitFile) { try { - for (final TabletInsertionEvent tabletInsertionEvent : - tsFileInsertionEvent.toTabletInsertionEvents()) { - process(tabletInsertionEvent, eventCollector); + final Iterable iterable = + tsFileInsertionEvent.toTabletInsertionEvents(); + final Iterator iterator = iterable.iterator(); + while (iterator.hasNext()) { + final TabletInsertionEvent parsedEvent = iterator.next(); + int retryCount = 0; + while (true) { + // If failed due do insufficient memory, retry until success to avoid race among + // multiple processor threads + try { + process(parsedEvent, eventCollector); + break; + } catch (final PipeRuntimeOutOfMemoryCriticalException e) { + if (retryCount++ % 100 == 0) { + LOGGER.warn( + "DownSamplingProcessor: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", + ((PipeTsFileInsertionEvent) tsFileInsertionEvent).getTsFile(), + retryCount, + e); + } else if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "DownSamplingProcessor: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", + ((PipeTsFileInsertionEvent) tsFileInsertionEvent).getTsFile(), + retryCount, + e); + } + } + } } } finally { tsFileInsertionEvent.close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java index e6328a39ef4ac..6609c496b6fa7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.subscription.event.batch; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; @@ -36,6 +37,7 @@ import java.io.File; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -96,17 +98,42 @@ protected void onTabletInsertionEvent(final TabletInsertionEvent event) { protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) { // TODO: parse tsfile event on the fly like SubscriptionPipeTabletEventBatch try { - for (final TabletInsertionEvent parsedEvent : event.toTabletInsertionEvents()) { - if (!((PipeRawTabletInsertionEvent) parsedEvent) - .increaseReferenceCount(this.getClass().getName())) { - LOGGER.warn( - "SubscriptionPipeTsFileEventBatch: Failed to increase the reference count of event {}, skipping it.", - ((PipeRawTabletInsertionEvent) parsedEvent).coreReportMessage()); - } else { + final Iterable iterable = event.toTabletInsertionEvents(); + final Iterator iterator = iterable.iterator(); + while (iterator.hasNext()) { + final TabletInsertionEvent parsedEvent = iterator.next(); + int retryCount = 0; + while (true) { + // If failed due do insufficient memory, retry until success to avoid race among multiple + // processor threads try { - batch.onEvent(parsedEvent); - } catch (final Exception ignored) { - // no exceptions will be thrown + if (!((PipeRawTabletInsertionEvent) parsedEvent) + .increaseReferenceCount(this.getClass().getName())) { + LOGGER.warn( + "SubscriptionPipeTsFileEventBatch: Failed to increase the reference count of event {}, skipping it.", + ((PipeRawTabletInsertionEvent) parsedEvent).coreReportMessage()); + } else { + try { + batch.onEvent(parsedEvent); + } catch (final Exception ignored) { + // no exceptions will be thrown + } + } + break; + } catch (final PipeRuntimeOutOfMemoryCriticalException e) { + if (retryCount++ % 100 == 0) { + LOGGER.warn( + "SubscriptionPipeTsFileEventBatch: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", + ((PipeTsFileInsertionEvent) event).getTsFile(), + retryCount, + e); + } else if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "SubscriptionPipeTsFileEventBatch: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", + ((PipeTsFileInsertionEvent) event).getTsFile(), + retryCount, + e); + } } } } From 897bae7f100a037fe7667c5a00acac272fde19b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A9=AC=E5=AD=90=E5=9D=A4?= <55695098+DanielWang2035@users.noreply.github.com> Date: Wed, 4 Jun 2025 16:49:21 +0800 Subject: [PATCH 2/4] refactor --- .../db/pipe/agent/task/connection/PipeEventCollector.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java index 979c3fef66ad9..01c738f3f15a6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java @@ -146,8 +146,10 @@ private void parseAndCollectEvent(final PipeTsFileInsertionEvent sourceEvent) th try { final Iterable iterable = sourceEvent.toTabletInsertionEvents(); final Iterator iterator = iterable.iterator(); + int tabletEventCount = 0; while (iterator.hasNext()) { final TabletInsertionEvent parsedEvent = iterator.next(); + tabletEventCount++; int retryCount = 0; while (true) { // If failed due do insufficient memory, retry until success to avoid race among multiple @@ -158,14 +160,16 @@ private void parseAndCollectEvent(final PipeTsFileInsertionEvent sourceEvent) th } catch (final PipeRuntimeOutOfMemoryCriticalException e) { if (retryCount++ % 100 == 0) { LOGGER.warn( - "parseAndCollectEvent: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", + "parseAndCollectEvent: failed to allocate memory for parsing TsFile {}, tablet event no. {}, retry count is {}, will keep retrying.", sourceEvent.getTsFile(), + tabletEventCount, retryCount, e); } else if (LOGGER.isDebugEnabled()) { LOGGER.debug( - "parseAndCollectEvent: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", + "parseAndCollectEvent: failed to allocate memory for parsing TsFile {}, tablet event no. {}, retry count is {}, will keep retrying.", sourceEvent.getTsFile(), + tabletEventCount, retryCount, e); } From 3c8ba193fca82bf6509b8ea427189a3b823621ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A9=AC=E5=AD=90=E5=9D=A4?= <55695098+DanielWang2035@users.noreply.github.com> Date: Thu, 5 Jun 2025 16:13:48 +0800 Subject: [PATCH 3/4] refactor --- .../task/connection/PipeEventCollector.java | 38 +-------- .../processor/PipeProcessorSubtask.java | 39 +++------ .../websocket/WebSocketConnector.java | 41 ++------- .../tsfile/PipeTsFileInsertionEvent.java | 85 ++++++++++++------- .../aggregate/AggregateProcessor.java | 48 +++++------ .../downsampling/DownSamplingProcessor.java | 48 +++++------ .../SubscriptionPipeTsFileEventBatch.java | 58 ++++--------- 7 files changed, 128 insertions(+), 229 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java index 01c738f3f15a6..10559669a8449 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.pipe.agent.task.connection; -import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; @@ -36,13 +35,11 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.pipe.api.collector.EventCollector; import org.apache.iotdb.pipe.api.event.Event; -import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Iterator; import java.util.concurrent.atomic.AtomicInteger; public class PipeEventCollector implements EventCollector { @@ -144,38 +141,9 @@ private void parseAndCollectEvent(final PipeTsFileInsertionEvent sourceEvent) th } try { - final Iterable iterable = sourceEvent.toTabletInsertionEvents(); - final Iterator iterator = iterable.iterator(); - int tabletEventCount = 0; - while (iterator.hasNext()) { - final TabletInsertionEvent parsedEvent = iterator.next(); - tabletEventCount++; - int retryCount = 0; - while (true) { - // If failed due do insufficient memory, retry until success to avoid race among multiple - // processor threads - try { - collectParsedRawTableEvent((PipeRawTabletInsertionEvent) parsedEvent); - break; - } catch (final PipeRuntimeOutOfMemoryCriticalException e) { - if (retryCount++ % 100 == 0) { - LOGGER.warn( - "parseAndCollectEvent: failed to allocate memory for parsing TsFile {}, tablet event no. {}, retry count is {}, will keep retrying.", - sourceEvent.getTsFile(), - tabletEventCount, - retryCount, - e); - } else if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "parseAndCollectEvent: failed to allocate memory for parsing TsFile {}, tablet event no. {}, retry count is {}, will keep retrying.", - sourceEvent.getTsFile(), - tabletEventCount, - retryCount, - e); - } - } - } - } + sourceEvent.consumeTabletInsertionEventsWithRetry( + this::collectParsedRawTableEvent, + this.getClass().getSimpleName() + "::parseAndCollectEvent"); } finally { sourceEvent.close(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java index 327c77d46f481..e21bb44fbd6b9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java @@ -48,7 +48,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Iterator; import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; @@ -150,34 +149,18 @@ protected boolean executeOnce() throws Exception { && ((PipeTsFileInsertionEvent) event).shouldParse4Privilege()) { try (final PipeTsFileInsertionEvent tsFileInsertionEvent = (PipeTsFileInsertionEvent) event) { - final Iterable iterable = - tsFileInsertionEvent.toTabletInsertionEvents(); - final Iterator iterator = iterable.iterator(); - while (iterator.hasNext()) { - final TabletInsertionEvent parsedEvent = iterator.next(); - int retryCount = 0; - while (true) { - // If failed due do insufficient memory, retry until success to avoid race among - // multiple processor threads - try { - pipeProcessor.process(parsedEvent, outputEventCollector); - break; - } catch (final PipeRuntimeOutOfMemoryCriticalException e) { - if (retryCount++ % 100 == 0) { - LOGGER.warn( - "PipeProcessorSubtask: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", - tsFileInsertionEvent.getTsFile(), - retryCount, - e); - } else if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "PipeProcessorSubtask: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", - tsFileInsertionEvent.getTsFile(), - retryCount, - e); + final AtomicReference ex = new AtomicReference<>(); + tsFileInsertionEvent.consumeTabletInsertionEventsWithRetry( + event1 -> { + try { + pipeProcessor.process(event1, outputEventCollector); + } catch (Exception e) { + ex.set(e); } - } - } + }, + this.getClass().getSimpleName() + "::executeOnce"); + if (ex.get() != null) { + throw ex.get(); } } } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java index 461623b62bf10..f660484ed6a5f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.pipe.connector.protocol.websocket; -import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; @@ -39,7 +38,6 @@ import org.slf4j.LoggerFactory; import java.util.Arrays; -import java.util.Iterator; import java.util.Optional; @TreeModel @@ -143,37 +141,14 @@ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception } try { - final Iterable iterable = - tsFileInsertionEvent.toTabletInsertionEvents(); - final Iterator iterator = iterable.iterator(); - while (iterator.hasNext()) { - final TabletInsertionEvent parsedEvent = iterator.next(); - int retryCount = 0; - while (true) { - // If failed due do insufficient memory, retry until success to avoid race among multiple - // processor threads - try { - // Skip report if any tablet events is added - ((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReportOnCommit(); - transfer(parsedEvent); - break; - } catch (final PipeRuntimeOutOfMemoryCriticalException e) { - if (retryCount++ % 100 == 0) { - LOGGER.warn( - "WebSocketConnector: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", - ((PipeTsFileInsertionEvent) tsFileInsertionEvent).getTsFile(), - retryCount, - e); - } else if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "WebSocketConnector: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", - ((PipeTsFileInsertionEvent) tsFileInsertionEvent).getTsFile(), - retryCount, - e); - } - } - } - } + ((PipeTsFileInsertionEvent) tsFileInsertionEvent) + .consumeTabletInsertionEventsWithRetry( + event -> { + // Skip report if any tablet events is added + ((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReportOnCommit(); + transfer(event); + }, + this.getClass().getSimpleName() + "::transfer"); } finally { tsFileInsertionEvent.close(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 8f5c625782a85..e1de816541d39 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -62,6 +62,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.apache.tsfile.common.constant.TsFileConstant.PATH_ROOT; @@ -564,6 +565,49 @@ public boolean isTableModelEvent() { /////////////////////////// TsFileInsertionEvent /////////////////////////// + @FunctionalInterface + public interface TabletInsertionEventConsumer { + void consume(final PipeRawTabletInsertionEvent event); + } + + public void consumeTabletInsertionEventsWithRetry( + final TabletInsertionEventConsumer consumer, final String callerName) throws PipeException { + final Iterable iterable = toTabletInsertionEvents(); + final Iterator iterator = iterable.iterator(); + int tabletEventCount = 0; + while (iterator.hasNext()) { + final TabletInsertionEvent parsedEvent = iterator.next(); + tabletEventCount++; + int retryCount = 0; + while (true) { + // If failed due do insufficient memory, retry until success to avoid race among multiple + // processor threads + try { + consumer.consume((PipeRawTabletInsertionEvent) parsedEvent); + break; + } catch (final PipeRuntimeOutOfMemoryCriticalException e) { + if (retryCount++ % 100 == 0) { + LOGGER.warn( + "{}: failed to allocate memory for parsing TsFile {}, tablet event no. {}, retry count is {}, will keep retrying.", + callerName, + getTsFile(), + tabletEventCount, + retryCount, + e); + } else if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "{}: failed to allocate memory for parsing TsFile {}, tablet event no. {}, retry count is {}, will keep retrying.", + callerName, + getTsFile(), + tabletEventCount, + retryCount, + e); + } + } + } + } + } + @Override public Iterable toTabletInsertionEvents() throws PipeException { // 20 - 40 seconds for waiting @@ -687,44 +731,19 @@ private TsFileInsertionEventParser initEventParser() { } public long count(final boolean skipReportOnCommit) throws IOException { - long count = 0; + AtomicLong count = new AtomicLong(); if (shouldParseTime()) { try { - final Iterable iterable = toTabletInsertionEvents(); - final Iterator iterator = iterable.iterator(); - while (iterator.hasNext()) { - final TabletInsertionEvent parsedEvent = iterator.next(); - int retryCount = 0; - while (true) { - // If failed due do insufficient memory, retry until success to avoid race among - // multiple processor threads - try { - final PipeRawTabletInsertionEvent rawEvent = - ((PipeRawTabletInsertionEvent) parsedEvent); - count += rawEvent.count(); + consumeTabletInsertionEventsWithRetry( + event -> { + count.addAndGet(event.count()); if (skipReportOnCommit) { - rawEvent.skipReportOnCommit(); - } - break; - } catch (final PipeRuntimeOutOfMemoryCriticalException e) { - if (retryCount++ % 100 == 0) { - LOGGER.warn( - "PipeTsFileInsertionEvent::count: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", - getTsFile(), - retryCount, - e); - } else if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "PipeTsFileInsertionEvent::count: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", - getTsFile(), - retryCount, - e); + event.skipReportOnCommit(); } - } - } - } - return count; + }, + this.getClass().getSimpleName() + "::count"); + return count.get(); } finally { close(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java index 281c60bc0a87c..1ce6c77b7ffb9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java @@ -25,7 +25,6 @@ import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex; import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; @@ -78,7 +77,6 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -529,33 +527,25 @@ public void process( final TsFileInsertionEvent tsFileInsertionEvent, final EventCollector eventCollector) throws Exception { try { - final Iterable iterable = - tsFileInsertionEvent.toTabletInsertionEvents(); - final Iterator iterator = iterable.iterator(); - while (iterator.hasNext()) { - final TabletInsertionEvent parsedEvent = iterator.next(); - int retryCount = 0; - while (true) { - // If failed due do insufficient memory, retry until success to avoid race among multiple - // processor threads - try { - process(parsedEvent, eventCollector); - break; - } catch (final PipeRuntimeOutOfMemoryCriticalException e) { - if (retryCount++ % 100 == 0) { - LOGGER.warn( - "AggregateProcessor: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", - ((PipeTsFileInsertionEvent) tsFileInsertionEvent).getTsFile(), - retryCount, - e); - } else if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "AggregateProcessor: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", - ((PipeTsFileInsertionEvent) tsFileInsertionEvent).getTsFile(), - retryCount, - e); - } - } + if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) { + final AtomicReference ex = new AtomicReference<>(); + ((PipeTsFileInsertionEvent) tsFileInsertionEvent) + .consumeTabletInsertionEventsWithRetry( + event -> { + try { + process(event, eventCollector); + } catch (Exception e) { + ex.set(e); + } + }, + this.getClass().getSimpleName() + "::process"); + if (ex.get() != null) { + throw ex.get(); + } + } else { + for (final TabletInsertionEvent tabletInsertionEvent : + tsFileInsertionEvent.toTabletInsertionEvents()) { + process(tabletInsertionEvent, eventCollector); } } } finally { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java index 0b4d1d6130743..8422e011a53c4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.pipe.processor.downsampling; import org.apache.iotdb.commons.consensus.DataRegionId; -import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; @@ -41,7 +40,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Iterator; import java.util.concurrent.atomic.AtomicReference; import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE; @@ -155,33 +153,25 @@ public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector ev throws Exception { if (shouldSplitFile) { try { - final Iterable iterable = - tsFileInsertionEvent.toTabletInsertionEvents(); - final Iterator iterator = iterable.iterator(); - while (iterator.hasNext()) { - final TabletInsertionEvent parsedEvent = iterator.next(); - int retryCount = 0; - while (true) { - // If failed due do insufficient memory, retry until success to avoid race among - // multiple processor threads - try { - process(parsedEvent, eventCollector); - break; - } catch (final PipeRuntimeOutOfMemoryCriticalException e) { - if (retryCount++ % 100 == 0) { - LOGGER.warn( - "DownSamplingProcessor: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", - ((PipeTsFileInsertionEvent) tsFileInsertionEvent).getTsFile(), - retryCount, - e); - } else if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "DownSamplingProcessor: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", - ((PipeTsFileInsertionEvent) tsFileInsertionEvent).getTsFile(), - retryCount, - e); - } - } + if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) { + final AtomicReference ex = new AtomicReference<>(); + ((PipeTsFileInsertionEvent) tsFileInsertionEvent) + .consumeTabletInsertionEventsWithRetry( + event -> { + try { + process(event, eventCollector); + } catch (Exception e) { + ex.set(e); + } + }, + this.getClass().getSimpleName() + "::process"); + if (ex.get() != null) { + throw ex.get(); + } + } else { + for (final TabletInsertionEvent tabletInsertionEvent : + tsFileInsertionEvent.toTabletInsertionEvents()) { + process(tabletInsertionEvent, eventCollector); } } } finally { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java index 6609c496b6fa7..404ac09af3bd5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java @@ -19,10 +19,8 @@ package org.apache.iotdb.db.subscription.event.batch; -import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch; -import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; @@ -37,7 +35,6 @@ import java.io.File; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -98,45 +95,22 @@ protected void onTabletInsertionEvent(final TabletInsertionEvent event) { protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) { // TODO: parse tsfile event on the fly like SubscriptionPipeTabletEventBatch try { - final Iterable iterable = event.toTabletInsertionEvents(); - final Iterator iterator = iterable.iterator(); - while (iterator.hasNext()) { - final TabletInsertionEvent parsedEvent = iterator.next(); - int retryCount = 0; - while (true) { - // If failed due do insufficient memory, retry until success to avoid race among multiple - // processor threads - try { - if (!((PipeRawTabletInsertionEvent) parsedEvent) - .increaseReferenceCount(this.getClass().getName())) { - LOGGER.warn( - "SubscriptionPipeTsFileEventBatch: Failed to increase the reference count of event {}, skipping it.", - ((PipeRawTabletInsertionEvent) parsedEvent).coreReportMessage()); - } else { - try { - batch.onEvent(parsedEvent); - } catch (final Exception ignored) { - // no exceptions will be thrown - } - } - break; - } catch (final PipeRuntimeOutOfMemoryCriticalException e) { - if (retryCount++ % 100 == 0) { - LOGGER.warn( - "SubscriptionPipeTsFileEventBatch: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", - ((PipeTsFileInsertionEvent) event).getTsFile(), - retryCount, - e); - } else if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "SubscriptionPipeTsFileEventBatch: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.", - ((PipeTsFileInsertionEvent) event).getTsFile(), - retryCount, - e); - } - } - } - } + ((PipeTsFileInsertionEvent) event) + .consumeTabletInsertionEventsWithRetry( + event1 -> { + if (!event1.increaseReferenceCount(this.getClass().getName())) { + LOGGER.warn( + "SubscriptionPipeTsFileEventBatch: Failed to increase the reference count of event {}, skipping it.", + event1.coreReportMessage()); + } else { + try { + batch.onEvent(event1); + } catch (final Exception ignored) { + // no exceptions will be thrown + } + } + }, + this.getClass().getSimpleName() + "::onTsFileInsertionEvent"); } finally { try { event.close(); From 69f11567be49acdc05b88d183e41b1c24e35b7f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A9=AC=E5=AD=90=E5=9D=A4?= <55695098+DanielWang2035@users.noreply.github.com> Date: Thu, 5 Jun 2025 18:07:08 +0800 Subject: [PATCH 4/4] refactor --- .../db/pipe/agent/task/connection/PipeEventCollector.java | 3 +-- .../agent/task/subtask/processor/PipeProcessorSubtask.java | 2 +- .../connector/protocol/websocket/WebSocketConnector.java | 2 +- .../pipe/event/common/tsfile/PipeTsFileInsertionEvent.java | 2 +- .../db/pipe/processor/aggregate/AggregateProcessor.java | 5 +---- .../pipe/processor/downsampling/DownSamplingProcessor.java | 6 +----- .../event/batch/SubscriptionPipeTsFileEventBatch.java | 2 +- 7 files changed, 7 insertions(+), 15 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java index 10559669a8449..e64248a57979a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java @@ -142,8 +142,7 @@ private void parseAndCollectEvent(final PipeTsFileInsertionEvent sourceEvent) th try { sourceEvent.consumeTabletInsertionEventsWithRetry( - this::collectParsedRawTableEvent, - this.getClass().getSimpleName() + "::parseAndCollectEvent"); + this::collectParsedRawTableEvent, "PipeEventCollector::parseAndCollectEvent"); } finally { sourceEvent.close(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java index e21bb44fbd6b9..1f7262c0c1647 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java @@ -158,7 +158,7 @@ protected boolean executeOnce() throws Exception { ex.set(e); } }, - this.getClass().getSimpleName() + "::executeOnce"); + "PipeProcessorSubtask::executeOnce"); if (ex.get() != null) { throw ex.get(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java index f660484ed6a5f..57c51af161e18 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java @@ -148,7 +148,7 @@ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception ((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReportOnCommit(); transfer(event); }, - this.getClass().getSimpleName() + "::transfer"); + "WebSocketConnector::transfer"); } finally { tsFileInsertionEvent.close(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index e1de816541d39..55f40750662ad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -742,7 +742,7 @@ public long count(final boolean skipReportOnCommit) throws IOException { event.skipReportOnCommit(); } }, - this.getClass().getSimpleName() + "::count"); + "PipeTsFileInsertionEvent::count"); return count.get(); } finally { close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java index 1ce6c77b7ffb9..1119deaf71287 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java @@ -65,8 +65,6 @@ import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.schema.MeasurementSchema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; @@ -110,7 +108,6 @@ */ @TreeModel public class AggregateProcessor implements PipeProcessor { - private static final Logger LOGGER = LoggerFactory.getLogger(AggregateProcessor.class); private static final String WINDOWING_PROCESSOR_SUFFIX = "-windowing-processor"; private String pipeName; @@ -538,7 +535,7 @@ public void process( ex.set(e); } }, - this.getClass().getSimpleName() + "::process"); + "AggregateProcessor::process"); if (ex.get() != null) { throw ex.get(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java index 8422e011a53c4..a8e0c270570bb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java @@ -37,8 +37,6 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.tsfile.common.constant.TsFileConstant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.concurrent.atomic.AtomicReference; @@ -48,8 +46,6 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY; public abstract class DownSamplingProcessor implements PipeProcessor { - private static final Logger LOGGER = LoggerFactory.getLogger(DownSamplingProcessor.class); - protected long memoryLimitInBytes; protected boolean shouldSplitFile; @@ -164,7 +160,7 @@ public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector ev ex.set(e); } }, - this.getClass().getSimpleName() + "::process"); + "DownSamplingProcessor::process"); if (ex.get() != null) { throw ex.get(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java index 404ac09af3bd5..d8c68d8ec2bc6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java @@ -110,7 +110,7 @@ protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) { } } }, - this.getClass().getSimpleName() + "::onTsFileInsertionEvent"); + "SubscriptionPipeTsFileEventBatch::onTsFileInsertionEvent"); } finally { try { event.close();