Skip to content

Commit 4077dd4

Browse files
committed
RDBC-934 Streaming
1 parent ec7d868 commit 4077dd4

File tree

2 files changed

+78
-11
lines changed

2 files changed

+78
-11
lines changed

ravendb/documents/ai/ai_conversation.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ class AiHandleErrorStrategy:
2222
SEND_ERRORS_TO_MODEL = "SendErrorsToModel"
2323
RAISE_IMMEDIATELY = "RaiseImmediately"
2424

25+
2526
class UnhandledActionEventArgs:
2627
def __init__(self, sender: AiConversation, action: AiAgentActionRequest):
2728
self.sender = sender
@@ -120,28 +121,35 @@ def add_action_response(self, action_id: str, action_response: str) -> None:
120121

121122
self._action_responses.append(response)
122123

123-
def run(self, answer_type: type = dict) -> AiAnswer:
124+
def run(self) -> AiAnswer:
124125
"""
125126
Executes the conversation loop, automatically handling action requests
126127
until the conversation is complete or no handlers are available.
127128
128-
Args:
129-
answer_type: The expected type of the answer (default: dict)
130-
131129
Returns:
132130
AiAnswer with the final response, status, usage, and elapsed time
133131
"""
134132
while True:
135-
r = self._run_internal(answer_type)
133+
r = self._run_internal()
136134
if self._handle_server_reply(r):
137135
return r
138136

139-
def _run_internal(self, answer_type: type = dict) -> AiAnswer:
137+
def stream(
    self,
    stream_property_path: Optional[str] = None,
    on_chunk: Optional[Callable[[str], None]] = None,
) -> AiAnswer:
    """
    Run the conversation loop while streaming one property of the LLM response.

    Each iteration performs a single server call through ``_run_internal``,
    forwarding ``stream_property_path`` and ``on_chunk``. The loop repeats until
    ``_handle_server_reply`` reports that the reply is final, so ``on_chunk`` may
    be invoked across several server calls.

    Args:
        stream_property_path: Name of the response property to stream.
        on_chunk: Callback invoked with each streamed chunk.

    Returns:
        The AiAnswer of the last server call, once the reply has been handled.

    Note:
        NOTE(review): the two arguments are expected to be supplied together;
        the underlying operation appears to raise ValueError when only one of
        them is given - confirm against RunConversationOperation.
    """
    while True:
        answer = self._run_internal(
            stream_property_path=stream_property_path,
            streamed_chunks_callback=on_chunk,
        )
        # A truthy result means no further round trip is required.
        if self._handle_server_reply(answer):
            return answer
142145

