SQOOP-3237: Mainframe FTP transfer option to insert custom FTP commands prior to... trunk
authorSzabolcs Vasas <vasas@apache.org>
Thu, 13 Dec 2018 14:19:16 +0000 (15:19 +0100)
committerSzabolcs Vasas <vasas@apache.org>
Thu, 13 Dec 2018 14:19:16 +0000 (15:19 +0100)
(Chris Teoh via Szabolcs Vasas)

src/docs/user/import-mainframe.txt
src/java/org/apache/sqoop/SqoopOptions.java
src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java
src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
src/java/org/apache/sqoop/tool/MainframeImportTool.java
src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
src/test/org/apache/sqoop/tool/TestMainframeImportTool.java
src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java

index 3ecfb7e..a994f8b 100644 (file)
@@ -214,6 +214,20 @@ buffer size specified in bytes. By default, --buffersize is set to 32760 bytes.
 will alter the number of records Sqoop reports to have imported. This is because it reads the
 binary dataset in chunks specified by buffersize. Larger buffer size means lower number of records.
 
+Use the +\--ftp-commands+ with a comma separated list of commands to send custom FTP commands prior to
+file retrieval. This is useful for letting the mainframe know to embed data into the binary files
+like Record Descriptor Words for variable length records so downstream processes can separate each
+record. The mainframe will otherwise discard this metadata in the file transmission.
+
+NOTE: The responses from the mainframe of these commands are logged ONLY. It is up to the user to check
+for errors responses from the mainframe.
+
+----
+$ sqoop import-mainframe -D hadoop.security.credential.provider.path=jceks://file/my/folder/mainframe.jceks \
+  --connect <host> --username user1 --password-alias alias1 --dataset SomeDS --tape true \
+  --as-binaryfile --datasettype g --ftp-commands "SITE RDW,SITE RDW READTAPEFORMAT=V"
+----
+
 Additional Import Configuration Properties
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 There are some additional properties which can be configured by modifying
index f06872f..99eb8e6 100644 (file)
@@ -369,6 +369,9 @@ public class SqoopOptions implements Cloneable {
   // Buffer size to use when using binary FTP transfer mode
   @StoredAsProperty("mainframe.ftp.buffersize")
   private Integer bufferSize;
+  // custom FTP commands to be sent to mainframe
+  @StoredAsProperty("mainframe.ftp.commands")
+  private String customFtpCommands;
   // Accumulo home directory
   private String accumuloHome; // not serialized to metastore.
   // Zookeeper home directory
@@ -2528,6 +2531,16 @@ public class SqoopOptions implements Cloneable {
     bufferSize = buf;
   }
 
+  // sets the custom FTP commands
+  public void setFtpCommands(String ftpCmds) {
+    customFtpCommands = ftpCmds;
+  }
+
+  // gets the custom FTP commands issued
+  public String getFtpCommands() {
+    return customFtpCommands;
+  }
+
   public static String getAccumuloHomeDefault() {
     // Set this with $ACCUMULO_HOME, but -Daccumulo.home can override.
     String accumuloHome = System.getenv("ACCUMULO_HOME");
index 9842daa..2ea21f7 100644 (file)
@@ -44,4 +44,6 @@ public class MainframeConfiguration
   public static final Integer MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE = 32760;
 
   public static final String MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE = "mainframe.ftp.buffersize";
+
+  public static final String MAINFRAME_FTP_CUSTOM_COMMANDS = "mainframe.ftp.commands";
 }
index 90dc2dd..622667d 100644 (file)
@@ -20,6 +20,7 @@ package org.apache.sqoop.mapreduce.mainframe;
 
 import java.io.IOException;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -74,6 +75,11 @@ public class MainframeImportJob extends DataDrivenImportJob {
     job.getConfiguration().set(
             MainframeConfiguration.MAINFRAME_INPUT_DATASET_TAPE,
             options.getMainframeInputDatasetTape().toString());
+    if (!StringUtils.isBlank(options.getFtpCommands())) {
+      job.getConfiguration().set(
+      MainframeConfiguration.MAINFRAME_FTP_CUSTOM_COMMANDS,
+      options.getFtpCommands());
+    }
     if (SqoopOptions.FileLayout.BinaryFile == options.getFileLayout()) {
       job.getConfiguration().set(
         MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE,
index fbc8c3d..cbaaf65 100644 (file)
@@ -42,6 +42,7 @@ public class MainframeImportTool extends ImportTool {
   public static final String DS_TYPE_ARG = "datasettype";
   public static final String DS_TAPE_ARG = "tape";
   public static final String BUFFERSIZE_ARG = "buffersize";
+  public static final String FTP_COMMANDS = "ftp-commands";
 
   public MainframeImportTool() {
     super("import-mainframe", false);
@@ -92,6 +93,10 @@ public class MainframeImportTool extends ImportTool {
       .hasArg().withDescription("Sets buffer size for binary import in bytes (default=32kB)")
       .withLongOpt(BUFFERSIZE_ARG)
       .create());
+    importOpts.addOption(OptionBuilder.withArgName("Comma separated FTP commands issued before FTP transfer")
+      .hasArg().withDescription("Additional FTP commands issued before transfer")
+      .withLongOpt(FTP_COMMANDS)
+      .create());
     importOpts.addOption(OptionBuilder.withArgName("n")
         .hasArg().withDescription("Use 'n' map tasks to import in parallel")
         .withLongOpt(NUM_MAPPERS_ARG)
@@ -200,6 +205,9 @@ public class MainframeImportTool extends ImportTool {
       // set the default buffer size to 32kB
       out.setBufferSize(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE);
     }
+    if (in.hasOption(FTP_COMMANDS)) {
+      out.setFtpCommands(in.getOptionValue(FTP_COMMANDS));
+    }
   }
 
   @Override
index e7c48a6..a80aad9 100644 (file)
@@ -21,6 +21,7 @@ package org.apache.sqoop.util;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.lang3.StringUtils;
@@ -226,6 +227,7 @@ public final class MainframeFTPClientUtils {
         LOG.info("Defaulting FTP transfer mode to ascii");
         ftp.setFileTransferMode(FTP.ASCII_FILE_TYPE);
       }
+      applyFtpCmds(ftp,conf);
       // Use passive mode as default.
       ftp.enterLocalPassiveMode();
       LOG.info("System type detected: " + ftp.getSystemType());
@@ -271,4 +273,28 @@ public final class MainframeFTPClientUtils {
     mockFTPClient = FTPClient;
   }
 
+  public static List<String> applyFtpCmds(FTPClient ftp, Configuration conf) throws IOException {
+    String ftpCmds = conf.get(MainframeConfiguration.MAINFRAME_FTP_CUSTOM_COMMANDS);
+    String[] ftpCmdList = parseFtpCommands(ftpCmds);
+    List<String> results = new ArrayList<String>();
+    for (String ftpCommand : ftpCmdList) {
+      LOG.info("Issuing command: "+ftpCommand);
+      int res = ftp.sendCommand(ftpCommand);
+      String result = ftp.getReplyString();
+      results.add(result);
+      LOG.info("ReplyCode: "+res + " ReplyString: "+result);
+    }
+    return results;
+  }
+
+  // splits out the concatenated FTP commands
+  public static String[] parseFtpCommands(String ftpCmds) {
+    if (StringUtils.isBlank(ftpCmds)) {
+      return new String[] {};
+    }
+    return Arrays.stream(ftpCmds.split(","))
+      .map(String::trim)
+      .filter(StringUtils::isNotEmpty)
+      .toArray(String[]::new);
+  }
 }
index 00e57bd..9c4ac48 100644 (file)
@@ -228,6 +228,19 @@ public class TestMainframeImportTool extends BaseSqoopTestCase {
     configureAndValidateOptions(args);
   }
 
+  @Test
+  public void testFtpTransferCommands() throws ParseException, InvalidOptionsException {
+    String expectedCmds = "quote SITE RDW,quote SITE RDW READTAPEFORMAT=V";
+    String[] args = new String[] { "--dataset", "mydatasetname", "--ftp-commands", expectedCmds};
+    ToolOptions toolOptions = new ToolOptions();
+    SqoopOptions sqoopOption = new SqoopOptions();
+    mfImportTool.configureOptions(toolOptions);
+    sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
+    mfImportTool.validateImportOptions(sqoopOption);
+    String ftpcmds = sqoopOption.getFtpCommands();
+    assertEquals(ftpcmds,expectedCmds);
+  }
+
   private void configureAndValidateOptions(String[] args) throws ParseException, InvalidOptionsException {
     mfImportTool.configureOptions(toolOptions);
     sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
index fc6e56d..7a842ec 100644 (file)
@@ -18,6 +18,8 @@
 
 package org.apache.sqoop.util;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -26,6 +28,7 @@ import static org.mockito.Mockito.verify;
 import java.io.IOException;
 
 import java.util.List;
+
 import org.apache.commons.net.ftp.FTPClient;
 import org.apache.commons.net.ftp.FTPFile;
 import org.apache.commons.net.ftp.FTPListParseEngine;
@@ -48,6 +51,8 @@ public class TestMainframeFTPClientUtils {
 
   private FTPClient mockFTPClient;
   private FTPListParseEngine mockFTPListParseEngine;
+  private static final String DEFAULT_FTP_USERNAME="user";
+  private static final String DEFAULT_FTP_PASSWORD="pssword";
 
   @Before
   public void setUp() {
@@ -119,9 +124,8 @@ public class TestMainframeFTPClientUtils {
     }
 
     FTPClient ftp = null;
-    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
-    conf.set(DBConfiguration.USERNAME_PROPERTY, "userr");
-    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+    setupDefaultConfiguration();
+    conf.set(DBConfiguration.USERNAME_PROPERTY, "invalidusername");
     // set the password in the secure credentials object
     Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
     conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
@@ -148,13 +152,8 @@ public class TestMainframeFTPClientUtils {
       fail("No IOException should be thrown!");
     }
 
-    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
-    conf.set(DBConfiguration.USERNAME_PROPERTY, "userr");
-    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
-    // set the password in the secure credentials object
-    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
-    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
-        "pssword".getBytes());
+    setupDefaultConfiguration();
+    conf.set(DBConfiguration.USERNAME_PROPERTY, "invalidusername");
 
     try {
       MainframeFTPClientUtils.listSequentialDatasets("pdsName", conf);
@@ -187,15 +186,9 @@ public class TestMainframeFTPClientUtils {
       fail("No IOException should be thrown!");
     }
 
-    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
-    conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
-    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+    setupDefaultConfiguration();
     conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"s");
     conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1");
-    // set the password in the secure credentials object
-    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
-    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
-            "pssword".getBytes());
 
     try {
       List<String> files = MainframeFTPClientUtils.listSequentialDatasets("a.b.c.blah1", conf);
@@ -226,15 +219,9 @@ public class TestMainframeFTPClientUtils {
       fail("No IOException should be thrown!");
     }
 
-    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
-    conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
-    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+    setupDefaultConfiguration();
     conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"s");
     conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1");
-    // set the password in the secure credentials object
-    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
-    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
-            "pssword".getBytes());
 
     try {
                String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
@@ -266,15 +253,9 @@ public class TestMainframeFTPClientUtils {
       fail("No IOException should be thrown!");
     }
 
-    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
-    conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
-    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+    setupDefaultConfiguration();
     conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"s");
     conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1.");
-    // set the password in the secure credentials object
-    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
-    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
-            "pssword".getBytes());
 
     try {
                String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
@@ -307,15 +288,9 @@ public class TestMainframeFTPClientUtils {
              fail("No IOException should be thrown!");
            }
 
-           conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
-           conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
-           conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+           setupDefaultConfiguration();
            conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"g");
            conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.d");
-           // set the password in the secure credentials object
-           Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
-           conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
-                   "pssword".getBytes());
 
            try {
                        String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
@@ -347,14 +322,9 @@ public class TestMainframeFTPClientUtils {
     } catch (IOException e) {
       fail("No IOException should be thrown!");
     }
-    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
-    conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
-    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+    setupDefaultConfiguration();
     conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"p");
     conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1");
-    // set the password in the secure credentials object
-    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
-    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY, "pssword".getBytes());
     try {
       String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
       List<String> files = MainframeFTPClientUtils.listSequentialDatasets(dsName, conf);
@@ -365,4 +335,79 @@ public class TestMainframeFTPClientUtils {
       Assert.fail(ioeString);
     }
   }
+
+  @Test
+  public void testFtpCommandExecutes() throws IOException {
+    final String EXPECTED_RESPONSE = "200 OK";
+    final int EXPECTED_RESPONSE_CODE = 200;
+    String ftpcmds = "quote SITE RDW,quote SITE RDW READTAPEFORMAT=V";
+    when(mockFTPClient.login("user", "pssword")).thenReturn(true);
+    when(mockFTPClient.logout()).thenReturn(true);
+    when(mockFTPClient.isConnected()).thenReturn(false);
+    when(mockFTPClient.getReplyCode()).thenReturn(EXPECTED_RESPONSE_CODE);
+    when(mockFTPClient.getReplyString()).thenReturn(EXPECTED_RESPONSE);
+    setupDefaultConfiguration();
+    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"g");
+    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.d");
+    conf.set(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE,MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY);
+    conf.set(MainframeConfiguration.MAINFRAME_FTP_CUSTOM_COMMANDS, ftpcmds);
+    MainframeFTPClientUtils.setMockFTPClient(mockFTPClient);
+    FTPClient ftp = MainframeFTPClientUtils.getFTPConnection(conf);
+    verify(mockFTPClient).sendCommand("quote SITE RDW");
+    verify(mockFTPClient).sendCommand("quote SITE RDW READTAPEFORMAT=V");
+  }
+
+  @Test
+  public void testFtpCommandsOneCommand() {
+    String inputString = "quote SITE RDW READTAPEFORMAT=V";
+    String [] expected = new String [] {"quote SITE RDW READTAPEFORMAT=V"};
+    String [] cmds = MainframeFTPClientUtils.parseFtpCommands(inputString);
+    assertArrayEquals(cmds,expected);
+  }
+
+  @Test
+  public void testFtpCommandsOneCommandWithComma() {
+    String inputString = ",quote SITE RDW READTAPEFORMAT=V";
+    String [] expected = new String [] {"quote SITE RDW READTAPEFORMAT=V"};
+    String [] cmds = MainframeFTPClientUtils.parseFtpCommands(inputString);
+    assertArrayEquals(cmds,expected);
+  }
+
+  @Test
+  public void testFtpCommandsOneCommandWithCommas() {
+    String inputString = ",quote SITE RDW READTAPEFORMAT=V,";
+    String [] expected = new String [] {"quote SITE RDW READTAPEFORMAT=V"};
+    String [] cmds = MainframeFTPClientUtils.parseFtpCommands(inputString);
+    assertArrayEquals(cmds,expected);
+  }
+
+  @Test
+  public void testFtpCommandsTwoCommandWithComma() {
+    String inputString = "quote SITE RDW,quote SITE RDW READTAPEFORMAT=V";
+    String [] expected = new String [] {"quote SITE RDW","quote SITE RDW READTAPEFORMAT=V"};
+    String [] cmds = MainframeFTPClientUtils.parseFtpCommands(inputString);
+    assertArrayEquals(cmds,expected);
+  }
+
+  @Test
+  public void testFtpCommandsNullCommand() {
+    String inputString = null;
+    String [] cmds = MainframeFTPClientUtils.parseFtpCommands(inputString);
+    assertEquals(0, cmds.length);
+  }
+
+  @Test
+  public void testFtpCommandsEmptyCommands() {
+    String inputString = ",,,";
+    String [] cmds = MainframeFTPClientUtils.parseFtpCommands(inputString);
+    assertEquals(0, cmds.length);
+  }
+
+  private void setupDefaultConfiguration() {
+    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
+    conf.set(DBConfiguration.USERNAME_PROPERTY, DEFAULT_FTP_USERNAME);
+    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
+    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
+      DEFAULT_FTP_PASSWORD.getBytes());
+  }
 }