@@ -4,14 +4,19 @@ use tokio::net::tcp::OwnedWriteHalf;
44
55use std:: collections:: HashMap ;
66
7- use crate :: config:: { get_config, parse} ;
8- use crate :: constants:: { OID_NUMERIC , OID_TEXT } ;
7+ use crate :: config:: { get_config, parse, Role } ;
8+ use crate :: constants:: { OID_INT4 , OID_NUMERIC , OID_TEXT } ;
99use crate :: errors:: Error ;
10- use crate :: messages:: write_all_half;
10+ use crate :: messages:: { custom_protocol_response_ok, error_response, write_all_half} ;
11+ use crate :: pool:: ConnectionPool ;
1112use crate :: stats:: get_stats;
1213
1314/// Handle admin client
14- pub async fn handle_admin ( stream : & mut OwnedWriteHalf , mut query : BytesMut ) -> Result < ( ) , Error > {
15+ pub async fn handle_admin (
16+ stream : & mut OwnedWriteHalf ,
17+ mut query : BytesMut ,
18+ pool : ConnectionPool ,
19+ ) -> Result < ( ) , Error > {
1520 let code = query. get_u8 ( ) as char ;
1621
1722 if code != 'Q' {
@@ -23,6 +28,8 @@ pub async fn handle_admin(stream: &mut OwnedWriteHalf, mut query: BytesMut) -> R
2328 . to_string ( )
2429 . to_ascii_uppercase ( ) ;
2530
31+ trace ! ( "Admin query: {}" , query) ;
32+
2633 if query. starts_with ( "SHOW STATS" ) {
2734 trace ! ( "SHOW STATS" ) ;
2835 show_stats ( stream) . await
@@ -32,13 +39,147 @@ pub async fn handle_admin(stream: &mut OwnedWriteHalf, mut query: BytesMut) -> R
3239 } else if query. starts_with ( "SHOW CONFIG" ) {
3340 trace ! ( "SHOW CONFIG" ) ;
3441 show_config ( stream) . await
42+ } else if query. starts_with ( "SHOW DATABASES" ) {
43+ trace ! ( "SHOW DATABASES" ) ;
44+ show_databases ( stream, & pool) . await
45+ } else if query. starts_with ( "SET " ) {
46+ trace ! ( "SET" ) ;
47+ ignore_set ( stream) . await
3548 } else {
36- Err ( Error :: ProtocolSyncError )
49+ error_response ( stream , "Unsupported query against the admin database" ) . await
3750 }
3851}
3952
53+ /// SHOW DATABASES
54+ async fn show_databases ( stream : & mut OwnedWriteHalf , pool : & ConnectionPool ) -> Result < ( ) , Error > {
55+ let guard = get_config ( ) ;
56+ let config = & * guard. clone ( ) ;
57+ drop ( guard) ;
58+
59+ let columns = [
60+ "name" ,
61+ "host" ,
62+ "port" ,
63+ "database" ,
64+ "force_user" ,
65+ "pool_size" ,
66+ "min_pool_size" ,
67+ "reserve_pool" ,
68+ "pool_mode" ,
69+ "max_connections" ,
70+ "current_connections" ,
71+ "paused" ,
72+ "disabled" ,
73+ ] ;
74+
75+ let types = [
76+ OID_TEXT , OID_TEXT , OID_TEXT , OID_TEXT , OID_TEXT , OID_INT4 , OID_INT4 , OID_INT4 , OID_TEXT ,
77+ OID_INT4 , OID_INT4 , OID_INT4 , OID_INT4 ,
78+ ] ;
79+
80+ let mut res = BytesMut :: new ( ) ;
81+ let mut row_desc = BytesMut :: new ( ) ;
82+ row_desc. put_i16 ( columns. len ( ) as i16 ) ;
83+
84+ for ( i, column) in columns. iter ( ) . enumerate ( ) {
85+ row_desc. put_slice ( & format ! ( "{}\0 " , column) . as_bytes ( ) ) ;
86+
87+ // Doesn't belong to any table
88+ row_desc. put_i32 ( 0 ) ;
89+
90+ // Doesn't belong to any table
91+ row_desc. put_i16 ( 0 ) ;
92+
93+ // Data type
94+ row_desc. put_i32 ( types[ i] ) ;
95+
96+ // text size = variable (-1)
97+ row_desc. put_i16 ( if types[ i] == OID_TEXT { -1 } else { 4 } ) ;
98+
99+ // Type modifier: none that I know
100+ row_desc. put_i32 ( -1 ) ;
101+
102+ // Format being used: text (0), binary (1)
103+ row_desc. put_i16 ( 0 ) ;
104+ }
105+
106+ res. put_u8 ( b'T' ) ;
107+ res. put_i32 ( row_desc. len ( ) as i32 + 4 ) ;
108+ res. put ( row_desc) ;
109+
110+ for shard in 0 ..pool. shards ( ) {
111+ let database_name = & config. shards [ & shard. to_string ( ) ] . database ;
112+ let mut replica_count = 0 ;
113+ for server in 0 ..pool. servers ( shard) {
114+ // DataRow
115+ let mut data_row = BytesMut :: new ( ) ;
116+ data_row. put_i16 ( columns. len ( ) as i16 ) ;
117+
118+ let address = pool. address ( shard, server) ;
119+ let role = address. role . to_string ( ) ;
120+ let name = match role. as_ref ( ) {
121+ "primary" => format ! ( "shard_{}_primary" , shard) ,
122+ "replica" => format ! ( "shard_{}_replica_{}" , shard, replica_count) ,
123+ _ => unreachable ! ( ) ,
124+ } ;
125+ let connections = pool. connections ( shard, server) ;
126+
127+ let data = HashMap :: from ( [
128+ ( "host" , address. host . to_string ( ) ) ,
129+ ( "port" , address. port . to_string ( ) ) ,
130+ ( "role" , role) ,
131+ ( "name" , name) ,
132+ ( "database" , database_name. to_string ( ) ) ,
133+ ( "force_user" , config. user . name . to_string ( ) ) ,
134+ ( "pool_size" , config. general . pool_size . to_string ( ) ) ,
135+ ( "min_pool_size" , "0" . to_string ( ) ) ,
136+ ( "reserve_pool" , "0" . to_string ( ) ) ,
137+ ( "pool_mode" , config. general . pool_mode . to_string ( ) ) ,
138+ // There is only one user support at the moment,
139+ // so max_connections = num of users * pool_size = 1 * pool_size.
140+ ( "max_connections" , config. general . pool_size . to_string ( ) ) ,
141+ ( "current_connections" , connections. connections . to_string ( ) ) ,
142+ ( "paused" , "0" . to_string ( ) ) ,
143+ ( "disabled" , "0" . to_string ( ) ) ,
144+ ] ) ;
145+
146+ for column in & columns {
147+ let value = data[ column] . as_bytes ( ) ;
148+
149+ data_row. put_i32 ( value. len ( ) as i32 ) ;
150+ data_row. put_slice ( & value) ;
151+ }
152+
153+ res. put_u8 ( b'D' ) ;
154+ res. put_i32 ( data_row. len ( ) as i32 + 4 ) ;
155+ res. put ( data_row) ;
156+
157+ if address. role == Role :: Replica {
158+ replica_count += 1 ;
159+ }
160+ }
161+ }
162+
163+ let command_complete = BytesMut :: from ( & "SHOW\0 " [ ..] ) ;
164+ res. put_u8 ( b'C' ) ;
165+ res. put_i32 ( command_complete. len ( ) as i32 + 4 ) ;
166+ res. put ( command_complete) ;
167+
168+ res. put_u8 ( b'Z' ) ;
169+ res. put_i32 ( 5 ) ;
170+ res. put_u8 ( b'I' ) ;
171+
172+ write_all_half ( stream, res) . await
173+ }
174+
175+ /// Ignore any SET commands the client sends.
176+ /// This is common initialization done by ORMs.
177+ async fn ignore_set ( stream : & mut OwnedWriteHalf ) -> Result < ( ) , Error > {
178+ custom_protocol_response_ok ( stream, "SET" ) . await
179+ }
180+
40181/// RELOAD
41- pub async fn reload ( stream : & mut OwnedWriteHalf ) -> Result < ( ) , Error > {
182+ async fn reload ( stream : & mut OwnedWriteHalf ) -> Result < ( ) , Error > {
42183 info ! ( "Reloading config" ) ;
43184
44185 let config = get_config ( ) ;
@@ -66,7 +207,7 @@ pub async fn reload(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
66207 write_all_half ( stream, res) . await
67208}
68209
69- pub async fn show_config ( stream : & mut OwnedWriteHalf ) -> Result < ( ) , Error > {
210+ async fn show_config ( stream : & mut OwnedWriteHalf ) -> Result < ( ) , Error > {
70211 let guard = get_config ( ) ;
71212 let config = & * guard. clone ( ) ;
72213 let config: HashMap < String , String > = config. into ( ) ;
@@ -153,7 +294,7 @@ pub async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
153294}
154295
155296/// SHOW STATS
156- pub async fn show_stats ( stream : & mut OwnedWriteHalf ) -> Result < ( ) , Error > {
297+ async fn show_stats ( stream : & mut OwnedWriteHalf ) -> Result < ( ) , Error > {
157298 let columns = [
158299 "database" ,
159300 "total_xact_count" ,
0 commit comments