HDDS-360. Use RocksDBStore and TableStore for SCM Metadata.
author Nanda kumar <nanda@apache.org>
Tue, 12 Feb 2019 08:55:14 +0000 (14:25 +0530)
committer Nanda kumar <nanda@apache.org>
Tue, 12 Feb 2019 08:56:19 +0000 (14:26 +0530)
Contributed by Anu Engineer.

30 files changed:
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStoreIterator.java
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TableIterator.java
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/DeletedBlocksTransactionCodec.java [new file with mode: 0644]
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/LongCodec.java [new file with mode: 0644]
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java [new file with mode: 0644]
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreRDBImpl.java [new file with mode: 0644]
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/package-info.java [new file with mode: 0644]
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java [new file with mode: 0644]
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorageConfig.java [moved from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorage.java with 89% similarity]
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java
hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/scm/package-info.java [new file with mode: 0644]

index 1d696f8..ca319f6 100644
@@ -115,6 +115,7 @@ public final class OzoneConsts {
   public static final String DELETED_BLOCK_DB = "deletedBlock.db";
   public static final String OM_DB_NAME = "om.db";
   public static final String OZONE_MANAGER_TOKEN_DB_NAME = "om-token.db";
+  public static final String SCM_DB_NAME = "scm.db";
 
   public static final String STORAGE_DIR_CHUNKS = "chunks";
 
index 1faa089..ecc065b 100644
@@ -41,7 +41,7 @@ public class RDBStoreIterator
   @Override
   public void forEachRemaining(
       Consumer<? super ByteArrayKeyValue> action) {
-    while(hasNext()) {
+    while (hasNext()) {
       action.accept(next());
     }
   }
@@ -56,7 +56,7 @@ public class RDBStoreIterator
     if (rocksDBIterator.isValid()) {
       ByteArrayKeyValue value =
           ByteArrayKeyValue.create(rocksDBIterator.key(), rocksDBIterator
-          .value());
+              .value());
       rocksDBIterator.next();
       return value;
     }
@@ -84,6 +84,23 @@ public class RDBStoreIterator
   }
 
   @Override
+  public byte[] key() {
+    if (rocksDBIterator.isValid()) {
+      return rocksDBIterator.key();
+    }
+    return null;
+  }
+
+  @Override
+  public ByteArrayKeyValue value() {
+    if (rocksDBIterator.isValid()) {
+      return ByteArrayKeyValue.create(rocksDBIterator.key(),
+          rocksDBIterator.value());
+    }
+    return null;
+  }
+
+  @Override
   public void close() throws IOException {
     rocksDBIterator.close();
   }
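
The new key() and value() accessors return null whenever the underlying RocksDB cursor is not positioned on a valid entry, so callers should null-check instead of expecting an exception. A minimal sketch of that contract (not part of this patch; the helper and variable names are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.utils.db.Table;
    import org.apache.hadoop.utils.db.TableIterator;

    final class LastKeyExample {
      // Returns the last raw key in the table, or null if the table is empty.
      static byte[] lastKey(Table<byte[], byte[]> table) throws IOException {
        try (TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>>
                 iter = table.iterator()) {
          iter.seekToLast();  // leaves the cursor invalid on an empty table
          return iter.key();  // null on an invalid cursor, per this change
        }
      }
    }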
index 071dbf4..fcd8535 100644
@@ -47,4 +47,16 @@ public interface TableIterator<KEY, T> extends Iterator<T>, Closeable {
    */
   T seek(KEY key);
 
+  /**
+   * Returns the key value at the current position.
+   * @return KEY
+   */
+  KEY key();
+
+  /**
+   * Returns the VALUE at the current position.
+   * @return VALUE
+   */
+  T value();
+
 }
index d0a33fd..f715572 100644
@@ -140,12 +140,16 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
 
     private TableIterator<byte[], ? extends KeyValue<byte[], byte[]>>
         rawIterator;
+    private final Class<KEY> keyClass;
+    private final Class<VALUE> valueClass;
 
     public TypedTableIterator(
         TableIterator<byte[], ? extends KeyValue<byte[], byte[]>> rawIterator,
         Class<KEY> keyType,
         Class<VALUE> valueType) {
       this.rawIterator = rawIterator;
+      keyClass = keyType;
+      valueClass = valueType;
     }
 
     @Override
@@ -169,6 +173,24 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
     }
 
     @Override
+    public KEY key() {
+      byte[] result = rawIterator.key();
+      if (result == null) {
+        return null;
+      }
+      return codecRegistry.asObject(result, keyClass);
+    }
+
+    @Override
+    public TypedKeyValue value() {
+      KeyValue keyValue = rawIterator.value();
+      if (keyValue != null) {
+        return new TypedKeyValue(keyValue, keyClass, valueClass);
+      }
+      return null;
+    }
+
+    @Override
     public void close() throws IOException {
       rawIterator.close();
     }
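
On TypedTable the same cursor yields decoded objects: key() deserializes the raw bytes through the codec registry and value() wraps the raw pair in a TypedKeyValue, both returning null past the end. A hedged iteration sketch, assuming txTable is a Table<Long, DeletedBlocksTransaction> like the one this patch creates for SCM:

    // Sketch: walk a typed table and read decoded values, mirroring the
    // pattern DeletedBlockLogImpl uses later in this patch.
    try (TableIterator<Long,
        ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
            txTable.iterator()) {
      while (iter.hasNext()) {
        DeletedBlocksTransaction tx = iter.next().getValue();
        // inspect tx.getTxID(), tx.getCount(), etc.
      }
    }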
index 43b4452..5b17de9 100644
 
 package org.apache.hadoop.hdds.scm;
 
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
 import org.apache.hadoop.hdds.scm.chillmode.Precheck;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Collection;
 
 /**
  * SCM utility class.
  */
 public final class ScmUtils {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(ScmUtils.class);
 
   private ScmUtils() {
   }
@@ -42,4 +53,30 @@ public final class ScmUtils {
       preCheck.check(operation);
     }
   }
+
+  public static File getDBPath(Configuration conf, String dbDirectory) {
+    final Collection<String> dbDirs =
+        conf.getTrimmedStringCollection(dbDirectory);
+
+    if (dbDirs.size() > 1) {
+      throw new IllegalArgumentException(
+          "Bad configuration setting " + dbDirectory
+              + ". Ozone does not support multiple metadata dirs currently.");
+    }
+
+    if (dbDirs.size() == 1) {
+      final File dbDirPath = new File(dbDirs.iterator().next());
+      if (!dbDirPath.exists() && !dbDirPath.mkdirs()) {
+        throw new IllegalArgumentException(
+            "Unable to create directory " + dbDirPath
+                + " specified in configuration setting " + dbDirectory);
+      }
+      return dbDirPath;
+    }
+
+    LOG.warn("{} is not configured. We recommend adding this setting. "
+            + "Falling back to {} instead.", dbDirectory,
+        HddsConfigKeys.OZONE_METADATA_DIRS);
+    return ServerUtils.getOzoneMetaDirPath(conf);
+  }
 }
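
A short usage sketch for the new helper; the configuration key below is hypothetical and chosen only for illustration:

    // Sketch: resolve a DB directory from config; getDBPath falls back to
    // HddsConfigKeys.OZONE_METADATA_DIRS when the key is not set.
    Configuration conf = new OzoneConfiguration();
    File dbDir = ScmUtils.getDBPath(conf, "ozone.scm.db.dirs");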
index c4ce69b..a4757ee 100644
  */
 package org.apache.hadoop.hdds.scm.block;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import javax.management.ObjectName;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ContainerBlockID;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.ScmUtils;
 import org.apache.hadoop.hdds.scm.chillmode.ChillModePrecheck;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.management.ObjectName;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
-    .INVALID_BLOCK_SIZE;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.INVALID_BLOCK_SIZE;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
 
 /** Block Manager manages the block access for SCM. */
 public class BlockManagerImpl implements EventHandler<Boolean>,
@@ -85,18 +80,14 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
    * Constructor.
    *
    * @param conf - configuration.
-   * @param nodeManager - node manager.
-   * @param pipelineManager - pipeline manager.
-   * @param containerManager - container manager.
-   * @param eventPublisher - event publisher.
+   * @param scm - the StorageContainerManager instance.
    * @throws IOException
    */
-  public BlockManagerImpl(final Configuration conf,
-      final NodeManager nodeManager, final PipelineManager pipelineManager,
-      final ContainerManager containerManager, EventPublisher eventPublisher)
+  public BlockManagerImpl(final Configuration conf, StorageContainerManager scm)
       throws IOException {
-    this.pipelineManager = pipelineManager;
-    this.containerManager = containerManager;
+    Objects.requireNonNull(scm, "SCM cannot be null");
+    this.pipelineManager = scm.getPipelineManager();
+    this.containerManager = scm.getContainerManager();
 
     this.containerSize = (long)conf.getStorageSize(
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
@@ -106,7 +97,8 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
     mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
 
     // SCM block deleting transaction log and deleting service.
-    deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager);
+    deletedBlockLog = new DeletedBlockLogImpl(conf, scm.getContainerManager(),
+        scm.getScmMetadataStore());
     long svcInterval =
         conf.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
             OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT,
@@ -118,7 +110,8 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
             TimeUnit.MILLISECONDS);
     blockDeletingService =
         new SCMBlockDeletingService(deletedBlockLog, containerManager,
-            nodeManager, eventPublisher, svcInterval, serviceTimeout, conf);
+            scm.getScmNodeManager(), scm.getEventQueue(), svcInterval,
+            serviceTimeout, conf);
     chillModePrecheck = new ChillModePrecheck(conf);
   }
 
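
Construction is correspondingly simpler for callers: instead of wiring four collaborators, the block manager now pulls its pipeline, container, and node managers plus the event queue out of the SCM instance. A hedged sketch, assuming scm is a fully constructed StorageContainerManager:

    // Sketch: the new two-argument constructor introduced by this patch.
    BlockManager scmBlockManager = new BlockManagerImpl(conf, scm);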
index 766d428..5ff34f5 100644
  */
 package org.apache.hadoop.hdds.scm.block;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
-import com.google.common.primitives.Longs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
-    .DeleteBlockTransactionResult;
-import org.apache.hadoop.hdds.scm.command
-    .CommandStatusReportHandler.DeleteBlockStatus;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerManager;
-import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.server.ServerUtils;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataStoreBuilder;
-import org.eclipse.jetty.util.ConcurrentHashSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -60,17 +29,29 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.utils.db.BatchOperation;
+import org.apache.hadoop.utils.db.Table;
+import org.apache.hadoop.utils.db.TableIterator;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static java.lang.Math.min;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_DB_CACHE_SIZE_MB;
-import static org.apache.hadoop.ozone.OzoneConsts.DELETED_BLOCK_DB;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
 
 /**
 * An implementation class of {@link DeletedBlockLog}, and it uses
@@ -88,40 +69,21 @@ public class DeletedBlockLogImpl
   public static final Logger LOG =
       LoggerFactory.getLogger(DeletedBlockLogImpl.class);
 
-  private static final byte[] LATEST_TXID =
-      DFSUtil.string2Bytes("#LATEST_TXID#");
-
   private final int maxRetry;
-  private final MetadataStore deletedStore;
   private final ContainerManager containerManager;
+  private final SCMMetadataStore scmMetadataStore;
   private final Lock lock;
-  // The latest id of deleted blocks in the db.
-  private long lastTxID;
   // Maps txId to set of DNs which are successful in committing the transaction
   private Map<Long, Set<UUID>> transactionToDNsCommitMap;
 
   public DeletedBlockLogImpl(Configuration conf,
-      ContainerManager containerManager) throws IOException {
+                             ContainerManager containerManager,
+                             SCMMetadataStore scmMetadataStore) {
     maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
         OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
-
-    final File metaDir = ServerUtils.getScmDbDir(conf);
-    final String scmMetaDataDir = metaDir.getPath();
-    final File deletedLogDbPath = new File(scmMetaDataDir, DELETED_BLOCK_DB);
-    final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
-        OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
-    // Load store of all transactions.
-    deletedStore = MetadataStoreBuilder.newBuilder()
-        .setCreateIfMissing(true)
-        .setConf(conf)
-        .setDbFile(deletedLogDbPath)
-        .setCacheSize(cacheSize * OzoneConsts.MB)
-        .build();
     this.containerManager = containerManager;
-
+    this.scmMetadataStore = scmMetadataStore;
     this.lock = new ReentrantLock();
-    // start from the head of deleted store.
-    lastTxID = findLatestTxIDInStore();
 
     // transactionToDNsCommitMap is updated only when
     // transaction is added to the log and when it is removed.
@@ -130,26 +92,6 @@ public class DeletedBlockLogImpl
     transactionToDNsCommitMap = new ConcurrentHashMap<>();
   }
 
-  @VisibleForTesting
-  public MetadataStore getDeletedStore() {
-    return deletedStore;
-  }
-
-  /**
-   * There is no need to lock before reading because
-   * it's only used in construct method.
-   *
-   * @return latest txid.
-   * @throws IOException
-   */
-  private long findLatestTxIDInStore() throws IOException {
-    long txid = 0;
-    byte[] value = deletedStore.get(LATEST_TXID);
-    if (value != null) {
-      txid = Longs.fromByteArray(value);
-    }
-    return txid;
-  }
 
   @Override
   public List<DeletedBlocksTransaction> getFailedTransactions()
@@ -157,16 +99,16 @@ public class DeletedBlockLogImpl
     lock.lock();
     try {
       final List<DeletedBlocksTransaction> failedTXs = Lists.newArrayList();
-      deletedStore.iterate(null, (key, value) -> {
-        if (!Arrays.equals(LATEST_TXID, key)) {
-          DeletedBlocksTransaction delTX =
-              DeletedBlocksTransaction.parseFrom(value);
+      try (TableIterator<Long,
+          ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
+               scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
+        while (iter.hasNext()) {
+          DeletedBlocksTransaction delTX = iter.next().getValue();
           if (delTX.getCount() == -1) {
             failedTXs.add(delTX);
           }
         }
-        return true;
-      });
+      }
       return failedTXs;
     } finally {
       lock.unlock();
@@ -181,44 +123,44 @@ public class DeletedBlockLogImpl
    */
   @Override
   public void incrementCount(List<Long> txIDs) throws IOException {
-    BatchOperation batch = new BatchOperation();
-    lock.lock();
-    try {
-      for(Long txID : txIDs) {
-        try {
-          byte[] deleteBlockBytes =
-              deletedStore.get(Longs.toByteArray(txID));
-          if (deleteBlockBytes == null) {
-            LOG.warn("Delete txID {} not found", txID);
-            continue;
-          }
-          DeletedBlocksTransaction block = DeletedBlocksTransaction
-              .parseFrom(deleteBlockBytes);
-          DeletedBlocksTransaction.Builder builder = block.toBuilder();
-          int currentCount = block.getCount();
-          if (currentCount > -1) {
-            builder.setCount(++currentCount);
-          }
-          // if the retry time exceeds the maxRetry value
-          // then set the retry value to -1, stop retrying, admins can
-          // analyze those blocks and purge them manually by SCMCli.
-          if (currentCount > maxRetry) {
-            builder.setCount(-1);
-          }
-          deletedStore.put(Longs.toByteArray(txID),
-              builder.build().toByteArray());
-        } catch (IOException ex) {
-          LOG.warn("Cannot increase count for txID " + txID, ex);
+    for (Long txID : txIDs) {
+      lock.lock();
+      try {
+        DeletedBlocksTransaction block =
+            scmMetadataStore.getDeletedBlocksTXTable().get(txID);
+        if (block == null) {
+          // Should we make this an error? How can we not find the deleted
+          // TXID?
+          LOG.warn("Deleted TXID {} not found.", txID);
+          continue;
+        }
+        DeletedBlocksTransaction.Builder builder = block.toBuilder();
+        int currentCount = block.getCount();
+        if (currentCount > -1) {
+          builder.setCount(++currentCount);
+        }
+        // if the retry time exceeds the maxRetry value
+        // then set the retry value to -1, stop retrying, admins can
+        // analyze those blocks and purge them manually by SCMCli.
+        if (currentCount > maxRetry) {
+          builder.setCount(-1);
         }
+        scmMetadataStore.getDeletedBlocksTXTable().put(txID,
+            builder.build());
+      } catch (IOException ex) {
+        LOG.warn("Cannot increase count for txID " + txID, ex);
+        // We do not throw error here, since we don't want to abort the loop.
+        // Just log and continue processing the rest of txids.
+      } finally {
+        lock.unlock();
       }
-      deletedStore.writeBatch(batch);
-    } finally {
-      lock.unlock();
     }
   }
 
+
   private DeletedBlocksTransaction constructNewTransaction(long txID,
-      long containerID, List<Long> blocks) {
+                                                           long containerID,
+                                                           List<Long> blocks) {
     return DeletedBlocksTransaction.newBuilder()
         .setTxID(txID)
         .setContainerID(containerID)
@@ -231,7 +173,8 @@ public class DeletedBlockLogImpl
    * {@inheritDoc}
    *
    * @param transactionResults - transaction IDs.
-   * @param dnID - Id of Datanode which has acknowledged a delete block command.
+   * @param dnID               - Id of Datanode which has acknowledged
+   *                           a delete block command.
    * @throws IOException
    */
   @Override
@@ -259,8 +202,8 @@ public class DeletedBlockLogImpl
           }
 
           dnsWithCommittedTxn.add(dnID);
-          final ContainerInfo container = containerManager
-              .getContainer(containerId);
+          final ContainerInfo container =
+              containerManager.getContainer(containerId);
           final Set<ContainerReplica> replicas =
               containerManager.getContainerReplicas(containerId);
           // The delete entry can be safely removed from the log if all the
@@ -275,7 +218,7 @@ public class DeletedBlockLogImpl
             if (dnsWithCommittedTxn.containsAll(containerDns)) {
               transactionToDNsCommitMap.remove(txID);
               LOG.debug("Purging txId={} from block deletion log", txID);
-              deletedStore.delete(Longs.toByteArray(txID));
+              scmMetadataStore.getDeletedBlocksTXTable().delete(txID);
             }
           }
           LOG.debug("Datanode txId={} containerId={} committed by dnId={}",
@@ -308,24 +251,18 @@ public class DeletedBlockLogImpl
    * {@inheritDoc}
    *
    * @param containerID - container ID.
-   * @param blocks - blocks that belong to the same container.
+   * @param blocks      - blocks that belong to the same container.
    * @throws IOException
    */
   @Override
   public void addTransaction(long containerID, List<Long> blocks)
       throws IOException {
-    BatchOperation batch = new BatchOperation();
     lock.lock();
     try {
-      DeletedBlocksTransaction tx = constructNewTransaction(lastTxID + 1,
-          containerID, blocks);
-      byte[] key = Longs.toByteArray(lastTxID + 1);
-
-      batch.put(key, tx.toByteArray());
-      batch.put(LATEST_TXID, Longs.toByteArray(lastTxID + 1));
-
-      deletedStore.writeBatch(batch);
-      lastTxID += 1;
+      Long nextTXID = scmMetadataStore.getNextDeleteBlockTXID();
+      DeletedBlocksTransaction tx =
+          constructNewTransaction(nextTXID, containerID, blocks);
+      scmMetadataStore.getDeletedBlocksTXTable().put(nextTXID, tx);
     } finally {
       lock.unlock();
     }
@@ -336,17 +273,16 @@ public class DeletedBlockLogImpl
     lock.lock();
     try {
       final AtomicInteger num = new AtomicInteger(0);
-      deletedStore.iterate(null, (key, value) -> {
-        // Exclude latest txid record
-        if (!Arrays.equals(LATEST_TXID, key)) {
-          DeletedBlocksTransaction delTX =
-              DeletedBlocksTransaction.parseFrom(value);
+      try (TableIterator<Long,
+          ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
+               scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
+        while (iter.hasNext()) {
+          DeletedBlocksTransaction delTX = iter.next().getValue();
           if (delTX.getCount() > -1) {
             num.incrementAndGet();
           }
         }
-        return true;
-      });
+      }
       return num.get();
     } finally {
       lock.unlock();
@@ -360,24 +296,19 @@ public class DeletedBlockLogImpl
    * @throws IOException
    */
   @Override
-  public void addTransactions(
-      Map<Long, List<Long>> containerBlocksMap)
+  public void addTransactions(Map<Long, List<Long>> containerBlocksMap)
       throws IOException {
-    BatchOperation batch = new BatchOperation();
     lock.lock();
     try {
-      long currentLatestID = lastTxID;
-      for (Map.Entry<Long, List<Long>> entry :
-          containerBlocksMap.entrySet()) {
-        currentLatestID += 1;
-        byte[] key = Longs.toByteArray(currentLatestID);
-        DeletedBlocksTransaction tx = constructNewTransaction(currentLatestID,
+      BatchOperation batch = scmMetadataStore.getStore().initBatchOperation();
+      for (Map.Entry<Long, List<Long>> entry : containerBlocksMap.entrySet()) {
+        long nextTXID = scmMetadataStore.getNextDeleteBlockTXID();
+        DeletedBlocksTransaction tx = constructNewTransaction(nextTXID,
             entry.getKey(), entry.getValue());
-        batch.put(key, tx.toByteArray());
+        scmMetadataStore.getDeletedBlocksTXTable().putWithBatch(batch,
+            nextTXID, tx);
       }
-      lastTxID = currentLatestID;
-      batch.put(LATEST_TXID, Longs.toByteArray(lastTxID));
-      deletedStore.writeBatch(batch);
+      scmMetadataStore.getStore().commitBatchOperation(batch);
     } finally {
       lock.unlock();
     }
@@ -385,9 +316,6 @@ public class DeletedBlockLogImpl
 
   @Override
   public void close() throws IOException {
-    if (deletedStore != null) {
-      deletedStore.close();
-    }
   }
 
   @Override
@@ -396,11 +324,12 @@ public class DeletedBlockLogImpl
     lock.lock();
     try {
       Map<Long, Long> deleteTransactionMap = new HashMap<>();
-      deletedStore.iterate(null, (key, value) -> {
-        if (!Arrays.equals(LATEST_TXID, key)) {
-          DeletedBlocksTransaction block = DeletedBlocksTransaction
-              .parseFrom(value);
-
+      try (TableIterator<Long,
+          ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
+               scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
+        while (iter.hasNext()) {
+          Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = iter.next();
+          DeletedBlocksTransaction block = keyValue.getValue();
           if (block.getCount() > -1 && block.getCount() <= maxRetry) {
             if (transactions.addTransaction(block,
                 transactionToDNsCommitMap.get(block.getTxID()))) {
@@ -409,10 +338,8 @@ public class DeletedBlockLogImpl
                   .putIfAbsent(block.getTxID(), new ConcurrentHashSet<>());
             }
           }
-          return !transactions.isFull();
         }
-        return true;
-      });
+      }
       return deleteTransactionMap;
     } finally {
       lock.unlock();
@@ -421,7 +348,7 @@ public class DeletedBlockLogImpl
 
   @Override
   public void onMessage(DeleteBlockStatus deleteBlockStatus,
-      EventPublisher publisher) {
+                        EventPublisher publisher) {
     ContainerBlocksDeletionACKProto ackProto =
         deleteBlockStatus.getCmdStatus().getBlockDeletionAck();
     commitTransactions(ackProto.getResultsList(),
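
After this rewrite the log is a thin layer over the shared SCM metadata store: each transaction is a row in the deletedBlocks table keyed by a TXID handed out by the store, and the old #LATEST_TXID# sentinel row disappears. A usage sketch under those assumptions (containerManager and scmMetadataStore are presumed already initialized):

    // Sketch: record one container's deleted blocks, then check how many
    // transactions are still eligible for retry.
    DeletedBlockLogImpl deletedBlockLog =
        new DeletedBlockLogImpl(conf, containerManager, scmMetadataStore);
    deletedBlockLog.addTransaction(containerID, blockIds);
    int validTxns = deletedBlockLog.getNumOfValidTransactions();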
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/DeletedBlocksTransactionCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/DeletedBlocksTransactionCodec.java
new file mode 100644
index 0000000..5deb3aa
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.metadata;
+
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.utils.db.Codec;
+
+/**
+ * Codec for persisting {@link DeletedBlocksTransaction} objects.
+ */
+public class DeletedBlocksTransactionCodec
+    implements Codec<DeletedBlocksTransaction> {
+  @Override
+  public byte[] toPersistedFormat(DeletedBlocksTransaction object) {
+    return object.toByteArray();
+  }
+
+  @Override
+  public DeletedBlocksTransaction fromPersistedFormat(byte[] rawData) {
+    try {
+      return DeletedBlocksTransaction.parseFrom(rawData);
+    } catch (InvalidProtocolBufferException e) {
+      throw new IllegalArgumentException(
+          "Can't convert rawBytes to DeletedBlocksTransaction.", e);
+    }
+  }
+}
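
A round-trip sketch for the codec; the builder calls assume the DeletedBlocksTransaction proto's txID, containerID, localID, and count fields:

    // Sketch: encode a transaction to bytes and decode it back.
    DeletedBlocksTransaction tx = DeletedBlocksTransaction.newBuilder()
        .setTxID(1L)
        .setContainerID(10L)
        .addAllLocalID(blockIds)  // local IDs of blocks in that container
        .setCount(0)
        .build();
    DeletedBlocksTransactionCodec codec = new DeletedBlocksTransactionCodec();
    byte[] raw = codec.toPersistedFormat(tx);
    DeletedBlocksTransaction decoded = codec.fromPersistedFormat(raw);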
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/LongCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/LongCodec.java
new file mode 100644
index 0000000..2d495ab
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.metadata;
+
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.utils.db.Codec;
+
+/**
+ * Codec for persisting Long values.
+ */
+public class LongCodec implements Codec<Long> {
+
+  @Override
+  public byte[] toPersistedFormat(Long object) {
+    return Longs.toByteArray(object);
+  }
+
+  @Override
+  public Long fromPersistedFormat(byte[] rawData) {
+    return Longs.fromByteArray(rawData);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
new file mode 100644
index 0000000..8009e66
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.metadata;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import java.io.IOException;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.utils.db.DBStore;
+import org.apache.hadoop.utils.db.Table;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+
+/**
+ * Generic interface for data stores for SCM.
+ * This is similar to the OMMetadataStore class,
+ * where we write classes into some underlying storage system.
+ */
+public interface SCMMetadataStore {
+  /**
+   * Start metadata manager.
+   *
+   * @param configuration - Configuration
+   * @throws IOException - Unable to start metadata store.
+   */
+  void start(OzoneConfiguration configuration) throws IOException;
+
+  /**
+   * Stop metadata manager.
+   */
+  void stop() throws Exception;
+
+  /**
+   * Get metadata store.
+   *
+   * @return metadata store.
+   */
+  @VisibleForTesting
+  DBStore getStore();
+
+  /**
+   * A Table that keeps the deleted-block lists and transactions.
+   * @return Table
+   */
+  Table<Long, DeletedBlocksTransaction> getDeletedBlocksTXTable();
+
+  /**
+   * Returns the current TXID for the deleted blocks.
+   * @return Long
+   */
+  Long getCurrentTXID();
+
+  /**
+   * Returns the next TXID for the Deleted Blocks.
+   * @return Long.
+   */
+  Long getNextDeleteBlockTXID();
+
+}
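
A consumer-side sketch of this interface, assuming the RocksDB implementation that follows (exception handling elided; stop() is declared to throw Exception):

    // Sketch: open the store, allocate a fresh TXID, persist one
    // transaction, and shut down cleanly.
    SCMMetadataStore metadataStore = new SCMMetadataStoreRDBImpl(conf);
    try {
      long txID = metadataStore.getNextDeleteBlockTXID();
      metadataStore.getDeletedBlocksTXTable().put(txID, tx);
    } finally {
      metadataStore.stop();  // closes the underlying DBStore
    }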
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreRDBImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreRDBImpl.java
new file mode 100644
index 0000000..49a643d
--- /dev/null
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.metadata;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import java.io.IOException;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.utils.db.DBStore;
+import org.apache.hadoop.utils.db.DBStoreBuilder;
+import org.apache.hadoop.utils.db.Table;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.utils.db.TableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_DB_NAME;
+
+/**
+ * A RocksDB based implementation of SCM Metadata Store.
+ * <p>
+ * +---------------+------------+--------------------------+
+ * | Column Family |    Key     |          Value           |
+ * +---------------+------------+--------------------------+
+ * | DeletedBlocks | TXID(Long) | DeletedBlocksTransaction |
+ * +---------------+------------+--------------------------+
+ */
+public class SCMMetadataStoreRDBImpl implements SCMMetadataStore {
+
+  private static final String DELETED_BLOCKS_TABLE = "deletedBlocks";
+  private Table deletedBlocksTable;
+
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMMetadataStoreRDBImpl.class);
+  private DBStore store;
+  private final OzoneConfiguration configuration;
+  private final AtomicLong txID;
+
+  /**
+   * Constructs the metadata store and starts the DB Services.
+   *
+   * @param config - Ozone Configuration.
+   * @throws IOException - on Failure.
+   */
+  public SCMMetadataStoreRDBImpl(OzoneConfiguration config) throws IOException {
+    this.configuration = config;
+    start(this.configuration);
+    this.txID = new AtomicLong(this.getLargestRecordedTXID());
+  }
+
+  @Override
+  public void start(OzoneConfiguration config)
+      throws IOException {
+    if (this.store == null) {
+      File metaDir = ServerUtils.getScmDbDir(config);
+
+      this.store = DBStoreBuilder.newBuilder(config)
+          .setName(SCM_DB_NAME)
+          .setPath(Paths.get(metaDir.getPath()))
+          .addTable(DELETED_BLOCKS_TABLE)
+          .addCodec(DeletedBlocksTransaction.class,
+              new DeletedBlocksTransactionCodec())
+          .addCodec(Long.class, new LongCodec())
+          .build();
+      deletedBlocksTable = this.store.getTable(DELETED_BLOCKS_TABLE,
+          Long.class, DeletedBlocksTransaction.class);
+      checkTableStatus(deletedBlocksTable, DELETED_BLOCKS_TABLE);
+    }
+  }
+
+  @Override
+  public void stop() throws Exception {
+    if (store != null) {
+      store.close();
+      store = null;
+    }
+  }
+
+  @Override
+  public DBStore getStore() {
+    return this.store;
+  }
+
+  @Override
+  public Table<Long, DeletedBlocksTransaction> getDeletedBlocksTXTable() {
+    return deletedBlocksTable;
+  }
+
+  @Override
+  public Long getNextDeleteBlockTXID() {
+    return this.txID.incrementAndGet();
+  }
+
+  @Override
+  public Long getCurrentTXID() {
+    return this.txID.get();
+  }
+
+  /**
+   * Returns the largest recorded TXID from the DB.
+   *
+   * @return Long
+   * @throws IOException
+   */
+  private Long getLargestRecordedTXID() throws IOException {
+    try (TableIterator<Long, DeletedBlocksTransaction> txIter =
+             deletedBlocksTable.iterator()) {
+      txIter.seekToLast();
+      Long txid = txIter.key();
+      if (txid != null) {
+        return txid;
+      }
+    }
+    return 0L;
+  }
+
+
+  private void checkTableStatus(Table table, String name) throws IOException {
+    String logMessage = "Unable to get a reference to %s table. Cannot " +
+        "continue.";
+    String errMsg = "Inconsistent DB state, Table - %s. Please check the " +
+        "logs for more info.";
+    if (table == null) {
+      LOG.error(String.format(logMessage, name));
+      throw new IOException(String.format(errMsg, name));
+    }
+  }
+
+}
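
Multi-row updates go through the underlying DBStore's batch API, as DeletedBlockLogImpl.addTransactions above demonstrates; a condensed sketch:

    // Sketch: persist several transactions atomically in one RocksDB batch.
    BatchOperation batch = metadataStore.getStore().initBatchOperation();
    for (DeletedBlocksTransaction tx : txList) {
      metadataStore.getDeletedBlocksTXTable()
          .putWithBatch(batch, tx.getTxID(), tx);
    }
    metadataStore.getStore().commitBatchOperation(batch);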
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/package-info.java
new file mode 100644
index 0000000..23e8aaa
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Metadata layer for SCM.
+ */
+package org.apache.hadoop.hdds.scm.metadata;
index 3c5eaf8..16a10ac 100644
@@ -48,7 +48,6 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
@@ -200,8 +199,8 @@ public class SCMNodeManager implements NodeManager {
     return VersionResponse.newBuilder()
         .setVersion(this.version.getVersion())
         .addValue(OzoneConsts.SCM_ID,
-            this.scmManager.getScmStorage().getScmId())
-        .addValue(OzoneConsts.CLUSTER_ID, this.scmManager.getScmStorage()
+            this.scmManager.getScmStorageConfig().getScmId())
+        .addValue(OzoneConsts.CLUSTER_ID, this.scmManager.getScmStorageConfig()
             .getClusterID())
         .build();
   }
index 8ba8b42..80056b5 100644
@@ -248,8 +248,8 @@ public class SCMBlockProtocolServer implements
     try{
       ScmInfo.Builder builder =
           new ScmInfo.Builder()
-              .setClusterId(scm.getScmStorage().getClusterID())
-              .setScmId(scm.getScmStorage().getScmId());
+              .setClusterId(scm.getScmStorageConfig().getClusterID())
+              .setScmId(scm.getScmStorageConfig().getScmId());
       return builder.build();
     } catch (Exception ex) {
       auditSuccess = false;
index 998512c..0cb22ad 100644
@@ -422,8 +422,8 @@ public class SCMClientProtocolServer implements
     try{
       ScmInfo.Builder builder =
           new ScmInfo.Builder()
-              .setClusterId(scm.getScmStorage().getClusterID())
-              .setScmId(scm.getScmStorage().getScmId());
+              .setClusterId(scm.getScmStorageConfig().getClusterID())
+              .setScmId(scm.getScmStorageConfig().getScmId());
       return builder.build();
     } catch (Exception ex) {
       auditSuccess = false;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
new file mode 100644
index 0000000..bca9d57
--- /dev/null
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.server;
+
+
+import org.apache.hadoop.hdds.scm.block.BlockManager;
+import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer;
+
+/**
+ * This class acts as an SCM builder class. It is important for the
+ * resilience of SCM: it allows us to swap out different managers and
+ * replace them with our own managers in the testing phase.
+ * <p>
+ * At some point in the future, we will make all these managers dynamically
+ * loadable, so other developers can extend SCM by replacing various managers.
+ * <p>
+ * TODO: Add different config keys, so that we can load different managers at
+ * run time. This will make it easy to extend SCM without having to replace
+ * the whole SCM each time.
+ * <p>
+ * Different Managers supported by this builder are:
+ * NodeManager scmNodeManager;
+ * PipelineManager pipelineManager;
+ * ContainerManager containerManager;
+ * BlockManager scmBlockManager;
+ * ReplicationManager replicationManager;
+ * SCMChillModeManager scmChillModeManager;
+ * CertificateServer certificateServer;
+ * SCMMetadataStore scmMetadataStore.
+ *
+ * If any of these are *not* specified, then the default versions of these
+ * managers are used by SCM.
+ *
+ */
+public final class SCMConfigurator {
+  private NodeManager scmNodeManager;
+  private PipelineManager pipelineManager;
+  private ContainerManager containerManager;
+  private BlockManager scmBlockManager;
+  private ReplicationManager replicationManager;
+  private SCMChillModeManager scmChillModeManager;
+  private CertificateServer certificateServer;
+  private SCMMetadataStore metadataStore;
+
+  /**
+   * Allows user to specify a version of Node manager to use with this SCM.
+   * @param scmNodeManager - Node Manager.
+   */
+  public void setScmNodeManager(NodeManager scmNodeManager) {
+    this.scmNodeManager = scmNodeManager;
+  }
+
+  /**
+   * Allows user to specify a custom version of PipelineManager to use with
+   * this SCM.
+   * @param pipelineManager - Pipeline Manager.
+   */
+  public void setPipelineManager(PipelineManager pipelineManager) {
+    this.pipelineManager = pipelineManager;
+  }
+
+  /**
+   *  Allows user to specify a custom version of containerManager to use with
+   *  this SCM.
+   * @param containerManager - Container Manager.
+   */
+  public void setContainerManager(ContainerManager containerManager) {
+    this.containerManager = containerManager;
+  }
+
+  /**
+   *  Allows user to specify a custom version of Block Manager to use with
+   *  this SCM.
+   * @param scmBlockManager - Block Manager
+   */
+  public void setScmBlockManager(BlockManager scmBlockManager) {
+    this.scmBlockManager = scmBlockManager;
+  }
+
+  /**
+   * Allows user to specify a custom version of Replication Manager to use
+   * with this SCM.
+   * @param replicationManager - replication Manager.
+   */
+  public void setReplicationManager(ReplicationManager replicationManager) {
+    this.replicationManager = replicationManager;
+  }
+
+  /**
+   * Allows user to specify a custom version of Chill Mode Manager to use
+   * with this SCM.
+   * @param scmChillModeManager - ChillMode Manager.
+   */
+  public void setScmChillModeManager(SCMChillModeManager scmChillModeManager) {
+    this.scmChillModeManager = scmChillModeManager;
+  }
+
+  /**
+   * Allows user to specify a custom version of Certificate Server to use
+   * with this SCM.
+   * @param certificateAuthority - Certificate server.
+   */
+  public void setCertificateServer(CertificateServer certificateAuthority) {
+    this.certificateServer = certificateAuthority;
+  }
+
+  /**
+   * Allows user to specify a custom version of Metadata Store to be used
+   * with this SCM.
+   * @param scmMetadataStore - scm metadata store.
+   */
+  public void setMetadataStore(SCMMetadataStore scmMetadataStore) {
+    this.metadataStore = scmMetadataStore;
+  }
+
+  /**
+   * Gets SCM Node Manager.
+   * @return Node Manager.
+   */
+  public NodeManager getScmNodeManager() {
+    return scmNodeManager;
+  }
+
+  /**
+   * Get Pipeline Manager.
+   * @return pipeline manager.
+   */
+  public PipelineManager getPipelineManager() {
+    return pipelineManager;
+  }
+
+  /**
+   * Get Container Manager.
+   * @return Container Manager.
+   */
+  public ContainerManager getContainerManager() {
+    return containerManager;
+  }
+
+  /**
+   * Get SCM Block Manager.
+   * @return Block Manager.
+   */
+  public BlockManager getScmBlockManager() {
+    return scmBlockManager;
+  }
+
+  /**
+   * Get Replication Manager.
+   * @return Replication Manager.
+   */
+  public ReplicationManager getReplicationManager() {
+    return replicationManager;
+  }
+
+  /**
+   * Gets Chill Mode Manager.
+   * @return Chill Mode manager.
+   */
+  public SCMChillModeManager getScmChillModeManager() {
+    return scmChillModeManager;
+  }
+
+  /**
+   * Get Certificate Manager.
+   * @return Certificate Manager.
+   */
+  public CertificateServer getCertificateServer() {
+    return certificateServer;
+  }
+
+  /**
+   * Get Metadata Store.
+   * @return SCMMetadataStore.
+   */
+  public SCMMetadataStore getMetadataStore() {
+    return metadataStore;
+  }
+}
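
A test-side sketch of the configurator in action; MockNodeManager is a hypothetical test double, not a class introduced by this patch:

    // Sketch: boot SCM with one manager swapped for a test double; any
    // manager left unset falls back to SCM's default implementation.
    SCMConfigurator configurator = new SCMConfigurator();
    configurator.setScmNodeManager(new MockNodeManager());  // hypothetical
    StorageContainerManager scm =
        new StorageContainerManager(conf, configurator);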
@@ -30,16 +30,16 @@ import static org.apache.hadoop.ozone.OzoneConsts.SCM_ID;
 import static org.apache.hadoop.ozone.OzoneConsts.STORAGE_DIR;
 
 /**
- * SCMStorage is responsible for management of the StorageDirectories used by
- * the SCM.
+ * SCMStorageConfig is responsible for management of the
+ * StorageDirectories used by the SCM.
  */
-public class SCMStorage extends Storage {
+public class SCMStorageConfig extends Storage {
 
   /**
-   * Construct SCMStorage.
+   * Construct SCMStorageConfig.
    * @throws IOException if any directories are inaccessible.
    */
-  public SCMStorage(OzoneConfiguration conf) throws IOException {
+  public SCMStorageConfig(OzoneConfiguration conf) throws IOException {
     super(NodeType.SCM, ServerUtils.getScmDbDir(conf), STORAGE_DIR);
   }
 
@@ -70,4 +70,4 @@ public class SCMStorage extends Storage {
     return scmProperties;
   }
 
-}
\ No newline at end of file
+}
index bc81c84..3221128 100644
@@ -27,6 +27,7 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 import com.google.protobuf.BlockingService;
+import java.util.Objects;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsUtils;
@@ -58,6 +59,8 @@ import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
+import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
+import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreRDBImpl;
 import org.apache.hadoop.hdds.scm.node.DeadNodeHandler;
 import org.apache.hadoop.hdds.scm.node.NewNodeHandler;
 import org.apache.hadoop.hdds.scm.node.NonHealthyToHealthyNodeHandler;
@@ -125,8 +128,7 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
  * and returns a pipeline.
  *
  * <p>A client once it gets a pipeline (a list of datanodes) will connect to
- * the datanodes and
- * create a container, which then can be used to store data.
+ * the datanodes and create a container, which then can be used to store data.
  */
 @InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
 public final class StorageContainerManager extends ServiceRuntimeInfoImpl
@@ -158,16 +160,18 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
   private final SCMDatanodeProtocolServer datanodeProtocolServer;
   private final SCMBlockProtocolServer blockProtocolServer;
   private final SCMClientProtocolServer clientProtocolServer;
-  private final SCMSecurityProtocolServer securityProtocolServer;
+  private SCMSecurityProtocolServer securityProtocolServer;
 
   /*
    * State Managers of SCM.
    */
-  private final NodeManager scmNodeManager;
-  private final PipelineManager pipelineManager;
-  private final ContainerManager containerManager;
-  private final BlockManager scmBlockManager;
-  private final SCMStorage scmStorage;
+  private NodeManager scmNodeManager;
+  private PipelineManager pipelineManager;
+  private ContainerManager containerManager;
+  private BlockManager scmBlockManager;
+  private final SCMStorageConfig scmStorageConfig;
+
+  private SCMMetadataStore scmMetadataStore;
 
   private final EventQueue eventQueue;
   /*
@@ -188,13 +192,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
    */
   private Cache<String, ContainerStat> containerReportCache;
 
-  private final ReplicationManager replicationManager;
+  private ReplicationManager replicationManager;
 
   private final LeaseManager<Long> commandWatcherLeaseManager;
 
   private final ReplicationActivityStatus replicationStatus;
-  private final SCMChillModeManager scmChillModeManager;
-  private final CertificateServer certificateServer;
+  private SCMChillModeManager scmChillModeManager;
+  private CertificateServer certificateServer;
 
   private JvmPauseMonitor jvmPauseMonitor;
   private final OzoneConfiguration configuration;
@@ -206,29 +210,54 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
    *
    * @param conf configuration
    */
-  private StorageContainerManager(OzoneConfiguration conf)
+  public StorageContainerManager(OzoneConfiguration conf)
       throws IOException, AuthenticationException {
+    // default empty configurator means default managers will be used.
+    this(conf, new SCMConfigurator());
+  }
+
+
+  /**
+   * This constructor offers finer control over how SCM comes up.
+   * To use this, the user needs to create an SCMConfigurator and set the
+   * managers that SCM should use; if a manager is not specified, SCM will
+   * use the default implementation for that manager.
+   *
+   * @param conf - Configuration
+   * @param configurator - configurator
+   */
+  public StorageContainerManager(OzoneConfiguration conf,
+                                 SCMConfigurator configurator)
+      throws IOException, AuthenticationException {
     super(HddsVersionInfo.HDDS_VERSION_INFO);
 
+    Objects.requireNonNull(configurator, "configurator cannot be null");
+    Objects.requireNonNull(conf, "configuration cannot be null");
+
     configuration = conf;
     StorageContainerManager.initMetrics();
     initContainerReportCache(conf);
-    scmStorage = new SCMStorage(conf);
-    if (scmStorage.getState() != StorageState.INITIALIZED) {
-      throw new SCMException("SCM not initialized.", ResultCodes
-          .SCM_NOT_INITIALIZED);
+    /*
+     * It is assumed that the 'ozone scm --init' command has already
+     * created the SCM storage config.
+     */
+    scmStorageConfig = new SCMStorageConfig(conf);
+    if (scmStorageConfig.getState() != StorageState.INITIALIZED) {
+      LOG.error("Please make sure you have run \'ozone scm --init\' " +
+          "command to generate all the required metadata.");
+      throw new SCMException("SCM not initialized due to storage config " +
+          "failure.", ResultCodes.SCM_NOT_INITIALIZED);
     }
 
+    /*
+     * Important: this initialization sequence is relied upon by some of our
+     * tests. TestSecureOzoneCluster assumes that the security checks pass
+     * before any artifacts such as the SCM DB are created, so please do not
+     * add any other initialization above the security checks.
+     */
+
     // Authenticate SCM if security is enabled
     if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
-      loginAsSCMUser(conf);
-      certificateServer = initializeCertificateServer(
-          getScmStorage().getClusterID(), getScmStorage().getScmId());
-      // TODO: Support Intermediary CAs in future.
-      certificateServer.init(new SecurityConfig(conf),
-          CertificateServer.CAType.SELF_SIGNED_CA);
-      securityProtocolServer = new SCMSecurityProtocolServer(conf,
-          certificateServer);
+      initializeCAnSecurityProtocol(conf, configurator);
     } else {
       // if no Security, we do not create a Certificate Server at all.
       // This allows user to boot SCM without security temporarily
@@ -237,16 +266,16 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
       securityProtocolServer = null;
     }
 
-    eventQueue = new EventQueue();
-
-    scmNodeManager = new SCMNodeManager(
-        conf, scmStorage.getClusterID(), this, eventQueue);
-    pipelineManager = new SCMPipelineManager(conf, scmNodeManager, eventQueue);
-    containerManager = new SCMContainerManager(
-        conf, scmNodeManager, pipelineManager, eventQueue);
-    scmBlockManager = new BlockManagerImpl(conf, scmNodeManager,
-        pipelineManager, containerManager, eventQueue);
+    // Creates the SCM DBs, or opens them if they already exist.
+    initalizeMetadataStore(conf, configurator);
 
+    eventQueue = new EventQueue();
+    long watcherTimeout =
+        conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
+            HDDS_SCM_WATCHER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+    commandWatcherLeaseManager = new LeaseManager<>("CommandWatcher",
+        watcherTimeout);
+    initalizeSystemManagers(conf, configurator);
     replicationStatus = new ReplicationActivityStatus();
 
     CloseContainerEventHandler closeContainerHandler =
@@ -280,12 +309,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     PipelineActionHandler pipelineActionHandler =
         new PipelineActionHandler(pipelineManager, conf);
 
-    long watcherTimeout =
-        conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
-            HDDS_SCM_WATCHER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
-
-    commandWatcherLeaseManager = new LeaseManager<>("CommandWatcher",
-        watcherTimeout);
 
     RetriableDatanodeEventWatcher retriableDatanodeEventWatcher =
         new RetriableDatanodeEventWatcher<>(
@@ -294,13 +317,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
             commandWatcherLeaseManager);
     retriableDatanodeEventWatcher.start(eventQueue);
 
-    //TODO: support configurable containerPlacement policy
-    ContainerPlacementPolicy containerPlacementPolicy =
-        new SCMContainerPlacementCapacity(scmNodeManager, conf);
-
-    replicationManager = new ReplicationManager(containerPlacementPolicy,
-        containerManager, eventQueue, commandWatcherLeaseManager);
-
     scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys
         .OZONE_ADMINISTRATORS);
     scmUsername = UserGroupInformation.getCurrentUser().getUserName();
@@ -342,13 +358,120 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         replicationStatus.getChillModeStatusListener());
     eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS,
         (BlockManagerImpl) scmBlockManager);
-    scmChillModeManager = new SCMChillModeManager(conf,
-        containerManager.getContainers(), pipelineManager, eventQueue);
-
     eventQueue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
         scmChillModeManager);
     registerMXBean();
+  }
+
+  /**
+   * This function initializes the following managers. If the configurator
+   * specifies a value we use it; otherwise we fall back to the default
+   * implementation.
+   *
+   *  Node Manager
+   *  Pipeline Manager
+   *  Container Manager
+   *  Block Manager
+   *  Replication Manager
+   *  Chill Mode Manager
+   *
+   * @param conf - Ozone Configuration.
+   * @param configurator - A customizer which allows different managers to
+   *                       be used if needed.
+   * @throws IOException - on failure.
+   */
+  private void initalizeSystemManagers(OzoneConfiguration conf,
+                                       SCMConfigurator configurator)
+      throws IOException {
+    if (configurator.getScmNodeManager() != null) {
+      scmNodeManager = configurator.getScmNodeManager();
+    } else {
+      scmNodeManager = new SCMNodeManager(
+          conf, scmStorageConfig.getClusterID(), this, eventQueue);
+    }
+
+    //TODO: support configurable containerPlacement policy
+    ContainerPlacementPolicy containerPlacementPolicy =
+        new SCMContainerPlacementCapacity(scmNodeManager, conf);
+
+    if (configurator.getPipelineManager() != null) {
+      pipelineManager = configurator.getPipelineManager();
+    } else {
+      pipelineManager =
+          new SCMPipelineManager(conf, scmNodeManager, eventQueue);
+    }
+
+    if (configurator.getContainerManager() != null) {
+      containerManager = configurator.getContainerManager();
+    } else {
+      containerManager = new SCMContainerManager(
+          conf, scmNodeManager, pipelineManager, eventQueue);
+    }
+
+    if (configurator.getScmBlockManager() != null) {
+      scmBlockManager = configurator.getScmBlockManager();
+    } else {
+      scmBlockManager = new BlockManagerImpl(conf, this);
+    }
+    if (configurator.getReplicationManager() != null) {
+      replicationManager = configurator.getReplicationManager();
+    } else {
+      replicationManager = new ReplicationManager(containerPlacementPolicy,
+          containerManager, eventQueue, commandWatcherLeaseManager);
+    }
+    if (configurator.getScmChillModeManager() != null) {
+      scmChillModeManager = configurator.getScmChillModeManager();
+    } else {
+      scmChillModeManager = new SCMChillModeManager(conf,
+          containerManager.getContainers(), pipelineManager, eventQueue);
+    }
+  }
+
+  /**
+   * If security is enabled, we need the security protocol server and a
+   * default CA. This function initializes both based on the configurator.
+   *
+   * @param conf - Config
+   * @param configurator - configurator
+   * @throws IOException - on failure
+   * @throws AuthenticationException - on failure
+   */
+  private void initializeCAnSecurityProtocol(OzoneConfiguration conf,
+                                             SCMConfigurator configurator)
+      throws IOException, AuthenticationException {
+    loginAsSCMUser(conf);
+    if (configurator.getCertificateServer() != null) {
+      this.certificateServer = configurator.getCertificateServer();
+    } else {
+      certificateServer = initializeCertificateServer(
+          getScmStorageConfig().getClusterID(),
+          getScmStorageConfig().getScmId());
+    }
+    // TODO: Support Intermediary CAs in future.
+    certificateServer.init(new SecurityConfig(conf),
+        CertificateServer.CAType.SELF_SIGNED_CA);
+    securityProtocolServer = new SCMSecurityProtocolServer(conf,
+        certificateServer);
+  }
 
+  /**
+   * Initializes the metadata store based on the configurator.
+   *
+   * @param conf - Config
+   * @param configurator - configurator
+   * @throws IOException - on failure
+   */
+  private void initalizeMetadataStore(OzoneConfiguration conf,
+                                      SCMConfigurator configurator)
+      throws IOException {
+    if (configurator.getMetadataStore() != null) {
+      scmMetadataStore = configurator.getMetadataStore();
+    } else {
+      // The constructor either returns a usable store or throws,
+      // so a null check on the result is unnecessary.
+      scmMetadataStore = new SCMMetadataStoreRDBImpl(conf);
+    }
   }
 
   /**
@@ -393,7 +516,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     // So it is easy to use different Certificate Servers if needed.
     String subject = "scm@" + InetAddress.getLocalHost().getHostName();
     return new DefaultCAServer(subject, clusterID, scmID);
-
   }
 
   /**
@@ -562,21 +684,21 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
    * @throws IOException if init fails due to I/O error
    */
   public static boolean scmInit(OzoneConfiguration conf) throws IOException {
-    SCMStorage scmStorage = new SCMStorage(conf);
-    StorageState state = scmStorage.getState();
+    SCMStorageConfig scmStorageConfig = new SCMStorageConfig(conf);
+    StorageState state = scmStorageConfig.getState();
     if (state != StorageState.INITIALIZED) {
       try {
         String clusterId = StartupOption.INIT.getClusterId();
         if (clusterId != null && !clusterId.isEmpty()) {
-          scmStorage.setClusterId(clusterId);
+          scmStorageConfig.setClusterId(clusterId);
         }
-        scmStorage.initialize();
+        scmStorageConfig.initialize();
         System.out.println(
             "SCM initialization succeeded."
                 + "Current cluster id for sd="
-                + scmStorage.getStorageDir()
+                + scmStorageConfig.getStorageDir()
                 + ";cid="
-                + scmStorage.getClusterID());
+                + scmStorageConfig.getClusterID());
         return true;
       } catch (IOException ioe) {
         LOG.error("Could not initialize SCM version file", ioe);
@@ -586,9 +708,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
       System.out.println(
           "SCM already initialized. Reusing existing"
               + " cluster id for sd="
-              + scmStorage.getStorageDir()
+              + scmStorageConfig.getStorageDir()
               + ";cid="
-              + scmStorage.getClusterID());
+              + scmStorageConfig.getClusterID());
       return true;
     }
   }
@@ -649,8 +771,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     return metrics == null ? SCMMetrics.create() : metrics;
   }
 
-  public SCMStorage getScmStorage() {
-    return scmStorage;
+  public SCMStorageConfig getScmStorageConfig() {
+    return scmStorageConfig;
   }
 
   public SCMDatanodeProtocolServer getDatanodeProtocolServer() {
@@ -878,6 +1000,12 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     }
     IOUtils.cleanupWithLogger(LOG, containerManager);
     IOUtils.cleanupWithLogger(LOG, pipelineManager);
+
+    try {
+      scmMetadataStore.stop();
+    } catch (Exception ex) {
+      LOG.error("SCM Metadata store stop failed", ex);
+    }
   }
 
   /**
@@ -1049,6 +1177,14 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     return nodeStateCount;
   }
 
+
+  /**
+   * Returns the SCM metadata store.
+   *
+   * @return SCMMetadataStore
+   */
+  public SCMMetadataStore getScmMetadataStore() {
+    return scmMetadataStore;
+  }
+
   /**
    * Startup options.
    */
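
The configurator-based constructor above is the extension point that the test
changes below rely on. A minimal sketch of booting SCM with one injected
manager (MockNodeManager and setScmNodeManager appear in the test diff further
down; the sketch assumes the storage config was already initialized, e.g. via
'ozone scm --init'):

    OzoneConfiguration conf = new OzoneConfiguration();
    SCMConfigurator configurator = new SCMConfigurator();
    // Managers left unset fall back to the defaults created in
    // initalizeSystemManagers().
    configurator.setScmNodeManager(new MockNodeManager(true, 10));
    StorageContainerManager scm =
        new StorageContainerManager(conf, configurator);
    // ... exercise scm, then shut it down.
    scm.stop();
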
index 35003c7..d010e2d 100644 (file)
@@ -29,7 +29,7 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
     .NodeRegistrationContainerReport;
-import org.apache.hadoop.hdds.scm.server.SCMStorage;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.common.Storage;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
@@ -78,7 +78,7 @@ public final class HddsTestUtils {
   public static StorageContainerManager getScm(OzoneConfiguration conf)
       throws IOException, AuthenticationException {
     conf.setBoolean(OZONE_ENABLED, true);
-    SCMStorage scmStore = new SCMStorage(conf);
+    SCMStorageConfig scmStore = new SCMStorageConfig(conf);
     if(scmStore.getState() != Storage.StorageState.INITIALIZED) {
       String clusterId = UUID.randomUUID().toString();
       String scmId = UUID.randomUUID().toString();
index 0f9a5e4..b5d9e4b 100644 (file)
@@ -17,6 +17,7 @@
 package org.apache.hadoop.hdds.scm;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
@@ -48,8 +49,12 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.StorageTypeProto;
 import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.Storage;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
 
 import java.io.File;
 import java.io.IOException;
@@ -59,6 +64,8 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
+
 /**
  * Stateless helper functions to handler scm/datanode connection.
  */
@@ -461,4 +468,20 @@ public final class TestUtils {
         id, HddsProtos.LifeCycleEvent.QUASI_CLOSE);
 
   }
+
+  public static StorageContainerManager getScm(OzoneConfiguration conf)
+      throws IOException, AuthenticationException {
+    conf.setBoolean(OZONE_ENABLED, true);
+    SCMStorageConfig scmStore = new SCMStorageConfig(conf);
+    if (scmStore.getState() != Storage.StorageState.INITIALIZED) {
+      String clusterId = UUID.randomUUID().toString();
+      String scmId = UUID.randomUUID().toString();
+      scmStore.setClusterId(clusterId);
+      scmStore.setScmId(scmId);
+      // writes the version file properties
+      scmStore.initialize();
+    }
+    return StorageContainerManager.createSCM(null, conf);
+  }
+
 }
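
Typical use of the helper just added: boot a default SCM against a fresh
metadata directory and tear it down afterwards (folder is a JUnit
TemporaryFolder rule, as in the tests below):

    OzoneConfiguration conf = SCMTestUtils.getConf();
    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
        folder.newFolder().toString());
    StorageContainerManager scm = TestUtils.getScm(conf);
    try {
      // ... test body ...
    } finally {
      scm.stop();
      scm.join();
    }
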
index 7be1b17..f7e171c 100644 (file)
 
 package org.apache.hadoop.hdds.scm.block;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.UUID;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
+import java.util.concurrent.TimeoutException;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
-import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMStorage;
+import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -49,11 +50,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.concurrent.TimeoutException;
+import org.junit.rules.TemporaryFolder;
 
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConsts.GB;
@@ -64,6 +61,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.MB;
  * Tests for SCM Block Manager.
  */
 public class TestBlockManager implements EventHandler<Boolean> {
+  private StorageContainerManager scm;
   private SCMContainerManager mapping;
   private MockNodeManager nodeManager;
   private PipelineManager pipelineManager;
@@ -75,11 +73,14 @@ public class TestBlockManager implements EventHandler<Boolean> {
   private static String containerOwner = "OZONE";
   private static EventQueue eventQueue;
   private int numContainerPerOwnerInPipeline;
-  private Configuration conf;
+  private OzoneConfiguration conf;
 
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
   @Before
   public void setUp() throws Exception {
     conf = SCMTestUtils.getConf();
@@ -87,24 +88,23 @@ public class TestBlockManager implements EventHandler<Boolean> {
         ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
         ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
 
-    String path = GenericTestUtils
-        .getTempPath(TestBlockManager.class.getSimpleName());
-    testDir = Paths.get(path).toFile();
-    testDir.delete();
-    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, path);
-    eventQueue = new EventQueue();
-    boolean folderExisted = testDir.exists() || testDir.mkdirs();
-    if (!folderExisted) {
-      throw new IOException("Unable to create test directory path");
-    }
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
+        folder.newFolder().toString());
+
+    // Override the default Node Manager in SCM with this Mock Node Manager.
     nodeManager = new MockNodeManager(true, 10);
-    pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, eventQueue);
-    mapping = new SCMContainerManager(conf, nodeManager, pipelineManager,
-        eventQueue);
-    blockManager = new BlockManagerImpl(conf,
-        nodeManager, pipelineManager, mapping, eventQueue);
-    eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, blockManager);
+    SCMConfigurator configurator = new SCMConfigurator();
+    configurator.setScmNodeManager(nodeManager);
+    scm = getScm(conf, configurator);
+
+    // Expose the managers created inside SCM for the existing tests.
+    mapping = (SCMContainerManager) scm.getContainerManager();
+    pipelineManager = scm.getPipelineManager();
+    blockManager = (BlockManagerImpl) scm.getScmBlockManager();
+
+    eventQueue = new EventQueue();
+    eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS,
+        (BlockManagerImpl) scm.getScmBlockManager());
     eventQueue.addHandler(SCMEvents.START_REPLICATION, this);
     CloseContainerEventHandler closeContainerHandler =
         new CloseContainerEventHandler(pipelineManager, mapping);
@@ -121,16 +121,14 @@ public class TestBlockManager implements EventHandler<Boolean> {
 
   @After
   public void cleanup() throws IOException {
-    blockManager.close();
-    pipelineManager.close();
-    mapping.close();
-    FileUtil.fullyDelete(testDir);
+    scm.stop();
   }
 
-  private static StorageContainerManager getScm(OzoneConfiguration conf)
+  private static StorageContainerManager getScm(OzoneConfiguration conf,
+                                                SCMConfigurator configurator)
       throws IOException, AuthenticationException {
     conf.setBoolean(OZONE_ENABLED, true);
-    SCMStorage scmStore = new SCMStorage(conf);
+    SCMStorageConfig scmStore = new SCMStorageConfig(conf);
     if(scmStore.getState() != StorageState.INITIALIZED) {
       String clusterId = UUID.randomUUID().toString();
       String scmId = UUID.randomUUID().toString();
@@ -139,7 +137,7 @@ public class TestBlockManager implements EventHandler<Boolean> {
       // writes the version file properties
       scmStore.initialize();
     }
-    return StorageContainerManager.createSCM(null, conf);
+    return new StorageContainerManager(conf, configurator);
   }
 
   @Test
index 48949be..95c0cd2 100644 (file)
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -42,7 +44,8 @@ import org.apache.hadoop.hdds.protocol.proto
     .DeleteBlockTransactionResult;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.utils.MetadataKeyFilters;
-import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.db.Table;
+import org.apache.hadoop.utils.db.TableIterator;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -62,10 +65,12 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys
     .OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.when;
 
@@ -78,6 +83,7 @@ public class TestDeletedBlockLog {
   private OzoneConfiguration conf;
   private File testDir;
   private ContainerManager containerManager;
+  private StorageContainerManager scm;
   private List<DatanodeDetails> dnList;
 
   @Before
@@ -85,10 +91,13 @@ public class TestDeletedBlockLog {
     testDir = GenericTestUtils.getTestDir(
         TestDeletedBlockLog.class.getSimpleName());
     conf = new OzoneConfiguration();
+    conf.set(OZONE_ENABLED, "true");
     conf.setInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    scm = TestUtils.getScm(conf);
     containerManager = Mockito.mock(SCMContainerManager.class);
-    deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager);
+    deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager,
+        scm.getScmMetadataStore());
     dnList = new ArrayList<>(3);
     setupContainerManager();
   }
@@ -126,6 +135,8 @@ public class TestDeletedBlockLog {
   @After
   public void tearDown() throws Exception {
     deletedBlockLog.close();
+    scm.stop();
+    scm.join();
     FileUtils.deleteDirectory(testDir);
   }
 
@@ -263,7 +274,6 @@ public class TestDeletedBlockLog {
     MetadataKeyFilters.MetadataKeyFilter avoidLatestTxid =
         (preKey, currentKey, nextKey) ->
             !Arrays.equals(latestTxid, currentKey);
-    MetadataStore store = deletedBlockLog.getDeletedStore();
     // Randomly add/get/commit/increase transactions.
     for (int i = 0; i < 100; i++) {
       int state = random.nextInt(4);
@@ -286,9 +296,13 @@ public class TestDeletedBlockLog {
         blocks = new ArrayList<>();
       } else {
         // verify the number of added and committed.
-        List<Map.Entry<byte[], byte[]>> result =
-            store.getRangeKVs(null, added, avoidLatestTxid);
-        Assert.assertEquals(added, result.size() + committed);
+        try (TableIterator<Long,
+            ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
+            scm.getScmMetadataStore().getDeletedBlocksTXTable().iterator()) {
+          AtomicInteger count = new AtomicInteger();
+          iter.forEachRemaining((keyValue) -> count.incrementAndGet());
+          Assert.assertEquals(added, count.get() + committed);
+        }
       }
     }
     blocks = getTransactions(1000);
@@ -303,7 +317,8 @@ public class TestDeletedBlockLog {
     // close db and reopen it again to make sure
     // transactions are stored persistently.
     deletedBlockLog.close();
-    deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager);
+    deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager,
+        scm.getScmMetadataStore());
     List<DeletedBlocksTransaction> blocks =
         getTransactions(10);
     commitTransactions(blocks);
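
The byte[]-based MetadataStore scan above is replaced by the typed Table API.
A sketch of counting rows through that API, mirroring the loop in the test
(the helper itself is illustrative, not part of the patch; TableIterator is
AutoCloseable, hence the try-with-resources and the broad throws clause).
Point lookups go through getDeletedBlocksTXTable().get(txId), which returns
null for an absent key, as TestBlockDeletion below asserts:

    private static long countDeletedBlockTxns(SCMMetadataStore store)
        throws Exception {
      try (TableIterator<Long,
          ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
          store.getDeletedBlocksTXTable().iterator()) {
        AtomicInteger count = new AtomicInteger();
        iter.forEachRemaining(kv -> count.incrementAndGet());
        return count.get();
      }
    }
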
index b0da150..ef0e88c 100644 (file)
@@ -23,13 +23,11 @@ import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
-import java.util.Collection;
 import java.util.Optional;
 
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.scm.ScmUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -117,33 +115,11 @@ public final class OmUtils {
    * Get the location where OM should store its metadata directories.
    * Fall back to OZONE_METADATA_DIRS if not defined.
    *
-   * @param conf
-   * @return
+   * @param conf - Config
+   * @return File path, after creating all the required directories.
    */
   public static File getOmDbDir(Configuration conf) {
-    final Collection<String> dbDirs = conf.getTrimmedStringCollection(
-        OMConfigKeys.OZONE_OM_DB_DIRS);
-
-    if (dbDirs.size() > 1) {
-      throw new IllegalArgumentException(
-          "Bad configuration setting " + OMConfigKeys.OZONE_OM_DB_DIRS +
-              ". OM does not support multiple metadata dirs currently.");
-    }
-
-    if (dbDirs.size() == 1) {
-      final File dbDirPath = new File(dbDirs.iterator().next());
-      if (!dbDirPath.exists() && !dbDirPath.mkdirs()) {
-        throw new IllegalArgumentException("Unable to create directory " +
-            dbDirPath + " specified in configuration setting " +
-            OMConfigKeys.OZONE_OM_DB_DIRS);
-      }
-      return dbDirPath;
-    }
-
-    LOG.warn("{} is not configured. We recommend adding this setting. " +
-        "Falling back to {} instead.",
-        OMConfigKeys.OZONE_OM_DB_DIRS, HddsConfigKeys.OZONE_METADATA_DIRS);
-    return ServerUtils.getOzoneMetaDirPath(conf);
+    return ScmUtils.getDBPath(conf, OMConfigKeys.OZONE_OM_DB_DIRS);
   }
 
   /**
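
OmUtils now delegates to ScmUtils.getDBPath, presumably the deleted logic
above hoisted into a shared helper so that OM and SCM resolve their DB
directories identically. Its body is not shown in this commit's hunks; a
sketch reconstructed from the removed lines, with the config key generalized
into a parameter:

    public static File getDBPath(Configuration conf, String dbDirKey) {
      final Collection<String> dbDirs =
          conf.getTrimmedStringCollection(dbDirKey);
      if (dbDirs.size() > 1) {
        throw new IllegalArgumentException(
            "Bad configuration setting " + dbDirKey
                + ". Ozone does not support multiple metadata dirs.");
      }
      if (dbDirs.size() == 1) {
        final File dbDirPath = new File(dbDirs.iterator().next());
        if (!dbDirPath.exists() && !dbDirPath.mkdirs()) {
          throw new IllegalArgumentException("Unable to create directory "
              + dbDirPath + " specified in configuration setting "
              + dbDirKey);
        }
        return dbDirPath;
      }
      LOG.warn("{} is not configured. We recommend adding this setting. "
              + "Falling back to {} instead.",
          dbDirKey, HddsConfigKeys.OZONE_METADATA_DIRS);
      return ServerUtils.getOzoneMetaDirPath(conf);
    }
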
index c21c383..ab322a2 100644 (file)
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ipc.Client;
@@ -40,7 +41,6 @@ import org.apache.hadoop.ozone.common.Storage.StorageState;
 import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.hdds.scm.server.SCMStorage;
 import org.apache.hadoop.ozone.om.OMStorage;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.protocolPB
@@ -437,12 +437,13 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
     private StorageContainerManager createSCM()
         throws IOException, AuthenticationException {
       configureSCM();
-      SCMStorage scmStore = new SCMStorage(conf);
+      SCMStorageConfig scmStore = new SCMStorageConfig(conf);
       initializeScmStorage(scmStore);
       return StorageContainerManager.createSCM(null, conf);
     }
 
-    private void initializeScmStorage(SCMStorage scmStore) throws IOException {
+    private void initializeScmStorage(SCMStorageConfig scmStore)
+        throws IOException {
       if (scmStore.getState() == StorageState.INITIALIZED) {
         return;
       }
index d54da2b..29264c0 100644 (file)
@@ -36,7 +36,6 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.Callable;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -45,7 +44,7 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.ScmInfo;
-import org.apache.hadoop.hdds.scm.server.SCMStorage;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
@@ -78,6 +77,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,6 +114,8 @@ public final class TestSecureOzoneCluster {
   private OzoneManagerProtocolClientSideTranslatorPB omClient;
   private KeyPair keyPair;
   private Path metaDirPath;
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
 
   @Before
   public void init() {
@@ -121,8 +123,7 @@ public final class TestSecureOzoneCluster {
       conf = new OzoneConfiguration();
       conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost");
       DefaultMetricsSystem.setMiniClusterMode(true);
-      final String path = GenericTestUtils
-          .getTempPath(UUID.randomUUID().toString());
+      final String path = folder.newFolder().toString();
       metaDirPath = Paths.get(path, "om-meta");
       conf.set(OZONE_METADATA_DIRS, metaDirPath.toString());
       startMiniKdc();
@@ -149,23 +150,22 @@ public final class TestSecureOzoneCluster {
       if (omClient != null) {
         omClient.close();
       }
-      FileUtils.deleteQuietly(metaDirPath.toFile());
     } catch (Exception e) {
       logger.error("Failed to stop TestSecureOzoneCluster", e);
     }
   }
 
-  private void createCredentialsInKDC(Configuration conf, MiniKdc miniKdc)
-      throws Exception {
+  private void createCredentialsInKDC(Configuration configuration,
+                                      MiniKdc kdc) throws Exception {
     createPrincipal(scmKeytab,
-        conf.get(ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY));
+        configuration.get(ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY));
     createPrincipal(spnegoKeytab,
-        conf.get(ScmConfigKeys
+        configuration.get(ScmConfigKeys
             .HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY));
-    conf.get(OMConfigKeys
-        .OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY);
     createPrincipal(omKeyTab,
-        conf.get(OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY));
+        configuration.get(OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY));
   }
 
   private void createPrincipal(File keytab, String... principal)
@@ -185,37 +185,39 @@ public final class TestSecureOzoneCluster {
     miniKdc.stop();
   }
 
-  private void setSecureConfig(Configuration conf) throws IOException {
-    conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
-    conf.setBoolean(OZONE_ENABLED, true);
-    String host = InetAddress.getLocalHost().getCanonicalHostName();
+  private void setSecureConfig(Configuration configuration)
+      throws IOException {
+    configuration.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+    configuration.setBoolean(OZONE_ENABLED, true);
+    String host = InetAddress.getLocalHost().getCanonicalHostName()
+        .toLowerCase();
     String realm = miniKdc.getRealm();
     curUser = UserGroupInformation.getCurrentUser()
         .getUserName();
-    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+    configuration.set(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
         "kerberos");
-    conf.set(OZONE_ADMINISTRATORS, curUser);
+    configuration.set(OZONE_ADMINISTRATORS, curUser);
 
-    conf.set(ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY,
+    configuration.set(ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY,
         "scm/" + host + "@" + realm);
-    conf.set(ScmConfigKeys.HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY,
+    configuration.set(ScmConfigKeys.HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY,
         "HTTP_SCM/" + host + "@" + realm);
 
-    conf.set(OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY,
+    configuration.set(OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY,
         "om/" + host + "@" + realm);
-    conf.set(OMConfigKeys.OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY,
+    configuration.set(OMConfigKeys.OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY,
         "HTTP_OM/" + host + "@" + realm);
 
     scmKeytab = new File(workDir, "scm.keytab");
     spnegoKeytab = new File(workDir, "http.keytab");
     omKeyTab = new File(workDir, "om.keytab");
 
-    conf.set(ScmConfigKeys.HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY,
+    configuration.set(ScmConfigKeys.HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY,
         scmKeytab.getAbsolutePath());
-    conf.set(
+    configuration.set(
         ScmConfigKeys.HDDS_SCM_HTTP_KERBEROS_KEYTAB_FILE_KEY,
         spnegoKeytab.getAbsolutePath());
-    conf.set(OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY,
+    configuration.set(OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY,
         omKeyTab.getAbsolutePath());
-    conf.set(OMConfigKeys.OZONE_OM_HTTP_KERBEROS_KEYTAB_FILE,
+    configuration.set(OMConfigKeys.OZONE_OM_HTTP_KERBEROS_KEYTAB_FILE,
         spnegoKeytab.getAbsolutePath());
@@ -239,12 +241,15 @@ public final class TestSecureOzoneCluster {
     scmId = UUID.randomUUID().toString();
     omId = UUID.randomUUID().toString();
 
-    final String path = GenericTestUtils
-        .getTempPath(UUID.randomUUID().toString());
+    final String path = folder.newFolder().toString();
     Path scmPath = Paths.get(path, "scm-meta");
+    File temp = scmPath.toFile();
+    if (!temp.exists()) {
+      temp.mkdirs();
+    }
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
     conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
-    SCMStorage scmStore = new SCMStorage(conf);
+    SCMStorageConfig scmStore = new SCMStorageConfig(conf);
     scmStore.setClusterId(clusterId);
     scmStore.setScmId(scmId);
     // writes the version file properties
@@ -586,5 +591,4 @@ public final class TestSecureOzoneCluster {
     CertificateClient certClient = new CertificateClientTestImpl(config);
     om.setCertClient(certClient);
   }
-
 }
index 0a12deb..a0c58db 100644 (file)
@@ -51,7 +51,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
-import org.apache.hadoop.hdds.scm.server.SCMStorage;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager.StartupOption;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
@@ -398,7 +398,7 @@ public class TestStorageContainerManager {
     // This will initialize SCM
     StorageContainerManager.scmInit(conf);
 
-    SCMStorage scmStore = new SCMStorage(conf);
+    SCMStorageConfig scmStore = new SCMStorageConfig(conf);
     Assert.assertEquals(NodeType.SCM, scmStore.getNodeType());
     Assert.assertEquals("testClusterId", scmStore.getClusterID());
     StartupOption.INIT.setClusterId("testClusterIdNew");
@@ -422,7 +422,7 @@ public class TestStorageContainerManager {
     StartupOption.INIT.setClusterId("testClusterId");
     // This will initialize SCM
     StorageContainerManager.scmInit(conf);
-    SCMStorage scmStore = new SCMStorage(conf);
+    SCMStorageConfig scmStore = new SCMStorageConfig(conf);
     Assert.assertEquals(NodeType.SCM, scmStore.getNodeType());
     Assert.assertNotEquals("testClusterId", scmStore.getClusterID());
     cluster.shutdown();
@@ -438,7 +438,8 @@ public class TestStorageContainerManager {
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
     conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
     exception.expect(SCMException.class);
-    exception.expectMessage("SCM not initialized.");
+    exception.expectMessage(
+        "SCM not initialized due to storage config failure");
     StorageContainerManager.createSCM(null, conf);
   }
 
@@ -463,7 +464,7 @@ public class TestStorageContainerManager {
     Path scmPath = Paths.get(path, "scm-meta");
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
     conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
-    SCMStorage scmStore = new SCMStorage(conf);
+    SCMStorageConfig scmStore = new SCMStorageConfig(conf);
     String clusterId = UUID.randomUUID().toString();
     String scmId = UUID.randomUUID().toString();
     scmStore.setClusterId(clusterId);
index fa862a3..78f7d29 100644 (file)
@@ -220,9 +220,9 @@ public class TestBlockDeletion {
   private void verifyTransactionsCommitted() throws IOException {
     DeletedBlockLogImpl deletedBlockLog =
         (DeletedBlockLogImpl) scm.getScmBlockManager().getDeletedBlockLog();
-    for (int txnID = 1; txnID <= maxTransactionId; txnID++) {
+    for (long txnID = 1; txnID <= maxTransactionId; txnID++) {
       Assert.assertNull(
-          deletedBlockLog.getDeletedStore().get(Longs.toByteArray(txnID)));
+          scm.getScmMetadataStore().getDeletedBlocksTXTable().get(txnID));
     }
   }
 
index 45231f1..140f91c 100644 (file)
@@ -21,6 +21,7 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
 import org.apache.hadoop.net.NetUtils;
@@ -31,7 +32,6 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.client.rest.OzoneException;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.hdds.scm.server.SCMStorage;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.protocol.proto
@@ -1311,11 +1311,11 @@ public class TestOzoneManager {
   public void testOmInitialization() throws IOException {
     // Read the version file info from OM version file
     OMStorage omStorage = cluster.getOzoneManager().getOmStorage();
-    SCMStorage scmStorage = new SCMStorage(conf);
+    SCMStorageConfig scmStorageConfig = new SCMStorageConfig(conf);
     // asserts whether cluster Id and SCM ID are properly set in SCM Version
     // file.
-    Assert.assertEquals(clusterId, scmStorage.getClusterID());
-    Assert.assertEquals(scmId, scmStorage.getScmId());
+    Assert.assertEquals(clusterId, scmStorageConfig.getClusterID());
+    Assert.assertEquals(scmId, scmStorageConfig.getScmId());
     // asserts whether OM Id is properly set in OM Version file.
     Assert.assertEquals(omId, omStorage.getOmId());
     // asserts whether the SCM info is correct in OM Version file.
index ea0b2a1..cf25693 100644 (file)
@@ -127,8 +127,8 @@ public class TestContainerSQLCli {
         new SCMPipelineManager(conf, nodeManager, eventQueue);
     containerManager = new SCMContainerManager(conf, nodeManager,
         pipelineManager, eventQueue);
-    blockManager = new BlockManagerImpl(
-        conf, nodeManager, pipelineManager, containerManager, eventQueue);
+    blockManager =
+        new BlockManagerImpl(conf, cluster.getStorageContainerManager());
     eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, blockManager);
     eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, false);
     GenericTestUtils.waitFor(() -> {
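
BlockManagerImpl's constructor shrank from five collaborators to (conf, scm),
both here and in StorageContainerManager above. The implied design is that
the block manager now pulls its dependencies off the SCM instance, roughly
along these lines (only the getters visible elsewhere in this patch are
shown; the rest of the body is an assumption):

    public BlockManagerImpl(final Configuration conf,
        final StorageContainerManager scm) throws IOException {
      Objects.requireNonNull(scm, "SCM cannot be null");
      this.pipelineManager = scm.getPipelineManager();
      this.containerManager = scm.getContainerManager();
      // ... node manager, metadata store and the rest as before.
    }
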
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/scm/package-info.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/scm/package-info.java
new file mode 100644 (file)
index 0000000..291fcd9
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/**
+ * A tool to convert Ozone Manager metadata to a SQL DB.
+ */
+package org.apache.hadoop.ozone.scm;