Skip to content

Commit 6980200

Browse files
authored
[Feature](tvf) Support using tvf to read sequence_file/rc_file in local/hdfs/s3 (#41080)
## Proposed changes Issue Number: #30669 <!--Describe your changes.--> This change supports reading the contents of external file tables from rcbinary, rctext, and sequence files via the JNI connector. To-do list: - [x] Support reading rc_binary files using the local tvf - [x] Support reading rc_text/sequence files using the local tvf - [x] Support using the s3/hdfs tvf Example: **sequence file:** input: ``` mysql select * from local( "file_path" = "test/test.seq", "format" = "sequence", "backend_id" = "10011", "hive_schema"="k1:tinyint;k2:smallint;k3:int;k4:bigint;k5:float;k6:double;k7:decimal(10,2);k8:string;k9:char(10);k10:varchar(20);k11:boolean;k12:timestamp;k13:date;k14:array<string>;k15:map<string,int>;k16:struct<name:string,age:int>"); ``` output: ``` +------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+ | k1 | k2 | k3 | k4 | k5 | k6 | k7 | k8 | k9 | k10 | k11 | k12 | k13 | k14 | k15 | k16 | +------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+ | 7 | 13 | 74 | 13000000000 | 6.15 | 4.376 | 57.30 | world | Char | Varchar | 1 | 2022-01-01 10:00:00 | 2022-01-01 | ["A", "B", "C"] | {"key2":2, "key1":1} | {"name":"John", "age":30} | +------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+ 1 row in set (0.07 sec) ```
**rc_binary file:** input: ```mysql select * from local( "file_path" = "test/test.rcbinary", "format" = "rc_binary", "backend_id" = "10011", "hive_schema"="k1:tinyint;k2:smallint;k3:int;k4:bigint;k5:float;k6:double;k7:decimal(10,2);k8:string;k9:char(10);k10:varchar(20);k11:boolean;k12:timestamp;k13:date;k14:array<string>;k15:map<string,int>;k16:struct<name:string,age:int>"); ``` output: ``` +------+------+------+-------------+------+------+--------+------+------------+-----------+------+---------------------+------------+-----------------+------------------+-------------------------------+ | k1 | k2 | k3 | k4 | k5 | k6 | k7 | k8 | k9 | k10 | k11 | k12 | k13 | k14 | k15 | k16 | +------+------+------+-------------+------+------+--------+------+------------+-----------+------+---------------------+------------+-----------------+------------------+-------------------------------+ | 1 | 2 | 3 | 10000000000 | 1.23 | 3.14 | 100.50 | you | are | beautiful | 0 | 2023-10-29 02:00:00 | 2023-10-29 | ["D", "E", "F"] | {"k2":5, "k1":3} | {"name":"chandler", "age":54} | +------+------+------+-------------+------+------+--------+------+------------+-----------+------+---------------------+------------+-----------------+------------------+-------------------------------+ 1 row in set (0.12 sec) ```
**rc_text file:** input: ``` mysql select * from local( "file_path" = "test/test.rctext", "format" = "rc_text", "backend_id" = "10011", "hive_schema"="k1:tinyint;k2:smallint;k3:int;k4:bigint;k5:float;k6:double;k7:decimal(10,2);k8:string;k9:char(10);k10:varchar(20);k11:boolean;k12:timestamp;k13:date;k14:array<string>;k15:map<string,int>;k16:struct<name:string,age:int>"); ``` output: ``` +------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+ | k1 | k2 | k3 | k4 | k5 | k6 | k7 | k8 | k9 | k10 | k11 | k12 | k13 | k14 | k15 | k16 | +------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+ | 7 | 13 | 74 | 13000000000 | 6.15 | 4.376 | 57.30 | world | Char | Varchar | 1 | 2022-01-01 10:00:00 | 2022-01-01 | ["A", "B", "C"] | {"key2":2, "key1":1} | {"name":"John", "age":30} | +------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+ 1 row in set (0.06 sec) ```
1 parent b7516c9 commit 6980200

File tree

19 files changed

+1413
-55
lines changed

19 files changed

+1413
-55
lines changed

Diff for: be/src/vec/exec/format/hive/hive_jni_reader.cpp

+102
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "hive_jni_reader.h"
19+
20+
#include <map>
21+
#include <ostream>
22+
23+
#include "common/logging.h"
24+
#include "runtime/descriptors.h"
25+
#include "runtime/types.h"
26+
27+
namespace doris::vectorized {
28+
29+
// Builds a reader for a single file range.
//
// The slot descriptors, runtime state and profile are forwarded to the
// JniReader base class; the scan parameters and the range descriptor are
// copied into this object's own members.
HiveJNIReader::HiveJNIReader(RuntimeState* state, RuntimeProfile* profile,
                             const TFileScanRangeParams& params,
                             const std::vector<SlotDescriptor*>& file_slot_descs,
                             const TFileRangeDesc& range)
        : JniReader(file_slot_descs, state, profile),
          _params(params),
          _range(range) {}
34+
35+
// Defaulted: this class adds no explicit cleanup of its own in the destructor.
HiveJNIReader::~HiveJNIReader() = default;
36+
37+
// Returns the file type to use for the current range: the range-level
// `file_type` when that optional field is set, otherwise the `file_type`
// carried by the scan parameters.
TFileType::type HiveJNIReader::get_file_type() {
    return _range.__isset.file_type ? _range.file_type : _params.file_type;
}
46+
47+
// Creates, initializes and opens the JNI connector used to read this range.
//
// The column names are joined with ',' and their type strings (from
// JniConnector::get_jni_type_v2) with '#'. Both lists are passed to the Java
// scanner together with the file location, file type, file format and the
// split offset/size. For S3 ranges the scan-parameter properties are also
// merged into the scanner parameters.
//
// @param colname_to_value_range  stored on the reader and handed to
//                                JniConnector::init.
// @return the first error from JniConnector::init, otherwise the status of
//         JniConnector::open.
Status HiveJNIReader::init_fetch_table_reader(
        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
    _colname_to_value_range = colname_to_value_range;

    std::string joined_fields;
    std::string joined_types;
    std::vector<std::string> column_names;
    for (SlotDescriptor* slot : _file_slot_descs) {
        const std::string& col_name = slot->col_name();
        const std::string col_type = JniConnector::get_jni_type_v2(slot->type());
        // Separators go in front of every entry except the first one.
        if (!column_names.empty()) {
            joined_fields += ",";
            joined_types += "#";
        }
        joined_fields += col_name;
        joined_types += col_type;
        column_names.emplace_back(col_name);
    }

    const TFileType::type file_type = get_file_type();
    std::map<String, String> scanner_params = {
            {"uri", _range.path},
            {"file_type", std::to_string(file_type)},
            {"file_format", std::to_string(_params.format_type)},
            {"required_fields", joined_fields},
            {"columns_types", joined_types},
            {"split_start_offset", std::to_string(_range.start_offset)},
            {"split_size", std::to_string(_range.size)}};
    if (file_type == TFileType::FILE_S3) {
        // insert() keeps the entries above when a property uses the same key.
        scanner_params.insert(_params.properties.begin(), _params.properties.end());
    }

    _jni_connector = std::make_unique<JniConnector>("org/apache/doris/hive/HiveJNIScanner",
                                                    scanner_params, column_names);
    RETURN_IF_ERROR(_jni_connector->init(_colname_to_value_range));
    return _jni_connector->open(_state, _profile);
}
85+
86+
// Fetches the next block from the JNI connector.
//
// The call is forwarded unchanged to JniConnector::get_next_block. Once the
// connector reports end-of-file through `*eof`, the connector is closed and
// the status of that close is returned.
Status HiveJNIReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
    RETURN_IF_ERROR(_jni_connector->get_next_block(block, read_rows, eof));
    if (!*eof) {
        return Status::OK();
    }
    return _jni_connector->close();
}
93+
94+
// Reports the columns this reader produces.
//
// Every file slot descriptor contributes one (column name -> type) entry to
// `name_to_type`. `missing_cols` is not touched by this implementation.
// Always returns Status::OK().
Status HiveJNIReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                                  std::unordered_set<std::string>* missing_cols) {
    for (SlotDescriptor* slot : _file_slot_descs) {
        name_to_type->emplace(slot->col_name(), slot->type());
    }
    return Status::OK();
}
101+
102+
} // namespace doris::vectorized

