Sagemaker Pipelines - Is it possible to use a TransformStep with the Catboost Estimator ?

0

Hi! I am trying to implement a Sagemaker Pipeline including the following steps (among other things):

The TransformStep returns the following error: python3: can't open file 'serve': [Errno 2] No such file or directory

I wonder if I'm using TransformStep in the wrong way or if, at the moment, the use of TransformStep with the CatBoost model has not been implemented yet.

Code:

[...]
pyspark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.1",
    role=role_arn,
    instance_type="ml.m5.xlarge",
    instance_count=12,
    sagemaker_session=pipeline_session,
    max_runtime_in_seconds=2400,
)

step_process_args = pyspark_processor.run(
    submit_app=os.path.join(
        s3_preprocess_script_dir, "preprocess.py"
    ),  # Hack to fix cache hit
    submit_py_files=[os.path.join(
        s3_preprocess_script_dir, "preprocess_utils.py"
    ), os.path.join(
        s3_preprocess_script_dir, "spark_utils.py"
    )],
    outputs=[
        ProcessingOutput(
            output_name="datasets",
            source="/opt/ml/processing/output",
            destination=s3_preprocess_output_path,
        )
    ],
    arguments=["--aws_account", AWS_ACCOUNT, "--aws_env", AWS_ENV, "--project_name", PROJECT_NAME, "--mode", "training"],
)

step_process = ProcessingStep(
    name="PySparkPreprocessing",
    step_args=step_process_args,
    cache_config=cache_config,
)

train_model_id = "catboost-classification-model"
train_model_version = "*"
train_scope = "training"
training_instance_type = "ml.m5.xlarge"

# Retrieve the docker image
train_image_uri = image_uris.retrieve(
    region=None,
    framework=None,
    model_id=train_model_id,
    model_version=train_model_version,
    image_scope=train_scope,
    instance_type=training_instance_type,
)

# Retrieve the training script
train_source_uri = script_uris.retrieve(
    model_id=train_model_id, model_version=train_model_version, script_scope=train_scope
)

# Retrieve the pre-trained model tarball to further fine-tune
train_model_uri = model_uris.retrieve(
    model_id=train_model_id, model_version=train_model_version, model_scope=train_scope
)

training_job_name = name_from_base(f"jumpstart-{train_model_id}-training")

# Create SageMaker Estimator instance
tabular_estimator = Estimator(
    role=role_arn,
    image_uri=train_image_uri,
    source_dir=train_source_uri,
    model_uri=train_model_uri,
    entry_point="transfer_learning.py",
    instance_count=1,
    instance_type="ml.m5.xlarge",
    max_run=360000,
    hyperparameters=hyperparameters,
    sagemaker_session=pipeline_session,
    output_path=s3_training_output_path,
    disable_profiler=True,  # The default profiler rule includes a timestamp which will change each time the pipeline is upserted, causing cache misses. If profiling is not needed, set disable_profiler to True on the estimator.
)

# Launch a SageMaker Training job by passing s3 path of the training data
step_train_args = tabular_estimator.fit(
    {
        "training": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "datasets"
            ].S3Output.S3Uri
        )
    },
    logs=True,
    job_name=training_job_name,
)

step_train = TrainingStep(
    name="CatBoostTraining",
    step_args=step_train_args,
    cache_config=cache_config,
)

script_eval = ScriptProcessor(
    image_uri=[MASKED],
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="script-evaluation",
    role=role_arn,
    sagemaker_session=pipeline_session,
)

eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                "datasets"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/input",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination=s3_evaluation_output_path,
        ),
    ],
    code="common/evaluation.py",
)

evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

step_eval = ProcessingStep(
    name="Evaluation",
    step_args=eval_args,
    property_files=[evaluation_report],
    cache_config=cache_config,
)

model = Model(
    image_uri="467855596088.dkr.ecr.eu-west-3.amazonaws.com/sagemaker-catboost-image:latest",
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role_arn,
)

evaluation_s3_uri = "{}/evaluation.json".format(
    step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
)

model_step_args = model.create(
    instance_type="ml.m5.large",
)
create_model = ModelStep(name="CatBoostModel", step_args=model_step_args)

step_fail = FailStep(
    name="FailBranch",
    error_message=Join(
        on=" ", values=["Execution failed due to F1-score <", 0.8]
    ),
)

cond_lte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.f1-score.value",
    ),
    right=f1_threshold,
)

step_cond = ConditionStep(
    name="F1ScoreCondition",
    conditions=[cond_lte],
    if_steps=[create_model],
    else_steps=[step_fail],
)

