eb798fa99366008d65a3c22ff32904d8ede71bd9
[sqoop.git] / src / test / org / apache / sqoop / manager / postgresql / PostgresqlExportTest.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.sqoop.manager.postgresql;
19
20 import org.apache.sqoop.SqoopOptions;
21 import org.apache.sqoop.testcategories.thirdpartytest.PostgresqlTest;
22 import org.apache.sqoop.testutil.CommonArgs;
23 import org.apache.sqoop.testutil.ExportJobTestCase;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.sqoop.manager.ConnManager;
27 import org.apache.sqoop.manager.PostgresqlManager;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.junit.experimental.categories.Category;
31
32 import java.io.BufferedWriter;
33 import java.io.File;
34 import java.io.FileWriter;
35 import java.io.IOException;
36 import java.io.Writer;
37 import java.sql.Connection;
38 import java.sql.DriverManager;
39 import java.sql.ResultSet;
40 import java.sql.SQLException;
41 import java.sql.Statement;
42 import java.util.ArrayList;
43
44 import static org.junit.Assert.assertEquals;
45 import static org.junit.Assert.fail;
46
47 /**
48 *
49 */
50 @Category(PostgresqlTest.class)
51 public class PostgresqlExportTest extends ExportJobTestCase {
52 public static final Log LOG = LogFactory.getLog(
53 PostgresqlExportTest.class.getName());
54
55 static final String HOST_URL = System.getProperty(
56 "sqoop.test.postgresql.connectstring.host_url",
57 "jdbc:postgresql://localhost/");
58 static final String DATABASE_USER = System.getProperty(
59 "sqoop.test.postgresql.username",
60 "sqooptest");
61 static final String DATABASE_NAME = System.getProperty(
62 "sqoop.test.postgresql.database",
63 "sqooptest");
64 static final String PASSWORD = System.getProperty(
65 "sqoop.test.postgresql.password");
66
67 static final String TABLE_NAME = "EMPLOYEES_PG";
68 static final String PROCEDURE_NAME = "INSERT_AN_EMPLOYEE";
69 static final String STAGING_TABLE_NAME = "STAGING";
70 static final String SCHEMA_PUBLIC = "public";
71 static final String SCHEMA_SPECIAL = "special";
72 static final String CONNECT_STRING = HOST_URL + DATABASE_NAME;
73
74 protected Connection connection;
75
76 @Override
77 protected boolean useHsqldbTestServer() {
78 return false;
79 }
80
81 private String getDropTableStatement(String tableName, String schema) {
82 return "DROP TABLE IF EXISTS " + quoteTableOrSchemaName(schema) + "." + quoteTableOrSchemaName(tableName);
83 }
84
85 @Before
86 public void setUp() {
87 super.setUp();
88
89 LOG.debug("Setting up postgresql test: " + CONNECT_STRING);
90
91 try {
92 connection = DriverManager.getConnection(CONNECT_STRING, DATABASE_USER, PASSWORD);
93 connection.setAutoCommit(false);
94 } catch (SQLException ex) {
95 LOG.error("Can't create connection", ex);
96 throw new RuntimeException(ex);
97 }
98
99 createTable(TABLE_NAME, SCHEMA_PUBLIC);
100 createTable(STAGING_TABLE_NAME, SCHEMA_PUBLIC);
101 createTable(TABLE_NAME, SCHEMA_SPECIAL);
102 createTable(STAGING_TABLE_NAME, SCHEMA_SPECIAL);
103 createProcedure(PROCEDURE_NAME, SCHEMA_PUBLIC);
104
105 LOG.debug("setUp complete.");
106 }
107
108 @Override
109 public void tearDown() {
110 try {
111 Statement stmt = connection.createStatement();
112 stmt.executeUpdate(getDropTableStatement(TABLE_NAME, SCHEMA_PUBLIC));
113 stmt.executeUpdate(getDropTableStatement(STAGING_TABLE_NAME, SCHEMA_PUBLIC));
114 stmt.executeUpdate(getDropTableStatement(TABLE_NAME, SCHEMA_SPECIAL));
115 stmt.executeUpdate(getDropTableStatement(STAGING_TABLE_NAME, SCHEMA_SPECIAL));
116 } catch(SQLException e) {
117 LOG.error("Can't clean up the database:", e);
118 }
119
120 super.tearDown();
121
122 try {
123 connection.close();
124 } catch (SQLException e) {
125 LOG.error("Ignoring exception in tearDown", e);
126 }
127 }
128
129 private interface CreateIt {
130 void createIt(
131 Statement st,
132 String fullName,
133 ConnManager manager) throws SQLException;
134 }
135
136 private void createTable(String tableName, String schema) {
137 CreateIt createIt = new CreateIt() {
138 @Override
139 public void createIt(
140 Statement st,
141 String fullName,
142 ConnManager manager) throws SQLException {
143 st.executeUpdate("CREATE TABLE " + fullName + " ("
144 + manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, "
145 + manager.escapeColName("name") + " VARCHAR(24) NOT NULL, "
146 + manager.escapeColName("start_date") + " DATE, "
147 + manager.escapeColName("salary") + " FLOAT, "
148 + manager.escapeColName("dept") + " VARCHAR(32))");
149 }
150 };
151 create(tableName, "TABLE", schema, createIt);
152 }
153
154 private void createProcedure(String procedureName, String schema) {
155 CreateIt createIt = new CreateIt() {
156 @Override
157 public void createIt(
158 Statement st,
159 String fullName,
160 ConnManager manager) throws SQLException {
161 st.executeUpdate("CREATE OR REPLACE FUNCTION " + fullName + " ("
162 + "IN " + manager.escapeColName("id") + " INT,"
163 + "IN " + manager.escapeColName("name") + " VARCHAR(24),"
164 + "IN " + manager.escapeColName("start_date") + " DATE,"
165 + "IN " + manager.escapeColName("salary") + " FLOAT,"
166 + "IN " + manager.escapeColName("dept") + " VARCHAR(32)"
167 + ") "
168 + "RETURNS VOID "
169 + "AS $$ "
170 + "BEGIN "
171 + "INSERT INTO "
172 + quoteTableOrSchemaName(SCHEMA_PUBLIC)
173 + "."
174 + quoteTableOrSchemaName(TABLE_NAME)
175 + " ("
176 + manager.escapeColName("id")
177 +", "
178 + manager.escapeColName("name")
179 +", "
180 + manager.escapeColName("start_date")
181 +", "
182 + manager.escapeColName("salary")
183 +", "
184 + manager.escapeColName("dept")
185 + ") VALUES ("
186 + manager.escapeColName("id")
187 +", "
188 + manager.escapeColName("name")
189 +", "
190 + manager.escapeColName("start_date")
191 +", "
192 + manager.escapeColName("salary")
193 +", "
194 + manager.escapeColName("dept")
195 + ");"
196 + "END;"
197 + "$$ LANGUAGE plpgsql;");
198 }
199 };
200 create(procedureName, "FUNCTION", schema, createIt);
201 }
202
203 private void create(
204 String name,
205 String type,
206 String schema,
207 CreateIt createIt) {
208 SqoopOptions options = new SqoopOptions(CONNECT_STRING, name);
209 options.setUsername(DATABASE_USER);
210
211 ConnManager manager = null;
212 Statement st = null;
213
214 try {
215 manager = new PostgresqlManager(options);
216 st = connection.createStatement();
217
218 // Create schema if not exists in dummy way (always create and ignore
219 // errors.
220 try {
221 st.executeUpdate("CREATE SCHEMA " + quoteTableOrSchemaName(schema));
222 connection.commit();
223 } catch (SQLException e) {
224 LOG.info("Couldn't create schema " + schema + " (is o.k. as long as"
225 + "the schema already exists.", e);
226 connection.rollback();
227 }
228
229 String fullTableName = quoteTableOrSchemaName(schema)
230 + "." + quoteTableOrSchemaName(name);
231
232 try {
233 // Try to remove the table first. DROP TABLE IF EXISTS didn't
234 // get added until pg 8.3, so we just use "DROP TABLE" and ignore
235 // any exception here if one occurs.
236 st.executeUpdate("DROP " + type + " " + fullTableName);
237 } catch (SQLException e) {
238 LOG.info("Couldn't drop "
239 + type.toLowerCase()
240 + " " +fullTableName
241 + " (ok)",
242 e);
243 // Now we need to reset the transaction.
244 connection.rollback();
245 }
246
247 createIt.createIt(st, fullTableName, manager);
248
249 connection.commit();
250 } catch (SQLException sqlE) {
251 LOG.error("Encountered SQL Exception: " + sqlE);
252 sqlE.printStackTrace();
253 fail("SQLException when running test setUp(): " + sqlE);
254 } finally {
255 try {
256 if (null != st) {
257 st.close();
258 }
259
260 if (null != manager) {
261 manager.close();
262 }
263 } catch (SQLException sqlE) {
264 LOG.warn("Got SQLException when closing connection: " + sqlE);
265 }
266 }
267
268 LOG.debug("setUp complete.");
269 }
270
271 private String [] getArgv(boolean useTable,
272 String... extraArgs) {
273 ArrayList<String> args = new ArrayList<String>();
274
275 CommonArgs.addHadoopFlags(args);
276
277 if (useTable) {
278 args.add("--table");
279 args.add(TABLE_NAME);
280 } else {
281 args.add("--call");
282 args.add(PROCEDURE_NAME);
283 }
284 args.add("--export-dir");
285 args.add(getWarehouseDir());
286 args.add("--fields-terminated-by");
287 args.add(",");
288 args.add("--lines-terminated-by");
289 args.add("\\n");
290 args.add("--connect");
291 args.add(CONNECT_STRING);
292 args.add("--username");
293 args.add(DATABASE_USER);
294 args.add("--password");
295 args.add(PASSWORD);
296 args.add("-m");
297 args.add("1");
298
299 for (String arg : extraArgs) {
300 args.add(arg);
301 }
302
303 return args.toArray(new String[0]);
304 }
305
306 protected void createTestFile(String filename,
307 String[] lines)
308 throws IOException {
309 new File(getWarehouseDir()).mkdirs();
310 File file = new File(getWarehouseDir() + "/" + filename);
311 Writer output = new BufferedWriter(new FileWriter(file));
312 for(String line : lines) {
313 output.write(line);
314 output.write("\n");
315 }
316 output.close();
317 }
318
319 @Test
320 public void testExport() throws IOException, SQLException {
321 createTestFile("inputFile", new String[] {
322 "2,Bob,2009-04-20,400,sales",
323 "3,Fred,2009-01-23,15,marketing",
324 });
325
326 runExport(getArgv(true));
327
328 assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection);
329 }
330
331 @Test
332 public void testExportUsingProcedure() throws IOException, SQLException {
333 createTestFile("inputFile", new String[] {
334 "2,Bob,2009-04-20,400,sales",
335 "3,Fred,2009-01-23,15,marketing",
336 });
337
338 runExport(getArgv(false));
339
340 assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection);
341 }
342
343 @Test
344 public void testExportStaging() throws IOException, SQLException {
345 createTestFile("inputFile", new String[] {
346 "2,Bob,2009-04-20,400,sales",
347 "3,Fred,2009-01-23,15,marketing",
348 });
349
350 String[] extra = new String[] {"--staging-table", STAGING_TABLE_NAME, };
351
352 runExport(getArgv(true, extra));
353
354 assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection);
355 }
356
357 @Test
358 public void testExportDirect() throws IOException, SQLException {
359 createTestFile("inputFile", new String[] {
360 "2,Bob,2009-04-20,400,sales",
361 "3,Fred,2009-01-23,15,marketing",
362 });
363
364 String[] extra = new String[] {"--direct"};
365
366 runExport(getArgv(true, extra));
367
368 assertRowCount(2, quoteTableOrSchemaName(TABLE_NAME), connection);
369 }
370
371 @Test
372 public void testExportCustomSchema() throws IOException, SQLException {
373 createTestFile("inputFile", new String[] {
374 "2,Bob,2009-04-20,400,sales",
375 "3,Fred,2009-01-23,15,marketing",
376 });
377
378 String[] extra = new String[] {"--",
379 "--schema",
380 SCHEMA_SPECIAL,
381 };
382
383 runExport(getArgv(true, extra));
384
385 assertRowCount(2,
386 quoteTableOrSchemaName(SCHEMA_SPECIAL)
387 + "." + quoteTableOrSchemaName(TABLE_NAME),
388 connection);
389 }
390
391 @Test
392 public void testExportCustomSchemaStaging() throws IOException, SQLException {
393 createTestFile("inputFile", new String[] {
394 "2,Bob,2009-04-20,400,sales",
395 "3,Fred,2009-01-23,15,marketing",
396 });
397
398 String[] extra = new String[] {
399 "--staging-table",
400 STAGING_TABLE_NAME,
401 "--",
402 "--schema",
403 SCHEMA_SPECIAL,
404 };
405
406 runExport(getArgv(true, extra));
407
408 assertRowCount(2,
409 quoteTableOrSchemaName(SCHEMA_SPECIAL)
410 + "." + quoteTableOrSchemaName(TABLE_NAME),
411 connection);
412 }
413
414 @Test
415 public void testExportCustomSchemaStagingClear()
416 throws IOException, SQLException {
417 createTestFile("inputFile", new String[] {
418 "2,Bob,2009-04-20,400,sales",
419 "3,Fred,2009-01-23,15,marketing",
420 });
421
422 String[] extra = new String[] {
423 "--staging-table",
424 STAGING_TABLE_NAME,
425 "--clear-staging-table",
426 "--",
427 "--schema",
428 SCHEMA_SPECIAL,
429 };
430
431 runExport(getArgv(true, extra));
432
433 assertRowCount(2,
434 quoteTableOrSchemaName(SCHEMA_SPECIAL)
435 + "." + quoteTableOrSchemaName(TABLE_NAME),
436 connection);
437 }
438
439 @Test
440 public void testExportCustomSchemaDirect() throws IOException, SQLException {
441 createTestFile("inputFile", new String[] {
442 "2,Bob,2009-04-20,400,sales",
443 "3,Fred,2009-01-23,15,marketing",
444 });
445
446 String[] extra = new String[] {
447 "--direct",
448 "--",
449 "--schema",
450 SCHEMA_SPECIAL,
451 };
452
453 runExport(getArgv(true, extra));
454
455 assertRowCount(2,
456 quoteTableOrSchemaName(SCHEMA_SPECIAL)
457 + "." + quoteTableOrSchemaName(TABLE_NAME),
458 connection);
459 }
460
461 public static void assertRowCount(long expected,
462 String tableName,
463 Connection connection) {
464 Statement stmt = null;
465 ResultSet rs = null;
466 try {
467 stmt = connection.createStatement();
468 rs = stmt.executeQuery("SELECT count(*) FROM " + tableName);
469
470 rs.next();
471
472 assertEquals(expected, rs.getLong(1));
473 } catch (SQLException e) {
474 LOG.error("Can't verify number of rows", e);
475 fail();
476 } finally {
477 try {
478 connection.commit();
479
480 if (stmt != null) {
481 stmt.close();
482 }
483 if (rs != null) {
484 rs.close();
485 }
486 } catch (SQLException ex) {
487 LOG.info("Ignored exception in finally block.");
488 }
489 }
490 }
491
492 public String quoteTableOrSchemaName(String tableName) {
493 return "\"" + tableName + "\"";
494 }
495 }