Skip to content

Commit eedde9a

Browse files
authored
Merge pull request #131 from ethpandaops/pk910/generate-attestations
add `generate_attestations` task
2 parents fdfeff1 + 70d6f64 commit eedde9a

File tree

55 files changed

+1156
-40
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+1156
-40
lines changed

.github/workflows/_shared-check.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ jobs:
3636
uses: golangci/golangci-lint-action@4afd733a84b1f43292c63897423277bb7f4313a9 # v8.0.0
3737
with:
3838
# Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version
39-
version: v2.1.6
39+
version: v2.7.2
4040

4141
# Optional: working directory, useful for monorepos
4242
# working-directory: somedir

.golangci.yml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ linters:
2727
- unparam
2828
- unused
2929
- whitespace
30-
- wsl
30+
- wsl_v5
3131
settings:
3232
errcheck:
3333
check-type-assertions: true
@@ -47,6 +47,10 @@ linters:
4747
nolintlint:
4848
require-explanation: true
4949
require-specific: true
50+
wsl_v5:
51+
allow-first-in-block: true
52+
allow-whole-block: false
53+
branch-max-lines: 2
5054
exclusions:
5155
generated: lax
5256
presets:
@@ -58,6 +62,11 @@ linters:
5862
- third_party$
5963
- builtin$
6064
- examples$
65+
rules:
66+
- linters:
67+
- revive
68+
text: "var-naming: avoid meaningless package names"
69+
path: "pkg/coordinator/(types|web/types|web/utils|web/api)/"
6170
formatters:
6271
enable:
6372
- gofmt

