Skip to content

Commit 7df96b3

Browse files
committed
up
1 parent 3d35f68 commit 7df96b3

File tree

4 files changed

+349
-21
lines changed

4 files changed

+349
-21
lines changed
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
/**
2+
* Copyright 2025, Optimizely
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
import { vi, describe, it, expect, beforeEach } from 'vitest';
17+
import { getMockAsyncCache, getMockSyncCache } from '../tests/mock/mock_cache';
18+
import { SyncStore } from '../utils/cache/store';
19+
import { EventWithId } from './batch_event_processor';
20+
import { EventStore, StoredEvent } from './event_store';
21+
import { createImpressionEvent } from '../tests/mock/create_event';
22+
23+
import { DEFAULT_MAX_EVENTS_IN_STORE } from './event_store';
24+
import { exhaustMicrotasks } from '../tests/testUtils';
25+
26+
type TestStoreConfig = {
27+
maxSize?: number;
28+
ttl?: number;
29+
}
30+
31+
const getEventStore = (config: TestStoreConfig = {}) => {
32+
const mockStore = getMockAsyncCache<StoredEvent>();
33+
const store = new EventStore({...config, store: mockStore });
34+
return { mockStore, store }
35+
}
36+
37+
describe('EventStore', () => {
38+
it('should be able to get the stored event correctly', async () => {
39+
const { store } = getEventStore();
40+
const event: EventWithId = {
41+
id: '1',
42+
event: createImpressionEvent('test'),
43+
}
44+
45+
await store.set('test', event);
46+
const savedEvent = await store.get('test');
47+
expect(savedEvent).toEqual(expect.objectContaining(event));
48+
});
49+
50+
it('should remove a key correctly', async () => {
51+
const { store } = getEventStore();
52+
const event: EventWithId = {
53+
id: '1',
54+
event: createImpressionEvent('test'),
55+
}
56+
57+
await store.set('test-1', event);
58+
let saved1 = await store.get('test-1');
59+
expect(saved1).toEqual(expect.objectContaining(event));
60+
61+
await store.set('test-2', event);
62+
63+
await store.remove('test-1');
64+
saved1 = await store.get('test-1');
65+
expect(saved1).toBeUndefined()
66+
67+
const saved2 = await store.get('test-2');
68+
expect(saved2).toEqual(expect.objectContaining(event));
69+
});
70+
71+
it('should return all keys from getKeys()', async () => {
72+
const { store } = getEventStore();
73+
const event: EventWithId = {
74+
id: '1',
75+
event: createImpressionEvent('test'),
76+
};
77+
78+
const keys = [];
79+
80+
for(let i = 0; i < 10; i++) {
81+
keys.push(`test-${i}`)
82+
await store.set(`test-${i}`, event);
83+
}
84+
85+
const savedKeys = await store.getKeys();
86+
savedKeys.sort();
87+
88+
expect(savedKeys).toEqual(keys);
89+
});
90+
91+
it('should limit the number of saved keys', async () => {
92+
const { store } = getEventStore();
93+
94+
const event: EventWithId = {
95+
id: '1',
96+
event: createImpressionEvent('test'),
97+
}
98+
99+
const keys = [];
100+
const promises = [];
101+
102+
for(let i = 0; i < DEFAULT_MAX_EVENTS_IN_STORE + 10; i++) {
103+
keys.push(i + '')
104+
const res = store.set(i + '', event);
105+
res.catch(() => {});
106+
promises.push(res);
107+
}
108+
109+
for(let i = 0; i < DEFAULT_MAX_EVENTS_IN_STORE; i++) {
110+
await expect(promises[i]).resolves.not.toThrow();
111+
}
112+
113+
for(let i = 0; i < 10; i++) {
114+
await expect(promises[DEFAULT_MAX_EVENTS_IN_STORE + i]).rejects.toThrow();
115+
}
116+
117+
const savedKeys = await store.getKeys();
118+
savedKeys.sort((a, b) => Number(a) - Number(b));
119+
120+
expect(savedKeys).toEqual(keys.slice(0, DEFAULT_MAX_EVENTS_IN_STORE));
121+
});
122+
123+
it('should save keys again when the number of stored events drops below maxSize', async () => {
124+
const { store } = getEventStore();
125+
126+
const event: EventWithId = {
127+
id: '1',
128+
event: createImpressionEvent('test'),
129+
}
130+
131+
const keys = []
132+
for(let i = 0; i < DEFAULT_MAX_EVENTS_IN_STORE + 10; i++) {
133+
keys.push(i + '');
134+
await store.set(i + '', event).catch(() => {});
135+
}
136+
137+
const savedKeys = await store.getKeys();
138+
savedKeys.sort((a, b) => Number(a) - Number(b));
139+
140+
expect(savedKeys).toEqual(keys.slice(0, DEFAULT_MAX_EVENTS_IN_STORE));
141+
142+
for(let i = 0; i < 10; i++) {
143+
const keyToRemove = i + '';
144+
await store.remove(keyToRemove);
145+
}
146+
147+
for(let i = DEFAULT_MAX_EVENTS_IN_STORE; i < DEFAULT_MAX_EVENTS_IN_STORE + 10; i++) {
148+
const keyToAdd = i + '';
149+
await expect(store.set(keyToAdd, event)).resolves.not.toThrow();
150+
}
151+
152+
const finalKeys = await store.getKeys();
153+
finalKeys.sort((a, b) => Number(a) - Number(b));
154+
155+
const expectedKeys = [];
156+
for(let i = 10; i < DEFAULT_MAX_EVENTS_IN_STORE + 10; i++) {
157+
expectedKeys.push(i + '');
158+
}
159+
160+
expect(finalKeys).toEqual(expectedKeys);
161+
});
162+
163+
it('should expire events after ttl', async () => {
164+
const ttl = 100; // 100 ms
165+
const { store } = getEventStore({ ttl });
166+
167+
const event: EventWithId = {
168+
id: '1',
169+
event: createImpressionEvent('test'),
170+
}
171+
172+
await store.set('test', event);
173+
const savedEvent = await store.get('test');
174+
expect(savedEvent).toEqual(expect.objectContaining(event));
175+
176+
// wait for ttl to expire
177+
await new Promise(resolve => setTimeout(resolve, ttl + 50));
178+
179+
const expiredEvent = await store.get('test');
180+
expect(expiredEvent).toBeUndefined();
181+
182+
expect(await store.getKeys()).toEqual([]);
183+
});
184+
185+
it('should resave events without expireAt on get', async () => {
186+
const ttl = 120_000;
187+
const { mockStore, store } = getEventStore({ ttl });
188+
189+
const event: EventWithId = {
190+
id: '1',
191+
event: createImpressionEvent('test'),
192+
}
193+
194+
195+
const originalSet = mockStore.set.bind(mockStore);
196+
197+
let call = 0;
198+
const setSpy = vi.spyOn(mockStore, 'set').mockImplementation(async (key: string, value: StoredEvent) => {
199+
if (call++ > 0) {
200+
return originalSet(key, value);
201+
}
202+
203+
// Simulate old stored event without expireAt
204+
const eventWithoutExpireAt: StoredEvent = {
205+
id: value.id,
206+
event: value.event,
207+
};
208+
return originalSet(key, eventWithoutExpireAt);
209+
});
210+
211+
await store.set('test', event);
212+
213+
const savedEvent = await store.get('test');
214+
expect(savedEvent).toEqual(expect.objectContaining(event));
215+
216+
217+
await exhaustMicrotasks();
218+
expect(setSpy).toHaveBeenCalledTimes(2);
219+
220+
const secondCall = setSpy.mock.calls[1];
221+
222+
expect(secondCall[1].expiresAt).toBeDefined();
223+
expect(secondCall[1].expiresAt!).toBeGreaterThanOrEqual(Date.now() + ttl - 10);
224+
});
225+
});

lib/event_processor/event_store.ts

Lines changed: 111 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,130 @@
11
import { EventWithId } from "./batch_event_processor";
2-
import { AsyncPrefixStore, OpStore, Store, SyncPrefixStore } from "../utils/cache/store";
2+
import { AsyncPrefixStore, AsyncStore, AsyncStoreWithBatchedGet, OpStore, Store, SyncPrefixStore } from "../utils/cache/store";
33
import { Maybe, OpType, OpValue } from "../utils/type";
4+
import { SerialRunner } from "../utils/executor/serial_runner";
5+
import { LoggerFacade } from "../logging/logger";
6+
import { EVENT_STORE_FULL } from "../message/log_message";
7+
import { OptimizelyError } from "../error/optimizly_error";
48

5-
type StoredEvent = EventWithId & {
6-
storedAt?: number;
9+
export type StoredEvent = EventWithId & {
10+
expiresAt?: number;
711
};
812

9-
export class EventStore<OP extends OpType> implements OpStore<OP, EventWithId> {
10-
private store: OpStore<OP, StoredEvent>;
11-
readonly operation: OP;
12-
private keysSet?: Set<string>;
13+
const identity = <T>(v: T): T => v;
14+
15+
const LOGGER_NAME = 'EventStore';
16+
export const DEFAULT_MAX_EVENTS_IN_STORE = 500;
17+
export const DEFAULT_STORE_TTL = 10 * 24 * 60 * 60 * 1000; // 10 days
18+
19+
export type EventStoreConfig = {
20+
maxSize?: number;
21+
ttl?: number,
22+
store: Store<EventWithId>,
23+
logger?: LoggerFacade,
24+
};
25+
26+
export class EventStore extends AsyncStoreWithBatchedGet<EventWithId> implements AsyncStore<EventWithId> {
27+
readonly operation = 'async';
28+
29+
private store: Store<StoredEvent>;
30+
private serializer: SerialRunner = new SerialRunner();
31+
private logger?: LoggerFacade;
32+
private maxSize: number;
33+
private ttl: number;
34+
private keys?: Set<string>;
35+
36+
constructor(config: EventStoreConfig) {
37+
super();
38+
39+
const {
40+
maxSize,
41+
ttl,
42+
store,
43+
logger
44+
} = config;
1345

14-
constructor(store: OpStore<OP, EventWithId>) {
1546
if (store.operation === 'sync') {
16-
this.store = new SyncPrefixStore(store as OpStore<'sync', EventWithId>, 'optly_event:', a => a, a => a) as unknown as OpStore<OP, StoredEvent>;
47+
this.store = new SyncPrefixStore(store, 'optly_event:', identity, identity);
1748
} else {
18-
this.store = new AsyncPrefixStore(store as OpStore<'async', EventWithId>, 'optly_event:', a => a, a => a) as unknown as OpStore<OP, StoredEvent>;
49+
this.store = new AsyncPrefixStore(store, 'optly_event:', identity, identity);
50+
}
51+
52+
if (logger) {
53+
logger.setName(LOGGER_NAME);
54+
this.logger = logger;
55+
}
56+
57+
this.maxSize = maxSize || DEFAULT_MAX_EVENTS_IN_STORE;
58+
this.ttl = ttl || DEFAULT_STORE_TTL;
59+
}
60+
61+
private async readKeys() {
62+
return this.serializer.run(async () => {
63+
if (this.keys !== undefined) {
64+
return;
65+
}
66+
67+
try {
68+
this.keys = new Set(await this.getKeys());
69+
} catch (err) {
70+
this.logger?.error(err);
71+
}
72+
});
73+
}
74+
75+
async set(key: string, event: EventWithId): Promise<unknown> {
76+
await this.readKeys();
77+
78+
if (this.keys !== undefined && this.keys.size >= this.maxSize) {
79+
this.logger?.info(EVENT_STORE_FULL, event.event.uuid);
80+
return Promise.reject(new OptimizelyError(EVENT_STORE_FULL, event.event.uuid));
1981
}
2082

21-
this.operation = store.operation;
83+
// this.store.set() might fail and cause the in memory set of keys to
84+
// diverge from the actual stored key list. But the in memory set needs
85+
// to be updated before the store set to limit the eventCount
86+
// when concurrent set are present. Even if the store set fails, it will
87+
// still keep the stored event cound below maxSize (it will underfill the store).
88+
// next getKeys() should fix the discrepency.
89+
this.keys?.add(key);
90+
return this.store.set(key, { ...event, expiresAt: Date.now() + this.ttl });
2291
}
2392

24-
set(key: string, value: EventWithId): OpValue<OP, unknown> {
25-
throw new Error("Method not implemented.");
93+
async get(key: string): Promise<EventWithId | undefined> {
94+
const value = await this.store.get(key);
95+
if (!value) return undefined;
96+
97+
// if there is events in the stored saved by old version of the sdk,
98+
// they will not have the storedAt time, update them with the current time
99+
// before returning
100+
101+
if (value.expiresAt === undefined) {
102+
value.expiresAt = Date.now() + this.ttl;
103+
this.set(key, value).catch(() => {});
104+
return value;
105+
}
106+
107+
if (value.expiresAt <= Date.now()) {
108+
this.remove(key).catch(() => {});
109+
return undefined;
110+
}
111+
112+
return value;
26113
}
27114

28-
get(key: string): OpValue<OP, Maybe<EventWithId>> {
29-
throw new Error("Method not implemented.");
115+
async remove(key: string): Promise<unknown> {
116+
await this.store.remove(key);
117+
this.keys?.delete(key);
118+
return;
30119
}
31120

32-
remove(key: string): OpValue<OP, unknown> {
33-
throw new Error("Method not implemented.");
121+
async getKeys(): Promise<string[]>{
122+
const keys = await this.store.getKeys();
123+
this.keys = new Set(keys);
124+
return keys;
34125
}
35126

36-
getKeys(): OpValue<OP, string[]> {
37-
throw new Error("Method not implemented.");
38-
}
127+
async getBatched(keys: string[]): Promise<Maybe<EventWithId>[]> {
128+
return [];
129+
}
39130
}

0 commit comments

Comments
 (0)