Spark job fails in AWS Glue | "An error occurred while calling o86.getSink. The connection attempt failed."


I attempted to load data from a CSV file in an S3 bucket into a table in my Redshift cluster. I based my script on the code that AWS Glue autogenerated after I built the job from blocks in Visual mode. That visual job ran successfully.

I copied that code into a Spark script for a different job, changing only basic details such as variable and table names. FYI, I granted all the necessary permissions, attached the required policies for the job to run, and set up the VPC and security group rules.

Here is my code:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 bucket
node1 = glueContext.create_dynamic_frame.from_options(
    format_options={"quoteChar": '"', "withHeader": True, "separator": ","},
    connection_type="s3",
    format="csv",
    connection_options={"paths": ["s3://testbucket-mohit/Source/AMH.csv"]},
    transformation_ctx="S3bucket_node1",
)

# Script generated for node Amazon Redshift
node2 = glueContext.write_dynamic_frame.from_options(
    frame=node1,
    connection_type="redshift",
    connection_options={
        "redshiftTmpDir": "s3://aws-glue-assets-765579251507-ap-south-1/temporary/",
        "useConnectionProperties": "true",
        "dbtable": "public.amh2",
        "connectionName": "redshift",
        "preactions": "DROP TABLE IF EXISTS public.amh2; CREATE TABLE IF NOT EXISTS public.amh2 (id VARCHAR, owner_name VARCHAR, property_name VARCHAR, address_line1 VARCHAR, city VARCHAR, state VARCHAR, zipcode VARCHAR, country VARCHAR, square_feet VARCHAR, property_type VARCHAR, year_built VARCHAR, url VARCHAR, cityurl VARCHAR);",
    },
    transformation_ctx="node2",
)

job.commit()

The data loads into the dynamic frame successfully, but writing it to Redshift throws the error below.

23/10/10 09:07:56 INFO LogPusher: stopping
23/10/10 09:07:56 INFO ProcessLauncher: postprocessing
23/10/10 09:07:56 ERROR ProcessLauncher: Error from Python:Traceback (most recent call last):
  File "/tmp/testrun.py", line 26, in <module>
    node2 = glueContext.write_dynamic_frame.from_options(
  File "/opt/amazon/lib/python3.7/site-packages/awsglue/dynamicframe.py", line 640, in from_options
    return self._glue_context.write_dynamic_frame_from_options(frame,
  File "/opt/amazon/lib/python3.7/site-packages/awsglue/context.py", line 337, in write_dynamic_frame_from_options
    return self.write_from_options(frame, connection_type,
  File "/opt/amazon/lib/python3.7/site-packages/awsglue/context.py", line 355, in write_from_options
    sink = self.getSink(connection_type, format, transformation_ctx, **new_options)
  File "/opt/amazon/lib/python3.7/site-packages/awsglue/context.py", line 317, in getSink
    j_sink = self._ssql_ctx.getSink(connection_type,
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o86.getSink.
: java.sql.SQLException: The connection attempt failed.
    at com.amazon.redshift.util.RedshiftException.getSQLException(RedshiftException.java:56)
    at com.amazon.redshift.Driver.connect(Driver.java:319)
    at com.amazonaws.services.glue.util.JDBCWrapper$.$anonfun$connectionProperties$5(JDBCUtils.scala:1061)
    at com.amazonaws.services.glue.util.JDBCWrapper$.$anonfun$connectWithSSLAttempt$2(JDBCUtils.scala:1012)
    at scala.Option.getOrElse(Option.scala:189)
    at com.amazonaws.services.glue.util.JDBCWrapper$.$anonfun$connectWithSSLAttempt$1(JDBCUtils.scala:1012)
    at scala.Option.getOrElse(Option.scala:189)
    at com.amazonaws.services.glue.util.JDBCWrapper$.connectWithSSLAttempt(JDBCUtils.scala:1012)
    at com.amazonaws.services.glue.util.JDBCWrapper$.connectionProperties(JDBCUtils.scala:1057)
    at com.amazonaws.services.glue.util.JDBCWrapper.connectionProperties$lzycompute(JDBCUtils.scala:820)
    at com.amazonaws.services.glue.util.JDBCWrapper.connectionProperties(JDBCUtils.scala:820)
    at com.amazonaws.services.glue.util.JDBCWrapper.getRawConnection(JDBCUtils.scala:833)
    at com.amazonaws.services.glue.RedshiftDataSink.<init>(RedshiftDataSink.scala:39)
    at com.amazonaws.services.glue.GlueContext.getSink(GlueContext.scala:1121)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.net.SocketTimeoutException: connect timed out
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at com.amazon.redshift.core.RedshiftStream.<init>(RedshiftStream.java:86)
    at com.amazon.redshift.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:111)
    at com.amazon.redshift.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:224)
    at com.amazon.redshift.core.ConnectionFactory.openConnection(ConnectionFactory.java:51)
    at com.amazon.redshift.jdbc.RedshiftConnectionImpl.<init>(RedshiftConnectionImpl.java:328)
    at com.amazon.redshift.Driver.makeConnection(Driver.java:474)
    at com.amazon.redshift.Driver.connect(Driver.java:295)
    ... 24 more

I also tried running the cell below in an AWS Glue Jupyter notebook, and it fails with the same error.

node2 = glueContext.write_dynamic_frame.from_options(
    frame=node1,
    connection_type="redshift",
    connection_options={
        "redshiftTmpDir": "s3://aws-glue-assets-765579251507-ap-south-1/temporary/",
        "useConnectionProperties": "true",
        "dbtable": "public.amh2",
        "connectionName": "redshift",
        "preactions": "DROP TABLE IF EXISTS public.amh2; CREATE TABLE IF NOT EXISTS public.amh2 (id VARCHAR, owner_name VARCHAR, property_name VARCHAR, address_line1 VARCHAR, city VARCHAR, state VARCHAR, zipcode VARCHAR, country VARCHAR, square_feet VARCHAR, property_type VARCHAR, year_built VARCHAR, url VARCHAR, cityurl VARCHAR);",
    },
    transformation_ctx="node2",
)

Am I missing something here? Thank you in advance.

asked 9 months ago, 2043 views
1 Answer

As you may be aware, there are two ways to connect to Redshift from Glue:

my_conn_options = {  
    "url": "jdbc:redshift://redshift-cluster-1.xxxxxxxxxxxx.eu-west-1.redshift.amazonaws.com:5439/dev",
    "dbtable": "test2",
    "user": "admin",
    "password": "xxxxxxxx!",
    "redshiftTmpDir": "s3://xxxxxxxxxxxxxxx/"}

df = glueContext.create_dynamic_frame_from_options("redshift", my_conn_options)

-or-

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "testnb3", table_name = "redshift_dev_public_test2", redshift_tmp_dir ="s3://xxxxxxxxxxx/", transformation_ctx = "datasource0")

A connection failure with either of the methods above most likely points to a network or IAM issue.

An error occurred while calling o86.getSink. : java.sql.SQLException: The connection attempt failed. Caused by: java.net.SocketTimeoutException: connect timed out
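The "connect timed out" cause in that trace means the TCP connection to the cluster endpoint never opened, which is what a blocked network path usually looks like. A minimal sketch of a reachability probe follows, using only the Python standard library. The function name can_connect is hypothetical, and the probe is only meaningful when run from inside the Glue job or notebook session, so that it takes the same network path as the failing write.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port opens within `timeout` seconds."""
    try:
        # create_connection resolves the host name and attempts a TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # OSError covers timeouts, refused connections, and DNS lookup failures.
        return False
```

For example, can_connect("<your-cluster-endpoint>", 5439) should return True once the network path is open (5439 is the port used in the JDBC URL above). A False result only says the handshake failed; it does not say whether a security group, a route table, or a missing VPC attachment is responsible.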

To use Amazon Redshift clusters in AWS Glue, you will need some prerequisites:

  1. An Amazon S3 directory to use for temporary storage when reading from and writing to the database.
  2. An Amazon VPC enabling communication between your Amazon Redshift cluster, your AWS Glue job and your Amazon S3 directory.
  3. Appropriate IAM permissions on the AWS Glue job and Amazon Redshift cluster.

Please refer to this document for more details on setting them up.

In addition, if your cluster is in a private subnet, check the security groups and routing for the Redshift cluster. The links below explain how to add the security group rules, VPC settings, and so on.

https://repost.aws/knowledge-center/private-redshift-cluster-local-machine
https://docs.aws.amazon.com/glue/latest/dg/setup-vpc-for-glue-access.html
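One of the security group checks in the Glue VPC setup guide is a self-referencing inbound rule that allows all TCP ports from the group itself. The sketch below tests for such a rule in the IpPermissions list of a security group, assuming the shape returned by the EC2 DescribeSecurityGroups API. The function name and the sample group ID are hypothetical, and fetching the rules (for example with boto3) is left out.

```python
from typing import Any, Dict, List


def has_self_referencing_tcp_rule(
    group_id: str, ip_permissions: List[Dict[str, Any]]
) -> bool:
    """Return True if an inbound rule allows all TCP ports from the group itself."""
    for rule in ip_permissions:
        protocol = rule.get("IpProtocol")
        if protocol == "-1":
            # "-1" means all protocols and all ports.
            covers_all_tcp = True
        elif protocol == "tcp":
            covers_all_tcp = rule.get("FromPort") == 0 and rule.get("ToPort") == 65535
        else:
            covers_all_tcp = False
        if not covers_all_tcp:
            continue
        # The rule is self-referencing when its source is the same group.
        pairs = rule.get("UserIdGroupPairs", [])
        if any(pair.get("GroupId") == group_id for pair in pairs):
            return True
    return False


# Hypothetical sample: one self-referencing all-TCP rule on group sg-0example.
sample_rules = [
    {
        "IpProtocol": "tcp",
        "FromPort": 0,
        "ToPort": 65535,
        "UserIdGroupPairs": [{"GroupId": "sg-0example"}],
    }
]
print(has_self_referencing_tcp_rule("sg-0example", sample_rules))
```

A rule that only opens port 5439, or whose source is a different group, makes the function return False. That may still be enough for the Redshift side, but it is not the self-referencing rule the Glue guide asks for on the group attached to the Glue connection.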

AWS
SUPPORT ENGINEER
answered 9 months ago
  • Thanks. I was able to fix my silly issue. In my case, the IAM permissions were missing. This doc helped: https://docs.aws.amazon.com/redshift/latest/mgmt/redshift-iam-authentication-access-control.html#redshift-iam-accesscontrol

  • No, the problem is not with creating the dynamic frame but with writing it to the Redshift table. The S3 directory for temporary storage has already been provided. IAM permissions and communication between all three were already established, since the ETL job runs successfully when built visually rather than via script.

    This error comes up when I create the job from a Spark script by copying the template that was autogenerated for the successful ETL job built visually with blocks. I even tried the same in a Jupyter notebook, but the error persists.

  • Hmm, interesting. Did you check the network configuration of the Redshift cluster, that is, whether the security group and routing in the private subnet allow connections to Redshift? I think you have an issue only when writing to Redshift.
