Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions skills/gdb-cli/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ If gdb-cli is not installed, guide the user: `pip install git+https://github.com

### Step 1: Initialize Debug Session

**For debugging a binary file through a target running on a remote host and port**
```bash
gdb-cli target --binary <binary_path> --remote <host_name>:<port_number> [--gdb-path <gdb_path>]
```

**For core dump analysis:**
```bash
gdb-cli load --binary <binary_path> --core <core_path> [--gdb-path <gdb_path>]
Expand Down
68 changes: 67 additions & 1 deletion src/gdb_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Usage:
gdb-cli load --binary ./my_program --core ./core.1234
gdb-cli attach --pid 9876
gdb-cli target --remote 192.168.0.21:3333
gdb-cli eval-cmd --session <id> "lock_mgr->buckets[0]"
gdb-cli threads --session <id> [--limit 20]
gdb-cli bt --session <id> [--thread 12] [--limit 30]
Expand All @@ -21,11 +22,12 @@
from . import __version__
from .client import GDBClient, GDBClientError, GDBCommandError
from .i18n import t
from .launcher import GDBLauncherError, launch_attach, launch_core
from .launcher import GDBLauncherError, launch_attach, launch_core, launch_target
from .session import (
cleanup_dead_sessions,
find_session_by_core,
find_session_by_pid,
find_session_by_remote,
get_session,
list_sessions,
)
Expand Down Expand Up @@ -181,6 +183,68 @@ def attach(
raise click.exceptions.Exit(1)


@main.command()
@click.option("--remote", "-r", required=True, type=str, help=t("cli.target.remote_help"))
@click.option("--binary", "-b", help=t("cli.target.binary_help"))
@click.option("--scheduler-locking/--no-scheduler-locking", default=True, help=t("cli.target.scheduler_locking_help"))
@click.option("--non-stop/--no-non-stop", default=False, help=t("cli.target.non_stop_help"))
@click.option("--timeout", default=600, help=t("cli.target.timeout_help"))
@click.option("--allow-write", is_flag=True, help=t("cli.target.allow_write_help"))
@click.option("--allow-call", is_flag=True, help=t("cli.target.allow_call_help"))
@click.option("--gdb-path", default="gdb", help=t("cli.load.gdb_path_help"))
def target(
remote: str,
binary: Optional[str],
scheduler_locking: bool,
non_stop: bool,
timeout: int,
allow_write: bool,
allow_call: bool,
gdb_path: str,
) -> None:
"""Connect to remote GDB server"""
existing = find_session_by_remote(remote)
if existing:
print_json({
"session_id": existing.session_id,
"mode": existing.mode,
"remote": existing.remote,
"status": "reused",
"message": "Session already exists for this remote"
})
return

try:
gdb_process = launch_target(
remote=remote,
binary=binary,
scheduler_locking=scheduler_locking,
non_stop=non_stop,
timeout=timeout,
allow_write=allow_write,
allow_call=allow_call,
gdb_path=gdb_path
)

session = gdb_process.session

print_json({
"session_id": session.session_id,
"mode": session.mode,
"remote": session.remote,
"binary": session.binary,
"remote": session.remote,
"sock_path": session.sock_path,
"gdb_pid": gdb_process.pid,
"safety_level": session.safety_level,
"status": "started"
})

except GDBLauncherError as e:
print_error("Failed to connect to target", str(e))
raise click.exceptions.Exit(1)


@main.command()
@click.option("--session", "-s", required=True, help=t("cli.eval_cmd.session_help"))
@click.argument("expr")
Expand Down Expand Up @@ -330,7 +394,9 @@ def sessions() -> None:
"mode": s.mode,
"binary": s.binary,
"pid": s.pid,
"remote": s.remote,
"core": s.core,
"gdb_pid": s.gdb_pid,
"started_at": s.started_at,
}
for s in session_list
Expand Down
2 changes: 1 addition & 1 deletion src/gdb_cli/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def __init__(self, message: str, suggestion: Optional[str] = None, details: Opti

# 连接错误
"socket_not_found": {
"suggestion": "GDB 会话可能已终止,重新运行 'gdb-cli load' 或 'gdb-cli attach'",
"suggestion": "GDB 会话可能已终止,重新运行 'gdb-cli load' 或 'gdb-cli attach' 或 'gdb-cli target'",
},
"connection_refused": {
"suggestion": "GDB RPC Server 未响应,检查 GDB 进程是否正常运行",
Expand Down
28 changes: 26 additions & 2 deletions src/gdb_cli/gdb_server/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""

import os
import queue
from pathlib import Path
from typing import Any, List, Optional, Tuple

Expand Down Expand Up @@ -571,7 +572,26 @@ def handle_exec(
}

try:
output = gdb.execute(command, to_string=True)

# The command must run in the main thread, not in the thread that
# receives from the socket. Create a function which will execute
# the command and pass the output through a queue. This definition
# uses lexical scoping for the command and the queue.
result_queue = queue.Queue()

def run_command():
try:
output = gdb.execute(command, to_string=True)
result_queue.put(("ok", output))
except Exception as e:
result_queue.put(("error", str(e)))

# Pass the function through post_event() to the main thread,
# where it will run, and this thread waits to receive the output
# from the queue.
gdb.post_event(run_command)
output_status, output = result_queue.get(timeout=30.0)

return {
"command": command,
"output": output or "(no output)"
Expand All @@ -588,7 +608,7 @@ def handle_status(**kwargs) -> dict:

Returns:
{
"mode": "core" | "attach",
"mode": "core" | "attach" | "target",
"binary": "...",
"threads_count": N,
"current_thread": {...},
Expand All @@ -604,6 +624,7 @@ def handle_status(**kwargs) -> dict:
"state": "ready",
"mode": session_meta.get("mode", "unknown"),
"binary": session_meta.get("binary"),
"gdb_pid": session_meta.get("gdb_pid"),
}

# 线程信息
Expand Down Expand Up @@ -640,6 +661,9 @@ def handle_status(**kwargs) -> dict:
if session_meta.get("mode") == "attach":
result["pid"] = session_meta.get("pid")

if session_meta.get("mode") == "target":
result["remote"] = session_meta.get("remote")

return result


Expand Down
84 changes: 82 additions & 2 deletions src/gdb_cli/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,10 @@ def launch_attach(
# non-stop 模式
if non_stop:
gdb_commands.append("set non-stop on")
gdb_commands.append("set target-async on")
gdb_commands.append("set mi-async on")
else:
gdb_commands.append("set non-stop off")
gdb_commands.append("set mi-async off")

# 加载 binary (可选)
if binary:
Expand Down Expand Up @@ -214,6 +217,82 @@ def launch_attach(
return gdb_process


def launch_target(
remote: str,
binary: Optional[str] = None,
scheduler_locking: bool = True,
non_stop: bool = False,
timeout: int = 600,
allow_write: bool = False,
allow_call: bool = False,
gdb_path: str = "gdb"
) -> GDBProcess:
"""
Args:
remote: host:port
binary: 可执行文件路径 (可选)
scheduler_locking: 是否启用 scheduler-locking
non_stop: 是否启用 non-stop 模式
timeout: 心跳超时秒数
allow_write: 是否允许内存修改
allow_call: 是否允许函数调用
gdb_path: GDB 可执行文件路径

Returns:
GDBProcess 实例
"""
# 创建 session
safety_level = "full" if (allow_write or allow_call) else "readonly"
if allow_write and not allow_call:
safety_level = "readwrite"

session = create_session(
mode="target",
remote=remote,
binary=binary,
timeout=timeout,
safety_level=safety_level
)

# 构建 GDB 启动命令
gdb_commands = [
"set pagination off",
"set print elements 0",
"set confirm off",
]

# non-stop 模式
if non_stop:
gdb_commands.append("set non-stop on")
gdb_commands.append("set target-async on")

# 加载 binary (可选)
if binary:
gdb_commands.append(f"file {binary}")

# Target
gdb_commands.append(f"target extended-remote {remote}")

# scheduler-locking
if scheduler_locking:
gdb_commands.append("set scheduler-locking on")

# 启动 RPC Server
gdb_commands.extend(_build_server_commands(session))
gdb_commands.append("python _gdb_rpc_server.set_ready()")

# 构建 GDB 参数
gdb_args = [gdb_path, "-nx", "-q"]
for cmd in gdb_commands:
gdb_args.extend(["-ex", cmd])

# 启动进程
_start_gdb_process(gdb_args, session, timeout=float(timeout))
gdb_process = GDBProcess(session)
gdb_process._process = session._gdb_process
return gdb_process


def _build_server_commands(session: SessionMeta) -> List[str]:
"""构建 RPC Server 启动命令"""
session_meta = {
Expand All @@ -222,6 +301,7 @@ def _build_server_commands(session: SessionMeta) -> List[str]:
"binary": session.binary,
"core": session.core,
"pid": session.pid,
"remote": session.remote,
"sock_path": str(session.sock_path),
"started_at": session.started_at,
}
Expand All @@ -241,7 +321,7 @@ def _build_server_commands(session: SessionMeta) -> List[str]:
# 加载 Server 脚本 (source 会将定义加载到全局命名空间)
f"source {GDB_SERVER_SCRIPT}",
# 启动 Server (直接调用全局命名空间中的 start_server)
f"python start_server('{session.sock_path}', {json.dumps(session_meta)}, {session.heartbeat_timeout})",
f"python start_server('{session.sock_path}', {session_meta}, {session.heartbeat_timeout})",
]


Expand Down
2 changes: 1 addition & 1 deletion src/gdb_cli/safety.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class CommandCheckResult:
# 始终禁止的命令
FORBIDDEN_COMMANDS: Set[str] = {
"quit", "kill", "shell", "python-interactive",
"signal", "detach", "attach",
"signal", "detach", "attach", "target",
}

# 命令别名映射
Expand Down
23 changes: 21 additions & 2 deletions src/gdb_cli/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
class SessionMeta:
"""会话元数据"""
session_id: str # UUID
mode: str # "core" | "attach"
mode: str # "core" | "attach" | "target"
binary: Optional[str] = None # 可执行文件路径
core: Optional[str] = None # Core dump 路径 (core 模式)
pid: Optional[int] = None # 目标进程 PID (attach 模式)
remote: Optional[str] = None # host:port for target
gdb_pid: Optional[int] = None # GDB 进程 PID
sock_path: Optional[str] = None # Unix Socket 路径
started_at: float = field(default_factory=time.time)
Expand All @@ -49,17 +50,19 @@ def create_session(
binary: Optional[str] = None,
core: Optional[str] = None,
pid: Optional[int] = None,
remote: Optional[int] = None,
timeout: int = 600,
safety_level: str = "readonly"
) -> SessionMeta:
"""
创建新会话

Args:
mode: 模式 ("core" | "attach")
mode: 模式 ("core" | "attach" | "target")
binary: 可执行文件路径
core: Core dump 路径
pid: 目标进程 PID
remote: host:port
timeout: 心跳超时秒数
safety_level: 安全级别

Expand All @@ -83,6 +86,7 @@ def create_session(
binary=binary,
core=core,
pid=pid,
remote=remote,
sock_path=str(sock_path),
heartbeat_timeout=timeout,
safety_level=safety_level
Expand Down Expand Up @@ -290,6 +294,21 @@ def find_session_by_core(core: str) -> Optional[SessionMeta]:
return None


def find_session_by_remote(remote: str) -> Optional[SessionMeta]:
"""
Args:
remote: host:port

Returns:
已存在的 SessionMeta 或 None
"""
sessions = list_sessions(alive_only=True)
for session in sessions:
if session.mode == "target" and session.remote == remote:
return session
return None


def update_session_activity(session_id: str) -> None:
"""更新会话最后活跃时间"""
meta = _read_meta(session_id)
Expand Down
Loading