Skip to content

Commit fca2367

Browse files
scanhex12zvonand
authored andcommitted
Merge pull request ClickHouse#83653 from scanhex12/iceberg_field_id
Read iceberg data files by field ids
1 parent fe97330 commit fca2367

25 files changed

+234
-18
lines changed

src/Formats/FormatParserGroup.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,32 @@ namespace Setting
1717
extern const SettingsMaxThreads max_parsing_threads;
1818
}
1919

20+
void ColumnMapper::setStorageColumnEncoding(std::unordered_map<String, Int64> && storage_encoding_)
21+
{
22+
storage_encoding = std::move(storage_encoding_);
23+
}
24+
25+
std::pair<std::unordered_map<String, String>, std::unordered_map<String, String>> ColumnMapper::makeMapping(
26+
const Block & header,
27+
const std::unordered_map<Int64, String> & format_encoding)
28+
{
29+
std::unordered_map<String, String> clickhouse_to_parquet_names;
30+
std::unordered_map<String, String> parquet_names_to_clickhouse;
31+
32+
for (size_t i = 0; i < header.columns(); ++i)
33+
{
34+
auto column_name = header.getNames()[i];
35+
int64_t field_id;
36+
if (auto it = storage_encoding.find(column_name); it != storage_encoding.end())
37+
field_id = it->second;
38+
else
39+
continue;
40+
clickhouse_to_parquet_names[column_name] = format_encoding.at(field_id);
41+
parquet_names_to_clickhouse.emplace(format_encoding.at(field_id), column_name);
42+
}
43+
return {clickhouse_to_parquet_names, parquet_names_to_clickhouse};
44+
}
45+
2046
FormatParserGroup::FormatParserGroup(const Settings & settings, size_t num_streams_, std::shared_ptr<const ActionsDAG> filter_actions_dag_, const ContextPtr & context_)
2147
: max_parsing_threads(settings[Setting::max_parsing_threads])
2248
, max_io_threads(settings[Setting::max_download_threads])

src/Formats/FormatParserGroup.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,24 @@ struct FormatParserGroup;
1313

1414
using FormatParserGroupPtr = std::shared_ptr<FormatParserGroup>;
1515

16+
/// Some formats needs to custom mapping between columns in file and clickhouse columns.
17+
class ColumnMapper
18+
{
19+
public:
20+
/// clickhouse_column_name -> field_id
21+
void setStorageColumnEncoding(std::unordered_map<String, Int64> && storage_encoding_);
22+
23+
/// clickhouse_column_name -> format_column_name (just join the maps above by field_id).
24+
std::pair<std::unordered_map<String, String>, std::unordered_map<String, String>> makeMapping(
25+
const Block & header,
26+
const std::unordered_map<Int64, String> & format_encoding);
27+
28+
private:
29+
std::unordered_map<String, Int64> storage_encoding;
30+
};
31+
32+
using ColumnMapperPtr = std::shared_ptr<ColumnMapper>;
33+
1634
/// When reading many files in one query, e.g. `SELECT ... FROM file('part{00..99}.parquet')`,
1735
/// we want the file readers to share some resource limits, e.g. number of threads.
1836
/// They may also want to share some data structures to avoid initializing multiple copies,
@@ -41,6 +59,8 @@ struct FormatParserGroup
4159
/// IInputFormat implementation may put arbitrary state here.
4260
std::shared_ptr<void> opaque;
4361

62+
ColumnMapperPtr column_mapper;
63+
4464
private:
4565
/// For lazily initializing the fields above.
4666
std::once_flag init_flag;

src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ void ArrowBlockInputFormat::prepareReader()
174174
getPort().getHeader(),
175175
"Arrow",
176176
format_settings,
177+
std::nullopt,
177178
format_settings.arrow.allow_missing_columns,
178179
format_settings.null_as_default,
179180
format_settings.date_time_overflow_behavior,

