PHOENIX-5128 Add ability to skip header in CsvBulkLoadTool
authorJosh Elser <elserj@apache.org>
Fri, 8 Feb 2019 16:13:01 +0000 (11:13 -0500)
committerJosh Elser <elserj@apache.org>
Mon, 11 Feb 2019 23:11:31 +0000 (18:11 -0500)
phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTextInputFormat.java [new file with mode: 0644]

index 7e4226d..699b469 100644 (file)
@@ -497,4 +497,37 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
         stmt.close();
 
     }
+
+    @Test
+    public void testIgnoreCsvHeader() throws Exception {
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute("CREATE TABLE S.TABLE13 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR)");
+
+            final Configuration conf = new Configuration(getUtility().getConfiguration());
+            FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+            FSDataOutputStream outputStream = fs.create(new Path("/tmp/input13.csv"));
+            try (PrintWriter printWriter = new PrintWriter(outputStream)) {
+                printWriter.println("id,name");
+                printWriter.println("1,Name 1");
+                printWriter.println("2,Name 2");
+                printWriter.println("3,Name 3");
+            }
+
+            CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+            csvBulkLoadTool.setConf(conf);
+            int exitCode = csvBulkLoadTool.run(new String[] {
+                    "--input", "/tmp/input13.csv",
+                    "--table", "table13",
+                    "--schema", "s",
+                    "--zookeeper", zkQuorum,
+                    "--skip-header"});
+            assertEquals(0, exitCode);
+
+            try (ResultSet rs = stmt.executeQuery("SELECT COUNT(1) FROM S.TABLE13")) {
+                assertTrue(rs.next());
+                assertEquals(3, rs.getInt(1));
+                assertFalse(rs.next());
+            }
+        }
+    }
 }
index 13c7ab6..e321361 100644 (file)
@@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -88,6 +87,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
     static final Option IMPORT_COLUMNS_OPT = new Option("c", "import-columns", true, "Comma-separated list of columns to be imported");
     static final Option IGNORE_ERRORS_OPT = new Option("g", "ignore-errors", false, "Ignore input errors");
     static final Option HELP_OPT = new Option("h", "help", false, "Show this help and quit");
+    static final Option SKIP_HEADER_OPT = new Option("k", "skip-header", false, "Skip the first line of CSV files (the header)");
 
     /**
      * Set configuration values based on parsed command line options.
@@ -111,6 +111,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         options.addOption(IMPORT_COLUMNS_OPT);
         options.addOption(IGNORE_ERRORS_OPT);
         options.addOption(HELP_OPT);
+        options.addOption(SKIP_HEADER_OPT);
         return options;
     }
 
@@ -202,6 +203,10 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
                 conf.set(entry.getKey(), entry.getValue());
             }
         }
+        // Skip the first line of the CSV file(s)?
+        if (cmdLine.hasOption(SKIP_HEADER_OPT.getOpt())) {
+            PhoenixTextInputFormat.setSkipHeader(conf);
+        }
 
         final Connection conn = QueryUtil.getConnection(conf);
         if (LOG.isDebugEnabled()) {
@@ -279,7 +284,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         FileInputFormat.addInputPaths(job, inputPaths);
         FileOutputFormat.setOutputPath(job, outputPath);
 
-        job.setInputFormatClass(TextInputFormat.class);
+        job.setInputFormatClass(PhoenixTextInputFormat.class);
         job.setMapOutputKeyClass(TableRowkeyPair.class);
         job.setMapOutputValueClass(ImmutableBytesWritable.class);
         job.setOutputKeyClass(TableRowkeyPair.class);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTextInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTextInputFormat.java
new file mode 100644 (file)
index 0000000..cc170f5
--- /dev/null
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wrapper around TextInputFormat which can ignore the first line in the first InputSplit
+ * for a file.
+ */
+public class PhoenixTextInputFormat extends TextInputFormat {
+  public static final String SKIP_HEADER_KEY = "phoenix.input.format.skip.header";
+
+  public static void setSkipHeader(Configuration conf) {
+    conf.setBoolean(SKIP_HEADER_KEY, true);
+  }
+
+  @Override
+  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
+    RecordReader<LongWritable,Text> rr = super.createRecordReader(split, context);
+    
+    return new PhoenixLineRecordReader((LineRecordReader) rr);
+  }
+
+  public static class PhoenixLineRecordReader extends RecordReader<LongWritable,Text> {
+    private static final Logger LOG = LoggerFactory.getLogger(PhoenixLineRecordReader.class);
+    private final LineRecordReader rr;
+    private PhoenixLineRecordReader(LineRecordReader rr) {
+      this.rr = rr;
+    }
+
+    @Override
+    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+      rr.initialize(genericSplit, context);
+      final Configuration conf = context.getConfiguration();
+      final FileSplit split = (FileSplit) genericSplit;
+      if (conf.getBoolean(SKIP_HEADER_KEY, false) && split.getStart() == 0) {
+        LOG.trace("Consuming first key-value from {}", genericSplit);
+        nextKeyValue();
+      } else {
+        LOG.trace("Not configured to skip header or not the first input split: {}", split);
+      }
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      return rr.nextKeyValue();
+    }
+
+    @Override
+    public LongWritable getCurrentKey() throws IOException {
+      return rr.getCurrentKey();
+    }
+
+    @Override
+    public Text getCurrentValue() throws IOException {
+      return rr.getCurrentValue();
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return rr.getProgress();
+    }
+
+    @Override
+    public void close() throws IOException {
+      rr.close();
+    }
+  }
+}