diff --git a/3rd/datasketches/common/CMakeLists.txt b/3rd/datasketches/common/CMakeLists.txt index 3a19e40ac..22cb1aeb0 100644 --- a/3rd/datasketches/common/CMakeLists.txt +++ b/3rd/datasketches/common/CMakeLists.txt @@ -43,4 +43,5 @@ target_sources(common ${CMAKE_CURRENT_SOURCE_DIR}/include/quantiles_sorted_view_impl.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/optional.hpp ${CMAKE_CURRENT_SOURCE_DIR}/include/version.hpp.in + ${CMAKE_CURRENT_SOURCE_DIR}/include/xxhash64.h ) diff --git a/3rd/datasketches/common/include/MurmurHash3.h b/3rd/datasketches/common/include/MurmurHash3.h index ed9962a1c..2dde3dfb4 100644 --- a/3rd/datasketches/common/include/MurmurHash3.h +++ b/3rd/datasketches/common/include/MurmurHash3.h @@ -71,10 +71,10 @@ typedef struct { // Block read - if your platform needs to do endian-swapping or can only // handle aligned reads, do the conversion here -MURMUR3_FORCE_INLINE uint64_t getblock64 ( const uint64_t * p, size_t i ) +MURMUR3_FORCE_INLINE uint64_t getblock64 ( const uint8_t * p, size_t i ) { uint64_t res; - memcpy(&res, p + i, sizeof(res)); + memcpy(&res, p + i * sizeof(uint64_t), sizeof(res)); return res; } @@ -104,13 +104,12 @@ MURMUR3_FORCE_INLINE void MurmurHash3_x64_128(const void* key, size_t lenBytes, // Number of full 128-bit blocks of 16 bytes. // Possible exclusion of a remainder of up to 15 bytes. 
- const size_t nblocks = lenBytes >> 4; // bytes / 16 + const size_t nblocks = lenBytes >> 4; // bytes / 16 // Process the 128-bit blocks (the body) into the hash - const uint64_t* blocks = (const uint64_t*)(data); for (size_t i = 0; i < nblocks; ++i) { // 16 bytes per block - uint64_t k1 = getblock64(blocks, i * 2 + 0); - uint64_t k2 = getblock64(blocks, i * 2 + 1); + uint64_t k1 = getblock64(data, i * 2 + 0); + uint64_t k2 = getblock64(data, i * 2 + 1); k1 *= c1; k1 = MURMUR3_ROTL64(k1,31); k1 *= c2; out.h1 ^= k1; out.h1 = MURMUR3_ROTL64(out.h1,27); diff --git a/3rd/datasketches/common/include/common_defs.hpp b/3rd/datasketches/common/include/common_defs.hpp index d8e3e6ca0..6a87e079a 100644 --- a/3rd/datasketches/common/include/common_defs.hpp +++ b/3rd/datasketches/common/include/common_defs.hpp @@ -42,6 +42,7 @@ namespace random_utils { static std::random_device rd; // possibly unsafe in MinGW with GCC < 9.2 static thread_local std::mt19937_64 rand(rd()); static thread_local std::uniform_real_distribution<> next_double(0.0, 1.0); + static thread_local std::uniform_int_distribution next_uint64(0, UINT64_MAX); // thread-safe random bit static thread_local std::independent_bits_engine @@ -91,6 +92,23 @@ static inline void write(std::ostream& os, const T* ptr, size_t size_bytes) { os.write(reinterpret_cast(ptr), size_bytes); } +template +T byteswap(T value) { + char* ptr = static_cast(static_cast(&value)); + const int len = sizeof(T); + for (size_t i = 0; i < len / 2; ++i) { + std::swap(ptr[i], ptr[len - i - 1]); + } + return value; +} + +template +static inline T read_big_endian(std::istream& is) { + T value; + is.read(reinterpret_cast(&value), sizeof(T)); + return byteswap(value); +} + // wrapper for iterators to implement operator-> returning temporary value template class return_value_holder { diff --git a/3rd/datasketches/common/include/quantiles_sorted_view_impl.hpp b/3rd/datasketches/common/include/quantiles_sorted_view_impl.hpp index d57cd53fb..3663950f2 
100755 --- a/3rd/datasketches/common/include/quantiles_sorted_view_impl.hpp +++ b/3rd/datasketches/common/include/quantiles_sorted_view_impl.hpp @@ -86,19 +86,17 @@ auto quantiles_sorted_view::get_quantile(double rank, bool inclusive) c template auto quantiles_sorted_view::get_CDF(const T* split_points, uint32_t size, bool inclusive) const -> vector_double { if (entries_.empty()) throw std::runtime_error("operation is undefined for an empty sketch"); - vector_double buckets(entries_.get_allocator()); - if (entries_.size() == 0) return buckets; check_split_points(split_points, size); - buckets.reserve(size + 1); - for (uint32_t i = 0; i < size; ++i) buckets.push_back(get_rank(split_points[i], inclusive)); - buckets.push_back(1); - return buckets; + vector_double ranks(entries_.get_allocator()); + ranks.reserve(size + 1); + for (uint32_t i = 0; i < size; ++i) ranks.push_back(get_rank(split_points[i], inclusive)); + ranks.push_back(1); + return ranks; } template auto quantiles_sorted_view::get_PMF(const T* split_points, uint32_t size, bool inclusive) const -> vector_double { auto buckets = get_CDF(split_points, size, inclusive); - if (buckets.size() == 0) return buckets; for (uint32_t i = size; i > 0; --i) { buckets[i] -= buckets[i - 1]; } diff --git a/3rd/datasketches/common/include/xxhash64.h b/3rd/datasketches/common/include/xxhash64.h new file mode 100644 index 000000000..4d0bbc5d7 --- /dev/null +++ b/3rd/datasketches/common/include/xxhash64.h @@ -0,0 +1,202 @@ +// ////////////////////////////////////////////////////////// +// xxhash64.h +// Copyright (c) 2016 Stephan Brumme. All rights reserved. 
+// see http://create.stephan-brumme.com/disclaimer.html +// + +#pragma once +#include // for uint32_t and uint64_t + +/// XXHash (64 bit), based on Yann Collet's descriptions, see http://cyan4973.github.io/xxHash/ +/** How to use: + uint64_t myseed = 0; + XXHash64 myhash(myseed); + myhash.add(pointerToSomeBytes, numberOfBytes); + myhash.add(pointerToSomeMoreBytes, numberOfMoreBytes); // call add() as often as you like to ... + // and compute hash: + uint64_t result = myhash.hash(); + + // or all of the above in one single line: + uint64_t result2 = XXHash64::hash(mypointer, numBytes, myseed); + + Note: my code is NOT endian-aware ! +**/ +class XXHash64 +{ +public: + /// create new XXHash (64 bit) + /** @param seed your seed value, even zero is a valid seed **/ + explicit XXHash64(uint64_t seed) + { + state[0] = seed + Prime1 + Prime2; + state[1] = seed + Prime2; + state[2] = seed; + state[3] = seed - Prime1; + bufferSize = 0; + totalLength = 0; + } + + /// add a chunk of bytes + /** @param input pointer to a continuous block of data + @param length number of bytes + @return false if parameters are invalid / zero **/ + bool add(const void* input, uint64_t length) + { + // no data ? + if (!input || length == 0) + return false; + + totalLength += length; + // byte-wise access + const unsigned char* data = (const unsigned char*)input; + + // unprocessed old data plus new data still fit in temporary buffer ? + if (bufferSize + length < MaxBufferSize) + { + // just add new data + while (length-- > 0) + buffer[bufferSize++] = *data++; + return true; + } + + // point beyond last byte + const unsigned char* stop = data + length; + const unsigned char* stopBlock = stop - MaxBufferSize; + + // some data left from previous update ? 
+ if (bufferSize > 0) + { + // make sure temporary buffer is full (32 bytes) + while (bufferSize < MaxBufferSize) + buffer[bufferSize++] = *data++; + + // process these 32 bytes (4x8) + process(buffer, state[0], state[1], state[2], state[3]); + } + + // copying state to local variables helps optimizer A LOT + uint64_t s0 = state[0], s1 = state[1], s2 = state[2], s3 = state[3]; + // 32 bytes at once + while (data <= stopBlock) + { + // local variables s0..s3 instead of state[0]..state[3] are much faster + process(data, s0, s1, s2, s3); + data += 32; + } + // copy back + state[0] = s0; state[1] = s1; state[2] = s2; state[3] = s3; + + // copy remainder to temporary buffer + bufferSize = stop - data; + for (uint64_t i = 0; i < bufferSize; i++) + buffer[i] = data[i]; + + // done + return true; + } + + /// get current hash + /** @return 64 bit XXHash **/ + uint64_t hash() const + { + // fold 256 bit state into one single 64 bit value + uint64_t result; + if (totalLength >= MaxBufferSize) + { + result = rotateLeft(state[0], 1) + + rotateLeft(state[1], 7) + + rotateLeft(state[2], 12) + + rotateLeft(state[3], 18); + result = (result ^ processSingle(0, state[0])) * Prime1 + Prime4; + result = (result ^ processSingle(0, state[1])) * Prime1 + Prime4; + result = (result ^ processSingle(0, state[2])) * Prime1 + Prime4; + result = (result ^ processSingle(0, state[3])) * Prime1 + Prime4; + } + else + { + // internal state wasn't set in add(), therefore original seed is still stored in state2 + result = state[2] + Prime5; + } + + result += totalLength; + + // process remaining bytes in temporary buffer + const unsigned char* data = buffer; + // point beyond last byte + const unsigned char* stop = data + bufferSize; + + // at least 8 bytes left ? => eat 8 bytes per step + for (; data + 8 <= stop; data += 8) + result = rotateLeft(result ^ processSingle(0, *(uint64_t*)data), 27) * Prime1 + Prime4; + + // 4 bytes left ? 
=> eat those + if (data + 4 <= stop) + { + result = rotateLeft(result ^ (*(uint32_t*)data) * Prime1, 23) * Prime2 + Prime3; + data += 4; + } + + // take care of remaining 0..3 bytes, eat 1 byte per step + while (data != stop) + result = rotateLeft(result ^ (*data++) * Prime5, 11) * Prime1; + + // mix bits + result ^= result >> 33; + result *= Prime2; + result ^= result >> 29; + result *= Prime3; + result ^= result >> 32; + return result; + } + + + /// combine constructor, add() and hash() in one static function (C style) + /** @param input pointer to a continuous block of data + @param length number of bytes + @param seed your seed value, e.g. zero is a valid seed + @return 64 bit XXHash **/ + static uint64_t hash(const void* input, uint64_t length, uint64_t seed) + { + XXHash64 hasher(seed); + hasher.add(input, length); + return hasher.hash(); + } + +private: + /// magic constants :-) + static const uint64_t Prime1 = 11400714785074694791ULL; + static const uint64_t Prime2 = 14029467366897019727ULL; + static const uint64_t Prime3 = 1609587929392839161ULL; + static const uint64_t Prime4 = 9650029242287828579ULL; + static const uint64_t Prime5 = 2870177450012600261ULL; + + /// temporarily store up to 31 bytes between multiple add() calls + static const uint64_t MaxBufferSize = 31+1; + + uint64_t state[4]; + unsigned char buffer[MaxBufferSize]; + uint64_t bufferSize; + uint64_t totalLength; + + /// rotate bits, should compile to a single CPU instruction (ROL) + static inline uint64_t rotateLeft(uint64_t x, unsigned char bits) + { + return (x << bits) | (x >> (64 - bits)); + } + + /// process a single 64 bit value + static inline uint64_t processSingle(uint64_t previous, uint64_t input) + { + return rotateLeft(previous + input * Prime2, 31) * Prime1; + } + + /// process a block of 4x8 bytes, this is the main part of the XXHash64 algorithm + static inline void process(const void* data, uint64_t& state0, uint64_t& state1, uint64_t& state2, uint64_t& state3) + { + const 
uint64_t* block = (const uint64_t*) data; + state0 = processSingle(state0, block[0]); + state1 = processSingle(state1, block[1]); + state2 = processSingle(state2, block[2]); + state3 = processSingle(state3, block[3]); + } +}; diff --git a/3rd/datasketches/cpc/include/cpc_compressor.hpp b/3rd/datasketches/cpc/include/cpc_compressor.hpp index ffcf776af..40b84f123 100644 --- a/3rd/datasketches/cpc/include/cpc_compressor.hpp +++ b/3rd/datasketches/cpc/include/cpc_compressor.hpp @@ -44,6 +44,10 @@ template class cpc_compressor; template inline cpc_compressor& get_compressor(); +// function called atexit to clean up compression tables +template +void destroy_compressor(); + template class cpc_compressor { public: @@ -109,8 +113,10 @@ class cpc_compressor { }; cpc_compressor(); - template friend cpc_compressor& get_compressor(); + friend cpc_compressor& get_compressor(); + ~cpc_compressor(); + friend void destroy_compressor(); void make_decoding_tables(); // call this at startup void free_decoding_tables(); // call this at the end diff --git a/3rd/datasketches/cpc/include/cpc_compressor_impl.hpp b/3rd/datasketches/cpc/include/cpc_compressor_impl.hpp index e1e75d32a..062e2e0e1 100644 --- a/3rd/datasketches/cpc/include/cpc_compressor_impl.hpp +++ b/3rd/datasketches/cpc/include/cpc_compressor_impl.hpp @@ -22,9 +22,11 @@ #ifndef CPC_COMPRESSOR_IMPL_HPP_ #define CPC_COMPRESSOR_IMPL_HPP_ +#include #include #include +#include "common_defs.hpp" #include "compression_data.hpp" #include "cpc_util.hpp" #include "cpc_common.hpp" @@ -36,9 +38,17 @@ namespace datasketches { template cpc_compressor& get_compressor() { static cpc_compressor* instance = new cpc_compressor(); // use new for global initialization + static int reg_result = std::atexit(destroy_compressor); // just to clean up a little more nicely; don't worry if it fails + unused(reg_result); return *instance; } +// register to call compressor destructor at exit +template +void destroy_compressor() { + delete 
std::addressof(get_compressor()); +} + template cpc_compressor::cpc_compressor() { make_decoding_tables();