How do you read from a postgres table (or other system via JDBC) using Glue and Spark?

  • Testing the Glue connection is successful.
  • Retrieving information about the connection should, I think, work using glue_context.extract_jdbc_conf(). The Spark session is returned from another util function and then passed into this function, which is why I am getting the Glue context from the Spark context instead of the other way around.
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import DataFrame, SparkSession


spark_context = SparkContext.getOrCreate()
glue_context = GlueContext(spark_context)

# Default settings
spark_config_settings = {
    "spark.logConf": "true",
    "spark.sql.sources.partitionOverwriteMode": "dynamic",
    "spark.cleaner.referenceTracking.cleanCheckpoints": "true",
}

spark_conf = SparkConf().setAll(pairs=list(spark_config_settings.items()))

# session_name is assumed to be defined elsewhere (e.g., passed into the util function)
spark_session = (
    glue_context.spark_session.builder.appName(session_name)
    .config(conf=spark_conf)
    .enableHiveSupport()
    .getOrCreate()
)


def get_jdbc_table(spark_session: SparkSession, glue_connector: str, db_table: str, logger) -> DataFrame:
    glue_context = GlueContext(spark_session.sparkContext)

    jdbc_config = glue_context.extract_jdbc_conf(connection_name=glue_connector)
    # And then I think I should be able to pass the connection info to either a
    # Glue or Spark function to read from the table.
    connection_details = {
        "dbTable": <db_table name>,
        "connectionName" : <glue_connector name>,
        "url": jdbc_config["url"],
        "user": jdbc_config["user"],
        "password": jdbc_config["password"],
        "redshiftTmpDir": <s3 tmp path - do I need this for postgres?>,
    }

    # 1. Read from jdbc table via Glue context
    dynamic_frame = glue_context.create_dynamic_frame_from_options(
        connection_type=jdbc_config["vendor"],  # "postgresql"
        connection_options=connection_details,
    )

    # 2. Or read from the JDBC table via spark_session.read.jdbc()
    df = spark_session.read.jdbc(
        url=jdbc_config["url"],
        # table=db_table,
        table=db_table.split(".")[1],
        properties={
            "driver": "org.postgresql.Driver",
            "user": jdbc_config["user"],
            "password": jdbc_config["password"],
        },
    )

    # 3. Or read via spark_session.read.format("jdbc")
    df = (
        spark_session.read
        .format("jdbc")
        .option("url", jdbc_config["url"])
        .option("dbtable", db_table)
        .option("user", jdbc_config["user"])
        .option("password", jdbc_config["password"])
        .option("driver", "org.postgresql.Driver")
        .load()
    )

    return df  # or dynamic_frame.toDF(), depending on which option is used
  • The following is the error I get when trying the different options listed above.
AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to <servers listed here> failed: connect timed out
Jaime · asked a year ago · 1,958 views

3 Answers

Sounds like that's just a connectivity issue.
If the server URL is not public, you will need to run the Glue job inside a VPC (using a Network-type connection and assigning it to the Glue job).
More info: https://docs.aws.amazon.com/glue/latest/dg/connection-JDBC-VPC.html
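
For reference, a minimal boto3 sketch of creating a Network-type connection (the same can be done in the Glue console); the connection name, subnet, security group, and availability zone are placeholders, not values from this thread:

import boto3

glue_client = boto3.client("glue", region_name="us-east-1")

# A NETWORK connection only carries VPC placement info (no JDBC URL/credentials).
glue_client.create_connection(
    ConnectionInput={
        "Name": "my-network-connection",             # hypothetical connection name
        "ConnectionType": "NETWORK",
        "ConnectionProperties": {},                  # not used for NETWORK connections
        "PhysicalConnectionRequirements": {
            "SubnetId": "subnet-0123456789abcdef0",  # subnet with a route to the database
            "SecurityGroupIdList": ["sg-0123456789abcdef0"],
            "AvailabilityZone": "us-east-1a",
        },
    }
)

# The connection then has to be listed under the job's "Connections" (in the
# console, or via create_job/update_job) so the job runs inside that subnet.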

BTW, you can tell it to take the config from the connection directly instead of extracting it yourself. https://docs.amazonaws.cn/en_us/glue/latest/dg/aws-glue-programming-etl-connect.html#aws-glue-programming-etl-connect-jdbc
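
A minimal sketch of that approach, reusing the glue_context from the question; the option keys follow the linked docs, and the connection and table names are placeholders:

dynamic_frame = glue_context.create_dynamic_frame_from_options(
    connection_type="postgresql",
    connection_options={
        "useConnectionProperties": "true",           # take url/user/password from the connection
        "connectionName": "my-postgres-connection",  # hypothetical Glue connection name
        "dbtable": "public.my_table",                # hypothetical schema.table
    },
)
df = dynamic_frame.toDF()  # convert only if a Spark DataFrame is actually needed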

AWS EXPERT · answered a year ago
  • I ended up going with the following, because converting a DynamicFrame to a Spark DataFrame is eager, which caused performance issues in some of the jobs that use this util function I created (a partitioned-read variant is sketched after the snippet).

    from pyspark.sql import functions as F  # needed for F.broadcast below

    connection_properties = glue_context.extract_jdbc_conf(connection_name=connection_name)
    
    spark_reader_options = {
        "url": connection_properties["fullUrl"],
        "user": connection_properties["user"],
        "password": connection_properties["password"],
        "queryTimeout": 60,
        "dbtable": <db.table>,
        # "query": <sql query string>,
    }
    
    spark_reader = spark_session.read.format("jdbc")
    
    for key, value in spark_reader_options.items():
        spark_reader = spark_reader.option(key, value)
    
    df = spark_reader.load()
    df = F.broadcast(df)  # hint Spark to broadcast this (small) table in downstream joins
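
    For larger tables, a sketch of the same reader with Spark's standard JDBC partitioning options, which split the read across executors instead of pulling everything through one connection; the table name, partition column, bounds, and partition count are assumptions for illustration, not values from this thread:

    spark_reader_options = {
        "url": connection_properties["fullUrl"],
        "user": connection_properties["user"],
        "password": connection_properties["password"],
        "dbtable": "public.my_table",    # hypothetical schema.table
        "partitionColumn": "id",         # assumed numeric column to split the read on
        "lowerBound": "1",
        "upperBound": "1000000",
        "numPartitions": "8",
    }

    df = spark_session.read.format("jdbc").options(**spark_reader_options).load()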
  • Allowing the VPC access to the Glue endpoint enabled some Glue jobs to work properly with the connection. However, jobs requiring a public Python package (e.g., apache-sedona) no longer have access to install that package when running within the connector VPC.

    Do you have any specific recommendations as to how to account for this, in addition to accounting for com.amazonaws.us-east-1.glue?


The job runs for 41 minutes and then times out:

ConnectTimeoutError: Connect timeout on endpoint URL: "https://glue.us-east-1.amazonaws.com/"
Jaime · answered a year ago
  • That means your job runs in a VPC that neither has internet connectivity nor has the Glue regional endpoint (com.amazonaws.us-east-1.glue) added to the VPC. (A boto3 sketch of creating that endpoint follows below.)
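
    A minimal sketch of adding the Glue regional endpoint to the VPC with boto3; the VPC, subnet, and security group IDs are placeholders, not values from this thread:

    import boto3

    ec2_client = boto3.client("ec2", region_name="us-east-1")

    # Interface endpoint so the job can reach the Glue API without internet access.
    ec2_client.create_vpc_endpoint(
        VpcEndpointType="Interface",
        VpcId="vpc-0123456789abcdef0",
        ServiceName="com.amazonaws.us-east-1.glue",
        SubnetIds=["subnet-0123456789abcdef0"],
        SecurityGroupIds=["sg-0123456789abcdef0"],
        PrivateDnsEnabled=True,
    )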

  • Thank you for that info. Let me be more specific: I get that error when I specify the connector in the Glue job configuration. If I remove it, I get further along. Without it, I am able to use two other methods to retrieve connection details (how the two response shapes map to reader options is sketched after these snippets):

        import json
        import boto3
        secrets_manager_client = boto3.client(
            service_name="secretsmanager",
            region_name="us-east-1",
        )
        secret = secrets_manager_client.get_secret_value(SecretId=secret_name)
        connection_info = json.loads(secret["SecretString"])
        return connection_info
    
        import boto3
        glue_client = boto3.client(service_name="glue", region_name="us-east-1")
        connection_info = glue_client.get_connection(Name=connection_name)
        return connection_info
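
    A sketch of how the two response shapes above map onto JDBC reader options; the key names follow the snippets in this thread, except the flat "username"/"password" layout of the secret, which is an assumption about how the secret was stored:

        # From Secrets Manager: the parsed SecretString is a flat dict.
        user = connection_info["username"]       # assumed secret key
        password = connection_info["password"]   # assumed secret key

        # From glue_client.get_connection(): credentials sit under ConnectionProperties.
        props = connection_info["Connection"]["ConnectionProperties"]
        url = props["JDBC_CONNECTION_URL"]
        user = props["USERNAME"]
        password = props["PASSWORD"]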
    
  • But I am unable to connect using the following methods, which differ in the format of the input credentials:

        df = spark.read.jdbc(
            url="<url>",
            table=db_table,
            properties={
                "driver": "org.postgresql.Driver",
                # "user": connection_details["username"],
                # "password": connection_details["password"],
                "user": connection_details["Connection"]["username"],
                "password": connection_details["Connection"]["password"]
            },
        )
    
        dynamic_frame = (
            glue_context
            .read.format("jdbc")
            .option("driver", "org.postgresql.Driver")
            .option("url", "<url>")
            # .option("url", connection_details["Connection"]["ConnectionProperties"]["JDBC_CONNECTION_URL"])
            .option("dbtable", db_table)
            .option("user", connection_details["username"])
            .option("password", connection_details["password"])
            # .option("user", connection_details["Connection"]["ConnectionProperties"]["USERNAME"])
            # .option("password", connection_details["Connection"]["ConnectionProperties"]["PASSWORD"])
            .load()
        )
    
        connection_details = {
            "dbTable": db_table,
            "connectionName" : connector_name,
            "useConnectionProperties": True,
            "url": "<url>"
        }
        dynamic_frame = glue_context.create_dynamic_frame_from_options(
            connection_type="postgresql",
            connection_options=connection_details,
        )
    
  • Depending on the API, it might check the catalog or not; the point is that the job should be able to do that. You should have the Glue service endpoint accessible, or you can run into issues at any time.


The issue with the VPC now only occurs when trying to write data out to S3. I can df.collect() and log a sample of the data just fine.

connection_details = {
    "dbTable": db_table,
    "connectionName" : connector_name,
    "useConnectionProperties": True,
    "url": "<url>",
}

dynamic_frame = glue_context.create_dynamic_frame_from_options(
    connection_type="postgresql",
    connection_options=connection_details,
)

df = dynamic_frame.toDF()
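
If the write fails because the VPC has no route to S3, the usual fix is an S3 gateway endpoint on the VPC's route tables. This is an assumption about the cause (the thread does not confirm it), and the IDs below are placeholders:

import boto3

ec2_client = boto3.client("ec2", region_name="us-east-1")

# Gateway endpoint so jobs running inside the VPC can reach S3 without internet access.
ec2_client.create_vpc_endpoint(
    VpcEndpointType="Gateway",
    VpcId="vpc-0123456789abcdef0",
    ServiceName="com.amazonaws.us-east-1.s3",
    RouteTableIds=["rtb-0123456789abcdef0"],
)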
Jaime · answered a year ago
  • Yes, as long as your job doesn't try to access the catalog for any reason, directly or indirectly (for instance, using a SparkSession might list the databases in the catalog). It's better if you allow the job to connect to the catalog through the endpoint; otherwise it can break at any minute.
