diff --git a/.coverage b/.coverage deleted file mode 100644 index 20756cc..0000000 Binary files a/.coverage and /dev/null differ diff --git a/.gitignore b/.gitignore index f7c71d9..aa16a07 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,9 @@ dropkit/_version.txt # Virtual environments .venv +# Coverage +.coverage + # Git worktrees .worktrees/ diff --git a/dropkit/api.py b/dropkit/api.py index afd62d9..6830d48 100644 --- a/dropkit/api.py +++ b/dropkit/api.py @@ -875,16 +875,24 @@ def tag_resource(self, tag_name: str, resource_id: str, resource_type: str) -> N Raises: DigitalOceanAPIError: If tagging fails """ - payload = { - "resources": [ - { - "resource_id": resource_id, - "resource_type": resource_type, - } - ] - } + payload = {"resources": [{"resource_id": resource_id, "resource_type": resource_type}]} self._request("POST", f"/tags/{tag_name}/resources", json=payload) + def untag_resource(self, tag_name: str, resource_id: str, resource_type: str) -> None: + """ + Remove a tag from a resource. + + Args: + tag_name: Tag name to remove + resource_id: Resource ID (string for snapshots/images) + resource_type: Resource type ('image' for snapshots, 'droplet', etc.) + + Raises: + DigitalOceanAPIError: If untagging fails + """ + payload = {"resources": [{"resource_id": resource_id, "resource_type": resource_type}]} + self._request("DELETE", f"/tags/{tag_name}/resources", json=payload) + def create_tag(self, tag_name: str) -> dict[str, Any]: """ Create a tag if it doesn't exist. diff --git a/dropkit/main.py b/dropkit/main.py index 12ac7e4..38ea830 100644 --- a/dropkit/main.py +++ b/dropkit/main.py @@ -180,6 +180,25 @@ def complete_snapshot_name(incomplete: str) -> list[str]: return [] +def complete_droplet_or_snapshot_name(incomplete: str) -> list[str]: + """ + Autocompletion function for both live droplet and hibernated snapshot names. + + Combines results from complete_droplet_name and complete_snapshot_name, + deduplicating names that appear in both. + + Args: + incomplete: Partial text entered by the user + + Returns: + List of matching names (live droplets and hibernated snapshots) + """ + droplet_names = complete_droplet_name(incomplete) + snapshot_names = complete_snapshot_name(incomplete) + # Deduplicate while preserving order (live droplets first) + return list(dict.fromkeys(droplet_names + snapshot_names)) + + def load_config_and_api() -> tuple[Config, DigitalOceanAPI]: """ Load configuration and create API client. @@ -1493,7 +1512,7 @@ def setup_tailscale( return tailscale_ip -def find_user_droplet(api: DigitalOceanAPI, droplet_name: str) -> tuple[dict | None, str | None]: +def find_user_droplet(api: DigitalOceanAPI, droplet_name: str) -> tuple[dict | None, str]: """ Find a droplet by name, filtered by current user's tag. @@ -1526,7 +1545,7 @@ def find_user_droplet(api: DigitalOceanAPI, droplet_name: str) -> tuple[dict | N if droplet.get("name") == droplet_name: return droplet, username - return None, None + return None, username def find_project_by_name_or_id( @@ -2270,7 +2289,8 @@ def list_droplets(): console.print("[bold]Hibernated:[/bold]") snap_table = Table(show_header=True, header_style="bold cyan") snap_table.add_column("Name", style="white", no_wrap=True) - snap_table.add_column("Size", style="white", no_wrap=True) + snap_table.add_column("Droplet Size", style="white", no_wrap=True) + snap_table.add_column("Image Size", style="white", no_wrap=True) snap_table.add_column("Region", style="white", no_wrap=True) for snapshot in hibernated: @@ -2278,11 +2298,17 @@ def list_droplets(): # Extract droplet name from snapshot name (remove "dropkit-" prefix) droplet_name = get_droplet_name_from_snapshot(snapshot_name) or snapshot_name + # Extract droplet size slug from size: tag + droplet_size = "N/A" + for tag in snapshot.get("tags", []): + if tag.startswith("size:"): + droplet_size = tag.removeprefix("size:") + size_gb = snapshot.get("size_gigabytes", 0) regions = snapshot.get("regions", []) region = regions[0] if regions else "N/A" - snap_table.add_row(droplet_name, f"{size_gb} GB", region) + snap_table.add_row(droplet_name, droplet_size, f"{size_gb} GB", region) console.print(snap_table) console.print() @@ -2326,8 +2352,8 @@ def config_ssh( console.print(f"[dim]Looking for droplet: [cyan]{droplet_name}[/cyan][/dim]") droplet, username = find_user_droplet(api, droplet_name) - if not droplet or not username: - tag = get_user_tag(username) if username else "owner:" + if not droplet: + tag = get_user_tag(username) console.print(f"[red]Error: Droplet '{droplet_name}' not found with tag {tag}[/red]") raise typer.Exit(1) @@ -2409,8 +2435,8 @@ def info(droplet_name: str = typer.Argument(..., autocompletion=complete_droplet console.print(f"[dim]Looking for droplet: [cyan]{droplet_name}[/cyan][/dim]\n") droplet, username = find_user_droplet(api, droplet_name) - if not droplet or not username: - tag = get_user_tag(username) if username else "owner:" + if not droplet: + tag = get_user_tag(username) console.print(f"[red]Error: Droplet '{droplet_name}' not found with tag {tag}[/red]") raise typer.Exit(1) @@ -2566,14 +2592,6 @@ def destroy(droplet_name: str = typer.Argument(..., autocompletion=complete_drop # If no droplet found, check for hibernated snapshot if not droplet: - # Get username if we don't have it - if not username: - try: - username = api.get_username() - except DigitalOceanAPIError as e: - console.print(f"[red]Error fetching username from DigitalOcean: {e}[/red]") - raise typer.Exit(1) - # Check for hibernated snapshot snapshot_name = get_snapshot_name(droplet_name) user_tag = get_user_tag(username) @@ -2869,6 +2887,131 @@ def _destroy_hibernated_snapshot( ) +def _resize_hibernated_snapshot( + api: DigitalOceanAPI, + snapshot: dict, + droplet_name: str, + size: str | None, +) -> None: + """ + Handle resizing of a hibernated snapshot by swapping its size: tag. + + This is called by the resize command when no live droplet is found but a + hibernated snapshot exists. The resize is instant — it just updates the + size: tag so the next wake uses the new size. + """ + raw_id = snapshot.get("id") + if not raw_id: + console.print("[red]Error: Could not determine snapshot ID[/red]") + raise typer.Exit(1) + snapshot_id = str(raw_id) + + # Read current size from tags + current_size_slug = None + for tag in snapshot.get("tags", []): + if tag.startswith("size:"): + current_size_slug = tag.removeprefix("size:") + + if not current_size_slug: + console.print("[yellow]Could not determine current size from snapshot tags.[/yellow]") + console.print("[dim]The snapshot may not have a size tag. Try waking it first.[/dim]") + raise typer.Exit(1) + + # Display header + console.print( + Panel.fit( + f"[bold cyan]RESIZE HIBERNATED SNAPSHOT: {droplet_name}[/bold cyan]", + border_style="cyan", + ) + ) + + console.print( + "[dim]No active droplet found, but found a hibernated snapshot.\n" + "Resizing a hibernated snapshot is instant — it updates the size tag\n" + "so the next wake creates the droplet with the new size.[/dim]\n" + ) + + # Display current size + console.print(f"[bold]Current Size:[/bold] [cyan]{current_size_slug}[/cyan]") + + # Fetch available sizes once (needed for both interactive prompt and validation) + try: + available_sizes = api.get_available_sizes() + except DigitalOceanAPIError as e: + console.print(f"[red]Error fetching sizes: {e}[/red]") + raise typer.Exit(1) + + # Get new size (interactive if not provided) + if size is None: + new_size_slug = prompt_with_help( + "\n[bold]New size[/bold]", + default=current_size_slug, + display_func=display_sizes, + data=available_sizes, + ) + else: + new_size_slug = size + + # Check if same size + if new_size_slug == current_size_slug: + console.print( + f"\n[yellow]New size is the same as current size ({current_size_slug})[/yellow]" + ) + console.print("[dim]No resize needed.[/dim]") + raise typer.Exit(0) + + # Validate the new size exists + new_size_info = next((s for s in available_sizes if s.get("slug") == new_size_slug), None) + if not new_size_info: + console.print(f"[red]Error: Size '{new_size_slug}' not found or not available[/red]") + raise typer.Exit(1) + + # Display new size details + console.print(f"\n[bold]New Size:[/bold] [cyan]{new_size_slug}[/cyan]") + new_table = Table(show_header=False, box=None, padding=(0, 2)) + new_table.add_column(style="dim") + new_table.add_column(style="white") + + new_table.add_row("vCPUs:", str(new_size_info.get("vcpus", "N/A"))) + new_table.add_row("Memory:", f"{new_size_info.get('memory', 'N/A')} MB") + new_table.add_row("Disk:", f"{new_size_info.get('disk', 'N/A')} GB") + new_table.add_row("Price:", f"${new_size_info.get('price_monthly', 0):.2f}/month") + + console.print(new_table) + + # Confirmation + console.print() + confirm = Prompt.ask( + "[yellow]Are you sure you want to resize this hibernated snapshot?[/yellow]", + choices=["yes", "no"], + default="no", + ) + + if confirm != "yes": + console.print("[dim]Cancelled.[/dim]") + raise typer.Exit(0) + + # Swap the size tag: add new first, then remove old + # This ensures the snapshot always has at least one size: tag even if we crash mid-operation + console.print() + console.print("[dim]Updating size tag...[/dim]") + + new_tag = f"size:{new_size_slug}" + old_tag = f"size:{current_size_slug}" + + api.create_tag(new_tag) + api.tag_resource(new_tag, snapshot_id, "image") + api.untag_resource(old_tag, snapshot_id, "image") + + console.print("[green]✓[/green] Size tag updated") + console.print() + console.print( + f"[bold green]Hibernated snapshot '{droplet_name}' resized from " + f"{current_size_slug} to {new_size_slug}[/bold green]" + ) + console.print(f"[dim]Next wake will create the droplet with size {new_size_slug}.[/dim]") + + @app.command() @requires_lock("rename") def rename( @@ -2890,8 +3033,8 @@ def rename( console.print(f"[dim]Looking for droplet: [cyan]{old_name}[/cyan][/dim]\n") droplet, username = find_user_droplet(api, old_name) - if not droplet or not username: - tag = get_user_tag(username) if username else "owner:" + if not droplet: + tag = get_user_tag(username) console.print(f"[red]Error: Droplet '{old_name}' not found with tag {tag}[/red]") raise typer.Exit(1) @@ -3032,17 +3175,23 @@ def rename( @app.command() @requires_lock("resize") def resize( - droplet_name: str = typer.Argument(..., autocompletion=complete_droplet_name), + droplet_name: str = typer.Argument(..., autocompletion=complete_droplet_or_snapshot_name), size: str | None = typer.Option(None, "--size", "-s", help="New size slug (e.g., s-4vcpu-8gb)"), disk: bool = typer.Option( True, "--disk/--no-disk", help="Resize disk (permanent, default: True)" ), ): """ - Resize a droplet (causes downtime - requires power off). + Resize a droplet or hibernated snapshot. - This will change the droplet's vCPUs, memory, and optionally disk size. - Only droplets tagged with owner: can be resized. + For live droplets, this causes downtime (requires power off) and changes + the droplet's vCPUs, memory, and optionally disk size. + + For hibernated snapshots, this is instant — it updates the size tag so the + next wake creates the droplet with the new size. The --disk/--no-disk flag + is ignored for hibernated snapshots. + + Only resources tagged with owner: can be resized. """ try: # Load config and API @@ -3052,9 +3201,21 @@ def resize( console.print(f"[dim]Looking for droplet: [cyan]{droplet_name}[/cyan][/dim]\n") droplet, username = find_user_droplet(api, droplet_name) - if not droplet or not username: - tag = get_user_tag(username) if username else "owner:" - console.print(f"[red]Error: Droplet '{droplet_name}' not found with tag {tag}[/red]") + # If no droplet found, check for hibernated snapshot + if not droplet: + snapshot_name = get_snapshot_name(droplet_name) + user_tag = get_user_tag(username) + snapshot = api.get_snapshot_by_name(snapshot_name, tag=user_tag) + + if snapshot: + _resize_hibernated_snapshot(api, snapshot, droplet_name, size) + return + + console.print( + f"[red]Error: No droplet or hibernated snapshot found for '{droplet_name}'[/red]" + ) + console.print(f"[dim]Checked for droplet with tag: {user_tag}[/dim]") + console.print(f"[dim]Checked for snapshot named: {snapshot_name}[/dim]") raise typer.Exit(1) # Get detailed droplet info @@ -3296,8 +3457,8 @@ def on(droplet_name: str = typer.Argument(..., autocompletion=complete_droplet_n console.print(f"[dim]Looking for droplet: [cyan]{droplet_name}[/cyan][/dim]\n") droplet, username = find_user_droplet(api, droplet_name) - if not droplet or not username: - tag = get_user_tag(username) if username else "owner:" + if not droplet: + tag = get_user_tag(username) console.print(f"[red]Error: Droplet '{droplet_name}' not found with tag {tag}[/red]") raise typer.Exit(1) @@ -3367,8 +3528,8 @@ def off(droplet_name: str = typer.Argument(..., autocompletion=complete_droplet_ console.print(f"[dim]Looking for droplet: [cyan]{droplet_name}[/cyan][/dim]\n") droplet, username = find_user_droplet(api, droplet_name) - if not droplet or not username: - tag = get_user_tag(username) if username else "owner:" + if not droplet: + tag = get_user_tag(username) console.print(f"[red]Error: Droplet '{droplet_name}' not found with tag {tag}[/red]") raise typer.Exit(1) @@ -3473,8 +3634,8 @@ def hibernate( console.print(f"[dim]Looking for droplet: [cyan]{droplet_name}[/cyan][/dim]\n") droplet, username = find_user_droplet(api, droplet_name) - if not droplet or not username: - tag = get_user_tag(username) if username else "owner:" + if not droplet: + tag = get_user_tag(username) console.print(f"[red]Error: Droplet '{droplet_name}' not found with tag {tag}[/red]") raise typer.Exit(1) @@ -3753,7 +3914,7 @@ def wake( was_tailscale_locked = False for tag in tags: if tag.startswith("size:"): - original_size = tag[5:] # Remove "size:" prefix + original_size = tag.removeprefix("size:") elif tag == "tailscale-lockdown": was_tailscale_locked = True @@ -3928,8 +4089,8 @@ def enable_tailscale( console.print(f"[dim]Looking for droplet: [cyan]{droplet_name}[/cyan][/dim]\n") droplet, username = find_user_droplet(api, droplet_name) - if not droplet or not username: - tag = get_user_tag(username) if username else "owner:" + if not droplet: + tag = get_user_tag(username) console.print(f"[red]Error: Droplet '{droplet_name}' not found with tag {tag}[/red]") raise typer.Exit(1) diff --git a/tests/test_api.py b/tests/test_api.py index 5e564ac..a991cba 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -1,5 +1,7 @@ """Tests for DigitalOcean API client.""" +from unittest.mock import patch + import pytest from dropkit.api import DigitalOceanAPI @@ -125,3 +127,45 @@ def test_rename_droplet_invalid_id_negative(self): api = DigitalOceanAPI("fake-token") with pytest.raises(ValueError, match="droplet_id must be a positive integer"): api.rename_droplet(-1, "new-name") + + +class TestUntagResource: + """Tests for untag_resource method.""" + + @patch.object(DigitalOceanAPI, "_request") + def test_untag_resource_calls_delete(self, mock_request): + """Test that untag_resource sends DELETE with correct payload.""" + api = DigitalOceanAPI("fake-token") + api.untag_resource("size:s-1vcpu-1gb", "12345", "image") + + mock_request.assert_called_once_with( + "DELETE", + "/tags/size:s-1vcpu-1gb/resources", + json={ + "resources": [ + { + "resource_id": "12345", + "resource_type": "image", + } + ] + }, + ) + + @patch.object(DigitalOceanAPI, "_request") + def test_untag_resource_droplet_type(self, mock_request): + """Test untag_resource with droplet resource type.""" + api = DigitalOceanAPI("fake-token") + api.untag_resource("owner:testuser", "67890", "droplet") + + mock_request.assert_called_once_with( + "DELETE", + "/tags/owner:testuser/resources", + json={ + "resources": [ + { + "resource_id": "67890", + "resource_type": "droplet", + } + ] + }, + ) diff --git a/tests/test_main_helpers.py b/tests/test_main_helpers.py index d4af64c..054cdf7 100644 --- a/tests/test_main_helpers.py +++ b/tests/test_main_helpers.py @@ -4,10 +4,13 @@ from unittest.mock import MagicMock, patch import pytest +import typer from dropkit.main import ( + _resize_hibernated_snapshot, add_temporary_ssh_rule, build_droplet_tags, + complete_droplet_or_snapshot_name, find_snapshot_action, get_droplet_name_from_snapshot, get_snapshot_name, @@ -358,3 +361,102 @@ def test_temp_rule_failure_skips_logout( mock_logout.assert_not_called() # And SSH config should NOT be updated (early return) mock_add_ssh_host.assert_not_called() + + +class TestResizeHibernatedSnapshot: + """Tests for _resize_hibernated_snapshot function.""" + + def test_no_snapshot_id_exits(self): + """Test exits with error when snapshot has no ID.""" + mock_api = MagicMock() + snapshot = {"tags": ["size:s-1vcpu-1gb"]} + + with pytest.raises(typer.Exit): + _resize_hibernated_snapshot(mock_api, snapshot, "myvm", "s-2vcpu-4gb") + + def test_no_size_tag_exits(self): + """Test exits with error when snapshot has no size: tag.""" + mock_api = MagicMock() + snapshot = {"id": "12345", "tags": ["owner:testuser", "firewall"]} + + with pytest.raises(typer.Exit): + _resize_hibernated_snapshot(mock_api, snapshot, "myvm", "s-2vcpu-4gb") + + @patch("dropkit.main.Prompt.ask", return_value="yes") + def test_same_size_exits(self, mock_prompt): + """Test exits when new size matches current size.""" + mock_api = MagicMock() + snapshot = {"id": "12345", "tags": ["size:s-1vcpu-1gb", "owner:testuser"]} + + with pytest.raises(typer.Exit): + _resize_hibernated_snapshot(mock_api, snapshot, "myvm", "s-1vcpu-1gb") + + @patch("dropkit.main.Prompt.ask", return_value="yes") + def test_successful_resize_swaps_tags(self, mock_prompt): + """Test successful resize creates new tag, tags resource, then untags old.""" + mock_api = MagicMock() + mock_api.get_available_sizes.return_value = [ + {"slug": "s-2vcpu-4gb", "vcpus": 2, "memory": 4096, "disk": 80, "price_monthly": 24}, + ] + snapshot = {"id": "12345", "tags": ["size:s-1vcpu-1gb", "owner:testuser"]} + + _resize_hibernated_snapshot(mock_api, snapshot, "myvm", "s-2vcpu-4gb") + + # Verify tag operations: add new first, then remove old + mock_api.create_tag.assert_called_once_with("size:s-2vcpu-4gb") + mock_api.tag_resource.assert_called_once_with("size:s-2vcpu-4gb", "12345", "image") + mock_api.untag_resource.assert_called_once_with("size:s-1vcpu-1gb", "12345", "image") + + @patch("dropkit.main.Prompt.ask", return_value="no") + def test_cancelled_resize_no_api_calls(self, mock_prompt): + """Test that cancelling resize makes no tag API calls.""" + mock_api = MagicMock() + mock_api.get_available_sizes.return_value = [ + {"slug": "s-2vcpu-4gb", "vcpus": 2, "memory": 4096, "disk": 80, "price_monthly": 24}, + ] + snapshot = {"id": "12345", "tags": ["size:s-1vcpu-1gb", "owner:testuser"]} + + with pytest.raises(typer.Exit): + _resize_hibernated_snapshot(mock_api, snapshot, "myvm", "s-2vcpu-4gb") + + mock_api.create_tag.assert_not_called() + mock_api.tag_resource.assert_not_called() + mock_api.untag_resource.assert_not_called() + + @patch("dropkit.main.Prompt.ask", return_value="yes") + def test_invalid_size_exits(self, mock_prompt): + """Test exits when provided size slug doesn't exist.""" + mock_api = MagicMock() + mock_api.get_available_sizes.return_value = [ + {"slug": "s-1vcpu-1gb", "vcpus": 1, "memory": 1024, "disk": 25, "price_monthly": 6}, + ] + snapshot = {"id": "12345", "tags": ["size:s-1vcpu-1gb", "owner:testuser"]} + + with pytest.raises(typer.Exit): + _resize_hibernated_snapshot(mock_api, snapshot, "myvm", "nonexistent-size") + + +class TestCompleteDropletOrSnapshotName: + """Tests for complete_droplet_or_snapshot_name function.""" + + @patch("dropkit.main.complete_snapshot_name", return_value=["snap-vm"]) + @patch("dropkit.main.complete_droplet_name", return_value=["live-vm"]) + def test_combines_both_sources(self, mock_droplet, mock_snapshot): + """Test that results from both completers are combined.""" + result = complete_droplet_or_snapshot_name("") + assert "live-vm" in result + assert "snap-vm" in result + + @patch("dropkit.main.complete_snapshot_name", return_value=["shared-vm"]) + @patch("dropkit.main.complete_droplet_name", return_value=["shared-vm"]) + def test_deduplicates(self, mock_droplet, mock_snapshot): + """Test that duplicate names appear only once.""" + result = complete_droplet_or_snapshot_name("") + assert result.count("shared-vm") == 1 + + @patch("dropkit.main.complete_snapshot_name", return_value=["snap-vm"]) + @patch("dropkit.main.complete_droplet_name", return_value=["live-vm"]) + def test_droplets_first(self, mock_droplet, mock_snapshot): + """Test that live droplet names appear before snapshot names.""" + result = complete_droplet_or_snapshot_name("") + assert result.index("live-vm") < result.index("snap-vm")