Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 52 additions & 73 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -42,9 +41,7 @@ import (
"github.com/obolnetwork/charon/app/tracer"
"github.com/obolnetwork/charon/app/version"
"github.com/obolnetwork/charon/app/z"
clusterpkg "github.com/obolnetwork/charon/cluster"
"github.com/obolnetwork/charon/cluster/manifest"
manifestpb "github.com/obolnetwork/charon/cluster/manifestpb/v1"
"github.com/obolnetwork/charon/cluster"
"github.com/obolnetwork/charon/core"
"github.com/obolnetwork/charon/core/aggsigdb"
"github.com/obolnetwork/charon/core/bcast"
Expand Down Expand Up @@ -73,8 +70,8 @@ type Config struct {
P2P p2p.Config
Log log.Config
Feature featureset.Config
LockFile string
ManifestFile string
LockFile string
NoVerify bool
PrivKeyFile string
PrivKeyLocking bool
Expand Down Expand Up @@ -118,7 +115,7 @@ type TestConfig struct {
p2p.TestPingConfig

// Lock provides the lock explicitly, skips loading from disk.
Lock *clusterpkg.Lock
Lock *cluster.Lock
// P2PKey provides the p2p privkey explicitly, skips loading from keystore on disk.
P2PKey *k1.PrivateKey
// ParSigExFunc provides an in-memory partial signature exchange.
Expand Down Expand Up @@ -177,19 +174,18 @@ func Run(ctx context.Context, conf Config) (err error) {
eth1Cl := eth1wrap.NewDefaultEthClientRunner(conf.ExecutionEngineAddr)
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartEth1Client, lifecycle.HookFuncCtx(eth1Cl.Run))

cluster, err := loadClusterManifest(ctx, conf, eth1Cl)
lock, err := loadClusterLock(ctx, conf, eth1Cl)
if err != nil {
return err
}

clusterHash := cluster.GetInitialMutationHash()
core.SetClusterHash(clusterHash)
core.SetClusterHash(lock.LockHash)

if err := wireTracing(life, conf, clusterHash); err != nil {
if err := wireTracing(life, conf, lock.LockHash); err != nil {
return err
}

network, err := eth2util.ForkVersionToNetwork(cluster.GetForkVersion())
network, err := eth2util.ForkVersionToNetwork(lock.ForkVersion)
if err != nil {
network = "unknown"
}
Expand All @@ -212,7 +208,7 @@ func Run(ctx context.Context, conf Config) (err error) {
}
}

peers, err := manifest.ClusterPeers(cluster)
peers, err := lock.Peers()
if err != nil {
return err
}
Expand All @@ -221,14 +217,14 @@ func Run(ctx context.Context, conf Config) (err error) {
return err
}

lockHashHex := Hex7(cluster.GetInitialMutationHash())
lockHashHex := Hex7(lock.LockHash)

p2pNode, err := wireP2P(ctx, life, conf, cluster, p2pKey, lockHashHex)
p2pNode, err := wireP2P(ctx, life, conf, lock, p2pKey, lockHashHex, lock.UUID)
if err != nil {
return err
}

nodeIdx, err := manifest.ClusterNodeIdx(cluster, p2pNode.ID())
nodeIdx, err := lock.NodeIdx(p2pNode.ID())
if err != nil {
return errors.Wrap(err, "private key not matching cluster manifest file")
}
Expand All @@ -242,16 +238,16 @@ func Run(ctx context.Context, conf Config) (err error) {
z.Str("peer_name", p2p.PeerName(p2pNode.ID())),
z.Str("nickname", conf.Nickname),
z.Int("peer_index", nodeIdx.PeerIdx),
z.Str("cluster_name", cluster.GetName()),
z.Str("cluster_name", lock.Name),
z.Str("cluster_hash", lockHashHex),
z.Str("cluster_hash_full", hex.EncodeToString(cluster.GetInitialMutationHash())),
z.Str("cluster_hash_full", hex.EncodeToString(lock.LockHash)),
z.Str("enr", enrRec.String()),
z.Int("peers", len(cluster.GetOperators())))
z.Int("peers", len(lock.Operators)))

// Metric and logging labels.
labels := map[string]string{
"cluster_hash": lockHashHex,
"cluster_name": cluster.GetName(),
"cluster_name": lock.Name,
"cluster_peer": p2p.PeerName(p2pNode.ID()),
"nickname": conf.Nickname,
"cluster_network": network,
Expand All @@ -264,9 +260,9 @@ func Run(ctx context.Context, conf Config) (err error) {
return err
}

initStartupMetrics(p2p.PeerName(p2pNode.ID()), int(cluster.GetThreshold()), len(cluster.GetOperators()), len(cluster.GetValidators()), network)
initStartupMetrics(p2p.PeerName(p2pNode.ID()), lock.Threshold, len(lock.Operators), len(lock.Validators), network)

eth2Cl, subEth2Cl, err := newETH2Client(ctx, conf, life, cluster, cluster.GetForkVersion(), conf.BeaconNodeTimeout, conf.BeaconNodeSubmitTimeout)
eth2Cl, subEth2Cl, err := newETH2Client(ctx, conf, life, lock, lock.ForkVersion, conf.BeaconNodeTimeout, conf.BeaconNodeSubmitTimeout)
if err != nil {
return err
}
Expand All @@ -276,7 +272,7 @@ func Run(ctx context.Context, conf Config) (err error) {
return err
}

peerIDs, err := manifest.ClusterPeerIDs(cluster)
peerIDs, err := lock.PeerIDs()
if err != nil {
return err
}
Expand All @@ -292,7 +288,7 @@ func Run(ctx context.Context, conf Config) (err error) {
return errors.New("nickname cannot exceed 32 characters")
}

wirePeerInfo(life, p2pNode, peerIDs, cluster.GetInitialMutationHash(), sender, conf.BuilderAPI, conf.Nickname)
wirePeerInfo(life, p2pNode, peerIDs, lock.LockHash, sender, conf.BuilderAPI, conf.Nickname)

// seenPubkeys channel to send seen public keys from validatorapi to monitoringapi.
seenPubkeys := make(chan core.PubKey)
Expand All @@ -311,17 +307,17 @@ func Run(ctx context.Context, conf Config) (err error) {
}
}

pubkeys, err := getDVPubkeys(cluster)
pubkeys, err := getDVPubkeys(lock)
if err != nil {
return err
}

consensusDebugger := consensus.NewDebugger()

wireMonitoringAPI(ctx, life, conf.MonitoringAddr, conf.DebugAddr, p2pNode, eth2Cl, peerIDs,
promRegistry, consensusDebugger, pubkeys, seenPubkeys, vapiCalls, len(cluster.GetValidators()))
promRegistry, consensusDebugger, pubkeys, seenPubkeys, vapiCalls, len(lock.Validators))

err = wireCoreWorkflow(ctx, life, conf, cluster, nodeIdx, p2pNode, p2pKey, eth2Cl, subEth2Cl,
err = wireCoreWorkflow(ctx, life, conf, lock, nodeIdx, p2pNode, p2pKey, eth2Cl, subEth2Cl,
peerIDs, sender, consensusDebugger, pubkeys, seenPubkeysFunc, sseListener, vapiCallsFunc)
if err != nil {
return err
Expand All @@ -340,14 +336,14 @@ func wirePeerInfo(life *lifecycle.Manager, p2pNode host.Host, peers []peer.ID, l

// wireP2P constructs the p2p tcp or udp (libp2p) nodes and registers it with the life cycle manager.
func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config,
cluster *manifestpb.Cluster, p2pKey *k1.PrivateKey, lockHashHex string,
lock *cluster.Lock, p2pKey *k1.PrivateKey, lockHashHex, uuid string,
) (host.Host, error) {
peerIDs, err := manifest.ClusterPeerIDs(cluster)
peerIDs, err := lock.PeerIDs()
if err != nil {
return nil, err
}

relays, err := p2p.NewRelays(ctx, conf.P2P.Relays, lockHashHex)
relays, err := p2p.NewRelays(ctx, conf.P2P.Relays, lockHashHex, uuid)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -398,7 +394,7 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config,

// wireCoreWorkflow wires the core workflow components.
func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
cluster *manifestpb.Cluster, nodeIdx clusterpkg.NodeIdx, p2pNode host.Host, p2pKey *k1.PrivateKey,
lock *cluster.Lock, nodeIdx cluster.NodeIdx, p2pNode host.Host, p2pKey *k1.PrivateKey,
eth2Cl, submissionEth2Cl eth2wrap.Client, peerIDs []peer.ID, sender *p2p.Sender,
consensusDebugger consensus.Debugger, pubkeys []core.PubKey, seenPubkeys func(core.PubKey),
sseListener sse.Listener, vapiCalls func(),
Expand All @@ -410,22 +406,18 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
pubshares []eth2p0.BLSPubKey
allPubSharesByKey = make(map[core.PubKey]map[int]tbls.PublicKey) // map[pubkey]map[shareIdx]pubshare
feeRecipientAddrByCorePubkey = make(map[core.PubKey]string)
lockFeeRecipientAddresses = lock.FeeRecipientAddresses()
)

for _, val := range cluster.GetValidators() {
pubkey, err := manifest.ValidatorPublicKey(val)
if err != nil {
return err
}

corePubkey, err := core.PubKeyFromBytes(pubkey[:])
for vi, val := range lock.Validators {
corePubkey, err := core.PubKeyFromBytes(val.PubKey)
if err != nil {
return err
}

allPubShares := make(map[int]tbls.PublicKey)

for i, b := range val.GetPubShares() {
for i, b := range val.PubShares {
pubshare, err := tblsconv.PubkeyFromBytes(b)
if err != nil {
return err
Expand All @@ -435,29 +427,29 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
allPubShares[i+1] = pubshare
}

pubShare, err := manifest.ValidatorPublicShare(val, nodeIdx.PeerIdx)
pubShare, err := val.PublicShare(nodeIdx.PeerIdx)
if err != nil {
return err
}

eth2Share := eth2p0.BLSPubKey(pubShare)

eth2Pubkey := eth2p0.BLSPubKey(pubkey)
eth2Pubkey := eth2p0.BLSPubKey(val.PubKey)

eth2Pubkeys = append(eth2Pubkeys, eth2Pubkey)
pubshares = append(pubshares, eth2Share)
allPubSharesByKey[corePubkey] = allPubShares
feeRecipientAddrByCorePubkey[corePubkey] = val.GetFeeRecipientAddress()
feeRecipientAddrByCorePubkey[corePubkey] = lockFeeRecipientAddresses[vi]

var builderRegistration eth2api.VersionedSignedValidatorRegistration
if err := json.Unmarshal(val.GetBuilderRegistrationJson(), &builderRegistration); err != nil {
return errors.Wrap(err, "unmarshal builder registration")
builderRegistration, err := val.Eth2Registration()
if err != nil {
return errors.Wrap(err, "convert builder registration")
}

builderRegistrations = append(builderRegistrations, &builderRegistration)
builderRegistrations = append(builderRegistrations, builderRegistration)
}

peers, err := manifest.ClusterPeers(cluster)
peers, err := lock.Peers()
if err != nil {
return err
}
Expand Down Expand Up @@ -566,7 +558,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,

dutyDB := dutydb.NewMemDB(deadlinerFunc("dutydb"))

vapi, err := validatorapi.NewComponent(eth2Cl, allPubSharesByKey, nodeIdx.ShareIdx, feeRecipientFunc, conf.BuilderAPI, uint(cluster.GetTargetGasLimit()), seenPubkeys)
vapi, err := validatorapi.NewComponent(eth2Cl, allPubSharesByKey, nodeIdx.ShareIdx, feeRecipientFunc, conf.BuilderAPI, lock.TargetGasLimit, seenPubkeys)
if err != nil {
return err
}
Expand All @@ -575,7 +567,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
return err
}

parSigDB := parsigdb.NewMemDB(int(cluster.GetThreshold()), deadlinerFunc("parsigdb"))
parSigDB := parsigdb.NewMemDB(lock.Threshold, deadlinerFunc("parsigdb"))

var parSigEx core.ParSigEx
if conf.TestConfig.ParSigExFunc != nil {
Expand All @@ -589,7 +581,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
parSigEx = parsigex.NewParSigEx(p2pNode, sender.SendAsync, nodeIdx.PeerIdx, peerIDs, verifyFunc, gaterFunc)
}

sigAgg, err := sigagg.New(int(cluster.GetThreshold()), sigagg.NewVerifier(eth2Cl))
sigAgg, err := sigagg.New(lock.Threshold, sigagg.NewVerifier(eth2Cl))
if err != nil {
return err
}
Expand Down Expand Up @@ -624,9 +616,9 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
coreConsensus := consensusController.CurrentConsensus() // initially points to DefaultConsensus()

// Priority protocol always uses QBFTv2.
err = wirePrioritise(ctx, conf, life, p2pNode, peerIDs, int(cluster.GetThreshold()),
err = wirePrioritise(ctx, conf, life, p2pNode, peerIDs, lock.Threshold,
sender.SendReceive, defaultConsensus, sched, p2pKey, deadlineFunc,
consensusController, cluster.GetConsensusProtocol())
consensusController, lock.ConsensusProtocol)
if err != nil {
return err
}
Expand Down Expand Up @@ -812,29 +804,21 @@ func calculateTrackerDelay(ctx context.Context, cl eth2wrap.Client, now time.Tim
}

// eth2PubKeys returns a list of BLS pubkeys of validators in the cluster lock.
func eth2PubKeys(cluster *manifestpb.Cluster) ([]eth2p0.BLSPubKey, error) {
func eth2PubKeys(lock *cluster.Lock) []eth2p0.BLSPubKey {
var pubkeys []eth2p0.BLSPubKey

for _, val := range cluster.GetValidators() {
pubkey, err := manifest.ValidatorPublicKey(val)
if err != nil {
return []eth2p0.BLSPubKey{}, err
}

pk := eth2p0.BLSPubKey(pubkey)
for _, dv := range lock.Validators {
pk := eth2p0.BLSPubKey(dv.PubKey)
pubkeys = append(pubkeys, pk)
}

return pubkeys, nil
return pubkeys
}

// newETH2Client returns a new eth2client for the configured timeouts; it is either a beaconmock for
// simnet or a multi http client to a real beacon node.
func newETH2Client(ctx context.Context, conf Config, life *lifecycle.Manager, cluster *manifestpb.Cluster, forkVersion []byte, bnTimeout time.Duration, submissionBnTimeout time.Duration) (eth2Cl eth2wrap.Client, submissionEth2Cl eth2wrap.Client, err error) {
pubkeys, err := eth2PubKeys(cluster)
if err != nil {
return nil, nil, err
}
func newETH2Client(ctx context.Context, conf Config, life *lifecycle.Manager, lock *cluster.Lock, forkVersion []byte, bnTimeout time.Duration, submissionBnTimeout time.Duration) (eth2Cl eth2wrap.Client, submissionEth2Cl eth2wrap.Client, err error) {
pubkeys := eth2PubKeys(lock)

// Default to 1s slot duration if not set.
if conf.SimnetSlotDuration == 0 {
Expand Down Expand Up @@ -1124,16 +1108,11 @@ func setFeeRecipient(eth2Cl eth2wrap.Client, feeRecipientFunc func(core.PubKey)
}

// getDVPubkeys returns DV public keys from given cluster.Lock.
func getDVPubkeys(cluster *manifestpb.Cluster) ([]core.PubKey, error) {
func getDVPubkeys(lock *cluster.Lock) ([]core.PubKey, error) {
var pubkeys []core.PubKey

for _, val := range cluster.GetValidators() {
pk, err := manifest.ValidatorPublicKey(val)
if err != nil {
return nil, err
}

pubkey, err := core.PubKeyFromBytes(pk[:])
for _, dv := range lock.Validators {
pubkey, err := core.PubKeyFromBytes(dv.PubKey)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading