Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WideNodeEncoder fails on multiple values per predicate #138

Open
RJKeevil opened this issue Oct 18, 2021 · 3 comments
Open

WideNodeEncoder fails on multiple values per predicate #138

RJKeevil opened this issue Oct 18, 2021 · 3 comments
Labels
enhancement New feature or request

Comments

@RJKeevil
Copy link

Hi,

When I run the following code:

./spark-shell --packages graphframes:graphframes:0.8.1-spark3.0-s_2.12,uk.co.gresearch.spark:spark-dgraph-connector_2.12:0.7.0-3.1

import uk.co.gresearch.spark.dgraph.graphframes._
import org.graphframes._
val graph: GraphFrame = spark.read.dgraph.graphframes("localhost:9080")

graph.vertices.show()

I get the following error:

21/10/18 12:12:18 ERROR WideNodeEncoder: failed to encode node: {"uid":"0x80e4d3","Entity.identifier":"6","Entity.label":"test@gmail.com","Entity.type":"email","Entity.updateTS":"2021-10-18T12:11:23+02:00","dgraph.type":["Entity","EmailEntity"]}
21/10/18 12:12:18 ERROR Executor: Exception in task 3.0 in stage 5.0 (TID 14)
java.lang.IllegalStateException
        at com.google.gson.JsonArray.getAsString(JsonArray.java:133)
        at uk.co.gresearch.spark.dgraph.connector.encoder.JsonNodeInternalRowEncoder.getValue(JsonNodeInternalRowEncoder.scala:110)
        at uk.co.gresearch.spark.dgraph.connector.encoder.JsonNodeInternalRowEncoder.getValue$(JsonNodeInternalRowEncoder.scala:104)
        at uk.co.gresearch.spark.dgraph.connector.encoder.WideNodeEncoder.getValue(WideNodeEncoder.scala:35)
        at uk.co.gresearch.spark.dgraph.connector.encoder.WideNodeEncoder.$anonfun$toNode$5(WideNodeEncoder.scala:118)
        at uk.co.gresearch.spark.dgraph.connector.encoder.WideNodeEncoder.$anonfun$toNode$5$adapted(WideNodeEncoder.scala:116)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at uk.co.gresearch.spark.dgraph.connector.encoder.WideNodeEncoder.toNode(WideNodeEncoder.scala:116)
        at uk.co.gresearch.spark.dgraph.connector.encoder.WideNodeEncoder.$anonfun$fromJson$1(WideNodeEncoder.scala:92)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:494)
        at uk.co.gresearch.spark.dgraph.connector.TriplePartitionReader.get(TriplePartitionReader.scala:30)
        at uk.co.gresearch.spark.dgraph.connector.TriplePartitionReader.get(TriplePartitionReader.scala:23)
        at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.next(DataSourceRDD.scala:89)
        at org.apache.spark.sql.execution.datasources.v2.MetricsRowIterator.next(DataSourceRDD.scala:124)
        at org.apache.spark.sql.execution.datasources.v2.MetricsRowIterator.next(DataSourceRDD.scala:121)
        at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:346)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)

My suspicion is that it fails because I have multiple entries in dgraph.type. Could you test this scenario? Or is there something else in the json above that fails to encode? Thanks!

@EnricoMi
Copy link
Collaborator

You are right, list properties / array values / multiple values per predicate are not supported (#69). This is definitively a required feature. Not sure when I will get back to this so contribution welcome.

@EnricoMi EnricoMi changed the title WideNodeEncoder fails WideNodeEncoder fails on multiple values per predicate Oct 18, 2021
@RJKeevil
Copy link
Author

I'm more than happy to give a PR a go, any pointers you can give me on where in the code is best to begin?

@EnricoMi EnricoMi added the enhancement New feature or request label Oct 18, 2021
@EnricoMi
Copy link
Collaborator

Excellent, here is what I would do:

First lets simplify the example above and get rid of the GraphFrames dependency as GraphFrames internally uses the wide node mode:

import uk.co.gresearch.spark.dgraph.connector._

val nodes = spark.read.option(NodesModeOption, NodesModeWideOption).dgraph.nodes("localhost:9080")

Then what we want is that nodes.printSchema() tells us that dgraph.type is an array of strings:

root
 |-- subject: long (nullable = false)
 |-- dgraph.type: array (nullable = false)
 |    |-- element: string (containsNull = true)
 ...

So we expect the DataFrame to contain string arrays as elements in the dgraph.type column.

Multiple values per predicate are indicated by the list attribute in the Dgraph schema. The SchemaProvider fetches the schema from Dgraph. Its current query "schema { predicate type lang }" excludes the list information, so this should read "schema { predicate type lang list }". Then Predicate need to store that list information.

The WideNodeEncoder is central here. In toStructField, a given Predicate is translated into a Spark type. So when list is true, this needs to be ArrayType.

In toNode it turns a JSON object like

{
  "uid": "0x3",
  "name": "Luke Skywalker"
}

into a Seq[Any] = Seq(3, "Luke Skywalker") and eventually into a Spark InternalRow. It uses JsonNodeInternalRowEncoder to parse the individual JSON values into their corresponding Scala types, here long and String, which need to be able to parse lists of values.

This all should be implemented test-driven through TestWideNodeEncoder. Maybe it is also time to refactor SchemaProvider so that the JSON parsing bit in getSchema is decoupled from the DgraphClient bit and can be tested by giving it some test JSON strings. You could also introduce a test for JsonNodeInternalRowEncoder as currently its methods are only tested through the derived classes, not directly. But do it as you prefer.

I hope these pointers help you to get started.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

2 participants