View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.vfs2.provider.hdfs;
18  
19  import java.io.InputStream;
20  import java.io.IOException;
21  import java.io.UnsupportedEncodingException;
22  import java.net.URL;
23  import java.net.URLDecoder;
24  import java.util.Collection;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.commons.vfs2.CacheStrategy;
29  import org.apache.commons.vfs2.Capability;
30  import org.apache.commons.vfs2.FileName;
31  import org.apache.commons.vfs2.FileObject;
32  import org.apache.commons.vfs2.FileSystemException;
33  import org.apache.commons.vfs2.FileSystemOptions;
34  import org.apache.commons.vfs2.provider.AbstractFileName;
35  import org.apache.commons.vfs2.provider.AbstractFileSystem;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  
40  /**
41   * A VFS FileSystem that interacts with HDFS.
42   *
43   * @since 2.1
44   */
45  public class HdfsFileSystem extends AbstractFileSystem
46  {
47      private static final Log log = LogFactory.getLog(HdfsFileSystem.class);
48  
49      private FileSystem fs;
50  
51      /**
52       * Construct file system.
53       *
54       * @param rootName Name of the root directory of this file system.
55       * @param fileSystemOptions options for this file system instance.
56       */
57      protected HdfsFileSystem(final FileName rootName, final FileSystemOptions fileSystemOptions)
58      {
59          super(rootName, null, fileSystemOptions);
60      }
61  
62      /**
63       * @see org.apache.commons.vfs2.provider.AbstractFileSystem#addCapabilities(Collection)
64       */
65      @Override
66      protected void addCapabilities(final Collection<Capability> capabilities)
67      {
68          capabilities.addAll(HdfsFileProvider.CAPABILITIES);
69      }
70  
71      /**
72       * @see org.apache.commons.vfs2.provider.AbstractFileSystem#close()
73       */
74      @Override
75      public void close()
76      {
77          try
78          {
79              if (null != fs)
80              {
81                  fs.close();
82              }
83          }
84          catch (final IOException e)
85          {
86              throw new RuntimeException("Error closing HDFS client", e);
87          }
88          super.close();
89      }
90  
91      /**
92       * @see org.apache.commons.vfs2.provider.AbstractFileSystem#createFile(AbstractFileName)
93       */
94      @Override
95      protected FileObject createFile(final AbstractFileName name) throws Exception
96      {
97          throw new FileSystemException("Operation not supported");
98      }
99  
100     /**
101      * Resolve FileName into FileObject.
102      * @param name The name of a file on the HdfsFileSystem.
103      * @return resolved FileObject.
104      * @throws FileSystemException if an error occurred.
105      */
106     @Override
107     public FileObject resolveFile(final FileName name) throws FileSystemException
108     {
109         synchronized (this)
110         {
111             if (this.fs == null)
112             {
113                 final String hdfsUri = name.getRootURI();
114                 final HdfsFileSystemConfigBuilder builder = HdfsFileSystemConfigBuilder.getInstance();
115                 final FileSystemOptions options = getFileSystemOptions();
116                 final String[] configNames = builder.getConfigNames(options);
117                 final Path[] configPaths = builder.getConfigPaths(options);
118                 final URL[] configURLs = builder.getConfigURLs(options);
119                 final InputStream configStream = builder.getConfigInputStream(options);
120                 final Configuration configConfiguration = builder.getConfigConfiguration(options);
121 
122                 final Configuration conf = new Configuration(true);
123                 conf.set(FileSystem.FS_DEFAULT_NAME_KEY, hdfsUri);
124 
125                 // Load any alternate configuration parameters that may have been specified
126                 // no matter where they might come from
127                 if (configNames != null)
128                 {
129                     for (String configName : configNames)
130                     {
131                         log.debug("Adding HDFS configuration resource: " + configName);
132                         conf.addResource(configName);
133                     }
134                 }
135                 if (configPaths != null)
136                 {
137                     for (Path path : configPaths)
138                     {
139                         log.debug("Adding HDFS configuration path: " + path);
140                         conf.addResource(path);
141                     }
142                 }
143                 if (configURLs != null)
144                 {
145                     for (URL url : configURLs)
146                     {
147                         log.debug("Adding HDFS configuration URL: " + url);
148                         conf.addResource(url);
149                     }
150                 }
151                 if (configStream != null)
152                 {
153                     log.debug("Adding HDFS configuration stream");
154                     conf.addResource(configStream);
155                 }
156                 if (configConfiguration != null)
157                 {
158                     log.debug("Adding HDFS configuration object");
159                     conf.addResource(configConfiguration);
160                 }
161 
162                 try
163                 {
164                     fs = FileSystem.get(conf);
165                 }
166                 catch (final IOException e)
167                 {
168                     log.error("Error connecting to filesystem " + hdfsUri, e);
169                     throw new FileSystemException("Error connecting to filesystem " + hdfsUri, e);
170                 }
171             }
172         }
173 
174         final boolean useCache = null != getContext().getFileSystemManager().getFilesCache();
175         FileObject file;
176         if (useCache)
177         {
178             file = this.getFileFromCache(name);
179         }
180         else
181         {
182             file = null;
183         }
184         if (null == file)
185         {
186             String path = null;
187             try
188             {
189                 path = URLDecoder.decode(name.getPath(), "UTF-8");
190             }
191             catch (final UnsupportedEncodingException e)
192             {
193                 path = name.getPath();
194             }
195             final Path filePath = new Path(path);
196             file = new HdfsFileObject((AbstractFileName) name, this, fs, filePath);
197             if (useCache)
198             {
199                 this.putFileToCache(file);
200             }
201         }
202         /**
203          * resync the file information if requested
204          */
205         if (getFileSystemManager().getCacheStrategy().equals(CacheStrategy.ON_RESOLVE))
206         {
207             file.refresh();
208         }
209         return file;
210     }
211 
212 }