From 8dc4a0c4a06861cb7e88cd3903dcbd2d0b73e575 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Thu, 6 Nov 2025 15:54:10 -0300 Subject: [PATCH] chainrpc: retry notifier RPCs during startup lag lnd v0.20.0-rc3 delays ChainNotifier startup which causes Loop to hit "chain notifier RPC is still in the process of starting" during initial subscriptions (LND commit c6f458e478f9ef2cf1d394972bfbc512862c6707). Add a shared retry helper in lndclient so block epoch, confirmation and spend registrations transparently retry until the sub-server is ready, along with regression tests covering the behavior. With lnd PR https://github.com/lightningnetwork/lnd/pull/10352 the server now returns gRPC codes.Unavailable instead of codes.Unknown, so the helper accepts either signal (status code or a string). --- chainnotifier_client.go | 145 ++++++++++++++++++--- chainnotifier_client_test.go | 242 +++++++++++++++++++++++++++++++++++ 2 files changed, 372 insertions(+), 15 deletions(-) create mode 100644 chainnotifier_client_test.go diff --git a/chainnotifier_client.go b/chainnotifier_client.go index dc92f3e..21fc124 100644 --- a/chainnotifier_client.go +++ b/chainnotifier_client.go @@ -3,6 +3,7 @@ package lndclient import ( "context" "fmt" + "strings" "sync" "time" @@ -11,6 +12,8 @@ import ( "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/lnrpc/chainrpc" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // NotifierOptions is a set of functional options that allow callers to further @@ -38,6 +41,19 @@ func DefaultNotifierOptions() *NotifierOptions { // events received from the notifier. type NotifierOption func(*NotifierOptions) +const ( + // chainNotifierStartupMessage matches the error string returned by lnd + // v0.20.0-rc3+ when a ChainNotifier RPC is invoked before the + // sub-server finishes initialization. 
+ chainNotifierStartupMessage = "chain notifier RPC is still in the " + + "process of starting" + + // chainNotifierRetryBackoff defines the delay between successive + // subscription attempts while waiting for the ChainNotifier sub-server + // to become operational. + chainNotifierRetryBackoff = 500 * time.Millisecond +) + // WithIncludeBlock is an optional argument that allows the caller to specify // that the block that mined a transaction should be included in the response. func WithIncludeBlock() NotifierOption { @@ -133,11 +149,23 @@ func (s *chainNotifierClient) RegisterSpendNtfn(ctx context.Context, } } - macaroonAuth := s.chainMac.WithMacaroonAuth(ctx) - resp, err := s.client.RegisterSpendNtfn(macaroonAuth, &chainrpc.SpendRequest{ - HeightHint: uint32(heightHint), - Outpoint: rpcOutpoint, - Script: pkScript, + var ( + resp chainrpc.ChainNotifier_RegisterSpendNtfnClient + err error + ) + + // lnd v0.20.0-rc3 changed the startup ordering so the ChainNotifier + // sub-server can report "still starting" for a short window. Retry the + // registration in that case to avoid aborting clients that subscribe + // immediately at startup. 
+ err = s.retryChainNotifierCall(ctx, func() error { + macaroonAuth := s.chainMac.WithMacaroonAuth(ctx) + resp, err = s.client.RegisterSpendNtfn(macaroonAuth, &chainrpc.SpendRequest{ + HeightHint: uint32(heightHint), + Outpoint: rpcOutpoint, + Script: pkScript, + }) + return err }) if err != nil { return nil, nil, err @@ -251,15 +279,25 @@ func (s *chainNotifierClient) RegisterConfirmationsNtfn(ctx context.Context, if txid != nil { txidSlice = txid[:] } - confStream, err := s.client.RegisterConfirmationsNtfn( - s.chainMac.WithMacaroonAuth(ctx), &chainrpc.ConfRequest{ - Script: pkScript, - NumConfs: uint32(numConfs), - HeightHint: uint32(heightHint), - Txid: txidSlice, - IncludeBlock: opts.IncludeBlock, - }, + var ( + confStream chainrpc.ChainNotifier_RegisterConfirmationsNtfnClient + err error ) + // The confirmation RPC is also subject to the post-v0.20.0-rc3 startup + // ordering change, so we retry here until lnd reports the sub-server + // ready. + err = s.retryChainNotifierCall(ctx, func() error { + confStream, err = s.client.RegisterConfirmationsNtfn( + s.chainMac.WithMacaroonAuth(ctx), &chainrpc.ConfRequest{ + Script: pkScript, + NumConfs: uint32(numConfs), + HeightHint: uint32(heightHint), + Txid: txidSlice, + IncludeBlock: opts.IncludeBlock, + }, + ) + return err + }) if err != nil { return nil, nil, err } @@ -362,9 +400,18 @@ func (s *chainNotifierClient) RegisterConfirmationsNtfn(ctx context.Context, func (s *chainNotifierClient) RegisterBlockEpochNtfn(ctx context.Context) ( chan int32, chan error, error) { - blockEpochClient, err := s.client.RegisterBlockEpochNtfn( - s.chainMac.WithMacaroonAuth(ctx), &chainrpc.BlockEpoch{}, + var ( + blockEpochClient chainrpc.ChainNotifier_RegisterBlockEpochNtfnClient + err error ) + // Block epoch subscriptions similarly need to survive the "still + // starting" period introduced in lnd v0.20.0-rc3. 
+ err = s.retryChainNotifierCall(ctx, func() error { + blockEpochClient, err = s.client.RegisterBlockEpochNtfn( + s.chainMac.WithMacaroonAuth(ctx), &chainrpc.BlockEpoch{}, + ) + return err + }) if err != nil { return nil, nil, err } @@ -393,3 +440,71 @@ func (s *chainNotifierClient) RegisterBlockEpochNtfn(ctx context.Context) ( return blockEpochChan, blockErrorChan, nil } + +// retryChainNotifierCall executes the passed RPC invocation, retrying while +// lnd reports that the ChainNotifier sub-server is still initialising. +// +// Prior to v0.20.0-rc3 the ChainNotifier sub-server finished initialization +// before dependent services started, so a single RPC attempt succeeded. From +// rc3 (LND commit c6f458e478f9ef2cf1d394972bfbc512862c6707) onwards lnd starts +// the notifier later in the daemon lifecycle to avoid rescans from stale +// heights. During the brief gap between client connection and notifier +// readiness lnd returns the string "chain notifier RPC is still in the process +// of starting" wrapped in an Unknown gRPC status. Clients that interact with +// lnd immediately after it connects - such as Loop during integration testing - +// would previously treat that error as fatal and abort startup, even though +// retrying shortly after would succeed. +// +// This helper centralises the retry policy: on the "still starting" error or +// any gRPC Unavailable status we back off briefly and reissue the RPC. Other +// errors are returned to the caller unchanged, and the caller's context +// controls the overall deadline so shutdown conditions are respected. 
+func (s *chainNotifierClient) retryChainNotifierCall(ctx context.Context, + call func() error) error { + + for { + err := call() + if err == nil { + return nil + } + + if !isChainNotifierStartingErr(err) { + return err + } + + log.Warnf("Chain notifier RPC not ready yet, retrying: %v", err) + + select { + case <-time.After(chainNotifierRetryBackoff): + continue + + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// isChainNotifierStartingErr reports whether err is due to the lnd +// ChainNotifier sub-server still starting up. Starting with lnd v0.20.0-rc3 +// the notifier is initialised later in the daemon lifecycle, and the RPC layer +// surfaces this as an Unknown gRPC status that contains the message defined in +// chainNotifierStartupMessage. There is a PR in LND to return code Unavailable +// instead of Unknown: https://github.com/lightningnetwork/lnd/pull/10352 +func isChainNotifierStartingErr(err error) bool { + if err == nil { + return false + } + + // gRPC code Unavailable means "the server can't handle this request + // now, retry later". LND's chain notifier returns this error when + // the server is starting. + // See https://github.com/lightningnetwork/lnd/pull/10352 + st, ok := status.FromError(err) + if ok && st.Code() == codes.Unavailable { + return true + } + + // TODO(ln-v0.20.0) remove the string fallback once lndclient depends on + // a version of lnd that returns codes.Unavailable for this condition. 
+ return strings.Contains(err.Error(), chainNotifierStartupMessage) +} diff --git a/chainnotifier_client_test.go b/chainnotifier_client_test.go new file mode 100644 index 0000000..ad7bed9 --- /dev/null +++ b/chainnotifier_client_test.go @@ -0,0 +1,242 @@ +package lndclient + +import ( + "context" + "testing" + "time" + + "github.com/lightningnetwork/lnd/lnrpc/chainrpc" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +// stubChainNotifierClient implements chainrpc.ChainNotifierClient with retry +// aware behavior for testing. +type stubChainNotifierClient struct { + chainrpc.ChainNotifierClient + + blockAttempts int + blockSucceedAfter int + confirmAttempts int + confirmSucceedAfter int + spendAttempts int + spendSucceedAfter int +} + +func (s *stubChainNotifierClient) RegisterBlockEpochNtfn(ctx context.Context, + _ *chainrpc.BlockEpoch, _ ...grpc.CallOption) ( + chainrpc.ChainNotifier_RegisterBlockEpochNtfnClient, error) { + + s.blockAttempts++ + if s.blockAttempts <= s.blockSucceedAfter { + return nil, status.Error( + codes.Unknown, chainNotifierStartupMessage, + ) + } + + return &stubBlockEpochStream{ + stubClientStream: newStubClientStream(ctx), + }, nil +} + +func (s *stubChainNotifierClient) RegisterConfirmationsNtfn(ctx context.Context, + _ *chainrpc.ConfRequest, _ ...grpc.CallOption) ( + chainrpc.ChainNotifier_RegisterConfirmationsNtfnClient, error) { + + s.confirmAttempts++ + if s.confirmAttempts <= s.confirmSucceedAfter { + return nil, status.Error( + codes.Unknown, chainNotifierStartupMessage, + ) + } + + return &stubConfirmationsStream{ + stubClientStream: newStubClientStream(ctx), + }, nil +} + +func (s *stubChainNotifierClient) RegisterSpendNtfn(ctx context.Context, + _ *chainrpc.SpendRequest, _ ...grpc.CallOption) ( + chainrpc.ChainNotifier_RegisterSpendNtfnClient, error) { + + s.spendAttempts++ + if s.spendAttempts <= 
s.spendSucceedAfter { + return nil, status.Error( + codes.Unknown, chainNotifierStartupMessage, + ) + } + + return &stubSpendStream{ + stubClientStream: newStubClientStream(ctx), + }, nil +} + +// stubClientStream is a minimal grpc.ClientStream implementation that respects +// context cancellation. +type stubClientStream struct { + ctx context.Context +} + +func newStubClientStream(ctx context.Context) *stubClientStream { + return &stubClientStream{ctx: ctx} +} + +func (s *stubClientStream) Header() (metadata.MD, error) { + return nil, nil +} + +func (s *stubClientStream) Trailer() metadata.MD { + return nil +} + +func (s *stubClientStream) CloseSend() error { + return nil +} + +func (s *stubClientStream) Context() context.Context { + return s.ctx +} + +func (s *stubClientStream) SendMsg(interface{}) error { + return nil +} + +func (s *stubClientStream) RecvMsg(interface{}) error { + <-s.ctx.Done() + + return s.ctx.Err() +} + +type stubBlockEpochStream struct { + *stubClientStream +} + +func (s *stubBlockEpochStream) Recv() (*chainrpc.BlockEpoch, error) { + <-s.Context().Done() + + return nil, s.Context().Err() +} + +type stubConfirmationsStream struct { + *stubClientStream +} + +func (s *stubConfirmationsStream) Recv() (*chainrpc.ConfEvent, error) { + <-s.Context().Done() + + return nil, s.Context().Err() +} + +type stubSpendStream struct { + *stubClientStream +} + +func (s *stubSpendStream) Recv() (*chainrpc.SpendEvent, error) { + <-s.Context().Done() + + return nil, s.Context().Err() +} + +// TestRegisterBlockEpochNtfnRetries ensures block epoch subscriptions retry +// until the notifier RPC is ready. 
+func TestRegisterBlockEpochNtfnRetries(t *testing.T) { + t.Parallel() + + stub := &stubChainNotifierClient{ + blockSucceedAfter: 1, + } + + client := &chainNotifierClient{ + client: stub, + chainMac: serializedMacaroon("test"), + timeout: time.Second, + } + + ctx, cancel := context.WithCancel(context.Background()) + blockChan, errChan, err := client.RegisterBlockEpochNtfn(ctx) + require.NoError(t, err) + require.NotNil(t, blockChan) + require.NotNil(t, errChan) + require.Equal(t, 2, stub.blockAttempts) + + cancel() +} + +// TestRegisterConfirmationsNtfnRetries ensures confirmation subscriptions retry +// until lnd exposes the notifier RPC. +func TestRegisterConfirmationsNtfnRetries(t *testing.T) { + t.Parallel() + + stub := &stubChainNotifierClient{ + confirmSucceedAfter: 2, + } + + client := &chainNotifierClient{ + client: stub, + chainMac: serializedMacaroon("test"), + timeout: time.Second, + } + + ctx, cancel := context.WithCancel(context.Background()) + confChan, errChan, err := client.RegisterConfirmationsNtfn( + ctx, nil, nil, 1, 0, + ) + require.NoError(t, err) + require.NotNil(t, confChan) + require.NotNil(t, errChan) + require.Equal(t, 3, stub.confirmAttempts) + + cancel() +} + +// TestRegisterSpendNtfnRetries ensures spend subscriptions retry until the +// notifier RPC is ready. +func TestRegisterSpendNtfnRetries(t *testing.T) { + t.Parallel() + + stub := &stubChainNotifierClient{ + spendSucceedAfter: 1, + } + + client := &chainNotifierClient{ + client: stub, + chainMac: serializedMacaroon("test"), + timeout: time.Second, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + spendChan, errChan, err := client.RegisterSpendNtfn(ctx, nil, nil, 0) + require.NoError(t, err) + require.NotNil(t, spendChan) + require.NotNil(t, errChan) + require.Equal(t, 2, stub.spendAttempts) +} + +// TestIsChainNotifierStartingErr ensures we correctly detect the startup lag +// error returned by lnd v0.20.0-rc3+. 
+func TestIsChainNotifierStartingErr(t *testing.T) { + t.Parallel() + + require.True(t, isChainNotifierStartingErr( + status.Error(codes.Unavailable, chainNotifierStartupMessage), + )) + + require.True(t, isChainNotifierStartingErr( + status.Error(codes.Unknown, chainNotifierStartupMessage), + )) + + require.True(t, isChainNotifierStartingErr( + status.Error(codes.Unavailable, "some other error"), + )) + + require.False(t, isChainNotifierStartingErr(nil)) + + require.False(t, isChainNotifierStartingErr( + status.Error(codes.Unknown, "some other error"), + )) +}