Hi!
I'm having some issue processing a DynamoDB export in Glue. It looks like my schema is breaking DynamicFrame's unnestDDBJson transformation.
This is the minimal test case that will trigger the error:
import com.amazonaws.services.glue.{DynamicFrame, GlueContext}
import com.holdenkarau.spark.testing.SharedSparkContext
import org.scalatest.flatspec.AnyFlatSpec
class DynamoUnnestTest extends AnyFlatSpec with SharedSparkContext {
"DynamoDB unnest" should "work" in {
val glueContext = new GlueContext(sc)
val spark = glueContext.sparkSession
import spark.implicits._
val ddbItem =
"""
|{
| "Item": {
| "PK": {
| "S": "PFL/XPAR/EUR"
| },
| "Obj": {
| "M": {
| "FiMap": {
| "M": {
| "3659288a-0b41-4bf5-ac7f-a379fafa5576": {
| "M": {
| "DT": {
| "S": "2023-05-25"
| }
| }
| }
| }
| }
| }
| }
| }
|}
|""".stripMargin
val df = spark.read.json(Seq(ddbItem).toDS).toDF()
val dyf = DynamicFrame.apply(df, glueContext)
// This fails with exception:
// org.apache.spark.sql.AnalysisException: No such struct field 3659288a in 3659288a-0b41-4bf5-ac7f-a379fafa5576;
dyf.unnestDDBJson()
}
}
Any idea if I'm doing something wrong? Or could approach it differently somehow?
It seems the DynamicFrame is initially created (including all the "M", "S" objects in its schema), but then fails to run the transform. Initially, I got this error while using the Glue Dynamo connector ^2 (configured using the PITR export to S3). I now intend to try with a DynamoDB Scan, but that's really not the optimal way...
You can see the configuration and stacktrace in this gist ^1
For the sake of completeness, I now managed to do the import via the DynamoDB ETL connector that runs a Scan over the table^1. In this case, the conversion is performed implicitly: the structure I get in the DynamicFrame does not use the DynamoDB JSON format at all, and imports without errors... I guess I'll probably use this strategy for now, until it becomes too expensive. It's really annoying that this Glue lib is not OSS to figure out what's wrong though...