Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.datasource.property.storage.BrokerProperties;
import org.apache.doris.datasource.property.storage.S3Properties;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
import org.apache.doris.persist.gson.GsonUtils;
Expand All @@ -35,6 +36,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Broker descriptor
Expand Down Expand Up @@ -161,4 +163,47 @@ public void write(DataOutput out) throws IOException {
public static BrokerDesc read(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), BrokerDesc.class);
}

/**
 * Decides whether a 403 (Access Denied) error is eligible for an anonymous-S3 retry.
 * Eligible only when the storage backend is S3, the user supplied no credentials at
 * all (no access key, no secret key, no IAM role), and the exception message carries
 * "Status Code: 403".
 */
public static boolean isS3AccessDeniedWithoutExplicitCredentials(StorageProperties storageProperties,
        Exception e) {
    if (!(storageProperties instanceof S3Properties)) {
        return false;
    }
    S3Properties s3 = (S3Properties) storageProperties;
    // Any explicit credential (key pair or IAM role) means the 403 is a real
    // permission problem, not a "forgot to mark the bucket public" case.
    boolean hasExplicitCredentials = StringUtils.isNotBlank(s3.getAccessKey())
            || StringUtils.isNotBlank(s3.getSecretKey())
            || StringUtils.isNotBlank(s3.getS3IAMRole());
    if (hasExplicitCredentials) {
        return false;
    }
    String message = e.getMessage();
    return message != null && message.contains("Status Code: 403");
}

/**
 * Returns a copy of this BrokerDesc whose S3 credentials provider is forced to
 * ANONYMOUS, used to retry public-bucket access after a credential-less 403.
 * The original descriptor is left untouched.
 */
public BrokerDesc withAnonymousCredentials() {
    // properties may be null — toSql() guards against exactly that — so start from
    // an empty map in that case instead of letting HashMap's copy constructor NPE.
    Map<String, String> newProps = this.properties == null
            ? new HashMap<>() : new HashMap<>(this.properties);
    newProps.put("s3.credentials_provider_type", "ANONYMOUS");
    return new BrokerDesc(this.name, newProps);
}

/**
 * Renders this descriptor back to its SQL clause form, e.g.
 * {@code WITH BROKER name (k = v)} or {@code WITH S3 (k = v)}.
 */
