diff --git a/changelog.d/12203_journald_globbing_matches.feature.md b/changelog.d/12203_journald_globbing_matches.feature.md new file mode 100644 index 0000000000000..8f2010cbfe829 --- /dev/null +++ b/changelog.d/12203_journald_globbing_matches.feature.md @@ -0,0 +1,3 @@ +Added support for globbing in the `journald` source's `include_matches` and `exclude_matches` options. + +authors: AmitPr diff --git a/src/sources/journald.rs b/src/sources/journald.rs index c6177fba1eb6c..adf905669f256 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -11,6 +11,7 @@ use std::{ use bytes::Bytes; use chrono::{TimeZone, Utc}; use futures::{StreamExt, poll, stream::BoxStream, task::Poll}; +use glob::{Pattern, PatternError}; use nix::{ sys::signal::{Signal, kill}, unistd::Pid, @@ -89,6 +90,12 @@ enum BuildError { value, ))] DuplicatedMatches { field: String, value: String }, + #[snafu(display("Invalid glob pattern {:?} for field {:?}: {}", pattern, field, source))] + InvalidGlobPattern { + field: String, + pattern: String, + source: PatternError, + }, #[snafu(display( "`current_boot_only: false` not supported for systemd versions 250 through 257 (got {}).", systemd_version @@ -97,6 +104,7 @@ enum BuildError { } type Matches = HashMap>; +type CompiledMatches = HashMap>; /// Configuration for the `journald` source. #[configurable_component(source("journald", "Collect logs from JournalD."))] @@ -137,6 +145,9 @@ pub struct JournaldConfig { /// If empty or not present, all journal fields are accepted. /// /// If `include_units` is specified, it is merged into this list. + /// + /// Values support glob patterns (e.g., `my-app-*` matches `my-app-foo`). + /// Special characters: `*` (any sequence), `?` (any char), `[...]` (char class). #[serde(default)] #[configurable(metadata( docs::additional_props_description = "The set of field values to match in journal entries that are to be included." @@ -148,6 +159,9 @@ pub struct JournaldConfig { /// excludes the entry from this source. /// /// If `exclude_units` is specified, it is merged into this list. + /// + /// Values support glob patterns (e.g., `my-app-*` matches `my-app-foo`). + /// Special characters: `*` (any sequence), `?` (any char), `[...]` (char class). #[serde(default)] #[configurable(metadata( docs::additional_props_description = "The set of field values to match in journal entries that are to be excluded." @@ -367,6 +381,9 @@ impl SourceConfig for JournaldConfig { return Err(BuildError::DuplicatedMatches { field, value }.into()); } + let include_matches = compile_matches(&include_matches)?; + let exclude_matches = compile_matches(&exclude_matches)?; + let mut checkpoint_path = data_dir; checkpoint_path.push(CHECKPOINT_FILENAME); @@ -429,8 +446,8 @@ impl SourceConfig for JournaldConfig { } struct JournaldSource { - include_matches: Matches, - exclude_matches: Matches, + include_matches: CompiledMatches, + exclude_matches: CompiledMatches, checkpoint_path: PathBuf, batch_size: usize, remap_priority: bool, @@ -1008,7 +1025,7 @@ fn remap_priority(priority: &mut JsonValue) { } } -fn filter_matches(record: &Record, includes: &Matches, excludes: &Matches) -> bool { +fn filter_matches(record: &Record, includes: &CompiledMatches, excludes: &CompiledMatches) -> bool { match (includes.is_empty(), excludes.is_empty()) { (true, true) => false, (false, true) => !contains_match(record, includes), @@ -1017,14 +1034,13 @@ fn filter_matches(record: &Record, includes: &Matches, excludes: &Matches) -> bo } } -fn contains_match(record: &Record, matches: &Matches) -> bool { - let f = move |(field, value)| { +fn contains_match(record: &Record, matches: &CompiledMatches) -> bool { + record.iter().any(|(field, value)| { matches .get(field) - .map(|x| x.contains(value)) + .map(|patterns| patterns.iter().any(|p| p.matches(value))) .unwrap_or(false) - }; - record.iter().any(f) + }) } fn find_duplicate_match(a_matches: &Matches, b_matches: &Matches) -> Option<(String, String)> { @@ -1043,6 +1059,24 @@ fn find_duplicate_match(a_matches: &Matches, b_matches: &Matches) -> Option<(Str None } +fn compile_matches(matches: &Matches) -> Result { + let mut compiled = HashMap::new(); + for (field, values) in matches { + let patterns: Result, _> = values + .iter() + .map(|v| { + Pattern::new(v).map_err(|source| BuildError::InvalidGlobPattern { + field: field.clone(), + pattern: v.clone(), + source, + }) + }) + .collect(); + compiled.insert(field.clone(), patterns?); + } + Ok(compiled) +} + enum Finalizer { Sync(SharedCheckpointer), Async(OrderedFinalizer), @@ -1307,6 +1341,10 @@ mod tests { matches } + fn compile_test_matches(matches: &Matches) -> CompiledMatches { + compile_matches(matches).expect("test patterns should be valid") + } + #[tokio::test] async fn reads_journal() { let received = run_with_units(&[], &[], None).await; @@ -1488,9 +1526,9 @@ mod tests { #[test] fn filter_matches_works_correctly() { - let empty: Matches = HashMap::new(); - let includes = create_unit_matches(vec!["one", "two"]); - let excludes = create_unit_matches(vec!["foo", "bar"]); + let empty: CompiledMatches = HashMap::new(); + let includes = compile_test_matches(&create_unit_matches(vec!["one", "two"])); + let excludes = compile_test_matches(&create_unit_matches(vec!["foo", "bar"])); let zero = HashMap::new(); assert!(!filter_matches(&zero, &empty, &empty)); @@ -1822,4 +1860,70 @@ mod tests { matches_schema(&config, LogNamespace::Legacy) } + + #[tokio::test] + async fn includes_matches_with_glob() { + // Test that *.service glob matches service units + let include_matches = create_matches(vec![(SYSTEMD_UNIT, "*.service")]); + let received = run_journal(include_matches, HashMap::new(), None, false).await; + // Should match: unit.service, badunit.service, syslog.service, NetworkManager.service + assert_eq!(received.len(), 4); + // Verify we got the expected service units + let messages: Vec<_> = received.iter().map(|e| message(e)).collect(); + assert!(messages.contains(&Value::Bytes("unit message".into()))); + assert!(messages.contains(&Value::Bytes("¿Hello?".into()))); // badunit.service + } + + #[tokio::test] + async fn excludes_matches_with_glob() { + // Test that bad* glob excludes badunit.service + let exclude_matches = create_matches(vec![(SYSTEMD_UNIT, "bad*")]); + let received = run_journal(HashMap::new(), exclude_matches, None, false).await; + // Should exclude badunit.service (1 entry), leaving 7 entries + assert_eq!(received.len(), 7); + // Verify badunit.service message is NOT in results + let messages: Vec<_> = received.iter().map(|e| message(e)).collect(); + assert!(!messages.contains(&Value::Bytes("¿Hello?".into()))); + } + + #[test] + fn contains_match_with_glob_works() { + // Test glob pattern matching in contains_match + let matches = compile_test_matches(&create_matches(vec![ + (SYSTEMD_UNIT, "*.service"), + ("PRIORITY", "ERR"), + ])); + + // Should match: unit.service matches *.service + let mut record1 = HashMap::new(); + record1.insert(String::from(SYSTEMD_UNIT), String::from("unit.service")); + assert!(contains_match(&record1, &matches)); + + // Should match: any.service matches *.service + let mut record2 = HashMap::new(); + record2.insert(String::from(SYSTEMD_UNIT), String::from("any.service")); + assert!(contains_match(&record2, &matches)); + + // Should NOT match: sysinit.target does not match *.service + let mut record3 = HashMap::new(); + record3.insert(String::from(SYSTEMD_UNIT), String::from("sysinit.target")); + assert!(!contains_match(&record3, &matches)); + + // Should match: PRIORITY=ERR matches exactly + let mut record4 = HashMap::new(); + record4.insert(String::from("PRIORITY"), String::from("ERR")); + assert!(contains_match(&record4, &matches)); + + // Test ? wildcard: matches single character + let matches_question = + compile_test_matches(&create_matches(vec![(SYSTEMD_UNIT, "uni?.service")])); + let mut record5 = HashMap::new(); + record5.insert(String::from(SYSTEMD_UNIT), String::from("unit.service")); + assert!(contains_match(&record5, &matches_question)); + + // Test [abc] character class + let matches_class = + compile_test_matches(&create_matches(vec![(SYSTEMD_UNIT, "[ub]nit.service")])); + assert!(contains_match(&record5, &matches_class)); // matches 'unit.service' + } }