diff --git a/apps/start/src/routes/_app.$organizationId.$projectId.reports_.$reportId.tsx b/apps/start/src/routes/_app.$organizationId.$projectId.reports_.$reportId.tsx
index 714ec89f1..ae258b296 100644
--- a/apps/start/src/routes/_app.$organizationId.$projectId.reports_.$reportId.tsx
+++ b/apps/start/src/routes/_app.$organizationId.$projectId.reports_.$reportId.tsx
@@ -28,6 +28,7 @@ export const Route = createFileRoute(
},
validateSearch: z.object({
dashboardId: z.string().optional(),
+ range: z.string().optional(),
}),
pendingComponent: FullPageLoadingState,
});
diff --git a/apps/start/src/utils/csv-download.ts b/apps/start/src/utils/csv-download.ts
new file mode 100644
index 000000000..39b3efbd7
--- /dev/null
+++ b/apps/start/src/utils/csv-download.ts
@@ -0,0 +1,138 @@
+import type { RouterOutputs } from '@/trpc/client';
+
+type IChartData = RouterOutputs['chart']['chart'];
+type FunnelData = RouterOutputs['chart']['funnel'];
+type CohortData = RouterOutputs['chart']['cohort'];
+type ConversionData = RouterOutputs['chart']['conversion'];
+
+function escapeCsvValue(value: string | number | null | undefined): string {
+ if (value === null || value === undefined) return '';
+ const str = String(value);
+ if (str.includes(',') || str.includes('"') || str.includes('\n')) {
+ return `"${str.replace(/"/g, '""')}"`;
+ }
+ return str;
+}
+
+function buildCSV(rows: (string | number | null | undefined)[][]): string {
+ return rows.map((row) => row.map(escapeCsvValue).join(',')).join('\n');
+}
+
+export function downloadCSV(content: string, filename: string): void {
+ const blob = new Blob([content], { type: 'text/csv;charset=utf-8;' });
+ const url = URL.createObjectURL(blob);
+ const link = document.createElement('a');
+ link.href = url;
+ link.download = filename;
+ document.body.appendChild(link);
+ link.click();
+ document.body.removeChild(link);
+ URL.revokeObjectURL(url);
+}
+
+export function chartDataToCSV(data: IChartData): string {
+ if (!data.series.length) return '';
+
+ const allDates = [
+ ...new Set(data.series.flatMap((s) => s.data.map((d) => d.date))),
+ ].sort();
+
+ // Rows = series, columns = dates
+ const rows = data.series.map((serie) => {
+ const label = serie.names.join(' / ') || serie.event.name;
+ const values = allDates.map((date) => {
+ const point = serie.data.find((d) => d.date === date);
+ return point?.count ?? '';
+ });
+ return [label, ...values];
+ });
+
+ return buildCSV([['Series', ...allDates], ...rows]);
+}
+
+export function funnelDataToCSV(data: FunnelData): string {
+ if (!data.current.length) return '';
+
+ const hasBreakdowns = data.current.length > 1;
+
+ if (hasBreakdowns) {
+ // Columns = steps, rows = breakdown × metric
+ const steps = data.current[0]!.steps.map((s) => s.event.displayName);
+ const header = ['Breakdown / Metric', ...steps];
+ const rows: (string | number | null | undefined)[][] = [];
+
+ for (const variant of data.current) {
+ const label = variant.breakdowns.join(' / ') || '(all)';
+ rows.push([`${label} (Count)`, ...variant.steps.map((s) => s.count)]);
+ rows.push([`${label} (Conversion %)`, ...variant.steps.map((s) => s.percent)]);
+ rows.push([`${label} (Dropped After)`, ...variant.steps.map((s) => s.dropoffCount)]);
+ rows.push([`${label} (Dropoff %)`, ...variant.steps.map((s) => s.dropoffPercent)]);
+ }
+
+ return buildCSV([header, ...rows]);
+ }
+
+ // Columns = steps, rows = metrics
+ const funnel = data.current[0]!;
+ const steps = funnel.steps.map((s) => s.event.displayName);
+ const header = ['Metric', ...steps];
+ const rows = [
+ ['Count', ...funnel.steps.map((s) => s.count)],
+ ['Conversion %', ...funnel.steps.map((s) => s.percent)],
+ ['Dropped After', ...funnel.steps.map((s) => s.dropoffCount)],
+ ['Dropoff %', ...funnel.steps.map((s) => s.dropoffPercent)],
+ ];
+
+ return buildCSV([header, ...rows]);
+}
+
+export function cohortDataToCSV(data: CohortData): string {
+ if (!data.length) return '';
+
+ const cohortDates = data.map((row) => row.cohort_interval);
+ const maxPeriods = data[0]?.values.length ?? 0;
+
+ // Rows = periods, columns = cohort dates
+ const totalRow: (string | number | null | undefined)[] = [
+ 'Total Profiles',
+ ...data.map((row) => row.sum),
+ ];
+
+ const periodRows = Array.from({ length: maxPeriods }, (_, i) => {
+ const label = i === 0 ? 'Period <1' : `Period ${i}`;
+ return [label, ...data.map((row) => row.values[i] ?? '')];
+ });
+
+ return buildCSV([['Cohort Date', ...cohortDates], totalRow, ...periodRows]);
+}
+
+export function cohortMembersToCSV(profileIds: string[]): string {
+ if (!profileIds.length) return '';
+ return buildCSV([['profile_id'], ...profileIds.map((id) => [id])]);
+}
+
+export function conversionDataToCSV(data: ConversionData): string {
+ if (!data.current.length) return '';
+
+ const allDates = data.current[0]?.data.map((d) => d.date) ?? [];
+
+ // Rows = series × metric, columns = dates
+ const rows = data.current.flatMap((serie) => {
+ const label = serie.breakdowns.join(' / ') || serie.id;
+ const rateRow = [
+ `${label} (Rate %)`,
+ ...allDates.map((date) => serie.data.find((d) => d.date === date)?.rate ?? ''),
+ ];
+ const convRow = [
+ `${label} (Conversions)`,
+ ...allDates.map((date) => serie.data.find((d) => d.date === date)?.conversions ?? ''),
+ ];
+ const totalRow = [
+ `${label} (Total)`,
+ ...allDates.map((date) => serie.data.find((d) => d.date === date)?.total ?? ''),
+ ];
+ return [rateRow, convRow, totalRow];
+ });
+
+ return buildCSV([['Series', ...allDates], ...rows]);
+}
diff --git a/apps/start/src/utils/title.ts b/apps/start/src/utils/title.ts
index e735636e2..1974618d6 100644
--- a/apps/start/src/utils/title.ts
+++ b/apps/start/src/utils/title.ts
@@ -94,6 +94,7 @@ export const PAGE_TITLES = {
PROFILES: 'Profiles',
PROFILE_EVENTS: 'Profile events',
PROFILE_DETAILS: 'Profile details',
+ COHORTS: 'Cohorts',
// Sub-sections
CONVERSIONS: 'Conversions',
diff --git a/apps/start/vite.config.ts b/apps/start/vite.config.ts
index 3b0bc073a..ef94a3d14 100644
--- a/apps/start/vite.config.ts
+++ b/apps/start/vite.config.ts
@@ -21,6 +21,7 @@ if (process.env.NITRO) {
nitroV2Plugin({
preset: 'node-server',
compatibilityDate: '2025-10-21',
+ serveStatic: true,
}),
);
} else {
diff --git a/apps/worker/package.json b/apps/worker/package.json
index c8ae019e3..ab4c23550 100644
--- a/apps/worker/package.json
+++ b/apps/worker/package.json
@@ -14,6 +14,7 @@
"@bull-board/api": "6.14.0",
"@bull-board/express": "6.14.0",
"@openpanel/common": "workspace:*",
+ "@openpanel/constants": "workspace:*",
"@openpanel/db": "workspace:*",
"@openpanel/email": "workspace:*",
"@openpanel/integrations": "workspace:^",
diff --git a/apps/worker/src/boot-cron.ts b/apps/worker/src/boot-cron.ts
index 9650598e8..d1ce19aac 100644
--- a/apps/worker/src/boot-cron.ts
+++ b/apps/worker/src/boot-cron.ts
@@ -22,7 +22,7 @@ export async function bootCron() {
{
name: 'flush',
type: 'flushEvents',
- pattern: 1000 * 10,
+ pattern: 1000 * 5,
},
{
name: 'flush',
@@ -34,6 +34,11 @@ export async function bootCron() {
type: 'flushSessions',
pattern: 1000 * 10,
},
+ // {
+ // name: 'customAlerts',
+ // type: 'customAlerts',
+ // pattern: '*/15 * * * *',
+ // },
];
if (process.env.SELF_HOSTED && process.env.NODE_ENV === 'production') {
diff --git a/apps/worker/src/boot-workers.ts b/apps/worker/src/boot-workers.ts
index 55652de8d..42896e240 100644
--- a/apps/worker/src/boot-workers.ts
+++ b/apps/worker/src/boot-workers.ts
@@ -64,6 +64,76 @@ function getEnabledQueues(): QueueName[] {
return queues;
}
+function getPodIndex(): number {
+ const hostname = process.env.HOSTNAME || '';
+
+ // Extract numeric index from StatefulSet pod name (e.g., openpanel-worker-0 -> 0)
+ const match = hostname.match(/-(\d+)$/);
+ if (match) {
+ return Number.parseInt(match[1], 10);
+ }
+
+ // Fallback to 0 if no numeric suffix found
+ logger.warn('Could not extract pod index from hostname, defaulting to 0', {
+ hostname,
+ });
+ return 0;
+}
+
+function getAutoPartitionedQueues(): QueueName[] {
+ const enabledQueues = getEnabledQueues();
+
+ const enableShardDistribution =
+ process.env.ENABLE_SHARD_DISTRIBUTION === 'true';
+
+ if (!enableShardDistribution) {
+ return enabledQueues;
+ }
+
+ if (!enabledQueues.includes('events')) {
+ return enabledQueues;
+ }
+
+ const totalPods = Number.parseInt(process.env.TOTAL_POD || '1', 10);
+
+ if (totalPods <= 1) {
+ logger.info('Shard distribution disabled: TOTAL_POD not set or = 1', {
+ totalShards: EVENTS_GROUP_QUEUES_SHARDS,
+ });
+ return enabledQueues;
+ }
+
+ const podIndex = getPodIndex();
+ const shardsPerPod = Math.floor(EVENTS_GROUP_QUEUES_SHARDS / totalPods);
+ const remainderShards = EVENTS_GROUP_QUEUES_SHARDS % totalPods;
+ const extraShard = podIndex < remainderShards ? 1 : 0;
+ const startShard =
+ podIndex * shardsPerPod + Math.min(podIndex, remainderShards);
+ const endShard = startShard + shardsPerPod + extraShard;
+
+ const specificShards = Array.from(
+ { length: endShard - startShard },
+ (_, i) => `events_${startShard + i}`,
+ );
+
+ const partitionedQueues = [
+ ...enabledQueues.filter((q) => q !== 'events'),
+ ...specificShards,
+ ];
+
+ logger.info('Shard distribution enabled', {
+ hostname: process.env.HOSTNAME,
+ podIndex,
+ totalPods,
+ totalShards: EVENTS_GROUP_QUEUES_SHARDS,
+ assignedShards: `${startShard}-${endShard - 1}`,
+ shardsCount: endShard - startShard,
+ queues: partitionedQueues,
+ });
+
+ return partitionedQueues;
+}
+
/**
* Gets the concurrency setting for a queue from environment variables.
* Env var format: {QUEUE_NAME}_CONCURRENCY (e.g., EVENTS_0_CONCURRENCY=32)
@@ -83,7 +153,11 @@ function getConcurrencyFor(queueName: string, defaultValue = 1): number {
}
export async function bootWorkers() {
- const enabledQueues = getEnabledQueues();
+ const enableShardDistribution =
+ process.env.ENABLE_SHARD_DISTRIBUTION === 'true';
+ const enabledQueues = enableShardDistribution
+ ? getAutoPartitionedQueues()
+ : getEnabledQueues();
   const workers: (Worker | GroupWorker)[] = [];
@@ -182,9 +256,15 @@ export async function bootWorkers() {
const importWorker = new Worker(importQueue.name, importJob, {
...workerOptions,
concurrency,
+ lockDuration: 300000, // 5 minutes - for testing to see actual errors faster
+ stalledInterval: 150000, // 2.5 minutes - check for stalled jobs
});
workers.push(importWorker);
- logger.info('Started worker for import', { concurrency });
+ logger.info('Started worker for import', {
+ concurrency,
+ lockDuration: 300000,
+ stalledInterval: 150000,
+ });
}
if (workers.length === 0) {
diff --git a/apps/worker/src/jobs/cron.custom-alerts.ts b/apps/worker/src/jobs/cron.custom-alerts.ts
new file mode 100644
index 000000000..c06a788fd
--- /dev/null
+++ b/apps/worker/src/jobs/cron.custom-alerts.ts
@@ -0,0 +1,503 @@
+import {
+ type AlertFrequency,
+ ALERT_FREQUENCY_MS,
+ ALERT_FREQUENCY_TO_CURRENT_RANGE,
+ ALERT_FREQUENCY_TO_INTERVAL,
+ ALERT_FREQUENCY_TO_RANGE,
+ ANOMALY_HISTORY_COUNT,
+ CONFIDENCE_Z_SCORES,
+} from '@openpanel/constants';
+import {
+ ChartEngine,
+ conversionService,
+ createNotification,
+ db,
+ funnelService,
+ getDatesFromRange,
+ getReportById,
+ getSettingsForProject,
+} from '@openpanel/db';
+
+import { logger } from '../utils/logger';
+
+// Inline types — @openpanel/validation is not a direct worker dependency
+type INotificationRuleThresholdConfig = {
+ type: 'threshold';
+ reportId: string;
+ operator: 'above' | 'below';
+ value: number;
+ frequency: AlertFrequency;
+};
+
+type INotificationRuleAnomalyConfig = {
+ type: 'anomaly';
+ reportId: string;
+ confidence: '95' | '98' | '99';
+ frequency: AlertFrequency;
+};
+
+/**
+ * Main custom alerts cron job
+ * Runs every 15 minutes, evaluates all threshold and anomaly rules
+ */
+export async function customAlerts() {
+ const rules = await db.notificationRule.findMany({
+ where: {
+ config: {
+ path: ['type'],
+ string_contains: '', // We'll filter in code since Prisma JSON filtering is limited
+ },
+ },
+ include: {
+ integrations: true,
+ project: {
+ select: {
+ name: true,
+ organizationId: true,
+ },
+ },
+ },
+ });
+
+ // Filter to only threshold and anomaly rules
+ const alertRules = rules.filter((rule) => {
+ const config = rule.config as { type?: string };
+ return config.type === 'threshold' || config.type === 'anomaly';
+ });
+
+ if (alertRules.length === 0) {
+ logger.debug('No custom alert rules found, skipping');
+ return;
+ }
+
+ logger.info(`[custom-alerts] Starting evaluation of ${alertRules.length} rules`);
+
+ let evaluated = 0;
+ let skippedFrequency = 0;
+ let skippedNotCrossed = 0;
+ let triggered = 0;
+ let errored = 0;
+
+ for (const rule of alertRules) {
+ try {
+ const config = rule.config as
+ | INotificationRuleThresholdConfig
+ | INotificationRuleAnomalyConfig;
+
+ // Frequency check: skip if not enough time has elapsed
+ if (rule.lastNotifiedAt) {
+ const elapsed = Date.now() - rule.lastNotifiedAt.getTime();
+ if (elapsed < ALERT_FREQUENCY_MS[config.frequency]) {
+ skippedFrequency++;
+ logger.debug(
+ `[custom-alerts] Rule "${rule.name}" (${rule.id}): skipped — frequency limit not elapsed (${Math.round(elapsed / 1000 / 60)}min / ${config.frequency})`,
+ );
+ continue;
+ }
+ }
+
+ // Fetch the report this alert is tied to
+ const report = await getReportById(config.reportId);
+ if (!report) {
+ logger.warn(
+ `[custom-alerts] Rule "${rule.name}" (${rule.id}): report ${config.reportId} not found — skipping`,
+ );
+ errored++;
+ continue;
+ }
+
+ evaluated++;
+ let shouldAlert = false;
+ let title = '';
+ let message = '';
+
+ if (config.type === 'threshold') {
+ const result = await evaluateThreshold(report, config);
+ shouldAlert = result.shouldAlert;
+ title = result.title;
+ message = result.message;
+
+ logger.info(
+ `[custom-alerts] Rule "${rule.name}" (${rule.id}): threshold ${config.operator} ${config.value} — current: ${result.currentValue} — ${shouldAlert ? 'TRIGGERED' : 'not crossed'}`,
+ );
+ } else if (config.type === 'anomaly') {
+ const result = await evaluateAnomaly(report, config);
+ shouldAlert = result.shouldAlert;
+ title = result.title;
+ message = result.message;
+
+ logger.info(
+ `[custom-alerts] Rule "${rule.name}" (${rule.id}): anomaly ${config.confidence}% — current: ${result.currentValue} — band: [${result.lowerBound}, ${result.upperBound}] — ${shouldAlert ? 'TRIGGERED' : 'within range'}`,
+ );
+ }
+
+ if (!shouldAlert) {
+ skippedNotCrossed++;
+ continue;
+ }
+
+ triggered++;
+
+ // Build dashboard link scoped to the alert's evaluation window
+ const dashboardUrl =
+ process.env.DASHBOARD_URL || process.env.NEXT_PUBLIC_DASHBOARD_URL;
+ const organizationId = rule.project?.organizationId;
+ const alertRange = ALERT_FREQUENCY_TO_CURRENT_RANGE[config.frequency as AlertFrequency];
+ const reportLink =
+ dashboardUrl && organizationId
+ ? `${dashboardUrl}/${organizationId}/${rule.projectId}/reports/${config.reportId}?range=${alertRange}`
+ : '';
+
+ // Build rich notification message
+ const projectName = rule.project?.name || '';
+ const now = new Date();
+ const timeStr = now.toLocaleDateString('en-US', {
+ weekday: 'short',
+ year: 'numeric',
+ month: 'short',
+ day: '2-digit',
+ hour: '2-digit',
+ minute: '2-digit',
+ timeZoneName: 'short',
+ });
+
+ const lines = [message];
+ if (reportLink) {
+ lines.push(`Report: ${reportLink}`);
+ }
+ lines.push(`Time: ${timeStr}`);
+ if (projectName) {
+ lines.push(`Project: ${projectName}`);
+ }
+ const fullMessage = lines.join('\n');
+
+ const integrationCount = rule.integrations.length + (rule.sendToApp ? 1 : 0);
+ logger.info(
+ `[custom-alerts] Rule "${rule.name}" (${rule.id}): sending ${integrationCount} notification(s)`,
+ );
+
+ const promises = rule.integrations.map((integration) =>
+ createNotification({
+ title,
+ message: fullMessage,
+ projectId: rule.projectId,
+ integrationId: integration.id,
+ notificationRuleId: rule.id,
+ payload: null,
+ }),
+ );
+
+ if (rule.sendToApp) {
+ promises.push(
+ createNotification({
+ title,
+ message: fullMessage,
+ projectId: rule.projectId,
+ integrationId: 'app',
+ notificationRuleId: rule.id,
+ payload: null,
+ }),
+ );
+ }
+
+ await Promise.all(promises);
+
+ // Update lastNotifiedAt
+ await db.notificationRule.update({
+ where: { id: rule.id },
+ data: { lastNotifiedAt: new Date() },
+ });
+
+ logger.info(
+ `[custom-alerts] Rule "${rule.name}" (${rule.id}): lastNotifiedAt updated`,
+ );
+ } catch (error) {
+ errored++;
+ logger.error(
+ `[custom-alerts] Rule "${rule.name}" (${rule.id}): error — ${error}`,
+ );
+ }
+ }
+
+ logger.info(
+ `[custom-alerts] Done — ${alertRules.length} rules: ${evaluated} evaluated, ${triggered} triggered, ${skippedFrequency} skipped (frequency), ${skippedNotCrossed} skipped (not crossed), ${errored} errored`,
+ );
+}
+
+/**
+ * Evaluate a threshold alert rule
+ * Runs the report query for the current frequency window and compares against the fixed threshold
+ */
+async function evaluateThreshold(
+ report: NonNullable>>,
+ config: INotificationRuleThresholdConfig,
+) {
+ const freq = config.frequency;
+ const interval = ALERT_FREQUENCY_TO_INTERVAL[freq];
+
+ // Conversion charts: use conversionService to get the actual conversion rate (%)
+ if (report.chartType === 'conversion') {
+ const { timezone } = await getSettingsForProject(report.projectId);
+ const { startDate, endDate } = getDatesFromRange(
+ ALERT_FREQUENCY_TO_CURRENT_RANGE[freq] as any,
+ timezone,
+ );
+
+ const series = await conversionService.getConversion({
+ projectId: report.projectId,
+ startDate,
+ endDate,
+ series: report.series,
+ breakdowns: report.breakdowns,
+ interval: interval as any,
+ timezone,
+ funnelGroup: report.funnelGroup,
+ funnelWindow: report.funnelWindow,
+ globalFilters: report.globalFilters,
+ holdProperties: report.holdProperties,
+ measuring: 'conversion_rate',
+ limit: report.limit,
+ cohortFilters: report.cohortFilters,
+ });
+
+ const data = series[0]?.data ?? [];
+ const lastPoint = data[data.length - 1];
+ const currentValue = lastPoint?.rate ?? 0;
+
+ const crossed =
+ config.operator === 'above'
+ ? currentValue > config.value
+ : currentValue < config.value;
+
+ return {
+ shouldAlert: crossed,
+ currentValue,
+ title: `Alert: ${report.name}`,
+ message: `The current value for ${report.name} is ${currentValue.toFixed(2)}%. Triggered when the current value is ${config.operator} ${config.value}%.`,
+ };
+ }
+
+ // Funnel charts: use funnelService to get the overall conversion % from the last step
+ if (report.chartType === 'funnel') {
+ const { timezone } = await getSettingsForProject(report.projectId);
+ const { startDate, endDate } = getDatesFromRange(
+ ALERT_FREQUENCY_TO_CURRENT_RANGE[freq] as any,
+ timezone,
+ );
+
+ const funnelResult = await funnelService.getFunnel({
+ projectId: report.projectId,
+ startDate,
+ endDate,
+ series: report.series,
+ breakdowns: report.breakdowns,
+ interval: interval as any,
+ range: report.range,
+ chartType: report.chartType,
+ metric: report.metric as any,
+ previous: false,
+ timezone,
+ funnelGroup: report.funnelGroup,
+ funnelWindow: report.funnelWindow,
+ globalFilters: report.globalFilters,
+ holdProperties: report.holdProperties,
+ measuring: 'conversion_rate',
+ limit: report.limit,
+ cohortFilters: report.cohortFilters,
+ });
+
+ const currentValue = funnelResult[0]?.lastStep?.percent ?? 0;
+
+ const crossed =
+ config.operator === 'above'
+ ? currentValue > config.value
+ : currentValue < config.value;
+
+ return {
+ shouldAlert: crossed,
+ currentValue,
+ title: `Alert: ${report.name}`,
+ message: `The current value for ${report.name} is ${currentValue.toFixed(2)}%. Triggered when the current value is ${config.operator} ${config.value}%.`,
+ };
+ }
+
+ // All other chart types: use ChartEngine with raw metric values
+ const range = ALERT_FREQUENCY_TO_CURRENT_RANGE[freq];
+
+ const result = await ChartEngine.execute({
+ projectId: report.projectId,
+ series: report.series,
+ breakdowns: report.breakdowns,
+ chartType: report.chartType,
+ interval: interval as any,
+ range: range as any,
+ previous: false,
+ formula: report.formula,
+ metric: report.metric as any,
+ globalFilters: report.globalFilters,
+ holdProperties: report.holdProperties,
+ measuring: 'conversion_rate',
+ limit: report.limit,
+ cohortFilters: report.cohortFilters,
+ });
+
+ const currentValue = result.series[0]?.metrics?.sum ?? 0;
+ const metricKey = (report.metric as string) || 'sum';
+ const metricValue =
+    (result.series[0]?.metrics as unknown as Record<string, number>)?.[metricKey] ??
+ currentValue;
+
+ const crossed =
+ config.operator === 'above'
+ ? metricValue > config.value
+ : metricValue < config.value;
+
+ return {
+ shouldAlert: crossed,
+ currentValue: metricValue,
+ title: `Alert: ${report.name}`,
+ message: `The current value for ${report.name} is ${metricValue.toFixed(2)}. Triggered when the current value is ${config.operator} ${config.value}.`,
+ };
+}
+
+/**
+ * Evaluate an anomaly detection alert rule
+ * Runs the report query for historical periods, computes confidence band, checks if current value is outside
+ */
+async function evaluateAnomaly(
+  report: NonNullable<Awaited<ReturnType<typeof getReportById>>>,
+ config: INotificationRuleAnomalyConfig,
+) {
+ const freq = config.frequency;
+ const historicalRange = ALERT_FREQUENCY_TO_RANGE[freq];
+ const interval = ALERT_FREQUENCY_TO_INTERVAL[freq];
+
+ // Conversion charts: use conversionService and compare on rate (%) not raw counts
+ if (report.chartType === 'conversion') {
+ const { timezone } = await getSettingsForProject(report.projectId);
+ const { startDate, endDate } = getDatesFromRange(historicalRange as any, timezone);
+
+ const seriesResult = await conversionService.getConversion({
+ projectId: report.projectId,
+ startDate,
+ endDate,
+ series: report.series,
+ breakdowns: report.breakdowns,
+ interval: interval as any,
+ timezone,
+ funnelGroup: report.funnelGroup,
+ funnelWindow: report.funnelWindow,
+ globalFilters: report.globalFilters,
+ holdProperties: report.holdProperties,
+ measuring: 'conversion_rate',
+ limit: report.limit,
+ cohortFilters: report.cohortFilters,
+ });
+
+ const data = seriesResult[0]?.data ?? [];
+ if (data.length < 3) {
+ logger.warn(
+ `Not enough historical data for anomaly detection on report ${report.id}`,
+ );
+ return { shouldAlert: false, currentValue: 0, lowerBound: 0, upperBound: 0, title: '', message: '' };
+ }
+
+ // dataPoints are conversion rates (percentages, e.g. 88.63)
+ const dataPoints = data.map((d) => d.rate);
+ const historicalValues = dataPoints.slice(
+ 0,
+ Math.min(dataPoints.length - 1, ANOMALY_HISTORY_COUNT),
+ );
+ const currentValue = dataPoints[dataPoints.length - 1] ?? 0;
+
+ if (historicalValues.length < 3) {
+ return { shouldAlert: false, currentValue: 0, lowerBound: 0, upperBound: 0, title: '', message: '' };
+ }
+
+ const mean = historicalValues.reduce((sum, v) => sum + v, 0) / historicalValues.length;
+ const variance =
+ historicalValues.reduce((sum, v) => sum + (v - mean) ** 2, 0) /
+ historicalValues.length;
+ const stddev = Math.sqrt(variance);
+
+ const zScore = CONFIDENCE_Z_SCORES[config.confidence] ?? 1.96;
+ const lowerBound = mean - zScore * stddev;
+ const upperBound = mean + zScore * stddev;
+ const isAnomaly = currentValue < lowerBound || currentValue > upperBound;
+
+ return {
+ shouldAlert: isAnomaly,
+ currentValue,
+ lowerBound,
+ upperBound,
+ title: `Alert: ${report.name}`,
+ message: `The current value for ${report.name} is ${currentValue.toFixed(2)}%. Triggered when the current value is not within forecasted range [${lowerBound.toFixed(2)}%, ${upperBound.toFixed(2)}%].`,
+ };
+ }
+
+ // Funnel charts: no time-series data available — anomaly detection not supported
+ if (report.chartType === 'funnel') {
+ logger.warn(
+ `[custom-alerts] Anomaly detection is not supported for funnel charts (report ${report.id}) — skipping`,
+ );
+ return { shouldAlert: false, currentValue: 0, lowerBound: 0, upperBound: 0, title: '', message: '' };
+ }
+
+ // All other chart types: use ChartEngine with raw counts
+ const result = await ChartEngine.execute({
+ projectId: report.projectId,
+ series: report.series,
+ breakdowns: report.breakdowns,
+ chartType: report.chartType,
+ interval: interval as any,
+ range: historicalRange as any,
+ previous: false,
+ formula: report.formula,
+ metric: report.metric as any,
+ globalFilters: report.globalFilters,
+ holdProperties: report.holdProperties,
+ measuring: 'conversion_rate',
+ limit: report.limit,
+ cohortFilters: report.cohortFilters,
+ });
+
+ const series = result.series[0];
+ if (!series || !series.data || series.data.length < 3) {
+ logger.warn(
+ `Not enough historical data for anomaly detection on report ${report.id}`,
+ );
+ return { shouldAlert: false, currentValue: 0, lowerBound: 0, upperBound: 0, title: '', message: '' };
+ }
+
+ const dataPoints = series.data.map((d) => d.count);
+ const historicalValues = dataPoints.slice(
+ 0,
+ Math.min(dataPoints.length - 1, ANOMALY_HISTORY_COUNT),
+ );
+ const currentValue = dataPoints[dataPoints.length - 1] ?? 0;
+
+ if (historicalValues.length < 3) {
+ return { shouldAlert: false, currentValue: 0, lowerBound: 0, upperBound: 0, title: '', message: '' };
+ }
+
+ const mean =
+ historicalValues.reduce((sum, v) => sum + v, 0) / historicalValues.length;
+ const variance =
+ historicalValues.reduce((sum, v) => sum + (v - mean) ** 2, 0) /
+ historicalValues.length;
+ const stddev = Math.sqrt(variance);
+
+ const zScore = CONFIDENCE_Z_SCORES[config.confidence] ?? 1.96;
+ const lowerBound = mean - zScore * stddev;
+ const upperBound = mean + zScore * stddev;
+ const isAnomaly = currentValue < lowerBound || currentValue > upperBound;
+
+ return {
+ shouldAlert: isAnomaly,
+ currentValue,
+ lowerBound,
+ upperBound,
+ title: `Alert: ${report.name}`,
+ message: `The current value for ${report.name} is ${currentValue.toFixed(2)}. Triggered when the current value is not within forecasted range [${lowerBound.toFixed(2)}, ${upperBound.toFixed(2)}].`,
+ };
+}
diff --git a/apps/worker/src/jobs/cron.materialize-columns.ts b/apps/worker/src/jobs/cron.materialize-columns.ts
new file mode 100644
index 000000000..4e83fa3e6
--- /dev/null
+++ b/apps/worker/src/jobs/cron.materialize-columns.ts
@@ -0,0 +1,36 @@
+import { materializeColumnsService } from '@openpanel/db';
+import { logger } from '@/utils/logger';
+
+export async function materializeColumns(options: {
+ dryRun?: boolean;
+ threshold?: number;
+}) {
+ const { dryRun = false, threshold = 150 } = options;
+
+ logger.info('Starting materialized columns cron job', { dryRun, threshold });
+
+ try {
+ const result = await materializeColumnsService.analyze({
+ dryRun,
+ threshold,
+ });
+
+ logger.info('Materialized columns analysis complete', {
+ candidatesFound: result.candidates.length,
+ materialized: result.materialized,
+ dryRun,
+ });
+
+ // Log the report for visibility
+ console.log(result.report);
+
+ return {
+ success: true,
+ candidates: result.candidates.length,
+ materialized: result.materialized.length,
+ };
+ } catch (error) {
+ logger.error('Materialized columns cron job failed', { error });
+ throw error;
+ }
+}
diff --git a/apps/worker/src/jobs/cron.ts b/apps/worker/src/jobs/cron.ts
index b50e3beb5..6b0f371da 100644
--- a/apps/worker/src/jobs/cron.ts
+++ b/apps/worker/src/jobs/cron.ts
@@ -3,7 +3,9 @@ import type { Job } from 'bullmq';
import { eventBuffer, profileBuffer, sessionBuffer } from '@openpanel/db';
import type { CronQueuePayload } from '@openpanel/queue';
+import { customAlerts } from './cron.custom-alerts';
import { jobdeleteProjects } from './cron.delete-projects';
+import { materializeColumns } from './cron.materialize-columns';
import { ping } from './cron.ping';
import { salt } from './cron.salt';
@@ -27,5 +29,14 @@ export async function cronJob(job: Job) {
case 'deleteProjects': {
return await jobdeleteProjects(job);
}
+ case 'materializeColumns': {
+ return await materializeColumns({
+ dryRun: job.data.dryRun ?? false,
+ threshold: job.data.threshold ?? 150,
+ });
+ }
+ case 'customAlerts': {
+ return await customAlerts();
+ }
}
}
diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts
index 4965acac9..35e4954fe 100644
--- a/apps/worker/src/jobs/events.incoming-event.ts
+++ b/apps/worker/src/jobs/events.incoming-event.ts
@@ -11,11 +11,7 @@ import {
parseUserAgent,
} from '@openpanel/common/server';
import type { IServiceCreateEventPayload, IServiceEvent } from '@openpanel/db';
-import {
- checkNotificationRulesForEvent,
- createEvent,
- sessionBuffer,
-} from '@openpanel/db';
+import { createEvent, sessionBuffer } from '@openpanel/db';
import type { ILogger } from '@openpanel/logger';
import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue';
import * as R from 'ramda';
@@ -34,11 +30,7 @@ async function createEventAndNotify(
logger: ILogger,
) {
logger.info('Creating event', { event: payload });
- const [event] = await Promise.all([
- createEvent(payload),
- checkNotificationRulesForEvent(payload).catch(() => {}),
- ]);
- return event;
+ return createEvent(payload);
}
const parseRevenue = (revenue: unknown): number | undefined => {
diff --git a/apps/worker/src/jobs/events.incoming-events.test.ts b/apps/worker/src/jobs/events.incoming-events.test.ts
index 1ee1ebb90..838dcd30f 100644
--- a/apps/worker/src/jobs/events.incoming-events.test.ts
+++ b/apps/worker/src/jobs/events.incoming-events.test.ts
@@ -21,7 +21,6 @@ vi.mock('@openpanel/db', async () => {
return {
...actual,
createEvent: vi.fn(),
- checkNotificationRulesForEvent: vi.fn().mockResolvedValue(true),
sessionBuffer: {
getExistingSession: vi.fn(),
},
diff --git a/apps/worker/src/jobs/import.ts b/apps/worker/src/jobs/import.ts
index f03a053cc..b9986bd87 100644
--- a/apps/worker/src/jobs/import.ts
+++ b/apps/worker/src/jobs/import.ts
@@ -139,7 +139,7 @@ export async function importJob(job: Job) {
// Phase 1: Fetch & Transform - Process events in batches
if (shouldRunStep('loading')) {
- const eventBatch: any = [];
+ let eventBatch: any = [];
for await (const rawEvent of providerInstance.parseSource(
resumeLoadingFrom,
)) {
@@ -158,7 +158,13 @@ export async function importJob(job: Job) {
// Process batch when it reaches the batch size
if (eventBatch.length >= BATCH_SIZE) {
- jobLogger.info('Processing batch', { batchSize: eventBatch.length });
+ const memUsage = process.memoryUsage();
+ jobLogger.info('Processing batch', {
+ batchSize: eventBatch.length,
+ heapUsedMB: Math.round(memUsage.heapUsed / 1024 / 1024),
+ heapTotalMB: Math.round(memUsage.heapTotal / 1024 / 1024),
+ externalMB: Math.round(memUsage.external / 1024 / 1024),
+ });
const transformedEvents: IClickhouseEvent[] = eventBatch.map(
(
@@ -170,7 +176,7 @@ export async function importJob(job: Job) {
await insertImportBatch(transformedEvents, importId);
processedEvents += eventBatch.length;
- eventBatch.length = 0;
+ eventBatch = [];
const createdAt = new Date(transformedEvents[0]?.created_at || '')
.toISOString()
@@ -190,6 +196,14 @@ export async function importJob(job: Job) {
// Process remaining events in the last batch
if (eventBatch.length > 0) {
+ const memUsage = process.memoryUsage();
+ jobLogger.info('Processing final batch', {
+ batchSize: eventBatch.length,
+ heapUsedMB: Math.round(memUsage.heapUsed / 1024 / 1024),
+ heapTotalMB: Math.round(memUsage.heapTotal / 1024 / 1024),
+ externalMB: Math.round(memUsage.external / 1024 / 1024),
+ });
+
const transformedEvents = eventBatch.map(
(
// @ts-expect-error
@@ -200,7 +214,7 @@ export async function importJob(job: Job) {
await insertImportBatch(transformedEvents, importId);
processedEvents += eventBatch.length;
- eventBatch.length = 0;
+ eventBatch = [];
const createdAt = new Date(transformedEvents[0]?.created_at || '')
.toISOString()
diff --git a/apps/worker/src/jobs/sessions.ts b/apps/worker/src/jobs/sessions.ts
index 84b35cbb4..c0e14dc17 100644
--- a/apps/worker/src/jobs/sessions.ts
+++ b/apps/worker/src/jobs/sessions.ts
@@ -38,6 +38,7 @@ const updateEventsCount = cacheable(async function updateEventsCount(
});
if (!organization) {
+ logger.warn('updateEventsCount: Organization not found', { projectId });
return;
}
diff --git a/packages/constants/index.ts b/packages/constants/index.ts
index 17cd2d15e..c2545d964 100644
--- a/packages/constants/index.ts
+++ b/packages/constants/index.ts
@@ -91,6 +91,8 @@ export const operators = {
lt: 'Less than',
gte: 'Greater than or equal to',
lte: 'Less than or equal to',
+ inCohort: 'In cohort',
+ notInCohort: 'Not in cohort',
} as const;
export const chartTypes = {
@@ -245,3 +247,62 @@ export function getDefaultIntervalByDates(
return null;
}
+
+// ── Custom Alerts ───────────────────────────────────────────────────
+
+export type AlertFrequency = 'hour' | 'day' | 'week' | 'month';
+
+/**
+ * Maps alert frequency to the range used for historical data (anomaly detection)
+ */
+export const ALERT_FREQUENCY_TO_RANGE: Record = {
+ hour: '7d',
+ day: '30d',
+ week: '30d',
+ month: '180d',
+};
+
+/**
+ * Maps alert frequency to the range used for current period evaluation (threshold)
+ */
+export const ALERT_FREQUENCY_TO_CURRENT_RANGE: Record =
+ {
+ hour: 'lastHour',
+ day: 'today',
+ week: '7d',
+ month: '30d',
+ };
+
+/**
+ * Maps alert frequency to the ChartEngine interval
+ */
+export const ALERT_FREQUENCY_TO_INTERVAL: Record = {
+ hour: 'hour',
+ day: 'day',
+ week: 'week',
+ month: 'month',
+};
+
+/**
+ * How many historical data points to use for anomaly detection
+ */
+export const ANOMALY_HISTORY_COUNT = 30;
+
+/**
+ * Z-scores for confidence levels used in anomaly detection
+ */
+export const CONFIDENCE_Z_SCORES: Record = {
+ '95': 1.96,
+ '98': 2.326,
+ '99': 2.576,
+};
+
+/**
+ * Frequency duration in milliseconds — used for rate limiting notifications
+ */
+export const ALERT_FREQUENCY_MS: Record = {
+ hour: 60 * 60 * 1000,
+ day: 24 * 60 * 60 * 1000,
+ week: 7 * 24 * 60 * 60 * 1000,
+ month: 30 * 24 * 60 * 60 * 1000,
+};
diff --git a/packages/db/code-migrations/10-backfill-profile-event-summary.ts b/packages/db/code-migrations/10-backfill-profile-event-summary.ts
new file mode 100644
index 000000000..d5869cb2e
--- /dev/null
+++ b/packages/db/code-migrations/10-backfill-profile-event-summary.ts
@@ -0,0 +1,170 @@
+import fs from 'node:fs';
+import path from 'node:path';
+import {
+ chMigrationClient,
+ runClickhouseMigrationCommands,
+} from '../src/clickhouse/migration';
+import { getIsCluster } from './helpers';
+
+/**
+ * Migration 10: Backfill profile_event_summary_mv with historical data
+ *
+ * Context: profile_event_summary_mv was created with populate: false to avoid OOM
+ * This means it only captures NEW events, missing all historical data.
+ * This migration backfills the MV with historical events for cohort queries.
+ *
+ * Hard backstop: January 1, 2025 (won't backfill data before this date)
+ *
+ * Steps:
+ * 1. Check the date range of events in the events table
+ * 2. Backfill historical data day-by-day in batches to avoid OOM
+ * 3. Generate SQL file for review before execution
+ */
/**
 * Backfill profile_event_summary_mv with historical events.
 *
 * Flow: read the min/max event dates from `events`, clamp to the 2025-01-01
 * backstop, generate one INSERT ... SELECT per day (newest day first), write
 * all SQL to a sibling .sql file for review, then execute each statement
 * sequentially unless --dry was passed.
 */
export async function up() {
  // Cluster detection selects the physical table name to insert into below.
  const isClustered = getIsCluster();
  const sqls: string[] = [];

  // Hard backstop date - don't backfill before this
  const BACKSTOP_DATE = new Date('2025-01-01');

  // Step 1: Check the actual date range in the events table
  console.log('🔍 Checking date range in events table...');

  const checkDataQuery = await chMigrationClient.query({
    query: `
    SELECT
      min(toDate(created_at)) as min_date,
      max(toDate(created_at)) as max_date,
      count() as total_events,
      uniq(profile_id) as total_profiles
    FROM events
    WHERE profile_id != device_id
      AND toDate(created_at) >= '2025-01-01'
  `,
    format: 'JSONEachRow',
  });

  // JSONEachRow serializes numeric aggregates as strings — converted below.
  const dataRange = await checkDataQuery.json<{
    min_date: string;
    max_date: string;
    total_events: string;
    total_profiles: string;
  }>();

  if (dataRange[0]?.min_date && dataRange[0]?.max_date) {
    let startDate = new Date(dataRange[0].min_date);
    const endDate = new Date(dataRange[0].max_date);

    // Enforce backstop date
    if (startDate < BACKSTOP_DATE) {
      console.log(`⚠️ Data exists before backstop date, limiting to ${BACKSTOP_DATE.toISOString().split('T')[0]}`);
      startDate = BACKSTOP_DATE;
    }

    const totalEvents = Number(dataRange[0].total_events);
    const totalProfiles = Number(dataRange[0].total_profiles);
    // +1 because both the start and end days are processed (inclusive range)
    const daysDiff = Math.ceil((endDate.getTime() - startDate.getTime()) / (1000 * 60 * 60 * 24)) + 1;

    console.log('========================================');
    console.log('📊 Backfill Plan for profile_event_summary_mv:');
    console.log(` Backstop Date: ${BACKSTOP_DATE.toISOString().split('T')[0]} (hard limit)`);
    console.log(` Start Date: ${startDate.toISOString().split('T')[0]}`);
    console.log(` End Date: ${endDate.toISOString().split('T')[0]} (inclusive)`);
    console.log(` Days: ${daysDiff} days`);
    console.log(` Events: ${totalEvents.toLocaleString()} total events`);
    console.log(` Profiles: ${totalProfiles.toLocaleString()} unique profiles`);
    console.log('========================================');
    console.log('');

    // Step 2: Generate day-by-day INSERT statements
    // Process from most recent to oldest to prioritize recent data for cohorts
    // NOTE(review): INSERT INTO the MV name writes through to its storage
    // table — confirm this matches how profile_event_summary_mv was created
    // (explicit TO table vs implicit .inner table).
    const targetTable = isClustered ? 'profile_event_summary_mv_replicated' : 'profile_event_summary_mv';
    const backfillSqls: string[] = [];

    let currentDate = new Date(endDate); // Start from endDate (most recent)

    while (currentDate >= startDate) {
      // dateStr comes from our own query results, not user input — safe to interpolate.
      const dateStr = currentDate.toISOString().split('T')[0];

      const sql = `INSERT INTO ${targetTable}
  SELECT
    project_id,
    profile_id,
    name,
    toStartOfDay(created_at) AS event_date,
    countState() AS event_count,
    minState(created_at) AS first_event_time,
    maxState(created_at) AS last_event_time,
    sumState(duration) AS total_duration
  FROM events
  WHERE toDate(created_at) = '${dateStr}'
    AND profile_id != device_id
  GROUP BY project_id, profile_id, name, event_date`;

      backfillSqls.push(sql);
      currentDate.setDate(currentDate.getDate() - 1);
    }

    sqls.push(...backfillSqls);

    console.log(`📝 Generated ${sqls.length} daily INSERT statements`);
  } else {
    console.log('⚠️ No data found in events table since 2025-01-01, skipping backfill');
  }

  // Step 3: Write SQL to file for review
  // NOTE(review): assumes the migration runs from the .ts source (ts-node);
  // a compiled .js __filename would leave the extension unchanged.
  const sqlFilePath = path.join(__filename.replace('.ts', '.sql'));
  fs.writeFileSync(
    sqlFilePath,
    sqls
      .map((sql) =>
        sql
          .trim()
          .replace(/;$/, '')
          .replace(/\n{2,}/g, '\n')
          .concat(';'),
      )
      .join('\n\n---\n\n'),
  );

  console.log(`📄 SQL written to: ${sqlFilePath}`);
  console.log('');

  // Step 4: Execute if not in dry-run mode
  if (!process.argv.includes('--dry')) {
    console.log('🚀 Starting backfill execution...');
    console.log('⏱️ This may take a while depending on data volume');
    console.log('');

    let completed = 0;
    const total = sqls.length;

    for (const sql of sqls) {
      await runClickhouseMigrationCommands([sql]);
      completed++;

      // Show progress every 10%
      if (completed % Math.ceil(total / 10) === 0 || completed === total) {
        const percentage = Math.round((completed / total) * 100);
        console.log(` Progress: ${completed}/${total} days (${percentage}%)`);
      }
    }

    console.log('');
    console.log('✅ Migration completed successfully!');
    console.log('');
    console.log('📋 Next steps:');
    console.log(' 1. Verify data in profile_event_summary_mv');
    console.log(' 2. Test cohort queries with historical data');
    console.log(' 3. Check cohort member counts');
  } else {
    console.log('🔍 DRY RUN MODE: SQL generated but not executed');
    console.log(' Review the SQL file and run without --dry to execute');
  }
}
+
+export async function down() {
+ console.log('⚠️ Down migration not supported for backfill operations');
+ console.log(' Data has been inserted into the MV and cannot be easily rolled back');
+ console.log(' If needed, drop and recreate the MV: profile_event_summary_mv');
+}
diff --git a/packages/db/code-migrations/11-backfill-profile-event-property-v2.ts b/packages/db/code-migrations/11-backfill-profile-event-property-v2.ts
new file mode 100644
index 000000000..2ccd87172
--- /dev/null
+++ b/packages/db/code-migrations/11-backfill-profile-event-property-v2.ts
@@ -0,0 +1,308 @@
+import {
+ chMigrationClient,
+ runClickhouseMigrationCommands,
+} from '../src/clickhouse/migration';
+import { getIsCluster } from './helpers';
+
+/**
+ * Backfill profile_event_property_summary_mv in hourly batches
+ *
+ * Safe to run on existing table - AggregatingMergeTree will merge states correctly!
+ *
+ * Usage:
+ * # For clustered (production):
+ * npm run migrate -- 11 --start-date=2025-01-01 --end-date=2026-01-10 --table=profile_event_property_summary_mv_replicated
+ *
+ * # For self-hosted:
+ * npm run migrate -- 11 --start-date=2025-01-01 --end-date=2026-01-10 --table=profile_event_property_summary_mv
+ *
+ * # Optional flags:
+ * npm run migrate -- 11 ... --dry (dry run only)
+ * npm run migrate -- 11 ... --batch-hours=2 (larger batches)
+ * npm run migrate -- 11 ... --skip-existing (skip dates that have data)
+ */
+
+interface BackfillOptions {
+ startDate: string;
+ endDate: string;
+ targetTable: string;
+ isDryRun: boolean;
+ batchSizeHours: number;
+}
+
+function parseArgs(): BackfillOptions {
+ const args = process.argv;
+
+ const startDateArg = args.find(arg => arg.startsWith('--start-date='));
+ const endDateArg = args.find(arg => arg.startsWith('--end-date='));
+ const tableArg = args.find(arg => arg.startsWith('--table='));
+ const batchSizeArg = args.find(arg => arg.startsWith('--batch-hours='));
+
+ if (!startDateArg || !endDateArg || !tableArg) {
+ console.error('❌ Missing required arguments');
+ console.log('');
+ console.log('Usage:');
+ console.log(' npm run migrate -- 11 \\');
+ console.log(' --start-date=2025-01-01 \\');
+ console.log(' --end-date=2026-01-10 \\');
+ console.log(' --table=profile_event_property_summary_mv_v2_replicated');
+ console.log('');
+ console.log('Optional flags:');
+ console.log(' --batch-hours=1 (default: 1, process N hours at a time)');
+ console.log(' --dry (generate SQL only, don\'t execute)');
+ console.log('');
+ process.exit(1);
+ }
+
+ return {
+ startDate: startDateArg.split('=')[1]!,
+ endDate: endDateArg.split('=')[1]!,
+ targetTable: tableArg.split('=')[1]!,
+ isDryRun: args.includes('--dry'),
+ batchSizeHours: batchSizeArg ? parseInt(batchSizeArg.split('=')[1]!) : 1,
+ };
+}
+
+export async function up() {
+ const options = parseArgs();
+
+ console.log('🚀 Profile Event Property Summary Backfill');
+ console.log('='.repeat(60));
+ console.log(`📅 Date Range: ${options.startDate} to ${options.endDate}`);
+ console.log(`📦 Target Table: ${options.targetTable}`);
+ console.log(`⏱️ Batch Size: ${options.batchSizeHours} hour(s)`);
+ console.log(`🔧 Mode: ${options.isDryRun ? 'DRY RUN' : 'EXECUTION'}`);
+ console.log('='.repeat(60));
+ console.log('');
+
+ // Skip analysis (too memory intensive for large datasets)
+ console.log('⏩ Skipping analysis - will process all data in date range');
+ console.log('');
+
+ // Generate hourly batches
+ const batches = generateHourlyBatches(
+ options.startDate,
+ options.endDate,
+ options.batchSizeHours,
+ options.targetTable
+ );
+
+ console.log(`📦 Generated ${batches.length} batches`);
+ console.log(`⏱️ Est. Time: ${formatEstimatedTime(batches.length)} (at ~30s/batch)`);
+ console.log('');
+
+ if (options.isDryRun) {
+ console.log('🔍 DRY RUN - Sample batch SQL:');
+ console.log('─'.repeat(80));
+ console.log(batches[0]?.sql.trim() || 'No batches generated');
+ console.log('─'.repeat(80));
+ console.log('');
+ console.log(`💡 Total batches: ${batches.length}`);
+ console.log('💡 Remove --dry to execute');
+ return;
+ }
+
+ // Step 3: Execute batches
+ await executeBatches(batches, options.targetTable);
+}
+
/**
 * Summarize the qualifying events in [startDate, endDate]: totals plus mean
 * property-map size per event.
 *
 * NOTE(review): currently dead code — up() skips analysis ("too memory
 * intensive for large datasets") and nothing else in this file calls it.
 * Kept for manual use; consider removing if it stays unused.
 */
async function analyzeDataRange(startDate: string, endDate: string) {
  console.log('🔍 Analyzing data...');

  // Dates are operator-supplied CLI strings interpolated into SQL; acceptable
  // for a hand-run migration, never for untrusted input.
  const query = `
    SELECT
      count() as total_events,
      uniq(profile_id) as total_profiles,
      sum(length(mapKeys(properties))) as total_properties
    FROM events
    PREWHERE toDate(created_at) >= '${startDate}'
      AND toDate(created_at) <= '${endDate}'
      AND profile_id != device_id
    WHERE arrayExists(k -> k != '', mapKeys(properties))
      AND arrayExists(v -> v != '', mapValues(properties))
  `;

  const result = await chMigrationClient.query({ query, format: 'JSONEachRow' });
  // JSONEachRow serializes numeric aggregates as strings.
  const data = await result.json<{
    total_events: string;
    total_profiles: string;
    total_properties: string;
  }>();

  const row = data[0];
  if (!row || row.total_events === '0') {
    return { hasData: false, totalEvents: 0, totalProfiles: 0, avgProperties: 0 };
  }

  const totalEvents = Number(row.total_events);
  const totalProperties = Number(row.total_properties);

  return {
    hasData: true,
    totalEvents,
    totalProfiles: Number(row.total_profiles),
    // mean number of property keys per event in the range
    avgProperties: totalProperties / totalEvents,
  };
}
+
+function generateHourlyBatches(
+ startDate: string,
+ endDate: string,
+ batchSizeHours: number,
+ targetTable: string
+) {
+ const batches: Array<{ startTime: string; endTime: string; sql: string }> = [];
+
+ const start = new Date(startDate + 'T00:00:00Z');
+ const end = new Date(endDate + 'T23:59:59Z');
+ let current = new Date(start);
+
+ while (current < end) {
+ const batchEnd = new Date(current);
+ batchEnd.setHours(current.getHours() + batchSizeHours);
+
+ // Ensure we don't go past the end date
+ if (batchEnd > end) {
+ batchEnd.setTime(end.getTime());
+ }
+
+ const startStr = current.toISOString().slice(0, 19).replace('T', ' ');
+ const endStr = batchEnd.toISOString().slice(0, 19).replace('T', ' ');
+
+ const sql = `
+INSERT INTO ${targetTable}
+SELECT
+ project_id,
+ profile_id,
+ name,
+ property_key,
+ property_value,
+ toStartOfDay(created_at) AS event_date,
+ countState() AS event_count,
+ minState(created_at) AS first_event_time,
+ maxState(created_at) AS last_event_time
+FROM events
+ARRAY JOIN
+ mapKeys(properties) AS property_key,
+ mapValues(properties) AS property_value
+PREWHERE
+ created_at >= toDateTime64('${startStr}', 3)
+ AND created_at < toDateTime64('${endStr}', 3)
+ AND profile_id != device_id
+ AND name IN (
+ 'userFlags',
+ 'appsflyerInstallWebhook',
+ '_firstSeen',
+ 'screenVisible',
+ 'trialScreenOpen',
+ 'trialStarted',
+ 'subscriptionPurchased',
+ 'trialConverted',
+ 'supportChatOpened',
+ 'showOpen',
+ 'reelOpen'
+ )
+WHERE
+ property_key != ''
+ AND property_value != ''
+ AND property_key IN (
+ 'subscription-v3-enabled',
+ 'af_adset_id',
+ 'screenName',
+ 'starter-shows',
+ 'reels-entry-point-experiment',
+ 'playlist'
+ )
+GROUP BY project_id, profile_id, name, property_key, property_value, event_date
+SETTINGS
+ max_memory_usage = 10000000000,
+ max_execution_time = 1800,
+ max_threads = 8`;
+
+ batches.push({ startTime: startStr, endTime: endStr, sql });
+
+ // Advance to next batch start
+ current = new Date(batchEnd);
+
+ // Break if we've reached or passed the end (prevent infinite loop)
+ if (current >= end) {
+ break;
+ }
+ }
+
+ return batches;
+}
+
+async function executeBatches(
+ batches: Array<{ startTime: string; endTime: string; sql: string }>,
+ targetTable: string
+) {
+ console.log('🚀 Starting execution...');
+ console.log('💡 AggregatingMergeTree will merge any duplicate data');
+ console.log('');
+
+ let completed = 0;
+ const total = batches.length;
+ const startTime = Date.now();
+
+ for (const batch of batches) {
+ try {
+ // Execute INSERT (no skip - reprocess all data, AggregatingMergeTree will merge)
+ const execStart = Date.now();
+ await runClickhouseMigrationCommands([batch.sql]);
+ const execTime = Date.now() - execStart;
+
+ completed++;
+
+ if (completed % 50 === 0 || execTime > 60000 || completed === total) {
+ logProgress(completed, total, startTime, `${batch.startTime} to ${batch.endTime}`, execTime);
+ }
+
+ } catch (error: any) {
+ console.error(`\n❌ Error: ${batch.startTime}`, error.message);
+ throw error;
+ }
+ }
+
+ const totalTime = Date.now() - startTime;
+
+ console.log('');
+ console.log('✅ Backfill complete!');
+ console.log(` Time: ${formatTime(Math.round(totalTime / 1000))}`);
+ console.log(` Processed: ${completed}/${total} batches`);
+ console.log(` Avg/batch: ${Math.round(totalTime / completed / 1000)}s`);
+ console.log('');
+}
+
+function logProgress(
+ completed: number,
+ total: number,
+ startTime: number,
+ label?: string,
+ execTime?: number
+) {
+ const pct = Math.round((completed / total) * 100);
+ const elapsed = (Date.now() - startTime) / 1000;
+ const remaining = ((total - completed) / completed) * elapsed;
+
+ let msg = ` [${pct}%] ${completed}/${total} | ETA: ${formatTime(Math.round(remaining))}`;
+ if (label && execTime) msg += `\n Last: ${label} (${Math.round(execTime / 1000)}s)`;
+
+ console.log(msg);
+}
+
+function formatTime(sec: number): string {
+ if (sec < 60) return `${sec}s`;
+ const min = Math.floor(sec / 60);
+ if (min < 60) return `${min}m`;
+ const hrs = Math.floor(min / 60);
+ return `${hrs}h ${min % 60}m`;
+}
+
+function formatEstimatedTime(batches: number): string {
+ return formatTime(batches * 30);
+}
+
+export async function down() {
+ console.log('⚠️ No down migration - backfill is data only');
+}
diff --git a/packages/db/code-migrations/12-backfill-profile-event-summary-v2.ts b/packages/db/code-migrations/12-backfill-profile-event-summary-v2.ts
new file mode 100644
index 000000000..fbf2b1a47
--- /dev/null
+++ b/packages/db/code-migrations/12-backfill-profile-event-summary-v2.ts
@@ -0,0 +1,217 @@
+import {
+ chMigrationClient,
+ runClickhouseMigrationCommands,
+} from '../src/clickhouse/migration';
+import { getIsCluster } from './helpers';
+
+/**
+ * Backfill profile_event_summary_mv_v2 in daily batches
+ *
+ * This MV is for simple event-based cohorts WITHOUT property filters
+ * Much simpler than profile_event_property_summary_mv - no ARRAY JOIN
+ *
+ * Usage:
+ * # For clustered (production):
+ * npm run migrate -- 12 --start-date=2025-01-01 --end-date=2026-01-10 --table=profile_event_summary_mv_v2_replicated
+ *
+ * # For self-hosted:
+ * npm run migrate -- 12 --start-date=2025-01-01 --end-date=2026-01-10 --table=profile_event_summary_mv_v2
+ *
+ * # Optional flags:
+ * npm run migrate -- 12 ... --dry (dry run only)
+ */
+
+interface BackfillOptions {
+ startDate: string;
+ endDate: string;
+ targetTable: string;
+ isDryRun: boolean;
+}
+
+function parseArgs(): BackfillOptions {
+ const args = process.argv;
+
+ const startDateArg = args.find(arg => arg.startsWith('--start-date='));
+ const endDateArg = args.find(arg => arg.startsWith('--end-date='));
+ const tableArg = args.find(arg => arg.startsWith('--table='));
+
+ if (!startDateArg || !endDateArg || !tableArg) {
+ console.error('❌ Missing required arguments');
+ console.log('');
+ console.log('Usage:');
+ console.log(' npm run migrate -- 12 \\');
+ console.log(' --start-date=2025-01-01 \\');
+ console.log(' --end-date=2026-01-10 \\');
+ console.log(' --table=profile_event_summary_mv_v2');
+ console.log('');
+ console.log('Optional flags:');
+ console.log(' --dry (generate SQL only, don\'t execute)');
+ console.log('');
+ process.exit(1);
+ }
+
+ return {
+ startDate: startDateArg.split('=')[1]!,
+ endDate: endDateArg.split('=')[1]!,
+ targetTable: tableArg.split('=')[1]!,
+ isDryRun: args.includes('--dry'),
+ };
+}
+
+export async function up() {
+ const options = parseArgs();
+
+ console.log('🚀 Profile Event Summary Backfill (Daily Batches)');
+ console.log('='.repeat(60));
+ console.log(`📅 Date Range: ${options.startDate} to ${options.endDate}`);
+ console.log(`📦 Target Table: ${options.targetTable}`);
+ console.log(`🔧 Mode: ${options.isDryRun ? 'DRY RUN' : 'EXECUTION'}`);
+ console.log('='.repeat(60));
+ console.log('');
+
+ // Skip analysis
+ console.log('⏩ Skipping analysis - will process all events in date range');
+ console.log('');
+
+ // Generate daily batches
+ const batches = generateDailyBatches(
+ options.startDate,
+ options.endDate,
+ options.targetTable
+ );
+
+ console.log(`📦 Generated ${batches.length} daily batches`);
+ console.log(`⏱️ Est. Time: ${formatEstimatedTime(batches.length)} (at ~2min/day)`);
+ console.log('');
+
+ if (options.isDryRun) {
+ console.log('🔍 DRY RUN - Sample batch SQL:');
+ console.log('─'.repeat(80));
+ console.log(batches[0]?.sql.trim() || 'No batches generated');
+ console.log('─'.repeat(80));
+ console.log('');
+ console.log(`💡 Total batches: ${batches.length}`);
+ console.log('💡 Remove --dry to execute');
+ return;
+ }
+
+ // Execute batches
+ await executeBatches(batches);
+}
+
+function generateDailyBatches(
+ startDate: string,
+ endDate: string,
+ targetTable: string
+) {
+ const batches: Array<{ date: string; sql: string }> = [];
+
+ const start = new Date(startDate);
+ const end = new Date(endDate);
+ let current = new Date(start);
+
+ while (current <= end) {
+ const dateStr = current.toISOString().split('T')[0];
+
+ const sql = `
+INSERT INTO ${targetTable}
+SELECT
+ project_id,
+ profile_id,
+ name,
+ toStartOfDay(created_at) AS event_date,
+ countState() AS event_count,
+ minState(created_at) AS first_event_time,
+ maxState(created_at) AS last_event_time,
+ sumState(duration) AS total_duration
+FROM events
+PREWHERE
+ toDate(created_at) = '${dateStr}'
+ AND profile_id != device_id
+GROUP BY project_id, profile_id, name, event_date
+SETTINGS
+ max_memory_usage = 30000000000,
+ max_execution_time = 7200,
+ max_threads = 16`;
+
+ batches.push({ date: dateStr, sql });
+
+ // Move to next day
+ current.setDate(current.getDate() + 1);
+ }
+
+ return batches;
+}
+
+async function executeBatches(
+ batches: Array<{ date: string; sql: string }>
+) {
+ console.log('🚀 Starting execution...');
+ console.log('💡 AggregatingMergeTree will merge any duplicate data');
+ console.log('');
+
+ let completed = 0;
+ const total = batches.length;
+ const startTime = Date.now();
+
+ for (const batch of batches) {
+ try {
+ // Execute INSERT
+ const execStart = Date.now();
+ await runClickhouseMigrationCommands([batch.sql]);
+ const execTime = Date.now() - execStart;
+
+ completed++;
+
+ // Show progress every 10 batches or if query took > 2 minutes
+ if (completed % 10 === 0 || execTime > 120000 || completed === total) {
+ logProgress(completed, total, startTime, batch.date, execTime);
+ }
+
+ } catch (error: any) {
+ console.error(`\n❌ Error processing ${batch.date}:`, error.message);
+ throw error;
+ }
+ }
+
+ const totalTime = Date.now() - startTime;
+
+ console.log('');
+ console.log('✅ Backfill complete!');
+ console.log(` Time: ${formatTime(Math.round(totalTime / 1000))}`);
+ console.log(` Processed: ${completed}/${total} days`);
+ console.log(` Avg/day: ${Math.round(totalTime / completed / 1000)}s`);
+ console.log('');
+}
+
+function logProgress(
+ completed: number,
+ total: number,
+ startTime: number,
+ date: string,
+ execTime: number
+) {
+ const pct = Math.round((completed / total) * 100);
+ const elapsed = (Date.now() - startTime) / 1000;
+ const remaining = ((total - completed) / completed) * elapsed;
+
+ const msg = ` [${pct}%] ${completed}/${total} | ETA: ${formatTime(Math.round(remaining))}\n Last: ${date} (${Math.round(execTime / 1000)}s)`;
+ console.log(msg);
+}
+
+function formatTime(sec: number): string {
+ if (sec < 60) return `${sec}s`;
+ const min = Math.floor(sec / 60);
+ if (min < 60) return `${min}m`;
+ const hrs = Math.floor(min / 60);
+ return `${hrs}h ${min % 60}m`;
+}
+
+function formatEstimatedTime(days: number): string {
+ // Assume ~2 minutes per day on average
+ return formatTime(days * 120);
+}
+
+export async function down() {
+ console.log('⚠️ No down migration - backfill is data only');
+}
diff --git a/packages/db/code-migrations/13-deduplicate-events.ts b/packages/db/code-migrations/13-deduplicate-events.ts
new file mode 100644
index 000000000..4222ea8f7
--- /dev/null
+++ b/packages/db/code-migrations/13-deduplicate-events.ts
@@ -0,0 +1,211 @@
+import {
+ chMigrationClient,
+ runClickhouseMigrationCommands,
+} from '../src/clickhouse/migration';
+import { getIsCluster } from './helpers';
+
+/**
+ * IST Data Loss Validation & Recovery
+ *
+ * Validates how much data was lost due to IST filter in moveImportsToProduction,
+ * and optionally recovers the lost events from events_imports_v2 into events_tmp.
+ *
+ * Lost events = those where toDate(created_at) != toDate(addHours(created_at, 5.5))
+ * i.e., events between 18:30-23:59 UTC (00:00-05:30 IST next day)
+ *
+ * Usage:
+ * Validate only (dry run):
+ * pnpm migrate:deploy:code -- 13 --cluster --dry --date=2026-01-05
+ *
+ * Validate and recover:
+ * pnpm migrate:deploy:code -- 13 --cluster --date=2026-01-05 --no-record
+ */
+
// Source of truth: the import staging table holding the full day's events.
const IMPORTS_TABLE = 'events_imports_v2';
// Recovery target: table moveImportsToProduction writes into.
const TMP_TABLE = 'events_tmp';

// Column tuple passed to uniq() to count logically-distinct events.
// NOTE(review): created_at is truncated to the second, so identical events
// within the same second collapse into one "unique" row — confirm intended.
const DEDUP_KEY =
  'project_id, name, device_id, profile_id, toStartOfSecond(created_at), path, properties';

// Explicit column list for INSERT ... SELECT so ordering never depends on
// the tables' declared column order.
const COLUMNS = `id, name, sdk_name, sdk_version, device_id, profile_id, project_id,
  session_id, path, origin, referrer, referrer_name, referrer_type,
  duration, created_at, country, city, region, longitude, latitude,
  os, os_version, browser, browser_version, device, brand, model, imported_at`;
+
+function parseArgs() {
+ const args = process.argv;
+ const dateArg = args.find((a: string) => a.startsWith('--date='));
+
+ if (!dateArg) {
+ console.error('Missing required --date=YYYY-MM-DD argument');
+ console.error(' Example: pnpm migrate:deploy:code -- 13 --cluster --date=2026-01-05');
+ process.exit(1);
+ }
+
+ const date = dateArg!.split('=')[1]!;
+
+ return {
+ date,
+ isCluster: getIsCluster(),
+ isDry: args.includes('--dry'),
+ };
+}
+
/**
 * Validate (and, unless --dry, recover) events lost to the IST date filter
 * in moveImportsToProduction.
 *
 * Lost events are those where the UTC date differs from the IST date, i.e.
 * 18:30-23:59 UTC. Steps: report current events_tmp counts, diff them against
 * events_imports_v2 (source of truth), count the lost window, then INSERT the
 * lost rows back into events_tmp and re-report counts.
 */
export async function up() {
  // NOTE(review): isCluster is destructured but not used below — confirm
  // whether clustered table names should be selected here.
  const { date, isCluster, isDry } = parseArgs();

  console.log('='.repeat(60));
  console.log(' IST DATA LOSS VALIDATION');
  console.log(` Date: ${date}`);
  console.log(` Mode: ${isDry ? 'DRY RUN (validate only)' : 'EXECUTE (validate + recover)'}`);
  console.log('='.repeat(60));

  // Step 0: Check events_tmp current state for this date
  console.log(`\n[Step 0] Current events_tmp counts for UTC ${date}:`);
  const tmpResult = await chMigrationClient.query({
    query: `
    SELECT
      name,
      count() as total,
      uniq(${DEDUP_KEY}) as unique_events
    FROM ${TMP_TABLE}
    WHERE toDate(created_at) = '${date}'
    GROUP BY name
    ORDER BY total DESC`,
    format: 'JSONEachRow',
  });
  const tmpData = await tmpResult.json<{ name: string; total: string; unique_events: string }>();
  // Keyed by event name; reused below for the before/after comparisons.
  const tmpMap = new Map(tmpData.map((r) => [r.name, { total: Number(r.total), unique: Number(r.unique_events) }]));

  console.log(`\n ${'Event'.padEnd(25)} ${'Total'.padStart(10)} ${'Unique'.padStart(10)}`);
  console.log(' ' + '-'.repeat(47));
  for (const row of tmpData) {
    console.log(
      ` ${row.name.padEnd(25)} ${Number(row.total).toLocaleString().padStart(10)} ${Number(row.unique_events).toLocaleString().padStart(10)}`,
    );
  }

  // Step 1: Check events_imports_v2 full counts for this date (source of truth)
  console.log(`\n[Step 1] events_imports_v2 vs events_tmp for UTC ${date}:`);
  const importsResult = await chMigrationClient.query({
    query: `
    SELECT
      name,
      count() as total,
      uniq(${DEDUP_KEY}) as unique_events
    FROM ${IMPORTS_TABLE}
    WHERE toDate(created_at) = '${date}'
    GROUP BY name
    ORDER BY total DESC`,
    format: 'JSONEachRow',
  });
  const importsData = await importsResult.json<{ name: string; total: string; unique_events: string }>();

  console.log(`\n ${'Event'.padEnd(25)} ${'imp_total'.padStart(10)} ${'imp_uniq'.padStart(10)} ${'tmp_total'.padStart(10)} ${'tmp_uniq'.padStart(10)} ${'Missing'.padStart(10)}`);
  console.log(' ' + '-'.repeat(77));
  for (const row of importsData) {
    const impTotal = Number(row.total);
    const impUniq = Number(row.unique_events);
    const tmp = tmpMap.get(row.name) ?? { total: 0, unique: 0 };
    // Missing = unique events present in imports but absent from events_tmp.
    const missingUniq = impUniq - tmp.unique;
    console.log(
      ` ${row.name.padEnd(25)} ${impTotal.toLocaleString().padStart(10)} ${impUniq.toLocaleString().padStart(10)} ${tmp.total.toLocaleString().padStart(10)} ${tmp.unique.toLocaleString().padStart(10)} ${missingUniq.toLocaleString().padStart(10)}`,
    );
  }

  // Step 2: Check lost events (18:30-23:59 UTC = where UTC date != IST date)
  console.log(`\n[Step 2] Lost events (18:30-23:59 UTC, not moved to events_tmp):`);
  const lostResult = await chMigrationClient.query({
    query: `
    SELECT
      name,
      count() as lost_total
    FROM ${IMPORTS_TABLE}
    WHERE toDate(created_at) = '${date}'
      AND toDate(created_at) != toDate(addHours(created_at, 5.5))
    GROUP BY name
    ORDER BY lost_total DESC`,
    format: 'JSONEachRow',
  });
  const lostData = await lostResult.json<{ name: string; lost_total: string }>();

  let totalLost = 0;
  console.log(`\n ${'Event'.padEnd(25)} ${'Lost'.padStart(10)}`);
  console.log(' ' + '-'.repeat(37));
  for (const row of lostData) {
    const lost = Number(row.lost_total);
    totalLost += lost;
    console.log(
      ` ${row.name.padEnd(25)} ${lost.toLocaleString().padStart(10)}`,
    );
  }
  console.log(' ' + '-'.repeat(37));
  console.log(` ${'TOTAL'.padEnd(25)} ${totalLost.toLocaleString().padStart(10)}`);

  if (isDry) {
    console.log('\n[DRY RUN] Recovery SQL that would execute:');
    console.log(`
  INSERT INTO ${TMP_TABLE} (${COLUMNS})
  SELECT ${COLUMNS}
  FROM ${IMPORTS_TABLE}
  WHERE toDate(created_at) = '${date}'
    AND toDate(created_at) != toDate(addHours(created_at, 5.5))
  SETTINGS max_memory_usage = 40000000000, max_execution_time = 18000;`);
    console.log('\n Run without --dry to execute recovery.');
    return;
  }

  // Step 3: Recover lost events
  if (totalLost === 0) {
    console.log('\n No lost events to recover!');
    return;
  }

  // NOTE(review): re-running this for the same date re-inserts the same rows
  // (no dedup in the INSERT); relies on downstream dedup by DEDUP_KEY.
  console.log(`\n[Step 3] Recovering ${totalLost.toLocaleString()} lost events into ${TMP_TABLE}...`);
  await runClickhouseMigrationCommands([
    `INSERT INTO ${TMP_TABLE} (${COLUMNS})
  SELECT ${COLUMNS}
  FROM ${IMPORTS_TABLE}
  WHERE toDate(created_at) = '${date}'
    AND toDate(created_at) != toDate(addHours(created_at, 5.5))
  SETTINGS
    max_memory_usage = 40000000000,
    max_execution_time = 18000`,
  ]);

  // Step 4: Verify recovery
  console.log(`\n[Step 4] Verifying recovery - events_tmp counts after:`);
  const afterResult = await chMigrationClient.query({
    query: `
    SELECT
      name,
      count() as total,
      uniq(${DEDUP_KEY}) as unique_events
    FROM ${TMP_TABLE}
    WHERE toDate(created_at) = '${date}'
    GROUP BY name
    ORDER BY total DESC`,
    format: 'JSONEachRow',
  });
  const afterData = await afterResult.json<{ name: string; total: string; unique_events: string }>();

  console.log(`\n ${'Event'.padEnd(25)} ${'Before'.padStart(10)} ${'After'.padStart(10)} ${'Recovered'.padStart(10)} ${'Unique'.padStart(10)}`);
  console.log(' ' + '-'.repeat(67));
  for (const row of afterData) {
    const after = Number(row.total);
    const unique = Number(row.unique_events);
    const before = (tmpMap.get(row.name) ?? { total: 0 }).total;
    const recovered = after - before;
    console.log(
      ` ${row.name.padEnd(25)} ${before.toLocaleString().padStart(10)} ${after.toLocaleString().padStart(10)} ${recovered.toLocaleString().padStart(10)} ${unique.toLocaleString().padStart(10)}`,
    );
  }

  console.log('\n' + '='.repeat(60));
  console.log(' RECOVERY COMPLETE');
  console.log('='.repeat(60));
}
+
+export async function down() {
+ console.log('No down migration');
+}
diff --git a/packages/db/code-migrations/14-move-events-to-tmp.ts b/packages/db/code-migrations/14-move-events-to-tmp.ts
new file mode 100644
index 000000000..135660cd9
--- /dev/null
+++ b/packages/db/code-migrations/14-move-events-to-tmp.ts
@@ -0,0 +1,126 @@
+import {
+ chMigrationClient,
+ runClickhouseMigrationCommands,
+} from '../src/clickhouse/migration';
+import { getIsCluster } from './helpers';
+
+/**
+ * Move events to events_tmp2 for a given date
+ *
+ * Copies data as-is from events to events_tmp2 (no dedup).
+ * Used to bring Jan 16-31 data into events_tmp2.
+ *
+ * Usage:
+ * Dry run:
+ * pnpm migrate:deploy:code -- 14 --cluster --dry --date=2026-01-16
+ *
+ * Execute:
+ * pnpm migrate:deploy:code -- 14 --cluster --date=2026-01-16 --no-record
+ */
+
+const TMP_TABLE = 'events_tmp2';
+const EVENTS_TABLE = 'events';
+
+/**
+ * Parse CLI flags for this migration.
+ *
+ * @returns `date` — required, strictly validated YYYY-MM-DD string (it is
+ *   interpolated into SQL literals by `up()`), `isCluster` — clustered-table
+ *   mode, `isDry` — print SQL without executing.
+ */
+function parseArgs() {
+  const args = process.argv;
+  const dateArg = args.find((a: string) => a.startsWith('--date='));
+
+  if (!dateArg) {
+    console.error('Missing required --date=YYYY-MM-DD argument');
+    console.error(' Example: pnpm migrate:deploy:code -- 14 --cluster --date=2026-01-16');
+    process.exit(1);
+  }
+
+  const date = dateArg.split('=')[1] ?? '';
+
+  // The date is spliced into SQL strings un-escaped; accept only a strict
+  // YYYY-MM-DD literal to rule out typos and injection via the CLI.
+  if (!/^\d{4}-\d{2}-\d{2}$/.test(date)) {
+    console.error(`Invalid --date value '${date}', expected YYYY-MM-DD`);
+    process.exit(1);
+  }
+
+  return {
+    date,
+    isCluster: getIsCluster(),
+    isDry: args.includes('--dry'),
+  };
+}
+
+/**
+ * Copy one day of events from `events` into `events_tmp2` as-is (no dedup),
+ * then report source vs destination counts. Side effect: INSERT into
+ * events_tmp2 (unless --dry).
+ */
+export async function up() {
+  const { date, isDry } = parseArgs();
+
+  console.log('='.repeat(60));
+  console.log(' MOVE EVENTS TO EVENTS_TMP2');
+  console.log(` Date: ${date}`);
+  console.log(` Mode: ${isDry ? 'DRY RUN' : 'EXECUTE'}`);
+  console.log('='.repeat(60));
+
+  // Step 0: per-event-name source counts for the requested day.
+  // NOTE(review): `date` is interpolated into the SQL literal below — assumes
+  // parseArgs only yields a well-formed YYYY-MM-DD; confirm.
+  console.log(`\n[Step 0] events count for ${date}:`);
+  const srcResult = await chMigrationClient.query({
+    query: `
+      SELECT
+        name,
+        count() as total
+      FROM ${EVENTS_TABLE}
+      WHERE toDate(created_at) = '${date}'
+      GROUP BY name
+      ORDER BY total DESC`,
+    format: 'JSONEachRow',
+  });
+  // JSONEachRow serializes UInt64 counts as strings; convert via Number().
+  const srcData = await srcResult.json<{ name: string; total: string }>();
+
+  let srcTotal = 0;
+  console.log(`\n ${'Event'.padEnd(25)} ${'Total'.padStart(10)}`);
+  console.log(' ' + '-'.repeat(37));
+  for (const row of srcData) {
+    const total = Number(row.total);
+    srcTotal += total;
+    console.log(` ${row.name.padEnd(25)} ${total.toLocaleString().padStart(10)}`);
+  }
+  console.log(' ' + '-'.repeat(37));
+  console.log(` ${'TOTAL'.padEnd(25)} ${srcTotal.toLocaleString().padStart(10)}`);
+
+  // Nothing to copy — exit early without touching the destination.
+  if (srcTotal === 0) {
+    console.log('\n No events found for this date!');
+    return;
+  }
+
+  // Dry run: show the exact INSERT and stop before any mutation.
+  if (isDry) {
+    console.log('\n[DRY RUN] SQL that would execute:');
+    console.log(`
+    INSERT INTO ${TMP_TABLE}
+    SELECT * FROM ${EVENTS_TABLE}
+    WHERE toDate(created_at) = '${date}'
+    SETTINGS max_memory_usage = 40000000000, max_execution_time = 18000;`);
+    return;
+  }
+
+  // Step 1: copy. SELECT * relies on events and events_tmp2 sharing the same
+  // column order. Duplicates are intentionally preserved (dedup happens in
+  // migration 16). Generous memory/time limits for the bulk insert.
+  console.log(`\n[Step 1] Copying ${srcTotal.toLocaleString()} events into ${TMP_TABLE}...`);
+  await runClickhouseMigrationCommands([
+    `INSERT INTO ${TMP_TABLE}
+    SELECT * FROM ${EVENTS_TABLE}
+    WHERE toDate(created_at) = '${date}'
+    SETTINGS
+      max_memory_usage = 40000000000,
+      max_execution_time = 18000`,
+  ]);
+
+  // Step 2: verify by comparing total row counts only. NOTE(review): if
+  // events_tmp2 already held rows for this date, dstTotal will exceed
+  // srcTotal — the check does not isolate rows from this run; confirm the
+  // destination is empty for the date before running.
+  console.log(`\n[Step 2] Verifying events_tmp2 count for ${date}:`);
+  const dstResult = await chMigrationClient.query({
+    query: `
+      SELECT count() as total
+      FROM ${TMP_TABLE}
+      WHERE toDate(created_at) = '${date}'`,
+    format: 'JSONEachRow',
+  });
+  const dstData = await dstResult.json<{ total: string }>();
+  const dstTotal = Number(dstData[0]?.total ?? 0);
+
+  console.log(`\n events: ${srcTotal.toLocaleString()}`);
+  console.log(` events_tmp2: ${dstTotal.toLocaleString()}`);
+
+  console.log('\n' + '='.repeat(60));
+  console.log(' COPY COMPLETE');
+  console.log('='.repeat(60));
+}
+
+// Irreversible: rows copied into events_tmp2 are left in place on rollback.
+export async function down() {
+  console.log('No down migration');
+}
diff --git a/packages/db/code-migrations/15-validate-optimize-dedup.ts b/packages/db/code-migrations/15-validate-optimize-dedup.ts
new file mode 100644
index 000000000..6b019e896
--- /dev/null
+++ b/packages/db/code-migrations/15-validate-optimize-dedup.ts
@@ -0,0 +1,203 @@
+import {
+  chMigrationClient,
+  runClickhouseMigrationCommands,
+} from '../src/clickhouse/migration';
+
+/**
+ * Validate OPTIMIZE TABLE DEDUPLICATE impact on events_tmp
+ *
+ * Shows duplicate counts per day and per date range (Jan 1-15 vs Jan 16-31)
+ * to prove that OPTIMIZE on partition 202601 won't affect Jan 16-31 data.
+ *
+ * Usage:
+ * Validate (default):
+ * pnpm migrate:deploy:code -- 15 --cluster --no-record
+ *
+ * Execute OPTIMIZE after validation:
+ * pnpm migrate:deploy:code -- 15 --cluster --no-record --execute
+ */
+
+const TMP_TABLE = 'events_tmp';
+
+const DEDUP_KEY =
+ 'project_id, name, device_id, profile_id, created_at, path';
+
+/** Read CLI flags: `--execute` switches from validate-only to validate + optimize. */
+function parseArgs() {
+  const shouldExecute = process.argv.includes('--execute');
+  return { shouldExecute };
+}
+
+/**
+ * Validate (and optionally execute) OPTIMIZE ... DEDUPLICATE on events_tmp.
+ *
+ * Prints exact per-day and per-range duplicate stats for January 2026 so the
+ * operator can confirm that deduplicating partition 202601 only collapses
+ * rows in the Jan 1-15 range. With --execute it then runs the OPTIMIZE and
+ * re-prints the range summary for verification.
+ */
+export async function up() {
+  const { shouldExecute } = parseArgs();
+
+  console.log('='.repeat(60));
+  console.log(' OPTIMIZE DEDUPLICATE VALIDATION');
+  console.log(` Table: ${TMP_TABLE}`);
+  console.log(` Mode: ${shouldExecute ? 'VALIDATE + EXECUTE' : 'VALIDATE ONLY'}`);
+  console.log('='.repeat(60));
+
+  // Step 1: per-day totals vs uniqExact over DEDUP_KEY. uniqExact (not the
+  // approximate uniq) is used so the duplicate counts are exact, at the cost
+  // of more memory.
+  console.log(`\n[Step 1] Per-day duplicate analysis for January 2026:`);
+  const perDayResult = await chMigrationClient.query({
+    query: `
+      SELECT
+        toDate(created_at) as date,
+        count() as total,
+        uniqExact(${DEDUP_KEY}) as unique_events,
+        count() - uniqExact(${DEDUP_KEY}) as duplicates
+      FROM ${TMP_TABLE}
+      WHERE toYYYYMM(created_at) = 202601
+      GROUP BY date
+      ORDER BY date ASC`,
+    format: 'JSONEachRow',
+  });
+  // JSONEachRow serializes UInt64 aggregates as strings; convert via Number().
+  const perDayData = await perDayResult.json<{
+    date: string;
+    total: string;
+    unique_events: string;
+    duplicates: string;
+  }>();
+
+  console.log(
+    `\n ${'Date'.padEnd(14)} ${'Total'.padStart(12)} ${'Unique'.padStart(12)} ${'Duplicates'.padStart(12)} ${'Dup %'.padStart(8)}`,
+  );
+  console.log(' ' + '-'.repeat(60));
+
+  let grandTotal = 0;
+  let grandUnique = 0;
+  let grandDuplicates = 0;
+  for (const row of perDayData) {
+    const total = Number(row.total);
+    const unique = Number(row.unique_events);
+    const dups = Number(row.duplicates);
+    const dupPct = total > 0 ? ((dups / total) * 100).toFixed(1) : '0.0';
+    grandTotal += total;
+    grandUnique += unique;
+    grandDuplicates += dups;
+    console.log(
+      ` ${row.date.padEnd(14)} ${total.toLocaleString().padStart(12)} ${unique.toLocaleString().padStart(12)} ${dups.toLocaleString().padStart(12)} ${(dupPct + '%').padStart(8)}`,
+    );
+  }
+  console.log(' ' + '-'.repeat(60));
+  const grandDupPct =
+    grandTotal > 0
+      ? ((grandDuplicates / grandTotal) * 100).toFixed(1)
+      : '0.0';
+  console.log(
+    ` ${'TOTAL'.padEnd(14)} ${grandTotal.toLocaleString().padStart(12)} ${grandUnique.toLocaleString().padStart(12)} ${grandDuplicates.toLocaleString().padStart(12)} ${(grandDupPct + '%').padStart(8)}`,
+  );
+
+  // Step 2: the same stats split into the two halves of the month. Jan 16-31
+  // is the data that was copied without dedup and must remain untouched.
+  console.log(`\n[Step 2] Date range summary (Jan 1-15 vs Jan 16-31):`);
+  const rangeResult = await chMigrationClient.query({
+    query: `
+      SELECT
+        if(toDate(created_at) <= '2026-01-15', 'Jan 01-15', 'Jan 16-31') as date_range,
+        count() as total,
+        uniqExact(${DEDUP_KEY}) as unique_events,
+        count() - uniqExact(${DEDUP_KEY}) as duplicates
+      FROM ${TMP_TABLE}
+      WHERE toYYYYMM(created_at) = 202601
+      GROUP BY date_range
+      ORDER BY date_range ASC`,
+    format: 'JSONEachRow',
+  });
+  const rangeData = await rangeResult.json<{
+    date_range: string;
+    total: string;
+    unique_events: string;
+    duplicates: string;
+  }>();
+
+  console.log(
+    `\n ${'Range'.padEnd(14)} ${'Total'.padStart(12)} ${'Unique'.padStart(12)} ${'Duplicates'.padStart(12)} ${'Dup %'.padStart(8)}`,
+  );
+  console.log(' ' + '-'.repeat(60));
+  for (const row of rangeData) {
+    const total = Number(row.total);
+    const unique = Number(row.unique_events);
+    const dups = Number(row.duplicates);
+    const dupPct = total > 0 ? ((dups / total) * 100).toFixed(1) : '0.0';
+    console.log(
+      ` ${row.date_range.padEnd(14)} ${total.toLocaleString().padStart(12)} ${unique.toLocaleString().padStart(12)} ${dups.toLocaleString().padStart(12)} ${(dupPct + '%').padStart(8)}`,
+    );
+  }
+
+  // Safety check: OPTIMIZE targets the whole 202601 partition, so Jan 16-31
+  // must show 0 duplicates for the operation to be a no-op on that range.
+  const jan16_31 = rangeData.find((r) => r.date_range === 'Jan 16-31');
+  const jan16_31_dups = jan16_31 ? Number(jan16_31.duplicates) : 0;
+
+  if (jan16_31_dups > 0) {
+    console.log(`\n ⚠ WARNING: Jan 16-31 has ${jan16_31_dups.toLocaleString()} duplicates!`);
+    console.log(' OPTIMIZE DEDUPLICATE on partition 202601 WILL affect Jan 16-31 data.');
+    console.log(' Review the per-day breakdown above before proceeding.');
+  } else {
+    console.log(`\n ✓ Jan 16-31 has 0 duplicates — OPTIMIZE will NOT affect it.`);
+  }
+
+  // Validate-only mode stops here; the warning above is informational.
+  if (!shouldExecute) {
+    console.log('\n[VALIDATE ONLY] To execute OPTIMIZE, re-run with --execute flag:');
+    console.log(
+      ' pnpm migrate:deploy:code -- 15 --cluster --no-record --execute',
+    );
+    return;
+  }
+
+  // Step 3: execute OPTIMIZE DEDUPLICATE. This statement returns no result
+  // set, so it is routed through the migration command runner (as every other
+  // mutating statement in these migrations) rather than
+  // chMigrationClient.query(), which expects readable output.
+  // PARTITION ID '202601' pins the partition unambiguously — the partition
+  // key is toYYYYMM(created_at), whose value is numeric, so a quoted value
+  // literal would have the wrong type.
+  console.log(`\n[Step 3] Running OPTIMIZE TABLE DEDUPLICATE on partition 202601...`);
+  console.log(' This may take several minutes...');
+
+  await runClickhouseMigrationCommands([
+    `OPTIMIZE TABLE ${TMP_TABLE}
+    PARTITION ID '202601'
+    FINAL
+    DEDUPLICATE BY ${DEDUP_KEY}`,
+  ]);
+
+  console.log(' OPTIMIZE complete!');
+
+  // Step 4: re-run the range summary to confirm duplicates collapsed in
+  // Jan 1-15 and Jan 16-31 counts are unchanged.
+  console.log(`\n[Step 4] Post-OPTIMIZE verification:`);
+  const afterResult = await chMigrationClient.query({
+    query: `
+      SELECT
+        if(toDate(created_at) <= '2026-01-15', 'Jan 01-15', 'Jan 16-31') as date_range,
+        count() as total,
+        uniqExact(${DEDUP_KEY}) as unique_events,
+        count() - uniqExact(${DEDUP_KEY}) as duplicates
+      FROM ${TMP_TABLE}
+      WHERE toYYYYMM(created_at) = 202601
+      GROUP BY date_range
+      ORDER BY date_range ASC`,
+    format: 'JSONEachRow',
+  });
+  const afterData = await afterResult.json<{
+    date_range: string;
+    total: string;
+    unique_events: string;
+    duplicates: string;
+  }>();
+
+  console.log(
+    `\n ${'Range'.padEnd(14)} ${'Total'.padStart(12)} ${'Unique'.padStart(12)} ${'Duplicates'.padStart(12)}`,
+  );
+  console.log(' ' + '-'.repeat(52));
+  for (const row of afterData) {
+    const total = Number(row.total);
+    const unique = Number(row.unique_events);
+    const dups = Number(row.duplicates);
+    console.log(
+      ` ${row.date_range.padEnd(14)} ${total.toLocaleString().padStart(12)} ${unique.toLocaleString().padStart(12)} ${dups.toLocaleString().padStart(12)}`,
+    );
+  }
+
+  console.log('\n' + '='.repeat(60));
+  console.log(' OPTIMIZE DEDUPLICATE COMPLETE');
+  console.log('='.repeat(60));
+}
+
+// Irreversible: rows merged/deduplicated by OPTIMIZE cannot be restored.
+export async function down() {
+  console.log('No down migration');
+}
diff --git a/packages/db/code-migrations/16-dedup-tmp-to-tmp2.ts b/packages/db/code-migrations/16-dedup-tmp-to-tmp2.ts
new file mode 100644
index 000000000..3fa7efe1e
--- /dev/null
+++ b/packages/db/code-migrations/16-dedup-tmp-to-tmp2.ts
@@ -0,0 +1,132 @@
+import {
+ chMigrationClient,
+ runClickhouseMigrationCommands,
+} from '../src/clickhouse/migration';
+import { getIsCluster } from './helpers';
+
+/**
+ * Move events from events_tmp to events_tmp2 with dedup
+ *
+ * Reads from events_tmp, deduplicates using LIMIT 1 BY, writes to events_tmp2.
+ * After all days are done, REPLACE PARTITION from events_tmp2 into events.
+ *
+ * Usage:
+ * Dry run:
+ * pnpm migrate:deploy:code -- 16 --cluster --dry --date=2026-01-01 --no-record
+ *
+ * Execute:
+ * pnpm migrate:deploy:code -- 16 --cluster --date=2026-01-01 --no-record
+ */
+
+const DEDUP_KEY =
+ 'project_id, name, device_id, profile_id, session_id, created_at, path, properties';
+
+const SRC_TABLE = 'events_tmp';
+const DST_TABLE = 'events_tmp2';
+
+/**
+ * Parse CLI flags for this migration.
+ *
+ * @returns `date` — required, strictly validated YYYY-MM-DD string (it is
+ *   interpolated into SQL literals by `up()`), `isCluster` — clustered-table
+ *   mode, `isDry` — print SQL without executing.
+ */
+function parseArgs() {
+  const args = process.argv;
+  const dateArg = args.find((a: string) => a.startsWith('--date='));
+
+  if (!dateArg) {
+    console.error('Missing required --date=YYYY-MM-DD argument');
+    console.error(' Example: pnpm migrate:deploy:code -- 16 --cluster --date=2026-01-01 --no-record');
+    process.exit(1);
+  }
+
+  const date = dateArg.split('=')[1] ?? '';
+
+  // The date is spliced into SQL strings un-escaped; accept only a strict
+  // YYYY-MM-DD literal to rule out typos and injection via the CLI.
+  if (!/^\d{4}-\d{2}-\d{2}$/.test(date)) {
+    console.error(`Invalid --date value '${date}', expected YYYY-MM-DD`);
+    process.exit(1);
+  }
+
+  return {
+    date,
+    isCluster: getIsCluster(),
+    isDry: args.includes('--dry'),
+  };
+}
+
+/**
+ * Copy one day of events from events_tmp into events_tmp2 while dropping
+ * duplicates via LIMIT 1 BY over DEDUP_KEY (keeps one arbitrary row per
+ * key). Side effect: INSERT into events_tmp2 (unless --dry).
+ */
+export async function up() {
+  const { date, isDry } = parseArgs();
+
+  console.log('='.repeat(60));
+  console.log(' DEDUP: events_tmp → events_tmp2');
+  console.log(` Date: ${date}`);
+  console.log(` Mode: ${isDry ? 'DRY RUN' : 'EXECUTE'}`);
+  console.log('='.repeat(60));
+
+  // Step 0: source counts. NOTE(review): uniq() is approximate (HLL-based),
+  // unlike uniqExact used in migration 15, so the duplicate estimate here is
+  // informational only — confirm if exact figures are required.
+  console.log(`\n[Step 0] events_tmp counts for ${date}:`);
+  const beforeResult = await chMigrationClient.query({
+    query: `
+      SELECT
+        count() as total,
+        uniq(${DEDUP_KEY}) as unique_events,
+        count() - uniq(${DEDUP_KEY}) as duplicates
+      FROM ${SRC_TABLE}
+      WHERE toDate(created_at) = '${date}'`,
+    format: 'JSONEachRow',
+  });
+  // JSONEachRow serializes UInt64 aggregates as strings; convert via Number().
+  const beforeData = await beforeResult.json<{
+    total: string;
+    unique_events: string;
+    duplicates: string;
+  }>();
+
+  const srcTotal = Number(beforeData[0]?.total ?? 0);
+  const srcUnique = Number(beforeData[0]?.unique_events ?? 0);
+  const srcDups = Number(beforeData[0]?.duplicates ?? 0);
+
+  console.log(`\n Total: ${srcTotal.toLocaleString()}`);
+  console.log(` Unique: ${srcUnique.toLocaleString()}`);
+  console.log(` Duplicates: ${srcDups.toLocaleString()}`);
+
+  // Nothing to dedup — exit early without touching the destination.
+  if (srcTotal === 0) {
+    console.log('\n No events found for this date!');
+    return;
+  }
+
+  // Dry run: show the exact INSERT and stop before any mutation.
+  if (isDry) {
+    console.log('\n[DRY RUN] SQL that would execute:');
+    console.log(`
+    INSERT INTO ${DST_TABLE}
+    SELECT * FROM ${SRC_TABLE}
+    WHERE toDate(created_at) = '${date}'
+    LIMIT 1 BY ${DEDUP_KEY}
+    SETTINGS max_memory_usage = 40000000000, max_execution_time = 18000;`);
+    return;
+  }
+
+  // Step 1: insert deduplicated rows. LIMIT 1 BY keeps exactly one row per
+  // DEDUP_KEY tuple; which duplicate survives is not specified.
+  console.log(`\n[Step 1] Inserting deduped ${date} into ${DST_TABLE}...`);
+  await runClickhouseMigrationCommands([
+    `INSERT INTO ${DST_TABLE}
+    SELECT * FROM ${SRC_TABLE}
+    WHERE toDate(created_at) = '${date}'
+    LIMIT 1 BY ${DEDUP_KEY}
+    SETTINGS
+      max_memory_usage = 40000000000,
+      max_execution_time = 18000`,
+  ]);
+
+  // Step 2: verify. "Removed" is src total minus dst total — assumes the
+  // destination had no pre-existing rows for this date (NOTE(review): the
+  // check does not isolate rows from this run; confirm before trusting it).
+  console.log(`\n[Step 2] Verifying ${DST_TABLE} count for ${date}:`);
+  const afterResult = await chMigrationClient.query({
+    query: `
+      SELECT count() as total
+      FROM ${DST_TABLE}
+      WHERE toDate(created_at) = '${date}'`,
+    format: 'JSONEachRow',
+  });
+  const afterData = await afterResult.json<{ total: string }>();
+  const dstTotal = Number(afterData[0]?.total ?? 0);
+
+  console.log(`\n events_tmp: ${srcTotal.toLocaleString()}`);
+  console.log(` events_tmp2: ${dstTotal.toLocaleString()}`);
+  console.log(` Removed: ${(srcTotal - dstTotal).toLocaleString()}`);
+
+  console.log('\n' + '='.repeat(60));
+  console.log(' DEDUP COMPLETE');
+  console.log('='.repeat(60));
+}
+
+// Irreversible: deduplicated rows written to events_tmp2 are left in place.
+export async function down() {
+  console.log('No down migration');
+}
diff --git a/packages/db/code-migrations/4-cohorts.ts b/packages/db/code-migrations/4-cohorts.ts
new file mode 100644
index 000000000..9c09d650f
--- /dev/null
+++ b/packages/db/code-migrations/4-cohorts.ts
@@ -0,0 +1,129 @@
+import {
+ createMaterializedView,
+ createTable,
+ dropTable,
+ getExistingTables,
+ runClickhouseMigrationCommands,
+} from '../src/clickhouse/migration';
+import { getIsCluster } from './helpers';
+
+/**
+ * Create the ClickHouse cohort storage: cohort_members, cohort_metadata and
+ * the profile_event_summary_mv materialized view. Each object is created
+ * only if neither its local nor its distributed variant already exists, so
+ * the migration is re-runnable.
+ */
+export async function up() {
+  // Version suffix for the replicated table/MV definitions.
+  const replicatedVersion = '1';
+  const existingTables = await getExistingTables();
+  const isClustered = getIsCluster();
+
+  const sqls: string[] = [];
+
+  // 1. Create cohort_members table
+  // Stores which profile_ids belong to which cohorts
+  // Uses ReplacingMergeTree for efficient updates (version-based deduplication)
+  if (!existingTables.includes('cohort_members_distributed') && !existingTables.includes('cohort_members')) {
+    sqls.push(
+      ...createTable({
+        name: 'cohort_members',
+        columns: [
+          'project_id String CODEC(ZSTD(3))',
+          'cohort_id String CODEC(ZSTD(3))',
+          'profile_id String CODEC(ZSTD(3))',
+          'matched_at DateTime DEFAULT now()',
+          'matching_properties Map(String, String) CODEC(ZSTD(3))',
+          'version UInt64 DEFAULT 1',
+        ],
+        // Bloom-filter indexes to speed point lookups by profile or cohort.
+        indices: [
+          'INDEX idx_profile profile_id TYPE bloom_filter GRANULARITY 1',
+          'INDEX idx_cohort cohort_id TYPE bloom_filter GRANULARITY 1',
+        ],
+        engine: 'ReplacingMergeTree(version)',
+        orderBy: ['project_id', 'cohort_id', 'profile_id'],
+        partitionBy: 'toYYYYMM(matched_at)',
+        settings: {
+          index_granularity: 8192,
+        },
+        distributionHash: 'cityHash64(project_id, cohort_id)',
+        replicatedVersion,
+        isClustered,
+      }),
+    );
+  }
+
+  // 2. Create cohort_metadata table
+  // Caches cohort sizes and sample profile IDs for fast lookups
+  if (!existingTables.includes('cohort_metadata_distributed') && !existingTables.includes('cohort_metadata')) {
+    sqls.push(
+      ...createTable({
+        name: 'cohort_metadata',
+        columns: [
+          'project_id String',
+          'cohort_id String',
+          'member_count UInt64',
+          'last_computed_at DateTime',
+          'sample_profiles Array(String)',
+          'version UInt64 DEFAULT 1',
+        ],
+        engine: 'ReplacingMergeTree(version)',
+        orderBy: ['project_id', 'cohort_id'],
+        settings: {
+          index_granularity: 8192,
+        },
+        distributionHash: 'cityHash64(project_id, cohort_id)',
+        replicatedVersion,
+        isClustered,
+      }),
+    );
+  }
+
+  // 3. Create profile_event_summary_mv materialized view
+  // Aggregates events by profile for fast cohort queries
+  // NOTE: This is different from cohort_events_mv (which is used for retention analysis)
+  // This MV is specifically for cohort computation with frequency filters
+  // populate: false - will build incrementally from new events to avoid OOM
+  if (!existingTables.includes('profile_event_summary_mv_distributed') && !existingTables.includes('profile_event_summary_mv')) {
+    sqls.push(
+      ...createMaterializedView({
+        name: 'profile_event_summary_mv',
+        tableName: 'events',
+        engine: 'AggregatingMergeTree()',
+        orderBy: ['project_id', 'profile_id', 'name', 'event_date'],
+        partitionBy: 'toYYYYMM(event_date)',
+        // NOTE(review): the {events} placeholder is presumably substituted by
+        // createMaterializedView with the correct source table name — confirm.
+        // WHERE profile_id != device_id keeps only events tied to an
+        // identified profile.
+        query: `SELECT
+          project_id,
+          profile_id,
+          name,
+          toStartOfDay(created_at) AS event_date,
+          countState() AS event_count,
+          minState(created_at) AS first_event_time,
+          maxState(created_at) AS last_event_time,
+          sumState(duration) AS total_duration
+        FROM {events}
+        WHERE profile_id != device_id
+        GROUP BY project_id, profile_id, name, event_date`,
+        distributionHash: 'cityHash64(project_id, profile_id)',
+        replicatedVersion,
+        isClustered,
+        populate: false, // Don't populate historical data - build incrementally
+      }),
+    );
+  }
+
+  await runClickhouseMigrationCommands(sqls);
+}
+
+/**
+ * Rollback for development/testing only; production migrations are
+ * forward-only. Drops every variant of the objects created by up().
+ */
+export async function down() {
+  const isClustered = getIsCluster();
+
+  // For each base object, drop the distributed and replicated variants
+  // before the local table itself (same order as the original list).
+  const baseTables = [
+    'profile_event_summary_mv',
+    'cohort_metadata',
+    'cohort_members',
+  ];
+  const sqls = baseTables.flatMap((table) => [
+    dropTable(`${table}_distributed`, isClustered),
+    dropTable(`${table}_replicated`, isClustered),
+    dropTable(table, isClustered),
+  ]);
+
+  await runClickhouseMigrationCommands(sqls);
+}
diff --git a/packages/db/code-migrations/6-add-imports-table-v2.ts b/packages/db/code-migrations/6-add-imports-table-v2.ts
new file mode 100644
index 000000000..2689a9706
--- /dev/null
+++ b/packages/db/code-migrations/6-add-imports-table-v2.ts
@@ -0,0 +1,90 @@
+import fs from 'node:fs';
+import path from 'node:path';
+import { TABLE_NAMES } from '../src/clickhouse/client';
+import {
+ createTable,
+ modifyTTL,
+ runClickhouseMigrationCommands,
+} from '../src/clickhouse/migration';
+import { getIsCluster } from './helpers';
+
+/**
+ * Create the events_imports_v2 staging table (same columns as events plus
+ * import-tracking metadata), partitioned by import_id, with a 7-day TTL.
+ * Writes the generated SQL next to this file for review; executes it unless
+ * --dry is passed.
+ */
+export async function up() {
+  const isClustered = getIsCluster();
+
+  const sqls: string[] = [
+    ...createTable({
+      name: 'events_imports_v2',
+      columns: [
+        // Same columns as events table
+        '`id` UUID DEFAULT generateUUIDv4()',
+        '`name` LowCardinality(String)',
+        '`sdk_name` LowCardinality(String)',
+        '`sdk_version` LowCardinality(String)',
+        '`device_id` String CODEC(ZSTD(3))',
+        '`profile_id` String CODEC(ZSTD(3))',
+        '`project_id` String CODEC(ZSTD(3))',
+        '`session_id` String CODEC(LZ4)',
+        '`path` String CODEC(ZSTD(3))',
+        '`origin` String CODEC(ZSTD(3))',
+        '`referrer` String CODEC(ZSTD(3))',
+        '`referrer_name` String CODEC(ZSTD(3))',
+        '`referrer_type` LowCardinality(String)',
+        '`duration` UInt64 CODEC(Delta(4), LZ4)',
+        '`properties` Map(String, String) CODEC(ZSTD(3))',
+        '`created_at` DateTime64(3) CODEC(DoubleDelta, ZSTD(3))',
+        '`country` LowCardinality(FixedString(2))',
+        '`city` String',
+        '`region` LowCardinality(String)',
+        '`longitude` Nullable(Float32) CODEC(Gorilla, LZ4)',
+        '`latitude` Nullable(Float32) CODEC(Gorilla, LZ4)',
+        '`os` LowCardinality(String)',
+        '`os_version` LowCardinality(String)',
+        '`browser` LowCardinality(String)',
+        '`browser_version` LowCardinality(String)',
+        '`device` LowCardinality(String)',
+        '`brand` LowCardinality(String)',
+        '`model` LowCardinality(String)',
+        '`imported_at` Nullable(DateTime) CODEC(Delta(4), LZ4)',
+
+        // Additional metadata columns for import tracking
+        '`import_id` String CODEC(ZSTD(3))',
+        "`import_status` LowCardinality(String) DEFAULT 'pending'",
+        '`imported_at_meta` DateTime DEFAULT now()',
+      ],
+      orderBy: ['created_at', 'name'], // Changed: Removed import_id from ORDER BY since it's now in partition key
+      partitionBy: 'import_id', // Changed: Was toYYYYMM(imported_at_meta) - now partition by import_id for better memory management
+      settings: {
+        index_granularity: 8192,
+      },
+      distributionHash: 'cityHash64(import_id)',
+      replicatedVersion: '2', // Version 2
+      isClustered,
+    }),
+  ];
+
+  // Add TTL policy for auto-cleanup after 7 days
+  sqls.push(
+    modifyTTL({
+      tableName: 'events_imports_v2',
+      isClustered,
+      ttl: 'imported_at_meta + INTERVAL 7 DAY',
+    }),
+  );
+
+  // Dump the generated SQL next to this file for review. Handle both .ts
+  // (jiti/tsx) and .js (compiled) execution, matching migration 9 — the
+  // previous '.ts'-only replace produced a bogus filename when run as .js.
+  fs.writeFileSync(
+    path.join(__filename.replace('.ts', '.sql').replace('.js', '.sql')),
+    sqls
+      .map((sql) =>
+        sql
+          .trim()
+          .replace(/;$/, '')
+          .replace(/\n{2,}/g, '\n')
+          .concat(';'),
+      )
+      .join('\n\n---\n\n'),
+  );
+
+  // --dry writes the SQL file but skips execution.
+  if (!process.argv.includes('--dry')) {
+    await runClickhouseMigrationCommands(sqls);
+  }
+}
diff --git a/packages/db/code-migrations/9-events-daily-stats.ts b/packages/db/code-migrations/9-events-daily-stats.ts
new file mode 100644
index 000000000..c295dfc51
--- /dev/null
+++ b/packages/db/code-migrations/9-events-daily-stats.ts
@@ -0,0 +1,270 @@
+import fs from 'node:fs';
+import path from 'node:path';
+import {
+ chMigrationClient,
+ createMaterializedView,
+ moveDataBetweenTables,
+ runClickhouseMigrationCommands,
+} from '../src/clickhouse/migration';
+import { getIsCluster } from './helpers';
+
+/**
+ * Migration 9: Create events_daily_stats to include ALL events
+ *
+ * Prerequisites: Drop existing events_daily_stats before running this migration
+ *
+ * This migration creates events_daily_stats MV that includes ALL events for complete analytics.
+ * Previous version excluded session_start, session_end, and screen_view events.
+ *
+ * Steps:
+ * 1. Optionally delete existing data up to a certain date
+ * 2. Optionally create events_daily_stats without POPULATE (starts capturing new data)
+ * 3. Backfill historical data from events table using day-by-day batch inserts
+ *
+ * Usage:
+ * # Full migration (create MV + backfill all data)
+ * pnpm tsx packages/db/code-migrations/9-events-daily-stats.ts
+ *
+ * # Backfill specific date range, skip MV creation, delete old data
+ * pnpm tsx packages/db/code-migrations/9-events-daily-stats.ts --start 2024-01-01 --end 2026-01-27 --delete-till 2026-01-28 --skip-mv
+ *
+ * # Backfill without delete
+ * pnpm tsx packages/db/code-migrations/9-events-daily-stats.ts --start 2024-01-01 --end 2026-01-27 --skip-delete --skip-mv
+ *
+ * # Dry run to see what will be executed
+ * pnpm tsx packages/db/code-migrations/9-events-daily-stats.ts --start 2024-01-01 --end 2026-01-27 --dry
+ */
+export async function up() {
+  const isClustered = getIsCluster();
+  const sqls: string[] = [];
+
+  // Parse command line arguments. Flags take their value from the following
+  // argv entry (e.g. `--start 2024-01-01`).
+  const args = process.argv;
+  const startDateArg = args.find((arg, idx) => args[idx - 1] === '--start');
+  const endDateArg = args.find((arg, idx) => args[idx - 1] === '--end');
+  const deleteDateArg = args.find((arg, idx) => args[idx - 1] === '--delete-till');
+  const skipDelete = args.includes('--skip-delete');
+  const skipMv = args.includes('--skip-mv');
+
+  const targetTable = isClustered ? 'events_daily_stats_replicated' : 'events_daily_stats';
+
+  // Step 1: Delete existing data in batches (optional)
+  // Only delete dates in the backfill range, not everything up to delete-till date
+  // NOTE(review): despite its name, --delete-till only gates whether deletes
+  // happen; its value is never read — the deleted range is [start, end].
+  // Confirm this matches operator expectations.
+  if (!skipDelete && deleteDateArg) {
+    console.log(`🗑️ Preparing to delete data for backfill range (day-by-day)`);
+    console.log('');
+  }
+
+  // Step 2: Create MV without POPULATE (so it starts capturing new data immediately)
+  if (!skipMv) {
+    const mvStatements = createMaterializedView({
+      name: 'events_daily_stats',
+      tableName: 'events',
+      orderBy: ['project_id', 'name', 'date'],
+      partitionBy: 'toYYYYMM(date)',
+      query: `SELECT
+        project_id,
+        name,
+        toDate(created_at) as date,
+        uniqState(profile_id) as unique_profiles_state,
+        uniqState(session_id) as unique_sessions_state,
+        countState() as event_count
+      FROM {events}
+      GROUP BY project_id, name, date`,
+      distributionHash: 'cityHash64(project_id, name, date)',
+      replicatedVersion: '1',
+      isClustered,
+      populate: false, // Don't use POPULATE to avoid timeout
+    });
+
+    sqls.push(...mvStatements);
+  }
+
+  // Step 3: Backfill historical data from events table
+  // First, check the actual date range in the events table
+  const checkDataQuery = await chMigrationClient.query({
+    query: `
+      SELECT
+        min(toDate(created_at)) as min_date,
+        max(toDate(created_at)) as max_date,
+        count() as total_events
+      FROM events
+    `,
+    format: 'JSONEachRow',
+  });
+
+  const dataRange = await checkDataQuery.json<{
+    min_date: string;
+    max_date: string;
+    total_events: string;
+  }>();
+
+  if (dataRange[0]?.min_date && dataRange[0]?.max_date) {
+    // Determine start and end dates based on arguments or data range
+    let startDate: Date;
+    let endDate: Date;
+
+    if (startDateArg) {
+      startDate = new Date(startDateArg + 'T00:00:00Z'); // Use UTC
+    } else {
+      startDate = new Date(dataRange[0].min_date + 'T00:00:00Z');
+    }
+
+    if (endDateArg) {
+      endDate = new Date(endDateArg + 'T00:00:00Z'); // Use UTC
+    } else {
+      // If no end date specified, use max date from events table
+      // But typically you'd want yesterday to avoid conflicts with MV
+      endDate = new Date(dataRange[0].max_date + 'T00:00:00Z');
+    }
+
+    // Validate date range
+    if (startDate > endDate) {
+      console.error('❌ Error: Start date must be before or equal to end date');
+      console.error(` Start: ${startDate.toISOString().split('T')[0]}`);
+      console.error(` End: ${endDate.toISOString().split('T')[0]}`);
+      process.exit(1);
+    }
+
+    const totalEvents = Number(dataRange[0].total_events);
+    const backfillDays = Math.ceil((endDate.getTime() - startDate.getTime()) / (1000 * 60 * 60 * 24)) + 1;
+
+    // Generate DELETE statements for the backfill range (if delete not skipped)
+    // NOTE(review): the day loop advances via setDate() (local time) while
+    // dates were constructed in UTC and formatted via toISOString(); in a
+    // timezone whose offset changes across the range this could repeat or
+    // skip a day — confirm the runner's TZ (UTC is safe).
+    const deleteStartIndex = sqls.length; // Track where deletes start
+    if (!skipDelete && deleteDateArg) {
+      let currentDeleteDate = new Date(startDate);
+      while (currentDeleteDate <= endDate) {
+        const dateStr = currentDeleteDate.toISOString().split('T')[0];
+        const deleteSql = `ALTER TABLE ${targetTable} DELETE WHERE date = toDate('${dateStr}')`;
+        sqls.push(deleteSql);
+        currentDeleteDate.setDate(currentDeleteDate.getDate() + 1);
+      }
+    }
+    const deleteOps = sqls.length - deleteStartIndex;
+
+    // Show the plan AFTER generating deletes
+    console.log('========================================');
+    console.log('📊 Backfill Plan:');
+    console.log(` Target Table: ${targetTable}`);
+    console.log(` Delete Ops: ${deleteOps > 0 ? `${deleteOps} days` : 'None'}`);
+    console.log(` Create MV: ${skipMv ? 'Skipped' : 'Yes'}`);
+    console.log(` Backfill Start: ${startDate.toISOString().split('T')[0]}`);
+    console.log(` Backfill End: ${endDate.toISOString().split('T')[0]} (inclusive)`);
+    console.log(` Backfill Days: ${backfillDays} days`);
+    console.log(` Chunking: Hourly (24 chunks per day)`);
+    console.log(` Total Chunks: ${backfillDays * 24} hourly inserts`);
+    console.log(` Total Events: ${totalEvents.toLocaleString()} in events table`);
+    console.log('========================================');
+    console.log('');
+
+    // Generate INSERT statements chunked by hour to reduce memory usage
+    // Process each day in 24 hourly chunks to avoid ClickHouse memory limits
+    const backfillSqls: string[] = [];
+
+    let currentDate = new Date(startDate); // Start from startDate (go forward)
+
+    while (currentDate <= endDate) {
+      const dateStr = currentDate.toISOString().split('T')[0];
+
+      // Process each day in 24 hourly chunks
+      for (let hour = 0; hour < 24; hour++) {
+        // [HH:00:00, HH:59:59] — inclusive bounds; sub-second events after
+        // :59:59 would be missed only if created_at has sub-second precision
+        // here (toDateTime is second-granular).
+        const hourStart = `${dateStr} ${String(hour).padStart(2, '0')}:00:00`;
+        const hourEnd = `${dateStr} ${String(hour).padStart(2, '0')}:59:59`;
+
+        const sql = `INSERT INTO ${targetTable}
+        SELECT
+          project_id,
+          name,
+          toDate(created_at) as date,
+          uniqState(profile_id) as unique_profiles_state,
+          uniqState(session_id) as unique_sessions_state,
+          countState() as event_count
+        FROM events
+        WHERE created_at >= toDateTime('${hourStart}')
+          AND created_at <= toDateTime('${hourEnd}')
+        GROUP BY project_id, name, date`;
+
+        backfillSqls.push(sql);
+      }
+
+      currentDate.setDate(currentDate.getDate() + 1);
+    }
+
+    sqls.push(...backfillSqls);
+
+    // Add OPTIMIZE TABLE to merge hourly chunks into daily aggregates
+    const optimizeSql = `OPTIMIZE TABLE ${targetTable} FINAL`;
+    sqls.push(optimizeSql);
+    console.log('Added OPTIMIZE TABLE command to merge hourly chunks');
+    console.log('');
+  } else {
+    console.log('No data found in the specified date range, skipping backfill');
+  }
+
+  // Write SQL to file for review
+  const sqlFilePath = path.join(__filename.replace('.ts', '.sql').replace('.js', '.sql'));
+  fs.writeFileSync(
+    sqlFilePath,
+    sqls
+      .map((sql) =>
+        sql
+          .trim()
+          .replace(/;$/, '')
+          .replace(/\n{2,}/g, '\n')
+          .concat(';'),
+      )
+      .join('\n\n---\n\n'),
+  );
+
+  // Count operation types. These are substring heuristics over the generated
+  // SQL (good enough for progress reporting, not exact classification).
+  const deleteOpsTotal = sqls.filter(sql => sql.includes('DELETE')).length;
+  const mvOpsTotal = sqls.filter(sql => sql.includes('CREATE') || sql.includes('MATERIALIZED VIEW')).length;
+  const insertOpsTotal = sqls.filter(sql => sql.includes('INSERT')).length;
+
+  const optimizeOpsTotal = sqls.filter(sql => sql.includes('OPTIMIZE')).length;
+
+  console.log(`Generated ${sqls.length} SQL statements:`);
+  console.log(` - ${deleteOpsTotal} DELETE operations (day-by-day)`);
+  console.log(` - ${mvOpsTotal} MV creation operations`);
+  console.log(` - ${insertOpsTotal} INSERT operations (hourly chunks)`);
+  console.log(` - ${optimizeOpsTotal} OPTIMIZE operations (merge chunks)`);
+  console.log(`SQL written to: ${sqlFilePath}`);
+  console.log('');
+
+  // Execute if not in dry-run mode
+  if (!process.argv.includes('--dry')) {
+    console.log('🚀 Executing migration...');
+    console.log('');
+
+    // Execute with progress tracking — one statement per call so a failure
+    // points at the exact statement.
+    let completed = 0;
+    const total = sqls.length;
+
+    for (const sql of sqls) {
+      await runClickhouseMigrationCommands([sql]);
+      completed++;
+
+      // Show progress every 100 queries, every 10% or on last query
+      const shouldLog = completed % 100 === 0 ||
+        completed === total ||
+        Math.floor((completed / total) * 10) > Math.floor(((completed - 1) / total) * 10);
+
+      if (shouldLog) {
+        const percentage = ((completed / total) * 100).toFixed(1);
+        console.log(`Progress: ${completed}/${total} (${percentage}%)`);
+      }
+    }
+
+    console.log('');
+    console.log('✅ Migration completed successfully!');
+    console.log('');
+    console.log('Next steps:');
+    console.log('1. Verify data in events_daily_stats:');
+    console.log(` SELECT date, count() FROM ${targetTable} GROUP BY date ORDER BY date`);
+    console.log('2. Check a few sample queries to ensure accuracy');
+  } else {
+    console.log('🔍 Dry-run mode: SQL generated but not executed');
+    console.log('');
+    console.log('To execute, run without --dry flag');
+  }
+}
diff --git a/packages/db/index.ts b/packages/db/index.ts
index 58042d3f3..53418ed0f 100644
--- a/packages/db/index.ts
+++ b/packages/db/index.ts
@@ -29,3 +29,6 @@ export * from './src/clickhouse/query-builder';
export * from './src/services/import.service';
export * from './src/services/overview.service';
export * from './src/session-context';
+export * from './src/services/materialize-columns.service';
+export * from './src/services/cohort.service';
+export * from './src/services/custom-event.service';
diff --git a/packages/db/package.json b/packages/db/package.json
index fb4cd393e..86f9f89e2 100644
--- a/packages/db/package.json
+++ b/packages/db/package.json
@@ -9,6 +9,7 @@
"migrate:deploy:code": "pnpm with-env jiti ./code-migrations/migrate.ts",
"migrate:deploy:db": "pnpm with-env prisma migrate deploy",
"migrate:deploy": "pnpm migrate:deploy:db && pnpm migrate:deploy:code",
+ "materialize:analyze": "pnpm with-env jiti ./src/cli/materialize.ts",
"typecheck": "tsc --noEmit",
"with-env": "dotenv -e ../../.env -c --"
},
diff --git a/packages/db/prisma/migrations/20260124115911_add_cohorts_table/migration.sql b/packages/db/prisma/migrations/20260124115911_add_cohorts_table/migration.sql
new file mode 100644
index 000000000..d0c2ea4f0
--- /dev/null
+++ b/packages/db/prisma/migrations/20260124115911_add_cohorts_table/migration.sql
@@ -0,0 +1,22 @@
+-- CreateTable
+CREATE TABLE "cohorts" (
+ "id" UUID NOT NULL DEFAULT gen_random_uuid(),
+ "name" TEXT NOT NULL,
+ "description" TEXT,
+ "projectId" TEXT NOT NULL,
+ "definition" JSONB NOT NULL,
+ "isStatic" BOOLEAN NOT NULL DEFAULT false,
+ "computeOnDemand" BOOLEAN NOT NULL DEFAULT false,
+ "profileCount" INTEGER NOT NULL DEFAULT 0,
+ "lastComputedAt" TIMESTAMP(3),
+ "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ "updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
+
+ CONSTRAINT "cohorts_pkey" PRIMARY KEY ("id")
+);
+
+-- CreateIndex
+CREATE INDEX "cohorts_projectId_idx" ON "cohorts"("projectId");
+
+-- AddForeignKey
+ALTER TABLE "cohorts" ADD CONSTRAINT "cohorts_projectId_fkey" FOREIGN KEY ("projectId") REFERENCES "projects"("id") ON DELETE CASCADE ON UPDATE CASCADE;
diff --git a/packages/db/prisma/migrations/20260226120000_add_last_notified_at_to_notification_rules/migration.sql b/packages/db/prisma/migrations/20260226120000_add_last_notified_at_to_notification_rules/migration.sql
new file mode 100644
index 000000000..849aaafa0
--- /dev/null
+++ b/packages/db/prisma/migrations/20260226120000_add_last_notified_at_to_notification_rules/migration.sql
@@ -0,0 +1,2 @@
+-- AlterTable
+ALTER TABLE "notification_rules" ADD COLUMN "lastNotifiedAt" TIMESTAMP(3);
diff --git a/packages/db/prisma/migrations/20260304120000_add_global_filters_to_reports/migration.sql b/packages/db/prisma/migrations/20260304120000_add_global_filters_to_reports/migration.sql
new file mode 100644
index 000000000..115a095a8
--- /dev/null
+++ b/packages/db/prisma/migrations/20260304120000_add_global_filters_to_reports/migration.sql
@@ -0,0 +1,2 @@
+-- AlterTable
+ALTER TABLE "reports" ADD COLUMN "globalFilters" JSONB NOT NULL DEFAULT '[]';
diff --git a/packages/db/prisma/migrations/20260305120000_add_hold_properties_to_reports/migration.sql b/packages/db/prisma/migrations/20260305120000_add_hold_properties_to_reports/migration.sql
new file mode 100644
index 000000000..52dd9fc92
--- /dev/null
+++ b/packages/db/prisma/migrations/20260305120000_add_hold_properties_to_reports/migration.sql
@@ -0,0 +1,2 @@
+-- AlterTable
+ALTER TABLE "reports" ADD COLUMN "holdProperties" JSONB NOT NULL DEFAULT '[]';
diff --git a/packages/db/prisma/migrations/20260310120000_add_measuring_to_reports/migration.sql b/packages/db/prisma/migrations/20260310120000_add_measuring_to_reports/migration.sql
new file mode 100644
index 000000000..7d8abcdea
--- /dev/null
+++ b/packages/db/prisma/migrations/20260310120000_add_measuring_to_reports/migration.sql
@@ -0,0 +1,2 @@
+-- AlterTable
+ALTER TABLE "reports" ADD COLUMN "measuring" TEXT;
diff --git a/packages/db/prisma/schema.prisma b/packages/db/prisma/schema.prisma
index 38a29105d..a3d6285bb 100644
--- a/packages/db/prisma/schema.prisma
+++ b/packages/db/prisma/schema.prisma
@@ -196,6 +196,8 @@ model Project {
notificationRules NotificationRule[]
notifications Notification[]
imports Import[]
+ cohorts Cohort[]
+ customEvents CustomEvent[]
// When deleteAt > now(), the project will be deleted
deleteAt DateTime?
@@ -320,7 +322,10 @@ model Report {
previous Boolean @default(false)
criteria String?
funnelGroup String?
- funnelWindow Float?
+ funnelWindow Float?
+ globalFilters Json @default("[]")
+ holdProperties Json @default("[]")
+ measuring String?
dashboardId String
dashboard Dashboard @relation(fields: [dashboardId], references: [id], onDelete: Cascade)
@@ -385,6 +390,27 @@ model EventMeta {
@@map("event_meta")
}
+model CustomEvent {
+ id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid
+ name String
+ description String?
+ projectId String
+ project Project @relation(fields: [projectId], references: [id], onDelete: Cascade)
+
+ // Custom event definition (JSON structure)
+ /// [IPrismaCustomEventDefinition]
+ definition Json
+
+ // Mark as conversion event for filtering
+ conversion Boolean @default(false)
+
+ createdAt DateTime @default(now())
+ updatedAt DateTime @default(now()) @updatedAt
+
+ @@unique([name, projectId])
+ @@map("custom_events")
+}
+
model Reference {
id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid
title String
@@ -414,11 +440,12 @@ model NotificationRule {
sendToApp Boolean @default(false)
sendToEmail Boolean @default(false)
/// [IPrismaNotificationRuleConfig]
- config Json
- template String?
- createdAt DateTime @default(now())
- updatedAt DateTime @default(now()) @updatedAt
- notifications Notification[]
+ config Json
+ template String?
+ lastNotifiedAt DateTime?
+ createdAt DateTime @default(now())
+ updatedAt DateTime @default(now()) @updatedAt
+ notifications Notification[]
@@map("notification_rules")
}
@@ -497,3 +524,45 @@ model Import {
@@map("imports")
}
+
+model MaterializedColumn {
+ id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid
+ targetTable String @default("events") // "events" | "profiles"
+ propertyKey String // e.g., "utm_source" (without "properties." prefix)
+ columnName String // e.g., "utm_source" (usually same as propertyKey)
+ cardinality Int // Number of unique values
+ usageCount Int // How many reports use this property
+ benefitScore Float // Calculated benefit score
+ estimatedSize BigInt // Estimated storage cost in bytes
+ status String // "active", "pending", "failed"
+ createdAt DateTime @default(now())
+ materializedAt DateTime? // When ALTER TABLE was executed
+ lastAnalyzedAt DateTime @default(now()) @updatedAt
+
+ @@unique([targetTable, propertyKey])
+ @@map("materialized_columns")
+}
+
+model Cohort {
+ id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid
+ name String
+ description String?
+ projectId String
+ project Project @relation(fields: [projectId], references: [id], onDelete: Cascade)
+
+ // Cohort definition (JSON structure)
+ /// [IPrismaCohortDefinition]
+ definition Json
+
+ // Type and metadata
+ isStatic Boolean @default(false) // Static vs Dynamic
+ profileCount Int @default(0) // Cached count
+ lastComputedAt DateTime? // When last computed
+ computeOnDemand Boolean @default(false) // Don't store, compute when needed
+
+ createdAt DateTime @default(now())
+ updatedAt DateTime @default(now()) @updatedAt
+
+ @@map("cohorts")
+ @@index([projectId])
+}
diff --git a/packages/db/src/buffers/base-buffer.ts b/packages/db/src/buffers/base-buffer.ts
index cc8421004..27a8e807a 100644
--- a/packages/db/src/buffers/base-buffer.ts
+++ b/packages/db/src/buffers/base-buffer.ts
@@ -1,7 +1,7 @@
import { generateSecureId } from '@openpanel/common/server';
import { type ILogger, createLogger } from '@openpanel/logger';
import { cronQueue } from '@openpanel/queue';
-import { getRedisCache, runEvery } from '@openpanel/redis';
+import { type ExtendedRedis, getRedisCache, runEvery } from '@openpanel/redis';
export class BaseBuffer {
name: string;
@@ -12,11 +12,15 @@ export class BaseBuffer {
enableParallelProcessing: boolean;
protected bufferCounterKey: string;
+ protected redis: ExtendedRedis;
constructor(options: {
name: string;
onFlush: () => Promise<void>;
enableParallelProcessing?: boolean;
+ // Pass a dedicated Redis client to isolate this buffer from the general cache.
+ // Falls back to getRedisCache() so existing buffers need no changes.
+ redis?: ExtendedRedis;
}) {
this.logger = createLogger({ name: options.name });
this.name = options.name;
@@ -24,6 +28,7 @@ export class BaseBuffer {
this.onFlush = options.onFlush;
this.bufferCounterKey = `${this.name}:buffer:count`;
this.enableParallelProcessing = options.enableParallelProcessing ?? false;
+ this.redis = options.redis ?? getRedisCache();
}
protected chunks(items: T[], size: number) {
@@ -48,14 +53,14 @@ export class BaseBuffer {
fn: async () => {
try {
const actual = await fallbackFn();
- await getRedisCache().set(this.bufferCounterKey, actual.toString());
+ await this.redis.set(this.bufferCounterKey, actual.toString());
} catch (error) {
this.logger.warn('Failed to resync buffer counter', { error });
}
},
}).catch(() => {});
- const counterValue = await getRedisCache().get(key);
+ const counterValue = await this.redis.get(key);
if (counterValue !== null) {
const parsed = Number.parseInt(counterValue, 10);
if (!Number.isNaN(parsed)) {
@@ -70,7 +75,7 @@ export class BaseBuffer {
// Initialize counter with current size
const count = await fallbackFn();
- await getRedisCache().set(key, count.toString());
+ await this.redis.set(key, count.toString());
return count;
} catch (error) {
this.logger.warn(
@@ -90,7 +95,7 @@ export class BaseBuffer {
return 0
end
`;
- await getRedisCache().eval(script, 1, this.lockKey, lockId);
+ await this.redis.eval(script, 1, this.lockKey, lockId);
}
async tryFlush() {
@@ -121,7 +126,7 @@ export class BaseBuffer {
// Sequential mode: Use lock to ensure only one worker processes at a time
const lockId = generateSecureId('lock');
- const acquired = await getRedisCache().set(
+ const acquired = await this.redis.set(
this.lockKey,
lockId,
'EX',
@@ -142,7 +147,7 @@ export class BaseBuffer {
// On error, we might want to reset counter to avoid drift
if (this.bufferCounterKey) {
this.logger.warn('Resetting buffer counter due to flush error');
- await getRedisCache().del(this.bufferCounterKey);
+ await this.redis.del(this.bufferCounterKey);
}
} finally {
await this.releaseLock(lockId);
diff --git a/packages/db/src/buffers/event-buffer.ts b/packages/db/src/buffers/event-buffer.ts
index d305372aa..275b4e936 100644
--- a/packages/db/src/buffers/event-buffer.ts
+++ b/packages/db/src/buffers/event-buffer.ts
@@ -1,7 +1,7 @@
import { getSafeJson } from '@openpanel/json';
import {
type Redis,
- getRedisCache,
+ getRedisEvent,
getRedisPub,
publishEvent,
} from '@openpanel/redis';
@@ -25,8 +25,21 @@ import { BaseBuffer } from './base-buffer';
* - Retrieve the last screen_view (don't modify it)
* - Push both screen_view and session_end to buffer
* 4. Flush: Simply process all events from the list buffer
+ *
+ * Optimizations:
+ * - Micro-batching: Events are buffered locally and flushed every 10ms to reduce Redis round-trips
+ * - Batched publishes: All PUBLISH commands are included in the multi pipeline
+ * - Simplified active visitor tracking: Only uses ZADD (removed redundant heartbeat SET)
*/
+// Pending event for local buffer
+interface PendingEvent {
+ event: IClickhouseEvent;
+ eventJson: string;
+ eventWithTimestamp?: string;
+ type: 'regular' | 'screen_view' | 'session_end';
+}
+
export class EventBuffer extends BaseBuffer {
// Configurable limits
private batchSize = process.env.EVENT_BUFFER_BATCH_SIZE
@@ -36,13 +49,35 @@ export class EventBuffer extends BaseBuffer {
? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10)
: 1000;
+ // Micro-batching configuration
+ private microBatchIntervalMs = process.env.EVENT_BUFFER_MICRO_BATCH_MS
+ ? Number.parseInt(process.env.EVENT_BUFFER_MICRO_BATCH_MS, 10)
+ : 10; // Flush every 10ms by default
+ private microBatchMaxSize = process.env.EVENT_BUFFER_MICRO_BATCH_SIZE
+ ? Number.parseInt(process.env.EVENT_BUFFER_MICRO_BATCH_SIZE, 10)
+ : 100; // Or when we hit 100 events
+
+ // Local event buffer for micro-batching
+ private pendingEvents: PendingEvent[] = [];
+ private flushTimer: ReturnType<typeof setTimeout> | null = null;
+ private isFlushing = false;
+
+ // Throttled publish configuration
+ private publishThrottleMs = process.env.EVENT_BUFFER_PUBLISH_THROTTLE_MS
+ ? Number.parseInt(process.env.EVENT_BUFFER_PUBLISH_THROTTLE_MS, 10)
+ : 1000; // Publish at most once per second
+ private lastPublishTime = 0;
+ private pendingPublishEvent: IClickhouseEvent | null = null;
+ private publishTimer: ReturnType<typeof setTimeout> | null = null;
+
private activeVisitorsExpiration = 60 * 5; // 5 minutes
// LIST - Stores all events ready to be flushed
- private queueKey = 'event_buffer:queue';
+ // {event_buffer} hash tag ensures all EventBuffer keys hash to the same Redis Cluster slot
+ private queueKey = '{event_buffer}:queue';
// STRING - Tracks total buffer size incrementally
- protected bufferCounterKey = 'event_buffer:total_count';
+ protected bufferCounterKey = '{event_buffer}:total_count';
// Script SHAs for loaded Lua scripts
private scriptShas: {
@@ -52,12 +87,12 @@ export class EventBuffer extends BaseBuffer {
// Hash key for storing last screen_view per session
private getLastScreenViewKeyBySession(sessionId: string) {
- return `event_buffer:last_screen_view:session:${sessionId}`;
+ return `{event_buffer}:last_screen_view:session:${sessionId}`;
}
// Hash key for storing last screen_view per profile
private getLastScreenViewKeyByProfile(projectId: string, profileId: string) {
- return `event_buffer:last_screen_view:profile:${projectId}:${profileId}`;
+ return `{event_buffer}:last_screen_view:profile:${projectId}:${profileId}`;
}
/**
@@ -160,6 +195,8 @@ return added
onFlush: async () => {
await this.processBuffer();
},
+ enableParallelProcessing: true,
+ redis: getRedisEvent(),
});
// Load Lua scripts into Redis on startup
this.loadScripts();
@@ -171,7 +208,7 @@ return added
*/
private async loadScripts() {
try {
- const redis = getRedisCache();
+ const redis = this.redis;
const [screenViewSha, sessionEndSha] = await Promise.all([
redis.script('LOAD', this.addScreenViewScript),
redis.script('LOAD', this.addSessionEndScript),
@@ -190,98 +227,228 @@ return added
}
bulkAdd(events: IClickhouseEvent[]) {
- const redis = getRedisCache();
- const multi = redis.multi();
+ // Add all events to local buffer - they will be flushed together
for (const event of events) {
- this.add(event, multi);
+ this.add(event);
}
- return multi.exec();
}
/**
- * Add an event into Redis buffer.
+ * Add an event into the local buffer for micro-batching.
+ *
+ * Events are buffered locally and flushed to Redis every microBatchIntervalMs
+ * or when microBatchMaxSize is reached. This dramatically reduces Redis round-trips.
*
* Logic:
* - screen_view: Store as "last" for session, flush previous if exists
* - session_end: Flush last screen_view + session_end
* - Other events: Add directly to queue
*/
- async add(event: IClickhouseEvent, _multi?: ReturnType<Redis['multi']>) {
+ add(event: IClickhouseEvent, _multi?: ReturnType<Redis['multi']>) {
+ const eventJson = JSON.stringify(event);
+
+ // Determine event type and prepare data
+ let type: PendingEvent['type'] = 'regular';
+ let eventWithTimestamp: string | undefined;
+
+ if (event.session_id && event.name === 'screen_view') {
+ type = 'screen_view';
+ const timestamp = new Date(event.created_at || Date.now()).getTime();
+ eventWithTimestamp = JSON.stringify({
+ event: event,
+ ts: timestamp,
+ });
+ } else if (event.session_id && event.name === 'session_end') {
+ type = 'session_end';
+ }
+
+ const pendingEvent: PendingEvent = {
+ event,
+ eventJson,
+ eventWithTimestamp,
+ type,
+ };
+
+ // If a multi was provided (legacy bulkAdd pattern), add directly without batching
+ if (_multi) {
+ this.addToMulti(_multi, pendingEvent);
+ return;
+ }
+
+ // Add to local buffer for micro-batching
+ this.pendingEvents.push(pendingEvent);
+
+ // Check if we should flush immediately due to size
+ if (this.pendingEvents.length >= this.microBatchMaxSize) {
+ this.flushLocalBuffer();
+ return;
+ }
+
+ // Schedule flush if not already scheduled
+ if (!this.flushTimer) {
+ this.flushTimer = setTimeout(() => {
+ this.flushTimer = null;
+ this.flushLocalBuffer();
+ }, this.microBatchIntervalMs);
+ }
+ }
+
+ /**
+ * Add a single pending event to a multi pipeline.
+ * Used both for legacy _multi pattern and during batch flush.
+ */
+ private addToMulti(multi: ReturnType<Redis['multi']>, pending: PendingEvent) {
+ const { event, eventJson, eventWithTimestamp, type } = pending;
+
+ if (type === 'screen_view' && event.session_id) {
+ const sessionKey = this.getLastScreenViewKeyBySession(event.session_id);
+ const profileKey = event.profile_id
+ ? this.getLastScreenViewKeyByProfile(event.project_id, event.profile_id)
+ : '';
+
+ this.evalScript(
+ multi,
+ 'addScreenView',
+ this.addScreenViewScript,
+ 4,
+ sessionKey,
+ profileKey,
+ this.queueKey,
+ this.bufferCounterKey,
+ eventWithTimestamp!,
+ '3600',
+ );
+ } else if (type === 'session_end' && event.session_id) {
+ const sessionKey = this.getLastScreenViewKeyBySession(event.session_id);
+ const profileKey = event.profile_id
+ ? this.getLastScreenViewKeyByProfile(event.project_id, event.profile_id)
+ : '';
+
+ this.evalScript(
+ multi,
+ 'addSessionEnd',
+ this.addSessionEndScript,
+ 4,
+ sessionKey,
+ profileKey,
+ this.queueKey,
+ this.bufferCounterKey,
+ eventJson,
+ );
+ } else {
+ // Regular events go directly to queue
+ multi.rpush(this.queueKey, eventJson).incr(this.bufferCounterKey);
+ }
+
+ // Active visitor tracking (simplified - only ZADD, no redundant SET)
+ if (event.profile_id) {
+ this.incrementActiveVisitorCount(
+ multi,
+ event.project_id,
+ event.profile_id,
+ );
+ }
+ }
+
+ /**
+ * Force flush all pending events from local buffer to Redis immediately.
+ * Useful for testing or when you need to ensure all events are persisted.
+ */
+ public async flush() {
+ // Clear any pending timer
+ if (this.flushTimer) {
+ clearTimeout(this.flushTimer);
+ this.flushTimer = null;
+ }
+ await this.flushLocalBuffer();
+ }
+
+ /**
+ * Flush all pending events from local buffer to Redis in a single pipeline.
+ * This is the core optimization - batching many events into one round-trip.
+ */
+ private async flushLocalBuffer() {
+ if (this.isFlushing || this.pendingEvents.length === 0) {
+ return;
+ }
+
+ this.isFlushing = true;
+
+ // Grab current pending events and clear buffer
+ const eventsToFlush = this.pendingEvents;
+ this.pendingEvents = [];
+
try {
- const redis = getRedisCache();
- const eventJson = JSON.stringify(event);
- const multi = _multi || redis.multi();
-
- if (event.session_id && event.name === 'screen_view') {
- // Handle screen_view
- const sessionKey = this.getLastScreenViewKeyBySession(event.session_id);
- const profileKey = event.profile_id
- ? this.getLastScreenViewKeyByProfile(
- event.project_id,
- event.profile_id,
- )
- : '';
- const timestamp = new Date(event.created_at || Date.now()).getTime();
-
- // Combine event and timestamp into single JSON for atomic operations
- const eventWithTimestamp = JSON.stringify({
- event: event,
- ts: timestamp,
- });
-
- this.evalScript(
- multi,
- 'addScreenView',
- this.addScreenViewScript,
- 4,
- sessionKey,
- profileKey,
- this.queueKey,
- this.bufferCounterKey,
- eventWithTimestamp,
- '3600', // 1 hour TTL
- );
- } else if (event.session_id && event.name === 'session_end') {
- // Handle session_end
- const sessionKey = this.getLastScreenViewKeyBySession(event.session_id);
- const profileKey = event.profile_id
- ? this.getLastScreenViewKeyByProfile(
- event.project_id,
- event.profile_id,
- )
- : '';
-
- this.evalScript(
- multi,
- 'addSessionEnd',
- this.addSessionEndScript,
- 4,
- sessionKey,
- profileKey,
- this.queueKey,
- this.bufferCounterKey,
- eventJson,
- );
- } else {
- // All other events go directly to queue
- multi.rpush(this.queueKey, eventJson).incr(this.bufferCounterKey);
- }
+ const redis = this.redis;
+ const multi = redis.multi();
- if (event.profile_id) {
- this.incrementActiveVisitorCount(
- multi,
- event.project_id,
- event.profile_id,
- );
+ // Add all events to the pipeline
+ for (const pending of eventsToFlush) {
+ this.addToMulti(multi, pending);
}
- if (!_multi) {
- await multi.exec();
- }
+ await multi.exec();
- await publishEvent('events', 'received', transformEvent(event));
+ // Throttled publish - just signal that events were received
+ // Store the last event for publishing (we only need one to signal activity)
+ const lastEvent = eventsToFlush[eventsToFlush.length - 1];
+ if (lastEvent) {
+ this.scheduleThrottledPublish(lastEvent.event);
+ }
} catch (error) {
- this.logger.error('Failed to add event to Redis buffer', { error });
+ this.logger.error('Failed to flush local buffer to Redis', {
+ error,
+ eventCount: eventsToFlush.length,
+ });
+ } finally {
+ this.isFlushing = false;
+ }
+ }
+
+ /**
+ * Throttled publish - publishes at most once per publishThrottleMs.
+ * Instead of publishing every event, we just signal that events were received.
+ * This reduces pub/sub load from 3000/s to 1/s.
+ */
+ private scheduleThrottledPublish(event: IClickhouseEvent) {
+ // Always keep the latest event
+ this.pendingPublishEvent = event;
+
+ const now = Date.now();
+ const timeSinceLastPublish = now - this.lastPublishTime;
+
+ // If enough time has passed, publish immediately
+ if (timeSinceLastPublish >= this.publishThrottleMs) {
+ this.executeThrottledPublish();
+ return;
+ }
+
+ // Otherwise, schedule a publish if not already scheduled
+ if (!this.publishTimer) {
+ const delay = this.publishThrottleMs - timeSinceLastPublish;
+ this.publishTimer = setTimeout(() => {
+ this.publishTimer = null;
+ this.executeThrottledPublish();
+ }, delay);
+ }
+ }
+
+ /**
+ * Execute the throttled publish with the latest pending event.
+ */
+ private executeThrottledPublish() {
+ if (!this.pendingPublishEvent) {
+ return;
+ }
+
+ const event = this.pendingPublishEvent;
+ this.pendingPublishEvent = null;
+ this.lastPublishTime = Date.now();
+
+ // Fire-and-forget publish (no multi = returns Promise)
+ const result = publishEvent('events', 'received', transformEvent(event));
+ if (result instanceof Promise) {
+ result.catch(() => {});
}
}
@@ -321,21 +488,20 @@ return added
* 5. Clean up processed events from queue
*/
async processBuffer() {
- const redis = getRedisCache();
+ const redis = this.redis;
try {
- // Fetch events from queue
- const queueEvents = await redis.lrange(
- this.queueKey,
- 0,
- this.batchSize - 1,
- );
+ // Atomically pop events off the queue — no two workers can get the same events
+ const queueEvents = await redis.lpop(this.queueKey, this.batchSize);
- if (queueEvents.length === 0) {
+ if (!queueEvents || queueEvents.length === 0) {
this.logger.debug('No events to process');
return;
}
+ // Decrement counter immediately after popping
+ await redis.decrby(this.bufferCounterKey, queueEvents.length);
+
// Parse events
const eventsToClickhouse: IClickhouseEvent[] = [];
for (const eventStr of queueEvents) {
@@ -357,19 +523,21 @@ return added
new Date(b.created_at || 0).getTime(),
);
- // Insert events into ClickHouse in chunks
+ // Insert events into ClickHouse in parallel chunks
this.logger.info('Inserting events into ClickHouse', {
totalEvents: eventsToClickhouse.length,
chunks: Math.ceil(eventsToClickhouse.length / this.chunkSize),
});
- for (const chunk of this.chunks(eventsToClickhouse, this.chunkSize)) {
- await ch.insert({
- table: 'events',
- values: chunk,
- format: 'JSONEachRow',
- });
- }
+ await Promise.all(
+ this.chunks(eventsToClickhouse, this.chunkSize).map((chunk) =>
+ ch.insert({
+ table: 'events',
+ values: chunk,
+ format: 'JSONEachRow',
+ })
+ )
+ );
// Publish "saved" events
const pubMulti = getRedisPub().multi();
@@ -378,13 +546,6 @@ return added
}
await pubMulti.exec();
- // Clean up processed events from queue
- await redis
- .multi()
- .ltrim(this.queueKey, queueEvents.length, -1)
- .decrby(this.bufferCounterKey, queueEvents.length)
- .exec();
-
this.logger.info('Processed events from Redis buffer', {
batchSize: this.batchSize,
eventsProcessed: eventsToClickhouse.length,
@@ -407,7 +568,7 @@ return added
profileId: string;
},
): Promise<IClickhouseEvent | null> {
- const redis = getRedisCache();
+ const redis = this.redis;
let lastScreenViewKey: string;
if ('sessionId' in params) {
@@ -435,28 +596,32 @@ return added
public async getBufferSize() {
return this.getBufferSizeWithCounter(async () => {
- const redis = getRedisCache();
+ const redis = this.redis;
return await redis.llen(this.queueKey);
});
}
- private async incrementActiveVisitorCount(
+ /**
+ * Track active visitors using ZADD only.
+ *
+ * Optimization: Removed redundant heartbeat SET key.
+ * The ZADD score (timestamp) already tracks when a visitor was last seen.
+ * We use ZRANGEBYSCORE in getActiveVisitorCount to filter active visitors.
+ */
+ private incrementActiveVisitorCount(
multi: ReturnType<Redis['multi']>,
projectId: string,
profileId: string,
) {
- // Track active visitors and emit expiry events when inactive for TTL
const now = Date.now();
- const zsetKey = `live:visitors:${projectId}`;
- const heartbeatKey = `live:visitor:${projectId}:${profileId}`;
- return multi
- .zadd(zsetKey, now, profileId)
- .set(heartbeatKey, '1', 'EX', this.activeVisitorsExpiration);
+ const zsetKey = `{event_buffer}:live:visitors:${projectId}`;
+ // Only ZADD - the score is the timestamp, no need for separate heartbeat key
+ return multi.zadd(zsetKey, now, profileId);
}
public async getActiveVisitorCount(projectId: string): Promise<number> {
- const redis = getRedisCache();
- const zsetKey = `live:visitors:${projectId}`;
+ const redis = this.redis;
+ const zsetKey = `{event_buffer}:live:visitors:${projectId}`;
const cutoff = Date.now() - this.activeVisitorsExpiration * 1000;
const multi = redis.multi();
diff --git a/packages/db/src/buffers/profile-buffer.ts b/packages/db/src/buffers/profile-buffer.ts
index fc82937f1..823125a7a 100644
--- a/packages/db/src/buffers/profile-buffer.ts
+++ b/packages/db/src/buffers/profile-buffer.ts
@@ -6,6 +6,7 @@ import shallowEqual from 'fast-deep-equal';
import { omit } from 'ramda';
import { TABLE_NAMES, ch, chQuery } from '../clickhouse/client';
import type { IClickhouseProfile } from '../services/profile.service';
+import { createEvent } from '../services/event.service';
import { BaseBuffer } from './base-buffer';
export class ProfileBuffer extends BaseBuffer {
@@ -68,6 +69,20 @@ export class ProfileBuffer extends BaseBuffer {
const existingProfile = await this.fetchProfile(profile, logger);
+ // Create _firstSeen event for new profiles
+ if (!existingProfile) {
+ logger.info('New profile detected, creating _firstSeen event', {
+ profileId: profile.id,
+ projectId: profile.project_id,
+ });
+ await this.createFirstSeenEvent(profile).catch((error) => {
+ logger.error('Failed to create _firstSeen event', {
+ error,
+ profileId: profile.id,
+ });
+ });
+ }
+
// Delete any properties that are not server related if we have a non-server profile
if (
existingProfile?.properties.device !== 'server' &&
@@ -201,6 +216,8 @@ export class ProfileBuffer extends BaseBuffer {
}
ORDER BY created_at DESC
LIMIT 1`,
+ undefined,
+ true, // Bypass concurrency limit for profile queries
);
logger.debug('Clickhouse fetch result', {
@@ -254,4 +271,52 @@ export class ProfileBuffer extends BaseBuffer {
async getBufferSize() {
return this.getBufferSizeWithCounter(() => this.redis.llen(this.redisKey));
}
+
+ /**
+ * Creates a _firstSeen event when a new profile is detected
+ * This event only fires once per user in their entire journey
+ * The underscore prefix marks it as a system-generated event
+ */
+ private async createFirstSeenEvent(profile: IClickhouseProfile) {
+ const createdAt = new Date(profile.created_at);
+
+ await createEvent({
+ name: '_firstSeen',
+ deviceId: profile.id,
+ profileId: profile.id,
+ projectId: profile.project_id,
+ sessionId: '', // No session for system events
+ properties: {
+ isExternal: profile.is_external,
+ source: 'system',
+ // Include initial profile properties
+ ...profile.properties,
+ },
+ createdAt,
+ path: profile.properties.path || '',
+ origin: profile.properties.origin || '',
+ country: profile.properties.country || '',
+ city: profile.properties.city || '',
+ region: profile.properties.region || '',
+ longitude: profile.properties.longitude
+ ? Number(profile.properties.longitude)
+ : undefined,
+ latitude: profile.properties.latitude
+ ? Number(profile.properties.latitude)
+ : undefined,
+ os: profile.properties.os || '',
+ osVersion: profile.properties.os_version || '',
+ browser: profile.properties.browser || '',
+ browserVersion: profile.properties.browser_version || '',
+ device: profile.properties.device || '',
+ brand: profile.properties.brand || '',
+ model: profile.properties.model || '',
+ referrer: profile.properties.referrer || '',
+ referrerName: profile.properties.referrer_name || '',
+ referrerType: profile.properties.referrer_type || '',
+ duration: 0,
+ sdkName: '',
+ sdkVersion: '',
+ });
+ }
}
diff --git a/packages/db/src/buffers/session-buffer.ts b/packages/db/src/buffers/session-buffer.ts
index fad5ee097..cea1eb671 100644
--- a/packages/db/src/buffers/session-buffer.ts
+++ b/packages/db/src/buffers/session-buffer.ts
@@ -1,4 +1,4 @@
-import { type Redis, getRedisCache } from '@openpanel/redis';
+import { getRedisSession } from '@openpanel/redis';
import { getSafeJson } from '@openpanel/json';
import { assocPath, clone } from 'ramda';
@@ -16,15 +16,15 @@ export class SessionBuffer extends BaseBuffer {
: 1000;
private readonly redisKey = 'session-buffer';
- private redis: Redis;
+
constructor() {
super({
name: 'session',
onFlush: async () => {
await this.processBuffer();
},
+ redis: getRedisSession(),
});
- this.redis = getRedisCache();
}
public async getExistingSession(
diff --git a/packages/db/src/cli/materialize.ts b/packages/db/src/cli/materialize.ts
new file mode 100644
index 000000000..14ef08d42
--- /dev/null
+++ b/packages/db/src/cli/materialize.ts
@@ -0,0 +1,45 @@
+#!/usr/bin/env node
+import { materializeColumnsService } from '../services/materialize-columns.service';
+
+async function main() {
+ const args = process.argv.slice(2);
+
+ const dryRun = !args.includes('--execute');
+ const thresholdArg = args.find((arg) => arg.startsWith('--threshold='));
+ const threshold = thresholdArg
+ ? Number.parseInt(thresholdArg.split('=')[1]!, 10)
+ : 150;
+
+ if (Number.isNaN(threshold) || threshold < 0) {
+ console.error('Error: Invalid threshold value');
+ process.exit(1);
+ }
+
+ console.log('Materialized Column Analyzer');
+ console.log(`Mode: ${dryRun ? 'DRY RUN' : 'EXECUTE'}`);
+ console.log(`Threshold: ${threshold}`);
+ console.log('');
+
+ try {
+ const result = await materializeColumnsService.analyze({
+ dryRun,
+ threshold,
+ });
+
+ console.log(result.report);
+
+ if (!dryRun && result.materialized.length > 0) {
+ console.log('\n✅ Successfully materialized:');
+ for (const prop of result.materialized) {
+ console.log(` - ${prop}`);
+ }
+ }
+
+ process.exit(0);
+ } catch (error) {
+ console.error('\n❌ Error during analysis:', error);
+ process.exit(1);
+ }
+}
+
+main();
diff --git a/packages/db/src/clickhouse/client.ts b/packages/db/src/clickhouse/client.ts
index 50cfbb177..9f9615ff3 100644
--- a/packages/db/src/clickhouse/client.ts
+++ b/packages/db/src/clickhouse/client.ts
@@ -58,7 +58,11 @@ export const TABLE_NAMES = {
event_property_values_mv: 'event_property_values_mv',
cohort_events_mv: 'cohort_events_mv',
sessions: 'sessions',
- events_imports: 'events_imports',
+ events_imports: 'events_imports_v2',
+ cohort_members: 'cohort_members',
+ cohort_metadata: 'cohort_metadata',
+ profile_event_summary_mv: 'profile_event_summary_mv',
+ profile_event_property_summary_mv: 'profile_event_property_summary_mv',
};
/**
@@ -102,7 +106,7 @@ function getClickhouseSettings(): ClickHouseSettings {
export const CLICKHOUSE_OPTIONS: NodeClickHouseClientConfigOptions = {
max_open_connections: 30,
- request_timeout: 300000,
+ request_timeout: parseInt(process.env.CLICKHOUSE_REQUEST_TIMEOUT || '3600000', 10),
keep_alive: {
enabled: true,
idle_socket_ttl: 60000,
@@ -131,7 +135,7 @@ const cleanQuery = (query?: string) =>
export async function withRetry(
operation: () => Promise,
- maxRetries = 3,
+ maxRetries = 5,
baseDelay = 500,
): Promise {
let lastError: Error | undefined;
@@ -188,12 +192,25 @@ export const ch = new Proxy(originalCh, {
http_headers_progress_interval_ms: '50000',
// Ensure server holds the connection until the query is finished
wait_end_of_query: 1,
+ // Raise the per-user concurrent-query ceiling for INSERT operations so that
+ // simultaneous buffer flushes are queued rather than rejected (env-tunable)
+ max_concurrent_queries_for_user: Number.parseInt(
+ process.env.CLICKHOUSE_INSERT_QUERY_LIMIT || '50',
+ 10
+ ),
...args[0].clickhouse_settings,
};
return value.apply(target, args);
});
}
+ if (property === 'query') {
+ return (...args: any[]) =>
+ withRetry(() => {
+ return value.apply(target, args);
+ });
+ }
+
if (property === 'command') {
return (...args: any[]) =>
withRetry(() => {
@@ -208,11 +225,25 @@ export const ch = new Proxy(originalCh, {
export async function chQueryWithMeta>(
query: string,
clickhouseSettings?: ClickHouseSettings,
+ bypassConcurrencyLimit = false,
): Promise> {
const start = Date.now();
+
+ // Merge settings, allowing higher concurrent query limit for critical operations
+ // to prevent profile queries from being blocked by dashboard query limits
+ const finalSettings = bypassConcurrencyLimit
+ ? {
+ ...clickhouseSettings,
+ max_concurrent_queries_for_user: Number.parseInt(
+ process.env.CLICKHOUSE_PROFILE_QUERY_LIMIT || '50',
+ 10
+ )
+ }
+ : clickhouseSettings;
+
const res = await ch.query({
query,
- clickhouse_settings: clickhouseSettings,
+ clickhouse_settings: finalSettings,
});
const json = await res.json();
const keys = Object.keys(json.data[0] || {});
@@ -246,24 +277,33 @@ export async function chQueryWithMeta>(
export async function chInsertCSV(tableName: string, rows: string[]) {
try {
const now = performance.now();
- // Create a readable stream in binary mode for CSV (similar to EventBuffer)
- const csvStream = Readable.from(rows.join('\n'), {
- objectMode: false,
- });
-
- await ch.insert({
- table: tableName,
- values: csvStream,
- format: 'CSV',
- clickhouse_settings: {
- format_csv_allow_double_quotes: 1,
- format_csv_allow_single_quotes: 0,
- },
- });
+ const chunkSize = Number.parseInt(
+ process.env.IMPORT_CSV_CHUNK_SIZE || '10000',
+ 10,
+ );
+
+ // Insert in chunks to reduce memory pressure
+ for (let i = 0; i < rows.length; i += chunkSize) {
+ const chunk = rows.slice(i, i + chunkSize);
+ const csvStream = Readable.from(chunk.join('\n'), {
+ objectMode: false,
+ });
+
+ await ch.insert({
+ table: tableName,
+ values: csvStream,
+ format: 'CSV',
+ clickhouse_settings: {
+ format_csv_allow_double_quotes: 1,
+ format_csv_allow_single_quotes: 0,
+ },
+ });
+ }
logger.info('CSV Insert successful', {
elapsed: performance.now() - now,
rows: rows.length,
+ chunks: Math.ceil(rows.length / chunkSize),
});
} catch (error) {
logger.error('CSV Insert failed:', error);
@@ -274,8 +314,9 @@ export async function chInsertCSV(tableName: string, rows: string[]) {
export async function chQuery>(
query: string,
clickhouseSettings?: ClickHouseSettings,
+ bypassConcurrencyLimit = false,
): Promise {
- return (await chQueryWithMeta(query, clickhouseSettings)).data;
+ return (await chQueryWithMeta(query, clickhouseSettings, bypassConcurrencyLimit)).data;
}
export function formatClickhouseDate(
diff --git a/packages/db/src/engine/fetch.ts b/packages/db/src/engine/fetch.ts
index 954c98e99..f11c694b8 100644
--- a/packages/db/src/engine/fetch.ts
+++ b/packages/db/src/engine/fetch.ts
@@ -4,6 +4,7 @@ import { alphabetIds } from '@openpanel/constants';
import type { IGetChartDataInput } from '@openpanel/validation';
import { chQuery } from '../clickhouse/client';
import { getChartSql } from '../services/chart.service';
+import { getCustomEventByName } from '../services/custom-event.service';
import type { ConcreteSeries, Plan } from './types';
/**
@@ -33,13 +34,20 @@ export async function fetch(plan: Plan): Promise {
continue;
}
- // Build query input
+ // Check if this is a custom event
+ const customEvent = await getCustomEventByName(
+ event.name,
+ plan.input.projectId,
+ );
+
+ // Build query input — merge global filters into per-event filters
+ const globalFilters = plan.input.globalFilters ?? [];
const queryInput: IGetChartDataInput = {
event: {
id: event.id,
name: event.name,
segment: event.segment,
- filters: event.filters,
+ filters: [...event.filters, ...globalFilters],
displayName: event.displayName,
property: event.property,
},
@@ -56,11 +64,24 @@ export async function fetch(plan: Plan): Promise {
criteria: plan.input.criteria,
funnelGroup: plan.input.funnelGroup,
funnelWindow: plan.input.funnelWindow,
+ cohortFilters: plan.input.cohortFilters ?? [],
+ globalFilters: plan.input.globalFilters ?? [],
+ holdProperties: plan.input.holdProperties ?? [],
+ measuring: plan.input.measuring ?? 'conversion_rate',
};
- // Execute query
+ // Execute query with custom event if applicable
let queryResult = await chQuery(
- getChartSql({ ...queryInput, timezone: plan.timezone }),
+ await getChartSql({
+ ...queryInput,
+ timezone: plan.timezone,
+ customEvent: customEvent
+ ? {
+ name: customEvent.name,
+ definition: customEvent.definition as any,
+ }
+ : undefined,
+ }),
{
session_timezone: plan.timezone,
},
@@ -69,10 +90,16 @@ export async function fetch(plan: Plan): Promise {
// Fallback: if no results with breakdowns, try without breakdowns
if (queryResult.length === 0 && plan.input.breakdowns.length > 0) {
queryResult = await chQuery(
- getChartSql({
+ await getChartSql({
...queryInput,
breakdowns: [],
timezone: plan.timezone,
+ customEvent: customEvent
+ ? {
+ name: customEvent.name,
+ definition: customEvent.definition as any,
+ }
+ : undefined,
}),
{
session_timezone: plan.timezone,
diff --git a/packages/db/src/services/chart.service.ts b/packages/db/src/services/chart.service.ts
index f9e60337e..2d9a728e1 100644
--- a/packages/db/src/services/chart.service.ts
+++ b/packages/db/src/services/chart.service.ts
@@ -7,10 +7,204 @@ import type {
IChartInput,
IChartRange,
IGetChartDataInput,
+ ICustomEventDefinition,
+ CohortDefinition,
} from '@openpanel/validation';
import { TABLE_NAMES, formatClickhouseDate } from '../clickhouse/client';
import { createSqlBuilder } from '../sql-builder';
+import {
+ getCustomEventByName,
+ expandCustomEventToSQL,
+} from './custom-event.service';
+import { db } from '../../index';
+import { buildEventCriteriaQuery, buildPropertyBasedCohortQuery } from './cohort.service';
+
+// Cache for the materialized-columns mapping (property key -> quoted SQL column).
+// Generic parameters restored: extraction had stripped "<string, string>".
+let materializedColumnsCache: Record<string, string> | null = null;
+let cacheTimestamp = 0;
+const CACHE_TTL = 60 * 60 * 1000; // 1 hour
+
+function filterByTargetTable(
+ cache: Record,
+ targetTable?: 'events' | 'profiles',
+): Record {
+ if (!targetTable) return cache;
+ return Object.fromEntries(
+ Object.entries(cache).filter(([key]) =>
+ targetTable === 'profiles'
+ ? key.startsWith('profile.properties.')
+ : key.startsWith('properties.'),
+ ),
+ );
+}
+
+/**
+ * Get materialized columns from database with caching.
+ * Pass targetTable to get only columns for that table:
+ * 'events' → keys starting with "properties.*"
+ * 'profiles' → keys starting with "profile.properties.*"
+ * (omit) → all columns (used by getSelectPropertyKey)
+ */
+export async function getMaterializedColumns(targetTable?: 'events' | 'profiles'): Promise> {
+ const now = Date.now();
+
+ // Return cached value if still valid
+ if (materializedColumnsCache && (now - cacheTimestamp) < CACHE_TTL) {
+ return filterByTargetTable(materializedColumnsCache, targetTable);
+ }
+
+ try {
+ const columns = await db.materializedColumn.findMany({
+ where: { status: 'active' },
+ select: { propertyKey: true, columnName: true, targetTable: true },
+ });
+
+ const mapping: Record = {};
+ for (const col of columns) {
+ const quotedColName = `\`${col.columnName}\``;
+ if (col.targetTable === 'profiles') {
+ // e.g. "profile.properties.campaign" -> "profile.`campaign`"
+ mapping[`profile.properties.${col.propertyKey}`] = `profile.${quotedColName}`;
+ } else {
+ // e.g. "properties.utm_source" -> "`utm_source`"
+ mapping[`properties.${col.propertyKey}`] = quotedColName;
+ }
+ }
+
+ materializedColumnsCache = mapping;
+ cacheTimestamp = now;
+ return filterByTargetTable(mapping, targetTable);
+ } catch (error) {
+ // If database query fails, return empty mapping (fallback to properties['key'])
+ console.warn('Failed to load materialized columns:', error);
+ return {};
+ }
+}
+
+/**
+ * Initialize the materialized-columns cache (call at startup).
+ * Return type restored to Promise<void>; extraction had stripped "<void>".
+ */
+export async function initMaterializedColumnsCache(): Promise<void> {
+  await getMaterializedColumns();
+}
+
+/**
+ * Refresh the materialized-columns cache (call after adding new columns).
+ * Invalidates the cache first so the subsequent read is guaranteed fresh.
+ */
+export async function refreshMaterializedColumnsCache(): Promise<void> {
+  materializedColumnsCache = null;
+  cacheTimestamp = 0;
+  await getMaterializedColumns();
+}
+
+// Warm the materialized-columns cache eagerly at module load. A failure here
+// is non-fatal: getMaterializedColumns() queries again on each call until it
+// succeeds, so we deliberately swallow the rejection.
+getMaterializedColumns().catch(() => {
+ // Ignore errors on initial load — the next caller repopulates the cache.
+});
+
+// Cohort metadata type, as fetched from Postgres by fetchCohortsMetadata.
+type CohortMetadata = {
+ id: string; // cohort identifier
+ computeOnDemand: boolean; // presumably: compute membership at query time when true — TODO confirm
+ definition: CohortDefinition; // rule set describing the cohort (see @openpanel/validation)
+};
+
+/**
+ * Fetch metadata for multiple cohorts from Postgres (no cache - always fresh)
+ */
+export async function fetchCohortsMetadata(
+ cohortIds: string[],
+): Promise