Glue jobs fail with "cannot resolve ... given input columns" when run with Job Bookmark enabled

Hi, I found a consistent issue with Glue jobs that use the "Transform - SQL Query" activity when the job is run with "Job Bookmark" enabled. In that case the job fails with the following exception:

pyspark.sql.utils.AnalysisException: cannot resolve '`ds.name`' given input columns

ds.name is just the name of one of the fields.

The following is the error log entry (the full script code is available at https://jsfiddle.net/tvrznbuq/):

2022-01-31 09:49:56,225 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(73)): Error from Python:Traceback (most recent call last):
  File "/tmp/Transform Device Data to DB Table.py", line 88, in <module>
    transformation_ctx="FormatReadingOutput_node1643513054931",
  File "/tmp/Transform Device Data to DB Table.py", line 13, in sparkSqlQuery
    result = spark.sql(query)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 723, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: cannot resolve 'ds.name' given input columns: []; line 4 pos 6;
'Distinct
+- 'Project ['ds.device_id, 'ds.meter_id, 'ds.value, 'ds.time, 'ds.name, 'ds.channel]
   +- 'Filter ((('ds.name = 'ds.channel) AND NOT ('ds.value = 0.0)) AND isnotnull('ds.value))
      +- SubqueryAlias ds
         +- SubqueryAlias mydatasource
            +- LogicalRDD false

It looks to me as if, when the job is re-run, the SQL query has no entries to return, and instead of returning nothing the entire activity fails.

I've tested it with Job Bookmark disabled and no errors occur. I've also tested uploading new files to my source location, dated later than the previous run, and the job still fails.

This seems like a serious bug, and I could not find any workaround. Any advice on how to fix it?

Michael
Asked 3 years ago · Viewed 3947 times
1 Answer

Hi,

What is happening is that when you read with the bookmark enabled and there is no new data, Glue generates an empty DynamicFrame, and Spark SQL cannot resolve the columns named in the query against an empty schema.

The solution is to check whether any DynamicFrame generated from the source data is empty, and then branch: either exit cleanly with a message that no processing was needed, or move forward.

You can insert a Custom Transform with the following code:


def validate_source_table(glueContext, dfc) -> DynamicFrameCollection:
    # Collect the keys of every input frame that contains no rows.
    empty_tables = [key for key in dfc.keys()
                    if dfc.select(key).toDF().rdd.isEmpty()]
    for key in empty_tables:
        print('no data in table:', key)
    if empty_tables:
        print('committing and exiting job without processing.')
        # `job` is the Job object created at the top of the generated script.
        job.commit()
        # `sys` is imported at the top of the generated script.
        sys.exit('No new Data. Exiting')
    return dfc

The job will still fail, but with a clearer exit message stating that there is no new data and that it is exiting.

After the custom transform you need a SelectFromCollection node, named appropriately for the schema.

The SQL node will then have the SelectFromCollection transform as its parent.
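
For debugging the check outside of Glue, one option is to separate the "which frames are empty" logic from the commit-and-exit side effects. The following is only a sketch under assumptions: `find_empty_frames`, `exit_if_no_new_data`, `FakeFrame` and `FakeCollection` are hypothetical names, not part of awsglue, and the stand-in classes exist only so the logic can be run locally. It relies only on `keys()` and `select(key)` on the collection and `count()` on each frame, which the Glue DynamicFrameCollection and DynamicFrame classes provide.

```python
import sys


def find_empty_frames(dfc):
    """Return the keys of all frames in the collection that have no rows."""
    return [key for key in dfc.keys() if dfc.select(key).count() == 0]


def exit_if_no_new_data(dfc, commit, exit_fn=sys.exit):
    """Commit and stop when any input frame is empty; otherwise return dfc.

    `commit` is a callable such as job.commit in a Glue script (assumption);
    `exit_fn` is injectable so the branch can be exercised in a local test.
    """
    empty = find_empty_frames(dfc)
    if empty:
        print("no data in:", ", ".join(empty))
        commit()
        exit_fn("No new data. Exiting")
    return dfc


# Hypothetical stand-ins for local testing only (not part of awsglue).
class FakeFrame:
    def __init__(self, rows):
        self._rows = rows

    def count(self):
        return len(self._rows)


class FakeCollection:
    def __init__(self, frames):
        self._frames = frames

    def keys(self):
        return self._frames.keys()

    def select(self, key):
        return self._frames[key]
```

Inside the custom transform this would presumably be called as `exit_if_no_new_data(dfc, job.commit)`; with the default `sys.exit` the job still ends with the "No new data" message, as described above.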

Hope this helps.

AWS Expert
Answered 3 years ago
  • Hi, are there any complete examples of how to address this problem? Using your snippet, I'm getting the following error: "'DynamicFrameCollection' object has no attribute 'toDF'".

    Additionally, what's the best way to debug such custom code?
