31 commits
70bbdda
Add CockroachDB schema for historical offload consumer
Kbhat1 Apr 22, 2026
619f3a9
Add Sink interface for historical offload consumer
Kbhat1 Apr 22, 2026
71a2672
Add CockroachDB sink for historical offload consumer
Kbhat1 Apr 22, 2026
e0a0371
Export SASL mechanism helper from offload package
Kbhat1 Apr 22, 2026
8d7f6be
Add Kafka reader + ChangelogEntry decoder for consumer
Kbhat1 Apr 22, 2026
537cac4
Add consumer loop: Kafka fetch -> decode -> sink -> commit
Kbhat1 Apr 22, 2026
4311da6
Add historical-offload-consumer binary and example config
Kbhat1 Apr 22, 2026
2d2d1e3
Add validation tests for consumer configs
Kbhat1 Apr 22, 2026
0806250
Add deploy.sh for historical-offload consumer setup
Kbhat1 Apr 23, 2026
8a56040
Add README for historical-offload-consumer
Kbhat1 Apr 23, 2026
ac42c35
Test DecodeEntry roundtrip and LoadConfig parsing
Kbhat1 Apr 23, 2026
5bb21e1
Test consumer loop end-to-end with fake source + sink
Kbhat1 Apr 23, 2026
ff433d0
Test mutation-batch SQL builder
Kbhat1 Apr 23, 2026
a5b577a
Retry sink writes with bounded exponential backoff
Kbhat1 Apr 27, 2026
0f7ee48
Fix golangci-lint findings in consumer package
Kbhat1 Apr 27, 2026
37efd20
Drop CommitInterval from consumer KafkaReaderConfig
Kbhat1 Apr 27, 2026
a00f03a
Hash-shard monotonic offload schema indexes
Kbhat1 Apr 27, 2026
b5714d2
Add Reader interface for historical state
Kbhat1 Apr 27, 2026
47cda76
Add CockroachDB historical Reader implementation
Kbhat1 Apr 27, 2026
2cf0815
Test reader helpers and batch-lookup SQL shape
Kbhat1 Apr 27, 2026
13e3154
Add FallbackStateStore adapter
Kbhat1 Apr 28, 2026
656c83e
Test FallbackStateStore routing
Kbhat1 Apr 28, 2026
96ac9a4
Wire historical fallback into NewStateStore
Kbhat1 Apr 28, 2026
5a2ff06
Parallelize consumer writes across partitions
Kbhat1 Apr 28, 2026
9a128a6
Index state_mutations by (store_name, version DESC)
Kbhat1 Apr 28, 2026
6697bba
Trim comments to the WHY; drop narrating prose
Kbhat1 Apr 28, 2026
d848b2f
Add state_latest + state_at_block tables
Kbhat1 Apr 28, 2026
68b0d4d
UPSERT state_latest on every consumer write
Kbhat1 Apr 28, 2026
c2aeefa
Snapshot state_at_block per block with rolling GC
Kbhat1 Apr 28, 2026
e478a39
Test state_latest builder and snapshot SQL shapes
Kbhat1 Apr 28, 2026
5fdba28
Wire ENABLE_LATEST / SNAPSHOT_STORES / SNAPSHOT_WINDOW_BLOCKS
Kbhat1 Apr 28, 2026
4 changes: 4 additions & 0 deletions sei-db/config/ss_config.go
@@ -76,6 +76,10 @@ type StateStoreConfig struct {
	// When true, data is routed to separate DBs by EVM key family while
	// preserving the same logical store key and full key encoding inside each DB.
	SeparateEVMSubDBs bool `mapstructure:"evm-separate-dbs"`

	// HistoricalOffloadDSN, when set, wraps the SS so reads below the
	// primary's earliest version fall back to the Cockroach offload store.
	HistoricalOffloadDSN string `mapstructure:"historical-offload-dsn"`
}

// DefaultStateStoreConfig returns the default StateStoreConfig
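The routing this config comment describes can be sketched as follows. The interface and type names here (`versionedReader`, `fallbackStore`, `mapReader`) are illustrative stand-ins, not the PR's actual `FallbackStateStore` types:

```go
package main

import "fmt"

// versionedReader is a hypothetical minimal read interface: the primary SS and
// the Cockroach-backed historical reader both satisfy it.
type versionedReader interface {
	EarliestVersion() int64
	Get(store string, key []byte, version int64) ([]byte, error)
}

// fallbackStore routes reads below the primary's earliest retained version to
// the historical reader, mirroring the behavior described in the config comment.
type fallbackStore struct {
	primary    versionedReader
	historical versionedReader
}

func (f *fallbackStore) Get(store string, key []byte, version int64) ([]byte, error) {
	if version < f.primary.EarliestVersion() {
		// Pruned locally; serve from the Cockroach offload store.
		return f.historical.Get(store, key, version)
	}
	return f.primary.Get(store, key, version)
}

// mapReader is a toy in-memory implementation for the demo below.
type mapReader struct {
	earliest int64
	data     map[int64][]byte
}

func (m *mapReader) EarliestVersion() int64 { return m.earliest }
func (m *mapReader) Get(_ string, _ []byte, v int64) ([]byte, error) {
	return m.data[v], nil
}

func main() {
	fs := &fallbackStore{
		primary:    &mapReader{earliest: 100, data: map[int64][]byte{100: []byte("new")}},
		historical: &mapReader{earliest: 0, data: map[int64][]byte{5: []byte("old")}},
	}
	v, _ := fs.Get("bank", []byte("k"), 5) // below earliest -> historical
	fmt.Println(string(v))                 // -> old
}
```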
4 changes: 4 additions & 0 deletions sei-db/config/toml.go
@@ -139,6 +139,10 @@ evm-ss-split = {{ .StateStore.EVMSplit }}
# When false, all EVM data stays in one DB using the current unified layout.
# When true, data is routed to separate DBs while preserving the same evm key prefix format.
evm-ss-separate-dbs = {{ .StateStore.SeparateEVMSubDBs }}

# When set, reads below the local SS's earliest version fall back to the
# CockroachDB offload store populated by the historical-offload consumer.
historical-offload-dsn = "{{ .StateStore.HistoricalOffloadDSN }}"
`

// ReceiptStoreConfigTemplate defines the configuration template for receipt-store
61 changes: 61 additions & 0 deletions sei-db/state_db/ss/offload/consumer/README.md
@@ -0,0 +1,61 @@
# historical-offload-consumer

Reads the `ChangelogEntry` messages that cryptosim publishes to Kafka and writes them to CockroachDB.

## Layout

- `schema/schema.sql` — CockroachDB DDL (idempotent)
- `cmd/historical-offload-consumer/` — CLI binary
- `config/example.json` — sample config
- `deploy.sh` — one-shot setup helper

## Cloud prerequisites (manual)

- MSK cluster + topic + IAM role with `kafka-cluster:Connect` and read on the topic
- CockroachDB cluster + database + user
- AWS credentials available to the process (env or IAM role)

## Run

```bash
export KAFKA_BROKERS="b-1...:9098,b-2...:9098"
export KAFKA_TOPIC="historical-offload"
export KAFKA_GROUP_ID="historical-offload-consumer"
export AWS_REGION="us-east-1"
export COCKROACH_DSN="postgresql://user@host:26257/db?sslmode=verify-full"

RUN=1 ./deploy.sh
```

`deploy.sh` applies the schema, writes the config, builds the binary, and (with `RUN=1`) starts it. Flags: `SKIP_SCHEMA=1`, `SKIP_BUILD=1`.

## Guarantees

- At-least-once delivery. Sink UPSERTs on `(store_name, key, version)` so replay is a no-op.
- Per-partition ordering preserved. With `WORKERS>1` (recommended for fast
chains) messages are sharded by partition so each partition's writes still
flow through a single worker; cross-partition writes parallelize.
- Offsets commit only after the sink persists the entry.
- Sink writes use bounded exponential backoff (5 attempts, 1s→30s) before
giving up. On give-up the process exits non-zero so the supervisor restarts;
Kafka offsets stay uncommitted, so the next run replays from the last commit.

## Read-side optimization tables

Two optional tables make trace-style reads dramatically faster. Both are off
by default; enable each by setting the matching env var before running `deploy.sh`:

- `state_latest` — one row per `(store, key)` with the most recent value.
  Reads at "current state" become a single PK lookup instead of a descending
  scan on `state_mutations`. Enable with `ENABLE_LATEST=true`. Roughly doubles
  the write rate, which CockroachDB absorbs cheaply.
- `state_at_block` — dense end-of-block snapshot for hot stores. Each block
  copies `state_latest` into `state_at_block`, so reads at any block in the
  rolling window are a single PK lookup per `(store, key)`. Set
  `SNAPSHOT_STORES="slashing,distribution,staking,bank,params"` (requires
  `ENABLE_LATEST=true`); bound storage with `SNAPSHOT_WINDOW_BLOCKS=2000` so
  the consumer GCs older blocks inline.

The sentinel-pointer pattern was net-negative on Pebble (compaction lag);
moved here it is net-positive because the write tax lands on a system designed
to absorb it.
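The `state_latest` maintenance could look like the following CockroachDB `UPSERT`, built here by a hypothetical batch builder; the column names are guessed from this README, not taken from the PR's `schema.sql`:

```go
package main

import (
	"fmt"
	"strings"
)

// buildLatestUpsert builds a multi-row UPSERT for state_latest. In CockroachDB,
// UPSERT keys on the primary key, so replaying the same (store_name, key) just
// rewrites the row — consistent with the at-least-once guarantee above.
// Assumed columns: store_name, key, version, value.
func buildLatestUpsert(rows int) string {
	var b strings.Builder
	b.WriteString("UPSERT INTO state_latest (store_name, key, version, value) VALUES ")
	ph := make([]string, 0, rows)
	for i := 0; i < rows; i++ {
		base := i * 4
		ph = append(ph, fmt.Sprintf("($%d, $%d, $%d, $%d)", base+1, base+2, base+3, base+4))
	}
	b.WriteString(strings.Join(ph, ", "))
	return b.String()
}

func main() {
	fmt.Println(buildLatestUpsert(2))
}
```

Under the same assumed schema, the per-block `state_at_block` snapshot would then be a server-side copy along the lines of `INSERT INTO state_at_block SELECT ..., $block FROM state_latest WHERE store_name = ANY(...)`, with the rolling GC deleting rows older than the window.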
@@ -0,0 +1,47 @@
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/sei-protocol/sei-chain/sei-db/state_db/ss/offload/consumer"
)

func main() {
	if len(os.Args) != 2 {
		fmt.Fprintf(os.Stderr, "usage: %s <config.json>\n", os.Args[0])
		os.Exit(2)
	}

	cfg, err := consumer.LoadConfig(os.Args[1])
	if err != nil {
		log.Fatalf("load config: %v", err)
	}

	sink, err := consumer.NewCockroachSink(cfg.Cockroach)
	if err != nil {
		log.Fatalf("open cockroach sink: %v", err)
	}
	defer func() { _ = sink.Close() }()

	reader, err := consumer.NewKafkaReader(cfg.Kafka)
	if err != nil {
		log.Fatalf("open kafka reader: %v", err)
	}
	defer func() { _ = reader.Close() }()

	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	c := consumer.New(reader, sink, consumer.Options{
		Logf:    func(format string, args ...interface{}) { log.Printf(format, args...) },
		Workers: cfg.Workers,
	})
	if err := c.Run(ctx); err != nil {
		log.Fatalf("consumer: %v", err)
	}
}