From dba12cdd92fcf6c08c9e5f1822e8c952264fc824 Mon Sep 17 00:00:00 2001 From: JagadishSunilPednekar Date: Mon, 2 Feb 2026 12:34:24 +0530 Subject: [PATCH] feat: Add Transaction helper methods and extreme tx test --- bitcoinutils/transactions.py | 38 ++++++++++++++++++++ tests/test_extreme_txs.py | 30 ++++++++++++++++ tests/test_transaction_helpers.py | 59 +++++++++++++++++++++++++++++++ 3 files changed, 127 insertions(+) create mode 100644 tests/test_extreme_txs.py create mode 100644 tests/test_transaction_helpers.py diff --git a/bitcoinutils/transactions.py b/bitcoinutils/transactions.py index 75892a6..5d298db 100644 --- a/bitcoinutils/transactions.py +++ b/bitcoinutils/transactions.py @@ -767,6 +767,44 @@ def get_transaction_digest( return tx_digest + def add_input(self, input: TxInput): + """Adds an input to the transaction (and an empty witness if segwit)""" + self.inputs.append(input) + if self.has_segwit: + self.witnesses.append(TxWitnessInput([])) + + def update_input(self, index: int, input: TxInput): + """Updates an input at the specified index""" + if index < 0 or index >= len(self.inputs): + raise IndexError("Transaction input index out of range") + self.inputs[index] = input + + def remove_input(self, index: int): + """Removes an input at the specified index (and its witness if segwit)""" + if index < 0 or index >= len(self.inputs): + raise IndexError("Transaction input index out of range") + del self.inputs[index] + if self.has_segwit: + # assuming witnesses logic is consistent with inputs length + if index < len(self.witnesses): + del self.witnesses[index] + + def add_output(self, output: TxOutput): + """Adds an output to the transaction""" + self.outputs.append(output) + + def update_output(self, index: int, output: TxOutput): + """Updates an output at the specified index""" + if index < 0 or index >= len(self.outputs): + raise IndexError("Transaction output index out of range") + self.outputs[index] = output + + def remove_output(self, index: int): + """Removes an output at the specified index""" + if index < 0 or index >= len(self.outputs): + raise IndexError("Transaction output index out of range") + del self.outputs[index] + def get_transaction_segwit_digest( self, txin_index: int, script: Script, amount: int, sighash: int = SIGHASH_ALL ): diff --git a/tests/test_extreme_txs.py b/tests/test_extreme_txs.py new file mode 100644 index 0000000..0a52859 --- /dev/null +++ b/tests/test_extreme_txs.py @@ -0,0 +1,30 @@ + +import unittest +from bitcoinutils.transactions import Transaction, TxInput, TxOutput +from bitcoinutils.script import Script + +class TestExtremeTransactions(unittest.TestCase): + def test_many_outputs(self): + """Test transaction with 260 outputs""" + tx = Transaction() + # Add one input + tx.add_input(TxInput("a"*64, 0, Script([]))) + + # Add 260 outputs + # 260 is chosen because it might trigger varint boundaries or size limits if logic is flawed + # For varint: 253 is the boundary where it switches from 1 byte to 3 bytes (fd xx xx) + # So 260 ensures we cover the multi-byte varint case for number of outputs. + for i in range(260): + tx.add_output(TxOutput(i, Script([]))) + + # Serialize and check size + raw_tx = tx.to_bytes(has_segwit=False) + self.assertTrue(len(raw_tx) > 0) + + # Verify we can deserialize it + tx_from_raw = Transaction.from_raw(raw_tx) + self.assertEqual(len(tx_from_raw.outputs), 260) + self.assertEqual(tx_from_raw.outputs[-1].amount, 259) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_transaction_helpers.py b/tests/test_transaction_helpers.py new file mode 100644 index 0000000..ba0c40e --- /dev/null +++ b/tests/test_transaction_helpers.py @@ -0,0 +1,59 @@ + +import unittest +from bitcoinutils.transactions import Transaction, TxInput, TxOutput, TxWitnessInput +from bitcoinutils.script import Script + +class TestTransactionHelpers(unittest.TestCase): + def setUp(self): + self.tx_in = TxInput("txid", 0, Script(["OP_0"])) + self.tx_out = TxOutput(1000, Script(["OP_1"])) + self.tx = Transaction() + + def test_add_input(self): + self.tx.add_input(self.tx_in) + self.assertEqual(len(self.tx.inputs), 1) + self.assertEqual(self.tx.inputs[0], self.tx_in) + + def test_add_input_segwit(self): + self.tx.has_segwit = True + self.tx.add_input(self.tx_in) + self.assertEqual(len(self.tx.inputs), 1) + self.assertEqual(len(self.tx.witnesses), 1) + self.assertIsInstance(self.tx.witnesses[0], TxWitnessInput) + + def test_remove_input(self): + self.tx.add_input(self.tx_in) + self.tx.remove_input(0) + self.assertEqual(len(self.tx.inputs), 0) + + def test_remove_input_segwit(self): + self.tx.has_segwit = True + self.tx.add_input(self.tx_in) # adds witness + self.tx.remove_input(0) # should remove witness + self.assertEqual(len(self.tx.inputs), 0) + self.assertEqual(len(self.tx.witnesses), 0) + + def test_update_input(self): + self.tx.add_input(self.tx_in) + new_in = TxInput("new_txid", 1, Script(["OP_2"])) + self.tx.update_input(0, new_in) + self.assertEqual(self.tx.inputs[0], new_in) + + def test_add_output(self): + self.tx.add_output(self.tx_out) + self.assertEqual(len(self.tx.outputs), 1) + self.assertEqual(self.tx.outputs[0], self.tx_out) + + def test_remove_output(self): + self.tx.add_output(self.tx_out) + self.tx.remove_output(0) + self.assertEqual(len(self.tx.outputs), 0) + + def test_update_output(self): + self.tx.add_output(self.tx_out) + new_out = TxOutput(2000, Script(["OP_3"])) + self.tx.update_output(0, new_out) + self.assertEqual(self.tx.outputs[0], new_out) + +if __name__ == '__main__': + unittest.main()