diff --git a/lib/event_processor/batch_event_processor.react_native.spec.ts b/lib/event_processor/batch_event_processor.react_native.spec.ts index 5e17ca966..045631e9f 100644 --- a/lib/event_processor/batch_event_processor.react_native.spec.ts +++ b/lib/event_processor/batch_event_processor.react_native.spec.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -import { vi, describe, it, expect, beforeEach } from 'vitest'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; const mockNetInfo = vi.hoisted(() => { const netInfo = { @@ -45,13 +45,13 @@ vi.mock('@react-native-community/netinfo', () => { }; }); -import { ReactNativeNetInfoEventProcessor } from './batch_event_processor.react_native'; -import { getMockRepeater } from '../tests/mock/mock_repeater'; import { getMockAsyncCache } from '../tests/mock/mock_cache'; +import { getMockRepeater } from '../tests/mock/mock_repeater'; +import { ReactNativeNetInfoEventProcessor } from './batch_event_processor.react_native'; +import { createImpressionEvent } from '../tests/mock/create_event'; import { EventWithId } from './batch_event_processor'; import { buildLogEvent } from './event_builder/log_event'; -import { createImpressionEvent } from '../tests/mock/create_event'; import { ProcessableEvent } from './event_processor'; const getMockDispatcher = () => { @@ -77,6 +77,10 @@ describe('ReactNativeNetInfoEventProcessor', () => { const dispatchRepeater = getMockRepeater(); const failedEventRepeater = getMockRepeater(); + // fail the first dispatch attempt that is made on start + eventDispatcher.dispatchEvent = vi.fn().mockResolvedValueOnce({ statusCode: 500 }) + .mockResolvedValue({ statusCode: 200 }); + const cache = getMockAsyncCache(); const events: ProcessableEvent[] = []; @@ -98,18 +102,27 @@ describe('ReactNativeNetInfoEventProcessor', () => { processor.start(); await processor.onRunning(); + await exhaustMicrotasks(); + expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1); + 
mockNetInfo.pushState(true); - expect(eventDispatcher.dispatchEvent).not.toHaveBeenCalled(); + await exhaustMicrotasks(); + expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1); mockNetInfo.pushState(true); - expect(eventDispatcher.dispatchEvent).not.toHaveBeenCalled(); + await exhaustMicrotasks(); + expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1); }); - it('should retry failed events when network becomes reachable', async () => { + it('should retry failed events immediately when network becomes reachable', async () => { const eventDispatcher = getMockDispatcher(); const dispatchRepeater = getMockRepeater(); const failedEventRepeater = getMockRepeater(); + // fail the first dispatch attempt that is made on start + eventDispatcher.dispatchEvent = vi.fn().mockResolvedValueOnce({ statusCode: 500 }) + .mockResolvedValue({ statusCode: 200 }); + const cache = getMockAsyncCache(); const events: ProcessableEvent[] = []; @@ -131,14 +144,17 @@ describe('ReactNativeNetInfoEventProcessor', () => { processor.start(); await processor.onRunning(); - mockNetInfo.pushState(false); - expect(eventDispatcher.dispatchEvent).not.toHaveBeenCalled(); + await exhaustMicrotasks(); + + expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1); - mockNetInfo.pushState(true); + mockNetInfo.pushState(false); + mockNetInfo.pushState(true); await exhaustMicrotasks(); - expect(eventDispatcher.dispatchEvent).toHaveBeenCalledWith(buildLogEvent(events)); + expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(2); + expect(eventDispatcher.dispatchEvent).toHaveBeenNthCalledWith(2, buildLogEvent(events)); }); it('should unsubscribe from netinfo listener when stopped', async () => { diff --git a/lib/event_processor/batch_event_processor.spec.ts b/lib/event_processor/batch_event_processor.spec.ts index 6d7674fd5..0fd18c04c 100644 --- a/lib/event_processor/batch_event_processor.spec.ts +++ b/lib/event_processor/batch_event_processor.spec.ts @@ -368,124 +368,6 @@ 
describe('BatchEventProcessor', async () => { expect(events).toEqual(eventsInStore); }); - it('should not store the event in the eventStore but still dispatch if the \ - number of pending events is greater than the limit', async () => { - const eventDispatcher = getMockDispatcher(); - const mockDispatch: MockInstance = eventDispatcher.dispatchEvent; - mockDispatch.mockResolvedValue(resolvablePromise().promise); - - const eventStore = getMockSyncCache(); - - const idGenerator = new IdGenerator(); - - for (let i = 0; i < 505; i++) { - const event = createImpressionEvent(`id-${i}`); - const cacheId = idGenerator.getId(); - await eventStore.set(cacheId, { id: cacheId, event }); - } - - expect(eventStore.size()).toEqual(505); - - const processor = new BatchEventProcessor({ - eventDispatcher, - dispatchRepeater: getMockRepeater(), - batchSize: 1, - eventStore, - }); - - processor.start(); - await processor.onRunning(); - - const events: ProcessableEvent[] = []; - for(let i = 0; i < 2; i++) { - const event = createImpressionEvent(`id-${i}`); - events.push(event); - await processor.process(event) - } - - expect(eventStore.size()).toEqual(505); - expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(507); - expect(eventDispatcher.dispatchEvent.mock.calls[505][0]).toEqual(buildLogEvent([events[0]])); - expect(eventDispatcher.dispatchEvent.mock.calls[506][0]).toEqual(buildLogEvent([events[1]])); - }); - - it('should store events in the eventStore when the number of events in the store\ - becomes lower than the limit', async () => { - const eventDispatcher = getMockDispatcher(); - - const dispatchResponses: ResolvablePromise[] = []; - - const mockDispatch: MockInstance = eventDispatcher.dispatchEvent; - mockDispatch.mockImplementation((arg) => { - const dispatchResponse = resolvablePromise(); - dispatchResponses.push(dispatchResponse); - return dispatchResponse.promise; - }); - - const eventStore = getMockSyncCache(); - - const idGenerator = new IdGenerator(); - - for 
(let i = 0; i < 502; i++) { - const event = createImpressionEvent(`id-${i}`); - const cacheId = String(i); - await eventStore.set(cacheId, { id: cacheId, event }); - } - - expect(eventStore.size()).toEqual(502); - - const processor = new BatchEventProcessor({ - eventDispatcher, - dispatchRepeater: getMockRepeater(), - batchSize: 1, - eventStore, - }); - - processor.start(); - await processor.onRunning(); - - let events: ProcessableEvent[] = []; - for(let i = 0; i < 2; i++) { - const event = createImpressionEvent(`id-${i + 502}`); - events.push(event); - await processor.process(event) - } - - expect(eventStore.size()).toEqual(502); - expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(504); - - expect(eventDispatcher.dispatchEvent.mock.calls[502][0]).toEqual(buildLogEvent([events[0]])); - expect(eventDispatcher.dispatchEvent.mock.calls[503][0]).toEqual(buildLogEvent([events[1]])); - - // resolve the dispatch for events not saved in the store - dispatchResponses[502].resolve({ statusCode: 200 }); - dispatchResponses[503].resolve({ statusCode: 200 }); - - await exhaustMicrotasks(); - expect(eventStore.size()).toEqual(502); - - // resolve the dispatch for 3 events in store, making the store size 499 which is lower than the limit - dispatchResponses[0].resolve({ statusCode: 200 }); - dispatchResponses[1].resolve({ statusCode: 200 }); - dispatchResponses[2].resolve({ statusCode: 200 }); - - await exhaustMicrotasks(); - expect(eventStore.size()).toEqual(499); - - // process 2 more events - events = []; - for(let i = 0; i < 2; i++) { - const event = createImpressionEvent(`id-${i + 504}`); - events.push(event); - await processor.process(event) - } - - expect(eventStore.size()).toEqual(500); - expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(506); - expect(eventDispatcher.dispatchEvent.mock.calls[504][0]).toEqual(buildLogEvent([events[0]])); - expect(eventDispatcher.dispatchEvent.mock.calls[505][0]).toEqual(buildLogEvent([events[1]])); - }); - it('should 
still dispatch events even if the store save fails', async () => { const eventDispatcher = getMockDispatcher(); const mockDispatch: MockInstance = eventDispatcher.dispatchEvent; diff --git a/lib/event_processor/batch_event_processor.ts b/lib/event_processor/batch_event_processor.ts index 86f7ff148..ba9931f06 100644 --- a/lib/event_processor/batch_event_processor.ts +++ b/lib/event_processor/batch_event_processor.ts @@ -31,7 +31,6 @@ import { FAILED_TO_DISPATCH_EVENTS, SERVICE_NOT_RUNNING } from "error_message"; import { OptimizelyError } from "../error/optimizly_error"; import { sprintf } from "../utils/fns"; import { SERVICE_STOPPED_BEFORE_RUNNING } from "../service"; -import { EVENT_STORE_FULL } from "../message/log_message"; export const DEFAULT_MIN_BACKOFF = 1000; export const DEFAULT_MAX_BACKOFF = 32000; @@ -40,7 +39,6 @@ export const MAX_EVENTS_IN_STORE = 500; export type EventWithId = { id: string; event: ProcessableEvent; - notStored?: boolean; }; export type RetryConfig = { @@ -73,9 +71,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor { private eventQueue: EventWithId[] = []; private batchSize: number; private eventStore?: Store; - private eventCountInStore: Maybe = undefined; - private eventCountWaitPromise: Promise = Promise.resolve(); - private maxEventsInStore: number = MAX_EVENTS_IN_STORE; private dispatchRepeater: Repeater; private failedEventRepeater?: Repeater; private idGenerator: IdGenerator = new IdGenerator(); @@ -96,7 +91,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor { this.dispatchRepeater = config.dispatchRepeater; this.dispatchRepeater.setTask(() => this.flush()); - this.maxEventsInStore = Math.max(2 * config.batchSize, MAX_EVENTS_IN_STORE); this.failedEventRepeater = config.failedEventRepeater; this.failedEventRepeater?.setTask(() => this.retryFailedEvents()); if (config.logger) { @@ -215,9 +209,6 @@ export class BatchEventProcessor extends BaseService implements 
EventProcessor { runResult.result.then((res) => { events.forEach((event) => { this.eventStore?.remove(event.id); - if (!event.notStored && this.eventCountInStore) { - this.eventCountInStore--; - } }); return Promise.resolve(); }).catch((err) => { @@ -250,7 +241,10 @@ export class BatchEventProcessor extends BaseService implements EventProcessor { event: event, }; - await this.storeEvent(eventWithId); + // store in eventStore first to avoid data loss + await Promise.resolve(this.eventStore?.set(eventWithId.id, eventWithId)).catch((e) => { + this.logger?.error(e); + }); if (this.eventQueue.length > 0 && !areEventContextsEqual(this.eventQueue[0].event, event)) { this.flush(); @@ -265,46 +259,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor { } } - private async readEventCountInStore(store: Store): Promise { - if (this.eventCountInStore !== undefined) { - return; - } - - try { - const keys = await store.getKeys(); - this.eventCountInStore = keys.length; - } catch (e) { - this.logger?.error(e); - } - } - - private async findEventCountInStore(): Promise { - if (this.eventStore && this.eventCountInStore === undefined) { - const store = this.eventStore; - this.eventCountWaitPromise = this.eventCountWaitPromise.then(() => this.readEventCountInStore(store)); - return this.eventCountWaitPromise; - } - return Promise.resolve(); - } - - private async storeEvent(eventWithId: EventWithId): Promise { - await this.findEventCountInStore(); - if (this.eventCountInStore !== undefined && this.eventCountInStore >= this.maxEventsInStore) { - this.logger?.info(EVENT_STORE_FULL, eventWithId.event.uuid); - eventWithId.notStored = true; - return; - } - - await Promise.resolve(this.eventStore?.set(eventWithId.id, eventWithId)).then(() => { - if (this.eventCountInStore !== undefined) { - this.eventCountInStore++; - } - }).catch((e) => { - eventWithId.notStored = true; - this.logger?.error(e); - }); - } - start(): void { if (!this.isNew()) { return; diff --git 
a/lib/event_processor/event_processor_factory.browser.spec.ts b/lib/event_processor/event_processor_factory.browser.spec.ts index a5d2a6af3..cc12507c8 100644 --- a/lib/event_processor/event_processor_factory.browser.spec.ts +++ b/lib/event_processor/event_processor_factory.browser.spec.ts @@ -37,8 +37,17 @@ vi.mock('../utils/cache/local_storage_cache.browser', () => { return { LocalStorageCache: vi.fn() }; }); -vi.mock('../utils/cache/store', () => { - return { SyncPrefixStore: vi.fn() }; +vi.mock('./event_store', async (importOriginal) => { + const actual: any = await importOriginal() + return { + ...actual, + EventStore: vi.fn(), + } +}); + +vi.mock('../utils/cache/store', async (importOriginal) => { + const actual: any = await importOriginal() + return { ...actual, SyncPrefixStore: vi.fn(), AsyncPrefixStore: vi.fn() }; }); @@ -46,10 +55,11 @@ import defaultEventDispatcher from './event_dispatcher/default_dispatcher.browse import { LocalStorageCache } from '../utils/cache/local_storage_cache.browser'; import { SyncPrefixStore } from '../utils/cache/store'; import { createForwardingEventProcessor, createBatchEventProcessor } from './event_processor_factory.browser'; -import { EVENT_STORE_PREFIX, extractEventProcessor, getForwardingEventProcessor, FAILED_EVENT_RETRY_INTERVAL } from './event_processor_factory'; +import { extractEventProcessor, getForwardingEventProcessor, FAILED_EVENT_RETRY_INTERVAL } from './event_processor_factory'; import sendBeaconEventDispatcher from './event_dispatcher/send_beacon_dispatcher.browser'; import browserDefaultEventDispatcher from './event_dispatcher/default_dispatcher.browser'; import { getOpaqueBatchEventProcessor } from './event_processor_factory'; +import { EVENT_STORE_PREFIX, EventStore} from './event_store'; describe('createForwardingEventProcessor', () => { const mockGetForwardingEventProcessor = vi.mocked(getForwardingEventProcessor); @@ -81,26 +91,44 @@ describe('createBatchEventProcessor', () => { const 
mockGetOpaqueBatchEventProcessor = vi.mocked(getOpaqueBatchEventProcessor); const MockLocalStorageCache = vi.mocked(LocalStorageCache); const MockSyncPrefixStore = vi.mocked(SyncPrefixStore); + const MockEventStore = vi.mocked(EventStore); beforeEach(() => { mockGetOpaqueBatchEventProcessor.mockClear(); MockLocalStorageCache.mockClear(); MockSyncPrefixStore.mockClear(); + MockEventStore.mockClear(); }); - it('uses LocalStorageCache and SyncPrefixStore to create eventStore', () => { - const processor = createBatchEventProcessor({}); + it('uses an EventStore instance with AsyncStorageCache and correct options if no eventStore is provided', () => { + const processor = createBatchEventProcessor({ + storeTtl: 60_000, + }); + expect(Object.is(processor, mockGetOpaqueBatchEventProcessor.mock.results[0].value)).toBe(true); const eventStore = mockGetOpaqueBatchEventProcessor.mock.calls[0][0].eventStore; - expect(Object.is(eventStore, MockSyncPrefixStore.mock.results[0].value)).toBe(true); + expect(Object.is(eventStore, MockEventStore.mock.instances[0])).toBe(true); + + let { store, ttl, maxSize } = MockEventStore.mock.calls[0][0]; + expect(Object.is(store, MockLocalStorageCache.mock.instances[0])).toBe(true); + + expect(ttl).toBe(60_000); + expect(maxSize).toBe(500); // the default max size * 2 < 500 + + const processor2 = createBatchEventProcessor({ + storeTtl: 10_000, + batchSize: 260, + }); + + expect(Object.is(processor2, mockGetOpaqueBatchEventProcessor.mock.results[1].value)).toBe(true); + const eventStore2 = mockGetOpaqueBatchEventProcessor.mock.calls[1][0].eventStore; + expect(Object.is(eventStore2, MockEventStore.mock.instances[1])).toBe(true); - const [cache, prefix, transformGet, transformSet] = MockSyncPrefixStore.mock.calls[0]; - expect(Object.is(cache, MockLocalStorageCache.mock.results[0].value)).toBe(true); - expect(prefix).toBe(EVENT_STORE_PREFIX); + ({ store, ttl, maxSize } = MockEventStore.mock.calls[1][0]); + expect(Object.is(store, 
MockLocalStorageCache.mock.instances[1])).toBe(true); - // transformGet and transformSet should be identity functions - expect(transformGet('value')).toBe('value'); - expect(transformSet('value')).toBe('value'); + expect(ttl).toBe(10_000); + expect(maxSize).toBe(520); // the provided batch size * 2 > 500 }); it('uses the provided eventDispatcher', () => { diff --git a/lib/event_processor/event_processor_factory.browser.ts b/lib/event_processor/event_processor_factory.browser.ts index e73b8bf24..002d5c083 100644 --- a/lib/event_processor/event_processor_factory.browser.ts +++ b/lib/event_processor/event_processor_factory.browser.ts @@ -22,12 +22,13 @@ import { OpaqueEventProcessor, wrapEventProcessor, getForwardingEventProcessor, + getPrefixEventStore, } from './event_processor_factory'; import defaultEventDispatcher from './event_dispatcher/default_dispatcher.browser'; import sendBeaconEventDispatcher from './event_dispatcher/send_beacon_dispatcher.browser'; import { LocalStorageCache } from '../utils/cache/local_storage_cache.browser'; -import { SyncPrefixStore } from '../utils/cache/store'; -import { EVENT_STORE_PREFIX, FAILED_EVENT_RETRY_INTERVAL } from './event_processor_factory'; +import { FAILED_EVENT_RETRY_INTERVAL } from './event_processor_factory'; +import { DEFAULT_MAX_EVENTS_IN_STORE, EventStore } from './event_store'; export const DEFAULT_EVENT_BATCH_SIZE = 10; export const DEFAULT_EVENT_FLUSH_INTERVAL = 1_000; @@ -38,17 +39,15 @@ export const createForwardingEventProcessor = ( return wrapEventProcessor(getForwardingEventProcessor(eventDispatcher)); }; -const identity = (v: T): T => v; - export const createBatchEventProcessor = ( options: BatchEventProcessorOptions = {} ): OpaqueEventProcessor => { - const localStorageCache = new LocalStorageCache(); - const eventStore = new SyncPrefixStore( - localStorageCache, EVENT_STORE_PREFIX, - identity, - identity, - ); + const eventStore = options.eventStore ? 
getPrefixEventStore(options.eventStore) : new EventStore({ + store: new LocalStorageCache(), + maxSize: options.batchSize ? Math.max(options.batchSize * 2, DEFAULT_MAX_EVENTS_IN_STORE) + : DEFAULT_MAX_EVENTS_IN_STORE, + ttl: options.storeTtl, + }); return getOpaqueBatchEventProcessor({ eventDispatcher: options.eventDispatcher || defaultEventDispatcher, @@ -63,5 +62,6 @@ export const createBatchEventProcessor = ( }, failedEventRetryInterval: FAILED_EVENT_RETRY_INTERVAL, eventStore, + storeTtl: options.storeTtl, }); }; diff --git a/lib/event_processor/event_processor_factory.node.spec.ts b/lib/event_processor/event_processor_factory.node.spec.ts index 22b943f19..d94ca4c3e 100644 --- a/lib/event_processor/event_processor_factory.node.spec.ts +++ b/lib/event_processor/event_processor_factory.node.spec.ts @@ -35,16 +35,18 @@ vi.mock('../utils/cache/async_storage_cache.react_native', () => { return { AsyncStorageCache: vi.fn() }; }); -vi.mock('../utils/cache/store', () => { - return { SyncPrefixStore: vi.fn(), AsyncPrefixStore: vi.fn() }; +vi.mock('../utils/cache/store', async (importActual) => { + const actual: any = await importActual(); + return { ...actual, SyncPrefixStore: vi.fn(), AsyncPrefixStore: vi.fn() }; }); import { createBatchEventProcessor, createForwardingEventProcessor } from './event_processor_factory.node'; import nodeDefaultEventDispatcher from './event_dispatcher/default_dispatcher.node'; -import { EVENT_STORE_PREFIX, extractEventProcessor, getForwardingEventProcessor, FAILED_EVENT_RETRY_INTERVAL } from './event_processor_factory'; +import { extractEventProcessor, getForwardingEventProcessor, FAILED_EVENT_RETRY_INTERVAL } from './event_processor_factory'; import { getOpaqueBatchEventProcessor } from './event_processor_factory'; import { AsyncStore, AsyncPrefixStore, SyncStore, SyncPrefixStore } from '../utils/cache/store'; import { AsyncStorageCache } from '../utils/cache/async_storage_cache.react_native'; +import { EVENT_STORE_PREFIX } from 
'./event_store'; describe('createForwardingEventProcessor', () => { const mockGetForwardingEventProcessor = vi.mocked(getForwardingEventProcessor); diff --git a/lib/event_processor/event_processor_factory.react_native.spec.ts b/lib/event_processor/event_processor_factory.react_native.spec.ts index 630417a5e..6d559690a 100644 --- a/lib/event_processor/event_processor_factory.react_native.spec.ts +++ b/lib/event_processor/event_processor_factory.react_native.spec.ts @@ -37,8 +37,17 @@ vi.mock('../utils/cache/async_storage_cache.react_native', () => { return { AsyncStorageCache: vi.fn() }; }); -vi.mock('../utils/cache/store', () => { - return { SyncPrefixStore: vi.fn(), AsyncPrefixStore: vi.fn() }; +vi.mock('./event_store', async (importOriginal) => { + const actual: any = await importOriginal() + return { + ...actual, + EventStore: vi.fn(), + } +}); + +vi.mock('../utils/cache/store', async (importOriginal) => { + const actual: any = await importOriginal() + return { ...actual, SyncPrefixStore: vi.fn(), AsyncPrefixStore: vi.fn() }; }); vi.mock('@react-native-community/netinfo', () => { @@ -66,10 +75,11 @@ async function mockRequireNetInfo() { import { createForwardingEventProcessor, createBatchEventProcessor } from './event_processor_factory.react_native'; import defaultEventDispatcher from './event_dispatcher/default_dispatcher.browser'; -import { EVENT_STORE_PREFIX, extractEventProcessor, getForwardingEventProcessor, FAILED_EVENT_RETRY_INTERVAL } from './event_processor_factory'; +import { extractEventProcessor, getForwardingEventProcessor, FAILED_EVENT_RETRY_INTERVAL } from './event_processor_factory'; import { getOpaqueBatchEventProcessor } from './event_processor_factory'; import { AsyncStore, AsyncPrefixStore, SyncStore, SyncPrefixStore } from '../utils/cache/store'; import { AsyncStorageCache } from '../utils/cache/async_storage_cache.react_native'; +import { EVENT_STORE_PREFIX, EventStore } from './event_store'; import { 
MODULE_NOT_FOUND_REACT_NATIVE_ASYNC_STORAGE } from '../utils/import.react_native/@react-native-async-storage/async-storage'; describe('createForwardingEventProcessor', () => { @@ -103,28 +113,45 @@ describe('createBatchEventProcessor', () => { const MockAsyncStorageCache = vi.mocked(AsyncStorageCache); const MockSyncPrefixStore = vi.mocked(SyncPrefixStore); const MockAsyncPrefixStore = vi.mocked(AsyncPrefixStore); + const MockEventStore = vi.mocked(EventStore); beforeEach(() => { mockGetOpaqueBatchEventProcessor.mockClear(); MockAsyncStorageCache.mockClear(); MockSyncPrefixStore.mockClear(); MockAsyncPrefixStore.mockClear(); + MockEventStore.mockClear(); }); - it('uses AsyncStorageCache and AsyncPrefixStore to create eventStore if no eventStore is provided', () => { - const processor = createBatchEventProcessor({}); + it('uses an EventStore instance with AsyncStorageCache and correct options if no eventStore is provided', () => { + const processor = createBatchEventProcessor({ + storeTtl: 60_000, + }); expect(Object.is(processor, mockGetOpaqueBatchEventProcessor.mock.results[0].value)).toBe(true); const eventStore = mockGetOpaqueBatchEventProcessor.mock.calls[0][0].eventStore; - expect(Object.is(eventStore, MockAsyncPrefixStore.mock.results[0].value)).toBe(true); + expect(Object.is(eventStore, MockEventStore.mock.instances[0])).toBe(true); - const [cache, prefix, transformGet, transformSet] = MockAsyncPrefixStore.mock.calls[0]; - expect(Object.is(cache, MockAsyncStorageCache.mock.results[0].value)).toBe(true); - expect(prefix).toBe(EVENT_STORE_PREFIX); + let { store, ttl, maxSize } = MockEventStore.mock.calls[0][0]; + expect(Object.is(store, MockAsyncStorageCache.mock.instances[0])).toBe(true); + + expect(ttl).toBe(60_000); + expect(maxSize).toBe(500); // the default max size * 2 < 500 + + const processor2 = createBatchEventProcessor({ + storeTtl: 10_000, + batchSize: 260, + }); + + expect(Object.is(processor2, 
mockGetOpaqueBatchEventProcessor.mock.results[1].value)).toBe(true); + const eventStore2 = mockGetOpaqueBatchEventProcessor.mock.calls[1][0].eventStore; + expect(Object.is(eventStore2, MockEventStore.mock.instances[1])).toBe(true); + + ({ store, ttl, maxSize } = MockEventStore.mock.calls[1][0]); + expect(Object.is(store, MockAsyncStorageCache.mock.instances[1])).toBe(true); - // transformGet and transformSet should be identity functions - expect(transformGet('value')).toBe('value'); - expect(transformSet('value')).toBe('value'); + expect(ttl).toBe(10_000); + expect(maxSize).toBe(520); // the provided batch size * 2 > 500 }); it('should throw error if @react-native-async-storage/async-storage is not available', async () => { diff --git a/lib/event_processor/event_processor_factory.react_native.ts b/lib/event_processor/event_processor_factory.react_native.ts index b46b594a4..db80ba0e8 100644 --- a/lib/event_processor/event_processor_factory.react_native.ts +++ b/lib/event_processor/event_processor_factory.react_native.ts @@ -23,11 +23,11 @@ import { wrapEventProcessor, getForwardingEventProcessor, } from './event_processor_factory'; -import { EVENT_STORE_PREFIX, FAILED_EVENT_RETRY_INTERVAL } from './event_processor_factory'; -import { AsyncPrefixStore } from '../utils/cache/store'; +import { FAILED_EVENT_RETRY_INTERVAL } from './event_processor_factory'; import { EventWithId } from './batch_event_processor'; import { AsyncStorageCache } from '../utils/cache/async_storage_cache.react_native'; import { ReactNativeNetInfoEventProcessor } from './batch_event_processor.react_native'; +import { DEFAULT_MAX_EVENTS_IN_STORE, EventStore } from './event_store'; export const DEFAULT_EVENT_BATCH_SIZE = 10; export const DEFAULT_EVENT_FLUSH_INTERVAL = 1_000; @@ -38,25 +38,15 @@ export const createForwardingEventProcessor = ( return wrapEventProcessor(getForwardingEventProcessor(eventDispatcher)); }; -const identity = (v: T): T => v; - -const getDefaultEventStore = () => { - const 
asyncStorageCache = new AsyncStorageCache(); - - const eventStore = new AsyncPrefixStore( - asyncStorageCache, - EVENT_STORE_PREFIX, - identity, - identity, - ); - - return eventStore; -} - export const createBatchEventProcessor = ( options: BatchEventProcessorOptions = {} ): OpaqueEventProcessor => { - const eventStore = options.eventStore ? getPrefixEventStore(options.eventStore) : getDefaultEventStore(); + const eventStore = options.eventStore ? getPrefixEventStore(options.eventStore) : new EventStore({ + store: new AsyncStorageCache(), + maxSize: options.batchSize ? Math.max(options.batchSize * 2, DEFAULT_MAX_EVENTS_IN_STORE) + : DEFAULT_MAX_EVENTS_IN_STORE, + ttl: options.storeTtl, + }); return getOpaqueBatchEventProcessor( { diff --git a/lib/event_processor/event_processor_factory.ts b/lib/event_processor/event_processor_factory.ts index 393ce436a..50663f664 100644 --- a/lib/event_processor/event_processor_factory.ts +++ b/lib/event_processor/event_processor_factory.ts @@ -16,19 +16,19 @@ import { LogLevel } from "../logging/logger"; import { StartupLog } from "../service"; +import { AsyncPrefixStore, Store, SyncPrefixStore } from "../utils/cache/store"; +import { validateStore } from "../utils/cache/store_validator"; import { ExponentialBackoff, IntervalRepeater } from "../utils/repeater/repeater"; +import { Maybe } from "../utils/type"; +import { BatchEventProcessor, DEFAULT_MAX_BACKOFF, DEFAULT_MIN_BACKOFF, EventWithId, RetryConfig } from "./batch_event_processor"; import { EventDispatcher } from "./event_dispatcher/event_dispatcher"; import { EventProcessor } from "./event_processor"; +import { EVENT_STORE_PREFIX } from "./event_store"; import { ForwardingEventProcessor } from "./forwarding_event_processor"; -import { BatchEventProcessor, DEFAULT_MAX_BACKOFF, DEFAULT_MIN_BACKOFF, EventWithId, RetryConfig } from "./batch_event_processor"; -import { AsyncPrefixStore, Store, SyncPrefixStore } from "../utils/cache/store"; -import { Maybe } from 
"../utils/type"; -import { validateStore } from "../utils/cache/store_validator"; export const INVALID_EVENT_DISPATCHER = 'Invalid event dispatcher'; export const FAILED_EVENT_RETRY_INTERVAL = 20 * 1000; -export const EVENT_STORE_PREFIX = 'optly_event:'; export const getPrefixEventStore = (store: Store): Store => { if (store.operation === 'async') { @@ -59,6 +59,7 @@ export type BatchEventProcessorOptions = { closingEventDispatcher?: EventDispatcher; flushInterval?: number; batchSize?: number; + storeTtl?: number; eventStore?: Store; }; diff --git a/lib/event_processor/event_store.spec.ts b/lib/event_processor/event_store.spec.ts new file mode 100644 index 000000000..e1effb74f --- /dev/null +++ b/lib/event_processor/event_store.spec.ts @@ -0,0 +1,420 @@ +/** + * Copyright 2025, Optimizely + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +import { vi, describe, it, expect } from 'vitest'; +import { getMockAsyncCache } from '../tests/mock/mock_cache'; +import { EventWithId } from './batch_event_processor'; +import { EventStore, StoredEvent } from './event_store'; +import { createImpressionEvent } from '../tests/mock/create_event'; + +import { DEFAULT_MAX_EVENTS_IN_STORE } from './event_store'; +import { exhaustMicrotasks } from '../tests/testUtils'; +import { EVENT_STORE_FULL } from 'error_message'; +import { OptimizelyError } from '../error/optimizly_error'; + +type TestStoreConfig = { + maxSize?: number; + ttl?: number; +} + +const getEventStore = (config: TestStoreConfig = {}) => { + const mockStore = getMockAsyncCache(); + const store = new EventStore({...config, store: mockStore }); + return { mockStore, store } +} + +describe('EventStore', () => { + it('should be able to get the stored event correctly', async () => { + const { store } = getEventStore(); + const event: EventWithId = { + id: '1', + event: createImpressionEvent('test'), + } + + await store.set('test', event); + const savedEvent = await store.get('test'); + expect(savedEvent).toEqual(expect.objectContaining(event)); + }); + + it('should remove a key correctly', async () => { + const { store } = getEventStore(); + const event: EventWithId = { + id: '1', + event: createImpressionEvent('test'), + } + + await store.set('test-1', event); + let saved1 = await store.get('test-1'); + expect(saved1).toEqual(expect.objectContaining(event)); + + await store.set('test-2', event); + + await store.remove('test-1'); + saved1 = await store.get('test-1'); + expect(saved1).toBeUndefined() + + const saved2 = await store.get('test-2'); + expect(saved2).toEqual(expect.objectContaining(event)); + }); + + it('should return all keys from getKeys', async () => { + const { store } = getEventStore(); + const event: EventWithId = { + id: '1', + event: createImpressionEvent('test'), + }; + + const keys = []; + + for(let i = 0; i < 10; i++) { + 
keys.push(`test-${i}`) + await store.set(`test-${i}`, event); + } + + const savedKeys = await store.getKeys(); + savedKeys.sort(); + + expect(savedKeys).toEqual(keys); + }); + + it('should limit the number of saved keys when set is called concurrently', async () => { + const { store } = getEventStore(); + + const event: EventWithId = { + id: '1', + event: createImpressionEvent('test'), + } + + const keys = []; + const promises = []; + + for(let i = 0; i < DEFAULT_MAX_EVENTS_IN_STORE + 10; i++) { + keys.push(i + '') + const res = store.set(i + '', event); + res.catch(() => {}); + promises.push(res); + } + + for(let i = 0; i < DEFAULT_MAX_EVENTS_IN_STORE; i++) { + await expect(promises[i]).resolves.not.toThrow(); + } + + for(let i = 0; i < 10; i++) { + await expect(promises[DEFAULT_MAX_EVENTS_IN_STORE + i]).rejects.toThrow(); + } + + const savedKeys = await store.getKeys(); + savedKeys.sort((a, b) => Number(a) - Number(b)); + + expect(savedKeys).toEqual(keys.slice(0, DEFAULT_MAX_EVENTS_IN_STORE)); + }); + + it('should limit the number of saved keys when set is called serially', async () => { + const { store } = getEventStore({ maxSize: 2 }); + + const event: EventWithId = { + id: '1', + event: createImpressionEvent('test'), + } + + await expect(store.set('event-1', event)).resolves.not.toThrow(); + await expect(store.set('event-2', event)).resolves.not.toThrow(); + await expect(store.set('event-3', event)).rejects.toThrow(new OptimizelyError(EVENT_STORE_FULL, event.id)); + + const savedKeys = await store.getKeys(); + savedKeys.sort(); + + expect(savedKeys).toEqual(['event-1', 'event-2']); + }); + + it('should save keys again when the number of stored events drops below maxSize', async () => { + const { store } = getEventStore(); + + const event: EventWithId = { + id: '1', + event: createImpressionEvent('test'), + } + + const keys = [] + for(let i = 0; i < DEFAULT_MAX_EVENTS_IN_STORE + 10; i++) { + keys.push(i + ''); + await store.set(i + '', event).catch(() => {}); + 
} + + const savedKeys = await store.getKeys(); + savedKeys.sort((a, b) => Number(a) - Number(b)); + + expect(savedKeys).toEqual(keys.slice(0, DEFAULT_MAX_EVENTS_IN_STORE)); + + for(let i = 0; i < 10; i++) { + const keyToRemove = i + ''; + await store.remove(keyToRemove); + } + + for(let i = DEFAULT_MAX_EVENTS_IN_STORE; i < DEFAULT_MAX_EVENTS_IN_STORE + 10; i++) { + const keyToAdd = i + ''; + await expect(store.set(keyToAdd, event)).resolves.not.toThrow(); + } + + const finalKeys = await store.getKeys(); + finalKeys.sort((a, b) => Number(a) - Number(b)); + + const expectedKeys = []; + for(let i = 10; i < DEFAULT_MAX_EVENTS_IN_STORE + 10; i++) { + expectedKeys.push(i + ''); + } + + expect(finalKeys).toEqual(expectedKeys); + }); + + it('should expire events after ttl', async () => { + const ttl = 100; // 100 ms + const { store } = getEventStore({ ttl }); + + const event: EventWithId = { + id: '1', + event: createImpressionEvent('test'), + } + + await store.set('test', event); + const savedEvent = await store.get('test'); + expect(savedEvent).toEqual(expect.objectContaining(event)); + + // wait for ttl to expire + await new Promise(resolve => setTimeout(resolve, ttl + 50)); + + const expiredEvent = await store.get('test'); + expect(expiredEvent).toBeUndefined(); + + expect(await store.getKeys()).toEqual([]); + }); + + it('should resave events without expiresAt on get', async () => { + const ttl = 120_000; + const { mockStore, store } = getEventStore({ ttl }); + + const event: EventWithId = { + id: '1', + event: createImpressionEvent('test'), + } + + const originalSet = mockStore.set.bind(mockStore); + + let call = 0; + const setSpy = vi.spyOn(mockStore, 'set').mockImplementation(async (key: string, value: StoredEvent) => { + if (call++ > 0) { + return originalSet(key, value); + } + + // Simulate old stored event without expiresAt + const eventWithoutExpiresAt: StoredEvent = { + id: value.id, + event: value.event, + }; + return originalSet(key, eventWithoutExpiresAt); + 
}); + + await store.set('test', event); + + const savedEvent = await store.get('test'); + expect(savedEvent).toEqual(expect.objectContaining(event)); + + + await exhaustMicrotasks(); + expect(setSpy).toHaveBeenCalledTimes(2); + + const secondCall = setSpy.mock.calls[1]; + + expect(secondCall[1].expiresAt).toBeDefined(); + expect(secondCall[1].expiresAt!).toBeGreaterThanOrEqual(Date.now() + ttl - 10); + }); + + it('should store event when key expires after store being full', async () => { + const ttl = 100; + const { store } = getEventStore({ ttl, maxSize: 2 }); + + const event: EventWithId = { + id: '1', + event: createImpressionEvent('test'), + } + + await store.set('event-1', event); + await store.set('event-2', event); + await expect(store.set('event-3', event)).rejects.toThrow(); + + expect(await store.getKeys().then(keys => keys.sort())).toEqual(['event-1', 'event-2']); + + // wait for ttl to expire + await new Promise(resolve => setTimeout(resolve, ttl + 50)); + + // both events should be expired now + expect(await store.get('event-1')).toBeUndefined(); + expect(await store.get('event-2')).toBeUndefined(); + + await exhaustMicrotasks(); + + // should be able to add new events now + await expect(store.set('event-3', event)).resolves.not.toThrow(); + await expect(store.set('event-4', event)).resolves.not.toThrow(); + + const savedEvent3 = await store.get('event-3'); + expect(savedEvent3).toEqual(expect.objectContaining(event)); + const savedEvent4 = await store.get('event-4'); + expect(savedEvent4).toEqual(expect.objectContaining(event)); + }); + + it('should return all requested events correctly from getBatched', async () => { + const { store } = getEventStore(); + const event1: EventWithId = { id: '1', event: createImpressionEvent('test-1') }; + const event2: EventWithId = { id: '2', event: createImpressionEvent('test-2') }; + const event3: EventWithId = { id: '3', event: createImpressionEvent('test-3') }; + + await store.set('key-1', event1); + await 
store.set('key-2', event2); + await store.set('key-3', event3); + + const results = await store.getBatched(['key-3', 'key-1', 'key-2', 'key-4']); + + expect(results).toHaveLength(4); + expect(results[0]).toEqual(expect.objectContaining(event3)); + expect(results[1]).toEqual(expect.objectContaining(event1)); + expect(results[2]).toEqual(expect.objectContaining(event2)); + expect(results[3]).toBeUndefined(); + }); + + + it('should handle expired events in getBatched', async () => { + const ttl = 100; + const { store } = getEventStore({ ttl }); + const event1: EventWithId = { id: '1', event: createImpressionEvent('test-1') }; + const event2: EventWithId = { id: '2', event: createImpressionEvent('test-2') }; + + await store.set('key-1', event1); + await new Promise(resolve => setTimeout(resolve, 70)); + await store.set('key-2', event2); + + // wait for first key to expire but not the second + await new Promise(resolve => setTimeout(resolve, 50)); + + const results = await store.getBatched(['key-1', 'key-2']); + + expect(results).toHaveLength(2); + expect(results[0]).toBeUndefined(); + expect(results[1]).toEqual(expect.objectContaining(event2)); + + await expect(store.getKeys()).resolves.toEqual(['key-2']); + }); + + it('should resave events without expiresAt during getBatched', async () => { + const ttl = 120_000; + const { mockStore, store } = getEventStore({ ttl }); + const event: EventWithId = { id: '1', event: createImpressionEvent('test') }; + + const originalSet = mockStore.set.bind(mockStore); + + let call = 0; + const setSpy = vi.spyOn(mockStore, 'set').mockImplementation(async (key: string, value: StoredEvent) => { + if (call++ > 0) { + return originalSet(key, value); + } + + // Simulate old stored event without expiresAt + const eventWithoutExpiresAt: StoredEvent = { + id: value.id, + event: value.event, + }; + return originalSet(key, eventWithoutExpiresAt); + }); + + await store.set('key-1', event); + await store.set('key-2', event); + + const results = 
await store.getBatched(['key-1', 'key-2']); + + expect(results).toHaveLength(2); + expect(results[0]).toEqual(expect.objectContaining(event)); + expect(results[1]).toEqual(expect.objectContaining(event)); + + await exhaustMicrotasks(); + expect(setSpy).toHaveBeenCalledTimes(3); + + const secondCall = setSpy.mock.calls[1]; + + expect(secondCall[1].expiresAt).toBeDefined(); + expect(secondCall[1].expiresAt!).toBeGreaterThanOrEqual(Date.now() + ttl - 10); + }); + + it('should store event when keys expire during getBatched after store being full', async () => { + const ttl = 100; + const { store } = getEventStore({ ttl, maxSize: 2 }); + + const event: EventWithId = { + id: '1', + event: createImpressionEvent('test'), + } + + await store.set('event-1', event); + await new Promise(resolve => setTimeout(resolve, 70)); + await store.set('event-2', event); + await expect(store.set('event-3', event)).rejects.toThrow(); + + expect(await store.getKeys().then(keys => keys.sort())).toEqual(['event-1', 'event-2']); + + // wait for the first event to expire + await new Promise(resolve => setTimeout(resolve, 50)); + + const results = await store.getBatched(['event-1', 'event-2']); + + expect(results).toHaveLength(2); + expect(results[0]).toBeUndefined(); + expect(results[1]).toEqual(expect.objectContaining(event)); + + await exhaustMicrotasks(); + + // should be able to add new event now + await expect(store.set('event-3', event)).resolves.not.toThrow(); + const savedEvent = await store.get('event-3'); + expect(savedEvent).toEqual(expect.objectContaining(event)); + expect(await store.getKeys().then(keys => keys.sort())).toEqual(['event-2', 'event-3']); + }); + + it('should restore in-memory key consistency after getKeys is called', async () => { + const { mockStore, store } = getEventStore({ maxSize: 2 }); + const event: EventWithId = { id: '1', event: createImpressionEvent('test') }; + + const originalSet = mockStore.set.bind(mockStore); + + let call = 0; + vi.spyOn(mockStore, 
'set').mockImplementation(async (key: string, value: StoredEvent) => { + // only the second set call should fail + if (call++ != 1) return originalSet(key, value); + return Promise.reject(new Error('Simulated set failure')); + }); + + await expect(store.set('key-1', event)).resolves.not.toThrow(); + // this should fail, but the in-memory key list will become full + await expect(store.set('key-2', event)).rejects.toThrow('Simulated set failure'); + await expect(store.set('key-3', event)).rejects.toThrow(new OptimizelyError(EVENT_STORE_FULL, event.id)); + + + let keys = await store.getKeys(); + expect(keys.sort()).toEqual(['key-1']); + + await expect(store.set('key-3', event)).resolves.not.toThrow(); + + keys = await store.getKeys(); + expect(keys.sort()).toEqual(['key-1', 'key-3']); + }); +}); diff --git a/lib/event_processor/event_store.ts b/lib/event_processor/event_store.ts new file mode 100644 index 000000000..d8390020e --- /dev/null +++ b/lib/event_processor/event_store.ts @@ -0,0 +1,146 @@ +import { OptimizelyError } from "../error/optimizly_error"; +import { LoggerFacade } from "../logging/logger"; +import { EVENT_STORE_FULL } from "error_message"; +import { + AsyncPrefixStore, + AsyncStore, + AsyncStoreWithBatchedGet, + Store, + StoreWithBatchedGet, + SyncPrefixStore +} from "../utils/cache/store"; +import { SerialRunner } from "../utils/executor/serial_runner"; +import { Maybe } from "../utils/type"; +import { EventWithId } from "./batch_event_processor"; + +export type StoredEvent = EventWithId & { + expiresAt?: number; +}; + +const identity = (v: T): T => v; + +const LOGGER_NAME = 'EventStore'; +export const DEFAULT_MAX_EVENTS_IN_STORE = 500; +export const DEFAULT_STORE_TTL = 10 * 24 * 60 * 60 * 1000; // 10 days + +export const EVENT_STORE_PREFIX = 'optly_event:'; + +export type EventStoreConfig = { + maxSize?: number; + ttl?: number, + store: Store, + logger?: LoggerFacade, +}; + +export class EventStore extends AsyncStoreWithBatchedGet implements
AsyncStore { + readonly operation = 'async'; + + private store: StoreWithBatchedGet; + private serializer: SerialRunner = new SerialRunner(); + private logger?: LoggerFacade; + private maxSize: number; + private ttl: number; + private keys?: Set; + + constructor(config: EventStoreConfig) { + super(); + + const { + maxSize, + ttl, + store, + logger + } = config; + + if (store.operation === 'sync') { + this.store = new SyncPrefixStore(store, EVENT_STORE_PREFIX, identity, identity); + } else { + this.store = new AsyncPrefixStore(store, EVENT_STORE_PREFIX, identity, identity); + } + + if (logger) { + logger.setName(LOGGER_NAME); + this.logger = logger; + } + + this.maxSize = maxSize || DEFAULT_MAX_EVENTS_IN_STORE; + this.ttl = ttl || DEFAULT_STORE_TTL; + } + + private async readKeys() { + return this.serializer.run(async () => { + if (this.keys !== undefined) { + return; + } + + try { + this.keys = new Set(await this.getKeys()); + } catch (err) { + this.logger?.error(err); + } + }); + } + + async set(key: string, event: EventWithId): Promise { + await this.readKeys(); + + // readKeys might have failed; in that case we cannot enforce max size, + // which means the store might grow beyond max size in failure scenarios + if (this.keys !== undefined && this.keys.size >= this.maxSize) { + return Promise.reject(new OptimizelyError(EVENT_STORE_FULL, event.event.uuid)); + } + + // this.store.set() might fail and cause the in-memory set of keys to + // diverge from the actual stored key list. But the in-memory set needs + // to be updated before the store set to limit the eventCount + // when concurrent sets are present. Even if the store set fails, it will + // still keep the stored event count below maxSize (it will underfill the store). + // the next getKeys() should fix the discrepancy. 
+ this.keys?.add(key); + return this.store.set(key, { ...event, expiresAt: Date.now() + this.ttl }); + } + + private processStoredEvent(key: string, value: StoredEvent | undefined): Maybe { + if (!value) return undefined; + + // if there are events in the store saved by an old version of the sdk, + // they will not have the expiresAt time; update them with the current + // time plus ttl before returning + + if (value.expiresAt === undefined) { + value.expiresAt = Date.now() + this.ttl; + this.set(key, value).catch(() => {}); + return value; + } + + if (value.expiresAt <= Date.now()) { + this.remove(key).catch(() => {}); + return undefined; + } + + return value; + } + + async get(key: string): Promise { + const value = await this.store.get(key); + + return this.processStoredEvent(key, value); + } + + async remove(key: string): Promise { + await this.store.remove(key); + this.keys?.delete(key); + return; + } + + async getKeys(): Promise{ + const keys = await this.store.getKeys(); + this.keys = new Set(keys); + return keys; + } + + async getBatched(keys: string[]): Promise[]> { + const values = await this.store.getBatched(keys); + return values.map((value, index) => this.processStoredEvent(keys[index], value)); + } +} diff --git a/lib/message/error_message.ts b/lib/message/error_message.ts index 720baa377..ab84e38ed 100644 --- a/lib/message/error_message.ts +++ b/lib/message/error_message.ts @@ -96,5 +96,6 @@ export const CMAB_FETCH_FAILED = 'CMAB decision fetch failed with status: %s'; export const INVALID_CMAB_FETCH_RESPONSE = 'Invalid CMAB fetch response'; export const PROMISE_NOT_ALLOWED = "Promise value is not allowed in sync operation"; export const SERVICE_NOT_RUNNING = "%s not running"; +export const EVENT_STORE_FULL = 'Event store is full. 
Not saving event with id %d.'; export const messages: string[] = []; diff --git a/lib/message/log_message.ts b/lib/message/log_message.ts index aaf5a9e36..b4757e2d3 100644 --- a/lib/message/log_message.ts +++ b/lib/message/log_message.ts @@ -60,7 +60,6 @@ export const USER_HAS_NO_FORCED_VARIATION_FOR_EXPERIMENT = 'No experiment %s mapped to user %s in the forced variation map.'; export const INVALID_EXPERIMENT_KEY_INFO = 'Experiment key %s is not in datafile. It is either invalid, paused, or archived.'; -export const EVENT_STORE_FULL = 'Event store is full. Not saving event with id %d.'; export const IGNORE_CMAB_CACHE = 'Ignoring CMAB cache for user %s and rule %s.'; export const RESET_CMAB_CACHE = 'Resetting CMAB cache for user %s and rule %s.'; export const INVALIDATE_CMAB_CACHE = 'Invalidating CMAB cache for user %s and rule %s.'; diff --git a/lib/tests/mock/mock_cache.ts b/lib/tests/mock/mock_cache.ts index 21a89e7a4..0be53f534 100644 --- a/lib/tests/mock/mock_cache.ts +++ b/lib/tests/mock/mock_cache.ts @@ -80,42 +80,54 @@ export const getMockSyncCache = (): SyncCacheWithAddOn & SyncStoreWithAddO return cache; }; +const wait = () => Promise.resolve(); export const getMockAsyncCache = (): AsyncCacheWithAddOn & AsyncStoreWithAddOn => { const cache = { operation: 'async' as const, data: new Map(), async remove(key: string): Promise { + await wait(); this.data.delete(key); }, async clear(): Promise { + await wait(); this.data.clear(); }, async reset(): Promise { + await wait(); this.clear(); }, async getKeys(): Promise { + await wait(); return Array.from(this.data.keys()); }, async getAll(): Promise> { + await wait(); return this.data; }, async getBatched(keys: string[]): Promise[]> { + await wait(); return Promise.all(keys.map((key) => this.get(key))); }, async size(): Promise { + await wait(); return this.data.size; }, async get(key: string): Promise> { + await wait(); return this.data.get(key); }, async lookup(key: string): Promise> { + await wait(); return 
this.get(key); }, async set(key: string, value: T): Promise { + await wait(); this.data.set(key, value); }, async save(key: string, value: T): Promise { + await wait(); return this.set(key, value); } } diff --git a/lib/utils/cache/store.ts b/lib/utils/cache/store.ts index c13852f65..c2df7bb66 100644 --- a/lib/utils/cache/store.ts +++ b/lib/utils/cache/store.ts @@ -48,6 +48,8 @@ export abstract class AsyncStoreWithBatchedGet implements AsyncStore { abstract getBatched(keys: string[]): Promise[]>; } +export type StoreWithBatchedGet = SyncStoreWithBatchedGet | AsyncStoreWithBatchedGet; + export const getBatchedSync = (store: SyncStore, keys: string[]): Maybe[] => { if (store instanceof SyncStoreWithBatchedGet) { return store.getBatched(keys);