@@ -5,9 +5,8 @@ use tokio::net::tcp::OwnedWriteHalf;
55use std:: collections:: HashMap ;
66
77use crate :: config:: { get_config, parse, Role } ;
8- use crate :: constants:: { OID_INT4 , OID_NUMERIC , OID_TEXT } ;
98use crate :: errors:: Error ;
10- use crate :: messages:: { custom_protocol_response_ok , error_response , write_all_half } ;
9+ use crate :: messages:: * ;
1110use crate :: pool:: ConnectionPool ;
1211use crate :: stats:: get_stats;
1312
@@ -56,115 +55,66 @@ async fn show_databases(stream: &mut OwnedWriteHalf, pool: &ConnectionPool) -> R
5655 let config = & * guard. clone ( ) ;
5756 drop ( guard) ;
5857
59- let columns = [
60- "name" ,
61- "host" ,
62- "port" ,
63- "database" ,
64- "force_user" ,
65- "pool_size" ,
66- "min_pool_size" ,
67- "reserve_pool" ,
68- "pool_mode" ,
69- "max_connections" ,
70- "current_connections" ,
71- "paused" ,
72- "disabled" ,
73- ] ;
74-
75- let types = [
76- OID_TEXT , OID_TEXT , OID_TEXT , OID_TEXT , OID_TEXT , OID_INT4 , OID_INT4 , OID_INT4 , OID_TEXT ,
77- OID_INT4 , OID_INT4 , OID_INT4 , OID_INT4 ,
58+ // Columns
59+ let columns = vec ! [
60+ ( "name" , DataType :: Text ) ,
61+ ( "host" , DataType :: Text ) ,
62+ ( "port" , DataType :: Text ) ,
63+ ( "database" , DataType :: Text ) ,
64+ ( "force_user" , DataType :: Text ) ,
65+ ( "pool_size" , DataType :: Int4 ) ,
66+ ( "min_pool_size" , DataType :: Int4 ) ,
67+ ( "reserve_pool" , DataType :: Int4 ) ,
68+ ( "pool_mode" , DataType :: Text ) ,
69+ ( "max_connections" , DataType :: Int4 ) ,
70+ ( "current_connections" , DataType :: Int4 ) ,
71+ ( "paused" , DataType :: Int4 ) ,
72+ ( "disabled" , DataType :: Int4 ) ,
7873 ] ;
7974
8075 let mut res = BytesMut :: new ( ) ;
81- let mut row_desc = BytesMut :: new ( ) ;
82- row_desc. put_i16 ( columns. len ( ) as i16 ) ;
83-
84- for ( i, column) in columns. iter ( ) . enumerate ( ) {
85- row_desc. put_slice ( & format ! ( "{}\0 " , column) . as_bytes ( ) ) ;
86-
87- // Doesn't belong to any table
88- row_desc. put_i32 ( 0 ) ;
89-
90- // Doesn't belong to any table
91- row_desc. put_i16 ( 0 ) ;
9276
93- // Data type
94- row_desc. put_i32 ( types[ i] ) ;
95-
96- // text size = variable (-1)
97- row_desc. put_i16 ( if types[ i] == OID_TEXT { -1 } else { 4 } ) ;
98-
99- // Type modifier: none that I know
100- row_desc. put_i32 ( -1 ) ;
101-
102- // Format being used: text (0), binary (1)
103- row_desc. put_i16 ( 0 ) ;
104- }
105-
106- res. put_u8 ( b'T' ) ;
107- res. put_i32 ( row_desc. len ( ) as i32 + 4 ) ;
108- res. put ( row_desc) ;
77+ // RowDescription
78+ res. put ( row_description ( & columns) ) ;
10979
11080 for shard in 0 ..pool. shards ( ) {
11181 let database_name = & config. shards [ & shard. to_string ( ) ] . database ;
11282 let mut replica_count = 0 ;
113- for server in 0 ..pool. servers ( shard) {
114- // DataRow
115- let mut data_row = BytesMut :: new ( ) ;
116- data_row. put_i16 ( columns. len ( ) as i16 ) ;
11783
84+ for server in 0 ..pool. servers ( shard) {
11885 let address = pool. address ( shard, server) ;
119- let role = address. role . to_string ( ) ;
120- let name = match role. as_ref ( ) {
121- "primary" => format ! ( "shard_{}_primary" , shard) ,
122- "replica" => format ! ( "shard_{}_replica_{}" , shard, replica_count) ,
123- _ => unreachable ! ( ) ,
86+ let name = match address. role {
87+ Role :: Primary => format ! ( "shard_{}_primary" , shard) ,
88+
89+ Role :: Replica => {
90+ let name = format ! ( "shard_{}_replica_{}" , shard, replica_count) ;
91+ replica_count += 1 ;
92+ name
93+ }
12494 } ;
125- let connections = pool. connections ( shard, server) ;
126-
127- let data = HashMap :: from ( [
128- ( "host" , address. host . to_string ( ) ) ,
129- ( "port" , address. port . to_string ( ) ) ,
130- ( "role" , role) ,
131- ( "name" , name) ,
132- ( "database" , database_name. to_string ( ) ) ,
133- ( "force_user" , config. user . name . to_string ( ) ) ,
134- ( "pool_size" , config. general . pool_size . to_string ( ) ) ,
135- ( "min_pool_size" , "0" . to_string ( ) ) ,
136- ( "reserve_pool" , "0" . to_string ( ) ) ,
137- ( "pool_mode" , config. general . pool_mode . to_string ( ) ) ,
138- // There is only one user support at the moment,
139- // so max_connections = num of users * pool_size = 1 * pool_size.
140- ( "max_connections" , config. general . pool_size . to_string ( ) ) ,
141- ( "current_connections" , connections. connections . to_string ( ) ) ,
142- ( "paused" , "0" . to_string ( ) ) ,
143- ( "disabled" , "0" . to_string ( ) ) ,
144- ] ) ;
145-
146- for column in & columns {
147- let value = data[ column] . as_bytes ( ) ;
148-
149- data_row. put_i32 ( value. len ( ) as i32 ) ;
150- data_row. put_slice ( & value) ;
151- }
152-
153- res. put_u8 ( b'D' ) ;
154- res. put_i32 ( data_row. len ( ) as i32 + 4 ) ;
155- res. put ( data_row) ;
156-
157- if address. role == Role :: Replica {
158- replica_count += 1 ;
159- }
95+ let pool_state = pool. pool_state ( shard, server) ;
96+
97+ res. put ( data_row ( & vec ! [
98+ name, // name
99+ address. host. to_string( ) , // host
100+ address. port. to_string( ) , // port
101+ database_name. to_string( ) , // database
102+ config. user. name. to_string( ) , // force_user
103+ config. general. pool_size. to_string( ) , // pool_size
104+ "0" . to_string( ) , // min_pool_size
105+ "0" . to_string( ) , // reserve_pool
106+ config. general. pool_mode. to_string( ) , // pool_mode
107+ config. general. pool_size. to_string( ) , // max_connections
108+ pool_state. connections. to_string( ) , // current_connections
109+ "0" . to_string( ) , // paused
110+ "0" . to_string( ) , // disabled
111+ ] ) ) ;
160112 }
161113 }
162114
163- let command_complete = BytesMut :: from ( & "SHOW\0 " [ ..] ) ;
164- res. put_u8 ( b'C' ) ;
165- res. put_i32 ( command_complete. len ( ) as i32 + 4 ) ;
166- res. put ( command_complete) ;
115+ res. put ( command_complete ( "SHOW" ) ) ;
167116
117+ // ReadyForQuery
168118 res. put_u8 ( b'Z' ) ;
169119 res. put_i32 ( 5 ) ;
170120 res. put_u8 ( b'I' ) ;
@@ -194,10 +144,7 @@ async fn reload(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
194144 let mut res = BytesMut :: new ( ) ;
195145
196146 // CommandComplete
197- let command_complete = BytesMut :: from ( & "RELOAD\0 " [ ..] ) ;
198- res. put_u8 ( b'C' ) ;
199- res. put_i32 ( command_complete. len ( ) as i32 + 4 ) ;
200- res. put ( command_complete) ;
147+ res. put ( command_complete ( "RELOAD" ) ) ;
201148
202149 // ReadyForQuery
203150 res. put_u8 ( b'Z' ) ;
@@ -217,74 +164,31 @@ async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
217164 let immutables = [ "host" , "port" , "connect_timeout" ] ;
218165
219166 // Columns
220- let columns = [ "key" , "value" , "default" , "changeable" ] ;
221-
222- // RowDescription
223- let mut row_desc = BytesMut :: new ( ) ;
224- row_desc. put_i16 ( 4 as i16 ) ; // key, value, default, changeable
225-
226- for column in columns {
227- row_desc. put_slice ( & format ! ( "{}\0 " , column) . as_bytes ( ) ) ;
228-
229- // Doesn't belong to any table
230- row_desc. put_i32 ( 0 ) ;
231-
232- // Doesn't belong to any table
233- row_desc. put_i16 ( 0 ) ;
234-
235- // Data type
236- row_desc. put_i32 ( OID_TEXT ) ;
237-
238- // text size = variable (-1)
239- row_desc. put_i16 ( -1 ) ;
240-
241- // Type modifier: none that I know
242- row_desc. put_i32 ( -1 ) ;
243-
244- // Format being used: text (0), binary (1)
245- row_desc. put_i16 ( 0 ) ;
246- }
167+ let columns = vec ! [
168+ ( "key" , DataType :: Text ) ,
169+ ( "value" , DataType :: Text ) ,
170+ ( "default" , DataType :: Text ) ,
171+ ( "changeable" , DataType :: Text ) ,
172+ ] ;
247173
248174 // Response data
249175 let mut res = BytesMut :: new ( ) ;
250- res. put_u8 ( b'T' ) ;
251- res. put_i32 ( row_desc. len ( ) as i32 + 4 ) ;
252- res. put ( row_desc) ;
176+ res. put ( row_description ( & columns) ) ;
253177
254178 // DataRow rows
255179 for ( key, value) in config {
256- let mut data_row = BytesMut :: new ( ) ;
257-
258- data_row. put_i16 ( 4 as i16 ) ; // key, value, default, changeable
259-
260- let key_bytes = key. as_bytes ( ) ;
261- let value = value. as_bytes ( ) ;
262-
263- data_row. put_i32 ( key_bytes. len ( ) as i32 ) ;
264- data_row. put_slice ( & key_bytes) ;
265-
266- data_row. put_i32 ( value. len ( ) as i32 ) ;
267- data_row. put_slice ( & value) ;
268-
269- data_row. put_i32 ( 1 as i32 ) ;
270- data_row. put_slice ( & "-" . as_bytes ( ) ) ;
271-
272180 let changeable = if immutables. iter ( ) . filter ( |col| * col == & key) . count ( ) == 1 {
273- "no" . as_bytes ( )
181+ "no" . to_string ( )
274182 } else {
275- "yes" . as_bytes ( )
183+ "yes" . to_string ( )
276184 } ;
277- data_row. put_i32 ( changeable. len ( ) as i32 ) ;
278- data_row. put_slice ( & changeable) ;
279185
280- res . put_u8 ( b'D' ) ;
281- res . put_i32 ( data_row . len ( ) as i32 + 4 ) ;
282- res. put ( data_row) ;
186+ let row = vec ! [ key , value , "-" . to_string ( ) , changeable ] ;
187+
188+ res. put ( data_row ( & row ) ) ;
283189 }
284190
285- res. put_u8 ( b'C' ) ;
286- res. put_i32 ( "SHOW CONFIG\0 " . as_bytes ( ) . len ( ) as i32 + 4 ) ;
287- res. put_slice ( & "SHOW CONFIG\0 " . as_bytes ( ) ) ;
191+ res. put ( command_complete ( "SHOW" ) ) ;
288192
289193 res. put_u8 ( b'Z' ) ;
290194 res. put_i32 ( 5 ) ;
@@ -295,81 +199,38 @@ async fn show_config(stream: &mut OwnedWriteHalf) -> Result<(), Error> {
295199
296200/// SHOW STATS
297201async fn show_stats ( stream : & mut OwnedWriteHalf ) -> Result < ( ) , Error > {
298- let columns = [
299- "database" ,
300- "total_xact_count" ,
301- "total_query_count" ,
302- "total_received" ,
303- "total_sent" ,
304- "total_xact_time" ,
305- "total_query_time" ,
306- "total_wait_time" ,
307- "avg_xact_count" ,
308- "avg_query_count" ,
309- "avg_recv" ,
310- "avg_sent" ,
311- "avg_xact_time" ,
312- "avg_query_time" ,
313- "avg_wait_time" ,
202+ let columns = vec ! [
203+ ( "database" , DataType :: Text ) ,
204+ ( "total_xact_count" , DataType :: Numeric ) ,
205+ ( "total_query_count" , DataType :: Numeric ) ,
206+ ( "total_received" , DataType :: Numeric ) ,
207+ ( "total_sent" , DataType :: Numeric ) ,
208+ ( "total_xact_time" , DataType :: Numeric ) ,
209+ ( "total_query_time" , DataType :: Numeric ) ,
210+ ( "total_wait_time" , DataType :: Numeric ) ,
211+ ( "avg_xact_count" , DataType :: Numeric ) ,
212+ ( "avg_query_count" , DataType :: Numeric ) ,
213+ ( "avg_recv" , DataType :: Numeric ) ,
214+ ( "avg_sent" , DataType :: Numeric ) ,
215+ ( "avg_xact_time" , DataType :: Numeric ) ,
216+ ( "avg_query_time" , DataType :: Numeric ) ,
217+ ( "avg_wait_time" , DataType :: Numeric ) ,
314218 ] ;
315219
316220 let stats = get_stats ( ) ;
317221 let mut res = BytesMut :: new ( ) ;
318- let mut row_desc = BytesMut :: new ( ) ;
319- let mut data_row = BytesMut :: new ( ) ;
320-
321- // Number of columns: 1
322- row_desc. put_i16 ( columns. len ( ) as i16 ) ;
323- data_row. put_i16 ( columns. len ( ) as i16 ) ;
324-
325- for ( i, column) in columns. iter ( ) . enumerate ( ) {
326- // RowDescription
327-
328- // Column name
329- row_desc. put_slice ( & format ! ( "{}\0 " , column) . as_bytes ( ) ) ;
222+ res. put ( row_description ( & columns) ) ;
330223
331- // Doesn't belong to any table
332- row_desc. put_i32 ( 0 ) ;
333-
334- // Doesn't belong to any table
335- row_desc. put_i16 ( 0 ) ;
336-
337- // Data type
338- row_desc. put_i32 ( if i == 0 { OID_TEXT } else { OID_NUMERIC } ) ;
339-
340- // Numeric/text size = variable (-1)
341- row_desc. put_i16 ( -1 ) ;
342-
343- // Type modifier: none that I know
344- row_desc. put_i32 ( -1 ) ;
345-
346- // Format being used: text (0), binary (1)
347- row_desc. put_i16 ( 0 ) ;
348-
349- // DataRow
350- let value = if i == 0 {
351- String :: from ( "all shards" )
352- } else {
353- stats. get ( & column. to_string ( ) ) . unwrap_or ( & 0 ) . to_string ( )
354- } ;
224+ let mut row = vec ! [
225+ String :: from( "all shards" ) , // TODO: per-database stats,
226+ ] ;
355227
356- data_row . put_i32 ( value . len ( ) as i32 ) ;
357- data_row . put_slice ( value . as_bytes ( ) ) ;
228+ for column in & columns [ 1 .. ] {
229+ row . push ( stats . get ( column . 0 ) . unwrap_or ( & 0 ) . to_string ( ) ) ;
358230 }
359231
360- let command_complete = BytesMut :: from ( & "SHOW\0 " [ ..] ) ;
361-
362- res. put_u8 ( b'T' ) ;
363- res. put_i32 ( row_desc. len ( ) as i32 + 4 ) ;
364- res. put ( row_desc) ;
365-
366- res. put_u8 ( b'D' ) ;
367- res. put_i32 ( data_row. len ( ) as i32 + 4 ) ;
368- res. put ( data_row) ;
369-
370- res. put_u8 ( b'C' ) ;
371- res. put_i32 ( command_complete. len ( ) as i32 + 4 ) ;
372- res. put ( command_complete) ;
232+ res. put ( data_row ( & row) ) ;
233+ res. put ( command_complete ( "SHOW" ) ) ;
373234
374235 res. put_u8 ( b'Z' ) ;
375236 res. put_i32 ( 5 ) ;
0 commit comments