[HUDI-7790] Revert changes in DFSPathSelector and UtilHelpers.readConfig (apache#11293)
yihua authored May 25, 2024
1 parent a0d60d9 commit eba95af
Showing 19 changed files with 102 additions and 116 deletions.
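At its core, the revert restores the Hadoop-Path-facing signature of UtilHelpers.readConfig: callers pass org.apache.hadoop.fs.Path again, and the conversion to Hudi's StoragePath happens once inside the helper. A minimal sketch of the restored call pattern, assuming only the signature shown in the diff below; the wrapper class and the properties-file location are illustrative, not part of the commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.UtilHelpers;

import java.util.Collections;

public class ReadConfigExample {
  public static void main(String[] args) {
    // Hypothetical properties file; point this at a path readable via the Hadoop conf.
    Path propsFile = new Path("file:///tmp/hoodie-streamer.properties");
    // readConfig(Configuration, Path, List<String>) is the signature this commit restores;
    // the empty list means no ad-hoc property overrides on top of the file.
    TypedProperties props = UtilHelpers
        .readConfig(new Configuration(), propsFile, Collections.emptyList())
        .getProps(true);
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}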
@@ -44,7 +44,6 @@
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.storage.HoodieStorageUtils;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
@@ -483,7 +482,7 @@ private static int doBootstrap(JavaSparkContext jsc, String tableName, String ta
       String payloadClassName, String enableHiveSync, String propsFilePath, List<String> configs) throws IOException {

     TypedProperties properties = propsFilePath == null ? buildProperties(configs)
-        : readConfig(jsc.hadoopConfiguration(), new StoragePath(propsFilePath), configs).getProps(true);
+        : readConfig(jsc.hadoopConfiguration(), new Path(propsFilePath), configs).getProps(true);

     properties.setProperty(HoodieBootstrapConfig.BASE_PATH.key(), sourcePath);

@@ -44,7 +44,6 @@
 import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
 import org.apache.hudi.keygen.BuiltinKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;

@@ -115,7 +114,7 @@ public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc, boole
         SparkSession.builder().config(jsc.getConf()).enableHiveSupport().getOrCreate();
     this.fs = HadoopFSUtils.getFs(cfg.inputBasePath, jsc.hadoopConfiguration());
     this.props =
-        UtilHelpers.readConfig(fs.getConf(), new StoragePath(cfg.propsFilePath), cfg.configs).getProps();
+        UtilHelpers.readConfig(fs.getConf(), new Path(cfg.propsFilePath), cfg.configs).getProps();
     log.info("Creating workload generator with configs : {}", props.toString());
     this.hiveConf = getDefaultHiveConf(jsc.hadoopConfiguration());
     this.keyGenerator =

@@ -22,7 +22,6 @@
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.utilities.HoodieRepairTool;
 import org.apache.hudi.utilities.IdentitySplitter;
 import org.apache.hudi.utilities.UtilHelpers;
@@ -133,7 +132,7 @@ private Map<String, String> getPropsAsMap(TypedProperties typedProperties) {
    * @return the {@link TypedProperties} instance.
    */
   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
-    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new StoragePath(cfg.propsFilePath), cfg.configs)
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
         .getProps(true);
   }

@@ -24,17 +24,20 @@
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob;
-import org.apache.hudi.storage.StoragePathInfo;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.utilities.config.DFSPathSelectorConfig;
 import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;

@@ -67,31 +70,31 @@ public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(
     }

     // obtain all eligible files for the batch
-    List<StoragePathInfo> eligibleFiles = new ArrayList<>();
-    List<StoragePathInfo> pathInfoList = storage.globEntries(
-        new StoragePath(getStringWithAltKeys(props, DFSPathSelectorConfig.ROOT_INPUT_PATH),
-            "*"));
+    List<FileStatus> eligibleFiles = new ArrayList<>();
+    FileStatus[] fileStatuses = fs.globStatus(
+        new Path(getStringWithAltKeys(props, DFSPathSelectorConfig.ROOT_INPUT_PATH), "*"));
     // Say input data is as follow input/1, input/2, input/5 since 3,4 was rolled back and 5 is new generated data
     // checkpoint from the latest commit metadata will be 2 since 3,4 has been rolled back. We need to set the
     // next batch id correctly as 5 instead of 3
-    Option<String> correctBatchIdDueToRollback = Option.fromJavaOptional(pathInfoList.stream()
-        .map(f -> f.getPath().toString().split("/")[
-            f.getPath().toString().split("/").length - 1])
+    Option<String> correctBatchIdDueToRollback = Option.fromJavaOptional(Arrays.stream(fileStatuses)
+        .map(f -> f.getPath().toString().split("/")[f.getPath().toString().split("/").length - 1])
         .filter(bid1 -> Integer.parseInt(bid1) > lastBatchId)
         .min((bid1, bid2) -> Integer.min(Integer.parseInt(bid1), Integer.parseInt(bid2))));
-    if (correctBatchIdDueToRollback.isPresent()
-        && Integer.parseInt(correctBatchIdDueToRollback.get()) > nextBatchId) {
+    if (correctBatchIdDueToRollback.isPresent() && Integer.parseInt(correctBatchIdDueToRollback.get()) > nextBatchId) {
       nextBatchId = Integer.parseInt(correctBatchIdDueToRollback.get());
     }
-    log.info("Using DFSTestSuitePathSelector, checkpoint: " + lastCheckpointStr + " sourceLimit: "
-        + sourceLimit + " lastBatchId: " + lastBatchId + " nextBatchId: " + nextBatchId);
-    for (StoragePathInfo pathInfo : pathInfoList) {
-      if (!pathInfo.isDirectory() || IGNORE_FILEPREFIX_LIST.stream()
-          .anyMatch(pfx -> pathInfo.getPath().getName().startsWith(pfx))) {
+    log.info("Using DFSTestSuitePathSelector, checkpoint: " + lastCheckpointStr + " sourceLimit: " + sourceLimit
+        + " lastBatchId: " + lastBatchId + " nextBatchId: " + nextBatchId);
+    for (FileStatus fileStatus : fileStatuses) {
+      if (!fileStatus.isDirectory() || IGNORE_FILEPREFIX_LIST.stream()
+          .anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) {
         continue;
-      } else if (Integer.parseInt(pathInfo.getPath().getName()) > lastBatchId
-          && Integer.parseInt(pathInfo.getPath().getName()) <= nextBatchId) {
-        eligibleFiles.addAll(storage.listFiles(pathInfo.getPath()));
+      } else if (Integer.parseInt(fileStatus.getPath().getName()) > lastBatchId && Integer.parseInt(fileStatus.getPath()
+          .getName()) <= nextBatchId) {
+        RemoteIterator<LocatedFileStatus> files = fs.listFiles(fileStatus.getPath(), true);
+        while (files.hasNext()) {
+          eligibleFiles.add(files.next());
+        }
       }
     }

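The hunk above reverts the selector from HoodieStorage.globEntries/listFiles back to the raw Hadoop FileSystem API. Stripped of the batch-id bookkeeping, the flow is a glob over the batch directories followed by a recursive file listing per directory — a self-contained sketch under that assumption (class and method names are hypothetical):

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class GlobAndListSketch {
  // Glob the immediate children of rootInputPath, then recursively collect
  // every file under each child directory -- the same FileSystem calls the
  // reverted hunk uses (globStatus, then listFiles(path, true)).
  static List<FileStatus> listBatchFiles(FileSystem fs, Path rootInputPath) throws IOException {
    List<FileStatus> eligibleFiles = new ArrayList<>();
    FileStatus[] fileStatuses = fs.globStatus(new Path(rootInputPath, "*"));
    if (fileStatuses == null) {
      return eligibleFiles; // glob matched nothing
    }
    for (FileStatus fileStatus : fileStatuses) {
      if (!fileStatus.isDirectory()) {
        continue; // the real selector also skips ignored filename prefixes here
      }
      RemoteIterator<LocatedFileStatus> files = fs.listFiles(fileStatus.getPath(), true);
      while (files.hasNext()) {
        eligibleFiles.add(files.next());
      }
    }
    return eligibleFiles;
  }
}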
@@ -32,7 +32,6 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.utilities.streamer.HoodieStreamer;

 import com.beust.jcommander.IValueValidator;
@@ -114,7 +113,7 @@ private boolean isUpsert() {
   public int dataImport(JavaSparkContext jsc, int retry) {
     this.fs = HadoopFSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
     this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
-        : UtilHelpers.readConfig(fs.getConf(), new StoragePath(cfg.propsFilePath), cfg.configs).getProps(true);
+        : UtilHelpers.readConfig(fs.getConf(), new Path(cfg.propsFilePath), cfg.configs).getProps(true);
     LOG.info("Starting data import with configs : " + props.toString());
     int ret = -1;
     try {

@@ -39,6 +39,7 @@

 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.slf4j.Logger;
@@ -139,7 +140,7 @@ public HoodieDataTableValidator(JavaSparkContext jsc, Config cfg) {
    * @return the {@link TypedProperties} instance.
    */
   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
-    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new StoragePath(cfg.propsFilePath), cfg.configs)
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
         .getProps(true);
   }

@@ -37,14 +37,14 @@
 import org.apache.hudi.hive.HiveSyncConfigHolder;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 import org.apache.hudi.table.HoodieSparkTable;

 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -139,7 +139,7 @@ public HoodieDropPartitionsTool(JavaSparkContext jsc, Config cfg) {
    * @return the {@link TypedProperties} instance.
    */
   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
-    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new StoragePath(cfg.propsFilePath), cfg.configs)
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
         .getProps(true);
   }

@@ -31,10 +31,10 @@
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.metadata.MetadataPartitionType;
-import org.apache.hudi.storage.StoragePath;

 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.jetbrains.annotations.TestOnly;
 import org.slf4j.Logger;
@@ -106,7 +106,7 @@ public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
   }

   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
-    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new StoragePath(cfg.propsFilePath), cfg.configs)
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
         .getProps(true);
   }

@@ -258,7 +258,7 @@ private String generateValidationTaskLabels() {
    * @return the {@link TypedProperties} instance.
    */
   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
-    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new StoragePath(cfg.propsFilePath), cfg.configs)
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
         .getProps(true);
   }

@@ -518,7 +518,7 @@ private void printRepairInfo(
    * @return the {@link TypedProperties} instance.
    */
   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
-    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new StoragePath(cfg.propsFilePath), cfg.configs)
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
         .getProps(true);
   }

@@ -34,7 +34,6 @@
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;

 import com.beust.jcommander.JCommander;
@@ -132,7 +131,7 @@ public TableSizeStats(JavaSparkContext jsc, Config cfg) {
    * @return the {@link TypedProperties} instance.
    */
   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
-    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new StoragePath(cfg.propsFilePath), cfg.configs)
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
         .getProps(true);
   }

@@ -118,6 +118,7 @@

 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;

 /**
  * Bunch of helper methods.
@@ -242,13 +243,14 @@ public static InitialCheckPointProvider createInitialCheckpointProvider(
   }

   public static DFSPropertiesConfiguration readConfig(Configuration hadoopConfig,
-                                                      StoragePath cfgPath,
+                                                      Path cfgPath,
                                                       List<String> overriddenProps) {
-    DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(hadoopConfig, cfgPath);
+    StoragePath storagePath = convertToStoragePath(cfgPath);
+    DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(hadoopConfig, storagePath);
     try {
       if (!overriddenProps.isEmpty()) {
         LOG.info("Adding overridden properties to file properties.");
-        conf.addPropsFromStream(new BufferedReader(new StringReader(String.join("\n", overriddenProps))), cfgPath);
+        conf.addPropsFromStream(new BufferedReader(new StringReader(String.join("\n", overriddenProps))), storagePath);
       }
     } catch (IOException ioe) {
       throw new HoodieIOException("Unexpected error adding config overrides", ioe);
@@ -274,7 +276,7 @@ public static DFSPropertiesConfiguration getConfig(List<String> overriddenProps)
   public static TypedProperties buildProperties(Configuration hadoopConf, String propsFilePath, List<String> props) {
     return StringUtils.isNullOrEmpty(propsFilePath)
         ? UtilHelpers.buildProperties(props)
-        : UtilHelpers.readConfig(hadoopConf, new StoragePath(propsFilePath), props)
+        : UtilHelpers.readConfig(hadoopConf, new Path(propsFilePath), props)
         .getProps(true);
   }

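The convertToStoragePath import added above is what lets readConfig keep a Hadoop Path parameter while DFSPropertiesConfiguration stays on StoragePath. Roughly, the conversion is a URI round-trip between the two wrapper types — a hedged sketch, since the real helper lives in HadoopFSUtils and may construct the StoragePath differently:

import org.apache.hadoop.fs.Path;

import org.apache.hudi.storage.StoragePath;

public class PathBridgeSketch {
  // Approximation of HadoopFSUtils.convertToStoragePath: both types wrap a URI,
  // so rebuilding from the URI string preserves scheme, authority, and path.
  static StoragePath convertToStoragePath(Path hadoopPath) {
    return new StoragePath(hadoopPath.toUri().toString());
  }

  public static void main(String[] args) {
    // e.g. prints "s3a://bucket/config/app.properties"
    System.out.println(convertToStoragePath(new Path("s3a://bucket/config/app.properties")));
  }
}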
@@ -21,7 +21,6 @@

 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
 import org.apache.hudi.utilities.HoodieCompactor;
 import org.apache.hudi.utilities.IdentitySplitter;
@@ -128,7 +127,7 @@ public void startServices() throws ExecutionException, InterruptedException {
   }

   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
-    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new StoragePath(cfg.propsFilePath), cfg.configs).getProps(true);
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs).getProps(true);
   }

   private boolean pathExists(String path) {