src/test/org/apache/sqoop/manager/postgresql/PostgresqlExternalTableImportTest.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sqoop.manager.postgresql;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.sqoop.manager.ConnManager;
import org.apache.sqoop.manager.PostgresqlManager;
import org.apache.sqoop.testcategories.thirdpartytest.PostgresqlTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.testutil.CommonArgs;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.apache.sqoop.util.FileListing;
import org.junit.experimental.categories.Category;

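/**
 * Tests external Hive table imports against a live PostgreSQL database.
 * Connection settings come from the sqoop.test.postgresql.* system
 * properties (host URL, username, database, password), defaulting to a
 * local instance with the "sqooptest" user and database.
 */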
@Category(PostgresqlTest.class)
public class PostgresqlExternalTableImportTest extends ImportJobTestCase {

  public static final Log LOG = LogFactory
      .getLog(PostgresqlExternalTableImportTest.class.getName());
  static final String HOST_URL = System.getProperty("sqoop.test.postgresql.connectstring.host_url",
      "jdbc:postgresql://localhost/");
  static final String DATABASE_USER = System.getProperty(
      "sqoop.test.postgresql.username", "sqooptest");
  static final String DATABASE_NAME = System.getProperty(
      "sqoop.test.postgresql.database", "sqooptest");
  static final String PASSWORD = System.getProperty("sqoop.test.postgresql.password");

  static final String TABLE_NAME = "EMPLOYEES_PG";
  static final String NULL_TABLE_NAME = "NULL_EMPLOYEES_PG";
  static final String SPECIAL_TABLE_NAME = "EMPLOYEES_PG's";
  static final String DIFFERENT_TABLE_NAME = "DIFFERENT_TABLE";
  static final String SCHEMA_PUBLIC = "public";
  static final String SCHEMA_SPECIAL = "special";
  static final String CONNECT_STRING = HOST_URL + DATABASE_NAME;
  static final String EXTERNAL_TABLE_DIR = "/tmp/external/employees_pg";
  protected Connection connection;

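  // This test runs against a real PostgreSQL instance, so the embedded
  // HSQLDB test server is not needed.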
  @Override
  protected boolean useHsqldbTestServer() {
    return false;
  }

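  // Wrap a table or schema name in double quotes so PostgreSQL preserves
  // case and special characters (e.g. the apostrophe in EMPLOYEES_PG's).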
  public String quoteTableOrSchemaName(String tableName) {
    return "\"" + tableName + "\"";
  }

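  // Build a schema-qualified DROP TABLE IF EXISTS statement with quoted
  // identifiers for use in tearDown().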
  private String getDropTableStatement(String tableName, String schema) {
    return "DROP TABLE IF EXISTS " + quoteTableOrSchemaName(schema) + "."
        + quoteTableOrSchemaName(tableName);
  }

  @Before
  public void setUp() {
    super.setUp();

    LOG.debug("Setting up another postgresql test: " + CONNECT_STRING);

    setUpData(TABLE_NAME, SCHEMA_PUBLIC, false);
    setUpData(NULL_TABLE_NAME, SCHEMA_PUBLIC, true);
    setUpData(SPECIAL_TABLE_NAME, SCHEMA_PUBLIC, false);
    setUpData(DIFFERENT_TABLE_NAME, SCHEMA_SPECIAL, false);

    LOG.debug("setUp complete.");
  }

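  // Drop all tables created in setUp() and close the connection; cleanup
  // failures are logged but do not fail the test.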
  @After
  public void tearDown() {
    try {
      Statement stmt = connection.createStatement();
      stmt.executeUpdate(getDropTableStatement(TABLE_NAME, SCHEMA_PUBLIC));
      stmt.executeUpdate(getDropTableStatement(NULL_TABLE_NAME, SCHEMA_PUBLIC));
      stmt.executeUpdate(getDropTableStatement(SPECIAL_TABLE_NAME, SCHEMA_PUBLIC));
      stmt.executeUpdate(getDropTableStatement(DIFFERENT_TABLE_NAME, SCHEMA_SPECIAL));
    } catch (SQLException e) {
      LOG.error("Can't clean up the database:", e);
    }

    super.tearDown();

    try {
      connection.close();
    } catch (SQLException e) {
      LOG.error("Ignoring exception in tearDown", e);
    }
  }

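  /**
   * Creates the given table in the given schema (creating the schema first
   * if necessary) and populates it with three fixed rows; when nullEntry is
   * true, a fourth row containing NULL values is added as well.
   */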
  public void setUpData(String tableName, String schema, boolean nullEntry) {
    SqoopOptions options = new SqoopOptions(CONNECT_STRING, tableName);
    options.setUsername(DATABASE_USER);
    options.setPassword(PASSWORD);

    ConnManager manager = null;
    Statement st = null;

    try {
      manager = new PostgresqlManager(options);
      connection = manager.getConnection();
      connection.setAutoCommit(false);
      st = connection.createStatement();

      // Create the schema in a simple way: always issue CREATE SCHEMA and
      // ignore the error if it already exists.
      try {
        st.executeUpdate("CREATE SCHEMA " + manager.escapeTableName(schema));
        connection.commit();
      } catch (SQLException e) {
        LOG.info("Couldn't create schema " + schema + " (this is OK as long as "
            + "the schema already exists).");
        connection.rollback();
      }

      String fullTableName = manager.escapeTableName(schema) + "."
          + manager.escapeTableName(tableName);
      LOG.info("Creating table: " + fullTableName);

      try {
        // Try to remove the table first. DROP TABLE IF EXISTS didn't
        // get added until pg 8.3, so we just use "DROP TABLE" and ignore
        // any exception here if one occurs.
        st.executeUpdate("DROP TABLE " + fullTableName);
      } catch (SQLException e) {
        LOG.info("Couldn't drop table " + schema + "." + tableName + " (ok)");
        // Now we need to reset the transaction.
        connection.rollback();
      }

      st.executeUpdate("CREATE TABLE " + fullTableName + " (" + manager.escapeColName("id")
          + " INT NOT NULL PRIMARY KEY, " + manager.escapeColName("name")
          + " VARCHAR(24) NOT NULL, " + manager.escapeColName("start_date") + " DATE, "
          + manager.escapeColName("Salary") + " FLOAT, " + manager.escapeColName("Fired")
          + " BOOL, " + manager.escapeColName("dept") + " VARCHAR(32))");

      st.executeUpdate("INSERT INTO " + fullTableName
          + " VALUES(1,'Aaron','2009-05-14',1000000.00,TRUE,'engineering')");
      st.executeUpdate("INSERT INTO " + fullTableName
          + " VALUES(2,'Bob','2009-04-20',400.00,TRUE,'sales')");
      st.executeUpdate("INSERT INTO " + fullTableName
          + " VALUES(3,'Fred','2009-01-23',15.00,FALSE,'marketing')");
      if (nullEntry) {
        st.executeUpdate("INSERT INTO " + fullTableName + " VALUES(4,'Mike',NULL,NULL,NULL,NULL)");
      }
      connection.commit();
    } catch (SQLException sqlE) {
      LOG.error("Encountered SQL Exception: " + sqlE);
      sqlE.printStackTrace();
      fail("SQLException when running test setUp(): " + sqlE);
    } finally {
      try {
        if (null != st) {
          st.close();
        }

        if (null != manager) {
          manager.close();
        }
      } catch (SQLException sqlE) {
        LOG.warn("Got SQLException when closing connection: " + sqlE);
      }
    }

    LOG.debug("setUp complete.");
  }

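  // Build the Sqoop command line for a Hive external-table import of the
  // given table: one mapper, rows with id > 1, optionally --direct, plus
  // any extra arguments supplied by the caller.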
  private String[] getArgv(boolean isDirect, String tableName, String... extraArgs) {
    ArrayList<String> args = new ArrayList<String>();

    CommonArgs.addHadoopFlags(args);

    args.add("--table");
    args.add(tableName);
    args.add("--external-table-dir");
    args.add(EXTERNAL_TABLE_DIR);
    args.add("--hive-import");
    args.add("--warehouse-dir");
    args.add(getWarehouseDir());
    args.add("--connect");
    args.add(CONNECT_STRING);
    args.add("--username");
    args.add(DATABASE_USER);
    args.add("--password");
    args.add(PASSWORD);
    args.add("--where");
    args.add("id > 1");
    args.add("-m");
    args.add("1");

    if (isDirect) {
      args.add("--direct");
    }

    for (String arg : extraArgs) {
      args.add(arg);
    }

    return args.toArray(new String[0]);
  }

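  /**
   * Runs the import with the generated arguments and verifies the output:
   * the external table directory must contain a part file (part-r-00000 for
   * merge imports, part-m-00000 otherwise) whose lines match expectedResults.
   */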
  private void doImportAndVerify(boolean isDirect, String[] expectedResults, String tableName,
      String... extraArgs) throws IOException {

    Path tablePath = new Path(EXTERNAL_TABLE_DIR);

    // if importing with merge step, directory should exist and output should be
    // from a reducer
    boolean isMerge = Arrays.asList(extraArgs).contains("--merge-key");
    Path filePath = new Path(tablePath, isMerge ? "part-r-00000" : "part-m-00000");

    File tableFile = new File(tablePath.toString());
    if (tableFile.exists() && tableFile.isDirectory() && !isMerge) {
      // remove the directory before running the import.
      FileListing.recursiveDeleteDir(tableFile);
    }

    String[] argv = getArgv(isDirect, tableName, extraArgs);
    try {
      runImport(argv);
    } catch (IOException ioe) {
      LOG.error("Got IOException during import: " + ioe.toString());
      ioe.printStackTrace();
      fail(ioe.toString());
    }

    File f = new File(filePath.toString());
    assertTrue("Could not find imported data file, " + f, f.exists());
    BufferedReader r = null;
    try {
      // Read through the file and make sure it's all there.
      r = new BufferedReader(new InputStreamReader(new FileInputStream(f)));
      for (String expectedLine : expectedResults) {
        assertEquals(expectedLine, r.readLine());
      }
    } catch (IOException ioe) {
      LOG.error("Got IOException verifying results: " + ioe.toString());
      ioe.printStackTrace();
      fail(ioe.toString());
    } finally {
      IOUtils.closeStream(r);
    }
  }

  @Test
  public void testJdbcBasedImport() throws IOException {
    // The separator is different from other tests because the
    // CREATE EXTERNAL TABLE DDL uses
    // ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'.
    char sep = '\001';
    String[] expectedResults = {
        "2" + sep + "Bob" + sep + "2009-04-20" + sep + "400.0" + sep + "true" + sep + "sales",
        "3" + sep + "Fred" + sep + "2009-01-23" + sep + "15.0" + sep + "false" + sep + "marketing" };
    doImportAndVerify(false, expectedResults, TABLE_NAME);
  }
}