# Transform Job
s3_test_transform_input = os.path.join(step_process.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"], "test")

transformer = Transformer(model_name=create_model.properties.ModelName,
                          instance_count=1,
                          instance_type="ml.m5.xlarge",
                          assemble_with="Line",
                          accept="text/csv",
                          output_path=s3_test_transform_output_path,
                          sagemaker_session=pipeline_session)

transform_step_args = transformer.transform(
    data=s3_test_transform_input,
    content_type="text/csv",
    split_type="Line",
)

step_transform = TransformStep(
    name="InferenceTransform",
    step_args=transform_step_args,
)


# Create and execute pipeline
step_transform.add_depends_on([step_process, create_model])

pipeline = Pipeline(
    name=pipeline_name,
    steps=[step_process, step_train, step_eval, step_cond, step_transform],
    sagemaker_session=pipeline_session,
)

pipeline.upsert(role_arn=role_arn, description=[MASKED])
execution = pipeline.start()
execution.wait(delay=60, max_attempts=120)
HaPo
demandé il y a un an1118 vues
2 réponses
2

Thanks for your answer. I managed to build and run apipeline with the CatBoost model (jumpstart version) which includes:

  • preprocessing
  • training
  • model registration
  • batch inference

Saisissez la description de l'image ici

I encounter two difficulties that I wanted to bring up:

  1. Unexpected behavior when registering the model

What I tried based on the documentation:

[...]

# Retrieve the inference docker container uri
deploy_image_uri = image_uris.retrieve(
    region=None,
    framework=None,
    image_scope="inference",
    model_id=train_model_id,
    model_version=train_model_version,
    instance_type=inference_instance_type,
)

# Retrieve the inference script uri
deploy_source_uri = script_uris.retrieve(
    model_id=train_model_id, model_version=train_model_version, script_scope="inference"
)

model = Model(
    image_uri=deploy_image_uri,
    model_data="s3://[MASKED]/output/model.tar.gz",
    source_dir=deploy_source_uri ,
    sagemaker_session=pipeline_session,
    entry_point="inference.py",
    role=role_arn,
)

[...]

register_model_step_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status="Approved",
    model_metrics=model_metrics,
)

By doing so, the execution of the pipeline returns the following error:

boto3.exceptions.S3UploadFailedError: Failed to upload /tmp/tmp5jzb0338/new.tar.gz to jumpstart-cache-prod-eu-west-3/source-directory-tarballs/catboost/inference/classification/v1.1.1/sourcedir.tar.gz: An error occurred (AccessDenied) when calling the CreateMultipartUpload operation: Access Denied

