Lowering S3 KMS Decrypt API Costs in Amazon EMR Spark Jobs
Running Spark on EMR with KMS-encrypted S3 data? Every object read triggers a kms:Decrypt API call — and at scale, those costs add up fast. If your compliance requirements prevent switching to S3 Bucket Keys, you need smarter optimizations at the data and compute layer.
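This article covers three practical techniques to cut KMS decrypt costs without changing your encryption strategy:
- Data aggregation (combining many small files into fewer, larger files)
- File format and compression optimization (Parquet, ORC, and Apache Iceberg)
- Glue Data Catalog partition indexes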
Introduction
Modern organizations rely on Amazon EMR and Apache Spark for processing vast amounts of transaction data stored in Amazon S3. To maintain data security, this data is often encrypted using AWS Key Management Service (KMS). While encryption is essential, frequent KMS decrypt API calls can significantly increase costs as data volumes scale.
This article introduces practical techniques to reduce KMS decrypt costs while maintaining data security. It covers file aggregation, file format optimization (columnar formats and the Apache Iceberg open table format, or OTF), and Glue Data Catalog partition indexes.
Challenges with Frequent KMS Decrypt API Calls
Consider a retail organization that processes hundreds of terabytes of customer transaction data daily. Without S3 Bucket Keys, every S3 read request that a Spark task issues against an SSE-KMS object triggers a kms:Decrypt call, so KMS costs grow with the number of objects and requests, not just the data volume. For workloads that require per-object key auditability and therefore cannot switch to S3 Bucket Keys, optimizations at the data and EMR level become essential.
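A quick back-of-the-envelope estimate shows why object count drives the bill. This is a sketch under stated assumptions, not an AWS pricing tool: it assumes one kms:Decrypt call per S3 GET request and a price of $0.03 per 10,000 KMS requests, so check current AWS KMS pricing for your Region. The function name and the example workload figures are hypothetical.

```python
# Rough estimate of monthly kms:Decrypt spend for SSE-KMS objects read by Spark.
# Assumptions (hypothetical, verify against current AWS pricing):
#   * each S3 GET request on an SSE-KMS object triggers one kms:Decrypt call
#   * KMS charges PRICE_PER_10K_REQUESTS USD per 10,000 requests

PRICE_PER_10K_REQUESTS = 0.03  # assumed symmetric-key request price, USD


def monthly_decrypt_cost(objects_per_run: int, gets_per_object: int,
                         runs_per_day: int, days: int = 30) -> float:
    """Return the estimated monthly KMS request cost in USD."""
    decrypt_calls = objects_per_run * gets_per_object * runs_per_day * days
    return decrypt_calls / 10_000 * PRICE_PER_10K_REQUESTS


if __name__ == "__main__":
    # Hypothetical workload: 1,000,000 small files vs. the same data compacted
    # into 2,000 larger files, each read once per run, 24 runs per day.
    before = monthly_decrypt_cost(1_000_000, 1, 24)
    after = monthly_decrypt_cost(2_000, 1, 24)
    print(f"small files: ${before:,.2f}/month, compacted: ${after:,.2f}/month")
```

The `gets_per_object` parameter lets you account for large files that Spark reads with several range requests, each of which would count as a separate Decrypt call under the assumption above.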
Optimization Techniques
1. Leverage Data Aggregation
Data aggregation uses Spark to combine many small files into fewer, larger files. Because each S3 read request on an SSE-KMS object triggers its own kms:Decrypt call, reading the same data from fewer objects means fewer redundant Decrypt calls.
Use AWS CloudTrail to track the kms:Decrypt call count before and after aggregation to validate the cost reduction. This technique is particularly effective for read-heavy workloads that involve many small files stored in S3.
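One way to get those counts programmatically is sketched below. It assumes KMS events are recorded as management events for the account and Region, and that S3 puts the object ARN under the `aws:s3:arn` key of the Decrypt encryption context (as it does for SSE-KMS without Bucket Keys). It uses boto3's CloudTrail `lookup_events` paginator; the helper names and the ARN prefix are illustrative.

```python
import json
from collections import Counter
from datetime import datetime, timedelta, timezone


def count_decrypts_by_object(raw_events, arn_prefix):
    """Count kms:Decrypt events per S3 object ARN that starts with arn_prefix.

    raw_events: iterable of CloudTrail event JSON strings, such as the
    "CloudTrailEvent" field returned by CloudTrail LookupEvents.
    """
    counts = Counter()
    for raw in raw_events:
        event = json.loads(raw)
        if event.get("eventName") != "Decrypt":
            continue
        params = event.get("requestParameters") or {}
        context = params.get("encryptionContext") or {}
        arn = context.get("aws:s3:arn", "")
        if arn.startswith(arn_prefix):
            counts[arn] += 1
    return counts


def fetch_decrypt_events(hours=1, region="us-east-1"):
    """Yield raw Decrypt events from CloudTrail event history for the last N hours."""
    import boto3  # imported here so the counting helper works without boto3

    client = boto3.client("cloudtrail", region_name=region)
    end = datetime.now(timezone.utc)
    paginator = client.get_paginator("lookup_events")
    pages = paginator.paginate(
        LookupAttributes=[{"AttributeKey": "EventName", "AttributeValue": "Decrypt"}],
        StartTime=end - timedelta(hours=hours),
        EndTime=end,
    )
    for page in pages:
        for event in page["Events"]:
            yield event["CloudTrailEvent"]
```

For example, `sum(count_decrypts_by_object(fetch_decrypt_events(hours=2), "arn:aws:s3:::emr-kms-demo/data/").values())` gives the total for the baseline prefix. LookupEvents only covers recent event history and is throttled, so for large volumes, querying the trail's S3 logs with Athena may be more practical.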
Step 1: Benchmark Baseline Performance
Before applying optimizations, establish baseline metrics to quantify improvements.
```python
import time

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Baseline Job").getOrCreate()

# spark.read is lazy, so time the load together with an action (count)
start_time = time.time()
data = spark.read.format("csv").load("s3://emr-kms-demo/data/")
row_count = data.count()
load_time = time.time() - start_time

print(f"Load time: {load_time:.2f} seconds")
print(f"Rows processed: {row_count}")
# Each input object is read at least once; the actual kms:Decrypt count comes from CloudTrail
print(f"Input objects: {len(data.inputFiles())}")
```
Step 2: Aggregate Files Using Spark
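Consolidating many small files into fewer, larger files stored in S3 minimizes redundant decrypt API calls. The example writes the consolidated output as Parquet, so it also applies the format change discussed in technique 2.
```python
# Write the data as 10 larger Parquet files (tune the count to your data volume)
consolidated_data = data.coalesce(10)
consolidated_data.write.mode("overwrite").parquet("s3://emr-kms-demo/optimized-data/")
```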
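Rather than hard-coding the output file count, you can derive it from the input size. The sketch below assumes a target of roughly 512 MiB per output file (an assumption, not an AWS recommendation; tune it for your cluster) and uses boto3's `list_objects_v2` paginator to total the input size. The helper names are hypothetical.

```python
import math

TARGET_FILE_BYTES = 512 * 1024 * 1024  # assumed target size per output file


def target_file_count(total_input_bytes: int,
                      target_file_bytes: int = TARGET_FILE_BYTES) -> int:
    """Number of output files needed so each is close to target_file_bytes."""
    return max(1, math.ceil(total_input_bytes / target_file_bytes))


def prefix_size_bytes(bucket: str, prefix: str) -> int:
    """Sum object sizes under s3://bucket/prefix (requires boto3 and credentials)."""
    import boto3  # imported lazily so target_file_count has no AWS dependency

    s3 = boto3.client("s3")
    total = 0
    for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            total += obj["Size"]
    return total
```

You could then call `data.coalesce(target_file_count(prefix_size_bytes("emr-kms-demo", "data/")))`. Note that `coalesce` only reduces the partition count and avoids a shuffle, but can produce uneven files; `repartition` shuffles and evens out file sizes. Compressed Parquet output is usually smaller than the CSV input, so sizing from input bytes errs toward more, smaller files.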
Step 3: Rerun the Job with Optimized Files
Run the optimized workload and compare its load time and CloudTrail kms:Decrypt count with the baseline.
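```python
start_time = time.time()
optimized_data = spark.read.parquet("s3://emr-kms-demo/optimized-data/")
print(f"Rows: {optimized_data.count()}, objects: {len(optimized_data.inputFiles())}")
print(f"Load + count time: {time.time() - start_time:.2f} seconds")
```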
CloudTrail Metrics Comparison
Track the number of kms:Decrypt calls in CloudTrail to observe the direct impact of data aggregation for the same amount of data. Below is a comparison:
The reduction comes from decrypting fewer, larger objects instead of many small ones, which also shortens load time.
2. Optimize File Formats and Compression
Selecting efficient file formats and applying compression reduces the amount of data Spark reads from S3, which speeds up processing and lowers compute costs. On the KMS side, S3 issues a Decrypt call per read request on an SSE-KMS object rather than per byte, so the savings come mainly from reading fewer and smaller objects. These techniques therefore work best together with the file aggregation described above.
File Format Optimization
Columnar Formats (Parquet/ORC): Columnar file formats like Parquet and ORC enable Spark to read only the required columns for analysis, dramatically improving performance for analytical queries.
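```python
# inferSchema makes one extra pass over the CSV files (extra S3 GETs and Decrypt calls);
# supplying an explicit schema avoids that pass
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("s3://emr-kms-demo/data/")

# Set compression for Parquet files
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
df.write.format("parquet").save("s3://emr-kms-demo/parquet-data/")
```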
Iceberg Format: Apache Iceberg is an open table format (OTF) designed for large-scale analytic datasets; Iceberg tables typically store their data in Parquet files. It supports schema evolution, snapshot isolation, and time travel, making it an excellent choice for data lakes on S3.
```shell
pyspark \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hadoop \
  --conf spark.sql.catalog.spark_catalog.warehouse=s3://emr-kms-demo/iceberg-warehouse \
  --conf spark.sql.defaultCatalog=spark_catalog
```
```python
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("s3://emr-kms-demo/data/")

# Iceberg's writer takes its codec from the write.parquet.compression-codec table
# property, not from spark.sql.parquet.compression.codec
df.writeTo("iceberg_from_emr_data") \
    .using("iceberg") \
    .tableProperty("write.parquet.compression-codec", "snappy") \
    .create()

# Read from Iceberg table
spark.read.table("iceberg_from_emr_data").show()
```
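Iceberg tables can also keep the file-aggregation benefit from technique 1 by compacting small data files in place. Iceberg's Spark procedure `rewrite_data_files` does this; the sketch below builds the `CALL` statement with a `target-file-size-bytes` option. It assumes the `spark_catalog` configuration shown above, that the table from the previous example lives in the `default` namespace, and a 512 MiB target. The helper names are hypothetical.

```python
def rewrite_data_files_sql(catalog: str, table: str,
                           target_file_size_bytes: int = 512 * 1024 * 1024) -> str:
    """Build an Iceberg rewrite_data_files CALL that compacts small data files."""
    return (
        f"CALL {catalog}.system.rewrite_data_files("
        f"table => '{table}', "
        f"options => map('target-file-size-bytes', '{target_file_size_bytes}'))"
    )


def compact_table(spark, catalog: str, table: str) -> None:
    """Run the compaction on a SparkSession with the Iceberg extensions enabled."""
    # The procedure returns a summary row with rewritten and added file counts
    spark.sql(rewrite_data_files_sql(catalog, table)).show(truncate=False)
```

For example, `compact_table(spark, "spark_catalog", "default.iceberg_from_emr_data")`. Running compaction periodically keeps the number of objects per query, and therefore Decrypt calls, from creeping back up as new small files arrive.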
Compression
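Snappy is a common codec for Parquet and Iceberg because it balances speed and compression ratio, and it is Spark's default Parquet codec. For Iceberg tables, set the codec through the write.parquet.compression-codec table property rather than the Spark session setting.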
```python
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
```
Compression not only minimizes I/O and network overhead but also accelerates job execution in distributed Spark environments.
CloudWatch Comparison on API Calls
The table below illustrates the reduction in KMS decrypt calls when moving from raw, uncompressed CSV data to optimized Parquet files with compression enabled.
3. Glue Data Catalog Partition Index
Partitioning data helps Spark jobs retrieve subsets of relevant data, reducing scan ranges and decrypt operations. Leveraging Glue Data Catalog's partition indexes not only reduces scanning overhead but also significantly reduces the number of KMS API calls.
Without a partition index, Glue Data Catalog returns all partitions through the GetPartitions API. Spark then lists the S3 objects for each returned partition, and each object read triggers a kms:Decrypt call. With a partition index, Glue filters partitions server-side and returns only those that match the query predicate, so Spark reads fewer S3 objects and makes fewer Decrypt calls.
Step 1: Baseline — Query WITHOUT partition index
```python
spark.sql("SELECT * FROM default.`kms-demoevents` WHERE year='2000' AND month='04'").count()
```
Then check CloudTrail for kms:Decrypt call count.
Step 2: Add partition index and query WITH partition index
In the AWS Management Console or with the AWS CLI, create a partition index on the year and month keys. After the index becomes active, rerun the query from Step 1 and check the kms:Decrypt call count in CloudTrail again.
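To script this step instead, the Glue `CreatePartitionIndex` API is available in boto3 as `create_partition_index`. The sketch below assumes the table `kms-demoevents` in the `default` database with partition keys `year` and `month`, and uses a hypothetical index name `year_month_idx`. Index creation is asynchronous, so the helper polls `get_partition_indexes` until the index reports `ACTIVE`.

```python
import time

DATABASE = "default"
TABLE = "kms-demoevents"
INDEX_NAME = "year_month_idx"  # hypothetical index name


def partition_index_request(keys, index_name=INDEX_NAME):
    """Build the PartitionIndex argument for Glue CreatePartitionIndex."""
    return {"Keys": list(keys), "IndexName": index_name}


def index_status(response, index_name=INDEX_NAME):
    """Return the IndexStatus for index_name from a GetPartitionIndexes response."""
    for descriptor in response.get("PartitionIndexDescriptorList", []):
        if descriptor["IndexName"] == index_name:
            return descriptor["IndexStatus"]
    return None


def create_and_wait(keys=("year", "month"), poll_seconds=10, timeout_seconds=600):
    """Create the index and poll until it is ACTIVE (requires boto3 and credentials)."""
    import boto3  # imported lazily so the pure helpers work without boto3

    glue = boto3.client("glue")
    glue.create_partition_index(
        DatabaseName=DATABASE,
        TableName=TABLE,
        PartitionIndex=partition_index_request(keys),
    )
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        response = glue.get_partition_indexes(DatabaseName=DATABASE, TableName=TABLE)
        status = index_status(response)
        if status == "ACTIVE":
            return
        if status == "FAILED":
            raise RuntimeError(f"Partition index {INDEX_NAME} failed to build")
        time.sleep(poll_seconds)
    raise TimeoutError(f"Partition index {INDEX_NAME} not ACTIVE after {timeout_seconds}s")
```

Keeping the request builder and status parser separate from the boto3 calls makes them easy to check without AWS access; only `create_and_wait` touches the Glue API.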
Conclusion
Optimizing EMR Spark jobs at the data and compute layer can make processing encrypted data at scale considerably cheaper. File aggregation, columnar formats and Iceberg tables, and Glue Data Catalog partition indexes all reduce the number of S3 objects Spark reads, and therefore the number of kms:Decrypt calls. Measure each change against a CloudTrail baseline to confirm the savings for your workload.