Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion packages/uipath-agent-framework/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
[project]
name = "uipath-agent-framework"
version = "0.0.1"
version = "0.0.2"
description = "Python SDK that enables developers to build and deploy Microsoft Agent Framework agents to the UiPath Cloud Platform"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"agent-framework-core>=1.0.0b260212",
"aiosqlite>=0.20.0",
"openinference-instrumentation-agent-framework>=0.1.0",
"uipath>=2.8.41, <2.9.0",
"uipath-runtime>=0.9.0, <0.10.0",
Expand Down Expand Up @@ -91,6 +92,14 @@ module = "openinference.*"
ignore_missing_imports = true
ignore_errors = true

[[tool.mypy.overrides]]
module = "aiosqlite.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = ["anthropic", "anthropic.*"]
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "agent_framework_anthropic.*"
ignore_missing_imports = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,4 @@ dev = [
[tool.uv]
prerelease = "allow"

[tool.uv.sources]
uipath-dev = { path = "../../../../../uipath-dev-python", editable = true }
uipath-agent-framework = { path = "../../", editable = true }

Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def __new__(
_check_anthropic_dependency()

from agent_framework_anthropic import AnthropicClient
from anthropic import AsyncAnthropic # type: ignore[import-not-found]
from anthropic import AsyncAnthropic # type: ignore[import-untyped]

uipath_url, token = get_uipath_config()
gateway_url = build_gateway_url("awsbedrock", model, uipath_url)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Factory for creating Agent Framework runtimes from agent_framework.json configuration."""

import asyncio
import os
from typing import Any

from agent_framework import BaseAgent
Expand All @@ -25,6 +26,7 @@
)
from uipath_agent_framework.runtime.loader import AgentFrameworkAgentLoader
from uipath_agent_framework.runtime.runtime import UiPathAgentFrameworkRuntime
from uipath_agent_framework.runtime.storage import SqliteSessionStore


class UiPathAgentFrameworkRuntimeFactory:
Expand All @@ -47,6 +49,9 @@ def __init__(
self._agent_loaders: dict[str, AgentFrameworkAgentLoader] = {}
self._agent_lock = asyncio.Lock()

self._session_store: SqliteSessionStore | None = None
self._session_store_lock = asyncio.Lock()

self._setup_instrumentation()

def _setup_instrumentation(self) -> None:
Expand All @@ -64,6 +69,32 @@ def _load_config(self) -> AgentFrameworkConfig:
self._config = AgentFrameworkConfig()
return self._config

def _get_db_path(self) -> str:
"""Get the database path for session persistence.

Uses UiPathRuntimeContext to resolve the state file path.
Cleans up stale state files when not resuming.
"""
path = self.context.resolved_state_file_path
# Delete previous state file if not resuming
if (
not self.context.resume
and self.context.job_id is None
and not self.context.keep_state_file
):
if os.path.exists(path):
os.remove(path)
return path

async def _get_session_store(self) -> SqliteSessionStore:
"""Get or create the shared session store instance."""
async with self._session_store_lock:
if self._session_store is None:
db_path = self._get_db_path()
self._session_store = SqliteSessionStore(db_path)
await self._session_store.setup()
return self._session_store

async def _load_agent(self, entrypoint: str) -> BaseAgent:
"""
Load an agent for the given entrypoint.
Expand Down Expand Up @@ -182,11 +213,19 @@ async def _create_runtime_instance(
runtime_id: str,
entrypoint: str,
) -> UiPathRuntimeProtocol:
"""Create a runtime instance from an agent."""
"""Create a runtime instance from an agent.

Creates the runtime with a shared SqliteSessionStore for persistent
conversation history. Sessions are isolated by runtime_id — each
runtime instance gets its own conversation state.
"""
session_store = await self._get_session_store()

return UiPathAgentFrameworkRuntime(
agent=agent,
runtime_id=runtime_id,
entrypoint=entrypoint,
session_store=session_store,
)

async def new_runtime(
Expand Down Expand Up @@ -218,3 +257,7 @@ async def dispose(self) -> None:

self._agent_loaders.clear()
self._agent_cache.clear()

if self._session_store:
await self._session_store.dispose()
self._session_store = None
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from agent_framework import (
AgentResponse,
AgentResponseUpdate,
AgentSession,
BaseAgent,
Content,
FunctionTool,
Expand Down Expand Up @@ -36,6 +37,7 @@
get_agent_tools,
get_entrypoints_schema,
)
from .storage import SqliteSessionStore

logger = logging.getLogger(__name__)

Expand All @@ -48,11 +50,13 @@ def __init__(
agent: BaseAgent,
runtime_id: str | None = None,
entrypoint: str | None = None,
session_store: SqliteSessionStore | None = None,
):
self.agent: BaseAgent = agent
self.runtime_id: str = runtime_id or "default"
self.entrypoint: str | None = entrypoint
self.chat = AgentFrameworkChatMessagesMapper()
self._session_store = session_store

@staticmethod
def _build_agent_tool_names(agent: BaseAgent) -> set[str]:
Expand Down Expand Up @@ -84,6 +88,30 @@ def _build_tool_name_to_agent(agent: BaseAgent) -> dict[str, str]:
mapping[tool.name] = agent_name
return mapping

async def _load_session(self) -> AgentSession:
"""Load or create an AgentSession for this runtime_id.

If a session store is configured, loads the persisted session state.
Otherwise creates a fresh session each time.
"""
if self._session_store:
session_data = await self._session_store.load_session(self.runtime_id)
if session_data is not None:
logger.debug(
"Restoring session from store for runtime_id=%s",
self.runtime_id,
)
return AgentSession.from_dict(session_data) # type: ignore[attr-defined]

return self.agent.create_session(session_id=self.runtime_id) # type: ignore[attr-defined]

async def _save_session(self, session: AgentSession) -> None:
"""Persist the session state after execution."""
if self._session_store:
session_data = session.to_dict() # type: ignore[attr-defined]
await self._session_store.save_session(self.runtime_id, session_data)
logger.debug("Saved session to store for runtime_id=%s", self.runtime_id)

async def execute(
self,
input: dict[str, Any] | None = None,
Expand All @@ -92,7 +120,9 @@ async def execute(
"""Execute the agent with the provided input and return the result."""
try:
user_input = self._prepare_input(input)
response = await self.agent.run(user_input) # type: ignore[attr-defined]
session = await self._load_session()
response = await self.agent.run(user_input, session=session) # type: ignore[attr-defined]
await self._save_session(session)
output = self._extract_output(response)
return self._create_success_result(output)
except Exception as e:
Expand All @@ -115,6 +145,7 @@ async def stream(
"""
try:
user_input = self._prepare_input(input)
session = await self._load_session()
agent_name = self.agent.name or "agent"

# Pre-compute which tool names correspond to sub-agents
Expand All @@ -132,7 +163,7 @@ async def stream(
active_tools: str | None = None
final_text = ""

response_stream = self.agent.run(user_input, stream=True) # type: ignore[attr-defined]
response_stream = self.agent.run(user_input, stream=True, session=session) # type: ignore[attr-defined]
async for update in response_stream:
if not isinstance(update, AgentResponseUpdate):
continue
Expand Down Expand Up @@ -290,6 +321,9 @@ async def stream(
for msg_event in self.chat.close_message():
yield UiPathRuntimeMessageEvent(payload=msg_event)

# Persist session state after streaming completes
await self._save_session(session)

# Get final response
final_response = await response_stream.get_final_response()
output = self._extract_output(final_response)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""SQLite session store for Agent Framework agents.

Persists AgentSession state between turns using SQLite, keyed by runtime_id.
Each runtime_id maps to an isolated session — conversation history accumulates
across calls via the InMemoryHistoryProvider that Agent Framework auto-injects.
"""

from __future__ import annotations

import asyncio
import json
import logging
import os
from typing import Any

import aiosqlite

logger = logging.getLogger(__name__)


class SqliteSessionStore:
"""SQLite-backed store for Agent Framework session state.

Stores serialized AgentSession dicts (via to_dict/from_dict) in a single
table, keyed by runtime_id. Thread-safe via asyncio lock.
"""

def __init__(self, db_path: str) -> None:
self.db_path = db_path
self._conn: aiosqlite.Connection | None = None
self._lock = asyncio.Lock()
self._initialized = False

async def setup(self) -> None:
"""Ensure storage directory and database table exist."""
dir_name = os.path.dirname(self.db_path)
if dir_name:
os.makedirs(dir_name, exist_ok=True)

conn = await self._get_conn()
async with self._lock:
await conn.execute(
"""
CREATE TABLE IF NOT EXISTS sessions (
runtime_id TEXT PRIMARY KEY,
session_data TEXT NOT NULL
)
"""
)
await conn.commit()
self._initialized = True
logger.debug("Session store initialized at %s", self.db_path)

async def _get_conn(self) -> aiosqlite.Connection:
"""Get or create the database connection."""
if self._conn is None:
self._conn = await aiosqlite.connect(self.db_path, timeout=30.0)
await self._conn.execute("PRAGMA journal_mode=WAL")
await self._conn.execute("PRAGMA busy_timeout=30000")
await self._conn.execute("PRAGMA synchronous=NORMAL")
await self._conn.commit()
return self._conn

async def load_session(self, runtime_id: str) -> dict[str, Any] | None:
"""Load a serialized session dict for the given runtime_id.

Returns None if no session exists for this runtime_id.
"""
if not self._initialized:
await self.setup()

conn = await self._get_conn()
async with self._lock:
cursor = await conn.execute(
"SELECT session_data FROM sessions WHERE runtime_id = ?",
(runtime_id,),
)
row = await cursor.fetchone()

if not row:
logger.debug("No session found for runtime_id=%s", runtime_id)
return None

logger.debug("Loaded session for runtime_id=%s", runtime_id)
return json.loads(row[0])

async def save_session(self, runtime_id: str, session_data: dict[str, Any]) -> None:
"""Save a serialized session dict for the given runtime_id."""
if not self._initialized:
await self.setup()

data_json = json.dumps(session_data)
conn = await self._get_conn()
async with self._lock:
await conn.execute(
"""
INSERT INTO sessions (runtime_id, session_data)
VALUES (?, ?)
ON CONFLICT(runtime_id) DO UPDATE SET
session_data = excluded.session_data
""",
(runtime_id, data_json),
)
await conn.commit()

logger.debug("Saved session for runtime_id=%s", runtime_id)

async def dispose(self) -> None:
"""Close the database connection."""
if self._conn:
await self._conn.close()
self._conn = None
self._initialized = False


__all__ = ["SqliteSessionStore"]
Loading