Skip to content

Commit

Permalink
Merge pull request #119557 from p0lyn0mial/upstream-watch-list-etcd-s…
Browse files Browse the repository at this point in the history
…tore

Add support for API streaming to the etcd store implementation
  • Loading branch information
k8s-ci-robot committed Sep 25, 2023
2 parents c6bb90a + ca562fd commit fcfc2d0
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 32 deletions.
11 changes: 11 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ type event struct {
isDeleted bool
isCreated bool
isProgressNotify bool
// isInitialEventsEndBookmark helps us keep track
// of whether we have sent an annotated bookmark event.
//
// when this variable is set to true,
// a special annotation will be added
// to the bookmark event.
//
// note that we decided to extend the event
// struct with this field to eliminate contention
// between startWatching and processEvent
isInitialEventsEndBookmark bool
}

// parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event.
Expand Down
20 changes: 5 additions & 15 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/storage"
Expand Down Expand Up @@ -112,12 +111,11 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
pathPrefix += "/"
}

// TODO(p0lyn0mial): pass newListFunc and resourcePrefix to the watcher
w := &watcher{
client: c,
codec: codec,
groupResource: groupResource,
newFunc: newFunc,
groupResource: groupResource,
versioner: versioner,
transformer: transformer,
}
Expand All @@ -126,7 +124,6 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
} else {
w.objectType = reflect.TypeOf(newFunc()).String()
}

s := &store{
client: c,
codec: codec,
Expand All @@ -139,6 +136,10 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
watcher: w,
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
}

w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) {
return storage.GetCurrentResourceVersionFromStorage(ctx, s, newListFunc, resourcePrefix, w.objectType)
}
return s
}

Expand Down Expand Up @@ -855,18 +856,7 @@ func growSlice(v reflect.Value, maxCapacity int, sizes ...int) {
}

// Watch implements storage.Interface.Watch.
// TODO(#115478): In order to graduate the WatchList feature to beta, the etcd3 implementation must/should also support it.
func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
// it is safe to skip SendInitialEvents if the request is backward compatible
// see https://github.com/kubernetes/kubernetes/blob/267eb25e60955fe8e438c6311412e7cf7d028acb/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go#L260
compatibility := opts.Predicate.AllowWatchBookmarks == false && (opts.ResourceVersion == "" || opts.ResourceVersion == "0")
if opts.SendInitialEvents != nil && !compatibility {
return nil, apierrors.NewInvalid(
schema.GroupKind{Group: s.groupResource.Group, Kind: s.groupResource.Resource},
"",
field.ErrorList{field.Forbidden(field.NewPath("sendInitialEvents"), "for watch is unsupported by an etcd cluster")},
)
}
preparedKey, err := s.prepareKey(key)
if err != nil {
return nil, err
Expand Down
130 changes: 114 additions & 16 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,21 @@ import (
"sync"
"time"

clientv3 "go.etcd.io/etcd/client/v3"
grpccodes "google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"

clientv3 "go.etcd.io/etcd/client/v3"

"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -67,13 +68,14 @@ func TestOnlySetFatalOnDecodeError(b bool) {
}

type watcher struct {
client *clientv3.Client
codec runtime.Codec
newFunc func() runtime.Object
objectType string
groupResource schema.GroupResource
versioner storage.Versioner
transformer value.Transformer
client *clientv3.Client
codec runtime.Codec
newFunc func() runtime.Object
objectType string
groupResource schema.GroupResource
versioner storage.Versioner
transformer value.Transformer
getCurrentStorageRV func(context.Context) (uint64, error)
}

// watchChan implements watch.Interface.
Expand Down Expand Up @@ -105,8 +107,12 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, opts storage
if opts.ProgressNotify && w.newFunc == nil {
return nil, apierrors.NewInternalError(errors.New("progressNotify for watch is unsupported by the etcd storage because no newFunc was provided"))
}
wc := w.createWatchChan(ctx, key, rev, opts.Recursive, opts.ProgressNotify, opts.Predicate)
go wc.run()
startWatchRV, err := w.getStartWatchResourceVersion(ctx, rev, opts)
if err != nil {
return nil, err
}
wc := w.createWatchChan(ctx, key, startWatchRV, opts.Recursive, opts.ProgressNotify, opts.Predicate)
go wc.run(isInitialEventsEndBookmarkRequired(opts), areInitialEventsRequired(rev, opts))

// For etcd watch we don't have an easy way to answer whether the watch
// has already caught up. So in the initial version (given that watchcache
Expand Down Expand Up @@ -138,6 +144,62 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
return wc
}

