
Commit 1398f58

HDDS-9819. Recon - Potential memory overflow in Container Health Task. (#5841)
1 parent b932e16 commit 1398f58

3 files changed: 45 additions & 15 deletions
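In short: ContainerHealthTask previously loaded every container known to SCM into memory through a single ContainerManager#getContainers() call, which could exhaust Recon's heap on clusters with very large container counts. The patch replaces that call with a paginated loop that fetches FETCH_COUNT containers at a time starting from a ContainerID cursor, and adjusts the unit and integration tests to the new two-argument getContainers(startID, count) overload.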

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java

Lines changed: 2 additions & 5 deletions
@@ -72,8 +72,8 @@ public void init() throws Exception {
     taskConfig.setMissingContainerTaskInterval(Duration.ofSeconds(15));
     conf.setFromObject(taskConfig);

-    conf.set("ozone.scm.stale.node.interval", "10s");
-    conf.set("ozone.scm.dead.node.interval", "20s");
+    conf.set("ozone.scm.stale.node.interval", "6s");
+    conf.set("ozone.scm.dead.node.interval", "10s");
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1)
         .includeRecon(true).build();
     cluster.waitForClusterToBeReady();

@@ -102,9 +102,6 @@ public void testSyncSCMContainerInfo() throws Exception {
     final ContainerInfo container2 = scmContainerManager.allocateContainer(
         RatisReplicationConfig.getInstance(
             HddsProtos.ReplicationFactor.ONE), "admin");
-    reconContainerManager.allocateContainer(
-        RatisReplicationConfig.getInstance(
-            HddsProtos.ReplicationFactor.ONE), "admin");
     scmContainerManager.updateContainerState(container1.containerID(),
         HddsProtos.LifeCycleEvent.FINALIZE);
     scmContainerManager.updateContainerState(container2.containerID(),
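Both tweaks keep the integration test aligned with the fix: the tighter stale and dead node intervals (6s and 10s instead of 10s and 20s) let the single-datanode MiniOzoneCluster reach the dead-node state sooner, and dropping the Recon-side allocateContainer() call leaves container creation to SCM alone, presumably so testSyncSCMContainerInfo exercises the SCM-to-Recon sync path for a container Recon has not already registered.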

hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java

Lines changed: 31 additions & 4 deletions
@@ -53,6 +53,7 @@
 import org.slf4j.LoggerFactory;

 import static org.apache.hadoop.ozone.recon.ReconConstants.CONTAINER_COUNT;
+import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_FETCH_COUNT;
 import static org.apache.hadoop.ozone.recon.ReconConstants.TOTAL_KEYS;
 import static org.apache.hadoop.ozone.recon.ReconConstants.TOTAL_USED_BYTES;


@@ -65,6 +66,7 @@ public class ContainerHealthTask extends ReconScmTask {

   private static final Logger LOG =
       LoggerFactory.getLogger(ContainerHealthTask.class);
+  public static final int FETCH_COUNT = Integer.parseInt(DEFAULT_FETCH_COUNT);

   private ReadWriteLock lock = new ReentrantReadWriteLock(true);


@@ -131,8 +133,24 @@ public void triggerContainerHealthCheck() {
       LOG.info("Container Health task thread took {} milliseconds to" +
               " process {} existing database records.",
           Time.monotonicNow() - start, existingCount);
+
+      checkAndProcessContainers(unhealthyContainerStateStatsMap, currentTime);
+      processedContainers.clear();
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  private void checkAndProcessContainers(
+      Map<UnHealthyContainerStates, Map<String, Long>>
+          unhealthyContainerStateStatsMap, long currentTime) {
+    ContainerID startID = ContainerID.valueOf(1);
+    List<ContainerInfo> containers = containerManager.getContainers(startID,
+        FETCH_COUNT);
+    long start;
+    long iterationCount = 0;
+    while (!containers.isEmpty()) {
       start = Time.monotonicNow();
-      final List<ContainerInfo> containers = containerManager.getContainers();
       containers.stream()
           .filter(c -> !processedContainers.contains(c))
           .forEach(c -> processContainer(c, currentTime,

@@ -142,10 +160,19 @@ public void triggerContainerHealthCheck() {
               " processing {} containers.", Time.monotonicNow() - start,
           containers.size());
       logUnhealthyContainerStats(unhealthyContainerStateStatsMap);
-      processedContainers.clear();
-    } finally {
-      lock.writeLock().unlock();
+      if (containers.size() >= FETCH_COUNT) {
+        startID = ContainerID.valueOf(
+            containers.get(containers.size() - 1).getContainerID() + 1);
+        containers = containerManager.getContainers(startID, FETCH_COUNT);
+      } else {
+        containers.clear();
+      }
+      iterationCount++;
     }
+    LOG.info(
+        "Container Health task thread took {} iterations to fetch all " +
+            "containers using batched approach with batch size of {}",
+        iterationCount, FETCH_COUNT);
   }

   private void logUnhealthyContainerStats(
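The new checkAndProcessContainers() method is the heart of the fix: rather than materializing every container known to SCM in one getContainers() call, it walks the container space in FETCH_COUNT-sized pages, resuming each fetch one past the last container ID of the previous full batch; a short batch signals that the store is exhausted. FETCH_COUNT is parsed from ReconConstants.DEFAULT_FETCH_COUNT, whose value lives outside this diff. Below is a minimal, self-contained sketch of the same paging pattern; ToyContainerStore and BATCH_SIZE are hypothetical stand-ins for ContainerManager and FETCH_COUNT, and only the loop structure mirrors the patch:

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class BatchedFetchSketch {
  // Stands in for FETCH_COUNT; kept tiny so the paging is visible.
  private static final int BATCH_SIZE = 3;

  // Hypothetical id-ordered store mimicking getContainers(startID, count).
  static class ToyContainerStore {
    private final NavigableMap<Long, String> byId = new TreeMap<>();

    void put(long id, String info) {
      byId.put(id, info);
    }

    // Returns up to 'count' ids >= startId, in ascending order.
    List<Long> getContainers(long startId, int count) {
      List<Long> batch = new ArrayList<>();
      for (Long id : byId.tailMap(startId, true).keySet()) {
        if (batch.size() == count) {
          break;
        }
        batch.add(id);
      }
      return batch;
    }
  }

  public static void main(String[] args) {
    ToyContainerStore store = new ToyContainerStore();
    for (long id = 1; id <= 7; id++) {
      store.put(id, "container-" + id);
    }

    long startId = 1;  // mirrors ContainerID.valueOf(1)
    long iterationCount = 0;
    List<Long> batch = store.getContainers(startId, BATCH_SIZE);
    while (!batch.isEmpty()) {
      batch.forEach(id -> System.out.println("processing container " + id));
      if (batch.size() >= BATCH_SIZE) {
        // A full batch may mean more containers remain: resume one past
        // the last id seen instead of reloading everything into memory.
        startId = batch.get(batch.size() - 1) + 1;
        batch = store.getContainers(startId, BATCH_SIZE);
      } else {
        // A short batch means the store is exhausted; end the loop.
        batch = List.of();
      }
      iterationCount++;
    }
    System.out.println("fetched all containers in " + iterationCount
        + " iterations");
  }
}

Note that if the container count is an exact multiple of the batch size, the final fetch simply returns an empty list and ends the loop, so the cursor logic needs no special end-of-range handling.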

hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java

Lines changed: 12 additions & 6 deletions
@@ -18,10 +18,12 @@

 package org.apache.hadoop.ozone.recon.fsck;

-import static org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates.ALL_REPLICAS_UNHEALTHY;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates.ALL_REPLICAS_UNHEALTHY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;


@@ -96,7 +98,8 @@ public void testRun() throws Exception {
     List<ContainerInfo> mockContainers = getMockContainers(7);
     when(scmMock.getScmServiceProvider()).thenReturn(scmClientMock);
     when(scmMock.getContainerManager()).thenReturn(containerManagerMock);
-    when(containerManagerMock.getContainers()).thenReturn(mockContainers);
+    when(containerManagerMock.getContainers(any(ContainerID.class),
+        anyInt())).thenReturn(mockContainers);
     for (ContainerInfo c : mockContainers) {
       when(containerManagerMock.getContainer(c.containerID())).thenReturn(c);
       when(scmClientMock.getContainerWithPipeline(c.getContainerID()))

@@ -151,7 +154,7 @@ public void testRun() throws Exception {
         reconTaskStatusDao, containerHealthSchemaManager,
         placementMock, reconTaskConfig, reconContainerMetadataManager);
     containerHealthTask.start();
-    LambdaTestUtils.await(6000, 1000, () ->
+    LambdaTestUtils.await(60000, 1000, () ->
         (unHealthyContainersTableHandle.count() == 6));
     UnhealthyContainers rec =
         unHealthyContainersTableHandle.fetchByContainerId(1L).get(0);

@@ -192,7 +195,8 @@ public void testRun() throws Exception {

     ReconTaskStatus taskStatus =
         reconTaskStatusDao.findById(containerHealthTask.getTaskName());
-    assertThat(taskStatus.getLastUpdatedTimestamp()).isGreaterThan(currentTime);
+    assertThat(taskStatus.getLastUpdatedTimestamp())
+        .isGreaterThan(currentTime);

     // Now run the job again, to check that relevant records are updated or
     // removed as appropriate. Need to adjust the return value for all the mocks

@@ -267,7 +271,8 @@ public void testDeletedContainer() throws Exception {
     List<ContainerInfo> mockContainers = getMockContainers(3);
     when(scmMock.getScmServiceProvider()).thenReturn(scmClientMock);
     when(scmMock.getContainerManager()).thenReturn(containerManagerMock);
-    when(containerManagerMock.getContainers()).thenReturn(mockContainers);
+    when(containerManagerMock.getContainers(any(ContainerID.class),
+        anyInt())).thenReturn(mockContainers);
     for (ContainerInfo c : mockContainers) {
       when(containerManagerMock.getContainer(c.containerID())).thenReturn(c);
       when(scmClientMock.getContainerWithPipeline(c.getContainerID()))

@@ -327,7 +332,8 @@ public void testDeletedContainer() throws Exception {

     ReconTaskStatus taskStatus =
         reconTaskStatusDao.findById(containerHealthTask.getTaskName());
-    assertThat(taskStatus.getLastUpdatedTimestamp()).isGreaterThan(currentTime);
+    assertThat(taskStatus.getLastUpdatedTimestamp())
+        .isGreaterThan(currentTime);
   }

   private Set<ContainerReplica> getMockReplicas(
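Two things stand out in the test updates. First, the await timeout grows from 6s to 60s, presumably to give the now-batched task more headroom on slow CI hosts. Second, the stubs return the full mock list for any (startID, count) pair, which terminates only because 7 (or 3) mock containers are fewer than FETCH_COUNT, so the loop's short-batch exit fires on the first iteration. A sketch of a cursor-aware stub, reusing containerManagerMock and mockContainers from the hunks above (an illustration, not part of the patch; it assumes ContainerID#getId() and a java.util.stream import):

when(containerManagerMock.getContainers(any(ContainerID.class), anyInt()))
    .thenAnswer(invocation -> {
      ContainerID start = invocation.getArgument(0);
      int count = invocation.getArgument(1);
      // Return only containers at or past the requested cursor, capped at count.
      return mockContainers.stream()
          .filter(c -> c.getContainerID() >= start.getId())
          .limit(count)
          .collect(java.util.stream.Collectors.toList());
    });

With an answer like this, a mock list longer than FETCH_COUNT would still page to completion instead of looping forever on the same batch.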
