From 1a833d200730db924180a797610b84871bc461ac Mon Sep 17 00:00:00 2001
From: Andrey Stolyarov
Date: Thu, 20 Mar 2025 19:33:12 +0300
Subject: [PATCH 1/4] Implement custom field names

---
 README.md                                     | 130 +++++++-
 .../ru/d10xa/jsonlogviewer/ViewElement.scala  |   3 +-
 .../decline/yaml/ConfigYamlLoaderImpl.scala   |  43 ++-
 .../decline/yaml/ConfigYamlLoaderTest.scala   |  98 +++++-
 .../jsonlogviewer/JsonLogLineParser.scala     |  59 ++--
 .../d10xa/jsonlogviewer/LogLineFilter.scala   |  12 +-
 .../d10xa/jsonlogviewer/LogViewerStream.scala | 276 +++++++++--------
 .../d10xa/jsonlogviewer/ParseResultKeys.scala |  45 +--
 .../jsonlogviewer/config/ResolvedConfig.scala | 164 ++++++++++
 .../jsonlogviewer/csv/CsvLogLineParser.scala  |  16 +-
 .../d10xa/jsonlogviewer/decline/Config.scala  |   1 +
 .../jsonlogviewer/decline/DeclineOpts.scala   |  98 ++++--
 .../decline/FieldNamesConfig.scala            |  10 +
 .../decline/TimestampConfig.scala             |   1 -
 .../decline/yaml/ConfigYaml.scala             |   3 +-
 .../jsonlogviewer/decline/yaml/Feed.scala     |   3 +-
 .../decline/yaml/FieldNames.scala             |  10 +
 .../formatout/ColorLineFormatter.scala        | 101 ++++---
 .../logfmt/LogfmtLogLineParser.scala          |  18 +-
 .../jsonlogviewer/FieldNamesMergeTest.scala   |  59 ++++
 .../LogViewerStreamIntegrationTest.scala      | 282 ++++++++++++++++++
 .../LogViewerStreamLiveReloadTest.scala       | 158 ++++++++++
 .../ParseResultKeysGetByKeyTest.scala         | 115 +++++++
 .../jsonlogviewer/ParseResultKeysTest.scala   | 130 ++++++++
 .../jsonlogviewer/TimestampFilterTest.scala   |   2 +-
 .../config/ConfigResolverTest.scala           | 144 +++++++++
 .../csv/CsvLogLineParserTest.scala            | 108 +++++--
 .../jsonlogviewer/csv/CsvProcessingTest.scala | 110 +++++++
 .../query/LogLineQueryPredicateImplTest.scala |  34 ++-
 29 files changed, 1924 insertions(+), 309 deletions(-)
 create mode 100644 json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/config/ResolvedConfig.scala
 create mode 100644 json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/FieldNamesConfig.scala
 create mode 100644 json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/FieldNames.scala
 create mode 100644 json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/FieldNamesMergeTest.scala
 create mode 100644 json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/LogViewerStreamIntegrationTest.scala
 create mode 100644 json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/LogViewerStreamLiveReloadTest.scala
 create mode 100644 json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/ParseResultKeysGetByKeyTest.scala
 create mode 100644 json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/ParseResultKeysTest.scala
 create mode 100644 json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/config/ConfigResolverTest.scala
 create mode 100644 json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/csv/CsvProcessingTest.scala

diff --git a/README.md b/README.md
index 54b7a95..5116332 100644
--- a/README.md
+++ b/README.md
@@ -33,9 +33,15 @@ Below is an example of formatted JSON logs in the command-line interface:
 - **Multi-Source Input**
   Combine the outputs of multiple commands into a single unified log stream.
 
+- **Multiple Input Formats**
+  - **JSON** (default): Process standard JSON log formats
+  - **Logfmt**: Support for key-value pair log formats
+  - **CSV**: Parse and analyze CSV-formatted logs with headers
+
 - **Advanced Filtering**
   - Apply regular expressions to extract relevant log entries.
   - Use SQL-like queries to filter and query JSON fields.
+  - Filter logs by timestamp ranges.
 - **YAML Configuration**
   Define input streams and configurations using YAML files.
 
@@ -49,6 +55,9 @@ Below is an example of formatted JSON logs in the command-line interface:
 - **Integration with k9s**
   Seamlessly integrate with the k9s Kubernetes CLI tool to visualize logs directly within k9s.
 
+- **Custom Field Mapping**
+  Configure custom field-name mappings to work with non-standard log formats.
+
 ## Installation
 
 ### Requirements
@@ -82,6 +91,28 @@ json-log-viewer --help
 cat log.txt | json-log-viewer --filter "level = 'ERROR'"
 ```
 
+### Working with Different Log Formats
+
+#### JSON (Default)
+For standard JSON logs:
+```bash
+cat json-logs.txt | json-log-viewer
+```
+
+#### Logfmt
+For logs in logfmt format:
+```bash
+cat logfmt-logs.txt | json-log-viewer --format-in logfmt
+```
+
+#### CSV
+For CSV-formatted logs (requires header row):
+```bash
+cat csv-logs.csv | json-log-viewer --format-in csv
+```
+
+Note: CSV format requires a header row with column names. The tool will map these column names to standard log fields.
+
 ### SQL Filtering
 
 `json-log-viewer` supports SQL-like filtering for JSON fields, allowing precise log analysis.
@@ -124,6 +155,20 @@ You can use comparison and logical operations.
 cat log.txt | json-log-viewer --filter "(level = 'ERROR' OR level = 'WARN') AND message LIKE '%connection%'"
 
+### Timestamp Filtering
+
+Filter logs by timestamp range:
+
+```bash
+cat log.txt | json-log-viewer --timestamp-after 2024-01-01T00:00:00Z --timestamp-before 2024-01-31T23:59:59Z
+```
+
+You can also specify a custom timestamp field:
+
+```bash
+cat log.txt | json-log-viewer --timestamp-field time
+```
+
 ## Configuration
 
 `json-log-viewer` supports defining input streams, filters, and other settings using a YAML configuration file.
@@ -139,7 +184,34 @@ Each feed represents a log source and can have the following attributes:
 - **formatIn** (optional): Input log format. Supported values:
   - `json` (default).
   - `logfmt`.
+  - `csv`.
 - **rawInclude** and **rawExclude** (optional): Lists of regular expressions to include or exclude from processing.
+- **excludeFields** (optional): List of fields to exclude from output.
+- **fieldNames** (optional): Custom mapping for field names, helpful when working with non-standard log formats.
+
+### Custom Field Mapping
+
+You can define custom field-name mappings either globally or per feed:
+
+```yaml
+# Global field mapping
+fieldNames:
+  timestamp: "ts"
+  level: "severity"
+  message: "msg"
+  stackTrace: "error"
+  loggerName: "logger"
+  threadName: "thread"
+
+feeds:
+  - name: "application-logs"
+    commands:
+      - cat log1.txt
+    # Feed-specific field mapping (overrides global mapping)
+    fieldNames:
+      timestamp: "time"
+      level: "priority"
+```
 
 ### Example Configuration File
 
@@ -155,12 +227,21 @@ feeds:
       - "ERROR"
     rawExclude:
       - "DEBUG"
+    excludeFields:
+      - "thread_name"
   - name: "application-2-logs"
     commands:
       - cat log2.txt
     filter: |
      message NOT LIKE '%heartbeat%'
    formatIn: logfmt
+  - name: "csv-logs"
+    commands:
+      - cat logs.csv
+    formatIn: csv
+    fieldNames:
+      timestamp: "time"
+      level: "severity"
 ```
 
 #### Running with a Configuration File
 
@@ -186,27 +267,56 @@ json-log-viewer --config-file json-log-viewer.yml
 ```bash
 cat log.txt | json-log-viewer --config-file json-log-viewer.yml
 ```
-
-- **--format-in**: Specify the input log format (supported formats: json, logfmt).
+
+- **--format-in**: Specify the input log format (supported formats: json, logfmt, csv).
  ```bash
  cat log.txt | json-log-viewer --format-in logfmt
  ```
-
+
 - **--format-out**: Specify the output format (supported formats: pretty, raw).
  ```bash
  cat log.txt | json-log-viewer --format-out raw
  ```
-
+
 - **--timestamp-after** and **--timestamp-before**: Filter logs by a specific time range.
  ```bash
  cat log.txt | json-log-viewer --timestamp-after 2024-01-01T00:00:00Z --timestamp-before 2024-01-31T23:59:59Z
  ```
-
+
 - **--timestamp-field**: Specify the field name for timestamps (default: @timestamp).
  ```bash
  json-log-viewer --timestamp-field time
  ```
 
+#### Field Name Options
+
+You can override the default field names to work with non-standard log formats:
+
+- **--level-field**: Override default level field name (default: level).
+  ```bash
+  json-log-viewer --level-field severity
+  ```
+
+- **--message-field**: Override default message field name (default: message).
+  ```bash
+  json-log-viewer --message-field msg
+  ```
+
+- **--stack-trace-field**: Override default stack trace field name (default: stack_trace).
+  ```bash
+  json-log-viewer --stack-trace-field exception
+  ```
+
+- **--logger-name-field**: Override default logger name field name (default: logger_name).
+  ```bash
+  json-log-viewer --logger-name-field logger
+  ```
+
+- **--thread-name-field**: Override default thread name field name (default: thread_name).
+  ```bash
+  json-log-viewer --thread-name-field thread
+  ```
+
 ## k9s Plugin
 
 Integrate json-log-viewer with k9s to view formatted JSON logs directly within the k9s interface.
 
@@ -256,7 +366,7 @@ plugins:
 
 ## Development
 
-This section provides instructions for building and running 
+This section provides instructions for building and running
 both the JVM and JavaScript versions of `json-log-viewer`.
 It also includes notes for working on the `frontend-laminar` module.
 
@@ -267,7 +377,7 @@ Ensure you have the following installed on your system:
 - sbt (Scala Build Tool)
 
 ### Building the JVM Version
-To build the JVM version of the project, use the command: 
+To build the JVM version of the project, use the command:
 
 ```bash
 sbt stage
@@ -276,7 +386,7 @@ sbt stage
 ```
 
 This compiles the code and prepares the executable under the `jvm/target/universal/stage/bin/` directory.
 
 ### Running the JVM Version
-Run the application with: 
+Run the application with:
 ```bash
 cat log.txt | ./json-log-viewer/jvm/target/universal/stage/bin/json-log-viewer
 ```
 
@@ -293,7 +403,7 @@ To build the JavaScript version, you can use one of the following options:
    ```
    This generates a production-ready JavaScript file located at:
    `frontend-laminar/target/scala-3.6.2/frontend-laminar-opt/main.js`
-2. **Fast Development Build**: Use the command: 
+2. **Fast Development Build**: Use the command:
   ```bash
   sbt fastLinkJS
   ```
 
@@ -306,4 +416,4 @@ Choose the appropriate option based on your needs:
 
 ## License
 
-This project is licensed under the MIT License. See the [LICENSE](LICENSE) file for full details.
+This project is licensed under the MIT License. See the [LICENSE](LICENSE) file for full details.
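The field-name flags introduced above compose with the existing options in a single invocation. A minimal sketch (the `ts`, `severity`, and `msg` keys are placeholder names for a hypothetical non-standard format, not defaults of the tool):

```bash
cat custom-format.log | json-log-viewer \
  --timestamp-field ts \
  --level-field severity \
  --message-field msg \
  --filter "severity = 'ERROR'"
```

Because `--filter` keys are resolved through the same field mapping (see `ParseResultKeys` in the changes below), `severity = 'ERROR'` matches against the remapped level field.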
\ No newline at end of file diff --git a/frontend-laminar/src/main/scala/ru/d10xa/jsonlogviewer/ViewElement.scala b/frontend-laminar/src/main/scala/ru/d10xa/jsonlogviewer/ViewElement.scala index d8947b2..2dea52a 100644 --- a/frontend-laminar/src/main/scala/ru/d10xa/jsonlogviewer/ViewElement.scala +++ b/frontend-laminar/src/main/scala/ru/d10xa/jsonlogviewer/ViewElement.scala @@ -36,7 +36,8 @@ object ViewElement { formatIn = config.formatIn, rawInclude = None, rawExclude = None, - excludeFields = None + excludeFields = None, + fieldNames = None ) ) ) diff --git a/json-log-viewer/jvm/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/ConfigYamlLoaderImpl.scala b/json-log-viewer/jvm/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/ConfigYamlLoaderImpl.scala index 2d227e2..6d265c9 100644 --- a/json-log-viewer/jvm/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/ConfigYamlLoaderImpl.scala +++ b/json-log-viewer/jvm/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/ConfigYamlLoaderImpl.scala @@ -128,6 +128,42 @@ class ConfigYamlLoaderImpl extends ConfigYamlLoader { Validated.valid(None) } + private def parseOptionalFieldNames( + fields: Map[String, Json], + fieldName: String + ): ValidatedNel[String, Option[FieldNames]] = + fields.get(fieldName) match { + case Some(jsonValue) => + jsonValue.asObject.map(_.toMap) match { + case None => + Validated.invalidNel( + s"Invalid '$fieldName' field format, should be an object" + ) + case Some(fieldNamesFields) => + val timestampValidated = + parseOptionalString(fieldNamesFields, "timestamp") + val levelValidated = parseOptionalString(fieldNamesFields, "level") + val messageValidated = + parseOptionalString(fieldNamesFields, "message") + val stackTraceValidated = + parseOptionalString(fieldNamesFields, "stackTrace") + val loggerNameValidated = + parseOptionalString(fieldNamesFields, "loggerName") + val threadNameValidated = + parseOptionalString(fieldNamesFields, "threadName") + + ( + timestampValidated, + levelValidated, + messageValidated, + stackTraceValidated, + loggerNameValidated, + threadNameValidated + ).mapN(FieldNames.apply).map(Some(_)) + } + case None => Validated.valid(None) + } + private def parseFeed(feedJson: Json): ValidatedNel[String, Feed] = feedJson.asObject.map(_.toMap) match { case None => Validated.invalidNel("Feed entry is not a valid JSON object") @@ -143,6 +179,8 @@ class ConfigYamlLoaderImpl extends ConfigYamlLoader { val formatInValidated : Validated[NonEmptyList[String], Option[FormatIn]] = parseOptionalFormatIn(feedFields, "formatIn") + val fieldNamesValidated = + parseOptionalFieldNames(feedFields, "fieldNames") val rawIncludeValidated = parseOptionalListString(feedFields, "rawInclude") val rawExcludeValidated = @@ -158,6 +196,7 @@ class ConfigYamlLoaderImpl extends ConfigYamlLoader { inlineInputValidated, filterValidated, formatInValidated, + fieldNamesValidated, rawIncludeValidated, rawExcludeValidated, excludeFieldsValidated @@ -182,7 +221,9 @@ class ConfigYamlLoaderImpl extends ConfigYamlLoader { case Some(fields) => val feedsValidated: ValidatedNel[String, Option[List[Feed]]] = parseOptionalFeeds(fields, "feeds") - feedsValidated.map(ConfigYaml.apply) + val fieldNamesValidated = + parseOptionalFieldNames(fields, "fieldNames") + (fieldNamesValidated, feedsValidated).mapN(ConfigYaml.apply) } } } diff --git a/json-log-viewer/jvm/src/test/scala/ru/d10xa/jsonlogviewer/decline/yaml/ConfigYamlLoaderTest.scala b/json-log-viewer/jvm/src/test/scala/ru/d10xa/jsonlogviewer/decline/yaml/ConfigYamlLoaderTest.scala index 
18e5357..ec6cd82 100644 --- a/json-log-viewer/jvm/src/test/scala/ru/d10xa/jsonlogviewer/decline/yaml/ConfigYamlLoaderTest.scala +++ b/json-log-viewer/jvm/src/test/scala/ru/d10xa/jsonlogviewer/decline/yaml/ConfigYamlLoaderTest.scala @@ -1,11 +1,8 @@ package ru.d10xa.jsonlogviewer.decline.yaml -import cats.data.Validated import munit.FunSuite -import ru.d10xa.jsonlogviewer.decline.Config.FormatIn import ru.d10xa.jsonlogviewer.decline.yaml.ConfigYamlLoader -import ru.d10xa.jsonlogviewer.decline.yaml.ConfigYamlLoaderImpl -import ru.d10xa.jsonlogviewer.query.QueryAST +import ru.d10xa.jsonlogviewer.decline.Config.FormatIn class ConfigYamlLoaderTest extends FunSuite { @@ -58,6 +55,7 @@ class ConfigYamlLoaderTest extends FunSuite { val config = result.toOption.get assert(config.feeds.isEmpty) + assert(config.fieldNames.isEmpty) } test("parse invalid yaml") { @@ -68,7 +66,11 @@ class ConfigYamlLoaderTest extends FunSuite { assert(result.isInvalid, s"Result should be invalid: $result") val errors = result.swap.toOption.get - assert(errors.exists(_.contains("Invalid 'feeds' field format, should be a list"))) + assert( + errors.exists( + _.contains("Invalid 'feeds' field format, should be a list") + ) + ) } test("parse valid yaml with excludeFields") { @@ -107,4 +109,90 @@ class ConfigYamlLoaderTest extends FunSuite { assertEquals(feed2.name, Some("service-logs")) assertEquals(feed2.excludeFields, Some(List("@timestamp"))) } + + test("parse yaml with fieldNames") { + val yaml = + """|fieldNames: + | timestamp: "ts" + | level: "severity" + | message: "text" + | stackTrace: "error" + | loggerName: "logger" + | threadName: "thread" + |feeds: + | - name: "pod-logs" + | commands: + | - "./mock-logs.sh pod1" + | - name: "service-logs" + | commands: + | - "./mock-logs.sh service1" + | fieldNames: + | timestamp: "time" + | level: "priority" + |""".stripMargin + + val result = configYamlLoader.parseYamlFile(yaml) + assert(result.isValid, s"Result should be valid: $result") + + val config = result.toOption.get + + // Check global fieldNames + val fieldNames = config.fieldNames.get + assertEquals(fieldNames.timestamp, Some("ts")) + assertEquals(fieldNames.level, Some("severity")) + assertEquals(fieldNames.message, Some("text")) + assertEquals(fieldNames.stackTrace, Some("error")) + assertEquals(fieldNames.loggerName, Some("logger")) + assertEquals(fieldNames.threadName, Some("thread")) + + // Check feed-specific fieldNames + val feeds = config.feeds.get + assertEquals(feeds.size, 2) + + val feed1 = feeds.head + assertEquals(feed1.name, Some("pod-logs")) + assertEquals(feed1.fieldNames, None) + + val feed2 = feeds(1) + assertEquals(feed2.name, Some("service-logs")) + assert(feed2.fieldNames.isDefined) + assertEquals(feed2.fieldNames.get.timestamp, Some("time")) + assertEquals(feed2.fieldNames.get.level, Some("priority")) + } + + test("parse yaml with partial fieldNames") { + val yaml = + """|fieldNames: + | timestamp: "ts" + | level: "severity" + |feeds: + | - name: "service-logs" + | commands: + | - "./mock-logs.sh service1" + | fieldNames: + | message: "content" + |""".stripMargin + + val result = configYamlLoader.parseYamlFile(yaml) + assert(result.isValid, s"Result should be valid: $result") + + val config = result.toOption.get + + // Check global fieldNames + val fieldNames = config.fieldNames.get + assertEquals(fieldNames.timestamp, Some("ts")) + assertEquals(fieldNames.level, Some("severity")) + assertEquals(fieldNames.message, None) + + // Check feed-specific fieldNames + val feeds = config.feeds.get + 
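+    // Feed-level fieldNames set only `message`; the other names remain None here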
val feed = feeds.head + assertEquals(feed.name, Some("service-logs")) + assert(feed.fieldNames.isDefined) + assertEquals(feed.fieldNames.get.message, Some("content")) + assertEquals( + feed.fieldNames.get.timestamp, + None + ) + } } diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/JsonLogLineParser.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/JsonLogLineParser.scala index d8fd8d0..c11b1ea 100644 --- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/JsonLogLineParser.scala +++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/JsonLogLineParser.scala @@ -1,27 +1,37 @@ package ru.d10xa.jsonlogviewer -import io.circe.Json +import cats.syntax.all.* +import io.circe.parser.* import io.circe.Decoder import io.circe.HCursor -import io.circe.parser.* -import cats.syntax.all.* -import HardcodedFieldNames.* -import ru.d10xa.jsonlogviewer.decline.Config +import io.circe.Json +import ru.d10xa.jsonlogviewer.config.ResolvedConfig -class JsonLogLineParser(config: Config, jsonPrefixPostfix: JsonPrefixPostfix) - extends LogLineParser { +class JsonLogLineParser(config: ResolvedConfig, jsonPrefixPostfix: JsonPrefixPostfix) + extends LogLineParser: given Decoder[ParsedLine] = (c: HCursor) => - val timestampFieldName = config.timestamp.fieldName + val timestampFieldName = config.fieldNames.timestampFieldName + val levelFieldName = config.fieldNames.levelFieldName + val messageFieldName = config.fieldNames.messageFieldName + val stackTraceFieldName = config.fieldNames.stackTraceFieldName + val loggerNameFieldName = config.fieldNames.loggerNameFieldName + val threadNameFieldName = config.fieldNames.threadNameFieldName - val knownFieldNames = Seq( - timestampFieldName, - levelFieldName, - messageFieldName, - stackTraceFieldName, - loggerNameFieldName, - threadNameFieldName + val knownFieldNames = Set( + timestampFieldName, "@timestamp", + levelFieldName, "level", + messageFieldName, "message", + stackTraceFieldName, "stack_trace", + loggerNameFieldName, "logger_name", + threadNameFieldName, "thread_name" ) + // Function to find value by multiple possible keys + def findByKeys(keys: String*): Option[String] = + keys.flatMap { key => + c.downField(key).as[Option[String]].getOrElse(None) + }.headOption + def mapOtherAttributes(m: Map[String, Json]): Map[String, String] = m.view .mapValues { v => @@ -35,15 +45,16 @@ class JsonLogLineParser(config: Config, jsonPrefixPostfix: JsonPrefixPostfix) ) } .toMap - .--(knownFieldNames) + .filter { case (k, _) => !knownFieldNames.contains(k) } for - timestampOpt <- c.downField(timestampFieldName).as[Option[String]] - levelOpt <- c.downField(levelFieldName).as[Option[String]] - messageOpt <- c.downField(messageFieldName).as[Option[String]] - stackTraceOpt <- c.downField(stackTraceFieldName).as[Option[String]] - loggerNameOpt <- c.downField(loggerNameFieldName).as[Option[String]] - threadNameOpt <- c.downField(threadNameFieldName).as[Option[String]] + // Check both standard and configured field names + timestampOpt <- Either.right(findByKeys("@timestamp", "timestamp", timestampFieldName)) + levelOpt <- Either.right(findByKeys("level", levelFieldName)) + messageOpt <- Either.right(findByKeys("message", messageFieldName)) + stackTraceOpt <- Either.right(findByKeys("stack_trace", stackTraceFieldName)) + loggerNameOpt <- Either.right(findByKeys("logger_name", loggerNameFieldName)) + threadNameOpt <- Either.right(findByKeys("thread_name", threadNameFieldName)) attributes <- c .as[Map[String, Json]] 
.map(mapOtherAttributes) @@ -76,6 +87,4 @@ class JsonLogLineParser(config: Config, jsonPrefixPostfix: JsonPrefixPostfix) prefix = prefixOpt, postfix = postfixOpt ) - ) - -} + ) \ No newline at end of file diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogLineFilter.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogLineFilter.scala index 968a7c6..b9de98b 100644 --- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogLineFilter.scala +++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogLineFilter.scala @@ -1,9 +1,8 @@ package ru.d10xa.jsonlogviewer -import ru.d10xa.jsonlogviewer.decline.Config -import ru.d10xa.jsonlogviewer.decline.Config.ConfigGrep +import ru.d10xa.jsonlogviewer.config.ResolvedConfig -class LogLineFilter(config: Config, parseResultKeys: ParseResultKeys) { +class LogLineFilter(config: ResolvedConfig, parseResultKeys: ParseResultKeys) { def logLineQueryPredicate(line: ParseResult): Boolean = config.filter match @@ -15,10 +14,11 @@ class LogLineFilter(config: Config, parseResultKeys: ParseResultKeys) { parseResult: ParseResult ): Boolean = config.grep - .map { case ConfigGrep(grepKey, regex) => - parseResultKeys.getByKey(parseResult, grepKey).exists(regex.matches) + .map { grepConfig => + parseResultKeys + .getByKey(parseResult, grepConfig.key) + .exists(grepConfig.value.matches) } match case Nil => true case list => list.reduce(_ || _) - } diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogViewerStream.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogViewerStream.scala index 3d3b057..5394305 100644 --- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogViewerStream.scala +++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogViewerStream.scala @@ -4,9 +4,10 @@ import cats.effect.IO import cats.effect.Ref import fs2.* import fs2.Pull +import ru.d10xa.jsonlogviewer.config.ConfigResolver +import ru.d10xa.jsonlogviewer.config.ResolvedConfig import ru.d10xa.jsonlogviewer.csv.CsvLogLineParser import ru.d10xa.jsonlogviewer.decline.yaml.ConfigYaml -import ru.d10xa.jsonlogviewer.decline.yaml.Feed import ru.d10xa.jsonlogviewer.decline.Config import ru.d10xa.jsonlogviewer.decline.Config.FormatIn import ru.d10xa.jsonlogviewer.formatout.ColorLineFormatter @@ -21,126 +22,178 @@ import scala.util.Try object LogViewerStream { - private val stdinLinesStream: Stream[IO, String] = - new StdInLinesStreamImpl().stdinLinesStream + private var stdInLinesStreamImpl: StdInLinesStream = + new StdInLinesStreamImpl() + + def getStdInLinesStreamImpl: StdInLinesStream = stdInLinesStreamImpl + + def setStdInLinesStreamImpl(impl: StdInLinesStream): Unit = + stdInLinesStreamImpl = impl + + private def stdinLinesStream: Stream[IO, String] = + stdInLinesStreamImpl.stdinLinesStream def stream( config: Config, configYamlRef: Ref[IO, Option[ConfigYaml]] - ): Stream[IO, String] = - Stream.eval(configYamlRef.get).flatMap { configYamlOpt => - val feedsOpt: Option[List[Feed]] = - configYamlOpt.flatMap(_.feeds).filter(_.nonEmpty) - - val finalStream = feedsOpt match { - case Some(feeds) => - val feedStreams = feeds.zipWithIndex.map { (feed, index) => - val feedStream: Stream[IO, String] = - commandsAndInlineInputToStream(feed.commands, feed.inlineInput) - - createProcessStream( - config = config, - lines = feedStream, - configYamlRef = configYamlRef, - index = index, - initialFormatIn = feed.formatIn.orElse(config.formatIn) - ) + ): Stream[IO, String] = { + def 
processStreamWithConfig( + inputStream: Stream[IO, String], + resolvedConfig: ResolvedConfig + ): Stream[IO, String] = + if (resolvedConfig.formatIn.contains(FormatIn.Csv)) { + createCsvProcessStream(resolvedConfig, inputStream) + } else { + inputStream.flatMap { line => + Stream.eval(configYamlRef.get).flatMap { currentConfigYaml => + processLineWithConfig(line, currentConfigYaml, config) } - Stream.emits(feedStreams).parJoin(feedStreams.size) - case None => - createProcessStream( - config = config, - lines = stdinLinesStream, - configYamlRef = configYamlRef, - index = -1, - initialFormatIn = config.formatIn + } + } + + Stream.eval(configYamlRef.get).flatMap { initialConfigYaml => + val resolvedConfigs = ConfigResolver.resolve(config, initialConfigYaml) + + val finalStream = if (resolvedConfigs.isEmpty) { + Stream.empty + } else if (resolvedConfigs.length > 1) { + val feedStreams = resolvedConfigs.map { resolvedConfig => + val feedStream = commandsAndInlineInputToStream( + resolvedConfig.commands, + resolvedConfig.inlineInput ) + processStreamWithConfig(feedStream, resolvedConfig) + } + Stream.emits(feedStreams).parJoin(feedStreams.size) + } else { + processStreamWithConfig(stdinLinesStream, resolvedConfigs.head) } - finalStream - .intersperse("\n") - .append(Stream.emit("\n")) + finalStream.intersperse("\n").append(Stream.emit("\n")) } + } - private def createProcessStream( - config: Config, - lines: Stream[IO, String], + def processLineWithRef( + line: String, configYamlRef: Ref[IO, Option[ConfigYaml]], - index: Int, - initialFormatIn: Option[FormatIn] + config: Config ): Stream[IO, String] = - if (initialFormatIn.contains(FormatIn.Csv)) { - lines.pull.uncons1.flatMap { - case Some((headerLine, rest)) => - val csvHeaderParser = CsvLogLineParser(config, headerLine) - processStreamWithEffectiveConfig( - config = config, - lines = rest, - configYamlRef = configYamlRef, - index = index, - parser = Some(csvHeaderParser) - ).pull.echo - case None => - Pull.done - }.stream - } else { - processStreamWithEffectiveConfig( - config = config, - lines = lines, - configYamlRef = configYamlRef, - index = index, - parser = None - ) + Stream.eval(configYamlRef.get).flatMap { configYaml => + processLineWithConfig(line, configYaml, config) } - private def processStreamWithEffectiveConfig( - config: Config, - lines: Stream[IO, String], - configYamlRef: Ref[IO, Option[ConfigYaml]], - index: Int, - parser: Option[LogLineParser] - ): Stream[IO, String] = - for { - line <- lines - optConfigYaml <- Stream.eval(configYamlRef.get) - - feedConfig = extractFeedConfig(optConfigYaml, index) - - effectiveConfig = config.copy( - filter = feedConfig.filter.orElse(config.filter), - formatIn = feedConfig.formatIn.orElse(config.formatIn) - ) + def processLineWithConfig( + line: String, + configYaml: Option[ConfigYaml], + config: Config + ): Stream[IO, String] = { + val resolvedConfigs = ConfigResolver.resolve(config, configYaml) + + if (resolvedConfigs.isEmpty) { + Stream.empty + } else if (resolvedConfigs.length > 1) { + val results = resolvedConfigs.map { resolvedConfig => + processLineWithResolvedConfig(line, resolvedConfig) + } + Stream.emits(results).parJoinUnbounded + } else { + processLineWithResolvedConfig(line, resolvedConfigs.head) + } + } - timestampFilter = TimestampFilter() - parseResultKeys = ParseResultKeys(effectiveConfig) - logLineFilter = LogLineFilter(effectiveConfig, parseResultKeys) + def processLineWithResolvedConfig( + line: String, + resolvedConfig: ResolvedConfig + ): Stream[IO, String] = { + val 
getParser: IO[LogLineParser] = + if (resolvedConfig.formatIn.contains(FormatIn.Csv)) { + IO.raiseError( + new IllegalStateException( + "CSV format requires header line, cannot process a single line" + ) + ) + } else { + IO.pure(makeNonCsvLogLineParser(resolvedConfig)) + } - logLineParser = parser.getOrElse( - makeNonCsvLogLineParser(effectiveConfig, feedConfig.formatIn) - ) + Stream.eval(getParser).flatMap { parser => + val timestampFilter = TimestampFilter() + val parseResultKeys = ParseResultKeys(resolvedConfig) + val logLineFilter = LogLineFilter(resolvedConfig, parseResultKeys) - outputLineFormatter = effectiveConfig.formatOut match { + val outputLineFormatter = resolvedConfig.formatOut match { case Some(Config.FormatOut.Raw) => RawFormatter() case Some(Config.FormatOut.Pretty) | None => - ColorLineFormatter(effectiveConfig, feedConfig.feedName, feedConfig.excludeFields) + ColorLineFormatter( + resolvedConfig, + resolvedConfig.feedName, + resolvedConfig.excludeFields + ) } - evaluatedLine <- Stream + Stream .emit(line) - .filter(rawFilter(_, feedConfig.rawInclude, feedConfig.rawExclude)) - .map(logLineParser.parse) + .filter( + rawFilter(_, resolvedConfig.rawInclude, resolvedConfig.rawExclude) + ) + .map(parser.parse) .filter(logLineFilter.grep) .filter(logLineFilter.logLineQueryPredicate) .through( - timestampFilter.filterTimestampAfter(effectiveConfig.timestamp.after) + timestampFilter.filterTimestampAfter(resolvedConfig.timestampAfter) ) .through( timestampFilter.filterTimestampBefore( - effectiveConfig.timestamp.before + resolvedConfig.timestampBefore ) ) .map(formatWithSafety(_, outputLineFormatter)) - } yield evaluatedLine + } + } + + private def createCsvProcessStream( + resolvedConfig: ResolvedConfig, + lines: Stream[IO, String] + ): Stream[IO, String] = + lines.pull.uncons1.flatMap { + case Some((headerLine, rest)) => + val csvHeaderParser = CsvLogLineParser(resolvedConfig, headerLine) + + val timestampFilter = TimestampFilter() + val parseResultKeys = ParseResultKeys(resolvedConfig) + val logLineFilter = LogLineFilter(resolvedConfig, parseResultKeys) + + val outputLineFormatter = resolvedConfig.formatOut match { + case Some(Config.FormatOut.Raw) => RawFormatter() + case Some(Config.FormatOut.Pretty) | None => + ColorLineFormatter( + resolvedConfig, + resolvedConfig.feedName, + resolvedConfig.excludeFields + ) + } + + rest + .filter( + rawFilter(_, resolvedConfig.rawInclude, resolvedConfig.rawExclude) + ) + .map(csvHeaderParser.parse) + .filter(logLineFilter.grep) + .filter(logLineFilter.logLineQueryPredicate) + .through( + timestampFilter.filterTimestampAfter(resolvedConfig.timestampAfter) + ) + .through( + timestampFilter.filterTimestampBefore( + resolvedConfig.timestampBefore + ) + ) + .map(formatWithSafety(_, outputLineFormatter)) + .pull + .echo + case None => + Pull.done + }.stream private def formatWithSafety( parseResult: ParseResult, @@ -151,34 +204,6 @@ object LogViewerStream { case Failure(_) => parseResult.raw } - // TODO - private case class FeedConfig( - feedName: Option[String], - filter: Option[ru.d10xa.jsonlogviewer.query.QueryAST], - formatIn: Option[FormatIn], - rawInclude: Option[List[String]], - rawExclude: Option[List[String]], - excludeFields: Option[List[String]] - ) - - private def extractFeedConfig( - optConfigYaml: Option[ConfigYaml], - index: Int - ): FeedConfig = { - val feedOpt = optConfigYaml - .flatMap(_.feeds) - .flatMap(_.lift(index)) - - FeedConfig( - feedName = feedOpt.flatMap(_.name), - filter = feedOpt.flatMap(_.filter), - formatIn = 
feedOpt.flatMap(_.formatIn), - rawInclude = feedOpt.flatMap(_.rawInclude), - rawExclude = feedOpt.flatMap(_.rawExclude), - excludeFields = feedOpt.flatMap(_.excludeFields) - ) - } - private def commandsAndInlineInputToStream( commands: List[String], inlineInput: Option[String] @@ -186,17 +211,16 @@ object LogViewerStream { new ShellImpl().mergeCommandsAndInlineInput(commands, inlineInput) def makeNonCsvLogLineParser( - config: Config, - optFormatIn: Option[FormatIn] + resolvedConfig: ResolvedConfig ): LogLineParser = { val jsonPrefixPostfix = JsonPrefixPostfix(JsonDetector()) - optFormatIn match { - case Some(FormatIn.Logfmt) => LogfmtLogLineParser(config) + resolvedConfig.formatIn match { + case Some(FormatIn.Logfmt) => LogfmtLogLineParser(resolvedConfig) case Some(FormatIn.Csv) => throw new IllegalStateException( "method makeNonCsvLogLineParser does not support csv" ) - case _ => JsonLogLineParser(config, jsonPrefixPostfix) + case _ => JsonLogLineParser(resolvedConfig, jsonPrefixPostfix) } } @@ -205,10 +229,8 @@ object LogViewerStream { include: Option[List[String]], exclude: Option[List[String]] ): Boolean = { - val includeRegexes: List[Regex] = - include.getOrElse(Nil).map(_.r) - val excludeRegexes: List[Regex] = - exclude.getOrElse(Nil).map(_.r) + val includeRegexes: List[Regex] = include.getOrElse(Nil).map(_.r) + val excludeRegexes: List[Regex] = exclude.getOrElse(Nil).map(_.r) val includeMatches = includeRegexes.isEmpty || includeRegexes.exists( _.findFirstIn(str).isDefined ) diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/ParseResultKeys.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/ParseResultKeys.scala index d4cbe75..de9c459 100644 --- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/ParseResultKeys.scala +++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/ParseResultKeys.scala @@ -1,25 +1,36 @@ package ru.d10xa.jsonlogviewer -import HardcodedFieldNames.messageFieldName -import HardcodedFieldNames.levelFieldName -import HardcodedFieldNames.loggerNameFieldName -import HardcodedFieldNames.threadNameFieldName -import HardcodedFieldNames.stackTraceFieldName -import ru.d10xa.jsonlogviewer.decline.Config +import ru.d10xa.jsonlogviewer.config.ResolvedConfig + +class ParseResultKeys(config: ResolvedConfig) { -class ParseResultKeys(config: Config) { def getByKey( parseResult: ParseResult, fieldName: String - ): Option[String] = fieldName match { - case config.timestamp.fieldName => parseResult.parsed.flatMap(_.timestamp) - case `messageFieldName` => parseResult.parsed.flatMap(_.message) - case `levelFieldName` => parseResult.parsed.flatMap(_.level) - case `loggerNameFieldName` => parseResult.parsed.flatMap(_.loggerName) - case `threadNameFieldName` => parseResult.parsed.flatMap(_.threadName) - case `stackTraceFieldName` => parseResult.parsed.flatMap(_.stackTrace) - case "prefix" => parseResult.prefix - case "postfix" => parseResult.postfix - case other => parseResult.parsed.flatMap(_.otherAttributes.get(other)) + ): Option[String] = { + import config.fieldNames.* + + // Determine which standard field the fieldName might refer to + if ( + fieldName == "timestamp" || fieldName == "@timestamp" || fieldName == timestampFieldName + ) { + parseResult.parsed.flatMap(_.timestamp) + } else if (fieldName == "level" || fieldName == levelFieldName) { + parseResult.parsed.flatMap(_.level) + } else if (fieldName == "message" || fieldName == messageFieldName) { + parseResult.parsed.flatMap(_.message) + } else if (fieldName == 
"stack_trace" || fieldName == stackTraceFieldName) { + parseResult.parsed.flatMap(_.stackTrace) + } else if (fieldName == "logger_name" || fieldName == loggerNameFieldName) { + parseResult.parsed.flatMap(_.loggerName) + } else if (fieldName == "thread_name" || fieldName == threadNameFieldName) { + parseResult.parsed.flatMap(_.threadName) + } else if (fieldName == "prefix") { + parseResult.prefix + } else if (fieldName == "postfix") { + parseResult.postfix + } else { + parseResult.parsed.flatMap(_.otherAttributes.get(fieldName)) + } } } diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/config/ResolvedConfig.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/config/ResolvedConfig.scala new file mode 100644 index 0000000..5699a57 --- /dev/null +++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/config/ResolvedConfig.scala @@ -0,0 +1,164 @@ +package ru.d10xa.jsonlogviewer.config + +import java.time.ZonedDateTime +import ru.d10xa.jsonlogviewer.decline.yaml.ConfigYaml +import ru.d10xa.jsonlogviewer.decline.yaml.Feed +import ru.d10xa.jsonlogviewer.decline.yaml.FieldNames +import ru.d10xa.jsonlogviewer.decline.Config +import ru.d10xa.jsonlogviewer.decline.Config.ConfigGrep +import ru.d10xa.jsonlogviewer.decline.Config.FormatIn +import ru.d10xa.jsonlogviewer.decline.Config.FormatOut +import ru.d10xa.jsonlogviewer.decline.FieldNamesConfig +import ru.d10xa.jsonlogviewer.query.QueryAST + +/** Fully resolved configuration that combines global and feed-specific + * settings. This eliminates the need to check multiple places for + * configuration values at runtime. + */ +final case class ResolvedConfig( + // Feed info + feedName: Option[String], + commands: List[String], + inlineInput: Option[String], + + // Config settings + filter: Option[QueryAST], + formatIn: Option[FormatIn], + formatOut: Option[FormatOut], + fieldNames: FieldNamesConfig, // already merged + + // Feed-specific settings + rawInclude: Option[List[String]], + rawExclude: Option[List[String]], + excludeFields: Option[List[String]], + + // Timestamp settings + timestampAfter: Option[ZonedDateTime], + timestampBefore: Option[ZonedDateTime], + + // Other settings + grep: List[ConfigGrep] +) + +/** Resolves configuration by merging global and feed-specific settings into a + * list of fully resolved configurations. + */ +object ConfigResolver { + + /** Resolves the configuration by combining global and feed-specific settings. 
+ * + * @param config + * Base CLI configuration + * @param configYaml + * Optional YAML configuration + * @return + * List of fully resolved configurations (one per feed, or a single one + * based on global settings) + */ + def resolve( + config: Config, + configYaml: Option[ConfigYaml] + ): List[ResolvedConfig] = + configYaml match { + case Some(yaml) => + // Merge global field names from YAML with CLI config + val globalFieldNames = yaml.fieldNames match { + case Some(fieldNames) => + mergeFieldNames(config.fieldNames, Some(fieldNames)) + case None => config.fieldNames + } + + // Process feeds if present + yaml.feeds match { + case Some(feeds) if feeds.nonEmpty => + feeds.map { feed => + // For each feed, merge its field names with global field names + val feedFieldNames = + mergeFieldNames(globalFieldNames, feed.fieldNames) + + ResolvedConfig( + feedName = feed.name, + commands = feed.commands, + inlineInput = feed.inlineInput, + filter = feed.filter.orElse(config.filter), + formatIn = feed.formatIn.orElse(config.formatIn), + formatOut = config.formatOut, + fieldNames = feedFieldNames, + rawInclude = feed.rawInclude, + rawExclude = feed.rawExclude, + excludeFields = feed.excludeFields, + timestampAfter = config.timestamp.after, + timestampBefore = config.timestamp.before, + grep = config.grep + ) + } + case _ => + // If no feeds, create one ResolvedConfig based on global settings + List( + ResolvedConfig( + feedName = None, + commands = List.empty, + inlineInput = None, + filter = config.filter, + formatIn = config.formatIn, + formatOut = config.formatOut, + fieldNames = globalFieldNames, + rawInclude = None, + rawExclude = None, + excludeFields = None, + timestampAfter = config.timestamp.after, + timestampBefore = config.timestamp.before, + grep = config.grep + ) + ) + } + case None => + // If no ConfigYaml, create one ResolvedConfig based only on CLI config + List( + ResolvedConfig( + feedName = None, + commands = List.empty, + inlineInput = None, + filter = config.filter, + formatIn = config.formatIn, + formatOut = config.formatOut, + fieldNames = config.fieldNames, + rawInclude = None, + rawExclude = None, + excludeFields = None, + timestampAfter = config.timestamp.after, + timestampBefore = config.timestamp.before, + grep = config.grep + ) + ) + } + + /** Helper method for backwards compatibility with tests. Merges global field + * names with feed-specific field names. 
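+   * For each field, a feed-provided name wins; a field left as `None` falls
+   * back to the global name.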
+ */ + def mergeFieldNames( + globalFieldNames: FieldNamesConfig, + feedFieldNames: Option[FieldNames] + ): FieldNamesConfig = + feedFieldNames match { + case None => globalFieldNames + case Some(feedFields) => + FieldNamesConfig( + timestampFieldName = + feedFields.timestamp.getOrElse(globalFieldNames.timestampFieldName), + levelFieldName = + feedFields.level.getOrElse(globalFieldNames.levelFieldName), + messageFieldName = + feedFields.message.getOrElse(globalFieldNames.messageFieldName), + stackTraceFieldName = feedFields.stackTrace.getOrElse( + globalFieldNames.stackTraceFieldName + ), + loggerNameFieldName = feedFields.loggerName.getOrElse( + globalFieldNames.loggerNameFieldName + ), + threadNameFieldName = feedFields.threadName.getOrElse( + globalFieldNames.threadNameFieldName + ) + ) + } +} diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/csv/CsvLogLineParser.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/csv/CsvLogLineParser.scala index 3df3d12..4225aa8 100644 --- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/csv/CsvLogLineParser.scala +++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/csv/CsvLogLineParser.scala @@ -3,12 +3,16 @@ package ru.d10xa.jsonlogviewer.csv import ru.d10xa.jsonlogviewer.LogLineParser import ru.d10xa.jsonlogviewer.ParseResult import ru.d10xa.jsonlogviewer.ParsedLine -import ru.d10xa.jsonlogviewer.decline.Config -import ru.d10xa.jsonlogviewer.HardcodedFieldNames.* +import ru.d10xa.jsonlogviewer.config.ResolvedConfig -class CsvLogLineParser(config: Config, headers: List[String]) extends LogLineParser { +class CsvLogLineParser(config: ResolvedConfig, headers: List[String]) extends LogLineParser { private val csvParser = new CsvParser() - private val timestampFieldName: String = config.timestamp.fieldName + private val timestampFieldName: String = config.fieldNames.timestampFieldName + private val levelFieldName: String = config.fieldNames.levelFieldName + private val messageFieldName: String = config.fieldNames.messageFieldName + private val stackTraceFieldName: String = config.fieldNames.stackTraceFieldName + private val loggerNameFieldName: String = config.fieldNames.loggerNameFieldName + private val threadNameFieldName: String = config.fieldNames.threadNameFieldName private val knownFieldNames: Seq[String] = Seq( timestampFieldName, @@ -59,9 +63,9 @@ class CsvLogLineParser(config: Config, headers: List[String]) extends LogLinePar } object CsvLogLineParser { - def apply(config: Config, headerLine: String): CsvLogLineParser = { + def apply(config: ResolvedConfig, headerLine: String): CsvLogLineParser = { val csvParser = new CsvParser() val headers = csvParser.parseLine(headerLine) new CsvLogLineParser(config, headers) } -} +} \ No newline at end of file diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/Config.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/Config.scala index 9347c3c..8f9b500 100644 --- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/Config.scala +++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/Config.scala @@ -7,6 +7,7 @@ import scala.util.matching.Regex final case class Config( configFile: Option[ConfigFile], + fieldNames: FieldNamesConfig, timestamp: TimestampConfig, grep: List[ConfigGrep], filter: Option[QueryAST], diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/DeclineOpts.scala 
b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/DeclineOpts.scala index 6c31d61..6fe81e1 100644 --- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/DeclineOpts.scala +++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/DeclineOpts.scala @@ -5,26 +5,79 @@ import cats.data.Validated import cats.data.ValidatedNel import cats.syntax.all.* import com.monovore.decline.* -import com.monovore.decline.Opts import com.monovore.decline.time.* +import com.monovore.decline.Opts + +import java.time.ZonedDateTime import ru.d10xa.jsonlogviewer.decline.Config.ConfigGrep import ru.d10xa.jsonlogviewer.decline.Config.FormatIn import ru.d10xa.jsonlogviewer.decline.Config.FormatOut import ru.d10xa.jsonlogviewer.query.QueryAST -import java.time.ZonedDateTime - object DeclineOpts { + // Timestamp filter options val timestampAfter: Opts[Option[ZonedDateTime]] = Opts.option[ZonedDateTime]("timestamp-after", "").orNone val timestampBefore: Opts[Option[ZonedDateTime]] = Opts.option[ZonedDateTime]("timestamp-before", "").orNone + + // Field name options val timestampField: Opts[String] = Opts - .option[String]("timestamp-field", help = "") + .option[String]( + "timestamp-field", + help = "Override default timestamp field name" + ) .withDefault("@timestamp") + val levelField: Opts[String] = + Opts + .option[String]("level-field", help = "Override default level field name") + .withDefault("level") + + val messageField: Opts[String] = + Opts + .option[String]( + "message-field", + help = "Override default message field name" + ) + .withDefault("message") + + val stackTraceField: Opts[String] = + Opts + .option[String]( + "stack-trace-field", + help = "Override default stack trace field name" + ) + .withDefault("stack_trace") + + val loggerNameField: Opts[String] = + Opts + .option[String]( + "logger-name-field", + help = "Override default logger name field name" + ) + .withDefault("logger_name") + + val threadNameField: Opts[String] = + Opts + .option[String]( + "thread-name-field", + help = "Override default thread name field name" + ) + .withDefault("thread_name") + + def fieldNamesConfig: Opts[FieldNamesConfig] = + ( + timestampField, + levelField, + messageField, + stackTraceField, + loggerNameField, + threadNameField + ).mapN(FieldNamesConfig.apply) + def validateConfigGrep(string: String): ValidatedNel[String, ConfigGrep] = string.split(":", 2) match { case Array(key, value) => @@ -34,7 +87,7 @@ object DeclineOpts { val grepConfig: Opts[List[ConfigGrep]] = Opts .options[String]("grep", "", metavar = "key:value") - .mapValidated { lines => lines.traverse(validateConfigGrep) } + .mapValidated(lines => lines.traverse(validateConfigGrep)) .orEmpty val filterConfig: Opts[Option[QueryAST]] = Opts @@ -53,7 +106,7 @@ object DeclineOpts { .orNone def timestampConfig: Opts[TimestampConfig] = - (timestampField, timestampAfter, timestampBefore) + (timestampAfter, timestampBefore) .mapN(TimestampConfig.apply) val configFile: Opts[Option[ConfigFile]] = Opts @@ -62,25 +115,34 @@ object DeclineOpts { .orNone val config: Opts[Config] = - (configFile, timestampConfig, grepConfig, filterConfig, formatIn, formatOut) - .mapN { - case ( + ( + configFile, + fieldNamesConfig, + timestampConfig, + grepConfig, + filterConfig, + formatIn, + formatOut + ).mapN { + case ( configFile, + fieldNamesConfig, timestampConfig, grepConfig, filterConfig, formatIn, formatOut ) => - Config( - configFile = configFile, - timestamp = timestampConfig, - grep = grepConfig, - filter = 
filterConfig, - formatIn = formatIn, - formatOut = formatOut - ) - } + Config( + configFile = configFile, + fieldNames = fieldNamesConfig, + timestamp = timestampConfig, + grep = grepConfig, + filter = filterConfig, + formatIn = formatIn, + formatOut = formatOut + ) + } val command: Command[Config] = Command( name = "json-log-viewer", diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/FieldNamesConfig.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/FieldNamesConfig.scala new file mode 100644 index 0000000..f900de1 --- /dev/null +++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/FieldNamesConfig.scala @@ -0,0 +1,10 @@ +package ru.d10xa.jsonlogviewer.decline + +final case class FieldNamesConfig( + timestampFieldName: String, + levelFieldName: String, + messageFieldName: String, + stackTraceFieldName: String, + loggerNameFieldName: String, + threadNameFieldName: String +) diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/TimestampConfig.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/TimestampConfig.scala index eb30de3..7107704 100644 --- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/TimestampConfig.scala +++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/TimestampConfig.scala @@ -3,7 +3,6 @@ package ru.d10xa.jsonlogviewer.decline import java.time.ZonedDateTime final case class TimestampConfig( - fieldName: String, // TODO move to FieldNames config after: Option[ZonedDateTime], before: Option[ZonedDateTime] ) diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/ConfigYaml.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/ConfigYaml.scala index 5663a02..1ecb219 100644 --- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/ConfigYaml.scala +++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/ConfigYaml.scala @@ -1,8 +1,9 @@ package ru.d10xa.jsonlogviewer.decline.yaml case class ConfigYaml( + fieldNames: Option[FieldNames], feeds: Option[List[Feed]] ) object ConfigYaml: - val empty: ConfigYaml = ConfigYaml(None) + val empty: ConfigYaml = ConfigYaml(None, None) diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/Feed.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/Feed.scala index f60615d..02b19f4 100644 --- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/Feed.scala +++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/Feed.scala @@ -5,10 +5,11 @@ import ru.d10xa.jsonlogviewer.query.QueryAST case class Feed( name: Option[String], - commands: List[String], + commands: List[String], // TODO option inlineInput: Option[String], filter: Option[QueryAST], formatIn: Option[FormatIn], + fieldNames: Option[FieldNames], rawInclude: Option[List[String]], rawExclude: Option[List[String]], excludeFields: Option[List[String]] diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/FieldNames.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/FieldNames.scala new file mode 100644 index 0000000..178f243 --- /dev/null +++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/FieldNames.scala @@ -0,0 +1,10 @@ +package ru.d10xa.jsonlogviewer.decline.yaml + +case class FieldNames( + timestamp: 
Option[String], + level: Option[String], + message: Option[String], + stackTrace: Option[String], + loggerName: Option[String], + threadName: Option[String] +) diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/formatout/ColorLineFormatter.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/formatout/ColorLineFormatter.scala index d94b734..712214a 100644 --- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/formatout/ColorLineFormatter.scala +++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/formatout/ColorLineFormatter.scala @@ -3,13 +3,15 @@ package ru.d10xa.jsonlogviewer.formatout import fansi.ErrorMode.Strip import fansi.EscapeAttr import fansi.Str -import ru.d10xa.jsonlogviewer.HardcodedFieldNames._ +import ru.d10xa.jsonlogviewer.config.ResolvedConfig import ru.d10xa.jsonlogviewer.OutputLineFormatter import ru.d10xa.jsonlogviewer.ParseResult -import ru.d10xa.jsonlogviewer.decline.Config -class ColorLineFormatter(c: Config, feedName: Option[String], excludeFields: Option[List[String]]) - extends OutputLineFormatter: +class ColorLineFormatter( + config: ResolvedConfig, + feedName: Option[String], + excludeFields: Option[List[String]] +) extends OutputLineFormatter: private val strEmpty: Str = Str("") private val strSpace: Str = Str(" ") private val strNewLine: Str = Str("\n") @@ -26,55 +28,62 @@ class ColorLineFormatter(c: Config, feedName: Option[String], excludeFields: Opt case _ => fansi.Color.White def strLevel(levelOpt: Option[String], colorAttr: EscapeAttr): Seq[Str] = - if (shouldExcludeField(levelFieldName)) Nil - else levelOpt match - case Some(level) => strSpace :: colorAttr(s"[${level.ansiStrip}]") :: Nil - case None => Nil + if (shouldExcludeField(config.fieldNames.levelFieldName)) Nil + else + levelOpt match + case Some(level) => + strSpace :: colorAttr(s"[${level.ansiStrip}]") :: Nil + case None => Nil def strMessage(messageOpt: Option[String], colorAttr: EscapeAttr): Seq[Str] = - if (shouldExcludeField(messageFieldName)) Nil - else messageOpt match - case Some(message) => strSpace :: colorAttr(message.ansiStrip) :: Nil - case None => Nil + if (shouldExcludeField(config.fieldNames.messageFieldName)) Nil + else + messageOpt match + case Some(message) => strSpace :: colorAttr(message.ansiStrip) :: Nil + case None => Nil def strStackTrace( stackTraceOpt: Option[String], colorAttr: EscapeAttr ): Seq[Str] = - if (shouldExcludeField(stackTraceFieldName)) Nil - else stackTraceOpt match - case Some(s) => strNewLine :: colorAttr(s.ansiStrip) :: Nil - case None => Nil + if (shouldExcludeField(config.fieldNames.stackTraceFieldName)) Nil + else + stackTraceOpt match + case Some(s) => strNewLine :: colorAttr(s.ansiStrip) :: Nil + case None => Nil def strLoggerName( loggerNameOpt: Option[String], colorAttr: EscapeAttr ): Seq[Str] = - if (shouldExcludeField(loggerNameFieldName)) Nil - else loggerNameOpt match - case Some(loggerName) => - strSpace :: colorAttr(loggerName.ansiStrip) :: Nil - case None => Nil + if (shouldExcludeField(config.fieldNames.loggerNameFieldName)) Nil + else + loggerNameOpt match + case Some(loggerName) => + strSpace :: colorAttr(loggerName.ansiStrip) :: Nil + case None => Nil def strTimestamp( timestampOpt: Option[String], colorAttr: EscapeAttr ): Seq[Str] = - if (shouldExcludeField(c.timestamp.fieldName)) Nil - else timestampOpt match - case Some(timestamp) => - strSpace :: colorAttr(timestamp.ansiStrip) :: Nil - case None => Nil + if (shouldExcludeField(config.fieldNames.timestampFieldName)) 
Nil + else + timestampOpt match + case Some(timestamp) => + strSpace :: colorAttr(timestamp.ansiStrip) :: Nil + case None => Nil def strThreadName( threadNameOpt: Option[String], colorAttr: EscapeAttr ): Seq[Str] = - if (shouldExcludeField(threadNameFieldName)) Nil - else threadNameOpt match - case Some(threadName) => - strSpace :: colorAttr(s"[${threadName.ansiStrip}]") :: Nil - case None => Nil + if (shouldExcludeField(config.fieldNames.threadNameFieldName)) Nil + else + threadNameOpt match + case Some(threadName) => + strSpace :: colorAttr(s"[${threadName.ansiStrip}]") :: Nil + case None => Nil def strOtherAttributes( otherAttributes: Map[String, String], @@ -83,7 +92,7 @@ class ColorLineFormatter(c: Config, feedName: Option[String], excludeFields: Opt val filteredAttributes = otherAttributes.filterNot { case (key, _) => shouldExcludeField(key) } - + filteredAttributes match case m if m.isEmpty => Nil case m => @@ -103,24 +112,28 @@ class ColorLineFormatter(c: Config, feedName: Option[String], excludeFields: Opt def strPrefix(s: Option[String]): Seq[Str] = if (shouldExcludeField("prefix")) Nil - else s match - case Some(prefix) => - fansi.Color.White(prefix.ansiStrip) :: strSpace :: Nil - case None => Nil + else + s match + case Some(prefix) => + fansi.Color.White(prefix.ansiStrip) :: strSpace :: Nil + case None => Nil + // TODO fix unused def strFeedName(s: Option[String]): Seq[Str] = if (shouldExcludeField("feed_name")) Nil - else s match - case Some(feedName) => - fansi.Color.White(feedName.ansiStrip) :: strSpace :: Nil - case None => Nil + else + s match + case Some(feedName) => + fansi.Color.White(feedName.ansiStrip) :: strSpace :: Nil + case None => Nil def strPostfix(s: Option[String]): Seq[Str] = if (shouldExcludeField("postfix")) Nil - else s match - case Some(postfix) => - strSpace :: fansi.Color.White(postfix.ansiStrip) :: Nil - case None => Nil + else + s match + case Some(postfix) => + strSpace :: fansi.Color.White(postfix.ansiStrip) :: Nil + case None => Nil override def formatLine(p: ParseResult): Str = p.parsed match diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/logfmt/LogfmtLogLineParser.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/logfmt/LogfmtLogLineParser.scala index db9f825..8bc19ad 100644 --- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/logfmt/LogfmtLogLineParser.scala +++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/logfmt/LogfmtLogLineParser.scala @@ -1,14 +1,18 @@ package ru.d10xa.jsonlogviewer.logfmt -import ru.d10xa.jsonlogviewer.HardcodedFieldNames.* +import ru.d10xa.jsonlogviewer.config.ResolvedConfig import ru.d10xa.jsonlogviewer.LogLineParser import ru.d10xa.jsonlogviewer.ParseResult import ru.d10xa.jsonlogviewer.ParsedLine -import ru.d10xa.jsonlogviewer.decline.Config -class LogfmtLogLineParser(config: Config) extends LogLineParser { +class LogfmtLogLineParser(config: ResolvedConfig) extends LogLineParser { - val timestampFieldName: String = config.timestamp.fieldName + val timestampFieldName: String = config.fieldNames.timestampFieldName + val levelFieldName: String = config.fieldNames.levelFieldName + val messageFieldName: String = config.fieldNames.messageFieldName + val stackTraceFieldName: String = config.fieldNames.stackTraceFieldName + val loggerNameFieldName: String = config.fieldNames.loggerNameFieldName + val threadNameFieldName: String = config.fieldNames.threadNameFieldName val knownFieldNames: Seq[String] = Seq( timestampFieldName, @@ -28,7 +32,7 @@ class 
LogfmtLogLineParser(config: Config) extends LogLineParser { raw = s, parsed = Some( ParsedLine( - timestamp = res.get(config.timestamp.fieldName), + timestamp = res.get(timestampFieldName), level = res.get(levelFieldName), message = if other.nonEmpty then Some(other) else None, stackTrace = res.get(stackTraceFieldName), @@ -41,12 +45,10 @@ class LogfmtLogLineParser(config: Config) extends LogLineParser { prefix = None, postfix = None ) - } object LogfmtLogLineParser: - // TODO rename def toMap(ast: LogFmtAst): (Map[String, String], String) = ast match case StatementsAst(asts) => @@ -62,4 +64,4 @@ object LogfmtLogLineParser: (pairs, other.map(_.toString).mkString) case other => (Map.empty, other.toString) -end LogfmtLogLineParser +end LogfmtLogLineParser \ No newline at end of file diff --git a/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/FieldNamesMergeTest.scala b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/FieldNamesMergeTest.scala new file mode 100644 index 0000000..c647abf --- /dev/null +++ b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/FieldNamesMergeTest.scala @@ -0,0 +1,59 @@ +package ru.d10xa.jsonlogviewer + +import munit.FunSuite +import ru.d10xa.jsonlogviewer.decline.yaml.FieldNames +import ru.d10xa.jsonlogviewer.decline.FieldNamesConfig +import ru.d10xa.jsonlogviewer.config.ConfigResolver + +class FieldNamesMergeTest extends FunSuite { + + val globalFieldNames: FieldNamesConfig = FieldNamesConfig( + timestampFieldName = "@timestamp", + levelFieldName = "level", + messageFieldName = "message", + stackTraceFieldName = "stack_trace", + loggerNameFieldName = "logger_name", + threadNameFieldName = "thread_name" + ) + + test("mergeFieldNames combines global and None") { + val noFeedFieldNames = + ConfigResolver.mergeFieldNames(globalFieldNames, None) + assertEquals(noFeedFieldNames, globalFieldNames) + } + + test("mergeFieldNames combines global and feed configs") { + val partialFeedFieldNames = ConfigResolver.mergeFieldNames( + globalFieldNames, + Some( + FieldNames( + timestamp = Some("ts"), + level = Some("severity"), + message = None, + stackTrace = None, + loggerName = None, + threadName = None + ) + ) + ) + + assertEquals(partialFeedFieldNames.timestampFieldName, "ts") + assertEquals(partialFeedFieldNames.levelFieldName, "severity") + assertEquals( + partialFeedFieldNames.messageFieldName, + "message" + ) + assertEquals( + partialFeedFieldNames.stackTraceFieldName, + "stack_trace" + ) + assertEquals( + partialFeedFieldNames.loggerNameFieldName, + "logger_name" + ) + assertEquals( + partialFeedFieldNames.threadNameFieldName, + "thread_name" + ) + } +} \ No newline at end of file diff --git a/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/LogViewerStreamIntegrationTest.scala b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/LogViewerStreamIntegrationTest.scala new file mode 100644 index 0000000..1b97e2e --- /dev/null +++ b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/LogViewerStreamIntegrationTest.scala @@ -0,0 +1,282 @@ +package ru.d10xa.jsonlogviewer + +import cats.effect.IO +import cats.effect.Ref +import fs2.concurrent.Channel +import fs2.Stream +import munit.CatsEffectSuite +import ru.d10xa.jsonlogviewer.decline.yaml.ConfigYaml +import ru.d10xa.jsonlogviewer.decline.yaml.Feed +import ru.d10xa.jsonlogviewer.decline.yaml.FieldNames +import ru.d10xa.jsonlogviewer.decline.Config +import ru.d10xa.jsonlogviewer.decline.FieldNamesConfig +import ru.d10xa.jsonlogviewer.decline.TimestampConfig 
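+// NOTE: QueryCompiler compiles the SQL-like filter strings used in these tests.
+// Its .toOption is assumed to discard any compilation error, which is acceptable
+// here because the queries are hard-coded and known to be valid.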
+import ru.d10xa.jsonlogviewer.query.QueryCompiler + +import scala.concurrent.duration.* + +class LogViewerStreamIntegrationTest extends CatsEffectSuite { + + // Sample log entries with different formats + val infoLog = + """{"@timestamp":"2023-01-01T10:00:00Z","level":"INFO","message":"Info message","logger_name":"TestLogger","thread_name":"main"}""" + val errorLog = + """{"@timestamp":"2023-01-01T12:00:00Z","level":"ERROR","message":"Error message","logger_name":"TestLogger","thread_name":"main"}""" + val customLog = + """{"ts":"2023-01-01T12:00:00Z","severity":"ERROR","msg":"Custom message","logger_name":"TestLogger","thread_name":"main"}""" + + // Default configuration + val baseConfig = Config( + configFile = None, + fieldNames = FieldNamesConfig( + timestampFieldName = "@timestamp", + levelFieldName = "level", + messageFieldName = "message", + stackTraceFieldName = "stack_trace", + loggerNameFieldName = "logger_name", + threadNameFieldName = "thread_name" + ), + timestamp = TimestampConfig(None, None), + grep = List.empty, + filter = None, + formatIn = Some(Config.FormatIn.Json), + formatOut = Some(Config.FormatOut.Raw) + ) + + test("config filters should update during live reload") { + // Create config with INFO filter + val infoFilter = QueryCompiler("level = 'INFO'").toOption + val initialConfig = ConfigYaml( + fieldNames = None, + feeds = Some( + List( + Feed( + name = Some("test-feed"), + commands = List.empty, + inlineInput = None, + filter = infoFilter, + formatIn = None, + fieldNames = None, + rawInclude = None, + rawExclude = None, + excludeFields = None + ) + ) + ) + ) + + // Create updated config with ERROR filter + val errorFilter = QueryCompiler("level = 'ERROR'").toOption + val updatedConfig = initialConfig.copy( + feeds = initialConfig.feeds.map(_.map(_.copy(filter = errorFilter))) + ) + + // Results collector + val results = scala.collection.mutable.ArrayBuffer.empty[String] + + for { + // Initialize config reference with initial configuration + configRef <- Ref.of[IO, Option[ConfigYaml]](Some(initialConfig)) + + // Setup input channel for test logs + logInputChannel <- Channel.unbounded[IO, String] + + // Create test stream implementation + testStreamImpl = new StdInLinesStream { + override def stdinLinesStream: Stream[IO, String] = + logInputChannel.stream + } + + // Save original implementation and use test implementation + originalImpl = LogViewerStream.getStdInLinesStreamImpl + _ <- IO(LogViewerStream.setStdInLinesStreamImpl(testStreamImpl)) + + // Start stream processing in background + streamFiber <- LogViewerStream + .stream(baseConfig, configRef) + .evalTap(result => IO(results.append(result))) + .compile + .drain + .start + + // Wait for stream initialization + _ <- IO.sleep(100.millis) + + // Phase 1: Test initial INFO filter + _ <- logInputChannel.send(infoLog) + _ <- logInputChannel.send(errorLog) + _ <- IO.sleep(200.millis) + + // Collect results and clear buffer + initialResults = results.toList + _ <- IO(results.clear()) + + // Phase 2: Update configuration to ERROR filter + _ <- configRef.set(Some(updatedConfig)) + _ <- IO.sleep(100.millis) + + // Send same logs with updated filter + _ <- logInputChannel.send(infoLog) + _ <- logInputChannel.send(errorLog) + _ <- IO.sleep(200.millis) + + // Collect results after filter update + updatedResults = results.toList + + // Cleanup + _ <- streamFiber.cancel + _ <- IO(LogViewerStream.setStdInLinesStreamImpl(originalImpl)) + + } yield { + // Verify initial INFO filter + assert( + 
initialResults.exists(_.contains("Info message")), + "INFO log should pass the initial INFO filter" + ) + assert( + !initialResults.exists(_.contains("Error message")), + "ERROR log should not pass the initial INFO filter" + ) + + // Verify updated ERROR filter + assert( + !updatedResults.exists(_.contains("Info message")), + "INFO log should not pass the updated ERROR filter" + ) + assert( + updatedResults.exists(_.contains("Error message")), + "ERROR log should pass the updated ERROR filter" + ) + } + } + + test("field mappings should update during live reload") { + // Initial configuration with standard field names + val initialConfig = ConfigYaml( + fieldNames = None, + feeds = Some( + List( + Feed( + name = Some("test-feed"), + commands = List.empty, + inlineInput = None, + filter = None, + formatIn = None, + fieldNames = None, + rawInclude = None, + rawExclude = None, + excludeFields = None + ) + ) + ) + ) + + // Standard config with ERROR filter (works with standard field names) + val errorFilterConfig = baseConfig.copy( + filter = QueryCompiler("level = 'ERROR'").toOption + ) + + // Updated config with custom field names mapping + val updatedConfig = ConfigYaml( + fieldNames = Some( + FieldNames( + timestamp = Some("ts"), + level = Some("severity"), + message = Some("msg"), + stackTrace = None, + loggerName = None, + threadName = None + ) + ), + feeds = initialConfig.feeds + ) + + // Results collector + val results = scala.collection.mutable.ArrayBuffer.empty[String] + + for { + // Initialize config reference with initial configuration + configRef <- Ref.of[IO, Option[ConfigYaml]](Some(initialConfig)) + + // Setup input channel for test logs + logInputChannel <- Channel.unbounded[IO, String] + + // Create test stream implementation + testStreamImpl = new StdInLinesStream { + override def stdinLinesStream: Stream[IO, String] = + logInputChannel.stream + } + + // Save original implementation and use test implementation + originalImpl = LogViewerStream.getStdInLinesStreamImpl + _ <- IO(LogViewerStream.setStdInLinesStreamImpl(testStreamImpl)) + + // Start stream processing in background + streamFiber <- LogViewerStream + .stream(errorFilterConfig, configRef) + .evalTap(result => IO(results.append(result))) + .compile + .drain + .start + + // Wait for stream initialization + _ <- IO.sleep(100.millis) + + // Phase 1: Test with standard field names mapping + _ <- logInputChannel.send(infoLog) + _ <- logInputChannel.send(errorLog) + _ <- logInputChannel.send(customLog) + _ <- IO.sleep(200.millis) + + // Collect results and clear buffer + initialResults = results.toList + _ <- IO(results.clear()) + + // Phase 2: Update configuration to use custom field names + _ <- configRef.set(Some(updatedConfig)) + _ <- IO.sleep(100.millis) + + // Send same logs with updated field mappings + _ <- logInputChannel.send(infoLog) + _ <- logInputChannel.send(errorLog) + _ <- logInputChannel.send(customLog) + _ <- IO.sleep(200.millis) + + // Collect results after field mapping update + updatedResults = results.toList + + // Cleanup + _ <- streamFiber.cancel + _ <- IO(LogViewerStream.setStdInLinesStreamImpl(originalImpl)) + + } yield { + // Verify initial field name mapping + assert( + !initialResults.exists(_.contains("Info message")), + "INFO log should not pass ERROR filter" + ) + assert( + initialResults.exists(_.contains("Error message")), + "Standard ERROR log should pass the filter" + ) + assert( + !initialResults.exists(_.contains("Custom message")), + "Custom log should not pass filter with initial 
mapping" + ) + + // Verify updated field name mapping + assert( + !updatedResults.exists(_.contains("Info message")), + "INFO log should still not pass ERROR filter" + ) + assert( + updatedResults.exists(_.contains("Error message")), + "Standard ERROR log should still pass the filter" + ) + assert( + updatedResults.exists(_.contains("Custom message")), + "Custom log should now pass filter with updated mapping" + ) + } + } +} \ No newline at end of file diff --git a/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/LogViewerStreamLiveReloadTest.scala b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/LogViewerStreamLiveReloadTest.scala new file mode 100644 index 0000000..2fe8ce8 --- /dev/null +++ b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/LogViewerStreamLiveReloadTest.scala @@ -0,0 +1,158 @@ +package ru.d10xa.jsonlogviewer + +import cats.effect.unsafe.implicits.global +import cats.effect.IO +import cats.effect.Ref +import fs2.Stream +import munit.CatsEffectSuite +import ru.d10xa.jsonlogviewer.decline.yaml.ConfigYaml +import ru.d10xa.jsonlogviewer.decline.yaml.Feed +import ru.d10xa.jsonlogviewer.decline.yaml.FieldNames +import ru.d10xa.jsonlogviewer.decline.Config +import ru.d10xa.jsonlogviewer.decline.FieldNamesConfig +import ru.d10xa.jsonlogviewer.decline.TimestampConfig +import ru.d10xa.jsonlogviewer.query.QueryCompiler + +/** + * Tests that verify LogViewerStream's live config reload functionality + */ +class LogViewerStreamLiveReloadTest extends CatsEffectSuite { + + // Test logs + val infoLog = """{"@timestamp":"2023-01-01T10:00:00Z","level":"INFO","message":"Test message","logger_name":"TestLogger","thread_name":"main"}""" + val errorLog = """{"@timestamp":"2023-01-01T12:00:00Z","level":"ERROR","message":"Error message","logger_name":"TestLogger","thread_name":"main"}""" + val customFormatLog = """{"ts":"2023-01-01T12:00:00Z","severity":"ERROR","msg":"Test message","logger_name":"TestLogger","thread_name":"main"}""" + + val baseConfig = Config( + configFile = None, + fieldNames = FieldNamesConfig( + timestampFieldName = "@timestamp", + levelFieldName = "level", + messageFieldName = "message", + stackTraceFieldName = "stack_trace", + loggerNameFieldName = "logger_name", + threadNameFieldName = "thread_name" + ), + timestamp = TimestampConfig(None, None), + grep = List.empty, + filter = None, + formatIn = Some(Config.FormatIn.Json), + formatOut = Some(Config.FormatOut.Raw) + ) + + test("live reload should update filters during execution") { + // Initial config with INFO filter + val infoFilter = QueryCompiler("level = 'INFO'").toOption + val initialConfig = ConfigYaml( + fieldNames = None, + feeds = Some(List(Feed( + name = Some("test-feed"), + commands = List.empty, + inlineInput = None, + filter = infoFilter, + formatIn = None, + fieldNames = None, + rawInclude = None, + rawExclude = None, + excludeFields = None + ))) + ) + + // Updated config with ERROR filter + val errorFilter = QueryCompiler("level = 'ERROR'").toOption + val updatedConfig = initialConfig.copy( + feeds = initialConfig.feeds.map(_.map(_.copy(filter = errorFilter))) + ) + + for { + // Create config ref with initial value + configRef <- Ref.of[IO, Option[ConfigYaml]](Some(initialConfig)) + + // Process logs with initial config (INFO filter) + infoResults1 <- LogViewerStream.processLineWithRef(infoLog, configRef, baseConfig).compile.toList + errorResults1 <- LogViewerStream.processLineWithRef(errorLog, configRef, baseConfig).compile.toList + + // Update config to use ERROR 
filter
+      _ <- configRef.set(Some(updatedConfig))
+
+      // Process logs with updated config (ERROR filter)
+      infoResults2 <- LogViewerStream.processLineWithRef(infoLog, configRef, baseConfig).compile.toList
+      errorResults2 <- LogViewerStream.processLineWithRef(errorLog, configRef, baseConfig).compile.toList
+    } yield {
+      // With initial config (INFO filter):
+      assert(infoResults1.nonEmpty, "INFO log should pass initial INFO filter")
+      assert(errorResults1.isEmpty, "ERROR log should not pass initial INFO filter")
+
+      // With updated config (ERROR filter):
+      assert(infoResults2.isEmpty, "INFO log should not pass updated ERROR filter")
+      assert(errorResults2.nonEmpty, "ERROR log should pass updated ERROR filter")
+    }
+  }
+
+  test("live reload should update field names mapping") {
+    // Initial config with standard field names
+    val initialConfig = ConfigYaml(
+      fieldNames = None,
+      feeds = Some(List(Feed(
+        name = Some("test-feed"),
+        commands = List.empty,
+        inlineInput = None,
+        filter = None,
+        formatIn = None,
+        fieldNames = None,
+        rawInclude = None,
+        rawExclude = None,
+        excludeFields = None
+      )))
+    )
+
+    // Updated config with custom field names
+    val customFieldConfig = ConfigYaml(
+      fieldNames = Some(FieldNames(
+        timestamp = Some("ts"),
+        level = Some("severity"),
+        message = Some("msg"),
+        stackTrace = None,
+        loggerName = None,
+        threadName = None
+      )),
+      feeds = initialConfig.feeds
+    )
+
+    // Both filters compile the same query on the standard 'level' key; the custom log can only satisfy it once 'severity' is mapped onto 'level'
+    val levelErrorFilter = QueryCompiler("level = 'ERROR'").toOption
+    val severityErrorFilter = QueryCompiler("level = 'ERROR'").toOption
+    val configWithLevelFilter = baseConfig.copy(filter = levelErrorFilter)
+    val configWithSeverityFilter = baseConfig.copy(filter = severityErrorFilter)
+
+    for {
+      // Create config ref with initial value
+      configRef <- Ref.of[IO, Option[ConfigYaml]](Some(initialConfig))
+
+      // With initial config:
+      // Standard log with level=ERROR should pass level filter
+      standardLevelResults <- LogViewerStream.processLineWithRef(errorLog, configRef, configWithLevelFilter).compile.toList
+      // Custom log with severity=ERROR should not pass level filter (field not recognized)
+      customLevelResults <- LogViewerStream.processLineWithRef(customFormatLog, configRef, configWithLevelFilter).compile.toList
+      // Custom log with severity=ERROR should not pass severity filter (field not mapped)
+      customSeverityResults1 <- LogViewerStream.processLineWithRef(customFormatLog, configRef, configWithSeverityFilter).compile.toList
+
+      // Update config to map custom field names
+      _ <- configRef.set(Some(customFieldConfig))
+
+      // With updated config:
+      // Standard log with level=ERROR should still pass level filter
+      standardLevelResults2 <- LogViewerStream.processLineWithRef(errorLog, configRef, configWithLevelFilter).compile.toList
+      // Custom log should now pass severity filter (field properly mapped)
+      customSeverityResults2 <- LogViewerStream.processLineWithRef(customFormatLog, configRef, configWithSeverityFilter).compile.toList
+    } yield {
+      // Before field mapping update:
+      assert(standardLevelResults.nonEmpty, "Standard log should pass level filter with initial config")
+      assert(customLevelResults.isEmpty, "Custom log should not pass level filter with initial config")
+      assert(customSeverityResults1.isEmpty, "Custom log should not pass severity filter before field mapping update")
+      // After field mapping update:
+      assert(standardLevelResults2.nonEmpty, "Standard log should still pass level filter after update")
+      assert(customSeverityResults2.nonEmpty, "Custom log should pass severity filter after field mapping update")
+    }
+  }
+}
\ No newline at end of file diff --git a/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/ParseResultKeysGetByKeyTest.scala b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/ParseResultKeysGetByKeyTest.scala new file mode 100644 index 0000000..e6c320d --- /dev/null +++ b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/ParseResultKeysGetByKeyTest.scala @@ -0,0 +1,115 @@ +package ru.d10xa.jsonlogviewer + +import munit.FunSuite +import ru.d10xa.jsonlogviewer.config.ResolvedConfig +import ru.d10xa.jsonlogviewer.decline.FieldNamesConfig + +class ParseResultKeysGetByKeyTest extends FunSuite { + + private val customConfig = ResolvedConfig( + feedName = None, + commands = List.empty, + inlineInput = None, + filter = None, + formatIn = None, + formatOut = None, + fieldNames = FieldNamesConfig( + timestampFieldName = "ts", + levelFieldName = "severity", + messageFieldName = "msg", + stackTraceFieldName = "trace", + loggerNameFieldName = "logger", + threadNameFieldName = "thread" + ), + rawInclude = None, + rawExclude = None, + excludeFields = None, + timestampAfter = None, + timestampBefore = None, + grep = List.empty + ) + + private val parseResult = ParseResult( + raw = "raw log line", + parsed = Some( + ParsedLine( + timestamp = Some("2023-01-01T12:00:00Z"), + level = Some("ERROR"), + message = Some("Test message"), + stackTrace = Some("Error trace"), + loggerName = Some("TestLogger"), + threadName = Some("main"), + otherAttributes = Map("custom_field" -> "custom_value") + ) + ), + middle = "middle part", + prefix = Some("prefix"), + postfix = Some("postfix") + ) + + private val parseResultKeys = new ParseResultKeys(customConfig) + + test("should get level value by standard field name even when renamed") { + assertEquals( + parseResultKeys.getByKey(parseResult, "level"), + Some("ERROR"), + "Should get value by standard field name 'level' even when renamed to 'severity'" + ) + } + + test("should get level value by custom field name") { + assertEquals( + parseResultKeys.getByKey(parseResult, "severity"), + Some("ERROR"), + "Should get value by custom field name 'severity'" + ) + } + + test("should get message by standard name") { + assertEquals( + parseResultKeys.getByKey(parseResult, "message"), + Some("Test message"), + "Should get message by standard name" + ) + } + + test("should get message by custom name") { + assertEquals( + parseResultKeys.getByKey(parseResult, "msg"), + Some("Test message"), + "Should get message by custom name" + ) + } + + test("should get timestamp by standard name") { + assertEquals( + parseResultKeys.getByKey(parseResult, "timestamp"), + Some("2023-01-01T12:00:00Z"), + "Should get timestamp by standard name" + ) + } + + test("should get timestamp by custom name") { + assertEquals( + parseResultKeys.getByKey(parseResult, "ts"), + Some("2023-01-01T12:00:00Z"), + "Should get timestamp by custom name" + ) + } + + test("should get stack trace by standard name") { + assertEquals( + parseResultKeys.getByKey(parseResult, "stack_trace"), + Some("Error trace"), + "Should get stack trace by standard name" + ) + } + + test("should get stack trace by custom name") { + assertEquals( + parseResultKeys.getByKey(parseResult, "trace"), + Some("Error trace"), + "Should get stack trace by custom name" + ) + } +} diff --git a/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/ParseResultKeysTest.scala b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/ParseResultKeysTest.scala new file mode 100644 index 0000000..b93f691 --- 
/dev/null +++ b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/ParseResultKeysTest.scala @@ -0,0 +1,130 @@ +package ru.d10xa.jsonlogviewer + +import munit.FunSuite +import ru.d10xa.jsonlogviewer.decline.Config +import ru.d10xa.jsonlogviewer.decline.FieldNamesConfig +import ru.d10xa.jsonlogviewer.decline.TimestampConfig +import ru.d10xa.jsonlogviewer.config.ResolvedConfig + +class ParseResultKeysTest extends FunSuite { + + test("getByKey uses configured field names") { + val standardConfig = ResolvedConfig( + feedName = None, + commands = List.empty, + inlineInput = None, + filter = None, + formatIn = None, + formatOut = None, + fieldNames = FieldNamesConfig( + timestampFieldName = "@timestamp", + levelFieldName = "level", + messageFieldName = "message", + stackTraceFieldName = "stack_trace", + loggerNameFieldName = "logger_name", + threadNameFieldName = "thread_name" + ), + rawInclude = None, + rawExclude = None, + excludeFields = None, + timestampAfter = None, + timestampBefore = None, + grep = List.empty + ) + + val customConfig = ResolvedConfig( + feedName = None, + commands = List.empty, + inlineInput = None, + filter = None, + formatIn = None, + formatOut = None, + fieldNames = FieldNamesConfig( + timestampFieldName = "ts", + levelFieldName = "severity", + messageFieldName = "text", + stackTraceFieldName = "exception", + loggerNameFieldName = "logger", + threadNameFieldName = "thread" + ), + rawInclude = None, + rawExclude = None, + excludeFields = None, + timestampAfter = None, + timestampBefore = None, + grep = List.empty + ) + + val parseResult = ParseResult( + raw = "raw log line", + parsed = Some( + ParsedLine( + timestamp = Some("2023-01-01T12:00:00Z"), + level = Some("INFO"), + message = Some("Test message"), + stackTrace = Some("Error trace"), + loggerName = Some("TestLogger"), + threadName = Some("main"), + otherAttributes = Map("custom_field" -> "custom_value") + ) + ), + middle = "middle part", + prefix = Some("prefix"), + postfix = Some("postfix") + ) + + val standardKeys = new ParseResultKeys(standardConfig) + assertEquals( + standardKeys.getByKey(parseResult, "@timestamp"), + Some("2023-01-01T12:00:00Z") + ) + assertEquals(standardKeys.getByKey(parseResult, "level"), Some("INFO")) + assertEquals( + standardKeys.getByKey(parseResult, "message"), + Some("Test message") + ) + assertEquals( + standardKeys.getByKey(parseResult, "stack_trace"), + Some("Error trace") + ) + assertEquals( + standardKeys.getByKey(parseResult, "logger_name"), + Some("TestLogger") + ) + assertEquals( + standardKeys.getByKey(parseResult, "thread_name"), + Some("main") + ) + assertEquals( + standardKeys.getByKey(parseResult, "custom_field"), + Some("custom_value") + ) + assertEquals(standardKeys.getByKey(parseResult, "prefix"), Some("prefix")) + assertEquals(standardKeys.getByKey(parseResult, "postfix"), Some("postfix")) + + val customKeys = new ParseResultKeys(customConfig) + assertEquals( + customKeys.getByKey(parseResult, "ts"), + Some("2023-01-01T12:00:00Z") + ) + assertEquals(customKeys.getByKey(parseResult, "severity"), Some("INFO")) + assertEquals(customKeys.getByKey(parseResult, "text"), Some("Test message")) + assertEquals( + customKeys.getByKey(parseResult, "exception"), + Some("Error trace") + ) + assertEquals(customKeys.getByKey(parseResult, "logger"), Some("TestLogger")) + assertEquals(customKeys.getByKey(parseResult, "thread"), Some("main")) + + assertEquals(customKeys.getByKey(parseResult, "@timestamp"), Some("2023-01-01T12:00:00Z")) + 
assertEquals(customKeys.getByKey(parseResult, "level"), Some("INFO")) + assertEquals(customKeys.getByKey(parseResult, "message"), Some("Test message")) + + assertEquals( + customKeys.getByKey(parseResult, "custom_field"), + Some("custom_value") + ) + assertEquals(customKeys.getByKey(parseResult, "prefix"), Some("prefix")) + assertEquals(customKeys.getByKey(parseResult, "postfix"), Some("postfix")) + } +} \ No newline at end of file diff --git a/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/TimestampFilterTest.scala b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/TimestampFilterTest.scala index 47d9b7b..8970c66 100644 --- a/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/TimestampFilterTest.scala +++ b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/TimestampFilterTest.scala @@ -40,4 +40,4 @@ class TimestampFilterTest extends CatsEffectSuite { prefix = None, postfix = None ) -} +} \ No newline at end of file diff --git a/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/config/ConfigResolverTest.scala b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/config/ConfigResolverTest.scala new file mode 100644 index 0000000..deeba91 --- /dev/null +++ b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/config/ConfigResolverTest.scala @@ -0,0 +1,144 @@ +package ru.d10xa.jsonlogviewer.config + +import munit.FunSuite +import ru.d10xa.jsonlogviewer.decline.yaml.ConfigYaml +import ru.d10xa.jsonlogviewer.decline.yaml.Feed +import ru.d10xa.jsonlogviewer.decline.yaml.FieldNames +import ru.d10xa.jsonlogviewer.decline.Config +import ru.d10xa.jsonlogviewer.decline.FieldNamesConfig +import ru.d10xa.jsonlogviewer.decline.TimestampConfig + +class ConfigResolverTest extends FunSuite { + + val standardConfig: Config = Config( + configFile = None, + fieldNames = FieldNamesConfig( + timestampFieldName = "@timestamp", + levelFieldName = "level", + messageFieldName = "message", + stackTraceFieldName = "stack_trace", + loggerNameFieldName = "logger_name", + threadNameFieldName = "thread_name" + ), + timestamp = TimestampConfig(None, None), + grep = List.empty, + filter = None, + formatIn = None, + formatOut = None + ) + + test("resolve returns single config when no ConfigYaml provided") { + val resolvedConfigs = ConfigResolver.resolve(standardConfig, None) + + assertEquals(resolvedConfigs.length, 1) + + val resolved = resolvedConfigs.head + assertEquals(resolved.feedName, None) + assertEquals(resolved.commands, List.empty) + assertEquals(resolved.fieldNames, standardConfig.fieldNames) + assertEquals(resolved.filter, None) + assertEquals(resolved.formatIn, None) + assertEquals(resolved.formatOut, None) + } + + test("resolve merges global fieldNames from ConfigYaml") { + val configYaml = ConfigYaml( + fieldNames = Some( + FieldNames( + timestamp = Some("ts"), + level = Some("severity"), + message = None, + stackTrace = None, + loggerName = None, + threadName = None + ) + ), + feeds = None + ) + + val resolvedConfigs = + ConfigResolver.resolve(standardConfig, Some(configYaml)) + + assertEquals(resolvedConfigs.length, 1) + + val resolved = resolvedConfigs.head + assertEquals(resolved.fieldNames.timestampFieldName, "ts") + assertEquals(resolved.fieldNames.levelFieldName, "severity") + assertEquals( + resolved.fieldNames.messageFieldName, + "message" + ) // From standardConfig + } + + test("resolve correctly handles multiple feeds") { + val configYaml = ConfigYaml( + fieldNames = Some( + FieldNames( + timestamp = Some("ts"), + level = 
Some("severity"), + message = None, + stackTrace = None, + loggerName = None, + threadName = None + ) + ), + feeds = Some( + List( + Feed( + name = Some("feed1"), + commands = List("cmd1"), + inlineInput = None, + filter = None, + formatIn = Some(Config.FormatIn.Json), + fieldNames = Some( + FieldNames( + timestamp = None, + level = None, + message = Some("msg"), + stackTrace = None, + loggerName = None, + threadName = None + ) + ), + rawInclude = None, + rawExclude = None, + excludeFields = None + ), + Feed( + name = Some("feed2"), + commands = List("cmd2"), + inlineInput = None, + filter = None, + formatIn = Some(Config.FormatIn.Csv), + fieldNames = None, + rawInclude = None, + rawExclude = None, + excludeFields = None + ) + ) + ) + ) + + val resolvedConfigs = + ConfigResolver.resolve(standardConfig, Some(configYaml)) + + assertEquals(resolvedConfigs.length, 2) + + val feed1 = resolvedConfigs.find(_.feedName.contains("feed1")).get + assertEquals(feed1.commands, List("cmd1")) + assertEquals(feed1.formatIn, Some(Config.FormatIn.Json)) + assertEquals(feed1.fieldNames.timestampFieldName, "ts") // From global + assertEquals(feed1.fieldNames.levelFieldName, "severity") // From global + assertEquals(feed1.fieldNames.messageFieldName, "msg") // From feed1 + + val feed2 = resolvedConfigs.find(_.feedName.contains("feed2")).get + assertEquals(feed2.commands, List("cmd2")) + assertEquals(feed2.formatIn, Some(Config.FormatIn.Csv)) + assertEquals(feed2.fieldNames.timestampFieldName, "ts") // From global + assertEquals(feed2.fieldNames.levelFieldName, "severity") // From global + assertEquals( + feed2.fieldNames.messageFieldName, + "message" + ) // Default, not overridden + } +} diff --git a/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/csv/CsvLogLineParserTest.scala b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/csv/CsvLogLineParserTest.scala index 5a6a43e..6a7c6e7 100644 --- a/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/csv/CsvLogLineParserTest.scala +++ b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/csv/CsvLogLineParserTest.scala @@ -1,31 +1,46 @@ package ru.d10xa.jsonlogviewer.csv import munit.FunSuite +import ru.d10xa.jsonlogviewer.decline.FieldNamesConfig import ru.d10xa.jsonlogviewer.decline.Config -import ru.d10xa.jsonlogviewer.decline.TimestampConfig +import ru.d10xa.jsonlogviewer.config.ResolvedConfig class CsvLogLineParserTest extends FunSuite { - val config: Config = Config( - configFile = None, - timestamp = TimestampConfig( - fieldName = "@timestamp", - after = None, - before = None + val config: ResolvedConfig = ResolvedConfig( + feedName = None, + commands = List.empty, + inlineInput = None, + fieldNames = FieldNamesConfig( + timestampFieldName = "@timestamp", + levelFieldName = "level", + messageFieldName = "message", + stackTraceFieldName = "stack_trace", + loggerNameFieldName = "logger_name", + threadNameFieldName = "thread_name" ), - grep = List.empty, filter = None, formatIn = Some(Config.FormatIn.Csv), - formatOut = None + formatOut = None, + rawInclude = None, + rawExclude = None, + excludeFields = None, + timestampAfter = None, + timestampBefore = None, + grep = List.empty ) test("parse CSV log line with standard headers") { - val headerLine = "@timestamp,level,message,logger_name,thread_name,stack_trace" + val headerLine = + "@timestamp,level,message,logger_name,thread_name,stack_trace" val logLine = "2023-01-01T12:00:00Z,INFO,Test message,Logger,main,null" val parser = CsvLogLineParser(config, headerLine) val 
result = parser.parse(logLine) - assertEquals(result.parsed.flatMap(_.timestamp), Some("2023-01-01T12:00:00Z")) + assertEquals( + result.parsed.flatMap(_.timestamp), + Some("2023-01-01T12:00:00Z") + ) assertEquals(result.parsed.flatMap(_.level), Some("INFO")) assertEquals(result.parsed.flatMap(_.message), Some("Test message")) assertEquals(result.parsed.flatMap(_.loggerName), Some("Logger")) @@ -41,47 +56,85 @@ class CsvLogLineParserTest extends FunSuite { val parser = CsvLogLineParser(config, headerLine) val result = parser.parse(logLine) - assertEquals(result.parsed.flatMap(_.timestamp), Some("2023-01-01T12:00:00Z")) + assertEquals( + result.parsed.flatMap(_.timestamp), + Some("2023-01-01T12:00:00Z") + ) assertEquals(result.parsed.flatMap(_.level), Some("INFO")) assertEquals(result.parsed.flatMap(_.message), Some("Test message")) - assertEquals(result.parsed.exists(_.otherAttributes.contains("custom_field")), true) - assertEquals(result.parsed.flatMap(_.otherAttributes.get("custom_field")), Some("custom value")) + assertEquals( + result.parsed.exists(_.otherAttributes.contains("custom_field")), + true + ) + assertEquals( + result.parsed.flatMap(_.otherAttributes.get("custom_field")), + Some("custom value") + ) } test("parse CSV log line with spaces in header") { - val headerLine = "\"@timestamp\",\"log level\",\"message text\",\"logger name\"" + val headerLine = + "\"@timestamp\",\"log level\",\"message text\",\"logger name\"" val logLine = "2023-01-01T12:00:00Z,INFO,Test message,Logger" val parser = CsvLogLineParser(config, headerLine) val result = parser.parse(logLine) - assertEquals(result.parsed.flatMap(_.timestamp), Some("2023-01-01T12:00:00Z")) + assertEquals( + result.parsed.flatMap(_.timestamp), + Some("2023-01-01T12:00:00Z") + ) assertEquals(result.parsed.flatMap(_.level), None) assertEquals(result.parsed.flatMap(_.message), None) assertEquals(result.parsed.flatMap(_.loggerName), None) - assertEquals(result.parsed.exists(_.otherAttributes.contains("log level")), true) - assertEquals(result.parsed.flatMap(_.otherAttributes.get("log level")), Some("INFO")) - assertEquals(result.parsed.exists(_.otherAttributes.contains("message text")), true) - assertEquals(result.parsed.flatMap(_.otherAttributes.get("message text")), Some("Test message")) - assertEquals(result.parsed.exists(_.otherAttributes.contains("logger name")), true) - assertEquals(result.parsed.flatMap(_.otherAttributes.get("logger name")), Some("Logger")) + assertEquals( + result.parsed.exists(_.otherAttributes.contains("log level")), + true + ) + assertEquals( + result.parsed.flatMap(_.otherAttributes.get("log level")), + Some("INFO") + ) + assertEquals( + result.parsed.exists(_.otherAttributes.contains("message text")), + true + ) + assertEquals( + result.parsed.flatMap(_.otherAttributes.get("message text")), + Some("Test message") + ) + assertEquals( + result.parsed.exists(_.otherAttributes.contains("logger name")), + true + ) + assertEquals( + result.parsed.flatMap(_.otherAttributes.get("logger name")), + Some("Logger") + ) } test("parse CSV log line with missing fields") { - val headerLine = "@timestamp,level,message,logger_name,thread_name,stack_trace,custom_field" + val headerLine = + "@timestamp,level,message,logger_name,thread_name,stack_trace,custom_field" val logLine = "2023-01-01T12:00:00Z,INFO,Test message,Logger" val parser = CsvLogLineParser(config, headerLine) val result = parser.parse(logLine) - assertEquals(result.parsed.flatMap(_.timestamp), Some("2023-01-01T12:00:00Z")) + assertEquals( + 
result.parsed.flatMap(_.timestamp), + Some("2023-01-01T12:00:00Z") + ) assertEquals(result.parsed.flatMap(_.level), Some("INFO")) assertEquals(result.parsed.flatMap(_.message), Some("Test message")) assertEquals(result.parsed.flatMap(_.loggerName), Some("Logger")) assertEquals(result.parsed.flatMap(_.threadName), None) assertEquals(result.parsed.flatMap(_.stackTrace), None) - assertEquals(result.parsed.exists(_.otherAttributes.contains("custom_field")), false) + assertEquals( + result.parsed.exists(_.otherAttributes.contains("custom_field")), + false + ) } test("parse CSV log line with more values than headers") { @@ -91,7 +144,10 @@ class CsvLogLineParserTest extends FunSuite { val parser = CsvLogLineParser(config, headerLine) val result = parser.parse(logLine) - assertEquals(result.parsed.flatMap(_.timestamp), Some("2023-01-01T12:00:00Z")) + assertEquals( + result.parsed.flatMap(_.timestamp), + Some("2023-01-01T12:00:00Z") + ) assertEquals(result.parsed.flatMap(_.level), Some("INFO")) assertEquals(result.parsed.flatMap(_.message), Some("Test message")) assert(result.parsed.exists(_.otherAttributes.isEmpty)) diff --git a/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/csv/CsvProcessingTest.scala b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/csv/CsvProcessingTest.scala new file mode 100644 index 0000000..553721f --- /dev/null +++ b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/csv/CsvProcessingTest.scala @@ -0,0 +1,110 @@ +package ru.d10xa.jsonlogviewer.csv + +import cats.effect.unsafe.implicits.global +import cats.effect.IO +import cats.effect.Ref +import fs2.Stream +import munit.CatsEffectSuite +import ru.d10xa.jsonlogviewer.decline.yaml.ConfigYaml +import ru.d10xa.jsonlogviewer.decline.Config +import ru.d10xa.jsonlogviewer.decline.FieldNamesConfig +import ru.d10xa.jsonlogviewer.decline.TimestampConfig +import ru.d10xa.jsonlogviewer.LogViewerStream +import ru.d10xa.jsonlogviewer.StdInLinesStream + +class CsvProcessingTest extends CatsEffectSuite { + + test("should process CSV format with header line") { + val csvHeader = "@timestamp,level,message,logger_name,thread_name" + val csvLine1 = "2023-01-01T10:00:00Z,INFO,Log message 1,TestLogger,main" + val csvLine2 = "2023-01-01T11:00:00Z,WARN,Log message 2,TestLogger,main" + + val csvConfig = Config( + configFile = None, + fieldNames = FieldNamesConfig( + timestampFieldName = "@timestamp", + levelFieldName = "level", + messageFieldName = "message", + stackTraceFieldName = "stack_trace", + loggerNameFieldName = "logger_name", + threadNameFieldName = "thread_name" + ), + timestamp = TimestampConfig(None, None), + grep = List.empty, + filter = None, + formatIn = Some(Config.FormatIn.Csv), + formatOut = Some(Config.FormatOut.Raw) + ) + + for { + configRef <- Ref.of[IO, Option[ConfigYaml]](None) + + testStreamImpl = new StdInLinesStream { + override def stdinLinesStream: fs2.Stream[IO, String] = + Stream.emits(List(csvHeader, csvLine1, csvLine2)) + } + + _ <- IO(LogViewerStream.setStdInLinesStreamImpl(testStreamImpl)) + + results <- LogViewerStream.stream(csvConfig, configRef).compile.toList + + } yield { + assert(results.nonEmpty, "Results should not be empty") + + assert( + results.exists(_.contains("Log message 1")), + "Results should contain data from first CSV line" + ) + assert( + results.exists(_.contains("Log message 2")), + "Results should contain data from second CSV line" + ) + } + } + + test( + "should handle CSV with different column order than default field names" + ) { + val csvHeader = + 
"message,level,@timestamp,thread_name,logger_name" + val csvLine = + "Custom log message,INFO,2023-01-01T12:00:00Z,worker-1,CustomLogger" + + val csvConfig = Config( + configFile = None, + fieldNames = FieldNamesConfig( + timestampFieldName = "@timestamp", + levelFieldName = "level", + messageFieldName = "message", + stackTraceFieldName = "stack_trace", + loggerNameFieldName = "logger_name", + threadNameFieldName = "thread_name" + ), + timestamp = TimestampConfig(None, None), + grep = List.empty, + filter = None, + formatIn = Some(Config.FormatIn.Csv), + formatOut = Some(Config.FormatOut.Raw) + ) + + for { + configRef <- Ref.of[IO, Option[ConfigYaml]](None) + + testStreamImpl = new StdInLinesStream { + override def stdinLinesStream: fs2.Stream[IO, String] = + Stream.emits(List(csvHeader, csvLine)) + } + + _ <- IO(LogViewerStream.setStdInLinesStreamImpl(testStreamImpl)) + + results <- LogViewerStream.stream(csvConfig, configRef).compile.toList + + } yield { + assert(results.nonEmpty, "Results should not be empty") + assert( + results.exists(_.contains("Custom log message")), + "Results should contain the message despite different column order" + ) + } + } +} diff --git a/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/query/LogLineQueryPredicateImplTest.scala b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/query/LogLineQueryPredicateImplTest.scala index 69ec777..a724de4 100644 --- a/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/query/LogLineQueryPredicateImplTest.scala +++ b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/query/LogLineQueryPredicateImplTest.scala @@ -1,13 +1,14 @@ package ru.d10xa.jsonlogviewer.query +import ru.d10xa.jsonlogviewer.config.ResolvedConfig +import ru.d10xa.jsonlogviewer.decline.Config import ru.d10xa.jsonlogviewer.decline.Config.FormatIn +import ru.d10xa.jsonlogviewer.decline.FieldNamesConfig import ru.d10xa.jsonlogviewer.LogLineQueryPredicateImpl import ru.d10xa.jsonlogviewer.LogLineQueryPredicateImpl.likeContains import ru.d10xa.jsonlogviewer.ParseResult import ru.d10xa.jsonlogviewer.ParseResultKeys import ru.d10xa.jsonlogviewer.ParsedLine -import ru.d10xa.jsonlogviewer.decline.Config -import ru.d10xa.jsonlogviewer.decline.TimestampConfig class LogLineQueryPredicateImplTest extends munit.FunSuite { @@ -81,20 +82,30 @@ class LogLineQueryPredicateImplTest extends munit.FunSuite { postfix = None ) - private val config: Config = Config( - configFile = None, - timestamp = TimestampConfig( - fieldName = "@timestamp", - None, - None + private val resolvedConfig: ResolvedConfig = ResolvedConfig( + feedName = None, + commands = List.empty, + inlineInput = None, + fieldNames = FieldNamesConfig( + timestampFieldName = "@timestamp", + levelFieldName = "level", + messageFieldName = "message", + stackTraceFieldName = "stack_trace", + loggerNameFieldName = "logger_name", + threadNameFieldName = "thread_name" ), - grep = List.empty, filter = None, formatIn = None, - formatOut = None + formatOut = None, + rawInclude = None, + rawExclude = None, + excludeFields = None, + timestampAfter = None, + timestampBefore = None, + grep = List.empty ) - private lazy val parseResultKeys = new ParseResultKeys(config = config) + private lazy val parseResultKeys = new ParseResultKeys(resolvedConfig) private def messageLike(s: String): LogLineQueryPredicateImpl = val le = LikeExpr(StrIdentifier("message"), StrLiteral(s), false) @@ -106,6 +117,7 @@ class LogLineQueryPredicateImplTest extends munit.FunSuite { ): LogLineQueryPredicateImpl = 
val le = LikeExpr(StrIdentifier("stack_trace"), StrLiteral(s), negate)
     new LogLineQueryPredicateImpl(le, parseResultKeys)
+
   private def customFieldLike(
     s: String,
     negate: Boolean

From b0d6688274701b296143e049c473dc647d6f1572 Mon Sep 17 00:00:00 2001
From: Andrey Stolyarov
Date: Sat, 22 Mar 2025 21:09:38 +0300
Subject: [PATCH 2/4] Fix LogViewerStream: use inline input for a single feed

---
 .../ru/d10xa/jsonlogviewer/LogViewerStream.scala | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogViewerStream.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogViewerStream.scala
index 5394305..da3b60b 100644
--- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogViewerStream.scala
+++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogViewerStream.scala
@@ -66,7 +66,16 @@ object LogViewerStream {
       }
       Stream.emits(feedStreams).parJoin(feedStreams.size)
     } else {
-      processStreamWithConfig(stdinLinesStream, resolvedConfigs.head)
+      val resolvedConfig = resolvedConfigs.head
+      val inputStream = if (resolvedConfig.inlineInput.isDefined) {
+        commandsAndInlineInputToStream(
+          resolvedConfig.commands,
+          resolvedConfig.inlineInput
+        )
+      } else {
+        stdinLinesStream
+      }
+      processStreamWithConfig(inputStream, resolvedConfig)
     }
 
     finalStream.intersperse("\n").append(Stream.emit("\n"))

From db944146ebb2e5f17083355ae93628fc50e0b904 Mon Sep 17 00:00:00 2001
From: Andrey Stolyarov
Date: Sat, 22 Mar 2025 21:15:56 +0300
Subject: [PATCH 3/4] Fix ViewElement: add missing fieldNames to ConfigYaml

---
 .../src/main/scala/ru/d10xa/jsonlogviewer/ViewElement.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/frontend-laminar/src/main/scala/ru/d10xa/jsonlogviewer/ViewElement.scala b/frontend-laminar/src/main/scala/ru/d10xa/jsonlogviewer/ViewElement.scala
index 2dea52a..adec23b 100644
--- a/frontend-laminar/src/main/scala/ru/d10xa/jsonlogviewer/ViewElement.scala
+++ b/frontend-laminar/src/main/scala/ru/d10xa/jsonlogviewer/ViewElement.scala
@@ -26,6 +26,7 @@ object ViewElement {
 
   def makeConfigYamlForInlineInput(string: String, config: Config): ConfigYaml =
     ConfigYaml(
+      fieldNames = None,
       feeds = Some(
         List(
           Feed(

From 4f1556753a091df7fc5080675292f5aa7771e002 Mon Sep 17 00:00:00 2001
From: Andrey Stolyarov
Date: Sat, 22 Mar 2025 21:22:30 +0300
Subject: [PATCH 4/4] Increase integration test sleeps to 500ms for stability

---
 .../jsonlogviewer/LogViewerStreamIntegrationTest.scala | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/LogViewerStreamIntegrationTest.scala b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/LogViewerStreamIntegrationTest.scala
index 1b97e2e..3b1ca4e 100644
--- a/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/LogViewerStreamIntegrationTest.scala
+++ b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/LogViewerStreamIntegrationTest.scala
@@ -100,12 +100,13 @@ class LogViewerStreamIntegrationTest extends CatsEffectSuite {
         .start
 
       // Wait for stream initialization
-      _ <- IO.sleep(100.millis)
+      _ <- IO.sleep(500.millis)
 
      // Phase 1: Test initial INFO filter
       _ <- logInputChannel.send(infoLog)
+      _ <- IO.sleep(100.millis)
       _ <- logInputChannel.send(errorLog)
-      _ <- IO.sleep(200.millis)
+      _ <- IO.sleep(500.millis)
 
       // Collect results and clear buffer
       initialResults = results.toList
       _ <- IO(results.clear())
 
       // Phase 2: Update configuration to ERROR filter
       _ <- configRef.set(Some(updatedConfig))
-      _ <- 
IO.sleep(100.millis) + _ <- IO.sleep(500.millis) // Send same logs with updated filter _ <- logInputChannel.send(infoLog) + _ <- IO.sleep(100.millis) _ <- logInputChannel.send(errorLog) - _ <- IO.sleep(200.millis) + _ <- IO.sleep(500.millis) // Collect results after filter update updatedResults = results.toList
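+      // The generous sleeps above give the background stream fiber time to
+      // drain the channel before results are snapshotted; 500ms is presumably
+      // enough headroom even on slow CI runners.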