src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1468,6 +1468,7 @@ ArrowColumnToCHColumn::ArrowColumnToCHColumn(
14681468
const Block & header_,
14691469
const std::string & format_name_,
14701470
const FormatSettings & format_settings_,
1471+
const std::optional<std::unordered_map<String, String>> & parquet_columns_to_clickhouse_,
14711472
bool allow_missing_columns_,
14721473
bool null_as_default_,
14731474
FormatSettings::DateTimeOverflowBehavior date_time_overflow_behavior_,
@@ -1485,6 +1486,7 @@ ArrowColumnToCHColumn::ArrowColumnToCHColumn(
14851486
, case_insensitive_matching(case_insensitive_matching_)
14861487
, is_stream(is_stream_)
14871488
, enable_json_parsing(enable_json_parsing_)
1489+
, parquet_columns_to_clickhouse(parquet_columns_to_clickhouse_)
14881490
{
14891491
}
14901492

@@ -1504,12 +1506,14 @@ Chunk ArrowColumnToCHColumn::arrowTableToCHChunk(
15041506

15051507
auto arrow_field = table->schema()->GetFieldByName(column_name);
15061508

1509+
if (parquet_columns_to_clickhouse)
1510+
column_name = parquet_columns_to_clickhouse->at(column_name);
1511+
15071512
if (case_insensitive_matching)
15081513
boost::to_lower(column_name);
15091514

15101515
name_to_arrow_column[std::move(column_name)] = {std::move(arrow_column), std::move(arrow_field)};
15111516
}
1512-
15131517
return arrowColumnsToCHChunk(name_to_arrow_column, num_rows, metadata, block_missing_values);
15141518
}
15151519

src/Processors/Formats/Impl/ArrowColumnToCHColumn.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include <cstdint>
4+
#include <unordered_map>
45

56
#include "config.h"
67

@@ -25,6 +26,7 @@ class ArrowColumnToCHColumn
2526
const Block & header_,
2627
const std::string & format_name_,
2728
const FormatSettings & format_settings_,
29+
const std::optional<std::unordered_map<String, String>> & parquet_columns_to_clickhouse_,
2830
bool allow_missing_columns_,
2931
bool null_as_default_,
3032
FormatSettings::DateTimeOverflowBehavior date_time_overflow_behavior_,
@@ -90,6 +92,8 @@ class ArrowColumnToCHColumn
9092
/// To avoid converting dictionary from Arrow Dictionary
9193
/// to LowCardinality every chunk we save it and reuse.
9294
std::unordered_map<std::string, DictionaryInfo> dictionary_infos;
95+
96+
std::optional<std::unordered_map<String, String>> parquet_columns_to_clickhouse;
9397
};
9498

9599
}

