[HOTFIX] presto carbon doesn't work with Hadoop conf in cluster.
authorajantha-bhat <ajanthabhat@gmail.com>
Mon, 21 Jan 2019 12:17:22 +0000 (17:47 +0530)
committerravipesala <ravi.pesala@gmail.com>
Thu, 24 Jan 2019 12:23:51 +0000 (17:53 +0530)
problem : presto carbon doesn't work with Hadoop conf in cluster.

cause:
When presto queries are run in cluster, it fails with below message.
IllegalArgumentException java.net.UnknownHostException: hacluster
configuration from hdfsEnvironment is not used while checking schema path. hence the file factory is throwing exception.

solution: set the configuration while checking schema path and other places in presto

This closes #3089

core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java

index 8538e37..da558be 100644 (file)
@@ -25,6 +25,8 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.locks.LockUsage;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Helps to get Table content paths.
  */
@@ -175,12 +177,27 @@ public class CarbonTablePath {
    * @return schema file path
    */
   public static String getSchemaFilePath(String tablePath) {
-    return getActualSchemaFilePath(tablePath);
+    return getActualSchemaFilePath(tablePath, null);
+  }
+
+  /**
+   * return the schema file path
+   * @param tablePath path to table files
+   * @param hadoopConf hadoop configuration instance
+   * @return schema file path
+   */
+  public static String getSchemaFilePath(String tablePath, Configuration hadoopConf) {
+    return getActualSchemaFilePath(tablePath, hadoopConf);
   }
 
-  private static String getActualSchemaFilePath(String tablePath) {
+  private static String getActualSchemaFilePath(String tablePath, Configuration hadoopConf) {
     String metaPath = tablePath + CarbonCommonConstants.FILE_SEPARATOR + METADATA_DIR;
-    CarbonFile carbonFile = FileFactory.getCarbonFile(metaPath);
+    CarbonFile carbonFile;
+    if (hadoopConf != null) {
+      carbonFile = FileFactory.getCarbonFile(metaPath, hadoopConf);
+    } else {
+      carbonFile = FileFactory.getCarbonFile(metaPath);
+    }
     CarbonFile[] schemaFile = carbonFile.listFiles(new CarbonFileFilter() {
       @Override public boolean accept(CarbonFile file) {
         return file.getName().startsWith(SCHEMA_FILE);
index 1121a37..916e44c 100755 (executable)
@@ -168,60 +168,59 @@ public class CarbonTableReader {
   private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
       Configuration config) {
     try {
-      CarbonTableCacheModel cache = carbonCache.get().get(table);
-      if (cache != null && cache.isValid()) {
+      CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table);
+      if (cache != null) {
         return cache;
       }
-      // Step 1: get store path of the table and cache it.
-      String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath);
-      // If metadata folder exists, it is a transactional table
-      CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
-      boolean isTransactionalTable = schemaFile.exists();
-      org.apache.carbondata.format.TableInfo tableInfo;
-      long modifiedTime = System.currentTimeMillis();
-      if (isTransactionalTable) {
-        //Step 2: read the metadata (tableInfo) of the table.
-        ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
-          // TBase is used to read and write thrift objects.
-          // TableInfo is a kind of TBase used to read and write table information.
-          // TableInfo is generated by thrift,
-          // see schema.thrift under format/src/main/thrift for details.
-          public TBase create() {
-            return new org.apache.carbondata.format.TableInfo();
-          }
-        };
-        ThriftReader thriftReader =
-            new ThriftReader(schemaFilePath, createTBase, config);
-        thriftReader.open();
-        tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
-        thriftReader.close();
-        modifiedTime = schemaFile.getLastModifiedTime();
-      } else {
-        tableInfo = CarbonUtil
-            .inferSchema(tablePath, table.getTableName(), false, config);
-      }
-      // Step 3: convert format level TableInfo to code level TableInfo
-      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-      // wrapperTableInfo is the code level information of a table in carbondata core,
-      // different from the Thrift TableInfo.
-      TableInfo wrapperTableInfo = schemaConverter
-          .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
-              tablePath);
-
-      wrapperTableInfo.setTransactionalTable(isTransactionalTable);
-
-      CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
-      // Step 4: Load metadata info into CarbonMetadata
-      CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
-      CarbonTable carbonTable = Objects.requireNonNull(
-          CarbonMetadata.getInstance().getCarbonTable(table.getSchemaName(), table.getTableName()),
-          "carbontable is null");
-      // If table is not previously cached, then:
-      if (cache == null) {
+      // multiple tasks can be launched in a worker concurrently. Hence need to synchronize this.
+      synchronized (this) {
+        // cache might be filled by another thread, so if filled use that cache.
+        CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table);
+        if (cacheModel != null) {
+          return cacheModel;
+        }
+        // Step 1: get store path of the table and cache it.
+        String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config);
+        // If metadata folder exists, it is a transactional table
+        CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+        boolean isTransactionalTable = schemaFile.exists();
+        org.apache.carbondata.format.TableInfo tableInfo;
+        long modifiedTime = System.currentTimeMillis();
+        if (isTransactionalTable) {
+          //Step 2: read the metadata (tableInfo) of the table.
+          ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+            // TBase is used to read and write thrift objects.
+            // TableInfo is a kind of TBase used to read and write table information.
+            // TableInfo is generated by thrift,
+            // see schema.thrift under format/src/main/thrift for details.
+            public TBase create() {
+              return new org.apache.carbondata.format.TableInfo();
+            }
+          };
+          ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config);
+          thriftReader.open();
+          tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+          thriftReader.close();
+          modifiedTime = schemaFile.getLastModifiedTime();
+        } else {
+          tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config);
+        }
+        // Step 3: convert format level TableInfo to code level TableInfo
+        SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+        // wrapperTableInfo is the code level information of a table in carbondata core,
+        // different from the Thrift TableInfo.
+        TableInfo wrapperTableInfo = schemaConverter
+            .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+                tablePath);
+        wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+        CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
+        // Step 4: Load metadata info into CarbonMetadata
+        CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+        CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance()
+            .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null");
         cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
         // cache the table
         carbonCache.get().put(table, cache);
-      } else {
         cache.setCarbonTable(carbonTable);
       }
       return cache;
@@ -230,6 +229,14 @@ public class CarbonTableReader {
     }
   }
 
+  private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) {
+    CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName);
+    if (cache != null && cache.isValid()) {
+      return cache;
+    }
+    return null;
+  }
+
   public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
       Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
       throws IOException {