Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 38 additions & 23 deletions cmd/mithril/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,21 @@ import (
"k8s.io/klog/v2"
)

const (
// Default RPC endpoint
defaultRpcEndpoint = "https://api.mainnet-beta.solana.com"
minPort = 0
maxPort = 65535
defaultParamArenaSizeMB = 512
defaultBorrowedAccountArenaSize = 1024
defaultMaxConcurrentFlushers = 16
defaultTmpDir = "/tmp"
portDisabled = 0
portNotSet = -1
slotNotSet = -1
manifestFilename = "manifest"
)

var (
Verifier = cobra.Command{
Use: "verifier",
Expand Down Expand Up @@ -87,18 +102,18 @@ func init() {
Verifier.Flags().StringVarP(&outputDir, "out", "o", "", "Output path for writing AccountsDB data to")
Verifier.Flags().StringVarP(&rpcEndpoint, "rpc", "r", "", "URL for RPC endpoint")
Verifier.Flags().Int64Var(&numReplaySlots, "num-replay-slots", 0, "Number of slots to replay.")
Verifier.Flags().Int64VarP(&endSlot, "endslot", "e", -1, "Block at which to stop replaying, inclusive")
Verifier.Flags().Int64Var(&pprofPort, "pprofport", -1, "Port to serve HTTP pprof endpoint")
Verifier.Flags().Int64VarP(&endSlot, "endslot", "e", slotNotSet, "Block at which to stop replaying, inclusive")
Verifier.Flags().Int64Var(&pprofPort, "pprofport", portNotSet, "Port to serve HTTP pprof endpoint")
Verifier.Flags().StringVar(&blockDir, "blockdir", "", "Path containing slot.json files")
Verifier.Flags().Int64Var(&txParallelism, "txpar", 0, "Set to 0 to use sequential execution, or >0 to execute a topsort tx plan with the given number of workers")
Verifier.Flags().StringSliceVar(&debugTxs, "debugtx", []string{}, "Pass tx signature strings to enable debug logging during that transaction's execution")
Verifier.Flags().StringSliceVar(&debugAcctWrites, "debugacctwrites", []string{}, "Pass account pubkeys to enable debug logging of transactions that modify the account")
Verifier.Flags().StringVar(&metricsFilename, "metrics-filename", "", "Filename to write JSONL records of latencies")
Verifier.Flags().StringVar(&cpuprofFilename, "cpuprof-filename", "", "Filename to write CPU profile")
Verifier.Flags().Uint64Var(&paramArenaSizeMB, "param-arena-size-mb", 512, "Size in MB for serialized parameter arena (0 to disable)")
Verifier.Flags().Uint64Var(&borrowedAccountArenaSize, "borrowed-account-arena-size", 1024, "Number of borrowed accounts to preallocate in arena (0 to disable)")
Verifier.Flags().Uint64Var(&paramArenaSizeMB, "param-arena-size-mb", defaultParamArenaSizeMB, "Size in MB for serialized parameter arena (0 to disable)")
Verifier.Flags().Uint64Var(&borrowedAccountArenaSize, "borrowed-account-arena-size", defaultBorrowedAccountArenaSize, "Number of borrowed accounts to preallocate in arena (0 to disable)")
Verifier.Flags().IntVar(&snapshot.ZstdDecoderConcurrency, "zstd-decoder-concurrency", runtime.NumCPU(), "Zstd decoder concurrency")
Verifier.Flags().IntVar(&snapshot.MaxConcurrentFlushers, "max-concurrent-flushers", 16, "Bound for number of log shards to flush to Accounts DB Index at once.")
Verifier.Flags().IntVar(&snapshot.MaxConcurrentFlushers, "max-concurrent-flushers", defaultMaxConcurrentFlushers, "Bound for number of log shards to flush to Accounts DB Index at once.")
Verifier.Flags().BoolVar(&sbpf.UsePool, "use-pool", true, "Disable to allocate fresh slices")
Verifier.Flags().StringVar(&snapshotDlPath, "download-snapshot", "", "Path to download snapshot to")
Verifier.Flags().IntVar(&rpcPort, "rpc-server-port", 0, "RPC server port. Default off.")
Expand All @@ -112,21 +127,21 @@ func init() {
Catchup.Flags().StringSliceVar(&debugAcctWrites, "debugacctwrites", []string{}, "Pass account pubkeys to enable debug logging of transactions that modify the account")
Catchup.Flags().StringVar(&metricsFilename, "metrics-filename", "", "Filename to write JSONL records of latencies")
Catchup.Flags().StringVar(&cpuprofFilename, "cpuprof-filename", "", "Filename to write CPU profile")
Catchup.Flags().Uint64Var(&paramArenaSizeMB, "param-arena-size-mb", 512, "Size in MB for serialized parameter arena (0 to disable)")
Catchup.Flags().Uint64Var(&borrowedAccountArenaSize, "borrowed-account-arena-size", 1024, "Number of borrowed accounts to preallocate in arena (0 to disable)")
Catchup.Flags().Uint64Var(&paramArenaSizeMB, "param-arena-size-mb", defaultParamArenaSizeMB, "Size in MB for serialized parameter arena (0 to disable)")
Catchup.Flags().Uint64Var(&borrowedAccountArenaSize, "borrowed-account-arena-size", defaultBorrowedAccountArenaSize, "Number of borrowed accounts to preallocate in arena (0 to disable)")
Catchup.Flags().IntVar(&snapshot.ZstdDecoderConcurrency, "zstd-decoder-concurrency", runtime.NumCPU(), "Zstd decoder concurrency")
Catchup.Flags().IntVar(&snapshot.MaxConcurrentFlushers, "max-concurrent-flushers", 16, "Bound for number of log shards to flush to Accounts DB Index at once.")
Catchup.Flags().IntVar(&snapshot.MaxConcurrentFlushers, "max-concurrent-flushers", defaultMaxConcurrentFlushers, "Bound for number of log shards to flush to Accounts DB Index at once.")
Catchup.Flags().BoolVar(&sbpf.UsePool, "use-pool", true, "Disable to allocate fresh slices")
Catchup.Flags().StringVar(&blockDir, "blockdir", "/tmp", "Path containing slot.json files")
Catchup.Flags().StringVar(&scratchDir, "scratchdir", "/tmp", "Path for downloads (e.g. snapshots) and other temp state")
Catchup.Flags().StringVar(&blockDir, "blockdir", defaultTmpDir, "Path containing slot.json files")
Catchup.Flags().StringVar(&scratchDir, "scratchdir", defaultTmpDir, "Path for downloads (e.g. snapshots) and other temp state")
Catchup.Flags().IntVar(&rpcPort, "rpc-server-port", 0, "RPC server port. Default off.")
}

func runVerifier(c *cobra.Command, args []string) {
if pprofPort != -1 {
if pprofPort != portNotSet {
startPprofHandlers(int(pprofPort))
}
if endSlot != -1 && numReplaySlots != 0 {
if endSlot != slotNotSet && numReplaySlots != 0 {
klog.Fatalf("specify at most one of --endslot and --num-replay-slots")
}

Expand Down Expand Up @@ -155,7 +170,7 @@ func runVerifier(c *cobra.Command, args []string) {
}

if rpcEndpoint == "" {
rpcEndpoint = "https://api.mainnet-beta.solana.com"
rpcEndpoint = defaultRpcEndpoint
}

if loadFromSnapshot {
Expand Down Expand Up @@ -187,7 +202,7 @@ func runVerifier(c *cobra.Command, args []string) {

mlog.Log.Infof("downloading snapshot...")

path, _, _, err = snapshotdl.DownloadSnapshot("https://api.mainnet-beta.solana.com", snapshotDlPath)
path, _, _, err = snapshotdl.DownloadSnapshot(defaultRpcEndpoint, snapshotDlPath)
if err != nil {
klog.Fatalf("error downloading snapshot: %s", err)
}
Expand All @@ -208,14 +223,14 @@ func runVerifier(c *cobra.Command, args []string) {
klog.Fatalf("unable to open accounts db %s\n", accountsDbDir)
}

manifest, err = snapshot.LoadManifestFromFile(filepath.Join(accountsDbDir, "manifest"))
manifest, err = snapshot.LoadManifestFromFile(filepath.Join(accountsDbDir, manifestFilename))
if err != nil {
klog.Fatalf("unable to open manifest file")
}
}

startSlot := int64(manifest.Bank.Slot + 1)
if endSlot != -1 {
if endSlot != slotNotSet {
numReplaySlots = endSlot - startSlot
} else if numReplaySlots != 0 {
endSlot = startSlot + numReplaySlots
Expand All @@ -226,7 +241,7 @@ func runVerifier(c *cobra.Command, args []string) {
return
}

if endSlot != -1 && endSlot < startSlot {
if endSlot != slotNotSet && endSlot < startSlot {
klog.Fatalf("end slot cannot be lower than start slot")
}
mlog.Log.Infof("will replay startSlot=%d endSlot=%d", startSlot, endSlot)
Expand All @@ -251,9 +266,9 @@ func runVerifier(c *cobra.Command, args []string) {
}

var rpcServer *rpcserver.RpcServer
if rpcPort < 0 || rpcPort > 65535 {
if rpcPort < minPort || rpcPort > maxPort {
klog.Fatalf("invalid port: %d", rpcPort)
} else if rpcPort != 0 {
} else if rpcPort != portDisabled {
rpcServer = rpcserver.NewRpcServer(accountsDb, uint16(rpcPort))
rpcServer.Start()
mlog.Log.Infof("started RPC server on port %d", rpcPort)
Expand Down Expand Up @@ -284,7 +299,7 @@ func runCatchup(c *cobra.Command, args []string) {
}

if rpcEndpoint == "" {
rpcEndpoint = "https://api.mainnet-beta.solana.com"
rpcEndpoint = defaultRpcEndpoint
}

if rpcEndpointFile == "" {
Expand All @@ -293,7 +308,7 @@ func runCatchup(c *cobra.Command, args []string) {

mlog.Log.Infof("downloading full snapshot...")
fullSnapshotDlStart := time.Now()
fullSnapshotPath, _, fullSnapshotSlot, err := snapshotdl.DownloadSnapshot("https://api.mainnet-beta.solana.com", snapshotDownloadPath)
fullSnapshotPath, _, fullSnapshotSlot, err := snapshotdl.DownloadSnapshot(defaultRpcEndpoint, snapshotDownloadPath)
if err != nil {
klog.Fatalf("error downloading snapshot: %s", err)
}
Expand Down Expand Up @@ -330,9 +345,9 @@ func runCatchup(c *cobra.Command, args []string) {
}

var rpcServer *rpcserver.RpcServer
if rpcPort < 0 || rpcPort > 65535 {
if rpcPort < minPort || rpcPort > maxPort {
klog.Fatalf("invalid port: %d", rpcPort)
} else if rpcPort != 0 {
} else if rpcPort != portDisabled {
rpcServer = rpcserver.NewRpcServer(accountsDb, uint16(rpcPort))
rpcServer.Start()
mlog.Log.Infof("started RPC server on port %d", rpcPort)
Expand Down