diff --git a/sagemaker-core/src/sagemaker/core/processing.py b/sagemaker-core/src/sagemaker/core/processing.py index b507ae1a93..751f8a90f4 100644 --- a/sagemaker-core/src/sagemaker/core/processing.py +++ b/sagemaker-core/src/sagemaker/core/processing.py @@ -51,7 +51,12 @@ ) from sagemaker.core.local.local_session import LocalSession from sagemaker.core.helper.session_helper import Session -from sagemaker.core.shapes import ProcessingInput, ProcessingOutput, ProcessingS3Input +from sagemaker.core.shapes import ( + ProcessingInput, + ProcessingOutput, + ProcessingS3Input, + ProcessingS3Output, +) from sagemaker.core.resources import ProcessingJob from sagemaker.core.workflow.pipeline_context import PipelineSession from sagemaker.core.common_utils import ( @@ -86,6 +91,11 @@ logger = logging.getLogger(__name__) +# Default values used when creating a ProcessingS3Output for outputs +# that don't have an explicit s3_output configured. +DEFAULT_PROCESSING_LOCAL_OUTPUT_PATH = "/opt/ml/processing/output" +DEFAULT_S3_UPLOAD_MODE = "EndOfJob" + class Processor(object): """Handles Amazon SageMaker Processing tasks.""" @@ -483,13 +493,25 @@ def _normalize_outputs(self, outputs=None): # Generate a name for the ProcessingOutput if it doesn't have one. if output.output_name is None: output.output_name = "output-{}".format(count) + # If s3_output is None, create a default one with None s3_uri. + # The s3_uri will be auto-generated below based on job/pipeline context. + if output.s3_output is None: + output.s3_output = ProcessingS3Output( + s3_uri=None, + local_path=DEFAULT_PROCESSING_LOCAL_OUTPUT_PATH, + s3_upload_mode=DEFAULT_S3_UPLOAD_MODE, + ) + # is_pipeline_variable handles None gracefully (returns False) if output.s3_output and is_pipeline_variable(output.s3_output.s3_uri): normalized_outputs.append(output) continue - # If the output's s3_uri is not an s3_uri, create one. - parse_result = urlparse(output.s3_output.s3_uri) - if parse_result.scheme != "s3": - if getattr(self.sagemaker_session, "local_mode", False) and parse_result.scheme == "file": + # If the output's s3_uri is None or not an s3_uri, create one. + if output.s3_output.s3_uri is None: + parse_result_scheme = "" + else: + parse_result_scheme = urlparse(output.s3_output.s3_uri).scheme + if parse_result_scheme != "s3": + if getattr(self.sagemaker_session, "local_mode", False) and parse_result_scheme == "file": normalized_outputs.append(output) continue if _pipeline_config: @@ -1421,11 +1443,17 @@ def _processing_output_to_request_dict(processing_output): } if processing_output.s3_output: - request_dict["S3Output"] = { - "S3Uri": processing_output.s3_output.s3_uri, + s3_output_dict = { "LocalPath": processing_output.s3_output.local_path, "S3UploadMode": processing_output.s3_output.s3_upload_mode, } + # After _normalize_outputs, s3_uri should always be populated. + # If it is still None at serialization time, omit S3Uri from the dict + # rather than sending None to the API. This is a defensive guard; + # _normalize_outputs is expected to fill in s3_uri before we reach here. + if processing_output.s3_output.s3_uri is not None: + s3_output_dict["S3Uri"] = processing_output.s3_output.s3_uri + request_dict["S3Output"] = s3_output_dict return request_dict diff --git a/sagemaker-core/src/sagemaker/core/shapes/shapes.py b/sagemaker-core/src/sagemaker/core/shapes/shapes.py index adbcf6ec67..aabafe34b3 100644 --- a/sagemaker-core/src/sagemaker/core/shapes/shapes.py +++ b/sagemaker-core/src/sagemaker/core/shapes/shapes.py @@ -10370,12 +10370,12 @@ class ProcessingS3Output(Base): Attributes ---------------------- - s3_uri: A URI that identifies the Amazon S3 bucket where you want Amazon SageMaker to save the results of a processing job. + s3_uri: A URI that identifies the Amazon S3 bucket where you want Amazon SageMaker to save the results of a processing job. When set to None, the SDK auto-generates an S3 path based on the job name and output name. local_path: The local path of a directory where you want Amazon SageMaker to upload its contents to Amazon S3. LocalPath is an absolute path to a directory containing output files. This directory will be created by the platform and exist when your container's entrypoint is invoked. s3_upload_mode: Whether to upload the results of the processing job continuously or after the job completes. """ - s3_uri: StrPipeVar + s3_uri: Optional[StrPipeVar] = None s3_upload_mode: StrPipeVar local_path: Optional[StrPipeVar] = Unassigned() diff --git a/sagemaker-core/tests/unit/test_processing.py b/sagemaker-core/tests/unit/test_processing.py index dbe8d5f9ef..7ab4bda9de 100644 --- a/sagemaker-core/tests/unit/test_processing.py +++ b/sagemaker-core/tests/unit/test_processing.py @@ -493,6 +493,159 @@ def test_multiple_outputs_with_s3_uris_preserved(self, session_local_mode_false) assert result[1].s3_output.s3_uri == "s3://my-bucket/second" +class TestProcessingS3OutputOptionalS3Uri: + """Tests for ProcessingS3Output with optional s3_uri (issue #5559).""" + + def test_processing_s3_output_with_none_s3_uri_creates_successfully(self): + """Verify ProcessingS3Output can be created with s3_uri=None.""" + s3_output = ProcessingS3Output( + s3_uri=None, + local_path="/opt/ml/processing/output", + s3_upload_mode="EndOfJob", + ) + assert s3_output.s3_uri is None + assert s3_output.local_path == "/opt/ml/processing/output" + assert s3_output.s3_upload_mode == "EndOfJob" + + def test_processing_s3_output_without_s3_uri_param_creates_successfully(self): + """Verify ProcessingS3Output works with default None for s3_uri.""" + s3_output = ProcessingS3Output( + local_path="/opt/ml/processing/output", + s3_upload_mode="EndOfJob", + ) + assert s3_output.s3_uri is None + + def test_processing_s3_output_with_explicit_s3_uri_preserves_value(self): + """Regression test: explicit s3_uri string is preserved in the model.""" + s3_output = ProcessingS3Output( + s3_uri="s3://my-bucket/my-output", + local_path="/opt/ml/processing/output", + s3_upload_mode="EndOfJob", + ) + assert s3_output.s3_uri == "s3://my-bucket/my-output" + assert s3_output.local_path == "/opt/ml/processing/output" + assert s3_output.s3_upload_mode == "EndOfJob" + + def test_normalize_outputs_with_none_s3_uri_generates_s3_path(self, mock_session): + """When s3_uri is None, _normalize_outputs should auto-generate an S3 URI.""" + processor = Processor( + role="arn:aws:iam::123456789012:role/SageMakerRole", + image_uri="test-image:latest", + instance_count=1, + instance_type="ml.m5.xlarge", + sagemaker_session=mock_session, + ) + processor._current_job_name = "test-job" + + s3_output = ProcessingS3Output( + s3_uri=None, + local_path="/opt/ml/processing/output", + s3_upload_mode="EndOfJob", + ) + outputs = [ProcessingOutput(output_name="my-output", s3_output=s3_output)] + + with patch("sagemaker.core.workflow.utilities._pipeline_config", None): + result = processor._normalize_outputs(outputs) + + assert len(result) == 1 + assert result[0].s3_output.s3_uri is not None + assert result[0].s3_output.s3_uri.startswith("s3://") + assert "test-job" in result[0].s3_output.s3_uri + assert "my-output" in result[0].s3_output.s3_uri + + def test_normalize_outputs_with_none_s3_uri_and_pipeline_config_generates_join(self, mock_session): + """When in pipeline context with s3_uri=None, should generate a Join expression.""" + processor = Processor( + role="arn:aws:iam::123456789012:role/SageMakerRole", + image_uri="test-image:latest", + instance_count=1, + instance_type="ml.m5.xlarge", + sagemaker_session=mock_session, + ) + processor._current_job_name = "test-job" + + s3_output = ProcessingS3Output( + s3_uri=None, + local_path="/opt/ml/processing/output", + s3_upload_mode="EndOfJob", + ) + outputs = [ProcessingOutput(output_name="my-output", s3_output=s3_output)] + + with patch("sagemaker.core.workflow.utilities._pipeline_config") as mock_config: + mock_config.pipeline_name = "test-pipeline" + mock_config.step_name = "test-step" + result = processor._normalize_outputs(outputs) + + assert len(result) == 1 + # In pipeline context, the s3_uri should be a Join object + from sagemaker.core.workflow.functions import Join + assert isinstance(result[0].s3_output.s3_uri, Join) + + def test_normalize_outputs_with_none_s3_output_generates_s3_path(self, mock_session): + """When s3_output is None, _normalize_outputs should create s3_output and auto-generate URI.""" + processor = Processor( + role="arn:aws:iam::123456789012:role/SageMakerRole", + image_uri="test-image:latest", + instance_count=1, + instance_type="ml.m5.xlarge", + sagemaker_session=mock_session, + ) + processor._current_job_name = "test-job" + + outputs = [ProcessingOutput(output_name="my-output")] + + with patch("sagemaker.core.workflow.utilities._pipeline_config", None): + result = processor._normalize_outputs(outputs) + + assert len(result) == 1 + assert result[0].s3_output is not None + assert result[0].s3_output.s3_uri is not None + assert result[0].s3_output.s3_uri.startswith("s3://") + assert result[0].s3_output.local_path == "/opt/ml/processing/output" + assert result[0].s3_output.s3_upload_mode == "EndOfJob" + + def test_processing_output_to_request_dict_with_none_s3_uri_omits_key(self): + """When s3_uri is None, S3Uri should be omitted from the request dict.""" + s3_output = ProcessingS3Output( + s3_uri=None, + local_path="/opt/ml/processing/output", + s3_upload_mode="EndOfJob", + ) + processing_output = ProcessingOutput(output_name="results", s3_output=s3_output) + + result = _processing_output_to_request_dict(processing_output) + + assert result["OutputName"] == "results" + assert "S3Output" in result + assert "S3Uri" not in result["S3Output"] + assert result["S3Output"]["LocalPath"] == "/opt/ml/processing/output" + assert result["S3Output"]["S3UploadMode"] == "EndOfJob" + + def test_normalize_outputs_with_explicit_s3_uri_unchanged(self, mock_session): + """Regression test: explicit s3:// URIs should be preserved.""" + processor = Processor( + role="arn:aws:iam::123456789012:role/SageMakerRole", + image_uri="test-image:latest", + instance_count=1, + instance_type="ml.m5.xlarge", + sagemaker_session=mock_session, + ) + processor._current_job_name = "test-job" + + s3_output = ProcessingS3Output( + s3_uri="s3://my-bucket/my-output", + local_path="/opt/ml/processing/output", + s3_upload_mode="EndOfJob", + ) + outputs = [ProcessingOutput(output_name="my-output", s3_output=s3_output)] + + with patch("sagemaker.core.workflow.utilities._pipeline_config", None): + result = processor._normalize_outputs(outputs) + + assert len(result) == 1 + assert result[0].s3_output.s3_uri == "s3://my-bucket/my-output" + + class TestProcessorStartNew: def test_start_new_with_pipeline_session(self, mock_session): from sagemaker.core.workflow.pipeline_context import PipelineSession