Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,10 @@ private static StorageProperties findStorageProperties(StorageProperties.Type ty
&& storagePropertiesMap.containsKey(StorageProperties.Type.MINIO)) {
return storagePropertiesMap.get(StorageProperties.Type.MINIO);
}
if (type == StorageProperties.Type.S3
&& storagePropertiesMap.containsKey(StorageProperties.Type.OZONE)) {
return storagePropertiesMap.get(StorageProperties.Type.OZONE);
}

// Step 3: Compatibility fallback based on schema
// In previous configurations, the schema name may not strictly match the actual storage type.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.property.storage;

import org.apache.doris.datasource.property.ConnectorProperty;

import com.google.common.collect.ImmutableSet;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;

import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

public class OzoneProperties extends AbstractS3CompatibleProperties {

@Setter
@Getter
@ConnectorProperty(names = {"ozone.endpoint", "s3.endpoint"},
required = false,
description = "The endpoint of Ozone S3 Gateway.")
protected String endpoint = "";

@Setter
@Getter
@ConnectorProperty(names = {"ozone.region", "s3.region"},
required = false,
description = "The region of Ozone S3 Gateway.")
protected String region = "us-east-1";

@Getter
@ConnectorProperty(names = {"ozone.access_key", "s3.access_key", "s3.access-key-id"},
required = false,
sensitive = true,
description = "The access key of Ozone S3 Gateway.")
protected String accessKey = "";

@Getter
@ConnectorProperty(names = {"ozone.secret_key", "s3.secret_key", "s3.secret-access-key"},
required = false,
sensitive = true,
description = "The secret key of Ozone S3 Gateway.")
protected String secretKey = "";

@Getter
@ConnectorProperty(names = {"ozone.session_token", "s3.session_token", "s3.session-token"},
required = false,
sensitive = true,
description = "The session token of Ozone S3 Gateway.")
protected String sessionToken = "";

@Getter
@ConnectorProperty(names = {"ozone.connection.maximum", "s3.connection.maximum"},
required = false,
description = "Maximum number of connections.")
protected String maxConnections = "100";

@Getter
@ConnectorProperty(names = {"ozone.connection.request.timeout", "s3.connection.request.timeout"},
required = false,
description = "Request timeout in seconds.")
protected String requestTimeoutS = "10000";

@Getter
@ConnectorProperty(names = {"ozone.connection.timeout", "s3.connection.timeout"},
required = false,
description = "Connection timeout in seconds.")
protected String connectionTimeoutS = "10000";

@Setter
@Getter
@ConnectorProperty(names = {"ozone.use_path_style", "use_path_style", "s3.path-style-access"},
required = false,
description = "Whether to use path style URL for the storage.")
protected String usePathStyle = "true";

@Setter
@Getter
@ConnectorProperty(names = {"ozone.force_parsing_by_standard_uri", "force_parsing_by_standard_uri"},
required = false,
description = "Whether to use path style URL for the storage.")
protected String forceParsingByStandardUrl = "false";

protected OzoneProperties(Map<String, String> origProps) {
super(Type.OZONE, origProps);
}

@Override
public void initNormalizeAndCheckProps() {
hydrateFromOriginalProps();
super.initNormalizeAndCheckProps();
hydrateFromOriginalProps();
}

private void hydrateFromOriginalProps() {
endpoint = StringUtils.firstNonBlank(
endpoint,
origProps.get("ozone.endpoint"),
origProps.get("s3.endpoint"));
region = StringUtils.firstNonBlank(region, origProps.get("ozone.region"), origProps.get("s3.region"));
accessKey = StringUtils.firstNonBlank(
accessKey,
origProps.get("ozone.access_key"),
origProps.get("s3.access_key"),
origProps.get("s3.access-key-id"));
secretKey = StringUtils.firstNonBlank(
secretKey,
origProps.get("ozone.secret_key"),
origProps.get("s3.secret_key"),
origProps.get("s3.secret-access-key"));
sessionToken = StringUtils.firstNonBlank(sessionToken, origProps.get("ozone.session_token"),
origProps.get("s3.session_token"), origProps.get("s3.session-token"));
usePathStyle = StringUtils.firstNonBlank(usePathStyle, origProps.get("ozone.use_path_style"),
origProps.get("use_path_style"), origProps.get("s3.path-style-access"));
forceParsingByStandardUrl = StringUtils.firstNonBlank(forceParsingByStandardUrl,
origProps.get("ozone.force_parsing_by_standard_uri"),
origProps.get("force_parsing_by_standard_uri"));
}

@Override
protected Set<Pattern> endpointPatterns() {
return ImmutableSet.of(Pattern.compile("^(?:https?://)?[a-zA-Z0-9.-]+(?::\\d+)?$"));
}

@Override
protected void setEndpointIfPossible() {
super.setEndpointIfPossible();
if (StringUtils.isBlank(getEndpoint())) {
throw new IllegalArgumentException("Property ozone.endpoint is required.");
}
}

@Override
protected Set<String> schemas() {
return ImmutableSet.of("s3", "s3a", "s3n");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public abstract class StorageProperties extends ConnectionProperties {
public static final String FS_S3_SUPPORT = "fs.s3.support";
public static final String FS_GCS_SUPPORT = "fs.gcs.support";
public static final String FS_MINIO_SUPPORT = "fs.minio.support";
public static final String FS_OZONE_SUPPORT = "fs.ozone.support";
public static final String FS_BROKER_SUPPORT = "fs.broker.support";
public static final String FS_AZURE_SUPPORT = "fs.azure.support";
public static final String FS_OSS_SUPPORT = "fs.oss.support";
Expand All @@ -67,6 +68,7 @@ public enum Type {
GCS,
OSS_HDFS,
MINIO,
OZONE,
AZURE,
BROKER,
LOCAL,
Expand Down Expand Up @@ -203,7 +205,9 @@ public static StorageProperties createPrimary(Map<String, String> origProps) {
props -> (isFsSupport(props, FS_AZURE_SUPPORT)
|| AzureProperties.guessIsMe(props)) ? new AzureProperties(props) : null,
props -> (isFsSupport(props, FS_MINIO_SUPPORT)
|| MinioProperties.guessIsMe(props)) ? new MinioProperties(props) : null,
|| (!isFsSupport(props, FS_OZONE_SUPPORT)
&& MinioProperties.guessIsMe(props))) ? new MinioProperties(props) : null,
props -> isFsSupport(props, FS_OZONE_SUPPORT) ? new OzoneProperties(props) : null,
props -> (isFsSupport(props, FS_BROKER_SUPPORT)
|| BrokerProperties.guessIsMe(props)) ? new BrokerProperties(props) : null,
props -> (isFsSupport(props, FS_LOCAL_SUPPORT)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.property.storage;

import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LocationPath;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class OzonePropertiesTest {
private Map<String, String> origProps;

@BeforeEach
public void setup() {
origProps = new HashMap<>();
}

@Test
public void testValidOzoneConfiguration() {
origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true");
origProps.put("ozone.endpoint", "http://ozone-s3g:9878");
origProps.put("ozone.access_key", "hadoop");
origProps.put("ozone.secret_key", "hadoop");

OzoneProperties ozoneProperties = (OzoneProperties) StorageProperties.createPrimary(origProps);
Map<String, String> backendProps = ozoneProperties.getBackendConfigProperties();

Assertions.assertEquals(StorageProperties.Type.OZONE, ozoneProperties.getType());
Assertions.assertEquals("http://ozone-s3g:9878", ozoneProperties.getEndpoint());
Assertions.assertEquals("hadoop", ozoneProperties.getAccessKey());
Assertions.assertEquals("hadoop", ozoneProperties.getSecretKey());
Assertions.assertEquals("us-east-1", ozoneProperties.getRegion());
Assertions.assertEquals("true", ozoneProperties.getUsePathStyle());

Assertions.assertEquals("http://ozone-s3g:9878", backendProps.get("AWS_ENDPOINT"));
Assertions.assertEquals("hadoop", backendProps.get("AWS_ACCESS_KEY"));
Assertions.assertEquals("hadoop", backendProps.get("AWS_SECRET_KEY"));
Assertions.assertEquals("us-east-1", backendProps.get("AWS_REGION"));
Assertions.assertEquals("true", backendProps.get("use_path_style"));
}

@Test
public void testS3PropertiesBinding() {
origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true");
origProps.put("s3.endpoint", "http://ozone-s3g:9878");
origProps.put("s3.access_key", "hadoop");
origProps.put("s3.secret_key", "hadoop");
origProps.put("use_path_style", "true");
origProps.put("s3.region", "us-east-1");

OzoneProperties ozoneProperties = (OzoneProperties) StorageProperties.createPrimary(origProps);
Map<String, String> backendProps = ozoneProperties.getBackendConfigProperties();

Assertions.assertEquals("http://ozone-s3g:9878", ozoneProperties.getEndpoint());
Assertions.assertEquals("hadoop", ozoneProperties.getAccessKey());
Assertions.assertEquals("hadoop", ozoneProperties.getSecretKey());
Assertions.assertEquals("true", ozoneProperties.getUsePathStyle());

Assertions.assertEquals("http://ozone-s3g:9878", backendProps.get("AWS_ENDPOINT"));
Assertions.assertEquals("hadoop", backendProps.get("AWS_ACCESS_KEY"));
Assertions.assertEquals("hadoop", backendProps.get("AWS_SECRET_KEY"));
}

@Test
public void testFsS3aPropertiesAreNotSupported() {
origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true");
origProps.put("fs.s3a.endpoint", "http://ozone-s3g:9878");
origProps.put("fs.s3a.access.key", "hadoop");
origProps.put("fs.s3a.secret.key", "hadoop");

ExceptionChecker.expectThrowsWithMsg(IllegalArgumentException.class,
"Property ozone.endpoint is required.",
() -> StorageProperties.createPrimary(origProps));
}

@Test
public void testCreateAllWithDefaultFs() throws UserException {
origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true");
origProps.put("fs.defaultFS", "s3a://dn-data/");
origProps.put("s3.endpoint", "http://ozone-s3g:9878");
origProps.put("s3.access_key", "hadoop");
origProps.put("s3.secret_key", "hadoop");
origProps.put("use_path_style", "true");

List<StorageProperties> properties = StorageProperties.createAll(origProps);
Assertions.assertEquals(HdfsProperties.class, properties.get(0).getClass());
Assertions.assertEquals(OzoneProperties.class, properties.get(1).getClass());

Map<StorageProperties.Type, StorageProperties> propertiesMap = properties.stream()
.collect(Collectors.toMap(StorageProperties::getType, Function.identity()));
LocationPath locationPath = LocationPath.of("s3a://dn-data/warehouse/test_table", propertiesMap);
Assertions.assertTrue(locationPath.getStorageProperties() instanceof OzoneProperties);
}

@Test
public void testCreateAllWithDefaultFsAndOzoneProperties() throws UserException {
origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true");
origProps.put("fs.defaultFS", "s3a://dn-data/");
origProps.put("ozone.endpoint", "http://ozone-s3g:9878");
origProps.put("ozone.access_key", "hadoop");
origProps.put("ozone.secret_key", "hadoop");
origProps.put("ozone.use_path_style", "true");
origProps.put("ozone.region", "us-east-1");

List<StorageProperties> properties = StorageProperties.createAll(origProps);
Assertions.assertEquals(HdfsProperties.class, properties.get(0).getClass());
Assertions.assertEquals(OzoneProperties.class, properties.get(1).getClass());

OzoneProperties ozoneProperties = (OzoneProperties) properties.get(1);
Assertions.assertEquals("hadoop", ozoneProperties.getHadoopStorageConfig().get("fs.s3a.access.key"));
Assertions.assertEquals("hadoop", ozoneProperties.getHadoopStorageConfig().get("fs.s3a.secret.key"));
Assertions.assertEquals("http://ozone-s3g:9878", ozoneProperties.getHadoopStorageConfig().get("fs.s3a.endpoint"));
Assertions.assertEquals("us-east-1", ozoneProperties.getHadoopStorageConfig().get("fs.s3a.endpoint.region"));
Assertions.assertEquals("true", ozoneProperties.getHadoopStorageConfig().get("fs.s3a.path.style.access"));
}

@Test
public void testMissingAccessKeyOrSecretKey() {
origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true");
origProps.put("ozone.endpoint", "http://ozone-s3g:9878");
origProps.put("ozone.access_key", "hadoop");
ExceptionChecker.expectThrowsWithMsg(IllegalArgumentException.class,
"Both the access key and the secret key must be set.",
() -> StorageProperties.createPrimary(origProps));

origProps.remove("ozone.access_key");
origProps.put("ozone.secret_key", "hadoop");
ExceptionChecker.expectThrowsWithMsg(IllegalArgumentException.class,
"Both the access key and the secret key must be set.",
() -> StorageProperties.createPrimary(origProps));
}

@Test
public void testMissingEndpoint() {
origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true");
origProps.put("ozone.access_key", "hadoop");
origProps.put("ozone.secret_key", "hadoop");
ExceptionChecker.expectThrowsWithMsg(IllegalArgumentException.class,
"Property ozone.endpoint is required.",
() -> StorageProperties.createPrimary(origProps));
}

@Test
public void testRequireExplicitFsOzoneSupport() throws UserException {
origProps.put("ozone.endpoint", "http://127.0.0.1:9878");
origProps.put("ozone.access_key", "hadoop");
origProps.put("ozone.secret_key", "hadoop");

List<StorageProperties> propertiesWithoutFlag = StorageProperties.createAll(origProps);
Assertions.assertEquals(1, propertiesWithoutFlag.size());
Assertions.assertEquals(HdfsProperties.class, propertiesWithoutFlag.get(0).getClass());

origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true");
List<StorageProperties> propertiesWithFlag = StorageProperties.createAll(origProps);
Assertions.assertEquals(2, propertiesWithFlag.size());
Assertions.assertEquals(HdfsProperties.class, propertiesWithFlag.get(0).getClass());
Assertions.assertEquals(OzoneProperties.class, propertiesWithFlag.get(1).getClass());
}
}
Loading