View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.vfs2.provider.hdfs;
18  
19  import java.io.IOException;
20  import java.io.InputStream;
21  import java.io.UnsupportedEncodingException;
22  import java.net.URL;
23  import java.net.URLDecoder;
24  import java.util.Collection;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.commons.vfs2.CacheStrategy;
29  import org.apache.commons.vfs2.Capability;
30  import org.apache.commons.vfs2.FileName;
31  import org.apache.commons.vfs2.FileObject;
32  import org.apache.commons.vfs2.FileSystemException;
33  import org.apache.commons.vfs2.FileSystemOptions;
34  import org.apache.commons.vfs2.provider.AbstractFileName;
35  import org.apache.commons.vfs2.provider.AbstractFileSystem;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  
40  /**
41   * A VFS FileSystem that interacts with HDFS.
42   *
43   * @since 2.1
44   */
45  public class HdfsFileSystem extends AbstractFileSystem {
46  
47      private static final Log log = LogFactory.getLog(HdfsFileSystem.class);
48  
49      private FileSystem fs;
50  
51      /**
52       * Construct file system.
53       *
54       * @param rootName Name of the root directory of this file system.
55       * @param fileSystemOptions options for this file system instance.
56       */
57      protected HdfsFileSystem(final FileName rootName, final FileSystemOptions fileSystemOptions) {
58          super(rootName, null, fileSystemOptions);
59      }
60  
61      /**
62       * @see org.apache.commons.vfs2.provider.AbstractFileSystem#addCapabilities(Collection)
63       */
64      @Override
65      protected void addCapabilities(final Collection<Capability> capabilities) {
66          capabilities.addAll(HdfsFileProvider.CAPABILITIES);
67      }
68  
69      /**
70       * @see org.apache.commons.vfs2.provider.AbstractFileSystem#close()
71       */
72      @Override
73      public void close() {
74          try {
75              if (null != fs) {
76                  fs.close();
77              }
78          } catch (final IOException e) {
79              throw new RuntimeException("Error closing HDFS client", e);
80          }
81          super.close();
82      }
83  
84      /**
85       * @see org.apache.commons.vfs2.provider.AbstractFileSystem#createFile(AbstractFileName)
86       */
87      @Override
88      protected FileObject createFile(final AbstractFileName name) throws Exception {
89          return resolveFile(name);
90      }
91  
92      /**
93       * Resolve FileName into FileObject.
94       *
95       * @param name The name of a file on the HdfsFileSystem.
96       * @return resolved FileObject.
97       * @throws FileSystemException if an error occurred.
98       */
99      @Override
100     public FileObject resolveFile(final FileName name) throws FileSystemException {
101         synchronized (this) {
102             if (this.fs == null) {
103                 final String hdfsUri = name.getRootURI();
104                 final HdfsFileSystemConfigBuilder builder = HdfsFileSystemConfigBuilder.getInstance();
105                 final FileSystemOptions options = getFileSystemOptions();
106                 final String[] configNames = builder.getConfigNames(options);
107                 final Path[] configPaths = builder.getConfigPaths(options);
108                 final URL[] configURLs = builder.getConfigURLs(options);
109                 final InputStream configStream = builder.getConfigInputStream(options);
110                 final Configuration configConfiguration = builder.getConfigConfiguration(options);
111 
112                 final Configuration conf = new Configuration(true);
113                 conf.set(FileSystem.FS_DEFAULT_NAME_KEY, hdfsUri);
114 
115                 // Load any alternate configuration parameters that may have been specified
116                 // no matter where they might come from
117                 if (configNames != null) {
118                     for (final String configName : configNames) {
119                         log.debug("Adding HDFS configuration resource: " + configName);
120                         conf.addResource(configName);
121                     }
122                 }
123                 if (configPaths != null) {
124                     for (final Path path : configPaths) {
125                         log.debug("Adding HDFS configuration path: " + path);
126                         conf.addResource(path);
127                     }
128                 }
129                 if (configURLs != null) {
130                     for (final URL url : configURLs) {
131                         log.debug("Adding HDFS configuration URL: " + url);
132                         conf.addResource(url);
133                     }
134                 }
135                 if (configStream != null) {
136                     log.debug("Adding HDFS configuration stream");
137                     conf.addResource(configStream);
138                 }
139                 if (configConfiguration != null) {
140                     log.debug("Adding HDFS configuration object");
141                     conf.addResource(configConfiguration);
142                 }
143 
144                 try {
145                     fs = FileSystem.get(conf);
146                 } catch (final IOException e) {
147                     log.error("Error connecting to filesystem " + hdfsUri, e);
148                     throw new FileSystemException("Error connecting to filesystem " + hdfsUri, e);
149                 }
150             }
151         }
152 
153         final boolean useCache = null != getFileSystemManager().getFilesCache();
154         FileObject fileObject = useCache ? getFileFromCache(name) : null;
155         if (null == fileObject) {
156             String path;
157             try {
158                 path = URLDecoder.decode(name.getPath(), "UTF-8");
159             } catch (final UnsupportedEncodingException e) {
160                 path = name.getPath();
161             }
162             final Path filePath = new Path(path);
163             fileObject = new HdfsFileObject((AbstractFileName) name, this, fs, filePath);
164             fileObject = decorateFileObject(fileObject);
165             if (useCache) {
166                 this.putFileToCache(fileObject);
167             }
168         }
169         /*
170           resync the file information if requested
171          */
172         if (getFileSystemManager().getCacheStrategy().equals(CacheStrategy.ON_RESOLVE)) {
173             fileObject.refresh();
174         }
175         return fileObject;
176     }
177 
178 }