ab1e8ff2d50f64892cfcc3263469c567012b4a13
[sqoop.git] / src / test / org / apache / sqoop / manager / sqlserver / SQLServerManagerExportTest.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.sqoop.manager.sqlserver;
19
20 import org.apache.sqoop.ConnFactory;
21 import org.apache.sqoop.SqoopOptions;
22 import org.apache.sqoop.testcategories.thirdpartytest.SqlServerTest;
23 import org.apache.sqoop.testutil.CommonArgs;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.sqoop.manager.SQLServerManager;
27 import org.apache.sqoop.testutil.ExportJobTestCase;
28 import org.apache.hadoop.conf.Configuration;
29 import org.junit.After;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.junit.experimental.categories.Category;
33
34 import java.io.BufferedWriter;
35 import java.io.File;
36 import java.io.FileWriter;
37 import java.io.IOException;
38 import java.io.Writer;
39 import java.sql.Connection;
40 import java.sql.ResultSet;
41 import java.sql.SQLException;
42 import java.sql.Statement;
43 import java.util.ArrayList;
44
45 import static org.junit.Assert.assertEquals;
46 import static org.junit.Assert.fail;
47
48 /**
49 * Please see instructions in SQLServerManagerImportTest.
50 */
51 @Category(SqlServerTest.class)
52 public class SQLServerManagerExportTest extends ExportJobTestCase {
53
54 public static final Log LOG = LogFactory.getLog(
55 SQLServerManagerExportTest.class.getName());
56
57 static final String HOST_URL = System.getProperty(
58 "sqoop.test.sqlserver.connectstring.host_url",
59 "jdbc:sqlserver://sqlserverhost:1433");
60 static final String DATABASE_NAME = System.getProperty(
61 "sqoop.test.sqlserver.database",
62 "sqooptest");
63 static final String DATABASE_USER = System.getProperty(
64 "ms.sqlserver.username",
65 "sqoopuser");
66 static final String DATABASE_PASSWORD = System.getProperty(
67 "ms.sqlserver.password",
68 "password");
69
70 static final String SCHEMA_DBO = "dbo";
71 static final String DBO_TABLE_NAME = "EMPLOYEES_MSSQL";
72 static final String DBO_BINARY_TABLE_NAME = "BINARYTYPE_MSSQL";
73 static final String SCHEMA_SCH = "sch";
74 static final String SCH_TABLE_NAME = "PRIVATE_TABLE";
75 static final String CONNECT_STRING = HOST_URL
76 + ";databaseName=" + DATABASE_NAME;
77
78 static final String CONNECTOR_FACTORY = System.getProperty(
79 "sqoop.test.msserver.connector.factory",
80 ConnFactory.DEFAULT_FACTORY_CLASS_NAMES);
81
82 // instance variables populated during setUp, used during tests
83 private SQLServerManager manager;
84 private Configuration conf = new Configuration();
85 private Connection conn = null;
86
87 @Override
88 protected Configuration getConf() {
89 return conf;
90 }
91
92 @Override
93 protected boolean useHsqldbTestServer() {
94 return false;
95 }
96
97 private String getDropTableStatement(String schema, String tableName) {
98 return "DROP TABLE IF EXISTS " + manager.escapeObjectName(schema)
99 + "." + manager.escapeObjectName(tableName);
100 }
101
102 @Before
103 public void setUp() {
104 super.setUp();
105
106 SqoopOptions options = new SqoopOptions(CONNECT_STRING,
107 DBO_TABLE_NAME);
108 options.setUsername(DATABASE_USER);
109 options.setPassword(DATABASE_PASSWORD);
110
111 manager = new SQLServerManager(options);
112
113 createTableAndPopulateData(SCHEMA_DBO, DBO_TABLE_NAME);
114 createTableAndPopulateData(SCHEMA_SCH, SCH_TABLE_NAME);
115
116 // To test with Microsoft SQL server connector, copy the connector jar to
117 // sqoop.thirdparty.lib.dir and set sqoop.test.msserver.connector.factory
118 // to com.microsoft.sqoop.SqlServer.MSSQLServerManagerFactory. By default,
119 // the built-in SQL server connector is used.
120 conf.setStrings(ConnFactory.FACTORY_CLASS_NAMES_KEY, CONNECTOR_FACTORY);
121 }
122
123 public void createTableAndPopulateData(String schema, String table) {
124 String fulltableName = manager.escapeObjectName(schema)
125 + "." + manager.escapeObjectName(table);
126
127 Statement stmt = null;
128
129 // Create schema if needed
130 try {
131 conn = manager.getConnection();
132 stmt = conn.createStatement();
133 stmt.execute("CREATE SCHEMA " + schema);
134 conn.commit();
135 } catch (SQLException sqlE) {
136 LOG.info("Can't create schema: " + sqlE.getMessage());
137 } finally {
138 try {
139 if (null != stmt) {
140 stmt.close();
141 }
142 } catch (Exception ex) {
143 LOG.warn("Exception while closing stmt", ex);
144 }
145 }
146
147 // Drop the existing table, if there is one.
148 try {
149 conn = manager.getConnection();
150 stmt = conn.createStatement();
151 stmt.execute("DROP TABLE " + fulltableName);
152 conn.commit();
153 } catch (SQLException sqlE) {
154 LOG.info("Table was not dropped: " + sqlE.getMessage());
155 } finally {
156 try {
157 if (null != stmt) {
158 stmt.close();
159 }
160 } catch (Exception ex) {
161 LOG.warn("Exception while closing stmt", ex);
162 }
163 }
164
165 // Create and populate table
166 try {
167 conn = manager.getConnection();
168 conn.setAutoCommit(false);
169 stmt = conn.createStatement();
170
171 // create the database table and populate it with data.
172 stmt.executeUpdate("CREATE TABLE " + fulltableName + " ("
173 + "id INT NOT NULL, "
174 + "name VARCHAR(24) NOT NULL, "
175 + "salary FLOAT, "
176 + "dept VARCHAR(32), "
177 + "PRIMARY KEY (id))");
178 conn.commit();
179 } catch (SQLException sqlE) {
180 LOG.error("Encountered SQL Exception: ", sqlE);
181 sqlE.printStackTrace();
182 fail("SQLException when running test setUp(): " + sqlE);
183 } finally {
184 try {
185 if (null != stmt) {
186 stmt.close();
187 }
188 } catch (Exception ex) {
189 LOG.warn("Exception while closing connection/stmt", ex);
190 }
191 }
192 }
193
194 public void createSQLServerBinaryTypeTable(String schema, String table) {
195 String fulltableName = manager.escapeObjectName(schema)
196 + "." + manager.escapeObjectName(table);
197
198 Statement stmt = null;
199
200 // Create schema if needed
201 try {
202 conn = manager.getConnection();
203 stmt = conn.createStatement();
204 stmt.execute("CREATE SCHEMA " + schema);
205 conn.commit();
206 } catch (SQLException sqlE) {
207 LOG.info("Can't create schema: " + sqlE.getMessage());
208 } finally {
209 try {
210 if (null != stmt) {
211 stmt.close();
212 }
213 } catch (Exception ex) {
214 LOG.warn("Exception while closing stmt", ex);
215 }
216 }
217
218 // Drop the existing table, if there is one.
219 try {
220 conn = manager.getConnection();
221 stmt = conn.createStatement();
222 stmt.execute("DROP TABLE " + fulltableName);
223 conn.commit();
224 } catch (SQLException sqlE) {
225 LOG.info("Table was not dropped: " + sqlE.getMessage());
226 } finally {
227 try {
228 if (null != stmt) {
229 stmt.close();
230 }
231 } catch (Exception ex) {
232 LOG.warn("Exception while closing stmt", ex);
233 }
234 }
235
236 // Create and populate table
237 try {
238 conn = manager.getConnection();
239 conn.setAutoCommit(false);
240 stmt = conn.createStatement();
241
242 // create the database table and populate it with data.
243 stmt.executeUpdate("CREATE TABLE " + fulltableName + " ("
244 + "id INT PRIMARY KEY, "
245 + "b1 BINARY(10), "
246 + "b2 VARBINARY(10))");
247 conn.commit();
248 } catch (SQLException sqlE) {
249 LOG.error("Encountered SQL Exception: ", sqlE);
250 sqlE.printStackTrace();
251 fail("SQLException when running test setUp(): " + sqlE);
252 } finally {
253 try {
254 if (null != stmt) {
255 stmt.close();
256 }
257 } catch (Exception ex) {
258 LOG.warn("Exception while closing connection/stmt", ex);
259 }
260 }
261 }
262
263 @After
264 public void tearDown() {
265 try {
266 Statement stmt = conn.createStatement();
267 stmt.executeUpdate(getDropTableStatement(SCHEMA_DBO, DBO_TABLE_NAME));
268 stmt.executeUpdate(getDropTableStatement(SCHEMA_SCH, SCH_TABLE_NAME));
269 } catch (SQLException e) {
270 LOG.error("Can't clean up the database:", e);
271 }
272
273 super.tearDown();
274 try {
275 conn.close();
276 manager.close();
277 } catch (SQLException sqlE) {
278 LOG.error("Got SQLException: " + sqlE.toString());
279 fail("Got SQLException: " + sqlE.toString());
280 }
281 }
282
283 private String [] getArgv(String tableName,
284 String... extraArgs) {
285 ArrayList<String> args = new ArrayList<String>();
286
287 CommonArgs.addHadoopFlags(args);
288
289 args.add("--table");
290 args.add(tableName);
291 args.add("--export-dir");
292 args.add(getWarehouseDir());
293 args.add("--fields-terminated-by");
294 args.add(",");
295 args.add("--lines-terminated-by");
296 args.add("\\n");
297 args.add("--connect");
298 args.add(CONNECT_STRING);
299 args.add("--username");
300 args.add(DATABASE_USER);
301 args.add("--password");
302 args.add(DATABASE_PASSWORD);
303 args.add("-m");
304 args.add("1");
305
306 for (String arg : extraArgs) {
307 args.add(arg);
308 }
309
310 return args.toArray(new String[0]);
311 }
312
313 protected void createTestFile(String filename,
314 String[] lines)
315 throws IOException {
316 new File(getWarehouseDir()).mkdirs();
317 File file = new File(getWarehouseDir() + "/" + filename);
318 Writer output = new BufferedWriter(new FileWriter(file));
319 for(String line : lines) {
320 output.write(line);
321 output.write("\n");
322 }
323 output.close();
324 }
325
326 @Test
327 public void testExport() throws IOException, SQLException {
328 createTestFile("inputFile", new String[] {
329 "2,Bob,400,sales",
330 "3,Fred,15,marketing",
331 });
332
333 runExport(getArgv(DBO_TABLE_NAME));
334
335 assertRowCount(2, escapeObjectName(DBO_TABLE_NAME), conn);
336 }
337
338 @Test
339 public void testExportCustomSchema() throws IOException, SQLException {
340 createTestFile("inputFile", new String[] {
341 "2,Bob,400,sales",
342 "3,Fred,15,marketing",
343 });
344
345 String[] extra = new String[] {"--",
346 "--schema",
347 SCHEMA_SCH,
348 };
349
350 runExport(getArgv(SCH_TABLE_NAME, extra));
351
352 assertRowCount(
353 2,
354 escapeObjectName(SCHEMA_SCH) + "." + escapeObjectName(SCH_TABLE_NAME),
355 conn
356 );
357 }
358
359 @Test
360 public void testExportTableHints() throws IOException, SQLException {
361 createTestFile("inputFile", new String[] {
362 "2,Bob,400,sales",
363 "3,Fred,15,marketing",
364 });
365
366 String []extra = new String[] {"--", "--table-hints",
367 "ROWLOCK",
368 };
369 runExport(getArgv(DBO_TABLE_NAME, extra));
370 assertRowCount(2, escapeObjectName(DBO_TABLE_NAME), conn);
371 }
372
373 @Test
374 public void testExportTableHintsMultiple() throws IOException, SQLException {
375 createTestFile("inputFile", new String[] {
376 "2,Bob,400,sales",
377 "3,Fred,15,marketing",
378 });
379
380 String []extra = new String[] {"--", "--table-hints",
381 "ROWLOCK,NOWAIT",
382 };
383 runExport(getArgv(DBO_TABLE_NAME, extra));
384 assertRowCount(2, escapeObjectName(DBO_TABLE_NAME), conn);
385 }
386
387 @Test
388 public void testSQLServerBinaryType() throws IOException, SQLException {
389 createSQLServerBinaryTypeTable(SCHEMA_DBO, DBO_BINARY_TABLE_NAME);
390 createTestFile("inputFile", new String[] {
391 "1,73 65 63 72 65 74 00 00 00 00,73 65 63 72 65 74"
392 });
393 String[] expectedContent = {"73656372657400000000", "736563726574"};
394 runExport(getArgv(DBO_BINARY_TABLE_NAME));
395 assertRowCount(1, escapeObjectName(DBO_BINARY_TABLE_NAME), conn);
396 checkSQLBinaryTableContent(expectedContent, escapeObjectName(DBO_BINARY_TABLE_NAME), conn);
397 }
398
399 /** Make sure mixed update/insert export work correctly. */
400 @Test
401 public void testUpsertTextExport() throws IOException, SQLException {
402 createTestFile("inputFile", new String[] {
403 "2,Bob,400,sales",
404 "3,Fred,15,marketing",
405 });
406 // first time will be insert.
407 runExport(getArgv(SCH_TABLE_NAME, "--update-key", "id",
408 "--update-mode", "allowinsert", "--", "--schema", SCHEMA_SCH));
409 // second time will be update.
410 runExport(getArgv(SCH_TABLE_NAME, "--update-key", "id",
411 "--update-mode", "allowinsert", "--", "--schema", SCHEMA_SCH));
412 assertRowCount(2, escapeObjectName(SCHEMA_SCH) + "." + escapeObjectName(SCH_TABLE_NAME), conn);
413 }
414
415 public static void checkSQLBinaryTableContent(String[] expected, String tableName, Connection connection){
416 Statement stmt = null;
417 ResultSet rs = null;
418 try {
419 stmt = connection.createStatement();
420 rs = stmt.executeQuery("SELECT TOP 1 [b1], [b2] FROM " + tableName);
421 rs.next();
422 assertEquals(expected[0], rs.getString("b1"));
423 assertEquals(expected[1], rs.getString("b2"));
424 } catch (SQLException e) {
425 LOG.error("Can't verify table content", e);
426 fail();
427 } finally {
428 try {
429 connection.commit();
430
431 if (stmt != null) {
432 stmt.close();
433 }
434 if (rs != null) {
435 rs.close();
436 }
437 } catch (SQLException ex) {
438 LOG.info("Ignored exception in finally block.");
439 }
440 }
441 }
442
443 public static void assertRowCount(long expected,
444 String tableName,
445 Connection connection) {
446 Statement stmt = null;
447 ResultSet rs = null;
448 try {
449 stmt = connection.createStatement();
450 rs = stmt.executeQuery("SELECT count(*) FROM " + tableName);
451
452 rs.next();
453
454 assertEquals(expected, rs.getLong(1));
455 } catch (SQLException e) {
456 LOG.error("Can't verify number of rows", e);
457 fail();
458 } finally {
459 try {
460 connection.commit();
461
462 if (stmt != null) {
463 stmt.close();
464 }
465 if (rs != null) {
466 rs.close();
467 }
468 } catch (SQLException ex) {
469 LOG.info("Ignored exception in finally block.");
470 }
471 }
472 }
473
474 public String escapeObjectName(String objectName) {
475 return "[" + objectName + "]";
476 }
477 }