AMBARI-22365. Add storage support for storing metric definitions using LevelDB. ...
authorSiddharth Wagle <swagle@hortonworks.com>
Fri, 3 Nov 2017 17:06:12 +0000 (10:06 -0700)
committeravijayanhwx <avijayan@hortonworks.com>
Sun, 1 Apr 2018 19:13:52 +0000 (12:13 -0700)
ambari-metrics-anomaly-detection-service/pom.xml
ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricDefinitionDBConfiguration.scala [new file with mode: 0644]
ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala [new file with mode: 0644]
ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala [new file with mode: 0644]
ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDataSourceTest.scala [new file with mode: 0644]

index 44bdc1f..cfa8124 100644 (file)
       <artifactId>jackson-datatype-jdk8</artifactId>
       <version>${jackson.version}</version>
     </dependency>
+
     <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-      <version>${jackson.version}</version>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-all</artifactId>
+      <version>1.8</version>
+    </dependency>
+    <dependency>
+      <groupId>org.iq80.leveldb</groupId>
+      <artifactId>leveldb</artifactId>
+      <version>0.9</version>
     </dependency>
+
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>2.5</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.8.4</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file
index 920c50c..299a472 100644 (file)
@@ -30,6 +30,14 @@ metricsCollector:
 adQueryService:
   anomalyDataTtl: 604800
 
+metricDefinitionDB:
+  # force checksum verification of all data that is read from the file system on behalf of a particular read
+  verifyChecksums: true
+  # raise an error as soon as it detects an internal corruption
+  performParanoidChecks: false
+  # Path to Level DB directory
+  dbDirPath: /var/lib/ambari-metrics-anomaly-detection/
+
 #subsystemService:
 #  spark:
 #  pointInTime:
index c1ef0d1..aa20223 100644 (file)
@@ -20,10 +20,9 @@ package org.apache.ambari.metrics.adservice.app
 
 import javax.validation.Valid
 
-import org.apache.ambari.metrics.adservice.configuration.{AdServiceConfiguration, HBaseConfiguration, MetricCollectorConfiguration, MetricDefinitionServiceConfiguration}
+import org.apache.ambari.metrics.adservice.configuration._
 
 import com.fasterxml.jackson.annotation.JsonProperty
-
 import io.dropwizard.Configuration
 
 /**
@@ -46,6 +45,12 @@ class AnomalyDetectionAppConfig extends Configuration {
   @Valid
   private val adServiceConfiguration = new AdServiceConfiguration
 
+  /**
+    * LevelDB settings for metrics definitions
+    */
+  @Valid
+  private val metricDefinitionDBConfiguration = new MetricDefinitionDBConfiguration
+
   /*
    HBase Conf
     */
@@ -66,4 +71,6 @@ class AnomalyDetectionAppConfig extends Configuration {
   @JsonProperty("metricsCollector")
   def getMetricCollectorConfiguration: MetricCollectorConfiguration = metricCollectorConfiguration
 
+  @JsonProperty("metricDefinitionDB")
+  def getMetricDefinitionDBConfiguration: MetricDefinitionDBConfiguration = metricDefinitionDBConfiguration
 }
index 7425a7e..28b2880 100644 (file)
   */
 package org.apache.ambari.metrics.adservice.app
 
+import org.apache.ambari.metrics.adservice.db.MetadataDatasource
+import org.apache.ambari.metrics.adservice.leveldb.LevelDBDataSource
 import org.apache.ambari.metrics.adservice.resource.{AnomalyResource, RootResource}
 import org.apache.ambari.metrics.adservice.service.{ADQueryService, ADQueryServiceImpl}
 
 import com.codahale.metrics.health.HealthCheck
 import com.google.inject.AbstractModule
 import com.google.inject.multibindings.Multibinder
