Skip to content

Commit 02760e6

Browse files
committed
Get object sizes based on S3's ListObjects output (just with scan_object_sizes() to start with)
8560764974
1 parent 520919c commit 02760e6

18 files changed

+318
-99
lines changed

cpp/arcticdb/async/async_store.hpp

+24
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,30 @@ void iterate_type(
223223
library_->iterate_type(type, func, prefix);
224224
}
225225

226+
folly::Future<storage::ObjectSizes> get_object_sizes(KeyType type, const std::string& prefix) override {
227+
if (library_->supports_object_size_calculation()) {
228+
// The library has native support for some kind of clever size calculation, so let it take over
229+
return async::submit_io_task(ObjectSizesTask{type, prefix, library_});
230+
}
231+
232+
// No native support for a clever size calculation, so just read keys and sum their sizes
233+
auto counter = std::make_shared<std::atomic_uint64_t>(0);
234+
auto bytes = std::make_shared<std::atomic_uint64_t>(0);
235+
KeySizeCalculators key_size_calculators;
236+
iterate_type(type, [&key_size_calculators, &counter, &bytes](const VariantKey&& k) {
237+
key_size_calculators.emplace_back(std::forward<const VariantKey>(k), [counter, bytes] (auto&& ks) {
238+
counter->fetch_add(1);
239+
auto key_seg = std::forward<decltype(ks)>(ks);
240+
auto compressed_size = key_seg.segment().size();
241+
bytes->fetch_add(compressed_size);
242+
return key_seg.variant_key();
243+
});
244+
}, prefix);
245+
246+
read_ignoring_key_not_found(std::move(key_size_calculators));
247+
return folly::makeFuture(storage::ObjectSizes{type, *counter, *bytes});
248+
}
249+
226250
bool scan_for_matching_key(
227251
KeyType key_type, const IterateTypePredicate& predicate) override {
228252
return library_->scan_for_matching_key(key_type, predicate);

cpp/arcticdb/async/tasks.hpp

+24
Original file line numberDiff line numberDiff line change
@@ -665,4 +665,28 @@ struct RemoveBatchTask : BaseTask {
665665
}
666666
};
667667

668+
struct ObjectSizesTask : BaseTask {
669+
KeyType type_;
670+
std::string prefix_;
671+
std::shared_ptr<storage::Library> lib_;
672+
673+
ObjectSizesTask(
674+
KeyType type,
675+
std::string prefix,
676+
std::shared_ptr<storage::Library> lib
677+
) :
678+
type_(type),
679+
prefix_(std::move(prefix)),
680+
lib_(std::move(lib)) {
681+
ARCTICDB_DEBUG(log::storage(), "Creating object sizes task for key type {} prefix {}", type_, prefix_);
682+
}
683+
684+
ARCTICDB_MOVE_ONLY_DEFAULT(ObjectSizesTask)
685+
686+
folly::Future<storage::ObjectSizes> operator()() {
687+
util::check(lib_->supports_object_size_calculation(), "ObjectSizesBytesTask should only be used with storages"
688+
" that natively support size calculation");
689+
return lib_->get_object_sizes(type_, prefix_);
690+
}
691+
};
668692
}

cpp/arcticdb/storage/library.hpp

+9
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,15 @@ class Library {
7070
storages_->iterate_type(key_type, visitor, prefix);
7171
}
7272

