[Support] Request: Ability to Invoke Scala Code for Operations in oneTable #353

Open
soumilshah1995 opened this issue Feb 29, 2024 · 21 comments


@soumilshah1995

Hello,

I hope this message finds you well.

I'm currently in the midst of experimenting with oneTable and conducting some proof of concept work. As I delve deeper into this, I've encountered a few questions, and I was hoping to seek some guidance. Please forgive me if some of these questions seem basic; I'm earnestly striving to expand my knowledge about oneTable.

One of my questions is about invoking Scala code instead of the following command:


java \
-jar  ./jar/utilities-0.1.0-beta1-bundled.jar \
--dataset ./my_config.yaml

The reason behind this inquiry is that, during regular write operations, I typically utilize Scala code to execute tasks on AWS Glue for table synchronization.

I would greatly appreciate any insights or suggestions you might have regarding this matter.

Thank you for your time and assistance.

@the-other-tim-brown
Contributor

Do you mean you want to call the OneTable classes directly from Scala? I've done something similar locally in the docker demo notebook, but I will need to look into how to get the same jars available on Glue.
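For context, the direct invocation in the demo notebook looks roughly like the sketch below. Treat it as an outline only: the package, class, and builder method names (OneTableClient, PerTableConfig, HudiSourceClientProvider, TableFormat) are from my recollection of the 0.1.0-beta1 bundle and may need adjusting against the jar you actually have, and the S3 path is a placeholder.

import java.util.Collections
import org.apache.hadoop.conf.Configuration

import io.onetable.client.{OneTableClient, PerTableConfig}
import io.onetable.hudi.HudiSourceClientProvider
import io.onetable.model.storage.TableFormat

object OneTableSyncExample {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()

    // Describe the source Hudi table and the target format(s) to generate metadata for.
    val tableConfig = PerTableConfig.builder()
      .tableBasePath("s3://your-bucket/testcases/")                     // placeholder path
      .tableName("invoice")
      .targetTableFormats(Collections.singletonList(TableFormat.DELTA))
      .build()

    // Source provider that reads the Hudi timeline/metadata.
    val sourceProvider = new HudiSourceClientProvider()
    sourceProvider.init(hadoopConf, Collections.emptyMap())

    // Run the metadata translation; this is the same client the CLI utility drives.
    new OneTableClient(hadoopConf).sync(tableConfig, sourceProvider)
  }
}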

@soumilshah1995
Author

I am interested in running oneTable on Glue for my synchronization process. Specifically, I am looking for a method to translate metadata efficiently. I was hoping there might be a solution to invoke Scala code and utilize jar files within Glue to accomplish this task.

By enabling such functionality, customers would find it significantly easier to schedule their jobs, eliminating the need for manual execution via shell commands.

Thank you for considering this request. Please let me know if there are any possibilities or recommendations regarding this matter.

@the-other-tim-brown
Contributor

the-other-tim-brown commented Mar 4, 2024

I still don't really understand the request.

There is functionality to add jars to your Glue jobs with --extra-jars: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html

We also have a dockerized demo you can run which has a Scala notebook: https://github.com/apache/incubator-xtable/tree/main/demo

@soumilshah1995
Author

Hello @the-other-tim-brown,

Thank you for your response. I understand that we can add jar files, but I was wondering if you have any examples of Glue code with OneTable that you could share?

Additionally, I'm curious if there's a method to invoke this in PySpark. We have numerous jobs running on Glue, primarily in Python. Do you happen to have any examples with PySpark? If not, perhaps it would be beneficial to consider adding some examples to the website, specifically onetable.dev.

Thank you for your assistance.

@the-other-tim-brown
Contributor

I have not worked with Glue before. Can you help me understand where it differs from regular Scala code?

There is no support for Python at this time, but that is tracked in this issue: #253

@soumilshah1995
Author

Hello @the-other-tim-brown

I'm curious about the Python wrapper ticket (#253). Do you think it would enable us to utilize OneTable in PySpark?

I haven't delved deeply into Scala and Glue, but I believe we can collaborate effectively here by providing some examples. This could help spread awareness among companies about leveraging OneTable on AWS. Many customers are eager to utilize OneTable on AWS Glue. Perhaps during our free time, we could collaborate and get some examples up and running.

@soumilshah1995
Author

soumilshah1995 commented Mar 7, 2024

Hey @the-other-tim-brown, I started some work with Glue and OneTable using Delta Streamer.

Here is what I am doing, and I'm 99% sure it's a jar issue.

Step 1: Upload jars to S3

s3://XX/jar/hudi-spark3.3-bundle_2.12-0.14.0.jar
s3://XX/jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar
s3://XX/jar/jcommander-1.78.jar
s3://XX/jar/hudi-extensions-0.1.0-SNAPSHOT-bundled.jar
s3://XX/jar/hudi-java-client-0.14.0.jar

Step 2: Upload the sample dataset inside the test folder

Link: https://drive.google.com/drive/folders/1BwNEK649hErbsWcYLZhqCWnaXFX3mIsg?usp=share_link

Step 3: Create a Glue job with Delta Streamer and OneTable


import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import org.apache.hudi.utilities.streamer.HoodieStreamer
import org.apache.hudi.utilities.streamer.SchedulerConfGenerator
import org.apache.hudi.utilities.UtilHelpers
import scala.collection.JavaConverters._

object GlueApp {

  def main(sysArgs: Array[String]): Unit = {
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)

    val BUCKET = "XXX"

    // HoodieStreamer (Delta Streamer) arguments: ingest parquet files from the test
    // folder into a COPY_ON_WRITE Hudi table, then run the OneTable sync tool after
    // each commit so the table is also exposed as Delta.
    val config = Array(
      "--source-class", "org.apache.hudi.utilities.sources.ParquetDFSSource",
      "--source-ordering-field", "replicadmstimestamp",
      "--target-base-path", s"s3://$BUCKET/testcases/",
      "--target-table", "invoice",
      "--table-type", "COPY_ON_WRITE",
      "--sync-tool-classes", "io.onetable.hudi.sync.OneTableSyncTool",
      "--enable-sync",
      "--hoodie-conf", "hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator",
      "--hoodie-conf", "hoodie.datasource.write.recordkey.field=invoiceid",
      "--hoodie-conf", "hoodie.datasource.write.partitionpath.field=destinationstate",
      "--hoodie-conf", s"hoodie.streamer.source.dfs.root=s3://$BUCKET/test/",
      "--hoodie-conf", "hoodie.datasource.write.precombine.field=replicadmstimestamp",
      "--hoodie-conf", "hoodie.onetable.formats.to.sync=DELTA",
      "--hoodie-conf", "hoodie.onetable.target.metadata.retention.hr=168"
    )

    // Build the SparkContext the same way the standalone HoodieStreamer would.
    val cfg = HoodieStreamer.getConfig(config)
    val additionalSparkConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg)
    val jssc = UtilHelpers.buildSparkContext("delta-streamer-test", "jes", additionalSparkConfigs)
    val spark = jssc.sc

    // Wrap the SparkContext in a GlueContext so Glue job bookkeeping still works.
    val glueContext: GlueContext = new GlueContext(spark)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    try {
      // Run a single ingestion + meta-sync round.
      new HoodieStreamer(cfg, jssc).sync()
    } finally {
      jssc.stop()
    }

    Job.commit()
  }
}
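Once the jar issue is sorted out, my plan for a sanity check is to read the same base path with both formats from a spark-shell or notebook (just a rough sketch; the bucket placeholder matches the job config above):

// Run from a spark-shell / notebook with the Hudi and Delta jars on the classpath.
// OneTable writes the Delta log at the same base path as the Hudi table,
// so both reads should return the same data.
val basePath = "s3://XXX/testcases/"   // same placeholder bucket as the job config

val hudiDf  = spark.read.format("hudi").load(basePath)
val deltaDf = spark.read.format("delta").load(basePath)

println(s"Hudi rows: ${hudiDf.count()}, Delta rows: ${deltaDf.count()}")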

On the Glue side, make sure to add all of the jars listed above as dependent JARs, as shown in the screenshot below.

(Screenshot: Glue job configuration with the dependent JARs, 2024-03-07)

The Hudi table is created, but I think it fails in the sync tool with the same error as before, which most likely means a jar issue.

Error

2024-03-07 12:45:56,334 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(98)): Exception in User Class
org.apache.hudi.exception.HoodieMetaSyncException: Could not sync using the meta sync class io.onetable.hudi.sync.OneTableSyncTool
	at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:81) ~[hudi-spark3.3-bundle_2.12-0.14.0.jar:0.14.0]
	at org.apache.hudi.utilities.streamer.StreamSync.runMetaSync(StreamSync.java:938) ~[hudi-utilities-slim-bundle_2.12-0.14.0.jar:0.14.0]
	at org.apache.hudi.utilities.streamer.StreamSync.writeToSink(StreamSync.java:851) ~[hudi-utilities-slim-bundle_2.12-0.14.0.jar:0.14.0]
	at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:446) ~[hudi-utilities-slim-bundle_2.12-0.14.0.jar:0.14.0]
	at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.ingestOnce(HoodieStreamer.java:840) ~[hudi-utilities-slim-bundle_2.12-0.14.0.jar:0.14.0]
	at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:72) ~[hudi-utilities-slim-bundle_2.12-0.14.0.jar:0.14.0]
	at org.apache.hudi.common.util.Option.ifPresent(Option.java:97) ~[hudi-spark3.3-bundle_2.12-0.14.0.jar:0.14.0]
	at org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:205) ~[hudi-utilities-slim-bundle_2.12-0.14.0.jar:0.14.0]
	at GlueApp$.main(test0.scala:50) ~[test0.scala.jar:?]
	at GlueApp.main(test0.scala) ~[test0.scala.jar:?]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_402]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_402]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_402]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_402]
	at com.amazonaws.services.glue.SparkProcessLauncherPlugin.invoke(ProcessLauncher.scala:65) ~[AWSGlueSparkResourceManager-1.0.jar:?]
	at com.amazonaws.services.glue.SparkProcessLauncherPlugin.invoke$(ProcessLauncher.scala:65) ~[AWSGlueSparkResourceManager-1.0.jar:?]
	at com.amazonaws.services.glue.ProcessLauncher$$anon$2.invoke(ProcessLauncher.scala:212) ~[AWSGlueSparkResourceManager-1.0.jar:?]
	at com.amazonaws.services.glue.ProcessLauncher.launch(ProcessLauncher.scala:400) ~[AWSGlueSparkResourceManager-1.0.jar:?]
	at com.amazonaws.services.glue.ProcessLauncher$.main(ProcessLauncher.scala:45) ~[AWSGlueSparkResourceManager-1.0.jar:?]
	at com.amazonaws.services.glue.ProcessLauncher.main(ProcessLauncher.scala) ~[AWSGlueSparkResourceManager-1.0.jar:?]
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/expressions/Empty2Null
	at org.apache.spark.sql.delta.DeltaLog.startTransaction(DeltaLog.scala:214) ~[hudi-extensions-0.1.0-SNAPSHOT-bundled.jar:?]
	at org.apache.spark.sql.delta.DeltaLog.startTransaction(DeltaLog.scala:211) ~[hudi-extensions-0.1.0-SNAPSHOT-bundled.jar:?]
	at io.onetable.delta.DeltaClient$TransactionState.<init>(DeltaClient.java:232) ~[hudi-extensions-0.1.0-SNAPSHOT-bundled.jar:?]
	at io.onetable.delta.DeltaClient$TransactionState.<init>(DeltaClient.java:217) ~[hudi-extensions-0.1.0-SNAPSHOT-bundled.jar:?]
	at io.onetable.delta.DeltaClient.beginSync(DeltaClient.java:153) ~[hudi-extensions-0.1.0-SNAPSHOT-bundled.jar:?]
	at io.onetable.spi.sync.TableFormatSync.getSyncResult(TableFormatSync.java:154) ~[hudi-extensions-0.1.0-SNAPSHOT-bundled.jar:?]
	at io.onetable.spi.sync.TableFormatSync.syncSnapshot(TableFormatSync.java:70) ~[hudi-extensions-0.1.0-SNAPSHOT-bundled.jar:?]
	at io.onetable.client.OneTableClient.syncSnapshot(OneTableClient.java:179) ~[hudi-extensions-0.1.0-SNAPSHOT-bundled.jar:?]
	at io.onetable.client.OneTableClient.sync(OneTableClient.java:116) ~[hudi-extensions-0.1.0-SNAPSHOT-bundled.jar:?]
	at io.onetable.hudi.sync.OneTableSyncTool.syncHoodieTable(OneTableSyncTool.java:80) ~[hudi-extensions-0.1.0-SNAPSHOT-bundled.jar:?]
	at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:79) ~[hudi-spark3.3-bundle_2.12-0.14.0.jar:0.14.0]
	... 19 more
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.expressions.Empty2Null
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387) ~[?:1.8.0_402]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_402]
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) ~[?:1.8.0_402]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_402]
	at org.apache.spark.sql.delta.DeltaLog.startTransaction(DeltaLog.scala:214) ~[hudi-extensions-0.1.0-SNAPSHOT-bundled.jar:?]
	at org.apache.spark.sql.delta.DeltaLog.startTransaction(DeltaLog.scala:211) ~[hudi-extensions-0.1.0-SNAPSHOT-bundled.jar:?]
	at io.onetable.delta.DeltaClient$TransactionState.<init>(DeltaClient.java:232) ~[hudi-extensions-0.1.0-SNAPSHOT-bundled.jar:?]
	at io.onetable.delta.DeltaClient$TransactionState.<init>(DeltaClient.java:217) ~[hudi-extensions-0.1.0-SNAPSHOT-bundled.jar:?]
	at io.onetable.delta.DeltaClient.beginSync(DeltaClient.java:153) ~[hudi-extensions-0.1.0-SNAPSHOT-bundled.jar:?]
	at io.onetable.spi.sync.TableFormatSync.getSyncResult(TableFormatSync.java:154) ~[hudi-extensions-0.1.0-SNAPSHOT-bundled.jar:?]
	at io.onetable.spi.sync.TableFormatSync.syncSnapshot(TableFormatSync.java:70) ~[hudi-extensions-0.1.0-SNAPSHOT-bundled.jar:?]
	at io.onetable.client.OneTableClient.syncSnapshot(OneTableClient.java:179) ~[hudi-extensions-0.1.0-SNAPSHOT-bundled.jar:?]
	at io.onetable.client.OneTableClient.sync(OneTableClient.java:116) ~[hudi-extensions-0.1.0-SNAPSHOT-bundled.jar:?]
	at io.onetable.hudi.sync.OneTableSyncTool.syncHoodieTable(OneTableSyncTool.java:80) ~[hudi-extensions-0.1.0-SNAPSHOT-bundled.jar:?]
	at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:79) ~[hudi-spark3.3-bundle_2.12-0.14.0.jar:0.14.0]
	... 19 more

@soumilshah1995
Author

I think, as Sagar was saying, there is a NullPointerException, hmm.

@soumilshah1995
Author

For Glue with Scala, should I create a separate ticket, or is this thread good?

@the-other-tim-brown
Contributor

This ticket is fine. There is no NullPointerException thrown here. If you inspect the stack trace you'll see a ClassNotFoundException. This means that there is an issue with the classpath. Which version of Spark does the Glue job run with?
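If you're not sure, a quick way to check is to log the Spark version from inside the Glue script itself, e.g.:

// Prints the Spark version the Glue job is actually running with.
println(s"Spark version: ${org.apache.spark.SPARK_VERSION}")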

@soumilshah1995
Author

I've encountered a slight challenge with our AWS setup. While AWS utilizes Spark 3.3-amzn, my local environment runs Spark 3.4. My jar files are built for Spark 3.4. Could you advise on the best approach for building the jar files to ensure compatibility with Spark 3.3-amzn on AWS?

@soumilshah1995
Author

Let me try this today, not sure if it works.
I will run the Glue 4 container locally, clone the OneTable project, and try to build the jar file inside the container; let's see how that goes.

@soumilshah1995
Author

soumilshah1995 commented Mar 8, 2024

Didn't work.

By the way, how would I build for a specific Spark version, @the-other-tim-brown or @sagarlakshmipathy?

Shouldn't the jar that I build locally work on Glue?
It works locally but throws ClassNotFoundException on Glue (Scala).

@the-other-tim-brown
Contributor

You can change the versions used in the pom.xml and rebuild the jar. You will likely need to use a Delta 2.3.X version based on this table: https://docs.delta.io/latest/releases.html

@soumilshah1995
Author

Where do I need to change the pom.xml? I mean, which directory?

If you can specify where I need to change the Delta version, that would be great.

@soumilshah1995
Author

Guess what, it works for Iceberg but failed for Delta.

@soumilshah1995
Author

Is this a bug for Delta, then? Not sure why it failed for Delta.

@soumilshah1995
Author

(Screenshot: test results, 2024-03-09)

Test passed for Iceberg, failed for Delta.

Keeping the ticket open for Delta.

@soumilshah1995
Author

(Screenshot: YouTube tutorial, 2024-03-09)

Tutorials are uploaded on YouTube :D

@the-other-tim-brown
Contributor

The "parent pom", as it is sometimes referred to, is where we keep all the version information: https://github.com/apache/incubator-xtable/blob/main/pom.xml#L29

Updating the properties there will set the versions in the dependencyManagement section of the same pom.xml, which controls the versions the other modules use by default.

If the Delta error is still a ClassNotFoundException or something related to the catalyst package, it is going to be some packaging error. This is something we'll have to solve in the future. Ideally we would be able to use the new Delta Kernel library. I tried using their "standalone java" library in the past, but it was missing some necessary features for partitioning.

@soumilshah1995
Author

Thank you very much. I wholeheartedly agree that addressing this issue in the future is crucial. From my testing, it appears that the functionality of OneTable is working as expected, which is promising. It would be tremendously helpful to complement this with some Python examples; I'm aware that there is an open ticket for Python-related tasks.

Wanted to say thank you for all the help.
I'll watch #253.