main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ func main() {
1919
runChan := make(chan bool)
2020

2121
var execErr error
22+
2223
go func() {
2324
execErr = cmd.Execute(ctx)
2425

@@ -31,6 +32,7 @@ func main() {
3132
case sig := <-signalChan:
3233
log.Printf("Caught signal: %v, shutdown gracefully...", sig)
3334
cancel()
35+
3436
select {
3537
case <-runChan:
3638
// graceful shutdown completed

pkg/coordinator/clients/consensus/block.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ func (block *Block) GetSeenBy() []*Client {
4343
func (block *Block) SetSeenBy(client *Client) {
4444
block.seenMutex.Lock()
4545
defer block.seenMutex.Unlock()
46+
4647
block.seenMap[client.clientIdx] = client
4748
}
4849

pkg/coordinator/clients/consensus/blockcache.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ func NewBlockCache(ctx context.Context, logger logrus.FieldLogger, followDistanc
7272
logger.WithError(err2).Errorf("uncaught panic in BlockCache.runCacheCleanup subroutine: %v, stack: %v", err, string(debug.Stack()))
7373
}
7474
}()
75+
7576
cache.runCacheCleanup(ctx)
7677
}()
7778

@@ -202,6 +203,7 @@ func (cache *BlockCache) GetWallclock() *ethwallclock.EthereumBeaconChain {
202203

203204
func (cache *BlockCache) SetFinalizedCheckpoint(finalizedEpoch phase0.Epoch, finalizedRoot phase0.Root) {
204205
cache.finalizedMutex.Lock()
206+
205207
if finalizedEpoch <= cache.finalizedEpoch {
206208
cache.finalizedMutex.Unlock()
207209
return

pkg/coordinator/clients/consensus/clientlogic.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ func (client *Client) processBlock(root phase0.Root, slot phase0.Slot, header *p
299299
}
300300

301301
client.headMutex.Lock()
302+
302303
if bytes.Equal(client.headRoot[:], root[:]) {
303304
client.headMutex.Unlock()
304305
return nil
@@ -316,6 +317,7 @@ func (client *Client) processBlock(root phase0.Root, slot phase0.Slot, header *p
316317

317318
func (client *Client) setFinalizedHead(epoch phase0.Epoch, root phase0.Root) error {
318319
client.headMutex.Lock()
320+
319321
if bytes.Equal(client.finalizedRoot[:], root[:]) {
320322
client.headMutex.Unlock()
321323
return nil

pkg/coordinator/clients/consensus/forks.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type HeadFork struct {
1717
func (pool *Pool) resetHeadForkCache() {
1818
pool.forkCacheMutex.Lock()
1919
defer pool.forkCacheMutex.Unlock()
20+
2021
pool.forkCache = map[int64][]*HeadFork{}
2122
}
2223

pkg/coordinator/clients/consensus/rpc/beaconapi.go

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,10 @@ func (bc *BeaconClient) getJSON(ctx context.Context, requrl string, returnValue
117117
}
118118

119119
func (bc *BeaconClient) postJSON(ctx context.Context, requrl string, postData, returnValue interface{}) error {
120+
return bc.postJSONWithHeaders(ctx, requrl, postData, returnValue, nil)
121+
}
122+
123+
func (bc *BeaconClient) postJSONWithHeaders(ctx context.Context, requrl string, postData, returnValue interface{}, extraHeaders map[string]string) error {
120124
logurl := getRedactedURL(requrl)
121125

122126
postDataBytes, err := json.Marshal(postData)
@@ -125,8 +129,8 @@ func (bc *BeaconClient) postJSON(ctx context.Context, requrl string, postData, r
125129
}
126130

127131
reader := bytes.NewReader(postDataBytes)
128-
req, err := nethttp.NewRequestWithContext(ctx, "POST", requrl, reader)
129132

133+
req, err := nethttp.NewRequestWithContext(ctx, "POST", requrl, reader)
130134
if err != nil {
131135
return err
132136
}
@@ -137,6 +141,10 @@ func (bc *BeaconClient) postJSON(ctx context.Context, requrl string, postData, r
137141
req.Header.Set(headerKey, headerVal)
138142
}
139143

144+
for headerKey, headerVal := range extraHeaders {
145+
req.Header.Set(headerKey, headerVal)
146+
}
147+
140148
client := &nethttp.Client{Timeout: time.Second * 300}
141149

142150
resp, err := client.Do(req)
@@ -497,6 +505,42 @@ func (bc *BeaconClient) SubmitProposerSlashing(ctx context.Context, slashing *ph
497505
return nil
498506
}
499507

508+
type apiAttestationData struct {
509+
Data *phase0.AttestationData `json:"data"`
510+
}
511+
512+
func (bc *BeaconClient) GetAttestationData(ctx context.Context, slot, committeeIndex uint64) (*phase0.AttestationData, error) {
513+
var attestationData apiAttestationData
514+
515+
err := bc.getJSON(ctx, fmt.Sprintf("%s/eth/v1/validator/attestation_data?slot=%d&committee_index=%d", bc.endpoint, slot, committeeIndex), &attestationData)
516+
if err != nil {
517+
return nil, fmt.Errorf("error retrieving attestation data: %v", err)
518+
}
519+
520+
return attestationData.Data, nil
521+
}
522+
523+
// SingleAttestation represents the Electra single attestation format for the v2 API.
524+
type SingleAttestation struct {
525+
CommitteeIndex uint64 `json:"committee_index,string"`
526+
AttesterIndex uint64 `json:"attester_index,string"`
527+
Data *phase0.AttestationData `json:"data"`
528+
Signature string `json:"signature"`
529+
}
530+
531+
func (bc *BeaconClient) SubmitAttestations(ctx context.Context, attestations []*SingleAttestation) error {
532+
headers := map[string]string{
533+
"Eth-Consensus-Version": "electra",
534+
}
535+
536+
err := bc.postJSONWithHeaders(ctx, fmt.Sprintf("%s/eth/v2/beacon/pool/attestations", bc.endpoint), attestations, nil, headers)
537+
if err != nil {
538+
return err
539+
}
540+
541+
return nil
542+
}
543+
500544
type NodeIdentity struct {
501545
PeerID string `json:"peer_id"`
502546
ENR string `json:"enr"`

pkg/coordinator/clients/consensus/rpc/beaconstream.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ func (bs *BeaconStream) startStream() {
8484
bs.ReadyChan <- true
8585
case err := <-stream.Errors:
8686
logger.WithField("client", bs.client.name).Warnf("beacon block stream error: %v", err)
87+
8788
select {
8889
case bs.ReadyChan <- false:
8990
case <-bs.ctx.Done():
@@ -136,8 +137,8 @@ func (bs *BeaconStream) subscribeStream(endpoint string, events uint16) *eventst
136137
var stream *eventstream.Stream
137138

138139
streamURL := fmt.Sprintf("%s/eth/v1/events?topics=%v", endpoint, topics.String())
139-
req, err := http.NewRequestWithContext(bs.ctx, "GET", streamURL, http.NoBody)
140140

141+
req, err := http.NewRequestWithContext(bs.ctx, "GET", streamURL, http.NoBody)
141142
if err == nil {
142143
for headerKey, headerVal := range bs.client.headers {
143144
req.Header.Set(headerKey, headerVal)
@@ -148,6 +149,7 @@ func (bs *BeaconStream) subscribeStream(endpoint string, events uint16) *eventst
148149

149150
if err != nil {
150151
logger.WithField("client", bs.client.name).Warnf("Error while subscribing beacon event stream %v: %v", getRedactedURL(streamURL), err)
152+
151153
select {
152154
case <-bs.ctx.Done():
153155
return nil
@@ -163,11 +165,11 @@ func (bs *BeaconStream) processBlockEvent(evt eventsource.Event) {
163165
var parsed v1.BlockEvent
164166

165167
err := json.Unmarshal([]byte(evt.Data()), &parsed)
166-
167168
if err != nil {
168169
logger.WithField("client", bs.client.name).Warnf("beacon block stream failed to decode block event: %v", err)
169170
return
170171
}
172+
171173
bs.EventChan <- &BeaconStreamEvent{
172174
Event: StreamBlockEvent,
173175
Data: &parsed,

pkg/coordinator/clients/consensus/rpc/eventstream/eventstream.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,13 +182,15 @@ func (stream *Stream) receiveEvents(r io.ReadCloser) {
182182
ev, err := dec.Decode()
183183

184184
stream.closeMutex.Lock()
185+
185186
if stream.isClosed {
186187
stream.closeMutex.Unlock()
187188
return
188189
}
189190

190191
if err != nil {
191192
stream.Errors <- err
193+
192194
stream.closeMutex.Unlock()
193195

194196
return
@@ -209,6 +211,7 @@ func (stream *Stream) receiveEvents(r io.ReadCloser) {
209211
}
210212

211213
stream.Events <- pub
214+
212215
stream.closeMutex.Unlock()
213216
}
214217
}
@@ -237,11 +240,14 @@ func (stream *Stream) retryRestartStream() {
237240
}
238241

239242
stream.closeMutex.Lock()
243+
240244
if stream.isClosed {
241245
stream.closeMutex.Unlock()
242246
return
243247
}
248+
244249
stream.Errors <- err
250+
245251
stream.closeMutex.Unlock()
246252

247253
backoff = 10 * time.Second

0 commit comments

Comments
 (0)