@@ -9,7 +9,8 @@ use crate::{
99} ;
1010use cipherstash_client:: encryption:: Plaintext ;
1111use tokio:: sync:: mpsc:: { self , UnboundedReceiver , UnboundedSender } ;
12- use tracing:: warn;
12+ use tokio:: sync:: oneshot:: Sender ;
13+ use tracing:: { debug, warn} ;
1314
1415mod encrypt_config;
1516mod schema;
@@ -22,6 +23,8 @@ pub type ReloadSender = UnboundedSender<ReloadCommand>;
2223
2324type ReloadReceiver = UnboundedReceiver < ReloadCommand > ;
2425
26+ pub type ReloadResponder = Sender < ( ) > ;
27+
2528/// SQL Statement for loading encrypt configuration from database
2629const ENCRYPT_CONFIG_QUERY : & str = include_str ! ( "./sql/select_config.sql" ) ;
2730
@@ -31,10 +34,10 @@ const SCHEMA_QUERY: &str = include_str!("./sql/select_table_schemas.sql");
3134/// SQL Statement for loading aggregates as part of database schema
3235const AGGREGATE_QUERY : & str = include_str ! ( "./sql/select_aggregates.sql" ) ;
3336
34- #[ derive( Debug , Clone , Copy ) ]
37+ #[ derive( Debug ) ]
3538pub enum ReloadCommand {
36- DatabaseSchema ,
37- EncryptSchema ,
39+ DatabaseSchema ( ReloadResponder ) ,
40+ EncryptSchema ( ReloadResponder ) ,
3841}
3942
4043///
@@ -48,7 +51,6 @@ pub struct Proxy {
4851 pub eql_version : Option < String > ,
4952 zerokms : ZeroKms ,
5053 reload_sender : ReloadSender ,
51- reload_receiver : ReloadReceiver ,
5254}
5355
5456impl Proxy {
@@ -59,22 +61,27 @@ impl Proxy {
5961 // Ensures error on start if credential or network issue
6062 zerokms. init_cipher ( None ) . await ?;
6163
62- let encrypt_config = EncryptConfigManager :: init ( & config. database ) . await ?;
64+ let encrypt_config_manager = EncryptConfigManager :: init ( & config. database ) . await ?;
6365
64- let schema = SchemaManager :: init ( & config. database ) . await ?;
66+ let schema_manager = SchemaManager :: init ( & config. database ) . await ?;
6567
6668 let eql_version = Proxy :: eql_version ( & config) . await ?;
6769
6870 let ( reload_sender, reload_receiver) = mpsc:: unbounded_channel ( ) ;
6971
72+ Proxy :: receive (
73+ reload_receiver,
74+ schema_manager. clone ( ) ,
75+ encrypt_config_manager. clone ( ) ,
76+ ) ;
77+
7078 Ok ( Proxy {
7179 config : Arc :: new ( config) ,
7280 zerokms,
73- encrypt_config_manager : encrypt_config ,
74- schema_manager : schema ,
81+ encrypt_config_manager,
82+ schema_manager,
7583 eql_version,
7684 reload_sender,
77- reload_receiver,
7885 } )
7986 }
8087
@@ -97,13 +104,27 @@ impl Proxy {
97104 Ok ( version)
98105 }
99106
100- pub async fn receive ( & mut self ) {
101- while let Some ( command) = self . reload_receiver . recv ( ) . await {
102- match command {
103- ReloadCommand :: DatabaseSchema => self . schema_manager . reload ( ) . await ,
104- ReloadCommand :: EncryptSchema => self . encrypt_config_manager . reload ( ) . await ,
107+ pub fn receive (
108+ mut reload_receiver : ReloadReceiver ,
109+ schema_manager : SchemaManager ,
110+ encrypt_config_manager : EncryptConfigManager ,
111+ ) {
112+ tokio:: task:: spawn ( async move {
113+ while let Some ( command) = reload_receiver. recv ( ) . await {
114+ debug ! ( msg = "ReloadCommand received" , ?command) ;
115+ match command {
116+ ReloadCommand :: DatabaseSchema ( responder) => {
117+ schema_manager. reload ( ) . await ;
118+ encrypt_config_manager. reload ( ) . await ;
119+ let _ = responder. send ( ( ) ) ;
120+ }
121+ ReloadCommand :: EncryptSchema ( responder) => {
122+ encrypt_config_manager. reload ( ) . await ;
123+ let _ = responder. send ( ( ) ) ;
124+ }
125+ }
105126 }
106- }
127+ } ) ;
107128 }
108129
109130 ///
0 commit comments