Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/ingestion/input-sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ Sample specs:
|Property|Description|Default|Required|
|--------|-----------|-------|---------|
|type|Set the value to `hdfs`.|None|yes|
|paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths. Empty files located under one of the given paths will be skipped.|None|yes|
|paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths.<br /><br />Empty files located under one of the given paths will be skipped. Hidden files and directories whose names start with `_` or `.` are automatically excluded.<br /><br />When a path points to a directory, only the immediate files in that directory are listed; subdirectories are not traversed. To ingest files from nested directories, use glob patterns such as `hdfs://namenode_host/data/**/*.json`.|None|yes|
|systemFields|JSON array of system fields to return as part of input rows. Possible values: `__file_uri` (URI) and `__file_path` (path component of URI).|None|no|

You can also ingest from other storage using the HDFS input source if the HDFS client supports that storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,9 @@
import org.apache.druid.storage.hdfs.HdfsStorageDruidModule;
import org.apache.druid.utils.Streams;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -66,6 +62,7 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -147,45 +144,59 @@ private static void throwIfInvalidProtocol(HdfsInputSourceConfig config, String
}
}

/**
* Matches Hadoop's FileInputFormat hidden-file filter: rejects paths whose name starts with '_' or '.'.
*/
private static boolean isHiddenPath(Path path)
{
final String name = path.getName();
return name.startsWith("_") || name.startsWith(".");
}

