-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
runcache.go
355 lines (313 loc) · 12.5 KB
/
runcache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
package runcache
import (
"bufio"
"context"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"github.com/fatih/color"
"github.com/hashicorp/go-hclog"
"github.com/mitchellh/cli"
"github.com/vercel/turbo/cli/internal/cache"
"github.com/vercel/turbo/cli/internal/colorcache"
"github.com/vercel/turbo/cli/internal/fs"
"github.com/vercel/turbo/cli/internal/globby"
"github.com/vercel/turbo/cli/internal/logstreamer"
"github.com/vercel/turbo/cli/internal/nodes"
"github.com/vercel/turbo/cli/internal/turbopath"
"github.com/vercel/turbo/cli/internal/ui"
"github.com/vercel/turbo/cli/internal/util"
)
// LogReplayer is a function that is responsible for replaying the contents of a given log file.
// It is declared as a type alias, so any function with this exact signature can be used
// without conversion. The arguments are the logger for diagnostics, the prefixed UI that
// receives the replayed output, and the absolute path of the log file to replay.
type LogReplayer = func(logger hclog.Logger, output *cli.PrefixedUi, logFile turbopath.AbsoluteSystemPath)
// Opts holds the configurable options for a RunCache instance
type Opts struct {
// SkipReads is copied into RunCache.readsDisabled by New; when set, RestoreOutputs
// bypasses the cache and reports no hit.
SkipReads bool
// SkipWrites is copied into RunCache.writesDisabled by New; when set, SaveOutputs is a
// no-op and OutputWriter does not create a log file.
SkipWrites bool
// TaskOutputModeOverride, when non-nil, replaces the output mode from each task's
// definition when a TaskCache is created.
TaskOutputModeOverride *util.TaskOutputMode
// LogReplayer replays a stored log file. New substitutes defaultLogReplayer when nil.
LogReplayer LogReplayer
// OutputWatcher is consulted for changed outputs and notified when outputs are written.
// New substitutes a NoOpOutputWatcher when nil.
OutputWatcher OutputWatcher
}
// SetTaskOutputMode converts value into a util.TaskOutputMode and stores it as the
// override on opts. If value cannot be parsed, opts is left untouched and the returned
// error lists the accepted modes.
func (opts *Opts) SetTaskOutputMode(value string) error {
	mode, parseErr := util.FromTaskOutputModeString(value)
	if parseErr != nil {
		return fmt.Errorf("must be one of \"%v\"", TaskOutputModes())
	}

	opts.TaskOutputModeOverride = &mode
	return nil
}
// TaskOutputModes returns every entry of util.TaskOutputModeStrings joined with "|",
// in iteration order, for use in help text and error messages.
func TaskOutputModes() string {
	var names []string
	for _, mode := range util.TaskOutputModeStrings {
		names = append(names, string(mode))
	}
	return strings.Join(names, "|")
}
// RunCache represents the interface to the cache for a single `turbo run`
type RunCache struct {
// taskOutputModeOverride, when non-nil, takes precedence over the output mode from a
// task's definition.
taskOutputModeOverride *util.TaskOutputMode
// cache is the wrapped cache that outputs are fetched from and put into.
cache cache.Cache
// readsDisabled makes RestoreOutputs skip the cache entirely.
readsDisabled bool
// writesDisabled makes SaveOutputs a no-op and stops OutputWriter from writing a log file.
writesDisabled bool
// repoRoot is the root passed to cache Fetch/Put and the base for log file and glob paths.
repoRoot turbopath.AbsoluteSystemPath
// logReplayer replays a task's stored log file; set to defaultLogReplayer by New when
// none is supplied.
logReplayer LogReplayer
// outputWatcher reports which output globs changed and records outputs that were written;
// set to a NoOpOutputWatcher by New when none is supplied.
outputWatcher OutputWatcher
// colorCache is stored by New; it is not read anywhere in this file.
colorCache *colorcache.ColorCache
}
// New builds a RunCache around the given cache for the repository rooted at repoRoot.
// Options left unset in opts receive defaults: defaultLogReplayer for the log replayer
// and a NoOpOutputWatcher for the output watcher.
func New(cache cache.Cache, repoRoot turbopath.AbsoluteSystemPath, opts Opts, colorCache *colorcache.ColorCache) *RunCache {
	replayer := opts.LogReplayer
	if replayer == nil {
		replayer = defaultLogReplayer
	}

	watcher := opts.OutputWatcher
	if watcher == nil {
		watcher = &NoOpOutputWatcher{}
	}

	return &RunCache{
		taskOutputModeOverride: opts.TaskOutputModeOverride,
		cache:                  cache,
		readsDisabled:          opts.SkipReads,
		writesDisabled:         opts.SkipWrites,
		repoRoot:               repoRoot,
		logReplayer:            replayer,
		outputWatcher:          watcher,
		colorCache:             colorCache,
	}
}
// TaskCache represents a single task's (package-task?) interface to the RunCache
// and controls access to the task's outputs
type TaskCache struct {
// ExpandedOutputs is set by RestoreOutputs to the files returned by the cache fetch and
// by SaveOutputs to the paths that were put into the cache. It starts out empty.
ExpandedOutputs []turbopath.AnchoredSystemPath
// rc is the RunCache this TaskCache was created from.
rc *RunCache
// repoRelativeGlobs holds the task's output globs, each joined onto the package directory.
repoRelativeGlobs fs.TaskOutputs
// hash is the task hash; it is the key used for cache Fetch and Put.
hash string
// pt is the package-task this cache entry belongs to.
pt *nodes.PackageTask
// taskOutputMode is the task definition's output mode, or the RunCache override when set.
taskOutputMode util.TaskOutputMode
// cachingDisabled is true when the task definition says the task should not be cached.
cachingDisabled bool
// LogFileName is the task's log file path, joined onto the repo root.
LogFileName turbopath.AbsoluteSystemPath
}
// RestoreOutputs tries to bring this task's outputs back from the cache so the task does
// not have to run. It returns where the hit came from (both flags false on a bypass or a
// miss), the time saved as reported by the cache, and any error from fetching.
func (tc *TaskCache) RestoreOutputs(ctx context.Context, prefixedUI *cli.PrefixedUi, progressLogger hclog.Logger) (cache.ItemStatus, int, error) {
	// The "no output" and "errors only" modes stay silent about bypasses and misses.
	announce := tc.taskOutputMode != util.NoTaskOutput && tc.taskOutputMode != util.ErrorTaskOutput
	noHit := cache.ItemStatus{Local: false, Remote: false}

	if tc.cachingDisabled || tc.rc.readsDisabled {
		if announce {
			prefixedUI.Output(fmt.Sprintf("cache bypass, force executing %s", ui.Dim(tc.hash)))
		}
		return noHit, 0, nil
	}

	changedOutputGlobs, err := tc.rc.outputWatcher.GetChangedOutputs(ctx, tc.hash, tc.repoRelativeGlobs.Inclusions)
	if err != nil {
		warning := fmt.Sprintf("Failed to check if we can skip restoring outputs for %v: %v. Proceeding to check cache", tc.pt.TaskID, err)
		progressLogger.Warn(warning)
		prefixedUI.Warn(ui.Dim(warning))
		// Without an answer from the watcher, treat every inclusion glob as changed.
		changedOutputGlobs = tc.repoRelativeGlobs.Inclusions
	}

	var (
		cacheStatus cache.ItemStatus
		timeSaved   int
	)
	if len(changedOutputGlobs) == 0 {
		// Nothing changed since the outputs were last written: count it as a local hit.
		cacheStatus.Local = true
		prefixedUI.Warn(fmt.Sprintf("Skipping cache check for %v, outputs have not changed since previous run.", tc.pt.TaskID))
	} else {
		// The output globs are not passed to Fetch today; doing so (together with the
		// exclusion globs) could avoid unnecessary file I/O in the future.
		itemStatus, restoredFiles, duration, err := tc.rc.cache.Fetch(tc.rc.repoRoot, tc.hash, nil)
		tc.ExpandedOutputs = restoredFiles
		if err != nil {
			// A failed fetch is reported as "no hit" alongside the error.
			return noHit, 0, err
		}
		if !itemStatus.Local && !itemStatus.Remote {
			if announce {
				prefixedUI.Output(fmt.Sprintf("cache miss, executing %s", ui.Dim(tc.hash)))
			}
			return noHit, 0, nil
		}
		cacheStatus = itemStatus
		timeSaved = duration

		if err := tc.rc.outputWatcher.NotifyOutputsWritten(ctx, tc.hash, tc.repoRelativeGlobs); err != nil {
			// Failing to register the outputs with the watcher must not fail the restore.
			prefixedUI.Warn(ui.Dim(fmt.Sprintf("Failed to mark outputs as cached for %v: %v", tc.pt.TaskID, err)))
		}
	}

	switch tc.taskOutputMode {
	case util.NewTaskOutput, util.HashTaskOutput:
		// Cached output is not "new", so only the hash is shown.
		prefixedUI.Info(fmt.Sprintf("cache hit, suppressing output %s", ui.Dim(tc.hash)))
	case util.FullTaskOutput:
		progressLogger.Debug("log file", "path", tc.LogFileName)
		prefixedUI.Info(fmt.Sprintf("cache hit, replaying output %s", ui.Dim(tc.hash)))
		tc.ReplayLogFile(prefixedUI, progressLogger)
	default:
		// Errors-only (the task succeeded) and no-output modes print nothing.
	}

	// TODO: timeSaved could be part of cacheStatus, so we don't have to make a new struct
	// downstream, but this would be a more invasive change right now.
	return cacheStatus, timeSaved, nil
}
// ReplayLogFile hands the task's stored log file to the configured log replayer.
// It does nothing when the log file does not exist.
func (tc TaskCache) ReplayLogFile(prefixedUI *cli.PrefixedUi, progressLogger hclog.Logger) {
	if !tc.LogFileName.FileExists() {
		return
	}
	tc.rc.logReplayer(progressLogger, prefixedUI, tc.LogFileName)
}
// OnError is called when the task exited with a non-zero exit code. In errors-only
// output mode it prints the cache-miss line and replays the task's log file; in every
// other mode it does nothing.
func (tc TaskCache) OnError(terminal *cli.PrefixedUi, logger hclog.Logger) {
	if tc.taskOutputMode != util.ErrorTaskOutput {
		return
	}
	terminal.Output(fmt.Sprintf("cache miss, executing %s", ui.Dim(tc.hash)))
	tc.ReplayLogFile(terminal, logger)
}
// nopWriteCloser adapts an io.Writer into an io.WriteCloser whose Close does nothing,
// mirroring what io.NopCloser does for readers.
type nopWriteCloser struct {
	io.Writer
}

