
Commit

[HUDI-7745] Move Hadoop-dependent util methods to hudi-hadoop-common (a…
yihua authored May 11, 2024
1 parent 49072d1 commit 61f54a0
Showing 49 changed files with 567 additions and 504 deletions.
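The change repeated across the files below is mechanical: call sites that previously reached Hadoop-specific helpers through org.apache.hudi.common.fs.FSUtils or org.apache.hudi.common.bootstrap.FileStatusUtils now call the same helpers on org.apache.hudi.hadoop.fs.HadoopFSUtils in hudi-hadoop-common. A minimal before/after sketch of one such call site, using methods that appear in this diff; the surrounding class and method names are illustrative only, and the exact signatures are assumed rather than verified against the Hudi sources:

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.hudi.hadoop.fs.HadoopFSUtils;

class HadoopFsUtilsCallSiteSketch {
  // Before this commit: long size = FSUtils.getFileSize(fs, fullPath);  (helper lived in hudi-common)
  // After this commit:  the same helper is invoked on HadoopFSUtils from hudi-hadoop-common.
  static long partitionFileSize(FileSystem fs, String basePath, String partitionPath, String fileName) throws IOException {
    // constructAbsolutePathInHadoopPath and getFileSize are among the relocated methods shown in the hunks below.
    Path partitionDir = HadoopFSUtils.constructAbsolutePathInHadoopPath(basePath, partitionPath);
    return HadoopFSUtils.getFileSize(fs, new Path(partitionDir, fileName));
  }
}
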
@@ -303,7 +303,7 @@ private void addPartitionsToTableInternal(Table table, List<String> partitionsTo
try {
StorageDescriptor sd = table.storageDescriptor();
List<PartitionInput> partitionInputList = partitionsToAdd.stream().map(partition -> {
- String fullPartitionPath = FSUtils.constructAbsolutePathInHadoopPath(s3aToS3(getBasePath()), partition).toString();
+ String fullPartitionPath = FSUtils.constructAbsolutePath(s3aToS3(getBasePath()), partition).toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
StorageDescriptor partitionSD = sd.copy(copySd -> copySd.location(fullPartitionPath));
return PartitionInput.builder().values(partitionValues).storageDescriptor(partitionSD).build();
@@ -347,7 +347,7 @@ private void updatePartitionsToTableInternal(Table table, List<String> changedPa
try {
StorageDescriptor sd = table.storageDescriptor();
List<BatchUpdatePartitionRequestEntry> updatePartitionEntries = changedPartitions.stream().map(partition -> {
- String fullPartitionPath = FSUtils.constructAbsolutePathInHadoopPath(s3aToS3(getBasePath()), partition).toString();
+ String fullPartitionPath = FSUtils.constructAbsolutePath(s3aToS3(getBasePath()), partition).toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
StorageDescriptor partitionSD = sd.copy(copySd -> copySd.location(fullPartitionPath));
PartitionInput partitionInput = PartitionInput.builder().values(partitionValues).storageDescriptor(partitionSD).build();

@@ -22,7 +22,6 @@
import org.apache.hudi.avro.model.HoodieLSMTimelineInstant;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
- import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLSMTimelineManifest;
@@ -38,6 +37,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
+ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.hadoop.HoodieAvroParquetReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.io.storage.HoodieFileWriter;
@@ -308,14 +308,14 @@ public void clean(HoodieEngineContext context, int compactedVersions) throws IOE
manifestFilesToClean.add(fileStatus.getPath().toString());
}
});
- FSUtils.deleteFilesParallelize(metaClient, manifestFilesToClean, context, config.getArchiveDeleteParallelism(),
+ HadoopFSUtils.deleteFilesParallelize(metaClient, manifestFilesToClean, context, config.getArchiveDeleteParallelism(),
false);
// delete the data files
List<String> dataFilesToClean = LSMTimeline.listAllMetaFiles(metaClient).stream()
.filter(fileStatus -> !filesToKeep.contains(fileStatus.getPath().getName()))
.map(fileStatus -> fileStatus.getPath().toString())
.collect(Collectors.toList());
- FSUtils.deleteFilesParallelize(metaClient, dataFilesToClean, context,
+ HadoopFSUtils.deleteFilesParallelize(metaClient, dataFilesToClean, context,
config.getArchiveDeleteParallelism(), false);
}
}

@@ -29,6 +29,7 @@
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
+ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
@@ -108,8 +109,8 @@ public static HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable t
*/
public static Option<HoodieConsistentHashingMetadata> loadMetadata(HoodieTable table, String partition) {
HoodieTableMetaClient metaClient = table.getMetaClient();
- Path metadataPath = FSUtils.constructAbsolutePathInHadoopPath(metaClient.getHashingMetadataPath(), partition);
- Path partitionPath = FSUtils.constructAbsolutePathInHadoopPath(metaClient.getBasePathV2().toString(), partition);
+ Path metadataPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(metaClient.getHashingMetadataPath(), partition);
+ Path partitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(metaClient.getBasePathV2().toString(), partition);
try {
Predicate<FileStatus> hashingMetaCommitFilePredicate = fileStatus -> {
String filename = fileStatus.getPath().getName();

@@ -56,6 +56,7 @@
import org.apache.hudi.exception.HoodieAppendException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpsertException;
+ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
@@ -534,7 +535,7 @@ public List<WriteStatus> close() {
// TODO we can actually deduce file size purely from AppendResult (based on offset and size
// of the appended block)
for (WriteStatus status : statuses) {
- long logFileSize = FSUtils.getFileSize(fs, new Path(config.getBasePath(), status.getStat().getPath()));
+ long logFileSize = HadoopFSUtils.getFileSize(fs, new Path(config.getBasePath(), status.getStat().getPath()));
status.getStat().setFileSizeInBytes(logFileSize);
}


@@ -19,7 +19,6 @@
package org.apache.hudi.io;

import org.apache.hudi.avro.HoodieAvroUtils;
- import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
@@ -40,6 +39,7 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
+ import org.apache.hudi.hadoop.fs.HadoopFSUtils;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
@@ -216,7 +216,7 @@ public Map<String, Long> getCDCWriteStats() {
for (Path cdcAbsPath : cdcAbsPaths) {
String cdcFileName = cdcAbsPath.getName();
String cdcPath = StringUtils.isNullOrEmpty(partitionPath) ? cdcFileName : partitionPath + "/" + cdcFileName;
- stats.put(cdcPath, FSUtils.getFileSize(fs, cdcAbsPath));
+ stats.put(cdcPath, HadoopFSUtils.getFileSize(fs, cdcAbsPath));
}
} catch (IOException e) {
throw new HoodieUpsertException("Failed to get cdc write stat", e);

@@ -32,6 +32,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
+ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.storage.StoragePath;
@@ -244,7 +245,7 @@ protected void setupWriteStatus() throws IOException {
stat.setPath(new StoragePath(config.getBasePath()), path);
stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());

- long fileSize = FSUtils.getFileSize(fs, new Path(path.toUri()));
+ long fileSize = HadoopFSUtils.getFileSize(fs, new Path(path.toUri()));
stat.setTotalWriteBytes(fileSize);
stat.setFileSizeInBytes(fileSize);


@@ -42,6 +42,7 @@
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
+ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.io.storage.HoodieFileWriter;
@@ -438,7 +439,7 @@ public List<WriteStatus> close() {
fileWriter.close();
fileWriter = null;

- long fileSizeInBytes = FSUtils.getFileSize(fs, new Path(newFilePath.toUri()));
+ long fileSizeInBytes = HadoopFSUtils.getFileSize(fs, new Path(newFilePath.toUri()));
HoodieWriteStat stat = writeStatus.getStat();

stat.setTotalWriteBytes(fileSizeInBytes);

@@ -19,12 +19,11 @@
package org.apache.hudi.table.action.bootstrap;

import org.apache.hudi.avro.model.HoodieFileStatus;
- import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
- import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.collection.Pair;
+ import org.apache.hudi.hadoop.fs.HadoopFSUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -68,9 +67,9 @@ public static List<Pair<String, List<HoodieFileStatus>>> getAllLeafFoldersWithFi

for (FileStatus topLevelStatus: topLevelStatuses) {
if (topLevelStatus.isFile() && filePathFilter.accept(topLevelStatus.getPath())) {
- String relativePath = FSUtils.getRelativePartitionPath(basePath, topLevelStatus.getPath().getParent());
+ String relativePath = HadoopFSUtils.getRelativePartitionPath(basePath, topLevelStatus.getPath().getParent());
Integer level = (int) relativePath.chars().filter(ch -> ch == '/').count();
- HoodieFileStatus hoodieFileStatus = FileStatusUtils.fromFileStatus(topLevelStatus);
+ HoodieFileStatus hoodieFileStatus = HadoopFSUtils.fromFileStatus(topLevelStatus);
result.add(Pair.of(hoodieFileStatus, Pair.of(level, relativePath)));
} else if (topLevelStatus.isDirectory() && metaPathFilter.accept(topLevelStatus.getPath())) {
subDirectories.add(topLevelStatus.getPath().toString());
@@ -87,9 +86,9 @@ public static List<Pair<String, List<HoodieFileStatus>>> getAllLeafFoldersWithFi
while (itr.hasNext()) {
FileStatus status = itr.next();
if (pathFilter.accept(status.getPath())) {
- String relativePath = FSUtils.getRelativePartitionPath(new Path(basePathStr), status.getPath().getParent());
+ String relativePath = HadoopFSUtils.getRelativePartitionPath(new Path(basePathStr), status.getPath().getParent());
Integer level = (int) relativePath.chars().filter(ch -> ch == '/').count();
- HoodieFileStatus hoodieFileStatus = FileStatusUtils.fromFileStatus(status);
+ HoodieFileStatus hoodieFileStatus = HadoopFSUtils.fromFileStatus(status);
res.add(Pair.of(hoodieFileStatus, Pair.of(level, relativePath)));
}
}

@@ -33,6 +33,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
+ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;

@@ -190,7 +191,7 @@ protected List<HoodieRollbackStat> deleteFiles(HoodieTableMetaClient metaClient,
String basePath = metaClient.getBasePath();
try {
Path fullDeletePath = new Path(fileToDelete);
- String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), fullDeletePath.getParent());
+ String partitionPath = HadoopFSUtils.getRelativePartitionPath(new Path(basePath), fullDeletePath.getParent());
boolean isDeleted = true;
if (doDelete) {
try {

@@ -32,6 +32,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
+ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;

@@ -184,14 +185,14 @@ private FileStatus[] listAllFilesSinceCommit(
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return HoodieTimeline.compareTimestamps(commit, HoodieTimeline.LESSER_THAN_OR_EQUALS,
fileCommitTime);
- } else if (FSUtils.isLogFile(path)) {
+ } else if (HadoopFSUtils.isLogFile(path)) {
String fileCommitTime = FSUtils.getDeltaCommitTimeFromLogPath(new StoragePath(path.toUri()));
return completionTimeQueryView.isSlicedAfterOrOn(commit, fileCommitTime);
}
return false;
};
return ((FileSystem) metaClient.getStorage().getFileSystem())
- .listStatus(FSUtils.constructAbsolutePathInHadoopPath(config.getBasePath(), partitionPath),
+ .listStatus(HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getBasePath(), partitionPath),
filter);
}

@@ -221,7 +222,7 @@ private FileStatus[] listBaseFilesToBeDeleted(String commit, String basefileExte
}
return false;
};
- return fs.listStatus(FSUtils.constructAbsolutePathInHadoopPath(config.getBasePath(), partitionPath), filter);
+ return fs.listStatus(HadoopFSUtils.constructAbsolutePathInHadoopPath(config.getBasePath(), partitionPath), filter);
}

private FileStatus[] fetchFilesFromInstant(HoodieInstant instantToRollback, String partitionPath, String basePath,
@@ -282,7 +283,7 @@ private Boolean checkCommitMetadataCompleted(HoodieInstant instantToRollback,
}

private static Path[] listFilesToBeDeleted(String basePath, String partitionPath) {
- return new Path[] {FSUtils.constructAbsolutePathInHadoopPath(basePath, partitionPath)};
+ return new Path[] {HadoopFSUtils.constructAbsolutePathInHadoopPath(basePath, partitionPath)};
}

private static Path[] getFilesFromCommitMetadata(String basePath, HoodieCommitMetadata commitMetadata, String partitionPath) {
@@ -296,7 +297,7 @@ private static SerializablePathFilter getSerializablePathFilter(String basefileE
if (path.toString().endsWith(basefileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commit.equals(fileCommitTime);
- } else if (FSUtils.isLogFile(path)) {
+ } else if (HadoopFSUtils.isLogFile(path)) {
// Since the baseCommitTime is the only commit for new log files, it's okay here
String fileCommitTime = FSUtils.getDeltaCommitTimeFromLogPath(new StoragePath(path.toUri()));
return commit.equals(fileCommitTime);

@@ -20,11 +20,11 @@

import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.engine.HoodieEngineContext;
- import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
+ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
import org.apache.hudi.table.marker.WriteMarkers;
@@ -72,8 +72,8 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
IOType type = IOType.valueOf(typeStr);
String filePathStr = WriteMarkers.stripMarkerSuffix(markerFilePath);
Path filePath = new Path(basePath, filePathStr);
- String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), filePath.getParent());
- String fileId = FSUtils.getFileIdFromFilePath(filePath);
+ String partitionPath = HadoopFSUtils.getRelativePartitionPath(new Path(basePath), filePath.getParent());
+ String fileId = HadoopFSUtils.getFileIdFromFilePath(filePath);
switch (type) {
case MERGE:
case CREATE:

@@ -22,13 +22,13 @@
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
- import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
+ import org.apache.hudi.hadoop.fs.HadoopFSUtils;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
@@ -54,7 +54,7 @@ public HoodieSparkBootstrapSchemaProvider(HoodieWriteConfig writeConfig) {
@Override
protected Schema getBootstrapSourceSchema(HoodieEngineContext context, List<Pair<String, List<HoodieFileStatus>>> partitions) {
Schema schema = partitions.stream().flatMap(p -> p.getValue().stream()).map(fs -> {
- Path filePath = FileStatusUtils.toPath(fs.getPath());
+ Path filePath = HadoopFSUtils.toPath(fs.getPath());
String extension = FSUtils.getFileExtension(filePath.getName());
if (PARQUET.getFileExtension().equals(extension)) {
return getBootstrapSourceSchemaParquet(writeConfig, context, filePath);

@@ -18,21 +18,22 @@

package org.apache.hudi.table.action.bootstrap;

- import org.apache.hadoop.fs.Path;
- import org.apache.hudi.common.bootstrap.FileStatusUtils;
+ import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
+ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.table.HoodieTable;
- import org.apache.hudi.avro.model.HoodieFileStatus;

+ import org.apache.hadoop.fs.Path;

import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;

public class MetadataBootstrapHandlerFactory {

public static BootstrapMetadataHandler getMetadataHandler(HoodieWriteConfig config, HoodieTable table, HoodieFileStatus srcFileStatus) {
- Path sourceFilePath = FileStatusUtils.toPath(srcFileStatus.getPath());
+ Path sourceFilePath = HadoopFSUtils.toPath(srcFileStatus.getPath());

String extension = FSUtils.getFileExtension(sourceFilePath.toString());
if (ORC.getFileExtension().equals(extension)) {