Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions be/src/runtime/descriptors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,15 @@ MaxComputeTableDescriptor::MaxComputeTableDescriptor(const TTableDescriptor& tde
_init_status =
Status::InvalidArgument("fail to init MaxComputeTableDescriptor, missing quota.");
}

if (tdesc.mcTable.__isset.properties) [[likely]] {
_props = tdesc.mcTable.properties;
} else {
static const std::string MC_ACCESS_KEY = "mc.access_key";
static const std::string MC_SECRET_KEY = "mc.secret_key";
_props.insert({MC_ACCESS_KEY, _access_key});
_props.insert({MC_SECRET_KEY, _secret_key});
}
}

MaxComputeTableDescriptor::~MaxComputeTableDescriptor() = default;
Expand Down
10 changes: 6 additions & 4 deletions be/src/runtime/descriptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,18 +245,20 @@ class MaxComputeTableDescriptor : public TableDescriptor {
std::string endpoint() const { return _endpoint; }
std::string quota() const { return _quota; }
Status init_status() const { return _init_status; }
std::map<std::string, std::string> properties() const { return _props; }

private:
std::string _region; //deprecated
std::string _project;
std::string _table;
std::string _odps_url; //deprecated
std::string _tunnel_url; //deprecated
std::string _access_key;
std::string _secret_key;
std::string _odps_url; //deprecated
std::string _tunnel_url; //deprecated
std::string _access_key; //deprecated
std::string _secret_key; //deprecated
std::string _public_access; //deprecated
std::string _endpoint;
std::string _quota;
std::map<std::string, std::string> _props;
Status _init_status = Status::OK();
};

Expand Down
34 changes: 17 additions & 17 deletions be/src/vec/exec/format/table/max_compute_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,27 +62,27 @@ MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_des
}
index++;
}
std::map<String, String> params = {
{"access_key", _table_desc->access_key()},
{"secret_key", _table_desc->secret_key()},
{"endpoint", _table_desc->endpoint()},
{"quota", _table_desc->quota()},
{"project", _table_desc->project()},
{"table", _table_desc->table()},

{"session_id", _max_compute_params.session_id},
{"scan_serializer", _max_compute_params.table_batch_read_session},
auto properties = _table_desc->properties();
properties["endpoint"] = _table_desc->endpoint();
properties["quota"] = _table_desc->quota();
properties["project"] = _table_desc->project();
properties["table"] = _table_desc->table();

{"start_offset", std::to_string(_range.start_offset)},
{"split_size", std::to_string(_range.size)},
{"required_fields", required_fields.str()},
{"columns_types", columns_types.str()},
properties["session_id"] = _max_compute_params.session_id;
properties["scan_serializer"] = _max_compute_params.table_batch_read_session;

properties["start_offset"] = std::to_string(_range.start_offset);
properties["split_size"] = std::to_string(_range.size);
properties["required_fields"] = required_fields.str();
properties["columns_types"] = columns_types.str();

properties["connect_timeout"] = std::to_string(_max_compute_params.connect_timeout);
properties["read_timeout"] = std::to_string(_max_compute_params.read_timeout);
properties["retry_count"] = std::to_string(_max_compute_params.retry_times);

{"connect_timeout", std::to_string(_max_compute_params.connect_timeout)},
{"read_timeout", std::to_string(_max_compute_params.read_timeout)},
{"retry_count", std::to_string(_max_compute_params.retry_times)}};
_jni_connector = std::make_unique<JniConnector>(
"org/apache/doris/maxcompute/MaxComputeJniScanner", params, column_names);
"org/apache/doris/maxcompute/MaxComputeJniScanner", properties, column_names);
}

Status MaxComputeJniReader::init_reader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@

import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.maxcompute.MCUtils;

import com.aliyun.odps.Odps;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.table.configuration.CompressionCodec;
import com.aliyun.odps.table.configuration.ReaderOptions;
import com.aliyun.odps.table.configuration.RestOptions;
Expand Down Expand Up @@ -120,8 +119,6 @@ public MaxComputeJniScanner(int batchSize, Map<String, String> params) {
}
}

