Skip to content

Commit 5c7cb1e

Browse files
Pipe: Fixed multiple bugs (#15674) (#15683)
1. Doubled the number of TCP connections. 2. Made the memory allocated for the WAL cache and the batch fixed-size. 3. Changed the default batch time to 20 ms. 4. Disabled batch loading for the WAL cache. 5. Separated the tsFile client from the insertNode client. --------- Co-authored-by: luoluoyuyu <zhenyu@apache.org>
1 parent c58680b commit 5c7cb1e

14 files changed

Lines changed: 109 additions & 148 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,8 @@ public class IoTDBConfig {
151151

152152
private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 / 10;
153153

154+
private long allocateMemoryPerWalCache = 2 * 1024 * 1024;
155+
154156
/** Flush proportion for system */
155157
private double flushProportion = 0.4;
156158

@@ -2002,6 +2004,14 @@ public void setWriteMemoryVariationReportProportion(double writeMemoryVariationR
20022004
this.writeMemoryVariationReportProportion = writeMemoryVariationReportProportion;
20032005
}
20042006

2007+
public long getAllocateMemoryPerWalCache() {
2008+
return allocateMemoryPerWalCache;
2009+
}
2010+
2011+
public void setAllocateMemoryPerWalCache(final long allocateMemoryForWalCache) {
2012+
this.allocateMemoryPerWalCache = allocateMemoryForWalCache;
2013+
}
2014+
20052015
public boolean isEnablePartialInsert() {
20062016
return enablePartialInsert;
20072017
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

100755100644
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2458,6 +2458,12 @@ private void loadPipeProps(TrimProperties properties) {
24582458
conf.setIotConsensusV2DeletionFileDir(
24592459
properties.getProperty(
24602460
"iot_consensus_v2_deletion_file_dir", conf.getIotConsensusV2DeletionFileDir()));
2461+
2462+
conf.setAllocateMemoryPerWalCache(
2463+
Long.parseLong(
2464+
properties.getProperty(
2465+
"allocate_memory_per_wal_cache",
2466+
Long.toString(conf.getAllocateMemoryPerWalCache()))));
24612467
}
24622468

24632469
private void loadCQProps(TrimProperties properties) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,10 @@ public IoTDBDataNodeAsyncClientManager(
116116
receiverAttributes,
117117
new IClientManager.Factory<TEndPoint, AsyncPipeDataTransferServiceClient>()
118118
.createClientManager(
119-
new ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
119+
isTSFileUsed
120+
? new ClientPoolFactory
121+
.AsyncPipeTsFileDataTransferServiceClientPoolFactory()
122+
: new ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
120123
}
121124
endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(receiverAttributes);
122125

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2323
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
2424
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
25-
import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
2625
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType;
2726
import org.apache.iotdb.db.pipe.resource.memory.PipeModelFixedMemoryBlock;
2827
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
@@ -48,7 +47,7 @@ public abstract class PipeTabletEventBatch implements AutoCloseable {
4847
private long firstEventProcessingTime = Long.MIN_VALUE;
4948

5049
protected long totalBufferSize = 0;
51-
private final PipeDynamicMemoryBlock allocatedMemoryBlock;
50+
private final PipeModelFixedMemoryBlock allocatedMemoryBlock;
5251

5352
protected volatile boolean isClosed = false;
5453

@@ -61,8 +60,10 @@ protected PipeTabletEventBatch(final int maxDelayInMs, final long requestMaxBatc
6160

6261
// limit in buffer size
6362
this.allocatedMemoryBlock =
64-
pipeModelFixedMemoryBlock.registerPipeBatchMemoryBlock(requestMaxBatchSizeInBytes);
65-
allocatedMemoryBlock.setExpandable(false);
63+
pipeModelFixedMemoryBlock =
64+
PipeDataNodeResourceManager.memory()
65+
.forceAllocateForModelFixedMemoryBlock(
66+
requestMaxBatchSizeInBytes, PipeMemoryBlockType.BATCH);
6667

6768
if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
6869
LOGGER.info(
@@ -127,12 +128,8 @@ protected abstract boolean constructBatch(final TabletInsertionEvent event)
127128
throws WALPipeException, IOException;
128129

129130
public boolean shouldEmit() {
130-
final long diff = System.currentTimeMillis() - firstEventProcessingTime;
131-
if (totalBufferSize >= getMaxBatchSizeInBytes() || diff >= maxDelayInMs) {
132-
allocatedMemoryBlock.updateCurrentMemoryEfficiencyAdjustMem((double) diff / maxDelayInMs);
133-
return true;
134-
}
135-
return false;
131+
return totalBufferSize >= getMaxBatchSizeInBytes()
132+
|| System.currentTimeMillis() - firstEventProcessingTime >= maxDelayInMs;
136133
}
137134

138135
private long getMaxBatchSizeInBytes() {
@@ -200,9 +197,7 @@ public static void init() {
200197
try {
201198
pipeModelFixedMemoryBlock =
202199
PipeDataNodeResourceManager.memory()
203-
.forceAllocateForModelFixedMemoryBlock(
204-
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfBatch(),
205-
PipeMemoryBlockType.BATCH);
200+
.forceAllocateForModelFixedMemoryBlock(0L, PipeMemoryBlockType.BATCH);
206201
} catch (Exception e) {
207202
LOGGER.error("init pipe model fixed memory block failed", e);
208203
// If the allocation fails, we still need to create a default memory block to avoid NPE.

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,11 @@ private void extractTsFileInsertion(final PipeRealtimeEvent event) {
168168
// 5. Data inserted in the step2 is not captured by PipeB, and if its tsfile
169169
// epoch's state is USING_TABLET, the tsfile event will be ignored, which
170170
// will cause the data loss in the tsfile epoch.
171+
LOGGER.info(
172+
"The tsFile {}'s epoch's start time {} is smaller than the captured insertNodes' min time {}, will regard it as data loss or un-sequential, will extract the tsFile",
173+
((PipeTsFileInsertionEvent) event.getEvent()).getTsFile(),
174+
((PipeTsFileInsertionEvent) event.getEvent()).getFileStartTime(),
175+
event.getTsFileEpoch().getInsertNodeMinTime());
171176
return TsFileEpoch.State.USING_BOTH;
172177
} else {
173178
// All data in the tsfile epoch has been extracted in tablet mode, so we should

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ void decreaseHeartbeatEventCount() {
105105
}
106106

107107
double getRemainingInsertEventSmoothingCount() {
108+
if (PipeConfig.getInstance().getPipeRemainingInsertNodeCountAverage() == PipeRateAverage.NONE) {
109+
return insertNodeEventCount.get();
110+
}
108111
if (System.currentTimeMillis() - lastInsertNodeEventCountSmoothingTime
109112
>= PipeConfig.getInstance().getPipeRemainingInsertEventCountSmoothingIntervalSeconds()) {
110113
insertNodeEventCountMeter.mark(insertNodeEventCount.get());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java

Lines changed: 9 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.iotdb.db.pipe.metric.overview.PipeWALInsertNodeCacheMetrics;
2828
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
2929
import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
30-
import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
3130
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType;
3231
import org.apache.iotdb.db.pipe.resource.memory.PipeModelFixedMemoryBlock;
3332
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -49,13 +48,11 @@
4948

5049
import java.io.IOException;
5150
import java.nio.ByteBuffer;
52-
import java.util.Collections;
5351
import java.util.HashMap;
5452
import java.util.Map;
5553
import java.util.Objects;
5654
import java.util.Set;
5755
import java.util.concurrent.ConcurrentHashMap;
58-
import java.util.concurrent.atomic.AtomicBoolean;
5956

6057
/** This cache is used by {@link WALEntryPosition}. */
6158
public class WALInsertNodeCache {
@@ -68,11 +65,10 @@ public class WALInsertNodeCache {
6865

6966
private static PipeModelFixedMemoryBlock walModelFixedMemory = null;
7067

71-
private final PipeDynamicMemoryBlock memoryBlock;
68+
private final PipeModelFixedMemoryBlock memoryBlock;
7269

7370
// Used to adjust the memory usage of the cache
7471
private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
75-
private final AtomicBoolean isBatchLoadEnabled = new AtomicBoolean(true);
7672
// LRU cache, find Pair<ByteBuffer, InsertNode> by WALEntryPosition
7773
private final LoadingCache<WALEntryPosition, Pair<ByteBuffer, InsertNode>> lruCache;
7874

@@ -86,19 +82,12 @@ private WALInsertNodeCache(final Integer dataRegionId) {
8682
init();
8783
}
8884

89-
final long requestedAllocateSize =
90-
(long)
91-
Math.min(
92-
1.0
93-
* PIPE_CONFIG.getPipeMaxAllowedPinnedMemTableCount()
94-
* CONFIG.getWalFileSizeThresholdInByte()
95-
/ CONFIG.getDataRegionNum(),
96-
0.5
97-
* MEMORY_CONFIG.getPipeMemoryManager().getTotalMemorySizeInBytes()
98-
/ CONFIG.getDataRegionNum());
99-
memoryBlock = walModelFixedMemory.registerPipeBatchMemoryBlock(requestedAllocateSize);
100-
isBatchLoadEnabled.set(
101-
memoryBlock.getMemoryUsageInBytes() >= CONFIG.getWalFileSizeThresholdInByte());
85+
final long requestedAllocateSize = CONFIG.getAllocateMemoryPerWalCache();
86+
87+
memoryBlock =
88+
PipeDataNodeResourceManager.memory()
89+
.forceAllocateForModelFixedMemoryBlock(requestedAllocateSize, PipeMemoryBlockType.WAL);
90+
10291
lruCache =
10392
Caffeine.newBuilder()
10493
.maximumWeight(requestedAllocateSize)
@@ -123,58 +112,9 @@ private WALInsertNodeCache(final Integer dataRegionId) {
123112
.recordStats()
124113
.build(new WALInsertNodeCacheLoader());
125114

126-
memoryBlock.setExpandable(true);
127-
memoryBlock.setExpand(
128-
memoryBlock -> {
129-
final long oldMemory = memoryBlock.getMemoryUsageInBytes();
130-
memoryBlock.updateCurrentMemoryEfficiencyAdjustMem(lruCache.stats().hitRate());
131-
final long newMemory = memoryBlock.getMemoryUsageInBytes();
132-
if (newMemory > oldMemory) {
133-
setExpandCallback(oldMemory, newMemory, dataRegionId);
134-
} else if (newMemory < oldMemory) {
135-
shrinkCallback(oldMemory, newMemory, dataRegionId);
136-
}
137-
});
138115
PipeWALInsertNodeCacheMetrics.getInstance().register(this, dataRegionId);
139116
}
140117

141-
private void setExpandCallback(long oldMemory, long newMemory, Integer dataRegionId) {
142-
memoryUsageCheatFactor.updateAndGet(
143-
factor ->
144-
factor == 0L || newMemory == 0L || oldMemory == 0
145-
? 0.0
146-
: factor / ((double) newMemory / oldMemory));
147-
isBatchLoadEnabled.set(newMemory >= CONFIG.getWalFileSizeThresholdInByte());
148-
LOGGER.info(
149-
"WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has expanded from {} to {}.",
150-
dataRegionId,
151-
oldMemory,
152-
newMemory);
153-
}
154-
155-
private void shrinkCallback(long oldMemory, long newMemory, Integer dataRegionId) {
156-
memoryUsageCheatFactor.updateAndGet(
157-
factor ->
158-
factor == 0L || newMemory == 0L || oldMemory == 0
159-
? 0.0
160-
: factor * ((double) oldMemory / newMemory));
161-
isBatchLoadEnabled.set(newMemory >= CONFIG.getWalFileSizeThresholdInByte());
162-
LOGGER.info(
163-
"WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has shrunk from {} to {}.",
164-
dataRegionId,
165-
oldMemory,
166-
newMemory);
167-
if (CONFIG.getWALCacheShrinkClearEnabled()) {
168-
try {
169-
lruCache.cleanUp();
170-
} catch (Exception e) {
171-
LOGGER.warn("Failed to clear WALInsertNodeCache for dataRegion ID: {}.", dataRegionId, e);
172-
return;
173-
}
174-
LOGGER.info("Successfully cleared WALInsertNodeCache for dataRegion ID: {}.", dataRegionId);
175-
}
176-
}
177-
178118
// please call this method at PipeLauncher
179119
public static void init() {
180120
if (walModelFixedMemory != null) {
@@ -184,9 +124,7 @@ public static void init() {
184124
// Allocate memory for the fixed memory block of WAL
185125
walModelFixedMemory =
186126
PipeDataNodeResourceManager.memory()
187-
.forceAllocateForModelFixedMemoryBlock(
188-
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfWAL(),
189-
PipeMemoryBlockType.WAL);
127+
.forceAllocateForModelFixedMemoryBlock(0L, PipeMemoryBlockType.WAL);
190128
} catch (Exception e) {
191129
LOGGER.error("Failed to initialize WAL model fixed memory block", e);
192130
walModelFixedMemory =
@@ -259,10 +197,7 @@ public ByteBuffer getByteBuffer(final WALEntryPosition position) {
259197
public Pair<ByteBuffer, InsertNode> getByteBufferOrInsertNode(final WALEntryPosition position) {
260198
hasPipeRunning = true;
261199

262-
final Pair<ByteBuffer, InsertNode> pair =
263-
isBatchLoadEnabled.get()
264-
? lruCache.getAll(Collections.singleton(position)).get(position)
265-
: lruCache.get(position);
200+
final Pair<ByteBuffer, InsertNode> pair = lruCache.get(position);
266201

267202
if (pair == null) {
268203
throw new IllegalStateException();
@@ -402,16 +337,6 @@ private InstanceHolder() {
402337

403338
/////////////////////////// Test Only ///////////////////////////
404339

405-
@TestOnly
406-
public boolean isBatchLoadEnabled() {
407-
return isBatchLoadEnabled.get();
408-
}
409-
410-
@TestOnly
411-
public void setIsBatchLoadEnabled(final boolean isBatchLoadEnabled) {
412-
this.isBatchLoadEnabled.set(isBatchLoadEnabled);
413-
}
414-
415340
@TestOnly
416341
boolean contains(WALEntryPosition position) {
417342
return lruCache.getIfPresent(position) != null;

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646

4747
import static org.junit.Assert.assertEquals;
4848
import static org.junit.Assert.assertFalse;
49-
import static org.junit.Assert.assertTrue;
5049

5150
public class WALInsertNodeCacheTest {
5251
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -152,53 +151,6 @@ public void testLoadUnsealedWALFile() throws Exception {
152151
assertEquals(node1, cache.getInsertNode(position));
153152
}
154153

155-
@Test
156-
public void testBatchLoad() throws Exception {
157-
// Enable batch load
158-
boolean oldIsBatchLoadEnabled = cache.isBatchLoadEnabled();
159-
cache.setIsBatchLoadEnabled(true);
160-
WALInsertNodeCache localC = cache;
161-
try {
162-
// write memTable1
163-
IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
164-
walNode.onMemTableCreated(memTable1, logDirectory + "/" + "fake1.tsfile");
165-
InsertRowNode node1 = getInsertRowNode(System.currentTimeMillis());
166-
node1.setSearchIndex(1);
167-
WALFlushListener flushListener1 = walNode.log(memTable1.getMemTableId(), node1);
168-
WALEntryPosition position1 = flushListener1.getWalEntryHandler().getWalEntryPosition();
169-
InsertRowNode node2 = getInsertRowNode(System.currentTimeMillis());
170-
node1.setSearchIndex(2);
171-
WALFlushListener flushListener2 = walNode.log(memTable1.getMemTableId(), node2);
172-
WALEntryPosition position2 = flushListener2.getWalEntryHandler().getWalEntryPosition();
173-
// write memTable2
174-
IMemTable memTable2 = new PrimitiveMemTable(databasePath, dataRegionId);
175-
walNode.onMemTableCreated(memTable2, logDirectory + "/" + "fake2.tsfile");
176-
InsertRowNode node3 = getInsertRowNode(System.currentTimeMillis());
177-
node1.setSearchIndex(3);
178-
WALFlushListener flushListener3 = walNode.log(memTable2.getMemTableId(), node3);
179-
WALEntryPosition position3 = flushListener3.getWalEntryHandler().getWalEntryPosition();
180-
// wait until wal flushed
181-
walNode.rollWALFile();
182-
Awaitility.await().until(() -> walNode.isAllWALEntriesConsumed() && position3.canRead());
183-
// check batch load memTable1
184-
cache.clear();
185-
cache.addMemTable(memTable1.getMemTableId());
186-
assertEquals(node1, cache.getInsertNode(position1));
187-
assertTrue(cache.contains(position1));
188-
assertTrue(cache.contains(position2));
189-
assertFalse(cache.contains(position3));
190-
// check batch load none
191-
cache.removeMemTable(memTable1.getMemTableId());
192-
cache.clear();
193-
assertEquals(node1, cache.getInsertNode(position1));
194-
assertTrue(cache.contains(position1));
195-
assertFalse(cache.contains(position2));
196-
assertFalse(cache.contains(position3));
197-
} finally {
198-
WALInsertNodeCache.getInstance(1).setIsBatchLoadEnabled(oldIsBatchLoadEnabled);
199-
}
200-
}
201-
202154
private InsertRowNode getInsertRowNode(long time) throws IllegalPathException {
203155
TSDataType[] dataTypes =
204156
new TSDataType[] {

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,33 @@ public GenericKeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient> cre
298298
}
299299
}
300300

301+
public static class AsyncPipeTsFileDataTransferServiceClientPoolFactory
302+
implements IClientPoolFactory<TEndPoint, AsyncPipeDataTransferServiceClient> {
303+
@Override
304+
public GenericKeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient> createClientPool(
305+
ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> manager) {
306+
final GenericKeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient> clientPool =
307+
new GenericKeyedObjectPool<>(
308+
new AsyncPipeDataTransferServiceClient.Factory(
309+
manager,
310+
new ThriftClientProperty.Builder()
311+
.setConnectionTimeoutMs(conf.getPipeConnectorTransferTimeoutMs())
312+
.setRpcThriftCompressionEnabled(
313+
conf.isPipeConnectorRPCThriftCompressionEnabled())
314+
.setSelectorNumOfAsyncClientManager(
315+
conf.getPipeAsyncConnectorSelectorNumber())
316+
.build(),
317+
ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
318+
new ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
319+
.setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxTsFileClientNumber())
320+
.build()
321+
.getConfig());
322+
ClientManagerMetrics.getInstance()
323+
.registerClientManager(this.getClass().getSimpleName(), clientPool);
324+
return clientPool;
325+
}
326+
}
327+
301328
public static class AsyncAINodeHeartbeatServiceClientPoolFactory
302329
implements IClientPoolFactory<TEndPoint, AsyncAINodeServiceClient> {
303330
@Override

0 commit comments

Comments
 (0)