From 2de994e36df84348e1162a5ce04a62e7a096fccd Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Thu, 17 Jul 2025 18:54:01 +0000 Subject: [PATCH 01/17] Add agents. --- src/api/models/bedrock.py | 18 +- src/api/models/bedrock_agents.py | 299 +++++++++++++++++++++++++++++++ src/api/models/model_manager.py | 40 +++++ src/api/routers/chat.py | 13 +- src/api/routers/model.py | 4 +- src/api/setting.py | 3 + 6 files changed, 365 insertions(+), 12 deletions(-) create mode 100644 src/api/models/bedrock_agents.py create mode 100644 src/api/models/model_manager.py diff --git a/src/api/models/bedrock.py b/src/api/models/bedrock.py index 9a8fd3c5..ea061342 100644 --- a/src/api/models/bedrock.py +++ b/src/api/models/bedrock.py @@ -15,6 +15,7 @@ from starlette.concurrency import run_in_threadpool from api.models.base import BaseChatModel, BaseEmbeddingsModel +from api.models.model_manager import ModelManager from api.schema import ( AssistantMessage, ChatRequest, @@ -158,6 +159,8 @@ def list_bedrock_models() -> dict: # Add application inference profiles if model_id in app_profile_dict: model_list[app_profile_dict[model_id]] = {"modalities": input_modalities} + + return model_list except Exception as e: logger.error(f"Unable to list models: {str(e)}") @@ -169,22 +172,21 @@ def list_bedrock_models() -> dict: return model_list -# Initialize the model list. 
-bedrock_model_list = list_bedrock_models() - - class BedrockModel(BaseChatModel): + def __init__(self): + """Instantiate with initial model grouping.""" + self._model_manager = ModelManager() + [self._model_manager.add_model({k: v}) for k,v in list_bedrock_models().items()] + def list_models(self) -> list[str]: """Always refresh the latest model list""" - global bedrock_model_list - bedrock_model_list = list_bedrock_models() - return list(bedrock_model_list.keys()) + return self._model_manager.model_keys def validate(self, chat_request: ChatRequest): """Perform basic validation on requests""" error = "" # check if model is supported - if chat_request.model not in bedrock_model_list.keys(): + if chat_request.model not in self._model_manager.model_keys: error = f"Unsupported model {chat_request.model}, please use models API to get a list of supported models" logger.error("Unsupported model: %s", chat_request.model) diff --git a/src/api/models/bedrock_agents.py b/src/api/models/bedrock_agents.py new file mode 100644 index 00000000..67fcf2cf --- /dev/null +++ b/src/api/models/bedrock_agents.py @@ -0,0 +1,299 @@ +# Original Credit: GitHub user dhapola +import base64 +import json +import logging +import re +import time +from abc import ABC +from typing import AsyncIterable + +import boto3 +from botocore.config import Config +import numpy as np +import requests +import tiktoken +from fastapi import HTTPException +from api.models.model_manager import ModelManager + +from api.models.bedrock import ( + BedrockModel, + bedrock_client, + bedrock_runtime) + +from api.schema import ( + ChatResponse, + ChatRequest, + ChatResponseMessage, + ChatStreamResponse, + ChoiceDelta +) + +from api.setting import (DEBUG, AWS_REGION, AGENT_PREFIX) + +logger = logging.getLogger(__name__) +config = Config(connect_timeout=1, read_timeout=120, retries={"max_attempts": 1}) + +bedrock_agent = boto3.client( + service_name="bedrock-agent", + region_name=AWS_REGION, + config=config, + ) + 
+bedrock_agent_runtime = boto3.client( + service_name="bedrock-agent-runtime", + region_name=AWS_REGION, + config=config, +) + + +class BedrockAgents(BedrockModel): + + def __init__(self): + """Append agents to model list.""" + super().__init__() + self.get_agents() + + def get_latest_agent_alias(self, client, agent_id): + + # List all aliases for the agent + response = client.list_agent_aliases( + agentId=agent_id, + maxResults=100 # Adjust based on your needs + ) + + if not response.get('agentAliasSummaries'): + return None + + # Sort aliases by creation time to get the latest one + aliases = response['agentAliasSummaries'] + latest_alias = None + latest_creation_time = None + + for alias in aliases: + # Only consider aliases that are in PREPARED state + if alias['agentAliasStatus'] == 'PREPARED': + creation_time = alias.get('creationDateTime') + if latest_creation_time is None or creation_time > latest_creation_time: + latest_creation_time = creation_time + latest_alias = alias + + if latest_alias: + return latest_alias['agentAliasId'] + + return None + + def get_agents(self): + bedrock_ag = boto3.client( + service_name="bedrock-agent", + region_name=AWS_REGION, + config=config, + ) + # List Agents + response = bedrock_agent.list_agents(maxResults=100) + + # Prepare agent for display + for agent in response['agentSummaries']: + + if (agent['agentStatus'] != 'PREPARED'): + continue + + name = f"{AGENT_PREFIX}{agent['agentName']}" + agentId = agent['agentId'] + + aliasId = self.get_latest_agent_alias(bedrock_ag, agentId) + if (aliasId is None): + continue + + val = { + "system": False, # Supports system prompts for context setting. 
These are already set in Bedrock Agent configuration + "multimodal": True, # Capable of processing both text and images + "tool_call": False, # Tool Use not required for Agents + "stream_tool_call": False, + "agent_id": agentId, + "alias_id": aliasId + } + + model = {} + model[name]=val + self._model_manager.add_model(model) + + + async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False): + """Common logic for invoke bedrock models""" + + # convert OpenAI chat request to Bedrock SDK request + args = self._parse_request(chat_request) + if DEBUG: + logger.info("Bedrock request: " + json.dumps(str(args))) + + try: + + if stream: + response = bedrock_runtime.converse_stream(**args) + else: + response = bedrock_runtime.converse(**args) + + + except bedrock_client.exceptions.ValidationException as e: + logger.error("Validation Error: " + str(e)) + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(e) + raise HTTPException(status_code=500, detail=str(e)) + return response + + async def chat(self, chat_request: ChatRequest) -> ChatResponse: + """Default implementation for Chat API.""" + #chat: {chat_request}") + + message_id = self.generate_message_id() + + if (chat_request.model.startswith(AGENT_PREFIX)): + response = self._invoke_agent(chat_request) + output = "" + + for event in response["completion"]: + output += event["chunk"]["bytes"].decode("utf-8") + + # Minimal response (stop reason, token I/O counts not returned) + chat_response = self._create_response( + model=chat_request.model, + message_id=message_id, + content=[{"text": output}], + finish_reason="", + input_tokens=0, + output_tokens=0 + ) + else: + # Just use what we know works + chat_response = await super().chat(chat_request) + + return chat_response + + async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]: + + """Default implementation for Chat Stream API""" + + response = '' + message_id = self.generate_message_id() + + if 
(chat_request.model.startswith(AGENT_PREFIX)): + response = self._invoke_agent(chat_request, stream=True) + + _event_stream = response["completion"] + + chunk_count = 1 + message = ChatResponseMessage( + role="assistant", + content="", + ) + stream_response = ChatStreamResponse( + id=message_id, + model=chat_request.model, + choices=[ + ChoiceDelta( + index=0, + delta=message, + logprobs=None, + finish_reason=None, + ) + ], + usage=None, + ) + yield self.stream_response_to_bytes(stream_response) + + for event in _event_stream: + chunk_count += 1 + if "chunk" in event: + _data = event["chunk"]["bytes"].decode("utf8") + message = ChatResponseMessage(content=_data) + + stream_response = ChatStreamResponse( + id=message_id, + model=chat_request.model, + choices=[ + ChoiceDelta( + index=0, + delta=message, + logprobs=None, + finish_reason=None, + ) + ], + usage=None, + ) + yield self.stream_response_to_bytes(stream_response) + + #message = self._make_fully_cited_answer(_data, event, False, 0) + + # return an [DONE] message at the end. + yield self.stream_response_to_bytes() + return + else: + response = await self._invoke_bedrock(chat_request, stream=True) + + stream = response.get("stream") + for chunk in stream: + stream_response = self._create_response_stream( + model_id=chat_request.model, message_id=message_id, chunk=chunk + ) + if not stream_response: + continue + if DEBUG: + logger.info("Proxy response :" + stream_response.model_dump_json()) + if stream_response.choices: + yield self.stream_response_to_bytes(stream_response) + elif ( + chat_request.stream_options + and chat_request.stream_options.include_usage + ): + # An empty choices for Usage as per OpenAI doc below: + # if you set stream_options: {"include_usage": true}. + # an additional chunk will be streamed before the data: [DONE] message. + # The usage field on this chunk shows the token usage statistics for the entire request, + # and the choices field will always be an empty array. 
+ # All other chunks will also include a usage field, but with a null value. + yield self.stream_response_to_bytes(stream_response) + + # return an [DONE] message at the end. + yield self.stream_response_to_bytes() + + def _invoke_agent(self, chat_request: ChatRequest, stream=False): + """Common logic for invoke agent """ + if DEBUG: + logger.info("BedrockAgents._invoke_agent: Raw request: " + chat_request.model_dump_json()) + + # convert OpenAI chat request to Bedrock SDK request + args = self._parse_request(chat_request) + + + if DEBUG: + logger.info("Bedrock request: " + json.dumps(str(args))) + + model = self._model_manager.get_all_models()[chat_request.model] + + ################ + + try: + query = args['messages'][0]['content'][0]['text'] + messages = args['messages'] + query = messages[len(messages)-1]['content'][0]['text'] + + + # Step 1 - Retrieve Context + request_params = { + 'agentId': model['agent_id'], + 'agentAliasId': model['alias_id'], + 'sessionId': 'unique-session-id', # Generate a unique session ID + 'inputText': query + } + + # Make the retrieve request + # Invoke the agent + response = bedrock_agent_runtime.invoke_agent(**request_params) + return response + + except Exception as e: + logger.error(e) + raise HTTPException(status_code=500, detail=str(e)) + + \ No newline at end of file diff --git a/src/api/models/model_manager.py b/src/api/models/model_manager.py new file mode 100644 index 00000000..3bdcd327 --- /dev/null +++ b/src/api/models/model_manager.py @@ -0,0 +1,40 @@ +# Original Credit: GitHub user dhapola + + +class ModelManager: + _instance = None + _models = None + + @property + def model_keys(self): + return list(self.get_all_models().keys()) + + def __new__(cls, *args, **kwargs): + # Ensure that only one instance of ModelManager is created + if cls._instance is None: + cls._instance = super(ModelManager, cls).__new__(cls, *args, **kwargs) + cls._instance._models = {} # Initialize the list of models + + return cls._instance + + def 
get_all_models(self): + return self._models + + def add_model(self, model): + """Add a model to the list.""" + if (self._models is None): + self._models = {} + self._models.update(model) + + + def get_model_by_name(self, model_name: str): + """Get the list of models.""" + return self._models + + def clear_models(self): + """Clear the list of models.""" + self._models.clear() + self._models = {} + + def __repr__(self): + return f"ModelManager(models={self._models})" \ No newline at end of file diff --git a/src/api/routers/chat.py b/src/api/routers/chat.py index 530f75d6..875d6dcc 100644 --- a/src/api/routers/chat.py +++ b/src/api/routers/chat.py @@ -5,6 +5,7 @@ from api.auth import api_key_auth from api.models.bedrock import BedrockModel +from api.models.bedrock_agents import BedrockAgents from api.schema import ChatRequest, ChatResponse, ChatStreamResponse, Error from api.setting import DEFAULT_MODEL @@ -36,10 +37,18 @@ async def chat_completions( ): if chat_request.model.lower().startswith("gpt-"): chat_request.model = DEFAULT_MODEL + + model = None + + for mods_inst in [BedrockModel(), BedrockAgents()]: + try: + mods_inst.validate(chat_request) + except: + continue + + model = mods_inst # Exception will be raised if model not supported. 
- model = BedrockModel() - model.validate(chat_request) if chat_request.stream: return StreamingResponse(content=model.chat_stream(chat_request), media_type="text/event-stream") return await model.chat(chat_request) diff --git a/src/api/routers/model.py b/src/api/routers/model.py index e1de1553..0f8d5b33 100644 --- a/src/api/routers/model.py +++ b/src/api/routers/model.py @@ -3,7 +3,7 @@ from fastapi import APIRouter, Depends, HTTPException, Path from api.auth import api_key_auth -from api.models.bedrock import BedrockModel +from api.models.bedrock_agents import BedrockAgents from api.schema import Model, Models router = APIRouter( @@ -12,7 +12,7 @@ # responses={404: {"description": "Not found"}}, ) -chat_model = BedrockModel() +chat_model = BedrockAgents() async def validate_model_id(model_id: str): diff --git a/src/api/setting.py b/src/api/setting.py index 4e0a7bbd..8db65a5b 100644 --- a/src/api/setting.py +++ b/src/api/setting.py @@ -17,3 +17,6 @@ DEFAULT_EMBEDDING_MODEL = os.environ.get("DEFAULT_EMBEDDING_MODEL", "cohere.embed-multilingual-v3") ENABLE_CROSS_REGION_INFERENCE = os.environ.get("ENABLE_CROSS_REGION_INFERENCE", "true").lower() != "false" ENABLE_APPLICATION_INFERENCE_PROFILES = os.environ.get("ENABLE_APPLICATION_INFERENCE_PROFILES", "true").lower() != "false" + +# Added for agent ingestion +AGENT_PREFIX = 'agent-' \ No newline at end of file From 65222c3b0ec114b15d2c40ff23a73f508e2523a8 Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Thu, 17 Jul 2025 19:06:38 +0000 Subject: [PATCH 02/17] Revert chat.py. 
--- src/api/routers/chat.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/api/routers/chat.py b/src/api/routers/chat.py index 875d6dcc..42e26cad 100644 --- a/src/api/routers/chat.py +++ b/src/api/routers/chat.py @@ -38,15 +38,8 @@ async def chat_completions( if chat_request.model.lower().startswith("gpt-"): chat_request.model = DEFAULT_MODEL - model = None - - for mods_inst in [BedrockModel(), BedrockAgents()]: - try: - mods_inst.validate(chat_request) - except: - continue - - model = mods_inst + model = BedrockAgents() + model.validate(chat_request) # Exception will be raised if model not supported. if chat_request.stream: From 391ed6068fdc34304d49ebe82a2c2b88bbcb635c Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Fri, 18 Jul 2025 10:09:54 +0000 Subject: [PATCH 03/17] Expose all agents' aliases. Remove weird constructor work-around on ModelManager. --- src/api/models/bedrock_agents.py | 77 ++++++++++++++++++-------------- src/api/models/model_manager.py | 18 +------- 2 files changed, 45 insertions(+), 50 deletions(-) diff --git a/src/api/models/bedrock_agents.py b/src/api/models/bedrock_agents.py index 67fcf2cf..edc5acdd 100644 --- a/src/api/models/bedrock_agents.py +++ b/src/api/models/bedrock_agents.py @@ -53,7 +53,7 @@ def __init__(self): super().__init__() self.get_agents() - def get_latest_agent_alias(self, client, agent_id): + def get_latest_agent_aliases(self, client, agent_id):#, limit=2): # List all aliases for the agent response = client.list_agent_aliases( @@ -63,24 +63,29 @@ def get_latest_agent_alias(self, client, agent_id): if not response.get('agentAliasSummaries'): return None - - # Sort aliases by creation time to get the latest one - aliases = response['agentAliasSummaries'] - latest_alias = None - latest_creation_time = None - - for alias in aliases: - # Only consider aliases that are in PREPARED state - if alias['agentAliasStatus'] == 'PREPARED': - creation_time = alias.get('creationDateTime') - if 
latest_creation_time is None or creation_time > latest_creation_time: - latest_creation_time = creation_time - latest_alias = alias - - if latest_alias: - return latest_alias['agentAliasId'] - - return None + + # Sort aliases by createdAt descending + aliases = response.get('agentAliasSummaries', []) + + sorted_aliases = sorted( + [a for a in aliases if a.get('agentAliasName')], + key=lambda a: a['createdAt'], + reverse=True + ) + + # Init + result = {} + seen_statuses = set() + + for alias in sorted_aliases: + if "PREPARED" in alias.get('agentAliasStatus'): + name = alias.get('agentAliasName').replace('AgentTestAlias', 'DRAFT') + result[name]=alias + + #if len(result) >= limit: + # break + + return result def get_agents(self): bedrock_ag = boto3.client( @@ -97,25 +102,29 @@ def get_agents(self): if (agent['agentStatus'] != 'PREPARED'): continue - name = f"{AGENT_PREFIX}{agent['agentName']}" agentId = agent['agentId'] - aliasId = self.get_latest_agent_alias(bedrock_ag, agentId) - if (aliasId is None): + all_latest_aliases = self.get_latest_agent_aliases(bedrock_ag, agentId) + if not all_latest_aliases: continue - val = { - "system": False, # Supports system prompts for context setting. These are already set in Bedrock Agent configuration - "multimodal": True, # Capable of processing both text and images - "tool_call": False, # Tool Use not required for Agents - "stream_tool_call": False, - "agent_id": agentId, - "alias_id": aliasId - } - - model = {} - model[name]=val - self._model_manager.add_model(model) + for alias_name, latest_alias in all_latest_aliases.items(): + key_alias_id = 'agentAliasId' + + name = f"{AGENT_PREFIX}{agent['agentName']}-{alias_name}" + + val = { + "system": False, # Supports system prompts for context setting. 
These are already set in Bedrock Agent configuration + "multimodal": True, # Capable of processing both text and images + "tool_call": False, # Tool Use not required for Agents + "stream_tool_call": True, + "agent_id": agentId, + "alias_id": latest_alias[key_alias_id] + } + + model = {} + model[name]=val + self._model_manager.add_model(model) async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False): diff --git a/src/api/models/model_manager.py b/src/api/models/model_manager.py index 3bdcd327..2331250b 100644 --- a/src/api/models/model_manager.py +++ b/src/api/models/model_manager.py @@ -2,35 +2,21 @@ class ModelManager: - _instance = None - _models = None @property def model_keys(self): return list(self.get_all_models().keys()) - def __new__(cls, *args, **kwargs): - # Ensure that only one instance of ModelManager is created - if cls._instance is None: - cls._instance = super(ModelManager, cls).__new__(cls, *args, **kwargs) - cls._instance._models = {} # Initialize the list of models - - return cls._instance + def __init__(self, *args, **kwargs): + self._models = {} def get_all_models(self): return self._models def add_model(self, model): """Add a model to the list.""" - if (self._models is None): - self._models = {} self._models.update(model) - - def get_model_by_name(self, model_name: str): - """Get the list of models.""" - return self._models - def clear_models(self): """Clear the list of models.""" self._models.clear() From 79958e21e6093744ab22276b76681cede52dd7b9 Mon Sep 17 00:00:00 2001 From: herter4171-kp Date: Thu, 7 Aug 2025 14:09:44 -0400 Subject: [PATCH 04/17] Use config from bedrock.py for longer timeouts and more retries. 
--- src/api/models/bedrock_agents.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/api/models/bedrock_agents.py b/src/api/models/bedrock_agents.py index edc5acdd..16654b11 100644 --- a/src/api/models/bedrock_agents.py +++ b/src/api/models/bedrock_agents.py @@ -31,7 +31,15 @@ from api.setting import (DEBUG, AWS_REGION, AGENT_PREFIX) logger = logging.getLogger(__name__) -config = Config(connect_timeout=1, read_timeout=120, retries={"max_attempts": 1}) +config = Config( + connect_timeout=60, # Connection timeout: 60 seconds + read_timeout=900, # Read timeout: 15 minutes (suitable for long streaming responses) + retries={ + 'max_attempts': 8, # Maximum retry attempts + 'mode': 'adaptive' # Adaptive retry mode + }, + max_pool_connections=50 # Maximum connection pool size + ) bedrock_agent = boto3.client( service_name="bedrock-agent", From bd866f3e012cbd52831e2a293795d891ed1af2cf Mon Sep 17 00:00:00 2001 From: herter4171-kp Date: Tue, 23 Sep 2025 16:10:15 -0400 Subject: [PATCH 05/17] Add first cut of parsing MD queries. 
--- src/api/models/bedrock_agents.py | 26 +++++++++++++++++++++++++- src/api/models/md.py | 26 ++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) create mode 100644 src/api/models/md.py diff --git a/src/api/models/bedrock_agents.py b/src/api/models/bedrock_agents.py index 16654b11..1e537edd 100644 --- a/src/api/models/bedrock_agents.py +++ b/src/api/models/bedrock_agents.py @@ -30,6 +30,8 @@ from api.setting import (DEBUG, AWS_REGION, AGENT_PREFIX) +from md import MetaData + logger = logging.getLogger(__name__) config = Config( connect_timeout=60, # Connection timeout: 60 seconds @@ -295,14 +297,36 @@ def _invoke_agent(self, chat_request: ChatRequest, stream=False): messages = args['messages'] query = messages[len(messages)-1]['content'][0]['text'] + md = MetaData(query) + md_args = {} + session_state = {} + if md.has_metadata: + md_args = md.get_metadata_args() + query = md.get_clean_query() + kb_id = "D3Q2K57HXU" + + session_state['knowledgeBaseConfigurations'] = [{ + 'knowledgeBaseId': kb_id, # TODO: Don't hard-wire! 
+ 'retrievalConfiguration': { + 'vectorSearchConfiguration': { + 'filter': md_args + } + } + }] + # Step 1 - Retrieve Context + # TODO: Session state request_params = { 'agentId': model['agent_id'], 'agentAliasId': model['alias_id'], 'sessionId': 'unique-session-id', # Generate a unique session ID - 'inputText': query + 'inputText': query, } + + # Append KB config if present + if session_state: + request_params['sessionState'] = session_state # Make the retrieve request # Invoke the agent diff --git a/src/api/models/md.py b/src/api/models/md.py new file mode 100644 index 00000000..fe0282d5 --- /dev/null +++ b/src/api/models/md.py @@ -0,0 +1,26 @@ +import re + +class MetaData(object): + + @property + def has_metadata(self): + return '"="' in self._prompt + + def __init__(self, prompt: str): + self._prompt = prompt + + def get_metadata_args(self): + key_and_all = "andAll" + md_args = {key_and_all: []} + + pattern = r'"([^"]*)"\s*=\s*"([^"]*)"' # TODO: DRY on pattern + matches = re.findall(pattern, self._prompt) + + for k,v in dict(matches).items(): + sub_map = {"equals": {"key": k, "value": v}} + md_args[key_and_all].append(sub_map) + + return md_args + + def get_clean_query(self): + return re.sub(r'"[^"]*"\s*=\s*"[^"]*"', '', self._prompt).strip() \ No newline at end of file From e62ee0cb82dabc882c9ea0553d504be36ad8a75d Mon Sep 17 00:00:00 2001 From: herter4171-kp Date: Tue, 23 Sep 2025 16:31:00 -0400 Subject: [PATCH 06/17] Print MD. 
--- src/api/models/bedrock_agents.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/api/models/bedrock_agents.py b/src/api/models/bedrock_agents.py index 1e537edd..8b7c087f 100644 --- a/src/api/models/bedrock_agents.py +++ b/src/api/models/bedrock_agents.py @@ -303,6 +303,7 @@ def _invoke_agent(self, chat_request: ChatRequest, stream=False): if md.has_metadata: md_args = md.get_metadata_args() + print(md_args) query = md.get_clean_query() kb_id = "D3Q2K57HXU" From 9c745390cac14d8b4dddbbc8d73c5b6fbac671f0 Mon Sep 17 00:00:00 2001 From: herter4171-kp Date: Tue, 23 Sep 2025 16:37:46 -0400 Subject: [PATCH 07/17] Test parsing MD queries, print to logs instead of stdout. --- src/api/models/bedrock_agents.py | 2 +- src/api/models/md.py | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/api/models/bedrock_agents.py b/src/api/models/bedrock_agents.py index 8b7c087f..a2088065 100644 --- a/src/api/models/bedrock_agents.py +++ b/src/api/models/bedrock_agents.py @@ -303,7 +303,7 @@ def _invoke_agent(self, chat_request: ChatRequest, stream=False): if md.has_metadata: md_args = md.get_metadata_args() - print(md_args) + logger.info(md_args) query = md.get_clean_query() kb_id = "D3Q2K57HXU" diff --git a/src/api/models/md.py b/src/api/models/md.py index fe0282d5..ab675bac 100644 --- a/src/api/models/md.py +++ b/src/api/models/md.py @@ -23,4 +23,11 @@ def get_metadata_args(self): return md_args def get_clean_query(self): - return re.sub(r'"[^"]*"\s*=\s*"[^"]*"', '', self._prompt).strip() \ No newline at end of file + return re.sub(r'"[^"]*"\s*=\s*"[^"]*"', '', self._prompt).strip() + +if __name__ == "__main__": + md = MetaData('"OE_Number"="111" Tell me about the event.') + prompt = md.get_clean_query() + filters = md.get_metadata_args() + + print(f"Prompt: {prompt}\nFilters: {filters}") \ No newline at end of file From f2e8d71657989a9a719b6a076e5df9b907ca161a Mon Sep 17 00:00:00 2001 From: herter4171-kp Date: Tue, 23 Sep 2025 16:51:19 -0400 
Subject: [PATCH 08/17] Account for using only one metadata filter. --- src/api/models/bedrock_agents.py | 2 +- src/api/models/md.py | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/api/models/bedrock_agents.py b/src/api/models/bedrock_agents.py index a2088065..3750d042 100644 --- a/src/api/models/bedrock_agents.py +++ b/src/api/models/bedrock_agents.py @@ -30,7 +30,7 @@ from api.setting import (DEBUG, AWS_REGION, AGENT_PREFIX) -from md import MetaData +from api.models.md import MetaData logger = logging.getLogger(__name__) config = Config( diff --git a/src/api/models/md.py b/src/api/models/md.py index ab675bac..dc4ea3ee 100644 --- a/src/api/models/md.py +++ b/src/api/models/md.py @@ -19,6 +19,10 @@ def get_metadata_args(self): for k,v in dict(matches).items(): sub_map = {"equals": {"key": k, "value": v}} md_args[key_and_all].append(sub_map) + + # Can't have andAll with just one filter :( + if len(matches) == 1: + md_args = md_args[key_and_all][0] return md_args From ced4469a585ea1e33eb71943342e162b8fe4b07e Mon Sep 17 00:00:00 2001 From: herter4171-kp Date: Tue, 23 Sep 2025 17:30:45 -0400 Subject: [PATCH 09/17] Unique session IDs when metadata is parsed. 
--- src/api/models/bedrock_agents.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/api/models/bedrock_agents.py b/src/api/models/bedrock_agents.py index 3750d042..096d4bb8 100644 --- a/src/api/models/bedrock_agents.py +++ b/src/api/models/bedrock_agents.py @@ -1,5 +1,6 @@ # Original Credit: GitHub user dhapola import base64 +import uuid import json import logging import re @@ -300,9 +301,11 @@ def _invoke_agent(self, chat_request: ChatRequest, stream=False): md = MetaData(query) md_args = {} session_state = {} + session_id = 'unique-session-id' if md.has_metadata: md_args = md.get_metadata_args() + session_id = str(uuid.uuid4()) logger.info(md_args) query = md.get_clean_query() kb_id = "D3Q2K57HXU" @@ -321,7 +324,7 @@ def _invoke_agent(self, chat_request: ChatRequest, stream=False): request_params = { 'agentId': model['agent_id'], 'agentAliasId': model['alias_id'], - 'sessionId': 'unique-session-id', # Generate a unique session ID + 'sessionId': session_id, 'inputText': query, } From 347360f0d3b58e43c559e00d0f723ba2ea7396dd Mon Sep 17 00:00:00 2001 From: herter4171-kp Date: Wed, 24 Sep 2025 13:34:16 -0400 Subject: [PATCH 10/17] Switch to orAll for multiple MD key/val. 
--- src/api/models/md.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/api/models/md.py b/src/api/models/md.py index dc4ea3ee..59dd37c0 100644 --- a/src/api/models/md.py +++ b/src/api/models/md.py @@ -10,19 +10,19 @@ def __init__(self, prompt: str): self._prompt = prompt def get_metadata_args(self): - key_and_all = "andAll" - md_args = {key_and_all: []} + outer_key = "orAll" + md_args = {outer_key: []} pattern = r'"([^"]*)"\s*=\s*"([^"]*)"' # TODO: DRY on pattern matches = re.findall(pattern, self._prompt) for k,v in dict(matches).items(): sub_map = {"equals": {"key": k, "value": v}} - md_args[key_and_all].append(sub_map) + md_args[outer_key].append(sub_map) # Can't have andAll with just one filter :( if len(matches) == 1: - md_args = md_args[key_and_all][0] + md_args = md_args[outer_key][0] return md_args From 7f7880718790da67625e3e107575a55411643a40 Mon Sep 17 00:00:00 2001 From: herter4171-kp Date: Mon, 29 Sep 2025 16:55:44 -0400 Subject: [PATCH 11/17] Ingest user field from raw request, embed in args, and apply to bedrock agent session ID. 
--- src/api/models/bedrock.py | 5 +++++ src/api/models/bedrock_agents.py | 2 +- src/api/schema.py | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/api/models/bedrock.py b/src/api/models/bedrock.py index 95854ac2..6ab31ea4 100644 --- a/src/api/models/bedrock.py +++ b/src/api/models/bedrock.py @@ -575,6 +575,11 @@ def _parse_request(self, chat_request: ChatRequest) -> dict: if chat_request.extra_body: # reasoning_config will not be used args["additionalModelRequestFields"] = chat_request.extra_body + + # Add user + if chat_request.user: + args["user"] = chat_request.user + return args def _create_response( diff --git a/src/api/models/bedrock_agents.py b/src/api/models/bedrock_agents.py index 096d4bb8..8677b3e3 100644 --- a/src/api/models/bedrock_agents.py +++ b/src/api/models/bedrock_agents.py @@ -301,7 +301,7 @@ def _invoke_agent(self, chat_request: ChatRequest, stream=False): md = MetaData(query) md_args = {} session_state = {} - session_id = 'unique-session-id' + session_id = args["user"] if md.has_metadata: md_args = md.get_metadata_args() diff --git a/src/api/schema.py b/src/api/schema.py index 233e1139..248554f2 100644 --- a/src/api/schema.py +++ b/src/api/schema.py @@ -99,7 +99,7 @@ class ChatRequest(BaseModel): stream_options: StreamOptions | None = None temperature: float | None = Field(default=1.0, le=2.0, ge=0.0) top_p: float | None = Field(default=1.0, le=1.0, ge=0.0) - user: str | None = None # Not used + user: str = 'unique-session-id' # Feed email to user field for unique session ID max_tokens: int | None = 2048 max_completion_tokens: int | None = None reasoning_effort: Literal["low", "medium", "high"] | None = None From e8b31bbdb78495a7e8a2fa443a573422c7c5a278 Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Mon, 29 Sep 2025 21:52:38 +0000 Subject: [PATCH 12/17] Solidify schema as session_id distinct from user, and ensure it's not passed to foundation models. 
--- src/api/models/bedrock.py | 3 +-- src/api/models/bedrock_agents.py | 3 ++- src/api/schema.py | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/api/models/bedrock.py b/src/api/models/bedrock.py index 6ab31ea4..068e160a 100644 --- a/src/api/models/bedrock.py +++ b/src/api/models/bedrock.py @@ -577,8 +577,7 @@ def _parse_request(self, chat_request: ChatRequest) -> dict: args["additionalModelRequestFields"] = chat_request.extra_body # Add user - if chat_request.user: - args["user"] = chat_request.user + args["session_id"] = chat_request.session_id return args diff --git a/src/api/models/bedrock_agents.py b/src/api/models/bedrock_agents.py index 8677b3e3..5cc38c22 100644 --- a/src/api/models/bedrock_agents.py +++ b/src/api/models/bedrock_agents.py @@ -143,6 +143,7 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False): # convert OpenAI chat request to Bedrock SDK request args = self._parse_request(chat_request) + del args["session_id"] # Not used for foundation models if DEBUG: logger.info("Bedrock request: " + json.dumps(str(args))) @@ -301,7 +302,7 @@ def _invoke_agent(self, chat_request: ChatRequest, stream=False): md = MetaData(query) md_args = {} session_state = {} - session_id = args["user"] + session_id = args["session_id"] if md.has_metadata: md_args = md.get_metadata_args() diff --git a/src/api/schema.py b/src/api/schema.py index 248554f2..2f137c28 100644 --- a/src/api/schema.py +++ b/src/api/schema.py @@ -99,7 +99,8 @@ class ChatRequest(BaseModel): stream_options: StreamOptions | None = None temperature: float | None = Field(default=1.0, le=2.0, ge=0.0) top_p: float | None = Field(default=1.0, le=1.0, ge=0.0) - user: str = 'unique-session-id' # Feed email to user field for unique session ID + user: str | None = None + session_id: str | None = None # Feed user name for unique session ID max_tokens: int | None = 2048 max_completion_tokens: int | None = None reasoning_effort: Literal["low", "medium", "high"] | None 
= None From 7eac31224513ef44177f5ff05bad17a4f1ff47f0 Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Tue, 30 Sep 2025 12:57:57 +0000 Subject: [PATCH 13/17] Ensure string const for session id --- src/api/schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/schema.py b/src/api/schema.py index 2f137c28..af1d0905 100644 --- a/src/api/schema.py +++ b/src/api/schema.py @@ -100,7 +100,7 @@ class ChatRequest(BaseModel): temperature: float | None = Field(default=1.0, le=2.0, ge=0.0) top_p: float | None = Field(default=1.0, le=1.0, ge=0.0) user: str | None = None - session_id: str | None = None # Feed user name for unique session ID + session_id: str | None = "unique-session-id" # Feed user name for unique session ID max_tokens: int | None = 2048 max_completion_tokens: int | None = None reasoning_effort: Literal["low", "medium", "high"] | None = None From d562c81821c046f5911ec40963ec0f9fa7dd6404 Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Tue, 30 Sep 2025 13:08:50 +0000 Subject: [PATCH 14/17] No spaces in session IDs. 
--- src/api/models/bedrock_agents.py | 2 +- src/api/schema.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/api/models/bedrock_agents.py b/src/api/models/bedrock_agents.py index 5cc38c22..aae93637 100644 --- a/src/api/models/bedrock_agents.py +++ b/src/api/models/bedrock_agents.py @@ -302,7 +302,7 @@ def _invoke_agent(self, chat_request: ChatRequest, stream=False): md = MetaData(query) md_args = {} session_state = {} - session_id = args["session_id"] + session_id = (args["session_id"] or "unique-session-id").replace(" ", "_") # [0-9a-zA-Z._:-]+; guard None since schema default is None if md.has_metadata: md_args = md.get_metadata_args() diff --git a/src/api/schema.py b/src/api/schema.py index af1d0905..2f137c28 100644 --- a/src/api/schema.py +++ b/src/api/schema.py @@ -100,7 +100,7 @@ class ChatRequest(BaseModel): temperature: float | None = Field(default=1.0, le=2.0, ge=0.0) top_p: float | None = Field(default=1.0, le=1.0, ge=0.0) user: str | None = None - session_id: str | None = "unique-session-id" # Feed user name for unique session ID + session_id: str | None = None # Feed user name for unique session ID max_tokens: int | None = 2048 max_completion_tokens: int | None = None reasoning_effort: Literal["low", "medium", "high"] | None = None From 7111bb7fba63c4eb3ffecfc8f15af0be6edb28f7 Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Tue, 14 Oct 2025 19:23:45 +0000 Subject: [PATCH 15/17] Reinstantiate client in case credentials go stale.
--- src/api/models/bedrock_agents.py | 34 +++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/src/api/models/bedrock_agents.py b/src/api/models/bedrock_agents.py index aae93637..e4f576ec 100644 --- a/src/api/models/bedrock_agents.py +++ b/src/api/models/bedrock_agents.py @@ -9,6 +9,7 @@ from typing import AsyncIterable import boto3 +from botocore.exceptions import EventStreamError from botocore.config import Config import numpy as np import requests @@ -50,11 +51,14 @@ config=config, ) -bedrock_agent_runtime = boto3.client( - service_name="bedrock-agent-runtime", - region_name=AWS_REGION, - config=config, -) +def get_agent_runtime(): + return boto3.client( + service_name="bedrock-agent-runtime", + region_name=AWS_REGION, + config=config, + ) + +bedrock_agent_runtime = get_agent_runtime() class BedrockAgents(BedrockModel): @@ -278,7 +282,7 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]: # return an [DONE] message at the end. yield self.stream_response_to_bytes() - def _invoke_agent(self, chat_request: ChatRequest, stream=False): + def _invoke_agent(self, chat_request: ChatRequest, stream=False, retry=False): """Common logic for invoke agent """ if DEBUG: logger.info("BedrockAgents._invoke_agent: Raw request: " + chat_request.model_dump_json()) @@ -293,6 +297,7 @@ def _invoke_agent(self, chat_request: ChatRequest, stream=False): model = self._model_manager.get_all_models()[chat_request.model] ################ + global bedrock_agent_runtime try: query = args['messages'][0]['content'][0]['text'] @@ -309,10 +314,10 @@ def _invoke_agent(self, chat_request: ChatRequest, stream=False): session_id = str(uuid.uuid4()) logger.info(md_args) query = md.get_clean_query() - kb_id = "D3Q2K57HXU" + kb_id = "D3Q2K57HXU" # TODO: Don't hard-wire session_state['knowledgeBaseConfigurations'] = [{ - 'knowledgeBaseId': kb_id, # TODO: Don't hard-wire! 
+ 'knowledgeBaseId': kb_id, 'retrievalConfiguration': { 'vectorSearchConfiguration': { 'filter': md_args @@ -337,9 +342,14 @@ def _invoke_agent(self, chat_request: ChatRequest, stream=False, retry=False): # Make the retrieve request # Invoke the agent response = bedrock_agent_runtime.invoke_agent(**request_params) return response - + except EventStreamError as ese: + if not retry: + # Reinstantiate client to hopefully refresh credentials, then retry once + logger.info("Refreshing client to get current credentials") + bedrock_agent_runtime = get_agent_runtime() + return self._invoke_agent(chat_request, stream, True) + else: + raise ese except Exception as e: logger.error(e) - raise HTTPException(status_code=500, detail=str(e)) - - \ No newline at end of file + raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file From 8ef23b7360bc6e6aeff80286738c57395f99112d Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Fri, 17 Oct 2025 19:03:50 +0000 Subject: [PATCH 16/17] Account for special formatting on metadata double quotes. --- src/api/models/bedrock_agents.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/api/models/bedrock_agents.py b/src/api/models/bedrock_agents.py index e4f576ec..10fa4698 100644 --- a/src/api/models/bedrock_agents.py +++ b/src/api/models/bedrock_agents.py @@ -304,6 +304,9 @@ def _invoke_agent(self, chat_request: ChatRequest, stream=False, retry=False): messages = args['messages'] query = messages[len(messages)-1]['content'][0]['text'] + # Sanitize variants of double quotes + query = query.translate(str.maketrans({'“':'"', '”':'"', '„':'"', '‟':'"'})) + md = MetaData(query) md_args = {} session_state = {} From c6ab1f822752b37db0847ad5bef5837d45cbc08a Mon Sep 17 00:00:00 2001 From: herter4171-kp Date: Tue, 21 Oct 2025 16:28:45 -0700 Subject: [PATCH 17/17] Test add streaming response.
--- src/api/models/bedrock_agents.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/api/models/bedrock_agents.py b/src/api/models/bedrock_agents.py index 10fa4698..270b5ef1 100644 --- a/src/api/models/bedrock_agents.py +++ b/src/api/models/bedrock_agents.py @@ -340,6 +340,13 @@ def _invoke_agent(self, chat_request: ChatRequest, stream=False, retry=False): # Append KB config if present if session_state: request_params['sessionState'] = session_state + + # Apply streaming if desired + if stream: + request_params['streamingConfigurations'] = { + 'streamFinalResponse': True, + 'applyGuardrailInterval': 123 # NOTE(review): 123 is the boto3 docs placeholder — confirm intended interval + } # Make the retrieve request # Invoke the agent