src/Processors/Formats/Impl/ArrowFieldIndexUtil.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ class ArrowFieldIndexUtil
8080
std::vector<ClickHouseIndexToParquetIndex> findRequiredIndices(
8181
const Block & header,
8282
const arrow::Schema & schema,
83-
const parquet::FileMetaData & file)
83+
const parquet::FileMetaData & file,
84+
const std::optional<std::unordered_map<String, String>> & clickhouse_to_parquet_names)
8485
{
8586
std::vector<ClickHouseIndexToParquetIndex> required_indices;
8687
std::unordered_set<int> added_indices;
@@ -90,6 +91,13 @@ class ArrowFieldIndexUtil
9091
{
9192
const auto & named_col = header.getByPosition(i);
9293
std::string col_name = named_col.name;
94+
if (clickhouse_to_parquet_names)
95+
{
96+
if (auto it = clickhouse_to_parquet_names->find(col_name); it != clickhouse_to_parquet_names->end())
97+
col_name = it->second;
98+
else
99+
continue;
100+
}
93101
if (ignore_case)
94102
boost::to_lower(col_name);
95103
findRequiredIndices(col_name, i, named_col.type, fields_indices, added_indices, required_indices, file);

src/Processors/Formats/Impl/ORCBlockInputFormat.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ void ORCBlockInputFormat::prepareReader()
139139
getPort().getHeader(),
140140
"ORC",
141141
format_settings,
142+
std::nullopt,
142143
format_settings.orc.allow_missing_columns,
143144
format_settings.null_as_default,
144145
format_settings.date_time_overflow_behavior,

src/Processors/Formats/Impl/Parquet/ParquetRecordReader.cpp

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ ParquetRecordReader::ParquetRecordReader(
318318
std::shared_ptr<::arrow::io::RandomAccessFile> arrow_file,
319319
const FormatSettings & format_settings,
320320
std::vector<int> row_groups_indices_,
321+
const std::optional<std::vector<Int32>> & column_indices_,
321322
std::shared_ptr<parquet::FileMetaData> metadata)
322323
: file_reader(createFileReader(std::move(arrow_file), reader_properties_, std::move(metadata)))
323324
, arrow_properties(arrow_properties_)
@@ -338,20 +339,26 @@ ParquetRecordReader::ParquetRecordReader(
338339

339340
parquet_col_indice.reserve(header.columns());
340341
column_readers.reserve(header.columns());
341-
for (const auto & col_with_name : header)
342+
if (!column_indices_)
342343
{
343-
auto it = parquet_columns.find(col_with_name.name);
344-
if (it == parquet_columns.end())
345-
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "no column with '{}' in parquet file", col_with_name.name);
344+
for (const auto & col_with_name : header)
345+
{
346+
auto it = parquet_columns.find(col_with_name.name);
347+
if (it == parquet_columns.end())
348+
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "no column with '{}' in parquet file", col_with_name.name);
346349

347-
const auto & node = it->second;
348-
if (!node->is_primitive())
349-
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "arrays and maps are not implemented in native parquet reader");
350+
const auto & node = it->second;
351+
if (!node->is_primitive())
352+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "arrays and maps are not implemented in native parquet reader");
350353

351-
auto idx = file_reader->metadata()->schema()->ColumnIndex(*node);
352-
chassert(idx >= 0);
353-
parquet_col_indice.push_back(idx);
354+
auto idx = file_reader->metadata()->schema()->ColumnIndex(*node);
355+
chassert(idx >= 0);
356+
parquet_col_indice.push_back(idx);
357+
}
354358
}
359+
else
360+
parquet_col_indice = *column_indices_;
361+
355362
if (arrow_properties.pre_buffer())
356363
{
357364
THROW_PARQUET_EXCEPTION(file_reader->PreBuffer(

src/Processors/Formats/Impl/Parquet/ParquetRecordReader.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class ParquetRecordReader
2525
std::shared_ptr<::arrow::io::RandomAccessFile> arrow_file,
2626
const FormatSettings & format_settings,
2727
std::vector<int> row_groups_indices_,
28+
const std::optional<std::vector<Int32>> & column_indices_,
2829
std::shared_ptr<parquet::FileMetaData> metadata = nullptr);
2930

3031
Chunk readChunk();

src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
#include "ParquetBlockInputFormat.h"
1+
#include <memory>
2+
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
23

34
#if USE_PARQUET
45

@@ -632,7 +633,22 @@ void ParquetBlockInputFormat::initializeIfNeeded()
632633
format_settings.parquet.case_insensitive_column_matching,
633634
format_settings.parquet.allow_missing_columns);
634635

635-
auto index_mapping = field_util.findRequiredIndices(getPort().getHeader(), *schema, *metadata);
636+
std::optional<std::unordered_map<String, String>> clickhouse_to_parquet_names;
637+
if (parser_group && parser_group->column_mapper)
638+
{
639+
auto header = getPort().getHeader();
640+
const auto & group_node = metadata->schema()->group_node();
641+
642+
std::unordered_map<Int64, String> parquet_field_ids;
643+
parquet_names_to_clickhouse = std::unordered_map<String, String>{};
644+
for (int i = 0; i < group_node->field_count(); ++i)
645+
parquet_field_ids[group_node->field(i)->field_id()] = group_node->field(i)->name();
646+
647+
auto result = parser_group->column_mapper->makeMapping(header, parquet_field_ids);
648+
clickhouse_to_parquet_names = std::move(result.first);
649+
parquet_names_to_clickhouse = std::move(result.second);
650+
}
651+
auto index_mapping = field_util.findRequiredIndices(getPort().getHeader(), *schema, *metadata, clickhouse_to_parquet_names);
636652

637653
for (const auto & [clickhouse_header_index, parquet_indexes] : index_mapping)
638654
{
@@ -848,7 +864,8 @@ void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_bat
848864
reader_properties,
849865
arrow_file,
850866
format_settings,
851-
row_group_batch.row_groups_idxs);
867+
row_group_batch.row_groups_idxs,
868+
column_indices);
852869
}
853870
else
854871
{
@@ -875,6 +892,7 @@ void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_bat
875892
getPort().getHeader(),
876893
"Parquet",
877894
format_settings,
895+
parquet_names_to_clickhouse,
878896
format_settings.parquet.allow_missing_columns,
879897
format_settings.null_as_default,
880898
format_settings.date_time_overflow_behavior,

0 commit comments

Comments
 (0)