Skip to content

Commit efbf79c

Browse files
authored
HDDS-12236. ContainerStateMachine should not apply or write future transactions in the event of failure (#7862)
1 parent 3f88dbe commit efbf79c

File tree

4 files changed

+345
-0
lines changed

4 files changed

+345
-0
lines changed

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.Map;
3838
import java.util.Objects;
3939
import java.util.Optional;
40+
import java.util.Set;
4041
import java.util.concurrent.CompletableFuture;
4142
import java.util.concurrent.CompletionException;
4243
import java.util.concurrent.ConcurrentHashMap;
@@ -200,6 +201,7 @@ long getStartTime() {
200201
private final ExecutorService executor;
201202
private final List<ThreadPoolExecutor> chunkExecutors;
202203
private final Map<Long, Long> applyTransactionCompletionMap;
204+
private final Set<Long> unhealthyContainers;
203205
private final Cache<Long, ByteString> stateMachineDataCache;
204206
private final AtomicBoolean stateMachineHealthy;
205207

@@ -229,6 +231,7 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI
229231
metrics = CSMMetrics.create(gid);
230232
this.writeChunkFutureMap = new ConcurrentHashMap<>();
231233
applyTransactionCompletionMap = new ConcurrentHashMap<>();
234+
this.unhealthyContainers = ConcurrentHashMap.newKeySet();
232235
long pendingRequestsBytesLimit = (long)conf.getStorageSize(
233236
OzoneConfigKeys.HDDS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT,
234237
OzoneConfigKeys.HDDS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT_DEFAULT,
@@ -363,6 +366,17 @@ public boolean isStateMachineHealthy() {
363366
return stateMachineHealthy.get();
364367
}
365368

369+
private void checkContainerHealthy(long containerId, boolean skipContainerUnhealthyCheck)
370+
throws StorageContainerException {
371+
if (!isStateMachineHealthy() && unhealthyContainers.contains(containerId)) {
372+
throw new StorageContainerException(String.format("Prev writes to container %d failed, stopping all writes to " +
373+
"container", containerId), ContainerProtos.Result.CONTAINER_UNHEALTHY);
374+
} else if (!isStateMachineHealthy() && skipContainerUnhealthyCheck) {
375+
throw new StorageContainerException(String.format("Prev writes to containers %s failed, stopping all writes to " +
376+
"container", unhealthyContainers.toString()), ContainerProtos.Result.CONTAINER_UNHEALTHY);
377+
}
378+
}
379+
366380
@Override
367381
public long takeSnapshot() throws IOException {
368382
TermIndex ti = getLastAppliedTermIndex();
@@ -555,6 +569,11 @@ private CompletableFuture<Message> writeStateMachineData(
555569
CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
556570
CompletableFuture.supplyAsync(() -> {
557571
try {
572+
try {
573+
checkContainerHealthy(write.getBlockID().getContainerID(), true);
574+
} catch (StorageContainerException e) {
575+
return ContainerUtils.logAndReturnError(LOG, e, requestProto);
576+
}
558577
metrics.recordWriteStateMachineQueueingLatencyNs(
559578
Time.monotonicNowNanos() - startTime);
560579
return dispatchCommand(requestProto, context);
@@ -565,6 +584,7 @@ private CompletableFuture<Message> writeStateMachineData(
565584
metrics.incNumWriteDataFails();
566585
// write chunks go in parallel. It's possible that one write chunk
567586
// sees the stateMachine already marked unhealthy by another parallel thread
587+
unhealthyContainers.add(write.getBlockID().getContainerID());
568588
stateMachineHealthy.set(false);
569589
raftFuture.completeExceptionally(e);
570590
throw e;
@@ -597,6 +617,7 @@ private CompletableFuture<Message> writeStateMachineData(
597617
// This leads to pipeline close. Any change in that behavior requires
598618
// handling the entry for the write chunk in cache.
599619
stateMachineHealthy.set(false);
620+
unhealthyContainers.add(write.getBlockID().getContainerID());
600621
raftFuture.completeExceptionally(sce);
601622
} else {
602623
metrics.incNumBytesWrittenCount(
@@ -764,6 +785,7 @@ private ByteString readStateMachineData(
764785
+ "{} Container Result: {}", getGroupId(), response.getCmdType(), index,
765786
response.getMessage(), response.getResult());
766787
stateMachineHealthy.set(false);
788+
unhealthyContainers.add(requestProto.getContainerID());
767789
throw sce;
768790
}
769791

@@ -946,6 +968,7 @@ private CompletableFuture<ContainerCommandResponseProto> applyTransaction(
946968
try {
947969
try {
948970
this.validatePeers();
971+
this.checkContainerHealthy(containerId, false);
949972
} catch (StorageContainerException e) {
950973
return ContainerUtils.logAndReturnError(LOG, e, request);
951974
}
@@ -1032,6 +1055,7 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
10321055
LOG.error(getGroupId() + ": failed to applyTransaction at logIndex " + index
10331056
+ " for " + requestProto.getCmdType(), e);
10341057
stateMachineHealthy.compareAndSet(true, false);
1058+
unhealthyContainers.add(requestProto.getContainerID());
10351059
metrics.incNumApplyTransactionsFails();
10361060
applyTransactionFuture.completeExceptionally(e);
10371061
};
@@ -1066,6 +1090,7 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
10661090
// shutdown.
10671091
applyTransactionFuture.completeExceptionally(sce);
10681092
stateMachineHealthy.compareAndSet(true, false);
1093+
unhealthyContainers.add(requestProto.getContainerID());
10691094
ratisServer.handleApplyTransactionFailure(getGroupId(), trx.getServerRole());
10701095
} else {
10711096
if (LOG.isDebugEnabled()) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
19+
20+
import static org.junit.jupiter.api.Assertions.assertEquals;
21+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
22+
import static org.junit.jupiter.api.Assertions.assertNotNull;
23+
import static org.junit.jupiter.api.Assertions.assertNull;
24+
import static org.mockito.ArgumentMatchers.any;
25+
import static org.mockito.Mockito.mock;
26+
import static org.mockito.Mockito.reset;
27+
import static org.mockito.Mockito.times;
28+
import static org.mockito.Mockito.verify;
29+
import static org.mockito.Mockito.when;
30+
31+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
32+
import java.io.IOException;
33+
import java.util.List;
34+
import java.util.UUID;
35+
import java.util.concurrent.ExecutionException;
36+
import java.util.concurrent.LinkedBlockingQueue;
37+
import java.util.concurrent.ThreadPoolExecutor;
38+
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.atomic.AtomicReference;
40+
import java.util.function.Function;
41+
import java.util.stream.Collectors;
42+
import java.util.stream.IntStream;
43+
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
44+
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
45+
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
46+
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
47+
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
48+
import org.apache.ratis.proto.RaftProtos;
49+
import org.apache.ratis.protocol.Message;
50+
import org.apache.ratis.protocol.RaftGroup;
51+
import org.apache.ratis.protocol.RaftGroupId;
52+
import org.apache.ratis.protocol.RaftPeer;
53+
import org.apache.ratis.server.DivisionInfo;
54+
import org.apache.ratis.server.RaftServer;
55+
import org.apache.ratis.statemachine.TransactionContext;
56+
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
57+
import org.junit.jupiter.api.AfterAll;
58+
import org.junit.jupiter.api.AfterEach;
59+
import org.junit.jupiter.api.BeforeEach;
60+
import org.junit.jupiter.api.TestInstance;
61+
import org.junit.jupiter.params.ParameterizedTest;
62+
import org.junit.jupiter.params.provider.ValueSource;
63+
64+
/*
65+
* Licensed to the Apache Software Foundation (ASF) under one or more
66+
* contributor license agreements. See the NOTICE file distributed with
67+
* this work for additional information regarding copyright ownership.
68+
* The ASF licenses this file to You under the Apache License, Version 2.0
69+
* (the "License"); you may not use this file except in compliance with
70+
* the License. You may obtain a copy of the License at
71+
*
72+
* http://www.apache.org/licenses/LICENSE-2.0
73+
*
74+
* Unless required by applicable law or agreed to in writing, software
75+
* distributed under the License is distributed on an "AS IS" BASIS,
76+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
77+
* See the License for the specific language governing permissions and
78+
* limitations under the License.
79+
*/
80+
81+
/**
82+
* Test class to ContainerStateMachine class.
83+
*/
84+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
85+
abstract class TestContainerStateMachine {
86+
private ContainerDispatcher dispatcher;
87+
private final OzoneConfiguration conf = new OzoneConfiguration();
88+
private ContainerStateMachine stateMachine;
89+
private final List<ThreadPoolExecutor> executor = IntStream.range(0, 2).mapToObj(i -> new ThreadPoolExecutor(1, 1,
90+
0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadFactoryBuilder()
91+
.setDaemon(true)
92+
.setNameFormat("ChunkWriter-" + i + "-%d")
93+
.build())).collect(Collectors.toList());
94+
private final boolean isLeader;
95+
96+
TestContainerStateMachine(boolean isLeader) {
97+
this.isLeader = isLeader;
98+
}
99+
100+
@BeforeEach
101+
public void setup() throws IOException {
102+
dispatcher = mock(ContainerDispatcher.class);
103+
ContainerController controller = mock(ContainerController.class);
104+
XceiverServerRatis ratisServer = mock(XceiverServerRatis.class);
105+
RaftServer raftServer = mock(RaftServer.class);
106+
RaftServer.Division division = mock(RaftServer.Division.class);
107+
RaftGroup raftGroup = mock(RaftGroup.class);
108+
DivisionInfo info = mock(DivisionInfo.class);
109+
RaftPeer raftPeer = mock(RaftPeer.class);
110+
when(ratisServer.getServer()).thenReturn(raftServer);
111+
when(raftServer.getDivision(any())).thenReturn(division);
112+
when(division.getGroup()).thenReturn(raftGroup);
113+
when(raftGroup.getPeer(any())).thenReturn(raftPeer);
114+
when(division.getInfo()).thenReturn(info);
115+
when(info.isLeader()).thenReturn(isLeader);
116+
when(ratisServer.getServerDivision(any())).thenReturn(division);
117+
stateMachine = new ContainerStateMachine(null,
118+
RaftGroupId.randomId(), dispatcher, controller, executor, ratisServer, conf, "containerOp");
119+
}
120+
121+
122+
@AfterEach
123+
public void teardown() {
124+
stateMachine.close();
125+
}
126+
127+
128+
@AfterAll
129+
public void shutdown() {
130+
executor.forEach(ThreadPoolExecutor::shutdown);
131+
}
132+
133+
@ParameterizedTest
134+
@ValueSource(booleans = {true, false})
135+
public void testWriteFailure(boolean failWithException) throws ExecutionException, InterruptedException {
136+
RaftProtos.LogEntryProto entry = mock(RaftProtos.LogEntryProto.class);
137+
when(entry.getTerm()).thenReturn(1L);
138+
when(entry.getIndex()).thenReturn(1L);
139+
TransactionContext trx = mock(TransactionContext.class);
140+
ContainerStateMachine.Context context = mock(ContainerStateMachine.Context.class);
141+
when(trx.getStateMachineContext()).thenReturn(context);
142+
if (failWithException) {
143+
when(dispatcher.dispatch(any(), any())).thenThrow(new RuntimeException());
144+
} else {
145+
when(dispatcher.dispatch(any(), any())).thenReturn(ContainerProtos.ContainerCommandResponseProto
146+
.newBuilder().setCmdType(ContainerProtos.Type.WriteChunk)
147+
.setResult(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR)
148+
.build());
149+
}
150+
151+
when(context.getRequestProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder()
152+
.setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk(
153+
ContainerProtos.WriteChunkRequestProto.newBuilder().setData(ByteString.copyFromUtf8("Test Data"))
154+
.setBlockID(
155+
ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(1).setLocalID(1).build()).build())
156+
.setContainerID(1)
157+
.setDatanodeUuid(UUID.randomUUID().toString()).build());
158+
AtomicReference<Throwable> throwable = new AtomicReference<>(null);
159+
Function<Throwable, ? extends Message> throwableSetter = t -> {
160+
throwable.set(t);
161+
return null;
162+
};
163+
stateMachine.write(entry, trx).exceptionally(throwableSetter).get();
164+
verify(dispatcher, times(1)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class),
165+
any(DispatcherContext.class));
166+
reset(dispatcher);
167+
assertNotNull(throwable.get());
168+
if (failWithException) {
169+
assertInstanceOf(RuntimeException.class, throwable.get());
170+
} else {
171+
assertInstanceOf(StorageContainerException.class, throwable.get());
172+
StorageContainerException sce = (StorageContainerException) throwable.get();
173+
assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, sce.getResult());
174+
}
175+
// Writing data to another container(containerId 2) should also fail.
176+
when(context.getRequestProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder()
177+
.setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk(
178+
ContainerProtos.WriteChunkRequestProto.newBuilder().setData(ByteString.copyFromUtf8("Test Data"))
179+
.setBlockID(
180+
ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(2).setLocalID(1).build()).build())
181+
.setContainerID(2)
182+
.setDatanodeUuid(UUID.randomUUID().toString()).build());
183+
stateMachine.write(entry, trx).exceptionally(throwableSetter).get();
184+
verify(dispatcher, times(0)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class),
185+
any(DispatcherContext.class));
186+
assertInstanceOf(StorageContainerException.class, throwable.get());
187+
StorageContainerException sce = (StorageContainerException) throwable.get();
188+
assertEquals(ContainerProtos.Result.CONTAINER_UNHEALTHY, sce.getResult());
189+
}
190+
191+
@ParameterizedTest
192+
@ValueSource(booleans = {true, false})
193+
public void testApplyTransactionFailure(boolean failWithException) throws ExecutionException,
194+
InterruptedException, IOException {
195+
RaftProtos.LogEntryProto entry = mock(RaftProtos.LogEntryProto.class);
196+
when(entry.getTerm()).thenReturn(1L);
197+
when(entry.getIndex()).thenReturn(1L);
198+
TransactionContext trx = mock(TransactionContext.class);
199+
ContainerStateMachine.Context context = mock(ContainerStateMachine.Context.class);
200+
when(trx.getLogEntry()).thenReturn(entry);
201+
when(trx.getStateMachineContext()).thenReturn(context);
202+
if (failWithException) {
203+
when(dispatcher.dispatch(any(), any())).thenThrow(new RuntimeException());
204+
} else {
205+
when(dispatcher.dispatch(any(), any())).thenReturn(ContainerProtos.ContainerCommandResponseProto
206+
.newBuilder().setCmdType(ContainerProtos.Type.WriteChunk)
207+
.setResult(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR)
208+
.build());
209+
}
210+
// Failing apply transaction on congtainer 1.
211+
when(context.getLogProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder()
212+
.setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk(
213+
ContainerProtos.WriteChunkRequestProto.newBuilder().setBlockID(
214+
ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(1).setLocalID(1).build()).build())
215+
.setContainerID(1)
216+
.setDatanodeUuid(UUID.randomUUID().toString()).build());
217+
AtomicReference<Throwable> throwable = new AtomicReference<>(null);
218+
Function<Throwable, ? extends Message> throwableSetter = t -> {
219+
throwable.set(t);
220+
return null;
221+
};
222+
//apply transaction will fail because of runtime exception thrown by dispatcher, which marks the first
223+
// failure on container 1.
224+
stateMachine.applyTransaction(trx).exceptionally(throwableSetter).get();
225+
verify(dispatcher, times(1)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class),
226+
any(DispatcherContext.class));
227+
reset(dispatcher);
228+
assertNotNull(throwable.get());
229+
if (failWithException) {
230+
assertInstanceOf(RuntimeException.class, throwable.get());
231+
} else {
232+
assertInstanceOf(StorageContainerException.class, throwable.get());
233+
StorageContainerException sce = (StorageContainerException) throwable.get();
234+
assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, sce.getResult());
235+
}
236+
// Another apply transaction on same container 1 should fail because the previous apply transaction failed.
237+
stateMachine.applyTransaction(trx).exceptionally(throwableSetter).get();
238+
verify(dispatcher, times(0)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class),
239+
any(DispatcherContext.class));
240+
assertInstanceOf(StorageContainerException.class, throwable.get());
241+
StorageContainerException sce = (StorageContainerException) throwable.get();
242+
assertEquals(ContainerProtos.Result.CONTAINER_UNHEALTHY, sce.getResult());
243+
244+
// Another apply transaction on a different container 2 shouldn't fail because the previous apply transaction
245+
// failure was only on container 1.
246+
when(context.getLogProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder()
247+
.setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk(
248+
ContainerProtos.WriteChunkRequestProto.newBuilder().setBlockID(
249+
ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(2).setLocalID(1).build()).build())
250+
.setContainerID(2)
251+
.setDatanodeUuid(UUID.randomUUID().toString()).build());
252+
253+
reset(dispatcher);
254+
throwable.set(null);
255+
when(dispatcher.dispatch(any(), any())).thenReturn(ContainerProtos.ContainerCommandResponseProto
256+
.newBuilder().setCmdType(ContainerProtos.Type.WriteChunk).setResult(ContainerProtos.Result.SUCCESS)
257+
.build());
258+
Message succcesfulTransaction = stateMachine.applyTransaction(trx).exceptionally(throwableSetter).get();
259+
verify(dispatcher, times(1)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class),
260+
any(DispatcherContext.class));
261+
assertNull(throwable.get());
262+
ContainerProtos.ContainerCommandResponseProto resp =
263+
ContainerProtos.ContainerCommandResponseProto.parseFrom(succcesfulTransaction.getContent());
264+
assertEquals(ContainerProtos.Result.SUCCESS, resp.getResult());
265+
}
266+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
19+
20+
/**
21+
* Test class to ContainerStateMachine class for follower.
22+
*/
23+
public class TestContainerStateMachineFollower extends TestContainerStateMachine {
24+
public TestContainerStateMachineFollower() {
25+
super(false);
26+
}
27+
}

0 commit comments

Comments
 (0)