Skip to content

Commit

Permalink
PoC
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Mar 29, 2023
1 parent 3d28e75 commit a8bc6f8
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 8 deletions.
1 change: 1 addition & 0 deletions sdk/go.mod
Expand Up @@ -20,6 +20,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/otel/metric v1.15.0-rc.2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

Expand Down
4 changes: 2 additions & 2 deletions sdk/go.sum
Expand Up @@ -19,12 +19,12 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
go.opentelemetry.io/otel/schema v0.0.4 h1:xgqNjF5c5oy7F1PDm4q6a6wDUJTm+po4jEiXmcN5ncI=
go.opentelemetry.io/otel/schema v0.0.4/go.mod h1:LBBdyW+43YB5XmeQtH4b2ET5k0hx7dh3yJgRGY4Qw+A=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
67 changes: 67 additions & 0 deletions sdk/resource/internal/schema/compare.go
@@ -0,0 +1,67 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schema

import (
"fmt"
"net/url"
"strings"

"github.com/Masterminds/semver/v3"
)

type Comparison uint8

const (
invalidComparison Comparison = iota
EqualTo
GreaterThan
LessThan
)

// CompareVersions compares schema URL versions and returns the Comparison of a
// vs b (i.e. a is [comparison value] b).
func CompareVersions(a, b string) (Comparison, error) {
aVer, err := version(a)
if err != nil {
return invalidComparison, fmt.Errorf("invalid version for %q: %w", a, err)
}

bVer, err := version(b)
if err != nil {
return invalidComparison, fmt.Errorf("invalid version for %q: %w", b, err)
}

switch aVer.Compare(bVer) {
case -1:
return LessThan, nil
case 0:
return EqualTo, nil
case 1:
return GreaterThan, nil
default:
return invalidComparison, fmt.Errorf("unable to compare versions: %s, %s", aVer, bVer)
}
}

func version(schemaURL string) (*semver.Version, error) {
// https://github.com/open-telemetry/oteps/blob/main/text/0152-telemetry-schemas.md#schema-url
u, err := url.Parse(schemaURL)
if err != nil {
return nil, err
}

return semver.NewVersion(u.Path[strings.LastIndex(u.Path, "/")+1:])
}
82 changes: 82 additions & 0 deletions sdk/resource/internal/schema/registry.go
@@ -0,0 +1,82 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schema

import (
"context"
"net/http"

sUtil "go.opentelemetry.io/otel/schema/v1.1"
"go.opentelemetry.io/otel/schema/v1.1/ast"
)

type cache struct {
data map[string]*ast.Schema
}

func (c *cache) lookup(key string, f func() (*ast.Schema, error)) (*ast.Schema, error) {
if c.data == nil {
s, err := f()
if err != nil {
return nil, err
}
c.data = map[string]*ast.Schema{key: s}
return s, nil
}

if s, ok := c.data[key]; ok {
return s, nil
}

s, err := f()
if err != nil {
return nil, err
}
c.data = map[string]*ast.Schema{key: s}
return s, nil
}

// Registry hold a registration of schema files. It will cache any schema files
// it gets from external URLs.
type Registry struct {
client *http.Client

cache cache
}

// NewRegistry returns a Registry that uses the HTTP client. If client is nil
// it will use the default client from "net/http".
func NewRegistry(client *http.Client) *Registry {
if client == nil {
client = http.DefaultClient
}
return &Registry{client: client}
}

// Get returns the Schema at the target schemaURL using the registry client.
func (r *Registry) Get(ctx context.Context, schemaURL string) (*ast.Schema, error) {
return r.cache.lookup(schemaURL, func() (*ast.Schema, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, schemaURL, http.NoBody)
if err != nil {
return nil, err
}
resp, err := r.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return sUtil.Parse(resp.Body)
})
}
21 changes: 15 additions & 6 deletions sdk/resource/internal/schema/upgrade.go
Expand Up @@ -25,9 +25,9 @@ import (
"go.opentelemetry.io/otel/schema/v1.1/types"
)

