SageMaker Pipelines: Is it possible to use a TransformStep with the CatBoost Estimator?


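Hi! I am trying to implement a SageMaker Pipeline that includes, among other things, a PySpark preprocessing step, a CatBoost (JumpStart) training step, an evaluation step, a conditional model-creation step and a TransformStep for batch inference.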

The TransformStep fails with the following error: python3: can't open file 'serve': [Errno 2] No such file or directory

I wonder whether I'm using TransformStep incorrectly, or whether TransformStep simply does not support the CatBoost model yet.
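One possible reading of this error: SageMaker starts inference containers (hosting and batch transform) as docker run <image> serve. If the custom sagemaker-catboost-image used in the Model below was built with python3 as its entrypoint (an assumption, since the Dockerfile is not shown), the container effectively runs python3 serve and fails with exactly this message. The sketch below reproduces that locally with the standard library; run_like_sagemaker_serve is a hypothetical helper, not a SageMaker API.

```python
import subprocess
import sys
import tempfile


def run_like_sagemaker_serve(entrypoint: list[str]) -> subprocess.CompletedProcess:
    """Simulate how an inference container is started: the argument "serve"
    is appended to the image's entrypoint (docker run <image> serve)."""
    with tempfile.TemporaryDirectory() as workdir:  # empty dir, so no file named "serve"
        return subprocess.run(
            entrypoint + ["serve"], cwd=workdir, capture_output=True, text=True
        )


# Assumption: the custom image's entrypoint is effectively ["python3"].
result = run_like_sagemaker_serve([sys.executable])
print(result.returncode)
print(result.stderr.strip())  # "...: can't open file '...serve': [Errno 2] No such file or directory"
```

If this is the cause, the inference container needs a serve entry (or, for JumpStart models, the dedicated inference image plus the inference scripts), not the training image.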

Code:

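import os

from sagemaker import image_uris, model_uris, script_uris
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.model import Model
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.spark.processing import PySparkProcessor
from sagemaker.transformer import Transformer
from sagemaker.utils import name_from_base
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join, JsonGet
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, TransformStep

[...]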
pyspark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.1",
    role=role_arn,
    instance_type="ml.m5.xlarge",
    instance_count=12,
    sagemaker_session=pipeline_session,
    max_runtime_in_seconds=2400,
)

step_process_args = pyspark_processor.run(
    submit_app=os.path.join(
        s3_preprocess_script_dir, "preprocess.py"
    ),  # Hack to fix cache hit
    submit_py_files=[os.path.join(
        s3_preprocess_script_dir, "preprocess_utils.py"
    ), os.path.join(
        s3_preprocess_script_dir, "spark_utils.py"
    )],
    outputs=[
        ProcessingOutput(
            output_name="datasets",
            source="/opt/ml/processing/output",
            destination=s3_preprocess_output_path,
        )
    ],
    arguments=["--aws_account", AWS_ACCOUNT, "--aws_env", AWS_ENV, "--project_name", PROJECT_NAME, "--mode", "training"],
)

step_process = ProcessingStep(
    name="PySparkPreprocessing",
    step_args=step_process_args,
    cache_config=cache_config,
)

train_model_id = "catboost-classification-model"
train_model_version = "*"
train_scope = "training"
training_instance_type = "ml.m5.xlarge"

# Retrieve the docker image
train_image_uri = image_uris.retrieve(
    region=None,
    framework=None,
    model_id=train_model_id,
    model_version=train_model_version,
    image_scope=train_scope,
    instance_type=training_instance_type,
)

# Retrieve the training script
train_source_uri = script_uris.retrieve(
    model_id=train_model_id, model_version=train_model_version, script_scope=train_scope
)

# Retrieve the pre-trained model tarball to further fine-tune
train_model_uri = model_uris.retrieve(
    model_id=train_model_id, model_version=train_model_version, model_scope=train_scope
)

training_job_name = name_from_base(f"jumpstart-{train_model_id}-training")

# Create SageMaker Estimator instance
tabular_estimator = Estimator(
    role=role_arn,
    image_uri=train_image_uri,
    source_dir=train_source_uri,
    model_uri=train_model_uri,
    entry_point="transfer_learning.py",
    instance_count=1,
    instance_type="ml.m5.xlarge",
    max_run=360000,
    hyperparameters=hyperparameters,
    sagemaker_session=pipeline_session,
    output_path=s3_training_output_path,
    disable_profiler=True,  # The default profiler rule includes a timestamp which will change each time the pipeline is upserted, causing cache misses. If profiling is not needed, set disable_profiler to True on the estimator.
)

# Launch a SageMaker Training job by passing s3 path of the training data
step_train_args = tabular_estimator.fit(
    {
        "training": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "datasets"
            ].S3Output.S3Uri
        )
    },
    logs=True,
    job_name=training_job_name,
)

step_train = TrainingStep(
    name="CatBoostTraining",
    step_args=step_train_args,
    cache_config=cache_config,
)

script_eval = ScriptProcessor(
    image_uri=[MASKED],
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="script-evaluation",
    role=role_arn,
    sagemaker_session=pipeline_session,
)

eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                "datasets"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/input",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination=s3_evaluation_output_path,
        ),
    ],
    code="common/evaluation.py",
)

evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

step_eval = ProcessingStep(
    name="Evaluation",
    step_args=eval_args,
    property_files=[evaluation_report],
    cache_config=cache_config,
)

model = Model(
    image_uri="467855596088.dkr.ecr.eu-west-3.amazonaws.com/sagemaker-catboost-image:latest",
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role_arn,
)

evaluation_s3_uri = "{}/evaluation.json".format(
    step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
)

model_step_args = model.create(
    instance_type="ml.m5.large",
)
create_model = ModelStep(name="CatBoostModel", step_args=model_step_args)

step_fail = FailStep(
    name="FailBranch",
    error_message=Join(
        on=" ", values=["Execution failed due to F1-score <", f1_threshold]
    ),
)

# Passes when F1 >= f1_threshold, so the condition is named "gte"
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.f1-score.value",
    ),
    right=f1_threshold,
)

step_cond = ConditionStep(
    name="F1ScoreCondition",
    conditions=[cond_gte],
    if_steps=[create_model],
    else_steps=[step_fail],
)

# Transform Job
s3_test_transform_input = os.path.join(step_process.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"], "test")

transformer = Transformer(model_name=create_model.properties.ModelName,
                          instance_count=1,
                          instance_type="ml.m5.xlarge",
                          assemble_with="Line",
                          accept="text/csv",
                          output_path=s3_test_transform_output_path,
                          sagemaker_session=pipeline_session)

transform_step_args = transformer.transform(
    data=s3_test_transform_input,
    content_type="text/csv",
    split_type="Line",
)

step_transform = TransformStep(
    name="InferenceTransform",
    step_args=transform_step_args,
)


# Create and execute pipeline
step_transform.add_depends_on([step_process, create_model])

pipeline = Pipeline(
    name=pipeline_name,
    steps=[step_process, step_train, step_eval, step_cond, step_transform],
    sagemaker_session=pipeline_session,
)

pipeline.upsert(role_arn=role_arn, description=[MASKED])
execution = pipeline.start()
execution.wait(delay=60, max_attempts=120)
HaPo
asked 2 years ago, 1375 views
2 answers

Thanks for your answer. I managed to build and run a pipeline with the CatBoost model (JumpStart version) that includes:

  • preprocessing
  • training
  • model registration
  • batch inference


I ran into two difficulties that I want to bring up:

  1. Unexpected behavior when registering the model

What I tried based on the documentation:

[...]

# Retrieve the inference docker container uri
deploy_image_uri = image_uris.retrieve(
    region=None,
    framework=None,
    image_scope="inference",
    model_id=train_model_id,
    model_version=train_model_version,
    instance_type=inference_instance_type,
)

# Retrieve the inference script uri
deploy_source_uri = script_uris.retrieve(
    model_id=train_model_id, model_version=train_model_version, script_scope="inference"
)

model = Model(
    image_uri=deploy_image_uri,
    model_data="s3://[MASKED]/output/model.tar.gz",
    source_dir=deploy_source_uri,
    sagemaker_session=pipeline_session,
    entry_point="inference.py",
    role=role_arn,
)

[...]

register_model_step_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status="Approved",
    model_metrics=model_metrics,
)

With this code, the pipeline execution fails with the following error:

boto3.exceptions.S3UploadFailedError: Failed to upload /tmp/tmp5jzb0338/new.tar.gz to jumpstart-cache-prod-eu-west-3/source-directory-tarballs/catboost/inference/classification/v1.1.1/sourcedir.tar.gz: An error occurred (AccessDenied) when calling the CreateMultipartUpload operation: Access Denied

According to the documentation (see source_dir in https://sagemaker.readthedocs.io/en/stable/api/inference/model.html?highlight=model#sagemaker.model.Model), the S3 path given as the source of the scripts is also used as the destination when the model is saved: as the error shows, the SDK tries to upload a repacked sourcedir.tar.gz back to that same location. This is a problem with the JumpStart inference scripts, because you obviously can't upload to this reserved bucket. My workaround is to download the tarball of the CatBoost inference scripts locally and then pass the local path as source_dir:

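import tarfile
from sagemaker.s3 import S3Downloader
os.makedirs("tmp/deploy_source_uri", exist_ok=True)
S3Downloader.download(deploy_source_uri, "tmp")  # downloads sourcedir.tar.gz into tmp/
with tarfile.open("tmp/sourcedir.tar.gz") as tar:  # extract without shelling out to tar
    tar.extractall("tmp/deploy_source_uri")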

model = Model(
    image_uri=deploy_image_uri,
    model_data="s3://[MASKED]/output/model.tar.gz",
    source_dir="tmp/deploy_source_uri",
    sagemaker_session=pipeline_session,
    entry_point="inference.py",
    role=role_arn,
)
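To apply this workaround only when it is actually needed, one could first check whether source_dir points into a JumpStart-owned bucket. A minimal sketch, assuming the bucket naming seen in the error message above (jumpstart-cache-prod-<region>); the prefix is inferred from that message, and split_s3_uri and needs_local_copy are hypothetical helpers, not SDK functions.

```python
from urllib.parse import urlparse

# Assumption: inferred from the bucket name in the AccessDenied error above.
JUMPSTART_BUCKET_PREFIX = "jumpstart-cache-prod-"


def split_s3_uri(uri: str) -> tuple[str, str]:
    """Split "s3://bucket/key" into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


def needs_local_copy(source_dir: str) -> bool:
    """True if source_dir is an S3 URI inside a JumpStart bucket, i.e. a location
    the SDK cannot write the repacked sourcedir.tar.gz back to."""
    if not source_dir.startswith("s3://"):
        return False  # local directories are packaged and uploaded by the SDK instead
    bucket, _ = split_s3_uri(source_dir)
    return bucket.startswith(JUMPSTART_BUCKET_PREFIX)


jumpstart_uri = (
    "s3://jumpstart-cache-prod-eu-west-3/source-directory-tarballs/"
    "catboost/inference/classification/v1.1.1/sourcedir.tar.gz"
)
print(needs_local_copy(jumpstart_uri))
print(needs_local_copy("tmp/deploy_source_uri"))
```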

Is there a better way?

  2. Failed to output csv files when using batch transform

I managed to run a batch transform step with the CatBoost model above. However, I was not able to produce output files in CSV format; only JSON seems to be supported by the CatBoost inference scripts.

What I wanted to do (which does not work):

transformer = Transformer(
    model_name=model_step.properties.ModelName,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    strategy="MultiRecord",
    assemble_with="Line",
    output_path=s3_test_transform_output_path,
    accept="text/csv",
    max_concurrent_transforms=1,
    max_payload=5,
    sagemaker_session=pipeline_session,
)

step_transform = TransformStep(
    name="InferenceTransform",
    transformer=transformer,
    inputs=TransformInput(
        data=s3_test_transform_input,
        content_type="text/csv",              
        split_type="Line",
        input_filter="$[1:]",
        join_source="Input"                         # Wanted to join input data to prediction in csv format
    ),
    depends_on=[model_step]
)

With this code, the pipeline execution fails with the following error:

2022-11-23 13:48:12,273 [INFO ] W-9000-model_1-stdout MODEL_LOG - Failed to do transform
2022-11-23 13:48:12,273 [INFO ] W-9000-model_1-stdout MODEL_LOG - Traceback (most recent call last):
2022-11-23 13:48:12,273 [INFO ] W-9000-model_1-stdout MODEL_LOG -   File "/opt/ml/model/code/inference.py", line 55, in transform_fn
2022-11-23 13:48:12,273 [INFO ] W-9000-model_1-stdout MODEL_LOG -     return encoder.encode(output, accept)
2022-11-23 13:48:12,273 [INFO ] W-9000-model_1-stdout MODEL_LOG -   File "/opt/conda/lib/python3.8/site-packages/sagemaker_inference/encoder.py", line 108, in encode
2022-11-23 13:48:12,274 [INFO ] W-9000-model_1-stdout MODEL_LOG -     return encoder(array_like)
2022-11-23 13:48:12,274 [INFO ] W-9000-model_1-stdout MODEL_LOG -   File "/opt/conda/lib/python3.8/site-packages/sagemaker_inference/encoder.py", line 79, in _array_to_csv
2022-11-23 13:48:12,274 [INFO ] W-9000-model_1-stdout MODEL_LOG -     np.savetxt(stream, array_like, delimiter=",", fmt="%s")
2022-11-23 13:48:12,274 [INFO ] W-9000-model_1-stdout MODEL_LOG -   File "<__array_function__ internals>", line 5, in savetxt
2022-11-23 13:48:12,275 [INFO ] W-9000-model_1-stdout MODEL_LOG -   File "/opt/conda/lib/python3.8/site-packages/numpy/lib/npyio.py", line 1380, in savetxt
2022-11-23 13:48:12,274 [INFO ] W-9000-model_1 org.pytorch.serve.wlm.WorkerThread - Backend response time: 272
2022-11-23 13:48:12,275 [INFO ] W-9000-model_1 ACCESS_LOG - /169.254.255.130:42106 "POST /invocations HTTP/1.1" 500 288
2022-11-23 13:48:12,275 [INFO ] W-9000-model_1 TS_METRICS - Requests5XX.Count:1|#Level:Host|#hostname:379f03461a27,timestamp:null
2022-11-23 13:48:12,276 [INFO ] W-9000-model_1 TS_METRICS - QueueTime.ms:0|#Level:Host|#hostname:379f03461a27,timestamp:null
2022-11-23 13:48:12,276 [INFO ] W-9000-model_1 TS_METRICS - WorkerThreadTime.ms:11|#Level:Host|#hostname:379f03461a27,timestamp:null
2022-11-23 13:48:12,276 [INFO ] W-9000-model_1-stdout MODEL_LOG -     raise ValueError(
2022-11-23 13:48:12,276 [INFO ] W-9000-model_1-stdout MODEL_LOG - ValueError: Expected 1D or 2D array, got 0D array instead

After analyzing the inference.py script of the JumpStart model, it seems that its transform_fn cannot produce CSV output. transform_fn passes a dict (the output variable) to encoder.encode(output, accept), which calls np.savetxt(stream, array_like, delimiter=",", fmt="%s"). array_like is therefore a dict, which np.savetxt cannot handle: NumPy converts it to a 0-D object array, hence "Expected 1D or 2D array, got 0D array instead".
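The failure described above can be reproduced without SageMaker. This sketch mirrors the np.savetxt call from the traceback; the dict layout and its "probabilities" key are illustrative assumptions, not the actual schema returned by the JumpStart CatBoost script.

```python
import io

import numpy as np

# Hypothetical prediction payload shaped as a dict; the key name is an assumption.
output = {"probabilities": [[0.9, 0.1], [0.2, 0.8]]}


def array_to_csv(array_like) -> str:
    """Mirror the np.savetxt call shown in the traceback (sagemaker_inference encoder)."""
    stream = io.StringIO()
    np.savetxt(stream, array_like, delimiter=",", fmt="%s")
    return stream.getvalue()


# 1) Passing the dict fails: np.asarray(dict) yields a 0-D object array.
try:
    array_to_csv(output)
except ValueError as err:
    print(err)  # Expected 1D or 2D array, got 0D array instead

# 2) Converting the dict to a 2-D array first lets the CSV encoding succeed.
csv_text = array_to_csv(np.asarray(output["probabilities"]))
print(csv_text)
```

This suggests that CSV output would require transform_fn (in a locally modified copy of inference.py) to hand the encoder an array rather than a dict.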

My workaround is to output JSON instead:

transformer = Transformer(
    model_name=model_step.properties.ModelName,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    strategy="MultiRecord",
    assemble_with="Line",
    output_path=s3_test_transform_output_path,
    accept="application/json",                                                 # JSON
    max_concurrent_transforms=1,
    max_payload=5,
    sagemaker_session=pipeline_session,
)

step_transform = TransformStep(
    name="InferenceTransform",
    transformer=transformer,
    inputs=TransformInput(
        data=s3_test_transform_input,
        content_type="text/csv",
        split_type="Line",
        input_filter="$[1:]",
    ),                                                                # No more join_source :(
    depends_on=[model_step]
)
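Without join_source, the input rows and the predictions have to be matched after the job finishes. A minimal sketch of that post-processing, under loudly stated assumptions: each non-empty output line is one JSON response for a mini-batch (plausible with strategy="MultiRecord" and assemble_with="Line", but to be verified), responses contain a list under a hypothetical key such as "probabilities", and record order is preserved. join_inputs_with_predictions is a hypothetical helper.

```python
import csv
import io
import json


def join_inputs_with_predictions(input_csv: str, output_text: str, key: str) -> str:
    """Re-attach JSON predictions to their CSV input rows, emulating join_source="Input".

    Assumptions (hypothetical, check against real batch transform output):
    - every non-empty output line is one JSON response covering a mini-batch;
    - response[key] holds one prediction per record, in input order.
    """
    rows = list(csv.reader(io.StringIO(input_csv)))
    predictions = []
    for line in output_text.splitlines():
        if line.strip():
            predictions.extend(json.loads(line)[key])
    if len(rows) != len(predictions):
        raise ValueError(f"{len(rows)} input rows but {len(predictions)} predictions")

    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    for row, pred in zip(rows, predictions):
        # Flatten list-valued predictions (e.g. per-class scores) into extra columns.
        writer.writerow(row + (pred if isinstance(pred, list) else [pred]))
    return out.getvalue()


# Illustrative data only; "probabilities" is a hypothetical key name.
inputs = "id1,3.0,1.5\nid2,0.5,2.0\n"
outputs = '{"probabilities": [[0.9, 0.1], [0.2, 0.8]]}\n'
print(join_inputs_with_predictions(inputs, outputs, key="probabilities"))
```

The full input row (including the column skipped by input_filter="$[1:]") is kept, which is what join_source="Input" would have produced.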

Is what I'm trying to do with the CatBoost JumpStart model not implemented yet, or have I misused the pipeline objects?

HaPo
answered 2 years ago
  • Hello, I'm facing the same issue: I need the output in CSV format. Have you managed to get anything from AWS, or should we go with custom data processing to associate the input and output data?


I'd suggest starting by checking whether the model created by your pipeline actually deploys or runs a transform correctly (just from a notebook), because I think that's where your problem might be.

As shown in the sample notebooks for classification and regression, deploy(...) and similar calls for CatBoost (and other new JumpStart-based algorithms) require some extra parameters, including the inference image_uri and source_dir. This differs from, say, the XGBoost algorithm, where a single image URI is used for both training and inference and no source scripts need to be bundled at either training or inference time.

I haven't been able to test this myself yet, but I think you might be able to fix this by adding image_uri and source_dir to your create_model(...) call, specifying the inference container and script bundle as shown in the example notebooks (these differ from the training ones).

AWS
EXPERT
Alex_T
answered 2 years ago
  • Thank you for the advice. Please find a detailed reply in my answer.
