Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1470,8 +1470,6 @@ DEFINE_Int64(wait_cancel_release_memory_ms, "5000");

DEFINE_mBool(check_segment_when_build_rowset_meta, "false");

DEFINE_mBool(force_azure_blob_global_endpoint, "false");

DEFINE_mInt32(max_s3_client_retry, "10");
DEFINE_mInt32(s3_read_base_wait_time_ms, "100");
DEFINE_mInt32(s3_read_max_wait_time_ms, "800");
Expand Down
2 changes: 0 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1563,8 +1563,6 @@ DECLARE_mBool(check_segment_when_build_rowset_meta);

DECLARE_Int32(num_query_ctx_map_partitions);

DECLARE_mBool(force_azure_blob_global_endpoint);

DECLARE_mBool(enable_s3_rate_limiter);
DECLARE_mInt64(s3_get_bucket_tokens);
DECLARE_mInt64(s3_get_token_per_second);
Expand Down
3 changes: 0 additions & 3 deletions be/src/io/fs/azure_obj_storage_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,9 +418,6 @@ std::string AzureObjStorageClient::generate_presigned_url(const ObjectStoragePat
Azure::Storage::StorageSharedKeyCredential(conf.ak, conf.sk));

std::string endpoint = conf.endpoint;
if (doris::config::force_azure_blob_global_endpoint) {
endpoint = fmt::format("https://{}.blob.core.windows.net", conf.ak);
}
auto sasURL = fmt::format(SAS_TOKEN_URL_TEMPLATE, endpoint, conf.bucket, opts.key, sasToken);
if (sasURL.find("://") == std::string::npos) {
sasURL = "https://" + sasURL;
Expand Down
11 changes: 3 additions & 8 deletions be/src/util/s3_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,14 +313,9 @@ std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_azure_client(
std::make_shared<Azure::Storage::StorageSharedKeyCredential>(s3_conf.ak, s3_conf.sk);

const std::string container_name = s3_conf.bucket;
std::string uri;
if (config::force_azure_blob_global_endpoint) {
uri = fmt::format("https://{}.blob.core.windows.net/{}", s3_conf.ak, container_name);
} else {
uri = fmt::format("{}/{}", s3_conf.endpoint, container_name);
if (s3_conf.endpoint.find("://") == std::string::npos) {
uri = "https://" + uri;
}
std::string uri = fmt::format("{}/{}", s3_conf.endpoint, container_name);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strip the trailing `/` from the URI to make this more robust.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If necessary, this should be handled inside normalize_http_uri.

if (s3_conf.endpoint.find("://") == std::string::npos) {
uri = "https://" + uri;
}

Azure::Storage::Blobs::BlobClientOptions options;
Expand Down
2 changes: 0 additions & 2 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,6 @@ CONF_Validator(s3_client_http_scheme, [](const std::string& config) -> bool {
return config == "http" || config == "https";
});

CONF_Bool(force_azure_blob_global_endpoint, "false");

// Max retry times for object storage request
CONF_mInt64(max_s3_client_retry, "10");

Expand Down
10 changes: 3 additions & 7 deletions cloud/src/recycler/s3_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,13 +372,9 @@ int S3Accessor::init() {
options.Retry.MaxRetries = config::max_s3_client_retry;
auto cred =
std::make_shared<Azure::Storage::StorageSharedKeyCredential>(conf_.ak, conf_.sk);
if (config::force_azure_blob_global_endpoint) {
uri_ = fmt::format("https://{}.blob.core.windows.net/{}", conf_.ak, conf_.bucket);
} else {
uri_ = fmt::format("{}/{}", conf_.endpoint, conf_.bucket);
if (uri_.find("://") == std::string::npos) {
uri_ = "https://" + uri_;
}
uri_ = fmt::format("{}/{}", conf_.endpoint, conf_.bucket);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

if (uri_.find("://") == std::string::npos) {
uri_ = "https://" + uri_;
}
uri_ = normalize_http_uri(uri_);
// In Azure's HTTP requests, all policies in the vector are called in a chained manner following the HTTP pipeline approach.
Expand Down
26 changes: 20 additions & 6 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -3544,13 +3544,27 @@ public static int metaServiceRpcRetryTimes() {
public static int s3_head_request_max_paths = 100;

@ConfField(mutable = true, description = {
"此参数控制是否强制使用 Azure global endpoint。默认值为 false,系统将使用用户指定的 endpoint。"
+ "如果设置为 true,系统将强制使用 {account}.blob.core.windows.net。",
"This parameter controls whether to force the use of the Azure global endpoint. "
+ "The default is false, meaning the system will use the user-specified endpoint. "
+ "If set to true, the system will force the use of {account}.blob.core.windows.net."
"指定 Azure endpoint 域名后缀白名单(包含 blob 与 dfs),多个值使用逗号分隔。"
+ "默认值为 .blob.core.windows.net,.dfs.core.windows.net,"
+ ".blob.core.chinacloudapi.cn,.dfs.core.chinacloudapi.cn,"
+ ".blob.core.usgovcloudapi.net,.dfs.core.usgovcloudapi.net,"
+ ".blob.core.cloudapi.de,.dfs.core.cloudapi.de。",
"The host suffix whitelist for Azure endpoints (both blob and dfs), separated by commas. "
+ "The default value is .blob.core.windows.net,.dfs.core.windows.net,"
+ ".blob.core.chinacloudapi.cn,.dfs.core.chinacloudapi.cn,"
+ ".blob.core.usgovcloudapi.net,.dfs.core.usgovcloudapi.net,"
+ ".blob.core.cloudapi.de,.dfs.core.cloudapi.de."
})
public static boolean force_azure_blob_global_endpoint = false;
public static String[] azure_blob_host_suffixes = {
".blob.core.windows.net",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to keep both
.dfs and .blob?

Why not something like just core.windows.net?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is required by the HDFS style guidelines.

".dfs.core.windows.net",
".blob.core.chinacloudapi.cn",
".dfs.core.chinacloudapi.cn",
".blob.core.usgovcloudapi.net",
".dfs.core.usgovcloudapi.net",
".blob.core.cloudapi.de",
".dfs.core.cloudapi.de"
};

@ConfField(mutable = true, description = {"指定 Jdbc driver url 白名单,举例:jdbc_driver_url_white_list=a,b,c",
"the white list for jdbc driver url, if it is empty, no white list will be set"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.hadoop.conf.Configuration;

import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -64,6 +66,7 @@
public class AzureProperties extends StorageProperties {
@Getter
@ConnectorProperty(names = {"azure.endpoint", "s3.endpoint", "AWS_ENDPOINT", "endpoint", "ENDPOINT"},
required = false,
description = "The endpoint of S3.")
protected String endpoint = "";

Expand Down Expand Up @@ -135,8 +138,6 @@ public AzureProperties(Map<String, String> origProps) {
super(Type.AZURE, origProps);
}

private static final String AZURE_ENDPOINT_SUFFIX = ".blob.core.windows.net";

@Override
public void initNormalizeAndCheckProps() {
super.initNormalizeAndCheckProps();
Expand All @@ -160,7 +161,7 @@ public static boolean guessIsMe(Map<String, String> origProps) {
.findFirst()
.orElse(null);
if (!Strings.isNullOrEmpty(value)) {
return value.endsWith(AZURE_ENDPOINT_SUFFIX);
return AzurePropertyUtils.isAzureBlobEndpoint(value);
}
return false;
}
Expand Down Expand Up @@ -191,9 +192,12 @@ public Map<String, String> getBackendConfigProperties() {

public static final String AZURE_ENDPOINT_TEMPLATE = "https://%s.blob.core.windows.net";

public static String formatAzureEndpoint(String endpoint, String accessKey) {
if (Config.force_azure_blob_global_endpoint) {
return String.format(AZURE_ENDPOINT_TEMPLATE, accessKey);
public static String formatAzureEndpoint(String endpoint, String accountName) {
if (Strings.isNullOrEmpty(endpoint)) {
if (Strings.isNullOrEmpty(accountName)) {
return "";
}
return String.format(AZURE_ENDPOINT_TEMPLATE, accountName);
}
if (endpoint.contains("://")) {
return endpoint;
Expand Down Expand Up @@ -243,13 +247,24 @@ protected Set<String> schemas() {
}

private static void setHDFSAzureAccountKeys(Configuration conf, String accountName, String accountKey) {
String[] endpoints = {
"dfs.core.windows.net",
"blob.core.windows.net"
};
Set<String> endpoints = new LinkedHashSet<>();
if (Config.azure_blob_host_suffixes != null) {
for (String endpointSuffix : Config.azure_blob_host_suffixes) {
if (Strings.isNullOrEmpty(endpointSuffix)) {
continue;
}
String normalizedEndpoint = endpointSuffix.trim().toLowerCase(Locale.ROOT);
if (normalizedEndpoint.startsWith(".")) {
normalizedEndpoint = normalizedEndpoint.substring(1);
}
if (!normalizedEndpoint.isEmpty()) {
endpoints.add(normalizedEndpoint);
}
}
}
for (String endpoint : endpoints) {
String key = String.format("fs.azure.account.key.%s.%s", accountName, endpoint);
conf.set(key, accountKey);
String accountKeyConfig = String.format("fs.azure.account.key.%s.%s", accountName, endpoint);
conf.set(accountKeyConfig, accountKey);
}
conf.set("fs.azure.account.key", accountKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@

package org.apache.doris.datasource.property.storage;

import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;

import org.apache.commons.lang3.StringUtils;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Pattern;

public class AzurePropertyUtils {

/**
* Validates and normalizes an Azure Blob Storage URI into a unified {@code s3://}-style format.
* <p>
Expand Down Expand Up @@ -76,6 +77,61 @@ public static String validateAndNormalizeUri(String path) throws UserException {
private static final Pattern ONELAKE_PATTERN = Pattern.compile(
"abfs[s]?://([^@]+)@([^/]+)\\.dfs\\.fabric\\.microsoft\\.com(/.*)?", Pattern.CASE_INSENSITIVE);

/**
 * Reports whether the given endpoint (full URL or bare host) ends with one of the
 * host suffixes listed in {@code Config.azure_blob_host_suffixes}.
 *
 * @param endpointOrHost endpoint URL or host string; may be null or blank
 * @return {@code true} when a host can be extracted and it matches a configured suffix,
 *         {@code false} otherwise (including when no host can be extracted)
 */
public static boolean isAzureBlobEndpoint(String endpointOrHost) {
    final String parsedHost = extractHost(endpointOrHost);
    // Suffix comparison is done on the lower-cased host.
    return !StringUtils.isBlank(parsedHost)
            && matchesAnySuffix(parsedHost.toLowerCase(Locale.ROOT), Config.azure_blob_host_suffixes);
}

/**
 * Checks the host against every candidate suffix and stops at the first match.
 *
 * @param normalizedHost host string, already lower-cased by the caller
 * @param suffixes candidate suffixes; a null or empty array never matches
 * @return {@code true} if at least one suffix matches
 */
private static boolean matchesAnySuffix(String normalizedHost, String[] suffixes) {
    if (suffixes != null) {
        for (int idx = 0; idx < suffixes.length; idx++) {
            if (matchesSuffix(normalizedHost, suffixes[idx])) {
                return true;
            }
        }
    }
    return false;
}

/**
 * Tests whether the host ends with a single suffix.
 * <p>
 * The suffix is trimmed, lower-cased, and given a leading dot when it lacks one,
 * so a match always falls on a domain-label boundary.
 *
 * @param normalizedHost host string, already lower-cased by the caller
 * @param suffix raw suffix; blank or null never matches
 * @return {@code true} if the host ends with the normalized suffix
 */
private static boolean matchesSuffix(String normalizedHost, String suffix) {
    if (StringUtils.isBlank(suffix)) {
        return false;
    }
    final String lowered = suffix.trim().toLowerCase(Locale.ROOT);
    final String dotted = lowered.startsWith(".") ? lowered : "." + lowered;
    return normalizedHost.endsWith(dotted);
}

/**
 * Pulls the host portion out of an endpoint string.
 * <p>
 * Inputs containing {@code "://"} are parsed with {@link URI}; anything else is treated as
 * {@code host[:port][/path]} and cut at the first {@code '/'} and then at the first {@code ':'}.
 *
 * @param endpointOrHost endpoint URL or host string; may be null or blank
 * @return the host, or {@code null} when the input is blank or is a URL that fails to parse;
 *         may also be whatever {@link URI#getHost()} yields for a parsed URL
 */
private static String extractHost(String endpointOrHost) {
    if (StringUtils.isBlank(endpointOrHost)) {
        return null;
    }
    final String trimmed = endpointOrHost.trim();
    if (!trimmed.contains("://")) {
        // No scheme: drop the path first, then the port.
        final int pathStart = trimmed.indexOf('/');
        final String authority = pathStart < 0 ? trimmed : trimmed.substring(0, pathStart);
        final int portStart = authority.indexOf(':');
        return portStart < 0 ? authority : authority.substring(0, portStart);
    }
    try {
        return new URI(trimmed).getHost();
    } catch (URISyntaxException e) {
        return null;
    }
}


/**
* Converts an Azure Blob Storage URI into a unified {@code s3://<container>/<path>} format.
Expand Down Expand Up @@ -137,11 +193,6 @@ private static String convertToS3Style(String uri) {
throw new StoragePropertiesException("Invalid Azure HTTPS URI, missing host: " + uri);
}

// Typical Azure Blob domain: <account>.blob.core.windows.net
if (!host.contains(".blob.core.windows.net")) {
throw new StoragePropertiesException("Not an Azure Blob URL: " + uri);
}

// Path usually looks like: /<container>/<path>
String[] parts = path.split("/", 3);
if (parts.length < 2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.datasource.property.storage;

import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;

Expand Down Expand Up @@ -213,4 +214,81 @@ public void testOneLake() throws UserException {
Assertions.assertEquals("https://login.microsoftonline.com/72f988bf-5289-5289-5289-2d7cd011db47/oauth2/token", hadoopStorageConfig.get("fs.azure.account.oauth2.client.endpoint.onelake.dfs.fabric.microsoft.com"));

}

// Verifies that guessIsMe accepts a blob.core.chinacloudapi.cn endpoint,
// both as a full https URL and as a bare host name.
// NOTE(review): origProps is a fixture declared outside this view; presumably reset per test - confirm.
@Test
public void testGuessIsMeChinaEndpoint() {
// Form 1: endpoint with an explicit https:// scheme.
origProps.put("s3.endpoint", "https://mystorageaccount.blob.core.chinacloudapi.cn");
Assertions.assertTrue(AzureProperties.guessIsMe(origProps));
// Form 2: same host without any scheme.
origProps.put("s3.endpoint", "mystorageaccount.blob.core.chinacloudapi.cn");
Assertions.assertTrue(AzureProperties.guessIsMe(origProps));
}

// Verifies that an endpoint whose host is not a recognized Azure suffix is rejected by guessIsMe,
// and that adding provider=azure to the same properties makes guessIsMe return true.
@Test
public void testGuessIsMeByProviderWhenEndpointIsUnknown() {
// Host suffix ".invalid.test" is expected not to be recognized as Azure.
origProps.put("s3.endpoint", "https://mystorageaccount.invalid.test");
Assertions.assertFalse(AzureProperties.guessIsMe(origProps));
// With an explicit provider the same endpoint is expected to be accepted.
origProps.put("provider", "azure");
Assertions.assertTrue(AzureProperties.guessIsMe(origProps));
}

// Verifies formatAzureEndpoint for four inputs: a scheme-less endpoint, an endpoint that
// already has a scheme, an empty endpoint with an account name, and both arguments empty.
@Test
public void testFormatAzureEndpointUsesInputOrDefault() {
// Scheme-less endpoint: expected result is prefixed with https://.
Assertions.assertEquals("https://mystorageaccount.blob.core.chinacloudapi.cn",
AzureProperties.formatAzureEndpoint("mystorageaccount.blob.core.chinacloudapi.cn",
"mystorageaccount"));
// Endpoint that already carries a scheme: expected unchanged.
Assertions.assertEquals("https://mystorageaccount.blob.core.chinacloudapi.cn",
AzureProperties.formatAzureEndpoint("https://mystorageaccount.blob.core.chinacloudapi.cn",
"mystorageaccount"));
// Empty endpoint: expected to fall back to https://<account>.blob.core.windows.net.
Assertions.assertEquals("https://mystorageaccount.blob.core.windows.net",
AzureProperties.formatAzureEndpoint("", "mystorageaccount"));
// Empty endpoint and empty account name: expected empty string.
Assertions.assertEquals("", AzureProperties.formatAzureEndpoint("", ""));
}

// Verifies that when no endpoint property is supplied (only keys and provider=azure),
// the created AzureProperties reports an endpoint of https://<access_key>.blob.core.windows.net.
@Test
public void testDefaultEndpointWhenEndpointNotSet() throws UserException {
origProps.put("s3.access_key", "myAzureAccessKey");
origProps.put("s3.secret_key", "myAzureSecretKey");
origProps.put("provider", "azure");

AzureProperties azureProperties = (AzureProperties) StorageProperties.createPrimary(origProps);
// The access key value appears as the account part of the expected endpoint.
Assertions.assertEquals("https://myAzureAccessKey.blob.core.windows.net", azureProperties.getEndpoint());
}

// Verifies that the Hadoop configuration produced for an Azure China endpoint contains
// fs.azure.account.key.<access_key>.<suffix> entries for both the blob and dfs
// chinacloudapi.cn suffixes, each holding the secret key.
@Test
public void testHadoopStorageConfigContainsChinaCloudAccountKeys() throws UserException {
origProps.put("s3.endpoint", "https://mystorageaccount.blob.core.chinacloudapi.cn");
origProps.put("s3.access_key", "myAzureAccessKey");
origProps.put("s3.secret_key", "myAzureSecretKey");
origProps.put("provider", "azure");

AzureProperties azureProperties = (AzureProperties) StorageProperties.createPrimary(origProps);
Configuration hadoopStorageConfig = azureProperties.getHadoopStorageConfig();
// blob suffix entry
Assertions.assertEquals("myAzureSecretKey",
hadoopStorageConfig.get("fs.azure.account.key.myAzureAccessKey.blob.core.chinacloudapi.cn"));
// dfs suffix entry
Assertions.assertEquals("myAzureSecretKey",
hadoopStorageConfig.get("fs.azure.account.key.myAzureAccessKey.dfs.core.chinacloudapi.cn"));
}

// Verifies that account-key entries in the Hadoop configuration follow
// Config.azure_blob_host_suffixes: custom suffixes (given with and without a leading dot,
// plus a blank entry) yield matching keys, and the stock blob.core.windows.net key is absent.
@Test
public void testHadoopStorageConfigContainsCustomAccountKeyEndpointsFromConfig() throws UserException {
// Remember the global setting so it can be restored in the finally block.
String[] originalAzureBlobHostSuffixes = Config.azure_blob_host_suffixes;
try {
// One suffix without a leading dot, one with it, and one blank entry.
Config.azure_blob_host_suffixes = new String[] {"blob.custom.test", ".dfs.custom.test", " "};
origProps.put("s3.endpoint", "https://mystorageaccount.blob.custom.test");
origProps.put("s3.access_key", "myAzureAccessKey");
origProps.put("s3.secret_key", "myAzureSecretKey");
origProps.put("provider", "azure");

AzureProperties azureProperties = (AzureProperties) StorageProperties.createPrimary(origProps);
Configuration hadoopStorageConfig = azureProperties.getHadoopStorageConfig();
Assertions.assertEquals("myAzureSecretKey",
hadoopStorageConfig.get("fs.azure.account.key.myAzureAccessKey.blob.custom.test"));
Assertions.assertEquals("myAzureSecretKey",
hadoopStorageConfig.get("fs.azure.account.key.myAzureAccessKey.dfs.custom.test"));
// The default suffix is no longer in the configured list, so no key is expected for it.
Assertions.assertNull(
hadoopStorageConfig.get("fs.azure.account.key.myAzureAccessKey.blob.core.windows.net"));
} finally {
// Restore the static config so other tests are not affected.
Config.azure_blob_host_suffixes = originalAzureBlobHostSuffixes;
}
}
}
Loading
Loading