Skip to content

Commit 31af804

Browse files
authored
statistics: add count-min sketch UDAF implementation (#28786)
1 parent 7054b5d commit 31af804

File tree

11 files changed

+603
-0
lines changed

11 files changed

+603
-0
lines changed

ydb/apps/ydbd/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ PEERDIR(
4848
ydb/library/yql/udfs/common/clickhouse/client
4949
ydb/library/yql/udfs/common/knn
5050
ydb/library/yql/udfs/common/roaring
51+
ydb/library/yql/udfs/statistics_internal
5152
yql/essentials/parser/pg_wrapper
5253
yql/essentials/sql/pg
5354
yql/essentials/udfs/common/compress_base
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# StatisticsInternal YQL UDF module
2+
3+
This module contains the implementation of the UDAFs (user-defined aggregate functions) needed for column statistics calculation.
4+
5+
They are not intended to be invoked by end users.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
#include <ydb/library/yql/udfs/statistics_internal/all_agg_funcs.h>
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#pragma once
2+
3+
#include <ydb/core/scheme/scheme_tablecell.h>
4+
#include <yql/essentials/core/minsketch/count_min_sketch.h>
5+
6+
namespace NKikimr::NStat::NAggFuncs {
7+
8+
class TCMSAggFunc {
9+
public:
10+
static constexpr std::string_view GetName() { return "CountMinSketch"; }
11+
12+
using TState = std::unique_ptr<NKikimr::TCountMinSketch>;
13+
14+
static constexpr size_t ParamsCount = 2;
15+
16+
static TState CreateState(const std::array<ui64, ParamsCount>& params) {
17+
return TState(NKikimr::TCountMinSketch::Create(params[0], params[1]));
18+
}
19+
static auto CreateStateUpdater() {
20+
return [](TState& state, const TCell& cell) {
21+
state->Count(cell.Data(), cell.Size());
22+
};
23+
}
24+
static void MergeStates(const TState& left, TState& right) {
25+
*right += *left;
26+
}
27+
static TString SerializeState(const TState& state) {
28+
return TString(state->AsStringBuf());
29+
}
30+
static TState DeserializeState(const char* data, size_t size) {
31+
return TState(NKikimr::TCountMinSketch::FromString(data, size));
32+
}
33+
static TString FinalizeState(const TState& state) {
34+
return SerializeState(state);
35+
}
36+
};
37+
38+
// Type list enumerating every aggregate-function class declared in this header.
using TAllAggFuncsList = TTypeList<
    TCMSAggFunc
>;
41+
42+
} // NKikimr::NStat::NAggFuncs
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{
2+
"test.test[count_min_sketch]": [
3+
{
4+
"uri": "file://test.test_count_min_sketch_/results.txt"
5+
}
6+
]
7+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
[
2+
{
3+
"Write" = [
4+
{
5+
"Type" = [
6+
"ListType";
7+
[
8+
"StructType";
9+
[
10+
[
11+
"column0";
12+
[
13+
"OptionalType";
14+
[
15+
"DataType";
16+
"String"
17+
]
18+
]
19+
];
20+
[
21+
"column1";
22+
[
23+
"OptionalType";
24+
[
25+
"DataType";
26+
"String"
27+
]
28+
]
29+
]
30+
]
31+
]
32+
];
33+
"Data" = [
34+
[
35+
[
36+
"\2\0\0\0\0\0\0\0\2\0\0\0\0\0\0\0\3\0\0\0\0\0\0\0\2\0\0\0\1\0\0\0\1\0\0\0\2\0\0\0"
37+
];
38+
[
39+
"\3\0\0\0\0\0\0\0\3\0\0\0\0\0\0\0\3\0\0\0\0\0\0\0\2\0\0\0\0\0\0\0\1\0\0\0\1\0\0\0\0\0\0\0\2\0\0\0\1\0\0\0\1\0\0\0\1\0\0\0"
40+
]
41+
]
42+
]
43+
}
44+
]
45+
}
46+
]
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
-- Builds a "UDAF" aggregation factory from the StatisticsInternal::CountMinSketch*
-- callables; $width and $depth are passed to CountMinSketchCreate after the item.
$get_factory = ($width, $depth) -> {
    return AggregationFactory(
        "UDAF",
        ($item, $parent) -> {
            return Udf(StatisticsInternal::CountMinSketchCreate, $parent as Depends)($item, $width, $depth)
        },
        ($state, $item, $parent) -> {
            return Udf(StatisticsInternal::CountMinSketchAddValue, $parent as Depends)($state, $item)
        },
        StatisticsInternal::CountMinSketchMerge,
        StatisticsInternal::CountMinSketchFinalize,
        StatisticsInternal::CountMinSketchSerialize,
        StatisticsInternal::CountMinSketchDeserialize,
    )
};

-- Three-row in-memory input table.
$t1 = [
    <|key: 1, value: "v1"|>,
    <|key: 2, value: "v2"|>,
    <|key: 3, value: "v3"|>
];

-- One sketch per column; the result columns are intentionally left unnamed.
SELECT
    AGGREGATE_BY(key, $get_factory(2, 2)),
    AGGREGATE_BY(value, $get_factory(3, 3))
FROM AS_TABLE($t1);
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
YQL_UDF_TEST()
2+
3+
DEPENDS(ydb/library/yql/udfs/statistics_internal)
4+
5+
SIZE(MEDIUM)
6+
7+
END()

0 commit comments

Comments
 (0)