Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions src/google/adk/sessions/database_session_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,11 +550,23 @@ async def append_event(self, session: Session, event: Event) -> Event:
if storage_session is None:
raise ValueError(f"Session {session.id} not found.")

# Pre-analyze state deltas to determine which scopes actually need
# write locks. Most events carry only session-scoped state (or no
# state at all), so acquiring FOR UPDATE on app_states / user_states
# unnecessarily serializes all concurrent append_event calls.
state_deltas = (
_session_util.extract_state_delta(event.actions.state_delta)
if event.actions and event.actions.state_delta
else None
)
has_app_delta = bool(state_deltas and state_deltas.get("app"))
has_user_delta = bool(state_deltas and state_deltas.get("user"))

storage_app_state = await _select_required_state(
sql_session=sql_session,
state_model=schema.StorageAppState,
predicates=(schema.StorageAppState.app_name == session.app_name,),
use_row_level_locking=use_row_level_locking,
use_row_level_locking=use_row_level_locking and has_app_delta,
missing_message=(
"App state missing for app_name="
f"{session.app_name!r}. Session state tables should be "
Expand All @@ -568,7 +580,7 @@ async def append_event(self, session: Session, event: Event) -> Event:
schema.StorageUserState.app_name == session.app_name,
schema.StorageUserState.user_id == session.user_id,
),
use_row_level_locking=use_row_level_locking,
use_row_level_locking=use_row_level_locking and has_user_delta,
missing_message=(
"User state missing for app_name="
f"{session.app_name!r}, user_id={session.user_id!r}. "
Expand Down Expand Up @@ -599,11 +611,8 @@ async def append_event(self, session: Session, event: Event) -> Event:
storage_events = [e async for e in result]
session.events = [e.to_event() for e in storage_events]

# Extract state delta
if event.actions and event.actions.state_delta:
state_deltas = _session_util.extract_state_delta(
event.actions.state_delta
)
# Apply state deltas (already extracted above for lock scoping)
if state_deltas is not None:
app_state_delta = state_deltas["app"]
user_state_delta = state_deltas["user"]
session_state_delta = state_deltas["session"]
Expand Down