From b7dfcc136d63faa7f86941ff917d2390193b7682 Mon Sep 17 00:00:00 2001 From: shixiao-coder Date: Thu, 28 May 2026 12:19:03 -0400 Subject: [PATCH] This commit updates the model endpoint, embedding index, and timestamp field to use. The recent workflow finishes properly with the above fixes, and embeddings are now generated in the NodeEmbedding table. We also confirmed that last_update_timestamp is properly written during data ingestion. --- .../workflow/ingestion-helper/embedding_schema.sql | 3 +-- .../workflow/ingestion-helper/embedding_utils.py | 4 ++-- import-automation/workflow/ingestion-helper/spanner_client.py | 2 +- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/import-automation/workflow/ingestion-helper/embedding_schema.sql b/import-automation/workflow/ingestion-helper/embedding_schema.sql index 50d57ce3ea..a67e96f8ca 100644 --- a/import-automation/workflow/ingestion-helper/embedding_schema.sql +++ b/import-automation/workflow/ingestion-helper/embedding_schema.sql @@ -24,8 +24,7 @@ CREATE VECTOR INDEX NodeEmbeddingIndex ON NodeEmbedding(embeddings) WHERE embeddings IS NOT NULL OPTIONS ( - distance_type = 'COSINE', - flat_index = true + distance_type = 'COSINE' ); CREATE MODEL NodeEmbeddingModel diff --git a/import-automation/workflow/ingestion-helper/embedding_utils.py b/import-automation/workflow/ingestion-helper/embedding_utils.py index 333705e316..9c22d5f938 100644 --- a/import-automation/workflow/ingestion-helper/embedding_utils.py +++ b/import-automation/workflow/ingestion-helper/embedding_utils.py @@ -44,7 +44,7 @@ def get_latest_lock_timestamp(database): return None def get_updated_nodes(database, timestamp, node_types): - """Gets subject_ids and names from Node table where update_timestamp > timestamp. + """Gets subject_ids and names from Node table where last_update_timestamp > timestamp. Yields results to avoid loading all into memory. 
Args: @@ -55,7 +55,7 @@ def get_updated_nodes(database, timestamp, node_types): Yields: Dictionaries containing subject_id and name. """ - timestamp_condition = "update_timestamp > @timestamp" if timestamp else "TRUE" + timestamp_condition = "last_update_timestamp > @timestamp" if timestamp else "TRUE" updated_node_sql = f""" SELECT subject_id, name, types FROM Node diff --git a/import-automation/workflow/ingestion-helper/spanner_client.py b/import-automation/workflow/ingestion-helper/spanner_client.py index 97c528f4c1..8fd277d8c4 100644 --- a/import-automation/workflow/ingestion-helper/spanner_client.py +++ b/import-automation/workflow/ingestion-helper/spanner_client.py @@ -31,7 +31,7 @@ class SpannerClient: and getting/updating import statuses. """ _LOCK_ID = "global_ingestion_lock" - _EMBEDDING_MODEL_PATH = "projects/{project}/locations/{location}/publishers/google/models/{model}" + _EMBEDDING_MODEL_PATH = "//aiplatform.googleapis.com/projects/{project}/locations/{location}/publishers/google/models/{model}" def __init__(self, project_id: str,