SageMaker Pipelines - Batch transform job uses its generated predictions as input for the model


Hi all! We're trying to implement a very simple SageMaker Pipeline with three steps:

  • ETL: for now it only runs a simple query
  • Batch transform: uses the ETL's result and generates predictions with a batch transform job
  • Report: generates an HTML report

The thing is, when we run the batch transform step on its own, everything works fine. But when we run all the steps together in a Pipeline, the batch transform job fails. From the logs we can see that the job takes the dataset generated by the ETL step, produces the predictions and saves them correctly to S3 (this is where we would expect the job to stop), but then it re-sends those predictions to the model as if they were new input. The step then fails because the model receives an array with a single column, which doesn't match the number of features it was trained with.

There's not much info out there on this, and SageMaker is painfully hard to debug. Has anyone experienced anything like this?

Our model and transformer code:

from sagemaker import get_execution_role
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep
from sagemaker.xgboost import XGBoostModel

# Model built from the training artifacts plus a custom inference script.
model = XGBoostModel(
    model_data=f"s3://{BUCKET}/{MODEL_ARTIFACTS_PATH}/artifacts.gzip",
    role=get_execution_role(),
    entry_point="predict.py",
    framework_version="1.3-1",
)

# Transformer that writes the batch predictions to PREDICTIONS_PATH.
transformer = model.transformer(
    instance_count=1,
    instance_type="ml.m5.large",
    output_path=f"s3://{BUCKET}/{PREDICTIONS_PATH}/",
    accept="text/csv",
)

# Pipeline step that feeds the ETL step's output dataset into the transformer.
step = TransformStep(
    name="Batch",
    transformer=transformer,
    inputs=TransformInput(
        data=etl_step.properties.ProcessingOutputConfig.Outputs[
            "dataset"
        ].S3Output.S3Uri,
        content_type="text/csv",
        split_type="Line",
    ),
    depends_on=[etl_step],
)
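
For reference, the three steps are then wired into a pipeline roughly like this (a minimal sketch: the report step object and the pipeline name are placeholders, since that code isn't shown here):

from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="BatchInferencePipeline",        # placeholder name
    steps=[etl_step, step, report_step],  # ETL -> batch transform -> report
)

pipeline.upsert(role_arn=get_execution_role())  # create or update the pipeline definition
execution = pipeline.start()                    # run all three steps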

And our inference script:

from io import StringIO

import pandas as pd


def input_fn(request_body, content_type):
    """Parse the incoming CSV payload into a feature array."""
    return pd.read_csv(StringIO(request_body), header=None).values


def predict_fn(input_obj, model):
    """Take the result of input_fn and generate predictions."""
    return model.predict_proba(input_obj)[:, 1]


def output_fn(predictions, content_type):
    """Serialize the predictions back to a single CSV line."""
    return ",".join(str(pred) for pred in predictions)
Asked 2 years ago · 1,262 views
1 Answer

Hi,

The issue you describe can happen if the prediction files are written to the same location as the input files, which triggers one more round of prediction.

Can you check that the

etl_step.properties.ProcessingOutputConfig.Outputs["dataset"].S3Output.S3Uri

and

f"s3://{BUCKET}/{PREDICTIONS_PATH}/"

point to different paths in your S3 bucket?
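
For illustration, one way to keep the two locations clearly apart is to pin both prefixes explicitly; the prefix names, processor object and script name below are just examples, not taken from your code:

from sagemaker.processing import ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

etl_step = ProcessingStep(
    name="ETL",
    processor=etl_processor,  # whichever processor your ETL step already uses
    code="etl.py",            # example script name
    outputs=[
        ProcessingOutput(
            output_name="dataset",
            source="/opt/ml/processing/output",
            # Pin the dataset to its own prefix so it cannot overlap with the
            # transformer's output_path below.
            destination=f"s3://{BUCKET}/etl-output/",
        )
    ],
)

transformer = model.transformer(
    instance_count=1,
    instance_type="ml.m5.large",
    output_path=f"s3://{BUCKET}/predictions/",  # separate prefix for predictions
    accept="text/csv",
)

That way the transform job only ever lists the ETL output prefix as its input, and its own output lands somewhere it will never be re-read.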

Did this work?

Thank you

AWS
Answered 2 years ago
  • Hey! Thanks for the answer. Yes, I've tried that, with no success at all. Still the same error.