// Compile-time check that nopWriteCloser satisfies io.WriteCloser.
var _ io.WriteCloser = nopWriteCloser{}

// Close always succeeds and leaves the wrapped writer untouched.
func (nopWriteCloser) Close() error {
	return nil
}
// fileWriterCloser is an io.WriteCloser for a task's log file. Writes go to the embedded
// Writer (the buffered file writer, possibly teed with another sink); Close flushes the
// buffer and releases the underlying file.
type fileWriterCloser struct {
	io.Writer
	// file is the open log file that bufio ultimately writes to.
	file *os.File
	// bufio buffers writes to file and must be flushed before file is closed.
	bufio *bufio.Writer
}

// Close flushes any buffered data and then closes the file. The file is closed even
// when the flush fails so the descriptor is never leaked. A flush error takes
// precedence over a close error in the returned value.
func (fwc *fileWriterCloser) Close() error {
	flushErr := fwc.bufio.Flush()
	closeErr := fwc.file.Close()
	if flushErr != nil {
		return flushErr
	}
	return closeErr
}
// OutputWriter returns the sink that the task's command output should be written to.
// With caching or cache writes disabled, output goes only to a prefixed stdout writer.
// Otherwise output is always captured in the task's log file and, unless the output mode
// hides live output, is also echoed to the prefixed stdout writer. The caller must Close
// the returned writer.
func (tc TaskCache) OutputWriter(prefix string) (io.WriteCloser, error) {
	// Wraps os.Stdout and adds the prefix to what it prints.
	stdoutWriter := logstreamer.NewPrettyStdoutWriter(prefix)
	if tc.cachingDisabled || tc.rc.writesDisabled {
		return nopWriteCloser{stdoutWriter}, nil
	}

	// Make sure the log file and its parent directory exist.
	if err := tc.LogFileName.EnsureDir(); err != nil {
		return nil, err
	}
	logFile, err := tc.LogFileName.Create()
	if err != nil {
		return nil, err
	}
	buffered := bufio.NewWriter(logFile)

	var sink io.Writer
	switch tc.taskOutputMode {
	case util.NoTaskOutput, util.HashTaskOutput, util.ErrorTaskOutput:
		// These modes capture output in the log file without echoing it.
		sink = buffered
	default:
		sink = io.MultiWriter(stdoutWriter, buffered)
	}

	return &fileWriterCloser{
		Writer: sink,
		file:   logFile,
		bufio:  buffered,
	}, nil
}
// _emptyIgnore is a nil string slice. Nothing in this file refers to it.
// NOTE(review): confirm whether another file in this package still uses it before removing.
var _emptyIgnore []string
// SaveOutputs stores the task's outputs in the cache after the task has completed.
//
// It expands the task's output globs relative to the repo root, converts the matches to
// repo-relative paths, puts them into the cache under the task hash together with the
// task duration, and tells the output watcher that the outputs are now up to date.
// On success tc.ExpandedOutputs is set to the paths that were cached.
//
// It is a no-op when caching is disabled for the task or cache writes are disabled.
// Errors from globbing or from the cache Put are returned. A file whose path cannot be
// made relative is reported and left out of the cached set. A failure to notify the
// output watcher is only logged.
func (tc *TaskCache) SaveOutputs(ctx context.Context, logger hclog.Logger, terminal cli.Ui, duration int) error {
	if tc.cachingDisabled || tc.rc.writesDisabled {
		return nil
	}

	logger.Debug("caching output", "outputs", tc.repoRelativeGlobs)

	filesToBeCached, err := globby.GlobAll(tc.rc.repoRoot.ToStringDuringMigration(), tc.repoRelativeGlobs.Inclusions, tc.repoRelativeGlobs.Exclusions)
	if err != nil {
		return err
	}

	// Append instead of assigning by index: a skipped file must not leave an empty
	// path behind in the slice handed to the cache.
	relativePaths := make([]turbopath.AnchoredSystemPath, 0, len(filesToBeCached))
	for _, value := range filesToBeCached {
		relativePath, err := tc.rc.repoRoot.RelativePathString(value)
		if err != nil {
			logger.Error(fmt.Sprintf("error: %v", err))
			terminal.Error(fmt.Sprintf("%s%s", ui.ERROR_PREFIX, color.RedString(" %v", fmt.Errorf("File path cannot be made relative: %w", err))))
			continue
		}
		relativePaths = append(relativePaths, fs.UnsafeToAnchoredSystemPath(relativePath))
	}

	if err = tc.rc.cache.Put(tc.rc.repoRoot, tc.hash, duration, relativePaths); err != nil {
		return err
	}

	err = tc.rc.outputWatcher.NotifyOutputsWritten(ctx, tc.hash, tc.repoRelativeGlobs)
	if err != nil {
		// Don't fail the cache write because we also failed to record it, we will just do
		// extra I/O in the future restoring files that haven't changed from cache
		logger.Warn(fmt.Sprintf("Failed to mark outputs as cached for %v: %v", tc.pt.TaskID, err))
		terminal.Warn(ui.Dim(fmt.Sprintf("Failed to mark outputs as cached for %v: %v", tc.pt.TaskID, err)))
	}

	tc.ExpandedOutputs = relativePaths

	return nil
}
// TaskCache creates the TaskCache for the given PackageTask and task hash within this run.
// The task's output globs are re-anchored from the package directory to the repo root,
// and the run-wide output mode override, when set, replaces the task's own output mode.
func (rc *RunCache) TaskCache(pt *nodes.PackageTask, hash string) TaskCache {
	logFileName := rc.repoRoot.UntypedJoin(pt.LogFile)
	hashableOutputs := pt.HashableOutputs()

	// anchor returns a copy of globs with each entry joined onto the package directory.
	anchor := func(globs []string) []string {
		anchored := make([]string, len(globs))
		for i, glob := range globs {
			anchored[i] = filepath.Join(pt.Pkg.Dir.ToStringDuringMigration(), glob)
		}
		return anchored
	}
	repoRelativeGlobs := fs.TaskOutputs{
		Inclusions: anchor(hashableOutputs.Inclusions),
		Exclusions: anchor(hashableOutputs.Exclusions),
	}

	mode := pt.TaskDefinition.OutputMode
	if override := rc.taskOutputModeOverride; override != nil {
		mode = *override
	}

	return TaskCache{
		ExpandedOutputs:   []turbopath.AnchoredSystemPath{},
		rc:                rc,
		repoRelativeGlobs: repoRelativeGlobs,
		hash:              hash,
		pt:                pt,
		taskOutputMode:    mode,
		cachingDisabled:   !pt.TaskDefinition.ShouldCache,
		LogFileName:       logFileName,
	}
}
// defaultLogReplayer replays the log file at logFileName line by line to the given
// PrefixedUi.
//
// If the file cannot be opened, the problem is reported to both output and logger and
// nothing is replayed. If reading stops early (for example on an I/O error or a line
// longer than the scanner accepts), the lines read so far have already been printed and
// the failure is reported the same way.
func defaultLogReplayer(logger hclog.Logger, output *cli.PrefixedUi, logFileName turbopath.AbsoluteSystemPath) {
	logger.Debug("start replaying logs")
	f, err := logFileName.Open()
	if err != nil {
		output.Warn(fmt.Sprintf("error reading logs: %v", err))
		logger.Error(fmt.Sprintf("error reading logs: %v", err.Error()))
		// There is no usable file handle, so there is nothing to scan or close.
		return
	}
	// Best-effort close: the file was only read, so a close error is not actionable.
	defer func() { _ = f.Close() }()

	scan := bufio.NewScanner(f)
	for scan.Scan() {
		str := scan.Text()
		// cli.PrefixedUi won't prefix empty strings (it'll just print them as empty strings).
		// So if we have a blank string, we'll just output the string here, instead of passing
		// it onto the PrefixedUi.
		if str == "" {
			// Just output the prefix if the current line is a blank string
			// Note: output.OutputPrefix is also a colored prefix already
			output.Ui.Output(output.OutputPrefix)
		} else {
			// Writing to Stdout
			output.Output(str)
		}
	}
	// Scan returns false on both EOF and failure; Err tells the two apart.
	if err := scan.Err(); err != nil {
		output.Warn(fmt.Sprintf("error reading logs: %v", err))
		logger.Error(fmt.Sprintf("error reading logs: %v", err.Error()))
	}
	logger.Debug("finish replaying logs")
}