diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java index e223686c1cd15..98b7dee910584 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java @@ -21,6 +21,9 @@ import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.consensus.index.ProgressIndex; +import org.apache.iotdb.commons.consensus.index.ProgressIndexType; +import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex; +import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex; import org.apache.iotdb.commons.exception.IllegalPathException; @@ -34,6 +37,7 @@ import org.apache.iotdb.commons.pipe.datastructure.resource.PersistentResource; import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.consensus.pipe.PipeConsensus; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager; import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource; @@ -304,7 +308,12 @@ public void customize( pipeName = environment.getPipeName(); creationTime = environment.getCreationTime(); pipeTaskMeta = environment.getPipeTaskMeta(); - startIndex = environment.getPipeTaskMeta().getProgressIndex(); + if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) { + startIndex = + tryToExtractLocalProgressIndexForIoTV2(environment.getPipeTaskMeta().getProgressIndex()); + } else { + startIndex = environment.getPipeTaskMeta().getProgressIndex(); + } dataRegionId = environment.getRegionId(); synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) { @@ -426,6 +435,61 @@ private void flushDataRegionAllTsFiles() { } } + /** + * IoTV2 will only resend event that contains un-replicated local write data. So we only extract + * ProgressIndex containing local writes for comparison to prevent misjudgment on whether + * high-level tsFiles with mixed progressIndexes need to be retransmitted + * + * @return recoverProgressIndex dedicated in local DataNodeId or origin for fallback. + */ + private ProgressIndex tryToExtractLocalProgressIndexForIoTV2(ProgressIndex origin) { + // There are only 2 cases: + // 1. origin is RecoverProgressIndex + if (origin instanceof RecoverProgressIndex) { + RecoverProgressIndex toBeTransformed = (RecoverProgressIndex) origin; + return extractRecoverProgressIndex(toBeTransformed); + } + // 2. origin is HybridProgressIndex + else if (origin instanceof HybridProgressIndex) { + HybridProgressIndex toBeTransformed = (HybridProgressIndex) origin; + // if hybridProgressIndex contains recoverProgressIndex, which is what we expected. + if (toBeTransformed + .getType2Index() + .containsKey(ProgressIndexType.RECOVER_PROGRESS_INDEX.getType())) { + // 2.1. transform recoverProgressIndex + RecoverProgressIndex specificToBeTransformed = + (RecoverProgressIndex) + toBeTransformed + .getType2Index() + .get(ProgressIndexType.RECOVER_PROGRESS_INDEX.getType()); + return extractRecoverProgressIndex(specificToBeTransformed); + } + // if hybridProgressIndex doesn't contain recoverProgressIndex, which is not what we expected, + // fallback. + return origin; + } else { + // fallback + LOGGER.warn( + "Pipe {}@{}: unexpected ProgressIndex type {}, fallback to origin {}.", + pipeName, + dataRegionId, + origin.getType(), + origin); + return origin; + } + } + + private ProgressIndex extractRecoverProgressIndex(RecoverProgressIndex toBeTransformed) { + return new RecoverProgressIndex( + toBeTransformed.getDataNodeId2LocalIndex().entrySet().stream() + .filter( + entry -> + entry + .getKey() + .equals(IoTDBDescriptor.getInstance().getConfig().getDataNodeId())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); + } + @Override public synchronized void start() { if (!shouldExtractInsertion) { @@ -617,8 +681,19 @@ private boolean mayTsFileContainUnprocessedData(final TsFileResource resource) { if (startIndex instanceof StateProgressIndex) { startIndex = ((StateProgressIndex) startIndex).getInnerProgressIndex(); } - return !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()) - && !startIndex.equals(resource.getMaxProgressIndexAfterClose()); + + if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) { + // For consensus pipe, we only focus on the progressIndex that is generated from local write + // instead of replication or something else. + ProgressIndex dedicatedProgressIndex = + tryToExtractLocalProgressIndexForIoTV2(resource.getMaxProgressIndexAfterClose()); + return greaterThanStartIndex(dedicatedProgressIndex); + } + return greaterThanStartIndex(resource.getMaxProgressIndexAfterClose()); + } + + private boolean greaterThanStartIndex(ProgressIndex progressIndex) { + return !startIndex.isAfter(progressIndex) && !startIndex.equals(progressIndex); } private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource resource) { @@ -712,12 +787,26 @@ private void extractDeletions( // For deletions that are filtered and will not be sent, we should manually decrease its // reference count. Because the initial value of referenceCount is `ReplicaNum - 1` allDeletionResources.stream() - .filter(resource -> startIndex.isAfter(resource.getProgressIndex())) + .filter( + resource -> { + ProgressIndex toBeCompared = resource.getProgressIndex(); + if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) { + toBeCompared = tryToExtractLocalProgressIndexForIoTV2(toBeCompared); + } + return !greaterThanStartIndex(toBeCompared); + }) .forEach(DeletionResource::decreaseReference); // Get deletions that should be sent. allDeletionResources = allDeletionResources.stream() - .filter(resource -> !startIndex.isAfter(resource.getProgressIndex())) + .filter( + resource -> { + ProgressIndex toBeCompared = resource.getProgressIndex(); + if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) { + toBeCompared = tryToExtractLocalProgressIndexForIoTV2(toBeCompared); + } + return greaterThanStartIndex(toBeCompared); + }) .collect(Collectors.toList()); resourceList.addAll(allDeletionResources); LOGGER.info(