Skip to content

Commit f7b2554

Browse files
authored
[FSSDK-12028] add ttl to events in event store (#1103)
Add configurable ttl support to stored events. This will be helpful if for some reason, events are failing permanently (for example, the event enpoint is blocked by adblocker/isp). Also, refactored the code to move the responsibility of limiting the number of saved event to the store implementation from the event processor.
1 parent 697c8df commit f7b2554

15 files changed

+720
-240
lines changed

lib/event_processor/batch_event_processor.react_native.spec.ts

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
import { vi, describe, it, expect, beforeEach } from 'vitest';
17+
import { beforeEach, describe, expect, it, vi } from 'vitest';
1818

1919
const mockNetInfo = vi.hoisted(() => {
2020
const netInfo = {
@@ -45,13 +45,13 @@ vi.mock('@react-native-community/netinfo', () => {
4545
};
4646
});
4747

48-
import { ReactNativeNetInfoEventProcessor } from './batch_event_processor.react_native';
49-
import { getMockRepeater } from '../tests/mock/mock_repeater';
5048
import { getMockAsyncCache } from '../tests/mock/mock_cache';
49+
import { getMockRepeater } from '../tests/mock/mock_repeater';
50+
import { ReactNativeNetInfoEventProcessor } from './batch_event_processor.react_native';
5151

52+
import { createImpressionEvent } from '../tests/mock/create_event';
5253
import { EventWithId } from './batch_event_processor';
5354
import { buildLogEvent } from './event_builder/log_event';
54-
import { createImpressionEvent } from '../tests/mock/create_event';
5555
import { ProcessableEvent } from './event_processor';
5656

5757
const getMockDispatcher = () => {
@@ -77,6 +77,10 @@ describe('ReactNativeNetInfoEventProcessor', () => {
7777
const dispatchRepeater = getMockRepeater();
7878
const failedEventRepeater = getMockRepeater();
7979

80+
// fail the first dispatch attempt that is made on start
81+
eventDispatcher.dispatchEvent = vi.fn().mockResolvedValueOnce({ statusCode: 500 })
82+
.mockResolvedValue({ statusCode: 200 });
83+
8084
const cache = getMockAsyncCache<EventWithId>();
8185
const events: ProcessableEvent[] = [];
8286

@@ -98,18 +102,27 @@ describe('ReactNativeNetInfoEventProcessor', () => {
98102
processor.start();
99103
await processor.onRunning();
100104

105+
await exhaustMicrotasks();
106+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
107+
101108
mockNetInfo.pushState(true);
102-
expect(eventDispatcher.dispatchEvent).not.toHaveBeenCalled();
109+
await exhaustMicrotasks();
110+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
103111

104112
mockNetInfo.pushState(true);
105-
expect(eventDispatcher.dispatchEvent).not.toHaveBeenCalled();
113+
await exhaustMicrotasks();
114+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
106115
});
107116

108-
it('should retry failed events when network becomes reachable', async () => {
117+
it('should retry failed events immediately when network becomes reachable', async () => {
109118
const eventDispatcher = getMockDispatcher();
110119
const dispatchRepeater = getMockRepeater();
111120
const failedEventRepeater = getMockRepeater();
112121

122+
// fail the first dispatch attempt that is made on start
123+
eventDispatcher.dispatchEvent = vi.fn().mockResolvedValueOnce({ statusCode: 500 })
124+
.mockResolvedValue({ statusCode: 200 });
125+
113126
const cache = getMockAsyncCache<EventWithId>();
114127
const events: ProcessableEvent[] = [];
115128

@@ -131,14 +144,17 @@ describe('ReactNativeNetInfoEventProcessor', () => {
131144
processor.start();
132145
await processor.onRunning();
133146

134-
mockNetInfo.pushState(false);
135-
expect(eventDispatcher.dispatchEvent).not.toHaveBeenCalled();
147+
await exhaustMicrotasks();
148+
149+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
136150

137-
mockNetInfo.pushState(true);
151+
mockNetInfo.pushState(false);
152+
mockNetInfo.pushState(true);
138153

139154
await exhaustMicrotasks();
140155

141-
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledWith(buildLogEvent(events));
156+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(2);
157+
expect(eventDispatcher.dispatchEvent).toHaveBeenNthCalledWith(2, buildLogEvent(events));
142158
});
143159

144160
it('should unsubscribe from netinfo listener when stopped', async () => {

lib/event_processor/batch_event_processor.spec.ts

Lines changed: 0 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -368,124 +368,6 @@ describe('BatchEventProcessor', async () => {
368368
expect(events).toEqual(eventsInStore);
369369
});
370370

371-
it('should not store the event in the eventStore but still dispatch if the \
372-
number of pending events is greater than the limit', async () => {
373-
const eventDispatcher = getMockDispatcher();
374-
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
375-
mockDispatch.mockResolvedValue(resolvablePromise().promise);
376-
377-
const eventStore = getMockSyncCache<EventWithId>();
378-
379-
const idGenerator = new IdGenerator();
380-
381-
for (let i = 0; i < 505; i++) {
382-
const event = createImpressionEvent(`id-${i}`);
383-
const cacheId = idGenerator.getId();
384-
await eventStore.set(cacheId, { id: cacheId, event });
385-
}
386-
387-
expect(eventStore.size()).toEqual(505);
388-
389-
const processor = new BatchEventProcessor({
390-
eventDispatcher,
391-
dispatchRepeater: getMockRepeater(),
392-
batchSize: 1,
393-
eventStore,
394-
});
395-
396-
processor.start();
397-
await processor.onRunning();
398-
399-
const events: ProcessableEvent[] = [];
400-
for(let i = 0; i < 2; i++) {
401-
const event = createImpressionEvent(`id-${i}`);
402-
events.push(event);
403-
await processor.process(event)
404-
}
405-
406-
expect(eventStore.size()).toEqual(505);
407-
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(507);
408-
expect(eventDispatcher.dispatchEvent.mock.calls[505][0]).toEqual(buildLogEvent([events[0]]));
409-
expect(eventDispatcher.dispatchEvent.mock.calls[506][0]).toEqual(buildLogEvent([events[1]]));
410-
});
411-
412-
it('should store events in the eventStore when the number of events in the store\
413-
becomes lower than the limit', async () => {
414-
const eventDispatcher = getMockDispatcher();
415-
416-
const dispatchResponses: ResolvablePromise<any>[] = [];
417-
418-
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
419-
mockDispatch.mockImplementation((arg) => {
420-
const dispatchResponse = resolvablePromise();
421-
dispatchResponses.push(dispatchResponse);
422-
return dispatchResponse.promise;
423-
});
424-
425-
const eventStore = getMockSyncCache<EventWithId>();
426-
427-
const idGenerator = new IdGenerator();
428-
429-
for (let i = 0; i < 502; i++) {
430-
const event = createImpressionEvent(`id-${i}`);
431-
const cacheId = String(i);
432-
await eventStore.set(cacheId, { id: cacheId, event });
433-
}
434-
435-
expect(eventStore.size()).toEqual(502);
436-
437-
const processor = new BatchEventProcessor({
438-
eventDispatcher,
439-
dispatchRepeater: getMockRepeater(),
440-
batchSize: 1,
441-
eventStore,
442-
});
443-
444-
processor.start();
445-
await processor.onRunning();
446-
447-
let events: ProcessableEvent[] = [];
448-
for(let i = 0; i < 2; i++) {
449-
const event = createImpressionEvent(`id-${i + 502}`);
450-
events.push(event);
451-
await processor.process(event)
452-
}
453-
454-
expect(eventStore.size()).toEqual(502);
455-
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(504);
456-
457-
expect(eventDispatcher.dispatchEvent.mock.calls[502][0]).toEqual(buildLogEvent([events[0]]));
458-
expect(eventDispatcher.dispatchEvent.mock.calls[503][0]).toEqual(buildLogEvent([events[1]]));
459-
460-
// resolve the dispatch for events not saved in the store
461-
dispatchResponses[502].resolve({ statusCode: 200 });
462-
dispatchResponses[503].resolve({ statusCode: 200 });
463-
464-
await exhaustMicrotasks();
465-
expect(eventStore.size()).toEqual(502);
466-
467-
// resolve the dispatch for 3 events in store, making the store size 499 which is lower than the limit
468-
dispatchResponses[0].resolve({ statusCode: 200 });
469-
dispatchResponses[1].resolve({ statusCode: 200 });
470-
dispatchResponses[2].resolve({ statusCode: 200 });
471-
472-
await exhaustMicrotasks();
473-
expect(eventStore.size()).toEqual(499);
474-
475-
// process 2 more events
476-
events = [];
477-
for(let i = 0; i < 2; i++) {
478-
const event = createImpressionEvent(`id-${i + 504}`);
479-
events.push(event);
480-
await processor.process(event)
481-
}
482-
483-
expect(eventStore.size()).toEqual(500);
484-
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(506);
485-
expect(eventDispatcher.dispatchEvent.mock.calls[504][0]).toEqual(buildLogEvent([events[0]]));
486-
expect(eventDispatcher.dispatchEvent.mock.calls[505][0]).toEqual(buildLogEvent([events[1]]));
487-
});
488-
489371
it('should still dispatch events even if the store save fails', async () => {
490372
const eventDispatcher = getMockDispatcher();
491373
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;

lib/event_processor/batch_event_processor.ts

Lines changed: 4 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import { FAILED_TO_DISPATCH_EVENTS, SERVICE_NOT_RUNNING } from "error_message";
3131
import { OptimizelyError } from "../error/optimizly_error";
3232
import { sprintf } from "../utils/fns";
3333
import { SERVICE_STOPPED_BEFORE_RUNNING } from "../service";
34-
import { EVENT_STORE_FULL } from "../message/log_message";
3534

3635
export const DEFAULT_MIN_BACKOFF = 1000;
3736
export const DEFAULT_MAX_BACKOFF = 32000;
@@ -40,7 +39,6 @@ export const MAX_EVENTS_IN_STORE = 500;
4039
export type EventWithId = {
4140
id: string;
4241
event: ProcessableEvent;
43-
notStored?: boolean;
4442
};
4543

4644
export type RetryConfig = {
@@ -73,9 +71,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
7371
private eventQueue: EventWithId[] = [];
7472
private batchSize: number;
7573
private eventStore?: Store<EventWithId>;
76-
private eventCountInStore: Maybe<number> = undefined;
77-
private eventCountWaitPromise: Promise<unknown> = Promise.resolve();
78-
private maxEventsInStore: number = MAX_EVENTS_IN_STORE;
7974
private dispatchRepeater: Repeater;
8075
private failedEventRepeater?: Repeater;
8176
private idGenerator: IdGenerator = new IdGenerator();
@@ -96,7 +91,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
9691
this.dispatchRepeater = config.dispatchRepeater;
9792
this.dispatchRepeater.setTask(() => this.flush());
9893

99-
this.maxEventsInStore = Math.max(2 * config.batchSize, MAX_EVENTS_IN_STORE);
10094
this.failedEventRepeater = config.failedEventRepeater;
10195
this.failedEventRepeater?.setTask(() => this.retryFailedEvents());
10296
if (config.logger) {
@@ -215,9 +209,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
215209
runResult.result.then((res) => {
216210
events.forEach((event) => {
217211
this.eventStore?.remove(event.id);
218-
if (!event.notStored && this.eventCountInStore) {
219-
this.eventCountInStore--;
220-
}
221212
});
222213
return Promise.resolve();
223214
}).catch((err) => {
@@ -250,7 +241,10 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
250241
event: event,
251242
};
252243

253-
await this.storeEvent(eventWithId);
244+
// store in eventStore first to avoid data loss
245+
await Promise.resolve(this.eventStore?.set(eventWithId.id, eventWithId)).catch((e) => {
246+
this.logger?.error(e);
247+
});
254248

255249
if (this.eventQueue.length > 0 && !areEventContextsEqual(this.eventQueue[0].event, event)) {
256250
this.flush();
@@ -265,46 +259,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
265259
}
266260
}
267261

268-
private async readEventCountInStore(store: Store<EventWithId>): Promise<void> {
269-
if (this.eventCountInStore !== undefined) {
270-
return;
271-
}
272-
273-
try {
274-
const keys = await store.getKeys();
275-
this.eventCountInStore = keys.length;
276-
} catch (e) {
277-
this.logger?.error(e);
278-
}
279-
}
280-
281-
private async findEventCountInStore(): Promise<unknown> {
282-
if (this.eventStore && this.eventCountInStore === undefined) {
283-
const store = this.eventStore;
284-
this.eventCountWaitPromise = this.eventCountWaitPromise.then(() => this.readEventCountInStore(store));
285-
return this.eventCountWaitPromise;
286-
}
287-
return Promise.resolve();
288-
}
289-
290-
private async storeEvent(eventWithId: EventWithId): Promise<void> {
291-
await this.findEventCountInStore();
292-
if (this.eventCountInStore !== undefined && this.eventCountInStore >= this.maxEventsInStore) {
293-
this.logger?.info(EVENT_STORE_FULL, eventWithId.event.uuid);
294-
eventWithId.notStored = true;
295-
return;
296-
}
297-
298-
await Promise.resolve(this.eventStore?.set(eventWithId.id, eventWithId)).then(() => {
299-
if (this.eventCountInStore !== undefined) {
300-
this.eventCountInStore++;
301-
}
302-
}).catch((e) => {
303-
eventWithId.notStored = true;
304-
this.logger?.error(e);
305-
});
306-
}
307-
308262
start(): void {
309263
if (!this.isNew()) {
310264
return;

0 commit comments

Comments
 (0)