Skip to content

Commit

Permalink
Fan out events in async mode for async recordings.
Browse files Browse the repository at this point in the history
This commit fixes #4695.

Teleport in async recording mode sends all events to disk,
and uploads them to the server later.

It uploads some events synchronously to the audit log so
they show up in the global event log right away.

However, if the auth server is slow, the fanout blocks the session.

This commit makes the fanout of those events fast and non-blocking.
The fanout never fails, so sessions will not hang
unless the disk writes hang.

It also adds the ability to debug the GRPC connection state
when running in debug mode.

To start sending GRPC connection state logs,
set environment variables:

GRPC_GO_LOG_SEVERITY_LEVEL=info GRPC_GO_LOG_VERBOSITY_LEVEL=99 teleport start -d
  • Loading branch information
klizhentas committed Nov 6, 2020
1 parent ecb165f commit 0082311
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 8 deletions.
3 changes: 3 additions & 0 deletions lib/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,9 @@ var (
// connections. These pings are needed to avoid timeouts on load balancers
// that don't respect TCP keep-alives.
SPDYPingPeriod = 30 * time.Second

// AsyncBufferSize is a default buffer size for async emitters
AsyncBufferSize = 1024
)

// Default connection limits, they can be applied separately on any of the Teleport
Expand Down
82 changes: 81 additions & 1 deletion lib/events/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/teleport/lib/utils"

Expand All @@ -31,6 +32,86 @@ import (
log "github.com/sirupsen/logrus"
)

// AsyncEmitterConfig provides parameters for the async emitter.
type AsyncEmitterConfig struct {
// Inner is the emitter that events are forwarded to in the background.
// It is required.
Inner Emitter
// BufferSize is the capacity of the buffered channel that holds events
// waiting to be forwarded. A zero value is replaced with
// defaults.AsyncBufferSize by CheckAndSetDefaults.
BufferSize int
}

// CheckAndSetDefaults validates the configuration and fills in defaults.
//
// It returns a BadParameter error when Inner is nil or when BufferSize is
// negative: the buffer size is used as a channel capacity, and creating a
// channel with a negative capacity panics at run time. A zero BufferSize
// is replaced with defaults.AsyncBufferSize.
func (c *AsyncEmitterConfig) CheckAndSetDefaults() error {
	if c.Inner == nil {
		return trace.BadParameter("missing parameter Inner")
	}
	if c.BufferSize < 0 {
		return trace.BadParameter("parameter BufferSize must not be negative, got %v", c.BufferSize)
	}
	if c.BufferSize == 0 {
		c.BufferSize = defaults.AsyncBufferSize
	}
	return nil
}

// NewAsyncEmitter validates cfg, allocates an events channel with
// cfg.BufferSize capacity and starts the background forwarding goroutine.
//
// The returned emitter submits events without blocking the caller. It
// starts losing events once the buffer overflows.
func NewAsyncEmitter(cfg AsyncEmitterConfig) (*AsyncEmitter, error) {
	err := cfg.CheckAndSetDefaults()
	if err != nil {
		return nil, trace.Wrap(err)
	}
	emitter := &AsyncEmitter{
		cfg:      cfg,
		eventsCh: make(chan AuditEvent, cfg.BufferSize),
	}
	// The emitter owns its own context; Close cancels it to stop forwarding.
	emitter.ctx, emitter.cancel = context.WithCancel(context.Background())
	go emitter.forward()
	return emitter, nil
}

// AsyncEmitter accepts events to a buffered channel and emits
// events in a separate goroutine without blocking the caller.
type AsyncEmitter struct {
// cfg holds the validated configuration, including the inner emitter.
cfg AsyncEmitterConfig
// eventsCh buffers events between EmitAuditEvent and the forward loop.
eventsCh chan AuditEvent
// cancel cancels ctx; it is invoked by Close.
cancel context.CancelFunc
// ctx is passed to the inner emitter and stops the forward loop when done.
ctx context.Context
}

// Close stops the emitter by canceling its context. The forwarding
// goroutine exits once it observes the cancellation, and the same context
// is the one handed to the inner emitter for in-flight events.
// Close does not wait for the goroutine to finish and always returns nil.
func (a *AsyncEmitter) Close() error {
a.cancel()
return nil
}

// forward is the background loop started by NewAsyncEmitter. It takes
// events off eventsCh and hands them to the inner emitter using the
// emitter's own context. Failures are logged rather than returned, and
// the loop exits once the emitter's context is canceled.
func (a *AsyncEmitter) forward() {
	for {
		select {
		case event := <-a.eventsCh:
			if err := a.cfg.Inner.EmitAuditEvent(a.ctx, event); err != nil {
				log.WithError(err).Errorf("Failed to emit audit event.")
			}
		case <-a.ctx.Done():
			return
		}
	}
}

