Skip to content

Commit

Permalink
Merge pull request duckdb#147 from samansmink/forward-partition-aware…
Browse files Browse the repository at this point in the history
…ness

Forward partition awareness
  • Loading branch information
samansmink authored Feb 7, 2025
2 parents efc4522 + 413f7c9 commit ba2179d
Show file tree
Hide file tree
Showing 13 changed files with 681 additions and 107 deletions.
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ include_directories(src/include)
set(EXTENSION_SOURCES
src/delta_extension.cpp
src/delta_functions.cpp
src/delta_macros.cpp
src/delta_utils.cpp
src/functions/delta_scan.cpp
src/functions/expression_functions.cpp
Expand Down Expand Up @@ -140,7 +141,7 @@ ExternalProject_Add(
# the c++ headers. Currently, when bumping the kernel version, the produced
# header in ./src/include/delta_kernel_ffi.hpp should be also bumped, applying
# the fix
GIT_TAG v0.6.0
GIT_TAG v0.6.1
# Prints the env variables passed to the cargo build to the terminal, useful
# in debugging because passing them through CMake is an error-prone mess
CONFIGURE_COMMAND ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS}
Expand Down
9 changes: 9 additions & 0 deletions scripts/generate_test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ def generate_test_data_pyspark(name, current_path, input_path, delete_predicate
query = "CREATE table test_table AS SELECT i, i%2 as part from range(0,10) tbl(i);"
generate_test_data_delta_rs("simple_partitioned", query, "part")

### Simple partitioned table
query = "CREATE table test_table AS SELECT i, i%20 as part from range(0,10000) tbl(i);"
generate_test_data_delta_rs("simple_partitioned_large", query, "part")

### Lineitem SF0.01 No partitions
query = "call dbgen(sf=0.01);"
query += "CREATE table test_table AS SELECT * as part from lineitem;"
Expand All @@ -183,6 +187,11 @@ def generate_test_data_pyspark(name, current_path, input_path, delete_predicate
query = f"CREATE table test_table as select i::{type} as value1, (i)::{type} as value2, (i)::{type} as value3, i::{type} as part from range(0,5) tbl(i)"
generate_test_data_delta_rs(f"test_file_skipping/{type}", query, "part")

## Partitioned table with all types we can file skip on
for type in ["int"]:
query = f"CREATE table test_table as select i::{type}+10 as value1, (i)::{type}+100 as value2, (i)::{type}+1000 as value3, i::{type} as part from range(0,5) tbl(i)"
generate_test_data_delta_rs(f"test_file_skipping_2/{type}", query, "part")

## Simple table with deletion vector
con = duckdb.connect()
con.query(f"COPY (SELECT i as id, ('val' || i::VARCHAR) as value FROM range(0,1000000) tbl(i))TO '{TMP_PATH}/simple_sf1_with_dv.parquet'")
Expand Down
9 changes: 7 additions & 2 deletions src/delta_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

#include "delta_utils.hpp"
#include "delta_functions.hpp"
#include "delta_macros.hpp"
#include "storage/delta_catalog.hpp"
#include "storage/delta_transaction_manager.hpp"

#include "duckdb.hpp"
#include "duckdb/common/exception.hpp"
#include "duckdb/function/table_macro_function.hpp"
#include "duckdb/main/extension_util.hpp"
#include "duckdb/storage/storage_extension.hpp"
#include "storage/delta_catalog.hpp"
#include "storage/delta_transaction_manager.hpp"
#include "duckdb/main/config.hpp"

namespace duckdb {
Expand Down Expand Up @@ -71,6 +74,8 @@ static void LoadInternal(DatabaseInstance &instance) {
"performance even with DuckDB logging disabled.",
LogicalType::BOOLEAN, Value(false), LoggerCallback::DuckDBSettingCallBack);

DeltaMacros::RegisterMacros(instance);

LoggerCallback::Initialize(instance);
}

Expand Down
94 changes: 94 additions & 0 deletions src/delta_macros.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#include "delta_macros.hpp"

#include "duckdb.hpp"
#include "duckdb/main/extension_util.hpp"
#include "duckdb/parser/parsed_data/create_scalar_function_info.hpp"
#include "duckdb/parser/parser.hpp"
#include "duckdb/function/table_macro_function.hpp"
#include "duckdb/parser/parsed_data/create_macro_info.hpp"
#include "duckdb/catalog/default/default_functions.hpp"

namespace duckdb {

// Macro to fetch the pushed down filters for the most recent query
static constexpr auto DELTA_FILTER_PUSHDOWN_MACRO = R"(
SELECT
l1.message as query_string,
parse_delta_filter_logline(l2.message)['type'] as filter_type,
parse_delta_filter_logline(l2.message)['files_before'] as files_before,
parse_delta_filter_logline(l2.message)['files_after'] as files_after,
parse_delta_filter_logline(l2.message)['filters_before'] as filters_before,
parse_delta_filter_logline(l2.message)['filters_after'] as filters_after,
parse_delta_filter_logline(l2.message)['path'] as path
FROM duckdb_logs as l1
JOIN duckdb_logs as l2 ON
l1.transaction_id = l2.transaction_id
WHERE
l2.type='delta.FilterPushdown' AND
l1.type = 'duckdb.ClientContext.BeginQuery'
ORDER BY l1.transaction_id
)";

static constexpr auto DELTA_FILTER_PUSHDOWN_MACRO_TPCDS = R"(
SELECT
query_nr as tpcds_query,
filter_type,
files_before,
files_after,
filters_before,
filters_after,
path
FROM
delta_filter_pushdown_log()
JOIN
tpcds_queries() as tpcds_queries on tpcds_queries."query"=query_string
)";

void DeltaMacros::RegisterTableMacro(DatabaseInstance &db, const string &name, const string &query,
const vector<string> &params, const child_list_t<Value> &named_params) {
Parser parser;
parser.ParseQuery(query);
const auto &stmt = parser.statements.back();
auto &node = stmt->Cast<SelectStatement>().node;

auto func = make_uniq<TableMacroFunction>(std::move(node));
for (auto &param : params) {
func->parameters.push_back(make_uniq<ColumnRefExpression>(param));
}

for (auto &param : named_params) {
func->default_parameters[param.first] = make_uniq<ConstantExpression>(param.second);
}

CreateMacroInfo info(CatalogType::TABLE_MACRO_ENTRY);
info.schema = DEFAULT_SCHEMA;
info.name = name;
info.temporary = true;
info.internal = true;
info.macros.push_back(std::move(func));

ExtensionUtil::RegisterFunction(db, info);
}

static DefaultMacro delta_macros[] = {
{DEFAULT_SCHEMA,
"parse_delta_filter_logline",
{"x", nullptr},
{{nullptr, nullptr}},
"x::STRUCT(path VARCHAR, type VARCHAR, filters_before VARCHAR[], filters_after VARCHAR[], files_before BIGINT, "
"files_after BIGINT)"},
};

void DeltaMacros::RegisterMacros(DatabaseInstance &instance) {
// Register Regular macros
for (auto &macro : delta_macros) {
auto info = DefaultFunctionGenerator::CreateInternalMacroInfo(macro);
ExtensionUtil::RegisterFunction(instance, *info);
}

// Register Table Macros
RegisterTableMacro(instance, "delta_filter_pushdown_log", DELTA_FILTER_PUSHDOWN_MACRO, {}, {});
RegisterTableMacro(instance, "delta_filter_pushdown_log_tpcds", DELTA_FILTER_PUSHDOWN_MACRO_TPCDS, {}, {});
}

}; // namespace duckdb
28 changes: 14 additions & 14 deletions src/delta_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ vector<bool> KernelUtils::FromDeltaBoolSlice(const struct ffi::KernelBoolSlice s
return result;
}

PredicateVisitor::PredicateVisitor(const vector<string> &column_names, optional_ptr<TableFilterSet> filters) {
PredicateVisitor::PredicateVisitor(const vector<string> &column_names, optional_ptr<const TableFilterSet> filters) {
predicate = this;
visitor = (uintptr_t(*)(void *, ffi::KernelExpressionVisitorState *)) & VisitPredicate;

Expand Down Expand Up @@ -587,7 +587,7 @@ uintptr_t PredicateVisitor::VisitConstantFilter(const string &col_name, const Co
return visit_expression_eq(state, left, right);

default:
std::cout << " Unsupported operation: " << (int)filter.comparison_type << std::endl;
// TODO: add more types
return ~0; // Unsupported operation
}
}
Expand Down Expand Up @@ -644,24 +644,24 @@ void LoggerCallback::Initialize(DatabaseInstance &db_p) {
}

static string ConvertLogMessage(ffi::Event event) {
auto log_type = KernelUtils::FromDeltaString(event.target);
auto message = KernelUtils::FromDeltaString(event.message);
auto file = KernelUtils::FromDeltaString(event.file);
string constructed_log_message;
if (!file.empty()) {
constructed_log_message = StringUtil::Format("[%s] %s@%u : %s ", log_type, file, event.line, message);
} else {
constructed_log_message = message;
}

return constructed_log_message;
auto log_type = KernelUtils::FromDeltaString(event.target);
auto message = KernelUtils::FromDeltaString(event.message);
auto file = KernelUtils::FromDeltaString(event.file);
string constructed_log_message;
if (!file.empty()) {
constructed_log_message = StringUtil::Format("[%s] %s@%u : %s ", log_type, file, event.line, message);
} else {
constructed_log_message = message;
}

return constructed_log_message;
}
void LoggerCallback::CallbackEvent(ffi::Event event) {
auto &instance = GetInstance();
auto db_locked = instance.db.lock();
if (db_locked) {
auto transformed_log_level = GetDuckDBLogLevel(event.level);
DUCKDB_LOG( *db_locked, "delta.Kernel", transformed_log_level, ConvertLogMessage(event));
DUCKDB_LOG(*db_locked, "delta.Kernel", transformed_log_level, ConvertLogMessage(event));
}
}

Expand Down
Loading

0 comments on commit ba2179d

Please sign in to comment.