Skip to content
This repository was archived by the owner on May 31, 2023. It is now read-only.

Commit 0fb8754

Browse files
author
Richard Patel
authored
more godoc comments (#3)
1 parent 7a99188 commit 0fb8754

File tree

5 files changed

+211
-151
lines changed

5 files changed

+211
-151
lines changed

client.go

Lines changed: 2 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -15,134 +15,20 @@
1515
package pyth
1616

1717
import (
18-
"context"
19-
"errors"
20-
"time"
21-
22-
"github.com/cenkalti/backoff/v4"
2318
"github.com/gagliardetto/solana-go"
2419
"github.com/gagliardetto/solana-go/rpc"
25-
"github.com/gagliardetto/solana-go/rpc/ws"
2620
"go.uber.org/zap"
2721
)
2822

23+
// Client interacts with Pyth via Solana's JSON-RPC API.
2924
type Client struct {
3025
Opts
3126

3227
Log *zap.Logger
28+
RPC *rpc.Client
3329
WebSocketURL string
3430
}
3531

3632
type Opts struct {
3733
ProgramKey solana.PublicKey
3834
}
39-
40-
type PriceAccountUpdate struct {
41-
Slot uint64
42-
*PriceAccount
43-
}
44-
45-
// StreamPriceAccounts sends an update to Prometheus any time a Pyth oracle account changes.
46-
func (c *Client) StreamPriceAccounts(ctx context.Context, updates chan<- PriceAccountUpdate) error {
47-
const retryInterval = 3 * time.Second
48-
return backoff.Retry(func() error {
49-
err := c.streamPriceAccounts(ctx, updates)
50-
switch {
51-
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
52-
return backoff.Permanent(err)
53-
default:
54-
return err
55-
}
56-
}, backoff.NewConstantBackOff(retryInterval))
57-
}
58-
59-
func (c *Client) streamPriceAccounts(ctx context.Context, updates chan<- PriceAccountUpdate) error {
60-
client, err := ws.Connect(ctx, c.WebSocketURL)
61-
if err != nil {
62-
return err
63-
}
64-
defer client.Close()
65-
66-
metricsWsActiveConns.Inc()
67-
defer metricsWsActiveConns.Dec()
68-
69-
sub, err := client.ProgramSubscribeWithOpts(
70-
c.Opts.ProgramKey,
71-
rpc.CommitmentConfirmed,
72-
solana.EncodingBase64Zstd,
73-
[]rpc.RPCFilter{
74-
{
75-
Memcmp: &rpc.RPCFilterMemcmp{
76-
Offset: 0,
77-
Bytes: solana.Base58{
78-
0xd4, 0xc3, 0xb2, 0xa1, // Magic
79-
0x02, 0x00, 0x00, 0x00, // V2
80-
},
81-
},
82-
},
83-
},
84-
)
85-
if err != nil {
86-
return err
87-
}
88-
89-
// Stream updates.
90-
for {
91-
if err := c.readNextUpdate(ctx, sub, updates); err != nil {
92-
return err
93-
}
94-
}
95-
}
96-
97-
func (c *Client) readNextUpdate(
98-
ctx context.Context,
99-
sub *ws.ProgramSubscription,
100-
updates chan<- PriceAccountUpdate,
101-
) error {
102-
// If no update comes in within 20 seconds, bail.
103-
const readTimeout = 20 * time.Second
104-
ctx, cancel := context.WithTimeout(ctx, readTimeout)
105-
defer cancel()
106-
go func() {
107-
<-ctx.Done()
108-
// Terminate subscription if above timer has expired.
109-
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
110-
c.Log.Warn("Read deadline exceeded, terminating WebSocket connection",
111-
zap.Duration("timeout", readTimeout))
112-
sub.Unsubscribe()
113-
}
114-
}()
115-
116-
// Read next account update from WebSockets.
117-
update, err := sub.Recv()
118-
if err != nil {
119-
return err
120-
}
121-
metricsWsEventsTotal.Inc()
122-
123-
// Decode update.
124-
if update.Value.Account.Owner != c.Opts.ProgramKey {
125-
return nil
126-
}
127-
accountData := update.Value.Account.Data.GetBinary()
128-
if PeekAccount(accountData) != AccountTypePrice {
129-
return nil
130-
}
131-
priceAcc := new(PriceAccount)
132-
if err := priceAcc.UnmarshalBinary(accountData); err != nil {
133-
c.Log.Warn("Failed to unmarshal priceAcc account", zap.Error(err))
134-
return nil
135-
}
136-
137-
// Send update to channel.
138-
msg := PriceAccountUpdate{
139-
Slot: update.Context.Slot,
140-
PriceAccount: priceAcc,
141-
}
142-
select {
143-
case <-ctx.Done():
144-
return ctx.Err()
145-
case updates <- msg:
146-
return nil
147-
}
148-
}

