001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.vfs2.provider.hdfs;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.io.UnsupportedEncodingException;
022import java.net.URL;
023import java.net.URLDecoder;
024import java.nio.charset.StandardCharsets;
025import java.util.Collection;
026
027import org.apache.commons.io.function.Uncheck;
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.apache.commons.vfs2.CacheStrategy;
031import org.apache.commons.vfs2.Capability;
032import org.apache.commons.vfs2.FileName;
033import org.apache.commons.vfs2.FileObject;
034import org.apache.commons.vfs2.FileSystemException;
035import org.apache.commons.vfs2.FileSystemOptions;
036import org.apache.commons.vfs2.provider.AbstractFileName;
037import org.apache.commons.vfs2.provider.AbstractFileSystem;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.fs.Path;
041
042/**
043 * A VFS FileSystem that interacts with HDFS.
044 *
045 * @since 2.1
046 */
047public class HdfsFileSystem extends AbstractFileSystem {
048
049    private static final Log log = LogFactory.getLog(HdfsFileSystem.class);
050
051    private FileSystem fs;
052
053    /**
054     * Constructs file system.
055     *
056     * @param rootName Name of the root directory of this file system.
057     * @param fileSystemOptions options for this file system instance.
058     */
059    protected HdfsFileSystem(final FileName rootName, final FileSystemOptions fileSystemOptions) {
060        super(rootName, null, fileSystemOptions);
061    }
062
063    /**
064     * @see org.apache.commons.vfs2.provider.AbstractFileSystem#addCapabilities(Collection)
065     */
066    @Override
067    protected void addCapabilities(final Collection<Capability> capabilities) {
068        capabilities.addAll(HdfsFileProvider.CAPABILITIES);
069    }
070
071    /**
072     * @see org.apache.commons.vfs2.provider.AbstractFileSystem#close()
073     */
074    @Override
075    public void close() {
076        if (null != fs) {
077            Uncheck.run(fs::close);
078        }
079        super.close();
080    }
081
082    /**
083     * @see org.apache.commons.vfs2.provider.AbstractFileSystem#createFile(AbstractFileName)
084     */
085    @Override
086    protected FileObject createFile(final AbstractFileName name) throws Exception {
087        return resolveFile(name);
088    }
089
090    /**
091     * Resolve FileName into FileObject.
092     *
093     * @param name The name of a file on the HdfsFileSystem.
094     * @return resolved FileObject.
095     * @throws FileSystemException if an error occurred.
096     */
097    @Override
098    public FileObject resolveFile(final FileName name) throws FileSystemException {
099        synchronized (this) {
100            if (fs == null) {
101                final String hdfsUri = name.getRootURI();
102                final HdfsFileSystemConfigBuilder builder = HdfsFileSystemConfigBuilder.getInstance();
103                final FileSystemOptions options = getFileSystemOptions();
104                final String[] configNames = builder.getConfigNames(options);
105                final Path[] configPaths = builder.getConfigPaths(options);
106                final URL[] configURLs = builder.getConfigURLs(options);
107                final InputStream configStream = builder.getConfigInputStream(options);
108                final Configuration configConfiguration = builder.getConfigConfiguration(options);
109
110                final Configuration conf = new Configuration(true);
111                conf.set(FileSystem.FS_DEFAULT_NAME_KEY, hdfsUri);
112
113                // Load any alternate configuration parameters that may have been specified
114                // no matter where they might come from
115                if (configNames != null) {
116                    for (final String configName : configNames) {
117                        log.debug("Adding HDFS configuration resource: " + configName);
118                        conf.addResource(configName);
119                    }
120                }
121                if (configPaths != null) {
122                    for (final Path path : configPaths) {
123                        log.debug("Adding HDFS configuration path: " + path);
124                        conf.addResource(path);
125                    }
126                }
127                if (configURLs != null) {
128                    for (final URL url : configURLs) {
129                        log.debug("Adding HDFS configuration URL: " + url);
130                        conf.addResource(url);
131                    }
132                }
133                if (configStream != null) {
134                    log.debug("Adding HDFS configuration stream");
135                    conf.addResource(configStream);
136                }
137                if (configConfiguration != null) {
138                    log.debug("Adding HDFS configuration object");
139                    conf.addResource(configConfiguration);
140                }
141
142                try {
143                    fs = FileSystem.get(conf);
144                } catch (final IOException e) {
145                    log.error("Error connecting to filesystem " + hdfsUri, e);
146                    throw new FileSystemException("Error connecting to filesystem " + hdfsUri, e);
147                }
148            }
149        }
150
151        final boolean useCache = null != getFileSystemManager().getFilesCache();
152        FileObject fileObject = useCache ? getFileFromCache(name) : null;
153        if (null == fileObject) {
154            String path;
155            try {
156                path = URLDecoder.decode(name.getPath(), StandardCharsets.UTF_8.name());
157            } catch (final UnsupportedEncodingException e) {
158                path = name.getPath();
159            }
160            final Path filePath = new Path(path);
161            fileObject = new HdfsFileObject((AbstractFileName) name, this, fs, filePath);
162            fileObject = decorateFileObject(fileObject);
163            if (useCache) {
164                putFileToCache(fileObject);
165            }
166        }
167        /*
168          resync the file information if requested
169         */
170        if (getFileSystemManager().getCacheStrategy().equals(CacheStrategy.ON_RESOLVE)) {
171            fileObject.refresh();
172        }
173        return fileObject;
174    }
175
176}