73+
bool supports_object_size_calculation() {
74+
return storages_->supports_object_size_calculation();
75+
}
76+
77+
ObjectSizes get_object_sizes(KeyType type, const std::string& prefix) {
78+
ARCTICDB_SAMPLE(GetObjectSizes, 0)
79+
return storages_->get_object_sizes(type, prefix);
80+
}
81+
7382
/**
7483
* Scan through every key of the given type until one matches the predicate.
7584
*

cpp/arcticdb/storage/s3/detail-inl.hpp

+82-16
Original file line numberDiff line numberDiff line change
@@ -405,23 +405,30 @@ void do_update_impl(
405405
do_write_impl(std::move(kvs), root_folder, bucket_name, s3_client, std::forward<KeyBucketizer>(bucketizer));
406406
}
407407

408-
inline auto default_prefix_handler() {
408+
inline PrefixHandler default_prefix_handler() {
409409
return [](const std::string& prefix, const std::string& key_type_dir, const KeyDescriptor& key_descriptor, KeyType) {
410410
return !prefix.empty() ? fmt::format("{}/{}*{}", key_type_dir, key_descriptor, prefix) : key_type_dir;
411411
};
412412
}
413413

414-
template<class KeyBucketizer, class PrefixHandler>
415-
bool do_iterate_type_impl(
416-
KeyType key_type,
417-
const IterateTypePredicate& visitor,
414+
struct PathInfo {
415+
PathInfo(std::string prefix, std::string key_type_dir, size_t path_to_key_size) :
416+
key_prefix_(std::move(prefix)), key_type_dir_(std::move(key_type_dir)), path_to_key_size_(path_to_key_size) {
417+
418+
}
419+
420+
std::string key_prefix_;
421+
std::string key_type_dir_;
422+
size_t path_to_key_size_;
423+
};
424+
425+
template<class KeyBucketizer>
426+
PathInfo calculate_path_info(
418427
const std::string& root_folder,
419-
const std::string& bucket_name,
420-
const S3ClientInterface& s3_client,
421-
KeyBucketizer&& bucketizer,
422-
PrefixHandler&& prefix_handler = default_prefix_handler(),
423-
const std::string& prefix = std::string{}) {
424-
ARCTICDB_SAMPLE(S3StorageIterateType, 0)
428+
KeyType key_type,
429+
const PrefixHandler& prefix_handler,
430+
const std::string& prefix,
431+
KeyBucketizer&& bucketizer) {
425432
auto key_type_dir = key_type_folder(root_folder, key_type);
426433
const auto path_to_key_size = key_type_dir.size() + 1 + bucketizer.bucketize_length(key_type);
427434
// if prefix is empty, add / to avoid matching both 'log' and 'logc' when key_type_dir is {root_folder}/log
@@ -438,19 +445,36 @@ bool do_iterate_type_impl(
438445
: IndexDescriptorImpl::Type::TIMESTAMP,
439446
FormatType::TOKENIZED);
440447
auto key_prefix = prefix_handler(prefix, key_type_dir, key_descriptor, key_type);
441-
ARCTICDB_RUNTIME_DEBUG(log::storage(), "Searching for objects in bucket {} with prefix {}", bucket_name,
442-
key_prefix);
448+
449+
return {key_prefix, key_type_dir, path_to_key_size};
450+
}
451+
452+
template<class KeyBucketizer>
453+
bool do_iterate_type_impl(
454+
KeyType key_type,
455+
const IterateTypePredicate& visitor,
456+
const std::string& root_folder,
457+
const std::string& bucket_name,
458+
const S3ClientInterface& s3_client,
459+
KeyBucketizer&& bucketizer,
460+
const PrefixHandler& prefix_handler = default_prefix_handler(),
461+
const std::string& prefix = std::string{}) {
462+
ARCTICDB_SAMPLE(S3StorageIterateType, 0)
463+
464+
auto path_info = calculate_path_info(root_folder, key_type, prefix_handler, prefix, std::move(bucketizer));
465+
ARCTICDB_RUNTIME_DEBUG(log::storage(), "Iterating over objects in bucket {} with prefix {}", bucket_name,
466+
path_info.key_prefix_);
443467

444468
auto continuation_token = std::optional<std::string>();
445469
do {
446-
auto list_objects_result = s3_client.list_objects(key_prefix, bucket_name, continuation_token);
470+
auto list_objects_result = s3_client.list_objects(path_info.key_prefix_, bucket_name, continuation_token);
447471
if (list_objects_result.is_success()) {
448472
auto& output = list_objects_result.get_output();
449473

450474
ARCTICDB_RUNTIME_DEBUG(log::storage(), "Received object list");
451475

452476
for (auto& s3_object_name : output.s3_object_names) {
453-
auto key = s3_object_name.substr(path_to_key_size);
477+
auto key = s3_object_name.substr(path_info.path_to_key_size_);
454478
ARCTICDB_TRACE(log::version(), "Got object_list: {}, key: {}", s3_object_name, key);
455479
auto k = variant_key_from_bytes(
456480
reinterpret_cast<uint8_t *>(key.data()),
@@ -474,13 +498,55 @@ bool do_iterate_type_impl(
474498
error.GetMessage().c_str());
475499
// We don't raise on expected errors like NoSuchKey because we want to return an empty list
476500
// instead of raising.
477-
raise_if_unexpected_error(error, key_prefix);
501+
raise_if_unexpected_error(error, path_info.key_prefix_);
478502
return false;
479503
}
480504
} while (continuation_token.has_value());
481505
return false;
482506
}
483507

508+
template<class KeyBucketizer>
509+
ObjectSizes do_calculate_sizes_for_type_impl(
510+
KeyType key_type,
511+
const std::string& root_folder,
512+
const std::string& bucket_name,
513+
const S3ClientInterface& s3_client,
514+
KeyBucketizer&& bucketizer,
515+
const PrefixHandler& prefix_handler = default_prefix_handler(),
516+
const std::string& prefix = std::string{}) {
517+
ARCTICDB_SAMPLE(S3StorageCalculateSizesForType, 0)
518+
519+
auto path_info = calculate_path_info(root_folder, key_type, prefix_handler, prefix, std::move(bucketizer));
520+
ARCTICDB_RUNTIME_DEBUG(log::storage(), "Calculating sizes for objects in bucket {} with prefix {}", bucket_name,
521+
path_info.key_prefix_);
522+
523+
auto continuation_token = std::optional<std::string>();
524+
ObjectSizes res{key_type};
525+
do {
526+
auto list_objects_result = s3_client.list_objects(path_info.key_prefix_, bucket_name, continuation_token);
527+
if (list_objects_result.is_success()) {
528+
auto& output = list_objects_result.get_output();
529+
530+
ARCTICDB_RUNTIME_DEBUG(log::storage(), "Received object list");
531+
532+
for (auto& s3_object_size : output.s3_object_sizes) {
533+
res.count_ += 1;
534+
res.compressed_size_bytes_ += s3_object_size;
535+
}
536+
continuation_token = output.next_continuation_token;
537+
} else {
538+
const auto& error = list_objects_result.get_error();
539+
log::storage().warn("Failed to iterate key type with key '{}' {}: {}",
540+
key_type,
541+
error.GetExceptionName().c_str(),
542+
error.GetMessage().c_str());
543+
raise_if_unexpected_error(error, path_info.key_prefix_);
544+
}
545+
} while (continuation_token.has_value());
546+
547+
return res;
548+
}
549+
484550
template<class KeyBucketizer>
485551
bool do_key_exists_impl(
486552
const VariantKey& key,

cpp/arcticdb/storage/s3/nfs_backed_storage.cpp

+18-10
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,16 @@ void NfsBackedStorage::do_remove(std::span<VariantKey> variant_keys, RemoveOpts)
197197
s3::detail::do_remove_impl(std::span(enc), root_folder_, bucket_name_, *s3_client_, NfsBucketizer{});
198198
}
199199

200+
static std::string prefix_handler(const std::string& prefix, const std::string& key_type_dir, const KeyDescriptor&, KeyType key_type) {
201+
std::string new_prefix;
202+
if(!prefix.empty()) {
203+
uint32_t id = get_id_bucket(encode_item<StreamId, StringId, NumericId>(StringId{prefix}, is_ref_key_class(key_type)));
204+
new_prefix = fmt::format("{:03}", id);
205+
}
206+
207+
return !prefix.empty() ? fmt::format("{}/{}", key_type_dir, new_prefix) : key_type_dir;
208+
}
209+
200210
bool NfsBackedStorage::do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string& prefix) {
201211
const IterateTypePredicate func = [&v = visitor, prefix=prefix] (VariantKey&& k) {
202212
auto key = unencode_object_id(k);
@@ -207,16 +217,6 @@ bool NfsBackedStorage::do_iterate_type_until_match(KeyType key_type, const Itera
207217
}
208218
};
209219

210-
auto prefix_handler = [] (const std::string& prefix, const std::string& key_type_dir, const KeyDescriptor&, KeyType key_type) {
211-
std::string new_prefix;
212-
if(!prefix.empty()) {
213-
uint32_t id = get_id_bucket(encode_item<StreamId, StringId, NumericId>(StringId{prefix}, is_ref_key_class(key_type)));
214-
new_prefix = fmt::format("{:03}", id);
215-
}
216-
217-
return !prefix.empty() ? fmt::format("{}/{}", key_type_dir, new_prefix) : key_type_dir;
218-
};
219-
220220
return s3::detail::do_iterate_type_impl(key_type, func, root_folder_, bucket_name_, *s3_client_, NfsBucketizer{}, prefix_handler, prefix);
221221
}
222222

@@ -225,4 +225,12 @@ bool NfsBackedStorage::do_key_exists(const VariantKey& key) {
225225
return s3::detail::do_key_exists_impl(encoded_key, root_folder_, bucket_name_, *s3_client_, NfsBucketizer{});
226226
}
227227

228+
bool NfsBackedStorage::supports_object_size_calculation() const {
229+
return true;
230+
}
231+
232+
ObjectSizes NfsBackedStorage::do_get_object_sizes(KeyType key_type, const std::string& prefix) {
233+
return s3::detail::do_calculate_sizes_for_type_impl(key_type, root_folder_, bucket_name_, *s3_client_, NfsBucketizer{}, prefix_handler, prefix);
234+
}
235+
228236
} //namespace arcticdb::storage::nfs_backed

cpp/arcticdb/storage/s3/nfs_backed_storage.hpp

+4
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ class NfsBackedStorage final : public Storage {
3232

3333
std::string name() const final;
3434

35+
bool supports_object_size_calculation() const final override;
36+
3537
private:
3638
void do_write(KeySegmentPair& key_seg) final;
3739

@@ -51,6 +53,8 @@ class NfsBackedStorage final : public Storage {
5153

5254
bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string &prefix) final;
5355

56+
ObjectSizes do_get_object_sizes(KeyType key_type, const std::string& prefix) final;
57+
5458
bool do_key_exists(const VariantKey& key) final;
5559

5660
bool do_supports_prefix_matching() const final {

cpp/arcticdb/storage/s3/s3_client_impl.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -299,11 +299,13 @@ S3Result<ListObjectsOutput> S3ClientImpl::list_objects(
299299
next_continuation_token = {result.GetNextContinuationToken()};
300300

301301
auto s3_object_names = std::vector<std::string>();
302+
auto s3_object_sizes = std::vector<uint64_t>();
302303
for (const auto &s3_object: result.GetContents()) {
303304
s3_object_names.emplace_back(s3_object.GetKey());
305+
s3_object_sizes.emplace_back(s3_object.GetSize());
304306
}
305307

306-
ListObjectsOutput output = {s3_object_names, next_continuation_token};
308+
ListObjectsOutput output = {s3_object_names, s3_object_sizes, next_continuation_token};
307309
return {output};
308310
}
309311

cpp/arcticdb/storage/s3/s3_client_interface.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ using S3Result = StorageResult<Output, Aws::S3::S3Error>;
4949

5050
struct ListObjectsOutput{
5151
std::vector<std::string> s3_object_names;
52+
std::vector<uint64_t> s3_object_sizes;
5253
// next_continuation_token indicates there are more s3_objects to be listed because they didn't fit in one response.
5354
// If set can be used to get the remaining s3_objects.
5455
std::optional<std::string> next_continuation_token;

cpp/arcticdb/storage/s3/s3_storage.cpp

+13-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,15 @@ bool S3Storage::do_iterate_type_until_match(KeyType key_type, const IterateTypeP
105105
return !prefix.empty() ? fmt::format("{}/{}*{}", key_type_dir, key_descriptor, prefix) : key_type_dir;
106106
};
107107

108-
return detail::do_iterate_type_impl(key_type, visitor, root_folder_, bucket_name_, client(), FlatBucketizer{}, std::move(prefix_handler), prefix);
108+
return detail::do_iterate_type_impl(key_type, visitor, root_folder_, bucket_name_, client(), FlatBucketizer{}, prefix_handler, prefix);
109+
}
110+
111+
ObjectSizes S3Storage::do_get_object_sizes(KeyType key_type, const std::string& prefix) {
112+
auto prefix_handler = [] (const std::string& prefix, const std::string& key_type_dir, const KeyDescriptor& key_descriptor, KeyType) {
113+
return !prefix.empty() ? fmt::format("{}/{}*{}", key_type_dir, key_descriptor, prefix) : key_type_dir;
114+
};
115+
116+
return detail::do_calculate_sizes_for_type_impl(key_type, root_folder_, bucket_name_, client(), FlatBucketizer{}, prefix_handler, prefix);
109117
}
110118

111119
bool S3Storage::do_key_exists(const VariantKey& key) {
@@ -183,6 +191,10 @@ S3Storage::S3Storage(const LibraryPath &library_path, OpenMode mode, const S3Set
183191
ARCTICDB_DEBUG(log::storage(), "Opened S3 backed storage at {}", root_folder_);
184192
}
185193

194+
bool S3Storage::supports_object_size_calculation() const {
195+
return true;
196+
}
197+
186198
GCPXMLStorage::GCPXMLStorage(const arcticdb::storage::LibraryPath& lib,
187199
arcticdb::storage::OpenMode mode,
188200
const arcticdb::storage::s3::GCPXMLSettings& conf) :

cpp/arcticdb/storage/s3/s3_storage.hpp

+6
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929

3030
namespace arcticdb::storage::s3 {
3131

32+
using PrefixHandler = std::function<std::string(const std::string&, const std::string&, const KeyDescriptor&, KeyType)>;
33+
3234
const std::string USE_AWS_CRED_PROVIDERS_TOKEN = "_RBAC_";
3335

3436
class S3Storage : public Storage, AsyncStorage {
@@ -48,6 +50,8 @@ class S3Storage : public Storage, AsyncStorage {
4850
return dynamic_cast<AsyncStorage*>(this);
4951
}
5052

53+
bool supports_object_size_calculation() const final override;
54+
5155
protected:
5256
void do_write(KeySegmentPair& key_seg) final;
5357

@@ -67,6 +71,8 @@ class S3Storage : public Storage, AsyncStorage {
6771

6872
void do_remove(std::span<VariantKey> variant_keys, RemoveOpts opts);
6973

74+
ObjectSizes do_get_object_sizes(KeyType key_type, const std::string& prefix) override final;
75+
7076
bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string &prefix) final;
7177

7278
bool do_key_exists(const VariantKey& key) final;

0 commit comments

Comments
 (0)