According to the documentation (see source_dir in https://sagemaker.readthedocs.io/en/stable/api/inference/model.html?highlight=model#sagemaker.model.Model), the same s3 path is used to save the model as the one used as source of the files. This is a problem when using the jumpstart inference scripts because you obviously can't upload to this reserved bucket. The workaround I used is to download locally the tarball of the Catboost inference scripts and then specify a local path as the origin of the scripts.

os.makedirs("tmp/deploy_source_uri", exist_ok=True)
S3Downloader.download(deploy_source_uri, "tmp")
os.system("tar -xf tmp/sourcedir.tar.gz --directory tmp/deploy_source_uri")

model = Model(
    image_uri=deploy_image_uri,
    model_data="s3://[MASKED]/output/model.tar.gz",
    source_dir="tmp/deploy_source_uri",
    sagemaker_session=pipeline_session,
    entry_point="inference.py",
    role=role_arn,
)

Is there a better way ?

  1. Failed to output csv files when using batch transform

I managed to perform a batch transformation step with the previous catboost model. However, I was not able to produce files in csv format, only json format seems to be compatible with catboost inference scripts.

What I wanted to do but does not work:

transformer = Transformer(
    model_name=model_step.properties.ModelName,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    strategy="MultiRecord",
    assemble_with="Line",
    output_path=s3_test_transform_output_path,
    accept="text/csv",
    max_concurrent_transforms=1,
    max_payload=5,
    sagemaker_session=pipeline_session,
)

step_transform = TransformStep(
    name="InferenceTransform",
    transformer=transformer,
    inputs=TransformInput(
        data=s3_test_transform_input,
        content_type="text/csv",              
        split_type="Line",
        input_filter="$[1:]",
        join_source="Input"                         # Wanted to join input data to prediction in csv format
    ),
    depends_on=[model_step]
)

By doing so, the execution of the pipeline returns the following error:

2022-11-23 13:48:12,273 [INFO ] W-9000-model_1-stdout MODEL_LOG - Failed to do transform
2022-11-23 13:48:12,273 [INFO ] W-9000-model_1-stdout MODEL_LOG - Traceback (most recent call last):
2022-11-23 13:48:12,273 [INFO ] W-9000-model_1-stdout MODEL_LOG -   File "/opt/ml/model/code/inference.py", line 55, in transform_fn
2022-11-23 13:48:12,273 [INFO ] W-9000-model_1-stdout MODEL_LOG -     return encoder.encode(output, accept)
2022-11-23 13:48:12,273 [INFO ] W-9000-model_1-stdout MODEL_LOG -   File "/opt/conda/lib/python3.8/site-packages/sagemaker_inference/encoder.py", line 108, in encode
2022-11-23 13:48:12,274 [INFO ] W-9000-model_1-stdout MODEL_LOG -     return encoder(array_like)
2022-11-23 13:48:12,274 [INFO ] W-9000-model_1-stdout MODEL_LOG -   File "/opt/conda/lib/python3.8/site-packages/sagemaker_inference/encoder.py", line 79, in _array_to_csv
2022-11-23 13:48:12,274 [INFO ] W-9000-model_1-stdout MODEL_LOG -     np.savetxt(stream, array_like, delimiter=",", fmt="%s")
2022-11-23 13:48:12,274 [INFO ] W-9000-model_1-stdout MODEL_LOG -   File "<__array_function__ internals>", line 5, in savetxt
2022-11-23 13:48:12,275 [INFO ] W-9000-model_1-stdout MODEL_LOG -   File "/opt/conda/lib/python3.8/site-packages/numpy/lib/npyio.py", line 1380, in savetxt
2022-11-23 13:48:12,274 [INFO ] W-9000-model_1 org.pytorch.serve.wlm.WorkerThread - Backend response time: 272
2022-11-23 13:48:12,275 [INFO ] W-9000-model_1 ACCESS_LOG - /169.254.255.130:42106 "POST /invocations HTTP/1.1" 500 288
2022-11-23 13:48:12,275 [INFO ] W-9000-model_1 TS_METRICS - Requests5XX.Count:1|#Level:Host|#hostname:379f03461a27,timestamp:null
2022-11-23 13:48:12,276 [INFO ] W-9000-model_1 TS_METRICS - QueueTime.ms:0|#Level:Host|#hostname:379f03461a27,timestamp:null
2022-11-23 13:48:12,276 [INFO ] W-9000-model_1 TS_METRICS - WorkerThreadTime.ms:11|#Level:Host|#hostname:379f03461a27,timestamp:null
2022-11-23 13:48:12,276 [INFO ] W-9000-model_1-stdout MODEL_LOG -     raise ValueError(
2022-11-23 13:48:12,276 [INFO ] W-9000-model_1-stdout MODEL_LOG - ValueError: Expected 1D or 2D array, got 0D array instead

After analysis of the inference.py script of the jumpstart model, it seems that the implementation of transform_fn is not compatible with the generation of output in csv format (transform_fn in inference.py provides a dict (output variable) to encoder.encode(output, accept) which call np.savetxt(stream, array_like, delimiter=",", fmt="%s"), array_like variable is thus a dict which is not compatible with np.savetxt).

The workaround I used is to output a json file like so:

transformer = Transformer(
    model_name=model_step.properties.ModelName,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    strategy="MultiRecord",
    assemble_with="Line",
    output_path=s3_test_transform_output_path,
    accept="application/json",                                                 # JSON
    max_concurrent_transforms=1,
    max_payload=5,
    sagemaker_session=pipeline_session,
)

step_transform = TransformStep(
    name="InferenceTransform",
    transformer=transformer,
    inputs=TransformInput(
        data=s3_test_transform_input,
        content_type="text/csv",
        split_type="Line",
        input_filter="$[1:]",
    ),                                                                # No more join_source :(
    depends_on=[model_step]
)

Is what I'm trying to do with the CatBoost Jumpstart model not yet implemented or have I misused the pipeline objects?

HaPo
répondu il y a un an
  • Hello, i'm facing same issue, i will need output format as csv, have u managed to figure out anything from aws or should we go with custom data processing to associate input and output data.

1

I'd suggest to start out by debugging whether the model created by your pipeline actually deploys or transforms OK (just from notebook), because I think that's where your problem might be.

As shown in the sample notebooks for classification and regression, deploy(...) and similar calls for CatBoost (and other new JumpStart-based algorithms) require some extra parameters including inference image_uri and source_dir. Unlike, say, the XGBoost algorithm where only one image URI needs to be specified across training and inference - and no source scripts need to be bundled in at either training or inference time.

I haven't been able to test for myself yet, but think you might be able to fix this by adding image_uri and source_dir (specifying the inference container and script bundle as shown in the example notebooks, which are different from the training ones) to your create_model(...) call.

AWS
EXPERT
Alex_T
répondu il y a un an
  • Thank you for the advice. Please find a detailed answer below.

Vous n'êtes pas connecté. Se connecter pour publier une réponse.

Une bonne réponse répond clairement à la question, contient des commentaires constructifs et encourage le développement professionnel de la personne qui pose la question.

Instructions pour répondre aux questions