From c6ae7e01b795fa691f58988679e42fb248ee04e2 Mon Sep 17 00:00:00 2001 From: Akhil Repala Date: Sun, 9 Nov 2025 20:30:20 -0600 Subject: [PATCH 1/6] feat(aws): Implemented S3-backed blob storage with key-value interface Signed-off-by: Akhil Repala --- database/plugin/blob/aws/commit_timestamp.go | 35 ++++ database/plugin/blob/aws/database.go | 177 +++++++++++++++++++ database/plugin/blob/aws/logger.go | 46 +++++ database/plugin/blob/aws/metrics.go | 36 ++++ 4 files changed, 294 insertions(+) create mode 100644 database/plugin/blob/aws/commit_timestamp.go create mode 100644 database/plugin/blob/aws/database.go create mode 100644 database/plugin/blob/aws/logger.go create mode 100644 database/plugin/blob/aws/metrics.go diff --git a/database/plugin/blob/aws/commit_timestamp.go b/database/plugin/blob/aws/commit_timestamp.go new file mode 100644 index 00000000..81e763c0 --- /dev/null +++ b/database/plugin/blob/aws/commit_timestamp.go @@ -0,0 +1,35 @@ +// Copyright 2025 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package aws + +import ( + "context" + "math/big" +) + +const commitTimestampBlobKey = "metadata_commit_timestamp" + +func (b *BlobStoreS3) GetCommitTimestamp(ctx context.Context) (int64, error) { + data, err := b.Get(ctx, commitTimestampBlobKey) + if err != nil { + return 0, err + } + return new(big.Int).SetBytes(data).Int64(), nil +} + +func (b *BlobStoreS3) SetCommitTimestamp(ctx context.Context, ts int64) error { + raw := new(big.Int).SetInt64(ts).Bytes() + return b.Put(ctx, commitTimestampBlobKey, raw) +} diff --git a/database/plugin/blob/aws/database.go b/database/plugin/blob/aws/database.go new file mode 100644 index 00000000..7364c9f5 --- /dev/null +++ b/database/plugin/blob/aws/database.go @@ -0,0 +1,177 @@ +// Copyright 2025 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package aws + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "log/slog" + "strings" + "time" + + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/blinklabs-io/dingo/database/plugin" + "github.com/prometheus/client_golang/prometheus" +) + +// init registers the "s3" blob plugin with the plugin registry. +func init() { + plugin.Register( + plugin.PluginEntry{ + Type: plugin.PluginTypeBlob, + Name: "s3", + }, + ) +} + +// BlobStoreS3 stores blobs in an AWS S3 bucket, optionally under a key prefix. +type BlobStoreS3 struct { + promRegistry prometheus.Registerer + startupCtx context.Context + logger *S3Logger + client *s3.Client + bucket string + prefix string + startupCancel context.CancelFunc +} + +// New creates an S3-backed blob store; dataDir must be "s3://bucket" or "s3://bucket/prefix". +func New( + dataDir string, + logger *slog.Logger, + promRegistry prometheus.Registerer, +) (*BlobStoreS3, error) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + + if logger == nil { + logger = slog.New(slog.NewJSONHandler(io.Discard, nil)) + } + + const prefix = "s3://" + if !strings.HasPrefix(strings.ToLower(dataDir), prefix) { + cancel() + return nil, errors.New("s3 blob: expected dataDir='s3://[/prefix]'") + } + path := strings.TrimPrefix(dataDir, prefix) + parts := strings.SplitN(path, "/", 2) + if parts[0] == "" { + cancel() + return nil, errors.New("s3 blob: bucket not set") + } + bucket := parts[0] + keyPrefix := "" + if len(parts) == 2 { + keyPrefix = strings.TrimSuffix(parts[1], "/") + if keyPrefix != "" { + keyPrefix += "/" + } + } + + // Load the default AWS configuration, bounded by the startup timeout.
+ awsCfg, err := config.LoadDefaultConfig(ctx) + if err != nil { + cancel() + return nil, fmt.Errorf("s3 blob: load default AWS config: %w", err) + } + client := s3.NewFromConfig(awsCfg) + + db := &BlobStoreS3{ + logger: NewS3Logger(logger), + promRegistry: promRegistry, + client: client, + bucket: bucket, + prefix: keyPrefix, + startupCtx: ctx, + startupCancel: cancel, + } + if err := db.init(); err != nil { + return db, err + } + return db, nil +} + +func (d *BlobStoreS3) init() error { + // Register metrics only when a registry was supplied. + if d.promRegistry != nil { + d.registerBlobMetrics() + } + + // Release the startup timeout context now that initialization is complete. + if d.startupCancel != nil { + d.startupCancel() + d.startupCancel = nil + } + d.startupCtx = context.Background() + return nil +} + +// Client returns the underlying S3 client. +func (d *BlobStoreS3) Client() *s3.Client { + return d.client +} + +// Bucket returns the configured bucket name. +func (d *BlobStoreS3) Bucket() string { + return d.bucket +} + +// fullKey returns key with the configured key prefix prepended. +func (d *BlobStoreS3) fullKey(key string) string { + return d.prefix + key +} + +func awsString(s string) *string { + return &s +} + +// Get reads the entire object stored at key. +func (d *BlobStoreS3) Get(ctx context.Context, key string) ([]byte, error) { + out, err := d.client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: &d.bucket, + Key: awsString(d.fullKey(key)), + }) + if err != nil { + d.logger.Errorf("s3 get %q failed: %v", key, err) + return nil, err + } + defer out.Body.Close() + + data, err := io.ReadAll(out.Body) + if err != nil { + d.logger.Errorf("s3 read %q failed: %v", key, err) + return nil, err + } + d.logger.Infof("s3 get %q ok (%d bytes)", key, len(data)) + return data, nil +} + +// Put writes a value to key.
+func (d *BlobStoreS3) Put(ctx context.Context, key string, value []byte) error { + _, err := d.client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: &d.bucket, + Key: awsString(d.fullKey(key)), + Body: bytes.NewReader(value), + }) + if err != nil { + d.logger.Errorf("s3 put %q failed: %v", key, err) + return err + } + d.logger.Infof("s3 put %q ok (%d bytes)", key, len(value)) + return nil +} diff --git a/database/plugin/blob/aws/logger.go b/database/plugin/blob/aws/logger.go new file mode 100644 index 00000000..90c6343b --- /dev/null +++ b/database/plugin/blob/aws/logger.go @@ -0,0 +1,46 @@ +// Copyright 2025 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aws + +import ( + "fmt" + "io" + "log/slog" +) + +// S3Logger is a thin wrapper giving our logger a consistent interface. 
+type S3Logger struct { + logger *slog.Logger +} + +func NewS3Logger(logger *slog.Logger) *S3Logger { + if logger == nil { + logger = slog.New(slog.NewJSONHandler(io.Discard, nil)) + } + return &S3Logger{logger: logger} +} + +func (g *S3Logger) Infof(msg string, args ...any) { + g.logger.Info(fmt.Sprintf(msg, args...), "component", "database") +} +func (g *S3Logger) Warningf(msg string, args ...any) { + g.logger.Warn(fmt.Sprintf(msg, args...), "component", "database") +} +func (g *S3Logger) Debugf(msg string, args ...any) { + g.logger.Debug(fmt.Sprintf(msg, args...), "component", "database") +} +func (g *S3Logger) Errorf(msg string, args ...any) { + g.logger.Error(fmt.Sprintf(msg, args...), "component", "database") +} diff --git a/database/plugin/blob/aws/metrics.go b/database/plugin/blob/aws/metrics.go new file mode 100644 index 00000000..1b44d885 --- /dev/null +++ b/database/plugin/blob/aws/metrics.go @@ -0,0 +1,36 @@ +// Copyright 2025 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package aws + +import "github.com/prometheus/client_golang/prometheus" + +const s3MetricNamePrefix = "database_blob_" + +func (d *BlobStoreS3) registerBlobMetrics() { + opsTotal := prometheus.NewCounter( + prometheus.CounterOpts{ + Name: s3MetricNamePrefix + "ops_total", + Help: "Total number of S3 blob operations", + }, + ) + bytesTotal := prometheus.NewCounter( + prometheus.CounterOpts{ + Name: s3MetricNamePrefix + "bytes_total", + Help: "Total bytes read/written for S3 blob operations", + }, + ) + + d.promRegistry.MustRegister(opsTotal, bytesTotal) +} From 2680d89e29b5ae1910b3d0d7519192a43253b07d Mon Sep 17 00:00:00 2001 From: Akhil Repala Date: Sun, 9 Nov 2025 20:38:02 -0600 Subject: [PATCH 2/6] feat(aws): Fixed the log messages to overcome golangcilint errors Signed-off-by: Akhil Repala --- database/plugin/blob/aws/logger.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/database/plugin/blob/aws/logger.go b/database/plugin/blob/aws/logger.go index 90c6343b..e7f1767a 100644 --- a/database/plugin/blob/aws/logger.go +++ b/database/plugin/blob/aws/logger.go @@ -33,14 +33,26 @@ func NewS3Logger(logger *slog.Logger) *S3Logger { } func (g *S3Logger) Infof(msg string, args ...any) { - g.logger.Info(fmt.Sprintf(msg, args...), "component", "database") + g.logger.Info( + fmt.Sprintf(msg, args...), + "component", "database", + ) } func (g *S3Logger) Warningf(msg string, args ...any) { - g.logger.Warn(fmt.Sprintf(msg, args...), "component", "database") + g.logger.Warn( + fmt.Sprintf(msg, args...), + "component", "database", + ) } func (g *S3Logger) Debugf(msg string, args ...any) { - g.logger.Debug(fmt.Sprintf(msg, args...), "component", "database") + g.logger.Debug( + fmt.Sprintf(msg, args...), + "component", "database", + ) } func (g *S3Logger) Errorf(msg string, args ...any) { - g.logger.Error(fmt.Sprintf(msg, args...), "component", "database") + g.logger.Error( + fmt.Sprintf(msg, args...), + "component", "database", + ) } 
From 46177b236b3542410bd8b400c7a4cbf46a8c364a Mon Sep 17 00:00:00 2001 From: Akhil Repala Date: Sun, 9 Nov 2025 20:43:14 -0600 Subject: [PATCH 3/6] feat(aws): Fixed the log messages to overcome golangcilint errors Signed-off-by: Akhil Repala --- database/plugin/blob/aws/logger.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/database/plugin/blob/aws/logger.go b/database/plugin/blob/aws/logger.go index e7f1767a..a6a8468a 100644 --- a/database/plugin/blob/aws/logger.go +++ b/database/plugin/blob/aws/logger.go @@ -38,18 +38,21 @@ func (g *S3Logger) Infof(msg string, args ...any) { "component", "database", ) } + func (g *S3Logger) Warningf(msg string, args ...any) { g.logger.Warn( fmt.Sprintf(msg, args...), "component", "database", ) } + func (g *S3Logger) Debugf(msg string, args ...any) { g.logger.Debug( fmt.Sprintf(msg, args...), "component", "database", ) } + func (g *S3Logger) Errorf(msg string, args ...any) { g.logger.Error( fmt.Sprintf(msg, args...), From dd3a019854d00650594586e6a965bcb813a93898 Mon Sep 17 00:00:00 2001 From: Akhil Repala Date: Sun, 9 Nov 2025 20:50:14 -0600 Subject: [PATCH 4/6] feat(aws): Fixed the nilaway workflow failures Signed-off-by: Akhil Repala --- database/plugin/blob/aws/database.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/database/plugin/blob/aws/database.go b/database/plugin/blob/aws/database.go index 7364c9f5..d84494b8 100644 --- a/database/plugin/blob/aws/database.go +++ b/database/plugin/blob/aws/database.go @@ -68,15 +68,22 @@ func New( cancel() return nil, errors.New("s3 blob: expected dataDir='s3://[/prefix]'") } + path := strings.TrimPrefix(dataDir, prefix) - parts := strings.SplitN(path, "/", 2) - if parts[0] == "" { + if path == "" { cancel() return nil, errors.New("s3 blob: bucket not set") } + + parts := strings.SplitN(path, "/", 2) + if len(parts) == 0 || parts[0] == "" { + cancel() + return nil, errors.New("s3 blob: invalid S3 path (missing bucket)") + } + bucket := parts[0]
keyPrefix := "" - if len(parts) == 2 { + if len(parts) > 1 && parts[1] != "" { keyPrefix = strings.TrimSuffix(parts[1], "/") if keyPrefix != "" { keyPrefix += "/" From 0abec74906b2d431c38757de7f479948a24b91a1 Mon Sep 17 00:00:00 2001 From: Akhil Repala Date: Mon, 10 Nov 2025 11:10:20 -0600 Subject: [PATCH 5/6] feat(aws): Fixed the coderabbit comments Signed-off-by: Akhil Repala --- database/plugin/blob/aws/database.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/database/plugin/blob/aws/database.go b/database/plugin/blob/aws/database.go index d84494b8..7de1e552 100644 --- a/database/plugin/blob/aws/database.go +++ b/database/plugin/blob/aws/database.go @@ -108,7 +108,8 @@ func New( startupCancel: cancel, } if err := db.init(); err != nil { - return db, err + cancel() + return nil, err } return db, nil } @@ -124,7 +125,6 @@ func (d *BlobStoreS3) init() error { d.startupCancel() d.startupCancel = nil } - d.startupCtx = context.Background() return nil } From 8103433a650c0497fe883aa70a77ae0d56e89dac Mon Sep 17 00:00:00 2001 From: Akhil Repala Date: Mon, 10 Nov 2025 11:18:09 -0600 Subject: [PATCH 6/6] feat(aws): Fixed the case sensitivity mismatch in prefix handling Signed-off-by: Akhil Repala --- database/plugin/blob/aws/database.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/database/plugin/blob/aws/database.go b/database/plugin/blob/aws/database.go index 7de1e552..3eb6b3f3 100644 --- a/database/plugin/blob/aws/database.go +++ b/database/plugin/blob/aws/database.go @@ -69,7 +69,7 @@ func New( return nil, errors.New("s3 blob: expected dataDir='s3://[/prefix]'") } - path := strings.TrimPrefix(dataDir, prefix) + path := dataDir[len(prefix):] if path == "" { cancel() return nil, errors.New("s3 blob: bucket not set")