Skip to content

Commit 228a16a

Browse files
committed
Add table Lambda flow log
1 parent fa5df5b commit 228a16a

File tree

4 files changed

+188
-0
lines changed

4 files changed

+188
-0
lines changed

aws/plugin.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/turbot/tailpipe-plugin-aws/tables/cost_and_usage_report"
1313
"github.com/turbot/tailpipe-plugin-aws/tables/cost_optimization_recommendation"
1414
"github.com/turbot/tailpipe-plugin-aws/tables/guardduty_finding"
15+
"github.com/turbot/tailpipe-plugin-aws/tables/lambda_log"
1516
"github.com/turbot/tailpipe-plugin-aws/tables/nlb_access_log"
1617
"github.com/turbot/tailpipe-plugin-aws/tables/s3_server_access_log"
1718
"github.com/turbot/tailpipe-plugin-aws/tables/vpc_flow_log"
@@ -40,6 +41,7 @@ func init() {
4041
table.RegisterTable[*s3_server_access_log.S3ServerAccessLog, *s3_server_access_log.S3ServerAccessLogTable]()
4142
table.RegisterTable[*vpc_flow_log.VpcFlowLog, *vpc_flow_log.VpcFlowLogTable]()
4243
table.RegisterTable[*waf_traffic_log.WafTrafficLog, *waf_traffic_log.WafTrafficLogTable]()
44+
table.RegisterTable[*lambda_log.LambdaLog, *lambda_log.LambdaLogTable]()
4345

4446
// register sources
4547
row_source.RegisterRowSource[*s3_bucket.AwsS3BucketSource]()

tables/lambda_log/lambda_log.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package lambda_log
2+
3+
import (
4+
"time"
5+
6+
"github.com/turbot/tailpipe-plugin-sdk/schema"
7+
)
8+
9+
type LambdaLog struct {
10+
schema.CommonFields
11+
12+
Timestamp *time.Time `json:"timestamp,omitempty"`
13+
RequestID *string `json:"request_id,omitempty"`
14+
LogType *string `json:"log_type,omitempty"`
15+
LogLevel *string `json:"log_level,omitempty"`
16+
Message *string `json:"message,omitempty"`
17+
18+
// Report Specific Fields
19+
Duration *float64 `json:"duration,omitempty"`
20+
BilledDuration *float64 `json:"billed_duration,omitempty"`
21+
MemorySize *int `json:"memory_size,omitempty"`
22+
MaxMemoryUsed *int `json:"max_memory_used,omitempty"`
23+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package lambda_log
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log/slog"
7+
"strconv"
8+
"strings"
9+
"time"
10+
11+
// "github.com/turbot/tailpipe-plugin-aws/rows"
12+
// "github.com/turbot/tailpipe-plugin-sdk/table"
13+
"github.com/turbot/tailpipe-plugin-sdk/mappers"
14+
)
15+
16+
type LambdaLogMapper struct {
17+
}
18+
19+
func (m *LambdaLogMapper) Identifier() string {
20+
return "lambda_log_mapper"
21+
}
22+
23+
func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[*LambdaLog]) (*LambdaLog, error) {
24+
row := &LambdaLog{}
25+
26+
rawRow := ""
27+
28+
switch v := a.(type) {
29+
case []byte:
30+
rawRow = string(v)
31+
case string:
32+
rawRow = v
33+
case *string:
34+
rawRow = *v
35+
default:
36+
return nil, fmt.Errorf("expected string, got %T", a)
37+
}
38+
39+
slog.Error("rawRow ---->>>", rawRow)
40+
41+
rawRow = strings.TrimSuffix(rawRow, "\n")
42+
fields := strings.Fields(rawRow)
43+
44+
switch fields[0] {
45+
case "START", "END":
46+
row.LogType = &fields[0]
47+
row.RequestID = &fields[2]
48+
case "REPORT":
49+
row.LogType = &fields[0]
50+
row.RequestID = &fields[2]
51+
duration, err := strconv.ParseFloat(fields[4], 64)
52+
if err != nil {
53+
return nil, fmt.Errorf("error parsing duration: %w", err)
54+
}
55+
row.Duration = &duration
56+
billed, err := strconv.ParseFloat(fields[8], 64)
57+
if err != nil {
58+
return nil, fmt.Errorf("error parsing billed duration: %w", err)
59+
}
60+
row.BilledDuration = &billed
61+
mem, err := strconv.Atoi(fields[12])
62+
if err != nil {
63+
return nil, fmt.Errorf("error parsing memory size: %w", err)
64+
}
65+
row.MemorySize = &mem
66+
maxMem, err := strconv.Atoi(fields[17])
67+
if err != nil {
68+
return nil, fmt.Errorf("error parsing max memory used: %w", err)
69+
}
70+
row.MaxMemoryUsed = &maxMem
71+
default:
72+
t := "LOG"
73+
row.LogType = &t
74+
75+
ts, err := time.Parse(time.RFC3339, fields[0])
76+
if err != nil {
77+
return nil, fmt.Errorf("error parsing timestamp: %w", err)
78+
}
79+
row.Timestamp = &ts
80+
81+
row.RequestID = &fields[1]
82+
row.LogLevel = &fields[2]
83+
strip := fmt.Sprintf("%s%s", strings.Join(fields[:3], "\t"), "\t")
84+
stripped := strings.TrimPrefix(rawRow, strip)
85+
row.Message = &stripped
86+
}
87+
88+
return row, nil
89+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package lambda_log
2+
3+
import (
4+
"time"
5+
6+
"github.com/rs/xid"
7+
"github.com/turbot/pipe-fittings/v2/utils"
8+
9+
"github.com/turbot/tailpipe-plugin-aws/sources/cloudwatch_log_group"
10+
"github.com/turbot/tailpipe-plugin-aws/sources/s3_bucket"
11+
"github.com/turbot/tailpipe-plugin-sdk/artifact_source"
12+
"github.com/turbot/tailpipe-plugin-sdk/artifact_source_config"
13+
"github.com/turbot/tailpipe-plugin-sdk/constants"
14+
"github.com/turbot/tailpipe-plugin-sdk/row_source"
15+
"github.com/turbot/tailpipe-plugin-sdk/schema"
16+
"github.com/turbot/tailpipe-plugin-sdk/table"
17+
)
18+
19+
const LambdaLogTableIdentifier = "aws_lambda_log"
20+
21+
type LambdaLogTable struct{}
22+
23+
func (c *LambdaLogTable) Identifier() string {
24+
return LambdaLogTableIdentifier
25+
}
26+
27+
func (c *LambdaLogTable) GetSourceMetadata() ([]*table.SourceMetadata[*LambdaLog], error) {
28+
defaultS3ArtifactConfig := &artifact_source_config.ArtifactSourceConfigImpl{
29+
FileLayout: utils.ToStringPointer("AWSLogs/(%{DATA:org_id}/)?%{NUMBER:account_id}/lambda/%{DATA:function_name}/%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day}/%{DATA}.log"),
30+
}
31+
32+
return []*table.SourceMetadata[*LambdaLog]{
33+
{
34+
// S3 artifact source
35+
SourceName: s3_bucket.AwsS3BucketSourceIdentifier,
36+
Mapper: &LambdaLogMapper{},
37+
Options: []row_source.RowSourceOption{
38+
artifact_source.WithDefaultArtifactSourceConfig(defaultS3ArtifactConfig),
39+
artifact_source.WithRowPerLine(),
40+
},
41+
},
42+
{
43+
// S3 artifact source
44+
SourceName: cloudwatch_log_group.AwsCloudwatchLogGroupSourceIdentifier,
45+
Mapper: &LambdaLogMapper{},
46+
},
47+
{
48+
// any artifact source
49+
SourceName: constants.ArtifactSourceIdentifier,
50+
Mapper: &LambdaLogMapper{},
51+
Options: []row_source.RowSourceOption{
52+
artifact_source.WithRowPerLine(),
53+
},
54+
},
55+
}, nil
56+
}
57+
58+
func (c *LambdaLogTable) EnrichRow(row *LambdaLog, sourceEnrichmentFields schema.SourceEnrichment) (*LambdaLog, error) {
59+
row.CommonFields = sourceEnrichmentFields.CommonFields
60+
61+
// Record standardization
62+
row.TpID = xid.New().String()
63+
row.TpIngestTimestamp = time.Now()
64+
if !row.TpTimestamp.IsZero() {
65+
row.TpTimestamp = *row.Timestamp
66+
row.TpDate = row.Timestamp.Truncate(24 * time.Hour)
67+
}
68+
69+
row.TpIndex = schema.DefaultIndex
70+
71+
// TODO: Add enrichment fields
72+
73+
return row, nil
74+
}

0 commit comments

Comments
 (0)