Can't use dataloader twice within a single field #1198
Can you please give a more concrete example of the type of fetchers you are doing that require multiple data loaders? Pseudo Java code is OK. The data loader instrumentation code watches field fetches, not data loader calls, so off hand I would expect that to work. Or are you chaining them, where the results of dataloader A feed into a new dataloader B?
Hi, thanks for your reply. A common (?) use case would be when the GraphQL schema does not match the database schema: a single object can require data to be fetched from multiple tables. One example is a many-to-many relationship: a "USER" reads many "BOOKS", and a "BOOK" has many "READERS". Right now, if we want to use dataloaders, the results of one dataloader must feed into another dataloader, so we must include the intermediary associative table "USERS_BOOKS" as a GraphQLObjectType, which may not be desirable. Ideally we could do this (pardon my Kotlin): Another example is a relational database where one table represents a "main" object and many other tables are subclasses of that table: there is a table for "PERSON" that has name and id, but information is stored in other tables like "USER_WITH_OCCUPATION" and "USER_WITH_HEALTHCARE_INFORMATION". Sometimes we don't want to expose the subclasses as fragments. And a third example is, as you mentioned, when a dataloader needs to chain from the results of another. I think we just need to wrap the load function in DataLoader:
A side benefit is that execution can be performed asynchronously instead of level by level.
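The "wrap the load function" idea can be sketched in plain Java with a nextTick-style trick: run resolver tasks on a single-threaded executor acting as an event loop, and have each `load()` schedule a dispatch task *behind* the work already queued, so every load issued in the same "tick" lands in one batch. This is only a sketch under that assumption; all names (`TickLoader`, etc.) are hypothetical and this is not the java-dataloader API.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;

// Hypothetical mini-loader: load() queues a key and schedules a dispatch task on the
// same single-threaded "event loop", so the dispatch runs after all currently queued
// resolver tasks (node.js nextTick-style) and batches everything queued in this tick.
class TickLoader<K, V> {
    private final Executor loop;
    private final Function<List<K>, Map<K, V>> batchFn;
    private final Map<K, CompletableFuture<V>> queue = new LinkedHashMap<>();
    private boolean dispatchScheduled = false;

    TickLoader(Executor loop, Function<List<K>, Map<K, V>> batchFn) {
        this.loop = loop;
        this.batchFn = batchFn;
    }

    // Must be called from a task running on the loop executor.
    CompletableFuture<V> load(K key) {
        CompletableFuture<V> f = queue.computeIfAbsent(key, k -> new CompletableFuture<>());
        if (!dispatchScheduled) {
            dispatchScheduled = true;
            loop.execute(this::dispatch); // runs after every task queued so far
        }
        return f;
    }

    private void dispatch() {
        dispatchScheduled = false;
        Map<K, CompletableFuture<V>> batch = new LinkedHashMap<>(queue);
        queue.clear();
        Map<K, V> results = batchFn.apply(new ArrayList<>(batch.keySet()));
        batch.forEach((k, f) -> f.complete(results.get(k)));
    }
}

public class TickLoaderDemo {
    public static List<List<Integer>> run() {
        List<List<Integer>> batches = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        ExecutorService loop = Executors.newSingleThreadExecutor();
        TickLoader<Integer, String> loader = new TickLoader<>(loop, keys -> {
            batches.add(new ArrayList<>(keys)); // record what was batched together
            Map<Integer, String> m = new HashMap<>();
            keys.forEach(k -> m.put(k, "value-" + k));
            done.countDown();
            return m;
        });
        // Simulate three independent field resolvers queued in the same tick;
        // the dispatch task that load() schedules runs only after all of them.
        loop.execute(() -> {
            loop.execute(() -> loader.load(1));
            loop.execute(() -> loader.load(2));
            loop.execute(() -> loader.load(3));
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        loop.shutdown();
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(run()); // a single batch: [[1, 2, 3]]
    }
}
```

All three loads resolve in one batch because the dispatch task sits behind the resolver tasks in the loop's queue, which is essentially what `process.nextTick` buys the JavaScript dataloader.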
Thanks for outlining it in more detail. One of the challenges we have today in Java is knowing when to dispatch the data loaders. Today we track fields and levels and dispatch once we have exhausted all fields. However, with chained data loaders we will kick off the first one, but we will not know when to dispatch again, since we can't see the chaining in place. This is not a problem in JavaScript because it has a nextTick-style event loop to defer the dispatch. You could manually call dispatch() yourself.
Thank you for your reply. I'm using graphql-java on a Kotlin project. It turns out that it's not too difficult in Kotlin to implement a node.js-style nextTick() function to queue dispatches, since Kotlin has its own async framework. Then I just have to rewrite AsyncExecutionStrategy and DataLoader to use Kotlin's async framework rather than the standard Java CompletableFuture framework. The resulting dataloader functions just as intuitively as the node.js version. I guess my solution is mostly relevant for people who use Kotlin; not sure how many of us are here.
I saw that you reopened this issue and thought to chime in. If the desired eventual outcome is to implement the node.js functionality, I would suggest a centralised dispatcher. It makes sense to make DataLoaderDispatchInstrumentation a first-class entity rather than an optional one.
I'm using a similar solution but with Kotlin's coroutine framework. I'm not very familiar with Java, so please forgive me for mistakes.
Hey all, is there any solution in Java (graphql-java) for chained DataLoaders? Thanks, Guy.
You can chain data loaders, BUT you can't do it with ultimate efficiency. graphql-java tracks fields and hence knows how many dataLoader.dispatch() calls it needs to make. If it dispatches too often, it will not be as efficient as it could be; if it dispatches too rarely, threads will block on a promise that will never finish. You could do this:

The graphql-java field tracking will cause the first data loader to dispatch(), while the second needs manual calling, since graphql-java did not know that a chained CompletableFuture was in play. This might mean you dispatch some other fields too early and hence less optimally. It's not optimal, but it is possible.
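The chained pattern described above can be shown with a self-contained sketch. The `SimpleLoader` below is a hypothetical stand-in for a DataLoader (not the java-dataloader API): `load()` queues a key, `dispatch()` resolves the queue in one batch. The first-level dispatch is what graphql-java's instrumentation would do for you; the second-level loader must be dispatched manually inside the chained stage.

```java
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// Minimal stand-in for a DataLoader (hypothetical, not the java-dataloader API):
// load() queues a key; dispatch() resolves everything queued so far in one batch.
class SimpleLoader<K, V> {
    private final Function<List<K>, Map<K, V>> batchFn;
    private final Map<K, CompletableFuture<V>> queue = new LinkedHashMap<>();

    SimpleLoader(Function<List<K>, Map<K, V>> batchFn) { this.batchFn = batchFn; }

    CompletableFuture<V> load(K key) {
        return queue.computeIfAbsent(key, k -> new CompletableFuture<>());
    }

    void dispatch() {
        Map<K, CompletableFuture<V>> batch = new LinkedHashMap<>(queue);
        queue.clear();
        Map<K, V> results = batchFn.apply(new ArrayList<>(batch.keySet()));
        batch.forEach((k, f) -> f.complete(results.get(k)));
    }
}

public class ChainedDispatchDemo {
    public static List<String> fetchBooksForUser1() {
        // Hypothetical USERS_BOOKS associative table and BOOKS table.
        Map<Integer, List<Integer>> usersBooks = Map.of(1, List.of(10, 11));
        Map<Integer, String> books = Map.of(10, "Dune", 11, "Hamlet");

        SimpleLoader<Integer, List<Integer>> bookIdsByUser = new SimpleLoader<>(ids -> {
            Map<Integer, List<Integer>> m = new HashMap<>();
            ids.forEach(id -> m.put(id, usersBooks.get(id)));
            return m;
        });
        SimpleLoader<Integer, String> bookById = new SimpleLoader<>(ids -> {
            Map<Integer, String> m = new HashMap<>();
            ids.forEach(id -> m.put(id, books.get(id)));
            return m;
        });

        // The user.books resolver chains loader A into loader B.
        CompletableFuture<List<String>> result = bookIdsByUser.load(1).thenCompose(ids -> {
            List<CompletableFuture<String>> fs = new ArrayList<>();
            ids.forEach(id -> fs.add(bookById.load(id)));
            // The field tracking cannot see this second level,
            // so the chained loader must be dispatched manually:
            bookById.dispatch();
            return CompletableFuture.allOf(fs.toArray(new CompletableFuture[0]))
                .thenApply(v -> fs.stream().map(CompletableFuture::join).collect(Collectors.toList()));
        });

        bookIdsByUser.dispatch(); // the first level is what the instrumentation would dispatch
        return result.join();
    }

    public static void main(String[] args) {
        System.out.println(fetchBooksForUser1()); // [Dune, Hamlet]
    }
}
```

If the `bookById.dispatch()` line is removed, `result.join()` blocks forever, which is exactly the "threads will block on a promise that will never finish" failure mode described above.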
I have exactly the same problem: I'm fetching a set of objects, and then need to check permissions for each of these objects via a separate call to another service. I can't do that efficiently with the current data loaders. As you said, I could hack something together that would kind of work, but it isn't nearly as efficient, adds quite a bit of complexity, and requires a deeper understanding of how queries are resolved in GraphQL and how graphql-java orchestrates the loaders. I don't feel comfortable going down that route given that many different teams at my company will be contributing to the GraphQL schema, and I can't expect everyone to go through the trouble of wiring up data fetchers and dispatching them manually. Ideally, whether or not batching is done should only need to be known at the resolver (data fetcher) level: if batching is desired, implement a batched data fetcher; if not, implement a normal data fetcher. So, how hard would it be to implement something along the lines of Sangria's DeferredResolver, i.e. a DeferredDataFetcher? I think that's a much better abstraction than having to add a second concept of loaders, having to construct them for every query, passing them through the context and then calling them in the resolver. If the resolver is inherently batched, all the problems of figuring out when to dispatch and whether to dispatch manually go away, because inside the resolver everything is batched, so firing off a second batch async request is as simple as chaining the promise. I'm not familiar with the internals of graphql-java's execution engine, but I assume that since it's already aware of levels, there is a place in the code where the executor knows all the fields at a given level and could batch together all calls to a given resolver (aka data fetcher) if the field has a DeferredDataFetcher.
@Togrias - Did this end up working in Kotlin? And if so, would you mind sharing how you did it?
@stevenheidel General idea: Dataloaders delay execution until their dispatch() method is called. The GraphQL query execution strategy executes datafetchers concurrently as it traverses the query graph. Whenever it calls a dataloader.load() method, it suspends indefinitely (gets stuck). We want to call dispatch() only when all current coroutines are suspended.
Explanation: The atomic counter tracks the number of suspendable coroutines that are currently being executed and not suspended. When it reaches zero, we know it's time to fire the DataLoader dispatch() function (i.e. nextTick()).
I use Kotlin extension functions so they're easier to replace, and the code reads similarly to the graphql-java implementation so I can maintain it more easily. The goal is to force graphql-java to use the CoroutineDispatcher to create all its futures. I find that DataLoaders are quite difficult to test, but I'd be more worried about them getting stuck indefinitely (overbatching) than about fetching too quickly (underbatching). So far, the above coroutines-based approach hasn't caused me any problems with the former.
It's not only using a dataloader twice that's impossible: you also can't use a single dataloader that depends on any async result:

```kotlin
return CompletableFuture.supplyAsync {
    Thread.sleep(100L)
    123 // simulate an API call which returns some key
}.thenCompose { key ->
    dataLoader.load(key)
}
```

Things only work if you call dataLoader.load() exactly when preparing the CompletableFuture returned from the DataFetcher. If dataLoader.load() is called after the DataFetcher is done, it will never be dispatched. Yes, I can move the async API call into a new dataloader, but this greatly limits the batching ability for a schema like:

```graphql
type Query {
  field1: Type
  field2: Type
  # ...
  fieldN: Type
}
```

In this case I would have N different dataloaders for one single type, and they wouldn't batch together. This is quite a significant limitation.
@bbakerman Ironically, while there's all this machinery with Futures and AsyncExecutionStrategy to make it work with a modern async concurrency system, we have to resort to good old synchronous code with threads as soon as fetching gets a little more complicated than the simplest cases.
@Togrias - thanks for the help so far, really appreciate the detailed explanation of your Kotlin implementation. Here's what I've got so far with the dispatcher:

```kotlin
import kotlinx.coroutines.*
import java.util.*
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext

class QueueableDispatcher(
    private val wrappedDispatcher: CoroutineDispatcher
) : CoroutineDispatcher() {
    private val counter: AtomicInteger = AtomicInteger(0)
    private val queue: Queue<Runnable> = LinkedList()

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        counter.incrementAndGet()
        val wrappedBlock = Runnable {
            block.run()
            if (counter.decrementAndGet() == 0) {
                queue.poll()?.run()
            }
        }
        wrappedDispatcher.dispatch(context, wrappedBlock)
    }

    fun nextTick(block: suspend () -> Unit) {
        queue.add(Runnable { block() }) // ***** does not compile: a suspend fun can't be invoked from a plain Runnable
    }
}
```

Unfortunately now I'm stuck, since it doesn't make sense to convert a suspend fun into a Runnable for use in the queue. If the nextTick function makes an async call as in the reference implementation, it's not clear what to do next.
Here's my implementation:
My queue is a LinkedList of Jobs.
I don't know much Java, much less Java's futures library, but I think one way to implement this in Java without a framework is to have onStart() and onComplete() methods that track the beginning and completion of each asynchronous block of code. You can expose a series of wrapper classes:

Alternatively, expose the onStart() and onComplete() methods in the DataFetchingEnvironment and have users call them manually whenever an async future is created (e.g. supplyAsync / whenComplete). The idea is to allow users to manually increment/decrement the counter whenever an asynchronous block of code begins/ends. It looks quite painful though. Edit: or we could have a new CustomFutures framework that creates futures which automatically apply the above wrappers, e.g. by overriding supplyAsync/whenComplete/thenAccept directly.
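The onStart()/onComplete() counter described above can be sketched in plain Java. All names here (`DispatchGate`, etc.) are hypothetical, and this is a sketch of the idea rather than a proposal for graphql-java's API: each asynchronous block wraps its *synchronous* section between onStart() and onComplete(), and when the in-flight count drops to zero, every registered loader is dispatched (the "nextTick" moment).

```java
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical gate: counts async blocks whose synchronous part is still running;
// when the count reaches zero, everything is suspended on loads, so dispatch.
class DispatchGate {
    private final AtomicInteger inFlight = new AtomicInteger();
    private final List<Runnable> dispatchers = new ArrayList<>();

    void register(Runnable dispatchAll) { dispatchers.add(dispatchAll); }

    void onStart() { inFlight.incrementAndGet(); }

    void onComplete() {
        if (inFlight.decrementAndGet() == 0) {
            dispatchers.forEach(Runnable::run); // the nextTick() moment
        }
    }

    // Wrap the synchronous part of an async block between onStart()/onComplete().
    <T> T track(Supplier<T> block) {
        onStart();
        try {
            return block.get();
        } finally {
            onComplete();
        }
    }

    void trackRun(Runnable block) {
        track(() -> { block.run(); return null; });
    }
}

public class DispatchGateDemo {
    public static List<Integer> run() {
        List<Integer> batchSizes = new ArrayList<>();
        Map<Integer, CompletableFuture<String>> queue = new LinkedHashMap<>();
        DispatchGate gate = new DispatchGate();
        gate.register(() -> {                       // a trivial loader's dispatchAll
            if (queue.isEmpty()) return;
            Map<Integer, CompletableFuture<String>> batch = new LinkedHashMap<>(queue);
            queue.clear();
            batchSizes.add(batch.size());
            batch.forEach((k, f) -> f.complete("value-" + k));
        });
        List<CompletableFuture<String>> results = new ArrayList<>();
        // The engine would wrap the whole level in one tracked block, and each
        // field fetcher's synchronous section in its own nested tracked block.
        gate.trackRun(() -> {
            results.add(gate.track(() -> queue.computeIfAbsent(1, k -> new CompletableFuture<>())));
            results.add(gate.track(() -> queue.computeIfAbsent(2, k -> new CompletableFuture<>())));
        });
        // When the outer block finished, the counter hit zero and both keys
        // were dispatched as a single batch.
        return batchSizes;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [2]
    }
}
```

Note the subtlety this makes visible: the counter must track the synchronous portion of each block (as the Kotlin approach does with "running, not suspended" coroutines), not the completion of the loader futures themselves, or the gate would deadlock waiting on futures that only complete after dispatch.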
I have the same use case and problem as @helfer and others. I have a fetcher that needs data from another DataLoader, and I want to defer dispatch and batch all of those queries instead of resolving them one by one. Are there any plans for supporting this use case, or workarounds in Java?
@Togrias Do you have a fully working setup here? We use Kotlin coroutines as well and would love to try yours if it works.
@glasser the code/rough idea provided by @stevenheidel and me works. I only have piecemeal solutions that override the execution portion of the graphql-java library, while keeping the code as similar to the graphql-java implementation as possible for easy maintenance (I'm not good with Java). If you need help on specific areas, I can post some code here. While it does work, it messes with CoroutineDispatcher and is thus very fragile; using it can conflict with other libraries that interact with CoroutineDispatcher. It seems to me that the most idiomatic Kotlin way would be to use kotlinx.coroutines.yield(), but that would probably require a ground-up rewrite of the graphql-java library to use coroutines.
@Togrias Are you able to share the code you've written? Even just in a gist or something? Including KDataLoader, the changed executor files, etc?
@Togrias and just to make sure I understand: your step 3 above (where you actually rewrite big chunks of the graphql-java library) is actually necessary? Like, graphql-java internally does enough async stuff that if you don't do that, the queue will get emptied way more often than you want and not much batching will happen?
(How awful would it be to implement nextTick as just "run in 1 ms"? So every dataloader use would say "if there isn't already a queued-and-not-started |
OK, expanding on my last comment, here's a new approach I'm trying. Note that I'm using Kotlin coroutines including Flows here, though the basic concept could work without them — mostly just using Flows to get access to debounce. First, note that the way we use suspend functions in field fetchers in the first place involves a wrapper like this:

```kotlin
typeBuilder.dataFetcher(name) { env ->
    env.getContext().coroutineScope.future {
        env.fetcher() // run the actual fetcher defined in our custom DSL
    }
}
```

Wrap the DataLoaderRegistry in a (per-request) DataLoaders object. This both provides some easy type-safe wrappers for some of my favorite DataLoaders, and provides this:

```kotlin
class DataLoaders(
    private val registry: DataLoaderRegistry,
    private val coroutineScope: CoroutineScope,
    private val data: SchemaDataSources, // This gives the high-level loaders access to our DB etc
    private val dispatch: suspend () -> Unit
) {
    private enum class LoaderName {
        SAMPLE_LOADER_A // etc
    }

    // Loads an element via the named DataLoader, suspending until it is ready. Ensures that
    // registry.dispatchAll() is called soon. All DataLoader loads must go through this function!
    suspend fun <K, V> load(dataLoaderName: String, k: K): V =
        registry.getDataLoader<K, V>(dataLoaderName).load(k).also {
            // Make the channelFlow in graphqlHandlers.kt emit an element into the debouncing logic.
            dispatch.invoke()
        }.await()

    suspend fun sampleLoaderA(id: String): Foo? = load(LoaderName.SAMPLE_LOADER_A.name, id)

    init {
        registry.register(LoaderName.SAMPLE_LOADER_A.name, DataLoader.newMappedDataLoader<String, Foo> { ids ->
            coroutineScope.future {
                data.getFooByIDs(ids)
            }
        })
    }

    // etc for more loaders
}
```

Stop using the built-in DataLoaderRegistry instrumentation: when building the ExecutionInput, we don't pass the registry to GraphQL-Java (see the comment in the code below). Our HTTP handler (we use Ktor) looks essentially like:

```kotlin
// Start a new scope, which we provide to GraphQL execution to use to start `future`
// coroutines. This handler will wait until all such coroutines are finished before
// finishing, and if the handler is canceled, all the coroutines will as well.
// However, we do use a supervisorScope instead of a coroutineScope, so that failure
// of launched coroutines (eg, an error thrown by a suspendField resolver) does not
// lead to immediate cancellation of the scope (ie, of the whole operation).
// We don't leak these failures, because they all end up in CompletableFutures
// that are consumed inside the GraphQL-Java machinery.
supervisorScope {
    val scope = this
    // note: nonBlockingInputStream is something we wrote too
    val params = call.nonBlockingInputStream(maxContentLength).use { stream ->
        try {
            OurObjectMapper.readValue<GraphQLQueryParams>(stream)
        } catch (t: JsonParseException) {
            throw BadRequestException(t)
        } catch (t: JsonMappingException) {
            throw BadRequestException(t)
        }
    }
    // If you give a DataLoaderRegistry to GraphQL-Java (in the ExecutionInput below), it has
    // some fancy logic to call dispatchAll() on it once per "depth" level of the GraphQL resolver tree.
    // This is very clever! But it has the serious drawback that you can only use DataLoaders synchronously
    // at the beginning of a resolver; you can't use it twice in a resolver, or after running some other
    // suspend function! If you mess this up, you just hang forever. At one point we had a wrapper around
    // getting a DataLoader which threw if you had suspended before, which at least made the error more
    // obvious, but it's still an annoying restriction. More information on that restriction, plus some
    // sketches of ideas of how to fix this for Kotlin coroutines specifically involving a fancy
    // custom CoroutineDispatcher, is at https://github.com/graphql-java/graphql-java/issues/1198
    //
    // We take a different approach. We assume that most DataLoaders are doing some sort of network
    // call, probably to a database, such that slowing down the call by 1ms would not be noticeable.
    // We execute the GraphQL request in a "channel Flow", which uses `channel.send` to output values
    // to the Flow returned by channelFlow every time we start a DataLoader load (via DataLoaders.load).
    // We then debounce that Flow so that it only produces values when there hasn't been an incoming
    // value for 1ms.
    //
    // Every time we successfully read an element from the flow, we dispatch all outstanding DataLoaders.
    // If 1ms is too small and more load requests are still coming in, that's OK --- they will hopefully at
    // least come in while the first batch is being loaded, so maybe we'll end up with one more batch than
    // optimally necessary, but 2 is still likely less than N.
    val dataLoaderRegistry = DataLoaderRegistry()
    channelFlow<Unit> {
        val dataLoaders = DataLoaders(
            registry = dataLoaderRegistry,
            coroutineScope = scope,
            data = ourDataSourceObject,
            dispatch = { channel.send(Unit) }
        )
        val executionResult = try {
            graphql.executeAsync(
                // Note: we do *NOT* pass dataLoaderRegistry directly to ExecutionInput here;
                // we use our own dispatch logic above instead of integrating with GraphQL-Java's.
                ExecutionInput.newExecutionInput(params.query)
                    .operationName(params.operationName)
                    .context(GraphqlAPIContext(
                        dataLoaders = dataLoaders,
                        coroutineScope = scope
                    ))
                    .variables(params.variables)
                    .build()
            ).await() as ExecutionResultImpl
        } catch (t: Throwable) {
            // Convert any thrown error to a GraphQL error.
            // XXX It's possible this is wrong and the only errors that cause executeAsync().await() to throw
            // are internal enough that they shouldn't be reported to end users.
            ExecutionResultImpl(listOf(t as? GraphQLError ?: SimpleGraphQLError(t)))
        }
        call.respondText(
            OurObjectMapper.writeValueAsString(executionResult.toSpecification()),
            ContentType.Application.Json
        )
    }
        .debounce(1.milliseconds)
        .collect {
            willDispatchAll?.invoke() // a hook for unit testing this logic
            dataLoaderRegistry.dispatchAll()
        }
}
```

Basically, we run our execution inside a channelFlow. So far this appears to work, but it's definitely in the "code I just wrote" stage rather than "battle-tested"!
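The same debounce idea can be sketched in plain Java without coroutines or Flows, using a ScheduledExecutorService: every load (re)schedules the dispatch a few milliseconds out, so the dispatch only fires once loads stop arriving. All names below are hypothetical, and this is a sketch of the technique, not production code (a 50ms quiet window is used here just to keep the demo deterministic).

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical debouncer: each loadHappened() cancels any pending dispatch and
// reschedules it, so dispatchAll runs only after a quiet period with no loads.
class DebouncedDispatcher {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Runnable dispatchAll;
    private final long quietMillis;
    private ScheduledFuture<?> pending;

    DebouncedDispatcher(Runnable dispatchAll, long quietMillis) {
        this.dispatchAll = dispatchAll;
        this.quietMillis = quietMillis;
    }

    // Call whenever any DataLoader.load() happens.
    synchronized void loadHappened() {
        if (pending != null) pending.cancel(false);
        pending = scheduler.schedule(dispatchAll, quietMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() { scheduler.shutdown(); }
}

public class DebounceDemo {
    public static List<Integer> run() {
        Map<Integer, CompletableFuture<String>> queue = new ConcurrentHashMap<>();
        List<Integer> batchSizes = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        DebouncedDispatcher dispatcher = new DebouncedDispatcher(() -> {
            List<Integer> keys = new ArrayList<>(queue.keySet());
            batchSizes.add(keys.size());            // record how much was batched
            keys.forEach(k -> queue.remove(k).complete("value-" + k));
            done.countDown();
        }, 50);
        // Three loads arriving close together are debounced into one dispatch.
        for (int k = 1; k <= 3; k++) {
            queue.put(k, new CompletableFuture<>());
            dispatcher.loadHappened();
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        dispatcher.shutdown();
        return batchSizes;
    }

    public static void main(String[] args) {
        System.out.println("batches: " + run());
    }
}
```

The trade-off is the same one discussed above: the timer adds a small fixed latency to every batch, but it needs no knowledge of levels, chaining, or where in a resolver the load happens.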
@glasser Thank you for posting your approach. Since the tick reference implementation re-adapted by @Togrias doesn't rely on time, does that mean it is a superior approach (provided we don't consider the fragility of integrating with graphql-java)?
I'm not familiar with those implementations, but I assume they're relatively similar. The downside to @Togrias' approach, if I understand correctly, is that in order for it to be reliable you need to rewrite all the async code in graphql-java to use Kotlin coroutines; I'm not quite sure what happens if you don't. (I think that's what you're saying re fragility.)
Yes, that's what I was referring to when I said fragility. But taking that out of the picture, introducing an artificial delay felt to me "intuitively" a little less elegant than the original event-loop approach. Although don't get me wrong, I learned a few tricks reading your code (neat idea using debounce). In the meantime a colleague found this other issue, graphql/dataloader#58, which requests a delay-based dispatching option in the original dataloader library. People make good arguments on both sides, but without experimentation IMO it's hard to know ahead of time which approach will prove more efficient. It likely depends widely on the application use case.
The JS dataloader does support custom scheduling as of the recent 2.0 release, actually (graphql/dataloader#228).
I refactored @glasser's example above for use in our app (using graphql-kotlin), and it seems to be working nicely: https://gist.github.com/lennyburdette/f3fe6ae7a498698774cc95d1bfc956b4
I encounter the same use case in the Java world. We were able to mitigate this issue by adding some additional APIs to DataLoader. Our use case:

```java
CompletableFuture<List<Profile>> profileFuture = profileDataLoader.loadMany(input.getMemberIds());
CompletableFuture<List<GeoEntity>> geoFuture = profileFuture.thenCompose(profiles -> {
    List<GeoId> geoIds = getGeoIdsFromProfiles(profiles);
    CompletableFuture<List<GeoEntity>> future = geoDataLoader.loadMany(geoIds);
    // Without this line, execution will never finish.
    geoDataLoader.dispatch();
    return future;
});
```

What we ended up doing is providing some APIs in DataLoader:

```java
public CompletableFuture<V> load(CompletableFuture<K> keyFuture);
public CompletableFuture<List<V>> loadMany(CompletableFuture<List<K>> keysFuture);
```

Inside the DataLoader implementation, we can choose to either invoke the batch function immediately or queue the key future and dispatch later. So the client-side code now looks like this:

```java
CompletableFuture<List<Profile>> profileFuture = profileDataLoader.loadMany(input.getMemberUrns());
CompletableFuture<List<GeoUrn>> geoUrnFuture = profileFuture.thenApply(this::getGeoIdsFromProfiles);
CompletableFuture<List<GeoEntity>> geoFuture = geoDataLoader.loadMany(geoUrnFuture);
```

We also add lint rules to forbid the load APIs from being invoked asynchronously.
@linqi - can you please provide more code examples on how you dispatch internally? You said:

This is the bit I am unsure of. How did you decide to dispatch on the inside? Seeing some code would be unreal.
@bbakerman of course. Approach 1) - Using a wrapper for DataLoader to hide dispatch(). This is what we are doing right now:

```java
public class DataLoaderWrapper<K, V> {
  private DataLoader<K, V> _dataLoader;

  /**
   * Method to accept a CompletableFuture of a key and load the key from a data loader.
   *
   * @param keyFuture the given key future
   * @return a future of the loaded value
   */
  public CompletableFuture<V> load(CompletableFuture<K> keyFuture) {
    return keyFuture.thenCompose(key -> {
      CompletableFuture<V> resultFuture = _dataLoader.load(key);
      _dataLoader.dispatch();
      return resultFuture;
    });
  }

  /**
   * Method to accept a CompletableFuture of a list of keys, and load them from a data loader.
   *
   * @param keysFuture the given key list future
   * @return a future of the loaded values
   */
  public CompletableFuture<List<V>> loadMany(CompletableFuture<List<K>> keysFuture) {
    return keysFuture.thenCompose(keys -> {
      CompletableFuture<List<V>> resultFuture = _dataLoader.loadMany(keys);
      _dataLoader.dispatch();
      return resultFuture;
    });
  }
}
```
Approach 2) - Have DataLoaderHelper.java support key futures directly:

```java
CompletableFuture<V> load(CompletableFuture<K> keyFuture, Object loadContext) {
  synchronized (dataLoader) {
    ...
    CompletableFuture<V> future = new CompletableFuture<>();
    if (batchingEnabled) {
      loaderFutureQueue.add(new DataLoaderHelper.LoaderQueueEntry<>(keyFuture, future, loadContext));
    } else {
      stats.incrementBatchLoadCountBy(1);
      // immediate execution of batch function
      future = invokeLoaderImmediately(keyFuture, loadContext);
    }
    if (cachingEnabled) {
      futureCache.set(cacheKey, future);
    }
    return future;
  }
}

CompletableFuture<List<V>> dispatch() {
  ...
  final List<K> keys = new ArrayList<>();
  final List<CompletableFuture<K>> keyFutures = new ArrayList<>();
  synchronized (dataLoader) {
    loaderQueue.forEach(entry -> {
      keys.add(entry.getKey());
      queuedFutures.add(entry.getValue());
      callContexts.add(entry.getCallContext());
    });
    loaderQueue.clear();
    loaderFutureQueue.forEach(entry -> {
      keyFutures.add(entry.getKey());
      queuedFutures.add(entry.getValue());
      callContexts.add(entry.getCallContext());
    });
    loaderFutureQueue.clear();
  }
  ...
  // Loading result future of plain keys
  CompletableFuture<List<V>> keyLoadingResult = getDispatchResults(keys, queuedFutures, callContexts);
  // Loading result future of key futures
  CompletableFuture<List<V>> keyFutureLoadingResult =
      CompletableFuture.allOf(keyFutures.toArray(new CompletableFuture[0]))
          .thenApply(v -> keyFutures.stream()
              .map(CompletableFuture::join)
              .collect(Collectors.toList()))
          .thenCompose(fulfilledKeys -> getDispatchResults(fulfilledKeys, queuedFutures, callContexts));
  return keyLoadingResult.thenCombine(keyFutureLoadingResult, (result1, result2) -> {
    // merging the two results
  });
}
```
Hello, just wanted to mention that we found a way to solve this problem natively; more info ->
@samuelAndalon is your solution portable to graphql-java? (We do not use Kotlin, but are facing the same issue.)
@pfyod yes. We have a specific module for instrumentations and wrote some tests here. Ideally these instrumentations could be provided by graphql-java; I just haven't found the time to create a PR.
@samuelAndalon thank you, we made it work in our project! FYI, you have a mistake in the documentation (ExpediaGroup/graphql-kotlin#1557). For everyone else who's trying to call the graphql-kotlin library from Java code, this is the way to add Kotlin's state object into the GraphQL context from Java (the exact code depends on the framework you use, of course; the important part is wrapping the Java Class as a Kotlin KClass):

```java
input.getGraphQLContext().put(
    kotlin.jvm.JvmClassMappingKt.getKotlinClass(SyncExecutionExhaustedState.class),
    new SyncExecutionExhaustedState(1, kotlinDataLoaderRegistry)
);
```
Unsure if this is useful here, but while working at Carbon Health I amended @lennyburdette's gist above. @muratakbal was nice enough to give me permission to extract and publish it; I have done that here. The debounce code specifically is in this file, with tests for all the batching cases here. We used these in prod without seeing any issues, though at relatively small scale.
Hello, this issue has been inactive for 60 days, so we're marking it as stale. If you would like to continue this discussion, please comment within the next 30 days or we'll close the issue.
The data loader instrumentation only allows one data loader to resolve per field. Complex queries may require more than a single data loader to resolve.
Would it be better to change the dispatcher instrumentation to something like the following?
- when dataLoader.load(key) is called: if (++numberOfIdleFetches == numberOfExpectedFetches) dispatchAll();
- when a load is resolved: numberOfIdleFetches--
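A self-contained Java sketch of this proposed counter for the single-level case (all names hypothetical; a real instrumentation would also need thread-safety across resolver threads and per-level bookkeeping): each load() marks one expected fetch as idle, the dispatch fires once every expected fetch is idle, and resolving a load marks its fetch busy again so a later wave of loads can trigger another dispatch.

```java
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical dispatcher: the engine tells it how many field fetches to expect;
// once every expected fetch is idle (waiting on a load), everything queued is dispatched.
class CountingDispatcher {
    private final int expectedFetches;
    private final AtomicInteger idleFetches = new AtomicInteger();
    private final Map<Integer, CompletableFuture<String>> queue = new LinkedHashMap<>();
    final List<Integer> batchSizes = new ArrayList<>();

    CountingDispatcher(int expectedFetches) { this.expectedFetches = expectedFetches; }

    CompletableFuture<String> load(int key) {
        CompletableFuture<String> f = queue.computeIfAbsent(key, k -> new CompletableFuture<>());
        // when load(key) is called, this fetch is now idle
        if (idleFetches.incrementAndGet() == expectedFetches) {
            dispatchAll(); // every expected fetch is waiting on a load
        }
        // when the load is resolved, the fetch is busy again, so a later
        // wave of loads can trigger another dispatch
        return f.whenComplete((v, t) -> idleFetches.decrementAndGet());
    }

    private void dispatchAll() {
        Map<Integer, CompletableFuture<String>> batch = new LinkedHashMap<>(queue);
        queue.clear();
        batchSizes.add(batch.size());
        batch.forEach((k, f) -> f.complete("value-" + k));
    }
}

public class CountingDemo {
    public static List<Integer> run() {
        // the engine knows two fields are being fetched at this level
        CountingDispatcher loader = new CountingDispatcher(2);
        CompletableFuture<String> f1 = loader.load(1);
        CompletableFuture<String> f2 = loader.load(2);
        // the second load() made both fetches idle, so one batch of 2 was dispatched
        return loader.batchSizes;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [2]
    }
}
```

Note this sketch only covers the simple single-level case; as the discussion above shows, completions that synchronously issue further loads can make the counter cross the threshold at the wrong moments, which is exactly the ordering subtlety a real implementation would have to handle.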