Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: unistack-org/micro
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v3.3.2
Choose a base ref
...
head repository: unistack-org/micro
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v3.3.3
Choose a head ref
  • 3 commits
  • 5 files changed
  • 1 contributor

Commits on Mar 25, 2021

  1. add SkipEndpoints for wrappers

    Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
    vtolstov committed Mar 25, 2021
    Copy the full SHA
    6189a1b View commit details

Commits on Mar 26, 2021

  1. client: add TLSConfig option

    Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
    vtolstov committed Mar 26, 2021
    Copy the full SHA
    040fc45 View commit details
  2. meter/handler: regen

    Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
    vtolstov committed Mar 26, 2021
    Copy the full SHA
    ed61cad View commit details
Showing with 133 additions and 10 deletions.
  1. +19 −0 client/options.go
  2. +55 −0 logger/wrapper/wrapper.go
  3. +3 −2 meter/handler/handler_micro_http.pb.go
  4. +16 −8 meter/wrapper/wrapper.go
  5. +40 −0 tracer/wrapper/wrapper.go
19 changes: 19 additions & 0 deletions client/options.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ package client

import (
"context"
"crypto/tls"
"time"

"github.com/unistack-org/micro/v3/broker"
@@ -52,6 +53,8 @@ type Options struct {
PoolSize int
// PoolTTL connection pool ttl
PoolTTL time.Duration
// TLSConfig specifies tls.Config for secure connection
TLSConfig *tls.Config
}

// NewCallOptions creates new call options struct
@@ -312,6 +315,22 @@ func Lookup(l LookupFunc) Option {
}
}

// TLSConfig specifies a *tls.Config
func TLSConfig(t *tls.Config) Option {
return func(o *Options) {
// set the internal tls
o.TLSConfig = t

// set the default transport if one is not
// already set. Required for Init call below.

// set the transport tls
o.Transport.Init(
transport.TLSConfig(t),
)
}
}

// Retries sets the retry count when making the request.
func Retries(i int) Option {
return func(o *Options) {
55 changes: 55 additions & 0 deletions logger/wrapper/wrapper.go
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ package wrapper

import (
"context"
"fmt"

"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/logger"
@@ -57,6 +58,8 @@ var (
}
return labels
}

DefaultSkipEndpoints = []string{"Meter.Metrics"}
)

type lWrapper struct {
@@ -94,6 +97,8 @@ type Options struct {
ServerHandlerObservers []ServerHandlerObserver
// ServerSubscriberObservers funcs
ServerSubscriberObservers []ServerSubscriberObserver
// SkipEndpoints
SkipEndpoints []string
}

// Option func signature
@@ -110,6 +115,7 @@ func NewOptions(opts ...Option) Options {
ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver},
ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver},
ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver},
SkipEndpoints: DefaultSkipEndpoints,
}

for _, o := range opts {
@@ -182,9 +188,23 @@ func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
}
}

// SkipEndpoins
func SkipEndpoints(eps ...string) Option {
return func(o *Options) {
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
}
}

func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
err := l.Client.Call(ctx, req, rsp, opts...)

endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range l.opts.SkipEndpoints {
if ep == endpoint {
return err
}
}

