Skip to content

Commit

Permalink
Merge pull request #36 from ccremer/package
Browse files Browse the repository at this point in the history
Restructure packages
  • Loading branch information
ccremer committed Mar 22, 2022
2 parents 2b1d53b + d6b1fc7 commit c06da67
Show file tree
Hide file tree
Showing 21 changed files with 288 additions and 294 deletions.
19 changes: 9 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ Small Go utility that executes business actions in a pipeline.
import (
"context"
pipeline "github.com/ccremer/go-command-pipeline"
"github.com/ccremer/go-command-pipeline/predicate"
)

type Data struct {
Expand All @@ -24,7 +23,7 @@ func main() {
data := &Data // define arbitrary data to pass around in the steps.
p := pipeline.NewPipeline()
p.WithSteps(
pipeline.NewStep("define random number", defineNumber),
pipeline.NewStepFromFunc("define random number", defineNumber),
pipeline.NewStepFromFunc("print number", printNumber),
)
result := p.RunWithContext(context.WithValue(context.Background, "data", data))
Expand All @@ -33,9 +32,9 @@ func main() {
}
}

func defineNumber(ctx context.Context) pipeline.Result {
func defineNumber(ctx context.Context) error {
ctx.Value("data").(*Data).Number = 10
return pipeline.Result{}
return nil
}

// Let's assume this is a business function that can fail.
Expand Down Expand Up @@ -76,18 +75,18 @@ It could be simplified to something like this:
```go
func Persist(data *Data) error {
p := pipeline.NewPipeline().WithSteps(
pipeline.NewStep("prepareTransaction", prepareTransaction()),
pipeline.NewStep("executeQuery", executeQuery()),
pipeline.NewStep("commitTransaction", commit()),
pipeline.NewStepFromFunc("prepareTransaction", prepareTransaction()),
pipeline.NewStepFromFunc("executeQuery", executeQuery()),
pipeline.NewStepFromFunc("commitTransaction", commit()),
)
return p.RunWithContext(context.WithValue(context.Background(), myKey, data).Err
}

func executeQuery() pipeline.ActionFunc {
return func(ctx context.Context) pipeline.Result {
func executeQuery() error {
return func(ctx context.Context) error {
data := ctx.Value(myKey).(*Data)
err := database.executeQuery("SOME QUERY", data)
return pipeline.Result{Err: err}
return err
)
}
...
Expand Down
3 changes: 2 additions & 1 deletion examples/abort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ func TestExample_Abort(t *testing.T) {
pipeline.NewStepFromFunc("never executed", doNotExecute),
)
result := p.Run()
assert.True(t, result.IsSuccessful())
assert.True(t, result.IsCompleted())
assert.True(t, result.IsAborted())
assert.False(t, result.IsSuccessful())
}

func doNotExecute(_ context.Context) error {
Expand Down
6 changes: 3 additions & 3 deletions examples/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestExample_Context(t *testing.T) {
// Create pipeline with defaults
p := pipeline.NewPipeline()
p.WithSteps(
pipeline.NewStep("define random number", defineNumber),
pipeline.NewStepFromFunc("define random number", defineNumber),
pipeline.NewStepFromFunc("print number", printNumber),
)
result := p.RunWithContext(context.WithValue(context.Background(), key, &Data{}))
Expand All @@ -31,9 +31,9 @@ func TestExample_Context(t *testing.T) {
}
}

func defineNumber(ctx context.Context) pipeline.Result {
func defineNumber(ctx context.Context) error {
ctx.Value(key).(*Data).Number = rand.Int()
return pipeline.Result{}
return nil
}

func printNumber(ctx context.Context) error {
Expand Down
35 changes: 17 additions & 18 deletions examples/git_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@ import (
"testing"

pipeline "github.com/ccremer/go-command-pipeline"
"github.com/ccremer/go-command-pipeline/predicate"
)

func TestExample_Git(t *testing.T) {
p := pipeline.NewPipeline()
p.WithSteps(
predicate.ToStep("clone repository", CloneGitRepository(), predicate.Not(DirExists("my-repo"))),
pipeline.NewStep("checkout branch", CheckoutBranch()),
pipeline.NewStep("pull", Pull()).WithResultHandler(logSuccess),
CloneGitRepository(),
CheckoutBranch(),
Pull().WithResultHandler(logSuccess),
)
result := p.Run()
if !result.IsSuccessful() {
Expand All @@ -28,29 +27,29 @@ func TestExample_Git(t *testing.T) {
}

func logSuccess(_ context.Context, result pipeline.Result) error {
log.Println("handler called")
log.Println("handler called", result.Name())
return result.Err()
}

func CloneGitRepository() pipeline.ActionFunc {
return func(_ context.Context) pipeline.Result {
func CloneGitRepository() pipeline.Step {
return pipeline.ToStep("clone repository", func(_ context.Context) error {
err := execGitCommand("clone", "git@github.com/ccremer/go-command-pipeline")
return pipeline.NewResultWithError("clone repository", err)
}
return err
}, pipeline.Not(DirExists("my-repo")))
}

func Pull() pipeline.ActionFunc {
return func(_ context.Context) pipeline.Result {
func Pull() pipeline.Step {
return pipeline.NewStepFromFunc("pull", func(_ context.Context) error {
err := execGitCommand("pull")
return pipeline.NewResultWithError("pull", err)
}
return err
})
}

func CheckoutBranch() pipeline.ActionFunc {
return func(_ context.Context) pipeline.Result {
func CheckoutBranch() pipeline.Step {
return pipeline.NewStepFromFunc("checkout branch", func(_ context.Context) error {
err := execGitCommand("checkout", "master")
return pipeline.NewResultWithError("checkout branch", err)
}
return err
})
}

func execGitCommand(args ...string) error {
Expand All @@ -61,7 +60,7 @@ func execGitCommand(args ...string) error {
return err
}

func DirExists(path string) predicate.Predicate {
func DirExists(path string) pipeline.Predicate {
return func(_ context.Context) bool {
if info, err := os.Stat(path); err != nil || !info.IsDir() {
return false
Expand Down
42 changes: 42 additions & 0 deletions fanout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package pipeline

import (
"context"
"sync"
)

/*
NewFanOutStep creates a pipeline step that runs nested pipelines in their own Go routines.
The function provided as Supplier is expected to close the given channel when no more pipelines should be executed, otherwise this step blocks forever.
The step waits until all pipelines are finished.
If the given ParallelResultHandler is non-nil it will be called after all pipelines were run, otherwise the step is considered successful.
If the context is canceled, no new pipelines will be retrieved from the channel and the Supplier is expected to stop supplying new instances.
Also, once canceled, the step waits for the remaining children pipelines and collects their result via given ParallelResultHandler.
However, the error returned from ParallelResultHandler is wrapped in context.Canceled.
*/
func NewFanOutStep(name string, pipelineSupplier Supplier, handler ParallelResultHandler) Step {
step := Step{Name: name}
step.F = func(ctx context.Context) Result {
pipelineChan := make(chan *Pipeline)
m := sync.Map{}
var wg sync.WaitGroup
i := uint64(0)

go pipelineSupplier(ctx, pipelineChan)
for pipe := range pipelineChan {
p := pipe
wg.Add(1)
n := i
i++
go func() {
defer wg.Done()
m.Store(n, p.RunWithContext(ctx))
}()
}
wg.Wait()
res := collectResults(ctx, handler, &m)
return setResultErrorFromContext(ctx, name, res)
}
return step
}
37 changes: 18 additions & 19 deletions parallel/fanout_test.go → fanout_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package parallel
package pipeline

import (
"context"
Expand All @@ -7,7 +7,6 @@ import (
"testing"
"time"

pipeline "github.com/ccremer/go-command-pipeline"
"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
)
Expand All @@ -16,7 +15,7 @@ func TestNewFanOutStep(t *testing.T) {
var counts uint64
tests := map[string]struct {
jobs int
givenResultHandler ResultHandler
givenResultHandler ParallelResultHandler
returnErr error
expectedCounts int
}{
Expand All @@ -31,9 +30,9 @@ func TestNewFanOutStep(t *testing.T) {
"GivenPipelineWith_WhenRunningStep_ThenReturnSuccessButRunErrorHandler": {
jobs: 1,
returnErr: fmt.Errorf("should be called"),
givenResultHandler: func(ctx context.Context, _ map[uint64]pipeline.Result) pipeline.Result {
givenResultHandler: func(ctx context.Context, _ map[uint64]Result) error {
atomic.AddUint64(&counts, 1)
return pipeline.Result{}
return nil
},
expectedCounts: 2,
},
Expand All @@ -44,15 +43,15 @@ func TestNewFanOutStep(t *testing.T) {
goleak.VerifyNone(t)
handler := tt.givenResultHandler
if handler == nil {
handler = func(ctx context.Context, results map[uint64]pipeline.Result) pipeline.Result {
handler = func(ctx context.Context, results map[uint64]Result) error {
assert.NoError(t, results[0].Err())
return pipeline.Result{}
return nil
}
}
step := NewFanOutStep("fanout", func(_ context.Context, funcs chan *pipeline.Pipeline) {
step := NewFanOutStep("fanout", func(_ context.Context, funcs chan *Pipeline) {
defer close(funcs)
for i := 0; i < tt.jobs; i++ {
funcs <- pipeline.NewPipeline().WithSteps(pipeline.NewStepFromFunc("step", func(_ context.Context) error {
funcs <- NewPipeline().WithSteps(NewStepFromFunc("step", func(_ context.Context) error {
atomic.AddUint64(&counts, 1)
return tt.returnErr
}))
Expand All @@ -68,36 +67,36 @@ func TestNewFanOutStep(t *testing.T) {
func TestNewFanOutStep_Cancel(t *testing.T) {
defer goleak.VerifyNone(t)
var counts uint64
step := NewFanOutStep("fanout", func(ctx context.Context, pipelines chan *pipeline.Pipeline) {
step := NewFanOutStep("fanout", func(ctx context.Context, pipelines chan *Pipeline) {
defer close(pipelines)
for i := 0; i < 10000; i++ {
select {
case <-ctx.Done():
return
default:
pipelines <- pipeline.NewPipeline().WithSteps(pipeline.NewStepFromFunc("increase", func(_ context.Context) error {
pipelines <- NewPipeline().WithSteps(NewStepFromFunc("increase", func(_ context.Context) error {
atomic.AddUint64(&counts, 1)
return nil
}))
time.Sleep(10 * time.Millisecond)
}
}
t.Fail() // should not reach this
}, func(ctx context.Context, results map[uint64]pipeline.Result) pipeline.Result {
}, func(ctx context.Context, results map[uint64]Result) error {
assert.Len(t, results, 3)
return pipeline.NewResultWithError("fanout", fmt.Errorf("some error"))
return fmt.Errorf("some error")
})
ctx, cancel := context.WithTimeout(context.Background(), 25*time.Millisecond)
defer cancel()
result := pipeline.NewPipeline().WithSteps(step).RunWithContext(ctx)
result := NewPipeline().WithSteps(step).RunWithContext(ctx)
assert.Equal(t, 3, int(counts))
assert.True(t, result.IsCanceled(), "canceled flag")
assert.EqualError(t, result.Err(), `step "fanout" failed: context deadline exceeded, collection error: some error`)
}

func ExampleNewFanOutStep() {
p := pipeline.NewPipeline()
fanout := NewFanOutStep("fanout", func(ctx context.Context, pipelines chan *pipeline.Pipeline) {
p := NewPipeline()
fanout := NewFanOutStep("fanout", func(ctx context.Context, pipelines chan *Pipeline) {
defer close(pipelines)
// create some pipelines
for i := 0; i < 3; i++ {
Expand All @@ -106,20 +105,20 @@ func ExampleNewFanOutStep() {
case <-ctx.Done():
return // parent pipeline has been canceled, let's not create more pipelines.
default:
pipelines <- pipeline.NewPipeline().AddStep(pipeline.NewStepFromFunc(fmt.Sprintf("i = %d", n), func(_ context.Context) error {
pipelines <- NewPipeline().AddStep(NewStepFromFunc(fmt.Sprintf("i = %d", n), func(_ context.Context) error {
time.Sleep(time.Duration(n * 10000000)) // fake some load
fmt.Println(fmt.Sprintf("I am worker %d", n))
return nil
}))
}
}
}, func(ctx context.Context, results map[uint64]pipeline.Result) pipeline.Result {
}, func(ctx context.Context, results map[uint64]Result) error {
for worker, result := range results {
if result.IsFailed() {
fmt.Println(fmt.Sprintf("Worker %d failed: %v", worker, result.Err()))
}
}
return pipeline.Result{}
return nil
})
p.AddStep(fanout)
p.Run()
Expand Down
44 changes: 0 additions & 44 deletions parallel/fanout.go

This file was deleted.

0 comments on commit c06da67

Please sign in to comment.