-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcallback.go
More file actions
122 lines (103 loc) · 3.26 KB
/
callback.go
File metadata and controls
122 lines (103 loc) · 3.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package main
import (
"context"
"fmt"
"log"
"os"
"os/exec"
"path/filepath"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager"
"github.com/aws/aws-sdk-go-v2/service/s3"
)
// compressAndUpload compresses the capture file at pcapPath with the external
// bzip2 tool and uploads the resulting .bz2 file to S3.
//
// The steps run in this order:
//  1. Stat the input, so a missing file is reported before bzip2 is started
//     and the original size is available for the compression log line.
//  2. Run bzip2 on the file. bzip2 replaces the input with pcapPath + ".bz2".
//  3. Upload the compressed file under the key
//     cfg.S3.Prefix + "YYYY/MM/DD/" + <compressed base name>, where the date
//     is the current UTC date. The prefix is concatenated as-is, so it must
//     carry its own trailing separator if one is wanted.
//  4. If cfg.DeleteAfterUpload is set, remove the local compressed file. A
//     failed removal is only logged; it does not fail the call.
//
// A non-nil error is returned when the stat, compression or upload step
// fails. On upload failure the compressed file is left on disk and its path
// is included in the error message.
func compressAndUpload(cfg *Config, pcapPath string) error {
	pcapPath = filepath.Clean(pcapPath)
	log.Printf("Processing %s", pcapPath)

	originalInfo, err := os.Stat(pcapPath)
	if err != nil {
		return fmt.Errorf("pcap file not found: %w", err)
	}

	// Compress with bzip2 (replaces original file with .bz2).
	// The "--" marker ends option parsing: filepath.Clean leaves a relative
	// name such as "-dump.pcap" untouched, and without the marker bzip2 would
	// interpret it as a bundle of command-line flags instead of a file name.
	log.Printf("Compressing %s", pcapPath)
	bzipCmd := exec.Command("bzip2", "--", pcapPath)
	if out, err := bzipCmd.CombinedOutput(); err != nil {
		return fmt.Errorf("bzip2 failed: %s: %w", string(out), err)
	}

	compressedPath := pcapPath + ".bz2"
	compressedInfo, err := os.Stat(compressedPath)
	if err != nil {
		return fmt.Errorf("stat compressed file: %w", err)
	}

	compressionStats := formatCompressionStats(
		filepath.Base(pcapPath),
		filepath.Base(compressedPath),
		originalInfo.Size(),
		compressedInfo.Size(),
	)
	log.Printf("Compression complete: %s", compressionStats)

	// Upload to S3 with year/month/day folder structure.
	now := time.Now().UTC()
	key := fmt.Sprintf("%s%d/%02d/%02d/%s",
		cfg.S3.Prefix, now.Year(), now.Month(), now.Day(),
		filepath.Base(compressedPath))

	log.Printf("Uploading to s3://%s/%s", cfg.S3.Bucket, key)
	if err := uploadToS3(cfg, compressedPath, key); err != nil {
		return fmt.Errorf("upload failed (file kept at %s): %w", compressedPath, err)
	}
	log.Printf("Upload complete: %s", filepath.Base(compressedPath))

	if cfg.DeleteAfterUpload {
		// Best effort: the upload already succeeded, so a leftover local file
		// is reported as a warning rather than returned as an error.
		if err := os.Remove(compressedPath); err != nil {
			log.Printf("WARNING: failed to delete %s: %v", compressedPath, err)
		}
	}
	return nil
}
// formatCompressionStats builds a one-line summary of a compression run in
// the form
//
//	<originalName> -> <compressedName> (<originalSize> bytes -> <compressedSize> bytes, <ratio>)
//
// The ratio part is "ratio X.XX:1", computed as originalSize divided by
// compressedSize. When either size is zero no division is performed and the
// ratio part instead explains which side was empty; a zero originalSize takes
// precedence over a zero compressedSize.
func formatCompressionStats(originalName, compressedName string, originalSize, compressedSize int64) string {
	var ratioText string
	switch {
	case originalSize == 0:
		ratioText = "ratio unavailable: empty input file"
	case compressedSize == 0:
		// Guard the division below against a zero denominator.
		ratioText = "ratio unavailable: empty compressed file"
	default:
		ratioText = fmt.Sprintf("ratio %.2f:1", float64(originalSize)/float64(compressedSize))
	}
	return fmt.Sprintf("%s -> %s (%d bytes -> %d bytes, %s)",
		originalName, compressedName, originalSize, compressedSize, ratioText)
}
// uploadToS3 uploads the local file at filePath to the bucket cfg.S3.Bucket
// under the object key given by key.
//
// The AWS configuration is built on every call from cfg.S3: the region, plus
// a static credentials provider using the configured access key ID and secret
// access key with an empty session token. The S3 client is pointed at
// "https://" + cfg.S3.Endpoint and uses path-style addressing.
//
// It returns a wrapped error if the AWS configuration cannot be loaded, the
// file cannot be opened, or the upload itself fails.
func uploadToS3(cfg *Config, filePath, key string) error {
	ctx := context.Background()

	staticCreds := credentials.NewStaticCredentialsProvider(
		cfg.S3.AccessKeyID,
		cfg.S3.SecretAccessKey,
		"",
	)
	awsCfg, err := awsconfig.LoadDefaultConfig(
		ctx,
		awsconfig.WithRegion(cfg.S3.Region),
		awsconfig.WithCredentialsProvider(staticCreds),
	)
	if err != nil {
		return fmt.Errorf("load AWS config: %w", err)
	}

	// Override the endpoint and force path-style requests on the client.
	withEndpoint := func(opts *s3.Options) {
		opts.BaseEndpoint = aws.String("https://" + cfg.S3.Endpoint)
		opts.UsePathStyle = true
	}
	s3Client := s3.NewFromConfig(awsCfg, withEndpoint)

	src, err := os.Open(filePath)
	if err != nil {
		return fmt.Errorf("open file: %w", err)
	}
	defer src.Close()

	uploader := transfermanager.New(s3Client)
	input := &transfermanager.UploadObjectInput{
		Bucket: aws.String(cfg.S3.Bucket),
		Key:    aws.String(key),
		Body:   src,
	}
	if _, err := uploader.UploadObject(ctx, input); err != nil {
		return fmt.Errorf("s3 upload: %w", err)
	}
	return nil
}