Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,422 changes: 510 additions & 912 deletions Cargo.lock

Large diffs are not rendered by default.

33 changes: 13 additions & 20 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,13 @@ panic = "abort"
codegen-units = 2

[dependencies]
sqlx = { package = "sqlx-oldapi", version = "0.6.56", default-features = false, features = [
"any",
"runtime-tokio-rustls",
"migrate",
"sqlite",
"postgres",
"mysql",
"mssql",
"odbc",
"chrono",
"bigdecimal",
"json",
"uuid",
] }
tokio-postgres = { version = "0.7.17", features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] }
mysql_async = { version = "0.37.0", default-features = false, features = ["default-rustls-ring", "chrono", "bigdecimal"] }
tiberius = { version = "0.12.3", default-features = false, features = ["tds73", "tokio", "tokio-util", "rustls", "chrono", "bigdecimal"] }
tokio-rusqlite = { version = "0.7.0", features = ["bundled", "functions", "load_extension", "collation", "chrono", "blob"] }
odbc-api = { version = "28.0.0", default-features = false, features = ["odbc_version_3_80"] }
deadpool = { version = "0.13.0", features = ["rt_tokio_1"] }
uuid = "1"
chrono = "0.4.23"
actix-web = { version = "4", features = ["rustls-0_23", "cookies"] }
percent-encoding = "2.2.0"
Expand Down Expand Up @@ -65,19 +58,19 @@ actix-web-httpauth = "0.8.0"
rand = "0.10.0"
actix-multipart = "0.7.2"
base64 = "0.22"
bytes = "1"
hmac = "0.13"
sha2 = "0.11"
rustls-acme = "0.15"
rustls-acme = { version = "0.15", default-features = false, features = ["ring", "tls12", "webpki-roots"] }
dotenvy = "0.15.7"
csv-async = { version = "1.2.6", features = ["tokio"] }
rustls = { version = "0.23" } # keep in sync with actix-web, awc, rustls-acme, and sqlx
rustls = { version = "0.23", default-features = false, features = ["ring", "std", "tls12", "logging"] } # keep in sync with actix-web, awc, and rustls-acme
rustls-native-certs = "0.8.1"
awc = { version = "3", features = ["rustls-0_23-webpki-roots"] }
clap = { version = "4.5.17", features = ["derive"] }
tokio-util = "0.7.12"
tokio-util = { version = "0.7.12", features = ["compat"] }
openidconnect = { version = "4.0.0", default-features = false, features = ["accept-rfc3339-timestamps"] }
encoding_rs = "0.8.35"
odbc-sys = { version = "0", optional = true }
regex = "1"

