From 8dd3966f311924829522c825f48a9cc32b68dc9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Mon, 23 Mar 2026 21:37:12 +0100 Subject: [PATCH 1/5] wip --- packages/db/src/buffers/event-buffer.test.ts | 26 +- .../db/src/buffers/profile-buffer.test.ts | 117 +++-- packages/db/src/buffers/profile-buffer.ts | 427 ++++++++---------- .../db/src/buffers/session-buffer.test.ts | 122 +++++ packages/redis/cachable.test.ts | 10 +- 5 files changed, 439 insertions(+), 263 deletions(-) create mode 100644 packages/db/src/buffers/session-buffer.test.ts diff --git a/packages/db/src/buffers/event-buffer.test.ts b/packages/db/src/buffers/event-buffer.test.ts index 178f94545..be44688fa 100644 --- a/packages/db/src/buffers/event-buffer.test.ts +++ b/packages/db/src/buffers/event-buffer.test.ts @@ -10,7 +10,11 @@ import { EventBuffer } from './event-buffer'; const redis = getRedisCache(); beforeEach(async () => { - await redis.flushdb(); + const keys = [ + ...await redis.keys('event*'), + ...await redis.keys('live:*'), + ]; + if (keys.length > 0) await redis.del(...keys); }); afterAll(async () => { @@ -273,4 +277,24 @@ describe('EventBuffer', () => { expect(await eventBuffer.getBufferSize()).toBe(5); }); + + it('retains events in queue when ClickHouse insert fails', async () => { + eventBuffer.add({ + project_id: 'p12', + name: 'event1', + created_at: new Date().toISOString(), + } as any); + await eventBuffer.flush(); + + const insertSpy = vi + .spyOn(ch, 'insert') + .mockRejectedValueOnce(new Error('ClickHouse unavailable')); + + await eventBuffer.processBuffer(); + + // Events must still be in the queue — not lost + expect(await eventBuffer.getBufferSize()).toBe(1); + + insertSpy.mockRestore(); + }); }); diff --git a/packages/db/src/buffers/profile-buffer.test.ts b/packages/db/src/buffers/profile-buffer.test.ts index bc7d39d22..2dc757f97 100644 --- a/packages/db/src/buffers/profile-buffer.test.ts +++ b/packages/db/src/buffers/profile-buffer.test.ts @@ -1,6 +1,5 @@ import { getRedisCache } from '@openpanel/redis'; import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest'; -import { getSafeJson } from '@openpanel/json'; import type { IClickhouseProfile } from '../services/profile.service'; // Mock chQuery to avoid hitting real ClickHouse @@ -36,7 +35,11 @@ function makeProfile(overrides: Partial): IClickhouseProfile } beforeEach(async () => { - await redis.flushdb(); + const keys = [ + ...await redis.keys('profile*'), + ...await redis.keys('lock:profile'), + ]; + if (keys.length > 0) await redis.del(...keys); vi.mocked(chQuery).mockResolvedValue([]); }); @@ -63,89 +66,139 @@ describe('ProfileBuffer', () => { expect(sizeAfter).toBe(sizeBefore + 1); }); - it('merges subsequent updates via cache (sequential calls)', async () => { + it('concurrent adds: both raw profiles are queued', async () => { const identifyProfile = makeProfile({ first_name: 'John', email: 'john@example.com', groups: [], }); + const groupProfile = makeProfile({ + first_name: '', + email: '', + groups: ['group-abc'], + }); + + const sizeBefore = await profileBuffer.getBufferSize(); + await Promise.all([ + profileBuffer.add(identifyProfile), + profileBuffer.add(groupProfile), + ]); + const sizeAfter = await profileBuffer.getBufferSize(); + // Both raw profiles are queued; merge happens at flush time + expect(sizeAfter).toBe(sizeBefore + 2); + }); + + it('merges sequential updates for the same profile at flush time', async () => { + const identifyProfile = makeProfile({ + first_name: 'John', + email: 'john@example.com', + groups: [], + }); const groupProfile = makeProfile({ first_name: '', email: '', groups: ['group-abc'], }); - // Sequential: identify first, then group await profileBuffer.add(identifyProfile); await profileBuffer.add(groupProfile); + await profileBuffer.processBuffer(); - // Second add should read the cached identify profile and merge groups in const cached = await profileBuffer.fetchFromCache('profile-1', 'project-1'); expect(cached?.first_name).toBe('John'); expect(cached?.email).toBe('john@example.com'); expect(cached?.groups).toContain('group-abc'); }); - it('race condition: concurrent identify + group calls preserve all data', async () => { + it('merges concurrent updates for the same profile at flush time', async () => { const identifyProfile = makeProfile({ first_name: 'John', email: 'john@example.com', groups: [], }); - const groupProfile = makeProfile({ first_name: '', email: '', groups: ['group-abc'], }); - // Both calls run concurrently — the per-profile lock serializes them so the - // second one reads the first's result from cache and merges correctly. await Promise.all([ profileBuffer.add(identifyProfile), profileBuffer.add(groupProfile), ]); + await profileBuffer.processBuffer(); const cached = await profileBuffer.fetchFromCache('profile-1', 'project-1'); - expect(cached?.first_name).toBe('John'); expect(cached?.email).toBe('john@example.com'); expect(cached?.groups).toContain('group-abc'); }); - it('race condition: concurrent writes produce one merged buffer entry', async () => { - const identifyProfile = makeProfile({ - first_name: 'John', - email: 'john@example.com', - groups: [], + it('uses existing ClickHouse data for cache misses when merging', async () => { + const existingInClickhouse = makeProfile({ + first_name: 'Jane', + email: 'jane@example.com', + groups: ['existing-group'], }); + vi.mocked(chQuery).mockResolvedValue([existingInClickhouse]); - const groupProfile = makeProfile({ + const incomingProfile = makeProfile({ first_name: '', email: '', - groups: ['group-abc'], + groups: ['new-group'], }); - const sizeBefore = await profileBuffer.getBufferSize(); + await profileBuffer.add(incomingProfile); + await profileBuffer.processBuffer(); - await Promise.all([ - profileBuffer.add(identifyProfile), - profileBuffer.add(groupProfile), - ]); + const cached = await profileBuffer.fetchFromCache('profile-1', 'project-1'); + expect(cached?.first_name).toBe('Jane'); + expect(cached?.email).toBe('jane@example.com'); + expect(cached?.groups).toContain('existing-group'); + expect(cached?.groups).toContain('new-group'); + }); - const sizeAfter = await profileBuffer.getBufferSize(); + it('buffer is empty after flush', async () => { + await profileBuffer.add(makeProfile({ first_name: 'John' })); + expect(await profileBuffer.getBufferSize()).toBe(1); - // The second add merges into the first — only 2 buffer entries total - // (one from identify, one merged update with group) - expect(sizeAfter).toBe(sizeBefore + 2); + await profileBuffer.processBuffer(); + + expect(await profileBuffer.getBufferSize()).toBe(0); + }); + + it('retains profiles in queue when ClickHouse insert fails', async () => { + await profileBuffer.add(makeProfile({ first_name: 'John' })); + + const { ch } = await import('../clickhouse/client'); + const insertSpy = vi + .spyOn(ch, 'insert') + .mockRejectedValueOnce(new Error('ClickHouse unavailable')); + + await profileBuffer.processBuffer(); + + // Profiles must still be in the queue — not lost + expect(await profileBuffer.getBufferSize()).toBe(1); + + insertSpy.mockRestore(); + }); + + it('proceeds with insert when ClickHouse fetch fails (treats profiles as new)', async () => { + vi.mocked(chQuery).mockRejectedValueOnce(new Error('ClickHouse unavailable')); + + const { ch } = await import('../clickhouse/client'); + const insertSpy = vi + .spyOn(ch, 'insert') + .mockResolvedValueOnce(undefined as any); + + await profileBuffer.add(makeProfile({ first_name: 'John' })); + await profileBuffer.processBuffer(); - // The last entry in the buffer should have both name and group - const rawEntries = await redis.lrange('profile-buffer', 0, -1); - const entries = rawEntries.map((e) => getSafeJson(e)); - const lastEntry = entries[entries.length - 1]; + // Insert must still have been called — no data loss even when fetch fails + expect(insertSpy).toHaveBeenCalled(); + expect(await profileBuffer.getBufferSize()).toBe(0); - expect(lastEntry?.first_name).toBe('John'); - expect(lastEntry?.groups).toContain('group-abc'); + insertSpy.mockRestore(); }); }); diff --git a/packages/db/src/buffers/profile-buffer.ts b/packages/db/src/buffers/profile-buffer.ts index 6ff7c00be..0a2d9de26 100644 --- a/packages/db/src/buffers/profile-buffer.ts +++ b/packages/db/src/buffers/profile-buffer.ts @@ -1,9 +1,6 @@ import { deepMergeObjects } from '@openpanel/common'; -import { generateSecureId } from '@openpanel/common/server'; import { getSafeJson } from '@openpanel/json'; -import type { ILogger } from '@openpanel/logger'; import { getRedisCache, type Redis } from '@openpanel/redis'; -import shallowEqual from 'fast-deep-equal'; import { omit, uniq } from 'ramda'; import sqlstring from 'sqlstring'; import { ch, chQuery, TABLE_NAMES } from '../clickhouse/client'; @@ -11,29 +8,24 @@ import type { IClickhouseProfile } from '../services/profile.service'; import { BaseBuffer } from './base-buffer'; export class ProfileBuffer extends BaseBuffer { - private batchSize = process.env.PROFILE_BUFFER_BATCH_SIZE + private readonly batchSize = process.env.PROFILE_BUFFER_BATCH_SIZE ? Number.parseInt(process.env.PROFILE_BUFFER_BATCH_SIZE, 10) : 200; - private chunkSize = process.env.PROFILE_BUFFER_CHUNK_SIZE + private readonly chunkSize = process.env.PROFILE_BUFFER_CHUNK_SIZE ? Number.parseInt(process.env.PROFILE_BUFFER_CHUNK_SIZE, 10) : 1000; - private ttlInSeconds = process.env.PROFILE_BUFFER_TTL_IN_SECONDS + private readonly ttlInSeconds = process.env.PROFILE_BUFFER_TTL_IN_SECONDS ? Number.parseInt(process.env.PROFILE_BUFFER_TTL_IN_SECONDS, 10) : 60 * 60; + /** Max profiles per ClickHouse IN-clause fetch to keep query size bounded */ + private readonly fetchChunkSize = process.env.PROFILE_BUFFER_FETCH_CHUNK_SIZE + ? Number.parseInt(process.env.PROFILE_BUFFER_FETCH_CHUNK_SIZE, 10) + : 50; private readonly redisKey = 'profile-buffer'; private readonly redisProfilePrefix = 'profile-cache:'; - private redis: Redis; - private releaseLockSha: string | null = null; - - private readonly releaseLockScript = ` - if redis.call("get", KEYS[1]) == ARGV[1] then - return redis.call("del", KEYS[1]) - else - return 0 - end - `; + private readonly redis: Redis; constructor() { super({ @@ -43,9 +35,6 @@ export class ProfileBuffer extends BaseBuffer { }, }); this.redis = getRedisCache(); - this.redis.script('LOAD', this.releaseLockScript).then((sha) => { - this.releaseLockSha = sha as string; - }); } private getProfileCacheKey({ @@ -58,243 +47,226 @@ export class ProfileBuffer extends BaseBuffer { return `${this.redisProfilePrefix}${projectId}:${profileId}`; } - private async withProfileLock( + public async fetchFromCache( profileId: string, - projectId: string, - fn: () => Promise - ): Promise { - const lockKey = `profile-lock:${projectId}:${profileId}`; - const lockId = generateSecureId('lock'); - const maxRetries = 20; - const retryDelayMs = 50; - - for (let i = 0; i < maxRetries; i++) { - const acquired = await this.redis.set(lockKey, lockId, 'EX', 5, 'NX'); - if (acquired === 'OK') { - try { - return await fn(); - } finally { - if (this.releaseLockSha) { - await this.redis.evalsha(this.releaseLockSha, 1, lockKey, lockId); - } else { - await this.redis.eval(this.releaseLockScript, 1, lockKey, lockId); - } - } - } - await new Promise((resolve) => setTimeout(resolve, retryDelayMs)); + projectId: string + ): Promise { + const cacheKey = this.getProfileCacheKey({ profileId, projectId }); + const cached = await this.redis.get(cacheKey); + if (!cached) { + return null; } - - this.logger.error( - 'Failed to acquire profile lock, proceeding without lock', - { - profileId, - projectId, - } - ); - return fn(); + return getSafeJson(cached); } - async alreadyExists(profile: IClickhouseProfile) { - const cacheKey = this.getProfileCacheKey({ - profileId: profile.id, - projectId: profile.project_id, - }); - return (await this.redis.exists(cacheKey)) === 1; - } - - async add(profile: IClickhouseProfile, isFromEvent = false) { - const logger = this.logger.child({ - projectId: profile.project_id, - profileId: profile.id, - }); - + async add(profile: IClickhouseProfile, _isFromEvent = false) { try { - logger.debug('Adding profile'); + const result = await this.redis + .multi() + .rpush(this.redisKey, JSON.stringify(profile)) + .incr(this.bufferCounterKey) + .llen(this.redisKey) + .exec(); - if (isFromEvent && (await this.alreadyExists(profile))) { - logger.debug('Profile already created, skipping'); + if (!result) { + this.logger.error('Failed to add profile to Redis', { profile }); return; } - await this.withProfileLock(profile.id, profile.project_id, async () => { - const existingProfile = await this.fetchProfile(profile, logger); - - // Delete any properties that are not server related if we have a non-server profile - if ( - existingProfile?.properties.device !== 'server' && - profile.properties.device === 'server' - ) { - profile.properties = omit( - [ - 'city', - 'country', - 'region', - 'longitude', - 'latitude', - 'os', - 'osVersion', - 'browser', - 'device', - 'isServer', - 'os_version', - 'browser_version', - ], - profile.properties - ); - } - - const mergedProfile: IClickhouseProfile = existingProfile - ? { - ...deepMergeObjects( - existingProfile, - omit(['created_at', 'groups'], profile) - ), - groups: uniq([ - ...(existingProfile.groups ?? []), - ...(profile.groups ?? []), - ]), - } - : profile; - - if ( - profile && - existingProfile && - shallowEqual( - omit(['created_at'], existingProfile), - omit(['created_at'], mergedProfile) - ) - ) { - this.logger.debug('Profile not changed, skipping'); - return; - } - - this.logger.debug('Merged profile will be inserted', { - mergedProfile, - existingProfile, - profile, - }); - - const cacheKey = this.getProfileCacheKey({ - profileId: profile.id, - projectId: profile.project_id, - }); - - const result = await this.redis - .multi() - .set(cacheKey, JSON.stringify(mergedProfile), 'EX', this.ttlInSeconds) - .rpush(this.redisKey, JSON.stringify(mergedProfile)) - .incr(this.bufferCounterKey) - .llen(this.redisKey) - .exec(); - - if (!result) { - this.logger.error('Failed to add profile to Redis', { - profile, - cacheKey, - }); - return; - } - const bufferLength = (result?.[3]?.[1] as number) ?? 0; - - this.logger.debug('Current buffer length', { - bufferLength, - batchSize: this.batchSize, - }); - if (bufferLength >= this.batchSize) { - await this.tryFlush(); - } - }); + const bufferLength = (result?.[2]?.[1] as number) ?? 0; + if (bufferLength >= this.batchSize) { + await this.tryFlush(); + } } catch (error) { this.logger.error('Failed to add profile', { error, profile }); } } - private async fetchProfile( - profile: IClickhouseProfile, - logger: ILogger - ): Promise { - const existingProfile = await this.fetchFromCache( - profile.id, - profile.project_id - ); - if (existingProfile) { - logger.debug('Profile found in Redis'); - return existingProfile; + private mergeProfiles( + existing: IClickhouseProfile | null, + incoming: IClickhouseProfile + ): IClickhouseProfile { + if (!existing) { + return incoming; } - return this.fetchFromClickhouse(profile, logger); - } - - public async fetchFromCache( - profileId: string, - projectId: string - ): Promise { - const cacheKey = this.getProfileCacheKey({ - profileId, - projectId, - }); - const existingProfile = await this.redis.get(cacheKey); - if (!existingProfile) { - return null; + let profile = incoming; + if ( + existing.properties.device !== 'server' && + incoming.properties.device === 'server' + ) { + profile = { + ...incoming, + properties: omit( + [ + 'city', + 'country', + 'region', + 'longitude', + 'latitude', + 'os', + 'osVersion', + 'browser', + 'device', + 'isServer', + 'os_version', + 'browser_version', + ], + incoming.properties + ), + }; } - return getSafeJson(existingProfile); + + return { + ...deepMergeObjects(existing, omit(['created_at', 'groups'], profile)), + groups: uniq([...(existing.groups ?? []), ...(incoming.groups ?? [])]), + }; } - private async fetchFromClickhouse( - profile: IClickhouseProfile, - logger: ILogger - ): Promise { - logger.debug('Fetching profile from Clickhouse'); - const result = await chQuery( - `SELECT - id, - project_id, - last_value(nullIf(first_name, '')) as first_name, - last_value(nullIf(last_name, '')) as last_name, - last_value(nullIf(email, '')) as email, - last_value(nullIf(avatar, '')) as avatar, - last_value(is_external) as is_external, - last_value(properties) as properties, - last_value(created_at) as created_at - FROM ${TABLE_NAMES.profiles} - WHERE - id = ${sqlstring.escape(String(profile.id))} AND - project_id = ${sqlstring.escape(profile.project_id)} - ${ - profile.is_external === false - ? ' AND profiles.created_at > now() - INTERVAL 2 DAY' - : '' + private async batchFetchFromClickhouse( + profiles: IClickhouseProfile[] + ): Promise> { + const result = new Map(); + + // Non-external (anonymous/device) profiles get a 2-day recency filter to + // avoid pulling stale anonymous sessions from far back. + const external = profiles.filter((p) => p.is_external !== false); + const nonExternal = profiles.filter((p) => p.is_external === false); + + const fetchGroup = async ( + group: IClickhouseProfile[], + withDateFilter: boolean + ) => { + for (const chunk of this.chunks(group, this.fetchChunkSize)) { + const tuples = chunk + .map( + (p) => + `(${sqlstring.escape(String(p.id))}, ${sqlstring.escape(p.project_id)})` + ) + .join(', '); + try { + const rows = await chQuery( + `SELECT + id, + project_id, + last_value(nullIf(first_name, '')) as first_name, + last_value(nullIf(last_name, '')) as last_name, + last_value(nullIf(email, '')) as email, + last_value(nullIf(avatar, '')) as avatar, + last_value(is_external) as is_external, + last_value(properties) as properties, + last_value(created_at) as created_at + FROM ${TABLE_NAMES.profiles} + WHERE (id, project_id) IN (${tuples}) + ${withDateFilter ? 'AND created_at > now() - INTERVAL 2 DAY' : ''} + GROUP BY id, project_id + ORDER BY created_at DESC` + ); + for (const row of rows) { + result.set(`${row.project_id}:${row.id}`, row); + } + } catch (error) { + this.logger.warn( + 'Failed to batch fetch profiles from Clickhouse, proceeding without existing data', + { error, chunkSize: chunk.length } + ); } - GROUP BY id, project_id - ORDER BY created_at DESC - LIMIT 1` - ); - logger.debug('Clickhouse fetch result', { - found: !!result[0], - }); - return result[0] || null; + } + }; + + await Promise.all([ + fetchGroup(external, false), + fetchGroup(nonExternal, true), + ]); + + return result; } async processBuffer() { try { this.logger.debug('Starting profile buffer processing'); - const profiles = await this.redis.lrange( + const rawProfiles = await this.redis.lrange( this.redisKey, 0, this.batchSize - 1 ); - if (profiles.length === 0) { + if (rawProfiles.length === 0) { this.logger.debug('No profiles to process'); return; } - this.logger.debug(`Processing ${profiles.length} profiles in buffer`); - const parsedProfiles = profiles.map((p) => - getSafeJson(p) + const parsedProfiles = rawProfiles + .map((p) => getSafeJson(p)) + .filter(Boolean) as IClickhouseProfile[]; + + // Merge within batch: collapse multiple updates for the same profile + const mergedInBatch = new Map(); + for (const profile of parsedProfiles) { + const key = `${profile.project_id}:${profile.id}`; + mergedInBatch.set( + key, + this.mergeProfiles(mergedInBatch.get(key) ?? null, profile) + ); + } + + const uniqueProfiles = Array.from(mergedInBatch.values()); + + // Check Redis cache for all unique profiles in a single MGET + const cacheKeys = uniqueProfiles.map((p) => + this.getProfileCacheKey({ profileId: p.id, projectId: p.project_id }) ); + const cacheResults = await this.redis.mget(...cacheKeys); + + const existingByKey = new Map(); + const cacheMisses: IClickhouseProfile[] = []; + for (let i = 0; i < uniqueProfiles.length; i++) { + const uniqueProfile = uniqueProfiles[i]; + if (uniqueProfile) { + const key = `${uniqueProfile.project_id}:${uniqueProfile.id}`; + const cached = cacheResults[i] + ? getSafeJson(cacheResults[i]!) + : null; + if (cached) { + existingByKey.set(key, cached); + } else { + cacheMisses.push(uniqueProfile); + } + } + } + + // Fetch cache misses from ClickHouse in bounded chunks + if (cacheMisses.length > 0) { + const clickhouseResults = + await this.batchFetchFromClickhouse(cacheMisses); + for (const [key, profile] of clickhouseResults) { + existingByKey.set(key, profile); + } + } - for (const chunk of this.chunks(parsedProfiles, this.chunkSize)) { + // Final merge: in-batch profile + existing (from cache or ClickHouse) + const toInsert: IClickhouseProfile[] = []; + const multi = this.redis.multi(); + + for (const profile of uniqueProfiles) { + const key = `${profile.project_id}:${profile.id}`; + const merged = this.mergeProfiles( + existingByKey.get(key) ?? null, + profile + ); + toInsert.push(merged); + multi.set( + this.getProfileCacheKey({ + projectId: profile.project_id, + profileId: profile.id, + }), + JSON.stringify(merged), + 'EX', + this.ttlInSeconds + ); + } + + for (const chunk of this.chunks(toInsert, this.chunkSize)) { await ch.insert({ table: TABLE_NAMES.profiles, values: chunk, @@ -302,22 +274,21 @@ export class ProfileBuffer extends BaseBuffer { }); } - // Only remove profiles after successful insert and update counter - await this.redis - .multi() - .ltrim(this.redisKey, profiles.length, -1) - .decrby(this.bufferCounterKey, profiles.length) - .exec(); + multi + .ltrim(this.redisKey, rawProfiles.length, -1) + .decrby(this.bufferCounterKey, rawProfiles.length); + await multi.exec(); this.logger.debug('Successfully completed profile processing', { - totalProfiles: profiles.length, + totalProfiles: rawProfiles.length, + uniqueProfiles: uniqueProfiles.length, }); } catch (error) { this.logger.error('Failed to process buffer', { error }); } } - async getBufferSize() { + getBufferSize() { return this.getBufferSizeWithCounter(() => this.redis.llen(this.redisKey)); } } diff --git a/packages/db/src/buffers/session-buffer.test.ts b/packages/db/src/buffers/session-buffer.test.ts new file mode 100644 index 000000000..f140ff002 --- /dev/null +++ b/packages/db/src/buffers/session-buffer.test.ts @@ -0,0 +1,122 @@ +import { getRedisCache } from '@openpanel/redis'; +import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest'; +import { ch } from '../clickhouse/client'; + +vi.mock('../clickhouse/client', () => ({ + ch: { + insert: vi.fn().mockResolvedValue(undefined), + }, + TABLE_NAMES: { + sessions: 'sessions', + }, +})); + +import { SessionBuffer } from './session-buffer'; +import type { IClickhouseEvent } from '../services/event.service'; + +const redis = getRedisCache(); + +function makeEvent(overrides: Partial): IClickhouseEvent { + return { + id: 'event-1', + project_id: 'project-1', + profile_id: 'profile-1', + device_id: 'device-1', + session_id: 'session-1', + name: 'screen_view', + path: '/home', + origin: '', + referrer: '', + referrer_name: '', + referrer_type: '', + duration: 0, + properties: {}, + created_at: new Date().toISOString(), + groups: [], + ...overrides, + } as IClickhouseEvent; +} + +beforeEach(async () => { + const keys = [ + ...await redis.keys('session*'), + ...await redis.keys('lock:session'), + ]; + if (keys.length > 0) await redis.del(...keys); + vi.mocked(ch.insert).mockResolvedValue(undefined as any); +}); + +afterAll(async () => { + try { + await redis.quit(); + } catch {} +}); + +describe('SessionBuffer', () => { + let sessionBuffer: SessionBuffer; + + beforeEach(() => { + sessionBuffer = new SessionBuffer(); + }); + + it('adds a new session to the buffer', async () => { + const sizeBefore = await sessionBuffer.getBufferSize(); + await sessionBuffer.add(makeEvent({})); + const sizeAfter = await sessionBuffer.getBufferSize(); + + expect(sizeAfter).toBe(sizeBefore + 1); + }); + + it('skips session_start and session_end events', async () => { + const sizeBefore = await sessionBuffer.getBufferSize(); + await sessionBuffer.add(makeEvent({ name: 'session_start' })); + await sessionBuffer.add(makeEvent({ name: 'session_end' })); + const sizeAfter = await sessionBuffer.getBufferSize(); + + expect(sizeAfter).toBe(sizeBefore); + }); + + it('updates existing session on subsequent events', async () => { + const t0 = Date.now(); + await sessionBuffer.add(makeEvent({ created_at: new Date(t0).toISOString() })); + + // Second event updates the same session — emits old (sign=-1) + new (sign=1) + const sizeBefore = await sessionBuffer.getBufferSize(); + await sessionBuffer.add(makeEvent({ created_at: new Date(t0 + 5000).toISOString() })); + const sizeAfter = await sessionBuffer.getBufferSize(); + + expect(sizeAfter).toBe(sizeBefore + 2); + }); + + it('processes buffer and inserts sessions into ClickHouse', async () => { + await sessionBuffer.add(makeEvent({})); + + const insertSpy = vi + .spyOn(ch, 'insert') + .mockResolvedValueOnce(undefined as any); + + await sessionBuffer.processBuffer(); + + expect(insertSpy).toHaveBeenCalledWith( + expect.objectContaining({ table: 'sessions', format: 'JSONEachRow' }) + ); + expect(await sessionBuffer.getBufferSize()).toBe(0); + + insertSpy.mockRestore(); + }); + + it('retains sessions in queue when ClickHouse insert fails', async () => { + await sessionBuffer.add(makeEvent({})); + + const insertSpy = vi + .spyOn(ch, 'insert') + .mockRejectedValueOnce(new Error('ClickHouse unavailable')); + + await sessionBuffer.processBuffer(); + + // Sessions must still be in the queue — not lost + expect(await sessionBuffer.getBufferSize()).toBe(1); + + insertSpy.mockRestore(); + }); +}); diff --git a/packages/redis/cachable.test.ts b/packages/redis/cachable.test.ts index d628f4391..2d212a2ea 100644 --- a/packages/redis/cachable.test.ts +++ b/packages/redis/cachable.test.ts @@ -8,7 +8,10 @@ describe('cachable', () => { beforeEach(async () => { redis = getRedisCache(); // Clear any existing cache data for clean tests - const keys = await redis.keys('cachable:*'); + const keys = [ + ...await redis.keys('cachable:*'), + ...await redis.keys('test-key*'), + ]; if (keys.length > 0) { await redis.del(...keys); } @@ -16,7 +19,10 @@ describe('cachable', () => { afterEach(async () => { // Clean up after each test - const keys = await redis.keys('cachable:*'); + const keys = [ + ...await redis.keys('cachable:*'), + ...await redis.keys('test-key*'), + ]; if (keys.length > 0) { await redis.del(...keys); } From 52be76cc26e51559749274930113be73457e04ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Tue, 24 Mar 2026 12:18:17 +0100 Subject: [PATCH 2/5] remove active visitor counter in redis --- apps/api/src/controllers/live.controller.ts | 16 +- .../src/components/overview/live-counter.tsx | 66 ++------ apps/start/src/hooks/use-live-counter.ts | 81 ++++++++++ apps/start/src/routes/widget/counter.tsx | 34 ++-- apps/start/src/routes/widget/realtime.tsx | 150 +++++++++--------- packages/db/src/buffers/event-buffer.test.ts | 26 ++- packages/db/src/buffers/event-buffer.ts | 71 ++------- 7 files changed, 213 insertions(+), 231 deletions(-) create mode 100644 apps/start/src/hooks/use-live-counter.ts diff --git a/apps/api/src/controllers/live.controller.ts b/apps/api/src/controllers/live.controller.ts index 488e67138..d73f267ec 100644 --- a/apps/api/src/controllers/live.controller.ts +++ b/apps/api/src/controllers/live.controller.ts @@ -1,10 +1,7 @@ import type { WebSocket } from '@fastify/websocket'; import { eventBuffer } from '@openpanel/db'; import { setSuperJson } from '@openpanel/json'; -import { - psubscribeToPublishedEvent, - subscribeToPublishedEvent, -} from '@openpanel/redis'; +import { subscribeToPublishedEvent } from '@openpanel/redis'; import { getProjectAccess } from '@openpanel/trpc'; import { getOrganizationAccess } from '@openpanel/trpc/src/access'; import type { FastifyRequest } from 'fastify'; @@ -39,19 +36,8 @@ export function wsVisitors( } ); - const punsubscribe = psubscribeToPublishedEvent( - '__keyevent@0__:expired', - (key) => { - const [, , projectId] = key.split(':'); - if (projectId === params.projectId) { - sendCount(); - } - } - ); - socket.on('close', () => { unsubscribe(); - punsubscribe(); }); } diff --git a/apps/start/src/components/overview/live-counter.tsx b/apps/start/src/components/overview/live-counter.tsx index 2333f5c09..c3d5e7336 100644 --- a/apps/start/src/components/overview/live-counter.tsx +++ b/apps/start/src/components/overview/live-counter.tsx @@ -1,61 +1,25 @@ -import { TooltipComplete } from '@/components/tooltip-complete'; -import { useDebounceState } from '@/hooks/use-debounce-state'; -import useWS from '@/hooks/use-ws'; -import { useTRPC } from '@/integrations/trpc/react'; -import { cn } from '@/utils/cn'; -import { useQuery, useQueryClient } from '@tanstack/react-query'; -import { useEffect, useRef } from 'react'; +import { useQueryClient } from '@tanstack/react-query'; +import { useCallback } from 'react'; import { toast } from 'sonner'; import { AnimatedNumber } from '../animated-number'; +import { TooltipComplete } from '@/components/tooltip-complete'; +import { useLiveCounter } from '@/hooks/use-live-counter'; +import { cn } from '@/utils/cn'; export interface LiveCounterProps { projectId: string; shareId?: string; } -const FIFTEEN_SECONDS = 1000 * 30; - export function LiveCounter({ projectId, shareId }: LiveCounterProps) { - const trpc = useTRPC(); const client = useQueryClient(); - const counter = useDebounceState(0, 1000); - const lastRefresh = useRef(Date.now()); - const query = useQuery( - trpc.overview.liveVisitors.queryOptions({ - projectId, - shareId, - }), - ); - - useEffect(() => { - if (query.data) { - counter.set(query.data); - } - }, [query.data]); - - useWS( - `/live/visitors/${projectId}`, - (value) => { - if (!Number.isNaN(value)) { - counter.set(value); - if (Date.now() - lastRefresh.current > FIFTEEN_SECONDS) { - lastRefresh.current = Date.now(); - if (!document.hidden) { - toast('Refreshed data'); - client.refetchQueries({ - type: 'active', - }); - } - } - } - }, - { - debounce: { - delay: 1000, - maxWait: 5000, - }, - }, - ); + const onRefresh = useCallback(() => { + toast('Refreshed data'); + client.refetchQueries({ + type: 'active', + }); + }, [client]); + const counter = useLiveCounter({ projectId, shareId, onRefresh }); return (
diff --git a/apps/start/src/hooks/use-live-counter.ts b/apps/start/src/hooks/use-live-counter.ts new file mode 100644 index 000000000..d8e0d7eda --- /dev/null +++ b/apps/start/src/hooks/use-live-counter.ts @@ -0,0 +1,81 @@ +import { useQuery, useQueryClient } from '@tanstack/react-query'; +import { useEffect, useRef } from 'react'; +import { useDebounceState } from './use-debounce-state'; +import useWS from './use-ws'; +import { useTRPC } from '@/integrations/trpc/react'; + +const FIFTEEN_SECONDS = 1000 * 15; +/** Refetch from API when WS-only updates may be stale (e.g. visitors left). */ +const FALLBACK_STALE_MS = 1000 * 60; + +export function useLiveCounter({ + projectId, + shareId, + onRefresh, +}: { + projectId: string; + shareId?: string; + onRefresh?: () => void; +}) { + const trpc = useTRPC(); + const queryClient = useQueryClient(); + const counter = useDebounceState(0, 1000); + const lastRefresh = useRef(Date.now()); + const query = useQuery( + trpc.overview.liveVisitors.queryOptions({ + projectId, + shareId: shareId ?? undefined, + }) + ); + + useEffect(() => { + if (query.data) { + counter.set(query.data); + } + }, [query.data]); + + useWS( + `/live/visitors/${projectId}`, + (value) => { + if (!Number.isNaN(value)) { + counter.set(value); + if (Date.now() - lastRefresh.current > FIFTEEN_SECONDS) { + lastRefresh.current = Date.now(); + if (!document.hidden) { + onRefresh?.(); + } + } + } + }, + { + debounce: { + delay: 1000, + maxWait: 5000, + }, + } + ); + + useEffect(() => { + const id = setInterval(async () => { + if (Date.now() - lastRefresh.current < FALLBACK_STALE_MS) { + return; + } + const data = await queryClient.fetchQuery( + trpc.overview.liveVisitors.queryOptions( + { + projectId, + shareId: shareId ?? undefined, + }, + // Default query staleTime is 5m; bypass cache so this reconciliation always hits the API. + { staleTime: 0 } + ) + ); + counter.set(data); + lastRefresh.current = Date.now(); + }, FALLBACK_STALE_MS); + + return () => clearInterval(id); + }, [projectId, shareId, trpc, queryClient, counter.set]); + + return counter; +} diff --git a/apps/start/src/routes/widget/counter.tsx b/apps/start/src/routes/widget/counter.tsx index a55e7c81a..d745aa62e 100644 --- a/apps/start/src/routes/widget/counter.tsx +++ b/apps/start/src/routes/widget/counter.tsx @@ -1,12 +1,11 @@ +import { useQuery, useQueryClient } from '@tanstack/react-query'; +import { createFileRoute } from '@tanstack/react-router'; +import { z } from 'zod'; import { AnimatedNumber } from '@/components/animated-number'; import { Ping } from '@/components/ping'; -import { useNumber } from '@/hooks/use-numer-formatter'; import useWS from '@/hooks/use-ws'; import { useTRPC } from '@/integrations/trpc/react'; import type { RouterOutputs } from '@/trpc/client'; -import { useQuery, useQueryClient } from '@tanstack/react-query'; -import { createFileRoute } from '@tanstack/react-router'; -import { z } from 'zod'; const widgetSearchSchema = z.object({ shareId: z.string(), @@ -20,33 +19,33 @@ export const Route = createFileRoute('/widget/counter')({ }); function RouteComponent() { - const { shareId, limit, color } = Route.useSearch(); + const { shareId } = Route.useSearch(); const trpc = useTRPC(); // Fetch widget data const { data, isLoading } = useQuery( - trpc.widget.counter.queryOptions({ shareId }), + trpc.widget.counter.queryOptions({ shareId }) ); if (isLoading) { return ( -
+
- +
); } if (!data) { return ( -
+
- +
); } - return ; + return ; } interface RealtimeWidgetProps { @@ -57,30 +56,29 @@ interface RealtimeWidgetProps { function CounterWidget({ shareId, data }: RealtimeWidgetProps) { const trpc = useTRPC(); const queryClient = useQueryClient(); - const number = useNumber(); // WebSocket subscription for real-time updates useWS( `/live/visitors/${data.projectId}`, - (res) => { + () => { if (!document.hidden) { queryClient.refetchQueries( - trpc.widget.counter.queryFilter({ shareId }), + trpc.widget.counter.queryFilter({ shareId }) ); } }, { debounce: { delay: 1000, - maxWait: 60000, + maxWait: 60_000, }, - }, + } ); return ( -
+
- +
); } diff --git a/apps/start/src/routes/widget/realtime.tsx b/apps/start/src/routes/widget/realtime.tsx index a60a17a8e..8e1a4cdb3 100644 --- a/apps/start/src/routes/widget/realtime.tsx +++ b/apps/start/src/routes/widget/realtime.tsx @@ -1,3 +1,15 @@ +import { useQuery, useQueryClient } from '@tanstack/react-query'; +import { createFileRoute } from '@tanstack/react-router'; +import type React from 'react'; +import { + Bar, + BarChart, + ResponsiveContainer, + Tooltip, + XAxis, + YAxis, +} from 'recharts'; +import { z } from 'zod'; import { AnimatedNumber } from '@/components/animated-number'; import { ChartTooltipContainer, @@ -14,18 +26,6 @@ import { countries } from '@/translations/countries'; import type { RouterOutputs } from '@/trpc/client'; import { cn } from '@/utils/cn'; import { getChartColor } from '@/utils/theme'; -import { useQuery, useQueryClient } from '@tanstack/react-query'; -import { createFileRoute } from '@tanstack/react-router'; -import type React from 'react'; -import { - Bar, - BarChart, - ResponsiveContainer, - Tooltip, - XAxis, - YAxis, -} from 'recharts'; -import { z } from 'zod'; const widgetSearchSchema = z.object({ shareId: z.string(), @@ -44,7 +44,7 @@ function RouteComponent() { // Fetch widget data const { data: widgetData, isLoading } = useQuery( - trpc.widget.realtimeData.queryOptions({ shareId }), + trpc.widget.realtimeData.queryOptions({ shareId }) ); if (isLoading) { @@ -53,10 +53,10 @@ function RouteComponent() { if (!widgetData) { return ( -
- -

Widget not found

-

+

+ +

Widget not found

+

This widget is not available or has been removed.

@@ -65,10 +65,10 @@ function RouteComponent() { return ( ); } @@ -83,7 +83,6 @@ interface RealtimeWidgetProps { function RealtimeWidget({ shareId, data, limit, color }: RealtimeWidgetProps) { const trpc = useTRPC(); const queryClient = useQueryClient(); - const number = useNumber(); // WebSocket subscription for real-time updates useWS( @@ -91,16 +90,16 @@ function RealtimeWidget({ shareId, data, limit, color }: RealtimeWidgetProps) { () => { if (!document.hidden) { queryClient.refetchQueries( - trpc.widget.realtimeData.queryFilter({ shareId }), + trpc.widget.realtimeData.queryFilter({ shareId }) ); } }, { debounce: { delay: 1000, - maxWait: 60000, + maxWait: 60_000, }, - }, + } ); const maxDomain = @@ -111,8 +110,12 @@ function RealtimeWidget({ shareId, data, limit, color }: RealtimeWidgetProps) { const referrers = data.referrers.length > 0 ? 1 : 0; const paths = data.paths.length > 0 ? 1 : 0; const value = countries + referrers + paths; - if (value === 3) return 'md:grid-cols-3'; - if (value === 2) return 'md:grid-cols-2'; + if (value === 3) { + return 'md:grid-cols-3'; + } + if (value === 2) { + return 'md:grid-cols-2'; + } return 'md:grid-cols-1'; })(); @@ -120,10 +123,10 @@ function RealtimeWidget({ shareId, data, limit, color }: RealtimeWidgetProps) {
{/* Header with live counter */}
-
-
+
+
-
+
USERS IN LAST 30 MINUTES
{data.project.domain && } @@ -131,14 +134,14 @@ function RealtimeWidget({ shareId, data, limit, color }: RealtimeWidgetProps) {
-
+
-
+
- + - + @@ -174,24 +177,24 @@ function RealtimeWidget({ shareId, data, limit, color }: RealtimeWidgetProps) { {(data.countries.length > 0 || data.referrers.length > 0 || data.paths.length > 0) && ( -
+
{/* Countries */} {data.countries.length > 0 && (
-
+
COUNTRY
{(() => { const { visible, rest, restCount } = getRestItems( data.countries, - limit, + limit ); return ( <> {visible.map((item) => ( - +
@@ -224,19 +227,19 @@ function RealtimeWidget({ shareId, data, limit, color }: RealtimeWidgetProps) { {/* Referrers */} {data.referrers.length > 0 && (
-
+
REFERRER
{(() => { const { visible, rest, restCount } = getRestItems( data.referrers, - limit, + limit ); return ( <> {visible.map((item) => ( - +
@@ -263,19 +266,19 @@ function RealtimeWidget({ shareId, data, limit, color }: RealtimeWidgetProps) { {/* Paths */} {data.paths.length > 0 && (
-
+
PATH
{(() => { const { visible, rest, restCount } = getRestItems( data.paths, - limit, + limit ); return ( <> {visible.map((item) => ( - + {item.path} @@ -303,10 +306,10 @@ function RealtimeWidget({ shareId, data, limit, color }: RealtimeWidgetProps) { } // Custom tooltip component that uses portals to escape overflow hidden -const CustomTooltip = ({ active, payload, coordinate }: any) => { +const CustomTooltip = ({ active, payload }: any) => { const number = useNumber(); - if (!active || !payload || !payload.length) { + if (!(active && payload && payload.length)) { return null; } @@ -328,10 +331,13 @@ const CustomTooltip = ({ active, payload, coordinate }: any) => { function RowItem({ children, count, -}: { children: React.ReactNode; count: number }) { +}: { + children: React.ReactNode; + count: number; +}) { const number = useNumber(); return ( -
+
{children} {number.short(count)}
@@ -340,7 +346,7 @@ function RowItem({ function getRestItems( items: T[], - limit: number, + limit: number ): { visible: T[]; rest: T[]; restCount: number } { const visible = items.slice(0, limit); const rest = items.slice(limit); @@ -375,7 +381,7 @@ function RestRow({ : 'paths'; return ( -
+
{firstName} and {otherCount} more {typeLabel}... @@ -434,13 +440,13 @@ function RealtimeWidgetSkeleton({ limit }: { limit: number }) { const itemCount = Math.min(limit, 5); return ( -
+
{/* Header with live counter */}
-
-
+
+
-
+
USERS IN LAST 30 MINUTES
@@ -448,35 +454,35 @@ function RealtimeWidgetSkeleton({ limit }: { limit: number }) {
-
-
-
+
+
+
-
-
+
+
{SKELETON_HISTOGRAM.map((item, index) => (
))}
-
-
+
+
-
+
{/* Countries, Referrers, and Paths skeleton */}
{/* Countries skeleton */}
-
+
COUNTRY
@@ -488,7 +494,7 @@ function RealtimeWidgetSkeleton({ limit }: { limit: number }) { {/* Referrers skeleton */}
-
+
REFERRER
@@ -500,7 +506,7 @@ function RealtimeWidgetSkeleton({ limit }: { limit: number }) { {/* Paths skeleton */}
-
+
PATH
@@ -517,12 +523,12 @@ function RealtimeWidgetSkeleton({ limit }: { limit: number }) { function RowItemSkeleton() { return ( -
+
-
+
-
+
); } diff --git a/packages/db/src/buffers/event-buffer.test.ts b/packages/db/src/buffers/event-buffer.test.ts index be44688fa..e269ef0b1 100644 --- a/packages/db/src/buffers/event-buffer.test.ts +++ b/packages/db/src/buffers/event-buffer.test.ts @@ -1,6 +1,7 @@ import { getRedisCache } from '@openpanel/redis'; import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest'; -import { ch } from '../clickhouse/client'; +import * as chClient from '../clickhouse/client'; +const { ch } = chClient; // Break circular dep: event-buffer -> event.service -> buffers/index -> EventBuffer vi.mock('../services/event.service', () => ({})); @@ -10,10 +11,7 @@ import { EventBuffer } from './event-buffer'; const redis = getRedisCache(); beforeEach(async () => { - const keys = [ - ...await redis.keys('event*'), - ...await redis.keys('live:*'), - ]; + const keys = await redis.keys('event*'); if (keys.length > 0) await redis.del(...keys); }); @@ -213,18 +211,16 @@ describe('EventBuffer', () => { }); it('tracks active visitors', async () => { - const event = { - project_id: 'p9', - profile_id: 'u9', - name: 'custom', - created_at: new Date().toISOString(), - } as any; - - eventBuffer.add(event); - await eventBuffer.flush(); + const querySpy = vi + .spyOn(chClient, 'chQuery') + .mockResolvedValueOnce([{ count: 2 }] as any); const count = await eventBuffer.getActiveVisitorCount('p9'); - expect(count).toBeGreaterThanOrEqual(1); + expect(count).toBe(2); + expect(querySpy).toHaveBeenCalledOnce(); + expect(querySpy.mock.calls[0]![0]).toContain("project_id = 'p9'"); + + querySpy.mockRestore(); }); it('handles multiple sessions independently — all events go to buffer', async () => { diff --git a/packages/db/src/buffers/event-buffer.ts b/packages/db/src/buffers/event-buffer.ts index 2e5d43dce..883812adc 100644 --- a/packages/db/src/buffers/event-buffer.ts +++ b/packages/db/src/buffers/event-buffer.ts @@ -1,6 +1,6 @@ import { getSafeJson } from '@openpanel/json'; -import { getRedisCache, publishEvent, type Redis } from '@openpanel/redis'; -import { ch } from '../clickhouse/client'; +import { getRedisCache, publishEvent } from '@openpanel/redis'; +import { ch, chQuery } from '../clickhouse/client'; import type { IClickhouseEvent } from '../services/event.service'; import { BaseBuffer } from './base-buffer'; @@ -25,10 +25,6 @@ export class EventBuffer extends BaseBuffer { /** Tracks consecutive flush failures for observability; reset on success. */ private flushRetryCount = 0; - private activeVisitorsExpiration = 60 * 5; // 5 minutes - /** How often (ms) we refresh the heartbeat key + zadd per visitor. */ - private heartbeatRefreshMs = 60_000; // 1 minute - private lastHeartbeat = new Map(); private queueKey = 'event_buffer:queue'; protected bufferCounterKey = 'event_buffer:total_count'; @@ -87,20 +83,12 @@ export class EventBuffer extends BaseBuffer { for (const event of eventsToFlush) { multi.rpush(this.queueKey, JSON.stringify(event)); - if (event.profile_id) { - this.incrementActiveVisitorCount( - multi, - event.project_id, - event.profile_id - ); - } } multi.incrby(this.bufferCounterKey, eventsToFlush.length); await multi.exec(); this.flushRetryCount = 0; - this.pruneHeartbeatMap(); } catch (error) { // Re-queue failed events at the front to preserve order and avoid data loss this.pendingEvents = eventsToFlush.concat(this.pendingEvents); @@ -202,58 +190,21 @@ export class EventBuffer extends BaseBuffer { } } - public async getBufferSize() { + public getBufferSize() { return this.getBufferSizeWithCounter(async () => { const redis = getRedisCache(); return await redis.llen(this.queueKey); }); } - private pruneHeartbeatMap() { - const cutoff = Date.now() - this.activeVisitorsExpiration * 1000; - for (const [key, ts] of this.lastHeartbeat) { - if (ts < cutoff) { - this.lastHeartbeat.delete(key); - } - } - } - - private incrementActiveVisitorCount( - multi: ReturnType, - projectId: string, - profileId: string - ) { - const key = `${projectId}:${profileId}`; - const now = Date.now(); - const last = this.lastHeartbeat.get(key) ?? 0; - - if (now - last < this.heartbeatRefreshMs) { - return; - } - - this.lastHeartbeat.set(key, now); - const zsetKey = `live:visitors:${projectId}`; - const heartbeatKey = `live:visitor:${projectId}:${profileId}`; - multi - .zadd(zsetKey, now, profileId) - .set(heartbeatKey, '1', 'EX', this.activeVisitorsExpiration); - } - public async getActiveVisitorCount(projectId: string): Promise { - const redis = getRedisCache(); - const zsetKey = `live:visitors:${projectId}`; - const cutoff = Date.now() - this.activeVisitorsExpiration * 1000; - - const multi = redis.multi(); - multi - .zremrangebyscore(zsetKey, '-inf', cutoff) - .zcount(zsetKey, cutoff, '+inf'); - - const [, count] = (await multi.exec()) as [ - [Error | null, any], - [Error | null, number], - ]; - - return count[1] || 0; + const rows = await chQuery<{ count: number }>( + `SELECT uniq(profile_id) AS count + FROM events + WHERE project_id = '${projectId}' + AND profile_id != '' + AND created_at >= now() - INTERVAL 5 MINUTE` + ); + return rows[0]?.count ?? 0; } } From 75995cb483e63b18ea6e7dc78d39e77026dd4010 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Tue, 24 Mar 2026 13:05:51 +0100 Subject: [PATCH 3/5] test --- packages/db/src/buffers/profile-buffer.ts | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/packages/db/src/buffers/profile-buffer.ts b/packages/db/src/buffers/profile-buffer.ts index 0a2d9de26..e7547b8bd 100644 --- a/packages/db/src/buffers/profile-buffer.ts +++ b/packages/db/src/buffers/profile-buffer.ts @@ -59,8 +59,19 @@ export class ProfileBuffer extends BaseBuffer { return getSafeJson(cached); } - async add(profile: IClickhouseProfile, _isFromEvent = false) { + async add(profile: IClickhouseProfile, isFromEvent = false) { try { + if (isFromEvent) { + const cacheKey = this.getProfileCacheKey({ + profileId: profile.id, + projectId: profile.project_id, + }); + const exists = await this.redis.exists(cacheKey); + if (exists === 1) { + return; + } + } + const result = await this.redis .multi() .rpush(this.redisKey, JSON.stringify(profile)) From 56430e964ae487dd7712611614403aef81730165 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Tue, 24 Mar 2026 13:19:06 +0100 Subject: [PATCH 4/5] fix profiel query --- packages/db/src/buffers/profile-buffer.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/db/src/buffers/profile-buffer.ts b/packages/db/src/buffers/profile-buffer.ts index e7547b8bd..c2474a486 100644 --- a/packages/db/src/buffers/profile-buffer.ts +++ b/packages/db/src/buffers/profile-buffer.ts @@ -170,8 +170,7 @@ export class ProfileBuffer extends BaseBuffer { FROM ${TABLE_NAMES.profiles} WHERE (id, project_id) IN (${tuples}) ${withDateFilter ? 'AND created_at > now() - INTERVAL 2 DAY' : ''} - GROUP BY id, project_id - ORDER BY created_at DESC` + GROUP BY id, project_id` ); for (const row of rows) { result.set(`${row.project_id}:${row.id}`, row); From af318c3304469d65ccdd980d9afc76f79f1746c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Tue, 24 Mar 2026 13:30:34 +0100 Subject: [PATCH 5/5] fix --- packages/db/src/buffers/profile-buffer.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/packages/db/src/buffers/profile-buffer.ts b/packages/db/src/buffers/profile-buffer.ts index c2474a486..e968951c7 100644 --- a/packages/db/src/buffers/profile-buffer.ts +++ b/packages/db/src/buffers/profile-buffer.ts @@ -160,16 +160,16 @@ export class ProfileBuffer extends BaseBuffer { `SELECT id, project_id, - last_value(nullIf(first_name, '')) as first_name, - last_value(nullIf(last_name, '')) as last_name, - last_value(nullIf(email, '')) as email, - last_value(nullIf(avatar, '')) as avatar, - last_value(is_external) as is_external, - last_value(properties) as properties, - last_value(created_at) as created_at + argMax(nullIf(first_name, ''), ${TABLE_NAMES.profiles}.created_at) as first_name, + argMax(nullIf(last_name, ''), ${TABLE_NAMES.profiles}.created_at) as last_name, + argMax(nullIf(email, ''), ${TABLE_NAMES.profiles}.created_at) as email, + argMax(nullIf(avatar, ''), ${TABLE_NAMES.profiles}.created_at) as avatar, + argMax(is_external, ${TABLE_NAMES.profiles}.created_at) as is_external, + argMax(properties, ${TABLE_NAMES.profiles}.created_at) as properties, + max(created_at) as created_at FROM ${TABLE_NAMES.profiles} WHERE (id, project_id) IN (${tuples}) - ${withDateFilter ? 'AND created_at > now() - INTERVAL 2 DAY' : ''} + ${withDateFilter ? `AND ${TABLE_NAMES.profiles}.created_at > now() - INTERVAL 2 DAY` : ''} GROUP BY id, project_id` ); for (const row of rows) {