prices.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package pyth
2+
3+
import (
4+
"context"
5+
"errors"
6+
"time"
7+
8+
"github.com/cenkalti/backoff/v4"
9+
"github.com/gagliardetto/solana-go"
10+
"github.com/gagliardetto/solana-go/rpc"
11+
"github.com/gagliardetto/solana-go/rpc/ws"
12+
"go.uber.org/zap"
13+
)
14+
15+
// GetPriceAccount retrieves a price account from the blockchain.
16+
func (c *Client) GetPriceAccount(ctx context.Context, productKey solana.PublicKey) (*PriceAccount, error) {
17+
accountInfo, err := c.RPC.GetAccountInfo(ctx, productKey)
18+
if err != nil {
19+
return nil, err
20+
}
21+
accountData := accountInfo.Value.Data.GetBinary()
22+
23+
price := new(PriceAccount)
24+
if err := price.UnmarshalBinary(accountData); err != nil {
25+
return nil, err
26+
}
27+
return price, nil
28+
}
29+
30+
type PriceAccountUpdate struct {
31+
Slot uint64
32+
*PriceAccount
33+
}
34+
35+
// StreamPriceAccounts sends an update to Prometheus any time a Pyth oracle account changes.
36+
func (c *Client) StreamPriceAccounts(ctx context.Context, updates chan<- PriceAccountUpdate) error {
37+
const retryInterval = 3 * time.Second
38+
return backoff.Retry(func() error {
39+
err := c.streamPriceAccounts(ctx, updates)
40+
switch {
41+
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
42+
return backoff.Permanent(err)
43+
default:
44+
return err
45+
}
46+
}, backoff.NewConstantBackOff(retryInterval))
47+
}
48+
49+
func (c *Client) streamPriceAccounts(ctx context.Context, updates chan<- PriceAccountUpdate) error {
50+
client, err := ws.Connect(ctx, c.WebSocketURL)
51+
if err != nil {
52+
return err
53+
}
54+
defer client.Close()
55+
56+
metricsWsActiveConns.Inc()
57+
defer metricsWsActiveConns.Dec()
58+
59+
sub, err := client.ProgramSubscribeWithOpts(
60+
c.Opts.ProgramKey,
61+
rpc.CommitmentConfirmed,
62+
solana.EncodingBase64Zstd,
63+
[]rpc.RPCFilter{
64+
{
65+
Memcmp: &rpc.RPCFilterMemcmp{
66+
Offset: 0,
67+
Bytes: solana.Base58{
68+
0xd4, 0xc3, 0xb2, 0xa1, // Magic
69+
0x02, 0x00, 0x00, 0x00, // V2
70+
},
71+
},
72+
},
73+
},
74+
)
75+
if err != nil {
76+
return err
77+
}
78+
79+
// Stream updates.
80+
for {
81+
if err := c.readNextUpdate(ctx, sub, updates); err != nil {
82+
return err
83+
}
84+
}
85+
}
86+
87+
func (c *Client) readNextUpdate(
88+
ctx context.Context,
89+
sub *ws.ProgramSubscription,
90+
updates chan<- PriceAccountUpdate,
91+
) error {
92+
// If no update comes in within 20 seconds, bail.
93+
const readTimeout = 20 * time.Second
94+
ctx, cancel := context.WithTimeout(ctx, readTimeout)
95+
defer cancel()
96+
go func() {
97+
<-ctx.Done()
98+
// Terminate subscription if above timer has expired.
99+
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
100+
c.Log.Warn("Read deadline exceeded, terminating WebSocket connection",
101+
zap.Duration("timeout", readTimeout))
102+
sub.Unsubscribe()
103+
}
104+
}()
105+
106+
// Read next account update from WebSockets.
107+
update, err := sub.Recv()
108+
if err != nil {
109+
return err
110+
}
111+
metricsWsEventsTotal.Inc()
112+
113+
// Decode update.
114+
if update.Value.Account.Owner != c.Opts.ProgramKey {
115+
return nil
116+
}
117+
accountData := update.Value.Account.Data.GetBinary()
118+
if PeekAccount(accountData) != AccountTypePrice {
119+
return nil
120+
}
121+
priceAcc := new(PriceAccount)
122+
if err := priceAcc.UnmarshalBinary(accountData); err != nil {
123+
c.Log.Warn("Failed to unmarshal priceAcc account", zap.Error(err))
124+
return nil
125+
}
126+
127+
// Send update to channel.
128+
msg := PriceAccountUpdate{
129+
Slot: update.Context.Slot,
130+
PriceAccount: priceAcc,
131+
}
132+
select {
133+
case <-ctx.Done():
134+
return ctx.Err()
135+
case updates <- msg:
136+
return nil
137+
}
138+
}

products.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package pyth
2+
3+
import (
4+
"context"
5+
6+
"github.com/gagliardetto/solana-go"
7+
)
8+
9+
// GetProductAccount retrieves a product account from the blockchain.
10+
func (c *Client) GetProductAccount(ctx context.Context, productKey solana.PublicKey) (*ProductAccount, error) {
11+
accountInfo, err := c.RPC.GetAccountInfo(ctx, productKey)
12+
if err != nil {
13+
return nil, err
14+
}
15+
accountData := accountInfo.Value.Data.GetBinary()
16+
17+
product := new(ProductAccount)
18+
if err := product.UnmarshalBinary(accountData); err != nil {
19+
return nil, err
20+
}
21+
return product, nil
22+
}

0 commit comments

Comments
 (0)