Question about subgraphs/filtering #152
Comments
Partially. The best you can get is to only load all … Assuming only CommitNodes have …
This assumes that the Left Semi join pushes the …
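A minimal sketch of that left-semi-join filtering (my reconstruction, assuming the predicate and column names used elsewhere in this thread):

// Keep only edges that touch CommitNodes from the target month.
val commitNodesInMonth = nodes
  .where($"predicate" === "author_date" && date_format($"objectTimestamp", "yyyy-MM") === "2020-01")
  .select($"subject".as("commit_uid"))
val committedInMonth = edges.where($"predicate" === "committed")
  .join(commitNodesInMonth, $"objectUid" === $"commit_uid", "left_semi")  // commit node is the object
val modifiedInMonth = edges.where($"predicate" === "modified")
  .join(commitNodesInMonth, $"subject" === $"commit_uid", "left_semi")    // commit node is the subject
val subgraphEdges = committedInMonth.union(modifiedInMonth)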
If you are calling that read for many different months, you could also read the entire committed-modified subgraph, enrich it with author date and store the data in partitioned parquet files:

val edges = spark.read.dgraph.edges("localhost:9080")
val nodes = spark.read.dgraph.nodes("localhost:9080")
val committedEdges = edges.where($"predicate" === "committed")
val modifiedEdges = edges.where($"predicate" === "modified")
val authorDates = nodes.where($"predicate" === "author_date").select($"subject".as("node_id"), $"objectTimestamp".as("author_date"))
val authorMonths = authorDates.select($"node_id", date_format($"author_date", "yyyy-MM").as("author_month"))
// join on the commit node id: for "committed" edges it is the objectUid,
// for "modified" edges it is the subject
val edgesWithAuthorDate = Seq(
  committedEdges.join(authorMonths.withColumnRenamed("node_id", "objectUid"), "objectUid"),
  modifiedEdges.join(authorMonths.withColumnRenamed("node_id", "subject"), "subject")
).map(_.select($"subject", $"predicate", $"objectUid", $"author_month"))
 .reduce(_.union(_))
edgesWithAuthorDate.write.partitionBy("author_month").parquet("commit-modified-edges")

Then you can read your sub-graph by calling:

spark.read.parquet("commit-modified-edges").where($"author_month" === "2020-01")

This will read only relevant edges. I recommend using DataFrame.writePartitionedBy from our spark-extension package (https://github.com/G-Research/spark-extension) to write sensibly partitioned files (https://github.com/G-Research/spark-extension/blob/master/PARTITIONING.md#partitioned-writing).
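As a usage note (standard Spark partition pruning, not from the comment above), several months can be read in one go without scanning the rest:

spark.read.parquet("commit-modified-edges").where($"author_month".isin("2020-01", "2020-02"))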
Some thinking about how to make the connector support your use-case (#144). Your request would turn into the following DQL query:

query {
  commitNodes as var(func: between(<author_date>, "2020-01-01T00:00:00Z", "2020-01-31T23:59:59Z"))
  result (func: uid(commitNodes), first: 100, offset: 0) {
    <uid>
    <author_date>
    <~commited> { <uid> }
    <modified> { <uid> }
  }
}

Which would be straightforward to turn into triples. As soon as you want to retrieve data from IDs or FileNodes, things become tricky, as this can introduce duplicate triples. Also the `, first: 100, offset: 0` would need to be injected to add partitioning. It looks like only a very limited subset of DQL can potentially be supported by this connector.
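To illustrate the duplicate-triple point (hypothetical uids): a single result object such as

{ "uid": "0x2", "author_date": "2020-01-09T23:56:17Z", "~commited": [ { "uid": "0x1" } ], "modified": [ { "uid": "0x3" } ] }

maps to the triples (0x1, commited, 0x2), (0x2, author_date, 2020-01-09T23:56:17Z) and (0x2, modified, 0x3). But if a second CommitNode also modified FileNode 0x3 and the query pulled FileNode properties as well, those property triples would be emitted once per commit, so the connector would have to deduplicate across partitions.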
What I REALLY am aiming towards, in this case, is some sort of community detection - although probably one I have to hand-build, because LPA as built into Spark isn't working for me. So for example, assuming the structure above, figuring out who the "main authors/committers" of a "group of files" becomes possible. In theory you can assign weights based on how many times you see another file in the same Commit as you are in, for example. The months here are an arbitrary time period, so you can track how things change over time, if that makes sense. :)
Thanks!
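A minimal sketch of that co-occurrence weighting in Spark (assumptions: "modified" edges link a CommitNode subject to a FileNode objectUid, as in the examples in this thread; the column names are hypothetical):

// Pair up files touched by the same commit and count how often each pair co-occurs.
val commitFiles = edges
  .where($"predicate" === "modified")
  .select($"subject".as("commit"), $"objectUid".as("file"))
val pairs = commitFiles.as("a")
  .join(commitFiles.as("b"), "commit")                  // files sharing a commit
  .select($"a.file".as("fileA"), $"b.file".as("fileB"))
  .where($"fileA" < $"fileB")                           // one row per unordered pair
val weights = pairs.groupBy($"fileA", $"fileB").count() // edge weight for community detection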
>> authorDates = nodes.filter("predicate = 'author_date'").selectExpr("subject as node_id", "objectTimestamp as author_date")
>> authorDates.limit(10).show(10)
+-------+-------------------+
|node_id| author_date|
+-------+-------------------+
|7195639|2020-01-09 23:56:17|
|7195640|2020-01-09 23:56:20|
|7195641|2020-01-09 23:56:19|
|7195642|2020-01-09 23:56:18|
|7195647|2020-01-09 23:57:38|
|7195660|2020-01-10 00:08:16|
|7195664|2020-01-10 00:11:23|
|7195667|2020-01-10 00:11:33|
|7195668|2020-01-10 00:11:38|
|7195676|2020-01-10 00:13:33|
+-------+-------------------+
>> authorDates.count()
1743986
Step one complete (in pySpark, which is a lot more legible to me - and perhaps this code will help some other poor soul doing something similar) :)
Ok, I ran into an exception (which is weird, because the Dgraph is still up and happy). It's possible I'm mis-translating one of your lines, which I think is just joining and uniquing the RDD.
from gresearch.spark.dgraph.connector import *
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

edges: DataFrame = spark.read.option("dgraph.chunksize", 300).dgraph.edges("localhost:9080")
nodes: DataFrame = spark.read.option("dgraph.chunksize", 300).dgraph.nodes("localhost:9080")
authorDates = nodes.filter("predicate = 'author_date'").selectExpr("subject as node_id", "objectTimestamp as author_date")
authorMonths = authorDates.select("node_id", F.date_format("author_date", "yyyy-MM").alias("date_month"))
committedEdges = edges.where("predicate = 'committed'")
modifiedEdges = edges.where("predicate = 'modified'")
# NB: no join key is given here, so Spark plans a cartesian product
# (hence the CartesianProductExec frames in the trace below)
>> committedEdges.join(authorMonths.withColumnRenamed("node_id", "objectUid")).show(10)
21/11/19 17:03:28 ERROR Executor: Exception in task 64.0 in stage 15.0 (TID 155)
java.lang.RuntimeException: java.util.concurrent.CompletionException: java.lang.RuntimeException: The doRequest encountered an execution exception:
	at uk.co.gresearch.spark.dgraph.connector.executor.DgraphExecutor.query(DgraphExecutor.scala:57)
	at uk.co.gresearch.spark.dgraph.connector.executor.DgraphExecutor.query(DgraphExecutor.scala:30)
	at uk.co.gresearch.spark.dgraph.connector.model.GraphTableModel.readChunk(GraphTableModel.scala:93)
	at uk.co.gresearch.spark.dgraph.connector.model.GraphTableModel.readChunk$(GraphTableModel.scala:85)
	at uk.co.gresearch.spark.dgraph.connector.model.NodeTableModel.readChunk(NodeTableModel.scala:26)
	at uk.co.gresearch.spark.dgraph.connector.model.GraphTableModel.$anonfun$modelPartition$4(GraphTableModel.scala:81)
	at uk.co.gresearch.spark.dgraph.connector.model.ChunkIterator.next(ChunkIterator.scala:46)
	at uk.co.gresearch.spark.dgraph.connector.model.ChunkIterator.next(ChunkIterator.scala:23)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at uk.co.gresearch.spark.dgraph.connector.TriplePartitionReader.next(TriplePartitionReader.scala:28)
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.foreach(WholeStageCodegenExec.scala:753)
	at org.apache.spark.sql.execution.joins.UnsafeCartesianRDD.compute(CartesianProductExec.scala:47)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: The doRequest encountered an execution exception:
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: java.lang.RuntimeException: The doRequest encountered an execution exception:
	at uk.co.gresearch.thirdparty.io.dgraph.DgraphAsyncClient.lambda$runWithRetries$2(DgraphAsyncClient.java:248)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
	... 6 more
Caused by: java.util.concurrent.ExecutionException: uk.co.gresearch.thirdparty.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
	at uk.co.gresearch.thirdparty.io.dgraph.DgraphAsyncClient.lambda$runWithRetries$2(DgraphAsyncClient.java:216)
	... 7 more
Caused by: uk.co.gresearch.thirdparty.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
	at uk.co.gresearch.thirdparty.io.grpc.Status.asRuntimeException(Status.java:533)
	at uk.co.gresearch.thirdparty.io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:478)
	at uk.co.gresearch.thirdparty.io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:464)
	at uk.co.gresearch.thirdparty.io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:428)
	at uk.co.gresearch.thirdparty.io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:461)
	at uk.co.gresearch.thirdparty.io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:617)
	at uk.co.gresearch.thirdparty.io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
	at uk.co.gresearch.thirdparty.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:803)
	at uk.co.gresearch.thirdparty.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:782)
	at uk.co.gresearch.thirdparty.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at uk.co.gresearch.thirdparty.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
Have you set your chunk size small enough?
I set my chunk size down to 200 and 100 to try again... and the same thing happened. As another datapoint: there are no errors from my Dgraph, and Ratel is happy as well. I forget how to enable debugging, though, which in theory would provide more data?

I'm just using the single-instance Docker image that has one Alpha and one Zero node on it. But I'm not sure why this would cause a connection reset issue? It's weird because pagecount worked fine.

OH. VERY STRANGE. A chunksize of 500 works; a chunksize of 300 does not. Is it possible that with a lower chunksize we run out of file handles or something?!? (Or is there some other error?)
>> edges: DataFrame = spark.read.option("dgraph.chunksize", 500).dgraph.edges("localhost:9080")
>> nodes: DataFrame = spark.read.option("dgraph.chunksize", 500).dgraph.nodes("localhost:9080")
>> authorDates = nodes.filter("predicate = 'author_date'").selectExpr("subject as node_id", "objectTimestamp as author_date")
>> authorMonths = authorDates.select("node_id", F.date_format("author_date", "yyyy-MM").alias("date_month"))
>> committedEdges = edges.where("predicate = 'committed'")
>> modifiedEdges = edges.where("predicate = 'modified'")
>> committedEdges.join(authorMonths.withColumnRenamed("node_id", "objectUid")).show(10)
+-------+---------+---------+---------+----------+
|subject|predicate|objectUid|objectUid|date_month|
+-------+---------+---------+---------+----------+
|7195635|committed| 8392773| 7195639| 2020-01|
|7195635|committed| 8392773| 7195640| 2020-01|
|7195635|committed| 8392773| 7195641| 2020-01|
|7195635|committed| 8392773| 7195642| 2020-01|
|7195635|committed| 8392773| 7195647| 2020-01|
|7195635|committed| 8392773| 7195660| 2020-01|
|7195635|committed| 8392773| 7195664| 2020-01|
|7195635|committed| 8392773| 7195667| 2020-01|
|7195635|committed| 8392773| 7195668| 2020-01|
|7195635|committed| 8392773| 7195676| 2020-01|
+-------+---------+---------+---------+----------+
only showing top 10 rows
Thanks!
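(A note on the output above: the duplicated objectUid column, and a single subject paired with many unrelated rows, indicate the join ran without a key, i.e. as a cross product, matching the CartesianProductExec frames in the earlier trace. Passing the key explicitly, e.g. committedEdges.join(authorMonths.withColumnRenamed("node_id", "objectUid"), "objectUid"), would join each commit edge to its own month.)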
Sorry for the late reply; I hope this background info is still relevant. Smaller chunk sizes mean more partitions. With a large Spark cluster, this means more concurrent reads hitting the Dgraph cluster, which might cause the unresponsiveness.
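To put rough numbers on that (hypothetical figures): with 16 executors of 4 cores each, up to 16 × 4 = 64 chunk requests can be in flight against Dgraph at once, and halving the chunk size doubles the total number of requests needed to read the same data.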
Let's say I had a very large graph of IDs->commited->CommitNodes->modified->FileNodes, where my edges are reversible.
CommitNodes have an author_date that is a datetime type in the Dgraph schema (IDs and FileNodes do not).
If I want to select from my graph a subgraph of all the CommitNodes for a particular month, plus all the FileNodes and IDs that they link to, I'm not sure whether I can with the current API. Or can I?
Thanks in advance!
-dave
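For reference, a sketch of the Dgraph schema the question implies (predicate names as written above; the datetime index tokenizer is an assumption):

commited: [uid] @reverse .
modified: [uid] @reverse .
author_date: dateTime @index(month) .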