diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java index fd863b2ab8a4e..66bc5ab2a5088 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java @@ -34,7 +34,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.pipe.api.collector.EventCollector; import org.apache.iotdb.pipe.api.event.Event; -import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; import org.slf4j.Logger; @@ -141,9 +140,8 @@ private void parseAndCollectEvent(final PipeTsFileInsertionEvent sourceEvent) th } try { - for (final TabletInsertionEvent parsedEvent : sourceEvent.toTabletInsertionEvents()) { - collectParsedRawTableEvent((PipeRawTabletInsertionEvent) parsedEvent); - } + sourceEvent.consumeTabletInsertionEventsWithRetry( + this::collectParsedRawTableEvent, "PipeEventCollector::parseAndCollectEvent"); } finally { sourceEvent.close(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java index cf25845234603..92436366511e9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java @@ -139,11 +139,14 @@ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception } try { - for (TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) { - // Skip report if any tablet events is added - ((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReportOnCommit(); - transfer(event); - } + ((PipeTsFileInsertionEvent) tsFileInsertionEvent) + .consumeTabletInsertionEventsWithRetry( + event -> { + // Skip report if any tablet events is added + ((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReportOnCommit(); + transfer(event); + }, + "WebSocketConnector::transfer"); } finally { tsFileInsertionEvent.close(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index b648af30e6fc6..a28433926364a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; @@ -50,11 +51,13 @@ import java.io.File; import java.io.IOException; import java.util.Collections; +import java.util.Iterator; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; public class PipeTsFileInsertionEvent extends EnrichedEvent @@ -413,6 +416,49 @@ public boolean mayEventPathsOverlappedWithPattern() { /////////////////////////// TsFileInsertionEvent /////////////////////////// + @FunctionalInterface + public interface TabletInsertionEventConsumer { + void consume(final PipeRawTabletInsertionEvent event); + } + + public void consumeTabletInsertionEventsWithRetry( + final TabletInsertionEventConsumer consumer, final String callerName) throws PipeException { + final Iterable iterable = toTabletInsertionEvents(); + final Iterator iterator = iterable.iterator(); + int tabletEventCount = 0; + while (iterator.hasNext()) { + final TabletInsertionEvent parsedEvent = iterator.next(); + tabletEventCount++; + int retryCount = 0; + while (true) { + // If failed due do insufficient memory, retry until success to avoid race among multiple + // processor threads + try { + consumer.consume((PipeRawTabletInsertionEvent) parsedEvent); + break; + } catch (final PipeRuntimeOutOfMemoryCriticalException e) { + if (retryCount++ % 100 == 0) { + LOGGER.warn( + "{}: failed to allocate memory for parsing TsFile {}, tablet event no. {}, retry count is {}, will keep retrying.", + callerName, + getTsFile(), + tabletEventCount, + retryCount, + e); + } else if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "{}: failed to allocate memory for parsing TsFile {}, tablet event no. {}, retry count is {}, will keep retrying.", + callerName, + getTsFile(), + tabletEventCount, + retryCount, + e); + } + } + } + } + } + @Override public Iterable toTabletInsertionEvents() throws PipeException { // 20 - 40 seconds for waiting @@ -528,18 +574,19 @@ private TsFileInsertionDataContainer initDataContainer() { } public long count(final boolean skipReportOnCommit) throws IOException { - long count = 0; + AtomicLong count = new AtomicLong(); if (shouldParseTime()) { try { - for (final TabletInsertionEvent event : toTabletInsertionEvents()) { - final PipeRawTabletInsertionEvent rawEvent = ((PipeRawTabletInsertionEvent) event); - count += rawEvent.count(); - if (skipReportOnCommit) { - rawEvent.skipReportOnCommit(); - } - } - return count; + consumeTabletInsertionEventsWithRetry( + event -> { + count.addAndGet(event.count()); + if (skipReportOnCommit) { + event.skipReportOnCommit(); + } + }, + "PipeTsFileInsertionEvent::count"); + return count.get(); } finally { close(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java index 157c6e2103dad..dc5a7e4390f76 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java @@ -512,9 +512,26 @@ public void process( final TsFileInsertionEvent tsFileInsertionEvent, final EventCollector eventCollector) throws Exception { try { - for (final TabletInsertionEvent tabletInsertionEvent : - tsFileInsertionEvent.toTabletInsertionEvents()) { - process(tabletInsertionEvent, eventCollector); + if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) { + final AtomicReference ex = new AtomicReference<>(); + ((PipeTsFileInsertionEvent) tsFileInsertionEvent) + .consumeTabletInsertionEventsWithRetry( + event -> { + try { + process(event, eventCollector); + } catch (Exception e) { + ex.set(e); + } + }, + "AggregateProcessor::process"); + if (ex.get() != null) { + throw ex.get(); + } + } else { + for (final TabletInsertionEvent tabletInsertionEvent : + tsFileInsertionEvent.toTabletInsertionEvents()) { + process(tabletInsertionEvent, eventCollector); + } } } finally { tsFileInsertionEvent.close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java index fd631772b930a..a8e0c270570bb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.pipe.api.PipeProcessor; import org.apache.iotdb.pipe.api.access.Row; @@ -45,7 +46,6 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY; public abstract class DownSamplingProcessor implements PipeProcessor { - protected long memoryLimitInBytes; protected boolean shouldSplitFile; @@ -149,9 +149,26 @@ public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector ev throws Exception { if (shouldSplitFile) { try { - for (final TabletInsertionEvent tabletInsertionEvent : - tsFileInsertionEvent.toTabletInsertionEvents()) { - process(tabletInsertionEvent, eventCollector); + if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) { + final AtomicReference ex = new AtomicReference<>(); + ((PipeTsFileInsertionEvent) tsFileInsertionEvent) + .consumeTabletInsertionEventsWithRetry( + event -> { + try { + process(event, eventCollector); + } catch (Exception e) { + ex.set(e); + } + }, + "DownSamplingProcessor::process"); + if (ex.get() != null) { + throw ex.get(); + } + } else { + for (final TabletInsertionEvent tabletInsertionEvent : + tsFileInsertionEvent.toTabletInsertionEvents()) { + process(tabletInsertionEvent, eventCollector); + } } } finally { tsFileInsertionEvent.close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java index 7b1a6caf61069..15c93d7a5e609 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java @@ -21,7 +21,6 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch; -import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; @@ -95,20 +94,22 @@ protected void onTabletInsertionEvent(final TabletInsertionEvent event) { protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) { // TODO: parse tsfile event on the fly like SubscriptionPipeTabletEventBatch try { - for (final TabletInsertionEvent parsedEvent : event.toTabletInsertionEvents()) { - if (!((PipeRawTabletInsertionEvent) parsedEvent) - .increaseReferenceCount(this.getClass().getName())) { - LOGGER.warn( - "SubscriptionPipeTsFileEventBatch: Failed to increase the reference count of event {}, skipping it.", - ((PipeRawTabletInsertionEvent) parsedEvent).coreReportMessage()); - } else { - try { - batch.onEvent(parsedEvent); - } catch (final Exception ignored) { - // no exceptions will be thrown - } - } - } + ((PipeTsFileInsertionEvent) event) + .consumeTabletInsertionEventsWithRetry( + event1 -> { + if (!event1.increaseReferenceCount(this.getClass().getName())) { + LOGGER.warn( + "SubscriptionPipeTsFileEventBatch: Failed to increase the reference count of event {}, skipping it.", + event1.coreReportMessage()); + } else { + try { + batch.onEvent(event1); + } catch (final Exception ignored) { + // no exceptions will be thrown + } + } + }, + "SubscriptionPipeTsFileEventBatch::onTsFileInsertionEvent"); } finally { try { event.close();