Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 24 additions & 35 deletions crates/hfs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,25 @@ use helios_audit::{
};
use helios_auth::{AuthConfig, InMemoryJtiCache, JtiCache, JwksBearerAuthProvider, JwksCache};
use helios_persistence::{BackendKind, ResourceStorage, TenantContext};
use helios_rest::{
AuthMiddlewareState, ServerConfig, StorageBackendMode, create_app_with_auth, init_logging,
};
use helios_rest::{AuthMiddlewareState, ServerConfig, StorageBackendMode, init_logging};
use tracing::info;

use helios_persistence::backends::local_fs::LocalFsOutputStore;
#[cfg(any(feature = "sqlite", feature = "postgres"))]
use helios_persistence::core::SettingsStore;
use helios_persistence::core::{
BulkExportJobStore, DefaultExportWorker, ExportOutputStore, WorkerId,
};
#[cfg(any(feature = "sqlite", feature = "postgres"))]
use helios_rest::bulk_export_auth::BearerScopeAuth;
// Settings-capable standalone backends (SQLite, PostgreSQL).
#[cfg(any(feature = "sqlite", feature = "postgres"))]
use helios_rest::create_app_with_auth_bulk_export_and_settings;
// Composite/secondary backends (MongoDB, Elasticsearch, S3) that do not host a
// settings store and so use the plain app builders.
#[cfg(any(feature = "mongodb", feature = "elasticsearch", feature = "s3"))]
use helios_rest::create_app_with_auth;
#[cfg(any(feature = "mongodb", feature = "elasticsearch"))]
use helios_rest::create_app_with_auth_and_bulk_export;