// EmitAuditEvent queues the audit event for background delivery without
// blocking the caller.
//
// If the buffer has room the event is queued and nil is returned. If the
// buffer is full the event is lost: the loss is logged and nil is still
// returned, so a slow auth service never fails the caller. An error may be
// returned only when the caller's ctx is already done.
func (a *AsyncEmitter) EmitAuditEvent(ctx context.Context, event AuditEvent) error {
	select {
	case <-ctx.Done():
		return trace.ConnectionProblem(ctx.Err(), "context canceled or closed")
	case a.eventsCh <- event:
		return nil
	default:
		// Neither case is ready: the buffer is full, so fall through and drop.
	}
	log.Errorf("Failed to emit audit event %v(%v). This server's connection to the auth service appears to be slow.", event.GetType(), event.GetCode())
	return nil
}

// CheckingEmitterConfig provides parameters for emitter
type CheckingEmitterConfig struct {
// Inner emits events to the underlying store
Expand Down Expand Up @@ -405,7 +486,6 @@ func (t *TeeStreamer) CreateAuditStream(ctx context.Context, sid session.ID) (St
return nil, trace.Wrap(err)
}
return &TeeStream{stream: stream, emitter: t.Emitter}, nil

}

// ResumeAuditStream resumes audit event stream
Expand Down
108 changes: 108 additions & 0 deletions lib/events/emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/teleport/lib/utils"

"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
)

// TestProtoStreamer tests edge cases of proto streamer implementation
Expand Down Expand Up @@ -129,3 +131,109 @@ func TestWriterEmitter(t *testing.T) {
assert.Contains(t, scanner.Text(), events[i].GetCode())
}
}

// TestAsyncEmitter exercises the async emitter: non-blocking submission,
// in-order delivery to the inner emitter, and cancellation on Close.
func TestAsyncEmitter(t *testing.T) {
	clock := clockwork.NewRealClock()
	events := GenerateTestSession(SessionParams{PrintEvents: 20})

	// Slow verifies that the async emitter does not block the caller
	// when the inner emitter is slow.
	t.Run("Slow", func(t *testing.T) {
		inner := &slowEmitter{clock: clock, timeout: time.Hour}
		emitter, err := NewAsyncEmitter(AsyncEmitterConfig{Inner: inner})
		assert.NoError(t, err)
		defer emitter.Close()

		ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
		defer cancel()

		assert.NoError(t, emitter.EmitAuditEvent(ctx, events[0]))
		assert.NoError(t, ctx.Err())
	})

	// Receive makes sure all events are received in the same order as they are sent.
	t.Run("Receive", func(t *testing.T) {
		sink := &channelEmitter{eventsCh: make(chan AuditEvent, len(events))}
		emitter, err := NewAsyncEmitter(AsyncEmitterConfig{Inner: sink})
		assert.NoError(t, err)
		defer emitter.Close()

		ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
		defer cancel()

		for i := range events {
			assert.NoError(t, emitter.EmitAuditEvent(ctx, events[i]))
		}

		for _, expected := range events {
			select {
			case received := <-sink.eventsCh:
				assert.Equal(t, expected, received)
			case <-time.After(time.Second):
				t.Fatalf("timeout")
			}
		}
	})

	// Close makes sure that Close cancels operations and the emitter's context.
	t.Run("Close", func(t *testing.T) {
		counter := &counterEmitter{}
		emitter, err := NewAsyncEmitter(AsyncEmitterConfig{
			BufferSize: len(events),
			Inner:      counter,
		})
		assert.NoError(t, err)

		ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
		defer cancel()

		for _, event := range events {
			go emitter.EmitAuditEvent(ctx, event)
		}

		// Close does not wait until all events have been submitted.
		emitter.Close()
		emitted := int(counter.count.Load())
		assert.True(t, emitted <= len(events))

		// Make sure the emitter's context is done to prevent context leaks.
		select {
		case <-emitter.ctx.Done():
		default:
			t.Fatalf("Context leak, should be closed")
		}
	})
}

// slowEmitter is a test emitter that delays every emit call.
type slowEmitter struct {
// clock supplies the timer that each emit call waits on.
clock clockwork.Clock
// timeout is how long each emit call waits before returning.
timeout time.Duration
}

