Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions sagemaker-core/src/sagemaker/core/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@
)
from sagemaker.core.local.local_session import LocalSession
from sagemaker.core.helper.session_helper import Session
from sagemaker.core.shapes import ProcessingInput, ProcessingOutput, ProcessingS3Input
from sagemaker.core.shapes import (
ProcessingInput,
ProcessingOutput,
ProcessingS3Input,
ProcessingS3Output,
)
from sagemaker.core.resources import ProcessingJob
from sagemaker.core.workflow.pipeline_context import PipelineSession
from sagemaker.core.common_utils import (
Expand Down Expand Up @@ -86,6 +91,11 @@

logger = logging.getLogger(__name__)

# Default values used when creating a ProcessingS3Output for outputs
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: DEFAULT_S3_UPLOAD_MODE should use the enum or constant from the shapes/API model if one exists, rather than a raw string "EndOfJob". This avoids drift if the API model changes the accepted values.

# that don't have an explicit s3_output configured.
DEFAULT_PROCESSING_LOCAL_OUTPUT_PATH = "/opt/ml/processing/output"
DEFAULT_S3_UPLOAD_MODE = "EndOfJob"


class Processor(object):
"""Handles Amazon SageMaker Processing tasks."""
Expand Down Expand Up @@ -483,13 +493,25 @@ def _normalize_outputs(self, outputs=None):
# Generate a name for the ProcessingOutput if it doesn't have one.
if output.output_name is None:
output.output_name = "output-{}".format(count)
# If s3_output is None, create a default one with None s3_uri.
# The s3_uri will be auto-generated below based on job/pipeline context.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default local_path and s3_upload_mode values are hardcoded here. Consider whether these should match whatever defaults the user might expect from the ProcessingS3Output model itself. If ProcessingS3Output already has defaults for local_path and s3_upload_mode, you could simplify:

output.s3_output = ProcessingS3Output(
    s3_upload_mode=DEFAULT_S3_UPLOAD_MODE,
    local_path=DEFAULT_PROCESSING_LOCAL_OUTPUT_PATH,
)

Also, the local_path on the auto-created ProcessingS3Output may conflict with the user's actual ProcessingOutput.app_managed or other output configurations. Is /opt/ml/processing/output always the correct default?

if output.s3_output is None:
output.s3_output = ProcessingS3Output(
Comment thread
aviruthen marked this conversation as resolved.
s3_uri=None,
local_path=DEFAULT_PROCESSING_LOCAL_OUTPUT_PATH,
s3_upload_mode=DEFAULT_S3_UPLOAD_MODE,
)
# is_pipeline_variable handles None gracefully (returns False)
if output.s3_output and is_pipeline_variable(output.s3_output.s3_uri):
Comment thread
aviruthen marked this conversation as resolved.
normalized_outputs.append(output)
continue
# If the output's s3_uri is not an s3_uri, create one.
parse_result = urlparse(output.s3_output.s3_uri)
if parse_result.scheme != "s3":
if getattr(self.sagemaker_session, "local_mode", False) and parse_result.scheme == "file":
# If the output's s3_uri is None or not an s3_uri, create one.
if output.s3_output.s3_uri is None:
parse_result_scheme = ""
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line length likely exceeds 100 characters:

                    if getattr(self.sagemaker_session, "local_mode", False) and parse_result_scheme == "file":

This should be wrapped to stay within the 100-character limit.

else:
parse_result_scheme = urlparse(output.s3_output.s3_uri).scheme
if parse_result_scheme != "s3":
if getattr(self.sagemaker_session, "local_mode", False) and parse_result_scheme == "file":
normalized_outputs.append(output)
continue
if _pipeline_config:
Expand Down Expand Up @@ -1421,11 +1443,17 @@ def _processing_output_to_request_dict(processing_output):
}

