From e9ffdd1cf72b701f3cb07c1932bd94fac99d3834 Mon Sep 17 00:00:00 2001 From: A Vertex SDK engineer Date: Wed, 1 Apr 2026 18:58:31 -0700 Subject: [PATCH] feat: Add ingest_events method for Memory Bank PiperOrigin-RevId: 893219138 --- vertexai/_genai/memories.py | 223 +++++++++++++++++++++++++ vertexai/_genai/types/__init__.py | 44 +++++ vertexai/_genai/types/common.py | 267 ++++++++++++++++++++++++++++++ 3 files changed, 534 insertions(+) diff --git a/vertexai/_genai/memories.py b/vertexai/_genai/memories.py index 2d4b1b180c..c9bea90ce3 100644 --- a/vertexai/_genai/memories.py +++ b/vertexai/_genai/memories.py @@ -254,6 +254,52 @@ def _GetAgentEngineMemoryRequestParameters_to_vertex( return to_object +def _IngestEventsConfig_to_vertex( + from_object: Union[dict[str, Any], object], + parent_object: Optional[dict[str, Any]] = None, +) -> dict[str, Any]: + to_object: dict[str, Any] = {} + + if getv(from_object, ["force_flush"]) is not None: + setv(parent_object, ["forceFlush"], getv(from_object, ["force_flush"])) + + return to_object + + +def _IngestEventsRequestParameters_to_vertex( + from_object: Union[dict[str, Any], object], + parent_object: Optional[dict[str, Any]] = None, +) -> dict[str, Any]: + to_object: dict[str, Any] = {} + if getv(from_object, ["config"]) is not None: + _IngestEventsConfig_to_vertex(getv(from_object, ["config"]), to_object) + + if getv(from_object, ["name"]) is not None: + setv(to_object, ["_url", "name"], getv(from_object, ["name"])) + + if getv(from_object, ["stream_id"]) is not None: + setv(to_object, ["streamId"], getv(from_object, ["stream_id"])) + + if getv(from_object, ["direct_contents_source"]) is not None: + setv( + to_object, + ["directContentsSource"], + getv(from_object, ["direct_contents_source"]), + ) + + if getv(from_object, ["scope"]) is not None: + setv(to_object, ["scope"], getv(from_object, ["scope"])) + + if getv(from_object, ["generation_trigger_config"]) is not None: + setv( + to_object, + ["generationTriggerConfig"], + getv(from_object, ["generation_trigger_config"]), + ) + + return to_object + + def _ListAgentEngineMemoryConfig_to_vertex( from_object: Union[dict[str, Any], object], parent_object: Optional[dict[str, Any]] = None, @@ -713,6 +759,69 @@ def get( self._api_client._verify_response(return_value) return return_value + def _ingest_events( + self, + *, + config: Optional[types.IngestEventsConfigOrDict] = None, + name: str, + stream_id: Optional[str] = None, + direct_contents_source: Optional[ + types.IngestionDirectContentsSourceOrDict + ] = None, + scope: Optional[dict[str, str]] = None, + generation_trigger_config: Optional[types.GenerationTriggerConfigOrDict] = None, + ) -> types.MemoryBankIngestEventsOperation: + """ + Ingest events into a Memory Bank. + """ + + parameter_model = types._IngestEventsRequestParameters( + config=config, + name=name, + stream_id=stream_id, + direct_contents_source=direct_contents_source, + scope=scope, + generation_trigger_config=generation_trigger_config, + ) + + request_url_dict: Optional[dict[str, str]] + if not self._api_client.vertexai: + raise ValueError("This method is only supported in the Vertex AI client.") + else: + request_dict = _IngestEventsRequestParameters_to_vertex(parameter_model) + request_url_dict = request_dict.get("_url") + if request_url_dict: + path = "{name}/memories:ingestEvents".format_map(request_url_dict) + else: + path = "{name}/memories:ingestEvents" + + query_params = request_dict.get("_query") + if query_params: + path = f"{path}?{urlencode(query_params)}" + # TODO: remove the hack that pops config. + request_dict.pop("config", None) + + http_options: Optional[types.HttpOptions] = None + if ( + parameter_model.config is not None + and parameter_model.config.http_options is not None + ): + http_options = parameter_model.config.http_options + + request_dict = _common.convert_to_dict(request_dict) + request_dict = _common.encode_unserializable_types(request_dict) + + response = self._api_client.request("post", path, request_dict, http_options) + + response_dict = {} if not response.body else json.loads(response.body) + + return_value = types.MemoryBankIngestEventsOperation._from_response( + response=response_dict, kwargs=parameter_model.model_dump() + ) + + self._api_client._verify_response(return_value) + return return_value + def _list( self, *, @@ -1416,6 +1525,55 @@ def purge( raise RuntimeError(f"Failed to purge memories: {operation.error}") return operation + def ingest_events( + self, + *, + name: str, + scope: dict[str, str], + stream_id: str = None, + direct_contents_source: Optional[ + types.IngestionDirectContentsSourceOrDict + ] = None, + generation_trigger_config: Optional[types.GenerationTriggerConfigOrDict] = None, + config: Optional[types.IngestEventsConfigOrDict] = None, + ) -> types.MemoryBankIngestEventsOperation: + """Ingests events into an Agent Engine. + + Args: + name (str): + Required. The name of the Agent Engine to ingest events into. + scope (dict[str, str]): + Required. The scope of the events to ingest. For example, + {"user_id": "123"}. + config (IngestEventsConfig): + Optional. The configuration for the ingest events operation. + + Returns: + AgentEngineIngestEventsOperation: + The operation for ingesting the events. + """ + if config is None: + config = types.IngestEventsConfig() + elif isinstance(config, dict): + config = types.IngestEventsConfig.model_validate(config) + operation = self._ingest_events( + name=name, + scope=scope, + stream_id=stream_id, + generation_trigger_config=generation_trigger_config, + direct_contents_source=direct_contents_source, + config=config, + ) + if config.wait_for_completion and not operation.done: + operation = _agent_engines_utils._await_operation( + operation_name=operation.name, + get_operation_fn=self._get_memory_operation, + poll_interval_seconds=0.5, + ) + if operation.error: + raise RuntimeError(f"Failed to ingest events: {operation.error}") + return operation + class AsyncMemories(_api_module.BaseModule): @@ -1679,6 +1837,71 @@ async def get( self._api_client._verify_response(return_value) return return_value + async def _ingest_events( + self, + *, + config: Optional[types.IngestEventsConfigOrDict] = None, + name: str, + stream_id: Optional[str] = None, + direct_contents_source: Optional[ + types.IngestionDirectContentsSourceOrDict + ] = None, + scope: Optional[dict[str, str]] = None, + generation_trigger_config: Optional[types.GenerationTriggerConfigOrDict] = None, + ) -> types.MemoryBankIngestEventsOperation: + """ + Ingest events into a Memory Bank. + """ + + parameter_model = types._IngestEventsRequestParameters( + config=config, + name=name, + stream_id=stream_id, + direct_contents_source=direct_contents_source, + scope=scope, + generation_trigger_config=generation_trigger_config, + ) + + request_url_dict: Optional[dict[str, str]] + if not self._api_client.vertexai: + raise ValueError("This method is only supported in the Vertex AI client.") + else: + request_dict = _IngestEventsRequestParameters_to_vertex(parameter_model) + request_url_dict = request_dict.get("_url") + if request_url_dict: + path = "{name}/memories:ingestEvents".format_map(request_url_dict) + else: + path = "{name}/memories:ingestEvents" + + query_params = request_dict.get("_query") + if query_params: + path = f"{path}?{urlencode(query_params)}" + # TODO: remove the hack that pops config. + request_dict.pop("config", None) + + http_options: Optional[types.HttpOptions] = None + if ( + parameter_model.config is not None + and parameter_model.config.http_options is not None + ): + http_options = parameter_model.config.http_options + + request_dict = _common.convert_to_dict(request_dict) + request_dict = _common.encode_unserializable_types(request_dict) + + response = await self._api_client.async_request( + "post", path, request_dict, http_options + ) + + response_dict = {} if not response.body else json.loads(response.body) + + return_value = types.MemoryBankIngestEventsOperation._from_response( + response=response_dict, kwargs=parameter_model.model_dump() + ) + + self._api_client._verify_response(return_value) + return return_value + async def _list( self, *, diff --git a/vertexai/_genai/types/__init__.py b/vertexai/_genai/types/__init__.py index 937e8fe43f..b69f24c368 100644 --- a/vertexai/_genai/types/__init__.py +++ b/vertexai/_genai/types/__init__.py @@ -75,6 +75,7 @@ from .common import _GetEvaluationSetParameters from .common import _GetMultimodalDatasetOperationParameters from .common import _GetMultimodalDatasetParameters +from .common import _IngestEventsRequestParameters from .common import _ListAgentEngineMemoryRequestParameters from .common import _ListAgentEngineMemoryRevisionsRequestParameters from .common import _ListAgentEngineRequestParameters @@ -475,6 +476,9 @@ from .common import GenerateUserScenariosResponse from .common import GenerateUserScenariosResponseDict from .common import GenerateUserScenariosResponseOrDict +from .common import GenerationTriggerConfig +from .common import GenerationTriggerConfigDict +from .common import GenerationTriggerConfigOrDict from .common import GetAgentEngineConfig from .common import GetAgentEngineConfigDict from .common import GetAgentEngineConfigOrDict @@ -519,6 +523,15 @@ from .common import GetPromptConfigOrDict from .common import IdentityType from .common import Importance +from .common import IngestEventsConfig +from .common import IngestEventsConfigDict +from .common import IngestEventsConfigOrDict +from .common import IngestionDirectContentsSource +from .common import IngestionDirectContentsSourceDict +from .common import IngestionDirectContentsSourceEvent +from .common import IngestionDirectContentsSourceEventDict +from .common import IngestionDirectContentsSourceEventOrDict +from .common import IngestionDirectContentsSourceOrDict from .common import IntermediateExtractedMemory from .common import IntermediateExtractedMemoryDict from .common import IntermediateExtractedMemoryOrDict @@ -646,6 +659,9 @@ from .common import MemoryBankCustomizationConfigMemoryTopicManagedMemoryTopicOrDict from .common import MemoryBankCustomizationConfigMemoryTopicOrDict from .common import MemoryBankCustomizationConfigOrDict +from .common import MemoryBankIngestEventsOperation +from .common import MemoryBankIngestEventsOperationDict +from .common import MemoryBankIngestEventsOperationOrDict from .common import MemoryConjunctionFilter from .common import MemoryConjunctionFilterDict from .common import MemoryConjunctionFilterOrDict @@ -653,6 +669,12 @@ from .common import MemoryFilter from .common import MemoryFilterDict from .common import MemoryFilterOrDict +from .common import MemoryGenerationTriggerConfig +from .common import MemoryGenerationTriggerConfigDict +from .common import MemoryGenerationTriggerConfigGenerationTriggerRule +from .common import MemoryGenerationTriggerConfigGenerationTriggerRuleDict +from .common import MemoryGenerationTriggerConfigGenerationTriggerRuleOrDict +from .common import MemoryGenerationTriggerConfigOrDict from .common import MemoryMetadataMergeStrategy from .common import MemoryMetadataValue from .common import MemoryMetadataValueDict @@ -1580,6 +1602,12 @@ "MemoryBankCustomizationConfig", "MemoryBankCustomizationConfigDict", "MemoryBankCustomizationConfigOrDict", + "MemoryGenerationTriggerConfigGenerationTriggerRule", + "MemoryGenerationTriggerConfigGenerationTriggerRuleDict", + "MemoryGenerationTriggerConfigGenerationTriggerRuleOrDict", + "MemoryGenerationTriggerConfig", + "MemoryGenerationTriggerConfigDict", + "MemoryGenerationTriggerConfigOrDict", "ReasoningEngineContextSpecMemoryBankConfigGenerationConfig", "ReasoningEngineContextSpecMemoryBankConfigGenerationConfigDict", "ReasoningEngineContextSpecMemoryBankConfigGenerationConfigOrDict", @@ -1724,6 +1752,21 @@ "GetAgentEngineMemoryConfig", "GetAgentEngineMemoryConfigDict", "GetAgentEngineMemoryConfigOrDict", + "IngestionDirectContentsSourceEvent", + "IngestionDirectContentsSourceEventDict", + "IngestionDirectContentsSourceEventOrDict", + "IngestionDirectContentsSource", + "IngestionDirectContentsSourceDict", + "IngestionDirectContentsSourceOrDict", + "GenerationTriggerConfig", + "GenerationTriggerConfigDict", + "GenerationTriggerConfigOrDict", + "IngestEventsConfig", + "IngestEventsConfigDict", + "IngestEventsConfigOrDict", + "MemoryBankIngestEventsOperation", + "MemoryBankIngestEventsOperationDict", + "MemoryBankIngestEventsOperationOrDict", "ListAgentEngineMemoryConfig", "ListAgentEngineMemoryConfigDict", "ListAgentEngineMemoryConfigOrDict", @@ -2205,6 +2248,7 @@ "_DeleteAgentEngineMemoryRequestParameters", "_GenerateAgentEngineMemoriesRequestParameters", "_GetAgentEngineMemoryRequestParameters", + "_IngestEventsRequestParameters", "_ListAgentEngineMemoryRequestParameters", "_GetAgentEngineMemoryOperationParameters", "_GetAgentEngineGenerateMemoriesOperationParameters", diff --git a/vertexai/_genai/types/common.py b/vertexai/_genai/types/common.py index d549c5490c..ec6e04d689 100644 --- a/vertexai/_genai/types/common.py +++ b/vertexai/_genai/types/common.py @@ -6260,6 +6260,72 @@ class MemoryBankCustomizationConfigDict(TypedDict, total=False): ] +class MemoryGenerationTriggerConfigGenerationTriggerRule(_common.BaseModel): + """Represents the active rule that determines when to flush the buffer.""" + + idle_duration: Optional[str] = Field( + default=None, + description="""Specifies to trigger generation if the stream is inactive for the specified duration after the most recent event. The duration must have a minute-level granularity.""", + ) + fixed_interval: Optional[str] = Field( + default=None, + description="""Specifies to trigger generation at a fixed interval. The duration must have a minute-level granularity.""", + ) + token_limit: Optional[int] = Field( + default=None, + description="""Specifies to trigger generation when the token count reaches this limit.""", + ) + event_count: Optional[int] = Field( + default=None, + description="""Specifies to trigger generation when the event count reaches this limit.""", + ) + + +class MemoryGenerationTriggerConfigGenerationTriggerRuleDict(TypedDict, total=False): + """Represents the active rule that determines when to flush the buffer.""" + + idle_duration: Optional[str] + """Specifies to trigger generation if the stream is inactive for the specified duration after the most recent event. The duration must have a minute-level granularity.""" + + fixed_interval: Optional[str] + """Specifies to trigger generation at a fixed interval. The duration must have a minute-level granularity.""" + + token_limit: Optional[int] + """Specifies to trigger generation when the token count reaches this limit.""" + + event_count: Optional[int] + """Specifies to trigger generation when the event count reaches this limit.""" + + +MemoryGenerationTriggerConfigGenerationTriggerRuleOrDict = Union[ + MemoryGenerationTriggerConfigGenerationTriggerRule, + MemoryGenerationTriggerConfigGenerationTriggerRuleDict, +] + + +class MemoryGenerationTriggerConfig(_common.BaseModel): + """Represents configuration for triggering generation.""" + + generation_rule: Optional[MemoryGenerationTriggerConfigGenerationTriggerRule] = ( + Field( + default=None, + description="""Optional. Represents the active rule that determines when to flush the buffer. If not set, then the stream will be force flushed immediately.""", + ) + ) + + +class MemoryGenerationTriggerConfigDict(TypedDict, total=False): + """Represents configuration for triggering generation.""" + + generation_rule: Optional[MemoryGenerationTriggerConfigGenerationTriggerRuleDict] + """Optional. Represents the active rule that determines when to flush the buffer. If not set, then the stream will be force flushed immediately.""" + + +MemoryGenerationTriggerConfigOrDict = Union[ + MemoryGenerationTriggerConfig, MemoryGenerationTriggerConfigDict +] + + class ReasoningEngineContextSpecMemoryBankConfigGenerationConfig(_common.BaseModel): """Configuration for how to generate memories.""" @@ -6267,6 +6333,10 @@ class ReasoningEngineContextSpecMemoryBankConfigGenerationConfig(_common.BaseMod default=None, description="""Required. The model used to generate memories. Format: `projects/{project}/locations/{location}/publishers/google/models/{model}`.""", ) + generation_trigger_config: Optional[MemoryGenerationTriggerConfig] = Field( + default=None, + description="""Optional. Specifies the default trigger configuration for generating memories using `IngestEvents`.""", + ) class ReasoningEngineContextSpecMemoryBankConfigGenerationConfigDict( @@ -6277,6 +6347,9 @@ class ReasoningEngineContextSpecMemoryBankConfigGenerationConfigDict( model: Optional[str] """Required. The model used to generate memories. Format: `projects/{project}/locations/{location}/publishers/google/models/{model}`.""" + generation_trigger_config: Optional[MemoryGenerationTriggerConfigDict] + """Optional. Specifies the default trigger configuration for generating memories using `IngestEvents`.""" + ReasoningEngineContextSpecMemoryBankConfigGenerationConfigOrDict = Union[ ReasoningEngineContextSpecMemoryBankConfigGenerationConfig, @@ -8966,6 +9039,200 @@ class _GetAgentEngineMemoryRequestParametersDict(TypedDict, total=False): ] +class IngestionDirectContentsSourceEvent(_common.BaseModel): + """The direct contents source event for ingesting events.""" + + content: Optional[genai_types.Content] = Field(default=None, description="""""") + + +class IngestionDirectContentsSourceEventDict(TypedDict, total=False): + """The direct contents source event for ingesting events.""" + + content: Optional[genai_types.ContentDict] + """""" + + +IngestionDirectContentsSourceEventOrDict = Union[ + IngestionDirectContentsSourceEvent, IngestionDirectContentsSourceEventDict +] + + +class IngestionDirectContentsSource(_common.BaseModel): + """The direct contents source for ingesting events.""" + + events: Optional[list[IngestionDirectContentsSourceEvent]] = Field( + default=None, description="""""" + ) + + +class IngestionDirectContentsSourceDict(TypedDict, total=False): + """The direct contents source for ingesting events.""" + + events: Optional[list[IngestionDirectContentsSourceEventDict]] + """""" + + +IngestionDirectContentsSourceOrDict = Union[ + IngestionDirectContentsSource, IngestionDirectContentsSourceDict +] + + +class GenerationTriggerConfig(_common.BaseModel): + """The configuration for the memory generation trigger.""" + + pass + + +class GenerationTriggerConfigDict(TypedDict, total=False): + """The configuration for the memory generation trigger.""" + + pass + + +GenerationTriggerConfigOrDict = Union[ + GenerationTriggerConfig, GenerationTriggerConfigDict +] + + +class IngestEventsConfig(_common.BaseModel): + """Config for ingesting events.""" + + http_options: Optional[genai_types.HttpOptions] = Field( + default=None, description="""Used to override HTTP request options.""" + ) + wait_for_completion: Optional[bool] = Field( + default=False, + description="""Waits for the underlying memory generation operation to complete + before returning. Defaults to false.""", + ) + force_flush: Optional[bool] = Field( + default=None, + description="""Optional. Forces a flush of all pending events in the stream and triggers memory generation immediately bypassing any conditions configured in the `generation_trigger_config`.""", + ) + + +class IngestEventsConfigDict(TypedDict, total=False): + """Config for ingesting events.""" + + http_options: Optional[genai_types.HttpOptionsDict] + """Used to override HTTP request options.""" + + wait_for_completion: Optional[bool] + """Waits for the underlying memory generation operation to complete + before returning. Defaults to false.""" + + force_flush: Optional[bool] + """Optional. Forces a flush of all pending events in the stream and triggers memory generation immediately bypassing any conditions configured in the `generation_trigger_config`.""" + + +IngestEventsConfigOrDict = Union[IngestEventsConfig, IngestEventsConfigDict] + + +class _IngestEventsRequestParameters(_common.BaseModel): + """Parameters for purging agent engine memories.""" + + config: Optional[IngestEventsConfig] = Field(default=None, description="""""") + name: Optional[str] = Field( + default=None, description="""Name of the Agent Engine to ingest events into.""" + ) + stream_id: Optional[str] = Field( + default=None, description="""The ID of the stream to ingest events into.""" + ) + direct_contents_source: Optional[IngestionDirectContentsSource] = Field( + default=None, + description="""The direct memories source of the events that should be ingested.""", + ) + scope: Optional[dict[str, str]] = Field( + default=None, + description="""The scope of the memories that should be generated from the stream. + + Memories will be consolidated across memories with the same scope. Scope + values cannot contain the wildcard character '*'.""", + ) + generation_trigger_config: Optional[GenerationTriggerConfig] = Field( + default=None, + description="""The configuration for the memory generation trigger.""", + ) + + +class _IngestEventsRequestParametersDict(TypedDict, total=False): + """Parameters for purging agent engine memories.""" + + config: Optional[IngestEventsConfigDict] + """""" + + name: Optional[str] + """Name of the Agent Engine to ingest events into.""" + + stream_id: Optional[str] + """The ID of the stream to ingest events into.""" + + direct_contents_source: Optional[IngestionDirectContentsSourceDict] + """The direct memories source of the events that should be ingested.""" + + scope: Optional[dict[str, str]] + """The scope of the memories that should be generated from the stream. + + Memories will be consolidated across memories with the same scope. Scope + values cannot contain the wildcard character '*'.""" + + generation_trigger_config: Optional[GenerationTriggerConfigDict] + """The configuration for the memory generation trigger.""" + + +_IngestEventsRequestParametersOrDict = Union[ + _IngestEventsRequestParameters, _IngestEventsRequestParametersDict +] + + +class MemoryBankIngestEventsOperation(_common.BaseModel): + """Operation that ingests events into a memory bank.""" + + name: Optional[str] = Field( + default=None, + description="""The server-assigned name, which is only unique within the same service that originally returns it. If you use the default HTTP mapping, the `name` should be a resource name ending with `operations/{unique_id}`.""", + ) + metadata: Optional[dict[str, Any]] = Field( + default=None, + description="""Service-specific metadata associated with the operation. It typically contains progress information and common metadata such as create time. Some services might not provide such metadata. Any method that returns a long-running operation should document the metadata type, if any.""", + ) + done: Optional[bool] = Field( + default=None, + description="""If the value is `false`, it means the operation is still in progress. If `true`, the operation is completed, and either `error` or `response` is available.""", + ) + error: Optional[dict[str, Any]] = Field( + default=None, + description="""The error result of the operation in case of failure or cancellation.""", + ) + response: Optional[GenerateMemoriesResponse] = Field( + default=None, description="""The response for ingesting events.""" + ) + + +class MemoryBankIngestEventsOperationDict(TypedDict, total=False): + """Operation that ingests events into a memory bank.""" + + name: Optional[str] + """The server-assigned name, which is only unique within the same service that originally returns it. If you use the default HTTP mapping, the `name` should be a resource name ending with `operations/{unique_id}`.""" + + metadata: Optional[dict[str, Any]] + """Service-specific metadata associated with the operation. It typically contains progress information and common metadata such as create time. Some services might not provide such metadata. Any method that returns a long-running operation should document the metadata type, if any.""" + + done: Optional[bool] + """If the value is `false`, it means the operation is still in progress. If `true`, the operation is completed, and either `error` or `response` is available.""" + + error: Optional[dict[str, Any]] + """The error result of the operation in case of failure or cancellation.""" + + response: Optional[GenerateMemoriesResponseDict] + """The response for ingesting events.""" + + +MemoryBankIngestEventsOperationOrDict = Union[ + MemoryBankIngestEventsOperation, MemoryBankIngestEventsOperationDict +] + + class ListAgentEngineMemoryConfig(_common.BaseModel): """Config for listing agent engine memories."""