Java interface for Hadoop HDFS filesystems – examples and concepts

Hadoop has an abstract notion of filesystems, of which HDFS is just one implementation. The Java abstract class org.apache.hadoop.fs.FileSystem represents the client interface to a filesystem in Hadoop, and there are several concrete implementations. Hadoop is written in Java, so most Hadoop filesystem interactions are mediated through the Java API; the filesystem shell, for example, is a Java application that uses the Java FileSystem class to provide filesystem operations.

By exposing its filesystem interface as a Java API, Hadoop makes it awkward for non-Java applications to access HDFS. The HTTP REST API exposed by the WebHDFS protocol makes it easier for other languages to interact with HDFS. Note that the HTTP interface is slower than the native Java client, so it should be avoided for very large data transfers if possible.

There are two ways of accessing HDFS over HTTP: directly, where the HDFS daemons serve HTTP requests to clients; and via a proxy, which accesses HDFS on the client’s behalf using the usual DistributedFileSystem API.

In the first case, the embedded web servers in the namenode and datanodes act as WebHDFS endpoints. (WebHDFS is enabled by default, since dfs.webhdfs.enabled is set to true.) File metadata operations are handled by the namenode, while file read (and write) operations are sent first to the namenode, which sends an HTTP redirect to the client indicating the datanode to stream file data from (or to).

The second way of accessing HDFS over HTTP relies on one or more standalone proxy servers. All traffic to the cluster passes through the proxy, so the client never accesses the namenode or datanode directly. This allows for stricter firewall and bandwidth-limiting policies to be put in place. The HttpFS proxy exposes the same HTTP (and HTTPS) interface as WebHDFS, so clients can access both using webhdfs (or swebhdfs) URIs. The HttpFS proxy is started independently of the namenode and datanode daemons, using the httpfs.sh script, and by default it listens on a different port (14000).
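
Because WebHDFS and HttpFS are exposed through the same FileSystem abstraction used throughout this post, they can also be reached from Java simply by using a webhdfs URI. The host names and ports below are placeholders (50070 was the namenode’s default HTTP port in Hadoop 2, and HttpFS defaults to 14000), so treat this as a sketch rather than a working configuration:

Configuration conf = new Configuration();
// Talk directly to the namenode's embedded web server (WebHDFS)
FileSystem webFs = FileSystem.get(URI.create("webhdfs://namenode-host:50070/"), conf);
// Or go through an HttpFS proxy instead (same webhdfs scheme, different host and port)
FileSystem proxyFs = FileSystem.get(URI.create("webhdfs://httpfs-host:14000/"), conf);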

The Java Interface

In this section, we dig into the Hadoop FileSystem class: the API for interacting with one of Hadoop’s filesystems.

Reading Data Using the FileSystem API

A file in a Hadoop filesystem is represented by a Hadoop Path object. FileSystem is a general filesystem API, so the first step is to retrieve an instance for the filesystem we want to use—HDFS, in this case. There are several static factory methods for getting a FileSystem instance:

public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf, String user) throws IOException
public static LocalFileSystem getLocal(Configuration conf) throws IOException

A Configuration object encapsulates a client or server’s configuration, which is set using configuration files read from the classpath, such as core-site.xml. The first method returns the default filesystem (as specified in core-site.xml, or the default local filesystem if not specified there). The second uses the given URI’s scheme and authority to determine the filesystem to use, falling back to the default filesystem if no scheme is specified in the given URI. The third retrieves the filesystem as the given user, which is important in the context of security. The fourth, getLocal(), retrieves an instance of the local filesystem.
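
As a quick sketch of how these factory methods are used (the URI and user name below are placeholders, not values from any particular cluster):

Configuration conf = new Configuration();

// 1. The default filesystem, as configured in core-site.xml
FileSystem fs = FileSystem.get(conf);

// 2. The filesystem identified by the URI's scheme and authority (HDFS here)
FileSystem hdfs = FileSystem.get(URI.create("hdfs://namenode-host:8020/"), conf);

// 3. The same filesystem, accessed as a particular user
FileSystem userFs = FileSystem.get(URI.create("hdfs://namenode-host:8020/"), conf, "hadoopuser");

// 4. The client-side local filesystem
LocalFileSystem localFs = FileSystem.getLocal(conf);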

With a FileSystem instance in hand, we invoke an open() method to get an input stream for a file. The first method below uses a default buffer size of 4 KB; the second lets the caller specify the buffer size:

public FSDataInputStream open(Path f) throws IOException
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException

FSDataInputStream

The open() method on FileSystem actually returns an FSDataInputStream rather than a standard java.io class. This class is a specialization of java.io.DataInputStream with support for random access, so you can read from any part of the stream:

package org.apache.hadoop.fs;

public class FSDataInputStream extends DataInputStream implements Seekable, PositionedReadable {
    // implementation elided
}

The Seekable interface permits seeking to a position in the file and provides a query method for the current offset from the start of the file (getPos()). Calling seek() with a position that is greater than the length of the file will result in an IOException. Unlike the skip() method of java.io.InputStream, which positions the stream at a point later than the current position, seek() can move to an arbitrary, absolute position in the file.

public interface Seekable {
    void seek(long pos) throws IOException;
    long getPos() throws IOException;
}

FSDataInputStream also implements the PositionedReadable interface for reading parts of a file at a given offset.


public interface PositionedReadable {
    public int read(long position, byte[] buffer, int offset, int length) throws IOException;
    public void readFully(long position, byte[] buffer, int offset, int length) throws IOException;
    public void readFully(long position, byte[] buffer) throws IOException;
}

The read() method reads up to length bytes from the given position in the file into the buffer at the given offset. The return value is the number of bytes actually read; callers should check this value, as it may be less than length. The readFully() methods will read length bytes into the buffer (or buffer.length bytes for the version that takes just a byte array), unless the end of the file is reached, in which case an EOFException is thrown.

Finally, bear in mind that calling seek() is a relatively expensive operation and should be done sparingly. You should structure your application access patterns to rely on streaming data (by using MapReduce, for example) rather than performing a large number of seeks.
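
Putting the reading APIs together, here is a small sketch that prints a file twice by seeking back to the start after the first pass, and then performs a positioned read of the first few bytes. The class name and the expectation that args[0] is a full file URI (for example an hdfs:// path) are illustrative assumptions, not taken from any particular codebase:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class SeekAndReadExample {

    public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);

        FSDataInputStream in = null;
        try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096, false);

            in.seek(0);   // go back to the start of the file
            System.out.println("--- after seek(0), getPos() = " + in.getPos());
            IOUtils.copyBytes(in, System.out, 4096, false);

            // Positioned read: reads at an absolute offset without moving the stream's current position
            byte[] buffer = new byte[16];
            int bytesRead = in.read(0, buffer, 0, buffer.length);
            System.out.println("positioned read returned " + bytesRead + " bytes");
        } finally {
            IOUtils.closeStream(in);
        }
    }
}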

Writing Data

The FileSystem class has a number of methods for creating a file. The simplest is the method that takes a Path object for the file to be created and returns an output stream to write to.

public FSDataOutputStream create(Path f) throws IOException

There are overloaded versions of this method that allow you to specify whether to forcibly overwrite existing files, the replication factor of the file, the buffer size to use when writing the file, the block size for the file, and file permissions.

Note: The create() methods create any parent directories of the file to be written that don’t already exist. Though convenient, this behavior may be unexpected. If you want the write to fail when the parent directory doesn’t exist, you should check for the existence of the parent directory first by calling the exists() method. Alternatively, use FileContext, which allows you to control whether parent directories are created or not.
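
For example, a minimal sketch of that check (fs is assumed to be an already configured FileSystem instance, and the target path is a placeholder):

Path dst = new Path("/data/output/results.txt");   // hypothetical target file
if (!fs.exists(dst.getParent())) {
    throw new IOException("Parent directory does not exist: " + dst.getParent());
}
FSDataOutputStream out = fs.create(dst);            // safe to create now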

There’s also an overloaded method for passing a callback interface, Progressable, so your application can be notified of the progress of the data being written to the datanodes.

package org.apache.hadoop.util;

public interface Progressable {
    public void progress();
}

As an alternative to creating a new file, you can append to an existing file using the append() method (there are also some other overloaded versions):

public FSDataOutputStream append(Path f) throws IOException

The append operation allows a single writer to modify an already written file by opening it and writing data from the final offset in the file. With this API, applications that produce unbounded files, such as logfiles, can write to an existing file after having closed it. The append operation is optional and not implemented by all Hadoop filesystems. The example below shows how to copy a local file to a Hadoop filesystem. We illustrate progress by printing a period every time the progress() method is called by Hadoop, which is after each 64 KB packet of data is written to the datanode pipeline.

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

public class CopyFileWithProgress {

    public static void main(String[] args) throws Exception {
        String localSrc = args[0];
        String dst = args[1];

        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        OutputStream out = fs.create(new Path(dst), new Progressable() {
            public void progress() {
                System.out.print(".");
            }
        });

        IOUtils.copyBytes(in, out, 4096, true);
    }
}
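
For completeness, here is a minimal sketch of the append() call described above, assuming dst holds the URI of a file that already exists, conf is an existing Configuration, and the underlying filesystem actually supports append:

FileSystem fs = FileSystem.get(URI.create(dst), conf);
FSDataOutputStream out = fs.append(new Path(dst));   // stream is positioned at the end of the existing file
try {
    out.writeBytes("another log line\n");
} finally {
    out.close();
}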

FSDataOutputStream

The create() method on FileSystem returns an FSDataOutputStream, which, like FSDataInputStream, has a method for querying the current position in the file:


package org.apache.hadoop.fs;

public class FSDataOutputStream extends DataOutputStream implements Syncable {

    public long getPos() throws IOException {
        // implementation elided
    }

    // implementation elided
}

However, unlike FSDataInputStream, FSDataOutputStream does not permit seeking. This is because HDFS allows only sequential writes to an open file or appends to an already written file. In other words, there is no support for writing to anywhere other than the end of the file, so there is no value in being able to seek while writing.

FileSystem also provides a method for creating a directory:

public boolean mkdirs(Path f) throws IOException

This method creates all of the necessary parent directories if they don’t already exist, and returns true if it succeeds.
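
A minimal sketch (the namenode URI and directory path are placeholders, and conf is assumed to be an existing Configuration):

FileSystem fs = FileSystem.get(URI.create("hdfs://namenode-host:8020/"), conf);
boolean created = fs.mkdirs(new Path("/user/hadoopuser/reports/2015"));   // missing parents are created too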

Querying the Filesystem

The FileStatus class encapsulates filesystem metadata for files and directories, including file length, block size, replication, modification time, ownership, and permission information. Together with FileSystem, it gives you the ability to navigate a filesystem’s directory structure and retrieve information about the files and directories it stores.

The method getFileStatus() on FileSystem provides a way of getting a FileStatus object for a single file or directory.

If no file or directory exists, a FileNotFoundException is thrown. However, if you are interested only in the existence of a file or directory, the exists() method on FileSystem is more convenient:


public boolean exists(Path f) throws IOException
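
As an example, here is a small sketch that prints some of the metadata for a given path; the uri variable is a placeholder for a full file or directory URI, and conf is assumed to be an existing Configuration:

FileSystem fs = FileSystem.get(URI.create(uri), conf);
FileStatus stat = fs.getFileStatus(new Path(uri));   // throws FileNotFoundException if the path is missing
System.out.println("path:         " + stat.getPath());
System.out.println("is directory: " + stat.isDirectory());
System.out.println("length:       " + stat.getLen());
System.out.println("block size:   " + stat.getBlockSize());
System.out.println("replication:  " + stat.getReplication());
System.out.println("modified:     " + stat.getModificationTime());
System.out.println("owner/group:  " + stat.getOwner() + "/" + stat.getGroup());
System.out.println("permissions:  " + stat.getPermission());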

Some examples of using the Java API for HDFS

Scenario 1 – Given the namenode URL, a source directory, and a target directory, copy all the files inside the source directory to the target directory.

public class HdfsFileSystem {

    private static Configuration conf = new Configuration();

    public static void copyFiles(String url, String sourceDirectory, String destDirectory) throws Exception {
        FileSystem fs = FileSystem.get(URI.create(url), conf);

        Path[] paths = getAbsolutePaths(sourceDirectory);
        if (paths == null) {
            return;   // source directory does not exist
        }

        for (Path path : paths) {
            if (!fs.isDirectory(path)) {
                // copy within the same filesystem, keeping the source file (deleteSource = false)
                FileUtil.copy(fs, path, fs, new Path(destDirectory, path.getName()), false, conf);
            }
        }
    }

    public static Path[] getAbsolutePaths(String path) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(path), conf);
        Path fsPath = new Path(path);

        if (fs.exists(fsPath)) {
            FileStatus[] status = fs.listStatus(fsPath);
            return FileUtil.stat2Paths(status);
        } else {
            return null;
        }
    }
}

Scenario 2 – Given the namenode URL and a path, create an empty success/trigger file which can be used to mark successful completion of a job.

public static void createSuccessFile(String url, String path) throws IOException {
    FileSystem fs = FileSystem.get(URI.create(url), conf);
    // create an empty marker file and close the stream straight away
    fs.create(new Path(path)).close();
}

Scenario 3 – Given the URL of a file, return the content of the file as a string.

public static String getFileContent(String url) throws IOException {
    FileSystem fs = FileSystem.get(URI.create(url), conf);
    InputStream content = null;
    String fileContent = null;

    try {
        content = fs.open(new Path(url));
        fileContent = org.apache.commons.io.IOUtils.toString(content, "UTF-8");
    } finally {
        IOUtils.closeStream(content);
    }

    return fileContent;
}

Scenario 4 – Rename a file.

public static boolean renameFile(String input, String output) throws IOException {
    FileSystem fs = FileSystem.get(URI.create(input), conf);
    return fs.rename(new Path(input), new Path(output));
}

Scenario 5 – Given file content in the form of a string, create a file and write the string to it.

public static void writeFileContent(String outputUrl, String fileContents) throws IOException {
    FileSystem fs = FileSystem.get(URI.create(outputUrl), conf);
    InputStream ins = new ByteArrayInputStream(fileContents.getBytes("UTF-8"));
    OutputStream os = fs.create(new Path(outputUrl));
    try {
        IOUtils.copyBytes(ins, os, 4096, false);
    } finally {
        IOUtils.closeStream(ins);
        IOUtils.closeStream(os);
    }
}

Scenario 6 – Given a directory location, return the list of file names inside the directory.

public static List<String> getFileNames(String path) throws IOException {
    FileSystem fs = FileSystem.get(URI.create(path), conf);
    Path fsPath = new Path(path);

    FileStatus[] status = fs.listStatus(fsPath);
    Path[] listedPaths = FileUtil.stat2Paths(status);

    List<String> files = new ArrayList<String>();
    for (Path file : listedPaths) {
        files.add(file.getName());
    }

    return files;
}

Scenario 7 – Given a base path and a list of files inside the base path, delete all the files in the list.

public static void deleteFiles(List<String> listOfFiles, String basePath) throws Exception {
    System.out.println("Delete many files function called with base path: " + basePath);
    FileSystem fs = FileSystem.get(URI.create(basePath), conf);

    for (String filePath : listOfFiles) {
        Path path = new Path(basePath, filePath);
        boolean deleted = fs.delete(path, false);   // false: do not delete recursively
        if (deleted) {
            System.out.println("File deleted successfully: " + path);
        } else {
            System.out.println("File deletion failed: " + path);
        }
    }
}

Scenario 8 – Given a URL, check whether the file exists.

public static boolean fileExists(String url) throws IllegalArgumentException, IOException {
    FileSystem fs = FileSystem.get(URI.create(url), conf);
    return fs.exists(new Path(url));
}

File patterns

It is a common requirement to process sets of files in a single operation. For example, a MapReduce job for log processing might analyze a month’s worth of files contained in a number of directories. Rather than having to enumerate each file and directory to specify the input, it is convenient to use wildcard characters to match multiple files with a single expression, an operation that is known as globbing. Hadoop provides two FileSystem methods for processing globs:

public FileStatus[] globStatus(Path pathPattern) throws IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException

The globStatus() methods return an array of FileStatus objects whose paths match the supplied pattern, sorted by path. An optional PathFilter can be specified to restrict the matches further.
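
For example, assuming log files are organized in date-based directories such as /2007/12/30 (the layout, namenode URI, and conf object here are only illustrative), a single glob can select a whole month:

FileSystem fs = FileSystem.get(URI.create("hdfs://namenode-host:8020/"), conf);
// Matches every day directory of December 2007
FileStatus[] statuses = fs.globStatus(new Path("/2007/12/*"));
for (Path p : FileUtil.stat2Paths(statuses)) {
    System.out.println(p);
}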

PathFilter

Glob patterns are not always powerful enough to describe a set of files you want to access. For example, it is not generally possible to exclude a particular file using a glob pattern. The listStatus() and globStatus() methods of FileSystem take an optional PathFilter, which allows programmatic control over matching.

package org.apache.hadoop.fs;

public interface PathFilter {
    boolean accept(Path path);
}

PathFilter is the equivalent of java.io.FileFilter for Path objects rather than File objects. The example filter below passes only those files that don’t match the supplied regular expression; after the glob picks out an initial set of files to include, the filter is used to refine the results:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class HdfsPathFilter implements PathFilter {

    private final String regex;

    public HdfsPathFilter(String regex) {
        this.regex = regex;
    }

    @Override
    public boolean accept(Path path) {
        return !path.toString().matches(regex);
    }
}
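
Continuing the hypothetical date-based layout used in the globbing example above, the filter can be combined with a glob to match, say, every day of December 2007 except the 31st (fs is assumed to be an already configured FileSystem instance):

FileStatus[] statuses = fs.globStatus(new Path("/2007/12/*"),
        new HdfsPathFilter("^.*/2007/12/31$"));
Path[] paths = FileUtil.stat2Paths(statuses);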