-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
taskhash.go
377 lines (334 loc) · 12.2 KB
/
taskhash.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
// Package taskhash handles calculating dependency hashes for nodes in the task execution graph.
package taskhash
import (
"fmt"
"sort"
"strings"
"sync"
"github.com/hashicorp/go-hclog"
"github.com/pyr-sh/dag"
"github.com/vercel/turbo/cli/internal/env"
"github.com/vercel/turbo/cli/internal/fs"
"github.com/vercel/turbo/cli/internal/hashing"
"github.com/vercel/turbo/cli/internal/inference"
"github.com/vercel/turbo/cli/internal/nodes"
"github.com/vercel/turbo/cli/internal/runsummary"
"github.com/vercel/turbo/cli/internal/turbopath"
"github.com/vercel/turbo/cli/internal/util"
"github.com/vercel/turbo/cli/internal/workspace"
"golang.org/x/sync/errgroup"
)
// Tracker caches package-inputs hashes, as well as package-task hashes.
// package-inputs hashes must be calculated before package-task hashes,
// and package-task hashes must be calculated in topographical order.
// package-task hashing is threadsafe, provided topographical order is
// respected.
type Tracker struct {
	// rootNode is the sentinel vertex name for the root of the task graph.
	rootNode string
	// globalHash is mixed into every task hash so repo-wide changes invalidate all tasks.
	globalHash string
	pipeline   fs.Pipeline
	// packageInputsHashes maps taskID -> combined hash of that task's input files.
	// Populated once by CalculateFileHashes before task hashing begins.
	packageInputsHashes map[string]string
	// packageInputsExpandedHashes is a map of a hashkey to a list of files that are inputs to the task.
	// Writes to this map happen during CalculateFileHash(). Since this happens synchronously
	// before walking the task graph, it does not need to be protected by a mutex.
	packageInputsExpandedHashes map[string]map[turbopath.AnchoredUnixPath]string
	// mu is a mutex that we can lock/unlock to read/write from maps
	// the fields below should be protected by the mutex.
	mu                     sync.RWMutex
	packageTaskEnvVars     map[string]env.DetailedMap // taskId -> envvar pairs that affect the hash.
	packageTaskHashes      map[string]string          // taskID -> hash
	packageTaskFramework   map[string]string          // taskID -> inferred framework for package
	packageTaskOutputs     map[string][]turbopath.AnchoredSystemPath
	packageTaskCacheStatus map[string]runsummary.TaskCacheSummary
}
// NewTracker creates a tracker for package-inputs combinations and package-task combinations.
func NewTracker(rootNode string, globalHash string, pipeline fs.Pipeline) *Tracker {
	tracker := &Tracker{
		rootNode:   rootNode,
		globalHash: globalHash,
		pipeline:   pipeline,
	}
	// Initialize the per-task maps; the input-hash maps are populated
	// later by CalculateFileHashes.
	tracker.packageTaskHashes = make(map[string]string)
	tracker.packageTaskFramework = make(map[string]string)
	tracker.packageTaskEnvVars = make(map[string]env.DetailedMap)
	tracker.packageTaskOutputs = make(map[string][]turbopath.AnchoredSystemPath)
	tracker.packageTaskCacheStatus = make(map[string]runsummary.TaskCacheSummary)
	return tracker
}
// packageFileHashInputs defines a combination of a package and optional set of input globs
type packageFileHashInputs struct {
	// taskID identifies the package-task pair this input set belongs to.
	taskID string
	// taskDefinition carries the input globs (taskDefinition.Inputs) used to select files.
	taskDefinition *fs.TaskDefinition
	// packageName is used to look up the package's directory in the workspace catalog.
	packageName string
}
// CalculateFileHashes hashes each unique package-inputs combination that is present
// in the task graph. Must be called before calculating task hashes.
//
// Hashing is fanned out across workerCount goroutines; results are stored on the
// tracker (packageInputsHashes / packageInputsExpandedHashes) only if every worker
// succeeds.
func (th *Tracker) CalculateFileHashes(
	allTasks []dag.Vertex,
	workerCount int,
	workspaceInfos workspace.Catalog,
	taskDefinitions map[string]*fs.TaskDefinition,
	repoRoot turbopath.AbsoluteSystemPath,
) error {
	hashTasks := make(util.Set)
	for _, v := range allTasks {
		taskID, ok := v.(string)
		if !ok {
			// Report v itself: taskID is the zero value ("") when the assertion fails,
			// so formatting taskID here would print an empty, useless message.
			return fmt.Errorf("unknown task %v", v)
		}
		if taskID == th.rootNode {
			continue
		}
		packageName, _ := util.GetPackageTaskFromId(taskID)
		if packageName == th.rootNode {
			continue
		}
		taskDefinition, ok := taskDefinitions[taskID]
		if !ok {
			return fmt.Errorf("missing pipeline entry %v", taskID)
		}
		pfs := &packageFileHashInputs{
			taskID,
			taskDefinition,
			packageName,
		}
		hashTasks.Add(pfs)
	}
	hashes := make(map[string]string, len(hashTasks))
	hashObjects := make(map[string]map[turbopath.AnchoredUnixPath]string, len(hashTasks))
	hashQueue := make(chan *packageFileHashInputs, workerCount)
	hashErrs := &errgroup.Group{}
	for i := 0; i < workerCount; i++ {
		hashErrs.Go(func() error {
			for packageFileHashInputs := range hashQueue {
				pkg, ok := workspaceInfos.PackageJSONs[packageFileHashInputs.packageName]
				if !ok {
					return fmt.Errorf("cannot find package %v", packageFileHashInputs.packageName)
				}
				// Get the hashes of each file, keyed by the path.
				hashObject, err := hashing.GetPackageFileHashes(repoRoot, pkg.Dir, packageFileHashInputs.taskDefinition.Inputs)
				if err != nil {
					return err
				}
				// Get the combined hash of all the files.
				hash, err := fs.HashObject(hashObject)
				if err != nil {
					return err
				}
				// Save off the hash information, keyed by package task.
				// The mutex guards the local result maps, which all workers write to.
				th.mu.Lock()
				hashes[packageFileHashInputs.taskID] = hash
				hashObjects[packageFileHashInputs.taskID] = hashObject
				th.mu.Unlock()
			}
			return nil
		})
	}
	for ht := range hashTasks {
		hashQueue <- ht.(*packageFileHashInputs)
	}
	close(hashQueue)
	err := hashErrs.Wait()
	if err != nil {
		return err
	}
	// Publish results only on full success; these fields are read without the
	// mutex afterwards (see the Tracker field comments).
	th.packageInputsHashes = hashes
	th.packageInputsExpandedHashes = hashObjects
	return nil
}
// taskHashable is the full set of inputs folded into a task's hash by
// fs.HashObject. NOTE(review): field set and values feed the hash directly —
// presumably field changes invalidate existing cache entries; confirm before
// modifying.
type taskHashable struct {
	packageDir           turbopath.AnchoredUnixPath
	hashOfFiles          string
	externalDepsHash     string
	task                 string
	outputs              fs.TaskOutputs
	passThruArgs         []string
	envMode              util.EnvMode
	passthroughEnv       []string
	hashableEnvPairs     []string
	globalHash           string
	taskDependencyHashes []string
}
// oldTaskHashable is the legacy hash input shape, used when the strict
// environment-variables feature is disabled (useOldTaskHashable). It is
// taskHashable minus the envMode and passthroughEnv fields, kept so existing
// hashes remain stable.
type oldTaskHashable struct {
	packageDir           turbopath.AnchoredUnixPath
	hashOfFiles          string
	externalDepsHash     string
	task                 string
	outputs              fs.TaskOutputs
	passThruArgs         []string
	hashableEnvPairs     []string
	globalHash           string
	taskDependencyHashes []string
}
// calculateTaskHashFromHashable returns a hash string from the taskHashable.
// When useOldTaskHashable is set, the legacy (pre-strict-env) field set is
// hashed instead, preserving hash stability for users not on the new mode.
func calculateTaskHashFromHashable(hashable *taskHashable, useOldTaskHashable bool) (string, error) {
	// The user is not using the strict environment variables feature.
	if useOldTaskHashable {
		legacy := &oldTaskHashable{
			packageDir:           hashable.packageDir,
			hashOfFiles:          hashable.hashOfFiles,
			externalDepsHash:     hashable.externalDepsHash,
			task:                 hashable.task,
			outputs:              hashable.outputs,
			passThruArgs:         hashable.passThruArgs,
			hashableEnvPairs:     hashable.hashableEnvPairs,
			globalHash:           hashable.globalHash,
			taskDependencyHashes: hashable.taskDependencyHashes,
		}
		return fs.HashObject(legacy)
	}
	switch hashable.envMode {
	case util.Loose:
		// Remove the passthroughs from hash consideration if we're explicitly loose.
		hashable.passthroughEnv = nil
		return fs.HashObject(hashable)
	case util.Strict:
		// Collapse `nil` and `[]` in strict mode.
		if hashable.passthroughEnv == nil {
			hashable.passthroughEnv = make([]string, 0)
		}
		return fs.HashObject(hashable)
	case util.Infer:
		panic("task inferred status should have already been resolved")
	default:
		panic("unimplemented environment mode")
	}
}
// calculateDependencyHashes collects the previously-computed hashes of every
// task in dependencySet (skipping the root node and root-package tasks) and
// returns them as a sorted, de-duplicated list.
func (th *Tracker) calculateDependencyHashes(dependencySet dag.Set) ([]string, error) {
	seen := make(util.Set)
	rootTaskPrefix := th.rootNode + util.TaskDelimiter

	th.mu.RLock()
	defer th.mu.RUnlock()

	for _, dep := range dependencySet {
		if dep == th.rootNode {
			continue
		}
		name, ok := dep.(string)
		if !ok {
			return nil, fmt.Errorf("unknown task: %v", dep)
		}
		// Tasks in the root package don't contribute a dependency hash.
		if strings.HasPrefix(name, rootTaskPrefix) {
			continue
		}
		hash, ok := th.packageTaskHashes[name]
		if !ok {
			return nil, fmt.Errorf("missing hash for dependent task: %v", name)
		}
		seen.Add(hash)
	}

	// Sort for a deterministic contribution to the task hash.
	result := seen.UnsafeListOfStrings()
	sort.Strings(result)
	return result, nil
}
// CalculateTaskHash calculates the hash for package-task combination. It is threadsafe, provided
// that it has previously been called on its task-graph dependencies. File hashes must be calculated
// first.
func (th *Tracker) CalculateTaskHash(logger hclog.Logger, packageTask *nodes.PackageTask, dependencySet dag.Set, frameworkInference bool, args []string, useOldTaskHashable bool) (string, error) {
	hashOfFiles, ok := th.packageInputsHashes[packageTask.TaskID]
	if !ok {
		return "", fmt.Errorf("cannot find package-file hash for %v", packageTask.TaskID)
	}
	var keyMatchers []string
	var framework *inference.Framework
	envVarContainingExcludePrefix := ""
	if frameworkInference {
		envVarContainingExcludePrefix = "TURBO_CI_VENDOR_ENV_KEY"
		framework = inference.InferFramework(packageTask.Pkg)
		if framework != nil && framework.EnvMatcher != "" {
			// log auto detected framework and env prefix
			logger.Debug(fmt.Sprintf("auto detected framework for %s", packageTask.PackageName), "framework", framework.Slug, "env_prefix", framework.EnvMatcher)
			keyMatchers = append(keyMatchers, framework.EnvMatcher)
		}
	}
	envVars, err := env.GetHashableEnvVars(
		packageTask.TaskDefinition.EnvVarDependencies,
		keyMatchers,
		envVarContainingExcludePrefix,
	)
	if err != nil {
		return "", err
	}
	hashableEnvPairs := envVars.All.ToHashable()
	outputs := packageTask.HashableOutputs()
	taskDependencyHashes, err := th.calculateDependencyHashes(dependencySet)
	if err != nil {
		return "", err
	}
	// log any auto detected env vars
	logger.Debug(fmt.Sprintf("task hash env vars for %s:%s", packageTask.PackageName, packageTask.Task), "vars", hashableEnvPairs)
	hash, err := calculateTaskHashFromHashable(&taskHashable{
		packageDir:           packageTask.Pkg.Dir.ToUnixPath(),
		hashOfFiles:          hashOfFiles,
		externalDepsHash:     packageTask.Pkg.ExternalDepsHash,
		task:                 packageTask.Task,
		outputs:              outputs,
		passThruArgs:         args,
		envMode:              packageTask.EnvMode,
		passthroughEnv:       packageTask.TaskDefinition.PassthroughEnv,
		hashableEnvPairs:     hashableEnvPairs,
		globalHash:           th.globalHash,
		taskDependencyHashes: taskDependencyHashes,
	}, useOldTaskHashable)
	if err != nil {
		// Wrap the underlying error; the original code formatted `hash` here
		// (an empty string on failure), discarding the actual cause.
		return "", fmt.Errorf("failed to hash task %v: %w", packageTask.TaskID, err)
	}
	th.mu.Lock()
	th.packageTaskEnvVars[packageTask.TaskID] = envVars
	th.packageTaskHashes[packageTask.TaskID] = hash
	if framework != nil {
		th.packageTaskFramework[packageTask.TaskID] = framework.Slug
	}
	th.mu.Unlock()
	return hash, nil
}
// GetExpandedInputs gets the expanded set of inputs for a given PackageTask.
// A copy is returned so callers cannot mutate the tracker's cached map.
func (th *Tracker) GetExpandedInputs(packageTask *nodes.PackageTask) map[turbopath.AnchoredUnixPath]string {
	cached := th.packageInputsExpandedHashes[packageTask.TaskID]
	result := make(map[turbopath.AnchoredUnixPath]string, len(cached))
	for file, fileHash := range cached {
		result[file] = fileHash
	}
	return result
}
// GetEnvVars returns the hashed env vars for a given taskID.
func (th *Tracker) GetEnvVars(taskID string) env.DetailedMap {
	th.mu.RLock()
	vars := th.packageTaskEnvVars[taskID]
	th.mu.RUnlock()
	return vars
}
// GetFramework returns the inferred framework for a given taskID.
func (th *Tracker) GetFramework(taskID string) string {
	th.mu.RLock()
	framework := th.packageTaskFramework[taskID]
	th.mu.RUnlock()
	return framework
}
// GetExpandedOutputs returns a list of outputs for a given taskID.
// An empty (non-nil) slice is returned when no outputs were recorded.
func (th *Tracker) GetExpandedOutputs(taskID string) []turbopath.AnchoredSystemPath {
	th.mu.RLock()
	defer th.mu.RUnlock()
	if outputs, ok := th.packageTaskOutputs[taskID]; ok {
		return outputs
	}
	return []turbopath.AnchoredSystemPath{}
}
// SetExpandedOutputs records a list of outputs for a given taskID so it can be read later.
func (th *Tracker) SetExpandedOutputs(taskID string, outputs []turbopath.AnchoredSystemPath) {
	th.mu.Lock()
	th.packageTaskOutputs[taskID] = outputs
	th.mu.Unlock()
}
// SetCacheStatus records the task status for the given taskID.
func (th *Tracker) SetCacheStatus(taskID string, cacheSummary runsummary.TaskCacheSummary) {
	th.mu.Lock()
	th.packageTaskCacheStatus[taskID] = cacheSummary
	th.mu.Unlock()
}
// GetCacheStatus returns the recorded cache status for the given taskID.
// If no status was recorded, a zero-valued summary is returned (all fields
// false/0). (The original comment said "records", but this is a getter.)
func (th *Tracker) GetCacheStatus(taskID string) runsummary.TaskCacheSummary {
	// This is a pure read: use the read lock, consistent with the other
	// getters, instead of the exclusive write lock the original took.
	th.mu.RLock()
	defer th.mu.RUnlock()
	if status, ok := th.packageTaskCacheStatus[taskID]; ok {
		return status
	}
	// Return an empty one, all the fields will be false and 0
	return runsummary.TaskCacheSummary{}
}