I am using EMR 6.6.0, which ships with Hudi 0.10.1. I am trying to bulk insert and do inline clustering with Hudi, but it does not seem to cluster the files up to the configured target file size; it still produces files that are only a few KB each.
I tried the configuration below:
hudi_clusteringopt = {
    'hoodie.table.name': 'myhudidataset_upsert_legacy_new7',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.partitionpath.field': 'creation_date',
    'hoodie.datasource.write.precombine.field': 'last_update_time',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': 'my_hudi_db',
    'hoodie.datasource.hive_sync.table': 'myhudidataset_upsert_legacy_new7',
    'hoodie.datasource.hive_sync.partition_fields': 'creation_date',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.datasource.write.operation': 'bulk_insert',
}
"hoodie.datasource.write.operation": "bulk_insert",
try:
    inputDF.write.format("org.apache.hudi"). \
        options(**hudi_clusteringopt). \
        option("hoodie.parquet.small.file.limit", "0"). \
        option("hoodie.clustering.inline", "true"). \
        option("hoodie.clustering.inline.max.commits", "0"). \
        option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824"). \
        option("hoodie.clustering.plan.strategy.small.file.limit", "629145600"). \
        option("hoodie.clustering.plan.strategy.sort.columns", "pk_col"). \
        mode('append'). \
        save("s3://xxxxxxxxxxxxxx")
except Exception as e:
    print(e)
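For context, this is roughly how I inspect the data files after the write (a minimal sketch using the Hadoop FileSystem API through py4j; the S3 path is the same placeholder as above):

# Recursively list data files under the table base path and print their sizes.
# The path is a placeholder; replace it with the actual table location.
jvm = spark._jvm
base_path = jvm.org.apache.hadoop.fs.Path("s3://xxxxxxxxxxxxxx")
fs = base_path.getFileSystem(spark._jsc.hadoopConfiguration())
files = fs.listFiles(base_path, True)  # True = recurse into partition folders
while files.hasNext():
    f = files.next()
    if f.getPath().getName().endswith(".parquet"):
        print(f.getPath().getName(), f.getLen(), "bytes")

This is how I can see the written files are only a few KB each.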
Here is the dataset if someone wants to reproduce it:
inputDF = spark.createDataFrame(
    [
        ("1001", 1001, "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("1011", 1011, "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("1021", 1021, "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("1031", 1031, "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("1041", 1041, "2015-01-02", "2015-01-01T12:15:00.512679Z"),
        ("1051", 1051, "2015-01-02", "2015-01-01T13:51:42.248818Z"),
    ],
    ["id", "id_val", "creation_date", "last_update_time"]
)
I tried this on EMR as well, but the file sizes are not being updated.
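To check whether clustering is being scheduled and executed at all, one can look for replacecommit instants on the Hudi timeline, since clustering commits are written as .replacecommit files under .hoodie. A minimal sketch, assuming the same placeholder table path:

# Clustering shows up on the Hudi timeline as replacecommit instants
# under <base_path>/.hoodie. The S3 path is a placeholder.
jvm = spark._jvm
timeline_path = jvm.org.apache.hadoop.fs.Path("s3://xxxxxxxxxxxxxx/.hoodie")
fs = timeline_path.getFileSystem(spark._jsc.hadoopConfiguration())
for status in fs.listStatus(timeline_path):
    name = status.getPath().getName()
    if "replacecommit" in name:
        print(name)
# If nothing prints, no clustering plan was ever scheduled or executed.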