Diff for: be/src/vec/exec/format/hive/hive_jni_reader.h

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <rapidjson/document.h>
21+
#include <stddef.h>
22+
23+
#include <memory>
24+
#include <string>
25+
#include <unordered_map>
26+
#include <unordered_set>
27+
#include <vector>
28+
29+
#include "common/status.h"
30+
#include "exec/olap_common.h"
31+
#include "vec/exec/format/jni_reader.h"
32+
33+
// Forward declarations for types that this header only uses by pointer or
// reference.
namespace doris {

class RuntimeProfile;
class RuntimeState;
class SlotDescriptor;

// Block is used below from inside doris::vectorized, so its forward
// declaration has to be in that namespace as well.
namespace vectorized {

class Block;

} // namespace vectorized

struct TypeDescriptor;

} // namespace doris
48+
49+
namespace doris::vectorized {
50+
51+
/**
52+
* Read hive-format file: rcbinary, rctext, sequencefile
53+
*/
54+
class HiveJNIReader : public JniReader {
55+
ENABLE_FACTORY_CREATOR(HiveJNIReader);
56+
57+
public:
58+
/**
59+
* Call java side by jni to get table data
60+
*/
61+
HiveJNIReader(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams& params,
62+
const std::vector<SlotDescriptor*>& file_slot_descs, const TFileRangeDesc& range);
63+
64+
~HiveJNIReader() override;
65+
66+
Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
67+
68+
Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
69+
std::unordered_set<std::string>* missing_cols) override;
70+
71+
Status init_fetch_table_reader(
72+
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
73+
74+
TFileType::type get_file_type();
75+
76+
private:
77+
const TFileScanRangeParams _params;
78+
const TFileRangeDesc _range;
79+
std::string _column_names;
80+
std::string _column_types;
81+
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
82+
};
83+
84+
} // namespace doris::vectorized

