3c5bb327e18aa79671e60f9c43a43092df332103
[sqoop.git] / src / test / org / apache / sqoop / manager / sqlserver / SQLServerManagerImportTest.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.sqoop.manager.sqlserver;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.fs.Path;
25 import org.apache.hadoop.io.IOUtils;
26 import org.apache.sqoop.ConnFactory;
27 import org.apache.sqoop.SqoopOptions;
28 import org.apache.sqoop.manager.SQLServerManager;
29 import org.apache.sqoop.testcategories.thirdpartytest.SqlServerTest;
30 import org.apache.sqoop.testutil.ArgumentArrayBuilder;
31 import org.apache.sqoop.testutil.BaseSqoopTestCase;
32 import org.apache.sqoop.testutil.ImportJobTestCase;
33 import org.apache.sqoop.util.BlockJUnit4ClassRunnerWithParametersFactory;
34 import org.apache.sqoop.util.ExpectedLogMessage;
35 import org.apache.sqoop.util.FileListing;
36 import org.junit.After;
37 import org.junit.Before;
38 import org.junit.Rule;
39 import org.junit.Test;
40 import org.junit.experimental.categories.Category;
41 import org.junit.runner.RunWith;
42 import org.junit.runners.Parameterized;
43 import org.junit.runners.Parameterized.Parameters;
44
45 import java.io.BufferedReader;
46 import java.io.File;
47 import java.io.FileInputStream;
48 import java.io.IOException;
49 import java.io.InputStreamReader;
50 import java.sql.Connection;
51 import java.sql.SQLException;
52 import java.sql.Statement;
53 import java.util.Arrays;
54
55 import static org.junit.Assert.assertEquals;
56 import static org.junit.Assert.assertTrue;
57 import static org.junit.Assert.fail;
58
59 @RunWith(Parameterized.class)
60 @Parameterized.UseParametersRunnerFactory(BlockJUnit4ClassRunnerWithParametersFactory.class)
61 /**
62 * Test the SQLServerManager implementation.
63 *
64 * This uses JDBC to import data from an SQLServer database into HDFS.
65 *
66 * Since this requires an SQLServer installation,
67 * this class is named in such a way that Sqoop's default QA process does
68 * not run it. You need to run this manually with
69 * -Dtestcase=SQLServerManagerImportTest or -Dthirdparty=true.
70 *
71 * You need to put SQL Server JDBC driver library (sqljdbc4.jar) in a location
72 * where Sqoop will be able to access it (since this library cannot be checked
73 * into Apache's tree for licensing reasons) and set its path through -Dsqoop.thirdparty.lib.dir.
74 *
75 * To set up your test environment:
76 * Install SQL Server Express 2012
77 * Create a database SQOOPTEST
78 * Create a login SQOOPUSER with password PASSWORD and grant all
79 * access for SQOOPTEST to SQOOPUSER.
80 * Set these through -Dsqoop.test.sqlserver.connectstring.host_url, -Dsqoop.test.sqlserver.database and
81 * -Dms.sqlserver.password
82 */
83 @Category(SqlServerTest.class)
84 public class SQLServerManagerImportTest extends ImportJobTestCase {
85
86 public static final Log LOG = LogFactory.getLog(
87 SQLServerManagerImportTest.class.getName());
88
89 static final String HOST_URL = System.getProperty(
90 "sqoop.test.sqlserver.connectstring.host_url",
91 "jdbc:sqlserver://sqlserverhost:1433");
92 static final String DATABASE_NAME = System.getProperty(
93 "sqoop.test.sqlserver.database",
94 "sqooptest");
95 static final String DATABASE_USER = System.getProperty(
96 "ms.sqlserver.username",
97 "sqoopuser");
98 static final String DATABASE_PASSWORD = System.getProperty(
99 "ms.sqlserver.password",
100 "password");
101
102 static final String SCHEMA_DBO = "dbo";
103 static final String DBO_TABLE_NAME = "EMPLOYEES_MSSQL";
104 static final String SCHEMA_SCH = "sch";
105 static final String SCH_TABLE_NAME = "PRIVATE_TABLE";
106 static final String CONNECT_STRING = HOST_URL + ";databaseName=" + DATABASE_NAME;
107
108 static final String CONNECTOR_FACTORY = System.getProperty(
109 "sqoop.test.msserver.connector.factory",
110 ConnFactory.DEFAULT_FACTORY_CLASS_NAMES);
111
112 // instance variables populated during setUp, used during tests
113 private SQLServerManager manager;
114
115 private Configuration conf = new Configuration();
116
117 private Connection conn = null;
118
119 public static final String[] EXPECTED_RESULTS = new String[]{
120 "1,Aaron,1000000.0,engineering",
121 "2,Bob,400.0,sales",
122 "3,Fred,15.0,marketing",
123 };
124
125 @Parameters(name = "Builder: {0}, Table: {1}")
126 public static Iterable<? extends Object> testConfigurations() {
127 ArgumentArrayBuilder builderForTableImportWithExplicitSchema = getArgsBuilderForTableImport().withToolOption("schema", SCHEMA_DBO);
128 return Arrays.asList(
129 new Object[] { getArgsBuilderForQueryImport(), DBO_TABLE_NAME },
130 new Object[] { getArgsBuilderForTableImport(), DBO_TABLE_NAME },
131 new Object[] { getArgsBuilderForDifferentSchemaTableImport(), SCH_TABLE_NAME },
132 new Object[] { builderForTableImportWithExplicitSchema, DBO_TABLE_NAME }
133 );
134 }
135
/** Argument builder for the current parameter set; test methods add tool options to it. */
private final ArgumentArrayBuilder builder;

/** Name of the table the current parameter set imports. */
private final String tableName;

/**
 * Creates the test instance for one row returned by testConfigurations().
 *
 * @param builder   arguments describing the import to run
 * @param tableName table whose imported files are verified
 */
public SQLServerManagerImportTest(ArgumentArrayBuilder builder, String tableName) {
  this.tableName = tableName;
  // Start from a fresh builder instead of keeping the parameter instance itself.
  // NOTE(review): presumably with() copies the options so that tool options added
  // by one test method do not leak into other tests - confirm in ArgumentArrayBuilder.
  this.builder = new ArgumentArrayBuilder().with(builder);
}
143
144 @Override
145 protected Configuration getConf() {
146 return conf;
147 }
148
149 @Override
150 protected boolean useHsqldbTestServer() {
151 return false;
152 }
153
154 private String getDropTableStatement(String schema, String tableName) {
155 return "DROP TABLE IF EXISTS " + manager.escapeObjectName(schema)
156 + "." + manager.escapeObjectName(tableName);
157 }
158
159 @Before
160 public void setUp() {
161 super.setUp();
162
163 SqoopOptions options = new SqoopOptions(CONNECT_STRING, DBO_TABLE_NAME);
164 options.setUsername(DATABASE_USER);
165 options.setPassword(DATABASE_PASSWORD);
166
167 manager = new SQLServerManager(options);
168
169 createTableAndPopulateData(SCHEMA_DBO, DBO_TABLE_NAME);
170 createTableAndPopulateData(SCHEMA_SCH, SCH_TABLE_NAME);
171
172 // To test with Microsoft SQL server connector, copy the connector jar to
173 // sqoop.thirdparty.lib.dir and set sqoop.test.msserver.connector.factory
174 // to com.microsoft.sqoop.SqlServer.MSSQLServerManagerFactory. By default,
175 // the built-in SQL server connector is used.
176 conf.setStrings(ConnFactory.FACTORY_CLASS_NAMES_KEY, CONNECTOR_FACTORY);
177 }
178
179 public void createTableAndPopulateData(String schema, String table) {
180 String fulltableName = manager.escapeObjectName(schema)
181 + "." + manager.escapeObjectName(table);
182
183 Statement stmt = null;
184
185 // Create schema if needed
186 try {
187 conn = manager.getConnection();
188 stmt = conn.createStatement();
189 stmt.execute("CREATE SCHEMA " + schema);
190 } catch (SQLException sqlE) {
191 LOG.info("Can't create schema: " + sqlE.getMessage());
192 } finally {
193 try {
194 if (null != stmt) {
195 stmt.close();
196 }
197 } catch (Exception ex) {
198 LOG.warn("Exception while closing stmt", ex);
199 }
200 }
201
202 // Drop the existing table, if there is one.
203 try {
204 conn = manager.getConnection();
205 stmt = conn.createStatement();
206 stmt.execute("DROP TABLE " + fulltableName);
207 } catch (SQLException sqlE) {
208 LOG.info("Table was not dropped: " + sqlE.getMessage());
209 } finally {
210 try {
211 if (null != stmt) {
212 stmt.close();
213 }
214 } catch (Exception ex) {
215 LOG.warn("Exception while closing stmt", ex);
216 }
217 }
218
219 // Create and populate table
220 try {
221 conn = manager.getConnection();
222 conn.setAutoCommit(false);
223 stmt = conn.createStatement();
224
225 // create the database table and populate it with data.
226 stmt.executeUpdate("CREATE TABLE " + fulltableName + " ("
227 + "id INT NOT NULL, "
228 + "name VARCHAR(24) NOT NULL, "
229 + "salary FLOAT, "
230 + "dept VARCHAR(32), "
231 + "PRIMARY KEY (id))");
232
233 stmt.executeUpdate("INSERT INTO " + fulltableName + " VALUES("
234 + "1,'Aaron', "
235 + "1000000.00,'engineering')");
236 stmt.executeUpdate("INSERT INTO " + fulltableName + " VALUES("
237 + "2,'Bob', "
238 + "400.00,'sales')");
239 stmt.executeUpdate("INSERT INTO " + fulltableName + " VALUES("
240 + "3,'Fred', 15.00,"
241 + "'marketing')");
242 conn.commit();
243 } catch (SQLException sqlE) {
244 LOG.error("Encountered SQL Exception: ", sqlE);
245 sqlE.printStackTrace();
246 fail("SQLException when running test setUp(): " + sqlE);
247 } finally {
248 try {
249 if (null != stmt) {
250 stmt.close();
251 }
252 } catch (Exception ex) {
253 LOG.warn("Exception while closing connection/stmt", ex);
254 }
255 }
256 }
257
258 @After
259 public void tearDown() {
260 try {
261 Statement stmt = conn.createStatement();
262 stmt.executeUpdate(getDropTableStatement(SCHEMA_DBO, DBO_TABLE_NAME));
263 stmt.executeUpdate(getDropTableStatement(SCHEMA_SCH, SCH_TABLE_NAME));
264 } catch (SQLException e) {
265 LOG.error("Can't clean up the database:", e);
266 }
267
268 super.tearDown();
269 try {
270 manager.close();
271 } catch (SQLException sqlE) {
272 LOG.error("Got SQLException: " + sqlE.toString());
273 fail("Got SQLException: " + sqlE.toString());
274 }
275 }
276
277 @Rule
278 public ExpectedLogMessage logMessage = new ExpectedLogMessage();
279
280 @Test
281 public void testImportSimple() throws IOException {
282 doImportAndVerify(builder, tableName);
283 }
284
285 @Test
286 public void testImportTableHints() throws IOException {
287 builder.withToolOption("table-hints", "NOLOCK");
288 doImportAndVerify(builder, tableName);
289 }
290
291 @Test
292 public void testImportTableHintsMultiple() throws IOException {
293 builder.withToolOption("table-hints", "NOLOCK,NOWAIT");
294 doImportAndVerify(builder, tableName);
295 }
296
297 @Test
298 public void testImportTableResilient() throws IOException {
299 logMessage.expectWarn("Sqoop will use resilient operations! In case of import, the split-by column also has to be specified, unique, and in ascending order.");
300 builder.withToolOption("resilient");
301 doImportAndVerify(builder, tableName);
302 }
303
304 /**
305 * The resilient option was named non-resilient before, but got renamed.
306 * This test is here to ensure backward compatibility in the sense that
307 * using the non-resilient option won't break any job.
308 *
309 * @throws IOException
310 */
311 @Test
312 public void testImportTableNonResilient() throws IOException {
313 builder.withToolOption("non-resilient");
314 doImportAndVerify(builder, tableName);
315 }
316
317 private static ArgumentArrayBuilder getArgsBuilder() {
318 ArgumentArrayBuilder builder = new ArgumentArrayBuilder();
319 return builder.withCommonHadoopFlags(true)
320 .withOption("connect", CONNECT_STRING)
321 .withOption("username", DATABASE_USER)
322 .withOption("password", DATABASE_PASSWORD)
323 .withOption("num-mappers", "1")
324 .withOption("split-by", "id");
325 }
326
327 private static ArgumentArrayBuilder getArgsBuilderForTableImport() {
328 ArgumentArrayBuilder builder = getArgsBuilder();
329 return builder.withCommonHadoopFlags(true)
330 .withOption("warehouse-dir", BaseSqoopTestCase.getLocalWarehouseDir())
331 .withOption("table", DBO_TABLE_NAME);
332 }
333
334 private static ArgumentArrayBuilder getArgsBuilderForQueryImport() {
335 ArgumentArrayBuilder builder = getArgsBuilder();
336 return builder.withCommonHadoopFlags(true)
337 .withOption("query", "SELECT * FROM EMPLOYEES_MSSQL WHERE $CONDITIONS")
338 .withOption("target-dir", BaseSqoopTestCase.getLocalWarehouseDir() + "/" + DBO_TABLE_NAME);
339 }
340
341 private static ArgumentArrayBuilder getArgsBuilderForDifferentSchemaTableImport() {
342 ArgumentArrayBuilder builder = getArgsBuilder();
343 return builder.withCommonHadoopFlags(true)
344 .withOption("warehouse-dir", BaseSqoopTestCase.getLocalWarehouseDir())
345 .withOption("table", SCH_TABLE_NAME)
346 .withToolOption("schema", SCHEMA_SCH);
347 }
348
349 private void doImportAndVerify(ArgumentArrayBuilder argBuilder,
350 String tableName) throws IOException {
351
352 Path warehousePath = new Path(this.getWarehouseDir());
353 Path tablePath = new Path(warehousePath, tableName);
354 Path filePath = new Path(tablePath, "part-m-00000");
355
356 File tableFile = new File(tablePath.toString());
357 if (tableFile.exists() && tableFile.isDirectory()) {
358 // remove the directory before running the import.
359 FileListing.recursiveDeleteDir(tableFile);
360 }
361
362 String [] argv = argBuilder.build();
363 try {
364 runImport(argv);
365 } catch (IOException ioe) {
366 LOG.error("Got IOException during import: " + ioe.toString());
367 ioe.printStackTrace();
368 fail(ioe.toString());
369 }
370
371 File f = new File(filePath.toString());
372 assertTrue("Could not find imported data file", f.exists());
373 BufferedReader r = null;
374 try {
375 // Read through the file and make sure it's all there.
376 r = new BufferedReader(new InputStreamReader(new FileInputStream(f)));
377 for (String expectedLine : EXPECTED_RESULTS) {
378 assertEquals(expectedLine, r.readLine());
379 }
380 } catch (IOException ioe) {
381 LOG.error("Got IOException verifying results: " + ioe.toString());
382 ioe.printStackTrace();
383 fail(ioe.toString());
384 } finally {
385 IOUtils.closeStream(r);
386 }
387 }
388 }