Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation for server enforcement of keepalive policy. #1147

Merged
merged 5 commits into from
Mar 31, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 17 additions & 6 deletions keepalive/keepalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,29 +41,40 @@ import (
// ClientParameters is used to set keepalive parameters on the client-side.
// These configure how the client will actively probe to notice when a connection is broken
// and to cause activity so intermediaries are aware the connection is still in use.
// Make sure these parameters are set in coordination with the keepalive policy on the server,
// as incompatible settings can result in closing of connection.
type ClientParameters struct {
// After a duration of this time if the client doesn't see any activity it pings the server to see if the transport is still alive.
Time time.Duration // The current default value is infinity.
// After having pinged for keepalive check, the client waits for a duration of Timeout and if no activity is seen even after that
// the connection is closed.
Timeout time.Duration // The current default value is 20 seconds.
// If true, client runs keepalive checks even with no active RPCs.
PermitWithoutStream bool
PermitWithoutStream bool // false by default.
}

// ServerParameters is used to set keepalive and max-age parameters on the server-side.
type ServerParameters struct {
// MaxConnectionIdle is a duration for the amount of time after which an idle connection would be closed by sending a GoAway.
// Idleness duration is defined since the most recent time the number of outstanding RPCs became zero or the connection establishment.
MaxConnectionIdle time.Duration
MaxConnectionIdle time.Duration // The current default value is infinity.
// MaxConnectionAge is a duration for the maximum amount of time a connection may exist before it will be closed by sending a GoAway.
// A random jitter of +/-10% will be added to MaxConnectionAge to spread out connection storms.
MaxConnectionAge time.Duration
MaxConnectionAge time.Duration // The current default value is infinity.
// MaxConnectionAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed.
MaxConnectionAgeGrace time.Duration
MaxConnectionAgeGrace time.Duration // The current default value is infinity.
// After a duration of this time if the server doesn't see any activity it pings the client to see if the transport is still alive.
Time time.Duration
Time time.Duration // The current default value is 2 hours.
// After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that
// the connection is closed.
Timeout time.Duration
Timeout time.Duration // The current default value is 20 seconds.
}

// EnforcementPolicy is used to set keepalive enforcement policy on the server-side.
// The server counts each keepalive ping that arrives sooner than the policy
// allows and closes the connection to a client that violates it repeatedly.
type EnforcementPolicy struct {
// MinTime is the minimum amount of time a client should wait before sending a keepalive ping.
// A zero value is replaced by the default when the server transport is created.
MinTime time.Duration // The current default value is 5 minutes.
// If true, server expects keepalive pings even when there are no active streams (RPCs).
// If false, pings received while no streams are active are checked against a
// fixed internal interval instead of MinTime.
PermitWithoutStream bool // false by default.
}
9 changes: 9 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ type options struct {
useHandlerImpl bool // use http.Handler-based server
unknownStreamDesc *StreamDesc
keepaliveParams keepalive.ServerParameters
keepalivePolicy keepalive.EnforcementPolicy
}

var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
Expand All @@ -133,6 +134,13 @@ func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
}
}

// KeepaliveEnforcementPolicy returns a ServerOption that installs kep as the
// keepalive enforcement policy for the server.
func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
	// The option simply records the policy on the server's option set.
	setPolicy := func(opts *options) {
		opts.keepalivePolicy = kep
	}
	return setPolicy
}

// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
func CustomCodec(codec Codec) ServerOption {
return func(o *options) {
Expand Down Expand Up @@ -479,6 +487,7 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo)
InTapHandle: s.opts.inTapHandle,
StatsHandler: s.opts.statsHandler,
KeepaliveParams: s.opts.keepaliveParams,
KeepalivePolicy: s.opts.keepalivePolicy,
}
st, err := transport.NewServerTransport("http2", c, config)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions transport/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
defaultMaxConnectionAgeGrace = infinity
defaultServerKeepaliveTime = time.Duration(2 * time.Hour)
defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
)

