Skip to content

Commit 457cb28

Browse files
feat: add Amazon Redshift database driver
- Implement Redshift connector with SSL support - Add schema, table, and column loading for Redshift - Use information_schema for better permission compatibility - Support Redshift-specific SQL queries - Register Redshift commands in Tauri handler
1 parent f187189 commit 457cb28

File tree

3 files changed

+276
-0
lines changed

3 files changed

+276
-0
lines changed

src-tauri/src/drivers/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
pub mod pgsql;
2+
pub mod redshift;

src-tauri/src/drivers/redshift.rs

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
use std::{sync::Arc, time::Instant};
2+
3+
use crate::common::{
4+
enums::{PostgresqlError, ProjectConnectionStatus},
5+
pgsql::{PgsqlLoadColumns, PgsqlLoadSchemas, PgsqlLoadTables},
6+
};
7+
use tauri::{AppHandle, Manager, Result, State};
8+
use tokio::{sync::Mutex, time as tokio_time};
9+
use tokio_postgres::Config;
10+
use postgres_native_tls::MakeTlsConnector;
11+
use native_tls::TlsConnector;
12+
13+
use crate::{utils::reflective_get, AppState};
14+
15+
#[tauri::command(rename_all = "snake_case")]
16+
pub async fn redshift_connector(
17+
project_id: &str,
18+
key: Option<[&str; 6]>,
19+
app: AppHandle,
20+
) -> Result<ProjectConnectionStatus> {
21+
let app_state = app.state::<AppState>();
22+
let mut clients = app_state.client.lock().await;
23+
tracing::info!("Redshift connection attempt: {:?}", key);
24+
25+
// check if connection already exists
26+
if clients.as_ref().unwrap().contains_key(project_id) {
27+
tracing::info!("Redshift connection already exists!");
28+
return Ok(ProjectConnectionStatus::Connected);
29+
}
30+
31+
let (user, password, database, host, port_str, use_ssl) = match key {
32+
Some(key) => (
33+
key[0].to_string(),
34+
key[1].to_string(),
35+
key[2].to_string(),
36+
key[3].to_string(),
37+
key[4].to_string(),
38+
key[5] == "true",
39+
),
40+
None => {
41+
let projects_db = app_state.project_db.lock().await;
42+
let projects_db = projects_db.as_ref().unwrap();
43+
let project_details = projects_db.get(project_id).unwrap();
44+
let project_details = match project_details {
45+
Some(bytes) => bincode::deserialize::<Vec<String>>(&bytes).unwrap(),
46+
_ => Vec::new(),
47+
};
48+
(
49+
project_details[1].clone(),
50+
project_details[2].clone(),
51+
project_details[3].clone(),
52+
project_details[4].clone(),
53+
project_details[5].clone(),
54+
project_details.get(6).map(|s| s == "true").unwrap_or(false),
55+
)
56+
}
57+
};
58+
59+
let port: u16 = port_str.parse::<u16>().unwrap_or(5439); // Redshift default port
60+
let mut cfg = Config::new();
61+
cfg.user(&user);
62+
cfg.password(password);
63+
cfg.dbname(&database);
64+
cfg.host(&host);
65+
cfg.port(port);
66+
67+
tracing::info!("Redshift SSL enabled: {}", use_ssl);
68+
69+
// Redshift always uses SSL in production, but allow NoTls for testing
70+
let tls_connector = TlsConnector::builder()
71+
.danger_accept_invalid_certs(true) // Accept self-signed certs
72+
.build()
73+
.unwrap();
74+
let tls = MakeTlsConnector::new(tls_connector);
75+
76+
let connection = tokio_time::timeout(tokio_time::Duration::from_secs(10), cfg.connect(tls))
77+
.await
78+
.map_err(|_| PostgresqlError::ConnectionTimeout);
79+
80+
if let Err(e) = connection {
81+
tracing::error!("Redshift connection timeout error: {:?}", e);
82+
return Ok(ProjectConnectionStatus::Failed);
83+
}
84+
85+
let connection = connection.unwrap();
86+
if let Err(e) = connection {
87+
tracing::error!("Redshift connection error: {:?}", e);
88+
return Ok(ProjectConnectionStatus::Failed);
89+
}
90+
91+
let is_connection_error = Arc::new(Mutex::new(false));
92+
let (client, connection) = connection.unwrap();
93+
tracing::info!("Redshift connection established!");
94+
95+
// check if connection has some error
96+
tokio::spawn({
97+
let is_connection_error = Arc::clone(&is_connection_error);
98+
async move {
99+
if let Err(e) = connection.await {
100+
tracing::info!("Redshift connection error: {:?}", e);
101+
*is_connection_error.lock().await = true;
102+
}
103+
}
104+
});
105+
106+
if *is_connection_error.lock().await {
107+
tracing::error!("Redshift connection error!");
108+
return Ok(ProjectConnectionStatus::Failed);
109+
}
110+
111+
let clients = clients.as_mut().unwrap();
112+
clients.insert(project_id.to_string(), client);
113+
114+
Ok(ProjectConnectionStatus::Connected)
115+
}
116+
117+
#[tauri::command(rename_all = "snake_case")]
118+
pub async fn redshift_load_schemas(
119+
project_id: &str,
120+
app_state: State<'_, AppState>,
121+
) -> Result<PgsqlLoadSchemas> {
122+
let clients = app_state.client.lock().await;
123+
let client = clients.as_ref().unwrap().get(project_id).unwrap();
124+
125+
// Redshift uses similar schema query but with some differences
126+
let query = tokio_time::timeout(
127+
tokio_time::Duration::from_secs(10),
128+
client.query(
129+
r#"
130+
SELECT schema_name
131+
FROM information_schema.schemata
132+
WHERE schema_name NOT IN ('pg_catalog', 'information_schema', 'pg_internal')
133+
ORDER BY schema_name;
134+
"#,
135+
&[],
136+
),
137+
)
138+
.await
139+
.map_err(|_| PostgresqlError::QueryTimeout);
140+
141+
if query.is_err() {
142+
tracing::error!("Redshift schema query timeout error!");
143+
return Err(tauri::Error::Io(std::io::Error::new(
144+
std::io::ErrorKind::Other,
145+
PostgresqlError::QueryTimeout,
146+
)));
147+
}
148+
149+
let query = query.unwrap();
150+
if query.is_err() {
151+
tracing::error!("Redshift schema query error!");
152+
return Err(tauri::Error::Io(std::io::Error::new(
153+
std::io::ErrorKind::Other,
154+
PostgresqlError::QueryError,
155+
)));
156+
}
157+
158+
let qeury = query.unwrap();
159+
let schemas = qeury.iter().map(|r| r.get(0)).collect::<Vec<String>>();
160+
tracing::info!("Redshift schemas: {:?}", schemas);
161+
Ok(schemas)
162+
}
163+
164+
#[tauri::command(rename_all = "snake_case")]
165+
pub async fn redshift_load_tables(
166+
project_id: &str,
167+
schema: &str,
168+
app_state: State<'_, AppState>,
169+
) -> Result<PgsqlLoadTables> {
170+
let clients = app_state.client.lock().await;
171+
let client = clients.as_ref().unwrap().get(project_id).unwrap();
172+
173+
// Use information_schema which is more universally accessible
174+
let query = client
175+
.query(
176+
r#"--sql
177+
SELECT
178+
table_name,
179+
'-' AS size
180+
FROM information_schema.tables
181+
WHERE table_schema = $1
182+
AND table_type = 'BASE TABLE'
183+
ORDER BY table_name;
184+
"#,
185+
&[&schema],
186+
)
187+
.await;
188+
189+
190+
if let Err(e) = query {
191+
tracing::error!("Redshift load tables error: {:?}", e);
192+
return Err(tauri::Error::Io(std::io::Error::new(
193+
std::io::ErrorKind::Other,
194+
format!("Failed to load tables: {:?}", e),
195+
)));
196+
}
197+
198+
let rows = query.unwrap();
199+
let tables = rows
200+
.iter()
201+
.map(|r| (r.get(0), r.get(1)))
202+
.collect::<Vec<(String, String)>>();
203+
Ok(tables)
204+
}
205+
206+
#[tauri::command(rename_all = "snake_case")]
207+
pub async fn redshift_load_columns(
208+
project_id: &str,
209+
schema: &str,
210+
table: &str,
211+
app_state: State<'_, AppState>,
212+
) -> Result<PgsqlLoadColumns> {
213+
let clients = app_state.client.lock().await;
214+
let client = clients.as_ref().unwrap().get(project_id).unwrap();
215+
let rows = client
216+
.query(
217+
r#"--sql
218+
SELECT column_name
219+
FROM information_schema.columns
220+
WHERE table_schema = $1 AND table_name = $2
221+
ORDER BY ordinal_position;
222+
"#,
223+
&[&schema, &table],
224+
)
225+
.await
226+
.unwrap();
227+
let cols = rows
228+
.iter()
229+
.map(|r| r.get::<_, String>(0))
230+
.collect::<Vec<String>>();
231+
Ok(cols)
232+
}
233+
234+
#[tauri::command(rename_all = "snake_case")]
235+
pub async fn redshift_run_query(
236+
project_id: &str,
237+
sql: &str,
238+
app_state: State<'_, AppState>,
239+
) -> Result<(Vec<String>, Vec<Vec<String>>, f32)> {
240+
let start = Instant::now();
241+
let clients = app_state.client.lock().await;
242+
let client = clients.as_ref().unwrap().get(project_id).unwrap();
243+
let rows = client.query(sql, &[]).await.unwrap();
244+
245+
if rows.is_empty() {
246+
return Ok((Vec::new(), Vec::new(), 0.0f32));
247+
}
248+
249+
let columns = rows
250+
.first()
251+
.unwrap()
252+
.columns()
253+
.iter()
254+
.map(|c| c.name().to_string())
255+
.collect::<Vec<String>>();
256+
let rows = rows
257+
.iter()
258+
.map(|row| {
259+
let mut row_values = Vec::new();
260+
for i in 0..row.len() {
261+
let value = reflective_get(row, i);
262+
row_values.push(value);
263+
}
264+
row_values
265+
})
266+
.collect::<Vec<Vec<String>>>();
267+
let elasped = start.elapsed().as_millis() as f32;
268+
Ok((columns, rows, elasped))
269+
}
270+

src-tauri/src/main.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,11 @@ fn main() {
8080
drivers::pgsql::pgsql_load_tables,
8181
drivers::pgsql::pgsql_load_columns,
8282
drivers::pgsql::pgsql_run_query,
83+
drivers::redshift::redshift_connector,
84+
drivers::redshift::redshift_load_schemas,
85+
drivers::redshift::redshift_load_tables,
86+
drivers::redshift::redshift_load_columns,
87+
drivers::redshift::redshift_run_query,
8388
])
8489
.run(tauri::generate_context!())
8590
.expect("error while running tauri application");

0 commit comments

Comments
 (0)