diff --git a/packages/runtimeuse-client-python/pyproject.toml b/packages/runtimeuse-client-python/pyproject.toml index 02897a7..753a19c 100644 --- a/packages/runtimeuse-client-python/pyproject.toml +++ b/packages/runtimeuse-client-python/pyproject.toml @@ -22,6 +22,7 @@ Issues = "https://github.com/getlark/runtimeuse/issues" [project.optional-dependencies] dev = [ + "boto3", "daytona", "e2b", "e2b-code-interpreter", diff --git a/packages/runtimeuse-client-python/src/runtimeuse_client/types.py b/packages/runtimeuse-client-python/src/runtimeuse_client/types.py index 0c9ba18..f856b2e 100644 --- a/packages/runtimeuse-client-python/src/runtimeuse_client/types.py +++ b/packages/runtimeuse-client-python/src/runtimeuse_client/types.py @@ -148,3 +148,11 @@ class QueryOptions: timeout: float | None = None #: Logger instance; falls back to the module-level logger when ``None``. logger: logging.Logger | None = None + + def __post_init__(self) -> None: + has_dir = self.artifacts_dir is not None + has_cb = self.on_artifact_upload_request is not None + if has_dir != has_cb: + raise ValueError( + "artifacts_dir and on_artifact_upload_request must be specified together" + ) diff --git a/packages/runtimeuse-client-python/test/e2e/conftest.py b/packages/runtimeuse-client-python/test/e2e/conftest.py index 032ab52..c671818 100644 --- a/packages/runtimeuse-client-python/test/e2e/conftest.py +++ b/packages/runtimeuse-client-python/test/e2e/conftest.py @@ -3,7 +3,9 @@ import signal import socket import subprocess +import threading import time +from http.server import HTTPServer, BaseHTTPRequestHandler from pathlib import Path from typing import Any @@ -87,3 +89,51 @@ def query_options() -> QueryOptions: @pytest.fixture def make_query_options(): return _make_query_options + + +@pytest.fixture +def http_server(): + """Local HTTP server for serving downloadable files and receiving artifact uploads. + + Yields (base_url, files, uploads) where: + - files: dict[str, bytes] — seed with content before the test runs a query + - uploads: dict[str, bytes] — populated by PUT requests during the test + """ + uploads: dict[str, bytes] = {} + files: dict[str, bytes] = {} + + class _Handler(BaseHTTPRequestHandler): + def do_GET(self) -> None: + if self.path.startswith("/files/"): + name = self.path[len("/files/"):] + if name in files: + self.send_response(200) + self.send_header("Content-Type", "application/octet-stream") + self.end_headers() + self.wfile.write(files[name]) + return + self.send_response(404) + self.end_headers() + + def do_PUT(self) -> None: + if self.path.startswith("/uploads/"): + name = self.path[len("/uploads/"):] + length = int(self.headers.get("Content-Length", 0)) + uploads[name] = self.rfile.read(length) + self.send_response(200) + self.end_headers() + return + self.send_response(404) + self.end_headers() + + def log_message(self, format: str, *args: Any) -> None: + pass + + server = HTTPServer(("127.0.0.1", 0), _Handler) + port = server.server_address[1] + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + + yield f"http://127.0.0.1:{port}", files, uploads + + server.shutdown() diff --git a/packages/runtimeuse-client-python/test/e2e/echo_handler.js b/packages/runtimeuse-client-python/test/e2e/echo_handler.js index 9def185..17e44f2 100644 --- a/packages/runtimeuse-client-python/test/e2e/echo_handler.js +++ b/packages/runtimeuse-client-python/test/e2e/echo_handler.js @@ -6,7 +6,10 @@ * STRUCTURED: — return structured_output result * SLOW: — sleep then return text (timeout / cancel tests) * STREAM: — send n assistant messages before returning + * STREAM_TEXT: — send text as assistant message, then return "done" * ERROR: — send error via sender and throw + * WRITE_FILE: — write file, sleep 3s for chokidar, return text + * READ_FILE: — read file and return its contents as text * (anything else) — echo the prompt back as text */ @@ -40,6 +43,32 @@ export const handler = { return { type: "text", text: `streamed ${count} messages` }; } + if (prompt.startsWith("STREAM_TEXT:")) { + const text = prompt.slice("STREAM_TEXT:".length); + sender.sendAssistantMessage([text]); + return { type: "text", text: "done" }; + } + + if (prompt.startsWith("WRITE_FILE:")) { + const rest = prompt.slice("WRITE_FILE:".length); + const spaceIdx = rest.indexOf(" "); + const filePath = spaceIdx === -1 ? rest : rest.slice(0, spaceIdx); + const content = spaceIdx === -1 ? "" : rest.slice(spaceIdx + 1); + const fs = await import("fs"); + const path = await import("path"); + fs.mkdirSync(path.dirname(filePath), { recursive: true }); + fs.writeFileSync(filePath, content); + await new Promise((r) => setTimeout(r, 3000)); + return { type: "text", text: `wrote ${filePath}` }; + } + + if (prompt.startsWith("READ_FILE:")) { + const filePath = prompt.slice("READ_FILE:".length).trim(); + const fs = await import("fs"); + const content = fs.readFileSync(filePath, "utf-8"); + return { type: "text", text: content }; + } + if (prompt.startsWith("ERROR:")) { const msg = prompt.slice("ERROR:".length); sender.sendErrorMessage(msg, { source: "echo_handler" }); diff --git a/packages/runtimeuse-client-python/test/e2e/test_e2e.py b/packages/runtimeuse-client-python/test/e2e/test_e2e.py index bd4f6a4..08989b3 100644 --- a/packages/runtimeuse-client-python/test/e2e/test_e2e.py +++ b/packages/runtimeuse-client-python/test/e2e/test_e2e.py @@ -2,6 +2,8 @@ with the deterministic echo handler.""" import json +import os +from uuid import uuid4 import pytest @@ -12,9 +14,12 @@ TextResult, StructuredOutputResult, AssistantMessageInterface, + ArtifactUploadRequestMessageInterface, + ArtifactUploadResult, AgentRuntimeError, CancelledException, CommandInterface, + RuntimeEnvironmentDownloadableInterface, ) pytestmark = [pytest.mark.e2e, pytest.mark.asyncio] @@ -251,6 +256,38 @@ async def test_failed_post_command_raises_error( ) +class TestArtifacts: + async def test_artifact_upload_request_received( + self, client: RuntimeUseClient, make_query_options + ): + artifacts_dir = f"/tmp/test-artifacts-{uuid4()}" + os.makedirs(artifacts_dir, exist_ok=True) + received_requests: list[ArtifactUploadRequestMessageInterface] = [] + + async def on_artifact( + req: ArtifactUploadRequestMessageInterface, + ) -> ArtifactUploadResult: + received_requests.append(req) + return ArtifactUploadResult( + presigned_url="http://localhost:1/fake-upload", + content_type="text/plain", + ) + + result = await client.query( + prompt=f"WRITE_FILE:{artifacts_dir}/test.txt test-content", + options=make_query_options( + artifacts_dir=artifacts_dir, + on_artifact_upload_request=on_artifact, + timeout=15, + ), + ) + + assert isinstance(result.data, TextResult) + assert result.data.text == f"wrote {artifacts_dir}/test.txt" + assert len(received_requests) == 1 + assert received_requests[0].filename == "test.txt" + + class TestInvocationFieldsForwarded: async def test_fields_round_trip( self, client: RuntimeUseClient, make_query_options @@ -266,3 +303,296 @@ async def test_fields_round_trip( assert isinstance(result.data, TextResult) assert result.data.text == "field test" + + +class TestArtifactUploadIntegration: + """Verify that artifact file content actually reaches the upload target.""" + + async def test_artifact_content_uploaded( + self, client: RuntimeUseClient, make_query_options, http_server + ): + base_url, _files, uploads = http_server + artifacts_dir = f"/tmp/test-artifacts-{uuid4()}" + os.makedirs(artifacts_dir, exist_ok=True) + + async def on_artifact( + req: ArtifactUploadRequestMessageInterface, + ) -> ArtifactUploadResult: + return ArtifactUploadResult( + presigned_url=f"{base_url}/uploads/{req.filename}", + content_type="text/plain", + ) + + result = await client.query( + prompt=f"WRITE_FILE:{artifacts_dir}/hello.txt some-content", + options=make_query_options( + artifacts_dir=artifacts_dir, + on_artifact_upload_request=on_artifact, + timeout=15, + ), + ) + + assert isinstance(result.data, TextResult) + assert result.data.text == f"wrote {artifacts_dir}/hello.txt" + assert uploads.get("hello.txt") == b"some-content" + + async def test_multiple_artifacts_uploaded( + self, ws_url: str, make_query_options, http_server + ): + base_url, _files, uploads = http_server + artifacts_dir = f"/tmp/test-artifacts-{uuid4()}" + os.makedirs(artifacts_dir, exist_ok=True) + + async def on_artifact( + req: ArtifactUploadRequestMessageInterface, + ) -> ArtifactUploadResult: + return ArtifactUploadResult( + presigned_url=f"{base_url}/uploads/{req.filename}", + content_type="text/plain", + ) + + opts = dict( + artifacts_dir=artifacts_dir, + on_artifact_upload_request=on_artifact, + timeout=15, + ) + + client1 = RuntimeUseClient(ws_url=ws_url) + result1 = await client1.query( + prompt=f"WRITE_FILE:{artifacts_dir}/one.txt first", + options=make_query_options(**opts), + ) + assert isinstance(result1.data, TextResult) + + client2 = RuntimeUseClient(ws_url=ws_url) + result2 = await client2.query( + prompt=f"WRITE_FILE:{artifacts_dir}/two.txt second", + options=make_query_options(**opts), + ) + assert isinstance(result2.data, TextResult) + + assert uploads.get("one.txt") == b"first" + assert uploads.get("two.txt") == b"second" + + +class TestPreAgentDownloadables: + """Verify pre_agent_downloadables are fetched into the runtime before the agent runs.""" + + async def test_downloaded_file_accessible( + self, client: RuntimeUseClient, make_query_options, http_server + ): + base_url, files, _uploads = http_server + files["setup.sh"] = b"#!/bin/bash\necho hello" + working_dir = f"/tmp/dl-test-{uuid4()}" + + result = await client.query( + prompt=f"READ_FILE:{working_dir}/setup.sh", + options=make_query_options( + pre_agent_downloadables=[ + RuntimeEnvironmentDownloadableInterface( + download_url=f"{base_url}/files/setup.sh", + working_dir=working_dir, + ) + ], + ), + ) + + assert isinstance(result.data, TextResult) + assert result.data.text == "#!/bin/bash\necho hello" + + async def test_multiple_downloadables( + self, ws_url: str, make_query_options, http_server + ): + base_url, files, _uploads = http_server + files["a.txt"] = b"content-a" + files["b.txt"] = b"content-b" + working_dir = f"/tmp/dl-test-{uuid4()}" + + client = RuntimeUseClient(ws_url=ws_url) + result = await client.query( + prompt=f"READ_FILE:{working_dir}/a.txt", + options=make_query_options( + pre_agent_downloadables=[ + RuntimeEnvironmentDownloadableInterface( + download_url=f"{base_url}/files/a.txt", + working_dir=working_dir, + ), + RuntimeEnvironmentDownloadableInterface( + download_url=f"{base_url}/files/b.txt", + working_dir=working_dir, + ), + ], + ), + ) + + assert isinstance(result.data, TextResult) + assert result.data.text == "content-a" + + client2 = RuntimeUseClient(ws_url=ws_url) + result2 = await client2.query( + prompt=f"READ_FILE:{working_dir}/b.txt", + options=make_query_options( + pre_agent_downloadables=[], + ), + ) + assert isinstance(result2.data, TextResult) + assert result2.data.text == "content-b" + + async def test_download_failure_raises_error( + self, client: RuntimeUseClient, make_query_options, http_server + ): + base_url, _files, _uploads = http_server + working_dir = f"/tmp/dl-test-{uuid4()}" + + with pytest.raises(AgentRuntimeError, match="Download failed"): + await client.query( + prompt="ECHO:should not reach", + options=make_query_options( + pre_agent_downloadables=[ + RuntimeEnvironmentDownloadableInterface( + download_url=f"{base_url}/files/nonexistent", + working_dir=working_dir, + ) + ], + ), + ) + + +class TestFullInvocationLifecycle: + """Combined test: download -> pre-command -> agent -> post-command -> artifact upload.""" + + async def test_full_invocation_ordering( + self, ws_url: str, make_query_options, http_server + ): + base_url, files, uploads = http_server + files["runtime.sh"] = b"runtime-payload" + dl_dir = f"/tmp/dl-lifecycle-{uuid4()}" + artifacts_dir = f"/tmp/art-lifecycle-{uuid4()}" + os.makedirs(artifacts_dir, exist_ok=True) + + received: list[AssistantMessageInterface] = [] + + async def on_msg(msg: AssistantMessageInterface): + received.append(msg) + + async def on_artifact( + req: ArtifactUploadRequestMessageInterface, + ) -> ArtifactUploadResult: + return ArtifactUploadResult( + presigned_url=f"{base_url}/uploads/{req.filename}", + content_type="text/plain", + ) + + client = RuntimeUseClient(ws_url=ws_url) + result = await client.query( + prompt=f"WRITE_FILE:{artifacts_dir}/output.txt result-data", + options=make_query_options( + pre_agent_downloadables=[ + RuntimeEnvironmentDownloadableInterface( + download_url=f"{base_url}/files/runtime.sh", + working_dir=dl_dir, + ) + ], + pre_agent_invocation_commands=[ + CommandInterface(command=f"cat {dl_dir}/runtime.sh") + ], + post_agent_invocation_commands=[ + CommandInterface(command="echo lifecycle-done") + ], + artifacts_dir=artifacts_dir, + on_artifact_upload_request=on_artifact, + on_assistant_message=on_msg, + timeout=20, + ), + ) + + assert isinstance(result.data, TextResult) + assert result.data.text == f"wrote {artifacts_dir}/output.txt" + + all_text = [block for msg in received for block in msg.text_blocks] + assert any("runtime-payload" in t for t in all_text), ( + "pre-command should have cat'd the downloaded file" + ) + assert any("lifecycle-done" in t for t in all_text), ( + "post-command should have run after the agent" + ) + + assert uploads.get("output.txt") == b"result-data" + + +class TestSecretsRedaction: + """Verify secrets_to_redact are scrubbed from all outbound messages.""" + + async def test_secret_redacted_from_command_output( + self, client: RuntimeUseClient, make_query_options + ): + received: list[AssistantMessageInterface] = [] + + async def on_msg(msg: AssistantMessageInterface): + received.append(msg) + + result = await client.query( + prompt="ECHO:ok", + options=make_query_options( + secrets_to_redact=["super-secret-value"], + pre_agent_invocation_commands=[ + CommandInterface(command="echo super-secret-value") + ], + on_assistant_message=on_msg, + ), + ) + + assert isinstance(result.data, TextResult) + all_text = [block for msg in received for block in msg.text_blocks] + assert not any("super-secret-value" in t for t in all_text) + assert any("[REDACTED]" in t for t in all_text) + + async def test_secret_redacted_from_assistant_message( + self, client: RuntimeUseClient, make_query_options + ): + received: list[AssistantMessageInterface] = [] + + async def on_msg(msg: AssistantMessageInterface): + received.append(msg) + + result = await client.query( + prompt="STREAM_TEXT:the password is super-secret-value ok", + options=make_query_options( + secrets_to_redact=["super-secret-value"], + on_assistant_message=on_msg, + ), + ) + + assert isinstance(result.data, TextResult) + assert result.data.text == "done" + all_text = [block for msg in received for block in msg.text_blocks] + assert not any("super-secret-value" in t for t in all_text) + assert any("[REDACTED]" in t for t in all_text) + + async def test_secret_redacted_from_result_text( + self, client: RuntimeUseClient, make_query_options + ): + result = await client.query( + prompt="ECHO:the key is super-secret-value here", + options=make_query_options( + secrets_to_redact=["super-secret-value"], + ), + ) + + assert isinstance(result.data, TextResult) + assert "super-secret-value" not in result.data.text + assert "[REDACTED]" in result.data.text + + async def test_secret_redacted_from_error_message( + self, client: RuntimeUseClient, make_query_options + ): + with pytest.raises(AgentRuntimeError) as exc_info: + await client.query( + prompt="ERROR:failed with super-secret-value exposed", + options=make_query_options( + secrets_to_redact=["super-secret-value"], + ), + ) + + assert "super-secret-value" not in str(exc_info.value) + assert "[REDACTED]" in str(exc_info.value) diff --git a/packages/runtimeuse-client-python/test/llm/conftest.py b/packages/runtimeuse-client-python/test/llm/conftest.py index 623601a..89a0e0b 100644 --- a/packages/runtimeuse-client-python/test/llm/conftest.py +++ b/packages/runtimeuse-client-python/test/llm/conftest.py @@ -1,11 +1,21 @@ +import logging +import os +import time + import pytest from test.sandbox_factories.e2b import create_e2b_runtimeuse +_logger = logging.getLogger(__name__) + @pytest.fixture(scope="session") def openai_ws_url(): """Create an E2B sandbox running runtimeuse with the OpenAI agent.""" + if os.environ.get("TEST_WS_URL"): + yield os.environ.get("TEST_WS_URL") + return + try: sandbox, ws_url = create_e2b_runtimeuse(agent="openai") except RuntimeError as exc: @@ -19,6 +29,10 @@ def openai_ws_url(): @pytest.fixture(scope="session") def claude_ws_url(): """Create an E2B sandbox running runtimeuse with the Claude agent.""" + if os.environ.get("TEST_WS_URL"): + yield os.environ.get("TEST_WS_URL") + return + try: sandbox, ws_url = create_e2b_runtimeuse(agent="claude") except RuntimeError as exc: @@ -27,3 +41,69 @@ def claude_ws_url(): yield ws_url sandbox.kill() + + +@pytest.fixture(scope="session") +def s3_client(): + """Boto3 S3 client for artifact upload/download tests.""" + try: + import boto3 + from botocore.config import Config + except ImportError: + pytest.fail("boto3 is required for S3 tests — install with: pip install boto3") + region = os.environ.get("AWS_REGION", "us-east-1") + return boto3.client("s3", region_name=region, config=Config(signature_version="s3v4")) + + +@pytest.fixture +def s3_test_bucket(): + """Read TEST_S3_BUCKET from env; fail if unset.""" + bucket = os.environ.get("TEST_S3_BUCKET") + if not bucket: + pytest.fail("TEST_S3_BUCKET environment variable is not set") + return bucket + + +def wait_for_s3_object( + s3_client, + bucket: str, + key: str, + timeout: float = 30, + poll_interval: float = 2, +) -> bytes: + """Poll S3 until the object exists, then return its body. + + The server-side artifact upload may still be in flight when the + client's query() returns (the presigned-URL PUT races with the + WebSocket close). Polling avoids flaky NoSuchKey failures. + """ + from botocore.exceptions import ClientError + + _logger.info("Waiting for S3 object s3://%s/%s (timeout=%ss)", bucket, key, timeout) + deadline = time.monotonic() + timeout + last_exc: Exception | None = None + attempt = 0 + + while time.monotonic() < deadline: + attempt += 1 + try: + obj = s3_client.get_object(Bucket=bucket, Key=key) + _logger.info("S3 object found on attempt %d", attempt) + return obj["Body"].read() + except ClientError as exc: + if exc.response["Error"]["Code"] != "NoSuchKey": + raise + elapsed = timeout - (deadline - time.monotonic()) + _logger.info( + "S3 object not found (attempt %d, %.1fs elapsed), retrying in %ss", + attempt, + elapsed, + poll_interval, + ) + last_exc = exc + time.sleep(poll_interval) + + raise AssertionError( + f"S3 object s3://{bucket}/{key} did not appear within {timeout}s " + f"after {attempt} attempts" + ) from last_exc diff --git a/packages/runtimeuse-client-python/test/llm/test_claude_advanced.py b/packages/runtimeuse-client-python/test/llm/test_claude_advanced.py new file mode 100644 index 0000000..9327c2d --- /dev/null +++ b/packages/runtimeuse-client-python/test/llm/test_claude_advanced.py @@ -0,0 +1,227 @@ +"""Advanced LLM integration tests for the Claude agent. + +Covers pre/post commands, artifact uploads via real S3 presigned URLs, +and the full invocation lifecycle (download -> pre-cmd -> agent -> post-cmd -> upload). +""" + +import logging +from uuid import uuid4 + +import pytest + +from test.llm.conftest import wait_for_s3_object + +from src.runtimeuse_client import ( + RuntimeUseClient, + QueryOptions, + QueryResult, + TextResult, + AssistantMessageInterface, + ArtifactUploadRequestMessageInterface, + ArtifactUploadResult, + AgentRuntimeError, + CommandInterface, + RuntimeEnvironmentDownloadableInterface, +) + +pytestmark = [pytest.mark.llm, pytest.mark.asyncio] + +_logger = logging.getLogger(__name__) + +MODEL = "claude-sonnet-4-20250514" + + +class TestClaudePrePostCommands: + async def test_pre_and_post_commands_run(self, claude_ws_url: str): + received: list[AssistantMessageInterface] = [] + + async def on_msg(msg: AssistantMessageInterface): + received.append(msg) + + client = RuntimeUseClient(ws_url=claude_ws_url) + result = await client.query( + prompt="Say the word 'acknowledged'", + options=QueryOptions( + system_prompt="Reply concisely in plain text.", + model=MODEL, + pre_agent_invocation_commands=[ + CommandInterface( + command="echo 'pre-command ran' > /tmp/pre-sentinel.txt && echo pre-sentinel", + cwd="/tmp", + ) + ], + post_agent_invocation_commands=[ + CommandInterface( + command="echo 'post-command ran' > /tmp/post-sentinel.txt && echo post-sentinel", + cwd="/tmp", + ) + ], + on_assistant_message=on_msg, + timeout=60, + ), + ) + + assert isinstance(result, QueryResult) + assert isinstance(result.data, TextResult) + + all_text = [block for msg in received for block in msg.text_blocks] + assert any( + "pre-sentinel" in t for t in all_text + ), "pre-command output should appear in streamed messages" + assert any( + "post-sentinel" in t for t in all_text + ), "post-command output should appear in streamed messages" + + async def test_failed_pre_command_raises_error(self, claude_ws_url: str): + client = RuntimeUseClient(ws_url=claude_ws_url) + with pytest.raises(AgentRuntimeError, match="failed with exit code"): + await client.query( + prompt="This prompt should never reach the agent", + options=QueryOptions( + system_prompt="Reply concisely.", + model=MODEL, + pre_agent_invocation_commands=[CommandInterface(command="exit 1")], + timeout=30, + ), + ) + + +class TestClaudeArtifactsS3: + async def test_artifact_uploaded_to_s3( + self, claude_ws_url: str, s3_client, s3_test_bucket: str + ): + run_id = str(uuid4()) + artifact_key = f"test-artifacts/{run_id}/output.txt" + artifacts_dir = f"/tmp/test-artifacts-{run_id}" + + async def on_artifact( + req: ArtifactUploadRequestMessageInterface, + ) -> ArtifactUploadResult: + presigned = s3_client.generate_presigned_url( + "put_object", + Params={"Bucket": s3_test_bucket, "Key": artifact_key}, + ExpiresIn=300, + ) + _logger.info( + "Returning presigned URL for %s: %s", + req.filename, + presigned, + ) + return ArtifactUploadResult( + presigned_url=presigned, + content_type="text/plain", + ) + + client = RuntimeUseClient(ws_url=claude_ws_url) + result = await client.query( + prompt=( + f"Create the directory {artifacts_dir} and write a file at " + f"{artifacts_dir}/output.txt with the exact content " + "'hello from claude'. Use the bash tool." + ), + options=QueryOptions( + system_prompt="You are a helpful assistant. Execute tasks using tools.", + model=MODEL, + artifacts_dir=artifacts_dir, + on_artifact_upload_request=on_artifact, + timeout=120, + ), + ) + + assert isinstance(result.data, TextResult) + + body = wait_for_s3_object(s3_client, s3_test_bucket, artifact_key) + assert b"hello from claude" in body + + s3_client.delete_object(Bucket=s3_test_bucket, Key=artifact_key) + + +class TestClaudeFullLifecycle: + async def test_full_invocation_ordering( + self, claude_ws_url: str, s3_client, s3_test_bucket: str + ): + run_id = str(uuid4()) + + setup_key = f"test-downloads/{run_id}/setup.sh" + s3_client.put_object( + Bucket=s3_test_bucket, + Key=setup_key, + Body=b"#!/bin/bash\necho setup-payload", + ) + download_url = s3_client.generate_presigned_url( + "get_object", + Params={"Bucket": s3_test_bucket, "Key": setup_key}, + ExpiresIn=300, + ) + + dl_dir = f"/tmp/dl-lifecycle-{run_id}" + artifacts_dir = f"/tmp/art-lifecycle-{run_id}" + artifact_key = f"test-artifacts/{run_id}/result.txt" + + received: list[AssistantMessageInterface] = [] + + async def on_msg(msg: AssistantMessageInterface): + received.append(msg) + + async def on_artifact( + req: ArtifactUploadRequestMessageInterface, + ) -> ArtifactUploadResult: + presigned = s3_client.generate_presigned_url( + "put_object", + Params={"Bucket": s3_test_bucket, "Key": artifact_key}, + ExpiresIn=300, + ) + _logger.info( + "Returning presigned URL for %s: %s", + req.filename, + presigned, + ) + return ArtifactUploadResult( + presigned_url=presigned, + content_type="text/plain", + ) + + client = RuntimeUseClient(ws_url=claude_ws_url) + result = await client.query( + prompt=( + f"Create the directory {artifacts_dir} and write a file at " + f"{artifacts_dir}/result.txt with the content 'lifecycle-result'. " + "Use the bash tool." + ), + options=QueryOptions( + system_prompt="Execute tasks using available tools. Be concise.", + model=MODEL, + pre_agent_downloadables=[ + RuntimeEnvironmentDownloadableInterface( + download_url=download_url, + working_dir=dl_dir, + ) + ], + pre_agent_invocation_commands=[ + CommandInterface(command=f"cat {dl_dir}/setup.sh") + ], + post_agent_invocation_commands=[ + CommandInterface(command="echo lifecycle-done") + ], + artifacts_dir=artifacts_dir, + on_artifact_upload_request=on_artifact, + on_assistant_message=on_msg, + timeout=120, + ), + ) + + assert isinstance(result.data, TextResult) + + all_text = [block for msg in received for block in msg.text_blocks] + assert any( + "setup-payload" in t for t in all_text + ), "pre-command should have cat'd the downloaded file" + assert any( + "lifecycle-done" in t for t in all_text + ), "post-command should have run after the agent" + + body = wait_for_s3_object(s3_client, s3_test_bucket, artifact_key) + assert b"lifecycle-result" in body + + s3_client.delete_object(Bucket=s3_test_bucket, Key=setup_key) + s3_client.delete_object(Bucket=s3_test_bucket, Key=artifact_key) diff --git a/packages/runtimeuse-client-python/test/llm/test_openai_advanced.py b/packages/runtimeuse-client-python/test/llm/test_openai_advanced.py new file mode 100644 index 0000000..6e01af0 --- /dev/null +++ b/packages/runtimeuse-client-python/test/llm/test_openai_advanced.py @@ -0,0 +1,227 @@ +"""Advanced LLM integration tests for the OpenAI agent. + +Covers pre/post commands, artifact uploads via real S3 presigned URLs, +and the full invocation lifecycle (download -> pre-cmd -> agent -> post-cmd -> upload). +""" + +import logging +from uuid import uuid4 + +import pytest + +from test.llm.conftest import wait_for_s3_object + +from src.runtimeuse_client import ( + RuntimeUseClient, + QueryOptions, + QueryResult, + TextResult, + AssistantMessageInterface, + ArtifactUploadRequestMessageInterface, + ArtifactUploadResult, + AgentRuntimeError, + CommandInterface, + RuntimeEnvironmentDownloadableInterface, +) + +pytestmark = [pytest.mark.llm, pytest.mark.asyncio] + +_logger = logging.getLogger(__name__) + +MODEL = "gpt-5.1" + + +class TestOpenAIPrePostCommands: + async def test_pre_and_post_commands_run(self, openai_ws_url: str): + received: list[AssistantMessageInterface] = [] + + async def on_msg(msg: AssistantMessageInterface): + received.append(msg) + + client = RuntimeUseClient(ws_url=openai_ws_url) + result = await client.query( + prompt="Say the word 'acknowledged'", + options=QueryOptions( + system_prompt="Reply concisely in plain text.", + model=MODEL, + pre_agent_invocation_commands=[ + CommandInterface( + command="echo 'pre-command ran' > /tmp/pre-sentinel.txt && echo pre-sentinel", + cwd="/tmp", + ) + ], + post_agent_invocation_commands=[ + CommandInterface( + command="echo 'post-command ran' > /tmp/post-sentinel.txt && echo post-sentinel", + cwd="/tmp", + ) + ], + on_assistant_message=on_msg, + timeout=60, + ), + ) + + assert isinstance(result, QueryResult) + assert isinstance(result.data, TextResult) + + all_text = [block for msg in received for block in msg.text_blocks] + assert any( + "pre-sentinel" in t for t in all_text + ), "pre-command output should appear in streamed messages" + assert any( + "post-sentinel" in t for t in all_text + ), "post-command output should appear in streamed messages" + + async def test_failed_pre_command_raises_error(self, openai_ws_url: str): + client = RuntimeUseClient(ws_url=openai_ws_url) + with pytest.raises(AgentRuntimeError, match="failed with exit code"): + await client.query( + prompt="This prompt should never reach the agent", + options=QueryOptions( + system_prompt="Reply concisely.", + model=MODEL, + pre_agent_invocation_commands=[CommandInterface(command="exit 1")], + timeout=30, + ), + ) + + +class TestOpenAIArtifactsS3: + async def test_artifact_uploaded_to_s3( + self, openai_ws_url: str, s3_client, s3_test_bucket: str + ): + run_id = str(uuid4()) + artifact_key = f"test-artifacts/{run_id}/output.txt" + artifacts_dir = f"/tmp/test-artifacts-{run_id}" + + async def on_artifact( + req: ArtifactUploadRequestMessageInterface, + ) -> ArtifactUploadResult: + presigned = s3_client.generate_presigned_url( + "put_object", + Params={"Bucket": s3_test_bucket, "Key": artifact_key}, + ExpiresIn=300, + ) + _logger.info( + "Returning presigned URL for %s: %s", + req.filename, + presigned, + ) + return ArtifactUploadResult( + presigned_url=presigned, + content_type="text/plain", + ) + + client = RuntimeUseClient(ws_url=openai_ws_url) + result = await client.query( + prompt=( + f"Create the directory {artifacts_dir} and write a file at " + f"{artifacts_dir}/output.txt with the exact content " + "'hello from openai'. Use the bash tool." + ), + options=QueryOptions( + system_prompt="You are a helpful assistant. Execute tasks using tools.", + model=MODEL, + artifacts_dir=artifacts_dir, + on_artifact_upload_request=on_artifact, + timeout=120, + ), + ) + + assert isinstance(result.data, TextResult) + + body = wait_for_s3_object(s3_client, s3_test_bucket, artifact_key) + assert b"hello from openai" in body + + s3_client.delete_object(Bucket=s3_test_bucket, Key=artifact_key) + + +class TestOpenAIFullLifecycle: + async def test_full_invocation_ordering( + self, openai_ws_url: str, s3_client, s3_test_bucket: str + ): + run_id = str(uuid4()) + + setup_key = f"test-downloads/{run_id}/setup.sh" + s3_client.put_object( + Bucket=s3_test_bucket, + Key=setup_key, + Body=b"#!/bin/bash\necho setup-payload", + ) + download_url = s3_client.generate_presigned_url( + "get_object", + Params={"Bucket": s3_test_bucket, "Key": setup_key}, + ExpiresIn=300, + ) + + dl_dir = f"/tmp/dl-lifecycle-{run_id}" + artifacts_dir = f"/tmp/art-lifecycle-{run_id}" + artifact_key = f"test-artifacts/{run_id}/result.txt" + + received: list[AssistantMessageInterface] = [] + + async def on_msg(msg: AssistantMessageInterface): + received.append(msg) + + async def on_artifact( + req: ArtifactUploadRequestMessageInterface, + ) -> ArtifactUploadResult: + presigned = s3_client.generate_presigned_url( + "put_object", + Params={"Bucket": s3_test_bucket, "Key": artifact_key}, + ExpiresIn=300, + ) + _logger.info( + "Returning presigned URL for %s: %s", + req.filename, + presigned, + ) + return ArtifactUploadResult( + presigned_url=presigned, + content_type="text/plain", + ) + + client = RuntimeUseClient(ws_url=openai_ws_url) + result = await client.query( + prompt=( + f"Create the directory {artifacts_dir} and write a file at " + f"{artifacts_dir}/result.txt with the content 'lifecycle-result'. " + "Use the bash tool." + ), + options=QueryOptions( + system_prompt="Execute tasks using available tools. Be concise.", + model=MODEL, + pre_agent_downloadables=[ + RuntimeEnvironmentDownloadableInterface( + download_url=download_url, + working_dir=dl_dir, + ) + ], + pre_agent_invocation_commands=[ + CommandInterface(command=f"cat {dl_dir}/setup.sh") + ], + post_agent_invocation_commands=[ + CommandInterface(command="echo lifecycle-done") + ], + artifacts_dir=artifacts_dir, + on_artifact_upload_request=on_artifact, + on_assistant_message=on_msg, + timeout=120, + ), + ) + + assert isinstance(result.data, TextResult) + + all_text = [block for msg in received for block in msg.text_blocks] + assert any( + "setup-payload" in t for t in all_text + ), "pre-command should have cat'd the downloaded file" + assert any( + "lifecycle-done" in t for t in all_text + ), "post-command should have run after the agent" + + body = wait_for_s3_object(s3_client, s3_test_bucket, artifact_key) + assert b"lifecycle-result" in body + + s3_client.delete_object(Bucket=s3_test_bucket, Key=setup_key) + s3_client.delete_object(Bucket=s3_test_bucket, Key=artifact_key) diff --git a/packages/runtimeuse-client-python/test/sandbox_factories/e2b.py b/packages/runtimeuse-client-python/test/sandbox_factories/e2b.py index 576d39d..e6db777 100644 --- a/packages/runtimeuse-client-python/test/sandbox_factories/e2b.py +++ b/packages/runtimeuse-client-python/test/sandbox_factories/e2b.py @@ -68,8 +68,15 @@ def create_e2b_runtimeuse( template = ( Template() .from_node_image("lts") - .apt_install(["unzip"]) + .apt_install(["unzip", "openssh-server"]) .npm_install(["@anthropic-ai/claude-code"], g=True) + .run_cmd( + [ + "curl -fsSL -o /usr/local/bin/websocat https://github.com/vi/websocat/releases/latest/download/websocat.x86_64-unknown-linux-musl", + "chmod a+x /usr/local/bin/websocat", + ], + user="root", + ) .set_envs(envs) .set_start_cmd(start_cmd, wait_for_port(8080)) ) @@ -84,6 +91,17 @@ def create_e2b_runtimeuse( sandbox = Sandbox.create(template=alias, api_key=e2b_api_key) + # start ssh server in the background + sandbox.commands.run( + "sudo websocat -b --exit-on-eof ws-l:0.0.0.0:8081 tcp:127.0.0.1:22", + background=True, + timeout=0, + ) + host = sandbox.get_host(8081) + _logger.info( + f"SSH server connection string: ssh -o 'ProxyCommand=websocat --binary -B 65536 - wss://8081-%h.e2b.app' user@{sandbox.sandbox_id}" + ) + host = sandbox.get_host(8080) ws_url = f"wss://{host}" diff --git a/packages/runtimeuse-client-python/test/test_client.py b/packages/runtimeuse-client-python/test/test_client.py index 3801d83..3a19113 100644 --- a/packages/runtimeuse-client-python/test/test_client.py +++ b/packages/runtimeuse-client-python/test/test_client.py @@ -210,7 +210,10 @@ async def on_artifact( await client.query( prompt=DEFAULT_PROMPT, - options=make_query_options(on_artifact_upload_request=on_artifact), + options=make_query_options( + artifacts_dir="/tmp/artifacts", + on_artifact_upload_request=on_artifact, + ), ) response_msgs = [ @@ -446,3 +449,21 @@ def test_accepts_ws_url(self): def test_accepts_transport(self, fake_transport): transport, client = fake_transport([]) assert client is not None + + def test_artifacts_dir_requires_callback(self, make_query_options): + with pytest.raises(ValueError, match="must be specified together"): + make_query_options(artifacts_dir="/tmp/artifacts") + + async def _dummy_cb(req): + return ArtifactUploadResult( + presigned_url="https://example.com", content_type="text/plain" + ) + + with pytest.raises(ValueError, match="must be specified together"): + make_query_options(on_artifact_upload_request=_dummy_cb) + + opts = make_query_options( + artifacts_dir="/tmp/artifacts", on_artifact_upload_request=_dummy_cb + ) + assert opts.artifacts_dir == "/tmp/artifacts" + assert opts.on_artifact_upload_request is _dummy_cb diff --git a/packages/runtimeuse/src/artifact-manager.ts b/packages/runtimeuse/src/artifact-manager.ts index 37d604b..8131481 100644 --- a/packages/runtimeuse/src/artifact-manager.ts +++ b/packages/runtimeuse/src/artifact-manager.ts @@ -35,6 +35,7 @@ export class ArtifactManager { this.uploadTracker = config.uploadTracker; this.send = config.send; + fs.mkdirSync(config.artifactsDir, { recursive: true }); this.reloadIgnorePatterns(); this.watcher = chokidar.watch(config.artifactsDir, { @@ -104,6 +105,7 @@ export class ArtifactManager { } async stopWatching(): Promise { + this.logger.log("Stopping artifact watcher"); await this.watcher.close(); } diff --git a/packages/runtimeuse/src/claude-handler.ts b/packages/runtimeuse/src/claude-handler.ts index f577254..65273c7 100644 --- a/packages/runtimeuse/src/claude-handler.ts +++ b/packages/runtimeuse/src/claude-handler.ts @@ -40,6 +40,20 @@ export const claudeHandler: AgentHandler = { tools: { type: "preset", preset: "claude_code" }, permissionMode: "bypassPermissions", allowDangerouslySkipPermissions: true, + hooks: { + PostToolUse: [ + { + hooks: [ + async (input: Record) => { + invocation.logger.log( + `[PostToolUse] tool=${input.tool_name} input=${JSON.stringify(input.tool_input)} response=${JSON.stringify(input.tool_response)}`, + ); + return {}; + }, + ], + }, + ], + }, }; if (invocation.outputFormat) { queryOptions.outputFormat = invocation.outputFormat; diff --git a/packages/runtimeuse/src/index.ts b/packages/runtimeuse/src/index.ts index 76885de..24187cd 100644 --- a/packages/runtimeuse/src/index.ts +++ b/packages/runtimeuse/src/index.ts @@ -52,7 +52,7 @@ export type { // Utilities export { redactSecrets, sleep } from "./utils.js"; -export { createLogger, defaultLogger } from "./logger.js"; +export { createLogger, createRedactingLogger, defaultLogger } from "./logger.js"; export type { Logger } from "./logger.js"; // Constants diff --git a/packages/runtimeuse/src/logger.ts b/packages/runtimeuse/src/logger.ts index ea3ab63..f7450ae 100644 --- a/packages/runtimeuse/src/logger.ts +++ b/packages/runtimeuse/src/logger.ts @@ -1,3 +1,5 @@ +import { redactSecrets } from "./utils.js"; + export interface Logger { log(...args: unknown[]): void; error(...args: unknown[]): void; @@ -13,6 +15,19 @@ export function createLogger(sourceId: string): Logger { }; } +export function createRedactingLogger( + inner: Logger, + secrets: string[], +): Logger { + const redact = (args: unknown[]) => + args.map((a) => redactSecrets(a, secrets)); + return { + log: (...args: unknown[]) => inner.log(...redact(args)), + error: (...args: unknown[]) => inner.error(...redact(args)), + debug: (...args: unknown[]) => inner.debug(...redact(args)), + }; +} + export const defaultLogger: Logger = { log: (...args: unknown[]) => console.log(...args), error: (...args: unknown[]) => console.error(...args), diff --git a/packages/runtimeuse/src/openai-handler.ts b/packages/runtimeuse/src/openai-handler.ts index d565297..2516898 100644 --- a/packages/runtimeuse/src/openai-handler.ts +++ b/packages/runtimeuse/src/openai-handler.ts @@ -1,12 +1,16 @@ +import { exec } from "node:child_process"; + import zod from "zod"; import { Agent, run as runAgent, codeInterpreterTool, + shellTool, webSearchTool, AgentOutputType, } from "@openai/agents"; +import type { Shell, ShellAction, ShellResult } from "@openai/agents"; import type { AgentHandler, AgentInvocation, @@ -14,6 +18,40 @@ import type { MessageSender, } from "./agent-handler.js"; +const localShell: Shell = { + async run(action: ShellAction): Promise { + const output = []; + for (const cmd of action.commands) { + const result = await new Promise<{ + stdout: string; + stderr: string; + exitCode: number | null; + }>((resolve) => { + exec( + cmd, + { timeout: action.timeoutMs ?? 30_000 }, + (error, stdout, stderr) => { + resolve({ + stdout: stdout ?? "", + stderr: stderr ?? "", + exitCode: error ? (error.code as number | undefined) ?? 1 : 0, + }); + }, + ); + }); + output.push({ + stdout: result.stdout, + stderr: result.stderr, + outcome: { + type: "exit" as const, + exitCode: result.exitCode, + }, + }); + } + return { output }; + }, +}; + export const openaiHandler: AgentHandler = { async run( invocation: AgentInvocation, @@ -23,7 +61,14 @@ export const openaiHandler: AgentHandler = { name: "runtimeuse-agent", instructions: invocation.systemPrompt, model: invocation.model, - tools: [codeInterpreterTool(), webSearchTool()], + tools: [ + webSearchTool(), + shellTool({ + environment: { type: "local" }, + shell: localShell, + needsApproval: async () => false, + }), + ], }; if (invocation.outputFormat) { @@ -33,6 +78,20 @@ export const openaiHandler: AgentHandler = { const agent = new Agent(agentConfig); + agent.on("agent_tool_start", (_ctx, tool, details) => { + const tc = details.toolCall as Record; + invocation.logger.log( + `[ToolStart] tool=${tc.name ?? tool.name} input=${tc.arguments ?? ""}`, + ); + }); + + agent.on("agent_tool_end", (_ctx, tool, result, details) => { + const tc = details.toolCall as Record; + invocation.logger.log( + `[ToolEnd] tool=${tc.name ?? tool.name} result=${result}`, + ); + }); + const result = await runAgent(agent, invocation.userPrompt, { signal: invocation.signal, stream: true, diff --git a/packages/runtimeuse/src/session.test.ts b/packages/runtimeuse/src/session.test.ts index 06b4ec4..6e71421 100644 --- a/packages/runtimeuse/src/session.test.ts +++ b/packages/runtimeuse/src/session.test.ts @@ -349,6 +349,63 @@ describe("WebSocketSession", () => { }); }); + describe("secrets redaction", () => { + it("redacts secrets from result messages", async () => { + mockHandlerRun.mockResolvedValue({ + type: "text", + text: "the key is secret123 here", + metadata: { info: "secret123 leaked" }, + } as AgentResult); + + const { session, ws } = createSession(); + const done = session.run(); + sendMessage(ws, INVOCATION_MSG); + await done; + + const sent = parseSentMessages(ws); + const result = sent.find((m) => m.message_type === "result_message"); + expect(result).toBeDefined(); + expect(result!.data.text).toBe("the key is [REDACTED] here"); + expect(result!.metadata.info).toBe("[REDACTED] leaked"); + }); + + it("redacts secrets from assistant messages", async () => { + mockHandlerRun.mockImplementation(async (_inv, sender) => { + sender.sendAssistantMessage(["password is secret123"]); + return { type: "text", text: "done" } as AgentResult; + }); + + const { session, ws } = createSession(); + const done = session.run(); + sendMessage(ws, INVOCATION_MSG); + await done; + + const sent = parseSentMessages(ws); + const assistant = sent.find( + (m) => m.message_type === "assistant_message", + ); + expect(assistant).toBeDefined(); + expect(assistant!.text_blocks[0]).toBe("password is [REDACTED]"); + }); + + it("redacts secrets from error messages", async () => { + mockHandlerRun.mockRejectedValueOnce( + new Error("crash with secret123 in trace"), + ); + + const { session, ws } = createSession(); + const done = session.run(); + sendMessage(ws, INVOCATION_MSG); + await done; + + const sent = parseSentMessages(ws); + const error = sent.find((m) => m.message_type === "error_message"); + expect(error).toBeDefined(); + expect(error!.error).not.toContain("secret123"); + expect(error!.error).toContain("[REDACTED]"); + }); + }); + describe("pre-agent invocation commands", () => { it("continues to agent when pre-agent command exits with 0", async () => { mockCommandExecute.mockResolvedValueOnce({ exitCode: 0 }); diff --git a/packages/runtimeuse/src/session.ts b/packages/runtimeuse/src/session.ts index d3881b4..043adbd 100644 --- a/packages/runtimeuse/src/session.ts +++ b/packages/runtimeuse/src/session.ts @@ -4,8 +4,8 @@ import type { AgentHandler } from "./agent-handler.js"; import { ArtifactManager } from "./artifact-manager.js"; import type { UploadTracker } from "./upload-tracker.js"; import type { InvocationMessage, IncomingMessage, OutgoingMessage } from "./types.js"; -import { sleep } from "./utils.js"; -import { createLogger, defaultLogger, type Logger } from "./logger.js"; +import { redactSecrets, sleep } from "./utils.js"; +import { createLogger, createRedactingLogger, defaultLogger, type Logger } from "./logger.js"; import { InvocationRunner } from "./invocation-runner.js"; export interface SessionConfig { @@ -23,6 +23,7 @@ export class WebSocketSession { private readonly abortController = new AbortController(); private artifactManager: ArtifactManager | null = null; private invocationReceived = false; + private secrets: string[] = []; private logger: Logger; constructor(ws: WebSocket, config: SessionConfig) { @@ -106,7 +107,9 @@ export class WebSocketSession { } this.invocationReceived = true; await this.executeInvocation(message); - if (process.env.NODE_ENV !== "test") { + const hasArtifacts = this.artifactManager !== null; + if (process.env.NODE_ENV !== "test" || hasArtifacts) { + this.logger.log("Waiting for post-invocation delay..."); await sleep(this.config.postInvocationDelayMs ?? 3_000); } await this.finalize(); @@ -117,7 +120,8 @@ export class WebSocketSession { private async executeInvocation(message: InvocationMessage): Promise { const sourceId = message.source_id ?? crypto.randomUUID(); - this.logger = createLogger(sourceId); + this.secrets = message.secrets_to_redact ?? []; + this.logger = createRedactingLogger(createLogger(sourceId), this.secrets); this.config.uploadTracker.setLogger(this.logger); this.logger.log(`Received invocation: model=${message.model}`); @@ -175,7 +179,7 @@ export class WebSocketSession { private send(data: OutgoingMessage): void { if (this.ws.readyState === WebSocket.OPEN) { - this.ws.send(JSON.stringify(data)); + this.ws.send(JSON.stringify(redactSecrets(data, this.secrets))); } } } diff --git a/packages/runtimeuse/src/storage.ts b/packages/runtimeuse/src/storage.ts index d5f1063..1e3b895 100644 --- a/packages/runtimeuse/src/storage.ts +++ b/packages/runtimeuse/src/storage.ts @@ -18,9 +18,17 @@ export async function uploadFile( body: content, }); - logger.log(`File ${filePath} uploaded with status: ${response.status}`); + if (response.ok) { + logger.log( + `Upload succeeded for ${filePath} with status ${response.status} ${response.statusText}`, + ); + } else { + logger.error( + `Upload failed for ${filePath} with status ${response.status} ${response.statusText}`, + ); + } - return response.status === 200; + return response.ok; } catch (error) { logger.error("Upload error:", error); return false;