Skip to content

Commit 38b3a77

Browse files
committed
assets+loopdb: handle asset deposit expiry and timeout sweep
With this commit, asset deposit expiry is checked on each new block. When a deposit expires, a timeout sweep is published. The deposit state is updated both when the sweep is first published and again upon its confirmation.
1 parent e629677 commit 38b3a77

File tree

5 files changed

+293
-0
lines changed

5 files changed

+293
-0
lines changed

assets/deposit/manager.go

Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/lightninglabs/taproot-assets/rpcutils"
2121
"github.com/lightninglabs/taproot-assets/taprpc"
2222
"github.com/lightninglabs/taproot-assets/taprpc/assetwalletrpc"
23+
"google.golang.org/protobuf/proto"
2324
)
2425

2526
var (
@@ -31,6 +32,10 @@ var (
3132
// shutting down and that no further calls should be made to it.
3233
ErrManagerShuttingDown = errors.New("asset deposit manager is " +
3334
"shutting down")
35+
36+
// lockExpiration us the expiration time we use for sweep fee
37+
// paying inputs.
38+
lockExpiration = time.Hour * 24
3439
)
3540

3641
// DepositUpdateCallback is a callback that is called when a deposit state is
@@ -67,6 +72,10 @@ type Manager struct {
6772
// currentHeight is the current block height of the chain.
6873
currentHeight uint32
6974

75+
// pendingSweeps is a map of all pending timeout sweeps. The key is the
76+
// deposit ID.
77+
pendingSweeps map[string]struct{}
78+
7079
// deposits is a map of all active deposits. The key is the deposit ID.
7180
deposits map[string]*Deposit
7281

@@ -114,6 +123,7 @@ func NewManager(depositServiceClient swapserverrpc.AssetDepositServiceClient,
114123
sweeper: sweeper,
115124
addressParams: addressParams,
116125
deposits: make(map[string]*Deposit),
126+
pendingSweeps: make(map[string]struct{}),
117127
subscribers: make(map[string][]DepositUpdateCallback),
118128
callEnter: make(chan struct{}),
119129
callLeave: make(chan struct{}),
@@ -217,6 +227,43 @@ func (m *Manager) criticalError(err error) {
217227

218228
// handleBlockEpoch is called when a new block is added to the chain.
219229
func (m *Manager) handleBlockEpoch(ctx context.Context, height uint32) error {
230+
for _, d := range m.deposits {
231+
if d.State != StateConfirmed {
232+
continue
233+
}
234+
235+
log.Debugf("Checking if deposit %v is expired, expiry=%v", d.ID,
236+
d.ConfirmationHeight+d.CsvExpiry)
237+
238+
if height < d.ConfirmationHeight+d.CsvExpiry {
239+
continue
240+
}
241+
242+
err := m.handleDepositExpired(ctx, d)
243+
if err != nil {
244+
log.Errorf("Unable to update deposit %v state: %v",
245+
d.ID, err)
246+
247+
return err
248+
}
249+
}
250+
251+
// Now publish the timeout sweeps for all expired deposits and also
252+
// move them to the pending sweeps map.
253+
for _, d := range m.deposits {
254+
// TODO(bhandras): republish will insert a new transfer entry in
255+
// tapd, despite the transfer already existing. To avoid that,
256+
// we won't re-publish the timeout sweep for now.
257+
if d.State != StateExpired {
258+
continue
259+
}
260+
261+
err := m.publishTimeoutSweep(ctx, d)
262+
if err != nil {
263+
return err
264+
}
265+
}
266+
220267
return nil
221268
}
222269

@@ -700,3 +747,209 @@ func (m *Manager) ListDeposits(ctx context.Context, minConfs, maxConfs uint32) (
700747

701748
return filteredDeposits, nil
702749
}
750+
751+
// handleDepositStateUpdate updates the deposit state in the store and
752+
// notifies all subscribers of the deposit state change.
753+
func (m *Manager) handleDepositExpired(ctx context.Context, d *Deposit) error {
754+
d.State = StateExpired
755+
err := d.GenerateSweepKeys(ctx, m.tapClient)
756+
if err != nil {
757+
log.Errorf("Unable to generate sweep keys for deposit %v: %v",
758+
d.ID, err)
759+
}
760+
761+
return m.handleDepositStateUpdate(ctx, d)
762+
}
763+
764+
// publishTimeoutSweep publishes a timeout sweep for the deposit. As we use the
765+
// same lock ID for the sponsoring inputs, it's possible to republish the sweep
766+
// however it'll create a new transfer entry in tapd, which we want to avoid
767+
// (for now).
768+
func (m *Manager) publishTimeoutSweep(ctx context.Context, d *Deposit) error {
769+
log.Infof("(Re)publishing timeout sweep for deposit %v", d.ID)
770+
771+
// TODO(bhandras): conf target should be dynamic/configrable.
772+
const confTarget = 2
773+
feeRateSatPerKw, err := m.walletKit.EstimateFeeRate(
774+
ctx, confTarget,
775+
)
776+
777+
lockID, err := d.lockID()
778+
if err != nil {
779+
return err
780+
}
781+
782+
snedResp, err := m.sweeper.PublishDepositTimeoutSweep(
783+
ctx, d.Kit, d.Proof, asset.NewScriptKey(d.SweepScriptKey),
784+
d.SweepInternalKey, d.timeoutSweepLabel(),
785+
feeRateSatPerKw.FeePerVByte(), lockID, lockExpiration,
786+
)
787+
if err != nil {
788+
// TOOD(bhandras): handle republish errors.
789+
log.Infof("Unable to publish timeout sweep for deposit %v: %v",
790+
d.ID, err)
791+
} else {
792+
log.Infof("Published timeout sweep for deposit %v: %x", d.ID,
793+
snedResp.Transfer.AnchorTxHash)
794+
795+
// Update deposit state on first successful publish.
796+
if d.State != StateTimeoutSweepPublished {
797+
d.State = StateTimeoutSweepPublished
798+
err = m.handleDepositStateUpdate(ctx, d)
799+
if err != nil {
800+
log.Errorf("Unable to update deposit %v "+
801+
"state: %v", d.ID, err)
802+
803+
return err
804+
}
805+
}
806+
}
807+
808+
// Start monitoring the sweep unless we're already doing so.
809+
if _, ok := m.pendingSweeps[d.ID]; !ok {
810+
err := m.waitForDepositSweep(ctx, d, d.timeoutSweepLabel())
811+
if err != nil {
812+
log.Errorf("Unable to wait for deposit %v spend: %v",
813+
d.ID, err)
814+
815+
return err
816+
}
817+
818+
m.pendingSweeps[d.ID] = struct{}{}
819+
}
820+
821+
return nil
822+
}
823+
824+
// waitForDepositSpend waits for the deposit to be spent. It subscribes to
825+
// receive events for the deposit's sweep address notifying us once the transfer
826+
// has completed.
827+
func (m *Manager) waitForDepositSweep(ctx context.Context, d *Deposit,
828+
label string) error {
829+
830+
log.Infof("Waiting for deposit sweep confirmation %s", d.ID)
831+
832+
eventChan, errChan, err := m.tapClient.WaitForSendComplete(
833+
ctx, d.SweepScriptKey.SerializeCompressed(), label,
834+
)
835+
if err != nil {
836+
log.Errorf("unable to subscribe to send events for deposit "+
837+
"sweep: %v", err,
838+
)
839+
}
840+
841+
go func() {
842+
select {
843+
case event := <-eventChan:
844+
// At this point we can consider the deposit confirmed.
845+
err = m.handleDepositSpend(ctx, d, event.Transfer)
846+
if err != nil {
847+
m.criticalError(err)
848+
}
849+
850+
case err := <-errChan:
851+
m.criticalError(err)
852+
}
853+
}()
854+
855+
return nil
856+
}
857+
858+
func formatProtoJSON(resp proto.Message) (string, error) {
859+
jsonBytes, err := taprpc.ProtoJSONMarshalOpts.Marshal(resp)
860+
if err != nil {
861+
return "", err
862+
}
863+
864+
return string(jsonBytes), nil
865+
}
866+
867+
func toJSON(resp proto.Message) string {
868+
jsonStr, _ := formatProtoJSON(resp)
869+
870+
return jsonStr
871+
}
872+
873+
// handleDepositSpend is called when the deposit is spent. It updates the
874+
// deposit state and releases the inputs used for the deposit sweep.
875+
func (m *Manager) handleDepositSpend(ctx context.Context, d *Deposit,
876+
transfer *taprpc.AssetTransfer) error {
877+
878+
done, err := m.scheduleNextCall()
879+
if err != nil {
880+
log.Errorf("Unable to schedule next call: %v", err)
881+
882+
return err
883+
}
884+
defer done()
885+
886+
switch d.State {
887+
case StateTimeoutSweepPublished:
888+
d.State = StateSwept
889+
890+
err := m.releaseDepositSweepInputs(ctx, d)
891+
if err != nil {
892+
log.Errorf("Unable to release deposit sweep inputs: "+
893+
"%v", err)
894+
895+
return err
896+
}
897+
898+
default:
899+
err := fmt.Errorf("Spent deposit %s in unexpected state %s",
900+
d.ID, d.State)
901+
902+
log.Errorf(err.Error())
903+
904+
return err
905+
}
906+
907+
log.Tracef("Deposit %s spent in transfer: %s\n", d.ID, toJSON(transfer))
908+
909+
// TODO(bhandras): should save the spend details to the store?
910+
err = m.handleDepositStateUpdate(ctx, d)
911+
if err != nil {
912+
return err
913+
}
914+
915+
// Sanity check that the deposit is in the pending sweeps map.
916+
if _, ok := m.pendingSweeps[d.ID]; !ok {
917+
log.Errorf("Deposit %v not found in pending deposits", d.ID)
918+
}
919+
920+
// We can now remove the deposit from the pending sweeps map as we don't
921+
// need to monitor for the spend anymore.
922+
delete(m.pendingSweeps, d.ID)
923+
924+
return nil
925+
}
926+
927+
// releaseDepositSweepInputs releases the inputs that were used for the deposit
928+
// sweep.
929+
func (m *Manager) releaseDepositSweepInputs(ctx context.Context,
930+
d *Deposit) error {
931+
932+
lockID, err := d.lockID()
933+
if err != nil {
934+
return err
935+
}
936+
937+
leases, err := m.walletKit.ListLeases(ctx)
938+
if err != nil {
939+
return err
940+
}
941+
942+
for _, lease := range leases {
943+
if lease.LockID != lockID {
944+
continue
945+
}
946+
947+
// Unlock any UTXOs that were used for the deposit sweep.
948+
err = m.walletKit.ReleaseOutput(ctx, lockID, lease.Outpoint)
949+
if err != nil {
950+
return err
951+
}
952+
}
953+
954+
return nil
955+
}

assets/deposit/sql_store.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ type Querier interface {
2929

3030
GetAssetDeposits(ctx context.Context) ([]sqlc.GetAssetDepositsRow,
3131
error)
32+
33+
SetAssetDepositSweepKeys(ctx context.Context,
34+
arg sqlc.SetAssetDepositSweepKeysParams) error
3235
}
3336

3437
// DepositBaseDB is the interface that contains all the queries generated
@@ -127,6 +130,20 @@ func (s *SQLStore) UpdateDeposit(ctx context.Context, d *Deposit) error {
127130
if err != nil {
128131
return err
129132
}
133+
134+
case StateExpired:
135+
scriptKey := d.SweepScriptKey.SerializeCompressed()
136+
internalKey := d.SweepInternalKey.SerializeCompressed()
137+
err := tx.SetAssetDepositSweepKeys(
138+
ctx, sqlc.SetAssetDepositSweepKeysParams{
139+
DepositID: d.ID,
140+
SweepScriptPubkey: scriptKey,
141+
SweepInternalPubkey: internalKey,
142+
},
143+
)
144+
if err != nil {
145+
return err
146+
}
130147
}
131148

132149
return tx.UpdateDepositState(

loopdb/sqlc/asset_deposits.sql.go

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

loopdb/sqlc/querier.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

loopdb/sqlc/queries/asset_deposits.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,8 @@ JOIN asset_deposit_updates u ON u.id = (
3939
LIMIT 1
4040
)
4141
ORDER BY d.created_at ASC;
42+
43+
-- name: SetAssetDepositSweepKeys :exec
44+
UPDATE asset_deposits
45+
SET sweep_script_pubkey = $2, sweep_internal_pubkey = $3
46+
WHERE deposit_id = $1;

0 commit comments

Comments
 (0)