Querying Delta tables from SQL Explorer in EMR Workspace

7 minute read
Content level: Advanced
3

This article offers instructions on how to set up and access Delta tables from SQL Explorer in EMR JupyterHub. SQL Explorer utilizes the Presto engine configured within the EMR cluster to process data. To access Delta tables, this setup requires integrating Presto with Delta Lake.

Essentially, this article involves creating an EMR cluster configured with Presto, and then creating a Delta table using EMR Spark. Next, you'll attach the cluster to an EMR workspace or notebook and integrate Delta with Presto by making some configuration changes. These changes are essential to access the Delta table from the SQL Explorer.

Steps to configure EMR cluster

  1. Spin up an EMR cluster with the following applications installed: Presto, Hive, and JupyterEnterpriseGateway.
  2. Configure an external Hive metastore using Amazon RDS, following the example provided.
  3. Enable the delta table configuration on delta-defaults.
  4. Please ensure that your EMR cluster's version is greater than 6.4.0 and no in-transit encryption enabled as EMR Studio SQL Explorer does not support Presto clusters that have been configured with in-transit encryption enabled that affects the Presto already running in TLS mode.
aws emr create-cluster \
 --name "presto-sql-explorer-external-HMS" \
 --log-uri "s3://<YOUR-S3-BUCKET>/elasticmapreduce" \
 --release-label "emr-7.1.0" \
 --service-role "arn:aws:iam::xxxxxxxxxxxxx:role/EMR_DefaultRole" \
 --unhealthy-node-replacement \
 --ec2-attributes '{"InstanceProfile":"EMR_EC2_DefaultRole","EmrManagedMasterSecurityGroup":"sg-xxxxxxxxx","EmrManagedSlaveSecurityGroup":"sg-xxxxxxxxxx","KeyName":"testemr","AdditionalMasterSecurityGroups":[],"AdditionalSlaveSecurityGroups":[],"SubnetId":"subnet-xxxxxxxxxxxxxxx"}' \
 --applications Name=Hive Name=JupyterEnterpriseGateway Name=Livy Name=Presto Name=Spark \
 --configurations '[{"Classification":"hive-site","Properties":{"javax.jdo.option.ConnectionDriverName":"org.mariadb.jdbc.Driver","javax.jdo.option.ConnectionPassword":"********","javax.jdo.option.ConnectionURL":"jdbc:mysql://database-1.xxxxxxxxxxxxxxxxxx.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true","javax.jdo.option.ConnectionUserName":"admin"}},{"Classification":"delta-defaults","Properties":{"delta.enabled":"true"}}]' \
 --instance-groups '[{"InstanceCount":1,"InstanceGroupType":"MASTER","Name":"Primary","InstanceType":"m5.xlarge","EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"VolumeType":"gp2","SizeInGB":32},"VolumesPerInstance":2}]},"Configurations":[{"Classification":"delta-defaults","Properties":{"delta.enabled":"true"}}]},{"InstanceCount":1,"InstanceGroupType":"CORE","Name":"Core","InstanceType":"m5.xlarge","EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"VolumeType":"gp2","SizeInGB":32},"VolumesPerInstance":2}]},"Configurations":[{"Classification":"delta-defaults","Properties":{"delta.enabled":"true"}}]}]' \
 --scale-down-behavior "TERMINATE_AT_TASK_COMPLETION" \
 --auto-termination-policy '{"IdleTimeout":3600}' \
 --region "us-east-1"

Configure Delta Table using Spark

  1. Once the above EMR cluster is available, connect to the primary node and open the pyspark shell using below Delta configurations,
pyspark   --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
   --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
  1. Create the database with S3 location. The given s3 path should be accessible to the EMR service role where the pyspark application opened.
