-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
engine.go
639 lines (536 loc) · 21.2 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
package core
import (
"errors"
"fmt"
"os"
"sort"
"strings"
"sync"
"sync/atomic"
"github.com/vercel/turbo/cli/internal/fs"
"github.com/vercel/turbo/cli/internal/graph"
"github.com/vercel/turbo/cli/internal/util"
"github.com/pyr-sh/dag"
)
// ROOT_NODE_NAME is the name of the synthetic root vertex added to the task
// graph so every task with no dependencies still hangs off a single entry point.
const ROOT_NODE_NAME = "___ROOT___"
// Task is a higher level struct that contains the underlying TaskDefinition
// but also some adjustments to it, based on business logic.
type Task struct {
	// Name is the bare task name (e.g. "build"), without any package qualifier.
	Name string
	// TaskDefinition contains the config for the task from turbo.json
	TaskDefinition fs.TaskDefinition
}
// Visitor is the callback invoked for each task ID while walking the task graph.
type Visitor = func(taskID string) error
// Engine contains both the DAG for the packages and the tasks and implements the methods to execute tasks in them
type Engine struct {
	// TaskGraph is a graph of package-tasks
	TaskGraph *dag.AcyclicGraph
	// PackageTaskDeps maps a workspace-specific taskID to the workspace-specific
	// taskIDs it depends on (populated via AddDep).
	PackageTaskDeps map[string][]string
	// rootEnabledTasks is the set of task names that may be run from the root package.
	rootEnabledTasks util.Set
	// completeGraph is the CompleteGraph. We need this to look up the Pipeline, etc.
	completeGraph *graph.CompleteGraph
	// isSinglePackage is used to load turbo.json correctly
	isSinglePackage bool
}
// NewEngine creates a new engine given a topologic graph of workspace package names
func NewEngine(
	completeGraph *graph.CompleteGraph,
	isSinglePackage bool,
) *Engine {
	engine := &Engine{
		TaskGraph:        &dag.AcyclicGraph{},
		PackageTaskDeps:  map[string][]string{},
		rootEnabledTasks: make(util.Set),
		completeGraph:    completeGraph,
		isSinglePackage:  isSinglePackage,
	}
	return engine
}
// EngineBuildingOptions help construct the TaskGraph
type EngineBuildingOptions struct {
	// Packages in the execution scope, if nil, all packages will be considered in scope
	Packages []string
	// TaskNames in the execution scope, if nil, all tasks will be executed
	TaskNames []string
	// Restrict execution to only the listed task names (the --only flag)
	TasksOnly bool
}
// EngineExecutionOptions controls a single walk of the task graph
type EngineExecutionOptions struct {
	// Parallel is whether to run tasks in parallel (bypasses the concurrency semaphore)
	Parallel bool
	// Concurrency is the number of concurrent tasks that can be executed
	Concurrency int
}
// StopExecutionSentinel is used to return an error from a graph Walk that indicates that
// all further walking should stop.
type StopExecutionSentinel struct {
	// err is the underlying reason execution was stopped; surfaced via Error().
	err error
}
// StopExecution wraps the given error in a sentinel error indicating that
// graph traversal should stop. Note that this will stop all tasks, not just
// downstream tasks.
func StopExecution(reason error) *StopExecutionSentinel {
	sentinel := &StopExecutionSentinel{err: reason}
	return sentinel
}
// Error implements error.Error for StopExecutionSentinel
func (se *StopExecutionSentinel) Error() string {
	cause := se.err
	return fmt.Sprintf("Execution stopped due to error: %v", cause)
}
// Execute executes the pipeline, constructing an internal task graph and walking it accordingly.
// It returns every error collected during the walk; an empty slice means success.
func (e *Engine) Execute(visitor Visitor, opts EngineExecutionOptions) []error {
	// Bounds the number of concurrently-running tasks (skipped when opts.Parallel).
	var sema = util.NewSemaphore(opts.Concurrency)
	// Flipped from 0 to 1 (atomically) once any task returns a StopExecutionSentinel.
	var errored int32

	// Returning an error from the Walk callback cancels downstream tasks, but not unrelated tasks.
	// The behavior we want is to either cancel everything or nothing (--continue). So, we do our own
	// error handling. Collect any errors that occur in "errors", and report them as the result of
	// Execute. panic on any other error returned by Walk.
	var errorMu sync.Mutex
	var errors []error
	// recordErr appends err to the shared errors slice under errorMu.
	recordErr := func(err error) {
		errorMu.Lock()
		defer errorMu.Unlock()
		errors = append(errors, err)
	}
	unusedErrs := e.TaskGraph.Walk(func(v dag.Vertex) error {
		// Use an extra func() to ensure that we are not returning any errors to Walk
		func() {
			// If something has already errored, short-circuit.
			// There is a race here between concurrent tasks. However, if there is not a
			// dependency edge between them, we are not required to have a strict order
			// between them, so a failed task can fail to short-circuit a concurrent
			// task that happened to be starting at the same time.
			if atomic.LoadInt32(&errored) != 0 {
				return
			}
			// Each vertex in the graph is a taskID (package#task format)
			taskID := dag.VertexName(v)

			// Always return if it is the root node
			if strings.Contains(taskID, ROOT_NODE_NAME) {
				return
			}

			// Acquire the semaphore unless parallel
			if !opts.Parallel {
				sema.Acquire()
				defer sema.Release()
			}

			if err := visitor(taskID); err != nil {
				if se, ok := err.(*StopExecutionSentinel); ok {
					// We only ever flip from false to true, so we don't need to compare and swap the atomic
					atomic.StoreInt32(&errored, 1)
					recordErr(se.err)
					// Note: returning an error here would cancel execution of downstream tasks only, and show
					// up in the errors returned from Walk. However, we are doing our own error collection
					// and intentionally ignoring errors from walk, so fallthrough and use the "errored" mechanism
					// to skip downstream tasks
				} else {
					recordErr(err)
				}
			}
		}()
		return nil
	})
	if len(unusedErrs) > 0 {
		panic("we should be handling execution errors via our own errors + errored mechanism")
	}
	return errors
}
// MissingTaskError is a specialized Error thrown in the case that we can't find a task.
// We want to allow this error when getting task definitions, so we have to special case it.
type MissingTaskError struct {
	// workspaceName is the workspace whose pipeline was searched.
	workspaceName string
	// taskID is the package-qualified form (package#task) that was looked up.
	taskID string
	// taskName is the bare task name that was looked up.
	taskName string
}
// Error implements error.Error for MissingTaskError.
func (m *MissingTaskError) Error() string {
	msg := fmt.Sprintf("Could not find \"%s\" or \"%s\" in workspace \"%s\"", m.taskName, m.taskID, m.workspaceName)
	return msg
}
// getTaskDefinition resolves a Task for the given workspace, falling back to
// the root turbo.json when the workspace has no config or no matching entry.
// A *MissingTaskError is returned when no definition exists anywhere.
func (e *Engine) getTaskDefinition(pkg string, taskName string, taskID string) (*Task, error) {
	pipeline, err := e.completeGraph.GetPipelineFromWorkspace(pkg, e.isSinglePackage)
	if err != nil {
		// A workspace without a turbo.json falls back to the root turbo.json;
		// every other error (including a missing root config) bubbles up.
		if pkg != util.RootPkgName && errors.Is(err, os.ErrNotExist) {
			return e.getTaskDefinition(util.RootPkgName, taskName, taskID)
		}
		return nil, err
	}

	// Prefer the fully-qualified taskID over the bare task name.
	for _, key := range []string{taskID, taskName} {
		if entry, ok := pipeline[key]; ok {
			return &Task{
				Name:           taskName,
				TaskDefinition: entry.GetTaskDefinition(),
			}, nil
		}
	}

	// turbo.json exists here but didn't define the task.
	// Fallback to the root pipeline to find the task.
	if pkg != util.RootPkgName {
		return e.getTaskDefinition(util.RootPkgName, taskName, taskID)
	}

	// Return this as a custom type so we can ignore it specifically
	return nil, &MissingTaskError{
		taskName:      taskName,
		taskID:        taskID,
		workspaceName: pkg,
	}
}
// Prepare constructs the Task Graph for a list of packages and tasks.
// It seeds a traversal queue with every (package, task) entry point, then
// breadth-first expands dependencies, connecting edges in e.TaskGraph and
// recording merged TaskDefinitions in the CompleteGraph.
func (e *Engine) Prepare(options *EngineBuildingOptions) error {
	pkgs := options.Packages
	taskNames := options.TaskNames
	tasksOnly := options.TasksOnly

	// If there are no affected packages, we don't need to go through all this work
	// we can just exit early.
	// TODO(mehulkar): but we still need to validate bad task names?
	if len(pkgs) == 0 {
		return nil
	}

	traversalQueue := []string{}

	// get a set of taskNames passed in. we'll remove the ones that have a definition
	missing := util.SetFromStrings(taskNames)

	// Get a list of entry points into our TaskGraph.
	// We do this by taking the input taskNames, and pkgs
	// and creating a queue of taskIDs that we can traverse and gather dependencies from.
	for _, pkg := range pkgs {
		for _, taskName := range taskNames {
			taskID := util.GetTaskId(pkg, taskName)

			// Look up the task in the package
			foundTask, err := e.getTaskDefinition(pkg, taskName, taskID)

			// We can skip MissingTaskErrors because we'll validate against them later
			// Return all other errors
			if err != nil {
				// Named missingTaskErr (not "e") to avoid shadowing the method receiver.
				var missingTaskErr *MissingTaskError
				if errors.As(err, &missingTaskErr) {
					// Initially, non-package tasks are not required to exist, as long as some
					// package in the list packages defines it as a package-task. Dependencies
					// *are* required to have a definition.
					continue
				}
				return err
			}

			// If we found a task definition, remove it from the missing list
			if foundTask != nil {
				// delete taskName if it was found
				missing.Delete(taskName)

				// Even if a task definition was found, we _only_ want to add it as an entry point to
				// the task graph (i.e. the traversalQueue), if it's:
				// - A task from the non-root workspace (i.e. tasks from every other workspace)
				// - A task that we *know* is rootEnabled task (in which case, the root workspace is acceptable)
				isRootPkg := pkg == util.RootPkgName
				if !isRootPkg || e.rootEnabledTasks.Includes(taskName) {
					traversalQueue = append(traversalQueue, taskID)
				}
			}
		}
	}

	visited := make(util.Set)

	// validate that all tasks passed were found
	missingList := missing.UnsafeListOfStrings()
	sort.Strings(missingList)
	if len(missingList) > 0 {
		return fmt.Errorf("Could not find the following tasks in project: %s", strings.Join(missingList, ", "))
	}

	// matchesTarget reports whether a dependency is one of the requested task names.
	// BUGFIX: the previous filter returned from inside the range loop on its first
	// iteration, so only taskNames[0] was ever compared; with multiple requested
	// tasks and --only, dependencies matching later task names were wrongly dropped.
	matchesTarget := func(d interface{}) bool {
		name := fmt.Sprintf("%v", d)
		for _, target := range taskNames {
			if name == target {
				return true
			}
		}
		return false
	}

	// Things get appended to traversalQueue inside this loop, so we use the len() check instead of range.
	for len(traversalQueue) > 0 {
		// pop off the first item from the traversalQueue
		taskID := traversalQueue[0]
		traversalQueue = traversalQueue[1:]

		pkg, taskName := util.GetPackageTaskFromId(taskID)
		if pkg == util.RootPkgName && !e.rootEnabledTasks.Includes(taskName) {
			return fmt.Errorf("%v needs an entry in turbo.json before it can be depended on because it is a task run from the root package", taskID)
		}

		if pkg != ROOT_NODE_NAME {
			if _, ok := e.completeGraph.WorkspaceInfos.PackageJSONs[pkg]; !ok {
				// If we have a pkg it should be in WorkspaceInfos.
				// If we're hitting this error something has gone wrong earlier when building WorkspaceInfos
				// or the workspace really doesn't exist and turbo.json is misconfigured.
				return fmt.Errorf("Could not find workspace \"%s\" from task \"%s\" in project", pkg, taskID)
			}
		}

		taskDefinitions, err := e.getTaskDefinitionChain(taskID, taskName)
		if err != nil {
			return err
		}

		taskDefinition, err := fs.MergeTaskDefinitions(taskDefinitions)
		if err != nil {
			return err
		}

		// Skip this iteration of the loop if we've already seen this taskID
		if visited.Includes(taskID) {
			continue
		}

		visited.Add(taskID)

		// Put this taskDefinition into the Graph so we can look it up later during execution.
		e.completeGraph.TaskDefinitions[taskID] = taskDefinition

		topoDeps := util.SetFromStrings(taskDefinition.TopologicalDependencies)
		deps := make(util.Set)
		isPackageTask := util.IsPackageTask(taskName)

		for _, dependency := range taskDefinition.TaskDependencies {
			// If the current task is a workspace-specific task (including root Task)
			// and its dependency is _also_ a workspace-specific task, we need to add
			// a reference to this dependency directly into the engine.
			// TODO @mehulkar: Why do we need this?
			if isPackageTask && util.IsPackageTask(dependency) {
				if err := e.AddDep(dependency, taskName); err != nil {
					return err
				}
			} else {
				// For non-workspace-specific dependencies, we attach a reference to
				// the task that is added into the engine.
				deps.Add(dependency)
			}
		}

		// Filter down the tasks if there's a filter in place
		// https://turbo.build/repo/docs/reference/command-line-reference#--only
		if tasksOnly {
			deps = deps.Filter(matchesTarget)
			topoDeps = topoDeps.Filter(matchesTarget)
		}

		toTaskID := taskID

		// hasTopoDeps will be true if the task depends on any tasks from dependency packages
		// E.g. `dev: { dependsOn: [^dev] }`
		hasTopoDeps := topoDeps.Len() > 0 && e.completeGraph.WorkspaceGraph.DownEdges(pkg).Len() > 0

		// hasDeps will be true if the task depends on any tasks from its own package
		// E.g. `build: { dependsOn: [dev] }`
		hasDeps := deps.Len() > 0

		// hasPackageTaskDeps will be true if this is a workspace-specific task, and
		// it depends on another workspace-specific tasks
		// E.g. `my-package#build: { dependsOn: [my-package#beforebuild] }`.
		hasPackageTaskDeps := false
		if _, ok := e.PackageTaskDeps[toTaskID]; ok {
			hasPackageTaskDeps = true
		}

		if hasTopoDeps {
			depPkgs := e.completeGraph.WorkspaceGraph.DownEdges(pkg)
			for _, from := range topoDeps.UnsafeListOfStrings() {
				// add task dep from all the package deps within repo
				for depPkg := range depPkgs {
					fromTaskID := util.GetTaskId(depPkg, from)
					e.TaskGraph.Add(fromTaskID)
					e.TaskGraph.Add(toTaskID)
					e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID))
					traversalQueue = append(traversalQueue, fromTaskID)
				}
			}
		}

		if hasDeps {
			for _, from := range deps.UnsafeListOfStrings() {
				fromTaskID := util.GetTaskId(pkg, from)
				e.TaskGraph.Add(fromTaskID)
				e.TaskGraph.Add(toTaskID)
				e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID))
				traversalQueue = append(traversalQueue, fromTaskID)
			}
		}

		if hasPackageTaskDeps {
			if pkgTaskDeps, ok := e.PackageTaskDeps[toTaskID]; ok {
				for _, fromTaskID := range pkgTaskDeps {
					e.TaskGraph.Add(fromTaskID)
					e.TaskGraph.Add(toTaskID)
					e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID))
					traversalQueue = append(traversalQueue, fromTaskID)
				}
			}
		}

		// Tasks with no dependencies at all hang off the synthetic root node.
		if !hasDeps && !hasTopoDeps && !hasPackageTaskDeps {
			e.TaskGraph.Add(ROOT_NODE_NAME)
			e.TaskGraph.Add(toTaskID)
			e.TaskGraph.Connect(dag.BasicEdge(toTaskID, ROOT_NODE_NAME))
		}
	}

	return nil
}
// AddTask adds root tasks to the engine so they can be looked up later.
func (e *Engine) AddTask(taskName string) {
	// Only package-qualified task IDs can designate a root task.
	if !util.IsPackageTask(taskName) {
		return
	}
	pkg, bareName := util.GetPackageTaskFromId(taskName)
	if pkg == util.RootPkgName {
		e.rootEnabledTasks.Add(bareName)
	}
}
// AddDep adds tuples from+to task ID combos in tuple format so they can be looked up later.
func (e *Engine) AddDep(fromTaskID string, toTaskID string) error {
	fromPkg, _ := util.GetPackageTaskFromId(fromTaskID)
	knownPkg := fromPkg == ROOT_NODE_NAME ||
		fromPkg == util.RootPkgName ||
		e.completeGraph.WorkspaceGraph.HasVertex(fromPkg)
	if !knownPkg {
		return fmt.Errorf("found reference to unknown package: %v in task %v", fromPkg, fromTaskID)
	}

	// append on a missing key yields a fresh slice, so no explicit init is needed.
	e.PackageTaskDeps[toTaskID] = append(e.PackageTaskDeps[toTaskID], fromTaskID)

	return nil
}
// ValidatePersistentDependencies checks if any task dependsOn persistent tasks and throws
// an error if that task is actually implemented. It also errors when the number of
// persistent tasks meets or exceeds the configured concurrency (they would deadlock).
func (e *Engine) ValidatePersistentDependencies(graph *graph.CompleteGraph, concurrency int) error {
	var validationError error
	persistentCount := 0

	// Adding in a lock because otherwise walking the graph can introduce a data race
	// (reproducible with `go test -race`)
	var sema = util.NewSemaphore(1)

	errs := e.TaskGraph.Walk(func(v dag.Vertex) error {
		vertexName := dag.VertexName(v) // vertexName is a taskID

		// No need to check the root node if that's where we are.
		if strings.Contains(vertexName, ROOT_NODE_NAME) {
			return nil
		}

		// Acquire a lock, because otherwise walking this group can cause a race condition
		// writing to the same validationError var defined outside the Walk(). This shows
		// up when running tests with the `-race` flag.
		sema.Acquire()
		defer sema.Release()

		currentTaskDefinition, currentTaskExists := e.completeGraph.TaskDefinitions[vertexName]
		if currentTaskExists && currentTaskDefinition.Persistent {
			persistentCount++
		}

		currentPackageName, currentTaskName := util.GetPackageTaskFromId(vertexName)

		// For each "downEdge" (i.e. each task that _this_ task dependsOn)
		// check if the downEdge is a Persistent task, and if it actually has the script implemented
		// in that package's package.json
		for dep := range e.TaskGraph.DownEdges(vertexName) {
			depTaskID := dep.(string)
			// Skip the root node.
			// BUGFIX: this was previously `return nil`, which skipped validating the
			// current task's *remaining* dependencies, not just the root edge.
			if strings.Contains(depTaskID, ROOT_NODE_NAME) {
				continue
			}

			// Parse the taskID of this dependency task
			packageName, taskName := util.GetPackageTaskFromId(depTaskID)

			// Get the Task Definition so we can check if it is Persistent
			depTaskDefinition, taskExists := e.completeGraph.TaskDefinitions[depTaskID]
			if !taskExists {
				return fmt.Errorf("Cannot find task definition for %v in package %v", depTaskID, packageName)
			}

			// Get information about the package
			pkg, pkgExists := graph.WorkspaceInfos.PackageJSONs[packageName]
			if !pkgExists {
				return fmt.Errorf("Cannot find package %v", packageName)
			}
			_, hasScript := pkg.Scripts[taskName]

			// If both conditions are true set a value and break out of checking the dependencies
			if depTaskDefinition.Persistent && hasScript {
				validationError = fmt.Errorf(
					"\"%s\" is a persistent task, \"%s\" cannot depend on it",
					util.GetTaskId(packageName, taskName),
					util.GetTaskId(currentPackageName, currentTaskName),
				)
				break
			}
		}

		return nil
	})

	// Surface only the first walk error, matching prior behavior.
	if len(errs) > 0 {
		return fmt.Errorf("Validation failed: %v", errs[0])
	}

	if validationError != nil {
		return validationError
	} else if persistentCount >= concurrency {
		return fmt.Errorf("You have %v persistent tasks but `turbo` is configured for concurrency of %v. Set --concurrency to at least %v", persistentCount, concurrency, persistentCount+1)
	}

	return nil
}
// getTaskDefinitionChain gets a set of TaskDefinitions that apply to the taskID.
// These definitions should be merged by the consumer.
// The chain is ordered root-first: the root turbo.json entry (if any), then the
// workspace turbo.json entry (if any). An error is returned when no definition
// is found anywhere, or when a workspace turbo.json fails validation.
func (e *Engine) getTaskDefinitionChain(taskID string, taskName string) ([]fs.BookkeepingTaskDefinition, error) {
	// Start a list of TaskDefinitions we've found for this TaskID
	taskDefinitions := []fs.BookkeepingTaskDefinition{}

	rootPipeline, err := e.completeGraph.GetPipelineFromWorkspace(util.RootPkgName, e.isSinglePackage)
	if err != nil {
		// It should be very unlikely that we can't find a root pipeline. Even for single package repos
		// the pipeline is synthesized from package.json, so there should be _something_ here.
		return nil, err
	}

	// Look for the taskDefinition in the root pipeline. A lookup miss here is
	// not fatal yet; the workspace turbo.json may still define the task.
	if rootTaskDefinition, err := rootPipeline.GetTask(taskID, taskName); err == nil {
		taskDefinitions = append(taskDefinitions, *rootTaskDefinition)
	}

	// If we're in a single package repo, we can just exit with the TaskDefinition in the root pipeline
	// since there are no workspaces, and we don't need to follow any extends keys.
	if e.isSinglePackage {
		if len(taskDefinitions) == 0 {
			return nil, fmt.Errorf("Could not find \"%s\" in root turbo.json", taskID)
		}
		return taskDefinitions, nil
	}

	// If the taskID is a root task (e.g. //#build), we don't need to look
	// for a workspace task, since these can only be defined in the root turbo.json.
	taskIDPackage, _ := util.GetPackageTaskFromId(taskID)
	if taskIDPackage != util.RootPkgName && taskIDPackage != ROOT_NODE_NAME {
		// If there is an error, we can ignore it, since turbo.json config is not required in the workspace.
		if workspaceTurboJSON, err := e.completeGraph.GetTurboConfigFromWorkspace(taskIDPackage, e.isSinglePackage); err != nil {
			// swallow the error where the config file doesn't exist, but bubble up other things
			if !errors.Is(err, os.ErrNotExist) {
				return nil, err
			}
		} else {
			// Run some validations on a workspace turbo.json. Note that these validations are on
			// the whole struct, and not relevant to the taskID we're looking at right now.
			validationErrors := workspaceTurboJSON.Validate([]fs.TurboJSONValidation{
				validateNoPackageTaskSyntax,
				validateExtends,
			})

			if len(validationErrors) > 0 {
				// Fold every validation error into a single wrapped error via %w.
				fullError := errors.New("Invalid turbo.json")
				for _, validationErr := range validationErrors {
					fullError = fmt.Errorf("%w\n - %s", fullError, validationErr)
				}

				return nil, fullError
			}

			// If there are no errors, we can (try to) add the TaskDefinition to our list.
			if workspaceDefinition, ok := workspaceTurboJSON.Pipeline[taskName]; ok {
				taskDefinitions = append(taskDefinitions, workspaceDefinition)
			}
		}
	}

	if len(taskDefinitions) == 0 {
		return nil, fmt.Errorf("Could not find \"%s\" in root turbo.json or \"%s\" workspace", taskID, taskIDPackage)
	}

	return taskDefinitions, nil
}
// validateNoPackageTaskSyntax checks that a workspace turbo.json declares only
// plain task names (e.g. "build"), never package-qualified ones (e.g. "app#build"),
// returning one error per offending key.
func validateNoPackageTaskSyntax(turboJSON *fs.TurboJSON) []error {
	// Named errs (not "errors") so the imported errors package is not shadowed.
	errs := []error{}
	for taskIDOrName := range turboJSON.Pipeline {
		if util.IsPackageTask(taskIDOrName) {
			taskName := util.StripPackageName(taskIDOrName)
			errs = append(errs, fmt.Errorf("\"%s\". Use \"%s\" instead", taskIDOrName, taskName))
		}
	}
	return errs
}
// validateExtends checks the "extends" key of a workspace turbo.json:
// it must be present and contain exactly the root workspace.
func validateExtends(turboJSON *fs.TurboJSON) []error {
	extendErrors := []error{}
	extends := turboJSON.Extends

	// The cases below are mutually exclusive, so a switch mirrors the
	// original if-chain exactly.
	switch {
	case len(extends) == 0:
		// We don't support this right now
		extendErrors = append(extendErrors, fmt.Errorf("No \"extends\" key found"))
	case len(extends) > 1:
		// TODO(mehulkar): Enable extending from more than one workspace.
		extendErrors = append(extendErrors, fmt.Errorf("You can only extend from the root workspace"))
	case extends[0] != util.RootPkgName:
		// TODO(mehulkar): Enable extending from non-root workspace.
		extendErrors = append(extendErrors, fmt.Errorf("You can only extend from the root workspace"))
	}

	return extendErrors
}