Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: DataDog/dd-trace-go
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v1.50.0
Choose a base ref
...
head repository: DataDog/dd-trace-go
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v1.50.1
Choose a head ref
  • 3 commits
  • 6 files changed
  • 2 contributors

Commits on May 1, 2023

  1. Copy the full SHA
    59ce8e2 View commit details
  2. Copy the full SHA
    2a82a8e View commit details

Commits on May 2, 2023

  1. Copy the full SHA
    596353a View commit details
Showing with 119 additions and 44 deletions.
  1. +68 −0 ddtrace/tracer/propagating_tags.go
  2. +1 −1 ddtrace/tracer/span.go
  3. +5 −22 ddtrace/tracer/spancontext.go
  4. +23 −20 ddtrace/tracer/textmap.go
  5. +21 −0 ddtrace/tracer/textmap_test.go
  6. +1 −1 internal/version/version.go
68 changes: 68 additions & 0 deletions ddtrace/tracer/propagating_tags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package tracer

func (t *trace) hasPropagatingTag(k string) bool {
t.mu.RLock()
defer t.mu.RUnlock()
_, ok := t.propagatingTags[k]
return ok
}

func (t *trace) propagatingTag(k string) string {
t.mu.RLock()
defer t.mu.RUnlock()
return t.propagatingTags[k]
}

// setPropagatingTag sets the key/value pair as a trace propagating tag.
func (t *trace) setPropagatingTag(key, value string) {
t.mu.Lock()
defer t.mu.Unlock()
t.setPropagatingTagLocked(key, value)
}

// setPropagatingTagLocked sets the key/value pair as a trace propagating tag.
// Not safe for concurrent use, setPropagatingTag should be used instead in that case.
func (t *trace) setPropagatingTagLocked(key, value string) {
if t.propagatingTags == nil {
t.propagatingTags = make(map[string]string, 1)
}
t.propagatingTags[key] = value
}

// unsetPropagatingTag deletes the key/value pair from the trace's propagated tags.
func (t *trace) unsetPropagatingTag(key string) {
t.mu.Lock()
defer t.mu.Unlock()
delete(t.propagatingTags, key)
}

// iteratePropagatingTags allows safe iteration through the propagating tags of a trace.
// the trace must not be modified during this call, as it is locked for reading.
//
// f should return whether or not the iteration should continue.
func (t *trace) iteratePropagatingTags(f func(k, v string) bool) {
t.mu.RLock()
defer t.mu.RUnlock()
for k, v := range t.propagatingTags {
if !f(k, v) {
break
}
}
}

func (t *trace) replacePropagatingTags(tags map[string]string) {
t.mu.Lock()
defer t.mu.Unlock()
t.propagatingTags = tags
}

