diff --git a/.changeset/tiny-forks-deny.md b/.changeset/tiny-forks-deny.md new file mode 100644 index 0000000000..a17cad1802 --- /dev/null +++ b/.changeset/tiny-forks-deny.md @@ -0,0 +1,7 @@ +--- +'@hyperdx/common-utils': minor +'@hyperdx/api': minor +'@hyperdx/app': minor +--- + +feat: support sample-weighted aggregations for sampled trace data diff --git a/.gitignore b/.gitignore index d6c3ee284b..923e67f729 100644 --- a/.gitignore +++ b/.gitignore @@ -53,6 +53,9 @@ packages/app/next-env.d.ts **/.npm package-lock.json +# npm lockfile (project uses yarn) +package-lock.json + # dependency directories **/node_modules diff --git a/docker/clickhouse/local/init-db-e2e.sh b/docker/clickhouse/local/init-db-e2e.sh index 4166a6e3e4..7b6fe87889 100755 --- a/docker/clickhouse/local/init-db-e2e.sh +++ b/docker/clickhouse/local/init-db-e2e.sh @@ -81,6 +81,7 @@ CREATE TABLE IF NOT EXISTS ${DATABASE}.e2e_otel_traces \`Links.TraceState\` Array(String) CODEC(ZSTD(1)), \`Links.Attributes\` Array(Map(LowCardinality(String), String)) CODEC(ZSTD(1)), \`__hdx_materialized_rum.sessionId\` String MATERIALIZED ResourceAttributes['rum.sessionId'] CODEC(ZSTD(1)), + \`SampleRate\` UInt64 MATERIALIZED greatest(toUInt64OrZero(SpanAttributes['SampleRate']), 1) CODEC(T64, ZSTD(1)), INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, INDEX idx_rum_session_id __hdx_materialized_rum.sessionId TYPE bloom_filter(0.001) GRANULARITY 1, INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, diff --git a/docker/otel-collector/schema/seed/00005_otel_traces.sql b/docker/otel-collector/schema/seed/00005_otel_traces.sql index 98341a6733..853ea471fe 100644 --- a/docker/otel-collector/schema/seed/00005_otel_traces.sql +++ b/docker/otel-collector/schema/seed/00005_otel_traces.sql @@ -24,6 +24,7 @@ CREATE TABLE IF NOT EXISTS ${DATABASE}.otel_traces `Links.TraceState` Array(String) CODEC(ZSTD(1)), `Links.Attributes` Array(Map(LowCardinality(String), String)) CODEC(ZSTD(1)), `__hdx_materialized_rum.sessionId` String MATERIALIZED ResourceAttributes['rum.sessionId'] CODEC(ZSTD(1)), + `SampleRate` UInt64 MATERIALIZED greatest(toUInt64OrZero(SpanAttributes['SampleRate']), 1) CODEC(T64, ZSTD(1)), INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, INDEX idx_rum_session_id __hdx_materialized_rum.sessionId TYPE bloom_filter(0.001) GRANULARITY 1, INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, diff --git a/packages/api/migrations/ch/000002_add_sample_rate_column_to_otel_traces.down.sql b/packages/api/migrations/ch/000002_add_sample_rate_column_to_otel_traces.down.sql new file mode 100644 index 0000000000..c21fb812e8 --- /dev/null +++ b/packages/api/migrations/ch/000002_add_sample_rate_column_to_otel_traces.down.sql @@ -0,0 +1 @@ +ALTER TABLE ${DATABASE}.otel_traces DROP COLUMN IF EXISTS `SampleRate`; diff --git a/packages/api/migrations/ch/000002_add_sample_rate_column_to_otel_traces.up.sql b/packages/api/migrations/ch/000002_add_sample_rate_column_to_otel_traces.up.sql new file mode 100644 index 0000000000..5f2cf8cfa5 --- /dev/null +++ b/packages/api/migrations/ch/000002_add_sample_rate_column_to_otel_traces.up.sql @@ -0,0 +1,4 @@ +ALTER TABLE ${DATABASE}.otel_traces + ADD COLUMN IF NOT EXISTS `SampleRate` UInt64 + MATERIALIZED greatest(toUInt64OrZero(SpanAttributes['SampleRate']), 1) + CODEC(T64, ZSTD(1)); diff --git a/packages/api/src/controllers/ai.ts b/packages/api/src/controllers/ai.ts index f745832e93..e7c0a496ca 100644 --- a/packages/api/src/controllers/ai.ts +++ b/packages/api/src/controllers/ai.ts @@ -9,6 +9,8 @@ import { AILineTableResponse, AssistantLineTableConfigSchema, ChartConfigWithDateRange, + pickSampleWeightExpressionProps, + SourceKind, } from '@hyperdx/common-utils/dist/types'; import type { LanguageModel } from 'ai'; import * as chrono from 'chrono-node'; @@ -327,6 +329,7 @@ export function getChartConfigFromResolvedConfig( connection: source.connection.toString(), groupBy: resObject.groupBy, timestampValueExpression: source.timestampValueExpression, + ...pickSampleWeightExpressionProps(source), dateRange: [dateRange[0].toString(), dateRange[1].toString()], markdown: resObject.markdown, granularity: 'auto', diff --git a/packages/api/src/models/source.ts b/packages/api/src/models/source.ts index 1e6fb20830..25201dd0f9 100644 --- a/packages/api/src/models/source.ts +++ b/packages/api/src/models/source.ts @@ -163,6 +163,7 @@ export const TraceSource = Source.discriminator( parentSpanIdExpression: String, spanNameExpression: String, spanKindExpression: String, + sampleRateExpression: String, logSourceId: String, sessionSourceId: String, metricSourceId: String, diff --git a/packages/api/src/routers/external-api/v2/charts.ts b/packages/api/src/routers/external-api/v2/charts.ts index 7f574ed9ce..5da7395810 100644 --- a/packages/api/src/routers/external-api/v2/charts.ts +++ b/packages/api/src/routers/external-api/v2/charts.ts @@ -4,6 +4,7 @@ import { Granularity } from '@hyperdx/common-utils/dist/core/utils'; import { ChartConfigWithOptDateRange, DisplayType, + pickSampleWeightExpressionProps, SourceKind, } from '@hyperdx/common-utils/dist/types'; import opentelemetry, { SpanStatusCode } from '@opentelemetry/api'; @@ -280,6 +281,7 @@ const buildChartConfigFromRequest = async ( ], where: '', timestampValueExpression: source.timestampValueExpression, + ...pickSampleWeightExpressionProps(source), dateRange: [new Date(params.startTime), new Date(params.endTime)] as [ Date, Date, diff --git a/packages/api/src/tasks/checkAlerts/index.ts b/packages/api/src/tasks/checkAlerts/index.ts index d69fbd4023..33888fbe9d 100644 --- a/packages/api/src/tasks/checkAlerts/index.ts +++ b/packages/api/src/tasks/checkAlerts/index.ts @@ -23,6 +23,8 @@ import { import { BuilderChartConfigWithOptDateRange, DisplayType, + getSampleWeightExpression, + pickSampleWeightExpressionProps, SourceKind, } from '@hyperdx/common-utils/dist/types'; import * as fns from 'date-fns'; @@ -107,6 +109,7 @@ export async function computeAliasWithClauses( source.kind === SourceKind.Log || source.kind === SourceKind.Trace ? source.implicitColumnExpression : undefined, + ...pickSampleWeightExpressionProps(source), timestampValueExpression: source.timestampValueExpression, }; const query = await renderChartConfig(config, metadata, source.querySettings); @@ -453,6 +456,7 @@ const getChartConfigFromAlert = ( source.kind === SourceKind.Log || source.kind === SourceKind.Trace ? source.implicitColumnExpression : undefined, + ...pickSampleWeightExpressionProps(source), timestampValueExpression: source.timestampValueExpression, }; } else if (details.taskType === AlertTaskType.TILE) { @@ -474,6 +478,7 @@ const getChartConfigFromAlert = ( source.kind === SourceKind.Log || source.kind === SourceKind.Trace ? source.implicitColumnExpression : undefined; + const sampleWeightExpression = getSampleWeightExpression(source); const metricTables = source.kind === SourceKind.Metric ? source.metricTables : undefined; return { @@ -486,6 +491,7 @@ const getChartConfigFromAlert = ( granularity: `${windowSizeInMins} minute`, groupBy: tile.config.groupBy, implicitColumnExpression, + sampleWeightExpression, metricTables, select: tile.config.select, timestampValueExpression: source.timestampValueExpression, diff --git a/packages/api/src/tasks/checkAlerts/template.ts b/packages/api/src/tasks/checkAlerts/template.ts index dce38c276c..8f438516c1 100644 --- a/packages/api/src/tasks/checkAlerts/template.ts +++ b/packages/api/src/tasks/checkAlerts/template.ts @@ -10,6 +10,7 @@ import { AlertChannelType, ChartConfigWithOptDateRange, DisplayType, + pickSampleWeightExpressionProps, SourceKind, WebhookService, zAlertChannelType, @@ -598,6 +599,7 @@ ${targetTemplate}`; where: savedSearch.where, whereLanguage: savedSearch.whereLanguage, implicitColumnExpression: source.implicitColumnExpression, + ...pickSampleWeightExpressionProps(source), timestampValueExpression: source.timestampValueExpression, orderBy: savedSearch.orderBy, limit: { diff --git a/packages/app/src/DBDashboardPage.tsx b/packages/app/src/DBDashboardPage.tsx index 6714b08f0f..781b5cc694 100644 --- a/packages/app/src/DBDashboardPage.tsx +++ b/packages/app/src/DBDashboardPage.tsx @@ -32,6 +32,7 @@ import { DashboardFilter, DisplayType, Filter, + getSampleWeightExpression, isLogSource, isTraceSource, SearchCondition, @@ -233,6 +234,7 @@ const Tile = forwardRef( ...chart.config, // Populate these two columns from the source to support Lucene-based filters ...pick(source, ['implicitColumnExpression', 'from']), + sampleWeightExpression: getSampleWeightExpression(source), dateRange, granularity, filters, @@ -267,6 +269,7 @@ const Tile = forwardRef( isLogSource(source) || isTraceSource(source) ? source.implicitColumnExpression : undefined, + sampleWeightExpression: getSampleWeightExpression(source), filters, metricTables: isMetricSource ? source.metricTables : undefined, }); diff --git a/packages/app/src/DBSearchPage.tsx b/packages/app/src/DBSearchPage.tsx index 1d2a14390a..bcb86425bd 100644 --- a/packages/app/src/DBSearchPage.tsx +++ b/packages/app/src/DBSearchPage.tsx @@ -36,6 +36,7 @@ import { Filter, isLogSource, isTraceSource, + pickSampleWeightExpressionProps, SourceKind, TSource, } from '@hyperdx/common-utils/dist/types'; @@ -687,6 +688,7 @@ function useSearchedConfigToChartConfig( whereLanguage: whereLanguage ?? 'sql', timestampValueExpression: sourceObj.timestampValueExpression, implicitColumnExpression: sourceObj.implicitColumnExpression, + ...pickSampleWeightExpressionProps(sourceObj), connection: sourceObj.connection, displayType: DisplayType.Search, orderBy: orderBy || defaultSearchConfig?.orderBy || defaultOrderBy, diff --git a/packages/app/src/ServicesDashboardPage.tsx b/packages/app/src/ServicesDashboardPage.tsx index 5795006043..a4c6a71690 100644 --- a/packages/app/src/ServicesDashboardPage.tsx +++ b/packages/app/src/ServicesDashboardPage.tsx @@ -18,6 +18,7 @@ import { Filter, isLogSource, isTraceSource, + pickSampleWeightExpressionProps, PresetDashboard, SourceKind, TTraceSource, @@ -33,6 +34,7 @@ function pickSourceConfigFields(source: TSource) { ...(isLogSource(source) || isTraceSource(source) ? { implicitColumnExpression: source.implicitColumnExpression } : {}), + ...pickSampleWeightExpressionProps(source), }; } import { diff --git a/packages/app/src/SessionSubpanel.tsx b/packages/app/src/SessionSubpanel.tsx index 6ccf93d510..f5a302f162 100644 --- a/packages/app/src/SessionSubpanel.tsx +++ b/packages/app/src/SessionSubpanel.tsx @@ -8,6 +8,7 @@ import { tcFromSource } from '@hyperdx/common-utils/dist/core/metadata'; import { ChartConfigWithOptDateRange, DateRange, + pickSampleWeightExpressionProps, SearchCondition, SearchConditionLanguage, TSessionSource, @@ -188,6 +189,7 @@ function useSessionChartConfigs({ where, timestampValueExpression: traceSource.timestampValueExpression, implicitColumnExpression: traceSource.implicitColumnExpression, + ...pickSampleWeightExpressionProps(traceSource), connection: traceSource.connection, orderBy: `${traceSource.timestampValueExpression} ASC`, limit: { diff --git a/packages/app/src/components/AlertPreviewChart.tsx b/packages/app/src/components/AlertPreviewChart.tsx index b1baabef37..9d4dbd51c6 100644 --- a/packages/app/src/components/AlertPreviewChart.tsx +++ b/packages/app/src/components/AlertPreviewChart.tsx @@ -3,6 +3,7 @@ import { aliasMapToWithClauses } from '@hyperdx/common-utils/dist/core/utils'; import { AlertInterval, Filter, + getSampleWeightExpression, isLogSource, isTraceSource, SearchCondition, @@ -74,6 +75,7 @@ export const AlertPreviewChart = ({ isLogSource(source) || isTraceSource(source) ? source.implicitColumnExpression : undefined, + sampleWeightExpression: getSampleWeightExpression(source), groupBy, with: aliasWith, select: [ diff --git a/packages/app/src/components/ChartEditor/utils.ts b/packages/app/src/components/ChartEditor/utils.ts index e3e5e047a4..eee57a5c74 100644 --- a/packages/app/src/components/ChartEditor/utils.ts +++ b/packages/app/src/components/ChartEditor/utils.ts @@ -8,6 +8,7 @@ import { BuilderSavedChartConfig, ChartConfigWithDateRange, DisplayType, + getSampleWeightExpression, isLogSource, isMetricSource, isTraceSource, @@ -142,6 +143,7 @@ export function convertFormStateToChartConfig( isLogSource(source) || isTraceSource(source) ? source.implicitColumnExpression : undefined, + sampleWeightExpression: getSampleWeightExpression(source), metricTables: isMetricSource(source) ? source.metricTables : undefined, where: form.where ?? '', select: isSelectEmpty diff --git a/packages/app/src/components/ServiceDashboardDbQuerySidePanel.tsx b/packages/app/src/components/ServiceDashboardDbQuerySidePanel.tsx index 8580e54fdf..bbcfd20c43 100644 --- a/packages/app/src/components/ServiceDashboardDbQuerySidePanel.tsx +++ b/packages/app/src/components/ServiceDashboardDbQuerySidePanel.tsx @@ -4,6 +4,7 @@ import { parseAsString, useQueryState } from 'nuqs'; import { DisplayType, type Filter, + pickSampleWeightExpressionProps, SourceKind, } from '@hyperdx/common-utils/dist/types'; import { Drawer, Grid, Text } from '@mantine/core'; @@ -109,6 +110,7 @@ export default function ServiceDashboardDbQuerySidePanel({ 'connection', 'from', ]), + ...pickSampleWeightExpressionProps(source), where: '', whereLanguage: 'sql', select: [ @@ -146,6 +148,7 @@ export default function ServiceDashboardDbQuerySidePanel({ 'connection', 'from', ]), + ...pickSampleWeightExpressionProps(source), where: '', whereLanguage: 'sql', select: [ diff --git a/packages/app/src/components/ServiceDashboardEndpointPerformanceChart.tsx b/packages/app/src/components/ServiceDashboardEndpointPerformanceChart.tsx index 995ffa4ed3..91f7e0edf8 100644 --- a/packages/app/src/components/ServiceDashboardEndpointPerformanceChart.tsx +++ b/packages/app/src/components/ServiceDashboardEndpointPerformanceChart.tsx @@ -1,5 +1,8 @@ import { pick } from 'lodash'; -import { TTraceSource } from '@hyperdx/common-utils/dist/types'; +import { + pickSampleWeightExpressionProps, + TTraceSource, +} from '@hyperdx/common-utils/dist/types'; import { MS_NUMBER_FORMAT } from '@/ChartUtils'; import { ChartBox } from '@/components/ChartBox'; @@ -95,6 +98,7 @@ export default function ServiceDashboardEndpointPerformanceChart({ config={{ source: source.id, ...pick(source, ['timestampValueExpression', 'connection', 'from']), + ...pickSampleWeightExpressionProps(source), where: '', whereLanguage: 'sql', select: [ diff --git a/packages/app/src/components/ServiceDashboardEndpointSidePanel.tsx b/packages/app/src/components/ServiceDashboardEndpointSidePanel.tsx index 26998b5386..4bcd8cfa3b 100644 --- a/packages/app/src/components/ServiceDashboardEndpointSidePanel.tsx +++ b/packages/app/src/components/ServiceDashboardEndpointSidePanel.tsx @@ -4,6 +4,7 @@ import { parseAsString, useQueryState } from 'nuqs'; import { DisplayType, type Filter, + pickSampleWeightExpressionProps, SourceKind, } from '@hyperdx/common-utils/dist/types'; import { Drawer, Grid, Text } from '@mantine/core'; @@ -116,6 +117,7 @@ export default function ServiceDashboardEndpointSidePanel({ 'connection', 'from', ]), + ...pickSampleWeightExpressionProps(source), where: '', whereLanguage: 'sql', select: [ @@ -159,6 +161,7 @@ export default function ServiceDashboardEndpointSidePanel({ 'connection', 'from', ]), + ...pickSampleWeightExpressionProps(source), where: '', whereLanguage: 'sql', select: [ diff --git a/packages/app/src/components/ServiceDashboardSlowestEventsTile.tsx b/packages/app/src/components/ServiceDashboardSlowestEventsTile.tsx index 048a3975c4..0f2d169ed1 100644 --- a/packages/app/src/components/ServiceDashboardSlowestEventsTile.tsx +++ b/packages/app/src/components/ServiceDashboardSlowestEventsTile.tsx @@ -1,6 +1,10 @@ import { pick } from 'lodash'; import { ClickHouseQueryError } from '@hyperdx/common-utils/dist/clickhouse'; -import type { Filter, TTraceSource } from '@hyperdx/common-utils/dist/types'; +import { + type Filter, + pickSampleWeightExpressionProps, + type TTraceSource, +} from '@hyperdx/common-utils/dist/types'; import { Box, Code, Group, Text } from '@mantine/core'; import { ChartBox } from '@/components/ChartBox'; @@ -33,6 +37,7 @@ export default function SlowestEventsTile({ { source: source.id, ...pick(source, ['timestampValueExpression', 'connection', 'from']), + ...pickSampleWeightExpressionProps(source), where: '', whereLanguage: 'sql', select: [ @@ -117,6 +122,7 @@ export default function SlowestEventsTile({ 'connection', 'from', ]), + ...pickSampleWeightExpressionProps(source), where: '', whereLanguage: 'sql', select: [ diff --git a/packages/app/src/components/Sources/SourceForm.tsx b/packages/app/src/components/Sources/SourceForm.tsx index 26d3b310c2..2a249f940c 100644 --- a/packages/app/src/components/Sources/SourceForm.tsx +++ b/packages/app/src/components/Sources/SourceForm.tsx @@ -1545,6 +1545,21 @@ export function TraceTableModelForm(props: TableModelProps) { placeholder="SpanAttributes" /> + + + = fromUnixTimestamp64Milli(1739318400000) AND __hdx_time_bucket2 <= fromUnixTimestamp64Milli(1765670400000)) GROUP BY toStartOfInterval(toDateTime(__hdx_time_bucket2), INTERVAL 1 minute) AS \`__hdx_time_bucket\` ORDER BY toStartOfInterval(toDateTime(__hdx_time_bucket2), INTERVAL 1 minute) AS \`__hdx_time_bucket\` LIMIT 10 SETTINGS short_circuit_function_evaluation = 'force_enable', optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000" `; +exports[`renderChartConfig sample-weighted aggregations should handle complex sampleWeightExpression like SpanAttributes map access 1`] = `"SELECT sum(greatest(toUInt64OrZero(toString(SpanAttributes['SampleRate'])), 1)) FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"`; + +exports[`renderChartConfig sample-weighted aggregations should handle mixed weighted and passthrough aggregations 1`] = ` +"SELECT sum(greatest(toUInt64OrZero(toString(SampleRate)), 1)) AS \\"weighted_count\\",sumIf(toFloat64OrDefault(toString(Duration)) * greatest(toUInt64OrZero(toString(SampleRate)), 1), toFloat64OrDefault(toString(Duration)) IS NOT NULL) / nullIf(sumIf(greatest(toUInt64OrZero(toString(SampleRate)), 1), toFloat64OrDefault(toString(Duration)) IS NOT NULL), 0) AS \\"weighted_avg\\",min( + toFloat64OrDefault(toString(Duration)) + ) AS \\"min_duration\\",count(DISTINCT TraceId) AS \\"unique_traces\\" FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000" +`; + +exports[`renderChartConfig sample-weighted aggregations should leave count_distinct unchanged with sampleWeightExpression 1`] = `"SELECT count(DISTINCT TraceId) FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"`; + +exports[`renderChartConfig sample-weighted aggregations should leave min/max unchanged with sampleWeightExpression 1`] = ` +"SELECT min( + toFloat64OrDefault(toString(Duration)) + ),max( + toFloat64OrDefault(toString(Duration)) + ) FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000" +`; + +exports[`renderChartConfig sample-weighted aggregations should rewrite avg to weighted average 1`] = `"SELECT sumIf(toFloat64OrDefault(toString(Duration)) * greatest(toUInt64OrZero(toString(SampleRate)), 1), toFloat64OrDefault(toString(Duration)) IS NOT NULL) / nullIf(sumIf(greatest(toUInt64OrZero(toString(SampleRate)), 1), toFloat64OrDefault(toString(Duration)) IS NOT NULL), 0) FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"`; + +exports[`renderChartConfig sample-weighted aggregations should rewrite avg with where condition 1`] = `"SELECT sumIf(toFloat64OrDefault(toString(Duration)) * greatest(toUInt64OrZero(toString(SampleRate)), 1), ServiceName = 'api' AND toFloat64OrDefault(toString(Duration)) IS NOT NULL) / nullIf(sumIf(greatest(toUInt64OrZero(toString(SampleRate)), 1), ServiceName = 'api' AND toFloat64OrDefault(toString(Duration)) IS NOT NULL), 0) FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) AND (ServiceName = 'api') SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"`; + +exports[`renderChartConfig sample-weighted aggregations should rewrite count() to sum(greatest(...)) 1`] = `"SELECT sum(greatest(toUInt64OrZero(toString(SampleRate)), 1)) FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"`; + +exports[`renderChartConfig sample-weighted aggregations should rewrite countIf to sumIf(greatest(...), cond) 1`] = `"SELECT sumIf(greatest(toUInt64OrZero(toString(SampleRate)), 1), StatusCode = 'Error') FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) AND (StatusCode = 'Error') SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"`; + +exports[`renderChartConfig sample-weighted aggregations should rewrite quantile to quantileTDigestWeighted 1`] = `"SELECT quantileTDigestWeighted(0.99)(toFloat64OrDefault(toString(Duration)), toUInt32(greatest(toUInt64OrZero(toString(SampleRate)), 1))) FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"`; + +exports[`renderChartConfig sample-weighted aggregations should rewrite quantile with where condition 1`] = `"SELECT quantileTDigestWeightedIf(0.95)(toFloat64OrDefault(toString(Duration)), toUInt32(greatest(toUInt64OrZero(toString(SampleRate)), 1)), ServiceName = 'api' AND toFloat64OrDefault(toString(Duration)) IS NOT NULL) FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) AND (ServiceName = 'api') SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"`; + +exports[`renderChartConfig sample-weighted aggregations should rewrite sum to weighted sum 1`] = `"SELECT sum(toFloat64OrDefault(toString(Duration)) * greatest(toUInt64OrZero(toString(SampleRate)), 1)) FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"`; + +exports[`renderChartConfig sample-weighted aggregations should rewrite sum with where condition 1`] = `"SELECT sumIf(toFloat64OrDefault(toString(Duration)) * greatest(toUInt64OrZero(toString(SampleRate)), 1), ServiceName = 'api' AND toFloat64OrDefault(toString(Duration)) IS NOT NULL) FROM default.otel_traces WHERE (Timestamp >= fromUnixTimestamp64Milli(1739318400000) AND Timestamp <= fromUnixTimestamp64Milli(1739491200000)) AND (ServiceName = 'api') SETTINGS optimize_read_in_order = 0, cast_keep_nullable = 1, additional_result_filter = 'x != 2', count_distinct_implementation = 'uniqCombined64', async_insert_busy_timeout_min_ms = 20000"`; + exports[`renderChartConfig should generate sql for a single gauge metric 1`] = ` "WITH Source AS ( SELECT diff --git a/packages/common-utils/src/__tests__/renderChartConfig.test.ts b/packages/common-utils/src/__tests__/renderChartConfig.test.ts index 72af637c76..6725496453 100644 --- a/packages/common-utils/src/__tests__/renderChartConfig.test.ts +++ b/packages/common-utils/src/__tests__/renderChartConfig.test.ts @@ -1911,4 +1911,374 @@ describe('renderChartConfig', () => { ); }); }); + + describe('sample-weighted aggregations', () => { + const baseSampledConfig: ChartConfigWithOptDateRange = { + displayType: DisplayType.Table, + connection: 'test-connection', + from: { + databaseName: 'default', + tableName: 'otel_traces', + }, + select: [], + where: '', + whereLanguage: 'sql', + timestampValueExpression: 'Timestamp', + sampleWeightExpression: 'SampleRate', + dateRange: [new Date('2025-02-12'), new Date('2025-02-14')], + }; + + it('should rewrite count() to sum(greatest(...))', async () => { + const config: ChartConfigWithOptDateRange = { + ...baseSampledConfig, + select: [ + { + aggFn: 'count', + valueExpression: '', + aggCondition: '', + }, + ], + }; + + const generatedSql = await renderChartConfig( + config, + mockMetadata, + querySettings, + ); + const actual = parameterizedQueryToSql(generatedSql); + expect(actual).toContain( + 'greatest(toUInt64OrZero(toString(SampleRate)), 1)', + ); + expect(actual).toContain('sum('); + expect(actual).not.toContain('count()'); + expect(actual).toMatchSnapshot(); + }); + + it('should rewrite countIf to sumIf(greatest(...), cond)', async () => { + const config: ChartConfigWithOptDateRange = { + ...baseSampledConfig, + select: [ + { + aggFn: 'count', + valueExpression: '', + aggCondition: "StatusCode = 'Error'", + aggConditionLanguage: 'sql', + }, + ], + }; + + const generatedSql = await renderChartConfig( + config, + mockMetadata, + querySettings, + ); + const actual = parameterizedQueryToSql(generatedSql); + expect(actual).toContain( + 'sumIf(greatest(toUInt64OrZero(toString(SampleRate)), 1)', + ); + expect(actual).not.toContain('countIf'); + expect(actual).toMatchSnapshot(); + }); + + it('should rewrite avg to weighted average', async () => { + const config: ChartConfigWithOptDateRange = { + ...baseSampledConfig, + select: [ + { + aggFn: 'avg', + valueExpression: 'Duration', + aggCondition: '', + }, + ], + }; + + const generatedSql = await renderChartConfig( + config, + mockMetadata, + querySettings, + ); + const actual = parameterizedQueryToSql(generatedSql); + expect(actual).toContain( + '* greatest(toUInt64OrZero(toString(SampleRate)), 1)', + ); + expect(actual).toContain( + '/ nullIf(sumIf(greatest(toUInt64OrZero(toString(SampleRate)), 1), toFloat64OrDefault(toString(Duration)) IS NOT NULL), 0)', + ); + expect(actual).not.toContain('avg('); + expect(actual).toMatchSnapshot(); + }); + + it('should rewrite sum to weighted sum', async () => { + const config: ChartConfigWithOptDateRange = { + ...baseSampledConfig, + select: [ + { + aggFn: 'sum', + valueExpression: 'Duration', + aggCondition: '', + }, + ], + }; + + const generatedSql = await renderChartConfig( + config, + mockMetadata, + querySettings, + ); + const actual = parameterizedQueryToSql(generatedSql); + expect(actual).toContain( + '* greatest(toUInt64OrZero(toString(SampleRate)), 1)', + ); + expect(actual).toMatchSnapshot(); + }); + + it('should rewrite quantile to quantileTDigestWeighted', async () => { + const config: ChartConfigWithOptDateRange = { + ...baseSampledConfig, + select: [ + { + aggFn: 'quantile', + valueExpression: 'Duration', + aggCondition: '', + level: 0.99, + }, + ], + }; + + const generatedSql = await renderChartConfig( + config, + mockMetadata, + querySettings, + ); + const actual = parameterizedQueryToSql(generatedSql); + expect(actual).toContain('quantileTDigestWeighted(0.99)'); + expect(actual).toContain( + 'toUInt32(greatest(toUInt64OrZero(toString(SampleRate)), 1))', + ); + expect(actual).not.toContain('quantile(0.99)'); + expect(actual).toMatchSnapshot(); + }); + + it('should leave min/max unchanged with sampleWeightExpression', async () => { + const config: ChartConfigWithOptDateRange = { + ...baseSampledConfig, + select: [ + { + aggFn: 'min', + valueExpression: 'Duration', + aggCondition: '', + }, + { + aggFn: 'max', + valueExpression: 'Duration', + aggCondition: '', + }, + ], + }; + + const generatedSql = await renderChartConfig( + config, + mockMetadata, + querySettings, + ); + const actual = parameterizedQueryToSql(generatedSql); + expect(actual).toContain('min('); + expect(actual).toContain('max('); + expect(actual).not.toContain('SampleRate'); + expect(actual).toMatchSnapshot(); + }); + + it('should leave count_distinct unchanged with sampleWeightExpression', async () => { + const config: ChartConfigWithOptDateRange = { + ...baseSampledConfig, + select: [ + { + aggFn: 'count_distinct', + valueExpression: 'TraceId', + aggCondition: '', + }, + ], + }; + + const generatedSql = await renderChartConfig( + config, + mockMetadata, + querySettings, + ); + const actual = parameterizedQueryToSql(generatedSql); + expect(actual).toContain('count(DISTINCT'); + expect(actual).not.toContain('SampleRate'); + expect(actual).toMatchSnapshot(); + }); + + it('should handle complex sampleWeightExpression like SpanAttributes map access', async () => { + const config: ChartConfigWithOptDateRange = { + ...baseSampledConfig, + sampleWeightExpression: "SpanAttributes['SampleRate']", + select: [ + { + aggFn: 'count', + valueExpression: '', + aggCondition: '', + }, + ], + }; + + const generatedSql = await renderChartConfig( + config, + mockMetadata, + querySettings, + ); + const actual = parameterizedQueryToSql(generatedSql); + expect(actual).toContain( + "greatest(toUInt64OrZero(toString(SpanAttributes['SampleRate'])), 1)", + ); + expect(actual).toContain('sum('); + expect(actual).not.toContain('count()'); + expect(actual).toMatchSnapshot(); + }); + + it('should rewrite avg with where condition', async () => { + const config: ChartConfigWithOptDateRange = { + ...baseSampledConfig, + select: [ + { + aggFn: 'avg', + valueExpression: 'Duration', + aggCondition: "ServiceName = 'api'", + aggConditionLanguage: 'sql', + }, + ], + }; + + const generatedSql = await renderChartConfig( + config, + mockMetadata, + querySettings, + ); + const actual = parameterizedQueryToSql(generatedSql); + expect(actual).toContain('sumIf('); + expect(actual).toContain("ServiceName = 'api'"); + expect(actual).not.toContain('avg('); + expect(actual).toMatchSnapshot(); + }); + + it('should rewrite sum with where condition', async () => { + const config: ChartConfigWithOptDateRange = { + ...baseSampledConfig, + select: [ + { + aggFn: 'sum', + valueExpression: 'Duration', + aggCondition: "ServiceName = 'api'", + aggConditionLanguage: 'sql', + }, + ], + }; + + const generatedSql = await renderChartConfig( + config, + mockMetadata, + querySettings, + ); + const actual = parameterizedQueryToSql(generatedSql); + expect(actual).toContain('sumIf('); + expect(actual).toContain("ServiceName = 'api'"); + expect(actual).toMatchSnapshot(); + }); + + it('should rewrite quantile with where condition', async () => { + const config: ChartConfigWithOptDateRange = { + ...baseSampledConfig, + select: [ + { + aggFn: 'quantile', + valueExpression: 'Duration', + aggCondition: "ServiceName = 'api'", + aggConditionLanguage: 'sql', + level: 0.95, + }, + ], + }; + + const generatedSql = await renderChartConfig( + config, + mockMetadata, + querySettings, + ); + const actual = parameterizedQueryToSql(generatedSql); + expect(actual).toContain('quantileTDigestWeightedIf(0.95)'); + expect(actual).toContain("ServiceName = 'api'"); + expect(actual).not.toContain('quantile(0.95)'); + expect(actual).toMatchSnapshot(); + }); + + it('should handle mixed weighted and passthrough aggregations', async () => { + const config: ChartConfigWithOptDateRange = { + ...baseSampledConfig, + select: [ + { + aggFn: 'count', + valueExpression: '', + aggCondition: '', + alias: 'weighted_count', + }, + { + aggFn: 'avg', + valueExpression: 'Duration', + aggCondition: '', + alias: 'weighted_avg', + }, + { + aggFn: 'min', + valueExpression: 'Duration', + aggCondition: '', + alias: 'min_duration', + }, + { + aggFn: 'count_distinct', + valueExpression: 'TraceId', + aggCondition: '', + alias: 'unique_traces', + }, + ], + }; + + const generatedSql = await renderChartConfig( + config, + mockMetadata, + querySettings, + ); + const actual = parameterizedQueryToSql(generatedSql); + expect(actual).toContain('sum('); + expect(actual).toContain('min('); + expect(actual).toContain('count(DISTINCT'); + expect(actual).not.toContain('count()'); + expect(actual).not.toContain('avg('); + expect(actual).toMatchSnapshot(); + }); + + it('should not rewrite aggregations without sampleWeightExpression', async () => { + const config: ChartConfigWithOptDateRange = { + ...baseSampledConfig, + sampleWeightExpression: undefined, + select: [ + { + aggFn: 'count', + valueExpression: '', + aggCondition: '', + }, + ], + }; + + const generatedSql = await renderChartConfig( + config, + mockMetadata, + querySettings, + ); + const actual = parameterizedQueryToSql(generatedSql); + expect(actual).toContain('count()'); + expect(actual).not.toContain('SampleRate'); + }); + }); }); diff --git a/packages/common-utils/src/__tests__/sampleWeightedAggregations.int.test.ts b/packages/common-utils/src/__tests__/sampleWeightedAggregations.int.test.ts new file mode 100644 index 0000000000..1cfdd9dde6 --- /dev/null +++ b/packages/common-utils/src/__tests__/sampleWeightedAggregations.int.test.ts @@ -0,0 +1,465 @@ +import type { ClickHouseClient } from '@clickhouse/client'; +import { createClient } from '@clickhouse/client'; + +import { parameterizedQueryToSql } from '@/clickhouse'; +import { ClickhouseClient as HdxClickhouseClient } from '@/clickhouse/node'; +import { Metadata, MetadataCache } from '@/core/metadata'; +import { + ChartConfigWithOptDateRange, + DisplayType, + QuerySettings, +} from '@/types'; + +import { renderChartConfig } from '../core/renderChartConfig'; + +describe('sample-weighted aggregations (integration)', () => { + let client: ClickHouseClient; + let hdxClient: HdxClickhouseClient; + let metadata: Metadata; + + const DB = 'default'; + const MAIN_TABLE = 'test_sample_weighted_main'; + const EDGE_TABLE = 'test_sample_weighted_edge'; + + const querySettings: QuerySettings = [ + { setting: 'optimize_read_in_order', value: '0' }, + { setting: 'cast_keep_nullable', value: '1' }, + ]; + + const baseConfig: ChartConfigWithOptDateRange = { + displayType: DisplayType.Table, + connection: 'test-connection', + from: { databaseName: DB, tableName: MAIN_TABLE }, + select: [], + where: '', + whereLanguage: 'sql', + timestampValueExpression: 'Timestamp', + sampleWeightExpression: 'SampleRate', + dateRange: [new Date('2025-01-01'), new Date('2025-12-31')], + }; + + async function executeChartConfig( + config: ChartConfigWithOptDateRange, + ): Promise> { + const generatedSql = await renderChartConfig( + config, + metadata, + querySettings, + ); + const sql = parameterizedQueryToSql(generatedSql); + const result = await client.query({ query: sql, format: 'JSONEachRow' }); + const rows = (await result.json()) as Record[]; + expect(rows.length).toBeGreaterThanOrEqual(1); + return rows[0]!; + } + + async function executeChartConfigAllRows( + config: ChartConfigWithOptDateRange, + ): Promise[]> { + const generatedSql = await renderChartConfig( + config, + metadata, + querySettings, + ); + const sql = parameterizedQueryToSql(generatedSql); + const result = await client.query({ query: sql, format: 'JSONEachRow' }); + return (await result.json()) as Record[]; + } + + beforeAll(async () => { + const host = process.env.CLICKHOUSE_HOST || 'http://localhost:8123'; + const username = process.env.CLICKHOUSE_USER || 'default'; + const password = process.env.CLICKHOUSE_PASSWORD || ''; + + client = createClient({ url: host, username, password }); + hdxClient = new HdxClickhouseClient({ host, username, password }); + + await client.command({ + query: ` + CREATE OR REPLACE TABLE ${DB}.${MAIN_TABLE} ( + Timestamp DateTime64(9) CODEC(Delta(8), ZSTD(1)), + Duration Float64, + ServiceName LowCardinality(String), + SampleRate UInt64 + ) + ENGINE = MergeTree() + ORDER BY (Timestamp) + `, + }); + + await client.command({ + query: ` + INSERT INTO ${DB}.${MAIN_TABLE} + (Timestamp, Duration, ServiceName, SampleRate) + VALUES + ('2025-06-01 00:00:01', 100, 'api', 1), + ('2025-06-01 00:00:02', 200, 'api', 5), + ('2025-06-01 00:00:03', 150, 'api', 10), + ('2025-06-01 00:00:04', 250, 'api', 1), + ('2025-06-01 00:00:05', 80, 'api', 1), + ('2025-06-01 00:00:06', 120, 'api', 5), + ('2025-06-01 00:00:07', 300, 'web', 1), + ('2025-06-01 00:00:08', 50, 'web', 5), + ('2025-06-01 00:00:09', 175, 'web', 10), + ('2025-06-01 00:00:10', 400, 'web', 1) + `, + }); + + await client.command({ + query: ` + CREATE OR REPLACE TABLE ${DB}.${EDGE_TABLE} ( + Timestamp DateTime64(9) CODEC(Delta(8), ZSTD(1)), + Duration Float64, + SampleRate UInt64, + ServiceName LowCardinality(String), + SpanAttributes Map(LowCardinality(String), String) + ) + ENGINE = MergeTree() + ORDER BY (Timestamp) + `, + }); + + await client.command({ + query: ` + INSERT INTO ${DB}.${EDGE_TABLE} + (Timestamp, Duration, SampleRate, ServiceName, SpanAttributes) + VALUES + ('2025-06-01 00:00:01', 100, 1, 'api', map('SampleRate', '1')), + ('2025-06-01 00:00:02', 200, 1, 'api', map('SampleRate', '1')), + ('2025-06-01 00:00:03', 300, 1, 'web', map('SampleRate', '1')), + ('2025-06-01 00:00:04', 400, 1, 'web', map('SampleRate', 'abc')), + ('2025-06-01 00:00:05', 50, 1000000, 'api', map('SampleRate', '1000000')) + `, + }); + }); + + beforeEach(() => { + metadata = new Metadata(hdxClient, new MetadataCache()); + }); + + afterAll(async () => { + await client.command({ + query: `DROP TABLE IF EXISTS ${DB}.${EDGE_TABLE}`, + }); + await client.command({ + query: `DROP TABLE IF EXISTS ${DB}.${MAIN_TABLE}`, + }); + await client.close(); + }); + + it('weighted avg when no rows match aggCondition: NULL, not division error', async () => { + const rows = await executeChartConfigAllRows({ + ...baseConfig, + select: [ + { + aggFn: 'avg', + valueExpression: 'Duration', + aggCondition: "ServiceName = 'nonexistent'", + aggConditionLanguage: 'sql', + alias: 'weighted_avg', + }, + ], + }); + expect(rows).toHaveLength(1); + const raw = rows[0]!['weighted_avg']; + expect( + raw === undefined || + raw === null || + raw === '' || + String(raw).toLowerCase() === 'null', + ).toBe(true); + }); + + it('weighted sum when no rows match aggCondition: should return 0', async () => { + const result = await executeChartConfig({ + ...baseConfig, + select: [ + { + aggFn: 'sum', + valueExpression: 'Duration', + aggCondition: "ServiceName = 'nonexistent'", + aggConditionLanguage: 'sql', + alias: 'weighted_sum', + }, + ], + }); + expect(Number(result['weighted_sum'])).toBe(0); + }); + + it('weighted count when no rows match aggCondition: should return 0', async () => { + const result = await executeChartConfig({ + ...baseConfig, + select: [ + { + aggFn: 'count', + valueExpression: '', + aggCondition: "ServiceName = 'nonexistent'", + aggConditionLanguage: 'sql', + alias: 'weighted_count', + }, + ], + }); + expect(Number(result['weighted_count'])).toBe(0); + }); + + it('groupBy ServiceName: weighted count per group', async () => { + const rows = await executeChartConfigAllRows({ + ...baseConfig, + groupBy: 'ServiceName', + select: [ + { + aggFn: 'count', + valueExpression: '', + aggCondition: '', + alias: 'weighted_count', + }, + ], + }); + + const byService = Object.fromEntries( + rows.map(r => [r['ServiceName'] as string, Number(r['weighted_count'])]), + ); + expect(byService['api']).toBe(23); + expect(byService['web']).toBe(17); + }); + + it('groupBy ServiceName: weighted avg(Duration) per group', async () => { + const rows = await executeChartConfigAllRows({ + ...baseConfig, + groupBy: 'ServiceName', + select: [ + { + aggFn: 'avg', + valueExpression: 'Duration', + aggCondition: '', + alias: 'weighted_avg', + }, + ], + }); + + const byService = Object.fromEntries( + rows.map(r => [r['ServiceName'] as string, Number(r['weighted_avg'])]), + ); + expect(byService['api']).toBeCloseTo(3530 / 23, 2); + expect(byService['web']).toBeCloseTo(2700 / 17, 2); + }); + + it('groupBy ServiceName: weighted sum(Duration) per group', async () => { + const rows = await executeChartConfigAllRows({ + ...baseConfig, + groupBy: 'ServiceName', + select: [ + { + aggFn: 'sum', + valueExpression: 'Duration', + aggCondition: '', + alias: 'weighted_sum', + }, + ], + }); + + const byService = Object.fromEntries( + rows.map(r => [r['ServiceName'] as string, Number(r['weighted_sum'])]), + ); + expect(byService['api']).toBe(3530); + expect(byService['web']).toBe(2700); + }); + + it('time-series with granularity: weighted count per time bucket', async () => { + const rows = await executeChartConfigAllRows({ + ...baseConfig, + displayType: DisplayType.Line, + granularity: '1 minute', + select: [ + { + aggFn: 'count', + valueExpression: '', + aggCondition: '', + alias: 'weighted_count', + }, + ], + }); + + expect(rows.length).toBeGreaterThanOrEqual(1); + const totalCount = rows.reduce( + (acc, r) => acc + Number(r['weighted_count']), + 0, + ); + expect(totalCount).toBe(40); + }); + + it('time-series with groupBy: weighted count per service per time bucket', async () => { + const rows = await executeChartConfigAllRows({ + ...baseConfig, + displayType: DisplayType.Line, + granularity: '1 minute', + groupBy: 'ServiceName', + select: [ + { + aggFn: 'count', + valueExpression: '', + aggCondition: '', + alias: 'weighted_count', + }, + ], + }); + + const byService = new Map(); + for (const r of rows) { + const svc = r['ServiceName'] as string; + byService.set( + svc, + (byService.get(svc) ?? 0) + Number(r['weighted_count']), + ); + } + expect(byService.get('api')).toBe(23); + expect(byService.get('web')).toBe(17); + }); + + describe('additional edge cases', () => { + const edgeConfig: ChartConfigWithOptDateRange = { + displayType: DisplayType.Table, + connection: 'test-connection', + from: { databaseName: DB, tableName: EDGE_TABLE }, + select: [], + where: '', + whereLanguage: 'sql', + timestampValueExpression: 'Timestamp', + sampleWeightExpression: 'SampleRate', + dateRange: [new Date('2025-01-01'), new Date('2025-12-31')], + }; + + it('all SampleRate=1: weighted results should equal unweighted results', async () => { + const filterConfig = { + ...edgeConfig, + where: 'SampleRate = 1', + whereLanguage: 'sql' as const, + }; + + const weightedResult = await executeChartConfig({ + ...filterConfig, + select: [ + { + aggFn: 'count', + valueExpression: '', + aggCondition: '', + alias: 'wcount', + }, + { + aggFn: 'avg', + valueExpression: 'Duration', + aggCondition: '', + alias: 'wavg', + }, + { + aggFn: 'sum', + valueExpression: 'Duration', + aggCondition: '', + alias: 'wsum', + }, + ], + }); + + const unweightedResult = await executeChartConfig({ + ...filterConfig, + sampleWeightExpression: undefined, + select: [ + { + aggFn: 'count', + valueExpression: '', + aggCondition: '', + alias: 'count', + }, + { + aggFn: 'avg', + valueExpression: 'Duration', + aggCondition: '', + alias: 'avg', + }, + { + aggFn: 'sum', + valueExpression: 'Duration', + aggCondition: '', + alias: 'sum', + }, + ], + }); + + expect(Number(weightedResult['wcount'])).toBe( + Number(unweightedResult['count']), + ); + expect(Number(weightedResult['wavg'])).toBeCloseTo( + Number(unweightedResult['avg']), + 5, + ); + expect(Number(weightedResult['wsum'])).toBeCloseTo( + Number(unweightedResult['sum']), + 5, + ); + }); + + it('non-numeric SampleRate in SpanAttributes: should clamp to weight 1', async () => { + const result = await executeChartConfig({ + ...edgeConfig, + sampleWeightExpression: "SpanAttributes['SampleRate']", + select: [ + { + aggFn: 'count', + valueExpression: '', + aggCondition: "ServiceName = 'web'", + aggConditionLanguage: 'sql', + alias: 'wcount', + }, + ], + }); + expect(Number(result['wcount'])).toBe(2); + }); + + it('very large SampleRate: should handle without overflow', async () => { + const result = await executeChartConfig({ + ...edgeConfig, + select: [ + { + aggFn: 'count', + valueExpression: '', + aggCondition: '', + alias: 'wcount', + }, + { + aggFn: 'sum', + valueExpression: 'Duration', + aggCondition: '', + alias: 'wsum', + }, + { + aggFn: 'avg', + valueExpression: 'Duration', + aggCondition: '', + alias: 'wavg', + }, + ], + }); + + expect(Number(result['wcount'])).toBe(1000004); + expect(Number(result['wsum'])).toBe(50001000); + expect(Number(result['wavg'])).toBeCloseTo(50001000 / 1000004, 2); + }); + + it('very large SampleRate: weighted avg dominated by high-weight row', async () => { + const result = await executeChartConfig({ + ...edgeConfig, + select: [ + { + aggFn: 'avg', + valueExpression: 'Duration', + aggCondition: '', + alias: 'wavg', + }, + ], + }); + const value = Number(result['wavg']); + expect(value).toBeGreaterThan(49); + expect(value).toBeLessThan(51); + }); + }); +}); diff --git a/packages/common-utils/src/core/renderChartConfig.ts b/packages/common-utils/src/core/renderChartConfig.ts index 944e96a57b..c54720b12b 100644 --- a/packages/common-utils/src/core/renderChartConfig.ts +++ b/packages/common-utils/src/core/renderChartConfig.ts @@ -325,11 +325,13 @@ const aggFnExpr = ({ expr, level, where, + sampleWeightExpression, }: { fn: AggregateFunction | AggregateFunctionWithCombinators; expr?: string; level?: number; where?: string; + sampleWeightExpression?: string; }) => { const isAny = fn === 'any'; const isNone = fn === 'none'; @@ -371,6 +373,79 @@ const aggFnExpr = ({ })`; } + // Sample-weighted aggregations: when sampleWeightExpression is set, + // each row carries a weight (defaults to 1 for unsampled spans). + // Corrected formulas account for upstream sampling (1-in-N). + // The greatest(..., 1) ensures unsampled rows (missing/empty/zero) + // are counted at weight 1 rather than dropped. + if ( + sampleWeightExpression && + !fn.endsWith('Merge') && + !fn.endsWith('State') + ) { + const sampleWeightExpr = `greatest(toUInt64OrZero(toString(${sampleWeightExpression})), 1)`; + const w = { UNSAFE_RAW_SQL: sampleWeightExpr }; + + if (fn === 'count') { + return isWhereUsed + ? chSql`sumIf(${w}, ${{ UNSAFE_RAW_SQL: where }})` + : chSql`sum(${w})`; + } + + if (fn === 'none') { + return chSql`${{ UNSAFE_RAW_SQL: expr ?? '' }}`; + } + + if (expr != null) { + if (fn === 'count_distinct' || fn === 'min' || fn === 'max') { + // These cannot be corrected for sampling; pass through unchanged + if (fn === 'count_distinct') { + return chSql`count${isWhereUsed ? 'If' : ''}(DISTINCT ${{ + UNSAFE_RAW_SQL: expr, + }}${isWhereUsed ? chSql`, ${{ UNSAFE_RAW_SQL: where }}` : ''})`; + } + return chSql`${{ UNSAFE_RAW_SQL: fn }}${isWhereUsed ? 'If' : ''}( + ${unsafeExpr}${isWhereUsed ? chSql`, ${{ UNSAFE_RAW_SQL: whereWithExtraNullCheck }}` : ''} + )`; + } + + if (fn === 'avg') { + const weightedVal = { + UNSAFE_RAW_SQL: `${unsafeExpr.UNSAFE_RAW_SQL} * ${sampleWeightExpr}`, + }; + const nullCheck = `${unsafeExpr.UNSAFE_RAW_SQL} IS NOT NULL`; + if (isWhereUsed) { + const cond = { UNSAFE_RAW_SQL: `${where} AND ${nullCheck}` }; + return chSql`sumIf(${weightedVal}, ${cond}) / nullIf(sumIf(${w}, ${cond}), 0)`; + } + return chSql`sumIf(${weightedVal}, ${{ UNSAFE_RAW_SQL: nullCheck }}) / nullIf(sumIf(${w}, ${{ UNSAFE_RAW_SQL: nullCheck }}), 0)`; + } + + if (fn === 'sum') { + const weightedVal = { + UNSAFE_RAW_SQL: `${unsafeExpr.UNSAFE_RAW_SQL} * ${sampleWeightExpr}`, + }; + if (isWhereUsed) { + return chSql`sumIf(${weightedVal}, ${{ UNSAFE_RAW_SQL: whereWithExtraNullCheck }})`; + } + return chSql`sum(${weightedVal})`; + } + + if (level != null && fn.startsWith('quantile')) { + const levelStr = Number.isFinite(level) ? `${level}` : '0'; + const weightArg = { + UNSAFE_RAW_SQL: `toUInt32(${sampleWeightExpr})`, + }; + if (isWhereUsed) { + return chSql`quantileTDigestWeightedIf(${{ UNSAFE_RAW_SQL: levelStr }})(${unsafeExpr}, ${weightArg}, ${{ UNSAFE_RAW_SQL: whereWithExtraNullCheck }})`; + } + return chSql`quantileTDigestWeighted(${{ UNSAFE_RAW_SQL: levelStr }})(${unsafeExpr}, ${weightArg})`; + } + + // For any other fn (last_value, any, etc.), fall through to default + } + } + if (fn === 'count') { if (isWhereUsed) { return chSql`${fn}If(${{ UNSAFE_RAW_SQL: where }})`; @@ -484,12 +559,14 @@ async function renderSelectList( // @ts-expect-error (TS doesn't know that we've already checked for quantile) level: select.level, where: whereClause.sql, + sampleWeightExpression: chartConfig.sampleWeightExpression, }); } else { expr = aggFnExpr({ fn: select.aggFn, expr: select.valueExpression, where: whereClause.sql, + sampleWeightExpression: chartConfig.sampleWeightExpression, }); } diff --git a/packages/common-utils/src/types.ts b/packages/common-utils/src/types.ts index d7e52ef819..1f80383db6 100644 --- a/packages/common-utils/src/types.ts +++ b/packages/common-utils/src/types.ts @@ -520,6 +520,7 @@ const SharedChartDisplaySettingsSchema = z.object({ export const _ChartConfigSchema = SharedChartDisplaySettingsSchema.extend({ timestampValueExpression: z.string(), implicitColumnExpression: z.string().optional(), + sampleWeightExpression: z.string().optional(), markdown: z.string().optional(), filtersLogicalOperator: z.enum(['AND', 'OR']).optional(), filters: z.array(FilterSchema).optional(), @@ -929,6 +930,7 @@ export const TraceSourceSchema = BaseSourceSchema.extend({ spanKindExpression: z.string().min(1, 'Span Kind Expression is required'), // Optional fields for traces + sampleRateExpression: z.string().optional(), logSourceId: z.string().optional().nullable(), sessionSourceId: z.string().optional(), metricSourceId: z.string().optional(), @@ -1018,6 +1020,28 @@ export function isMetricSource(source: TSource): source is TMetricSource { return source.kind === SourceKind.Metric; } +type SourceLikeForSampleWeight = { + kind: SourceKind; + sampleRateExpression?: string | null; +}; + +/** Trace sample rate expression for chart sampleWeightExpression when set. */ +export function getSampleWeightExpression( + source: SourceLikeForSampleWeight, +): string | undefined { + return source.kind === SourceKind.Trace && source.sampleRateExpression + ? source.sampleRateExpression + : undefined; +} + +/** For object spread: { ...pickSampleWeightExpressionProps(source) } */ +export function pickSampleWeightExpressionProps( + source: SourceLikeForSampleWeight, +): { sampleWeightExpression: string } | undefined { + const w = getSampleWeightExpression(source); + return w ? { sampleWeightExpression: w } : undefined; +} + export const AssistantLineTableConfigSchema = z.object({ displayType: z.enum([DisplayType.Line, DisplayType.Table]), markdown: z.string().optional(),