spark.sql("""CREATE DATABASE delta_db LOCATION 's3://<YOUR-S3-BUCKET>/delta_db/'""")
spark.sql("show databases;").show()
+---------+
|namespace|
+---------+
|  default|
| delta_db|
+---------+
  1. Create the delta table as per below sample, Make sure to include symLinkFormat which will generate the manifest file that includes the parquet files created as part of this table.
spark.sql("""CREATE  TABLE IF NOT EXISTS delta_db.delta_table (
id bigint,
creation_date date,
last_update_time varchar(100))
USING delta location
's3://<YOUR-S3-BUCKET>/delta_db/delta_table'
TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)""");
  1. Insert some records as shown below, and verify they are committed,
spark.sql("""INSERT INTO delta_db.delta_table values ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z")""");
spark.sql("""INSERT INTO delta_db.delta_table values ("101", "2016-01-01", "2016-01-01T13:51:39.340396Z")""");

spark.sql("select * from delta_db.delta_table").show()
+---+-------------+--------------------+                                        
| id|creation_date|    last_update_time|
+---+-------------+--------------------+
|100|   2015-01-01|2015-01-01T13:51:...|
|101|   2016-01-01|2016-01-01T13:51:...|
+---+-------------+--------------------+

Collaborate with EMR Workspace

  1. Launch EMR Studio and start the workspace. Configure the workspace if not done already.
  2. Attach the EMR cluster as shown below, Please do not select the Runtime role that will not work when SQL explorer is launched.

Enter image description here

If the runtime role attached, It will throw exception like SQL Explorer is not enabled when attached to a runtime role enabled EMR on EC2 cluster.

  1. Connect with SQL Explorer and browse the table that just created. This will throw an Internal Server Error as shown below.

Enter image description here

  1. You can also connect to the Primary instance of EMR cluster and refer to /var/log/presto/server.log presto server log file which provides the actual exception as mentioned below,
2024-07-09T11:35:35.201Z        ERROR   SplitRunner-4-121       com.facebook.presto.execution.executor.TaskExecutor     Error processing Split 20240709_113534_00004_au39i.2.0.0.0-0 InformationSchemaSplit{tableHandle=hive:information_schema:columns, prefixes=[hive.delta_db.delta_table], addresses=[172.31.2.142:8889]} (start = 3836825.505599, wall = 12 ms, cpu = 0 ms, wait = 0 ms, calls = 1): HIVE_UNSUPPORTED_FORMAT: Not a Hive table 'delta_db.delta_table'
2024-07-09T11:35:35.211Z        ERROR   remote-task-callback-13 com.facebook.presto.execution.StageExecutionStateMachine        Stage execution 20240709_113534_00004_au39i.2.0 failed
com.facebook.presto.spi.PrestoException: Not a Hive table 'delta_db.delta_table'
        at com.facebook.presto.hive.HiveMetadata.getTableMetadata(HiveMetadata.java:641)
        at com.facebook.presto.hive.HiveMetadata.getTableMetadata(HiveMetadata.java:631)
        at com.facebook.presto.hive.HiveMetadata.listTableColumns(HiveMetadata.java:822)
  1. To further test this exception(optional), connect to presto-cli and run the query which will also give the same error. Essentially it means the given table is not known by Presto. Please go ahead next section to integrate the delta table with Presto.
presto> select * from hive.delta_db.delta_table;
Query 20240709_114029_00008_au39i failed: Not a Hive table 'delta_db.delta_table'

Integrate Delta with Presto

  1. Since this cluster is created with external Hive metastore and Presto enabled during the cluster configuration, hive catalog set with below properties by default The Hive.properties file located at /etc/presto/conf/catalog. Here, we make sure connector.name set to hive-hadoop2 and hive.metastore.uri set to thrift://<PrimaryNodeDNS>:9083. If the metastore configured with Glue data catalog, then hive.metastore set to glue. However this feature does not work with querying delta table.
