77
88//! Objects related to [`SqliteStore`] live here.
99use std:: boxed:: Box ;
10+ use std:: collections:: HashMap ;
1011use std:: fs;
1112use std:: future:: Future ;
1213use std:: path:: PathBuf ;
1314use std:: pin:: Pin ;
15+ use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
1416use std:: sync:: { Arc , Mutex } ;
1517
1618use lightning:: io;
@@ -41,6 +43,10 @@ const SCHEMA_USER_VERSION: u16 = 2;
4143/// [SQLite]: https://sqlite.org
4244pub struct SqliteStore {
4345 inner : Arc < SqliteStoreInner > ,
46+
47+ // Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
48+ // operations aren't sensitive to the order of execution.
49+ next_write_version : AtomicU64 ,
4450}
4551
4652impl SqliteStore {
@@ -54,7 +60,31 @@ impl SqliteStore {
5460 data_dir : PathBuf , db_file_name : Option < String > , kv_table_name : Option < String > ,
5561 ) -> io:: Result < Self > {
5662 let inner = Arc :: new ( SqliteStoreInner :: new ( data_dir, db_file_name, kv_table_name) ?) ;
57- Ok ( Self { inner } )
63+ let next_write_version = AtomicU64 :: new ( 1 ) ;
64+ Ok ( Self { inner, next_write_version } )
65+ }
66+
67+ fn build_locking_key (
68+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
69+ ) -> String {
70+ if primary_namespace. is_empty ( ) {
71+ key. to_owned ( )
72+ } else {
73+ format ! ( "{}#{}#{}" , primary_namespace, secondary_namespace, key)
74+ }
75+ }
76+
77+ fn get_new_version_and_lock_ref ( & self , locking_key : String ) -> ( Arc < Mutex < u64 > > , u64 ) {
78+ let version = self . next_write_version . fetch_add ( 1 , Ordering :: Relaxed ) ;
79+ if version == u64:: MAX {
80+ panic ! ( "SqliteStore version counter overflowed" ) ;
81+ }
82+
83+ // Get a reference to the inner lock. We do this early so that the arc can double as an in-flight counter for
84+ // cleaning up unused locks.
85+ let inner_lock_ref = self . inner . get_inner_lock_ref ( locking_key) ;
86+
87+ ( inner_lock_ref, version)
5888 }
5989
6090 /// Returns the data directory.
@@ -85,12 +115,22 @@ impl KVStore for SqliteStore {
85115 fn write (
86116 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
87117 ) -> Pin < Box < dyn Future < Output = Result < ( ) , io:: Error > > + Send > > {
118+ let locking_key = self . build_locking_key ( primary_namespace, secondary_namespace, key) ;
119+ let ( inner_lock_ref, version) = self . get_new_version_and_lock_ref ( locking_key. clone ( ) ) ;
88120 let primary_namespace = primary_namespace. to_string ( ) ;
89121 let secondary_namespace = secondary_namespace. to_string ( ) ;
90122 let key = key. to_string ( ) ;
91123 let inner = Arc :: clone ( & self . inner ) ;
92124 let fut = tokio:: task:: spawn_blocking ( move || {
93- inner. write_internal ( & primary_namespace, & secondary_namespace, & key, buf)
125+ inner. write_internal (
126+ inner_lock_ref,
127+ locking_key,
128+ version,
129+ & primary_namespace,
130+ & secondary_namespace,
131+ & key,
132+ buf,
133+ )
94134 } ) ;
95135 Box :: pin ( async move {
96136 fut. await . unwrap_or_else ( |e| {
@@ -103,12 +143,22 @@ impl KVStore for SqliteStore {
103143 fn remove (
104144 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , lazy : bool ,
105145 ) -> Pin < Box < dyn Future < Output = Result < ( ) , io:: Error > > + Send > > {
146+ let locking_key = self . build_locking_key ( primary_namespace, secondary_namespace, key) ;
147+ let ( inner_lock_ref, version) = self . get_new_version_and_lock_ref ( locking_key. clone ( ) ) ;
106148 let primary_namespace = primary_namespace. to_string ( ) ;
107149 let secondary_namespace = secondary_namespace. to_string ( ) ;
108150 let key = key. to_string ( ) ;
109151 let inner = Arc :: clone ( & self . inner ) ;
110152 let fut = tokio:: task:: spawn_blocking ( move || {
111- inner. remove_internal ( & primary_namespace, & secondary_namespace, & key, lazy)
153+ inner. remove_internal (
154+ inner_lock_ref,
155+ locking_key,
156+ version,
157+ & primary_namespace,
158+ & secondary_namespace,
159+ & key,
160+ lazy,
161+ )
112162 } ) ;
113163 Box :: pin ( async move {
114164 fut. await . unwrap_or_else ( |e| {
@@ -146,13 +196,33 @@ impl KVStoreSync for SqliteStore {
146196 fn write (
147197 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
148198 ) -> io:: Result < ( ) > {
149- self . inner . write_internal ( primary_namespace, secondary_namespace, key, buf)
199+ let locking_key = self . build_locking_key ( primary_namespace, secondary_namespace, key) ;
200+ let ( inner_lock_ref, version) = self . get_new_version_and_lock_ref ( locking_key. clone ( ) ) ;
201+ self . inner . write_internal (
202+ inner_lock_ref,
203+ locking_key,
204+ version,
205+ primary_namespace,
206+ secondary_namespace,
207+ key,
208+ buf,
209+ )
150210 }
151211
152212 fn remove (
153213 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , lazy : bool ,
154214 ) -> io:: Result < ( ) > {
155- self . inner . remove_internal ( primary_namespace, secondary_namespace, key, lazy)
215+ let locking_key = self . build_locking_key ( primary_namespace, secondary_namespace, key) ;
216+ let ( inner_lock_ref, version) = self . get_new_version_and_lock_ref ( locking_key. clone ( ) ) ;
217+ self . inner . remove_internal (
218+ inner_lock_ref,
219+ locking_key,
220+ version,
221+ primary_namespace,
222+ secondary_namespace,
223+ key,
224+ lazy,
225+ )
156226 }
157227
158228 fn list ( & self , primary_namespace : & str , secondary_namespace : & str ) -> io:: Result < Vec < String > > {
@@ -164,6 +234,7 @@ struct SqliteStoreInner {
164234 connection : Arc < Mutex < Connection > > ,
165235 data_dir : PathBuf ,
166236 kv_table_name : String ,
237+ write_version_locks : Mutex < HashMap < String , Arc < Mutex < u64 > > > > ,
167238}
168239
169240impl SqliteStoreInner {
@@ -237,7 +308,13 @@ impl SqliteStoreInner {
237308 } ) ?;
238309
239310 let connection = Arc :: new ( Mutex :: new ( connection) ) ;
240- Ok ( Self { connection, data_dir, kv_table_name } )
311+ let write_version_locks = Mutex :: new ( HashMap :: new ( ) ) ;
312+ Ok ( Self { connection, data_dir, kv_table_name, write_version_locks } )
313+ }
314+
315+ fn get_inner_lock_ref ( & self , locking_key : String ) -> Arc < Mutex < u64 > > {
316+ let mut outer_lock = self . write_version_locks . lock ( ) . unwrap ( ) ;
317+ Arc :: clone ( & outer_lock. entry ( locking_key) . or_default ( ) )
241318 }
242319
243320 fn read_internal (
@@ -289,46 +366,51 @@ impl SqliteStoreInner {
289366 }
290367
291368 fn write_internal (
292- & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
369+ & self , inner_lock_ref : Arc < Mutex < u64 > > , locking_key : String , version : u64 ,
370+ primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
293371 ) -> io:: Result < ( ) > {
294372 check_namespace_key_validity ( primary_namespace, secondary_namespace, Some ( key) , "write" ) ?;
295373
296- let locked_conn = self . connection . lock ( ) . unwrap ( ) ;
374+ self . execute_locked_write ( inner_lock_ref, locking_key, version, || {
375+ let locked_conn = self . connection . lock ( ) . unwrap ( ) ;
297376
298- let sql = format ! (
299- "INSERT OR REPLACE INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:primary_namespace, :secondary_namespace, :key, :value);" ,
300- self . kv_table_name
301- ) ;
377+ let sql = format ! (
378+ "INSERT OR REPLACE INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:primary_namespace, :secondary_namespace, :key, :value);" ,
379+ self . kv_table_name
380+ ) ;
302381
303- let mut stmt = locked_conn. prepare_cached ( & sql) . map_err ( |e| {
304- let msg = format ! ( "Failed to prepare statement: {}" , e) ;
305- io:: Error :: new ( io:: ErrorKind :: Other , msg)
306- } ) ?;
382+ let mut stmt = locked_conn. prepare_cached ( & sql) . map_err ( |e| {
383+ let msg = format ! ( "Failed to prepare statement: {}" , e) ;
384+ io:: Error :: new ( io:: ErrorKind :: Other , msg)
385+ } ) ?;
307386
308- stmt. execute ( named_params ! {
309- ":primary_namespace" : primary_namespace,
310- ":secondary_namespace" : secondary_namespace,
311- ":key" : key,
312- ":value" : buf,
313- } )
314- . map ( |_| ( ) )
315- . map_err ( |e| {
316- let msg = format ! (
317- "Failed to write to key {}/{}/{}: {}" ,
318- PrintableString ( primary_namespace) ,
319- PrintableString ( secondary_namespace) ,
320- PrintableString ( key) ,
321- e
322- ) ;
323- io:: Error :: new ( io:: ErrorKind :: Other , msg)
387+ stmt. execute ( named_params ! {
388+ ":primary_namespace" : primary_namespace,
389+ ":secondary_namespace" : secondary_namespace,
390+ ":key" : key,
391+ ":value" : buf,
392+ } )
393+ . map ( |_| ( ) )
394+ . map_err ( |e| {
395+ let msg = format ! (
396+ "Failed to write to key {}/{}/{}: {}" ,
397+ PrintableString ( primary_namespace) ,
398+ PrintableString ( secondary_namespace) ,
399+ PrintableString ( key) ,
400+ e
401+ ) ;
402+ io:: Error :: new ( io:: ErrorKind :: Other , msg)
403+ } )
324404 } )
325405 }
326406
327407 fn remove_internal (
328- & self , primary_namespace : & str , secondary_namespace : & str , key : & str , _lazy : bool ,
408+ & self , inner_lock_ref : Arc < Mutex < u64 > > , locking_key : String , version : u64 ,
409+ primary_namespace : & str , secondary_namespace : & str , key : & str , _lazy : bool ,
329410 ) -> io:: Result < ( ) > {
330411 check_namespace_key_validity ( primary_namespace, secondary_namespace, Some ( key) , "remove" ) ?;
331412
413+ self . execute_locked_write ( inner_lock_ref, locking_key, version, || {
332414 let locked_conn = self . connection . lock ( ) . unwrap ( ) ;
333415
334416 let sql = format ! ( "DELETE FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;" , self . kv_table_name) ;
@@ -354,6 +436,7 @@ impl SqliteStoreInner {
354436 io:: Error :: new ( io:: ErrorKind :: Other , msg)
355437 } ) ?;
356438 Ok ( ( ) )
439+ } )
357440 }
358441
359442 fn list_internal (
@@ -396,6 +479,46 @@ impl SqliteStoreInner {
396479
397480 Ok ( keys)
398481 }
482+
483+ fn execute_locked_write < F : FnOnce ( ) -> Result < ( ) , lightning:: io:: Error > > (
484+ & self , inner_lock_ref : Arc < Mutex < u64 > > , locking_key : String , version : u64 , callback : F ,
485+ ) -> Result < ( ) , lightning:: io:: Error > {
486+ let res = {
487+ let mut last_written_version = inner_lock_ref. lock ( ) . unwrap ( ) ;
488+
489+ // Check if we already have a newer version written/removed. This is used in async contexts to realize eventual
490+ // consistency.
491+ let is_stale_version = version <= * last_written_version;
492+
493+ // If the version is not stale, we execute the callback. Otherwise we can and must skip writing.
494+ if is_stale_version {
495+ Ok ( ( ) )
496+ } else {
497+ callback ( ) . map ( |_| {
498+ * last_written_version = version;
499+ } )
500+ }
501+ } ;
502+
503+ self . clean_locks ( & inner_lock_ref, locking_key) ;
504+
505+ res
506+ }
507+
508+ fn clean_locks ( & self , inner_lock_ref : & Arc < Mutex < u64 > > , locking_key : String ) {
509+ // If there no arcs in use elsewhere, this means that there are no in-flight writes. We can remove the map entry
510+ // to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in
511+ // inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already
512+ // counted.
513+ let mut outer_lock = self . write_version_locks . lock ( ) . unwrap ( ) ;
514+
515+ let strong_count = Arc :: strong_count ( & inner_lock_ref) ;
516+ debug_assert ! ( strong_count >= 2 , "Unexpected SqliteStore strong count" ) ;
517+
518+ if strong_count == 2 {
519+ outer_lock. remove ( & locking_key) ;
520+ }
521+ }
399522}
400523
401524#[ cfg( test) ]
0 commit comments