public static Collection<Path> getPaths(List<String> inputPaths, Configuration configuration) throws IOException
{
if (inputPaths.isEmpty()) {
return Collections.emptySet();
}

// Use FileInputFormat to read splits. To do this, we need to make a fake Job.
Job job = Job.getInstance(configuration);

// Add paths to the fake JobContext.
for (String inputPath : inputPaths) {
FileInputFormat.addInputPaths(job, inputPath);
final Set<Path> paths = new LinkedHashSet<>();
for (final String inputPath : inputPaths) {
final String[] splitPaths = org.apache.hadoop.util.StringUtils.split(inputPath);
for (final String singlePath : splitPaths) {
final Path p = new Path(singlePath);
final FileSystem fs = p.getFileSystem(configuration);
final FileStatus[] statuses = fs.globStatus(p);
if (statuses != null) {
for (final FileStatus status : statuses) {
if (isHiddenPath(status.getPath())) {
continue;
}
if (status.isDirectory()) {
addFilesFromDirectory(fs, status.getPath(), paths);
} else if (status.getLen() > 0) {
paths.add(status.getPath());
}
}
}
}
}

return new HdfsFileInputFormat().getSplits(job)
.stream()
.filter(split -> ((FileSplit) split).getLength() > 0)
.map(split -> ((FileSplit) split).getPath())
.collect(Collectors.toSet());
return paths;
}

/**
* Helper for leveraging hadoop code to interpret HDFS paths with globs
* Lists files in a directory non-recursively, matching the behavior of Hadoop's FileInputFormat
* when mapreduce.input.fileinputformat.input.dir.recursive is not set (the default).
* Hidden files (names starting with '_' or '.') and subdirectories are skipped.
*/
private static class HdfsFileInputFormat extends FileInputFormat<Object, Object>
private static void addFilesFromDirectory(FileSystem fs, Path dir, Set<Path> paths) throws IOException
{
@Override
public RecordReader<Object, Object> createRecordReader(
org.apache.hadoop.mapreduce.InputSplit inputSplit,
TaskAttemptContext taskAttemptContext
)
{
throw new UnsupportedOperationException();
}

@Override
protected boolean isSplitable(JobContext context, Path filename)
{
return false; // prevent generating extra paths
final FileStatus[] children = fs.listStatus(dir);
if (children != null) {
for (final FileStatus child : children) {
if (!child.isDirectory() && !isHiddenPath(child.getPath()) && child.getLen() > 0) {
paths.add(child.getPath());
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
Expand Down Expand Up @@ -490,6 +491,233 @@ public void testSystemFields()
}
}

public static class GetPathsTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

private FileSystem fileSystem;
private Configuration configuration;

@Before
public void setup() throws IOException
{
final File dir = temporaryFolder.getRoot();
configuration = new Configuration(true);
fileSystem = new LocalFileSystem();
fileSystem.initialize(dir.toURI(), configuration);
fileSystem.setWorkingDirectory(new Path(dir.getAbsolutePath()));
}

@After
public void teardown() throws IOException
{
fileSystem.close();
}

@Test
public void testGetPathsWithGlobMatchingNoFiles() throws IOException
{
final Collection<Path> paths = HdfsInputSource.getPaths(
Collections.singletonList(fileSystem.getWorkingDirectory() + "/nonexistent*"),
configuration
);
Assert.assertTrue(paths.isEmpty());
}

@Test
public void testGetPathsFiltersZeroLengthFiles() throws IOException
{
// Create an empty file (zero length)
final Path emptyFile = new Path("empty_file");
fileSystem.create(emptyFile).close();

// Create a non-empty file
final Path nonEmptyFile = new Path("non_empty_file");
try (Writer writer = new BufferedWriter(
new OutputStreamWriter(fileSystem.create(nonEmptyFile), StandardCharsets.UTF_8)
)) {
writer.write("data");
}

final Collection<Path> paths = HdfsInputSource.getPaths(
Collections.singletonList(fileSystem.makeQualified(new Path("*_file")).toString()),
configuration
);

Assert.assertEquals(1, paths.size());
Assert.assertTrue(paths.contains(fileSystem.makeQualified(nonEmptyFile)));
}

@Test
public void testGetPathsWithMultipleInputPaths() throws IOException
{
final Path fileA = new Path("groupA_1");
try (Writer writer = new BufferedWriter(
new OutputStreamWriter(fileSystem.create(fileA), StandardCharsets.UTF_8)
)) {
writer.write("a");
}

final Path fileB = new Path("groupB_1");
try (Writer writer = new BufferedWriter(
new OutputStreamWriter(fileSystem.create(fileB), StandardCharsets.UTF_8)
)) {
writer.write("b");
}

final Collection<Path> paths = HdfsInputSource.getPaths(
Arrays.asList(
fileSystem.makeQualified(new Path("groupA*")).toString(),
fileSystem.makeQualified(new Path("groupB*")).toString()
),
configuration
);

Assert.assertEquals(2, paths.size());
Assert.assertTrue(paths.contains(fileSystem.makeQualified(fileA)));
Assert.assertTrue(paths.contains(fileSystem.makeQualified(fileB)));
}

@Test
public void testGetPathsWithCommaSeparatedString() throws IOException
{
final Path fileA = new Path("comma_a");
try (Writer writer = new BufferedWriter(
new OutputStreamWriter(fileSystem.create(fileA), StandardCharsets.UTF_8)
)) {
writer.write("a");
}

final Path fileB = new Path("comma_b");
try (Writer writer = new BufferedWriter(
new OutputStreamWriter(fileSystem.create(fileB), StandardCharsets.UTF_8)
)) {
writer.write("b");
}

final String commaSeparated =
fileSystem.makeQualified(fileA) + "," + fileSystem.makeQualified(fileB);
final Collection<Path> paths = HdfsInputSource.getPaths(
Collections.singletonList(commaSeparated),
configuration
);

Assert.assertEquals(2, paths.size());
Assert.assertTrue(paths.contains(fileSystem.makeQualified(fileA)));
Assert.assertTrue(paths.contains(fileSystem.makeQualified(fileB)));
}

@Test
public void testGetPathsFiltersHiddenFiles() throws IOException
{
final Path visibleFile = new Path("visible");
try (Writer writer = new BufferedWriter(
new OutputStreamWriter(fileSystem.create(visibleFile), StandardCharsets.UTF_8)
)) {
writer.write("data");
}

final Path dotFile = new Path(".hidden");
try (Writer writer = new BufferedWriter(
new OutputStreamWriter(fileSystem.create(dotFile), StandardCharsets.UTF_8)
)) {
writer.write("data");
}

final Path underscoreFile = new Path("_metadata");
try (Writer writer = new BufferedWriter(
new OutputStreamWriter(fileSystem.create(underscoreFile), StandardCharsets.UTF_8)
)) {
writer.write("data");
}

final Collection<Path> paths = HdfsInputSource.getPaths(
Collections.singletonList(fileSystem.makeQualified(new Path("*")).toString()),
configuration
);

Assert.assertEquals(1, paths.size());
Assert.assertTrue(paths.contains(fileSystem.makeQualified(visibleFile)));
Assert.assertFalse(paths.contains(fileSystem.makeQualified(dotFile)));
Assert.assertFalse(paths.contains(fileSystem.makeQualified(underscoreFile)));
}

@Test
public void testGetPathsDirectoryListsFilesNonRecursively() throws IOException
{
final Path dir = new Path("mydir");
fileSystem.mkdirs(dir);

final Path fileInDir = new Path(dir, "file1");
try (Writer writer = new BufferedWriter(
new OutputStreamWriter(fileSystem.create(fileInDir), StandardCharsets.UTF_8)
)) {
writer.write("data");
}

// Create a nested subdirectory with a file -- should NOT be included
final Path subDir = new Path(dir, "subdir");
fileSystem.mkdirs(subDir);

final Path nestedFile = new Path(subDir, "nested_file");
try (Writer writer = new BufferedWriter(
new OutputStreamWriter(fileSystem.create(nestedFile), StandardCharsets.UTF_8)
)) {
writer.write("nested");
}

// Create a hidden file in the directory -- should NOT be included
final Path hiddenInDir = new Path(dir, ".hidden_in_dir");
try (Writer writer = new BufferedWriter(
new OutputStreamWriter(fileSystem.create(hiddenInDir), StandardCharsets.UTF_8)
)) {
writer.write("hidden");
}

final Collection<Path> paths = HdfsInputSource.getPaths(
Collections.singletonList(fileSystem.makeQualified(dir).toString()),
configuration
);

Assert.assertEquals(1, paths.size());
Assert.assertTrue(paths.contains(fileSystem.makeQualified(fileInDir)));
}

@Test
public void testGetPathsSkipsHiddenDirectories() throws IOException
{
final Path visibleDir = new Path("visible_dir");
fileSystem.mkdirs(visibleDir);

final Path visibleFile = new Path(visibleDir, "data");
try (Writer writer = new BufferedWriter(
new OutputStreamWriter(fileSystem.create(visibleFile), StandardCharsets.UTF_8)
)) {
writer.write("data");
}

final Path hiddenDir = new Path(".hidden_dir");
fileSystem.mkdirs(hiddenDir);

final Path hiddenFile = new Path(hiddenDir, "should_skip");
try (Writer writer = new BufferedWriter(
new OutputStreamWriter(fileSystem.create(hiddenFile), StandardCharsets.UTF_8)
)) {
writer.write("skip");
}

final Collection<Path> paths = HdfsInputSource.getPaths(
Collections.singletonList(fileSystem.makeQualified(new Path("*dir")).toString()),
configuration
);

Assert.assertEquals(1, paths.size());
Assert.assertTrue(paths.contains(fileSystem.makeQualified(visibleFile)));
Assert.assertFalse(paths.contains(fileSystem.makeQualified(hiddenFile)));
}
}

public static class EqualsTest
{
@Test
Expand Down
Loading