Skip to content

Commit

Permalink
Merge pull request #280 from travisn/stable-etcd
Browse files Browse the repository at this point in the history
Improve stability of internal etcd at startup
  • Loading branch information
jbw976 committed Dec 12, 2016
2 parents 0f5e282 + 89abc5d commit 0fc5408
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 35 deletions.
12 changes: 4 additions & 8 deletions pkg/etcdmgr/bootstrap/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,15 @@ limitations under the License.
*/
package bootstrap

import (
"errors"
"fmt"
)
import "fmt"

// GetEtcdClients bootstraps an embedded etcd instance and returns a list of
// current etcd cluster's client URLs. (entrypoint, when it's used as a library)
func GetEtcdClients(configDir, token, ipAddr, nodeID string) ([]string, error) {

full, err, currentNodes := isQuorumFull(token)
//currentNodes, err := GetCurrentNodesFromDiscovery(token)
full, currentNodes, err := isQuorumFull(token)
if err != nil {
return []string{}, errors.New("error querying discovery service")
return nil, fmt.Errorf("error querying discovery service. %+v", err)
}
logger.Infof("current etcd cluster nodes: %+v", currentNodes)

Expand Down Expand Up @@ -58,7 +54,7 @@ func GetEtcdClients(configDir, token, ipAddr, nodeID string) ([]string, error) {
factory := EmbeddedEtcdFactory{}
ee, err := factory.NewEmbeddedEtcd(token, conf, true)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to start embedded etcd. %+v", err)
}

return ee.Server.Cluster().ClientURLs(), nil
Expand Down
8 changes: 7 additions & 1 deletion pkg/etcdmgr/bootstrap/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,13 @@ func (e *Context) KeysAPI() (client.KeysAPI, error) {
func (e *Context) Members() ([]string, types.URLsMap, error) {
urlsMap := types.URLsMap{}
var nodes []string
initialNodes, err := GetCurrentNodesFromDiscovery(e.ClusterToken)

size, err := getClusterSize(e.ClusterToken)
if err != nil {
return nodes, urlsMap, err
}

initialNodes, err := GetCurrentNodesFromDiscovery(e.ClusterToken, size)
if err != nil {
logger.Errorf("error in GetCurrentNodesFromDiscovery: %+v", err)
return nodes, urlsMap, err
Expand Down
127 changes: 102 additions & 25 deletions pkg/etcdmgr/bootstrap/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,49 +18,79 @@ package bootstrap
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"

"github.com/coreos/etcd/client"
"github.com/coreos/etcd/store"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
)

func isQuorumFull(token string) (bool, error, []string) {
res, err := queryDiscoveryService(token + "/_config/size")
const (
discoveryRetryAttempts = 10
)

func isQuorumFull(token string) (bool, []string, error) {
size, err := getClusterSize(token)
if err != nil {
return false, fmt.Errorf("cannot get discovery url cluster size: %v", err), []string{}
return false, nil, err
}
logger.Infof("cluster size is: %d", size)

size, _ := strconv.ParseInt(*res.Node.Value, 10, 16)
clusterSize := int(size)
logger.Infof("cluster max size is: %d", clusterSize)

currentNodes, _ := GetCurrentNodesFromDiscovery(token)
currentNodes, err := GetCurrentNodesFromDiscovery(token, size)
if err != nil {
return false, nil, fmt.Errorf("failed to get etcd members from discovery. %+v", err)
}
logger.Infof("currentNodes: %+v", currentNodes)
if len(currentNodes) < clusterSize {
return false, nil, []string{}
if len(currentNodes) < size {
return false, []string{}, nil
}
return true, nil, currentNodes
return true, currentNodes, nil

}

// queryDiscoveryService reads a key from a discovery url.
func queryDiscoveryService(token string) (*store.Event, error) {
ctx, _ := context.WithTimeout(context.Background(), DefaultClientTimeout)
resp, err := ctxhttp.Get(ctx, http.DefaultClient, token)
func getClusterSize(token string) (int, error) {
res, err := queryDiscoveryService(token + "/_config/size")
if err != nil {
return nil, err
return -1, fmt.Errorf("cannot get discovery url cluster size: %v", err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
return nil, fmt.Errorf("status code %d from %q: %s", resp.StatusCode, token, body)

size, err := strconv.ParseInt(*res.Node.Value, 10, 16)
if err != nil {
return -1, fmt.Errorf("failed to read cluster size. %+v", err)
}
return int(size), nil
}

// queryDiscoveryService reads a key from a discovery url.
func queryDiscoveryService(token string) (*store.Event, error) {
var resp *http.Response
var err error

// retry the http request in case of network errors
for i := 1; i <= discoveryRetryAttempts; i++ {
ctx, _ := context.WithTimeout(context.Background(), DefaultClientTimeout)
resp, err = ctxhttp.Get(ctx, http.DefaultClient, token)
if err == nil {
if resp.StatusCode == http.StatusOK {
break
}
resp.Body.Close()
}

logger.Warningf("failed to query discovery service on attempt %d/%d. code=%d, err=%+v", i, discoveryRetryAttempts, resp.StatusCode, err)

if i < discoveryRetryAttempts {
// delay an extra half second for each retry
delay := time.Duration(i) * 500 * time.Millisecond
<-time.After(delay)
}
}

defer resp.Body.Close()
var res store.Event
err = json.NewDecoder(resp.Body).Decode(&res)
if err != nil {
Expand All @@ -70,32 +100,79 @@ func queryDiscoveryService(token string) (*store.Event, error) {
return &res, nil
}

func GetCurrentNodesFromDiscovery(token string) ([]string, error) {
// Get the nodes that have registered with the discovery service.
// We only want to return the number of nodes that are expected for the etcd cluster size.
// Etcd will not allow more etcd servers to start unless the AddMember api is called.
// Here we will ignore any nodes that have registered beyond the expected size.
// It is important to return the nodes with the lowest index for consistent behavior in the cluster.
func GetCurrentNodesFromDiscovery(token string, size int) ([]string, error) {
res, err := queryDiscoveryService(token)
if err != nil {
return nil, err
}

nodes := make([]string, 0, len(res.Node.Nodes))
var upperIndex uint64
var ignored []string
nodeMap := map[uint64][]string{}
for _, nn := range res.Node.Nodes {
if nn.Value == nil {
logger.Debugf("Skipping %q because no value exists", nn.Key)
}

n, err := newDiscoveryNode(*nn.Value, DefaultClientPort)
endpoints, err := newDiscoveryNode(*nn.Value, DefaultClientPort)
if err != nil {
logger.Warningf("invalid peer url %q in discovery service: %v", *nn.Value, err)
continue
}

for _, node := range n {
upperIndex, ignored = addNodeToMap(nodeMap, endpoints, size, nn.CreatedIndex, upperIndex, ignored)
}

if len(ignored) > 0 {
logger.Infof("Ignored extra etcd members: %+v", ignored)
}

// create a flat slice from all the nodes' endpoints
var nodes []string
for _, endpoints := range nodeMap {
for _, node := range endpoints {
nodes = append(nodes, node)
}
}

return nodes, nil
}

func addNodeToMap(nodeMap map[uint64][]string, endpoints []string, size int, index, upperIndex uint64, ignored []string) (uint64, []string) {
if len(nodeMap) < size {
// add the node to the expected list since we haven't reached the expected size yet
nodeMap[index] = endpoints
if upperIndex < index {
upperIndex = index
}
} else {
// if the created index is lower than this index, replace the higher index with the lower
if upperIndex > index {
ignored = append(ignored, nodeMap[upperIndex]...)
delete(nodeMap, upperIndex)
nodeMap[index] = endpoints

// find the highest index
upperIndex = 0
for i := range nodeMap {
if upperIndex < i {
upperIndex = i
}
}
} else {
// ignore nodes that registered after the quorum was full and the index is over the max
ignored = append(ignored, endpoints...)
}
}

return upperIndex, ignored
}

type Machine struct {
client.Member
}
Expand Down
85 changes: 85 additions & 0 deletions pkg/etcdmgr/bootstrap/discovery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
Copyright 2016 The Rook Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bootstrap

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestSelectDiscoveredNodes(t *testing.T) {
var ignored []string
nodeMap := map[uint64][]string{}
upperIndex := uint64(0)
size := 1

// the first entry will be added because it's below size 1
index5 := uint64(5)
endpoints5 := []string{"http://10.1.1.5:5000"}
upperIndex, ignored = addNodeToMap(nodeMap, endpoints5, size, index5, upperIndex, ignored)
assert.Equal(t, index5, upperIndex)
assert.Equal(t, 1, len(nodeMap))
assert.Equal(t, endpoints5, nodeMap[index5])
assert.Equal(t, 0, len(ignored))

// the second entry is ignored since size is full and the index is higher
index6 := uint64(6)
endpoints6 := []string{"http://10.1.1.6:5000"}
upperIndex, ignored = addNodeToMap(nodeMap, endpoints6, size, index6, upperIndex, ignored)
assert.Equal(t, index5, upperIndex)
assert.Equal(t, 1, len(nodeMap))
assert.Equal(t, endpoints5, nodeMap[index5])
assert.Equal(t, 1, len(ignored))
assert.Equal(t, endpoints6[0], ignored[0])

// the third entry replaces the first since its index is lower
index4 := uint64(4)
endpoints4 := []string{"http://1.1.1.4:5000"}
upperIndex, ignored = addNodeToMap(nodeMap, endpoints4, size, index4, upperIndex, ignored)
assert.Equal(t, index4, upperIndex)
assert.Equal(t, 1, len(nodeMap))
assert.Equal(t, endpoints4, nodeMap[index4])
assert.Equal(t, 2, len(ignored))
assert.Equal(t, endpoints6[0], ignored[0])
assert.Equal(t, endpoints5[0], ignored[1])

// increase size to 3
ignored = []string{}
size = 3
upperIndex, ignored = addNodeToMap(nodeMap, endpoints6, size, index6, upperIndex, ignored)
upperIndex, ignored = addNodeToMap(nodeMap, endpoints5, size, index5, upperIndex, ignored)
assert.Equal(t, index6, upperIndex)
assert.Equal(t, 3, len(nodeMap))
assert.Equal(t, 0, len(ignored))
assert.Equal(t, index6, upperIndex)

// replace a node with a lower index
index3 := uint64(3)
endpoints3 := []string{"http://1.1.1.3:5000"}
upperIndex, ignored = addNodeToMap(nodeMap, endpoints3, size, index3, upperIndex, ignored)
assert.Equal(t, index5, upperIndex)
assert.Equal(t, 3, len(nodeMap))
assert.Equal(t, endpoints6[0], ignored[0])

// replace another node
index2 := uint64(2)
endpoints2 := []string{"http://1.1.1.2:5000"}
upperIndex, ignored = addNodeToMap(nodeMap, endpoints2, size, index2, upperIndex, ignored)
assert.Equal(t, index4, upperIndex)
assert.Equal(t, 3, len(nodeMap))
assert.Equal(t, 2, len(ignored))
}
2 changes: 1 addition & 1 deletion pkg/etcdmgr/bootstrap/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (e *EmbeddedEtcdFactory) NewEmbeddedEtcd(token string, conf *Config, newClu

instance.Server, err = etcdserver.NewServer(serverConfig)
if err != nil {
return nil, fmt.Errorf("error in creating etcd server. %+v", err)
return nil, err
}

instance.Server.Start()
Expand Down

0 comments on commit 0fc5408

Please sign in to comment.