diff --git a/src/cpp/wallet/py_monero_wallet_rpc.cpp b/src/cpp/wallet/py_monero_wallet_rpc.cpp index 74e5515..edd2483 100644 --- a/src/cpp/wallet/py_monero_wallet_rpc.cpp +++ b/src/cpp/wallet/py_monero_wallet_rpc.cpp @@ -25,9 +25,11 @@ void PyMoneroWalletPoller::set_period_in_ms(uint64_t period_ms) { } void PyMoneroWalletPoller::poll() { + // skip if next poll is queued if (m_num_polling > 1) return; m_num_polling++; + // synchronize polls boost::lock_guard lock(m_mutex); try { // skip if wallet is closed @@ -36,6 +38,7 @@ void PyMoneroWalletPoller::poll() { return; } + // take initial snapshot if (m_prev_balances == boost::none) { m_prev_height = m_wallet->get_height(); monero::monero_tx_query tx_query; @@ -73,10 +76,13 @@ void PyMoneroWalletPoller::poll() { no_longer_locked_hashes.push_back(prev_locked_tx->m_hash.get()); } } + + // save locked txs for next comparison m_prev_locked_txs = locked_txs; std::vector> unlocked_txs; if (!no_longer_locked_hashes.empty()) { + // fetch txs which are no longer locked monero_tx_query tx_query; tx_query.m_is_locked = false; tx_query.m_min_height = min_height; @@ -87,23 +93,34 @@ void PyMoneroWalletPoller::poll() { // announce new unconfirmed and confirmed txs for (const auto &locked_tx : locked_txs) { - if (locked_tx->m_is_confirmed) { - m_prev_confirmed_notifications.push_back(locked_tx->m_hash.get()); - notify_outputs(locked_tx); + bool announced = false; + const std::string& tx_hash = locked_tx->m_hash.get(); + if (bool_equals_2(true, locked_tx->m_is_confirmed)) { + if (std::find(m_prev_confirmed_notifications.begin(), m_prev_confirmed_notifications.end(), tx_hash) == m_prev_confirmed_notifications.end()) { + m_prev_confirmed_notifications.push_back(tx_hash); + announced = true; + } } else { - m_prev_unconfirmed_notifications.push_back(locked_tx->m_hash.get()); + if (std::find(m_prev_unconfirmed_notifications.begin(), m_prev_unconfirmed_notifications.end(), tx_hash) == m_prev_unconfirmed_notifications.end()) { + m_prev_unconfirmed_notifications.push_back(tx_hash); + announced = true; + } } + + if (announced) notify_outputs(locked_tx); } // announce new unlocked outputs for (const auto &unlocked_tx : unlocked_txs) { std::string tx_hash = unlocked_tx->m_hash.get(); + // stop tracking tx notifications m_prev_confirmed_notifications.erase(std::remove_if(m_prev_confirmed_notifications.begin(), m_prev_confirmed_notifications.end(), [&tx_hash](const std::string& iter){ return iter == tx_hash; }), m_prev_confirmed_notifications.end()); m_prev_unconfirmed_notifications.erase(std::remove_if(m_prev_unconfirmed_notifications.begin(), m_prev_unconfirmed_notifications.end(), [&tx_hash](const std::string& iter){ return iter == tx_hash; }), m_prev_unconfirmed_notifications.end()); notify_outputs(unlocked_tx); } + // announce balance changes check_for_changed_balances(); m_num_polling--; @@ -117,7 +134,7 @@ void PyMoneroWalletPoller::poll() { } std::shared_ptr PyMoneroWalletPoller::get_tx(const std::vector>& txs, const std::string& tx_hash) { - for (auto tx : txs) { + for (const auto& tx : txs) { if (tx->m_hash == tx_hash) return tx; } @@ -141,6 +158,8 @@ void PyMoneroWalletPoller::on_new_block(uint64_t height) { } void PyMoneroWalletPoller::notify_outputs(const std::shared_ptr &tx) { + // notify spent outputs + // TODO (monero-project): monero-wallet-rpc does not allow scrape of tx inputs so providing one input with outgoing amount if (tx->m_outgoing_transfer != boost::none) { auto outgoing_transfer = tx->m_outgoing_transfer.get(); if (!tx->m_inputs.empty()) throw std::runtime_error("Tx inputs should be empty"); @@ -148,20 +167,26 @@ void PyMoneroWalletPoller::notify_outputs(const std::shared_ptrm_amount = outgoing_transfer->m_amount.get() + tx->m_fee.get(); output->m_account_index = outgoing_transfer->m_account_index; output->m_tx = tx; + // initialize if transfer sourced from single subaddress if (outgoing_transfer->m_subaddress_indices.size() == 1) { output->m_subaddress_index = outgoing_transfer->m_subaddress_indices[0]; } + tx->m_inputs.clear(); tx->m_inputs.push_back(output); m_wallet->announce_output_spent(output); } + // notify received outputs if (tx->m_incoming_transfers.size() > 0) { if (!tx->m_outputs.empty()) { + // TODO (monero-project): outputs only returned for confirmed txs for(const auto &output : tx->get_outputs_wallet()) { m_wallet->announce_output_received(output); } } else { + // TODO (monero-project): monero-wallet-rpc does not allow scrape of unconfirmed received outputs so using incoming transfer values + tx->m_outputs.clear(); for (const auto &transfer : tx->m_incoming_transfers) { auto output = std::make_shared(); output->m_account_index = transfer->m_account_index; @@ -178,6 +203,7 @@ void PyMoneroWalletPoller::notify_outputs(const std::shared_ptrget_balances(boost::none, boost::none); if (balances->m_balance != m_prev_balances.get()->m_balance || balances->m_unlocked_balance != m_prev_balances.get()->m_unlocked_balance) { @@ -191,6 +217,16 @@ bool PyMoneroWalletPoller::check_for_changed_balances() { PyMoneroWalletRpc::~PyMoneroWalletRpc() { } +void PyMoneroWalletRpc::add_listener(monero_wallet_listener& listener) { + PyMoneroWallet::add_listener(listener); + refresh_listening(); +} + +void PyMoneroWalletRpc::remove_listener(monero_wallet_listener& listener) { + PyMoneroWallet::remove_listener(listener); + refresh_listening(); +} + boost::optional PyMoneroWalletRpc::get_rpc_connection() const { if (m_rpc == nullptr) return boost::none; return boost::optional(*m_rpc); @@ -565,7 +601,7 @@ void PyMoneroWalletRpc::start_syncing(uint64_t sync_period_in_ms) { auto response = m_rpc->send_json_request(request); // update sync period for poller - m_sync_period_in_ms = sync_period_in_seconds * 1000; + m_sync_period_in_ms = sync_period_in_ms; if (m_poller != nullptr) m_poller->set_period_in_ms(m_sync_period_in_ms.get()); // poll if listening @@ -1876,9 +1912,14 @@ void PyMoneroWalletRpc::clear_address_cache() { } void PyMoneroWalletRpc::refresh_listening() { - if (m_rpc->m_zmq_uri == boost::none) { - if (m_poller == nullptr && m_listeners.size() > 0) m_poller = std::make_shared(this); - if (m_poller != nullptr) m_poller->set_is_polling(m_listeners.size() > 0); + if (m_rpc->m_zmq_uri == boost::none || m_rpc->m_zmq_uri.get().empty()) { + if (m_poller == nullptr && m_listeners.size() > 0) { + m_poller = std::make_shared(this); + if (m_sync_period_in_ms != boost::none) m_poller->set_period_in_ms(m_sync_period_in_ms.get()); + } + if (m_poller != nullptr) { + m_poller->set_is_polling(m_listeners.size() > 0); + } } /* else { diff --git a/src/cpp/wallet/py_monero_wallet_rpc.h b/src/cpp/wallet/py_monero_wallet_rpc.h index 9789b45..f99cfd2 100644 --- a/src/cpp/wallet/py_monero_wallet_rpc.h +++ b/src/cpp/wallet/py_monero_wallet_rpc.h @@ -22,7 +22,7 @@ class PyMoneroWalletPoller { mutable boost::recursive_mutex m_mutex; PyMoneroWallet *m_wallet; std::atomic m_is_polling; - uint64_t m_poll_period_ms; + uint64_t m_poll_period_ms = 20000; std::thread m_thread; int m_num_polling; std::vector m_prev_unconfirmed_notifications; @@ -64,6 +64,8 @@ class PyMoneroWalletRpc : public PyMoneroWallet { boost::optional get_rpc_connection() const; std::vector get_seed_languages() const; void stop(); + void add_listener(monero_wallet_listener& listener) override; + void remove_listener(monero_wallet_listener& listener) override; bool is_view_only() const override; boost::optional get_daemon_connection() const override; void set_daemon_connection(const boost::optional& connection, bool is_trusted, const boost::optional& ssl_options); diff --git a/tests/test_monero_wallet_common.py b/tests/test_monero_wallet_common.py index cccbd0e..b3a40e1 100644 --- a/tests/test_monero_wallet_common.py +++ b/tests/test_monero_wallet_common.py @@ -3,6 +3,7 @@ import pytest import logging +from time import sleep from random import shuffle from configparser import ConfigParser from abc import ABC, abstractmethod @@ -24,7 +25,9 @@ StringUtils, AssertUtils, TxUtils, TxContext, GenUtils, WalletUtils, WalletType, IntegrationTestUtils, - ViewOnlyAndOfflineWalletTester + ViewOnlyAndOfflineWalletTester, + WalletNotificationCollector, + MiningUtils ) logger: logging.Logger = logging.getLogger("TestMoneroWalletCommon") @@ -3463,6 +3466,79 @@ def test_get_default_fee_priority(self, wallet: MoneroWallet) -> None: # endregion + #region Notification Tests + + # Can stop listening + @pytest.mark.skipif(TestUtils.TEST_NOTIFICATIONS is False, reason="TEST_NOTIFICATIONS disabled") + def test_stop_listening(self) -> None: + # create wallet and start background synchronizing + wallet: MoneroWallet = self._create_wallet(MoneroWalletConfig()) + + # add listener + listener: WalletNotificationCollector = WalletNotificationCollector() + wallet.add_listener(listener) + sleep(1) + + # remove listener and close + wallet.remove_listener(listener) + self._close_wallet(wallet) + + # Can be created and receive funds + # TODO this test is flaky on monero-wallet-rpc because of mining speed + @pytest.mark.skipif(TestUtils.TEST_NOTIFICATIONS is False, reason="TEST_NOTIFICATIONS disabled") + #@pytest.mark.flaky(reruns=5, reruns_delay=5) + def test_create_and_receive(self, daemon: MoneroDaemonRpc, wallet: MoneroWallet) -> None: + # create random wallet + receiver: MoneroWallet = self._create_wallet(MoneroWalletConfig()) + try: + # listen for received funds + my_listener: WalletNotificationCollector = WalletNotificationCollector() + receiver.add_listener(my_listener) + assert len(receiver.get_listeners()) > 0 + + # wait for txs to confirm and for sufficient unlocked balance + TestUtils.WALLET_TX_TRACKER.wait_for_txs_to_clear_pool(wallet) + TestUtils.WALLET_TX_TRACKER.wait_for_unlocked_balance(wallet, 0, None, TxUtils.MAX_FEE) + + # send funds to the receiver + tx_config: MoneroTxConfig = MoneroTxConfig() + tx_config.account_index = 0 + tx_config.address = receiver.get_primary_address() + tx_config.amount = TxUtils.MAX_FEE + tx_config.relay = True + sent_tx: MoneroTxWallet = wallet.create_tx(tx_config) + assert sent_tx.hash is not None + + # wait for funds to confirm + MiningUtils.try_start_mining() + while True: + tx: MoneroTxWallet | None = wallet.get_tx(sent_tx.hash) + assert tx is not None + assert tx.is_confirmed is not None + if tx.is_confirmed: + logger.debug(f"Confirmed tx at height {tx.get_height()}") + break + + assert tx.is_failed is False, f"Tx failed in mempool: {tx.hash}" + daemon.wait_for_next_block_header() + + # receiver should have notified listeners of received outputs + sleep(TestUtils.SYNC_PERIOD_IN_MS * 10 / 1000) + num_outputs_received: int = len(my_listener.outputs_received) + if TestUtils.REGTEST and num_outputs_received == 0 and isinstance(wallet, MoneroWalletRpc): + assert my_listener.balance_notifications[0][0] == TxUtils.MAX_FEE + assert my_listener.balance_notifications[0][1] == TxUtils.MAX_FEE + logger.warning(f"Wallet rpc lost outputs notifications, reduce mining speed") + else: + assert len(my_listener.outputs_received) > 0 + finally: + logger.debug(f"Closing receiver wallet...") + self._close_wallet(receiver) + logger.debug(f"Closed receiver wallet") + MiningUtils.try_stop_mining() + + #endregion + #region Reset Tests # Can sweep subaddresses diff --git a/tests/test_monero_wallet_keys.py b/tests/test_monero_wallet_keys.py index f914248..f87988d 100644 --- a/tests/test_monero_wallet_keys.py +++ b/tests/test_monero_wallet_keys.py @@ -507,6 +507,16 @@ def test_prove_unrelayed_txs(self, daemon: MoneroDaemonRpc, wallet: MoneroWallet def test_address_book(self, wallet: MoneroWallet) -> None: return super().test_address_book(wallet) + @pytest.mark.not_supported + @override + def test_stop_listening(self) -> None: + return super().test_stop_listening() + + @pytest.mark.not_supported + @override + def test_create_and_receive(self, daemon: MoneroDaemonRpc, wallet: MoneroWallet) -> None: + return super().test_create_and_receive(daemon, wallet) + #endregion #region Tests diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py index 312e1af..7e6c4e5 100644 --- a/tests/utils/__init__.py +++ b/tests/utils/__init__.py @@ -25,6 +25,7 @@ from .integration_test_utils import IntegrationTestUtils from .wallet_type import WalletType from .view_only_and_offline_wallet_tester import ViewOnlyAndOfflineWalletTester +from .wallet_notification_collector import WalletNotificationCollector __all__ = [ 'WalletUtils', @@ -53,5 +54,6 @@ 'BlockchainUtils', 'IntegrationTestUtils', 'WalletType', - 'ViewOnlyAndOfflineWalletTester' + 'ViewOnlyAndOfflineWalletTester', + 'WalletNotificationCollector' ] diff --git a/tests/utils/wallet_notification_collector.py b/tests/utils/wallet_notification_collector.py new file mode 100644 index 0000000..75413ec --- /dev/null +++ b/tests/utils/wallet_notification_collector.py @@ -0,0 +1,106 @@ +import logging + +from typing import override + +from monero import ( + MoneroWalletListener, MoneroOutputWallet, + MoneroOutputQuery +) + +logger: logging.Logger = logging.getLogger("WalletNotificationCollector") + + +class WalletNotificationCollector(MoneroWalletListener): + """Collects blocks, outputs and balances changes from wallet""" + + listening: bool + """Indicates if listener is expected to be active""" + block_notifications: list[int] + """Collection of blocks""" + balance_notifications: list[tuple[int, int]] + """Collection of balance notifications""" + outputs_received: list[MoneroOutputWallet] + """Collection of outputs received by the wallet""" + outputs_spent: list[MoneroOutputWallet] + """Collection of outputs spend by the wallet""" + + def __init__(self) -> None: + """ + Initialize a new wallet notification collector. + """ + super().__init__() + self.listening = True + self.block_notifications = [] + self.balance_notifications = [] + self.outputs_received = [] + self.outputs_spent = [] + + @override + def on_new_block(self, height: int) -> None: + assert self.listening + num_block_notifications: int = len(self.block_notifications) + + if num_block_notifications > 0: + # check block notifications order + expected_height: int = self.block_notifications[num_block_notifications - 1] + 1 + assert height == expected_height, f"Expected height {expected_height}, got {height}" + + # collect height + self.block_notifications.append(height) + logger.debug(f"Collected height: {height}") + + @override + def on_balances_changed(self, new_balance: int, new_unclocked_balance: int) -> None: + assert self.listening + num_balance_notifications: int = len(self.balance_notifications) + + if num_balance_notifications > 0: + last_notification: tuple[int, int] = self.balance_notifications[num_balance_notifications - 1] + # test that balances change + assert new_balance != last_notification[0] or new_balance != last_notification[1] + + # collect balance notification + self.balance_notifications.append((new_balance, new_unclocked_balance)) + logger.debug(f"Collected balance: {new_balance}, unlocked balance: {new_unclocked_balance}") + + @override + def on_output_received(self, output: MoneroOutputWallet) -> None: + assert self.listening + # collect received output + self.outputs_received.append(output) + logger.debug(f"Received output: {output.serialize()}") + + @override + def on_output_spent(self, output: MoneroOutputWallet) -> None: + assert self.listening + # collect spent output + self.outputs_spent.append(output) + logger.debug(f"Spent output: {output.serialize()}") + + def get_outputs_received(self, query: MoneroOutputQuery) -> list[MoneroOutputWallet]: + """ + Get outputs received by query. + + :param MoneroOutputQuery query: filter outputs received + :returns list[MoneroOutputWallet]: outputs received by the wallet filtered by query + """ + result: list[MoneroOutputWallet] = [] + # filter received outputs + for output in self.outputs_received: + if query.meets_criteria(output): + result.append(output) + return result + + def get_outputs_spent(self, query: MoneroOutputQuery) -> list[MoneroOutputWallet]: + """ + Get outputs spent by query. + + :param MoneroOutputQuery query: filter outputs spent + :returns list[MoneroOutputWallet]: outputs spent by the wallet filtered by query + """ + result: list[MoneroOutputWallet] = [] + # filter spent outputs + for output in self.outputs_spent: + if query.meets_criteria(output): + result.append(output) + return result