Don't forget about context for concurrent functions #24

Open
awalterschulze opened this issue Aug 15, 2017 · 17 comments

Comments

@awalterschulze
Owner

Some plugins might need to pass down the context to functions that execute concurrently.

@awalterschulze
Owner Author

For example, should deriveDo and derivePipeline have context aware versions?

derivePipeline(func(context, A) <-chan B, func(context, B) <-chan C) func(context, A) <-chan C

And how will we know that all routines are canceled?

@tomberek

Context seems to be a monad bundle of (State,Cancel,Timeouts) used by most Go libraries. It would be convenient to use that idiom with goderive.

@awalterschulze
Owner Author

awalterschulze commented May 18, 2020 via email

@tomberek

I have a few components that make gRPC calls, which are Context powered. The ability to cancel or to set timeouts via Context is convenient because the rest of the Go ecosystem propagates them through middleware nicely.

@awalterschulze
Owner Author

awalterschulze commented May 18, 2020 via email

@tomberek

The case I have is something like this:

// Note this receiver
func (c HasContext) deriveJoin(in <-chan (<-chan struct{})) <-chan struct{} {
    out := make(chan struct{})
    go func() {
        wait := sync.WaitGroup{}
        for c := range in {
            wait.Add(1)
            res := c
            go func() {
                // for r := range res {   ORIGINAL
            loop:
                for {
                    select { // NEW STUFF vvvvvv
                    case <-c.Done():
                        break loop
                    case r, ok := <-res:
                        if !ok {
                            break loop // res is closed and drained
                        }
                        out <- r
                    }
                }
                wait.Done()
            }()
        }
        wait.Wait()
        close(out)
    }()
    return out
}

@awalterschulze
Owner Author

This is a great start.

Okay, so we read struct{} from res and place it on out until the context is cancelled.
This happens for each chan struct{}, each of which is being read in its own goroutine.
So they all stop reading from their chan struct{} and stop writing struct{} to out.
Every goroutine calls wait.Done(), and wait.Wait() stops waiting.
The out channel is closed.

The reader of out then also needs to remember to select on ctx.Done(); otherwise it will keep reading the channel until the remaining struct{}s have been read, but that sounds fair.

Very interesting.

  1. goderive currently doesn't support methods.
    I am not against the idea of adding methods, but it would be a huge amount of work, even for me as the reviewer.
    It would be very easy to rework this into a function though:
func deriveJoin(c context.Context, in <-chan (<-chan struct{})) <-chan struct{} {

Then the user can simply call this function from the method.

  2. One point of confusion while I was reading this:
    c is a HasContext, but c is also the chan struct{} read from in.

  3. Is deriveJoin the only place you would want to see this?

  4. On a more theoretical note: I wonder if context.Context really is a monad bundle. My instinct says you are correct, but what would fmap and join look like? My original intent was also to add context like you are describing here, but I never thought of it as a monad, and now I want to be sure, because maybe that will change the design. Could you explain your thinking?

@tomberek

1,2. Fair point. I added that receiver/method just to get the context from somewhere, but I like your suggestion better, because the context is really describing the bind. I find myself using Curry/Uncurry with your style:

derivePipeline( deriveCurry( deriveJoin )(ctx), somethingelse)
  3. My intent is to build processing pipelines. This may be closer to the concept of Arrows/FRP/dataflow based programming.

  4. We'd have to do something like associate a Context with every chan for it to be a proper Monad.

type ChanWithContext struct {
    context.Context
    Chan <-chan struct{}
}

But that looks odd, and I'm not sure about the resulting syntax. Is it too far from idiomatic Go, or is it worth trying? (The norm seems to be to make the user pass in the context explicitly: https://levelup.gitconnected.com/how-to-use-context-to-manage-your-goroutines-like-a-boss-ef1e478919e6 )

I will experiment with the: func deriveJoin(ctx, ... style

@tomberek

Here is an example of how I am currently addressing this. It can cancel the processors, but I'm not sure whether the go funcs in deriveJoin become orphaned.

type DbService interface {
    ProcIn(context.Context) func(l ChanDetectionList) <-chan error
    ProcOut(context.Context) func(table string) <-chan ChanMessage
}

@awalterschulze
Owner Author

Unfortunately deriveCurry( deriveJoin ) won't work :(
goderive requires input parameters to derive a function
#10

@awalterschulze
Owner Author

  3. Do you maybe have a small example, as code?

@awalterschulze
Owner Author

  4. Hmmm, I think this is a move in the right direction.
    Not in terms of syntax, but we can get back to the parameter version, which I agree would be more idiomatic, once we are sure about the theory.

We need to define the generic parameter, so I guess it would be:

type ChanWithContext[a any] struct {
    context.Context
    Chan <-chan a
}

where a is the generic parameter.

Then deriveFmap would need to make sense and deriveJoin would need to make sense.

deriveJoin would then theoretically operate on ChanWithContext[ChanWithContext[a]], which expands to

struct {
    context.Context
    Chan <-chan ChanWithContext[a]
}

and I don't know what that would mean with two contexts.
But possibly it would be fine, I would just need to see it.

@tomberek

There are two types of "Context" that may make sense to track:

  1. a Context for the entire pipeline, which may include things like DB connections and gRPC connections. This can be a long-running Context that persists and allows resource cleanup.
  2. a Context for an individual request being threaded through a pipeline. This can include short timeouts. Fan-out would duplicate this context.

My approach with the closure can better accommodate (1) while the approach with the ChanWithContext is better for handling (2).

Example pipelines:

    convertChannelName := func(channel timelines.ChanMessage) <-chan timelines.ChanMessage {
        ch := make(chan timelines.ChanMessage)
        go func() {
            channel.Channel = "gst"
            ch <- channel
            close(ch)
        }()
        return ch
    }

    ctx, cancel := context.WithCancel(context.Background())
    // Pipeline G
    g := derivePipelineInjest(k.Proc(ctx), db.ProcIn(ctx))

    // Pipeline F
    f1 := derivePipelineF1(db.ProcOut(ctx), convertChannelName)
    f2 := derivePipelineF2(f1, s.Proc(ctx))

For contrast: a whole different approach is here (https://blog.gopheracademy.com/advent-2015/composable-pipelines-improvements/).

@awalterschulze
Owner Author

Sorry for the late reply, I have been on holiday and then sick.

derivePipeline is probably what will be used in the end, and the problem is easier to reason about in terms of derivePipeline.
But I think it is worth doing the experiment of writing it as deriveFmap and deriveJoin first.
This just checks that the theory is sound.
After that, creating the derivePipeline version as an optimized version is fine.

But alternatively, you can be unblocked by creating your own goderive binary, which includes your extension.
https://github.com/awalterschulze/goderive#customization
I just want to be as sure as I can that pipeline is a monad, before I merge it in, and using fmap and join helps me think.

@tomberek

I'm still using this approach:

func(context.Context) func(A) <-chan B

It doesn't require any changes to goderive, and I use it like this:

derivePipeline(f(ctx), g(ctx))

@awalterschulze
Owner Author

Oh okay, sorry I totally misunderstood.
I am clearly still not fully recovered.

@awalterschulze
Owner Author

@tomberek it seems you are a heavy user of goderive. Do you have a project that you want to link here: https://github.com/awalterschulze/goderive#users ?
