Unable to run Glue Studio jobs: getting AWS SDK java.lang.NoSuchMethodError

2023-03-10 03:52:55,572 ERROR [stream execution thread for [id = bea7911a-d787-44e1-836b-8488a43e16ea, runId = ba5fb563-91a6-4d39-befb-f740a9be9a08]] streaming.MicroBatchExecution (Logging.scala:logError(94)): Query [id = bea7911a-d787-44e1-836b-8488a43e16ea, runId = ba5fb563-91a6-4d39-befb-f740a9be9a08] terminated with error
java.lang.NoSuchMethodError: software.amazon.awssdk.utils.SystemSetting.getStringValueFromEnvironmentVariable(Ljava/lang/String;)Ljava/util/Optional;
	at software.amazon.awssdk.awscore.interceptor.TraceIdExecutionInterceptor.lambdaFunctionNameEnvironmentVariable(TraceIdExecutionInterceptor.java:63)
	at software.amazon.awssdk.awscore.interceptor.TraceIdExecutionInterceptor.modifyHttpRequest(TraceIdExecutionInterceptor.java:40)
	at software.amazon.awssdk.core.interceptor.ExecutionInterceptorChain.modifyHttpRequestAndHttpContent(ExecutionInterceptorChain.java:90)
	at software.amazon.awssdk.core.internal.handler.BaseClientHandler.runModifyHttpRequestAndHttpContentInterceptors(BaseClientHandler.java:151)
	at software.amazon.awssdk.core.internal.handler.BaseClientHandler.finalizeSdkHttpFullRequest(BaseClientHandler.java:78)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:151)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:175)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
	at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
	at software.amazon.awssdk.services.glue.DefaultGlueClient.getTable(DefaultGlueClient.java:7227)
	at org.apache.iceberg.aws.glue.GlueTableOperations.getGlueTable(GlueTableOperations.java:185)
	at org.apache.iceberg.aws.glue.GlueTableOperations.doRefresh(GlueTableOperations.java:103)
	at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:96)
	at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:79)
	at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:44)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
	at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
	at org.apache.iceberg.CachingCatalog.loadTable(CachingCatalog.java:161)
	at org.apache.iceberg.spark.SparkCatalog.load(SparkCatalog.java:526)
	at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:141)
	at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:98)
	at org.apache.spark.sql.connector.catalog.TableCatalog.tableExists(TableCatalog.java:119)
	at org.apache.spark.sql.execution.datasources.v2.AtomicCreateTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:108)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:46)
	at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3724)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3722)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:615)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:610)
	at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650)
	at SparkSqlQuery$.execute(sync_cj_data_shopify_product_updates_stream.py:24)
	at GlueApp$.$anonfun$main$1(sync_cj_data_shopify_product_updates_stream.py:48)
	at GlueApp$.$anonfun$main$1$adapted(sync_cj_data_shopify_product_updates_stream.py:40)
	at com.amazonaws.services.glue.GlueContext.$anonfun$forEachBatch$1(GlueContext.scala:1366)
	at com.amazonaws.services.glue.GlueContext.$anonfun$forEachBatch$1$adapted(GlueContext.scala:1347)
	at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:584)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
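
This NoSuchMethodError usually points to two different AWS SDK v2 versions on the classpath: the awscore JAR that ships TraceIdExecutionInterceptor expects a SystemSetting.getStringValueFromEnvironmentVariable method that the utils JAR actually loaded does not provide. A minimal diagnostic sketch (not part of the job; both class names are taken from the stack trace above) to confirm which JAR each class resolves from:

for (name <- Seq(
       "software.amazon.awssdk.utils.SystemSetting",
       "software.amazon.awssdk.awscore.interceptor.TraceIdExecutionInterceptor")) {
  // Print the JAR each class was loaded from; two different locations
  // (or the same artifact at mismatched versions) would confirm the conflict.
  val source = Option(Class.forName(name).getProtectionDomain.getCodeSource)
  println(s"$name loaded from ${source.map(_.getLocation).getOrElse("<bootstrap>")}")
}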

Glue version 3.0

We use the Apache Iceberg Connector for Glue 3.0 to read data from Kinesis and write it to a table in the Glue Data Catalog.
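
The post does not show the catalog configuration, but the stack trace goes through org.apache.iceberg.aws.glue.GlueTableOperations, so an Iceberg Spark catalog backed by the Glue Catalog is presumably configured somewhere (in Glue Studio this is typically supplied through job parameters). A sketch of a typical setup, assuming the catalog name my_catalog used in the SQL below; the warehouse path is a placeholder, not from the original post:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

// Typical Iceberg-on-Glue catalog wiring (sketch, not the job's actual config).
val conf = new SparkConf()
  .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .set("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
  .set("spark.sql.catalog.my_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
  .set("spark.sql.catalog.my_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
  .set("spark.sql.catalog.my_catalog.warehouse", "s3://my-bucket/iceberg-warehouse/") // placeholder
val sparkContext = new SparkContext(conf)

Job-generated script: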

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
import com.amazonaws.services.glue.DynamicFrame
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.streaming.Trigger
import java.util.Calendar
import org.apache.spark.sql.SQLContext

object SparkSqlQuery {
  // Registers each DynamicFrame under its alias as a temp view, runs the SQL
  // query against those views, and wraps the result back into a DynamicFrame.
  def execute(glueContext: GlueContext, sqlContext: SQLContext, query: String,
              mapping: Map[String, DynamicFrame]): DynamicFrame = {
    for ((alias, frame) <- mapping) {
      frame.toDF().createOrReplaceTempView(alias)
    }
    val resultDataFrame = sqlContext.sql(query)
    DynamicFrame(resultDataFrame, glueContext)
  }
}

object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val sql: SQLContext = new SQLContext(spark)
    // @params: [JOB_NAME]
    // TempDir must be resolved here because it is used for the checkpoint location below.
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "TempDir").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    // Script generated for node Kinesis Stream
    val dataframe_KinesisStream_node1 = glueContext.getSource(
      connectionType = "kinesis",
      connectionOptions = JsonOptions(
        """{"typeOfData": "kinesis", "streamARN": "arn:aws:kinesis:us-west-2:133981716884:stream/shopify_product_updates_stream", "classification": "json", "startingPosition": "earliest", "inferSchema": "true"}"""),
      transformationContext = "dataframe_KinesisStream_node1"
    ).getDataFrame()

    glueContext.forEachBatch(dataframe_KinesisStream_node1, (dataFrame: Dataset[Row], batchId: Long) => {
      if (dataFrame.count() > 0) {
        val KinesisStream_node1 = DynamicFrame(dataFrame, glueContext)
        // Script generated for node INSERT INTO
        val SqlQuery0: String = """INSERT INTO my_catalog.cj_data.shopify_product_updates_stream SELECT * FROM myDataSource;
        |""".stripMargin

        val INSERTINTO_node1678417695303 = SparkSqlQuery.execute(
          glueContext = glueContext, sqlContext = sql, query = SqlQuery0,
          mapping = Map("myDataSource" -> KinesisStream_node1))

        // Script generated for node CREATE TABLE
        val SqlQuery1: String = """CREATE TABLE IF NOT EXISTS my_catalog.cj_data.shopify_product_updates_stream AS
        |(SELECT * FROM myDataSource LIMIT 0);
        |""".stripMargin

        val CREATETABLE_node1678417075933 = SparkSqlQuery.execute(
          glueContext = glueContext, sqlContext = sql, query = SqlQuery1,
          mapping = Map("myDataSource" -> KinesisStream_node1))

        // Script generated for node NO ROW SQL
        val SqlQuery2: String = """select * from my_catalog.cj_data.shopify_product_updates_stream
        |limit 0;
        |""".stripMargin

        val NOROWSQL_node1678417543220 = SparkSqlQuery.execute(
          glueContext = glueContext, sqlContext = sql, query = SqlQuery2,
          mapping = Map("myDataSource" -> CREATETABLE_node1678417075933,
            "myDataSource2" -> INSERTINTO_node1678417695303))

        // Script generated for node Apache Iceberg Connector for Glue 3.0
        val ApacheIcebergConnectorforGlue30_node3 = glueContext.getSink(
          connectionType = "marketplace.spark",
          connectionOptions = JsonOptions(
            """{"path": "my_catalog.cj_data.shopify_product_updates_stream", "connectionName": "my-iceberg-conn"}"""),
          transformationContext = "ApacheIcebergConnectorforGlue30_node3"
        ).writeDynamicFrame(NOROWSQL_node1678417543220)

      }
    }, JsonOptions(s"""{"windowSize" : "100 seconds", "checkpointLocation" : "${args("TempDir")}/${args("JOB_NAME")}/checkpoint/"}"""))
    Job.commit()
  }
}
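
Our working theory is a version conflict between the AWS SDK v2 classes bundled with the Iceberg connector and those supplied by the Glue 3.0 runtime. One thing we have not yet verified is forcing a single, self-consistent SDK onto the classpath, for example the shaded software.amazon.awssdk:bundle artifact loaded ahead of the runtime's copies via Glue job parameters (the jar path and SDK version below are placeholders, not tested against this connector):

--extra-jars s3://my-bucket/jars/bundle-2.17.257.jar
--user-jars-first true

Has anyone hit this NoSuchMethodError with the Iceberg connector on Glue 3.0, and how did you resolve the SDK conflict?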