Implement task groups and returning results #84

Open
PROger4ever opened this issue Feb 6, 2023 · 3 comments
Comments

@PROger4ever

PROger4ever commented Feb 6, 2023

Sometimes we need to run many tasks, grouped into small sets (each smaller than the goroutine count). The efficient way is to run the groups concurrently rather than one by one (they are small), but the results should stay grouped per set. Similar logic is implemented in alito/pong.

One usage case is parsing search results:

s := stream.New().WithMaxGoroutines(10)
groups := make([]*conc.Group, 0, len(searchQueries))

for _, query := range searchQueries {
    g := s.group(query)
    groups = append(groups, g)

    for pageNumber := 1; pageNumber < 5; pageNumber++ {
        pageNumber := pageNumber
        query := query
        g.Go(func() (res interface{}, err error) {
            res, err = getPageResults(query, pageNumber)
            return
        })
    }

    g.onFinish(printGroupResults)
}

func printGroupResults(g *conc.Group, results []interface{}, err error) error {
    fmt.Printf("Results of search query %v: %v\n", g.GetId(), results)
    return nil
}


// or synchronous
for _, g := range groups {
    results := g.GetResults() // or g.Wait()
    _ = printGroupResults(g, results, nil)
}

Or how can I implement this logic in an easy way with the current functionality?

@camdencheek
Member

Have you seen ResultErrorPool? It looks like that will do a similar thing to the "synchronous" version at the bottom.
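
For a rough idea of what that "synchronous" version boils down to, here is a stdlib-only sketch (not conc's actual API): a buffered channel plays the role of the shared goroutine limit, and each query's pages are collected as one group before moving on. fetchPage is a hypothetical stand-in for getPageResults.

```go
package main

import (
	"fmt"
	"sync"
)

// fetchPage is a hypothetical stand-in for getPageResults.
func fetchPage(query string, page int) string {
	return fmt.Sprintf("%s/p%d", query, page)
}

func main() {
	queries := []string{"go", "rust"}
	sem := make(chan struct{}, 10) // shared goroutine limit across all groups

	for _, query := range queries {
		results := make([]string, 4)
		var wg sync.WaitGroup
		for page := 1; page <= 4; page++ {
			wg.Add(1)
			sem <- struct{}{} // acquire a slot
			go func(page int) {
				defer wg.Done()
				defer func() { <-sem }() // release the slot
				results[page-1] = fetchPage(query, page)
			}(page)
		}
		wg.Wait() // this query's results are now grouped together
		fmt.Println(query, results)
	}
}
```

ResultErrorPool wraps this bookkeeping (plus error collection) for you; the sketch only shows the shape of the pattern.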

Alternatively, a task submitted to a stream returns a closure that will be executed in the order submitted, so something like the following should work:

s := stream.New().WithMaxGoroutines(10)
for _, query := range searchQueries {
    for pageNumber := 1; pageNumber < 5; pageNumber++ {
        pageNumber := pageNumber
        query := query
        s.Go(func() stream.Callback {
            res, err := getPageResults(query, pageNumber)
            return func() { printGroupResults(query, pageNumber, res, err) }
        })
    }
}
s.Wait()

@bobheadxi
Member

bobheadxi commented Feb 16, 2023

I think the use case in the non-synchronous version is similar to what was available in lib/group's Limiter. We use a Limiter in sg to execute groups of work concurrently under a shared, global limit; abbreviated version:

	categoriesGroup = group.NewWithStreaming[error]()
	// checksLimiter is shared to limit all concurrent checks across categories.
	checksLimiter = group.NewBasicLimiter(r.Concurrency)

	// ...

	for _, category := range r.categories {
		category := category // capture loop variable for the goroutine
		// Run categories concurrently
		categoriesGroup.Go(func() error {
			// Run all checks for this category concurrently, with a shared
			// limit on how many checks can run together
			checksGroup := group.New().
				WithErrors().
				WithConcurrencyLimiter(checksLimiter)
			for _, check := range category.Checks {
				// run checks concurrently
				checksGroup.Go(func() error { ... })
			}

			return checksGroup.Wait()
		}, ...)
	}

This helps us show the user that all categories are running, while only r.Concurrency checks are running at the same time - it might be a bit of a funky way to do this, but it made sense to me when I wrote it 😅
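
The shape of that pattern can be sketched with just the standard library, where a buffered channel plays the role of the shared BasicLimiter (the category names and runCheck below are illustrative assumptions, not sg code):

```go
package main

import (
	"fmt"
	"sync"
)

// runCheck is a hypothetical stand-in for one check within a category.
func runCheck(category string, i int) error {
	return nil
}

func main() {
	categories := []string{"docker", "git", "asdf"}
	checksLimiter := make(chan struct{}, 2) // shared across ALL categories

	var categoriesGroup sync.WaitGroup
	for _, category := range categories {
		category := category
		categoriesGroup.Add(1)
		go func() { // every category starts immediately...
			defer categoriesGroup.Done()
			var checksGroup sync.WaitGroup
			for i := 0; i < 4; i++ {
				i := i
				checksGroup.Add(1)
				go func() {
					defer checksGroup.Done()
					checksLimiter <- struct{}{}        // ...but at most 2 checks
					defer func() { <-checksLimiter }() // run at any one time
					_ = runCheck(category, i)
				}()
			}
			checksGroup.Wait()
		}()
	}
	categoriesGroup.Wait()
	fmt.Println("all categories done")
}
```

All categories appear "in flight" to the user, while the limiter alone bounds how much real work runs concurrently.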

@bobheadxi
Member

bobheadxi commented Feb 16, 2023

We might be able to do something similar by allowing users to specify a Pool to use (kind of like the ForEachIn(*pool.Pool, ...) idea that was floated):

type PoolLike interface {
   unwrap() Pool
}

func (p *Pool) InPool(pool PoolLike) *Pool {
   p.limiter = pool.unwrap().limiter
   return p
}

In the current design, I think the above will "just work" for the most part, since nobody ever closes limiter, and deref() by default shares the limiter.

Usage might look like:

s := stream.New().WithMaxGoroutines(10)
for _, query := range searchQueries {
    query := query
    s.Go(func() stream.Callback {
        // all groups, combined, can use up to the 10 goroutines in s
        g := pool.NewWithResults[any]().WithErrors().InPool(s)
        for pageNumber := 1; pageNumber < 5; pageNumber++ {
            pageNumber := pageNumber
            g.Go(func() (res any, err error) {
                res, err = getPageResults(query, pageNumber)
                return
            })
        }
        groupResults, err := g.Wait()
        return func() {
            printGroupResults(...)
        }
    })
}
s.Wait()

The awkward bit in this example is that s.Go consumes a worker that could be used by g.Go, but maybe one could create a separate Pool that does nothing but act as a limiter.
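
That "separate Pool that does nothing but act as a limiter" could be approximated in the standard library as a small semaphore type that owns no goroutines of its own and is simply shared by otherwise independent sets of work (the Limiter type and the counting below are illustrative assumptions, not a conc API):

```go
package main

import (
	"fmt"
	"sync"
)

// Limiter is a minimal limiter-only "pool": it runs nothing itself,
// it only bounds how many callers execute at once.
type Limiter chan struct{}

func NewLimiter(n int) Limiter { return make(Limiter, n) }
func (l Limiter) Acquire()     { l <- struct{}{} }
func (l Limiter) Release()     { <-l }

func main() {
	lim := NewLimiter(10) // shared global limit

	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		total int
	)
	// several independent groups of work share the same limiter, so
	// orchestration goroutines never consume a worker slot themselves
	for group := 0; group < 3; group++ {
		for task := 0; task < 5; task++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				lim.Acquire()
				defer lim.Release()
				mu.Lock()
				total++ // stand-in for real work
				mu.Unlock()
			}()
		}
	}
	wg.Wait()
	fmt.Println("tasks completed:", total) // prints: tasks completed: 15
}
```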


No branches or pull requests

3 participants