-
 import io.dropwizard.setup.Environment
 
 class AnomalyDetectionAppModule(config: AnomalyDetectionAppConfig, env: Environment) extends AbstractModule {
@@ -35,5 +36,6 @@ class AnomalyDetectionAppModule(config: AnomalyDetectionAppConfig, env: Environm
     bind(classOf[AnomalyResource])
     bind(classOf[RootResource])
     bind(classOf[ADQueryService]).to(classOf[ADQueryServiceImpl])
+    bind(classOf[MetadataDatasource]).to(classOf[LevelDBDataSource])
   }
 }
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricDefinitionDBConfiguration.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricDefinitionDBConfiguration.scala
new file mode 100644 (file)
index 0000000..79a350c
--- /dev/null
@@ -0,0 +1,38 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.ambari.metrics.adservice.configuration
+
+import javax.validation.constraints.NotNull
+
+import com.fasterxml.jackson.annotation.JsonProperty
+
+class MetricDefinitionDBConfiguration {
+
+  @NotNull
+  private var dbDirPath: String = _
+
+  @JsonProperty("verifyChecksums")
+  def verifyChecksums: Boolean = true
+
+  @JsonProperty("performParanoidChecks")
+  def performParanoidChecks: Boolean = false
+
+  @JsonProperty("dbDirPath")
+  def getDbDirPath: String = dbDirPath
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala
new file mode 100644 (file)
index 0000000..aa6694a
--- /dev/null
@@ -0,0 +1,73 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.ambari.metrics.adservice.db
+
+trait MetadataDatasource {
+
+  type Key = Array[Byte]
+  type Value = Array[Byte]
+
+  /**
+    *  Idempotent call at the start of the application to initialize db
+    */
+  def initialize(): Unit
+
+  /**
+    * This function obtains the associated value to a key. It requires the (key-value) pair to be in the DataSource
+    *
+    * @param key
+    * @return the value associated with the passed key.
+    */
+  def apply(key: Key): Value = get(key).get
+
+  /**
+    * This function obtains the associated value to a key, if there exists one.
+    *
+    * @param key
+    * @return the value associated with the passed key.
+    */
+  def get(key: Key): Option[Value]
+
+
+  /**
+    * This function associates a key to a value, overwriting if necessary
+    */
+  def put(key: Key, value: Value): Unit
+
+  /**
+    * Delete key from the db
+    */
+  def delete(key: Key): Unit
+
+  /**
+    * This function updates the DataSource by deleting, updating and inserting new (key-value) pairs.
+    *
+    * @param toRemove which includes all the keys to be removed from the DataSource.
+    * @param toUpsert which includes all the (key-value) pairs to be inserted into the DataSource.
+    *                 If a key is already in the DataSource its value will be updated.
+    * @return the new DataSource after the removals and insertions were done.
+    */
+  def update(toRemove: Seq[Key], toUpsert: Seq[(Key, Value)]): Unit
+
+  /**
+    * This function closes the DataSource, without deleting the files used by it.
+    */
+  def close(): Unit
+
+}
diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala
new file mode 100644 (file)
index 0000000..6d185bf
--- /dev/null
@@ -0,0 +1,102 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.ambari.metrics.adservice.leveldb
+
+import java.io.File
+
+import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
+import org.apache.ambari.metrics.adservice.configuration.MetricDefinitionDBConfiguration
+import org.apache.ambari.metrics.adservice.db.MetadataDatasource
+import org.iq80.leveldb.{DB, Options, WriteOptions}
+import org.iq80.leveldb.impl.Iq80DBFactory
+
+import com.google.inject.Singleton
+
+@Singleton
+class LevelDBDataSource(appConfig: AnomalyDetectionAppConfig) extends MetadataDatasource {
+
+  private var db: DB = _
+  @volatile var isInitialized: Boolean = false
+
+  override def initialize(): Unit = {
+    if (isInitialized) return 
+
+    val configuration: MetricDefinitionDBConfiguration = appConfig.getMetricDefinitionDBConfiguration
+
+    db = createDB(new LevelDbConfig {
+      override val createIfMissing: Boolean = true
+      override val verifyChecksums: Boolean = configuration.verifyChecksums
+      override val paranoidChecks: Boolean = configuration.performParanoidChecks
+      override val path: String = configuration.getDbDirPath
+    })
+    isInitialized = true
+  }
+
+  private def createDB(levelDbConfig: LevelDbConfig): DB = {
+    import levelDbConfig._
+
+    val options = new Options()
+      .createIfMissing(createIfMissing)
+      .paranoidChecks(paranoidChecks) // raise an error as soon as it detects an internal corruption
+      .verifyChecksums(verifyChecksums) // force checksum verification of all data that is read from the file system on behalf of a particular read
+
+    Iq80DBFactory.factory.open(new File(path), options)
+  }
+
+  override def close(): Unit = {
+    db.close()
+  }
+
+  /**
+    * This function obtains the associated value to a key, if there exists one.
+    *
+    * @param key
+    * @return the value associated with the passed key.
+    */
+  override def get(key: Key): Option[Value] = Option(db.get(key))
+
+  /**
+    * This function updates the DataSource by deleting, updating and inserting new (key-value) pairs.
+    *
+    * @param toRemove which includes all the keys to be removed from the DataSource.
+    * @param toUpsert which includes all the (key-value) pairs to be inserted into the DataSource.
+    *                 If a key is already in the DataSource its value will be updated.
+    */
+  override def update(toRemove: Seq[Key], toUpsert: Seq[(Key, Value)]): Unit = {
+    val batch = db.createWriteBatch()
+    toRemove.foreach { key => batch.delete(key) }
+    toUpsert.foreach { item => batch.put(item._1, item._2) }
+    db.write(batch, new WriteOptions())
+  }
+
+  override def put(key: Key, value: Value): Unit = {
+    db.put(key, value)
+  }
+
+  override def delete(key: Key): Unit = {
+    db.delete(key)
+  }
+}
+
+trait LevelDbConfig {
+  val createIfMissing: Boolean
+  val paranoidChecks: Boolean
+  val verifyChecksums: Boolean
+  val path: String
+}
\ No newline at end of file
diff --git a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDataSourceTest.scala b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDataSourceTest.scala
new file mode 100644 (file)
index 0000000..2ddb7b8
--- /dev/null
@@ -0,0 +1,57 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package org.apache.ambari.metrics.adservice.leveldb
+
+import java.io.File
+
+import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
+import org.apache.ambari.metrics.adservice.configuration.MetricDefinitionDBConfiguration
+import org.iq80.leveldb.util.FileUtils
+import org.mockito.Mockito.when
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+import org.scalatest.mockito.MockitoSugar
+
+class LevelDBDataSourceTest extends FunSuite with BeforeAndAfter with Matchers with MockitoSugar {
+
+  var db: LevelDBDataSource = _
+  var file : File = FileUtils.createTempDir("adservice-leveldb-test")
+
+  before {
+    val appConfig: AnomalyDetectionAppConfig = mock[AnomalyDetectionAppConfig]
+    val mdConfig : MetricDefinitionDBConfiguration = mock[MetricDefinitionDBConfiguration]
+
+    when(appConfig.getMetricDefinitionDBConfiguration).thenReturn(mdConfig)
+    when(mdConfig.verifyChecksums).thenReturn(true)
+    when(mdConfig.performParanoidChecks).thenReturn(false)
+    when(mdConfig.getDbDirPath).thenReturn(file.getAbsolutePath)
+
+    db = new LevelDBDataSource(appConfig)
+    db.initialize()
+  }
+
+  test("testOperations") {
+    db.put("Hello".getBytes(), "World".getBytes())
+    assert(db.get("Hello".getBytes()).get.sameElements("World".getBytes()))
+    db.update(Seq("Hello".getBytes()), Seq(("Hello".getBytes(), "Mars".getBytes())))
+    assert(db.get("Hello".getBytes()).get.sameElements("Mars".getBytes()))
+  }
+
+  after {
+    FileUtils.deleteRecursively(file)
+  }
+}