Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 8 additions & 20 deletions include/cuco/detail/hyperloglog/finalizer.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,11 @@ class finalizer {

__host__ __device__ constexpr int interpolation_anchor_index(double e) const noexcept
{
auto estimates = raw_estimate_data(this->precision_);
int const n = raw_estimate_data_size(this->precision_);
int left = 0;
int right = static_cast<int>(n) - 1;
int mid = -1;
int candidate_index = 0; // Index of the closest element found

auto estimates = raw_estimate_data(this->precision_);
int const n = raw_estimate_data_size(this->precision_);
int left = 0;
int right = static_cast<int>(n) - 1;
int mid = -1;
while (left <= right) {
mid = left + (right - left) / 2;

Expand All @@ -143,19 +141,9 @@ class finalizer {
}
}

// At this point, 'left' is the insertion point. We need to compare the elements at 'left' and
// 'left - 1' to find the closest one, taking care of boundary conditions.

// Distance from 'e' to the element at 'left', if within bounds
double const dist_lhs = left < static_cast<int>(n) ? cuda::std::abs(estimates[left] - e)
: cuda::std::numeric_limits<double>::max();
// Distance from 'e' to the element at 'left - 1', if within bounds
double const dist_rhs = left - 1 >= 0 ? cuda::std::abs(estimates[left - 1] - e)
: cuda::std::numeric_limits<double>::max();

candidate_index = (dist_lhs < dist_rhs) ? left : left - 1;

return candidate_index;
// At this point, `left` is the insertion point. Spark uses the insertion point as the anchor
// index when the exact estimate is not present.
return left;
}

static constexpr auto k = 6; ///< Number of interpolation points to consider
Expand Down
29 changes: 11 additions & 18 deletions include/cuco/detail/hyperloglog/hyperloglog_impl.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
#include <cuda/std/__algorithm/max.h> // TODO #include <cuda/std/algorithm> once available
#include <cuda/std/bit>
#include <cuda/std/cstddef>
#include <cuda/std/limits>
#include <cuda/std/span>
#include <cuda/std/type_traits>
#include <cuda/std/utility>
#include <cuda/stream_ref>
#include <thrust/iterator/constant_iterator.h>
Expand Down Expand Up @@ -62,6 +64,9 @@ class hyperloglog_impl {
using fp_type = double; ///< Floating point type used for reduction
using hash_value_type = cuda::std::remove_cvref_t<decltype(cuda::std::declval<Hash>()(
cuda::std::declval<T>()))>; ///< Hash value type
static_assert(cuda::std::is_unsigned_v<hash_value_type>,
"HyperLogLog requires an unsigned hash value type");

public:
static constexpr auto thread_scope = Scope; ///< CUDA thread scope

Expand Down Expand Up @@ -153,13 +158,11 @@ class hyperloglog_impl {
*/
__device__ constexpr void add(T const& item) noexcept
{
auto const h = this->hash_(item);
auto const reg = h & this->register_mask();
auto const zeroes = cuda::std::countl_zero(h | this->register_mask()) + 1; // __clz

// reversed order (same one as Spark uses)
// auto const reg = h >> ((sizeof(hash_value_type) * 8) - this->precision_);
// auto const zeroes = cuda::std::countl_zero(h << this->precision_) + 1;
constexpr auto hash_bits = cuda::std::numeric_limits<hash_value_type>::digits;
auto const h = this->hash_(item);
auto const reg = static_cast<int>(h >> (hash_bits - this->precision_));
auto const w_padding = hash_value_type{1} << static_cast<hash_value_type>(this->precision_ - 1);
auto const zeroes = cuda::std::countl_zero((h << this->precision_) | w_padding) + 1; // __clz

this->update_max(reg, zeroes);
}
Expand Down Expand Up @@ -405,7 +408,7 @@ class hyperloglog_impl {
int thread_zeroes = 0;
for (int i = group.thread_rank(); i < this->sketch_.size(); i += group.size()) {
auto const reg = this->sketch_[i];
thread_sum += fp_type{1} / static_cast<fp_type>(1 << reg);
thread_sum += fp_type{1} / static_cast<fp_type>(1ull << reg);
thread_zeroes += reg == 0;
}

Expand Down Expand Up @@ -626,16 +629,6 @@ class hyperloglog_impl {
}
}

/**
 * @brief Gets the register mask used to separate register index from count.
 *
 * The mask has the lowest `precision_` bits set, i.e. `(1 << precision_) - 1`,
 * so `hash & register_mask()` extracts the register index from the low bits of
 * a hash value (the remaining high bits feed the leading-zero count).
 *
 * @return The register mask
 */
__host__ __device__ constexpr hash_value_type register_mask() const noexcept
{
return (1ull << this->precision_) - 1;
}

hasher hash_; ///< Hash function used to hash items
int32_t precision_; ///< HLL precision parameter
cuda::std::span<register_type> sketch_; ///< HLL sketch storage
Expand Down
4 changes: 2 additions & 2 deletions include/cuco/detail/hyperloglog/kernels.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ CUCO_KERNEL void add_if_shmem(
auto idx = cuco::detail::global_thread_id();
auto const block = cooperative_groups::this_thread_block();

local_ref_type local_ref(cuda::std::span{local_sketch, ref.sketch_bytes()}, {});
local_ref_type local_ref(cuda::std::span{local_sketch, ref.sketch_bytes()}, ref.hash_function());
local_ref.clear(block);
block.sync();

Expand Down Expand Up @@ -92,7 +92,7 @@ CUCO_KERNEL void add_if_shmem_vectorized(typename RefType::value_type const* fir
auto const grid = cooperative_groups::this_grid();
auto const block = cooperative_groups::this_thread_block();

local_ref_type local_ref(cuda::std::span{local_sketch, ref.sketch_bytes()}, {});
local_ref_type local_ref(cuda::std::span{local_sketch, ref.sketch_bytes()}, ref.hash_function());
local_ref.clear(block);
block.sync();

Expand Down
32 changes: 32 additions & 0 deletions tests/hyperloglog/spark_parity_test.cu
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <cstddef>
#include <cstdint>
#include <cstring> // std::memcpy
#include <vector>

/**
* @file spark_parity_test.cu
Expand Down Expand Up @@ -99,6 +100,37 @@ TEST_CASE("hyperloglog: Spark parity: deterministic cardinality estimation", "")
REQUIRE(relative_error < expected_standard_deviation * tolerance_factor);
}

// Regression test for issue #696: a fixed input set for which the previous
// register-index / leading-zero extraction produced an estimate diverging from
// Apache Spark's HyperLogLog++ result. The expected value below is pinned to
// Spark's estimate for this exact data — presumably obtained from a Spark
// reference run; confirm against Spark if the fixture ever changes.
TEST_CASE("hyperloglog: Spark parity: regression for issue #696", "")
{
using T = int;
using estimator_type = cuco::hyperloglog<T, cuda::thread_scope_device, cuco::xxhash_64<T>>;

auto const standard_deviation = cuco::standard_deviation{0.3};
// Verbatim reproducer data from the issue; note it deliberately includes the
// boundary values INT_MIN (-2147483648) and 0.
std::vector<T> const host_items = {
434971005, -1801141102, 1963272577, -493001830, -1087762159, 843441079, 959409252,
252071729, -1830233271, 820808802, -1535782039, 1531475465, 1642188005, 552222160,
-194998970, 2109544455, 1405026214, 1672131131, 1247840828, -180033177, -1286780806,
933672832, 1401381638, -241603026, 615622263, -957425136, -276735314, -2009711680,
-639722582, 974221725, 713012837, -1402812678, -546850329, -866141232, 848946484,
-635203849, -1450175774, 844979905, 888971584, 1855780699, -1268565561, -1185513673,
1019479409, -1333229875, -1246182436, -2147483648, 900525526, 1006079044, -698588704,
-943987698, 27695788, -84695147, -1441291062, 397673504, -392707402, 1290858625,
1420750585, -1178564290, 1921246226, 188935376, 6560145, -1928347973, 820364161,
-401706971, -1118924186, 1759421546, -1350108963, 2097517825, -23883470, -1221269093,
1264159503, 97097882, 982791723, 638708040, -349593807, 361658100, 341780548,
-4171545, 1095633384, -1694321873, 1777502952, -1699998259, -1432813716, 1113816192,
-966808405, 1583478695, -650293396, 35500231, -440874147, 995739986, 207692068,
0, -1243401007, -1576220155, 1868986580, -87141217, 2108694405, -251958436,
2028975576, 1725957984, -354115601, 888726314, 1032487345, -1968749299, 1880817790,
1113480821, 789387254, -1724956749, -1201901245};
thrust::device_vector<T> items = host_items;

// Seed 42 for xxhash_64 — NOTE(review): presumably matches the seed used by the
// Spark reference run; verify before changing.
estimator_type estimator{standard_deviation, cuco::xxhash_64<T>{42}};
estimator.add(items.begin(), items.end());

// Exact-match REQUIRE (not a tolerance check): the test asserts bit-for-bit
// parity with Spark's deterministic estimate for this fixture.
REQUIRE(estimator.estimate() == 81);
}

// the following test is omitted since we refrain from doing randomized unit tests in cuco
// TEST_CASE("hyperloglog: Spark parity: random cardinality estimation", "")

Expand Down
Loading