Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 27 additions & 11 deletions lib/event_processor/batch_event_processor.react_native.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

import { vi, describe, it, expect, beforeEach } from 'vitest';
import { beforeEach, describe, expect, it, vi } from 'vitest';

const mockNetInfo = vi.hoisted(() => {
const netInfo = {
Expand Down Expand Up @@ -45,13 +45,13 @@ vi.mock('@react-native-community/netinfo', () => {
};
});

import { ReactNativeNetInfoEventProcessor } from './batch_event_processor.react_native';
import { getMockRepeater } from '../tests/mock/mock_repeater';
import { getMockAsyncCache } from '../tests/mock/mock_cache';
import { getMockRepeater } from '../tests/mock/mock_repeater';
import { ReactNativeNetInfoEventProcessor } from './batch_event_processor.react_native';

import { createImpressionEvent } from '../tests/mock/create_event';
import { EventWithId } from './batch_event_processor';
import { buildLogEvent } from './event_builder/log_event';
import { createImpressionEvent } from '../tests/mock/create_event';
import { ProcessableEvent } from './event_processor';

const getMockDispatcher = () => {
Expand All @@ -77,6 +77,10 @@ describe('ReactNativeNetInfoEventProcessor', () => {
const dispatchRepeater = getMockRepeater();
const failedEventRepeater = getMockRepeater();

// fail the first dispatch attempt that is made on start
eventDispatcher.dispatchEvent = vi.fn().mockResolvedValueOnce({ statusCode: 500 })
.mockResolvedValue({ statusCode: 200 });

const cache = getMockAsyncCache<EventWithId>();
const events: ProcessableEvent[] = [];

Expand All @@ -98,18 +102,27 @@ describe('ReactNativeNetInfoEventProcessor', () => {
processor.start();
await processor.onRunning();

await exhaustMicrotasks();
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);

mockNetInfo.pushState(true);
expect(eventDispatcher.dispatchEvent).not.toHaveBeenCalled();
await exhaustMicrotasks();
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);

mockNetInfo.pushState(true);
expect(eventDispatcher.dispatchEvent).not.toHaveBeenCalled();
await exhaustMicrotasks();
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
});

it('should retry failed events when network becomes reachable', async () => {
it('should retry failed events immediately when network becomes reachable', async () => {
const eventDispatcher = getMockDispatcher();
const dispatchRepeater = getMockRepeater();
const failedEventRepeater = getMockRepeater();

// fail the first dispatch attempt that is made on start
eventDispatcher.dispatchEvent = vi.fn().mockResolvedValueOnce({ statusCode: 500 })
.mockResolvedValue({ statusCode: 200 });

const cache = getMockAsyncCache<EventWithId>();
const events: ProcessableEvent[] = [];

Expand All @@ -131,14 +144,17 @@ describe('ReactNativeNetInfoEventProcessor', () => {
processor.start();
await processor.onRunning();

mockNetInfo.pushState(false);
expect(eventDispatcher.dispatchEvent).not.toHaveBeenCalled();
await exhaustMicrotasks();

expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);

mockNetInfo.pushState(true);
mockNetInfo.pushState(false);
mockNetInfo.pushState(true);

await exhaustMicrotasks();

expect(eventDispatcher.dispatchEvent).toHaveBeenCalledWith(buildLogEvent(events));
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(2);
expect(eventDispatcher.dispatchEvent).toHaveBeenNthCalledWith(2, buildLogEvent(events));
});

it('should unsubscribe from netinfo listener when stopped', async () => {
Expand Down
118 changes: 0 additions & 118 deletions lib/event_processor/batch_event_processor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -368,124 +368,6 @@ describe('BatchEventProcessor', async () => {
expect(events).toEqual(eventsInStore);
});

it('should not store the event in the eventStore but still dispatch if the \
number of pending events is greater than the limit', async () => {
const eventDispatcher = getMockDispatcher();
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
mockDispatch.mockResolvedValue(resolvablePromise().promise);

const eventStore = getMockSyncCache<EventWithId>();

const idGenerator = new IdGenerator();

for (let i = 0; i < 505; i++) {
const event = createImpressionEvent(`id-${i}`);
const cacheId = idGenerator.getId();
await eventStore.set(cacheId, { id: cacheId, event });
}

expect(eventStore.size()).toEqual(505);

const processor = new BatchEventProcessor({
eventDispatcher,
dispatchRepeater: getMockRepeater(),
batchSize: 1,
eventStore,
});

processor.start();
await processor.onRunning();

const events: ProcessableEvent[] = [];
for(let i = 0; i < 2; i++) {
const event = createImpressionEvent(`id-${i}`);
events.push(event);
await processor.process(event)
}

expect(eventStore.size()).toEqual(505);
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(507);
expect(eventDispatcher.dispatchEvent.mock.calls[505][0]).toEqual(buildLogEvent([events[0]]));
expect(eventDispatcher.dispatchEvent.mock.calls[506][0]).toEqual(buildLogEvent([events[1]]));
});

it('should store events in the eventStore when the number of events in the store\
becomes lower than the limit', async () => {
const eventDispatcher = getMockDispatcher();

const dispatchResponses: ResolvablePromise<any>[] = [];

const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
mockDispatch.mockImplementation((arg) => {
const dispatchResponse = resolvablePromise();
dispatchResponses.push(dispatchResponse);
return dispatchResponse.promise;
});

const eventStore = getMockSyncCache<EventWithId>();

const idGenerator = new IdGenerator();

for (let i = 0; i < 502; i++) {
const event = createImpressionEvent(`id-${i}`);
const cacheId = String(i);
await eventStore.set(cacheId, { id: cacheId, event });
}

expect(eventStore.size()).toEqual(502);

const processor = new BatchEventProcessor({
eventDispatcher,
dispatchRepeater: getMockRepeater(),
batchSize: 1,
eventStore,
});

processor.start();
await processor.onRunning();

let events: ProcessableEvent[] = [];
for(let i = 0; i < 2; i++) {
const event = createImpressionEvent(`id-${i + 502}`);
events.push(event);
await processor.process(event)
}

expect(eventStore.size()).toEqual(502);
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(504);

expect(eventDispatcher.dispatchEvent.mock.calls[502][0]).toEqual(buildLogEvent([events[0]]));
expect(eventDispatcher.dispatchEvent.mock.calls[503][0]).toEqual(buildLogEvent([events[1]]));

// resolve the dispatch for events not saved in the store
dispatchResponses[502].resolve({ statusCode: 200 });
dispatchResponses[503].resolve({ statusCode: 200 });

await exhaustMicrotasks();
expect(eventStore.size()).toEqual(502);

// resolve the dispatch for 3 events in store, making the store size 499 which is lower than the limit
dispatchResponses[0].resolve({ statusCode: 200 });
dispatchResponses[1].resolve({ statusCode: 200 });
dispatchResponses[2].resolve({ statusCode: 200 });

await exhaustMicrotasks();
expect(eventStore.size()).toEqual(499);

// process 2 more events
events = [];
for(let i = 0; i < 2; i++) {
const event = createImpressionEvent(`id-${i + 504}`);
events.push(event);
await processor.process(event)
}

expect(eventStore.size()).toEqual(500);
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(506);
expect(eventDispatcher.dispatchEvent.mock.calls[504][0]).toEqual(buildLogEvent([events[0]]));
expect(eventDispatcher.dispatchEvent.mock.calls[505][0]).toEqual(buildLogEvent([events[1]]));
});

it('should still dispatch events even if the store save fails', async () => {
const eventDispatcher = getMockDispatcher();
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
Expand Down
54 changes: 4 additions & 50 deletions lib/event_processor/batch_event_processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import { FAILED_TO_DISPATCH_EVENTS, SERVICE_NOT_RUNNING } from "error_message";
import { OptimizelyError } from "../error/optimizly_error";
import { sprintf } from "../utils/fns";
import { SERVICE_STOPPED_BEFORE_RUNNING } from "../service";
import { EVENT_STORE_FULL } from "../message/log_message";

export const DEFAULT_MIN_BACKOFF = 1000;
export const DEFAULT_MAX_BACKOFF = 32000;
Expand All @@ -40,7 +39,6 @@ export const MAX_EVENTS_IN_STORE = 500;
export type EventWithId = {
id: string;
event: ProcessableEvent;
notStored?: boolean;
};

export type RetryConfig = {
Expand Down Expand Up @@ -73,9 +71,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
private eventQueue: EventWithId[] = [];
private batchSize: number;
private eventStore?: Store<EventWithId>;
private eventCountInStore: Maybe<number> = undefined;
private eventCountWaitPromise: Promise<unknown> = Promise.resolve();
private maxEventsInStore: number = MAX_EVENTS_IN_STORE;
private dispatchRepeater: Repeater;
private failedEventRepeater?: Repeater;
private idGenerator: IdGenerator = new IdGenerator();
Expand All @@ -96,7 +91,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
this.dispatchRepeater = config.dispatchRepeater;
this.dispatchRepeater.setTask(() => this.flush());

this.maxEventsInStore = Math.max(2 * config.batchSize, MAX_EVENTS_IN_STORE);
this.failedEventRepeater = config.failedEventRepeater;
this.failedEventRepeater?.setTask(() => this.retryFailedEvents());
if (config.logger) {
Expand Down Expand Up @@ -215,9 +209,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
runResult.result.then((res) => {
events.forEach((event) => {
this.eventStore?.remove(event.id);
if (!event.notStored && this.eventCountInStore) {
this.eventCountInStore--;
}
});
return Promise.resolve();
}).catch((err) => {
Expand Down Expand Up @@ -250,7 +241,10 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
event: event,
};

await this.storeEvent(eventWithId);
// store in eventStore first to avoid data loss
await Promise.resolve(this.eventStore?.set(eventWithId.id, eventWithId)).catch((e) => {
this.logger?.error(e);
});

if (this.eventQueue.length > 0 && !areEventContextsEqual(this.eventQueue[0].event, event)) {
this.flush();
Expand All @@ -265,46 +259,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
}
}

private async readEventCountInStore(store: Store<EventWithId>): Promise<void> {
if (this.eventCountInStore !== undefined) {
return;
}

try {
const keys = await store.getKeys();
this.eventCountInStore = keys.length;
} catch (e) {
this.logger?.error(e);
}
}

private async findEventCountInStore(): Promise<unknown> {
if (this.eventStore && this.eventCountInStore === undefined) {
const store = this.eventStore;
this.eventCountWaitPromise = this.eventCountWaitPromise.then(() => this.readEventCountInStore(store));
return this.eventCountWaitPromise;
}
return Promise.resolve();
}

private async storeEvent(eventWithId: EventWithId): Promise<void> {
await this.findEventCountInStore();
if (this.eventCountInStore !== undefined && this.eventCountInStore >= this.maxEventsInStore) {
this.logger?.info(EVENT_STORE_FULL, eventWithId.event.uuid);
eventWithId.notStored = true;
return;
}

await Promise.resolve(this.eventStore?.set(eventWithId.id, eventWithId)).then(() => {
if (this.eventCountInStore !== undefined) {
this.eventCountInStore++;
}
}).catch((e) => {
eventWithId.notStored = true;
this.logger?.error(e);
});
}

start(): void {
if (!this.isNew()) {
return;
Expand Down
Loading
Loading