diff --git a/3rd/datasketches/common/CMakeLists.txt b/3rd/datasketches/common/CMakeLists.txt index add757425..3a19e40ac 100644 --- a/3rd/datasketches/common/CMakeLists.txt +++ b/3rd/datasketches/common/CMakeLists.txt @@ -37,9 +37,10 @@ target_sources(common ${CMAKE_CURRENT_SOURCE_DIR}/include/conditional_back_inserter.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/conditional_forward.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/ceiling_power_of_2.hpp - ${CMAKE_CURRENT_SOURCE_DIR}/include/kolmogorov_smirnov.hpp + ${CMAKE_CURRENT_SOURCE_DIR}/include/kolmogorov_smirnov.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/kolmogorov_smirnov_impl.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/quantiles_sorted_view.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/quantiles_sorted_view_impl.hpp - ${CMAKE_CURRENT_SOURCE_DIR}/include/version.hpp.in + ${CMAKE_CURRENT_SOURCE_DIR}/include/optional.hpp + ${CMAKE_CURRENT_SOURCE_DIR}/include/version.hpp.in ) diff --git a/3rd/datasketches/common/include/common_defs.hpp b/3rd/datasketches/common/include/common_defs.hpp index fc80fe022..d8e3e6ca0 100644 --- a/3rd/datasketches/common/include/common_defs.hpp +++ b/3rd/datasketches/common/include/common_defs.hpp @@ -28,27 +28,30 @@ #include #include +/// DataSketches namespace namespace datasketches { static const uint64_t DEFAULT_SEED = 9001; enum resize_factor { X1 = 0, X2, X4, X8 }; -template using AllocChar = typename std::allocator_traits::template rebind_alloc; -template using string = std::basic_string, AllocChar>; - -// thread-safe random bit -static thread_local std::independent_bits_engine - random_bit(static_cast(std::chrono::system_clock::now().time_since_epoch().count() - + std::hash{}(std::this_thread::get_id()))); +template using string = std::basic_string, typename std::allocator_traits::template rebind_alloc>; // common random declarations namespace random_utils { static std::random_device rd; // possibly unsafe in MinGW with GCC < 9.2 static thread_local std::mt19937_64 rand(rd()); static thread_local std::uniform_real_distribution<> next_double(0.0, 1.0); -} + // thread-safe random bit + static thread_local std::independent_bits_engine + random_bit(static_cast(std::chrono::system_clock::now().time_since_epoch().count() + + std::hash{}(std::this_thread::get_id()))); + + inline void override_seed(uint64_t s) { + rand.seed(s); + } +} // utility function to hide unused compiler warning // usually has no additional cost diff --git a/3rd/datasketches/common/include/count_zeros.hpp b/3rd/datasketches/common/include/count_zeros.hpp index 51cbc0c90..94316a445 100644 --- a/3rd/datasketches/common/include/count_zeros.hpp +++ b/3rd/datasketches/common/include/count_zeros.hpp @@ -22,8 +22,6 @@ #include -#include - namespace datasketches { static const uint8_t byte_leading_zeros_table[256] = { diff --git a/3rd/datasketches/common/include/kolmogorov_smirnov.hpp b/3rd/datasketches/common/include/kolmogorov_smirnov.hpp index d00853de2..90f226c59 100644 --- a/3rd/datasketches/common/include/kolmogorov_smirnov.hpp +++ b/3rd/datasketches/common/include/kolmogorov_smirnov.hpp @@ -22,13 +22,16 @@ namespace datasketches { +/** + * Kolmogorov-Smirnov test for KLL or Quantiles sketches + */ class kolmogorov_smirnov { public: /** * Computes the raw delta area between two quantile sketches for the Kolmogorov-Smirnov Test. * Will work for a type-matched pair of KLL or Quantiles sketches of the same parameterized type T. - * @param sketch1 KLL sketch 1 - * @param sketch2 KLL sketch 2 + * @param sketch1 sketch 1 + * @param sketch2 sketch 2 * @return the raw delta between two KLL quantile sketches */ template @@ -39,8 +42,8 @@ class kolmogorov_smirnov { * Adjusts the computed threshold by the error epsilons of the two given sketches. * See Kolmogorov–Smirnov Test * Will work for a type-matched pair of KLL or Quantiles sketches of the same parameterized type T. - * @param sketch1 KLL sketch 1 - * @param sketch2 KLL sketch 2 + * @param sketch1 sketch 1 + * @param sketch2 sketch 2 * @param p Target p-value. Typically .001 to .1, e.g., .05. * @return the adjusted threshold to be compared with the raw delta */ @@ -52,8 +55,8 @@ class kolmogorov_smirnov { * Will work for a type-matched pair of KLL or Quantiles sketches of the same parameterized type T. * Note: if the given sketches have insufficient data or if the sketch sizes are too small, * this will return false. - * @param sketch1 KLL sketch 1 - * @param sketch2 KLL sketch 2 + * @param sketch1 sketch 1 + * @param sketch2 sketch 2 * @param p Target p-value. Typically .001 to .1, e.g., .05. * @return Boolean indicating whether we can reject the null hypothesis (that the sketches * reflect the same underlying distribution) using the provided p-value. diff --git a/3rd/datasketches/common/include/optional.hpp b/3rd/datasketches/common/include/optional.hpp new file mode 100644 index 000000000..862b27e04 --- /dev/null +++ b/3rd/datasketches/common/include/optional.hpp @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _OPTIONAL_HPP_ +#define _OPTIONAL_HPP_ + +// This is a simplistic substitute for std::optional until we require C++17 + +#if (__cplusplus >= 201703L || (defined(_MSVC_LANG) && _MSVC_LANG >= 201703L)) +#include +using std::optional; +#else + +#include + +namespace datasketches { + +template +class optional { +public: + + optional() noexcept: initialized_(false) {} + + optional(const T& value) noexcept(std::is_nothrow_copy_constructible::value) { + new (&value_) T(value); + initialized_ = true; + } + + optional(T&& value) noexcept(std::is_nothrow_move_constructible::value) { + new (&value_) T(std::move(value)); + initialized_ = true; + } + + // conversion from compatible types + template + optional(const optional& other) noexcept(std::is_nothrow_constructible::value): initialized_(false) { + if (other.initialized_) { + new (&value_) T(other.value_); + initialized_ = true; + } + } + + optional(const optional& other) noexcept(std::is_nothrow_copy_constructible::value): initialized_(false) { + if (other.initialized_) { + new (&value_) T(other.value_); + initialized_ = true; + } + } + + optional(optional&& other) noexcept(std::is_nothrow_move_constructible::value): initialized_(false) { + if (other.initialized_) { + new (&value_) T(std::move(other.value_)); + initialized_ = true; + } + } + + ~optional() noexcept(std::is_nothrow_destructible::value) { + if (initialized_) value_.~T(); + } + + explicit operator bool() const noexcept { + return initialized_; + } + + optional& operator=(const optional& other) + noexcept(std::is_nothrow_copy_constructible::value && std::is_nothrow_copy_assignable::value) { + if (initialized_) { + if (other.initialized_) { + value_ = other.value_; + } else { + reset(); + } + } else { + if (other.initialized_) { + new (&value_) T(other.value_); + initialized_ = true; + } + } + return *this; + } + + optional& operator=(optional&& other) + noexcept(std::is_nothrow_move_constructible::value && std::is_nothrow_move_assignable::value) { + if (initialized_) { + if (other.initialized_) { + value_ = std::move(other.value_); + } else { + reset(); + } + } else { + if (other.initialized_) { + new (&value_) T(std::move(other.value_)); + initialized_ = true; + } + } + return *this; + } + + template + void emplace(Args&&... args) noexcept(std::is_nothrow_constructible::value) { + new (&value_) T(args...); + initialized_ = true; + } + + T& operator*() & noexcept { return value_; } + const T& operator*() const & noexcept { return value_; } + T&& operator*() && noexcept { return std::move(value_); } + const T&& operator*() const && noexcept { return std::move(value_); } + + T* operator->() noexcept { return &value_; } + const T* operator->() const noexcept { return &value_; } + + void reset() noexcept(std::is_nothrow_destructible::value) { + if (initialized_) value_.~T(); + initialized_ = false; + } + +private: + union { + T value_; + }; + bool initialized_; + + // for converting constructor + template friend class optional; +}; + +} // namespace + +#endif // C++17 + +#endif // _OPTIONAL_HPP_ diff --git a/3rd/datasketches/common/include/quantiles_sorted_view.hpp b/3rd/datasketches/common/include/quantiles_sorted_view.hpp index e965cb179..c3ab6af0b 100755 --- a/3rd/datasketches/common/include/quantiles_sorted_view.hpp +++ b/3rd/datasketches/common/include/quantiles_sorted_view.hpp @@ -27,6 +27,9 @@ namespace datasketches { +/** + * Sorted view for quantiles sketches (REQ, KLL and Quantiles) + */ template< typename T, typename Comparator, // strict weak ordering function (see C++ named requirements: Compare) @@ -34,30 +37,119 @@ template< > class quantiles_sorted_view { public: + /// Entry type using Entry = typename std::conditional::value, std::pair, std::pair>::type; using AllocEntry = typename std::allocator_traits::template rebind_alloc; using Container = std::vector; + /// @private quantiles_sorted_view(uint32_t num, const Comparator& comparator, const Allocator& allocator); + /// @private template void add(Iterator begin, Iterator end, uint64_t weight); + /// @private void convert_to_cummulative(); class const_iterator; + + /** + * Iterator pointing to the first entry in the view. + * If the view is empty, the returned iterator must not be dereferenced or incremented. + * @return iterator pointing to the first entry + */ const_iterator begin() const; + + /** + * Iterator pointing to the past-the-end entry in the view. + * The past-the-end entry is the hypothetical entry that would follow the last entry. + * It does not point to any entry, and must not be dereferenced or incremented. + * @return iterator pointing to the past-the-end entry + */ const_iterator end() const; + /// @return size of the view size_t size() const; + /** + * Returns an approximation to the normalized rank of the given item. + * + *