public String toSql() {
    StringBuilder builder = new StringBuilder("WITH ");
    if (storageType == StorageBackend.StorageType.BROKER) {
        builder.append("BROKER ").append(name);
    } else {
        builder.append(storageType.name());
    }
    // Properties are optional; omit the parenthesized list entirely when absent.
    if (properties != null && !properties.isEmpty()) {
        builder.append(" (")
                .append(new PrintableMap<>(properties, " = ", true, false, true))
                .append(")");
    }
    return builder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,50 +91,33 @@ protected void getAllFileStatus() throws UserException {
fileStatusList.add(fileStatuses);
}
} else {
for (BrokerFileGroup fileGroup : fileGroups) {
long groupFileSize = 0;
List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
for (String path : fileGroup.getFilePaths()) {
BrokerUtil.parseFile(path, brokerDesc, fileStatuses);
}
if (!fileStatuses.isEmpty()) {
fileGroup.initDeferredFileFormatPropertiesIfNecessary(fileStatuses);
boolean isBinaryFileFormat = fileGroup.isBinaryFileFormat();
List<TBrokerFileStatus> filteredFileStatuses = Lists.newArrayList();
for (TBrokerFileStatus fstatus : fileStatuses) {
if (fstatus.getSize() == 0 && isBinaryFileFormat) {
// For parquet or orc file, if it is an empty file, ignore it.
// Because we can not read an empty parquet or orc file.
if (LOG.isDebugEnabled()) {
LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
.add("empty file", fstatus).build());
}
} else {
groupFileSize += fstatus.size;
filteredFileStatuses.add(fstatus);
if (LOG.isDebugEnabled()) {
LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
.add("file_status", fstatus).build());
}
}
try {
parseFileGroups(fileGroups, fileStatusList, entry, groupNum);
} catch (UserException e) {
if (BrokerDesc.isS3AccessDeniedWithoutExplicitCredentials(
brokerDesc.getStorageProperties(), e)) {
LOG.info("S3 returned 403 with no explicit credentials for job {}."
+ " Retrying with anonymous access.", callback.getCallbackId());
BrokerDesc anonymousBrokerDesc = brokerDesc.withAnonymousCredentials();
this.brokerDesc = anonymousBrokerDesc;
((BulkLoadJob) callback).brokerDesc = anonymousBrokerDesc;
fileStatusList.clear();
try {
parseFileGroups(fileGroups, fileStatusList, entry, 0);
} catch (UserException retryException) {
LOG.warn("Anonymous credential retry also failed for job {}.",
callback.getCallbackId(), retryException);
throw e;
}
fileStatusList.add(filteredFileStatuses);
tableTotalFileSize += groupFileSize;
tableTotalFileNum += filteredFileStatuses.size();
LOG.info("get {} files in file group {} for table {}. size: {}. job: {}, broker: {} ",
filteredFileStatuses.size(), groupNum, entry.getKey(), groupFileSize,
callback.getCallbackId(),
brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER
? BrokerUtil.getAddress(brokerDesc) : brokerDesc.getStorageType());
} else {
LOG.info("no file found in file group {} for table {}, job: {}, broker: {}",
groupNum, entry.getKey(), callback.getCallbackId(),
brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER
? BrokerUtil.getAddress(brokerDesc) : brokerDesc.getStorageType());
throw new UserException("No source files found in the specified paths: "
+ fileGroup.getFilePaths());
throw e;
}
}
for (List<TBrokerFileStatus> statuses : fileStatusList) {
for (TBrokerFileStatus fstatus : statuses) {
tableTotalFileSize += fstatus.size;
}
groupNum++;
tableTotalFileNum += statuses.size();
}
}

Expand All @@ -148,4 +131,53 @@ protected void getAllFileStatus() throws UserException {

((BrokerLoadJob) callback).setLoadFileInfo(totalFileNum, totalFileSize);
}

/**
 * Lists the files of every file group via the broker, drops unreadable empty
 * binary (parquet/orc) files, and appends each group's surviving statuses to
 * {@code fileStatusList}.
 *
 * @param startGroupNum starting index used only for log messages
 * @throws UserException if listing fails or a group matches no files at all
 */
private void parseFileGroups(List<BrokerFileGroup> fileGroups,
        List<List<TBrokerFileStatus>> fileStatusList,
        Map.Entry<FileGroupAggKey, List<BrokerFileGroup>> entry,
        int startGroupNum) throws UserException {
    int groupIndex = startGroupNum;
    for (BrokerFileGroup group : fileGroups) {
        List<TBrokerFileStatus> listed = Lists.newArrayList();
        for (String path : group.getFilePaths()) {
            BrokerUtil.parseFile(path, brokerDesc, listed);
        }
        // A group that matches nothing is a user error: fail the whole load.
        if (listed.isEmpty()) {
            LOG.info("no file found in file group {} for table {}, job: {}, broker: {}",
                    groupIndex, entry.getKey(), callback.getCallbackId(),
                    brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER
                            ? BrokerUtil.getAddress(brokerDesc) : brokerDesc.getStorageType());
            throw new UserException("No source files found in the specified paths: "
                    + group.getFilePaths());
        }
        group.initDeferredFileFormatPropertiesIfNecessary(listed);
        boolean binaryFormat = group.isBinaryFileFormat();
        long keptBytes = 0;
        List<TBrokerFileStatus> kept = Lists.newArrayList();
        for (TBrokerFileStatus status : listed) {
            // Empty parquet/orc files cannot be read by the BE; skip them silently.
            if (binaryFormat && status.getSize() == 0) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
                            .add("empty file", status).build());
                }
                continue;
            }
            keptBytes += status.size;
            kept.add(status);
            if (LOG.isDebugEnabled()) {
                LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
                        .add("file_status", status).build());
            }
        }
        fileStatusList.add(kept);
        LOG.info("get {} files in file group {} for table {}. size: {}. job: {}, broker: {} ",
                kept.size(), groupIndex, entry.getKey(), keptBytes,
                callback.getCallbackId(),
                brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER
                        ? BrokerUtil.getAddress(brokerDesc) : brokerDesc.getStorageType());
        groupIndex++;
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,48 @@ public S3TableValuedFunction(Map<String, String> properties) throws AnalysisExce
// Fixme wait to be done #50320
// FileSystemFactory.get(storageProperties);
} else {
parseFile();
try {
parseFile();
} catch (AnalysisException e) {
if (BrokerDesc.isS3AccessDeniedWithoutExplicitCredentials(storageProperties, e)) {
LOG.info("S3 TVF got 403 with no explicit credentials, retrying with anonymous access");
Copy link

Copilot AI Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code uses LOG (at lines 67 and 71) which is inherited from the parent class ExternalFileTableValuedFunction. This means log messages will be associated with the parent class name rather than S3TableValuedFunction. For consistency with other table-valued functions in the codebase (e.g., HdfsTableValuedFunction defines its own LOG at line 38), consider adding a static LOG field specific to this class.

Copilot uses AI. Check for mistakes.
try {
retryWithAnonymousCredentials(props);
} catch (Exception retryException) {
LOG.warn("S3 TVF anonymous access retry also failed: {}",
retryException.getMessage());
throw e;
}
} else {
throw e;
}
}
}
}

/**
 * Forces anonymous S3 credentials onto every property map held by this TVF,
 * then re-runs {@link #parseFile()} against the same path.
 *
 * @throws AnalysisException if the anonymous storage properties cannot be built
 *         or the retried listing fails
 */
private void retryWithAnonymousCredentials(Map<String, String> props) throws AnalysisException {
    props.put("s3.credentials_provider_type", "ANONYMOUS");

    StorageProperties anonymousProps;
    try {
        anonymousProps = StorageProperties.createPrimary(props);
    } catch (Exception e) {
        throw new AnalysisException("Failed to create anonymous storage properties: " + e.getMessage(), e);
    }
    this.storageProperties = anonymousProps;

    // Rebuild the BE-facing config from scratch so no stale credential survives.
    backendConnectProperties.clear();
    backendConnectProperties.putAll(anonymousProps.getBackendConfigProperties());
    backendConnectProperties.put(URI_KEY, filePath);

    processedParams.put("s3.credentials_provider_type", "ANONYMOUS");

    // Discard statuses gathered by the failed first attempt before re-listing.
    fileStatuses.clear();

    parseFile();
}


// =========== implement abstract methods of ExternalFileTableValuedFunction =================
@Override
Expand Down
Loading