
Question about subgraphs/filtering #152

Open
daveaitel opened this issue Nov 15, 2021 · 9 comments
Labels
question Further information is requested

Comments

@daveaitel

Let's say I had a very large graph of IDs->commited->CommitNodes->modified->FileNodes, where my edges are reversible.

CommitNodes have an author_date that is a datetime type in the Dgraph schema (IDs and FileNodes do not).

If I want to select from my graph a subgraph of all the CommitNodes for a particular month, plus all the FileNodes and IDs that they link to, I'm not sure whether I can do that with the current API. Or can I?

Thanks in advance!
-dave

@EnricoMi
Collaborator

Partially. The best you can do is load all committed and modified edges plus the ids of all nodes whose author_date falls into your date range, and then filter the edges by joining them with those node ids. You can't get better than that with the current connector implementation.

Assuming only CommitNodes have author_date, you could do

import java.sql.Timestamp
import org.apache.spark.sql.DataFrame
import spark.implicits._
import uk.co.gresearch.spark.dgraph.connector._  // Spark Dgraph Connector implicits (spark.read.dgraph.*)

def read(first: Timestamp, last: Timestamp): DataFrame = {
  val edges = spark.read.dgraph.edges("localhost:9080")
  val nodes = spark.read.dgraph.nodes("localhost:9080")

  val committedEdges = edges.where($"predicate" === "committed")
  val modifiedEdges = edges.where($"predicate" === "modified")
  // ids (subjects) of all CommitNodes whose author_date falls into [first, last]
  val commitNodeIds = nodes.where($"predicate" === "author_date" && $"objectTimestamp".between(first, last))

  Seq(
    // keep only committed edges that point to a selected CommitNode
    committedEdges.join(commitNodeIds, committedEdges("objectUid") === commitNodeIds("subject"), "leftsemi"),
    // keep only modified edges that originate from a selected CommitNode
    modifiedEdges.join(commitNodeIds, modifiedEdges("subject") === commitNodeIds("subject"), "leftsemi")
  ).reduce(_.union(_))
}

This assumes that the Left Semi join pushes the "subject" projection down to the nodes DataFrame.
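
For illustration, a hypothetical call of that method for a single month could look like this (the timestamp literals are just example values):

// hypothetical usage: the January 2020 sub-graph of committed/modified edges
val january2020 = read(
  Timestamp.valueOf("2020-01-01 00:00:00"),
  Timestamp.valueOf("2020-01-31 23:59:59")
)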

@EnricoMi
Collaborator

If you are calling that read method for many different months, you could also read the entire committed/modified subgraph once, enrich it with the author date, and store the data in partitioned Parquet files:

import org.apache.spark.sql.functions.date_format

val edges = spark.read.dgraph.edges("localhost:9080")
val nodes = spark.read.dgraph.nodes("localhost:9080")

val committedEdges = edges.where($"predicate" === "committed")
val modifiedEdges = edges.where($"predicate" === "modified")
val authorDates = nodes.where($"predicate" === "author_date").select($"subject".as("node_id"), $"objectTimestamp".as("author_date"))
// one row per CommitNode with its author month, e.g. "2020-01"
val authorMonths = authorDates.select($"node_id", date_format($"author_date", "yyyy-MM").as("author_month"))

val edgesWithAuthorDate = Seq(
  // attach the author month of the CommitNode at either end of the edge
  committedEdges.join(authorMonths, committedEdges("objectUid") === authorMonths("node_id")),
  modifiedEdges.join(authorMonths, modifiedEdges("subject") === authorMonths("node_id"))
).map(_.drop("node_id")).reduce(_.union(_))

edgesWithAuthorDate.write.partitionBy("author_month").parquet("commit-modified-edges")

Then you can read your sub-graph by calling:

spark.read.parquet("commit-modified-edges").where($"author_month" === "2020-01") 

This will read only relevant edges.

I recommend using DataFrame.writePartitionedBy from our spark-extension package to write sensibly partitioned files.
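
A rough, hedged sketch of what that could look like, assuming writePartitionedBy takes the partition columns as a Seq of Columns (please check the spark-extension README for the exact import and signature):

// assumption: import and method signature as per the spark-extension package;
// verify both against that project's documentation
import uk.co.gresearch.spark._

edgesWithAuthorDate
  .writePartitionedBy(Seq($"author_month"))
  .parquet("commit-modified-edges")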

@EnricoMi
Collaborator

Some thoughts on how to make the connector support your use case (#144): your request would turn into the following DQL query:

query {
  commitNodes as var(func: between(<author_date>, "2020-01-01T00:00:00Z", "2020-01-31T23:59:59Z"))

  result (func: uid(commitNodes), first: 100, offset: 0) {
    <uid>
    <author_date>
    <~commited> { <uid> }
    <modified> { <uid> }
  }
}

This would be straightforward to turn into triples. As soon as you want to retrieve data from IDs or FileNodes, things become tricky, as this can introduce duplicate triples. Also, the first: 100, offset: 0 arguments would need to be injected to add partitioning. It looks like only a very limited subset of DQL can potentially be supported by this connector.
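
Just to illustrate the partitioning point, a sketch (not connector code) of how first/offset could be injected to read one partition at a time:

// illustrative only: firstIso/lastIso are RFC 3339 timestamps as in the query above,
// pageSize corresponds to "first: 100"; each partition reads one page of CommitNodes
def partitionQuery(firstIso: String, lastIso: String, pageSize: Int, partition: Int): String =
  s"""query {
     |  commitNodes as var(func: between(<author_date>, "$firstIso", "$lastIso"))
     |
     |  result (func: uid(commitNodes), first: $pageSize, offset: ${partition * pageSize}) {
     |    <uid>
     |    <author_date>
     |    <~commited> { <uid> }
     |    <modified> { <uid> }
     |  }
     |}""".stripMargin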

@EnricoMi EnricoMi added the question Further information is requested label Nov 15, 2021
@daveaitel
Author

daveaitel commented Nov 15, 2021 via email

@daveaitel
Author

daveaitel commented Nov 19, 2021 via email

@daveaitel
Author

daveaitel commented Nov 19, 2021 via email

@EnricoMi
Collaborator

The io.grpc.StatusRuntimeException: UNAVAILABLE is 100% Dgraph, so up to that point your PySpark is correct. It looks like the Dgraph instance is down. You should check the output of the alpha and zero nodes, and check that it is still working with Ratel / https://play.dgraph.io.

@daveaitel
Author

daveaitel commented Nov 21, 2021 via email

@EnricoMi
Collaborator

Sorry for the late reply. Hope this background info is still relevant.

Smaller chunk sizes mean more partitions. With a large Spark cluster, this means more concurrent reads hitting the Dgraph cluster, which might cause the unresponsiveness.
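
If that is the cause, increasing the chunk size (fewer partitions, hence fewer concurrent reads) might help. A minimal sketch, assuming the reader option key is "dgraph.chunkSize" (please verify the exact option name and its default in the connector README):

// assumption: option key taken from memory of the connector docs, double-check it
val edges = spark.read
  .option("dgraph.chunkSize", "500000")  // larger chunks -> fewer partitions -> fewer concurrent reads
  .dgraph.edges("localhost:9080")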
