SQOOP-3149: Sqoop incremental import - NULL column updates are not pulled into HBase...
authorAnna Szonyi <annaszonyi@apache.org>
Thu, 3 Aug 2017 13:25:36 +0000 (15:25 +0200)
committerAnna Szonyi <annaszonyi@apache.org>
Thu, 3 Aug 2017 13:25:36 +0000 (15:25 +0200)
(Jilani Shaik via Anna Szonyi)

src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
src/java/org/apache/sqoop/hbase/PutTransformer.java
src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
src/test/com/cloudera/sqoop/hbase/HBaseImportTest.java
src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java

index fdbe127..032fd38 100644 (file)
@@ -25,7 +25,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -128,17 +130,27 @@ public class HBasePutProcessor implements Closeable, Configurable,
   public void accept(FieldMappable record)
       throws IOException, ProcessingException {
     Map<String, Object> fields = record.getFieldMap();
-
-    List<Put> putList = putTransformer.getPutCommand(fields);
-    if (null != putList) {
-      for (Put put : putList) {
-        if (put!=null) {
-          if (put.isEmpty()) {
-            LOG.warn("Could not insert row with no columns "
-                + "for row-key column: " + Bytes.toString(put.getRow()));
-          } else {
-            this.table.put(put);
-          }
+    List<Mutation> mutationList = putTransformer.getMutationCommand(fields);
+    if (null != mutationList) {
+      for (Mutation mutation : mutationList) {
+        if (mutation!=null) {
+            if(mutation instanceof Put) {
+              Put putObject = (Put) mutation;
+              if (putObject.isEmpty()) {
+                LOG.warn("Could not insert row with no columns "
+                      + "for row-key column: " + Bytes.toString(putObject.getRow()));
+                } else {
+                  this.table.put(putObject);
+                }
+              } else if(mutation instanceof Delete) {
+                Delete deleteObject = (Delete) mutation;
+                if (deleteObject.isEmpty()) {
+                  LOG.warn("Could not delete row with no columns "
+                        + "for row-key column: " + Bytes.toString(deleteObject.getRow()));
+                } else {
+                  this.table.delete(deleteObject);
+                }
+            }
         }
       }
     }
index 533467e..c4496ee 100644 (file)
@@ -22,9 +22,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Mutation;
 
 /**
  * Interface that takes a map of jdbc field names to values
@@ -71,7 +70,7 @@ public abstract class PutTransformer {
    * @param fields a map of field names to values to insert.
    * @return A list of Put commands that inserts these into HBase.
    */
-  public abstract List<Put> getPutCommand(Map<String, Object> fields)
+  public abstract List<Mutation> getMutationCommand(Map<String, Object> fields)
       throws IOException;
 
 }
index 363e145..20bf1b9 100644 (file)
@@ -22,6 +22,8 @@ import com.cloudera.sqoop.hbase.PutTransformer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.StringUtils;
@@ -106,7 +108,7 @@ public class ToStringPutTransformer extends PutTransformer {
 
   @Override
   /** {@inheritDoc} */
-  public List<Put> getPutCommand(Map<String, Object> fields)
+  public List<Mutation> getMutationCommand(Map<String, Object> fields)
       throws IOException {
 
     String rowKeyCol = getRowKeyColumn();
@@ -140,7 +142,7 @@ public class ToStringPutTransformer extends PutTransformer {
       // from composite key
       String compositeRowKey = StringUtils.join(DELIMITER_HBASE, rowKeyList);
       // Insert record in HBase
-      return putRecordInHBase(fields, colFamily, compositeRowKey);
+      return mutationRecordInHBase(fields, colFamily, compositeRowKey);
 
     } else {
       // if row-key is regular primary key
@@ -154,23 +156,21 @@ public class ToStringPutTransformer extends PutTransformer {
       }
 
       String hBaseRowKey = toHBaseString(rowKey);
-      return putRecordInHBase(fields, colFamily, hBaseRowKey);
+      return mutationRecordInHBase(fields, colFamily, hBaseRowKey);
    }
  }
 
   /**
-   * Performs actual Put operation for the specified record in HBase.
+   * Performs actual Put/delete operation for the specified record in HBase.
    * @param record
    * @param colFamily
    * @param rowKey
-   * @return List containing a single put command
+   * @return List containing a put/delete command
    */
-  private List<Put> putRecordInHBase(Map<String, Object> record,
+  private List<Mutation> mutationRecordInHBase(Map<String, Object> record,
     String colFamily, String rowKey) {
-    // Put row-key in HBase
-    Put put = new Put(Bytes.toBytes(rowKey));
     byte[] colFamilyBytes = Bytes.toBytes(colFamily);
-
+    List<Mutation> mutationList = new ArrayList<Mutation>();
     for (Map.Entry<String, Object> fieldEntry : record.entrySet()) {
       String colName = fieldEntry.getKey();
       boolean rowKeyCol = false;
@@ -187,17 +187,24 @@ public class ToStringPutTransformer extends PutTransformer {
         // check addRowKey flag before including rowKey field.
         Object val = fieldEntry.getValue();
         if (null != val) {
+          // Put row-key in HBase
+          Put put = new Put(Bytes.toBytes(rowKey));
           if ( val instanceof byte[]) {
-            put.add(colFamilyBytes, getFieldNameBytes(colName),
+            put.addColumn(colFamilyBytes, getFieldNameBytes(colName),
                 (byte[])val);
           } else {
-                 put.add(colFamilyBytes, getFieldNameBytes(colName),
+                 put.addColumn(colFamilyBytes, getFieldNameBytes(colName),
                      Bytes.toBytes(toHBaseString(val)));
           }
+          mutationList.add(put);
+        } else {
+            Delete delete = new Delete(Bytes.toBytes(rowKey));
+            delete.addColumn(colFamilyBytes, getFieldNameBytes(colName));
+            mutationList.add(delete);
         }
       }
     }
-    return Collections.singletonList(put);
+    return Collections.unmodifiableList(mutationList);
   }
 
   private String toHBaseString(Object val) {
index 58ccee7..4b583dd 100644 (file)
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -79,9 +80,12 @@ public class HBaseBulkImportMapper
     }
     Map<String, Object> fields = val.getFieldMap();
 
-    List<Put> putList = putTransformer.getPutCommand(fields);
-    for(Put put: putList){
-      context.write(new ImmutableBytesWritable(put.getRow()), put);
+    List<Mutation> mutationList = putTransformer.getMutationCommand(fields);
+    for(Mutation mutation: mutationList){
+      if(mutation != null && mutation instanceof Put) {
+        Put putObject = (Put) mutation;
+        context.write(new ImmutableBytesWritable(putObject.getRow()), putObject);
+      }
     }
   }
   @Override
index fa14a01..4d79341 100644 (file)
@@ -72,6 +72,60 @@ public class HBaseImportTest extends HBaseTestCase {
   }
 
   @Test
+  public void testOverwriteNullColumnsSucceeds() throws IOException {
+    // Test that we can create a table and then import immediately
+    // back on top of it without problem and then update with null to validate
+    String [] argv = getArgv(true, "OverwriteTable", "OverwriteColumnFamily", true, null);
+    String [] types = { "INT", "INT", "INT", "DATETIME" };
+    String [] vals = { "0", "1", "1", "'2017-03-20'" };
+    createTableWithColTypes(types, vals);
+    runImport(argv);
+    verifyHBaseCell("OverwriteTable", "0", "OverwriteColumnFamily", getColName(2), "1");
+    // Run a second time.
+    argv = getIncrementalArgv(true, "OverwriteTable", "OverwriteColumnFamily", true, null, false, false, "DATA_COL3", "2017-03-24 01:01:01.0", null);
+    vals = new String[] { "0", "1", null, "'2017-03-25'" };
+    updateTable(types, vals);
+    runImport(argv);
+    verifyHBaseCell("OverwriteTable", "0", "OverwriteColumnFamily", getColName(2), null);
+  }
+
+  @Test
+  public void testAppendWithTimestampSucceeds() throws IOException {
+    // Test that we can create a table and then import multiple rows
+    // validate for append scenario with time stamp
+    String [] argv = getArgv(true, "AppendTable", "AppendColumnFamily", true, null);
+    String [] types = { "INT", "INT", "INT", "DATETIME" };
+    String [] vals = { "0", "1", "1", "'2017-03-20'" };
+    createTableWithColTypes(types, vals);
+    runImport(argv);
+    verifyHBaseCell("AppendTable", "0", "AppendColumnFamily", getColName(2), "1");
+    // Run a second time.
+    argv = getIncrementalArgv(true, "AppendTable", "AppendColumnFamily", true, null, true, false, "DATA_COL1", "2017-03-24 01:01:01.0", null);
+    vals = new String[] { "1", "2", "3", "'2017-06-15'" };
+    insertIntoTable(types, vals);
+    runImport(argv);
+    verifyHBaseCell("AppendTable", "1", "AppendColumnFamily", getColName(2), "3");
+  }
+
+  @Test
+  public void testAppendSucceeds() throws IOException {
+       // Test that we can create a table and then import multiple rows
+       // validate for append scenario with ID column(DATA_COL3)
+    String [] argv = getArgv(true, "AppendTable", "AppendColumnFamily", true, null);
+    String [] types = { "INT", "INT", "INT", "DATETIME" };
+    String [] vals = { "0", "1", "1", "'2017-03-20'" };
+    createTableWithColTypes(types, vals);
+    runImport(argv);
+    verifyHBaseCell("AppendTable", "0", "AppendColumnFamily", getColName(2), "1");
+    // Run a second time.
+    argv = getIncrementalArgv(true, "AppendTable", "AppendColumnFamily", true, null, true, true, "DATA_COL1", null, "DATA_COL3");
+    vals = new String[] { "1", "2", "3", "'2017-06-15'" };
+    insertIntoTable(types, vals);
+    runImport(argv);
+    verifyHBaseCell("AppendTable", "1", "AppendColumnFamily", getColName(2), "3");
+  }
+
+  @Test
   public void testExitFailure() throws IOException {
     String [] types = { "INT", "INT", "INT" };
     String [] vals = { "0", "42", "43" };
index a054eb6..d9f7495 100644 (file)
 
 package com.cloudera.sqoop.hbase;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configuration;
-
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
-
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.util.Bytes;
-
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.util.StringUtils;
-
 import org.junit.After;
 import org.junit.Before;
 
 import com.cloudera.sqoop.testutil.CommonArgs;
 import com.cloudera.sqoop.testutil.HsqldbTestServer;
 import com.cloudera.sqoop.testutil.ImportJobTestCase;
-import java.io.File;
-import java.lang.reflect.Method;
-import java.util.UUID;
-import org.apache.commons.io.FileUtils;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 
 /**
  * Utility methods that facilitate HBase import tests.
@@ -115,7 +112,38 @@ public abstract class HBaseTestCase extends ImportJobTestCase {
     if (hbaseCreate) {
       args.add("--hbase-create-table");
     }
+    return args.toArray(new String[0]);
+  }
+
+  /**
+   * Create the argv to pass to Sqoop as incremental options.
+   * @return the argv as an array of strings.
+   */
+  protected String [] getIncrementalArgv(boolean includeHadoopFlags,
+      String hbaseTable, String hbaseColFam, boolean hbaseCreate,
+      String queryStr, boolean isAppend, boolean appendTimestamp, String checkColumn, String checkValue, String lastModifiedColumn) {
+
+    String[] argsStrArray = getArgv(includeHadoopFlags, hbaseTable, hbaseColFam, hbaseCreate, queryStr);
+    List<String> args = new ArrayList<String>(Arrays.asList(argsStrArray));
 
+    if (isAppend) {
+      args.add("--incremental");
+      args.add("append");
+      if (!appendTimestamp) {
+        args.add("--check-column");
+        args.add(checkColumn);//"ID");
+      } else {
+        args.add("--check-column");
+        args.add(lastModifiedColumn);//LAST_MODIFIED");
+      }
+    } else {
+      args.add("--incremental");
+      args.add("lastmodified");
+      args.add("--check-column");
+      args.add(checkColumn);
+      args.add("--last-value");
+      args.add(checkValue);
+    }
     return args.toArray(new String[0]);
   }
   // Starts a mini hbase cluster in this process.
index 6310a39..8cbb37e 100644 (file)
@@ -383,7 +383,7 @@ public abstract class BaseSqoopTestCase {
               ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
           statement.executeUpdate();
         } catch (SQLException sqlException) {
-          fail("Could not create table: "
+          fail("Could not insert into table: "
               + StringUtils.stringifyException(sqlException));
         } finally {
           if (null != statement) {
@@ -413,6 +413,139 @@ public abstract class BaseSqoopTestCase {
   }
 
   /**
+   * insert into a table with a set of columns values for a given row.
+   * @param colTypes the types of the columns to make
+   * @param vals the SQL text for each value to insert
+   */
+  protected void insertIntoTable(String[] colTypes, String[] vals) {
+    assert colNames != null;
+    assert colNames.length == vals.length;
+
+    Connection conn = null;
+    PreparedStatement statement = null;
+
+    String[] colNames = new String[vals.length];
+    for( int i = 0; i < vals.length; i++) {
+      colNames[i] = BASE_COL_NAME + Integer.toString(i);
+    }
+    try {
+        conn = getManager().getConnection();
+        for (int count=0; vals != null && count < vals.length/colTypes.length;
+              ++count ) {
+         String columnListStr = "";
+         String valueListStr = "";
+         for (int i = 0; i < colTypes.length; i++) {
+           columnListStr += manager.escapeColName(colNames[i].toUpperCase());
+           valueListStr += vals[count * colTypes.length + i];
+           if (i < colTypes.length - 1) {
+             columnListStr += ", ";
+             valueListStr += ", ";
+           }
+         }
+         try {
+           String insertValsStr = "INSERT INTO " + manager.escapeTableName(getTableName()) + "(" + columnListStr + ")"
+               + " VALUES(" + valueListStr + ")";
+           LOG.info("Inserting values: " + insertValsStr);
+           statement = conn.prepareStatement(
+               insertValsStr,
+               ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+           statement.executeUpdate();
+         } catch (SQLException sqlException) {
+           fail("Could not insert into table: "
+               + StringUtils.stringifyException(sqlException));
+         } finally {
+           if (null != statement) {
+             try {
+               statement.close();
+             } catch (SQLException se) {
+               // Ignore exception on close.
+             }
+
+             statement = null;
+           }
+         }
+       }
+    conn.commit();
+    this.colNames = colNames;
+    } catch (SQLException se) {
+      if (null != conn) {
+        try {
+          conn.close();
+        } catch (SQLException connSE) {
+          // Ignore exception on close.
+       }
+      }
+      fail("Could not create table: " + StringUtils.stringifyException(se));
+    }
+
+  }
+
+  /**
+   * update a table with a set of columns values for a given row.
+   * @param colTypes the types of the columns to make
+   * @param vals the SQL text for each value to insert
+   */
+  protected void updateTable(String[] colTypes, String[] vals) {
+    assert colNames != null;
+    assert colNames.length == vals.length;
+
+    Connection conn = null;
+    PreparedStatement statement = null;
+
+    String[] colNames = new String[vals.length];
+    for( int i = 0; i < vals.length; i++) {
+      colNames[i] = BASE_COL_NAME + Integer.toString(i);
+    }
+
+    try {
+      conn = getManager().getConnection();
+      for (int count=0; vals != null && count < vals.length/colNames.length;
+           ++count ) {
+        String updateStr = "";
+        for (int i = 1; i < colNames.length; i++) {
+             updateStr += manager.escapeColName(colNames[i].toUpperCase()) + " = "+vals[count * colNames.length + i];
+          if (i < colNames.length - 1) {
+            updateStr += ", ";
+          }
+        }
+        updateStr += " WHERE "+colNames[0]+"="+vals[0]+"";
+        try {
+          String updateValsStr = "UPDATE " + manager.escapeTableName(getTableName()) + " SET " + updateStr;
+          LOG.info("updating values: " + updateValsStr);
+          statement = conn.prepareStatement(
+                      updateValsStr,
+              ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+          statement.executeUpdate();
+        } catch (SQLException sqlException) {
+          fail("Could not update table: "
+              + StringUtils.stringifyException(sqlException));
+        } finally {
+          if (null != statement) {
+            try {
+              statement.close();
+            } catch (SQLException se) {
+              // Ignore exception on close.
+            }
+            statement = null;
+          }
+        }
+      }
+
+      conn.commit();
+      this.colNames = colNames;
+    } catch (SQLException se) {
+      if (null != conn) {
+        try {
+          conn.close();
+        } catch (SQLException connSE) {
+          // Ignore exception on close.
+        }
+      }
+      fail("Could not update table: " + StringUtils.stringifyException(se));
+    }
+  }
+
+  /**
    * Create a table with a set of columns and add a row of values.
    * @param colTypes the types of the columns to make
    * @param vals the SQL text for each value to insert