Skip to content

Commit 954f074

Browse files
committed
Added support to collect the timestamp from cloudwatch source and enrich the row if row does not have the timestamp
1 parent 228a16a commit 954f074

File tree

2 files changed

+292
-48
lines changed

2 files changed

+292
-48
lines changed

tables/lambda_log/lambda_log_mapper.go

Lines changed: 278 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ package lambda_log
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
6-
"log/slog"
7+
"regexp"
8+
"slices"
79
"strconv"
810
"strings"
911
"time"
1012

11-
// "github.com/turbot/tailpipe-plugin-aws/rows"
12-
// "github.com/turbot/tailpipe-plugin-sdk/table"
1313
"github.com/turbot/tailpipe-plugin-sdk/mappers"
1414
)
1515

@@ -20,70 +20,301 @@ func (m *LambdaLogMapper) Identifier() string {
2020
return "lambda_log_mapper"
2121
}
2222

23+
// JSON format for system logs as described in AWS docs
24+
// Contains "time", "type", and "record" fields
25+
// System logs are sometimes known as platform event logs
26+
// https://docs.aws.amazon.com/lambda/latest/dg/monitoring-cloudwatchlogs-advanced.html#monitoring-cloudwatchlogs-logformat
27+
type jsonFormatSystemLog struct {
28+
Time string `json:"time"`
29+
Type string `json:"type"`
30+
Record map[string]interface{} `json:"record"`
31+
}
32+
33+
// JSON format for application logs as described in AWS docs
34+
// Contains "timestamp", "level", "message", and "requestId" fields
35+
// Generated by Lambda functions using supported logging methods
36+
// https://docs.aws.amazon.com/lambda/latest/dg/monitoring-cloudwatchlogs-advanced.html#monitoring-cloudwatchlogs-logformat
37+
type jsonFormatApplicationLog struct {
38+
Timestamp string `json:"timestamp"`
39+
Level string `json:"level"`
40+
Message string `json:"message"`
41+
RequestID string `json:"requestId"`
42+
}
43+
2344
func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[*LambdaLog]) (*LambdaLog, error) {
2445
row := &LambdaLog{}
2546

26-
rawRow := ""
27-
47+
var raw string
2848
switch v := a.(type) {
2949
case []byte:
30-
rawRow = string(v)
50+
raw = string(v)
3151
case string:
32-
rawRow = v
52+
raw = v
3353
case *string:
34-
rawRow = *v
54+
raw = *v
3555
default:
36-
return nil, fmt.Errorf("expected string, got %T", a)
56+
return nil, fmt.Errorf("expected string or []byte, got %T", a)
3757
}
3858

39-
slog.Error("rawRow ---->>>", rawRow)
59+
// First unmarshal into a minimal structure to detect log type
60+
var probe map[string]json.RawMessage
61+
if err := json.Unmarshal([]byte(raw), &probe); err == nil {
62+
// Check for system log keys (platform events with time, type, record structure)
63+
if _, hasType := probe["type"]; hasType {
64+
var systemLog jsonFormatSystemLog
65+
if err := json.Unmarshal([]byte(raw), &systemLog); err != nil {
66+
return nil, fmt.Errorf("error unmarshalling as system log: %w", err)
67+
}
4068

41-
rawRow = strings.TrimSuffix(rawRow, "\n")
42-
fields := strings.Fields(rawRow)
69+
// Parse system log fields based on AWS JSON format for system logs
70+
if t, err := time.Parse(time.RFC3339, systemLog.Time); err == nil {
71+
row.Timestamp = &t
72+
}
73+
row.LogType = &systemLog.Type
74+
if msgBytes, err := json.Marshal(systemLog.Record); err == nil {
75+
message := string(msgBytes)
76+
row.Message = &message
77+
} else {
78+
// fallback in case of marshal error
79+
message := fmt.Sprintf("%v", systemLog.Record)
80+
row.Message = &message
81+
}
4382

44-
switch fields[0] {
45-
case "START", "END":
46-
row.LogType = &fields[0]
47-
row.RequestID = &fields[2]
48-
case "REPORT":
49-
row.LogType = &fields[0]
50-
row.RequestID = &fields[2]
51-
duration, err := strconv.ParseFloat(fields[4], 64)
52-
if err != nil {
53-
return nil, fmt.Errorf("error parsing duration: %w", err)
83+
// Extract specific fields from platform event record
84+
if requestId, ok := systemLog.Record["requestId"].(string); ok {
85+
row.RequestID = &requestId
86+
}
87+
// Extract metrics from platform.report events
88+
if metrics, ok := systemLog.Record["metrics"].(map[string]interface{}); ok {
89+
if v, ok := metrics["durationMs"].(float64); ok {
90+
row.Duration = &v
91+
}
92+
if v, ok := metrics["billedDurationMs"].(float64); ok {
93+
row.BilledDuration = &v
94+
}
95+
if v, ok := metrics["memorySizeMB"].(float64); ok {
96+
mem := int(v)
97+
row.MemorySize = &mem
98+
}
99+
if v, ok := metrics["maxMemoryUsedMB"].(float64); ok {
100+
mem := int(v)
101+
row.MaxMemoryUsed = &mem
102+
}
103+
}
104+
105+
return row, nil
106+
} else if _, hasLevel := probe["level"]; hasLevel {
107+
// Fallback to application log (JSON format with timestamp, level, message, requestId)
108+
var appLog jsonFormatApplicationLog
109+
if err := json.Unmarshal([]byte(raw), &appLog); err != nil {
110+
return nil, fmt.Errorf("error unmarshalling as application log: %w", err)
111+
}
112+
113+
// Parse application log fields based on AWS JSON format for app logs
114+
if t, err := time.Parse(time.RFC3339, appLog.Timestamp); err == nil {
115+
row.Timestamp = &t
116+
}
117+
row.LogLevel = &appLog.Level
118+
row.Message = &appLog.Message
119+
row.RequestID = &appLog.RequestID
54120
}
55-
row.Duration = &duration
56-
billed, err := strconv.ParseFloat(fields[8], 64)
57-
if err != nil {
58-
return nil, fmt.Errorf("error parsing billed duration: %w", err)
121+
} else if len(strings.Fields(raw)) >= 4 && isTimestamp(strings.Fields(raw)[0]) { // plain text application log
122+
// Handle plain text application logs (format: timestamp requestID logLevel message)
123+
// Example: 2024-10-27T19:17:45.586Z 79b4f56e-95b1-4643-9700-2807f4e68189 INFO some log message
124+
fields := strings.Fields(raw)
125+
// Timestamp
126+
if t, err := time.Parse(time.RFC3339, fields[0]); err == nil {
127+
row.Timestamp = &t
59128
}
60-
row.BilledDuration = &billed
61-
mem, err := strconv.Atoi(fields[12])
62-
if err != nil {
63-
return nil, fmt.Errorf("error parsing memory size: %w", err)
129+
// RequestID
130+
var uuidRegex = regexp.MustCompile(`^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$`)
131+
if uuidRegex.MatchString(fields[1]) {
132+
row.RequestID = &fields[1]
64133
}
65-
row.MemorySize = &mem
66-
maxMem, err := strconv.Atoi(fields[17])
134+
// LogLevel
135+
if slices.Contains([]string{"INFO", "DEBUG", "WARN", "ERROR", "FATAL", "TRACE", ""}, fields[2]) {
136+
row.LogLevel = &fields[2]
137+
}
138+
// Message
139+
if len(fields) >= 4 {
140+
msg := strings.Join(fields[3:], " ")
141+
row.Message = &msg
142+
}
143+
} else {
144+
// Handle legacy plain text system logs (START, END, REPORT format)
145+
row, err := parseLambdaPainTextLog(raw)
67146
if err != nil {
68-
return nil, fmt.Errorf("error parsing max memory used: %w", err)
147+
return nil, fmt.Errorf("error parsing lambda pain text log: %w", err)
69148
}
70-
row.MaxMemoryUsed = &maxMem
71-
default:
72-
t := "LOG"
73-
row.LogType = &t
149+
row.Message = &raw
150+
}
74151

75-
ts, err := time.Parse(time.RFC3339, fields[0])
76-
if err != nil {
77-
return nil, fmt.Errorf("error parsing timestamp: %w", err)
152+
return row, nil
153+
}
154+
155+
// parseLambdaPainTextLog handles the legacy plain text format for Lambda system logs
156+
// These include START, END, and REPORT messages that aren't in JSON format
157+
// Example:
158+
// START RequestId: 8b133862-5331-4ded-ac5d-1ad5da5aee81
159+
// END RequestId: 8b133862-5331-4ded-ac5d-1ad5da5aee81
160+
// REPORT RequestId: 8b133862-5331-4ded-ac5d-1ad5da5aee81 Duration: 123.45 ms Billed Duration: 124 ms Memory Size: 128 MB Max Memory Used: 84 MB
161+
func parseLambdaPainTextLog(line string) (*LambdaLog, error) {
162+
log := &LambdaLog{}
163+
now := time.Now().UTC()
164+
log.Timestamp = &now
165+
166+
switch {
167+
case strings.HasPrefix(line, "START RequestId:"):
168+
// Parse START log line
169+
log.LogType = ptr("START")
170+
if id := extractAfter(line, "START RequestId: "); id != "" {
171+
log.RequestID = ptr(strings.Fields(id)[0])
78172
}
79-
row.Timestamp = &ts
173+
log.Message = ptr(line)
80174

81-
row.RequestID = &fields[1]
82-
row.LogLevel = &fields[2]
83-
strip := fmt.Sprintf("%s%s", strings.Join(fields[:3], "\t"), "\t")
84-
stripped := strings.TrimPrefix(rawRow, strip)
85-
row.Message = &stripped
175+
case strings.HasPrefix(line, "END RequestId:"):
176+
// Parse END log line
177+
log.LogType = ptr("END")
178+
if id := extractAfter(line, "END RequestId: "); id != "" {
179+
log.RequestID = ptr(strings.Fields(id)[0])
180+
}
181+
log.Message = ptr(line)
182+
183+
case strings.HasPrefix(line, "REPORT RequestId:"):
184+
// Parse REPORT log line which contains metrics
185+
log.LogType = ptr("REPORT")
186+
log.Message = ptr(line)
187+
188+
// Extract RequestId
189+
if id := extractAfter(line, "REPORT RequestId: "); id != "" {
190+
log.RequestID = ptr(strings.Fields(id)[0])
191+
}
192+
193+
// Extract numeric metrics from REPORT line
194+
if val := extractBetween(line, "Duration: ", " ms"); val != "" {
195+
if f, err := strconv.ParseFloat(val, 64); err == nil {
196+
log.Duration = &f
197+
}
198+
}
199+
if val := extractBetween(line, "Billed Duration: ", " ms"); val != "" {
200+
if f, err := strconv.ParseFloat(val, 64); err == nil {
201+
log.BilledDuration = &f
202+
}
203+
}
204+
if val := extractBetween(line, "Memory Size: ", " MB"); val != "" {
205+
if i, err := strconv.Atoi(val); err == nil {
206+
log.MemorySize = &i
207+
}
208+
}
209+
if val := extractBetween(line, "Max Memory Used: ", " MB"); val != "" {
210+
if i, err := strconv.Atoi(val); err == nil {
211+
log.MaxMemoryUsed = &i
212+
}
213+
}
86214
}
87215

88-
return row, nil
216+
return log, nil
217+
}
218+
219+
// ptr is a helper function to return a pointer to a value
220+
func ptr[T any](v T) *T {
221+
return &v
222+
}
223+
224+
// extractAfter extracts substring that comes after a specific prefix
225+
func extractAfter(s, prefix string) string {
226+
idx := strings.Index(s, prefix)
227+
if idx == -1 {
228+
return ""
229+
}
230+
return strings.TrimSpace(s[idx+len(prefix):])
231+
}
232+
233+
// extractBetween extracts substring between start and end strings
234+
func extractBetween(s, start, end string) string {
235+
i := strings.Index(s, start)
236+
if i == -1 {
237+
return ""
238+
}
239+
i += len(start)
240+
j := strings.Index(s[i:], end)
241+
if j == -1 {
242+
return ""
243+
}
244+
return strings.TrimSpace(s[i : i+j])
245+
}
246+
247+
// isTimestamp checks if the input string matches any common Go time formats.
248+
// Used to identify if a plain text log starts with a timestamp
249+
func isTimestamp(s string) bool {
250+
layouts := []string{
251+
time.RFC3339,
252+
time.RFC3339Nano,
253+
time.RFC1123,
254+
time.RFC1123Z,
255+
time.RFC822,
256+
time.RFC822Z,
257+
time.RFC850,
258+
"2006-01-02 15:04:05", // Common log format
259+
"2006-01-02 15:04:05.000000", // With microseconds
260+
"2006-01-02T15:04:05", // ISO-like without timezone
261+
"2006-01-02T15:04:05Z07:00", // ISO with TZ offset
262+
"20060102T150405Z", // AWS style compact
263+
}
264+
265+
for _, layout := range layouts {
266+
if _, err := time.Parse(layout, s); err == nil {
267+
return true
268+
}
269+
}
270+
271+
return false
89272
}
273+
274+
// Commented out legacy code
275+
// rawRow = strings.TrimSuffix(rawRow, "\n")
276+
// fields := strings.Fields(rawRow)
277+
278+
// switch fields[0] {
279+
// case "START", "END":
280+
// row.LogType = &fields[0]
281+
// row.RequestID = &fields[2]
282+
// case "REPORT":
283+
// row.LogType = &fields[0]
284+
// row.RequestID = &fields[2]
285+
// duration, err := strconv.ParseFloat(fields[4], 64)
286+
// if err != nil {
287+
// return nil, fmt.Errorf("error parsing duration: %w", err)
288+
// }
289+
// row.Duration = &duration
290+
// billed, err := strconv.ParseFloat(fields[8], 64)
291+
// if err != nil {
292+
// return nil, fmt.Errorf("error parsing billed duration: %w", err)
293+
// }
294+
// row.BilledDuration = &billed
295+
// mem, err := strconv.Atoi(fields[12])
296+
// if err != nil {
297+
// return nil, fmt.Errorf("error parsing memory size: %w", err)
298+
// }
299+
// row.MemorySize = &mem
300+
// maxMem, err := strconv.Atoi(fields[17])
301+
// if err != nil {
302+
// return nil, fmt.Errorf("error parsing max memory used: %w", err)
303+
// }
304+
// row.MaxMemoryUsed = &maxMem
305+
// default:
306+
// t := "LOG"
307+
// row.LogType = &t
308+
309+
// ts, err := time.Parse(time.RFC3339, fields[0])
310+
// if err != nil {
311+
// return nil, fmt.Errorf("error parsing timestamp: %w", err)
312+
// }
313+
// row.Timestamp = &ts
314+
315+
// row.RequestID = &fields[1]
316+
// row.LogLevel = &fields[2]
317+
// strip := fmt.Sprintf("%s%s", strings.Join(fields[:3], "\t"), "\t")
318+
// stripped := strings.TrimPrefix(rawRow, strip)
319+
// row.Message = &stripped
320+
// }

tables/lambda_log/lambda_log_table.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package lambda_log
22

33
import (
4+
"regexp"
45
"time"
56

67
"github.com/rs/xid"
@@ -61,13 +62,25 @@ func (c *LambdaLogTable) EnrichRow(row *LambdaLog, sourceEnrichmentFields schema
6162
// Record standardization
6263
row.TpID = xid.New().String()
6364
row.TpIngestTimestamp = time.Now()
64-
if !row.TpTimestamp.IsZero() {
65+
if row.Timestamp != nil {
6566
row.TpTimestamp = *row.Timestamp
6667
row.TpDate = row.Timestamp.Truncate(24 * time.Hour)
68+
} else if !row.TpTimestamp.IsZero() {
69+
row.TpDate = row.TpTimestamp.Truncate(24 * time.Hour)
6770
}
6871

6972
row.TpIndex = schema.DefaultIndex
7073

74+
var arnRegex = regexp.MustCompile(`arn:aws:[^,\s'"\\]+`)
75+
76+
seen := map[string]struct{}{}
77+
for _, match := range arnRegex.FindAllString(*row.Message, -1) {
78+
if _, exists := seen[match]; !exists {
79+
seen[match] = struct{}{}
80+
row.TpAkas = append(row.TpAkas, match)
81+
}
82+
}
83+
7184
// TODO: Add enrichment fields
7285

7386
return row, nil

0 commit comments

Comments
 (0)