From db00b2b0ab43dc479634cfb8a9dbe1cebd3e74c2 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 18 Apr 2025 09:29:20 -0700 Subject: [PATCH 01/12] store: Add tool to parse and check index definitions --- store/postgres/examples/create_index.rs | 74 +++++++++++++++++++++++++ store/postgres/src/lib.rs | 2 +- 2 files changed, 75 insertions(+), 1 deletion(-) create mode 100644 store/postgres/examples/create_index.rs diff --git a/store/postgres/examples/create_index.rs b/store/postgres/examples/create_index.rs new file mode 100644 index 00000000000..764c50d7ddb --- /dev/null +++ b/store/postgres/examples/create_index.rs @@ -0,0 +1,74 @@ +use std::{collections::HashSet, env, fs, time::Instant}; + +use graph::anyhow; +use graph_store_postgres::command_support::index::{CreateIndex, Expr}; + +/// Parse index definitions from a file and print information about any that +/// we could not parse. +/// +/// The easiest way to create a file with index definitions is to run this +/// query in psql: +/// ```sql +/// select indexdef from pg_indexes where schemaname like 'sgd%' \g /tmp/idxs.txt +/// ``` +pub fn main() -> anyhow::Result<()> { + let args: Vec = env::args().collect(); + if args.len() != 2 { + return Err(anyhow::anyhow!("usage: create_index ")); + } + let idxs = fs::read_to_string(&args[1])?; + + let mut parsed: usize = 0; + let mut failed: usize = 0; + let mut skipped: usize = 0; + let mut unknown_cols = HashSet::new(); + let start = Instant::now(); + for idxdef in idxs.lines() { + let idxdef = idxdef.trim(); + if idxdef.is_empty() || !idxdef.starts_with("CREATE") && !idxdef.starts_with("create") { + skipped += 1; + continue; + } + + let idx = CreateIndex::parse(idxdef.to_string()); + + match &idx { + CreateIndex::Parsed { columns, .. } => { + let mut failed_col = false; + for column in columns { + match column { + Expr::Unknown(expr) => { + unknown_cols.insert(expr.clone()); + failed_col = true; + break; + } + _ => { /* ok */ } + } + } + if failed_col { + failed += 1; + } else { + parsed += 1 + } + } + CreateIndex::Unknown { defn } => { + println!("Can not parse index definition: {}", defn); + failed += 1; + } + } + } + + if !unknown_cols.is_empty() { + println!("Unknown columns:"); + for col in unknown_cols { + println!(" {}", col); + } + } + + println!( + "total: {}, parsed: {parsed}, failed: {failed}, skipped: {skipped}, elapsed: {}s", + parsed + failed + skipped, + start.elapsed().as_secs() + ); + Ok(()) +} diff --git a/store/postgres/src/lib.rs b/store/postgres/src/lib.rs index 0de50af4d60..245067f119e 100644 --- a/store/postgres/src/lib.rs +++ b/store/postgres/src/lib.rs @@ -83,7 +83,7 @@ pub mod command_support { pub use crate::primary::{Connection, Mirror}; } pub mod index { - pub use crate::relational::index::{CreateIndex, Method}; + pub use crate::relational::index::{CreateIndex, Expr, Method}; } pub use crate::deployment::{on_sync, OnSync}; pub use crate::primary::Namespace; From 0b8b9df8663d0b57a18ecbdd26b52d7a09794396 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 18 Apr 2025 12:05:26 -0700 Subject: [PATCH 02/12] store: Make logic in Table.as_ddl more explicit --- store/postgres/src/relational/ddl.rs | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/store/postgres/src/relational/ddl.rs b/store/postgres/src/relational/ddl.rs index cdb162978b6..3894167dc82 100644 --- a/store/postgres/src/relational/ddl.rs +++ b/store/postgres/src/relational/ddl.rs @@ -405,18 +405,19 @@ impl Table { ) -> fmt::Result { self.create_table(out)?; self.create_time_travel_indexes(catalog, out)?; - if index_def.is_some() && ENV_VARS.postpone_attribute_index_creation { - #[allow(clippy::unnecessary_unwrap)] - let arr = index_def - .unwrap() - .indexes_for_table(&self.nsp, &self.name.to_string(), self, false, false, false) - .map_err(|_| fmt::Error)?; - for (_, sql) in arr { - writeln!(out, "{};", sql).expect("properly formated index statements") + match (index_def, ENV_VARS.postpone_attribute_index_creation) { + (Some(index_def), true) => { + let arr = index_def + .indexes_for_table(&self.nsp, &self.name.to_string(), self, false, false, false) + .map_err(|_| fmt::Error)?; + for (_, sql) in arr { + writeln!(out, "{};", sql).expect("properly formated index statements") + } + } + (Some(_), false) | (None, _) => { + self.create_attribute_indexes(out)?; + self.create_aggregate_indexes(schema, out)?; } - } else { - self.create_attribute_indexes(out)?; - self.create_aggregate_indexes(schema, out)?; } Ok(()) } From 78b6e82196ec9d280a7d6cde420aa5c919d1dffd Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 18 Apr 2025 14:47:40 -0700 Subject: [PATCH 03/12] store: Split the logic for IndexList.indexes_for_table up Move some of the responsibility to the caller to make it clearer what is being used --- store/postgres/src/copy.rs | 20 ++-- store/postgres/src/deployment_store.rs | 9 +- store/postgres/src/relational/ddl.rs | 13 ++- store/postgres/src/relational/ddl_tests.rs | 46 +++++---- store/postgres/src/relational/index.rs | 109 ++++++++------------- store/postgres/src/relational/prune.rs | 41 +++++--- 6 files changed, 116 insertions(+), 122 deletions(-) diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index 54c1a03a896..1f3ee205e7b 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -1233,17 +1233,15 @@ impl Connection { // the copy/graft operations. // First recreate the indexes that existed in the original subgraph. for table in state.all_tables() { - let arr = index_list.indexes_for_table( - &self.dst.site.namespace, - &table.src.name.to_string(), - &table.dst, - true, - false, - true, - )?; - - for (_, sql) in arr { - let query = sql_query(format!("{};", sql)); + let dst_nsp = self.dst.site.namespace.to_string(); + let idxs = index_list + .indexes_for_table(table.src.name.as_str(), &table.dst) + .filter(|idx| idx.to_postpone()) + .map(|idx| idx.with_nsp(dst_nsp.clone())) + .collect::, _>>()?; + + for idx in idxs { + let query = sql_query(format!("{};", idx.to_sql(false, true)?)); self.transaction(|conn| { async { query.execute(conn).await.map_err(StoreError::from) }.scope_boxed() })? diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index a9fcc833e99..21eec6b4a76 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -734,12 +734,11 @@ impl DeploymentStore { Ok(indexes.into_iter().map(CreateIndex::parse).collect()) } - /// Do not use this while already holding a connection as that can lead - /// to deadlocks pub(crate) async fn load_indexes(&self, site: Arc) -> Result { - let store = self.clone(); let mut conn = self.pool.get_permitted().await?; - IndexList::load(&mut conn, site, store).await + let layout = self.layout(&mut conn, site).await?; + + IndexList::load(&mut conn, &layout).await } /// Drops an index for a given deployment, concurrently. @@ -1639,7 +1638,7 @@ impl DeploymentStore { if ENV_VARS.postpone_attribute_index_creation { // Check if all indexes are valid and recreate them if they // aren't. - IndexList::load(&mut conn, site, self.cheap_clone()) + IndexList::load(&mut conn, &dst) .await? .recreate_invalid_indexes(&mut conn, &dst) .await?; diff --git a/store/postgres/src/relational/ddl.rs b/store/postgres/src/relational/ddl.rs index 3894167dc82..b3b95707ace 100644 --- a/store/postgres/src/relational/ddl.rs +++ b/store/postgres/src/relational/ddl.rs @@ -407,11 +407,14 @@ impl Table { self.create_time_travel_indexes(catalog, out)?; match (index_def, ENV_VARS.postpone_attribute_index_creation) { (Some(index_def), true) => { - let arr = index_def - .indexes_for_table(&self.nsp, &self.name.to_string(), self, false, false, false) - .map_err(|_| fmt::Error)?; - for (_, sql) in arr { - writeln!(out, "{};", sql).expect("properly formated index statements") + let idxs = index_def + .indexes_for_table(&self.name.to_string(), self) + .filter(|idx| !idx.to_postpone()); + for idx in idxs { + // For copies, the `index_def` is for the source table; + // we need to make sure it is for us + let idx = idx.with_nsp(self.nsp.to_string()).map_err(|_| fmt::Error)?; + writeln!(out, "{};", idx.to_sql(false, false)?)?; } } (Some(_), false) | (None, _) => { diff --git a/store/postgres/src/relational/ddl_tests.rs b/store/postgres/src/relational/ddl_tests.rs index 901b4daa1e5..e00b2221ee8 100644 --- a/store/postgres/src/relational/ddl_tests.rs +++ b/store/postgres/src/relational/ddl_tests.rs @@ -8,7 +8,7 @@ use crate::{deployment_store::generate_index_creation_sql, layout_for_tests::mak const ID_TYPE: ColumnType = ColumnType::String; -fn test_layout(gql: &str) -> Layout { +fn test_layout(gql: &str) -> Arc { let subgraph = DeploymentHash::new("subgraph").unwrap(); let schema = InputSchema::parse_latest(gql, subgraph.clone()).expect("Test schema invalid"); let namespace = Namespace::new("sgd0815".to_owned()).unwrap(); @@ -20,7 +20,7 @@ fn test_layout(gql: &str) -> Layout { } }; let catalog = Catalog::for_tests(site.clone(), ents).expect("Can not create catalog"); - Layout::new(site, &schema, catalog).expect("Failed to construct Layout") + Arc::new(Layout::new(site, &schema, catalog).expect("Failed to construct Layout")) } #[test] @@ -58,7 +58,7 @@ fn check_eqv(left: &str, right: &str) { #[test] fn test_manual_index_creation_ddl() { - let layout = Arc::new(test_layout(BOOKS_GQL)); + let layout = test_layout(BOOKS_GQL); #[track_caller] fn assert_generated_sql( @@ -206,7 +206,7 @@ impl IndexList { .to_string(), )]; indexes.insert("file_thing".to_string(), v3); - IndexList { indexes } + IndexList::new(indexes) } } @@ -376,7 +376,7 @@ fn postponed_indexes_with_block_column() { .map(|def| CreateIndex::parse(def.to_string())) .collect(), ); - IndexList { indexes } + IndexList::new(indexes) } fn cr(index: &str) -> String { @@ -414,23 +414,27 @@ fn postponed_indexes_with_block_column() { assert!(sql[0].contains(&cre(ATTR_IDX))); let dst_nsp = Namespace::new("sgd2".to_string()).unwrap(); - let arr = index_list() - .indexes_for_table(&dst_nsp, &table.name.to_string(), table, true, false, false) - .unwrap(); + let list = index_list(); + let arr: Vec<_> = list + .indexes_for_table(&table.name.to_string(), table) + .filter(|idx| idx.to_postpone()) + .map(|idx| idx.to_sql(false, false).unwrap()) + .collect(); assert_eq!(1, arr.len()); - assert!(!arr[0].1.contains(BLOCK_IDX)); - assert!(arr[0].1.contains(&cr(ATTR_IDX))); - - let arr = index_list() - .indexes_for_table( - &dst_nsp, - &table.name.to_string(), - table, - false, - false, - false, - ) - .unwrap(); + assert!(!arr[0].contains(BLOCK_IDX)); + assert!(arr[0].contains(&cr(ATTR_IDX))); + + let arr: Vec<_> = list + .indexes_for_table(&table.name.to_string(), table) + .filter(|idx| !idx.to_postpone()) + .map(|idx| { + idx.with_nsp(dst_nsp.to_string()) + .unwrap() + .to_sql(false, false) + .unwrap() + }) + .collect(); + assert_eq!(0, arr.len()); } diff --git a/store/postgres/src/relational/index.rs b/store/postgres/src/relational/index.rs index 483fab23619..f03fff3bc44 100644 --- a/store/postgres/src/relational/index.rs +++ b/store/postgres/src/relational/index.rs @@ -2,7 +2,6 @@ use anyhow::{anyhow, Error}; use std::collections::HashMap; use std::fmt::{Display, Write}; -use std::sync::Arc; use diesel::sql_query; use diesel::sql_types::{Bool, Text}; @@ -16,9 +15,6 @@ use graph::prelude::{ }; use crate::block_range::{BLOCK_COLUMN, BLOCK_RANGE_COLUMN}; -use crate::command_support::catalog::Site; -use crate::deployment_store::DeploymentStore; -use crate::primary::Namespace; use crate::relational::{BYTE_ARRAY_PREFIX_SIZE, STRING_PREFIX_SIZE}; use crate::{catalog, AsyncPgConnection}; @@ -743,70 +739,47 @@ pub struct IndexList { pub(crate) indexes: HashMap>, } -pub async fn load_indexes_from_table( - conn: &mut AsyncPgConnection, - table: &Arc, - schema_name: &str, -) -> Result, StoreError> { - let table_name = table.name.as_str(); - let indexes = catalog::indexes_for_table(conn, schema_name, table_name).await?; - Ok(indexes.into_iter().map(CreateIndex::parse).collect()) -} - impl IndexList { - pub async fn load( - conn: &mut AsyncPgConnection, - site: Arc, - store: DeploymentStore, - ) -> Result { - let mut list = IndexList { - indexes: HashMap::new(), - }; - let schema_name = site.namespace.clone(); - let layout = store.layout(conn, site).await?; + pub fn new(indexes: HashMap>) -> Self { + IndexList { indexes } + } + + pub async fn load(conn: &mut AsyncPgConnection, layout: &Layout) -> Result { + let mut indexes = HashMap::new(); + let schema_name = layout.site.namespace.clone(); for table in layout.tables.values() { - let indexes = load_indexes_from_table(conn, table, schema_name.as_str()).await?; - list.indexes.insert(table.name.to_string(), indexes); + let indexes_from_table = + catalog::indexes_for_table(conn, schema_name.as_str(), table.name.as_str()) + .await? + .into_iter() + .map(CreateIndex::parse) + .collect(); + indexes.insert(table.name.to_string(), indexes_from_table); } - Ok(list) + Ok(Self::new(indexes)) } - pub fn indexes_for_table( - &self, - namespace: &Namespace, - table_name: &String, - dest_table: &Table, - postponed: bool, - concurrent: bool, - if_not_exists: bool, - ) -> Result, String)>, Error> { - let mut arr = vec![]; - if let Some(vec) = self.indexes.get(table_name) { - for ci in vec { - // First we check if the fields do exist in the destination subgraph. - // In case of grafting that is not given. - if ci.fields_exist_in_dest(dest_table) - // Then we check if the index is one of the default indexes not based on - // the attributes. Those will be created anyway and we should skip them. - && !ci.is_default_non_attr_index() - // Then ID based indexes in the immutable tables are also created initially - // and should be skipped. - && !(ci.is_id() && dest_table.immutable) - // Finally we filter by the criteria is the index to be postponed. The ones - // that are not to be postponed we want to create during initial creation of - // the copied subgraph - && postponed == ci.to_postpone() - { - if let Ok(sql) = ci - .with_nsp(namespace.to_string())? - .to_sql(concurrent, if_not_exists) - { - arr.push((ci.name(), sql)) - } - } - } - } - Ok(arr) + pub fn indexes_for_table<'a>( + &'a self, + table_name: &str, + dest_table: &'a Table, + ) -> impl Iterator { + static EMPTY: Vec = vec![]; + let indexes = self.indexes.get(table_name).unwrap_or(&EMPTY); + + let iter = indexes.iter().filter(move |ci| { + // First we check if the fields do exist in the destination subgraph. + // In case of grafting that is not given. + ci.fields_exist_in_dest(dest_table) + // Then we check if the index is one of the default indexes not based on + // the attributes. Those will be created anyway and we should skip them. + && !ci.is_default_non_attr_index() + // Then ID based indexes in the immutable tables are also created initially + // and should be skipped. + && !(ci.is_id() && dest_table.immutable) + }); + + iter } pub async fn recreate_invalid_indexes( @@ -822,10 +795,11 @@ impl IndexList { let namespace = &layout.catalog.site.namespace; for table in layout.tables.values() { - for (ind_name, create_query) in - self.indexes_for_table(namespace, &table.name.to_string(), table, true, true, true)? - { - if let Some(index_name) = ind_name { + let idxs = self + .indexes_for_table(table.name.as_str(), table) + .filter(|idx| idx.to_postpone()); + for idx in idxs { + if let Some(index_name) = idx.name() { let table_name = table.name.clone(); let query = r#" SELECT x.indisvalid AS isvalid @@ -854,6 +828,7 @@ impl IndexList { sql_query(format!("DROP INDEX {}.{};", namespace, index_name)); drop_query.execute(conn).await?; } + let create_query = idx.to_sql(true, true)?; sql_query(create_query).execute(conn).await?; } } diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index d7747fef5e9..5b24306d775 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -11,7 +11,6 @@ use diesel_async::{ use graph::{ components::store::{PrunePhase, PruneReporter, PruneRequest, PruningStrategy, VersionStats}, prelude::{BlockNumber, CancelableError, CheapClone, StoreError, BLOCK_NUMBER_MAX}, - schema::InputSchema, slog::{warn, Logger}, }; use itertools::Itertools; @@ -26,8 +25,8 @@ use crate::{ }; use super::{ - index::{load_indexes_from_table, CreateIndex, IndexList}, - Catalog, Layout, Namespace, + index::{CreateIndex, IndexList}, + Layout, Namespace, }; pub use status::{Phase, PruneState, PruneTableState, Viewer}; @@ -51,12 +50,12 @@ impl TablePair { /// different namespace so that the names of indexes etc. don't clash async fn create( conn: &mut AsyncPgConnection, + src_layout: &Layout, src: Arc
, - src_nsp: Namespace, dst_nsp: Namespace, - schema: &InputSchema, - catalog: &Catalog, + src_indexes: &IndexList, ) -> Result { + let src_nsp = src_layout.site.namespace.clone(); let dst = src.new_like(&dst_nsp, &src.name); let mut query = String::new(); @@ -66,16 +65,20 @@ impl TablePair { let mut list = IndexList { indexes: HashMap::new(), }; - let indexes = load_indexes_from_table(conn, &src, src_nsp.as_str()) - .await? - .into_iter() + let indexes = src_indexes + .indexes_for_table(src.name.as_str(), &src) .map(|index| index.with_nsp(dst_nsp.to_string())) .collect::, _>>()?; list.indexes.insert(src.name.to_string(), indexes); // In case of pruning we don't do delayed creation of indexes, // as the asumption is that there is not that much data inserted. - dst.as_ddl(schema, catalog, Some(&list), &mut query)?; + dst.as_ddl( + &src_layout.input_schema, + &src_layout.catalog, + Some(&list), + &mut query, + )?; } conn.batch_execute(&query).await?; @@ -418,6 +421,19 @@ impl Layout { tracker.start(conn, req, &prunable_tables).await?; let dst_nsp = Namespace::prune(self.site.id); let mut recreate_dst_nsp = true; + + let index_list = IndexList::load(conn, &self).await?; + + // Go table by table; note that the subgraph writer can write in + // between the execution of the `with_lock` block below, and might + // therefore work with tables where some are pruned and some are not + // pruned yet. That does not affect correctness since we make no + // assumption about where the subgraph head is. If the subgraph + // advances during this loop, we might have an unnecessarily + // pessimistic but still safe value for `final_block`. We do assume + // that `final_block` is far enough from the subgraph head that it + // stays final even if a revert happens during this loop, but that + // is the definition of 'final' for (table, strat) in &prunable_tables { reporter.start_table(table.name.as_str()); tracker.start_table(conn, table).await?; @@ -429,11 +445,10 @@ impl Layout { } let pair = TablePair::create( conn, + &self, table.cheap_clone(), - self.site.namespace.clone(), dst_nsp.clone(), - &self.input_schema, - &self.catalog, + &index_list, ) .await?; // Copy final entities. This can happen in parallel to indexing as From c0268dd1e8c5d33fdeca8383f4aae95fcdc9989c Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 18 Apr 2025 14:53:58 -0700 Subject: [PATCH 04/12] store: Remove the explicit name argument from IndexList.indexes_for_table All callers used the name of the table that was passed in, except for the use in `copy.rs`, but copying doesn't change table names, so the src and dst names of tables are the same if they exist in the dst --- store/postgres/src/copy.rs | 2 +- store/postgres/src/relational/ddl.rs | 2 +- store/postgres/src/relational/ddl_tests.rs | 4 ++-- store/postgres/src/relational/index.rs | 15 +++++++++------ store/postgres/src/relational/prune.rs | 2 +- 5 files changed, 14 insertions(+), 11 deletions(-) diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index 1f3ee205e7b..060fc52b868 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -1235,7 +1235,7 @@ impl Connection { for table in state.all_tables() { let dst_nsp = self.dst.site.namespace.to_string(); let idxs = index_list - .indexes_for_table(table.src.name.as_str(), &table.dst) + .indexes_for_table(&table.dst) .filter(|idx| idx.to_postpone()) .map(|idx| idx.with_nsp(dst_nsp.clone())) .collect::, _>>()?; diff --git a/store/postgres/src/relational/ddl.rs b/store/postgres/src/relational/ddl.rs index b3b95707ace..58df30101c9 100644 --- a/store/postgres/src/relational/ddl.rs +++ b/store/postgres/src/relational/ddl.rs @@ -408,7 +408,7 @@ impl Table { match (index_def, ENV_VARS.postpone_attribute_index_creation) { (Some(index_def), true) => { let idxs = index_def - .indexes_for_table(&self.name.to_string(), self) + .indexes_for_table(self) .filter(|idx| !idx.to_postpone()); for idx in idxs { // For copies, the `index_def` is for the source table; diff --git a/store/postgres/src/relational/ddl_tests.rs b/store/postgres/src/relational/ddl_tests.rs index e00b2221ee8..d20f70e36e8 100644 --- a/store/postgres/src/relational/ddl_tests.rs +++ b/store/postgres/src/relational/ddl_tests.rs @@ -416,7 +416,7 @@ fn postponed_indexes_with_block_column() { let dst_nsp = Namespace::new("sgd2".to_string()).unwrap(); let list = index_list(); let arr: Vec<_> = list - .indexes_for_table(&table.name.to_string(), table) + .indexes_for_table(table) .filter(|idx| idx.to_postpone()) .map(|idx| idx.to_sql(false, false).unwrap()) .collect(); @@ -425,7 +425,7 @@ fn postponed_indexes_with_block_column() { assert!(arr[0].contains(&cr(ATTR_IDX))); let arr: Vec<_> = list - .indexes_for_table(&table.name.to_string(), table) + .indexes_for_table(table) .filter(|idx| !idx.to_postpone()) .map(|idx| { idx.with_nsp(dst_nsp.to_string()) diff --git a/store/postgres/src/relational/index.rs b/store/postgres/src/relational/index.rs index f03fff3bc44..eb3437bf053 100644 --- a/store/postgres/src/relational/index.rs +++ b/store/postgres/src/relational/index.rs @@ -759,24 +759,27 @@ impl IndexList { Ok(Self::new(indexes)) } + /// Return all indexes for `table` from this list. Only indexes that are + /// for columns that actually exist on `table` are returned. In + /// addition, indexes that are always created when a deployment is + /// created independent of the configuration are also not returned. pub fn indexes_for_table<'a>( &'a self, - table_name: &str, - dest_table: &'a Table, + table: &'a Table, ) -> impl Iterator { static EMPTY: Vec = vec![]; - let indexes = self.indexes.get(table_name).unwrap_or(&EMPTY); + let indexes = self.indexes.get(table.name.as_str()).unwrap_or(&EMPTY); let iter = indexes.iter().filter(move |ci| { // First we check if the fields do exist in the destination subgraph. // In case of grafting that is not given. - ci.fields_exist_in_dest(dest_table) + ci.fields_exist_in_dest(table) // Then we check if the index is one of the default indexes not based on // the attributes. Those will be created anyway and we should skip them. && !ci.is_default_non_attr_index() // Then ID based indexes in the immutable tables are also created initially // and should be skipped. - && !(ci.is_id() && dest_table.immutable) + && !(ci.is_id() && table.immutable) }); iter @@ -796,7 +799,7 @@ impl IndexList { let namespace = &layout.catalog.site.namespace; for table in layout.tables.values() { let idxs = self - .indexes_for_table(table.name.as_str(), table) + .indexes_for_table(table) .filter(|idx| idx.to_postpone()); for idx in idxs { if let Some(index_name) = idx.name() { diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index 5b24306d775..02ea87bd0bd 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -66,7 +66,7 @@ impl TablePair { indexes: HashMap::new(), }; let indexes = src_indexes - .indexes_for_table(src.name.as_str(), &src) + .indexes_for_table(&src) .map(|index| index.with_nsp(dst_nsp.to_string())) .collect::, _>>()?; list.indexes.insert(src.name.to_string(), indexes); From 77dc9ac8427df02776898eaf7eeeb420ea8e8fb5 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 18 Apr 2025 15:14:45 -0700 Subject: [PATCH 05/12] store: Add a helper to hold index creation options --- node/src/manager/commands/index.rs | 5 +- store/postgres/src/copy.rs | 29 ++++++----- store/postgres/src/lib.rs | 2 +- store/postgres/src/relational/ddl.rs | 10 +++- store/postgres/src/relational/ddl_tests.rs | 9 ++-- store/postgres/src/relational/index.rs | 56 +++++++++++++++++++++- store/postgres/src/relational/prune.rs | 2 + 7 files changed, 89 insertions(+), 24 deletions(-) diff --git a/node/src/manager/commands/index.rs b/node/src/manager/commands/index.rs index 6a5370895e5..48753448cbe 100644 --- a/node/src/manager/commands/index.rs +++ b/node/src/manager/commands/index.rs @@ -5,7 +5,7 @@ use graph::{ prelude::{anyhow, StoreError}, }; use graph_store_postgres::{ - command_support::index::{CreateIndex, Method}, + command_support::index::{CreateIndex, IndexCreator, Method}, ConnectionPool, SubgraphStore, }; use std::io::Write as _; @@ -183,8 +183,9 @@ pub async fn list( let mut term = Terminal::new(); if to_sql { + let creat = IndexCreator::new(concurrent, if_not_exists); for index in indexes { - writeln!(term, "{};", index.to_sql(concurrent, if_not_exists)?)?; + writeln!(term, "{};", creat.to_sql(&index)?)?; } } else { let mut first = true; diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index 060fc52b868..ea8e678a84f 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -1001,6 +1001,18 @@ impl Connection { }) } + /// Run `callback` in a transaction using the connection in `self.conn`. + /// This will return an error if `self.conn` is `None`, which happens + /// while a background task is copying a table. + fn get_conn(&mut self) -> Result<&mut AsyncPgConnection, StoreError> { + let Some(conn) = self.conn.as_mut() else { + return Err(internal_error!( + "copy connection has been handed to background task but not returned yet (get_conn)" + )); + }; + Ok(&mut conn.inner) + } + /// Run `callback` in a transaction using the connection in `self.conn`. /// This will return an error if `self.conn` is `None`, which happens /// while a background task is copying a table. @@ -1017,12 +1029,7 @@ impl Connection { R: Send + 'a, 'a: 'conn, { - let Some(conn) = self.conn.as_mut() else { - return Err(internal_error!( - "copy connection has been handed to background task but not returned yet (transaction)" - )); - }; - let conn = &mut conn.inner; + let conn = self.get_conn()?; Ok(conn.transaction(|conn| callback(conn).scope_boxed())) } @@ -1232,6 +1239,7 @@ impl Connection { // Create indexes for all the attributes that were postponed at the start of // the copy/graft operations. // First recreate the indexes that existed in the original subgraph. + let creat = self.dst.index_creator(false, true); for table in state.all_tables() { let dst_nsp = self.dst.site.namespace.to_string(); let idxs = index_list @@ -1240,13 +1248,8 @@ impl Connection { .map(|idx| idx.with_nsp(dst_nsp.clone())) .collect::, _>>()?; - for idx in idxs { - let query = sql_query(format!("{};", idx.to_sql(false, true)?)); - self.transaction(|conn| { - async { query.execute(conn).await.map_err(StoreError::from) }.scope_boxed() - })? - .await?; - } + let conn = self.get_conn()?; + creat.execute_many(conn, &idxs).await?; } // Second create the indexes for the new fields. diff --git a/store/postgres/src/lib.rs b/store/postgres/src/lib.rs index 245067f119e..7eae15323d3 100644 --- a/store/postgres/src/lib.rs +++ b/store/postgres/src/lib.rs @@ -83,7 +83,7 @@ pub mod command_support { pub use crate::primary::{Connection, Mirror}; } pub mod index { - pub use crate::relational::index::{CreateIndex, Expr, Method}; + pub use crate::relational::index::{CreateIndex, Expr, IndexCreator, Method}; } pub use crate::deployment::{on_sync, OnSync}; pub use crate::primary::Namespace; diff --git a/store/postgres/src/relational/ddl.rs b/store/postgres/src/relational/ddl.rs index 58df30101c9..9a34e545687 100644 --- a/store/postgres/src/relational/ddl.rs +++ b/store/postgres/src/relational/ddl.rs @@ -14,7 +14,10 @@ use crate::relational::{ VID_COLUMN, }; -use super::{index::IndexList, Catalog, Column, Layout, SqlName, Table}; +use super::{ + index::{IndexCreator, IndexList}, + Catalog, Column, Layout, SqlName, Table, +}; // In debug builds (for testing etc.) unconditionally create exclusion constraints, in release // builds for production, skip them @@ -40,11 +43,13 @@ impl Layout { let mut tables = self.tables.values().collect::>(); tables.sort_by_key(|table| table.position); // Output 'create table' statements for all tables + let creat = self.index_creator(false, false); for table in tables { table.as_ddl( &self.input_schema, &self.catalog, index_def.as_ref(), + &creat, &mut out, )?; } @@ -401,6 +406,7 @@ impl Table { schema: &InputSchema, catalog: &Catalog, index_def: Option<&IndexList>, + creat: &IndexCreator, out: &mut String, ) -> fmt::Result { self.create_table(out)?; @@ -414,7 +420,7 @@ impl Table { // For copies, the `index_def` is for the source table; // we need to make sure it is for us let idx = idx.with_nsp(self.nsp.to_string()).map_err(|_| fmt::Error)?; - writeln!(out, "{};", idx.to_sql(false, false)?)?; + writeln!(out, "{};", creat.to_sql(&idx)?)?; } } (Some(_), false) | (None, _) => { diff --git a/store/postgres/src/relational/ddl_tests.rs b/store/postgres/src/relational/ddl_tests.rs index d20f70e36e8..187791aa9dc 100644 --- a/store/postgres/src/relational/ddl_tests.rs +++ b/store/postgres/src/relational/ddl_tests.rs @@ -415,10 +415,11 @@ fn postponed_indexes_with_block_column() { let dst_nsp = Namespace::new("sgd2".to_string()).unwrap(); let list = index_list(); + let creat = layout.index_creator(false, false); let arr: Vec<_> = list .indexes_for_table(table) .filter(|idx| idx.to_postpone()) - .map(|idx| idx.to_sql(false, false).unwrap()) + .map(|idx| creat.to_sql(idx).unwrap()) .collect(); assert_eq!(1, arr.len()); assert!(!arr[0].contains(BLOCK_IDX)); @@ -428,10 +429,8 @@ fn postponed_indexes_with_block_column() { .indexes_for_table(table) .filter(|idx| !idx.to_postpone()) .map(|idx| { - idx.with_nsp(dst_nsp.to_string()) - .unwrap() - .to_sql(false, false) - .unwrap() + let idx = idx.with_nsp(dst_nsp.to_string()).unwrap(); + creat.to_sql(&idx).unwrap() }) .collect(); diff --git a/store/postgres/src/relational/index.rs b/store/postgres/src/relational/index.rs index eb3437bf053..65c9a111954 100644 --- a/store/postgres/src/relational/index.rs +++ b/store/postgres/src/relational/index.rs @@ -703,7 +703,7 @@ impl CreateIndex { /// Generate a SQL statement that creates this index. If `concurrent` is /// `true`, make it a concurrent index creation. If `if_not_exists` is /// `true` add a `if not exists` clause to the index creation. - pub fn to_sql(&self, concurrent: bool, if_not_exists: bool) -> Result { + fn to_sql(&self, concurrent: bool, if_not_exists: bool) -> Result { match self { CreateIndex::Unknown { defn } => Ok(defn.to_string()), CreateIndex::Parsed { @@ -734,6 +734,60 @@ impl CreateIndex { } } +/// A helper to run or write index creation statements with options as to +/// whether to create them concurrently or only of they do not exist +pub struct IndexCreator { + concurrently: bool, + if_not_exists: bool, +} + +impl IndexCreator { + pub fn new(concurrently: bool, if_not_exists: bool) -> Self { + IndexCreator { + concurrently, + if_not_exists, + } + } + + /// Create the index `idx` in the transaction that is currently active + /// on `conn`, i.e., do not start a new transaction + pub async fn execute( + &self, + conn: &mut AsyncPgConnection, + idx: &CreateIndex, + ) -> Result<(), StoreError> { + let sql = idx.to_sql(self.concurrently, self.if_not_exists)?; + sql_query(sql).execute(conn).await?; + Ok(()) + } + + /// Create all indexes in `idxs`. Each index creation happens in its own + /// transaction, and `conn` should therefore not be in a transaction + /// when this method is called. + pub async fn execute_many( + &self, + conn: &mut AsyncPgConnection, + idxs: &[CreateIndex], + ) -> Result<(), StoreError> { + for idx in idxs { + self.execute(conn, idx).await?; + } + Ok(()) + } + + pub fn to_sql(&self, index: &CreateIndex) -> Result { + index.to_sql(self.concurrently, self.if_not_exists) + } +} + +impl Layout { + /// Create an index creator with the given options for creating indexes + /// in this layout + pub fn index_creator(&self, concurrently: bool, if_not_exists: bool) -> IndexCreator { + IndexCreator::new(concurrently, if_not_exists) + } +} + #[derive(Debug)] pub struct IndexList { pub(crate) indexes: HashMap>, diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index 02ea87bd0bd..4f292b7c6f9 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -73,10 +73,12 @@ impl TablePair { // In case of pruning we don't do delayed creation of indexes, // as the asumption is that there is not that much data inserted. + let creat = src_layout.index_creator(false, false); dst.as_ddl( &src_layout.input_schema, &src_layout.catalog, Some(&list), + &creat, &mut query, )?; } From 73c28fa5ef7130ceb4e816b13401bccb9d1a9516 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 18 Apr 2025 15:53:30 -0700 Subject: [PATCH 06/12] store: Use minmax_multi_ops in index creation when appropriate --- node/src/manager/commands/index.rs | 2 +- store/postgres/src/catalog.rs | 2 +- store/postgres/src/relational/index.rs | 76 ++++++++++++++++++-------- 3 files changed, 55 insertions(+), 25 deletions(-) diff --git a/node/src/manager/commands/index.rs b/node/src/manager/commands/index.rs index 48753448cbe..38d5ee196bd 100644 --- a/node/src/manager/commands/index.rs +++ b/node/src/manager/commands/index.rs @@ -183,7 +183,7 @@ pub async fn list( let mut term = Terminal::new(); if to_sql { - let creat = IndexCreator::new(concurrent, if_not_exists); + let creat = IndexCreator::new(concurrent, if_not_exists, true); for index in indexes { writeln!(term, "{};", creat.to_sql(&index)?)?; } diff --git a/store/postgres/src/catalog.rs b/store/postgres/src/catalog.rs index 5f094548565..f93061b6f9a 100644 --- a/store/postgres/src/catalog.rs +++ b/store/postgres/src/catalog.rs @@ -189,7 +189,7 @@ pub struct Catalog { /// Whether the database supports `int4_minmax_multi_ops` etc. /// See the [Postgres docs](https://www.postgresql.org/docs/15/brin-builtin-opclasses.html) - has_minmax_multi_ops: bool, + pub has_minmax_multi_ops: bool, /// Whether the column `pg_stats.range_bounds_histogram` introduced in /// Postgres 17 exists. See the [Postgres diff --git a/store/postgres/src/relational/index.rs b/store/postgres/src/relational/index.rs index 65c9a111954..f057c57b9aa 100644 --- a/store/postgres/src/relational/index.rs +++ b/store/postgres/src/relational/index.rs @@ -195,8 +195,8 @@ impl Expr { return false; } for i in 0..orig.len() { - let o = orig[i].to_sql(); - let n = current[i].to_sql(); + let o = orig[i].to_sql(false); + let n = current[i].to_sql(false); // check that string n starts with o if n.len() < o.len() || n[0..o.len()] != o { @@ -206,16 +206,29 @@ impl Expr { true } - fn to_sql(&self) -> String { - match self { - Expr::Column(name) => format!("\"{}\"", name), - Expr::Prefix(name, kind) => kind.to_sql(&format!("\"{}\"", name)), - Expr::Vid => VID_COLUMN.to_string(), - Expr::Block => BLOCK_COLUMN.to_string(), - Expr::BlockRange => BLOCK_RANGE_COLUMN.to_string(), - Expr::BlockRangeLower => "lower(block_range)".to_string(), - Expr::BlockRangeUpper => "coalesce(upper(block_range), 2147483647)".to_string(), - Expr::Unknown(expr) => expr.to_string(), + /// Generate a SQL expression for this index expression. The `multi_ops` + /// indicates whether we should also print the `minmax_multi_ops` + /// operator class used for BRIN indexes. This is needed because it is + /// not the default operator class, and only supported in Postgres 14+. + fn to_sql(&self, multi_ops: bool) -> String { + const LBR: &str = "lower(block_range)"; + const LBR_MULTI: &str = "lower(block_range) int4_minmax_multi_ops"; + const UBR: &str = "coalesce(upper(block_range), 2147483647)"; + const UBR_MULTI: &str = "coalesce(upper(block_range), 2147483647) int4_minmax_multi_ops"; + const VID_MULTI: &str = "vid int8_minmax_multi_ops"; + + match (self, multi_ops) { + (Expr::Column(name), _) => format!("\"{}\"", name), + (Expr::Prefix(name, kind), _) => kind.to_sql(&format!("\"{}\"", name)), + (Expr::Vid, true) => VID_MULTI.to_string(), + (Expr::Vid, false) => VID_COLUMN.to_string(), + (Expr::Block, _) => BLOCK_COLUMN.to_string(), + (Expr::BlockRange, _) => BLOCK_RANGE_COLUMN.to_string(), + (Expr::BlockRangeLower, false) => LBR.to_string(), + (Expr::BlockRangeLower, true) => LBR_MULTI.to_string(), + (Expr::BlockRangeUpper, false) => UBR.to_string(), + (Expr::BlockRangeUpper, true) => UBR_MULTI.to_string(), + (Expr::Unknown(expr), _) => expr.to_string(), } } } @@ -703,7 +716,7 @@ impl CreateIndex { /// Generate a SQL statement that creates this index. If `concurrent` is /// `true`, make it a concurrent index creation. If `if_not_exists` is /// `true` add a `if not exists` clause to the index creation. - fn to_sql(&self, concurrent: bool, if_not_exists: bool) -> Result { + fn to_sql(&self, creat: &IndexCreator) -> Result { match self { CreateIndex::Unknown { defn } => Ok(defn.to_string()), CreateIndex::Parsed { @@ -716,10 +729,17 @@ impl CreateIndex { cond, with, } => { + let IndexCreator { + concurrently, + if_not_exists, + multi_ops, + } = creat; + // Explicit operator classes are only needed for BRIN indexes + let multi_ops = *multi_ops && method == &Method::Brin; let unique = if *unique { "unique " } else { "" }; - let concurrent = if concurrent { "concurrently " } else { "" }; - let if_not_exists = if if_not_exists { "if not exists " } else { "" }; - let columns = columns.iter().map(|c| c.to_sql()).join(", "); + let concurrent = if *concurrently { "concurrently " } else { "" }; + let if_not_exists = if *if_not_exists { "if not exists " } else { "" }; + let columns = columns.iter().map(|c| c.to_sql(multi_ops)).join(", "); let mut sql = format!("create {unique}index {concurrent}{if_not_exists}{name} on {nsp}.{table} using {method} ({columns})"); if let Some(with) = with { @@ -739,13 +759,19 @@ impl CreateIndex { pub struct IndexCreator { concurrently: bool, if_not_exists: bool, + /// Whether the shard supports the multi_ops operator classes + multi_ops: bool, } impl IndexCreator { - pub fn new(concurrently: bool, if_not_exists: bool) -> Self { + /// Create an index creator with the given options. The `multi_ops` flag + /// indicates whether the database in which we will create indexes + /// supports the `minmax_multi_ops` operator classes + pub fn new(concurrently: bool, if_not_exists: bool, multi_ops: bool) -> Self { IndexCreator { concurrently, if_not_exists, + multi_ops, } } @@ -756,7 +782,7 @@ impl IndexCreator { conn: &mut AsyncPgConnection, idx: &CreateIndex, ) -> Result<(), StoreError> { - let sql = idx.to_sql(self.concurrently, self.if_not_exists)?; + let sql = idx.to_sql(self)?; sql_query(sql).execute(conn).await?; Ok(()) } @@ -776,7 +802,7 @@ impl IndexCreator { } pub fn to_sql(&self, index: &CreateIndex) -> Result { - index.to_sql(self.concurrently, self.if_not_exists) + index.to_sql(self) } } @@ -784,7 +810,8 @@ impl Layout { /// Create an index creator with the given options for creating indexes /// in this layout pub fn index_creator(&self, concurrently: bool, if_not_exists: bool) -> IndexCreator { - IndexCreator::new(concurrently, if_not_exists) + let multi_ops = self.catalog.has_minmax_multi_ops; + IndexCreator::new(concurrently, if_not_exists, multi_ops) } } @@ -851,6 +878,7 @@ impl IndexList { } let namespace = &layout.catalog.site.namespace; + let creat = layout.index_creator(true, true); for table in layout.tables.values() { let idxs = self .indexes_for_table(table) @@ -885,8 +913,9 @@ impl IndexList { sql_query(format!("DROP INDEX {}.{};", namespace, index_name)); drop_query.execute(conn).await?; } - let create_query = idx.to_sql(true, true)?; - sql_query(create_query).execute(conn).await?; + // We are creating concurrently, which can't be done + // in a transaction + IndexCreator::execute(&creat, conn, idx).await?; } } } @@ -1013,7 +1042,8 @@ mod tests { assert_eq!(exp, act); let defn = defn.to_ascii_lowercase(); - assert_eq!(defn, act.to_sql(false, false).unwrap()); + let creat = IndexCreator::new(false, false, false); + assert_eq!(defn, creat.to_sql(&act).unwrap()); } use TestCond::*; From 8aa7959504f5b41415c49941790125c7ce7a370d Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 18 Apr 2025 09:44:44 -0700 Subject: [PATCH 07/12] store: Parse our own brin indexes even with minmax_multi_ops op classes --- store/postgres/src/relational/index.rs | 48 +++++++++++++++++--------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/store/postgres/src/relational/index.rs b/store/postgres/src/relational/index.rs index f057c57b9aa..7c02acfa2ed 100644 --- a/store/postgres/src/relational/index.rs +++ b/store/postgres/src/relational/index.rs @@ -131,6 +131,13 @@ impl Display for Expr { } impl Expr { + const LBR: &str = "lower(block_range)"; + const LBR_MULTI: &str = "lower(block_range) int4_minmax_multi_ops"; + const UBR: &str = "coalesce(upper(block_range), 2147483647)"; + const UBR_MULTI: &str = "coalesce(upper(block_range), 2147483647) int4_minmax_multi_ops"; + const VID_MULTI: &str = "vid int8_minmax_multi_ops"; + const BLOCK_MULTI: &str = "block$ int4_minmax_multi_ops"; + fn parse(expr: &str) -> Self { use Expr::*; @@ -138,15 +145,19 @@ impl Expr { let prefix_rx = Regex::new("^(?Psubstring|left)\\((?P[a-z0-9$_]+)").unwrap(); - if expr == VID_COLUMN { + // We strip out the minmax_multi_ops operator class from the + // expression for the columns where we usually use them. They will + // be put back when we create an index, but whether they should be + // used depends on the database in which the index is created + if expr == VID_COLUMN || expr == Self::VID_MULTI { Vid - } else if expr == "lower(block_range)" { + } else if expr == Self::LBR || expr == Self::LBR_MULTI { BlockRangeLower - } else if expr == "coalesce(upper(block_range), 2147483647)" { + } else if expr == Self::UBR || expr == Self::UBR_MULTI { BlockRangeUpper } else if expr == "block_range" { BlockRange - } else if expr == "block$" { + } else if expr == BLOCK_COLUMN || expr == Self::BLOCK_MULTI { Block } else if expr .chars() @@ -211,23 +222,18 @@ impl Expr { /// operator class used for BRIN indexes. This is needed because it is /// not the default operator class, and only supported in Postgres 14+. fn to_sql(&self, multi_ops: bool) -> String { - const LBR: &str = "lower(block_range)"; - const LBR_MULTI: &str = "lower(block_range) int4_minmax_multi_ops"; - const UBR: &str = "coalesce(upper(block_range), 2147483647)"; - const UBR_MULTI: &str = "coalesce(upper(block_range), 2147483647) int4_minmax_multi_ops"; - const VID_MULTI: &str = "vid int8_minmax_multi_ops"; - match (self, multi_ops) { (Expr::Column(name), _) => format!("\"{}\"", name), (Expr::Prefix(name, kind), _) => kind.to_sql(&format!("\"{}\"", name)), - (Expr::Vid, true) => VID_MULTI.to_string(), + (Expr::Vid, true) => Self::VID_MULTI.to_string(), (Expr::Vid, false) => VID_COLUMN.to_string(), - (Expr::Block, _) => BLOCK_COLUMN.to_string(), + (Expr::Block, false) => BLOCK_COLUMN.to_string(), + (Expr::Block, true) => Self::BLOCK_MULTI.to_string(), (Expr::BlockRange, _) => BLOCK_RANGE_COLUMN.to_string(), - (Expr::BlockRangeLower, false) => LBR.to_string(), - (Expr::BlockRangeLower, true) => LBR_MULTI.to_string(), - (Expr::BlockRangeUpper, false) => UBR.to_string(), - (Expr::BlockRangeUpper, true) => UBR_MULTI.to_string(), + (Expr::BlockRangeLower, false) => Self::LBR.to_string(), + (Expr::BlockRangeLower, true) => Self::LBR_MULTI.to_string(), + (Expr::BlockRangeUpper, false) => Self::UBR.to_string(), + (Expr::BlockRangeUpper, true) => Self::UBR_MULTI.to_string(), (Expr::Unknown(expr), _) => expr.to_string(), } } @@ -1110,6 +1116,16 @@ mod tests { }; parse_one(sql, exp); + let sql = "CREATE INDEX brin_nft_transfer ON sgd4.nft_transfer USING brin (lower(block_range) int4_minmax_multi_ops, COALESCE(upper(block_range), 2147483647) int4_minmax_multi_ops, vid int8_minmax_multi_ops)"; + let act = CreateIndex::parse(sql.to_string()); + let CreateIndex::Parsed { columns, .. } = act else { + panic!("Failed to parse index"); + }; + assert_eq!( + vec![Expr::BlockRangeLower, Expr::BlockRangeUpper, Expr::Vid], + columns + ); + let sql = "create index token_block_range_closed on sgd44.token using btree (coalesce(upper(block_range), 2147483647)) where (coalesce(upper(block_range), 2147483647) < 2147483647)"; let exp = Parsed { unique: false, From b215f264e7632c77428bf6942f1da28667a63088 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 24 Apr 2025 12:36:20 -0700 Subject: [PATCH 08/12] store: Parse more 'where' clauses in indexes Also, test some variations of the same index definition --- store/postgres/src/relational/index.rs | 82 +++++++++++++++++++++----- 1 file changed, 66 insertions(+), 16 deletions(-) diff --git a/store/postgres/src/relational/index.rs b/store/postgres/src/relational/index.rs index 7c02acfa2ed..93a17c3719b 100644 --- a/store/postgres/src/relational/index.rs +++ b/store/postgres/src/relational/index.rs @@ -277,17 +277,26 @@ impl Cond { .map(Cond::Partial) } - if &cond == "coalesce(upper(block_range), 2147483647) < 2147483647" { + let cond = cond.trim(); + let cond = if cond.starts_with("(") && cond.ends_with(")") { + &cond[1..cond.len() - 1] + } else { + cond + }; + let cond = cond.trim(); + if cond == "coalesce(upper(block_range), 2147483647) < 2147483647" { Cond::Closed } else { - parse_partial(&cond).unwrap_or(Cond::Unknown(cond)) + parse_partial(cond).unwrap_or(Cond::Unknown(cond.to_string())) } } fn to_sql(&self) -> String { match self { - Cond::Partial(number) => format!("coalesce(upper(block_range), 2147483647) > {number}"), - Cond::Closed => "coalesce(upper(block_range), 2147483647) < 2147483647".to_string(), + Cond::Partial(number) => { + format!("(coalesce(upper(block_range), 2147483647) > {number})") + } + Cond::Closed => "(coalesce(upper(block_range), 2147483647) < 2147483647)".to_string(), Cond::Unknown(cond) => cond.to_string(), } } @@ -402,8 +411,8 @@ impl CreateIndex { "create (?Punique )?index (?P\"?[a-z0-9$_]+\"?) \ on (?Psgd[0-9]+)\\.(?P
\"?[a-z0-9$_]+\"?) \ using (?P[a-z]+) \\((?P.*?)\\)\ - ( where \\((?P.*)\\))?\ - ( with \\((?P.*)\\))?$", + ( with \\((?P.*)\\))?\ + ( where (?P.*))?$", ) .unwrap(); @@ -752,7 +761,7 @@ impl CreateIndex { write!(sql, " with ({with})")?; } if let Some(cond) = cond { - write!(sql, " where ({})", cond.to_sql())?; + write!(sql, " where {}", cond.to_sql())?; } Ok(sql) } @@ -987,7 +996,7 @@ mod tests { } } - #[derive(Debug)] + #[derive(Debug, Clone)] enum TestCond { Partial(BlockNumber), Closed, @@ -1004,7 +1013,7 @@ mod tests { } } - #[derive(Debug)] + #[derive(Debug, Clone)] struct Parsed { unique: bool, name: &'static str, @@ -1042,16 +1051,37 @@ mod tests { } #[track_caller] - fn parse_one(defn: &str, exp: Parsed) { + fn parses_to(defn: &str, exp: &Parsed) -> CreateIndex { let act = CreateIndex::parse(defn.to_string()); - let exp = CreateIndex::from(exp); + let exp = CreateIndex::from(exp.clone()); assert_eq!(exp, act); + act + } + + #[track_caller] + fn parse_one(defn: &str, exp: Parsed) { + let act = parses_to(defn, &exp); let defn = defn.to_ascii_lowercase(); let creat = IndexCreator::new(false, false, false); assert_eq!(defn, creat.to_sql(&act).unwrap()); } + // Test that the equivalent index definitions in `defns` are parsed to + // the same `CreateIndex` and that turning those index definitions into + // a SQL string produces `defns[0]` + #[track_caller] + fn parse_many(defns: &[&str], exp: Parsed) { + let act = parses_to(defns[0], &exp); + for defn in &defns[1..] { + parses_to(defn, &exp); + } + + let defn = defns[0].to_ascii_lowercase(); + let creat = IndexCreator::new(false, false, false); + assert_eq!(defn, creat.to_sql(&act).unwrap()); + } + use TestCond::*; use TestExpr::*; @@ -1223,8 +1253,10 @@ mod tests { }; parse_one(sql, exp); - let sql = - "CREATE INDEX brin_scy ON sgd314614.scy USING brin (block$, vid) where (amount > 0)"; + let sqls = &[ + "CREATE INDEX brin_scy ON sgd314614.scy USING brin (block$, vid) where amount > 0", + "CREATE INDEX brin_scy ON sgd314614.scy USING brin (block$, vid) where (amount > 0)", + ]; let exp = Parsed { unique: false, name: "brin_scy", @@ -1234,9 +1266,13 @@ mod tests { columns: &[Block, Vid], cond: Some(TestCond::Unknown("amount > 0")), }; - parse_one(sql, exp); + parse_many(sqls, exp); - let sql = "CREATE INDEX manual_token_random_cond ON sgd44.token USING btree (\"decimals\") WHERE (decimals > (5)::numeric)"; + let sqls = &[ + "CREATE INDEX manual_token_random_cond ON sgd44.token USING btree (\"decimals\") WHERE decimals > (5)::numeric", + "CREATE INDEX manual_token_random_cond ON sgd44.token USING btree (decimals) WHERE decimals > (5)::numeric", + "CREATE INDEX manual_token_random_cond ON sgd44.token USING btree (decimals) WHERE ( decimals > (5)::numeric )", + "CREATE INDEX manual_token_random_cond ON sgd44.token USING btree (\"decimals\") WHERE ( decimals > (5)::numeric )"]; let exp = Parsed { unique: false, name: "manual_token_random_cond", @@ -1246,7 +1282,21 @@ mod tests { columns: &[Name("decimals")], cond: Some(TestCond::Unknown("decimals > (5)::numeric")), }; - parse_one(sql, exp); + parse_many(sqls, exp); + + let sqls = &[ + "CREATE INDEX manual_pool_swap_enabled_total_liquidity ON sgd12.pool USING btree (\"total_liquidity\") WHERE swap_enabled", + "CREATE INDEX manual_pool_swap_enabled_total_liquidity ON sgd12.pool USING btree (total_liquidity) WHERE swap_enabled"]; + let exp = Parsed { + unique: false, + name: "manual_pool_swap_enabled_total_liquidity", + nsp: "sgd12", + table: "pool", + method: BTree, + columns: &[Name("total_liquidity")], + cond: Some(TestCond::Unknown("swap_enabled")), + }; + parse_many(sqls, exp); } #[test] From 663765a004e0724ecba62931c7a7b0e3fdb62a13 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 18 Apr 2025 09:44:56 -0700 Subject: [PATCH 09/12] store: Generate time-travel indexes with CreateIndex instead of raw SQL --- store/postgres/src/relational/ddl.rs | 71 +++++++++++++++++++++++++- store/postgres/src/relational/index.rs | 50 ++++++++++++++++++ 2 files changed, 119 insertions(+), 2 deletions(-) diff --git a/store/postgres/src/relational/ddl.rs b/store/postgres/src/relational/ddl.rs index 9a34e545687..360cf58e4f0 100644 --- a/store/postgres/src/relational/ddl.rs +++ b/store/postgres/src/relational/ddl.rs @@ -8,14 +8,14 @@ use graph::{ schema::InputSchema, }; -use crate::block_range::CAUSALITY_REGION_COLUMN; use crate::relational::{ ColumnType, BLOCK_COLUMN, BLOCK_RANGE_COLUMN, BYTE_ARRAY_PREFIX_SIZE, STRING_PREFIX_SIZE, VID_COLUMN, }; +use crate::{block_range::CAUSALITY_REGION_COLUMN, relational::index::Cond}; use super::{ - index::{IndexCreator, IndexList}, + index::{CreateIndex, Expr, IndexCreator, IndexList, Method}, Catalog, Column, Layout, SqlName, Table, }; @@ -164,6 +164,72 @@ impl Table { } } + /// Create a `CreateIndex` for an index on this table with the given + /// name over the given columns. The index will be a non-unique BTree + /// index + fn create_index(&self, name: &str, columns: Vec) -> CreateIndex { + CreateIndex::create( + name, + &self.qualified_name, + self.name.as_str(), + false, + Method::BTree, + columns, + None, + None, + ) + } + + fn time_travel_indexes(&self) -> Vec { + let mut idxs = Vec::new(); + if self.immutable { + // For immutable entities, a simple BTree on block$ is sufficient + let idx = self.create_index(&format!("{}_block", self.name), vec![Expr::Block]); + idxs.push(idx); + } else { + // Add a BRIN index on the block_range bounds to exploit the fact + // that block ranges closely correlate with where in a table an + // entity appears physically. This index is incredibly efficient for + // reverts where we look for very recent blocks, so that this index + // is highly selective. See https://github.com/graphprotocol/graph-node/issues/1415#issuecomment-630520713 + // for details on one experiment. + // + // We do not index the `block_range` as a whole, but rather the lower + // and upper bound separately, since experimentation has shown that + // Postgres will not use the index on `block_range` for clauses like + // `block_range @> $block` but rather falls back to a full table scan. + // + // We also make sure that we do not put `NULL` in the index for + // the upper bound since nulls can not be compared to anything and + // will make the index less effective. + // + // To make the index usable, queries need to have clauses using + // `lower(block_range)` and `coalesce(..)` verbatim. + // + // We also index `vid` as that correlates with the order in which + // entities are stored. + + let idx = self + .create_index( + &format!("brin_{table_name}", table_name = self.name), + vec![Expr::BlockRangeLower, Expr::BlockRangeUpper, Expr::Vid], + ) + .method(Method::Brin); + idxs.push(idx); + + // Add a BTree index that helps with the `RevertClampQuery` by making + // it faster to find entity versions that have been modified + let idx = self + .create_index( + &format!("{table_name}_block_range_closed", table_name = self.name), + vec![Expr::BlockRangeUpper], + ) + .cond(Cond::Closed); + idxs.push(idx); + } + idxs + } + fn create_time_travel_indexes(&self, catalog: &Catalog, out: &mut String) -> fmt::Result { let (int4, int8) = catalog.minmax_ops(); @@ -410,6 +476,7 @@ impl Table { out: &mut String, ) -> fmt::Result { self.create_table(out)?; + let idxs = self.time_travel_indexes(); self.create_time_travel_indexes(catalog, out)?; match (index_def, ENV_VARS.postpone_attribute_index_creation) { (Some(index_def), true) => { diff --git a/store/postgres/src/relational/index.rs b/store/postgres/src/relational/index.rs index 93a17c3719b..6f67cf830b9 100644 --- a/store/postgres/src/relational/index.rs +++ b/store/postgres/src/relational/index.rs @@ -767,6 +767,56 @@ impl CreateIndex { } } } + + pub(crate) fn method(self, method: Method) -> CreateIndex { + match self { + CreateIndex::Unknown { defn } => CreateIndex::Unknown { defn }, + CreateIndex::Parsed { + unique, + name, + nsp, + table, + method: _, + columns, + cond, + with, + } => CreateIndex::Parsed { + unique, + name, + nsp, + table, + method, + columns, + cond, + with, + }, + } + } + + pub(crate) fn cond(self, cond: Cond) -> CreateIndex { + match self { + CreateIndex::Unknown { defn } => CreateIndex::Unknown { defn }, + CreateIndex::Parsed { + unique, + name, + nsp, + table, + method, + columns, + cond: _, + with, + } => CreateIndex::Parsed { + unique, + name, + nsp, + table, + method, + columns, + cond: Some(cond), + with, + }, + } + } } /// A helper to run or write index creation statements with options as to From 6d52408a0726dcfcb70921deeacf5834d3bb8897 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 17 Feb 2026 12:33:45 -0800 Subject: [PATCH 10/12] store: Use Table::indexes() for all index creation Consolidate index creation into a single `Table::indexes()` method that returns all indexes (time-travel, attribute, aggregate) as `CreateIndex` objects. This replaces the old string-based methods and eliminates the `index_def: Option` parameter threading through the codebase. Key changes: - Add `Table::indexes()` combining time_travel + attribute + aggregate indexes - Add `attr_index_spec()` and `add_attribute_indexes()` structured helpers - Move env var check into `CreateIndex::to_postpone()` so callers need not check - Simplify `Table::as_ddl()` to iterate indexes with postpone filtering - Remove old `create_time_travel_indexes`, `create_attribute_indexes`, `create_postponed_indexes`, `create_aggregate_indexes` string methods - Remove `index_def` parameter from Layout, DeploymentStore, SubgraphStore - Update copy.rs to use `indexes()` + `references_column_not_in()` for new fields - Update prune.rs to use simplified `as_ddl()` without index_def - Update all DDL test constants for new single-line index format --- store/postgres/examples/layout.rs | 2 +- store/postgres/src/copy.rs | 52 +- store/postgres/src/deployment_store.rs | 7 - store/postgres/src/relational.rs | 4 +- store/postgres/src/relational/ddl.rs | 301 +++--- store/postgres/src/relational/ddl_tests.rs | 872 ++++++------------ store/postgres/src/relational/index.rs | 19 +- store/postgres/src/relational/prune.rs | 37 +- store/postgres/src/subgraph_store.rs | 26 +- store/test-store/tests/postgres/relational.rs | 2 +- .../tests/postgres/relational_bytes.rs | 2 +- 11 files changed, 436 insertions(+), 888 deletions(-) diff --git a/store/postgres/examples/layout.rs b/store/postgres/examples/layout.rs index cab97889cba..7556d5383b1 100644 --- a/store/postgres/examples/layout.rs +++ b/store/postgres/examples/layout.rs @@ -42,7 +42,7 @@ fn print_delete_all(layout: &Layout) { } fn print_ddl(layout: &Layout) { - let ddl = ensure(layout.as_ddl(None), "Failed to generate DDL"); + let ddl = ensure(layout.as_ddl(), "Failed to generate DDL"); println!("{}", ddl); } diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index ea8e678a84f..0af007c5798 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -12,6 +12,7 @@ //! operation can resume after an interruption, for example, because //! `graph-node` was restarted while the copy was running. use std::{ + collections::HashSet, convert::TryFrom, future::Future, pin::Pin, @@ -23,8 +24,7 @@ use std::{ }; use diesel::{ - dsl::sql, insert_into, select, sql_query, update, ExpressionMethods, OptionalExtension, - QueryDsl, + dsl::sql, insert_into, select, update, ExpressionMethods, OptionalExtension, QueryDsl, }; use diesel_async::{ scoped_futures::{ScopedBoxFuture, ScopedFutureExt}, @@ -32,6 +32,15 @@ use diesel_async::{ }; use diesel_async::{RunQueryDsl, SimpleAsyncConnection}; +use crate::{ + advisory_lock, catalog, deployment, + dynds::DataSourcesTable, + primary::{DeploymentId, Primary, Site}, + relational::{index::IndexList, Layout, Table}, + relational_queries as rq, + vid_batcher::{VidBatcher, VidRange}, + AsyncPgConnection, ConnectionPool, +}; use graph::{ futures03::{ future::{select_all, BoxFuture}, @@ -44,17 +53,6 @@ use graph::{ schema::EntityType, slog::error, }; -use itertools::Itertools; - -use crate::{ - advisory_lock, catalog, deployment, - dynds::DataSourcesTable, - primary::{DeploymentId, Primary, Site}, - relational::{index::IndexList, Layout, Table}, - relational_queries as rq, - vid_batcher::{VidBatcher, VidRange}, - AsyncPgConnection, ConnectionPool, -}; const LOG_INTERVAL: Duration = Duration::from_secs(3 * 60); @@ -1252,26 +1250,20 @@ impl Connection { creat.execute_many(conn, &idxs).await?; } - // Second create the indexes for the new fields. - // Here we need to skip those created in the first step for the old fields. + // Second create the indexes for the new fields that don't exist in + // the source. for table in state.all_tables() { - let orig_colums = table - .src - .columns - .iter() - .map(|c| c.name.to_string()) - .collect_vec(); - for sql in table + let src_columns: HashSet<&str> = + table.src.columns.iter().map(|c| c.name.as_str()).collect(); + let new_idxs: Vec<_> = table .dst - .create_postponed_indexes(orig_colums, false) + .indexes(&self.dst.input_schema) + .map_err(|_| internal_error!("failed to generate indexes for copy"))? .into_iter() - { - let query = sql_query(sql); - self.transaction(|conn| { - async { query.execute(conn).await.map_err(StoreError::from) }.scope_boxed() - })? - .await?; - } + .filter(|idx| idx.to_postpone() && idx.references_column_not_in(&src_columns)) + .collect(); + let conn = self.get_conn()?; + creat.execute_many(conn, &new_idxs).await?; } self.copy_private_data_sources(&state).await?; diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 21eec6b4a76..48ef854e3e6 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -174,11 +174,6 @@ impl DeploymentStore { DeploymentStore(Arc::new(store)) } - // Parameter index_def is used to copy over the definition of the indexes from the source subgraph - // to the destination one. This happens when it is set to Some. In this case also the BTree attribude - // indexes are created later on, when the subgraph has synced. In case this parameter is None, all - // indexes are created with the default creation strategy for a new subgraph, and also from the very - // start. pub(crate) async fn create_deployment( &self, schema: &InputSchema, @@ -186,7 +181,6 @@ impl DeploymentStore { site: Arc, replace: bool, on_sync: OnSync, - index_def: Option, ) -> Result<(), StoreError> { let mut conn = self.pool.get_permitted().await?; conn.transaction::<_, StoreError, _>(|conn| { @@ -221,7 +215,6 @@ impl DeploymentStore { site.clone(), schema, entities_with_causality_region.into_iter().collect(), - index_def, ) .await?; diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index ecd88298fac..77c5263cc67 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -43,7 +43,6 @@ use graph::schema::{ InputSchema, }; use graph::slog::warn; -use index::IndexList; use inflector::Inflector; use itertools::Itertools; use lazy_static::lazy_static; @@ -380,13 +379,12 @@ impl Layout { site: Arc, schema: &InputSchema, entities_with_causality_region: BTreeSet, - index_def: Option, ) -> Result { let catalog = Catalog::for_creation(conn, site.cheap_clone(), entities_with_causality_region).await?; let layout = Self::new(site, schema, catalog)?; let sql = layout - .as_ddl(index_def) + .as_ddl() .map_err(|_| StoreError::Unknown(anyhow!("failed to generate DDL for layout")))?; conn.batch_execute(&sql).await?; Ok(layout) diff --git a/store/postgres/src/relational/ddl.rs b/store/postgres/src/relational/ddl.rs index 360cf58e4f0..73ffe2f702f 100644 --- a/store/postgres/src/relational/ddl.rs +++ b/store/postgres/src/relational/ddl.rs @@ -3,10 +3,7 @@ use std::{ iter, }; -use graph::{ - prelude::{BLOCK_NUMBER_MAX, ENV_VARS}, - schema::InputSchema, -}; +use graph::{prelude::ENV_VARS, schema::InputSchema}; use crate::relational::{ ColumnType, BLOCK_COLUMN, BLOCK_RANGE_COLUMN, BYTE_ARRAY_PREFIX_SIZE, STRING_PREFIX_SIZE, @@ -15,8 +12,8 @@ use crate::relational::{ use crate::{block_range::CAUSALITY_REGION_COLUMN, relational::index::Cond}; use super::{ - index::{CreateIndex, Expr, IndexCreator, IndexList, Method}, - Catalog, Column, Layout, SqlName, Table, + index::{CreateIndex, Expr, IndexCreator, Method, PrefixKind}, + Column, Layout, SqlName, Table, }; // In debug builds (for testing etc.) unconditionally create exclusion constraints, in release @@ -32,7 +29,7 @@ impl Layout { /// /// See the unit tests at the end of this file for the actual DDL that /// gets generated - pub fn as_ddl(&self, index_def: Option) -> Result { + pub fn as_ddl(&self) -> Result { let mut out = String::new(); // Output enums first so table definitions can reference them @@ -45,13 +42,7 @@ impl Layout { // Output 'create table' statements for all tables let creat = self.index_creator(false, false); for table in tables { - table.as_ddl( - &self.input_schema, - &self.catalog, - index_def.as_ref(), - &creat, - &mut out, - )?; + table.as_ddl(&self.input_schema, &creat, &mut out)?; } Ok(out) @@ -170,8 +161,8 @@ impl Table { fn create_index(&self, name: &str, columns: Vec) -> CreateIndex { CreateIndex::create( name, - &self.qualified_name, - self.name.as_str(), + &format!("\"{}\"", self.nsp), + &self.name.quoted(), false, Method::BTree, columns, @@ -230,89 +221,6 @@ impl Table { idxs } - fn create_time_travel_indexes(&self, catalog: &Catalog, out: &mut String) -> fmt::Result { - let (int4, int8) = catalog.minmax_ops(); - - if self.immutable { - // For immutable entities, a simple BTree on block$ is sufficient - write!( - out, - "create index {table_name}_block\n \ - on {qname}({block});\n", - table_name = self.name, - qname = self.qualified_name, - block = BLOCK_COLUMN - ) - } else { - // Add a BRIN index on the block_range bounds to exploit the fact - // that block ranges closely correlate with where in a table an - // entity appears physically. This index is incredibly efficient for - // reverts where we look for very recent blocks, so that this index - // is highly selective. See https://github.com/graphprotocol/graph-node/issues/1415#issuecomment-630520713 - // for details on one experiment. - // - // We do not index the `block_range` as a whole, but rather the lower - // and upper bound separately, since experimentation has shown that - // Postgres will not use the index on `block_range` for clauses like - // `block_range @> $block` but rather falls back to a full table scan. - // - // We also make sure that we do not put `NULL` in the index for - // the upper bound since nulls can not be compared to anything and - // will make the index less effective. - // - // To make the index usable, queries need to have clauses using - // `lower(block_range)` and `coalesce(..)` verbatim. - // - // We also index `vid` as that correlates with the order in which - // entities are stored. - write!(out,"create index brin_{table_name}\n \ - on {qname}\n \ - using brin(lower(block_range) {int4}, coalesce(upper(block_range), {block_max}) {int4}, vid {int8});\n", - table_name = self.name, - qname = self.qualified_name, - block_max = BLOCK_NUMBER_MAX)?; - - // Add a BTree index that helps with the `RevertClampQuery` by making - // it faster to find entity versions that have been modified - write!( - out, - "create index {table_name}_block_range_closed\n \ - on {qname}(coalesce(upper(block_range), {block_max}))\n \ - where coalesce(upper(block_range), {block_max}) < {block_max};\n", - table_name = self.name, - qname = self.qualified_name, - block_max = BLOCK_NUMBER_MAX - ) - } - } - - /// Calculates the indexing method and expression for a database column. - /// - /// ### Parameters - /// * `immutable`: A boolean flag indicating whether the table is immutable. - /// * `column`: A reference to the `Column` struct, representing the database column for which the index method and expression are being calculated. - /// - /// ### Returns - /// A tuple `(String, String)` where: - /// - The first element is the indexing method ("btree", "gist", or "gin"), - /// - The second element is the index expression as a string. - fn calculate_attr_index_method_and_expression( - immutable: bool, - column: &Column, - ) -> (String, String) { - if column.is_reference() && !column.is_list() { - if immutable { - let index_expr = format!("{}, {}", column.name.quoted(), BLOCK_COLUMN); - ("btree".to_string(), index_expr) - } else { - let index_expr = format!("{}, {}", column.name.quoted(), BLOCK_RANGE_COLUMN); - ("gist".to_string(), index_expr) - } - } else { - Self::calculate_index_method_and_expression(column) - } - } - pub fn calculate_index_method_and_expression(column: &Column) -> (String, String) { let index_expr = if column.use_prefix_comparison { match column.column_type { @@ -340,62 +248,6 @@ impl Table { (method, index_expr) } - pub(crate) fn create_postponed_indexes( - &self, - skip_colums: Vec, - concurrently: bool, - ) -> Vec { - let mut indexing_queries = vec![]; - let columns = self.columns_to_index(); - - for (column_index, column) in columns.enumerate() { - let (method, index_expr) = - Self::calculate_attr_index_method_and_expression(self.immutable, column); - if !column.is_list() - && method == "btree" - && column.name.as_str() != "id" - && !skip_colums.contains(&column.name.to_string()) - { - let conc = if concurrently { "concurrently " } else { "" }; - let sql = format!( - "create index {conc}if not exists attr_{table_index}_{column_index}_{table_name}_{column_name}\n on {qname} using {method}({index_expr});\n", - table_index = self.position, - table_name = self.name, - column_name = column.name, - qname = self.qualified_name, - ); - indexing_queries.push(sql); - } - } - indexing_queries - } - - fn create_attribute_indexes(&self, out: &mut String) -> fmt::Result { - let columns = self.columns_to_index(); - - for (column_index, column) in columns.enumerate() { - let (method, index_expr) = - Self::calculate_attr_index_method_and_expression(self.immutable, column); - - // If `create_gin_indexes` is set to false, we don't create - // indexes on array attributes. Experience has shown that these - // indexes are very expensive to update and can have a very bad - // impact on the write performance of the database, but are - // hardly ever used or needed by queries. - if !column.is_list() || ENV_VARS.store.create_gin_indexes { - write!( - out, - "create index attr_{table_index}_{column_index}_{table_name}_{column_name}\n on {qname} using {method}({index_expr});\n", - table_index = self.position, - table_name = self.name, - column_name = column.name, - qname = self.qualified_name, - )?; - } - } - writeln!(out) - } - fn columns_to_index(&self) -> impl Iterator { // Skip columns whose type is an array of enum, since there is no // good way to index them with Postgres 9.6. Once we move to @@ -423,11 +275,82 @@ impl Table { columns } - /// If `self` is an aggregation and has cumulative aggregates, create an - /// index on the dimensions. That supports the lookup of previous - /// aggregation values we do in the rollup query since that filters by - /// all dimensions with an `=` and by timestamp with a `<` - fn create_aggregate_indexes(&self, schema: &InputSchema, out: &mut String) -> fmt::Result { + /// Return the index method and expressions for an attribute column. + fn attr_index_spec(immutable: bool, column: &Column) -> (Method, Vec) { + if column.is_reference() && !column.is_list() { + if immutable { + ( + Method::BTree, + vec![Expr::Column(column.name.as_str().to_string()), Expr::Block], + ) + } else { + ( + Method::Gist, + vec![ + Expr::Column(column.name.as_str().to_string()), + Expr::BlockRange, + ], + ) + } + } else if column.use_prefix_comparison { + match column.column_type { + ColumnType::String => ( + Method::BTree, + vec![Expr::Prefix( + column.name.as_str().to_string(), + PrefixKind::Left, + )], + ), + ColumnType::Bytes => ( + Method::BTree, + vec![Expr::Prefix( + column.name.as_str().to_string(), + PrefixKind::Substring, + )], + ), + _ => unreachable!("only String and Bytes can have arbitrary size"), + } + } else if column.is_list() || column.is_fulltext() { + ( + Method::Gin, + vec![Expr::Column(column.name.as_str().to_string())], + ) + } else { + ( + Method::BTree, + vec![Expr::Column(column.name.as_str().to_string())], + ) + } + } + + fn add_attribute_indexes(&self, indexes: &mut Vec) { + for (column_index, column) in self.columns_to_index().enumerate() { + if column.is_list() && !ENV_VARS.store.create_gin_indexes { + continue; + } + let (method, columns) = Self::attr_index_spec(self.immutable, column); + let name = format!( + "attr_{}_{}_{}_{}", + self.position, column_index, self.name, column.name + ); + indexes.push(CreateIndex::create( + &name, + &format!("\"{}\"", self.nsp), + &self.name.quoted(), + false, + method, + columns, + None, + None, + )); + } + } + + fn add_aggregate_indexes( + &self, + schema: &InputSchema, + indexes: &mut Vec, + ) -> Result<(), fmt::Error> { let agg = schema .agg_mappings() .find(|mapping| mapping.agg_type(schema) == self.object) @@ -439,29 +362,39 @@ impl Table { return Ok(()); }; - let dim_cols: Vec<_> = agg + let mut columns: Vec = agg .dimensions() .map(|dim| { self.column_for_field(&dim.name) - .map(|col| &col.name) - // We don't have a good way to return an error - // indicating that somehow the table is wrong (which - // should not happen). We can only return a generic - // formatting error + .map(|col| Expr::Column(col.name.as_str().to_string())) .map_err(|_| fmt::Error) }) .collect::>()?; + columns.push(Expr::Column("timestamp".to_string())); - write!( - out, - "create index {table_name}_dims\n on {qname}({dims}, timestamp);\n", - table_name = self.name, - qname = self.qualified_name, - dims = dim_cols.join(", ") - )?; + let name = format!("{}_dims", self.name); + indexes.push(CreateIndex::create( + &name, + &format!("\"{}\"", self.nsp), + &self.name.quoted(), + false, + Method::BTree, + columns, + None, + None, + )); Ok(()) } + /// Return all indexes for this table as `CreateIndex` objects + pub(crate) fn indexes(&self, schema: &InputSchema) -> Result, fmt::Error> { + let mut indexes = Vec::new(); + indexes.extend(self.time_travel_indexes()); + self.add_attribute_indexes(&mut indexes); + self.add_aggregate_indexes(schema, &mut indexes)?; + Ok(indexes) + } + /// Generate the DDL for one table, i.e. one `create table` statement /// and all `create index` statements for the table's columns /// @@ -470,29 +403,13 @@ impl Table { pub(crate) fn as_ddl( &self, schema: &InputSchema, - catalog: &Catalog, - index_def: Option<&IndexList>, creat: &IndexCreator, out: &mut String, ) -> fmt::Result { self.create_table(out)?; - let idxs = self.time_travel_indexes(); - self.create_time_travel_indexes(catalog, out)?; - match (index_def, ENV_VARS.postpone_attribute_index_creation) { - (Some(index_def), true) => { - let idxs = index_def - .indexes_for_table(self) - .filter(|idx| !idx.to_postpone()); - for idx in idxs { - // For copies, the `index_def` is for the source table; - // we need to make sure it is for us - let idx = idx.with_nsp(self.nsp.to_string()).map_err(|_| fmt::Error)?; - writeln!(out, "{};", creat.to_sql(&idx)?)?; - } - } - (Some(_), false) | (None, _) => { - self.create_attribute_indexes(out)?; - self.create_aggregate_indexes(schema, out)?; + for idx in self.indexes(schema)? { + if !idx.to_postpone() { + writeln!(out, "{};", creat.to_sql(&idx)?)?; } } Ok(()) diff --git a/store/postgres/src/relational/ddl_tests.rs b/store/postgres/src/relational/ddl_tests.rs index 187791aa9dc..6a545546fd5 100644 --- a/store/postgres/src/relational/ddl_tests.rs +++ b/store/postgres/src/relational/ddl_tests.rs @@ -1,4 +1,3 @@ -use index::CreateIndex; use itertools::Itertools; use pretty_assertions::assert_eq; @@ -155,94 +154,67 @@ fn test_manual_index_creation_ddl() { fn generate_postponed_indexes() { let layout = test_layout(THING_GQL); let table = layout.table(&SqlName::from("Scalar")).unwrap(); - let skip_colums = vec!["id".to_string()]; - let query_vec = table.create_postponed_indexes(skip_colums, true); - assert!(query_vec.len() == 7); - let queries = query_vec.join(" "); - check_eqv(THING_POSTPONED_INDEXES, &queries) + let postponed: Vec<_> = table + .indexes(&layout.input_schema) + .unwrap() + .into_iter() + .filter(|idx| idx.to_postpone()) + .collect(); + assert_eq!(7, postponed.len()); + + let creat = layout.index_creator(true, true); + let queries: Vec<_> = postponed + .iter() + .map(|idx| creat.to_sql(idx).unwrap()) + .collect(); + let queries = queries.join(";\n"); + check_eqv(THING_POSTPONED_INDEXES, &queries); } const THING_POSTPONED_INDEXES: &str = r#" create index concurrently if not exists attr_1_1_scalar_bool - on "sgd0815"."scalar" using btree("bool"); - create index concurrently if not exists attr_1_2_scalar_int - on "sgd0815"."scalar" using btree("int"); - create index concurrently if not exists attr_1_3_scalar_big_decimal - on "sgd0815"."scalar" using btree("big_decimal"); - create index concurrently if not exists attr_1_4_scalar_string - on "sgd0815"."scalar" using btree(left("string", 256)); - create index concurrently if not exists attr_1_5_scalar_bytes - on "sgd0815"."scalar" using btree(substring("bytes", 1, 64)); - create index concurrently if not exists attr_1_6_scalar_big_int - on "sgd0815"."scalar" using btree("big_int"); - create index concurrently if not exists attr_1_7_scalar_color - on "sgd0815"."scalar" using btree("color"); + on "sgd0815"."scalar" using btree ("bool"); +create index concurrently if not exists attr_1_2_scalar_int + on "sgd0815"."scalar" using btree ("int"); +create index concurrently if not exists attr_1_3_scalar_big_decimal + on "sgd0815"."scalar" using btree ("big_decimal"); +create index concurrently if not exists attr_1_4_scalar_string + on "sgd0815"."scalar" using btree (left("string", 256)); +create index concurrently if not exists attr_1_5_scalar_bytes + on "sgd0815"."scalar" using btree (substring("bytes", 1, 64)); +create index concurrently if not exists attr_1_6_scalar_big_int + on "sgd0815"."scalar" using btree ("big_int"); +create index concurrently if not exists attr_1_7_scalar_color + on "sgd0815"."scalar" using btree ("color") "#; -impl IndexList { - fn mock_thing_index_list() -> Self { - let mut indexes: HashMap> = HashMap::new(); - let v1 = vec![ - CreateIndex::parse(r#"create index thing_id_block_range_excl on sgd0815.thing using gist (id, block_range)"#.to_string()), - CreateIndex::parse(r#"create index brin_thing on sgd0815."thing" using brin (lower(block_range) int4_minmax_ops, coalesce(upper(block_range), 2147483647) int4_minmax_ops, vid int8_minmax_ops)"#.to_string()), - // fixme: enable the index bellow once the parsing of statements is fixed, and BlockRangeUpper in particular (issue #5512) - // CreateIndex::parse(r#"create index thing_block_range_closed on sgd0815."thing" using btree (coalesce(upper(block_range), 2147483647)) where coalesce((upper(block_range), 2147483647) < 2147483647)"#.to_string()), - CreateIndex::parse(r#"create index attr_0_0_thing_id on sgd0815."thing" using btree (id)"#.to_string()), - CreateIndex::parse(r#"create index attr_0_1_thing_big_thing on sgd0815."thing" using gist (big_thing, block_range)"#.to_string()), - ]; - indexes.insert("thing".to_string(), v1); - let v2 = vec![ - CreateIndex::parse(r#"create index attr_1_0_scalar_id on sgd0815."scalar" using btree (id)"#.to_string(),), - CreateIndex::parse(r#"create index attr_1_1_scalar_bool on sgd0815."scalar" using btree (bool)"#.to_string(),), - CreateIndex::parse(r#"create index attr_1_2_scalar_int on sgd0815."scalar" using btree (int)"#.to_string(),), - CreateIndex::parse(r#"create index attr_1_3_scalar_big_decimal on sgd0815."scalar" using btree (big_decimal)"#.to_string()), - CreateIndex::parse(r#"create index attr_1_4_scalar_string on sgd0815."scalar" using btree (left(string, 256))"#.to_string()), - CreateIndex::parse(r#"create index attr_1_5_scalar_bytes on sgd0815."scalar" using btree (substring(bytes, 1, 64))"#.to_string()), - CreateIndex::parse(r#"create index attr_1_6_scalar_big_int on sgd0815."scalar" using btree (big_int)"#.to_string()), - CreateIndex::parse(r#"create index attr_1_7_scalar_color on sgd0815."scalar" using btree (color)"#.to_string()), - ]; - indexes.insert("scalar".to_string(), v2); - let v3 = vec![CreateIndex::parse( - r#"create index attr_2_0_file_thing_id on sgd0815."file_thing" using btree (id)"# - .to_string(), - )]; - indexes.insert("file_thing".to_string(), v3); - IndexList::new(indexes) - } -} - #[test] fn generate_ddl() { let layout = test_layout(THING_GQL); - let sql = layout.as_ddl(None).expect("Failed to generate DDL"); + let sql = layout.as_ddl().expect("Failed to generate DDL"); assert_eq!(THING_DDL, &sql); // Use `assert_eq!` to also test the formatting. - let il = IndexList::mock_thing_index_list(); - let layout = test_layout(THING_GQL); - let sql = layout.as_ddl(Some(il)).expect("Failed to generate DDL"); - check_eqv(THING_DDL_ON_COPY, &sql); - let layout = test_layout(MUSIC_GQL); - let sql = layout.as_ddl(None).expect("Failed to generate DDL"); + let sql = layout.as_ddl().expect("Failed to generate DDL"); check_eqv(MUSIC_DDL, &sql); let layout = test_layout(FOREST_GQL); - let sql = layout.as_ddl(None).expect("Failed to generate DDL"); + let sql = layout.as_ddl().expect("Failed to generate DDL"); check_eqv(FOREST_DDL, &sql); let layout = test_layout(FULLTEXT_GQL); - let sql = layout.as_ddl(None).expect("Failed to generate DDL"); + let sql = layout.as_ddl().expect("Failed to generate DDL"); check_eqv(FULLTEXT_DDL, &sql); let layout = test_layout(FORWARD_ENUM_GQL); - let sql = layout.as_ddl(None).expect("Failed to generate DDL"); + let sql = layout.as_ddl().expect("Failed to generate DDL"); check_eqv(FORWARD_ENUM_SQL, &sql); let layout = test_layout(TS_GQL); - let sql = layout.as_ddl(None).expect("Failed to generate DDL"); + let sql = layout.as_ddl().expect("Failed to generate DDL"); check_eqv(TS_SQL, &sql); let layout = test_layout(LIFETIME_GQL); - let sql = layout.as_ddl(None).expect("Failed to generate DDL"); + let sql = layout.as_ddl().expect("Failed to generate DDL"); check_eqv(LIFETIME_SQL, &sql); } @@ -350,91 +322,44 @@ fn can_copy_from() { ); } -/// Check that we do not create the index on `block$` twice. There was a bug -/// that if an immutable entity type had a `block` field and index creation -/// was postponed, we would emit the index on `block$` twice, once from -/// `Table.create_time_travel_indexes` and once through -/// `IndexList.indexes_for_table` +/// Check that we do not create the index on `block$` twice. With the new +/// `Table::indexes()` approach, the `block$` index (time-travel) and the +/// `attr_*_block` index (attribute) are both generated by `indexes()` but +/// only the attribute one can be postponed. #[test] fn postponed_indexes_with_block_column() { - fn index_list() -> IndexList { - // To generate this list, print the output of `layout.as_ddl(None)`, run - // that in Postgres and do `select indexdef from pg_indexes where - // schemaname = 'sgd0815'` - const INDEX_DEFS: &[&str] = &[ - "CREATE UNIQUE INDEX data_pkey ON sgd0815.data USING btree (vid)", - "CREATE UNIQUE INDEX data_id_key ON sgd0815.data USING btree (id)", - "CREATE INDEX data_block ON sgd0815.data USING btree (block$)", - "CREATE INDEX attr_1_0_data_block ON sgd0815.data USING btree (block, \"block$\")", - ]; - - let mut indexes: HashMap> = HashMap::new(); - indexes.insert( - "data".to_string(), - INDEX_DEFS - .iter() - .map(|def| CreateIndex::parse(def.to_string())) - .collect(), - ); - IndexList::new(indexes) - } - fn cr(index: &str) -> String { format!("create index{}", index) } - fn cre(index: &str) -> String { - format!("create index if not exists{}", index) - } - - // Names of the two indexes we are interested in. Not the leading space + // Names of the two indexes we are interested in. Note the leading space // to guard a little against overlapping names const BLOCK_IDX: &str = " data_block"; const ATTR_IDX: &str = " attr_1_0_data_block"; let layout = test_layout(BLOCK_GQL); - // Create everything - let sql = layout.as_ddl(None).unwrap(); - assert!(sql.contains(&cr(BLOCK_IDX))); - assert!(sql.contains(&cr(ATTR_IDX))); - - // Defer attribute indexes - let sql = layout.as_ddl(Some(index_list())).unwrap(); + // Create everything; when postpone is enabled (debug builds), the + // attribute btree index is omitted from the DDL + let sql = layout.as_ddl().unwrap(); assert!(sql.contains(&cr(BLOCK_IDX))); - assert!(!sql.contains(ATTR_IDX)); - // This used to be duplicated + // The block$ time-travel index appears exactly once let count = sql.matches(BLOCK_IDX).count(); assert_eq!(1, count); let table = layout.table(&SqlName::from("Data")).unwrap(); - let sql = table.create_postponed_indexes(vec![], false); - assert_eq!(1, sql.len()); - assert!(!sql[0].contains(BLOCK_IDX)); - assert!(sql[0].contains(&cre(ATTR_IDX))); - - let dst_nsp = Namespace::new("sgd2".to_string()).unwrap(); - let list = index_list(); - let creat = layout.index_creator(false, false); - let arr: Vec<_> = list - .indexes_for_table(table) + let postponed: Vec<_> = table + .indexes(&layout.input_schema) + .unwrap() + .into_iter() .filter(|idx| idx.to_postpone()) - .map(|idx| creat.to_sql(idx).unwrap()) - .collect(); - assert_eq!(1, arr.len()); - assert!(!arr[0].contains(BLOCK_IDX)); - assert!(arr[0].contains(&cr(ATTR_IDX))); - - let arr: Vec<_> = list - .indexes_for_table(table) - .filter(|idx| !idx.to_postpone()) - .map(|idx| { - let idx = idx.with_nsp(dst_nsp.to_string()).unwrap(); - creat.to_sql(&idx).unwrap() - }) .collect(); + assert_eq!(1, postponed.len()); - assert_eq!(0, arr.len()); + let creat = layout.index_creator(false, false); + let sql = creat.to_sql(&postponed[0]).unwrap(); + assert!(!sql.contains(BLOCK_IDX)); + assert!(sql.contains(&cr(ATTR_IDX))); } const THING_GQL: &str = r#" @@ -477,17 +402,10 @@ create type sgd0815."size" alter table "sgd0815"."thing" add constraint thing_id_block_range_excl exclude using gist (id with =, block_range with &&); -create index brin_thing - on "sgd0815"."thing" - using brin(lower(block_range) int4_minmax_ops, coalesce(upper(block_range), 2147483647) int4_minmax_ops, vid int8_minmax_ops); -create index thing_block_range_closed - on "sgd0815"."thing"(coalesce(upper(block_range), 2147483647)) - where coalesce(upper(block_range), 2147483647) < 2147483647; -create index attr_0_0_thing_id - on "sgd0815"."thing" using btree("id"); -create index attr_0_1_thing_big_thing - on "sgd0815"."thing" using gist("big_thing", block_range); - +create index brin_thing on "sgd0815"."thing" using brin (lower(block_range), coalesce(upper(block_range), 2147483647), vid); +create index thing_block_range_closed on "sgd0815"."thing" using btree (coalesce(upper(block_range), 2147483647)) where (coalesce(upper(block_range), 2147483647) < 2147483647); +create index attr_0_0_thing_id on "sgd0815"."thing" using btree ("id"); +create index attr_0_1_thing_big_thing on "sgd0815"."thing" using gist ("big_thing", block_range); create table "sgd0815"."scalar" ( vid bigint primary key, @@ -504,29 +422,9 @@ create index attr_0_1_thing_big_thing alter table "sgd0815"."scalar" add constraint scalar_id_block_range_excl exclude using gist (id with =, block_range with &&); -create index brin_scalar - on "sgd0815"."scalar" - using brin(lower(block_range) int4_minmax_ops, coalesce(upper(block_range), 2147483647) int4_minmax_ops, vid int8_minmax_ops); -create index scalar_block_range_closed - on "sgd0815"."scalar"(coalesce(upper(block_range), 2147483647)) - where coalesce(upper(block_range), 2147483647) < 2147483647; -create index attr_1_0_scalar_id - on "sgd0815"."scalar" using btree("id"); -create index attr_1_1_scalar_bool - on "sgd0815"."scalar" using btree("bool"); -create index attr_1_2_scalar_int - on "sgd0815"."scalar" using btree("int"); -create index attr_1_3_scalar_big_decimal - on "sgd0815"."scalar" using btree("big_decimal"); -create index attr_1_4_scalar_string - on "sgd0815"."scalar" using btree(left("string", 256)); -create index attr_1_5_scalar_bytes - on "sgd0815"."scalar" using btree(substring("bytes", 1, 64)); -create index attr_1_6_scalar_big_int - on "sgd0815"."scalar" using btree("big_int"); -create index attr_1_7_scalar_color - on "sgd0815"."scalar" using btree("color"); - +create index brin_scalar on "sgd0815"."scalar" using brin (lower(block_range), coalesce(upper(block_range), 2147483647), vid); +create index scalar_block_range_closed on "sgd0815"."scalar" using btree (coalesce(upper(block_range), 2147483647)) where (coalesce(upper(block_range), 2147483647) < 2147483647); +create index attr_1_0_scalar_id on "sgd0815"."scalar" using btree ("id"); create table "sgd0815"."file_thing" ( vid bigint primary key, @@ -537,85 +435,9 @@ create index attr_1_7_scalar_color alter table "sgd0815"."file_thing" add constraint file_thing_id_block_range_excl exclude using gist (id with =, block_range with &&); -create index brin_file_thing - on "sgd0815"."file_thing" - using brin(lower(block_range) int4_minmax_ops, coalesce(upper(block_range), 2147483647) int4_minmax_ops, vid int8_minmax_ops); -create index file_thing_block_range_closed - on "sgd0815"."file_thing"(coalesce(upper(block_range), 2147483647)) - where coalesce(upper(block_range), 2147483647) < 2147483647; -create index attr_2_0_file_thing_id - on "sgd0815"."file_thing" using btree("id"); - -"#; - -const THING_DDL_ON_COPY: &str = r#"create type sgd0815."color" - as enum ('BLUE', 'red', 'yellow'); -create type sgd0815."size" - as enum ('large', 'medium', 'small'); - - create table "sgd0815"."thing" ( - vid bigint primary key, - block_range int4range not null, - "id" text not null, - "big_thing" text not null - ); - - alter table "sgd0815"."thing" - add constraint thing_id_block_range_excl exclude using gist (id with =, block_range with &&); -create index brin_thing - on "sgd0815"."thing" - using brin(lower(block_range) int4_minmax_ops, coalesce(upper(block_range), 2147483647) int4_minmax_ops, vid int8_minmax_ops); -create index thing_block_range_closed - on "sgd0815"."thing"(coalesce(upper(block_range), 2147483647)) - where coalesce(upper(block_range), 2147483647) < 2147483647; -create index attr_0_0_thing_id - on sgd0815."thing" using btree ("id"); -create index attr_0_1_thing_big_thing - on sgd0815."thing" using gist ("big_thing", block_range); - - - create table "sgd0815"."scalar" ( - vid bigint primary key, - block_range int4range not null, - "id" text not null, - "bool" boolean, - "int" int4, - "big_decimal" numeric, - "string" text, - "bytes" bytea, - "big_int" numeric, - "color" "sgd0815"."color" - ); - - alter table "sgd0815"."scalar" - add constraint scalar_id_block_range_excl exclude using gist (id with =, block_range with &&); -create index brin_scalar - on "sgd0815"."scalar" - using brin(lower(block_range) int4_minmax_ops, coalesce(upper(block_range), 2147483647) int4_minmax_ops, vid int8_minmax_ops); -create index scalar_block_range_closed - on "sgd0815"."scalar"(coalesce(upper(block_range), 2147483647)) - where coalesce(upper(block_range), 2147483647) < 2147483647; -create index attr_1_0_scalar_id - on sgd0815."scalar" using btree ("id"); - - - create table "sgd0815"."file_thing" ( - vid bigint primary key, - block_range int4range not null, - causality_region int not null, - "id" text not null - ); - - alter table "sgd0815"."file_thing" - add constraint file_thing_id_block_range_excl exclude using gist (id with =, block_range with &&); -create index brin_file_thing - on "sgd0815"."file_thing" - using brin(lower(block_range) int4_minmax_ops, coalesce(upper(block_range), 2147483647) int4_minmax_ops, vid int8_minmax_ops); -create index file_thing_block_range_closed - on "sgd0815"."file_thing"(coalesce(upper(block_range), 2147483647)) - where coalesce(upper(block_range), 2147483647) < 2147483647; -create index attr_2_0_file_thing_id - on sgd0815."file_thing" using btree ("id"); +create index brin_file_thing on "sgd0815"."file_thing" using brin (lower(block_range), coalesce(upper(block_range), 2147483647), vid); +create index file_thing_block_range_closed on "sgd0815"."file_thing" using btree (coalesce(upper(block_range), 2147483647)) where (coalesce(upper(block_range), 2147483647) < 2147483647); +create index attr_2_0_file_thing_id on "sgd0815"."file_thing" using btree ("id"); "#; const BOOKS_GQL: &str = r#"type Author @entity { @@ -659,84 +481,59 @@ type SongStat @entity { song: Song @derivedFrom(field: "id") played: Int! }"#; -const MUSIC_DDL: &str = r#"create table "sgd0815"."musician" ( +const MUSIC_DDL: &str = r#" + create table "sgd0815"."musician" ( vid bigint primary key, block_range int4range not null, "id" text not null, "name" text not null, "main_band" text, "bands" text[] not null -); -alter table "sgd0815"."musician" - add constraint musician_id_block_range_excl exclude using gist (id with =, block_range with &&); -create index brin_musician - on "sgd0815"."musician" - using brin(lower(block_range) int4_minmax_ops, coalesce(upper(block_range), 2147483647) int4_minmax_ops, vid int8_minmax_ops); -create index musician_block_range_closed - on "sgd0815"."musician"(coalesce(upper(block_range), 2147483647)) - where coalesce(upper(block_range), 2147483647) < 2147483647; -create index attr_0_0_musician_id - on "sgd0815"."musician" using btree("id"); -create index attr_0_1_musician_name - on "sgd0815"."musician" using btree(left("name", 256)); -create index attr_0_2_musician_main_band - on "sgd0815"."musician" using gist("main_band", block_range); - -create table "sgd0815"."band" ( + ); + + alter table "sgd0815"."musician" + add constraint musician_id_block_range_excl exclude using gist (id with =, block_range with &&); +create index brin_musician on "sgd0815"."musician" using brin (lower(block_range), coalesce(upper(block_range), 2147483647), vid); +create index musician_block_range_closed on "sgd0815"."musician" using btree (coalesce(upper(block_range), 2147483647)) where (coalesce(upper(block_range), 2147483647) < 2147483647); +create index attr_0_0_musician_id on "sgd0815"."musician" using btree ("id"); +create index attr_0_2_musician_main_band on "sgd0815"."musician" using gist ("main_band", block_range); + + create table "sgd0815"."band" ( vid bigint primary key, block_range int4range not null, "id" text not null, "name" text not null, "original_songs" text[] not null -); -alter table "sgd0815"."band" - add constraint band_id_block_range_excl exclude using gist (id with =, block_range with &&); -create index brin_band - on "sgd0815"."band" - using brin(lower(block_range) int4_minmax_ops, coalesce(upper(block_range), 2147483647) int4_minmax_ops, vid int8_minmax_ops); -create index band_block_range_closed - on "sgd0815"."band"(coalesce(upper(block_range), 2147483647)) - where coalesce(upper(block_range), 2147483647) < 2147483647; -create index attr_1_0_band_id - on "sgd0815"."band" using btree("id"); -create index attr_1_1_band_name - on "sgd0815"."band" using btree(left("name", 256)); - -create table "sgd0815"."song" ( + ); + + alter table "sgd0815"."band" + add constraint band_id_block_range_excl exclude using gist (id with =, block_range with &&); +create index brin_band on "sgd0815"."band" using brin (lower(block_range), coalesce(upper(block_range), 2147483647), vid); +create index band_block_range_closed on "sgd0815"."band" using btree (coalesce(upper(block_range), 2147483647)) where (coalesce(upper(block_range), 2147483647) < 2147483647); +create index attr_1_0_band_id on "sgd0815"."band" using btree ("id"); + + create table "sgd0815"."song" ( vid bigint primary key, - block$ int not null, - "id" text not null, + block$ int not null, +"id" text not null, "title" text not null, "written_by" text not null, - unique(id) -); -create index song_block - on "sgd0815"."song"(block$); -create index attr_2_0_song_title - on "sgd0815"."song" using btree(left("title", 256)); -create index attr_2_1_song_written_by - on "sgd0815"."song" using btree("written_by", block$); - -create table "sgd0815"."song_stat" ( + ); +create index song_block on "sgd0815"."song" using btree (block$); + + create table "sgd0815"."song_stat" ( vid bigint primary key, block_range int4range not null, "id" text not null, "played" int4 not null -); -alter table "sgd0815"."song_stat" - add constraint song_stat_id_block_range_excl exclude using gist (id with =, block_range with &&); -create index brin_song_stat - on "sgd0815"."song_stat" - using brin(lower(block_range) int4_minmax_ops, coalesce(upper(block_range), 2147483647) int4_minmax_ops, vid int8_minmax_ops); -create index song_stat_block_range_closed - on "sgd0815"."song_stat"(coalesce(upper(block_range), 2147483647)) - where coalesce(upper(block_range), 2147483647) < 2147483647; -create index attr_3_0_song_stat_id - on "sgd0815"."song_stat" using btree("id"); -create index attr_3_1_song_stat_played - on "sgd0815"."song_stat" using btree("played"); + ); + alter table "sgd0815"."song_stat" + add constraint song_stat_id_block_range_excl exclude using gist (id with =, block_range with &&); +create index brin_song_stat on "sgd0815"."song_stat" using brin (lower(block_range), coalesce(upper(block_range), 2147483647), vid); +create index song_stat_block_range_closed on "sgd0815"."song_stat" using btree (coalesce(upper(block_range), 2147483647)) where (coalesce(upper(block_range), 2147483647) < 2147483647); +create index attr_3_0_song_stat_id on "sgd0815"."song_stat" using btree ("id"); "#; const FOREST_GQL: &str = r#" @@ -760,61 +557,47 @@ type Habitat @entity { dwellers: [ForestDweller!]! }"#; -const FOREST_DDL: &str = r#"create table "sgd0815"."animal" ( +const FOREST_DDL: &str = r#" + create table "sgd0815"."animal" ( vid bigint primary key, block_range int4range not null, "id" text not null, "forest" text -); -alter table "sgd0815"."animal" - add constraint animal_id_block_range_excl exclude using gist (id with =, block_range with &&); -create index brin_animal - on "sgd0815"."animal" - using brin(lower(block_range) int4_minmax_ops, coalesce(upper(block_range), 2147483647) int4_minmax_ops, vid int8_minmax_ops); -create index animal_block_range_closed - on "sgd0815"."animal"(coalesce(upper(block_range), 2147483647)) - where coalesce(upper(block_range), 2147483647) < 2147483647; -create index attr_0_0_animal_id - on "sgd0815"."animal" using btree("id"); -create index attr_0_1_animal_forest - on "sgd0815"."animal" using gist("forest", block_range); - -create table "sgd0815"."forest" ( - vid bigint primary key, - block_range int4range not null, - "id" text not null -); -alter table "sgd0815"."forest" - add constraint forest_id_block_range_excl exclude using gist (id with =, block_range with &&); -create index brin_forest - on "sgd0815"."forest" - using brin(lower(block_range) int4_minmax_ops, coalesce(upper(block_range), 2147483647) int4_minmax_ops, vid int8_minmax_ops); -create index forest_block_range_closed - on "sgd0815"."forest"(coalesce(upper(block_range), 2147483647)) - where coalesce(upper(block_range), 2147483647) < 2147483647; -create index attr_1_0_forest_id - on "sgd0815"."forest" using btree("id"); - -create table "sgd0815"."habitat" ( + ); + + alter table "sgd0815"."animal" + add constraint animal_id_block_range_excl exclude using gist (id with =, block_range with &&); +create index brin_animal on "sgd0815"."animal" using brin (lower(block_range), coalesce(upper(block_range), 2147483647), vid); +create index animal_block_range_closed on "sgd0815"."animal" using btree (coalesce(upper(block_range), 2147483647)) where (coalesce(upper(block_range), 2147483647) < 2147483647); +create index attr_0_0_animal_id on "sgd0815"."animal" using btree ("id"); +create index attr_0_1_animal_forest on "sgd0815"."animal" using gist ("forest", block_range); + + create table "sgd0815"."forest" ( + vid bigint primary key, + block_range int4range not null, + "id" text not null + ); + + alter table "sgd0815"."forest" + add constraint forest_id_block_range_excl exclude using gist (id with =, block_range with &&); +create index brin_forest on "sgd0815"."forest" using brin (lower(block_range), coalesce(upper(block_range), 2147483647), vid); +create index forest_block_range_closed on "sgd0815"."forest" using btree (coalesce(upper(block_range), 2147483647)) where (coalesce(upper(block_range), 2147483647) < 2147483647); +create index attr_1_0_forest_id on "sgd0815"."forest" using btree ("id"); + + create table "sgd0815"."habitat" ( vid bigint primary key, block_range int4range not null, "id" text not null, "most_common" text not null, "dwellers" text[] not null -); -alter table "sgd0815"."habitat" - add constraint habitat_id_block_range_excl exclude using gist (id with =, block_range with &&); -create index brin_habitat - on "sgd0815"."habitat" - using brin(lower(block_range) int4_minmax_ops, coalesce(upper(block_range), 2147483647) int4_minmax_ops, vid int8_minmax_ops); -create index habitat_block_range_closed - on "sgd0815"."habitat"(coalesce(upper(block_range), 2147483647)) - where coalesce(upper(block_range), 2147483647) < 2147483647; -create index attr_2_0_habitat_id - on "sgd0815"."habitat" using btree("id"); -create index attr_2_1_habitat_most_common - on "sgd0815"."habitat" using gist("most_common", block_range); + ); + alter table "sgd0815"."habitat" + add constraint habitat_id_block_range_excl exclude using gist (id with =, block_range with &&); +create index brin_habitat on "sgd0815"."habitat" using brin (lower(block_range), coalesce(upper(block_range), 2147483647), vid); +create index habitat_block_range_closed on "sgd0815"."habitat" using btree (coalesce(upper(block_range), 2147483647)) where (coalesce(upper(block_range), 2147483647) < 2147483647); +create index attr_2_0_habitat_id on "sgd0815"."habitat" using btree ("id"); +create index attr_2_1_habitat_most_common on "sgd0815"."habitat" using gist ("most_common", block_range); "#; const FULLTEXT_GQL: &str = r#" type _Schema_ @fulltext( @@ -847,7 +630,8 @@ type Habitat @entity { dwellers: [Animal!]! }"#; -const FULLTEXT_DDL: &str = r#"create table "sgd0815"."animal" ( +const FULLTEXT_DDL: &str = r#" + create table "sgd0815"."animal" ( vid bigint primary key, block_range int4range not null, "id" text not null, @@ -855,63 +639,42 @@ const FULLTEXT_DDL: &str = r#"create table "sgd0815"."animal" ( "species" text not null, "forest" text, "search" tsvector -); -alter table "sgd0815"."animal" - add constraint animal_id_block_range_excl exclude using gist (id with =, block_range with &&); -create index brin_animal - on "sgd0815"."animal" - using brin(lower(block_range) int4_minmax_ops, coalesce(upper(block_range), 2147483647) int4_minmax_ops, vid int8_minmax_ops); -create index animal_block_range_closed - on "sgd0815"."animal"(coalesce(upper(block_range), 2147483647)) - where coalesce(upper(block_range), 2147483647) < 2147483647; -create index attr_0_0_animal_id - on "sgd0815"."animal" using btree("id"); -create index attr_0_1_animal_name - on "sgd0815"."animal" using btree(left("name", 256)); -create index attr_0_2_animal_species - on "sgd0815"."animal" using btree(left("species", 256)); -create index attr_0_3_animal_forest - on "sgd0815"."animal" using gist("forest", block_range); -create index attr_0_4_animal_search - on "sgd0815"."animal" using gin("search"); - -create table "sgd0815"."forest" ( + ); + + alter table "sgd0815"."animal" + add constraint animal_id_block_range_excl exclude using gist (id with =, block_range with &&); +create index brin_animal on "sgd0815"."animal" using brin (lower(block_range), coalesce(upper(block_range), 2147483647), vid); +create index animal_block_range_closed on "sgd0815"."animal" using btree (coalesce(upper(block_range), 2147483647)) where (coalesce(upper(block_range), 2147483647) < 2147483647); +create index attr_0_0_animal_id on "sgd0815"."animal" using btree ("id"); +create index attr_0_3_animal_forest on "sgd0815"."animal" using gist ("forest", block_range); +create index attr_0_4_animal_search on "sgd0815"."animal" using gin ("search"); + + create table "sgd0815"."forest" ( vid bigint primary key, block_range int4range not null, "id" text not null -); -alter table "sgd0815"."forest" - add constraint forest_id_block_range_excl exclude using gist (id with =, block_range with &&); - -create index brin_forest - on "sgd0815"."forest" - using brin(lower(block_range) int4_minmax_ops, coalesce(upper(block_range), 2147483647) int4_minmax_ops, vid int8_minmax_ops); -create index forest_block_range_closed - on "sgd0815"."forest"(coalesce(upper(block_range), 2147483647)) - where coalesce(upper(block_range), 2147483647) < 2147483647; -create index attr_1_0_forest_id - on "sgd0815"."forest" using btree("id"); - -create table "sgd0815"."habitat" ( + ); + + alter table "sgd0815"."forest" + add constraint forest_id_block_range_excl exclude using gist (id with =, block_range with &&); +create index brin_forest on "sgd0815"."forest" using brin (lower(block_range), coalesce(upper(block_range), 2147483647), vid); +create index forest_block_range_closed on "sgd0815"."forest" using btree (coalesce(upper(block_range), 2147483647)) where (coalesce(upper(block_range), 2147483647) < 2147483647); +create index attr_1_0_forest_id on "sgd0815"."forest" using btree ("id"); + + create table "sgd0815"."habitat" ( vid bigint primary key, block_range int4range not null, "id" text not null, "most_common" text not null, "dwellers" text[] not null -); -alter table "sgd0815"."habitat" - add constraint habitat_id_block_range_excl exclude using gist (id with =, block_range with &&); -create index brin_habitat - on "sgd0815"."habitat" - using brin(lower(block_range) int4_minmax_ops, coalesce(upper(block_range), 2147483647) int4_minmax_ops, vid int8_minmax_ops); -create index habitat_block_range_closed - on "sgd0815"."habitat"(coalesce(upper(block_range), 2147483647)) - where coalesce(upper(block_range), 2147483647) < 2147483647; -create index attr_2_0_habitat_id - on "sgd0815"."habitat" using btree("id"); -create index attr_2_1_habitat_most_common - on "sgd0815"."habitat" using gist("most_common", block_range); + ); + alter table "sgd0815"."habitat" + add constraint habitat_id_block_range_excl exclude using gist (id with =, block_range with &&); +create index brin_habitat on "sgd0815"."habitat" using brin (lower(block_range), coalesce(upper(block_range), 2147483647), vid); +create index habitat_block_range_closed on "sgd0815"."habitat" using btree (coalesce(upper(block_range), 2147483647)) where (coalesce(upper(block_range), 2147483647) < 2147483647); +create index attr_2_0_habitat_id on "sgd0815"."habitat" using btree ("id"); +create index attr_2_1_habitat_most_common on "sgd0815"."habitat" using gist ("most_common", block_range); "#; const FORWARD_ENUM_GQL: &str = r#" @@ -927,25 +690,19 @@ enum Orientation { const FORWARD_ENUM_SQL: &str = r#"create type sgd0815."orientation" as enum ('DOWN', 'UP'); -create table "sgd0815"."thing" ( + + create table "sgd0815"."thing" ( vid bigint primary key, block_range int4range not null, "id" text not null, "orientation" "sgd0815"."orientation" not null -); -alter table "sgd0815"."thing" - add constraint thing_id_block_range_excl exclude using gist (id with =, block_range with &&); -create index brin_thing - on "sgd0815"."thing" - using brin(lower(block_range) int4_minmax_ops, coalesce(upper(block_range), 2147483647) int4_minmax_ops, vid int8_minmax_ops); -create index thing_block_range_closed - on "sgd0815"."thing"(coalesce(upper(block_range), 2147483647)) - where coalesce(upper(block_range), 2147483647) < 2147483647; -create index attr_0_0_thing_id - on "sgd0815"."thing" using btree("id"); -create index attr_0_1_thing_orientation - on "sgd0815"."thing" using btree("orientation"); + ); + alter table "sgd0815"."thing" + add constraint thing_id_block_range_excl exclude using gist (id with =, block_range with &&); +create index brin_thing on "sgd0815"."thing" using brin (lower(block_range), coalesce(upper(block_range), 2147483647), vid); +create index thing_block_range_closed on "sgd0815"."thing" using btree (coalesce(upper(block_range), 2147483647)) where (coalesce(upper(block_range), 2147483647) < 2147483647); +create index attr_0_0_thing_id on "sgd0815"."thing" using btree ("id"); "#; const TS_GQL: &str = r#" @@ -964,56 +721,38 @@ type Stats @aggregation(intervals: ["hour", "day"], source: "Data") { "#; const TS_SQL: &str = r#" -create table "sgd0815"."data" ( - vid bigint primary key, - block$ int not null, - "id" int8 not null, - "timestamp" timestamptz not null, - "amount" numeric not null, - unique(id) -); -create index data_block - on "sgd0815"."data"(block$); -create index attr_0_0_data_timestamp - on "sgd0815"."data" using btree("timestamp"); -create index attr_0_1_data_amount - on "sgd0815"."data" using btree("amount"); - -create table "sgd0815"."stats_hour" ( - vid bigserial primary key, - block$ int not null, - "id" int8 not null, - "timestamp" timestamptz not null, - "volume" numeric not null, - "max_price" numeric not null, - unique(id) -); -create index stats_hour_block - on "sgd0815"."stats_hour"(block$); -create index attr_1_0_stats_hour_timestamp - on "sgd0815"."stats_hour" using btree("timestamp"); -create index attr_1_1_stats_hour_volume - on "sgd0815"."stats_hour" using btree("volume"); -create index attr_1_2_stats_hour_max_price - on "sgd0815"."stats_hour" using btree("max_price"); - -create table "sgd0815"."stats_day" ( - vid bigserial primary key, - block$ int not null, - "id" int8 not null, - "timestamp" timestamptz not null, - "volume" numeric not null, - "max_price" numeric not null, - unique(id) -); -create index stats_day_block - on "sgd0815"."stats_day"(block$); -create index attr_2_0_stats_day_timestamp - on "sgd0815"."stats_day" using btree("timestamp"); -create index attr_2_1_stats_day_volume - on "sgd0815"."stats_day" using btree("volume"); -create index attr_2_2_stats_day_max_price - on "sgd0815"."stats_day" using btree("max_price");"#; + create table "sgd0815"."data" ( + vid bigint primary key, + block$ int not null, +"id" int8 not null, + "timestamp" timestamptz not null, + "amount" numeric not null, + unique(id) + ); +create index data_block on "sgd0815"."data" using btree (block$); + + create table "sgd0815"."stats_hour" ( + vid bigserial primary key, + block$ int not null, +"id" int8 not null, + "timestamp" timestamptz not null, + "volume" numeric not null, + "max_price" numeric not null, + unique(id) + ); +create index stats_hour_block on "sgd0815"."stats_hour" using btree (block$); + + create table "sgd0815"."stats_day" ( + vid bigserial primary key, + block$ int not null, +"id" int8 not null, + "timestamp" timestamptz not null, + "volume" numeric not null, + "max_price" numeric not null, + unique(id) + ); +create index stats_day_block on "sgd0815"."stats_day" using btree (block$); +"#; const LIFETIME_GQL: &str = r#" type Data @entity(timeseries: true) { @@ -1055,144 +794,87 @@ const LIFETIME_GQL: &str = r#" "#; const LIFETIME_SQL: &str = r#" -create table "sgd0815"."data" ( - vid bigint primary key, - block$ int not null, - "id" int8 not null, - "timestamp" timestamptz not null, - "group_1" int4 not null, - "group_2" int4 not null, - "amount" numeric not null, - unique(id) -); -create index data_block -on "sgd0815"."data"(block$); -create index attr_0_0_data_timestamp -on "sgd0815"."data" using btree("timestamp"); -create index attr_0_1_data_group_1 -on "sgd0815"."data" using btree("group_1"); -create index attr_0_2_data_group_2 -on "sgd0815"."data" using btree("group_2"); -create index attr_0_3_data_amount -on "sgd0815"."data" using btree("amount"); - -create table "sgd0815"."stats_1_hour" ( - vid bigserial primary key, - block$ int not null, - "id" int8 not null, - "timestamp" timestamptz not null, - "volume" numeric not null, - unique(id) -); -create index stats_1_hour_block -on "sgd0815"."stats_1_hour"(block$); -create index attr_1_0_stats_1_hour_timestamp -on "sgd0815"."stats_1_hour" using btree("timestamp"); -create index attr_1_1_stats_1_hour_volume -on "sgd0815"."stats_1_hour" using btree("volume"); - - -create table "sgd0815"."stats_1_day" ( - vid bigserial primary key, - block$ int not null, - "id" int8 not null, - "timestamp" timestamptz not null, - "volume" numeric not null, - unique(id) -); -create index stats_1_day_block -on "sgd0815"."stats_1_day"(block$); -create index attr_2_0_stats_1_day_timestamp -on "sgd0815"."stats_1_day" using btree("timestamp"); -create index attr_2_1_stats_1_day_volume -on "sgd0815"."stats_1_day" using btree("volume"); - - -create table "sgd0815"."stats_2_hour" ( - vid bigserial primary key, - block$ int not null, - "id" int8 not null, - "timestamp" timestamptz not null, - "group_1" int4 not null, - "volume" numeric not null, - unique(id) -); -create index stats_2_hour_block -on "sgd0815"."stats_2_hour"(block$); -create index attr_5_0_stats_2_hour_timestamp -on "sgd0815"."stats_2_hour" using btree("timestamp"); -create index attr_5_1_stats_2_hour_group_1 -on "sgd0815"."stats_2_hour" using btree("group_1"); -create index attr_5_2_stats_2_hour_volume -on "sgd0815"."stats_2_hour" using btree("volume"); -create index stats_2_hour_dims -on "sgd0815"."stats_2_hour"(group_1, timestamp); - -create table "sgd0815"."stats_2_day" ( - vid bigserial primary key, - block$ int not null, - "id" int8 not null, - "timestamp" timestamptz not null, - "group_1" int4 not null, - "volume" numeric not null, - unique(id) -); -create index stats_2_day_block -on "sgd0815"."stats_2_day"(block$); -create index attr_6_0_stats_2_day_timestamp -on "sgd0815"."stats_2_day" using btree("timestamp"); -create index attr_6_1_stats_2_day_group_1 -on "sgd0815"."stats_2_day" using btree("group_1"); -create index attr_6_2_stats_2_day_volume -on "sgd0815"."stats_2_day" using btree("volume"); -create index stats_2_day_dims -on "sgd0815"."stats_2_day"(group_1, timestamp); - -create table "sgd0815"."stats_3_hour" ( - vid bigserial primary key, - block$ int not null, - "id" int8 not null, - "timestamp" timestamptz not null, - "group_2" int4 not null, - "group_1" int4 not null, - "volume" numeric not null, - unique(id) -); -create index stats_3_hour_block -on "sgd0815"."stats_3_hour"(block$); -create index attr_7_0_stats_3_hour_timestamp -on "sgd0815"."stats_3_hour" using btree("timestamp"); -create index attr_7_1_stats_3_hour_group_2 -on "sgd0815"."stats_3_hour" using btree("group_2"); -create index attr_7_2_stats_3_hour_group_1 -on "sgd0815"."stats_3_hour" using btree("group_1"); -create index attr_7_3_stats_3_hour_volume -on "sgd0815"."stats_3_hour" using btree("volume"); -create index stats_3_hour_dims -on "sgd0815"."stats_3_hour"(group_2, group_1, timestamp); - -create table "sgd0815"."stats_3_day" ( - vid bigserial primary key, - block$ int not null, - "id" int8 not null, - "timestamp" timestamptz not null, - "group_2" int4 not null, - "group_1" int4 not null, - "volume" numeric not null, - unique(id) -); -create index stats_3_day_block -on "sgd0815"."stats_3_day"(block$); -create index attr_8_0_stats_3_day_timestamp -on "sgd0815"."stats_3_day" using btree("timestamp"); -create index attr_8_1_stats_3_day_group_2 -on "sgd0815"."stats_3_day" using btree("group_2"); -create index attr_8_2_stats_3_day_group_1 -on "sgd0815"."stats_3_day" using btree("group_1"); -create index attr_8_3_stats_3_day_volume -on "sgd0815"."stats_3_day" using btree("volume"); -create index stats_3_day_dims -on "sgd0815"."stats_3_day"(group_2, group_1, timestamp); + create table "sgd0815"."data" ( + vid bigint primary key, + block$ int not null, +"id" int8 not null, + "timestamp" timestamptz not null, + "group_1" int4 not null, + "group_2" int4 not null, + "amount" numeric not null, + unique(id) + ); +create index data_block on "sgd0815"."data" using btree (block$); + + create table "sgd0815"."stats_1_hour" ( + vid bigserial primary key, + block$ int not null, +"id" int8 not null, + "timestamp" timestamptz not null, + "volume" numeric not null, + unique(id) + ); +create index stats_1_hour_block on "sgd0815"."stats_1_hour" using btree (block$); + + create table "sgd0815"."stats_1_day" ( + vid bigserial primary key, + block$ int not null, +"id" int8 not null, + "timestamp" timestamptz not null, + "volume" numeric not null, + unique(id) + ); +create index stats_1_day_block on "sgd0815"."stats_1_day" using btree (block$); + + create table "sgd0815"."stats_2_hour" ( + vid bigserial primary key, + block$ int not null, +"id" int8 not null, + "timestamp" timestamptz not null, + "group_1" int4 not null, + "volume" numeric not null, + unique(id) + ); +create index stats_2_hour_block on "sgd0815"."stats_2_hour" using btree (block$); +create index stats_2_hour_dims on "sgd0815"."stats_2_hour" using btree ("group_1", "timestamp"); + + create table "sgd0815"."stats_2_day" ( + vid bigserial primary key, + block$ int not null, +"id" int8 not null, + "timestamp" timestamptz not null, + "group_1" int4 not null, + "volume" numeric not null, + unique(id) + ); +create index stats_2_day_block on "sgd0815"."stats_2_day" using btree (block$); +create index stats_2_day_dims on "sgd0815"."stats_2_day" using btree ("group_1", "timestamp"); + + create table "sgd0815"."stats_3_hour" ( + vid bigserial primary key, + block$ int not null, +"id" int8 not null, + "timestamp" timestamptz not null, + "group_2" int4 not null, + "group_1" int4 not null, + "volume" numeric not null, + unique(id) + ); +create index stats_3_hour_block on "sgd0815"."stats_3_hour" using btree (block$); +create index stats_3_hour_dims on "sgd0815"."stats_3_hour" using btree ("group_2", "group_1", "timestamp"); + + create table "sgd0815"."stats_3_day" ( + vid bigserial primary key, + block$ int not null, +"id" int8 not null, + "timestamp" timestamptz not null, + "group_2" int4 not null, + "group_1" int4 not null, + "volume" numeric not null, + unique(id) + ); +create index stats_3_day_block on "sgd0815"."stats_3_day" using btree (block$); +create index stats_3_day_dims on "sgd0815"."stats_3_day" using btree ("group_2", "group_1", "timestamp"); "#; const BLOCK_GQL: &str = r#" diff --git a/store/postgres/src/relational/index.rs b/store/postgres/src/relational/index.rs index 6f67cf830b9..3840b4db7ed 100644 --- a/store/postgres/src/relational/index.rs +++ b/store/postgres/src/relational/index.rs @@ -11,7 +11,7 @@ use graph::itertools::Itertools; use graph::prelude::{ lazy_static, regex::{Captures, Regex}, - BlockNumber, + BlockNumber, ENV_VARS, }; use crate::block_range::{BLOCK_COLUMN, BLOCK_RANGE_COLUMN}; @@ -640,6 +640,10 @@ impl CreateIndex { } pub fn to_postpone(&self) -> bool { + if !ENV_VARS.postpone_attribute_index_creation { + return false; + } + fn has_prefix(s: &str, prefix: &str) -> bool { s.starts_with(prefix) || s.ends_with("\"") && s.starts_with(format!("\"{}", prefix).as_str()) @@ -670,6 +674,19 @@ impl CreateIndex { } } + /// Return `true` if any of the attribute columns (user-defined columns, + /// not infrastructure columns like `vid`, `block$`, etc.) referenced by + /// this index are NOT in `columns`. + pub fn references_column_not_in(&self, columns: &std::collections::HashSet<&str>) -> bool { + match self { + CreateIndex::Unknown { .. } => false, + CreateIndex::Parsed { columns: exprs, .. } => exprs.iter().any(|expr| match expr { + Expr::Column(name) | Expr::Prefix(name, _) => !columns.contains(name.as_str()), + _ => false, + }), + } + } + pub fn fields_exist_in_dest(&self, dest_table: &Table) -> bool { fn column_exists(dest_table: &Table, column_name: &str) -> bool { dest_table diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index 4f292b7c6f9..90e3ea0a429 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, fmt::Write, sync::Arc}; +use std::{fmt::Write, sync::Arc}; use diesel::{ sql_query, @@ -24,10 +24,7 @@ use crate::{ AsyncPgConnection, }; -use super::{ - index::{CreateIndex, IndexList}, - Layout, Namespace, -}; +use super::{Layout, Namespace}; pub use status::{Phase, PruneState, PruneTableState, Viewer}; @@ -53,7 +50,6 @@ impl TablePair { src_layout: &Layout, src: Arc
, dst_nsp: Namespace, - src_indexes: &IndexList, ) -> Result { let src_nsp = src_layout.site.namespace.clone(); let dst = src.new_like(&dst_nsp, &src.name); @@ -62,25 +58,10 @@ impl TablePair { if catalog::table_exists(conn, dst_nsp.as_str(), &dst.name).await? { writeln!(query, "truncate table {};", dst.qualified_name)?; } else { - let mut list = IndexList { - indexes: HashMap::new(), - }; - let indexes = src_indexes - .indexes_for_table(&src) - .map(|index| index.with_nsp(dst_nsp.to_string())) - .collect::, _>>()?; - list.indexes.insert(src.name.to_string(), indexes); - // In case of pruning we don't do delayed creation of indexes, // as the asumption is that there is not that much data inserted. let creat = src_layout.index_creator(false, false); - dst.as_ddl( - &src_layout.input_schema, - &src_layout.catalog, - Some(&list), - &creat, - &mut query, - )?; + dst.as_ddl(&src_layout.input_schema, &creat, &mut query)?; } conn.batch_execute(&query).await?; @@ -424,8 +405,6 @@ impl Layout { let dst_nsp = Namespace::prune(self.site.id); let mut recreate_dst_nsp = true; - let index_list = IndexList::load(conn, &self).await?; - // Go table by table; note that the subgraph writer can write in // between the execution of the `with_lock` block below, and might // therefore work with tables where some are pruned and some are not @@ -445,14 +424,8 @@ impl Layout { catalog::recreate_schema(conn, dst_nsp.as_str()).await?; recreate_dst_nsp = false; } - let pair = TablePair::create( - conn, - &self, - table.cheap_clone(), - dst_nsp.clone(), - &index_list, - ) - .await?; + let pair = TablePair::create(conn, &self, table.cheap_clone(), dst_nsp.clone()) + .await?; // Copy final entities. This can happen in parallel to indexing as // that part of the table will not change pair.copy_final_entities( diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 0d2e5009828..0317d1ae176 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -413,30 +413,8 @@ impl SubgraphStore { }; // Create the actual databases schema and metadata entries - let index_def = if let Some(graft) = graft_base { - if let Some(site) = self.sites.get(graft) { - let store = self - .stores - .get(&site.shard) - .ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?; - - Some(store.load_indexes(site).await?) - } else { - None - } - } else { - None - }; - deployment_store - .create_deployment( - schema, - deployment, - site.clone(), - replace, - OnSync::None, - index_def, - ) + .create_deployment(schema, deployment, site.clone(), replace, OnSync::None) .await?; // FIXME: This simultaneously holds a `primary_conn` and a shard connection, which can @@ -769,7 +747,6 @@ impl Inner { ))); } let deployment = src_store.load_deployment(src.clone()).await?; - let index_def = src_store.load_indexes(src.clone()).await?; // Transmogrify the deployment into a new one let deployment = DeploymentCreate { @@ -799,7 +776,6 @@ impl Inner { dst.clone(), false, on_sync, - Some(index_def), ) .await?; diff --git a/store/test-store/tests/postgres/relational.rs b/store/test-store/tests/postgres/relational.rs index 4a029709408..20f5c54d063 100644 --- a/store/test-store/tests/postgres/relational.rs +++ b/store/test-store/tests/postgres/relational.rs @@ -508,7 +508,7 @@ async fn create_schema(conn: &mut AsyncPgConnection) -> Layout { let query = format!("create schema {}", NAMESPACE.as_str()); conn.batch_execute(&query).await.unwrap(); - Layout::create_relational_schema(conn, Arc::new(site), &schema, BTreeSet::new(), None) + Layout::create_relational_schema(conn, Arc::new(site), &schema, BTreeSet::new()) .await .expect("Failed to create relational schema") } diff --git a/store/test-store/tests/postgres/relational_bytes.rs b/store/test-store/tests/postgres/relational_bytes.rs index 470d4e17412..9da764e449f 100644 --- a/store/test-store/tests/postgres/relational_bytes.rs +++ b/store/test-store/tests/postgres/relational_bytes.rs @@ -171,7 +171,7 @@ async fn create_schema(conn: &mut AsyncPgConnection) -> Layout { NAMESPACE.clone(), NETWORK_NAME.to_string(), ); - Layout::create_relational_schema(conn, Arc::new(site), &schema, BTreeSet::new(), None) + Layout::create_relational_schema(conn, Arc::new(site), &schema, BTreeSet::new()) .await .expect("Failed to create relational schema") } From 33094d487ddfce439d15d8435a8c70b7a824fb07 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 17 Feb 2026 12:50:01 -0800 Subject: [PATCH 11/12] core, store: Create postponed indexes when approaching chain head Add a trigger that creates postponed indexes when a subgraph gets within a configurable number of blocks (default 10000) of the chain head. This ensures indexes are in place before the subgraph starts serving queries. The new env var GRAPH_POSTPONE_INDEXES_CREATION_THRESHOLD controls how many blocks before the chain head to trigger index creation. The creation is idempotent (IF NOT EXISTS + CONCURRENTLY) and only attempted once per subgraph run via an AtomicBool guard. --- core/src/subgraph/runner/mod.rs | 16 ++++++++++++++ core/src/subgraph/state.rs | 3 +++ graph/src/components/store/traits.rs | 6 +++++ graph/src/env/mod.rs | 11 ++++++++++ store/postgres/src/deployment_store.rs | 23 +++++++++++++++++++- store/postgres/src/writable.rs | 15 +++++++++++++ store/test-store/tests/graph/entity_cache.rs | 4 ++++ 7 files changed, 77 insertions(+), 1 deletion(-) diff --git a/core/src/subgraph/runner/mod.rs b/core/src/subgraph/runner/mod.rs index 88238a7fadb..059588e4da0 100644 --- a/core/src/subgraph/runner/mod.rs +++ b/core/src/subgraph/runner/mod.rs @@ -107,6 +107,7 @@ where ), entity_lfu_cache: LfuCache::new(), cached_head_ptr: None, + postponed_indexes_created: false, }, logger, metrics, @@ -731,6 +732,21 @@ where let is_caught_up = self.is_caught_up(&block_ptr).await.non_deterministic()?; + if !self.state.postponed_indexes_created + && close_to_chain_head( + &block_ptr, + &self.state.cached_head_ptr, + ENV_VARS.postpone_indexes_creation_threshold, + ) + { + self.state.postponed_indexes_created = true; + self.inputs + .store + .create_postponed_indexes() + .await + .non_deterministic()?; + } + self.inputs .store .transact_block_operations( diff --git a/core/src/subgraph/state.rs b/core/src/subgraph/state.rs index 0ce6ab48b15..c1da4b43199 100644 --- a/core/src/subgraph/state.rs +++ b/core/src/subgraph/state.rs @@ -16,4 +16,7 @@ pub struct IndexingState { pub skip_ptr_updates_timer: Instant, pub entity_lfu_cache: EntityLfuCache, pub cached_head_ptr: Option, + /// Set to `true` once postponed indexes have been created. This + /// ensures we only trigger index creation once per subgraph run. + pub postponed_indexes_created: bool, } diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 32fc1fbb81f..526fba5f92e 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -447,6 +447,12 @@ pub trait WritableStore: ReadStore + DeploymentCursorTracker { async fn health(&self) -> Result; + /// Create indexes whose creation was postponed at deployment time. + /// This should be called when a subgraph gets close to the chain + /// head. Calling it when all postponed indexes already exist is safe + /// and a no-op. + async fn create_postponed_indexes(&self) -> Result<(), StoreError>; + /// Wait for the background writer to finish processing its queue async fn flush(&self) -> Result<(), StoreError>; diff --git a/graph/src/env/mod.rs b/graph/src/env/mod.rs index 536a383e0da..3f295931632 100644 --- a/graph/src/env/mod.rs +++ b/graph/src/env/mod.rs @@ -172,6 +172,14 @@ pub struct EnvVars { /// /// Set the flag `GRAPH_POSTPONE_ATTRIBUTE_INDEX_CREATION`. Off by default. pub postpone_attribute_index_creation: bool, + /// When a subgraph gets within this many blocks of the chain head, + /// create any indexes whose creation was postponed. Only has an effect + /// when `postpone_attribute_index_creation` is set. + /// + /// Set by the environment variable + /// `GRAPH_POSTPONE_INDEXES_CREATION_THRESHOLD`. The default value is + /// 10000. + pub postpone_indexes_creation_threshold: BlockNumber, /// Verbose logging of mapping inputs. /// /// Set by the flag `GRAPH_LOG_TRIGGER_DATA`. Off by @@ -345,6 +353,7 @@ impl EnvVars { enable_select_by_specific_attributes: inner.enable_select_by_specific_attributes.0, postpone_attribute_index_creation: inner.postpone_attribute_index_creation.0 || cfg!(debug_assertions), + postpone_indexes_creation_threshold: inner.postpone_indexes_creation_threshold, log_trigger_data: inner.log_trigger_data.0, explorer_ttl: Duration::from_secs(inner.explorer_ttl_in_secs), explorer_lock_threshold: Duration::from_millis(inner.explorer_lock_threshold_in_msec), @@ -549,6 +558,8 @@ struct Inner { enable_select_by_specific_attributes: EnvVarBoolean, #[envconfig(from = "GRAPH_POSTPONE_ATTRIBUTE_INDEX_CREATION", default = "false")] postpone_attribute_index_creation: EnvVarBoolean, + #[envconfig(from = "GRAPH_POSTPONE_INDEXES_CREATION_THRESHOLD", default = "10000")] + postpone_indexes_creation_threshold: i32, #[envconfig(from = "GRAPH_LOG_TRIGGER_DATA", default = "false")] log_trigger_data: EnvVarBoolean, #[envconfig(from = "GRAPH_EXPLORER_TTL", default = "10")] diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 48ef854e3e6..8cfcd02a44a 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -50,7 +50,7 @@ use crate::deployment::{self, OnSync}; use crate::detail::ErrorDetail; use crate::dynds::DataSourcesTable; use crate::primary::{DeploymentId, Primary}; -use crate::relational::index::{CreateIndex, IndexList, Method}; +use crate::relational::index::{CreateIndex, IndexCreator, IndexList, Method}; use crate::relational::{self, Layout, LayoutCache, SqlName, Table, STATEMENT_TIMEOUT}; use crate::relational_queries::{FromEntityData, JSONData}; use crate::{advisory_lock, catalog, retry, AsyncPgConnection}; @@ -1647,6 +1647,27 @@ impl DeploymentStore { Ok(()) } + /// Create all indexes whose creation was postponed when the + /// deployment was first created. Using `IF NOT EXISTS` and + /// `CONCURRENTLY` makes this safe to call even when some or all + /// indexes already exist. + pub(crate) async fn create_postponed_indexes(&self, site: Arc) -> Result<(), StoreError> { + let layout = self.find_layout(site).await?; + let creat = layout.index_creator(true, true); + let mut conn = self.pool.get_permitted().await?; + for table in layout.tables.values() { + let indexes = table.indexes(&layout.input_schema).map_err(|e| { + StoreError::ConstraintViolation(format!("failed to generate indexes: {}", e)) + })?; + for idx in indexes { + if idx.to_postpone() { + IndexCreator::execute(&creat, &mut conn, &idx).await?; + } + } + } + Ok(()) + } + // If the current block of the deployment is the same as the fatal error, // we revert all block operations to it's parent/previous block. // diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 958ba2def76..c92df777ae5 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -503,6 +503,12 @@ impl SyncStore { .await } + async fn create_postponed_indexes(&self) -> Result<(), StoreError> { + self.writable + .create_postponed_indexes(self.site.cheap_clone()) + .await + } + fn input_schema(&self) -> InputSchema { self.input_schema.cheap_clone() } @@ -1608,6 +1614,7 @@ pub struct WritableStore { // Cached to avoid querying the database. is_deployment_synced: AtomicBool, + postponed_indexes_created: AtomicBool, } impl WritableStore { @@ -1649,6 +1656,7 @@ impl WritableStore { block_cursor, writer, is_deployment_synced: AtomicBool::new(is_deployment_synced), + postponed_indexes_created: AtomicBool::new(false), }) } @@ -1892,6 +1900,13 @@ impl WritableStoreTrait for WritableStore { self.store.health().await } + async fn create_postponed_indexes(&self) -> Result<(), StoreError> { + if !self.postponed_indexes_created.swap(true, Ordering::SeqCst) { + self.store.create_postponed_indexes().await?; + } + Ok(()) + } + async fn flush(&self) -> Result<(), StoreError> { self.writer.flush().await } diff --git a/store/test-store/tests/graph/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs index 9b9f2c4c574..eea22f145e7 100644 --- a/store/test-store/tests/graph/entity_cache.rs +++ b/store/test-store/tests/graph/entity_cache.rs @@ -169,6 +169,10 @@ impl WritableStore for MockStore { unimplemented!() } + async fn create_postponed_indexes(&self) -> Result<(), StoreError> { + unimplemented!() + } + async fn flush(&self) -> Result<(), StoreError> { unimplemented!() } From cf2f440d2a04b596d411688cb2b98afaa916614b Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 17 Feb 2026 12:58:56 -0800 Subject: [PATCH 12/12] store: Create postponed indexes on subgraph restart Replace the IndexList-based `recreate_invalid_indexes` call in `start_subgraph()` with a call to `create_postponed_indexes()`. This uses `IF NOT EXISTS` and `CONCURRENTLY` to safely create any missing postponed indexes on every restart, acting as a safety net. Remove the now-unused `IndexList::recreate_invalid_indexes` method. --- store/postgres/src/deployment_store.rs | 14 +++---- store/postgres/src/relational/index.rs | 58 -------------------------- 2 files changed, 5 insertions(+), 67 deletions(-) diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 8cfcd02a44a..052b3e9a21a 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1627,19 +1627,15 @@ impl DeploymentStore { .await?; } - let mut conn = self.pool.get_permitted().await?; - if ENV_VARS.postpone_attribute_index_creation { - // Check if all indexes are valid and recreate them if they - // aren't. - IndexList::load(&mut conn, &dst) - .await? - .recreate_invalid_indexes(&mut conn, &dst) - .await?; - } + // Create any indexes whose creation was postponed when the + // deployment was first created. Using `IF NOT EXISTS` and + // `CONCURRENTLY` makes this safe to call on every restart. + self.create_postponed_indexes(site.cheap_clone()).await?; // Make sure the block pointer is set. This is important for newly // deployed subgraphs so that we respect the 'startBlock' setting // the first time the subgraph is started + let mut conn = self.pool.get_permitted().await?; conn.transaction(|conn| { crate::deployment::initialize_block_ptr(conn, &dst.site).scope_boxed() }) diff --git a/store/postgres/src/relational/index.rs b/store/postgres/src/relational/index.rs index 3840b4db7ed..e3b698788ba 100644 --- a/store/postgres/src/relational/index.rs +++ b/store/postgres/src/relational/index.rs @@ -4,7 +4,6 @@ use std::collections::HashMap; use std::fmt::{Display, Write}; use diesel::sql_query; -use diesel::sql_types::{Bool, Text}; use diesel_async::RunQueryDsl; use graph::components::store::StoreError; use graph::itertools::Itertools; @@ -947,63 +946,6 @@ impl IndexList { iter } - - pub async fn recreate_invalid_indexes( - &self, - conn: &mut AsyncPgConnection, - layout: &Layout, - ) -> Result<(), StoreError> { - #[derive(QueryableByName, Debug)] - struct IndexInfo { - #[diesel(sql_type = Bool)] - isvalid: bool, - } - - let namespace = &layout.catalog.site.namespace; - let creat = layout.index_creator(true, true); - for table in layout.tables.values() { - let idxs = self - .indexes_for_table(table) - .filter(|idx| idx.to_postpone()); - for idx in idxs { - if let Some(index_name) = idx.name() { - let table_name = table.name.clone(); - let query = r#" - SELECT x.indisvalid AS isvalid - FROM pg_index x - JOIN pg_class c ON c.oid = x.indrelid - JOIN pg_class i ON i.oid = x.indexrelid - LEFT JOIN pg_namespace n ON n.oid = c.relnamespace - WHERE (c.relkind = ANY (ARRAY ['r'::"char", 'm'::"char", 'p'::"char"])) - AND (i.relkind = ANY (ARRAY ['i'::"char", 'I'::"char"])) - AND (n.nspname = $1) - AND (c.relname = $2) - AND (i.relname = $3);"#; - let ii_vec = sql_query(query) - .bind::(namespace.to_string()) - .bind::(table_name) - .bind::(index_name.clone()) - .get_results::(conn) - .await? - .into_iter() - .collect::>(); - assert!(ii_vec.len() <= 1); - if ii_vec.is_empty() || !ii_vec[0].isvalid { - // if a bad index exist lets first drop it - if !ii_vec.is_empty() { - let drop_query = - sql_query(format!("DROP INDEX {}.{};", namespace, index_name)); - drop_query.execute(conn).await?; - } - // We are creating concurrently, which can't be done - // in a transaction - IndexCreator::execute(&creat, conn, idx).await?; - } - } - } - } - Ok(()) - } } #[cfg(test)]