Diff for: be/src/vec/exec/jni_connector.cpp

+78
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,84 @@ std::string JniConnector::get_jni_type(const DataTypePtr& data_type) {
519519
}
520520
}
521521

522+
std::string JniConnector::get_jni_type_v2(const TypeDescriptor& desc) {
523+
std::ostringstream buffer;
524+
switch (desc.type) {
525+
case TYPE_BOOLEAN:
526+
return "boolean";
527+
case TYPE_TINYINT:
528+
return "tinyint";
529+
case TYPE_SMALLINT:
530+
return "smallint";
531+
case TYPE_INT:
532+
return "int";
533+
case TYPE_BIGINT:
534+
return "bigint";
535+
case TYPE_LARGEINT:
536+
return "largeint";
537+
case TYPE_FLOAT:
538+
return "float";
539+
case TYPE_DOUBLE:
540+
return "double";
541+
case TYPE_VARCHAR: {
542+
buffer << "varchar(" << desc.len << ")";
543+
return buffer.str();
544+
}
545+
case TYPE_DATE:
546+
[[fallthrough]];
547+
case TYPE_DATEV2:
548+
return "date";
549+
case TYPE_DATETIME:
550+
[[fallthrough]];
551+
case TYPE_TIME:
552+
[[fallthrough]];
553+
case TYPE_DATETIMEV2:
554+
[[fallthrough]];
555+
case TYPE_TIMEV2:
556+
return "timestamp";
557+
case TYPE_BINARY:
558+
return "binary";
559+
case TYPE_CHAR: {
560+
buffer << "char(" << desc.len << ")";
561+
return buffer.str();
562+
}
563+
case TYPE_STRING:
564+
return "string";
565+
case TYPE_DECIMALV2:
566+
[[fallthrough]];
567+
case TYPE_DECIMAL32:
568+
[[fallthrough]];
569+
case TYPE_DECIMAL64:
570+
[[fallthrough]];
571+
case TYPE_DECIMAL128I: {
572+
buffer << "decimal(" << desc.precision << "," << desc.scale << ")";
573+
return buffer.str();
574+
}
575+
case TYPE_STRUCT: {
576+
buffer << "struct<";
577+
for (int i = 0; i < desc.children.size(); ++i) {
578+
if (i != 0) {
579+
buffer << ",";
580+
}
581+
buffer << desc.field_names[i] << ":" << get_jni_type(desc.children[i]);
582+
}
583+
buffer << ">";
584+
return buffer.str();
585+
}
586+
case TYPE_ARRAY: {
587+
buffer << "array<" << get_jni_type(desc.children[0]) << ">";
588+
return buffer.str();
589+
}
590+
case TYPE_MAP: {
591+
buffer << "map<" << get_jni_type(desc.children[0]) << "," << get_jni_type(desc.children[1])
592+
<< ">";
593+
return buffer.str();
594+
}
595+
default:
596+
return "unsupported";
597+
}
598+
}
599+
522600
std::string JniConnector::get_jni_type(const TypeDescriptor& desc) {
523601
std::ostringstream buffer;
524602
switch (desc.type) {

Diff for: be/src/vec/exec/jni_connector.h

+2
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,8 @@ class JniConnector : public ProfileCollector {
261261
/**
262262
* Map PrimitiveType to hive type.
263263
*/
264+
static std::string get_jni_type_v2(const TypeDescriptor& desc);
265+
264266
static std::string get_jni_type(const TypeDescriptor& desc);
265267

266268
static Status to_java_table(Block* block, size_t num_rows, const ColumnNumbers& arguments,

Diff for: be/src/vec/exec/scan/vfile_scanner.cpp

+10
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
#include "vec/exec/format/arrow/arrow_stream_reader.h"
5757
#include "vec/exec/format/avro/avro_jni_reader.h"
5858
#include "vec/exec/format/csv/csv_reader.h"
59+
#include "vec/exec/format/hive/hive_jni_reader.h"
5960
#include "vec/exec/format/json/new_json_reader.h"
6061
#include "vec/exec/format/orc/vorc_reader.h"
6162
#include "vec/exec/format/parquet/vparquet_reader.h"
@@ -945,6 +946,15 @@ Status VFileScanner::_get_next_reader() {
945946
->init_fetch_table_reader(_colname_to_value_range);
946947
break;
947948
}
949+
case TFileFormatType::FORMAT_SEQUENCE:
950+
case TFileFormatType::FORMAT_RCTEXT:
951+
case TFileFormatType::FORMAT_RCBINARY: {
952+
_cur_reader = HiveJNIReader::create_unique(_state, _profile, *_params, _file_slot_descs,
953+
range);
954+
init_status = ((HiveJNIReader*)(_cur_reader.get()))
955+
->init_fetch_table_reader(_colname_to_value_range);
956+
break;
957+
}
948958
case TFileFormatType::FORMAT_WAL: {
949959
_cur_reader.reset(new WalReader(_state));
950960
init_status = ((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc);

Diff for: build.sh

+2
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,7 @@ if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then
534534
modules+=("be-java-extensions/max-compute-scanner")
535535
modules+=("be-java-extensions/avro-scanner")
536536
modules+=("be-java-extensions/lakesoul-scanner")
537+
modules+=("be-java-extensions/hive-scanner")
537538
modules+=("be-java-extensions/preload-extensions")
538539

539540
# If the BE_EXTENSION_IGNORE variable is not empty, remove the modules that need to be ignored from FE_MODULES
@@ -819,6 +820,7 @@ EOF
819820
extensions_modules+=("max-compute-scanner")
820821
extensions_modules+=("avro-scanner")
821822
extensions_modules+=("lakesoul-scanner")
823+
extensions_modules+=("hive-scanner")
822824
extensions_modules+=("preload-extensions")
823825

824826
if [[ -n "${BE_EXTENSION_IGNORE}" ]]; then

0 commit comments

Comments
 (0)