From 67cac5bf86b3a5f83a2c5f1a4f9af6a9da125993 Mon Sep 17 00:00:00 2001 From: ziggie Date: Mon, 20 Oct 2025 18:30:58 +0200 Subject: [PATCH 01/11] multi: thread context through DeletePayment --- payments/db/interface.go | 3 ++- payments/db/kv_store.go | 4 ++-- payments/db/payment_test.go | 36 ++++++++++++++++++++++++++---------- payments/db/sql_store.go | 4 +--- rpcserver.go | 2 +- 5 files changed, 32 insertions(+), 17 deletions(-) diff --git a/payments/db/interface.go b/payments/db/interface.go index 7fefad08917..caf222b7ae0 100644 --- a/payments/db/interface.go +++ b/payments/db/interface.go @@ -31,7 +31,8 @@ type PaymentReader interface { // database. type PaymentWriter interface { // DeletePayment deletes a payment from the DB given its payment hash. - DeletePayment(paymentHash lntypes.Hash, failedAttemptsOnly bool) error + DeletePayment(ctx context.Context, paymentHash lntypes.Hash, + failedAttemptsOnly bool) error // DeletePayments deletes all payments from the DB given the specified // flags. diff --git a/payments/db/kv_store.go b/payments/db/kv_store.go index 84946841b9b..138edb6fc49 100644 --- a/payments/db/kv_store.go +++ b/payments/db/kv_store.go @@ -295,7 +295,7 @@ func (p *KVStore) DeleteFailedAttempts(hash lntypes.Hash) error { // logic. This decision should be made in the application layer. if !p.keepFailedPaymentAttempts { const failedHtlcsOnly = true - err := p.DeletePayment(hash, failedHtlcsOnly) + err := p.DeletePayment(context.TODO(), hash, failedHtlcsOnly) if err != nil { return err } @@ -1275,7 +1275,7 @@ func fetchPaymentWithSequenceNumber(tx kvdb.RTx, paymentHash lntypes.Hash, // DeletePayment deletes a payment from the DB given its payment hash. If // failedHtlcsOnly is set, only failed HTLC attempts of the payment will be // deleted. -func (p *KVStore) DeletePayment(paymentHash lntypes.Hash, +func (p *KVStore) DeletePayment(_ context.Context, paymentHash lntypes.Hash, failedHtlcsOnly bool) error { return kvdb.Update(p.db, func(tx kvdb.RwTx) error { diff --git a/payments/db/payment_test.go b/payments/db/payment_test.go index ddba0e0285c..1180cf95374 100644 --- a/payments/db/payment_test.go +++ b/payments/db/payment_test.go @@ -511,8 +511,8 @@ func testDeleteFailedAttempts(t *testing.T, keepFailedPaymentAttempts bool) { // operation are performed in general therefore we do NOT expect an // error in this case. if keepFailedPaymentAttempts { - require.NoError( - t, paymentDB.DeleteFailedAttempts(payments[1].id), + require.NoError(t, paymentDB.DeleteFailedAttempts( + payments[1].id), ) } else { require.Error(t, paymentDB.DeleteFailedAttempts(payments[1].id)) @@ -656,6 +656,8 @@ func TestMPPRecordValidation(t *testing.T) { func TestDeleteSinglePayment(t *testing.T) { t.Parallel() + ctx := t.Context() + paymentDB, _ := NewTestDB(t) // Register four payments: @@ -687,7 +689,9 @@ func TestDeleteSinglePayment(t *testing.T) { assertDBPayments(t, paymentDB, payments) // Delete HTLC attempts for first payment only. - require.NoError(t, paymentDB.DeletePayment(payments[0].id, true)) + require.NoError(t, paymentDB.DeletePayment( + ctx, payments[0].id, true, + )) // The first payment is the only altered one as its failed HTLC should // have been removed but is still present as payment. @@ -695,19 +699,25 @@ func TestDeleteSinglePayment(t *testing.T) { assertDBPayments(t, paymentDB, payments) // Delete the first payment completely. - require.NoError(t, paymentDB.DeletePayment(payments[0].id, false)) + require.NoError(t, paymentDB.DeletePayment( + ctx, payments[0].id, false, + )) // The first payment should have been deleted. assertDBPayments(t, paymentDB, payments[1:]) // Now delete the second payment completely. - require.NoError(t, paymentDB.DeletePayment(payments[1].id, false)) + require.NoError(t, paymentDB.DeletePayment( + ctx, payments[1].id, false, + )) // The Second payment should have been deleted. assertDBPayments(t, paymentDB, payments[2:]) // Delete failed HTLC attempts for the third payment. - require.NoError(t, paymentDB.DeletePayment(payments[2].id, true)) + require.NoError(t, paymentDB.DeletePayment( + ctx, payments[2].id, true, + )) // Only the successful HTLC attempt should be left for the third // payment. @@ -715,21 +725,27 @@ func TestDeleteSinglePayment(t *testing.T) { assertDBPayments(t, paymentDB, payments[2:]) // Now delete the third payment completely. - require.NoError(t, paymentDB.DeletePayment(payments[2].id, false)) + require.NoError(t, paymentDB.DeletePayment( + ctx, payments[2].id, false, + )) // Only the last payment should be left. assertDBPayments(t, paymentDB, payments[3:]) // Deleting HTLC attempts from InFlight payments should not work and an // error returned. - require.Error(t, paymentDB.DeletePayment(payments[3].id, true)) + require.Error(t, paymentDB.DeletePayment( + ctx, payments[3].id, true, + )) // The payment is InFlight and therefore should not have been altered. assertDBPayments(t, paymentDB, payments[3:]) // Finally deleting the InFlight payment should also not work and an // error returned. - require.Error(t, paymentDB.DeletePayment(payments[3].id, false)) + require.Error(t, paymentDB.DeletePayment( + ctx, payments[3].id, false, + )) // The payment is InFlight and therefore should not have been altered. assertDBPayments(t, paymentDB, payments[3:]) @@ -2597,7 +2613,7 @@ func TestQueryPayments(t *testing.T) { // We delete the whole payment. err = paymentDB.DeletePayment( - paymentInfos[1].PaymentIdentifier, false, + ctx, paymentInfos[1].PaymentIdentifier, false, ) require.NoError(t, err) diff --git a/payments/db/sql_store.go b/payments/db/sql_store.go index 0109ca1afa1..f6f5d0d37ad 100644 --- a/payments/db/sql_store.go +++ b/payments/db/sql_store.go @@ -1206,11 +1206,9 @@ func computePaymentStatusFromDB(ctx context.Context, cfg *sqldb.QueryConfig, // // This method is part of the PaymentWriter interface, which is embedded in // the DB interface. -func (s *SQLStore) DeletePayment(paymentHash lntypes.Hash, +func (s *SQLStore) DeletePayment(ctx context.Context, paymentHash lntypes.Hash, failedHtlcsOnly bool) error { - ctx := context.TODO() - err := s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error { dbPayment, err := fetchPaymentByHash(ctx, db, paymentHash) if err != nil { diff --git a/rpcserver.go b/rpcserver.go index ee810d1e1ef..39f22a86c62 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -7678,7 +7678,7 @@ func (r *rpcServer) DeletePayment(ctx context.Context, rpcsLog.Infof("[DeletePayment] payment_identifier=%v, "+ "failed_htlcs_only=%v", hash, req.FailedHtlcsOnly) - err = r.server.paymentsDB.DeletePayment(hash, req.FailedHtlcsOnly) + err = r.server.paymentsDB.DeletePayment(ctx, hash, req.FailedHtlcsOnly) if err != nil { return nil, err } From f2b5035ea7cb761f5d3f0936a73a003d028a82fb Mon Sep 17 00:00:00 2001 From: ziggie Date: Mon, 20 Oct 2025 19:04:14 +0200 Subject: [PATCH 02/11] multi: thread context through DeletePayments --- payments/db/interface.go | 3 ++- payments/db/kv_store.go | 2 +- payments/db/kv_store_test.go | 6 ++++-- payments/db/payment_test.go | 10 ++++++---- payments/db/sql_store.go | 10 +++++----- rpcserver.go | 2 +- 6 files changed, 19 insertions(+), 14 deletions(-) diff --git a/payments/db/interface.go b/payments/db/interface.go index caf222b7ae0..c6f1bf35312 100644 --- a/payments/db/interface.go +++ b/payments/db/interface.go @@ -36,7 +36,8 @@ type PaymentWriter interface { // DeletePayments deletes all payments from the DB given the specified // flags. - DeletePayments(failedOnly, failedAttemptsOnly bool) (int, error) + DeletePayments(ctx context.Context, failedOnly, + failedAttemptsOnly bool) (int, error) PaymentControl } diff --git a/payments/db/kv_store.go b/payments/db/kv_store.go index 138edb6fc49..d3d347afe97 100644 --- a/payments/db/kv_store.go +++ b/payments/db/kv_store.go @@ -1372,7 +1372,7 @@ func (p *KVStore) DeletePayment(_ context.Context, paymentHash lntypes.Hash, // failedHtlcsOnly is set, the payment itself won't be deleted, only failed HTLC // attempts. The method returns the number of deleted payments, which is always // 0 if failedHtlcsOnly is set. -func (p *KVStore) DeletePayments(failedOnly, +func (p *KVStore) DeletePayments(_ context.Context, failedOnly, failedHtlcsOnly bool) (int, error) { var numPayments int diff --git a/payments/db/kv_store_test.go b/payments/db/kv_store_test.go index f0c2b148fd0..cf5a9a9a001 100644 --- a/payments/db/kv_store_test.go +++ b/payments/db/kv_store_test.go @@ -30,6 +30,8 @@ import ( func TestKVStoreDeleteNonInFlight(t *testing.T) { t.Parallel() + ctx := t.Context() + paymentDB := NewKVTestDB(t) // Create a sequence number for duplicate payments that will not collide @@ -180,7 +182,7 @@ func TestKVStoreDeleteNonInFlight(t *testing.T) { } // Delete all failed payments. - numPayments, err := paymentDB.DeletePayments(true, false) + numPayments, err := paymentDB.DeletePayments(ctx, true, false) require.NoError(t, err) require.EqualValues(t, 1, numPayments) @@ -216,7 +218,7 @@ func TestKVStoreDeleteNonInFlight(t *testing.T) { } // Now delete all payments except in-flight. - numPayments, err = paymentDB.DeletePayments(false, false) + numPayments, err = paymentDB.DeletePayments(ctx, false, false) require.NoError(t, err) require.EqualValues(t, 2, numPayments) diff --git a/payments/db/payment_test.go b/payments/db/payment_test.go index 1180cf95374..4a9a69b0ce6 100644 --- a/payments/db/payment_test.go +++ b/payments/db/payment_test.go @@ -1658,6 +1658,8 @@ func TestFailsWithoutInFlight(t *testing.T) { func TestDeletePayments(t *testing.T) { t.Parallel() + ctx := t.Context() + paymentDB, _ := NewTestDB(t) // Register three payments: @@ -1678,7 +1680,7 @@ func TestDeletePayments(t *testing.T) { assertDBPayments(t, paymentDB, payments) // Delete HTLC attempts for failed payments only. - numPayments, err := paymentDB.DeletePayments(true, true) + numPayments, err := paymentDB.DeletePayments(ctx, true, true) require.NoError(t, err) require.EqualValues(t, 0, numPayments) @@ -1687,7 +1689,7 @@ func TestDeletePayments(t *testing.T) { assertDBPayments(t, paymentDB, payments) // Delete failed attempts for all payments. - numPayments, err = paymentDB.DeletePayments(false, true) + numPayments, err = paymentDB.DeletePayments(ctx, false, true) require.NoError(t, err) require.EqualValues(t, 0, numPayments) @@ -1697,14 +1699,14 @@ func TestDeletePayments(t *testing.T) { assertDBPayments(t, paymentDB, payments) // Now delete all failed payments. - numPayments, err = paymentDB.DeletePayments(true, false) + numPayments, err = paymentDB.DeletePayments(ctx, true, false) require.NoError(t, err) require.EqualValues(t, 1, numPayments) assertDBPayments(t, paymentDB, payments[1:]) // Finally delete all completed payments. - numPayments, err = paymentDB.DeletePayments(false, false) + numPayments, err = paymentDB.DeletePayments(ctx, false, false) require.NoError(t, err) require.EqualValues(t, 1, numPayments) diff --git a/payments/db/sql_store.go b/payments/db/sql_store.go index f6f5d0d37ad..ef0d96c93cd 100644 --- a/payments/db/sql_store.go +++ b/payments/db/sql_store.go @@ -1874,13 +1874,13 @@ func (s *SQLStore) Fail(paymentHash lntypes.Hash, // This method is part of the PaymentWriter interface, which is embedded in // the DB interface. // -// TODO(ziggie): batch this call instead in the background so for dbs with -// many payments it doesn't block the main thread. -func (s *SQLStore) DeletePayments(failedOnly, failedHtlcsOnly bool) (int, - error) { +// TODO(ziggie): batch and use iterator instead, moreover we dont need to fetch +// the complete payment data for each payment, we can just fetch the payment ID +// and the resolution types to decide if the payment is removable. +func (s *SQLStore) DeletePayments(ctx context.Context, failedOnly, + failedHtlcsOnly bool) (int, error) { var numPayments int - ctx := context.TODO() extractCursor := func(row sqlc.FilterPaymentsRow) int64 { return row.Payment.ID diff --git a/rpcserver.go b/rpcserver.go index 39f22a86c62..e75e0dcd15a 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -7719,7 +7719,7 @@ func (r *rpcServer) DeleteAllPayments(ctx context.Context, req.FailedHtlcsOnly) numDeletedPayments, err := r.server.paymentsDB.DeletePayments( - req.FailedPaymentsOnly, req.FailedHtlcsOnly, + ctx, req.FailedPaymentsOnly, req.FailedHtlcsOnly, ) if err != nil { return nil, err From 36c19f2597a61ad9759cc4d1f6dca85126ac264c Mon Sep 17 00:00:00 2001 From: ziggie Date: Mon, 20 Oct 2025 20:16:47 +0200 Subject: [PATCH 03/11] multi: thread context through FetchPayment --- payments/db/interface.go | 3 ++- payments/db/kv_store.go | 4 ++-- payments/db/kv_store_test.go | 10 ++++++---- payments/db/payment_test.go | 16 +++++++++++----- payments/db/sql_store.go | 4 ++-- payments/db/test_kvdb.go | 4 +++- routing/control_tower.go | 17 ++++++++++++----- routing/mock_test.go | 9 +++++---- routing/payment_lifecycle.go | 8 ++++++-- routing/router.go | 4 +++- routing/router_test.go | 4 +++- 11 files changed, 55 insertions(+), 28 deletions(-) diff --git a/payments/db/interface.go b/payments/db/interface.go index c6f1bf35312..5368d53d328 100644 --- a/payments/db/interface.go +++ b/payments/db/interface.go @@ -21,7 +21,8 @@ type PaymentReader interface { // FetchPayment fetches the payment corresponding to the given payment // hash. - FetchPayment(paymentHash lntypes.Hash) (*MPPayment, error) + FetchPayment(ctx context.Context, + paymentHash lntypes.Hash) (*MPPayment, error) // FetchInFlightPayments returns all payments with status InFlight. FetchInFlightPayments() ([]*MPPayment, error) diff --git a/payments/db/kv_store.go b/payments/db/kv_store.go index d3d347afe97..86b37edcf72 100644 --- a/payments/db/kv_store.go +++ b/payments/db/kv_store.go @@ -585,8 +585,8 @@ func (p *KVStore) Fail(paymentHash lntypes.Hash, } // FetchPayment returns information about a payment from the database. -func (p *KVStore) FetchPayment(paymentHash lntypes.Hash) ( - *MPPayment, error) { +func (p *KVStore) FetchPayment(_ context.Context, + paymentHash lntypes.Hash) (*MPPayment, error) { var payment *MPPayment err := kvdb.View(p.db, func(tx kvdb.RTx) error { diff --git a/payments/db/kv_store_test.go b/payments/db/kv_store_test.go index cf5a9a9a001..910c1812dca 100644 --- a/payments/db/kv_store_test.go +++ b/payments/db/kv_store_test.go @@ -409,6 +409,8 @@ func deletePayment(t *testing.T, db kvdb.Backend, paymentHash lntypes.Hash, func TestFetchPaymentWithSequenceNumber(t *testing.T) { paymentDB := NewKVTestDB(t) + ctx := t.Context() + // Generate a test payment which does not have duplicates. noDuplicates, _, err := genInfo(t) require.NoError(t, err) @@ -421,7 +423,7 @@ func TestFetchPaymentWithSequenceNumber(t *testing.T) { // Fetch the payment so we can get its sequence nr. noDuplicatesPayment, err := paymentDB.FetchPayment( - noDuplicates.PaymentIdentifier, + ctx, noDuplicates.PaymentIdentifier, ) require.NoError(t, err) @@ -437,7 +439,7 @@ func TestFetchPaymentWithSequenceNumber(t *testing.T) { // Fetch the payment so we can get its sequence nr. hasDuplicatesPayment, err := paymentDB.FetchPayment( - hasDuplicates.PaymentIdentifier, + ctx, hasDuplicates.PaymentIdentifier, ) require.NoError(t, err) @@ -749,7 +751,7 @@ func TestKVStoreQueryPaymentsDuplicates(t *testing.T) { // Immediately delete the payment with index 2. if i == 1 { pmt, err := paymentDB.FetchPayment( - info.PaymentIdentifier, + ctx, info.PaymentIdentifier, ) require.NoError(t, err) @@ -766,7 +768,7 @@ func TestKVStoreQueryPaymentsDuplicates(t *testing.T) { // duplicate payments will always be succeeded. if i == (nonDuplicatePayments - 1) { pmt, err := paymentDB.FetchPayment( - info.PaymentIdentifier, + ctx, info.PaymentIdentifier, ) require.NoError(t, err) diff --git a/payments/db/payment_test.go b/payments/db/payment_test.go index 4a9a69b0ce6..8954f8a5a75 100644 --- a/payments/db/payment_test.go +++ b/payments/db/payment_test.go @@ -235,7 +235,9 @@ func assertPaymentInfo(t *testing.T, p DB, hash lntypes.Hash, t.Helper() - payment, err := p.FetchPayment(hash) + ctx := t.Context() + + payment, err := p.FetchPayment(ctx, hash) if err != nil { t.Fatal(err) } @@ -303,7 +305,9 @@ func assertDBPaymentstatus(t *testing.T, p DB, hash lntypes.Hash, t.Helper() - payment, err := p.FetchPayment(hash) + ctx := t.Context() + + payment, err := p.FetchPayment(ctx, hash) if errors.Is(err, ErrPaymentNotInitiated) { return } @@ -1796,6 +1800,8 @@ func TestSwitchDoubleSend(t *testing.T) { func TestSwitchFail(t *testing.T) { t.Parallel() + ctx := t.Context() + paymentDB, harness := NewTestDB(t) preimg, err := genPreimage(t) @@ -1834,7 +1840,7 @@ func TestSwitchFail(t *testing.T) { // Lookup the payment so we can get its old sequence number before it is // overwritten. - payment, err := paymentDB.FetchPayment(info.PaymentIdentifier) + payment, err := paymentDB.FetchPayment(ctx, info.PaymentIdentifier) require.NoError(t, err) // Sends the htlc again, which should succeed since the prior payment @@ -2609,7 +2615,7 @@ func TestQueryPayments(t *testing.T) { // Now delete the payment at index 1 (the second // payment). pmt, err := paymentDB.FetchPayment( - paymentInfos[1].PaymentIdentifier, + ctx, paymentInfos[1].PaymentIdentifier, ) require.NoError(t, err) @@ -2621,7 +2627,7 @@ func TestQueryPayments(t *testing.T) { // Verify the payment is deleted. _, err = paymentDB.FetchPayment( - paymentInfos[1].PaymentIdentifier, + ctx, paymentInfos[1].PaymentIdentifier, ) require.ErrorIs( t, err, ErrPaymentNotInitiated, diff --git a/payments/db/sql_store.go b/payments/db/sql_store.go index ef0d96c93cd..94233649e8d 100644 --- a/payments/db/sql_store.go +++ b/payments/db/sql_store.go @@ -922,8 +922,8 @@ func fetchPaymentByHash(ctx context.Context, db SQLQueries, // Returns ErrPaymentNotInitiated if no payment with the given hash exists. // // This is part of the DB interface. -func (s *SQLStore) FetchPayment(paymentHash lntypes.Hash) (*MPPayment, error) { - ctx := context.TODO() +func (s *SQLStore) FetchPayment(ctx context.Context, + paymentHash lntypes.Hash) (*MPPayment, error) { var mpPayment *MPPayment diff --git a/payments/db/test_kvdb.go b/payments/db/test_kvdb.go index ed1710b14fa..c2de0b43f04 100644 --- a/payments/db/test_kvdb.go +++ b/payments/db/test_kvdb.go @@ -57,9 +57,11 @@ func (h *kvTestHarness) AssertPaymentIndex(t *testing.T, t.Helper() + ctx := t.Context() + // Lookup the payment so that we have its sequence number and check // that it has correctly been indexed in the payment indexes bucket. - pmt, err := h.db.FetchPayment(expectedHash) + pmt, err := h.db.FetchPayment(ctx, expectedHash) require.NoError(t, err) hash, err := h.fetchPaymentIndexEntry(t, pmt.SequenceNum) diff --git a/routing/control_tower.go b/routing/control_tower.go index 2b9e7dd9d28..31028948775 100644 --- a/routing/control_tower.go +++ b/routing/control_tower.go @@ -1,6 +1,7 @@ package routing import ( + "context" "sync" "github.com/lightningnetwork/lnd/lntypes" @@ -52,7 +53,8 @@ type ControlTower interface { // FetchPayment fetches the payment corresponding to the given payment // hash. - FetchPayment(paymentHash lntypes.Hash) (paymentsdb.DBMPPayment, error) + FetchPayment(ctx context.Context, + paymentHash lntypes.Hash) (paymentsdb.DBMPPayment, error) // FailPayment transitions a payment into the Failed state, and records // the ultimate reason the payment failed. Note that this should only @@ -164,6 +166,8 @@ func NewControlTower(db paymentsdb.DB) ControlTower { func (p *controlTower) InitPayment(paymentHash lntypes.Hash, info *paymentsdb.PaymentCreationInfo) error { + ctx := context.TODO() + err := p.db.InitPayment(paymentHash, info) if err != nil { return err @@ -174,7 +178,7 @@ func (p *controlTower) InitPayment(paymentHash lntypes.Hash, p.paymentsMtx.Lock(paymentHash) defer p.paymentsMtx.Unlock(paymentHash) - payment, err := p.db.FetchPayment(paymentHash) + payment, err := p.db.FetchPayment(ctx, paymentHash) if err != nil { return err } @@ -250,10 +254,11 @@ func (p *controlTower) FailAttempt(paymentHash lntypes.Hash, } // FetchPayment fetches the payment corresponding to the given payment hash. -func (p *controlTower) FetchPayment(paymentHash lntypes.Hash) ( +func (p *controlTower) FetchPayment(ctx context.Context, + paymentHash lntypes.Hash) ( paymentsdb.DBMPPayment, error) { - return p.db.FetchPayment(paymentHash) + return p.db.FetchPayment(ctx, paymentHash) } // FailPayment transitions a payment into the Failed state, and records the @@ -293,12 +298,14 @@ func (p *controlTower) FetchInFlightPayments() ([]*paymentsdb.MPPayment, func (p *controlTower) SubscribePayment(paymentHash lntypes.Hash) ( ControlTowerSubscriber, error) { + ctx := context.TODO() + // Take lock before querying the db to prevent missing or duplicating an // update. p.paymentsMtx.Lock(paymentHash) defer p.paymentsMtx.Unlock(paymentHash) - payment, err := p.db.FetchPayment(paymentHash) + payment, err := p.db.FetchPayment(ctx, paymentHash) if err != nil { return nil, err } diff --git a/routing/mock_test.go b/routing/mock_test.go index 19a76ee9010..556601ecd0e 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -1,6 +1,7 @@ package routing import ( + "context" "errors" "fmt" "sync" @@ -509,8 +510,8 @@ func (m *mockControlTowerOld) FailPayment(phash lntypes.Hash, return nil } -func (m *mockControlTowerOld) FetchPayment(phash lntypes.Hash) ( - paymentsdb.DBMPPayment, error) { +func (m *mockControlTowerOld) FetchPayment(_ context.Context, + phash lntypes.Hash) (paymentsdb.DBMPPayment, error) { m.Lock() defer m.Unlock() @@ -786,8 +787,8 @@ func (m *mockControlTower) FailPayment(phash lntypes.Hash, return args.Error(0) } -func (m *mockControlTower) FetchPayment(phash lntypes.Hash) ( - paymentsdb.DBMPPayment, error) { +func (m *mockControlTower) FetchPayment(_ context.Context, + phash lntypes.Hash) (paymentsdb.DBMPPayment, error) { args := m.Called(phash) diff --git a/routing/payment_lifecycle.go b/routing/payment_lifecycle.go index 8353cba157f..4eb78c8e22e 100644 --- a/routing/payment_lifecycle.go +++ b/routing/payment_lifecycle.go @@ -1114,7 +1114,9 @@ func (p *paymentLifecycle) patchLegacyPaymentHash( func (p *paymentLifecycle) reloadInflightAttempts() (paymentsdb.DBMPPayment, error) { - payment, err := p.router.cfg.Control.FetchPayment(p.identifier) + ctx := context.TODO() + + payment, err := p.router.cfg.Control.FetchPayment(ctx, p.identifier) if err != nil { return nil, err } @@ -1139,8 +1141,10 @@ func (p *paymentLifecycle) reloadInflightAttempts() (paymentsdb.DBMPPayment, func (p *paymentLifecycle) reloadPayment() (paymentsdb.DBMPPayment, *paymentsdb.MPPaymentState, error) { + ctx := context.TODO() + // Read the db to get the latest state of the payment. - payment, err := p.router.cfg.Control.FetchPayment(p.identifier) + payment, err := p.router.cfg.Control.FetchPayment(ctx, p.identifier) if err != nil { return nil, nil, err } diff --git a/routing/router.go b/routing/router.go index 3c35b7c52cc..cdf2013aa86 100644 --- a/routing/router.go +++ b/routing/router.go @@ -1064,13 +1064,15 @@ func (r *ChannelRouter) sendToRoute(htlcHash lntypes.Hash, rt *route.Route, firstHopCustomRecords lnwire.CustomRecords) (*paymentsdb.HTLCAttempt, error) { + ctx := context.TODO() + // Helper function to fail a payment. It makes sure the payment is only // failed once so that the failure reason is not overwritten. failPayment := func(paymentIdentifier lntypes.Hash, reason paymentsdb.FailureReason) error { payment, fetchErr := r.cfg.Control.FetchPayment( - paymentIdentifier, + ctx, paymentIdentifier, ) if fetchErr != nil { return fetchErr diff --git a/routing/router_test.go b/routing/router_test.go index 9f088917daf..a20b1b75dd2 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -1093,7 +1093,9 @@ func TestSendPaymentErrorPathPruning(t *testing.T) { require.Equal(t, paymentsdb.FailureReasonNoRoute, err) // Inspect the two attempts that were made before the payment failed. - p, err := ctx.router.cfg.Control.FetchPayment(*payment.paymentHash) + p, err := ctx.router.cfg.Control.FetchPayment( + t.Context(), *payment.paymentHash, + ) require.NoError(t, err) htlcs := p.GetHTLCs() From 2f79e8d299a38b2900a3d12bc747c8eb278fea12 Mon Sep 17 00:00:00 2001 From: ziggie Date: Mon, 20 Oct 2025 20:24:32 +0200 Subject: [PATCH 04/11] multi: thread context through FetchInflightPayments --- payments/db/interface.go | 2 +- payments/db/kv_store.go | 4 +++- payments/db/payment_test.go | 10 +++++++--- payments/db/sql_store.go | 4 +--- routing/control_tower.go | 13 ++++++++----- routing/mock_test.go | 4 ++-- routing/router.go | 4 +++- 7 files changed, 25 insertions(+), 16 deletions(-) diff --git a/payments/db/interface.go b/payments/db/interface.go index 5368d53d328..616906c738e 100644 --- a/payments/db/interface.go +++ b/payments/db/interface.go @@ -25,7 +25,7 @@ type PaymentReader interface { paymentHash lntypes.Hash) (*MPPayment, error) // FetchInFlightPayments returns all payments with status InFlight. - FetchInFlightPayments() ([]*MPPayment, error) + FetchInFlightPayments(ctx context.Context) ([]*MPPayment, error) } // PaymentWriter represents the interface to write operations to the payments diff --git a/payments/db/kv_store.go b/payments/db/kv_store.go index 86b37edcf72..1b48cac67ec 100644 --- a/payments/db/kv_store.go +++ b/payments/db/kv_store.go @@ -741,7 +741,9 @@ func fetchPaymentStatus(bucket kvdb.RBucket) (PaymentStatus, error) { } // FetchInFlightPayments returns all payments with status InFlight. -func (p *KVStore) FetchInFlightPayments() ([]*MPPayment, error) { +func (p *KVStore) FetchInFlightPayments(_ context.Context) ([]*MPPayment, + error) { + var ( inFlights []*MPPayment start = time.Now() diff --git a/payments/db/payment_test.go b/payments/db/payment_test.go index 8954f8a5a75..cc1c6e9199d 100644 --- a/payments/db/payment_test.go +++ b/payments/db/payment_test.go @@ -2740,6 +2740,8 @@ func TestQueryPayments(t *testing.T) { func TestFetchInFlightPayments(t *testing.T) { t.Parallel() + ctx := t.Context() + paymentDB, _ := NewTestDB(t) // Register payments with different statuses: @@ -2765,7 +2767,7 @@ func TestFetchInFlightPayments(t *testing.T) { assertDBPayments(t, paymentDB, payments) // Fetch in-flight payments. - inFlightPayments, err := paymentDB.FetchInFlightPayments() + inFlightPayments, err := paymentDB.FetchInFlightPayments(ctx) require.NoError(t, err) // We should only get the two in-flight payments. @@ -2795,7 +2797,7 @@ func TestFetchInFlightPayments(t *testing.T) { require.NoError(t, err) // Fetch in-flight payments again. - inFlightPayments, err = paymentDB.FetchInFlightPayments() + inFlightPayments, err = paymentDB.FetchInFlightPayments(ctx) require.NoError(t, err) // We should now only get one in-flight payment. @@ -2812,6 +2814,8 @@ func TestFetchInFlightPayments(t *testing.T) { func TestFetchInFlightPaymentsMultipleAttempts(t *testing.T) { t.Parallel() + ctx := t.Context() + paymentDB, _ := NewTestDB(t) preimg, err := genPreimage(t) @@ -2843,7 +2847,7 @@ func TestFetchInFlightPaymentsMultipleAttempts(t *testing.T) { require.NoError(t, err) // Both attempts are in-flight. Fetch in-flight payments. - inFlightPayments, err := paymentDB.FetchInFlightPayments() + inFlightPayments, err := paymentDB.FetchInFlightPayments(ctx) require.NoError(t, err) // We should only get one payment even though it has 2 in-flight diff --git a/payments/db/sql_store.go b/payments/db/sql_store.go index 94233649e8d..7fad7cc7e43 100644 --- a/payments/db/sql_store.go +++ b/payments/db/sql_store.go @@ -972,11 +972,9 @@ func (s *SQLStore) FetchPayment(ctx context.Context, // While inflight payments are typically a small subset, this would improve // memory efficiency for nodes with unusually high numbers of concurrent // payments and would better leverage the existing pagination infrastructure. -func (s *SQLStore) FetchInFlightPayments() ([]*MPPayment, +func (s *SQLStore) FetchInFlightPayments(ctx context.Context) ([]*MPPayment, error) { - ctx := context.TODO() - var mpPayments []*MPPayment err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { diff --git a/routing/control_tower.go b/routing/control_tower.go index 31028948775..718dca3ff54 100644 --- a/routing/control_tower.go +++ b/routing/control_tower.go @@ -67,7 +67,8 @@ type ControlTower interface { FailPayment(lntypes.Hash, paymentsdb.FailureReason) error // FetchInFlightPayments returns all payments with status InFlight. - FetchInFlightPayments() ([]*paymentsdb.MPPayment, error) + FetchInFlightPayments(ctx context.Context) ([]*paymentsdb.MPPayment, + error) // SubscribePayment subscribes to updates for the payment with the given // hash. A first update with the current state of the payment is always @@ -286,10 +287,10 @@ func (p *controlTower) FailPayment(paymentHash lntypes.Hash, } // FetchInFlightPayments returns all payments with status InFlight. -func (p *controlTower) FetchInFlightPayments() ([]*paymentsdb.MPPayment, - error) { +func (p *controlTower) FetchInFlightPayments( + ctx context.Context) ([]*paymentsdb.MPPayment, error) { - return p.db.FetchInFlightPayments() + return p.db.FetchInFlightPayments(ctx) } // SubscribePayment subscribes to updates for the payment with the given hash. A @@ -342,6 +343,8 @@ func (p *controlTower) SubscribePayment(paymentHash lntypes.Hash) ( func (p *controlTower) SubscribeAllPayments() (ControlTowerSubscriber, error) { subscriber := newControlTowerSubscriber() + ctx := context.TODO() + // Add the subscriber to the list before fetching in-flight payments, so // no events are missed. If a payment attempt update occurs after // appending and before fetching in-flight payments, an out-of-order @@ -353,7 +356,7 @@ func (p *controlTower) SubscribeAllPayments() (ControlTowerSubscriber, error) { p.subscribersMtx.Unlock() log.Debugf("Scanning for inflight payments") - inflightPayments, err := p.db.FetchInFlightPayments() + inflightPayments, err := p.db.FetchInFlightPayments(ctx) if err != nil { return nil, err } diff --git a/routing/mock_test.go b/routing/mock_test.go index 556601ecd0e..b30627165da 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -546,7 +546,7 @@ func (m *mockControlTowerOld) fetchPayment(phash lntypes.Hash) ( return mp, nil } -func (m *mockControlTowerOld) FetchInFlightPayments() ( +func (m *mockControlTowerOld) FetchInFlightPayments(_ context.Context) ( []*paymentsdb.MPPayment, error) { if m.fetchInFlight != nil { @@ -801,7 +801,7 @@ func (m *mockControlTower) FetchPayment(_ context.Context, return payment, args.Error(1) } -func (m *mockControlTower) FetchInFlightPayments() ( +func (m *mockControlTower) FetchInFlightPayments(_ context.Context) ( []*paymentsdb.MPPayment, error) { args := m.Called() diff --git a/routing/router.go b/routing/router.go index cdf2013aa86..f713216913a 100644 --- a/routing/router.go +++ b/routing/router.go @@ -1417,9 +1417,11 @@ func (r *ChannelRouter) BuildRoute(amt fn.Option[lnwire.MilliSatoshi], // resumePayments fetches inflight payments and resumes their payment // lifecycles. func (r *ChannelRouter) resumePayments() error { + ctx := context.TODO() + // Get all payments that are inflight. log.Debugf("Scanning for inflight payments") - payments, err := r.cfg.Control.FetchInFlightPayments() + payments, err := r.cfg.Control.FetchInFlightPayments(ctx) if err != nil { return err } From a8d35602d53efce72e67bdd4b3540aa050b7917b Mon Sep 17 00:00:00 2001 From: ziggie Date: Mon, 20 Oct 2025 22:00:16 +0200 Subject: [PATCH 05/11] multi: thread context through InitPayment --- payments/db/interface.go | 2 +- payments/db/kv_store.go | 2 +- payments/db/kv_store_test.go | 8 ++++---- payments/db/payment_test.go | 34 +++++++++++++++++++++------------- payments/db/sql_store.go | 4 +--- routing/control_tower.go | 11 +++++------ routing/control_tower_test.go | 12 ++++++------ routing/mock_test.go | 6 +++--- routing/router.go | 6 ++++-- 9 files changed, 46 insertions(+), 39 deletions(-) diff --git a/payments/db/interface.go b/payments/db/interface.go index 616906c738e..2d0335609d2 100644 --- a/payments/db/interface.go +++ b/payments/db/interface.go @@ -61,7 +61,7 @@ type PaymentControl interface { // exists in the database before creating a new payment. However, it // should allow the user making a subsequent payment if the payment is // in a Failed state. - InitPayment(lntypes.Hash, *PaymentCreationInfo) error + InitPayment(context.Context, lntypes.Hash, *PaymentCreationInfo) error // RegisterAttempt atomically records the provided HTLCAttemptInfo. // diff --git a/payments/db/kv_store.go b/payments/db/kv_store.go index 1b48cac67ec..81b257ce86f 100644 --- a/payments/db/kv_store.go +++ b/payments/db/kv_store.go @@ -186,7 +186,7 @@ func initKVStore(db kvdb.Backend) error { // making sure it does not already exist as an in-flight payment. When this // method returns successfully, the payment is guaranteed to be in the InFlight // state. -func (p *KVStore) InitPayment(paymentHash lntypes.Hash, +func (p *KVStore) InitPayment(_ context.Context, paymentHash lntypes.Hash, info *PaymentCreationInfo) error { // Obtain a new sequence number for this payment. This is used diff --git a/payments/db/kv_store_test.go b/payments/db/kv_store_test.go index 910c1812dca..6837134e7d4 100644 --- a/payments/db/kv_store_test.go +++ b/payments/db/kv_store_test.go @@ -80,7 +80,7 @@ func TestKVStoreDeleteNonInFlight(t *testing.T) { require.NoError(t, err) // Sends base htlc message which initiate StatusInFlight. - err = paymentDB.InitPayment(info.PaymentIdentifier, info) + err = paymentDB.InitPayment(ctx, info.PaymentIdentifier, info) if err != nil { t.Fatalf("unable to send htlc message: %v", err) } @@ -417,7 +417,7 @@ func TestFetchPaymentWithSequenceNumber(t *testing.T) { // Create a new payment entry in the database. err = paymentDB.InitPayment( - noDuplicates.PaymentIdentifier, noDuplicates, + ctx, noDuplicates.PaymentIdentifier, noDuplicates, ) require.NoError(t, err) @@ -433,7 +433,7 @@ func TestFetchPaymentWithSequenceNumber(t *testing.T) { // Create a new payment entry in the database. err = paymentDB.InitPayment( - hasDuplicates.PaymentIdentifier, hasDuplicates, + ctx, hasDuplicates.PaymentIdentifier, hasDuplicates, ) require.NoError(t, err) @@ -744,7 +744,7 @@ func TestKVStoreQueryPaymentsDuplicates(t *testing.T) { // Create a new payment entry in the database. err = paymentDB.InitPayment( - info.PaymentIdentifier, info, + ctx, info.PaymentIdentifier, info, ) require.NoError(t, err) diff --git a/payments/db/payment_test.go b/payments/db/payment_test.go index cc1c6e9199d..4b2cbcbd82d 100644 --- a/payments/db/payment_test.go +++ b/payments/db/payment_test.go @@ -125,6 +125,8 @@ type payment struct { func createTestPayments(t *testing.T, p DB, payments []*payment) { t.Helper() + ctx := t.Context() + attemptID := uint64(0) for i := 0; i < len(payments); i++ { @@ -145,7 +147,7 @@ func createTestPayments(t *testing.T, p DB, payments []*payment) { attemptID++ // Init the payment. - err = p.InitPayment(info.PaymentIdentifier, info) + err = p.InitPayment(ctx, info.PaymentIdentifier, info) require.NoError(t, err, "unable to send htlc message") // Register and fail the first attempt for all payments. @@ -559,6 +561,8 @@ func testDeleteFailedAttempts(t *testing.T, keepFailedPaymentAttempts bool) { func TestMPPRecordValidation(t *testing.T) { t.Parallel() + ctx := t.Context() + paymentDB, _ := NewTestDB(t) preimg, err := genPreimage(t) @@ -575,7 +579,7 @@ func TestMPPRecordValidation(t *testing.T) { require.NoError(t, err, "unable to generate htlc message") // Init the payment. - err = paymentDB.InitPayment(info.PaymentIdentifier, info) + err = paymentDB.InitPayment(ctx, info.PaymentIdentifier, info) require.NoError(t, err, "unable to send htlc message") // Create three unique attempts we'll use for the test, and @@ -633,7 +637,7 @@ func TestMPPRecordValidation(t *testing.T) { require.NoError(t, err, "unable to generate htlc message") - err = paymentDB.InitPayment(info.PaymentIdentifier, info) + err = paymentDB.InitPayment(ctx, info.PaymentIdentifier, info) require.NoError(t, err, "unable to send htlc message") attempt.Route.FinalHop().MPP = nil @@ -1722,6 +1726,8 @@ func TestDeletePayments(t *testing.T) { func TestSwitchDoubleSend(t *testing.T) { t.Parallel() + ctx := t.Context() + paymentDB, harness := NewTestDB(t) preimg, err := genPreimage(t) @@ -1734,7 +1740,7 @@ func TestSwitchDoubleSend(t *testing.T) { // Sends base htlc message which initiate base status and move it to // StatusInFlight and verifies that it was changed. - err = paymentDB.InitPayment(info.PaymentIdentifier, info) + err = paymentDB.InitPayment(ctx, info.PaymentIdentifier, info) require.NoError(t, err, "unable to send htlc message") harness.AssertPaymentIndex(t, info.PaymentIdentifier) @@ -1748,7 +1754,7 @@ func TestSwitchDoubleSend(t *testing.T) { // Try to initiate double sending of htlc message with the same // payment hash, should result in error indicating that payment has // already been sent. - err = paymentDB.InitPayment(info.PaymentIdentifier, info) + err = paymentDB.InitPayment(ctx, info.PaymentIdentifier, info) require.ErrorIs(t, err, ErrPaymentExists) // Record an attempt. @@ -1766,7 +1772,7 @@ func TestSwitchDoubleSend(t *testing.T) { ) // Sends base htlc message which initiate StatusInFlight. - err = paymentDB.InitPayment(info.PaymentIdentifier, info) + err = paymentDB.InitPayment(ctx, info.PaymentIdentifier, info) if !errors.Is(err, ErrPaymentInFlight) { t.Fatalf("payment control wrong behaviour: " + "double sending must trigger ErrPaymentInFlight error") @@ -1789,7 +1795,7 @@ func TestSwitchDoubleSend(t *testing.T) { t, paymentDB, info.PaymentIdentifier, info, nil, htlc, ) - err = paymentDB.InitPayment(info.PaymentIdentifier, info) + err = paymentDB.InitPayment(ctx, info.PaymentIdentifier, info) if !errors.Is(err, ErrAlreadyPaid) { t.Fatalf("unable to send htlc message: %v", err) } @@ -1813,7 +1819,7 @@ func TestSwitchFail(t *testing.T) { require.NoError(t, err) // Sends base htlc message which initiate StatusInFlight. - err = paymentDB.InitPayment(info.PaymentIdentifier, info) + err = paymentDB.InitPayment(ctx, info.PaymentIdentifier, info) require.NoError(t, err, "unable to send htlc message") harness.AssertPaymentIndex(t, info.PaymentIdentifier) @@ -1845,7 +1851,7 @@ func TestSwitchFail(t *testing.T) { // Sends the htlc again, which should succeed since the prior payment // failed. - err = paymentDB.InitPayment(info.PaymentIdentifier, info) + err = paymentDB.InitPayment(ctx, info.PaymentIdentifier, info) require.NoError(t, err, "unable to send htlc message") // Check that our index has been updated, and the old index has been @@ -1940,7 +1946,7 @@ func TestSwitchFail(t *testing.T) { // Attempt a final payment, which should now fail since the prior // payment succeed. - err = paymentDB.InitPayment(info.PaymentIdentifier, info) + err = paymentDB.InitPayment(ctx, info.PaymentIdentifier, info) if !errors.Is(err, ErrAlreadyPaid) { t.Fatalf("unable to send htlc message: %v", err) } @@ -1951,6 +1957,8 @@ func TestSwitchFail(t *testing.T) { func TestMultiShard(t *testing.T) { t.Parallel() + ctx := t.Context() + // We will register three HTLC attempts, and always fail the second // one. We'll generate all combinations of settling/failing the first // and third HTLC, and assert that the payment status end up as we @@ -1977,7 +1985,7 @@ func TestMultiShard(t *testing.T) { info := genPaymentCreationInfo(t, rhash) // Init the payment, moving it to the StatusInFlight state. - err = paymentDB.InitPayment(info.PaymentIdentifier, info) + err = paymentDB.InitPayment(ctx, info.PaymentIdentifier, info) require.NoError(t, err) harness.AssertPaymentIndex(t, info.PaymentIdentifier) @@ -2607,7 +2615,7 @@ func TestQueryPayments(t *testing.T) { // Create a new payment entry in the database. err = paymentDB.InitPayment( - info.PaymentIdentifier, info, + ctx, info.PaymentIdentifier, info, ) require.NoError(t, err) } @@ -2826,7 +2834,7 @@ func TestFetchInFlightPaymentsMultipleAttempts(t *testing.T) { // Init payment with double the amount to allow two attempts. info.Value *= 2 - err = paymentDB.InitPayment(info.PaymentIdentifier, info) + err = paymentDB.InitPayment(ctx, info.PaymentIdentifier, info) require.NoError(t, err) // Register two attempts for the same payment. diff --git a/payments/db/sql_store.go b/payments/db/sql_store.go index 7fad7cc7e43..cacd5cdd6b7 100644 --- a/payments/db/sql_store.go +++ b/payments/db/sql_store.go @@ -1265,11 +1265,9 @@ func (s *SQLStore) DeletePayment(ctx context.Context, paymentHash lntypes.Hash, // This method is part of the PaymentControl interface, which is embedded in // the PaymentWriter interface and ultimately the DB interface, representing // the first step in the payment lifecycle control flow. -func (s *SQLStore) InitPayment(paymentHash lntypes.Hash, +func (s *SQLStore) InitPayment(ctx context.Context, paymentHash lntypes.Hash, paymentCreationInfo *PaymentCreationInfo) error { - ctx := context.TODO() - // Create the payment in the database. err := s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error { existingPayment, err := db.FetchPayment(ctx, paymentHash[:]) diff --git a/routing/control_tower.go b/routing/control_tower.go index 718dca3ff54..8df87b473bf 100644 --- a/routing/control_tower.go +++ b/routing/control_tower.go @@ -20,7 +20,8 @@ type ControlTower interface { // also notifies subscribers of the payment creation. // // NOTE: Subscribers should be notified by the new state of the payment. - InitPayment(lntypes.Hash, *paymentsdb.PaymentCreationInfo) error + InitPayment(context.Context, lntypes.Hash, + *paymentsdb.PaymentCreationInfo) error // DeleteFailedAttempts removes all failed HTLCs from the db. It should // be called for a given payment whenever all inflight htlcs are @@ -164,12 +165,10 @@ func NewControlTower(db paymentsdb.DB) ControlTower { // making sure it does not already exist as an in-flight payment. Then this // method returns successfully, the payment is guaranteed to be in the // Initiated state. -func (p *controlTower) InitPayment(paymentHash lntypes.Hash, - info *paymentsdb.PaymentCreationInfo) error { +func (p *controlTower) InitPayment(ctx context.Context, + paymentHash lntypes.Hash, info *paymentsdb.PaymentCreationInfo) error { - ctx := context.TODO() - - err := p.db.InitPayment(paymentHash, info) + err := p.db.InitPayment(ctx, paymentHash, info) if err != nil { return err } diff --git a/routing/control_tower_test.go b/routing/control_tower_test.go index de0aacf880b..0993fb2a688 100644 --- a/routing/control_tower_test.go +++ b/routing/control_tower_test.go @@ -81,7 +81,7 @@ func TestControlTowerSubscribeSuccess(t *testing.T) { t.Fatal(err) } - err = pControl.InitPayment(info.PaymentIdentifier, info) + err = pControl.InitPayment(t.Context(), info.PaymentIdentifier, info) if err != nil { t.Fatal(err) } @@ -212,7 +212,7 @@ func TestKVStoreSubscribeAllSuccess(t *testing.T) { info1, attempt1, preimg1, err := genInfo() require.NoError(t, err) - err = pControl.InitPayment(info1.PaymentIdentifier, info1) + err = pControl.InitPayment(t.Context(), info1.PaymentIdentifier, info1) require.NoError(t, err) // Subscription should succeed and immediately report the Initiated @@ -228,7 +228,7 @@ func TestKVStoreSubscribeAllSuccess(t *testing.T) { info2, attempt2, preimg2, err := genInfo() require.NoError(t, err) - err = pControl.InitPayment(info2.PaymentIdentifier, info2) + err = pControl.InitPayment(t.Context(), info2.PaymentIdentifier, info2) require.NoError(t, err) // Register an attempt on the second payment. @@ -337,7 +337,7 @@ func TestKVStoreSubscribeAllImmediate(t *testing.T) { info, attempt, _, err := genInfo() require.NoError(t, err) - err = pControl.InitPayment(info.PaymentIdentifier, info) + err = pControl.InitPayment(t.Context(), info.PaymentIdentifier, info) require.NoError(t, err) // Register a payment update. @@ -392,7 +392,7 @@ func TestKVStoreUnsubscribeSuccess(t *testing.T) { info, attempt, _, err := genInfo() require.NoError(t, err) - err = pControl.InitPayment(info.PaymentIdentifier, info) + err = pControl.InitPayment(t.Context(), info.PaymentIdentifier, info) require.NoError(t, err) // Assert all subscriptions receive the update. @@ -465,7 +465,7 @@ func testKVStoreSubscribeFail(t *testing.T, registerAttempt, t.Fatal(err) } - err = pControl.InitPayment(info.PaymentIdentifier, info) + err = pControl.InitPayment(t.Context(), info.PaymentIdentifier, info) if err != nil { t.Fatal(err) } diff --git a/routing/mock_test.go b/routing/mock_test.go index b30627165da..5b9d4854b13 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -297,8 +297,8 @@ func makeMockControlTower() *mockControlTowerOld { } } -func (m *mockControlTowerOld) InitPayment(phash lntypes.Hash, - c *paymentsdb.PaymentCreationInfo) error { +func (m *mockControlTowerOld) InitPayment(_ context.Context, + phash lntypes.Hash, c *paymentsdb.PaymentCreationInfo) error { if m.init != nil { m.init <- initArgs{c} @@ -734,7 +734,7 @@ type mockControlTower struct { var _ ControlTower = (*mockControlTower)(nil) -func (m *mockControlTower) InitPayment(phash lntypes.Hash, +func (m *mockControlTower) InitPayment(_ context.Context, phash lntypes.Hash, c *paymentsdb.PaymentCreationInfo) error { args := m.Called(phash, c) diff --git a/routing/router.go b/routing/router.go index f713216913a..55d58e78bda 100644 --- a/routing/router.go +++ b/routing/router.go @@ -967,6 +967,8 @@ func spewPayment(payment *LightningPayment) lnutils.LogClosure { func (r *ChannelRouter) PreparePayment(payment *LightningPayment) ( PaymentSession, shards.ShardTracker, error) { + ctx := context.TODO() + // Assemble any custom data we want to send to the first hop only. var firstHopData fn.Option[tlv.Blob] if len(payment.FirstHopCustomRecords) > 0 { @@ -1026,7 +1028,7 @@ func (r *ChannelRouter) PreparePayment(payment *LightningPayment) ( ) } - err = r.cfg.Control.InitPayment(payment.Identifier(), info) + err = r.cfg.Control.InitPayment(ctx, payment.Identifier(), info) if err != nil { return nil, nil, err } @@ -1131,7 +1133,7 @@ func (r *ChannelRouter) sendToRoute(htlcHash lntypes.Hash, rt *route.Route, FirstHopCustomRecords: firstHopCustomRecords, } - err := r.cfg.Control.InitPayment(paymentIdentifier, info) + err := r.cfg.Control.InitPayment(ctx, paymentIdentifier, info) switch { // If this is an MPP attempt and the hash is already registered with // the database, we can go on to launch the shard. From 60ed5b385a97e9a77fdd3219c816d42abd8bb7ed Mon Sep 17 00:00:00 2001 From: ziggie Date: Tue, 11 Nov 2025 11:23:15 +0100 Subject: [PATCH 06/11] multi: thread context through RegisterAttempt method --- payments/db/interface.go | 3 +- payments/db/kv_store.go | 2 +- payments/db/kv_store_test.go | 2 +- payments/db/payment_test.go | 52 +++++++++++++++++++++++------------ payments/db/sql_store.go | 4 +-- routing/control_tower.go | 9 +++--- routing/control_tower_test.go | 28 +++++++++++++------ routing/mock_test.go | 8 +++--- routing/payment_lifecycle.go | 4 ++- 9 files changed, 70 insertions(+), 42 deletions(-) diff --git a/payments/db/interface.go b/payments/db/interface.go index 2d0335609d2..3af2dfb671b 100644 --- a/payments/db/interface.go +++ b/payments/db/interface.go @@ -75,7 +75,8 @@ type PaymentControl interface { // - Result: 1700 sats sent, exceeding the payment amount // The payment router/controller layer is responsible for ensuring // serialized access per payment hash. - RegisterAttempt(lntypes.Hash, *HTLCAttemptInfo) (*MPPayment, error) + RegisterAttempt(context.Context, lntypes.Hash, + *HTLCAttemptInfo) (*MPPayment, error) // SettleAttempt marks the given attempt settled with the preimage. If // this is a multi shard payment, this might implicitly mean the diff --git a/payments/db/kv_store.go b/payments/db/kv_store.go index 81b257ce86f..5511bf8bc44 100644 --- a/payments/db/kv_store.go +++ b/payments/db/kv_store.go @@ -359,7 +359,7 @@ func deserializePaymentIndex(r io.Reader) (lntypes.Hash, error) { // RegisterAttempt atomically records the provided HTLCAttemptInfo to the // DB. -func (p *KVStore) RegisterAttempt(paymentHash lntypes.Hash, +func (p *KVStore) RegisterAttempt(_ context.Context, paymentHash lntypes.Hash, attempt *HTLCAttemptInfo) (*MPPayment, error) { // Serialize the information before opening the db transaction. diff --git a/payments/db/kv_store_test.go b/payments/db/kv_store_test.go index 6837134e7d4..0c51dccf59c 100644 --- a/payments/db/kv_store_test.go +++ b/payments/db/kv_store_test.go @@ -85,7 +85,7 @@ func TestKVStoreDeleteNonInFlight(t *testing.T) { t.Fatalf("unable to send htlc message: %v", err) } _, err = paymentDB.RegisterAttempt( - info.PaymentIdentifier, attempt, + ctx, info.PaymentIdentifier, attempt, ) if err != nil { t.Fatalf("unable to send htlc message: %v", err) diff --git a/payments/db/payment_test.go b/payments/db/payment_test.go index 4b2cbcbd82d..8e2c7a47c82 100644 --- a/payments/db/payment_test.go +++ b/payments/db/payment_test.go @@ -151,7 +151,7 @@ func createTestPayments(t *testing.T, p DB, payments []*payment) { require.NoError(t, err, "unable to send htlc message") // Register and fail the first attempt for all payments. - _, err = p.RegisterAttempt(info.PaymentIdentifier, attempt) + _, err = p.RegisterAttempt(ctx, info.PaymentIdentifier, attempt) require.NoError(t, err, "unable to send htlc message") htlcFailure := HTLCFailUnreadable @@ -175,7 +175,7 @@ func createTestPayments(t *testing.T, p DB, payments []*payment) { require.NoError(t, err) attemptID++ - _, err = p.RegisterAttempt(info.PaymentIdentifier, attempt) + _, err = p.RegisterAttempt(ctx, info.PaymentIdentifier, attempt) require.NoError(t, err, "unable to send htlc message") switch payments[i].status { @@ -592,7 +592,7 @@ func TestMPPRecordValidation(t *testing.T) { info.Value, [32]byte{1}, ) - _, err = paymentDB.RegisterAttempt(info.PaymentIdentifier, attempt) + _, err = paymentDB.RegisterAttempt(ctx, info.PaymentIdentifier, attempt) require.NoError(t, err, "unable to send htlc message") // Now try to register a non-MPP attempt, which should fail. @@ -604,21 +604,27 @@ func TestMPPRecordValidation(t *testing.T) { attempt2.Route.FinalHop().MPP = nil - _, err = paymentDB.RegisterAttempt(info.PaymentIdentifier, attempt2) + _, err = paymentDB.RegisterAttempt( + ctx, info.PaymentIdentifier, attempt2, + ) require.ErrorIs(t, err, ErrMPPayment) // Try to register attempt one with a different payment address. attempt2.Route.FinalHop().MPP = record.NewMPP( info.Value, [32]byte{2}, ) - _, err = paymentDB.RegisterAttempt(info.PaymentIdentifier, attempt2) + _, err = paymentDB.RegisterAttempt( + ctx, info.PaymentIdentifier, attempt2, + ) require.ErrorIs(t, err, ErrMPPPaymentAddrMismatch) // Try registering one with a different total amount. attempt2.Route.FinalHop().MPP = record.NewMPP( info.Value/2, [32]byte{1}, ) - _, err = paymentDB.RegisterAttempt(info.PaymentIdentifier, attempt2) + _, err = paymentDB.RegisterAttempt( + ctx, info.PaymentIdentifier, attempt2, + ) require.ErrorIs(t, err, ErrMPPTotalAmountMismatch) // Create and init a new payment. This time we'll check that we cannot @@ -641,7 +647,9 @@ func TestMPPRecordValidation(t *testing.T) { require.NoError(t, err, "unable to send htlc message") attempt.Route.FinalHop().MPP = nil - _, err = paymentDB.RegisterAttempt(info.PaymentIdentifier, attempt) + _, err = paymentDB.RegisterAttempt( + ctx, info.PaymentIdentifier, attempt, + ) require.NoError(t, err, "unable to send htlc message") // Attempt to register an MPP attempt, which should fail. @@ -655,7 +663,9 @@ func TestMPPRecordValidation(t *testing.T) { info.Value, [32]byte{1}, ) - _, err = paymentDB.RegisterAttempt(info.PaymentIdentifier, attempt2) + _, err = paymentDB.RegisterAttempt( + ctx, info.PaymentIdentifier, attempt2, + ) require.ErrorIs(t, err, ErrNonMPPayment) } @@ -1758,7 +1768,7 @@ func TestSwitchDoubleSend(t *testing.T) { require.ErrorIs(t, err, ErrPaymentExists) // Record an attempt. - _, err = paymentDB.RegisterAttempt(info.PaymentIdentifier, attempt) + _, err = paymentDB.RegisterAttempt(ctx, info.PaymentIdentifier, attempt) require.NoError(t, err, "unable to send htlc message") assertDBPaymentstatus( t, paymentDB, info.PaymentIdentifier, StatusInFlight, @@ -1869,7 +1879,7 @@ func TestSwitchFail(t *testing.T) { // Record a new attempt. In this test scenario, the attempt fails. // However, this is not communicated to control tower in the current // implementation. It only registers the initiation of the attempt. - _, err = paymentDB.RegisterAttempt(info.PaymentIdentifier, attempt) + _, err = paymentDB.RegisterAttempt(ctx, info.PaymentIdentifier, attempt) require.NoError(t, err, "unable to register attempt") htlcReason := HTLCFailUnreadable @@ -1899,7 +1909,7 @@ func TestSwitchFail(t *testing.T) { ) require.NoError(t, err) - _, err = paymentDB.RegisterAttempt(info.PaymentIdentifier, attempt) + _, err = paymentDB.RegisterAttempt(ctx, info.PaymentIdentifier, attempt) require.NoError(t, err, "unable to send htlc message") assertDBPaymentstatus( t, paymentDB, info.PaymentIdentifier, StatusInFlight, @@ -2017,7 +2027,7 @@ func TestMultiShard(t *testing.T) { attempts = append(attempts, a) _, err = paymentDB.RegisterAttempt( - info.PaymentIdentifier, a, + ctx, info.PaymentIdentifier, a, ) if err != nil { t.Fatalf("unable to send htlc message: %v", err) @@ -2049,7 +2059,9 @@ func TestMultiShard(t *testing.T) { info.Value, [32]byte{1}, ) - _, err = paymentDB.RegisterAttempt(info.PaymentIdentifier, b) + _, err = paymentDB.RegisterAttempt( + ctx, info.PaymentIdentifier, b, + ) require.ErrorIs(t, err, ErrValueExceedsAmt) // Fail the second attempt. @@ -2156,7 +2168,9 @@ func TestMultiShard(t *testing.T) { info.Value, [32]byte{1}, ) - _, err = paymentDB.RegisterAttempt(info.PaymentIdentifier, b) + _, err = paymentDB.RegisterAttempt( + ctx, info.PaymentIdentifier, b, + ) if test.settleFirst { require.ErrorIs( t, err, ErrPaymentPendingSettled, @@ -2255,7 +2269,9 @@ func TestMultiShard(t *testing.T) { ) // Finally assert we cannot register more attempts. - _, err = paymentDB.RegisterAttempt(info.PaymentIdentifier, b) + _, err = paymentDB.RegisterAttempt( + ctx, info.PaymentIdentifier, b, + ) require.ErrorIs(t, err, registerErr) } @@ -2658,7 +2674,7 @@ func TestQueryPayments(t *testing.T) { require.NoError(t, err) _, err = paymentDB.RegisterAttempt( - lastPaymentInfo.PaymentIdentifier, + ctx, lastPaymentInfo.PaymentIdentifier, &attempt.HTLCAttemptInfo, ) require.NoError(t, err) @@ -2842,7 +2858,7 @@ func TestFetchInFlightPaymentsMultipleAttempts(t *testing.T) { require.NoError(t, err) _, err = paymentDB.RegisterAttempt( - info.PaymentIdentifier, attempt1, + ctx, info.PaymentIdentifier, attempt1, ) require.NoError(t, err) @@ -2850,7 +2866,7 @@ func TestFetchInFlightPaymentsMultipleAttempts(t *testing.T) { require.NoError(t, err) _, err = paymentDB.RegisterAttempt( - info.PaymentIdentifier, attempt2, + ctx, info.PaymentIdentifier, attempt2, ) require.NoError(t, err) diff --git a/payments/db/sql_store.go b/payments/db/sql_store.go index cacd5cdd6b7..8c963d360b3 100644 --- a/payments/db/sql_store.go +++ b/payments/db/sql_store.go @@ -1496,11 +1496,9 @@ func (s *SQLStore) insertRouteHops(ctx context.Context, db SQLQueries, // the PaymentWriter interface and ultimately the DB interface. It represents // step 2 in the payment lifecycle control flow, called after InitPayment and // potentially multiple times for multi-path payments. -func (s *SQLStore) RegisterAttempt(paymentHash lntypes.Hash, +func (s *SQLStore) RegisterAttempt(ctx context.Context, paymentHash lntypes.Hash, attempt *HTLCAttemptInfo) (*MPPayment, error) { - ctx := context.TODO() - var mpPayment *MPPayment err := s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error { diff --git a/routing/control_tower.go b/routing/control_tower.go index 8df87b473bf..28432d73224 100644 --- a/routing/control_tower.go +++ b/routing/control_tower.go @@ -31,7 +31,8 @@ type ControlTower interface { // RegisterAttempt atomically records the provided HTLCAttemptInfo. // // NOTE: Subscribers should be notified by the new state of the payment. - RegisterAttempt(lntypes.Hash, *paymentsdb.HTLCAttemptInfo) error + RegisterAttempt(context.Context, lntypes.Hash, + *paymentsdb.HTLCAttemptInfo) error // SettleAttempt marks the given attempt settled with the preimage. If // this is a multi shard payment, this might implicitly mean the the @@ -196,13 +197,13 @@ func (p *controlTower) DeleteFailedAttempts(paymentHash lntypes.Hash) error { // RegisterAttempt atomically records the provided HTLCAttemptInfo to the // DB. -func (p *controlTower) RegisterAttempt(paymentHash lntypes.Hash, - attempt *paymentsdb.HTLCAttemptInfo) error { +func (p *controlTower) RegisterAttempt(ctx context.Context, + paymentHash lntypes.Hash, attempt *paymentsdb.HTLCAttemptInfo) error { p.paymentsMtx.Lock(paymentHash) defer p.paymentsMtx.Unlock(paymentHash) - payment, err := p.db.RegisterAttempt(paymentHash, attempt) + payment, err := p.db.RegisterAttempt(ctx, paymentHash, attempt) if err != nil { return err } diff --git a/routing/control_tower_test.go b/routing/control_tower_test.go index 0993fb2a688..20bdd17564f 100644 --- a/routing/control_tower_test.go +++ b/routing/control_tower_test.go @@ -92,7 +92,9 @@ func TestControlTowerSubscribeSuccess(t *testing.T) { require.NoError(t, err, "expected subscribe to succeed, but got") // Register an attempt. - err = pControl.RegisterAttempt(info.PaymentIdentifier, attempt) + err = pControl.RegisterAttempt( + t.Context(), info.PaymentIdentifier, attempt, + ) if err != nil { t.Fatal(err) } @@ -221,7 +223,9 @@ func TestKVStoreSubscribeAllSuccess(t *testing.T) { require.NoError(t, err, "expected subscribe to succeed, but got: %v") // Register an attempt. - err = pControl.RegisterAttempt(info1.PaymentIdentifier, attempt1) + err = pControl.RegisterAttempt( + t.Context(), info1.PaymentIdentifier, attempt1, + ) require.NoError(t, err) // Initiate a second payment after the subscription is already active. @@ -232,7 +236,9 @@ func TestKVStoreSubscribeAllSuccess(t *testing.T) { require.NoError(t, err) // Register an attempt on the second payment. - err = pControl.RegisterAttempt(info2.PaymentIdentifier, attempt2) + err = pControl.RegisterAttempt( + t.Context(), info2.PaymentIdentifier, attempt2, + ) require.NoError(t, err) // Mark the first payment as successful. @@ -341,7 +347,9 @@ func TestKVStoreSubscribeAllImmediate(t *testing.T) { require.NoError(t, err) // Register a payment update. - err = pControl.RegisterAttempt(info.PaymentIdentifier, attempt) + err = pControl.RegisterAttempt( + t.Context(), info.PaymentIdentifier, attempt, + ) require.NoError(t, err) subscription, err := pControl.SubscribeAllPayments() @@ -414,7 +422,9 @@ func TestKVStoreUnsubscribeSuccess(t *testing.T) { subscription1.Close() // Register a payment update. - err = pControl.RegisterAttempt(info.PaymentIdentifier, attempt) + err = pControl.RegisterAttempt( + t.Context(), info.PaymentIdentifier, attempt, + ) require.NoError(t, err) // Assert only subscription 2 receives the update. @@ -479,10 +489,10 @@ func testKVStoreSubscribeFail(t *testing.T, registerAttempt, // making any attempts at all. if registerAttempt { // Register an attempt. - err = pControl.RegisterAttempt(info.PaymentIdentifier, attempt) - if err != nil { - t.Fatal(err) - } + err = pControl.RegisterAttempt( + t.Context(), info.PaymentIdentifier, attempt, + ) + require.NoError(t, err) // Fail the payment attempt. failInfo := paymentsdb.HTLCFailInfo{ diff --git a/routing/mock_test.go b/routing/mock_test.go index 5b9d4854b13..daad344fdf8 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -354,8 +354,8 @@ func (m *mockControlTowerOld) DeleteFailedAttempts(phash lntypes.Hash) error { return nil } -func (m *mockControlTowerOld) RegisterAttempt(phash lntypes.Hash, - a *paymentsdb.HTLCAttemptInfo) error { +func (m *mockControlTowerOld) RegisterAttempt(_ context.Context, + phash lntypes.Hash, a *paymentsdb.HTLCAttemptInfo) error { if m.registerAttempt != nil { m.registerAttempt <- registerAttemptArgs{a} @@ -746,8 +746,8 @@ func (m *mockControlTower) DeleteFailedAttempts(phash lntypes.Hash) error { return args.Error(0) } -func (m *mockControlTower) RegisterAttempt(phash lntypes.Hash, - a *paymentsdb.HTLCAttemptInfo) error { +func (m *mockControlTower) RegisterAttempt(_ context.Context, + phash lntypes.Hash, a *paymentsdb.HTLCAttemptInfo) error { args := m.Called(phash, a) return args.Error(0) diff --git a/routing/payment_lifecycle.go b/routing/payment_lifecycle.go index 4eb78c8e22e..0499475416d 100644 --- a/routing/payment_lifecycle.go +++ b/routing/payment_lifecycle.go @@ -584,6 +584,8 @@ func (p *paymentLifecycle) collectResult( func (p *paymentLifecycle) registerAttempt(rt *route.Route, remainingAmt lnwire.MilliSatoshi) (*paymentsdb.HTLCAttempt, error) { + ctx := context.TODO() + // If this route will consume the last remaining amount to send // to the receiver, this will be our last shard (for now). isLastAttempt := rt.ReceiverAmt() == remainingAmt @@ -601,7 +603,7 @@ func (p *paymentLifecycle) registerAttempt(rt *route.Route, // Switch for its whereabouts. The route is needed to handle the result // when it eventually comes back. err = p.router.cfg.Control.RegisterAttempt( - p.identifier, &attempt.HTLCAttemptInfo, + ctx, p.identifier, &attempt.HTLCAttemptInfo, ) return attempt, err From 15c5a2450370a977a1b893f8a3a1b2881404a754 Mon Sep 17 00:00:00 2001 From: ziggie Date: Tue, 21 Oct 2025 08:54:50 +0200 Subject: [PATCH 07/11] multi: thread context through SettleAttempt --- payments/db/interface.go | 3 ++- payments/db/kv_store.go | 2 +- payments/db/kv_store_test.go | 2 +- payments/db/payment_test.go | 15 ++++++++------- payments/db/sql_store.go | 9 ++++----- routing/control_tower.go | 15 +++++++++------ routing/control_tower_test.go | 9 ++++++--- routing/mock_test.go | 6 +++--- routing/payment_lifecycle.go | 4 +++- 9 files changed, 37 insertions(+), 28 deletions(-) diff --git a/payments/db/interface.go b/payments/db/interface.go index 3af2dfb671b..452a7e5f79a 100644 --- a/payments/db/interface.go +++ b/payments/db/interface.go @@ -86,7 +86,8 @@ type PaymentControl interface { // error to prevent us from making duplicate payments to the same // payment hash. The provided preimage is atomically saved to the DB // for record keeping. - SettleAttempt(lntypes.Hash, uint64, *HTLCSettleInfo) (*MPPayment, error) + SettleAttempt(context.Context, lntypes.Hash, uint64, + *HTLCSettleInfo) (*MPPayment, error) // FailAttempt marks the given payment attempt failed. FailAttempt(lntypes.Hash, uint64, *HTLCFailInfo) (*MPPayment, error) diff --git a/payments/db/kv_store.go b/payments/db/kv_store.go index 5511bf8bc44..3739232639c 100644 --- a/payments/db/kv_store.go +++ b/payments/db/kv_store.go @@ -430,7 +430,7 @@ func (p *KVStore) RegisterAttempt(_ context.Context, paymentHash lntypes.Hash, // After invoking this method, InitPayment should always return an error to // prevent us from making duplicate payments to the same payment hash. The // provided preimage is atomically saved to the DB for record keeping. -func (p *KVStore) SettleAttempt(hash lntypes.Hash, +func (p *KVStore) SettleAttempt(_ context.Context, hash lntypes.Hash, attemptID uint64, settleInfo *HTLCSettleInfo) (*MPPayment, error) { var b bytes.Buffer diff --git a/payments/db/kv_store_test.go b/payments/db/kv_store_test.go index 0c51dccf59c..fb7a2757338 100644 --- a/payments/db/kv_store_test.go +++ b/payments/db/kv_store_test.go @@ -133,7 +133,7 @@ func TestKVStoreDeleteNonInFlight(t *testing.T) { case p.success: // Verifies that status was changed to StatusSucceeded. _, err := paymentDB.SettleAttempt( - info.PaymentIdentifier, attempt.AttemptID, + ctx, info.PaymentIdentifier, attempt.AttemptID, &HTLCSettleInfo{ Preimage: preimg, }, diff --git a/payments/db/payment_test.go b/payments/db/payment_test.go index 8e2c7a47c82..159f9709285 100644 --- a/payments/db/payment_test.go +++ b/payments/db/payment_test.go @@ -198,7 +198,7 @@ func createTestPayments(t *testing.T, p DB, payments []*payment) { // Settle the attempt case StatusSucceeded: _, err := p.SettleAttempt( - info.PaymentIdentifier, attempt.AttemptID, + ctx, info.PaymentIdentifier, attempt.AttemptID, &HTLCSettleInfo{ Preimage: preimg, }, @@ -1643,6 +1643,7 @@ func TestSuccessesWithoutInFlight(t *testing.T) { // Attempt to complete the payment should fail. _, err = paymentDB.SettleAttempt( + t.Context(), info.PaymentIdentifier, 0, &HTLCSettleInfo{ Preimage: preimg, @@ -1790,7 +1791,7 @@ func TestSwitchDoubleSend(t *testing.T) { // After settling, the error should be ErrAlreadyPaid. _, err = paymentDB.SettleAttempt( - info.PaymentIdentifier, attempt.AttemptID, + ctx, info.PaymentIdentifier, attempt.AttemptID, &HTLCSettleInfo{ Preimage: preimg, }, @@ -1926,7 +1927,7 @@ func TestSwitchFail(t *testing.T) { // Settle the attempt and verify that status was changed to // StatusSucceeded. payment, err = paymentDB.SettleAttempt( - info.PaymentIdentifier, attempt.AttemptID, + ctx, info.PaymentIdentifier, attempt.AttemptID, &HTLCSettleInfo{ Preimage: preimg, }, @@ -2099,7 +2100,7 @@ func TestMultiShard(t *testing.T) { var firstFailReason *FailureReason if test.settleFirst { _, err := paymentDB.SettleAttempt( - info.PaymentIdentifier, a.AttemptID, + ctx, info.PaymentIdentifier, a.AttemptID, &HTLCSettleInfo{ Preimage: preimg, }, @@ -2193,7 +2194,7 @@ func TestMultiShard(t *testing.T) { if test.settleLast { // Settle the last outstanding attempt. _, err = paymentDB.SettleAttempt( - info.PaymentIdentifier, a.AttemptID, + ctx, info.PaymentIdentifier, a.AttemptID, &HTLCSettleInfo{ Preimage: preimg, }, @@ -2683,7 +2684,7 @@ func TestQueryPayments(t *testing.T) { copy(preimg[:], rev[:]) _, err = paymentDB.SettleAttempt( - lastPaymentInfo.PaymentIdentifier, + ctx, lastPaymentInfo.PaymentIdentifier, attempt.AttemptID, &HTLCSettleInfo{ Preimage: preimg, @@ -2813,7 +2814,7 @@ func TestFetchInFlightPayments(t *testing.T) { require.NoError(t, err) _, err = paymentDB.SettleAttempt( - payments[2].id, 5, + ctx, payments[2].id, 5, &HTLCSettleInfo{ Preimage: preimg, }, diff --git a/payments/db/sql_store.go b/payments/db/sql_store.go index 8c963d360b3..ca5add1d156 100644 --- a/payments/db/sql_store.go +++ b/payments/db/sql_store.go @@ -1496,8 +1496,9 @@ func (s *SQLStore) insertRouteHops(ctx context.Context, db SQLQueries, // the PaymentWriter interface and ultimately the DB interface. It represents // step 2 in the payment lifecycle control flow, called after InitPayment and // potentially multiple times for multi-path payments. -func (s *SQLStore) RegisterAttempt(ctx context.Context, paymentHash lntypes.Hash, - attempt *HTLCAttemptInfo) (*MPPayment, error) { +func (s *SQLStore) RegisterAttempt(ctx context.Context, + paymentHash lntypes.Hash, attempt *HTLCAttemptInfo) (*MPPayment, + error) { var mpPayment *MPPayment @@ -1621,11 +1622,9 @@ func (s *SQLStore) RegisterAttempt(ctx context.Context, paymentHash lntypes.Hash // the PaymentWriter interface and ultimately the DB interface. It represents // step 3a in the payment lifecycle control flow (step 3b is FailAttempt), // called after RegisterAttempt when an HTLC successfully completes. -func (s *SQLStore) SettleAttempt(paymentHash lntypes.Hash, +func (s *SQLStore) SettleAttempt(ctx context.Context, paymentHash lntypes.Hash, attemptID uint64, settleInfo *HTLCSettleInfo) (*MPPayment, error) { - ctx := context.TODO() - var mpPayment *MPPayment err := s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error { diff --git a/routing/control_tower.go b/routing/control_tower.go index 28432d73224..163aec3e753 100644 --- a/routing/control_tower.go +++ b/routing/control_tower.go @@ -44,8 +44,8 @@ type ControlTower interface { // for record keeping. // // NOTE: Subscribers should be notified by the new state of the payment. - SettleAttempt(lntypes.Hash, uint64, *paymentsdb.HTLCSettleInfo) ( - *paymentsdb.HTLCAttempt, error) + SettleAttempt(context.Context, lntypes.Hash, uint64, + *paymentsdb.HTLCSettleInfo) (*paymentsdb.HTLCAttempt, error) // FailAttempt marks the given payment attempt failed. // @@ -217,14 +217,17 @@ func (p *controlTower) RegisterAttempt(ctx context.Context, // SettleAttempt marks the given attempt settled with the preimage. If // this is a multi shard payment, this might implicitly mean the the // full payment succeeded. -func (p *controlTower) SettleAttempt(paymentHash lntypes.Hash, - attemptID uint64, settleInfo *paymentsdb.HTLCSettleInfo) ( - *paymentsdb.HTLCAttempt, error) { +func (p *controlTower) SettleAttempt(ctx context.Context, + paymentHash lntypes.Hash, attemptID uint64, + settleInfo *paymentsdb.HTLCSettleInfo) (*paymentsdb.HTLCAttempt, + error) { p.paymentsMtx.Lock(paymentHash) defer p.paymentsMtx.Unlock(paymentHash) - payment, err := p.db.SettleAttempt(paymentHash, attemptID, settleInfo) + payment, err := p.db.SettleAttempt( + ctx, paymentHash, attemptID, settleInfo, + ) if err != nil { return nil, err } diff --git a/routing/control_tower_test.go b/routing/control_tower_test.go index 20bdd17564f..20e8e82e42a 100644 --- a/routing/control_tower_test.go +++ b/routing/control_tower_test.go @@ -108,7 +108,8 @@ func TestControlTowerSubscribeSuccess(t *testing.T) { Preimage: preimg, } htlcAttempt, err := pControl.SettleAttempt( - info.PaymentIdentifier, attempt.AttemptID, &settleInfo, + t.Context(), info.PaymentIdentifier, attempt.AttemptID, + &settleInfo, ) if err != nil { t.Fatal(err) @@ -246,7 +247,8 @@ func TestKVStoreSubscribeAllSuccess(t *testing.T) { Preimage: preimg1, } htlcAttempt1, err := pControl.SettleAttempt( - info1.PaymentIdentifier, attempt1.AttemptID, &settleInfo1, + t.Context(), info1.PaymentIdentifier, attempt1.AttemptID, + &settleInfo1, ) require.NoError(t, err) require.Equal( @@ -259,7 +261,8 @@ func TestKVStoreSubscribeAllSuccess(t *testing.T) { Preimage: preimg2, } htlcAttempt2, err := pControl.SettleAttempt( - info2.PaymentIdentifier, attempt2.AttemptID, &settleInfo2, + t.Context(), info2.PaymentIdentifier, attempt2.AttemptID, + &settleInfo2, ) require.NoError(t, err) require.Equal( diff --git a/routing/mock_test.go b/routing/mock_test.go index daad344fdf8..77f98ab017d 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -408,8 +408,8 @@ func (m *mockControlTowerOld) RegisterAttempt(_ context.Context, return nil } -func (m *mockControlTowerOld) SettleAttempt(phash lntypes.Hash, - pid uint64, settleInfo *paymentsdb.HTLCSettleInfo) ( +func (m *mockControlTowerOld) SettleAttempt(_ context.Context, + phash lntypes.Hash, pid uint64, settleInfo *paymentsdb.HTLCSettleInfo) ( *paymentsdb.HTLCAttempt, error) { if m.settleAttempt != nil { @@ -753,7 +753,7 @@ func (m *mockControlTower) RegisterAttempt(_ context.Context, return args.Error(0) } -func (m *mockControlTower) SettleAttempt(phash lntypes.Hash, +func (m *mockControlTower) SettleAttempt(_ context.Context, phash lntypes.Hash, pid uint64, settleInfo *paymentsdb.HTLCSettleInfo) ( *paymentsdb.HTLCAttempt, error) { diff --git a/routing/payment_lifecycle.go b/routing/payment_lifecycle.go index 0499475416d..a2e3935a0b1 100644 --- a/routing/payment_lifecycle.go +++ b/routing/payment_lifecycle.go @@ -1166,6 +1166,8 @@ func (p *paymentLifecycle) reloadPayment() (paymentsdb.DBMPPayment, func (p *paymentLifecycle) handleAttemptResult(attempt *paymentsdb.HTLCAttempt, result *htlcswitch.PaymentResult) (*attemptResult, error) { + ctx := context.TODO() + // If the result has an error, we need to further process it by failing // the attempt and maybe fail the payment. if result.Error != nil { @@ -1187,7 +1189,7 @@ func (p *paymentLifecycle) handleAttemptResult(attempt *paymentsdb.HTLCAttempt, // In case of success we atomically store settle result to the DB and // move the shard to the settled state. htlcAttempt, err := p.router.cfg.Control.SettleAttempt( - p.identifier, attempt.AttemptID, + ctx, p.identifier, attempt.AttemptID, &paymentsdb.HTLCSettleInfo{ Preimage: result.Preimage, SettleTime: p.router.cfg.Clock.Now(), From 814285f8ea9e44cbb15461c7e7c29ef09fdd278c Mon Sep 17 00:00:00 2001 From: ziggie Date: Tue, 21 Oct 2025 09:02:22 +0200 Subject: [PATCH 08/11] multi: thread context through FailAttempt --- payments/db/interface.go | 3 ++- payments/db/kv_store.go | 2 +- payments/db/kv_store_test.go | 2 +- payments/db/payment_test.go | 12 ++++++------ payments/db/sql_store.go | 4 +--- routing/control_tower.go | 12 ++++++------ routing/control_tower_test.go | 6 ++++-- routing/mock_test.go | 10 ++++++---- routing/payment_lifecycle.go | 4 +++- routing/router.go | 4 +++- 10 files changed, 33 insertions(+), 26 deletions(-) diff --git a/payments/db/interface.go b/payments/db/interface.go index 452a7e5f79a..45d0e9a8a6b 100644 --- a/payments/db/interface.go +++ b/payments/db/interface.go @@ -90,7 +90,8 @@ type PaymentControl interface { *HTLCSettleInfo) (*MPPayment, error) // FailAttempt marks the given payment attempt failed. - FailAttempt(lntypes.Hash, uint64, *HTLCFailInfo) (*MPPayment, error) + FailAttempt(context.Context, lntypes.Hash, uint64, + *HTLCFailInfo) (*MPPayment, error) // Fail transitions a payment into the Failed state, and records // the ultimate reason the payment failed. Note that this should only diff --git a/payments/db/kv_store.go b/payments/db/kv_store.go index 3739232639c..59fe24f36ed 100644 --- a/payments/db/kv_store.go +++ b/payments/db/kv_store.go @@ -443,7 +443,7 @@ func (p *KVStore) SettleAttempt(_ context.Context, hash lntypes.Hash, } // FailAttempt marks the given payment attempt failed. -func (p *KVStore) FailAttempt(hash lntypes.Hash, +func (p *KVStore) FailAttempt(_ context.Context, hash lntypes.Hash, attemptID uint64, failInfo *HTLCFailInfo) (*MPPayment, error) { var b bytes.Buffer diff --git a/payments/db/kv_store_test.go b/payments/db/kv_store_test.go index fb7a2757338..ee8412a6fb3 100644 --- a/payments/db/kv_store_test.go +++ b/payments/db/kv_store_test.go @@ -100,7 +100,7 @@ func TestKVStoreDeleteNonInFlight(t *testing.T) { // Fail the payment attempt. htlcFailure := HTLCFailUnreadable _, err := paymentDB.FailAttempt( - info.PaymentIdentifier, attempt.AttemptID, + ctx, info.PaymentIdentifier, attempt.AttemptID, &HTLCFailInfo{ Reason: htlcFailure, }, diff --git a/payments/db/payment_test.go b/payments/db/payment_test.go index 159f9709285..879dfb12448 100644 --- a/payments/db/payment_test.go +++ b/payments/db/payment_test.go @@ -156,7 +156,7 @@ func createTestPayments(t *testing.T, p DB, payments []*payment) { htlcFailure := HTLCFailUnreadable _, err = p.FailAttempt( - info.PaymentIdentifier, attempt.AttemptID, + ctx, info.PaymentIdentifier, attempt.AttemptID, &HTLCFailInfo{ Reason: htlcFailure, }, @@ -183,7 +183,7 @@ func createTestPayments(t *testing.T, p DB, payments []*payment) { case StatusFailed: htlcFailure := HTLCFailUnreadable _, err = p.FailAttempt( - info.PaymentIdentifier, attempt.AttemptID, + ctx, info.PaymentIdentifier, attempt.AttemptID, &HTLCFailInfo{ Reason: htlcFailure, }, @@ -1885,7 +1885,7 @@ func TestSwitchFail(t *testing.T) { htlcReason := HTLCFailUnreadable _, err = paymentDB.FailAttempt( - info.PaymentIdentifier, attempt.AttemptID, + ctx, info.PaymentIdentifier, attempt.AttemptID, &HTLCFailInfo{ Reason: htlcReason, }, @@ -2069,7 +2069,7 @@ func TestMultiShard(t *testing.T) { a := attempts[1] htlcFail := HTLCFailUnreadable _, err = paymentDB.FailAttempt( - info.PaymentIdentifier, a.AttemptID, + ctx, info.PaymentIdentifier, a.AttemptID, &HTLCFailInfo{ Reason: htlcFail, }, @@ -2118,7 +2118,7 @@ func TestMultiShard(t *testing.T) { ) } else { _, err := paymentDB.FailAttempt( - info.PaymentIdentifier, a.AttemptID, + ctx, info.PaymentIdentifier, a.AttemptID, &HTLCFailInfo{ Reason: htlcFail, }, @@ -2209,7 +2209,7 @@ func TestMultiShard(t *testing.T) { } else { // Fail the attempt. _, err := paymentDB.FailAttempt( - info.PaymentIdentifier, a.AttemptID, + ctx, info.PaymentIdentifier, a.AttemptID, &HTLCFailInfo{ Reason: htlcFail, }, diff --git a/payments/db/sql_store.go b/payments/db/sql_store.go index ca5add1d156..a921a12335b 100644 --- a/payments/db/sql_store.go +++ b/payments/db/sql_store.go @@ -1695,11 +1695,9 @@ func (s *SQLStore) SettleAttempt(ctx context.Context, paymentHash lntypes.Hash, // the PaymentWriter interface and ultimately the DB interface. It represents // step 3b in the payment lifecycle control flow (step 3a is SettleAttempt), // called after RegisterAttempt when an HTLC fails. -func (s *SQLStore) FailAttempt(paymentHash lntypes.Hash, +func (s *SQLStore) FailAttempt(ctx context.Context, paymentHash lntypes.Hash, attemptID uint64, failInfo *HTLCFailInfo) (*MPPayment, error) { - ctx := context.TODO() - var mpPayment *MPPayment err := s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error { diff --git a/routing/control_tower.go b/routing/control_tower.go index 163aec3e753..cbb79d4c79a 100644 --- a/routing/control_tower.go +++ b/routing/control_tower.go @@ -50,8 +50,8 @@ type ControlTower interface { // FailAttempt marks the given payment attempt failed. // // NOTE: Subscribers should be notified by the new state of the payment. - FailAttempt(lntypes.Hash, uint64, *paymentsdb.HTLCFailInfo) ( - *paymentsdb.HTLCAttempt, error) + FailAttempt(context.Context, lntypes.Hash, uint64, + *paymentsdb.HTLCFailInfo) (*paymentsdb.HTLCAttempt, error) // FetchPayment fetches the payment corresponding to the given payment // hash. @@ -239,14 +239,14 @@ func (p *controlTower) SettleAttempt(ctx context.Context, } // FailAttempt marks the given payment attempt failed. -func (p *controlTower) FailAttempt(paymentHash lntypes.Hash, - attemptID uint64, failInfo *paymentsdb.HTLCFailInfo) ( - *paymentsdb.HTLCAttempt, error) { +func (p *controlTower) FailAttempt(ctx context.Context, + paymentHash lntypes.Hash, attemptID uint64, + failInfo *paymentsdb.HTLCFailInfo) (*paymentsdb.HTLCAttempt, error) { p.paymentsMtx.Lock(paymentHash) defer p.paymentsMtx.Unlock(paymentHash) - payment, err := p.db.FailAttempt(paymentHash, attemptID, failInfo) + payment, err := p.db.FailAttempt(ctx, paymentHash, attemptID, failInfo) if err != nil { return nil, err } diff --git a/routing/control_tower_test.go b/routing/control_tower_test.go index 20e8e82e42a..5241d814dff 100644 --- a/routing/control_tower_test.go +++ b/routing/control_tower_test.go @@ -448,7 +448,8 @@ func TestKVStoreUnsubscribeSuccess(t *testing.T) { Reason: paymentsdb.HTLCFailInternal, } _, err = pControl.FailAttempt( - info.PaymentIdentifier, attempt.AttemptID, &failInfo, + t.Context(), info.PaymentIdentifier, attempt.AttemptID, + &failInfo, ) require.NoError(t, err, "unable to fail htlc") @@ -502,7 +503,8 @@ func testKVStoreSubscribeFail(t *testing.T, registerAttempt, Reason: paymentsdb.HTLCFailInternal, } htlcAttempt, err := pControl.FailAttempt( - info.PaymentIdentifier, attempt.AttemptID, &failInfo, + t.Context(), info.PaymentIdentifier, attempt.AttemptID, + &failInfo, ) if err != nil { t.Fatalf("unable to fail htlc: %v", err) diff --git a/routing/mock_test.go b/routing/mock_test.go index 77f98ab017d..f10c38ad0d4 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -451,8 +451,9 @@ func (m *mockControlTowerOld) SettleAttempt(_ context.Context, return nil, fmt.Errorf("pid not found") } -func (m *mockControlTowerOld) FailAttempt(phash lntypes.Hash, pid uint64, - failInfo *paymentsdb.HTLCFailInfo) (*paymentsdb.HTLCAttempt, error) { +func (m *mockControlTowerOld) FailAttempt(_ context.Context, phash lntypes.Hash, + pid uint64, failInfo *paymentsdb.HTLCFailInfo) (*paymentsdb.HTLCAttempt, + error) { if m.failAttempt != nil { m.failAttempt <- failAttemptArgs{failInfo} @@ -767,8 +768,9 @@ func (m *mockControlTower) SettleAttempt(_ context.Context, phash lntypes.Hash, return attempt.(*paymentsdb.HTLCAttempt), args.Error(1) } -func (m *mockControlTower) FailAttempt(phash lntypes.Hash, pid uint64, - failInfo *paymentsdb.HTLCFailInfo) (*paymentsdb.HTLCAttempt, error) { +func (m *mockControlTower) FailAttempt(_ context.Context, phash lntypes.Hash, + pid uint64, failInfo *paymentsdb.HTLCFailInfo) (*paymentsdb.HTLCAttempt, + error) { args := m.Called(phash, pid, failInfo) diff --git a/routing/payment_lifecycle.go b/routing/payment_lifecycle.go index a2e3935a0b1..904d399cd13 100644 --- a/routing/payment_lifecycle.go +++ b/routing/payment_lifecycle.go @@ -1003,6 +1003,8 @@ func (p *paymentLifecycle) handleFailureMessage(rt *route.Route, func (p *paymentLifecycle) failAttempt(attemptID uint64, sendError error) (*attemptResult, error) { + ctx := context.TODO() + log.Warnf("Attempt %v for payment %v failed: %v", attemptID, p.identifier, sendError) @@ -1019,7 +1021,7 @@ func (p *paymentLifecycle) failAttempt(attemptID uint64, } attempt, err := p.router.cfg.Control.FailAttempt( - p.identifier, attemptID, failInfo, + ctx, p.identifier, attemptID, failInfo, ) if err != nil { return nil, err diff --git a/routing/router.go b/routing/router.go index 55d58e78bda..e48b7b43dbb 100644 --- a/routing/router.go +++ b/routing/router.go @@ -1531,6 +1531,8 @@ func (r *ChannelRouter) resumePayments() error { func (r *ChannelRouter) failStaleAttempt(a paymentsdb.HTLCAttempt, payHash lntypes.Hash) { + ctx := context.TODO() + // We can only fail inflight HTLCs so we skip the settled/failed ones. if a.Failure != nil || a.Settle != nil { return @@ -1614,7 +1616,7 @@ func (r *ChannelRouter) failStaleAttempt(a paymentsdb.HTLCAttempt, Reason: paymentsdb.HTLCFailUnknown, FailTime: r.cfg.Clock.Now(), } - _, err = r.cfg.Control.FailAttempt(payHash, a.AttemptID, failInfo) + _, err = r.cfg.Control.FailAttempt(ctx, payHash, a.AttemptID, failInfo) if err != nil { log.Errorf("Fail attempt=%v got error: %v", a.AttemptID, err) } From 937d7127fae31abcc2612be3a7d76861c4310ca7 Mon Sep 17 00:00:00 2001 From: ziggie Date: Tue, 21 Oct 2025 10:11:32 +0200 Subject: [PATCH 09/11] multi: thread context through Fail payment functions --- payments/db/interface.go | 2 +- payments/db/kv_store.go | 2 +- payments/db/kv_store_test.go | 2 +- payments/db/payment_test.go | 13 +++++++------ payments/db/sql_store.go | 4 +--- routing/control_tower.go | 9 +++++---- routing/control_tower_test.go | 3 ++- routing/mock_test.go | 4 ++-- routing/payment_lifecycle.go | 21 ++++++++++++++++++--- routing/router.go | 4 +++- 10 files changed, 41 insertions(+), 23 deletions(-) diff --git a/payments/db/interface.go b/payments/db/interface.go index 45d0e9a8a6b..2d2c47b3b70 100644 --- a/payments/db/interface.go +++ b/payments/db/interface.go @@ -99,7 +99,7 @@ type PaymentControl interface { // invoking this method, InitPayment should return nil on its next call // for this payment hash, allowing the user to make a subsequent // payment. - Fail(lntypes.Hash, FailureReason) (*MPPayment, error) + Fail(context.Context, lntypes.Hash, FailureReason) (*MPPayment, error) // DeleteFailedAttempts removes all failed HTLCs from the db. It should // be called for a given payment whenever all inflight htlcs are diff --git a/payments/db/kv_store.go b/payments/db/kv_store.go index 59fe24f36ed..285074b51e3 100644 --- a/payments/db/kv_store.go +++ b/payments/db/kv_store.go @@ -528,7 +528,7 @@ func (p *KVStore) updateHtlcKey(paymentHash lntypes.Hash, // payment failed. After invoking this method, InitPayment should return nil on // its next call for this payment hash, allowing the switch to make a // subsequent payment. -func (p *KVStore) Fail(paymentHash lntypes.Hash, +func (p *KVStore) Fail(_ context.Context, paymentHash lntypes.Hash, reason FailureReason) (*MPPayment, error) { var ( diff --git a/payments/db/kv_store_test.go b/payments/db/kv_store_test.go index ee8412a6fb3..de3fc4ad24f 100644 --- a/payments/db/kv_store_test.go +++ b/payments/db/kv_store_test.go @@ -112,7 +112,7 @@ func TestKVStoreDeleteNonInFlight(t *testing.T) { // Fail the payment, which should moved it to Failed. failReason := FailureReasonNoRoute _, err = paymentDB.Fail( - info.PaymentIdentifier, failReason, + ctx, info.PaymentIdentifier, failReason, ) if err != nil { t.Fatalf("unable to fail payment hash: %v", err) diff --git a/payments/db/payment_test.go b/payments/db/payment_test.go index 879dfb12448..5f1ab69c164 100644 --- a/payments/db/payment_test.go +++ b/payments/db/payment_test.go @@ -191,8 +191,9 @@ func createTestPayments(t *testing.T, p DB, payments []*payment) { require.NoError(t, err, "unable to fail htlc") failReason := FailureReasonNoRoute - _, err = p.Fail(info.PaymentIdentifier, - failReason) + _, err = p.Fail( + ctx, info.PaymentIdentifier, failReason, + ) require.NoError(t, err, "unable to fail payment hash") // Settle the attempt @@ -1667,7 +1668,7 @@ func TestFailsWithoutInFlight(t *testing.T) { // Calling Fail should return an error. _, err = paymentDB.Fail( - info.PaymentIdentifier, FailureReasonNoRoute, + t.Context(), info.PaymentIdentifier, FailureReasonNoRoute, ) require.ErrorIs(t, err, ErrPaymentNotInitiated) } @@ -1843,7 +1844,7 @@ func TestSwitchFail(t *testing.T) { // Fail the payment, which should moved it to Failed. failReason := FailureReasonNoRoute - _, err = paymentDB.Fail(info.PaymentIdentifier, failReason) + _, err = paymentDB.Fail(ctx, info.PaymentIdentifier, failReason) require.NoError(t, err, "unable to fail payment hash") // Verify the status is indeed Failed. @@ -2139,7 +2140,7 @@ func TestMultiShard(t *testing.T) { // a terminal state. failReason := FailureReasonNoRoute _, err = paymentDB.Fail( - info.PaymentIdentifier, failReason, + ctx, info.PaymentIdentifier, failReason, ) if err != nil { t.Fatalf("unable to fail payment hash: %v", err) @@ -2232,7 +2233,7 @@ func TestMultiShard(t *testing.T) { // syncing. failReason := FailureReasonPaymentDetails _, err = paymentDB.Fail( - info.PaymentIdentifier, failReason, + ctx, info.PaymentIdentifier, failReason, ) require.NoError(t, err, "unable to fail") } diff --git a/payments/db/sql_store.go b/payments/db/sql_store.go index a921a12335b..a9863b53e30 100644 --- a/payments/db/sql_store.go +++ b/payments/db/sql_store.go @@ -1782,11 +1782,9 @@ func (s *SQLStore) FailAttempt(ctx context.Context, paymentHash lntypes.Hash, // This method is part of the PaymentControl interface, which is embedded in // the PaymentWriter interface and ultimately the DB interface. It represents // step 4 in the payment lifecycle control flow. -func (s *SQLStore) Fail(paymentHash lntypes.Hash, +func (s *SQLStore) Fail(ctx context.Context, paymentHash lntypes.Hash, reason FailureReason) (*MPPayment, error) { - ctx := context.TODO() - var mpPayment *MPPayment err := s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error { diff --git a/routing/control_tower.go b/routing/control_tower.go index cbb79d4c79a..b39a378bd47 100644 --- a/routing/control_tower.go +++ b/routing/control_tower.go @@ -66,7 +66,8 @@ type ControlTower interface { // payment. // // NOTE: Subscribers should be notified by the new state of the payment. - FailPayment(lntypes.Hash, paymentsdb.FailureReason) error + FailPayment(context.Context, lntypes.Hash, + paymentsdb.FailureReason) error // FetchInFlightPayments returns all payments with status InFlight. FetchInFlightPayments(ctx context.Context) ([]*paymentsdb.MPPayment, @@ -272,13 +273,13 @@ func (p *controlTower) FetchPayment(ctx context.Context, // // NOTE: This method will overwrite the failure reason if the payment is already // failed. -func (p *controlTower) FailPayment(paymentHash lntypes.Hash, - reason paymentsdb.FailureReason) error { +func (p *controlTower) FailPayment(ctx context.Context, + paymentHash lntypes.Hash, reason paymentsdb.FailureReason) error { p.paymentsMtx.Lock(paymentHash) defer p.paymentsMtx.Unlock(paymentHash) - payment, err := p.db.Fail(paymentHash, reason) + payment, err := p.db.Fail(ctx, paymentHash, reason) if err != nil { return err } diff --git a/routing/control_tower_test.go b/routing/control_tower_test.go index 5241d814dff..c9e8f485733 100644 --- a/routing/control_tower_test.go +++ b/routing/control_tower_test.go @@ -516,7 +516,8 @@ func testKVStoreSubscribeFail(t *testing.T, registerAttempt, // Mark the payment as failed. err = pControl.FailPayment( - info.PaymentIdentifier, paymentsdb.FailureReasonTimeout, + t.Context(), info.PaymentIdentifier, + paymentsdb.FailureReasonTimeout, ) if err != nil { t.Fatal(err) diff --git a/routing/mock_test.go b/routing/mock_test.go index f10c38ad0d4..e72b392496f 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -491,7 +491,7 @@ func (m *mockControlTowerOld) FailAttempt(_ context.Context, phash lntypes.Hash, return nil, fmt.Errorf("pid not found") } -func (m *mockControlTowerOld) FailPayment(phash lntypes.Hash, +func (m *mockControlTowerOld) FailPayment(_ context.Context, phash lntypes.Hash, reason paymentsdb.FailureReason) error { m.Lock() @@ -782,7 +782,7 @@ func (m *mockControlTower) FailAttempt(_ context.Context, phash lntypes.Hash, return attempt.(*paymentsdb.HTLCAttempt), args.Error(1) } -func (m *mockControlTower) FailPayment(phash lntypes.Hash, +func (m *mockControlTower) FailPayment(_ context.Context, phash lntypes.Hash, reason paymentsdb.FailureReason) error { args := m.Called(phash, reason) diff --git a/routing/payment_lifecycle.go b/routing/payment_lifecycle.go index 904d399cd13..37dbd1c8ab4 100644 --- a/routing/payment_lifecycle.go +++ b/routing/payment_lifecycle.go @@ -364,11 +364,18 @@ func (p *paymentLifecycle) checkContext(ctx context.Context) error { p.identifier.String()) } + // The context is already cancelled at this point, so we create + // a new context so the payment can successfully be marked as + // failed. + cleanupCtx := context.WithoutCancel(ctx) + // By marking the payment failed, depending on whether it has // inflight HTLCs or not, its status will now either be // `StatusInflight` or `StatusFailed`. In either case, no more // HTLCs will be attempted. - err := p.router.cfg.Control.FailPayment(p.identifier, reason) + err := p.router.cfg.Control.FailPayment( + cleanupCtx, p.identifier, reason, + ) if err != nil { return fmt.Errorf("FailPayment got %w", err) } @@ -389,6 +396,8 @@ func (p *paymentLifecycle) checkContext(ctx context.Context) error { func (p *paymentLifecycle) requestRoute( ps *paymentsdb.MPPaymentState) (*route.Route, error) { + ctx := context.TODO() + remainingFees := p.calcFeeBudget(ps.FeesPaid) // Query our payment session to construct a route. @@ -430,7 +439,9 @@ func (p *paymentLifecycle) requestRoute( log.Warnf("Marking payment %v permanently failed with no route: %v", p.identifier, failureCode) - err = p.router.cfg.Control.FailPayment(p.identifier, failureCode) + err = p.router.cfg.Control.FailPayment( + ctx, p.identifier, failureCode, + ) if err != nil { return nil, fmt.Errorf("FailPayment got: %w", err) } @@ -800,6 +811,8 @@ func (p *paymentLifecycle) failPaymentAndAttempt( attemptID uint64, reason *paymentsdb.FailureReason, sendErr error) (*attemptResult, error) { + ctx := context.TODO() + log.Errorf("Payment %v failed: final_outcome=%v, raw_err=%v", p.identifier, *reason, sendErr) @@ -808,7 +821,9 @@ func (p *paymentLifecycle) failPaymentAndAttempt( // NOTE: we must fail the payment first before failing the attempt. // Otherwise, once the attempt is marked as failed, another goroutine // might make another attempt while we are failing the payment. - err := p.router.cfg.Control.FailPayment(p.identifier, *reason) + err := p.router.cfg.Control.FailPayment( + ctx, p.identifier, *reason, + ) if err != nil { log.Errorf("Unable to fail payment: %v", err) return nil, err diff --git a/routing/router.go b/routing/router.go index e48b7b43dbb..acd572e6380 100644 --- a/routing/router.go +++ b/routing/router.go @@ -1088,7 +1088,9 @@ func (r *ChannelRouter) sendToRoute(htlcHash lntypes.Hash, rt *route.Route, return nil } - return r.cfg.Control.FailPayment(paymentIdentifier, reason) + return r.cfg.Control.FailPayment( + ctx, paymentIdentifier, reason, + ) } log.Debugf("SendToRoute for payment %v with skipTempErr=%v", From 08db1c462400ef9560d5ba0280200aadc513d2a7 Mon Sep 17 00:00:00 2001 From: ziggie Date: Tue, 21 Oct 2025 10:18:02 +0200 Subject: [PATCH 10/11] multi: thread context through DeleteFailedAttempts --- payments/db/interface.go | 2 +- payments/db/kv_store.go | 6 ++++-- payments/db/payment_test.go | 26 +++++++++++++++++++------- payments/db/sql_store.go | 4 ++-- routing/control_tower.go | 8 +++++--- routing/mock_test.go | 8 ++++++-- routing/payment_lifecycle.go | 8 +++++++- 7 files changed, 44 insertions(+), 18 deletions(-) diff --git a/payments/db/interface.go b/payments/db/interface.go index 2d2c47b3b70..6edaa7f45b5 100644 --- a/payments/db/interface.go +++ b/payments/db/interface.go @@ -104,7 +104,7 @@ type PaymentControl interface { // DeleteFailedAttempts removes all failed HTLCs from the db. It should // be called for a given payment whenever all inflight htlcs are // completed, and the payment has reached a final terminal state. - DeleteFailedAttempts(lntypes.Hash) error + DeleteFailedAttempts(context.Context, lntypes.Hash) error } // DBMPPayment is an interface that represents the payment state during a diff --git a/payments/db/kv_store.go b/payments/db/kv_store.go index 285074b51e3..0ce0601e498 100644 --- a/payments/db/kv_store.go +++ b/payments/db/kv_store.go @@ -290,12 +290,14 @@ func (p *KVStore) InitPayment(_ context.Context, paymentHash lntypes.Hash, // DeleteFailedAttempts deletes all failed htlcs for a payment if configured // by the KVStore db. -func (p *KVStore) DeleteFailedAttempts(hash lntypes.Hash) error { +func (p *KVStore) DeleteFailedAttempts(ctx context.Context, + hash lntypes.Hash) error { + // TODO(ziggie): Refactor to not mix application logic with database // logic. This decision should be made in the application layer. if !p.keepFailedPaymentAttempts { const failedHtlcsOnly = true - err := p.DeletePayment(context.TODO(), hash, failedHtlcsOnly) + err := p.DeletePayment(ctx, hash, failedHtlcsOnly) if err != nil { return err } diff --git a/payments/db/payment_test.go b/payments/db/payment_test.go index 5f1ab69c164..25aafbb5464 100644 --- a/payments/db/payment_test.go +++ b/payments/db/payment_test.go @@ -503,7 +503,9 @@ func testDeleteFailedAttempts(t *testing.T, keepFailedPaymentAttempts bool) { // Calling DeleteFailedAttempts on a failed payment should delete all // HTLCs. - require.NoError(t, paymentDB.DeleteFailedAttempts(payments[0].id)) + require.NoError(t, paymentDB.DeleteFailedAttempts( + t.Context(), payments[0].id, + )) // Expect all HTLCs to be deleted if the config is set to delete them. if !keepFailedPaymentAttempts { @@ -518,11 +520,15 @@ func testDeleteFailedAttempts(t *testing.T, keepFailedPaymentAttempts bool) { // operation are performed in general therefore we do NOT expect an // error in this case. if keepFailedPaymentAttempts { - require.NoError(t, paymentDB.DeleteFailedAttempts( - payments[1].id), + err := paymentDB.DeleteFailedAttempts( + t.Context(), payments[1].id, ) + require.NoError(t, err) } else { - require.Error(t, paymentDB.DeleteFailedAttempts(payments[1].id)) + err := paymentDB.DeleteFailedAttempts( + t.Context(), payments[1].id, + ) + require.Error(t, err) } // Since DeleteFailedAttempts returned an error, we should expect the @@ -530,7 +536,9 @@ func testDeleteFailedAttempts(t *testing.T, keepFailedPaymentAttempts bool) { assertDBPayments(t, paymentDB, payments) // Cleaning up a successful payment should remove failed htlcs. - require.NoError(t, paymentDB.DeleteFailedAttempts(payments[2].id)) + require.NoError(t, paymentDB.DeleteFailedAttempts( + t.Context(), payments[2].id, + )) // Expect all HTLCs except for the settled one to be deleted if the // config is set to delete them. @@ -547,13 +555,17 @@ func testDeleteFailedAttempts(t *testing.T, keepFailedPaymentAttempts bool) { // payments, if the control tower is configured to keep failed // HTLCs. require.NoError( - t, paymentDB.DeleteFailedAttempts(lntypes.ZeroHash), + t, paymentDB.DeleteFailedAttempts( + t.Context(), lntypes.ZeroHash, + ), ) } else { // Attempting to cleanup a non-existent payment returns an // error. require.Error( - t, paymentDB.DeleteFailedAttempts(lntypes.ZeroHash), + t, paymentDB.DeleteFailedAttempts( + t.Context(), lntypes.ZeroHash, + ), ) } } diff --git a/payments/db/sql_store.go b/payments/db/sql_store.go index a9863b53e30..d23e80895f9 100644 --- a/payments/db/sql_store.go +++ b/payments/db/sql_store.go @@ -1106,8 +1106,8 @@ func (s *SQLStore) FetchInFlightPayments(ctx context.Context) ([]*MPPayment, // the final step (step 5) in the payment lifecycle control flow and should be // called after a payment reaches a terminal state (succeeded or permanently // failed) to clean up historical failed attempts. -func (s *SQLStore) DeleteFailedAttempts(paymentHash lntypes.Hash) error { - ctx := context.TODO() +func (s *SQLStore) DeleteFailedAttempts(ctx context.Context, + paymentHash lntypes.Hash) error { // In case we are configured to keep failed payment attempts, we exit // early. diff --git a/routing/control_tower.go b/routing/control_tower.go index b39a378bd47..1c246f17d9b 100644 --- a/routing/control_tower.go +++ b/routing/control_tower.go @@ -26,7 +26,7 @@ type ControlTower interface { // DeleteFailedAttempts removes all failed HTLCs from the db. It should // be called for a given payment whenever all inflight htlcs are // completed, and the payment has reached a final settled state. - DeleteFailedAttempts(lntypes.Hash) error + DeleteFailedAttempts(context.Context, lntypes.Hash) error // RegisterAttempt atomically records the provided HTLCAttemptInfo. // @@ -192,8 +192,10 @@ func (p *controlTower) InitPayment(ctx context.Context, // DeleteFailedAttempts deletes all failed htlcs if the payment was // successfully settled. -func (p *controlTower) DeleteFailedAttempts(paymentHash lntypes.Hash) error { - return p.db.DeleteFailedAttempts(paymentHash) +func (p *controlTower) DeleteFailedAttempts(ctx context.Context, + paymentHash lntypes.Hash) error { + + return p.db.DeleteFailedAttempts(ctx, paymentHash) } // RegisterAttempt atomically records the provided HTLCAttemptInfo to the diff --git a/routing/mock_test.go b/routing/mock_test.go index e72b392496f..472f1261623 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -328,7 +328,9 @@ func (m *mockControlTowerOld) InitPayment(_ context.Context, return nil } -func (m *mockControlTowerOld) DeleteFailedAttempts(phash lntypes.Hash) error { +func (m *mockControlTowerOld) DeleteFailedAttempts(_ context.Context, + phash lntypes.Hash) error { + p, ok := m.payments[phash] if !ok { return paymentsdb.ErrPaymentNotInitiated @@ -742,7 +744,9 @@ func (m *mockControlTower) InitPayment(_ context.Context, phash lntypes.Hash, return args.Error(0) } -func (m *mockControlTower) DeleteFailedAttempts(phash lntypes.Hash) error { +func (m *mockControlTower) DeleteFailedAttempts(_ context.Context, + phash lntypes.Hash) error { + args := m.Called(phash) return args.Error(0) } diff --git a/routing/payment_lifecycle.go b/routing/payment_lifecycle.go index 37dbd1c8ab4..6405e850687 100644 --- a/routing/payment_lifecycle.go +++ b/routing/payment_lifecycle.go @@ -190,6 +190,10 @@ func (p *paymentLifecycle) decideNextStep( func (p *paymentLifecycle) resumePayment(ctx context.Context) ([32]byte, *route.Route, error) { + // We need to make sure we can still do db operations after the context + // is cancelled. + cleanupCtx := context.WithoutCancel(ctx) + // When the payment lifecycle loop exits, we make sure to signal any // sub goroutine of the HTLC attempt to exit, then wait for them to // return. @@ -328,7 +332,9 @@ lifecycle: // Optionally delete the failed attempts from the database. Depends on // the database options deleting attempts is not allowed so this will // just be a no-op. - err = p.router.cfg.Control.DeleteFailedAttempts(p.identifier) + err = p.router.cfg.Control.DeleteFailedAttempts( + cleanupCtx, p.identifier, + ) if err != nil { log.Errorf("Error deleting failed htlc attempts for payment "+ "%v: %v", p.identifier, err) From 736f6452e270c1d897421f223c3343c080ad4215 Mon Sep 17 00:00:00 2001 From: ziggie Date: Tue, 21 Oct 2025 10:42:44 +0200 Subject: [PATCH 11/11] docs: add release notes --- docs/release-notes/release-notes-0.21.0.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/release-notes/release-notes-0.21.0.md b/docs/release-notes/release-notes-0.21.0.md index 436c027c169..1057ccc47e4 100644 --- a/docs/release-notes/release-notes-0.21.0.md +++ b/docs/release-notes/release-notes-0.21.0.md @@ -83,6 +83,9 @@ functions](https://github.com/lightningnetwork/lnd/pull/10368) * Finalize SQL payments implementation [enabling unit and itests for SQL backend](https://github.com/lightningnetwork/lnd/pull/10292) + * [Thread context through payment + db functions Part 1](https://github.com/lightningnetwork/lnd/pull/10307) + ## Code Health