Using HuggingFace in SageMaker Studio as part of a project

0

TLDR: if we are trying to use a HuggingFaceProcessor/Estimator in a SageMaker Studio project, what are the requirements for the train.py file in terms of how it refers to the assembled training data, and where it should store the results of the operations it performs (e.g. compiled model, data, etc.)?


FULL DETAILS

So our high-level goal is to be able to deploy some kind of non-XGB model from a SageMaker Studio project, given that the templates provided are all XGB. As outlined in an earlier question we'd started with TensorFlow, but since our TensorFlow model wraps a HuggingFace model we thought we'd try something even simpler: just a HuggingFace model using the HuggingFaceProcessor.

So, following the docs on HuggingFaceProcessor and a HuggingFace Estimator example, we started adjusting the abalone (project template) pipeline.py to look like this (full code can be provided on request):

    # processing step for feature engineering
    hf_processor = HuggingFaceProcessor(
        role=role, 
        instance_count=processing_instance_count,
        instance_type=processing_instance_type,
        transformers_version='4.4.2',
        pytorch_version='1.6.0', 
        base_job_name=f"{base_job_prefix}/frameworkprocessor-hf",
        sagemaker_session=pipeline_session,
    )
    step_args = hf_processor.run(
        outputs=[
            ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
            ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
            ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
        ],
        code=os.path.join(BASE_DIR, "preprocess.py"),
        arguments=["--input-data", input_data],
    )
    step_process = ProcessingStep(
        name="PreprocessTopicData",
        step_args=step_args,
    )

    # training step for generating model artifacts
    model_path = f"s3://{sagemaker_session.default_bucket()}/{base_job_prefix}/TopicTrain"

    hf_train = HuggingFace(
        entry_point='train.py',
        source_dir=BASE_DIR,
        base_job_name='huggingface-sdk-extension',
        instance_type=processing_instance_type,
        instance_count=processing_instance_count,
        transformers_version='4.4',
        pytorch_version='1.6',
        py_version='py36',
        role=role,
    )

    hf_train.set_hyperparameters(
        epochs=3,
        train_batch_size=16,
        learning_rate=1.0e-5,
        model_name='distilbert-base-uncased',
    )

    step_args = hf_train.fit(
        inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
    )

Having found that pushing to master doesn't provide any feedback on issues arising from pipeline.py, we realised that getting the pipeline from a notebook was a better way of debugging these sorts of changes, provided one remembered to restart the kernel each time so that changes to pipeline.py were picked up by the notebook.

So, using the following code in the notebook, we worked through a series of issues, trying to knock the code into shape so that it would compile:

from pipelines.topic.pipeline import get_pipeline


pipeline = get_pipeline(
    region=region,
    role=role,
    default_bucket=default_bucket,
    model_package_group_name=model_package_group_name,
    pipeline_name=pipeline_name,
)
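Once get_pipeline returns without errors, rendering the pipeline definition in the notebook is also a quick way to see what would actually be submitted (a sketch; this runs no jobs):

    import json

    # Parse and pretty-print the definition JSON that would be sent to SageMaker
    definition = json.loads(pipeline.definition())
    print(json.dumps(definition, indent=2))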

We needed to change the default processing and training instance types to avoid an unsupported "cpu" instance type error:

    processing_instance_type="ml.p3.xlarge",
    training_instance_type="ml.p3.xlarge",

and add a train.py script:

import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from transformers import (
    DistilBertTokenizerFast,
    TFDistilBertForSequenceClassification,
)
DATA_COLUMN = 'text'
LABEL_COLUMN = 'label'
MAX_SEQUENCE_LENGTH = 512
LEARNING_RATE = 5e-5
BATCH_SIZE = 16
NUM_EPOCHS = 3
NUM_LABELS = 15

if __name__ == "__main__":

    # --------------------------------------------------------------------------------
    # Tokenizer
    # --------------------------------------------------------------------------------
    tokenizer = DistilBertTokenizerFast.from_pretrained('distilbert-base-uncased')
    def tokenize(sentences, max_length=MAX_SEQUENCE_LENGTH, padding='max_length'):
        """Tokenize using the Huggingface tokenizer
        Args:
            sentences: String or list of string to tokenize
            padding: Padding method ['do_not_pad'|'longest'|'max_length']
        """
        return tokenizer(
            sentences,
            truncation=True,
            padding=padding,
            max_length=max_length,
            return_tensors="tf"
        )
    # --------------------------------------------------------------------------------
    # Load data
    # --------------------------------------------------------------------------------
    import os

    from sklearn.preprocessing import LabelEncoder
    from tensorflow.keras.utils import to_categorical

    # Read the CSV assembled by the processing step from the "train" channel;
    # SageMaker copies that channel's S3 objects into the folder named by
    # SM_CHANNEL_TRAIN (the filename here is an assumption)
    train_data = pd.read_csv(os.path.join(os.environ["SM_CHANNEL_TRAIN"], "train.csv"))

    labelencoder_Y_1 = LabelEncoder()
    yy = labelencoder_Y_1.fit_transform(train_data[LABEL_COLUMN].tolist())
    yy = to_categorical(yy)
    print(len(yy))
    print(yy.shape)
    train_dat, validation_dat, train_label, validation_label = train_test_split(
        train_data[DATA_COLUMN].tolist(),
        yy,
        test_size=0.2,
        shuffle=True
    )
    # --------------------------------------------------------------------------------
    # Prepare TF dataset
    # --------------------------------------------------------------------------------
    train_dataset = tf.data.Dataset.from_tensor_slices((
        dict(tokenize(train_dat)),  # Convert BatchEncoding instance to dictionary
        train_label
    )).shuffle(1000).batch(BATCH_SIZE).prefetch(1)
    validation_dataset = tf.data.Dataset.from_tensor_slices((
        dict(tokenize(validation_dat)),
        validation_label
    )).batch(BATCH_SIZE).prefetch(1)
    # --------------------------------------------------------------------------------
    # training
    # --------------------------------------------------------------------------------
    model = TFDistilBertForSequenceClassification.from_pretrained(
        'distilbert-base-uncased',
        num_labels=NUM_LABELS
    )
    optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)
    model.compile(
        optimizer=optimizer,
        loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
    )

However, we are now stuck on the following error when trying to get the pipeline from the notebook:

<ipython-input-3-be38b3dda75f> in <module>
      7     default_bucket=default_bucket,
      8     model_package_group_name=model_package_group_name,
----> 9     pipeline_name=pipeline_name,
     10 )
     11 # !conda list

~/topic-models-no-monitoring-p-rboparx6tdeg/sagemaker-topic-models-no-monitoring-p-rboparx6tdeg-modelbuild/pipelines/topic/pipeline.py in get_pipeline(region, sagemaker_project_arn, role, default_bucket, model_package_group_name, pipeline_name, base_job_prefix, processing_instance_type, training_instance_type)
    228                     "validation"
    229                 ].S3Output.S3Uri,
--> 230                 content_type="text/csv",
    231             ),
    232         },

/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/pipeline_context.py in wrapper(*args, **kwargs)
    246             return self_instance.sagemaker_session.context
    247 
--> 248         return run_func(*args, **kwargs)
    249 
    250     return wrapper

/opt/conda/lib/python3.7/site-packages/sagemaker/estimator.py in fit(self, inputs, wait, logs, job_name, experiment_config)
   1059         self._prepare_for_training(job_name=job_name)
   1060 
-> 1061         self.latest_training_job = _TrainingJob.start_new(self, inputs, experiment_config)
   1062         self.jobs.append(self.latest_training_job)
   1063         if wait:

/opt/conda/lib/python3.7/site-packages/sagemaker/estimator.py in start_new(cls, estimator, inputs, experiment_config)
   1956         train_args = cls._get_train_args(estimator, inputs, experiment_config)
   1957 
-> 1958         estimator.sagemaker_session.train(**train_args)
   1959 
   1960         return cls(estimator.sagemaker_session, estimator._current_job_name)

/opt/conda/lib/python3.7/site-packages/sagemaker/session.py in train(self, input_mode, input_config, role, job_name, output_config, resource_config, vpc_config, hyperparameters, stop_condition, tags, metric_definitions, enable_network_isolation, image_uri, algorithm_arn, encrypt_inter_container_traffic, use_spot_instances, checkpoint_s3_uri, checkpoint_local_path, experiment_config, debugger_rule_configs, debugger_hook_config, tensorboard_output_config, enable_sagemaker_metrics, profiler_rule_configs, profiler_config, environment, retry_strategy)
    611             self.sagemaker_client.create_training_job(**request)
    612 
--> 613         self._intercept_create_request(train_request, submit, self.train.__name__)
    614 
    615     def _get_train_request(  # noqa: C901

/opt/conda/lib/python3.7/site-packages/sagemaker/session.py in _intercept_create_request(self, request, create, func_name)
   4303             func_name (str): the name of the function needed intercepting
   4304         """
-> 4305         return create(request)
   4306 
   4307 

/opt/conda/lib/python3.7/site-packages/sagemaker/session.py in submit(request)
    608         def submit(request):
    609             LOGGER.info("Creating training-job with name: %s", job_name)
--> 610             LOGGER.debug("train request: %s", json.dumps(request, indent=4))
    611             self.sagemaker_client.create_training_job(**request)
    612 

/opt/conda/lib/python3.7/json/__init__.py in dumps(obj, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
    236         check_circular=check_circular, allow_nan=allow_nan, indent=indent,
    237         separators=separators, default=default, sort_keys=sort_keys,
--> 238         **kw).encode(obj)
    239 
    240 

/opt/conda/lib/python3.7/json/encoder.py in encode(self, o)
    199         chunks = self.iterencode(o, _one_shot=True)
    200         if not isinstance(chunks, (list, tuple)):
--> 201             chunks = list(chunks)
    202         return ''.join(chunks)
    203 

/opt/conda/lib/python3.7/json/encoder.py in _iterencode(o, _current_indent_level)
    429             yield from _iterencode_list(o, _current_indent_level)
    430         elif isinstance(o, dict):
--> 431             yield from _iterencode_dict(o, _current_indent_level)
    432         else:
    433             if markers is not None:

/opt/conda/lib/python3.7/json/encoder.py in _iterencode_dict(dct, _current_indent_level)
    403                 else:
    404                     chunks = _iterencode(value, _current_indent_level)
--> 405                 yield from chunks
    406         if newline_indent is not None:
    407             _current_indent_level -= 1

/opt/conda/lib/python3.7/json/encoder.py in _iterencode_dict(dct, _current_indent_level)
    403                 else:
    404                     chunks = _iterencode(value, _current_indent_level)
--> 405                 yield from chunks
    406         if newline_indent is not None:
    407             _current_indent_level -= 1

/opt/conda/lib/python3.7/json/encoder.py in _iterencode(o, _current_indent_level)
    436                     raise ValueError("Circular reference detected")
    437                 markers[markerid] = o
--> 438             o = _default(o)
    439             yield from _iterencode(o, _current_indent_level)
    440             if markers is not None:

/opt/conda/lib/python3.7/json/encoder.py in default(self, o)
    177 
    178         """
--> 179         raise TypeError(f'Object of type {o.__class__.__name__} '
    180                         f'is not JSON serializable')
    181 

TypeError: Object of type ParameterInteger is not JSON serializable

This tells us that some aspect of the training job (?) is not JSON serializable, but it's not clear how to debug further.

What would be enormously helpful is project templates for SageMaker Studio showing the use of all the Processors, e.g. HuggingFace, TensorFlow and so on, but failing that we'd be most grateful if anyone could point us to documentation on the requirements for the train.py file that we need to specify for the HuggingFace Estimator.

Many thanks in advance.

asked 2 years ago · 742 views
2 Answers
0

Hi, I'm not sure I can get you all the way to a solution either, but here are some more useful tips:

Instance type: To my knowledge ml.p3.xlarge does not exist - you'll probably want to look at ml.g4dn.xlarge or ml.p3.2xlarge

Avoiding kernel restarts:

It is possible to make Jupyter pick up changes you make to local files on the fly instead of having to restart the kernel each time. Just add the following lines to the top of your notebook before you run any imports; the autoreload extension will then reload modules each time before running your code:

%load_ext autoreload
%autoreload 2

Framework Processors with Pipelines:

As you might already be aware, there were some issues using Pipelines when FrameworkProcessor (which HuggingFaceProcessor, TensorFlowProcessor, etc. are built on) was first launched. I believe these should now be fixed, but they do require you to be using the pipeline_session syntax - I see you already are, so that's great.
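For reference, a minimal sketch of what that pattern looks like for a training step (estimator arguments borrowed from your question, assuming role and BASE_DIR are defined as in your pipeline.py; the S3 URI and step name are placeholders):

    from sagemaker.huggingface import HuggingFace
    from sagemaker.inputs import TrainingInput
    from sagemaker.workflow.pipeline_context import PipelineSession
    from sagemaker.workflow.steps import TrainingStep

    pipeline_session = PipelineSession()

    hf_train = HuggingFace(
        entry_point="train.py",
        source_dir=BASE_DIR,
        instance_type="ml.g4dn.xlarge",
        instance_count=1,
        transformers_version="4.4",
        pytorch_version="1.6",
        py_version="py36",
        role=role,
        # With a PipelineSession, fit() returns step arguments for a TrainingStep
        # instead of launching a training job immediately
        sagemaker_session=pipeline_session,
    )

    step_train = TrainingStep(
        name="TrainTopicModel",
        step_args=hf_train.fit(
            inputs={"train": TrainingInput(s3_data="s3://my-bucket/my-prefix/train",
                                           content_type="text/csv")}
        ),
    )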

Just in case you're seeing any echoes of this, I'd maybe recommend trying to demonstrate pipeline creation first without the processing job (it doesn't necessarily need to work properly end-to-end), and then adding the processing job in. Do be aware you might come across some older samples that haven't been updated yet to use PipelineSession, and these may not work properly with your DL Framework Processors. (I think Heiko's sample in the other answer might pre-date this.)

Pipeline JSON error:

It's hard to tell what's up for sure because it seems like there are some gaps in your code (e.g. is processing_instance_count a plain number or a pipeline parameter?). Either way, it seems like something is going wrong at the actual pipeline definition stage rather than when trying to execute it. I'd suggest simplifying and gradually building back up: e.g. hard-coding pipeline parameters to plain values (see the sketch below), building the pipeline with just a subset of steps, etc.
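For example (hypothetical, using the parameter name from the abalone template):

    from sagemaker.workflow.parameters import ParameterInteger

    # Pipeline parameter: only resolved at pipeline definition/execution time,
    # and not JSON serializable if it leaks into a plain training-job request
    processing_instance_count = ParameterInteger(
        name="ProcessingInstanceCount", default_value=1
    )

    # Hard-coded plain value while debugging; restore the parameter afterwards
    processing_instance_count = 1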

HF processing and training scripts:

Getting your pipeline working should hopefully be largely separate from (and parallelizable with) getting your individual training/processing/inference jobs running as expected. You'll find more information about the requirements for your script in the Hugging Face section of the SageMaker Python SDK doc.

I'd maybe point to this example training script for sequence classification, which shows the standard pattern of using argparse and SM_MODEL_DIR to find what local folder your script should save the trained model to. That same repository has many examples showing other features and use cases too... But from a quick check most/all of them don't seem to take data channel inputs. This one shows how input data (local folders) are also passed through the CLI/environment variables to your script.

To summarize:

  • Your script receives input and output locations (local folders) through CLI arguments with environment variable fallbacks - e.g. --foo and SM_CHANNEL_FOO if you run a job like estimator.fit({"foo": "s3://.../..."})
  • Hyperparameters are also received through CLI arguments and/or the SM_HPS JSON environment variable
  • Make sure to save your model to the SM_MODEL_DIR folder
  • If you want to output metrics, just use print() or logging on the script side and then define regular expressions on the pipeline side to tell SageMaker how to scrape them from the logs (see the sketch after this list)
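Putting those together, a minimal sketch of the script scaffolding (the hyperparameter and channel names here are just examples matching your pipeline, not requirements):

    import argparse
    import os

    if __name__ == "__main__":
        parser = argparse.ArgumentParser()

        # Hyperparameters arrive as command-line arguments
        parser.add_argument("--epochs", type=int, default=3)
        parser.add_argument("--train_batch_size", type=int, default=16)

        # Data channels and the model dir fall back to the SM_* environment variables
        parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
        parser.add_argument("--validation", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION"))
        parser.add_argument("--model_dir", type=str, default=os.environ.get("SM_MODEL_DIR"))

        args, _ = parser.parse_known_args()

        # ... load data from args.train / args.validation and train the model ...
        # ... then save the trained model under args.model_dir so SageMaker
        #     uploads it to S3 as model.tar.gz when the job completes ...

For the metrics point, you'd pass something like metric_definitions=[{"Name": "validation:loss", "Regex": "val_loss=([0-9\\.]+)"}] on the estimator (the regex here is hypothetical) to match whatever your script prints.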

Here is a quick overview from HF, another introductory example, and a really over-engineered sample if you want to see what a very complex use case might look like 🥲 Hope this helps!

AWS
EXPERT
Alex_T
answered 2 years ago
  • hi @Alex_T, thanks so much for your detailed response - that's been really helpful. Sorry for not thanking you sooner - we had to move away from Studio to get something else working as a stopgap, but now we're looking at it again.

    Thanks to your help and the post from @Heiko we've moved forward to the point of starting to get a train.py working. We're now stuck on a new point regarding saving Datasets. For XGB we had been saving the training, test, and validation data as CSVs, but all the train.py examples use load_from_disk operations that expect the Hugging Face Dataset format. In our preprocess.py we can't import tensorflow or Dataset from datasets, so what would be of great help is an example of a preprocess.py script that works with Datasets (something along the lines of the sketch below).

    The particular difficulty we're finding with SageMaker Studio is working out exactly which versions of which libraries are running within these scripts ...
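    To make concrete what we mean, an untested sketch (the input path and filename are guesses based on the processing container conventions):

        import pandas as pd
        from datasets import Dataset

        # Split the raw data and save each split in HF Dataset format, so that
        # train.py can read it back with datasets.load_from_disk()
        df = pd.read_csv("/opt/ml/processing/input/data.csv")  # filename is a guess
        train_df = df.sample(frac=0.8, random_state=42)
        validation_df = df.drop(train_df.index)

        Dataset.from_pandas(train_df).save_to_disk("/opt/ml/processing/train")
        Dataset.from_pandas(validation_df).save_to_disk("/opt/ml/processing/validation")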

0

Hi there

I'm not 100% sure what caused the error that you're seeing, but since you mentioned that a Hugging Face (HF) Pipeline example could be useful, I wanted to share this project I developed a while ago, where we use all the HF components in a SageMaker Pipeline: https://github.com/marshmellow77/ade-pipeline/tree/main/ade-modelbuild/pipelines

The original pipeline definition is in the abalone folder, and the new one in the ade folder (ade = Adverse Drug Event).

Cheers Heiko

AWS
Heiko
answered 2 years ago
