diff --git a/src/agentspaces/cli/formatters.py b/src/agentspaces/cli/formatters.py index 92ce793..7df9ecb 100644 --- a/src/agentspaces/cli/formatters.py +++ b/src/agentspaces/cli/formatters.py @@ -11,6 +11,7 @@ from rich.table import Table if TYPE_CHECKING: + from agentspaces.infrastructure.beads import BeadsIssue from agentspaces.modules.workspace.service import WorkspaceInfo __all__ = [ @@ -20,10 +21,12 @@ "print_did_you_mean", "print_error", "print_info", + "print_issue_next_steps", "print_next_steps", "print_success", "print_warning", "print_workspace_created", + "print_workspace_created_from_issue", "print_workspace_removed", "print_workspace_status", "print_workspace_table", @@ -123,6 +126,86 @@ def print_next_steps(workspace_path: str, has_venv: bool) -> None: console.print(panel) +def print_workspace_created_from_issue( + issue: BeadsIssue, + workspace: WorkspaceInfo, +) -> None: + """Print workspace creation summary for issue-based workspace. + + Shows issue context prominently and guides user to claude invocation. + + Args: + issue: Beads issue the workspace was created for. + workspace: Created workspace information. + """ + # Issue context header + console.print() + console.print(f"[bold cyan]Issue:[/bold cyan] {issue.id}") + console.print(f"[bold]Title:[/bold] {issue.title}") + console.print( + f"[bold]Type:[/bold] {issue.issue_type} [bold]Priority:[/bold] {issue.priority}" + ) + console.print() + + # Workspace details panel + lines = [ + f"[bold]Name:[/bold] {workspace.name}", + f"[bold]Location:[/bold] {workspace.path}", + f"[bold]Branch:[/bold] {workspace.branch} (from {workspace.base_branch})", + ] + + if workspace.has_venv: + version_str = workspace.python_version or "default" + lines.append(f"[bold]Python:[/bold] {version_str} (.venv created)") + elif workspace.python_version: + lines.append(f"[bold]Python:[/bold] {workspace.python_version}") + + panel = Panel( + "\n".join(lines), + title="[green]Workspace Created[/green]", + border_style="green", + ) + console.print(panel) + + +def print_issue_next_steps( + workspace_path: str, + issue_id: str, + has_venv: bool, +) -> None: + """Print next steps for issue-based workspace. + + Displays a copyable command string that includes: + - cd to workspace + - venv activation (if applicable) + - claude invocation with 'plan' prompt and issue ID + + Args: + workspace_path: Path to the workspace directory. + issue_id: Beads issue ID. + has_venv: Whether a virtual environment was created. + """ + # Quote values for shell safety + quoted_path = shlex.quote(workspace_path) + quoted_issue = shlex.quote(issue_id) + + # Build command string + commands = [f"cd {quoted_path}"] + if has_venv: + commands.append("source .venv/bin/activate") + commands.append(f"claude 'plan' {quoted_issue}") + + command_str = " && ".join(commands) + + # Display as copyable command + panel = Panel( + f"[cyan]{command_str}[/cyan]", + title="[blue]Next Steps[/blue]", + border_style="blue", + ) + console.print(panel) + + def format_relative_time(dt: datetime | None) -> str: """Format datetime as relative time string. diff --git a/src/agentspaces/cli/tui.py b/src/agentspaces/cli/tui.py index aa57c2c..add18e1 100644 --- a/src/agentspaces/cli/tui.py +++ b/src/agentspaces/cli/tui.py @@ -30,8 +30,8 @@ def _get_tab_title(workspace: WorkspaceInfo) -> str: """Get tab title from beads issue or workspace name. - If workspace purpose contains a beads issue ID, fetches the issue - title from beads. Otherwise falls back to workspace name. + Prefers issue_id field if available, otherwise checks purpose field + for backward compatibility. Fetches issue title from beads. Args: workspace: Workspace to get title for. @@ -39,11 +39,21 @@ def _get_tab_title(workspace: WorkspaceInfo) -> str: Returns: Tab title string (truncated to 30 chars). """ - # Check if purpose looks like a beads issue ID - if workspace.purpose and workspace.purpose.startswith("agentspaces-"): + # Prefer issue_id field, fall back to parsing purpose for backward compatibility + issue_id = workspace.issue_id + if ( + not issue_id + and workspace.purpose + and workspace.purpose.startswith("agentspaces-") + ): + # Backward compatibility: parse from purpose + issue_id = workspace.purpose.split(":")[0].strip() + + # Fetch issue title from beads if we have an issue_id + if issue_id: try: result = subprocess.run( # nosec: B603, B607 - ["bd", "show", workspace.purpose, "--json"], + ["bd", "show", issue_id, "--json"], capture_output=True, text=True, timeout=5, @@ -54,16 +64,16 @@ def _get_tab_title(workspace: WorkspaceInfo) -> str: title: str = str(issues[0]["title"]) logger.debug( "tab_title_from_beads", - issue_id=workspace.purpose, + issue_id=issue_id, title=title, ) return title[:30] except subprocess.TimeoutExpired: - logger.warning("beads_timeout", issue_id=workspace.purpose) + logger.warning("beads_timeout", issue_id=issue_id) except json.JSONDecodeError: - logger.warning("beads_json_error", issue_id=workspace.purpose) + logger.warning("beads_json_error", issue_id=issue_id) except (KeyError, IndexError, TypeError): - logger.warning("beads_data_error", issue_id=workspace.purpose) + logger.warning("beads_data_error", issue_id=issue_id) # Fallback to workspace name return workspace.name[:30] @@ -95,11 +105,20 @@ def _build_navigation_commands(workspace: WorkspaceInfo, tab_title: str) -> str: if venv_activate.exists(): commands.append(f"source {shlex.quote(str(venv_activate))}") - # Launch claude with plan prompt (if issue ID exists in purpose) - if workspace.purpose and workspace.purpose.startswith("agentspaces-"): - # Use shlex.quote to prevent shell injection via workspace.purpose - quoted_purpose = shlex.quote(workspace.purpose) - commands.append(f"claude 'plan' {quoted_purpose}") + # Launch claude with plan prompt (if issue ID exists) + # Prefer issue_id field, fall back to parsing purpose for backward compatibility + issue_id = workspace.issue_id + if ( + not issue_id + and workspace.purpose + and workspace.purpose.startswith("agentspaces-") + ): + # Backward compatibility: parse from purpose + issue_id = workspace.purpose.split(":")[0].strip() + + if issue_id: + quoted_issue = shlex.quote(issue_id) + commands.append(f"claude 'plan' {quoted_issue}") else: commands.append("claude") diff --git a/src/agentspaces/cli/workspace.py b/src/agentspaces/cli/workspace.py index 99ed306..687e053 100644 --- a/src/agentspaces/cli/workspace.py +++ b/src/agentspaces/cli/workspace.py @@ -13,14 +13,16 @@ print_did_you_mean, print_error, print_info, + print_issue_next_steps, print_next_steps, print_warning, print_workspace_created, + print_workspace_created_from_issue, print_workspace_removed, print_workspace_status, print_workspace_table, ) -from agentspaces.infrastructure import git +from agentspaces.infrastructure import beads, git from agentspaces.infrastructure.similarity import find_similar_names from agentspaces.modules.workspace.service import ( WorkspaceError, @@ -50,6 +52,14 @@ def create( "--attach", "-a", help="Attach to existing branch instead of creating new" ), ] = False, + next_issue: Annotated[ + bool, + typer.Option("--next-issue", help="Create workspace from next ready issue"), + ] = False, + issue_id: Annotated[ + str | None, + typer.Option("--issue-id", help="Create workspace from specific issue ID"), + ] = None, purpose: Annotated[ str | None, typer.Option("--purpose", "-p", help="Purpose/description for this workspace"), @@ -71,6 +81,9 @@ def create( Use --attach to create a workspace for an existing branch without creating a new branch. The workspace name will match the branch name. + Use --next-issue to create a workspace from the next ready beads issue. + Use --issue-id to create a workspace from a specific beads issue. + \b Examples: agentspaces workspace create # From current HEAD @@ -78,7 +91,57 @@ def create( agentspaces workspace create -p "Fix auth bug" # With purpose agentspaces workspace create --no-venv # Skip venv setup agentspaces workspace create feature/auth --attach # Attach to existing branch + agentspaces workspace create --next-issue # From next ready issue + agentspaces workspace create --issue-id=proj-123 # From specific issue """ + # Check mutual exclusivity + if sum([attach, next_issue, bool(issue_id)]) > 1: + print_error("Cannot use --attach, --next-issue, --issue-id together") + raise typer.Exit(1) + + # Issue-based workspace creation + if next_issue or issue_id: + # Check beads availability + if not beads.is_beads_available(): + print_error("bd command not found. Install: pip install beads-project") + raise typer.Exit(1) + + try: + # Get issue + if next_issue: + issues = beads.get_ready_issues() + if not issues: + print_error("No ready issues found. Create with: bd create") + raise typer.Exit(1) + issue = issues[0] + else: + issue = beads.get_issue_by_id(issue_id) # type: ignore[arg-type] + + # Create workspace from issue + workspace = _service.create_from_issue( + issue, + base_branch=branch, + python_version=python_version, + setup_venv=not no_venv, + ) + + # Display issue-specific output + print_workspace_created_from_issue(issue, workspace) + print_issue_next_steps( + workspace_path=str(workspace.path), + issue_id=issue.id, + has_venv=workspace.has_venv, + ) + return + + except beads.BeadsError as e: + print_error(str(e)) + raise typer.Exit(1) from e + except WorkspaceError as e: + print_error(str(e)) + raise typer.Exit(1) from e + + # Standard workspace creation try: if attach: workspace = _service.create( diff --git a/src/agentspaces/infrastructure/beads.py b/src/agentspaces/infrastructure/beads.py new file mode 100644 index 0000000..f86ab0b --- /dev/null +++ b/src/agentspaces/infrastructure/beads.py @@ -0,0 +1,206 @@ +"""Beads issue tracker operations via subprocess.""" + +from __future__ import annotations + +import json +import shutil +import subprocess # nosec B404 - subprocess needed for beads CLI operations +from dataclasses import dataclass + +import structlog + +__all__ = [ + "BeadsError", + "BeadsIssue", + "get_issue_by_id", + "get_ready_issues", + "is_beads_available", +] + +logger = structlog.get_logger() + +# Default timeout for beads operations (5 seconds) +BEADS_TIMEOUT = 5 + + +class BeadsError(Exception): + """Raised when a beads operation fails.""" + + def __init__(self, message: str, stderr: str | None = None) -> None: + super().__init__(message) + self.stderr = stderr + + +@dataclass(frozen=True) +class BeadsIssue: + """Immutable beads issue information. + + Attributes: + id: Issue ID (e.g., "agentspaces-1io"). + title: Issue title. + description: Issue description. + status: Issue status (open, in_progress, closed). + priority: Issue priority (0-4, where 0 is highest). + issue_type: Issue type (bug, feature, task, chore, epic). + owner: Assignee email or None if unassigned. + """ + + id: str + title: str + description: str + status: str + priority: int + issue_type: str + owner: str | None + + +def is_beads_available() -> bool: + """Check if beads CLI is available. + + Returns: + True if bd command is in PATH, False otherwise. + """ + return shutil.which("bd") is not None + + +def get_ready_issues() -> list[BeadsIssue]: + """Get all ready (unblocked) issues. + + Returns list of unassigned issues only, suitable for + automatic workspace creation. + + Returns: + List of ready, unassigned BeadsIssue objects. + + Raises: + BeadsError: If bd command fails or output is invalid. + """ + if not is_beads_available(): + raise BeadsError("bd command not found in PATH") + + try: + result = subprocess.run( # nosec B603, B607 + ["bd", "ready", "--json"], + capture_output=True, + text=True, + timeout=BEADS_TIMEOUT, + check=False, + ) + + if result.returncode != 0: + raise BeadsError( + f"bd ready failed with exit code {result.returncode}", + stderr=result.stderr, + ) + + issues_data = json.loads(result.stdout) + if not isinstance(issues_data, list): + raise BeadsError("bd ready returned invalid JSON (expected list)") + + # Parse issues and filter to unassigned only + issues: list[BeadsIssue] = [] + for data in issues_data: + issue = _parse_issue(data) + # Filter to unassigned issues only (owner is None or empty string) + if not issue.owner or issue.owner.strip() == "": + issues.append(issue) + + logger.debug("beads_ready_issues", count=len(issues)) + return issues + + except subprocess.TimeoutExpired as e: + raise BeadsError( + f"bd ready timed out after {BEADS_TIMEOUT} seconds", + stderr=e.stderr.decode() if e.stderr else None, + ) from e + except json.JSONDecodeError as e: + raise BeadsError(f"Failed to parse bd ready JSON output: {e}") from e + + +def get_issue_by_id(issue_id: str) -> BeadsIssue: + """Get a specific issue by ID. + + Args: + issue_id: Issue identifier (e.g., "agentspaces-1io"). + + Returns: + BeadsIssue object with issue details. + + Raises: + BeadsError: If bd command fails, issue not found, or output is invalid. + """ + if not is_beads_available(): + raise BeadsError("bd command not found in PATH") + + try: + result = subprocess.run( # nosec B603, B607 + ["bd", "show", issue_id, "--json"], + capture_output=True, + text=True, + timeout=BEADS_TIMEOUT, + check=False, + ) + + if result.returncode != 0: + # Check if issue not found + if "not found" in result.stderr.lower(): + raise BeadsError(f"Issue not found: {issue_id}", stderr=result.stderr) + raise BeadsError( + f"bd show failed with exit code {result.returncode}", + stderr=result.stderr, + ) + + issues_data = json.loads(result.stdout) + + # bd show returns a list with one item + if isinstance(issues_data, list): + if not issues_data: + raise BeadsError(f"Issue not found: {issue_id}") + issue_data = issues_data[0] + elif isinstance(issues_data, dict): + issue_data = issues_data + else: + raise BeadsError("bd show returned invalid JSON (expected list or dict)") + + issue = _parse_issue(issue_data) + logger.debug("beads_get_issue", issue_id=issue.id) + return issue + + except subprocess.TimeoutExpired as e: + raise BeadsError( + f"bd show timed out after {BEADS_TIMEOUT} seconds", + stderr=e.stderr.decode() if e.stderr else None, + ) from e + except json.JSONDecodeError as e: + raise BeadsError(f"Failed to parse bd show JSON output: {e}") from e + + +def _parse_issue(data: dict[str, object]) -> BeadsIssue: + """Parse issue data from JSON dict. + + Args: + data: JSON dict from bd command. + + Returns: + BeadsIssue object. + + Raises: + BeadsError: If required fields are missing or have invalid types. + """ + try: + # Type assertions for mypy - these are validated by try/except + priority_val = data["priority"] + if not isinstance(priority_val, int): + priority_val = int(str(priority_val)) + + return BeadsIssue( + id=str(data["id"]), + title=str(data["title"]), + description=str(data.get("description", "")), + status=str(data["status"]), + priority=priority_val, + issue_type=str(data["issue_type"]), + owner=str(data["owner"]) if data.get("owner") else None, + ) + except (KeyError, TypeError, ValueError) as e: + raise BeadsError(f"Failed to parse issue data: {e}") from e diff --git a/src/agentspaces/infrastructure/metadata.py b/src/agentspaces/infrastructure/metadata.py index 7c98732..b8ea18d 100644 --- a/src/agentspaces/infrastructure/metadata.py +++ b/src/agentspaces/infrastructure/metadata.py @@ -28,7 +28,8 @@ # v1: Initial schema # v2: Added deps_synced_at and last_activity_at fields (removed in v3) # v3: Removed unused timestamp fields (deps_synced_at, last_activity_at) -SCHEMA_VERSION = "3" +# v4: Added issue_id field for beads integration +SCHEMA_VERSION = "4" # Maximum metadata file size (1MB - generous for workspace metadata) MAX_METADATA_SIZE = 1 * 1024 * 1024 @@ -52,6 +53,7 @@ class WorkspaceMetadata: python_version: Python version used for venv. has_venv: Whether a virtual environment was created. status: Workspace status (active, archived). + issue_id: Beads issue ID associated with workspace. """ name: str @@ -63,6 +65,7 @@ class WorkspaceMetadata: python_version: str | None = None has_venv: bool = False status: str = "active" + issue_id: str | None = None def save_workspace_metadata(metadata: WorkspaceMetadata, path: Path) -> None: @@ -240,4 +243,5 @@ def _dict_to_metadata(data: dict[str, Any]) -> WorkspaceMetadata: python_version=data.get("python_version"), has_venv=data.get("has_venv", False), status=data.get("status", "active"), + issue_id=data.get("issue_id"), ) diff --git a/src/agentspaces/modules/workspace/service.py b/src/agentspaces/modules/workspace/service.py index cc5b067..acb1d73 100644 --- a/src/agentspaces/modules/workspace/service.py +++ b/src/agentspaces/modules/workspace/service.py @@ -9,6 +9,7 @@ import structlog from agentspaces.infrastructure import git +from agentspaces.infrastructure.beads import BeadsIssue # noqa: TC001 - used at runtime from agentspaces.infrastructure.metadata import ( WorkspaceMetadata, load_workspace_metadata, @@ -44,6 +45,7 @@ class WorkspaceInfo: python_version: str | None = None has_venv: bool = False status: str = "active" + issue_id: str | None = None class WorkspaceError(Exception): @@ -90,9 +92,11 @@ def create( *, base_branch: str = "HEAD", attach_branch: str | None = None, + workspace_name: str | None = None, purpose: str | None = None, python_version: str | None = None, setup_venv: bool = True, + issue_id: str | None = None, cwd: Path | None = None, ) -> WorkspaceInfo: """Create a new workspace. @@ -104,9 +108,12 @@ def create( base_branch: Branch to create workspace from (ignored if attach_branch set). attach_branch: Existing branch to attach to. When set, no new branch is created and the workspace name matches the branch name. + workspace_name: Custom workspace name. If None, generates random name. + Ignored if attach_branch is set. purpose: Description of workspace purpose. python_version: Python version for venv (auto-detected if not specified). setup_venv: Whether to create a virtual environment. + issue_id: Beads issue ID to associate with workspace. cwd: Current working directory. Returns: @@ -141,6 +148,7 @@ def create( result = worktree.create_worktree( project=project, base_branch=base_branch, + workspace_name=workspace_name, repo_root=repo_root, resolver=self._resolver, ) @@ -184,6 +192,7 @@ def create( purpose=purpose, python_version=env_info.python_version if env_info else None, has_venv=env_info.has_venv if env_info else False, + issue_id=issue_id, ) metadata_path = self._resolver.workspace_json(project, result.name) @@ -219,6 +228,7 @@ def create( purpose=purpose, python_version=env_info.python_version if env_info else None, has_venv=env_info.has_venv if env_info else False, + issue_id=issue_id, ) logger.info( @@ -231,6 +241,46 @@ def create( return workspace + def create_from_issue( + self, + issue: BeadsIssue, + *, + base_branch: str = "HEAD", + python_version: str | None = None, + setup_venv: bool = True, + cwd: Path | None = None, + ) -> WorkspaceInfo: + """Create workspace from beads issue. + + Uses issue.id as workspace name and branch name. + Sets purpose to "{issue.id}: {issue.title}". + Stores issue_id in metadata. + + Args: + issue: Beads issue to create workspace for. + base_branch: Branch to create workspace from. + python_version: Python version for venv (auto-detected if not specified). + setup_venv: Whether to create a virtual environment. + cwd: Current working directory. + + Returns: + WorkspaceInfo with details. + + Raises: + WorkspaceError: If workspace/branch already exists. + """ + purpose = f"{issue.id}: {issue.title}" + + return self.create( + base_branch=base_branch, + workspace_name=issue.id, + purpose=purpose, + issue_id=issue.id, + python_version=python_version, + setup_venv=setup_venv, + cwd=cwd, + ) + def list(self, *, cwd: Path | None = None) -> list[WorkspaceInfo]: """List all workspaces for the current repository. @@ -280,6 +330,7 @@ def list(self, *, cwd: Path | None = None) -> list[WorkspaceInfo]: python_version=metadata.python_version if metadata else None, has_venv=metadata.has_venv if metadata else False, status=metadata.status if metadata else "active", + issue_id=metadata.issue_id if metadata else None, ) ) @@ -334,6 +385,7 @@ def get(self, name: str, *, cwd: Path | None = None) -> WorkspaceInfo: if metadata else (workspace_path / ".venv").exists(), status=metadata.status if metadata else "active", + issue_id=metadata.issue_id if metadata else None, ) def remove( diff --git a/src/agentspaces/modules/workspace/worktree.py b/src/agentspaces/modules/workspace/worktree.py index 564811f..e833101 100644 --- a/src/agentspaces/modules/workspace/worktree.py +++ b/src/agentspaces/modules/workspace/worktree.py @@ -107,6 +107,7 @@ def create_worktree( project: str, base_branch: str = "HEAD", *, + workspace_name: str | None = None, repo_root: Path, resolver: PathResolver | None = None, ) -> WorktreeCreateResult: @@ -115,6 +116,7 @@ def create_worktree( Args: project: Project/repository name. base_branch: Base branch to create from. + workspace_name: Custom workspace name. If None, generates random name. repo_root: Path to the main repository. resolver: Path resolver instance. @@ -122,16 +124,24 @@ def create_worktree( WorktreeCreateResult with details. Raises: + ValueError: If custom workspace_name already exists. git.GitError: If worktree creation fails. """ resolver = resolver or PathResolver() resolver.ensure_base() - # Generate unique workspace name - def name_exists(name: str) -> bool: - return resolver.workspace_exists(project, name) + # Use provided name or generate unique one + if workspace_name is not None: + # Validate custom name doesn't exist + if resolver.workspace_exists(project, workspace_name): + raise ValueError(f"Workspace '{workspace_name}' already exists") + else: + # Generate unique workspace name + def name_exists(name: str) -> bool: + return resolver.workspace_exists(project, name) + + workspace_name = generate_name(exists_check=name_exists) - workspace_name = generate_name(exists_check=name_exists) workspace_path = resolver.workspace_dir(project, workspace_name) # Create parent directory diff --git a/src/agentspaces/ui/widgets.py b/src/agentspaces/ui/widgets.py index f7981b7..9f4fbb4 100644 --- a/src/agentspaces/ui/widgets.py +++ b/src/agentspaces/ui/widgets.py @@ -159,6 +159,9 @@ def update_preview(self, workspace: WorkspaceInfo | None) -> None: f"[bold]Venv:[/bold] {'Yes ✓' if workspace.has_venv else 'No'}", ] + if workspace.issue_id: + lines.extend(["", f"[bold]Issue:[/bold] {workspace.issue_id}"]) + if workspace.purpose: lines.extend(["", "[bold]Purpose:[/bold]", f"{workspace.purpose}"]) diff --git a/tests/integration/test_workspace_beads.py b/tests/integration/test_workspace_beads.py new file mode 100644 index 0000000..c89870e --- /dev/null +++ b/tests/integration/test_workspace_beads.py @@ -0,0 +1,315 @@ +"""Integration tests for beads workspace creation.""" + +from __future__ import annotations + +import json +from pathlib import Path +from unittest.mock import patch + +import pytest + +from agentspaces.infrastructure.beads import BeadsIssue +from agentspaces.modules.workspace.service import WorkspaceService + + +@pytest.fixture +def mock_beads_issue() -> BeadsIssue: + """Create a mock beads issue for testing.""" + return BeadsIssue( + id="agentspaces-1a2b", + title="Test Feature Implementation", + description="Implement test feature", + status="open", + priority=2, + issue_type="feature", + owner=None, + ) + + +@pytest.fixture +def service(tmp_path: Path) -> WorkspaceService: + """Create a workspace service with temp resolver.""" + from agentspaces.infrastructure.paths import PathResolver + + resolver = PathResolver(base=tmp_path) + return WorkspaceService(resolver=resolver) + + +class TestCreateFromIssue: + """Test create_from_issue() method.""" + + def test_creates_workspace_with_issue_id_as_name( + self, service: WorkspaceService, mock_beads_issue: BeadsIssue + ) -> None: + """Should create workspace with issue ID as name and branch.""" + with patch("agentspaces.infrastructure.git.worktree_add") as mock_git: + workspace = service.create_from_issue( + mock_beads_issue, + base_branch="main", + setup_venv=False, + ) + + # Verify git was called correctly + mock_git.assert_called_once() + call_args = mock_git.call_args + assert call_args[1]["branch"] == "agentspaces-1a2b" + + assert workspace.name == "agentspaces-1a2b" + assert workspace.branch == "agentspaces-1a2b" + assert workspace.issue_id == "agentspaces-1a2b" + + def test_sets_purpose_from_issue( + self, service: WorkspaceService, mock_beads_issue: BeadsIssue + ) -> None: + """Should set purpose to 'issue_id: title'.""" + with patch("agentspaces.infrastructure.git.worktree_add"): + workspace = service.create_from_issue( + mock_beads_issue, + setup_venv=False, + ) + + assert workspace.purpose == "agentspaces-1a2b: Test Feature Implementation" + + def test_persists_issue_id_in_metadata( + self, service: WorkspaceService, mock_beads_issue: BeadsIssue + ) -> None: + """Should persist issue_id in workspace.json metadata.""" + with patch("agentspaces.infrastructure.git.worktree_add"): + workspace = service.create_from_issue( + mock_beads_issue, + setup_venv=False, + ) + + # Read metadata file (use actual workspace path) + metadata_path = workspace.path / ".agentspace" / "workspace.json" + assert metadata_path.exists() + + data = json.loads(metadata_path.read_text()) + assert data["issue_id"] == "agentspaces-1a2b" + assert data["version"] == "4" + + def test_uses_provided_base_branch( + self, service: WorkspaceService, mock_beads_issue: BeadsIssue + ) -> None: + """Should use provided base_branch parameter.""" + with patch("agentspaces.infrastructure.git.worktree_add"): + workspace = service.create_from_issue( + mock_beads_issue, + base_branch="develop", + setup_venv=False, + ) + + assert workspace.base_branch == "develop" + + def test_respects_setup_venv_flag( + self, service: WorkspaceService, mock_beads_issue: BeadsIssue + ) -> None: + """Should respect setup_venv parameter.""" + with patch("agentspaces.infrastructure.git.worktree_add"): + # With venv + with patch( + "agentspaces.modules.workspace.environment.setup_environment" + ) as mock_setup: + from agentspaces.modules.workspace.environment import EnvironmentInfo + + mock_setup.return_value = EnvironmentInfo( + has_venv=True, + python_version="3.13", + has_pyproject=True, + venv_path=None, + ) + + workspace = service.create_from_issue( + mock_beads_issue, + setup_venv=True, + ) + assert mock_setup.called + + # Without venv - create different issue to avoid name collision + issue_no_venv = BeadsIssue( + id="agentspaces-2b3c", + title="Another Test Feature", + description="Test without venv", + status="open", + priority=2, + issue_type="feature", + owner=None, + ) + workspace = service.create_from_issue( + issue_no_venv, + setup_venv=False, + ) + assert not workspace.has_venv + + def test_rejects_duplicate_issue_workspace( + self, service: WorkspaceService, mock_beads_issue: BeadsIssue + ) -> None: + """Should reject creating workspace for same issue twice.""" + with patch("agentspaces.infrastructure.git.worktree_add"): + # Create first workspace + service.create_from_issue(mock_beads_issue, setup_venv=False) + + # Try to create duplicate + from agentspaces.modules.workspace.service import WorkspaceError + + with pytest.raises(WorkspaceError, match="already exists"): + service.create_from_issue(mock_beads_issue, setup_venv=False) + + +class TestWorkspaceListWithIssues: + """Test listing workspaces with issue_id field.""" + + def test_list_includes_issue_id( + self, service: WorkspaceService, mock_beads_issue: BeadsIssue + ) -> None: + """Should include issue_id in listed workspaces.""" + with patch("agentspaces.infrastructure.git.worktree_add"): + service.create_from_issue(mock_beads_issue, setup_venv=False) + + with patch("agentspaces.infrastructure.git.worktree_list") as mock_list: + from agentspaces.infrastructure.git import WorktreeInfo + + mock_list.return_value = [ + WorktreeInfo( + path=Path("/fake/path/agentspaces-1a2b"), + branch="agentspaces-1a2b", + commit="abc123", + is_main=False, + ) + ] + + workspaces = service.list() + + assert len(workspaces) == 1 + assert workspaces[0].issue_id == "agentspaces-1a2b" + + def test_list_handles_workspaces_without_issue_id( + self, service: WorkspaceService + ) -> None: + """Should handle workspaces without issue_id (backward compat).""" + # Create workspace without issue_id (old-style) + with patch("agentspaces.infrastructure.git.worktree_add"): + workspace = service.create( + base_branch="main", + setup_venv=False, + ) + + with patch("agentspaces.infrastructure.git.worktree_list") as mock_list: + from agentspaces.infrastructure.git import WorktreeInfo + + mock_list.return_value = [ + WorktreeInfo( + path=workspace.path, + branch=workspace.branch, + commit="abc123", + is_main=False, + ) + ] + + workspaces = service.list() + + assert len(workspaces) == 1 + assert workspaces[0].issue_id is None + + +class TestCustomWorkspaceName: + """Test custom workspace names.""" + + def test_create_with_custom_name(self, service: WorkspaceService) -> None: + """Should create workspace with custom name.""" + with patch("agentspaces.infrastructure.git.worktree_add"): + workspace = service.create( + base_branch="main", + workspace_name="custom-workspace", + setup_venv=False, + ) + + assert workspace.name == "custom-workspace" + assert workspace.branch == "custom-workspace" + + def test_rejects_duplicate_workspace_name(self, service: WorkspaceService) -> None: + """Should reject duplicate custom workspace names.""" + with patch("agentspaces.infrastructure.git.worktree_add"): + # Create first workspace + service.create( + workspace_name="duplicate-name", + setup_venv=False, + ) + + # Try to create duplicate + from agentspaces.modules.workspace.service import WorkspaceError + + with pytest.raises(WorkspaceError, match="already exists"): + service.create( + workspace_name="duplicate-name", + setup_venv=False, + ) + + +class TestMetadataSchemaV4: + """Test metadata schema v4 with issue_id.""" + + def test_metadata_v4_round_trip( + self, service: WorkspaceService, mock_beads_issue: BeadsIssue + ) -> None: + """Should save and load v4 metadata correctly.""" + with patch("agentspaces.infrastructure.git.worktree_add"): + # Create workspace with issue + workspace = service.create_from_issue( + mock_beads_issue, + setup_venv=False, + ) + + # Load it back (outside patch context, mock worktree_list) + with patch("agentspaces.infrastructure.git.worktree_list") as mock_list: + from agentspaces.infrastructure.git import WorktreeInfo + + mock_list.return_value = [ + WorktreeInfo( + path=workspace.path, + branch=workspace.branch, + commit="abc123", + is_main=False, + ) + ] + + loaded_workspace = service.get(workspace.name) + + assert loaded_workspace.issue_id == "agentspaces-1a2b" + assert loaded_workspace.name == "agentspaces-1a2b" + + def test_v3_metadata_loads_without_issue_id( + self, service: WorkspaceService + ) -> None: + """Should load v3 metadata with issue_id=None.""" + # Create old-style workspace (no issue_id) + with patch("agentspaces.infrastructure.git.worktree_add"): + workspace = service.create( + base_branch="main", + setup_venv=False, + ) + + # Manually modify metadata to v3 format (remove issue_id) + metadata_path = workspace.path / ".agentspace" / "workspace.json" + data = json.loads(metadata_path.read_text()) + data["version"] = "3" + data.pop("issue_id", None) + metadata_path.write_text(json.dumps(data, indent=2)) + + # Load it back (mock worktree_list) + with patch("agentspaces.infrastructure.git.worktree_list") as mock_list: + from agentspaces.infrastructure.git import WorktreeInfo + + mock_list.return_value = [ + WorktreeInfo( + path=workspace.path, + branch=workspace.branch, + commit="abc123", + is_main=False, + ) + ] + + loaded_workspace = service.get(workspace.name) + + assert loaded_workspace.issue_id is None + assert loaded_workspace.name == workspace.name diff --git a/tests/unit/infrastructure/test_beads.py b/tests/unit/infrastructure/test_beads.py new file mode 100644 index 0000000..a7fbd2f --- /dev/null +++ b/tests/unit/infrastructure/test_beads.py @@ -0,0 +1,279 @@ +"""Tests for beads integration module.""" + +from __future__ import annotations + +import json +import subprocess +from unittest.mock import Mock, patch + +import pytest + +from agentspaces.infrastructure.beads import ( + BeadsError, + BeadsIssue, + get_issue_by_id, + get_ready_issues, + is_beads_available, +) + + +class TestIsBeadsAvailable: + """Tests for is_beads_available().""" + + def test_returns_true_when_bd_in_path(self) -> None: + """Should return True when bd command is available.""" + with patch("shutil.which", return_value="/usr/bin/bd"): + assert is_beads_available() is True + + def test_returns_false_when_bd_not_in_path(self) -> None: + """Should return False when bd command is not available.""" + with patch("shutil.which", return_value=None): + assert is_beads_available() is False + + +class TestGetReadyIssues: + """Tests for get_ready_issues().""" + + def test_raises_error_when_bd_not_available(self) -> None: + """Should raise BeadsError when bd not in PATH.""" + with ( + patch( + "agentspaces.infrastructure.beads.is_beads_available", + return_value=False, + ), + pytest.raises(BeadsError, match="bd command not found"), + ): + get_ready_issues() + + def test_returns_unassigned_issues_only(self) -> None: + """Should filter to unassigned issues only.""" + issues_data = [ + { + "id": "test-1", + "title": "Unassigned task", + "description": "Desc", + "status": "open", + "priority": 2, + "issue_type": "task", + "owner": None, + }, + { + "id": "test-2", + "title": "Assigned task", + "description": "Desc", + "status": "open", + "priority": 2, + "issue_type": "task", + "owner": "user@example.com", + }, + { + "id": "test-3", + "title": "Empty owner", + "description": "Desc", + "status": "open", + "priority": 2, + "issue_type": "task", + "owner": "", + }, + ] + + mock_result = Mock() + mock_result.returncode = 0 + mock_result.stdout = json.dumps(issues_data) + + with ( + patch( + "agentspaces.infrastructure.beads.is_beads_available", return_value=True + ), + patch("subprocess.run", return_value=mock_result), + ): + issues = get_ready_issues() + + assert len(issues) == 2 + assert issues[0].id == "test-1" + assert issues[1].id == "test-3" + + def test_raises_error_on_command_failure(self) -> None: + """Should raise BeadsError when bd command fails.""" + mock_result = Mock() + mock_result.returncode = 1 + mock_result.stderr = "Command failed" + + with ( + patch( + "agentspaces.infrastructure.beads.is_beads_available", return_value=True + ), + patch("subprocess.run", return_value=mock_result), + pytest.raises(BeadsError, match="bd ready failed"), + ): + get_ready_issues() + + def test_raises_error_on_invalid_json(self) -> None: + """Should raise BeadsError when output is not valid JSON.""" + mock_result = Mock() + mock_result.returncode = 0 + mock_result.stdout = "not json" + + with ( + patch( + "agentspaces.infrastructure.beads.is_beads_available", return_value=True + ), + patch("subprocess.run", return_value=mock_result), + pytest.raises(BeadsError, match="Failed to parse"), + ): + get_ready_issues() + + def test_raises_error_on_timeout(self) -> None: + """Should raise BeadsError when command times out.""" + with ( + patch( + "agentspaces.infrastructure.beads.is_beads_available", return_value=True + ), + patch("subprocess.run", side_effect=subprocess.TimeoutExpired("bd", 5)), + pytest.raises(BeadsError, match="timed out"), + ): + get_ready_issues() + + +class TestGetIssueById: + """Tests for get_issue_by_id().""" + + def test_raises_error_when_bd_not_available(self) -> None: + """Should raise BeadsError when bd not in PATH.""" + with ( + patch( + "agentspaces.infrastructure.beads.is_beads_available", + return_value=False, + ), + pytest.raises(BeadsError, match="bd command not found"), + ): + get_issue_by_id("test-1") + + def test_returns_issue_from_list_response(self) -> None: + """Should parse issue from list response (single item).""" + issue_data = [ + { + "id": "test-1", + "title": "Test Issue", + "description": "Test description", + "status": "open", + "priority": 2, + "issue_type": "task", + "owner": None, + } + ] + + mock_result = Mock() + mock_result.returncode = 0 + mock_result.stdout = json.dumps(issue_data) + + with ( + patch( + "agentspaces.infrastructure.beads.is_beads_available", return_value=True + ), + patch("subprocess.run", return_value=mock_result), + ): + issue = get_issue_by_id("test-1") + + assert issue.id == "test-1" + assert issue.title == "Test Issue" + assert issue.priority == 2 + + def test_returns_issue_from_dict_response(self) -> None: + """Should parse issue from dict response.""" + issue_data = { + "id": "test-1", + "title": "Test Issue", + "description": "Test description", + "status": "open", + "priority": 2, + "issue_type": "task", + "owner": None, + } + + mock_result = Mock() + mock_result.returncode = 0 + mock_result.stdout = json.dumps(issue_data) + + with ( + patch( + "agentspaces.infrastructure.beads.is_beads_available", return_value=True + ), + patch("subprocess.run", return_value=mock_result), + ): + issue = get_issue_by_id("test-1") + + assert issue.id == "test-1" + assert issue.title == "Test Issue" + + def test_raises_error_when_issue_not_found(self) -> None: + """Should raise BeadsError when issue not found.""" + mock_result = Mock() + mock_result.returncode = 1 + mock_result.stderr = "Issue not found: test-1" + + with ( + patch( + "agentspaces.infrastructure.beads.is_beads_available", return_value=True + ), + patch("subprocess.run", return_value=mock_result), + pytest.raises(BeadsError, match="Issue not found: test-1"), + ): + get_issue_by_id("test-1") + + def test_raises_error_on_empty_list(self) -> None: + """Should raise BeadsError when list is empty.""" + mock_result = Mock() + mock_result.returncode = 0 + mock_result.stdout = "[]" + + with ( + patch( + "agentspaces.infrastructure.beads.is_beads_available", return_value=True + ), + patch("subprocess.run", return_value=mock_result), + pytest.raises(BeadsError, match="Issue not found"), + ): + get_issue_by_id("test-1") + + +class TestBeadsIssue: + """Tests for BeadsIssue dataclass.""" + + def test_issue_is_frozen(self) -> None: + """Should be immutable.""" + issue = BeadsIssue( + id="test-1", + title="Test", + description="Desc", + status="open", + priority=2, + issue_type="task", + owner=None, + ) + + with pytest.raises(AttributeError): + issue.title = "Changed" # type: ignore[misc] + + def test_issue_equality(self) -> None: + """Should support equality comparison.""" + issue1 = BeadsIssue( + id="test-1", + title="Test", + description="Desc", + status="open", + priority=2, + issue_type="task", + owner=None, + ) + issue2 = BeadsIssue( + id="test-1", + title="Test", + description="Desc", + status="open", + priority=2, + issue_type="task", + owner=None, + ) + + assert issue1 == issue2 diff --git a/tests/unit/infrastructure/test_metadata.py b/tests/unit/infrastructure/test_metadata.py index 3a901a0..04499fc 100644 --- a/tests/unit/infrastructure/test_metadata.py +++ b/tests/unit/infrastructure/test_metadata.py @@ -110,7 +110,7 @@ def test_save_includes_version(self, temp_dir: Path) -> None: data = json.loads(path.read_text(encoding="utf-8")) assert "version" in data - assert data["version"] == "3" # Schema version 3 + assert data["version"] == "4" # Schema version 4 def test_save_creates_parent_directories(self, temp_dir: Path) -> None: """Should create parent directories if needed."""