// The following defines various control items which could flow through
Expand Down Expand Up @@ -84,6 +85,8 @@ type resetStream struct {
func (*resetStream) item() {}

// goAway is a control item asking the transport to write an HTTP/2 GOAWAY
// frame to the peer.
type goAway struct {
// code is the HTTP/2 error code carried by the GOAWAY frame.
code http2.ErrCode
// debugData is the optional debug payload written with the frame; it may be nil.
debugData []byte
}

// item marks goAway as a control item.
func (*goAway) item() {}
Expand Down
3 changes: 3 additions & 0 deletions transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,9 @@ func (t *http2Client) handlePing(f *http2.PingFrame) {
}

func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
grpclog.Printf("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
}
t.mu.Lock()
if t.state == reachable || t.state == draining {
if f.LastStreamID > 0 && f.LastStreamID%2 != 1 {
Expand Down
76 changes: 73 additions & 3 deletions transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ type http2Server struct {
// Keepalive and max-age parameters for the server.
kp keepalive.ServerParameters

// Keepalive enforcement policy.
kep keepalive.EnforcementPolicy
// The time instance last ping was received.
lastPingAt time.Time
// Number of times the client has violated keepalive ping policy so far.
pingStrikes uint8
// Flag to signify that number of ping strikes should be reset to 0.
// This is set whenever data or header frames are sent.
// 1 means yes.
resetPingStrikes uint32 // Accessed atomically.

mu sync.Mutex // guard the following
state transportState
activeStreams map[uint32]*Stream
Expand Down Expand Up @@ -161,6 +172,10 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
if kp.Timeout == 0 {
kp.Timeout = defaultServerKeepaliveTimeout
}
kep := config.KeepalivePolicy
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
}
var buf bytes.Buffer
t := &http2Server{
ctx: context.Background(),
Expand All @@ -184,6 +199,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
stats: config.StatsHandler,
kp: kp,
idle: time.Now(),
kep: kep,
}
if t.stats != nil {
t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
Expand Down Expand Up @@ -504,13 +520,50 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
t.controlBuf.put(&settings{ack: true, ss: ss})
}

const (
// maxPingStrikes is the number of keepalive policy violations tolerated from
// a client; once the strike count exceeds it, handlePing queues a GOAWAY with
// http2.ErrCodeEnhanceYourCalm.
maxPingStrikes = 2
// defaultPingTimeout is the minimum spacing handlePing requires between pings
// when there are no active streams and the enforcement policy's
// PermitWithoutStream is false.
defaultPingTimeout = 2 * time.Hour
)

// handlePing answers a PING frame from the client and checks it against the
// server's keepalive enforcement policy.
//
// Ping acks are ignored. Every other ping is acknowledged first and then
// compared with the time of the previous ping: if it arrived sooner than the
// applicable minimum interval, the client earns a strike. Once the strikes
// exceed maxPingStrikes, a GOAWAY with http2.ErrCodeEnhanceYourCalm and the
// debug data "too_many_pings" is queued on the control buffer.
func (t *http2Server) handlePing(f *http2.PingFrame) {
	if f.IsAck() {
		// An ack needs no reply and is not subject to the policy.
		return
	}
	ack := &ping{ack: true}
	copy(ack.data[:], f.Data[:])
	t.controlBuf.put(ack)

	receivedAt := time.Now()
	// Whichever path is taken below, this ping becomes the reference point
	// for judging the next one.
	defer func() { t.lastPingAt = receivedAt }()

	// The reset flag is raised when data or header frames have been sent.
	// In that case this ping is not checked against the policy and the
	// strike counter starts over.
	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
		t.pingStrikes = 0
		return
	}

	t.mu.Lock()
	streams := len(t.activeStreams)
	t.mu.Unlock()

	// Pick the interval the client was required to wait. With no active
	// streams and PermitWithoutStream unset, keepalive should not be running
	// at all, so the fixed defaultPingTimeout applies; otherwise the
	// configured policy minimum does.
	minInterval := t.kep.MinTime
	if streams < 1 && !t.kep.PermitWithoutStream {
		minInterval = defaultPingTimeout
	}
	if t.lastPingAt.Add(minInterval).After(receivedAt) {
		t.pingStrikes++
	}

	if t.pingStrikes <= maxPingStrikes {
		return
	}
	// Too many violations: tell the client to back off via GOAWAY.
	t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings")})
}

func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
Expand All @@ -529,6 +582,13 @@ func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) e
first := true
endHeaders := false
var err error
defer func() {
if err == nil {
// Reset ping strikes when sending headers since that might cause the
// peer to send ping.
atomic.StoreUint32(&t.resetPingStrikes, 1)
}
}()
// Sends the headers in a single batch.
for !endHeaders {
size := t.hBuf.Len()
Expand Down Expand Up @@ -672,7 +732,7 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s

// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
// is returned if it fails (e.g., framing error, transport error).
func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
// TODO(zhaoq): Support multi-writers for a single stream.
var writeHeaderFrame bool
s.mu.Lock()
Expand All @@ -687,6 +747,13 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
if writeHeaderFrame {
t.WriteHeader(s, nil)
}
defer func() {
if err == nil {
// Reset ping strikes when sending data since this might cause
// the peer to send ping.
atomic.StoreUint32(&t.resetPingStrikes, 1)
}
}()
r := bytes.NewBuffer(data)
for {
if r.Len() == 0 {
Expand Down Expand Up @@ -892,7 +959,10 @@ func (t *http2Server) controller() {
sid := t.maxStreamID
t.state = draining
t.mu.Unlock()
t.framer.writeGoAway(true, sid, http2.ErrCodeNo, nil)
t.framer.writeGoAway(true, sid, i.code, i.debugData)
if i.code == http2.ErrCodeEnhanceYourCalm {
t.Close()
}
case *flushIO:
t.framer.flushWrite()
case *ping:
Expand Down Expand Up @@ -972,7 +1042,7 @@ func (t *http2Server) RemoteAddr() net.Addr {
}

func (t *http2Server) Drain() {
t.controlBuf.put(&goAway{})
t.controlBuf.put(&goAway{code: http2.ErrCodeNo})
}

var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
Expand Down
1 change: 1 addition & 0 deletions transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ type ServerConfig struct {
InTapHandle tap.ServerInHandle
StatsHandler stats.Handler
KeepaliveParams keepalive.ServerParameters
KeepalivePolicy keepalive.EnforcementPolicy
}

// NewServerTransport creates a ServerTransport with conn or non-nil error
Expand Down