// Upgrade upgrades attrs in place using schema.
// Upgrade upgrades attrs in place with schema.
func Upgrade(schema *ast.Schema, attrs []attribute.KeyValue) error {
vers, err := versions(schema, false)
vers, err := versions(schema, nil, false)
if err != nil {
return fmt.Errorf("upgrade error: %w", err)
}
Expand All @@ -52,9 +52,14 @@ func Upgrade(schema *ast.Schema, attrs []attribute.KeyValue) error {
return nil
}

// Downgrade downgrade attrs in place using schema.
func Downgrade(schema *ast.Schema, attrs []attribute.KeyValue) error {
vers, err := versions(schema, true)
// Downgrade downgrade attrs to the schema version of url in place with schema.
func Downgrade(schema *ast.Schema, url string, attrs []attribute.KeyValue) error {
min, err := version(url)
if err != nil {
return fmt.Errorf("downgrade error: %w", err)
}

vers, err := versions(schema, min, true)
if err != nil {
return fmt.Errorf("downgrade error: %w", err)
}
Expand All @@ -79,7 +84,8 @@ func Downgrade(schema *ast.Schema, attrs []attribute.KeyValue) error {
return nil
}

func versions(schema *ast.Schema, reverse bool) ([]types.TelemetryVersion, error) {
// versions returns the sorted versions contained in schema.
func versions(schema *ast.Schema, min *semver.Version, reverse bool) ([]types.TelemetryVersion, error) {
// The transformations specified in each version are applied one by one.
// Order the versions to ensure correct application.
versions := make([]*semver.Version, 0, len(schema.Versions))
Expand All @@ -99,6 +105,9 @@ func versions(schema *ast.Schema, reverse bool) ([]types.TelemetryVersion, error

out := make([]types.TelemetryVersion, len(versions))
for i := range versions {
if min != nil && min.GreaterThan(versions[i]) {
continue
}
out[i] = types.TelemetryVersion(versions[i].String())
}
return out, nil
Expand Down
82 changes: 82 additions & 0 deletions sdk/resource/resource.go
Expand Up @@ -21,6 +21,7 @@ import (

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/resource/internal/schema"
)

// Resource describes an entity about which identifying information
Expand Down Expand Up @@ -127,6 +128,62 @@ func (r *Resource) SchemaURL() string {
return r.schemaURL
}

// WithSchemaURL returns a copy of r with the schema URL set to url and all
// attributes transformed based on the associated schema. If the schema
// transformation fails, or url is empty, an error is returned.
func (r *Resource) WithSchemaURL(ctx context.Context, url string) (*Resource, error) {
return r.withSchemaURL(ctx, schema.NewRegistry(nil), url)
}

func (r *Resource) withSchemaURL(ctx context.Context, reg *schema.Registry, url string) (*Resource, error) {
if url == "" {
return nil, errors.New(`invalid schema url: ""`)
}

if r == nil || r.attrs.Len() == 0 {
return NewWithAttributes(url), nil
}

if r.schemaURL == url {
// Resources are immutable, just return the ptr to the same value.
return r, nil
}

comp, err := schema.CompareVersions(r.schemaURL, url)
if err != nil {
return nil, err
}
switch comp {
case schema.EqualTo:
// Resources are immutable, just return the ptr to the same value.
return r, nil
case schema.LessThan:
s, err := reg.Get(ctx, url)
if err != nil {
return nil, err
}
attrs := r.Attributes()
err = schema.Upgrade(s, attrs)
if err != nil {
return nil, err
}
return NewWithAttributes(url, attrs...), nil
case schema.GreaterThan:
s, err := reg.Get(ctx, r.schemaURL)
if err != nil {
return nil, err
}
attrs := r.Attributes()
err = schema.Downgrade(s, url, attrs)
if err != nil {
return nil, err
}
return NewWithAttributes(url, attrs...), nil
default:
panic("unknown schema URL comparison")
}
}

// Iter returns an iterator of the Resource attributes.
// This is ideal to use if you do not want a copy of the attributes.
func (r *Resource) Iter() attribute.Iterator {
Expand Down Expand Up @@ -192,6 +249,31 @@ func Merge(a, b *Resource) (*Resource, error) {
return merged, nil
}

// MergeAt creates a new resource by combining resources at the target
// schemaURL version.
//
// If there are common keys between resources the latter resource will
// overwrite the former.
//
// Any of the resources not already at schemaURL version will be appropriately
// upgraded or downgraded to match the version. An error is returned if this is
// not possible.
func MergeAt(ctx context.Context, schemaURL string, resources ...*Resource) (*Resource, error) {
reg := schema.NewRegistry(nil)
merged := NewWithAttributes(schemaURL)
for _, r := range resources {
versioned, err := r.withSchemaURL(ctx, reg, schemaURL)
if err != nil {
return nil, err
}
merged, err = Merge(merged, versioned)
if err != nil {
return nil, err
}
}
return merged, nil
}

// Empty returns an instance of Resource with no attributes. It is
// equivalent to a `nil` Resource.
func Empty() *Resource {
Expand Down

0 comments on commit a8bc6f8

Please sign in to comment.