diff --git a/sei-db/config/ss_config.go b/sei-db/config/ss_config.go index 3fe94e750d..6987777b84 100644 --- a/sei-db/config/ss_config.go +++ b/sei-db/config/ss_config.go @@ -76,6 +76,10 @@ type StateStoreConfig struct { // When true, data is routed to separate DBs by EVM key family while // preserving the same logical store key and full key encoding inside each DB. SeparateEVMSubDBs bool `mapstructure:"evm-separate-dbs"` + + // HistoricalOffloadDSN, when set, wraps the SS so reads below the + // primary's earliest version fall back to the Cockroach offload store. + HistoricalOffloadDSN string `mapstructure:"historical-offload-dsn"` } // DefaultStateStoreConfig returns the default StateStoreConfig diff --git a/sei-db/config/toml.go b/sei-db/config/toml.go index eb387cb1d5..1057e22d06 100644 --- a/sei-db/config/toml.go +++ b/sei-db/config/toml.go @@ -139,6 +139,10 @@ evm-ss-split = {{ .StateStore.EVMSplit }} # When false, all EVM data stays in one DB using the current unified layout. # When true, data is routed to separate DBs while preserving the same evm key prefix format. evm-ss-separate-dbs = {{ .StateStore.SeparateEVMSubDBs }} + +# When set, reads below the local SS's earliest version fall back to the +# CockroachDB offload store populated by the historical-offload consumer. +historical-offload-dsn = "{{ .StateStore.HistoricalOffloadDSN }}" ` // ReceiptStoreConfigTemplate defines the configuration template for receipt-store diff --git a/sei-db/state_db/ss/offload/consumer/README.md b/sei-db/state_db/ss/offload/consumer/README.md new file mode 100644 index 0000000000..0ac50ebecd --- /dev/null +++ b/sei-db/state_db/ss/offload/consumer/README.md @@ -0,0 +1,61 @@ +# historical-offload-consumer + +Reads `ChangelogEntry` messages from the Kafka topic cryptosim publishes to and writes them to CockroachDB. + +## Layout + +- `schema/schema.sql` — CockroachDB DDL (idempotent) +- `cmd/historical-offload-consumer/` — CLI binary +- `config/example.json` — sample config +- `deploy.sh` — one-shot setup helper + +## Cloud prerequisites (manual) + +- MSK cluster + topic + IAM role with `kafka-cluster:Connect` and read on the topic +- CockroachDB cluster + database + user +- AWS credentials available to the process (env or IAM role) + +## Run + +```bash +export KAFKA_BROKERS="b-1...:9098,b-2...:9098" +export KAFKA_TOPIC="historical-offload" +export KAFKA_GROUP_ID="historical-offload-consumer" +export AWS_REGION="us-east-1" +export COCKROACH_DSN="postgresql://user@host:26257/db?sslmode=verify-full" + +RUN=1 ./deploy.sh +``` + +`deploy.sh` applies the schema, writes the config, builds the binary, and (with `RUN=1`) starts it. Flags: `SKIP_SCHEMA=1`, `SKIP_BUILD=1`. + +## Guarantees + +- At-least-once delivery. Sink UPSERTs on `(store_name, key, version)` so replay is a no-op. +- Per-partition ordering preserved. With `WORKERS>1` (recommended for fast + chains) messages are sharded by partition so each partition's writes still + flow through a single worker; cross-partition writes parallelize. +- Offsets commit only after the sink persists the entry. +- Sink writes use bounded exponential backoff (5 attempts, 1s→30s) before + giving up. On give-up the process exits non-zero so the supervisor restarts; + Kafka offsets stay uncommitted, so the next run replays from the last commit. + +## Read-side optimization tables + +Two optional tables make trace-style reads dramatically faster. Both are off +by default; flip on by setting the matching env var before `deploy.sh`: + +- `state_latest` — one row per `(store, key)` with the most recent value. + Reads at "current state" become a single PK lookup instead of a descending + scan on `state_mutations`. Enable with `ENABLE_LATEST=true`. ~2× the write + rate; cheap on Cockroach. +- `state_at_block` — dense end-of-block snapshot for hot stores. Each block + copies state_latest into state_at_block, so reads at any block in the + rolling window are a single PK lookup per `(store, key)`. Set + `SNAPSHOT_STORES="slashing,distribution,staking,bank,params"` (requires + `ENABLE_LATEST=true`); bound storage with `SNAPSHOT_WINDOW_BLOCKS=2000` so + the consumer GCs older blocks inline. + +The sentinel-pointer pattern was net-negative on pebble (compaction lag); +moved here it's net-positive because the write tax goes to a system designed +to absorb it. diff --git a/sei-db/state_db/ss/offload/consumer/cmd/historical-offload-consumer/main.go b/sei-db/state_db/ss/offload/consumer/cmd/historical-offload-consumer/main.go new file mode 100644 index 0000000000..365a2a24a9 --- /dev/null +++ b/sei-db/state_db/ss/offload/consumer/cmd/historical-offload-consumer/main.go @@ -0,0 +1,47 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "os/signal" + "syscall" + + "github.com/sei-protocol/sei-chain/sei-db/state_db/ss/offload/consumer" +) + +func main() { + if len(os.Args) != 2 { + fmt.Fprintf(os.Stderr, "usage: %s \n", os.Args[0]) + os.Exit(2) + } + + cfg, err := consumer.LoadConfig(os.Args[1]) + if err != nil { + log.Fatalf("load config: %v", err) + } + + sink, err := consumer.NewCockroachSink(cfg.Cockroach) + if err != nil { + log.Fatalf("open cockroach sink: %v", err) + } + defer func() { _ = sink.Close() }() + + reader, err := consumer.NewKafkaReader(cfg.Kafka) + if err != nil { + log.Fatalf("open kafka reader: %v", err) + } + defer func() { _ = reader.Close() }() + + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + c := consumer.New(reader, sink, consumer.Options{ + Logf: func(format string, args ...interface{}) { log.Printf(format, args...) }, + Workers: cfg.Workers, + }) + if err := c.Run(ctx); err != nil { + log.Fatalf("consumer: %v", err) + } +} diff --git a/sei-db/state_db/ss/offload/consumer/cockroach.go b/sei-db/state_db/ss/offload/consumer/cockroach.go new file mode 100644 index 0000000000..c6d574af7b --- /dev/null +++ b/sei-db/state_db/ss/offload/consumer/cockroach.go @@ -0,0 +1,334 @@ +package consumer + +import ( + "context" + "database/sql" + "fmt" + "strings" + "time" + + "github.com/lib/pq" +) + +// CockroachConfig configures the CockroachDB sink. DSN follows the standard +// libpq/pgx format (e.g. postgresql://user@host:26257/db?sslmode=verify-full). +type CockroachConfig struct { + DSN string + MaxOpenConns int + MaxIdleConns int + ConnMaxLifetime time.Duration + + // EnableLatest UPSERTs into state_latest on every block so "current + // state" reads are a single PK lookup. Cheap; ~2x the write rate. + EnableLatest bool + + // SnapshotStores enables dense block-level snapshots in state_at_block + // for these stores. Each block writes a full snapshot of state_latest + // for these stores at the current version. Requires EnableLatest. + SnapshotStores []string + + // SnapshotWindowBlocks bounds the rolling snapshot window: rows older + // than (current - SnapshotWindowBlocks) are GC'd inline. 0 disables GC. + SnapshotWindowBlocks int64 +} + +func (c *CockroachConfig) ApplyDefaults() { + if c.MaxOpenConns == 0 { + c.MaxOpenConns = 8 + } + if c.MaxIdleConns == 0 { + c.MaxIdleConns = c.MaxOpenConns + } + if c.ConnMaxLifetime == 0 { + c.ConnMaxLifetime = 30 * time.Minute + } +} + +func (c *CockroachConfig) Validate() error { + if strings.TrimSpace(c.DSN) == "" { + return fmt.Errorf("cockroach dsn is required") + } + if c.MaxOpenConns < 0 { + return fmt.Errorf("cockroach max open conns must be non-negative") + } + if c.MaxIdleConns < 0 { + return fmt.Errorf("cockroach max idle conns must be non-negative") + } + if c.SnapshotWindowBlocks < 0 { + return fmt.Errorf("snapshot window blocks must be non-negative") + } + if len(c.SnapshotStores) > 0 && !c.EnableLatest { + return fmt.Errorf("snapshot stores require EnableLatest=true") + } + return nil +} + +type cockroachSink struct { + db *sql.DB + enableLatest bool + snapshotStores []string + snapshotWindow int64 +} + +var _ Sink = (*cockroachSink)(nil) + +// NewCockroachSink opens a pooled connection to CockroachDB. The caller is +// responsible for applying schema.sql beforehand. +func NewCockroachSink(cfg CockroachConfig) (Sink, error) { + cfg.ApplyDefaults() + if err := cfg.Validate(); err != nil { + return nil, err + } + + db, err := sql.Open("postgres", cfg.DSN) + if err != nil { + return nil, fmt.Errorf("open cockroach: %w", err) + } + db.SetMaxOpenConns(cfg.MaxOpenConns) + db.SetMaxIdleConns(cfg.MaxIdleConns) + db.SetConnMaxLifetime(cfg.ConnMaxLifetime) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := db.PingContext(ctx); err != nil { + _ = db.Close() + return nil, fmt.Errorf("ping cockroach: %w", err) + } + + return &cockroachSink{ + db: db, + enableLatest: cfg.EnableLatest, + snapshotStores: append([]string(nil), cfg.SnapshotStores...), + snapshotWindow: cfg.SnapshotWindowBlocks, + }, nil +} + +func (s *cockroachSink) Close() error { + return s.db.Close() +} + +func (s *cockroachSink) LastVersion(ctx context.Context) (int64, error) { + var v sql.NullInt64 + err := s.db.QueryRowContext(ctx, `SELECT max(version) FROM state_versions`).Scan(&v) + if err != nil { + return 0, fmt.Errorf("read last version: %w", err) + } + if !v.Valid { + return 0, nil + } + return v.Int64, nil +} + +func (s *cockroachSink) Write(ctx context.Context, rec Record) error { + if rec.Entry == nil { + return nil + } + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("begin tx: %w", err) + } + defer func() { _ = tx.Rollback() }() + + if err := insertVersion(ctx, tx, rec); err != nil { + return err + } + if err := insertMutations(ctx, tx, rec); err != nil { + return err + } + if s.enableLatest { + if err := upsertLatest(ctx, tx, rec); err != nil { + return err + } + } + if len(s.snapshotStores) > 0 { + if err := snapshotAtBlock(ctx, tx, rec.Entry.Version, s.snapshotStores); err != nil { + return err + } + if s.snapshotWindow > 0 { + if err := gcSnapshots(ctx, tx, rec.Entry.Version, s.snapshotWindow); err != nil { + return err + } + } + } + if err := insertUpgrades(ctx, tx, rec); err != nil { + return err + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("commit tx: %w", err) + } + return nil +} + +func insertVersion(ctx context.Context, tx *sql.Tx, rec Record) error { + _, err := tx.ExecContext(ctx, ` + INSERT INTO state_versions (version, kafka_topic, kafka_offset) + VALUES ($1, $2, $3) + ON CONFLICT (version) DO NOTHING + `, rec.Entry.Version, rec.Topic, rec.Offset) + if err != nil { + return fmt.Errorf("insert version: %w", err) + } + return nil +} + +// mutationBatchRows caps rows per INSERT; CockroachDB handles large batches +// but smaller batches keep transaction retries cheap under contention. +const mutationBatchRows = 500 + +// mutationBatch is one ready-to-execute INSERT with its parameter list. +type mutationBatch struct { + Stmt string + Args []interface{} +} + +// buildMutationBatches turns a ChangelogEntry into one or more parameterized +// INSERT statements. Pure function — no DB access — so it is unit-testable. +func buildMutationBatches(rec Record, maxRows int) []mutationBatch { + if maxRows <= 0 { + maxRows = mutationBatchRows + } + version := rec.Entry.Version + const colsPerRow = 5 + var ( + batches []mutationBatch + args = make([]interface{}, 0, maxRows*colsPerRow) + parts = make([]string, 0, maxRows) + ) + flush := func() { + if len(parts) == 0 { + return + } + stmt := `INSERT INTO state_mutations (store_name, key, version, value, deleted) VALUES ` + + strings.Join(parts, ",") + + ` ON CONFLICT (store_name, key, version) DO UPDATE SET value = excluded.value, deleted = excluded.deleted` + batches = append(batches, mutationBatch{Stmt: stmt, Args: args}) + args = make([]interface{}, 0, maxRows*colsPerRow) + parts = make([]string, 0, maxRows) + } + + for _, ncs := range rec.Entry.Changesets { + for _, p := range ncs.Changeset.Pairs { + idx := len(args) + parts = append(parts, fmt.Sprintf("($%d,$%d,$%d,$%d,$%d)", idx+1, idx+2, idx+3, idx+4, idx+5)) + args = append(args, ncs.Name, p.Key, version, p.Value, p.Delete) + if len(parts) >= maxRows { + flush() + } + } + } + flush() + return batches +} + +func insertMutations(ctx context.Context, tx *sql.Tx, rec Record) error { + for _, b := range buildMutationBatches(rec, mutationBatchRows) { + if _, err := tx.ExecContext(ctx, b.Stmt, b.Args...); err != nil { + return fmt.Errorf("insert mutations: %w", err) + } + } + return nil +} + +// buildLatestBatches builds UPSERT INTO state_latest batches. The WHERE +// clause guards against out-of-order writes from parallel partition workers +// — a row is only overwritten if the incoming version is at least as new. +func buildLatestBatches(rec Record, maxRows int) []mutationBatch { + if maxRows <= 0 { + maxRows = mutationBatchRows + } + version := rec.Entry.Version + const colsPerRow = 5 + var ( + batches []mutationBatch + args = make([]interface{}, 0, maxRows*colsPerRow) + parts = make([]string, 0, maxRows) + ) + flush := func() { + if len(parts) == 0 { + return + } + stmt := `INSERT INTO state_latest (store_name, key, value, version, deleted) VALUES ` + + strings.Join(parts, ",") + + ` ON CONFLICT (store_name, key) DO UPDATE + SET value = excluded.value, version = excluded.version, deleted = excluded.deleted + WHERE state_latest.version <= excluded.version` + batches = append(batches, mutationBatch{Stmt: stmt, Args: args}) + args = make([]interface{}, 0, maxRows*colsPerRow) + parts = make([]string, 0, maxRows) + } + for _, ncs := range rec.Entry.Changesets { + for _, p := range ncs.Changeset.Pairs { + idx := len(args) + parts = append(parts, fmt.Sprintf("($%d,$%d,$%d,$%d,$%d)", idx+1, idx+2, idx+3, idx+4, idx+5)) + args = append(args, ncs.Name, p.Key, p.Value, version, p.Delete) + if len(parts) >= maxRows { + flush() + } + } + } + flush() + return batches +} + +func upsertLatest(ctx context.Context, tx *sql.Tx, rec Record) error { + for _, b := range buildLatestBatches(rec, mutationBatchRows) { + if _, err := tx.ExecContext(ctx, b.Stmt, b.Args...); err != nil { + return fmt.Errorf("upsert state_latest: %w", err) + } + } + return nil +} + +// snapshotAtBlockSQL copies the current state_latest rows for the given +// stores into state_at_block at the supplied version. ON CONFLICT keeps the +// statement idempotent under retry. +const snapshotAtBlockSQL = ` +INSERT INTO state_at_block (block_version, store_name, key, value, deleted) +SELECT $1, store_name, key, value, deleted +FROM state_latest +WHERE store_name = ANY($2) +ON CONFLICT (block_version, store_name, key) DO UPDATE + SET value = excluded.value, deleted = excluded.deleted` + +func snapshotAtBlock(ctx context.Context, tx *sql.Tx, version int64, stores []string) error { + if _, err := tx.ExecContext(ctx, snapshotAtBlockSQL, version, pq.StringArray(stores)); err != nil { + return fmt.Errorf("snapshot state_at_block: %w", err) + } + return nil +} + +// gcSnapshotSQL deletes state_at_block rows older than the rolling window. +// Per-block invocation is fine because the bulk of work happens once: after +// the first run, only the just-aged-out block has rows below the cutoff. +const gcSnapshotSQL = `DELETE FROM state_at_block WHERE block_version < $1` + +func gcSnapshots(ctx context.Context, tx *sql.Tx, version, window int64) error { + cutoff := version - window + if cutoff <= 0 { + return nil + } + if _, err := tx.ExecContext(ctx, gcSnapshotSQL, cutoff); err != nil { + return fmt.Errorf("gc state_at_block: %w", err) + } + return nil +} + +func insertUpgrades(ctx context.Context, tx *sql.Tx, rec Record) error { + if len(rec.Entry.Upgrades) == 0 { + return nil + } + for _, up := range rec.Entry.Upgrades { + _, err := tx.ExecContext(ctx, ` + INSERT INTO state_tree_upgrades (version, name, rename_from, delete) + VALUES ($1, $2, $3, $4) + ON CONFLICT (version, name) DO UPDATE + SET rename_from = excluded.rename_from, delete = excluded.delete + `, rec.Entry.Version, up.Name, up.RenameFrom, up.Delete) + if err != nil { + return fmt.Errorf("insert upgrade: %w", err) + } + } + return nil +} diff --git a/sei-db/state_db/ss/offload/consumer/cockroach_test.go b/sei-db/state_db/ss/offload/consumer/cockroach_test.go new file mode 100644 index 0000000000..434522cba6 --- /dev/null +++ b/sei-db/state_db/ss/offload/consumer/cockroach_test.go @@ -0,0 +1,186 @@ +package consumer + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/require" + + dbproto "github.com/sei-protocol/sei-chain/sei-db/proto" +) + +func makeRecord(version int64, changesets ...*dbproto.NamedChangeSet) Record { + return Record{ + Topic: "historical-offload", + Offset: version, + Entry: &dbproto.ChangelogEntry{ + Version: version, + Changesets: changesets, + }, + } +} + +func TestBuildMutationBatchesEmpty(t *testing.T) { + require.Empty(t, buildMutationBatches(makeRecord(1), 500)) +} + +func TestBuildMutationBatchesSingleBatch(t *testing.T) { + rec := makeRecord(7, &dbproto.NamedChangeSet{ + Name: "evm", + Changeset: dbproto.ChangeSet{Pairs: []*dbproto.KVPair{ + {Key: []byte("k1"), Value: []byte("v1")}, + {Key: []byte("k2"), Delete: true}, + }}, + }) + batches := buildMutationBatches(rec, 500) + require.Len(t, batches, 1) + + b := batches[0] + require.Contains(t, b.Stmt, "INSERT INTO state_mutations") + require.Contains(t, b.Stmt, "ON CONFLICT (store_name, key, version) DO UPDATE") + require.Contains(t, b.Stmt, "($1,$2,$3,$4,$5)") + require.Contains(t, b.Stmt, "($6,$7,$8,$9,$10)") + require.Equal(t, 2, strings.Count(b.Stmt, "($")) + require.Len(t, b.Args, 10) + + // First row: name, key, version, value, deleted. + require.Equal(t, "evm", b.Args[0]) + require.Equal(t, []byte("k1"), b.Args[1]) + require.Equal(t, int64(7), b.Args[2]) + require.Equal(t, []byte("v1"), b.Args[3]) + require.Equal(t, false, b.Args[4]) + // Second row: delete=true. + require.Equal(t, true, b.Args[9]) +} + +func TestBuildMutationBatchesSplits(t *testing.T) { + pairs := make([]*dbproto.KVPair, 250) + for i := range pairs { + pairs[i] = &dbproto.KVPair{Key: []byte{byte(i)}, Value: []byte{0x1}} + } + rec := makeRecord(9, &dbproto.NamedChangeSet{ + Name: "bank", + Changeset: dbproto.ChangeSet{Pairs: pairs}, + }) + + batches := buildMutationBatches(rec, 100) + require.Len(t, batches, 3) // 100 + 100 + 50 + require.Len(t, batches[0].Args, 500) + require.Len(t, batches[1].Args, 500) + require.Len(t, batches[2].Args, 250) +} + +func TestBuildMutationBatchesAcrossStores(t *testing.T) { + rec := makeRecord(3, + &dbproto.NamedChangeSet{ + Name: "evm", + Changeset: dbproto.ChangeSet{Pairs: []*dbproto.KVPair{{Key: []byte("a"), Value: []byte("1")}}}, + }, + &dbproto.NamedChangeSet{ + Name: "bank", + Changeset: dbproto.ChangeSet{Pairs: []*dbproto.KVPair{{Key: []byte("b"), Value: []byte("2")}}}, + }, + ) + batches := buildMutationBatches(rec, 500) + require.Len(t, batches, 1) + require.Equal(t, "evm", batches[0].Args[0]) + require.Equal(t, "bank", batches[0].Args[5]) +} + +func TestBuildMutationBatchesDefaultCap(t *testing.T) { + pairs := make([]*dbproto.KVPair, mutationBatchRows+1) + for i := range pairs { + pairs[i] = &dbproto.KVPair{Key: []byte{byte(i)}} + } + rec := makeRecord(1, &dbproto.NamedChangeSet{ + Name: "x", + Changeset: dbproto.ChangeSet{Pairs: pairs}, + }) + batches := buildMutationBatches(rec, 0) + require.Len(t, batches, 2) +} + +func TestBuildLatestBatchesShape(t *testing.T) { + rec := makeRecord(7, &dbproto.NamedChangeSet{ + Name: "evm", + Changeset: dbproto.ChangeSet{Pairs: []*dbproto.KVPair{ + {Key: []byte("k1"), Value: []byte("v1")}, + {Key: []byte("k2"), Delete: true}, + }}, + }) + batches := buildLatestBatches(rec, 500) + require.Len(t, batches, 1) + + b := batches[0] + require.Contains(t, b.Stmt, "INSERT INTO state_latest") + require.Contains(t, b.Stmt, "ON CONFLICT (store_name, key) DO UPDATE") + // The version-guard is what makes parallel partition writes safe. + require.Contains(t, b.Stmt, "WHERE state_latest.version <= excluded.version") + require.Equal(t, 2, strings.Count(b.Stmt, "($")) + require.Len(t, b.Args, 10) + + // Row layout: name, key, value, version, deleted. + require.Equal(t, "evm", b.Args[0]) + require.Equal(t, []byte("k1"), b.Args[1]) + require.Equal(t, []byte("v1"), b.Args[2]) + require.Equal(t, int64(7), b.Args[3]) + require.Equal(t, false, b.Args[4]) + require.Equal(t, true, b.Args[9]) +} + +func TestBuildLatestBatchesSplits(t *testing.T) { + pairs := make([]*dbproto.KVPair, 250) + for i := range pairs { + pairs[i] = &dbproto.KVPair{Key: []byte{byte(i)}, Value: []byte{0x1}} + } + rec := makeRecord(9, &dbproto.NamedChangeSet{ + Name: "bank", + Changeset: dbproto.ChangeSet{Pairs: pairs}, + }) + batches := buildLatestBatches(rec, 100) + require.Len(t, batches, 3) + require.Len(t, batches[0].Args, 500) + require.Len(t, batches[1].Args, 500) + require.Len(t, batches[2].Args, 250) +} + +func TestBuildLatestBatchesEmpty(t *testing.T) { + require.Empty(t, buildLatestBatches(makeRecord(1), 500)) +} + +func TestSnapshotAtBlockSQLShape(t *testing.T) { + for _, frag := range []string{ + "INSERT INTO state_at_block", + "FROM state_latest", + "store_name = ANY($2)", + "ON CONFLICT (block_version, store_name, key)", + } { + require.Containsf(t, snapshotAtBlockSQL, frag, + "snapshotAtBlockSQL missing required fragment %q", frag) + } +} + +func TestGCSnapshotSQLShape(t *testing.T) { + require.Contains(t, gcSnapshotSQL, "DELETE FROM state_at_block") + require.Contains(t, gcSnapshotSQL, "block_version < $1") +} + +func TestCockroachConfigValidateSnapshotRequiresLatest(t *testing.T) { + cfg := CockroachConfig{ + DSN: "postgres://x", + SnapshotStores: []string{"slashing"}, + } + err := cfg.Validate() + require.Error(t, err) + require.Contains(t, err.Error(), "EnableLatest") + + cfg.EnableLatest = true + require.NoError(t, cfg.Validate()) +} + +func TestCockroachConfigValidateNegativeWindow(t *testing.T) { + cfg := CockroachConfig{DSN: "postgres://x", SnapshotWindowBlocks: -1} + err := cfg.Validate() + require.Error(t, err) + require.Contains(t, err.Error(), "window") +} diff --git a/sei-db/state_db/ss/offload/consumer/config.go b/sei-db/state_db/ss/offload/consumer/config.go new file mode 100644 index 0000000000..2f23b0f471 --- /dev/null +++ b/sei-db/state_db/ss/offload/consumer/config.go @@ -0,0 +1,42 @@ +package consumer + +import ( + "encoding/json" + "fmt" + "os" +) + +// Config is the top-level JSON config for the consumer binary. +type Config struct { + Kafka KafkaReaderConfig + Cockroach CockroachConfig + // Workers sets per-partition write parallelism. 0 or 1 means serial. + Workers int +} + +func (c *Config) Validate() error { + if err := c.Kafka.Validate(); err != nil { + return fmt.Errorf("kafka: %w", err) + } + if err := c.Cockroach.Validate(); err != nil { + return fmt.Errorf("cockroach: %w", err) + } + return nil +} + +// LoadConfig reads a JSON config file from path and validates it. +func LoadConfig(path string) (*Config, error) { + // #nosec G304 -- config path is supplied by the operator on the command line. + raw, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("read config: %w", err) + } + cfg := &Config{} + if err := json.Unmarshal(raw, cfg); err != nil { + return nil, fmt.Errorf("parse config: %w", err) + } + if err := cfg.Validate(); err != nil { + return nil, err + } + return cfg, nil +} diff --git a/sei-db/state_db/ss/offload/consumer/config/example.json b/sei-db/state_db/ss/offload/consumer/config/example.json new file mode 100644 index 0000000000..2b0d7c3782 --- /dev/null +++ b/sei-db/state_db/ss/offload/consumer/config/example.json @@ -0,0 +1,19 @@ +{ + "Comment": "Sample historical-offload-consumer config. Replace Kafka and Cockroach values for your environment.", + "Kafka": { + "Brokers": [ + "b-1.example.kafka.amazonaws.com:9098", + "b-2.example.kafka.amazonaws.com:9098" + ], + "Topic": "historical-offload", + "GroupID": "historical-offload-consumer", + "Region": "us-east-1", + "TLSEnabled": true, + "SASLMechanism": "aws-msk-iam", + "StartOffset": "first" + }, + "Cockroach": { + "DSN": "postgresql://offload_user@crdb.example.internal:26257/offload?sslmode=verify-full&statement_timeout=60000", + "MaxOpenConns": 16 + } +} diff --git a/sei-db/state_db/ss/offload/consumer/config_test.go b/sei-db/state_db/ss/offload/consumer/config_test.go new file mode 100644 index 0000000000..27e4a9ee43 --- /dev/null +++ b/sei-db/state_db/ss/offload/consumer/config_test.go @@ -0,0 +1,158 @@ +package consumer + +import ( + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestKafkaReaderConfigValidate(t *testing.T) { + tests := []struct { + name string + cfg KafkaReaderConfig + wantErr string + }{ + { + name: "missing brokers", + cfg: KafkaReaderConfig{Topic: "t", GroupID: "g"}, + wantErr: "brokers", + }, + { + name: "missing topic", + cfg: KafkaReaderConfig{Brokers: []string{"b:9092"}, GroupID: "g"}, + wantErr: "topic", + }, + { + name: "missing group id", + cfg: KafkaReaderConfig{Brokers: []string{"b:9092"}, Topic: "t"}, + wantErr: "group id", + }, + { + name: "bad start offset", + cfg: KafkaReaderConfig{Brokers: []string{"b:9092"}, Topic: "t", GroupID: "g", StartOffset: "middle"}, + wantErr: "start offset", + }, + { + name: "valid minimal", + cfg: KafkaReaderConfig{Brokers: []string{"b:9092"}, Topic: "t", GroupID: "g"}, + wantErr: "", + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := tc.cfg.Validate() + if tc.wantErr == "" { + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + return + } + if err == nil || !strings.Contains(err.Error(), tc.wantErr) { + t.Fatalf("expected error containing %q, got %v", tc.wantErr, err) + } + }) + } +} + +func TestKafkaReaderConfigApplyDefaults(t *testing.T) { + cfg := KafkaReaderConfig{} + cfg.ApplyDefaults() + if cfg.ClientID == "" { + t.Fatal("client id should default") + } + if cfg.StartOffset != "first" { + t.Fatalf("start offset default = %q, want first", cfg.StartOffset) + } + if cfg.MaxBytes == 0 || cfg.MinBytes == 0 || cfg.MaxWait == 0 { + t.Fatalf("min/max bytes and max wait should default, got %+v", cfg) + } +} + +func TestCockroachConfigValidate(t *testing.T) { + tests := []struct { + name string + cfg CockroachConfig + wantErr string + }{ + {"missing dsn", CockroachConfig{}, "dsn"}, + {"blank dsn", CockroachConfig{DSN: " "}, "dsn"}, + {"negative open", CockroachConfig{DSN: "x", MaxOpenConns: -1}, "max open"}, + {"negative idle", CockroachConfig{DSN: "x", MaxIdleConns: -1}, "max idle"}, + {"valid", CockroachConfig{DSN: "postgresql://host/db"}, ""}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := tc.cfg.Validate() + if tc.wantErr == "" { + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + return + } + if err == nil || !strings.Contains(err.Error(), tc.wantErr) { + t.Fatalf("expected error containing %q, got %v", tc.wantErr, err) + } + }) + } +} + +func TestCockroachConfigApplyDefaults(t *testing.T) { + cfg := CockroachConfig{DSN: "x"} + cfg.ApplyDefaults() + if cfg.MaxOpenConns == 0 || cfg.MaxIdleConns == 0 { + t.Fatalf("conn counts should default, got %+v", cfg) + } + if cfg.ConnMaxLifetime == 0 || cfg.ConnMaxLifetime > 24*time.Hour { + t.Fatalf("conn max lifetime default unreasonable: %v", cfg.ConnMaxLifetime) + } +} + +func TestConfigValidateComposes(t *testing.T) { + cfg := &Config{} + if err := cfg.Validate(); err == nil || !strings.Contains(err.Error(), "kafka") { + t.Fatalf("expected kafka error, got %v", err) + } + cfg.Kafka = KafkaReaderConfig{Brokers: []string{"b:9092"}, Topic: "t", GroupID: "g"} + if err := cfg.Validate(); err == nil || !strings.Contains(err.Error(), "cockroach") { + t.Fatalf("expected cockroach error, got %v", err) + } + cfg.Cockroach = CockroachConfig{DSN: "postgresql://host/db"} + if err := cfg.Validate(); err != nil { + t.Fatalf("expected no error, got %v", err) + } +} + +func TestLoadConfig(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "cfg.json") + body := `{ + "Kafka": {"Brokers":["b:9092"], "Topic":"t", "GroupID":"g"}, + "Cockroach": {"DSN":"postgresql://host/db", "MaxOpenConns": 4} + }` + require.NoError(t, os.WriteFile(path, []byte(body), 0o600)) + + cfg, err := LoadConfig(path) + require.NoError(t, err) + require.Equal(t, []string{"b:9092"}, cfg.Kafka.Brokers) + require.Equal(t, "t", cfg.Kafka.Topic) + require.Equal(t, "postgresql://host/db", cfg.Cockroach.DSN) + require.Equal(t, 4, cfg.Cockroach.MaxOpenConns) +} + +func TestLoadConfigRejectsInvalid(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "cfg.json") + require.NoError(t, os.WriteFile(path, []byte(`{"Kafka":{}}`), 0o600)) + + _, err := LoadConfig(path) + require.Error(t, err) +} + +func TestLoadConfigMissingFile(t *testing.T) { + _, err := LoadConfig(filepath.Join(t.TempDir(), "nope.json")) + require.Error(t, err) +} diff --git a/sei-db/state_db/ss/offload/consumer/consumer.go b/sei-db/state_db/ss/offload/consumer/consumer.go new file mode 100644 index 0000000000..19b7ff9c70 --- /dev/null +++ b/sei-db/state_db/ss/offload/consumer/consumer.go @@ -0,0 +1,236 @@ +package consumer + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/segmentio/kafka-go" + "golang.org/x/sync/errgroup" +) + +// MessageSource is the subset of *kafka.Reader the consumer uses; the +// indirection lets tests drive the loop with a fake. +type MessageSource interface { + FetchMessage(ctx context.Context) (kafka.Message, error) + CommitMessages(ctx context.Context, msgs ...kafka.Message) error +} + +// Consumer fans messages out to per-partition workers so cross-partition +// writes parallelize while ordering within a partition is preserved. +type Consumer struct { + reader MessageSource + sink Sink + logf func(format string, args ...interface{}) + workers int + shardBuf int + maxAttempts int + baseBackoff time.Duration + maxBackoff time.Duration +} + +const ( + defaultSinkMaxAttempts = 5 + defaultSinkBaseBackoff = 1 * time.Second + defaultSinkMaxBackoff = 30 * time.Second + defaultWorkers = 1 + defaultShardBuffer = 32 +) + +// Options configures the consumer loop. Zero values pick defaults. +type Options struct { + Logf func(format string, args ...interface{}) + SinkMaxAttempts int + SinkBaseBackoff time.Duration + SinkMaxBackoff time.Duration + // Workers sets per-partition write parallelism. Messages are sharded by + // partition so a partition's writes stay ordered. Default 1 (serial). + Workers int + // ShardBufferSize bounds in-flight messages per worker. Default 32. + ShardBufferSize int +} + +func New(reader MessageSource, sink Sink, opts Options) *Consumer { + logf := opts.Logf + if logf == nil { + logf = func(string, ...interface{}) {} + } + maxAttempts := opts.SinkMaxAttempts + if maxAttempts <= 0 { + maxAttempts = defaultSinkMaxAttempts + } + base := opts.SinkBaseBackoff + if base <= 0 { + base = defaultSinkBaseBackoff + } + maxBackoff := opts.SinkMaxBackoff + if maxBackoff <= 0 { + maxBackoff = defaultSinkMaxBackoff + } + workers := opts.Workers + if workers <= 0 { + workers = defaultWorkers + } + shardBuf := opts.ShardBufferSize + if shardBuf <= 0 { + shardBuf = defaultShardBuffer + } + return &Consumer{ + reader: reader, + sink: sink, + logf: logf, + workers: workers, + shardBuf: shardBuf, + maxAttempts: maxAttempts, + baseBackoff: base, + maxBackoff: maxBackoff, + } +} + +// Run blocks until ctx is cancelled or an unrecoverable error occurs. Offsets +// commit only after the sink persists each message (at-least-once delivery). +func (c *Consumer) Run(ctx context.Context) error { + if c.workers == 1 { + return c.runSerial(ctx) + } + return c.runParallel(ctx) +} + +func (c *Consumer) runSerial(ctx context.Context) error { + for { + msg, err := c.reader.FetchMessage(ctx) + if err != nil { + if isCancellation(err) { + return nil + } + return fmt.Errorf("fetch kafka message: %w", err) + } + if err := c.processMessage(ctx, msg); err != nil { + if isCancellation(err) { + return nil + } + return err + } + } +} + +func (c *Consumer) runParallel(ctx context.Context) error { + g, gctx := errgroup.WithContext(ctx) + shards := make([]chan kafka.Message, c.workers) + for i := range shards { + shards[i] = make(chan kafka.Message, c.shardBuf) + ch := shards[i] + g.Go(func() error { return c.workerLoop(gctx, ch) }) + } + g.Go(func() error { + defer func() { + for _, ch := range shards { + close(ch) + } + }() + for { + msg, err := c.reader.FetchMessage(gctx) + if err != nil { + if isCancellation(err) { + return nil + } + return fmt.Errorf("fetch kafka message: %w", err) + } + shard := shardFor(msg.Partition, c.workers) + select { + case shards[shard] <- msg: + case <-gctx.Done(): + return nil + } + } + }) + if err := g.Wait(); err != nil && !isCancellation(err) { + return err + } + return nil +} + +func (c *Consumer) workerLoop(ctx context.Context, ch <-chan kafka.Message) error { + for { + select { + case <-ctx.Done(): + return nil + case msg, ok := <-ch: + if !ok { + return nil + } + if err := c.processMessage(ctx, msg); err != nil { + if isCancellation(err) { + return nil + } + return err + } + } + } +} + +func (c *Consumer) processMessage(ctx context.Context, msg kafka.Message) error { + entry, err := DecodeEntry(msg.Value) + if err != nil { + return fmt.Errorf("decode message at offset %d: %w", msg.Offset, err) + } + rec := Record{ + Topic: msg.Topic, + Partition: msg.Partition, + Offset: msg.Offset, + Entry: entry, + } + start := time.Now() + if err := c.writeWithRetry(ctx, rec); err != nil { + return fmt.Errorf("sink write version %d: %w", entry.Version, err) + } + c.logf("wrote version=%d partition=%d offset=%d in %s", + entry.Version, msg.Partition, msg.Offset, time.Since(start)) + if err := c.reader.CommitMessages(ctx, msg); err != nil { + return fmt.Errorf("commit kafka offset %d: %w", msg.Offset, err) + } + return nil +} + +// writeWithRetry calls sink.Write with bounded exponential backoff. +func (c *Consumer) writeWithRetry(ctx context.Context, rec Record) error { + backoff := c.baseBackoff + var lastErr error + for attempt := 1; attempt <= c.maxAttempts; attempt++ { + err := c.sink.Write(ctx, rec) + if err == nil { + return nil + } + lastErr = err + if isCancellation(err) { + return err + } + if attempt == c.maxAttempts { + break + } + c.logf("sink write attempt %d/%d failed: %v; retrying in %s", + attempt, c.maxAttempts, err, backoff) + select { + case <-time.After(backoff): + case <-ctx.Done(): + return ctx.Err() + } + backoff *= 2 + if backoff > c.maxBackoff { + backoff = c.maxBackoff + } + } + return fmt.Errorf("sink write failed after %d attempts: %w", c.maxAttempts, lastErr) +} + +func shardFor(partition, workers int) int { + if partition < 0 { + partition = -partition + } + return partition % workers +} + +func isCancellation(err error) bool { + return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) +} diff --git a/sei-db/state_db/ss/offload/consumer/consumer_test.go b/sei-db/state_db/ss/offload/consumer/consumer_test.go new file mode 100644 index 0000000000..c37864df17 --- /dev/null +++ b/sei-db/state_db/ss/offload/consumer/consumer_test.go @@ -0,0 +1,233 @@ +package consumer + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "testing" + "time" + + gogoproto "github.com/gogo/protobuf/proto" + "github.com/segmentio/kafka-go" + "github.com/stretchr/testify/require" + + dbproto "github.com/sei-protocol/sei-chain/sei-db/proto" +) + +type fakeSource struct { + msgs []kafka.Message + fetchIdx int + committed []kafka.Message + fetchErr error + mu sync.Mutex +} + +func (f *fakeSource) FetchMessage(ctx context.Context) (kafka.Message, error) { + f.mu.Lock() + if f.fetchErr != nil { + err := f.fetchErr + f.mu.Unlock() + return kafka.Message{}, err + } + if f.fetchIdx < len(f.msgs) { + m := f.msgs[f.fetchIdx] + f.fetchIdx++ + f.mu.Unlock() + return m, nil + } + f.mu.Unlock() + <-ctx.Done() + return kafka.Message{}, ctx.Err() +} + +func (f *fakeSource) CommitMessages(ctx context.Context, msgs ...kafka.Message) error { + f.mu.Lock() + defer f.mu.Unlock() + f.committed = append(f.committed, msgs...) + return nil +} + +type recordingSink struct { + records []Record + err error +} + +func (s *recordingSink) Write(ctx context.Context, rec Record) error { + if s.err != nil { + return s.err + } + s.records = append(s.records, rec) + return nil +} +func (s *recordingSink) LastVersion(ctx context.Context) (int64, error) { return 0, nil } +func (s *recordingSink) Close() error { return nil } + +// flakySink fails the first failuresLeft Write calls, then succeeds. +type flakySink struct { + failuresLeft int + attempts int +} + +func (s *flakySink) Write(ctx context.Context, rec Record) error { + s.attempts++ + if s.failuresLeft > 0 { + s.failuresLeft-- + return errors.New("transient") + } + return nil +} +func (s *flakySink) LastVersion(ctx context.Context) (int64, error) { return 0, nil } +func (s *flakySink) Close() error { return nil } + +func marshalEntry(t *testing.T, version int64, pairs ...*dbproto.KVPair) []byte { + t.Helper() + entry := &dbproto.ChangelogEntry{ + Version: version, + Changesets: []*dbproto.NamedChangeSet{{ + Name: "evm", + Changeset: dbproto.ChangeSet{Pairs: pairs}, + }}, + } + payload, err := gogoproto.Marshal(entry) + require.NoError(t, err) + return payload +} + +func TestConsumerRunWritesAndCommits(t *testing.T) { + src := &fakeSource{msgs: []kafka.Message{ + {Topic: "t", Partition: 0, Offset: 10, Value: marshalEntry(t, 1, &dbproto.KVPair{Key: []byte("a"), Value: []byte("1")})}, + {Topic: "t", Partition: 0, Offset: 11, Value: marshalEntry(t, 2, &dbproto.KVPair{Key: []byte("b"), Delete: true})}, + }} + sink := &recordingSink{} + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + c := New(src, sink, Options{}) + err := c.Run(ctx) + require.NoError(t, err) + + require.Len(t, sink.records, 2) + require.Equal(t, int64(1), sink.records[0].Entry.Version) + require.Equal(t, int64(11), sink.records[1].Offset) + + require.Len(t, src.committed, 2) + require.Equal(t, int64(10), src.committed[0].Offset) + require.Equal(t, int64(11), src.committed[1].Offset) +} + +func TestConsumerRunSinkErrorStopsBeforeCommit(t *testing.T) { + src := &fakeSource{msgs: []kafka.Message{ + {Topic: "t", Offset: 1, Value: marshalEntry(t, 1)}, + }} + sink := &recordingSink{err: errors.New("sink boom")} + + c := New(src, sink, Options{ + SinkMaxAttempts: 2, + SinkBaseBackoff: time.Millisecond, + SinkMaxBackoff: time.Millisecond, + }) + err := c.Run(context.Background()) + require.Error(t, err) + require.Contains(t, err.Error(), "sink boom") + require.Empty(t, src.committed, "offset must not be committed when sink fails") +} + +func TestConsumerRunRetriesSinkUntilSuccess(t *testing.T) { + src := &fakeSource{msgs: []kafka.Message{ + {Topic: "t", Partition: 0, Offset: 5, Value: marshalEntry(t, 1, &dbproto.KVPair{Key: []byte("a"), Value: []byte("1")})}, + }} + sink := &flakySink{failuresLeft: 2} + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + c := New(src, sink, Options{ + SinkMaxAttempts: 5, + SinkBaseBackoff: time.Millisecond, + SinkMaxBackoff: 2 * time.Millisecond, + }) + require.NoError(t, c.Run(ctx)) + + require.Equal(t, 3, sink.attempts, "sink should be retried until it succeeds") + require.Len(t, src.committed, 1) + require.Equal(t, int64(5), src.committed[0].Offset) +} + +func TestConsumerRunDecodeErrorStops(t *testing.T) { + src := &fakeSource{msgs: []kafka.Message{ + {Topic: "t", Offset: 1, Value: []byte{0xff, 0xff}}, + }} + sink := &recordingSink{} + + err := New(src, sink, Options{}).Run(context.Background()) + require.Error(t, err) + require.Contains(t, err.Error(), "decode message") + require.Empty(t, sink.records) + require.Empty(t, src.committed) +} + +func TestConsumerRunCancelReturnsNil(t *testing.T) { + src := &fakeSource{fetchErr: context.Canceled} + err := New(src, &recordingSink{}, Options{}).Run(context.Background()) + require.NoError(t, err) +} + +// concurrentSink locks per call so the test can assert the consumer +// actually fans calls out across goroutines (>1 in flight at a time). +type concurrentSink struct { + mu sync.Mutex + records []Record + maxInFlight int32 + inFlight int32 + delay time.Duration +} + +func (s *concurrentSink) Write(_ context.Context, rec Record) error { + cur := atomic.AddInt32(&s.inFlight, 1) + defer atomic.AddInt32(&s.inFlight, -1) + for { + prev := atomic.LoadInt32(&s.maxInFlight) + if cur <= prev || atomic.CompareAndSwapInt32(&s.maxInFlight, prev, cur) { + break + } + } + if s.delay > 0 { + time.Sleep(s.delay) + } + s.mu.Lock() + s.records = append(s.records, rec) + s.mu.Unlock() + return nil +} +func (s *concurrentSink) LastVersion(context.Context) (int64, error) { return 0, nil } +func (s *concurrentSink) Close() error { return nil } + +func TestConsumerParallelFansOutAcrossPartitions(t *testing.T) { + const nPartitions = 4 + msgs := make([]kafka.Message, 0, nPartitions) + for p := 0; p < nPartitions; p++ { + msgs = append(msgs, kafka.Message{ + Topic: "t", Partition: p, Offset: int64(p), + Value: marshalEntry(t, int64(p+1)), + }) + } + src := &fakeSource{msgs: msgs} + sink := &concurrentSink{delay: 25 * time.Millisecond} + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + c := New(src, sink, Options{Workers: nPartitions}) + require.NoError(t, c.Run(ctx)) + + require.Len(t, sink.records, nPartitions) + require.Greater(t, atomic.LoadInt32(&sink.maxInFlight), int32(1), + "with Workers=%d the sink should see >1 concurrent writes", nPartitions) +} + +func TestShardForStablePerPartition(t *testing.T) { + // Same partition always lands on the same worker (preserves order); two + // different partitions don't necessarily collide. + require.Equal(t, shardFor(7, 4), shardFor(7, 4)) + require.NotEqual(t, shardFor(0, 4), shardFor(1, 4)) + require.GreaterOrEqual(t, shardFor(-3, 4), 0, "negative partition shouldn't go negative") +} diff --git a/sei-db/state_db/ss/offload/consumer/deploy.sh b/sei-db/state_db/ss/offload/consumer/deploy.sh new file mode 100755 index 0000000000..5489893d9e --- /dev/null +++ b/sei-db/state_db/ss/offload/consumer/deploy.sh @@ -0,0 +1,117 @@ +#!/usr/bin/env bash +# Provisions the historical-offload consumer against a pre-existing MSK cluster +# and CockroachDB cluster. The cloud-side resources (MSK cluster, topic, IAM +# role, CockroachDB cluster + database + user) must already exist. +# +# Required env: +# KAFKA_BROKERS comma-separated broker endpoints (e.g. b-1.x.kafka.amazonaws.com:9098,b-2.x...) +# KAFKA_TOPIC topic cryptosim is publishing to +# KAFKA_GROUP_ID consumer group id +# AWS_REGION region for AWS MSK IAM signing (also exported for the binary at runtime) +# COCKROACH_DSN full postgresql:// DSN (include sslmode, statement_timeout, etc.) +# +# Optional env: +# KAFKA_TLS_ENABLED default true +# KAFKA_SASL_MECHANISM default aws-msk-iam ("" or "none" disables) +# KAFKA_START_OFFSET default first (first|last) +# COCKROACH_MAX_CONNS default 16 +# WORKERS default 1 (per-partition parallelism) +# ENABLE_LATEST default false (UPSERT state_latest per block) +# SNAPSHOT_STORES default "" (comma-separated; e.g. "slashing,distribution,staking,bank,params") +# SNAPSHOT_WINDOW_BLOCKS default 0 (rolling window; 0 disables GC) +# CONFIG_OUT default ./historical-offload-consumer.json +# BIN_OUT default ./bin/historical-offload-consumer +# SKIP_SCHEMA=1 skip applying schema.sql +# SKIP_BUILD=1 skip go build +# RUN=1 exec the binary at the end + +set -euo pipefail + +SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" +REPO_ROOT="$(cd -- "${SCRIPT_DIR}/../../../../.." &>/dev/null && pwd)" + +: "${KAFKA_BROKERS:?set KAFKA_BROKERS}" +: "${KAFKA_TOPIC:?set KAFKA_TOPIC}" +: "${KAFKA_GROUP_ID:?set KAFKA_GROUP_ID}" +: "${AWS_REGION:?set AWS_REGION}" +: "${COCKROACH_DSN:?set COCKROACH_DSN}" + +KAFKA_TLS_ENABLED="${KAFKA_TLS_ENABLED:-true}" +KAFKA_SASL_MECHANISM="${KAFKA_SASL_MECHANISM:-aws-msk-iam}" +KAFKA_START_OFFSET="${KAFKA_START_OFFSET:-first}" +COCKROACH_MAX_CONNS="${COCKROACH_MAX_CONNS:-16}" +WORKERS="${WORKERS:-1}" +ENABLE_LATEST="${ENABLE_LATEST:-false}" +SNAPSHOT_STORES="${SNAPSHOT_STORES:-}" +SNAPSHOT_WINDOW_BLOCKS="${SNAPSHOT_WINDOW_BLOCKS:-0}" +CONFIG_OUT="${CONFIG_OUT:-./historical-offload-consumer.json}" +BIN_OUT="${BIN_OUT:-./bin/historical-offload-consumer}" + +log() { printf '[%s] %s\n' "$(date -u +%FT%TZ)" "$*"; } + +apply_schema() { + local schema="${SCRIPT_DIR}/schema/schema.sql" + [[ -f "$schema" ]] || { echo "schema file missing: $schema" >&2; exit 1; } + + if command -v cockroach &>/dev/null; then + log "applying schema with cockroach sql" + cockroach sql --url="${COCKROACH_DSN}" <"$schema" + elif command -v psql &>/dev/null; then + log "applying schema with psql" + psql "${COCKROACH_DSN}" -v ON_ERROR_STOP=1 -f "$schema" + else + echo "need 'cockroach' or 'psql' on PATH to apply schema; set SKIP_SCHEMA=1 to bypass" >&2 + exit 1 + fi +} + +write_config() { + log "writing config to ${CONFIG_OUT}" + mkdir -p "$(dirname "${CONFIG_OUT}")" + + python3 - "$CONFIG_OUT" <0 switches reads to AS OF SYSTEM TIME so any replica +// can serve them; 0 means strongly-consistent reads. +type CockroachConfig struct { + DSN string + MaxOpenConns int + MaxIdleConns int + ConnMaxLifetime time.Duration + FollowerReadStaleness time.Duration +} + +func (c *CockroachConfig) ApplyDefaults() { + if c.MaxOpenConns == 0 { + c.MaxOpenConns = 16 + } + if c.MaxIdleConns == 0 { + c.MaxIdleConns = c.MaxOpenConns + } + if c.ConnMaxLifetime == 0 { + c.ConnMaxLifetime = 30 * time.Minute + } +} + +func (c *CockroachConfig) Validate() error { + if strings.TrimSpace(c.DSN) == "" { + return fmt.Errorf("cockroach dsn is required") + } + if c.MaxOpenConns < 0 { + return fmt.Errorf("cockroach max open conns must be non-negative") + } + if c.MaxIdleConns < 0 { + return fmt.Errorf("cockroach max idle conns must be non-negative") + } + if c.FollowerReadStaleness < 0 { + return fmt.Errorf("follower read staleness must be non-negative") + } + return nil +} + +type cockroachReader struct { + db *sql.DB + staleness time.Duration +} + +var _ Reader = (*cockroachReader)(nil) + +// NewCockroachReader assumes schema.sql has already been applied. +func NewCockroachReader(cfg CockroachConfig) (Reader, error) { + cfg.ApplyDefaults() + if err := cfg.Validate(); err != nil { + return nil, err + } + + db, err := sql.Open("postgres", cfg.DSN) + if err != nil { + return nil, fmt.Errorf("open cockroach: %w", err) + } + db.SetMaxOpenConns(cfg.MaxOpenConns) + db.SetMaxIdleConns(cfg.MaxIdleConns) + db.SetConnMaxLifetime(cfg.ConnMaxLifetime) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := db.PingContext(ctx); err != nil { + _ = db.Close() + return nil, fmt.Errorf("ping cockroach: %w", err) + } + + return &cockroachReader{db: db, staleness: cfg.FollowerReadStaleness}, nil +} + +func (r *cockroachReader) Close() error { return r.db.Close() } + +func (r *cockroachReader) LastVersion(ctx context.Context) (int64, error) { + var v sql.NullInt64 + err := r.db.QueryRowContext(ctx, `SELECT max(version) FROM state_versions`).Scan(&v) + if err != nil { + return 0, fmt.Errorf("read last version: %w", err) + } + if !v.Valid { + return 0, nil + } + return v.Int64, nil +} + +func (r *cockroachReader) Get(ctx context.Context, storeName string, key []byte, targetVersion int64) (Value, error) { + lkp := Lookup{StoreName: storeName, Key: string(key)} + res, err := r.BatchGet(ctx, targetVersion, []Lookup{lkp}) + if err != nil { + return Value{}, err + } + v, ok := res[lkp] + if !ok { + return Value{}, ErrNotFound + } + return v, nil +} + +// LATERAL + LIMIT 1 against the descending PK turns each (store, key) pair +// into a single index seek; $1=stores, $2=keys (parallel arrays), $3=version. +const batchLookupSQL = ` +SELECT t.store_name, t.key, m.version, m.value, m.deleted +FROM unnest($1::STRING[], $2::BYTES[]) AS t(store_name, key), + LATERAL ( + SELECT version, value, deleted + FROM state_mutations + WHERE store_name = t.store_name + AND key = t.key + AND version <= $3 + ORDER BY version DESC + LIMIT 1 + ) m` + +func splitLookups(lookups []Lookup) (stores []string, keys [][]byte) { + stores = make([]string, len(lookups)) + keys = make([][]byte, len(lookups)) + for i, l := range lookups { + stores[i] = l.StoreName + keys[i] = []byte(l.Key) + } + return stores, keys +} + +func aostStmt(staleness time.Duration) string { + return fmt.Sprintf("SET TRANSACTION AS OF SYSTEM TIME with_max_staleness('%s')", staleness) +} + +func (r *cockroachReader) withReadTx(ctx context.Context, fn func(*sql.Tx) error) error { + tx, err := r.db.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) + if err != nil { + return fmt.Errorf("begin read tx: %w", err) + } + if r.staleness > 0 { + if _, err := tx.ExecContext(ctx, aostStmt(r.staleness)); err != nil { + _ = tx.Rollback() + return fmt.Errorf("set follower read: %w", err) + } + } + if err := fn(tx); err != nil { + _ = tx.Rollback() + return err + } + return tx.Commit() +} + +func (r *cockroachReader) BatchGet(ctx context.Context, targetVersion int64, lookups []Lookup) (map[Lookup]Value, error) { + if len(lookups) == 0 { + return map[Lookup]Value{}, nil + } + stores, keys := splitLookups(lookups) + out := make(map[Lookup]Value, len(lookups)) + + err := r.withReadTx(ctx, func(tx *sql.Tx) error { + rows, err := tx.QueryContext(ctx, batchLookupSQL, pq.StringArray(stores), pq.ByteaArray(keys), targetVersion) + if err != nil { + return fmt.Errorf("batch lookup: %w", err) + } + defer rows.Close() + for rows.Next() { + var ( + storeName string + key []byte + version int64 + value []byte + deleted bool + ) + if err := rows.Scan(&storeName, &key, &version, &value, &deleted); err != nil { + return fmt.Errorf("scan batch row: %w", err) + } + if deleted { + continue + } + out[Lookup{StoreName: storeName, Key: string(key)}] = Value{ + Bytes: value, + Version: version, + } + } + return rows.Err() + }) + if err != nil { + return nil, err + } + return out, nil +} diff --git a/sei-db/state_db/ss/offload/historical/cockroach_test.go b/sei-db/state_db/ss/offload/historical/cockroach_test.go new file mode 100644 index 0000000000..96433a777f --- /dev/null +++ b/sei-db/state_db/ss/offload/historical/cockroach_test.go @@ -0,0 +1,84 @@ +package historical + +import ( + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestSplitLookupsParallelArrays(t *testing.T) { + stores, keys := splitLookups([]Lookup{ + {StoreName: "evm", Key: "k1"}, + {StoreName: "bank", Key: "k2"}, + {StoreName: "evm", Key: ""}, + }) + require.Equal(t, []string{"evm", "bank", "evm"}, stores) + require.Equal(t, [][]byte{[]byte("k1"), []byte("k2"), {}}, keys) +} + +func TestSplitLookupsEmpty(t *testing.T) { + stores, keys := splitLookups(nil) + require.Empty(t, stores) + require.Empty(t, keys) +} + +func TestAostStmtFormatsDuration(t *testing.T) { + require.Equal(t, + "SET TRANSACTION AS OF SYSTEM TIME with_max_staleness('10s')", + aostStmt(10*time.Second)) + require.Equal(t, + "SET TRANSACTION AS OF SYSTEM TIME with_max_staleness('1m30s')", + aostStmt(90*time.Second)) +} + +// TestBatchLookupSQLShape pins the salient pieces of the batch query so an +// accidental edit that loses LATERAL, the descending order, or the LIMIT 1 +// (each of which is needed for the per-pair PK-seek plan) breaks loudly +// instead of silently regressing into a full version-history scan. +func TestBatchLookupSQLShape(t *testing.T) { + for _, frag := range []string{ + "unnest($1::STRING[], $2::BYTES[])", + "LATERAL", + "FROM state_mutations", + "version <= $3", + "ORDER BY version DESC", + "LIMIT 1", + } { + require.Containsf(t, batchLookupSQL, frag, + "batchLookupSQL missing required fragment %q", frag) + } +} + +func TestCockroachConfigValidate(t *testing.T) { + cases := []struct { + name string + cfg CockroachConfig + err string + }{ + {"missing dsn", CockroachConfig{}, "dsn"}, + {"blank dsn", CockroachConfig{DSN: " "}, "dsn"}, + {"negative open conns", CockroachConfig{DSN: "x", MaxOpenConns: -1}, "open"}, + {"negative idle conns", CockroachConfig{DSN: "x", MaxIdleConns: -1}, "idle"}, + {"negative staleness", CockroachConfig{DSN: "x", FollowerReadStaleness: -1}, "staleness"}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + err := tc.cfg.Validate() + require.Error(t, err) + require.True(t, strings.Contains(err.Error(), tc.err), + "err %q should contain %q", err, tc.err) + }) + } +} + +func TestCockroachConfigApplyDefaults(t *testing.T) { + c := CockroachConfig{DSN: "x"} + c.ApplyDefaults() + require.Equal(t, 16, c.MaxOpenConns) + require.Equal(t, 16, c.MaxIdleConns) + require.Equal(t, 30*time.Minute, c.ConnMaxLifetime) + require.Equal(t, time.Duration(0), c.FollowerReadStaleness, + "staleness defaults to strongly-consistent reads; operators opt in") +} diff --git a/sei-db/state_db/ss/offload/historical/reader.go b/sei-db/state_db/ss/offload/historical/reader.go new file mode 100644 index 0000000000..b6fd300159 --- /dev/null +++ b/sei-db/state_db/ss/offload/historical/reader.go @@ -0,0 +1,36 @@ +// Package historical reads historical state from the CockroachDB store +// written by the offload consumer; sized for trace-style workloads. +package historical + +import ( + "context" + "errors" +) + +var ErrNotFound = errors.New("historical state not found") + +// Lookup uses string for Key so it can be a map key (the []byte-as-string idiom). +type Lookup struct { + StoreName string + Key string +} + +// Value.Version is the actual MVCC version that satisfied the lookup, +// which may be older than the requested target. +type Value struct { + Bytes []byte + Version int64 +} + +type Reader interface { + // Get returns ErrNotFound if no row exists at or before targetVersion, + // or if the latest such row is a tombstone. + Get(ctx context.Context, storeName string, key []byte, targetVersion int64) (Value, error) + + // BatchGet resolves many (store, key) pairs in one round-trip. Missing + // or tombstoned pairs are absent from the returned map. + BatchGet(ctx context.Context, targetVersion int64, lookups []Lookup) (map[Lookup]Value, error) + + LastVersion(ctx context.Context) (int64, error) + Close() error +} diff --git a/sei-db/state_db/ss/offload/historical/store.go b/sei-db/state_db/ss/offload/historical/store.go new file mode 100644 index 0000000000..a50d0a2fa4 --- /dev/null +++ b/sei-db/state_db/ss/offload/historical/store.go @@ -0,0 +1,103 @@ +package historical + +import ( + "context" + "errors" + + "github.com/sei-protocol/sei-chain/sei-db/db_engine/types" + "github.com/sei-protocol/sei-chain/sei-db/proto" +) + +// FallbackStateStore routes Get/Has below the primary's earliest version to +// the Reader; iterators and writes always go to the primary. +type FallbackStateStore struct { + primary types.StateStore + reader Reader +} + +var _ types.StateStore = (*FallbackStateStore)(nil) + +// NewFallbackStateStore takes ownership of primary and reader for Close. +func NewFallbackStateStore(primary types.StateStore, reader Reader) *FallbackStateStore { + return &FallbackStateStore{primary: primary, reader: reader} +} + +func (s *FallbackStateStore) shouldFallback(version int64) bool { + earliest := s.primary.GetEarliestVersion() + return earliest > 0 && version < earliest +} + +func (s *FallbackStateStore) Get(storeKey string, version int64, key []byte) ([]byte, error) { + if !s.shouldFallback(version) { + return s.primary.Get(storeKey, version, key) + } + v, err := s.reader.Get(context.Background(), storeKey, key, version) + if err != nil { + if errors.Is(err, ErrNotFound) { + return nil, nil + } + return nil, err + } + return v.Bytes, nil +} + +func (s *FallbackStateStore) Has(storeKey string, version int64, key []byte) (bool, error) { + if !s.shouldFallback(version) { + return s.primary.Has(storeKey, version, key) + } + _, err := s.reader.Get(context.Background(), storeKey, key, version) + if err != nil { + if errors.Is(err, ErrNotFound) { + return false, nil + } + return false, err + } + return true, nil +} + +func (s *FallbackStateStore) Iterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) { + return s.primary.Iterator(storeKey, version, start, end) +} + +func (s *FallbackStateStore) ReverseIterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) { + return s.primary.ReverseIterator(storeKey, version, start, end) +} + +func (s *FallbackStateStore) RawIterate(storeKey string, fn func([]byte, []byte, int64) bool) (bool, error) { + return s.primary.RawIterate(storeKey, fn) +} + +func (s *FallbackStateStore) GetLatestVersion() int64 { return s.primary.GetLatestVersion() } + +func (s *FallbackStateStore) SetLatestVersion(version int64) error { + return s.primary.SetLatestVersion(version) +} + +func (s *FallbackStateStore) GetEarliestVersion() int64 { return s.primary.GetEarliestVersion() } + +func (s *FallbackStateStore) SetEarliestVersion(version int64, ignoreVersion bool) error { + return s.primary.SetEarliestVersion(version, ignoreVersion) +} + +func (s *FallbackStateStore) ApplyChangesetSync(version int64, changesets []*proto.NamedChangeSet) error { + return s.primary.ApplyChangesetSync(version, changesets) +} + +func (s *FallbackStateStore) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error { + return s.primary.ApplyChangesetAsync(version, changesets) +} + +func (s *FallbackStateStore) Prune(version int64) error { return s.primary.Prune(version) } + +func (s *FallbackStateStore) Import(version int64, ch <-chan types.SnapshotNode) error { + return s.primary.Import(version, ch) +} + +func (s *FallbackStateStore) Close() error { + primaryErr := s.primary.Close() + readerErr := s.reader.Close() + if primaryErr != nil { + return primaryErr + } + return readerErr +} diff --git a/sei-db/state_db/ss/offload/historical/store_test.go b/sei-db/state_db/ss/offload/historical/store_test.go new file mode 100644 index 0000000000..ee6be042a5 --- /dev/null +++ b/sei-db/state_db/ss/offload/historical/store_test.go @@ -0,0 +1,176 @@ +package historical + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/sei-protocol/sei-chain/sei-db/db_engine/types" + "github.com/sei-protocol/sei-chain/sei-db/proto" +) + +// fakePrimary is a minimal types.StateStore implementation for routing tests. +// Only the calls FallbackStateStore actually makes are populated; the rest +// return zero values, which keeps the test file focused on routing logic. +type fakePrimary struct { + earliest int64 + latest int64 + gets map[string][]byte // storeKey|key -> value (nil means not present) + getCalls int + hasCalls int + closed bool +} + +func newFakePrimary(earliest, latest int64) *fakePrimary { + return &fakePrimary{earliest: earliest, latest: latest, gets: map[string][]byte{}} +} + +func k(storeKey string, key []byte) string { return storeKey + "|" + string(key) } + +func (f *fakePrimary) Get(storeKey string, _ int64, key []byte) ([]byte, error) { + f.getCalls++ + return f.gets[k(storeKey, key)], nil +} +func (f *fakePrimary) Has(storeKey string, _ int64, key []byte) (bool, error) { + f.hasCalls++ + return f.gets[k(storeKey, key)] != nil, nil +} +func (f *fakePrimary) Iterator(string, int64, []byte, []byte) (types.DBIterator, error) { + return nil, nil +} +func (f *fakePrimary) ReverseIterator(string, int64, []byte, []byte) (types.DBIterator, error) { + return nil, nil +} +func (f *fakePrimary) RawIterate(string, func([]byte, []byte, int64) bool) (bool, error) { + return false, nil +} +func (f *fakePrimary) GetLatestVersion() int64 { return f.latest } +func (f *fakePrimary) SetLatestVersion(int64) error { return nil } +func (f *fakePrimary) GetEarliestVersion() int64 { return f.earliest } +func (f *fakePrimary) SetEarliestVersion(int64, bool) error { return nil } +func (f *fakePrimary) ApplyChangesetSync(int64, []*proto.NamedChangeSet) error { return nil } +func (f *fakePrimary) ApplyChangesetAsync(int64, []*proto.NamedChangeSet) error { return nil } +func (f *fakePrimary) Prune(int64) error { return nil } +func (f *fakePrimary) Import(int64, <-chan types.SnapshotNode) error { return nil } +func (f *fakePrimary) Close() error { f.closed = true; return nil } + +// fakeReader implements Reader for routing tests. It records call counts so +// each test can assert that fallback (or non-fallback) actually happened. +type fakeReader struct { + values map[Lookup]Value + getCalls int + closeCall bool +} + +func newFakeReader() *fakeReader { return &fakeReader{values: map[Lookup]Value{}} } + +func (r *fakeReader) Get(_ context.Context, storeName string, key []byte, _ int64) (Value, error) { + r.getCalls++ + v, ok := r.values[Lookup{StoreName: storeName, Key: string(key)}] + if !ok { + return Value{}, ErrNotFound + } + return v, nil +} +func (r *fakeReader) BatchGet(context.Context, int64, []Lookup) (map[Lookup]Value, error) { + return nil, nil +} +func (r *fakeReader) LastVersion(context.Context) (int64, error) { return 0, nil } +func (r *fakeReader) Close() error { r.closeCall = true; return nil } + +func TestFallbackRoutesBelowEarliest(t *testing.T) { + p := newFakePrimary(100, 200) + r := newFakeReader() + r.values[Lookup{StoreName: "evm", Key: "k"}] = Value{Bytes: []byte("from-cockroach"), Version: 50} + s := NewFallbackStateStore(p, r) + + got, err := s.Get("evm", 50, []byte("k")) + require.NoError(t, err) + require.Equal(t, []byte("from-cockroach"), got) + require.Equal(t, 1, r.getCalls) + require.Equal(t, 0, p.getCalls, "primary should not be consulted below earliest") +} + +func TestFallbackUsesPrimaryAtOrAboveEarliest(t *testing.T) { + p := newFakePrimary(100, 200) + p.gets[k("evm", []byte("k"))] = []byte("from-primary") + r := newFakeReader() + s := NewFallbackStateStore(p, r) + + for _, version := range []int64{100, 150, 200} { + got, err := s.Get("evm", version, []byte("k")) + require.NoError(t, err) + require.Equal(t, []byte("from-primary"), got, "version=%d should hit primary", version) + } + require.Equal(t, 3, p.getCalls) + require.Equal(t, 0, r.getCalls, "reader should not be consulted at or above earliest") +} + +func TestFallbackUsesPrimaryWhenEarliestIsZero(t *testing.T) { + // earliest=0 means the primary has no data yet (or was never pruned). We + // shouldn't fan out to Cockroach in that case — the primary owns it. + p := newFakePrimary(0, 0) + r := newFakeReader() + s := NewFallbackStateStore(p, r) + + _, err := s.Get("evm", 50, []byte("k")) + require.NoError(t, err) + require.Equal(t, 1, p.getCalls) + require.Equal(t, 0, r.getCalls) +} + +func TestFallbackHasMirrorsGetRouting(t *testing.T) { + p := newFakePrimary(100, 200) + r := newFakeReader() + r.values[Lookup{StoreName: "bank", Key: "addr"}] = Value{Bytes: []byte{1}, Version: 50} + s := NewFallbackStateStore(p, r) + + ok, err := s.Has("bank", 50, []byte("addr")) + require.NoError(t, err) + require.True(t, ok) + + ok, err = s.Has("bank", 50, []byte("missing")) + require.NoError(t, err) + require.False(t, ok) +} + +func TestFallbackPropagatesNonNotFoundReaderErrors(t *testing.T) { + p := newFakePrimary(100, 200) + r := &errReader{err: errors.New("boom")} + s := NewFallbackStateStore(p, r) + + _, err := s.Get("evm", 50, []byte("k")) + require.Error(t, err) + require.Contains(t, err.Error(), "boom") +} + +type errReader struct{ err error } + +func (e *errReader) Get(context.Context, string, []byte, int64) (Value, error) { + return Value{}, e.err +} +func (e *errReader) BatchGet(context.Context, int64, []Lookup) (map[Lookup]Value, error) { + return nil, e.err +} +func (e *errReader) LastVersion(context.Context) (int64, error) { return 0, e.err } +func (e *errReader) Close() error { return nil } + +func TestFallbackCloseClosesBoth(t *testing.T) { + p := newFakePrimary(0, 0) + r := newFakeReader() + s := NewFallbackStateStore(p, r) + + require.NoError(t, s.Close()) + require.True(t, p.closed) + require.True(t, r.closeCall) +} + +func TestFallbackPassthroughGettersDelegate(t *testing.T) { + p := newFakePrimary(123, 456) + s := NewFallbackStateStore(p, newFakeReader()) + + require.Equal(t, int64(123), s.GetEarliestVersion()) + require.Equal(t, int64(456), s.GetLatestVersion()) +} diff --git a/sei-db/state_db/ss/offload/kafka.go b/sei-db/state_db/ss/offload/kafka.go index edbe366818..17ae565979 100644 --- a/sei-db/state_db/ss/offload/kafka.go +++ b/sei-db/state_db/ss/offload/kafka.go @@ -212,6 +212,12 @@ func kafkaCompression(name string) compress.Compression { } func kafkaSASLMechanism(cfg KafkaConfig) (sasl.Mechanism, error) { + return NewSASLMechanism(cfg) +} + +// NewSASLMechanism builds a SASL mechanism from a KafkaConfig, so consumers +// that live outside this package can share the same auth path as the producer. +func NewSASLMechanism(cfg KafkaConfig) (sasl.Mechanism, error) { switch strings.ToLower(cfg.SASLMechanism) { case "", kafkaOptionNone: return nil, nil diff --git a/sei-db/state_db/ss/store.go b/sei-db/state_db/ss/store.go index 0fb4b5184e..fe03c38a7a 100644 --- a/sei-db/state_db/ss/store.go +++ b/sei-db/state_db/ss/store.go @@ -1,15 +1,30 @@ package ss import ( + "fmt" + "github.com/sei-protocol/sei-chain/sei-db/config" "github.com/sei-protocol/sei-chain/sei-db/db_engine/types" "github.com/sei-protocol/sei-chain/sei-db/state_db/ss/composite" + "github.com/sei-protocol/sei-chain/sei-db/state_db/ss/offload/historical" ) -// NewStateStore creates a CompositeStateStore which handles both Cosmos and EVM data. -// The backend (pebbledb or rocksdb) is resolved at compile time via build-tag-gated -// files in the backend package. When WriteMode/ReadMode are both cosmos_only (the default), -// the EVM stores are not opened and the composite store behaves identically to a plain cosmos state store. +// NewStateStore opens the composite SS and, if HistoricalOffloadDSN is set, +// wraps it with a Cockroach-backed fallback for reads of pruned versions. func NewStateStore(homeDir string, ssConfig config.StateStoreConfig) (types.StateStore, error) { - return composite.NewCompositeStateStore(ssConfig, homeDir) + cs, err := composite.NewCompositeStateStore(ssConfig, homeDir) + if err != nil { + return nil, err + } + if ssConfig.HistoricalOffloadDSN == "" { + return cs, nil + } + reader, err := historical.NewCockroachReader(historical.CockroachConfig{ + DSN: ssConfig.HistoricalOffloadDSN, + }) + if err != nil { + _ = cs.Close() + return nil, fmt.Errorf("open historical offload reader: %w", err) + } + return historical.NewFallbackStateStore(cs, reader), nil }