Skip to content

Commit 4435ca7

Browse files
committed
update
1 parent e201425 commit 4435ca7

11 files changed

+65
-211
lines changed

lib/event_processor/batch_event_processor.react_native.spec.ts

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ import { EventWithId } from './batch_event_processor';
5353
import { buildLogEvent } from './event_builder/log_event';
5454
import { createImpressionEvent } from '../tests/mock/create_event';
5555
import { ProcessableEvent } from './event_processor';
56+
import { wait } from '../tests/testUtils';
57+
import { mock } from 'node:test';
5658

5759
const getMockDispatcher = () => {
5860
return {
@@ -77,6 +79,10 @@ describe('ReactNativeNetInfoEventProcessor', () => {
7779
const dispatchRepeater = getMockRepeater();
7880
const failedEventRepeater = getMockRepeater();
7981

82+
// fail the first dispatch attempt that is made on start
83+
eventDispatcher.dispatchEvent = vi.fn().mockResolvedValueOnce({ statusCode: 500 })
84+
.mockResolvedValue({ statusCode: 200 });
85+
8086
const cache = getMockAsyncCache<EventWithId>();
8187
const events: ProcessableEvent[] = [];
8288

@@ -98,18 +104,27 @@ describe('ReactNativeNetInfoEventProcessor', () => {
98104
processor.start();
99105
await processor.onRunning();
100106

107+
await exhaustMicrotasks();
108+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
109+
101110
mockNetInfo.pushState(true);
102-
expect(eventDispatcher.dispatchEvent).not.toHaveBeenCalled();
111+
await exhaustMicrotasks();
112+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
103113

104114
mockNetInfo.pushState(true);
105-
expect(eventDispatcher.dispatchEvent).not.toHaveBeenCalled();
115+
await exhaustMicrotasks();
116+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
106117
});
107118

108-
it('should retry failed events when network becomes reachable', async () => {
119+
it('should retry failed events immediately when network becomes reachable', async () => {
109120
const eventDispatcher = getMockDispatcher();
110121
const dispatchRepeater = getMockRepeater();
111122
const failedEventRepeater = getMockRepeater();
112123

124+
// fail the first dispatch attempt that is made on start
125+
eventDispatcher.dispatchEvent = vi.fn().mockResolvedValueOnce({ statusCode: 500 })
126+
.mockResolvedValue({ statusCode: 200 });
127+
113128
const cache = getMockAsyncCache<EventWithId>();
114129
const events: ProcessableEvent[] = [];
115130

@@ -131,14 +146,17 @@ describe('ReactNativeNetInfoEventProcessor', () => {
131146
processor.start();
132147
await processor.onRunning();
133148

134-
mockNetInfo.pushState(false);
135-
expect(eventDispatcher.dispatchEvent).not.toHaveBeenCalled();
149+
await exhaustMicrotasks();
150+
151+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
136152

137-
mockNetInfo.pushState(true);
153+
mockNetInfo.pushState(false);
154+
mockNetInfo.pushState(true);
138155

139156
await exhaustMicrotasks();
140157

141-
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledWith(buildLogEvent(events));
158+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(2);
159+
expect(eventDispatcher.dispatchEvent).toHaveBeenNthCalledWith(2, buildLogEvent(events));
142160
});
143161

144162
it('should unsubscribe from netinfo listener when stopped', async () => {

lib/event_processor/batch_event_processor.spec.ts

Lines changed: 0 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -368,124 +368,6 @@ describe('BatchEventProcessor', async () => {
368368
expect(events).toEqual(eventsInStore);
369369
});
370370

371-
it('should not store the event in the eventStore but still dispatch if the \
372-
number of pending events is greater than the limit', async () => {
373-
const eventDispatcher = getMockDispatcher();
374-
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
375-
mockDispatch.mockResolvedValue(resolvablePromise().promise);
376-
377-
const eventStore = getMockSyncCache<EventWithId>();
378-
379-
const idGenerator = new IdGenerator();
380-
381-
for (let i = 0; i < 505; i++) {
382-
const event = createImpressionEvent(`id-${i}`);
383-
const cacheId = idGenerator.getId();
384-
await eventStore.set(cacheId, { id: cacheId, event });
385-
}
386-
387-
expect(eventStore.size()).toEqual(505);
388-
389-
const processor = new BatchEventProcessor({
390-
eventDispatcher,
391-
dispatchRepeater: getMockRepeater(),
392-
batchSize: 1,
393-
eventStore,
394-
});
395-
396-
processor.start();
397-
await processor.onRunning();
398-
399-
const events: ProcessableEvent[] = [];
400-
for(let i = 0; i < 2; i++) {
401-
const event = createImpressionEvent(`id-${i}`);
402-
events.push(event);
403-
await processor.process(event)
404-
}
405-
406-
expect(eventStore.size()).toEqual(505);
407-
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(507);
408-
expect(eventDispatcher.dispatchEvent.mock.calls[505][0]).toEqual(buildLogEvent([events[0]]));
409-
expect(eventDispatcher.dispatchEvent.mock.calls[506][0]).toEqual(buildLogEvent([events[1]]));
410-
});
411-
412-
it('should store events in the eventStore when the number of events in the store\
413-
becomes lower than the limit', async () => {
414-
const eventDispatcher = getMockDispatcher();
415-
416-
const dispatchResponses: ResolvablePromise<any>[] = [];
417-
418-
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
419-
mockDispatch.mockImplementation((arg) => {
420-
const dispatchResponse = resolvablePromise();
421-
dispatchResponses.push(dispatchResponse);
422-
return dispatchResponse.promise;
423-
});
424-
425-
const eventStore = getMockSyncCache<EventWithId>();
426-
427-
const idGenerator = new IdGenerator();
428-
429-
for (let i = 0; i < 502; i++) {
430-
const event = createImpressionEvent(`id-${i}`);
431-
const cacheId = String(i);
432-
await eventStore.set(cacheId, { id: cacheId, event });
433-
}
434-
435-
expect(eventStore.size()).toEqual(502);
436-
437-
const processor = new BatchEventProcessor({
438-
eventDispatcher,
439-
dispatchRepeater: getMockRepeater(),
440-
batchSize: 1,
441-
eventStore,
442-
});
443-
444-
processor.start();
445-
await processor.onRunning();
446-
447-
let events: ProcessableEvent[] = [];
448-
for(let i = 0; i < 2; i++) {
449-
const event = createImpressionEvent(`id-${i + 502}`);
450-
events.push(event);
451-
await processor.process(event)
452-
}
453-
454-
expect(eventStore.size()).toEqual(502);
455-
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(504);
456-
457-
expect(eventDispatcher.dispatchEvent.mock.calls[502][0]).toEqual(buildLogEvent([events[0]]));
458-
expect(eventDispatcher.dispatchEvent.mock.calls[503][0]).toEqual(buildLogEvent([events[1]]));
459-
460-
// resolve the dispatch for events not saved in the store
461-
dispatchResponses[502].resolve({ statusCode: 200 });
462-
dispatchResponses[503].resolve({ statusCode: 200 });
463-
464-
await exhaustMicrotasks();
465-
expect(eventStore.size()).toEqual(502);
466-
467-
// resolve the dispatch for 3 events in store, making the store size 499 which is lower than the limit
468-
dispatchResponses[0].resolve({ statusCode: 200 });
469-
dispatchResponses[1].resolve({ statusCode: 200 });
470-
dispatchResponses[2].resolve({ statusCode: 200 });
471-
472-
await exhaustMicrotasks();
473-
expect(eventStore.size()).toEqual(499);
474-
475-
// process 2 more events
476-
events = [];
477-
for(let i = 0; i < 2; i++) {
478-
const event = createImpressionEvent(`id-${i + 504}`);
479-
events.push(event);
480-
await processor.process(event)
481-
}
482-
483-
expect(eventStore.size()).toEqual(500);
484-
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(506);
485-
expect(eventDispatcher.dispatchEvent.mock.calls[504][0]).toEqual(buildLogEvent([events[0]]));
486-
expect(eventDispatcher.dispatchEvent.mock.calls[505][0]).toEqual(buildLogEvent([events[1]]));
487-
});
488-
489371
it('should still dispatch events even if the store save fails', async () => {
490372
const eventDispatcher = getMockDispatcher();
491373
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;

lib/event_processor/batch_event_processor.ts

Lines changed: 6 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import { FAILED_TO_DISPATCH_EVENTS, SERVICE_NOT_RUNNING } from "error_message";
3131
import { OptimizelyError } from "../error/optimizly_error";
3232
import { sprintf } from "../utils/fns";
3333
import { SERVICE_STOPPED_BEFORE_RUNNING } from "../service";
34-
import { EVENT_STORE_FULL } from "../message/log_message";
3534

3635
export const DEFAULT_MIN_BACKOFF = 1000;
3736
export const DEFAULT_MAX_BACKOFF = 32000;
@@ -40,7 +39,7 @@ export const MAX_EVENTS_IN_STORE = 500;
4039
export type EventWithId = {
4140
id: string;
4241
event: ProcessableEvent;
43-
notStored?: boolean;
42+
// notStored?: boolean;
4443
};
4544

4645
export type RetryConfig = {
@@ -73,9 +72,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
7372
private eventQueue: EventWithId[] = [];
7473
private batchSize: number;
7574
private eventStore?: Store<EventWithId>;
76-
private eventCountInStore: Maybe<number> = undefined;
77-
private eventCountWaitPromise: Promise<unknown> = Promise.resolve();
78-
private maxEventsInStore: number = MAX_EVENTS_IN_STORE;
7975
private dispatchRepeater: Repeater;
8076
private failedEventRepeater?: Repeater;
8177
private idGenerator: IdGenerator = new IdGenerator();
@@ -96,7 +92,7 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
9692
this.dispatchRepeater = config.dispatchRepeater;
9793
this.dispatchRepeater.setTask(() => this.flush());
9894

99-
this.maxEventsInStore = Math.max(2 * config.batchSize, MAX_EVENTS_IN_STORE);
95+
// this.maxEventsInStore = Math.max(2 * config.batchSize, MAX_EVENTS_IN_STORE);
10096
this.failedEventRepeater = config.failedEventRepeater;
10197
this.failedEventRepeater?.setTask(() => this.retryFailedEvents());
10298
if (config.logger) {
@@ -215,9 +211,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
215211
runResult.result.then((res) => {
216212
events.forEach((event) => {
217213
this.eventStore?.remove(event.id);
218-
if (!event.notStored && this.eventCountInStore) {
219-
this.eventCountInStore--;
220-
}
221214
});
222215
return Promise.resolve();
223216
}).catch((err) => {
@@ -250,7 +243,10 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
250243
event: event,
251244
};
252245

253-
await this.storeEvent(eventWithId);
246+
// store in eventStore first to avoid data loss
247+
await Promise.resolve(this.eventStore?.set(eventWithId.id, eventWithId)).catch((e) => {
248+
this.logger?.error(e);
249+
});
254250

255251
if (this.eventQueue.length > 0 && !areEventContextsEqual(this.eventQueue[0].event, event)) {
256252
this.flush();
@@ -265,46 +261,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
265261
}
266262
}
267263

268-
private async readEventCountInStore(store: Store<EventWithId>): Promise<void> {
269-
if (this.eventCountInStore !== undefined) {
270-
return;
271-
}
272-
273-
try {
274-
const keys = await store.getKeys();
275-
this.eventCountInStore = keys.length;
276-
} catch (e) {
277-
this.logger?.error(e);
278-
}
279-
}
280-
281-
private async findEventCountInStore(): Promise<unknown> {
282-
if (this.eventStore && this.eventCountInStore === undefined) {
283-
const store = this.eventStore;
284-
this.eventCountWaitPromise = this.eventCountWaitPromise.then(() => this.readEventCountInStore(store));
285-
return this.eventCountWaitPromise;
286-
}
287-
return Promise.resolve();
288-
}
289-
290-
private async storeEvent(eventWithId: EventWithId): Promise<void> {
291-
await this.findEventCountInStore();
292-
if (this.eventCountInStore !== undefined && this.eventCountInStore >= this.maxEventsInStore) {
293-
this.logger?.info(EVENT_STORE_FULL, eventWithId.event.uuid);
294-
eventWithId.notStored = true;
295-
return;
296-
}
297-
298-
await Promise.resolve(this.eventStore?.set(eventWithId.id, eventWithId)).then(() => {
299-
if (this.eventCountInStore !== undefined) {
300-
this.eventCountInStore++;
301-
}
302-
}).catch((e) => {
303-
eventWithId.notStored = true;
304-
this.logger?.error(e);
305-
});
306-
}
307-
308264
start(): void {
309265
if (!this.isNew()) {
310266
return;

lib/event_processor/event_processor_factory.browser.ts

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@ import {
2222
OpaqueEventProcessor,
2323
wrapEventProcessor,
2424
getForwardingEventProcessor,
25+
transformEventStore,
26+
getPrefixEventStore,
2527
} from './event_processor_factory';
2628
import defaultEventDispatcher from './event_dispatcher/default_dispatcher.browser';
2729
import sendBeaconEventDispatcher from './event_dispatcher/send_beacon_dispatcher.browser';
2830
import { LocalStorageCache } from '../utils/cache/local_storage_cache.browser';
29-
import { SyncPrefixStore } from '../utils/cache/store';
30-
import { EVENT_STORE_PREFIX, FAILED_EVENT_RETRY_INTERVAL } from './event_processor_factory';
31+
import { FAILED_EVENT_RETRY_INTERVAL } from './event_processor_factory';
32+
import { DEFAULT_MAX_EVENTS_IN_STORE, EventStore } from './event_store';
3133

3234
export const DEFAULT_EVENT_BATCH_SIZE = 10;
3335
export const DEFAULT_EVENT_FLUSH_INTERVAL = 1_000;
@@ -38,17 +40,15 @@ export const createForwardingEventProcessor = (
3840
return wrapEventProcessor(getForwardingEventProcessor(eventDispatcher));
3941
};
4042

41-
const identity = <T>(v: T): T => v;
42-
4343
export const createBatchEventProcessor = (
4444
options: BatchEventProcessorOptions = {}
4545
): OpaqueEventProcessor => {
46-
const localStorageCache = new LocalStorageCache<EventWithId>();
47-
const eventStore = new SyncPrefixStore<EventWithId, EventWithId>(
48-
localStorageCache, EVENT_STORE_PREFIX,
49-
identity,
50-
identity,
51-
);
46+
const eventStore = options.eventStore ? getPrefixEventStore(options.eventStore) : new EventStore({
47+
store: new LocalStorageCache<EventWithId>(),
48+
maxSize: options.batchSize ? Math.max(options.batchSize * 2, DEFAULT_MAX_EVENTS_IN_STORE)
49+
: DEFAULT_MAX_EVENTS_IN_STORE,
50+
ttl: options.storeTtl,
51+
});
5252

5353
return getOpaqueBatchEventProcessor({
5454
eventDispatcher: options.eventDispatcher || defaultEventDispatcher,
@@ -63,5 +63,6 @@ export const createBatchEventProcessor = (
6363
},
6464
failedEventRetryInterval: FAILED_EVENT_RETRY_INTERVAL,
6565
eventStore,
66+
storeTtl: options.storeTtl,
6667
});
6768
};

lib/event_processor/event_processor_factory.react_native.spec.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,10 @@ describe('createBatchEventProcessor', () => {
111111
MockAsyncPrefixStore.mockClear();
112112
});
113113

114-
it('uses AsyncStorageCache and AsyncPrefixStore to create eventStore if no eventStore is provided', () => {
115-
const processor = createBatchEventProcessor({});
114+
it('uses an EventStore instance with correct options if no eventStore is provided', () => {
115+
const processor = createBatchEventProcessor({
116+
storeTtl: 60_000,
117+
});
116118

117119
expect(Object.is(processor, mockGetOpaqueBatchEventProcessor.mock.results[0].value)).toBe(true);
118120
const eventStore = mockGetOpaqueBatchEventProcessor.mock.calls[0][0].eventStore;

0 commit comments

Comments
 (0)