101 changes: 54 additions & 47 deletions sources/cloudwatch_log_group/cloudwatch_log_group_source.go
@@ -177,84 +177,91 @@ func (s *AwsCloudWatchLogGroupSource) Collect(ctx context.Context) error {
 				EndTime:   aws.Int64(endTimeMillis),
 			}
 
-			events, err := s.filterLogEvents(ctx, input)
+			err := s.filterLogEvents(ctx, input)
 			if err != nil {
 				s.errorList = append(s.errorList, fmt.Errorf("failed to filter log events for stream %s: %w", batch, err))
 				continue
 			}
-
-			events = sortFilteredLogEvents(events)
 		}
 
-		// Process each event in the page
-		for _, event := range events {
-			if event.Message == nil || *event.Message == "" {
-				s.errorList = append(s.errorList, fmt.Errorf("empty message in stream %s at timestamp %d", *event.LogStreamName, *event.Timestamp))
-				continue
-			}
+	return nil
+}

slog.Info("Processing stream", "stream", *event.LogStreamName)
// Set up source enrichment fields for the current stream
sourceEnrichmentFields := &schema.SourceEnrichment{
CommonFields: schema.CommonFields{
TpSourceType: AwsCloudwatchLogGroupSourceIdentifier,
TpSourceName: &s.Config.LogGroupName,
TpSourceLocation: event.LogStreamName,
},
}
// We should immediately process the events in the batch, rather than waiting for all batches to be processed
// as this will allow us to start processing the events as soon as they are available
// and avoid waiting for all batches to be processed
func (s *AwsCloudWatchLogGroupSource) processLogEvents(ctx context.Context, events []cwTypes.FilteredLogEvent) error {
for _, event := range events {
if event.Message == nil || *event.Message == "" {
s.errorList = append(s.errorList, fmt.Errorf("empty message in stream %s at timestamp %d", *event.LogStreamName, *event.Timestamp))
continue
}

timestamp := time.UnixMilli(*event.Timestamp)
// Skip already collected events based on state
if !s.CollectionState.ShouldCollect(*event.LogStreamName, timestamp) {
slog.Debug("Skipping already collected event",
"stream", *event.LogStreamName,
"timestamp", timestamp.Format(time.RFC3339))
continue
}
slog.Info("Processing stream", "stream", *event.LogStreamName)
// Set up source enrichment fields for the current stream
sourceEnrichmentFields := &schema.SourceEnrichment{
CommonFields: schema.CommonFields{
TpSourceType: AwsCloudwatchLogGroupSourceIdentifier,
TpSourceName: &s.Config.LogGroupName,
TpSourceLocation: event.LogStreamName,
},
}

row := &types.RowData{
Data: event,
SourceEnrichment: sourceEnrichmentFields,
}
timestamp := time.UnixMilli(*event.Timestamp)
// Skip already collected events based on state
if !s.CollectionState.ShouldCollect(*event.LogStreamName, timestamp) {
slog.Debug("Skipping already collected event",
"stream", *event.LogStreamName,
"timestamp", timestamp.Format(time.RFC3339))
continue
}

// Update collection state with the processed event
if err := s.CollectionState.OnCollected(*event.LogStreamName, timestamp); err != nil {
s.errorList = append(s.errorList, fmt.Errorf("failed to update collection state for stream %s: %w", *event.LogStreamName, err))
continue
}
row := &types.RowData{
Data: event,
SourceEnrichment: sourceEnrichmentFields,
}

// Send the row for processing
if err := s.OnRow(ctx, row); err != nil {
s.errorList = append(s.errorList, fmt.Errorf("error processing row in stream %s: %w", *event.LogStreamName, err))
continue
}
// Update collection state with the processed event
if err := s.CollectionState.OnCollected(*event.LogStreamName, timestamp); err != nil {
s.errorList = append(s.errorList, fmt.Errorf("failed to update collection state for stream %s: %w", *event.LogStreamName, err))
continue
}
}

// Send the row for processing
if err := s.OnRow(ctx, row); err != nil {
s.errorList = append(s.errorList, fmt.Errorf("error processing row in stream %s: %w", *event.LogStreamName, err))
continue
}
}
// Return collected errors if any
if len(s.errorList) > 0 {
return fmt.Errorf("encountered %d errors during log collection: %v", len(s.errorList), s.errorList)
}

return nil
}

 // filterLogEvents retrieves all log events for the given input, handling pagination.
-// Returns a slice of FilteredLogEvent and any error encountered.
-func (s *AwsCloudWatchLogGroupSource) filterLogEvents(ctx context.Context, input *cloudwatchlogs.FilterLogEventsInput) ([]cwTypes.FilteredLogEvent, error) {
-
-	allEvents := []cwTypes.FilteredLogEvent{}
+// Each page of results is processed as soon as it is fetched; returns any error encountered.
+func (s *AwsCloudWatchLogGroupSource) filterLogEvents(ctx context.Context, input *cloudwatchlogs.FilterLogEventsInput) error {
 
 	paginator := cloudwatchlogs.NewFilterLogEventsPaginator(s.client, input)
 	for paginator.HasMorePages() {
 		output, err := paginator.NextPage(ctx)
 		if err != nil {
-			return nil, err
+			return err
 		}
 
-		allEvents = append(allEvents, output.Events...)
+		// Process the events of each page as soon as they are pulled from the API
+		events := sortFilteredLogEvents(output.Events)
+		if processEventErr := s.processLogEvents(ctx, events); processEventErr != nil {
+			slog.Error("error processing events", "error", processEventErr)
+			return processEventErr
+		}
 	}
 
-	return allEvents, nil
+	return nil
 }
 
 // sortFilteredLogEvents sorts a slice of FilteredLogEvent by LogStreamName and Timestamp.
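The body of sortFilteredLogEvents is collapsed above. A minimal sketch of what its doc comment describes, assuming a sort.Slice ordering on the dereferenced LogStreamName and Timestamp fields; the package name is inferred from the file path, and this is not necessarily the PR's exact implementation:

```go
package cloudwatch_log_group // package name assumed from the file path

import (
	"sort"

	cwTypes "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
)

// sortFilteredLogEvents orders events by LogStreamName, then Timestamp.
// Sketch only: it assumes both fields are non-nil on every event.
func sortFilteredLogEvents(events []cwTypes.FilteredLogEvent) []cwTypes.FilteredLogEvent {
	sort.Slice(events, func(i, j int) bool {
		if a, b := *events[i].LogStreamName, *events[j].LogStreamName; a != b {
			return a < b
		}
		return *events[i].Timestamp < *events[j].Timestamp
	})
	return events
}
```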
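The substantive change is moving from accumulate-then-process to per-page processing, which bounds memory on large log groups and lets downstream work start before pagination finishes. A standalone sketch of that pattern against the AWS SDK for Go v2; the log group name and the handlePage callback are illustrative stand-ins, not code from this PR:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
	cwTypes "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
)

// handlePage stands in for processLogEvents: each page is handled as soon
// as it is fetched instead of being accumulated into one large slice.
func handlePage(events []cwTypes.FilteredLogEvent) error {
	for _, e := range events {
		if e.Message != nil {
			fmt.Println(*e.Message)
		}
	}
	return nil
}

func main() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	client := cloudwatchlogs.NewFromConfig(cfg)

	input := &cloudwatchlogs.FilterLogEventsInput{
		LogGroupName: aws.String("/example/log-group"), // illustrative name
	}

	paginator := cloudwatchlogs.NewFilterLogEventsPaginator(client, input)
	for paginator.HasMorePages() {
		page, err := paginator.NextPage(ctx)
		if err != nil {
			log.Fatal(err)
		}
		if err := handlePage(page.Events); err != nil {
			log.Fatal(err)
		}
	}
}
```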
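The diff also leans on the source's collection state (ShouldCollect / OnCollected), whose implementation is not shown here. A hypothetical in-memory version, purely to illustrate the skip-already-collected contract the loop relies on; the real type presumably lives in the plugin SDK and persists across runs:

```go
package main

import (
	"sync"
	"time"
)

// inMemoryCollectionState is a hypothetical stand-in for the real collection
// state: it remembers the newest timestamp collected per stream and rejects
// anything at or before that point.
type inMemoryCollectionState struct {
	mu     sync.Mutex
	latest map[string]time.Time
}

func (c *inMemoryCollectionState) ShouldCollect(stream string, ts time.Time) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	last, ok := c.latest[stream]
	return !ok || ts.After(last)
}

func (c *inMemoryCollectionState) OnCollected(stream string, ts time.Time) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.latest == nil {
		c.latest = make(map[string]time.Time)
	}
	// Only advance the watermark; never move it backwards.
	if ts.After(c.latest[stream]) {
		c.latest[stream] = ts
	}
	return nil
}
```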