// EmitAuditEvent simulates a slow backend. It returns nil once the
// configured timeout elapses on the emitter's clock, or ctx.Err() if ctx
// is done first.
//
// Honoring ctx lets the goroutine calling this method stop as soon as the
// context it passed in is canceled, instead of staying blocked for the
// full timeout.
func (s *slowEmitter) EmitAuditEvent(ctx context.Context, event AuditEvent) error {
	select {
	case <-s.clock.After(s.timeout):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// counterEmitter is a test emitter that only counts emitted events.
type counterEmitter struct {
// count is the number of EmitAuditEvent calls, updated atomically.
count atomic.Int64
}

// EmitAuditEvent increments the counter and discards the event.
// It ignores ctx and always returns nil.
func (c *counterEmitter) EmitAuditEvent(ctx context.Context, event AuditEvent) error {
c.count.Inc()
return nil
}

// channelEmitter is a test emitter that publishes every event to a channel.
type channelEmitter struct {
// eventsCh receives each emitted event; tests read from it to inspect them.
eventsCh chan AuditEvent
}

// EmitAuditEvent sends the event to eventsCh, blocking until the send
// succeeds or ctx is done. It returns ctx.Err() in the latter case and
// nil once the event has been sent.
func (c *channelEmitter) EmitAuditEvent(ctx context.Context, event AuditEvent) error {
	select {
	case c.eventsCh <- event:
	case <-ctx.Done():
		return ctx.Err()
	}
	return nil
}
12 changes: 9 additions & 3 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ type ForwarderConfig struct {
Auth auth.Authorizer
// Client is a proxy client
Client auth.ClientI
// StreamEmitter is used to create audit streams
// and emit audit events
StreamEmitter events.StreamEmitter
// DataDir is a data dir to store logs
DataDir string
// Namespace is a namespace of the proxy server (not a K8s namespace)
Expand Down Expand Up @@ -107,6 +110,9 @@ func (f *ForwarderConfig) CheckAndSetDefaults() error {
if f.Tunnel == nil {
return trace.BadParameter("missing parameter Tunnel")
}
if f.StreamEmitter == nil {
return trace.BadParameter("missing parameter StreamEmitter")
}
if f.ClusterName == "" {
return trace.BadParameter("missing parameter LocalCluster")
}
Expand Down Expand Up @@ -463,7 +469,7 @@ func (f *Forwarder) newStreamer(ctx *authContext) (events.Streamer, error) {
// TeeStreamer sends non-print and non disk events
// to the audit log in async mode, while buffering all
// events on disk for further upload at the end of the session
return events.NewTeeStreamer(fileStreamer, f.Client), nil
return events.NewTeeStreamer(fileStreamer, f.StreamEmitter), nil
}

// exec forwards all exec requests to the target server, captures
Expand Down Expand Up @@ -551,7 +557,7 @@ func (f *Forwarder) exec(ctx *authContext, w http.ResponseWriter, req *http.Requ
}
}
} else {
emitter = f.Client
emitter = f.StreamEmitter
}

sess, err := f.getOrCreateClusterSession(*ctx)
Expand Down Expand Up @@ -747,7 +753,7 @@ func (f *Forwarder) portForward(ctx *authContext, w http.ResponseWriter, req *ht
if !success {
portForward.Code = events.PortForwardFailureCode
}
if err := f.Client.EmitAuditEvent(f.Context, portForward); err != nil {
if err := f.StreamEmitter.EmitAuditEvent(f.Context, portForward); err != nil {
f.WithError(err).Warn("Failed to emit event.")
}
}
Expand Down
63 changes: 63 additions & 0 deletions lib/service/override.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright 2020 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package service

import (
"io"
"io/ioutil"
"os"
"strconv"

// import to call client v3 init section
_ "go.etcd.io/etcd/clientv3"
"google.golang.org/grpc/grpclog"
)

// setDebugGRPCLogger sets the GRPC logger to write to w and sets
// severity and verbosity using the env variables described at
// https://pkg.go.dev/google.golang.org/grpc/grpclog:
//
//   - GRPC_GO_LOG_SEVERITY_LEVEL selects the severity ("ERROR", "WARNING"
//     or "INFO", upper or lower case); unset means ERROR, and any other
//     value leaves all output discarded.
//   - GRPC_GO_LOG_VERBOSITY_LEVEL is an integer verbosity; a missing or
//     malformed value means 0.
//
// If w is nil, os.Stderr is used.
//
// GRPC client logger is pretty verbose, so only call this in debug mode.
//
// etcd client overrides GRPC logger to discard in init
// section. That's why setDebugGRPCLogger is called after etcd client's
// init call and sets it to writer, while taking GRPC standard
// environment variables into account.
func setDebugGRPCLogger(w io.Writer) {
	// Fall back to stderr so a nil writer cannot cause a panic on first write.
	if w == nil {
		w = os.Stderr
	}

	errorW := ioutil.Discard
	warningW := ioutil.Discard
	infoW := ioutil.Discard

	switch os.Getenv("GRPC_GO_LOG_SEVERITY_LEVEL") {
	case "", "ERROR", "error": // If env is unset, set level to ERROR.
		errorW = w
	case "WARNING", "warning":
		warningW = w
	case "INFO", "info":
		infoW = w
	}

	// A missing or non-numeric verbosity level leaves verbosity at 0.
	var v int
	if vl, err := strconv.Atoi(os.Getenv("GRPC_GO_LOG_VERBOSITY_LEVEL")); err == nil {
		v = vl
	}

	grpclog.SetLoggerV2(grpclog.NewLoggerV2WithVerbosity(infoW, warningW, errorW, v))
}

0 comments on commit 0082311

Please sign in to comment.