Skip to content

Commit d4605c4

Browse files
Pipe: Add retry for tablet batch requests to avoid retransmission when memory is insufficient (#15715) (#15771)
1 parent 5ec14a0 commit d4605c4

1 file changed

Lines changed: 43 additions & 27 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -399,10 +399,10 @@ private TPipeTransferResp handleTransferTabletBatch(final PipeTransferTabletBatc
399399
Stream.of(
400400
statementPair.getLeft().isEmpty()
401401
? RpcUtils.SUCCESS_STATUS
402-
: executeStatementAndAddRedirectInfo(statementPair.getLeft()),
402+
: executeBatchStatementAndAddRedirectInfo(statementPair.getLeft()),
403403
statementPair.getRight().isEmpty()
404404
? RpcUtils.SUCCESS_STATUS
405-
: executeStatementAndAddRedirectInfo(statementPair.getRight()))
405+
: executeBatchStatementAndAddRedirectInfo(statementPair.getRight()))
406406
.collect(Collectors.toList())));
407407
}
408408

@@ -594,8 +594,8 @@ private TPipeTransferResp handleTransferSlice(final PipeTransferSliceReq pipeTra
594594
* request. So for each sub-status which needs to redirect, we record the device path using the
595595
* message field.
596596
*/
597-
private TSStatus executeStatementAndAddRedirectInfo(final InsertBaseStatement statement) {
598-
final TSStatus result = executeStatementAndClassifyExceptions(statement);
597+
private TSStatus executeBatchStatementAndAddRedirectInfo(final InsertBaseStatement statement) {
598+
final TSStatus result = executeStatementAndClassifyExceptions(statement, 5);
599599

600600
if (result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
601601
&& result.getSubStatusSize() > 0) {
@@ -631,18 +631,50 @@ private TSStatus executeStatementAndAddRedirectInfo(final InsertBaseStatement st
631631
}
632632

633633
private TSStatus executeStatementAndClassifyExceptions(final Statement statement) {
634+
return executeStatementAndClassifyExceptions(statement, 1);
635+
}
636+
637+
private TSStatus executeStatementAndClassifyExceptions(
638+
final Statement statement, final int tryCount) {
634639
long estimatedMemory = 0L;
635640
try {
636641
if (statement instanceof InsertBaseStatement) {
637642
estimatedMemory = ((InsertBaseStatement) statement).ramBytesUsed();
638-
allocatedMemoryBlock =
639-
PipeDataNodeResourceManager.memory()
640-
.forceAllocate(
641-
(long)
642-
(estimatedMemory
643-
* PipeConfig.getInstance()
644-
.getPipeReceiverActualToEstimatedMemoryRatio()));
643+
for (int i = 0; i < tryCount; ++i) {
644+
try {
645+
allocatedMemoryBlock =
646+
PipeDataNodeResourceManager.memory()
647+
.forceAllocate(
648+
(long)
649+
(estimatedMemory
650+
* PipeConfig.getInstance()
651+
.getPipeReceiverActualToEstimatedMemoryRatio()));
652+
break;
653+
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
654+
if (i == tryCount - 1) {
655+
final String message =
656+
String.format(
657+
"Temporarily out of memory when executing statement %s, Requested memory: %s, "
658+
+ "used memory: %s, free memory: %s, total non-floating memory: %s",
659+
statement,
660+
estimatedMemory
661+
* PipeConfig.getInstance().getPipeReceiverActualToEstimatedMemoryRatio(),
662+
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes(),
663+
PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(),
664+
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes());
665+
if (LOGGER.isDebugEnabled()) {
666+
LOGGER.debug("Receiver id = {}: {}", receiverId.get(), message, e);
667+
}
668+
return new TSStatus(
669+
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
670+
.setMessage(message);
671+
} else {
672+
Thread.sleep(100L * (i + 1));
673+
}
674+
}
675+
}
645676
}
677+
646678
final TSStatus result = executeStatementWithRetryOnDataTypeMismatch(statement);
647679
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
648680
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
@@ -655,22 +687,6 @@ private TSStatus executeStatementAndClassifyExceptions(final Statement statement
655687
result);
656688
return statement.accept(STATEMENT_STATUS_VISITOR, result);
657689
}
658-
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
659-
final String message =
660-
String.format(
661-
"Temporarily out of memory when executing statement %s, Requested memory: %s, "
662-
+ "used memory: %s, free memory: %s, total non-floating memory: %s",
663-
statement,
664-
estimatedMemory,
665-
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes(),
666-
PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(),
667-
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes());
668-
if (LOGGER.isDebugEnabled()) {
669-
LOGGER.debug("Receiver id = {}: {}", receiverId.get(), message, e);
670-
}
671-
return new TSStatus(
672-
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
673-
.setMessage(message);
674690
} catch (final Exception e) {
675691
LOGGER.warn(
676692
"Receiver id = {}: Exception encountered while executing statement {}: ",

0 commit comments

Comments
 (0)