diff --git a/sources/cloudwatch_log_group/cloudwatch_log_group_source.go b/sources/cloudwatch_log_group/cloudwatch_log_group_source.go
index 03a5e53..11c529b 100644
--- a/sources/cloudwatch_log_group/cloudwatch_log_group_source.go
+++ b/sources/cloudwatch_log_group/cloudwatch_log_group_source.go
@@ -177,84 +177,91 @@ func (s *AwsCloudWatchLogGroupSource) Collect(ctx context.Context) error {
 			EndTime:   aws.Int64(endTimeMillis),
 		}
 
-		events, err := s.filterLogEvents(ctx, input)
+		err := s.filterLogEvents(ctx, input)
 		if err != nil {
 			s.errorList = append(s.errorList, fmt.Errorf("failed to filter log events for stream %s: %w", batch, err))
 			continue
 		}
-
-		events = sortFilteredLogEvents(events)
+	}
 
-		// Process each event in the page
-		for _, event := range events {
-			if event.Message == nil || *event.Message == "" {
-				s.errorList = append(s.errorList, fmt.Errorf("empty message in stream %s at timestamp %d", *event.LogStreamName, *event.Timestamp))
-				continue
-			}
+	return nil
+}
 
-			slog.Info("Processing stream", "stream", *event.LogStreamName)
-			// Set up source enrichment fields for the current stream
-			sourceEnrichmentFields := &schema.SourceEnrichment{
-				CommonFields: schema.CommonFields{
-					TpSourceType:     AwsCloudwatchLogGroupSourceIdentifier,
-					TpSourceName:     &s.Config.LogGroupName,
-					TpSourceLocation: event.LogStreamName,
-				},
-			}
+// processLogEvents processes a page of events as soon as it has been pulled
+// from the API, rather than waiting for all pages to be fetched, so that rows
+// can be emitted as soon as events are available.
+func (s *AwsCloudWatchLogGroupSource) processLogEvents(ctx context.Context, events []cwTypes.FilteredLogEvent) error {
+	for _, event := range events {
+		if event.Message == nil || *event.Message == "" {
+			s.errorList = append(s.errorList, fmt.Errorf("empty message in stream %s at timestamp %d", *event.LogStreamName, *event.Timestamp))
+			continue
+		}
 
-			timestamp := time.UnixMilli(*event.Timestamp)
-			// Skip already collected events based on state
-			if !s.CollectionState.ShouldCollect(*event.LogStreamName, timestamp) {
-				slog.Debug("Skipping already collected event",
-					"stream", *event.LogStreamName,
-					"timestamp", timestamp.Format(time.RFC3339))
-				continue
-			}
+		slog.Info("Processing stream", "stream", *event.LogStreamName)
+		// Set up source enrichment fields for the current stream
+		sourceEnrichmentFields := &schema.SourceEnrichment{
+			CommonFields: schema.CommonFields{
+				TpSourceType:     AwsCloudwatchLogGroupSourceIdentifier,
+				TpSourceName:     &s.Config.LogGroupName,
+				TpSourceLocation: event.LogStreamName,
+			},
+		}
 
-			row := &types.RowData{
-				Data:             event,
-				SourceEnrichment: sourceEnrichmentFields,
-			}
+		timestamp := time.UnixMilli(*event.Timestamp)
+		// Skip already collected events based on state
+		if !s.CollectionState.ShouldCollect(*event.LogStreamName, timestamp) {
+			slog.Debug("Skipping already collected event",
+				"stream", *event.LogStreamName,
+				"timestamp", timestamp.Format(time.RFC3339))
+			continue
+		}
 
-			// Update collection state with the processed event
-			if err := s.CollectionState.OnCollected(*event.LogStreamName, timestamp); err != nil {
-				s.errorList = append(s.errorList, fmt.Errorf("failed to update collection state for stream %s: %w", *event.LogStreamName, err))
-				continue
-			}
+		row := &types.RowData{
+			Data:             event,
+			SourceEnrichment: sourceEnrichmentFields,
+		}
 
-			// Send the row for processing
-			if err := s.OnRow(ctx, row); err != nil {
-				s.errorList = append(s.errorList, fmt.Errorf("error processing row in stream %s: %w", *event.LogStreamName, err))
-				continue
-			}
+		// Update collection state with the processed event
+		if err := s.CollectionState.OnCollected(*event.LogStreamName, timestamp); err != nil {
+			s.errorList = append(s.errorList, fmt.Errorf("failed to update collection state for stream %s: %w", *event.LogStreamName, err))
+			continue
 		}
-	}
 
+		// Send the row for processing
+		if err := s.OnRow(ctx, row); err != nil {
+			s.errorList = append(s.errorList, fmt.Errorf("error processing row in stream %s: %w", *event.LogStreamName, err))
+			continue
+		}
+	}
+
 	// Return collected errors if any
 	if len(s.errorList) > 0 {
 		return fmt.Errorf("encountered %d errors during log collection: %v", len(s.errorList), s.errorList)
 	}
-
 	return nil
 }
 
-// filterLogEvents retrieves all log events for the given input, handling pagination.
-// Returns a slice of FilteredLogEvent and any error encountered.
-func (s *AwsCloudWatchLogGroupSource) filterLogEvents(ctx context.Context, input *cloudwatchlogs.FilterLogEventsInput) ([]cwTypes.FilteredLogEvent, error) {
-
-	allEvents := []cwTypes.FilteredLogEvent{}
+// filterLogEvents pages through the log events matching the given input,
+// sorting and processing each page as it is retrieved. Returns any error encountered.
+func (s *AwsCloudWatchLogGroupSource) filterLogEvents(ctx context.Context, input *cloudwatchlogs.FilterLogEventsInput) error {
 	paginator := cloudwatchlogs.NewFilterLogEventsPaginator(s.client, input)
 
 	for paginator.HasMorePages() {
 		output, err := paginator.NextPage(ctx)
 		if err != nil {
-			return nil, err
+			return err
 		}
 
-		allEvents = append(allEvents, output.Events...)
+		// Process the events in this page as soon as they are pulled from the API
+		events := sortFilteredLogEvents(output.Events)
+		processEventErr := s.processLogEvents(ctx, events)
+		if processEventErr != nil {
+			slog.Error("error processing events", "error", processEventErr)
+			return processEventErr
+		}
 	}
 
-	return allEvents, nil
+	return nil
 }
 
 // sortFilteredLogEvents sorts a slice of FilteredLogEvent by LogStreamName and Timestamp.