[CARBONDATA-3210] Merge common method into CarbonSparkUtil and fix example error
authorxiaohui0318 <245300759@qq.com>
Fri, 28 Dec 2018 06:08:54 +0000 (14:08 +0800)
committerxubo245 <xubo29@huawei.com>
Wed, 9 Jan 2019 08:42:39 +0000 (16:42 +0800)
1.merge public methods to spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
org.apache.carbondata.examples.S3UsingSDKExample#getKeyOnPrefix
org.apache.carbondata.examples.S3Example$#getKeyOnPrefix
org.apache.carbondata.spark.thriftserver.CarbonThriftServer#getKeyOnPrefix

2. fix the error of S3UsingSDKExample

This closes #3032

examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala
examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala

index 9cc43d0..6c5bfc6 100644 (file)
@@ -18,11 +18,10 @@ package org.apache.carbondata.examples
 
 import java.io.File
 
-import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
 import org.apache.spark.sql.{Row, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.spark.util.CarbonSparkUtil
 
 object S3Example {
 
@@ -38,18 +37,18 @@ object S3Example {
    */
   def main(args: Array[String]) {
     val rootPath = new File(this.getClass.getResource("/").getPath
-                            + "../../../..").getCanonicalPath
+      + "../../../..").getCanonicalPath
     val path = s"$rootPath/examples/spark2/src/main/resources/data1.csv"
     val logger: Logger = LoggerFactory.getLogger(this.getClass)
 
     import org.apache.spark.sql.CarbonSession._
     if (args.length < 3 || args.length > 5) {
       logger.error("Usage: java CarbonS3Example <access-key> <secret-key>" +
-                   "<table-path-on-s3> [s3-endpoint] [spark-master]")
+        "<table-path-on-s3> [s3-endpoint] [spark-master]")
       System.exit(0)
     }
 
-    val (accessKey, secretKey, endpoint) = getKeyOnPrefix(args(2))
+    val (accessKey, secretKey, endpoint) = CarbonSparkUtil.getKeyOnPrefix(args(2))
     val spark = SparkSession
       .builder()
       .master(getSparkMaster(args))
@@ -57,13 +56,12 @@ object S3Example {
       .config("spark.driver.host", "localhost")
       .config(accessKey, args(0))
       .config(secretKey, args(1))
-      .config(endpoint, getS3EndPoint(args))
+      .config(endpoint, CarbonSparkUtil.getS3EndPoint(args))
       .getOrCreateCarbonSession()
 
     spark.sparkContext.setLogLevel("WARN")
 
     spark.sql("Drop table if exists carbon_table")
-
     spark.sql(
       s"""
          | CREATE TABLE if not exists carbon_table(
@@ -79,7 +77,7 @@ object S3Example {
          | floatField FLOAT
          | )
          | STORED BY 'carbondata'
-         | LOCATION '${ args(2) }'
+         | LOCATION '${args(2)}'
          | TBLPROPERTIES('SORT_COLUMNS'='', 'DICTIONARY_INCLUDE'='dateField, charField')
        """.stripMargin)
 
@@ -136,26 +134,6 @@ object S3Example {
     spark.stop()
   }
 
-  def getKeyOnPrefix(path: String): (String, String, String) = {
-    val endPoint = "spark.hadoop." + ENDPOINT
-    if (path.startsWith(CarbonCommonConstants.S3A_PREFIX)) {
-      ("spark.hadoop." + ACCESS_KEY, "spark.hadoop." + SECRET_KEY, endPoint)
-    } else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX)) {
-      ("spark.hadoop." + CarbonCommonConstants.S3N_ACCESS_KEY,
-        "spark.hadoop." + CarbonCommonConstants.S3N_SECRET_KEY, endPoint)
-    } else if (path.startsWith(CarbonCommonConstants.S3_PREFIX)) {
-      ("spark.hadoop." + CarbonCommonConstants.S3_ACCESS_KEY,
-        "spark.hadoop." + CarbonCommonConstants.S3_SECRET_KEY, endPoint)
-    } else {
-      throw new Exception("Incorrect Store Path")
-    }
-  }
-
-  def getS3EndPoint(args: Array[String]): String = {
-    if (args.length >= 4 && args(3).contains(".com")) args(3)
-    else ""
-  }
-
   def getSparkMaster(args: Array[String]): String = {
     if (args.length == 5) args(4)
     else if (args(3).contains("spark:") || args(3).contains("mesos:")) args(3)
index e0c2cdc..c335daf 100644 (file)
  */
 package org.apache.carbondata.examples
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
 import org.apache.spark.sql.SparkSession
 import org.slf4j.{Logger, LoggerFactory}
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
+import org.apache.carbondata.spark.util.CarbonSparkUtil
+
 
 /**
  * Generate data and write data to S3
  * User can generate different numbers of data by specifying the number-of-rows in parameters
  */
-object S3UsingSDKExample {
+object S3UsingSdkExample {
 
   // prepare SDK writer output
   def buildTestData(
-      path: String,
-      num: Int = 3): Any = {
+                     args: Array[String],
+                     path: String,
+                     num: Int = 3): Any = {
 
     // getCanonicalPath gives path with \, but the code expects /.
-    val writerPath = path.replace("\\", "/");
+    val writerPath = path.replace("\\", "/")
 
     val fields: Array[Field] = new Array[Field](3)
     fields(0) = new Field("name", DataTypes.STRING)
@@ -50,16 +51,20 @@ object S3UsingSDKExample {
         builder.outputPath(writerPath)
           .uniqueIdentifier(System.currentTimeMillis)
           .withBlockSize(2)
+          .writtenBy("S3UsingSdkExample")
+          .withHadoopConf(ACCESS_KEY, args(0))
+          .withHadoopConf(SECRET_KEY, args(1))
+          .withHadoopConf(ENDPOINT, CarbonSparkUtil.getS3EndPoint(args))
           .withCsvInput(new Schema(fields)).build()
       var i = 0
-      var row = num
+      val row = num
       while (i < row) {
         writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
         i += 1
       }
       writer.close()
     } catch {
-      case ex: Throwable => None
+      case e: Exception => throw e
     }
   }
 
@@ -79,11 +84,11 @@ object S3UsingSDKExample {
     import org.apache.spark.sql.CarbonSession._
     if (args.length < 2 || args.length > 6) {
       logger.error("Usage: java CarbonS3Example <access-key> <secret-key>" +
-                   "[table-path-on-s3] [s3-endpoint] [number-of-rows] [spark-master]")
+        "[table-path-on-s3] [s3-endpoint] [number-of-rows] [spark-master]")
       System.exit(0)
     }
 
-    val (accessKey, secretKey, endpoint) = getKeyOnPrefix(args(2))
+    val (accessKey, secretKey, endpoint) = CarbonSparkUtil.getKeyOnPrefix(args(2))
     val spark = SparkSession
       .builder()
       .master(getSparkMaster(args))
@@ -91,7 +96,7 @@ object S3UsingSDKExample {
       .config("spark.driver.host", "localhost")
       .config(accessKey, args(0))
       .config(secretKey, args(1))
-      .config(endpoint, getS3EndPoint(args))
+      .config(endpoint, CarbonSparkUtil.getS3EndPoint(args))
       .getOrCreateCarbonSession()
 
     spark.sparkContext.setLogLevel("WARN")
@@ -105,7 +110,7 @@ object S3UsingSDKExample {
     } else {
       3
     }
-    buildTestData(path, num)
+    buildTestData(args, path, num)
 
     spark.sql("DROP TABLE IF EXISTS s3_sdk_table")
     spark.sql(s"CREATE EXTERNAL TABLE s3_sdk_table STORED BY 'carbondata'" +
@@ -114,29 +119,9 @@ object S3UsingSDKExample {
     spark.stop()
   }
 
-  def getKeyOnPrefix(path: String): (String, String, String) = {
-    val endPoint = "spark.hadoop." + ENDPOINT
-    if (path.startsWith(CarbonCommonConstants.S3A_PREFIX)) {
-      ("spark.hadoop." + ACCESS_KEY, "spark.hadoop." + SECRET_KEY, endPoint)
-    } else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX)) {
-      ("spark.hadoop." + CarbonCommonConstants.S3N_ACCESS_KEY,
-        "spark.hadoop." + CarbonCommonConstants.S3N_SECRET_KEY, endPoint)
-    } else if (path.startsWith(CarbonCommonConstants.S3_PREFIX)) {
-      ("spark.hadoop." + CarbonCommonConstants.S3_ACCESS_KEY,
-        "spark.hadoop." + CarbonCommonConstants.S3_SECRET_KEY, endPoint)
-    } else {
-      throw new Exception("Incorrect Store Path")
-    }
-  }
-
-  def getS3EndPoint(args: Array[String]): String = {
-    if (args.length >= 4 && args(3).contains(".com")) args(3)
-    else ""
-  }
-
   def getSparkMaster(args: Array[String]): String = {
     if (args.length == 6) args(5)
-    else if (args(3).contains("spark:") || args(3).contains("mesos:")) args(3)
     else "local"
   }
+
 }
index ce46af3..e268e5d 100644 (file)
@@ -28,12 +28,13 @@ import org.slf4j.{Logger, LoggerFactory}
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.util.CarbonSparkUtil
 
-/**
- * CarbonThriftServer support different modes:
- * 1. read/write data from/to HDFS or local,it only needs configurate storePath
- * 2. read/write data from/to S3, it needs provide access-key, secret-key, s3-endpoint
- */
+ /**
 * CarbonThriftServer support different modes:
 * 1. read/write data from/to HDFS or local,it only needs configurate storePath
 * 2. read/write data from/to S3, it needs provide access-key, secret-key, s3-endpoint
 */
 object CarbonThriftServer {
 
   def main(args: Array[String]): Unit = {
@@ -72,10 +73,10 @@ object CarbonThriftServer {
     val spark = if (args.length <= 1) {
       builder.getOrCreateCarbonSession(storePath)
     } else {
-      val (accessKey, secretKey, endpoint) = getKeyOnPrefix(args(0))
+      val (accessKey, secretKey, endpoint) = CarbonSparkUtil.getKeyOnPrefix(args(0))
       builder.config(accessKey, args(1))
         .config(secretKey, args(2))
-        .config(endpoint, getS3EndPoint(args))
+        .config(endpoint, CarbonSparkUtil.getS3EndPoint(args))
         .getOrCreateCarbonSession(storePath)
     }
 
@@ -86,31 +87,11 @@ object CarbonThriftServer {
       case e: Exception =>
         val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
         LOG.error(s"Wrong value for carbon.spark.warmUpTime $warmUpTime " +
-                  "Using default Value and proceeding")
+          "Using default Value and proceeding")
         Thread.sleep(5000)
     }
 
     HiveThriftServer2.startWithContext(spark.sqlContext)
   }
 
-  def getKeyOnPrefix(path: String): (String, String, String) = {
-    val endPoint = "spark.hadoop." + ENDPOINT
-    if (path.startsWith(CarbonCommonConstants.S3A_PREFIX)) {
-      ("spark.hadoop." + ACCESS_KEY, "spark.hadoop." + SECRET_KEY, endPoint)
-    } else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX)) {
-      ("spark.hadoop." + CarbonCommonConstants.S3N_ACCESS_KEY,
-        "spark.hadoop." + CarbonCommonConstants.S3N_SECRET_KEY, endPoint)
-    } else if (path.startsWith(CarbonCommonConstants.S3_PREFIX)) {
-      ("spark.hadoop." + CarbonCommonConstants.S3_ACCESS_KEY,
-        "spark.hadoop." + CarbonCommonConstants.S3_SECRET_KEY, endPoint)
-    } else {
-      throw new Exception("Incorrect Store Path")
-    }
-  }
-
-  def getS3EndPoint(args: Array[String]): String = {
-    if (args.length >= 4 && args(3).contains(".com")) args(3)
-    else ""
-  }
-
 }
index b2687d0..8feb1b9 100644 (file)
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.util
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
 import org.apache.spark.sql.hive.{CarbonMetaData, CarbonRelation, DictionaryMap}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -30,6 +31,9 @@ import org.apache.carbondata.core.util.CarbonUtil
 
 case class TransformHolder(rdd: Any, mataData: CarbonMetaData)
 
+/**
+ * carbon spark common methods
+ */
 object CarbonSparkUtil {
 
   def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
@@ -41,7 +45,7 @@ object CarbonSparkUtil {
       carbonTable.getDimensionByTableName(carbonTable.getTableName).asScala.map { f =>
         (f.getColName.toLowerCase,
           f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
-          !f.getDataType.isComplexType)
+            !f.getDataType.isComplexType)
       }
     CarbonMetaData(dimensionsAttr,
       measureAttr,
@@ -62,8 +66,8 @@ object CarbonSparkUtil {
   /**
    * return's the formatted column comment if column comment is present else empty("")
    *
-   * @param carbonColumn
-   * @return
+   * @param carbonColumn the column of carbonTable
+   * @return string
    */
   def getColumnComment(carbonColumn: CarbonColumn): String = {
     {
@@ -81,8 +85,8 @@ object CarbonSparkUtil {
   /**
    * the method return's raw schema
    *
-   * @param carbonRelation
-   * @return
+   * @param carbonRelation logical plan for one carbon table
+   * @return schema
    */
   def getRawSchema(carbonRelation: CarbonRelation): String = {
     val fields = new Array[String](
@@ -110,6 +114,10 @@ object CarbonSparkUtil {
 
   /**
    * add escape prefix for delimiter
+   *
+   * @param delimiter A delimiter is a sequence of one or more characters
+   * used to specify the boundary between separate
+   * @return delimiter
    */
   def delimiterConverter4Udf(delimiter: String): String = delimiter match {
     case "|" | "*" | "." | ":" | "^" | "\\" | "$" | "+" | "?" | "(" | ")" | "{" | "}" | "[" | "]" =>
@@ -117,4 +125,26 @@ object CarbonSparkUtil {
     case _ =>
       delimiter
   }
+
+  def getKeyOnPrefix(path: String): (String, String, String) = {
+    val prefix = "spark.hadoop."
+    val endPoint = prefix + ENDPOINT
+    if (path.startsWith(CarbonCommonConstants.S3A_PREFIX)) {
+      (prefix + ACCESS_KEY, prefix + SECRET_KEY, endPoint)
+    } else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX)) {
+      (prefix + CarbonCommonConstants.S3N_ACCESS_KEY,
+        prefix + CarbonCommonConstants.S3N_SECRET_KEY, endPoint)
+    } else if (path.startsWith(CarbonCommonConstants.S3_PREFIX)) {
+      (prefix + CarbonCommonConstants.S3_ACCESS_KEY,
+        prefix + CarbonCommonConstants.S3_SECRET_KEY, endPoint)
+    } else {
+      throw new Exception("Incorrect Store Path")
+    }
+  }
+
+  def getS3EndPoint(args: Array[String]): String = {
+    if (args.length >= 4 && args(3).contains(".com")) args(3)
+    else ""
+  }
+
 }