diff --git a/apps/framework-cli-e2e/test/utils/database-utils.ts b/apps/framework-cli-e2e/test/utils/database-utils.ts index 28a86a2531..1d7b8d6c56 100644 --- a/apps/framework-cli-e2e/test/utils/database-utils.ts +++ b/apps/framework-cli-e2e/test/utils/database-utils.ts @@ -255,6 +255,8 @@ export interface ExpectedColumn { type: string | RegExp; // Allow regex for complex type matching nullable?: boolean; comment?: string; + codec?: string | RegExp; + materialized?: string | RegExp; } /** @@ -433,6 +435,52 @@ export const validateTableSchema = async ( `Column '${expectedCol.name}' comment mismatch: expected '${expectedCol.comment}', got '${actualCol.comment}'`, ); } + + // Codec validation (if specified) + if (expectedCol.codec !== undefined) { + const actualCodec = actualCol.codec_expression; + let codecMatches = false; + + if (typeof expectedCol.codec === "string") { + // Exact string match + codecMatches = actualCodec === expectedCol.codec; + } else if (expectedCol.codec instanceof RegExp) { + // Regex match for complex codec expressions + codecMatches = expectedCol.codec.test(actualCodec); + } + + if (!codecMatches) { + errors.push( + `Column '${expectedCol.name}' codec mismatch: expected '${expectedCol.codec}', got '${actualCodec}'`, + ); + } + } + + // Materialized validation (if specified) + if (expectedCol.materialized !== undefined) { + const actualMaterialized = actualCol.default_expression; + const actualDefaultType = actualCol.default_type; + let materializedMatches = false; + + // Check that it's actually a MATERIALIZED column + if (actualDefaultType === "MATERIALIZED") { + if (typeof expectedCol.materialized === "string") { + // Exact string match + materializedMatches = + actualMaterialized === expectedCol.materialized; + } else if (expectedCol.materialized instanceof RegExp) { + // Regex match for complex expressions + materializedMatches = + expectedCol.materialized.test(actualMaterialized); + } + } + + if (!materializedMatches) { + errors.push( + 
`Column '${expectedCol.name}' materialized mismatch: expected '${expectedCol.materialized}', got '${actualDefaultType === "MATERIALIZED" ? actualMaterialized : "(not materialized)"}'`, + ); + } + } } // Check for unexpected columns (optional - could be made configurable) diff --git a/apps/framework-cli-e2e/test/utils/schema-definitions.ts b/apps/framework-cli-e2e/test/utils/schema-definitions.ts index ce622995ee..82a6e946fe 100644 --- a/apps/framework-cli-e2e/test/utils/schema-definitions.ts +++ b/apps/framework-cli-e2e/test/utils/schema-definitions.ts @@ -421,6 +421,44 @@ export const TYPESCRIPT_TEST_SCHEMAS: ExpectedTableSchema[] = [ { name: "payloadBasic", type: "JSON(count Int64, name String)" }, ], }, + // Codec compression test table + { + tableName: "CodecTest", + columns: [ + { name: "id", type: "String" }, + { name: "timestamp", type: /DateTime\('UTC'\)/, codec: /Delta.*LZ4/ }, + { name: "log_blob", type: "JSON", codec: "ZSTD(3)" }, + { name: "combination_hash", type: "Array(UInt64)", codec: "ZSTD(1)" }, + { name: "temperature", type: "Float64", codec: /Gorilla.*ZSTD/ }, + { name: "request_count", type: "Float64", codec: /DoubleDelta.*LZ4/ }, + { name: "user_agent", type: "String", codec: "ZSTD(3)" }, + { name: "tags", type: "Array(String)", codec: "LZ4" }, + { name: "status_code", type: "Float64" }, + ], + }, + // Materialized column test table + { + tableName: "MaterializedTest", + columns: [ + { name: "id", type: "String" }, + { name: "timestamp", type: /DateTime\('UTC'\)/ }, + { name: "userId", type: "String" }, + { + name: "eventDate", + type: /Date(32)?/, + materialized: "toDate(timestamp)", + }, + { name: "userHash", type: "UInt64", materialized: "cityHash64(userId)" }, + { name: "log_blob", type: "JSON", codec: "ZSTD(3)" }, + { + name: "combinationHash", + type: "Array(UInt64)", + materialized: + "arrayMap(kv -> cityHash64(kv.1, kv.2), JSONExtractKeysAndValuesRaw(toString(log_blob)))", + codec: "ZSTD(1)", + }, + ], + }, ]; // ============ PYTHON 
TEMPLATE SCHEMA DEFINITIONS ============ @@ -805,6 +843,48 @@ export const PYTHON_TEST_SCHEMAS: ExpectedTableSchema[] = [ { name: "payload_basic", type: "JSON(count Int64, name String)" }, ], }, + // Codec compression test table + { + tableName: "CodecTest", + columns: [ + { name: "id", type: "String" }, + { name: "timestamp", type: /DateTime\('UTC'\)/, codec: /Delta.*LZ4/ }, + { name: "log_blob", type: "JSON", codec: "ZSTD(3)" }, + { name: "combination_hash", type: "Array(UInt64)", codec: "ZSTD(1)" }, + { name: "temperature", type: "Float64", codec: /Gorilla.*ZSTD/ }, + { name: "request_count", type: "Float64", codec: /DoubleDelta.*LZ4/ }, + { name: "user_agent", type: "String", codec: "ZSTD(3)" }, + { name: "tags", type: "Array(String)", codec: "LZ4" }, + { name: "status_code", type: "Float64" }, + ], + }, + // Materialized column test table + { + tableName: "MaterializedTest", + columns: [ + { name: "id", type: "String" }, + { name: "timestamp", type: /DateTime\('UTC'\)/ }, + { name: "user_id", type: "String" }, + { + name: "event_date", + type: /Date(32)?/, + materialized: "toDate(timestamp)", + }, + { + name: "user_hash", + type: "UInt64", + materialized: "cityHash64(user_id)", + }, + { name: "log_blob", type: "JSON", codec: "ZSTD(3)" }, + { + name: "combination_hash", + type: "Array(UInt64)", + materialized: + "arrayMap(kv -> cityHash64(kv.1, kv.2), JSONExtractKeysAndValuesRaw(toString(log_blob)))", + codec: "ZSTD(1)", + }, + ], + }, ]; // ============ HELPER FUNCTIONS ============ diff --git a/apps/framework-cli/src/cli/local_webserver.rs b/apps/framework-cli/src/cli/local_webserver.rs index 840252b1a9..8165cb68ef 100644 --- a/apps/framework-cli/src/cli/local_webserver.rs +++ b/apps/framework-cli/src/cli/local_webserver.rs @@ -3544,6 +3544,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }], order_by: OrderBy::Fields(vec!["id".to_string()]), partition_by: None, diff --git 
a/apps/framework-cli/src/cli/routines/migrate.rs b/apps/framework-cli/src/cli/routines/migrate.rs index 6edc7a8bea..7e7c5dc2bf 100644 --- a/apps/framework-cli/src/cli/routines/migrate.rs +++ b/apps/framework-cli/src/cli/routines/migrate.rs @@ -763,6 +763,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }], order_by: OrderBy::Fields(vec!["id".to_string()]), partition_by: None, @@ -797,6 +799,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }); table } @@ -1140,6 +1144,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, after_column: None, database: Some("bad_db".to_string()), @@ -1157,6 +1163,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, after_column: Column { name: "col".to_string(), @@ -1168,6 +1176,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, database: Some("another_bad_db".to_string()), cluster_name: None, diff --git a/apps/framework-cli/src/framework/core/infra_reality_checker.rs b/apps/framework-cli/src/framework/core/infra_reality_checker.rs index 17320e03ca..7d95265124 100644 --- a/apps/framework-cli/src/framework/core/infra_reality_checker.rs +++ b/apps/framework-cli/src/framework/core/infra_reality_checker.rs @@ -515,6 +515,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }], order_by: OrderBy::Fields(vec!["id".to_string()]), partition_by: None, @@ -610,6 +612,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }); let mock_client = MockOlapClient { @@ -679,6 +683,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; actual_table.columns.push(timestamp_col.clone()); infra_table.columns.push(timestamp_col); diff --git 
a/apps/framework-cli/src/framework/core/infrastructure/table.rs b/apps/framework-cli/src/framework/core/infrastructure/table.rs index a21336a8e4..0a68b5ee0a 100644 --- a/apps/framework-cli/src/framework/core/infrastructure/table.rs +++ b/apps/framework-cli/src/framework/core/infrastructure/table.rs @@ -600,6 +600,10 @@ pub struct Column { pub comment: Option, // Column comment for metadata storage #[serde(skip_serializing_if = "Option::is_none", default)] pub ttl: Option, + #[serde(skip_serializing_if = "Option::is_none", default)] + pub codec: Option, // Compression codec expression (e.g., "ZSTD(3)", "Delta, LZ4") + #[serde(skip_serializing_if = "Option::is_none", default)] + pub materialized: Option, // MATERIALIZED column expression (computed at write-time, physically stored) } #[derive(Debug, Clone, Eq, PartialEq, Hash)] @@ -1114,6 +1118,8 @@ impl Column { .collect(), comment: self.comment.clone(), ttl: self.ttl.clone(), + codec: self.codec.clone(), + materialized: self.materialized.clone(), special_fields: Default::default(), } } @@ -1136,6 +1142,8 @@ impl Column { annotations, comment: proto.comment, ttl: proto.ttl, + codec: proto.codec, + materialized: proto.materialized, } } } @@ -1515,6 +1523,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; let json = serde_json::to_string(&nested_column).unwrap(); @@ -1535,6 +1545,8 @@ mod tests { annotations: vec![], comment: Some("[MOOSE_METADATA:DO_NOT_MODIFY] {\"version\":1,\"enum\":{\"name\":\"TestEnum\",\"members\":[]}}".to_string()), ttl: None, + codec: None, + materialized: None, }; // Convert to proto and back @@ -1558,6 +1570,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; let proto = column_without_comment.to_proto(); @@ -1741,6 +1755,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "name".to_string(), @@ -1752,6 +1768,8 @@ mod tests 
{ annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ]; diff --git a/apps/framework-cli/src/framework/core/infrastructure_map.rs b/apps/framework-cli/src/framework/core/infrastructure_map.rs index 95ef3f7465..a93f0ffe37 100644 --- a/apps/framework-cli/src/framework/core/infrastructure_map.rs +++ b/apps/framework-cli/src/framework/core/infrastructure_map.rs @@ -52,6 +52,7 @@ use crate::framework::core::infrastructure_map::Change::Added; use crate::framework::languages::SupportedLanguages; use crate::framework::python::datamodel_config::load_main_py; use crate::framework::scripts::Workflow; +use crate::infrastructure::olap::clickhouse::codec_expressions_are_equivalent; use crate::infrastructure::olap::clickhouse::config::DEFAULT_DATABASE_NAME; use crate::infrastructure::redis::redis_client::RedisClient; use crate::project::Project; @@ -2769,12 +2770,13 @@ fn ttl_expressions_are_equivalent(before: &Option, after: &Option bool { - // Check all non-data_type and non-ttl fields first + // Check all non-data_type and non-ttl and non-codec fields first if before.name != after.name || before.required != after.required || before.unique != after.unique // primary_key change is handled at the table level || before.default != after.default + || before.materialized != after.materialized || before.annotations != after.annotations || before.comment != after.comment { @@ -2786,6 +2788,12 @@ fn columns_are_equivalent(before: &Column, after: &Column) -> bool { return false; } + // Special handling for codec comparison: normalize both expressions before comparing + // This handles cases where ClickHouse adds default parameters (e.g., Delta → Delta(4)) + if !codec_expressions_are_equivalent(&before.codec, &after.codec) { + return false; + } + // Use ClickHouse-specific semantic comparison for data types // This handles special cases like enums and JSON types with order-independent typed_paths use 
crate::infrastructure::olap::clickhouse::diff_strategy::column_types_are_equivalent; @@ -3025,6 +3033,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "name".to_string(), @@ -3036,6 +3046,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "to_be_removed".to_string(), @@ -3047,6 +3059,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec!["id".to_string()]), @@ -3081,6 +3095,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "name".to_string(), @@ -3092,6 +3108,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "age".to_string(), // New column @@ -3103,6 +3121,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec!["id".to_string(), "name".to_string()]), // Changed order_by @@ -3151,6 +3171,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "to_remove".to_string(), @@ -3162,6 +3184,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ]; @@ -3178,6 +3202,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "new_column".to_string(), @@ -3189,6 +3215,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ]; @@ -3324,6 +3352,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }); let diff = compute_table_columns_diff(&before, &after); @@ -3355,6 +3385,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, 
}); let diff = compute_table_columns_diff(&before, &after); @@ -3383,6 +3415,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }); after.columns.push(Column { @@ -3395,6 +3429,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }); let diff = compute_table_columns_diff(&before, &after); @@ -3429,6 +3465,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "to_remove".to_string(), @@ -3440,6 +3478,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "to_modify".to_string(), @@ -3451,6 +3491,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ]); @@ -3466,6 +3508,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "to_modify".to_string(), // modified @@ -3477,6 +3521,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "new_column".to_string(), // added @@ -3488,6 +3534,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ]); @@ -3632,6 +3680,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }); after.columns.push(Column { @@ -3644,6 +3694,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }); let diff = compute_table_columns_diff(&before, &after); @@ -3677,6 +3729,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "name".to_string(), @@ -3688,6 +3742,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ]); @@ -3703,6 +3759,8 @@ mod 
diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "id".to_string(), @@ -3714,6 +3772,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ]); @@ -3741,6 +3801,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; before.columns.push(col.clone()); after.columns.push(col); @@ -3782,6 +3844,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }); // Change every other column type in the after table @@ -3815,6 +3879,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }); } @@ -3845,6 +3911,8 @@ mod diff_tests { ], comment: None, ttl: None, + codec: None, + materialized: None, }); after.columns.push(Column { @@ -3860,6 +3928,8 @@ mod diff_tests { ], comment: None, ttl: None, + codec: None, + materialized: None, }); let diff = compute_table_columns_diff(&before, &after); @@ -3900,6 +3970,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }); after.columns.push(Column { @@ -3912,6 +3984,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }); // Test special characters in column name @@ -3925,6 +3999,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }); after.columns.push(Column { @@ -3937,6 +4013,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }); let diff = compute_table_columns_diff(&before, &after); @@ -3961,6 +4039,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; let col2 = col1.clone(); assert!(columns_are_equivalent(&col1, &col2)); @@ -3998,6 +4078,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: 
None, + codec: None, + materialized: None, }; let clickhouse_enum_col = Column { @@ -4022,6 +4104,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; // These should be equivalent due to the enum semantic comparison @@ -4047,6 +4131,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; assert!(!columns_are_equivalent( @@ -4065,6 +4151,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; let int_col2 = Column { @@ -4077,6 +4165,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; assert!(!columns_are_equivalent(&int_col1, &int_col2)); @@ -4108,6 +4198,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; let json_col2 = Column { @@ -4130,6 +4222,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; // These should be equivalent - order of typed_paths doesn't matter @@ -4155,6 +4249,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; assert!(!columns_are_equivalent(&json_col1, &json_col3)); @@ -4180,6 +4276,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; assert!(!columns_are_equivalent(&json_col1, &json_col4)); @@ -4222,6 +4320,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; let nested_json_col2 = Column { @@ -4255,6 +4355,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; // These should be equivalent - order doesn't matter at any level @@ -4286,6 +4388,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "priority".to_string(), @@ 
-4297,6 +4401,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], jwt: false, @@ -4308,6 +4414,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; let col_with_user_name = Column { @@ -4328,6 +4436,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "priority".to_string(), @@ -4339,6 +4449,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], jwt: false, @@ -4350,6 +4462,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; // These should be equivalent - name difference doesn't matter if structure matches @@ -4376,6 +4490,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }], // Missing priority column jwt: false, }), @@ -4386,6 +4502,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; assert!(!columns_are_equivalent( @@ -4422,6 +4540,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "notifications".to_string(), @@ -4433,6 +4553,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], jwt: false, @@ -4444,6 +4566,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }], jwt: false, }), @@ -4454,6 +4578,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }], jwt: false, }), @@ -4464,6 +4590,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; let col_user = Column { @@ -4489,6 +4617,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + 
materialized: None, }, Column { name: "notifications".to_string(), @@ -4500,6 +4630,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], jwt: false, @@ -4511,6 +4643,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }], jwt: false, }), @@ -4521,6 +4655,8 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }], jwt: false, }), @@ -4531,11 +4667,124 @@ mod diff_tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; // These should be equivalent - name differences at all levels don't matter assert!(columns_are_equivalent(&col_generated, &col_user)); } + + #[test] + fn test_columns_are_equivalent_with_codec() { + use crate::framework::core::infrastructure::table::{Column, ColumnType}; + + let base_col = Column { + name: "data".to_string(), + data_type: ColumnType::String, + required: true, + unique: false, + primary_key: false, + default: None, + annotations: vec![], + comment: None, + ttl: None, + codec: None, + materialized: None, + }; + + // Test 1: Columns with same codec should be equivalent + let col_with_codec1 = Column { + codec: Some("ZSTD(3)".to_string()), + materialized: None, + ..base_col.clone() + }; + let col_with_codec2 = Column { + codec: Some("ZSTD(3)".to_string()), + materialized: None, + ..base_col.clone() + }; + assert!(columns_are_equivalent(&col_with_codec1, &col_with_codec2)); + + // Test 2: Columns with different codecs should not be equivalent + let col_with_different_codec = Column { + codec: Some("LZ4".to_string()), + materialized: None, + ..base_col.clone() + }; + assert!(!columns_are_equivalent( + &col_with_codec1, + &col_with_different_codec + )); + + // Test 3: Column with codec vs column without codec should not be equivalent + assert!(!columns_are_equivalent(&col_with_codec1, &base_col)); + + // Test 4: Columns with codec chains 
should be detected as different + let col_with_chain1 = Column { + codec: Some("Delta, LZ4".to_string()), + materialized: None, + ..base_col.clone() + }; + let col_with_chain2 = Column { + codec: Some("Delta, ZSTD".to_string()), + materialized: None, + ..base_col.clone() + }; + assert!(!columns_are_equivalent(&col_with_chain1, &col_with_chain2)); + + // Test 5: Codec with different compression levels should be detected as different + let col_zstd3 = Column { + codec: Some("ZSTD(3)".to_string()), + materialized: None, + ..base_col.clone() + }; + let col_zstd9 = Column { + codec: Some("ZSTD(9)".to_string()), + materialized: None, + ..base_col.clone() + }; + assert!(!columns_are_equivalent(&col_zstd3, &col_zstd9)); + + // Test 6: Normalized codec comparison - user "Delta" vs ClickHouse "Delta(4)" + let col_user_delta = Column { + codec: Some("Delta".to_string()), + materialized: None, + ..base_col.clone() + }; + let col_ch_delta = Column { + codec: Some("Delta(4)".to_string()), + materialized: None, + ..base_col.clone() + }; + assert!(columns_are_equivalent(&col_user_delta, &col_ch_delta)); + + // Test 7: Normalized codec comparison - user "Gorilla" vs ClickHouse "Gorilla(8)" + let col_user_gorilla = Column { + codec: Some("Gorilla".to_string()), + materialized: None, + ..base_col.clone() + }; + let col_ch_gorilla = Column { + codec: Some("Gorilla(8)".to_string()), + materialized: None, + ..base_col.clone() + }; + assert!(columns_are_equivalent(&col_user_gorilla, &col_ch_gorilla)); + + // Test 8: Normalized chain comparison - "Delta, LZ4" vs "Delta(4), LZ4" + let col_user_chain = Column { + codec: Some("Delta, LZ4".to_string()), + materialized: None, + ..base_col.clone() + }; + let col_ch_chain = Column { + codec: Some("Delta(4), LZ4".to_string()), + materialized: None, + ..base_col.clone() + }; + assert!(columns_are_equivalent(&col_user_chain, &col_ch_chain)); + } } #[cfg(test)] @@ -4890,6 +5139,8 @@ mod diff_topic_tests { annotations: Vec::new(), comment: None, ttl: 
None, + codec: None, + materialized: None, }], metadata: None, life_cycle: LifeCycle::FullyManaged, @@ -5180,6 +5431,8 @@ mod diff_topic_to_table_sync_process_tests { annotations: Vec::new(), comment: None, ttl: None, + codec: None, + materialized: None, }], version: Some(version.clone()), source_primitive: PrimitiveSignature { @@ -5303,6 +5556,8 @@ mod diff_topic_to_table_sync_process_tests { annotations: vec![("note".to_string(), Value::String("changed".to_string()))], comment: None, ttl: None, + codec: None, + materialized: None, }]; assert_eq!( diff --git a/apps/framework-cli/src/framework/core/plan.rs b/apps/framework-cli/src/framework/core/plan.rs index c3978121c2..fbb0d004a4 100644 --- a/apps/framework-cli/src/framework/core/plan.rs +++ b/apps/framework-cli/src/framework/core/plan.rs @@ -509,6 +509,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }], order_by: OrderBy::Fields(vec!["id".to_string()]), partition_by: None, @@ -713,6 +715,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }); // Create test project first to get the database name @@ -1090,6 +1094,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }); // Create mock OLAP client with the reality table diff --git a/apps/framework-cli/src/framework/core/plan_validator.rs b/apps/framework-cli/src/framework/core/plan_validator.rs index 99f9dda6a5..6d1dfc1783 100644 --- a/apps/framework-cli/src/framework/core/plan_validator.rs +++ b/apps/framework-cli/src/framework/core/plan_validator.rs @@ -149,6 +149,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }], order_by: OrderBy::Fields(vec!["id".to_string()]), partition_by: None, @@ -324,6 +326,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }], order_by: OrderBy::Fields(vec!["id".to_string()]), 
partition_by: None, diff --git a/apps/framework-cli/src/framework/python/generate.rs b/apps/framework-cli/src/framework/python/generate.rs index a8be4e26d3..7c876c2148 100644 --- a/apps/framework-cli/src/framework/python/generate.rs +++ b/apps/framework-cli/src/framework/python/generate.rs @@ -557,7 +557,7 @@ pub fn tables_to_python(tables: &[Table], life_cycle: Option) -> Stri .unwrap(); writeln!( output, - "from moose_lib import clickhouse_default, LifeCycle, ClickHouseTTL" + "from moose_lib import clickhouse_default, ClickHouseCodec, ClickHouseMaterialized, LifeCycle, ClickHouseTTL" ) .unwrap(); writeln!( @@ -677,11 +677,30 @@ pub fn tables_to_python(tables: &[Table], life_cycle: Option) -> Stri if let Some(ref ttl_expr) = column.ttl { type_str = format!("Annotated[{}, ClickHouseTTL({:?})]", type_str, ttl_expr); } - if let Some(ref default_expr) = column.default { - type_str = format!( - "Annotated[{}, clickhouse_default({:?})]", - type_str, default_expr - ); + if let Some(ref codec_expr) = column.codec { + type_str = format!("Annotated[{}, ClickHouseCodec({:?})]", type_str, codec_expr); + } + // Handle DEFAULT and MATERIALIZED (mutually exclusive) + match (&column.default, &column.materialized) { + (Some(default_expr), None) => { + type_str = format!( + "Annotated[{}, clickhouse_default({:?})]", + type_str, default_expr + ); + } + (None, Some(materialized_expr)) => { + type_str = format!( + "Annotated[{}, ClickHouseMaterialized({:?})]", + type_str, materialized_expr + ); + } + (None, None) => { + // No default or materialized, do nothing + } + (Some(_), Some(_)) => { + // This should never happen due to validation + panic!("Column '{}' has both DEFAULT and MATERIALIZED - this should be caught by validation", column.name) + } } let type_str = if can_use_key_wrapping && column.primary_key { @@ -1045,6 +1064,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "timestamp".to_string(), @@ -1056,6 
+1077,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "optional_text".to_string(), @@ -1067,6 +1090,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec!["primary_key".to_string()]), @@ -1100,7 +1125,7 @@ from enum import IntEnum, Enum from moose_lib import Key, IngestPipeline, IngestPipelineConfig, OlapTable, OlapConfig, clickhouse_datetime64, clickhouse_decimal, ClickhouseSize, StringToEnumMixin from moose_lib.data_models import ClickHouseJson from moose_lib import Point, Ring, LineString, MultiLineString, Polygon, MultiPolygon, FixedString -from moose_lib import clickhouse_default, LifeCycle, ClickHouseTTL +from moose_lib import clickhouse_default, ClickHouseCodec, ClickHouseMaterialized, LifeCycle, ClickHouseTTL from moose_lib.blocks import MergeTreeEngine, ReplacingMergeTreeEngine, AggregatingMergeTreeEngine, SummingMergeTreeEngine, S3QueueEngine, ReplicatedMergeTreeEngine, ReplicatedReplacingMergeTreeEngine, ReplicatedAggregatingMergeTreeEngine, ReplicatedSummingMergeTreeEngine class Foo(BaseModel): @@ -1130,6 +1155,8 @@ foo_table = OlapTable[Foo]("Foo", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "numbers".to_string(), @@ -1144,6 +1171,8 @@ foo_table = OlapTable[Foo]("Foo", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "nested_numbers".to_string(), @@ -1161,6 +1190,8 @@ foo_table = OlapTable[Foo]("Foo", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec!["id".to_string()]), @@ -1215,6 +1246,8 @@ nested_array_table = OlapTable[NestedArray]("NestedArray", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "city".to_string(), @@ 
-1226,6 +1259,8 @@ nested_array_table = OlapTable[NestedArray]("NestedArray", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "zipCode".to_string(), @@ -1237,6 +1272,8 @@ nested_array_table = OlapTable[NestedArray]("NestedArray", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], jwt: false, @@ -1255,6 +1292,8 @@ nested_array_table = OlapTable[NestedArray]("NestedArray", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "address".to_string(), @@ -1266,6 +1305,8 @@ nested_array_table = OlapTable[NestedArray]("NestedArray", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "addresses".to_string(), @@ -1280,6 +1321,8 @@ nested_array_table = OlapTable[NestedArray]("NestedArray", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec!["id".to_string()]), @@ -1337,6 +1380,8 @@ user_table = OlapTable[User]("User", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "data".to_string(), @@ -1348,6 +1393,8 @@ user_table = OlapTable[User]("User", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec!["id".to_string()]), @@ -1405,6 +1452,8 @@ user_table = OlapTable[User]("User", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }], order_by: OrderBy::Fields(vec!["id".to_string()]), partition_by: None, @@ -1461,6 +1510,8 @@ user_table = OlapTable[User]("User", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "version".to_string(), @@ -1472,6 +1523,8 @@ user_table = OlapTable[User]("User", OlapConfig( annotations: 
vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "is_deleted".to_string(), @@ -1483,6 +1536,8 @@ user_table = OlapTable[User]("User", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec!["id".to_string()]), @@ -1530,6 +1585,8 @@ user_table = OlapTable[User]("User", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "coordinates".to_string(), @@ -1544,6 +1601,8 @@ user_table = OlapTable[User]("User", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "metadata".to_string(), @@ -1558,6 +1617,8 @@ user_table = OlapTable[User]("User", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec!["id".to_string()]), @@ -1619,6 +1680,8 @@ user_table = OlapTable[User]("User", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "timestamp".to_string(), @@ -1630,6 +1693,8 @@ user_table = OlapTable[User]("User", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "email".to_string(), @@ -1641,6 +1706,8 @@ user_table = OlapTable[User]("User", OlapConfig( annotations: vec![], comment: None, ttl: Some("timestamp + INTERVAL 30 DAY".to_string()), + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec!["id".to_string(), "timestamp".to_string()]), @@ -1687,6 +1754,8 @@ user_table = OlapTable[User]("User", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }], order_by: OrderBy::Fields(vec!["id".to_string()]), partition_by: None, @@ -1752,6 +1821,8 @@ user_table = OlapTable[User]("User", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, 
Column { name: "payload".to_string(), @@ -1772,6 +1843,8 @@ user_table = OlapTable[User]("User", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec!["id".to_string()]), @@ -1826,6 +1899,8 @@ user_table = OlapTable[User]("User", OlapConfig( annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }], order_by: OrderBy::Fields(vec!["id".to_string()]), partition_by: None, diff --git a/apps/framework-cli/src/framework/python/utils.rs b/apps/framework-cli/src/framework/python/utils.rs index fb0717bed5..c8a3be9362 100644 --- a/apps/framework-cli/src/framework/python/utils.rs +++ b/apps/framework-cli/src/framework/python/utils.rs @@ -53,6 +53,8 @@ impl ColumnBuilder { // are generated later when converting to ClickHouse columns comment: None, ttl: None, + codec: None, + materialized: None, }) } } diff --git a/apps/framework-cli/src/framework/streaming/generate.rs b/apps/framework-cli/src/framework/streaming/generate.rs index 7a3446e302..4da15a4e07 100644 --- a/apps/framework-cli/src/framework/streaming/generate.rs +++ b/apps/framework-cli/src/framework/streaming/generate.rs @@ -514,6 +514,8 @@ my_function = StreamingFunction( // for actual ClickHouse table columns to preserve enum definitions comment: None, ttl: None, + codec: None, + materialized: None, }) .collect() } diff --git a/apps/framework-cli/src/framework/typescript/generate.rs b/apps/framework-cli/src/framework/typescript/generate.rs index cffd6cae21..6c93837340 100644 --- a/apps/framework-cli/src/framework/typescript/generate.rs +++ b/apps/framework-cli/src/framework/typescript/generate.rs @@ -340,6 +340,8 @@ pub fn tables_to_typescript(tables: &[Table], life_cycle: Option) -> "WithDefault", "LifeCycle", "ClickHouseTTL", + "ClickHouseCodec", + "ClickHouseMaterialized", ]; if uses_simple_aggregate { @@ -587,19 +589,36 @@ pub fn tables_to_typescript(tables: &[Table], life_cycle: Option) -> } } - // Append 
ClickHouseTTL type tag if present on the column - if let Some(expr) = &column.ttl { - type_str = format!("{type_str} & ClickHouseTTL<\"{}\">", expr); - } - let type_str = match column.default { - None => type_str, - Some(ref default) if type_str == "Date" => { + // Handle DEFAULT and MATERIALIZED (mutually exclusive) + let type_str = match (&column.default, &column.materialized) { + (Some(default), None) if type_str == "Date" => { // https://github.com/samchon/typia/issues/1658 format!("WithDefault<{type_str}, {:?}>", default) } - Some(ref default) => { + (Some(default), None) => { format!("{type_str} & ClickHouseDefault<{:?}>", default) } + (None, Some(materialized)) => { + format!("{type_str} & ClickHouseMaterialized<{:?}>", materialized) + } + (None, None) => type_str, + (Some(_), Some(_)) => { + // This should never happen due to validation, but handle it gracefully + panic!("Column '{}' has both DEFAULT and MATERIALIZED - this should be caught by validation", column.name) + } + }; + + // Append ClickHouseTTL type tag if present on the column + let type_str = if let Some(expr) = &column.ttl { + format!("{type_str} & ClickHouseTTL<\"{}\">", expr) + } else { + type_str + }; + + // Wrap with Codec if present + let type_str = match column.codec.as_ref() { + None => type_str, + Some(ref codec) => format!("{type_str} & ClickHouseCodec<{codec:?}>"), }; let type_str = if can_use_key_wrapping && column.primary_key { format!("Key<{type_str}>") @@ -938,6 +957,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "city".to_string(), @@ -949,6 +970,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "zip_code".to_string(), @@ -960,6 +983,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], jwt: false, @@ -978,6 +1003,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: 
None, + materialized: None, }, Column { name: "address".to_string(), @@ -989,6 +1016,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "addresses".to_string(), @@ -1003,6 +1032,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec!["id".to_string()]), @@ -1063,6 +1094,8 @@ export const UserTable = new OlapTable("User", { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "data".to_string(), @@ -1074,6 +1107,8 @@ export const UserTable = new OlapTable("User", { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec!["id".to_string()]), @@ -1130,6 +1165,8 @@ export const UserTable = new OlapTable("User", { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }], order_by: OrderBy::Fields(vec!["id".to_string()]), partition_by: None, @@ -1181,6 +1218,8 @@ export const UserTable = new OlapTable("User", { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "version".to_string(), @@ -1192,6 +1231,8 @@ export const UserTable = new OlapTable("User", { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "is_deleted".to_string(), @@ -1203,6 +1244,8 @@ export const UserTable = new OlapTable("User", { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec!["id".to_string()]), @@ -1249,6 +1292,8 @@ export const UserTable = new OlapTable("User", { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }], sample_by: None, order_by: OrderBy::Fields(vec!["id".to_string()]), @@ -1301,6 +1346,8 @@ export const UserTable = new OlapTable("User", { annotations: vec![], comment: None, ttl: None, + codec: 
None, + materialized: None, }, Column { name: "version".to_string(), @@ -1312,6 +1359,8 @@ export const UserTable = new OlapTable("User", { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "is_deleted".to_string(), @@ -1323,6 +1372,8 @@ export const UserTable = new OlapTable("User", { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], sample_by: None, @@ -1377,6 +1428,8 @@ export const UserTable = new OlapTable("User", { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }], order_by: OrderBy::Fields(vec!["u64".to_string()]), partition_by: None, @@ -1451,6 +1504,8 @@ export const UserTable = new OlapTable("User", { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "status".to_string(), @@ -1462,6 +1517,8 @@ export const UserTable = new OlapTable("User", { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec!["id".to_string()]), @@ -1518,6 +1575,8 @@ export const TaskTable = new OlapTable("Task", { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "timestamp".to_string(), @@ -1529,6 +1588,8 @@ export const TaskTable = new OlapTable("Task", { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "email".to_string(), @@ -1540,6 +1601,8 @@ export const TaskTable = new OlapTable("Task", { annotations: vec![], comment: None, ttl: Some("timestamp + INTERVAL 30 DAY".to_string()), + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec!["id".to_string(), "timestamp".to_string()]), @@ -1588,6 +1651,8 @@ export const TaskTable = new OlapTable("Task", { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "payload".to_string(), @@ -1608,6 +1673,8 @@ export const TaskTable = 
new OlapTable("Task", { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec!["id".to_string()]), @@ -1656,6 +1723,8 @@ export const TaskTable = new OlapTable("Task", { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }], order_by: OrderBy::Fields(vec!["id".to_string()]), partition_by: None, diff --git a/apps/framework-cli/src/infrastructure/olap/clickhouse/diff_strategy.rs b/apps/framework-cli/src/infrastructure/olap/clickhouse/diff_strategy.rs index 46168bcc69..ad9c085eca 100644 --- a/apps/framework-cli/src/infrastructure/olap/clickhouse/diff_strategy.rs +++ b/apps/framework-cli/src/infrastructure/olap/clickhouse/diff_strategy.rs @@ -662,6 +662,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "timestamp".to_string(), @@ -673,6 +675,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(order_by), @@ -798,6 +802,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, position_after: Some("timestamp".to_string()), }]; @@ -855,6 +861,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, position_after: Some("timestamp".to_string()), }]; diff --git a/apps/framework-cli/src/infrastructure/olap/clickhouse/mapper.rs b/apps/framework-cli/src/infrastructure/olap/clickhouse/mapper.rs index 2eecff31d0..16769e5b1b 100644 --- a/apps/framework-cli/src/infrastructure/olap/clickhouse/mapper.rs +++ b/apps/framework-cli/src/infrastructure/olap/clickhouse/mapper.rs @@ -54,6 +54,27 @@ fn generate_column_comment(column: &Column) -> Result, Clickhouse pub fn std_column_to_clickhouse_column( column: Column, ) -> Result { + // Validate mutual exclusivity of DEFAULT and MATERIALIZED + if column.default.is_some() && column.materialized.is_some() { 
+ return Err(ClickhouseError::InvalidParameters { + message: format!( + "Column '{}' cannot have both DEFAULT and MATERIALIZED. Use one or the other.", + column.name + ), + }); + } + + // Validate that MATERIALIZED columns are not primary keys + if column.materialized.is_some() && column.primary_key { + return Err(ClickhouseError::InvalidParameters { + message: format!( + "Column '{}' cannot be both MATERIALIZED and a primary key. \ + MATERIALIZED columns are computed and cannot be used as primary keys.", + column.name + ), + }); + } + let comment = generate_column_comment(&column)?; let mut column_type = @@ -83,6 +104,8 @@ pub fn std_column_to_clickhouse_column( default: column.default.clone(), comment, ttl: column.ttl.clone(), + codec: column.codec.clone(), + materialized: column.materialized.clone(), }; Ok(clickhouse_column) @@ -426,6 +449,8 @@ mod tests { annotations: vec![], comment: Some("This is a user comment about the record type".to_string()), ttl: None, + codec: None, + materialized: None, }; let clickhouse_column = std_column_to_clickhouse_column(column_with_user_comment).unwrap(); @@ -450,6 +475,8 @@ mod tests { annotations: vec![], comment: Some(format!("Old user comment {}", old_metadata)), ttl: None, + codec: None, + materialized: None, }; let clickhouse_column = std_column_to_clickhouse_column(column_with_both).unwrap(); @@ -476,6 +503,8 @@ mod tests { annotations: vec![], comment: Some(old_metadata), ttl: None, + codec: None, + materialized: None, }; let clickhouse_column = std_column_to_clickhouse_column(column_metadata_only).unwrap(); @@ -518,6 +547,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "status".to_string(), @@ -529,6 +560,8 @@ mod tests { annotations: vec![], comment: Some("User status field".to_string()), // User comment ttl: None, + codec: None, + materialized: None, }, ], jwt: false, diff --git a/apps/framework-cli/src/infrastructure/olap/clickhouse/mod.rs 
b/apps/framework-cli/src/infrastructure/olap/clickhouse/mod.rs index 757bf54b4d..734e98023f 100644 --- a/apps/framework-cli/src/infrastructure/olap/clickhouse/mod.rs +++ b/apps/framework-cli/src/infrastructure/olap/clickhouse/mod.rs @@ -812,6 +812,10 @@ async fn execute_add_table_column( let clickhouse_column = std_column_to_clickhouse_column(column.clone())?; let column_type_string = basic_field_type_to_string(&clickhouse_column.column_type)?; + let cluster_clause = cluster_name + .map(|c| format!(" ON CLUSTER {}", c)) + .unwrap_or_default(); + // Include DEFAULT clause if column has a default value let default_clause = clickhouse_column .default @@ -819,22 +823,42 @@ async fn execute_add_table_column( .map(|d| format!(" DEFAULT {}", d)) .unwrap_or_default(); - let cluster_clause = cluster_name - .map(|c| format!(" ON CLUSTER {}", c)) + // Include MATERIALIZED clause if column has a materialized expression + let materialized_clause = clickhouse_column + .materialized + .as_ref() + .map(|m| format!(" MATERIALIZED {}", m)) .unwrap_or_default(); + let codec_clause = clickhouse_column + .codec + .as_ref() + .map(|c| format!(" CODEC({})", c)) + .unwrap_or_default(); + + let ttl_clause = clickhouse_column + .ttl + .as_ref() + .map(|t| format!(" TTL {}", t)) + .unwrap_or_default(); + + let position_clause = match after_column { + None => "FIRST".to_string(), + Some(after_col) => format!("AFTER `{after_col}`"), + }; + let add_column_query = format!( - "ALTER TABLE `{}`.`{}`{} ADD COLUMN `{}` {}{} {}", + "ALTER TABLE `{}`.`{}`{} ADD COLUMN `{}` {}{}{}{}{} {}", db_name, table_name, cluster_clause, clickhouse_column.name, column_type_string, default_clause, - match after_column { - None => "FIRST".to_string(), - Some(after_col) => format!("AFTER `{after_col}`"), - } + materialized_clause, + codec_clause, + ttl_clause, + position_clause ); log::debug!("Adding column: {}", add_column_query); run_query(&add_column_query, client).await.map_err(|e| { @@ -892,16 +916,20 @@ async 
fn execute_modify_table_column( // Check if only the comment has changed let data_type_changed = before_column.data_type != after_column.data_type; let default_changed = before_column.default != after_column.default; + let materialized_changed = before_column.materialized != after_column.materialized; let required_changed = before_column.required != after_column.required; let comment_changed = before_column.comment != after_column.comment; let ttl_changed = before_column.ttl != after_column.ttl; + let codec_changed = before_column.codec != after_column.codec; // If only the comment changed, use a simpler ALTER TABLE ... MODIFY COLUMN ... COMMENT // This is more efficient and avoids unnecessary table rebuilds if !data_type_changed && !required_changed && !default_changed + && !materialized_changed && !ttl_changed + && !codec_changed && comment_changed { log::info!( @@ -940,7 +968,7 @@ async fn execute_modify_table_column( log::info!( "Executing ModifyTableColumn for table: {}, column: {} ({}→{})\ -data_type_changed: {data_type_changed}, default_changed: {default_changed}, required_changed: {required_changed}, comment_changed: {comment_changed}, ttl_changed: {ttl_changed}", +data_type_changed: {data_type_changed}, default_changed: {default_changed}, materialized_changed: {materialized_changed}, required_changed: {required_changed}, comment_changed: {comment_changed}, ttl_changed: {ttl_changed}, codec_changed: {codec_changed}", table_name, after_column.name, before_column.data_type, @@ -952,13 +980,18 @@ data_type_changed: {data_type_changed}, default_changed: {default_changed}, requ // Build all the SQL statements needed (main modify + optional removes) let removing_default = before_column.default.is_some() && after_column.default.is_none(); + let removing_materialized = + before_column.materialized.is_some() && after_column.materialized.is_none(); let removing_ttl = before_column.ttl.is_some() && after_column.ttl.is_none(); + let removing_codec = 
before_column.codec.is_some() && after_column.codec.is_none(); let queries = build_modify_column_sql( db_name, table_name, &clickhouse_column, removing_default, + removing_materialized, removing_ttl, + removing_codec, cluster_name, )?; @@ -1007,12 +1040,15 @@ async fn execute_modify_column_comment( Ok(()) } +#[allow(clippy::too_many_arguments)] fn build_modify_column_sql( db_name: &str, table_name: &str, ch_col: &ClickHouseColumn, removing_default: bool, + removing_materialized: bool, removing_ttl: bool, + removing_codec: bool, cluster_name: Option<&str>, ) -> Result, ClickhouseChangesError> { let column_type_string = basic_field_type_to_string(&ch_col.column_type)?; @@ -1032,6 +1068,14 @@ fn build_modify_column_sql( )); } + // Add REMOVE MATERIALIZED statement if needed + if removing_materialized { + statements.push(format!( + "ALTER TABLE `{}`.`{}`{} MODIFY COLUMN `{}` REMOVE MATERIALIZED", + db_name, table_name, cluster_clause, ch_col.name + )); + } + // Add REMOVE TTL statement if needed if removing_ttl { statements.push(format!( @@ -1040,6 +1084,14 @@ fn build_modify_column_sql( )); } + // Add REMOVE CODEC statement if needed + if removing_codec { + statements.push(format!( + "ALTER TABLE `{}`.`{}`{} MODIFY COLUMN `{}` REMOVE CODEC", + db_name, table_name, cluster_clause, ch_col.name + )); + } + // DEFAULT clause: If omitted, ClickHouse KEEPS any existing DEFAULT // Therefore, DEFAULT removal requires a separate REMOVE DEFAULT statement // Default values from ClickHouse/Python are already properly formatted @@ -1053,6 +1105,13 @@ fn build_modify_column_sql( .map(|d| format!(" DEFAULT {}", d)) .unwrap_or_default(); + // MATERIALIZED clause: If omitted, ClickHouse KEEPS any existing MATERIALIZED + let materialized_clause = ch_col + .materialized + .as_ref() + .map(|m| format!(" MATERIALIZED {}", m)) + .unwrap_or_default(); + // TTL clause: If omitted, ClickHouse KEEPS any existing TTL // Therefore, TTL removal requires a separate REMOVE TTL statement let 
ttl_clause = ch_col @@ -1061,29 +1120,41 @@ fn build_modify_column_sql( .map(|t| format!(" TTL {}", t)) .unwrap_or_default(); + // CODEC clause: If omitted, ClickHouse KEEPS any existing CODEC + // Therefore, CODEC removal requires a separate REMOVE CODEC statement + let codec_clause = ch_col + .codec + .as_ref() + .map(|c| format!(" CODEC({})", c)) + .unwrap_or_default(); + // Build the main MODIFY COLUMN statement let main_sql = if let Some(ref comment) = ch_col.comment { let escaped_comment = comment.replace('\'', "''"); format!( - "ALTER TABLE `{}`.`{}`{} MODIFY COLUMN IF EXISTS `{}` {}{}{} COMMENT '{}'", + "ALTER TABLE `{}`.`{}`{} MODIFY COLUMN IF EXISTS `{}` {}{}{}{}{} COMMENT '{}'", db_name, table_name, cluster_clause, ch_col.name, column_type_string, default_clause, + materialized_clause, + codec_clause, ttl_clause, escaped_comment ) } else { format!( - "ALTER TABLE `{}`.`{}`{} MODIFY COLUMN IF EXISTS `{}` {}{}{}", + "ALTER TABLE `{}`.`{}`{} MODIFY COLUMN IF EXISTS `{}` {}{}{}{}{}", db_name, table_name, cluster_clause, ch_col.name, column_type_string, default_clause, + materialized_clause, + codec_clause, ttl_clause ) }; @@ -1647,7 +1718,8 @@ impl OlapOperations for ConfiguredDBClient { is_in_primary_key, is_in_sorting_key, default_kind, - default_expression + default_expression, + compression_codec FROM system.columns WHERE database = '{db_name}' AND table = '{table_name}' @@ -1662,7 +1734,7 @@ impl OlapOperations for ConfiguredDBClient { let mut columns_cursor = self .client .query(&columns_query) - .fetch::<(String, String, String, u8, u8, String, String)>() + .fetch::<(String, String, String, u8, u8, String, String, String)>() .map_err(|e| { debug!("Error fetching columns for table {}: {}", table_name, e); OlapChangesError::DatabaseError(e.to_string()) @@ -1680,6 +1752,7 @@ impl OlapOperations for ConfiguredDBClient { is_sorting, default_kind, default_expression, + compression_codec, )) = columns_cursor .next() .await @@ -1764,16 +1837,17 @@ impl 
OlapOperations for ConfiguredDBClient { None }; - let default = match default_kind.deref() { - "" => None, - "DEFAULT" => Some(default_expression), - "MATERIALIZED" | "ALIAS" => { - debug!("MATERIALIZED and ALIAS not yet handled."); - None + let (default, materialized) = match default_kind.deref() { + "" => (None, None), + "DEFAULT" => (Some(default_expression.clone()), None), + "MATERIALIZED" => (None, Some(default_expression.clone())), + "ALIAS" => { + debug!("ALIAS columns not yet supported, skipping column {col_name}"); + continue; // Skip ALIAS columns (they're virtual, not stored) } _ => { debug!("Unknown default kind: {default_kind} for column {col_name}"); - None + (None, None) } }; @@ -1806,6 +1880,19 @@ impl OlapOperations for ConfiguredDBClient { .get(&col_name) .map(|ttl| normalize_ttl_expression(ttl)); + // Parse codec if present + // Strip CODEC(...) wrapper from compression_codec (e.g., "CODEC(ZSTD(3))" -> "ZSTD(3)") + let codec = if !compression_codec.is_empty() { + let trimmed = compression_codec.trim(); + if trimmed.starts_with("CODEC(") && trimmed.ends_with(')') { + Some(trimmed[6..trimmed.len() - 1].to_string()) + } else { + Some(trimmed.to_string()) + } + } else { + None + }; + let column = Column { name: col_name.clone(), data_type, @@ -1816,6 +1903,8 @@ impl OlapOperations for ConfiguredDBClient { annotations, comment: column_comment, ttl: normalized_ttl, + codec, + materialized, }; columns.push(column); @@ -2303,6 +2392,34 @@ pub fn extract_table_ttl_from_create_query(create_query: &str) -> Option /// - "timestamp + INTERVAL 1 MONTH" → "timestamp + toIntervalMonth(1)" /// - "timestamp + INTERVAL 90 DAY DELETE" → "timestamp + toIntervalDay(90)" /// - "timestamp + toIntervalDay(90) DELETE" → "timestamp + toIntervalDay(90)" +pub fn normalize_codec_expression(expr: &str) -> String { + expr.split(',') + .map(|codec| { + let trimmed = codec.trim(); + match trimmed { + "Delta" => "Delta(4)", + "Gorilla" => "Gorilla(8)", + "ZSTD" => "ZSTD(1)", + // 
DoubleDelta, LZ4, NONE, and any codec with params stay as-is + _ => trimmed, + } + }) + .collect::>() + .join(", ") +} + +/// Checks if two codec expressions are semantically equivalent after normalization. +/// +/// This handles cases where ClickHouse normalizes codecs by adding default parameters. +/// For example, "Delta, LZ4" from user code is equivalent to "Delta(4), LZ4" from ClickHouse. +pub fn codec_expressions_are_equivalent(before: &Option, after: &Option) -> bool { + match (before, after) { + (None, None) => true, + (Some(b), Some(a)) => normalize_codec_expression(b) == normalize_codec_expression(a), + _ => false, + } +} + pub fn normalize_ttl_expression(expr: &str) -> String { use regex::Regex; @@ -2640,6 +2757,8 @@ SETTINGS enable_mixed_granularity_parts = 1, index_granularity = 8192, index_gra annotations: vec![], comment: Some("Old user comment".to_string()), ttl: None, + codec: None, + materialized: None, }; let after_column = Column { @@ -2658,6 +2777,8 @@ SETTINGS enable_mixed_granularity_parts = 1, index_granularity = 8192, index_gra annotations: vec![], comment: Some("New user comment".to_string()), ttl: None, + codec: None, + materialized: None, }; // The execute_modify_table_column function should detect this as comment-only change @@ -2683,6 +2804,8 @@ SETTINGS enable_mixed_granularity_parts = 1, index_granularity = 8192, index_gra annotations: vec![], comment: Some("Number of things".to_string()), ttl: None, + codec: None, + materialized: None, }; let after_column = Column { default: Some("42".to_string()), @@ -2690,7 +2813,9 @@ SETTINGS enable_mixed_granularity_parts = 1, index_granularity = 8192, index_gra }; let ch_after = std_column_to_clickhouse_column(after_column).unwrap(); - let sqls = build_modify_column_sql("db", "table", &ch_after, false, false, None).unwrap(); + let sqls = + build_modify_column_sql("db", "table", &ch_after, false, false, false, false, None) + .unwrap(); assert_eq!(sqls.len(), 1); assert_eq!( @@ -2714,6 +2839,8 @@ 
SETTINGS enable_mixed_granularity_parts = 1, index_granularity = 8192, index_gra annotations: vec![], comment: Some("old".to_string()), ttl: None, + codec: None, + materialized: None, }; let after_column = Column { @@ -2746,13 +2873,23 @@ SETTINGS enable_mixed_granularity_parts = 1, index_granularity = 8192, index_gra annotations: vec![], comment: Some("Updated description field".to_string()), ttl: None, + codec: None, + materialized: None, }; let clickhouse_column = std_column_to_clickhouse_column(column).unwrap(); - let sqls = - build_modify_column_sql("test_db", "users", &clickhouse_column, false, false, None) - .unwrap(); + let sqls = build_modify_column_sql( + "test_db", + "users", + &clickhouse_column, + false, + false, + false, + false, + None, + ) + .unwrap(); assert_eq!(sqls.len(), 1); assert_eq!( @@ -2776,6 +2913,8 @@ SETTINGS enable_mixed_granularity_parts = 1, index_granularity = 8192, index_gra default: Some("xxHash64(_id)".to_string()), // SQL function - no quotes comment: Some("Hash of the ID".to_string()), ttl: None, + codec: None, + materialized: None, }; let sqls = build_modify_column_sql( @@ -2784,6 +2923,8 @@ SETTINGS enable_mixed_granularity_parts = 1, index_granularity = 8192, index_gra &sample_hash_col, false, false, + false, + false, None, ) .unwrap(); @@ -2805,11 +2946,21 @@ SETTINGS enable_mixed_granularity_parts = 1, index_granularity = 8192, index_gra default: Some("now()".to_string()), // SQL function - no quotes comment: None, ttl: None, + codec: None, + materialized: None, }; - let sqls = - build_modify_column_sql("test_db", "test_table", &created_at_col, false, false, None) - .unwrap(); + let sqls = build_modify_column_sql( + "test_db", + "test_table", + &created_at_col, + false, + false, + false, + false, + None, + ) + .unwrap(); assert_eq!(sqls.len(), 1); // The fix ensures now() is NOT quoted @@ -2828,11 +2979,21 @@ SETTINGS enable_mixed_granularity_parts = 1, index_granularity = 8192, index_gra default: 
Some("'active'".to_string()), // String literal - quotes preserved comment: None, ttl: None, + codec: None, + materialized: None, }; - let sqls = - build_modify_column_sql("test_db", "test_table", &status_col, false, false, None) - .unwrap(); + let sqls = build_modify_column_sql( + "test_db", + "test_table", + &status_col, + false, + false, + false, + false, + None, + ) + .unwrap(); assert_eq!(sqls.len(), 1); // String literals should preserve their quotes @@ -2842,6 +3003,79 @@ SETTINGS enable_mixed_granularity_parts = 1, index_granularity = 8192, index_gra ); } + #[test] + fn test_modify_column_with_materialized() { + use crate::infrastructure::olap::clickhouse::model::ClickHouseColumn; + + // Test changing a MATERIALIZED expression + let ch_col = ClickHouseColumn { + name: "event_date".to_string(), + column_type: ClickHouseColumnType::Date, + required: true, + primary_key: false, + unique: false, + default: None, + materialized: Some("toStartOfMonth(event_time)".to_string()), + comment: None, + ttl: None, + codec: None, + }; + + let sqls = build_modify_column_sql( + "test_db", + "test_table", + &ch_col, + false, // removing_default + false, // removing_materialized + false, // removing_ttl + false, // removing_codec + None, + ) + .unwrap(); + + assert_eq!(sqls.len(), 1); + assert_eq!( + sqls[0], + "ALTER TABLE `test_db`.`test_table` MODIFY COLUMN IF EXISTS `event_date` Date MATERIALIZED toStartOfMonth(event_time)" + ); + } + + #[test] + fn test_remove_materialized_sql_generation() { + use crate::infrastructure::olap::clickhouse::model::ClickHouseColumn; + + let ch_col = ClickHouseColumn { + name: "user_hash".to_string(), + column_type: ClickHouseColumnType::ClickhouseInt(ClickHouseInt::UInt64), + required: true, + primary_key: false, + unique: false, + default: None, + materialized: None, + comment: None, + ttl: None, + codec: None, + }; + + let sqls = build_modify_column_sql( + "test_db", + "test_table", + &ch_col, + false, + true, + false, + false, + None, + ) 
+ .unwrap(); + + assert!(!sqls.is_empty()); + assert_eq!( + sqls[0], + "ALTER TABLE `test_db`.`test_table` MODIFY COLUMN `user_hash` REMOVE MATERIALIZED" + ); + } + #[test] fn test_extract_order_by_from_create_query_nested_objects() { // Test with deeply nested structure @@ -2904,6 +3138,108 @@ SETTINGS enable_mixed_granularity_parts = 1, index_granularity = 8192, index_gra ); } + #[test] + fn test_normalize_codec_expression() { + // Test single codec without params - should add defaults + assert_eq!(normalize_codec_expression("Delta"), "Delta(4)"); + assert_eq!(normalize_codec_expression("Gorilla"), "Gorilla(8)"); + assert_eq!(normalize_codec_expression("ZSTD"), "ZSTD(1)"); + + // Test codecs with params - should stay as-is + assert_eq!(normalize_codec_expression("Delta(4)"), "Delta(4)"); + assert_eq!(normalize_codec_expression("Gorilla(8)"), "Gorilla(8)"); + assert_eq!(normalize_codec_expression("ZSTD(3)"), "ZSTD(3)"); + assert_eq!(normalize_codec_expression("ZSTD(9)"), "ZSTD(9)"); + + // Test codecs that don't have default params + assert_eq!(normalize_codec_expression("DoubleDelta"), "DoubleDelta"); + assert_eq!(normalize_codec_expression("LZ4"), "LZ4"); + assert_eq!(normalize_codec_expression("NONE"), "NONE"); + + // Test codec chains + assert_eq!(normalize_codec_expression("Delta, LZ4"), "Delta(4), LZ4"); + assert_eq!( + normalize_codec_expression("Gorilla, ZSTD"), + "Gorilla(8), ZSTD(1)" + ); + assert_eq!( + normalize_codec_expression("Delta, ZSTD(3)"), + "Delta(4), ZSTD(3)" + ); + assert_eq!( + normalize_codec_expression("DoubleDelta, LZ4"), + "DoubleDelta, LZ4" + ); + + // Test whitespace handling + assert_eq!(normalize_codec_expression("Delta,LZ4"), "Delta(4), LZ4"); + assert_eq!( + normalize_codec_expression(" Delta , LZ4 "), + "Delta(4), LZ4" + ); + + // Test already normalized expressions + assert_eq!(normalize_codec_expression("Delta(4), LZ4"), "Delta(4), LZ4"); + assert_eq!( + normalize_codec_expression("Gorilla(8), ZSTD(3)"), + "Gorilla(8), ZSTD(3)" 
+ ); + } + + #[test] + fn test_codec_expressions_are_equivalent() { + // Test None vs None + assert!(codec_expressions_are_equivalent(&None, &None)); + + // Test Some vs None + assert!(!codec_expressions_are_equivalent( + &Some("ZSTD(3)".to_string()), + &None + )); + + // Test same codec + assert!(codec_expressions_are_equivalent( + &Some("ZSTD(3)".to_string()), + &Some("ZSTD(3)".to_string()) + )); + + // Test normalization: user writes "Delta", ClickHouse returns "Delta(4)" + assert!(codec_expressions_are_equivalent( + &Some("Delta".to_string()), + &Some("Delta(4)".to_string()) + )); + + // Test normalization: user writes "Gorilla", ClickHouse returns "Gorilla(8)" + assert!(codec_expressions_are_equivalent( + &Some("Gorilla".to_string()), + &Some("Gorilla(8)".to_string()) + )); + + // Test normalization: user writes "ZSTD", ClickHouse returns "ZSTD(1)" + assert!(codec_expressions_are_equivalent( + &Some("ZSTD".to_string()), + &Some("ZSTD(1)".to_string()) + )); + + // Test chain normalization + assert!(codec_expressions_are_equivalent( + &Some("Delta, LZ4".to_string()), + &Some("Delta(4), LZ4".to_string()) + )); + + // Test different codecs + assert!(!codec_expressions_are_equivalent( + &Some("ZSTD(3)".to_string()), + &Some("ZSTD(9)".to_string()) + )); + + // Test different chains + assert!(!codec_expressions_are_equivalent( + &Some("Delta, LZ4".to_string()), + &Some("Delta, ZSTD".to_string()) + )); + } + #[test] fn test_normalize_ttl_expression() { // Test DAY conversion @@ -3045,6 +3381,8 @@ SETTINGS enable_mixed_granularity_parts = 1, index_granularity = 8192, index_gra annotations: vec![], comment: Some("Number of items".to_string()), ttl: None, + codec: None, + materialized: None, }; let clickhouse_column = std_column_to_clickhouse_column(column).unwrap(); @@ -3058,19 +3396,34 @@ SETTINGS enable_mixed_granularity_parts = 1, index_granularity = 8192, index_gra .map(|d| format!(" DEFAULT {}", d)) .unwrap_or_default(); + let ttl_clause = clickhouse_column + .ttl 
+ .as_ref() + .map(|t| format!(" TTL {}", t)) + .unwrap_or_default(); + + let codec_clause = clickhouse_column + .codec + .as_ref() + .map(|c| format!(" CODEC({})", c)) + .unwrap_or_default(); + let add_column_query = format!( - "ALTER TABLE `{}`.`{}` ADD COLUMN `{}` {}{} {}", + "ALTER TABLE `{}`.`{}`{} ADD COLUMN `{}` {}{}{}{} {}", "test_db", "test_table", + "", clickhouse_column.name, column_type_string, default_clause, + codec_clause, + ttl_clause, "FIRST" ); assert_eq!( add_column_query, - "ALTER TABLE `test_db`.`test_table` ADD COLUMN `count` Int32 DEFAULT 42 FIRST" + "ALTER TABLE `test_db`.`test_table` ADD COLUMN `count` Int32 DEFAULT 42 FIRST" ); } @@ -3091,6 +3444,8 @@ SETTINGS enable_mixed_granularity_parts = 1, index_granularity = 8192, index_gra annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; let clickhouse_column = std_column_to_clickhouse_column(column).unwrap(); @@ -3105,19 +3460,34 @@ SETTINGS enable_mixed_granularity_parts = 1, index_granularity = 8192, index_gra .map(|d| format!(" DEFAULT {}", d)) .unwrap_or_default(); + let ttl_clause = clickhouse_column + .ttl + .as_ref() + .map(|t| format!(" TTL {}", t)) + .unwrap_or_default(); + + let codec_clause = clickhouse_column + .codec + .as_ref() + .map(|c| format!(" CODEC({})", c)) + .unwrap_or_default(); + let add_column_query = format!( - "ALTER TABLE `{}`.`{}` ADD COLUMN `{}` {}{} {}", + "ALTER TABLE `{}`.`{}`{} ADD COLUMN `{}` {}{}{}{} {}", "test_db", "test_table", + "", clickhouse_column.name, column_type_string, default_clause, + codec_clause, + ttl_clause, "AFTER `id`" ); assert_eq!( add_column_query, - "ALTER TABLE `test_db`.`test_table` ADD COLUMN `description` Nullable(String) DEFAULT 'default text' AFTER `id`" + "ALTER TABLE `test_db`.`test_table` ADD COLUMN `description` Nullable(String) DEFAULT 'default text' AFTER `id`" ); } @@ -3140,6 +3510,8 @@ SETTINGS enable_mixed_granularity_parts = 1, index_granularity = 8192, index_gra annotations: vec![], 
comment: None, ttl: Some("created_at + INTERVAL 7 DAY".to_string()), + codec: None, + materialized: None, }], order_by: OrderBy::Fields(vec!["id".to_string()]), partition_by: Some("toYYYYMM(created_at)".to_string()), @@ -3206,6 +3578,8 @@ SETTINGS enable_mixed_granularity_parts = 1, index_granularity = 8192, index_gra annotations: vec![], comment: None, ttl: Some("created_at + INTERVAL 7 DAY".to_string()), + codec: None, + materialized: None, }], order_by: OrderBy::Fields(vec!["id".to_string()]), partition_by: Some("toYYYYMM(created_at)".to_string()), @@ -3350,4 +3724,39 @@ SETTINGS enable_mixed_granularity_parts = 1, index_granularity = 8192, index_gra _ => panic!("Expected Table signature"), } } + + #[test] + fn test_codec_wrapper_stripping() { + let test_cases = vec![ + ("CODEC(ZSTD(3))", "ZSTD(3)"), + ("CODEC(Delta, LZ4)", "Delta, LZ4"), + ("CODEC(Gorilla, ZSTD(3))", "Gorilla, ZSTD(3)"), + ("CODEC(DoubleDelta)", "DoubleDelta"), + ("", ""), + ]; + + for (input, expected) in test_cases { + let result = if !input.is_empty() { + let trimmed = input.trim(); + if trimmed.starts_with("CODEC(") && trimmed.ends_with(')') { + Some(trimmed[6..trimmed.len() - 1].to_string()) + } else { + Some(input.to_string()) + } + } else { + None + }; + + if expected.is_empty() { + assert_eq!(result, None, "Failed for input: {}", input); + } else { + assert_eq!( + result, + Some(expected.to_string()), + "Failed for input: {}", + input + ); + } + } + } } diff --git a/apps/framework-cli/src/infrastructure/olap/clickhouse/model.rs b/apps/framework-cli/src/infrastructure/olap/clickhouse/model.rs index fbc134ea65..5f09fa7be5 100644 --- a/apps/framework-cli/src/infrastructure/olap/clickhouse/model.rs +++ b/apps/framework-cli/src/infrastructure/olap/clickhouse/model.rs @@ -432,6 +432,8 @@ pub struct ClickHouseColumn { pub default: Option, pub comment: Option, // Column comment for metadata storage pub ttl: Option, + pub codec: Option, // Compression codec expression (e.g., "ZSTD(3)", "Delta, 
LZ4") + pub materialized: Option, // MATERIALIZED column expression (computed at write-time, physically stored) } impl ClickHouseColumn { diff --git a/apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs b/apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs index 154fc53b57..6ec816134a 100644 --- a/apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs +++ b/apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs @@ -124,7 +124,7 @@ static CREATE_TABLE_TEMPLATE: &str = r#" CREATE TABLE IF NOT EXISTS `{{db_name}}`.`{{table_name}}`{{#if cluster_name}} ON CLUSTER {{cluster_name}}{{/if}} ( -{{#each fields}} `{{field_name}}` {{{field_type}}} {{field_nullable}}{{#if field_default}} DEFAULT {{{field_default}}}{{/if}}{{#if field_comment}} COMMENT '{{{field_comment}}}'{{/if}}{{#if field_ttl}} TTL {{{field_ttl}}}{{/if}}{{#unless @last}}, +{{#each fields}} `{{field_name}}` {{{field_type}}} {{field_nullable}}{{#if field_default}} DEFAULT {{{field_default}}}{{/if}}{{#if field_materialized}} MATERIALIZED {{{field_materialized}}}{{/if}}{{#if field_codec}} CODEC({{{field_codec}}}){{/if}}{{#if field_ttl}} TTL {{{field_ttl}}}{{/if}}{{#if field_comment}} COMMENT '{{{field_comment}}}'{{/if}}{{#unless @last}}, {{/unless}}{{/each}}{{#if has_indexes}}, {{#each indexes}}{{this}}{{#unless @last}}, {{/unless}}{{/each}}{{/if}} ) ENGINE = {{engine}}{{#if primary_key_string}} @@ -3054,6 +3054,7 @@ fn builds_field_context(columns: &[ClickHouseColumn]) -> Result, Clic let escaped_comment = column.comment.as_ref().map(|c| c.replace('\'', "''")); let field_ttl = column.ttl.as_ref(); + let field_codec = column.codec.as_ref(); // Default values from ClickHouse/Python are already properly formatted // - String literals come with quotes: 'active' @@ -3061,12 +3062,15 @@ fn builds_field_context(columns: &[ClickHouseColumn]) -> Result, Clic // - Numbers come without quotes: 42 // So we use them as-is without additional formatting let formatted_default = 
column.default.as_ref(); + let formatted_materialized = column.materialized.as_ref(); Ok(json!({ "field_name": column.name, "field_type": field_type, "field_ttl": field_ttl, + "field_codec": field_codec, "field_default": formatted_default, + "field_materialized": formatted_materialized, "field_nullable": if let ClickHouseColumnType::Nullable(_) = column.column_type { // if type is Nullable, do not add extra specifier "".to_string() @@ -3103,6 +3107,8 @@ mod tests { default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ClickHouseColumn { name: "nested_field_2".to_string(), @@ -3113,6 +3119,8 @@ mod tests { default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ClickHouseColumn { name: "nested_field_3".to_string(), @@ -3123,6 +3131,8 @@ mod tests { default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ClickHouseColumn { name: "nested_field_4".to_string(), @@ -3133,6 +3143,8 @@ mod tests { default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ClickHouseColumn { name: "nested_field_5".to_string(), @@ -3143,6 +3155,8 @@ mod tests { default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ClickHouseColumn { name: "nested_field_6".to_string(), @@ -3165,6 +3179,8 @@ mod tests { default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ClickHouseColumn { name: "nested_field_7".to_string(), @@ -3175,6 +3191,8 @@ mod tests { default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ]); @@ -3259,6 +3277,8 @@ mod tests { default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ClickHouseColumn { name: "name".to_string(), @@ -3269,6 +3289,8 @@ mod tests { default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec![]), @@ -3308,6 +3330,8 @@ PRIMARY KEY (`id`) default: Some("'abc'".to_string()), comment: None, ttl: None, + codec: 
None, + materialized: None, }], order_by: OrderBy::Fields(vec![]), partition_by: None, @@ -3345,6 +3369,8 @@ ENGINE = MergeTree default: Some("42".to_string()), comment: None, ttl: None, + codec: None, + materialized: None, }], order_by: OrderBy::Fields(vec![]), partition_by: None, @@ -3384,6 +3410,8 @@ ENGINE = MergeTree default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ClickHouseColumn { name: "sample_hash".to_string(), @@ -3394,6 +3422,8 @@ ENGINE = MergeTree default: Some("xxHash64(_id)".to_string()), // SQL function - no quotes comment: None, ttl: None, + codec: None, + materialized: None, }, ClickHouseColumn { name: "created_at".to_string(), @@ -3404,6 +3434,8 @@ ENGINE = MergeTree default: Some("now()".to_string()), // SQL function - no quotes comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec![]), @@ -3443,6 +3475,8 @@ ENGINE = MergeTree default: None, comment: None, ttl: None, + codec: None, + materialized: None, }], order_by: OrderBy::Fields(vec!["id".to_string()]), partition_by: None, @@ -3483,6 +3517,8 @@ ORDER BY (`id`) "#; default: None, comment: None, ttl: None, + codec: None, + materialized: None, }], engine: ClickhouseEngine::ReplacingMergeTree { ver: None, @@ -3519,6 +3555,8 @@ ORDER BY (`id`) "#; default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ClickHouseColumn { name: "version".to_string(), @@ -3529,6 +3567,8 @@ ORDER BY (`id`) "#; default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec!["id".to_string()]), @@ -3572,6 +3612,8 @@ ORDER BY (`id`) "#; default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ClickHouseColumn { name: "version".to_string(), @@ -3582,6 +3624,8 @@ ORDER BY (`id`) "#; default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ClickHouseColumn { name: "is_deleted".to_string(), @@ -3592,6 +3636,8 @@ ORDER BY 
(`id`) "#; default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec!["id".to_string()]), @@ -3635,6 +3681,8 @@ ORDER BY (`id`) "#; default: None, comment: None, ttl: None, + codec: None, + materialized: None, }], sample_by: None, order_by: OrderBy::Fields(vec!["id".to_string()]), @@ -3740,6 +3788,8 @@ ORDER BY (`id`) "#; default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ClickHouseColumn { name: "nested_data".to_string(), @@ -3753,6 +3803,8 @@ ORDER BY (`id`) "#; default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ClickHouseColumn { name: "field2".to_string(), @@ -3763,6 +3815,8 @@ ORDER BY (`id`) "#; default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ]), required: true, @@ -3771,6 +3825,8 @@ ORDER BY (`id`) "#; default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ClickHouseColumn { name: "status".to_string(), @@ -3793,6 +3849,8 @@ ORDER BY (`id`) "#; default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ], sample_by: None, @@ -3842,6 +3900,8 @@ ORDER BY (`id`) "#; default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ClickHouseColumn { name: "data".to_string(), @@ -3852,6 +3912,8 @@ ORDER BY (`id`) "#; default: None, comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec![]), @@ -4326,6 +4388,8 @@ SETTINGS keeper_path = '/clickhouse/s3queue/test_table', mode = 'unordered', s3q default: None, comment: None, ttl: None, + codec: None, + materialized: None, }], order_by: OrderBy::Fields(vec![]), partition_by: None, @@ -4873,6 +4937,8 @@ ENGINE = S3Queue('s3://my-bucket/data/*.csv', NOSIGN, 'CSV')"#; default: None, comment: None, ttl: None, + codec: None, + materialized: None, }], order_by: OrderBy::Fields(vec![]), partition_by: None, @@ -4920,6 +4986,8 @@ ENGINE = S3Queue('s3://my-bucket/data/*.csv', 
NOSIGN, 'CSV')"#; default: None, comment: None, ttl: None, + codec: None, + materialized: None, }], order_by: OrderBy::Fields(vec![]), partition_by: None, @@ -5017,6 +5085,8 @@ ENGINE = S3Queue('s3://my-bucket/data/*.csv', NOSIGN, 'CSV')"#; default: None, comment: None, ttl: None, + codec: None, + materialized: None, }; let cluster_clause = Some("test_cluster") @@ -5880,4 +5950,232 @@ ENGINE = S3Queue('s3://my-bucket/data/*.csv', NOSIGN, 'CSV')"#; let result2 = ClickhouseEngine::try_from(another_invalid); assert!(result2.is_err(), "Should reject invalid format 'BadFormat'"); } + + #[test] + fn test_create_table_with_codec() { + let columns = vec![ + ClickHouseColumn { + name: "id".to_string(), + column_type: ClickHouseColumnType::String, + required: true, + unique: false, + primary_key: true, + default: None, + comment: None, + ttl: None, + codec: None, + materialized: None, + }, + ClickHouseColumn { + name: "log_blob".to_string(), + column_type: ClickHouseColumnType::Json(Default::default()), + required: true, + unique: false, + primary_key: false, + default: None, + comment: None, + ttl: None, + codec: Some("ZSTD(3)".to_string()), + materialized: None, + }, + ClickHouseColumn { + name: "timestamp".to_string(), + column_type: ClickHouseColumnType::DateTime64 { precision: 3 }, + required: true, + unique: false, + primary_key: false, + default: None, + comment: None, + ttl: None, + codec: Some("Delta, LZ4".to_string()), + materialized: None, + }, + ClickHouseColumn { + name: "tags".to_string(), + column_type: ClickHouseColumnType::Array(Box::new(ClickHouseColumnType::String)), + required: true, + unique: false, + primary_key: false, + default: None, + comment: None, + ttl: None, + codec: Some("ZSTD(1)".to_string()), + materialized: None, + }, + ]; + + let table = ClickHouseTable { + name: "test_table".to_string(), + version: None, + columns, + order_by: OrderBy::Fields(vec!["id".to_string()]), + engine: ClickhouseEngine::MergeTree, + table_ttl_setting: None, + 
partition_by: None, + sample_by: None, + table_settings: None, + indexes: vec![], + cluster_name: None, + }; + + let query = create_table_query("test_db", table, false).unwrap(); + let expected = r#" +CREATE TABLE IF NOT EXISTS `test_db`.`test_table` +( + `id` String NOT NULL, + `log_blob` JSON NOT NULL CODEC(ZSTD(3)), + `timestamp` DateTime64(3) NOT NULL CODEC(Delta, LZ4), + `tags` Array(String) NOT NULL CODEC(ZSTD(1)) +) +ENGINE = MergeTree +PRIMARY KEY (`id`) +ORDER BY (`id`) +"#; + assert_eq!(query.trim(), expected.trim()); + } + + #[test] + fn test_create_table_with_materialized_column() { + use crate::framework::versions::Version; + + let columns = vec![ + ClickHouseColumn { + name: "event_time".to_string(), + column_type: ClickHouseColumnType::DateTime64 { precision: 3 }, + required: true, + primary_key: false, + unique: false, + default: None, + materialized: None, + comment: None, + ttl: None, + codec: None, + }, + ClickHouseColumn { + name: "event_date".to_string(), + column_type: ClickHouseColumnType::Date, + required: true, + primary_key: false, + unique: false, + default: None, + materialized: Some("toDate(event_time)".to_string()), + comment: None, + ttl: None, + codec: None, + }, + ]; + + let table = ClickHouseTable { + version: Some(Version::from_string("1".to_string())), + name: "test_table".to_string(), + columns, + order_by: OrderBy::Fields(vec!["event_time".to_string()]), + partition_by: None, + sample_by: None, + engine: ClickhouseEngine::MergeTree, + table_settings: None, + indexes: vec![], + table_ttl_setting: None, + cluster_name: None, + }; + + let query = create_table_query("test_db", table, false).unwrap(); + let expected = r#" +CREATE TABLE IF NOT EXISTS `test_db`.`test_table` +( + `event_time` DateTime64(3) NOT NULL, + `event_date` Date NOT NULL MATERIALIZED toDate(event_time) +) +ENGINE = MergeTree +ORDER BY (`event_time`) +"#; + assert_eq!(query.trim(), expected.trim()); + } + + #[test] + fn test_materialized_column_with_codec() { + 
use crate::framework::core::infrastructure::table::JsonOptions; + use crate::framework::versions::Version; + + // Test customer's use case: MATERIALIZED column with CODEC + let columns = vec![ + ClickHouseColumn { + name: "log_blob".to_string(), + column_type: ClickHouseColumnType::Json(JsonOptions::default()), + required: true, + primary_key: false, + unique: false, + default: None, + materialized: None, + comment: None, + ttl: None, + codec: Some("ZSTD(3)".to_string()), + }, + ClickHouseColumn { + name: "combination_hash".to_string(), + column_type: ClickHouseColumnType::Array(Box::new( + ClickHouseColumnType::ClickhouseInt(ClickHouseInt::UInt64), + )), + required: true, + primary_key: false, + unique: false, + default: None, + materialized: Some( + "arrayMap(kv -> cityHash64(kv.1, kv.2), JSONExtractKeysAndValuesRaw(toString(log_blob)))".to_string(), + ), + comment: None, + ttl: None, + codec: Some("ZSTD(1)".to_string()), + }, + ]; + + let table = ClickHouseTable { + version: Some(Version::from_string("1".to_string())), + name: "logs".to_string(), + columns, + order_by: OrderBy::SingleExpr("tuple()".to_string()), + partition_by: None, + sample_by: None, + engine: ClickhouseEngine::MergeTree, + table_settings: None, + indexes: vec![], + table_ttl_setting: None, + cluster_name: None, + }; + + let query = create_table_query("test_db", table, false).unwrap(); + + // Verify the query contains the MATERIALIZED clause and CODEC + assert!(query.contains("MATERIALIZED arrayMap")); + assert!(query.contains("CODEC(ZSTD(1))")); + assert!(query.contains("CODEC(ZSTD(3))")); + } + + #[test] + fn test_validation_default_and_materialized_mutually_exclusive() { + use crate::framework::core::infrastructure::table::{Column, ColumnType, IntType}; + use crate::infrastructure::olap::clickhouse::mapper::std_column_to_clickhouse_column; + + let column = Column { + name: "bad_column".to_string(), + data_type: ColumnType::Int(IntType::Int32), + required: true, + unique: false, + 
primary_key: false, + default: Some("42".to_string()), + materialized: Some("id + 1".to_string()), // Invalid: both default and materialized + annotations: vec![], + comment: None, + ttl: None, + codec: None, + }; + + let result = std_column_to_clickhouse_column(column); + assert!(result.is_err()); + let error_msg = result.unwrap_err().to_string(); + assert!( + error_msg.contains("both DEFAULT and MATERIALIZED") + || error_msg.contains("mutually exclusive") + ); + } } diff --git a/apps/framework-cli/src/infrastructure/olap/clickhouse/type_parser.rs b/apps/framework-cli/src/infrastructure/olap/clickhouse/type_parser.rs index fc2221a218..cdb26dd80b 100644 --- a/apps/framework-cli/src/infrastructure/olap/clickhouse/type_parser.rs +++ b/apps/framework-cli/src/infrastructure/olap/clickhouse/type_parser.rs @@ -1678,6 +1678,8 @@ pub fn convert_ast_to_column_type( // system.columns queries, not from type string parsing. comment: None, ttl: None, + codec: None, + materialized: None, }); } TupleElement::Unnamed(_) => { diff --git a/apps/framework-cli/src/infrastructure/olap/ddl_ordering.rs b/apps/framework-cli/src/infrastructure/olap/ddl_ordering.rs index ca9589426b..1987176a42 100644 --- a/apps/framework-cli/src/infrastructure/olap/ddl_ordering.rs +++ b/apps/framework-cli/src/infrastructure/olap/ddl_ordering.rs @@ -1351,6 +1351,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, after_column: None, dependency_info: DependencyInfo { @@ -1673,6 +1675,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; // Create operations with correct dependencies @@ -2727,6 +2731,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }; // Create operations with signatures that work with the current implementation @@ -2914,6 +2920,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { 
name: "old_column".to_string(), @@ -2925,6 +2933,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec!["id".to_string()]), @@ -2959,6 +2969,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "new_column".to_string(), @@ -2970,6 +2982,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], order_by: OrderBy::Fields(vec!["id".to_string()]), @@ -3003,6 +3017,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }), ColumnChange::Added { column: Column { @@ -3015,6 +3031,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, position_after: Some("id".to_string()), }, diff --git a/apps/framework-cli/src/infrastructure/processes/kafka_clickhouse_sync.rs b/apps/framework-cli/src/infrastructure/processes/kafka_clickhouse_sync.rs index bd7ccf8cca..bcdcc11dfc 100644 --- a/apps/framework-cli/src/infrastructure/processes/kafka_clickhouse_sync.rs +++ b/apps/framework-cli/src/infrastructure/processes/kafka_clickhouse_sync.rs @@ -1214,6 +1214,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "B".to_string(), @@ -1225,6 +1227,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "C".to_string(), @@ -1242,6 +1246,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "b".to_string(), @@ -1259,6 +1265,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "e".to_string(), @@ -1270,6 +1278,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "f".to_string(), @@ -1281,6 
+1291,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], }), @@ -1291,6 +1303,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "c".to_string(), @@ -1302,6 +1316,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], }), @@ -1312,6 +1328,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "D".to_string(), @@ -1323,6 +1341,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ], }; diff --git a/apps/framework-cli/src/utilities/validate_passthrough.rs b/apps/framework-cli/src/utilities/validate_passthrough.rs index b938189eb5..9bdcf2b499 100644 --- a/apps/framework-cli/src/utilities/validate_passthrough.rs +++ b/apps/framework-cli/src/utilities/validate_passthrough.rs @@ -648,6 +648,8 @@ impl<'de, S: SerializeValue> Visitor<'de> for &mut ValueVisitor<'_, S> { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, } }) .collect(); @@ -1316,6 +1318,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "int_col".to_string(), @@ -1327,6 +1331,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "float_col".to_string(), @@ -1338,6 +1344,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "bool_col".to_string(), @@ -1349,6 +1357,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "date_col".to_string(), @@ -1360,6 +1370,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ]; @@ -1394,6 +1406,8 @@ mod tests { annotations: vec![], comment: None, 
ttl: None, + codec: None, + materialized: None, }]; let json = r#" @@ -1428,6 +1442,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }]; let json = r#" @@ -1469,6 +1485,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }]; // Test valid enum value @@ -1518,6 +1536,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "nested_int".to_string(), @@ -1529,6 +1549,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ]; @@ -1543,6 +1565,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "nested_object".to_string(), @@ -1558,6 +1582,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ]; @@ -1616,6 +1642,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "optional_field".to_string(), @@ -1627,6 +1655,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ]; @@ -1658,6 +1688,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "aud".to_string(), @@ -1669,6 +1701,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "exp".to_string(), @@ -1680,6 +1714,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ]; @@ -1694,6 +1730,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, Column { name: "jwt_object".to_string(), @@ -1709,6 +1747,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }, ]; @@ -1754,6 +1794,8 @@ mod tests { annotations: vec![], comment: 
None, ttl: None, + codec: None, + materialized: None, }]; // Test valid map @@ -1811,6 +1853,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }]; // Test valid map with numeric keys (as strings in JSON) @@ -1865,6 +1909,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }]; // Min boundary 0 @@ -1908,6 +1954,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }]; // Min boundary -32768 @@ -1951,6 +1999,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }]; let positive_limit: BigInt = BigInt::from(1u8) << 127usize; @@ -1996,6 +2046,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }]; let positive_limit: BigInt = BigInt::from(1u8) << 255usize; @@ -2041,6 +2093,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }]; let limit: BigUint = BigUint::from(1u8) << 256usize; @@ -2087,6 +2141,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }]; // Valid keys @@ -2127,6 +2183,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }]; let positive_limit: BigInt = BigInt::from(1u8) << 255usize; @@ -2167,6 +2225,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }]; let limit: BigUint = BigUint::from(1u8) << 256usize; @@ -2211,6 +2271,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }]; let json = r#" @@ -2242,6 +2304,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }]; // missing nested path @@ -2274,6 +2338,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }]; // null at the nested 
path counts as missing for non-nullable types @@ -2321,6 +2387,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }]; // Test 1: Two's complement value (what -1 becomes with naive cast) should be rejected @@ -2390,6 +2458,8 @@ mod tests { annotations: vec![], comment: None, ttl: None, + codec: None, + materialized: None, }]; // Test negative values work with i64 diff --git a/apps/framework-docs/llm-docs/python/table-setup.md b/apps/framework-docs/llm-docs/python/table-setup.md index 9bbb2b1e1e..65da3efc7f 100644 --- a/apps/framework-docs/llm-docs/python/table-setup.md +++ b/apps/framework-docs/llm-docs/python/table-setup.md @@ -914,4 +914,65 @@ class DistributedEngine: 3. **Load distribution**: Balance write and read load across multiple servers 4. **Geographic distribution**: Place data closer to users in different regions -For more details, see the [ClickHouse Distributed documentation](https://clickhouse.com/docs/en/engines/table-engines/special/distributed). \ No newline at end of file +For more details, see the [ClickHouse Distributed documentation](https://clickhouse.com/docs/en/engines/table-engines/special/distributed). 
+
+## Compression Codecs
+
+Specify per-column compression codecs to optimize storage and performance:
+
+```python
+from typing import Annotated, Any
+from moose_lib import OlapTable, OlapConfig, ClickHouseCodec, UInt64
+from pydantic import BaseModel
+from datetime import datetime
+
+class Metrics(BaseModel):
+    # Delta for timestamps and monotonically increasing values
+    timestamp: Annotated[datetime, ClickHouseCodec("Delta, LZ4")]
+
+    # Gorilla for floating point sensor data
+    temperature: Annotated[float, ClickHouseCodec("Gorilla, ZSTD(3)")]
+
+    # DoubleDelta for counters and metrics
+    request_count: Annotated[float, ClickHouseCodec("DoubleDelta, LZ4")]
+
+    # ZSTD for text/JSON with compression level (1-22)
+    log_data: Annotated[Any, ClickHouseCodec("ZSTD(9)")]
+    user_agent: Annotated[str, ClickHouseCodec("ZSTD(3)")]
+
+    # Compress array elements
+    tags: Annotated[list[str], ClickHouseCodec("LZ4")]
+    event_ids: Annotated[list[UInt64], ClickHouseCodec("ZSTD(1)")]
+
+    # No codec (uses ClickHouse default)
+    status_code: int
+
+metrics_table = OlapTable[Metrics](
+    "Metrics",
+    OlapConfig(order_by_fields=["timestamp"])
+)
+```
+
+### Common Codecs
+- **Delta/DoubleDelta**: For timestamps, counters, monotonic values
+- **Gorilla**: For floating-point sensor data, temperatures, stock prices
+- **ZSTD**: General-purpose with levels 1-22 (higher = better compression, slower)
+- **LZ4**: Fast decompression, lower CPU usage
+
+### Codec Chains
+Combine codecs (processed left-to-right): `"Delta, LZ4"` or `"Gorilla, ZSTD(3)"`
+
+### Combining with Other Annotations
+```python
+from moose_lib import clickhouse_default, ClickHouseCodec, ClickHouseTTL, UInt64
+
+class Events(BaseModel):
+    # Codec + Default value
+    status: Annotated[str, clickhouse_default("'pending'"), ClickHouseCodec("ZSTD(3)")]
+
+    # Codec + TTL
+    email: Annotated[str, ClickHouseTTL("timestamp + INTERVAL 30 DAY"), ClickHouseCodec("ZSTD(3)")]
+
+    # Codec + Numeric type
+    event_count: Annotated[UInt64, 
ClickHouseCodec("DoubleDelta, LZ4")]
+```
diff --git a/apps/framework-docs/llm-docs/typescript/table-setup.md b/apps/framework-docs/llm-docs/typescript/table-setup.md
index 59714c4a45..4e93d9e020 100644
--- a/apps/framework-docs/llm-docs/typescript/table-setup.md
+++ b/apps/framework-docs/llm-docs/typescript/table-setup.md
@@ -723,4 +723,60 @@ You can verify your tables were created correctly using:
 moose ls
 ```
 
-Or by connecting directly to your local ClickHouse instance and running SQL commands.
\ No newline at end of file
+Or by connecting directly to your local ClickHouse instance and running SQL commands.
+
+## Compression Codecs
+
+Specify per-column compression codecs to optimize storage and performance:
+
+```typescript
+import { ClickHouseCodec, DateTime, UInt64 } from '@514labs/moose-lib';
+
+interface Metrics {
+  // Delta for timestamps and monotonically increasing values
+  timestamp: DateTime & ClickHouseCodec<"Delta, LZ4">;
+
+  // Gorilla for floating point sensor data
+  temperature: number & ClickHouseCodec<"Gorilla, ZSTD(3)">;
+
+  // DoubleDelta for counters and metrics
+  request_count: number & ClickHouseCodec<"DoubleDelta, LZ4">;
+
+  // ZSTD for text/JSON with compression level (1-22)
+  log_data: Record<string, any> & ClickHouseCodec<"ZSTD(9)">;
+  user_agent: string & ClickHouseCodec<"ZSTD(3)">;
+
+  // Compress array elements
+  tags: string[] & ClickHouseCodec<"LZ4">;
+  event_ids: UInt64[] & ClickHouseCodec<"ZSTD(1)">;
+}
+
+export const MetricsTable = new OlapTable<Metrics>("Metrics", {
+  orderByFields: ["timestamp"]
+});
+```
+
+### Common Codecs
+- **Delta/DoubleDelta**: For timestamps, counters, monotonic values
+- **Gorilla**: For floating-point sensor data, temperatures, stock prices
+- **ZSTD**: General-purpose with levels 1-22 (higher = better compression, slower)
+- **LZ4**: Fast decompression, lower CPU usage
+
+### Codec Chains
+Combine codecs (processed left-to-right): `Delta, LZ4` or `Gorilla, ZSTD(3)`
+
+### Combining with Other Annotations
+```typescript
+import { 
ClickHouseDefault, ClickHouseTTL, ClickHouseCodec, UInt64 } from "@514labs/moose-lib"; + +interface Events { + // Codec + Default value + status: string & ClickHouseDefault<"'pending'"> & ClickHouseCodec<"ZSTD(3)">; + + // Codec + TTL + email: string & ClickHouseTTL<"timestamp + INTERVAL 30 DAY"> & ClickHouseCodec<"ZSTD(3)">; + + // Codec + Numeric type + event_count: UInt64 & ClickHouseCodec<"DoubleDelta, LZ4">; +} +``` diff --git a/apps/framework-docs/src/pages/moose/olap/_meta.tsx b/apps/framework-docs/src/pages/moose/olap/_meta.tsx index 4dbc6563d8..1d3db08d43 100644 --- a/apps/framework-docs/src/pages/moose/olap/_meta.tsx +++ b/apps/framework-docs/src/pages/moose/olap/_meta.tsx @@ -20,6 +20,9 @@ const rawMeta = { ttl: { title: "TTL (Time-to-Live)", }, + compression: { + title: "Compression Codecs", + }, "schema-optimization": { title: "Schema Optimization", }, diff --git a/apps/framework-docs/src/pages/moose/olap/compression.mdx b/apps/framework-docs/src/pages/moose/olap/compression.mdx new file mode 100644 index 0000000000..08dd9bd197 --- /dev/null +++ b/apps/framework-docs/src/pages/moose/olap/compression.mdx @@ -0,0 +1,167 @@ +import { TypeScript, Python, LanguageSwitcher, Callout } from "@/components"; + + + +# Compression Codecs + +Moose lets you specify ClickHouse compression codecs per-column to optimize storage and query performance. Different codecs work better for different data types, and you can chain multiple codecs together.
+ +## When to use compression codecs + +- **Time series data**: Use `Delta` or `DoubleDelta` for timestamps and monotonically increasing values +- **Floating point metrics**: Use `Gorilla` codec for sensor data, temperatures, and other float values +- **Text and JSON**: Use `ZSTD` with compression levels (1-22) for large strings and JSON +- **High cardinality data**: Combine specialized codecs with general-purpose compression (e.g., `Delta, LZ4`) + +## Basic Usage + + +```typescript +import { OlapTable, Key, DateTime, ClickHouseCodec, UInt64 } from "@514labs/moose-lib"; + +interface Metrics { + id: Key<string>; + // Delta codec for timestamps (monotonically increasing) + timestamp: DateTime & ClickHouseCodec<"Delta, LZ4">; + + // Gorilla codec for floating point sensor data + temperature: number & ClickHouseCodec<"Gorilla, ZSTD(3)">; + + // DoubleDelta for counters and metrics + request_count: number & ClickHouseCodec<"DoubleDelta, LZ4">; + + // ZSTD for text/JSON with compression level + log_data: Record<string, any> & ClickHouseCodec<"ZSTD(3)">; + user_agent: string & ClickHouseCodec<"ZSTD(3)">; + + // Compress array elements + tags: string[] & ClickHouseCodec<"LZ4">; + event_ids: UInt64[] & ClickHouseCodec<"ZSTD(1)">; +} + +export const MetricsTable = new OlapTable<Metrics>("Metrics", { + orderByFields: ["id", "timestamp"] +}); +``` + + + +```python +from typing import Annotated, Any +from moose_lib import OlapTable, OlapConfig, Key, ClickHouseCodec, UInt64 +from pydantic import BaseModel +from datetime import datetime + +class Metrics(BaseModel): + id: Key[str] + # Delta codec for timestamps (monotonically increasing) + timestamp: Annotated[datetime, ClickHouseCodec("Delta, LZ4")] + + # Gorilla codec for floating point sensor data + temperature: Annotated[float, ClickHouseCodec("Gorilla, ZSTD(3)")] + + # DoubleDelta for counters and metrics + request_count: Annotated[float, ClickHouseCodec("DoubleDelta, LZ4")] + + # ZSTD for text/JSON with compression level + log_data: Annotated[Any,
ClickHouseCodec("ZSTD(3)")] + user_agent: Annotated[str, ClickHouseCodec("ZSTD(3)")] + + # Compress array elements + tags: Annotated[list[str], ClickHouseCodec("LZ4")] + event_ids: Annotated[list[UInt64], ClickHouseCodec("ZSTD(1)")] + +metrics_table = OlapTable[Metrics]( + "Metrics", + OlapConfig(order_by_fields=["id", "timestamp"]) +) +``` + + +## Codec Chains + +You can chain multiple codecs together. Data is processed by each codec in sequence (left-to-right). + + +```typescript +interface Events { + // Delta compress timestamps, then apply LZ4 + timestamp: DateTime & ClickHouseCodec<"Delta, LZ4">; + + // Gorilla for floats, then ZSTD for extra compression + value: number & ClickHouseCodec<"Gorilla, ZSTD(3)">; +} +``` + + + +```python +class Events(BaseModel): + # Delta compress timestamps, then apply LZ4 + timestamp: Annotated[datetime, ClickHouseCodec("Delta, LZ4")] + + # Gorilla for floats, then ZSTD for extra compression + value: Annotated[float, ClickHouseCodec("Gorilla, ZSTD(3)")] +``` + + +## Combining with Other Annotations + +Codecs work alongside other ClickHouse annotations: + + +```typescript +import { ClickHouseDefault, ClickHouseTTL } from "@514labs/moose-lib"; + +interface UserEvents { + id: Key; + timestamp: DateTime & ClickHouseCodec<"Delta, LZ4">; + + // Codec + Default value + status: string & ClickHouseDefault<"'pending'"> & ClickHouseCodec<"ZSTD(3)">; + + // Codec + TTL + email: string & ClickHouseTTL<"timestamp + INTERVAL 30 DAY"> & ClickHouseCodec<"ZSTD(3)">; + + // Codec + Numeric type + event_count: UInt64 & ClickHouseCodec<"DoubleDelta, LZ4">; +} +``` + + + +```python +from moose_lib import clickhouse_default, ClickHouseTTL, ClickHouseCodec + +class UserEvents(BaseModel): + id: Key[str] + timestamp: Annotated[datetime, ClickHouseCodec("Delta, LZ4")] + + # Codec + Default value + status: Annotated[str, clickhouse_default("'pending'"), ClickHouseCodec("ZSTD(3)")] + + # Codec + TTL + email: Annotated[str, ClickHouseTTL("timestamp + 
INTERVAL 30 DAY"), ClickHouseCodec("ZSTD(3)")] + + # Codec + Numeric type + event_count: Annotated[UInt64, ClickHouseCodec("DoubleDelta, LZ4")] +``` + + +## Syncing from Remote Tables + +When using `moose init --from-remote` to introspect existing ClickHouse tables, Moose automatically captures codec definitions and generates the appropriate annotations in your data models. + +## Notes + +- Codec expressions must be valid ClickHouse codec syntax (without the `CODEC()` wrapper) +- ClickHouse may normalize codecs by adding default parameters (e.g., `Delta` becomes `Delta(4)`) +- Moose applies codec changes via migrations using `ALTER TABLE ... MODIFY COLUMN` +- Not all codecs work with all data types - ClickHouse will validate during table creation + +## Related + +- See [Supported Types](/moose/olap/supported-types) for all available column types +- See [Schema Optimization](/moose/olap/schema-optimization) for other performance techniques +- See [Applying Migrations](/moose/olap/apply-migrations) to roll out codec changes +- See [ClickHouse Compression Codecs](https://clickhouse.com/docs/en/sql-reference/statements/create/table#column_compression_codec) for detailed codec documentation diff --git a/apps/framework-docs/src/pages/moose/olap/schema-optimization.mdx b/apps/framework-docs/src/pages/moose/olap/schema-optimization.mdx index 69bf4bb0d5..2cc959490a 100644 --- a/apps/framework-docs/src/pages/moose/olap/schema-optimization.mdx +++ b/apps/framework-docs/src/pages/moose/olap/schema-optimization.mdx @@ -472,3 +472,41 @@ bad_user_events_table = OlapTable[UserEvent]("user_events", { }) ``` + +## Compression Codecs + +ClickHouse supports per-column compression codecs to optimize storage and query performance. Different codecs work better for different data types. 
+ +**Quick examples:** +- Time series: Use `Delta` or `DoubleDelta` for timestamps and counters +- Floating point: Use `Gorilla` codec for sensor data and metrics +- Text/JSON: Use `ZSTD` with compression levels for large strings +- Combine codecs: Chain specialized compression with general-purpose (e.g., `Delta, LZ4`) + + +```typescript +import { ClickHouseCodec, DateTime, UInt64 } from "@514labs/moose-lib"; + +interface Metrics { + timestamp: DateTime & ClickHouseCodec<"Delta, LZ4">; // Timestamps + temperature: number & ClickHouseCodec<"Gorilla, ZSTD(3)">; // Float metrics + log_data: Record<string, any> & ClickHouseCodec<"ZSTD(9)">; // JSON with heavy compression + event_ids: UInt64[] & ClickHouseCodec<"ZSTD(1)">; // Arrays +} +``` + + + +```python +from moose_lib import ClickHouseCodec, UInt64 +from typing import Annotated, Any + +class Metrics(BaseModel): + timestamp: Annotated[datetime, ClickHouseCodec("Delta, LZ4")] # Timestamps + temperature: Annotated[float, ClickHouseCodec("Gorilla, ZSTD(3)")] # Float metrics + log_data: Annotated[Any, ClickHouseCodec("ZSTD(9)")] # JSON with heavy compression + event_ids: Annotated[list[UInt64], ClickHouseCodec("ZSTD(1)")] # Arrays +``` + + +**See the [Compression Codecs](/moose/olap/compression) guide for detailed codec documentation and use cases.** diff --git a/packages/protobuf/infrastructure_map.proto b/packages/protobuf/infrastructure_map.proto index 96887f6167..655dfa1a90 100644 --- a/packages/protobuf/infrastructure_map.proto +++ b/packages/protobuf/infrastructure_map.proto @@ -241,6 +241,10 @@ message Column { optional string comment = 8; // Column comment for metadata storage // Optional column-level TTL expression (without leading 'TTL') optional string ttl = 10; + // Compression codec expression (e.g., "ZSTD(3)", "Delta, LZ4") + optional string codec = 11; + // MATERIALIZED column expression (computed at write-time, physically stored) + optional string materialized = 12; } enum SimpleColumnType { diff --git 
a/packages/py-moose-lib/moose_lib/__init__.py b/packages/py-moose-lib/moose_lib/__init__.py index 1fa7d5ca76..8ef2d3d3f7 100644 --- a/packages/py-moose-lib/moose_lib/__init__.py +++ b/packages/py-moose-lib/moose_lib/__init__.py @@ -37,6 +37,11 @@ StringToEnumMixin, FixedString, ClickhouseFixedStringSize, + ClickhouseDefault, + clickhouse_default, + ClickHouseTTL, + ClickHouseCodec, + ClickHouseMaterialized, # Integer types Int8, Int16, diff --git a/packages/py-moose-lib/moose_lib/data_models.py b/packages/py-moose-lib/moose_lib/data_models.py index dcf3c287db..fa2a5c63ce 100644 --- a/packages/py-moose-lib/moose_lib/data_models.py +++ b/packages/py-moose-lib/moose_lib/data_models.py @@ -72,6 +72,46 @@ class ClickHouseTTL: expression: str +@dataclasses.dataclass(frozen=True) +class ClickHouseCodec: + expression: str + + +@dataclasses.dataclass(frozen=True) +class ClickHouseMaterialized: + """ + ClickHouse MATERIALIZED column annotation. + The column value is computed at INSERT time and physically stored. + Cannot be explicitly inserted by users. 
+ + Args: + expression: ClickHouse SQL expression using column names (snake_case) + + Examples: + # Extract date component + event_date: Annotated[date, ClickHouseMaterialized("toDate(event_time)")] + + # Precompute hash + user_hash: Annotated[int, ClickHouseMaterialized("cityHash64(user_id)")] + + # Complex expression with JSON + combination_hash: Annotated[ + list[int], + ClickHouseMaterialized( + "arrayMap(kv -> cityHash64(kv.1, kv.2), " + "JSONExtractKeysAndValuesRaw(toString(log_blob)))" + ) + ] + + Notes: + - Expression uses ClickHouse column names, not Python field names + - MATERIALIZED and DEFAULT are mutually exclusive + - Can be combined with ClickHouseCodec for compression + - Changing the expression requires DROP + ADD (data is lost) + """ + expression: str + + @dataclasses.dataclass(frozen=True) class ClickHouseJson: max_dynamic_paths: int | None = None @@ -271,6 +311,8 @@ class Column(BaseModel): default: str | None = None annotations: list[Tuple[str, Any]] = [] ttl: str | None = None + codec: str | None = None + materialized: str | None = None def to_expr(self): # Lazy import to avoid circular dependency at import time @@ -613,12 +655,31 @@ def _to_columns(model: type[BaseModel]) -> list[Column]: None, ) + # Extract MATERIALIZED expression from metadata, if provided + materialized_expr = next( + (md.expression for md in mds if isinstance(md, ClickHouseMaterialized)), + None, + ) + + # Validate mutual exclusivity of DEFAULT and MATERIALIZED + if default_expr and materialized_expr: + raise ValueError( + f"Column '{column_name}' cannot have both DEFAULT and MATERIALIZED. " + f"Use one or the other." 
+ ) + # Extract TTL expression from metadata, if provided ttl_expr = next( (md.expression for md in mds if isinstance(md, ClickHouseTTL)), None, ) + # Extract CODEC expression from metadata, if provided + codec_expr = next( + (md.expression for md in mds if isinstance(md, ClickHouseCodec)), + None, + ) + columns.append( Column( name=column_name, @@ -627,8 +688,10 @@ def _to_columns(model: type[BaseModel]) -> list[Column]: unique=False, primary_key=primary_key, default=default_expr, + materialized=materialized_expr, annotations=annotations, ttl=ttl_expr, + codec=codec_expr, ) ) return columns diff --git a/packages/py-moose-lib/tests/test_codec.py b/packages/py-moose-lib/tests/test_codec.py new file mode 100644 index 0000000000..e52f24ae7b --- /dev/null +++ b/packages/py-moose-lib/tests/test_codec.py @@ -0,0 +1,76 @@ +from datetime import datetime +from typing import Annotated, Any +from pydantic import BaseModel +from moose_lib import Key, ClickHouseCodec, UInt64 +from moose_lib.data_models import _to_columns + + +def test_codec_single(): + """Test single codec annotation converts to correct ClickHouse CODEC.""" + + class CodecTest(BaseModel): + id: Key[str] + data: Annotated[str, ClickHouseCodec("ZSTD(3)")] + + columns = _to_columns(CodecTest) + by_name = {col.name: col for col in columns} + + assert by_name["data"].codec == "ZSTD(3)" + assert by_name["id"].codec is None + + +def test_codec_chain(): + """Test codec chain annotation (Delta, LZ4).""" + + class CodecChainTest(BaseModel): + timestamp: Annotated[datetime, ClickHouseCodec("Delta, LZ4")] + value: Annotated[float, ClickHouseCodec("Gorilla, ZSTD")] + + columns = _to_columns(CodecChainTest) + by_name = {col.name: col for col in columns} + + assert by_name["timestamp"].codec == "Delta, LZ4" + assert by_name["value"].codec == "Gorilla, ZSTD" + + +def test_codec_with_level(): + """Test codec with compression level.""" + + class CodecLevelTest(BaseModel): + log_blob: Annotated[Any, ClickHouseCodec("ZSTD(3)")] + 
combination_hash: Annotated[list[UInt64], ClickHouseCodec("ZSTD(1)")] + + columns = _to_columns(CodecLevelTest) + by_name = {col.name: col for col in columns} + + assert by_name["log_blob"].codec == "ZSTD(3)" + assert by_name["combination_hash"].codec == "ZSTD(1)" + + +def test_codec_specialized(): + """Test specialized codecs.""" + + class SpecializedCodecTest(BaseModel): + timestamp: Annotated[datetime, ClickHouseCodec("Delta")] + counter: Annotated[int, ClickHouseCodec("DoubleDelta")] + temperature: Annotated[float, ClickHouseCodec("Gorilla")] + + columns = _to_columns(SpecializedCodecTest) + by_name = {col.name: col for col in columns} + + assert by_name["timestamp"].codec == "Delta" + assert by_name["counter"].codec == "DoubleDelta" + assert by_name["temperature"].codec == "Gorilla" + + +def test_codec_none(): + """Test codec with NONE (uncompressed).""" + + class NoCodecTest(BaseModel): + data: Annotated[str, ClickHouseCodec("NONE")] + + columns = _to_columns(NoCodecTest) + by_name = {col.name: col for col in columns} + + assert by_name["data"].codec == "NONE" + diff --git a/packages/py-moose-lib/tests/test_materialized.py b/packages/py-moose-lib/tests/test_materialized.py new file mode 100644 index 0000000000..adedb52286 --- /dev/null +++ b/packages/py-moose-lib/tests/test_materialized.py @@ -0,0 +1,70 @@ +from datetime import datetime, date +from typing import Annotated, Any +from pydantic import BaseModel +from moose_lib import Key, ClickHouseMaterialized, ClickHouseCodec, UInt64 +from moose_lib.data_models import _to_columns +import pytest + + +def test_materialized_basic(): + """Test basic MATERIALIZED annotation converts to correct expression.""" + + class MaterializedTest(BaseModel): + timestamp: datetime + event_date: Annotated[date, ClickHouseMaterialized("toDate(timestamp)")] + + columns = _to_columns(MaterializedTest) + by_name = {col.name: col for col in columns} + + assert by_name["timestamp"].materialized is None + assert 
by_name["event_date"].materialized == "toDate(timestamp)" + + +def test_materialized_hash(): + """Test MATERIALIZED with hash function.""" + + class HashTest(BaseModel): + user_id: str + user_hash: Annotated[UInt64, ClickHouseMaterialized("cityHash64(user_id)")] + + columns = _to_columns(HashTest) + by_name = {col.name: col for col in columns} + + assert by_name["user_id"].materialized is None + assert by_name["user_hash"].materialized == "cityHash64(user_id)" + + +def test_materialized_with_codec(): + """Test MATERIALIZED combined with CODEC.""" + + class MaterializedCodecTest(BaseModel): + log_blob: Annotated[Any, ClickHouseCodec("ZSTD(3)")] + combination_hash: Annotated[ + list[UInt64], + ClickHouseMaterialized("arrayMap(kv -> cityHash64(kv.1, kv.2), JSONExtractKeysAndValuesRaw(toString(log_blob)))"), + ClickHouseCodec("ZSTD(1)") + ] + + columns = _to_columns(MaterializedCodecTest) + by_name = {col.name: col for col in columns} + + assert by_name["log_blob"].materialized is None + assert by_name["log_blob"].codec == "ZSTD(3)" + assert by_name["combination_hash"].materialized == "arrayMap(kv -> cityHash64(kv.1, kv.2), JSONExtractKeysAndValuesRaw(toString(log_blob)))" + assert by_name["combination_hash"].codec == "ZSTD(1)" + + +def test_materialized_mutually_exclusive_with_default(): + """Test that MATERIALIZED and DEFAULT are mutually exclusive.""" + from moose_lib import clickhouse_default + + class BadModel(BaseModel): + bad_field: Annotated[ + str, + clickhouse_default("'default_value'"), + ClickHouseMaterialized("'materialized_value'") + ] + + with pytest.raises(ValueError, match="cannot have both DEFAULT and MATERIALIZED"): + _to_columns(BadModel) + diff --git a/packages/ts-moose-lib/src/browserCompatible.ts b/packages/ts-moose-lib/src/browserCompatible.ts index d998aeda2b..e3a19f8a6f 100644 --- a/packages/ts-moose-lib/src/browserCompatible.ts +++ b/packages/ts-moose-lib/src/browserCompatible.ts @@ -61,7 +61,9 @@ export { ClickHouseNamedTuple, 
ClickHouseDefault, ClickHouseTTL, + ClickHouseMaterialized, WithDefault, + ClickHouseCodec, // Added friendly aliases and numeric helpers DateTime, DateTime64, diff --git a/packages/ts-moose-lib/src/dataModels/dataModelTypes.ts b/packages/ts-moose-lib/src/dataModels/dataModelTypes.ts index 3a398121f8..30162c850c 100644 --- a/packages/ts-moose-lib/src/dataModels/dataModelTypes.ts +++ b/packages/ts-moose-lib/src/dataModels/dataModelTypes.ts @@ -32,7 +32,9 @@ export interface Column { unique: false; // what is this for? primary_key: boolean; default: string | null; + materialized: string | null; ttl: string | null; + codec: string | null; annotations: [string, any][]; } diff --git a/packages/ts-moose-lib/src/dataModels/typeConvert.ts b/packages/ts-moose-lib/src/dataModels/typeConvert.ts index e66da7b6a9..50adc708df 100644 --- a/packages/ts-moose-lib/src/dataModels/typeConvert.ts +++ b/packages/ts-moose-lib/src/dataModels/typeConvert.ts @@ -306,6 +306,27 @@ const handleDefault = (t: ts.Type, checker: TypeChecker): string | null => { return defaultType.value; }; +/** Detect ClickHouse materialized annotation on a type and return raw sql */ +const handleMaterialized = ( + t: ts.Type, + checker: TypeChecker, +): string | null => { + const materializedType = getTaggedType( + t, + checker, + "_clickhouse_materialized", + ); + if (materializedType === null) { + return null; + } + if (!materializedType.isStringLiteral()) { + throw new UnsupportedFeature( + 'ClickHouseMaterialized must use a string literal, e.g. 
ClickHouseMaterialized<"now()">', + ); + } + return materializedType.value; +}; + /** Detect ClickHouse TTL annotation on a type and return raw sql */ const handleTtl = (t: ts.Type, checker: TypeChecker): string | null => { const ttlType = getTaggedType(t, checker, "_clickhouse_ttl"); @@ -925,6 +946,20 @@ const handleDefaultWrapping = ( return undefined; }; +/** Detect ClickHouse Codec annotation on a type and return codec expression */ +const handleCodec = (t: ts.Type, checker: TypeChecker): string | null => { + const codecType = getTaggedType(t, checker, "_clickhouse_codec"); + if (codecType === null) { + return null; + } + if (!codecType.isStringLiteral()) { + throw new UnsupportedFeature( + 'ClickHouseCodec must use a string literal, e.g. ClickHouseCodec<"ZSTD(3)">', + ); + } + return codecType.value; +}; + export const toColumns = (t: ts.Type, checker: TypeChecker): Column[] => { if (checker.getIndexInfosOfType(t).length !== 0) { console.log("[CompilerPlugin]", checker.getIndexInfosOfType(t)); @@ -956,14 +991,26 @@ export const toColumns = (t: ts.Type, checker: TypeChecker): Column[] => { node?.type, ); + const defaultValue = defaultExpression ?? handleDefault(type, checker); + const materializedValue = handleMaterialized(type, checker); + + // Validate mutual exclusivity of DEFAULT and MATERIALIZED + if (defaultValue && materializedValue) { + throw new UnsupportedFeature( + `Column '${prop.name}' cannot have both ClickHouseDefault and ClickHouseMaterialized. Use one or the other.`, + ); + } + return { name: prop.name, data_type: dataType, primary_key: isKey, required: !nullable, unique: false, - default: defaultExpression ?? 
handleDefault(type, checker), + default: defaultValue, + materialized: materializedValue, ttl: handleTtl(type, checker), + codec: handleCodec(type, checker), annotations, }; }); diff --git a/packages/ts-moose-lib/src/dataModels/types.ts b/packages/ts-moose-lib/src/dataModels/types.ts index 25180b5bd0..d104eb5cab 100644 --- a/packages/ts-moose-lib/src/dataModels/types.ts +++ b/packages/ts-moose-lib/src/dataModels/types.ts @@ -73,6 +73,34 @@ export type UInt64 = number & ClickHouseInt<"uint64">; export type Decimal

= string & ClickHouseDecimal; +/** + * Attach compression codec to a column type. + * + * Any valid ClickHouse codec expression is allowed. ClickHouse validates the codec at runtime. + * + * @template CodecExpr The codec expression (single codec or chain) + * + * @example + * interface Metrics { + * // Single codec + * log_blob: string & ClickHouseCodec<"ZSTD(3)">; + * + * // Codec chain (processed left-to-right) + * timestamp: Date & ClickHouseCodec<"Delta, LZ4">; + * temperature: number & ClickHouseCodec<"Gorilla, ZSTD">; + * + * // Specialized codecs + * counter: number & ClickHouseCodec<"DoubleDelta">; + * + * // Can combine with other annotations + * count: UInt64 & ClickHouseCodec<"DoubleDelta, LZ4">; + * } + */ +export type ClickHouseCodec<CodecExpr extends string> = { + _clickhouse_codec?: CodecExpr; +}; + export type ClickHouseFloat = tags.Type< Value extends "float32" ? "float" : "double" >; @@ -175,6 +203,32 @@ export type ClickHouseTTL = { _clickhouse_ttl?: SqlExpression; }; + +/** + * ClickHouse MATERIALIZED column annotation. + * The column value is computed at INSERT time and physically stored. + * Cannot be explicitly inserted by users.
+ + * @example + * interface Events { + *   eventTime: Date; + *   // Extract date component - computed and stored at insert time + *   eventDate: Date & ClickHouseMaterialized<"toDate(event_time)">; + * + *   userId: string; + *   // Precompute hash for fast lookups + *   userHash: number & ClickHouseInt<"uint64"> & ClickHouseMaterialized<"cityHash64(user_id)">; + * } + * + * @remarks + * - Expression uses ClickHouse column names (snake_case), not TypeScript field names + * - MATERIALIZED and DEFAULT are mutually exclusive + * - Can be combined with ClickHouseCodec for compression + * - Changing the expression requires DROP + ADD (data is lost) + */ +export type ClickHouseMaterialized<SqlExpression extends string> = { + _clickhouse_materialized?: SqlExpression; +}; + /** * See also {@link ClickHouseDefault} * diff --git a/packages/ts-moose-lib/src/dmv2/internal.ts b/packages/ts-moose-lib/src/dmv2/internal.ts index 7f003e79dd..bcc6fddef8 100644 --- a/packages/ts-moose-lib/src/dmv2/internal.ts +++ b/packages/ts-moose-lib/src/dmv2/internal.ts @@ -1162,6 +1162,8 @@ export const dlqColumns: Column[] = [ default: null, annotations: [], ttl: null, + codec: null, + materialized: null, }, { name: "errorMessage", @@ -1172,6 +1174,8 @@ export const dlqColumns: Column[] = [ default: null, annotations: [], ttl: null, + codec: null, + materialized: null, }, { name: "errorType", @@ -1182,6 +1186,8 @@ export const dlqColumns: Column[] = [ default: null, annotations: [], ttl: null, + codec: null, + materialized: null, }, { name: "failedAt", @@ -1192,6 +1198,8 @@ export const dlqColumns: Column[] = [ default: null, annotations: [], ttl: null, + codec: null, + materialized: null, }, { name: "source", @@ -1202,6 +1210,8 @@ export const dlqColumns: Column[] = [ default: null, annotations: [], ttl: null, + codec: null, + materialized: null, }, ]; diff --git a/packages/ts-moose-lib/tests/cluster-validation.test.ts b/packages/ts-moose-lib/tests/cluster-validation.test.ts index 6f3feea84c..e7b52e6d3a 100644 --- 
a/packages/ts-moose-lib/tests/cluster-validation.test.ts +++ b/packages/ts-moose-lib/tests/cluster-validation.test.ts @@ -23,7 +23,9 @@ const createMockColumns = (fields: string[]): Column[] => unique: false, primary_key: false, default: null, + materialized: null, ttl: null, + codec: null, annotations: [], })); diff --git a/packages/ts-moose-lib/tests/olap-table-versioning.test.ts b/packages/ts-moose-lib/tests/olap-table-versioning.test.ts index 861555e374..8f3cd65c2b 100644 --- a/packages/ts-moose-lib/tests/olap-table-versioning.test.ts +++ b/packages/ts-moose-lib/tests/olap-table-versioning.test.ts @@ -26,7 +26,9 @@ const createMockColumns = (fields: string[]): Column[] => unique: false, primary_key: false, default: null, + materialized: null, ttl: null, + codec: null, annotations: [], })); diff --git a/packages/ts-moose-lib/tests/typeConvert.test.ts b/packages/ts-moose-lib/tests/typeConvert.test.ts index e9cd56971b..0af1800fc7 100644 --- a/packages/ts-moose-lib/tests/typeConvert.test.ts +++ b/packages/ts-moose-lib/tests/typeConvert.test.ts @@ -46,6 +46,7 @@ function createProgramWithSource(tempDir: string, sourceText: string) { describe("typeConvert mappings for helper types", function () { this.timeout(20000); // Increase timeout for TypeScript compilation + it("maps DateTime, DateTime64, numeric aliases, Decimal and LowCardinality", function () { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "moose-typeconv-")); @@ -256,4 +257,65 @@ describe("typeConvert mappings for helper types", function () { expect(byName.id.data_type).to.equal("String"); expect(byName.created_at.data_type).to.equal("DateTime"); }); + + it("maps Codec annotations for compression", function () { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "moose-typeconv-")); + try { + const source = ` + import { ClickHouseCodec } from "@514labs/moose-lib"; + + export interface TestModel { + id: string; + log_blob: Record & ClickHouseCodec<"ZSTD(3)">; + timestamp: Date & 
ClickHouseCodec<"Delta, LZ4">; + temperature: number & ClickHouseCodec<"Gorilla, ZSTD(3)">; + user_agent: string & ClickHouseCodec<"ZSTD(3)">; + tags: string[] & ClickHouseCodec<"ZSTD(1)">; + no_codec: string; + } + `; + const { checker, type } = createProgramWithSource(tempDir, source); + const columns = toColumns(type, checker); + const byName = Object.fromEntries(columns.map((c) => [c.name, c])); + + expect(byName.id.codec).to.equal(null); + expect(byName.log_blob.codec).to.equal("ZSTD(3)"); + expect(byName.timestamp.codec).to.equal("Delta, LZ4"); + expect(byName.temperature.codec).to.equal("Gorilla, ZSTD(3)"); + expect(byName.user_agent.codec).to.equal("ZSTD(3)"); + expect(byName.tags.codec).to.equal("ZSTD(1)"); + expect(byName.no_codec.codec).to.equal(null); + } finally { + fs.rmSync(tempDir, { recursive: true, force: true }); + } + }); + + it("maps Materialized annotations for computed columns", function () { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "moose-typeconv-")); + try { + const source = ` + import { ClickHouseMaterialized, UInt64 } from "@514labs/moose-lib"; + import typia from "typia"; + + export interface TestModel { + timestamp: Date; + userId: string; + eventDate: Date & ClickHouseMaterialized<"toDate(timestamp)">; + userHash: UInt64 & ClickHouseMaterialized<"cityHash64(userId)">; + no_materialized: string; + } + `; + const { checker, type } = createProgramWithSource(tempDir, source); + const columns = toColumns(type, checker); + const byName = Object.fromEntries(columns.map((c) => [c.name, c])); + + expect(byName.timestamp.materialized).to.equal(null); + expect(byName.userId.materialized).to.equal(null); + expect(byName.eventDate.materialized).to.equal("toDate(timestamp)"); + expect(byName.userHash.materialized).to.equal("cityHash64(userId)"); + expect(byName.no_materialized.materialized).to.equal(null); + } finally { + fs.rmSync(tempDir, { recursive: true, force: true }); + } + }); }); diff --git 
a/templates/python-tests/src/ingest/models.py b/templates/python-tests/src/ingest/models.py index 70124db444..61f941e7a2 100644 --- a/templates/python-tests/src/ingest/models.py +++ b/templates/python-tests/src/ingest/models.py @@ -1,7 +1,7 @@ # This file was auto-generated by the framework. You can add data models or change the existing ones from moose_lib import Point, Ring, LineString, MultiLineString, Polygon, MultiPolygon -from moose_lib import Key, IngestPipeline, IngestPipelineConfig, StringToEnumMixin, clickhouse_default, OlapTable, \ +from moose_lib import Key, IngestPipeline, IngestPipelineConfig, StringToEnumMixin, clickhouse_default, ClickHouseCodec, OlapTable, \ OlapConfig, MergeTreeEngine, ReplacingMergeTreeEngine, AggregatingMergeTreeEngine, simple_aggregated, \ ClickhouseSize, UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64, Float32, Float64, ClickhousePrecision from datetime import datetime, date @@ -684,3 +684,54 @@ class DateTimePrecisionTestData(BaseModel): "DateTimePrecisionOutput", StreamConfig(destination=datetime_precision_output_table) ) + + +# =======Codec Compression Test======= +class CodecTest(BaseModel): + """Test model for codec compression support.""" + id: Key[str] + timestamp: Annotated[datetime, ClickHouseCodec("Delta, LZ4")] + log_blob: Annotated[Any, ClickHouseCodec("ZSTD(3)")] + combination_hash: Annotated[list[UInt64], ClickHouseCodec("ZSTD(1)")] + temperature: Annotated[float, ClickHouseCodec("Gorilla, ZSTD(3)")] + request_count: Annotated[float, ClickHouseCodec("DoubleDelta, LZ4")] + user_agent: Annotated[str, ClickHouseCodec("ZSTD(3)")] + tags: Annotated[list[str], ClickHouseCodec("LZ4")] + status_code: float + + +codec_test_model = IngestPipeline[CodecTest]("CodecTest", IngestPipelineConfig( + ingest_api=True, + stream=True, + table=True, + dead_letter_queue=True +)) + + +# =======Materialized Columns Test======= +from moose_lib import ClickHouseMaterialized + +class MaterializedTest(BaseModel): + """Test model 
for materialized column support.""" + id: Key[str] + timestamp: datetime + user_id: str + # Extract date from timestamp - computed and stored at insert time + event_date: Annotated[date, ClickHouseMaterialized("toDate(timestamp)")] + # Precompute hash for fast lookups + user_hash: Annotated[UInt64, ClickHouseMaterialized("cityHash64(user_id)")] + # Combine MATERIALIZED with CODEC + log_blob: Annotated[Any, ClickHouseCodec("ZSTD(3)")] + combination_hash: Annotated[ + list[UInt64], + ClickHouseMaterialized("arrayMap(kv -> cityHash64(kv.1, kv.2), JSONExtractKeysAndValuesRaw(toString(log_blob)))"), + ClickHouseCodec("ZSTD(1)") + ] + + +materialized_test_model = IngestPipeline[MaterializedTest]("MaterializedTest", IngestPipelineConfig( + ingest_api=True, + stream=True, + table=True, + dead_letter_queue=True +)) diff --git a/templates/typescript-tests/src/ingest/models.ts b/templates/typescript-tests/src/ingest/models.ts index 7dc1e25332..a2c9497a44 100644 --- a/templates/typescript-tests/src/ingest/models.ts +++ b/templates/typescript-tests/src/ingest/models.ts @@ -21,6 +21,7 @@ import { ClickHouseByteSize, ClickHouseJson, Int64, + ClickHouseCodec, } from "@514labs/moose-lib"; /** @@ -684,3 +685,48 @@ export const dateTimePrecisionOutputStream = new Stream("DateTimePrecisionOutput", { destination: DateTimePrecisionOutputTable, }); + +// =======Codec Compression Test======= +export interface CodecTest { + id: Key; + timestamp: DateTime & ClickHouseCodec<"Delta, LZ4">; + log_blob: Record & ClickHouseCodec<"ZSTD(3)">; + combination_hash: UInt64[] & ClickHouseCodec<"ZSTD(1)">; + temperature: number & ClickHouseCodec<"Gorilla, ZSTD(3)">; + request_count: number & ClickHouseCodec<"DoubleDelta, LZ4">; + user_agent: string & ClickHouseCodec<"ZSTD(3)">; + tags: string[] & ClickHouseCodec<"LZ4">; + status_code: number; +} + +export const CodecTestPipeline = new IngestPipeline("CodecTest", { + table: true, + stream: true, + ingestApi: true, +}); + +// =======Materialized Columns 
Test======= +import { ClickHouseMaterialized } from "@514labs/moose-lib"; + +export interface MaterializedTest { + id: Key<string>; + timestamp: DateTime; + userId: string; + eventDate: string & + typia.tags.Format<"date"> & + ClickHouseMaterialized<"toDate(timestamp)">; + userHash: UInt64 & ClickHouseMaterialized<"cityHash64(userId)">; + log_blob: Record<string, any> & ClickHouseCodec<"ZSTD(3)">; + combinationHash: UInt64[] & + ClickHouseMaterialized<"arrayMap(kv -> cityHash64(kv.1, kv.2), JSONExtractKeysAndValuesRaw(toString(log_blob)))"> & + ClickHouseCodec<"ZSTD(1)">; +} + +export const MaterializedTestPipeline = new IngestPipeline<MaterializedTest>( + "MaterializedTest", + { + table: true, + stream: true, + ingestApi: true, + }, +);