if !l.opts.Enabled {
return err
}
@@ -205,6 +225,13 @@ func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{}
func (l *lWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
stream, err := l.Client.Stream(ctx, req, opts...)

endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range l.opts.SkipEndpoints {
if ep == endpoint {
return stream, err
}
}

if !l.opts.Enabled {
return stream, err
}
@@ -225,6 +252,13 @@ func (l *lWrapper) Stream(ctx context.Context, req client.Request, opts ...clien
func (l *lWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
err := l.Client.Publish(ctx, msg, opts...)

endpoint := msg.Topic()
for _, ep := range l.opts.SkipEndpoints {
if ep == endpoint {
return err
}
}

if !l.opts.Enabled {
return err
}
@@ -245,6 +279,13 @@ func (l *lWrapper) Publish(ctx context.Context, msg client.Message, opts ...clie
func (l *lWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error {
err := l.serverHandler(ctx, req, rsp)

endpoint := req.Endpoint()
for _, ep := range l.opts.SkipEndpoints {
if ep == endpoint {
return err
}
}

if !l.opts.Enabled {
return err
}
@@ -265,6 +306,13 @@ func (l *lWrapper) ServerHandler(ctx context.Context, req server.Request, rsp in
func (l *lWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error {
err := l.serverSubscriber(ctx, msg)

endpoint := msg.Topic()
for _, ep := range l.opts.SkipEndpoints {
if ep == endpoint {
return err
}
}

if !l.opts.Enabled {
return err
}
@@ -309,6 +357,13 @@ func NewClientCallWrapper(opts ...Option) client.CallWrapper {
func (l *lWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
err := l.clientCallFunc(ctx, addr, req, rsp, opts)

endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range l.opts.SkipEndpoints {
if ep == endpoint {
return err
}
}

if !l.opts.Enabled {
return err
}
5 changes: 3 additions & 2 deletions meter/handler/handler_micro_http.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 16 additions & 8 deletions meter/wrapper/wrapper.go
Original file line number Diff line number Diff line change
@@ -34,16 +34,18 @@ var (
)

type Options struct {
Meter meter.Meter
lopts []meter.Option
Meter meter.Meter
lopts []meter.Option
SkipEndpoints []string
}

type Option func(*Options)

func NewOptions(opts ...Option) Options {
options := Options{
Meter: meter.DefaultMeter,
lopts: make([]meter.Option, 0, 5),
Meter: meter.DefaultMeter,
lopts: make([]meter.Option, 0, 5),
SkipEndpoints: DefaultSkipEndpoints,
}
for _, o := range opts {
o(&options)
@@ -75,6 +77,12 @@ func Meter(m meter.Meter) Option {
}
}

func SkipEndoints(eps ...string) Option {
return func(o *Options) {
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
}
}

type wrapper struct {
client.Client
callFunc client.CallFunc
@@ -103,7 +111,7 @@ func NewCallWrapper(opts ...Option) client.CallWrapper {

func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range DefaultSkipEndpoints {
for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint {
return w.callFunc(ctx, addr, req, rsp, opts)
}
@@ -130,7 +138,7 @@ func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request,

func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range DefaultSkipEndpoints {
for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint {
return w.Client.Call(ctx, req, rsp, opts...)
}
@@ -158,7 +166,7 @@ func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{},

func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range DefaultSkipEndpoints {
for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint {
return w.Client.Stream(ctx, req, opts...)
}
@@ -217,7 +225,7 @@ func NewHandlerWrapper(opts ...Option) server.HandlerWrapper {
func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
endpoint := req.Endpoint()
for _, ep := range DefaultSkipEndpoints {
for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint {
return fn(ctx, req, rsp)
}
40 changes: 40 additions & 0 deletions tracer/wrapper/wrapper.go
Original file line number Diff line number Diff line change
@@ -101,6 +101,8 @@ var (
}
sp.SetLabels(labels...)
}

DefaultSkipEndpoints = []string{"Meter.Metrics"}
)

type tWrapper struct {
@@ -134,6 +136,8 @@ type Options struct {
ServerHandlerObservers []ServerHandlerObserver
// ServerSubscriberObservers funcs
ServerSubscriberObservers []ServerSubscriberObserver
// SkipEndpoints
SkipEndpoints []string
}

// Option func signature
@@ -149,6 +153,7 @@ func NewOptions(opts ...Option) Options {
ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver},
ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver},
ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver},
SkipEndpoints: DefaultSkipEndpoints,
}

for _, o := range opts {
@@ -165,6 +170,13 @@ func WithTracer(t tracer.Tracer) Option {
}
}

// SkipEndponts
func SkipEndpoins(eps ...string) Option {
return func(o *Options) {
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
}
}

// WithClientCallObservers funcs
func WithClientCallObservers(ob ...ClientCallObserver) Option {
return func(o *Options) {
@@ -208,6 +220,13 @@ func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
}

func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.Client.Call(ctx, req, rsp, opts...)
}
}

sp := tracer.SpanFromContext(ctx)
defer sp.Finish()

@@ -221,6 +240,13 @@ func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{
}

func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.Client.Stream(ctx, req, opts...)
}
}

sp := tracer.SpanFromContext(ctx)
defer sp.Finish()

@@ -247,6 +273,13 @@ func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...cli
}

func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error {
endpoint := req.Endpoint()
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.serverHandler(ctx, req, rsp)
}
}

sp := tracer.SpanFromContext(ctx)
defer sp.Finish()

@@ -297,6 +330,13 @@ func NewClientCallWrapper(opts ...Option) client.CallWrapper {
}

func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.ClientCallFunc(ctx, addr, req, rsp, opts)
}
}

sp := tracer.SpanFromContext(ctx)
defer sp.Finish()