Skip to content

Commit

Permalink
trace: make trace byte receiving synchronous
Browse files Browse the repository at this point in the history
This avoids the asynchrony related to the pipe+buffer+goroutine. Using
this approach, we can guarantee that once the `Write` call completes,
the data is stored in flightrecorder buffers. This is related to
https://go.dev/cl/562616.

There is a behavioral change: previously, in case of error the recorder
goroutine would just stop, which would essentially block the tracing
infra by not accepting the `Write`. Now, we actively stop tracing on any
error.

I did not run proper benchmarks, but the total test run time below suggests performance is essentially unchanged:

Before:

    running go [test . -count 10]
    ok      golang.org/x/exp/trace  60.423s

After:

    running go [test . -count 10]
    ok      golang.org/x/exp/trace  60.394s

Change-Id: Ie900fec2b45f1c227c82e68f4f7af1902de3582b
Reviewed-on: https://go-review.googlesource.com/c/exp/+/566255
Reviewed-by: Michael Knyszek <mknyszek@google.com>
Auto-Submit: Michael Knyszek <mknyszek@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
  • Loading branch information
aktau authored and gopherbot committed Feb 22, 2024
1 parent e579c86 commit 814bf88
Showing 1 changed file with 115 additions and 112 deletions.
227 changes: 115 additions & 112 deletions trace/flightrecorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
package trace