#[cfg(feature = "sqlite")]
Expand Down Expand Up @@ -841,26 +848,17 @@ async fn start_sqlite(
let serve_audit_state = audit_state.clone();
let backend = Arc::new(create_sqlite_backend(&config)?);

if let Some(bundle) = build_bulk_export(&config, backend.clone(), backend.clone()).await? {
let app = create_app_with_auth_and_bulk_export(
backend,
config.clone(),
auth_config,
auth_state,
audit_state,
bundle,
);
return serve(app, &config, serve_audit_state).await;
}

let app = create_app_with_auth(
Arc::try_unwrap(backend).unwrap_or_else(|_| {
unreachable!("backend Arc is uniquely owned when bulk export is disabled")
}),
// The SQLite backend also hosts the per-user settings store.
let settings_store: Option<Arc<dyn SettingsStore>> = Some(backend.clone());
let bundle = build_bulk_export(&config, backend.clone(), backend.clone()).await?;
let app = create_app_with_auth_bulk_export_and_settings(
backend,
config.clone(),
auth_config,
auth_state,
audit_state,
bundle,
settings_store,
);
serve(app, &config, serve_audit_state).await
}
Expand Down Expand Up @@ -1272,26 +1270,17 @@ async fn start_postgres(
let backend = Arc::new(backend);

let serve_audit_state = audit_state.clone();
if let Some(bundle) = build_bulk_export(&config, backend.clone(), backend.clone()).await? {
let app = create_app_with_auth_and_bulk_export(
backend,
config.clone(),
auth_config,
auth_state,
audit_state,
bundle,
);
return serve(app, &config, serve_audit_state).await;
}

let app = create_app_with_auth(
Arc::try_unwrap(backend).unwrap_or_else(|_| {
unreachable!("backend Arc is uniquely owned when bulk export is disabled")
}),
// The PostgreSQL backend also hosts the per-user settings store.
let settings_store: Option<Arc<dyn SettingsStore>> = Some(backend.clone());
let bundle = build_bulk_export(&config, backend.clone(), backend.clone()).await?;
let app = create_app_with_auth_bulk_export_and_settings(
backend,
config.clone(),
auth_config,
auth_state,
audit_state,
bundle,
settings_store,
);
serve(app, &config, serve_audit_state).await
}
Expand Down
13 changes: 10 additions & 3 deletions crates/hts/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ It can also be used standalone as a general-purpose FHIR terminology service, in

An open test server will soon be available at https://hts.heliossoftware.com/ for experimentation and evaluation.

HTS currently uses SQLite as its database backend. PostgreSQL support is planned for a future release - see [Storage Backends](#storage-backends) for details.
HTS supports both SQLite (the zero-config default) and PostgreSQL database backends, with full feature parity between them - see [Storage Backends](#storage-backends) for details.

### Terminology Data

Expand Down Expand Up @@ -402,10 +402,17 @@ The `value_set_expansions` table acts as a write-through cache: the first `$expa

### PostgreSQL

PostgreSQL backend support is planned for a future release. The schema, query patterns, and persistence trait surface have been designed with multi-backend portability in mind, and the integration is being staged behind feature work tracked separately. Until it lands, all production deployments should use the SQLite backend documented above.
HTS fully supports PostgreSQL as an alternative backend, with feature parity with SQLite across all terminology operations, CRUD, search, and import formats. The schema mirrors the SQLite layout and is applied automatically on first connection, so no manual migration step is required.

Build with the `postgres` feature, then select the backend with `--storage-backend postgres` (or `HTS_STORAGE_BACKEND=postgres`) and a `postgresql://` connection string:

```bash
# Coming soon
# Build with PostgreSQL support
cargo build --release -p helios-hts --features postgres

# Run against PostgreSQL
hts run --storage-backend postgres \
--database-url "postgresql://user:pass@localhost/hts"
```

## API Endpoints
Expand Down
1 change: 1 addition & 0 deletions crates/persistence/src/backends/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,6 @@ pub mod search;
mod search_impl;
mod storage;
mod transaction;
mod user_settings;

pub use backend::{PostgresBackend, PostgresConfig};
25 changes: 24 additions & 1 deletion crates/persistence/src/backends/postgres/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::error::{BackendError, StorageResult};

/// Current schema version.
pub const SCHEMA_VERSION: i32 = 10;
pub const SCHEMA_VERSION: i32 = 11;

/// Initialize the database schema.
pub async fn initialize_schema(client: &deadpool_postgres::Client) -> StorageResult<()> {
Expand Down Expand Up @@ -273,6 +273,7 @@ async fn migrate_schema(
7 => migrate_v7_to_v8(client).await?,
8 => migrate_v8_to_v9(client).await?,
9 => migrate_v9_to_v10(client).await?,
10 => migrate_v10_to_v11(client).await?,
_ => {
return Err(pg_error(format!("Unknown schema version: {}", version)));
}
Expand Down Expand Up @@ -676,6 +677,28 @@ async fn migrate_v9_to_v10(client: &deadpool_postgres::Client) -> StorageResult<
Ok(())
}

/// v10 -> v11: Add the `user_settings` table backing the per-user UI settings
/// store (theme, default tenant, active FHIR version, recent queries, …).
///
/// One opaque JSONB document is stored per user, keyed by `user_key`, with a
/// monotonic `version` for optimistic locking. This table is independent of the
/// FHIR `resources` table so UI preferences never leak into FHIR machinery.
async fn migrate_v10_to_v11(client: &deadpool_postgres::Client) -> StorageResult<()> {
client
.execute(
"CREATE TABLE IF NOT EXISTS user_settings (
user_key TEXT PRIMARY KEY,
data JSONB NOT NULL,
version BIGINT NOT NULL DEFAULT 1,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)",
&[],
)
.await
.map_err(|e| pg_error(format!("Migration v10->v11 failed: {}", e)))?;
Ok(())
}

fn pg_error(message: String) -> crate::error::StorageError {
crate::error::StorageError::Backend(BackendError::Internal {
backend_name: "postgres".to_string(),
Expand Down
156 changes: 156 additions & 0 deletions crates/persistence/src/backends/postgres/user_settings.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
//! PostgreSQL implementation of the per-user [`SettingsStore`].
//!
//! Each user owns a single row in the `user_settings` table holding an opaque
//! JSONB document plus a monotonic `version` used for optimistic locking. Writes
//! run a `SELECT … FOR UPDATE` read-modify-write inside a transaction so
//! concurrent updates to the same user serialize correctly and the `If-Match`
//! precondition is checked against the live row.

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde_json::Value;

use crate::core::user_settings::{SettingsStore, StoredUserSettings, apply_merge_patch};
use crate::error::{BackendError, ConcurrencyError, StorageError, StorageResult};

use super::PostgresBackend;

impl PostgresBackend {
/// Read-modify-write a user's settings document inside a single transaction,
/// locking the row with `SELECT … FOR UPDATE`.
///
/// `compute` receives the currently stored document (or `None` when the user
/// has no settings yet) and returns the document to persist. The optimistic
/// `if_match_version` precondition — where `Some(0)` asserts "does not yet
/// exist" — is checked against the locked row before `compute` runs.
async fn write_settings(
&self,
user_key: &str,
if_match_version: Option<i64>,
compute: impl FnOnce(Option<Value>) -> Value + Send,
) -> StorageResult<StoredUserSettings> {
let mut client = self.get_client().await?;
let txn = client
.transaction()
.await
.map_err(|e| backend_err(format!("begin user_settings transaction: {e}")))?;

let current = txn
.query_opt(
"SELECT version, data FROM user_settings WHERE user_key = $1 FOR UPDATE",
&[&user_key],
)
.await
.map_err(|e| backend_err(format!("read user_settings: {e}")))?;

let (current_version, current_doc) = match &current {
Some(row) => {
let version: i64 = row.get(0);
let doc: Value = row.get(1);
(version, Some(doc))
}
None => (0, None),
};

if let Some(expected) = if_match_version
&& expected != current_version
{
return Err(lock_failure(user_key, expected, current_version));
}

let new_doc = compute(current_doc);
let new_version = current_version + 1;
let now = Utc::now();

txn.execute(
"INSERT INTO user_settings (user_key, data, version, updated_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (user_key)
DO UPDATE SET data = $2, version = $3, updated_at = $4",
&[&user_key, &new_doc, &new_version, &now],
)
.await
.map_err(|e| backend_err(format!("write user_settings: {e}")))?;

txn.commit()
.await
.map_err(|e| backend_err(format!("commit user_settings: {e}")))?;

Ok(StoredUserSettings {
user_key: user_key.to_string(),
document: new_doc,
version: new_version,
updated_at: now,
})
}
}

#[async_trait]
impl SettingsStore for PostgresBackend {
async fn get_settings(&self, user_key: &str) -> StorageResult<Option<StoredUserSettings>> {
let client = self.get_client().await?;
let row = client
.query_opt(
"SELECT data, version, updated_at FROM user_settings WHERE user_key = $1",
&[&user_key],
)
.await
.map_err(|e| backend_err(format!("read user_settings: {e}")))?;

Ok(row.map(|row| {
let document: Value = row.get(0);
let version: i64 = row.get(1);
let updated_at: DateTime<Utc> = row.get(2);
StoredUserSettings {
user_key: user_key.to_string(),
document,
version,
updated_at,
}
}))
}

async fn put_settings(
&self,
user_key: &str,
document: Value,
if_match_version: Option<i64>,
) -> StorageResult<StoredUserSettings> {
self.write_settings(user_key, if_match_version, move |_current| document)
.await
}

async fn patch_settings(
&self,
user_key: &str,
merge_patch: Value,
if_match_version: Option<i64>,
) -> StorageResult<StoredUserSettings> {
self.write_settings(user_key, if_match_version, move |current| {
apply_merge_patch(
current.unwrap_or_else(|| Value::Object(Default::default())),
&merge_patch,
)
})
.await
}
}

/// Builds an `OptimisticLockFailure` for a `user_settings` write whose
/// `If-Match` precondition did not match the live version.
fn lock_failure(user_key: &str, expected: i64, actual: i64) -> StorageError {
StorageError::Concurrency(ConcurrencyError::OptimisticLockFailure {
resource_type: "UserSettings".to_string(),
id: user_key.to_string(),
expected_etag: format!("W/\"{expected}\""),
actual_etag: Some(format!("W/\"{actual}\"")),
})
}

fn backend_err(message: String) -> StorageError {
StorageError::Backend(BackendError::Internal {
backend_name: "postgres".to_string(),
message,
source: None,
})
}
1 change: 1 addition & 0 deletions crates/persistence/src/backends/sqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,6 @@ pub mod search;
mod search_impl;
mod storage;
mod transaction;
mod user_settings;

pub use backend::{SqliteBackend, SqliteBackendConfig};
24 changes: 23 additions & 1 deletion crates/persistence/src/backends/sqlite/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use rusqlite::Connection;
use crate::error::StorageResult;

/// Current schema version.
pub const SCHEMA_VERSION: i32 = 10;
pub const SCHEMA_VERSION: i32 = 11;

/// Initialize the database schema.
pub fn initialize_schema(conn: &Connection) -> StorageResult<()> {
Expand Down Expand Up @@ -267,6 +267,7 @@ fn migrate_schema(conn: &Connection, from_version: i32) -> StorageResult<()> {
7 => migrate_v7_to_v8(conn)?,
8 => migrate_v8_to_v9(conn)?,
9 => migrate_v9_to_v10(conn)?,
10 => migrate_v10_to_v11(conn)?,
_ => {
return Err(crate::error::StorageError::Backend(
crate::error::BackendError::Internal {
Expand Down Expand Up @@ -1005,6 +1006,27 @@ fn migrate_v9_to_v10(conn: &Connection) -> StorageResult<()> {
Ok(())
}

/// Migrate from schema version 10 to version 11.
///
/// Adds the `user_settings` table that backs the per-user UI settings store
/// (theme, default tenant, active FHIR version, recent queries, …). One opaque
/// JSON document is stored per user, keyed by `user_key`, with a monotonic
/// `version` for optimistic locking. This table is intentionally independent of
/// the FHIR `resources` table so UI preferences never leak into FHIR machinery.
fn migrate_v10_to_v11(conn: &Connection) -> StorageResult<()> {
conn.execute(
"CREATE TABLE IF NOT EXISTS user_settings (
user_key TEXT NOT NULL PRIMARY KEY,
data BLOB NOT NULL,
version INTEGER NOT NULL DEFAULT 1,
updated_at TEXT NOT NULL
)",
[],
)
.map_err(|e| migration_err(format!("create user_settings table: {e}")))?;
Ok(())
}

fn migration_err(message: String) -> crate::error::StorageError {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
Expand Down
Loading