func (t *trace) propagatingTagsLen() int {
t.mu.RLock()
defer t.mu.RUnlock()
return len(t.propagatingTags)
}
2 changes: 1 addition & 1 deletion ddtrace/tracer/span.go
Original file line number Diff line number Diff line change
@@ -221,7 +221,7 @@ func (s *span) SetUser(id string, opts ...UserMonitoringOption) {
} else {
// Unset the propagated user ID so that a propagated user ID coming from upstream won't be propagated anymore.
trace.unsetPropagatingTag(keyPropagatedUserID)
if _, ok := trace.propagatingTags[keyPropagatedUserID]; ok {
if trace.hasPropagatingTag(keyPropagatedUserID) {
s.context.updated = true
}
delete(root.Meta, keyPropagatedUserID)
27 changes: 5 additions & 22 deletions ddtrace/tracer/spancontext.go
Original file line number Diff line number Diff line change
@@ -301,33 +301,16 @@ func (t *trace) drop() {
}

func (t *trace) setTag(key, value string) {
if t.tags == nil {
t.tags = make(map[string]string, 1)
}
t.tags[key] = value
}

// setPropagatingTag sets the key/value pair as a trace propagating tag.
func (t *trace) setPropagatingTag(key, value string) {
t.mu.Lock()
defer t.mu.Unlock()
t.setPropagatingTagLocked(key, value)
t.setTagLocked(key, value)
}

// setPropagatingTagLocked sets the key/value pair as a trace propagating tag.
// Not safe for concurrent use, setPropagatingTag should be used instead in that case.
func (t *trace) setPropagatingTagLocked(key, value string) {
if t.propagatingTags == nil {
t.propagatingTags = make(map[string]string, 1)
func (t *trace) setTagLocked(key, value string) {
if t.tags == nil {
t.tags = make(map[string]string, 1)
}
t.propagatingTags[key] = value
}

// unsetPropagatingTag deletes the key/value pair from the trace's propagated tags.
func (t *trace) unsetPropagatingTag(key string) {
t.mu.Lock()
defer t.mu.Unlock()
delete(t.propagatingTags, key)
t.tags[key] = value
}

func (t *trace) setSamplingPriorityLocked(p int, sampler samplernames.SamplerName) {
43 changes: 23 additions & 20 deletions ddtrace/tracer/textmap.go
Original file line number Diff line number Diff line change
@@ -321,26 +321,30 @@ func (p *propagator) marshalPropagatingTags(ctx *spanContext) string {
if ctx.trace == nil {
return ""
}
ctx.trace.mu.Lock()
defer ctx.trace.mu.Unlock()
for k, v := range ctx.trace.propagatingTags {

var properr string
ctx.trace.iteratePropagatingTags(func(k, v string) bool {
if err := isValidPropagatableTag(k, v); err != nil {
log.Warn("Won't propagate tag '%s': %v", k, err.Error())
ctx.trace.setTag(keyPropagationError, "encoding_error")
continue
properr = "encoding_error"
return true
}
if sb.Len()+len(k)+len(v) > p.cfg.MaxTagsHeaderLen {
sb.Reset()
log.Warn("Won't propagate tag: maximum trace tags header len (%d) reached.", p.cfg.MaxTagsHeaderLen)
ctx.trace.setTag(keyPropagationError, "inject_max_size")
break
properr = "inject_max_size"
return false
}
if sb.Len() > 0 {
sb.WriteByte(',')
}
sb.WriteString(k)
sb.WriteByte('=')
sb.WriteString(v)
return true
})
if properr != "" {
ctx.trace.setTag(keyPropagationError, properr)
}
return sb.String()
}
@@ -394,7 +398,7 @@ func (p *propagator) extractTextMap(reader TextMapReader) (ddtrace.SpanContext,
}
if ctx.trace != nil {
// TODO: this always assumed it was valid so I copied that logic here, maybe we shouldn't
ctx.traceID.SetUpperFromHex(ctx.trace.propagatingTags[keyTraceID128])
ctx.traceID.SetUpperFromHex(ctx.trace.propagatingTag(keyTraceID128))
}
if ctx.traceID.Empty() || (ctx.spanID == 0 && ctx.origin != "synthetics") {
return nil, ErrSpanContextNotFound
@@ -407,19 +411,17 @@ func unmarshalPropagatingTags(ctx *spanContext, v string) {
if ctx.trace == nil {
ctx.trace = newTrace()
}
ctx.trace.mu.Lock()
defer ctx.trace.mu.Unlock()
if len(v) > propagationExtractMaxSize {
log.Warn("Did not extract %s, size limit exceeded: %d. Incoming tags will not be propagated further.", traceTagsHeader, propagationExtractMaxSize)
ctx.trace.setTag(keyPropagationError, "extract_max_size")
return
}
var err error
ctx.trace.propagatingTags, err = parsePropagatableTraceTags(v)
tags, err := parsePropagatableTraceTags(v)
if err != nil {
log.Warn("Did not extract %s: %v. Incoming tags will not be propagated further.", traceTagsHeader, err.Error())
ctx.trace.setTag(keyPropagationError, "decoding_error")
}
ctx.trace.replacePropagatingTags(tags)
}

// setPropagatingTag adds the key value pair to the map of propagating tags on the trace,
@@ -671,11 +673,11 @@ func (*propagatorW3c) injectTextMap(spanCtx ddtrace.SpanContext, writer TextMapW
// or the tracestateHeader doesn't start with `dd=`
// we need to recreate tracestate
if ctx.updated ||
(ctx.trace != nil && ctx.trace.propagatingTags != nil && !strings.HasPrefix(ctx.trace.propagatingTags[tracestateHeader], "dd=")) ||
len(ctx.trace.propagatingTags[tracestateHeader]) == 0 {
writer.Set(tracestateHeader, composeTracestate(ctx, p, ctx.trace.propagatingTags[tracestateHeader]))
(ctx.trace != nil && !strings.HasPrefix(ctx.trace.propagatingTag(tracestateHeader), "dd=")) ||
ctx.trace.propagatingTagsLen() == 0 {
writer.Set(tracestateHeader, composeTracestate(ctx, p, ctx.trace.propagatingTag(tracestateHeader)))
} else {
writer.Set(tracestateHeader, ctx.trace.propagatingTags[tracestateHeader])
writer.Set(tracestateHeader, ctx.trace.propagatingTag(tracestateHeader))
}
return nil
}
@@ -728,21 +730,22 @@ func composeTracestate(ctx *spanContext, priority int, oldState string) string {
strings.ReplaceAll(oWithSub, "=", "~")))
}

for k, v := range ctx.trace.propagatingTags {
ctx.trace.iteratePropagatingTags(func(k, v string) bool {
if !strings.HasPrefix(k, "_dd.p.") {
continue
return true
}
// Datadog propagating tags must be appended to the tracestateHeader
// with the `t.` prefix. Tag value must have all `=` signs replaced with a tilde (`~`).
tag := fmt.Sprintf("t.%s:%s",
keyRgx.ReplaceAllString(k[len("_dd.p."):], "_"),
strings.ReplaceAll(valueRgx.ReplaceAllString(v, "_"), "=", "~"))
if b.Len()+len(tag) > 256 {
break
return false
}
b.WriteString(";")
b.WriteString(tag)
}
return true
})
// the old state is split by vendors, must be concatenated with a `,`
if len(oldState) == 0 {
return b.String()
21 changes: 21 additions & 0 deletions ddtrace/tracer/textmap_test.go
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"testing"

"github.com/DataDog/datadog-go/v5/statsd"
@@ -1931,3 +1932,23 @@ func FuzzExtractTraceID128(f *testing.F) {
extractTraceID128(ctx, v) // make sure it doesn't panic
})
}

// Regression test for https://github.com/DataDog/dd-trace-go/issues/1944
func TestPropagatingTagsConcurrency(_ *testing.T) {
// This test ensures Injection can be done concurrently.
trc := newTracer()
defer trc.Stop()

var wg sync.WaitGroup
for i := 0; i < 1_000; i++ {
root := trc.StartSpan("test")
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
defer wg.Done()
trc.Inject(root.Context(), TextMapCarrier(make(map[string]string)))
}()
}
wg.Wait()
}
}
2 changes: 1 addition & 1 deletion internal/version/version.go
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@ import (
// Tag specifies the current release tag. It needs to be manually
// updated. A test checks that the value of Tag never points to a
// git tag that is older than HEAD.
const Tag = "v1.50.0"
const Tag = "v1.50.1"

// Dissected version number. Filled during init()
var (