Skip to content

Commit 624a7a2

Browse files
committed
WIP: AI step
1 parent 4f878d9 commit 624a7a2

File tree

4 files changed

+98
-29
lines changed

4 files changed

+98
-29
lines changed

coordinator/conf/config_proxy.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
{
22
"proxy_manager": {
33
"proxy_cli": {
4-
"proxy_name": "proxy_name"
4+
"proxy_name": "proxy_name",
5+
"secret": "client private key"
56
},
67
"auth": {
78
"secret": "proxy secret key",

coordinator/internal/config/proxy_config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ type ProxyManager struct {
1717
}
1818

1919
func (m *ProxyManager) Normalize() {
20-
if m.Client.Auth == nil {
21-
m.Client.Auth = m.Auth
20+
if m.Client.Secret == "" {
21+
m.Client.Secret = m.Auth.Secret
2222
}
2323

2424
if m.Client.ProxyVersion == "" {
@@ -30,7 +30,7 @@ func (m *ProxyManager) Normalize() {
3030
type ProxyClient struct {
3131
ProxyName string `json:"proxy_name"`
3232
ProxyVersion string `json:"proxy_version,omitempty"`
33-
Auth *Auth `json:"auth,omitempty"`
33+
Secret string `json:"secret,omitempty"`
3434
}
3535

3636
// Coordinator configuration

coordinator/internal/controller/proxy/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616

1717
type ClientHelper interface {
1818
GenLoginParam(string) (*types.LoginParameter, error)
19-
OnError(isUnauth bool)
19+
OnResp(*upClient, *http.Response)
2020
}
2121

2222
// Client wraps an http client with a preset host for coordinator API calls

coordinator/internal/controller/proxy/client_manager.go

Lines changed: 92 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,20 @@ import (
44
"context"
55
"crypto/ecdsa"
66
"fmt"
7+
"net/http"
78
"sync"
9+
"time"
810

9-
"github.com/gin-gonic/gin"
1011
"github.com/scroll-tech/go-ethereum/common"
1112
"github.com/scroll-tech/go-ethereum/crypto"
13+
"github.com/scroll-tech/go-ethereum/log"
1214

1315
"scroll-tech/coordinator/internal/config"
1416
"scroll-tech/coordinator/internal/types"
1517
)
1618

1719
type Client interface {
18-
Client(*gin.Context) *upClient
20+
Client(context.Context) *upClient
1921
}
2022

2123
type ClientManager struct {
@@ -25,9 +27,9 @@ type ClientManager struct {
2527

2628
cachedCli struct {
2729
sync.RWMutex
28-
cli *upClient
29-
completionCtx context.Context
30-
completionDone context.CancelFunc
30+
cli *upClient
31+
completionCtx context.Context
32+
resultChan chan *upClient
3133
}
3234
}
3335

@@ -52,7 +54,7 @@ func buildPrivateKey(inputBytes []byte) (*ecdsa.PrivateKey, error) {
5254

5355
func NewClientManager(cliCfg *config.ProxyClient, cfg *config.UpStream) (*ClientManager, error) {
5456

55-
privKey, err := buildPrivateKey([]byte(cliCfg.Auth.Secret))
57+
privKey, err := buildPrivateKey([]byte(cliCfg.Secret))
5658
if err != nil {
5759
return nil, err
5860
}
@@ -64,13 +66,35 @@ func NewClientManager(cliCfg *config.ProxyClient, cfg *config.UpStream) (*Client
6466
}, nil
6567
}
6668

67-
func (cliMgr *ClientManager) doLogin() *upClient {
68-
loginCli := newUpClient(cliMgr.cfg, cliMgr)
69+
func (cliMgr *ClientManager) doLogin(ctx context.Context, loginCli *upClient) time.Time {
70+
// Calculate wait time between 2 seconds and cfg.RetryWaitTime
71+
minWait := 2 * time.Second
72+
waitDuration := time.Duration(cliMgr.cfg.RetryWaitTime) * time.Second
73+
if waitDuration < minWait {
74+
waitDuration = minWait
75+
}
6976

70-
return loginCli
77+
for {
78+
log.Info("attempting login to upstream coordinator", "baseURL", cliMgr.cfg.BaseUrl)
79+
loginResult, err := loginCli.Login(ctx)
80+
if err == nil && loginResult != nil {
81+
log.Info("login to upstream coordinator successful", "baseURL", cliMgr.cfg.BaseUrl, "time", loginResult.Time)
82+
return loginResult.Time
83+
}
84+
log.Info("login to upstream coordinator failed, retrying", "baseURL", cliMgr.cfg.BaseUrl, "error", err, "waitDuration", waitDuration)
85+
86+
timer := time.NewTimer(waitDuration)
87+
select {
88+
case <-ctx.Done():
89+
timer.Stop()
90+
return time.Now()
91+
case <-timer.C:
92+
// Continue to next retry
93+
}
94+
}
7195
}
7296

73-
func (cliMgr *ClientManager) Client(ctx *gin.Context) *upClient {
97+
func (cliMgr *ClientManager) Client(ctx context.Context) *upClient {
7498
cliMgr.cachedCli.RLock()
7599
if cliMgr.cachedCli.cli != nil {
76100
defer cliMgr.cachedCli.RUnlock()
@@ -91,21 +115,54 @@ func (cliMgr *ClientManager) Client(ctx *gin.Context) *upClient {
91115
} else {
92116
// Set new completion context and launch login goroutine
93117
ctx, completionDone := context.WithCancel(context.TODO())
94-
cliMgr.cachedCli.completionCtx = ctx
118+
loginCli := newUpClient(cliMgr.cfg, cliMgr)
119+
cliMgr.cachedCli.completionCtx = context.WithValue(ctx, "cli", loginCli)
95120

96121
// Launch login goroutine
97122
go func() {
98123
defer completionDone()
124+
expiredT := cliMgr.doLogin(context.Background(), loginCli)
125+
126+
cliMgr.cachedCli.Lock()
127+
cliMgr.cachedCli.cli = loginCli
128+
cliMgr.cachedCli.completionCtx = nil
129+
130+
// Launch waiting thread to clear cached client before expiration
131+
go func() {
132+
now := time.Now()
133+
clearTime := expiredT.Add(-10 * time.Second) // 10s before expiration
134+
135+
// If clear time is too soon (less than 10s from now), set it to 10s from now
136+
if clearTime.Before(now.Add(10 * time.Second)) {
137+
clearTime = now.Add(10 * time.Second)
138+
log.Error("token expiration time is too close, delaying clear time",
139+
"baseURL", cliMgr.cfg.BaseUrl,
140+
"expiredT", expiredT,
141+
"adjustedClearTime", clearTime)
142+
}
143+
144+
waitDuration := time.Until(clearTime)
145+
log.Info("token expiration monitor started",
146+
"baseURL", cliMgr.cfg.BaseUrl,
147+
"expiredT", expiredT,
148+
"clearTime", clearTime,
149+
"waitDuration", waitDuration)
150+
151+
timer := time.NewTimer(waitDuration)
152+
select {
153+
case <-ctx.Done():
154+
timer.Stop()
155+
log.Info("token expiration monitor cancelled", "baseURL", cliMgr.cfg.BaseUrl)
156+
case <-timer.C:
157+
log.Info("clearing cached client before token expiration",
158+
"baseURL", cliMgr.cfg.BaseUrl,
159+
"expiredT", expiredT)
160+
cliMgr.clearCachedCli(loginCli)
161+
}
162+
}()
163+
164+
cliMgr.cachedCli.Unlock()
99165

100-
loginCli := cliMgr.doLogin()
101-
if loginResult, err := loginCli.Login(context.Background()); err == nil {
102-
loginCli.loginToken = loginResult.Token
103-
104-
cliMgr.cachedCli.Lock()
105-
cliMgr.cachedCli.cli = loginCli
106-
cliMgr.cachedCli.completionCtx = nil
107-
cliMgr.cachedCli.Unlock()
108-
}
109166
}()
110167
}
111168
cliMgr.cachedCli.Unlock()
@@ -115,15 +172,26 @@ func (cliMgr *ClientManager) Client(ctx *gin.Context) *upClient {
115172
case <-ctx.Done():
116173
return nil
117174
case <-completionCtx.Done():
118-
cliMgr.cachedCli.Lock()
119-
cli := cliMgr.cachedCli.cli
120-
cliMgr.cachedCli.Unlock()
175+
cli := completionCtx.Value("cli").(*upClient)
121176
return cli
122177
}
123178
}
124179

125-
func (cliMgr *ClientManager) OnError(isUnauth bool) {
180+
func (cliMgr *ClientManager) clearCachedCli(cli *upClient) {
181+
cliMgr.cachedCli.Lock()
182+
if cliMgr.cachedCli.cli == cli {
183+
cliMgr.cachedCli.cli = nil
184+
cliMgr.cachedCli.completionCtx = nil
185+
log.Info("cached client cleared due to forbidden response", "baseURL", cliMgr.cfg.BaseUrl)
186+
}
187+
cliMgr.cachedCli.Unlock()
188+
}
126189

190+
func (cliMgr *ClientManager) OnResp(cli *upClient, resp *http.Response) {
191+
if resp.StatusCode == http.StatusForbidden {
192+
log.Info("cached client cleared due to forbidden response", "baseURL", cliMgr.cfg.BaseUrl)
193+
cliMgr.clearCachedCli(cli)
194+
}
127195
}
128196

129197
func (cliMgr *ClientManager) GenLoginParam(challenge string) (*types.LoginParameter, error) {

0 commit comments

Comments
 (0)