SQOOP-2334: Sqoop Volume Per Mapper
authorVenkat Ranganathan <venkat@hortonworks.com>
Tue, 30 Jun 2015 03:32:31 +0000 (20:32 -0700)
committerVenkat Ranganathan <venkat@hortonworks.com>
Tue, 30 Jun 2015 03:33:13 +0000 (20:33 -0700)
 (Rakesh Sharma via Venkat Ranganathan)

12 files changed:
src/docs/man/import-args.txt
src/docs/user/import.txt
src/java/org/apache/sqoop/SqoopOptions.java
src/java/org/apache/sqoop/config/ConfigurationConstants.java
src/java/org/apache/sqoop/config/ConfigurationHelper.java
src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBInputFormat.java
src/java/org/apache/sqoop/mapreduce/db/DateSplitter.java
src/java/org/apache/sqoop/mapreduce/db/IntegerSplitter.java
src/java/org/apache/sqoop/tool/BaseSqoopTool.java
src/java/org/apache/sqoop/tool/ImportTool.java
src/test/org/apache/sqoop/mapreduce/db/TestIntegerSplitter.java

index 93f65ba..49855ce 100644 (file)
@@ -42,6 +42,10 @@ include::import-common-args.txt[]
 --split-by (column-name)::
   Column of the table used to split the table for parallel import
 
+--split-limit (size)::
+  Upper Limit for each split size. Optimize Integer and Date columns.
+  For date or timestamp fields it is calculated in seconds.
+
 --table (table-name)::
   The table to import
 
index df04157..342633a 100644 (file)
@@ -73,6 +73,9 @@ Argument                          Description
 +\--split-by <column-name>+       Column of the table used to split work\
                                   units.  Cannot be used with\
                                   +--autoreset-to-one-mapper+ option.
++\--split-limit <n>+              Upper Limit for each split size.\
+                                  This only applies to Integer and Date columns.\
+                                  For date or timestamp fields it is calculated in seconds.
 +\--autoreset-to-one-mapper+      Import should use one mapper if a table\
                                   has no primary key and no split-by column\
                                   is provided.  Cannot be used with\
@@ -211,6 +214,17 @@ multi-column indices. If your table has no index column, or has a
 multi-column key, then you must also manually choose a splitting
 column.
 
+User can override the +\--num-mapers+ by using +\--split-limit+ option.
+Using the +\--split-limit+ parameter places a limit on the size of the split
+section created. If the size of the split created is larger than the size
+specified in this parameter, then the splits would be resized to fit within
+this limit, and the number of splits will change according to that.This
+affects actual number of mappers. If size of a split calculated based on
+provided +\--num-mappers+ parameter exceeds +\--split-limit+ parameter then actual
+number of mappers will be increased.If the value specified in +\--split-limit+
+parameter is 0 or negative, the parameter will be ignored altogether and
+the split size will be calculated according to the number of mappers.
+
 If a table does not have a primary key defined and the +--split-by <col>+
 is not provided, then import will fail unless the number
 of mappers is explicitly set to one with the +--num-mappers 1+ option
