2424import static org .apache .hadoop .hdds .scm .pipeline .MockPipeline .createPipeline ;
2525import static org .apache .hadoop .hdds .scm .storage .ContainerProtocolCalls .createContainer ;
2626import static org .apache .ozone .test .GenericTestUtils .waitFor ;
27+ import static org .junit .jupiter .api .Assertions .assertEquals ;
2728
2829import com .google .common .collect .ImmutableList ;
2930import java .io .IOException ;
3435import java .util .concurrent .atomic .AtomicLong ;
3536import java .util .function .ToLongFunction ;
3637import java .util .stream .IntStream ;
38+ import java .util .stream .Stream ;
39+ import org .apache .hadoop .hdds .client .BlockID ;
3740import org .apache .hadoop .hdds .conf .OzoneConfiguration ;
41+ import org .apache .hadoop .hdds .conf .StorageUnit ;
3842import org .apache .hadoop .hdds .protocol .DatanodeDetails ;
3943import org .apache .hadoop .hdds .protocol .DatanodeDetails .Port ;
44+ import org .apache .hadoop .hdds .protocol .datanode .proto .ContainerProtos ;
45+ import org .apache .hadoop .hdds .scm .ScmConfigKeys ;
4046import org .apache .hadoop .hdds .scm .XceiverClientFactory ;
4147import org .apache .hadoop .hdds .scm .XceiverClientManager ;
4248import org .apache .hadoop .hdds .scm .XceiverClientSpi ;
4349import org .apache .hadoop .hdds .scm .container .replication .ReplicationManager .ReplicationManagerConfiguration ;
4450import org .apache .hadoop .hdds .utils .IOUtils ;
4551import org .apache .hadoop .ozone .HddsDatanodeService ;
4652import org .apache .hadoop .ozone .MiniOzoneCluster ;
53+ import org .apache .hadoop .ozone .OzoneConfigKeys ;
54+ import org .apache .hadoop .ozone .container .ContainerTestHelper ;
55+ import org .apache .hadoop .ozone .container .common .interfaces .Container ;
4756import org .apache .hadoop .ozone .container .common .statemachine .DatanodeStateMachine ;
4857import org .apache .hadoop .ozone .container .common .statemachine .StateContext ;
4958import org .apache .hadoop .ozone .protocol .commands .ReplicateContainerCommand ;
59+ import org .apache .hadoop .test .GenericTestUtils ;
60+ import org .apache .ozone .test .GenericTestUtils .LogCapturer ;
5061import org .junit .jupiter .api .AfterAll ;
5162import org .junit .jupiter .api .BeforeAll ;
5263import org .junit .jupiter .api .Test ;
5364import org .junit .jupiter .params .ParameterizedTest ;
65+ import org .junit .jupiter .params .provider .Arguments ;
5466import org .junit .jupiter .params .provider .EnumSource ;
67+ import org .junit .jupiter .params .provider .MethodSource ;
5568
5669/**
5770 * Tests ozone containers replication.
@@ -157,6 +170,68 @@ void pushUnknownContainer() throws Exception {
157170 ReplicationSupervisor ::getReplicationFailureCount );
158171 }
159172
173+ /**
174+ * Provides stream of different container sizes for tests.
175+ */
176+ public static Stream <Arguments > sizeProvider () {
177+ return Stream .of (
178+ Arguments .of ("Normal 2MB" , 2L * 1024L * 1024L ),
179+ Arguments .of ("Overallocated 6MB" , 6L * 1024L * 1024L )
180+ );
181+ }
182+
183+ /**
184+ * Tests push replication of a container with over-allocated size.
185+ * The target datanode will need to reserve double the container size,
186+ * which is greater than the configured max container size.
187+ */
188+ @ ParameterizedTest (name = "for {0}" )
189+ @ MethodSource ("sizeProvider" )
190+ void testPushWithOverAllocatedContainer (String testName , Long containerSize )
191+ throws Exception {
192+ LogCapturer grpcLog = LogCapturer .captureLogs (GrpcContainerUploader .class );
193+ LogCapturer containerImporterLog = LogCapturer .captureLogs (ContainerImporter .class );
194+
195+ DatanodeDetails source = cluster .getHddsDatanodes ().get (0 )
196+ .getDatanodeDetails ();
197+
198+ long containerID = createOverAllocatedContainer (source , containerSize );
199+
200+ DatanodeDetails target = selectOtherNode (source );
201+
202+ // Get the original container size from source
203+ Container <?> sourceContainer = getContainer (source , containerID );
204+ long originalSize = sourceContainer .getContainerData ().getBytesUsed ();
205+
206+ // Verify container is created with expected size
207+ assertEquals (originalSize , containerSize );
208+
209+ // Create replication command to push container to target
210+ ReplicateContainerCommand cmd =
211+ ReplicateContainerCommand .toTarget (containerID , target );
212+
213+ // Execute push replication
214+ queueAndWaitForCompletion (cmd , source ,
215+ ReplicationSupervisor ::getReplicationSuccessCount );
216+
217+ GenericTestUtils .waitFor (() -> {
218+ String grpcLogs = grpcLog .getOutput ();
219+ String containerImporterLogOutput = containerImporterLog .getOutput ();
220+
221+ return grpcLogs .contains ("Starting upload of container " +
222+ containerID + " to " + target + " with size " + originalSize ) &&
223+ containerImporterLogOutput .contains ("Choosing volume to reserve space: " +
224+ originalSize * 2 );
225+ }, 100 , 1000 );
226+
227+ // Verify container was successfully replicated to target
228+ Container <?> targetContainer = getContainer (target , containerID );
229+ long replicatedSize = targetContainer .getContainerData ().getBytesUsed ();
230+
231+ // verify sizes match exactly
232+ assertEquals (originalSize , replicatedSize );
233+ }
234+
160235 /**
161236 * Queues {@code cmd} in {@code dn}'s state machine, and waits until the
162237 * command is completed, as indicated by {@code counter} having been
@@ -194,6 +269,8 @@ private static OzoneConfiguration createConfiguration() {
194269 OzoneConfiguration conf = new OzoneConfiguration ();
195270 conf .setTimeDuration (OZONE_SCM_STALENODE_INTERVAL , 3 , TimeUnit .SECONDS );
196271 conf .setTimeDuration (OZONE_SCM_DEADNODE_INTERVAL , 6 , TimeUnit .SECONDS );
272+ conf .setStorageSize (ScmConfigKeys .OZONE_SCM_CONTAINER_SIZE , 5 , StorageUnit .MB );
273+ conf .setStorageSize (OzoneConfigKeys .OZONE_SCM_BLOCK_SIZE , 1 , StorageUnit .MB );
197274
198275 ReplicationManagerConfiguration repConf =
199276 conf .getObject (ReplicationManagerConfiguration .class );
@@ -212,4 +289,71 @@ private static long createNewClosedContainer(DatanodeDetails dn)
212289 }
213290 }
214291
292+ private static long createOverAllocatedContainer (DatanodeDetails dn , Long targetDataSize ) throws Exception {
293+ long containerID = CONTAINER_ID .incrementAndGet ();
294+ try (XceiverClientSpi client = clientFactory .acquireClient (
295+ createPipeline (singleton (dn )))) {
296+
297+ // Create the container
298+ createContainer (client , containerID , null );
299+
300+ int chunkSize = 1 * 1024 * 1024 ; // 1MB chunks
301+ long totalBytesWritten = 0 ;
302+
303+ // Write data in chunks until we reach target size
304+ while (totalBytesWritten < targetDataSize ) {
305+ BlockID blockID = ContainerTestHelper .getTestBlockID (containerID );
306+
307+ // Calculate remaining bytes and adjust chunk size if needed
308+ long remainingBytes = targetDataSize - totalBytesWritten ;
309+ int currentChunkSize = (int ) Math .min (chunkSize , remainingBytes );
310+
311+ // Create a write chunk request with current chunk size
312+ ContainerProtos .ContainerCommandRequestProto writeChunkRequest =
313+ ContainerTestHelper .getWriteChunkRequest (
314+ createPipeline (singleton (dn )), blockID , currentChunkSize );
315+
316+ // Send write chunk command
317+ client .sendCommand (writeChunkRequest );
318+
319+ // Create and send put block command
320+ ContainerProtos .ContainerCommandRequestProto putBlockRequest =
321+ ContainerTestHelper .getPutBlockRequest (writeChunkRequest );
322+ client .sendCommand (putBlockRequest );
323+
324+ totalBytesWritten += currentChunkSize ;
325+ }
326+
327+ // Close the container
328+ ContainerProtos .CloseContainerRequestProto closeRequest =
329+ ContainerProtos .CloseContainerRequestProto .newBuilder ().build ();
330+ ContainerProtos .ContainerCommandRequestProto closeContainerRequest =
331+ ContainerProtos .ContainerCommandRequestProto .newBuilder ()
332+ .setCmdType (ContainerProtos .Type .CloseContainer )
333+ .setContainerID (containerID )
334+ .setCloseContainer (closeRequest )
335+ .setDatanodeUuid (dn .getUuidString ())
336+ .build ();
337+ client .sendCommand (closeContainerRequest );
338+
339+ return containerID ;
340+ }
341+ }
342+
343+ /**
344+ * Gets the container from the specified datanode.
345+ */
346+ private Container <?> getContainer (DatanodeDetails datanode , long containerID ) {
347+ for (HddsDatanodeService datanodeService : cluster .getHddsDatanodes ()) {
348+ if (datanode .equals (datanodeService .getDatanodeDetails ())) {
349+ Container <?> container = datanodeService .getDatanodeStateMachine ().getContainer ()
350+ .getContainerSet ().getContainer (containerID );
351+ if (container != null ) {
352+ return container ;
353+ }
354+ }
355+ }
356+ throw new AssertionError ("Container " + containerID + " not found on " + datanode );
357+ }
358+
215359}
0 commit comments