Wie stelle ich mithilfe von Spark in meinem EMR-Cluster eine Verbindung zu einem Redshift-Cluster her?

Lesedauer: 4 Minute
0

Ich möchte einen Amazon-Redshift-Cluster mithilfe von Apache Spark in meinem Amazon-EMR-Cluster verbinden.

Lösung

Hinweis: Konfigurieren Sie Ihren Redshift-Cluster und EMR-Cluster und installieren Sie den Spark-Service, bevor Sie mit den folgenden Schritten fortfahren.

Testen Sie die Konnektivität vom EMR-Cluster zum Redshift-Cluster

1.    Stellen Sie sicher, dass die Primär-, Core- und Aufgabenknoten-Sicherheitsgruppen von EMR in der Sicherheitsgruppe von Redshift (Regeln für eingehenden Datenverkehr) für den TCP-Port 5439 zulässig sind. Wenn die EMR- und Redshift-Cluster in zwei verschiedenen Amazon Virtual Private Clouds (Amazon VPC) bereitgestellt werden, konfigurieren Sie das VPC-Peering.

2.    Stellen Sie mithilfe von SSH eine Verbindung zum primären EMR-Knoten her und führen Sie den folgenden Telnet-Befehl aus. Dieser Telnet-Befehl überprüft, ob Sie eine Verbindung zwischen dem EMR-Cluster und dem Redshift-Cluster herstellen können. Ersetzen Sie im folgenden Befehl Redshift_Endpoint durch den richtigen Endpunkt für Ihren Redshift-Cluster.

telnet Redshift_Endpoint 5439

Das Folgende ist eine Beispielausgabe für eine erfolgreiche Verbindung:

telnet redshift-cluster-1.XXXX.us-east-1.redshift.amazonaws.com 5439
Trying 172.31.48.21...
Connected to redshift-cluster-1.XXXXX.us-east-1.redshift.amazonaws.com.
Escape character is

Stellen Sie mithilfe von Spark in Clustern der EMR-5.x.x-Serie eine Verbindung zum Redshift-Cluster her

Verwenden Sie das Spark-Redshift-Paket (Bibliothek) von Databrick. Diese Bibliothek lädt Daten von Amazon Redshift in Spark SQL DataFrames und speichert DataFrames auch wieder in Amazon-Redshift-Tabellen.

1.    Stellen Sie mithilfe von SSH eine Verbindung zum EMR-Primärknoten her.

2.    Um mit der Spark-Redshift-Bibliothek zu arbeiten, laden Sie die folgenden **.jar-**Dateien in den EMR-Cluster herunter:

wget https://repo1.maven.org/maven2/com/databricks/spark-redshift_2.11/2.0.1/spark-redshift_2.11-2.0.1.jar
wget https://github.com/ralfstx/minimal-json/releases/download/0.9.4/minimal-json-0.9.4.jar

3.    Kopieren Sie die heruntergeladenen JAR-Dateien in die Standard-Spark-Bibliothek. Der Pfad der Spark-Bibliothek ist /usr/lib/spark/jars/.

sudo cp spark-redshift_2.11-2.0.1.jar /usr/lib/spark/jars/
sudo cp minimal-json-0.9.4.jar /usr/lib/spark/jars/

4.    Führen Sie den Befehl spark-shell mit dem Amazon Redshift JDBC-Treiber aus, um eine Verbindung zum Redshift-Cluster herzustellen. Der JDBC-Treiber ist in den Amazon-EMR-Versionen 4.7.0 und höher enthalten.

spark-shell --jars /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar

5.    Führen Sie nach der Initialisierung der Spark-Shell-Sitzung die folgenden Schritte aus, um eine Verbindung zum Redshift-Cluster herzustellen. Aktualisieren Sie in den folgenden Befehlen den Amazon-Redshift-Endpunkt, den Bucket-Namen von Amazon Simple Storage Service (Amazon S3) und die Tabellendetails entsprechend Ihrem Anwendungsfall.

import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSSessionCredentials
import com.amazonaws.auth.InstanceProfileCredentialsProvider

