diff --git a/.github/workflows/e2e-subtensor-tests.yml b/.github/workflows/e2e-subtensor-tests.yml index fcd7ddf53..b1527f857 100644 --- a/.github/workflows/e2e-subtensor-tests.yml +++ b/.github/workflows/e2e-subtensor-tests.yml @@ -163,7 +163,7 @@ jobs: os: - ubuntu-latest test-file: ${{ fromJson(needs.find-tests.outputs.test-files) }} - python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] + python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"] steps: - name: Check-out repository uses: actions/checkout@v4 diff --git a/.github/workflows/ruff-formatter.yml b/.github/workflows/ruff-formatter.yml index 93166c304..92d7c30c4 100644 --- a/.github/workflows/ruff-formatter.yml +++ b/.github/workflows/ruff-formatter.yml @@ -1,42 +1,27 @@ name: Ruff Formatter Check + +concurrency: + group: ruff-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + permissions: contents: read on: pull_request: - types: [opened, synchronize, reopened, edited] + types: [opened, synchronize, reopened] jobs: ruff: runs-on: ubuntu-latest - strategy: - matrix: - python-version: ["3.9.13"] + timeout-minutes: 10 steps: - name: Checkout repository uses: actions/checkout@v4 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python-version }} - - - name: Set up caching for Ruff virtual environment - id: cache-ruff - uses: actions/cache@v4 - with: - path: .venv - key: v2-pypi-py-ruff-${{ matrix.python-version }}-${{ hashFiles('pyproject.toml') }} - restore-keys: | - v2-pypi-py-ruff-${{ matrix.python-version }}- - - - name: Set up Ruff virtual environment if cache is missed - if: steps.cache-ruff.outputs.cache-hit != 'true' - run: | - python -m venv .venv - .venv/bin/python -m pip install ruff==0.11.5 - - name: Ruff format check - run: | - .venv/bin/ruff format --diff bittensor_cli - .venv/bin/ruff format --diff tests + uses: astral-sh/ruff-action@v3 + with: + version: "0.11.5" + args: "format --diff" + src: "bittensor_cli tests" diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml index 4e5d34e41..4e96f29b6 100644 --- a/.github/workflows/unit-tests.yml +++ b/.github/workflows/unit-tests.yml @@ -27,7 +27,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] + python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"] steps: - name: Check-out repository uses: actions/checkout@v4 diff --git a/bittensor_cli/cli.py b/bittensor_cli/cli.py index 295232c36..47c005c34 100755 --- a/bittensor_cli/cli.py +++ b/bittensor_cli/cli.py @@ -32,7 +32,7 @@ from rich.prompt import FloatPrompt, Prompt, IntPrompt from rich.table import Column, Table from rich.tree import Tree -from typing_extensions import Annotated +from typing import Annotated from yaml import safe_dump, safe_load from bittensor_cli.src import ( @@ -867,7 +867,7 @@ def __init__(self): self.subnet_mechanisms_app = typer.Typer(epilog=_epilog) self.weights_app = typer.Typer(epilog=_epilog) self.view_app = typer.Typer(epilog=_epilog) - self.liquidity_app = typer.Typer(epilog=_epilog) + self.liquidity_app = typer.Typer(epilog=_epilog, hidden=True) self.crowd_app = typer.Typer(epilog=_epilog) self.utils_app = typer.Typer(epilog=_epilog) self.axon_app = typer.Typer(epilog=_epilog) @@ -1165,6 +1165,9 @@ def __init__(self): self.sudo_app.command("trim", rich_help_panel=HELP_PANELS["SUDO"]["CONFIG"])( self.sudo_trim ) + self.sudo_app.command( + "stake-burn", rich_help_panel=HELP_PANELS["SUDO"]["CONFIG"] + )(self.sudo_stake_burn) # subnets commands self.subnets_app.command( @@ -1299,6 +1302,7 @@ def __init__(self): self.sudo_app.command("senate_vote", hidden=True)(self.sudo_senate_vote) self.sudo_app.command("get_take", hidden=True)(self.sudo_get_take) self.sudo_app.command("set_take", hidden=True)(self.sudo_set_take) + self.sudo_app.command("buyback", hidden=True)(self.sudo_stake_burn) # Stake self.stake_app.command( @@ -3543,62 +3547,63 @@ def wallet_check_ck_swap( wallet_ss58_address: Optional[str] = Options.wallet_ss58_address, wallet_path: Optional[str] = Options.wallet_path, wallet_hotkey: Optional[str] = Options.wallet_hotkey, - scheduled_block: Optional[int] = typer.Option( - None, - "--block", - help="Block number where the swap was scheduled", - ), show_all: bool = typer.Option( False, "--all", "-a", - help="Show all pending coldkey swaps", + help="Show all pending coldkey swap announcements", ), network: Optional[list[str]] = Options.network, quiet: bool = Options.quiet, verbose: bool = Options.verbose, + json_output: bool = Options.json_output, ): """ - Check the status of scheduled coldkey swaps. + Check the status of pending coldkey swap announcements. + + Coldkey swaps use a two-step announcement system. Use this command + to check if you have any pending announcements and when they become executable. USAGE - This command can be used in three ways: - 1. Show all pending swaps (--all) - 2. Check status of a specific wallet's swap or SS58 address - 3. Check detailed swap status with block number (--block) + This command can be used in two ways: + + 1. Show all pending announcements (--all) + + 2. Check status of a specific wallet or SS58 address EXAMPLES - Show all pending swaps: + 1. Show all pending swap announcements: + [green]$[/green] btcli wallet swap-check --all - Check specific wallet's swap: + 2. Check specific wallet's announcement: + [green]$[/green] btcli wallet swap-check --wallet-name my_wallet - Check swap using SS58 address: - [green]$[/green] btcli wallet swap-check --ss58 5DkQ4... + 3. Check announcement using SS58 address: - Check swap details with block number: - [green]$[/green] btcli wallet swap-check --wallet-name my_wallet --block 12345 + [green]$[/green] btcli wallet swap-check --ss58 5DkQ4... """ - # TODO add json_output if this ever gets used again (doubtful) - self.verbosity_handler(quiet, verbose, json_output=False, prompt=False) + self.verbosity_handler(quiet, verbose, json_output=json_output, prompt=False) self.initialize_chain(network) if show_all: return self._run_command( - wallets.check_swap_status(self.subtensor, None, None) + wallets.check_swap_status(self.subtensor, None, json_output=json_output) ) if not wallet_ss58_address: wallet_ss58_address = Prompt.ask( "Enter [blue]wallet name[/blue] or [blue]SS58 address[/blue] [dim]" - "(leave blank to show all pending swaps)[/dim]" + "(leave blank to show all pending announcements)[/dim]" ) if not wallet_ss58_address: return self._run_command( - wallets.check_swap_status(self.subtensor, None, None) + wallets.check_swap_status( + self.subtensor, None, json_output=json_output + ) ) if is_valid_ss58_address(wallet_ss58_address): @@ -3613,26 +3618,11 @@ def wallet_check_ck_swap( ) ss58_address = wallet.coldkeypub.ss58_address - if not scheduled_block: - block_input = Prompt.ask( - "[blue]Enter the block number[/blue] where the swap was scheduled " - "[dim](optional, press enter to skip)[/dim]", - default="", - ) - if block_input: - try: - scheduled_block = int(block_input) - except ValueError: - print_error("Invalid block number") - raise typer.Exit() - logger.debug( - "args:\n" - f"scheduled_block {scheduled_block}\n" - f"ss58_address {ss58_address}\n" - f"network {network}\n" - ) + logger.debug(f"args:\nss58_address {ss58_address}\nnetwork {network}\n") return self._run_command( - wallets.check_swap_status(self.subtensor, ss58_address, scheduled_block) + wallets.check_swap_status( + self.subtensor, ss58_address, json_output=json_output + ) ) def wallet_create_wallet( @@ -3842,8 +3832,10 @@ def wallet_history( # if self.config.get("network") != "finney": # console.print(no_use_config_str) - # For Rao games - print_error("This command is disabled on the 'rao' network.") + print_error( + "This command is currently disabled as it used external APIs which are no longer " + "feasible; meanwhile a chain native data fetching solution is being investigated." + ) raise typer.Exit() self.verbosity_handler(quiet, verbose, False, False) @@ -4150,6 +4142,10 @@ def wallet_verify( def wallet_swap_coldkey( self, + action: str = typer.Argument( + None, + help="Action to perform: 'announce' to announce intent, 'execute' to complete swap after delay, 'dispute' to freeze the swap.", + ), wallet_name: Optional[str] = Options.wallet_name, wallet_path: Optional[str] = Options.wallet_path, wallet_hotkey: Optional[str] = Options.wallet_hotkey, @@ -4159,39 +4155,70 @@ def wallet_swap_coldkey( "--new-coldkey-ss58", "--new-wallet", "--new", - help="SS58 address of the new coldkey that will replace the current one.", + help="SS58 address or wallet name of the new coldkey.", ), + mev_protection: bool = Options.mev_protection, network: Optional[list[str]] = Options.network, - proxy: Optional[str] = Options.proxy, - announce_only: bool = Options.announce_only, - decline: bool = Options.decline, quiet: bool = Options.quiet, verbose: bool = Options.verbose, - force_swap: bool = typer.Option( - False, - "--force", - "-f", - "--force-swap", - help="Force the swap even if the new coldkey is already scheduled for a swap.", - ), + decline: bool = Options.decline, + prompt: bool = Options.prompt, ): """ - Schedule a coldkey swap for a wallet. + Swap your coldkey to a new address using a two-step announcement process. + + Coldkey swaps require two steps for security: + + 1. [bold]Announce[/bold]: Declare your intent to swap. This pays the swap fee and starts a delay period. - This command allows you to schedule a coldkey swap for a wallet. You can either provide a new wallet name, or SS58 address. + 2. [bold]Execute[/bold]: After the delay (typically 5 days), complete the swap. + + If you suspect compromise, you can [bold]Dispute[/bold] an active announcement to freeze + all activity for the coldkey until the triumvirate can intervene. EXAMPLES - [green]$[/green] btcli wallet schedule-coldkey-swap --new-wallet my_new_wallet + Step 1 - Announce your intent to swap: + + [green]$[/green] btcli wallet swap-coldkey announce --new-coldkey 5Dk...X3q + + Step 2 - After the delay period, execute the swap: - [green]$[/green] btcli wallet schedule-coldkey-swap --new-coldkey-ss58 5Dk...X3q + [green]$[/green] btcli wallet swap-coldkey execute --new-coldkey 5Dk...X3q + + Dispute an active swap (freezes the swap process): + + [green]$[/green] btcli wallet swap-coldkey dispute + + Check status of pending swaps: + + [green]$[/green] btcli wallet swap-check """ self.verbosity_handler(quiet, verbose, prompt=False, json_output=False) - proxy = self.is_valid_proxy_name_or_ss58(proxy, announce_only) + + if not action: + console.print( + "\n[bold][blue]Coldkey Swap Actions:[/blue][/bold]\n" + " [dark_sea_green3]announce[/dark_sea_green3] - Start the swap process (pays fee, starts delay timer)\n" + " [dark_sea_green3]execute[/dark_sea_green3] - Complete the swap (after delay period)\n" + " [dark_sea_green3]dispute[/dark_sea_green3] - Freeze the swap process if you suspect compromise\n\n" + " [dim]You can check the current status of your swap with 'btcli wallet swap-check'.[/dim]\n" + ) + action = Prompt.ask( + "Select action", + choices=["announce", "execute", "dispute"], + default="announce", + ) + + if action.lower() not in ("announce", "execute", "dispute"): + print_error( + f"Invalid action: {action}. Must be 'announce', 'execute', or 'dispute'." + ) + raise typer.Exit(1) if not wallet_name: wallet_name = Prompt.ask( - "Enter the [blue]wallet name[/blue] which you want to swap the coldkey for", + "Enter the [blue]wallet name[/blue] of the coldkey to swap", default=self.config.get("wallet_name") or defaults.wallet.name, ) wallet = self.wallet_ask( @@ -4202,48 +4229,74 @@ def wallet_swap_coldkey( validate=WV.WALLET, ) console.print( - f"\nWallet selected to swap the [blue]coldkey[/blue] from: \n" - f"[dark_sea_green3]{wallet}[/dark_sea_green3]\n" + f"\nWallet selected: [dark_sea_green3]{wallet}[/dark_sea_green3]\n" ) - if not new_wallet_or_ss58: - new_wallet_or_ss58 = Prompt.ask( - "Enter the [blue]new wallet name[/blue] or [blue]SS58 address[/blue] of the new coldkey", - ) + new_wallet_coldkey_ss58 = None + if action != "dispute": + if not new_wallet_or_ss58: + new_wallet_or_ss58 = Prompt.ask( + "Enter the [blue]new wallet name[/blue] or [blue]SS58 address[/blue] of the new coldkey", + ) + + if is_valid_ss58_address(new_wallet_or_ss58): + new_wallet_coldkey_ss58 = new_wallet_or_ss58 + else: + new_wallet_name = new_wallet_or_ss58 + new_wallet = self.wallet_ask( + new_wallet_name, + wallet_path, + wallet_hotkey, + ask_for=[WO.NAME], + validate=WV.WALLET, + ) + console.print( + f"\nNew coldkey wallet: [dark_sea_green3]{new_wallet}[/dark_sea_green3]\n" + ) + new_wallet_coldkey_ss58 = new_wallet.coldkeypub.ss58_address - if is_valid_ss58_address(new_wallet_or_ss58): - new_wallet_coldkey_ss58 = new_wallet_or_ss58 - else: - new_wallet_name = new_wallet_or_ss58 - new_wallet = self.wallet_ask( - new_wallet_name, - wallet_path, - wallet_hotkey, - ask_for=[WO.NAME], - validate=WV.WALLET, - ) - console.print( - f"\nNew wallet to swap the [blue]coldkey[/blue] to: \n" - f"[dark_sea_green3]{new_wallet}[/dark_sea_green3]\n" - ) - new_wallet_coldkey_ss58 = new_wallet.coldkeypub.ss58_address logger.debug( - "args:\n" - f"network {network}\n" - f"new_coldkey_ss58 {new_wallet_coldkey_ss58}\n" - f"force_swap {force_swap}" + f"args:\n" + f"action: {action}\n" + f"network: {network}\n" + f"new_coldkey_ss58: {new_wallet_coldkey_ss58}" ) - return self._run_command( - wallets.schedule_coldkey_swap( - wallet=wallet, - subtensor=self.initialize_chain(network), - new_coldkey_ss58=new_wallet_coldkey_ss58, - force_swap=force_swap, - decline=decline, - quiet=quiet, - proxy=proxy, + + if action == "announce": + return self._run_command( + wallets.announce_coldkey_swap( + wallet=wallet, + subtensor=self.initialize_chain(network), + new_coldkey_ss58=new_wallet_coldkey_ss58, + decline=decline, + quiet=quiet, + prompt=prompt, + mev_protection=mev_protection, + ) + ) + elif action == "dispute": + return self._run_command( + wallets.dispute_coldkey_swap( + wallet=wallet, + subtensor=self.initialize_chain(network), + decline=decline, + quiet=quiet, + prompt=prompt, + mev_protection=mev_protection, + ) + ) + else: + return self._run_command( + wallets.execute_coldkey_swap( + wallet=wallet, + subtensor=self.initialize_chain(network), + new_coldkey_ss58=new_wallet_coldkey_ss58, + decline=decline, + quiet=quiet, + prompt=prompt, + mev_protection=mev_protection, + ) ) - ) def axon_reset( self, @@ -7343,6 +7396,107 @@ def sudo_trim( ) ) + def sudo_stake_burn( + self, + network: Optional[list[str]] = Options.network, + wallet_name: Optional[str] = Options.wallet_name, + wallet_path: Optional[str] = Options.wallet_path, + wallet_hotkey: Optional[str] = Options.wallet_hotkey_ss58, + netuid: int = Options.netuid, + amount: float = typer.Option( + None, + "--amount", + "-a", + help="Amount of TAO to stake and burn", + prompt="Enter the amount of TAO to stake and burn", + ), + proxy: Optional[str] = Options.proxy, + rate_tolerance: Optional[float] = Options.rate_tolerance, + safe_staking: Optional[bool] = Options.safe_staking, + mev_protection: bool = Options.mev_protection, + quiet: bool = Options.quiet, + verbose: bool = Options.verbose, + json_output: bool = Options.json_output, + prompt: bool = Options.prompt, + decline: bool = Options.decline, + period: int = Options.period, + ): + """ + Allows subnet owners to buy back alpha on their subnet by staking TAO and immediately burning the acquired alpha. + + [bold]Examples:[/bold] + 1. Stake and burn 10 TAO on subnet 14: + [green]$[/green] btcli sudo stake-burn --netuid 14 --amount 10 + 2. Stake and burn 10 TAO on subnet 14 with safe staking and 5% rate tolerance: + [green]$[/green] btcli sudo stake-burn --netuid 14 --amount 10 --tolerance 0.05 + 3. Stake and burn 10 TAO on subnet 14 with a specific hotkey: + [green]$[/green] btcli sudo stake-burn --netuid 14 --amount 10 --wallet-hotkey + """ + self.verbosity_handler(quiet, verbose, json_output, prompt) + proxy = self.is_valid_proxy_name_or_ss58(proxy, announce_only=False) + safe_staking = self.ask_safe_staking(safe_staking) + if safe_staking: + rate_tolerance = self.ask_rate_tolerance(rate_tolerance) + + print_protection_warnings( + mev_protection=mev_protection, + safe_staking=safe_staking, + command_name="sudo stake-burn", + ) + + if not wallet_hotkey: + wallet_hotkey = Prompt.ask( + "Enter the [blue]hotkey[/blue] name or " + "[blue]hotkey ss58 address[/blue] [dim](to use for the stake burn)[/dim]", + default=self.config.get("wallet_hotkey") or defaults.wallet.hotkey, + ) + + if wallet_hotkey and is_valid_ss58_address(wallet_hotkey): + hotkey_ss58 = wallet_hotkey + wallet = self.wallet_ask( + wallet_name, + wallet_path, + None, + ask_for=[WO.NAME, WO.PATH], + validate=WV.WALLET, + ) + else: + wallet = self.wallet_ask( + wallet_name, + wallet_path, + wallet_hotkey, + ask_for=[WO.NAME, WO.PATH, WO.HOTKEY], + validate=WV.WALLET_AND_HOTKEY, + ) + hotkey_ss58 = get_hotkey_pub_ss58(wallet) + + if amount <= 0: + print_error(f"You entered an incorrect stake and burn amount: {amount}") + raise typer.Exit() + + if netuid == 0: + print_error("Cannot stake and burn on the root subnet.") + raise typer.Exit() + + self._run_command( + sudo.stake_burn( + subtensor=self.initialize_chain(network), + wallet=wallet, + netuid=netuid, + amount=amount, + hotkey_ss58=hotkey_ss58, + safe_staking=safe_staking, + proxy=proxy, + rate_tolerance=rate_tolerance, + mev_protection=mev_protection, + json_output=json_output, + prompt=prompt, + decline=decline, + quiet=quiet, + period=period, + ) + ) + # Subnets def subnets_list( @@ -8518,6 +8672,9 @@ def liquidity_add( json_output: bool = Options.json_output, ): """Add liquidity to the swap (as a combination of TAO + Alpha).""" + console.print_error("User liquidity is currently disabled on Bittensor.") + raise typer.Exit() + self.verbosity_handler(quiet, verbose, json_output, prompt, decline) proxy = self.is_valid_proxy_name_or_ss58(proxy, announce_only) if not netuid: @@ -8595,6 +8752,9 @@ def liquidity_list( json_output: bool = Options.json_output, ): """Displays liquidity positions in given subnet.""" + console.print_error("User liquidity is currently disabled on Bittensor.") + raise typer.Exit() + self.verbosity_handler(quiet, verbose, json_output, prompt=False) if not netuid: netuid = IntPrompt.ask( @@ -8648,6 +8808,8 @@ def liquidity_remove( ): """Remove liquidity from the swap (as a combination of TAO + Alpha).""" + console.print_error("User liquidity is currently disabled on Bittensor.") + raise typer.Exit() self.verbosity_handler(quiet, verbose, json_output, prompt, decline) proxy = self.is_valid_proxy_name_or_ss58(proxy, announce_only) if all_liquidity_ids and position_id: @@ -8724,6 +8886,8 @@ def liquidity_modify( json_output: bool = Options.json_output, ): """Modifies the liquidity position for the given subnet.""" + console.print_error("User liquidity is currently disabled on Bittensor.") + raise typer.Exit() self.verbosity_handler(quiet, verbose, json_output, prompt, decline) proxy = self.is_valid_proxy_name_or_ss58(proxy, announce_only) if not netuid: diff --git a/bittensor_cli/src/__init__.py b/bittensor_cli/src/__init__.py index e4de4da53..bbbbb3151 100644 --- a/bittensor_cli/src/__init__.py +++ b/bittensor_cli/src/__init__.py @@ -683,6 +683,9 @@ class RootSudoOnly(Enum): "bonds_reset_enabled": ("sudo_set_bonds_reset_enabled", RootSudoOnly.FALSE), "transfers_enabled": ("sudo_set_toggle_transfer", RootSudoOnly.FALSE), "min_allowed_uids": ("sudo_set_min_allowed_uids", RootSudoOnly.TRUE), + "sn_owner_hotkey": ("sudo_set_sn_owner_hotkey", RootSudoOnly.FALSE), + "subnet_owner_hotkey": ("sudo_set_sn_owner_hotkey", RootSudoOnly.FALSE), + "recycle_or_burn": ("sudo_set_recycle_or_burn", RootSudoOnly.FALSE), # Note: These are displayed but not directly settable via HYPERPARAMS # They are derived or set via other mechanisms "alpha_high": ("", RootSudoOnly.FALSE), # Derived from alpha_values @@ -895,6 +898,24 @@ class RootSudoOnly(Enum): "owner_settable": False, "docs_link": "docs.learnbittensor.org/subnets/subnet-hyperparameters#minalloweduids", }, + "sn_owner_hotkey": { + "description": "Set the subnet owner hotkey.", + "side_effects": "Changes which hotkey is authorized as subnet owner for the given subnet.", + "owner_settable": True, + "docs_link": "docs.learnbittensor.org/subnets/subnet-hyperparameters", + }, + "subnet_owner_hotkey": { + "description": "Alias for sn_owner_hotkey; sets the subnet owner hotkey.", + "side_effects": "Same as sn_owner_hotkey.", + "owner_settable": True, + "docs_link": "docs.learnbittensor.org/subnets/subnet-hyperparameters", + }, + "recycle_or_burn": { + "description": "Set whether subnet TAO is recycled or burned.", + "side_effects": "Controls whether unstaked TAO is recycled back into the subnet or burned.", + "owner_settable": True, + "docs_link": "docs.learnbittensor.org/subnets/subnet-hyperparameters", + }, # Additional hyperparameters that appear in chain data but aren't directly settable via HYPERPARAMS "alpha_high": { "description": "High bound of the alpha range for stake calculations.", diff --git a/bittensor_cli/src/bittensor/chain_data.py b/bittensor_cli/src/bittensor/chain_data.py index cfcc699f5..e33b141ea 100644 --- a/bittensor_cli/src/bittensor/chain_data.py +++ b/bittensor_cli/src/bittensor/chain_data.py @@ -800,104 +800,36 @@ def tao_to_alpha(self, tao: Balance) -> Balance: def alpha_to_tao(self, alpha: Balance) -> Balance: return Balance.from_tao(alpha.tao * self.price.tao) - def tao_to_alpha_with_slippage( - self, tao: Balance - ) -> tuple[Balance, Balance, float]: - """ - Returns an estimate of how much Alpha a staker would receive if they stake their tao using the current pool - state. - - Args: - tao: Amount of TAO to stake. - Returns: - Tuple of balances where the first part is the amount of Alpha received, and the - second part (slippage) is the difference between the estimated amount and ideal - amount as if there was no slippage - """ - if self.is_dynamic: - new_tao_in = self.tao_in + tao - if new_tao_in == 0: - return tao, Balance.from_rao(0) - new_alpha_in = self.k / new_tao_in - - # Amount of alpha given to the staker - alpha_returned = Balance.from_rao( - self.alpha_in.rao - new_alpha_in.rao - ).set_unit(self.netuid) - - # Ideal conversion as if there is no slippage, just price - alpha_ideal = self.tao_to_alpha(tao) - - if alpha_ideal.tao > alpha_returned.tao: - slippage = Balance.from_tao( - alpha_ideal.tao - alpha_returned.tao - ).set_unit(self.netuid) - else: - slippage = Balance.from_tao(0) - else: - alpha_returned = tao.set_unit(self.netuid) - slippage = Balance.from_tao(0) - - slippage_pct_float = ( - 100 * float(slippage) / float(slippage + alpha_returned) - if slippage + alpha_returned != 0 - else 0 - ) - return alpha_returned, slippage, slippage_pct_float - - def alpha_to_tao_with_slippage( - self, alpha: Balance - ) -> tuple[Balance, Balance, float]: - """ - Returns an estimate of how much TAO a staker would receive if they unstake their alpha using the current pool - state. - Args: - alpha: Amount of Alpha to stake. - Returns: - Tuple of balances where the first part is the amount of TAO received, and the - second part (slippage) is the difference between the estimated amount and ideal - amount as if there was no slippage - """ - if self.is_dynamic: - new_alpha_in = self.alpha_in + alpha - new_tao_reserve = self.k / new_alpha_in - # Amount of TAO given to the unstaker - tao_returned = Balance.from_rao(self.tao_in - new_tao_reserve) - - # Ideal conversion as if there is no slippage, just price - tao_ideal = self.alpha_to_tao(alpha) - - if tao_ideal > tao_returned: - slippage = Balance.from_tao(tao_ideal.tao - tao_returned.tao) - else: - slippage = Balance.from_tao(0) - else: - tao_returned = alpha.set_unit(0) - slippage = Balance.from_tao(0) - slippage_pct_float = ( - 100 * float(slippage) / float(slippage + tao_returned) - if slippage + tao_returned != 0 - else 0 - ) - return tao_returned, slippage, slippage_pct_float +@dataclass +class ColdkeySwapAnnouncementInfo(InfoBase): + """ + Information about a coldkey swap announcement. + Contains information about a pending coldkey swap announcement when a coldkey + wants to declare its intent to swap to a new coldkey address. + The announcement is made before the actual swap can be executed, + allowing time for verification and security checks. -@dataclass -class ScheduledColdkeySwapInfo(InfoBase): - """Dataclass for scheduled coldkey swap information.""" + The destination coldkey address is stored as a hash. + This is to prevent the actual coldkey address from being exposed + to the network. The hash is computed using the BlakeTwo256 hashing algorithm. + """ - old_coldkey: str - new_coldkey: str - arbitration_block: int + coldkey: str + execution_block: int + new_coldkey_hash: str @classmethod - def _fix_decoded(cls, decoded: Any) -> "ScheduledColdkeySwapInfo": - """Fixes the decoded values.""" + def _fix_decoded( + cls, coldkey: str, decoded: tuple + ) -> "ColdkeySwapAnnouncementInfo": + execution_block, new_coldkey_hash = decoded + hash_str = "0x" + bytes(new_coldkey_hash[0]).hex() return cls( - old_coldkey=decode_account_id(decoded.get("old_coldkey")), - new_coldkey=decode_account_id(decoded.get("new_coldkey")), - arbitration_block=decoded.get("arbitration_block"), + coldkey=coldkey, + execution_block=int(execution_block), + new_coldkey_hash=hash_str, ) @@ -1204,6 +1136,8 @@ class SimSwapResult: alpha_amount: Balance tao_fee: Balance alpha_fee: Balance + tao_slippage: Balance + alpha_slippage: Balance @classmethod def from_dict(cls, d: dict, netuid: int) -> "SimSwapResult": @@ -1212,8 +1146,22 @@ def from_dict(cls, d: dict, netuid: int) -> "SimSwapResult": alpha_amount=Balance.from_rao(d["alpha_amount"]).set_unit(netuid), tao_fee=Balance.from_rao(d["tao_fee"]).set_unit(0), alpha_fee=Balance.from_rao(d["alpha_fee"]).set_unit(netuid), + tao_slippage=Balance.from_rao(d["tao_slippage"]).set_unit(0), + alpha_slippage=Balance.from_rao(d["alpha_slippage"]).set_unit(netuid), ) + @property + def tao_slippage_pct(self) -> float: + """Slippage percentage for alpha->tao swaps.""" + ideal = self.tao_amount.tao + self.tao_slippage.tao + return (100.0 * self.tao_slippage.tao / ideal) if ideal > 0 else 0.0 + + @property + def alpha_slippage_pct(self) -> float: + """Slippage percentage for tao->alpha swaps.""" + ideal = self.alpha_amount.tao + self.alpha_slippage.tao + return (100.0 * self.alpha_slippage.tao / ideal) if ideal > 0 else 0.0 + @dataclass class CrowdloanData(InfoBase): diff --git a/bittensor_cli/src/bittensor/subtensor_interface.py b/bittensor_cli/src/bittensor/subtensor_interface.py index 57b4a627e..7eb93aa7c 100644 --- a/bittensor_cli/src/bittensor/subtensor_interface.py +++ b/bittensor_cli/src/bittensor/subtensor_interface.py @@ -30,6 +30,7 @@ MetagraphInfo, SimSwapResult, CrowdloanData, + ColdkeySwapAnnouncementInfo, ) from bittensor_cli.src import DelegatesDetails from bittensor_cli.src.bittensor.balances import Balance, fixed_to_float @@ -443,40 +444,52 @@ async def get_total_stake_for_coldkey( :return: {address: Balance objects} """ - sub_stakes, dynamic_info = await asyncio.gather( + sub_stakes, price_map = await asyncio.gather( self.get_stake_for_coldkeys(list(ss58_addresses), block_hash=block_hash), - # Token pricing info - self.all_subnets(block_hash=block_hash), + self.get_subnet_prices(block_hash=block_hash), ) - results = {} + results: dict[str, tuple[Balance, Balance]] = {} + dynamic_stakes: list[tuple[str, "StakeInfo"]] = [] + for ss58, stake_info_list in sub_stakes.items(): - total_tao_value = Balance(0) - total_swapped_tao_value = Balance(0) + total_tao_value, total_swapped_tao_value = Balance(0), Balance(0) for sub_stake in stake_info_list: if sub_stake.stake.rao == 0: continue - netuid = sub_stake.netuid - pool = dynamic_info[netuid] - alpha_value = Balance.from_rao(int(sub_stake.stake.rao)).set_unit( - netuid + netuid = sub_stake.netuid + price = price_map[netuid] + ideal_tao = Balance.from_tao(sub_stake.stake.tao * price.tao).set_unit( + 0 ) + total_tao_value += ideal_tao - # Without slippage - tao_value = pool.alpha_to_tao(alpha_value) - total_tao_value += tao_value - - # With slippage if netuid == 0: - swapped_tao_value = tao_value + total_swapped_tao_value += ideal_tao else: - swapped_tao_value, _, _ = pool.alpha_to_tao_with_slippage( - sub_stake.stake - ) - total_swapped_tao_value += swapped_tao_value + dynamic_stakes.append((ss58, sub_stake)) results[ss58] = (total_tao_value, total_swapped_tao_value) + + if dynamic_stakes: + sim_results = await asyncio.gather( + *[ + self.sim_swap( + origin_netuid=sub_stake.netuid, + destination_netuid=0, + amount=sub_stake.stake.rao, + block_hash=block_hash, + ) + for _, sub_stake in dynamic_stakes + ] + ) + + for (ss58, sub_stake), sim_result in zip(dynamic_stakes, sim_results): + total_tao_value, total_swapped_tao_value = results[ss58] + total_swapped_tao_value += sim_result.tao_amount + results[ss58] = (total_tao_value, total_swapped_tao_value) + return results async def get_total_stake_for_hotkey( @@ -1653,7 +1666,7 @@ async def all_subnets(self, block_hash: Optional[str] = None) -> list[DynamicInf "get_all_dynamic_info", block_hash=block_hash, ), - self.get_subnet_prices(block_hash=block_hash, page_size=129), + self.get_subnet_prices(block_hash=block_hash), ) sns: list[DynamicInfo] = DynamicInfo.list_from_any(result) for sn in sns: @@ -1781,6 +1794,7 @@ async def sim_swap( ) secondary_fee = (result.tao_fee / sn_price.tao).set_unit(origin_netuid) result.alpha_fee = result.alpha_fee + secondary_fee + result.tao_slippage = intermediate_result.tao_slippage return result elif origin_netuid > 0: # dynamic to tao @@ -1805,30 +1819,119 @@ async def sim_swap( destination_netuid, ) - async def get_scheduled_coldkey_swap( + async def get_coldkey_swap_announcements( self, block_hash: Optional[str] = None, reuse_block: bool = False, - ) -> Optional[list[str]]: + ) -> list[ColdkeySwapAnnouncementInfo]: + """Fetches all pending coldkey swap announcements. + + Args: + block_hash: Block hash at which to perform query. + reuse_block: Whether to reuse the last-used block hash. + + Returns: + A list of ColdkeySwapAnnouncementInfo for all pending announcements. """ - Queries the chain to fetch the list of coldkeys that are scheduled for a swap. + result = await self.substrate.query_map( + module="SubtensorModule", + storage_function="ColdkeySwapAnnouncements", + block_hash=block_hash, + reuse_block_hash=reuse_block, + ) - :param block_hash: Block hash at which to perform query. - :param reuse_block: Whether to reuse the last-used block hash. + announcements = [] + async for ss58, data in result: + coldkey = decode_account_id(ss58) + announcements.append( + ColdkeySwapAnnouncementInfo._fix_decoded(coldkey, data) + ) + return announcements + + async def get_coldkey_swap_announcement( + self, + coldkey_ss58: str, + block_hash: Optional[str] = None, + reuse_block: bool = False, + ) -> Optional[ColdkeySwapAnnouncementInfo]: + """Fetches a pending coldkey swap announcement for a specific coldkey. - :return: A list of SS58 addresses of the coldkeys that are scheduled for a coldkey swap. + Args: + coldkey_ss58: The SS58 address of the coldkey to query. + block_hash: Block hash at which to perform query. + reuse_block: Whether to reuse the last-used block hash. + + Returns: + ColdkeySwapAnnouncementInfo if an announcement exists, None otherwise. + """ + result = await self.query( + module="SubtensorModule", + storage_function="ColdkeySwapAnnouncements", + params=[coldkey_ss58], + block_hash=block_hash, + reuse_block_hash=reuse_block, + ) + + if result is None: + return None + + return ColdkeySwapAnnouncementInfo._fix_decoded(coldkey_ss58, result) + + async def get_coldkey_swap_disputes( + self, + block_hash: Optional[str] = None, + reuse_block: bool = False, + ) -> list[tuple[str, int]]: + """Fetch all coldkey swap disputes. + + Args: + block_hash: Optional block hash at which to query storage. + reuse_block: Whether to reuse the last-used block hash. + + Returns: + list[tuple[str, int]]: Tuples of `(coldkey_ss58, disputed_block)`. """ result = await self.substrate.query_map( module="SubtensorModule", - storage_function="ColdkeySwapScheduled", + storage_function="ColdkeySwapDisputes", + block_hash=block_hash, + reuse_block_hash=reuse_block, + ) + + disputes: list[tuple[str, int]] = [] + async for ss58, data in result: + coldkey = decode_account_id(ss58) + disputes.append((coldkey, data.value)) + return disputes + + async def get_coldkey_swap_dispute( + self, + coldkey_ss58: str, + block_hash: Optional[str] = None, + reuse_block: bool = False, + ) -> Optional[int]: + """Fetch the disputed block for a given coldkey swap. + + Args: + coldkey_ss58: Coldkey SS58 address. + block_hash: Optional block hash at which to query storage. + reuse_block: Whether to reuse the last-used block hash. + + Returns: + int | None: Block number when disputed, or None if no dispute exists. + """ + result = await self.query( + module="SubtensorModule", + storage_function="ColdkeySwapDisputes", + params=[coldkey_ss58], block_hash=block_hash, reuse_block_hash=reuse_block, ) - keys_pending_swap = [] - async for ss58, _ in result: - keys_pending_swap.append(decode_account_id(ss58)) - return keys_pending_swap + if result is None: + return None + + return int(result) async def get_crowdloans( self, block_hash: Optional[str] = None @@ -1962,24 +2065,53 @@ async def get_crowdloan_contributors( return contributor_contributions - async def get_coldkey_swap_schedule_duration( + async def get_coldkey_swap_announcement_delay( self, block_hash: Optional[str] = None, reuse_block: bool = False, ) -> int: + """Retrieves the delay (in blocks) before a coldkey swap can be executed. + + This is the time the user must wait after announcing a coldkey swap + before they can execute the swap. + + Args: + block_hash: The hash of the blockchain block number for the query. + reuse_block: Whether to reuse the last-used blockchain block hash. + + Returns: + The number of blocks to wait after announcement. """ - Retrieves the duration (in blocks) required for a coldkey swap to be executed. + result = await self.query( + module="SubtensorModule", + storage_function="ColdkeySwapAnnouncementDelay", + params=[], + block_hash=block_hash, + reuse_block_hash=reuse_block, + ) + + return result + + async def get_coldkey_swap_reannouncement_delay( + self, + block_hash: Optional[str] = None, + reuse_block: bool = False, + ) -> int: + """Retrieves the delay (in blocks) before the user can reannounce a coldkey swap. + + If the user has already announced a swap, they must wait this many blocks + after the original execution block before they can announce a new swap. Args: block_hash: The hash of the blockchain block number for the query. reuse_block: Whether to reuse the last-used blockchain block hash. Returns: - int: The number of blocks required for the coldkey swap schedule duration. + The number of blocks to wait before reannouncing. """ result = await self.query( module="SubtensorModule", - storage_function="ColdkeySwapScheduleDuration", + storage_function="ColdkeySwapReannouncementDelay", params=[], block_hash=block_hash, reuse_block_hash=reuse_block, @@ -1987,6 +2119,30 @@ async def get_coldkey_swap_schedule_duration( return result + async def get_coldkey_swap_cost( + self, + block_hash: Optional[str] = None, + reuse_block: bool = False, + ) -> Balance: + """Retrieves the fee required to announce a coldkey swap. + + Args: + block_hash: Block hash at which to query the constant. + reuse_block: Whether to reuse the last-used block hash. + + Returns: + The swap cost as a Balance object. Returns 0 TAO if constant not found. + """ + swap_cost = await self.substrate.get_constant( + module_name="SubtensorModule", + constant_name="KeySwapCost", + block_hash=block_hash, + reuse_block_hash=reuse_block, + ) + if swap_cost is None: + return None + return Balance.from_rao(swap_cost.value) + async def get_coldkey_claim_type( self, coldkey_ss58: str, @@ -2380,43 +2536,36 @@ async def get_subnet_price( :return: The current Alpha price in TAO units for the specified subnet. """ - # TODO update this to use the runtime call SwapRuntimeAPI.current_alpha_price - current_sqrt_price = await self.query( - module="Swap", - storage_function="AlphaSqrtPrice", - params=[netuid], + if netuid == 0: + return Balance.from_tao(1.0) + + current_price = await self.query_runtime_api( + "SwapRuntimeApi", + "current_alpha_price", + params={"netuid": netuid}, block_hash=block_hash, ) - - current_sqrt_price = fixed_to_float(current_sqrt_price) - current_price = current_sqrt_price * current_sqrt_price - return Balance.from_rao(int(current_price * 1e9)) + return Balance.from_rao(current_price) async def get_subnet_prices( - self, block_hash: Optional[str] = None, page_size: int = 100 + self, block_hash: Optional[str] = None ) -> dict[int, Balance]: """ Gets the current Alpha prices in TAO for all subnets. :param block_hash: The hash of the block to retrieve prices from. - :param page_size: The page size for batch queries (default: 100). :return: A dictionary mapping netuid to the current Alpha price in TAO units. """ - query = await self.substrate.query_map( - module="Swap", - storage_function="AlphaSqrtPrice", - page_size=page_size, + all_prices = await self.query_runtime_api( + "SwapRuntimeApi", + "current_alpha_price_all", block_hash=block_hash, ) - - map_ = {} - async for netuid_, current_sqrt_price in query: - current_sqrt_price_ = fixed_to_float(current_sqrt_price.value) - current_price = current_sqrt_price_**2 - map_[netuid_] = Balance.from_rao(int(current_price * 1e9)) - - return map_ + result = {} + for entry in all_prices: + result[entry["netuid"]] = Balance.from_rao(entry["price"]) + return result async def get_all_subnet_ema_tao_inflow( self, diff --git a/bittensor_cli/src/bittensor/utils.py b/bittensor_cli/src/bittensor/utils.py index 53a4e4191..fc6838dde 100644 --- a/bittensor_cli/src/bittensor/utils.py +++ b/bittensor_cli/src/bittensor/utils.py @@ -25,6 +25,7 @@ from numpy.typing import NDArray from rich.console import Console from rich.prompt import Confirm, Prompt +from rich.table import Table from scalecodec import GenericCall from scalecodec.utils.ss58 import ss58_encode, ss58_decode import typer @@ -89,6 +90,63 @@ def confirm_action( return Confirm.ask(message, default=default) +def create_table(*columns, title: str = "", **overrides) -> Table: + """ + Creates a Rich Table with consistent CLI styling. + + Default styling: no edge borders, bold white headers, bright black borders, + footer enabled, center-aligned title, and no lines between rows. + + Args: + *columns: Optional Column objects to add to the table upfront. + title: Table title with rich markup support. + **overrides: Any Table() parameter to override defaults (e.g., show_footer, + border_style, box, expand). + + Returns: + Configured Rich Table ready for adding columns/rows. + + Examples: + Basic usage (add columns later): + >>> table = create_table(title="My Subnets") + >>> table.add_column("Netuid", justify="center") + >>> table.add_row("1") + + With Column objects upfront: + >>> from rich.table import Column + >>> table = create_table( + ... Column("Name", justify="left"), + ... Column("Value", justify="right"), + ... title="Settings" + ... ) + >>> table.add_row("Timeout", "30s") + + Custom styling: + >>> from rich import box + >>> table = create_table( + ... title="Custom", + ... border_style="blue", + ... box=box.ROUNDED + ... ) + """ + defaults = { + "title": title, + "show_footer": True, + "show_edge": False, + "header_style": "bold white", + "border_style": "bright_black", + "style": "bold", + "title_justify": "center", + "show_lines": False, + "pad_edge": True, + } + + # Merge overrides into defaults + config = {**defaults, **overrides} + + return Table(*columns, **config) + + jinja_env = Environment( loader=PackageLoader("bittensor_cli", "src/bittensor/templates"), autoescape=select_autoescape(), @@ -897,7 +955,7 @@ def normalize_hyperparameters( norm_value = norm_value.to_dict() else: norm_value = value - except Exception: + except (KeyError, ValueError, TypeError, AttributeError): # bittensor.logging.warning(f"Error normalizing parameter '{param}': {e}") norm_value = "-" if not json_output: @@ -1785,7 +1843,7 @@ def is_valid_github_url(url: str) -> bool: return False return True - except Exception: # TODO figure out the exceptions that can be raised in here + except (ValueError, TypeError, AttributeError): return False diff --git a/bittensor_cli/src/commands/stake/add.py b/bittensor_cli/src/commands/stake/add.py index 2379a62aa..7372d6a21 100644 --- a/bittensor_cli/src/commands/stake/add.py +++ b/bittensor_cli/src/commands/stake/add.py @@ -17,6 +17,7 @@ from bittensor_cli.src.bittensor.utils import ( confirm_action, console, + create_table, get_hotkey_wallets_for_wallet, is_valid_ss58_address, print_error, @@ -382,18 +383,7 @@ async def stake_extrinsic( return remaining_wallet_balance -= amount_to_stake - # Calculate slippage - # TODO: Update for V3, slippage calculation is significantly different in v3 - # try: - # received_amount, slippage_pct, slippage_pct_float, rate = ( - # _calculate_slippage(subnet_info, amount_to_stake, stake_fee) - # ) - # except ValueError: - # return False - # - # max_slippage = max(slippage_pct_float, max_slippage) - - # Temporary workaround - calculations without slippage + # Calculate rate current_price_float = float(subnet_info.price.tao) rate = 1.0 / current_price_float @@ -443,6 +433,11 @@ async def stake_extrinsic( amount=amount_minus_fee.rao, ) received_amount = sim_swap.alpha_amount + + slippage_pct_float = sim_swap.alpha_slippage_pct + slippage_pct = f"{slippage_pct_float:.4f} %" + max_slippage = max(slippage_pct_float, max_slippage) + # Add rows for the table base_row = [ str(netuid), # netuid @@ -453,7 +448,7 @@ async def stake_extrinsic( str(received_amount.set_unit(netuid)), # received str(sim_swap.tao_fee), # fee str(extrinsic_fee), - # str(slippage_pct), # slippage + str(slippage_pct), # slippage ] + row_extension rows.append(tuple(base_row)) @@ -652,19 +647,11 @@ def _define_stake_table( Returns: Table: An initialized rich Table object with appropriate columns """ - table = Table( + table = create_table( title=f"\n[{COLOR_PALETTE.G.HEADER}]Staking to:\n" f"Wallet: [{COLOR_PALETTE.G.CK}]{wallet.name}[/{COLOR_PALETTE.G.CK}], " f"Coldkey ss58: [{COLOR_PALETTE.G.CK}]{wallet.coldkeypub.ss58_address}[/{COLOR_PALETTE.G.CK}]\n" f"Network: {subtensor.network}[/{COLOR_PALETTE.G.HEADER}]\n", - show_footer=True, - show_edge=False, - header_style="bold white", - border_style="bright_black", - style="bold", - title_justify="center", - show_lines=False, - pad_edge=True, ) table.add_column("Netuid", justify="center", style="grey89") @@ -696,10 +683,9 @@ def _define_stake_table( justify="center", style=COLOR_PALETTE.STAKE.TAO, ) - # TODO: Uncomment when slippage is reimplemented for v3 - # table.add_column( - # "Slippage", justify="center", style=COLOR_PALETTE["STAKE"]["SLIPPAGE_PERCENT"] - # ) + table.add_column( + "Slippage", justify="center", style=COLOR_PALETTE["STAKE"]["SLIPPAGE_PERCENT"] + ) if safe_staking: table.add_column( @@ -744,56 +730,11 @@ def _print_table_and_slippage(table: Table, max_slippage: float, safe_staking: b - [bold white]Hotkey[/bold white]: The ss58 address of the hotkey you are staking to. - [bold white]Amount[/bold white]: The TAO you are staking into this subnet onto this hotkey. - [bold white]Rate[/bold white]: The rate of exchange between your TAO and the subnet's stake. - - [bold white]Received[/bold white]: The amount of stake you will receive on this subnet after slippage.""" - # - [bold white]Slippage[/bold white]: The slippage percentage of the stake operation. (0% if the subnet is not dynamic i.e. root).""" + - [bold white]Received[/bold white]: The amount of stake you will receive on this subnet after slippage. + - [bold white]Slippage[/bold white]: The slippage percentage of the stake operation. (0% if the subnet is not dynamic i.e. root).""" safe_staking_description = """ - [bold white]Rate Tolerance[/bold white]: Maximum acceptable alpha rate. If the rate exceeds this tolerance, the transaction will be limited or rejected. - [bold white]Partial staking[/bold white]: If True, allows staking up to the rate tolerance limit. If False, the entire transaction will fail if rate tolerance is exceeded.\n""" console.print(base_description + (safe_staking_description if safe_staking else "")) - - -def _calculate_slippage( - subnet_info, amount: Balance, stake_fee: Balance -) -> tuple[Balance, str, float, str]: - """Calculate slippage when adding stake. - - Args: - subnet_info: Subnet dynamic info - amount: Amount being staked - stake_fee: Transaction fee for the stake operation - - Returns: - tuple containing: - - received_amount: Amount received after slippage and fees - - slippage_str: Formatted slippage percentage string - - slippage_float: Raw slippage percentage value - - rate: Exchange rate string - - TODO: Update to v3. This method only works for protocol-liquidity-only - mode (user liquidity disabled) - """ - amount_after_fee = amount - stake_fee - - if amount_after_fee < 0: - print_error("You don't have enough balance to cover the stake fee.") - raise ValueError() - - received_amount, _, _ = subnet_info.tao_to_alpha_with_slippage(amount_after_fee) - - if subnet_info.is_dynamic: - ideal_amount = subnet_info.tao_to_alpha(amount) - total_slippage = ideal_amount - received_amount - slippage_pct_float = 100 * (total_slippage.tao / ideal_amount.tao) - slippage_str = f"{slippage_pct_float:.4f} %" - rate = f"{(1 / subnet_info.price.tao or 1):.4f}" - else: - # TODO: Fix this. Slippage is always zero for static networks. - slippage_pct_float = ( - 100 * float(stake_fee.tao) / float(amount.tao) if amount.tao != 0 else 0 - ) - slippage_str = f"{slippage_pct_float:.4f} %" - rate = "1" - - return received_amount, slippage_str, slippage_pct_float, rate diff --git a/bittensor_cli/src/commands/stake/auto_staking.py b/bittensor_cli/src/commands/stake/auto_staking.py index 3d7888321..0afee7bce 100644 --- a/bittensor_cli/src/commands/stake/auto_staking.py +++ b/bittensor_cli/src/commands/stake/auto_staking.py @@ -4,12 +4,12 @@ from bittensor_wallet import Wallet from rich import box -from rich.table import Table from bittensor_cli.src import COLOR_PALETTE from bittensor_cli.src.bittensor.utils import ( confirm_action, console, + create_table, json_console, print_success, get_subnet_name, @@ -127,7 +127,7 @@ def resolve_identity(hotkey: str) -> Optional[str]: json_console.print(json.dumps(data_output)) return data_output - table = Table( + table = create_table( title=( f"\n[{COLOR_PALETTE['GENERAL']['HEADER']}]Auto Stake Destinations" f" for [bold]{coldkey_display}[/bold]\n" @@ -135,13 +135,6 @@ def resolve_identity(hotkey: str) -> Optional[str]: f"Coldkey: {coldkey_ss58}\n" f"[/{COLOR_PALETTE['GENERAL']['HEADER']}]" ), - show_edge=False, - header_style="bold white", - border_style="bright_black", - style="bold", - title_justify="center", - show_lines=False, - pad_edge=True, box=box.SIMPLE_HEAD, ) @@ -214,18 +207,11 @@ async def set_auto_stake_destination( hotkey_identity = delegate_info.display if prompt_user and not json_output: - table = Table( + table = create_table( title=( f"\n[{COLOR_PALETTE['GENERAL']['HEADER']}]Confirm Auto-Stake Destination" f"[/{COLOR_PALETTE['GENERAL']['HEADER']}]" ), - show_edge=False, - header_style="bold white", - border_style="bright_black", - style="bold", - title_justify="center", - show_lines=False, - pad_edge=True, box=box.SIMPLE_HEAD, ) table.add_column( diff --git a/bittensor_cli/src/commands/stake/move.py b/bittensor_cli/src/commands/stake/move.py index 402bd4f26..2e83e78cd 100644 --- a/bittensor_cli/src/commands/stake/move.py +++ b/bittensor_cli/src/commands/stake/move.py @@ -4,7 +4,6 @@ from typing import TYPE_CHECKING, Optional from bittensor_wallet import Wallet -from rich.table import Table from rich.prompt import Prompt from bittensor_cli.src import COLOR_PALETTE @@ -16,6 +15,7 @@ from bittensor_cli.src.bittensor.utils import ( confirm_action, console, + create_table, print_error, group_subnets, get_subnet_name, @@ -167,7 +167,7 @@ async def display_stake_movement_cross_subnets( ) # Create and display table - table = Table( + table = create_table( title=( f"\n[{COLOR_PALETTE.G.HEADER}]" f"Moving stake from: " @@ -178,14 +178,6 @@ async def display_stake_movement_cross_subnets( f"[/{COLOR_PALETTE.G.SUBHEAD}]\nNetwork: {subtensor.network}\n" f"[/{COLOR_PALETTE.G.HEADER}]" ), - show_footer=True, - show_edge=False, - header_style="bold white", - border_style="bright_black", - style="bold", - title_justify="center", - show_lines=False, - pad_edge=True, ) table.add_column( @@ -352,16 +344,8 @@ async def stake_move_transfer_selection( raise ValueError # Display hotkeys with stakes - table = Table( + table = create_table( title=f"\n[{COLOR_PALETTE['GENERAL']['HEADER']}]Hotkeys with Stakes\n", - show_footer=True, - show_edge=False, - header_style="bold white", - border_style="bright_black", - style="bold", - title_justify="center", - show_lines=False, - pad_edge=True, ) table.add_column("Index", justify="right") table.add_column("Identity", style=COLOR_PALETTE["GENERAL"]["SUBHEADING"]) @@ -405,13 +389,12 @@ async def stake_move_transfer_selection( origin_hotkey_ss58 = origin_hotkey_info["hotkey_ss58"] # Display available netuids for selected hotkey - table = Table( + table = create_table( title=f"\n[{COLOR_PALETTE.G.HEADER}]Available Stakes for Hotkey\n[/{COLOR_PALETTE.G.HEADER}]" f"[{COLOR_PALETTE.G.HK}]{origin_hotkey_ss58}[/{COLOR_PALETTE.G.HK}]\n", - show_edge=False, - header_style="bold white", - border_style="bright_black", - title_justify="center", + show_footer=False, + show_lines=True, + pad_edge=False, width=len(origin_hotkey_ss58) + 20, ) table.add_column("Netuid", style="cyan") @@ -483,13 +466,12 @@ async def stake_swap_selection( raise ValueError # Display available stakes - table = Table( + table = create_table( title=f"\n[{COLOR_PALETTE.G.HEADER}]Available Stakes for Hotkey\n[/{COLOR_PALETTE.G.HEADER}]" f"[{COLOR_PALETTE.G.HK}]{wallet.hotkey_str}: {hotkey_ss58}[/{COLOR_PALETTE.G.HK}]\n", - show_edge=False, - header_style="bold white", - border_style="bright_black", - title_justify="center", + show_footer=False, + show_lines=True, + pad_edge=False, width=len(hotkey_ss58) + 20, ) diff --git a/bittensor_cli/src/commands/stake/remove.py b/bittensor_cli/src/commands/stake/remove.py index bb8faceb5..9cc782be6 100644 --- a/bittensor_cli/src/commands/stake/remove.py +++ b/bittensor_cli/src/commands/stake/remove.py @@ -18,6 +18,7 @@ from bittensor_cli.src.bittensor.utils import ( confirm_action, console, + create_table, print_success, print_verbose, print_error, @@ -249,6 +250,10 @@ async def unstake( received_amount = sim_swap.tao_amount if not proxy: received_amount -= extrinsic_fee + + slippage_pct_float = sim_swap.tao_slippage_pct + slippage_pct = f"{slippage_pct_float:.4f} %" + max_float_slippage = max(slippage_pct_float, max_float_slippage) except ValueError: continue total_received_amount += received_amount @@ -274,7 +279,7 @@ async def unstake( str(sim_swap.alpha_fee), # Fee str(extrinsic_fee), # Extrinsic fee str(received_amount), # Received Amount - # slippage_pct, # Slippage Percent + slippage_pct, # Slippage Percent ] # Additional fields for safe unstaking @@ -443,6 +448,7 @@ async def unstake_all( return all_sn_dynamic_info = {info.netuid: info for info in all_sn_dynamic_info_} + max_float_slippage = 0.0 # Create table for unstaking all table_title = ( @@ -450,21 +456,13 @@ async def unstake_all( if not unstake_all_alpha else "Unstaking Summary - All Alpha Stakes" ) - table = Table( + table = create_table( title=( f"\n[{COLOR_PALETTE.G.HEADER}]{table_title}[/{COLOR_PALETTE.G.HEADER}]\n" f"Wallet: [{COLOR_PALETTE.G.COLDKEY}]{wallet.name}[/{COLOR_PALETTE.G.COLDKEY}], " f"Coldkey ss58: [{COLOR_PALETTE.G.CK}]{coldkey_ss58}[/{COLOR_PALETTE.G.CK}]\n" f"Network: [{COLOR_PALETTE.G.HEADER}]{subtensor.network}[/{COLOR_PALETTE.G.HEADER}]\n" ), - show_footer=True, - show_edge=False, - header_style="bold white", - border_style="bright_black", - style="bold", - title_justify="center", - show_lines=False, - pad_edge=True, ) table.add_column("Netuid", justify="center", style="grey89") table.add_column( @@ -495,11 +493,11 @@ async def unstake_all( justify="center", style=COLOR_PALETTE["POOLS"]["TAO_EQUIV"], ) - # table.add_column( - # "Slippage", - # justify="center", - # style=COLOR_PALETTE["STAKE"]["SLIPPAGE_PERCENT"], - # ) + table.add_column( + "Slippage", + justify="center", + style=COLOR_PALETTE["STAKE"]["SLIPPAGE_PERCENT"], + ) # Calculate total received total_received_value = Balance(0) @@ -531,6 +529,10 @@ async def unstake_all( if received_amount < Balance.from_tao(0): print_error("Not enough Alpha to pay the transaction fee.") continue + + slippage_pct_float = sim_swap.tao_slippage_pct + slippage_pct = f"{slippage_pct_float:.4f} %" + max_float_slippage = max(slippage_pct_float, max_float_slippage) except (AttributeError, ValueError): continue @@ -545,8 +547,9 @@ async def unstake_all( str(sim_swap.alpha_fee), str(extrinsic_fee), str(received_amount), + slippage_pct, ) - console.print(table) + _print_table_and_slippage(table, max_float_slippage, False) console.print( f"Total expected return: [{COLOR_PALETTE['STAKE']['STAKE_AMOUNT']}]{total_received_value}" @@ -1056,16 +1059,8 @@ async def _unstake_selection( # Display existing hotkeys, id, and staked netuids. subnet_filter = f" for Subnet {netuid}" if netuid is not None else "" - table = Table( + table = create_table( title=f"\n[{COLOR_PALETTE.G.HEADER}]Hotkeys with Stakes{subnet_filter}\n", - show_footer=True, - show_edge=False, - header_style="bold white", - border_style="bright_black", - style="bold", - title_justify="center", - show_lines=False, - pad_edge=True, ) table.add_column("Index", justify="right") table.add_column("Identity", style=COLOR_PALETTE.G.SUBHEAD) @@ -1093,18 +1088,10 @@ async def _unstake_selection( netuid_stakes = hotkey_stakes[selected_hotkey_ss58] # Display hotkey's staked netuids with amount. - table = Table( + table = create_table( title=f"\n[{COLOR_PALETTE['GENERAL']['HEADER']}]Stakes for hotkey \n" f"[{COLOR_PALETTE['GENERAL']['SUBHEADING']}]{selected_hotkey_name}\n" f"{selected_hotkey_ss58}\n", - show_footer=True, - show_edge=False, - header_style="bold white", - border_style="bright_black", - style="bold", - title_justify="center", - show_lines=False, - pad_edge=True, ) table.add_column("Subnet", justify="right") table.add_column("Symbol", style=COLOR_PALETTE["GENERAL"]["SYMBOL"]) @@ -1342,16 +1329,8 @@ def _create_unstake_table( f"Coldkey ss58: [{COLOR_PALETTE.G.CK}]{wallet_coldkey_ss58}[/{COLOR_PALETTE.G.CK}]\n" f"Network: {network}[/{COLOR_PALETTE.G.HEADER}]\n" ) - table = Table( + table = create_table( title=title, - show_footer=True, - show_edge=False, - header_style="bold white", - border_style="bright_black", - style="bold", - title_justify="center", - show_lines=False, - pad_edge=True, ) table.add_column("Netuid", justify="center", style="grey89") @@ -1382,9 +1361,9 @@ def _create_unstake_table( style=COLOR_PALETTE["POOLS"]["TAO_EQUIV"], footer=str(total_received_amount), ) - # table.add_column( - # "Slippage", justify="center", style=COLOR_PALETTE["STAKE"]["SLIPPAGE_PERCENT"] - # ) + table.add_column( + "Slippage", justify="center", style=COLOR_PALETTE["STAKE"]["SLIPPAGE_PERCENT"] + ) if safe_staking: table.add_column( f"Rate with tolerance: [blue]({rate_tolerance * 100}%)[/blue]", diff --git a/bittensor_cli/src/commands/stake/wizard.py b/bittensor_cli/src/commands/stake/wizard.py index f1886f65e..1b11f93c3 100644 --- a/bittensor_cli/src/commands/stake/wizard.py +++ b/bittensor_cli/src/commands/stake/wizard.py @@ -10,12 +10,12 @@ from bittensor_wallet import Wallet from rich.prompt import Prompt -from rich.table import Table from rich.panel import Panel from bittensor_cli.src import COLOR_PALETTE from bittensor_cli.src.bittensor.utils import ( console, + create_table, print_error, is_valid_ss58_address, get_hotkey_pub_ss58, @@ -159,12 +159,11 @@ def get_identity(hotkey_ss58_: str) -> str: return old_identity.display return "~" - table = Table( + table = create_table( title=f"\n[{COLOR_PALETTE['GENERAL']['HEADER']}]Your Available Stakes[/{COLOR_PALETTE['GENERAL']['HEADER']}]\n", - show_edge=False, - header_style="bold white", - border_style="bright_black", - title_justify="center", + show_footer=False, + show_lines=True, + pad_edge=False, ) table.add_column("Hotkey Identity", style=COLOR_PALETTE["GENERAL"]["SUBHEADING"]) diff --git a/bittensor_cli/src/commands/subnets/subnets.py b/bittensor_cli/src/commands/subnets/subnets.py index 57a247103..f627de7fb 100644 --- a/bittensor_cli/src/commands/subnets/subnets.py +++ b/bittensor_cli/src/commands/subnets/subnets.py @@ -30,6 +30,7 @@ confirm_action, console, create_and_populate_table, + create_table, print_success, print_verbose, print_error, @@ -364,17 +365,9 @@ def define_table( tao_emission_percentage: str, total_tao_flow_ema: float, ): - defined_table = Table( + defined_table = create_table( title=f"\n[{COLOR_PALETTE['GENERAL']['HEADER']}]Subnets" f"\nNetwork: [{COLOR_PALETTE['GENERAL']['SUBHEADING']}]{subtensor.network}\n\n", - show_footer=True, - show_edge=False, - header_style="bold white", - border_style="bright_black", - style="bold", - title_justify="center", - show_lines=False, - pad_edge=True, ) defined_table.add_column( @@ -602,7 +595,7 @@ def dict_table(subnets_, block_number_, mechanisms, ema_tao_inflow) -> dict: tao_flow_ema = None if netuid in ema_tao_inflow: tao_flow_ema = ema_tao_inflow[netuid].tao - total_tao_flow_ema += tao_flow_ema.tao + total_tao_flow_ema += tao_flow_ema subnet_rows[netuid] = { "netuid": netuid, "subnet_name": subnet_name, @@ -1095,17 +1088,9 @@ async def show_root(): tao_sum = sum(root_state.tao_stake).tao - table = Table( + table = create_table( title=f"[{COLOR_PALETTE.G.HEADER}]Root Network\n[{COLOR_PALETTE.G.SUBHEAD}]" f"Network: {subtensor.network}[/{COLOR_PALETTE.G.SUBHEAD}]\n", - show_footer=True, - show_edge=False, - header_style="bold white", - border_style="bright_black", - style="bold", - title_justify="center", - show_lines=False, - pad_edge=True, ) table.add_column("[bold white]Position", style="white", justify="center") @@ -1340,18 +1325,10 @@ async def show_subnet( # Define table properties mechanism_label = f"Mechanism {selected_mechanism_id}" - table = Table( + table = create_table( title=f"[{COLOR_PALETTE['GENERAL']['HEADER']}]Subnet [{COLOR_PALETTE['GENERAL']['SUBHEADING']}]{netuid_}" f"{': ' + get_subnet_name(subnet_info)}" f"\n[{COLOR_PALETTE['GENERAL']['SUBHEADING']}]Network: {subtensor.network} โ€ข {mechanism_label}[/{COLOR_PALETTE['GENERAL']['SUBHEADING']}]\n", - show_footer=True, - show_edge=False, - header_style="bold white", - border_style="bright_black", - style="bold", - title_justify="center", - show_lines=False, - pad_edge=True, ) # For table footers @@ -1892,22 +1869,13 @@ async def _storage_key(storage_fn: str) -> StorageKey: return if prompt and not json_output: - # TODO make this a reusable function, also used in subnets list # Show creation table. - table = Table( + table = create_table( title=( f"\n[{COLOR_PALETTE.G.HEADER}]" f"Register to [{COLOR_PALETTE.G.SUBHEAD}]netuid: {netuid}[/{COLOR_PALETTE.G.SUBHEAD}]" f"\nNetwork: [{COLOR_PALETTE.G.SUBHEAD}]{subtensor.network}[/{COLOR_PALETTE.G.SUBHEAD}]\n" ), - show_footer=True, - show_edge=False, - header_style="bold white", - border_style="bright_black", - style="bold", - title_justify="center", - show_lines=False, - pad_edge=True, ) table.add_column( "Netuid", style="rgb(253,246,227)", no_wrap=True, justify="center" @@ -2476,16 +2444,8 @@ async def metagraph_cmd( table_cols_indices.append(idx) table_cols.append(v) - table = Table( + table = create_table( *table_cols, - show_footer=True, - show_edge=False, - header_style="bold white", - border_style="bright_black", - style="bold", - title_style="bold white", - title_justify="center", - show_lines=False, expand=True, title=( f"[underline dark_orange]Metagraph[/underline dark_orange]\n\n" @@ -2496,7 +2456,6 @@ async def metagraph_cmd( f"Issuance: [bright_blue]{metadata_info['issuance']}[/bright_blue], " f"Difficulty: [bright_cyan]{metadata_info['difficulty']}[/bright_cyan]\n" ), - pad_edge=True, ) if all(x is False for x in display_cols.values()): @@ -2521,7 +2480,7 @@ def create_identity_table(title: str = None): if not title: title = "Subnet Identity" - table = Table( + table = create_table( Column( "Item", justify="right", @@ -2530,14 +2489,6 @@ def create_identity_table(title: str = None): ), Column("Value", style=COLOR_PALETTE["GENERAL"]["SUBHEADING"]), title=f"\n[{COLOR_PALETTE['GENERAL']['HEADER']}]{title}\n", - show_footer=True, - show_edge=False, - header_style="bold white", - border_style="bright_black", - style="bold", - title_justify="center", - show_lines=False, - pad_edge=True, ) return table diff --git a/bittensor_cli/src/commands/sudo.py b/bittensor_cli/src/commands/sudo.py index ec6d1461c..4561c7227 100644 --- a/bittensor_cli/src/commands/sudo.py +++ b/bittensor_cli/src/commands/sudo.py @@ -18,10 +18,16 @@ DelegatesDetails, COLOR_PALETTE, ) +from bittensor_cli.src.bittensor.balances import Balance +from bittensor_cli.src.bittensor.extrinsics.mev_shield import ( + extract_mev_shield_id, + wait_for_extrinsic_by_hash, +) from bittensor_cli.src.bittensor.chain_data import decode_account_id from bittensor_cli.src.bittensor.utils import ( confirm_action, console, + create_table, print_error, print_success, print_verbose, @@ -94,6 +100,236 @@ def string_to_bool(val) -> Union[bool, Type[ValueError]]: return ValueError +async def stake_burn( + wallet: Wallet, + subtensor: "SubtensorInterface", + netuid: int, + amount: float, + hotkey_ss58: Optional[str], + safe_staking: bool, + proxy: Optional[str], + rate_tolerance: Optional[float], + mev_protection: bool, + json_output: bool, + prompt: bool, + decline: bool, + quiet: bool, + period: int, +) -> bool: + """ + Perform a stake burn (owner-only). + Stakes TAO into the subnet and immediately burns the acquired alpha. + """ + subnet_owner = await subtensor.query( + module="SubtensorModule", + storage_function="SubnetOwner", + params=[netuid], + ) + if subnet_owner != wallet.coldkeypub.ss58_address: + err_msg = ( + f"Coldkey {wallet.coldkeypub.ss58_address} does not own subnet {netuid}." + ) + if json_output: + json_console.print_json( + data={ + "success": False, + "message": err_msg, + "extrinsic_identifier": None, + } + ) + else: + print_error(err_msg) + return False + + subnet_info = await subtensor.subnet(netuid=netuid) + stake_burn_amount = Balance.from_tao(amount) + rate_tolerance = rate_tolerance if rate_tolerance is not None else 0.0 + + price_limit: Optional[Balance] = None + if safe_staking: + price_limit = Balance.from_tao(subnet_info.price.tao * (1 + rate_tolerance)) + + call_params = { + "hotkey": hotkey_ss58, + "netuid": netuid, + "amount": stake_burn_amount.rao, + "limit": price_limit.rao if price_limit else None, + } + + call = await subtensor.substrate.compose_call( + call_module="SubtensorModule", + call_function="add_stake_burn", + call_params=call_params, + ) + + if not json_output: + extrinsic_fee = await subtensor.get_extrinsic_fee( + call, wallet.coldkeypub, proxy=proxy + ) + amount_minus_fee = stake_burn_amount - extrinsic_fee + sim_swap = await subtensor.sim_swap( + origin_netuid=0, + destination_netuid=netuid, + amount=amount_minus_fee.rao, + ) + + received_amount = sim_swap.alpha_amount + current_price_float = subnet_info.price.tao + rate = 1.0 / current_price_float + + table = _define_stake_burn_table( + wallet=wallet, + subtensor=subtensor, + safe_staking=safe_staking, + rate_tolerance=rate_tolerance, + ) + row = [ + str(netuid), + hotkey_ss58, + str(stake_burn_amount), + str(rate) + f" {Balance.get_unit(netuid)}/{Balance.get_unit(0)} ", + str(received_amount.set_unit(netuid)), + str(sim_swap.tao_fee), + str(extrinsic_fee), + ] + if safe_staking: + price_with_tolerance = current_price_float * (1 + rate_tolerance) + rate_with_tolerance = 1.0 / price_with_tolerance + rate_with_tolerance_str = ( + f"{rate_with_tolerance:.4f} " + f"{Balance.get_unit(netuid)}/{Balance.get_unit(0)} " + ) + row.append(rate_with_tolerance_str) + + table.add_row(*row) + console.print(table) + + if prompt and not confirm_action( + "Would you like to continue?", decline=decline, quiet=quiet + ): + print_error("User aborted.") + return False + + if not unlock_key(wallet).success: + return False + + with console.status( + f":satellite: Performing subnet stake burn on [bold]{netuid}[/bold]...", + spinner="earth", + ) as status: + next_nonce = await subtensor.substrate.get_account_next_index( + wallet.coldkeypub.ss58_address + ) + success, err_msg, ext_receipt = await subtensor.sign_and_send_extrinsic( + call=call, + wallet=wallet, + nonce=next_nonce, + era={"period": period}, + proxy=proxy, + mev_protection=mev_protection, + ) + + if not success: + if json_output: + json_console.print_json( + data={ + "success": False, + "message": err_msg, + "extrinsic_identifier": None, + } + ) + else: + print_error(err_msg, status=status) + return False + + if mev_protection: + inner_hash = err_msg + mev_shield_id = await extract_mev_shield_id(ext_receipt) + mev_success, mev_error, ext_receipt = await wait_for_extrinsic_by_hash( + subtensor=subtensor, + extrinsic_hash=inner_hash, + shield_id=mev_shield_id, + submit_block_hash=ext_receipt.block_hash, + status=status, + ) + if not mev_success: + status.stop() + if json_output: + json_console.print_json( + data={ + "success": False, + "message": mev_error, + "extrinsic_identifier": None, + } + ) + else: + print_error(mev_error, status=status) + return False + + ext_id = await ext_receipt.get_extrinsic_identifier() + + msg = f"Subnet stake burn succeeded on SN{netuid}." + if json_output: + json_console.print_json( + data={"success": True, "message": msg, "extrinsic_identifier": ext_id} + ) + else: + await print_extrinsic_id(ext_receipt) + print_success(msg) + + return True + + +def _define_stake_burn_table( + wallet: Wallet, + subtensor: "SubtensorInterface", + safe_staking: bool, + rate_tolerance: float, +) -> Table: + table = create_table( + title=f"\n[{COLOR_PALETTE.G.HEADER}]Subnet Buyback:\n" + f"Wallet: [{COLOR_PALETTE.G.CK}]{wallet.name}[/{COLOR_PALETTE.G.CK}], " + f"Coldkey ss58: [{COLOR_PALETTE.G.CK}]{wallet.coldkeypub.ss58_address}[/{COLOR_PALETTE.G.CK}]\n" + f"Network: {subtensor.network}[/{COLOR_PALETTE.G.HEADER}]\n", + ) + table.add_column("Netuid", justify="center", style="grey89") + table.add_column( + "Hotkey", justify="center", style=COLOR_PALETTE["GENERAL"]["HOTKEY"] + ) + table.add_column( + "Amount (ฯ„)", + justify="center", + style=COLOR_PALETTE["POOLS"]["TAO"], + ) + table.add_column( + "Rate (per ฯ„)", + justify="center", + style=COLOR_PALETTE["POOLS"]["RATE"], + ) + table.add_column( + "Est. Burned", + justify="center", + style=COLOR_PALETTE["POOLS"]["TAO_EQUIV"], + ) + table.add_column( + "Fee (ฯ„)", + justify="center", + style=COLOR_PALETTE["STAKE"]["STAKE_AMOUNT"], + ) + table.add_column( + "Extrinsic Fee (ฯ„)", + justify="center", + style=COLOR_PALETTE.STAKE.TAO, + ) + if safe_staking: + table.add_column( + f"Rate with tolerance: [blue]({rate_tolerance * 100}%)[/blue]", + justify="center", + style=COLOR_PALETTE["POOLS"]["RATE"], + ) + return table + + def search_metadata( param_name: str, value: Union[str, bool, float, list[float]], @@ -126,8 +362,19 @@ def type_converter_with_retry(type_, val, arg_name): return arg_types[type_](val) except ValueError: return type_converter_with_retry(type_, None, arg_name) + except KeyError: + print_error( + f"Type {type_} is not recognized. " + "You will be unable to set this parameter via this command.\n" + "Some hyperparams must be set by their dedicated command, such as `btcli subnets mech`" + ) - arg_types = {"bool": string_to_bool, "u16": string_to_u16, "u64": string_to_u64} + arg_types = { + "bool": string_to_bool, + "u16": string_to_u16, + "u64": string_to_u64, + "MechId": int, + } arg_type_output = {"bool": "bool", "u16": "float", "u64": "float"} call_crafter = {"netuid": netuid} diff --git a/bittensor_cli/src/commands/wallets.py b/bittensor_cli/src/commands/wallets.py index 1b15ff2c8..abd76d354 100644 --- a/bittensor_cli/src/commands/wallets.py +++ b/bittensor_cli/src/commands/wallets.py @@ -1,4 +1,5 @@ import asyncio +import hashlib import itertools import json import os @@ -15,31 +16,32 @@ from rich.table import Column, Table from rich.tree import Tree from rich.padding import Padding -from bittensor_cli.src import COLOR_PALETTE, COLORS, Constants +from bittensor_cli.src import COLOR_PALETTE, COLORS from bittensor_cli.src.bittensor import utils from bittensor_cli.src.bittensor.balances import Balance from bittensor_cli.src.bittensor.chain_data import ( DelegateInfo, NeuronInfoLite, ) +from bittensor_cli.src.bittensor.extrinsics.mev_shield import ( + extract_mev_shield_id, + wait_for_extrinsic_by_hash, +) from bittensor_cli.src.bittensor.extrinsics.registration import ( run_faucet_extrinsic, swap_hotkey_extrinsic, ) from bittensor_cli.src.bittensor.extrinsics.transfer import transfer_extrinsic from bittensor_cli.src.bittensor.networking import int_to_ip -from bittensor_cli.src.bittensor.subtensor_interface import ( - SubtensorInterface, - GENESIS_ADDRESS, -) +from bittensor_cli.src.bittensor.subtensor_interface import SubtensorInterface from bittensor_cli.src.bittensor.utils import ( RAO_PER_TAO, confirm_action, console, - convert_blocks_to_time, json_console, print_error, print_verbose, + print_success, get_all_wallets_for_path, get_hotkey_wallets_for_wallet, is_valid_ss58_address, @@ -49,7 +51,6 @@ unlock_key, WalletLike, blocks_to_duration, - decode_account_id, get_hotkey_pub_ss58, print_extrinsic_id, ) @@ -374,7 +375,7 @@ async def new_hotkey( if uri: try: keypair = Keypair.create_from_uri(uri) - except Exception as e: + except TypeError as e: print_error(f"Failed to create keypair from URI {uri}: {str(e)}") return wallet.set_hotkey(keypair=keypair, encrypt=use_password) @@ -425,7 +426,7 @@ async def new_coldkey( if uri: try: keypair = Keypair.create_from_uri(uri) - except Exception as e: + except TypeError as e: print_error(f"Failed to create keypair from URI {uri}: {str(e)}") wallet.set_coldkey(keypair=keypair, encrypt=False, overwrite=False) wallet.set_coldkeypub(keypair=keypair, encrypt=False, overwrite=False) @@ -498,7 +499,7 @@ async def wallet_create( "hotkey_ss58": wallet.hotkeypub.ss58_address, "coldkey_ss58": wallet.coldkeypub.ss58_address, } - except Exception as e: + except (ValueError, TypeError, KeyFileError) as e: err = f"Failed to create keypair from URI: {str(e)}" print_error(err) output_dict["error"] = err @@ -1766,11 +1767,16 @@ async def swap_hotkey( return result -def create_identity_table(title: str = None): - if not title: - title = "On-Chain Identity" +def create_key_value_table(title: str = "Details") -> Table: + """Creates a key-value table for displaying information for various cmds. - table = Table( + Args: + title: The title shown above the table. + + Returns: + A Rich Table for key-value display. + """ + return Table( Column( "Item", justify="right", @@ -1788,7 +1794,6 @@ def create_identity_table(title: str = None): show_lines=False, pad_edge=True, ) - return table async def set_id( @@ -1845,7 +1850,7 @@ async def set_id( output_dict["success"] = True identity = await subtensor.query_identity(wallet.coldkeypub.ss58_address) - table = create_identity_table(title="New on-chain Identity") + table = create_key_value_table(title="New on-chain Identity\n") table.add_row("Address", wallet.coldkeypub.ss58_address) for key, value in identity.items(): table.add_row(key, str(value) if value else "~") @@ -1879,7 +1884,7 @@ async def get_id( json_console.print("{}") return {} - table = create_identity_table(title) + table = create_key_value_table(title) table.add_row("Address", ss58_address) for key, value in identity.items(): table.add_row(key, str(value) if value else "~") @@ -1890,43 +1895,6 @@ async def get_id( return identity -async def check_coldkey_swap(wallet: Wallet, subtensor: SubtensorInterface): - arbitration_check = len( # TODO verify this works - ( - await subtensor.query( - module="SubtensorModule", - storage_function="ColdkeySwapDestinations", - params=[wallet.coldkeypub.ss58_address], - ) - ) - ) - if arbitration_check == 0: - console.print( - "[green]There has been no previous key swap initiated for your coldkey.[/green]" - ) - elif arbitration_check == 1: - arbitration_block = await subtensor.query( - module="SubtensorModule", - storage_function="ColdkeyArbitrationBlock", - params=[wallet.coldkeypub.ss58_address], - ) - arbitration_remaining = ( - arbitration_block.value - await subtensor.substrate.get_block_number(None) - ) - - hours, minutes, seconds = convert_blocks_to_time(arbitration_remaining) - console.print( - "[yellow]There has been 1 swap request made for this coldkey already." - " By adding another swap request, the key will enter arbitration." - f" Your key swap is scheduled for {hours} hours, {minutes} minutes, {seconds} seconds" - " from now.[/yellow]" - ) - elif arbitration_check > 1: - console.print( - f"[red]This coldkey is currently in arbitration with a total swaps of {arbitration_check}.[/red]" - ) - - async def sign( wallet: Wallet, message: str, use_hotkey: bool, json_output: bool = False ): @@ -2040,272 +2008,612 @@ async def verify( return is_valid -async def schedule_coldkey_swap( +def compute_coldkey_hash(ss58_address: str) -> str: + """ + Compute Blake2b-256 hash of a coldkey AccountId. + + Args: + ss58_address: SS58 address of the coldkey. + + Returns: + str: 0x-prefixed hex hash. + + Notes: + Hashes AccountId bytes (not the SS58). Used by coldkey-swap announcements. + """ + keypair = Keypair(ss58_address=ss58_address) + public_key_bytes = keypair.public_key + + hash_result = hashlib.blake2b(public_key_bytes, digest_size=32) + return "0x" + hash_result.hexdigest() + + +async def announce_coldkey_swap( wallet: Wallet, subtensor: SubtensorInterface, new_coldkey_ss58: str, - force_swap: bool = False, decline: bool = False, quiet: bool = False, - proxy: Optional[str] = None, + prompt: bool = True, + mev_protection: bool = False, ) -> bool: - """Schedules a coldkey swap operation to be executed at a future block. + """Announces intent to swap a coldkey to a new address. + + This is the first step of a two-step coldkey swap process. After announcing, + the user must wait for the announcement delay period to pass before executing + the swap with execute_coldkey_swap. Args: - wallet (Wallet): The wallet initiating the coldkey swap - subtensor (SubtensorInterface): Connection to the Bittensor network - new_coldkey_ss58 (str): SS58 address of the new coldkey - force_swap (bool, optional): Whether to force the swap even if the new coldkey is already scheduled for a swap. Defaults to False. + wallet: The wallet initiating the coldkey swap. + subtensor: Connection to the Bittensor network. + new_coldkey_ss58: SS58 address of the new coldkey. + decline: If True, skip confirmation prompt and decline. + quiet: If True, skip confirmation prompt and proceed. + mev_protection: If True, encrypt the extrinsic with MEV protection. + Returns: - bool: True if the swap was scheduled successfully, False otherwise + True if the announcement was successful, False otherwise. """ if not is_valid_ss58_address(new_coldkey_ss58): print_error(f"Invalid SS58 address format: {new_coldkey_ss58}") return False - scheduled_coldkey_swap = await subtensor.get_scheduled_coldkey_swap() - if wallet.coldkeypub.ss58_address in scheduled_coldkey_swap: - print_error( - f"Coldkey {wallet.coldkeypub.ss58_address} is already scheduled for a swap." - ) - console.print("[dim]Use the force_swap (--force) flag to override this.[/dim]") - if not force_swap: + # Check for existing announcement + block_hash = await subtensor.substrate.get_chain_head() + new_coldkey_hash = compute_coldkey_hash(new_coldkey_ss58) + + existing = await subtensor.get_coldkey_swap_announcement( + wallet.coldkeypub.ss58_address, + block_hash=block_hash, + ) + if existing: + current_block, reannounce_delay, announce_delay = await asyncio.gather( + subtensor.substrate.get_block_number(block_hash=block_hash), + subtensor.get_coldkey_swap_reannouncement_delay(block_hash=block_hash), + subtensor.get_coldkey_swap_announcement_delay(block_hash=block_hash), + ) + remaining = existing.execution_block - current_block + reannounce_block = existing.execution_block + reannounce_delay + same_hash = new_coldkey_hash.lower() == str(existing.new_coldkey_hash).lower() + + # Show existing announcement info + table = create_key_value_table("Existing Coldkey Swap Announcement") + table.add_row("Execution Block", str(existing.execution_block)) + if remaining > 0: + table.add_row( + "Time Remaining", f"[yellow]{blocks_to_duration(remaining)}[/yellow]" + ) + table.add_row("Status", "[yellow]Pending[/yellow]") + else: + table.add_row("Status", "[green]Ready to Execute[/green]") + table.add_row("Announced Hash", f"[dim]{existing.new_coldkey_hash}[/dim]") + table.add_row("Requested Hash", f"[dim]{new_coldkey_hash}[/dim]") + table.add_row("Match", "[green]Yes[/green]" if same_hash else "[red]No[/red]") + console.print(table) + + # Check if reannouncement allowed + if current_block < reannounce_block: + time_until_reannounce = blocks_to_duration(reannounce_block - current_block) + console.print( + f"\n[dim]You can reannounce after block {reannounce_block} ({time_until_reannounce} from now).[/dim]", + f"Current block: {current_block}", + ) return False + + # Reannouncement allowed + if same_hash: + console.print( + "\n[yellow]You already have an announcement for this coldkey.[/yellow] " + "You can execute the existing swap without reannouncing." + ) + if prompt and not confirm_action( + "Do you still want to reannounce the same hash (the period to wait before executing the swap will be reset)?", + decline=decline, + quiet=quiet, + ): + return False else: console.print( - "[yellow]Continuing with the swap due to force_swap flag.[/yellow]\n" + f"\n[dim]Reannouncing with a different coldkey will reset the waiting period " + f"to {blocks_to_duration(announce_delay)} from now.[/dim]" ) + if prompt and not confirm_action( + "Proceed with reannouncement and reset the waiting period?", + decline=decline, + quiet=quiet, + ): + return False + + # Proceed with the announcement + swap_cost, delay = await asyncio.gather( + subtensor.get_coldkey_swap_cost(block_hash=block_hash), + subtensor.get_coldkey_swap_announcement_delay(block_hash=block_hash), + ) - prompt_msg = ( - "You are [red]swapping[/red] your [blue]coldkey[/blue] to a new address.\n" - f"Current ss58: [{COLORS.G.CK}]{wallet.coldkeypub.ss58_address}[/{COLORS.G.CK}]\n" - f"New ss58: [{COLORS.G.CK}]{new_coldkey_ss58}[/{COLORS.G.CK}]\n" - "Are you sure you want to continue?" + table = create_key_value_table("Announcing Coldkey Swap\n") + table.add_row( + "Current Coldkey", + f"[{COLORS.G.CK}]{wallet.coldkeypub.ss58_address}[/{COLORS.G.CK}]", + ) + table.add_row("New Coldkey", f"[{COLORS.G.CK}]{new_coldkey_ss58}[/{COLORS.G.CK}]") + table.add_row( + "New Coldkey Hash", + f"[dim]{new_coldkey_hash}[/dim]", + ) + table.add_row("Swap Cost", f"[green]{swap_cost}[/green]") + table.add_row( + "Delay Before Execution", f"[yellow]{blocks_to_duration(delay)}[/yellow]" ) - if not confirm_action(prompt_msg, decline=decline, quiet=quiet): + console.print(table) + + if prompt and not confirm_action( + "Are you sure you want to continue?", decline=decline, quiet=quiet + ): return False if not unlock_key(wallet).success: return False - block_pre_call, call = await asyncio.gather( - subtensor.substrate.get_block_number(), - subtensor.substrate.compose_call( + with console.status(":satellite: Announcing coldkey swap on-chain...") as status: + call = await subtensor.substrate.compose_call( call_module="SubtensorModule", - call_function="schedule_swap_coldkey", + call_function="announce_coldkey_swap", call_params={ - "new_coldkey": new_coldkey_ss58, + "new_coldkey_hash": new_coldkey_hash, }, - ), - ) - swap_info = None - with console.status(":satellite: Scheduling coldkey swap on-chain..."): + ) success, err_msg, ext_receipt = await subtensor.sign_and_send_extrinsic( call, wallet, wait_for_inclusion=True, wait_for_finalization=True, - proxy=proxy, + mev_protection=mev_protection, ) - block_post_call = await subtensor.substrate.get_block_number() if not success: - print_error(f"Failed to schedule coldkey swap: {err_msg}") + print_error(f"Failed to announce coldkey swap: {err_msg}") return False - console.print( - ":white_heavy_check_mark: [green]Successfully scheduled coldkey swap" - ) + if mev_protection: + inner_hash = err_msg + mev_shield_id = await extract_mev_shield_id(ext_receipt) + mev_success, mev_error, ext_receipt = await wait_for_extrinsic_by_hash( + subtensor=subtensor, + extrinsic_hash=inner_hash, + shield_id=mev_shield_id, + submit_block_hash=ext_receipt.block_hash, + status=status, + ) + if not mev_success: + print_error( + f"Failed to announce coldkey swap: {mev_error}", status=status + ) + return False + + print_success("[dark_sea_green3]Successfully announced coldkey swap") await print_extrinsic_id(ext_receipt) - for event in await ext_receipt.triggered_events: - if ( - event.get("event", {}).get("module_id") == "SubtensorModule" - and event.get("event", {}).get("event_id") == "ColdkeySwapScheduled" - ): - attributes = event["event"].get("attributes", {}) - old_coldkey = decode_account_id(attributes["old_coldkey"][0]) - - if old_coldkey == wallet.coldkeypub.ss58_address: - swap_info = { - "block_num": block_pre_call, - "dest_coldkey": decode_account_id(attributes["new_coldkey"][0]), - "execution_block": attributes["execution_block"], - } - if not swap_info: - swap_info = await find_coldkey_swap_extrinsic( - subtensor=subtensor, - start_block=block_pre_call, - end_block=block_post_call, - wallet_ss58=wallet.coldkeypub.ss58_address, + # Post-success information + new_block_hash = await subtensor.substrate.get_chain_head() + announcement = await subtensor.get_coldkey_swap_announcement( + wallet.coldkeypub.ss58_address, + block_hash=new_block_hash, + ) + if announcement: + current_block = await subtensor.substrate.get_block_number( + block_hash=new_block_hash ) + remaining = announcement.execution_block - current_block - if not swap_info: + details_table = create_key_value_table("Coldkey Swap Announced\n") + details_table.add_row( + "Original Coldkey", + f"[{COLORS.G.CK}]{wallet.coldkeypub.ss58_address}[/{COLORS.G.CK}]", + ) + details_table.add_row( + "New Coldkey", f"[{COLORS.G.CK}]{new_coldkey_ss58}[/{COLORS.G.CK}]" + ) + details_table.add_row( + "New Coldkey Hash", + f"[dim]{new_coldkey_hash}[/dim]", + ) + details_table.add_row( + "Execution Block", f"[green]{announcement.execution_block}[/green]" + ) + details_table.add_row( + "Time Until Executable", f"[yellow]{blocks_to_duration(remaining)}[/yellow]" + ) + + console.print(details_table) console.print( - "[yellow]Warning: Could not find the swap extrinsic in recent blocks" + f"\n[dim]After the delay, run:" + f"\n[green]btcli wallet swap-coldkey execute --new-coldkey {new_coldkey_ss58}[/green]" ) - return True - console.print( - "\n[green]Coldkey swap details:[/green]" - f"\nBlock number: {swap_info['block_num']}" - f"\nOriginal address: [{COLORS.G.CK}]{wallet.coldkeypub.ss58_address}[/{COLORS.G.CK}]" - f"\nDestination address: [{COLORS.G.CK}]{swap_info['dest_coldkey']}[/{COLORS.G.CK}]" - f"\nThe swap will be completed at block: [green]{swap_info['execution_block']}[/green]" - f"\n[dim]You can provide the block number to `btcli wallet swap-check`[/dim]" - ) + return True -async def find_coldkey_swap_extrinsic( +async def dispute_coldkey_swap( + wallet: Wallet, subtensor: SubtensorInterface, - start_block: int, - end_block: int, - wallet_ss58: str, -) -> dict: - """Search for a coldkey swap event in a range of blocks. + decline: bool = False, + quiet: bool = False, + prompt: bool = True, + mev_protection: bool = False, +) -> bool: + """Dispute a pending coldkey swap for the calling coldkey. + + Disputing freezes the current swap process until the triumvirate can intervene. Args: - subtensor: SubtensorInterface for chain queries - start_block: Starting block number to search - end_block: Ending block number to search (inclusive) - wallet_ss58: SS58 address of the signing wallet + wallet: Wallet initiating the dispute (must be the announcing coldkey). + subtensor: Connection to the Bittensor network. + decline: If True, default to declining at confirmation prompt. + quiet: If True, skip confirmation prompts and proceed. + mev_protection: If True, encrypt the extrinsic with MEV protection. Returns: - dict: Contains the following keys if found: - - block_num: Block number where swap was scheduled - - dest_coldkey: SS58 address of destination coldkey - - execution_block: Block number when swap will execute - Empty dict if not found + bool: True if the dispute extrinsic was included successfully, else False. """ + block_hash = await subtensor.substrate.get_chain_head() + announcement, dispute = await asyncio.gather( + subtensor.get_coldkey_swap_announcement( + wallet.coldkeypub.ss58_address, block_hash=block_hash + ), + subtensor.get_coldkey_swap_dispute( + wallet.coldkeypub.ss58_address, block_hash=block_hash + ), + ) + + if not announcement: + print_error( + f"No coldkey swap announcement found for {wallet.coldkeypub.ss58_address}.\n" + "You can only dispute an active announcement." + ) + return False - current_block, genesis_block = await asyncio.gather( - subtensor.substrate.get_block_number(), subtensor.substrate.get_block_hash(0) + if dispute is not None: + console.print( + f"[yellow]Swap already disputed at block {dispute}.[/yellow] " + "Account remains frozen until a root reset." + ) + return False + + current_block = await subtensor.substrate.get_block_number(block_hash=block_hash) + info = create_key_value_table("Dispute Coldkey Swap\n") + info.add_row( + "Coldkey", f"[{COLORS.G.CK}]{wallet.coldkeypub.ss58_address}[/{COLORS.G.CK}]" ) - if ( - current_block - start_block > 300 - and genesis_block == Constants.genesis_block_hash_map["finney"] - ): - console.print("Querying archive node for coldkey swap events...") - await subtensor.substrate.close() - subtensor.substrate.chain_endpoint = Constants.archive_entrypoint - subtensor.substrate.url = Constants.archive_entrypoint - subtensor.substrate.initialized = False - await subtensor.substrate.initialize() - - block_hashes = await asyncio.gather( - *[ - subtensor.substrate.get_block_hash(block_num) - for block_num in range(start_block, end_block + 1) - ] + info.add_row("Execution Block", str(announcement.execution_block)) + info.add_row( + "Status", + "[yellow]Pending[/yellow]" + if current_block < announcement.execution_block + else "[green]Ready[/green]", ) - block_events = await asyncio.gather( - *[ - subtensor.substrate.get_events(block_hash=block_hash) - for block_hash in block_hashes - ] + info.add_row( + "Warning", + "[red]Disputing freezes the current swap process until the triumvirate can intervene.[/red]", ) + console.print(info) - for block_num, events in zip(range(start_block, end_block + 1), block_events): - for event in events: - if ( - event.get("event", {}).get("module_id") == "SubtensorModule" - and event.get("event", {}).get("event_id") == "ColdkeySwapScheduled" - ): - attributes = event["event"].get("attributes", {}) - old_coldkey = decode_account_id(attributes["old_coldkey"][0]) - - if old_coldkey == wallet_ss58: - return { - "block_num": block_num, - "dest_coldkey": decode_account_id(attributes["new_coldkey"][0]), - "execution_block": attributes["execution_block"], - } + if prompt and not confirm_action( + "Proceed with dispute? Your swap process will be frozen until the triumvirate can intervene.", + decline=decline, + quiet=quiet, + ): + return False - return {} + if not unlock_key(wallet).success: + return False + with console.status(":satellite: Disputing coldkey swap on-chain..."): + call = await subtensor.substrate.compose_call( + call_module="SubtensorModule", + call_function="dispute_coldkey_swap", + call_params={}, + ) + success, err_msg, ext_receipt = await subtensor.sign_and_send_extrinsic( + call, + wallet, + wait_for_inclusion=True, + wait_for_finalization=True, + mev_protection=mev_protection, + ) -async def check_swap_status( + if not success: + print_error(f"Failed to dispute coldkey swap: {err_msg}") + return False + + print_success("[dark_sea_green3]Coldkey swap disputed.") + await print_extrinsic_id(ext_receipt) + + console.print( + "\n[dim]Your coldkey is now frozen. The triumvirate will need to intervene to clear the dispute.[/dim]" + ) + return True + + +async def execute_coldkey_swap( + wallet: Wallet, subtensor: SubtensorInterface, - origin_ss58: Optional[str] = None, - expected_block_number: Optional[int] = None, -) -> None: - """ - Check the status of a coldkey swap. + new_coldkey_ss58: str, + decline: bool = False, + quiet: bool = False, + prompt: bool = True, + mev_protection: bool = True, +) -> bool: + """Executes a previously announced coldkey swap. + + This is the second step of a two-step coldkey swap process. You must have + previously called announce_coldkey_swap and waited for the delay period. Args: - subtensor: Connection to the network - origin_ss58: The SS58 address of the original coldkey - expected_block_number: Optional block number where the swap was scheduled + wallet: The wallet executing the coldkey swap. + subtensor: Connection to the Bittensor network. + new_coldkey_ss58: SS58 address of the new coldkey (must match announcement). + decline: If True, skip confirmation prompt and decline. + quiet: If True, skip confirmation prompt and proceed. + mev_protection: If True, encrypt the extrinsic with MEV protection. + Returns: + True if the swap was executed successfully, False otherwise. """ + if not is_valid_ss58_address(new_coldkey_ss58): + print_error(f"Invalid SS58 address format: {new_coldkey_ss58}") + return False - if not origin_ss58: - scheduled_swaps = await subtensor.get_scheduled_coldkey_swap() - if not scheduled_swaps: - console.print("[yellow]No pending coldkey swaps found.[/yellow]") - return + if not mev_protection: + console.print( + "[yellow]WARNING: MEV protection is disabled.\n" + "This transaction is not protected & will expose the new coldkey.[/yellow]" + ) - table = Table( - Column( - "Original Coldkey", - justify="Left", - style=COLOR_PALETTE["GENERAL"]["SUBHEADING_MAIN"], - no_wrap=True, - ), - Column("Status", style="dark_sea_green3"), - title=f"\n[{COLOR_PALETTE['GENERAL']['HEADER']}]Pending Coldkey Swaps\n", - show_header=True, - show_edge=False, - header_style="bold white", - border_style="bright_black", - style="bold", - title_justify="center", - show_lines=False, - pad_edge=True, + block_hash = await subtensor.substrate.get_chain_head() + announcement = await subtensor.get_coldkey_swap_announcement( + wallet.coldkeypub.ss58_address, + block_hash=block_hash, + ) + if not announcement: + print_error( + f"No pending coldkey swap announcement found for {wallet.coldkeypub.ss58_address}.\n" + "You must first announce your swap with 'btcli wallet swap-coldkey announce'." ) + return False - for coldkey in scheduled_swaps: - table.add_row(coldkey, "Pending") + expected_hash = compute_coldkey_hash(new_coldkey_ss58) + if announcement.new_coldkey_hash != expected_hash: + table = create_key_value_table("Coldkey Hash Mismatch\n") + table.add_row("Announced Hash", f"[dim]{announcement.new_coldkey_hash}[/dim]") + table.add_row("Provided Hash", f"[dim]{expected_hash}[/dim]") + console.print(table) + print_error( + "The provided coldkey does not match the announced hash.\n" + "Make sure you're using the same coldkey you announced." + ) + return False + current_block = await subtensor.substrate.get_block_number(block_hash=block_hash) + if current_block < announcement.execution_block: + remaining = announcement.execution_block - current_block + table = create_key_value_table("Coldkey Swap Not Ready") + table.add_row("Current Block", str(current_block)) + table.add_row("Execution Block", str(announcement.execution_block)) + table.add_row( + "Time Remaining", f"[yellow]{blocks_to_duration(remaining)}[/yellow]" + ) console.print(table) - console.print( - "\n[dim]Tip: Check specific swap details by providing the original coldkey " - "SS58 address and the block number.[/dim]" + print_error( + "Coldkey swap cannot be executed yet. Please wait for the delay period." ) - return - chain_reported_completion_block, destination_address = await subtensor.query( - "SubtensorModule", "ColdkeySwapScheduled", [origin_ss58] + return False + + # Display confirmation table + table = create_key_value_table("Executing Coldkey Swap\n") + table.add_row( + "Current Coldkey", + f"[{COLORS.G.CK}]{wallet.coldkeypub.ss58_address}[/{COLORS.G.CK}]", ) - destination_address = decode_account_id(destination_address[0]) - if chain_reported_completion_block != 0 and destination_address != GENESIS_ADDRESS: - is_pending = True - else: - is_pending = False + table.add_row("New Coldkey", f"[{COLORS.G.CK}]{new_coldkey_ss58}[/{COLORS.G.CK}]") - if not is_pending: - console.print( - f"[red]No pending swap found for coldkey:[/red] [{COLORS.G.CK}]{origin_ss58}[/{COLORS.G.CK}]" + console.print(table) + console.print( + "\n[bold red]WARNING:[/bold red] This action is irreversible. All assets will be transferred.\n" + ) + + if prompt and not confirm_action( + "Are you sure you want to continue?", decline=decline, quiet=quiet + ): + return False + + if not unlock_key(wallet).success: + return False + + with console.status(":satellite: Executing coldkey swap on-chain...") as status: + call = await subtensor.substrate.compose_call( + call_module="SubtensorModule", + call_function="swap_coldkey_announced", + call_params={ + "new_coldkey": new_coldkey_ss58, + }, + ) + success, err_msg, ext_receipt = await subtensor.sign_and_send_extrinsic( + call, + wallet, + wait_for_inclusion=True, + wait_for_finalization=True, + mev_protection=mev_protection, ) - return - console.print( - f"[green]Found pending swap for coldkey:[/green] [{COLORS.G.CK}]{origin_ss58}[/{COLORS.G.CK}]" + if not success: + if "Custom error: 21" in err_msg: + print_error( + "Failed to execute coldkey swap: The swap has been disputed.\n" + "The account is frozen until the triumvirate resolves the dispute.\n", + status=status, + ) + else: + print_error(f"Failed to execute coldkey swap: {err_msg}", status=status) + return False + + if mev_protection: + inner_hash = err_msg + mev_shield_id = await extract_mev_shield_id(ext_receipt) + mev_success, mev_error, ext_receipt = await wait_for_extrinsic_by_hash( + subtensor=subtensor, + extrinsic_hash=inner_hash, + shield_id=mev_shield_id, + submit_block_hash=ext_receipt.block_hash, + status=status, + ) + if not mev_success: + print_error( + f"Failed to execute coldkey swap: {mev_error}", status=status + ) + return False + + print_success("[dark_sea_green3]Successfully executed coldkey swap!") + await print_extrinsic_id(ext_receipt) + + # Success details table + success_table = create_key_value_table("Coldkey Swap Completed\n") + success_table.add_row( + "Old Coldkey", + f"[{COLORS.G.CK}]{wallet.coldkeypub.ss58_address}[/{COLORS.G.CK}]", + ) + success_table.add_row( + "New Coldkey", f"[{COLORS.G.CK}]{new_coldkey_ss58}[/{COLORS.G.CK}]" + ) + console.print(success_table) + console.print("\n[dim]All assets have been transferred to the new coldkey.[/dim]") + + return True + + +async def check_swap_status( + subtensor: SubtensorInterface, + origin_ss58: Optional[str] = None, + json_output: bool = False, +) -> None: + """Retrieves and displays the status of coldkey swap announcements. + + Args: + subtensor: Connection to the network. + origin_ss58: The SS58 address of the coldkey to check. If None, shows all pending announcements. + json_output: If True, print JSON payload instead of rich table. + """ + block_hash = await subtensor.substrate.get_chain_head() + if origin_ss58: + announcement, dispute, current_block = await asyncio.gather( + subtensor.get_coldkey_swap_announcement(origin_ss58, block_hash=block_hash), + subtensor.get_coldkey_swap_dispute(origin_ss58, block_hash=block_hash), + subtensor.substrate.get_block_number(block_hash=block_hash), + ) + if not announcement: + console.print( + f"[yellow]No pending swap announcement found for coldkey:[/yellow] " + f"[{COLORS.G.CK}]{origin_ss58}[/{COLORS.G.CK}]" + ) + return + announcements = [announcement] + disputes = [(origin_ss58, dispute)] if dispute is not None else [] + + else: + announcements, disputes, current_block = await asyncio.gather( + subtensor.get_coldkey_swap_announcements(block_hash=block_hash), + subtensor.get_coldkey_swap_disputes(block_hash=block_hash), + subtensor.substrate.get_block_number(block_hash=block_hash), + ) + if not announcements: + console.print( + "[yellow]No pending coldkey swap announcements found.[/yellow]" + ) + if json_output: + json_console.print( + json.dumps( + { + "success": True, + "current_block": current_block, + "announcements": [], + } + ) + ) + return + + dispute_map = { + coldkey: block for coldkey, block in disputes if coldkey and block is not None + } + + payload = { + "success": True, + "current_block": current_block, + "announcements": [], + } + + table = Table( + Column( + "Coldkey", + justify="left", + style=COLOR_PALETTE["GENERAL"]["SUBHEADING_MAIN"], + no_wrap=True, + ), + Column("New Coldkey Hash", justify="left", style="dim", no_wrap=True), + Column("Execution Block", justify="right", style="dark_sea_green3"), + Column("Time Remaining", justify="right", style="yellow"), + Column("Status", justify="center", style="green"), + title=f"\n[{COLOR_PALETTE['GENERAL']['HEADER']}]Pending Coldkey Swap Announcements\nCurrent Block: {current_block}\n", + show_header=True, + show_edge=False, + header_style="bold white", + border_style="bright_black", + style="bold", + title_justify="center", + show_lines=False, + pad_edge=True, ) - if expected_block_number is None: - expected_block_number = chain_reported_completion_block + for announcement in announcements: + dispute_block = dispute_map.get(announcement.coldkey) + remaining_blocks = announcement.execution_block - current_block + if dispute_block is not None: + status = "[red]Disputed[/red]" + time_str = f"Disputed @ {dispute_block}" + status_label = "disputed" + elif remaining_blocks <= 0: + status = "Ready" + time_str = "[green]Ready[/green]" + status_label = "ready" + else: + status = "Pending" + time_str = blocks_to_duration(remaining_blocks) + status_label = "pending" + hash_display = f"{announcement.new_coldkey_hash[:12]}...{announcement.new_coldkey_hash[-6:]}" - current_block = await subtensor.substrate.get_block_number() - remaining_blocks = expected_block_number - current_block + table.add_row( + announcement.coldkey, + hash_display, + str(announcement.execution_block), + time_str, + status, + ) + + payload["announcements"].append( + { + "coldkey": announcement.coldkey, + "new_coldkey_hash": announcement.new_coldkey_hash, + "execution_block": announcement.execution_block, + "status": status_label, + "time_remaining_blocks": max(0, remaining_blocks), + "disputed_block": dispute_block, + } + ) - if remaining_blocks <= 0: - console.print("[green]Swap period has completed![/green]") + if json_output: + json_console.print(json.dumps(payload)) return + console.print(table) console.print( - "\n[green]Coldkey swap details:[/green]" - f"\nOriginal address: [{COLORS.G.CK}]{origin_ss58}[/{COLORS.G.CK}]" - f"\nDestination address: [{COLORS.G.CK}]{destination_address}[/{COLORS.G.CK}]" - f"\nCompletion block: {chain_reported_completion_block}" - f"\nTime remaining: {blocks_to_duration(remaining_blocks)}" + "\n[dim]To execute a ready swap:[/dim] " + "[green]btcli wallet swap-coldkey execute[/green]" ) diff --git a/pyproject.toml b/pyproject.toml index 0cfe2953b..9388281e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "flit_core.buildapi" [project] name = "bittensor-cli" -version = "9.18.1" +version = "9.19.0rc1" description = "Bittensor CLI" readme = "README.md" authors = [ @@ -12,12 +12,11 @@ authors = [ ] license = "MIT" scripts = { btcli = "bittensor_cli.cli:main" } -requires-python = ">=3.9" +requires-python = ">=3.10" classifiers = [ "Development Status :: 5 - Production/Stable", "Intended Audience :: End Users/Desktop", "Programming Language :: Python :: 3 :: Only", - "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", diff --git a/tests/e2e_tests/test_coldkey_swap.py b/tests/e2e_tests/test_coldkey_swap.py new file mode 100644 index 000000000..0f7f977cd --- /dev/null +++ b/tests/e2e_tests/test_coldkey_swap.py @@ -0,0 +1,425 @@ +import asyncio +import json +import time +from .utils import ( + find_stake_entries, +) + + +def _wait_until_block(substrate, target_block: int): + async def _wait(): + while True: + head = await substrate.get_chain_head() + current = await substrate.get_block_number(block_hash=head) + if current >= target_block: + return current + await asyncio.sleep(1) + + return asyncio.run(_wait()) + + +def test_coldkey_swap_with_stake(local_chain, wallet_setup): + """ + Coldkey swap with stake: + 0. Bob registers on root and adds stake. + 1. Bob announces coldkey swap. + 2. Status shows pending. + 3. Wait until execution block. + 4. Execute swap. + 5. Status clear and root stake moves to new coldkey. + """ + print("Testing coldkey swap with stake ๐Ÿงช") + wallet_path_bob = "//Bob" + wallet_path_new = "//Charlie" + + _, wallet_bob, path_bob, exec_command_bob = wallet_setup(wallet_path_bob) + _, wallet_new, path_new, _ = wallet_setup(wallet_path_new) + netuid = 2 + time.sleep(12) + # Create a new subnet by Bob + create_sn = exec_command_bob( + command="subnets", + sub_command="create", + extra_args=[ + "--wallet-path", + path_bob, + "--wallet-name", + wallet_bob.name, + "--wallet-hotkey", + wallet_bob.hotkey_str, + "--chain", + "ws://127.0.0.1:9945", + "--subnet-name", + "Test Subnet CK Swap", + "--repo", + "https://github.com/opentensor/subnet-repo", + "--contact", + "bob@opentensor.dev", + "--url", + "https://subnet.example.com", + "--discord", + "bob#1234", + "--description", + "Subnet for coldkey swap e2e", + "--logo-url", + "https://subnet.example.com/logo.png", + "--additional-info", + "Created for e2e coldkey swap test", + "--no-prompt", + "--json-output", + "--no-mev-protection", + ], + ) + create_payload = json.loads(create_sn.stdout) + assert create_payload["success"] is True + + # Start emission schedule + start_sn = exec_command_bob( + command="subnets", + sub_command="start", + extra_args=[ + "--netuid", + str(2), + "--wallet-name", + wallet_bob.name, + "--wallet-path", + path_bob, + "--network", + "ws://127.0.0.1:9945", + "--no-prompt", + ], + ) + assert "Successfully started subnet" in start_sn.stdout, start_sn.stdout + + # Add stake to the new subnet + stake_add = exec_command_bob( + command="stake", + sub_command="add", + extra_args=[ + "--netuid", + str(netuid), + "--wallet-path", + path_bob, + "--wallet-name", + wallet_bob.name, + "--hotkey", + wallet_bob.hotkey_str, + "--chain", + "ws://127.0.0.1:9945", + "--amount", + "5", + "--unsafe", + "--no-prompt", + "--no-mev-protection", + ], + ) + assert "โœ… Finalized" in stake_add.stdout, stake_add.stdout + + # Announce swap + announce = exec_command_bob( + command="wallet", + sub_command="swap-coldkey", + extra_args=[ + "announce", + "--wallet-path", + path_bob, + "--wallet-name", + wallet_bob.name, + "--network", + "ws://127.0.0.1:9945", + "--new-coldkey", + wallet_new.coldkeypub.ss58_address, + "--no-prompt", + "--no-mev-protection", + ], + ) + assert "Successfully announced coldkey swap" in announce.stdout, announce.stdout + + # Fetch announcement and wait for execution block + status_json = exec_command_bob( + command="wallet", + sub_command="swap-check", + extra_args=[ + "--wallet-path", + path_bob, + "--wallet-name", + wallet_bob.name, + "--network", + "ws://127.0.0.1:9945", + "--json-output", + ], + ) + status_payload = json.loads(status_json.stdout) + assert status_payload["announcements"], status_payload + when = status_payload["announcements"][0]["execution_block"] + _wait_until_block(local_chain, when) + + # Execute swap + execute = exec_command_bob( + command="wallet", + sub_command="swap-coldkey", + extra_args=[ + "execute", + "--wallet-path", + path_bob, + "--wallet-name", + wallet_bob.name, + "--network", + "ws://127.0.0.1:9945", + "--new-coldkey", + wallet_new.coldkeypub.ss58_address, + "--no-prompt", + "--no-mev-protection", + ], + ) + assert "Successfully executed coldkey swap" in execute.stdout, execute.stdout + + # Status should clear + status = exec_command_bob( + command="wallet", + sub_command="swap-check", + extra_args=[ + "--wallet-path", + path_bob, + "--wallet-name", + wallet_bob.name, + "--network", + "ws://127.0.0.1:9945", + ], + ) + assert "No pending swap announcement" in status.stdout, status.stdout + + # Stake should now be on the new coldkey + stake_new = exec_command_bob( + command="stake", + sub_command="list", + extra_args=[ + "--coldkey-ss58", + wallet_new.coldkeypub.ss58_address, + "--network", + "ws://127.0.0.1:9945", + "--json-output", + "--no-prompt", + ], + ) + payload_new = json.loads(stake_new.stdout) + new_entries = find_stake_entries( + payload_new, netuid=netuid, hotkey_ss58=wallet_bob.hotkey.ss58_address + ) + assert len(new_entries) > 0, "Stake not found on new coldkey" + assert float(new_entries[0].get("value", 0)) > 0 + + # Old coldkey should have no stake + stake_old = exec_command_bob( + command="stake", + sub_command="list", + extra_args=[ + "--coldkey-ss58", + wallet_bob.coldkeypub.ss58_address, + "--network", + "ws://127.0.0.1:9945", + "--json-output", + "--no-prompt", + ], + ) + assert not stake_old.stdout, "Old coldkey still has stake" + + +def test_coldkey_swap_dispute(local_chain, wallet_setup): + """ + Dispute path: + 1. Bob announces swap. + 2. Status pending. + 3. Bob disputes immediately. + 4. Execute attempt fails and status shows Disputed. + """ + print("Testing coldkey swap dispute path ๐Ÿงช") + wallet_path_bob = "//Bob" + wallet_path_new = "//Dave" + + _, wallet_bob, path_bob, exec_command_bob = wallet_setup(wallet_path_bob) + _, wallet_new, _, _ = wallet_setup(wallet_path_new) + + time.sleep(12) + # Create subnet, start, and stake on it + create_sn = exec_command_bob( + command="subnets", + sub_command="create", + extra_args=[ + "--wallet-path", + path_bob, + "--wallet-name", + wallet_bob.name, + "--wallet-hotkey", + wallet_bob.hotkey_str, + "--chain", + "ws://127.0.0.1:9945", + "--subnet-name", + "Test Subnet CK Swap Dispute", + "--repo", + "https://github.com/opentensor/subnet-repo", + "--contact", + "bob@opentensor.dev", + "--url", + "https://subnet.example.com", + "--discord", + "bob#1234", + "--description", + "Subnet for coldkey swap dispute test", + "--logo-url", + "https://subnet.example.com/logo.png", + "--additional-info", + "Created for e2e coldkey swap dispute test", + "--no-prompt", + "--json-output", + ], + ) + create_payload = json.loads(create_sn.stdout) + assert create_payload.get("success") is True, create_payload + netuid = create_payload["netuid"] + + start_sn = exec_command_bob( + command="subnets", + sub_command="start", + extra_args=[ + "--netuid", + str(netuid), + "--wallet-name", + wallet_bob.name, + "--wallet-path", + path_bob, + "--network", + "ws://127.0.0.1:9945", + "--no-prompt", + ], + ) + assert "Successfully started subnet" in start_sn.stdout, start_sn.stdout + + stake_add = exec_command_bob( + command="stake", + sub_command="add", + extra_args=[ + "--netuid", + str(netuid), + "--wallet-path", + path_bob, + "--wallet-name", + wallet_bob.name, + "--hotkey", + wallet_bob.hotkey_str, + "--chain", + "ws://127.0.0.1:9945", + "--amount", + "2", + "--unsafe", + "--no-prompt", + "--no-mev-protection", + ], + ) + assert "โœ… Finalized" in stake_add.stdout, stake_add.stdout + + announce = exec_command_bob( + command="wallet", + sub_command="swap-coldkey", + extra_args=[ + "announce", + "--wallet-path", + path_bob, + "--wallet-name", + wallet_bob.name, + "--network", + "ws://127.0.0.1:9945", + "--new-coldkey", + wallet_new.coldkeypub.ss58_address, + "--no-prompt", + "--no-mev-protection", + ], + ) + assert "Successfully announced coldkey swap" in announce.stdout, announce.stdout + + status = exec_command_bob( + command="wallet", + sub_command="swap-check", + extra_args=[ + "--wallet-path", + path_bob, + "--wallet-name", + wallet_bob.name, + "--network", + "ws://127.0.0.1:9945", + "--json-output", + ], + ) + status_payload = json.loads(status.stdout) + assert status_payload["announcements"], status_payload + assert status_payload["announcements"][0]["status"] == "pending" + + dispute = exec_command_bob( + command="wallet", + sub_command="swap-coldkey", + extra_args=[ + "dispute", + "--wallet-path", + path_bob, + "--wallet-name", + wallet_bob.name, + "--network", + "ws://127.0.0.1:9945", + "--no-prompt", + "--no-mev-protection", + ], + ) + assert "Coldkey swap disputed" in dispute.stdout, dispute.stdout + + status_after_dispute = exec_command_bob( + command="wallet", + sub_command="swap-check", + extra_args=[ + "--wallet-path", + path_bob, + "--wallet-name", + wallet_bob.name, + "--network", + "ws://127.0.0.1:9945", + "--json-output", + ], + ) + status_payload_after_dispute = json.loads(status_after_dispute.stdout) + if status_payload_after_dispute["announcements"]: + when = status_payload_after_dispute["announcements"][0]["execution_block"] + _wait_until_block(local_chain, when) + + execute = exec_command_bob( + command="wallet", + sub_command="swap-coldkey", + extra_args=[ + "execute", + "--wallet-path", + path_bob, + "--wallet-name", + wallet_bob.name, + "--network", + "ws://127.0.0.1:9945", + "--new-coldkey", + wallet_new.coldkeypub.ss58_address, + "--no-prompt", + ], + ) + assert "The account is frozen" in execute.stderr, execute.stderr + + status_after = exec_command_bob( + command="wallet", + sub_command="swap-check", + extra_args=[ + "--wallet-path", + path_bob, + "--wallet-name", + wallet_bob.name, + "--network", + "ws://127.0.0.1:9945", + "--json-output", + ], + ) + status_after_payload = json.loads(status_after.stdout) + assert status_after_payload["announcements"], status_after_payload + assert status_after_payload["announcements"][0]["status"] == "disputed" diff --git a/tests/e2e_tests/test_stake_burn.py b/tests/e2e_tests/test_stake_burn.py new file mode 100644 index 000000000..a85f24137 --- /dev/null +++ b/tests/e2e_tests/test_stake_burn.py @@ -0,0 +1,165 @@ +import json +import time + +import pytest + +from .utils import extract_coldkey_balance + + +@pytest.mark.parametrize("local_chain", [False], indirect=True) +def test_stake_burn(local_chain, wallet_setup): + """ + Test stake burn + 1. Create a subnet + 2. Start the subnet's emission schedule + 3. Buyback the subnet (stake burn) + 3. Check the balance before and after the buyback upon success + 4. Try to buyback again and expect it to fail due to rate limit + """ + + _, wallet_alice, wallet_path_alice, exec_command_alice = wallet_setup("//Alice") + time.sleep(12) + netuid = 2 + result = exec_command_alice( + command="subnets", + sub_command="create", + extra_args=[ + "--wallet-path", + wallet_path_alice, + "--chain", + "ws://127.0.0.1:9945", + "--wallet-name", + wallet_alice.name, + "--wallet-hotkey", + wallet_alice.hotkey_str, + "--subnet-name", + "Test Subnet", + "--repo", + "https://github.com/username/repo", + "--contact", + "test@opentensor.dev", + "--url", + "https://testsubnet.com", + "--discord", + "test#1234", + "--description", + "A test subnet for e2e testing", + "--logo-url", + "https://testsubnet.com/logo.png", + "--additional-info", + "Test subnet", + "--no-prompt", + "--no-mev-protection", + ], + ) + assert "โœ… Registered subnetwork with netuid: 2" in result.stdout + + # Start the subnet's emission schedule + start_call_netuid_2 = exec_command_alice( + command="subnets", + sub_command="start", + extra_args=[ + "--netuid", + str(netuid), + "--wallet-name", + wallet_alice.name, + "--no-prompt", + "--chain", + "ws://127.0.0.1:9945", + "--wallet-path", + wallet_path_alice, + ], + ) + assert ( + "Successfully started subnet 2's emission schedule." + in start_call_netuid_2.stdout + ) + assert "Your extrinsic has been included" in start_call_netuid_2.stdout + time.sleep(2) + + # Balance before buyback + _balance_before = exec_command_alice( + "wallet", + "balance", + extra_args=[ + "--wallet-path", + wallet_path_alice, + "--wallet-name", + wallet_alice.name, + "--network", + "ws://127.0.0.1:9945", + ], + ) + balance_before = extract_coldkey_balance( + _balance_before.stdout, wallet_alice.name, wallet_alice.coldkey.ss58_address + )["free_balance"] + + # First stake burn + amount_tao = 1.0 + stake_burn_result = exec_command_alice( + "sudo", + "stake-burn", + extra_args=[ + "--wallet-path", + wallet_path_alice, + "--wallet-name", + wallet_alice.name, + "--wallet-hotkey", + wallet_alice.hotkey_str, + "--network", + "ws://127.0.0.1:9945", + "--netuid", + str(netuid), + "--amount", + str(amount_tao), + "--no-prompt", + "--json-output", + ], + ) + stale_burn_ok_out = json.loads(stake_burn_result.stdout) + assert stale_burn_ok_out["success"] is True, stake_burn_result.stdout + + # Balance after stake burn + _balance_after = exec_command_alice( + "wallet", + "balance", + extra_args=[ + "--wallet-path", + wallet_path_alice, + "--wallet-name", + wallet_alice.name, + "--network", + "ws://127.0.0.1:9945", + ], + ) + balance_after = extract_coldkey_balance( + _balance_after.stdout, wallet_alice.name, wallet_alice.coldkey.ss58_address + )["free_balance"] + assert balance_after < balance_before, (balance_before, balance_after) + + # Should fail due to rate limit + stake_burn_ratelimited_result = exec_command_alice( + "sudo", + "buyback", + extra_args=[ + "--wallet-path", + wallet_path_alice, + "--wallet-name", + wallet_alice.name, + "--wallet-hotkey", + wallet_alice.hotkey_str, + "--network", + "ws://127.0.0.1:9945", + "--netuid", + str(netuid), + "--amount", + str(amount_tao), + "--no-prompt", + "--json-output", + ], + ) + stake_burn_ratelimited = json.loads(stake_burn_ratelimited_result.stdout) + assert stake_burn_ratelimited["success"] is False, ( + stake_burn_ratelimited_result.stdout + ) + assert "AddStakeBurnRateLimitExceeded" in stake_burn_ratelimited["message"] diff --git a/tests/e2e_tests/test_staking_sudo.py b/tests/e2e_tests/test_staking_sudo.py index e76ff1627..0e0cc65a4 100644 --- a/tests/e2e_tests/test_staking_sudo.py +++ b/tests/e2e_tests/test_staking_sudo.py @@ -297,8 +297,8 @@ def test_staking(local_chain, wallet_setup): assert str(netuid) in get_s_price_output.keys() stats = get_s_price_output[str(netuid)]["stats"] assert stats["name"] == sn_name - assert stats["current_price"] == 0.0 - assert stats["market_cap"] == 0.0 + assert stats["current_price"] == 1 + assert stats["market_cap"] == 1_000 # Start emissions on SNs for netuid_ in multiple_netuids: diff --git a/tests/e2e_tests/test_wallet_interactions.py b/tests/e2e_tests/test_wallet_interactions.py index 7ed705b65..02797b978 100644 --- a/tests/e2e_tests/test_wallet_interactions.py +++ b/tests/e2e_tests/test_wallet_interactions.py @@ -447,17 +447,17 @@ def test_wallet_identities(local_chain, wallet_setup): assert "Your extrinsic has been included as" in set_id_output[1] - assert alice_identity["name"] in set_id_output[7] - assert alice_identity["url"] in set_id_output[8] - assert alice_identity["github_repo"] in set_id_output[9] - assert alice_identity["image"] in set_id_output[10] - assert alice_identity["discord"] in set_id_output[11] - assert alice_identity["description"] in set_id_output[12] - assert alice_identity["additional"] in set_id_output[13] + assert alice_identity["name"] in set_id_output[8] + assert alice_identity["url"] in set_id_output[9] + assert alice_identity["github_repo"] in set_id_output[10] + assert alice_identity["image"] in set_id_output[11] + assert alice_identity["discord"] in set_id_output[12] + assert alice_identity["description"] in set_id_output[13] + assert alice_identity["additional"] in set_id_output[14] # TODO: Currently coldkey + hotkey are the same for test wallets. # Maybe we can add a new key to help in distinguishing - assert wallet_alice.coldkeypub.ss58_address in set_id_output[6] + assert wallet_alice.coldkeypub.ss58_address in set_id_output[7] # Execute btcli get-identity using hotkey get_identity = exec_command_alice( diff --git a/tests/unit_tests/test_cli.py b/tests/unit_tests/test_cli.py index 60cc10708..fbd4130db 100644 --- a/tests/unit_tests/test_cli.py +++ b/tests/unit_tests/test_cli.py @@ -4,6 +4,7 @@ from async_substrate_interface import AsyncSubstrateInterface from bittensor_cli.cli import parse_mnemonic, CLIManager +from bittensor_cli.src import HYPERPARAMS, HYPERPARAMS_METADATA, RootSudoOnly from bittensor_cli.src.bittensor.extrinsics.root import ( get_current_weights_for_uid, set_root_weights_extrinsic, @@ -551,46 +552,6 @@ def test_wallet_set_id_calls_proxy_validation(): mock_proxy_validation.assert_called_once_with(valid_proxy, False) -def test_wallet_swap_coldkey_calls_proxy_validation(): - """Test that wallet_swap_coldkey calls is_valid_proxy_name_or_ss58""" - cli_manager = CLIManager() - valid_proxy = "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty" - new_coldkey = "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty" - - with ( - patch.object(cli_manager, "verbosity_handler"), - patch.object(cli_manager, "wallet_ask") as mock_wallet_ask, - patch.object(cli_manager, "initialize_chain"), - patch.object(cli_manager, "_run_command"), - patch("bittensor_cli.cli.is_valid_ss58_address", return_value=True), - patch.object( - cli_manager, "is_valid_proxy_name_or_ss58", return_value=valid_proxy - ) as mock_proxy_validation, - ): - mock_wallet = Mock() - mock_wallet.coldkeypub = Mock() - mock_wallet.coldkeypub.ss58_address = ( - "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty" - ) - mock_wallet_ask.return_value = mock_wallet - - cli_manager.wallet_swap_coldkey( - wallet_name="test_wallet", - wallet_path="/tmp/test", - wallet_hotkey="test_hotkey", - new_wallet_or_ss58=new_coldkey, - network=None, - proxy=valid_proxy, - announce_only=False, - quiet=True, - verbose=False, - force_swap=False, - ) - - # Assert that proxy validation was called - mock_proxy_validation.assert_called_once_with(valid_proxy, False) - - def test_stake_move_calls_proxy_validation(): """Test that stake_move calls is_valid_proxy_name_or_ss58""" cli_manager = CLIManager() @@ -805,3 +766,37 @@ async def test_set_root_weights_skips_current_weights_without_prompt(): ) mock_get_current.assert_not_called() + + +# HYPERPARAMS / HYPERPARAMS_METADATA (issue #826) +NEW_HYPERPARAMS_826 = {"sn_owner_hotkey", "subnet_owner_hotkey", "recycle_or_burn"} + + +def test_new_hyperparams_in_hyperparams(): + for key in NEW_HYPERPARAMS_826: + assert key in HYPERPARAMS, f"{key} should be in HYPERPARAMS" + extrinsic, root_only = HYPERPARAMS[key] + assert extrinsic, f"{key} must have non-empty extrinsic name" + assert root_only is RootSudoOnly.FALSE + + +def test_subnet_owner_hotkey_alias_maps_to_same_extrinsic(): + ext_sn, _ = HYPERPARAMS["sn_owner_hotkey"] + ext_subnet, _ = HYPERPARAMS["subnet_owner_hotkey"] + assert ext_sn == ext_subnet == "sudo_set_sn_owner_hotkey" + + +def test_new_hyperparams_have_metadata(): + required = {"description", "side_effects", "owner_settable", "docs_link"} + for key in NEW_HYPERPARAMS_826: + assert key in HYPERPARAMS_METADATA, f"{key} should be in HYPERPARAMS_METADATA" + meta = HYPERPARAMS_METADATA[key] + for field in required: + assert field in meta, f"{key} metadata missing '{field}'" + assert isinstance(meta["description"], str) + assert isinstance(meta["owner_settable"], bool) + + +def test_new_hyperparams_owner_settable_true(): + for key in NEW_HYPERPARAMS_826: + assert HYPERPARAMS_METADATA[key]["owner_settable"] is True diff --git a/tests/unit_tests/test_hyperparams.py b/tests/unit_tests/test_hyperparams.py new file mode 100644 index 000000000..8f6b42647 --- /dev/null +++ b/tests/unit_tests/test_hyperparams.py @@ -0,0 +1,40 @@ +"""Unit tests for HYPERPARAMS and HYPERPARAMS_METADATA (issue #826).""" + +from bittensor_cli.src import HYPERPARAMS, HYPERPARAMS_METADATA, RootSudoOnly + + +NEW_HYPERPARAMS_826 = { + "sn_owner_hotkey", + "subnet_owner_hotkey", + "recycle_or_burn", +} + + +def test_new_hyperparams_in_hyperparams(): + for key in NEW_HYPERPARAMS_826: + assert key in HYPERPARAMS, f"{key} should be in HYPERPARAMS" + extrinsic, root_only = HYPERPARAMS[key] + assert extrinsic, f"{key} must have non-empty extrinsic name" + assert root_only is RootSudoOnly.FALSE + + +def test_subnet_owner_hotkey_alias_maps_to_same_extrinsic(): + ext_sn, _ = HYPERPARAMS["sn_owner_hotkey"] + ext_subnet, _ = HYPERPARAMS["subnet_owner_hotkey"] + assert ext_sn == ext_subnet == "sudo_set_sn_owner_hotkey" + + +def test_new_hyperparams_have_metadata(): + required = {"description", "side_effects", "owner_settable", "docs_link"} + for key in NEW_HYPERPARAMS_826: + assert key in HYPERPARAMS_METADATA, f"{key} should be in HYPERPARAMS_METADATA" + meta = HYPERPARAMS_METADATA[key] + for field in required: + assert field in meta, f"{key} metadata missing '{field}'" + assert isinstance(meta["description"], str) + assert isinstance(meta["owner_settable"], bool) + + +def test_new_hyperparams_owner_settable_true(): + for key in NEW_HYPERPARAMS_826: + assert HYPERPARAMS_METADATA[key]["owner_settable"] is True