[Backport release-2.27] Improve readers by parallelizing I/O and compute operations (#5401) (#5451)

Backport of #5401 to release-2.27

---
TYPE: IMPROVEMENT
DESC: Improve readers by parallelizing I/O and compute operations

---------

Co-authored-by: Seth Shelnutt <[email protected]>
Co-authored-by: Ypatia Tsavliri <[email protected]>
Co-authored-by: Seth Shelnutt <[email protected]>
4 people authored Feb 14, 2025
1 parent a2e10e1 commit 8652f02
Showing 23 changed files with 416 additions and 143 deletions.
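The common thread in these changes is that each tile now carries the I/O task that produces its data, so readers can overlap reads with decompression and query processing instead of serializing them. The snippet below is a minimal, self-contained sketch of that pattern only: std::async and std::shared_future stand in for TileDB's ThreadPool and ThreadPool::SharedTask, and the Tile struct and the summing step are illustrative, not TileDB code.

#include <algorithm>
#include <cstdint>
#include <future>
#include <iostream>
#include <numeric>
#include <vector>

// Illustrative stand-in: each tile carries the task that fills its buffer.
struct Tile {
  std::vector<uint8_t> data;
  std::shared_future<void> io_task;  // completes once `data` is populated
};

int main() {
  std::vector<Tile> tiles(4);

  // Kick off all reads up front; none of them blocks the caller.
  for (size_t i = 0; i < tiles.size(); ++i) {
    Tile& t = tiles[i];
    t.data.resize(1024);
    t.io_task = std::async(std::launch::async, [&t, i]() {
                  // Simulated I/O: fill the buffer with a per-tile value.
                  std::fill(t.data.begin(), t.data.end(), uint8_t(i + 1));
                }).share();
  }

  // Compute overlaps with the outstanding reads: each tile is waited on only
  // when its bytes are actually needed.
  uint64_t total = 0;
  for (const Tile& t : tiles) {
    t.io_task.wait();  // analogous to waiting on the tile's SharedTask
    total += std::accumulate(t.data.begin(), t.data.end(), uint64_t{0});
  }
  std::cout << "sum = " << total << "\n";
  return 0;
}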
5 changes: 4 additions & 1 deletion test/src/unit-ReadCellSlabIter.cc
@@ -183,7 +183,10 @@ void set_result_tile_dim(
std::nullopt,
std::nullopt,
std::nullopt);
ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
ResultTile::TileData tile_data{
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()}};
result_tile.init_coord_tile(
constants::format_version,
array_schema,
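In these test updates, ResultTile::TileData is no longer brace-initialized from three bare pointers; each buffer pointer is now paired with a ThreadPool::SharedTask, suggesting every tile buffer travels with the task that produces it so consumers can wait on the right task before dereferencing. The struct below is a hypothetical mirror of that shape for illustration only: the member names are invented, std::shared_future<void> stands in for ThreadPool::SharedTask, and the real definition lives in TileDB's result_tile.h.

#include <future>
#include <utility>

// Hypothetical mirror of the new aggregate shape, not TileDB's definition.
struct TileDataSketch {
  // Each buffer is paired with the task that makes it safe to read.
  std::pair<void*, std::shared_future<void>> fixed;     // fixed-size data
  std::pair<void*, std::shared_future<void>> var;       // var-size data
  std::pair<void*, std::shared_future<void>> validity;  // validity bytemap
};

// Mirrors the updated call sites: no data yet and default-constructed
// (not-yet-valid) tasks, as in {nullptr, ThreadPool::SharedTask()}.
inline TileDataSketch empty_tile_data() {
  return TileDataSketch{
      {nullptr, std::shared_future<void>()},
      {nullptr, std::shared_future<void>()},
      {nullptr, std::shared_future<void>()}};
}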
4 changes: 2 additions & 2 deletions test/src/unit-cppapi-consolidation-with-timestamps.cc
@@ -636,7 +636,7 @@ TEST_CASE_METHOD(

// Will only allow to load two tiles out of 3.
Config cfg;
cfg.set("sm.mem.total_budget", "30000");
cfg.set("sm.mem.total_budget", "50000");
cfg.set("sm.mem.reader.sparse_global_order.ratio_coords", "0.15");
ctx_ = Context(cfg);

@@ -685,7 +685,7 @@ TEST_CASE_METHOD(

// Will only allow to load two tiles out of 3.
Config cfg;
cfg.set("sm.mem.total_budget", "30000");
cfg.set("sm.mem.total_budget", "50000");
cfg.set("sm.mem.reader.sparse_global_order.ratio_coords", "0.15");
ctx_ = Context(cfg);

20 changes: 16 additions & 4 deletions test/src/unit-result-tile.cc
@@ -213,7 +213,10 @@ TEST_CASE_METHOD(
0,
std::nullopt,
std::nullopt);
ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
ResultTile::TileData tile_data{
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()}};
rt.init_coord_tile(
constants::format_version,
array_schema,
@@ -230,7 +233,10 @@ TEST_CASE_METHOD(
0,
std::nullopt,
std::nullopt);
ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
ResultTile::TileData tile_data{
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()}};
rt.init_coord_tile(
constants::format_version,
array_schema,
@@ -326,7 +332,10 @@ TEST_CASE_METHOD(
0,
std::nullopt,
std::nullopt);
ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
ResultTile::TileData tile_data{
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()}};
rt.init_coord_tile(
constants::format_version,
array_schema,
@@ -343,7 +352,10 @@ TEST_CASE_METHOD(
0,
std::nullopt,
std::nullopt);
ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
ResultTile::TileData tile_data{
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()}};
rt.init_coord_tile(
constants::format_version,
array_schema,
12 changes: 7 additions & 5 deletions test/src/unit-sparse-global-order-reader.cc
@@ -1993,9 +1993,10 @@ TEST_CASE_METHOD(
}

// FIXME: there is no per fragment budget anymore
// Two result tile (2 * (~3000 + 8) will be bigger than the per fragment
// budget (1000).
memory_.total_budget_ = "35000";
// Two result tiles (2 * (2842 + 8)) = 5700 will be bigger than the per
// fragment budget (50000 * 0.11 / 2 fragments = 2750), so only one result
// tile will be loaded each time.
memory_.total_budget_ = "60000";
memory_.ratio_coords_ = "0.11";
update_config();

@@ -2518,8 +2519,9 @@ TEST_CASE_METHOD(
}

// FIXME: there is no per fragment budget anymore
// Two result tile (2 * (~4000 + 8) will be bigger than the per fragment
// budget (1000).
// Two result tiles (2 * (2842 + 8)) = 5700 will be bigger than the per
// fragment budget (40000 * 0.22 /2 frag = 4400), so only one will be loaded
// each time.
memory_.total_budget_ = "40000";
memory_.ratio_coords_ = "0.22";
update_config();
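The rewritten comments in this file spell out the budget arithmetic: the coordinate budget is the total memory budget times sm.mem.reader.sparse_global_order.ratio_coords, split across the fragments, and a second result tile is only loaded if it still fits in that slice. The snippet below simply reproduces the numbers from the second comment (40000 * 0.22 / 2 = 4400 versus tiles of 2842 + 8 bytes); the formula is taken from the comments, and the standalone program itself is only an illustration.

#include <iostream>

int main() {
  const double total_budget = 40000;  // sm.mem.total_budget
  const double ratio_coords = 0.22;   // sm.mem.reader.sparse_global_order.ratio_coords
  const double fragments = 2;

  const double per_fragment = total_budget * ratio_coords / fragments;  // 4400
  const double one_tile = 2842 + 8;                                     // 2850
  const double two_tiles = 2 * one_tile;                                // 5700

  std::cout << "per-fragment coordinate budget: " << per_fragment << "\n"
            << "one tile fits:  " << (one_tile <= per_fragment) << "\n"   // 1 (true)
            << "two tiles fit:  " << (two_tiles <= per_fragment) << "\n"; // 0 (false)
  return 0;
}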
7 changes: 5 additions & 2 deletions test/src/unit-sparse-unordered-with-dups-reader.cc
@@ -1064,9 +1064,12 @@ TEST_CASE_METHOD(

if (one_frag) {
CHECK(1 == loop_num->second);
} else {
CHECK(9 == loop_num->second);
}
/**
* We can't do a similar check for multiple fragments as it is architecture
* dependent how many tiles fit in the memory budget. And thus also
* architecture dependent as to how many internal loops we have.
*/

// Try to read multiple frags without partial tile offset reading. Should
// fail
2 changes: 1 addition & 1 deletion tiledb/sm/filter/compression_filter.cc
@@ -636,7 +636,7 @@ Status CompressionFilter::decompress_var_string_coords(
auto output_view = span<std::byte>(
reinterpret_cast<std::byte*>(output_buffer->data()), uncompressed_size);
auto offsets_view = span<uint64_t>(
offsets_tile->data_as<offsets_t>(), uncompressed_offsets_size);
offsets_tile->data_as_unsafe<offsets_t>(), uncompressed_offsets_size);

if (compressor_ == Compressor::RLE) {
uint8_t rle_len_bytesize, string_len_bytesize;
4 changes: 2 additions & 2 deletions tiledb/sm/filter/filter_pipeline.cc
@@ -464,7 +464,7 @@ Status FilterPipeline::run_reverse(
// If the pipeline is empty, just copy input to output.
if (filters_.empty()) {
void* output_chunk_buffer =
tile->data_as<char>() + chunk_data.chunk_offsets_[i];
tile->data_as_unsafe<char>() + chunk_data.chunk_offsets_[i];
RETURN_NOT_OK(input_data.copy_to(output_chunk_buffer));
continue;
}
@@ -487,7 +487,7 @@
bool last_filter = filter_idx == 0;
if (last_filter) {
void* output_chunk_buffer =
tile->data_as<char>() + chunk_data.chunk_offsets_[i];
tile->data_as_unsafe<char>() + chunk_data.chunk_offsets_[i];
RETURN_NOT_OK(output_data.set_fixed_allocation(
output_chunk_buffer, chunk.unfiltered_data_size_));
reader_stats->add_counter(
3 changes: 2 additions & 1 deletion tiledb/sm/filter/test/filter_test_support.cc
@@ -203,7 +203,8 @@ Tile create_tile_for_unfiltering(
tile->cell_size() * nelts,
tile->filtered_buffer().data(),
tile->filtered_buffer().size(),
tracker};
tracker,
std::nullopt};
}

void run_reverse(
3 changes: 2 additions & 1 deletion tiledb/sm/filter/test/tile_data_generator.h
@@ -99,7 +99,8 @@ class TileDataGenerator {
original_tile_size(),
filtered_buffer.data(),
filtered_buffer.size(),
memory_tracker);
memory_tracker,
std::nullopt);
}

/** Returns the size of the original unfiltered data. */
9 changes: 6 additions & 3 deletions tiledb/sm/metadata/test/unit_metadata.cc
@@ -123,7 +123,8 @@ TEST_CASE(
tile1->size(),
tile1->filtered_buffer().data(),
tile1->filtered_buffer().size(),
tracker);
tracker,
ThreadPool::SharedTask());
memcpy(metadata_tiles[0]->data(), tile1->data(), tile1->size());

metadata_tiles[1] = tdb::make_shared<Tile>(
@@ -135,7 +136,8 @@ TEST_CASE(
tile2->size(),
tile2->filtered_buffer().data(),
tile2->filtered_buffer().size(),
tracker);
tracker,
ThreadPool::SharedTask());
memcpy(metadata_tiles[1]->data(), tile2->data(), tile2->size());

metadata_tiles[2] = tdb::make_shared<Tile>(
@@ -147,7 +149,8 @@ TEST_CASE(
tile3->size(),
tile3->filtered_buffer().data(),
tile3->filtered_buffer().size(),
tracker);
tracker,
ThreadPool::SharedTask());
memcpy(metadata_tiles[2]->data(), tile3->data(), tile3->size());

meta = Metadata::deserialize(metadata_tiles);
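The test-support and metadata-test changes above all append one argument to the Tile constructor call: either std::nullopt or a default-constructed ThreadPool::SharedTask, which suggests the constructor now accepts the (possibly absent) task that produces the filtered buffer. The class below is a hypothetical mirror of that calling convention, for illustration only; the names are invented and std::shared_future<void> again stands in for ThreadPool::SharedTask.

#include <cstdint>
#include <future>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical mirror of the widened constructor, not TileDB's Tile.
class TileSketch {
 public:
  TileSketch(
      std::vector<uint8_t> filtered_data,
      std::optional<std::shared_future<void>> filtered_data_io_task)
      : filtered_data_(std::move(filtered_data)),
        io_task_(std::move(filtered_data_io_task)) {
  }

  // Callers block on the pending read (if any) before touching the bytes.
  const std::vector<uint8_t>& filtered_data() const {
    if (io_task_ && io_task_->valid()) {
      io_task_->wait();
    }
    return filtered_data_;
  }

 private:
  std::vector<uint8_t> filtered_data_;
  std::optional<std::shared_future<void>> io_task_;
};

// Synchronous call site, analogous to passing std::nullopt in the tests.
inline TileSketch make_sync_tile() {
  return TileSketch({1, 2, 3}, std::nullopt);
}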
59 changes: 32 additions & 27 deletions tiledb/sm/query/readers/dense_reader.cc
@@ -453,6 +453,9 @@ Status DenseReader::dense_read() {
// processing.
if (qc_coords_mode_) {
t_start = t_end;
if (compute_task.valid()) {
throw_if_not_ok(compute_task.wait());
}
continue;
}

Expand Down Expand Up @@ -769,8 +772,8 @@ DenseReader::compute_result_space_tiles(
const auto fragment_num = (unsigned)frag_tile_domains.size();
const auto& tile_coords = subarray.tile_coords();

// Keep track of the required memory to load the result space tiles. Split up
// filtered versus unfiltered. The memory budget is combined for all
// Keep track of the required memory to load the result space tiles. Split
// up filtered versus unfiltered. The memory budget is combined for all
// query condition attributes.
uint64_t required_memory_query_condition_unfiltered = 0;
std::vector<uint64_t> required_memory_unfiltered(
@@ -786,28 +789,28 @@
aggregate_only_field[n - condition_names.size()] = aggregate_only(name);
}

// Here we estimate the size of the tile structures. First, we have to account
// the size of the space tile structure. We could go deeper in the class to
// account for other things but for now we keep it simpler. Second, we try to
// account for the tile subarray (DenseTileSubarray). This class will have a
// vector of ranges per dimensions, so 1 + dim_num * sizeof(vector). Here we
// choose 32 for the size of the vector to anticipate the conversion to a PMR
// vector. We also add dim_num * 2 * sizeof(DimType) to account for at least
// one range per dimension (this should be improved by accounting for the
// exact number of ranges). Finally for the original range index member, we
// have to add 1 + dim_num * sizeof(vector) as well and one uint64_t per
// dimension (this can also be improved by accounting for the
// exact number of ranges).
// Here we estimate the size of the tile structures. First, we have to
// account the size of the space tile structure. We could go deeper in the
// class to account for other things but for now we keep it simpler. Second,
// we try to account for the tile subarray (DenseTileSubarray). This class
// will have a vector of ranges per dimensions, so 1 + dim_num *
// sizeof(vector). Here we choose 32 for the size of the vector to
// anticipate the conversion to a PMR vector. We also add dim_num * 2 *
// sizeof(DimType) to account for at least one range per dimension (this
// should be improved by accounting for the exact number of ranges). Finally
// for the original range index member, we have to add 1 + dim_num *
// sizeof(vector) as well and one uint64_t per dimension (this can also be
// improved by accounting for the exact number of ranges).
uint64_t est_tile_structs_size =
sizeof(ResultSpaceTile<DimType>) + (1 + dim_num) * 2 * 32 +
dim_num * (2 * sizeof(DimType) + sizeof(uint64_t));

// Create the vector of result tiles to operate on. We stop once we reach
// the end or the memory budget. We either reach the tile upper memory limit,
// which is only for unfiltered data, or the limit of the available budget,
// which is for filtered data, unfiltered data and the tile structs. We try to
// process two tile batches at a time so the available memory is half of what
// we have available.
// the end or the memory budget. We either reach the tile upper memory
// limit, which is only for unfiltered data, or the limit of the available
// budget, which is for filtered data, unfiltered data and the tile structs.
// We try to process two tile batches at a time so the available memory is
// half of what we have available.
uint64_t t_end = t_start;
bool wait_compute_task_before_read = false;
bool done = false;
@@ -895,8 +898,8 @@
uint64_t tile_memory_filtered = 0;
uint64_t r_idx = n - condition_names.size();

// We might not need to load this tile into memory at all for aggregation
// only.
// We might not need to load this tile into memory at all for
// aggregation only.
if (aggregate_only_field[r_idx] &&
can_aggregate_tile_with_frag_md(
names[n], result_space_tile, tiles_cell_num[t_end])) {
@@ -953,13 +956,14 @@
required_memory_unfiltered[r_idx] +
est_tile_structs_size;

// Disable the multiple iterations if the tiles don't fit in the iteration
// budget.
// Disable the multiple iterations if the tiles don't fit in the
// iteration budget.
if (total_memory > available_memory_iteration) {
wait_compute_task_before_read = true;
}

// If a single tile doesn't fit in the available memory, we can't proceed.
// If a single tile doesn't fit in the available memory, we can't
// proceed.
if (total_memory > available_memory) {
throw DenseReaderException(
"Cannot process a single tile requiring " +
@@ -1003,7 +1007,8 @@ std::vector<ResultTile*> DenseReader::result_tiles_to_load(
const auto& tile_coords = subarray.tile_coords();
const bool agg_only = name.has_value() && aggregate_only(name.value());

// If the result is already loaded in query condition, return the empty list;
// If the result is already loaded in query condition, return the empty
// list;
std::vector<ResultTile*> ret;
if (name.has_value() && condition_names.count(name.value()) != 0) {
return ret;
@@ -1033,8 +1038,8 @@

/**
* Apply the query condition. The computation will be pushed on the compute
* thread pool in `compute_task`. Callers should wait on this task before using
* the results of the query condition.
* thread pool in `compute_task`. Callers should wait on this task before
* using the results of the query condition.
*/
template <class DimType, class OffType>
Status DenseReader::apply_query_condition(
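Two of the dense_reader.cc changes carry the concurrency contract: dense_read() now waits on compute_task (when valid) before skipping to the next iteration, and the apply_query_condition comment states that callers should wait on the task before using its results. Below is a minimal sketch of that guard, again with std::shared_future standing in for ThreadPool::SharedTask and a placeholder body in place of the real query-condition work (the actual code wraps the wait in throw_if_not_ok because it returns a Status).

#include <future>
#include <iostream>

int main() {
  std::shared_future<void> compute_task;  // default-constructed: not valid yet

  for (int batch = 0; batch < 3; ++batch) {
    // Drain any in-flight work before reusing buffers or skipping a batch,
    // mirroring the new `if (compute_task.valid()) wait()` guard.
    if (compute_task.valid()) {
      compute_task.wait();
    }
    // Push the next batch of compute onto a background thread.
    compute_task = std::async(std::launch::async, [batch]() {
                     std::cout << "processing batch " << batch << "\n";
                   }).share();
  }

  if (compute_task.valid()) {
    compute_task.wait();  // drain the last batch before returning
  }
  return 0;
}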