If the view is empty this throws std::runtime_error. + * + * @param item to be ranked + * @param inclusive if true the weight of the given item is included into the rank. + * Otherwise the rank equals the sum of the weights of all items that are less than the given item + * according to the Comparator. + * + * @return an approximate normalized rank of the given item (0 to 1 inclusive) + */ double get_rank(const T& item, bool inclusive = true) const; + /** + * Quantile return type. + * This is to return quantiles either by value (for arithmetic types) or by const reference (for all other types) + */ using quantile_return_type = typename std::conditional::value, T, const T&>::type; + + /** + * Returns an item from the sketch that is the best approximation to an item + * from the original stream with the given normalized rank. + * + *

If the view is empty this throws std::runtime_error. + * + * @param rank of an item in the hypothetical sorted stream. + * @param inclusive if true, the given rank is considered inclusive (includes weight of an item) + * + * @return approximate quantile associated with the given normalized rank + */ quantile_return_type get_quantile(double rank, bool inclusive = true) const; using vector_double = std::vector::template rebind_alloc>; + + /** + * Returns an approximation to the Cumulative Distribution Function (CDF), which is the + * cumulative analog of the PMF, of the input stream given a set of split points (items). + * + *

If the view is empty this throws std::runtime_error. + * + * @param split_points an array of m unique, monotonically increasing items + * that divide the input domain into m+1 consecutive disjoint intervals. + * + * @param size the number of split points in the array + * + * @param inclusive if true the rank of an item includes its own weight, and therefore + * if the sketch contains items equal to a slit point, then in CDF such items are + * included into the interval to the left of split point. Otherwise they are included into + * the interval to the right of split point. + * + * @return an array of m+1 doubles, which are a consecutive approximation to the CDF + * of the input stream given the split_points. The value at array position j of the returned + * CDF array is the sum of the returned values in positions 0 through j of the returned PMF + * array. This can be viewed as array of ranks of the given split points plus one more value + * that is always 1. + */ vector_double get_CDF(const T* split_points, uint32_t size, bool inclusive = true) const; + + /** + * Returns an approximation to the Probability Mass Function (PMF) of the input stream + * given a set of split points (items). + * + *

If the view is empty this throws std::runtime_error. + * + * @param split_points an array of m unique, monotonically increasing items + * that divide the input domain into m+1 consecutive disjoint intervals (bins). + * + * @param size the number of split points in the array + * + * @param inclusive if true the rank of an item includes its own weight, and therefore + * if the sketch contains items equal to a slit point, then in PMF such items are + * included into the interval to the left of split point. Otherwise they are included into the interval + * to the right of split point. + * + * @return an array of m+1 doubles each of which is an approximation + * to the fraction of the input stream items (the mass) that fall into one of those intervals. + */ vector_double get_PMF(const T* split_points, uint32_t size, bool inclusive = true) const; private: @@ -122,8 +214,6 @@ class quantiles_sorted_view::const_iterator: public quantiles_sorted_vi using Base = typename quantiles_sorted_view::Container::const_iterator; using value_type = typename std::conditional::value, typename Base::value_type, std::pair>::type; - const_iterator(const Base& it, const Base& begin): Base(it), begin(begin) {} - template::value, int>::type = 0> const value_type operator*() const { return Base::operator*(); } @@ -147,6 +237,9 @@ class quantiles_sorted_view::const_iterator: public quantiles_sorted_vi private: Base begin; + + friend class quantiles_sorted_view; + const_iterator(const Base& it, const Base& begin): Base(it), begin(begin) {} }; } /* namespace datasketches */ diff --git a/3rd/datasketches/common/include/quantiles_sorted_view_impl.hpp b/3rd/datasketches/common/include/quantiles_sorted_view_impl.hpp index 326301e0e..d57cd53fb 100755 --- a/3rd/datasketches/common/include/quantiles_sorted_view_impl.hpp +++ b/3rd/datasketches/common/include/quantiles_sorted_view_impl.hpp @@ -75,7 +75,7 @@ double quantiles_sorted_view::get_rank(const T& item, bool inclusive) c template auto quantiles_sorted_view::get_quantile(double rank, bool inclusive) const -> quantile_return_type { if (entries_.empty()) throw std::runtime_error("operation is undefined for an empty sketch"); - uint64_t weight = inclusive ? std::ceil(rank * total_weight_) : rank * total_weight_; + uint64_t weight = static_cast(inclusive ? std::ceil(rank * total_weight_) : rank * total_weight_); auto it = inclusive ? std::lower_bound(entries_.begin(), entries_.end(), make_dummy_entry(weight), compare_pairs_by_second()) : std::upper_bound(entries_.begin(), entries_.end(), make_dummy_entry(weight), compare_pairs_by_second()); diff --git a/3rd/datasketches/common/include/serde.hpp b/3rd/datasketches/common/include/serde.hpp index 9b3349bdf..ad20fe635 100644 --- a/3rd/datasketches/common/include/serde.hpp +++ b/3rd/datasketches/common/include/serde.hpp @@ -30,23 +30,56 @@ namespace datasketches { -// serialize and deserialize +/// Interface for serializing and deserializing items template struct serde { - // stream serialization + /** + * Stream serialization + * @param os output stream + * @param items pointer to array of items + * @param num number of items + */ void serialize(std::ostream& os, const T* items, unsigned num) const; - void deserialize(std::istream& is, T* items, unsigned num) const; // items allocated but not initialized - // raw bytes serialization - size_t size_of_item(const T& item) const; + /** + * Stream deserialization + * @param is input stream + * @param items pointer to array of items (items in the array are allocated but not initialized) + * @param num number of items + */ + void deserialize(std::istream& is, T* items, unsigned num) const; + + /** + * Raw bytes serialization + * @param ptr pointer to output buffer + * @param capacity size of the buffer in bytes + * @param items pointer to array of items + * @param num number of items + */ size_t serialize(void* ptr, size_t capacity, const T* items, unsigned num) const; - size_t deserialize(const void* ptr, size_t capacity, T* items, unsigned num) const; // items allocated but not initialized + + /** + * Raw bytes deserialization + * @param ptr pointer to input buffer + * @param capacity size of the buffer in bytes + * @param items pointer to array of items (items in the array are allocated but not initialized) + * @param num number of items + */ + size_t deserialize(const void* ptr, size_t capacity, T* items, unsigned num) const; + + /** + * Size of the given item + * @param item to be sized + * @return size of the given item in bytes + */ + size_t size_of_item(const T& item) const; }; -// serde for all fixed-size arithmetic types (int and float of different sizes) -// in particular, kll_sketch should produce sketches binary-compatible -// with LongsSketch and ItemsSketch with ArrayOfLongsSerDe in Java +/// serde for all fixed-size arithmetic types (int and float of different sizes). +/// in particular, kll_sketch should produce sketches binary-compatible +/// with LongsSketch and ItemsSketch with ArrayOfLongsSerDe in Java template struct serde::value>::type> { + /// @copydoc serde::serialize void serialize(std::ostream& os, const T* items, unsigned num) const { bool failure = false; try { @@ -58,6 +91,7 @@ struct serde::value>::type> { throw std::runtime_error("error writing to std::ostream with " + std::to_string(num) + " items"); } } + void deserialize(std::istream& is, T* items, unsigned num) const { bool failure = false; try { @@ -70,30 +104,37 @@ struct serde::value>::type> { } } - size_t size_of_item(const T&) const { - return sizeof(T); - } + /// @copydoc serde::serialize(void*,size_t,const T*,unsigned) const size_t serialize(void* ptr, size_t capacity, const T* items, unsigned num) const { const size_t bytes_written = sizeof(T) * num; check_memory_size(bytes_written, capacity); memcpy(ptr, items, bytes_written); return bytes_written; } + + /// @copydoc serde::deserialize(const void*,size_t,T*,unsigned) const size_t deserialize(const void* ptr, size_t capacity, T* items, unsigned num) const { const size_t bytes_read = sizeof(T) * num; check_memory_size(bytes_read, capacity); memcpy(items, ptr, bytes_read); return bytes_read; } + + /// @copydoc serde::size_of_item + size_t size_of_item(const T& item) const { + unused(item); + return sizeof(T); + } }; -// serde for std::string items -// This should produce sketches binary-compatible with -// ItemsSketch with ArrayOfStringsSerDe in Java. -// The length of each string is stored as a 32-bit integer (historically), -// which may be too wasteful. Treat this as an example. +/// serde for std::string items. +/// This should produce sketches binary-compatible with +/// ItemsSketch with ArrayOfStringsSerDe in Java. +/// The length of each string is stored as a 32-bit integer (historically), +/// which may be too wasteful. Treat this as an example. template<> struct serde { + /// @copydoc serde::serialize void serialize(std::ostream& os, const std::string* items, unsigned num) const { unsigned i = 0; bool failure = false; @@ -110,6 +151,8 @@ struct serde { throw std::runtime_error("error writing to std::ostream at item " + std::to_string(i)); } } + + /// @copydoc serde::deserialize void deserialize(std::istream& is, std::string* items, unsigned num) const { unsigned i = 0; bool failure = false; @@ -137,9 +180,8 @@ struct serde { throw std::runtime_error("error reading from std::istream at item " + std::to_string(i)); } } - size_t size_of_item(const std::string& item) const { - return sizeof(uint32_t) + item.size(); - } + + /// @copydoc serde::serialize(void*,size_t,const T*,unsigned) const size_t serialize(void* ptr, size_t capacity, const std::string* items, unsigned num) const { size_t bytes_written = 0; for (unsigned i = 0; i < num; ++i) { @@ -154,6 +196,8 @@ struct serde { } return bytes_written; } + + /// @copydoc serde::deserialize(const void*,size_t,T*,unsigned) const size_t deserialize(const void* ptr, size_t capacity, std::string* items, unsigned num) const { size_t bytes_read = 0; unsigned i = 0; @@ -189,6 +233,11 @@ struct serde { return bytes_read; } + + /// @copydoc serde::size_of_item + size_t size_of_item(const std::string& item) const { + return sizeof(uint32_t) + item.size(); + } }; } /* namespace datasketches */ diff --git a/3rd/datasketches/cpc/include/cpc_common.hpp b/3rd/datasketches/cpc/include/cpc_common.hpp index feb1e15a1..4b94975f9 100644 --- a/3rd/datasketches/cpc/include/cpc_common.hpp +++ b/3rd/datasketches/cpc/include/cpc_common.hpp @@ -26,45 +26,39 @@ namespace datasketches { +/// CPC constants namespace cpc_constants { - const uint8_t MIN_LG_K = 4; - const uint8_t MAX_LG_K = 26; - const uint8_t DEFAULT_LG_K = 11; + /// min log2 of K + const uint8_t MIN_LG_K = 4; + /// max log2 of K + const uint8_t MAX_LG_K = 26; + /// default log2 of K + const uint8_t DEFAULT_LG_K = 11; } -// TODO: Redundant and deprecated. Will be removed in next major version release. -static const uint8_t CPC_MIN_LG_K = cpc_constants::MIN_LG_K; -static const uint8_t CPC_MAX_LG_K = cpc_constants::MAX_LG_K; -static const uint8_t CPC_DEFAULT_LG_K = cpc_constants::DEFAULT_LG_K; - -template using AllocU8 = typename std::allocator_traits::template rebind_alloc; -template using AllocU16 = typename std::allocator_traits::template rebind_alloc; -template using AllocU32 = typename std::allocator_traits::template rebind_alloc; -template using AllocU64 = typename std::allocator_traits::template rebind_alloc; - -template using vector_u8 = std::vector>; -template using vector_u32 = std::vector>; -template using vector_u64 = std::vector>; - // forward declaration template class u32_table; template struct compressed_state { + using vector_u32 = std::vector::template rebind_alloc>; + explicit compressed_state(const A& allocator): table_data(allocator), table_data_words(0), table_num_entries(0), window_data(allocator), window_data_words(0) {} - vector_u32 table_data; + vector_u32 table_data; uint32_t table_data_words; uint32_t table_num_entries; // can be different from the number of entries in the sketch in hybrid mode - vector_u32 window_data; + vector_u32 window_data; uint32_t window_data_words; }; template struct uncompressed_state { + using vector_bytes = std::vector::template rebind_alloc>; + explicit uncompressed_state(const A& allocator): table(allocator), window(allocator) {} u32_table table; - vector_u8 window; + vector_bytes window; }; } /* namespace datasketches */ diff --git a/3rd/datasketches/cpc/include/cpc_compressor.hpp b/3rd/datasketches/cpc/include/cpc_compressor.hpp index a8f426f51..ffcf776af 100644 --- a/3rd/datasketches/cpc/include/cpc_compressor.hpp +++ b/3rd/datasketches/cpc/include/cpc_compressor.hpp @@ -47,6 +47,9 @@ inline cpc_compressor& get_compressor(); template class cpc_compressor { public: + using vector_bytes = std::vector::template rebind_alloc>; + using vector_u32 = std::vector::template rebind_alloc>; + void compress(const cpc_sketch_alloc& source, compressed_state& target) const; void uncompress(const compressed_state& source, uncompressed_state& target, uint8_t lg_k, uint32_t num_coupons) const; @@ -126,17 +129,17 @@ class cpc_compressor { uint16_t* make_decoding_table(const uint16_t* encoding_table, unsigned num_byte_values); void validate_decoding_table(const uint16_t* decoding_table, const uint16_t* encoding_table) const; - void compress_surprising_values(const vector_u32& pairs, uint8_t lg_k, compressed_state& result) const; + void compress_surprising_values(const vector_u32& pairs, uint8_t lg_k, compressed_state& result) const; void compress_sliding_window(const uint8_t* window, uint8_t lg_k, uint32_t num_coupons, compressed_state& target) const; - vector_u32 uncompress_surprising_values(const uint32_t* data, uint32_t data_words, uint32_t num_pairs, uint8_t lg_k, const A& allocator) const; - void uncompress_sliding_window(const uint32_t* data, uint32_t data_words, vector_u8& window, uint8_t lg_k, uint32_t num_coupons) const; + vector_u32 uncompress_surprising_values(const uint32_t* data, uint32_t data_words, uint32_t num_pairs, uint8_t lg_k, const A& allocator) const; + void uncompress_sliding_window(const uint32_t* data, uint32_t data_words, vector_bytes& window, uint8_t lg_k, uint32_t num_coupons) const; static size_t safe_length_for_compressed_pair_buf(uint32_t k, uint32_t num_pairs, uint8_t num_base_bits); static size_t safe_length_for_compressed_window_buf(uint32_t k); static uint8_t determine_pseudo_phase(uint8_t lg_k, uint32_t c); - static inline vector_u32 tricky_get_pairs_from_window(const uint8_t* window, uint32_t k, uint32_t num_pairs_to_get, uint32_t empty_space, const A& allocator); + static inline vector_u32 tricky_get_pairs_from_window(const uint8_t* window, uint32_t k, uint32_t num_pairs_to_get, uint32_t empty_space, const A& allocator); static inline uint8_t golomb_choose_number_of_base_bits(uint32_t k, uint64_t count); }; diff --git a/3rd/datasketches/cpc/include/cpc_compressor_impl.hpp b/3rd/datasketches/cpc/include/cpc_compressor_impl.hpp index 7f323be7c..e1e75d32a 100644 --- a/3rd/datasketches/cpc/include/cpc_compressor_impl.hpp +++ b/3rd/datasketches/cpc/include/cpc_compressor_impl.hpp @@ -183,7 +183,7 @@ void cpc_compressor::uncompress(const compressed_state& source, uncompress template void cpc_compressor::compress_sparse_flavor(const cpc_sketch_alloc& source, compressed_state& result) const { if (source.sliding_window.size() > 0) throw std::logic_error("unexpected sliding window"); - vector_u32 pairs = source.surprising_value_table.unwrapping_get_items(); + vector_u32 pairs = source.surprising_value_table.unwrapping_get_items(); u32_table::introspective_insertion_sort(pairs.data(), 0, pairs.size()); compress_surprising_values(pairs, source.get_lg_k(), result); } @@ -192,7 +192,7 @@ template void cpc_compressor::uncompress_sparse_flavor(const compressed_state& source, uncompressed_state& target, uint8_t lg_k) const { if (source.window_data.size() > 0) throw std::logic_error("unexpected sliding window"); if (source.table_data.size() == 0) throw std::logic_error("table is expected"); - vector_u32 pairs = uncompress_surprising_values(source.table_data.data(), source.table_data_words, source.table_num_entries, + vector_u32 pairs = uncompress_surprising_values(source.table_data.data(), source.table_data_words, source.table_num_entries, lg_k, source.table_data.get_allocator()); target.table = u32_table::make_from_pairs(pairs.data(), source.table_num_entries, lg_k, pairs.get_allocator()); } @@ -204,12 +204,12 @@ void cpc_compressor::compress_hybrid_flavor(const cpc_sketch_alloc& source if (source.sliding_window.size() == 0) throw std::logic_error("no sliding window"); if (source.window_offset != 0) throw std::logic_error("window_offset != 0"); const uint32_t k = 1 << source.get_lg_k(); - vector_u32 pairs_from_table = source.surprising_value_table.unwrapping_get_items(); + vector_u32 pairs_from_table = source.surprising_value_table.unwrapping_get_items(); const uint32_t num_pairs_from_table = static_cast(pairs_from_table.size()); if (num_pairs_from_table > 0) u32_table::introspective_insertion_sort(pairs_from_table.data(), 0, num_pairs_from_table); const uint32_t num_pairs_from_window = source.get_num_coupons() - num_pairs_from_table; // because the window offset is zero - vector_u32 all_pairs = tricky_get_pairs_from_window(source.sliding_window.data(), k, num_pairs_from_window, num_pairs_from_table, source.get_allocator()); + vector_u32 all_pairs = tricky_get_pairs_from_window(source.sliding_window.data(), k, num_pairs_from_window, num_pairs_from_table, source.get_allocator()); u32_table::merge( pairs_from_table.data(), 0, pairs_from_table.size(), @@ -224,7 +224,7 @@ template void cpc_compressor::uncompress_hybrid_flavor(const compressed_state& source, uncompressed_state& target, uint8_t lg_k) const { if (source.window_data.size() > 0) throw std::logic_error("window is not expected"); if (source.table_data.size() == 0) throw std::logic_error("table is expected"); - vector_u32 pairs = uncompress_surprising_values(source.table_data.data(), source.table_data_words, source.table_num_entries, + vector_u32 pairs = uncompress_surprising_values(source.table_data.data(), source.table_data_words, source.table_num_entries, lg_k, source.table_data.get_allocator()); // In the hybrid flavor, some of these pairs actually @@ -250,7 +250,7 @@ void cpc_compressor::uncompress_hybrid_flavor(const compressed_state& sour template void cpc_compressor::compress_pinned_flavor(const cpc_sketch_alloc& source, compressed_state& result) const { compress_sliding_window(source.sliding_window.data(), source.get_lg_k(), source.get_num_coupons(), result); - vector_u32 pairs = source.surprising_value_table.unwrapping_get_items(); + vector_u32 pairs = source.surprising_value_table.unwrapping_get_items(); if (pairs.size() > 0) { // Here we subtract 8 from the column indices. Because they are stored in the low 6 bits // of each row_col pair, and because no column index is less than 8 for a "Pinned" sketch, @@ -277,7 +277,7 @@ void cpc_compressor::uncompress_pinned_flavor(const compressed_state& sour target.table = u32_table(2, 6 + lg_k, source.table_data.get_allocator()); } else { if (source.table_data.size() == 0) throw std::logic_error("table is expected"); - vector_u32 pairs = uncompress_surprising_values(source.table_data.data(), source.table_data_words, num_pairs, + vector_u32 pairs = uncompress_surprising_values(source.table_data.data(), source.table_data_words, num_pairs, lg_k, source.table_data.get_allocator()); // undo the compressor's 8-column shift for (uint32_t i = 0; i < num_pairs; i++) { @@ -291,7 +291,7 @@ void cpc_compressor::uncompress_pinned_flavor(const compressed_state& sour template void cpc_compressor::compress_sliding_flavor(const cpc_sketch_alloc& source, compressed_state& result) const { compress_sliding_window(source.sliding_window.data(), source.get_lg_k(), source.get_num_coupons(), result); - vector_u32 pairs = source.surprising_value_table.unwrapping_get_items(); + vector_u32 pairs = source.surprising_value_table.unwrapping_get_items(); if (pairs.size() > 0) { // Here we apply a complicated transformation to the column indices, which // changes the implied ordering of the pairs, so we must do it before sorting. @@ -330,7 +330,7 @@ void cpc_compressor::uncompress_sliding_flavor(const compressed_state& sou target.table = u32_table(2, 6 + lg_k, source.table_data.get_allocator()); } else { if (source.table_data.size() == 0) throw std::logic_error("table is expected"); - vector_u32 pairs = uncompress_surprising_values(source.table_data.data(), source.table_data_words, num_pairs, + vector_u32 pairs = uncompress_surprising_values(source.table_data.data(), source.table_data_words, num_pairs, lg_k, source.table_data.get_allocator()); const uint8_t pseudo_phase = determine_pseudo_phase(lg_k, num_coupons); @@ -356,7 +356,7 @@ void cpc_compressor::uncompress_sliding_flavor(const compressed_state& sou } template -void cpc_compressor::compress_surprising_values(const vector_u32& pairs, uint8_t lg_k, compressed_state& result) const { +void cpc_compressor::compress_surprising_values(const vector_u32& pairs, uint8_t lg_k, compressed_state& result) const { const uint32_t k = 1 << lg_k; const uint32_t num_pairs = static_cast(pairs.size()); const uint8_t num_base_bits = golomb_choose_number_of_base_bits(k + num_pairs, num_pairs); @@ -374,10 +374,10 @@ void cpc_compressor::compress_surprising_values(const vector_u32& pairs, u } template -vector_u32 cpc_compressor::uncompress_surprising_values(const uint32_t* data, uint32_t data_words, uint32_t num_pairs, - uint8_t lg_k, const A& allocator) const { +auto cpc_compressor::uncompress_surprising_values(const uint32_t* data, uint32_t data_words, uint32_t num_pairs, + uint8_t lg_k, const A& allocator) const -> vector_u32 { const uint32_t k = 1 << lg_k; - vector_u32 pairs(num_pairs, 0, allocator); + vector_u32 pairs(num_pairs, 0, allocator); const uint8_t num_base_bits = golomb_choose_number_of_base_bits(k + num_pairs, num_pairs); low_level_uncompress_pairs(pairs.data(), num_pairs, num_base_bits, data, data_words); return pairs; @@ -399,7 +399,7 @@ void cpc_compressor::compress_sliding_window(const uint8_t* window, uint8_t l } template -void cpc_compressor::uncompress_sliding_window(const uint32_t* data, uint32_t data_words, vector_u8& window, +void cpc_compressor::uncompress_sliding_window(const uint32_t* data, uint32_t data_words, vector_bytes& window, uint8_t lg_k, uint32_t num_coupons) const { const uint32_t k = 1 << lg_k; window.resize(k); // zeroing not needed here (unlike the Hybrid Flavor) @@ -722,10 +722,10 @@ void write_unary( // The empty space that this leaves at the beginning of the output array // will be filled in later by the caller. template -vector_u32 cpc_compressor::tricky_get_pairs_from_window(const uint8_t* window, uint32_t k, uint32_t num_pairs_to_get, - uint32_t empty_space, const A& allocator) { +auto cpc_compressor::tricky_get_pairs_from_window(const uint8_t* window, uint32_t k, uint32_t num_pairs_to_get, + uint32_t empty_space, const A& allocator) -> vector_u32 { const size_t output_length = empty_space + num_pairs_to_get; - vector_u32 pairs(output_length, 0, allocator); + vector_u32 pairs(output_length, 0, allocator); size_t pair_index = empty_space; for (unsigned row_index = 0; row_index < k; row_index++) { uint8_t byte = window[row_index]; diff --git a/3rd/datasketches/cpc/include/cpc_sketch.hpp b/3rd/datasketches/cpc/include/cpc_sketch.hpp index c000ac90b..b35e528d1 100644 --- a/3rd/datasketches/cpc/include/cpc_sketch.hpp +++ b/3rd/datasketches/cpc/include/cpc_sketch.hpp @@ -33,58 +33,58 @@ namespace datasketches { -/* - * High performance C++ implementation of Compressed Probabilistic Counting (CPC) Sketch - * - * This is a very compact (in serialized form) distinct counting sketch. - * The theory is described in the following paper: - * https://arxiv.org/abs/1708.06839 - * - * author Kevin Lang - * author Alexander Saydakov - */ - -// forward-declarations +// forward declarations template class cpc_sketch_alloc; template class cpc_union_alloc; -// alias with default allocator for convenience +/// CPC sketch alias with default allocator using cpc_sketch = cpc_sketch_alloc>; -// allocation and initialization of global decompression (decoding) tables -// call this before anything else if you want to control the initialization time -// for instance, to have this happen outside of a transaction context -// otherwise initialization happens on the first use (serialization or deserialization) -// it is safe to call more than once assuming no race conditions -// this is not thread safe! neither is the rest of the library +/** + * Allocation and initialization of global decompression (decoding) tables. + * Call this before anything else if you want to control the initialization time. + * For instance, to have this happen outside of a transaction context. + * Otherwise initialization happens on the first use (serialization or deserialization). + * It is safe to call more than once assuming no race conditions. + * This is not thread safe! Neither is the rest of the library. + */ template void cpc_init(); +/** + * High performance C++ implementation of Compressed Probabilistic Counting (CPC) Sketch + * + * This is a very compact (in serialized form) distinct counting sketch. + * The theory is described in the following paper: + * https://arxiv.org/abs/1708.06839 + * + * @author Kevin Lang + * @author Alexander Saydakov + */ template class cpc_sketch_alloc { public: + using allocator_type = A; + using vector_bytes = std::vector::template rebind_alloc>; + using vector_u64 = std::vector::template rebind_alloc>; + /** * Creates an instance of the sketch given the lg_k parameter and hash seed. * @param lg_k base 2 logarithm of the number of bins in the sketch * @param seed for hash function + * @param allocator instance of an allocator */ explicit cpc_sketch_alloc(uint8_t lg_k = cpc_constants::DEFAULT_LG_K, uint64_t seed = DEFAULT_SEED, const A& allocator = A()); - using allocator_type = A; + /// @return allocator A get_allocator() const; - /** - * @return configured lg_k of this sketch - */ + /// @return configured lg_k of this sketch uint8_t get_lg_k() const; - /** - * @return true if this sketch represents an empty set - */ + /// @return true if this sketch represents an empty set bool is_empty() const; - /** - * @return estimate of the distinct count of the input stream - */ + /// @return estimate of the distinct count of the input stream double get_estimate() const; /** @@ -189,13 +189,14 @@ class cpc_sketch_alloc { * Otherwise two sketches that should represent overlapping sets will be disjoint * For instance, for signed 32-bit values call update(int32_t) method above, * which does widening conversion to int64_t, if compatibility with Java is expected - * @param data pointer to the data - * @param length of the data in bytes + * @param value pointer to the data + * @param size of the data in bytes */ void update(const void* value, size_t size); /** * Returns a human-readable summary of this sketch + * @return a human-readable summary of this sketch */ string to_string() const; @@ -205,16 +206,13 @@ class cpc_sketch_alloc { */ void serialize(std::ostream& os) const; - // This is a convenience alias for users - // The type returned by the following serialize method - using vector_bytes = vector_u8; - /** * This method serializes the sketch as a vector of bytes. * An optional header can be reserved in front of the sketch. * It is an uninitialized space of a given size. * This header is used in Datasketches PostgreSQL extension. * @param header_size_bytes space to reserve in front of the sketch + * @return serialized sketch as a vector of bytes */ vector_bytes serialize(unsigned header_size_bytes = 0) const; @@ -222,6 +220,7 @@ class cpc_sketch_alloc { * This method deserializes a sketch from a given stream. * @param is input stream * @param seed the seed for the hash function that was used to create the sketch + * @param allocator instance of an Allocator * @return an instance of a sketch */ static cpc_sketch_alloc deserialize(std::istream& is, uint64_t seed = DEFAULT_SEED, const A& allocator = A()); @@ -231,6 +230,7 @@ class cpc_sketch_alloc { * @param bytes pointer to the array of bytes * @param size the size of the array * @param seed the seed for the hash function that was used to create the sketch + * @param allocator instance of an Allocator * @return an instance of the sketch */ static cpc_sketch_alloc deserialize(const void* bytes, size_t size, uint64_t seed = DEFAULT_SEED, const A& allocator = A()); @@ -246,10 +246,10 @@ class cpc_sketch_alloc { */ static size_t get_max_serialized_size_bytes(uint8_t lg_k); - // for internal use + /// @private for internal use uint32_t get_num_coupons() const; - // for debugging + /// @private for debugging // this should catch some forms of corruption during serialization-deserialization bool validate() const; @@ -276,7 +276,7 @@ class cpc_sketch_alloc { uint32_t num_coupons; // the number of coupons collected so far u32_table surprising_value_table; - vector_u8 sliding_window; + vector_bytes sliding_window; uint8_t window_offset; // derivable from num_coupons, but made explicit for speed uint8_t first_interesting_column; // This is part of a speed optimization @@ -285,7 +285,7 @@ class cpc_sketch_alloc { // for deserialization and cpc_union::get_result() cpc_sketch_alloc(uint8_t lg_k, uint32_t num_coupons, uint8_t first_interesting_column, u32_table&& table, - vector_u8&& window, bool has_hip, double kxp, double hip_est_accum, uint64_t seed); + vector_bytes&& window, bool has_hip, double kxp, double hip_est_accum, uint64_t seed); inline void row_col_update(uint32_t row_col); inline void update_sparse(uint32_t row_col); @@ -308,7 +308,7 @@ class cpc_sketch_alloc { static inline uint8_t determine_correct_offset(uint8_t lg_k, uint64_t c); // this produces a full-size k-by-64 bit matrix - vector_u64 build_bit_matrix() const; + vector_u64 build_bit_matrix() const; static uint8_t get_preamble_ints(uint32_t num_coupons, bool has_hip, bool has_table, bool has_window); inline void write_hip(std::ostream& os) const; diff --git a/3rd/datasketches/cpc/include/cpc_sketch_impl.hpp b/3rd/datasketches/cpc/include/cpc_sketch_impl.hpp index c5f467e88..84709cdc1 100644 --- a/3rd/datasketches/cpc/include/cpc_sketch_impl.hpp +++ b/3rd/datasketches/cpc/include/cpc_sketch_impl.hpp @@ -315,7 +315,7 @@ void cpc_sketch_alloc::move_window() { const uint32_t k = 1 << lg_k; // Construct the full-sized bit matrix that corresponds to the sketch - vector_u64 bit_matrix = build_bit_matrix(); + vector_u64 bit_matrix = build_bit_matrix(); // refresh the KXP register on every 8th window shift. if ((new_offset & 0x7) == 0) refresh_kxp(bit_matrix.data()); @@ -458,7 +458,7 @@ void cpc_sketch_alloc::serialize(std::ostream& os) const { } template -vector_u8 cpc_sketch_alloc::serialize(unsigned header_size_bytes) const { +auto cpc_sketch_alloc::serialize(unsigned header_size_bytes) const -> vector_bytes { compressed_state compressed(sliding_window.get_allocator()); compressed.table_data_words = 0; compressed.table_num_entries = 0; @@ -469,7 +469,7 @@ vector_u8 cpc_sketch_alloc::serialize(unsigned header_size_bytes) const { const bool has_window = compressed.window_data.size() > 0; const uint8_t preamble_ints = get_preamble_ints(num_coupons, has_hip, has_table, has_window); const size_t size = header_size_bytes + (preamble_ints + compressed.table_data_words + compressed.window_data_words) * sizeof(uint32_t); - vector_u8 bytes(size, 0, sliding_window.get_allocator()); + vector_bytes bytes(size, 0, sliding_window.get_allocator()); uint8_t* ptr = bytes.data() + header_size_bytes; ptr += copy_to_mem(preamble_ints, ptr); const uint8_t serial_version = SERIAL_VERSION; @@ -712,15 +712,18 @@ static const size_t CPC_MAX_PREAMBLE_SIZE_BYTES = 40; template size_t cpc_sketch_alloc::get_max_serialized_size_bytes(uint8_t lg_k) { check_lg_k(lg_k); - if (lg_k <= CPC_EMPIRICAL_SIZE_MAX_LGK) return CPC_EMPIRICAL_MAX_SIZE_BYTES[lg_k - CPC_MIN_LG_K] + CPC_MAX_PREAMBLE_SIZE_BYTES; + if (lg_k <= CPC_EMPIRICAL_SIZE_MAX_LGK) { + return CPC_EMPIRICAL_MAX_SIZE_BYTES[lg_k - cpc_constants::MIN_LG_K] + CPC_MAX_PREAMBLE_SIZE_BYTES; + } const uint32_t k = 1 << lg_k; return (int) (CPC_EMPIRICAL_MAX_SIZE_FACTOR * k) + CPC_MAX_PREAMBLE_SIZE_BYTES; } template void cpc_sketch_alloc::check_lg_k(uint8_t lg_k) { - if (lg_k < CPC_MIN_LG_K || lg_k > CPC_MAX_LG_K) { - throw std::invalid_argument("lg_k must be >= " + std::to_string(CPC_MIN_LG_K) + " and <= " + std::to_string(CPC_MAX_LG_K) + ": " + std::to_string(lg_k)); + if (lg_k < cpc_constants::MIN_LG_K || lg_k > cpc_constants::MAX_LG_K) { + throw std::invalid_argument("lg_k must be >= " + std::to_string(cpc_constants::MIN_LG_K) + " and <= " + + std::to_string(cpc_constants::MAX_LG_K) + ": " + std::to_string(lg_k)); } } @@ -731,14 +734,14 @@ uint32_t cpc_sketch_alloc::get_num_coupons() const { template bool cpc_sketch_alloc::validate() const { - vector_u64 bit_matrix = build_bit_matrix(); + vector_u64 bit_matrix = build_bit_matrix(); const uint64_t num_bits_set = count_bits_set_in_matrix(bit_matrix.data(), 1ULL << lg_k); return num_bits_set == num_coupons; } template cpc_sketch_alloc::cpc_sketch_alloc(uint8_t lg_k, uint32_t num_coupons, uint8_t first_interesting_column, - u32_table&& table, vector_u8&& window, bool has_hip, double kxp, double hip_est_accum, uint64_t seed): + u32_table&& table, vector_bytes&& window, bool has_hip, double kxp, double hip_est_accum, uint64_t seed): lg_k(lg_k), seed(seed), was_merged(!has_hip), @@ -800,14 +803,14 @@ uint8_t cpc_sketch_alloc::determine_correct_offset(uint8_t lg_k, uint64_t c) } template -vector_u64 cpc_sketch_alloc::build_bit_matrix() const { +auto cpc_sketch_alloc::build_bit_matrix() const -> vector_u64 { const uint32_t k = 1 << lg_k; if (window_offset > 56) throw std::logic_error("offset > 56"); // Fill the matrix with default rows in which the "early zone" is filled with ones. // This is essential for the routine's O(k) time cost (as opposed to O(C)). const uint64_t default_row = (static_cast(1) << window_offset) - 1; - vector_u64 matrix(k, default_row, sliding_window.get_allocator()); + vector_u64 matrix(k, default_row, sliding_window.get_allocator()); if (num_coupons == 0) return matrix; diff --git a/3rd/datasketches/cpc/include/cpc_union.hpp b/3rd/datasketches/cpc/include/cpc_union.hpp index 54fbed57f..08341dd31 100644 --- a/3rd/datasketches/cpc/include/cpc_union.hpp +++ b/3rd/datasketches/cpc/include/cpc_union.hpp @@ -27,31 +27,55 @@ namespace datasketches { -/* +/// CPC union alias with default allocator +using cpc_union = cpc_union_alloc>; + +/** * High performance C++ implementation of Compressed Probabilistic Counting (CPC) Union * * author Kevin Lang * author Alexander Saydakov */ - -// alias with default allocator for convenience -using cpc_union = cpc_union_alloc>; - template class cpc_union_alloc { public: + using vector_bytes = std::vector::template rebind_alloc>; + using vector_u64 = std::vector::template rebind_alloc>; + /** * Creates an instance of the union given the lg_k parameter and hash seed. * @param lg_k base 2 logarithm of the number of bins in the sketch * @param seed for hash function + * @param allocator instance of an allocator */ explicit cpc_union_alloc(uint8_t lg_k = cpc_constants::DEFAULT_LG_K, uint64_t seed = DEFAULT_SEED, const A& allocator = A()); + /** + * Copy constructor + * @param other union to be copied + */ cpc_union_alloc(const cpc_union_alloc& other); + + /** + * Move constructor + * @param other union to be moved + */ cpc_union_alloc(cpc_union_alloc&& other) noexcept; + ~cpc_union_alloc(); + /** + * Copy assignment + * @param other union to be copied + * @return reference to this union + */ cpc_union_alloc& operator=(const cpc_union_alloc& other); + + /** + * Move assignment + * @param other union to be moved + * @return reference to this union + */ cpc_union_alloc& operator=(cpc_union_alloc&& other) noexcept; /** @@ -73,14 +97,14 @@ class cpc_union_alloc { cpc_sketch_alloc get_result() const; private: - typedef typename std::allocator_traits::template rebind_alloc AllocU8; - typedef typename std::allocator_traits::template rebind_alloc AllocU64; - typedef typename std::allocator_traits::template rebind_alloc> AllocCpc; + using AllocU8 = typename std::allocator_traits::template rebind_alloc; + using AllocU64 = typename std::allocator_traits::template rebind_alloc; + using AllocCpc = typename std::allocator_traits::template rebind_alloc>; uint8_t lg_k; uint64_t seed; cpc_sketch_alloc* accumulator; - vector_u64 bit_matrix; + vector_u64 bit_matrix; template void internal_update(S&& sketch); // to support both rvalue and lvalue @@ -90,8 +114,8 @@ class cpc_union_alloc { void switch_to_bit_matrix(); void walk_table_updating_sketch(const u32_table& table); void or_table_into_matrix(const u32_table& table); - void or_window_into_matrix(const vector_u8& sliding_window, uint8_t offset, uint8_t src_lg_k); - void or_matrix_into_matrix(const vector_u64& src_matrix, uint8_t src_lg_k); + void or_window_into_matrix(const vector_bytes& sliding_window, uint8_t offset, uint8_t src_lg_k); + void or_matrix_into_matrix(const vector_u64& src_matrix, uint8_t src_lg_k); void reduce_k(uint8_t new_lg_k); }; diff --git a/3rd/datasketches/cpc/include/cpc_union_impl.hpp b/3rd/datasketches/cpc/include/cpc_union_impl.hpp index 5865cf515..f277107f0 100644 --- a/3rd/datasketches/cpc/include/cpc_union_impl.hpp +++ b/3rd/datasketches/cpc/include/cpc_union_impl.hpp @@ -33,8 +33,8 @@ seed(seed), accumulator(nullptr), bit_matrix(allocator) { - if (lg_k < CPC_MIN_LG_K || lg_k > CPC_MAX_LG_K) { - throw std::invalid_argument("lg_k must be >= " + std::to_string(CPC_MIN_LG_K) + " and <= " + std::to_string(CPC_MAX_LG_K) + ": " + std::to_string(lg_k)); + if (lg_k < cpc_constants::MIN_LG_K || lg_k > cpc_constants::MAX_LG_K) { + throw std::invalid_argument("lg_k must be >= " + std::to_string(cpc_constants::MIN_LG_K) + " and <= " + std::to_string(cpc_constants::MAX_LG_K) + ": " + std::to_string(lg_k)); } accumulator = new (AllocCpc(allocator).allocate(1)) cpc_sketch_alloc(lg_k, seed, allocator); } @@ -166,7 +166,7 @@ void cpc_union_alloc::internal_update(S&& sketch) { // SLIDING mode involves inverted logic, so we can't just walk the source sketch. // Instead, we convert it to a bitMatrix that can be OR'ed into the destination. if (cpc_sketch_alloc::flavor::SLIDING != src_flavor) throw std::logic_error("wrong flavor"); // Case D - vector_u64 src_matrix = sketch.build_bit_matrix(); + vector_u64 src_matrix = sketch.build_bit_matrix(); or_matrix_into_matrix(src_matrix, sketch.get_lg_k()); } @@ -203,7 +203,7 @@ cpc_sketch_alloc cpc_union_alloc::get_result_from_bit_matrix() const { const uint8_t offset = cpc_sketch_alloc::determine_correct_offset(lg_k, num_coupons); - vector_u8 sliding_window(k, 0, bit_matrix.get_allocator()); + vector_bytes sliding_window(k, 0, bit_matrix.get_allocator()); // don't need to zero the window's memory // dynamically growing caused snowplow effect @@ -289,7 +289,7 @@ void cpc_union_alloc::or_table_into_matrix(const u32_table& table) { } template -void cpc_union_alloc::or_window_into_matrix(const vector_u8& sliding_window, uint8_t offset, uint8_t src_lg_k) { +void cpc_union_alloc::or_window_into_matrix(const vector_bytes& sliding_window, uint8_t offset, uint8_t src_lg_k) { if (lg_k > src_lg_k) throw std::logic_error("dst LgK > src LgK"); const uint64_t dst_mask = (1 << lg_k) - 1; // downsamples when dst lgK < src LgK const uint32_t src_k = 1 << src_lg_k; @@ -299,7 +299,7 @@ void cpc_union_alloc::or_window_into_matrix(const vector_u8& sliding_windo } template -void cpc_union_alloc::or_matrix_into_matrix(const vector_u64& src_matrix, uint8_t src_lg_k) { +void cpc_union_alloc::or_matrix_into_matrix(const vector_u64& src_matrix, uint8_t src_lg_k) { if (lg_k > src_lg_k) throw std::logic_error("dst LgK > src LgK"); const uint64_t dst_mask = (1 << lg_k) - 1; // downsamples when dst lgK < src LgK const uint32_t src_k = 1 << src_lg_k; @@ -315,10 +315,10 @@ void cpc_union_alloc::reduce_k(uint8_t new_lg_k) { if (bit_matrix.size() > 0) { // downsample the unioner's bit matrix if (accumulator != nullptr) throw std::logic_error("accumulator is not null"); - vector_u64 old_matrix = std::move(bit_matrix); + vector_u64 old_matrix = std::move(bit_matrix); const uint8_t old_lg_k = lg_k; const uint32_t new_k = 1 << new_lg_k; - bit_matrix = vector_u64(new_k, 0, old_matrix.get_allocator()); + bit_matrix = vector_u64(new_k, 0, old_matrix.get_allocator()); lg_k = new_lg_k; or_matrix_into_matrix(old_matrix, old_lg_k); return; diff --git a/3rd/datasketches/cpc/include/u32_table.hpp b/3rd/datasketches/cpc/include/u32_table.hpp index a344a1727..afdea8378 100644 --- a/3rd/datasketches/cpc/include/u32_table.hpp +++ b/3rd/datasketches/cpc/include/u32_table.hpp @@ -38,6 +38,7 @@ static const uint32_t U32_TABLE_DOWNSIZE_DENOM = 4LL; template class u32_table { public: + using vector_u32 = std::vector::template rebind_alloc>; u32_table(const A& allocator); u32_table(uint8_t lg_size, uint8_t num_valid_bits, const A& allocator); @@ -54,7 +55,7 @@ class u32_table { static u32_table make_from_pairs(const uint32_t* pairs, uint32_t num_pairs, uint8_t lg_k, const A& allocator); - vector_u32 unwrapping_get_items() const; + vector_u32 unwrapping_get_items() const; static void merge( const uint32_t* arr_a, size_t start_a, size_t length_a, // input @@ -70,7 +71,7 @@ class u32_table { uint8_t lg_size; // log2 of number of slots uint8_t num_valid_bits; uint32_t num_items; - vector_u32 slots; + vector_u32 slots; inline uint32_t lookup(uint32_t item) const; inline void must_insert(uint32_t item); diff --git a/3rd/datasketches/cpc/include/u32_table_impl.hpp b/3rd/datasketches/cpc/include/u32_table_impl.hpp index a82e7dea1..62cd7dacd 100644 --- a/3rd/datasketches/cpc/include/u32_table_impl.hpp +++ b/3rd/datasketches/cpc/include/u32_table_impl.hpp @@ -151,8 +151,8 @@ void u32_table::rebuild(uint8_t new_lg_size) { const uint32_t old_size = 1 << lg_size; const uint32_t new_size = 1 << new_lg_size; if (new_size <= num_items) throw std::logic_error("new_size <= num_items"); - vector_u32 old_slots = std::move(slots); - slots = vector_u32(new_size, UINT32_MAX, old_slots.get_allocator()); + vector_u32 old_slots = std::move(slots); + slots = vector_u32(new_size, UINT32_MAX, old_slots.get_allocator()); lg_size = new_lg_size; for (uint32_t i = 0; i < old_size; i++) { if (old_slots[i] != UINT32_MAX) { @@ -168,10 +168,10 @@ void u32_table::rebuild(uint8_t new_lg_size) { // and even then the subsequent sort would fix things up. // The result is nearly sorted, so make sure to use an efficient sort for that case template -vector_u32 u32_table::unwrapping_get_items() const { - if (num_items == 0) return vector_u32(slots.get_allocator()); +auto u32_table::unwrapping_get_items() const -> vector_u32 { + if (num_items == 0) return vector_u32(slots.get_allocator()); const uint32_t table_size = 1 << lg_size; - vector_u32 result(num_items, 0, slots.get_allocator()); + vector_u32 result(num_items, 0, slots.get_allocator()); size_t i = 0; size_t l = 0; size_t r = num_items - 1; diff --git a/3rd/datasketches/fi/include/frequent_items_sketch.hpp b/3rd/datasketches/fi/include/frequent_items_sketch.hpp index 02a08540b..0aa9514cb 100644 --- a/3rd/datasketches/fi/include/frequent_items_sketch.hpp +++ b/3rd/datasketches/fi/include/frequent_items_sketch.hpp @@ -32,15 +32,19 @@ namespace datasketches { -/* +/// Frequent items error type +enum frequent_items_error_type { + NO_FALSE_POSITIVES, ///< include an item in the result list if get_lower_bound(item) > threshold + NO_FALSE_NEGATIVES ///< include an item in the result list if get_upper_bound(item) > threshold +}; + +/** + * Frequent Items sketch. + * * Based on Java implementation here: * https://github.com/apache/datasketches-java/blob/master/src/main/java/org/apache/datasketches/frequencies/ItemsSketch.java - * author Alexander Saydakov + * @author Alexander Saydakov */ - -enum frequent_items_error_type { NO_FALSE_POSITIVES, NO_FALSE_NEGATIVES }; - -// type W for weight must be an arithmetic type (integral or floating point) template< typename T, typename W = uint64_t, @@ -49,6 +53,7 @@ template< typename A = std::allocator > class frequent_items_sketch { + static_assert(std::is_arithmetic::value, "Arithmetic type expected"); public: static const uint8_t LG_MIN_MAP_SIZE = 3; @@ -194,7 +199,7 @@ class frequent_items_sketch { * There may be items omitted from the set with true frequencies greater than the * threshold (false negatives).

* - * @param error_type determines whether no false positives or no false negatives are desired. + * @param err_type determines whether no false positives or no false negatives are desired. * @return an array of frequent items */ vector_row get_frequent_items(frequent_items_error_type err_type) const; @@ -217,7 +222,7 @@ class frequent_items_sketch { * There may be items omitted from the set with true frequencies greater than the * threshold (false negatives).

* - * @param error_type determines whether no false positives or no false negatives are desired. + * @param err_type determines whether no false positives or no false negatives are desired. * @param threshold to include items in the result list * @return an array of frequent items */ @@ -293,7 +298,9 @@ class frequent_items_sketch { static const uint8_t PREAMBLE_LONGS_EMPTY = 1; static const uint8_t PREAMBLE_LONGS_NONEMPTY = 4; static constexpr double EPSILON_FACTOR = 3.5; - enum flags { IS_EMPTY }; + // due to a mistake different bits were used in C++ and Java to indicate empty sketch + // therefore both are set and checked for compatibility with historical binary format + enum flags { IS_EMPTY_1 = 0, IS_EMPTY_2 = 2 }; W total_weight; W offset; reverse_purge_hash_map map; @@ -318,14 +325,19 @@ class frequent_items_sketch { class items_deleter; }; +/// Row in the output from #get_frequent_items template class frequent_items_sketch::row { public: row(const T* item, W weight, W offset): item(item), weight(weight), offset(offset) {} + /// @return item const T& get_item() const { return *item; } + /// @return frequency (weight) estimate W get_estimate() const { return weight + offset; } + /// @return estimate lower bound W get_lower_bound() const { return weight; } + /// @return estimate upper bound W get_upper_bound() const { return weight + offset; } private: const T* item; diff --git a/3rd/datasketches/fi/include/frequent_items_sketch_impl.hpp b/3rd/datasketches/fi/include/frequent_items_sketch_impl.hpp index d72f46248..acbd2ee1a 100644 --- a/3rd/datasketches/fi/include/frequent_items_sketch_impl.hpp +++ b/3rd/datasketches/fi/include/frequent_items_sketch_impl.hpp @@ -174,7 +174,8 @@ void frequent_items_sketch::serialize(std::ostream& os, const Ser const uint8_t lg_cur_size = map.get_lg_cur_size(); write(os, lg_cur_size); const uint8_t flags_byte( - (is_empty() ? 1 << flags::IS_EMPTY : 0) + (is_empty() ? 1 << flags::IS_EMPTY_1 : 0) + | (is_empty() ? 1 << flags::IS_EMPTY_2 : 0) ); write(os, flags_byte); const uint16_t unused16 = 0; @@ -234,7 +235,8 @@ auto frequent_items_sketch::serialize(unsigned header_size_bytes, const uint8_t lg_cur_size = map.get_lg_cur_size(); ptr += copy_to_mem(lg_cur_size, ptr); const uint8_t flags_byte( - (is_empty() ? 1 << flags::IS_EMPTY : 0) + (is_empty() ? 1 << flags::IS_EMPTY_1 : 0) + | (is_empty() ? 1 << flags::IS_EMPTY_2 : 0) ); ptr += copy_to_mem(flags_byte, ptr); ptr += sizeof(uint16_t); // unused @@ -298,7 +300,7 @@ frequent_items_sketch frequent_items_sketch::deser const auto flags_byte = read(is); read(is); // unused - const bool is_empty = flags_byte & (1 << flags::IS_EMPTY); + const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY_1)) | (flags_byte & (1 << flags::IS_EMPTY_2)); check_preamble_longs(preamble_longs, is_empty); check_serial_version(serial_version); @@ -352,7 +354,7 @@ frequent_items_sketch frequent_items_sketch::deser ptr += copy_from_mem(ptr, flags_byte); ptr += sizeof(uint16_t); // unused - const bool is_empty = flags_byte & (1 << flags::IS_EMPTY); + const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY_1)) | (flags_byte & (1 << flags::IS_EMPTY_2)); check_preamble_longs(preamble_longs, is_empty); check_serial_version(serial_version); diff --git a/3rd/datasketches/kll/include/kll_helper_impl.hpp b/3rd/datasketches/kll/include/kll_helper_impl.hpp index 321761bb5..bb92bdc72 100644 --- a/3rd/datasketches/kll/include/kll_helper_impl.hpp +++ b/3rd/datasketches/kll/include/kll_helper_impl.hpp @@ -99,7 +99,7 @@ void kll_helper::randomly_halve_down(T* buf, uint32_t start, uint32_t length) { #ifdef KLL_VALIDATION const uint32_t offset = deterministic_offset(); #else - const uint32_t offset = random_bit(); + const uint32_t offset = random_utils::random_bit(); #endif uint32_t j = start + offset; for (uint32_t i = start; i < (start + half_length); i++) { @@ -115,7 +115,7 @@ void kll_helper::randomly_halve_up(T* buf, uint32_t start, uint32_t length) { #ifdef KLL_VALIDATION const uint32_t offset = deterministic_offset(); #else - const uint32_t offset = random_bit(); + const uint32_t offset = random_utils::random_bit(); #endif uint32_t j = (start + length) - 1 - offset; for (uint32_t i = (start + length) - 1; i >= (start + half_length); i--) { @@ -230,7 +230,8 @@ kll_helper::compress_result kll_helper::general_compress(uint16_t k, uint8_t m, // move level over as is // make sure we are not moving data upwards if (raw_beg < out_levels[current_level]) throw std::logic_error("wrong move"); - std::move(items + raw_beg, items + raw_lim, items + out_levels[current_level]); + if (raw_beg != out_levels[current_level]) + std::move(items + raw_beg, items + raw_lim, items + out_levels[current_level]); out_levels[current_level + 1] = out_levels[current_level] + raw_pop; } else { // The sketch is too full AND this level is too full, so we compact it @@ -243,7 +244,8 @@ kll_helper::compress_result kll_helper::general_compress(uint16_t k, uint8_t m, const auto half_adj_pop = adj_pop / 2; if (odd_pop) { // move one guy over - items[out_levels[current_level]] = std::move(items[raw_beg]); + if (out_levels[current_level] != raw_beg) + items[out_levels[current_level]] = std::move(items[raw_beg]); out_levels[current_level + 1] = out_levels[current_level] + 1; } else { // even number of items out_levels[current_level + 1] = out_levels[current_level]; diff --git a/3rd/datasketches/kll/include/kll_sketch.hpp b/3rd/datasketches/kll/include/kll_sketch.hpp index 560f0975e..904587a18 100644 --- a/3rd/datasketches/kll/include/kll_sketch.hpp +++ b/3rd/datasketches/kll/include/kll_sketch.hpp @@ -26,10 +26,22 @@ #include "common_defs.hpp" #include "serde.hpp" #include "quantiles_sorted_view.hpp" +#include "optional.hpp" namespace datasketches { -/* +/// KLL sketch constants +namespace kll_constants { + /// default value of parameter K + const uint16_t DEFAULT_K = 200; + const uint8_t DEFAULT_M = 8; + /// min value of parameter K + const uint16_t MIN_K = DEFAULT_M; + /// max value of parameter K + const uint16_t MAX_K = (1 << 16) - 1; +} + +/** * Implementation of a very compact quantiles sketch with lazy compaction scheme * and nearly optimal accuracy per retained item. * See
Optimal Quantile Approximation in Streams. @@ -146,10 +158,6 @@ namespace datasketches { * author Lee Rhodes */ -namespace kll_constants { - const uint16_t DEFAULT_K = 200; -} - template < typename T, typename C = std::less, // strict weak ordering function (see C++ named requirements: Compare) @@ -159,17 +167,51 @@ class kll_sketch { public: using value_type = T; using comparator = C; + using allocator_type = A; using vector_u32 = std::vector::template rebind_alloc>; + using vector_double = typename quantiles_sorted_view::vector_double; - static const uint8_t DEFAULT_M = 8; - static const uint16_t MIN_K = DEFAULT_M; - static const uint16_t MAX_K = (1 << 16) - 1; + /** + * Quantile return type. + * This is to return quantiles either by value (for arithmetic types) or by const reference (for all other types) + */ + using quantile_return_type = typename quantiles_sorted_view::quantile_return_type; + /** + * Constructor + * @param k affects the size of the sketch and its estimation error + * @param comparator strict weak ordering function (see C++ named requirements: Compare) + * @param allocator used by this sketch to allocate memory + */ explicit kll_sketch(uint16_t k = kll_constants::DEFAULT_K, const C& comparator = C(), const A& allocator = A()); + + /** + * Copy constructor + * @param other sketch to be copied + */ kll_sketch(const kll_sketch& other); + + /** + * Move constructor + * @param other sketch to be moved + */ kll_sketch(kll_sketch&& other) noexcept; + + ~kll_sketch(); + + /** + * Copy assignment + * @param other sketch to be copied + * @return reference to this sketch + */ kll_sketch& operator=(const kll_sketch& other); + + /** + * Move assignment + * @param other sketch to be moved + * @return reference to this sketch + */ kll_sketch& operator=(kll_sketch&& other); /* @@ -262,44 +304,8 @@ class kll_sketch { * * @return approximate quantile associated with the given rank */ - using quantile_return_type = typename quantiles_sorted_view::quantile_return_type; quantile_return_type get_quantile(double rank, bool inclusive = true) const; - /** - * This returns an array that could have been generated by using get_quantile() for each - * rank separately. - * - *

If the sketch is empty this throws std::runtime_error. - * - * @param ranks given array of ranks in the hypothetical sorted stream. - * These ranks must be in the interval [0.0, 1.0]. - * @param size the number of ranks in the array - * @param inclusive if true, the given ranks are considered inclusive (include weights of items) - * - * @return array of approximate quantiles corresponding to the given ranks in the same order. - * - * Deprecated. Will be removed in the next major version. Use get_quantile() instead. - */ - std::vector get_quantiles(const double* ranks, uint32_t size, bool inclusive = true) const; - - /** - * This is a multiple-query version of get_quantile() that allows the caller to - * specify the number of evenly-spaced ranks. - * - *

If the sketch is empty this throws std::runtime_error. - * - * @param num an integer that specifies the number of evenly-spaced ranks. - * This must be an integer greater than 0. A value of 1 will return the quantile of rank 0. - * A value of 2 will return quantiles of ranks 0 and 1. A value of 3 will return quantiles of ranks 0, - * 0.5 (median) and 1, etc. - * @param inclusive if true, the ranks are considered inclusive (include weights of items) - * - * @return array of approximate quantiles corresponding to the given number of evenly-spaced ranks. - * - * Deprecated. Will be removed in the next major version. Use get_quantile() instead. - */ - std::vector get_quantiles(uint32_t num, bool inclusive = true) const; - /** * Returns an approximation to the normalized rank of the given item from 0 to 1, inclusive. * @@ -339,7 +345,6 @@ class kll_sketch { * @return an array of m+1 doubles each of which is an approximation * to the fraction of the input stream items (the mass) that fall into one of those intervals. */ - using vector_double = typename quantiles_sorted_view::vector_double; vector_double get_PMF(const T* split_points, uint32_t size, bool inclusive = true) const; /** @@ -489,9 +494,26 @@ class kll_sketch { string to_string(bool print_levels = false, bool print_items = false) const; class const_iterator; + + /** + * Iterator pointing to the first item in the sketch. + * If the sketch is empty, the returned iterator must not be dereferenced or incremented. + * @return iterator pointing to the first item in the sketch + */ const_iterator begin() const; + + /** + * Iterator pointing to the past-the-end item in the sketch. + * The past-the-end item is the hypothetical item that would follow the last item. + * It does not point to any item, and must not be dereferenced or incremented. + * @return iterator pointing to the past-the-end item in the sketch + */ const_iterator end() const; + /** + * Gets the sorted view of this sketch + * @return the sorted view of this sketch + */ quantiles_sorted_view get_sorted_view() const; private: @@ -529,16 +551,15 @@ class kll_sketch { vector_u32 levels_; T* items_; uint32_t items_size_; - T* min_item_; - T* max_item_; + optional min_item_; + optional max_item_; mutable quantiles_sorted_view* sorted_view_; // for deserialization - class item_deleter; class items_deleter; kll_sketch(uint16_t k, uint16_t min_k, uint64_t n, uint8_t num_levels, vector_u32&& levels, - std::unique_ptr items, uint32_t items_size, std::unique_ptr min_item, - std::unique_ptr max_item, bool is_level_zero_sorted, const C& comparator); + std::unique_ptr items, uint32_t items_size, optional&& min_item, + optional&& max_item, bool is_level_zero_sorted, const C& comparator); // common update code inline void update_min_max(const T& item); diff --git a/3rd/datasketches/kll/include/kll_sketch_impl.hpp b/3rd/datasketches/kll/include/kll_sketch_impl.hpp index cced0caa1..fde0a314a 100644 --- a/3rd/datasketches/kll/include/kll_sketch_impl.hpp +++ b/3rd/datasketches/kll/include/kll_sketch_impl.hpp @@ -37,7 +37,7 @@ kll_sketch::kll_sketch(uint16_t k, const C& comparator, const A& alloca comparator_(comparator), allocator_(allocator), k_(k), -m_(DEFAULT_M), +m_(kll_constants::DEFAULT_M), min_k_(k), num_levels_(1), is_level_zero_sorted_(false), @@ -45,12 +45,13 @@ n_(0), levels_(2, 0, allocator), items_(nullptr), items_size_(k_), -min_item_(nullptr), -max_item_(nullptr), +min_item_(), +max_item_(), sorted_view_(nullptr) { - if (k < MIN_K || k > MAX_K) { - throw std::invalid_argument("K must be >= " + std::to_string(MIN_K) + " and <= " + std::to_string(MAX_K) + ": " + std::to_string(k)); + if (k < kll_constants::MIN_K || k > kll_constants::MAX_K) { + throw std::invalid_argument("K must be >= " + std::to_string(kll_constants::MIN_K) + " and <= " + + std::to_string(kll_constants::MAX_K) + ": " + std::to_string(k)); } levels_[0] = levels_[1] = k; items_ = allocator_.allocate(items_size_); @@ -69,14 +70,12 @@ n_(other.n_), levels_(other.levels_), items_(nullptr), items_size_(other.items_size_), -min_item_(nullptr), -max_item_(nullptr), +min_item_(other.min_item_), +max_item_(other.max_item_), sorted_view_(nullptr) { items_ = allocator_.allocate(items_size_); for (auto i = levels_[0]; i < levels_[num_levels_]; ++i) new (&items_[i]) T(other.items_[i]); - if (other.min_item_ != nullptr) min_item_ = new (allocator_.allocate(1)) T(*other.min_item_); - if (other.max_item_ != nullptr) max_item_ = new (allocator_.allocate(1)) T(*other.max_item_); } template @@ -92,13 +91,11 @@ n_(other.n_), levels_(std::move(other.levels_)), items_(other.items_), items_size_(other.items_size_), -min_item_(other.min_item_), -max_item_(other.max_item_), +min_item_(std::move(other.min_item_)), +max_item_(std::move(other.max_item_)), sorted_view_(nullptr) { other.items_ = nullptr; - other.min_item_ = nullptr; - other.max_item_ = nullptr; } template @@ -148,14 +145,6 @@ kll_sketch::~kll_sketch() { for (uint32_t i = begin; i < end; i++) items_[i].~T(); allocator_.deallocate(items_, items_size_); } - if (min_item_ != nullptr) { - min_item_->~T(); - allocator_.deallocate(min_item_, 1); - } - if (max_item_ != nullptr) { - max_item_->~T(); - allocator_.deallocate(max_item_, 1); - } reset_sorted_view(); } @@ -173,8 +162,8 @@ n_(other.n_), levels_(other.levels_, allocator_), items_(nullptr), items_size_(other.items_size_), -min_item_(nullptr), -max_item_(nullptr), +min_item_(other.min_item_), +max_item_(other.max_item_), sorted_view_(nullptr) { static_assert( @@ -183,8 +172,6 @@ sorted_view_(nullptr) ); items_ = allocator_.allocate(items_size_); for (auto i = levels_[0]; i < levels_[num_levels_]; ++i) new (&items_[i]) T(other.items_[i]); - if (other.min_item_ != nullptr) min_item_ = new (allocator_.allocate(1)) T(*other.min_item_); - if (other.max_item_ != nullptr) max_item_ = new (allocator_.allocate(1)) T(*other.max_item_); check_sorting(); } @@ -192,7 +179,7 @@ template template void kll_sketch::update(FwdT&& item) { if (!check_update_item(item)) { return; } - update_min_max(item); + update_min_max(static_cast(item)); // min and max are always copies const uint32_t index = internal_update(); new (&items_[index]) T(std::forward(item)); reset_sorted_view(); @@ -201,8 +188,8 @@ void kll_sketch::update(FwdT&& item) { template void kll_sketch::update_min_max(const T& item) { if (is_empty()) { - min_item_ = new (allocator_.allocate(1)) T(item); - max_item_ = new (allocator_.allocate(1)) T(item); + min_item_.emplace(item); + max_item_.emplace(item); } else { if (comparator_(item, *min_item_)) *min_item_ = item; if (comparator_(*max_item_, item)) *max_item_ = item; @@ -225,8 +212,8 @@ void kll_sketch::merge(FwdSk&& other) { throw std::invalid_argument("incompatible M: " + std::to_string(m_) + " and " + std::to_string(other.m_)); } if (is_empty()) { - min_item_ = new (allocator_.allocate(1)) T(conditional_forward(*other.min_item_)); - max_item_ = new (allocator_.allocate(1)) T(conditional_forward(*other.max_item_)); + min_item_.emplace(conditional_forward(*other.min_item_)); + max_item_.emplace(conditional_forward(*other.max_item_)); } else { if (comparator_(*other.min_item_, *min_item_)) *min_item_ = conditional_forward(*other.min_item_); if (comparator_(*max_item_, *other.max_item_)) *max_item_ = conditional_forward(*other.max_item_); @@ -322,42 +309,6 @@ auto kll_sketch::get_quantile(double rank, bool inclusive) const -> qua return sorted_view_->get_quantile(rank, inclusive); } -template -std::vector kll_sketch::get_quantiles(const double* ranks, uint32_t size, bool inclusive) const { - if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch"); - std::vector quantiles(allocator_); - quantiles.reserve(size); - - // may have a side effect of sorting level zero if needed - setup_sorted_view(); - - for (uint32_t i = 0; i < size; i++) { - const double rank = ranks[i]; - if ((rank < 0.0) || (rank > 1.0)) { - throw std::invalid_argument("normalized rank cannot be less than 0 or greater than 1"); - } - quantiles.push_back(sorted_view_->get_quantile(rank, inclusive)); - } - return quantiles; -} - -template -std::vector kll_sketch::get_quantiles(uint32_t num, bool inclusive) const { - if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch"); - if (num == 0) { - throw std::invalid_argument("num must be > 0"); - } - vector_double ranks(num, 0, allocator_); - ranks[0] = 0.0; - for (size_t i = 1; i < num; i++) { - ranks[i] = static_cast(i) / (num - 1); - } - if (num > 1) { - ranks[num - 1] = 1.0; - } - return get_quantiles(ranks.data(), num, inclusive); -} - template double kll_sketch::get_normalized_rank_error(bool pmf) const { return get_normalized_rank_error(min_k_, pmf); @@ -396,7 +347,7 @@ template template::value, int>::type> size_t kll_sketch::get_max_serialized_size_bytes(uint16_t k, uint64_t n) { const uint8_t num_levels = kll_helper::ub_on_num_levels(n); - const uint32_t max_num_retained = kll_helper::compute_total_capacity(k, DEFAULT_M, num_levels); + const uint32_t max_num_retained = kll_helper::compute_total_capacity(k, kll_constants::DEFAULT_M, num_levels); // the last integer in the levels_ array is not serialized because it can be derived return DATA_START + num_levels * sizeof(uint32_t) + (max_num_retained + 2) * sizeof(TT); } @@ -406,7 +357,7 @@ template template::value, int>::type> size_t kll_sketch::get_max_serialized_size_bytes(uint16_t k, uint64_t n, size_t max_item_size_bytes) { const uint8_t num_levels = kll_helper::ub_on_num_levels(n); - const uint32_t max_num_retained = kll_helper::compute_total_capacity(k, DEFAULT_M, num_levels); + const uint32_t max_num_retained = kll_helper::compute_total_capacity(k, kll_constants::DEFAULT_M, num_levels); // the last integer in the levels_ array is not serialized because it can be derived return DATA_START + num_levels * sizeof(uint32_t) + (max_num_retained + 2) * max_item_size_bytes; } @@ -438,8 +389,8 @@ void kll_sketch::serialize(std::ostream& os, const SerDe& sd) const { write(os, num_levels_); write(os, unused); write(os, levels_.data(), sizeof(levels_[0]) * num_levels_); - sd.serialize(os, min_item_, 1); - sd.serialize(os, max_item_, 1); + sd.serialize(os, &*min_item_, 1); + sd.serialize(os, &*max_item_, 1); } sd.serialize(os, &items_[levels_[0]], get_num_retained()); } @@ -474,8 +425,8 @@ auto kll_sketch::serialize(unsigned header_size_bytes, const SerDe& sd) ptr += copy_to_mem(num_levels_, ptr); ptr += sizeof(uint8_t); // unused ptr += copy_to_mem(levels_.data(), ptr, sizeof(levels_[0]) * num_levels_); - ptr += sd.serialize(ptr, end_ptr - ptr, min_item_, 1); - ptr += sd.serialize(ptr, end_ptr - ptr, max_item_, 1); + ptr += sd.serialize(ptr, end_ptr - ptr, &*min_item_, 1); + ptr += sd.serialize(ptr, end_ptr - ptr, &*max_item_, 1); } const size_t bytes_remaining = end_ptr - ptr; ptr += sd.serialize(ptr, bytes_remaining, &items_[levels_[0]], get_num_retained()); @@ -530,20 +481,20 @@ kll_sketch kll_sketch::deserialize(std::istream& is, const Ser read(is, levels.data(), sizeof(levels[0]) * num_levels); } levels[num_levels] = capacity; - A alloc(allocator); - auto item_buffer_deleter = [&alloc](T* ptr) { alloc.deallocate(ptr, 1); }; - std::unique_ptr min_item_buffer(alloc.allocate(1), item_buffer_deleter); - std::unique_ptr max_item_buffer(alloc.allocate(1), item_buffer_deleter); - std::unique_ptr min_item(nullptr, item_deleter(allocator)); - std::unique_ptr max_item(nullptr, item_deleter(allocator)); + optional tmp; // space to deserialize min and max + optional min_item; + optional max_item; if (!is_single_item) { - sd.deserialize(is, min_item_buffer.get(), 1); - // serde call did not throw, repackage with destrtuctor - min_item = std::unique_ptr(min_item_buffer.release(), item_deleter(allocator)); - sd.deserialize(is, max_item_buffer.get(), 1); - // serde call did not throw, repackage with destrtuctor - max_item = std::unique_ptr(max_item_buffer.release(), item_deleter(allocator)); + sd.deserialize(is, &*tmp, 1); + // serde call did not throw, repackage and cleanup + min_item.emplace(*tmp); + (*tmp).~T(); + sd.deserialize(is, &*tmp, 1); + // serde call did not throw, repackage and cleanup + max_item.emplace(*tmp); + (*tmp).~T(); } + A alloc(allocator); auto items_buffer_deleter = [capacity, &alloc](T* ptr) { alloc.deallocate(ptr, capacity); }; std::unique_ptr items_buffer(alloc.allocate(capacity), items_buffer_deleter); const auto num_items = levels[num_levels] - levels[0]; @@ -552,12 +503,8 @@ kll_sketch kll_sketch::deserialize(std::istream& is, const Ser std::unique_ptr items(items_buffer.release(), items_deleter(levels[0], capacity, allocator)); const bool is_level_zero_sorted = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0; if (is_single_item) { - new (min_item_buffer.get()) T(items.get()[levels[0]]); - // copy did not throw, repackage with destrtuctor - min_item = std::unique_ptr(min_item_buffer.release(), item_deleter(allocator)); - new (max_item_buffer.get()) T(items.get()[levels[0]]); - // copy did not throw, repackage with destrtuctor - max_item = std::unique_ptr(max_item_buffer.release(), item_deleter(allocator)); + min_item.emplace(items.get()[levels[0]]); + max_item.emplace(items.get()[levels[0]]); } if (!is.good()) throw std::runtime_error("error reading from std::istream"); @@ -618,20 +565,20 @@ kll_sketch kll_sketch::deserialize(const void* bytes, size_t s ptr += copy_from_mem(ptr, levels.data(), sizeof(levels[0]) * num_levels); } levels[num_levels] = capacity; - A alloc(allocator); - auto item_buffer_deleter = [&alloc](T* ptr) { alloc.deallocate(ptr, 1); }; - std::unique_ptr min_item_buffer(alloc.allocate(1), item_buffer_deleter); - std::unique_ptr max_item_buffer(alloc.allocate(1), item_buffer_deleter); - std::unique_ptr min_item(nullptr, item_deleter(allocator)); - std::unique_ptr max_item(nullptr, item_deleter(allocator)); + optional tmp; // space to deserialize min and max + optional min_item; + optional max_item; if (!is_single_item) { - ptr += sd.deserialize(ptr, end_ptr - ptr, min_item_buffer.get(), 1); - // serde call did not throw, repackage with destrtuctor - min_item = std::unique_ptr(min_item_buffer.release(), item_deleter(allocator)); - ptr += sd.deserialize(ptr, end_ptr - ptr, max_item_buffer.get(), 1); - // serde call did not throw, repackage with destrtuctor - max_item = std::unique_ptr(max_item_buffer.release(), item_deleter(allocator)); + ptr += sd.deserialize(ptr, end_ptr - ptr, &*tmp, 1); + // serde call did not throw, repackage and cleanup + min_item.emplace(*tmp); + (*tmp).~T(); + ptr += sd.deserialize(ptr, end_ptr - ptr, &*tmp, 1); + // serde call did not throw, repackage and cleanup + max_item.emplace(*tmp); + (*tmp).~T(); } + A alloc(allocator); auto items_buffer_deleter = [capacity, &alloc](T* ptr) { alloc.deallocate(ptr, capacity); }; std::unique_ptr items_buffer(alloc.allocate(capacity), items_buffer_deleter); const auto num_items = levels[num_levels] - levels[0]; @@ -642,12 +589,8 @@ kll_sketch kll_sketch::deserialize(const void* bytes, size_t s if (delta != size) throw std::logic_error("deserialized size mismatch: " + std::to_string(delta) + " != " + std::to_string(size)); const bool is_level_zero_sorted = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0; if (is_single_item) { - new (min_item_buffer.get()) T(items.get()[levels[0]]); - // copy did not throw, repackage with destrtuctor - min_item = std::unique_ptr(min_item_buffer.release(), item_deleter(allocator)); - new (max_item_buffer.get()) T(items.get()[levels[0]]); - // copy did not throw, repackage with destrtuctor - max_item = std::unique_ptr(max_item_buffer.release(), item_deleter(allocator)); + min_item.emplace(items.get()[levels[0]]); + max_item.emplace(items.get()[levels[0]]); } return kll_sketch(k, min_k, n, num_levels, std::move(levels), std::move(items), capacity, std::move(min_item), std::move(max_item), is_level_zero_sorted, comparator); @@ -670,12 +613,12 @@ double kll_sketch::get_normalized_rank_error(uint16_t k, bool pmf) { // for deserialization template kll_sketch::kll_sketch(uint16_t k, uint16_t min_k, uint64_t n, uint8_t num_levels, vector_u32&& levels, - std::unique_ptr items, uint32_t items_size, std::unique_ptr min_item, - std::unique_ptr max_item, bool is_level_zero_sorted, const C& comparator): + std::unique_ptr items, uint32_t items_size, optional&& min_item, + optional&& max_item, bool is_level_zero_sorted, const C& comparator): comparator_(comparator), allocator_(levels.get_allocator()), k_(k), -m_(DEFAULT_M), +m_(kll_constants::DEFAULT_M), min_k_(min_k), num_levels_(num_levels), is_level_zero_sorted_(is_level_zero_sorted), @@ -683,8 +626,8 @@ n_(n), levels_(std::move(levels)), items_(items.release()), items_size_(items_size), -min_item_(min_item.release()), -max_item_(max_item.release()), +min_item_(std::move(min_item)), +max_item_(std::move(max_item)), sorted_view_(nullptr) {} @@ -820,7 +763,7 @@ quantiles_sorted_view kll_sketch::get_sorted_view() const { for (uint8_t level = 0; level < num_levels_; ++level) { const auto from = items_ + levels_[level]; const auto to = items_ + levels_[level + 1]; // exclusive - view.add(from, to, 1 << level); + view.add(from, to, 1ULL << level); } view.convert_to_cummulative(); return view; @@ -917,8 +860,8 @@ uint32_t kll_sketch::get_num_retained_above_level_zero() const { template void kll_sketch::check_m(uint8_t m) { - if (m != DEFAULT_M) { - throw std::invalid_argument("Possible corruption: M must be " + std::to_string(DEFAULT_M) + if (m != kll_constants::DEFAULT_M) { + throw std::invalid_argument("Possible corruption: M must be " + std::to_string(kll_constants::DEFAULT_M) + ": " + std::to_string(m)); } } @@ -1019,20 +962,6 @@ typename kll_sketch::const_iterator kll_sketch::end() const { return kll_sketch::const_iterator(nullptr, levels_.data(), num_levels_); } -template -class kll_sketch::item_deleter { - public: - item_deleter(const A& allocator): allocator_(allocator) {} - void operator() (T* ptr) { - if (ptr != nullptr) { - ptr->~T(); - allocator_.deallocate(ptr, 1); - } - } - private: - A allocator_; -}; - template class kll_sketch::items_deleter { public: diff --git a/src/Metrics.h b/src/Metrics.h index ed101ffd2..ae979d2af 100644 --- a/src/Metrics.h +++ b/src/Metrics.h @@ -58,6 +58,12 @@ static inline uint64_t timespec_to_uint64(timespec &stamp) return stamp.tv_sec * 1000000000ULL + stamp.tv_nsec; } +template +static inline std::vector get_quantiles(const datasketches::kll_sketch &quatile) +{ + return {quatile.get_quantile(0.50), quatile.get_quantile(0.90), quatile.get_quantile(0.95), quatile.get_quantile(0.99)}; +} + class Metric { public: @@ -353,10 +359,9 @@ class Quantile final : public Metric if (other._quantile.is_empty()) { return; } - const double fractions[4]{0.50, 0.90, 0.95, 0.99}; - auto other_quantiles = other._quantile.get_quantiles(fractions, 4); + auto other_quantiles = get_quantiles(other._quantile); if (_quantiles_sum.empty()) { - _quantiles_sum = _quantile.get_quantiles(fractions, 4); + _quantiles_sum = get_quantiles(_quantile); } for (uint8_t i = 0; i < 4; i++) { _quantiles_sum[i] += other_quantiles[i]; @@ -395,8 +400,7 @@ class Quantile final : public Metric std::vector quantiles; if (_quantiles_sum.empty()) { - const double fractions[4]{0.50, 0.90, 0.95, 0.99}; - quantiles = _quantile.get_quantiles(fractions, 4); + quantiles = get_quantiles(_quantile); } else { quantiles = _quantiles_sum; } @@ -417,8 +421,7 @@ class Quantile final : public Metric std::vector quantiles; if (_quantiles_sum.empty()) { - const double fractions[4]{0.50, 0.90, 0.95, 0.99}; - quantiles = _quantile.get_quantiles(fractions, 4); + quantiles = get_quantiles(_quantile); } else { quantiles = _quantiles_sum; } @@ -451,9 +454,8 @@ class Quantile final : public Metric } std::vector quantiles; - const double fractions[4]{0.50, 0.90, 0.95, 0.99}; if (_quantiles_sum.empty()) { - quantiles = _quantile.get_quantiles(fractions, 4); + quantiles = get_quantiles(_quantile); } else { quantiles = _quantiles_sum; } @@ -464,6 +466,7 @@ class Quantile final : public Metric auto summary_data_point = metric->mutable_summary()->add_data_points(); summary_data_point->set_start_time_unix_nano(timespec_to_uint64(start)); summary_data_point->set_time_unix_nano(timespec_to_uint64(end)); + const double fractions[4]{0.50, 0.90, 0.95, 0.99}; for (auto it = quantiles.begin(); it != quantiles.end(); ++it) { auto quantile = summary_data_point->add_quantile_values(); quantile->set_quantile(fractions[it - quantiles.begin()]); diff --git a/src/tests/test_sketches.cpp b/src/tests/test_sketches.cpp index 4f89f7714..e7d2e5c37 100644 --- a/src/tests/test_sketches.cpp +++ b/src/tests/test_sketches.cpp @@ -91,13 +91,15 @@ TEST_CASE("Quantiles", "[kll]") const double fractions[3]{0.0, 0.5, 1.0}; // inclusive (default) - auto quantiles = sketch.get_quantiles(fractions, 3); + auto quantiles = std::vector{sketch.get_quantile(fractions[0]), + sketch.get_quantile(fractions[1]), sketch.get_quantile(fractions[2])}; CHECK(3ul == quantiles.size()); CHECK(0.0f == quantiles[0]); CHECK(static_cast((n - 1) / 2) == quantiles[1]); CHECK(static_cast(n - 1) == quantiles[2]); // exclusive - quantiles = sketch.get_quantiles(fractions, 3, false); + quantiles = std::vector{sketch.get_quantile(fractions[0], false), + sketch.get_quantile(fractions[1], false), sketch.get_quantile(fractions[2], false)}; CHECK(3ul == quantiles.size()); CHECK(0.0f == quantiles[0]); CHECK(static_cast(n) / 2 == quantiles[1]);