diff --git a/cmd/reorgapi.go b/cmd/reorgapi.go new file mode 100644 index 0000000..f492e85 --- /dev/null +++ b/cmd/reorgapi.go @@ -0,0 +1,36 @@ +package cmd + +import ( + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" + "github.com/thirdweb-dev/indexer/internal/libs" + "github.com/thirdweb-dev/indexer/internal/reorgapi" +) + +var reorgAPICmd = &cobra.Command{ + Use: "reorg-api", + Short: "HTTP API to publish manual reorg fixes to Kafka", + Long: `Loads old block data from ClickHouse, fetches canonical data from RPC, and publishes +to Kafka using the same reorg semantics as automatic reorg handling (old blocks as deleted, then new blocks). + +Requires the same env as committer for RPC, ClickHouse, and Kafka (no S3). + +Example: + curl -sS -X POST http://localhost:8080/v1/reorg/publish \ + -H 'Content-Type: application/json' \ + -d '{"chain_id":8453,"block_numbers":[12345,12346]}'`, + Run: runReorgAPI, +} + +func runReorgAPI(cmd *cobra.Command, args []string) { + libs.InitRPCClient() + libs.InitNewClickHouseV2() + libs.InitKafkaV2ForRole("reorg-api") + libs.InitRedis() + + log.Info().Str("chain_id", libs.ChainIdStr).Msg("starting reorg-api") + + if err := reorgapi.RunHTTPServer(); err != nil { + log.Fatal().Err(err).Msg("reorg-api server exited with error") + } +} diff --git a/cmd/root.go b/cmd/root.go index 6440fb3..b209e4c 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -33,6 +33,7 @@ func init() { rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "optional config file path (defaults to env-only when unset)") rootCmd.AddCommand(committerCmd) rootCmd.AddCommand(backfillCmd) + rootCmd.AddCommand(reorgAPICmd) } func initConfig() { diff --git a/configs/config.go b/configs/config.go index 2987132..dac12c1 100644 --- a/configs/config.go +++ b/configs/config.go @@ -16,10 +16,11 @@ type LogConfig struct { } type KafkaConfig struct { - Brokers string `mapstructure:"brokers"` - Username string `mapstructure:"username"` - Password string 
`mapstructure:"password"` - EnableTLS bool `mapstructure:"enableTLS"` + Brokers string `mapstructure:"brokers"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + EnableTLS bool `mapstructure:"enableTLS"` + ProducerRole string } type RPCBatchRequestConfig struct { @@ -90,6 +91,12 @@ type Config struct { RedisDB int `env:"REDIS_DB" envDefault:"0"` ValidationMode string `env:"VALIDATION_MODE" envDefault:"minimal"` EnableReorgValidation bool `env:"ENABLE_REORG_VALIDATION" envDefault:"true"` + // ReorgAPIListenAddr is the bind address for the manual reorg publish HTTP server (reorg-api command). + ReorgAPIListenAddr string `env:"REORG_API_LISTEN_ADDR" envDefault:":8080"` + // ReorgAPIKey, when non-empty, requires requests to send "Authorization: Bearer <key>". + ReorgAPIKey string `env:"REORG_API_KEY"` + // ReorgAPIClickhouseBatchSize is how many block numbers to load from ClickHouse per reorg-api sub-request (manual reorg publish). + ReorgAPIClickhouseBatchSize uint64 `env:"REORG_API_CLICKHOUSE_BATCH_SIZE" envDefault:"10"` } var Cfg Config diff --git a/internal/libs/clickhouse.go b/internal/libs/clickhouse.go index b5419d7..21c5fef 100644 --- a/internal/libs/clickhouse.go +++ b/internal/libs/clickhouse.go @@ -5,11 +5,13 @@ import ( "crypto/tls" "fmt" "math/big" + "slices" "strconv" "strings" "sync" "github.com/rs/zerolog/log" + "golang.org/x/sync/errgroup" "github.com/ClickHouse/clickhouse-go/v2" config "github.com/thirdweb-dev/indexer/configs" @@ -250,6 +252,153 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl return blockData, nil } +func joinUint64sForIN(nums []uint64) string { + var b strings.Builder + b.Grow(len(nums) * 12) + for i, n := range nums { + if i > 0 { + b.WriteByte(',') + } + b.WriteString(strconv.FormatUint(n, 10)) + } + return b.String() +} + +func queryBlocksByBlockNumbers(chainId uint64, nums []uint64) ([]common.Block, error) { + if len(nums) == 0 { + return nil, nil + } + q := 
fmt.Sprintf( + "SELECT %s FROM %s.blocks FINAL WHERE chain_id = %d AND block_number IN (%s) ORDER BY block_number", + strings.Join(defaultBlockFields, ", "), + config.Cfg.CommitterClickhouseDatabase, + chainId, + joinUint64sForIN(nums), + ) + return execQueryV2[common.Block](q) +} + +func queryTransactionsByBlockNumbers(chainId uint64, nums []uint64) ([]common.Transaction, error) { + if len(nums) == 0 { + return nil, nil + } + q := fmt.Sprintf( + "SELECT %s FROM %s.transactions FINAL WHERE chain_id = %d AND block_number IN (%s) ORDER BY block_number, transaction_index", + strings.Join(defaultTransactionFields, ", "), + config.Cfg.CommitterClickhouseDatabase, + chainId, + joinUint64sForIN(nums), + ) + return execQueryV2[common.Transaction](q) +} + +func queryLogsByBlockNumbers(chainId uint64, nums []uint64) ([]common.Log, error) { + if len(nums) == 0 { + return nil, nil + } + q := fmt.Sprintf( + "SELECT %s FROM %s.logs FINAL WHERE chain_id = %d AND block_number IN (%s) ORDER BY block_number, log_index", + strings.Join(defaultLogFields, ", "), + config.Cfg.CommitterClickhouseDatabase, + chainId, + joinUint64sForIN(nums), + ) + return execQueryV2[common.Log](q) +} + +func queryTracesByBlockNumbers(chainId uint64, nums []uint64) ([]common.Trace, error) { + if len(nums) == 0 { + return nil, nil + } + q := fmt.Sprintf( + "SELECT %s FROM %s.traces FINAL WHERE chain_id = %d AND block_number IN (%s) ORDER BY block_number, transaction_index", + strings.Join(defaultTraceFields, ", "), + config.Cfg.CommitterClickhouseDatabase, + chainId, + joinUint64sForIN(nums), + ) + return execQueryV2[common.Trace](q) +} + +// GetBlockDataFromClickHouseForBlockNumbers loads stored block data for specific block numbers using +// block_number IN (...). Callers that send very large lists should chunk requests (e.g. reorg-api batches by REORG_API_CLICKHOUSE_BATCH_SIZE). 
+func GetBlockDataFromClickHouseForBlockNumbers(chainId uint64, blockNumbers []uint64) ([]*common.BlockData, error) { + if len(blockNumbers) == 0 { + return nil, nil + } + nums := slices.Clone(blockNumbers) + slices.Sort(nums) + nums = slices.Compact(nums) + + var blocks []common.Block + var txs []common.Transaction + var logs []common.Log + var traces []common.Trace + g := new(errgroup.Group) + g.Go(func() (err error) { + blocks, err = queryBlocksByBlockNumbers(chainId, nums) + return err + }) + g.Go(func() (err error) { + txs, err = queryTransactionsByBlockNumbers(chainId, nums) + return err + }) + g.Go(func() (err error) { + logs, err = queryLogsByBlockNumbers(chainId, nums) + return err + }) + g.Go(func() (err error) { + traces, err = queryTracesByBlockNumbers(chainId, nums) + return err + }) + if err := g.Wait(); err != nil { + return nil, err + } + + blocksByNum := make(map[uint64]common.Block, len(blocks)) + for _, b := range blocks { + if b.Number != nil { + blocksByNum[b.Number.Uint64()] = b + } + } + txByNum := make(map[uint64][]common.Transaction) + for _, t := range txs { + if t.BlockNumber != nil { + bn := t.BlockNumber.Uint64() + txByNum[bn] = append(txByNum[bn], t) + } + } + logsByNum := make(map[uint64][]common.Log) + for _, l := range logs { + if l.BlockNumber != nil { + bn := l.BlockNumber.Uint64() + logsByNum[bn] = append(logsByNum[bn], l) + } + } + tracesByNum := make(map[uint64][]common.Trace) + for _, tr := range traces { + if tr.BlockNumber != nil { + bn := tr.BlockNumber.Uint64() + tracesByNum[bn] = append(tracesByNum[bn], tr) + } + } + + out := make([]*common.BlockData, 0, len(nums)) + for _, bn := range nums { + b, ok := blocksByNum[bn] + if !ok || b.ChainId == nil || b.Number == nil || b.ChainId.Uint64() == 0 { + continue + } + out = append(out, &common.BlockData{ + Block: b, + Transactions: txByNum[bn], + Logs: logsByNum[bn], + Traces: tracesByNum[bn], + }) + } + return out, nil +} + // GetTransactionMismatchRangeFromClickHouseV2 checks, 
for blocks in the given range, // where the stored transaction_count in the blocks table does not match the number // of transactions in the transactions table. It returns the minimum and maximum diff --git a/internal/libs/kafka.go b/internal/libs/kafka.go index ac43c0c..09933a6 100644 --- a/internal/libs/kafka.go +++ b/internal/libs/kafka.go @@ -9,12 +9,17 @@ import ( var KafkaPublisherV2 *storage.KafkaPublisher func InitKafkaV2() { + InitKafkaV2ForRole("committer") +} + +func InitKafkaV2ForRole(role string) { var err error KafkaPublisherV2, err = storage.NewKafkaPublisher(&config.KafkaConfig{ - Brokers: config.Cfg.CommitterKafkaBrokers, - Username: config.Cfg.CommitterKafkaUsername, - Password: config.Cfg.CommitterKafkaPassword, - EnableTLS: config.Cfg.CommitterKafkaEnableTLS, + Brokers: config.Cfg.CommitterKafkaBrokers, + Username: config.Cfg.CommitterKafkaUsername, + Password: config.Cfg.CommitterKafkaPassword, + EnableTLS: config.Cfg.CommitterKafkaEnableTLS, + ProducerRole: role, }) if err != nil { log.Fatal().Err(err).Msg("Failed to initialize Kafka publisher") diff --git a/internal/libs/libblockdata/getblockdata.go b/internal/libs/libblockdata/getblockdata.go index e0fb992..541d13b 100644 --- a/internal/libs/libblockdata/getblockdata.go +++ b/internal/libs/libblockdata/getblockdata.go @@ -271,6 +271,152 @@ func getValidBlockDataFromRpcBatch(blockNumbers []uint64) []*common.BlockData { return blockData } +// fetchBlockDataFromRpcBatch fetches full block data from RPC with retries; returns an error instead of panicking. 
+func fetchBlockDataFromRpcBatch(blockNumbers []uint64) ([]*common.BlockData, error) { + var rpcResults []rpc.GetFullBlockResult + chainIdStr := libs.ChainIdStr + indexerName := config.Cfg.ZeetProjectName + + // Initial fetch + rpcResults = libs.RpcClient.GetFullBlocks(context.Background(), blockNumbersToBigInt(blockNumbers)) + + metrics.CommitterRPCRowsToFetch.WithLabelValues(indexerName, chainIdStr).Set(float64(len(blockNumbers))) + + // Create array of failed block numbers for retry + failedBlockNumbers := make([]uint64, 0) + for i, result := range rpcResults { + if result.Error != nil { + log.Error().Uint64("block_number", blockNumbers[i]).Err(result.Error).Msg("Failed to fetch block data from RPC") + failedBlockNumbers = append(failedBlockNumbers, blockNumbers[i]) + } + } + + // Retry only failed blocks up to 3 times + for retry := range 3 { + if len(failedBlockNumbers) == 0 { + break // All blocks succeeded + } + + // Track retry metric + metrics.CommitterRPCRetries.WithLabelValues(indexerName, chainIdStr).Set(float64(len(failedBlockNumbers))) + + log.Warn(). + Int("retry", retry+1). + Int("failed_count", len(failedBlockNumbers)). 
+ Msg("Retrying failed block fetches...") + + // Retry only the failed blocks + retryResults := libs.RpcClient.GetFullBlocks(context.Background(), blockNumbersToBigInt(failedBlockNumbers)) + + // Update rpcResults with successful ones and create new failed array + newFailedBlockNumbers := make([]uint64, 0) + retryIndex := 0 + + for i, result := range rpcResults { + if result.Error != nil { + // This was a failed block, check if retry succeeded + if retryIndex < len(retryResults) && retryResults[retryIndex].Error == nil { + // Retry succeeded - update the result + rpcResults[i] = retryResults[retryIndex] + } else { + // Still failed - add to new failed array + newFailedBlockNumbers = append(newFailedBlockNumbers, blockNumbers[i]) + } + retryIndex++ + } + } + + failedBlockNumbers = newFailedBlockNumbers + + // Add delay between retries + if len(failedBlockNumbers) > 0 && retry < 2 { + time.Sleep(time.Duration(retry+1) * 100 * time.Millisecond) + } + } + + // Check if any blocks still failed after all retries + if len(failedBlockNumbers) > 0 { + return nil, fmt.Errorf("failed to fetch %d block(s) from RPC after 3 retries", len(failedBlockNumbers)) + } + + blockData := make([]*common.BlockData, len(rpcResults)) + for i, result := range rpcResults { + blockData[i] = &result.Data + rpcResults[i] = rpc.GetFullBlockResult{} // free memory + } + + for i, block := range blockData { + if isValid, _ := Validate(block); !isValid { + bn := uint64(0) + if i < len(blockNumbers) { + bn = blockNumbers[i] + } + return nil, fmt.Errorf("validation failed for block %d (batch index %d)", bn, i) + } + } + + return blockData, nil +} + +// FetchBlockDataFromRPC fetches and validates block data from RPC, returning an error on failure (for HTTP/tools). 
+func FetchBlockDataFromRPC(blockNumbers []uint64) ([]*common.BlockData, error) { + rpcBatchSize := config.Cfg.RPCBatchSize + totalBlocks := len(blockNumbers) + if totalBlocks == 0 { + return nil, nil + } + blockData := make([]*common.BlockData, totalBlocks) + numBatches := (totalBlocks + int(rpcBatchSize) - 1) / int(rpcBatchSize) + + var wg sync.WaitGroup + var mu sync.Mutex + var firstErr error + + maxConcurrentBatches := 4 + semaphore := make(chan struct{}, maxConcurrentBatches) + + for batchIndex := range numBatches { + wg.Add(1) + go func(batchIdx int) { + defer wg.Done() + + semaphore <- struct{}{} + defer func() { <-semaphore }() + + start := batchIdx * int(rpcBatchSize) + end := min(start+int(rpcBatchSize), totalBlocks) + + batchBlockNumbers := blockNumbers[start:end] + batchResults, err := fetchBlockDataFromRpcBatch(batchBlockNumbers) + if err != nil { + mu.Lock() + if firstErr == nil { + firstErr = fmt.Errorf("batch starting at block %d: %w", batchBlockNumbers[0], err) + } + mu.Unlock() + return + } + + for i, result := range batchResults { + blockData[start+i] = result + } + + log.Debug(). + Int("batch", batchIdx). + Int("start", start). + Int("end", end). + Int("batch_size", len(batchBlockNumbers)). 
+ Msg("Completed RPC batch fetch (FetchBlockDataFromRPC)") + }(batchIndex) + } + + wg.Wait() + if firstErr != nil { + return nil, firstErr + } + return blockData, nil +} + func blockNumbersToBigInt(blockNumbers []uint64) []*big.Int { bigInts := make([]*big.Int, len(blockNumbers)) for i, blockNumber := range blockNumbers { diff --git a/internal/libs/redis.go b/internal/libs/redis.go index 6827282..967daae 100644 --- a/internal/libs/redis.go +++ b/internal/libs/redis.go @@ -13,6 +13,7 @@ import ( var RedisClient *redis.Client const RedisReorgLastValidBlock = "reorg_last_valid" +const RedisReorgAPIMaxProcessedBlock = "reorg_api_max_processed_block" // InitRedis initializes the Redis client func InitRedis() { @@ -51,6 +52,29 @@ func SetReorgLastValidBlock(chainID string, blockNumber int64) error { return RedisClient.HSet(context.Background(), RedisReorgLastValidBlock, chainID, blockNumber).Err() } +func GetReorgAPIMaxProcessedBlock(chainID string) (uint64, error) { + result, err := RedisClient.HGet(context.Background(), RedisReorgAPIMaxProcessedBlock, chainID).Result() + if err == redis.Nil { + return 0, nil + } + if err != nil { + return 0, err + } + n, err := strconv.ParseUint(result, 10, 64) + if err != nil { + return 0, err + } + return n, nil +} + +func SetReorgAPIMaxProcessedBlock(chainID string, blockNumber uint64) error { + return RedisClient.HSet(context.Background(), RedisReorgAPIMaxProcessedBlock, chainID, strconv.FormatUint(blockNumber, 10)).Err() +} + +func ClearReorgAPIMaxProcessedBlock(chainID string) error { + return RedisClient.HDel(context.Background(), RedisReorgAPIMaxProcessedBlock, chainID).Err() +} + // CloseRedis closes the Redis client func CloseRedis() { if RedisClient != nil { diff --git a/internal/reorgapi/server.go b/internal/reorgapi/server.go new file mode 100644 index 0000000..37e8645 --- /dev/null +++ b/internal/reorgapi/server.go @@ -0,0 +1,220 @@ +package reorgapi + +import ( + "fmt" + "net/http" + "slices" + "strings" + "sync" + + 
"github.com/gin-gonic/gin" + "github.com/rs/zerolog/log" + config "github.com/thirdweb-dev/indexer/configs" + "github.com/thirdweb-dev/indexer/internal/libs" + "github.com/thirdweb-dev/indexer/internal/libs/libblockdata" +) + +// PublishReorgRequest is the JSON body for POST /v1/reorg/publish. +type PublishReorgRequest struct { + ChainID uint64 `json:"chain_id"` + BlockNumbers []uint64 `json:"block_numbers"` + SkipIdempotencyCheck bool `json:"skip_idempotency_check"` +} + +type PublishReorgResponse struct { + OK bool `json:"ok"` + BlocksPublished int `json:"blocks_published"` + Message string `json:"message,omitempty"` +} + +var ( + manualReorgMu sync.Mutex +) + +// RunHTTPServer starts a blocking HTTP server that publishes manual reorg batches to Kafka. +func RunHTTPServer() error { + gin.SetMode(gin.ReleaseMode) + r := gin.New() + r.Use(gin.Recovery()) + r.Use(gin.LoggerWithWriter(gin.DefaultWriter)) + + r.GET("/health", func(c *gin.Context) { + c.JSON(http.StatusOK, gin.H{"status": "ok"}) + }) + + v1 := r.Group("/v1") + v1.POST("/reorg/publish", authMiddleware(), handlePublishReorg) + + addr := config.Cfg.ReorgAPIListenAddr + log.Info().Str("addr", addr).Msg("reorg-api HTTP server listening") + return r.Run(addr) +} + +func authMiddleware() gin.HandlerFunc { + return func(c *gin.Context) { + key := config.Cfg.ReorgAPIKey + if key == "" { + c.Next() + return + } + auth := c.GetHeader("Authorization") + const prefix = "Bearer " + if !strings.HasPrefix(auth, prefix) || strings.TrimPrefix(auth, prefix) != key { + c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"}) + c.Abort() + return + } + c.Next() + } +} + +func handlePublishReorg(c *gin.Context) { + var req PublishReorgRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("invalid json: %v", err)}) + return + } + if req.ChainID == 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "chain_id is required"}) + return + } + if libs.ChainId == 
nil || libs.ChainId.Uint64() != req.ChainID { + c.JSON(http.StatusBadRequest, gin.H{ + "error": fmt.Sprintf("chain_id must match this deployment's RPC chain (%s)", libs.ChainIdStr), + }) + return + } + if len(req.BlockNumbers) == 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "block_numbers must be non-empty"}) + return + } + + if !manualReorgMu.TryLock() { + c.JSON(http.StatusConflict, gin.H{"error": "a manual reorg publish is already running"}) + return + } + defer manualReorgMu.Unlock() + + sorted := slices.Clone(req.BlockNumbers) + slices.Sort(sorted) + sorted = slices.Compact(sorted) + + var lastPublishedMaxBlock uint64 + var work []uint64 + if req.SkipIdempotencyCheck { + work = sorted + log.Info(). + Uint64("chain_id", req.ChainID). + Int("requested", len(sorted)). + Msg("manual reorg: skip_idempotency_check enabled; processing all requested blocks") + } else { + if libs.RedisClient == nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "redis is not initialized"}) + return + } + var err error + lastPublishedMaxBlock, err = libs.GetReorgAPIMaxProcessedBlock(libs.ChainIdStr) + if err != nil { + log.Error().Err(err).Msg("manual reorg: redis get max processed block") + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to read idempotency cursor from redis"}) + return + } + + work = make([]uint64, 0, len(sorted)) + for _, bn := range sorted { + if bn > lastPublishedMaxBlock { + work = append(work, bn) + } + } + if len(work) == 0 { + c.JSON(http.StatusOK, PublishReorgResponse{ + OK: true, + BlocksPublished: 0, + Message: fmt.Sprintf("nothing to publish: all requested blocks are at or below last published max (%d)", lastPublishedMaxBlock), + }) + return + } + if len(work) < len(sorted) { + log.Info(). + Uint64("last_published_max_block", lastPublishedMaxBlock). + Int("skipped", len(sorted)-len(work)). + Int("to_publish", len(work)). 
+ Msg("manual reorg: skipping blocks already processed") + } + } + + batchSize := config.Cfg.ReorgAPIClickhouseBatchSize + if batchSize == 0 { + batchSize = 10 + } + totalBatches := (len(work) + int(batchSize) - 1) / int(batchSize) + var anyPartialOld bool + batchIdx := 0 + for i := 0; i < len(work); i += int(batchSize) { + end := min(i+int(batchSize), len(work)) + chunk := work[i:end] + batchIdx++ + rangeStart := chunk[0] + rangeEnd := chunk[len(chunk)-1] + log.Info(). + Uint64("chain_id", req.ChainID). + Int("batch", batchIdx). + Int("batch_total", totalBatches). + Uint64("block_range_start", rangeStart). + Uint64("block_range_end", rangeEnd). + Int("batch_block_count", len(chunk)). + Msg("manual reorg: processing batch (ClickHouse → RPC → Kafka)") + + chunkOld, err := libs.GetBlockDataFromClickHouseForBlockNumbers(req.ChainID, chunk) + if err != nil { + log.Error().Err(err).Msg("manual reorg: clickhouse") + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + if len(chunkOld) < len(chunk) { + anyPartialOld = true + } + + chunkNew, err := libblockdata.FetchBlockDataFromRPC(chunk) + if err != nil { + log.Error().Err(err).Msg("manual reorg: rpc") + c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) + return + } + if len(chunkNew) != len(chunk) { + c.JSON(http.StatusInternalServerError, gin.H{"error": "internal: rpc result length mismatch"}) + return + } + for j, bn := range chunk { + if chunkNew[j] == nil || chunkNew[j].Block.Number == nil || chunkNew[j].Block.Number.Uint64() != bn { + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("rpc block order mismatch at index %d", j)}) + return + } + } + + if err := libs.KafkaPublisherV2.PublishBlockDataReorg(chunkNew, chunkOld); err != nil { + log.Error().Err(err).Msg("manual reorg: kafka") + c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) + return + } + + if !req.SkipIdempotencyCheck && rangeEnd > lastPublishedMaxBlock { + lastPublishedMaxBlock = rangeEnd + if err 
:= libs.SetReorgAPIMaxProcessedBlock(libs.ChainIdStr, lastPublishedMaxBlock); err != nil { + log.Error().Err(err).Msg("manual reorg: redis set max processed block") + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update idempotency cursor in redis"}) + return + } + } + } + + msg := "published reorg batches (per batch: delete old from CH if present, then insert new from RPC)" + if anyPartialOld { + msg = "published reorg batches; at least one batch had fewer ClickHouse rows than requested (FINAL); Kafka inserts still sent for every block in those batches" + } + c.JSON(http.StatusOK, PublishReorgResponse{ + OK: true, + BlocksPublished: len(work), + Message: msg, + }) +} diff --git a/internal/storage/kafka_publisher.go b/internal/storage/kafka_publisher.go index 3dabbc1..92cb3e9 100644 --- a/internal/storage/kafka_publisher.go +++ b/internal/storage/kafka_publisher.go @@ -61,13 +61,17 @@ func (b PublishableMessageRevert) GetType() MessageType { func NewKafkaPublisher(cfg *config.KafkaConfig) (*KafkaPublisher, error) { brokers := strings.Split(cfg.Brokers, ",") chainID := config.Cfg.RPC.ChainID + role := strings.TrimSpace(cfg.ProducerRole) + if role == "" { + role = "default" + } opts := []kgo.Opt{ kgo.SeedBrokers(brokers...), kgo.AllowAutoTopicCreation(), kgo.ProducerBatchCompression(kgo.ZstdCompression()), - kgo.ClientID(fmt.Sprintf("insight-indexer-kafka-storage-%s", chainID)), - kgo.TransactionalID(fmt.Sprintf("insight-producer-%s", chainID)), + kgo.ClientID(fmt.Sprintf("insight-indexer-kafka-storage-%s-%s", chainID, role)), + kgo.TransactionalID(fmt.Sprintf("insight-producer-%s-%s", chainID, role)), kgo.MaxBufferedBytes(2 * 1024 * 1024 * 1024), // 2GB kgo.MaxBufferedRecords(1_000_000), kgo.ProducerBatchMaxBytes(100 * 1024 * 1024), // 100MB