Instrumenting Glue, Redshift, and Step Functions with CloudWatch Custom Metrics

9 minute read
Content level: Advanced

This technical blog demonstrates how to implement CloudWatch custom metrics in AWS Analytics services, helping data engineers create effective monitoring systems for data pipeline health, with practical examples using healthcare insurance data to detect quality issues early.

by Matt Nispel and Seun Akinyosoye

Introduction

Instrumenting and monitoring are critical components of any application or data pipeline. Understanding the health of your data early in the data pipeline leads to fewer data quality issues downstream and can alert you of possible data issues before they cause larger problems.

Customers utilize a variety of AWS Analytics services to build their pipelines. Many AWS Analytics services integrate with Amazon CloudWatch out of the box and provide enhanced insights. Data engineers who are working with AWS Analytics services often understand additional unique characteristics of their data at various stages of their pipeline that signal pipeline health. Being able to monitor and alert on these additional signals can give you confidence that your pipeline is functioning as intended.

Let’s say for example that you are a data engineer at a healthcare insurance company. Your job is to build data pipelines to process claims data from healthcare providers. You receive data from each provider daily that needs to be processed. It may be important for you to understand that your pipeline is processing data from all of the healthcare providers to ensure that you aren’t missing any data. If you don’t receive information from any particular provider you may want to generate an alert so that your team can investigate.

In this blog we will show how to generate CloudWatch custom metrics in AWS Glue, AWS Lambda, and Amazon Redshift using multiple approaches. Building on the example of a healthcare insurance company, we will show you how to turn some basic data validation checks into CloudWatch custom metrics. Once you have instrumented your pipeline to generate CloudWatch custom metrics, you can use CloudWatch features such as metric math and CloudWatch alarms to provide a comprehensive view of your pipeline and automate remediation.
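To make the alarm idea concrete before diving in: once a pipeline publishes a unique-provider count, a CloudWatch alarm can fire when it drops below a healthy floor. Here is a minimal sketch with the Boto3 put_metric_alarm parameters; the alarm name and the 50-provider threshold are hypothetical, while the namespace and dimensions match the metrics created later in this post.

```python
# Sketch: alarm on the custom metric published by the pipeline below.
# The alarm name and threshold are hypothetical; the namespace and
# dimensions match the put_metric_data calls shown later in this post.
EXPECTED_MIN_PROVIDERS = 50  # hypothetical floor for a healthy daily load

def build_provider_count_alarm(min_providers):
    """Build put_metric_alarm parameters that fire when fewer unique
    providers than expected are processed in a day."""
    return {
        "AlarmName": "medicare-missing-providers",
        "Namespace": "Medicare Pipeline Metrics",
        "MetricName": "Count",
        "Dimensions": [
            {"Name": "Glue Transform Job",
             "Value": "Number of Unique Providers Processed"},
        ],
        "Statistic": "Minimum",
        "Period": 86400,             # evaluate once per daily load
        "EvaluationPeriods": 1,
        "Threshold": float(min_providers),
        "ComparisonOperator": "LessThanThreshold",
        # No datapoint usually means the job never ran; treat as breaching
        "TreatMissingData": "breaching",
    }

alarm_params = build_provider_count_alarm(EXPECTED_MIN_PROVIDERS)
# import boto3
# boto3.client("cloudwatch").put_metric_alarm(**alarm_params)
```

Treating missing data as breaching is the key design choice here: for a daily batch pipeline, silence is itself a failure signal.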

Note: You can also use AWS Glue Data Quality to measure and monitor the quality of your data so that you can make good business decisions. The approach in this article can serve as an alternative.

Solution Overview

We will now show you four examples of using CloudWatch custom metrics with AWS Analytics services. The Glue jobs use Spark SQL and PySpark to perform data validation checks and then call the CloudWatch Boto3 put_metric_data API from inside the Glue job. The Redshift examples use AWS Step Functions and AWS Lambda to orchestrate Redshift SQL-based data validation and then publish CloudWatch metrics using Step Functions SDK service integrations and the Lambda Boto3 put_metric_data API.

Prerequisites

  • Medicare Dataset - Download the public dataset using aws s3 cp s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv ./
  • Access to Glue, Lambda, Redshift
  • NOTE: Running AWS services can incur charges in your AWS account

Architecture Overview

1/ Glue job with Spark SQL Generated Metrics

Glue job with Spark SQL Generated Metrics

Here is how using an AWS Glue job with Spark SQL metrics works:

  1. The Glue job runs
  2. The Glue job uses SELECT statements to capture specific metrics as variables
  3. The metrics are written to CloudWatch
import boto3
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from awsglue.context import GlueContext
from awsglue.job import Job

CloudWatch = boto3.client('cloudwatch')  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# If your data doesn't have a table in the catalog, you can use a temporary view to run SQL
# Here we read all the CSV files under the indicated s3 path
medicareDF = spark.read.csv("s3://awsglue-datasets/examples/medicare/", header=True)
# If the data has a reasonable size (like in this case), we can cache it in memory/disk (depending on cluster size)
# so after the first query, subsequent queries no longer have to read and parse from S3
medicareDF.cache()
# Instead of using the DataFrame API, you can register it as a view for SQL usage like this:
medicareDF.createOrReplaceTempView("medicare")

# Explore the data, since it has many long columns, change the display to be vertical for easier read
spark.sql("SELECT * from medicare").show(n=10, truncate=False, vertical=True)

# You can also register as a view the result of another query, to avoid repetition
spark.sql("SELECT * FROM medicare WHERE `Provider State` = 'NY'").createOrReplaceTempView("ny_medicare")

# count unique rows in table
glue_unique_provider_count = spark.sql("""
SELECT COUNT(DISTINCT `Provider Id`) as unique_provider_count FROM ny_medicare
""").collect()[0]['unique_provider_count']
print(glue_unique_provider_count)

# count number of rows in table
glue_row_count = medicareDF.count()
print(glue_row_count)
# write unique rows to cloudwatch
response = CloudWatch.put_metric_data(
    MetricData = [
        {
            'MetricName': 'Count',
            'Dimensions': [
                {
                    'Name': 'Glue Transform Job',
                    'Value': 'Number of Unique Providers Processed'
                },
            ],
            'Unit': 'Count',
            'Value': glue_unique_provider_count
        },
    ],
    Namespace='Medicare Pipeline Metrics'
)

# write total rows to cloudwatch
response = CloudWatch.put_metric_data(
    MetricData = [
        {
            'MetricName': 'Count',
            'Dimensions': [
                {
                    'Name': 'Glue Transform Job',
                    'Value': 'Number of Rows Read'
                },
            ],
            'Unit': 'Count',
            'Value': glue_row_count
        },
    ],
    Namespace='Medicare Pipeline Metrics'
)

job.commit()
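As a side note, put_metric_data accepts a list of metric entries, so the two calls above could be combined into a single request. A sketch of the combined payload (the metric values here are placeholders):

```python
def build_metric_data(unique_provider_count, row_count):
    """Both pipeline metrics in a single MetricData payload, so one
    put_metric_data request replaces the two separate calls above."""
    return [
        {"MetricName": "Count",
         "Dimensions": [{"Name": "Glue Transform Job",
                         "Value": "Number of Unique Providers Processed"}],
         "Unit": "Count",
         "Value": unique_provider_count},
        {"MetricName": "Count",
         "Dimensions": [{"Name": "Glue Transform Job",
                         "Value": "Number of Rows Read"}],
         "Unit": "Count",
         "Value": row_count},
    ]

metric_data = build_metric_data(42, 100000)  # placeholder values
# import boto3
# boto3.client("cloudwatch").put_metric_data(
#     Namespace="Medicare Pipeline Metrics", MetricData=metric_data)
```

One request instead of two reduces API calls and keeps the two datapoints on the same timestamp.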

2/ Glue job with PySpark Generated Metrics

Glue job with PySpark Generated Metrics

Here is how using an AWS Glue job with PySpark metrics works:

  1. Customers' processed data lands in Amazon Redshift
  2. Glue connects to Amazon Redshift
  3. AWS Glue and PySpark run the validation query against Amazon Redshift (sample below)
  4. The validation metric is sent to Amazon CloudWatch
  5. The metric lands in Amazon CloudWatch, where it can be used to trigger alerts
import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# CloudWatch client is needed for the put_metric_data call below
CloudWatch = boto3.client('cloudwatch')
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

AmazonRedshift_node123 = glueContext.create_dynamic_frame.from_options(
    connection_type="redshift",
    connection_options={
        "redshiftTmpDir": "s3://[s3 bucket]/temporary/",
        "useConnectionProperties": "true",
        "dbtable": "public.medicare",
        "connectionName": "Redshift connection",
    },
    transformation_ctx="AmazonRedshift_node123",
)

df = AmazonRedshift_node123.toDF()

final_record = df.select("provider_id").distinct().count()

response = CloudWatch.put_metric_data(
    MetricData=[
        {
            'MetricName': 'Count',
            'Dimensions': [
                {
                    'Name': 'PySpark job',
                    'Value': 'Number of Unique Providers Processed'
                },
            ],
            'Unit': 'Count',
            'Value': final_record
        },
    ],
    Namespace='Medicare Pipeline Metrics'
)

job.commit()
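Returning to the scenario from the introduction, a distinct count alone won't tell you which provider went silent. A small helper, shown here with hypothetical provider IDs, computes a missing-provider count that could be published as its own custom metric and alarmed on whenever it exceeds zero:

```python
def count_missing_providers(expected_ids, observed_ids):
    """Number of expected providers with no data in today's load."""
    return len(set(expected_ids) - set(observed_ids))

# Hypothetical provider IDs for illustration
expected = {"10001", "10005", "10006", "10011"}
observed = {"10001", "10006"}

missing = count_missing_providers(expected, observed)
print(missing)  # 2 providers did not send data
# Publish 'missing' the same way as final_record above, e.g. under a
# hypothetical MetricName 'MissingProviders', and alarm when it is > 0
```

In practice, expected_ids would come from a reference table and observed_ids from a SELECT DISTINCT over the day's load.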

3/ Lambda function with Python using Redshift Data API

Lambda function with Python using Redshift Data API

Here is how using an AWS Lambda function written in Python works:

  1. Customers' processed data lands in Amazon Redshift
  2. Lambda connects to Redshift using the Amazon Redshift Data API
  3. AWS Lambda runs the validation query against Amazon Redshift (sample below)
  4. The validation metric is sent to Amazon CloudWatch
  5. The metric lands in Amazon CloudWatch, where it can be used to trigger alerts
import os
import json
import boto3
import time
import botocore 
import botocore.session as bc
from botocore.client import Config
 
print('Loading function')

secret_name = os.environ['SecretId']  # getting SecretId from environment variables
session = boto3.session.Session()
region = session.region_name

# Initializing Secret Manager's client    
client = session.client(
    service_name='secretsmanager',
        region_name=region
    )

get_secret_value_response = client.get_secret_value(
        SecretId=secret_name
    )
secret_arn=get_secret_value_response['ARN']

secret = get_secret_value_response['SecretString']

secret_json = json.loads(secret)

cluster_id=secret_json['dbClusterIdentifier']

# Initializing Botocore client
bc_session = bc.get_session()

session = boto3.Session(
        botocore_session=bc_session,
        region_name=region
    )

# Initializing Redshift's client   
config = Config(connect_timeout=5, read_timeout=5)
client_redshift = session.client("redshift-data", config = config)

# Initializing Cloudwatch client
client_cloudWatch = boto3.client('cloudwatch')


def lambda_handler(event, context):
    print("Entered lambda_handler")

    query_str = "SELECT COUNT(DISTINCT provider_id) AS unique_provider_count FROM medicare;"
    try:
        result = client_redshift.execute_statement(
            Database='dev', Sql=query_str,
            ClusterIdentifier=cluster_id, DbUser='[dbusername]')
        statement_id = result['Id']
        print(statement_id)
        # Poll until the statement completes instead of a fixed sleep
        status = client_redshift.describe_statement(Id=statement_id)['Status']
        while status not in ('FINISHED', 'FAILED', 'ABORTED'):
            time.sleep(1)
            status = client_redshift.describe_statement(Id=statement_id)['Status']
        if status != 'FINISHED':
            raise Exception("Statement %s ended with status %s" % (statement_id, status))
        response_rslt = client_redshift.get_statement_result(Id=statement_id)
        final_record = response_rslt['Records'][0][0]['longValue']
        print(final_record)

    except botocore.exceptions.ConnectionError as e:
        client_redshift_1 = session.client("redshift-data", config=config)
        result = client_redshift_1.execute_statement(
            Database='dev', SecretArn=secret_arn, Sql=query_str,
            ClusterIdentifier=cluster_id)
        print("API executed after reestablishing the connection")
        return str(result)

    cloudwatch_response = client_cloudWatch.put_metric_data(
        MetricData=[{
            'MetricName': 'Count',
            'Dimensions': [{'Name': 'Lambda Job',
                            'Value': 'Number of Unique Providers Processed'}],
            'Unit': 'Count',
            'Value': final_record
        }],
        Namespace='Medicare Pipeline Metrics')

    return str(result)
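With the metric in place, CloudWatch metric math can derive health signals from it without touching the pipeline code. Below is a sketch of a GetMetricData request; the expression, query IDs, and the 50-provider floor are hypothetical, while the namespace and dimensions match the Lambda code above. The short_day series is 1 on any day the unique-provider count fell below the floor.

```python
from datetime import datetime, timedelta, timezone

def build_short_day_query(expected_providers):
    """GetMetricData request using metric math: 'short_day' is 1 on any
    day the unique-provider count falls below the expected floor."""
    now = datetime.now(timezone.utc)
    return {
        "MetricDataQueries": [
            {"Id": "providers",
             "MetricStat": {
                 "Metric": {
                     "Namespace": "Medicare Pipeline Metrics",
                     "MetricName": "Count",
                     "Dimensions": [{"Name": "Lambda Job",
                                     "Value": "Number of Unique Providers Processed"}]},
                 "Period": 86400,
                 "Stat": "Minimum"},
             "ReturnData": False},   # raw series feeds the expression only
            {"Id": "short_day",
             "Expression": f"IF(providers < {expected_providers}, 1, 0)",
             "ReturnData": True},
        ],
        "StartTime": now - timedelta(days=7),
        "EndTime": now,
    }

query = build_short_day_query(50)  # hypothetical expected floor
# import boto3
# boto3.client("cloudwatch").get_metric_data(**query)
```

The same expression can also back a metric-math alarm, so the threshold lives in CloudWatch rather than in pipeline code.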

4/ Step Functions with Metrics

Step Functions with Metrics

  1. Customers' processed data lands in Amazon Redshift
  2. Step Functions connects to Redshift using the Amazon Redshift Data API SDK integration
  3. The state machine runs the validation query against Amazon Redshift (sample below)
  4. The validation metric is sent to Amazon CloudWatch using the CloudWatch SDK integration
  5. The metric lands in Amazon CloudWatch, where it can be used to trigger alerts

Step Functions with Metrics 2

{
  "Comment": "A description of my state machine",
  "StartAt": "Run Count Distinct Provider",
  "States": {
    "Run Count Distinct Provider": {
      "Type": "Task",
      "Parameters": {
        "WorkgroupName": "workgroup-medicare",
        "Database": "dev",
        "SecretArn": "arn:aws:secretsmanager:us-east-1:[awsaccountnumber]:secret:redshift!namespace-medicare-admin-Fo7u1g",
        "Sql": "SELECT COUNT(DISTINCT provider_id) FROM public.medicare;"
      },
      "Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
      "Next": "Wait"
    },
    "Wait": {
      "Type": "Wait",
      "Seconds": 5,
      "Next": "DescribeStatement"
    },
    "DescribeStatement": {
      "Type": "Task",
      "Next": "GetStatementResult",
      "Parameters": {
        "Id.$": "$.Id"
      },
      "Resource": "arn:aws:states:::aws-sdk:redshiftdata:describeStatement"
    },
    "GetStatementResult": {
      "Type": "Task",
      "Parameters": {
        "Id.$": "$.Id"
      },
      "Resource": "arn:aws:states:::aws-sdk:redshiftdata:getStatementResult",
      "Next": "PutMetricData",
      "ResultPath": "$.countdistinctproviders"
    },
    "PutMetricData": {
      "Type": "Task",
      "Next": "Pass",
      "Parameters": {
        "MetricData": [
          {
            "MetricName": "Count",
            "Value.$": "$.countdistinctproviders.Records[0][0].LongValue",
            "Unit": "Count",
            "Dimensions": [
              {
                "Name": "Redshift Transform Job",
                "Value": "Unique Providers Processed"
              }
            ]
          },
          {
            "MetricName": "Nanoseconds",
            "Value.$": "$.Duration",
            "Unit": "Count",
            "Dimensions": [
              {
                "Name": "Redshift Transform Job",
                "Value": "Unique Providers Query Length"
              }
            ]
          }
        ],
        "Namespace": "Medicare Pipeline Metrics"
      },
      "Resource": "arn:aws:states:::aws-sdk:cloudwatch:putMetricData",
      "ResultPath": "$"
    },
    "Pass": {
      "Type": "Pass",
      "End": true
    }
  }
}
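The trickiest part of the state machine is pulling the count out of the getStatementResult output: with SDK service integrations the response keys are PascalCase (LongValue), unlike the lowercase longValue the Data API returns to Boto3 in the Lambda example, and the value sits in the first column of the first row. A plain-Python sketch of the same extraction against a sample response (the count value is made up):

```python
# Sample shape of the getStatementResult output seen by the state machine
# (SDK integrations return PascalCase keys, so it's LongValue here,
# unlike the lowercase longValue the Data API returns to boto3).
sample_result = {
    "Records": [
        [{"LongValue": 163}],  # one row, one column: the distinct count
    ],
    "TotalNumRows": 1,
}

# Equivalent of the state machine's Value.$ path: first row, first column
distinct_providers = sample_result["Records"][0][0]["LongValue"]
print(distinct_providers)
```

Because GetStatementResult writes its output under ResultPath "$.countdistinctproviders", the DescribeStatement fields such as Duration remain available at the top level for the second metric.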

Conclusion

Instrumenting and monitoring are essential for ensuring the health and reliability of any data pipeline. By gaining visibility into the state of data throughout the pipeline, engineers can proactively identify and address issues before they escalate. In this blog post, we demonstrated several techniques for achieving this level of observability:

  • Using AWS Glue with Spark SQL and PySpark to process and analyze data in the pipeline
  • Leveraging AWS Lambda functions and the Redshift Data API to send processed data metrics to Amazon CloudWatch
  • Employing AWS Step Functions to orchestrate the flow of data and pipeline monitoring

By implementing these monitoring and instrumentation solutions, engineers can maintain a clear, real-time understanding of their data pipeline's performance. This empowers them to quickly detect and resolve problems, minimizing the impact on downstream data consumers and ensuring the overall integrity and reliability of the pipeline.

About the Authors

Matt Nispel


Matt Nispel is an Enterprise Solutions Architect at AWS. He has more than 10 years of experience building cloud architectures for large enterprise companies. At AWS, Matt helps customers rearchitect their applications to take full advantage of the cloud. Matt lives in Minneapolis, Minnesota, and in his free time enjoys spending time with friends and family.

Seun Akinyosoye


Seun Akinyosoye is a Sr. Technical Account Manager supporting public sector customers at Amazon Web Services. Seun has a background in analytics and data engineering, which he uses to help customers achieve their outcomes and goals. Outside of work, Seun enjoys spending time with his family, reading, traveling, and supporting his favorite sports teams.