From 99beca24e48be5b26e50cc8c06d7bc6c661fa74e Mon Sep 17 00:00:00 2001 From: pk910 Date: Mon, 8 Dec 2025 03:51:31 +0100 Subject: [PATCH 1/4] add `generate_attestations` task --- .../clients/consensus/rpc/beaconapi.go | 44 ++ .../tasks/generate_attestations/README.md | 108 +++ .../tasks/generate_attestations/config.go | 92 +++ .../tasks/generate_attestations/task.go | 720 ++++++++++++++++++ pkg/coordinator/tasks/tasks.go | 2 + playbooks/dev/generate-attestations.yaml | 37 + 6 files changed, 1003 insertions(+) create mode 100644 pkg/coordinator/tasks/generate_attestations/README.md create mode 100644 pkg/coordinator/tasks/generate_attestations/config.go create mode 100644 pkg/coordinator/tasks/generate_attestations/task.go create mode 100644 playbooks/dev/generate-attestations.yaml diff --git a/pkg/coordinator/clients/consensus/rpc/beaconapi.go b/pkg/coordinator/clients/consensus/rpc/beaconapi.go index dd24912..f0c3b73 100644 --- a/pkg/coordinator/clients/consensus/rpc/beaconapi.go +++ b/pkg/coordinator/clients/consensus/rpc/beaconapi.go @@ -117,6 +117,10 @@ func (bc *BeaconClient) getJSON(ctx context.Context, requrl string, returnValue } func (bc *BeaconClient) postJSON(ctx context.Context, requrl string, postData, returnValue interface{}) error { + return bc.postJSONWithHeaders(ctx, requrl, postData, returnValue, nil) +} + +func (bc *BeaconClient) postJSONWithHeaders(ctx context.Context, requrl string, postData, returnValue interface{}, extraHeaders map[string]string) error { logurl := getRedactedURL(requrl) postDataBytes, err := json.Marshal(postData) @@ -137,6 +141,10 @@ func (bc *BeaconClient) postJSON(ctx context.Context, requrl string, postData, r req.Header.Set(headerKey, headerVal) } + for headerKey, headerVal := range extraHeaders { + req.Header.Set(headerKey, headerVal) + } + client := &nethttp.Client{Timeout: time.Second * 300} resp, err := client.Do(req) @@ -497,6 +505,42 @@ func (bc *BeaconClient) SubmitProposerSlashing(ctx context.Context, slashing *ph return nil } +type apiAttestationData struct { + Data *phase0.AttestationData `json:"data"` +} + +func (bc *BeaconClient) GetAttestationData(ctx context.Context, slot uint64, committeeIndex uint64) (*phase0.AttestationData, error) { + var attestationData apiAttestationData + + err := bc.getJSON(ctx, fmt.Sprintf("%s/eth/v1/validator/attestation_data?slot=%d&committee_index=%d", bc.endpoint, slot, committeeIndex), &attestationData) + if err != nil { + return nil, fmt.Errorf("error retrieving attestation data: %v", err) + } + + return attestationData.Data, nil +} + +// SingleAttestation represents the Electra single attestation format for the v2 API. 
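+// CommitteeIndex and AttesterIndex are serialized as decimal strings (via the `,string` tag);
+// these objects are posted as-is by SubmitAttestations below, together with the
+// `Eth-Consensus-Version: electra` header.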
+type SingleAttestation struct { + CommitteeIndex uint64 `json:"committee_index,string"` + AttesterIndex uint64 `json:"attester_index,string"` + Data *phase0.AttestationData `json:"data"` + Signature string `json:"signature"` +} + +func (bc *BeaconClient) SubmitAttestations(ctx context.Context, attestations []*SingleAttestation) error { + headers := map[string]string{ + "Eth-Consensus-Version": "electra", + } + + err := bc.postJSONWithHeaders(ctx, fmt.Sprintf("%s/eth/v2/beacon/pool/attestations", bc.endpoint), attestations, nil, headers) + if err != nil { + return err + } + + return nil +} + type NodeIdentity struct { PeerID string `json:"peer_id"` ENR string `json:"enr"` diff --git a/pkg/coordinator/tasks/generate_attestations/README.md b/pkg/coordinator/tasks/generate_attestations/README.md new file mode 100644 index 0000000..9983a6d --- /dev/null +++ b/pkg/coordinator/tasks/generate_attestations/README.md @@ -0,0 +1,108 @@ +## `generate_attestations` Task + +### Description +The `generate_attestations` task is designed to generate valid attestations for a specified range of validator keys and submit them to the network. This task fetches attester duties for the configured validators, retrieves attestation data from the beacon node, signs the attestations with the validator private keys, and submits them via the beacon API. + +The task supports advanced configuration options for testing various attestation scenarios, including attesting for previous epochs, using delayed head blocks, and randomizing late head offsets per attestation. + +### Configuration Parameters + +- **`mnemonic`**:\ + A mnemonic phrase used for generating the validators' private keys. The keys are derived using the standard BIP39/BIP44 path (`m/12381/3600/{index}/0/0`). + +- **`startIndex`**:\ + The starting index within the mnemonic from which to begin generating validator keys. This sets the initial point for key derivation. + +- **`indexCount`**:\ + The number of validator keys to generate from the mnemonic, determining how many validators will be used for attestation generation. + +- **`limitTotal`**:\ + The total limit on the number of attestations that the task will generate. The task will stop after reaching this limit. + +- **`limitEpochs`**:\ + The total number of epochs to generate attestations for. The task will stop after processing this many epochs. + +- **`clientPattern`**:\ + A regex pattern for selecting specific client endpoints for fetching attestation data and submitting attestations. If left empty, any available endpoint will be used. + +- **`excludeClientPattern`**:\ + A regex pattern to exclude certain client endpoints from being used. This parameter adds a layer of control by allowing the exclusion of specific clients. + +### Advanced Settings + +- **`lastEpochAttestations`**:\ + When set to `true`, the task will generate attestations for the previous epoch's duties instead of the current epoch. This is useful for testing late attestation scenarios. Attestations are sent one slot at a time (each wallclock slot sends attestations for the corresponding slot in the previous epoch). + +- **`sendAllLastEpoch`**:\ + When set to `true`, instead of sending attestations slot-by-slot, all attestations for the previous epoch are sent at once at each epoch boundary. This is useful for bulk testing of late attestations. Requires `lastEpochAttestations` to be implicitly treated as true. + +- **`lateHead`**:\ + Offsets the beacon block root in the attestation by the specified number of blocks. 
For example, setting `lateHead: 5` will use the block root from 5 blocks before the current head. This simulates validators with a delayed view of the chain. Positive values go back (older blocks), negative values go forward. + +- **`randomLateHead`**:\ + Specifies a range for randomizing the late head offset per attestation in `"min:max"` or `"min-max"` format. For example, `randomLateHead: "1-5"` will apply a random late head offset between 1 and 5 blocks (inclusive). By default, each attestation gets its own random offset. Use `lateHeadClusterSize` to group attestations with the same offset. + +- **`lateHeadClusterSize`**:\ + Controls how many attestations share the same random late head offset. Default is `1` (each attestation gets its own random offset). Setting this to a higher value groups attestations together with the same late head value. For example, `lateHeadClusterSize: 10` means every 10 attestations will share the same random offset. + + +### Defaults + +Default settings for the `generate_attestations` task: + +```yaml +- name: generate_attestations + config: + mnemonic: "" + startIndex: 0 + indexCount: 0 + limitTotal: 0 + limitEpochs: 0 + clientPattern: "" + excludeClientPattern: "" + lastEpochAttestations: false + sendAllLastEpoch: false + lateHead: 0 + randomLateHead: "" + lateHeadClusterSize: 1 +``` + +### Example Usage + +Basic usage to generate attestations for 100 validators over 5 epochs: + +```yaml +- name: generate_attestations + config: + mnemonic: "your mnemonic phrase here" + startIndex: 0 + indexCount: 100 + limitEpochs: 5 +``` + +Advanced usage with late head for testing delayed attestations: + +```yaml +- name: generate_attestations + config: + mnemonic: "your mnemonic phrase here" + startIndex: 0 + indexCount: 50 + limitTotal: 1000 + clientPattern: "lighthouse.*" + lastEpochAttestations: true + lateHead: 3 +``` + +Bulk send all previous epoch attestations at once with random late head: + +```yaml +- name: generate_attestations + config: + mnemonic: "your mnemonic phrase here" + startIndex: 0 + indexCount: 100 + limitEpochs: 3 + sendAllLastEpoch: true + randomLateHead: "1:10" +``` diff --git a/pkg/coordinator/tasks/generate_attestations/config.go b/pkg/coordinator/tasks/generate_attestations/config.go new file mode 100644 index 0000000..8e32d0a --- /dev/null +++ b/pkg/coordinator/tasks/generate_attestations/config.go @@ -0,0 +1,92 @@ +package generateattestations + +import ( + "errors" + "fmt" + "strconv" + "strings" +) + +type Config struct { + // Key configuration + Mnemonic string `yaml:"mnemonic" json:"mnemonic"` + StartIndex int `yaml:"startIndex" json:"startIndex"` + IndexCount int `yaml:"indexCount" json:"indexCount"` + + // Limit configuration + LimitTotal int `yaml:"limitTotal" json:"limitTotal"` + LimitEpochs int `yaml:"limitEpochs" json:"limitEpochs"` + + // Client selection + ClientPattern string `yaml:"clientPattern" json:"clientPattern"` + ExcludeClientPattern string `yaml:"excludeClientPattern" json:"excludeClientPattern"` + + // Advanced settings + LastEpochAttestations bool `yaml:"lastEpochAttestations" json:"lastEpochAttestations"` + SendAllLastEpoch bool `yaml:"sendAllLastEpoch" json:"sendAllLastEpoch"` + LateHead int `yaml:"lateHead" json:"lateHead"` + RandomLateHead string `yaml:"randomLateHead" json:"randomLateHead"` + LateHeadClusterSize int `yaml:"lateHeadClusterSize" json:"lateHeadClusterSize"` +} + +// ParseRandomLateHead parses the RandomLateHead string in "min:max" or "min-max" format. 
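+// The colon separator is checked first, so negative offsets must use the colon form (e.g. "-3:-1").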
+// Returns min, max values and whether random late head is enabled. +func (c *Config) ParseRandomLateHead() (min, max int, enabled bool, err error) { + if c.RandomLateHead == "" { + return 0, 0, false, nil + } + + // Try colon separator first, then dash + var parts []string + if strings.Contains(c.RandomLateHead, ":") { + parts = strings.Split(c.RandomLateHead, ":") + } else if strings.Contains(c.RandomLateHead, "-") { + parts = strings.Split(c.RandomLateHead, "-") + } + + if len(parts) != 2 { + return 0, 0, false, fmt.Errorf("randomLateHead must be in 'min:max' or 'min-max' format, got: %s", c.RandomLateHead) + } + + min, err = strconv.Atoi(strings.TrimSpace(parts[0])) + if err != nil { + return 0, 0, false, fmt.Errorf("invalid min value in randomLateHead: %w", err) + } + + max, err = strconv.Atoi(strings.TrimSpace(parts[1])) + if err != nil { + return 0, 0, false, fmt.Errorf("invalid max value in randomLateHead: %w", err) + } + + if min > max { + return 0, 0, false, fmt.Errorf("min (%d) cannot be greater than max (%d) in randomLateHead", min, max) + } + + return min, max, true, nil +} + +func DefaultConfig() Config { + return Config{} +} + +func (c *Config) Validate() error { + if c.LimitTotal == 0 && c.LimitEpochs == 0 { + return errors.New("either limitTotal or limitEpochs must be set") + } + + if c.Mnemonic == "" { + return errors.New("mnemonic must be set") + } + + if c.IndexCount == 0 { + return errors.New("indexCount must be set") + } + + if c.RandomLateHead != "" { + if _, _, _, err := c.ParseRandomLateHead(); err != nil { + return err + } + } + + return nil +} diff --git a/pkg/coordinator/tasks/generate_attestations/task.go b/pkg/coordinator/tasks/generate_attestations/task.go new file mode 100644 index 0000000..4204aec --- /dev/null +++ b/pkg/coordinator/tasks/generate_attestations/task.go @@ -0,0 +1,720 @@ +package generateattestations + +import ( + "bytes" + "context" + "errors" + "fmt" + "math/rand" + "strings" + "time" + + v1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/ethpandaops/assertoor/pkg/coordinator/clients/consensus" + "github.com/ethpandaops/assertoor/pkg/coordinator/clients/consensus/rpc" + "github.com/ethpandaops/assertoor/pkg/coordinator/types" + hbls "github.com/herumi/bls-eth-go-binary/bls" + "github.com/protolambda/zrnt/eth2/beacon/common" + "github.com/protolambda/ztyp/tree" + "github.com/sirupsen/logrus" + "github.com/tyler-smith/go-bip39" + e2types "github.com/wealdtech/go-eth2-types/v2" + util "github.com/wealdtech/go-eth2-util" +) + +var ( + TaskName = "generate_attestations" + TaskDescriptor = &types.TaskDescriptor{ + Name: TaskName, + Description: "Generates valid attestations and sends them to the network", + Config: DefaultConfig(), + NewTask: NewTask, + } +) + +type Task struct { + ctx *types.TaskContext + options *types.TaskOptions + config Config + logger logrus.FieldLogger + + valSeed []byte + validatorKeys map[phase0.ValidatorIndex]*validatorKey + + // Cache for committee duties per epoch + dutiesCache map[uint64][]*v1.BeaconCommittee + dutiesCacheEpoch uint64 +} + +type validatorKey struct { + privkey *e2types.BLSPrivateKey + pubkey []byte +} + +func NewTask(ctx *types.TaskContext, options *types.TaskOptions) (types.Task, error) { + return &Task{ + ctx: ctx, + options: options, + logger: ctx.Logger.GetLogger(), + }, nil +} + +func (t *Task) Config() interface{} { + return t.config +} + +func (t *Task) Timeout() time.Duration { + return t.options.Timeout.Duration +} + +func 
(t *Task) LoadConfig() error { + config := DefaultConfig() + + // parse static config + if t.options.Config != nil { + if err := t.options.Config.Unmarshal(&config); err != nil { + return fmt.Errorf("error parsing task config for %v: %w", TaskName, err) + } + } + + // load dynamic vars + err := t.ctx.Vars.ConsumeVars(&config, t.options.ConfigVars) + if err != nil { + return err + } + + // validate config + if valerr := config.Validate(); valerr != nil { + return valerr + } + + t.valSeed, err = t.mnemonicToSeed(config.Mnemonic) + if err != nil { + return err + } + + t.config = config + + return nil +} + +func (t *Task) Execute(ctx context.Context) error { + // Initialize validator keys + err := t.initValidatorKeys() + if err != nil { + return err + } + + if len(t.validatorKeys) == 0 { + return fmt.Errorf("no validators found for given key range") + } + + t.logger.Infof("found %d validators for key range", len(t.validatorKeys)) + + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() + + consensusPool.GetBlockCache().SetMinFollowDistance(uint64(1000)) + + // Subscribe to epoch events + epochSubscription := consensusPool.GetBlockCache().SubscribeWallclockEpochEvent(10) + defer epochSubscription.Unsubscribe() + + // Subscribe to slot events for timing + slotSubscription := consensusPool.GetBlockCache().SubscribeWallclockSlotEvent(10) + defer slotSubscription.Unsubscribe() + + // Get current epoch to start from + _, currentEpoch, err := consensusPool.GetBlockCache().GetWallclock().Now() + if err != nil { + return fmt.Errorf("failed to get current wallclock: %w", err) + } + + startEpoch := currentEpoch.Number() + if t.config.LastEpochAttestations && startEpoch > 0 { + startEpoch-- + } + + t.logger.Infof("starting attestation generation from epoch %d", startEpoch) + + specs := consensusPool.GetBlockCache().GetSpecs() + totalAttestations := 0 + processedEpochs := 0 + + for { + select { + case <-ctx.Done(): + return ctx.Err() + + case slot := <-slotSubscription.Channel(): + // Skip slot processing in sendAllLastEpoch mode + if t.config.SendAllLastEpoch { + continue + } + + // Process attestations for this slot + count, err := t.processSlot(ctx, slot.Number(), startEpoch) + if err != nil { + t.logger.Warnf("error processing slot %d: %v", slot.Number(), err) + continue + } + + if count > 0 { + totalAttestations += count + t.ctx.SetResult(types.TaskResultSuccess) + t.logger.Infof("sent %d attestations for slot %d (total: %d)", count, slot.Number(), totalAttestations) + } + + // Check limits + if t.config.LimitTotal > 0 && totalAttestations >= t.config.LimitTotal { + t.logger.Infof("reached total attestation limit: %d", totalAttestations) + return nil + } + + case epoch := <-epochSubscription.Channel(): + epochNum := epoch.Number() + if epochNum <= startEpoch { + continue + } + + if t.config.SendAllLastEpoch { + // Process all slots from the previous epoch at once + prevEpoch := epochNum - 1 + t.logger.Infof("processing all attestations for epoch %d", prevEpoch) + + epochAttestations := 0 + for slotOffset := uint64(0); slotOffset < specs.SlotsPerEpoch; slotOffset++ { + targetSlot := prevEpoch*specs.SlotsPerEpoch + slotOffset + + count, err := t.processSlotForEpoch(ctx, targetSlot, prevEpoch) + if err != nil { + t.logger.Warnf("error processing slot %d: %v", targetSlot, err) + continue + } + + epochAttestations += count + totalAttestations += count + + // Check total limit + if t.config.LimitTotal > 0 && totalAttestations >= t.config.LimitTotal { + t.logger.Infof("reached total 
attestation limit: %d", totalAttestations) + t.ctx.SetResult(types.TaskResultSuccess) + return nil + } + } + + if epochAttestations > 0 { + t.ctx.SetResult(types.TaskResultSuccess) + t.logger.Infof("sent %d attestations for epoch %d (total: %d)", epochAttestations, prevEpoch, totalAttestations) + } + } + + processedEpochs++ + t.logger.Infof("completed epoch %d, processed epochs: %d", epochNum-1, processedEpochs) + + // Check epoch limit + if t.config.LimitEpochs > 0 && processedEpochs >= t.config.LimitEpochs { + t.logger.Infof("reached epoch limit: %d", processedEpochs) + return nil + } + } + } +} + +func (t *Task) initValidatorKeys() error { + t.validatorKeys = make(map[phase0.ValidatorIndex]*validatorKey) + + validators := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().GetValidatorSet() + if validators == nil { + return fmt.Errorf("failed to get validator set") + } + + startIndex := uint64(0) + if t.config.StartIndex > 0 { + startIndex = uint64(t.config.StartIndex) //nolint:gosec // no overflow possible + } + + endIndex := startIndex + uint64(t.config.IndexCount) //nolint:gosec // no overflow possible + + for accountIdx := startIndex; accountIdx < endIndex; accountIdx++ { + validatorKeyPath := fmt.Sprintf("m/12381/3600/%d/0/0", accountIdx) + + validatorPrivkey, err := util.PrivateKeyFromSeedAndPath(t.valSeed, validatorKeyPath) + if err != nil { + return fmt.Errorf("failed generating validator key %v: %w", validatorKeyPath, err) + } + + validatorPubkey := validatorPrivkey.PublicKey().Marshal() + + // Find this validator in the validator set + for valIdx, val := range validators { + if bytes.Equal(val.Validator.PublicKey[:], validatorPubkey) { + if val.Status != v1.ValidatorStateActiveOngoing && val.Status != v1.ValidatorStateActiveExiting { + t.logger.Debugf("validator %d is not active (status: %s), skipping", valIdx, val.Status) + continue + } + + t.validatorKeys[valIdx] = &validatorKey{ + privkey: validatorPrivkey, + pubkey: validatorPubkey, + } + + break + } + } + } + + return nil +} + +func (t *Task) processSlot(ctx context.Context, slot uint64, startEpoch uint64) (int, error) { + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() + specs := consensusPool.GetBlockCache().GetSpecs() + + slotEpoch := slot / specs.SlotsPerEpoch + + // For last epoch attestations, we attest for the previous epoch + targetEpoch := slotEpoch + if t.config.LastEpochAttestations && slotEpoch > 0 { + targetEpoch = slotEpoch - 1 + } + + // Skip if before start epoch + if targetEpoch < startEpoch { + return 0, nil + } + + // Get the target slot to attest for + targetSlot := slot + if t.config.LastEpochAttestations { + // Attest for the same relative slot in the previous epoch + slotInEpoch := slot % specs.SlotsPerEpoch + targetSlot = targetEpoch*specs.SlotsPerEpoch + slotInEpoch + } + + // Get committee duties for the target epoch + duties, err := t.getCommitteeDuties(ctx, targetEpoch) + if err != nil { + return 0, fmt.Errorf("failed to get committee duties: %w", err) + } + + // Find our validators' duties for this slot + slotDuties := t.findSlotDuties(targetSlot, duties) + if len(slotDuties) == 0 { + return 0, nil + } + + // Group duties by committee index + committeeGroups := make(map[phase0.CommitteeIndex][]*validatorDuty) + for _, duty := range slotDuties { + committeeGroups[duty.committeeIndex] = append(committeeGroups[duty.committeeIndex], duty) + } + + count := 0 + + // Get attestation data and submit for each committee + // Attestation data is requested for 
targetSlot, then lateHead/sourceOffset/targetOffset are applied on top + for committeeIdx, committeeDuties := range committeeGroups { + submitted, err := t.generateAndSubmitAttestation(ctx, targetSlot, committeeIdx, committeeDuties) + if err != nil { + t.logger.Warnf("failed to submit attestation for slot %d committee %d: %v", targetSlot, committeeIdx, err) + continue + } + + count += submitted + } + + return count, nil +} + +// processSlotForEpoch processes attestations for a specific slot in a given epoch. +// This is used by sendAllLastEpoch mode where we know the target epoch directly. +func (t *Task) processSlotForEpoch(ctx context.Context, slot uint64, epoch uint64) (int, error) { + // Get committee duties for the epoch + duties, err := t.getCommitteeDuties(ctx, epoch) + if err != nil { + return 0, fmt.Errorf("failed to get committee duties: %w", err) + } + + // Find our validators' duties for this slot + slotDuties := t.findSlotDuties(slot, duties) + if len(slotDuties) == 0 { + return 0, nil + } + + // Group duties by committee index + committeeGroups := make(map[phase0.CommitteeIndex][]*validatorDuty) + for _, duty := range slotDuties { + committeeGroups[duty.committeeIndex] = append(committeeGroups[duty.committeeIndex], duty) + } + + count := 0 + + // Get attestation data and submit for each committee + for committeeIdx, committeeDuties := range committeeGroups { + submitted, err := t.generateAndSubmitAttestation(ctx, slot, committeeIdx, committeeDuties) + if err != nil { + t.logger.Warnf("failed to submit attestation for slot %d committee %d: %v", slot, committeeIdx, err) + continue + } + + count += submitted + } + + return count, nil +} + +type validatorDuty struct { + validatorIndex phase0.ValidatorIndex + committeeIndex phase0.CommitteeIndex + committeeLength uint64 + positionInCommittee uint64 +} + +func (t *Task) findSlotDuties(slot uint64, duties []*v1.BeaconCommittee) []*validatorDuty { + var result []*validatorDuty + + for _, committee := range duties { + if uint64(committee.Slot) != slot { + continue + } + + for position, valIdx := range committee.Validators { + if _, ok := t.validatorKeys[valIdx]; ok { + result = append(result, &validatorDuty{ + validatorIndex: valIdx, + committeeIndex: committee.Index, + committeeLength: uint64(len(committee.Validators)), + positionInCommittee: uint64(position), + }) + } + } + } + + return result +} + +func (t *Task) getCommitteeDuties(ctx context.Context, epoch uint64) ([]*v1.BeaconCommittee, error) { + // Check cache first + if duties, ok := t.dutiesCache[epoch]; ok { + return duties, nil + } + + client := t.getClient() + if client == nil { + return nil, fmt.Errorf("no client available") + } + + duties, err := client.GetRPCClient().GetCommitteeDuties(ctx, "head", epoch) + if err != nil { + return nil, err + } + + // Initialize cache if needed + if t.dutiesCache == nil { + t.dutiesCache = make(map[uint64][]*v1.BeaconCommittee) + } + + // Clean up old epochs from cache (keep only current and previous) + for cachedEpoch := range t.dutiesCache { + if cachedEpoch+2 < epoch { + delete(t.dutiesCache, cachedEpoch) + } + } + + // Store in cache + t.dutiesCache[epoch] = duties + + return duties, nil +} + +func (t *Task) generateAndSubmitAttestation(ctx context.Context, slot uint64, committeeIdx phase0.CommitteeIndex, duties []*validatorDuty) (int, error) { + client := t.getClient() + if client == nil { + return 0, fmt.Errorf("no client available") + } + + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() + specs := 
consensusPool.GetBlockCache().GetSpecs() + genesis := consensusPool.GetBlockCache().GetGenesis() + + // Get attestation data from beacon node + attData, err := client.GetRPCClient().GetAttestationData(ctx, slot, uint64(committeeIdx)) + if err != nil { + return 0, fmt.Errorf("failed to get attestation data: %w", err) + } + + // Apply static late head offset if configured (applies to all attestations in this batch) + if t.config.LateHead != 0 { + modifiedData := t.applyLateHead(attData, t.config.LateHead) + attData = modifiedData + } + + // Get fork state + forkState, err := client.GetRPCClient().GetForkState(ctx, "head") + if err != nil { + return 0, fmt.Errorf("failed to get fork state: %w", err) + } + + // Compute the signing domain + epoch := uint64(attData.Slot) / specs.SlotsPerEpoch + forkVersion := forkState.CurrentVersion + if epoch < uint64(forkState.Epoch) { + forkVersion = forkState.PreviousVersion + } + + dom := common.ComputeDomain(common.DOMAIN_BEACON_ATTESTER, common.Version(forkVersion), tree.Root(genesis.GenesisValidatorsRoot)) + + if len(duties) == 0 { + return 0, fmt.Errorf("no duties provided") + } + + // Parse random late head config + randomMin, randomMax, randomEnabled, _ := t.config.ParseRandomLateHead() + clusterSize := t.config.LateHeadClusterSize + if clusterSize <= 0 { + clusterSize = 1 // Default: each attestation gets its own random offset + } + + // Create SingleAttestation objects for each validator (Electra format) + var singleAttestations []*rpc.SingleAttestation + + var currentClusterOffset int + var clusterAttData *phase0.AttestationData + attestationCount := 0 + + for _, duty := range duties { + valKey := t.validatorKeys[duty.validatorIndex] + if valKey == nil { + continue + } + + // Apply per-attestation or per-cluster random late head if configured + attDataForValidator := attData + if randomEnabled { + // Generate new random offset at start or when cluster is full + if attestationCount%clusterSize == 0 { + currentClusterOffset = randomMin + rand.Intn(randomMax-randomMin+1) + if currentClusterOffset != 0 { + clusterAttData = t.applyLateHead(attData, currentClusterOffset) + } else { + clusterAttData = attData + } + } + attDataForValidator = clusterAttData + attestationCount++ + } + + // Sign attestation data + msgRoot, err := attDataForValidator.HashTreeRoot() + if err != nil { + return 0, fmt.Errorf("failed to hash attestation data: %w", err) + } + + signingRoot := common.ComputeSigningRoot(msgRoot, dom) + + var secKey hbls.SecretKey + if err := secKey.Deserialize(valKey.privkey.Marshal()); err != nil { + return 0, fmt.Errorf("failed to deserialize private key: %w", err) + } + + sig := secKey.SignHash(signingRoot[:]) + + singleAtt := &rpc.SingleAttestation{ + CommitteeIndex: uint64(committeeIdx), + AttesterIndex: uint64(duty.validatorIndex), + Data: attDataForValidator, + Signature: fmt.Sprintf("0x%x", sig.Serialize()), + } + singleAttestations = append(singleAttestations, singleAtt) + } + + if len(singleAttestations) == 0 { + return 0, fmt.Errorf("no attestations generated") + } + + // Submit attestations + err = client.GetRPCClient().SubmitAttestations(ctx, singleAttestations) + if err != nil { + return 0, fmt.Errorf("failed to submit attestation: %w", err) + } + + return len(singleAttestations), nil +} + +// applyLateHead applies a late head offset to the attestation data. +// Positive offset goes back (older blocks), negative goes forward. 
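+// Only the beacon block root is replaced; the source and target checkpoints are copied unchanged.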
+// If the offset would result in a head before the target epoch, the head is clamped +// to the target epoch's first slot (using the target root). +func (t *Task) applyLateHead(attData *phase0.AttestationData, offset int) *phase0.AttestationData { + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() + specs := consensusPool.GetBlockCache().GetSpecs() + + newRoot, newSlot := t.walkBlocks(attData.BeaconBlockRoot, uint64(attData.Slot), offset) + + // Validate: head slot must be >= target epoch's first slot + // If not, clamp to target epoch (use target root as head) + targetEpochFirstSlot := uint64(attData.Target.Epoch) * specs.SlotsPerEpoch + if newSlot < targetEpochFirstSlot { + t.logger.Debugf("late head offset %d would result in invalid head (slot %d < target epoch slot %d), clamping to target", + offset, newSlot, targetEpochFirstSlot) + newRoot = attData.Target.Root + newSlot = targetEpochFirstSlot + } + + modifiedData := &phase0.AttestationData{ + Slot: attData.Slot, + Index: attData.Index, + BeaconBlockRoot: newRoot, + Source: &phase0.Checkpoint{ + Epoch: attData.Source.Epoch, + Root: attData.Source.Root, + }, + Target: &phase0.Checkpoint{ + Epoch: attData.Target.Epoch, + Root: attData.Target.Root, + }, + } + + t.logger.Debugf("late head offset: %d, new slot: %d [root: %x]", offset, newSlot, newRoot) + + return modifiedData +} + +// walkBlocks walks N blocks from the given root. +// Positive steps go backwards (using parentRoot), negative steps go forward (finding child blocks). +// Returns the resulting root and slot. Always returns a valid slot from the last known block. +func (t *Task) walkBlocks(startRoot phase0.Root, startSlot uint64, steps int) (phase0.Root, uint64) { + if steps > 0 { + return t.walkBackBlocks(startRoot, startSlot, steps) + } else if steps < 0 { + return t.walkForwardBlocks(startRoot, startSlot, -steps) + } + + // steps == 0, try to get actual slot from block, fallback to startSlot + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() + blockCache := consensusPool.GetBlockCache() + + block := blockCache.GetCachedBlockByRoot(startRoot) + if block != nil { + return startRoot, uint64(block.Slot) + } + + return startRoot, startSlot +} + +// walkBackBlocks walks back N blocks from the given root using parentRoot. +func (t *Task) walkBackBlocks(startRoot phase0.Root, startSlot uint64, steps int) (phase0.Root, uint64) { + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() + blockCache := consensusPool.GetBlockCache() + + currentRoot := startRoot + currentSlot := startSlot + + // Get initial block to determine starting slot + block := blockCache.GetCachedBlockByRoot(currentRoot) + if block != nil { + currentSlot = uint64(block.Slot) + } + + for range steps { + block := blockCache.GetCachedBlockByRoot(currentRoot) + if block == nil { + break + } + + currentSlot = uint64(block.Slot) + + parentRoot := block.GetParentRoot() + if parentRoot == nil { + break + } + + currentRoot = *parentRoot + } + + // Get the final slot + if block := blockCache.GetCachedBlockByRoot(currentRoot); block != nil { + currentSlot = uint64(block.Slot) + } + + return currentRoot, currentSlot +} + +// walkForwardBlocks walks forward N blocks from the given root by finding child blocks. 
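+// Only blocks present in the local block cache are considered; the walk stops early if no cached child block is found.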
+func (t *Task) walkForwardBlocks(startRoot phase0.Root, startSlot uint64, steps int) (phase0.Root, uint64) { + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() + blockCache := consensusPool.GetBlockCache() + + currentRoot := startRoot + currentSlot := startSlot + + // Get initial block to determine starting slot + block := blockCache.GetCachedBlockByRoot(currentRoot) + if block != nil { + currentSlot = uint64(block.Slot) + } + + for range steps { + // Find a child block whose parent is currentRoot + childBlock := t.findChildBlock(currentRoot) + if childBlock == nil { + break + } + + currentRoot = childBlock.Root + currentSlot = uint64(childBlock.Slot) + } + + return currentRoot, currentSlot +} + +// findChildBlock finds a cached block whose parent is the given root. +func (t *Task) findChildBlock(parentRoot phase0.Root) *consensus.Block { + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() + blockCache := consensusPool.GetBlockCache() + + // Get the parent block to know its slot + parentBlock := blockCache.GetCachedBlockByRoot(parentRoot) + if parentBlock == nil { + return nil + } + + // Search in slots after the parent + parentSlot := uint64(parentBlock.Slot) + for searchSlot := parentSlot + 1; searchSlot <= parentSlot+32; searchSlot++ { + blocks := blockCache.GetCachedBlocksBySlot(phase0.Slot(searchSlot)) + for _, block := range blocks { + blockParent := block.GetParentRoot() + if blockParent != nil && *blockParent == parentRoot { + return block + } + } + } + + return nil +} + +func (t *Task) getClient() *consensus.Client { + clientPool := t.ctx.Scheduler.GetServices().ClientPool() + + if t.config.ClientPattern == "" && t.config.ExcludeClientPattern == "" { + return clientPool.GetConsensusPool().GetReadyEndpoint(consensus.AnyClient) + } + + clients := clientPool.GetClientsByNamePatterns(t.config.ClientPattern, t.config.ExcludeClientPattern) + if len(clients) == 0 { + return nil + } + + return clients[0].ConsensusClient +} + +func (t *Task) mnemonicToSeed(mnemonic string) (seed []byte, err error) { + mnemonic = strings.TrimSpace(mnemonic) + if !bip39.IsMnemonicValid(mnemonic) { + return nil, errors.New("mnemonic is not valid") + } + + return bip39.NewSeed(mnemonic, ""), nil +} diff --git a/pkg/coordinator/tasks/tasks.go b/pkg/coordinator/tasks/tasks.go index cee9696..5481d40 100644 --- a/pkg/coordinator/tasks/tasks.go +++ b/pkg/coordinator/tasks/tasks.go @@ -17,6 +17,7 @@ import ( checkethcall "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_eth_call" checkethconfig "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_eth_config" checkexecutionsyncstatus "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_execution_sync_status" + generateattestations "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/generate_attestations" generateblobtransactions "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/generate_blob_transactions" generateblschanges "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/generate_bls_changes" generatechildwallet "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/generate_child_wallet" @@ -60,6 +61,7 @@ var AvailableTaskDescriptors = []*types.TaskDescriptor{ checkethcall.TaskDescriptor, checkethconfig.TaskDescriptor, checkexecutionsyncstatus.TaskDescriptor, + generateattestations.TaskDescriptor, generateblobtransactions.TaskDescriptor, generateblschanges.TaskDescriptor, generatechildwallet.TaskDescriptor, diff --git 
a/playbooks/dev/generate-attestations.yaml b/playbooks/dev/generate-attestations.yaml new file mode 100644 index 0000000..e0ae04d --- /dev/null +++ b/playbooks/dev/generate-attestations.yaml @@ -0,0 +1,37 @@ +id: generate-attestations +name: "Generate Attestations" +timeout: 1h +config: + validatorMnemonic: "giant issue aisle success illegal bike spike question tent bar rely arctic volcano long crawl hungry vocal artwork sniff fantasy very lucky have athlete" + validatorStartIndex: 0 + validatorCount: 100 + attestationEpochs: 3 + lastEpochAttestations: false + sendAllLastEpoch: false + lateHead: 0 + randomLateHead: "" + lateHeadClusterSize: 1 +tasks: +- name: check_clients_are_healthy + title: "Check if at least one client is ready" + timeout: 5m + config: + minClientCount: 1 + +- name: run_tasks + title: "Run tasks" + config: + stopChildOnResult: false + tasks: + - name: generate_attestations + title: "Generate attestations for 3 epochs" + configVars: + mnemonic: "validatorMnemonic" + startIndex: "validatorStartIndex" + indexCount: "validatorCount" + limitEpochs: "attestationEpochs" + lastEpochAttestations: "lastEpochAttestations" + sendAllLastEpoch: "sendAllLastEpoch" + lateHead: "lateHead" + randomLateHead: "randomLateHead" + lateHeadClusterSize: "lateHeadClusterSize" From 9168faa0549634e547031b68ade4299c8697ad00 Mon Sep 17 00:00:00 2001 From: pk910 Date: Mon, 8 Dec 2025 04:52:08 +0100 Subject: [PATCH 2/4] retry loading attestation data --- .../tasks/generate_attestations/task.go | 56 +++++++++++++++---- 1 file changed, 45 insertions(+), 11 deletions(-) diff --git a/pkg/coordinator/tasks/generate_attestations/task.go b/pkg/coordinator/tasks/generate_attestations/task.go index 4204aec..b244602 100644 --- a/pkg/coordinator/tasks/generate_attestations/task.go +++ b/pkg/coordinator/tasks/generate_attestations/task.go @@ -429,8 +429,8 @@ func (t *Task) getCommitteeDuties(ctx context.Context, epoch uint64) ([]*v1.Beac } func (t *Task) generateAndSubmitAttestation(ctx context.Context, slot uint64, committeeIdx phase0.CommitteeIndex, duties []*validatorDuty) (int, error) { - client := t.getClient() - if client == nil { + clients := t.getClients() + if len(clients) == 0 { return 0, fmt.Errorf("no client available") } @@ -438,12 +438,25 @@ func (t *Task) generateAndSubmitAttestation(ctx context.Context, slot uint64, co specs := consensusPool.GetBlockCache().GetSpecs() genesis := consensusPool.GetBlockCache().GetGenesis() - // Get attestation data from beacon node - attData, err := client.GetRPCClient().GetAttestationData(ctx, slot, uint64(committeeIdx)) - if err != nil { - return 0, fmt.Errorf("failed to get attestation data: %w", err) + // Get attestation data from beacon node, retry with different clients if needed + var attData *phase0.AttestationData + var lastErr error + for _, client := range clients { + var err error + attData, err = client.GetRPCClient().GetAttestationData(ctx, slot, uint64(committeeIdx)) + if err == nil { + break + } + lastErr = err + t.logger.Debugf("failed to get attestation data from %s: %v, trying next client", client.GetName(), err) + } + if attData == nil { + return 0, fmt.Errorf("failed to get attestation data from all clients: %w", lastErr) } + // Use first client for remaining operations + client := clients[0] + // Apply static late head offset if configured (applies to all attestations in this batch) if t.config.LateHead != 0 { modifiedData := t.applyLateHead(attData, t.config.LateHead) @@ -696,18 +709,39 @@ func (t *Task) findChildBlock(parentRoot phase0.Root) 
*consensus.Block { } func (t *Task) getClient() *consensus.Client { + clients := t.getClients() + if len(clients) == 0 { + return nil + } + + return clients[0] +} + +func (t *Task) getClients() []*consensus.Client { clientPool := t.ctx.Scheduler.GetServices().ClientPool() + consensusPool := clientPool.GetConsensusPool() if t.config.ClientPattern == "" && t.config.ExcludeClientPattern == "" { - return clientPool.GetConsensusPool().GetReadyEndpoint(consensus.AnyClient) + allClients := consensusPool.GetAllEndpoints() + clients := make([]*consensus.Client, 0, len(allClients)) + for _, c := range allClients { + if consensusPool.IsClientReady(c) { + clients = append(clients, c) + } + } + + return clients } - clients := clientPool.GetClientsByNamePatterns(t.config.ClientPattern, t.config.ExcludeClientPattern) - if len(clients) == 0 { - return nil + poolClients := clientPool.GetClientsByNamePatterns(t.config.ClientPattern, t.config.ExcludeClientPattern) + clients := make([]*consensus.Client, 0, len(poolClients)) + for _, c := range poolClients { + if c.ConsensusClient != nil && consensusPool.IsClientReady(c.ConsensusClient) { + clients = append(clients, c.ConsensusClient) + } } - return clients[0].ConsensusClient + return clients } func (t *Task) mnemonicToSeed(mnemonic string) (seed []byte, err error) { From f685544ac25663ce4e7475bc5fa04122adb78600 Mon Sep 17 00:00:00 2001 From: pk910 Date: Mon, 8 Dec 2025 05:07:10 +0100 Subject: [PATCH 3/4] fix staticcheck issue --- pkg/coordinator/tasks/generate_attestations/task.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/coordinator/tasks/generate_attestations/task.go b/pkg/coordinator/tasks/generate_attestations/task.go index b244602..9a27ef3 100644 --- a/pkg/coordinator/tasks/generate_attestations/task.go +++ b/pkg/coordinator/tasks/generate_attestations/task.go @@ -43,8 +43,7 @@ type Task struct { validatorKeys map[phase0.ValidatorIndex]*validatorKey // Cache for committee duties per epoch - dutiesCache map[uint64][]*v1.BeaconCommittee - dutiesCacheEpoch uint64 + dutiesCache map[uint64][]*v1.BeaconCommittee } type validatorKey struct { From 70d6f64a562096c81d404f4cfd3a85b5b2738614 Mon Sep 17 00:00:00 2001 From: pk910 Date: Mon, 8 Dec 2025 21:35:33 +0100 Subject: [PATCH 4/4] bump `golangci-lint` & fix linter issues --- .github/workflows/_shared-check.yaml | 2 +- .golangci.yml | 11 ++++- main.go | 2 + pkg/coordinator/clients/consensus/block.go | 1 + .../clients/consensus/blockcache.go | 2 + .../clients/consensus/clientlogic.go | 2 + pkg/coordinator/clients/consensus/forks.go | 1 + .../clients/consensus/rpc/beaconapi.go | 4 +- .../clients/consensus/rpc/beaconstream.go | 6 ++- .../consensus/rpc/eventstream/eventstream.go | 6 +++ .../clients/consensus/subscriptions.go | 1 + pkg/coordinator/clients/execution/block.go | 1 + .../clients/execution/blockcache.go | 1 + .../clients/execution/clientlogic.go | 2 +- pkg/coordinator/clients/execution/forks.go | 1 + pkg/coordinator/clients/execution/pool.go | 2 +- .../clients/execution/rpc/ethconfig.go | 2 +- .../clients/execution/rpc/executionapi.go | 1 + .../clients/execution/subscriptions.go | 1 + pkg/coordinator/db/task_results.go | 6 ++- pkg/coordinator/logger/dbwriter.go | 3 +- pkg/coordinator/names/validatornames.go | 4 +- .../check_consensus_attestation_stats/task.go | 1 + .../tasks/check_consensus_identity/task.go | 1 + .../check_consensus_proposer_duty/task.go | 3 +- .../check_consensus_validator_status/task.go | 2 +- .../tasks/generate_attestations/config.go | 14 +++--- 
.../tasks/generate_attestations/task.go | 44 ++++++++++++++----- .../tasks/generate_blob_transactions/task.go | 2 + .../tasks/generate_bls_changes/task.go | 1 + .../tasks/generate_consolidations/task.go | 7 +-- .../tasks/generate_deposits/task.go | 6 ++- .../tasks/generate_eoa_transactions/task.go | 3 +- pkg/coordinator/tasks/generate_exits/task.go | 1 + .../tasks/generate_slashings/task.go | 1 + .../generate_withdrawal_requests/task.go | 8 ++-- .../tasks/get_consensus_validators/task.go | 2 +- pkg/coordinator/tasks/run_shell/task.go | 1 + pkg/coordinator/tasks/run_task_matrix/task.go | 1 + .../tasks/run_tasks_concurrent/task.go | 1 + pkg/coordinator/testregistry.go | 3 +- pkg/coordinator/testrunner.go | 6 +++ pkg/coordinator/vars/variables.go | 4 ++ pkg/coordinator/wallet/wallet.go | 5 ++- pkg/coordinator/wallet/walletpool.go | 1 - pkg/coordinator/web/api/handler.go | 4 +- pkg/coordinator/web/handlers/clients.go | 2 +- pkg/coordinator/web/handlers/handler.go | 2 +- pkg/coordinator/web/handlers/index.go | 4 +- pkg/coordinator/web/handlers/registry.go | 4 +- pkg/coordinator/web/handlers/test.go | 2 +- pkg/coordinator/web/templates/templates.go | 2 + 52 files changed, 140 insertions(+), 60 deletions(-) diff --git a/.github/workflows/_shared-check.yaml b/.github/workflows/_shared-check.yaml index cdc826e..7fe9e92 100644 --- a/.github/workflows/_shared-check.yaml +++ b/.github/workflows/_shared-check.yaml @@ -36,7 +36,7 @@ jobs: uses: golangci/golangci-lint-action@4afd733a84b1f43292c63897423277bb7f4313a9 # v8.0.0 with: # Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version - version: v2.1.6 + version: v2.7.2 # Optional: working directory, useful for monorepos # working-directory: somedir diff --git a/.golangci.yml b/.golangci.yml index 785ea31..eac672b 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -27,7 +27,7 @@ linters: - unparam - unused - whitespace - - wsl + - wsl_v5 settings: errcheck: check-type-assertions: true @@ -47,6 +47,10 @@ linters: nolintlint: require-explanation: true require-specific: true + wsl_v5: + allow-first-in-block: true + allow-whole-block: false + branch-max-lines: 2 exclusions: generated: lax presets: @@ -58,6 +62,11 @@ linters: - third_party$ - builtin$ - examples$ + rules: + - linters: + - revive + text: "var-naming: avoid meaningless package names" + path: "pkg/coordinator/(types|web/types|web/utils|web/api)/" formatters: enable: - gofmt diff --git a/main.go b/main.go index a1cbc3c..c425b7c 100644 --- a/main.go +++ b/main.go @@ -19,6 +19,7 @@ func main() { runChan := make(chan bool) var execErr error + go func() { execErr = cmd.Execute(ctx) @@ -31,6 +32,7 @@ func main() { case sig := <-signalChan: log.Printf("Caught signal: %v, shutdown gracefully...", sig) cancel() + select { case <-runChan: // graceful shutdown completed diff --git a/pkg/coordinator/clients/consensus/block.go b/pkg/coordinator/clients/consensus/block.go index 1e701bb..b9b211f 100644 --- a/pkg/coordinator/clients/consensus/block.go +++ b/pkg/coordinator/clients/consensus/block.go @@ -43,6 +43,7 @@ func (block *Block) GetSeenBy() []*Client { func (block *Block) SetSeenBy(client *Client) { block.seenMutex.Lock() defer block.seenMutex.Unlock() + block.seenMap[client.clientIdx] = client } diff --git a/pkg/coordinator/clients/consensus/blockcache.go b/pkg/coordinator/clients/consensus/blockcache.go index 3c5045c..48e5312 100644 --- a/pkg/coordinator/clients/consensus/blockcache.go +++ b/pkg/coordinator/clients/consensus/blockcache.go @@ -72,6 
+72,7 @@ func NewBlockCache(ctx context.Context, logger logrus.FieldLogger, followDistanc logger.WithError(err2).Errorf("uncaught panic in BlockCache.runCacheCleanup subroutine: %v, stack: %v", err, string(debug.Stack())) } }() + cache.runCacheCleanup(ctx) }() @@ -202,6 +203,7 @@ func (cache *BlockCache) GetWallclock() *ethwallclock.EthereumBeaconChain { func (cache *BlockCache) SetFinalizedCheckpoint(finalizedEpoch phase0.Epoch, finalizedRoot phase0.Root) { cache.finalizedMutex.Lock() + if finalizedEpoch <= cache.finalizedEpoch { cache.finalizedMutex.Unlock() return diff --git a/pkg/coordinator/clients/consensus/clientlogic.go b/pkg/coordinator/clients/consensus/clientlogic.go index 6f6e492..c268054 100644 --- a/pkg/coordinator/clients/consensus/clientlogic.go +++ b/pkg/coordinator/clients/consensus/clientlogic.go @@ -299,6 +299,7 @@ func (client *Client) processBlock(root phase0.Root, slot phase0.Slot, header *p } client.headMutex.Lock() + if bytes.Equal(client.headRoot[:], root[:]) { client.headMutex.Unlock() return nil @@ -316,6 +317,7 @@ func (client *Client) processBlock(root phase0.Root, slot phase0.Slot, header *p func (client *Client) setFinalizedHead(epoch phase0.Epoch, root phase0.Root) error { client.headMutex.Lock() + if bytes.Equal(client.finalizedRoot[:], root[:]) { client.headMutex.Unlock() return nil diff --git a/pkg/coordinator/clients/consensus/forks.go b/pkg/coordinator/clients/consensus/forks.go index 063b60f..4c7db9b 100644 --- a/pkg/coordinator/clients/consensus/forks.go +++ b/pkg/coordinator/clients/consensus/forks.go @@ -17,6 +17,7 @@ type HeadFork struct { func (pool *Pool) resetHeadForkCache() { pool.forkCacheMutex.Lock() defer pool.forkCacheMutex.Unlock() + pool.forkCache = map[int64][]*HeadFork{} } diff --git a/pkg/coordinator/clients/consensus/rpc/beaconapi.go b/pkg/coordinator/clients/consensus/rpc/beaconapi.go index f0c3b73..934950a 100644 --- a/pkg/coordinator/clients/consensus/rpc/beaconapi.go +++ b/pkg/coordinator/clients/consensus/rpc/beaconapi.go @@ -129,8 +129,8 @@ func (bc *BeaconClient) postJSONWithHeaders(ctx context.Context, requrl string, } reader := bytes.NewReader(postDataBytes) - req, err := nethttp.NewRequestWithContext(ctx, "POST", requrl, reader) + req, err := nethttp.NewRequestWithContext(ctx, "POST", requrl, reader) if err != nil { return err } @@ -509,7 +509,7 @@ type apiAttestationData struct { Data *phase0.AttestationData `json:"data"` } -func (bc *BeaconClient) GetAttestationData(ctx context.Context, slot uint64, committeeIndex uint64) (*phase0.AttestationData, error) { +func (bc *BeaconClient) GetAttestationData(ctx context.Context, slot, committeeIndex uint64) (*phase0.AttestationData, error) { var attestationData apiAttestationData err := bc.getJSON(ctx, fmt.Sprintf("%s/eth/v1/validator/attestation_data?slot=%d&committee_index=%d", bc.endpoint, slot, committeeIndex), &attestationData) diff --git a/pkg/coordinator/clients/consensus/rpc/beaconstream.go b/pkg/coordinator/clients/consensus/rpc/beaconstream.go index 0f1f8e3..cf52f0b 100644 --- a/pkg/coordinator/clients/consensus/rpc/beaconstream.go +++ b/pkg/coordinator/clients/consensus/rpc/beaconstream.go @@ -84,6 +84,7 @@ func (bs *BeaconStream) startStream() { bs.ReadyChan <- true case err := <-stream.Errors: logger.WithField("client", bs.client.name).Warnf("beacon block stream error: %v", err) + select { case bs.ReadyChan <- false: case <-bs.ctx.Done(): @@ -136,8 +137,8 @@ func (bs *BeaconStream) subscribeStream(endpoint string, events uint16) *eventst var stream *eventstream.Stream 
streamURL := fmt.Sprintf("%s/eth/v1/events?topics=%v", endpoint, topics.String()) - req, err := http.NewRequestWithContext(bs.ctx, "GET", streamURL, http.NoBody) + req, err := http.NewRequestWithContext(bs.ctx, "GET", streamURL, http.NoBody) if err == nil { for headerKey, headerVal := range bs.client.headers { req.Header.Set(headerKey, headerVal) @@ -148,6 +149,7 @@ func (bs *BeaconStream) subscribeStream(endpoint string, events uint16) *eventst if err != nil { logger.WithField("client", bs.client.name).Warnf("Error while subscribing beacon event stream %v: %v", getRedactedURL(streamURL), err) + select { case <-bs.ctx.Done(): return nil @@ -163,11 +165,11 @@ func (bs *BeaconStream) processBlockEvent(evt eventsource.Event) { var parsed v1.BlockEvent err := json.Unmarshal([]byte(evt.Data()), &parsed) - if err != nil { logger.WithField("client", bs.client.name).Warnf("beacon block stream failed to decode block event: %v", err) return } + bs.EventChan <- &BeaconStreamEvent{ Event: StreamBlockEvent, Data: &parsed, diff --git a/pkg/coordinator/clients/consensus/rpc/eventstream/eventstream.go b/pkg/coordinator/clients/consensus/rpc/eventstream/eventstream.go index 9a32fcb..ca3268e 100644 --- a/pkg/coordinator/clients/consensus/rpc/eventstream/eventstream.go +++ b/pkg/coordinator/clients/consensus/rpc/eventstream/eventstream.go @@ -182,6 +182,7 @@ func (stream *Stream) receiveEvents(r io.ReadCloser) { ev, err := dec.Decode() stream.closeMutex.Lock() + if stream.isClosed { stream.closeMutex.Unlock() return @@ -189,6 +190,7 @@ func (stream *Stream) receiveEvents(r io.ReadCloser) { if err != nil { stream.Errors <- err + stream.closeMutex.Unlock() return @@ -209,6 +211,7 @@ func (stream *Stream) receiveEvents(r io.ReadCloser) { } stream.Events <- pub + stream.closeMutex.Unlock() } } @@ -237,11 +240,14 @@ func (stream *Stream) retryRestartStream() { } stream.closeMutex.Lock() + if stream.isClosed { stream.closeMutex.Unlock() return } + stream.Errors <- err + stream.closeMutex.Unlock() backoff = 10 * time.Second diff --git a/pkg/coordinator/clients/consensus/subscriptions.go b/pkg/coordinator/clients/consensus/subscriptions.go index 506aab3..ecc556a 100644 --- a/pkg/coordinator/clients/consensus/subscriptions.go +++ b/pkg/coordinator/clients/consensus/subscriptions.go @@ -15,6 +15,7 @@ type Dispatcher[T interface{}] struct { func (d *Dispatcher[T]) Subscribe(capacity int) *Subscription[T] { d.mutex.Lock() defer d.mutex.Unlock() + subscription := &Subscription[T]{ channel: make(chan T, capacity), dispatcher: d, diff --git a/pkg/coordinator/clients/execution/block.go b/pkg/coordinator/clients/execution/block.go index 3c426f3..02261f5 100644 --- a/pkg/coordinator/clients/execution/block.go +++ b/pkg/coordinator/clients/execution/block.go @@ -40,6 +40,7 @@ func (block *Block) GetSeenBy() []*Client { func (block *Block) SetSeenBy(client *Client) { block.seenMutex.Lock() defer block.seenMutex.Unlock() + block.seenMap[client.clientIdx] = client if block.seenChan != nil { diff --git a/pkg/coordinator/clients/execution/blockcache.go b/pkg/coordinator/clients/execution/blockcache.go index c98bc12..4a0f045 100644 --- a/pkg/coordinator/clients/execution/blockcache.go +++ b/pkg/coordinator/clients/execution/blockcache.go @@ -49,6 +49,7 @@ func NewBlockCache(ctx context.Context, logger logrus.FieldLogger, followDistanc logger.WithError(err2).Errorf("uncaught panic in BlockCache.runCacheCleanup subroutine: %v, stack: %v", err, string(debug.Stack())) } }() + cache.runCacheCleanup(ctx) }() diff --git 
a/pkg/coordinator/clients/execution/clientlogic.go b/pkg/coordinator/clients/execution/clientlogic.go index 9ae0e8e..2f4ffa3 100644 --- a/pkg/coordinator/clients/execution/clientlogic.go +++ b/pkg/coordinator/clients/execution/clientlogic.go @@ -25,7 +25,6 @@ func (client *Client) runClientLoop() { for { err := client.checkClient() - if err == nil { err = client.runClientLogic() } @@ -216,6 +215,7 @@ func (client *Client) processBlock(hash common.Hash, number uint64, block *types } client.headMutex.Lock() + if bytes.Equal(client.headHash[:], hash[:]) { client.headMutex.Unlock() return nil diff --git a/pkg/coordinator/clients/execution/forks.go b/pkg/coordinator/clients/execution/forks.go index 5692b2b..9ad1120 100644 --- a/pkg/coordinator/clients/execution/forks.go +++ b/pkg/coordinator/clients/execution/forks.go @@ -17,6 +17,7 @@ type HeadFork struct { func (pool *Pool) resetHeadForkCache() { pool.forkCacheMutex.Lock() defer pool.forkCacheMutex.Unlock() + pool.forkCache = map[int64][]*HeadFork{} } diff --git a/pkg/coordinator/clients/execution/pool.go b/pkg/coordinator/clients/execution/pool.go index cf40943..c8cf71e 100644 --- a/pkg/coordinator/clients/execution/pool.go +++ b/pkg/coordinator/clients/execution/pool.go @@ -71,8 +71,8 @@ func (pool *Pool) GetBlockCache() *BlockCache { func (pool *Pool) AddEndpoint(endpoint *ClientConfig) (*Client, error) { clientIdx := pool.clientCounter pool.clientCounter++ - client, err := pool.newPoolClient(clientIdx, endpoint) + client, err := pool.newPoolClient(clientIdx, endpoint) if err != nil { return nil, err } diff --git a/pkg/coordinator/clients/execution/rpc/ethconfig.go b/pkg/coordinator/clients/execution/rpc/ethconfig.go index 628fdab..db1678d 100644 --- a/pkg/coordinator/clients/execution/rpc/ethconfig.go +++ b/pkg/coordinator/clients/execution/rpc/ethconfig.go @@ -35,8 +35,8 @@ func (ec *ExecutionClient) GetEthConfig(ctx context.Context) (*EthConfigResponse defer reqCtxCancel() var result json.RawMessage - err := ec.rpcClient.CallContext(reqCtx, &result, "eth_config") + err := ec.rpcClient.CallContext(reqCtx, &result, "eth_config") if err != nil { return nil, err } diff --git a/pkg/coordinator/clients/execution/rpc/executionapi.go b/pkg/coordinator/clients/execution/rpc/executionapi.go index c4728c3..08f4e0f 100644 --- a/pkg/coordinator/clients/execution/rpc/executionapi.go +++ b/pkg/coordinator/clients/execution/rpc/executionapi.go @@ -77,6 +77,7 @@ func (ec *ExecutionClient) GetEthClient() *ethclient.Client { func (ec *ExecutionClient) GetClientVersion(ctx context.Context) (string, error) { var result string + err := ec.rpcClient.CallContext(ctx, &result, "web3_clientVersion") return result, err diff --git a/pkg/coordinator/clients/execution/subscriptions.go b/pkg/coordinator/clients/execution/subscriptions.go index 094394e..66da715 100644 --- a/pkg/coordinator/clients/execution/subscriptions.go +++ b/pkg/coordinator/clients/execution/subscriptions.go @@ -15,6 +15,7 @@ type Dispatcher[T interface{}] struct { func (d *Dispatcher[T]) Subscribe(capacity int) *Subscription[T] { d.mutex.Lock() defer d.mutex.Unlock() + subscription := &Subscription[T]{ channel: make(chan T, capacity), dispatcher: d, diff --git a/pkg/coordinator/db/task_results.go b/pkg/coordinator/db/task_results.go index 17eec4f..1ab58bc 100644 --- a/pkg/coordinator/db/task_results.go +++ b/pkg/coordinator/db/task_results.go @@ -41,11 +41,11 @@ func (db *Database) UpsertTaskResult(tx *sqlx.Tx, result *TaskResult) error { func (db *Database) GetTaskResultByIndex(runID, taskID 
uint64, resultType string, index int) (*TaskResult, error) { var result TaskResult + err := db.reader.Get(&result, ` SELECT * FROM task_results WHERE run_id = $1 AND task_id = $2 AND result_type = $3 AND result_index = $4`, runID, taskID, resultType, index) - if err != nil { return nil, err } @@ -55,11 +55,11 @@ func (db *Database) GetTaskResultByIndex(runID, taskID uint64, resultType string func (db *Database) GetTaskResultByName(runID, taskID uint64, resultType, name string) (*TaskResult, error) { var result TaskResult + err := db.reader.Get(&result, ` SELECT * FROM task_results WHERE run_id = $1 AND task_id = $2 AND result_type = $3 AND name = $4`, runID, taskID, resultType, name) - if err != nil { return nil, err } @@ -69,6 +69,7 @@ func (db *Database) GetTaskResultByName(runID, taskID uint64, resultType, name s func (db *Database) GetTaskResults(runID, taskID uint64, summaryType string) ([]TaskResult, error) { var results []TaskResult + err := db.reader.Select(&results, ` SELECT * FROM task_results WHERE run_id = $1 AND task_id = $2 AND result_type = $3`, @@ -79,6 +80,7 @@ func (db *Database) GetTaskResults(runID, taskID uint64, summaryType string) ([] func (db *Database) GetAllTaskResultHeaders(runID uint64) ([]TaskResultHeader, error) { var headers []TaskResultHeader + err := db.reader.Select(&headers, ` SELECT task_id, result_type, result_index, name, size FROM task_results WHERE run_id = $1`, diff --git a/pkg/coordinator/logger/dbwriter.go b/pkg/coordinator/logger/dbwriter.go index b2704a1..be720d3 100644 --- a/pkg/coordinator/logger/dbwriter.go +++ b/pkg/coordinator/logger/dbwriter.go @@ -98,12 +98,14 @@ func (lh *logDBWriter) flushDelayed() { lh.bufMtx.Lock() defer lh.bufMtx.Unlock() + lh.flushToDB() }() } func (lh *logDBWriter) flushToDB() { lh.flushMtx.Lock() + defer func() { lh.flushMtx.Unlock() }() @@ -126,7 +128,6 @@ func (lh *logDBWriter) flushToDB() { return nil }) - if err != nil { lh.logger.logger.Errorf("failed to write log entries to db: %v", err) return diff --git a/pkg/coordinator/names/validatornames.go b/pkg/coordinator/names/validatornames.go index c0ee4bd..eb7b05e 100644 --- a/pkg/coordinator/names/validatornames.go +++ b/pkg/coordinator/names/validatornames.go @@ -107,8 +107,8 @@ func (vn *ValidatorNames) parseNamesMap(names map[string]string) int { for idxStr, name := range names { rangeParts := strings.Split(idxStr, "-") - minIdx, err := strconv.ParseUint(rangeParts[0], 10, 64) + minIdx, err := strconv.ParseUint(rangeParts[0], 10, 64) if err != nil { continue } @@ -138,8 +138,8 @@ func (vn *ValidatorNames) loadFromRangesAPI(apiURL string) error { vn.logger.Debugf("Loading validator names from inventory: %v", apiURL) client := &http.Client{Timeout: time.Second * 120} - resp, err := client.Get(apiURL) + resp, err := client.Get(apiURL) if err != nil { return fmt.Errorf("could not fetch inventory (%v): %v", getRedactedURL(apiURL), err) } diff --git a/pkg/coordinator/tasks/check_consensus_attestation_stats/task.go b/pkg/coordinator/tasks/check_consensus_attestation_stats/task.go index 7c4121f..52e3e68 100644 --- a/pkg/coordinator/tasks/check_consensus_attestation_stats/task.go +++ b/pkg/coordinator/tasks/check_consensus_attestation_stats/task.go @@ -106,6 +106,7 @@ func (t *Task) Execute(ctx context.Context) error { t.logger.Infof("current epoch: %v, starting attestation aggregation at epoch %v", lastCheckedEpoch, lastCheckedEpoch+1) t.attesterDutyMap = map[uint64]map[phase0.Root]*attesterDuties{} + defer func() { t.attesterDutyMap = nil }() diff --git 
a/pkg/coordinator/tasks/check_consensus_identity/task.go b/pkg/coordinator/tasks/check_consensus_identity/task.go index c25f13b..2df01bf 100644 --- a/pkg/coordinator/tasks/check_consensus_identity/task.go +++ b/pkg/coordinator/tasks/check_consensus_identity/task.go @@ -413,6 +413,7 @@ func (t *Task) decodeENR(raw string) (*enr.Record, error) { } var r enr.Record + err = rlp.DecodeBytes(dec[:n], &r) return &r, err diff --git a/pkg/coordinator/tasks/check_consensus_proposer_duty/task.go b/pkg/coordinator/tasks/check_consensus_proposer_duty/task.go index 16a467c..66c5d85 100644 --- a/pkg/coordinator/tasks/check_consensus_proposer_duty/task.go +++ b/pkg/coordinator/tasks/check_consensus_proposer_duty/task.go @@ -118,7 +118,6 @@ func (t *Task) loadEpochDuties(ctx context.Context, epoch uint64) { } proposerDuties, err := client.GetRPCClient().GetProposerDuties(ctx, epoch) - if err != nil { t.logger.Errorf("error while fetching epoch duties: %v", err.Error()) return @@ -158,8 +157,8 @@ func (t *Task) runProposerDutyCheck(slot uint64) bool { if t.config.ValidatorNamePattern != "" { validatorName := t.ctx.Scheduler.GetServices().ValidatorNames().GetValidatorName(uint64(duty.ValidatorIndex)) - matched, err := regexp.MatchString(t.config.ValidatorNamePattern, validatorName) + matched, err := regexp.MatchString(t.config.ValidatorNamePattern, validatorName) if err != nil { t.logger.Errorf("slot %v check failed: validator name pattern invalid: %v", slot, err) return false diff --git a/pkg/coordinator/tasks/check_consensus_validator_status/task.go b/pkg/coordinator/tasks/check_consensus_validator_status/task.go index aace3b1..ba24c24 100644 --- a/pkg/coordinator/tasks/check_consensus_validator_status/task.go +++ b/pkg/coordinator/tasks/check_consensus_validator_status/task.go @@ -171,8 +171,8 @@ func (t *Task) runValidatorStatusCheck() bool { validatorJSON, err := json.Marshal(validator) if err == nil { validatorMap := map[string]interface{}{} - err = json.Unmarshal(validatorJSON, &validatorMap) + err = json.Unmarshal(validatorJSON, &validatorMap) if err == nil { t.ctx.Vars.SetVar(t.config.ValidatorInfoResultVar, validatorMap) } else { diff --git a/pkg/coordinator/tasks/generate_attestations/config.go b/pkg/coordinator/tasks/generate_attestations/config.go index 8e32d0a..2c63955 100644 --- a/pkg/coordinator/tasks/generate_attestations/config.go +++ b/pkg/coordinator/tasks/generate_attestations/config.go @@ -30,8 +30,8 @@ type Config struct { } // ParseRandomLateHead parses the RandomLateHead string in "min:max" or "min-max" format. -// Returns min, max values and whether random late head is enabled. -func (c *Config) ParseRandomLateHead() (min, max int, enabled bool, err error) { +// Returns minVal, maxVal values and whether random late head is enabled. 
+func (c *Config) ParseRandomLateHead() (minVal, maxVal int, enabled bool, err error) { if c.RandomLateHead == "" { return 0, 0, false, nil } @@ -48,21 +48,21 @@ func (c *Config) ParseRandomLateHead() (min, max int, enabled bool, err error) { return 0, 0, false, fmt.Errorf("randomLateHead must be in 'min:max' or 'min-max' format, got: %s", c.RandomLateHead) } - min, err = strconv.Atoi(strings.TrimSpace(parts[0])) + minVal, err = strconv.Atoi(strings.TrimSpace(parts[0])) if err != nil { return 0, 0, false, fmt.Errorf("invalid min value in randomLateHead: %w", err) } - max, err = strconv.Atoi(strings.TrimSpace(parts[1])) + maxVal, err = strconv.Atoi(strings.TrimSpace(parts[1])) if err != nil { return 0, 0, false, fmt.Errorf("invalid max value in randomLateHead: %w", err) } - if min > max { - return 0, 0, false, fmt.Errorf("min (%d) cannot be greater than max (%d) in randomLateHead", min, max) + if minVal > maxVal { + return 0, 0, false, fmt.Errorf("min (%d) cannot be greater than max (%d) in randomLateHead", minVal, maxVal) } - return min, max, true, nil + return minVal, maxVal, true, nil } func DefaultConfig() Config { diff --git a/pkg/coordinator/tasks/generate_attestations/task.go b/pkg/coordinator/tasks/generate_attestations/task.go index 9a27ef3..46f75fe 100644 --- a/pkg/coordinator/tasks/generate_attestations/task.go +++ b/pkg/coordinator/tasks/generate_attestations/task.go @@ -160,6 +160,7 @@ func (t *Task) Execute(ctx context.Context) error { if count > 0 { totalAttestations += count + t.ctx.SetResult(types.TaskResultSuccess) t.logger.Infof("sent %d attestations for slot %d (total: %d)", count, slot.Number(), totalAttestations) } @@ -182,6 +183,7 @@ func (t *Task) Execute(ctx context.Context) error { t.logger.Infof("processing all attestations for epoch %d", prevEpoch) epochAttestations := 0 + for slotOffset := uint64(0); slotOffset < specs.SlotsPerEpoch; slotOffset++ { targetSlot := prevEpoch*specs.SlotsPerEpoch + slotOffset @@ -198,6 +200,7 @@ func (t *Task) Execute(ctx context.Context) error { if t.config.LimitTotal > 0 && totalAttestations >= t.config.LimitTotal { t.logger.Infof("reached total attestation limit: %d", totalAttestations) t.ctx.SetResult(types.TaskResultSuccess) + return nil } } @@ -266,7 +269,7 @@ func (t *Task) initValidatorKeys() error { return nil } -func (t *Task) processSlot(ctx context.Context, slot uint64, startEpoch uint64) (int, error) { +func (t *Task) processSlot(ctx context.Context, slot, startEpoch uint64) (int, error) { consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() specs := consensusPool.GetBlockCache().GetSpecs() @@ -328,7 +331,7 @@ func (t *Task) processSlot(ctx context.Context, slot uint64, startEpoch uint64) // processSlotForEpoch processes attestations for a specific slot in a given epoch. // This is used by sendAllLastEpoch mode where we know the target epoch directly. 
-func (t *Task) processSlotForEpoch(ctx context.Context, slot uint64, epoch uint64) (int, error) { +func (t *Task) processSlotForEpoch(ctx context.Context, slot, epoch uint64) (int, error) { // Get committee duties for the epoch duties, err := t.getCommitteeDuties(ctx, epoch) if err != nil { @@ -384,7 +387,7 @@ func (t *Task) findSlotDuties(slot uint64, duties []*v1.BeaconCommittee) []*vali validatorIndex: valIdx, committeeIndex: committee.Index, committeeLength: uint64(len(committee.Validators)), - positionInCommittee: uint64(position), + positionInCommittee: uint64(position), //nolint:gosec // position from range is always non-negative }) } } @@ -439,16 +442,21 @@ func (t *Task) generateAndSubmitAttestation(ctx context.Context, slot uint64, co // Get attestation data from beacon node, retry with different clients if needed var attData *phase0.AttestationData + var lastErr error + for _, client := range clients { var err error + attData, err = client.GetRPCClient().GetAttestationData(ctx, slot, uint64(committeeIdx)) if err == nil { break } + lastErr = err t.logger.Debugf("failed to get attestation data from %s: %v, trying next client", client.GetName(), err) } + if attData == nil { return 0, fmt.Errorf("failed to get attestation data from all clients: %w", lastErr) } @@ -470,6 +478,7 @@ func (t *Task) generateAndSubmitAttestation(ctx context.Context, slot uint64, co // Compute the signing domain epoch := uint64(attData.Slot) / specs.SlotsPerEpoch + forkVersion := forkState.CurrentVersion if epoch < uint64(forkState.Epoch) { forkVersion = forkState.PreviousVersion @@ -483,16 +492,19 @@ func (t *Task) generateAndSubmitAttestation(ctx context.Context, slot uint64, co // Parse random late head config randomMin, randomMax, randomEnabled, _ := t.config.ParseRandomLateHead() + clusterSize := t.config.LateHeadClusterSize if clusterSize <= 0 { clusterSize = 1 // Default: each attestation gets its own random offset } // Create SingleAttestation objects for each validator (Electra format) - var singleAttestations []*rpc.SingleAttestation + singleAttestations := make([]*rpc.SingleAttestation, 0, len(duties)) var currentClusterOffset int + var clusterAttData *phase0.AttestationData + attestationCount := 0 for _, duty := range duties { @@ -503,9 +515,11 @@ func (t *Task) generateAndSubmitAttestation(ctx context.Context, slot uint64, co // Apply per-attestation or per-cluster random late head if configured attDataForValidator := attData + if randomEnabled { // Generate new random offset at start or when cluster is full if attestationCount%clusterSize == 0 { + //nolint:gosec // G404: not security-critical, used for test randomization currentClusterOffset = randomMin + rand.Intn(randomMax-randomMin+1) if currentClusterOffset != 0 { clusterAttData = t.applyLateHead(attData, currentClusterOffset) @@ -513,21 +527,22 @@ func (t *Task) generateAndSubmitAttestation(ctx context.Context, slot uint64, co clusterAttData = attData } } + attDataForValidator = clusterAttData attestationCount++ } // Sign attestation data - msgRoot, err := attDataForValidator.HashTreeRoot() - if err != nil { - return 0, fmt.Errorf("failed to hash attestation data: %w", err) + msgRoot, hashErr := attDataForValidator.HashTreeRoot() + if hashErr != nil { + return 0, fmt.Errorf("failed to hash attestation data: %w", hashErr) } signingRoot := common.ComputeSigningRoot(msgRoot, dom) var secKey hbls.SecretKey - if err := secKey.Deserialize(valKey.privkey.Marshal()); err != nil { - return 0, fmt.Errorf("failed to deserialize private key: %w", 
err) + if deserializeErr := secKey.Deserialize(valKey.privkey.Marshal()); deserializeErr != nil { + return 0, fmt.Errorf("failed to deserialize private key: %w", deserializeErr) } sig := secKey.SignHash(signingRoot[:]) @@ -570,6 +585,7 @@ func (t *Task) applyLateHead(attData *phase0.AttestationData, offset int) *phase if newSlot < targetEpochFirstSlot { t.logger.Debugf("late head offset %d would result in invalid head (slot %d < target epoch slot %d), clamping to target", offset, newSlot, targetEpochFirstSlot) + newRoot = attData.Target.Root newSlot = targetEpochFirstSlot } @@ -596,7 +612,7 @@ func (t *Task) applyLateHead(attData *phase0.AttestationData, offset int) *phase // walkBlocks walks N blocks from the given root. // Positive steps go backwards (using parentRoot), negative steps go forward (finding child blocks). // Returns the resulting root and slot. Always returns a valid slot from the last known block. -func (t *Task) walkBlocks(startRoot phase0.Root, startSlot uint64, steps int) (phase0.Root, uint64) { +func (t *Task) walkBlocks(startRoot phase0.Root, startSlot uint64, steps int) (resultRoot phase0.Root, resultSlot uint64) { if steps > 0 { return t.walkBackBlocks(startRoot, startSlot, steps) } else if steps < 0 { @@ -616,7 +632,7 @@ func (t *Task) walkBlocks(startRoot phase0.Root, startSlot uint64, steps int) (p } // walkBackBlocks walks back N blocks from the given root using parentRoot. -func (t *Task) walkBackBlocks(startRoot phase0.Root, startSlot uint64, steps int) (phase0.Root, uint64) { +func (t *Task) walkBackBlocks(startRoot phase0.Root, startSlot uint64, steps int) (resultRoot phase0.Root, resultSlot uint64) { consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() blockCache := consensusPool.GetBlockCache() @@ -654,7 +670,7 @@ func (t *Task) walkBackBlocks(startRoot phase0.Root, startSlot uint64, steps int } // walkForwardBlocks walks forward N blocks from the given root by finding child blocks. 
-func (t *Task) walkForwardBlocks(startRoot phase0.Root, startSlot uint64, steps int) (phase0.Root, uint64) { +func (t *Task) walkForwardBlocks(startRoot phase0.Root, startSlot uint64, steps int) (resultRoot phase0.Root, resultSlot uint64) { consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() blockCache := consensusPool.GetBlockCache() @@ -722,7 +738,9 @@ func (t *Task) getClients() []*consensus.Client { if t.config.ClientPattern == "" && t.config.ExcludeClientPattern == "" { allClients := consensusPool.GetAllEndpoints() + clients := make([]*consensus.Client, 0, len(allClients)) + for _, c := range allClients { if consensusPool.IsClientReady(c) { clients = append(clients, c) @@ -733,7 +751,9 @@ func (t *Task) getClients() []*consensus.Client { } poolClients := clientPool.GetClientsByNamePatterns(t.config.ClientPattern, t.config.ExcludeClientPattern) + clients := make([]*consensus.Client, 0, len(poolClients)) + for _, c := range poolClients { if c.ConsensusClient != nil && consensusPool.IsClientReady(c.ConsensusClient) { clients = append(clients, c.ConsensusClient) diff --git a/pkg/coordinator/tasks/generate_blob_transactions/task.go b/pkg/coordinator/tasks/generate_blob_transactions/task.go index fa1d03a..cced7bf 100644 --- a/pkg/coordinator/tasks/generate_blob_transactions/task.go +++ b/pkg/coordinator/tasks/generate_blob_transactions/task.go @@ -203,6 +203,7 @@ func (t *Task) Execute(ctx context.Context) error { if t.config.LimitPerBlock > 0 && perBlockCount >= t.config.LimitPerBlock { // await next block perBlockCount = 0 + select { case <-ctx.Done(): return nil @@ -338,6 +339,7 @@ func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, c t.logger.WithFields(logrus.Fields{ "client": client.GetName(), }).Warnf("error sending tx %v: %v", transactionIdx, err) + return } diff --git a/pkg/coordinator/tasks/generate_bls_changes/task.go b/pkg/coordinator/tasks/generate_bls_changes/task.go index 3687043..792cc89 100644 --- a/pkg/coordinator/tasks/generate_bls_changes/task.go +++ b/pkg/coordinator/tasks/generate_bls_changes/task.go @@ -141,6 +141,7 @@ func (t *Task) Execute(ctx context.Context) error { if t.config.LimitPerSlot > 0 && perSlotCount >= t.config.LimitPerSlot { // await next block perSlotCount = 0 + select { case <-ctx.Done(): return nil diff --git a/pkg/coordinator/tasks/generate_consolidations/task.go b/pkg/coordinator/tasks/generate_consolidations/task.go index db76292..26b0071 100644 --- a/pkg/coordinator/tasks/generate_consolidations/task.go +++ b/pkg/coordinator/tasks/generate_consolidations/task.go @@ -155,9 +155,10 @@ func (t *Task) Execute(ctx context.Context) error { } receiptsMapMutex.Lock() + consolidationReceipts[tx.Hash().Hex()] = receipt - receiptsMapMutex.Unlock() + receiptsMapMutex.Unlock() pendingWg.Done() switch { @@ -197,6 +198,7 @@ func (t *Task) Execute(ctx context.Context) error { if t.config.LimitPerSlot > 0 && perSlotCount >= t.config.LimitPerSlot { // await next block perSlotCount = 0 + select { case <-ctx.Done(): return nil @@ -228,8 +230,8 @@ func (t *Task) Execute(ctx context.Context) error { receiptJSON, err := json.Marshal(receipt) if err == nil { receiptMap = map[string]interface{}{} - err = json.Unmarshal(receiptJSON, &receiptMap) + err = json.Unmarshal(receiptJSON, &receiptMap) if err != nil { t.logger.Errorf("could not unmarshal transaction receipt for result var: %v", err) @@ -400,7 +402,6 @@ func (t *Task) generateConsolidation(ctx context.Context, accountIdx uint64, onC RebroadcastInterval: 30 * 
time.Second, MaxRebroadcasts: 5, }) - if err != nil { return nil, fmt.Errorf("failed sending consolidation transaction: %w", err) } diff --git a/pkg/coordinator/tasks/generate_deposits/task.go b/pkg/coordinator/tasks/generate_deposits/task.go index 67a6eee..83d199c 100644 --- a/pkg/coordinator/tasks/generate_deposits/task.go +++ b/pkg/coordinator/tasks/generate_deposits/task.go @@ -164,7 +164,9 @@ func (t *Task) Execute(ctx context.Context) error { } depositReceiptsMtx.Lock() + depositReceipts[tx.Hash().Hex()] = receipt + depositReceiptsMtx.Unlock() switch { @@ -207,6 +209,7 @@ func (t *Task) Execute(ctx context.Context) error { if t.config.LimitPerSlot > 0 && perSlotCount >= t.config.LimitPerSlot { // await next block perSlotCount = 0 + select { case <-ctx.Done(): return nil @@ -245,8 +248,8 @@ func (t *Task) Execute(ctx context.Context) error { receiptJSON, err := json.Marshal(receipt) if err == nil { receiptMap = map[string]interface{}{} - err = json.Unmarshal(receiptJSON, &receiptMap) + err = json.Unmarshal(receiptJSON, &receiptMap) if err != nil { t.logger.Errorf("could not unmarshal transaction receipt for result var: %v", err) @@ -459,7 +462,6 @@ func (t *Task) generateDeposit(ctx context.Context, accountIdx uint64, onConfirm RebroadcastInterval: 30 * time.Second, MaxRebroadcasts: 5, }) - if err != nil { return nil, nil, fmt.Errorf("failed sending deposit transaction: %w", err) } diff --git a/pkg/coordinator/tasks/generate_eoa_transactions/task.go b/pkg/coordinator/tasks/generate_eoa_transactions/task.go index 2da1832..222de87 100644 --- a/pkg/coordinator/tasks/generate_eoa_transactions/task.go +++ b/pkg/coordinator/tasks/generate_eoa_transactions/task.go @@ -225,6 +225,7 @@ func (t *Task) Execute(ctx context.Context) error { if t.config.LimitPerBlock > 0 && perBlockCount >= t.config.LimitPerBlock { // await next block perBlockCount = 0 + select { case <-ctx.Done(): return nil @@ -379,6 +380,7 @@ func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, c t.logger.WithFields(logrus.Fields{ "client": client.GetName(), }).Warnf("error sending tx %v: %v", transactionIdx, err) + return } @@ -399,7 +401,6 @@ func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, c RebroadcastInterval: 30 * time.Second, MaxRebroadcasts: 5, }) - if err != nil { if strings.Contains(strings.ToLower(err.Error()), "nonce") { txWallet.ResyncState() diff --git a/pkg/coordinator/tasks/generate_exits/task.go b/pkg/coordinator/tasks/generate_exits/task.go index 02fd078..83f6a0c 100644 --- a/pkg/coordinator/tasks/generate_exits/task.go +++ b/pkg/coordinator/tasks/generate_exits/task.go @@ -135,6 +135,7 @@ func (t *Task) Execute(ctx context.Context) error { if t.config.LimitPerSlot > 0 && perSlotCount >= t.config.LimitPerSlot { // await next block perSlotCount = 0 + select { case <-ctx.Done(): return nil diff --git a/pkg/coordinator/tasks/generate_slashings/task.go b/pkg/coordinator/tasks/generate_slashings/task.go index ab0b3ed..22de3ba 100644 --- a/pkg/coordinator/tasks/generate_slashings/task.go +++ b/pkg/coordinator/tasks/generate_slashings/task.go @@ -137,6 +137,7 @@ func (t *Task) Execute(ctx context.Context) error { if t.config.LimitPerSlot > 0 && perSlotCount >= t.config.LimitPerSlot { // await next block perSlotCount = 0 + select { case <-ctx.Done(): return nil diff --git a/pkg/coordinator/tasks/generate_withdrawal_requests/task.go b/pkg/coordinator/tasks/generate_withdrawal_requests/task.go index 7fa55db..76c1f78 100644 --- 
a/pkg/coordinator/tasks/generate_withdrawal_requests/task.go +++ b/pkg/coordinator/tasks/generate_withdrawal_requests/task.go @@ -150,9 +150,10 @@ func (t *Task) Execute(ctx context.Context) error { } receiptsMapMutex.Lock() + withdrawalReceipts[tx.Hash().Hex()] = receipt - receiptsMapMutex.Unlock() + receiptsMapMutex.Unlock() pendingWg.Done() switch { @@ -188,6 +189,7 @@ func (t *Task) Execute(ctx context.Context) error { if t.config.LimitPerSlot > 0 && perSlotCount >= t.config.LimitPerSlot { // await next block perSlotCount = 0 + select { case <-ctx.Done(): return nil @@ -219,8 +221,8 @@ func (t *Task) Execute(ctx context.Context) error { receiptJSON, err := json.Marshal(receipt) if err == nil { receiptMap = map[string]interface{}{} - err = json.Unmarshal(receiptJSON, &receiptMap) + err = json.Unmarshal(receiptJSON, &receiptMap) if err != nil { t.logger.Errorf("could not unmarshal transaction receipt for result var: %v", err) @@ -374,6 +376,7 @@ func (t *Task) generateWithdrawal(ctx context.Context, accountIdx uint64, onConf t.logger.WithFields(logrus.Fields{ "client": client.GetName(), }).Warnf("error sending withdrawal tx %v: %v", accountIdx, err) + return } @@ -394,7 +397,6 @@ func (t *Task) generateWithdrawal(ctx context.Context, accountIdx uint64, onConf RebroadcastInterval: 30 * time.Second, MaxRebroadcasts: 5, }) - if err != nil { return nil, fmt.Errorf("failed sending withdrawal transaction: %w", err) } diff --git a/pkg/coordinator/tasks/get_consensus_validators/task.go b/pkg/coordinator/tasks/get_consensus_validators/task.go index 2a294ff..73f4633 100644 --- a/pkg/coordinator/tasks/get_consensus_validators/task.go +++ b/pkg/coordinator/tasks/get_consensus_validators/task.go @@ -101,8 +101,8 @@ func (t *Task) Execute(_ context.Context) error { if t.config.ValidatorNamePattern != "" { var err error - validatorNameRegex, err = regexp.Compile(t.config.ValidatorNamePattern) + validatorNameRegex, err = regexp.Compile(t.config.ValidatorNamePattern) if err != nil { return fmt.Errorf("invalid validator name pattern: %v", err) } diff --git a/pkg/coordinator/tasks/run_shell/task.go b/pkg/coordinator/tasks/run_shell/task.go index 9894ffb..0bf770d 100644 --- a/pkg/coordinator/tasks/run_shell/task.go +++ b/pkg/coordinator/tasks/run_shell/task.go @@ -185,6 +185,7 @@ func (t *Task) Execute(ctx context.Context) error { var execErr error waitChan := make(chan bool) + go func() { defer close(waitChan) diff --git a/pkg/coordinator/tasks/run_task_matrix/task.go b/pkg/coordinator/tasks/run_task_matrix/task.go index ec5f49d..bb1f0e0 100644 --- a/pkg/coordinator/tasks/run_task_matrix/task.go +++ b/pkg/coordinator/tasks/run_task_matrix/task.go @@ -163,6 +163,7 @@ func (t *Task) Execute(ctx context.Context) error { } completeChan := make(chan bool) + go func() { taskWaitGroup.Wait() time.Sleep(100 * time.Millisecond) diff --git a/pkg/coordinator/tasks/run_tasks_concurrent/task.go b/pkg/coordinator/tasks/run_tasks_concurrent/task.go index ae6ee4d..16e4477 100644 --- a/pkg/coordinator/tasks/run_tasks_concurrent/task.go +++ b/pkg/coordinator/tasks/run_tasks_concurrent/task.go @@ -141,6 +141,7 @@ func (t *Task) Execute(ctx context.Context) error { } completeChan := make(chan bool) + go func() { taskWaitGroup.Wait() time.Sleep(100 * time.Millisecond) diff --git a/pkg/coordinator/testregistry.go b/pkg/coordinator/testregistry.go index 4fa4796..f7e212c 100644 --- a/pkg/coordinator/testregistry.go +++ b/pkg/coordinator/testregistry.go @@ -158,7 +158,6 @@ func (c *TestRegistry) LoadTests(ctx context.Context, 
local []*types.TestConfig, return nil }) - if err != nil { c.coordinator.Logger().Errorf("error adding new test configs to db: %v", err) } @@ -260,7 +259,6 @@ func (c *TestRegistry) AddExternalTest(ctx context.Context, extTestCfg *types.Ex err = c.coordinator.Database().RunTransaction(func(tx *sqlx.Tx) error { return c.coordinator.Database().InsertTestConfig(tx, dbTestCfg) }) - if err != nil { c.coordinator.Logger().Errorf("error adding new test configs to db: %v", err) } @@ -327,6 +325,7 @@ func (c *TestRegistry) externalTestCfgToDB(cfgExternalTest *types.ExternalTestCo func (c *TestRegistry) DeleteTest(testID string) error { c.testDescriptorsMutex.Lock() + if _, ok := c.testDescriptors[testID]; !ok { c.testDescriptorsMutex.Unlock() return nil diff --git a/pkg/coordinator/testrunner.go b/pkg/coordinator/testrunner.go index 72ef0b1..950f79c 100644 --- a/pkg/coordinator/testrunner.go +++ b/pkg/coordinator/testrunner.go @@ -119,11 +119,13 @@ func (c *TestRunner) createTestRun(descriptor types.TestDescriptor, configOverri } c.testRegistryMutex.Lock() + if !skipQueue { c.testQueue = append(c.testQueue, testRef) } c.testRunMap[runID] = testRef + c.testRegistryMutex.Unlock() return testRef, nil @@ -142,10 +144,12 @@ runLoop: var nextTest types.TestRunner c.testRegistryMutex.Lock() + if len(c.testQueue) > 0 { nextTest = c.testQueue[0] c.testQueue = c.testQueue[1:] } + c.testRegistryMutex.Unlock() if nextTest != nil { @@ -158,7 +162,9 @@ runLoop: c.runTest(ctx, nextTest) } + semaphore <- true + if ctx.Err() != nil { break runLoop } diff --git a/pkg/coordinator/vars/variables.go b/pkg/coordinator/vars/variables.go index 61c3be5..2c7ccef 100644 --- a/pkg/coordinator/vars/variables.go +++ b/pkg/coordinator/vars/variables.go @@ -176,6 +176,7 @@ func (v *Variables) GetVarsMap(varsMap map[string]any, skipParent bool) map[stri } v.varsMutex.RLock() + for scopeName, subScope := range v.subScopes { _, exists := varsMap[scopeName] if exists { @@ -193,6 +194,7 @@ func (v *Variables) GetVarsMap(varsMap map[string]any, skipParent bool) map[stri varsMap[varName] = varData.value } + v.varsMutex.RUnlock() if v.parentScope != nil && !skipParent { @@ -200,6 +202,7 @@ func (v *Variables) GetVarsMap(varsMap map[string]any, skipParent bool) map[stri } v.varsMutex.RLock() + for varName, varData := range v.defaultsMap { _, exists := varsMap[varName] if exists { @@ -208,6 +211,7 @@ func (v *Variables) GetVarsMap(varsMap map[string]any, skipParent bool) map[stri varsMap[varName] = varData.value } + v.varsMutex.RUnlock() return varsMap diff --git a/pkg/coordinator/wallet/wallet.go b/pkg/coordinator/wallet/wallet.go index bfe6264..d923997 100644 --- a/pkg/coordinator/wallet/wallet.go +++ b/pkg/coordinator/wallet/wallet.go @@ -67,6 +67,7 @@ func (manager *Manager) newWallet(address common.Address) *Wallet { func (wallet *Wallet) loadState() { wallet.syncingMutex.Lock() + alreadySyncing := false if wallet.isSyncing { @@ -74,6 +75,7 @@ func (wallet *Wallet) loadState() { } else { wallet.isSyncing = true } + wallet.syncingMutex.Unlock() if alreadySyncing { @@ -263,6 +265,7 @@ func (wallet *Wallet) BuildTransaction(ctx context.Context, buildFn func(ctx con signer := types.NewPragueSigner(wallet.manager.clientPool.GetBlockCache().GetChainID()) nonce := wallet.pendingNonce + tx, err := buildFn(ctx, nonce, func(addr common.Address, tx *types.Transaction) (*types.Transaction, error) { if !bytes.Equal(addr[:], wallet.address[:]) { return nil, fmt.Errorf("cannot sign for another wallet") @@ -275,13 +278,11 @@ func (wallet *Wallet) 
BuildTransaction(ctx context.Context, buildFn func(ctx con return signedTx, nil }) - if err != nil { return nil, err } signedTx, err := types.SignTx(tx, signer, wallet.privkey) - if err != nil { return nil, err } diff --git a/pkg/coordinator/wallet/walletpool.go b/pkg/coordinator/wallet/walletpool.go index 05303c1..0f27abe 100644 --- a/pkg/coordinator/wallet/walletpool.go +++ b/pkg/coordinator/wallet/walletpool.go @@ -186,7 +186,6 @@ func (pool *WalletPool) EnsureFunding(ctx context.Context, minBalance, refillAmo go func(tx *types.Transaction) { _, err := pool.rootWallet.AwaitTransaction(ctx, tx) - if err != nil { pool.logger.Warnf("failed awaiting child wallet refill tx: %v", err) refillError = err diff --git a/pkg/coordinator/web/api/handler.go b/pkg/coordinator/web/api/handler.go index 817c7cb..2418afc 100644 --- a/pkg/coordinator/web/api/handler.go +++ b/pkg/coordinator/web/api/handler.go @@ -43,8 +43,8 @@ func (ah *APIHandler) sendErrorResponse(w http.ResponseWriter, route, message st j := json.NewEncoder(w) response := &Response{} response.Status = "ERROR: " + message - err := j.Encode(response) + err := j.Encode(response) if err != nil { ah.logger.Errorf("error serializing json error for API %v route: %v", route, err) } @@ -56,8 +56,8 @@ func (ah *APIHandler) sendOKResponse(w http.ResponseWriter, route string, data i Status: "OK", Data: data, } - err := j.Encode(response) + err := j.Encode(response) if err != nil { ah.logger.Errorf("error serializing json data for API %v route: %v", route, err) } diff --git a/pkg/coordinator/web/handlers/clients.go b/pkg/coordinator/web/handlers/clients.go index 0ea4ab8..79ed43e 100644 --- a/pkg/coordinator/web/handlers/clients.go +++ b/pkg/coordinator/web/handlers/clients.go @@ -46,8 +46,8 @@ func (fh *FrontendHandler) Clients(w http.ResponseWriter, r *http.Request) { data := fh.initPageData(r, "clients", "/clients", "Clients", templateFiles) var pageError error - data.Data, pageError = fh.getClientsPageData() + data.Data, pageError = fh.getClientsPageData() if pageError != nil { fh.HandlePageError(w, r, pageError) return diff --git a/pkg/coordinator/web/handlers/handler.go b/pkg/coordinator/web/handlers/handler.go index e785d28..6a7eca0 100644 --- a/pkg/coordinator/web/handlers/handler.go +++ b/pkg/coordinator/web/handlers/handler.go @@ -234,8 +234,8 @@ func (fh *FrontendHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } name := path.Clean(upath) - f, err := fh.rootFileSys.Open(name) + f, err := fh.rootFileSys.Open(name) if err != nil { fh.handleHTTPError(err, fh.HandleNotFound, w, r) return diff --git a/pkg/coordinator/web/handlers/index.go b/pkg/coordinator/web/handlers/index.go index 2b5d3ed..21b29ef 100644 --- a/pkg/coordinator/web/handlers/index.go +++ b/pkg/coordinator/web/handlers/index.go @@ -91,8 +91,8 @@ func (fh *FrontendHandler) Index(w http.ResponseWriter, r *http.Request) { pageArgs := fh.parseIndexPageArgs(r) var pageError error - data.Data, pageError = fh.getIndexPageData(pageArgs) + data.Data, pageError = fh.getIndexPageData(pageArgs) if pageError != nil { fh.HandlePageError(w, r, pageError) return @@ -114,8 +114,8 @@ func (fh *FrontendHandler) IndexData(w http.ResponseWriter, r *http.Request) { var pageError error pageArgs := fh.parseIndexPageArgs(r) - pageData, pageError = fh.getIndexPageData(pageArgs) + pageData, pageError = fh.getIndexPageData(pageArgs) if pageError != nil { fh.HandlePageError(w, r, pageError) return diff --git a/pkg/coordinator/web/handlers/registry.go b/pkg/coordinator/web/handlers/registry.go 
index f21cf3a..5318ce0 100644 --- a/pkg/coordinator/web/handlers/registry.go +++ b/pkg/coordinator/web/handlers/registry.go @@ -89,8 +89,8 @@ func (fh *FrontendHandler) Registry(w http.ResponseWriter, r *http.Request) { pageArgs := fh.parseRegistryPageArgs(r) var pageError error - data.Data, pageError = fh.getRegistryPageData(pageArgs) + data.Data, pageError = fh.getRegistryPageData(pageArgs) if pageError != nil { fh.HandlePageError(w, r, pageError) return @@ -112,8 +112,8 @@ func (fh *FrontendHandler) RegistryData(w http.ResponseWriter, r *http.Request) var pageError error pageArgs := fh.parseRegistryPageArgs(r) - pageData, pageError = fh.getRegistryPageData(pageArgs) + pageData, pageError = fh.getRegistryPageData(pageArgs) if pageError != nil { fh.HandlePageError(w, r, pageError) return diff --git a/pkg/coordinator/web/handlers/test.go b/pkg/coordinator/web/handlers/test.go index fc87a91..015cc2b 100644 --- a/pkg/coordinator/web/handlers/test.go +++ b/pkg/coordinator/web/handlers/test.go @@ -109,8 +109,8 @@ func (fh *FrontendHandler) TestPageData(w http.ResponseWriter, r *http.Request) vars := mux.Vars(r) pageArgs := fh.parseTestPageArgs(r) - pageData, pageError = fh.getTestPageData(vars["testId"], pageArgs) + pageData, pageError = fh.getTestPageData(vars["testId"], pageArgs) if pageError != nil { fh.HandlePageError(w, r, pageError) return diff --git a/pkg/coordinator/web/templates/templates.go b/pkg/coordinator/web/templates/templates.go index 9d940c3..c3843d3 100644 --- a/pkg/coordinator/web/templates/templates.go +++ b/pkg/coordinator/web/templates/templates.go @@ -43,10 +43,12 @@ func (t *Templates) GetTemplate(files ...string) *template.Template { name := strings.Join(files, "-") t.cacheMux.RLock() + if t.cache[name] != nil { defer t.cacheMux.RUnlock() return t.cache[name] } + t.cacheMux.RUnlock() tmpl := template.New(name).Funcs(t.funcs)