@@ -10,10 +10,12 @@ import type {
1010import { EnvironmentVariablesService } from '../config' ;
1111import { IdempotencyRecord } from './IdempotencyRecord' ;
1212import { BasePersistenceLayerInterface } from './BasePersistenceLayerInterface' ;
13- import { IdempotencyValidationError } from '../Exceptions' ;
13+ import { IdempotencyItemAlreadyExistsError , IdempotencyValidationError } from '../Exceptions' ;
14+ import { LRUCache } from './LRUCache' ;
1415
1516abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
1617 public idempotencyKeyPrefix : string ;
18+ private cache ?: LRUCache < string , IdempotencyRecord > ;
1719 private configured : boolean = false ;
1820 // envVarsService is always initialized in the constructor
1921 private envVarsService ! : EnvironmentVariablesService ;
@@ -25,7 +27,7 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
2527 private useLocalCache : boolean = false ;
2628 private validationKeyJmesPath ?: string ;
2729
28- public constructor ( ) {
30+ public constructor ( ) {
2931 this . envVarsService = new EnvironmentVariablesService ( ) ;
3032 this . idempotencyKeyPrefix = this . getEnvVarsService ( ) . getFunctionName ( ) ;
3133 }
@@ -55,7 +57,10 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
5557 this . throwOnNoIdempotencyKey = idempotencyConfig ?. throwOnNoIdempotencyKey || false ;
5658 this . eventKeyJmesPath = idempotencyConfig . eventKeyJmesPath ;
5759 this . expiresAfterSeconds = idempotencyConfig . expiresAfterSeconds ; // 1 hour default
58- // TODO: Add support for local cache
60+ this . useLocalCache = idempotencyConfig . useLocalCache ;
61+ if ( this . useLocalCache ) {
62+ this . cache = new LRUCache ( { maxSize : idempotencyConfig . maxLocalCacheSize } ) ;
63+ }
5964 this . hashFunction = idempotencyConfig . hashFunction ;
6065 }
6166
@@ -64,13 +69,15 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
6469 *
6570 * @param data - the data payload that will be hashed to create the hash portion of the idempotency key
6671 */
67- public async deleteRecord ( data : Record < string , unknown > ) : Promise < void > {
68- const idempotencyRecord = new IdempotencyRecord ( {
72+ public async deleteRecord ( data : Record < string , unknown > ) : Promise < void > {
73+ const idempotencyRecord = new IdempotencyRecord ( {
6974 idempotencyKey : this . getHashedIdempotencyKey ( data ) ,
7075 status : IdempotencyRecordStatus . EXPIRED
7176 } ) ;
72-
77+
7378 await this . _deleteRecord ( idempotencyRecord ) ;
79+
80+ this . deleteFromCache ( idempotencyRecord . idempotencyKey ) ;
7481 }
7582
7683 /**
@@ -81,7 +88,15 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
8188 public async getRecord ( data : Record < string , unknown > ) : Promise < IdempotencyRecord > {
8289 const idempotencyKey = this . getHashedIdempotencyKey ( data ) ;
8390
91+ const cachedRecord = this . getFromCache ( idempotencyKey ) ;
92+ if ( cachedRecord ) {
93+ this . validatePayload ( data , cachedRecord ) ;
94+
95+ return cachedRecord ;
96+ }
97+
8498 const record = await this . _getRecord ( idempotencyKey ) ;
99+ this . saveToCache ( record ) ;
85100 this . validatePayload ( data , record ) ;
86101
87102 return record ;
@@ -97,7 +112,7 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
97112 * @param data - the data payload that will be hashed to create the hash portion of the idempotency key
98113 * @param remainingTimeInMillis - the remaining time left in the lambda execution context
99114 */
100- public async saveInProgress ( data : Record < string , unknown > , remainingTimeInMillis ?: number ) : Promise < void > {
115+ public async saveInProgress ( data : Record < string , unknown > , remainingTimeInMillis ?: number ) : Promise < void > {
101116 const idempotencyRecord = new IdempotencyRecord ( {
102117 idempotencyKey : this . getHashedIdempotencyKey ( data ) ,
103118 status : IdempotencyRecordStatus . INPROGRESS ,
@@ -113,6 +128,10 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
113128 ) ;
114129 }
115130
131+ if ( this . getFromCache ( idempotencyRecord . idempotencyKey ) ) {
132+ throw new IdempotencyItemAlreadyExistsError ( ) ;
133+ }
134+
116135 await this . _putRecord ( idempotencyRecord ) ;
117136 }
118137
@@ -123,7 +142,7 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
123142 * @param data - the data payload that will be hashed to create the hash portion of the idempotency key
124143 * @param result - the result of the successfully completed function
125144 */
126- public async saveSuccess ( data : Record < string , unknown > , result : Record < string , unknown > ) : Promise < void > {
145+ public async saveSuccess ( data : Record < string , unknown > , result : Record < string , unknown > ) : Promise < void > {
127146 const idempotencyRecord = new IdempotencyRecord ( {
128147 idempotencyKey : this . getHashedIdempotencyKey ( data ) ,
129148 status : IdempotencyRecordStatus . COMPLETED ,
@@ -133,23 +152,33 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
133152 } ) ;
134153
135154 await this . _updateRecord ( idempotencyRecord ) ;
155+
156+ this . saveToCache ( idempotencyRecord ) ;
136157 }
137158
138159 protected abstract _deleteRecord ( record : IdempotencyRecord ) : Promise < void > ;
139160 protected abstract _getRecord ( idempotencyKey : string ) : Promise < IdempotencyRecord > ;
140161 protected abstract _putRecord ( record : IdempotencyRecord ) : Promise < void > ;
141162 protected abstract _updateRecord ( record : IdempotencyRecord ) : Promise < void > ;
142163
164+ private deleteFromCache ( idempotencyKey : string ) : void {
165+ if ( ! this . useLocalCache ) return ;
166+ // Delete from local cache if it exists
167+ if ( this . cache ?. has ( idempotencyKey ) ) {
168+ this . cache ?. remove ( idempotencyKey ) ;
169+ }
170+ }
171+
143172 /**
144173 * Generates a hash of the data and returns the digest of that hash
145174 *
146175 * @param data the data payload that will generate the hash
147176 * @returns the digest of the generated hash
148177 */
149- private generateHash ( data : string ) : string {
178+ private generateHash ( data : string ) : string {
150179 const hash : Hash = createHash ( this . hashFunction ) ;
151180 hash . update ( data ) ;
152-
181+
153182 return hash . digest ( 'base64' ) ;
154183 }
155184
@@ -168,10 +197,21 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
168197 */
169198 private getExpiryTimestamp ( ) : number {
170199 const currentTime : number = Date . now ( ) / 1000 ;
171-
200+
172201 return currentTime + this . expiresAfterSeconds ;
173202 }
174203
204+ private getFromCache ( idempotencyKey : string ) : IdempotencyRecord | undefined {
205+ if ( ! this . useLocalCache ) return undefined ;
206+ const cachedRecord = this . cache ?. get ( idempotencyKey ) ;
207+ if ( cachedRecord ) {
208+ // if record is not expired, return it
209+ if ( ! cachedRecord . isExpired ( ) ) return cachedRecord ;
210+ // if record is expired, delete it from cache
211+ this . deleteFromCache ( idempotencyKey ) ;
212+ }
213+ }
214+
175215 /**
176216 * Generates the idempotency key used to identify records in the persistence store.
177217 *
@@ -182,14 +222,14 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
182222 if ( this . eventKeyJmesPath ) {
183223 data = search ( data , this . eventKeyJmesPath ) ;
184224 }
185-
225+
186226 if ( BasePersistenceLayer . isMissingIdempotencyKey ( data ) ) {
187227 if ( this . throwOnNoIdempotencyKey ) {
188228 throw new Error ( 'No data found to create a hashed idempotency_key' ) ;
189229 }
190230 console . warn ( `No value found for idempotency_key. jmespath: ${ this . eventKeyJmesPath } ` ) ;
191231 }
192-
232+
193233 return `${ this . idempotencyKeyPrefix } #${ this . generateHash ( JSON . stringify ( data ) ) } ` ;
194234 }
195235
@@ -204,7 +244,7 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
204244 // Therefore, the assertion is safe.
205245 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
206246 data = search ( data , this . validationKeyJmesPath ! ) ;
207-
247+
208248 return this . generateHash ( JSON . stringify ( data ) ) ;
209249 }
210250
@@ -223,6 +263,20 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
223263 return ! data ;
224264 }
225265
266+ /**
267+ * Save record to local cache except for when status is `INPROGRESS`.
268+ *
269+ * We can't cache `INPROGRESS` records because we have no way to reflect updates
270+ * that might happen to the record outside of the execution context of the function.
271+ *
272+ * @param record - record to save
273+ */
274+ private saveToCache ( record : IdempotencyRecord ) : void {
275+ if ( ! this . useLocalCache ) return ;
276+ if ( record . getStatus ( ) === IdempotencyRecordStatus . INPROGRESS ) return ;
277+ this . cache ?. add ( record . idempotencyKey , record ) ;
278+ }
279+
226280 private validatePayload ( data : Record < string , unknown > , record : IdempotencyRecord ) : void {
227281 if ( this . payloadValidationEnabled ) {
228282 const hashedPayload : string = this . getHashedPayload ( data ) ;
0 commit comments