Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions cmd/reorgapi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package cmd

import (
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"github.com/thirdweb-dev/indexer/internal/libs"
"github.com/thirdweb-dev/indexer/internal/reorgapi"
)

// reorgAPICmd is the cobra command for the manual reorg-publish HTTP server
// implemented in internal/reorgapi. It is registered on rootCmd in cmd/root.go.
var reorgAPICmd = &cobra.Command{
	Use:   "reorg-api",
	Short: "HTTP API to publish manual reorg fixes to Kafka",
	Long: `Loads old block data from ClickHouse, fetches canonical data from RPC, and publishes
to Kafka using the same reorg semantics as automatic reorg handling (old blocks as deleted, then new blocks).

Requires the same env as committer for RPC, ClickHouse, and Kafka (no S3).

Example:
curl -sS -X POST http://localhost:8080/v1/reorg/publish \
-H 'Content-Type: application/json' \
-d '{"chain_id":8453,"block_numbers":[12345,12346]}'`,
	Run: runReorgAPI,
}

// runReorgAPI is the entry point for the reorg-api command: it initializes the
// shared clients the server depends on, then blocks serving HTTP until exit.
func runReorgAPI(_ *cobra.Command, _ []string) {
	// Same dependency set as the committer, minus S3: RPC, ClickHouse, Kafka
	// (tagged with the "reorg-api" producer role), and Redis.
	libs.InitRPCClient()
	libs.InitNewClickHouseV2()
	libs.InitKafkaV2ForRole("reorg-api")
	libs.InitRedis()

	log.Info().Str("chain_id", libs.ChainIdStr).Msg("starting reorg-api")

	err := reorgapi.RunHTTPServer()
	if err != nil {
		log.Fatal().Err(err).Msg("reorg-api server exited with error")
	}
}
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func init() {
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "optional config file path (defaults to env-only when unset)")
rootCmd.AddCommand(committerCmd)
rootCmd.AddCommand(backfillCmd)
rootCmd.AddCommand(reorgAPICmd)
}