if processing_output.s3_output:
request_dict["S3Output"] = {
"S3Uri": processing_output.s3_output.s3_uri,
s3_output_dict = {
"LocalPath": processing_output.s3_output.local_path,
"S3UploadMode": processing_output.s3_output.s3_upload_mode,
}
# After _normalize_outputs, s3_uri should always be populated.
# If it is still None at serialization time, omit S3Uri from the dict
# rather than sending None to the API. This is a defensive guard;
# _normalize_outputs is expected to fill in s3_uri before we reach here.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The defensive guard to omit S3Uri when it's None is good, but the comment says "_normalize_outputs is expected to fill in s3_uri before we reach here." If that's the case, consider adding a logger.warning() when s3_uri is None at this point, since it indicates an unexpected code path. Silent omission could make debugging harder:

if processing_output.s3_output.s3_uri is not None:
    s3_output_dict["S3Uri"] = processing_output.s3_output.s3_uri
else:
    logger.warning(
        "s3_uri is None for output '%s' at serialization time; "
        "expected _normalize_outputs to have populated it.",
        processing_output.output_name,
    )

if processing_output.s3_output.s3_uri is not None:
s3_output_dict["S3Uri"] = processing_output.s3_output.s3_uri
request_dict["S3Output"] = s3_output_dict

Comment thread
aviruthen marked this conversation as resolved.
return request_dict

Expand Down
4 changes: 2 additions & 2 deletions sagemaker-core/src/sagemaker/core/shapes/shapes.py
Original file line number Diff line number Diff line change
Expand Up @@ -10370,12 +10370,12 @@ class ProcessingS3Output(Base):

Attributes
----------------------
s3_uri: A URI that identifies the Amazon S3 bucket where you want Amazon SageMaker to save the results of a processing job.
s3_uri: A URI that identifies the Amazon S3 bucket where you want Amazon SageMaker to save the results of a processing job. When set to None, the SDK auto-generates an S3 path based on the job name and output name.
local_path: The local path of a directory where you want Amazon SageMaker to upload its contents to Amazon S3. LocalPath is an absolute path to a directory containing output files. This directory will be created by the platform and exist when your container's entrypoint is invoked.
s3_upload_mode: Whether to upload the results of the processing job continuously or after the job completes.
"""

s3_uri: StrPipeVar
s3_uri: Optional[StrPipeVar] = None
s3_upload_mode: StrPipeVar
local_path: Optional[StrPipeVar] = Unassigned()

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Critical concern: The shapes.py file in sagemaker-core is auto-generated from API definitions (as noted in the SDK architecture: "auto-generated from API definitions"). Manual edits to this file will be overwritten the next time the code generator runs.

The correct fix should be made in the shapes definition/generation configuration (e.g., a JSON/YAML model or codegen template) so that s3_uri is generated as Optional. If this is truly a bug in the API model where the SageMaker API actually accepts S3Uri as optional for ProcessingS3Output, the codegen input should be updated.

Please confirm whether this file is safe to edit manually or if the change needs to go into the code generation source.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field ordering changed here: s3_uri (now optional with default) is listed before s3_upload_mode (required). Note that Pydantic BaseModel subclasses do NOT forbid a defaulted field appearing before a required one — that "defaults must come last" restriction applies to plain dataclasses and function signatures. So this ordering is not an error by itself under Pydantic v1 or v2. However, if Base uses dataclass semantics (e.g. pydantic.dataclasses or @dataclass), this ordering would raise, and it also diverges from the generated field order — please verify which applies.

You need to reorder so that s3_upload_mode comes before s3_uri:

s3_upload_mode: StrPipeVar
s3_uri: Optional[StrPipeVar] = None
local_path: Optional[StrPipeVar] = Unassigned()

Alternatively, if using from __future__ import annotations or Pydantic v2 with appropriate config, this may not be an issue — but please verify.

Expand Down
153 changes: 153 additions & 0 deletions sagemaker-core/tests/unit/test_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,159 @@ def test_multiple_outputs_with_s3_uris_preserved(self, session_local_mode_false)
assert result[1].s3_output.s3_uri == "s3://my-bucket/second"


class TestProcessingS3OutputOptionalS3Uri:
"""Tests for ProcessingS3Output with optional s3_uri (issue #5559)."""

def test_processing_s3_output_with_none_s3_uri_creates_successfully(self):
"""Verify ProcessingS3Output can be created with s3_uri=None."""
s3_output = ProcessingS3Output(
s3_uri=None,
local_path="/opt/ml/processing/output",
s3_upload_mode="EndOfJob",
)
Comment thread
aviruthen marked this conversation as resolved.
assert s3_output.s3_uri is None
assert s3_output.local_path == "/opt/ml/processing/output"
assert s3_output.s3_upload_mode == "EndOfJob"

def test_processing_s3_output_without_s3_uri_param_creates_successfully(self):
    """Omitting the s3_uri keyword entirely should default it to None."""
    kwargs = {
        "local_path": "/opt/ml/processing/output",
        "s3_upload_mode": "EndOfJob",
    }
    s3_output = ProcessingS3Output(**kwargs)
    assert s3_output.s3_uri is None

def test_processing_s3_output_with_explicit_s3_uri_preserves_value(self):
    """Regression: an explicitly supplied s3_uri must survive model construction."""
    expected_uri = "s3://my-bucket/my-output"
    expected_local = "/opt/ml/processing/output"
    expected_mode = "EndOfJob"

    s3_output = ProcessingS3Output(
        s3_uri=expected_uri,
        local_path=expected_local,
        s3_upload_mode=expected_mode,
    )

    # Compare all three fields in one shot to keep the invariant explicit.
    assert (s3_output.s3_uri, s3_output.local_path, s3_output.s3_upload_mode) == (
        expected_uri,
        expected_local,
        expected_mode,
    )

def test_normalize_outputs_with_none_s3_uri_generates_s3_path(self, mock_session):
"""When s3_uri is None, _normalize_outputs should auto-generate an S3 URI."""
Comment thread
aviruthen marked this conversation as resolved.
processor = Processor(
role="arn:aws:iam::123456789012:role/SageMakerRole",
image_uri="test-image:latest",
instance_count=1,
instance_type="ml.m5.xlarge",
sagemaker_session=mock_session,
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good test coverage! However, the mock_session fixture is used but I don't see it defined in this diff. Please confirm it's defined elsewhere in the test file (e.g., as a conftest.py fixture or earlier in the file). If it's the fixture from the existing test class, note that these tests are in a new class and pytest fixtures defined inside another class won't be inherited.

)
processor._current_job_name = "test-job"

s3_output = ProcessingS3Output(
s3_uri=None,
local_path="/opt/ml/processing/output",
s3_upload_mode="EndOfJob",
)
outputs = [ProcessingOutput(output_name="my-output", s3_output=s3_output)]

with patch("sagemaker.core.workflow.utilities._pipeline_config", None):
result = processor._normalize_outputs(outputs)

assert len(result) == 1
assert result[0].s3_output.s3_uri is not None
assert result[0].s3_output.s3_uri.startswith("s3://")
assert "test-job" in result[0].s3_output.s3_uri
assert "my-output" in result[0].s3_output.s3_uri

def test_normalize_outputs_with_none_s3_uri_and_pipeline_config_generates_join(self, mock_session):
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line exceeds 100 characters. Please wrap this line:

    def test_normalize_outputs_with_none_s3_uri_and_pipeline_config_generates_join(
        self, mock_session
    ):

"""When in pipeline context with s3_uri=None, should generate a Join expression."""
processor = Processor(
role="arn:aws:iam::123456789012:role/SageMakerRole",
image_uri="test-image:latest",
instance_count=1,
instance_type="ml.m5.xlarge",
sagemaker_session=mock_session,
)
processor._current_job_name = "test-job"

s3_output = ProcessingS3Output(
Comment thread
aviruthen marked this conversation as resolved.
s3_uri=None,
local_path="/opt/ml/processing/output",
s3_upload_mode="EndOfJob",
)
outputs = [ProcessingOutput(output_name="my-output", s3_output=s3_output)]

with patch("sagemaker.core.workflow.utilities._pipeline_config") as mock_config:
mock_config.pipeline_name = "test-pipeline"
mock_config.step_name = "test-step"
result = processor._normalize_outputs(outputs)

assert len(result) == 1
# In pipeline context, the s3_uri should be a Join object
from sagemaker.core.workflow.functions import Join
assert isinstance(result[0].s3_output.s3_uri, Join)

def test_normalize_outputs_with_none_s3_output_generates_s3_path(self, mock_session):
"""When s3_output is None, _normalize_outputs should create s3_output and auto-generate URI."""
processor = Processor(
role="arn:aws:iam::123456789012:role/SageMakerRole",
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The import of Join is inside the test function body. While this works, it's better practice to put imports at the top of the file. If Join is only needed for this assertion, consider using hasattr or checking the type name string instead, or move the import to the top of the file with the other imports.

image_uri="test-image:latest",
instance_count=1,
instance_type="ml.m5.xlarge",
sagemaker_session=mock_session,
)
processor._current_job_name = "test-job"

outputs = [ProcessingOutput(output_name="my-output")]

with patch("sagemaker.core.workflow.utilities._pipeline_config", None):
result = processor._normalize_outputs(outputs)

assert len(result) == 1
assert result[0].s3_output is not None
assert result[0].s3_output.s3_uri is not None
assert result[0].s3_output.s3_uri.startswith("s3://")
assert result[0].s3_output.local_path == "/opt/ml/processing/output"
assert result[0].s3_output.s3_upload_mode == "EndOfJob"

def test_processing_output_to_request_dict_with_none_s3_uri_omits_key(self):
    """A None s3_uri must not be serialized as an S3Uri key in the request dict."""
    processing_output = ProcessingOutput(
        output_name="results",
        s3_output=ProcessingS3Output(
            s3_uri=None,
            local_path="/opt/ml/processing/output",
            s3_upload_mode="EndOfJob",
        ),
    )

    result = _processing_output_to_request_dict(processing_output)

    assert result["OutputName"] == "results"
    s3_dict = result.get("S3Output")
    assert s3_dict is not None
    # The defensive guard in serialization should drop the key, not send None.
    assert "S3Uri" not in s3_dict
    assert s3_dict["LocalPath"] == "/opt/ml/processing/output"
    assert s3_dict["S3UploadMode"] == "EndOfJob"

def test_normalize_outputs_with_explicit_s3_uri_unchanged(self, mock_session):
    """Regression: a user-supplied s3:// URI passes through _normalize_outputs untouched."""
    explicit_uri = "s3://my-bucket/my-output"
    processor = Processor(
        role="arn:aws:iam::123456789012:role/SageMakerRole",
        image_uri="test-image:latest",
        instance_count=1,
        instance_type="ml.m5.xlarge",
        sagemaker_session=mock_session,
    )
    processor._current_job_name = "test-job"

    outputs = [
        ProcessingOutput(
            output_name="my-output",
            s3_output=ProcessingS3Output(
                s3_uri=explicit_uri,
                local_path="/opt/ml/processing/output",
                s3_upload_mode="EndOfJob",
            ),
        )
    ]

    # Run outside any pipeline context so no Join expression is generated.
    with patch("sagemaker.core.workflow.utilities._pipeline_config", None):
        normalized = processor._normalize_outputs(outputs)

    assert len(normalized) == 1
    assert normalized[0].s3_output.s3_uri == explicit_uri


class TestProcessorStartNew:
def test_start_new_with_pipeline_session(self, mock_session):
from sagemaker.core.workflow.pipeline_context import PipelineSession
Expand Down
Loading