
[SUPPORT] MERGE INTO command fails when using only a few columns from the source data with the partial update payload #11138

Open
pravin1406 opened this issue May 2, 2024 · 5 comments
Labels
priority:critical (production down; pipelines stalled; need help ASAP), writer-core (issues relating to core transactions/write actions)

Comments

@pravin1406

Hudi version -> 0.14.1
Spark version -> 3.2.0
Hadoop version -> 3.1.1
Hive version -> 3.1.1

Hi,
I want to use the partial update payload. I have multiple sources that all write into the same Hudi table, and all of them share one precombine field and one record key.

With reconcile schema set to true and the payload class set to the partial update payload, I am able to achieve this through the DataSource writer, since schema reconciliation takes care of conditioning my schema properly.
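
For reference, a minimal sketch of the DataSource write that works for me (the table name, path, record key, and precombine field below are illustrative, based on my test data, and not copied verbatim from the real job; the settings that matter are the payload class and the reconcile-schema flag):

df.write.format("hudi")
  .option("hoodie.table.name", "partial_update_4")                  // illustrative table name
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.recordkey.field", "FullName")    // record key shared by all sources (illustrative)
  .option("hoodie.datasource.write.precombine.field", "EventTime")  // precombine field shared by all sources (illustrative)
  .option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.PartialUpdateAvroPayload")
  .option("hoodie.datasource.write.reconcile.schema", "true")       // reconciles the partial source schema with the table schema
  .mode("append")
  .save("/tmp/pravin/partial_update_4")                             // illustrative path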

But the same does not work when using MERGE INTO with Spark SQL. It gives me the error below.

2024-05-02 02:38:02,771 ERROR io.HoodieAppendHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=pravin partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20240502023620823, fileId=75001b47-689f-41d2-9216-9fbd79502292-0}', newLocation='HoodieRecordLocation {instantTime=20240502023755261, fileId=75001b47-689f-41d2-9216-9fbd79502292-0}'} java.lang.ArrayIndexOutOfBoundsException: 8 at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.isNullAt(SpecificInternalRow.scala:241) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply_0_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source) at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$3(ExpressionPayload.scala:128) at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$3$adapted(ExpressionPayload.scala:118) at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984) at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.processMatchedRecord(ExpressionPayload.scala:118) at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.getInsertValue(ExpressionPayload.scala:247) at org.apache.hudi.common.model.HoodieAvroRecord.shouldIgnore(HoodieAvroRecord.java:173) at org.apache.hudi.io.HoodieAppendHandle.prepareRecord(HoodieAppendHandle.java:254) at org.apache.hudi.io.HoodieAppendHandle.writeToBuffer(HoodieAppendHandle.java:592) at org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:448) at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:83) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:338) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:260) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386) at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408) at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384) at org.apache.spark.rdd.RDD.iterator(RDD.scala:335) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 2024-05-02 02:38:03,375 ERROR hudi.HoodieSparkSqlWriterInternal: UPSERT failed with errors 2024-05-02 02:38:03,375 WARN hudi.HoodieSparkSqlWriterInternal: Closing write client org.apache.hudi.exception.HoodieException: Merge into Hoodie table command failed at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.executeUpsert(MergeIntoHoodieTableCommand.scala:441) at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:282) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613) ... 47 elided
Looking at the code, I found two problems. When I created this table with a CTAS query using all the relevant options, including table_properties, the table properties were not getting set on the Hive table; I had to set hoodie.datasource.hive_sync.table_properties again with all the properties. But even then, reconcile schema did not seem to take effect.

After debugging and a code walkthrough, I found that reconcile schema is in the list of overriding properties. I read the comment there, but I did not really understand the reasoning behind it. This has effectively blocked me from using Spark SQL to partially update my records.
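
To illustrate what I mean by "overriding properties" (this is only a schematic sketch of the behavior I observed, not the actual Hudi source): the SQL command effectively applies its own values on top of whatever the user configured, so a user-supplied reconcile-schema setting is ignored.

// Schematic only: user-supplied options are merged with command-level
// overrides, and the overrides win because they are applied last.
val userOpts = Map(
  "hoodie.datasource.write.reconcile.schema" -> "true"   // what I set on the table
)
val overridingOpts = Map(
  "hoodie.datasource.write.reconcile.schema" -> "false"  // hypothetical command-level value
)
val effectiveOpts = userOpts ++ overridingOpts            // right-hand side wins on key conflicts
// effectiveOpts("hoodie.datasource.write.reconcile.schema") == "false"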

Can you explain why this override exists, and how we can make this work?

CTAS query:

spark.sql(
  s"""CREATE TABLE $tablename USING org.apache.hudi
     |OPTIONS (
     |  primaryKey '$recordkey',
     |  path '/tmp/pravin/$tablename',
     |  hoodie.table.name '$tablename',
     |  hoodie.datasource.write.operation 'upsert',
     |  hoodie.datasource.write.precombine.field '$precombinekey',
     |  hoodie.datasource.write.recordkey.field '$recordkey',
     |  hoodie.datasource.write.payload.class 'org.apache.hudi.common.model.PartialUpdateAvroPayload',
     |  hoodie.datasource.write.table.type 'MERGE_ON_READ',
     |  hoodie.enable.data.skipping 'true',
     |  hoodie.datasource.write.reconcile.schema 'true',
     |  hoodie.datasource.hive_sync.support_timestamp 'true',
     |  hoodie.upsert.shuffle.parallelism '200',
     |  hoodie.index.type 'SIMPLE',
     |  hoodie.simple.index.update.partition.path 'true',
     |  hoodie.datasource.write.hive_style_partitioning 'true',
     |  hoodie.datasource.hive_sync.enable 'true',
     |  hoodie.datasource.hive_sync.mode 'HMS',
     |  hoodie.datasource.hive_sync.sync_comment 'true',
     |  hoodie.datasource.hive_sync.database 'default',
     |  hoodie.datasource.hive_sync.table_properties '$tableProperties',
     |  hoodie.datasource.hive_sync.table '$tablename',
     |  hoodie.schema.on.read.enable 'true'
     |) AS SELECT * FROM merge_source""".stripMargin)

Input data:
val df = Seq((9,"qwertyuiop","US","1","pravin","abcd","SFO")).toDF("EventTime","transactionId","Country","storeNbr","FullName","CompanyName","City")
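
The DataFrame is then registered as the merge_source view referenced by the CTAS and MERGE INTO statements (assuming a temp view is how it is exposed to Spark SQL):

df.createOrReplaceTempView("merge_source")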

Merge command:

spark.sql(
  s"""MERGE INTO partial_update_4_rt AS target
     |USING merge_source AS source
     |ON target.$recordkey = source.$recordkey
     |WHEN MATCHED THEN UPDATE SET
     |  City = source.City,
     |  CompanyName = source.CompanyName,
     |  EventTime = source.EventTime""".stripMargin).show

@codope
@ad1happy2go

@xushiyan (Member) commented May 2, 2024

@jonvex you have better context on this?

@pravin1406 (Author) commented May 4, 2024

@jonvex Any help on this?

@pravin1406 (Author)

@nsivabalan @jonvex Can you help with this?

@ad1happy2go (Contributor)

@pravin1406
We already have a JIRA open for this. This is the issue with MERGE INTO for MOR tables: https://issues.apache.org/jira/browse/HUDI-6737

@jonvex Did we get a chance to fix it?

Code to reproduce:

CREATE TABLE merge_source (
    ts BIGINT,
    uuid STRING,
    rider STRING,
    driver STRING,
    fare DOUBLE,
    city STRING
) USING PARQUET;

INSERT INTO merge_source
VALUES
(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco'),
(1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco'),
(1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
(1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'    ),
(1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40 ,'sao_paulo'    ),
(1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai'      ),
(1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');

CREATE TABLE hudi_table (
    ts BIGINT,
    uuid STRING,
    rider STRING,
    driver STRING,
    fare DOUBLE,
    city STRING
) USING HUDI
PARTITIONED BY (city)
OPTIONS ( 
  primaryKey 'uuid', 
  hoodie.datasource.write.operation 'upsert', 
  hoodie.datasource.write.precombine.field 'ts', 
  hoodie.datasource.write.recordkey.field 'uuid',
  hoodie.table.name 'issue_11138',
  hoodie.datasource.write.table.type 'MERGE_ON_READ'
);

insert into hudi_table 
select * from merge_source;

merge into hudi_table as target 
using merge_source as source 
on target.uuid = source.uuid
when matched then update set target.city =  source.city , 
target.fare = source.fare, target.ts = source.ts;

@codope added the priority:critical and writer-core labels on May 9, 2024
@pravin1406 (Author)

After debugging and a code walkthrough, I found that reconcile schema is in the list of overriding properties. I read the comment there, but I did not really understand the reasoning behind it. This has effectively blocked me from using Spark SQL to partially update my records.

This part of the codebase leads to the failure. I tested removing reconcile schema from the overriding properties, and then it worked for me, but I am not fully aware of the consequences.

@ad1happy2go