func initConfig() {
Expand Down
15 changes: 11 additions & 4 deletions configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ type LogConfig struct {
}

// KafkaConfig carries the connection settings for the Kafka publisher.
// (The scraped diff left both the pre- and post-change field lists in place,
// which is invalid Go; this is the post-change struct.)
type KafkaConfig struct {
	Brokers   string `mapstructure:"brokers"`
	Username  string `mapstructure:"username"`
	Password  string `mapstructure:"password"`
	EnableTLS bool   `mapstructure:"enableTLS"`
	// ProducerRole identifies which command produces messages (e.g.
	// "committer", "reorg-api"). It is set programmatically by the caller
	// rather than loaded from configuration, hence no mapstructure tag.
	ProducerRole string
}

type RPCBatchRequestConfig struct {
Expand Down Expand Up @@ -90,6 +91,12 @@ type Config struct {
RedisDB int `env:"REDIS_DB" envDefault:"0"`
ValidationMode string `env:"VALIDATION_MODE" envDefault:"minimal"`
EnableReorgValidation bool `env:"ENABLE_REORG_VALIDATION" envDefault:"true"`
// ReorgAPIListenAddr is the bind address for the manual reorg publish HTTP server (reorg-api command).
ReorgAPIListenAddr string `env:"REORG_API_LISTEN_ADDR" envDefault:":8080"`
// ReorgAPIKey, when non-empty, requires requests to send Authorization: Bearer <ReorgAPIKey>.
ReorgAPIKey string `env:"REORG_API_KEY"`
Comment thread
nischitpra marked this conversation as resolved.
// ReorgAPIClickhouseBatchSize is how many block numbers to load from ClickHouse per reorg-api sub-request (manual reorg publish).
ReorgAPIClickhouseBatchSize uint64 `env:"REORG_API_CLICKHOUSE_BATCH_SIZE" envDefault:"10"`
Comment thread
nischitpra marked this conversation as resolved.
}

var Cfg Config
Expand Down
149 changes: 149 additions & 0 deletions internal/libs/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"crypto/tls"
"fmt"
"math/big"
"slices"
"strconv"
"strings"
"sync"

"github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup"

"github.com/ClickHouse/clickhouse-go/v2"
config "github.com/thirdweb-dev/indexer/configs"
Expand Down Expand Up @@ -250,6 +252,153 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl
return blockData, nil
}

// joinUint64sForIN renders nums as a comma-separated decimal list suitable
// for interpolation into a SQL IN (...) clause. Returns "" for an empty slice.
func joinUint64sForIN(nums []uint64) string {
	parts := make([]string, len(nums))
	for i, n := range nums {
		parts[i] = strconv.FormatUint(n, 10)
	}
	return strings.Join(parts, ",")
}

// queryModelsByBlockNumbers is the shared implementation behind the four
// per-table loaders below: it selects the given fields from table in the
// committer ClickHouse database, restricted to chain_id and the given block
// numbers, ordered by orderBy (appended verbatim after ORDER BY). Returns
// (nil, nil) when nums is empty so callers can pass through directly.
func queryModelsByBlockNumbers[T any](chainId uint64, nums []uint64, table, orderBy string, fields []string) ([]T, error) {
	if len(nums) == 0 {
		return nil, nil
	}
	q := fmt.Sprintf(
		"SELECT %s FROM %s.%s FINAL WHERE chain_id = %d AND block_number IN (%s) ORDER BY %s",
		strings.Join(fields, ", "),
		config.Cfg.CommitterClickhouseDatabase,
		table,
		chainId,
		joinUint64sForIN(nums),
		orderBy,
	)
	return execQueryV2[T](q)
}

// queryBlocksByBlockNumbers loads block rows for the given block numbers.
func queryBlocksByBlockNumbers(chainId uint64, nums []uint64) ([]common.Block, error) {
	return queryModelsByBlockNumbers[common.Block](chainId, nums, "blocks", "block_number", defaultBlockFields)
}

// queryTransactionsByBlockNumbers loads transaction rows for the given block numbers.
func queryTransactionsByBlockNumbers(chainId uint64, nums []uint64) ([]common.Transaction, error) {
	return queryModelsByBlockNumbers[common.Transaction](chainId, nums, "transactions", "block_number, transaction_index", defaultTransactionFields)
}

// queryLogsByBlockNumbers loads log rows for the given block numbers.
func queryLogsByBlockNumbers(chainId uint64, nums []uint64) ([]common.Log, error) {
	return queryModelsByBlockNumbers[common.Log](chainId, nums, "logs", "block_number, log_index", defaultLogFields)
}

// queryTracesByBlockNumbers loads trace rows for the given block numbers.
func queryTracesByBlockNumbers(chainId uint64, nums []uint64) ([]common.Trace, error) {
	return queryModelsByBlockNumbers[common.Trace](chainId, nums, "traces", "block_number, transaction_index", defaultTraceFields)
}

// GetBlockDataFromClickHouseForBlockNumbers loads stored block data for specific block numbers using
// block_number IN (...). Callers that send very large lists should chunk requests (e.g. reorg-api batches by REORG_API_CLICKHOUSE_BATCH_SIZE).
func GetBlockDataFromClickHouseForBlockNumbers(chainId uint64, blockNumbers []uint64) ([]*common.BlockData, error) {
if len(blockNumbers) == 0 {
return nil, nil
}
nums := slices.Clone(blockNumbers)
slices.Sort(nums)
nums = slices.Compact(nums)

var blocks []common.Block
var txs []common.Transaction
var logs []common.Log
var traces []common.Trace
g := new(errgroup.Group)
g.Go(func() (err error) {
blocks, err = queryBlocksByBlockNumbers(chainId, nums)
return err
})
g.Go(func() (err error) {
txs, err = queryTransactionsByBlockNumbers(chainId, nums)
return err
})
g.Go(func() (err error) {
logs, err = queryLogsByBlockNumbers(chainId, nums)
return err
})
g.Go(func() (err error) {
traces, err = queryTracesByBlockNumbers(chainId, nums)
return err
})
if err := g.Wait(); err != nil {
return nil, err
Comment thread
nischitpra marked this conversation as resolved.
}

blocksByNum := make(map[uint64]common.Block, len(blocks))
for _, b := range blocks {
if b.Number != nil {
blocksByNum[b.Number.Uint64()] = b
}
}
txByNum := make(map[uint64][]common.Transaction)
for _, t := range txs {
if t.BlockNumber != nil {
bn := t.BlockNumber.Uint64()
txByNum[bn] = append(txByNum[bn], t)
}
}
logsByNum := make(map[uint64][]common.Log)
for _, l := range logs {
if l.BlockNumber != nil {
bn := l.BlockNumber.Uint64()
logsByNum[bn] = append(logsByNum[bn], l)
}
}
tracesByNum := make(map[uint64][]common.Trace)
for _, tr := range traces {
if tr.BlockNumber != nil {
bn := tr.BlockNumber.Uint64()
tracesByNum[bn] = append(tracesByNum[bn], tr)
}
}

out := make([]*common.BlockData, 0, len(nums))
for _, bn := range nums {
b, ok := blocksByNum[bn]
if !ok || b.ChainId == nil || b.Number == nil || b.ChainId.Uint64() == 0 {
continue
}
out = append(out, &common.BlockData{
Block: b,
Transactions: txByNum[bn],
Logs: logsByNum[bn],
Traces: tracesByNum[bn],
})
}
return out, nil
}

// GetTransactionMismatchRangeFromClickHouseV2 checks, for blocks in the given range,
// where the stored transaction_count in the blocks table does not match the number
// of transactions in the transactions table. It returns the minimum and maximum
Expand Down
13 changes: 9 additions & 4 deletions internal/libs/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@ import (
var KafkaPublisherV2 *storage.KafkaPublisher

// InitKafkaV2 initializes the shared Kafka publisher with the default
// "committer" producer role; other commands use InitKafkaV2ForRole directly.
func InitKafkaV2() {
	InitKafkaV2ForRole("committer")
}

func InitKafkaV2ForRole(role string) {
var err error
KafkaPublisherV2, err = storage.NewKafkaPublisher(&config.KafkaConfig{
Brokers: config.Cfg.CommitterKafkaBrokers,
Username: config.Cfg.CommitterKafkaUsername,
Password: config.Cfg.CommitterKafkaPassword,
EnableTLS: config.Cfg.CommitterKafkaEnableTLS,
Brokers: config.Cfg.CommitterKafkaBrokers,
Username: config.Cfg.CommitterKafkaUsername,
Password: config.Cfg.CommitterKafkaPassword,
EnableTLS: config.Cfg.CommitterKafkaEnableTLS,
ProducerRole: role,
})
if err != nil {
log.Fatal().Err(err).Msg("Failed to initialize Kafka publisher")
Expand Down
Loading
Loading