diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java index aaadbbbcb955..0c2df21cbf7a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java @@ -19,6 +19,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandQueueReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.CRLStatusReport; @@ -43,6 +44,7 @@ import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.IEventInfo; +import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; @@ -51,6 +53,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; @@ -200,6 +203,8 @@ public List dispatch(SCMHeartbeatRequestProto heartbeat) { new CommandStatusReportFromDatanode( datanodeDetails, commandStatusReport)); + // update commands + updateCommands(commands, commandStatusReport); } } } @@ -207,6 +212,50 @@ public List dispatch(SCMHeartbeatRequestProto heartbeat) { return commands; } + private void updateCommands(List commands, + CommandStatusReportsProto commandStatusReport) { + if (commands != null) { + List cmdStatusList + = commandStatusReport.getCmdStatusList(); + cmdStatusList.forEach(cmdStatus -> { + if (cmdStatus.getType() == SCMCommandProto.Type.deleteBlocksCommand && + cmdStatus.getStatus() == StorageContainerDatanodeProtocolProtos. + CommandStatus.Status.EXECUTED) { + StorageContainerDatanodeProtocolProtos. + ContainerBlocksDeletionACKProto ackProto = + cmdStatus.getBlockDeletionAck(); + List + results = ackProto.getResultsList(); + for (SCMCommand command : commands) { + if (command.getType() == SCMCommandProto.Type. + deleteBlocksCommand) { + DeleteBlocksCommand deleteBlocksCommand = + (DeleteBlocksCommand) command; + List deleteds = + deleteBlocksCommand.blocksTobeDeleted(); + for (StorageContainerDatanodeProtocolProtos. + ContainerBlocksDeletionACKProto. + DeleteBlockTransactionResult result : results) { + Iterator iterator = deleteds.iterator(); + while (iterator.hasNext()) { + StorageContainerDatanodeProtocolProtos. + DeletedBlocksTransaction delete = iterator.next(); + if (delete.getTxID() == result.getTxID()) { + iterator.remove(); + } + } + } + } + } + } + // Other commands + }); + } + } + /** * Wrapper class for events with the datanode origin. */ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index dc6ed2ccb103..3a9ef403a914 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -189,6 +189,11 @@ conf, getDatanodeAddressKey(), datanodeRpcAddr, HddsServerUtil.addSuppressedLoggingExceptions(datanodeRpcServer); } + @VisibleForTesting + public SCMDatanodeHeartbeatDispatcher getHeartbeatDispatcher() { + return heartbeatDispatcher; + } + public void start() { LOG.info( StorageContainerManager.buildRpcServerStartMessage( diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java index e75f6e6f41a5..02e3836d9d86 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandQueueReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto @@ -49,6 +50,7 @@ import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerImpl; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; @@ -239,6 +241,82 @@ public void testScmHeartbeat() } } + @Test + public void testCmdStateUpdate() throws Exception { + try (SCMNodeManager nodeManager = createNodeManager(getConf())) { + DatanodeDetails datanodeDetails = registerWithCapacity(nodeManager); + // Generate DeletedBlocksTransaction set + List dnTXs = new ArrayList<>(); + StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction + transaction1 = StorageContainerDatanodeProtocolProtos. + DeletedBlocksTransaction.newBuilder(). + setContainerID(100).setCount(10).setTxID(101).build(); + StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction + transaction2 = StorageContainerDatanodeProtocolProtos. + DeletedBlocksTransaction.newBuilder(). + setContainerID(110).setCount(10).setTxID(102).build(); + StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction + transaction3 = StorageContainerDatanodeProtocolProtos. + DeletedBlocksTransaction.newBuilder(). + setContainerID(120).setCount(10).setTxID(103).build(); + dnTXs.add(transaction1); + dnTXs.add(transaction2); + dnTXs.add(transaction3); + SCMCommand deleteCommand = new DeleteBlocksCommand(dnTXs); + nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), deleteCommand); + + // Generate CommandStatusReports + StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto. + DeleteBlockTransactionResult txResult = + StorageContainerDatanodeProtocolProtos. + ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult. + newBuilder().setTxID(103).setContainerID(120). + setSuccess(true).build(); + + List + results = new ArrayList<>(); + results.add(txResult); + StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto + ackProto = StorageContainerDatanodeProtocolProtos. + ContainerBlocksDeletionACKProto.newBuilder().addAllResults(results). + setDnId(datanodeDetails.getUuidString()).build(); + + StorageContainerDatanodeProtocolProtos.CommandStatus cmdStatus = + StorageContainerDatanodeProtocolProtos.CommandStatus.newBuilder(). + setBlockDeletionAck(ackProto).setCmdId(deleteCommand.getId()). + setStatus(StorageContainerDatanodeProtocolProtos. + CommandStatus.Status.EXECUTED). + setType(StorageContainerDatanodeProtocolProtos.SCMCommandProto. + Type.deleteBlocksCommand).build(); + StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto + commandStatusReport = StorageContainerDatanodeProtocolProtos. + CommandStatusReportsProto.newBuilder(). + addCmdStatus(cmdStatus).build(); + + // Generate heartbeat + StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto + heartbeat = StorageContainerDatanodeProtocolProtos. + SCMHeartbeatRequestProto.newBuilder() + .setDatanodeDetails(datanodeDetails.getProtoBufMessage()) + .addCommandStatusReports(commandStatusReport).build(); + SCMDatanodeHeartbeatDispatcher dispatcher = scm. + getDatanodeProtocolServer().getHeartbeatDispatcher(); + List commands = dispatcher.dispatch(heartbeat); + for (SCMCommand command : commands) { + if (command instanceof DeleteBlocksCommand) { + DeleteBlocksCommand deleteBlocksCommand = + (DeleteBlocksCommand) command; + for (StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction + transaction: deleteBlocksCommand.blocksTobeDeleted()) { + assertTrue(transaction.getTxID() != 103); + } + } + } + } + } + /** * Tests that node manager handles layout version changes from heartbeats * correctly.