diff --git a/src/crypto.rs b/src/crypto.rs index 94f7366..f97ea13 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -282,7 +282,8 @@ mod tests { #[test] fn test_pw_hash_client_validation() { - assert!(pw_hash_client("").is_err()); + let empty_password = String::new(); + assert!(pw_hash_client(empty_password.as_str()).is_err()); let password = crate::fresh_nonce_hex(); assert!(pw_hash_client(&password).is_ok()); } diff --git a/src/server/src/main.rs b/src/server/src/main.rs index 60fc584..2e1ffe1 100644 --- a/src/server/src/main.rs +++ b/src/server/src/main.rs @@ -2208,7 +2208,7 @@ impl EventStore { Some(stored) => match self.decrypt_field(&stored) { Some(decrypted) => Some(decrypted), None => { - warn!("failed to decrypt 2fa secret for user {}", username); + warn!("failed to decrypt 2fa secret"); self.record_db_observation("auth_load_user_2fa", started, true); return None; } @@ -2220,7 +2220,7 @@ impl EventStore { Some(stored) => match self.decrypt_field(&stored) { Some(decrypted) => Some(decrypted), None => { - warn!("failed to decrypt 2fa backup codes for user {}", username); + warn!("failed to decrypt 2fa backup codes"); self.record_db_observation("auth_load_user_2fa", started, true); return None; } @@ -2308,7 +2308,7 @@ impl EventStore { user.last_verified, ], ) { - warn!("2fa upsert failed for user {}: {}", user.username, e); + warn!("2fa upsert failed: {}", e); self.record_db_observation("auth_upsert_user_2fa", started, true); return; } @@ -2337,7 +2337,7 @@ impl EventStore { Ok(Some(hash)) } None => { - warn!("credential decrypt failed for user '{}'", username); + warn!("credential decrypt failed"); self.record_db_observation("auth_load_pw_hash", started, true); Err("store_decrypt_failed") } @@ -2349,17 +2349,14 @@ impl EventStore { Err(e) => { let code = if let SqlError::SqliteFailure(_, Some(ref msg)) = e { if msg.contains("no such table: user_credentials") { - warn!( - "credential table missing for user '{}'; allowing compatibility auth path", - username - ); + warn!("credential table missing; allowing compatibility auth path"); "credentials_table_missing" } else { - warn!("credential lookup failed for user '{}': {}", username, e); + warn!("credential lookup failed: {}", e); "store_query_failed" } } else { - warn!("credential lookup failed for user '{}': {}", username, e); + warn!("credential lookup failed: {}", e); "store_query_failed" }; self.record_db_observation("auth_load_pw_hash", started, true); @@ -2393,7 +2390,7 @@ impl EventStore { last_login = excluded.last_login", params![username, encrypted_pw_hash, ts], ) { - warn!("credential upsert failed for user {}: {}", username, e); + warn!("credential upsert failed: {}", e); self.record_db_observation("auth_upsert_credentials", started, true); return; } @@ -2405,10 +2402,7 @@ impl EventStore { let normalized_status = match validate_status_field(Some(status)) { Ok(v) => v, Err(e) => { - warn!( - "presence snapshot ignored for user {} due to invalid status: {}", - username, e - ); + warn!("presence snapshot ignored due to invalid status: {}", e); return; } }; @@ -2419,10 +2413,7 @@ impl EventStore { let status_json = normalized_status.to_string(); let Some(encrypted_status) = self.encrypt_field(&status_json) else { - warn!( - "presence snapshot upsert skipped for user {} due to encryption failure", - username - ); + warn!("presence snapshot upsert skipped due to encryption failure"); return; }; let ts = now(); @@ -2435,10 +2426,7 @@ impl EventStore { updated_at = excluded.updated_at", params![username, encrypted_status, ts], ) { - warn!( - "presence snapshot upsert failed for user {}: {}", - username, e - ); + warn!("presence snapshot upsert failed: {}", e); } } @@ -2477,10 +2465,7 @@ impl EventStore { last_seen_at = excluded.last_seen_at", params![username, normalized_channel, ts], ) { - warn!( - "channel subscription upsert failed for user {} channel {}: {}", - username, channel, e - ); + warn!("channel subscription upsert failed: {}", e); } } @@ -3622,7 +3607,7 @@ impl State { /// with `username`. Returns the token string. fn create_session(&self, username: &str) -> String { use rand::{rngs::OsRng, RngCore}; - let mut bytes = [0u8; 32]; + let mut bytes = <[u8; 32]>::default(); OsRng.fill_bytes(&mut bytes); let token = hex::encode(bytes); self.session_tokens @@ -4377,7 +4362,7 @@ async fn handle_self_registration( let server_hash = crypto::pw_hash(pw); state.store.upsert_credentials(username, &server_hash); - info!("self-registered new user: {}", username); + info!("self-registration completed"); let _ = sink .send(Message::text( @@ -4417,26 +4402,23 @@ async fn handle_event( } = session; let t = d["t"].as_str().unwrap_or(""); - let event_channel = d + let has_event_channel = d .get("ch") .or_else(|| d.get("r")) .and_then(|v| v.as_str()) - .map(safe_ch); + .is_some(); - if let Some(ch) = event_channel.as_deref() { - info!("event user={} type={} channel={}", username, t, ch); + if has_event_channel { + info!("event received type={} scope=channel", t); } else { - info!("event user={} type={}", username, t); + info!("event received type={}", t); } // --- Replay protection (timestamp skew + nonce dedup) ------------------ // Only applied to mutating events (see requires_fresh_protection). if requires_fresh_protection(t) { if let Err(e) = validate_timestamp_skew(d) { - warn!( - "protocol validation failed user={} type={} reason={}", - username, t, e - ); + warn!("protocol validation failed type={} reason={}", t, e); send_err( out_tx, format!("protocol validation failed: {}", e), @@ -4445,10 +4427,7 @@ async fn handle_event( return; } if let Err(e) = validate_and_register_nonce(state, username, d) { - warn!( - "protocol validation failed user={} type={} reason={}", - username, t, e - ); + warn!("protocol validation failed type={} reason={}", t, e); send_err( out_tx, format!("protocol validation failed: {}", e), @@ -5190,8 +5169,7 @@ async fn handle_event( }), ); info!( - "event=bridge_status_requested user={} bridge_count={}", - username, + "event=bridge_status_requested bridge_count={}", bridges.len() ); } @@ -6013,7 +5991,7 @@ async fn handle_event( return; } - debug!("password change verification started for user={}", username); + debug!("password change verification started"); let credential_result = if let Some(pending) = state.pending_credentials.get(username) { Ok(crypto::secure_string_eq(current_hash, pending.value())) } else { @@ -6022,7 +6000,7 @@ async fn handle_event( match credential_result { Ok(true) => { - debug!("password change verification passed for user={}", username); + debug!("password change verification passed"); state .pending_credentials .insert(username.to_string(), new_pw.to_string()); @@ -6043,7 +6021,7 @@ async fn handle_event( state_for_task .pending_credentials .remove(&username_for_task); - info!("password changed for user={}", username_for_task); + info!("password change persisted"); }); } Ok(false) => { @@ -6091,7 +6069,14 @@ async fn handle_event( .unwrap_or("") .trim() .to_ascii_lowercase(); - let password = d["password"].as_str().unwrap_or(""); + let Some(password) = d.get("password").and_then(|v| v.as_str()) else { + send_err( + out_tx, + "admin register requires valid password", + &state.metrics, + ); + return; + }; let role = d["role"].as_str().unwrap_or("member").trim(); let channel = safe_ch(d["ch"].as_str().unwrap_or("general")); @@ -6890,7 +6875,7 @@ async fn start_health_server( accept_result = listener.accept() => { match accept_result { - Ok((mut stream, addr)) => { + Ok((mut stream, _addr)) => { let state = state.clone(); let metrics = metrics.clone(); let shutdown_tx = shutdown_tx.clone(); @@ -6946,7 +6931,7 @@ async fn start_health_server( let _ = stream.write_all(response.as_bytes()).await; } Err(e) => { - warn!("Failed to read from health connection {}: {}", addr, e); + warn!("Failed to read from health connection: {}", e); } } }); @@ -7136,7 +7121,7 @@ fn create_ok_response( /// Valid usernames are non-empty, at most [`MAX_USERNAME_LEN`] characters, /// and consist entirely of ASCII alphanumeric characters, `-`, or `_`. /// Whitespace, punctuation, and Unicode are rejected to keep usernames safe -/// for use as map keys, log fields, and SQL parameters. +/// for use as map keys and SQL parameters. fn is_valid_username(name: &str) -> bool { if name.is_empty() || name.len() > MAX_USERNAME_LEN { return false; @@ -7626,10 +7611,7 @@ where // --- IP-level rate limiting --- if !state.ip_connect(&addr) { - warn!( - "connection rejected: too many connections from {}", - addr.ip() - ); + warn!("connection rejected: too many connections from source IP"); // Best-effort: the stream may not support WebSocket yet, but try. if let Ok(ws) = accept_async(stream).await { let (mut sink, _) = ws.split(); @@ -7653,7 +7635,7 @@ where let ws = match accept_hdr_async(stream, HandshakeValidator).await { Ok(w) => w, Err(e) => { - debug!("WebSocket handshake failed from {}: {}", addr, e); + debug!("WebSocket handshake failed: {}", e); return; } }; @@ -7717,7 +7699,7 @@ where // --- Per-IP auth rate limiting --- if !state.ip_auth_allowed(&addr) { - warn!("auth rate limited from {}", addr.ip()); + warn!("auth rate limited"); let _ = sink .send(Message::text( serde_json::json!({ @@ -7758,7 +7740,7 @@ where .to_string(), )) .await; - warn!("auth blocked: account locked for user={}", username); + warn!("auth blocked: account locked"); return; } } @@ -7813,10 +7795,7 @@ where )) .await; } - warn!( - "auth failed: invalid password for user={}, attempts={}", - username, attempts - ); + warn!("auth failed: invalid password attempts={}", attempts); return; } Err("first_login") => { @@ -7830,10 +7809,7 @@ where .to_string(), )) .await; - warn!( - "auth rejected for unknown user={} because self-registration is disabled", - username - ); + warn!("auth rejected for unknown user because self-registration is disabled"); return; } // First time this username connects — store their credential. @@ -7854,7 +7830,7 @@ where .pending_credentials .remove(&username_for_task); }); - info!("credentials created for new user={}", username); + info!("credentials created for new user"); } Err(e) => { let _ = sink @@ -7875,7 +7851,7 @@ where serde_json::json!({"t":"err","m":"username already in use"}).to_string(), )) .await; - warn!("auth rejected: username '{}' already connected", username); + warn!("auth rejected: username already connected"); return; } @@ -7929,8 +7905,8 @@ where }; state.bridges.insert(username.clone(), info); info!( - "event=bridge_connected bridge_type={} instance_id={} user={} routes={}", - bridge_type, bridge_instance_id, username, bridge_routes + "event=bridge_connected bridge_type={} routes={}", + bridge_type, bridge_routes ); } @@ -7950,7 +7926,7 @@ where } broadcast_system_msg(&state, &format!("→ {} joined", username)).await; - info!("+ {}", username); + info!("client joined"); // ---- Phase 3: set up bidirectional message routing ---------------------- @@ -7994,8 +7970,8 @@ where } if restored_subscriptions > 0 { info!( - "rehydrated channel subscriptions user={} count={}", - username, restored_subscriptions + "rehydrated channel subscriptions count={}", + restored_subscriptions ); } @@ -8026,8 +8002,7 @@ where signal = slow_client_rx.recv() => { if signal.is_some() { warn!( - "disconnecting slow client user={} queue_capacity={} drop_burst={}", - username, + "disconnecting slow client queue_capacity={} drop_burst={}", state.outbound_queue_capacity, state.slow_client_drop_burst, ); @@ -8037,7 +8012,7 @@ where next = stream.next() => match next { Some(Ok(msg)) => msg, Some(Err(e)) => { - info!("ws recv error for {}: {}", username, e); + info!("ws recv error: {}", e); break; } None => break, // Client closed the connection cleanly. @@ -8127,10 +8102,10 @@ where state.recent_nonces.remove(&username); state.nonce_last_seen.remove(&username); if state.bridges.remove(&username).is_some() { - info!("event=bridge_disconnected user={}", username); + info!("event=bridge_disconnected"); } broadcast_system_msg(&state, &format!("✖ {} left", username)).await; - info!("- {}", username); + info!("client left"); // _conn_guard drops here, decrementing active_connections and IP counter. } @@ -8461,7 +8436,7 @@ fn resolve_db_key(db_path: &str, cli_key: Option<&str>) -> ChatifyResult::default(); OsRng.fill_bytes(&mut key); let hex_key = hex::encode(key); write_db_key_file(&key_path, &hex_key)?; @@ -8523,7 +8498,7 @@ async fn accept_loop( ).await; } Err(e) => { - warn!("TLS handshake failed from {}: {}", addr, e); + warn!("TLS handshake failed: {}", e); } } }); @@ -8590,7 +8565,7 @@ async fn accept_loop_unix( ).await; } Err(e) => { - warn!("TLS handshake failed from {}: {}", addr, e); + warn!("TLS handshake failed: {}", e); } } }); @@ -8905,6 +8880,19 @@ mod tests { std::env::temp_dir().join(format!("{prefix}-{nanos}.db")) } + fn test_encryption_key() -> Vec { + crypto::new_keypair() + } + + fn distinct_test_encryption_keys() -> (Vec, Vec) { + let first = test_encryption_key(); + let mut second = test_encryption_key(); + while second == first { + second = test_encryption_key(); + } + (first, second) + } + #[test] fn voice_event_forwarding_respects_active_room() { let event = VoiceBroadcast::MemberJoined { @@ -8984,11 +8972,13 @@ mod tests { /// handling. #[test] fn auth_payload_rejects_invalid_username_with_typed_error() { + let password_hash = chatify::fresh_nonce_hex(); + let public_key = crypto::pub_b64(&crypto::new_keypair()).expect("encode public key"); let payload = serde_json::json!({ "t": "auth", "u": "bad user", // space is not allowed - "pw": "abc123", - "pk": base64::engine::general_purpose::STANDARD.encode([0u8; 32]) + "pw": password_hash, + "pk": public_key }); let err = match validate_auth_payload(&payload) { @@ -9097,9 +9087,10 @@ mod tests { let db_path = unique_test_db_path("chatify-upsert-credentials"); let plugin_runtime = PluginRuntime::new(std::env::current_exe().expect("resolve current exe")); + let encryption_key = test_encryption_key(); let state = State::new( db_path.to_string_lossy().to_string(), - Some(vec![9u8; 32]), + Some(encryption_key), DbDurabilityMode::MaxSafety, DB_POOL_SIZE_DEFAULT, None, @@ -9111,20 +9102,23 @@ mod tests { SLOW_CLIENT_DROP_BURST_DEFAULT, ); - let old_client_hash = "client-hash-old"; - let old_server_hash = crypto::pw_hash(old_client_hash); + let old_client_hash = chatify::fresh_nonce_hex(); + let old_server_hash = crypto::pw_hash(&old_client_hash); state.store.upsert_credentials("alice", &old_server_hash); - let new_client_hash = "client-hash-new"; - let new_server_hash = crypto::pw_hash(new_client_hash); + let mut new_client_hash = chatify::fresh_nonce_hex(); + while new_client_hash == old_client_hash { + new_client_hash = chatify::fresh_nonce_hex(); + } + let new_server_hash = crypto::pw_hash(&new_client_hash); state.store.upsert_credentials("alice", &new_server_hash); assert_eq!( - state.store.verify_credential("alice", old_client_hash), + state.store.verify_credential("alice", &old_client_hash), Ok(false) ); assert_eq!( - state.store.verify_credential("alice", new_client_hash), + state.store.verify_credential("alice", &new_client_hash), Ok(true) ); @@ -9139,9 +9133,10 @@ mod tests { let db_path = unique_test_db_path("chatify-auth-encryption"); let plugin_runtime = PluginRuntime::new(std::env::current_exe().expect("resolve current exe")); + let encryption_key = test_encryption_key(); let state = State::new( db_path.to_string_lossy().to_string(), - Some(vec![7u8; 32]), + Some(encryption_key), DbDurabilityMode::MaxSafety, DB_POOL_SIZE_DEFAULT, None, @@ -9153,23 +9148,25 @@ mod tests { SLOW_CLIENT_DROP_BURST_DEFAULT, ); - let client_hash = "client-password-hash"; - let server_hash = crypto::pw_hash(client_hash); + let client_hash = chatify::fresh_nonce_hex(); + let server_hash = crypto::pw_hash(&client_hash); state.store.upsert_credentials("alice", &server_hash); assert_eq!( - state.store.verify_credential("alice", client_hash), + state.store.verify_credential("alice", &client_hash), Ok(true) ); + let totp_secret = chatify::fresh_nonce_hex(); + let backup_code_hash = chatify::fresh_nonce_hex(); let mut user_2fa = User2FA::new("alice".to_string()); user_2fa.enabled = true; user_2fa.totp_config = Some(TotpConfig { - secret: "top-secret-seed".to_string(), + secret: totp_secret.clone(), digits: 6, step: 30, algorithm: "SHA256".to_string(), }); - user_2fa.backup_codes = vec!["backup-code-hash".to_string()]; + user_2fa.backup_codes = vec![backup_code_hash.clone()]; state.store.upsert_user_2fa(&user_2fa); let loaded_2fa = state @@ -9181,12 +9178,9 @@ mod tests { .totp_config .as_ref() .map(|cfg| cfg.secret.as_str()), - Some("top-secret-seed") - ); - assert_eq!( - loaded_2fa.backup_codes, - vec!["backup-code-hash".to_string()] + Some(totp_secret.as_str()) ); + assert_eq!(loaded_2fa.backup_codes, vec![backup_code_hash.clone()]); let conn = Connection::open(&db_path).expect("open sqlite db"); let raw_pw_hash: String = conn @@ -9225,7 +9219,7 @@ mod tests { let raw_secret = raw_secret.expect("2fa secret must be present"); let raw_backup_codes = raw_backup_codes.expect("2fa backup codes must be present"); - assert_ne!(raw_secret, "top-secret-seed"); + assert_ne!(raw_secret, totp_secret); assert!( serde_json::from_str::(&raw_secret) .ok() @@ -9273,12 +9267,13 @@ mod tests { fn state_init_fails_fast_on_encryption_key_mismatch() { let db_path = unique_test_db_path("chatify-key-mismatch"); let db_path_str = db_path.to_string_lossy().to_string(); + let (original_key, replacement_key) = distinct_test_encryption_keys(); let plugin_runtime_a = PluginRuntime::new(std::env::current_exe().expect("resolve current exe")); let state_a = State::new( db_path_str.clone(), - Some(vec![1u8; 32]), + Some(original_key), DbDurabilityMode::MaxSafety, DB_POOL_SIZE_DEFAULT, None, @@ -9306,7 +9301,7 @@ mod tests { PluginRuntime::new(std::env::current_exe().expect("resolve current exe")); State::new( db_path_str.clone(), - Some(vec![2u8; 32]), + Some(replacement_key), DbDurabilityMode::MaxSafety, DB_POOL_SIZE_DEFAULT, None,