diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index f20d606d4365..1575195cfe9e 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1518,6 +1518,43 @@
+  <property>
+    <name>ozone.compaction.service.enabled</name>
+    <value>true</value>
+    <tag>OZONE, OM, PERFORMANCE</tag>
+    <description>
+      Enable or disable a background job that periodically compacts rocksdb tables flagged for compaction.
+    </description>
+  </property>
+  <property>
+    <name>ozone.om.compaction.service.run.interval</name>
+    <value>5m</value>
+    <tag>OZONE, OM, PERFORMANCE</tag>
+    <description>
+      A background job that periodically compacts rocksdb tables flagged for compaction.
+      Unit could be defined with postfix (ns,ms,s,m,h,d)
+    </description>
+  </property>
+  <property>
+    <name>ozone.om.compaction.service.timeout</name>
+    <value>10m</value>
+    <tag>OZONE, OM, PERFORMANCE</tag>
+    <description>A timeout value of compaction service. If this is set
+      greater than 0, the service will stop waiting for compaction
+      completion after this time.
+      Unit could be defined with postfix (ns,ms,s,m,h,d)
+    </description>
+  </property>
+  <property>
+    <name>ozone.om.compaction.service.threshold</name>
+    <value>10000</value>
+    <tag>OZONE, OM, PERFORMANCE</tag>
+    <description>
+      Compact rocksdb column families when the number of deleted keys
+      exceeds this number.
+    </description>
+  </property>
+
ozone.om.snapshot.rocksdb.metrics.enabled
false
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index 24676ac33b5f..1143db73658d 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -30,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.apache.hadoop.hdds.utils.TableCacheMetrics;
@@ -67,6 +68,7 @@ public class TypedTable implements Table {
private final CodecBuffer.Capacity bufferCapacity
= new CodecBuffer.Capacity(this, BUFFER_SIZE_DEFAULT);
private final TableCache cache;
+ private final AtomicLong uncompactedDeletes = new AtomicLong(0);
/**
* The same as this(rawTable, codecRegistry, keyType, valueType,
@@ -399,12 +401,14 @@ public void delete(KEY key) throws IOException {
} else {
rawTable.delete(encodeKey(key));
}
+    uncompactedDeletes.incrementAndGet();
}
@Override
public void deleteWithBatch(BatchOperation batch, KEY key)
throws IOException {
rawTable.deleteWithBatch(batch, encodeKey(key));
+    uncompactedDeletes.incrementAndGet();
}
@Override
@@ -412,6 +416,15 @@ public void deleteRange(KEY beginKey, KEY endKey) throws IOException {
rawTable.deleteRange(encodeKey(beginKey), encodeKey(endKey));
}
+ public AtomicLong getUncompactedDeletes() {
+ return uncompactedDeletes;
+ }
+
+
+ public void resetUncompactedDeletes() {
+ uncompactedDeletes.set(0);
+ }
+
@Override
public Table.KeyValueIterator iterator() throws IOException {
return iterator(null);
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 7e80766c7fe5..6db9961d567b 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -622,4 +622,21 @@ private OMConfigKeys() {
public static final String OZONE_OM_MAX_BUCKET =
"ozone.om.max.buckets";
public static final int OZONE_OM_MAX_BUCKET_DEFAULT = 100000;
+ /**
+ * Configuration properties for Compaction Service.
+ */
+ public static final String OZONE_COMPACTION_SERVICE_ENABLED = "ozone.compaction.service.enabled";
+ public static final boolean OZONE_COMPACTION_SERVICE_ENABLED_DEFAULT = true;
+ public static final String OZONE_OM_COMPACTION_SERVICE_RUN_INTERVAL =
+ "ozone.om.compaction.service.run.interval";
+ public static final long
+ OZONE_OM_COMPACTION_SERVICE_RUN_INTERVAL_DEFAULT
+ = TimeUnit.MINUTES.toMillis(5);
+
+ public static final String OZONE_OM_COMPACTION_SERVICE_TIMEOUT
+ = "ozone.om.compaction.service.timeout";
+ public static final String OZONE_COMPACTION_SERVICE_TIMEOUT_DEFAULT = "10m";
+ public static final String OZONE_OM_COMPACTION_SERVICE_THRESHOLD
+ = "ozone.om.compaction.service.threshold";
+ public static final String OZONE_COMPACTION_SERVICE_THRESHOLD_DEFAULT = "10000";
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index d25535b151d4..c2ae95071fe2 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
+import org.apache.hadoop.ozone.om.service.CompactionService;
import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
import org.apache.hadoop.ozone.om.service.KeyDeletingService;
import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
@@ -302,4 +303,10 @@ DeleteKeysResult getPendingDeletionSubFiles(long volumeId,
* @return Background service.
*/
SnapshotDirectoryCleaningService getSnapshotDirectoryService();
+
+ /**
+ * Returns the instance of Compaction service.
+ * @return Background service.
+ */
+ CompactionService getCompactionService();
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 4357542ff7b8..155ce524344d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -38,8 +38,16 @@
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConsts.ETAG;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_COMPACTION_SERVICE_ENABLED;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_COMPACTION_SERVICE_ENABLED_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_COMPACTION_SERVICE_THRESHOLD_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_COMPACTION_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_COMPACTION_SERVICE_RUN_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_COMPACTION_SERVICE_RUN_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_COMPACTION_SERVICE_THRESHOLD;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_COMPACTION_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_CLEANUP_SERVICE_TIMEOUT;
@@ -150,6 +158,7 @@
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils;
+import org.apache.hadoop.ozone.om.service.CompactionService;
import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
import org.apache.hadoop.ozone.om.service.KeyDeletingService;
import org.apache.hadoop.ozone.om.service.MultipartUploadCleanupService;
@@ -198,6 +207,7 @@ public class KeyManagerImpl implements KeyManager {
private BackgroundService multipartUploadCleanupService;
private SnapshotDirectoryCleaningService snapshotDirectoryCleaningService;
private DNSToSwitchMapping dnsToSwitchMapping;
+ private CompactionService compactionService;
public KeyManagerImpl(OzoneManager om, ScmClient scmClient,
OzoneConfiguration conf, OMPerformanceMetrics metrics) {
@@ -227,6 +237,9 @@ public KeyManagerImpl(OzoneManager om, ScmClient scmClient,
@Override
public void start(OzoneConfiguration configuration) {
+ boolean isCompactionServiceEnabled = configuration.getBoolean(OZONE_COMPACTION_SERVICE_ENABLED,
+ OZONE_COMPACTION_SERVICE_ENABLED_DEFAULT);
+ startCompactionService(configuration, isCompactionServiceEnabled);
boolean isSnapshotDeepCleaningEnabled = configuration.getBoolean(OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED,
OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED_DEFAULT);
if (keyDeletingService == null) {
@@ -363,6 +376,27 @@ public void start(OzoneConfiguration configuration) {
: new CachedDNSToSwitchMapping(newInstance));
}
+ private void startCompactionService(OzoneConfiguration configuration,
+ boolean isCompactionServiceEnabled) {
+ if (compactionService == null && isCompactionServiceEnabled) {
+ long compactionInterval = configuration.getTimeDuration(
+ OZONE_OM_COMPACTION_SERVICE_RUN_INTERVAL,
+ OZONE_OM_COMPACTION_SERVICE_RUN_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ long serviceTimeout = configuration.getTimeDuration(
+ OZONE_OM_COMPACTION_SERVICE_TIMEOUT,
+ OZONE_COMPACTION_SERVICE_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+      long compactionThreshold = configuration.getLong(
+          OZONE_OM_COMPACTION_SERVICE_THRESHOLD,
+          Long.parseLong(
+              OZONE_COMPACTION_SERVICE_THRESHOLD_DEFAULT));
+ compactionService = new CompactionService(ozoneManager, TimeUnit.MILLISECONDS,
+ compactionInterval, serviceTimeout, compactionThreshold);
+ compactionService.start();
+ }
+ }
+
KeyProviderCryptoExtension getKMSProvider() {
return kmsProvider;
}
@@ -397,6 +431,10 @@ public void stop() throws IOException {
snapshotDirectoryCleaningService.shutdown();
snapshotDirectoryCleaningService = null;
}
+ if (compactionService != null) {
+ compactionService.shutdown();
+ compactionService = null;
+ }
}
private OmBucketInfo getBucketInfo(String volumeName, String bucketName)
@@ -806,6 +844,11 @@ public SnapshotDirectoryCleaningService getSnapshotDirectoryService() {
return snapshotDirectoryCleaningService;
}
+ @Override
+ public CompactionService getCompactionService() {
+ return compactionService;
+ }
+
public boolean isSstFilteringSvcEnabled() {
long serviceInterval = ozoneManager.getConfiguration()
.getTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/CompactionService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/CompactionService.java
new file mode 100644
index 000000000000..69979eb309df
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/CompactionService.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_DIR_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DIRECTORY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase;
+import org.apache.hadoop.hdds.utils.db.TypedTable;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the background service to compact OM rocksdb tables.
+ */
+public class CompactionService extends BackgroundService {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CompactionService.class);
+
+ // Use only a single thread for Compaction.
+ private static final int COMPACTOR_THREAD_POOL_SIZE = 1;
+
+ private final OzoneManager ozoneManager;
+ private final OMMetadataManager omMetadataManager;
+ private final AtomicLong numCompactions;
+ private final AtomicBoolean suspended;
+ private final long compactionThreshold;
+ // list of tables that can be compacted
+  private static final List<String> COMPACTABLE_TABLES =
+ Arrays.asList(DELETED_DIR_TABLE, DELETED_TABLE, DIRECTORY_TABLE, FILE_TABLE, KEY_TABLE);
+
+ public CompactionService(OzoneManager ozoneManager, TimeUnit unit, long interval, long timeout,
+ long compactionThreshold) {
+ super("CompactionService", interval, unit,
+ COMPACTOR_THREAD_POOL_SIZE, timeout,
+ ozoneManager.getThreadNamePrefix());
+ this.ozoneManager = ozoneManager;
+ this.omMetadataManager = this.ozoneManager.getMetadataManager();
+
+ this.numCompactions = new AtomicLong(0);
+ this.suspended = new AtomicBoolean(false);
+ this.compactionThreshold = compactionThreshold;
+ }
+
+ /**
+ * Suspend the service (for testing).
+ */
+ @VisibleForTesting
+ public void suspend() {
+ suspended.set(true);
+ }
+
+ /**
+ * Resume the service if suspended (for testing).
+ */
+ @VisibleForTesting
+ public void resume() {
+ suspended.set(false);
+ }
+
+ /**
+ * Returns the number of manual compactions performed.
+ *
+ * @return long count.
+ */
+ @VisibleForTesting
+ public long getNumCompactions() {
+ return numCompactions.get();
+ }
+
+ @Override
+ public synchronized BackgroundTaskQueue getTasks() {
+ BackgroundTaskQueue queue = new BackgroundTaskQueue();
+
+ for (String tableName : COMPACTABLE_TABLES) {
+      TypedTable<?, ?> table = (TypedTable<?, ?>) omMetadataManager.getTable(tableName);
+ if (table.getUncompactedDeletes().get() > compactionThreshold) {
+ queue.add(new CompactTask(tableName));
+ table.resetUncompactedDeletes();
+ }
+ }
+ return queue;
+ }
+
+ private boolean shouldRun() {
+ return !suspended.get();
+ }
+
+ protected void compactFully(String tableName) throws IOException {
+ long startTime = Time.monotonicNow();
+ LOG.info("Compacting column family: {}", tableName);
+ try (ManagedCompactRangeOptions options = new ManagedCompactRangeOptions()) {
+ options.setBottommostLevelCompaction(
+ ManagedCompactRangeOptions.BottommostLevelCompaction.kForce);
+ // Find CF Handler
+ RocksDatabase rocksDatabase = ((RDBStore) omMetadataManager.getStore()).getDb();
+ RocksDatabase.ColumnFamily columnFamily = rocksDatabase.getColumnFamily(tableName);
+ rocksDatabase.compactRange(columnFamily, null, null, options);
+ LOG.info("Compaction of column family: {} completed in {} ms",
+ tableName, Time.monotonicNow() - startTime);
+ }
+ }
+
+ private class CompactTask implements BackgroundTask {
+ private final String tableName;
+
+ CompactTask(String tableName) {
+ this.tableName = tableName;
+ }
+
+ @Override
+ public int getPriority() {
+ return 0;
+ }
+
+ @Override
+ public BackgroundTaskResult call() throws Exception {
+ // trigger full compaction for the specified table.
+ if (!shouldRun()) {
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+ LOG.debug("Running CompactTask");
+
+ compactFully(tableName);
+ numCompactions.incrementAndGet();
+ return () -> 1;
+ }
+ }
+
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMDirectoriesPurgeRequestAndResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMDirectoriesPurgeRequestAndResponse.java
index e1ae9ef3ccd9..cb6ff02049d9 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMDirectoriesPurgeRequestAndResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMDirectoriesPurgeRequestAndResponse.java
@@ -17,6 +17,9 @@
package org.apache.hadoop.ozone.om.request.key;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_DIR_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DIRECTORY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -29,6 +32,7 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.TypedTable;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.ClientVersion;
@@ -52,7 +56,6 @@
* Tests {@link OMKeyPurgeRequest} and {@link OMKeyPurgeResponse}.
*/
public class TestOMDirectoriesPurgeRequestAndResponse extends TestOMKeyRequest {
-
private int numKeys = 10;
/**
@@ -185,6 +188,7 @@ private OMRequest preExecute(OMRequest originalOmRequest) throws IOException {
@Test
public void testValidateAndUpdateCacheCheckQuota() throws Exception {
+ keyManager.start(getOzoneConfiguration());
// Create and Delete keys. The keys should be moved to DeletedKeys table
List deletedKeyInfos = createAndDeleteKeys(1, null);
// The keys should be present in the DeletedKeys table before purging
@@ -216,6 +220,7 @@ public void testValidateAndUpdateCacheCheckQuota() throws Exception {
@Test
public void testValidateAndUpdateCacheSnapshotLastTransactionInfoUpdated() throws Exception {
+ keyManager.start(getOzoneConfiguration());
// Create and Delete keys. The keys should be moved to DeletedKeys table
List deletedKeyInfos = createAndDeleteKeys(1, null);
// The keys should be present in the DeletedKeys table before purging
@@ -260,7 +265,7 @@ public void testValidateAndUpdateCacheSnapshotLastTransactionInfoUpdated() throw
performBatchOperationCommit(omClientResponse);
// The keys should exist in the DeletedKeys table after dir delete
- validateDeletedKeys(rcOmSnapshot.get().getMetadataManager(), deletedKeyNames);
+ validateDeletedKeys(rcOmSnapshot.get().getMetadataManager(), deletedKeyNames, true);
snapshotInfoOnDisk = omMetadataManager.getSnapshotInfoTable().getSkipCache(snapshotInfo.getTableKey());
assertEquals(snapshotInfo, snapshotInfoOnDisk);
rcOmSnapshot.close();
@@ -269,6 +274,7 @@ public void testValidateAndUpdateCacheSnapshotLastTransactionInfoUpdated() throw
@Test
public void testValidateAndUpdateCacheQuotaBucketRecreated()
throws Exception {
+ keyManager.start(getOzoneConfiguration());
// Create and Delete keys. The keys should be moved to DeletedKeys table
List deletedKeyInfos = createAndDeleteKeys(1, null);
// The keys should be present in the DeletedKeys table before purging
@@ -343,9 +349,22 @@ private List validateDeletedKeysTable(OMMetadataManager omMetadataManage
private void validateDeletedKeys(OMMetadataManager omMetadataManager,
List deletedKeyNames) throws IOException {
+ validateDeletedKeys(omMetadataManager, deletedKeyNames, false);
+ }
+
+ private void validateDeletedKeys(OMMetadataManager omMetadataManager,
+ List deletedKeyNames, boolean isSnapshot) throws IOException {
for (String deletedKey : deletedKeyNames) {
assertTrue(omMetadataManager.getDeletedTable().isExist(
deletedKey));
}
+ // If it's deleted from a snapshot, it doesn't affect fileTable.
+ long expectedUncompactedDeletes = isSnapshot ? 0 : deletedKeyNames.size();
+ assertEquals(expectedUncompactedDeletes,
+ ((TypedTable)this.omMetadataManager.getTable(FILE_TABLE)).getUncompactedDeletes().get());
+ assertEquals(0,
+ ((TypedTable)this.omMetadataManager.getTable(DELETED_DIR_TABLE)).getUncompactedDeletes().get());
+ assertEquals(0,
+ ((TypedTable)this.omMetadataManager.getTable(DIRECTORY_TABLE)).getUncompactedDeletes().get());
}
}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
index 46f442c45e2f..d381a4d2de41 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.om.request.key;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -28,6 +29,7 @@
import java.util.UUID;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.TypedTable;
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
@@ -122,6 +124,7 @@ private OMRequest preExecute(OMRequest originalOmRequest) throws IOException {
@Test
public void testValidateAndUpdateCache() throws Exception {
+ keyManager.start(getOzoneConfiguration());
// Create and Delete keys. The keys should be moved to DeletedKeys table
List deletedKeyNames = createAndDeleteKeys(1, null);
@@ -165,6 +168,7 @@ public void testValidateAndUpdateCache() throws Exception {
@Test
public void testKeyPurgeInSnapshot() throws Exception {
+ keyManager.start(getOzoneConfiguration());
// Create and Delete keys. The keys should be moved to DeletedKeys table
List deletedKeyNames = createAndDeleteKeys(1, null);
@@ -211,8 +215,11 @@ public void testKeyPurgeInSnapshot() throws Exception {
try (BatchOperation batchOperation =
omMetadataManager.getStore().initBatchOperation()) {
- OMKeyPurgeResponse omKeyPurgeResponse = new OMKeyPurgeResponse(omResponse, deletedKeyNames, snapInfo, null);
+ OMKeyPurgeResponse omKeyPurgeResponse = new OMKeyPurgeResponse(
+ omResponse, deletedKeyNames, snapInfo, null);
omKeyPurgeResponse.addToDBBatch(omMetadataManager, batchOperation);
+ assertEquals(deletedKeyNames.size(),
+ ((TypedTable)this.omMetadataManager.getTable(DELETED_TABLE)).getUncompactedDeletes().get());
// Do manual commit and see whether addToBatch is successful or not.
omMetadataManager.getStore().commitBatchOperation(batchOperation);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestCompactionService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestCompactionService.java
new file mode 100644
index 000000000000..0f30f0d88611
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestCompactionService.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_COMPACTION_SERVICE_RUN_INTERVAL;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
+import org.apache.hadoop.hdds.utils.db.TypedTable;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@TestMethodOrder(OrderAnnotation.class)
+class TestCompactionService {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestCompactionService.class);
+
+ private static final int SERVICE_INTERVAL = 1;
+ private static final int WAIT_TIME = (int) Duration.ofSeconds(10).toMillis();
+
+ @BeforeAll
+ void setup(@TempDir Path tempDir) throws Exception {
+ ExitUtils.disableSystemExit();
+
+ OzoneConfiguration conf = new OzoneConfiguration();
+ System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
+ ServerUtils.setOzoneMetaDirPath(conf, tempDir.toString());
+ conf.setTimeDuration(OZONE_OM_COMPACTION_SERVICE_RUN_INTERVAL,
+ SERVICE_INTERVAL, TimeUnit.MILLISECONDS);
+ conf.setQuietMode(false);
+ }
+
+ /**
+ * Add a compaction request and verify that it is processed.
+ *
+ * @throws IOException - on Failure.
+ */
+ @Timeout(300)
+ @Test
+ public void testCompact() throws Exception {
+ OzoneManager ozoneManager = mock(OzoneManager.class);
+ OMMetadataManager metadataManager = mock(OMMetadataManager.class);
+
+ TypedTable table = mock(TypedTable.class);
+ AtomicLong numDeletes = new AtomicLong(2);
+
+ when(ozoneManager.getMetadataManager()).thenReturn(metadataManager);
+ when(metadataManager.getTable(anyString())).thenReturn(table);
+ when(table.getUncompactedDeletes()).thenReturn(numDeletes);
+
+ CompactionService compactionService = new CompactionService(ozoneManager, TimeUnit.MILLISECONDS,
+ TimeUnit.SECONDS.toMillis(SERVICE_INTERVAL), TimeUnit.SECONDS.toMillis(60), 1) {
+
+ @Override
+ public void compactFully(String tableName) throws IOException {
+ LOG.info("Compacting column family: {}", tableName);
+ }
+ };
+ compactionService.start();
+
+ compactionService.suspend();
+ // wait for submitted tasks to complete
+ Thread.sleep(SERVICE_INTERVAL);
+ final long oldkeyCount = compactionService.getNumCompactions();
+ LOG.info("oldkeyCount={}", oldkeyCount);
+
+ final int compactionTriggered = 1;
+
+ compactionService.resume();
+
+ GenericTestUtils.waitFor(
+ () -> compactionService.getNumCompactions() >= oldkeyCount + compactionTriggered,
+ SERVICE_INTERVAL, WAIT_TIME);
+ }
+
+}