diff --git a/bittensor/core/async_subtensor.py b/bittensor/core/async_subtensor.py index b7926ea807..02e3a04851 100644 --- a/bittensor/core/async_subtensor.py +++ b/bittensor/core/async_subtensor.py @@ -14,6 +14,8 @@ from scalecodec import GenericCall from bittensor.core.chain_data import ( + ColdkeySwapAnnouncementInfo, + ColdkeySwapConstants, CrowdloanConstants, CrowdloanInfo, DelegateInfo, @@ -50,6 +52,10 @@ root_set_pending_childkey_cooldown_extrinsic, set_children_extrinsic, ) +from bittensor.core.extrinsics.asyncex.coldkey_swap import ( + announce_coldkey_swap_extrinsic, + swap_coldkey_announced_extrinsic, +) from bittensor.core.extrinsics.asyncex.crowdloan import ( contribute_crowdloan_extrinsic, create_crowdloan_extrinsic, @@ -1933,6 +1939,203 @@ async def get_children_pending( cooldown, ) + async def get_coldkey_swap_announcement( + self, + coldkey_ss58: str, + block: Optional[int] = None, + block_hash: Optional[str] = None, + reuse_block: bool = False, + ) -> Optional["ColdkeySwapAnnouncementInfo"]: + """ + Retrieves coldkey swap announcement for a specific coldkey. + + This method queries the SubtensorModule.ColdkeySwapAnnouncements storage for an announcement made by the given + coldkey. Announcements allow a coldkey to declare its intention to swap to a new coldkey address after a delay + period. + + Parameters: + coldkey_ss58: SS58 address of the coldkey whose announcement to retrieve. + block: The blockchain block number for the query. If `None`, queries the latest block. + block_hash: The hash of the block at which to check the parameter. Do not set if using `block` or `reuse_block`. + reuse_block: Whether to reuse the last-used block hash. Do not set if using `block_hash` or `block`. + + Returns: + ColdkeySwapAnnouncementInfo if announcement exists, None otherwise. Contains the execution block and + new coldkey hash. + + Notes: + - If the coldkey has no announcement, returns None. + - See: + """ + block_hash = await self.determine_block_hash(block, block_hash, reuse_block) + query = await self.substrate.query( + module="SubtensorModule", + storage_function="ColdkeySwapAnnouncements", + params=[coldkey_ss58], + block_hash=block_hash, + reuse_block_hash=reuse_block, + ) + return ColdkeySwapAnnouncementInfo.from_query( + coldkey_ss58=coldkey_ss58, query=query + ) + + async def get_coldkey_swap_announcements( + self, + block: Optional[int] = None, + block_hash: Optional[str] = None, + reuse_block: bool = False, + ) -> list["ColdkeySwapAnnouncementInfo"]: + """ + Retrieves all coldkey swap announcements from the chain. + + This method queries the SubtensorModule.ColdkeySwapAnnouncements storage map across all coldkeys and returns a + list of all active announcements. + + Parameters: + block: The blockchain block number for the query. If `None`, queries the latest block. + block_hash: The hash of the block at which to check the parameter. Do not set if using `block` or `reuse_block`. + reuse_block: Whether to reuse the last-used block hash. Do not set if using `block_hash` or `block`. + + Returns: + List of ColdkeySwapAnnouncementInfo objects representing all active coldkey swap announcements on the chain. + + Notes: + - This method queries all announcements on the chain, which may be resource-intensive for large networks. + Consider using :meth:`get_coldkey_swap_announcement` for querying specific coldkeys. + - See: + """ + block_hash = await self.determine_block_hash(block, block_hash, reuse_block) + query_map = await self.substrate.query_map( + module="SubtensorModule", + storage_function="ColdkeySwapAnnouncements", + block_hash=block_hash, + reuse_block_hash=reuse_block, + ) + return [ + ColdkeySwapAnnouncementInfo.from_record(record) + async for record in query_map + ] + + async def get_coldkey_swap_announcement_delay( + self, + block: Optional[int] = None, + block_hash: Optional[str] = None, + reuse_block: bool = False, + ) -> int: + """ + Retrieves the ColdkeySwapAnnouncementDelay storage value. + + This method queries the SubtensorModule.ColdkeySwapAnnouncementDelay storage value, which defines the number + of blocks that must elapse after making an announcement before the swap can be executed. + + Parameters: + block: The blockchain block number for the query. If `None`, queries the latest block. + block_hash: The hash of the block at which to check the parameter. Do not set if using `block` or `reuse_block`. + reuse_block: Whether to reuse the last-used block hash. Do not set if using `block_hash` or `block`. + + Returns: + The number of blocks that must elapse before swap execution (integer). + + Notes: + - This is a storage value (can be changed via admin extrinsics), not a runtime constant. + - See: + """ + block_hash = await self.determine_block_hash(block, block_hash, reuse_block) + query = await self.substrate.query( + module="SubtensorModule", + storage_function="ColdkeySwapAnnouncementDelay", + block_hash=block_hash, + reuse_block_hash=reuse_block, + ) + return query.value if getattr(query, "value", None) else 0 + + async def get_coldkey_swap_reannouncement_delay( + self, + block: Optional[int] = None, + block_hash: Optional[str] = None, + reuse_block: bool = False, + ) -> int: + """ + Retrieves the ColdkeySwapReannouncementDelay storage value. + + This method queries the SubtensorModule.ColdkeySwapReannouncementDelay storage value, which defines the number + of blocks that must elapse between the original announcement and a reannouncement. + + Parameters: + block: The blockchain block number for the query. If `None`, queries the latest block. + block_hash: The hash of the block at which to check the parameter. Do not set if using `block` or `reuse_block`. + reuse_block: Whether to reuse the last-used block hash. Do not set if using `block_hash` or `block`. + + Returns: + The number of blocks that must elapse before reannouncement (integer). + + Notes: + - This is a storage value (can be changed via admin extrinsics), not a runtime constant. + - See: + """ + block_hash = await self.determine_block_hash(block, block_hash, reuse_block) + query = await self.substrate.query( + module="SubtensorModule", + storage_function="ColdkeySwapReannouncementDelay", + block_hash=block_hash, + reuse_block_hash=reuse_block, + ) + return query.value if getattr(query, "value", None) else 0 + + async def get_coldkey_swap_constants( + self, + constants: Optional[list[str]] = None, + as_dict: bool = False, + block: Optional[int] = None, + block_hash: Optional[str] = None, + reuse_block: bool = False, + ) -> Union["ColdkeySwapConstants", dict]: + """ + Fetches runtime configuration constants for coldkey swap operations. + + This method retrieves on-chain runtime constants that define cost requirements for coldkey swap operations. + Note: For delay values (ColdkeySwapAnnouncementDelay and ColdkeySwapReannouncementDelay), use the dedicated + query methods `get_coldkey_swap_announcement_delay()` and `get_coldkey_swap_reannouncement_delay()` instead, + as these are storage values, not runtime constants. + + Parameters: + constants: Optional list of specific constant names to fetch. If omitted, all constants defined in + `ColdkeySwapConstants.constants_names()` are queried. Valid constant names include: "KeySwapCost". + as_dict: If True, returns the constants as a dictionary instead of a `ColdkeySwapConstants` object. + block: The blockchain block number for the query. If `None`, queries the latest block. + block_hash: The hash of the block at which to check the parameter. Do not set if using `block` or `reuse_block`. + reuse_block: Whether to reuse the last-used block hash. Do not set if using `block_hash` or `block`. + + Returns: + If `as_dict` is False: ColdkeySwapConstants object containing all requested constants. + If `as_dict` is True: Dictionary mapping constant names to their values (integers for cost in RAO). + + Notes: + - All amounts are returned in RAO. Values reflect the current chain configuration at the specified block. + - KeySwapCost is a runtime constant (queryable via constants). + - See: + """ + result = {} + const_names = constants or ColdkeySwapConstants.constants_names() + + block_hash = await self.determine_block_hash(block, block_hash, reuse_block) + + for const_name in const_names: + # Query as runtime constant + query = await self.query_constant( + module_name="SubtensorModule", + constant_name=const_name, + block=block, + block_hash=block_hash, + reuse_block=reuse_block, + ) + if query is not None: + result[const_name] = query.value + + constants_obj = ColdkeySwapConstants.from_dict(result) + + return constants_obj.to_dict() if as_dict else constants_obj + async def get_commitment( self, netuid: int, @@ -6174,6 +6377,62 @@ async def add_liquidity( wait_for_revealed_execution=wait_for_revealed_execution, ) + async def announce_coldkey_swap( + self, + wallet: "Wallet", + new_coldkey_ss58: str, + *, + mev_protection: bool = DEFAULT_MEV_PROTECTION, + period: Optional[int] = DEFAULT_PERIOD, + raise_error: bool = False, + wait_for_inclusion: bool = True, + wait_for_finalization: bool = True, + wait_for_revealed_execution: bool = True, + ) -> ExtrinsicResponse: + """ + Announces a coldkey swap by submitting the BlakeTwo256 hash of the new coldkey. + + This method allows a coldkey to declare its intention to swap to a new coldkey address. The announcement must be + made before the actual swap can be executed, and a delay period must pass before execution is allowed. + After making an announcement, all transactions from the coldkey are blocked except for `swap_coldkey_announced`. + + Parameters: + wallet: Bittensor wallet object (should be the current coldkey wallet). + new_coldkey_ss58: SS58 address of the new coldkey that will replace the current one. + mev_protection: If ``True``, encrypts and submits the transaction through the MEV Shield pallet to protect + against front-running and MEV attacks. The transaction remains encrypted in the mempool until validators + decrypt and execute it. If ``False``, submits the transaction directly without encryption. + period: The number of blocks during which the transaction will remain valid after it's submitted. If the + transaction is not included in a block within that number of blocks, it will expire and be rejected. You + can think of it as an expiration date for the transaction. + raise_error: Raises a relevant exception rather than returning ``False`` if unsuccessful. + wait_for_inclusion: Whether to wait for the inclusion of the transaction. + wait_for_finalization: Whether to wait for the finalization of the transaction. + wait_for_revealed_execution: Whether to wait for the revealed execution of transaction if mev_protection used. + + Returns: + ExtrinsicResponse: The result object of the extrinsic execution. + + Notes: + - A swap cost is charged when making the first announcement (not when reannouncing). + - After making an announcement, all transactions from the coldkey are blocked except for + `swap_coldkey_announced`. + - The swap can only be executed after the delay period has passed (check via + `get_coldkey_swap_announcement`). + - See: + """ + return await announce_coldkey_swap_extrinsic( + subtensor=self, + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + mev_protection=mev_protection, + period=period, + raise_error=raise_error, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, + wait_for_revealed_execution=wait_for_revealed_execution, + ) + async def add_stake_multiple( self, wallet: "Wallet", @@ -8932,6 +9191,59 @@ async def transfer( wait_for_revealed_execution=wait_for_revealed_execution, ) + async def swap_coldkey_announced( + self, + wallet: "Wallet", + new_coldkey_ss58: str, + *, + mev_protection: bool = DEFAULT_MEV_PROTECTION, + period: Optional[int] = DEFAULT_PERIOD, + raise_error: bool = False, + wait_for_inclusion: bool = True, + wait_for_finalization: bool = True, + wait_for_revealed_execution: bool = True, + ) -> ExtrinsicResponse: + """ + Executes a previously announced coldkey swap. + + This method executes a coldkey swap that was previously announced via `announce_coldkey_swap`. The new coldkey + address must match the hash that was announced, and the delay period must have passed. + + Parameters: + wallet: Bittensor wallet object (should be the current coldkey wallet that made the announcement). + new_coldkey_ss58: SS58 address of the new coldkey to swap to. This must match the hash that was announced. + mev_protection: If ``True``, encrypts and submits the transaction through the MEV Shield pallet to protect + against front-running and MEV attacks. The transaction remains encrypted in the mempool until validators + decrypt and execute it. If ``False``, submits the transaction directly without encryption. + period: The number of blocks during which the transaction will remain valid after it's submitted. If the + transaction is not included in a block within that number of blocks, it will expire and be rejected. You + can think of it as an expiration date for the transaction. + raise_error: Raises a relevant exception rather than returning ``False`` if unsuccessful. + wait_for_inclusion: Whether to wait for the inclusion of the transaction. + wait_for_finalization: Whether to wait for the finalization of the transaction. + wait_for_revealed_execution: Whether to wait for the revealed execution of transaction if mev_protection used. + + Returns: + ExtrinsicResponse: The result object of the extrinsic execution. + + Notes: + - The new coldkey hash must match the hash that was announced. + - The delay period must have passed (check via `get_coldkey_swap_announcement`). + - All assets, stakes, subnet ownerships, and hotkey associations are transferred from the old coldkey to the new one. + - See: + """ + return await swap_coldkey_announced_extrinsic( + subtensor=self, + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + mev_protection=mev_protection, + period=period, + raise_error=raise_error, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, + wait_for_revealed_execution=wait_for_revealed_execution, + ) + async def transfer_stake( self, wallet: "Wallet", diff --git a/bittensor/core/chain_data/__init__.py b/bittensor/core/chain_data/__init__.py index 7cb0840746..79f9794793 100644 --- a/bittensor/core/chain_data/__init__.py +++ b/bittensor/core/chain_data/__init__.py @@ -7,6 +7,10 @@ from .axon_info import AxonInfo from .chain_identity import ChainIdentity +from .coldkey_swap import ( + ColdkeySwapAnnouncementInfo, + ColdkeySwapConstants, +) from .crowdloan_info import CrowdloanInfo, CrowdloanConstants from .delegate_info import DelegateInfo, DelegatedInfo from .delegate_info_lite import DelegateInfoLite @@ -40,6 +44,8 @@ __all__ = [ "AxonInfo", "ChainIdentity", + "ColdkeySwapAnnouncementInfo", + "ColdkeySwapConstants", "CrowdloanInfo", "CrowdloanConstants", "DelegateInfo", diff --git a/bittensor/core/chain_data/coldkey_swap.py b/bittensor/core/chain_data/coldkey_swap.py new file mode 100644 index 0000000000..8daaafa367 --- /dev/null +++ b/bittensor/core/chain_data/coldkey_swap.py @@ -0,0 +1,127 @@ +from dataclasses import asdict, dataclass, fields +from typing import Optional + +from async_substrate_interface.types import ScaleObj + +from bittensor.core.chain_data.utils import decode_account_id + + +@dataclass +class ColdkeySwapAnnouncementInfo: + """ + Information about a coldkey swap announcement. + + This class contains information about a pending coldkey swap announcement. Announcements are used when a coldkey + wants to declare its intention to swap to a new coldkey address. The announcement must be made before the actual + swap can be executed, allowing time for verification and security checks. + + Attributes: + coldkey: The SS58 address of the coldkey that made the announcement. + execution_block: The block number when the swap can be executed (after the delay period has passed). + new_coldkey_hash: The BlakeTwo256 hash of the new coldkey AccountId (hex string with 0x prefix). This hash + must match the actual new coldkey when the swap is executed. + + Notes: + - The announcement is stored on-chain and can be queried via `get_coldkey_swap_announcement()`. + - After making an announcement, all transactions from coldkey are blocked except for `swap_coldkey_announced`. + - The swap can only be executed after the `execution_block` has been reached. + - See: + """ + + coldkey: str + execution_block: int + new_coldkey_hash: str + + @classmethod + def from_query( + cls, coldkey_ss58: str, query: "ScaleObj" + ) -> Optional["ColdkeySwapAnnouncementInfo"]: + """ + Creates a ColdkeySwapAnnouncementInfo object from a Substrate query result. + + Parameters: + coldkey_ss58: The SS58 address of the coldkey that made the announcement. + query: Query result from Substrate `query()` call to `ColdkeySwapAnnouncements` storage function. + + Returns: + ColdkeySwapAnnouncementInfo if announcement exists, None otherwise. + """ + if not getattr(query, "value", None): + return None + + execution_block = query.value[0] + new_coldkey_hash = "0x" + bytes(query.value[1][0]).hex() + return cls( + coldkey=coldkey_ss58, + execution_block=execution_block, + new_coldkey_hash=new_coldkey_hash, + ) + + @classmethod + def from_record(cls, record: tuple) -> "ColdkeySwapAnnouncementInfo": + """ + Creates a ColdkeySwapAnnouncementInfo object from a query_map record. + + Parameters: + record: Data item from query_map records call to ColdkeySwapAnnouncements storage function. Structure is + [key, value] where key is the coldkey AccountId and value contains (BlockNumber, Hash) tuple. + + Returns: + ColdkeySwapAnnouncementInfo object with announcement details for the coldkey from the record. + """ + coldkey_ss58 = decode_account_id(record[0]) + announcement_data = record[1] + return cls.from_query(coldkey_ss58, announcement_data) + + +@dataclass +class ColdkeySwapConstants: + """ + Represents runtime constants for coldkey swap operations in the SubtensorModule. + + This class contains runtime constants that define cost requirements for coldkey swap operations. + Note: For delay values (ColdkeySwapAnnouncementDelay and ColdkeySwapReannouncementDelay), use the dedicated + query methods `get_coldkey_swap_announcement_delay()` and `get_coldkey_swap_reannouncement_delay()` instead, + as these are storage values, not runtime constants. + + Attributes: + KeySwapCost: The cost in RAO required to make a coldkey swap announcement. This cost is charged when making the + first announcement (not when reannouncing). This is a runtime constant (queryable via constants). + + Notes: + - All amounts are in RAO. + - Values reflect the current chain configuration at the time of retrieval. + - See: + """ + + KeySwapCost: Optional[int] + + @classmethod + def constants_names(cls) -> list[str]: + """Returns the list of all constant field names defined in this dataclass. + + Returns: + List of constant field names as strings. + """ + return [f.name for f in fields(cls)] + + @classmethod + def from_dict(cls, data: dict) -> "ColdkeySwapConstants": + """ + Creates a ColdkeySwapConstants instance from a dictionary of decoded chain constants. + + Parameters: + data: Dictionary mapping constant names to their decoded values (returned by `Subtensor.query_constant()`). + + Returns: + ColdkeySwapConstants object with constants filled in. Fields not found in data will be set to `None`. + """ + return cls(**{name: data.get(name) for name in cls.constants_names()}) + + def to_dict(self) -> dict: + """Converts the ColdkeySwapConstants instance to a dictionary. + + Returns: + Dictionary mapping constant names to their values. + """ + return asdict(self) diff --git a/bittensor/core/chain_data/proxy.py b/bittensor/core/chain_data/proxy.py index 6ddb63961c..8e663d26b3 100644 --- a/bittensor/core/chain_data/proxy.py +++ b/bittensor/core/chain_data/proxy.py @@ -31,14 +31,12 @@ class ProxyType(str, Enum): NonTransfer: Allows all operations except those involving token transfers. Prohibited operations: - All Balances module calls - transfer_stake - - schedule_swap_coldkey - - swap_coldkey NonFungible: Allows all operations except token-related operations and registrations. Prohibited operations: - All Balances module calls - All staking operations (add_stake, remove_stake, unstake_all, swap_stake, move_stake, transfer_stake) - Registration operations (burned_register, root_register) - - Key swap operations (schedule_swap_coldkey, swap_coldkey, swap_hotkey) + - Key swap operations (announce_coldkey_swap, swap_coldkey_announced, swap_hotkey) Staking: Allows only staking-related operations. Permitted operations: - add_stake, add_stake_limit diff --git a/bittensor/core/extrinsics/asyncex/coldkey_swap.py b/bittensor/core/extrinsics/asyncex/coldkey_swap.py new file mode 100644 index 0000000000..e8318d7827 --- /dev/null +++ b/bittensor/core/extrinsics/asyncex/coldkey_swap.py @@ -0,0 +1,337 @@ +from typing import TYPE_CHECKING, Optional + +from bittensor.core.extrinsics.asyncex.mev_shield import submit_encrypted_extrinsic +from bittensor.core.extrinsics.pallets import SubtensorModule +from bittensor.core.extrinsics.utils import ( + compute_coldkey_hash, + verify_coldkey_hash, +) +from bittensor_wallet import Keypair +from bittensor.core.settings import DEFAULT_MEV_PROTECTION +from bittensor.core.types import ExtrinsicResponse +from bittensor.utils.btlogging import logging + +if TYPE_CHECKING: + from bittensor_wallet import Wallet + from bittensor.core.async_subtensor import AsyncSubtensor + + +async def announce_coldkey_swap_extrinsic( + subtensor: "AsyncSubtensor", + wallet: "Wallet", + new_coldkey_ss58: str, + *, + mev_protection: bool = DEFAULT_MEV_PROTECTION, + period: Optional[int] = None, + raise_error: bool = False, + wait_for_inclusion: bool = True, + wait_for_finalization: bool = True, + wait_for_revealed_execution: bool = True, +) -> ExtrinsicResponse: + """ + Announces a coldkey swap by submitting the BlakeTwo256 hash of the new coldkey. + + This extrinsic allows a coldkey to declare its intention to swap to a new coldkey address. The announcement + must be made before the actual swap can be executed, and a delay period must pass before execution is allowed. + After making an announcement, all transactions from the coldkey are blocked except for `swap_coldkey_announced`. + + Parameters: + subtensor: AsyncSubtensor instance with the connection to the chain. + wallet: Bittensor wallet object (should be the current coldkey wallet). + new_coldkey_ss58: SS58 address of the new coldkey that will replace the current one. + mev_protection: If ``True``, encrypts and submits the transaction through the MEV Shield pallet to protect + against front-running and MEV attacks. The transaction remains encrypted in the mempool until validators + decrypt and execute it. If ``False``, submits the transaction directly without encryption. + period: The number of blocks during which the transaction will remain valid after it's submitted. If the + transaction is not included in a block within that number of blocks, it will expire and be rejected. You + can think of it as an expiration date for the transaction. + raise_error: Raises a relevant exception rather than returning ``False`` if unsuccessful. + wait_for_inclusion: Whether to wait for the inclusion of the transaction. + wait_for_finalization: Whether to wait for the finalization of the transaction. + wait_for_revealed_execution: Whether to wait for the revealed execution of transaction if mev_protection used. + + Returns: + ExtrinsicResponse: The result object of the extrinsic execution. + + Notes: + - A swap cost is charged when making the first announcement (not when reannouncing). + - After making an announcement, all transactions from the coldkey are blocked except for `swap_coldkey_announced`. + - The swap can only be executed after the delay period has passed (check via `get_coldkey_swap_announcement`). + - See: + """ + try: + if not ( + unlocked := ExtrinsicResponse.unlock_wallet(wallet, raise_error) + ).success: + return unlocked + + # Compute hash of new coldkey + new_coldkey = Keypair(ss58_address=new_coldkey_ss58) + new_coldkey_hash = compute_coldkey_hash(new_coldkey) + + logging.debug( + f"Announcing coldkey swap: current=[blue]{wallet.coldkeypub.ss58_address}[/blue], " + f"new=[blue]{new_coldkey_ss58}[/blue], " + f"hash=[blue]{new_coldkey_hash}[/blue] " + f"on [blue]{subtensor.network}[/blue]." + ) + + call = await SubtensorModule(subtensor).announce_coldkey_swap( + new_coldkey_hash=new_coldkey_hash + ) + + if mev_protection: + response = await submit_encrypted_extrinsic( + subtensor=subtensor, + wallet=wallet, + call=call, + period=period, + raise_error=raise_error, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, + wait_for_revealed_execution=wait_for_revealed_execution, + ) + else: + response = await subtensor.sign_and_send_extrinsic( + call=call, + wallet=wallet, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, + period=period, + raise_error=raise_error, + ) + + if response.success: + logging.debug("[green]Coldkey swap announced successfully.[/green]") + else: + logging.error(f"[red]{response.message}[/red]") + + return response + + except Exception as error: + return ExtrinsicResponse.from_exception(raise_error=raise_error, error=error) + + +async def swap_coldkey_announced_extrinsic( + subtensor: "AsyncSubtensor", + wallet: "Wallet", + new_coldkey_ss58: str, + *, + mev_protection: bool = DEFAULT_MEV_PROTECTION, + period: Optional[int] = None, + raise_error: bool = False, + wait_for_inclusion: bool = True, + wait_for_finalization: bool = True, + wait_for_revealed_execution: bool = True, +) -> ExtrinsicResponse: + """ + Executes a previously announced coldkey swap. + + This extrinsic executes a coldkey swap that was previously announced via `announce_coldkey_swap_extrinsic`. + The new coldkey address must match the hash that was announced, and the delay period must have passed. + + Parameters: + subtensor: AsyncSubtensor instance with the connection to the chain. + wallet: Bittensor wallet object (should be the current coldkey wallet that made the announcement). + new_coldkey_ss58: SS58 address of the new coldkey to swap to. This must match the hash that was announced. + mev_protection: If ``True``, encrypts and submits the transaction through the MEV Shield pallet to protect + against front-running and MEV attacks. The transaction remains encrypted in the mempool until validators + decrypt and execute it. If ``False``, submits the transaction directly without encryption. + period: The number of blocks during which the transaction will remain valid after it's submitted. If the + transaction is not included in a block within that number of blocks, it will expire and be rejected. You + can think of it as an expiration date for the transaction. + raise_error: Raises a relevant exception rather than returning ``False`` if unsuccessful. + wait_for_inclusion: Whether to wait for the inclusion of the transaction. + wait_for_finalization: Whether to wait for the finalization of the transaction. + wait_for_revealed_execution: Whether to wait for the revealed execution of transaction if mev_protection used. + + Returns: + ExtrinsicResponse: The result object of the extrinsic execution. + + Notes: + - The new coldkey hash must match the hash that was announced. + - The delay period must have passed (check via `get_coldkey_swap_announcement`). + - All assets, stakes, subnet ownerships, and hotkey associations are transferred from the old coldkey to the new + one. + - See: + """ + try: + if not ( + unlocked := ExtrinsicResponse.unlock_wallet(wallet, raise_error) + ).success: + return unlocked + + # Verify announcement exists and hash matches + announcement = await subtensor.get_coldkey_swap_announcement( + coldkey_ss58=wallet.coldkeypub.ss58_address + ) + + if announcement is None: + error_msg = "No coldkey swap announcement found. Make an announcement first using announce_coldkey_swap_extrinsic." + if raise_error: + raise ValueError(error_msg) + return ExtrinsicResponse( + success=False, + message=error_msg, + extrinsic_receipt=None, + ) + + new_coldkey = Keypair(ss58_address=new_coldkey_ss58) + if not verify_coldkey_hash(new_coldkey, announcement.new_coldkey_hash): + error_msg = ( + f"New coldkey hash does not match announcement. " + f"Expected: {announcement.new_coldkey_hash}, " + f"Got: {compute_coldkey_hash(new_coldkey)}" + ) + if raise_error: + raise ValueError(error_msg) + return ExtrinsicResponse( + success=False, + message=error_msg, + extrinsic_receipt=None, + ) + + # Check if delay has passed + current_block = await subtensor.get_current_block() + if current_block < announcement.execution_block: + error_msg = ( + f"Swap too early. Current block: {current_block}, " + f"Execution block: {announcement.execution_block}. " + f"Wait for {announcement.execution_block - current_block} more blocks." + ) + if raise_error: + raise ValueError(error_msg) + return ExtrinsicResponse( + success=False, + message=error_msg, + extrinsic_receipt=None, + ) + + logging.debug( + f"Executing coldkey swap: current=[blue]{wallet.coldkeypub.ss58_address}[/blue], " + f"new=[blue]{new_coldkey_ss58}[/blue] " + f"on [blue]{subtensor.network}[/blue]." + ) + + call = await SubtensorModule(subtensor).swap_coldkey_announced( + new_coldkey=new_coldkey_ss58 + ) + + if mev_protection: + response = await submit_encrypted_extrinsic( + subtensor=subtensor, + wallet=wallet, + call=call, + period=period, + raise_error=raise_error, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, + wait_for_revealed_execution=wait_for_revealed_execution, + ) + else: + response = await subtensor.sign_and_send_extrinsic( + call=call, + wallet=wallet, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, + period=period, + raise_error=raise_error, + ) + + if response.success: + logging.debug("[green]Coldkey swap executed successfully.[/green]") + else: + logging.error(f"[red]{response.message}[/red]") + + return response + + except Exception as error: + return ExtrinsicResponse.from_exception(raise_error=raise_error, error=error) + + +async def remove_coldkey_swap_announcement_extrinsic( + subtensor: "AsyncSubtensor", + wallet: "Wallet", + coldkey_ss58: str, + *, + mev_protection: bool = DEFAULT_MEV_PROTECTION, + period: Optional[int] = None, + raise_error: bool = False, + wait_for_inclusion: bool = True, + wait_for_finalization: bool = True, + wait_for_revealed_execution: bool = True, +) -> ExtrinsicResponse: + """ + Removes a coldkey swap announcement. + + This extrinsic can only called by root. It removes a pending coldkey swap announcement for the specified coldkey. + + Parameters: + subtensor: AsyncSubtensor instance with the connection to the chain. + wallet: Bittensor wallet object (must be root/admin wallet). + coldkey_ss58: SS58 address of the coldkey to remove the swap announcement for. + mev_protection: If ``True``, encrypts and submits the transaction through the MEV Shield pallet to protect + against front-running and MEV attacks. The transaction remains encrypted in the mempool until validators + decrypt and execute it. If ``False``, submits the transaction directly without encryption. + period: The number of blocks during which the transaction will remain valid after it's submitted. If the + transaction is not included in a block within that number of blocks, it will expire and be rejected. You + can think of it as an expiration date for the transaction. + raise_error: Raises a relevant exception rather than returning ``False`` if unsuccessful. + wait_for_inclusion: Whether to wait for the inclusion of the transaction. + wait_for_finalization: Whether to wait for the finalization of the transaction. + wait_for_revealed_execution: Whether to wait for the revealed execution of transaction if mev_protection used. + + Returns: + ExtrinsicResponse: The result object of the extrinsic execution. + + Notes: + - This function can only called by root. + - See: + """ + try: + if not ( + unlocked := ExtrinsicResponse.unlock_wallet(wallet, raise_error) + ).success: + return unlocked + + logging.debug( + f"Removing coldkey swap announcement: coldkey=[blue]{coldkey_ss58}[/blue] " + f"on [blue]{subtensor.network}[/blue]." + ) + + call = await SubtensorModule(subtensor).remove_coldkey_swap_announcement( + coldkey=coldkey_ss58 + ) + + if mev_protection: + response = await submit_encrypted_extrinsic( + subtensor=subtensor, + wallet=wallet, + call=call, + period=period, + raise_error=raise_error, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, + wait_for_revealed_execution=wait_for_revealed_execution, + ) + else: + response = await subtensor.sign_and_send_extrinsic( + call=call, + wallet=wallet, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, + period=period, + raise_error=raise_error, + ) + + if response.success: + logging.debug( + "[green]Coldkey swap announcement removed successfully.[/green]" + ) + else: + logging.error(f"[red]{response.message}[/red]") + + return response + + except Exception as error: + return ExtrinsicResponse.from_exception(raise_error=raise_error, error=error) diff --git a/bittensor/core/extrinsics/coldkey_swap.py b/bittensor/core/extrinsics/coldkey_swap.py new file mode 100644 index 0000000000..e037b52be1 --- /dev/null +++ b/bittensor/core/extrinsics/coldkey_swap.py @@ -0,0 +1,338 @@ +from typing import TYPE_CHECKING, Optional + +from bittensor_wallet import Keypair + +from bittensor.core.extrinsics.mev_shield import submit_encrypted_extrinsic +from bittensor.core.extrinsics.pallets import SubtensorModule +from bittensor.core.extrinsics.utils import ( + compute_coldkey_hash, + verify_coldkey_hash, +) +from bittensor.core.settings import DEFAULT_MEV_PROTECTION +from bittensor.core.types import ExtrinsicResponse +from bittensor.utils.btlogging import logging + +if TYPE_CHECKING: + from bittensor_wallet import Wallet + from bittensor.core.subtensor import Subtensor + + +def announce_coldkey_swap_extrinsic( + subtensor: "Subtensor", + wallet: "Wallet", + new_coldkey_ss58: str, + *, + mev_protection: bool = DEFAULT_MEV_PROTECTION, + period: Optional[int] = None, + raise_error: bool = False, + wait_for_inclusion: bool = True, + wait_for_finalization: bool = True, + wait_for_revealed_execution: bool = True, +) -> ExtrinsicResponse: + """ + Announces a coldkey swap by submitting the BlakeTwo256 hash of the new coldkey. + + This extrinsic allows a coldkey to declare its intention to swap to a new coldkey address. The announcement + must be made before the actual swap can be executed, and a delay period must pass before execution is allowed. + After making an announcement, all transactions from the coldkey are blocked except for `swap_coldkey_announced`. + + Parameters: + subtensor: Subtensor instance with the connection to the chain. + wallet: Bittensor wallet object (should be the current coldkey wallet). + new_coldkey_ss58: SS58 address of the new coldkey that will replace the current one. + mev_protection: If `True`, encrypts and submits the transaction through the MEV Shield pallet to protect + against front-running and MEV attacks. The transaction remains encrypted in the mempool until validators + decrypt and execute it. If `False`, submits the transaction directly without encryption. + period: The number of blocks during which the transaction will remain valid after it's submitted. If the + transaction is not included in a block within that number of blocks, it will expire and be rejected. You + can think of it as an expiration date for the transaction. + raise_error: Raises a relevant exception rather than returning `False` if unsuccessful. + wait_for_inclusion: Whether to wait for the inclusion of the transaction. + wait_for_finalization: Whether to wait for the finalization of the transaction. + wait_for_revealed_execution: Whether to wait for the revealed execution of transaction if mev_protection used. + + Returns: + ExtrinsicResponse: The result object of the extrinsic execution. + + Notes: + - A swap cost is charged when making the first announcement (not when reannouncing). + - After making an announcement, all transactions from the coldkey are blocked except for `swap_coldkey_announced`. + - The swap can only be executed after the delay period has passed (check via `get_coldkey_swap_announcement`). + - See: + """ + try: + if not ( + unlocked := ExtrinsicResponse.unlock_wallet(wallet, raise_error) + ).success: + return unlocked + + # Compute hash of new coldkey + new_keypair = Keypair( + ss58_address=new_coldkey_ss58, + ) + new_coldkey_hash = compute_coldkey_hash(new_keypair) + + logging.debug( + f"Announcing coldkey swap: current=[blue]{wallet.coldkeypub.ss58_address}[/blue], " + f"new=[blue]{new_coldkey_ss58}[/blue], " + f"hash=[blue]{new_coldkey_hash}[/blue] " + f"on [blue]{subtensor.network}[/blue]." + ) + + call = SubtensorModule(subtensor).announce_coldkey_swap( + new_coldkey_hash=new_coldkey_hash + ) + + if mev_protection: + response = submit_encrypted_extrinsic( + subtensor=subtensor, + wallet=wallet, + call=call, + period=period, + raise_error=raise_error, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, + wait_for_revealed_execution=wait_for_revealed_execution, + ) + else: + response = subtensor.sign_and_send_extrinsic( + call=call, + wallet=wallet, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, + period=period, + raise_error=raise_error, + ) + + if response.success: + logging.debug("[green]Coldkey swap announced successfully.[/green]") + else: + logging.error(f"[red]{response.message}[/red]") + + return response + + except Exception as error: + return ExtrinsicResponse.from_exception(raise_error=raise_error, error=error) + + +def swap_coldkey_announced_extrinsic( + subtensor: "Subtensor", + wallet: "Wallet", + new_coldkey_ss58: str, + *, + mev_protection: bool = DEFAULT_MEV_PROTECTION, + period: Optional[int] = None, + raise_error: bool = False, + wait_for_inclusion: bool = True, + wait_for_finalization: bool = True, + wait_for_revealed_execution: bool = True, +) -> ExtrinsicResponse: + """ + Executes a previously announced coldkey swap. + + This extrinsic executes a coldkey swap that was previously announced via `announce_coldkey_swap_extrinsic`. + The new coldkey address must match the hash that was announced, and the delay period must have passed. + + Parameters: + subtensor: Subtensor instance with the connection to the chain. + wallet: Bittensor wallet object (should be the current coldkey wallet that made the announcement). + new_coldkey_ss58: SS58 address of the new coldkey to swap to. This must match the hash that was announced. + mev_protection: If `True`, encrypts and submits the transaction through the MEV Shield pallet to protect + against front-running and MEV attacks. The transaction remains encrypted in the mempool until validators + decrypt and execute it. If `False`, submits the transaction directly without encryption. + period: The number of blocks during which the transaction will remain valid after it's submitted. If the + transaction is not included in a block within that number of blocks, it will expire and be rejected. You + can think of it as an expiration date for the transaction. + raise_error: Raises a relevant exception rather than returning `False` if unsuccessful. + wait_for_inclusion: Whether to wait for the inclusion of the transaction. + wait_for_finalization: Whether to wait for the finalization of the transaction. + wait_for_revealed_execution: Whether to wait for the revealed execution of transaction if mev_protection used. + + Returns: + ExtrinsicResponse: The result object of the extrinsic execution. + + Notes: + - The new coldkey hash must match the hash that was announced. + - The delay period must have passed (check via `get_coldkey_swap_announcement`). + - All assets, stakes, subnet ownerships, and hotkey associations are transferred from the old coldkey to the new one. + - See: + """ + try: + if not ( + unlocked := ExtrinsicResponse.unlock_wallet(wallet, raise_error) + ).success: + return unlocked + + # Verify announcement exists and hash matches + announcement = subtensor.get_coldkey_swap_announcement( + coldkey_ss58=wallet.coldkeypub.ss58_address + ) + + if announcement is None: + error_msg = "No coldkey swap announcement found. Make an announcement first using announce_coldkey_swap_extrinsic." + if raise_error: + raise ValueError(error_msg) + return ExtrinsicResponse( + success=False, + message=error_msg, + extrinsic_receipt=None, + ) + + new_coldkey = Keypair(ss58_address=new_coldkey_ss58) + if not verify_coldkey_hash(new_coldkey, announcement.new_coldkey_hash): + error_msg = ( + f"New coldkey hash does not match announcement. " + f"Expected: {announcement.new_coldkey_hash}, " + f"Got: {compute_coldkey_hash(new_coldkey)}" + ) + if raise_error: + raise ValueError(error_msg) + return ExtrinsicResponse( + success=False, + message=error_msg, + extrinsic_receipt=None, + ) + + # Check if delay has passed + current_block = subtensor.get_current_block() + if current_block < announcement.execution_block: + error_msg = ( + f"Swap too early. Current block: {current_block}, " + f"Execution block: {announcement.execution_block}. " + f"Wait for {announcement.execution_block - current_block} more blocks." + ) + error = ValueError(error_msg) + if raise_error: + raise error + return ExtrinsicResponse( + success=False, message=error_msg, extrinsic_receipt=None, error=error + ) + + logging.debug( + f"Executing coldkey swap: current=[blue]{wallet.coldkeypub.ss58_address}[/blue], " + f"new=[blue]{new_coldkey_ss58}[/blue] " + f"on [blue]{subtensor.network}[/blue]." + ) + + call = SubtensorModule(subtensor).swap_coldkey_announced( + new_coldkey=new_coldkey_ss58 + ) + + if mev_protection: + response = submit_encrypted_extrinsic( + subtensor=subtensor, + wallet=wallet, + call=call, + period=period, + raise_error=raise_error, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, + wait_for_revealed_execution=wait_for_revealed_execution, + ) + else: + response = subtensor.sign_and_send_extrinsic( + call=call, + wallet=wallet, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, + period=period, + raise_error=raise_error, + ) + + if response.success: + logging.debug("[green]Coldkey swap executed successfully.[/green]") + else: + logging.error(f"[red]{response.message}[/red]") + + return response + + except Exception as error: + return ExtrinsicResponse.from_exception(raise_error=raise_error, error=error) + + +def remove_coldkey_swap_announcement_extrinsic( + subtensor: "Subtensor", + wallet: "Wallet", + coldkey_ss58: str, + *, + mev_protection: bool = DEFAULT_MEV_PROTECTION, + period: Optional[int] = None, + raise_error: bool = False, + wait_for_inclusion: bool = True, + wait_for_finalization: bool = True, + wait_for_revealed_execution: bool = True, +) -> ExtrinsicResponse: + """ + Removes a coldkey swap announcement. + + This extrinsic can only called by root. It removes a pending coldkey swap announcement for the specified coldkey. + + Parameters: + subtensor: Subtensor instance with the connection to the chain. + wallet: Bittensor wallet object (must be root/admin wallet). + coldkey_ss58: SS58 address of the coldkey to remove the swap announcement for. + mev_protection: If `True`, encrypts and submits the transaction through the MEV Shield pallet to protect + against front-running and MEV attacks. The transaction remains encrypted in the mempool until validators + decrypt and execute it. If `False`, submits the transaction directly without encryption. + period: The number of blocks during which the transaction will remain valid after it's submitted. If the + transaction is not included in a block within that number of blocks, it will expire and be rejected. You + can think of it as an expiration date for the transaction. + raise_error: Raises a relevant exception rather than returning `False` if unsuccessful. + wait_for_inclusion: Whether to wait for the inclusion of the transaction. + wait_for_finalization: Whether to wait for the finalization of the transaction. + wait_for_revealed_execution: Whether to wait for the revealed execution of transaction if mev_protection used. + + Returns: + ExtrinsicResponse: The result object of the extrinsic execution. + + Notes: + - This function can only called by root. + - See: + """ + try: + if not ( + unlocked := ExtrinsicResponse.unlock_wallet(wallet, raise_error) + ).success: + return unlocked + + logging.debug( + f"Removing coldkey swap announcement: coldkey=[blue]{coldkey_ss58}[/blue] " + f"on [blue]{subtensor.network}[/blue]." + ) + + call = SubtensorModule(subtensor).remove_coldkey_swap_announcement( + coldkey=coldkey_ss58 + ) + + if mev_protection: + response = submit_encrypted_extrinsic( + subtensor=subtensor, + wallet=wallet, + call=call, + period=period, + raise_error=raise_error, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, + wait_for_revealed_execution=wait_for_revealed_execution, + ) + else: + response = subtensor.sign_and_send_extrinsic( + call=call, + wallet=wallet, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, + period=period, + raise_error=raise_error, + ) + + if response.success: + logging.debug( + "[green]Coldkey swap announcement removed successfully.[/green]" + ) + else: + logging.error(f"[red]{response.message}[/red]") + + return response + + except Exception as error: + return ExtrinsicResponse.from_exception(raise_error=raise_error, error=error) diff --git a/bittensor/core/extrinsics/pallets/subtensor_module.py b/bittensor/core/extrinsics/pallets/subtensor_module.py index 8a3344be55..f1f1df4dab 100644 --- a/bittensor/core/extrinsics/pallets/subtensor_module.py +++ b/bittensor/core/extrinsics/pallets/subtensor_module.py @@ -75,6 +75,20 @@ def add_stake_limit( allow_partial=allow_partial, ) + def announce_coldkey_swap( + self, + new_coldkey_hash: str, + ) -> Call: + """Returns GenericCall instance for Subtensor function SubtensorModule.announce_coldkey_swap. + + Parameters: + new_coldkey_hash: The BlakeTwo256 hash of the new coldkey AccountId (hex string with 0x prefix). + + Returns: + GenericCall instance. + """ + return self.create_composed_call(new_coldkey_hash=new_coldkey_hash) + def burned_register( self, netuid: int, @@ -314,6 +328,22 @@ def remove_stake_full_limit( netuid=netuid, hotkey=hotkey, limit_price=limit_price ) + def remove_coldkey_swap_announcement( + self, + coldkey: str, + ) -> Call: + """Returns GenericCall instance for Subtensor function SubtensorModule.remove_coldkey_swap_announcement. + + Only callable by root. + + Parameters: + coldkey: SS58 address of the coldkey to remove the swap announcement for. + + Returns: + GenericCall instance. + """ + return self.create_composed_call(coldkey=coldkey) + def reveal_mechanism_weights( self, netuid: int, @@ -633,6 +663,21 @@ def swap_stake_limit( allow_partial=allow_partial, ) + def swap_coldkey_announced( + self, + new_coldkey: str, + ) -> Call: + """Returns GenericCall instance for Subtensor function SubtensorModule.swap_coldkey_announced. + + Parameters: + new_coldkey: SS58 address of the new coldkey to swap to. The BlakeTwo256 hash of this coldkey must match + the hash that was announced. + + Returns: + GenericCall instance. + """ + return self.create_composed_call(new_coldkey=new_coldkey) + def transfer_stake( self, destination_coldkey: str, diff --git a/bittensor/core/extrinsics/utils.py b/bittensor/core/extrinsics/utils.py index fdeffdb9c8..5139c35ff8 100644 --- a/bittensor/core/extrinsics/utils.py +++ b/bittensor/core/extrinsics/utils.py @@ -5,12 +5,12 @@ from typing import TYPE_CHECKING, Optional, Union from bittensor_drand import encrypt_mlkem768 +from bittensor_wallet import Keypair from bittensor.core.extrinsics.pallets import Sudo from bittensor.core.types import ExtrinsicResponse from bittensor.utils.balance import Balance - # TODO: Michael/Roman add the link to the docs once it's ready.' MEV_HOTKEY_USAGE_WARNING = ( "MeV Shield cannot be used with hotkey-signed extrinsics. The transaction will fail because the hotkey cannot pay " @@ -287,3 +287,47 @@ def get_event_data_by_event_name(events: list, event_name: str) -> Optional[dict ): return event return None + + +def compute_coldkey_hash(keypair: "Keypair") -> str: + """ + Computes BlakeTwo256 hash of a coldkey AccountId. + + This function extracts the AccountId (32-byte public key) from an SS58 address and computes its BlakeTwo256 hash. + The hash is used in coldkey swap announcements to verify the new coldkey address when executing the swap. + + Parameters: + keypair: keypair for getting hash. + + Returns: + Hex string with 0x prefix representing the BlakeTwo256 hash of the AccountId. + + Notes: + - The hash is computed from the AccountId (public key bytes), not from the SS58 string. + - This matches the hash computation used in the Subtensor runtime. + - See: + """ + hash_bytes = hashlib.blake2b(keypair.public_key, digest_size=32).digest() + return "0x" + hash_bytes.hex() + + +def verify_coldkey_hash(keypair: "Keypair", expected_hash: str) -> bool: + """ + Verifies that a coldkey SS58 address matches the expected BlakeTwo256 hash. + + This function computes the hash of the coldkey AccountId and compares it with the expected hash. Used to verify that + the new coldkey address in a swap announcement matches the announced hash. + + Parameters: + keypair: keypair whose hash needs to be verified. + expected_hash: Expected BlakeTwo256 hash (hex string with 0x prefix). + + Returns: + True if the computed hash matches the expected hash, False otherwise. + + Notes: + - Both hashes are compared in lowercase to handle case differences. + - See: + """ + computed_hash = compute_coldkey_hash(keypair) + return computed_hash.lower() == expected_hash.lower() diff --git a/bittensor/core/subtensor.py b/bittensor/core/subtensor.py index 217e93d9ff..d8496ad64b 100644 --- a/bittensor/core/subtensor.py +++ b/bittensor/core/subtensor.py @@ -14,6 +14,8 @@ from bittensor.core.axon import Axon from bittensor.core.chain_data import ( + ColdkeySwapAnnouncementInfo, + ColdkeySwapConstants, CrowdloanConstants, CrowdloanInfo, DelegatedInfo, @@ -50,6 +52,10 @@ root_set_pending_childkey_cooldown_extrinsic, set_children_extrinsic, ) +from bittensor.core.extrinsics.coldkey_swap import ( + announce_coldkey_swap_extrinsic, + swap_coldkey_announced_extrinsic, +) from bittensor.core.extrinsics.crowdloan import ( contribute_crowdloan_extrinsic, create_crowdloan_extrinsic, @@ -1555,6 +1561,172 @@ def get_children_pending( cooldown, ) + def get_coldkey_swap_announcement( + self, + coldkey_ss58: str, + block: Optional[int] = None, + ) -> Optional["ColdkeySwapAnnouncementInfo"]: + """ + Retrieves coldkey swap announcement for a specific coldkey. + + This method queries the SubtensorModule.ColdkeySwapAnnouncements storage for an announcement made by the given + coldkey. Announcements allow a coldkey to declare its intention to swap to a new coldkey address after a delay + period. + + Parameters: + coldkey_ss58: SS58 address of the coldkey whose announcement to retrieve. + block: The blockchain block number for the query. If None, queries the latest block. + + Returns: + ColdkeySwapAnnouncementInfo if announcement exists, None otherwise. Contains the execution block and + new coldkey hash. + + Notes: + - If the coldkey has no announcement, returns None. + - See: + """ + block_hash = self.determine_block_hash(block) + query = self.substrate.query( + module="SubtensorModule", + storage_function="ColdkeySwapAnnouncements", + params=[coldkey_ss58], + block_hash=block_hash, + ) + return ColdkeySwapAnnouncementInfo.from_query( + coldkey_ss58=coldkey_ss58, query=query + ) + + def get_coldkey_swap_announcements( + self, + block: Optional[int] = None, + ) -> list["ColdkeySwapAnnouncementInfo"]: + """ + Retrieves all coldkey swap announcements from the chain. + + This method queries the SubtensorModule.ColdkeySwapAnnouncements storage map across all coldkeys and returns a + list of all active announcements. + + Parameters: + block: The blockchain block number for the query. If None, queries the latest block. + + Returns: + List of ColdkeySwapAnnouncementInfo objects representing all active coldkey swap announcements on the chain. + + Notes: + - This method queries all announcements on the chain, which may be resource-intensive for large networks. + Consider using :meth:`get_coldkey_swap_announcement` for querying specific coldkeys. + - See: + """ + block_hash = self.determine_block_hash(block) + query_map = self.substrate.query_map( + module="SubtensorModule", + storage_function="ColdkeySwapAnnouncements", + block_hash=block_hash, + ) + return [ColdkeySwapAnnouncementInfo.from_record(record) for record in query_map] + + def get_coldkey_swap_announcement_delay( + self, + block: Optional[int] = None, + ) -> int: + """ + Retrieves the ColdkeySwapAnnouncementDelay storage value. + + This method queries the SubtensorModule.ColdkeySwapAnnouncementDelay storage value, which defines the number + of blocks that must elapse after making an announcement before the swap can be executed. + + Parameters: + block: The blockchain block number for the query. If None, queries the latest block. + + Returns: + The number of blocks that must elapse before swap execution (integer). + + Notes: + - This is a storage value (can be changed via admin extrinsics), not a runtime constant. + - See: + """ + block_hash = self.determine_block_hash(block) + query = self.substrate.query( + module="SubtensorModule", + storage_function="ColdkeySwapAnnouncementDelay", + block_hash=block_hash, + ) + return query.value if getattr(query, "value", None) else 0 + + def get_coldkey_swap_constants( + self, + constants: Optional[list[str]] = None, + as_dict: bool = False, + block: Optional[int] = None, + ) -> Union["ColdkeySwapConstants", dict]: + """ + Fetches runtime configuration constants for coldkey swap operations. + + This method retrieves on-chain runtime constants that define cost requirements for coldkey swap operations. + Note: For delay values (ColdkeySwapAnnouncementDelay and ColdkeySwapReannouncementDelay), use the dedicated + query methods `get_coldkey_swap_announcement_delay()` and `get_coldkey_swap_reannouncement_delay()` instead, + as these are storage values, not runtime constants. + + Parameters: + constants: Optional list of specific constant names to fetch. If omitted, all constants defined in + `ColdkeySwapConstants.constants_names()` are queried. Valid constant names include: "KeySwapCost". + as_dict: If True, returns the constants as a dictionary instead of a `ColdkeySwapConstants` object. + block: The blockchain block number for the query. If None, queries the latest block. + + Returns: + If `as_dict` is False: ColdkeySwapConstants object containing all requested constants. + If `as_dict` is True: Dictionary mapping constant names to their values (integers for cost in RAO). + + Notes: + - All amounts are returned in RAO. Values reflect the current chain configuration at the specified block. + - KeySwapCost is a runtime constant (queryable via constants). + - See: + """ + result = {} + const_names = constants or ColdkeySwapConstants.constants_names() + + for const_name in const_names: + # Query as runtime constant + query = self.query_constant( + module_name="SubtensorModule", + constant_name=const_name, + block=block, + ) + if query is not None: + result[const_name] = query.value + + constants_obj = ColdkeySwapConstants.from_dict(result) + + return constants_obj.to_dict() if as_dict else constants_obj + + def get_coldkey_swap_reannouncement_delay( + self, + block: Optional[int] = None, + ) -> int: + """ + Retrieves the ColdkeySwapReannouncementDelay storage value. + + This method queries the SubtensorModule.ColdkeySwapReannouncementDelay storage value, which defines the number + of blocks that must elapse between the original announcement and a reannouncement. + + Parameters: + block: The blockchain block number for the query. If None, queries the latest block. + + Returns: + The number of blocks that must elapse before reannouncement (integer). + + Notes: + - This is a storage value (can be changed via admin extrinsics), not a runtime constant. + - See: + """ + block_hash = self.determine_block_hash(block) + query = self.substrate.query( + module="SubtensorModule", + storage_function="ColdkeySwapReannouncementDelay", + block_hash=block_hash, + ) + return query.value if getattr(query, "value", None) else 0 + def get_commitment(self, netuid: int, uid: int, block: Optional[int] = None) -> str: """Retrieves the on-chain commitment for a specific neuron in the Bittensor network. @@ -5041,6 +5213,62 @@ def add_liquidity( wait_for_revealed_execution=wait_for_revealed_execution, ) + def announce_coldkey_swap( + self, + wallet: "Wallet", + new_coldkey_ss58: str, + *, + mev_protection: bool = DEFAULT_MEV_PROTECTION, + period: Optional[int] = DEFAULT_PERIOD, + raise_error: bool = False, + wait_for_inclusion: bool = True, + wait_for_finalization: bool = True, + wait_for_revealed_execution: bool = True, + ) -> ExtrinsicResponse: + """ + Announces a coldkey swap by submitting the BlakeTwo256 hash of the new coldkey. + + This method allows a coldkey to declare its intention to swap to a new coldkey address. The announcement must be + made before the actual swap can be executed, and a delay period must pass before execution is allowed. + After making an announcement, all transactions from the coldkey are blocked except for `swap_coldkey_announced`. + + Parameters: + wallet: Bittensor wallet object (should be the current coldkey wallet). + new_coldkey_ss58: SS58 address of the new coldkey that will replace the current one. + mev_protection: If `True`, encrypts and submits the transaction through the MEV Shield pallet to protect + against front-running and MEV attacks. The transaction remains encrypted in the mempool until validators + decrypt and execute it. If `False`, submits the transaction directly without encryption. + period: The number of blocks during which the transaction will remain valid after it's submitted. If the + transaction is not included in a block within that number of blocks, it will expire and be rejected. You + can think of it as an expiration date for the transaction. + raise_error: Raises a relevant exception rather than returning `False` if unsuccessful. + wait_for_inclusion: Whether to wait for the inclusion of the transaction. + wait_for_finalization: Whether to wait for the finalization of the transaction. + wait_for_revealed_execution: Whether to wait for the revealed execution of transaction if mev_protection used. + + Returns: + ExtrinsicResponse: The result object of the extrinsic execution. + + Notes: + - A swap cost is charged when making the first announcement (not when reannouncing). + - After making an announcement, all transactions from the coldkey are blocked except for ` + swap_coldkey_announced`. + - The swap can only be executed after the delay period has passed (check via + `get_coldkey_swap_announcement`). + - See: + """ + return announce_coldkey_swap_extrinsic( + subtensor=self, + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + mev_protection=mev_protection, + period=period, + raise_error=raise_error, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, + wait_for_revealed_execution=wait_for_revealed_execution, + ) + def add_stake_multiple( self, wallet: "Wallet", @@ -7671,6 +7899,59 @@ def transfer( wait_for_revealed_execution=wait_for_revealed_execution, ) + def swap_coldkey_announced( + self, + wallet: "Wallet", + new_coldkey_ss58: str, + *, + mev_protection: bool = DEFAULT_MEV_PROTECTION, + period: Optional[int] = DEFAULT_PERIOD, + raise_error: bool = False, + wait_for_inclusion: bool = True, + wait_for_finalization: bool = True, + wait_for_revealed_execution: bool = True, + ) -> ExtrinsicResponse: + """ + Executes a previously announced coldkey swap. + + This method executes a coldkey swap that was previously announced via `announce_coldkey_swap`. The new coldkey + address must match the hash that was announced, and the delay period must have passed. + + Parameters: + wallet: Bittensor wallet object (should be the current coldkey wallet that made the announcement). + new_coldkey_ss58: SS58 address of the new coldkey to swap to. This must match the hash that was announced. + mev_protection: If `True`, encrypts and submits the transaction through the MEV Shield pallet to protect + against front-running and MEV attacks. The transaction remains encrypted in the mempool until validators + decrypt and execute it. If `False`, submits the transaction directly without encryption. + period: The number of blocks during which the transaction will remain valid after it's submitted. If the + transaction is not included in a block within that number of blocks, it will expire and be rejected. You + can think of it as an expiration date for the transaction. + raise_error: Raises a relevant exception rather than returning `False` if unsuccessful. + wait_for_inclusion: Whether to wait for the inclusion of the transaction. + wait_for_finalization: Whether to wait for the finalization of the transaction. + wait_for_revealed_execution: Whether to wait for the revealed execution of transaction if mev_protection used. + + Returns: + ExtrinsicResponse: The result object of the extrinsic execution. + + Notes: + - The new coldkey hash must match the hash that was announced. + - The delay period must have passed (check via `get_coldkey_swap_announcement`). + - All assets, stakes, subnet ownerships, and hotkey associations are transferred from the old coldkey to the new one. + - See: + """ + return swap_coldkey_announced_extrinsic( + subtensor=self, + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + mev_protection=mev_protection, + period=period, + raise_error=raise_error, + wait_for_inclusion=wait_for_inclusion, + wait_for_finalization=wait_for_finalization, + wait_for_revealed_execution=wait_for_revealed_execution, + ) + def transfer_stake( self, wallet: "Wallet", diff --git a/bittensor/extras/subtensor_api/extrinsics.py b/bittensor/extras/subtensor_api/extrinsics.py index 69b3cde77f..1caf709a36 100644 --- a/bittensor/extras/subtensor_api/extrinsics.py +++ b/bittensor/extras/subtensor_api/extrinsics.py @@ -10,6 +10,7 @@ def __init__(self, subtensor: Union["_Subtensor", "_AsyncSubtensor"]): self.add_liquidity = subtensor.add_liquidity self.add_stake = subtensor.add_stake self.add_stake_multiple = subtensor.add_stake_multiple + self.announce_coldkey_swap = subtensor.announce_coldkey_swap self.burned_register = subtensor.burned_register self.claim_root = subtensor.claim_root self.commit_weights = subtensor.commit_weights @@ -36,6 +37,7 @@ def __init__(self, subtensor: Union["_Subtensor", "_AsyncSubtensor"]): self.set_commitment = subtensor.set_commitment self.set_root_claim_type = subtensor.set_root_claim_type self.start_call = subtensor.start_call + self.swap_coldkey_announced = subtensor.swap_coldkey_announced self.swap_stake = subtensor.swap_stake self.toggle_user_liquidity = subtensor.toggle_user_liquidity self.transfer = subtensor.transfer diff --git a/bittensor/extras/subtensor_api/wallets.py b/bittensor/extras/subtensor_api/wallets.py index b6822159ed..695a44b022 100644 --- a/bittensor/extras/subtensor_api/wallets.py +++ b/bittensor/extras/subtensor_api/wallets.py @@ -19,6 +19,15 @@ def __init__(self, subtensor: Union["_Subtensor", "_AsyncSubtensor"]): self.get_balances = subtensor.get_balances self.get_children = subtensor.get_children self.get_children_pending = subtensor.get_children_pending + self.get_coldkey_swap_announcement_delay = ( + subtensor.get_coldkey_swap_announcement_delay + ) + self.get_coldkey_swap_announcement = subtensor.get_coldkey_swap_announcement + self.get_coldkey_swap_announcements = subtensor.get_coldkey_swap_announcements + self.get_coldkey_swap_constants = subtensor.get_coldkey_swap_constants + self.get_coldkey_swap_reannouncement_delay = ( + subtensor.get_coldkey_swap_reannouncement_delay + ) self.get_delegate_by_hotkey = subtensor.get_delegate_by_hotkey self.get_delegate_take = subtensor.get_delegate_take self.get_delegated = subtensor.get_delegated diff --git a/tests/e2e_tests/test_coldkey_swap.py b/tests/e2e_tests/test_coldkey_swap.py new file mode 100644 index 0000000000..e350dcabf6 --- /dev/null +++ b/tests/e2e_tests/test_coldkey_swap.py @@ -0,0 +1,737 @@ +import pytest + +from bittensor import logging +from bittensor.utils.balance import Balance + + +def test_coldkey_swap(subtensor, alice_wallet, bob_wallet, charlie_wallet): + """ + Sync test for coldkey swap extrinsics. + + This comprehensive test covers: + 1. Happy Path - Successful swap flow: + - Step 1: Announce coldkey swap from Alice to Bob + - Step 2: Verify announcement was created and contains correct data + - Step 3: Verify coldkey swap constants are accessible + - Step 4: Wait for execution block (50 blocks delay) + - Step 5: Execute the swap + - Step 6: Verify announcement was removed after successful swap + + 2. Error cases for swap_coldkey_announced: + - Error 1: Attempt to execute swap without prior announcement + - Error 2: Attempt to execute swap with incorrect coldkey hash (mismatch) + - Error 3: Attempt to execute swap too early (before execution block) + + 3. Error cases for announce_coldkey_swap: + - Error 4: Attempt to create duplicate announcement (reannouncement behavior) + + 4. Transaction blocking after announcement: + - Step 1: Create announcement + - Step 2: Attempt to execute other transaction (transfer) from announced coldkey + - Step 3: Verify transaction is blocked (except swap_coldkey_announced) + + Notes: + - Uses fast blocks mode (50 blocks delay instead of 5 days) + - All operations use async_subtensor for async execution + - Each test section cleans up after itself + """ + logging.console.info("Starting coldkey swap E2E test") + + # === 1. Happy Path - Successful swap === + logging.console.info("Testing Happy Path - successful swap") + + # Step 1: Alice announces swap to new coldkey (Bob) + logging.console.info("Step 1: Alice announces swap to Bob") + response = subtensor.extrinsics.announce_coldkey_swap( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + ) + assert response.success, f"Failed to announce swap: {response.message}" + + # Step 2: Verify announcement was created + logging.console.info("Step 2: Verify announcement was created") + announcement = subtensor.wallets.get_coldkey_swap_announcement( + coldkey_ss58=alice_wallet.coldkeypub.ss58_address + ) + assert announcement is not None, "Announcement should exist" + assert announcement.coldkey == alice_wallet.coldkeypub.ss58_address + assert announcement.execution_block > subtensor.chain.get_current_block() + + # Step 3: Verify constants and storage values + logging.console.info("Step 3: Verify constants and storage values") + constants = subtensor.wallets.get_coldkey_swap_constants() + assert constants.KeySwapCost is not None + + announcement_delay = subtensor.wallets.get_coldkey_swap_announcement_delay() + reannouncement_delay = subtensor.wallets.get_coldkey_swap_reannouncement_delay() + + assert announcement_delay is not None + assert reannouncement_delay is not None + logging.console.info( + f"Constants: AnnouncementDelay={announcement_delay}, " + f"ReannouncementDelay={reannouncement_delay}, " + f"KeySwapCost={constants.KeySwapCost}" + ) + + # Step 4: Wait for 50 blocks (execution_block) + logging.console.info("Step 4: Waiting for execution block") + current_block = subtensor.chain.get_current_block() + execution_block = announcement.execution_block + logging.console.info( + f"Current block: {current_block}, Execution block: {execution_block}" + ) + subtensor.wait_for_block(execution_block + 1) + + # Step 5: Execute swap + logging.console.info("Step 5: Executing swap") + response = subtensor.extrinsics.swap_coldkey_announced( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + ) + assert response.success, f"Failed to execute swap: {response.message}" + + # Step 6: Verify announcement was removed after swap + logging.console.info("Step 6: Verify announcement was removed after swap") + announcement_after = subtensor.wallets.get_coldkey_swap_announcement( + coldkey_ss58=alice_wallet.coldkeypub.ss58_address + ) + assert announcement_after is None, "Announcement should be removed after swap" + + logging.console.info("Happy Path completed successfully") + + # Refund Alice balance for further tests (Bob now has all Alice's funds after swap) + logging.console.info("Refunding Alice balance for further tests") + bob_balance = subtensor.wallets.get_balance(bob_wallet.coldkeypub.ss58_address) + refund_amount = Balance.from_tao(10) + assert bob_balance > refund_amount, ( + f"Bob balance ({bob_balance}) too low to refund Alice ({refund_amount})" + ) + response = subtensor.extrinsics.transfer( + wallet=bob_wallet, + destination_ss58=alice_wallet.coldkeypub.ss58_address, + amount=refund_amount, + ) + assert response.success, f"Failed to refund Alice: {response.message}" + logging.console.info("Alice balance refunded successfully") + + # === 2. Error cases for swap_coldkey_announced === + logging.console.info("Testing errors for swap_coldkey_announced") + + # Error 1: Attempt to execute swap without announcement + logging.console.info("Error 1: Attempting swap without announcement") + response = subtensor.extrinsics.swap_coldkey_announced( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + raise_error=False, + ) + assert not response.success, "Should fail without announcement" + assert "No coldkey swap announcement found" in response.message + logging.console.info("Error 1 passed: No announcement error") + + # Error 2: Hash mismatch + logging.console.info("Error 2: Testing hash mismatch") + # Alice announces swap to Bob + response = subtensor.extrinsics.announce_coldkey_swap( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + ) + assert response.success, f"Failed to announce swap: {response.message}" + + # Attempt to execute swap with incorrect coldkey (Charlie instead of Bob) + response = subtensor.extrinsics.swap_coldkey_announced( + wallet=alice_wallet, + new_coldkey_ss58=charlie_wallet.coldkeypub.ss58_address, + raise_error=False, + ) + assert not response.success, "Should fail with hash mismatch" + assert "hash does not match" in response.message.lower() + logging.console.info("Error 2 passed: Hash mismatch error") + + # Cleanup: Remove announcement from Error 2 by waiting and executing swap + logging.console.info("Cleaning up announcement from Error 2") + announcement_from_error2 = subtensor.wallets.get_coldkey_swap_announcement( + coldkey_ss58=alice_wallet.coldkeypub.ss58_address + ) + assert announcement_from_error2 is not None, ( + "Announcement from Error 2 should exist" + ) + # Wait for execution block (wait_for_block is safe even if block already passed) + subtensor.wait_for_block(announcement_from_error2.execution_block + 1) + response = subtensor.extrinsics.swap_coldkey_announced( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + ) + assert response.success, f"Failed to cleanup announcement: {response.message}" + logging.console.info("Cleaned up announcement from Error 2") + # Refund Alice balance after swap + bob_balance = subtensor.wallets.get_balance(bob_wallet.coldkeypub.ss58_address) + refund_amount = Balance.from_tao(10) + assert bob_balance > refund_amount, ( + f"Bob balance ({bob_balance}) too low to refund Alice ({refund_amount})" + ) + response = subtensor.extrinsics.transfer( + wallet=bob_wallet, + destination_ss58=alice_wallet.coldkeypub.ss58_address, + amount=refund_amount, + ) + assert response.success, f"Failed to refund Alice: {response.message}" + logging.console.info("Refunded Alice balance after cleanup") + + # Error 3: Too early (before execution block) + logging.console.info("Error 3: Testing too early error") + # Create new announcement for this test + response = subtensor.extrinsics.announce_coldkey_swap( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + ) + assert response.success, f"Failed to announce swap: {response.message}" + announcement = subtensor.wallets.get_coldkey_swap_announcement( + coldkey_ss58=alice_wallet.coldkeypub.ss58_address + ) + assert announcement is not None, "Announcement should exist for Error 3 test" + + # Attempt to execute swap immediately (before execution_block) + current_block = subtensor.chain.get_current_block() + assert current_block < announcement.execution_block, ( + "Current block should be before execution block" + ) + response = subtensor.extrinsics.swap_coldkey_announced( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + raise_error=False, + ) + assert not response.success, "Should fail with too early error" + assert "too early" in response.message.lower() + assert str(announcement.execution_block) in response.message + logging.console.info("Error 3 passed: Too early error") + + # Wait for execution_block and execute swap for cleanup + subtensor.wait_for_block(announcement.execution_block) + response = subtensor.extrinsics.swap_coldkey_announced( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + ) + assert response.success, response.message + logging.console.info("Cleaned up announcement by executing swap") + # Refund Alice balance after swap + + refund_amount = Balance.from_tao(10) + response = subtensor.extrinsics.transfer( + wallet=bob_wallet, + destination_ss58=alice_wallet.coldkeypub.ss58_address, + amount=refund_amount, + ) + assert response.success, response.message + + # === 3. Error cases for announce_coldkey_swap === + logging.console.info("Testing errors for announce_coldkey_swap") + + # Error 4: Duplicate announcement (reannouncement) + logging.console.info("Error 4: Testing duplicate announcement") + # Create first announcement + response = subtensor.extrinsics.announce_coldkey_swap( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + ) + assert response.success, f"Failed to announce swap: {response.message}" + + # Attempt to create second announcement (to Charlie) + response = subtensor.extrinsics.announce_coldkey_swap( + wallet=alice_wallet, + new_coldkey_ss58=charlie_wallet.coldkeypub.ss58_address, + raise_error=False, + ) + assert not response.success, "Should fail with duplicate announcement" + + # Verify that there is an active announcement + announcement = subtensor.wallets.get_coldkey_swap_announcement( + coldkey_ss58=alice_wallet.coldkeypub.ss58_address + ) + assert announcement is not None, "Should have an active announcement" + logging.console.info("Error 4: Duplicate announcement handled") + + # Cleanup: Remove announcement from Error 4 by waiting and executing swap + logging.console.info("Cleaning up announcement from Error 4") + from bittensor.core.extrinsics.utils import verify_coldkey_hash + from bittensor_wallet import Keypair + + # Determine which coldkey matches the announcement hash + bob_keypair = Keypair(ss58_address=bob_wallet.coldkeypub.ss58_address) + charlie_keypair = Keypair(ss58_address=charlie_wallet.coldkeypub.ss58_address) + + assert verify_coldkey_hash( + bob_keypair, announcement.new_coldkey_hash + ) or verify_coldkey_hash(charlie_keypair, announcement.new_coldkey_hash), ( + "Announcement hash should match either Bob or Charlie" + ) + + # Use the matching coldkey + target_coldkey = ( + bob_wallet.coldkeypub.ss58_address + if verify_coldkey_hash(bob_keypair, announcement.new_coldkey_hash) + else charlie_wallet.coldkeypub.ss58_address + ) + refund_wallet = ( + bob_wallet + if verify_coldkey_hash(bob_keypair, announcement.new_coldkey_hash) + else charlie_wallet + ) + + # Wait for execution block + subtensor.wait_for_block(announcement.execution_block + 1) + response = subtensor.extrinsics.swap_coldkey_announced( + wallet=alice_wallet, + new_coldkey_ss58=target_coldkey, + ) + assert response.success, f"Failed to cleanup announcement: {response.message}" + logging.console.info("Cleaned up announcement from Error 4") + + # Refund Alice balance after swap + refund_amount = Balance.from_tao(10) + refund_balance = subtensor.wallets.get_balance( + refund_wallet.coldkeypub.ss58_address + ) + assert refund_balance > refund_amount, ( + f"{refund_wallet.name} balance ({refund_balance}) too low to refund Alice ({refund_amount})" + ) + response = subtensor.extrinsics.transfer( + wallet=refund_wallet, + destination_ss58=alice_wallet.coldkeypub.ss58_address, + amount=refund_amount, + ) + assert response.success, f"Failed to refund Alice: {response.message}" + logging.console.info( + f"Refunded Alice balance from {refund_wallet.name} after cleanup" + ) + + # === 4. Transaction blocking after announcement === + logging.console.info("Testing transaction blocking after announcement") + + # Step 1: Alice announces swap to Bob + logging.console.info("Step 1: Alice announces swap to Bob") + # Verify no existing announcement + existing_announcement = subtensor.wallets.get_coldkey_swap_announcement( + coldkey_ss58=alice_wallet.coldkeypub.ss58_address + ) + assert existing_announcement is None, ( + "No announcement should exist before creating new one" + ) + response = subtensor.extrinsics.announce_coldkey_swap( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + ) + assert response.success, f"Failed to announce swap: {response.message}" + + # Step 2: Attempt to execute other transaction from Alice (transfer) + logging.console.info("Step 2: Attempting transfer transaction (should be blocked)") + transfer_value = Balance.from_tao(1) + dest_coldkey = charlie_wallet.coldkeypub.ss58_address + + response = subtensor.extrinsics.transfer( + wallet=alice_wallet, + destination_ss58=dest_coldkey, + amount=transfer_value, + raise_error=False, + ) + + # Step 3: Verify transaction is blocked + assert not response.success, "Transfer should be blocked after announcement" + # Error code 0 corresponds to ColdkeySwapAnnounced (see CustomTransactionError enum) + # The message may contain "Custom error: 0" or specific text about swap + assert ( + "Custom error: 0" in response.message + or "ColdkeySwapAnnounced" in response.message + or "swap" in response.message.lower() + ), ( + f"Expected transaction to be blocked by ColdkeySwapAnnounced, got: {response.message}" + ) + logging.console.info("Transaction blocking test passed") + + # Cleanup: wait for execution_block and execute swap + announcement = subtensor.wallets.get_coldkey_swap_announcement( + coldkey_ss58=alice_wallet.coldkeypub.ss58_address + ) + assert announcement is not None, "Announcement should exist for cleanup" + subtensor.wait_for_block(announcement.execution_block + 1) + response = subtensor.extrinsics.swap_coldkey_announced( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + ) + assert response.success, f"Failed to cleanup announcement: {response.message}" + logging.console.info("Cleaned up announcement by executing swap") + + logging.console.info("All coldkey swap E2E tests completed successfully") + + +@pytest.mark.asyncio +async def test_coldkey_swap_async( + async_subtensor, alice_wallet, bob_wallet, charlie_wallet +): + """ + Async test for coldkey swap extrinsics. + + This comprehensive test covers: + 1. Happy Path - Successful swap flow: + - Step 1: Announce coldkey swap from Alice to Bob + - Step 2: Verify announcement was created and contains correct data + - Step 3: Verify coldkey swap constants are accessible + - Step 4: Wait for execution block (50 blocks delay) + - Step 5: Execute the swap + - Step 6: Verify announcement was removed after successful swap + + 2. Error cases for swap_coldkey_announced: + - Error 1: Attempt to execute swap without prior announcement + - Error 2: Attempt to execute swap with incorrect coldkey hash (mismatch) + - Error 3: Attempt to execute swap too early (before execution block) + + 3. Error cases for announce_coldkey_swap: + - Error 4: Attempt to create duplicate announcement (reannouncement behavior) + + 4. Transaction blocking after announcement: + - Step 1: Create announcement + - Step 2: Attempt to execute other transaction (transfer) from announced coldkey + - Step 3: Verify transaction is blocked (except swap_coldkey_announced) + + Notes: + - Uses fast blocks mode (50 blocks delay instead of 5 days) + - All operations use async_subtensor for async execution + - Each test section cleans up after itself + """ + logging.console.info("Starting coldkey swap E2E test") + + # === 1. Happy Path - Successful swap === + logging.console.info("Testing Happy Path - successful swap") + + # Step 1: Alice announces swap to new coldkey (Bob) + logging.console.info("Step 1: Alice announces swap to Bob") + response = await async_subtensor.extrinsics.announce_coldkey_swap( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + ) + assert response.success, f"Failed to announce swap: {response.message}" + + # Step 2: Verify announcement was created + logging.console.info("Step 2: Verify announcement was created") + announcement = await async_subtensor.wallets.get_coldkey_swap_announcement( + coldkey_ss58=alice_wallet.coldkeypub.ss58_address + ) + assert announcement is not None, "Announcement should exist" + assert announcement.coldkey == alice_wallet.coldkeypub.ss58_address + assert ( + announcement.execution_block > await async_subtensor.chain.get_current_block() + ) + + # Step 3: Verify constants and storage values + logging.console.info("Step 3: Verify constants and storage values") + constants = await async_subtensor.wallets.get_coldkey_swap_constants() + assert constants.KeySwapCost is not None + + announcement_delay = ( + await async_subtensor.wallets.get_coldkey_swap_announcement_delay() + ) + reannouncement_delay = ( + await async_subtensor.wallets.get_coldkey_swap_reannouncement_delay() + ) + + assert announcement_delay is not None + assert reannouncement_delay is not None + logging.console.info( + f"Constants: AnnouncementDelay={announcement_delay}, " + f"ReannouncementDelay={reannouncement_delay}, " + f"KeySwapCost={constants.KeySwapCost}" + ) + + # Step 4: Wait for 50 blocks (execution_block) + logging.console.info("Step 4: Waiting for execution block") + current_block = await async_subtensor.chain.get_current_block() + execution_block = announcement.execution_block + logging.console.info( + f"Current block: {current_block}, Execution block: {execution_block}" + ) + await async_subtensor.wait_for_block(execution_block + 1) + + # Step 5: Execute swap + logging.console.info("Step 5: Executing swap") + response = await async_subtensor.extrinsics.swap_coldkey_announced( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + ) + assert response.success, f"Failed to execute swap: {response.message}" + + # Step 6: Verify announcement was removed after swap + logging.console.info("Step 6: Verify announcement was removed after swap") + announcement_after = await async_subtensor.wallets.get_coldkey_swap_announcement( + coldkey_ss58=alice_wallet.coldkeypub.ss58_address + ) + assert announcement_after is None, "Announcement should be removed after swap" + + logging.console.info("Happy Path completed successfully") + + # Refund Alice balance for further tests (Bob now has all Alice's funds after swap) + logging.console.info("Refunding Alice balance for further tests") + bob_balance = await async_subtensor.wallets.get_balance( + bob_wallet.coldkeypub.ss58_address + ) + refund_amount = Balance.from_tao(10) + assert bob_balance > refund_amount, ( + f"Bob balance ({bob_balance}) too low to refund Alice ({refund_amount})" + ) + response = await async_subtensor.extrinsics.transfer( + wallet=bob_wallet, + destination_ss58=alice_wallet.coldkeypub.ss58_address, + amount=refund_amount, + ) + assert response.success, f"Failed to refund Alice: {response.message}" + logging.console.info("Alice balance refunded successfully") + + # === 2. Error cases for swap_coldkey_announced === + logging.console.info("Testing errors for swap_coldkey_announced") + + # Error 1: Attempt to execute swap without announcement + logging.console.info("Error 1: Attempting swap without announcement") + response = await async_subtensor.extrinsics.swap_coldkey_announced( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + raise_error=False, + ) + assert not response.success, "Should fail without announcement" + assert "No coldkey swap announcement found" in response.message + logging.console.info("Error 1 passed: No announcement error") + + # Error 2: Hash mismatch + logging.console.info("Error 2: Testing hash mismatch") + # Alice announces swap to Bob + response = await async_subtensor.extrinsics.announce_coldkey_swap( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + ) + assert response.success, f"Failed to announce swap: {response.message}" + + # Attempt to execute swap with incorrect coldkey (Charlie instead of Bob) + response = await async_subtensor.extrinsics.swap_coldkey_announced( + wallet=alice_wallet, + new_coldkey_ss58=charlie_wallet.coldkeypub.ss58_address, + raise_error=False, + ) + assert not response.success, "Should fail with hash mismatch" + assert "hash does not match" in response.message.lower() + logging.console.info("Error 2 passed: Hash mismatch error") + + # Cleanup: Remove announcement from Error 2 by waiting and executing swap + logging.console.info("Cleaning up announcement from Error 2") + announcement_from_error2 = ( + await async_subtensor.wallets.get_coldkey_swap_announcement( + coldkey_ss58=alice_wallet.coldkeypub.ss58_address + ) + ) + assert announcement_from_error2 is not None, ( + "Announcement from Error 2 should exist" + ) + # Wait for execution block (wait_for_block is safe even if block already passed) + await async_subtensor.wait_for_block(announcement_from_error2.execution_block + 1) + response = await async_subtensor.extrinsics.swap_coldkey_announced( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + ) + assert response.success, f"Failed to cleanup announcement: {response.message}" + logging.console.info("Cleaned up announcement from Error 2") + # Refund Alice balance after swap + bob_balance = await async_subtensor.wallets.get_balance( + bob_wallet.coldkeypub.ss58_address + ) + refund_amount = Balance.from_tao(10) + assert bob_balance > refund_amount, ( + f"Bob balance ({bob_balance}) too low to refund Alice ({refund_amount})" + ) + response = await async_subtensor.extrinsics.transfer( + wallet=bob_wallet, + destination_ss58=alice_wallet.coldkeypub.ss58_address, + amount=refund_amount, + ) + assert response.success, f"Failed to refund Alice: {response.message}" + logging.console.info("Refunded Alice balance after cleanup") + + # Error 3: Too early (before execution block) + logging.console.info("Error 3: Testing too early error") + # Create new announcement for this test + response = await async_subtensor.extrinsics.announce_coldkey_swap( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + ) + assert response.success, f"Failed to announce swap: {response.message}" + announcement = await async_subtensor.wallets.get_coldkey_swap_announcement( + coldkey_ss58=alice_wallet.coldkeypub.ss58_address + ) + assert announcement is not None, "Announcement should exist for Error 3 test" + + # Attempt to execute swap immediately (before execution_block) + current_block = await async_subtensor.chain.get_current_block() + assert current_block < announcement.execution_block, ( + "Current block should be before execution block" + ) + response = await async_subtensor.extrinsics.swap_coldkey_announced( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + raise_error=False, + ) + assert not response.success, "Should fail with too early error" + assert "too early" in response.message.lower() + assert str(announcement.execution_block) in response.message + logging.console.info("Error 3 passed: Too early error") + + # Wait for execution_block and execute swap for cleanup + await async_subtensor.wait_for_block(announcement.execution_block) + response = await async_subtensor.extrinsics.swap_coldkey_announced( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + ) + assert response.success, response.message + logging.console.info("Cleaned up announcement by executing swap") + # Refund Alice balance after swap + + refund_amount = Balance.from_tao(10) + response = await async_subtensor.extrinsics.transfer( + wallet=bob_wallet, + destination_ss58=alice_wallet.coldkeypub.ss58_address, + amount=refund_amount, + ) + assert response.success, response.message + + # === 3. Error cases for announce_coldkey_swap === + logging.console.info("Testing errors for announce_coldkey_swap") + + # Error 4: Duplicate announcement (reannouncement) + logging.console.info("Error 4: Testing duplicate announcement") + # Create first announcement + response = await async_subtensor.extrinsics.announce_coldkey_swap( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + ) + assert response.success, f"Failed to announce swap: {response.message}" + + # Attempt to create second announcement (to Charlie) + response = await async_subtensor.extrinsics.announce_coldkey_swap( + wallet=alice_wallet, + new_coldkey_ss58=charlie_wallet.coldkeypub.ss58_address, + raise_error=False, + ) + assert not response.success, "Should fail with duplicate announcement" + + # Verify that there is an active announcement + announcement = await async_subtensor.wallets.get_coldkey_swap_announcement( + coldkey_ss58=alice_wallet.coldkeypub.ss58_address + ) + assert announcement is not None, "Should have an active announcement" + logging.console.info("Error 4: Duplicate announcement handled") + + # Cleanup: Remove announcement from Error 4 by waiting and executing swap + logging.console.info("Cleaning up announcement from Error 4") + from bittensor.core.extrinsics.utils import verify_coldkey_hash + from bittensor_wallet import Keypair + + # Determine which coldkey matches the announcement hash + bob_keypair = Keypair(ss58_address=bob_wallet.coldkeypub.ss58_address) + charlie_keypair = Keypair(ss58_address=charlie_wallet.coldkeypub.ss58_address) + + assert verify_coldkey_hash( + bob_keypair, announcement.new_coldkey_hash + ) or verify_coldkey_hash(charlie_keypair, announcement.new_coldkey_hash), ( + "Announcement hash should match either Bob or Charlie" + ) + + # Use the matching coldkey + target_coldkey = ( + bob_wallet.coldkeypub.ss58_address + if verify_coldkey_hash(bob_keypair, announcement.new_coldkey_hash) + else charlie_wallet.coldkeypub.ss58_address + ) + refund_wallet = ( + bob_wallet + if verify_coldkey_hash(bob_keypair, announcement.new_coldkey_hash) + else charlie_wallet + ) + + # Wait for execution block + await async_subtensor.wait_for_block(announcement.execution_block + 1) + response = await async_subtensor.extrinsics.swap_coldkey_announced( + wallet=alice_wallet, + new_coldkey_ss58=target_coldkey, + ) + assert response.success, f"Failed to cleanup announcement: {response.message}" + logging.console.info("Cleaned up announcement from Error 4") + + # Refund Alice balance after swap + refund_amount = Balance.from_tao(10) + refund_balance = await async_subtensor.wallets.get_balance( + refund_wallet.coldkeypub.ss58_address + ) + assert refund_balance > refund_amount, ( + f"{refund_wallet.name} balance ({refund_balance}) too low to refund Alice ({refund_amount})" + ) + response = await async_subtensor.extrinsics.transfer( + wallet=refund_wallet, + destination_ss58=alice_wallet.coldkeypub.ss58_address, + amount=refund_amount, + ) + assert response.success, f"Failed to refund Alice: {response.message}" + logging.console.info( + f"Refunded Alice balance from {refund_wallet.name} after cleanup" + ) + + # === 4. Transaction blocking after announcement === + logging.console.info("Testing transaction blocking after announcement") + + # Step 1: Alice announces swap to Bob + logging.console.info("Step 1: Alice announces swap to Bob") + # Verify no existing announcement + existing_announcement = await async_subtensor.wallets.get_coldkey_swap_announcement( + coldkey_ss58=alice_wallet.coldkeypub.ss58_address + ) + assert existing_announcement is None, ( + "No announcement should exist before creating new one" + ) + response = await async_subtensor.extrinsics.announce_coldkey_swap( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + ) + assert response.success, f"Failed to announce swap: {response.message}" + + # Step 2: Attempt to execute other transaction from Alice (transfer) + logging.console.info("Step 2: Attempting transfer transaction (should be blocked)") + transfer_value = Balance.from_tao(1) + dest_coldkey = charlie_wallet.coldkeypub.ss58_address + + response = await async_subtensor.extrinsics.transfer( + wallet=alice_wallet, + destination_ss58=dest_coldkey, + amount=transfer_value, + raise_error=False, + ) + + # Step 3: Verify transaction is blocked + assert not response.success, "Transfer should be blocked after announcement" + # Error code 0 corresponds to ColdkeySwapAnnounced (see CustomTransactionError enum) + # The message may contain "Custom error: 0" or specific text about swap + assert ( + "Custom error: 0" in response.message + or "ColdkeySwapAnnounced" in response.message + or "swap" in response.message.lower() + ), ( + f"Expected transaction to be blocked by ColdkeySwapAnnounced, got: {response.message}" + ) + logging.console.info("Transaction blocking test passed") + + # Cleanup: wait for execution_block and execute swap + announcement = await async_subtensor.wallets.get_coldkey_swap_announcement( + coldkey_ss58=alice_wallet.coldkeypub.ss58_address + ) + assert announcement is not None, "Announcement should exist for cleanup" + await async_subtensor.wait_for_block(announcement.execution_block + 1) + response = await async_subtensor.extrinsics.swap_coldkey_announced( + wallet=alice_wallet, + new_coldkey_ss58=bob_wallet.coldkeypub.ss58_address, + ) + assert response.success, f"Failed to cleanup announcement: {response.message}" + logging.console.info("Cleaned up announcement by executing swap") + + logging.console.info("All coldkey swap E2E tests completed successfully") diff --git a/tests/unit_tests/chain_data/test_coldkey_swap.py b/tests/unit_tests/chain_data/test_coldkey_swap.py new file mode 100644 index 0000000000..2908fc7832 --- /dev/null +++ b/tests/unit_tests/chain_data/test_coldkey_swap.py @@ -0,0 +1,42 @@ +from bittensor.core.chain_data.coldkey_swap import ( + ColdkeySwapAnnouncementInfo, + ColdkeySwapConstants, +) +from async_substrate_interface.types import ScaleObj + + +def test_coldkey_swap_announcement_info_from_query_none(mocker): + """Test from_query returns None when query has no value.""" + # Prep + coldkey_ss58 = mocker.Mock(spec=str) + query = mocker.Mock(spec=ScaleObj) + + # Call + from_query = ColdkeySwapAnnouncementInfo.from_query(coldkey_ss58, query) + + # Asserts + assert from_query is None + + +def test_coldkey_swap_announcement_info_from_query_happy_path(mocker): + """Test from_query returns ColdkeySwapAnnouncementInfo when query has valid data.""" + # Prep + coldkey_ss58 = mocker.Mock(spec=str) + fake_block = mocker.Mock(spec=int) + fake_hash_data = mocker.Mock(spec=list) + query = mocker.Mock(value=(fake_block, (fake_hash_data,))) + + mocked_bytes = mocker.patch("bittensor.core.chain_data.coldkey_swap.bytes") + + # Call + from_query = ColdkeySwapAnnouncementInfo.from_query(coldkey_ss58, query) + + # Asserts + mocked_bytes.assert_called_once_with(fake_hash_data) + assert from_query is not None, "Should return ColdkeySwapAnnouncementInfo object" + assert from_query.coldkey == coldkey_ss58 + assert from_query.execution_block == fake_block + assert ( + from_query.new_coldkey_hash + == mocked_bytes.return_value.hex.return_value.__radd__.return_value + ) diff --git a/tests/unit_tests/extrinsics/asyncex/test_coldkey_swap.py b/tests/unit_tests/extrinsics/asyncex/test_coldkey_swap.py new file mode 100644 index 0000000000..b4abda1293 --- /dev/null +++ b/tests/unit_tests/extrinsics/asyncex/test_coldkey_swap.py @@ -0,0 +1,402 @@ +import pytest +from bittensor_wallet import Wallet +from scalecodec.types import GenericCall + +from bittensor.core.extrinsics.asyncex import coldkey_swap +from bittensor.core.extrinsics.pallets import SubtensorModule +from bittensor.core.settings import DEFAULT_MEV_PROTECTION +from bittensor.core.types import ExtrinsicResponse +from bittensor.core.chain_data.coldkey_swap import ColdkeySwapAnnouncementInfo + + +@pytest.mark.asyncio +async def test_announce_coldkey_swap_extrinsic(subtensor, mocker): + """Verify that async `announce_coldkey_swap_extrinsic` method calls proper methods.""" + # Preps + wallet = mocker.MagicMock(spec=Wallet) + wallet.coldkeypub.ss58_address = "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY" + new_coldkey_ss58 = "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty" + + mocked_unlock_wallet = mocker.patch.object( + ExtrinsicResponse, + "unlock_wallet", + return_value=ExtrinsicResponse(success=True, message="Unlocked"), + ) + mocked_keypair = mocker.patch( + "bittensor.core.extrinsics.asyncex.coldkey_swap.Keypair" + ) + mocked_keypair_instance = mocker.MagicMock() + mocked_keypair_instance.public_key = b"\x00" * 32 + mocked_keypair.return_value = mocked_keypair_instance + + mocked_compute_hash = mocker.patch.object( + coldkey_swap, "compute_coldkey_hash", return_value="0x" + "00" * 32 + ) + mocked_subtensor_module = mocker.patch.object( + coldkey_swap, "SubtensorModule", return_value=mocker.MagicMock() + ) + mocked_pallet_instance = mocked_subtensor_module.return_value + mocked_pallet_instance.announce_coldkey_swap = mocker.AsyncMock( + return_value=mocker.MagicMock() + ) + mocked_sign_and_send_extrinsic = mocker.patch.object( + subtensor, + "sign_and_send_extrinsic", + return_value=ExtrinsicResponse(True, "Success"), + ) + + # Call + response = await coldkey_swap.announce_coldkey_swap_extrinsic( + subtensor=subtensor, + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + mev_protection=False, + ) + + # Asserts + mocked_unlock_wallet.assert_called_once_with(wallet, False) + mocked_keypair.assert_called_once_with(ss58_address=new_coldkey_ss58) + mocked_compute_hash.assert_called_once_with(mocked_keypair_instance) + mocked_subtensor_module.assert_called_once_with(subtensor) + mocked_pallet_instance.announce_coldkey_swap.assert_awaited_once_with( + new_coldkey_hash="0x" + "00" * 32 + ) + mocked_sign_and_send_extrinsic.assert_awaited_once_with( + call=mocked_pallet_instance.announce_coldkey_swap.return_value, + wallet=wallet, + wait_for_inclusion=True, + wait_for_finalization=True, + period=None, + raise_error=False, + ) + assert response == mocked_sign_and_send_extrinsic.return_value + + +@pytest.mark.asyncio +async def test_announce_coldkey_swap_extrinsic_with_mev_protection(subtensor, mocker): + """Verify that async `announce_coldkey_swap_extrinsic` uses MEV protection when enabled.""" + # Preps + wallet = mocker.MagicMock(spec=Wallet) + wallet.coldkeypub.ss58_address = "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY" + new_coldkey_ss58 = "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty" + + mocked_unlock_wallet = mocker.patch.object( + ExtrinsicResponse, + "unlock_wallet", + return_value=ExtrinsicResponse(success=True, message="Unlocked"), + ) + mocked_keypair = mocker.patch( + "bittensor.core.extrinsics.asyncex.coldkey_swap.Keypair" + ) + mocked_keypair_instance = mocker.MagicMock() + mocked_keypair_instance.public_key = b"\x00" * 32 + mocked_keypair.return_value = mocked_keypair_instance + + mocked_compute_hash = mocker.patch.object( + coldkey_swap, "compute_coldkey_hash", return_value="0x" + "00" * 32 + ) + mocked_subtensor_module = mocker.patch.object( + coldkey_swap, "SubtensorModule", return_value=mocker.MagicMock() + ) + mocked_pallet_instance = mocked_subtensor_module.return_value + mocked_pallet_instance.announce_coldkey_swap = mocker.AsyncMock( + return_value=mocker.MagicMock() + ) + mocked_submit_encrypted = mocker.patch.object( + coldkey_swap, + "submit_encrypted_extrinsic", + return_value=ExtrinsicResponse(True, "Success"), + ) + + # Call + response = await coldkey_swap.announce_coldkey_swap_extrinsic( + subtensor=subtensor, + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + mev_protection=True, + ) + + # Asserts + mocked_unlock_wallet.assert_called_once_with(wallet, False) + mocked_subtensor_module.assert_called_once_with(subtensor) + mocked_pallet_instance.announce_coldkey_swap.assert_awaited_once_with( + new_coldkey_hash="0x" + "00" * 32 + ) + mocked_submit_encrypted.assert_awaited_once() + assert response == mocked_submit_encrypted.return_value + + +@pytest.mark.asyncio +async def test_swap_coldkey_announced_extrinsic_success(subtensor, mocker): + """Verify that async `swap_coldkey_announced_extrinsic` method calls proper methods.""" + # Preps + wallet = mocker.MagicMock(spec=Wallet) + wallet.coldkeypub.ss58_address = "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY" + new_coldkey_ss58 = "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty" + fake_hash = "0x" + "00" * 32 + + announcement = ColdkeySwapAnnouncementInfo( + coldkey=wallet.coldkeypub.ss58_address, + execution_block=1000, + new_coldkey_hash=fake_hash, + ) + + mocked_unlock_wallet = mocker.patch.object( + ExtrinsicResponse, + "unlock_wallet", + return_value=ExtrinsicResponse(success=True, message="Unlocked"), + ) + mocked_get_announcement = mocker.patch.object( + subtensor, "get_coldkey_swap_announcement", return_value=announcement + ) + mocked_get_current_block = mocker.patch.object( + subtensor, "get_current_block", return_value=1001 + ) + mocked_keypair = mocker.patch( + "bittensor.core.extrinsics.asyncex.coldkey_swap.Keypair" + ) + mocked_keypair_instance = mocker.MagicMock() + mocked_keypair_instance.public_key = b"\x00" * 32 + mocked_keypair.return_value = mocked_keypair_instance + + mocked_verify_hash = mocker.patch.object( + coldkey_swap, "verify_coldkey_hash", return_value=True + ) + mocked_subtensor_module = mocker.patch.object( + coldkey_swap, "SubtensorModule", return_value=mocker.MagicMock() + ) + mocked_pallet_instance = mocked_subtensor_module.return_value + mocked_pallet_instance.swap_coldkey_announced = mocker.AsyncMock( + return_value=mocker.MagicMock() + ) + mocked_sign_and_send_extrinsic = mocker.patch.object( + subtensor, + "sign_and_send_extrinsic", + return_value=ExtrinsicResponse(True, "Success"), + ) + + # Call + response = await coldkey_swap.swap_coldkey_announced_extrinsic( + subtensor=subtensor, + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + mev_protection=False, + ) + + # Asserts + mocked_unlock_wallet.assert_called_once_with(wallet, False) + mocked_get_announcement.assert_awaited_once_with( + coldkey_ss58=wallet.coldkeypub.ss58_address + ) + mocked_get_current_block.assert_awaited_once() + mocked_verify_hash.assert_called_once_with(mocked_keypair_instance, fake_hash) + mocked_subtensor_module.assert_called_once_with(subtensor) + mocked_pallet_instance.swap_coldkey_announced.assert_awaited_once_with( + new_coldkey=new_coldkey_ss58 + ) + mocked_sign_and_send_extrinsic.assert_awaited_once_with( + call=mocked_pallet_instance.swap_coldkey_announced.return_value, + wallet=wallet, + wait_for_inclusion=True, + wait_for_finalization=True, + period=None, + raise_error=False, + ) + assert response == mocked_sign_and_send_extrinsic.return_value + + +@pytest.mark.asyncio +async def test_swap_coldkey_announced_extrinsic_no_announcement(subtensor, mocker): + """Verify that async `swap_coldkey_announced_extrinsic` returns error when no announcement.""" + # Preps + wallet = mocker.MagicMock(spec=Wallet) + wallet.coldkeypub.ss58_address = "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY" + new_coldkey_ss58 = "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty" + + mocked_unlock_wallet = mocker.patch.object( + ExtrinsicResponse, + "unlock_wallet", + return_value=ExtrinsicResponse(success=True, message="Unlocked"), + ) + mocked_get_announcement = mocker.patch.object( + subtensor, "get_coldkey_swap_announcement", return_value=None + ) + + # Call + response = await coldkey_swap.swap_coldkey_announced_extrinsic( + subtensor=subtensor, + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + raise_error=False, + ) + + # Asserts + mocked_unlock_wallet.assert_called_once_with(wallet, False) + mocked_get_announcement.assert_awaited_once_with( + coldkey_ss58=wallet.coldkeypub.ss58_address + ) + assert response.success is False + assert "No coldkey swap announcement found" in response.message + + +@pytest.mark.asyncio +async def test_swap_coldkey_announced_extrinsic_hash_mismatch(subtensor, mocker): + """Verify that async `swap_coldkey_announced_extrinsic` returns error when hash doesn't match.""" + # Preps + wallet = mocker.MagicMock(spec=Wallet) + wallet.coldkeypub.ss58_address = "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY" + new_coldkey_ss58 = "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty" + fake_hash = "0x" + "00" * 32 + + announcement = ColdkeySwapAnnouncementInfo( + coldkey=wallet.coldkeypub.ss58_address, + execution_block=1000, + new_coldkey_hash=fake_hash, + ) + + mocked_unlock_wallet = mocker.patch.object( + ExtrinsicResponse, + "unlock_wallet", + return_value=ExtrinsicResponse(success=True, message="Unlocked"), + ) + mocked_get_announcement = mocker.patch.object( + subtensor, "get_coldkey_swap_announcement", return_value=announcement + ) + mocked_keypair = mocker.patch( + "bittensor.core.extrinsics.asyncex.coldkey_swap.Keypair" + ) + mocked_keypair_instance = mocker.MagicMock() + mocked_keypair_instance.public_key = b"\x00" * 32 + mocked_keypair.return_value = mocked_keypair_instance + + mocked_verify_hash = mocker.patch.object( + coldkey_swap, "verify_coldkey_hash", return_value=False + ) + mocked_compute_hash = mocker.patch.object( + coldkey_swap, "compute_coldkey_hash", return_value="0x" + "11" * 32 + ) + + # Call + response = await coldkey_swap.swap_coldkey_announced_extrinsic( + subtensor=subtensor, + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + raise_error=False, + ) + + # Asserts + mocked_unlock_wallet.assert_called_once_with(wallet, False) + mocked_get_announcement.assert_awaited_once_with( + coldkey_ss58=wallet.coldkeypub.ss58_address + ) + mocked_verify_hash.assert_called_once_with(mocked_keypair_instance, fake_hash) + assert response.success is False + assert "hash does not match" in response.message.lower() + + +@pytest.mark.asyncio +async def test_swap_coldkey_announced_extrinsic_too_early(subtensor, mocker): + """Verify that async `swap_coldkey_announced_extrinsic` returns error when too early.""" + # Preps + wallet = mocker.MagicMock(spec=Wallet) + wallet.coldkeypub.ss58_address = "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY" + new_coldkey_ss58 = "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty" + fake_hash = "0x" + "00" * 32 + + announcement = ColdkeySwapAnnouncementInfo( + coldkey=wallet.coldkeypub.ss58_address, + execution_block=1000, + new_coldkey_hash=fake_hash, + ) + + mocked_unlock_wallet = mocker.patch.object( + ExtrinsicResponse, + "unlock_wallet", + return_value=ExtrinsicResponse(success=True, message="Unlocked"), + ) + mocked_get_announcement = mocker.patch.object( + subtensor, "get_coldkey_swap_announcement", return_value=announcement + ) + mocked_get_current_block = mocker.patch.object( + subtensor, "get_current_block", return_value=999 + ) + mocked_keypair = mocker.patch( + "bittensor.core.extrinsics.asyncex.coldkey_swap.Keypair" + ) + mocked_keypair_instance = mocker.MagicMock() + mocked_keypair_instance.public_key = b"\x00" * 32 + mocked_keypair.return_value = mocked_keypair_instance + + mocked_verify_hash = mocker.patch.object( + coldkey_swap, "verify_coldkey_hash", return_value=True + ) + + # Call + response = await coldkey_swap.swap_coldkey_announced_extrinsic( + subtensor=subtensor, + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + raise_error=False, + ) + + # Asserts + mocked_unlock_wallet.assert_called_once_with(wallet, False) + mocked_get_announcement.assert_awaited_once_with( + coldkey_ss58=wallet.coldkeypub.ss58_address + ) + mocked_get_current_block.assert_awaited_once() + mocked_verify_hash.assert_called_once_with(mocked_keypair_instance, fake_hash) + assert response.success is False + assert "too early" in response.message.lower() + assert "999" in response.message + assert "1000" in response.message + + +@pytest.mark.asyncio +async def test_remove_coldkey_swap_announcement_extrinsic(subtensor, mocker): + """Verify that async `remove_coldkey_swap_announcement_extrinsic` method calls proper methods.""" + # Preps + wallet = mocker.MagicMock(spec=Wallet) + coldkey_ss58 = "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY" + + mocked_unlock_wallet = mocker.patch.object( + ExtrinsicResponse, + "unlock_wallet", + return_value=ExtrinsicResponse(success=True, message="Unlocked"), + ) + mocked_subtensor_module = mocker.patch.object( + coldkey_swap, "SubtensorModule", return_value=mocker.MagicMock() + ) + mocked_pallet_instance = mocked_subtensor_module.return_value + mocked_pallet_instance.remove_coldkey_swap_announcement = mocker.AsyncMock( + return_value=mocker.MagicMock() + ) + mocked_sign_and_send_extrinsic = mocker.patch.object( + subtensor, + "sign_and_send_extrinsic", + return_value=ExtrinsicResponse(True, "Success"), + ) + + # Call + response = await coldkey_swap.remove_coldkey_swap_announcement_extrinsic( + subtensor=subtensor, + wallet=wallet, + coldkey_ss58=coldkey_ss58, + mev_protection=False, + ) + + # Asserts + mocked_unlock_wallet.assert_called_once_with(wallet, False) + mocked_subtensor_module.assert_called_once_with(subtensor) + mocked_pallet_instance.remove_coldkey_swap_announcement.assert_awaited_once_with( + coldkey=coldkey_ss58 + ) + mocked_sign_and_send_extrinsic.assert_awaited_once_with( + call=mocked_pallet_instance.remove_coldkey_swap_announcement.return_value, + wallet=wallet, + wait_for_inclusion=True, + wait_for_finalization=True, + period=None, + raise_error=False, + ) + assert response == mocked_sign_and_send_extrinsic.return_value diff --git a/tests/unit_tests/extrinsics/test_coldkey_swap.py b/tests/unit_tests/extrinsics/test_coldkey_swap.py new file mode 100644 index 0000000000..60e2d8dcb0 --- /dev/null +++ b/tests/unit_tests/extrinsics/test_coldkey_swap.py @@ -0,0 +1,378 @@ +from bittensor_wallet import Wallet +from scalecodec.types import GenericCall + +from bittensor.core.extrinsics import coldkey_swap +from bittensor.core.extrinsics.pallets import SubtensorModule +from bittensor.core.settings import DEFAULT_MEV_PROTECTION +from bittensor.core.types import ExtrinsicResponse +from bittensor.core.chain_data.coldkey_swap import ColdkeySwapAnnouncementInfo + + +def test_announce_coldkey_swap_extrinsic(subtensor, mocker): + """Verify that sync `announce_coldkey_swap_extrinsic` method calls proper methods.""" + # Preps + wallet = mocker.MagicMock(spec=Wallet) + wallet.coldkeypub.ss58_address = "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY" + new_coldkey_ss58 = "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty" + + mocked_unlock_wallet = mocker.patch.object( + ExtrinsicResponse, + "unlock_wallet", + return_value=ExtrinsicResponse(success=True, message="Unlocked"), + ) + mocked_keypair = mocker.patch("bittensor.core.extrinsics.coldkey_swap.Keypair") + mocked_keypair_instance = mocker.MagicMock() + mocked_keypair_instance.public_key = b"\x00" * 32 + mocked_keypair.return_value = mocked_keypair_instance + + mocked_compute_hash = mocker.patch.object( + coldkey_swap, "compute_coldkey_hash", return_value="0x" + "00" * 32 + ) + mocked_subtensor_module = mocker.patch.object( + coldkey_swap, "SubtensorModule", return_value=mocker.MagicMock() + ) + mocked_pallet_instance = mocked_subtensor_module.return_value + mocked_pallet_instance.announce_coldkey_swap.return_value = mocker.MagicMock() + mocked_sign_and_send_extrinsic = mocker.patch.object( + subtensor, + "sign_and_send_extrinsic", + return_value=ExtrinsicResponse(True, "Success"), + ) + + # Call + response = coldkey_swap.announce_coldkey_swap_extrinsic( + subtensor=subtensor, + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + mev_protection=False, + ) + + # Asserts + mocked_unlock_wallet.assert_called_once_with(wallet, False) + mocked_keypair.assert_called_once_with(ss58_address=new_coldkey_ss58) + mocked_compute_hash.assert_called_once_with(mocked_keypair_instance) + mocked_subtensor_module.assert_called_once_with(subtensor) + mocked_pallet_instance.announce_coldkey_swap.assert_called_once_with( + new_coldkey_hash="0x" + "00" * 32 + ) + mocked_sign_and_send_extrinsic.assert_called_once_with( + call=mocked_pallet_instance.announce_coldkey_swap.return_value, + wallet=wallet, + wait_for_inclusion=True, + wait_for_finalization=True, + period=None, + raise_error=False, + ) + assert response == mocked_sign_and_send_extrinsic.return_value + + +def test_announce_coldkey_swap_extrinsic_with_mev_protection(subtensor, mocker): + """Verify that sync `announce_coldkey_swap_extrinsic` uses MEV protection when enabled.""" + # Preps + wallet = mocker.MagicMock(spec=Wallet) + wallet.coldkeypub.ss58_address = "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY" + new_coldkey_ss58 = "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty" + + mocked_unlock_wallet = mocker.patch.object( + ExtrinsicResponse, + "unlock_wallet", + return_value=ExtrinsicResponse(success=True, message="Unlocked"), + ) + mocked_keypair = mocker.patch("bittensor.core.extrinsics.coldkey_swap.Keypair") + mocked_keypair_instance = mocker.MagicMock() + mocked_keypair_instance.public_key = b"\x00" * 32 + mocked_keypair.return_value = mocked_keypair_instance + + mocked_compute_hash = mocker.patch.object( + coldkey_swap, "compute_coldkey_hash", return_value="0x" + "00" * 32 + ) + mocked_subtensor_module = mocker.patch.object( + coldkey_swap, "SubtensorModule", return_value=mocker.MagicMock() + ) + mocked_pallet_instance = mocked_subtensor_module.return_value + mocked_pallet_instance.announce_coldkey_swap.return_value = mocker.MagicMock() + mocked_submit_encrypted = mocker.patch.object( + coldkey_swap, + "submit_encrypted_extrinsic", + return_value=ExtrinsicResponse(True, "Success"), + ) + + # Call + response = coldkey_swap.announce_coldkey_swap_extrinsic( + subtensor=subtensor, + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + mev_protection=True, + ) + + # Asserts + mocked_unlock_wallet.assert_called_once_with(wallet, False) + mocked_subtensor_module.assert_called_once_with(subtensor) + mocked_pallet_instance.announce_coldkey_swap.assert_called_once_with( + new_coldkey_hash="0x" + "00" * 32 + ) + mocked_submit_encrypted.assert_called_once() + assert response == mocked_submit_encrypted.return_value + + +def test_swap_coldkey_announced_extrinsic_success(subtensor, mocker): + """Verify that sync `swap_coldkey_announced_extrinsic` method calls proper methods.""" + # Preps + wallet = mocker.MagicMock(spec=Wallet) + wallet.coldkeypub.ss58_address = "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY" + new_coldkey_ss58 = "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty" + fake_hash = "0x" + "00" * 32 + + announcement = ColdkeySwapAnnouncementInfo( + coldkey=wallet.coldkeypub.ss58_address, + execution_block=1000, + new_coldkey_hash=fake_hash, + ) + + mocked_unlock_wallet = mocker.patch.object( + ExtrinsicResponse, + "unlock_wallet", + return_value=ExtrinsicResponse(success=True, message="Unlocked"), + ) + mocked_get_announcement = mocker.patch.object( + subtensor, "get_coldkey_swap_announcement", return_value=announcement + ) + mocked_get_current_block = mocker.patch.object( + subtensor, "get_current_block", return_value=1001 + ) + mocked_keypair = mocker.patch("bittensor.core.extrinsics.coldkey_swap.Keypair") + mocked_keypair_instance = mocker.MagicMock() + mocked_keypair_instance.public_key = b"\x00" * 32 + mocked_keypair.return_value = mocked_keypair_instance + + mocked_verify_hash = mocker.patch.object( + coldkey_swap, "verify_coldkey_hash", return_value=True + ) + mocked_subtensor_module = mocker.patch.object( + coldkey_swap, "SubtensorModule", return_value=mocker.MagicMock() + ) + mocked_pallet_instance = mocked_subtensor_module.return_value + mocked_pallet_instance.swap_coldkey_announced.return_value = mocker.MagicMock() + mocked_sign_and_send_extrinsic = mocker.patch.object( + subtensor, + "sign_and_send_extrinsic", + return_value=ExtrinsicResponse(True, "Success"), + ) + + # Call + response = coldkey_swap.swap_coldkey_announced_extrinsic( + subtensor=subtensor, + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + mev_protection=False, + ) + + # Asserts + mocked_unlock_wallet.assert_called_once_with(wallet, False) + mocked_get_announcement.assert_called_once_with( + coldkey_ss58=wallet.coldkeypub.ss58_address + ) + mocked_get_current_block.assert_called_once() + mocked_verify_hash.assert_called_once_with(mocked_keypair_instance, fake_hash) + mocked_subtensor_module.assert_called_once_with(subtensor) + mocked_pallet_instance.swap_coldkey_announced.assert_called_once_with( + new_coldkey=new_coldkey_ss58 + ) + mocked_sign_and_send_extrinsic.assert_called_once_with( + call=mocked_pallet_instance.swap_coldkey_announced.return_value, + wallet=wallet, + wait_for_inclusion=True, + wait_for_finalization=True, + period=None, + raise_error=False, + ) + assert response == mocked_sign_and_send_extrinsic.return_value + + +def test_swap_coldkey_announced_extrinsic_no_announcement(subtensor, mocker): + """Verify that sync `swap_coldkey_announced_extrinsic` returns error when no announcement.""" + # Preps + wallet = mocker.MagicMock(spec=Wallet) + wallet.coldkeypub.ss58_address = "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY" + new_coldkey_ss58 = "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty" + + mocked_unlock_wallet = mocker.patch.object( + ExtrinsicResponse, + "unlock_wallet", + return_value=ExtrinsicResponse(success=True, message="Unlocked"), + ) + mocked_get_announcement = mocker.patch.object( + subtensor, "get_coldkey_swap_announcement", return_value=None + ) + + # Call + response = coldkey_swap.swap_coldkey_announced_extrinsic( + subtensor=subtensor, + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + raise_error=False, + ) + + # Asserts + mocked_unlock_wallet.assert_called_once_with(wallet, False) + mocked_get_announcement.assert_called_once_with( + coldkey_ss58=wallet.coldkeypub.ss58_address + ) + assert response.success is False + assert "No coldkey swap announcement found" in response.message + + +def test_swap_coldkey_announced_extrinsic_hash_mismatch(subtensor, mocker): + """Verify that sync `swap_coldkey_announced_extrinsic` returns error when hash doesn't match.""" + # Preps + wallet = mocker.MagicMock(spec=Wallet) + wallet.coldkeypub.ss58_address = "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY" + new_coldkey_ss58 = "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty" + fake_hash = "0x" + "00" * 32 + + announcement = ColdkeySwapAnnouncementInfo( + coldkey=wallet.coldkeypub.ss58_address, + execution_block=1000, + new_coldkey_hash=fake_hash, + ) + + mocked_unlock_wallet = mocker.patch.object( + ExtrinsicResponse, + "unlock_wallet", + return_value=ExtrinsicResponse(success=True, message="Unlocked"), + ) + mocked_get_announcement = mocker.patch.object( + subtensor, "get_coldkey_swap_announcement", return_value=announcement + ) + mocked_keypair = mocker.patch("bittensor.core.extrinsics.coldkey_swap.Keypair") + mocked_keypair_instance = mocker.MagicMock() + mocked_keypair_instance.public_key = b"\x00" * 32 + mocked_keypair.return_value = mocked_keypair_instance + + mocked_verify_hash = mocker.patch.object( + coldkey_swap, "verify_coldkey_hash", return_value=False + ) + mocked_compute_hash = mocker.patch.object( + coldkey_swap, "compute_coldkey_hash", return_value="0x" + "11" * 32 + ) + + # Call + response = coldkey_swap.swap_coldkey_announced_extrinsic( + subtensor=subtensor, + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + raise_error=False, + ) + + # Asserts + mocked_unlock_wallet.assert_called_once_with(wallet, False) + mocked_get_announcement.assert_called_once_with( + coldkey_ss58=wallet.coldkeypub.ss58_address + ) + mocked_verify_hash.assert_called_once_with(mocked_keypair_instance, fake_hash) + assert response.success is False + assert "hash does not match" in response.message.lower() + + +def test_swap_coldkey_announced_extrinsic_too_early(subtensor, mocker): + """Verify that sync `swap_coldkey_announced_extrinsic` returns error when too early.""" + # Preps + wallet = mocker.MagicMock(spec=Wallet) + wallet.coldkeypub.ss58_address = "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY" + new_coldkey_ss58 = "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty" + fake_hash = "0x" + "00" * 32 + + announcement = ColdkeySwapAnnouncementInfo( + coldkey=wallet.coldkeypub.ss58_address, + execution_block=1000, + new_coldkey_hash=fake_hash, + ) + + mocked_unlock_wallet = mocker.patch.object( + ExtrinsicResponse, + "unlock_wallet", + return_value=ExtrinsicResponse(success=True, message="Unlocked"), + ) + mocked_get_announcement = mocker.patch.object( + subtensor, "get_coldkey_swap_announcement", return_value=announcement + ) + mocked_get_current_block = mocker.patch.object( + subtensor, "get_current_block", return_value=999 + ) + mocked_keypair = mocker.patch("bittensor.core.extrinsics.coldkey_swap.Keypair") + mocked_keypair_instance = mocker.MagicMock() + mocked_keypair_instance.public_key = b"\x00" * 32 + mocked_keypair.return_value = mocked_keypair_instance + + mocked_verify_hash = mocker.patch.object( + coldkey_swap, "verify_coldkey_hash", return_value=True + ) + + # Call + response = coldkey_swap.swap_coldkey_announced_extrinsic( + subtensor=subtensor, + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + raise_error=False, + ) + + # Asserts + mocked_unlock_wallet.assert_called_once_with(wallet, False) + mocked_get_announcement.assert_called_once_with( + coldkey_ss58=wallet.coldkeypub.ss58_address + ) + mocked_get_current_block.assert_called_once() + mocked_verify_hash.assert_called_once_with(mocked_keypair_instance, fake_hash) + assert response.success is False + assert "too early" in response.message.lower() + assert "999" in response.message + assert "1000" in response.message + + +def test_remove_coldkey_swap_announcement_extrinsic(subtensor, mocker): + """Verify that sync `remove_coldkey_swap_announcement_extrinsic` method calls proper methods.""" + # Preps + wallet = mocker.MagicMock(spec=Wallet) + coldkey_ss58 = "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY" + + mocked_unlock_wallet = mocker.patch.object( + ExtrinsicResponse, + "unlock_wallet", + return_value=ExtrinsicResponse(success=True, message="Unlocked"), + ) + mocked_subtensor_module = mocker.patch.object( + coldkey_swap, "SubtensorModule", return_value=mocker.MagicMock() + ) + mocked_pallet_instance = mocked_subtensor_module.return_value + mocked_pallet_instance.remove_coldkey_swap_announcement.return_value = ( + mocker.MagicMock() + ) + mocked_sign_and_send_extrinsic = mocker.patch.object( + subtensor, + "sign_and_send_extrinsic", + return_value=ExtrinsicResponse(True, "Success"), + ) + + # Call + response = coldkey_swap.remove_coldkey_swap_announcement_extrinsic( + subtensor=subtensor, + wallet=wallet, + coldkey_ss58=coldkey_ss58, + mev_protection=False, + ) + + # Asserts + mocked_unlock_wallet.assert_called_once_with(wallet, False) + mocked_subtensor_module.assert_called_once_with(subtensor) + mocked_pallet_instance.remove_coldkey_swap_announcement.assert_called_once_with( + coldkey=coldkey_ss58 + ) + mocked_sign_and_send_extrinsic.assert_called_once_with( + call=mocked_pallet_instance.remove_coldkey_swap_announcement.return_value, + wallet=wallet, + wait_for_inclusion=True, + wait_for_finalization=True, + period=None, + raise_error=False, + ) + assert response == mocked_sign_and_send_extrinsic.return_value diff --git a/tests/unit_tests/extrinsics/test_utils.py b/tests/unit_tests/extrinsics/test_utils.py index d77c3cc5e7..6b9d30900c 100644 --- a/tests/unit_tests/extrinsics/test_utils.py +++ b/tests/unit_tests/extrinsics/test_utils.py @@ -1,6 +1,7 @@ from bittensor.core.chain_data import StakeInfo from bittensor.core.extrinsics import utils from bittensor.utils.balance import Balance +from bittensor_wallet import Keypair def test_old_stake(subtensor, mocker): @@ -29,3 +30,61 @@ def test_old_stake(subtensor, mocker): result = utils.get_old_stakes(wallet, hotkey_ss58s, netuids, all_stakes) assert result == [expected_stake, Balance.from_tao(0)] + + +def test_compute_coldkey_hash(): + """Test compute_coldkey_hash computes correct BlakeTwo256 hash.""" + # Prep + keypair = Keypair(ss58_address="5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY") + expected_hash_length = 66 # 0x + 64 hex chars + + # Call + result = utils.compute_coldkey_hash(keypair) + + # Asserts + assert result.startswith("0x") + assert len(result) == expected_hash_length + assert all(c in "0123456789abcdef" for c in result[2:].lower()) + + +def test_verify_coldkey_hash_match(): + """Test verify_coldkey_hash returns True when hash matches.""" + # Prep + keypair = Keypair(ss58_address="5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY") + expected_hash = utils.compute_coldkey_hash(keypair) + + # Call + result = utils.verify_coldkey_hash(keypair, expected_hash) + + # Asserts + assert result is True + + +def test_verify_coldkey_hash_mismatch(): + """Test verify_coldkey_hash returns False when hash doesn't match.""" + # Prep + keypair = Keypair(ss58_address="5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY") + wrong_hash = "0x" + "00" * 32 + + # Call + result = utils.verify_coldkey_hash(keypair, wrong_hash) + + # Asserts + assert result is False + + +def test_verify_coldkey_hash_case_insensitive(): + """Test verify_coldkey_hash is case insensitive.""" + # Prep + keypair = Keypair(ss58_address="5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY") + expected_hash = utils.compute_coldkey_hash(keypair) + upper_hash = expected_hash.upper() + lower_hash = expected_hash.lower() + + # Call + result_upper = utils.verify_coldkey_hash(keypair, upper_hash) + result_lower = utils.verify_coldkey_hash(keypair, lower_hash) + + # Asserts + assert result_upper is True + assert result_lower is True diff --git a/tests/unit_tests/test_async_subtensor.py b/tests/unit_tests/test_async_subtensor.py index 0f83110729..0e141c2a98 100644 --- a/tests/unit_tests/test_async_subtensor.py +++ b/tests/unit_tests/test_async_subtensor.py @@ -6315,3 +6315,205 @@ async def test_mev_submit_encrypted_default_params(subtensor, fake_wallet, mocke blocks_for_revealed_execution=3, ) assert result == mocked_submit_encrypted_extrinsic.return_value + + +@pytest.mark.asyncio +async def test_get_coldkey_swap_announcement(subtensor, mocker): + """Test get_coldkey_swap_announcement returns correct data when announcement information is found.""" + # Prep + fake_coldkey_ss58 = mocker.Mock(spec=str) + + mocked_determine_block_hash = mocker.patch.object(subtensor, "determine_block_hash") + mocked_query = mocker.patch.object(subtensor.substrate, "query") + mocked_from_query = mocker.patch.object( + async_subtensor.ColdkeySwapAnnouncementInfo, "from_query" + ) + + # Call + result = await subtensor.get_coldkey_swap_announcement( + coldkey_ss58=fake_coldkey_ss58 + ) + + # Asserts + mocked_determine_block_hash.assert_awaited_once_with(None, None, False) + mocked_query.assert_awaited_once_with( + module="SubtensorModule", + storage_function="ColdkeySwapAnnouncements", + params=[fake_coldkey_ss58], + block_hash=mocked_determine_block_hash.return_value, + reuse_block_hash=False, + ) + mocked_from_query.assert_called_once_with( + coldkey_ss58=fake_coldkey_ss58, query=mocked_query.return_value + ) + assert result == mocked_from_query.return_value + + +@pytest.mark.asyncio +async def test_get_coldkey_swap_announcements(subtensor, mocker): + """Test get_coldkey_swap_announcements returns correct data when announcement information is found.""" + # Prep + fake_record = mocker.Mock() + mocked_determine_block_hash = mocker.patch.object(subtensor, "determine_block_hash") + fake_query_result = mocker.AsyncMock() + fake_query_result.__aiter__.return_value = iter((fake_record,)) + mocked_query_map = mocker.patch.object( + subtensor.substrate, "query_map", return_value=fake_query_result + ) + mocked_from_record = mocker.patch.object( + async_subtensor.ColdkeySwapAnnouncementInfo, "from_record" + ) + + # Call + result = await subtensor.get_coldkey_swap_announcements() + + # Asserts + mocked_determine_block_hash.assert_awaited_once_with(None, None, False) + mocked_query_map.assert_awaited_once_with( + module="SubtensorModule", + storage_function="ColdkeySwapAnnouncements", + block_hash=mocked_determine_block_hash.return_value, + reuse_block_hash=False, + ) + mocked_from_record.assert_called_once_with(fake_record) + assert result == [mocked_from_record.return_value] + + +@pytest.mark.asyncio +async def test_get_coldkey_swap_announcement_delay(subtensor, mocker): + """Test get_coldkey_swap_announcement_delay returns correct value when found.""" + # Prep + mocked_determine_block_hash = mocker.patch.object(subtensor, "determine_block_hash") + mocked_query = mocker.patch.object(subtensor.substrate, "query") + + # Call + result = await subtensor.get_coldkey_swap_announcement_delay() + + # Asserts + mocked_determine_block_hash.assert_awaited_once_with(None, None, False) + mocked_query.assert_awaited_once_with( + module="SubtensorModule", + storage_function="ColdkeySwapAnnouncementDelay", + block_hash=mocked_determine_block_hash.return_value, + reuse_block_hash=False, + ) + assert result == mocked_query.return_value.value + + +@pytest.mark.asyncio +async def test_get_coldkey_swap_reannouncement_delay(subtensor, mocker): + """Test get_coldkey_swap_reannouncement_delay returns correct value when found.""" + # Prep + mocked_determine_block_hash = mocker.patch.object(subtensor, "determine_block_hash") + mocked_query = mocker.patch.object(subtensor.substrate, "query") + + # Call + result = await subtensor.get_coldkey_swap_reannouncement_delay() + + # Asserts + mocked_determine_block_hash.assert_awaited_once_with(None, None, False) + mocked_query.assert_awaited_once_with( + module="SubtensorModule", + storage_function="ColdkeySwapReannouncementDelay", + block_hash=mocked_determine_block_hash.return_value, + reuse_block_hash=False, + ) + assert result == mocked_query.return_value.value + + +@pytest.mark.asyncio +async def test_get_coldkey_swap_constants(subtensor, mocker): + """Test get_coldkey_swap_constants returns correct data when constants are found.""" + # Prep + fake_name = mocker.Mock(spec=str) + fake_value = mocker.Mock(value=mocker.Mock(spec=int)) + mocked_constants_names = mocker.patch.object( + async_subtensor.ColdkeySwapConstants, + "constants_names", + return_value=[fake_name], + ) + mocked_query_constant = mocker.patch.object( + subtensor, + "query_constant", + side_effect=[fake_value], + ) + mocked_from_dict = mocker.patch.object( + async_subtensor.ColdkeySwapConstants, "from_dict" + ) + + # Call + result = await subtensor.get_coldkey_swap_constants() + + # Asserts + mocked_constants_names.assert_called_once_with() + mocked_query_constant.assert_awaited_once_with( + module_name="SubtensorModule", + constant_name=fake_name, + block=None, + block_hash=None, + reuse_block=False, + ) + mocked_from_dict.assert_called_once_with({fake_name: fake_value.value}) + assert result == mocked_from_dict.return_value + + +@pytest.mark.asyncio +async def test_announce_coldkey_swap(mocker, subtensor): + """Tests `announce_coldkey_swap` extrinsic call method.""" + # preps + wallet = mocker.Mock(spec=Wallet) + new_coldkey_ss58 = mocker.Mock(spec=str) + mocked_announce_coldkey_swap_extrinsic = mocker.patch.object( + async_subtensor, "announce_coldkey_swap_extrinsic" + ) + + # call + response = await subtensor.announce_coldkey_swap( + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + ) + + # asserts + mocked_announce_coldkey_swap_extrinsic.assert_awaited_once_with( + subtensor=subtensor, + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + mev_protection=DEFAULT_MEV_PROTECTION, + period=DEFAULT_PERIOD, + raise_error=False, + wait_for_inclusion=True, + wait_for_finalization=True, + wait_for_revealed_execution=True, + ) + assert response == mocked_announce_coldkey_swap_extrinsic.return_value + + +@pytest.mark.asyncio +async def test_swap_coldkey_announced(mocker, subtensor): + """Tests `swap_coldkey_announced` extrinsic call method.""" + # preps + wallet = mocker.Mock(spec=Wallet) + new_coldkey_ss58 = mocker.Mock(spec=str) + mocked_swap_coldkey_announced_extrinsic = mocker.patch.object( + async_subtensor, "swap_coldkey_announced_extrinsic" + ) + + # call + response = await subtensor.swap_coldkey_announced( + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + ) + + # asserts + mocked_swap_coldkey_announced_extrinsic.assert_awaited_once_with( + subtensor=subtensor, + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + mev_protection=DEFAULT_MEV_PROTECTION, + period=DEFAULT_PERIOD, + raise_error=False, + wait_for_inclusion=True, + wait_for_finalization=True, + wait_for_revealed_execution=True, + ) + assert response == mocked_swap_coldkey_announced_extrinsic.return_value diff --git a/tests/unit_tests/test_subtensor.py b/tests/unit_tests/test_subtensor.py index 0d70415874..d66240c450 100644 --- a/tests/unit_tests/test_subtensor.py +++ b/tests/unit_tests/test_subtensor.py @@ -6430,3 +6430,188 @@ def test_mev_submit_encrypted_default_params(subtensor, fake_wallet, mocker): blocks_for_revealed_execution=3, ) assert result == mocked_submit_encrypted_extrinsic.return_value + + +def test_get_coldkey_swap_announcement(subtensor, mocker): + """Test get_coldkey_swap_announcement returns correct data when announcement information is found.""" + # Prep + fake_coldkey_ss58 = mocker.Mock(spec=str) + + mocked_determine_block_hash = mocker.patch.object(subtensor, "determine_block_hash") + mocked_query = mocker.patch.object(subtensor.substrate, "query") + mocked_from_query = mocker.patch.object( + subtensor_module.ColdkeySwapAnnouncementInfo, "from_query" + ) + + # Call + result = subtensor.get_coldkey_swap_announcement(coldkey_ss58=fake_coldkey_ss58) + + # Asserts + mocked_determine_block_hash.assert_called_once_with(None) + mocked_query.assert_called_once_with( + module="SubtensorModule", + storage_function="ColdkeySwapAnnouncements", + params=[fake_coldkey_ss58], + block_hash=mocked_determine_block_hash.return_value, + ) + mocked_from_query.assert_called_once_with( + coldkey_ss58=fake_coldkey_ss58, query=mocked_query.return_value + ) + assert result == mocked_from_query.return_value + + +def test_get_coldkey_swap_announcements(subtensor, mocker): + """Test get_coldkey_swap_announcements returns correct data when announcement information is found.""" + # Prep + fake_record = mocker.Mock() + mocked_determine_block_hash = mocker.patch.object(subtensor, "determine_block_hash") + mocked_query_map = mocker.patch.object( + subtensor.substrate, "query_map", return_value=(fake_record,) + ) + mocked_from_record = mocker.patch.object( + subtensor_module.ColdkeySwapAnnouncementInfo, "from_record" + ) + + # Call + result = subtensor.get_coldkey_swap_announcements() + + # Asserts + mocked_determine_block_hash.assert_called_once_with(None) + mocked_query_map.assert_called_once_with( + module="SubtensorModule", + storage_function="ColdkeySwapAnnouncements", + block_hash=mocked_determine_block_hash.return_value, + ) + mocked_from_record.assert_called_once_with(fake_record) + assert result == [mocked_from_record.return_value] + + +def test_get_coldkey_swap_announcement_delay(subtensor, mocker): + """Test get_coldkey_swap_announcement_delay returns correct value when found.""" + # Prep + mocked_determine_block_hash = mocker.patch.object(subtensor, "determine_block_hash") + mocked_query = mocker.patch.object(subtensor.substrate, "query") + + # Call + result = subtensor.get_coldkey_swap_announcement_delay() + + # Asserts + mocked_determine_block_hash.assert_called_once_with(None) + mocked_query.assert_called_once_with( + module="SubtensorModule", + storage_function="ColdkeySwapAnnouncementDelay", + block_hash=mocked_determine_block_hash.return_value, + ) + assert result == mocked_query.return_value.value + + +def test_get_coldkey_swap_reannouncement_delay(subtensor, mocker): + """Test get_coldkey_swap_reannouncement_delay returns correct value when found.""" + # Prep + mocked_determine_block_hash = mocker.patch.object(subtensor, "determine_block_hash") + mocked_query = mocker.patch.object(subtensor.substrate, "query") + + # Call + result = subtensor.get_coldkey_swap_reannouncement_delay() + + # Asserts + mocked_determine_block_hash.assert_called_once_with(None) + mocked_query.assert_called_once_with( + module="SubtensorModule", + storage_function="ColdkeySwapReannouncementDelay", + block_hash=mocked_determine_block_hash.return_value, + ) + assert result == mocked_query.return_value.value + + +def test_get_coldkey_swap_constants(subtensor, mocker): + """Test get_coldkey_swap_constants returns correct data when constants are found.""" + # Prep + fake_name = mocker.Mock(spec=str) + fake_value = mocker.Mock(value=mocker.Mock(spec=int)) + mocked_constants_names = mocker.patch.object( + subtensor_module.ColdkeySwapConstants, + "constants_names", + return_value=[fake_name], + ) + mocked_query_constant = mocker.patch.object( + subtensor, + "query_constant", + side_effect=[fake_value], + ) + mocked_from_dict = mocker.patch.object( + subtensor_module.ColdkeySwapConstants, "from_dict" + ) + + # Call + result = subtensor.get_coldkey_swap_constants() + + # Asserts + mocked_constants_names.assert_called_once() + mocked_query_constant.assert_called_once_with( + module_name="SubtensorModule", + constant_name=fake_name, + block=None, + ) + mocked_from_dict.assert_called_once_with({fake_name: fake_value.value}) + assert result == mocked_from_dict.return_value + + +def test_announce_coldkey_swap(mocker, subtensor): + """Tests `announce_coldkey_swap` extrinsic call method.""" + # preps + wallet = mocker.Mock(spec=Wallet) + new_coldkey_ss58 = mocker.Mock(spec=str) + mocked_announce_coldkey_swap_extrinsic = mocker.patch.object( + subtensor_module, "announce_coldkey_swap_extrinsic" + ) + + # call + response = subtensor.announce_coldkey_swap( + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + ) + + # asserts + mocked_announce_coldkey_swap_extrinsic.assert_called_once_with( + subtensor=subtensor, + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + mev_protection=DEFAULT_MEV_PROTECTION, + period=DEFAULT_PERIOD, + raise_error=False, + wait_for_inclusion=True, + wait_for_finalization=True, + wait_for_revealed_execution=True, + ) + assert response == mocked_announce_coldkey_swap_extrinsic.return_value + + +def test_swap_coldkey_announced(mocker, subtensor): + """Tests `swap_coldkey_announced` extrinsic call method.""" + # preps + wallet = mocker.Mock(spec=Wallet) + new_coldkey_ss58 = mocker.Mock(spec=str) + mocked_swap_coldkey_announced_extrinsic = mocker.patch.object( + subtensor_module, "swap_coldkey_announced_extrinsic" + ) + + # call + response = subtensor.swap_coldkey_announced( + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + ) + + # asserts + mocked_swap_coldkey_announced_extrinsic.assert_called_once_with( + subtensor=subtensor, + wallet=wallet, + new_coldkey_ss58=new_coldkey_ss58, + mev_protection=DEFAULT_MEV_PROTECTION, + period=DEFAULT_PERIOD, + raise_error=False, + wait_for_inclusion=True, + wait_for_finalization=True, + wait_for_revealed_execution=True, + ) + assert response == mocked_swap_coldkey_announced_extrinsic.return_value