String accessKey = Objects.requireNonNull(params.get(ACCESS_KEY), "required property '" + ACCESS_KEY + "'.");
String secretKey = Objects.requireNonNull(params.get(SECRET_KEY), "required property '" + SECRET_KEY + "'.");
String endpoint = Objects.requireNonNull(params.get(ENDPOINT), "required property '" + ENDPOINT + "'.");
String quota = Objects.requireNonNull(params.get(QUOTA), "required property '" + QUOTA + "'.");
String scanSerializer = Objects.requireNonNull(params.get(SCAN_SERIALIZER),
Expand All @@ -137,10 +134,7 @@ public MaxComputeJniScanner(int batchSize, Map<String, String> params) {
timeZone = ZoneId.systemDefault();
}


Account account = new AliyunAccount(accessKey, secretKey);
Odps odps = new Odps(account);

Odps odps = MCUtils.createMcClient(params);
odps.setDefaultProject(project);
odps.setEndpoint(endpoint);

Expand Down
14 changes: 14 additions & 0 deletions fe/fe-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,20 @@ under the License.
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-iostreams</artifactId>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
</exclusion>
<exclusion>
<groupId>org.ini4j</groupId>
<artifactId>ini4j</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<finalName>doris-fe-common</finalName>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,12 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.property.constants;

import org.apache.doris.common.credentials.CloudCredential;

import java.util.Map;
package org.apache.doris.common.maxcompute;

/**
* properties for aliyun max compute
*/
public class MCProperties extends BaseProperties {
public class MCProperties {

//To be compatible with previous versions of the catalog.
public static final String REGION = "mc.region";
Expand Down Expand Up @@ -99,7 +95,12 @@ public class MCProperties extends BaseProperties {
public static final String ENABLE_NAMESPACE_SCHEMA = "mc.enable.namespace.schema";
public static final String DEFAULT_ENABLE_NAMESPACE_SCHEMA = "false";

public static CloudCredential getCredential(Map<String, String> props) {
return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
}
public static final String AUTH_TYPE = "mc.auth.type";
public static final String AUTH_TYPE_AK_SK = "ak_sk";
public static final String AUTH_TYPE_RAM_ROLE_ARN = "ram_role_arn";
public static final String AUTH_TYPE_ECS_RAM_ROLE = "ecs_ram_role";
public static final String DEFAULT_AUTH_TYPE = AUTH_TYPE_AK_SK;

public static final String RAM_ROLE_ARN = "mc.ram_role_arn";
public static final String ECS_RAM_ROLE = "mc.ecs_ram_role";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.maxcompute;

import com.aliyun.auth.credentials.Credential;
import com.aliyun.auth.credentials.provider.EcsRamRoleCredentialProvider;
import com.aliyun.auth.credentials.provider.RamRoleArnCredentialProvider;
import com.aliyun.odps.Odps;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AklessAccount;
import com.aliyun.odps.account.AliyunAccount;

import java.util.Map;

public class MCUtils {
public static void checkAuthProperties(Map<String, String> properties) {
String authType = properties.getOrDefault(MCProperties.AUTH_TYPE, MCProperties.DEFAULT_AUTH_TYPE);
if (authType.equalsIgnoreCase(MCProperties.AUTH_TYPE_AK_SK)) {
if (!properties.containsKey(MCProperties.ACCESS_KEY) || !properties.containsKey(MCProperties.SECRET_KEY)) {
throw new RuntimeException("Missing access key or secret key for AK/SK auth type");
}
} else if (authType.equalsIgnoreCase(MCProperties.AUTH_TYPE_RAM_ROLE_ARN)) {
if (!properties.containsKey(MCProperties.ACCESS_KEY) || !properties.containsKey(MCProperties.SECRET_KEY)
|| !properties.containsKey(MCProperties.RAM_ROLE_ARN)) {
throw new RuntimeException("Missing access key, secret key or role arn for RAM Role ARN auth type");
}
} else if (authType.equalsIgnoreCase(MCProperties.AUTH_TYPE_ECS_RAM_ROLE)) {
if (!properties.containsKey(MCProperties.ECS_RAM_ROLE)) {
throw new RuntimeException("Missing role name for ECS RAM Role auth type");
}
} else {
throw new RuntimeException("Unsupported auth type: " + authType);
}
}

public static Odps createMcClient(Map<String, String> properties) {
String authType = properties.getOrDefault(MCProperties.AUTH_TYPE, MCProperties.DEFAULT_AUTH_TYPE);
if (authType.equalsIgnoreCase(MCProperties.AUTH_TYPE_AK_SK)) {
String accessKey = properties.get(MCProperties.ACCESS_KEY);
String secretKey = properties.get(MCProperties.SECRET_KEY);
Account account = new AliyunAccount(accessKey, secretKey);
return new Odps(account);
} else if (authType.equalsIgnoreCase(MCProperties.AUTH_TYPE_RAM_ROLE_ARN)) {
String accessKey = properties.get(MCProperties.ACCESS_KEY);
String secretKey = properties.get(MCProperties.SECRET_KEY);
String roleArn = properties.get(MCProperties.RAM_ROLE_ARN);
RamRoleArnCredentialProvider ramRoleArnCredentialProvider =
RamRoleArnCredentialProvider.builder().credential(
Credential.builder().accessKeyId(accessKey)
.accessKeySecret(secretKey).build())
.roleArn(roleArn).build();
AklessAccount aklessAccount = new AklessAccount(ramRoleArnCredentialProvider);
return new Odps(aklessAccount);
} else if (authType.equalsIgnoreCase(MCProperties.AUTH_TYPE_ECS_RAM_ROLE)) {
String roleName = properties.get(MCProperties.ECS_RAM_ROLE);
EcsRamRoleCredentialProvider credentialProvider = EcsRamRoleCredentialProvider.create(roleName);
AklessAccount aklessAccount = new AklessAccount(credentialProvider);
return new Odps(aklessAccount);
} else {
throw new RuntimeException("Unsupported auth type: " + authType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.doris.common.util;

import org.apache.doris.common.maxcompute.MCProperties;
import org.apache.doris.datasource.property.ConnectorPropertiesUtils;
import org.apache.doris.datasource.property.constants.MCProperties;
import org.apache.doris.datasource.property.metastore.AWSGlueMetaStoreBaseProperties;
import org.apache.doris.datasource.property.metastore.AliyunDLFBaseProperties;
import org.apache.doris.datasource.property.storage.AzureProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,16 @@


import org.apache.doris.common.DdlException;
import org.apache.doris.common.credentials.CloudCredential;
import org.apache.doris.common.maxcompute.MCProperties;
import org.apache.doris.common.maxcompute.MCUtils;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.property.constants.MCProperties;

import com.aliyun.odps.Odps;
import com.aliyun.odps.Partition;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AccountFormat;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.table.TableIdentifier;
import com.aliyun.odps.table.configuration.RestOptions;
import com.aliyun.odps.table.configuration.SplitOptions;
Expand All @@ -54,9 +52,8 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
// you can ref : https://help.aliyun.com/zh/maxcompute/user-guide/endpoints
private static final String endpointTemplate = "http://service.{}.maxcompute.aliyun-inc.com/api";

private Map<String, String> props;
private Odps odps;
private String accessKey;
private String secretKey;
private String endpoint;
private String defaultProject;
private String quota;
Expand Down Expand Up @@ -158,7 +155,7 @@ protected void generatorEndpoint() {

@Override
protected void initLocalObjectsImpl() {
Map<String, String> props = catalogProperty.getProperties();
props = catalogProperty.getProperties();

generatorEndpoint();

Expand Down Expand Up @@ -198,16 +195,11 @@ protected void initLocalObjectsImpl() {
.withReadTimeout(readTimeout)
.withRetryTimes(retryTimes).build();

CloudCredential credential = MCProperties.getCredential(props);
accessKey = credential.getAccessKey();
secretKey = credential.getSecretKey();

dateTimePredicatePushDown = Boolean.parseBoolean(
props.getOrDefault(MCProperties.DATETIME_PREDICATE_PUSH_DOWN,
MCProperties.DEFAULT_DATETIME_PREDICATE_PUSH_DOWN));

Account account = new AliyunAccount(accessKey, secretKey);
this.odps = new Odps(account);
odps = MCUtils.createMcClient(props);
odps.setDefaultProject(defaultProject);
odps.setEndpoint(endpoint);

Expand Down Expand Up @@ -288,14 +280,9 @@ public List<String> listTableNames(SessionContext ctx, String dbName) {
return mcStructureHelper.listTableNames(getClient(), dbName);
}

public String getAccessKey() {
makeSureInitialized();
return accessKey;
}

public String getSecretKey() {
public Map<String, String> getProperties() {
makeSureInitialized();
return secretKey;
return props;
}

public String getEndpoint() {
Expand Down Expand Up @@ -449,10 +436,6 @@ public void checkProperties() throws DdlException {
+ MCProperties.READ_TIMEOUT + "/" + MCProperties.RETRY_COUNT + "must be an integer");
}

CloudCredential credential = MCProperties.getCredential(props);
if (!credential.isWhole()) {
throw new DdlException("Max-Compute credential properties '"
+ MCProperties.ACCESS_KEY + "' and '" + MCProperties.SECRET_KEY + "' are required.");
}
MCUtils.checkAuthProperties(props);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -322,17 +322,11 @@ public TTableDescriptor toThrift() {
TMCTable tMcTable = new TMCTable();
MaxComputeExternalCatalog mcCatalog = ((MaxComputeExternalCatalog) catalog);

tMcTable.setAccessKey(mcCatalog.getAccessKey());
tMcTable.setSecretKey(mcCatalog.getSecretKey());
tMcTable.setOdpsUrl("deprecated");
tMcTable.setRegion("deprecated");
tMcTable.setProperties(mcCatalog.getProperties());
tMcTable.setEndpoint(mcCatalog.getEndpoint());
// use mc project as dbName
tMcTable.setProject(dbName);
tMcTable.setQuota(mcCatalog.getQuota());

tMcTable.setTunnelUrl("deprecated");
tMcTable.setProject("deprecated");
tMcTable.setTable(name);
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.MAX_COMPUTE_TABLE,
schema.size(), 0, getName(), dbName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.maxcompute.MCProperties;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
import org.apache.doris.datasource.maxcompute.source.MaxComputeSplit.SplitType;
import org.apache.doris.datasource.property.constants.MCProperties;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.nereids.util.DateUtils;
import org.apache.doris.planner.PlanNodeId;
Expand Down
Loading