index d7c9cbb..9405605 100644 (file)
@@ -138,6 +138,7 @@ public class SqoopOptions implements Cloneable {
   @StoredAsProperty("codegen.auto.compile.dir") private boolean jarDirIsAuto;
   private String hadoopMapRedHome; // not serialized to metastore.
   @StoredAsProperty("db.split.column") private String splitByCol;
+  @StoredAsProperty("split.limit") private Integer splitLimit;
   @StoredAsProperty("db.where.clause") private String whereClause;
   @StoredAsProperty("db.query") private String sqlQuery;
   @StoredAsProperty("db.query.boundary") private String boundaryQuery;
@@ -1178,6 +1179,14 @@ public class SqoopOptions implements Cloneable {
     this.splitByCol = splitBy;
   }
 
+  public Integer getSplitLimit() {
+    return splitLimit;
+  }
+
+  public void setSplitLimit(Integer splitLimit) {
+    this.splitLimit = splitLimit;
+  }
+
   public String getWhereClause() {
     return whereClause;
   }
index 2070b63..e19c17b 100644 (file)
@@ -95,6 +95,11 @@ public final class ConfigurationConstants {
    */
   public static final String MAPRED_DISTCACHE_CONF_PARAM = "tmpjars";
 
+  /**
+   * The Configuration property identifying the split size.
+   */
+  public static final String PROP_SPLIT_LIMIT = "split.limit";
+
   private ConfigurationConstants() {
     // Disable Explicit Object Creation
   }
index 8dc2061..298907d 100644 (file)
@@ -213,6 +213,25 @@ public final class ConfigurationHelper {
     return ret;
   }
 
+  /**
+   * Stores in configuration the size of single hadoop input split.
+   *
+   * @param config Configuration to store the split size.
+   * @param splitLimit The size of single hadoop input split.
+   */
+  public static void setSplitLimit(Configuration config, long splitLimit) {
+      config.setLong(ConfigurationConstants.PROP_SPLIT_LIMIT, splitLimit);
+  }
+
+  /**
+   * Retrieves the size of single hadoop input split.
+   *
+   * @param config Configuration to retrieve the split size.
+   * @return Split size.
+   */
+  public static long getSplitLimit(Configuration config) {
+      return config.getInt(ConfigurationConstants.PROP_SPLIT_LIMIT, -1);
+  }
   public static boolean isLocalJobTracker(Configuration conf) {
     // If framework is set to YARN, then we can't be running in local mode
     if ("yarn".equalsIgnoreCase(conf
index 7521464..388ce7d 100644 (file)
@@ -320,6 +320,11 @@ public class DataDrivenImportJob extends ImportJobBase {
       job.getConfiguration().setLong(LargeObjectLoader.MAX_INLINE_LOB_LEN_KEY,
           options.getInlineLobLimit());
 
+      if (options.getSplitLimit() != null) {
+        org.apache.sqoop.config.ConfigurationHelper.setSplitLimit(
+          job.getConfiguration(), options.getSplitLimit());
+      }
+
       LOG.debug("Using InputFormat: " + inputFormatClass);
       job.setInputFormatClass(inputFormatClass);
     } finally {
index 2c59fe5..db96e41 100644 (file)
@@ -77,13 +77,27 @@ public class DataDrivenDBInputFormat<T extends DBWritable>
    * into InputSplits.
    */
   protected DBSplitter getSplitter(int sqlDataType) {
+    return getSplitter(sqlDataType, 0);
+  }
+
+  /**
+   * @return the DBSplitter implementation to use to divide the table/query
+   * into InputSplits.
+   */
+  protected DBSplitter getSplitter(int sqlDataType, long splitLimit) {
     switch (sqlDataType) {
     case Types.NUMERIC:
     case Types.DECIMAL:
+      if(splitLimit >= 0) {
+        throw new IllegalArgumentException("split-limit is supported only with Integer and Date columns");
+      }
       return new BigDecimalSplitter();
 
     case Types.BIT:
     case Types.BOOLEAN:
+      if(splitLimit >= 0) {
+        throw new IllegalArgumentException("split-limit is supported only with Integer and Date columns");
+      }
       return new BooleanSplitter();
 
     case Types.INTEGER:
@@ -95,15 +109,24 @@ public class DataDrivenDBInputFormat<T extends DBWritable>
     case Types.REAL:
     case Types.FLOAT:
     case Types.DOUBLE:
+      if(splitLimit >= 0) {
+        throw new IllegalArgumentException("split-limit is supported only with Integer and Date columns");
+      }
       return new FloatSplitter();
 
     case Types.NVARCHAR:
     case Types.NCHAR:
+      if(splitLimit >= 0) {
+        throw new IllegalArgumentException("split-limit is supported only with Integer and Date columns");
+      }
       return new NTextSplitter();
 
     case Types.CHAR:
     case Types.VARCHAR:
     case Types.LONGVARCHAR:
+      if(splitLimit >= 0) {
+         throw new IllegalArgumentException("split-limit is supported only with Integer and Date columns");
+      }
       return new TextSplitter();
 
     case Types.DATE:
@@ -114,6 +137,9 @@ public class DataDrivenDBInputFormat<T extends DBWritable>
     default:
       // TODO: Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB,
       // BLOB, ARRAY, STRUCT, REF, DATALINK, and JAVA_OBJECT.
+      if(splitLimit >= 0) {
+        throw new IllegalArgumentException("split-limit is supported only with Integer and Date columns");
+      }
       return null;
     }
   }
@@ -125,12 +151,15 @@ public class DataDrivenDBInputFormat<T extends DBWritable>
     int targetNumTasks = ConfigurationHelper.getJobNumMaps(job);
     String boundaryQuery = getDBConf().getInputBoundingQuery();
 
+    long splitLimit = org.apache.sqoop.config.ConfigurationHelper
+      .getSplitLimit(job.getConfiguration());
     // If user do not forced us to use his boundary query and we don't have to
     // bacause there is only one mapper we will return single split that
     // separates nothing. This can be considerably more optimal for a large
     // table with no index.
     if (1 == targetNumTasks
-            && (boundaryQuery == null || boundaryQuery.isEmpty())) {
+            && (boundaryQuery == null || boundaryQuery.isEmpty())
+            && splitLimit <= 0) {
       List<InputSplit> singletonSplit = new ArrayList<InputSplit>();
       singletonSplit.add(new com.cloudera.sqoop.mapreduce.db.
           DataDrivenDBInputFormat.DataDrivenDBInputSplit("1=1", "1=1"));
@@ -160,7 +189,7 @@ public class DataDrivenDBInputFormat<T extends DBWritable>
           sqlDataType = Types.BIGINT;
       }
 
-      DBSplitter splitter = getSplitter(sqlDataType);
+      DBSplitter splitter = getSplitter(sqlDataType, splitLimit);
       if (null == splitter) {
         throw new IOException("Sqoop does not have the splitter for the given"
           + " SQL data type. Please use either different split column (argument"
index 31e9351..9b94283 100644 (file)
@@ -42,6 +42,10 @@ public class DateSplitter extends IntegerSplitter {
 
   private static final Log LOG = LogFactory.getLog(DateSplitter.class);
 
+  //Factor to convert the value to milliseconds.
+  //For Split limit we take input as seconds. So we need to convert to milliseconds
+  private static final long MS_IN_SEC = 1000L;
+
   public List<InputSplit> split(Configuration conf, ResultSet results,
       String colName) throws SQLException {
 
@@ -69,8 +73,11 @@ public class DateSplitter extends IntegerSplitter {
       return splits;
     }
 
+    // For split size we are using seconds. So we need to convert to milliseconds.
+    long splitLimit = org.apache.sqoop.config.ConfigurationHelper.getSplitLimit(conf) * MS_IN_SEC;
+
     // Gather the split point integers
-    List<Long> splitPoints = split(numSplits, minVal, maxVal);
+    List<Long> splitPoints = split(numSplits,splitLimit, minVal, maxVal);
     List<InputSplit> splits = new ArrayList<InputSplit>();
 
     // Turn the split points into a set of intervals.
index e6fefc6..5f8f937 100644 (file)
@@ -60,8 +60,10 @@ public class IntegerSplitter implements DBSplitter  {
         return splits;
       }
 
+      long splitLimit = org.apache.sqoop.config.ConfigurationHelper.getSplitLimit(conf);
+
       // Get all the split points together.
-      List<Long> splitPoints = split(numSplits, minVal, maxVal);
+      List<Long> splitPoints = split(numSplits,splitLimit, minVal, maxVal);
       if (LOG.isDebugEnabled()) {
         LOG.debug(String.format("Splits: [%,28d to %,28d] into %d parts",
             minVal, maxVal, numSplits));
@@ -112,8 +114,15 @@ public class IntegerSplitter implements DBSplitter  {
      * [5, 8)
      * [8, 12)
      * [12, 18] note the closed interval for the last split.
+     *
+     * @param numSplits Number of split chunks.
+     * @param splitLimit Limit the split size.
+     * @param minVal Minimum value of the set to split.
+     * @param maxVal Maximum value of the set to split.
+     * @return Split values inside the set.
+     * @throws SQLException In case of SQL exception.
      */
-    public List<Long> split(long numSplits, long minVal, long maxVal)
+    public List<Long> split(long numSplits,long splitLimit, long minVal, long maxVal)
         throws SQLException {
 
       List<Long> splits = new ArrayList<Long>();
@@ -124,6 +133,20 @@ public class IntegerSplitter implements DBSplitter  {
       // and add 1 if the current split index is less than the < the remainder.
       // This is guaranteed to add up to remainder and not surpass the value.
       long splitSize = (maxVal - minVal) / numSplits;
+      double splitSizeDouble = ((double)maxVal - (double)minVal) / (double)numSplits;
+
+      if (splitLimit > 0 && splitSizeDouble > splitLimit) {
+        // If split size is greater than limit then do the same thing with larger
+        // amount of splits.
+         LOG.debug("Adjusting split size " + splitSize
+          + " because it's greater than limit " + splitLimit);
+        long newSplits = (maxVal - minVal) / splitLimit;
+        return split(newSplits != numSplits ? newSplits : newSplits + 1,
+         splitLimit, minVal, maxVal);
+      }
+      LOG.info("Split size: " + splitSize + "; Num splits: " + numSplits
+       + " from: " + minVal + " to: " + maxVal);
+
       long remainder = (maxVal - minVal) % numSplits;
       long curVal = minVal;
 
index c97bb58..4e2e66d 100644 (file)
@@ -82,6 +82,7 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
   public static final String CLEAR_STAGING_TABLE_ARG = "clear-staging-table";
   public static final String COLUMNS_ARG = "columns";
   public static final String SPLIT_BY_ARG = "split-by";
+  public static final String SPLIT_LIMIT_ARG = "split-limit";
   public static final String WHERE_ARG = "where";
   public static final String HADOOP_HOME_ARG = "hadoop-home";
   public static final String HADOOP_MAPRED_HOME_ARG = "hadoop-mapred-home";
index c79e044..39af42c 100644 (file)
@@ -661,6 +661,14 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
           .withDescription("Column of the table used to split work units")
           .withLongOpt(SPLIT_BY_ARG)
           .create());
+      importOpts
+        .addOption(OptionBuilder
+          .withArgName("size")
+          .hasArg()
+          .withDescription(
+            "Upper Limit of rows per split for split columns of Date/Time/Timestamp and integer types. For date or timestamp fields it is calculated in seconds. split-limit should be greater than 0")
+          .withLongOpt(SPLIT_LIMIT_ARG)
+          .create());
       importOpts.addOption(OptionBuilder.withArgName("where clause")
           .hasArg().withDescription("WHERE clause to use during import")
           .withLongOpt(WHERE_ARG)
@@ -887,6 +895,10 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
           out.setSplitByCol(in.getOptionValue(SPLIT_BY_ARG));
         }
 
+        if (in.hasOption(SPLIT_LIMIT_ARG)) {
+            out.setSplitLimit(Integer.parseInt(in.getOptionValue(SPLIT_LIMIT_ARG)));
+        }
+
         if (in.hasOption(WHERE_ARG)) {
           out.setWhereClause(in.getOptionValue(WHERE_ARG));
         }
index 136afc7..e93b6ad 100644 (file)
@@ -76,38 +76,38 @@ public class TestIntegerSplitter extends TestCase {
   }
 
   public void testEvenSplits() throws SQLException {
-    List<Long> splits = new IntegerSplitter().split(10, 0, 100);
+    List<Long> splits = new IntegerSplitter().split(10,-1, 0, 100);
     long [] expected = { 0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, };
     assertLongArrayEquals(expected, toLongArray(splits));
   }
 
   public void testOddSplits() throws SQLException {
-    List<Long> splits = new IntegerSplitter().split(10, 0, 95);
+    List<Long> splits = new IntegerSplitter().split(10,-1, 0, 95);
     long [] expected = { 0, 10, 20, 30, 40, 50, 59, 68, 77, 86, 95, };
     assertLongArrayEquals(expected, toLongArray(splits));
   }
 
   public void testSingletonSplit() throws SQLException {
-    List<Long> splits = new IntegerSplitter().split(1, 5, 5);
+    List<Long> splits = new IntegerSplitter().split(1,-1, 5, 5);
     long [] expected = { 5, 5 };
     assertLongArrayEquals(expected, toLongArray(splits));
   }
 
   public void testSingletonSplit2() throws SQLException {
     // Same test, but overly-high numSplits
-    List<Long> splits = new IntegerSplitter().split(5, 5, 5);
+    List<Long> splits = new IntegerSplitter().split(5,-1, 5, 5);
     long [] expected = { 5, 5 };
     assertLongArrayEquals(expected, toLongArray(splits));
   }
 
   public void testTooManySplits() throws SQLException {
-    List<Long> splits = new IntegerSplitter().split(5, 3, 5);
+    List<Long> splits = new IntegerSplitter().split(5,-1, 3, 5);
     long [] expected = { 3, 4, 5, 5};
     assertLongArrayEquals(expected, toLongArray(splits));
   }
 
   public void testExactSplitsAsInterval() throws SQLException {
-    List<Long> splits = new IntegerSplitter().split(5, 1, 5);
+    List<Long> splits = new IntegerSplitter().split(5,-1, 1, 5);
     long [] expected = { 1, 2, 3, 4, 5, 5};
     assertLongArrayEquals(expected, toLongArray(splits));
   }
@@ -119,8 +119,32 @@ public class TestIntegerSplitter extends TestCase {
    * @throws SQLException
    */
   public void testBigIntSplits() throws SQLException {
-    List<Long> splits = new IntegerSplitter().split(4, 14,
+    List<Long> splits = new IntegerSplitter().split(4,-1, 14,
         7863696997872966707L);
     assertEquals(splits.size(), 5);
   }
+
+  public void testEvenSplitsWithLimit() throws SQLException {
+    List<Long> splits = new IntegerSplitter().split(5, 10, 0, 100);
+    long [] expected = { 0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100 };
+    assertLongArrayEquals(expected, toLongArray(splits));
+  }
+
+  public void testOddSplitsWithLimit() throws SQLException {
+    List<Long> splits = new IntegerSplitter().split(5, 10, 0, 95);
+    long [] expected = { 0, 10, 20, 30, 40, 50, 59, 68, 77, 86, 95};
+    assertLongArrayEquals(expected, toLongArray(splits));
+  }
+
+  public void testSplitWithBiggerLimit() throws SQLException {
+    List<Long> splits = new IntegerSplitter().split(10, 15, 0, 100);
+    long [] expected = {0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100};
+    assertLongArrayEquals(expected, toLongArray(splits));
+  }
+
+  public void testFractionalSplitWithLimit() throws SQLException {
+    List<Long> splits = new IntegerSplitter().split(5, 1, 1, 10);
+    long [] expected = {1,2, 3, 4, 5, 6, 7, 8, 9, 10, 10};
+    assertLongArrayEquals(expected, toLongArray(splits));
+  }
 }