hive.metastore-refresh-interval=1m
connector.name=hive-hadoop2
hive.metastore-cache-ttl=20m
hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
hive.non-managed-table-writes-enabled = true
hive.s3-file-system-type = PRESTO
hive.hdfs.impersonation.enabled = true
hive.metastore.uri = thrift://ip-172-31-2-142.ec2.internal:9083
  1. Change hive.s3-file-system-type to PRESTO as this will be pointing to EMRFS by default. For querying Delta table, it should be a presto type.
  2. Restart the presto service and make sure they are running,
sudo systemctl restart presto-server.service
sudo systemctl status presto-server.service 
  1. To allow Presto to understand the table's metadata, the Delta table needs to be registered with the Hive metastore. Presto utilizes the Hive connector to read Delta tables. Therefore, create an external table with the same definition mentioned previously and use the SymlinkTextInputFormat as the input format. Point the S3 location to the delta_table created from Spark, specifically referencing the manifest file located at _symlink_format_manifest.
CREATE EXTERNAL TABLE IF NOT EXISTS delta_db.delta_table ( 
id bigint, creation_date date, last_update_time varchar(100)
) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://<YOUR-S3-BUCKET>/delta_db/delta_table/_symlink_format_manifest/';
  1. Query the delta table using presto-cli and SQL Explorer editor that will connect hive catalog and navigate to specific table to query the data.
presto> desc hive.delta_db.delta_table;
      Column      |     Type     | Extra | Comment 
------------------+--------------+-------+---------
 id               | bigint       |       |         
 creation_date    | date         |       |         
 last_update_time | varchar(100) |       |         
(3 rows)

Query 20240709_121623_00019_vpaxz, FINISHED, 2 nodes
Splits: 19 total, 19 done (100.00%)
[Latency: client-side: 240ms, server-side: 157ms] [3 rows, 227B] [19 rows/s, 1.41KB/s]

presto> select * from hive.delta_db.delta_table;
 id  | creation_date |      last_update_time       
-----+---------------+-----------------------------
 101 | 2016-01-01    | 2016-01-01T13:51:39.340396Z 
 100 | 2015-01-01    | 2015-01-01T13:51:39.340396Z 
(2 rows)

Query 20240709_121625_00020_vpaxz, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
[Latency: client-side: 0:01, server-side: 0:01] [2 rows, 2.57KB] [2 rows/s, 3.11KB/s]

Enter image description here

In this approach, we integrated Delta with Presto by utilizing the manifest file method. If you already have existing delta tables and need to generate manifests or alter tables, refer to the external document for configuration guidance.

delta connector

Alternatively, you can also access Delta tables in Presto using the Delta connector, as detailed here. In the delta connector approach, we have to create delta catalog file(locate at /etc/presto/conf/catalog/delta.properties)with minimum below properties in all nodes and restart the presto server service. You can configure post Bootstrap action script to configure this setting instead of manually changing them.

connector.name=delta
hive.metastore.uri=thrift://<PrimaryNodeDNS>:9083

Register the table using below definition. Please note that the external table location points to delta_table directory.

CREATE EXTERNAL TABLE delta_db.delta_table (
id bigint, creation_date date, last_update_time varchar(100)
)
LOCATION 's3://<YOUR-S3-BUCKET>/delta_db/delta_table/';

You can access the delta table using presto-cli or SQL explorer. Use the format like <delta_catalog>.<database>.<table>. This make sure to use delta catalog and refer the relevant schema where the delta_table belongs to.

presto-cli 
presto> select * from  delta.delta_db.delta_table;
 id  | creation_date |      last_update_time       
-----+---------------+-----------------------------
 100 | 2015-01-01    | 2015-01-01T13:51:39.340396Z 
 101 | 2016-01-01    | 2016-01-01T13:51:39.340396Z 
(2 rows)

Query 20240709_132430_00014_46bb3, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
[Latency: client-side: 0:17, server-side: 0:17] [2 rows, 2.57KB] [0 rows/s, 151B/s]

Enter image description here

AWS
SUPPORT ENGINEER
published 9 days ago863 views