Glue jobs fail with "cannot resolve ... given input columns" when run with Job Bookmark enabled

Hi, I found a consistent issue with Glue jobs that use the "Transform - SQL Query" activity when the job is run with "Job Bookmark" enabled. In this case, when the job runs I get the following exception:

pyspark.sql.utils.AnalysisException: cannot resolve '`ds.name`' given input columns

ds.name is just the name of a field. Example is here

The following is the error log entry (with the full script code available here https://jsfiddle.net/tvrznbuq/)

2022-01-31 09:49:56,225 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(73)): Error from Python:Traceback (most recent call last):
  File "/tmp/Transform Device Data to DB Table.py", line 88, in <module>
    transformation_ctx="FormatReadingOutput_node1643513054931",
  File "/tmp/Transform Device Data to DB Table.py", line 13, in sparkSqlQuery
    result = spark.sql(query)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 723, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: cannot resolve 'ds.name' given input columns: []; line 4 pos 6;
'Distinct
+- 'Project ['ds.device_id, 'ds.meter_id, 'ds.value, 'ds.time, 'ds.name, 'ds.channel]
   +- 'Filter ((('ds.name = 'ds.channel) AND NOT ('ds.value = 0.0)) AND isnotnull('ds.value))
      +- SubqueryAlias ds
         +- SubqueryAlias mydatasource
            +- LogicalRDD false

It looks to me as though, when the job is re-run, there are no entries for the SQL query to return, and instead of returning nothing, the entire activity fails.

I've tested it with Job Bookmark set to disabled and no errors occur. I've also tested uploading new files, dated later than the previous run, to my source location, and the job still fails.

This seems like a serious bug and I could not find any workaround. Any advice on how to fix it?

Michael
asked 3 years ago, 3951 views
1 Answer
Hi,

What is happening is that when you read with the bookmark enabled and there is no new data, Glue generates an empty DynamicFrame, and Spark SQL cannot resolve the query's columns against an empty schema.

The solution is to add a check for whether any DynamicFrame generated from the source data is empty, and then use an if branch to decide whether to exit cleanly with a message that no processing was needed, or to move forward.

You can insert a Custom Transform with the following code:


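def validate_source_table(glueContext, dfc) -> DynamicFrameCollection:
    import sys  # imported here so the transform does not depend on script-level imports

    # Collect the names of all frames in the collection that contain no rows.
    empty_frames = [name for name in dfc.keys()
                    if dfc.select(name).toDF().rdd.isEmpty()]
    for name in empty_frames:
        print('no data in table:', name)
    if empty_frames:
        print('committing and exiting job without processing.')
        # `job` is the Job object created at module level in the Glue script.
        job.commit()
        # Exiting with a string message ends the run with a non-zero status.
        sys.exit('No new Data. Exiting')
    return dfc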

The job will still fail, but it will give a clearer exit message stating that there is no new data and that it is exiting.

After the custom transform you need to add a SelectFromCollection transform and name it appropriately based on the schema.

The SQL node will have the SelectFromCollection transform as its parent.
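
The wiring described above could be sketched roughly as follows. This is an illustration under assumptions, not the script Glue Studio generates: `select_sql_input` and the default `transformation_ctx` value are hypothetical names, and the transform class is passed in as a parameter so the flow can be read and tried outside a Glue runtime. In a real job you would pass `awsglue.transforms.SelectFromCollection` and the `validate_source_table` custom transform.

```python
def select_sql_input(glue_context, dfc, validate, select_from_collection,
                     transformation_ctx="SelectFromCollection_node"):
    """Run the emptiness check, then pick one DynamicFrame for the SQL node.

    glue_context: the job's GlueContext, passed through to the custom transform.
    dfc: the DynamicFrameCollection produced by the upstream node(s).
    validate: the custom transform, called as validate(glue_context, dfc).
    select_from_collection: normally awsglue.transforms.SelectFromCollection.
    transformation_ctx: hypothetical name; use whatever your job uses.
    """
    # The custom transform returns the collection unchanged, or exits the job
    # if any frame in it is empty.
    checked = validate(glue_context, dfc)

    # Take the first frame in the collection; choose a different key if the
    # collection holds several sources.
    key = list(checked.keys())[0]

    # The selected DynamicFrame is what the SQL node should use as its parent.
    return select_from_collection.apply(
        dfc=checked, key=key, transformation_ctx=transformation_ctx
    )
```

The point of the ordering is that the SQL query only ever sees a frame that has passed the emptiness check, so it always has a schema to resolve its columns against.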

Hope this helps.

AWS
EXPERT
answered 3 years ago
  • Hi, are there any complete examples of how to address this problem? I'm getting the following error "'DynamicFrameCollection' object has no attribute 'toDF'" using your snippet

    Additionally, what's the best way to debug such custom code?
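
    On the debugging question, one possible approach (a sketch, not an official Glue workflow) is to keep the emptiness check in a small helper that relies only on `keys()`, `select()`, `toDF()` and `rdd.isEmpty()`, so it can be exercised locally with stand-in objects before it goes into the job. The names `empty_frame_names`, `FakeFrame` and `FakeCollection` are hypothetical and exist only for this illustration. Note that `toDF()` is called on the frame returned by `select(name)`; the "no attribute 'toDF'" error usually means it was called on the collection itself.

```python
from types import SimpleNamespace


def empty_frame_names(dfc):
    """Return the names of frames in the collection that have no rows."""
    return [name for name in dfc.keys()
            if dfc.select(name).toDF().rdd.isEmpty()]


class FakeFrame:
    """Stand-in for a DynamicFrame: supports only toDF().rdd.isEmpty()."""

    def __init__(self, rows):
        self._rows = rows

    def toDF(self):
        rows = self._rows
        # Mimic the DataFrame -> rdd -> isEmpty() chain used by the helper.
        return SimpleNamespace(rdd=SimpleNamespace(isEmpty=lambda: not rows))


class FakeCollection:
    """Stand-in for a DynamicFrameCollection: supports only keys() and select()."""

    def __init__(self, frames):
        self._frames = frames

    def keys(self):
        return self._frames.keys()

    def select(self, name):
        return self._frames[name]


if __name__ == "__main__":
    dfc = FakeCollection({
        "readings": FakeFrame([]),            # simulates a bookmark run with no new data
        "devices": FakeFrame([{"id": 1}]),
    })
    print(empty_frame_names(dfc))  # prints ['readings']
```

    Once the helper behaves as expected locally, the custom transform can call it and decide whether to commit and exit.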