# OpenTelemetry / tracing
Expand All @@ -95,7 +88,7 @@ opentelemetry-semantic-conventions = { version = "0.32", features = ["semconv_ex

[features]
default = []
odbc-static = ["odbc-sys", "odbc-sys/vendored-unix-odbc"]
odbc-static = ["odbc-api/vendored-unix-odbc"]
lambda-web = ["dep:lambda-web", "odbc-static"]

[dev-dependencies]
Expand All @@ -104,7 +97,7 @@ tokio = { version = "1", features = ["rt", "time", "test-util"] }

[build-dependencies]
awc = { version = "3", features = ["rustls-0_23-webpki-roots"] }
rustls = "0.23"
rustls = { version = "0.23", default-features = false, features = ["ring", "std", "tls12", "logging"] }
actix-rt = "2.8"
libflate = "2"
futures-util = "0.3.21"
2 changes: 1 addition & 1 deletion build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::time::Duration;

#[actix_rt::main]
async fn main() {
rustls::crypto::aws_lc_rs::default_provider()
rustls::crypto::ring::default_provider()
.install_default()
.unwrap();

Expand Down
126 changes: 63 additions & 63 deletions src/filesystem.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use crate::webserver::ErrorWithStatus;
use crate::webserver::database::SupportedDatabase;
use crate::webserver::database::{DbParam, driver::DbValue};
use crate::webserver::{Database, StatusCodeResultExt, make_placeholder};
use crate::{AppState, TEMPLATES_DIR};
use anyhow::Context;
use chrono::{DateTime, Utc};
use sqlx::any::{AnyStatement, AnyTypeInfo};
use sqlx::postgres::types::PgTimeTz;
use sqlx::{Executor, Postgres, Statement, Type};
use std::fmt::Write;
use std::io::ErrorKind;
use std::path::{Component, Path, PathBuf};
Expand Down Expand Up @@ -241,9 +239,9 @@ async fn file_modified_since_local(path: &Path, since: DateTime<Utc>) -> tokio::
}

pub struct DbFsQueries {
was_modified: AnyStatement<'static>,
read_file: AnyStatement<'static>,
exists: AnyStatement<'static>,
was_modified: String,
read_file: String,
exists: String,
}

impl DbFsQueries {
Expand All @@ -269,52 +267,48 @@ impl DbFsQueries {
log::debug!("Initializing database filesystem queries");
Self::check_table_available(db).await?;
Ok(Self {
was_modified: Self::make_was_modified_query(db).await?,
read_file: Self::make_read_file_query(db).await?,
exists: Self::make_exists_query(db).await?,
was_modified: Self::make_was_modified_query(db),
read_file: Self::make_read_file_query(db),
exists: Self::make_exists_query(db),
})
}

async fn check_table_available(db: &Database) -> anyhow::Result<()> {
db.connection
.execute("SELECT 1 FROM sqlpage_files WHERE 1 = 0")
.acquire()
.await
.context("Unable to acquire database connection")?
.execute_command("SELECT 1 FROM sqlpage_files WHERE 1 = 0", &[])
.await
.map(|_| ())
.context("Unable to access sqlpage_files")?;
Ok(())
}

async fn make_was_modified_query(db: &Database) -> anyhow::Result<AnyStatement<'static>> {
fn make_was_modified_query(db: &Database) -> String {
let was_modified_query = format!(
"SELECT 1 from sqlpage_files WHERE last_modified >= {} AND path = {}",
make_placeholder(db.info.kind, 1),
make_placeholder(db.info.kind, 2)
);
let param_types: &[AnyTypeInfo; 2] = &[
PgTimeTz::type_info().into(),
<str as Type<Postgres>>::type_info().into(),
];
log::debug!("Preparing the database filesystem was_modified_query: {was_modified_query}");
db.prepare_with(&was_modified_query, param_types).await
was_modified_query
}

async fn make_read_file_query(db: &Database) -> anyhow::Result<AnyStatement<'static>> {
fn make_read_file_query(db: &Database) -> String {
let read_file_query = format!(
"SELECT contents from sqlpage_files WHERE path = {}",
make_placeholder(db.info.kind, 1),
);
let param_types: &[AnyTypeInfo; 1] = &[<str as Type<Postgres>>::type_info().into()];
log::debug!("Preparing the database filesystem read_file_query: {read_file_query}");
db.prepare_with(&read_file_query, param_types).await
read_file_query
}

async fn make_exists_query(db: &Database) -> anyhow::Result<AnyStatement<'static>> {
fn make_exists_query(db: &Database) -> String {
let exists_query = format!(
"SELECT 1 from sqlpage_files WHERE path = {}",
make_placeholder(db.info.kind, 1),
);
let param_types: &[AnyTypeInfo; 1] = &[<str as Type<Postgres>>::type_info().into()];
db.prepare_with(&exists_query, param_types).await
exists_query
}

async fn file_modified_since_in_db(
Expand All @@ -323,22 +317,22 @@ impl DbFsQueries {
path: &Path,
since: DateTime<Utc>,
) -> anyhow::Result<bool> {
let query = self
.was_modified
.query_as::<(i32,)>()
.bind(since)
.bind(path.display().to_string());
let params = [
DbParam::Timestamp(since),
DbParam::Text(path.display().to_string()),
];
log::trace!(
"Checking if file {} was modified since {} by executing query: \n\
{}\n\
with parameters: {:?}",
path.display(),
since,
self.was_modified.sql(),
self.was_modified,
(since, path)
);
let was_modified_i32 = query
.fetch_optional(&app_state.db.connection)
let mut conn = app_state.db.connection.acquire().await?;
let was_modified_i32 = conn
.fetch_optional(&self.was_modified, &params)
.await
.with_context(|| {
format!(
Expand All @@ -350,44 +344,47 @@ impl DbFsQueries {
"DB File {} was modified result: {was_modified_i32:?}",
path.display()
);
Ok(was_modified_i32 == Some((1,)))
Ok(was_modified_i32.is_some())
}

async fn read_file(&self, app_state: &AppState, path: &Path) -> anyhow::Result<Vec<u8>> {
log::debug!("Reading file {} from the database", path.display());
self.read_file
.query_as::<(Vec<u8>,)>()
.bind(path.display().to_string())
.fetch_optional(&app_state.db.connection)
.await
.map_err(anyhow::Error::from)
.and_then(|modified| {
if let Some((modified,)) = modified {
Ok(modified)
} else {
Err(ErrorWithStatus {
status: actix_web::http::StatusCode::NOT_FOUND,
}
.into())
let mut conn = app_state.db.connection.acquire().await?;
conn.fetch_optional(
&self.read_file,
&[DbParam::Text(path.display().to_string())],
)
.await
.map_err(anyhow::Error::from)
.and_then(|row| {
if let Some(row) = row {
match row.values.first() {
Some(DbValue::Bytes(bytes)) => Ok(bytes.clone()),
Some(DbValue::Text(text)) => Ok(text.as_bytes().to_vec()),
_ => Ok(Vec::new()),
}
} else {
Err(ErrorWithStatus {
status: actix_web::http::StatusCode::NOT_FOUND,
}
})
.with_context(|| format!("Unable to read {} from the database", path.display()))
.into())
}
})
.with_context(|| format!("Unable to read {} from the database", path.display()))
}

async fn file_exists(&self, app_state: &AppState, path: &Path) -> anyhow::Result<bool> {
let query = self
.exists
.query_as::<(i32,)>()
.bind(path.display().to_string());
let params = [DbParam::Text(path.display().to_string())];
log::trace!(
"Checking if file {} exists by executing query: \n\
{}\n\
with parameters: {:?}",
path.display(),
self.exists.sql(),
self.exists,
(path,)
);
let result = query.fetch_optional(&app_state.db.connection).await;
let mut conn = app_state.db.connection.acquire().await?;
let result = conn.fetch_optional(&self.exists, &params).await;
log::debug!("DB File exists result: {result:?}");
result.map(|result| result.is_some()).with_context(|| {
format!(
Expand All @@ -401,7 +398,6 @@ impl DbFsQueries {
#[actix_web::test]
async fn test_sql_file_read_utf8() -> anyhow::Result<()> {
use crate::app_config;
use sqlx::Executor;
let config = app_config::tests::test_config();
let state = AppState::init(&config).await?;

Expand All @@ -416,22 +412,26 @@ async fn test_sql_file_read_utf8() -> anyhow::Result<()> {

let create_table_sql = DbFsQueries::get_create_table_sql(state.db.info.database_type);
let db = &state.db;
let conn = &db.connection;
conn.execute("DROP TABLE IF EXISTS sqlpage_files").await?;
let mut conn = db.connection.acquire().await?;
conn.execute_command("DROP TABLE IF EXISTS sqlpage_files", &[])
.await?;
log::debug!("Creating table sqlpage_files: {create_table_sql}");
conn.execute(create_table_sql).await?;
conn.execute_command(create_table_sql, &[]).await?;

let dbms = db.info.kind;
let insert_sql = format!(
"INSERT INTO sqlpage_files(path, contents) VALUES ({}, {})",
make_placeholder(dbms, 1),
make_placeholder(dbms, 2)
);
sqlx::query(&insert_sql)
.bind("unit test file.txt")
.bind("Héllö world! 😀".as_bytes())
.execute(conn)
.await?;
conn.execute_command(
&insert_sql,
&[
DbParam::Text("unit test file.txt".into()),
DbParam::Bytes("Héllö world! 😀".as_bytes().to_vec()),
],
)
.await?;

let fs = FileSystem::init("/", db).await;
let actual = fs
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
//! When processing a request, `SQLPage`:
//!
//! 1. Parses the SQL using sqlparser-rs. Once a SQL file is parsed, it is cached for later reuse.
//! 2. Executes queries through sqlx.
//! 2. Executes queries through native database drivers.
//! 3. Finds the requested component's handlebars template in the database or in the filesystem.
//! 4. Maps results to the component template, using handlebars-rs.
//! 5. Streams rendered HTML to the client.
Expand Down
9 changes: 4 additions & 5 deletions src/telemetry_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::webserver::database::DbPool;
use opentelemetry::global;
use opentelemetry::metrics::{Histogram, ObservableGauge};
use opentelemetry_semantic_conventions::attribute as otel;
use opentelemetry_semantic_conventions::metric as otel_metric;
use sqlx::AnyPool;

pub struct TelemetryMetrics {
pub http_request_duration: Histogram<f64>,
Expand Down Expand Up @@ -41,7 +41,7 @@ impl Default for TelemetryMetrics {

impl TelemetryMetrics {
#[must_use]
pub fn new(pool: &AnyPool, db_system_name: &'static str) -> Self {
pub fn new(pool: &DbPool, db_system_name: &'static str) -> Self {
let meter = global::meter("sqlpage");
let http_request_duration = meter
.f64_histogram(otel_metric::HTTP_SERVER_REQUEST_DURATION)
Expand All @@ -60,9 +60,8 @@ impl TelemetryMetrics {
.with_description("Number of connections in the database pool.")
.with_callback(move |observer| {
let size = pool_ref.size();
let idle_u32 = u32::try_from(pool_ref.num_idle()).unwrap_or(u32::MAX);
let used = i64::from(size.saturating_sub(idle_u32));
let idle = i64::from(idle_u32);
let idle = i64::from(pool_ref.num_idle());
let used = i64::from(size);
observer.observe(
used,
&[
Expand Down
Loading
Loading