1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.commons.vfs2.provider.hdfs;
18
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.UnsupportedEncodingException;
22 import java.net.URL;
23 import java.net.URLDecoder;
24 import java.util.Collection;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.commons.vfs2.CacheStrategy;
29 import org.apache.commons.vfs2.Capability;
30 import org.apache.commons.vfs2.FileName;
31 import org.apache.commons.vfs2.FileObject;
32 import org.apache.commons.vfs2.FileSystemException;
33 import org.apache.commons.vfs2.FileSystemOptions;
34 import org.apache.commons.vfs2.provider.AbstractFileName;
35 import org.apache.commons.vfs2.provider.AbstractFileSystem;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.fs.FileSystem;
38 import org.apache.hadoop.fs.Path;
39
40
41
42
43
44
45 public class HdfsFileSystem extends AbstractFileSystem {
46
47 private static final Log log = LogFactory.getLog(HdfsFileSystem.class);
48
49 private FileSystem fs;
50
51
52
53
54
55
56
57 protected HdfsFileSystem(final FileName rootName, final FileSystemOptions fileSystemOptions) {
58 super(rootName, null, fileSystemOptions);
59 }
60
61
62
63
64 @Override
65 protected void addCapabilities(final Collection<Capability> capabilities) {
66 capabilities.addAll(HdfsFileProvider.CAPABILITIES);
67 }
68
69
70
71
72 @Override
73 public void close() {
74 try {
75 if (null != fs) {
76 fs.close();
77 }
78 } catch (final IOException e) {
79 throw new RuntimeException("Error closing HDFS client", e);
80 }
81 super.close();
82 }
83
84
85
86
87 @Override
88 protected FileObject createFile(final AbstractFileName name) throws Exception {
89 return resolveFile(name);
90 }
91
92
93
94
95
96
97
98
99 @Override
100 public FileObject resolveFile(final FileName name) throws FileSystemException {
101 synchronized (this) {
102 if (this.fs == null) {
103 final String hdfsUri = name.getRootURI();
104 final HdfsFileSystemConfigBuilder builder = HdfsFileSystemConfigBuilder.getInstance();
105 final FileSystemOptions options = getFileSystemOptions();
106 final String[] configNames = builder.getConfigNames(options);
107 final Path[] configPaths = builder.getConfigPaths(options);
108 final URL[] configURLs = builder.getConfigURLs(options);
109 final InputStream configStream = builder.getConfigInputStream(options);
110 final Configuration configConfiguration = builder.getConfigConfiguration(options);
111
112 final Configuration conf = new Configuration(true);
113 conf.set(FileSystem.FS_DEFAULT_NAME_KEY, hdfsUri);
114
115
116
117 if (configNames != null) {
118 for (final String configName : configNames) {
119 log.debug("Adding HDFS configuration resource: " + configName);
120 conf.addResource(configName);
121 }
122 }
123 if (configPaths != null) {
124 for (final Path path : configPaths) {
125 log.debug("Adding HDFS configuration path: " + path);
126 conf.addResource(path);
127 }
128 }
129 if (configURLs != null) {
130 for (final URL url : configURLs) {
131 log.debug("Adding HDFS configuration URL: " + url);
132 conf.addResource(url);
133 }
134 }
135 if (configStream != null) {
136 log.debug("Adding HDFS configuration stream");
137 conf.addResource(configStream);
138 }
139 if (configConfiguration != null) {
140 log.debug("Adding HDFS configuration object");
141 conf.addResource(configConfiguration);
142 }
143
144 try {
145 fs = FileSystem.get(conf);
146 } catch (final IOException e) {
147 log.error("Error connecting to filesystem " + hdfsUri, e);
148 throw new FileSystemException("Error connecting to filesystem " + hdfsUri, e);
149 }
150 }
151 }
152
153 final boolean useCache = null != getFileSystemManager().getFilesCache();
154 FileObject fileObject = useCache ? getFileFromCache(name) : null;
155 if (null == fileObject) {
156 String path;
157 try {
158 path = URLDecoder.decode(name.getPath(), "UTF-8");
159 } catch (final UnsupportedEncodingException e) {
160 path = name.getPath();
161 }
162 final Path filePath = new Path(path);
163 fileObject = new HdfsFileObject((AbstractFileName) name, this, fs, filePath);
164 fileObject = decorateFileObject(fileObject);
165 if (useCache) {
166 this.putFileToCache(fileObject);
167 }
168 }
169
170
171
172 if (getFileSystemManager().getCacheStrategy().equals(CacheStrategy.ON_RESOLVE)) {
173 fileObject.refresh();
174 }
175 return fileObject;
176 }
177
178 }