[CARBONDATA-3221] Fix the error that the SDK doesn't support reading multiple files from S3
author: xubo245 <xubo29@huawei.com>
Fri, 4 Jan 2019 08:53:48 +0000 (16:53 +0800)
committer: ravipesala <ravi.pesala@gmail.com>
Tue, 8 Jan 2019 07:28:56 +0000 (12:58 +0530)
The SDK reader works correctly when a filter is applied, but when reading data without a filter, ((CarbonInputSplit) inputSplit).getDetailInfo().getBlockFooterOffset() is 0, and the
FileReader is not closed after readByteBuffer in org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader#initialize. Therefore we should invoke finish() after readByteBuffer to release the reader.

This closes #3051

examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java

index f9eae9e..33642bf 100644 (file)
@@ -33,6 +33,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.log4j.Logger;
 
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+
 /**
  * Example for testing CarbonWriter on S3
  */
@@ -41,7 +45,7 @@ public class SDKS3Example {
         Logger logger = LogServiceFactory.getLogService(SDKS3Example.class.getName());
         if (args == null || args.length < 3) {
             logger.error("Usage: java CarbonS3Example: <access-key> <secret-key>"
-                + "<s3-endpoint> [table-path-on-s3] [rows]");
+                + "<s3-endpoint> [table-path-on-s3] [rows] [Number of writes]");
             System.exit(0);
         }
 
@@ -56,9 +60,13 @@ public class SDKS3Example {
             path=args[3];
         }
 
-        int num = 3;
+        int rows = 3;
         if (args.length > 4) {
-            num = Integer.parseInt(args[4]);
+            rows = Integer.parseInt(args[4]);
+        }
+        int num = 3;
+        if (args.length > 5) {
+            num = Integer.parseInt(args[5]);
         }
 
         Configuration conf = new Configuration(true);
@@ -69,18 +77,20 @@ public class SDKS3Example {
         Field[] fields = new Field[2];
         fields[0] = new Field("name", DataTypes.STRING);
         fields[1] = new Field("age", DataTypes.INT);
-        CarbonWriter writer = CarbonWriter
-            .builder()
-            .outputPath(path)
-            .withHadoopConf(conf)
-            .withCsvInput(new Schema(fields))
-            .writtenBy("SDKS3Example")
-            .build();
+        for (int j = 0; j < num; j++) {
+            CarbonWriter writer = CarbonWriter
+                .builder()
+                .outputPath(path)
+                .withHadoopConf(conf)
+                .withCsvInput(new Schema(fields))
+                .writtenBy("SDKS3Example")
+                .build();
 
-        for (int i = 0; i < num; i++) {
-            writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)});
+            for (int i = 0; i < rows; i++) {
+                writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)});
+            }
+            writer.close();
         }
-        writer.close();
         // Read data
 
         EqualToExpression equalToExpression = new EqualToExpression(
@@ -104,6 +114,25 @@ public class SDKS3Example {
         System.out.println("\nFinished");
         reader.close();
 
+        // Read without filter
+        CarbonReader reader2 = CarbonReader
+            .builder(path, "_temp")
+            .projection(new String[]{"name", "age"})
+            .withHadoopConf(ACCESS_KEY, args[0])
+            .withHadoopConf(SECRET_KEY, args[1])
+            .withHadoopConf(ENDPOINT, args[2])
+            .build();
+
+        System.out.println("\nData:");
+        i = 0;
+        while (i < 20 && reader2.hasNext()) {
+            Object[] row = (Object[]) reader2.readNextRow();
+            System.out.println(row[0] + " " + row[1]);
+            i++;
+        }
+        System.out.println("\nFinished");
+        reader2.close();
+
         CarbonProperties.getInstance()
             .addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
                 backupProperty);
index d66bdd1..e18a4d4 100644 (file)
@@ -90,6 +90,7 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
                 ((CarbonInputSplit) inputSplit).getDetailInfo().getBlockSize() - 8,
                 8);
         ((CarbonInputSplit) inputSplit).getDetailInfo().setBlockFooterOffset(buffer.getLong());
+        reader.finish();
       }
       splitList = new ArrayList<>(1);
       splitList.add((CarbonInputSplit) inputSplit);