From 0cd83fd8939084fad48e4670ef6866ad48543831 Mon Sep 17 00:00:00 2001 From: shixiao-coder Date: Fri, 29 May 2026 09:32:26 -0400 Subject: [PATCH 1/5] update the timeout for running embeddings --- .../workflow/ingestion-helper/embedding_utils.py | 10 ++++++---- import-automation/workflow/ingestion-helper/main.py | 10 ++++++++-- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/import-automation/workflow/ingestion-helper/embedding_utils.py b/import-automation/workflow/ingestion-helper/embedding_utils.py index 333705e316..cd80f9c720 100644 --- a/import-automation/workflow/ingestion-helper/embedding_utils.py +++ b/import-automation/workflow/ingestion-helper/embedding_utils.py @@ -43,7 +43,7 @@ def get_latest_lock_timestamp(database): raise return None -def get_updated_nodes(database, timestamp, node_types): +def get_updated_nodes(database, timestamp, node_types, timeout=None): """Gets subject_ids and names from Node table where update_timestamp > timestamp. Yields results to avoid loading all into memory. @@ -51,6 +51,7 @@ def get_updated_nodes(database, timestamp, node_types): database: google.cloud.spanner.Database object. timestamp: datetime object to filter by. node_types: A list of strings representing the node types to filter by. + timeout: Timeout for the query. Yields: Dictionaries containing subject_id and name. @@ -78,7 +79,7 @@ def get_updated_nodes(database, timestamp, node_types): try: with database.snapshot() as snapshot: - results = snapshot.execute_sql(updated_node_sql, params=params, param_types=param_types, timeout=300) + results = snapshot.execute_sql(updated_node_sql, params=params, param_types=param_types, timeout=timeout or 300) fields = None for row in results: if fields is None: @@ -104,7 +105,7 @@ def filter_and_convert_nodes(nodes_generator): yield (node.get("subject_id"), node.get("name"), node.get("types")) -def generate_embeddings_partitioned(database, nodes_generator): +def generate_embeddings_partitioned(database, nodes_generator, timeout=None): """Generates embeddings in batches using standard transactions. Processes nodes in chunks of 500 to avoid transaction size limits. Accepts a generator to avoid loading all nodes into memory. @@ -112,6 +113,7 @@ def generate_embeddings_partitioned(database, nodes_generator): Args: database: google.cloud.spanner.Database object. nodes_generator: A generator yielding tuples containing (subject_id, embedding_content). + timeout: Timeout for the query. Returns: The number of affected rows. @@ -149,7 +151,7 @@ def chunked(iterable, n): param_types = {"nodes": Array(struct_type)} def _execute_dml(transaction): - return transaction.execute_update(embeddings_sql, params=params, param_types=param_types, timeout=300) + return transaction.execute_update(embeddings_sql, params=params, param_types=param_types, timeout=timeout or 300) try: row_count = database.run_in_transaction(_execute_dml) diff --git a/import-automation/workflow/ingestion-helper/main.py b/import-automation/workflow/ingestion-helper/main.py index 511d9e7108..46c686c446 100644 --- a/import-automation/workflow/ingestion-helper/main.py +++ b/import-automation/workflow/ingestion-helper/main.py @@ -45,11 +45,13 @@ 'is_base_dc', os.environ.get('IS_BASE_DC', 'true').lower() == 'true', 'Is base DC') +flags.DEFINE_integer('timeout', 3600, 'Timeout for Spanner queries') if not FLAGS.is_parsed(): FLAGS(['ingestion_helper']) + def _validate_params(request_json, required_params): for param in required_params: if param not in request_json: @@ -78,6 +80,8 @@ def ingestion_helper(request): location=FLAGS.location, model_id=os.environ.get('EMBEDDING_MODEL_ID', 'text-embedding-005')) + + storage = StorageClient(FLAGS.gcs_bucket_id) if action_type == 'get_import_info': @@ -244,10 +248,12 @@ def ingestion_helper(request): try: logging.info(f"Job started. Fetching all nodes for types: {node_types}") timestamp = get_latest_lock_timestamp(spanner.database) - nodes = get_updated_nodes(spanner.database, timestamp, node_types) + nodes = get_updated_nodes(spanner.database, timestamp, node_types, timeout=FLAGS.timeout) converted_nodes = filter_and_convert_nodes(nodes) - affected_rows = generate_embeddings_partitioned(spanner.database, converted_nodes) + affected_rows = generate_embeddings_partitioned(spanner.database, converted_nodes, timeout=FLAGS.timeout) + return (f"OK [Affected rows: {affected_rows}]", 200) + except Exception as e: logging.error(f"Embedding ingestion failed: {e}") return (f"Error: {e}", 500) From 862f41d241679962417f5a50120156cf5cae3216 Mon Sep 17 00:00:00 2001 From: shixiao-coder Date: Fri, 29 May 2026 10:11:27 -0400 Subject: [PATCH 2/5] Update max timeout default to be 1740 for embeddings, 1 minute earlier than the cloud workflow HTTP max timeout --- import-automation/workflow/ingestion-helper/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/import-automation/workflow/ingestion-helper/main.py b/import-automation/workflow/ingestion-helper/main.py index 9fdc99ee55..c692173b3f 100644 --- a/import-automation/workflow/ingestion-helper/main.py +++ b/import-automation/workflow/ingestion-helper/main.py @@ -45,7 +45,7 @@ 'is_base_dc', os.environ.get('IS_BASE_DC', 'true').lower() == 'true', 'Is base DC') -flags.DEFINE_integer('timeout', 3600, 'Timeout for Spanner queries') +flags.DEFINE_integer('timeout', 1700, 'Timeout for Spanner queries') if not FLAGS.is_parsed(): FLAGS(['ingestion_helper']) From f948d1eed0e233c383d68578b85312e6a8b0dce7 Mon Sep 17 00:00:00 2001 From: shixiao-coder Date: Fri, 29 May 2026 11:39:36 -0400 Subject: [PATCH 3/5] Updated based on comments --- .../workflow/ingestion-helper/embedding_utils.py | 8 ++++---- import-automation/workflow/ingestion-helper/main.py | 4 +++- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/import-automation/workflow/ingestion-helper/embedding_utils.py b/import-automation/workflow/ingestion-helper/embedding_utils.py index 48409b6870..9c7f94bb90 100644 --- a/import-automation/workflow/ingestion-helper/embedding_utils.py +++ b/import-automation/workflow/ingestion-helper/embedding_utils.py @@ -43,7 +43,7 @@ def get_latest_lock_timestamp(database): raise return None -def get_updated_nodes(database, timestamp, node_types, timeout=None): +def get_updated_nodes(database, timestamp, node_types, timeout=300): """Gets subject_ids and names from Node table where last_update_timestamp > timestamp. Yields results to avoid loading all into memory. @@ -79,7 +79,7 @@ def get_updated_nodes(database, timestamp, node_types, timeout=None): try: with database.snapshot() as snapshot: - results = snapshot.execute_sql(updated_node_sql, params=params, param_types=param_types, timeout=timeout or 300) + results = snapshot.execute_sql(updated_node_sql, params=params, param_types=param_types, timeout=timeout) fields = None for row in results: if fields is None: @@ -105,7 +105,7 @@ def filter_and_convert_nodes(nodes_generator): yield (node.get("subject_id"), node.get("name"), node.get("types")) -def generate_embeddings_partitioned(database, nodes_generator, timeout=None): +def generate_embeddings_partitioned(database, nodes_generator, timeout=300): """Generates embeddings in batches using standard transactions. Processes nodes in chunks of 500 to avoid transaction size limits. Accepts a generator to avoid loading all nodes into memory. @@ -151,7 +151,7 @@ def chunked(iterable, n): param_types = {"nodes": Array(struct_type)} def _execute_dml(transaction): - return transaction.execute_update(embeddings_sql, params=params, param_types=param_types, timeout=timeout or 300) + return transaction.execute_update(embeddings_sql, params=params, param_types=param_types, timeout=timeout) try: row_count = database.run_in_transaction(_execute_dml) diff --git a/import-automation/workflow/ingestion-helper/main.py b/import-automation/workflow/ingestion-helper/main.py index c692173b3f..424cbb9069 100644 --- a/import-automation/workflow/ingestion-helper/main.py +++ b/import-automation/workflow/ingestion-helper/main.py @@ -45,7 +45,9 @@ 'is_base_dc', os.environ.get('IS_BASE_DC', 'true').lower() == 'true', 'Is base DC') -flags.DEFINE_integer('timeout', 1700, 'Timeout for Spanner queries') +flags.DEFINE_integer( + 'timeout', int(os.environ.get('TIMEOUT', 1700)), + 'Timeout in seconds for Spanner queries') if not FLAGS.is_parsed(): FLAGS(['ingestion_helper']) From 63fdd232f1467d500558a38e01569d27e0ebff7b Mon Sep 17 00:00:00 2001 From: shixiao-coder Date: Fri, 29 May 2026 11:41:26 -0400 Subject: [PATCH 4/5] remove redundant lines --- import-automation/workflow/ingestion-helper/main.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/import-automation/workflow/ingestion-helper/main.py b/import-automation/workflow/ingestion-helper/main.py index 424cbb9069..f12069227c 100644 --- a/import-automation/workflow/ingestion-helper/main.py +++ b/import-automation/workflow/ingestion-helper/main.py @@ -53,7 +53,6 @@ FLAGS(['ingestion_helper']) - def _validate_params(request_json, required_params): for param in required_params: if param not in request_json: @@ -82,8 +81,6 @@ def ingestion_helper(request): location=FLAGS.location, model_id=os.environ.get('EMBEDDING_MODEL_ID', 'text-embedding-005')) - - storage = StorageClient(FLAGS.gcs_bucket_id) if action_type == 'get_import_info': @@ -255,9 +252,7 @@ def ingestion_helper(request): nodes = get_updated_nodes(spanner.database, timestamp, node_types, timeout=FLAGS.timeout) converted_nodes = filter_and_convert_nodes(nodes) affected_rows = generate_embeddings_partitioned(spanner.database, converted_nodes, timeout=FLAGS.timeout) - return (f"OK [Affected rows: {affected_rows}]", 200) - except Exception as e: logging.error(f"Embedding ingestion failed: {e}") return (f"Error: {e}", 500) From bd2004805b563188dec810cb8a9621fa63425370 Mon Sep 17 00:00:00 2001 From: shixiao-coder Date: Fri, 29 May 2026 16:03:09 -0400 Subject: [PATCH 5/5] update descriptions and remove 300s defaults --- .../workflow/ingestion-helper/embedding_utils.py | 8 ++++---- import-automation/workflow/ingestion-helper/main.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/import-automation/workflow/ingestion-helper/embedding_utils.py b/import-automation/workflow/ingestion-helper/embedding_utils.py index 9c7f94bb90..442e486d3f 100644 --- a/import-automation/workflow/ingestion-helper/embedding_utils.py +++ b/import-automation/workflow/ingestion-helper/embedding_utils.py @@ -43,7 +43,7 @@ def get_latest_lock_timestamp(database): raise return None -def get_updated_nodes(database, timestamp, node_types, timeout=300): +def get_updated_nodes(database, timestamp, node_types, timeout): """Gets subject_ids and names from Node table where last_update_timestamp > timestamp. Yields results to avoid loading all into memory. @@ -51,7 +51,7 @@ def get_updated_nodes(database, timestamp, node_types, timeout=300): database: google.cloud.spanner.Database object. timestamp: datetime object to filter by. node_types: A list of strings representing the node types to filter by. - timeout: Timeout for the query. + timeout: Timeout for the spanner client to execute queries. Yields: Dictionaries containing subject_id and name. @@ -105,7 +105,7 @@ def filter_and_convert_nodes(nodes_generator): yield (node.get("subject_id"), node.get("name"), node.get("types")) -def generate_embeddings_partitioned(database, nodes_generator, timeout=300): +def generate_embeddings_partitioned(database, nodes_generator, timeout): """Generates embeddings in batches using standard transactions. Processes nodes in chunks of 500 to avoid transaction size limits. Accepts a generator to avoid loading all nodes into memory. @@ -113,7 +113,7 @@ def generate_embeddings_partitioned(database, nodes_generator, timeout=300): Args: database: google.cloud.spanner.Database object. nodes_generator: A generator yielding tuples containing (subject_id, embedding_content). - timeout: Timeout for the query. + timeout: Timeout for the spanner client to execute queries. Returns: The number of affected rows. diff --git a/import-automation/workflow/ingestion-helper/main.py b/import-automation/workflow/ingestion-helper/main.py index f12069227c..f8595d6fb7 100644 --- a/import-automation/workflow/ingestion-helper/main.py +++ b/import-automation/workflow/ingestion-helper/main.py @@ -47,7 +47,7 @@ 'Is base DC') flags.DEFINE_integer( 'timeout', int(os.environ.get('TIMEOUT', 1700)), - 'Timeout in seconds for Spanner queries') + 'Timeout in seconds for spanner client to execute queries') if not FLAGS.is_parsed(): FLAGS(['ingestion_helper'])