143-
Args:
144-
answer_type: The expected type of the answer
146+
def _run_internal(
147+
self,
148+
stream_property_path: Optional[str] = None,
149+
streamed_chunks_callback: Optional[Callable[[str], None]] = None,
150+
) -> AiAnswer:
151+
"""
152+
Internal method that executes a single server call.
145153
146154
Returns:
147155
AiAnswer from this single turn
@@ -175,6 +183,8 @@ def _run_internal(self, answer_type: type = dict) -> AiAnswer:
175183
action_responses=self._action_responses, # Always send list, even if empty
176184
options=self._options,
177185
change_vector=self._change_vector,
186+
stream_property_path=stream_property_path,
187+
streamed_chunks_callback=streamed_chunks_callback,
178188
)
179189

180190
try:

ravendb/documents/operations/ai/agents/run_conversation_operation.py

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
from __future__ import annotations
22
import json
33
from dataclasses import dataclass
4-
from typing import Optional, List, Dict, Any, TypeVar, Generic
4+
from typing import Optional, List, Dict, Any, TypeVar, Generic, Callable
55

66
from ravendb.documents.operations.definitions import MaintenanceOperation
77
from ravendb.documents.conventions import DocumentConventions
8-
from ravendb.http.raven_command import RavenCommand
8+
from ravendb.http.raven_command import RavenCommand, RavenCommandResponseType
99
from ravendb.http.server_node import ServerNode
1010
import requests
11+
from ravendb.http.misc import ResponseDisposeHandling
12+
1113

1214
TSchema = TypeVar("TSchema")
1315

@@ -205,6 +207,8 @@ def __init__(
205207
action_responses: Optional[List[AiAgentActionResponse]] = None,
206208
options: Optional[AiConversationCreationOptions] = None,
207209
change_vector: Optional[str] = None,
210+
stream_property_path: Optional[str] = None,
211+
streamed_chunks_callback: Optional[Callable[[str], None]] = None,
208212
):
209213
"""
210214
Initialize a RunConversationOperation.
@@ -216,18 +220,24 @@ def __init__(
216220
action_responses: List of action responses from previous turn
217221
options: Creation options including parameters and expiration
218222
change_vector: Change vector for optimistic concurrency
223+
stream_property_path: Optional response property name to stream
224+
streamed_chunks_callback: Optional callback invoked per streamed chunk
219225
"""
220226
if not agent_id or (isinstance(agent_id, str) and agent_id.isspace()):
221227
raise ValueError("agent_id cannot be None or empty")
222228
if not conversation_id or (isinstance(conversation_id, str) and conversation_id.isspace()):
223229
raise ValueError("conversation_id cannot be None or empty")
230+
if (stream_property_path is None) != (streamed_chunks_callback is None):
231+
raise ValueError("Both stream_property_path and streamed_chunks_callback must be specified together")
224232

225233
self._agent_id = agent_id
226234
self._conversation_id = conversation_id
227235
self._prompt_parts = prompt_parts
228236
self._action_responses = action_responses
229237
self._options = options
230238
self._change_vector = change_vector
239+
self._stream_property_path = stream_property_path
240+
self._streamed_chunks_callback = streamed_chunks_callback
231241

232242
def get_command(self, conventions: DocumentConventions) -> RavenCommand[ConversationResult[TSchema]]:
233243
return RunConversationCommand(
@@ -237,6 +247,8 @@ def get_command(self, conventions: DocumentConventions) -> RavenCommand[Conversa
237247
action_responses=self._action_responses,
238248
options=self._options,
239249
change_vector=self._change_vector,
250+
stream_property_path=self._stream_property_path,
251+
streamed_chunks_callback=self._streamed_chunks_callback,
240252
conventions=conventions,
241253
)
242254

@@ -250,6 +262,8 @@ def __init__(
250262
action_responses: Optional[List[AiAgentActionResponse]] = None,
251263
options: Optional[AiConversationCreationOptions] = None,
252264
change_vector: Optional[str] = None,
265+
stream_property_path: Optional[str] = None,
266+
streamed_chunks_callback: Optional[Callable[[str], None]] = None,
253267
conventions: Optional[DocumentConventions] = None,
254268
):
255269
from ravendb.util.util import RaftIdGenerator
@@ -261,6 +275,8 @@ def __init__(
261275
self._action_responses = action_responses
262276
self._options = options
263277
self._change_vector = change_vector
278+
self._stream_property_path = stream_property_path
279+
self._streamed_chunks_callback = streamed_chunks_callback
264280
self._conventions = conventions
265281
self._raft_id = RaftIdGenerator.dont_care_id()
266282

@@ -286,6 +302,10 @@ def create_request(self, node: ServerNode) -> requests.Request:
286302
if self._change_vector:
287303
url += f"&changeVector={quote(self._change_vector)}"
288304

305+
# Add streaming flags if requested
306+
if self._stream_property_path:
307+
url += f"&streaming=true&streamPropertyPath={quote(self._stream_property_path)}"
308+
289309
# Build request body with correct structure to match .NET client
290310
request_body = ConversationRequestBody(
291311
action_responses=self._action_responses,
@@ -302,6 +322,43 @@ def create_request(self, node: ServerNode) -> requests.Request:
302322
request.data = body
303323
return request
304324

325+
# TODO: this should be handled by writing a custom set_response_raw method, with RavenCommandResponseType set to RAW
326+
def process_response(self, cache, response: requests.Response, url) -> ResponseDisposeHandling:
    """
    Consume the server response, streaming chunks to the callback when requested.

    When no stream property path is configured, the default handler of the
    parent class is used. Otherwise the body is read line by line:

    * a line starting with ``{`` is treated as the final JSON object, parsed
      into ``self.result``, and ends processing;
    * any other non-empty line is a streamed chunk, JSON-decoded when possible
      and passed to the chunks callback (if one was supplied).

    If the stream ends without a final JSON object, ``self.result`` is set to an
    empty ``ConversationResult``.

    Args:
        cache: Passed through to the parent handler in the non-streaming case.
        response: The HTTP response to read.
        url: Passed through to the parent handler in the non-streaming case.

    Returns:
        ``ResponseDisposeHandling.AUTOMATIC`` in the streaming case (per the
        original author's note, the RequestExecutor then closes the response);
        otherwise whatever the parent handler returns.
    """
    # If not streaming, delegate to the default handler
    if not self._stream_property_path:
        return super().process_response(cache, response, url)

    on_chunk = self._streamed_chunks_callback
    for raw_line in response.iter_lines(decode_unicode=True):
        if not raw_line:
            continue

        # requests yields undecoded bytes when the response has no known
        # encoding, even with decode_unicode=True; normalise to str.
        line = raw_line.decode("utf-8") if isinstance(raw_line, bytes) else raw_line

        if line.startswith("{"):
            self.result = ConversationResult.from_json(json.loads(line))
            return ResponseDisposeHandling.AUTOMATIC

        # Non-final lines are JSON-encoded strings (e.g. "\"chunk\"");
        # fall back to the raw text when a line is not valid JSON.
        try:
            chunk = json.loads(line)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            chunk = line

        if on_chunk:
            on_chunk(chunk)

    # No final JSON object received; set empty result
    self.result = ConversationResult()
    return ResponseDisposeHandling.AUTOMATIC
352+
353+
def send(self, session: requests.Session, request: requests.Request) -> requests.Response:
    """
    Send the request, keeping the response body open for streaming when needed.

    Without a stream property path the parent implementation is used unchanged.
    With one, the request is prepared on the given session, passed through
    ``RequestUtils.remove_zstd_encoding`` and sent with ``stream=True`` so the
    body can be read incrementally.

    Args:
        session: The requests session used to prepare and send the request.
        request: The request built by ``create_request``.

    Returns:
        The HTTP response; its body is not pre-loaded in the streaming case.
    """
    if not self._stream_property_path:
        return super().send(session, request)

    # Imported here, as in the original; the reason is not visible in this view.
    from ravendb.util.request_utils import RequestUtils

    prepared = session.prepare_request(request)
    # NOTE(review): presumably strips zstd from the accepted encodings so the
    # streamed body can be read line by line - confirm against RequestUtils.
    RequestUtils.remove_zstd_encoding(prepared)
    return session.send(prepared, cert=session.cert, stream=True)
361+
305362
def set_response(self, response: str, from_cache: bool) -> None:
306363
if response is None:
307364
self.result = ConversationResult() # Uses default constructor with all None values

0 commit comments

Comments
 (0)