METAMODEL-1205: Fixed CassandraUnit, Guava, Hadoop for JDK9+
[metamodel.git] hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.metamodel.util;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.URI;
import java.util.Objects;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.metamodel.MetaModelException;

import com.google.common.base.Strings;

/**
 * A {@link Resource} implementation that connects to Apache Hadoop's HDFS
 * distributed file system.
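 * <p>
 * A minimal usage sketch (hostname, port and file path below are purely
 * illustrative):
 * </p>
 *
 * <pre>{@code
 * HdfsResource resource = new HdfsResource("hdfs://localhost:9000/user/data/example.csv");
 * try (InputStream in = resource.read()) {
 *     // consume the stream; the wrapper also manages the underlying FileSystem handle
 * }
 * }</pre>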
 */
public class HdfsResource extends AbstractResource implements Serializable {

    private static final long serialVersionUID = 1L;

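    /**
     * Name of a system property which, when set to "true", makes the resource
     * also consult the YARN_CONF_DIR and HADOOP_CONF_DIR system properties and
     * environment variables when resolving the Hadoop configuration directory.
     * For example (illustrative):
     *
     * <pre>{@code
     * System.setProperty(HdfsResource.SYSTEM_PROPERTY_HADOOP_CONF_DIR_ENABLED, "true");
     * }</pre>
     */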
    public static final String SYSTEM_PROPERTY_HADOOP_CONF_DIR_ENABLED = "metamodel.hadoop.use_hadoop_conf_dir";

    public static final String SCHEME_HDFS = "hdfs";
    public static final String SCHEME_SWIFT = "swift";
    public static final String SCHEME_EMRFS = "emrfs";
    public static final String SCHEME_MAPRFS = "maprfs";
    public static final String SCHEME_S3 = "s3";
    public static final String SCHEME_FTP = "ftp";

    private final String _scheme;
    private final String _hadoopConfDir;
    private final String _hostname;
    private final int _port;
    private final String _filepath;
    private transient Path _path;

    /**
     * Creates a {@link HdfsResource}
     *
     * @param url
     *            a URL of the form: scheme://hostname:port/path/to/file
     */
    public HdfsResource(String url) {
        this(url, null);
    }

    /**
     * Creates a {@link HdfsResource}
     *
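     * <p>
     * Example (values are illustrative):
     * </p>
     *
     * <pre>{@code
     * new HdfsResource("hdfs://namenode:8020/data/orders.csv", "/etc/hadoop/conf");
     * }</pre>
     *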
     * @param url
     *            a URL of the form: scheme://hostname:port/path/to/file
     * @param hadoopConfDir
     *            the path of a directory containing the Hadoop and HDFS
     *            configuration file(s).
     */
    public HdfsResource(String url, String hadoopConfDir) {
        if (url == null) {
            throw new IllegalArgumentException("Url cannot be null");
        }

        final URI uri = URI.create(url);

        _scheme = uri.getScheme();
        _hostname = uri.getHost();
        _port = uri.getPort();
        _filepath = uri.getPath();
        _hadoopConfDir = hadoopConfDir;
    }

    /**
     * Creates a {@link HdfsResource} using the "hdfs" scheme
     *
     * @param hostname
     *            the HDFS (namenode) hostname
     * @param port
     *            the HDFS (namenode) port number
     * @param filepath
     *            the path on HDFS to the file, starting with slash ('/')
     */
    public HdfsResource(String hostname, int port, String filepath) {
        this(SCHEME_HDFS, hostname, port, filepath, null);
    }

    /**
     * Creates a {@link HdfsResource}
     *
     * @param scheme
     *            the scheme to use (consider using {@link #SCHEME_HDFS} or any
     *            of the other "SCHEME_" constants).
     * @param hostname
     *            the HDFS (namenode) hostname
     * @param port
     *            the HDFS (namenode) port number
     * @param filepath
     *            the path on HDFS to the file, starting with slash ('/')
     * @param hadoopConfDir
     *            the path of a directory containing the Hadoop and HDFS
     *            configuration file(s).
     */
    public HdfsResource(String scheme, String hostname, int port, String filepath, String hadoopConfDir) {
        _scheme = scheme;
        _hostname = hostname;
        _port = port;
        _filepath = filepath;
        _hadoopConfDir = hadoopConfDir;
    }

    public String getScheme() {
        if (_scheme == null) {
            // can only happen for objects serialized before METAMODEL-220
            // introduced dynamic schemes
            return SCHEME_HDFS;
        }
        return _scheme;
    }

    public String getFilepath() {
        return _filepath;
    }

    public String getHostname() {
        return _hostname;
    }

    public int getPort() {
        return _port;
    }

    public String getHadoopConfDir() {
        return _hadoopConfDir;
    }

    @Override
    public String getName() {
        final int lastSlash = _filepath.lastIndexOf('/');
        if (lastSlash != -1) {
            return _filepath.substring(lastSlash + 1);
        }
        return _filepath;
    }

    @Override
    public String getQualifiedPath() {
        final StringBuilder sb = new StringBuilder();
        sb.append(getScheme());
        sb.append("://");
        if (_hostname != null) {
            sb.append(_hostname);
        }
        if (_port > 0) {
            sb.append(':');
            sb.append(_port);
        }
        sb.append(_filepath);
        return sb.toString();
    }

    @Override
    public boolean isReadOnly() {
        // We assume it is not read-only
        return false;
    }

    @Override
    public boolean isExists() {
        final FileSystem fs = getHadoopFileSystem();
        try {
            return fs.exists(getHadoopPath());
        } catch (Exception e) {
            throw wrapException(e);
        } finally {
            FileHelper.safeClose(fs);
        }
    }

    @Override
    public long getSize() {
        final FileSystem fs = getHadoopFileSystem();
        try {
            if (fs.getFileStatus(getHadoopPath()).isFile()) {
                return fs.getFileStatus(getHadoopPath()).getLen();
            } else {
                return fs.getContentSummary(getHadoopPath()).getLength();
            }
        } catch (Exception e) {
            throw wrapException(e);
        } finally {
            FileHelper.safeClose(fs);
        }
    }

    @Override
    public long getLastModified() {
        final FileSystem fs = getHadoopFileSystem();
        try {
            return fs.getFileStatus(getHadoopPath()).getModificationTime();
        } catch (Exception e) {
            throw wrapException(e);
        } finally {
            FileHelper.safeClose(fs);
        }
    }

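    /**
     * Opens an {@link OutputStream} which (over)writes the file. The returned
     * stream is a wrapper which also manages the underlying {@link FileSystem}
     * handle. A minimal sketch, assuming 'resource' refers to an existing
     * {@link HdfsResource}:
     *
     * <pre>{@code
     * try (OutputStream out = resource.write()) {
     *     out.write("hello".getBytes());
     * }
     * }</pre>
     */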
    @Override
    public OutputStream write() throws ResourceException {
        final FileSystem fs = getHadoopFileSystem();
        try {
            final FSDataOutputStream out = fs.create(getHadoopPath(), true);
            return new HdfsFileOutputStream(out, fs);
        } catch (IOException e) {
            // no wrapping stream was created, so it is safe to close 'fs' here
            FileHelper.safeClose(fs);
            throw wrapException(e);
        }
    }

    @Override
    public OutputStream append() throws ResourceException {
        final FileSystem fs = getHadoopFileSystem();
        try {
            final FSDataOutputStream out = fs.append(getHadoopPath());
            return new HdfsFileOutputStream(out, fs);
        } catch (IOException e) {
            // no wrapping stream was created, so it is safe to close 'fs' here
            FileHelper.safeClose(fs);
            throw wrapException(e);
        }
    }

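    /**
     * Opens an {@link InputStream} for reading the resource. If the path
     * points to a directory rather than a single file, an
     * {@code HdfsDirectoryInputStream} covering the directory is returned
     * instead. Either way, the returned stream is a wrapper which also manages
     * the underlying {@link FileSystem} handle.
     */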
    @Override
    public InputStream read() throws ResourceException {
        final FileSystem fs = getHadoopFileSystem();
        final InputStream in;
        try {
            final Path hadoopPath = getHadoopPath();
            // return a wrapper InputStream which manages the 'fs' closeable
            if (fs.getFileStatus(hadoopPath).isFile()) {
                in = fs.open(hadoopPath);
                return new HdfsFileInputStream(in, fs);
            } else {
                return new HdfsDirectoryInputStream(hadoopPath, fs);
            }
        } catch (Exception e) {
            // no wrapping stream was created, so it is safe to close 'fs' here
            FileHelper.safeClose(fs);
            throw wrapException(e);
        }
    }

    private RuntimeException wrapException(Exception e) {
        if (e instanceof RuntimeException) {
            return (RuntimeException) e;
        }
        return new MetaModelException(e);
    }

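    /**
     * Builds the Hadoop {@link Configuration} for this resource: when a
     * hostname and port are available, {@code fs.defaultFS} is set from them,
     * and {@code core-site.xml} and {@code hdfs-site.xml} are added from the
     * resolved configuration directory, if any. A minimal sketch of direct use
     * (illustrative):
     *
     * <pre>{@code
     * Configuration conf = resource.getHadoopConfiguration();
     * String defaultFs = conf.get("fs.defaultFS");
     * }</pre>
     */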
    public Configuration getHadoopConfiguration() {
        final Configuration conf = new Configuration();
        if (_hostname != null && _port > 0) {
            conf.set("fs.defaultFS", getScheme() + "://" + _hostname + ":" + _port);
        }

        final File hadoopConfigurationDirectory = getHadoopConfigurationDirectoryToUse();
        if (hadoopConfigurationDirectory != null) {
            addResourceIfExists(conf, hadoopConfigurationDirectory, "core-site.xml");
            addResourceIfExists(conf, hadoopConfigurationDirectory, "hdfs-site.xml");
        }

        return conf;
    }

    private void addResourceIfExists(Configuration conf, File hadoopConfigurationDirectory, String filename) {
        final File file = new File(hadoopConfigurationDirectory, filename);
        if (file.exists()) {
            final InputStream inputStream = FileHelper.getInputStream(file);
            conf.addResource(inputStream, filename);
        }
    }

    private File getHadoopConfigurationDirectoryToUse() {
        File candidate = getDirectoryIfExists(null, _hadoopConfDir);
        if ("true".equals(System.getProperty(SYSTEM_PROPERTY_HADOOP_CONF_DIR_ENABLED))) {
            candidate = getDirectoryIfExists(candidate, System.getProperty("YARN_CONF_DIR"));
            candidate = getDirectoryIfExists(candidate, System.getProperty("HADOOP_CONF_DIR"));
            candidate = getDirectoryIfExists(candidate, System.getenv("YARN_CONF_DIR"));
            candidate = getDirectoryIfExists(candidate, System.getenv("HADOOP_CONF_DIR"));
        }
        return candidate;
    }

    /**
     * Gets a candidate directory based on a file path, if it exists and if
     * another candidate hasn't already been resolved.
     *
     * @param existingCandidate
     *            an existing candidate directory. If this is non-null, it will
     *            be returned immediately.
     * @param path
     *            the path of a directory
     * @return a candidate directory, or null if none was resolved.
     */
    private File getDirectoryIfExists(File existingCandidate, String path) {
        if (existingCandidate != null) {
            return existingCandidate;
        }
        if (!Strings.isNullOrEmpty(path)) {
            final File directory = new File(path);
            if (directory.exists() && directory.isDirectory()) {
                return directory;
            }
        }
        return null;
    }

    public FileSystem getHadoopFileSystem() {
        try {
            return FileSystem.newInstance(getHadoopConfiguration());
        } catch (IOException e) {
            throw new MetaModelException("Could not connect to HDFS: " + e.getMessage(), e);
        }
    }

    public Path getHadoopPath() {
        if (_path == null) {
            _path = new Path(_filepath);
        }
        return _path;
    }

    @Override
    public int hashCode() {
        return Objects.hash(getScheme(), _filepath, _hostname, _port, _hadoopConfDir);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj instanceof HdfsResource) {
            final HdfsResource other = (HdfsResource) obj;
            return Objects.equals(getScheme(), other.getScheme()) && Objects.equals(_filepath, other._filepath)
                    && Objects.equals(_hostname, other._hostname) && Objects.equals(_port, other._port)
                    && Objects.equals(_hadoopConfDir, other._hadoopConfDir);
        }
        return false;
    }
}