Skip to content

Commit d730878

Browse files
committed
* Refactored internal client balancer: added singleton for getting gRPC-connection (auto dial and auto reconnect on non-ready state) for use in discovery attempts
1 parent 55b0abc commit d730878

File tree

3 files changed

+103
-43
lines changed

3 files changed

+103
-43
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Refactored internal client balancer: added singleton for getting gRPC-connection (auto dial and auto reconnect on non-ready state) for use in discovery attempts
12
* Added `topicoptions.IncludePartitionStats()` for `Topic().Describe()` in order to get partition stats from server
23

34
## v3.106.1

internal/balancer/balancer.go

Lines changed: 99 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/ydb-platform/ydb-go-genproto/Ydb_Discovery_V1"
1010
"google.golang.org/grpc"
11+
"google.golang.org/grpc/connectivity"
1112

1213
"github.com/ydb-platform/ydb-go-sdk/v3/config"
1314
"github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff"
@@ -40,7 +41,10 @@ type Balancer struct {
4041
pool *conn.Pool
4142
discoveryRepeater repeater.Repeater
4243

43-
discover func(ctx context.Context) (endpoints []endpoint.Endpoint, location string, err error)
44+
address string
45+
cc atomic.Pointer[grpc.ClientConn]
46+
47+
discover func(context.Context, *grpc.ClientConn) (endpoints []endpoint.Endpoint, location string, err error)
4448
localDCDetector func(ctx context.Context, endpoints []endpoint.Endpoint) (string, error)
4549

4650
connectionsState atomic.Pointer[connectionsState]
@@ -51,7 +55,7 @@ func (b *Balancer) clusterDiscovery(ctx context.Context) (err error) {
5155
return retry.Retry(
5256
repeater.WithEvent(ctx, repeater.EventInit),
5357
func(childCtx context.Context) (err error) {
54-
if err = b.clusterDiscoveryAttempt(childCtx); err != nil {
58+
if err = b.clusterDiscoveryAttemptWithDial(childCtx); err != nil {
5559
if credentials.IsAccessError(err) {
5660
return credentials.AccessError("cluster discovery failed", err,
5761
credentials.WithEndpoint(b.driverConfig.Endpoint()),
@@ -75,29 +79,96 @@ func (b *Balancer) clusterDiscovery(ctx context.Context) (err error) {
7579
)
7680
}
7781

78-
func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
79-
var (
80-
address = "ydb:///" + b.driverConfig.Endpoint()
81-
onDone = trace.DriverOnBalancerClusterDiscoveryAttempt(
82-
b.driverConfig.Trace(), &ctx,
83-
stack.FunctionID(
84-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer.(*Balancer).clusterDiscoveryAttempt",
85-
),
86-
address,
87-
b.driverConfig.Database(),
82+
// conn is a singleton with auto reconnect logic
83+
func (b *Balancer) conn(ctx context.Context) (*grpc.ClientConn, error) {
84+
if cc := b.cc.Load(); cc != nil {
85+
if cc.GetState() == connectivity.Ready {
86+
return cc, nil
87+
}
88+
89+
if b.cc.CompareAndSwap(cc, nil) {
90+
cc.Close()
91+
}
92+
}
93+
94+
ctx, traceID, err := meta.TraceID(ctx)
95+
if err != nil {
96+
return nil, xerrors.WithStackTrace(err)
97+
}
98+
99+
ctx, err = b.driverConfig.Meta().Context(ctx)
100+
if err != nil {
101+
return nil, xerrors.WithStackTrace(
102+
fmt.Errorf("failed to enrich context with meta, traceID %q: %w", traceID, err),
88103
)
89-
)
90-
defer func() {
91-
onDone(err)
92-
}()
104+
}
93105

94106
if dialTimeout := b.driverConfig.DialTimeout(); dialTimeout > 0 {
95107
var cancel context.CancelFunc
96108
ctx, cancel = xcontext.WithTimeout(ctx, dialTimeout)
97109
defer cancel()
98110
}
99111

100-
endpoints, location, err := b.discover(ctx)
112+
//nolint:staticcheck,nolintlint
113+
cc, err := grpc.DialContext(ctx, b.address,
114+
append(
115+
b.driverConfig.GrpcDialOptions(),
116+
grpc.WithResolvers(
117+
xresolver.New("ydb", b.driverConfig.Trace()),
118+
),
119+
grpc.WithBlock(), //nolint:staticcheck,nolintlint
120+
)...,
121+
)
122+
if err != nil {
123+
return nil, xerrors.WithStackTrace(
124+
fmt.Errorf("failed to dial %q, traceID %q: %w", b.driverConfig.Endpoint(), traceID, err),
125+
)
126+
}
127+
128+
b.cc.Store(cc)
129+
130+
return cc, nil
131+
}
132+
133+
func (b *Balancer) clusterDiscoveryAttemptWithDial(ctx context.Context) (finalErr error) {
134+
onDone := trace.DriverOnBalancerClusterDiscoveryAttempt(
135+
b.driverConfig.Trace(), &ctx,
136+
stack.FunctionID(
137+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer.(*Balancer).clusterDiscoveryAttemptWithDial",
138+
),
139+
b.address,
140+
b.driverConfig.Database(),
141+
)
142+
defer func() {
143+
onDone(finalErr)
144+
}()
145+
146+
cc, err := b.conn(ctx)
147+
if err != nil {
148+
return xerrors.WithStackTrace(err)
149+
}
150+
151+
if err = b.clusterDiscoveryAttempt(ctx, cc); err != nil {
152+
return xerrors.WithStackTrace(err)
153+
}
154+
155+
return nil
156+
}
157+
158+
func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context, cc *grpc.ClientConn) (finalErr error) {
159+
onDone := trace.DriverOnBalancerClusterDiscoveryAttempt(
160+
b.driverConfig.Trace(), &ctx,
161+
stack.FunctionID(
162+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer.(*Balancer).clusterDiscoveryAttempt",
163+
),
164+
b.address,
165+
b.driverConfig.Database(),
166+
)
167+
defer func() {
168+
onDone(finalErr)
169+
}()
170+
171+
endpoints, location, err := b.discover(ctx, cc)
101172
if err != nil {
102173
return xerrors.WithStackTrace(err)
103174
}
@@ -171,51 +242,37 @@ func (b *Balancer) Close(ctx context.Context) (err error) {
171242
b.discoveryRepeater.Stop()
172243
}
173244

245+
if cc := b.cc.Load(); cc != nil {
246+
_ = cc.Close()
247+
}
248+
174249
return nil
175250
}
176251

177252
func makeDiscoveryFunc(
178253
driverConfig *config.Config, discoveryConfig *discoveryConfig.Config,
179-
) func(ctx context.Context) (endpoints []endpoint.Endpoint, location string, err error) {
180-
return func(ctx context.Context) (endpoints []endpoint.Endpoint, location string, err error) {
254+
) func(ctx context.Context, cc *grpc.ClientConn) (endpoints []endpoint.Endpoint, location string, err error) {
255+
return func(ctx context.Context, cc *grpc.ClientConn) (endpoints []endpoint.Endpoint, location string, err error) {
181256
ctx, traceID, err := meta.TraceID(ctx)
182-
if err != nil {
183-
return endpoints, location, xerrors.WithStackTrace(err)
184-
}
185-
186-
ctx, err = driverConfig.Meta().Context(ctx)
187257
if err != nil {
188258
return endpoints, location, xerrors.WithStackTrace(
189259
fmt.Errorf("failed to enrich context with meta, traceID %q: %w", traceID, err),
190260
)
191261
}
192262

193-
//nolint:staticcheck,nolintlint
194-
cc, err := grpc.DialContext(ctx,
195-
"ydb:///"+driverConfig.Endpoint(),
196-
append(
197-
driverConfig.GrpcDialOptions(),
198-
grpc.WithResolvers(
199-
xresolver.New("ydb", driverConfig.Trace()),
200-
),
201-
grpc.WithBlock(), //nolint:staticcheck,nolintlint
202-
)...,
203-
)
263+
ctx, err = driverConfig.Meta().Context(ctx)
204264
if err != nil {
205265
return endpoints, location, xerrors.WithStackTrace(
206-
fmt.Errorf("failed to dial %q, traceID %q: %w", driverConfig.Endpoint(), traceID, err),
266+
fmt.Errorf("failed to enrich context with meta, traceID %q: %w", traceID, err),
207267
)
208268
}
209-
defer func() {
210-
_ = cc.Close()
211-
}()
212269

213270
endpoints, location, err = internalDiscovery.Discover(ctx,
214271
Ydb_Discovery_V1.NewDiscoveryServiceClient(cc), discoveryConfig,
215272
)
216273
if err != nil {
217274
return endpoints, location, xerrors.WithStackTrace(
218-
fmt.Errorf("failed to discover database %q, address %q, traceID %q: %w",
275+
fmt.Errorf("failed to discover database %q (address %q, traceID %q): %w",
219276
driverConfig.Database(), driverConfig.Endpoint(), traceID, err,
220277
),
221278
)
@@ -239,6 +296,7 @@ func New(ctx context.Context, driverConfig *config.Config, pool *conn.Pool, opts
239296
b = &Balancer{
240297
driverConfig: driverConfig,
241298
pool: pool,
299+
address: "ydb:///" + driverConfig.Endpoint(),
242300
discoveryConfig: discoveryConfig.New(append(opts,
243301
discoveryConfig.With(driverConfig.Common),
244302
discoveryConfig.WithEndpoint(driverConfig.Endpoint()),
@@ -269,7 +327,7 @@ func New(ctx context.Context, driverConfig *config.Config, pool *conn.Pool, opts
269327
// run background discovering
270328
if d := b.discoveryConfig.Interval(); d > 0 {
271329
b.discoveryRepeater = repeater.New(xcontext.ValueOnly(ctx),
272-
d, b.clusterDiscoveryAttempt,
330+
d, b.clusterDiscoveryAttemptWithDial,
273331
repeater.WithName("discovery"),
274332
repeater.WithTrace(b.driverConfig.Trace()),
275333
)

internal/balancer/local_dc_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"testing"
77

88
"github.com/stretchr/testify/require"
9+
"google.golang.org/grpc"
910

1011
"github.com/ydb-platform/ydb-go-sdk/v3/balancers"
1112
"github.com/ydb-platform/ydb-go-sdk/v3/config"
@@ -137,7 +138,7 @@ func TestLocalDCDiscovery(t *testing.T) {
137138
driverConfig: cfg,
138139
balancerConfig: *cfg.Balancer(),
139140
pool: conn.NewPool(context.Background(), cfg),
140-
discover: func(ctx context.Context) (endpoints []endpoint.Endpoint, location string, err error) {
141+
discover: func(ctx context.Context, _ *grpc.ClientConn) (endpoints []endpoint.Endpoint, location string, err error) {
141142
return []endpoint.Endpoint{
142143
&mock.Endpoint{AddrField: "a:123", LocationField: "a"},
143144
&mock.Endpoint{AddrField: "b:234", LocationField: "b"},
@@ -149,7 +150,7 @@ func TestLocalDCDiscovery(t *testing.T) {
149150
},
150151
}
151152

152-
err := r.clusterDiscoveryAttempt(ctx)
153+
err := r.clusterDiscoveryAttempt(ctx, nil)
153154
require.NoError(t, err)
154155

155156
for i := 0; i < 100; i++ {

0 commit comments

Comments
 (0)