Can't use dataloader twice within a single field #1198

Open
Togrias opened this issue Sep 1, 2018 · 37 comments

@Togrias

Togrias commented Sep 1, 2018

The data loader instrumentation only allows one data loader to resolve per field. Complex queries may require more than a single data loader to resolve.

Would it be better to change the dispatcher instrumentation to something like the following:

- when dataLoader.load(key) is called: if (numberOfIdleFetches++ == numberOfExpectedFetches) dispatchAll();
- when a load is resolved: numberOfIdleFetches--
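Concretely, the counting logic might look like this in plain Java (a rough sketch of the idea above, not graphql-java API; dispatchAll() here just stands in for DataLoaderRegistry.dispatchAll(), and the expected-fetch count is assumed to be known up front):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the counter-based dispatch proposal.
public class CountingDispatcher {
    private final AtomicInteger idleFetches = new AtomicInteger(0);
    private final int expectedFetches;
    private final Runnable dispatchAll;

    public CountingDispatcher(int expectedFetches, Runnable dispatchAll) {
        this.expectedFetches = expectedFetches;
        this.dispatchAll = dispatchAll;
    }

    // Called whenever dataLoader.load(key) is invoked.
    public void onLoadCalled() {
        // Once every expected fetch has queued a load, fire the batch.
        if (idleFetches.incrementAndGet() == expectedFetches) {
            dispatchAll.run();
        }
    }

    // Called when a load completes, freeing a slot for further loads.
    public void onLoadResolved() {
        idleFetches.decrementAndGet();
    }
}
```

The hard part, as the discussion below shows, is knowing `expectedFetches` ahead of time when loads are chained.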

@bbakerman
Member

Can you please give a more concrete example of the type of fetchers you are doing that require multiple data loaders?

Pseudo Java code is OK.

The data loader instrumentation code watches field fetches, not data loader calls. So offhand I would expect that, given a field fieldX whose data fetcher calls 2 data loaders, it would not care that 2 calls are made.

Or are you chaining them, where the result of data loader A feeds into a new data loader B?

@Togrias
Author

Togrias commented Sep 3, 2018

Hi,

Thanks for your reply. A common use-case is when the GraphQL schema does not match the database schema, so that a single object requires data from multiple tables.

One example is a many-to-many relationship: a "USER" reads many "BOOKS", and a "BOOK" has many "READERS". Right now, if we want to use dataloaders, the results of one dataloader must feed into another dataloader. Otherwise we must include the intermediary associative table "USERS_BOOKS" as a GraphQLObjectType, which may not be desirable.

Ideally we could do this (pardon my Kotlin):

val userBooks = userBooksLoader.load(x).await()
val books = userBooks.map { booksLoader.load(it.books).await() }

Another example is a relational database where one table represents a "main" object and other tables are subclasses of that table: a table "PERSON" has name and id, while tables like "USER_WITH_OCCUPATION" and "USER_WITH_HEALTHCARE_INFORMATION" store the rest of the information. Sometimes we don't want to expose the subclasses as fragments.

val user = usersLoader.load(x).await()
val userWithOccupation = userWithOccupationLoader.load(user.occupation).await()
val userWithAllTheInformation = UserWithAllTheInformation(user, userWithOccupation, ...)

And a third example is, as you mentioned, a dataloader that needs to chain from the results of another.

I think we just need to wrap the load function. In DataLoader:

suspend fun asyncLoad(id: K, callback: /* with onAwait() and onComplete() callbacks */) {
    // wrap the original CompletableFuture with a CompletableDeferred (if Kotlin)
    // or CompletableFuture (if Java) that has the aforementioned callbacks
}

class WrappedCompletable(original: CompletableDeferred, callbacks) {
    suspend fun await() {
        onAwait()
        return original.await().also { onCompleted() }
    }
}

Then pass the callbacks to the instrumentation to count the number of delayed fields.

A side benefit is that execution can be performed asynchronously instead of level by level.

@bbakerman
Member

Thanks for outlining more in detail.

One of the challenges we have today in Java is knowing when to dispatch the data loaders. Today we track fields and levels and dispatch once we have exhausted all fields.

However, with chained data loaders we will kick off the first dispatch but will not know when to dispatch again, since we can't see the chaining in place.

This is not a problem in JavaScript because they have a tick loop that fires when node.js has no more work to do. Hence they hooked data loader into it.

You could manually call dataLoaderRegistry.dispatch() inside chained calls, but then it would be too eager, I think.

@Togrias
Author

Togrias commented Sep 8, 2018

Thank you for your reply.

I'm using graphql-java on a Kotlin project. Turns out that it's not too difficult in Kotlin to implement a node.js-style nextTick() function to queue dispatches. Kotlin has its own async framework.

Then I just have to re-write AsyncExecutionStrategy and DataLoader to use Kotlin's async framework rather than the Java standard CompletableFuture framework. The resulting dataloader would function just as intuitively as the node.js version.

I guess my solution is more relevant for people who use Kotlin. Not sure how many of us are here.

@Togrias Togrias closed this as completed Sep 8, 2018
@bbakerman bbakerman reopened this Sep 11, 2018
@Togrias
Author

Togrias commented Oct 5, 2018

I saw that you reopened this issue and thought to chime in.

If the desired eventual outcome is to implement the node.js functionality, I would suggest a centralised dispatcher. It makes sense to make DataLoaderDispatchInstrumentation a first-class entity rather than an optional one.

  1. Deprecate using CompletableFuture static methods like CompletableFuture.supplyAsync() in DataFetchers.
  2. Move the above methods into DataFetchingEnvironment, e.g. DataFetchingEnvironment.supplyAsync().
  3. The new methods spawn an extension of CompletableFuture that holds a reference to the Instrumentation. Methods such as whenComplete() would also notify the centralised Instrumentation.
  4. The ExecutionStrategy would also have to use the centralised dispatcher to execute asynchronous operations.
  5. The DataLoaderDispatchInstrumentation instance is now able to monitor all asynchronous operations and implement process.nextTick().

I'm using a similar solution but with Kotlin's coroutine framework. I'm not very familiar with Java so please forgive me for mistakes.

@guy-klebanov

Hey all,

Is there any solution in graphql-java for chained DataLoaders?
Has anyone done it and can share (I don't want to reinvent the wheel)?

Thanks Guy.

@bbakerman
Member

You can chain data loaders BUT you can't do it with ULTIMATE efficiency.

graphql-java tracks fields and hence how many dataLoader.dispatch() calls it needs to make.

If it dispatches too much, it will not be as efficient as it could be; if it dispatches too little, threads will block on promises that will never finish.

You could do this:

DataFetcher df = env -> {
    CompletableFuture cf = firstDataLoader.load(env.getArgument("id"));
    return cf.thenCompose(resultFromFirst -> {
        CompletableFuture second = secondDataLoader.load(resultFromFirst.otherId());
        // important step next
        secondDataLoader.dispatch();
        return second;
    });
};

The graphql field tracking will cause the first data loader to dispatch(), while the second needs a manual call since graphql did not know a chained CF was in play.

This might mean you dispatch some other fields too early and hence less optimally.

It's not optimal but it is possible.

@helfer
Contributor

helfer commented Mar 17, 2019

I have exactly the same problem: I'm fetching a set of objects, and then need to check permissions for each of these objects via a separate call to another service. I can't do that efficiently with the current data loaders. As you said, I could hack something together that would kind of work, but it isn't nearly as efficient, adds quite a bit of complexity, and requires a deeper understanding of how queries are resolved in GraphQL and how graphql-java orchestrates the loaders.

I don't feel comfortable going down that route given that many different teams at my company will be contributing to the GraphQL schema, and I can't expect everyone to go through the trouble of wiring up data fetchers and dispatching them manually. Ideally, whether or not batching is done should only need to be known at the resolver (data fetcher) level: if batching is desired, implement a batched data fetcher; if batching is not desired, implement a normal data fetcher.

So, how hard would it be to implement something along the lines of Sangria's DeferredResolver, i.e. DeferredDataFetcher? I think that's a much better abstraction than having to add a second concept of loaders, having to construct them for every query, passing them through the context and then calling them in the resolver.

Deferred data fetchers could have a method called getBatch, whose signature could be just like get of a normal data fetcher, except that it would take a list of DataFetchingEnvironment. To provide extra convenience for the common use-case where only the source is needed (eg. fetching a list of related objects by their id), a function could be provided to extract the sources from the BatchDataFetchingEnvironment.

If the resolver is inherently batched, all the problems of figuring out when to dispatch and whether to dispatch manually go away, because inside the resolver everything is batched, so firing off a second batch async request is as simple as chaining the promise.

I'm not familiar with the internals of graphql-java's execution engine, but I assume that since it's already aware of levels there is a place in the code where the executor is aware of all the fields at a given level, and could batch together all calls to a given resolver (aka data fetcher) if the field has a DeferredDataFetcher.
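A minimal sketch of what such a batched fetcher abstraction could look like (hypothetical: DeferredDataFetcher and getBatch do not exist in graphql-java, and the environment is reduced to a plain source key for brevity):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class DeferredFetcherSketch {
    // Hypothetical interface: invoked once per level with every source for
    // this field, so the implementation can batch the backend call itself.
    // In a real design, K would be a list of DataFetchingEnvironments.
    interface DeferredDataFetcher<K, V> {
        CompletableFuture<List<V>> getBatch(List<K> sources);
    }

    // Demo fetcher that "loads" names for a whole batch of ids in one call.
    static final DeferredDataFetcher<Integer, String> nameFetcher =
        ids -> CompletableFuture.completedFuture(
            ids.stream().map(id -> "name-" + id).collect(Collectors.toList()));
}
```

With this shape, chaining a second batched request inside getBatch is just promise composition, so no dispatch bookkeeping is needed at the call site.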

@stevenheidel

Turns out that it's not too difficult in Kotlin to implement a node.js-style nextTick() function to queue dispatches. Then I just have to re-write AsyncExecutionStrategy and DataLoader to use Kotlin's async framework rather than the Java standard CompletableFuture framework. The resulting dataloader would function just as intuitively as the node.js version.

@Togrias - Did this end up working in Kotlin? And if so would you mind sharing how you did it?

@Togrias
Author

Togrias commented Dec 1, 2019

@stevenheidel
I have a node.js-ish implementation written in Kotlin. I assume you are familiar with the functionality of Kotlin coroutines, including the CoroutineDispatcher and CoroutineContext, as well as the original node.js reference implementation of DataLoaders (including process.nextTick()). Here are roughly my steps from memory:

General idea: Dataloaders delay execution until their dispatch() method is called. The GraphQL query execution strategy executes datafetchers concurrently as it traverses the query graph. Whenever it calls a dataloader.load() method, it suspends indefinitely (gets stuck). We want to call dispatch() only when all current coroutines are suspended.

  1. Create a new CoroutineDispatcher wrapping the CommonPool (the default coroutine dispatcher, or whatever dispatcher your app is using); I call it the QueueableDispatcher. It has an atomic integer counter and a mutable list of callbacks.
    1a) Override the QueueableDispatcher's "execute" method to do the following:
    i) increment the atomic counter, then;
    ii) call the "execute" method of the wrapped inner dispatcher (CommonPool), then;
    iii) decrement the atomic counter, and if it reaches zero, execute and remove the first callback on the list.
    1b) A nextTick() method takes a callback and adds it to the aforementioned mutable list. You can optionally return a Deferred from it, but it's not necessary for the implementation.

Explanation: The atomic counter tracks the number of suspendable coroutines that are currently being executed and not suspended. When it reaches zero, we know it's time to fire the DataLoader dispatch() function (i.e. nextTick()).

  2. Create a new KDataLoader class and KBatchLoader interface that use suspend functions rather than futures. For instance, where the Java BatchLoader is List -> Future<List>, we use suspend List -> List instead.
    2a) Now that we have the nextTick() method as above, we can implement the Facebook DataLoader reference implementation. You can access the QueueableDispatcher in suspending functions using either the CoroutineContext or the GraphQL Context.
    2b) Make sure that your KDataLoader's load(key) and dispatch() methods are suspending functions and have the QueueableDispatcher inside their CoroutineContext.

  3. Rewrite portions of the graphql.execution package: ExecutionStrategy, AsyncExecutionStrategy, AsyncSerialExecutionStrategy (the one graphql-java uses for mutations), and the Async class. For each method that produces a future, rewrite it so that the future is created with the kotlinx-coroutines future() API. Pass the QueueableDispatcher into the context. Also pay attention to .whenComplete and .thenXxx (thenAccept, etc.) calls; replace them all.

I use kotlin extension functions so they're easier to replace and the code reads similarly to the graphql.java implementation so I can maintain them more easily.

The goal is to force graphql.java to use the CoroutineDispatcher to create all its Futures.

I find that Dataloaders are quite difficult to test. But I'd be more worried that they get stuck indefinitely (overbatch) than that they fetch too quickly (underbatch). So far, the above coroutines-based approach hasn't caused me any problems with the former.
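For readers not using Kotlin, the counting idea above can be approximated with a plain Java Executor wrapper (a hypothetical sketch under the assumption that all of the engine's work is funneled through this executor; the names and the dispatch hook are mine, not graphql-java API, and the queue handling is simplified rather than fully race-free):

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Count tasks in flight; when the count drops to zero, run the next queued
// "tick" callback (which would call dataLoaderRegistry.dispatchAll()).
public class QueueableExecutor implements Executor {
    private final Executor delegate;
    private final AtomicInteger active = new AtomicInteger(0);
    private final Queue<Runnable> ticks = new ArrayDeque<>();

    public QueueableExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable task) {
        active.incrementAndGet();
        delegate.execute(() -> {
            try {
                task.run();
            } finally {
                // Last task out fires the next queued tick, if any.
                if (active.decrementAndGet() == 0) {
                    runNextTick();
                }
            }
        });
    }

    // nextTick: run the callback once all currently executing tasks are done.
    public void nextTick(Runnable callback) {
        synchronized (ticks) { ticks.add(callback); }
        if (active.get() == 0) {
            runNextTick();
        }
    }

    private void runNextTick() {
        Runnable tick;
        synchronized (ticks) { tick = ticks.poll(); }
        if (tick != null) tick.run();
    }
}
```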

@haizz

haizz commented Dec 4, 2019

It's not only using a dataloader twice that's impossible: you also can't use a single dataloader that depends on any async result:

        return CompletableFuture.supplyAsync {
            Thread.sleep(100L)
            123 // simulate API call which returns some key
        }.thenCompose { key ->
            dataLoader.load(key)
        }

Things only work if you call dataLoader.load() exactly when preparing the CompletableFuture returned from the DataFetcher. If dataLoader.load() is called after the DataFetcher is done, it will never be dispatched. Yes, I could move the async API call into a new dataloader, but that would greatly limit the batching ability with a schema like this:

type Query {
    field1: Type
    field2: Type
    # ....
    fieldN: Type     
}

In this case I would have N different dataloaders for one single type, and they wouldn't batch together. This is quite a significant limitation.

@haizz

haizz commented Dec 4, 2019

@bbakerman Ironically, while there's all this business going on with Futures and AsyncExecutionStrategy to make it work with a modern async concurrency system, we have to resort to good old sync code with threads when fetching cases get a little more complicated than the simplest ones.

@stevenheidel

stevenheidel commented Dec 5, 2019

@Togrias - thanks for the help so far, really appreciate the detailed explanation of your Kotlin implementation.

Here's what I've got so far with the dispatcher:

import kotlinx.coroutines.*
import java.util.*
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext

class QueueableDispatcher(
    private val wrappedDispatcher: CoroutineDispatcher
) : CoroutineDispatcher() {
    private val counter: AtomicInteger = AtomicInteger(0)
    private val queue: Queue<Runnable> = LinkedList()

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        counter.incrementAndGet()
        val wrappedBlock = Runnable {
            block.run()

            if (counter.decrementAndGet() == 0) {
                queue.poll()?.run()
            }
        }

        wrappedDispatcher.dispatch(context, wrappedBlock)
    }

    fun nextTick(block: suspend () -> Unit) {
        queue.add(Runnable { block() }) // *****
    }
}

Unfortunately now I'm stuck since it doesn't make sense to convert a suspend fun into a Runnable for use in the queue. If the nextTick function makes an async call as in the reference implementation it's not clear what to do next.

@Togrias
Author

Togrias commented Dec 5, 2019

Here's my implementation:

override fun nextTick(context: CoroutineContext, block: suspend () -> Unit): Job {
		val job: Job = addQueuedJob()
		return CoroutineScope(context + this).launch {
			job.join()
			block()
		}
	}

private fun addQueuedJob(): Job = lock.withLock {
		val job = Job()
		if (counter == 0) {
			job.complete()
		} else {
			queue.add(job)
		}
		job
	}

My queue is a LinkedList of Jobs.

@Togrias
Author

Togrias commented Dec 5, 2019

I don't know much Java, much less Java's Futures library, but I think one way to implement this in Java without a framework is to have onStart() and onComplete() methods that track the beginning and completion of each asynchronous block of code.

You can expose a series of wrapper classes:

public class WrappedSupplier<T> implements Supplier<T> {
    // The wrapped supplier whose execution we want to track.
    private final Supplier<T> delegate;

    public WrappedSupplier(Supplier<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T get() {
        onStart();  // increments the counter of running async blocks
        try {
            return delegate.get();
        } finally {
            onComplete();  // decrements it, possibly triggering a dispatch
        }
    }
}

Alternatively expose the onStart() and onComplete() methods in the DataFetchingEnvironment that users manually call whenever an async future is created (eg. supplyAsync / whenComplete).

The idea is to allow users to manually increment/decrement the counter whenever an asynchronous block of code begins/ends.

It looks quite painful though.

Edit: Or we can have a new CustomFutures framework that creates futures which automatically apply the above wrappers, e.g. we override supplyAsync/whenComplete/thenAccept directly.
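A minimal sketch of that idea with plain CompletableFuture (hypothetical: TrackedFutures is not a real API; the in-flight counter is what an instrumentation would watch to decide when to dispatch):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Every async block is counted on start and on completion, so an
// instrumentation could dispatch when the count reaches zero.
public class TrackedFutures {
    public static final AtomicInteger inFlight = new AtomicInteger(0);

    public static <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) {
        inFlight.incrementAndGet();  // onStart()
        return CompletableFuture.supplyAsync(supplier)
            .whenComplete((value, error) -> inFlight.decrementAndGet());  // onComplete()
    }
}
```

Callers would use TrackedFutures.supplyAsync in place of CompletableFuture.supplyAsync; the other combinators (thenAccept, thenCompose, ...) would need the same treatment to track every stage.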

@jhaals

jhaals commented Dec 11, 2019

I have the same use case and problem as @helfer and others.

I have a fetcher that needs data from another DataLoader, and I want to defer dispatch and batch all of those queries instead of resolving them one by one.

Are there any plans on supporting this use case or workarounds in Java?

@glasser
Contributor

glasser commented Apr 23, 2020

@Togrias Do you have a fully working setup here? We use Kotlin coroutines as well and would love to try yours if it works.

@Togrias
Author

Togrias commented Apr 30, 2020

@glasser the code/rough idea provided by @stevenheidel and me works. I only have piecemeal solutions that override the execution portion of the graphql-java library, while keeping the code as similar to the graphql-java implementation as possible for easy maintenance (I'm not good with Java).

If you need help on specific areas, I can post some code here.

While it does work, it messes with CoroutineDispatcher and is thus very fragile. Using it can conflict with another library that interacts with CoroutineDispatcher.

It seems to me that the most idiomatic kotlin way is to use kotlinx.coroutines.yield(). That would probably require a ground-up rewrite of the graphql-java library to use coroutines though.

@glasser
Contributor

glasser commented Apr 30, 2020

@Togrias Are you able to share the code you've written? Even just in a gist or something? Including KDataLoader, the changed executor files, etc?

@glasser
Contributor

glasser commented May 15, 2020

@Togrias and just to make sure I understand: your step 3 above (where you actually rewrite big chunks of the graphql-java library) is actually necessary? Like, graphql-java internally does enough async stuff that if you don't do that, the queue will get emptied way more often than you want and not much batching will happen?

@glasser
Contributor

glasser commented May 15, 2020

(How awful would it be to implement nextTick as just "run in 1 ms"? So every dataloader use would say "if there isn't already a queued-and-not-started dispatch, queue a dispatch to run in 1ms"? Given that most dataloaders are doing network calls, I suspect a 1ms delay would be bearable...)
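A rough Java sketch of this "queue a dispatch to run in 1 ms" idea (hypothetical names; dispatchAll here stands in for DataLoaderRegistry.dispatchAll(), and a real version would need to cope with loads arriving while a dispatch runs):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// On every load, schedule a dispatch 1 ms out unless one is already pending,
// so bursts of load calls coalesce into a single dispatch.
public class DelayedDispatcher {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean pending = new AtomicBoolean(false);
    private final Runnable dispatchAll;

    public DelayedDispatcher(Runnable dispatchAll) {
        this.dispatchAll = dispatchAll;
    }

    // Call this from every dataLoader.load(...) site.
    public void onLoad() {
        if (pending.compareAndSet(false, true)) {
            scheduler.schedule(() -> {
                pending.set(false);
                dispatchAll.run();
            }, 1, TimeUnit.MILLISECONDS);
        }
    }

    public void shutdown() {
        scheduler.shutdown();
    }
}
```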

@glasser
Contributor

glasser commented May 19, 2020

OK, expanding on my last comment, here's a new approach I'm trying. Note that I'm using Kotlin coroutines including Flows here, though the basic concept could work without them — mostly just using Flows to get access to debounce.

First, note that the way we use suspend functions in field fetchers in the first place involves putting a CoroutineScope on the context object accessible from DataFetchingEnvironment.getContext(), and wrapping all of our suspending fetchers like this:

typeBuilder.dataFetcher(name) { env ->
	env.getContext().coroutineScope.future {
		env.fetcher()  // run the actual fetcher defined in our custom DSL
	}
}

(The CoroutineScope in question is actually a supervisorScope so that errors thrown by fetchers don't cause the entire request to fail.)

Wrap the DataLoaderRegistry in a (per-request) DataLoaders object. This both provides some easy type-safe wrappers for some of my favorite DataLoaders, and provides this load function:

class DataLoaders(
	private val registry: DataLoaderRegistry,
	private val coroutineScope: CoroutineScope,
	private val data: SchemaDataSources,  // This gives the high-level loaders access to our DB etc
	private val dispatch: suspend () -> Unit
) {
	private enum class LoaderName {
		SAMPLE_LOADER_A  // etc
	}

	// Loads an element via the named DataLoader, suspending until it is ready. Ensures that registry.dispatchAll()
	// is called soon. All DataLoader loads must go through this function!
	suspend fun <K, V> load(dataLoaderName: String, k: K): V =
		registry.getDataLoader<K, V>(dataLoaderName).load(k).also {
			// Make the channelFlow in graphqlHandlers.kt emit an element into the debouncing logic.
			dispatch.invoke()
		}.await()

	suspend fun sampleLoaderA(id: String): Foo? = load(LoaderName.SAMPLE_LOADER_A.name, id)

	init {
		registry.register(LoaderName.SAMPLE_LOADER_A.name, DataLoader.newMappedDataLoader<String, Foo> { ids ->
			coroutineScope.future {
				data.getFooByIDs(ids)
			}
		})
	}

    // etc for more loaders

}

This DataLoaders object goes on a field of the context object accessible from DataFetchingEnvironment.getContext(). The important bit is load: this call looks up a DataLoader in the registry, calls load(k) on it (which registers the load request with the DataLoader infrastructure and returns a CompletableFuture), invokes the dispatch function passed to the DataLoaders constructor (which we'll see later), and then calls the kotlin-coroutines-jdk8 extension function await to wait for the data to load in a suspend-y way.

Stop using the built-in DataLoaderRegistry instrumentation: When building ExecutionInput, don't pass a DataLoaderRegistry to it, and even call doNotAddDefaultInstrumentations() when constructing a GraphQL (though admittedly that's redundant with not passing a DataLoaderRegistry in ExecutionInput).

Our HTTP handler (we use Ktor) looks essentially like:

// Start a new scope, which we provide to GraphQL execution to use to start `future`
// coroutines. This handler will wait until all such coroutines are finished before
// finishing, and if the handler is canceled, all the coroutines will as well.
// However, we do use a supervisorScope instead of a coroutineScope, so that failure
// of launched coroutines (eg, an error thrown by a suspendField resolver) does not
// lead to immediate cancellation of the scope (ie, of the whole operation).
// We don't leak these failures, because they all end up in CompletableFutures
// that are consumed inside the GraphQL-Java machinery.
supervisorScope {
	val scope = this

	// note: nonBlockingInputStream is something we wrote too
	val params = call.nonBlockingInputStream(maxContentLength).use { stream ->
		try {
			OurObjectMapper.readValue<GraphQLQueryParams>(stream)
		} catch (t: JsonParseException) {
			throw BadRequestException(t)
		} catch (t: JsonMappingException) {
			throw BadRequestException(t)
		}
	}

	// If you give a DataLoaderRegistry to GraphQL-Java (in the ExecutionInput below), it has
	// some fancy logic to call dispatchAll() on it once per "depth" level of the GraphQL resolver tree.
	// This is very clever! But it has the serious drawback that you can only use DataLoaders synchronously
	// at the beginning of a resolver; you can't use it twice in a resolver, or after running some other
	// suspend function! If you mess this up, you just hang forever. At one point we had a wrapper around
	// getting a DataLoader which threw if you had suspended before, which at least made the error more
	// obvious, but it's still an annoying restriction. More information on that restriction, plus some
	// sketches of ideas of how to fix this for Kotlin coroutines specifically involving a fancy
	// custom CoroutineDispatcher, is at https://github.com/graphql-java/graphql-java/issues/1198
	//
	// We take a different approach. We assume that most DataLoaders are doing some sort of network
	// call, probably to a database, such that slowing down the call by 1ms would not be noticeable.
	// We execute the GraphQL request in a "channel Flow", which uses `channel.send` to output values
	// to the Flow returned by channelFlow every time we start a DataLoader load (via DataLoaders.load).
	// We then debounce that Flow so that it only produces values when there hasn't been an incoming value for
	// 1ms.
	//
	// Every time we successfully read an element from the flow, we dispatch all outstanding DataLoaders. If
	// 1ms is too small and more load requests are still coming in, that's OK --- they will hopefully at least
	// come in while the first batch is being loaded, so maybe we'll end up with one more batch than optimally
	// necessary, but 2 is still likely less than N.
	val dataLoaderRegistry = DataLoaderRegistry()
	channelFlow<Unit> {
		val dataLoaders = DataLoaders(
			registry = dataLoaderRegistry,
			coroutineScope = scope,
			data = ourDataSourceObject,
			dispatch = { channel.send(Unit) }
		)

		val executionResult = try {
			graphql.executeAsync(
				// Note: we do *NOT* pass dataLoaderRegistry directly to ExecutionInput here;
				// we use our own dispatch logic above instead of integrating with GraphQL-Java's.
				ExecutionInput.newExecutionInput(params.query)
					.operationName(params.operationName)
					.context(GraphqlAPIContext(
						dataLoaders = dataLoaders,
						coroutineScope = scope
					))
					.variables(params.variables)
					.build()
			).await() as ExecutionResultImpl
		} catch (t: Throwable) {
			// Convert any thrown error to a GraphQL error.
			// XXX It's possible this is wrong and the only errors that cause executeAsync().await() to throw
			// are internal enough that they shouldn't be reported to end users.
			ExecutionResultImpl(listOf(t as? GraphQLError ?: SimpleGraphQLError(t)))
		}
		call.respondText(
			OurObjectMapper.writeValueAsString(executionResult.toSpecification()),
			ContentType.Application.Json
		)
	}
		.debounce(1.milliseconds)
		.collect {
			willDispatchAll?.invoke()  // a hook for unit testing this logic
			dataLoaderRegistry.dispatchAll()
		}
}

Basically, we run our execution inside a channelFlow, which runs a background coroutine that can send messages along a channel. The only message we send is Unit, the single legal value of the type Unit; the contents of the message isn't important. The channel is read at the bottom via a Flow which first debounces it (i.e. only delivers a message once we haven't seen any for 1ms, throwing away any "previous" messages, which is fine because we don't care about message contents anyway) and then calls dispatchAll each time.

So far this appears to work but it's definitely in "code I just wrote" stage rather than "battle-tested"!

@tinnou
Contributor

tinnou commented May 28, 2020

@glasser Thank you for posting your approach.
I'm wondering how functionally equivalent your approach is to the time-based dispatching offered by data loaders in some of the implementations, like .NET or Go.

Since the tick reference implementation re-adapted by @Togrias doesn't rely on time, does that mean it is a superior approach (provided we don't consider the integration fragility with graphql-java)?

@glasser
Contributor

glasser commented May 28, 2020

I'm not familiar with those implementations but I assume they're relatively similar.

The downside to @Togrias' approach, if I understand correctly, is that in order for it to be reliable you need to rewrite all the async code in graphql-java to use kotlin coroutines; I'm not quite sure what happens if you don't. (I think that's what you're saying re fragility.)

@tinnou
Contributor

tinnou commented May 28, 2020

Yes, that's what I was referring to when I said fragility, but taking that out of the picture, introducing an artificial delay felt to me "intuitively" a little less elegant than the original event-loop approach. Although don't get me wrong, I learned a few tricks reading your code (neat idea using debounce).

In the meantime a colleague found this other issue, graphql/dataloader#58, which requests an optional delay-based dispatching approach in the original data-loader library. People make good arguments on both sides, but without experimentation, IMO it's hard to know ahead of time which approach will prove more efficient. It likely depends widely on the application use case.

@glasser
Contributor

glasser commented May 28, 2020

The JS dataloader does support custom scheduling as of the recent 2.0 release, actually (graphql/dataloader#228)

@lennyburdette

I refactored @glasser 's example above for use in our app (using graphql-kotlin) ... seems to be working nicely: https://gist.github.com/lennyburdette/f3fe6ae7a498698774cc95d1bfc956b4

@linqi

linqi commented Sep 12, 2020

I encountered the same use case in the Java world. We were able to mitigate this issue by adding some additional APIs to DataLoader.java.

Our use case:

CompletableFuture<List<Profile>> profileFuture = profileDataLoader.loadMany(input.getMemberIds());
CompletableFuture<List<GeoEntity>> geoFuture = profileFuture.thenCompose(profiles -> {
  List<GeoId> geoIds = getGeoIdsFromProfiles(profiles);
  CompletableFuture<List<GeoEntity>> geoEntities = geoDataLoader.loadMany(geoIds);
  // Without this line, execution will never finish.
  geoDataLoader.dispatch();
  return geoEntities;
});

The line geoDataLoader.dispatch(); is error-prone for devs who are developing features but are not familiar with graphql-java.

What we end up doing is providing some APIs in DataLoader.java that allow loading with futures of keys.

public CompletableFuture<V> load(CompletableFuture<K> keyFuture);
public CompletableFuture<List<V>> loadMany(CompletableFuture<List<K>> keysFuture);

Inside DataLoader implementation, we can choose to either invoke dispatch(), or simply use the key futures to construct the loading results.

So the client side code now looks like this

CompletableFuture<List<Profile>> profileFuture = profileDataLoader.loadMany(input.getMemberUrns());
CompletableFuture<List<GeoUrn>> geoUrnFuture = profileFuture.thenApply(this::getGeoIdsFromProfiles);
CompletableFuture<List<GeoEntity>> geoFuture = geoDataLoader.loadMany(geoUrnFuture);

We also add lint rules to forbid the load APIs to be invoked asynchronously.

@bbakerman
Member

@linqi - can you please provide more code examples of how you dispatch.

You said

Inside DataLoader implementation, we can choose to either invoke dispatch(), or simply use the key futures to construct the loading results.

This is the bit I am unsure of. How did you decide to dispatch on the inside?

Seeing some code would be unreal

@linqi

linqi commented Sep 15, 2020

@bbakerman of course.

Approach 1) - Using a wrapper for DataLoader to hide dispatch(). This is what we are doing right now.

public class DataLoaderWrapper<K, V> {
  private DataLoader<K, V> _dataLoader;
  /**
   * Method to accept a CompletableFuture of a key and load the key from a data loader.
   *
   * @param keyFuture the given key future
   * @return
   */
  public CompletableFuture<V> load(CompletableFuture<K> keyFuture) {
    return keyFuture.thenCompose(key -> {
      CompletableFuture<V> resultFuture = _dataLoader.load(key);
      _dataLoader.dispatch();
      return resultFuture;
    });
  }

  /**
   * Method to accept a CompletableFuture of a list of keys, and load them from a data loader.
   *
   * @param keysFuture the given key list future
   * @return a future that completes with the loaded values
   */
  public CompletableFuture<List<V>> loadMany(CompletableFuture<List<K>> keysFuture) {
    return keysFuture.thenCompose(keys -> {
      CompletableFuture<List<V>> resultFuture = _dataLoader.loadMany(keys);
      _dataLoader.dispatch();
      return resultFuture;
    });
  }
}
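
As a self-contained illustration of Approach 1 (using a toy stand-in for DataLoader, since the real class lives in java-dataloader; ToyLoader and WrapperDemo are hypothetical names), wrapping load behind thenCompose plus an internal dispatch() lets callers chain one loader's result into another without ever seeing dispatch():

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

public class WrapperDemo {
    // Toy batching loader (hypothetical stand-in for org.dataloader.DataLoader).
    static class ToyLoader<K, V> {
        final Function<List<K>, List<V>> batchFn;
        final List<K> keys = new ArrayList<>();
        final List<CompletableFuture<V>> futures = new ArrayList<>();
        ToyLoader(Function<List<K>, List<V>> fn) { batchFn = fn; }
        CompletableFuture<V> load(K k) {
            CompletableFuture<V> f = new CompletableFuture<>();
            keys.add(k);
            futures.add(f);
            return f;
        }
        void dispatch() {
            List<V> vs = batchFn.apply(new ArrayList<>(keys));
            for (int i = 0; i < vs.size(); i++) futures.get(i).complete(vs.get(i));
            keys.clear();
            futures.clear();
        }
    }

    // The wrapper idea from the comment above: dispatch() is invoked
    // internally once the key future has resolved, so chained loads work
    // without any call-site dispatching.
    static <K, V> CompletableFuture<V> load(ToyLoader<K, V> loader, CompletableFuture<K> keyFuture) {
        return keyFuture.thenCompose(key -> {
            CompletableFuture<V> result = loader.load(key);
            loader.dispatch();
            return result;
        });
    }

    public static void main(String[] args) {
        ToyLoader<Integer, String> profiles = new ToyLoader<>(
                ks -> ks.stream().map(k -> "profile-" + k).collect(Collectors.toList()));
        ToyLoader<String, String> geos = new ToyLoader<>(
                ks -> ks.stream().map(k -> "geo-of-" + k).collect(Collectors.toList()));

        // Two chained loads, no explicit dispatch at the call site.
        CompletableFuture<String> profile = load(profiles, CompletableFuture.completedFuture(7));
        CompletableFuture<String> geo = load(geos, profile);
        System.out.println(geo.join());
    }
}
```

Note the trade-off the thread already mentions: because dispatch() fires on every resolved key future, batching can degrade to one-key batches when keys arrive one at a time.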

@linqi

linqi commented Sep 15, 2020

Approach 2) - Have DataLoaderHelper.java support CompletableFuture of keys.
Pros: Clients don't need to explicitly invoke dispatch().
Cons: This may come with suboptimal batching.

  CompletableFuture<V> load(CompletableFuture<K> keyFuture, Object loadContext) {
    synchronized (dataLoader) {
      ...
      CompletableFuture<V> future = new CompletableFuture<>();
      if (batchingEnabled) {
        loaderFutureQueue.add(new DataLoaderHelper.LoaderQueueEntry<>(keyFuture, future, loadContext));
      } else {
        stats.incrementBatchLoadCountBy(1);
        // immediate execution of batch function
        future = invokeLoaderImmediately(keyFuture, loadContext);
      }
      if (cachingEnabled) {
        futureCache.set(cacheKey, future);
      }
      return future;
    }
  }

  CompletableFuture<List<V>> dispatch() {
    ...
    final List<K> keys = new ArrayList<>();
    final List<CompletableFuture<K>> keyFutures = new ArrayList<>();
    synchronized (dataLoader) {
      loaderQueue.forEach(entry -> {
        keys.add(entry.getKey());
        queuedFutures.add(entry.getValue());
        callContexts.add(entry.getCallContext());
      });
      loaderQueue.clear();
      loaderFutureQueue.forEach(entry -> {
        keyFutures.add(entry.getKey());
        queuedFutures.add(entry.getValue());
        callContexts.add(entry.getCallContext());
      });
      loaderFutureQueue.clear();
    }
   ...
    // Loading result future of keys
    CompletableFuture<List<V>> keyLoadingResult = getDispatchResults(keys, queuedFutures, callContexts);
    // Loading result future of key futures
    CompletableFuture<List<V>> keyListFuture = CompletableFuture.allOf(keyFutures.toArray(new CompletableFuture[0]))
        .thenApply(v -> keyFutures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList()))
        .thenCompose(fulfilledKeys -> getDispatchResults(fulfilledKeys, queuedFutures, callContexts));

    return keyLoadingResult.thenCombine(keyListFuture, (result1, result2) -> {
      // merging the two results.
    });
  }
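
The CompletableFuture.allOf(...).thenApply(join) pattern in the dispatch() sketch above is worth calling out on its own, since the JDK has no built-in way to turn a List<CompletableFuture<K>> into a CompletableFuture<List<K>>. A standalone sketch (the class and method names here are ours, not from the thread):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class SequenceDemo {
    // allOf() yields a CompletableFuture<Void>, so the individual values are
    // re-collected with join(). join() cannot block here because allOf()
    // guarantees every future has already completed by then.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> keyFutures = List.of(
                CompletableFuture.completedFuture("geo:1"),
                CompletableFuture.completedFuture("geo:2"));
        System.out.println(sequence(keyFutures).join());
    }
}
```

Result order matches input order, which is what lets the dispatch code line the fulfilled keys back up with their queued futures.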

@samuelAndalon

samuelAndalon commented May 4, 2022

Hello, just wanted to mention that we found a way to solve this problem natively with CompletableFuture, by writing a custom, optimized Instrumentation that dispatches once the synchronous execution of a query is exhausted (matching the logic of the JavaScript DataLoader).

more info ->
ExpediaGroup/graphql-kotlin#1436

@pfyod

pfyod commented Sep 20, 2022

@samuelAndalon is your solution portable to graphql-java (we do not use kotlin, but facing the same issue)?

@pfyod

pfyod commented Sep 21, 2022

@samuelAndalon thank you, we made it work in our project! FYI, you have a mistake in the documentation (ExpediaGroup/graphql-kotlin#1557)

For everyone else who's trying to call the graphql-kotlin library from Java code, this is the way to add Kotlin's state object into the GraphQL context from Java (the exact code depends on the framework you use, of course; the important part is wrapping the class with getKotlinClass):

input.getGraphQLContext().put(
  kotlin.jvm.JvmClassMappingKt.getKotlinClass(SyncExecutionExhaustedState.class), 
  new SyncExecutionExhaustedState(1, kotlinDataLoaderRegistry)
);

@fluff-shark

Unsure if this is useful here, but while working at Carbon Health I amended @lennyburdette's debounce strategy for use in DGS, which is built on graphql-java.

@muratakbal was nice enough to give me permission to extract and publish it. I have done that here. The debounce code specifically is in this file, with tests for all the batching cases here.

We used these in prod without seeing any issues, though at relatively small scale.


github-actions bot commented May 5, 2024

Hello, this issue has been inactive for 60 days, so we're marking it as stale. If you would like to continue this discussion, please comment within the next 30 days or we'll close the issue.

@github-actions github-actions bot added the Stale label May 5, 2024