From 8afcf55154bafe1e5553bad1fd98fb1c88da405d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Fri, 6 Feb 2026 12:41:41 +0000 Subject: [PATCH 1/7] fix: how we fetch profiles in the buffer --- packages/db/src/buffers/profile-buffer.ts | 40 +++++++++++------------ 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/packages/db/src/buffers/profile-buffer.ts b/packages/db/src/buffers/profile-buffer.ts index de80bd422..d568bb23c 100644 --- a/packages/db/src/buffers/profile-buffer.ts +++ b/packages/db/src/buffers/profile-buffer.ts @@ -1,11 +1,11 @@ import { deepMergeObjects } from '@openpanel/common'; import { getSafeJson } from '@openpanel/json'; import type { ILogger } from '@openpanel/logger'; -import { type Redis, getRedisCache } from '@openpanel/redis'; +import { getRedisCache, type Redis } from '@openpanel/redis'; import shallowEqual from 'fast-deep-equal'; import { omit } from 'ramda'; import sqlstring from 'sqlstring'; -import { TABLE_NAMES, ch, chQuery } from '../clickhouse/client'; +import { ch, chQuery, TABLE_NAMES } from '../clickhouse/client'; import type { IClickhouseProfile } from '../services/profile.service'; import { BaseBuffer } from './base-buffer'; @@ -89,7 +89,7 @@ export class ProfileBuffer extends BaseBuffer { 'os_version', 'browser_version', ], - profile.properties, + profile.properties ); } @@ -97,16 +97,16 @@ export class ProfileBuffer extends BaseBuffer { ? deepMergeObjects(existingProfile, omit(['created_at'], profile)) : profile; - if (profile && existingProfile) { - if ( - shallowEqual( - omit(['created_at'], existingProfile), - omit(['created_at'], mergedProfile), - ) - ) { - this.logger.debug('Profile not changed, skipping'); - return; - } + if ( + profile && + existingProfile && + shallowEqual( + omit(['created_at'], existingProfile), + omit(['created_at'], mergedProfile) + ) + ) { + this.logger.debug('Profile not changed, skipping'); + return; } this.logger.debug('Merged profile will be inserted', { @@ -151,11 +151,11 @@ export class ProfileBuffer extends BaseBuffer { private async fetchProfile( profile: IClickhouseProfile, - logger: ILogger, + logger: ILogger ): Promise { const existingProfile = await this.fetchFromCache( profile.id, - profile.project_id, + profile.project_id ); if (existingProfile) { logger.debug('Profile found in Redis'); @@ -167,7 +167,7 @@ export class ProfileBuffer extends BaseBuffer { public async fetchFromCache( profileId: string, - projectId: string, + projectId: string ): Promise { const cacheKey = this.getProfileCacheKey({ profileId, @@ -182,7 +182,7 @@ export class ProfileBuffer extends BaseBuffer { private async fetchFromClickhouse( profile: IClickhouseProfile, - logger: ILogger, + logger: ILogger ): Promise { logger.debug('Fetching profile from Clickhouse'); const result = await chQuery( @@ -207,7 +207,7 @@ export class ProfileBuffer extends BaseBuffer { } GROUP BY id, project_id ORDER BY created_at DESC - LIMIT 1`, + LIMIT 1` ); logger.debug('Clickhouse fetch result', { found: !!result[0], @@ -221,7 +221,7 @@ export class ProfileBuffer extends BaseBuffer { const profiles = await this.redis.lrange( this.redisKey, 0, - this.batchSize - 1, + this.batchSize - 1 ); if (profiles.length === 0) { @@ -231,7 +231,7 @@ export class ProfileBuffer extends BaseBuffer { this.logger.debug(`Processing ${profiles.length} profiles in buffer`); const parsedProfiles = profiles.map((p) => - getSafeJson(p), + getSafeJson(p) ); for (const chunk of this.chunks(parsedProfiles, this.chunkSize)) { From 664f1abe0a3b3a8fd46b857c0f7cc2c24bd8d3a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Thu, 22 Jan 2026 09:44:30 +0100 Subject: [PATCH 2/7] perf: optimize event buffer --- apps/api/src/controllers/live.controller.ts | 22 +- packages/db/src/buffers/event-buffer.test.ts | 69 ++-- packages/db/src/buffers/event-buffer.ts | 326 ++++++++++++++----- 3 files changed, 292 insertions(+), 125 deletions(-) diff --git a/apps/api/src/controllers/live.controller.ts b/apps/api/src/controllers/live.controller.ts index 29931aa84..cd7afe914 100644 --- a/apps/api/src/controllers/live.controller.ts +++ b/apps/api/src/controllers/live.controller.ts @@ -8,17 +8,10 @@ import { transformMinimalEvent, } from '@openpanel/db'; import { setSuperJson } from '@openpanel/json'; -import { - psubscribeToPublishedEvent, - subscribeToPublishedEvent, -} from '@openpanel/redis'; +import { subscribeToPublishedEvent } from '@openpanel/redis'; import { getProjectAccess } from '@openpanel/trpc'; import { getOrganizationAccess } from '@openpanel/trpc/src/access'; -export function getLiveEventInfo(key: string) { - return key.split(':').slice(2) as [string, string]; -} - export function wsVisitors( socket: WebSocket, req: FastifyRequest<{ @@ -36,21 +29,8 @@ export function wsVisitors( } }); - const punsubscribe = psubscribeToPublishedEvent( - '__keyevent@0__:expired', - (key) => { - const [projectId] = getLiveEventInfo(key); - if (projectId && projectId === params.projectId) { - eventBuffer.getActiveVisitorCount(params.projectId).then((count) => { - socket.send(String(count)); - }); - } - }, - ); - socket.on('close', () => { unsubscribe(); - punsubscribe(); }); } diff --git a/packages/db/src/buffers/event-buffer.test.ts b/packages/db/src/buffers/event-buffer.test.ts index 95852bd29..50600c704 100644 --- a/packages/db/src/buffers/event-buffer.test.ts +++ b/packages/db/src/buffers/event-buffer.test.ts @@ -71,8 +71,9 @@ describe('EventBuffer', () => { // Get initial count const initialCount = await eventBuffer.getBufferSize(); - // Add event - await eventBuffer.add(event); + // Add event and flush (events are micro-batched) + eventBuffer.add(event); + await eventBuffer.flush(); // Buffer counter should increase by 1 const newCount = await eventBuffer.getBufferSize(); @@ -109,7 +110,8 @@ describe('EventBuffer', () => { // Add first screen_view const count1 = await eventBuffer.getBufferSize(); - await eventBuffer.add(view1); + eventBuffer.add(view1); + await eventBuffer.flush(); // Should be stored as "last" but NOT in queue yet const count2 = await eventBuffer.getBufferSize(); @@ -124,7 +126,8 @@ describe('EventBuffer', () => { expect(last1!.createdAt.toISOString()).toBe(view1.created_at); // Add second screen_view - await eventBuffer.add(view2); + eventBuffer.add(view2); + await eventBuffer.flush(); // Now view1 should be in buffer const count3 = await eventBuffer.getBufferSize(); @@ -138,7 +141,8 @@ describe('EventBuffer', () => { expect(last2!.createdAt.toISOString()).toBe(view2.created_at); // Add third screen_view - await eventBuffer.add(view3); + eventBuffer.add(view3); + await eventBuffer.flush(); // Now view2 should also be in buffer const count4 = await eventBuffer.getBufferSize(); @@ -174,14 +178,16 @@ describe('EventBuffer', () => { // Add screen_view const count1 = await eventBuffer.getBufferSize(); - await eventBuffer.add(view); + eventBuffer.add(view); + await eventBuffer.flush(); // Should be stored as "last", not in buffer yet const count2 = await eventBuffer.getBufferSize(); expect(count2).toBe(count1); // Add session_end - await eventBuffer.add(sessionEnd); + eventBuffer.add(sessionEnd); + await eventBuffer.flush(); // Both should now be in buffer (+2) const count3 = await eventBuffer.getBufferSize(); @@ -207,7 +213,8 @@ describe('EventBuffer', () => { } as any; const count1 = await eventBuffer.getBufferSize(); - await eventBuffer.add(sessionEnd); + eventBuffer.add(sessionEnd); + await eventBuffer.flush(); // Only session_end should be in buffer (+1) const count2 = await eventBuffer.getBufferSize(); @@ -224,7 +231,8 @@ describe('EventBuffer', () => { created_at: new Date().toISOString(), } as any; - await eventBuffer.add(view); + eventBuffer.add(view); + await eventBuffer.flush(); // Query by profileId const result = await eventBuffer.getLastScreenView({ @@ -248,7 +256,8 @@ describe('EventBuffer', () => { created_at: new Date().toISOString(), } as any; - await eventBuffer.add(view); + eventBuffer.add(view); + await eventBuffer.flush(); // Query by sessionId const result = await eventBuffer.getLastScreenView({ @@ -275,43 +284,47 @@ describe('EventBuffer', () => { expect(await eventBuffer.getBufferSize()).toBe(0); // Add regular event - await eventBuffer.add({ + eventBuffer.add({ project_id: 'p6', name: 'event1', created_at: new Date().toISOString(), } as any); + await eventBuffer.flush(); expect(await eventBuffer.getBufferSize()).toBe(1); // Add another regular event - await eventBuffer.add({ + eventBuffer.add({ project_id: 'p6', name: 'event2', created_at: new Date().toISOString(), } as any); + await eventBuffer.flush(); expect(await eventBuffer.getBufferSize()).toBe(2); // Add screen_view (not counted until flushed) - await eventBuffer.add({ + eventBuffer.add({ project_id: 'p6', profile_id: 'u6', session_id: 'session_6', name: 'screen_view', created_at: new Date().toISOString(), } as any); + await eventBuffer.flush(); // Still 2 (screen_view is pending) expect(await eventBuffer.getBufferSize()).toBe(2); // Add another screen_view (first one gets flushed) - await eventBuffer.add({ + eventBuffer.add({ project_id: 'p6', profile_id: 'u6', session_id: 'session_6', name: 'screen_view', created_at: new Date(Date.now() + 1000).toISOString(), } as any); + await eventBuffer.flush(); // Now 3 (2 regular + 1 flushed screen_view) expect(await eventBuffer.getBufferSize()).toBe(3); @@ -330,8 +343,9 @@ describe('EventBuffer', () => { created_at: new Date(Date.now() + 1000).toISOString(), } as any; - await eventBuffer.add(event1); - await eventBuffer.add(event2); + eventBuffer.add(event1); + eventBuffer.add(event2); + await eventBuffer.flush(); expect(await eventBuffer.getBufferSize()).toBe(2); @@ -361,12 +375,13 @@ describe('EventBuffer', () => { // Add 4 events for (let i = 0; i < 4; i++) { - await eb.add({ + eb.add({ project_id: 'p8', name: `event${i}`, created_at: new Date(Date.now() + i).toISOString(), } as any); } + await eb.flush(); const insertSpy = vi .spyOn(ch, 'insert') @@ -396,7 +411,8 @@ describe('EventBuffer', () => { created_at: new Date().toISOString(), } as any; - await eventBuffer.add(event); + eventBuffer.add(event); + await eventBuffer.flush(); const count = await eventBuffer.getActiveVisitorCount('p9'); expect(count).toBeGreaterThanOrEqual(1); @@ -439,10 +455,11 @@ describe('EventBuffer', () => { created_at: new Date(t0 + 2000).toISOString(), } as any; - await eventBuffer.add(view1a); - await eventBuffer.add(view2a); - await eventBuffer.add(view1b); // Flushes view1a - await eventBuffer.add(view2b); // Flushes view2a + eventBuffer.add(view1a); + eventBuffer.add(view2a); + eventBuffer.add(view1b); // Flushes view1a + eventBuffer.add(view2b); // Flushes view2a + await eventBuffer.flush(); // Should have 2 events in buffer (one from each session) expect(await eventBuffer.getBufferSize()).toBe(2); @@ -470,7 +487,8 @@ describe('EventBuffer', () => { } as any; const count1 = await eventBuffer.getBufferSize(); - await eventBuffer.add(view); + eventBuffer.add(view); + await eventBuffer.flush(); // Should go directly to buffer (no session_id) const count2 = await eventBuffer.getBufferSize(); @@ -498,8 +516,9 @@ describe('EventBuffer', () => { created_at: new Date(t0 + 1000).toISOString(), } as any; - await eventBuffer.add(view1); - await eventBuffer.add(view2); + eventBuffer.add(view1); + eventBuffer.add(view2); + await eventBuffer.flush(); // Both sessions should have their own "last" const lastSession1 = await eventBuffer.getLastScreenView({ diff --git a/packages/db/src/buffers/event-buffer.ts b/packages/db/src/buffers/event-buffer.ts index d305372aa..15d29b5b4 100644 --- a/packages/db/src/buffers/event-buffer.ts +++ b/packages/db/src/buffers/event-buffer.ts @@ -25,8 +25,21 @@ import { BaseBuffer } from './base-buffer'; * - Retrieve the last screen_view (don't modify it) * - Push both screen_view and session_end to buffer * 4. Flush: Simply process all events from the list buffer + * + * Optimizations: + * - Micro-batching: Events are buffered locally and flushed every 10ms to reduce Redis round-trips + * - Batched publishes: All PUBLISH commands are included in the multi pipeline + * - Simplified active visitor tracking: Only uses ZADD (removed redundant heartbeat SET) */ +// Pending event for local buffer +interface PendingEvent { + event: IClickhouseEvent; + eventJson: string; + eventWithTimestamp?: string; + type: 'regular' | 'screen_view' | 'session_end'; +} + export class EventBuffer extends BaseBuffer { // Configurable limits private batchSize = process.env.EVENT_BUFFER_BATCH_SIZE @@ -36,6 +49,27 @@ export class EventBuffer extends BaseBuffer { ? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10) : 1000; + // Micro-batching configuration + private microBatchIntervalMs = process.env.EVENT_BUFFER_MICRO_BATCH_MS + ? Number.parseInt(process.env.EVENT_BUFFER_MICRO_BATCH_MS, 10) + : 10; // Flush every 10ms by default + private microBatchMaxSize = process.env.EVENT_BUFFER_MICRO_BATCH_SIZE + ? Number.parseInt(process.env.EVENT_BUFFER_MICRO_BATCH_SIZE, 10) + : 100; // Or when we hit 100 events + + // Local event buffer for micro-batching + private pendingEvents: PendingEvent[] = []; + private flushTimer: ReturnType | null = null; + private isFlushing = false; + + // Throttled publish configuration + private publishThrottleMs = process.env.EVENT_BUFFER_PUBLISH_THROTTLE_MS + ? Number.parseInt(process.env.EVENT_BUFFER_PUBLISH_THROTTLE_MS, 10) + : 1000; // Publish at most once per second + private lastPublishTime = 0; + private pendingPublishEvent: IClickhouseEvent | null = null; + private publishTimer: ReturnType | null = null; + private activeVisitorsExpiration = 60 * 5; // 5 minutes // LIST - Stores all events ready to be flushed @@ -190,98 +224,228 @@ return added } bulkAdd(events: IClickhouseEvent[]) { - const redis = getRedisCache(); - const multi = redis.multi(); + // Add all events to local buffer - they will be flushed together for (const event of events) { - this.add(event, multi); + this.add(event); } - return multi.exec(); } /** - * Add an event into Redis buffer. + * Add an event into the local buffer for micro-batching. + * + * Events are buffered locally and flushed to Redis every microBatchIntervalMs + * or when microBatchMaxSize is reached. This dramatically reduces Redis round-trips. * * Logic: * - screen_view: Store as "last" for session, flush previous if exists * - session_end: Flush last screen_view + session_end * - Other events: Add directly to queue */ - async add(event: IClickhouseEvent, _multi?: ReturnType) { + add(event: IClickhouseEvent, _multi?: ReturnType) { + const eventJson = JSON.stringify(event); + + // Determine event type and prepare data + let type: PendingEvent['type'] = 'regular'; + let eventWithTimestamp: string | undefined; + + if (event.session_id && event.name === 'screen_view') { + type = 'screen_view'; + const timestamp = new Date(event.created_at || Date.now()).getTime(); + eventWithTimestamp = JSON.stringify({ + event: event, + ts: timestamp, + }); + } else if (event.session_id && event.name === 'session_end') { + type = 'session_end'; + } + + const pendingEvent: PendingEvent = { + event, + eventJson, + eventWithTimestamp, + type, + }; + + // If a multi was provided (legacy bulkAdd pattern), add directly without batching + if (_multi) { + this.addToMulti(_multi, pendingEvent); + return; + } + + // Add to local buffer for micro-batching + this.pendingEvents.push(pendingEvent); + + // Check if we should flush immediately due to size + if (this.pendingEvents.length >= this.microBatchMaxSize) { + this.flushLocalBuffer(); + return; + } + + // Schedule flush if not already scheduled + if (!this.flushTimer) { + this.flushTimer = setTimeout(() => { + this.flushTimer = null; + this.flushLocalBuffer(); + }, this.microBatchIntervalMs); + } + } + + /** + * Add a single pending event to a multi pipeline. + * Used both for legacy _multi pattern and during batch flush. + */ + private addToMulti(multi: ReturnType, pending: PendingEvent) { + const { event, eventJson, eventWithTimestamp, type } = pending; + + if (type === 'screen_view' && event.session_id) { + const sessionKey = this.getLastScreenViewKeyBySession(event.session_id); + const profileKey = event.profile_id + ? this.getLastScreenViewKeyByProfile(event.project_id, event.profile_id) + : ''; + + this.evalScript( + multi, + 'addScreenView', + this.addScreenViewScript, + 4, + sessionKey, + profileKey, + this.queueKey, + this.bufferCounterKey, + eventWithTimestamp!, + '3600', + ); + } else if (type === 'session_end' && event.session_id) { + const sessionKey = this.getLastScreenViewKeyBySession(event.session_id); + const profileKey = event.profile_id + ? this.getLastScreenViewKeyByProfile(event.project_id, event.profile_id) + : ''; + + this.evalScript( + multi, + 'addSessionEnd', + this.addSessionEndScript, + 4, + sessionKey, + profileKey, + this.queueKey, + this.bufferCounterKey, + eventJson, + ); + } else { + // Regular events go directly to queue + multi.rpush(this.queueKey, eventJson).incr(this.bufferCounterKey); + } + + // Active visitor tracking (simplified - only ZADD, no redundant SET) + if (event.profile_id) { + this.incrementActiveVisitorCount( + multi, + event.project_id, + event.profile_id, + ); + } + } + + /** + * Force flush all pending events from local buffer to Redis immediately. + * Useful for testing or when you need to ensure all events are persisted. + */ + public async flush() { + // Clear any pending timer + if (this.flushTimer) { + clearTimeout(this.flushTimer); + this.flushTimer = null; + } + await this.flushLocalBuffer(); + } + + /** + * Flush all pending events from local buffer to Redis in a single pipeline. + * This is the core optimization - batching many events into one round-trip. + */ + private async flushLocalBuffer() { + if (this.isFlushing || this.pendingEvents.length === 0) { + return; + } + + this.isFlushing = true; + + // Grab current pending events and clear buffer + const eventsToFlush = this.pendingEvents; + this.pendingEvents = []; + try { const redis = getRedisCache(); - const eventJson = JSON.stringify(event); - const multi = _multi || redis.multi(); - - if (event.session_id && event.name === 'screen_view') { - // Handle screen_view - const sessionKey = this.getLastScreenViewKeyBySession(event.session_id); - const profileKey = event.profile_id - ? this.getLastScreenViewKeyByProfile( - event.project_id, - event.profile_id, - ) - : ''; - const timestamp = new Date(event.created_at || Date.now()).getTime(); - - // Combine event and timestamp into single JSON for atomic operations - const eventWithTimestamp = JSON.stringify({ - event: event, - ts: timestamp, - }); + const multi = redis.multi(); - this.evalScript( - multi, - 'addScreenView', - this.addScreenViewScript, - 4, - sessionKey, - profileKey, - this.queueKey, - this.bufferCounterKey, - eventWithTimestamp, - '3600', // 1 hour TTL - ); - } else if (event.session_id && event.name === 'session_end') { - // Handle session_end - const sessionKey = this.getLastScreenViewKeyBySession(event.session_id); - const profileKey = event.profile_id - ? this.getLastScreenViewKeyByProfile( - event.project_id, - event.profile_id, - ) - : ''; - - this.evalScript( - multi, - 'addSessionEnd', - this.addSessionEndScript, - 4, - sessionKey, - profileKey, - this.queueKey, - this.bufferCounterKey, - eventJson, - ); - } else { - // All other events go directly to queue - multi.rpush(this.queueKey, eventJson).incr(this.bufferCounterKey); + // Add all events to the pipeline + for (const pending of eventsToFlush) { + this.addToMulti(multi, pending); } - if (event.profile_id) { - this.incrementActiveVisitorCount( - multi, - event.project_id, - event.profile_id, - ); - } + await multi.exec(); - if (!_multi) { - await multi.exec(); + // Throttled publish - just signal that events were received + // Store the last event for publishing (we only need one to signal activity) + const lastEvent = eventsToFlush[eventsToFlush.length - 1]; + if (lastEvent) { + this.scheduleThrottledPublish(lastEvent.event); } - - await publishEvent('events', 'received', transformEvent(event)); } catch (error) { - this.logger.error('Failed to add event to Redis buffer', { error }); + this.logger.error('Failed to flush local buffer to Redis', { + error, + eventCount: eventsToFlush.length, + }); + } finally { + this.isFlushing = false; + } + } + + /** + * Throttled publish - publishes at most once per publishThrottleMs. + * Instead of publishing every event, we just signal that events were received. + * This reduces pub/sub load from 3000/s to 1/s. + */ + private scheduleThrottledPublish(event: IClickhouseEvent) { + // Always keep the latest event + this.pendingPublishEvent = event; + + const now = Date.now(); + const timeSinceLastPublish = now - this.lastPublishTime; + + // If enough time has passed, publish immediately + if (timeSinceLastPublish >= this.publishThrottleMs) { + this.executeThrottledPublish(); + return; + } + + // Otherwise, schedule a publish if not already scheduled + if (!this.publishTimer) { + const delay = this.publishThrottleMs - timeSinceLastPublish; + this.publishTimer = setTimeout(() => { + this.publishTimer = null; + this.executeThrottledPublish(); + }, delay); + } + } + + /** + * Execute the throttled publish with the latest pending event. + */ + private executeThrottledPublish() { + if (!this.pendingPublishEvent) { + return; + } + + const event = this.pendingPublishEvent; + this.pendingPublishEvent = null; + this.lastPublishTime = Date.now(); + + // Fire-and-forget publish (no multi = returns Promise) + const result = publishEvent('events', 'received', transformEvent(event)); + if (result instanceof Promise) { + result.catch(() => {}); } } @@ -440,18 +604,22 @@ return added }); } - private async incrementActiveVisitorCount( + /** + * Track active visitors using ZADD only. + * + * Optimization: Removed redundant heartbeat SET key. + * The ZADD score (timestamp) already tracks when a visitor was last seen. + * We use ZRANGEBYSCORE in getActiveVisitorCount to filter active visitors. + */ + private incrementActiveVisitorCount( multi: ReturnType, projectId: string, profileId: string, ) { - // Track active visitors and emit expiry events when inactive for TTL const now = Date.now(); const zsetKey = `live:visitors:${projectId}`; - const heartbeatKey = `live:visitor:${projectId}:${profileId}`; - return multi - .zadd(zsetKey, now, profileId) - .set(heartbeatKey, '1', 'EX', this.activeVisitorsExpiration); + // Only ZADD - the score is the timestamp, no need for separate heartbeat key + return multi.zadd(zsetKey, now, profileId); } public async getActiveVisitorCount(projectId: string): Promise { From bf39804767f30cb50478f183e978121d93d1bae8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Fri, 6 Feb 2026 13:03:14 +0000 Subject: [PATCH 3/7] remove unused file --- .../payments/scripts/assign-product-to-org.ts | 221 ------------------ 1 file changed, 221 deletions(-) delete mode 100644 packages/payments/scripts/assign-product-to-org.ts diff --git a/packages/payments/scripts/assign-product-to-org.ts b/packages/payments/scripts/assign-product-to-org.ts deleted file mode 100644 index 862c8ee39..000000000 --- a/packages/payments/scripts/assign-product-to-org.ts +++ /dev/null @@ -1,221 +0,0 @@ -import { db } from '@openpanel/db'; -import { Polar } from '@polar-sh/sdk'; -import inquirer from 'inquirer'; -import inquirerAutocomplete from 'inquirer-autocomplete-prompt'; -import { getSuccessUrl } from '..'; - -// Register the autocomplete prompt -inquirer.registerPrompt('autocomplete', inquirerAutocomplete); - -interface Answers { - isProduction: boolean; - polarApiKey: string; - productId: string; - organizationId: string; -} - -async function promptForInput() { - // Get all organizations first - const organizations = await db.organization.findMany({ - select: { - id: true, - name: true, - }, - }); - - // Step 1: Collect Polar credentials first - const polarCredentials = await inquirer.prompt<{ - isProduction: boolean; - polarApiKey: string; - polarOrganizationId: string; - }>([ - { - type: 'list', - name: 'isProduction', - message: 'Is this for production?', - choices: [ - { name: 'Yes', value: true }, - { name: 'No', value: false }, - ], - default: true, - }, - { - type: 'string', - name: 'polarApiKey', - message: 'Enter your Polar API key:', - validate: (input: string) => { - if (!input) return 'API key is required'; - return true; - }, - }, - ]); - - // Step 2: Initialize Polar client and fetch products - const polar = new Polar({ - accessToken: polarCredentials.polarApiKey, - server: polarCredentials.isProduction ? 'production' : 'sandbox', - }); - - console.log('Fetching products from Polar...'); - const productsResponse = await polar.products.list({ - limit: 100, - isArchived: false, - sorting: ['price_amount'], - }); - - const products = productsResponse.result.items; - - if (products.length === 0) { - throw new Error('No products found in Polar'); - } - - // Step 3: Continue with product selection and organization selection - const restOfAnswers = await inquirer.prompt<{ - productId: string; - organizationId: string; - }>([ - { - type: 'autocomplete', - name: 'productId', - message: 'Select product:', - source: (answersSoFar: any, input = '') => { - return products - .filter( - (product) => - product.name.toLowerCase().includes(input.toLowerCase()) || - product.id.toLowerCase().includes(input.toLowerCase()), - ) - .map((product) => { - const price = product.prices?.[0]; - const priceStr = - price && 'priceAmount' in price && price.priceAmount - ? `$${(price.priceAmount / 100).toFixed(2)}/${price.recurringInterval || 'month'}` - : 'No price'; - return { - name: `${product.name} (${priceStr})`, - value: product.id, - }; - }); - }, - }, - { - type: 'autocomplete', - name: 'organizationId', - message: 'Select organization:', - source: (answersSoFar: any, input = '') => { - return organizations - .filter( - (org) => - org.name.toLowerCase().includes(input.toLowerCase()) || - org.id.toLowerCase().includes(input.toLowerCase()), - ) - .map((org) => ({ - name: `${org.name} (${org.id})`, - value: org.id, - })); - }, - }, - ]); - - return { - ...polarCredentials, - ...restOfAnswers, - }; -} - -async function main() { - console.log('Assigning existing product to organization...'); - const input = await promptForInput(); - - const polar = new Polar({ - accessToken: input.polarApiKey, - server: input.isProduction ? 'production' : 'sandbox', - }); - - const organization = await db.organization.findUniqueOrThrow({ - where: { - id: input.organizationId, - }, - select: { - id: true, - name: true, - createdBy: { - select: { - id: true, - email: true, - firstName: true, - lastName: true, - }, - }, - projects: { - select: { - id: true, - }, - }, - }, - }); - - if (!organization.createdBy) { - throw new Error( - `Organization ${organization.name} does not have a creator. Cannot proceed.`, - ); - } - - const user = organization.createdBy; - - // Fetch product details for review - const product = await polar.products.get({ id: input.productId }); - const price = product.prices?.[0]; - const priceStr = - price && 'priceAmount' in price && price.priceAmount - ? `$${(price.priceAmount / 100).toFixed(2)}/${price.recurringInterval || 'month'}` - : 'No price'; - - console.log('\nReview the following settings:'); - console.table({ - product: product.name, - price: priceStr, - organization: organization.name, - email: user.email, - name: - [user.firstName, user.lastName].filter(Boolean).join(' ') || 'No name', - }); - - const { confirmed } = await inquirer.prompt([ - { - type: 'confirm', - name: 'confirmed', - message: 'Do you want to proceed?', - default: false, - }, - ]); - - if (!confirmed) { - console.log('Operation canceled'); - return; - } - - const checkoutLink = await polar.checkoutLinks.create({ - paymentProcessor: 'stripe', - productId: input.productId, - allowDiscountCodes: false, - metadata: { - organizationId: organization.id, - userId: user.id, - }, - successUrl: getSuccessUrl( - input.isProduction - ? 'https://dashboard.openpanel.dev' - : 'http://localhost:3000', - organization.id, - ), - }); - - console.log('\nCheckout link created:'); - console.table(checkoutLink); - console.log('\nProduct assigned successfully!'); -} - -main() - .catch(console.error) - .finally(() => db.$disconnect()); From bc08566cd4aae5af1968e82340752727df2d6481 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Fri, 6 Feb 2026 13:11:54 +0000 Subject: [PATCH 4/7] fix --- packages/db/src/buffers/event-buffer.ts | 150 ++++-------------------- 1 file changed, 20 insertions(+), 130 deletions(-) diff --git a/packages/db/src/buffers/event-buffer.ts b/packages/db/src/buffers/event-buffer.ts index 15d29b5b4..9c1e5eb92 100644 --- a/packages/db/src/buffers/event-buffer.ts +++ b/packages/db/src/buffers/event-buffer.ts @@ -14,9 +14,8 @@ import { import { BaseBuffer } from './base-buffer'; /** - * Simplified Event Buffer + * Event Buffer * - * Rules: * 1. All events go into a single list buffer (event_buffer:queue) * 2. screen_view events are handled specially: * - Store current screen_view as "last" for the session @@ -24,15 +23,8 @@ import { BaseBuffer } from './base-buffer'; * 3. session_end events: * - Retrieve the last screen_view (don't modify it) * - Push both screen_view and session_end to buffer - * 4. Flush: Simply process all events from the list buffer - * - * Optimizations: - * - Micro-batching: Events are buffered locally and flushed every 10ms to reduce Redis round-trips - * - Batched publishes: All PUBLISH commands are included in the multi pipeline - * - Simplified active visitor tracking: Only uses ZADD (removed redundant heartbeat SET) + * 4. Flush: Process all events from the list buffer */ - -// Pending event for local buffer interface PendingEvent { event: IClickhouseEvent; eventJson: string; @@ -41,7 +33,6 @@ interface PendingEvent { } export class EventBuffer extends BaseBuffer { - // Configurable limits private batchSize = process.env.EVENT_BUFFER_BATCH_SIZE ? Number.parseInt(process.env.EVENT_BUFFER_BATCH_SIZE, 10) : 4000; @@ -49,58 +40,48 @@ export class EventBuffer extends BaseBuffer { ? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10) : 1000; - // Micro-batching configuration private microBatchIntervalMs = process.env.EVENT_BUFFER_MICRO_BATCH_MS ? Number.parseInt(process.env.EVENT_BUFFER_MICRO_BATCH_MS, 10) - : 10; // Flush every 10ms by default + : 10; private microBatchMaxSize = process.env.EVENT_BUFFER_MICRO_BATCH_SIZE ? Number.parseInt(process.env.EVENT_BUFFER_MICRO_BATCH_SIZE, 10) - : 100; // Or when we hit 100 events + : 100; - // Local event buffer for micro-batching private pendingEvents: PendingEvent[] = []; private flushTimer: ReturnType | null = null; private isFlushing = false; + /** Tracks consecutive flush failures for observability; reset on success. */ + private flushRetryCount = 0; - // Throttled publish configuration private publishThrottleMs = process.env.EVENT_BUFFER_PUBLISH_THROTTLE_MS ? Number.parseInt(process.env.EVENT_BUFFER_PUBLISH_THROTTLE_MS, 10) - : 1000; // Publish at most once per second + : 1000; private lastPublishTime = 0; private pendingPublishEvent: IClickhouseEvent | null = null; private publishTimer: ReturnType | null = null; private activeVisitorsExpiration = 60 * 5; // 5 minutes - - // LIST - Stores all events ready to be flushed private queueKey = 'event_buffer:queue'; - - // STRING - Tracks total buffer size incrementally protected bufferCounterKey = 'event_buffer:total_count'; - // Script SHAs for loaded Lua scripts private scriptShas: { addScreenView?: string; addSessionEnd?: string; } = {}; - // Hash key for storing last screen_view per session private getLastScreenViewKeyBySession(sessionId: string) { return `event_buffer:last_screen_view:session:${sessionId}`; } - // Hash key for storing last screen_view per profile private getLastScreenViewKeyByProfile(projectId: string, profileId: string) { return `event_buffer:last_screen_view:profile:${projectId}:${profileId}`; } /** - * Lua script for handling screen_view addition - RACE-CONDITION SAFE without GroupMQ + * Lua script for screen_view addition. + * Uses GETDEL for atomic get-and-delete to prevent race conditions. * - * Strategy: Use Redis GETDEL (atomic get-and-delete) to ensure only ONE thread - * can process the "last" screen_view at a time. - * - * KEYS[1] = last screen_view key (by session) - stores both event and timestamp as JSON + * KEYS[1] = last screen_view key (by session) * KEYS[2] = last screen_view key (by profile, may be empty) * KEYS[3] = queue key * KEYS[4] = buffer counter key @@ -115,24 +96,18 @@ local counterKey = KEYS[4] local newEventData = ARGV[1] local ttl = tonumber(ARGV[2]) --- GETDEL is atomic: get previous and delete in one operation --- This ensures only ONE thread gets the previous event local previousEventData = redis.call("GETDEL", sessionKey) --- Store new screen_view as last for session redis.call("SET", sessionKey, newEventData, "EX", ttl) --- Store new screen_view as last for profile (if key provided) if profileKey and profileKey ~= "" then redis.call("SET", profileKey, newEventData, "EX", ttl) end --- If there was a previous screen_view, add it to queue with calculated duration if previousEventData then local prev = cjson.decode(previousEventData) local curr = cjson.decode(newEventData) - -- Calculate duration (ensure non-negative to handle clock skew) if prev.ts and curr.ts then prev.event.duration = math.max(0, curr.ts - prev.ts) end @@ -146,9 +121,8 @@ return 0 `; /** - * Lua script for handling session_end - RACE-CONDITION SAFE - * - * Uses GETDEL to atomically retrieve and delete the last screen_view + * Lua script for session_end. + * Uses GETDEL to atomically retrieve and delete the last screen_view. * * KEYS[1] = last screen_view key (by session) * KEYS[2] = last screen_view key (by profile, may be empty) @@ -163,11 +137,9 @@ local queueKey = KEYS[3] local counterKey = KEYS[4] local sessionEndJson = ARGV[1] --- GETDEL is atomic: only ONE thread gets the last screen_view local previousEventData = redis.call("GETDEL", sessionKey) local added = 0 --- If there was a previous screen_view, add it to queue if previousEventData then local prev = cjson.decode(previousEventData) redis.call("RPUSH", queueKey, cjson.encode(prev.event)) @@ -175,12 +147,10 @@ if previousEventData then added = added + 1 end --- Add session_end to queue redis.call("RPUSH", queueKey, sessionEndJson) redis.call("INCR", counterKey) added = added + 1 --- Delete profile key if profileKey and profileKey ~= "" then redis.call("DEL", profileKey) end @@ -195,14 +165,9 @@ return added await this.processBuffer(); }, }); - // Load Lua scripts into Redis on startup this.loadScripts(); } - /** - * Load Lua scripts into Redis and cache their SHAs. - * This avoids sending the entire script on every call. - */ private async loadScripts() { try { const redis = getRedisCache(); @@ -224,27 +189,14 @@ return added } bulkAdd(events: IClickhouseEvent[]) { - // Add all events to local buffer - they will be flushed together for (const event of events) { this.add(event); } } - /** - * Add an event into the local buffer for micro-batching. - * - * Events are buffered locally and flushed to Redis every microBatchIntervalMs - * or when microBatchMaxSize is reached. This dramatically reduces Redis round-trips. - * - * Logic: - * - screen_view: Store as "last" for session, flush previous if exists - * - session_end: Flush last screen_view + session_end - * - Other events: Add directly to queue - */ add(event: IClickhouseEvent, _multi?: ReturnType) { const eventJson = JSON.stringify(event); - // Determine event type and prepare data let type: PendingEvent['type'] = 'regular'; let eventWithTimestamp: string | undefined; @@ -266,22 +218,18 @@ return added type, }; - // If a multi was provided (legacy bulkAdd pattern), add directly without batching if (_multi) { this.addToMulti(_multi, pendingEvent); return; } - // Add to local buffer for micro-batching this.pendingEvents.push(pendingEvent); - // Check if we should flush immediately due to size if (this.pendingEvents.length >= this.microBatchMaxSize) { this.flushLocalBuffer(); return; } - // Schedule flush if not already scheduled if (!this.flushTimer) { this.flushTimer = setTimeout(() => { this.flushTimer = null; @@ -290,10 +238,6 @@ return added } } - /** - * Add a single pending event to a multi pipeline. - * Used both for legacy _multi pattern and during batch flush. - */ private addToMulti(multi: ReturnType, pending: PendingEvent) { const { event, eventJson, eventWithTimestamp, type } = pending; @@ -333,11 +277,9 @@ return added eventJson, ); } else { - // Regular events go directly to queue multi.rpush(this.queueKey, eventJson).incr(this.bufferCounterKey); } - // Active visitor tracking (simplified - only ZADD, no redundant SET) if (event.profile_id) { this.incrementActiveVisitorCount( multi, @@ -347,12 +289,7 @@ return added } } - /** - * Force flush all pending events from local buffer to Redis immediately. - * Useful for testing or when you need to ensure all events are persisted. - */ public async flush() { - // Clear any pending timer if (this.flushTimer) { clearTimeout(this.flushTimer); this.flushTimer = null; @@ -360,10 +297,6 @@ return added await this.flushLocalBuffer(); } - /** - * Flush all pending events from local buffer to Redis in a single pipeline. - * This is the core optimization - batching many events into one round-trip. - */ private async flushLocalBuffer() { if (this.isFlushing || this.pendingEvents.length === 0) { return; @@ -371,7 +304,6 @@ return added this.isFlushing = true; - // Grab current pending events and clear buffer const eventsToFlush = this.pendingEvents; this.pendingEvents = []; @@ -379,48 +311,44 @@ return added const redis = getRedisCache(); const multi = redis.multi(); - // Add all events to the pipeline for (const pending of eventsToFlush) { this.addToMulti(multi, pending); } await multi.exec(); - // Throttled publish - just signal that events were received - // Store the last event for publishing (we only need one to signal activity) + this.flushRetryCount = 0; + const lastEvent = eventsToFlush[eventsToFlush.length - 1]; if (lastEvent) { this.scheduleThrottledPublish(lastEvent.event); } } catch (error) { - this.logger.error('Failed to flush local buffer to Redis', { + // Re-queue failed events at the front to preserve order and avoid data loss + this.pendingEvents = eventsToFlush.concat(this.pendingEvents); + + this.flushRetryCount += 1; + this.logger.warn('Failed to flush local buffer to Redis; events re-queued', { error, eventCount: eventsToFlush.length, + flushRetryCount: this.flushRetryCount, }); } finally { this.isFlushing = false; } } - /** - * Throttled publish - publishes at most once per publishThrottleMs. - * Instead of publishing every event, we just signal that events were received. - * This reduces pub/sub load from 3000/s to 1/s. - */ private scheduleThrottledPublish(event: IClickhouseEvent) { - // Always keep the latest event this.pendingPublishEvent = event; const now = Date.now(); const timeSinceLastPublish = now - this.lastPublishTime; - // If enough time has passed, publish immediately if (timeSinceLastPublish >= this.publishThrottleMs) { this.executeThrottledPublish(); return; } - // Otherwise, schedule a publish if not already scheduled if (!this.publishTimer) { const delay = this.publishThrottleMs - timeSinceLastPublish; this.publishTimer = setTimeout(() => { @@ -430,9 +358,6 @@ return added } } - /** - * Execute the throttled publish with the latest pending event. - */ private executeThrottledPublish() { if (!this.pendingPublishEvent) { return; @@ -442,17 +367,12 @@ return added this.pendingPublishEvent = null; this.lastPublishTime = Date.now(); - // Fire-and-forget publish (no multi = returns Promise) const result = publishEvent('events', 'received', transformEvent(event)); if (result instanceof Promise) { result.catch(() => {}); } } - /** - * Execute a Lua script using EVALSHA (cached) or fallback to EVAL. - * This avoids sending the entire script on every call. - */ private evalScript( multi: ReturnType, scriptName: keyof typeof this.scriptShas, @@ -463,32 +383,18 @@ return added const sha = this.scriptShas[scriptName]; if (sha) { - // Use EVALSHA with cached SHA multi.evalsha(sha, numKeys, ...args); } else { - // Fallback to EVAL and try to reload script multi.eval(scriptContent, numKeys, ...args); this.logger.warn(`Script ${scriptName} not loaded, using EVAL fallback`); - // Attempt to reload scripts in background this.loadScripts(); } } - /** - * Process the Redis buffer - simplified version. - * - * Simply: - * 1. Fetch events from the queue (up to batchSize) - * 2. Parse and sort them - * 3. Insert into ClickHouse in chunks - * 4. Publish saved events - * 5. Clean up processed events from queue - */ async processBuffer() { const redis = getRedisCache(); try { - // Fetch events from queue const queueEvents = await redis.lrange( this.queueKey, 0, @@ -500,7 +406,6 @@ return added return; } - // Parse events const eventsToClickhouse: IClickhouseEvent[] = []; for (const eventStr of queueEvents) { const event = getSafeJson(eventStr); @@ -514,14 +419,12 @@ return added return; } - // Sort events by creation time eventsToClickhouse.sort( (a, b) => new Date(a.created_at || 0).getTime() - new Date(b.created_at || 0).getTime(), ); - // Insert events into ClickHouse in chunks this.logger.info('Inserting events into ClickHouse', { totalEvents: eventsToClickhouse.length, chunks: Math.ceil(eventsToClickhouse.length / this.chunkSize), @@ -535,14 +438,12 @@ return added }); } - // Publish "saved" events const pubMulti = getRedisPub().multi(); for (const event of eventsToClickhouse) { await publishEvent('events', 'saved', transformEvent(event), pubMulti); } await pubMulti.exec(); - // Clean up processed events from queue await redis .multi() .ltrim(this.queueKey, queueEvents.length, -1) @@ -558,9 +459,6 @@ return added } } - /** - * Retrieve the latest screen_view event for a given session or profile - */ public async getLastScreenView( params: | { @@ -604,13 +502,6 @@ return added }); } - /** - * Track active visitors using ZADD only. - * - * Optimization: Removed redundant heartbeat SET key. - * The ZADD score (timestamp) already tracks when a visitor was last seen. - * We use ZRANGEBYSCORE in getActiveVisitorCount to filter active visitors. - */ private incrementActiveVisitorCount( multi: ReturnType, projectId: string, @@ -618,7 +509,6 @@ return added ) { const now = Date.now(); const zsetKey = `live:visitors:${projectId}`; - // Only ZADD - the score is the timestamp, no need for separate heartbeat key return multi.zadd(zsetKey, now, profileId); } From a672b73947f875560dd7961a7c86318e0a135c25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Wed, 11 Mar 2026 23:28:20 +0100 Subject: [PATCH 5/7] wip --- apps/api/src/controllers/live.controller.ts | 96 +++++++++--------- .../src/components/events/event-listener.tsx | 37 ++----- .../src/components/events/table/index.tsx | 9 +- .../onboarding/onboarding-verify-listener.tsx | 32 ++---- .../realtime/realtime-active-sessions.tsx | 98 ++++++++----------- ...ationId.$projectId.events._tabs.events.tsx | 2 +- ...pp.$organizationId.$projectId.realtime.tsx | 23 +++-- .../_steps.onboarding.$projectId.verify.tsx | 28 ++---- packages/db/src/buffers/event-buffer.ts | 96 +++++++----------- packages/redis/publisher.ts | 3 +- packages/trpc/src/routers/realtime.ts | 50 ++++------ 11 files changed, 196 insertions(+), 278 deletions(-) diff --git a/apps/api/src/controllers/live.controller.ts b/apps/api/src/controllers/live.controller.ts index cd7afe914..332968ad6 100644 --- a/apps/api/src/controllers/live.controller.ts +++ b/apps/api/src/controllers/live.controller.ts @@ -1,16 +1,13 @@ -import type { FastifyRequest } from 'fastify'; -import superjson from 'superjson'; - import type { WebSocket } from '@fastify/websocket'; -import { - eventBuffer, - getProfileById, - transformMinimalEvent, -} from '@openpanel/db'; +import { eventBuffer } from '@openpanel/db'; import { setSuperJson } from '@openpanel/json'; -import { subscribeToPublishedEvent } from '@openpanel/redis'; +import { + psubscribeToPublishedEvent, + subscribeToPublishedEvent, +} from '@openpanel/redis'; import { getProjectAccess } from '@openpanel/trpc'; import { getOrganizationAccess } from '@openpanel/trpc/src/access'; +import type { FastifyRequest } from 'fastify'; export function wsVisitors( socket: WebSocket, @@ -18,19 +15,38 @@ export function wsVisitors( Params: { projectId: string; }; - }>, + }> ) { const { params } = req; - const unsubscribe = subscribeToPublishedEvent('events', 'saved', (event) => { - if (event?.projectId === params.projectId) { - eventBuffer.getActiveVisitorCount(params.projectId).then((count) => { - socket.send(String(count)); - }); + const sendCount = () => { + eventBuffer.getActiveVisitorCount(params.projectId).then((count) => { + socket.send(String(count)); + }); + }; + + const unsubscribe = subscribeToPublishedEvent( + 'events', + 'batch', + ({ projectId }) => { + if (projectId === params.projectId) { + sendCount(); + } } - }); + ); + + const punsubscribe = psubscribeToPublishedEvent( + '__keyevent@0__:expired', + (key) => { + const [, , projectId] = key.split(':'); + if (projectId === params.projectId) { + sendCount(); + } + } + ); socket.on('close', () => { unsubscribe(); + punsubscribe(); }); } @@ -42,18 +58,10 @@ export async function wsProjectEvents( }; Querystring: { token?: string; - type?: 'saved' | 'received'; }; - }>, + }> ) { - const { params, query } = req; - const type = query.type || 'saved'; - - if (!['saved', 'received'].includes(type)) { - socket.send('Invalid type'); - socket.close(); - return; - } + const { params } = req; const userId = req.session?.userId; if (!userId) { @@ -67,24 +75,20 @@ export async function wsProjectEvents( projectId: params.projectId, }); + if (!access) { + socket.send('No access'); + socket.close(); + return; + } + const unsubscribe = subscribeToPublishedEvent( 'events', - type, - async (event) => { - if (event.projectId === params.projectId) { - const profile = await getProfileById(event.profileId, event.projectId); - socket.send( - superjson.stringify( - access - ? { - ...event, - profile, - } - : transformMinimalEvent(event), - ), - ); + 'batch', + ({ projectId, count }) => { + if (projectId === params.projectId) { + socket.send(setSuperJson({ count })); } - }, + } ); socket.on('close', () => unsubscribe()); @@ -96,7 +100,7 @@ export async function wsProjectNotifications( Params: { projectId: string; }; - }>, + }> ) { const { params } = req; const userId = req.session?.userId; @@ -123,9 +127,9 @@ export async function wsProjectNotifications( 'created', (notification) => { if (notification.projectId === params.projectId) { - socket.send(superjson.stringify(notification)); + socket.send(setSuperJson(notification)); } - }, + } ); socket.on('close', () => unsubscribe()); @@ -137,7 +141,7 @@ export async function wsOrganizationEvents( Params: { organizationId: string; }; - }>, + }> ) { const { params } = req; const userId = req.session?.userId; @@ -164,7 +168,7 @@ export async function wsOrganizationEvents( 'subscription_updated', (message) => { socket.send(setSuperJson(message)); - }, + } ); socket.on('close', () => unsubscribe()); diff --git a/apps/start/src/components/events/event-listener.tsx b/apps/start/src/components/events/event-listener.tsx index defabb7d1..08df9b61d 100644 --- a/apps/start/src/components/events/event-listener.tsx +++ b/apps/start/src/components/events/event-listener.tsx @@ -1,3 +1,4 @@ +import { AnimatedNumber } from '../animated-number'; import { Tooltip, TooltipContent, @@ -8,71 +9,53 @@ import { useDebounceState } from '@/hooks/use-debounce-state'; import useWS from '@/hooks/use-ws'; import { cn } from '@/utils/cn'; -import type { IServiceEvent, IServiceEventMinimal } from '@openpanel/db'; -import { useParams } from '@tanstack/react-router'; -import { AnimatedNumber } from '../animated-number'; - export default function EventListener({ onRefresh, }: { onRefresh: () => void; }) { - const params = useParams({ - strict: false, - }); const { projectId } = useAppParams(); const counter = useDebounceState(0, 1000); - useWS( + useWS<{ count: number }>( `/live/events/${projectId}`, - (event) => { - if (event) { - const isProfilePage = !!params?.profileId; - if (isProfilePage) { - const profile = 'profile' in event ? event.profile : null; - if (profile?.id === params?.profileId) { - counter.set((prev) => prev + 1); - } - return; - } - - counter.set((prev) => prev + 1); - } + ({ count }) => { + counter.set((prev) => prev + count); }, { debounce: { delay: 1000, maxWait: 5000, }, - }, + } ); return ( diff --git a/apps/start/src/components/events/table/index.tsx b/apps/start/src/components/events/table/index.tsx index 300713997..95ed8e2d7 100644 --- a/apps/start/src/components/events/table/index.tsx +++ b/apps/start/src/components/events/table/index.tsx @@ -35,6 +35,7 @@ type Props = { >, unknown >; + showEventListener?: boolean; }; const LOADING_DATA = [{}, {}, {}, {}, {}, {}, {}, {}, {}] as IServiceEvent[]; @@ -215,7 +216,7 @@ const VirtualizedEventsTable = ({ ); }; -export const EventsTable = ({ query }: Props) => { +export const EventsTable = ({ query, showEventListener = false }: Props) => { const { isLoading } = query; const columns = useColumns(); @@ -272,7 +273,7 @@ export const EventsTable = ({ query }: Props) => { return ( <> - +
{ function EventsTableToolbar({ query, table, + showEventListener, }: { query: Props['query']; table: Table; + showEventListener: boolean; }) { const { projectId } = useAppParams(); const [startDate, setStartDate] = useQueryState( @@ -305,7 +308,7 @@ function EventsTableToolbar({ return (
- query.refetch()} /> + {showEventListener && query.refetch()} />} - + )} + + +
+ {createdAt.toLocaleTimeString()} +
+
+
+ ); } diff --git a/apps/start/src/components/events/table/columns.tsx b/apps/start/src/components/events/table/columns.tsx index 203d4f16c..b14fde859 100644 --- a/apps/start/src/components/events/table/columns.tsx +++ b/apps/start/src/components/events/table/columns.tsx @@ -1,15 +1,14 @@ +import type { IServiceEvent } from '@openpanel/db'; +import type { ColumnDef } from '@tanstack/react-table'; +import { ColumnCreatedAt } from '@/components/column-created-at'; import { EventIcon } from '@/components/events/event-icon'; import { ProjectLink } from '@/components/links'; +import { ProfileAvatar } from '@/components/profiles/profile-avatar'; import { SerieIcon } from '@/components/report-chart/common/serie-icon'; +import { KeyValueGrid } from '@/components/ui/key-value-grid'; import { useNumber } from '@/hooks/use-numer-formatter'; import { pushModal } from '@/modals'; import { getProfileName } from '@/utils/getters'; -import type { ColumnDef } from '@tanstack/react-table'; - -import { ColumnCreatedAt } from '@/components/column-created-at'; -import { ProfileAvatar } from '@/components/profiles/profile-avatar'; -import { KeyValueGrid } from '@/components/ui/key-value-grid'; -import type { IServiceEvent } from '@openpanel/db'; export function useColumns() { const number = useNumber(); @@ -28,17 +27,24 @@ export function useColumns() { accessorKey: 'name', header: 'Name', cell({ row }) { - const { name, path, duration, properties, revenue } = row.original; + const { name, path, revenue } = row.original; + const fullTitle = + name === 'screen_view' + ? path + : name === 'revenue' && revenue + ? `${name} (${number.currency(revenue / 100)})` + : name.replace(/_/g, ' '); + const renderName = () => { if (name === 'screen_view') { if (path.includes('/')) { - return {path}; + return path; } return ( <> Screen: - {path} + {path} ); } @@ -50,38 +56,27 @@ export function useColumns() { return name.replace(/_/g, ' '); }; - const renderDuration = () => { - if (name === 'screen_view') { - return ( - - {number.shortWithUnit(duration / 1000, 'min')} - - ); - } - - return null; - }; - return ( -
+
- + - {renderDuration()}
); @@ -107,8 +101,8 @@ export function useColumns() { if (profile) { return ( {getProfileName(profile)} @@ -119,8 +113,8 @@ export function useColumns() { if (profileId && profileId !== deviceId) { return ( Unknown @@ -130,8 +124,8 @@ export function useColumns() { if (deviceId) { return ( Anonymous @@ -152,10 +146,10 @@ export function useColumns() { const { sessionId } = row.original; return ( - {sessionId.slice(0,6)} + {sessionId.slice(0, 6)} ); }, @@ -175,7 +169,7 @@ export function useColumns() { cell({ row }) { const { country, city } = row.original; return ( -
+
{city}
@@ -189,7 +183,7 @@ export function useColumns() { cell({ row }) { const { os } = row.original; return ( -
+
{os}
@@ -203,7 +197,7 @@ export function useColumns() { cell({ row }) { const { browser } = row.original; return ( -
+
{browser}
@@ -221,14 +215,14 @@ export function useColumns() { const { properties } = row.original; const filteredProperties = Object.fromEntries( Object.entries(properties || {}).filter( - ([key]) => !key.startsWith('__'), - ), + ([key]) => !key.startsWith('__') + ) ); const items = Object.entries(filteredProperties); const limit = 2; const data = items.slice(0, limit).map(([key, value]) => ({ name: key, - value: value, + value, })); if (items.length > limit) { data.push({ diff --git a/biome.json b/biome.json index 9e7b023e8..9f718ec12 100644 --- a/biome.json +++ b/biome.json @@ -71,7 +71,8 @@ "noDangerouslySetInnerHtml": "off" }, "complexity": { - "noForEach": "off" + "noForEach": "off", + "noExcessiveCognitiveComplexity": "off" } } }, diff --git a/packages/db/src/buffers/event-buffer.test.ts b/packages/db/src/buffers/event-buffer.test.ts index 50600c704..178f94545 100644 --- a/packages/db/src/buffers/event-buffer.test.ts +++ b/packages/db/src/buffers/event-buffer.test.ts @@ -2,42 +2,8 @@ import { getRedisCache } from '@openpanel/redis'; import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest'; import { ch } from '../clickhouse/client'; -// Mock transformEvent to avoid circular dependency with buffers -> services -> buffers -vi.mock('../services/event.service', () => ({ - transformEvent: (event: any) => ({ - id: event.id ?? 'id', - name: event.name, - deviceId: event.device_id, - profileId: event.profile_id, - projectId: event.project_id, - sessionId: event.session_id, - properties: event.properties ?? {}, - createdAt: new Date(event.created_at ?? Date.now()), - country: event.country, - city: event.city, - region: event.region, - longitude: event.longitude, - latitude: event.latitude, - os: event.os, - osVersion: event.os_version, - browser: event.browser, - browserVersion: event.browser_version, - device: event.device, - brand: event.brand, - model: event.model, - duration: event.duration ?? 0, - path: event.path ?? '', - origin: event.origin ?? '', - referrer: event.referrer, - referrerName: event.referrer_name, - referrerType: event.referrer_type, - meta: event.meta, - importedAt: undefined, - sdkName: event.sdk_name, - sdkVersion: event.sdk_version, - profile: event.profile, - }), -})); +// Break circular dep: event-buffer -> event.service -> buffers/index -> EventBuffer +vi.mock('../services/event.service', () => ({})); import { EventBuffer } from './event-buffer'; @@ -68,19 +34,16 @@ describe('EventBuffer', () => { created_at: new Date().toISOString(), } as any; - // Get initial count const initialCount = await eventBuffer.getBufferSize(); - // Add event and flush (events are micro-batched) eventBuffer.add(event); await eventBuffer.flush(); - // Buffer counter should increase by 1 const newCount = await eventBuffer.getBufferSize(); expect(newCount).toBe(initialCount + 1); }); - it('adds multiple screen_views - moves previous to buffer with duration', async () => { + it('adds screen_view directly to buffer queue', async () => { const t0 = Date.now(); const sessionId = 'session_1'; @@ -100,63 +63,23 @@ describe('EventBuffer', () => { created_at: new Date(t0 + 1000).toISOString(), } as any; - const view3 = { - project_id: 'p1', - profile_id: 'u1', - session_id: sessionId, - name: 'screen_view', - created_at: new Date(t0 + 3000).toISOString(), - } as any; - - // Add first screen_view const count1 = await eventBuffer.getBufferSize(); + eventBuffer.add(view1); await eventBuffer.flush(); - // Should be stored as "last" but NOT in queue yet + // screen_view goes directly to buffer const count2 = await eventBuffer.getBufferSize(); - expect(count2).toBe(count1); // No change in buffer - - // Last screen_view should be retrievable - const last1 = await eventBuffer.getLastScreenView({ - projectId: 'p1', - sessionId: sessionId, - }); - expect(last1).not.toBeNull(); - expect(last1!.createdAt.toISOString()).toBe(view1.created_at); + expect(count2).toBe(count1 + 1); - // Add second screen_view eventBuffer.add(view2); await eventBuffer.flush(); - // Now view1 should be in buffer const count3 = await eventBuffer.getBufferSize(); - expect(count3).toBe(count1 + 1); - - // view2 should now be the "last" - const last2 = await eventBuffer.getLastScreenView({ - projectId: 'p1', - sessionId: sessionId, - }); - expect(last2!.createdAt.toISOString()).toBe(view2.created_at); - - // Add third screen_view - eventBuffer.add(view3); - await eventBuffer.flush(); - - // Now view2 should also be in buffer - const count4 = await eventBuffer.getBufferSize(); - expect(count4).toBe(count1 + 2); - - // view3 should now be the "last" - const last3 = await eventBuffer.getLastScreenView({ - projectId: 'p1', - sessionId: sessionId, - }); - expect(last3!.createdAt.toISOString()).toBe(view3.created_at); + expect(count3).toBe(count1 + 2); }); - it('adds session_end - moves last screen_view and session_end to buffer', async () => { + it('adds session_end directly to buffer queue', async () => { const t0 = Date.now(); const sessionId = 'session_2'; @@ -176,134 +99,36 @@ describe('EventBuffer', () => { created_at: new Date(t0 + 5000).toISOString(), } as any; - // Add screen_view const count1 = await eventBuffer.getBufferSize(); - eventBuffer.add(view); - await eventBuffer.flush(); - - // Should be stored as "last", not in buffer yet - const count2 = await eventBuffer.getBufferSize(); - expect(count2).toBe(count1); - - // Add session_end - eventBuffer.add(sessionEnd); - await eventBuffer.flush(); - - // Both should now be in buffer (+2) - const count3 = await eventBuffer.getBufferSize(); - expect(count3).toBe(count1 + 2); - - // Last screen_view should be cleared - const last = await eventBuffer.getLastScreenView({ - projectId: 'p2', - sessionId: sessionId, - }); - expect(last).toBeNull(); - }); - - it('session_end with no previous screen_view - only adds session_end to buffer', async () => { - const sessionId = 'session_3'; - - const sessionEnd = { - project_id: 'p3', - profile_id: 'u3', - session_id: sessionId, - name: 'session_end', - created_at: new Date().toISOString(), - } as any; - const count1 = await eventBuffer.getBufferSize(); + eventBuffer.add(view); eventBuffer.add(sessionEnd); await eventBuffer.flush(); - // Only session_end should be in buffer (+1) const count2 = await eventBuffer.getBufferSize(); - expect(count2).toBe(count1 + 1); - }); - - it('gets last screen_view by profileId', async () => { - const view = { - project_id: 'p4', - profile_id: 'u4', - session_id: 'session_4', - name: 'screen_view', - path: '/home', - created_at: new Date().toISOString(), - } as any; - - eventBuffer.add(view); - await eventBuffer.flush(); - - // Query by profileId - const result = await eventBuffer.getLastScreenView({ - projectId: 'p4', - profileId: 'u4', - }); - - expect(result).not.toBeNull(); - expect(result!.name).toBe('screen_view'); - expect(result!.path).toBe('/home'); - }); - - it('gets last screen_view by sessionId', async () => { - const sessionId = 'session_5'; - const view = { - project_id: 'p5', - profile_id: 'u5', - session_id: sessionId, - name: 'screen_view', - path: '/about', - created_at: new Date().toISOString(), - } as any; - - eventBuffer.add(view); - await eventBuffer.flush(); - - // Query by sessionId - const result = await eventBuffer.getLastScreenView({ - projectId: 'p5', - sessionId: sessionId, - }); - - expect(result).not.toBeNull(); - expect(result!.name).toBe('screen_view'); - expect(result!.path).toBe('/about'); - }); - - it('returns null for non-existent last screen_view', async () => { - const result = await eventBuffer.getLastScreenView({ - projectId: 'p_nonexistent', - profileId: 'u_nonexistent', - }); - - expect(result).toBeNull(); + expect(count2).toBe(count1 + 2); }); it('gets buffer count correctly', async () => { - // Initially 0 expect(await eventBuffer.getBufferSize()).toBe(0); - // Add regular event eventBuffer.add({ project_id: 'p6', name: 'event1', created_at: new Date().toISOString(), } as any); await eventBuffer.flush(); - expect(await eventBuffer.getBufferSize()).toBe(1); - // Add another regular event eventBuffer.add({ project_id: 'p6', name: 'event2', created_at: new Date().toISOString(), } as any); await eventBuffer.flush(); - expect(await eventBuffer.getBufferSize()).toBe(2); - // Add screen_view (not counted until flushed) + // screen_view also goes directly to buffer eventBuffer.add({ project_id: 'p6', profile_id: 'u6', @@ -312,21 +137,6 @@ describe('EventBuffer', () => { created_at: new Date().toISOString(), } as any); await eventBuffer.flush(); - - // Still 2 (screen_view is pending) - expect(await eventBuffer.getBufferSize()).toBe(2); - - // Add another screen_view (first one gets flushed) - eventBuffer.add({ - project_id: 'p6', - profile_id: 'u6', - session_id: 'session_6', - name: 'screen_view', - created_at: new Date(Date.now() + 1000).toISOString(), - } as any); - await eventBuffer.flush(); - - // Now 3 (2 regular + 1 flushed screen_view) expect(await eventBuffer.getBufferSize()).toBe(3); }); @@ -355,14 +165,12 @@ describe('EventBuffer', () => { await eventBuffer.processBuffer(); - // Should insert both events expect(insertSpy).toHaveBeenCalled(); const callArgs = insertSpy.mock.calls[0]![0]; expect(callArgs.format).toBe('JSONEachRow'); expect(callArgs.table).toBe('events'); expect(Array.isArray(callArgs.values)).toBe(true); - // Buffer should be empty after processing expect(await eventBuffer.getBufferSize()).toBe(0); insertSpy.mockRestore(); @@ -373,7 +181,6 @@ describe('EventBuffer', () => { process.env.EVENT_BUFFER_CHUNK_SIZE = '2'; const eb = new EventBuffer(); - // Add 4 events for (let i = 0; i < 4; i++) { eb.add({ project_id: 'p8', @@ -389,14 +196,12 @@ describe('EventBuffer', () => { await eb.processBuffer(); - // With chunk size 2 and 4 events, should be called twice expect(insertSpy).toHaveBeenCalledTimes(2); const call1Values = insertSpy.mock.calls[0]![0].values as any[]; const call2Values = insertSpy.mock.calls[1]![0].values as any[]; expect(call1Values.length).toBe(2); expect(call2Values.length).toBe(2); - // Restore if (prev === undefined) delete process.env.EVENT_BUFFER_CHUNK_SIZE; else process.env.EVENT_BUFFER_CHUNK_SIZE = prev; @@ -418,126 +223,54 @@ describe('EventBuffer', () => { expect(count).toBeGreaterThanOrEqual(1); }); - it('handles multiple sessions independently', async () => { + it('handles multiple sessions independently — all events go to buffer', async () => { const t0 = Date.now(); + const count1 = await eventBuffer.getBufferSize(); - // Session 1 - const view1a = { + eventBuffer.add({ project_id: 'p10', profile_id: 'u10', session_id: 'session_10a', name: 'screen_view', created_at: new Date(t0).toISOString(), - } as any; - - const view1b = { - project_id: 'p10', - profile_id: 'u10', - session_id: 'session_10a', - name: 'screen_view', - created_at: new Date(t0 + 1000).toISOString(), - } as any; - - // Session 2 - const view2a = { + } as any); + eventBuffer.add({ project_id: 'p10', profile_id: 'u11', session_id: 'session_10b', name: 'screen_view', created_at: new Date(t0).toISOString(), - } as any; - - const view2b = { + } as any); + eventBuffer.add({ + project_id: 'p10', + profile_id: 'u10', + session_id: 'session_10a', + name: 'screen_view', + created_at: new Date(t0 + 1000).toISOString(), + } as any); + eventBuffer.add({ project_id: 'p10', profile_id: 'u11', session_id: 'session_10b', name: 'screen_view', created_at: new Date(t0 + 2000).toISOString(), - } as any; - - eventBuffer.add(view1a); - eventBuffer.add(view2a); - eventBuffer.add(view1b); // Flushes view1a - eventBuffer.add(view2b); // Flushes view2a + } as any); await eventBuffer.flush(); - // Should have 2 events in buffer (one from each session) - expect(await eventBuffer.getBufferSize()).toBe(2); - - // Each session should have its own "last" screen_view - const last1 = await eventBuffer.getLastScreenView({ - projectId: 'p10', - sessionId: 'session_10a', - }); - expect(last1!.createdAt.toISOString()).toBe(view1b.created_at); - - const last2 = await eventBuffer.getLastScreenView({ - projectId: 'p10', - sessionId: 'session_10b', - }); - expect(last2!.createdAt.toISOString()).toBe(view2b.created_at); + // All 4 events are in buffer directly + expect(await eventBuffer.getBufferSize()).toBe(count1 + 4); }); - it('screen_view without session_id goes directly to buffer', async () => { - const view = { + it('bulk adds events to buffer', async () => { + const events = Array.from({ length: 5 }, (_, i) => ({ project_id: 'p11', - profile_id: 'u11', - name: 'screen_view', - created_at: new Date().toISOString(), - } as any; - - const count1 = await eventBuffer.getBufferSize(); - eventBuffer.add(view); - await eventBuffer.flush(); + name: `event${i}`, + created_at: new Date(Date.now() + i).toISOString(), + })) as any[]; - // Should go directly to buffer (no session_id) - const count2 = await eventBuffer.getBufferSize(); - expect(count2).toBe(count1 + 1); - }); - - it('updates last screen_view when new one arrives from same profile but different session', async () => { - const t0 = Date.now(); - - const view1 = { - project_id: 'p12', - profile_id: 'u12', - session_id: 'session_12a', - name: 'screen_view', - path: '/page1', - created_at: new Date(t0).toISOString(), - } as any; - - const view2 = { - project_id: 'p12', - profile_id: 'u12', - session_id: 'session_12b', // Different session! - name: 'screen_view', - path: '/page2', - created_at: new Date(t0 + 1000).toISOString(), - } as any; - - eventBuffer.add(view1); - eventBuffer.add(view2); + eventBuffer.bulkAdd(events); await eventBuffer.flush(); - // Both sessions should have their own "last" - const lastSession1 = await eventBuffer.getLastScreenView({ - projectId: 'p12', - sessionId: 'session_12a', - }); - expect(lastSession1!.path).toBe('/page1'); - - const lastSession2 = await eventBuffer.getLastScreenView({ - projectId: 'p12', - sessionId: 'session_12b', - }); - expect(lastSession2!.path).toBe('/page2'); - - // Profile should have the latest one - const lastProfile = await eventBuffer.getLastScreenView({ - projectId: 'p12', - profileId: 'u12', - }); - expect(lastProfile!.path).toBe('/page2'); + expect(await eventBuffer.getBufferSize()).toBe(5); }); }); diff --git a/packages/db/src/buffers/event-buffer.ts b/packages/db/src/buffers/event-buffer.ts index 306b69d1b..6b5dc8ca7 100644 --- a/packages/db/src/buffers/event-buffer.ts +++ b/packages/db/src/buffers/event-buffer.ts @@ -5,32 +5,9 @@ import { publishEvent, } from '@openpanel/redis'; import { ch } from '../clickhouse/client'; -import { - type IClickhouseEvent, - type IServiceEvent, - transformEvent, -} from '../services/event.service'; +import { type IClickhouseEvent } from '../services/event.service'; import { BaseBuffer } from './base-buffer'; -/** - * Event Buffer - * - * 1. All events go into a single list buffer (event_buffer:queue) - * 2. screen_view events are handled specially: - * - Store current screen_view as "last" for the session - * - When a new screen_view arrives, flush the previous one with calculated duration - * 3. session_end events: - * - Retrieve the last screen_view (don't modify it) - * - Push both screen_view and session_end to buffer - * 4. Flush: Process all events from the list buffer - */ -interface PendingEvent { - event: IClickhouseEvent; - eventJson: string; - eventWithTimestamp?: string; - type: 'regular' | 'screen_view' | 'session_end'; -} - export class EventBuffer extends BaseBuffer { private batchSize = process.env.EVENT_BUFFER_BATCH_SIZE ? Number.parseInt(process.env.EVENT_BUFFER_BATCH_SIZE, 10) @@ -46,7 +23,7 @@ export class EventBuffer extends BaseBuffer { ? Number.parseInt(process.env.EVENT_BUFFER_MICRO_BATCH_SIZE, 10) : 100; - private pendingEvents: PendingEvent[] = []; + private pendingEvents: IClickhouseEvent[] = []; private flushTimer: ReturnType | null = null; private isFlushing = false; /** Tracks consecutive flush failures for observability; reset on success. */ @@ -59,100 +36,6 @@ export class EventBuffer extends BaseBuffer { private queueKey = 'event_buffer:queue'; protected bufferCounterKey = 'event_buffer:total_count'; - private scriptShas: { - addScreenView?: string; - addSessionEnd?: string; - } = {}; - - private getLastScreenViewKeyBySession(sessionId: string) { - return `event_buffer:last_screen_view:session:${sessionId}`; - } - - private getLastScreenViewKeyByProfile(projectId: string, profileId: string) { - return `event_buffer:last_screen_view:profile:${projectId}:${profileId}`; - } - - /** - * Lua script for screen_view addition. - * Uses GETDEL for atomic get-and-delete to prevent race conditions. - * - * KEYS[1] = last screen_view key (by session) - * KEYS[2] = last screen_view key (by profile, may be empty) - * KEYS[3] = queue key - * KEYS[4] = buffer counter key - * ARGV[1] = new event with timestamp as JSON: {"event": {...}, "ts": 123456} - * ARGV[2] = TTL for last screen_view (1 hour) - */ - private readonly addScreenViewScript = ` -local sessionKey = KEYS[1] -local profileKey = KEYS[2] -local queueKey = KEYS[3] -local counterKey = KEYS[4] -local newEventData = ARGV[1] -local ttl = tonumber(ARGV[2]) - -local previousEventData = redis.call("GETDEL", sessionKey) - -redis.call("SET", sessionKey, newEventData, "EX", ttl) - -if profileKey and profileKey ~= "" then - redis.call("SET", profileKey, newEventData, "EX", ttl) -end - -if previousEventData then - local prev = cjson.decode(previousEventData) - local curr = cjson.decode(newEventData) - - if prev.ts and curr.ts then - prev.event.duration = math.max(0, curr.ts - prev.ts) - end - - redis.call("RPUSH", queueKey, cjson.encode(prev.event)) - redis.call("INCR", counterKey) - return 1 -end - -return 0 -`; - - /** - * Lua script for session_end. - * Uses GETDEL to atomically retrieve and delete the last screen_view. - * - * KEYS[1] = last screen_view key (by session) - * KEYS[2] = last screen_view key (by profile, may be empty) - * KEYS[3] = queue key - * KEYS[4] = buffer counter key - * ARGV[1] = session_end event JSON - */ - private readonly addSessionEndScript = ` -local sessionKey = KEYS[1] -local profileKey = KEYS[2] -local queueKey = KEYS[3] -local counterKey = KEYS[4] -local sessionEndJson = ARGV[1] - -local previousEventData = redis.call("GETDEL", sessionKey) -local added = 0 - -if previousEventData then - local prev = cjson.decode(previousEventData) - redis.call("RPUSH", queueKey, cjson.encode(prev.event)) - redis.call("INCR", counterKey) - added = added + 1 -end - -redis.call("RPUSH", queueKey, sessionEndJson) -redis.call("INCR", counterKey) -added = added + 1 - -if profileKey and profileKey ~= "" then - redis.call("DEL", profileKey) -end - -return added -`; - constructor() { super({ name: 'event', @@ -160,27 +43,6 @@ return added await this.processBuffer(); }, }); - this.loadScripts(); - } - - private async loadScripts() { - try { - const redis = getRedisCache(); - const [screenViewSha, sessionEndSha] = await Promise.all([ - redis.script('LOAD', this.addScreenViewScript), - redis.script('LOAD', this.addSessionEndScript), - ]); - - this.scriptShas.addScreenView = screenViewSha as string; - this.scriptShas.addSessionEnd = sessionEndSha as string; - - this.logger.info('Loaded Lua scripts into Redis', { - addScreenView: this.scriptShas.addScreenView, - addSessionEnd: this.scriptShas.addSessionEnd, - }); - } catch (error) { - this.logger.error('Failed to load Lua scripts', { error }); - } } bulkAdd(events: IClickhouseEvent[]) { @@ -190,30 +52,7 @@ return added } add(event: IClickhouseEvent) { - const eventJson = JSON.stringify(event); - - let type: PendingEvent['type'] = 'regular'; - let eventWithTimestamp: string | undefined; - - if (event.session_id && event.name === 'screen_view') { - type = 'screen_view'; - const timestamp = new Date(event.created_at || Date.now()).getTime(); - eventWithTimestamp = JSON.stringify({ - event: event, - ts: timestamp, - }); - } else if (event.session_id && event.name === 'session_end') { - type = 'session_end'; - } - - const pendingEvent: PendingEvent = { - event, - eventJson, - eventWithTimestamp, - type, - }; - - this.pendingEvents.push(pendingEvent); + this.pendingEvents.push(event); if (this.pendingEvents.length >= this.microBatchMaxSize) { this.flushLocalBuffer(); @@ -228,57 +67,6 @@ return added } } - private addToMulti(multi: ReturnType, pending: PendingEvent) { - const { event, eventJson, eventWithTimestamp, type } = pending; - - if (type === 'screen_view' && event.session_id) { - const sessionKey = this.getLastScreenViewKeyBySession(event.session_id); - const profileKey = event.profile_id - ? this.getLastScreenViewKeyByProfile(event.project_id, event.profile_id) - : ''; - - this.evalScript( - multi, - 'addScreenView', - this.addScreenViewScript, - 4, - sessionKey, - profileKey, - this.queueKey, - this.bufferCounterKey, - eventWithTimestamp!, - '3600', - ); - } else if (type === 'session_end' && event.session_id) { - const sessionKey = this.getLastScreenViewKeyBySession(event.session_id); - const profileKey = event.profile_id - ? this.getLastScreenViewKeyByProfile(event.project_id, event.profile_id) - : ''; - - this.evalScript( - multi, - 'addSessionEnd', - this.addSessionEndScript, - 4, - sessionKey, - profileKey, - this.queueKey, - this.bufferCounterKey, - eventJson, - ); - } else { - multi.rpush(this.queueKey, eventJson).incr(this.bufferCounterKey); - } - - if (event.profile_id) { - this.incrementActiveVisitorCount( - multi, - event.project_id, - event.profile_id, - ); - } - } - public async flush() { if (this.flushTimer) { clearTimeout(this.flushTimer); @@ -301,9 +89,17 @@ return added const redis = getRedisCache(); const multi = redis.multi(); - for (const pending of eventsToFlush) { - this.addToMulti(multi, pending); + for (const event of eventsToFlush) { + multi.rpush(this.queueKey, JSON.stringify(event)); + if (event.profile_id) { + this.incrementActiveVisitorCount( + multi, + event.project_id, + event.profile_id, + ); + } } + multi.incrby(this.bufferCounterKey, eventsToFlush.length); await multi.exec(); @@ -314,11 +110,14 @@ return added this.pendingEvents = eventsToFlush.concat(this.pendingEvents); this.flushRetryCount += 1; - this.logger.warn('Failed to flush local buffer to Redis; events re-queued', { - error, - eventCount: eventsToFlush.length, - flushRetryCount: this.flushRetryCount, - }); + this.logger.warn( + 'Failed to flush local buffer to Redis; events re-queued', + { + error, + eventCount: eventsToFlush.length, + flushRetryCount: this.flushRetryCount, + }, + ); } finally { this.isFlushing = false; // Events may have accumulated while we were flushing; schedule another flush if needed @@ -331,24 +130,6 @@ return added } } - private evalScript( - multi: ReturnType, - scriptName: keyof typeof this.scriptShas, - scriptContent: string, - numKeys: number, - ...args: (string | number)[] - ) { - const sha = this.scriptShas[scriptName]; - - if (sha) { - multi.evalsha(sha, numKeys, ...args); - } else { - multi.eval(scriptContent, numKeys, ...args); - this.logger.warn(`Script ${scriptName} not loaded, using EVAL fallback`); - this.loadScripts(); - } - } - async processBuffer() { const redis = getRedisCache(); @@ -398,7 +179,10 @@ return added const countByProject = new Map(); for (const event of eventsToClickhouse) { - countByProject.set(event.project_id, (countByProject.get(event.project_id) ?? 0) + 1); + countByProject.set( + event.project_id, + (countByProject.get(event.project_id) ?? 0) + 1, + ); } for (const [projectId, count] of countByProject) { publishEvent('events', 'batch', { projectId, count }); @@ -419,42 +203,6 @@ return added } } - public async getLastScreenView( - params: - | { - sessionId: string; - } - | { - projectId: string; - profileId: string; - }, - ): Promise { - const redis = getRedisCache(); - - let lastScreenViewKey: string; - if ('sessionId' in params) { - lastScreenViewKey = this.getLastScreenViewKeyBySession(params.sessionId); - } else { - lastScreenViewKey = this.getLastScreenViewKeyByProfile( - params.projectId, - params.profileId, - ); - } - - const eventDataStr = await redis.get(lastScreenViewKey); - - if (eventDataStr) { - const eventData = getSafeJson<{ event: IClickhouseEvent; ts: number }>( - eventDataStr, - ); - if (eventData?.event) { - return transformEvent(eventData.event); - } - } - - return null; - } - public async getBufferSize() { return this.getBufferSizeWithCounter(async () => { const redis = getRedisCache(); diff --git a/packages/db/src/services/event.service.ts b/packages/db/src/services/event.service.ts index 94df14770..7a7b4a250 100644 --- a/packages/db/src/services/event.service.ts +++ b/packages/db/src/services/event.service.ts @@ -168,7 +168,6 @@ export function transformEvent(event: IClickhouseEvent): IServiceEvent { device: event.device, brand: event.brand, model: event.model, - duration: event.duration, path: event.path, origin: event.origin, referrer: event.referrer, @@ -216,7 +215,7 @@ export interface IServiceEvent { device?: string | undefined; brand?: string | undefined; model?: string | undefined; - duration: number; + duration?: number; path: string; origin: string; referrer: string | undefined; @@ -247,7 +246,7 @@ export interface IServiceEventMinimal { browser?: string | undefined; device?: string | undefined; brand?: string | undefined; - duration: number; + duration?: number; path: string; origin: string; referrer: string | undefined; @@ -379,7 +378,7 @@ export async function createEvent(payload: IServiceCreateEventPayload) { device: payload.device ?? '', brand: payload.brand ?? '', model: payload.model ?? '', - duration: payload.duration, + duration: payload.duration ?? 0, referrer: payload.referrer ?? '', referrer_name: payload.referrerName ?? '', referrer_type: payload.referrerType ?? '', @@ -477,7 +476,7 @@ export async function getEventList(options: GetEventListOptions) { sb.where.cursor = `created_at < ${sqlstring.escape(formatClickhouseDate(cursor))}`; } - if (!cursor && !(startDate && endDate)) { + if (!(cursor || (startDate && endDate))) { sb.where.cursorWindow = `created_at >= toDateTime64(${sqlstring.escape(formatClickhouseDate(new Date()))}, 3) - INTERVAL ${safeDateIntervalInDays} DAY`; } @@ -562,9 +561,6 @@ export async function getEventList(options: GetEventListOptions) { if (select.model) { sb.select.model = 'model'; } - if (select.duration) { - sb.select.duration = 'duration'; - } if (select.path) { sb.select.path = 'path'; } @@ -771,7 +767,6 @@ class EventService { where, select, limit, - orderBy, filters, }: { projectId: string; @@ -811,7 +806,6 @@ class EventService { select.event.deviceId && 'e.device_id as device_id', select.event.name && 'e.name as name', select.event.path && 'e.path as path', - select.event.duration && 'e.duration as duration', select.event.country && 'e.country as country', select.event.city && 'e.city as city', select.event.os && 'e.os as os', @@ -896,7 +890,6 @@ class EventService { select.event.deviceId && 'e.device_id as device_id', select.event.name && 'e.name as name', select.event.path && 'e.path as path', - select.event.duration && 'e.duration as duration', select.event.country && 'e.country as country', select.event.city && 'e.city as city', select.event.os && 'e.os as os', @@ -1032,7 +1025,6 @@ class EventService { id: true, name: true, createdAt: true, - duration: true, country: true, city: true, os: true, diff --git a/packages/db/src/services/overview.service.ts b/packages/db/src/services/overview.service.ts index 17b8d900d..d62b83a3d 100644 --- a/packages/db/src/services/overview.service.ts +++ b/packages/db/src/services/overview.service.ts @@ -416,6 +416,30 @@ export class OverviewService { const where = this.getRawWhereClause('sessions', filters); const fillConfig = this.getFillConfig(interval, startDate, endDate); + // CTE: per-event screen_view durations via window function + const rawScreenViewDurationsQuery = clix(this.client, timezone) + .select([ + `${clix.toStartOf('created_at', interval as any, timezone)} AS date`, + `dateDiff('millisecond', created_at, lead(created_at, 1, created_at) OVER (PARTITION BY session_id ORDER BY created_at)) AS duration`, + ]) + .from(TABLE_NAMES.events) + .where('project_id', '=', projectId) + .where('name', '=', 'screen_view') + .where('created_at', 'BETWEEN', [ + clix.datetime(startDate, 'toDateTime'), + clix.datetime(endDate, 'toDateTime'), + ]) + .rawWhere(this.getRawWhereClause('events', filters)); + + // CTE: avg duration per date bucket + const avgDurationByDateQuery = clix(this.client, timezone) + .select([ + 'date', + 'round(avgIf(duration, duration > 0), 2) / 1000 AS avg_session_duration', + ]) + .from('raw_screen_view_durations') + .groupBy(['date']); + // Session aggregation with bounce rates const sessionAggQuery = clix(this.client, timezone) .select([ @@ -473,6 +497,8 @@ export class OverviewService { .where('date', '!=', rollupDate) ) .with('overall_unique_visitors', overallUniqueVisitorsQuery) + .with('raw_screen_view_durations', rawScreenViewDurationsQuery) + .with('avg_duration_by_date', avgDurationByDateQuery) .select<{ date: string; bounce_rate: number; @@ -489,8 +515,7 @@ export class OverviewService { 'dss.bounce_rate as bounce_rate', 'uniq(e.profile_id) AS unique_visitors', 'uniq(e.session_id) AS total_sessions', - 'round(avgIf(duration, duration > 0), 2) / 1000 AS _avg_session_duration', - 'if(isNaN(_avg_session_duration), 0, _avg_session_duration) AS avg_session_duration', + 'coalesce(dur.avg_session_duration, 0) AS avg_session_duration', 'count(*) AS total_screen_views', 'round((count(*) * 1.) / uniq(e.session_id), 2) AS views_per_session', '(SELECT unique_visitors FROM overall_unique_visitors) AS overall_unique_visitors', @@ -502,6 +527,10 @@ export class OverviewService { 'daily_session_stats AS dss', `${clix.toStartOf('e.created_at', interval as any)} = dss.date` ) + .leftJoin( + 'avg_duration_by_date AS dur', + `${clix.toStartOf('e.created_at', interval as any)} = dur.date` + ) .where('e.project_id', '=', projectId) .where('e.name', '=', 'screen_view') .where('e.created_at', 'BETWEEN', [ @@ -509,7 +538,7 @@ export class OverviewService { clix.datetime(endDate, 'toDateTime'), ]) .rawWhere(this.getRawWhereClause('events', filters)) - .groupBy(['date', 'dss.bounce_rate']) + .groupBy(['date', 'dss.bounce_rate', 'dur.avg_session_duration']) .orderBy('date', 'ASC') .fill(fillConfig.from, fillConfig.to, fillConfig.step) .transform({ diff --git a/packages/db/src/services/pages.service.ts b/packages/db/src/services/pages.service.ts index b014c4166..e3bf54318 100644 --- a/packages/db/src/services/pages.service.ts +++ b/packages/db/src/services/pages.service.ts @@ -52,6 +52,24 @@ export class PagesService { .where('created_at', '>=', clix.exp('now() - INTERVAL 30 DAY')) .groupBy(['origin', 'path']); + // CTE: compute screen_view durations via window function (leadInFrame gives next event's timestamp) + const screenViewDurationsCte = clix(this.client, timezone) + .select([ + 'project_id', + 'session_id', + 'path', + 'origin', + `dateDiff('millisecond', created_at, lead(created_at, 1, created_at) OVER (PARTITION BY session_id ORDER BY created_at)) AS duration`, + ]) + .from(TABLE_NAMES.events, false) + .where('project_id', '=', projectId) + .where('name', '=', 'screen_view') + .where('path', '!=', '') + .where('created_at', 'BETWEEN', [ + clix.datetime(startDate, 'toDateTime'), + clix.datetime(endDate, 'toDateTime'), + ]); + // Pre-filtered sessions subquery for better performance const sessionsSubquery = clix(this.client, timezone) .select(['id', 'project_id', 'is_bounce']) @@ -66,6 +84,7 @@ export class PagesService { // Main query: aggregate events and calculate bounce rate from pre-filtered sessions const query = clix(this.client, timezone) .with('page_titles', titlesCte) + .with('screen_view_durations', screenViewDurationsCte) .select([ 'e.origin as origin', 'e.path as path', @@ -74,25 +93,18 @@ export class PagesService { 'count() as pageviews', 'round(avg(e.duration) / 1000 / 60, 2) as avg_duration', `round( - (uniqIf(e.session_id, s.is_bounce = 1) * 100.0) / - nullIf(uniq(e.session_id), 0), + (uniqIf(e.session_id, s.is_bounce = 1) * 100.0) / + nullIf(uniq(e.session_id), 0), 2 ) as bounce_rate`, ]) - .from(`${TABLE_NAMES.events} e`, false) + .from('screen_view_durations e', false) .leftJoin( sessionsSubquery, 'e.session_id = s.id AND e.project_id = s.project_id', 's' ) .leftJoin('page_titles pt', 'concat(e.origin, e.path) = pt.page_key') - .where('e.project_id', '=', projectId) - .where('e.name', '=', 'screen_view') - .where('e.path', '!=', '') - .where('e.created_at', 'BETWEEN', [ - clix.datetime(startDate, 'toDateTime'), - clix.datetime(endDate, 'toDateTime'), - ]) .when(!!search, (q) => { const term = `%${search}%`; q.whereGroup() diff --git a/packages/trpc/src/routers/chart.ts b/packages/trpc/src/routers/chart.ts index 59367ba40..cd6b07e1d 100644 --- a/packages/trpc/src/routers/chart.ts +++ b/packages/trpc/src/routers/chart.ts @@ -1,11 +1,7 @@ -import { flatten, map, pipe, prop, range, sort, uniq } from 'ramda'; -import sqlstring from 'sqlstring'; -import { z } from 'zod'; - +import { round } from '@openpanel/common'; import { - type IClickhouseProfile, - type IServiceProfile, - TABLE_NAMES, + AggregateChartEngine, + ChartEngine, ch, chQuery, clix, @@ -21,8 +17,11 @@ import { getReportById, getSelectPropertyKey, getSettingsForProject, + type IClickhouseProfile, + type IServiceProfile, onlyReportEvents, sankeyService, + TABLE_NAMES, validateShareAccess, } from '@openpanel/db'; import { @@ -33,15 +32,15 @@ import { zReportInput, zTimeInterval, } from '@openpanel/validation'; - -import { round } from '@openpanel/common'; -import { AggregateChartEngine, ChartEngine } from '@openpanel/db'; import { differenceInDays, differenceInMonths, differenceInWeeks, formatISO, } from 'date-fns'; +import { flatten, map, pipe, prop, range, sort, uniq } from 'ramda'; +import sqlstring from 'sqlstring'; +import { z } from 'zod'; import { getProjectAccess } from '../access'; import { TRPCAccessError } from '../errors'; import { @@ -83,7 +82,7 @@ const chartProcedure = publicProcedure.use( session: ctx.session?.userId ? { userId: ctx.session.userId } : undefined, - }, + } ); if (!shareValidation.isValid) { throw TRPCAccessError('You do not have access to this share'); @@ -119,7 +118,7 @@ const chartProcedure = publicProcedure.use( report: null, }, }); - }, + } ); export const chartRouter = createTRPCRouter({ @@ -128,7 +127,7 @@ export const chartRouter = createTRPCRouter({ .input( z.object({ projectId: z.string(), - }), + }) ) .query(async ({ input: { projectId } }) => { const { timezone } = await getSettingsForProject(projectId); @@ -151,7 +150,7 @@ export const chartRouter = createTRPCRouter({ TO toStartOfDay(now()) STEP INTERVAL 1 day SETTINGS session_timezone = '${timezone}' - `, + ` ); const metricsPromise = clix(ch, timezone) @@ -185,7 +184,7 @@ export const chartRouter = createTRPCRouter({ ? Math.round( ((metrics.months_3 - metrics.months_3_prev) / metrics.months_3_prev) * - 100, + 100 ) : null; @@ -209,12 +208,12 @@ export const chartRouter = createTRPCRouter({ .input( z.object({ projectId: z.string(), - }), + }) ) .query(async ({ input: { projectId } }) => { const [events, meta] = await Promise.all([ chQuery<{ name: string; count: number }>( - `SELECT name, count(name) as count FROM ${TABLE_NAMES.event_names_mv} WHERE project_id = ${sqlstring.escape(projectId)} GROUP BY name ORDER BY count DESC, name ASC`, + `SELECT name, count(name) as count FROM ${TABLE_NAMES.event_names_mv} WHERE project_id = ${sqlstring.escape(projectId)} GROUP BY name ORDER BY count DESC, name ASC` ), getEventMetasCached(projectId), ]); @@ -238,7 +237,7 @@ export const chartRouter = createTRPCRouter({ z.object({ event: z.string().optional(), projectId: z.string(), - }), + }) ) .query(async ({ input: { projectId, event } }) => { const profiles = await clix(ch, 'UTC') @@ -252,8 +251,8 @@ export const chartRouter = createTRPCRouter({ const profileProperties = [ ...new Set( profiles.flatMap((p) => - Object.keys(p.properties).map((k) => `profile.properties.${k}`), - ), + Object.keys(p.properties).map((k) => `profile.properties.${k}`) + ) ), ]; @@ -283,7 +282,6 @@ export const chartRouter = createTRPCRouter({ }); const fixedProperties = [ - 'duration', 'revenue', 'has_profile', 'path', @@ -316,7 +314,7 @@ export const chartRouter = createTRPCRouter({ return pipe( sort((a, b) => a.length - b.length), - uniq, + uniq )(properties); }), @@ -326,9 +324,9 @@ export const chartRouter = createTRPCRouter({ event: z.string(), property: z.string(), projectId: z.string(), - }), + }) ) - .query(async ({ input: { event, property, projectId, ...input } }) => { + .query(async ({ input: { event, property, projectId } }) => { if (property === 'has_profile') { return { values: ['true', 'false'], @@ -378,7 +376,7 @@ export const chartRouter = createTRPCRouter({ .from(TABLE_NAMES.profiles) .where('project_id', '=', projectId), 'profile.id = profile_id', - 'profile', + 'profile' ); } @@ -389,8 +387,8 @@ export const chartRouter = createTRPCRouter({ (data: typeof events) => map(prop('values'), data), flatten, uniq, - sort((a, b) => a.length - b.length), - )(events), + sort((a, b) => a.length - b.length) + )(events) ); } @@ -406,8 +404,8 @@ export const chartRouter = createTRPCRouter({ z.object({ shareId: z.string().optional(), id: z.string().optional(), - }), - ), + }) + ) ) .query(async ({ input, ctx }) => { const chartInput = ctx.report @@ -448,8 +446,8 @@ export const chartRouter = createTRPCRouter({ z.object({ shareId: z.string().optional(), id: z.string().optional(), - }), - ), + }) + ) ) .query(async ({ input, ctx }) => { const chartInput = ctx.report @@ -536,12 +534,10 @@ export const chartRouter = createTRPCRouter({ z.object({ shareId: z.string().optional(), id: z.string().optional(), - }), - ), + }) + ) ) - .query(async ({ input, ctx }) => { - console.log('input', input); - + .query(({ input, ctx }) => { const chartInput = ctx.report ? { ...ctx.report, @@ -562,10 +558,10 @@ export const chartRouter = createTRPCRouter({ z.object({ shareId: z.string().optional(), id: z.string().optional(), - }), - ), + }) + ) ) - .query(async ({ input, ctx }) => { + .query(({ input, ctx }) => { const chartInput = ctx.report ? { ...ctx.report, @@ -593,7 +589,7 @@ export const chartRouter = createTRPCRouter({ range: zRange, shareId: z.string().optional(), id: z.string().optional(), - }), + }) ) .query(async ({ input, ctx }) => { const projectId = ctx.report?.projectId ?? input.projectId; @@ -647,7 +643,7 @@ export const chartRouter = createTRPCRouter({ startDate, endDate, }, - timezone, + timezone ); const diffInterval = { minute: () => differenceInDays(dates.endDate, dates.startDate), @@ -677,14 +673,14 @@ export const chartRouter = createTRPCRouter({ const usersSelect = range(0, diffInterval + 1) .map( (index) => - `groupUniqArrayIf(profile_id, x_after_cohort ${countCriteria} ${index}) AS interval_${index}_users`, + `groupUniqArrayIf(profile_id, x_after_cohort ${countCriteria} ${index}) AS interval_${index}_users` ) .join(',\n'); const countsSelect = range(0, diffInterval + 1) .map( (index) => - `length(interval_${index}_users) AS interval_${index}_user_count`, + `length(interval_${index}_users) AS interval_${index}_user_count` ) .join(',\n'); @@ -769,12 +765,10 @@ export const chartRouter = createTRPCRouter({ interval: zTimeInterval.default('day'), series: zChartSeries, breakdowns: z.record(z.string(), z.string()).optional(), - }), + }) ) .query(async ({ input }) => { - const { timezone } = await getSettingsForProject(input.projectId); const { projectId, date, series } = input; - const limit = 100; const serie = series[0]; if (!serie) { @@ -813,7 +807,7 @@ export const chartRouter = createTRPCRouter({ if (profileFields.length > 0) { // Extract top-level field names and select only what's needed const fieldsToSelect = uniq( - profileFields.map((f) => f.split('.')[0]), + profileFields.map((f) => f.split('.')[0]) ).join(', '); sb.joins.profiles = `LEFT ANY JOIN (SELECT id, ${fieldsToSelect} FROM ${TABLE_NAMES.profiles} FINAL WHERE project_id = ${sqlstring.escape(projectId)}) as profile on profile.id = profile_id`; } @@ -836,7 +830,7 @@ export const chartRouter = createTRPCRouter({ // Fetch profile details in batches to avoid exceeding ClickHouse max_query_size const ids = profileIds.map((p) => p.profile_id).filter(Boolean); const BATCH_SIZE = 200; - const profiles = []; + const profiles: IServiceProfile[] = []; for (let i = 0; i < ids.length; i += BATCH_SIZE) { const batch = ids.slice(i, i + BATCH_SIZE); const batchProfiles = await getProfilesCached(batch, projectId); @@ -859,13 +853,13 @@ export const chartRouter = createTRPCRouter({ .optional() .default(false) .describe( - 'If true, show users who dropped off at this step. If false, show users who completed at least this step.', + 'If true, show users who dropped off at this step. If false, show users who completed at least this step.' ), funnelWindow: z.number().optional(), funnelGroup: z.string().optional(), breakdowns: z.array(z.object({ name: z.string() })).optional(), range: zRange, - }), + }) ) .query(async ({ input }) => { const { timezone } = await getSettingsForProject(input.projectId); @@ -911,15 +905,15 @@ export const chartRouter = createTRPCRouter({ // Check for profile filters and add profile join if needed const profileFilters = funnelService.getProfileFilters( - eventSeries as IChartEvent[], + eventSeries as IChartEvent[] ); if (profileFilters.length > 0) { const fieldsToSelect = uniq( - profileFilters.map((f) => f.split('.')[0]), + profileFilters.map((f) => f.split('.')[0]) ).join(', '); funnelCte.leftJoin( `(SELECT id, ${fieldsToSelect} FROM ${TABLE_NAMES.profiles} FINAL WHERE project_id = ${sqlstring.escape(projectId)}) as profile`, - 'profile.id = events.profile_id', + 'profile.id = events.profile_id' ); } @@ -934,7 +928,7 @@ export const chartRouter = createTRPCRouter({ // `max(level) AS level` alias (ILLEGAL_AGGREGATION error). query.with( 'funnel', - 'SELECT profile_id, max(level) AS level FROM (SELECT * FROM session_funnel WHERE level != 0) GROUP BY profile_id', + 'SELECT profile_id, max(level) AS level FROM (SELECT * FROM session_funnel WHERE level != 0) GROUP BY profile_id' ); } else { // For session grouping: filter out level = 0 inside the CTE @@ -969,7 +963,7 @@ export const chartRouter = createTRPCRouter({ // when there are many profile IDs to pass in the IN(...) clause const ids = profileIdsResult.map((p) => p.profile_id).filter(Boolean); const BATCH_SIZE = 500; - const profiles = []; + const profiles: IServiceProfile[] = []; for (let i = 0; i < ids.length; i += BATCH_SIZE) { const batch = ids.slice(i, i + BATCH_SIZE); const batchProfiles = await getProfilesCached(batch, projectId); @@ -986,7 +980,7 @@ function processCohortData( total_first_event_count: number; [key: string]: any; }>, - diffInterval: number, + diffInterval: number ) { if (data.length === 0) { return []; @@ -995,13 +989,13 @@ function processCohortData( const processed = data.map((row) => { const sum = row.total_first_event_count; const values = range(0, diffInterval + 1).map( - (index) => (row[`interval_${index}_user_count`] || 0) as number, + (index) => (row[`interval_${index}_user_count`] || 0) as number ); return { cohort_interval: row.cohort_interval, sum, - values: values, + values, percentages: values.map((value) => (sum > 0 ? round(value / sum, 2) : 0)), }; }); @@ -1041,10 +1035,10 @@ function processCohortData( cohort_interval: 'Weighted Average', sum: round(averageData.totalSum / processed.length, 0), percentages: averageData.percentages.map(({ sum, weightedSum }) => - sum > 0 ? round(weightedSum / sum, 2) : 0, + sum > 0 ? round(weightedSum / sum, 2) : 0 ), values: averageData.values.map(({ sum, weightedSum }) => - sum > 0 ? round(weightedSum / sum, 0) : 0, + sum > 0 ? round(weightedSum / sum, 0) : 0 ), };