// getStartWatchResourceVersion picks the ResourceVersion the etcd watch
// is started from.
//
// The returned value has one of the following meanings:
//   - Exact: a positive resourceVersion is returned unchanged.
//   - Zero: returned when the WatchList feature gate is off, or when
//     opts.SendInitialEvents is unset or true (the initial events are then
//     served from a consistent LIST followed by the special bookmark event).
//   - Most Recent: when opts.SendInitialEvents is explicitly false, the
//     current RV is fetched via w.getCurrentStorageRV.
//
// Any error from w.getCurrentStorageRV is returned as-is with a zero RV.
func (w *watcher) getStartWatchResourceVersion(ctx context.Context, resourceVersion int64, opts storage.ListOptions) (int64, error) {
	switch {
	case resourceVersion > 0:
		// the caller asked for an exact starting point
		return resourceVersion, nil
	case !utilfeature.DefaultFeatureGate.Enabled(features.WatchList):
		return 0, nil
	case opts.SendInitialEvents == nil || *opts.SendInitialEvents:
		// with opts.SendInitialEvents=true a consistent LIST request
		// is issued against etcd, followed by the special bookmark event
		return 0, nil
	}

	// the client is only interested in a stream of events
	// starting at the MostRecent point in time (RV)
	latestRV, err := w.getCurrentStorageRV(ctx)
	if err != nil {
		return 0, err
	}
	// the RV is taken from resp.Header.Revision (int64) and cast to uint64,
	// so converting it back is safe; unifying the interface would
	// require changing Versioner.UpdateList
	return int64(latestRV), nil
}

// isInitialEventsEndBookmarkRequired reports whether the special bookmark
// event must be sent once the initial list call has completed.
//
// opts.ProgressNotify cannot be set directly from the API and the etcd3
// implementation doesn't support notifications for external clients, so the
// decision is based solely on the request options: the WatchList feature
// gate must be enabled, opts.SendInitialEvents must be explicitly true and
// the predicate must allow watch bookmarks.
//
// see: https://github.com/kubernetes/kubernetes/issues/120348
func isInitialEventsEndBookmarkRequired(opts storage.ListOptions) bool {
	watchListEnabled := utilfeature.DefaultFeatureGate.Enabled(features.WatchList)
	if !watchListEnabled || opts.SendInitialEvents == nil {
		return false
	}
	return *opts.SendInitialEvents && opts.Predicate.AllowWatchBookmarks
}

// areInitialEventsRequired reports whether all events from etcd
// should be returned to the client before the watch is started.
//
// It is true in the legacy case (opts.SendInitialEvents unset and
// resourceVersion equal to 0). Otherwise it is true only when the WatchList
// feature gate is enabled and opts.SendInitialEvents is explicitly true.
func areInitialEventsRequired(resourceVersion int64, opts storage.ListOptions) bool {
	sendInitialEvents := opts.SendInitialEvents
	if sendInitialEvents == nil && resourceVersion == 0 {
		// legacy case
		return true
	}
	if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
		return false
	}
	if sendInitialEvents == nil {
		return false
	}
	return *sendInitialEvents
}

