Skip to content

Commit cec8076

Browse files
committed
assets+loopdb: handle asset deposit expiry and timeout sweep
With this commit, asset deposit expiry is checked on each new block. When a deposit expires, a timeout sweep is published. The deposit state is updated both when the sweep is first published and again upon its confirmation.
1 parent 266bdb8 commit cec8076

File tree

5 files changed

+296
-0
lines changed

5 files changed

+296
-0
lines changed

assets/deposit/manager.go

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/lightninglabs/taproot-assets/rpcutils"
2121
"github.com/lightninglabs/taproot-assets/taprpc"
2222
"github.com/lightninglabs/taproot-assets/taprpc/assetwalletrpc"
23+
"google.golang.org/protobuf/proto"
2324
)
2425

2526
var (
@@ -31,6 +32,10 @@ var (
3132
// shutting down and that no further calls should be made to it.
3233
ErrManagerShuttingDown = errors.New("asset deposit manager is " +
3334
"shutting down")
35+
36+
// lockExpiration us the expiration time we use for sweep fee
37+
// paying inputs.
38+
lockExpiration = time.Hour * 24
3439
)
3540

3641
// DepositUpdateCallback is a callback that is called when a deposit state is
@@ -67,6 +72,10 @@ type Manager struct {
6772
// currentHeight is the current block height of the chain.
6873
currentHeight uint32
6974

75+
// pendingSweeps is a map of all pending timeout sweeps. The key is the
76+
// deposit ID.
77+
pendingSweeps map[string]struct{}
78+
7079
// deposits is a map of all active deposits. The key is the deposit ID.
7180
deposits map[string]*Deposit
7281

@@ -114,6 +123,7 @@ func NewManager(depositServiceClient swapserverrpc.AssetDepositServiceClient,
114123
sweeper: sweeper,
115124
addressParams: addressParams,
116125
deposits: make(map[string]*Deposit),
126+
pendingSweeps: make(map[string]struct{}),
117127
subscribers: make(map[string][]DepositUpdateCallback),
118128
callEnter: make(chan struct{}),
119129
callLeave: make(chan struct{}),
@@ -216,6 +226,43 @@ func (m *Manager) criticalError(err error) {
216226

217227
// handleBlockEpoch is called when a new block is added to the chain.
218228
func (m *Manager) handleBlockEpoch(ctx context.Context, height uint32) error {
229+
for _, d := range m.deposits {
230+
if d.State != StateConfirmed {
231+
continue
232+
}
233+
234+
log.Debugf("Checking if deposit %v is expired, expiry=%v", d.ID,
235+
d.ConfirmationHeight+d.CsvExpiry)
236+
237+
if height < d.ConfirmationHeight+d.CsvExpiry {
238+
continue
239+
}
240+
241+
err := m.handleDepositExpired(ctx, d)
242+
if err != nil {
243+
log.Errorf("Unable to update deposit %v state: %v",
244+
d.ID, err)
245+
246+
return err
247+
}
248+
}
249+
250+
// Now publish the timeout sweeps for all expired deposits and also
251+
// move them to the pending sweeps map.
252+
for _, d := range m.deposits {
253+
// TODO(bhandras): republish will insert a new transfer entry in
254+
// tapd, despite the transfer already existing. To avoid that,
255+
// we won't re-publish the timeout sweep for now.
256+
if d.State != StateExpired {
257+
continue
258+
}
259+
260+
err := m.publishTimeoutSweep(ctx, d)
261+
if err != nil {
262+
return err
263+
}
264+
}
265+
219266
return nil
220267
}
221268

@@ -699,3 +746,212 @@ func (m *Manager) ListDeposits(ctx context.Context, minConfs, maxConfs uint32) (
699746

700747
return filteredDeposits, nil
701748
}
749+
750+
// handleDepositStateUpdate updates the deposit state in the store and
751+
// notifies all subscribers of the deposit state change.
752+
func (m *Manager) handleDepositExpired(ctx context.Context, d *Deposit) error {
753+
d.State = StateExpired
754+
err := d.GenerateSweepKeys(ctx, m.tapClient)
755+
if err != nil {
756+
log.Errorf("Unable to generate sweep keys for deposit %v: %v",
757+
d.ID, err)
758+
}
759+
760+
return m.handleDepositStateUpdate(ctx, d)
761+
}
762+
763+
// publishTimeoutSweep publishes a timeout sweep for the deposit. As we use the
764+
// same lock ID for the sponsoring inputs, it's possible to republish the sweep
765+
// however it'll create a new transfer entry in tapd, which we want to avoid
766+
// (for now).
767+
func (m *Manager) publishTimeoutSweep(ctx context.Context, d *Deposit) error {
768+
log.Infof("(Re)publishing timeout sweep for deposit %v", d.ID)
769+
770+
// TODO(bhandras): conf target should be dynamic/configrable.
771+
const confTarget = 2
772+
feeRateSatPerKw, err := m.walletKit.EstimateFeeRate(
773+
ctx, confTarget,
774+
)
775+
if err != nil {
776+
return err
777+
}
778+
779+
lockID, err := d.lockID()
780+
if err != nil {
781+
return err
782+
}
783+
784+
snedResp, err := m.sweeper.PublishDepositTimeoutSweep(
785+
ctx, d.Kit, d.Proof, asset.NewScriptKey(d.SweepScriptKey),
786+
d.SweepInternalKey, d.timeoutSweepLabel(),
787+
feeRateSatPerKw.FeePerVByte(), lockID, lockExpiration,
788+
)
789+
if err != nil {
790+
// TODO(bhandras): handle republish errors.
791+
log.Infof("Unable to publish timeout sweep for deposit %v: %v",
792+
d.ID, err)
793+
} else {
794+
log.Infof("Published timeout sweep for deposit %v: %x", d.ID,
795+
snedResp.Transfer.AnchorTxHash)
796+
797+
// Update deposit state on first successful publish.
798+
if d.State != StateTimeoutSweepPublished {
799+
d.State = StateTimeoutSweepPublished
800+
err = m.handleDepositStateUpdate(ctx, d)
801+
if err != nil {
802+
log.Errorf("Unable to update deposit %v "+
803+
"state: %v", d.ID, err)
804+
805+
return err
806+
}
807+
}
808+
}
809+
810+
// Start monitoring the sweep unless we're already doing so.
811+
if _, ok := m.pendingSweeps[d.ID]; !ok {
812+
err := m.waitForDepositSweep(ctx, d, d.timeoutSweepLabel())
813+
if err != nil {
814+
log.Errorf("Unable to wait for deposit %v spend: %v",
815+
d.ID, err)
816+
817+
return err
818+
}
819+
820+
m.pendingSweeps[d.ID] = struct{}{}
821+
}
822+
823+
return nil
824+
}
825+
826+
// waitForDepositSpend waits for the deposit to be spent. It subscribes to
827+
// receive events for the deposit's sweep address notifying us once the transfer
828+
// has completed.
829+
func (m *Manager) waitForDepositSweep(ctx context.Context, d *Deposit,
830+
label string) error {
831+
832+
log.Infof("Waiting for deposit sweep confirmation %s", d.ID)
833+
834+
eventChan, errChan, err := m.tapClient.WaitForSendComplete(
835+
ctx, d.SweepScriptKey.SerializeCompressed(), label,
836+
)
837+
if err != nil {
838+
log.Errorf("unable to subscribe to send events for deposit "+
839+
"sweep: %v", err,
840+
)
841+
}
842+
843+
go func() {
844+
select {
845+
case event := <-eventChan:
846+
// At this point we can consider the deposit confirmed.
847+
err = m.handleDepositSpend(ctx, d, event.Transfer)
848+
if err != nil {
849+
m.criticalError(err)
850+
}
851+
852+
case err := <-errChan:
853+
m.criticalError(err)
854+
}
855+
}()
856+
857+
return nil
858+
}
859+
860+
func formatProtoJSON(resp proto.Message) (string, error) {
861+
jsonBytes, err := taprpc.ProtoJSONMarshalOpts.Marshal(resp)
862+
if err != nil {
863+
return "", err
864+
}
865+
866+
return string(jsonBytes), nil
867+
}
868+
869+
func toJSON(resp proto.Message) string {
870+
jsonStr, _ := formatProtoJSON(resp)
871+
872+
return jsonStr
873+
}
874+
875+
// handleDepositSpend is called when the deposit is spent. It updates the
876+
// deposit state and releases the inputs used for the deposit sweep.
877+
func (m *Manager) handleDepositSpend(ctx context.Context, d *Deposit,
878+
transfer *taprpc.AssetTransfer) error {
879+
880+
done, err := m.scheduleNextCall()
881+
if err != nil {
882+
log.Errorf("Unable to schedule next call: %v", err)
883+
884+
return err
885+
}
886+
defer done()
887+
888+
switch d.State {
889+
case StateTimeoutSweepPublished:
890+
d.State = StateSwept
891+
892+
err := m.releaseDepositSweepInputs(ctx, d)
893+
if err != nil {
894+
log.Errorf("Unable to release deposit sweep inputs: "+
895+
"%v", err)
896+
897+
return err
898+
}
899+
900+
default:
901+
err := fmt.Errorf("Spent deposit %s in unexpected state %s",
902+
d.ID, d.State)
903+
904+
log.Errorf(err.Error())
905+
906+
return err
907+
}
908+
909+
log.Tracef("Deposit %s spent in transfer: %s\n", d.ID, toJSON(transfer))
910+
911+
// TODO(bhandras): should save the spend details to the store?
912+
err = m.handleDepositStateUpdate(ctx, d)
913+
if err != nil {
914+
return err
915+
}
916+
917+
// Sanity check that the deposit is in the pending sweeps map.
918+
if _, ok := m.pendingSweeps[d.ID]; !ok {
919+
log.Errorf("Deposit %v not found in pending deposits", d.ID)
920+
}
921+
922+
// We can now remove the deposit from the pending sweeps map as we don't
923+
// need to monitor for the spend anymore.
924+
delete(m.pendingSweeps, d.ID)
925+
926+
return nil
927+
}
928+
929+
// releaseDepositSweepInputs releases the inputs that were used for the deposit
930+
// sweep.
931+
func (m *Manager) releaseDepositSweepInputs(ctx context.Context,
932+
d *Deposit) error {
933+
934+
lockID, err := d.lockID()
935+
if err != nil {
936+
return err
937+
}
938+
939+
leases, err := m.walletKit.ListLeases(ctx)
940+
if err != nil {
941+
return err
942+
}
943+
944+
for _, lease := range leases {
945+
if lease.LockID != lockID {
946+
continue
947+
}
948+
949+
// Unlock any UTXOs that were used for the deposit sweep.
950+
err = m.walletKit.ReleaseOutput(ctx, lockID, lease.Outpoint)
951+
if err != nil {
952+
return err
953+
}
954+
}
955+
956+
return nil
957+
}

assets/deposit/sql_store.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ type Querier interface {
2929

3030
GetAssetDeposits(ctx context.Context) ([]sqlc.GetAssetDepositsRow,
3131
error)
32+
33+
SetAssetDepositSweepKeys(ctx context.Context,
34+
arg sqlc.SetAssetDepositSweepKeysParams) error
3235
}
3336

3437
// DepositBaseDB is the interface that contains all the queries generated
@@ -127,6 +130,20 @@ func (s *SQLStore) UpdateDeposit(ctx context.Context, d *Deposit) error {
127130
if err != nil {
128131
return err
129132
}
133+
134+
case StateExpired:
135+
scriptKey := d.SweepScriptKey.SerializeCompressed()
136+
internalKey := d.SweepInternalKey.SerializeCompressed()
137+
err := tx.SetAssetDepositSweepKeys(
138+
ctx, sqlc.SetAssetDepositSweepKeysParams{
139+
DepositID: d.ID,
140+
SweepScriptPubkey: scriptKey,
141+
SweepInternalPubkey: internalKey,
142+
},
143+
)
144+
if err != nil {
145+
return err
146+
}
130147
}
131148

132149
return tx.UpdateDepositState(

loopdb/sqlc/asset_deposits.sql.go

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

loopdb/sqlc/querier.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

loopdb/sqlc/queries/asset_deposits.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,8 @@ JOIN asset_deposit_updates u ON u.id = (
3939
LIMIT 1
4040
)
4141
ORDER BY d.created_at ASC;
42+
43+
-- name: SetAssetDepositSweepKeys :exec
44+
UPDATE asset_deposits
45+
SET sweep_script_pubkey = $2, sweep_internal_pubkey = $3
46+
WHERE deposit_id = $1;

0 commit comments

Comments
 (0)