Skip to content

Commit

Permalink
add workaround for duckdb#150
Browse files Browse the repository at this point in the history
  • Loading branch information
samansmink committed Feb 10, 2025
1 parent ba2179d commit ac600eb
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 48 deletions.
107 changes: 65 additions & 42 deletions src/functions/delta_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,49 +47,66 @@ string url_decode(string input) {
return result;
}

void DeltaSnapshot::VisitCallback(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size,
void DeltaSnapshot::VisitCallbackInternal(ffi::NullableCvoid engine_context, ffi::KernelStringSlice path, int64_t size,
const ffi::Stats *stats, const ffi::DvInfo *dv_info,
const ffi::CStringMap *partition_values) {
auto context = (ScanDataCallBackContext*) engine_context;
auto &snapshot = context->snapshot;

auto path_string = snapshot.GetPath();
StringUtil::RTrim(path_string, "/");
path_string += "/" + KernelUtils::FromDeltaString(path);

path_string = url_decode(path_string);

// First we append the file to our resolved files
snapshot.resolved_files.push_back(DeltaSnapshot::ToDuckDBPath(path_string));
snapshot.metadata.emplace_back(make_uniq<DeltaFileMetaData>());

D_ASSERT(snapshot.resolved_files.size() == snapshot.metadata.size());

// Initialize the file metadata
snapshot.metadata.back()->delta_snapshot_version = snapshot.version;
snapshot.metadata.back()->file_number = snapshot.resolved_files.size() - 1;
if (stats) {
snapshot.metadata.back()->cardinality = stats->num_records;
}

// Fetch the deletion vector
auto selection_vector_res =
ffi::selection_vector_from_dv(dv_info, snapshot.extern_engine.get(), snapshot.global_state.get());

// TODO: remove workaround for https://github.com/duckdb/duckdb-delta/issues/150
if (selection_vector_res.tag != ffi::ExternResult<ffi::KernelBoolSlice>::Tag::Err) {
auto selection_vector =
KernelUtils::UnpackResult(selection_vector_res, "selection_vector_from_dv for path " + snapshot.GetPath());
if (selection_vector.ptr) {
snapshot.metadata.back()->selection_vector = selection_vector;
}
}

// Lookup all columns for potential hits in the constant map
case_insensitive_map_t<string> constant_map;
for (const auto &col : snapshot.names) {
auto key = KernelUtils::ToDeltaString(col);
auto *partition_val = (string *)ffi::get_from_map(partition_values, key, allocate_string);
if (partition_val) {
constant_map[col] = *partition_val;
delete partition_val;
}
}
snapshot.metadata.back()->partition_map = std::move(constant_map);
}

void DeltaSnapshot::VisitCallback(ffi::NullableCvoid engine_context, ffi::KernelStringSlice path, int64_t size,
const ffi::Stats *stats, const ffi::DvInfo *dv_info,
const struct ffi::CStringMap *partition_values) {
auto context = (DeltaSnapshot *)engine_context;
auto path_string = context->GetPath();
StringUtil::RTrim(path_string, "/");
path_string += "/" + KernelUtils::FromDeltaString(path);

path_string = url_decode(path_string);

// First we append the file to our resolved files
context->resolved_files.push_back(DeltaSnapshot::ToDuckDBPath(path_string));
context->metadata.emplace_back(make_uniq<DeltaFileMetaData>());

D_ASSERT(context->resolved_files.size() == context->metadata.size());

// Initialize the file metadata
context->metadata.back()->delta_snapshot_version = context->version;
context->metadata.back()->file_number = context->resolved_files.size() - 1;
if (stats) {
context->metadata.back()->cardinality = stats->num_records;
}

// Fetch the deletion vector
auto selection_vector_res =
ffi::selection_vector_from_dv(dv_info, context->extern_engine.get(), context->global_state.get());
auto selection_vector =
KernelUtils::UnpackResult(selection_vector_res, "selection_vector_from_dv for path " + context->GetPath());
if (selection_vector.ptr) {
context->metadata.back()->selection_vector = selection_vector;
}

// Lookup all columns for potential hits in the constant map
case_insensitive_map_t<string> constant_map;
for (const auto &col : context->names) {
auto key = KernelUtils::ToDeltaString(col);
auto *partition_val = (string *)ffi::get_from_map(partition_values, key, allocate_string);
if (partition_val) {
constant_map[col] = *partition_val;
delete partition_val;
}
const ffi::CStringMap *partition_values) {
try {
return VisitCallbackInternal(engine_context, path, size, stats, dv_info, partition_values);
} catch (std::runtime_error &e) {
auto context = (ScanDataCallBackContext*) engine_context;
context->error = ErrorData(e);
}
context->metadata.back()->partition_map = std::move(constant_map);
}

void DeltaSnapshot::VisitData(void *engine_context, ffi::ExclusiveEngineData *engine_data,
Expand Down Expand Up @@ -455,8 +472,14 @@ string DeltaSnapshot::GetFileInternal(idx_t i) const {
return "";
}

ScanDataCallBackContext callback_context(*this);

while (i >= resolved_files.size()) {
auto have_scan_data_res = ffi::kernel_scan_data_next(scan_data_iterator.get(), (void *)this, VisitData);
auto have_scan_data_res = ffi::kernel_scan_data_next(scan_data_iterator.get(), &callback_context, VisitData);

if (callback_context.error.HasError()) {
callback_context.error.Throw();
}

auto have_scan_data = TryUnpackKernelResult(have_scan_data_res);

Expand Down
22 changes: 16 additions & 6 deletions src/include/functions/delta_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ struct DeltaSnapshot : public MultiFileList {
static void VisitCallback(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size,
const ffi::Stats *stats, const ffi::DvInfo *dv_info,
const struct ffi::CStringMap *partition_values);
static void VisitCallbackInternal(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size,
const ffi::Stats *stats, const ffi::DvInfo *dv_info,
const struct ffi::CStringMap *partition_values);

protected:
// Note: Nearly this entire class is mutable because it represents a lazily expanded list of files that is logically
Expand All @@ -118,17 +121,17 @@ struct DeltaSnapshot : public MultiFileList {
mutable bool initialized_scan = false;
mutable bool files_exhausted = false;

//! Metadata map for files
mutable vector<unique_ptr<DeltaFileMetaData>> metadata;

mutable vector<string> resolved_files;
mutable TableFilterSet table_filters;

//! Names
vector<string> names;
vector<LogicalType> types;
bool have_bound = false;

//! Metadata map for files
vector<unique_ptr<DeltaFileMetaData>> metadata;

vector<string> resolved_files;
TableFilterSet table_filters;

ClientContext &context;
};

Expand Down Expand Up @@ -191,4 +194,11 @@ struct DeltaMultiFileReader : public MultiFileReader {
shared_ptr<DeltaSnapshot> snapshot;
};

struct ScanDataCallBackContext {
explicit ScanDataCallBackContext(const DeltaSnapshot &snapshot_p) : snapshot(snapshot_p) {
}
const DeltaSnapshot &snapshot;
ErrorData error;
};

} // namespace duckdb

0 comments on commit ac600eb

Please sign in to comment.