如何使用 EMR 叢集中的 Spark 連線至 Redshift 叢集?

3 分的閱讀內容
0

我想在我的 Amazon EMR 叢集中使用 Apache Spark 連接 Amazon Redshift 叢集。

解決方案

**注意:**請先設定您的 Redshift 叢集和 EMR 叢集並安裝 Spark 服務,再繼續執行下列步驟。

測試從 EMR 叢集到 Redshift 叢集的連線能力

1.    確認針對 TCP 連接埠 5439 的 Redshift 安全性群組 (輸入規則) 允許 EMR 主要、核心和任務節點安全群組。如果 EMR 和 Redshift 叢集部署在兩個不同的 Amazon Virtual Private Clouds (Amazon VPC) 中,請設定 VPC 對等互連。

2.    使用 SSH 連線至 EMR 主節點,並執行下列 Telnet 命令。這個 Telnet 命令會驗證您是否可以在 EMR 叢集和 Redshift 叢集之間建立連線。在下列命令中,將 Redshift_Endpoint 替換為您 Redshift 叢集的正確端點。

telnet Redshift_Endpoint 5439

以下是成功連線的輸出範例:

telnet redshift-cluster-1.XXXX.us-east-1.redshift.amazonaws.com 5439
Trying 172.31.48.21...
Connected to redshift-cluster-1.XXXXX.us-east-1.redshift.amazonaws.com.
Escape character is

使用 EMR-5.x.x 系列叢集中的 Spark 連線至 Redshift 叢集

使用 Databrick 的 spark-redshift 套件 (程式庫)。此程式庫會將資料從 Amazon Redshift 載入 Spark SQL DataFrames,並同時將 DataFrames 存回 Amazon Redshift 資料表。

1.    使用 SSH 連線至 EMR 主節點

2.    若要使用 spark-redshift 程式庫,請將下列 .jar 檔案下載至 EMR 叢集:

wget https://repo1.maven.org/maven2/com/databricks/spark-redshift_2.11/2.0.1/spark-redshift_2.11-2.0.1.jar
wget https://github.com/ralfstx/minimal-json/releases/download/0.9.4/minimal-json-0.9.4.jar

3.    將下載的 JAR 檔案複製到預設的 Spark 程式庫。Spark 程式庫的路徑是 /usr/lib/spark/jars/

sudo cp spark-redshift_2.11-2.0.1.jar /usr/lib/spark/jars/
sudo cp minimal-json-0.9.4.jar /usr/lib/spark/jars/

4.    使用 Amazon Redshift JDBC 驅動程式執行 spark-shell 命令,以連線到 Redshift 叢集。JDBC 驅動程式包含在 Amazon EMR 4.7.0 及更新版本中。

spark-shell --jars /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar

5.    spark-shell 工作階段初始化完成後,請執行下列步驟以連線至 Redshift 叢集。在以下命令中,根據您的使用案例更新 Amazon Redshift 端點、Amazon Simple Storage Service (Amazon S3) 儲存貯體名稱和資料表詳細資料。

import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSSessionCredentials
import com.amazonaws.auth.InstanceProfileCredentialsProvider

// Instance Profile for authentication to AWS resources
val provider = new InstanceProfileCredentialsProvider();
val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials];
val token = credentials.getSessionToken;
val awsAccessKey = credentials.getAWSAccessKeyId;
val awsSecretKey = credentials.getAWSSecretKey

// Set JDBC URL of Redshift
val jdbcUrl = "jdbc:redshift://<cluster-name>.<id>.<region>.redshift.amazonaws.com:5439/<database>?user=<user>&password=<password>"

// Create DataFrame by loading Redshift query
val df = spark.read.format("com.databricks.spark.redshift").option("url", jdbcUrl).option("tempdir", "s3://<S3-path-to-store-temp-data>").option("query", "select * from <table-name>").option("temporary_aws_access_key_id", awsAccessKey).option("temporary_aws_secret_access_key", awsSecretKey).option("temporary_aws_session_token", token).load()
df.show(2)

使用 Amazon EMR-6.x.x 系列叢集中的 Spark 連接至 Redshift 叢集

Amazon EMR 6.x 及更新版本使用 Scala 版本 2.12。Amazon EMR 5.x 使用 Scala 版本 2.11。Amazon EMR 5.x 版本使用的 spark-redshift_2.11-2.0.1.jar 檔案與 Amazon EMR 6.x 及更新版本不相容。因此,請使用 Amazon EMR 6.x 及更新版本叢集中的 spark-redshift_2.12-4.2.0.jar 連接器

1.    使用 SSH 連線至 EMR 主節點

2.    若要使用 spark-redshift 程式庫,請將下列 .jar 檔案下載至 EMR 叢集:

wget https://repo1.maven.org/maven2/io/github/spark-redshift-community/spark-redshift_2.12/4.2.0/spark-redshift_2.12-4.2.0.jar
wget https://github.com/ralfstx/minimal-json/releases/download/0.9.4/minimal-json-0.9.4.jar

3.    將下載的 JAR 檔案複製到預設的 Spark 程式庫。Spark 程式庫的路徑是 /usr/lib/spark/jars/

sudo cp spark-redshift_2.12-4.2.0.jar /usr/lib/spark/jars/
sudo cp minimal-json-0.9.4.jar /usr/lib/spark/jars/

4.    使用 Amazon Redshift JDBC 驅動程式執行 spark-shell 命令,以連線到 Redshift 叢集。JDBC 驅動程式包含在 EMR 4.7.0 及更新版本中。

spark-shell --jars /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar

5.    spark-shell 工作階段初始化完成後,請執行下列步驟以連線至 Redshift 叢集。在以下命令中,根據您的使用案例更新 Amazon Redshift 端點、S3 儲存貯體名稱和資料表詳細資料。

import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSSessionCredentials
import com.amazonaws.auth.InstanceProfileCredentialsProvider

// Instance Profile for authentication to AWS resources
val provider = new InstanceProfileCredentialsProvider();
val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials];
val token = credentials.getSessionToken;
val awsAccessKey = credentials.getAWSAccessKeyId;
val awsSecretKey = credentials.getAWSSecretKey

// Set JDBC URL of Redshift
val jdbcUrl = "jdbc:redshift://<cluster-name>.<id>.<region>.redshift.amazonaws.com:5439/<database>?user=<user>&password=<password>"

// Create DataFrame by loading Redshift query
val df = spark.read.format("io.github.spark_redshift_community.spark.redshift").option("url", jdbcUrl).option("tempdir", "s3://bucket/tmp/").option("query", "select * from <table>").option("temporary_aws_access_key_id", awsAccessKey).option("temporary_aws_secret_access_key", awsSecretKey).option("temporary_aws_session_token", token).load()
df.show(2)

AWS 官方
AWS 官方已更新 1 年前