How do I correctly reference the preprocessed output model's S3 URI in a SageMaker Pipeline?


Hello Experts!

I'm working on a SageMaker Pipeline PoC that uses two custom containers: one that runs preprocessing and produces a preprocessing model plus the processed data, and another that serves the model for inference. I've defined various pipeline parameters and steps, including a data processing step and steps to create and register the model. (Set inference aside for now; the inference image only comes into play when I register the model, since that image contains the serve script.) I've also tested both custom containers locally (model serving with Flask and nginx, and the processing script inside the preprocessing container) and they work like a charm.

The issue arises when defining step_create_model and register_step: the processing step exposes the S3 prefix of the preprocessing model output, but not the path to the specific model.pkg file.

Hardcoding the model's path (e.g. 's3://sagemaker-eu-west-1....../output/preprocessing_model/model.pkg') makes it work, but I'd like to reference the processing step's output dynamically. I tried appending a string to processing_step.properties, but that isn't supported.
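Roughly what I tried (an illustrative sketch of the unsupported part, not my exact code):

# Fails: step properties are pipeline placeholders, not plain strings, so
# ordinary string concatenation is rejected at pipeline definition time.
model_data = (
    processing_step.properties
    .ProcessingOutputConfig.Outputs["preprocessing_model"]
    .S3Output.S3Uri + "/model.pkg"
)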

ClientError: Failed to invoke sagemaker:CreateModelPackage. Error Details: Cannot find S3 object: sagemaker-pipeline-2023-xx-xx-xx-xx-xx/xxxxx0aa4c/DataProcessing/output/preprocessing_model in bucket sagemaker-eu-west-1-xxxxxxxxxxxxx1. Please check if your S3 object exists and has proper permissions for SageMaker.

To reiterate, my use case involves a pipeline with two models (preprocessing and training). How can I dynamically reference the model's S3 URI so I don't have to hardcode the path? Per the AWS guidelines, I've made sure not to include any confidential or personal information.

from sagemaker import get_execution_role
from sagemaker.session import Session
from sagemaker.processing import Processor, ProcessingInput, ProcessingOutput
from sagemaker.model import Model
from sagemaker.workflow.parameters import ParameterBoolean, ParameterInteger, ParameterString
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.clarify import BiasConfig, DataConfig, ModelConfig

# Parameters
region = 'eu-west-1'

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

input_data = ParameterString(
    name="InputDataUrl",
    default_value=f"s3://sagemaker-eu-west-1-xxxxx/3xxxxxxx71xx1xxxx/PipelienUsecase/data/processing/input",
)

model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")

# for data quality check step
skip_check_data_quality = ParameterBoolean(name="SkipDataQualityCheck", default_value=False)
register_new_baseline_data_quality = ParameterBoolean(name="RegisterNewDataQualityBaseline", default_value=False)
supplied_baseline_statistics_data_quality = ParameterString(name="DataQualitySuppliedStatistics", default_value="")
supplied_baseline_constraints_data_quality = ParameterString(name="DataQualitySuppliedConstraints", default_value="")

# for data bias check step
skip_check_data_bias = ParameterBoolean(name="SkipDataBiasCheck", default_value=False)
register_new_baseline_data_bias = ParameterBoolean(name="RegisterNewDataBiasBaseline", default_value=False)
supplied_baseline_constraints_data_bias = ParameterString(name="DataBiasSuppliedBaselineConstraints", default_value="")

# Setup
sagemaker_session = Session()
pipeline_session = PipelineSession()
role = get_execution_role()
default_bucket = sagemaker_session.default_bucket()
base_job_prefix = "sagemaker/pipeline"
image_uri_processing = "xxxxxxxxx.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-processing-container:latest"
image_uri_inference = "xxxxxxxxx.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-processing-inference-container:latest"
model_package_group_name = "model-monitor-clarify-group-ashlin"

# Data Processing Step
processor = Processor(
    image_uri=image_uri_processing,
    sagemaker_session=pipeline_session,
    role=role,
    instance_count=processing_instance_count,
    instance_type="ml.c4.xlarge"
)

processing_step = ProcessingStep(
    name="DataProcessing",
    processor=processor,
    inputs=[ProcessingInput(source=input_data, destination='/opt/ml/processing/input/')],
    outputs=[ProcessingOutput(output_name="output", source="/opt/ml/processing/output/"),
             ProcessingOutput(output_name="preprocessing_model", source="/opt/ml/processing/model")]
)
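Each ProcessingOutput above gets an S3 location that can be referenced later through the step's properties. As a sketch, the reference below is only a placeholder at definition time; SageMaker resolves it when the pipeline runs, and it resolves to the output prefix (e.g. .../DataProcessing/output/preprocessing_model), not to the model.pkg object inside it:

# Placeholder resolved at execution time to the output's S3 prefix,
# not to the model.pkg file inside that prefix.
preprocessing_model_s3_uri = (
    processing_step.properties
    .ProcessingOutputConfig.Outputs["preprocessing_model"]
    .S3Output.S3Uri
)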
...
...
...
sagemaker_model = Model(
    # Hardcoded path works, since the processing job does create the model there:
    model_data='s3://sagemaker-eu-west-1....../output/preprocessing_model/model.tar.gz',
    # What I would like to use instead:
    # processing_step.properties.ProcessingOutputConfig.Outputs["preprocessing_model"].S3Output.S3Uri,
    image_uri=image_uri_inference,
    sagemaker_session=pipeline_session,
    role=role,
)

from sagemaker.workflow.model_step import ModelStep

step_create_model = ModelStep(
    name="ashlinspreprocessormodel",
    step_args=sagemaker_model.create(),  # optionally: instance_type="ml.c4.large", accelerator_type="ml.eia1.medium"
)

from sagemaker.workflow.step_collections import RegisterModel

register_step = RegisterModel(
    name="ashlinsprocessingmodel",
    model=sagemaker_model,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.c4.xlarge"],
    transform_instances=["ml.c4.xlarge"],
    model_package_group_name=model_package_group_name,
)
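Not shown above: the steps are wired into a Pipeline object, roughly as sketched below (the pipeline name is illustrative and the data quality/bias check steps are omitted):

from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="preprocessing-model-pipeline",  # illustrative name
    parameters=[
        processing_instance_count,
        input_data,
        model_approval_status,
        skip_check_data_quality,
        register_new_baseline_data_quality,
        supplied_baseline_statistics_data_quality,
        supplied_baseline_constraints_data_quality,
        skip_check_data_bias,
        register_new_baseline_data_bias,
        supplied_baseline_constraints_data_bias,
    ],
    steps=[processing_step, step_create_model, register_step],
    sagemaker_session=pipeline_session,
)
pipeline.upsert(role_arn=role)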

Tags: AWS, SageMaker, Pipeline, S3, Model


The part of preprocessing.py that creates the model:

        # Preprocess data
        pipeline = modalias.pipeline(features)
        train_features = pipeline.fit_transform(train_features)
        test_features = pipeline.transform(test_features)

        # Save preprocessed data
        train_features.to_csv(os.path.join(output_path, 'train_features_output_path.csv'), index=False)
        train_labels.to_csv(os.path.join(output_path, 'train_labels_output_path.csv'), index=False)
        test_features.to_csv(os.path.join(output_path, 'test_features_output_path.csv'), index=False)
        test_labels.to_csv(os.path.join(output_path, 'test_labels_output_path.csv'), index=False)

        # Save the preprocessing model
        #dump(pipeline, os.path.join(model_path, 'model.pkg'))
        save(pipeline, modalias, os.path.join(model_path, 'model.pkg'))  # model gets created here
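For context, output_path and model_path point at the standard processing-container paths, matching the ProcessingOutput sources defined in the pipeline above (a sketch; my actual script sets these up slightly differently):

import os

# Local container paths; SageMaker Processing uploads their contents to S3
# after the job finishes:
#   /opt/ml/processing/output -> the "output" ProcessingOutput
#   /opt/ml/processing/model  -> the "preprocessing_model" ProcessingOutput
output_path = "/opt/ml/processing/output"
model_path = "/opt/ml/processing/model"
os.makedirs(model_path, exist_ok=True)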

Thanks for your support and help!

Best Regards, Scarlet G

1 Answer
Accepted Answer

Hello Fam!

After a thorough review of the documentation, I identified the right approach: using the Join function from the SageMaker workflow SDK. Here's a streamlined implementation that may help anyone facing a similar challenge. Don't try to append or concatenate ordinary strings to pipeline variables; the pipeline rejects it with errors such as:

Error "Pipeline variables do not support str operation" (even when .to_string() is used)

TypeError: Pipeline variables do not support concatenation.

from sagemaker.workflow.functions import Join

# Define the model's suffix
model_suffix = "model.pkg"

sagemaker_model = Model(
    model_data=Join(
        on="/",
        values=[
            processing_step.properties.ProcessingOutputConfig.Outputs["preprocessing_model"].S3Output.S3Uri,
            model_suffix,
        ],
    ),
    # Earlier attempts that did not work or had to be hardcoded:
    # model_data = 's3://sagemaker-eu-west-1-xxxxxxx/sagemaker-processing-container-2023-09-12-20-11-09-137/output/preprocessing_model/model.tar.gz',
    # model_data = processing_step.properties.ProcessingOutputConfig.Outputs["preprocessing_model"].S3Output.S3Uri + model_name,
    # model_data = f"{v_path}/model.tar.gz",
    # model_data = processing_step.properties.ProcessingOutputConfig.Outputs["preprocessing_model"].S3Output.S3Uri,
    image_uri=image_uri_inference,
    sagemaker_session=pipeline_session,
    role=role,
)
Now my workflow can access the model and has registered it. The key point is that pipeline properties such as S3Output.S3Uri are placeholders that only resolve at execution time, so the concatenation also has to be deferred by expressing it with Join rather than with Python string operations.

I encourage everyone to dig into the documentation when facing challenges, and to use LLMs with retrieval augmentation; it's a valuable skill that fosters professional growth. Always remember, each challenge is an opportunity for learning and development.

Cheers and Regards, Ashlin Gabriel Rajan (Pen-name Scarlet)

answered 7 months ago
