
Lowering S3 KMS Decrypt API Costs in Amazon EMR Spark Jobs

5 minute read
Content level: Intermediate

Running Spark on EMR with KMS-encrypted S3 data? Every object read triggers a kms:Decrypt API call — and at scale, those costs add up fast. If your compliance requirements prevent switching to S3 Bucket Keys, you need smarter optimizations at the data and compute layer.

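This article covers three practical techniques to cut KMS decrypt costs without changing your encryption strategy: aggregating small files, optimizing file formats and compression, and using Glue Data Catalog partition indexes.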

Introduction

Modern organizations rely on Amazon EMR and Apache Spark for processing vast amounts of transaction data stored in Amazon S3. To maintain data security, this data is often encrypted using AWS Key Management Service (KMS). While encryption is essential, frequent KMS decrypt API calls can significantly increase costs as data volumes scale.

This article introduces practical techniques to reduce KMS decrypt costs while maintaining data security: data aggregation, file format optimization (including open table formats (OTF) such as Apache Iceberg), and Glue Data Catalog partition indexes.


Challenges with Frequent KMS Decrypt API Calls

Consider a retail organization that processes hundreds of terabytes of customer transaction data daily. Each Spark task accessing encrypted objects triggers a KMS decrypt API call, leading to significant cost increases. For workloads that require key auditability and cannot switch to S3 bucket keys, exploring data and EMR-level optimizations becomes essential.
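To make the scale of the problem concrete, the following is a rough back-of-the-envelope sketch. It assumes one kms:Decrypt call per S3 object read, as described above, and a list price of $0.03 per 10,000 KMS requests; the price, the function name, and the workload numbers are illustrative assumptions, so check current pricing for your Region.

```python
# Assumed list price for KMS requests (USD per 10,000); verify for your Region.
KMS_PRICE_PER_10K_REQUESTS = 0.03


def estimate_decrypt_cost(object_reads: int,
                          price_per_10k: float = KMS_PRICE_PER_10K_REQUESTS) -> float:
    """Estimate the KMS charge, assuming one kms:Decrypt call per S3 object read."""
    if object_reads < 0:
        raise ValueError("object_reads must be non-negative")
    return object_reads / 10_000 * price_per_10k


# Hypothetical workload: 50 million small-object reads per day,
# versus 500 thousand reads after the files are consolidated.
before = estimate_decrypt_cost(50_000_000)
after = estimate_decrypt_cost(500_000)
print(f"Daily KMS cost before: ${before:,.2f}, after: ${after:,.2f}")
```

The estimate scales linearly with the number of object reads, which is why the techniques below focus on reading fewer objects rather than fewer bytes.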


Optimization Techniques

1. Leverage Data Aggregation

Data aggregation reduces redundant KMS decrypt API calls by using Spark to consolidate many small files into fewer, larger files.

By using AWS CloudTrail to monitor changes in API call frequency, you can validate the effectiveness of data aggregation in reducing costs. This technique is particularly effective for read-heavy workloads that involve numerous small files stored on S3.

Step 1: Benchmark Baseline Performance

Before applying optimizations, establish baseline metrics to quantify improvements.

from pyspark.sql import SparkSession
import time

spark = SparkSession.builder.appName("Baseline Job").getOrCreate()

# spark.read is lazy, so time the count() action that actually reads the objects
start_time = time.time()
data = spark.read.format("csv").load("s3://emr-kms-demo/data/")
data_count = data.count()
load_time = time.time() - start_time

print(f"Read time: {load_time:.2f} seconds")
# The row count is not the KMS call count; take that number from CloudTrail
print(f"Rows processed: {data_count}")

Monitor the number of KMS Decrypt API calls via CloudTrail
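One way to get the Decrypt call count programmatically is to query CloudTrail event history with boto3. The sketch below is a minimal example, not the only approach: the function names and the default Region are assumptions, and because LookupEvents is rate-limited, very high call volumes may be easier to analyze by querying the trail's log files instead.

```python
from typing import Iterable, Mapping


def count_kms_decrypt_events(events: Iterable[Mapping]) -> int:
    """Count records whose event name is Decrypt and whose source is KMS.

    Each record is expected to look like an item returned by CloudTrail's
    LookupEvents API: a dict with "EventName" and "EventSource" keys.
    """
    return sum(
        1
        for event in events
        if event.get("EventName") == "Decrypt"
        and event.get("EventSource") == "kms.amazonaws.com"
    )


def fetch_decrypt_events(start_time, end_time, region_name="us-east-1"):
    """Yield Decrypt events from CloudTrail event history between two datetimes."""
    import boto3  # imported lazily so the counting helper works without AWS access

    client = boto3.client("cloudtrail", region_name=region_name)
    paginator = client.get_paginator("lookup_events")
    pages = paginator.paginate(
        LookupAttributes=[{"AttributeKey": "EventName", "AttributeValue": "Decrypt"}],
        StartTime=start_time,
        EndTime=end_time,
    )
    for page in pages:
        yield from page["Events"]
```

With credentials configured, count_kms_decrypt_events(fetch_decrypt_events(start, end)) returns the number of Decrypt events for the job's time window, which you can record before and after each optimization.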

Step 2: Aggregate Files Using Spark

Consolidating smaller files into fewer, larger files stored in S3 minimizes redundant decrypt API calls.

consolidated_data = data.coalesce(10)
consolidated_data.write.mode("overwrite").parquet("s3://emr-kms-demo/optimized-data/")
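The value 10 passed to coalesce above is a fixed choice. A possible alternative is to derive the file count from the dataset size and a target file size. The helper below is a sketch: its name, the 256 MiB target, and the 10 GiB dataset size are assumptions chosen for illustration.

```python
import math


def target_file_count(total_bytes: int,
                      target_file_bytes: int = 256 * 1024 * 1024) -> int:
    """Return how many output files give roughly target_file_bytes each."""
    if total_bytes < 0 or target_file_bytes <= 0:
        raise ValueError("sizes must be positive")
    # Always write at least one file, even for tiny datasets
    return max(1, math.ceil(total_bytes / target_file_bytes))


# Hypothetical 10 GiB dataset with the assumed 256 MiB target
num_files = target_file_count(10 * 1024**3)
print(num_files)  # 40

# In the job above this would replace the literal 10:
# consolidated_data = data.coalesce(num_files)
```

Fewer, larger files mean fewer object reads, but files that are too large can reduce parallelism, so the target size is worth tuning per workload.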

Step 3: Rerun the Job with Optimized Files

Run the optimized workload and compare numbers.

optimized_data = spark.read.parquet("s3://emr-kms-demo/optimized-data/")
optimized_data.count()

CloudTrail Decrypt Calls with Optimized Files

CloudTrail Metrics Comparison

Track the number of API calls and observe the direct impact of data aggregation on reducing KMS decrypt API calls for the same amount of data. Below is a comparison:

[Figure: CloudTrail metrics comparison]

The reduction comes from reading fewer objects: each object read requires a decrypt call, so consolidating small files lowers the number of calls and also shortens the load time.


2. Optimize File Formats and Compression

Efficient file formats and compression matter when PySpark reads data stored in S3. They reduce the volume of data read from S3 and the number of KMS decrypt operations, resulting in faster processing and lower costs.

File Format Optimization

Columnar Formats (Parquet/ORC): Columnar file formats like Parquet and ORC enable Spark to read only the required columns for analysis, dramatically improving performance for analytical queries.

df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("s3://emr-kms-demo/data/")

# Set compression for Parquet files
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")

df.write.format("parquet").save("s3://emr-kms-demo/parquet-data/")

Iceberg Format: Apache Iceberg is a modern table format designed for large-scale analytic datasets. It supports schema evolution, snapshot isolation, and time travel, making it an excellent choice for data lakes on S3.

pyspark \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hadoop \
  --conf spark.sql.catalog.spark_catalog.warehouse=s3://emr-kms-demo/iceberg-warehouse \
  --conf spark.sql.defaultCatalog=spark_catalog

# Run the following inside the PySpark shell started above
df = spark.read.format("csv") \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .load("s3://emr-kms-demo/data/")

spark.conf.set("spark.sql.parquet.compression.codec", "snappy")

# Write data to Iceberg table
df.writeTo("iceberg_from_emr_data").using("iceberg").create()

# Read from Iceberg table
spark.read.table("iceberg_from_emr_data").show()
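Iceberg tables can accumulate small data files over time, which brings back the per-object decrypt overhead described earlier. Iceberg provides a rewrite_data_files procedure for compaction. The helper below is a sketch that only builds the CALL statement; the helper name, the default namespace in the table identifier, and the 512 MiB target are assumptions, and running the statement requires a Spark session with the Iceberg extensions enabled.

```python
def rewrite_data_files_sql(catalog: str, table: str, target_file_bytes: int) -> str:
    """Build a CALL statement for Iceberg's rewrite_data_files procedure."""
    return (
        f"CALL {catalog}.system.rewrite_data_files("
        f"table => '{table}', "
        f"options => map('target-file-size-bytes', '{target_file_bytes}'))"
    )


# Assumes the table created above lives in the "default" namespace
statement = rewrite_data_files_sql(
    "spark_catalog", "default.iceberg_from_emr_data", 512 * 1024 * 1024
)
print(statement)

# In the PySpark shell configured above:
# spark.sql(statement).show()
```

Running compaction on a schedule keeps the number of data files, and therefore the number of object reads per query, from growing with every small write.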

Compression

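Snappy is Spark's default Parquet codec and a common choice because it balances speed and compression ratio. Note that Iceberg tables take their Parquet codec from the table property write.parquet.compression-codec rather than from the Spark setting shown here.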

spark.conf.set("spark.sql.parquet.compression.codec", "snappy")

Compression not only minimizes I/O and network overhead but also accelerates job execution in distributed Spark environments.

CloudWatch Comparison on API Calls


The table below illustrates the reduction in KMS decrypt calls when moving from raw, uncompressed CSV data to optimized Parquet files with compression enabled.

KMS decrypt calls when moving from raw to optimized Parquet files

3. Glue Data Catalog Partition Index

Partitioning data helps Spark jobs retrieve subsets of relevant data, reducing scan ranges and decrypt operations. Leveraging Glue Data Catalog's partition indexes not only reduces scanning overhead but also significantly reduces the number of KMS API calls.

Without a partition index, when Spark queries a partitioned table, Glue Data Catalog returns all partitions via the GetPartitions API. Spark then lists the S3 objects for each partition, and each object read triggers a kms:Decrypt call. With a partition index, Glue filters partitions on the server side and returns only the matching ones, so Spark reads fewer S3 objects and makes fewer Decrypt calls.
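The effect of partition filtering on object reads can be illustrated with a toy model. This sketch assumes one decrypt call per object read; the table layout, object counts, and function name are hypothetical and are not measurements from the demo table.

```python
from typing import Dict, Optional, Tuple

Partition = Tuple[str, str]  # (year, month)


def objects_to_read(objects_per_partition: Dict[Partition, int],
                    year: Optional[str] = None,
                    month: Optional[str] = None) -> int:
    """Sum object counts over partitions matching the filter (None matches any)."""
    return sum(
        count
        for (p_year, p_month), count in objects_per_partition.items()
        if (year is None or p_year == year) and (month is None or p_month == month)
    )


# Hypothetical table: 24 monthly partitions with 100 objects each
layout = {(str(y), f"{m:02d}"): 100 for y in (2000, 2001) for m in range(1, 13)}

full_scan = objects_to_read(layout)
pruned = objects_to_read(layout, year="2000", month="04")
print(full_scan, pruned)  # 2400 100
```

In this model the filtered query touches one partition out of 24, so the number of object reads, and with it the number of decrypt calls, drops by the same factor.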

Step 1: Baseline — Query WITHOUT partition index

spark.sql("SELECT * FROM default.`kms-demoevents` WHERE year='2000' AND month='04'").count()

Then check CloudTrail for kms:Decrypt call count.

CloudTrail Decrypt call WITHOUT partition index using EMR Spark job

Step 2: Add partition index and query WITH partition index

In the AWS Management Console or via the AWS CLI, create partition indexes for year and month. Then rerun the same query and check CloudTrail for the kms:Decrypt call count again.
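As an alternative to the console or CLI, the index can be created with boto3 through the Glue CreatePartitionIndex API. The sketch below assumes a single index covering both keys; the index name year_month_idx, the helper names, and the default Region are illustrative assumptions, while the database and table names come from the query in Step 1.

```python
def build_partition_index_request(database, table, index_name, keys):
    """Build the keyword arguments for Glue's create_partition_index call."""
    return {
        "DatabaseName": database,
        "TableName": table,
        "PartitionIndex": {"Keys": list(keys), "IndexName": index_name},
    }


def create_partition_index(request, region_name="us-east-1"):
    """Send the request to Glue (requires AWS credentials and permissions)."""
    import boto3  # imported lazily so the request builder works without AWS access

    glue = boto3.client("glue", region_name=region_name)
    return glue.create_partition_index(**request)


# Hypothetical index name; keys must be partition keys of the table
request = build_partition_index_request(
    "default", "kms-demoevents", "year_month_idx", ["year", "month"]
)
print(request)
# create_partition_index(request)
```

Index creation is asynchronous, so it may take some time before the index is active and queries benefit from it.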

CloudTrail Decrypt call WITH partition index using EMR Spark job

Conclusion

Optimizing EMR Spark jobs ensures cost-effective and performant processing of encrypted data at scale. Techniques like file aggregation, columnar and open table formats such as Iceberg, and partition indexing offer tangible benefits in reducing operational costs. Start implementing these strategies today to improve your Spark workload efficiency and reduce KMS spend.