How do I correctly reference the preprocessed output model's S3 URI in a SageMaker Pipeline?


Hello Experts!

I'm working on a SageMaker Pipeline PoC that uses two custom containers: one for the processing step, which produces a preprocessing model along with the processed data, and another for inference, which serves that model. I've defined various pipeline parameters and steps, including a data processing step and steps to create and register the model. (Forget inference for now; the inference image plays a role only when I register the model, since it carries the serve script.) Additionally, I've tested both custom containers locally (model serving with Flask and nginx, preprocessing with the processing script inside it) and they work like a charm.

The issue arises when defining step_create_model and register_step. The processing step's output property resolves to the output directory for the preprocessing model, not to the specific model.pkg file inside it.

Hardcoding the model's path (i.e., 's3://sagemaker-eu-west-1....../output/preprocessing_model/model.pkg') makes it work. Still, I'd like to reference the processing step's output dynamically. I tried appending a string to processing_step.properties, but that operation isn't supported, and the pipeline fails with:

ClientError: Failed to invoke sagemaker:CreateModelPackage. Error Details: Cannot find S3 object: sagemaker-pipeline-2023-xx-xx-xx-xx-xx/xxxxx0aa4c/DataProcessing/output/preprocessing_model in bucket sagemaker-eu-west-1-xxxxxxxxxxxxx1. Please check if your S3 object exists and has proper permissions for SageMaker.

To reiterate, my use case involves a pipeline with two models (preprocessing and training). How can I dynamically reference the model's S3 URI so I don't have to hardcode the path? Per the AWS guidelines, I've made sure not to include any confidential or personal information.

from sagemaker import get_execution_role
from sagemaker.clarify import BiasConfig, DataConfig, ModelConfig
from sagemaker.model import Model
from sagemaker.processing import Processor, ProcessingInput, ProcessingOutput
from sagemaker.session import Session
from sagemaker.workflow.parameters import ParameterBoolean, ParameterInteger, ParameterString
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import ProcessingStep

# Parameters
region = 'eu-west-1'

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

input_data = ParameterString(
    name="InputDataUrl",
    default_value="s3://sagemaker-eu-west-1-xxxxx/3xxxxxxx71xx1xxxx/PipelienUsecase/data/processing/input",
)

model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")

# for data quality check step
skip_check_data_quality = ParameterBoolean(name="SkipDataQualityCheck", default_value=False)
register_new_baseline_data_quality = ParameterBoolean(name="RegisterNewDataQualityBaseline", default_value=False)
supplied_baseline_statistics_data_quality = ParameterString(name="DataQualitySuppliedStatistics", default_value="")
supplied_baseline_constraints_data_quality = ParameterString(name="DataQualitySuppliedConstraints", default_value="")

# for data bias check step
skip_check_data_bias = ParameterBoolean(name="SkipDataBiasCheck", default_value=False)
register_new_baseline_data_bias = ParameterBoolean(name="RegisterNewDataBiasBaseline", default_value=False)
supplied_baseline_constraints_data_bias = ParameterString(name="DataBiasSuppliedBaselineConstraints", default_value="")

# Setup
sagemaker_session = Session()
pipeline_session = PipelineSession()  # the pipeline steps below run against a PipelineSession
role = get_execution_role()  # execution role used by every step
default_bucket = sagemaker_session.default_bucket()
base_job_prefix = "sagemaker/pipeline"
image_uri_processing = "xxxxxxxxx.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-processing-container:latest"
image_uri_inference = "xxxxxxxxx.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-processing-inference-container:latest"
model_package_group_name = "model-monitor-clarify-group-ashlin"

# Data Processing Step
processor = Processor(
    image_uri=image_uri_processing,
    sagemaker_session=pipeline_session,
    role=role,
    instance_count=processing_instance_count,
    instance_type="ml.c4.xlarge"
)

processing_step = ProcessingStep(
    name="DataProcessing",
    processor=processor,
    inputs=[ProcessingInput(source=input_data, destination='/opt/ml/processing/input/')],
    outputs=[ProcessingOutput(output_name="output", source="/opt/ml/processing/output/"),
             ProcessingOutput(output_name="preprocessing_model", source="/opt/ml/processing/model")]
)
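Neither output sets an explicit destination, so the SDK uploads them to a generated prefix; by default the layout looks roughly like this (which is why the step property resolves to a directory, not a file):

# Approximate default destination per output when none is given:
#   s3://<default-bucket>/<pipeline-or-job-prefix>/DataProcessing/output/<output_name>/
# e.g. .../DataProcessing/output/preprocessing_model/ -- a prefix, not model.pkg itself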
...
...
...
sagemaker_model = Model(
    # Hardcoded, and it works, since the processing job creates the model there:
    model_data='s3://sagemaker-eu-west-1....../output/preprocessing_model/model.tar.gz',
    # What I actually want (but this resolves to the directory, not the file):
    # model_data=processing_step.properties.ProcessingOutputConfig.Outputs["preprocessing_model"].S3Output.S3Uri,
    image_uri=image_uri_inference,
    sagemaker_session=pipeline_session,
    role=role,
)

from sagemaker.workflow.model_step import ModelStep

step_create_model = ModelStep(
    name="ashlinspreprocessormodel",
    # Options such as instance_type="ml.c4.large" or accelerator_type="ml.eia1.medium"
    # could be passed to create():
    step_args=sagemaker_model.create(),
)

from sagemaker.workflow.step_collections import RegisterModel

register_step = RegisterModel(
    name="ashlinsprocessingmodel",
    model=sagemaker_model,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.c4.xlarge"],
    transform_instances=["ml.c4.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,  # uses the pipeline parameter defined above
)

Tags: AWS, SageMaker, Pipeline, S3, Model


The part of preprocessing.py that creates the model:

        # Preprocess data
        pipeline = modalias.pipeline(features)
        train_features = pipeline.fit_transform(train_features)
        test_features = pipeline.transform(test_features)

        # Save preprocessed data
        train_features.to_csv(os.path.join(output_path, 'train_features_output_path.csv'), index=False)
        train_labels.to_csv(os.path.join(output_path, 'train_labels_output_path.csv'), index=False)
        test_features.to_csv(os.path.join(output_path, 'test_features_output_path.csv'), index=False)
        test_labels.to_csv(os.path.join(output_path, 'test_labels_output_path.csv'), index=False)

        # Save the preprocessing model
        # dump(pipeline, os.path.join(model_path, 'model.pkg'))
        save(pipeline, modalias, os.path.join(model_path, 'model.pkg'))  # <- model created here
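For reference, save is my own helper; a minimal sketch of what such a helper might look like, assuming joblib for serialization (the real helper is not shown in this post):

import joblib

def save(pipeline, modalias, target_path):
    # Hypothetical stand-in for the actual save helper: persist the fitted
    # preprocessing pipeline to disk; modalias is accepted only for signature parity.
    joblib.dump(pipeline, target_path)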

Thanks for your support and help!

Best Regards, Scarlet G

1 Answer

Accepted Answer

Hello Fam!

After a thorough review of the documentation, I identified the right tool: the Join function from the SageMaker workflow SDK. Here's a streamlined implementation that may benefit those working on similar challenges. Never try to concatenate pipeline properties with ordinary strings or variables; the SageMaker pipeline will reject that with errors such as:

Error "Pipeline variables do not support str operation" despite .to_string() is used

TypeError: Pipeline variables do not support concatenation.
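For illustration, a direct concatenation like the following (a hypothetical snippet using the step from the question) is exactly what triggers that TypeError; the working approach uses Join instead:

# Raises: TypeError: Pipeline variables do not support concatenation.
bad_model_data = (
    processing_step.properties.ProcessingOutputConfig.Outputs["preprocessing_model"].S3Output.S3Uri
    + "/model.pkg"
)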

from sagemaker.workflow.functions import Join

# Define the model's suffix
model_suffix = "model.pkg"

sagemaker_model = Model(
    # Build the full object URI from the step property plus the file name:
    model_data=Join(
        on="/",
        values=[
            processing_step.properties.ProcessingOutputConfig.Outputs["preprocessing_model"].S3Output.S3Uri,
            model_suffix,
        ],
    ),
    # Previously hardcoded (works, but defeats the purpose of a pipeline):
    # model_data='s3://sagemaker-eu-west-1-xxxxxxx/sagemaker-processing-container-2023-09-12-20-11-09-137/output/preprocessing_model/model.tar.gz',
    # These direct approaches fail with the errors above:
    # model_data=processing_step.properties.ProcessingOutputConfig.Outputs["preprocessing_model"].S3Output.S3Uri + model_name,
    # model_data=f"{v_path}/model.tar.gz",
    # And this resolves to the directory, not the file:
    # model_data=processing_step.properties.ProcessingOutputConfig.Outputs["preprocessing_model"].S3Output.S3Uri,
    image_uri=image_uri_inference,
    sagemaker_session=pipeline_session,
    role=role,
)
Now my workflow resolves the model's S3 URI dynamically and has registered the model.
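For completeness, here is a minimal sketch of wiring the steps into a pipeline and running it; the step and parameter names come from the question, while the pipeline name itself is hypothetical:

from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="preprocessing-model-pipeline",  # hypothetical pipeline name
    parameters=[processing_instance_count, input_data, model_approval_status],
    steps=[processing_step, step_create_model, register_step],
    sagemaker_session=pipeline_session,
)
pipeline.upsert(role_arn=role)  # create or update the pipeline definition
execution = pipeline.start()    # start a run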

I encourage everyone to delve into the documentation when facing challenges, and to make use of retrieval-augmented LLMs; it's a valuable skill that fosters professional growth. Remember, every challenge is an opportunity for learning and development.

Cheers and Regards, Ashlin Gabriel Rajan (Pen-name Scarlet)

answered 8 months ago
