From 2ee80d44ae13c1b10b81c0087b5dc9c06d97b8ec Mon Sep 17 00:00:00 2001 From: aviruthen <91846056+aviruthen@users.noreply.github.com> Date: Mon, 6 Apr 2026 16:26:02 -0400 Subject: [PATCH] feature: Feat: Feature Store in Sagemaker SDK v3 (5496) --- docs/sagemaker_core/index.rst | 40 +++ migration.md | 160 +++++++++++ sagemaker-mlops/README.md | 45 +++ .../src/sagemaker/mlops/__init__.py | 10 +- .../feature_store/test_feature_store_init.py | 256 ++++++++++++++++++ 5 files changed, 510 insertions(+), 1 deletion(-) create mode 100644 sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/test_feature_store_init.py diff --git a/docs/sagemaker_core/index.rst b/docs/sagemaker_core/index.rst index 899156eef4..f5ee731c1e 100644 --- a/docs/sagemaker_core/index.rst +++ b/docs/sagemaker_core/index.rst @@ -151,6 +151,46 @@ Key Core Features * **Monitoring Integration** - Built-in support for CloudWatch metrics, logging, and resource status tracking * **Error Handling** - Comprehensive error handling with detailed feedback for troubleshooting and debugging +Feature Store +~~~~~~~~~~~~~ + +SageMaker Core provides the foundational ``FeatureGroup`` resource class used by the Feature Store module. +For full Feature Store functionality in V3 — including DataFrame ingestion, Athena queries, dataset building, +and feature definitions — use the ``sagemaker.mlops.feature_store`` package: + +.. 
code-block:: python + + from sagemaker.mlops.feature_store import ( + FeatureGroup, + OnlineStoreConfig, + OfflineStoreConfig, + S3StorageConfig, + load_feature_definitions_from_dataframe, + ingest_dataframe, + create_athena_query, + DatasetBuilder, + ) + + # Create a feature group + feature_defs = load_feature_definitions_from_dataframe(df) + FeatureGroup.create( + feature_group_name="my-feature-group", + feature_definitions=feature_defs, + record_identifier_feature_name="id", + event_time_feature_name="timestamp", + role_arn=role, + online_store_config=OnlineStoreConfig(enable_online_store=True), + ) + + # Ingest data from a DataFrame + ingest_dataframe(feature_group_name="my-feature-group", data_frame=df, max_workers=4) + +.. note:: + + If you are migrating from V2 (``sagemaker.feature_store``), see the + `Feature Store Migration Guide <https://github.com/aws/sagemaker-python-sdk/blob/master/sagemaker-mlops/src/sagemaker/mlops/feature_store/MIGRATION_GUIDE.md>`_ + for detailed V2-to-V3 migration instructions. + Supported Core Scenarios ------------------------ diff --git a/migration.md b/migration.md index a7f353e747..aa567db376 100644 --- a/migration.md +++ b/migration.md @@ -418,6 +418,139 @@ pipeline = Pipeline( ) ``` +### 5. Feature Store + +Feature Store is fully supported in V3 under the `sagemaker.mlops.feature_store` namespace. The V2 `sagemaker.feature_store` module has been reorganized — FeatureGroup and shapes come from `sagemaker-core`, while utility functions, ingestion, Athena queries, and dataset building are provided by the `sagemaker-mlops` package. + +> **Detailed Migration Guide:** For comprehensive V2-to-V3 Feature Store migration instructions, see [`sagemaker-mlops/src/sagemaker/mlops/feature_store/MIGRATION_GUIDE.md`](sagemaker-mlops/src/sagemaker/mlops/feature_store/MIGRATION_GUIDE.md).
+ +**Import Changes:** + +```python +# V2 imports +from sagemaker.feature_store.feature_group import FeatureGroup # ❌ +from sagemaker.feature_store.inputs import FeatureValue # ❌ +from sagemaker.session import Session # ❌ + +# V3 imports — everything from one place +from sagemaker.mlops.feature_store import FeatureGroup # ✅ +from sagemaker.mlops.feature_store import FeatureValue # ✅ +from sagemaker.mlops.feature_store import ingest_dataframe # ✅ +``` + +**Create a FeatureGroup:** + +**V2:** + +```python +from sagemaker.feature_store.feature_group import FeatureGroup +from sagemaker.session import Session + +session = Session() +fg = FeatureGroup(name="my-fg", sagemaker_session=session) +fg.load_feature_definitions(data_frame=df) +fg.create( + s3_uri="s3://bucket/prefix", + record_identifier_name="id", + event_time_feature_name="ts", + role_arn=role, + enable_online_store=True, +) +``` + +**V3:** + +```python +from sagemaker.mlops.feature_store import ( + FeatureGroup, + OnlineStoreConfig, + OfflineStoreConfig, + S3StorageConfig, + load_feature_definitions_from_dataframe, +) + +feature_defs = load_feature_definitions_from_dataframe(df) + +FeatureGroup.create( + feature_group_name="my-fg", + feature_definitions=feature_defs, + record_identifier_feature_name="id", + event_time_feature_name="ts", + role_arn=role, + online_store_config=OnlineStoreConfig(enable_online_store=True), + offline_store_config=OfflineStoreConfig( + s3_storage_config=S3StorageConfig(s3_uri="s3://bucket/prefix") + ), +) +``` + +**Record Operations (Put/Get/Delete):** + +**V2:** + +```python +from sagemaker.feature_store.inputs import FeatureValue + +fg.put_record(record=[FeatureValue(feature_name="id", value_as_string="123")]) +response = fg.get_record(record_identifier_value_as_string="123") +fg.delete_record(record_identifier_value_as_string="123", event_time="2024-01-15T00:00:00Z") +``` + +**V3:** + +```python +from sagemaker.mlops.feature_store import FeatureGroup, FeatureValue + +fg = 
FeatureGroup(feature_group_name="my-fg") +fg.put_record(record=[FeatureValue(feature_name="id", value_as_string="123")]) +response = fg.get_record(record_identifier_value_as_string="123") +fg.delete_record(record_identifier_value_as_string="123", event_time="2024-01-15T00:00:00Z") +``` + +**DataFrame Ingestion:** + +**V2:** + +```python +fg.ingest(data_frame=df, max_workers=4, max_processes=2, wait=True) +``` + +**V3:** + +```python +from sagemaker.mlops.feature_store import ingest_dataframe + +manager = ingest_dataframe( + feature_group_name="my-fg", + data_frame=df, + max_workers=4, + max_processes=2, + wait=True, +) +``` + +**Athena Queries:** + +**V2:** + +```python +query = fg.athena_query() +query.run(query_string="SELECT * FROM ...", output_location="s3://...") +query.wait() +df = query.as_dataframe() +``` + +**V3:** + +```python +from sagemaker.mlops.feature_store import create_athena_query + +query = create_athena_query("my-fg", session) +query.run(query_string="SELECT * FROM ...", output_location="s3://...") +query.wait() +df = query.as_dataframe() +``` + ## Feature Mapping ### Training Features @@ -450,6 +583,27 @@ pipeline = Pipeline( | ScriptProcessor | ProcessingJob | Script-based processing | | FrameworkProcessor | ProcessingJob | Framework-specific processing | +### Feature Store Features + +| V2 Feature | V3 Equivalent | Notes | +|------------|---------------|-------| +| `sagemaker.feature_store.feature_group.FeatureGroup` | `sagemaker.mlops.feature_store.FeatureGroup` | Re-exported from sagemaker-core | +| `FeatureGroup(name=..., sagemaker_session=...)` | `FeatureGroup(feature_group_name=...)` | Session managed internally by core | +| `fg.create(s3_uri=..., enable_online_store=...)` | `FeatureGroup.create(online_store_config=..., offline_store_config=...)` | Structured config objects | +| `fg.describe()` | `FeatureGroup.get(feature_group_name=...)` | Returns typed object | +| `fg.delete()` | `FeatureGroup(feature_group_name=...).delete()` | Same 
pattern | +| `fg.put_record(record=...)` | `FeatureGroup(feature_group_name=...).put_record(record=...)` | FeatureValue from core | +| `fg.get_record(...)` | `FeatureGroup(feature_group_name=...).get_record(...)` | Same interface | +| `fg.delete_record(...)` | `FeatureGroup(feature_group_name=...).delete_record(...)` | Use strings not enums | +| `fg.ingest(data_frame=df)` | `ingest_dataframe(feature_group_name=..., data_frame=df)` | Standalone function | +| `fg.athena_query()` | `create_athena_query(feature_group_name, session)` | Standalone function | +| `fg.as_hive_ddl()` | `as_hive_ddl(feature_group_name)` | Standalone function | +| `fg.load_feature_definitions(df)` | `load_feature_definitions_from_dataframe(df)` | Returns list, no mutation | +| `FeatureStore(session).search(...)` | `FeatureStore.search(resource=..., search_expression=...)` | Core resource class | +| `FeatureStore.create_dataset(...)` | `DatasetBuilder.create(...)` | Dataclass-based builder | +| Config shapes with `to_dict()` | Pydantic shapes (auto-serialization) | No manual serialization needed | +| `TargetStoreEnum.ONLINE_STORE.value` | `"OnlineStore"` (plain strings) | Enums available but strings preferred | + ## Functionality Level Mapping ### Training @@ -766,6 +920,12 @@ from sagemaker.train import ModelTrainer # ✅ from sagemaker.model import Model # ❌ from sagemaker.serve import ModelBuilder # ✅ + +from sagemaker.feature_store.feature_group import FeatureGroup # ❌ +from sagemaker.mlops.feature_store import FeatureGroup # ✅ + +from sagemaker.feature_store.inputs import FeatureValue # ❌ +from sagemaker.mlops.feature_store import FeatureValue # ✅ ``` ### 2. 
Parameter Mapping diff --git a/sagemaker-mlops/README.md b/sagemaker-mlops/README.md index eaffdc8adb..095745f4d9 100644 --- a/sagemaker-mlops/README.md +++ b/sagemaker-mlops/README.md @@ -77,6 +77,51 @@ The following files were moved from `sagemaker-core/src/sagemaker/core/workflow/ - `retry.py` - Retry policies - `selective_execution_config.py` - Selective execution settings +### Feature Store + +The Feature Store module (`sagemaker.mlops.feature_store`) provides comprehensive support for Amazon SageMaker Feature Store operations. This is the V3 equivalent of the V2 `sagemaker.feature_store` module. + +**Key Modules:** + +- `__init__.py` - Re-exports all Feature Store components from a single entry point +- `feature_definition.py` - Feature definition helpers (FractionalFeatureDefinition, IntegralFeatureDefinition, etc.) +- `feature_utils.py` - Utility functions (ingest_dataframe, create_athena_query, as_hive_ddl, etc.) +- `ingestion_manager_pandas.py` - Multi-threaded DataFrame ingestion manager +- `athena_query.py` - Athena query execution and result retrieval +- `dataset_builder.py` - Dataset building with point-in-time joins across feature groups +- `inputs.py` - Enums for Feature Store operations (TargetStoreEnum, DeletionModeEnum, etc.) 
+- `feature_processor/` - Feature processor for PySpark-based transformations + +**Quick Start:** + +```python +from sagemaker.mlops.feature_store import ( + FeatureGroup, + OnlineStoreConfig, + OfflineStoreConfig, + S3StorageConfig, + load_feature_definitions_from_dataframe, + ingest_dataframe, + create_athena_query, +) + +# Create a feature group +feature_defs = load_feature_definitions_from_dataframe(df) +FeatureGroup.create( + feature_group_name="my-feature-group", + feature_definitions=feature_defs, + record_identifier_feature_name="id", + event_time_feature_name="timestamp", + role_arn=role, + online_store_config=OnlineStoreConfig(enable_online_store=True), +) + +# Ingest data +ingest_dataframe(feature_group_name="my-feature-group", data_frame=df, max_workers=4) +``` + +> **Migrating from V2?** See the detailed [Feature Store Migration Guide](src/sagemaker/mlops/feature_store/MIGRATION_GUIDE.md) for V2-to-V3 migration instructions. + ### Model Building ModelBuilder is now located in the `sagemaker-serve` package but is re-exported from MLOps for convenience. diff --git a/sagemaker-mlops/src/sagemaker/mlops/__init__.py b/sagemaker-mlops/src/sagemaker/mlops/__init__.py index 18527db318..acf633e16b 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/__init__.py +++ b/sagemaker-mlops/src/sagemaker/mlops/__init__.py @@ -1,7 +1,8 @@ """SageMaker MLOps package for workflow orchestration and model building. This package provides high-level orchestration capabilities for SageMaker workflows, -including pipeline definitions, step implementations, and model building utilities. +including pipeline definitions, step implementations, model building utilities, +and Feature Store operations. 
The MLOps package sits at the top of the dependency hierarchy and can import from: - sagemaker.core (foundation primitives) @@ -11,10 +12,12 @@ Key components: - workflow: Pipeline and step orchestration - model_builder: Model building and orchestration +- feature_store: Feature Store operations (FeatureGroup, ingestion, Athena queries) Example usage: from sagemaker.mlops import ModelBuilder from sagemaker.mlops.workflow import Pipeline, TrainingStep + from sagemaker.mlops.feature_store import FeatureGroup, ingest_dataframe """ from __future__ import absolute_import @@ -27,7 +30,12 @@ # from sagemaker.mlops import workflow # from sagemaker.mlops.workflow import Pipeline, TrainingStep, etc. +# Feature Store submodule is available via: +# from sagemaker.mlops import feature_store +# from sagemaker.mlops.feature_store import FeatureGroup, ingest_dataframe, etc. + __all__ = [ "ModelBuilder", "workflow", # Submodule + "feature_store", # Submodule - Feature Store operations ] diff --git a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/test_feature_store_init.py b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/test_feature_store_init.py new file mode 100644 index 0000000000..e7da8956b7 --- /dev/null +++ b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/test_feature_store_init.py @@ -0,0 +1,256 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+# Licensed under the Apache License, Version 2.0
+"""Import smoke tests for sagemaker.mlops.feature_store and its listing in sagemaker.mlops.__all__."""
+import pytest  # NOTE(review): not referenced anywhere in this module
+
+
+class TestFeatureStoreResourceImports:
+    """FeatureGroup and FeatureMetadata can be imported from sagemaker.mlops.feature_store."""
+
+    def test_feature_group_importable(self):
+        from sagemaker.mlops.feature_store import FeatureGroup
+        assert FeatureGroup is not None
+
+    def test_feature_metadata_importable(self):
+        from sagemaker.mlops.feature_store import FeatureMetadata
+        assert FeatureMetadata is not None
+
+
+class TestFeatureStoreShapesImports:
+    """Config/shape classes (store configs, FeatureValue, Filter, ...) can be imported from feature_store."""
+
+    def test_online_store_config_importable(self):
+        from sagemaker.mlops.feature_store import OnlineStoreConfig
+        assert OnlineStoreConfig is not None
+
+    def test_offline_store_config_importable(self):
+        from sagemaker.mlops.feature_store import OfflineStoreConfig
+        assert OfflineStoreConfig is not None
+
+    def test_s3_storage_config_importable(self):
+        from sagemaker.mlops.feature_store import S3StorageConfig
+        assert S3StorageConfig is not None
+
+    def test_ttl_duration_importable(self):
+        from sagemaker.mlops.feature_store import TtlDuration
+        assert TtlDuration is not None
+
+    def test_feature_value_importable(self):
+        from sagemaker.mlops.feature_store import FeatureValue
+        assert FeatureValue is not None
+
+    def test_feature_parameter_importable(self):
+        from sagemaker.mlops.feature_store import FeatureParameter
+        assert FeatureParameter is not None
+
+    def test_filter_importable(self):
+        from sagemaker.mlops.feature_store import Filter
+        assert Filter is not None
+
+    def test_search_expression_importable(self):
+        from sagemaker.mlops.feature_store import SearchExpression
+        assert SearchExpression is not None
+
+    def test_throughput_config_importable(self):
+        from sagemaker.mlops.feature_store import ThroughputConfig
+        assert ThroughputConfig is not None
+
+    def test_data_catalog_config_importable(self):
+        from sagemaker.mlops.feature_store import DataCatalogConfig
+        assert DataCatalogConfig is not None
+
+    def test_online_store_security_config_importable(self):
+        from sagemaker.mlops.feature_store import OnlineStoreSecurityConfig
+        assert OnlineStoreSecurityConfig is not None
+
+
+class TestFeatureStoreEnumImports:
+    """Enums exposed by feature_store are importable (join and feature-type enums are covered by classes below)."""
+
+    def test_target_store_enum_importable(self):
+        from sagemaker.mlops.feature_store import TargetStoreEnum
+        assert TargetStoreEnum is not None
+
+    def test_deletion_mode_enum_importable(self):
+        from sagemaker.mlops.feature_store import DeletionModeEnum
+        assert DeletionModeEnum is not None
+
+    def test_throughput_mode_enum_importable(self):
+        from sagemaker.mlops.feature_store import ThroughputModeEnum
+        assert ThroughputModeEnum is not None
+
+    def test_table_format_enum_importable(self):
+        from sagemaker.mlops.feature_store import TableFormatEnum
+        assert TableFormatEnum is not None
+
+    def test_online_store_storage_type_enum_importable(self):
+        from sagemaker.mlops.feature_store import OnlineStoreStorageTypeEnum
+        assert OnlineStoreStorageTypeEnum is not None
+
+    def test_filter_operator_enum_importable(self):
+        from sagemaker.mlops.feature_store import FilterOperatorEnum
+        assert FilterOperatorEnum is not None
+
+    def test_resource_enum_importable(self):
+        from sagemaker.mlops.feature_store import ResourceEnum
+        assert ResourceEnum is not None
+
+    def test_search_operator_enum_importable(self):
+        from sagemaker.mlops.feature_store import SearchOperatorEnum
+        assert SearchOperatorEnum is not None
+
+    def test_sort_order_enum_importable(self):
+        from sagemaker.mlops.feature_store import SortOrderEnum
+        assert SortOrderEnum is not None
+
+    def test_expiration_time_response_enum_importable(self):
+        from sagemaker.mlops.feature_store import ExpirationTimeResponseEnum
+        assert ExpirationTimeResponseEnum is not None
+
+
+class TestFeatureStoreUtilityFunctionImports:
+    """Module-level helper functions can be imported from feature_store and are callable."""
+
+    def test_ingest_dataframe_importable(self):
+        from sagemaker.mlops.feature_store import ingest_dataframe
+        assert callable(ingest_dataframe)
+
+    def test_create_athena_query_importable(self):
+        from sagemaker.mlops.feature_store import create_athena_query
+        assert callable(create_athena_query)
+
+    def test_as_hive_ddl_importable(self):
+        from sagemaker.mlops.feature_store import as_hive_ddl
+        assert callable(as_hive_ddl)
+
+    def test_load_feature_definitions_from_dataframe_importable(self):
+        from sagemaker.mlops.feature_store import load_feature_definitions_from_dataframe
+        assert callable(load_feature_definitions_from_dataframe)
+
+    def test_get_session_from_role_importable(self):
+        from sagemaker.mlops.feature_store import get_session_from_role
+        assert callable(get_session_from_role)
+
+
+class TestFeatureStoreClassImports:
+    """AthenaQuery, DatasetBuilder, ingestion classes, join enums and TableType can be imported from feature_store."""
+
+    def test_athena_query_importable(self):
+        from sagemaker.mlops.feature_store import AthenaQuery
+        assert AthenaQuery is not None
+
+    def test_dataset_builder_importable(self):
+        from sagemaker.mlops.feature_store import DatasetBuilder
+        assert DatasetBuilder is not None
+
+    def test_ingestion_manager_pandas_importable(self):
+        from sagemaker.mlops.feature_store import IngestionManagerPandas
+        assert IngestionManagerPandas is not None
+
+    def test_ingestion_error_importable(self):
+        from sagemaker.mlops.feature_store import IngestionError
+        assert IngestionError is not None
+
+    def test_feature_group_to_be_merged_importable(self):
+        from sagemaker.mlops.feature_store import FeatureGroupToBeMerged
+        assert FeatureGroupToBeMerged is not None
+
+    def test_join_type_enum_importable(self):
+        from sagemaker.mlops.feature_store import JoinTypeEnum
+        assert JoinTypeEnum is not None
+
+    def test_join_comparator_enum_importable(self):
+        from sagemaker.mlops.feature_store import JoinComparatorEnum
+        assert JoinComparatorEnum is not None
+
+    def test_table_type_importable(self):
+        from sagemaker.mlops.feature_store import TableType
+        assert TableType is not None
+
+
+class TestFeatureDefinitionHelperImports:
+    """Feature definition classes, type enums and collection-type helpers can be imported from feature_store."""
+
+    def test_feature_definition_importable(self):
+        from sagemaker.mlops.feature_store import FeatureDefinition
+        assert FeatureDefinition is not None
+
+    def test_feature_type_enum_importable(self):
+        from sagemaker.mlops.feature_store import FeatureTypeEnum
+        assert FeatureTypeEnum is not None
+
+    def test_collection_type_enum_importable(self):
+        from sagemaker.mlops.feature_store import CollectionTypeEnum
+        assert CollectionTypeEnum is not None
+
+    def test_fractional_feature_definition_importable(self):
+        from sagemaker.mlops.feature_store import FractionalFeatureDefinition
+        assert callable(FractionalFeatureDefinition)
+
+    def test_integral_feature_definition_importable(self):
+        from sagemaker.mlops.feature_store import IntegralFeatureDefinition
+        assert callable(IntegralFeatureDefinition)
+
+    def test_string_feature_definition_importable(self):
+        from sagemaker.mlops.feature_store import StringFeatureDefinition
+        assert callable(StringFeatureDefinition)
+
+    def test_list_collection_type_importable(self):
+        from sagemaker.mlops.feature_store import ListCollectionType
+        assert ListCollectionType is not None
+
+    def test_set_collection_type_importable(self):
+        from sagemaker.mlops.feature_store import SetCollectionType
+        assert SetCollectionType is not None
+
+    def test_vector_collection_type_importable(self):
+        from sagemaker.mlops.feature_store import VectorCollectionType
+        assert VectorCollectionType is not None
+
+
+class TestAllExportsMatchAllList:
+    """Consistency checks between feature_store.__all__ and the attributes the module actually exposes."""
+
+    def test_all_exports_are_importable(self):
+        import sagemaker.mlops.feature_store as fs_module
+        all_names = fs_module.__all__
+        assert len(all_names) > 0, "__all__ should not be empty"
+        for name in all_names:
+            assert hasattr(fs_module, name), (
+                f"{name} is listed in __all__ but not importable from "
+                f"sagemaker.mlops.feature_store"
+            )
+
+    def test_no_extra_public_names_missing_from_all(self):
+        """Verify a representative subset of key public names is listed in __all__ (not an exhaustive check)."""
+        import sagemaker.mlops.feature_store as fs_module
+        all_names = set(fs_module.__all__)
+        # Representative subset only; public names outside this set are not checked here
+        expected_names = {
+            "FeatureGroup",
+            "FeatureMetadata",
+            "FeatureValue",
+            "OnlineStoreConfig",
+            "OfflineStoreConfig",
+            "ingest_dataframe",
+            "create_athena_query",
+            "AthenaQuery",
+            "DatasetBuilder",
+            "IngestionManagerPandas",
+            "FeatureDefinition",
+        }
+        for name in expected_names:
+            assert name in all_names, (
+                f"{name} should be in __all__ but is missing"
+            )
+
+
+class TestMlopsInitIncludesFeatureStore:
+    """Test that the name feature_store is listed in sagemaker.mlops.__all__ (string membership only)."""
+
+    def test_feature_store_in_mlops_all(self):
+        import sagemaker.mlops as mlops_module
+        assert "feature_store" in mlops_module.__all__, (
+            "feature_store should be listed in sagemaker.mlops.__all__ "
+            "for discoverability"
+        )