diff --git a/src/sources/demo_logs.rs b/src/sources/demo_logs.rs index 2fa543353..8b8438e42 100644 --- a/src/sources/demo_logs.rs +++ b/src/sources/demo_logs.rs @@ -1,3 +1,4 @@ +use std::fmt; use chrono::Utc; use fakedata::logs::*; use futures::StreamExt; @@ -97,6 +98,10 @@ pub enum DemoLogsConfigError { ShuffleDemoLogsItemsEmpty, #[snafu(display("A non-empty sample log file is required for sample file format"))] SampleFileDemoLogsEmpty, + #[snafu(display("A non-empty time format is required for sample file format"))] + SampleFileTimeFormatEmpty, + #[snafu(display("time format provided is invalid for sample file format"))] + SampleFileTimeFormatInvalid, #[snafu(display("Could not open sample file"))] SampleFileOpenFailed { message: String, @@ -223,7 +228,14 @@ impl OutputFormat { GenCtx::TimeJoin { data } => { let (time_prefix, time_suffix) = &data[n % data.len()]; let now = Local::now(); - let timestamp = now.format(&time_format).to_string(); + + let timestamp = match try_format_timestamp(&now, time_format) { + Some(ts) => ts, + None => { + warn!("Failed to format timestamp with provided time_format, falling back to RFC3339"); + now.to_rfc3339() + } + }; format!("{}{}{}", time_prefix, timestamp, time_suffix) } GenCtx::None => { @@ -264,11 +276,33 @@ impl OutputFormat { Ok(()) } } + Self::SampleFile {path, time_format} => { + if path.is_empty() { + return Err(DemoLogsConfigError::SampleFileDemoLogsEmpty); + } else if time_format.is_empty() { + return Err(DemoLogsConfigError::SampleFileTimeFormatEmpty); + } + + match try_format_timestamp(&Local::now(), time_format) { + Some(_) => Ok(()), + None => Err(DemoLogsConfigError::SampleFileTimeFormatInvalid), + } + } _ => Ok(()), } } } +fn try_format_timestamp(time: &chrono::DateTime, time_format: &str) -> Option { + let fmt_obj = time.format(time_format); // bind formatter so it outlives use + let mut buf = String::new(); + if fmt::write(&mut buf, format_args!("{}", fmt_obj)).is_ok() { + Some(buf) + } else { + None + } +} + impl DemoLogsConfig { #[cfg(test)] pub fn repeat( @@ -552,6 +586,68 @@ mod tests { } } + #[tokio::test] + async fn test_sample_file_generate_reads_syslog_lines_fallback() { + let mut tempfile = NamedTempFile::new().unwrap(); + let mut wtr = WriterBuilder::new() + .has_headers(true) + .flexible(false) + .quote_style(csv::QuoteStyle::NonNumeric) + .from_writer(&mut tempfile); + + let syslog_lines= vec![ + // prefix empty + ("", " myhost sshd[1234]: Accepted password for user1 from 192.168.1.10 port 54321 ssh2"), + // suffix empty + ("myhost sshd[1234]: Accepted password for user1 from 192.168.1.10 port 54321 ssh2 time: ", ""), + // both prefix and suffix empty + ("", ""), + ("Timestamp: ", " myhost systemd[1]: Started Session 42 of user root."), + ("Time: ", " myhost kernel: [123456.789012] eth0: link up, 1000 Mbps, full-duplex"), + ]; + let expected_log_patterns = [ + "%Y-%m-%dT%H:%M:%S%.f%:z myhost sshd[1234]: Accepted password for user1 from 192.168.1.10 port 54321 ssh2", + "myhost sshd[1234]: Accepted password for user1 from 192.168.1.10 port 54321 ssh2 time: %Y-%m-%dT%H:%M:%S%.f%:z", + "%Y-%m-%dT%H:%M:%S%.f%:z", + "Timestamp: %Y-%m-%dT%H:%M:%S%.f%:z myhost systemd[1]: Started Session 42 of user root.", + "Time: %Y-%m-%dT%H:%M:%S%.f%:z myhost kernel: [123456.789012] eth0: link up, 1000 Mbps, full-duplex", + ]; + + wtr.write_record(&["prefix", "suffix"]).unwrap(); + for (prefix, suffix) in &syslog_lines { + wtr.write_record(&[prefix, suffix]).unwrap(); + } + wtr.flush().unwrap(); + mem::drop(wtr); + + let path = tempfile.path().to_string_lossy(); + let pattern = "%s.%3N"; + let message_key = log_schema().message_key().unwrap().to_string(); + let demo_log_config = DemoLogsConfig { + format: OutputFormat::SampleFile { + path: (&path).to_string(), + time_format: pattern.to_string(), + }, + count: 8, + ..DemoLogsConfig::default() + }; + let toml_string = toml::to_string(&demo_log_config).unwrap(); + + let mut rx = runit(toml_string.as_str()).await; + + let length = expected_log_patterns.len(); + for num in 0..5 { + let event = match poll!(rx.next()) { + Poll::Ready(event) => event.unwrap(), + _ => unreachable!(), + }; + let log = event.as_log(); + let message = log[&message_key].to_string_lossy(); + let expected_log_pattern = expected_log_patterns[num%length]; + validate_log_line_with_timestamp(expected_log_pattern, &&*message); + } + } + #[test] fn config_sample_file_generate_empty_file() { let temp_file = NamedTempFile::new().expect("failed to create temp file"); @@ -615,6 +711,54 @@ mod tests { ); } + #[test] + fn config_sample_file_path_not_empty() { + let errant_config = DemoLogsConfig { + format: OutputFormat::SampleFile { + path: "/path/to/file".to_string(), + time_format: "".to_string(), + }, + ..DemoLogsConfig::default() + }; + + assert_eq!( + errant_config.format.validate(), + Err(DemoLogsConfigError::SampleFileTimeFormatEmpty) + ); + } + + #[test] + fn config_sample_file_time_format_not_empty() { + let errant_config = DemoLogsConfig { + format: OutputFormat::SampleFile { + path: "".to_string(), + time_format: "%y-%m-%d %H:%M:%S".to_string(), + }, + ..DemoLogsConfig::default() + }; + + assert_eq!( + errant_config.format.validate(), + Err(DemoLogsConfigError::SampleFileDemoLogsEmpty) + ); + } + + #[test] + fn config_sample_file_time_format_invalid() { + let errant_config = DemoLogsConfig { + format: OutputFormat::SampleFile { + path: "/path/to/file".to_string(), + time_format: "%s.%3N".to_string(), + }, + ..DemoLogsConfig::default() + }; + + assert_eq!( + errant_config.format.validate(), + Err(DemoLogsConfigError::SampleFileTimeFormatInvalid) + ); + } + #[tokio::test] async fn shuffle_demo_logs_copies_lines() { let message_key = log_schema().message_key().unwrap().to_string();