// Instance Profile for authentication to AWS resources
val provider = new InstanceProfileCredentialsProvider();
val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials];
val token = credentials.getSessionToken;
val awsAccessKey = credentials.getAWSAccessKeyId;
val awsSecretKey = credentials.getAWSSecretKey

// Set JDBC URL of Redshift
val jdbcUrl = "jdbc:redshift://<cluster-name>.<id>.<region>.redshift.amazonaws.com:5439/<database>?user=<user>&password=<password>"

// Create DataFrame by loading Redshift query
val df = spark.read.format("com.databricks.spark.redshift").option("url", jdbcUrl).option("tempdir", "s3://<S3-path-to-store-temp-data>").option("query", "select * from <table-name>").option("temporary_aws_access_key_id", awsAccessKey).option("temporary_aws_secret_access_key", awsSecretKey).option("temporary_aws_session_token", token).load()
df.show(2)

Stellen Sie mithilfe von Spark in Clustern der Amazon-EMR-6.x.x-Serie eine Verbindung zum Redshift-Cluster her

Amazon-EMR-Versionen 6.x und höher verwenden Scala-Version 2.12. Amazon EMR 5.x verwendet Scala-Version 2.11. Die Datei spark-redshift_2.11-2.0.1.jar, die die Amazon-EMR-5.x-Version verwendet, ist nicht mit Amazon-EMR-Version 6.x und höher kompatibel. Verwenden Sie also den spark-redshift_2.12-4.2.0.jar-Connector in Amazon-EMR-6.x- und höheren Clustern.

1.    Stellen Sie mithilfe von SSH eine Verbindung zum EMR-Primärknoten her.

2.    Um mit der Spark-Redshift-Bibliothek zu arbeiten, laden Sie die folgenden **.jar-**Dateien in den EMR-Cluster herunter:

wget https://repo1.maven.org/maven2/io/github/spark-redshift-community/spark-redshift_2.12/4.2.0/spark-redshift_2.12-4.2.0.jar
wget https://github.com/ralfstx/minimal-json/releases/download/0.9.4/minimal-json-0.9.4.jar

3.    Kopieren Sie die heruntergeladenen JAR-Dateien in die Standard-Spark-Bibliothek. Der Pfad der Spark-Bibliothek ist /usr/lib/spark/jars/.

sudo cp spark-redshift_2.12-4.2.0.jar /usr/lib/spark/jars/
sudo cp minimal-json-0.9.4.jar /usr/lib/spark/jars/

4.    Führen Sie den Befehl spark-shell mit dem Amazon-Redshift-JDBC-Treiber aus, um eine Verbindung zum Redshift-Cluster herzustellen. Der JDBC-Treiber ist in den EMR-Versionen 4.7.0 und höher enthalten.

spark-shell --jars /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar

5.    Führen Sie nach der Initialisierung der Spark-Shell-Sitzung die folgenden Schritte aus, um eine Verbindung zum Redshift-Cluster herzustellen. Aktualisieren Sie in den folgenden Befehlen den Amazon-Redshift-Endpunkt, den S3-Bucket-Namen und die Tabellendetails entsprechend Ihrem Anwendungsfall.

import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSSessionCredentials
import com.amazonaws.auth.InstanceProfileCredentialsProvider

// Instance Profile for authentication to AWS resources
val provider = new InstanceProfileCredentialsProvider();
val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials];
val token = credentials.getSessionToken;
val awsAccessKey = credentials.getAWSAccessKeyId;
val awsSecretKey = credentials.getAWSSecretKey

// Set JDBC URL of Redshift
val jdbcUrl = "jdbc:redshift://<cluster-name>.<id>.<region>.redshift.amazonaws.com:5439/<database>?user=<user>&password=<password>"

// Create DataFrame by loading Redshift query
val df = spark.read.format("io.github.spark_redshift_community.spark.redshift").option("url", jdbcUrl).option("tempdir", "s3://bucket/tmp/").option("query", "select * from <table>").option("temporary_aws_access_key_id", awsAccessKey).option("temporary_aws_secret_access_key", awsSecretKey).option("temporary_aws_session_token", token).load()
df.show(2)

AWS OFFICIAL
AWS OFFICIALAktualisiert vor einem Jahr