METAMODEL-1205: Fixed CassandraUnit, Guava, Hadoop for JDK9+
[metamodel.git] / hadoop / src / main / java / org / apache / metamodel / util / HdfsDirectoryInputStream.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.metamodel.util;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.util.Arrays;
24
25 import org.apache.hadoop.fs.FileStatus;
26 import org.apache.hadoop.fs.FileSystem;
27 import org.apache.hadoop.fs.Path;
28 import org.apache.hadoop.fs.PathFilter;
29
30 /**
31 * An {@link InputStream} that represents all the data found in a directory on
32 * HDFS. This {@link InputStream} is used by {@link HdfsResource#read()} when
33 * pointed to a directory.
34 */
35 class HdfsDirectoryInputStream extends AbstractDirectoryInputStream<FileStatus> {
36
37 private final Path _hadoopPath;
38 private final FileSystem _fs;
39
40 public HdfsDirectoryInputStream(final Path hadoopPath, final FileSystem fs) {
41 _hadoopPath = hadoopPath;
42 _fs = fs;
43 FileStatus[] fileStatuses;
44 try {
45 fileStatuses = _fs.listStatus(_hadoopPath, new PathFilter() {
46 @Override
47 public boolean accept(final Path path) {
48 try {
49 return _fs.getFileStatus(path).isFile();
50 } catch (IOException e) {
51 return false;
52 }
53 }
54 });
55 // Natural ordering is the URL
56 Arrays.sort(fileStatuses);
57 } catch (IOException e) {
58 fileStatuses = new FileStatus[0];
59 }
60 _files = fileStatuses;
61 }
62
63 @Override
64 public InputStream openStream(final int index) throws IOException {
65 final Path nextPath = _files[index].getPath();
66 return _fs.open(nextPath);
67 }
68
69 @Override
70 public void close() throws IOException {
71 super.close();
72 FileHelper.safeClose(_fs);
73 }
74 }