import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"math/bits"
Expand All @@ -28,13 +29,9 @@ import (
//
// Only one flight recording may be active at any given time.
type FlightRecorder struct {
// State for coordinating with the recorder goroutine.
fromTracer *io.PipeReader
toRecorder *io.PipeWriter
recorderWait sync.WaitGroup
err error
err error

// State specific to the recorder goroutine.
// State specific to the recorder.
header [16]byte
active rawGeneration
ringMu sync.Mutex
Expand Down Expand Up @@ -100,6 +97,114 @@ func (r *FlightRecorder) SetSize(bytes int) {
r.targetSize = bytes
}

// A recorder receives bytes from the runtime tracer and processes them
// synchronously, storing the result in the FlightRecorder it points to.
// It is the writer handed to trace.Start by FlightRecorder.Start.
type recorder struct {
// r is the flight recorder whose header, active generation and ring
// are updated by Write.
r *FlightRecorder

// headerReceived reports whether the trace header has already been
// copied out of the first write into r.header.
headerReceived bool
}

// Write consumes one chunk of trace data and stores it in the flight
// recorder's buffers before returning.
//
// The first call must contain at least len(r.header) bytes; that prefix is
// saved as the trace header. Whatever follows is parsed as a single batch via
// readBatch and appended to the active generation. When the batch belongs to
// the generation immediately following the active one, the active generation
// is moved into the ring (trimmed according to wantSize and wantDur) and a
// fresh active generation is started.
//
// It returns the number of bytes of p that were consumed. On any error, the
// first error seen is recorded in the FlightRecorder's err field and tracing
// is stopped via trace.Stop.
func (w *recorder) Write(p []byte) (n int, err error) {
	r := w.r

	defer func() {
		if err != nil {
			// Propagate errors to the flightrecorder.
			if r.err == nil {
				r.err = err
			}
			trace.Stop() // Stop the tracer, preventing further writes.
		}
	}()

	rd := bytes.NewReader(p)

	if !w.headerReceived {
		if len(p) < len(r.header) {
			return 0, fmt.Errorf("expected at least %d bytes in the first write", len(r.header))
		}
		// The length check above means this cannot come up short, but check
		// the result anyway rather than silently discarding it.
		if _, err = io.ReadFull(rd, r.header[:]); err != nil {
			return 0, err
		}
		w.headerReceived = true
	}

	b, gen, err := readBatch(rd) // Every write from the runtime is guaranteed to be a complete batch.
	if err == io.EOF {
		if rd.Len() > 0 {
			return len(p) - rd.Len(), errors.New("short read")
		}
		return len(p), nil
	}
	if err != nil {
		return len(p) - rd.Len(), err
	}

	// Check if we're entering a new generation.
	if r.active.gen != 0 && r.active.gen+1 == gen {
		// Validate r.active.freq before we use it. It's required for a generation
		// to not be considered broken, and without it, we can't correctly handle
		// SetPeriod.
		//
		// This check is done before taking ringMu so that the error return
		// cannot leave the mutex locked.
		if r.active.freq == 0 {
			return len(p) - rd.Len(), fmt.Errorf("broken trace: failed to find frequency event in generation %d", r.active.gen)
		}

		r.ringMu.Lock()

		// Get the current trace clock time.
		now := traceTimeNow(r.active.freq)

		// Add the current generation to the ring. Make sure we always have at least one
		// complete generation by putting the active generation onto the new list, regardless
		// of whatever our settings are.
		//
		// N.B. Let's completely replace the ring here, so that WriteTo can just make a copy
		// and not worry about aliasing. This creates allocations, but at a very low rate.
		newRing := []rawGeneration{r.active}
		size := r.active.size
		for i := len(r.ring) - 1; i >= 0; i-- {
			// Stop adding older generations if the new ring already exceeds the thresholds.
			// This ensures we keep generations that cross a threshold, but not any that lie
			// entirely outside it.
			if size > r.wantSize || now.Sub(newRing[len(newRing)-1].minTraceTime()) > r.wantDur {
				break
			}
			size += r.ring[i].size
			newRing = append(newRing, r.ring[i])
		}
		slices.Reverse(newRing)
		r.ring = newRing
		r.ringMu.Unlock()

		// Start a new active generation.
		r.active = rawGeneration{}
	}

	// Obtain the frequency if this is a frequency batch.
	if b.isFreqBatch() {
		freq, err := parseFreq(b)
		if err != nil {
			return len(p) - rd.Len(), err
		}
		r.active.freq = freq
	}

	// Append the batch to the current generation.
	if r.active.gen == 0 {
		r.active.gen = gen
	}
	if r.active.minTime == 0 || r.active.minTime > b.time {
		r.active.minTime = b.time
	}
	// Account for the encoded size of the batch: one byte, then the varint
	// encodings of gen, m, time and the data length, then the data itself.
	r.active.size += 1
	r.active.size += uvarintSize(gen)
	r.active.size += uvarintSize(uint64(b.m))
	r.active.size += uvarintSize(uint64(b.time))
	r.active.size += uvarintSize(uint64(len(b.data)))
	r.active.size += len(b.data)
	r.active.batches = append(r.active.batches, b)

	return len(p) - rd.Len(), nil
}

// Start begins flight recording. Only one flight recorder or one call to [runtime/trace.Start]
// may be active at any given time. Returns an error if starting the flight recorder would
// violate this rule.
Expand All @@ -111,106 +216,13 @@ func (r *FlightRecorder) Start() error {
r.wantSize = r.targetSize
r.wantDur = r.targetPeriod
r.err = nil
r.fromTracer, r.toRecorder = io.Pipe()

// Start tracing, sending data to the recorder goroutine (not yet started) via an io.Pipe.
if err := trace.Start(r.toRecorder); err != nil {
// Start tracing, data is sent to a recorder which forwards it to our own
// storage.
if err := trace.Start(&recorder{r: r}); err != nil {
return err
}

// Start recorder goroutine.
r.recorderWait.Add(1)
go func() {
defer r.recorderWait.Done()

// Read in the header so we can tack it on to the front
// of whatever WriteTo emits later.
_, err := io.ReadFull(r.fromTracer, r.header[:])
if err != nil {
r.err = err
return
}

// Process the rest of the trace.
rd := bufio.NewReader(r.fromTracer)
for {
b, gen, err := readBatch(rd)
if err == io.EOF || err == io.ErrClosedPipe {
break
}
if err != nil {
r.err = err
return
}

// Check if we're entering a new generation.
if r.active.gen != 0 && r.active.gen+1 == gen {
r.ringMu.Lock()

// Validate r.active.freq before we use it. It's required for a generation
// to not be considered broken, and without it, we can't correctly handle
// SetPeriod.
if r.active.freq == 0 {
r.err = fmt.Errorf("broken trace: failed to find frequency event in generation %d", r.active.gen)
return
}

// Get the current trace clock time.
now := traceTimeNow(r.active.freq)

// Add the current generation to the ring. Make sure we always have at least one
// complete generation by putting the active generation onto the new list, regardless
// of whatever our settings are.
//
// N.B. Let's completely replace the ring here, so that WriteTo can just make a copy
// and not worry about aliasing. This creates allocations, but at a very low rate.
newRing := []rawGeneration{r.active}
size := r.active.size
for i := len(r.ring) - 1; i >= 0; i-- {
// Stop adding older generations if the new ring already exceeds the thresholds.
// This ensures we keep generations that cross a threshold, but not any that lie
// entirely outside it.
if size > r.wantSize || now.Sub(newRing[len(newRing)-1].minTraceTime()) > r.wantDur {
break
}
size += r.ring[i].size
newRing = append(newRing, r.ring[i])
}
slices.Reverse(newRing)
r.ring = newRing
r.ringMu.Unlock()

// Start a new active generation.
r.active = rawGeneration{}
}

// Obtain the frequency if this is a frequency batch.
if b.isFreqBatch() {
freq, err := parseFreq(b)
if err != nil {
r.err = err
return
}
r.active.freq = freq
}

// Append the batch to the current generation.
if r.active.gen == 0 {
r.active.gen = gen
}
if r.active.minTime == 0 || r.active.minTime > b.time {
r.active.minTime = b.time
}
r.active.size += 1
r.active.size += uvarintSize(gen)
r.active.size += uvarintSize(uint64(b.m))
r.active.size += uvarintSize(uint64(b.time))
r.active.size += uvarintSize(uint64(len(b.data)))
r.active.size += len(b.data)
r.active.batches = append(r.active.batches, b)
}
}()

r.enabled = true
return nil
}
Expand All @@ -224,18 +236,9 @@ func (r *FlightRecorder) Stop() error {
r.enabled = false
trace.Stop()

// Close the write side of the pipe. This is safe because tracing has stopped, so no more will
// be written to the pipe.
r.fromTracer.Close()

// Wait for the reader to exit.
r.recorderWait.Wait()

// Reset all state. No need to lock because the reader has already exited.
r.active = rawGeneration{}
r.ring = nil
r.toRecorder.Close()
r.fromTracer.Close()
return r.err
}

Expand Down

0 comments on commit 814bf88

Please sign in to comment.