type etcdError interface {
Code() grpccodes.Code
Error() string
Expand All @@ -163,9 +225,9 @@ func isCancelError(err error) bool {
return false
}

func (wc *watchChan) run() {
func (wc *watchChan) run(initialEventsEndBookmarkRequired, forceInitialEvents bool) {
watchClosedCh := make(chan struct{})
go wc.startWatching(watchClosedCh)
go wc.startWatching(watchClosedCh, initialEventsEndBookmarkRequired, forceInitialEvents)

var resultChanWG sync.WaitGroup
resultChanWG.Add(1)
Expand Down Expand Up @@ -284,14 +346,44 @@ func logWatchChannelErr(err error) {
// startWatching does:
// - get current objects if initialRev=0; set initialRev to current rev
// - watch on given key and send events to process.
func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
if wc.initialRev == 0 {
//
// initialEventsEndBookmarkRequired tells us
// whether an annotated bookmark event must be sent.
//
// it's important to note that we don't
// need to track the actual RV because
// we only send the bookmark event
// after the initial list call.
//
// when this variable is set to false,
// it means we don't have any specific
// preferences for delivering bookmark events.
func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEndBookmarkRequired, forceInitialEvents bool) {
if wc.initialRev > 0 && forceInitialEvents {
currentStorageRV, err := wc.watcher.getCurrentStorageRV(wc.ctx)
if err != nil {
wc.sendError(err)
return
}
if uint64(wc.initialRev) > currentStorageRV {
wc.sendError(storage.NewTooLargeResourceVersionError(uint64(wc.initialRev), currentStorageRV, int(wait.Jitter(1*time.Second, 3).Seconds())))
return
}
}
if forceInitialEvents {
if err := wc.sync(); err != nil {
klog.Errorf("failed to sync with latest state: %v", err)
wc.sendError(err)
return
}
}
if initialEventsEndBookmarkRequired {
wc.sendEvent(func() *event {
e := progressNotifyEvent(wc.initialRev)
e.isInitialEventsEndBookmark = true
return e
}())
}
opts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1), clientv3.WithPrevKV()}
if wc.recursive {
opts = append(opts, clientv3.WithPrefix())
Expand Down Expand Up @@ -388,6 +480,12 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
klog.Errorf("failed to propagate object version: %v", err)
return nil
}
if e.isInitialEventsEndBookmark {
if err := storage.AnnotateInitialEventsEndBookmark(object); err != nil {
wc.sendError(fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %v", wc.watcher.groupResource, wc.watcher.objectType, object, err))
return nil
}
}
res = &watch.Event{
Type: watch.Bookmark,
Object: object,
Expand Down
62 changes: 61 additions & 1 deletion staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
clientv3 "go.etcd.io/etcd/client/v3"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
"k8s.io/apiserver/pkg/features"
Expand All @@ -35,6 +39,7 @@ import (
storagetesting "k8s.io/apiserver/pkg/storage/testing"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/ptr"
)

func TestWatch(t *testing.T) {
Expand Down Expand Up @@ -123,6 +128,16 @@ func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store)
}

// TestEtcdWatchSemantics runs the shared storagetesting.RunWatchSemantics
// suite against the store returned by testSetup.
func TestEtcdWatchSemantics(t *testing.T) {
	testCtx, etcdStore, _ := testSetup(t)

	storagetesting.RunWatchSemantics(testCtx, t, etcdStore)
}

// TestEtcdWatchSemanticInitialEventsExtended runs the shared
// storagetesting.RunWatchSemanticInitialEventsExtended suite against
// the store returned by testSetup.
func TestEtcdWatchSemanticInitialEventsExtended(t *testing.T) {
	testCtx, etcdStore, _ := testSetup(t)

	storagetesting.RunWatchSemanticInitialEventsExtended(testCtx, t, etcdStore)
}

// =======================================================================
// Implementation-specific tests are following.
// The following tests are exercising the details of the implementation
Expand All @@ -145,7 +160,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
w.run()
w.run(false, true)
wg.Done()
}()
w.errChan <- fmt.Errorf("some error")
Expand Down Expand Up @@ -194,6 +209,51 @@ func TestWatchErrorIncorrectConfiguration(t *testing.T) {
}
}

// TestTooLargeResourceVersionErrorForWatchList verifies that, with the
// WatchList feature gate enabled, a watch that requests initial events
// starting at resource version 102 delivers a watch.Error event whose
// status matches storage.NewTooLargeResourceVersionError and carries a
// positive RetryAfterSeconds.
func TestTooLargeResourceVersionErrorForWatchList(t *testing.T) {
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
	origCtx, store, _ := testSetup(t)
	ctx, cancel := context.WithCancel(origCtx)
	defer cancel()
	requestOpts := storage.ListOptions{
		SendInitialEvents: ptr.To(true),
		Recursive:         true,
		Predicate: storage.SelectionPredicate{
			Field:               fields.Everything(),
			Label:               labels.Everything(),
			AllowWatchBookmarks: true,
		},
	}
	var expectedErr *apierrors.StatusError
	if !errors.As(storage.NewTooLargeResourceVersionError(uint64(102), 1, 0), &expectedErr) {
		t.Fatalf("Unable to convert NewTooLargeResourceVersionError to apierrors.StatusError")
	}

	w, err := store.watcher.Watch(ctx, "/abc", int64(102), requestOpts)
	if err != nil {
		t.Fatal(err)
	}
	defer w.Stop()

	actualEvent := <-w.ResultChan()
	if actualEvent.Type != watch.Error {
		t.Fatalf("Unexpected type of the event: %v, expected: %v", actualEvent.Type, watch.Error)
	}
	// error events carry a *metav1.Status payload, so report that type
	// (not *apierrors.StatusError) when the assertion fails
	actualErr, ok := actualEvent.Object.(*metav1.Status)
	if !ok {
		t.Fatalf("Expected *metav1.Status, got: %#v", actualEvent.Object)
	}

	// guard against a nil Details so a malformed status fails the test
	// with a readable message instead of a nil pointer panic
	if actualErr.Details == nil {
		t.Fatalf("Expected Details to be set, got: %#v", actualErr)
	}
	if actualErr.Details.RetryAfterSeconds <= 0 {
		t.Fatalf("RetryAfterSeconds must be > 0, actual value: %v", actualErr.Details.RetryAfterSeconds)
	}
	// rewrite the Details as it contains retry seconds
	// and validate the whole struct
	expectedErr.ErrStatus.Details = actualErr.Details
	if diff := cmp.Diff(*actualErr, expectedErr.ErrStatus); diff != "" {
		t.Fatalf("Unexpected error returned, diff: %v", diff)
	}
}

func TestWatchChanSync(t *testing.T) {
testCases := []struct {
name string
Expand Down

0 comments on commit fcfc2d0

Please sign in to comment.