FanOut/Until Discussion #86

Open · thrawn01 opened this issue Feb 6, 2023 · 12 comments

Comments

@thrawn01 commented Feb 6, 2023

I like what you have built here; it's almost exactly what we need! We at Mailgun have been using https://github.com/mailgun/holster/tree/master/syncutil for quite some time. However, there are two things missing from this library which I would like to open for discussion.

Until

conc.WaitGroup.Until() https://github.com/mailgun/holster/blob/master/syncutil/waitgroup.go#L56

Until() simplifies running and stopping many long-running goroutines. You can call Until() multiple times and then call Stop() to signal all of the looping goroutines to end.

package main

import (
	"sync"
	"time"

	"github.com/mailgun/holster/v4/syncutil"
)

func main() {
    // Golang std pattern
    {
        done := make(chan struct{})
        wg := sync.WaitGroup{}
        wg.Add(1)
        go func() {
            for {
                select {
                case <-time.After(time.Second):
                case <-done:
                    wg.Done()
                    return
                }
            }
        }()
        close(done)
        wg.Wait()
    }

    // Until
    {
        wg := syncutil.WaitGroup{}
        wg.Until(func(done chan struct{}) bool {
            select {
            case <-time.After(time.Second):
            case <-done:
                return false
            }
            return true
        })
        wg.Stop()
    }
}

In this example, we save state by not needing the obligatory done channel every time we start a new goroutine. It also saves a level of indentation, as we no longer need the for loop. Our typical use case is to have many goroutines running which all need to be shut down when the service ends. Avoiding the need for a done variable every time we use this pattern has been nice. Combined with the panic handling in conc, Until would be even more useful.
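For discussion's sake, here is a minimal sketch of how Until could be layered on top of conc.WaitGroup to get both the state reduction and conc's panic propagation. UntilGroup is hypothetical, not part of conc's API:

package untilx

import "github.com/sourcegraph/conc"

// UntilGroup is a hypothetical helper (not part of conc): it pairs a
// conc.WaitGroup with a shared done channel, mirroring syncutil's semantics.
type UntilGroup struct {
	wg   conc.WaitGroup
	done chan struct{}
}

func NewUntilGroup() *UntilGroup {
	return &UntilGroup{done: make(chan struct{})}
}

// Until loops f (with conc's panic propagation) until f returns false
// or Stop closes the shared done channel.
func (g *UntilGroup) Until(f func(done chan struct{}) bool) {
	g.wg.Go(func() {
		for f(g.done) {
		}
	})
}

// Stop signals every Until loop to exit, then waits for them all.
func (g *UntilGroup) Stop() {
	close(g.done)
	g.wg.Wait()
}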

FanOut

It appears that conc/pool provides almost everything our current implementation of FanOut does and more. However, we have a VERY common use case where we need to pass in a local variable to the closure to avoid race conditions.

{
    // Limits the max number of go routines running concurrently to 10
    // Loop will block when capacity is reached.
    fan := syncutil.NewFanOut(10)
    for i := 0; i < 100; i++ {
        // Should be updated to support generics
        fan.Run(func(o interface{}) error {
            num := o.(int)
            _, _ = http.Get(fmt.Sprintf("https://example.com/integers/%d", num))
            return nil
        }, i)
    }

    // Should be updated to return an err that supports
    // the new `Unwrap() []error` for multi errors in golang 1.20
    errs := fan.Wait()
    if len(errs) != 0 {
        panic(errs)
    }
}
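For illustration, here is a minimal generics-based sketch of what such an updated FanOut could look like, covering both comments in the example above. This is hypothetical, not holster's actual API:

package fanout

import (
	"errors"
	"sync"
)

// FanOut is a hypothetical generic fan-out with bounded concurrency,
// approximating syncutil.FanOut for discussion purposes.
type FanOut[T any] struct {
	sem  chan struct{} // bounds the number of concurrent goroutines
	wg   sync.WaitGroup
	mu   sync.Mutex
	errs []error
}

func New[T any](size int) *FanOut[T] {
	return &FanOut[T]{sem: make(chan struct{}, size)}
}

// Run blocks when capacity is reached, then handles v on a new goroutine.
func (f *FanOut[T]) Run(fn func(T) error, v T) {
	f.sem <- struct{}{}
	f.wg.Add(1)
	go func() {
		defer func() { <-f.sem; f.wg.Done() }()
		if err := fn(v); err != nil {
			f.mu.Lock()
			f.errs = append(f.errs, err)
			f.mu.Unlock()
		}
	}()
}

// Wait returns a single error that supports `Unwrap() []error` (Go 1.20)
// via errors.Join, or nil if no task failed.
func (f *FanOut[T]) Wait() error {
	f.wg.Wait()
	return errors.Join(f.errs...)
}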

This isn't a request to add these features so much as a wish to start a conversation around them and other common use cases which might not be covered here.

@thrawn01 changed the title from "FanOut Discussion" to "FanOut/Until Discussion" on Feb 6, 2023
@camdencheek (Member) commented

Hi @thrawn01! Thanks for reaching out.

Starting with Until, could this not be achieved with a context pool?

package main

import (
	"context"
	"sync"
	"time"

	"github.com/sourcegraph/conc/pool"
)

func main() {
	// Golang std pattern
	{
		done := make(chan struct{})
		wg := sync.WaitGroup{}
		wg.Add(1)
		go func() {
			for {
				select {
				case <-time.After(time.Second):
				case <-done:
					wg.Done()
					return
				}
			}
		}()
		close(done)
		wg.Wait()
	}

	// conc version
	{
		ctx, cancel := context.WithCancel(context.Background())
		p := pool.New().WithContext(ctx)
		p.Go(func(ctx context.Context) error {
			for {
				select {
				case <-time.After(time.Second):
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		})
		cancel()
		p.Wait()
	}
}

@camdencheek (Member) commented

> a VERY common use case where we need to pass in a local variable to the closure to avoid race conditions

Yes, this is a rather...annoying...problem when it comes to the ergonomics of the library and goroutines/for loops in general. Interestingly, there has been some discussion recently about changing these semantics. The classic way to solve this is by redefining the variable inside the loop:

package main

import "github.com/sourcegraph/conc"

func main() {
	var wg conc.WaitGroup
	for i := 0; i < 100; i++ {
		i := i // redefine the variable to avoid loop aliasing
		wg.Go(func() {
			println(i)
		})
	}
	wg.Wait()
}

Alternatively, if you have a pre-collected list of things you're iterating over, you can use the iter package.

package main

import "github.com/sourcegraph/conc/iter"

func main() {
	tasks := make([]int, 100)
	for i := 0; i < 100; i++ {
		tasks[i] = i
	}

	iter.ForEach(tasks, func(i *int) {
		println(*i)
	})
}

@thrawn01 (Author) commented Feb 8, 2023

This isn't so much a question of how it can be achieved, as there is always the standard implementation. The question is:

"Does Until make the code clearer and reduce state management enough to be included in conc or syncutil?"

Your proposed p := pool.New().WithContext(ctx) works fine, but doesn't improve upon or achieve anything over using a done channel. This pattern of usage requires both wg and done to exist, or in your example both p and ctx. If it is the common case that done or ctx is useful beyond just stopping the goroutines, then Until becomes useless as a state reducer.

I hope that makes sense. conc and syncutil as libraries are not solving a problem; they are instead quality-of-life improvements for developers. If Until doesn't improve the quality of life for a developer, then it isn't useful in either library.

@samix73 commented Feb 8, 2023

A Stop method would possibly be a good addition to ContextPool, so it would be possible to cancel the pool's context without needing to cancel the parent context or return an error:

func (p *ContextPool) Stop() error {
	p.cancel()
	return p.Wait()
}

Here is how it would look:

func main() {
	// Golang std pattern
	{
		done := make(chan struct{})
		wg := sync.WaitGroup{}
		wg.Add(1)
		go func() {
			for {
				select {
				case <-time.After(time.Second):
				case <-done:
					wg.Done()
					return
				}
			}
		}()
		close(done)
		wg.Wait()
	}

	// conc version
	{
		p := pool.New().WithContext(context.Background())
		p.Go(func(ctx context.Context) error {
			for {
				select {
				case <-time.After(time.Second):
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		})
		p.Stop()
	}
}

@thrawn01 (Author) commented Feb 8, 2023

I like the use of iter, which is quite ingenious. I expect a pre-collected list of things is a very common use case for FanOut(). As such, Iter() would be more appropriate and removes the need to pass in a local variable. ✨

I expect the exception to this would be reading from a channel to feed the FanOut(). I wonder if a channel version of iter is possible and would be more appropriate. If possible, I suspect it would likely remove any need for an implementation of FanOut() at all.

Thoughts?

Thinking out loud

I recall listening to Rob Pike lamenting the lack of generics in early versions of golang, and because of this, there was no standard library version of FanOut(). I wonder why there still is no standard version 🤔

@camdencheek (Member) commented

Ah, I see. Thanks for clarifying here!

I like the idea of Stop() as a more elegant way of handling this pattern. However, it's not super clear to me whether this toy example is actually a pattern that comes up. Could you provide a more realistic example of when someone might actually want to use Stop()?

In the provided example, we're calling Stop() immediately, which will cancel the spawned goroutines immediately. We probably never actually want to do that, since we are presumably spawning goroutines to do some work, and canceling immediately means no work will be done.

So what calls Stop() or triggers it to be called? It's probably some external, asynchronous signal right? Like...a Context? At that point, we're right back where we started by passing a context into the wait group.

I'm struggling to put words to why this pattern feels a little off, so bear with me 🙁

func doTheThing(ctx context.Context) {
	p := pool.New().WithContext(ctx)
	for i := 0; i < 10; i++ {
		p.Go(func(ctx context.Context) error {
			<-ctx.Done()
			return ctx.Err()
		})
	}

	// What happens here that makes us decide to call p.Stop()?
	// <-ctx.Done()? Why not just pass that context directly into
	// the pool?

	p.Stop()
}

@camdencheek (Member) commented

> I wonder if a channel version of iter is possible and would be more appropriate

Yes, it would definitely be possible, and not terribly difficult to implement either. It does provide an easy way to concurrently process streams of unknown length with iter, making it viable even for large result sets that we don't want to collect into memory ahead of time.

However...I'm pretty allergic to making channels part of a public API, so I'd like to make sure the design is thought through first.

For the sake of discussion, let's take the following stripped-down design:

func ForEachCh[T any](input chan T, f func(T)) {
	task := func() {
		for v := range input {
			f(v)
		}
	}

	var wg conc.WaitGroup
	for i := 0; i < runtime.GOMAXPROCS(0); i++ {
		wg.Go(task)
	}
	wg.Wait()
}

Some arguments I can think of against adding a channel-based API to the iter package:

  • A caller must always close the input channel, otherwise we face a deadlock. This must happen even in the face of panics, so close(input) must be deferred (see the caller sketch below).
  • input must be fed somehow, and it has to be from another goroutine because ForEachCh blocks the calling goroutine. The caller now has to manage that goroutine.
  • We cannot safely provide any form of cancellation because input must be consumed in its entirety, otherwise the feeding goroutine will block forever. We don't control the feeding goroutine, so we don't know if it will respect a context cancellation.
  • If we want to implement the Map equivalent of this pattern, we need both an input channel and an output channel, which adds even more complexity: now the caller also needs to always fully consume the output channel, needs to manage yet another goroutine, etc.
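To make the first two bullets concrete, here is what a careful caller of the stripped-down ForEachCh above has to get right (a sketch):

func processAll() {
	input := make(chan int)

	// ForEachCh blocks the calling goroutine, so input must be fed from a
	// separate goroutine, and the caller must always close it (even in the
	// face of panics), hence the deferred close.
	go func() {
		defer close(input)
		for i := 0; i < 100; i++ {
			input <- i
		}
	}()

	ForEachCh(input, func(v int) {
		println(v)
	})
}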

@thrawn01 (Author) commented

> So what calls Stop() or triggers it to be called? It's probably some external, asynchronous signal right? Like...a Context? At that point, we're right back where we started by passing a context into the wait group.
>
> I'm struggling to put words to why this pattern feels a little off, so bear with me 🙁

Sure! Our services own the entire problem domain they are trying to solve, which includes async work, cron, and data movement between packages & services. These often require long-running goroutines which span the lifetime of the service. This pattern is so common that we use a naming scheme: if a struct/package creates a long-running goroutine, we call it package.SpawnThing() instead of package.NewThing(). Spawn is a hint that you must also call Stop(). We noticed we were writing the same boilerplate code (done & wg) over and over so often that we developed syncutil.Until(). You will note that Until() also includes a for loop, as this was also very common.
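To illustrate the convention, here is a minimal sketch (the Poller name is made up for illustration; syncutil.WaitGroup is the real holster type):

package poller

import (
	"time"

	"github.com/mailgun/holster/v4/syncutil"
)

// Poller is illustrative only. The Spawn prefix hints that a long-running
// goroutine is started and that the caller must eventually call Stop().
type Poller struct {
	wg syncutil.WaitGroup
}

func SpawnPoller() *Poller {
	p := &Poller{}
	p.wg.Until(func(done chan struct{}) bool {
		select {
		case <-time.After(time.Second):
			// periodic work goes here
		case <-done:
			return false
		}
		return true
	})
	return p
}

// Stop signals the loop to exit and waits for it.
func (p *Poller) Stop() {
	p.wg.Stop()
}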

Kafka-Pixy is a public repo of ours (quite old) which has some examples of this pattern. While we no longer use this Factory interface, and no longer call everything an actor 😄, it can serve to illustrate how many structs implement this pattern and how common it can be in a service. https://github.com/mailgun/kafka-pixy/blob/master/offsetmgr/offsetmgr.go#L41

@thrawn01 (Author) commented Feb 13, 2023

> However...I'm pretty allergic to making channels part of a public API, so I'd like to make sure the design is thought through first.

I also have an allergy to using channels in a public API.

> Some arguments I can think of against adding a channel-based API to the iter package:

No amount of sugar or documentation can keep developers from misusing channels.

> We cannot safely provide any form of cancellation because input must be consumed in its entirety, otherwise the feeding goroutine will block forever. We don't control the feeding goroutine, so we don't know if it will respect a context cancellation.

Again, as much as I would want to, we can't keep devs from making silly mistakes. We should encourage good behavior where we can, but this is nearly impossible to do with channels. (One of the reasons I avoid them in my APIs.)

The best way to think about this is to ask: "Is the alternative to this sugar a magnitude more painful than the possible pitfalls provided by my API?" Which, I'll admit, is often a judgement call.

Here is a complete example of what we currently have today, which hopefully explains some of the common use cases.

package cmd

import (
	"log"
	"time"

	"github.com/mailgun/holster/v4/syncutil"
)

type Event struct {
	AccountID string
}

type EventProcessor struct {
	eventCh chan Event
	wg      syncutil.WaitGroup
	fan     *syncutil.FanOut
}

func (p *EventProcessor) SpawnEventProcessor() {
	p.eventCh = make(chan Event)

	// Since Cassandra performs better with lots of concurrency, we want
	// to handle each event with as much concurrency as possible and avoid
	// synchronous handling of events. Allow 30 concurrent handling of events.
	p.fan = syncutil.NewFanOut(30)

	// Run until we ask it to stop or until the channel is closed
	p.wg.Until(func(done chan struct{}) bool {
		select {
		case e, ok := <-p.eventCh:
			// This should be the common case
			if !ok {
				return false
			}
			p.fan.Run(func(obj interface{}) error {
				event := obj.(Event)
				if err := writeEvent(event); err != nil {
					log.Printf("err: %s\n", err)
					return nil
				}
				return nil
			}, e)
		case <-done:
			return false
		}
		return true
	})
}

// HandleEvent is called by our kafka consumer to pass the event to the processor
func (p *EventProcessor) HandleEvent(e Event) {
	p.eventCh <- e
}

func (p *EventProcessor) Stop() {
	close(p.eventCh)
	wait := make(chan struct{})
	go func() {
		p.fan.Wait()
		wait <- struct{}{}
	}()

	// It's possible that `writeEvent()` is stuck talking to an external system because something
	// catastrophic happened. If that is the case we don't want to wait around forever, as k8s will
	// get mad at us and force kill us, thus possibly not providing time to flush the data from other
	// processors in this service. So we wait for a reasonable amount of time, then force the work
	// group to close.
	select {
	case <-time.After(time.Minute):
		return
	case <-wait:
		p.wg.Stop()
	}
}

func writeEvent(e Event) error {
	// Talk to Cassandra and update a datapoint or something
	return nil
}

@thrawn01 (Author) commented Feb 13, 2023

Here is a standard golang implementation, which isn't horrible. Both implementations require the user to handle channels properly. I.e., if we called HandleEvent() after p.eventCh is closed, it will cause a panic, etc., so I could add an atomic p.hasClosed to solve for this (see the sketch after the code below), or I could just not call HandleEvent() after I've called EventProcessor.Stop(). Using channels is fraught with pitfalls that only experience can help remedy.

package cmd

import (
	"log"
	"sync"
	"time"
)

type Event struct {
	AccountID string
}

type EventProcessor struct {
	concurrentCh chan struct{}
	eventCh      chan Event
	doneCh       chan struct{}

	wg  sync.WaitGroup
	fan sync.WaitGroup
}

func (p *EventProcessor) SpawnEventProcessor() {
	p.eventCh = make(chan Event)
	p.doneCh = make(chan struct{})

	// Since Cassandra performs better with lots of concurrency, we want
	// to handle each event with as much concurrency as possible and avoid
	// synchronous handling of events. Allow 30 concurrent handling of events.
	p.concurrentCh = make(chan struct{}, 30)

	// Run until we ask it to stop or until the channel is closed
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		for {
			select {
			case e, ok := <-p.eventCh:
				if !ok {
					return
				}
				p.concurrentCh <- struct{}{}
				p.fan.Add(1)
				go func(event Event) {
					defer func() {
						<-p.concurrentCh
						p.fan.Done()
					}()
					if err := writeEvent(event); err != nil {
						log.Printf("err: %s\n", err)
					}
				}(e)
			case <-p.doneCh:
				return
			}
		}
	}()
}

// HandleEvent is called by our kafka consumer to pass the event to the processor
func (p *EventProcessor) HandleEvent(e Event) {
	p.eventCh <- e
}

func (p *EventProcessor) Stop() {
	close(p.eventCh)
	wait := make(chan struct{})
	go func() {
		p.fan.Wait()
		wait <- struct{}{}
	}()

	// It's possible that `writeEvent()` is stuck talking to an external system because something
	// catastrophic happened. If that is the case we don't want to wait around forever, as k8s will
	// get mad at us and force kill us, thus possibly not providing time to flush the data from other
	// processors in this service. So we wait for a reasonable amount of time, then force the work
	// group to close.
	select {
	case <-time.After(time.Minute):
		return
	case <-wait:
		close(p.doneCh)
		p.wg.Wait()
	}
}

func writeEvent(e Event) error {
	// Talk to Cassandra and update a datapoint or something
	return nil
}
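For completeness, a sketch of the atomic p.hasClosed guard mentioned above. Note it only narrows the race window: a HandleEvent racing with Stop() can still send on the just-closed channel, which is exactly the kind of pitfall being discussed.

// Additional field on EventProcessor:
//
//	hasClosed atomic.Bool // from sync/atomic (Go 1.19+)

func (p *EventProcessor) HandleEvent(e Event) {
	if p.hasClosed.Load() {
		return // processor stopped; drop (or log) late events
	}
	p.eventCh <- e
}

// And at the top of Stop(), before close(p.eventCh):
//
//	p.hasClosed.Store(true)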

RE: Is the alternative to this sugar a magnitude more painful than the possible pitfalls provided by my API?

If a dev uses the standard golang implementation over syncutil, they still have the same channel pitfalls to avoid, which indicates the simplification of the code is useful. (Especially as the code becomes more complex beyond this simple example.)

As an exercise I'll implement this with iter and see what I come up with later. I've run out of time for today 😭

@bobheadxi (Member) commented Feb 16, 2023

I think the use cases around (*ContextPool).Stop() (or more explicitly, CancelAndWait()?) generally seem to be long-lived goroutines (like the example above, I think?), where the primary use case is concurrency control and goroutine cleanup on service shutdown, and where you want to explicitly stop and wait for things in an order you control. @camdencheek made some comments on long-lived pools over at #89 (comment) (and I wrote some thoughts in #93), but I think the main concern here is that if you replace the above example's sync.WaitGroup usages with conc/pool, panics get swallowed until EventProcessor is explicitly stopped.

That aside, in the example above, you could manage p.fan easily with a ContextPool and your own cancellation context, but to control a second pool (p.wg) where you don't want to cancel both at the same time, you need another cancellation:

type EventProcessor struct {
	eventCh chan Event

	until *pool.ContextPool
	cancelUntil func()

	fan *pool.ContextPool
	cancelFan func()
}

func (p *EventProcessor) Stop() {
	close(p.eventCh)
	wait := make(chan struct{})
	go func() {
		p.cancelFan()
		_ = p.fan.Wait()
		wait <- struct{}{}
	}()

	select {
	case <-time.After(time.Minute):
		return
	case <-wait:
		p.cancelUntil()
		_ = p.until.Wait()
	}
}

In this case Stop / CancelAndWait would be nice sugar to avoid setting up cancellation contexts and calling cancel explicitly, but error handling becomes a bit hard to get right, because a panic could only get surfaced in each ContextPool's Wait() (see the sketch below).
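A small sketch of that panic concern, assuming conc/pool's documented behavior of catching panics in spawned goroutines and repropagating them from Wait():

func demo() {
	p := pool.New().WithContext(context.Background())
	p.Go(func(ctx context.Context) error {
		panic("boom") // caught by the pool, not surfaced yet
	})

	// ... the service keeps running, unaware of the panic ...
	time.Sleep(time.Second)

	// Only here, which for a service-lifetime pool may be much later
	// (e.g. at shutdown), is the panic repropagated.
	_ = p.Wait()
}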

@ThinkChaos commented

Maybe I'm missing something from your requirements for FanOut, but the example can be rewritten to use shadowing with normal pools:

for i := 0; i < 100; i++ {
    i := i // each iteration gets its own var
    pool.Go(func() { /* use i as normal */ })
}
