-
Notifications
You must be signed in to change notification settings - Fork 21
Introduce support for Loki logentry Stages for event processing #70
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
jeschkies
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work. I have a few suggestions.
| } | ||
| entry = processedEntry | ||
| case <-time.After(timeout): | ||
| return stages.Entry{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you log the timeout? Do we have to cancel anything here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. I was debating whether, if the processing does timeout, we should simply return the unprocessed entry rather than an dropping the result - thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should error instead. Otherwise users will get unexpected log lines.
pkg/promtail.go
Outdated
| stream.Entries = append(stream.Entries, e.entry) | ||
| // Apply pipeline stages to entry | ||
| stageEntry := stages.Entry{ | ||
| Extracted: map[string]interface{}{}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is this used? I'm wondering if we could avoud allocations here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I included this based on this code - from the comment above, it seems some stages expect the Extracted labels to already contain any existing labels. I agree it's not ideal, but not something I think we can necessarily fix?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On a bit of reflection, I've updated this code to skip this entire step if there are no processing stages specified.
jeschkies
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. I have only a nit. Also, we should follo up with some documentation.
pkg/main.go
Outdated
| batchSize, _ = strconv.Atoi(batch) | ||
| } | ||
|
|
||
| pipelineTimeout = 1 * time.Second |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you make a constant for the default at the top?
8a9e387 to
1cf0ea0
Compare
Lambda-Promtail currently offers the ability to manipulate labels using Prometheus's relabelling configuration, but does not yet support Promtail/Loki's wider pipeline stages.
This PR introduces support for them into Lambda-Promtail. It provides support for all the stages defined in the logentry package (Apache 2 licensed), using a JSON configuration format (rather than YAML that pipeline requires, given the existing use of JSON configuration).
The implementation adds a new
LokiStagestype that stores all the relevant stages defined in theLOKI_STAGE_CONFIGSenvironment variable. It updates the batch processing to process each entry with the configured stages and adds the processed entries to the batch that is sent.The current Stages implementation is predominantly designed for asynchronous usage (via channels and Run), but Lambda-Promtail uses synchronous functions. If a synchronous
Processfunction is provided for a stage, it will be used, otherwise we use short-lived channels to achieve the equivalent. Where asynchronous calls are used, aPIPELINE_TIMEOUTconfigures the maximum duration (currently 1s by default) before producing an empty entry.Closes #7