Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable placement timeout with no peers #7418

Merged
merged 12 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 0 additions & 18 deletions pkg/actors/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
)

const (
defaultInitialDelay = time.Second * 1
defaultFailureThreshold = 2
defaultRequestTimeout = time.Second * 2
defaultHealthyStateInterval = time.Second * 3
Expand All @@ -39,7 +38,6 @@ var healthLogger = logger.NewLogger("actorshealth")
type options struct {
client *http.Client
address string
initialDelay time.Duration
requestTimeout time.Duration
failureThreshold int
healthyStateInterval time.Duration
Expand All @@ -59,7 +57,6 @@ type Checker struct {
ch chan bool

address string
initialDelay time.Duration
requestTimeout time.Duration
failureThreshold int
healthyStateInterval time.Duration
Expand All @@ -75,7 +72,6 @@ type Checker struct {
func New(opts ...Option) (*Checker, error) {
options := &options{
failureThreshold: defaultFailureThreshold,
initialDelay: defaultInitialDelay,
requestTimeout: defaultRequestTimeout,
successStatusCode: defaultSuccessStatusCode,
healthyStateInterval: defaultHealthyStateInterval,
Expand All @@ -99,7 +95,6 @@ func New(opts ...Option) (*Checker, error) {
ch: make(chan bool),
client: options.client,
address: options.address,
initialDelay: options.initialDelay,
requestTimeout: options.requestTimeout,
failureThreshold: options.failureThreshold,
healthyStateInterval: options.healthyStateInterval,
Expand All @@ -126,12 +121,6 @@ func (c *Checker) Run(ctx context.Context) {
cancel()
}()

select {
case <-ctx.Done():
return
case <-c.clock.After(c.initialDelay):
}

for {
c.doUnHealthyStateCheck(ctx)
c.doHealthyStateCheck(ctx)
Expand Down Expand Up @@ -248,13 +237,6 @@ func WithAddress(address string) Option {
}
}

// WithInitialDelay returns an Option that sets the initial delay of the
// health check to delay.
func WithInitialDelay(delay time.Duration) Option {
	setDelay := func(cfg *options) {
		cfg.initialDelay = delay
	}
	return setDelay
}

// WithFailureThreshold sets the failure threshold for the health check.
func WithFailureThreshold(threshold int) Option {
return func(o *options) {
Expand Down
30 changes: 5 additions & 25 deletions pkg/actors/health/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,13 @@ func TestApplyOptions(t *testing.T) {
require.NoError(t, err)

assert.Equal(t, defaultFailureThreshold, 2)
assert.Equal(t, defaultInitialDelay, time.Second)
assert.Equal(t, defaultHealthyStateInterval, time.Second*3)
assert.Equal(t, defaultUnHealthyStateInterval, time.Second/2)
assert.Equal(t, defaultRequestTimeout, time.Second*2)
assert.Equal(t, defaultSuccessStatusCode, 200)

assert.Equal(t, "http://localhost:8080", checker.address)
assert.Equal(t, defaultFailureThreshold, checker.failureThreshold)
assert.Equal(t, defaultInitialDelay, checker.initialDelay)
assert.Equal(t, defaultHealthyStateInterval, checker.healthyStateInterval)
assert.Equal(t, defaultUnHealthyStateInterval, checker.unhealthyStateInterval)
assert.Equal(t, defaultRequestTimeout, checker.requestTimeout)
Expand All @@ -59,7 +57,6 @@ func TestApplyOptions(t *testing.T) {
checker, err := New(
WithAddress("http://localhost:8081"),
WithFailureThreshold(10),
WithInitialDelay(time.Second*11),
WithHealthyStateInterval(time.Second*12),
WithUnHealthyStateInterval(time.Second*15),
WithRequestTimeout(time.Second*13),
Expand All @@ -69,7 +66,6 @@ func TestApplyOptions(t *testing.T) {

assert.Equal(t, "http://localhost:8081", checker.address)
assert.Equal(t, 10, checker.failureThreshold)
assert.Equal(t, time.Second*11, checker.initialDelay)
assert.Equal(t, time.Second*12, checker.healthyStateInterval)
assert.Equal(t, time.Second*15, checker.unhealthyStateInterval)
assert.Equal(t, time.Second*13, checker.requestTimeout)
Expand All @@ -78,27 +74,12 @@ func TestApplyOptions(t *testing.T) {
}

func TestResponses(t *testing.T) {
t.Run("before initial interval should have no signal", func(t *testing.T) {
ch, clock, ts := testChecker(t,
WithInitialDelay(time.Second*2),
)

ts.statusCode.Store(200)
assert.Eventually(t, clock.HasWaiters, time.Second, time.Millisecond*10)
clock.Step(time.Second)
assertNoHealthSignal(t, clock, ch)

assert.Equal(t, int64(0), ts.numberOfCalls.Load())
})

t.Run("default success status", func(t *testing.T) {
ch, clock, ts := testChecker(t,
WithInitialDelay(0),
200,
WithFailureThreshold(2),
)

ts.statusCode.Store(200)

clock.Step(1)
assert.True(t, assertHealthSignal(t, clock, ch))

Expand All @@ -119,12 +100,11 @@ func TestResponses(t *testing.T) {

t.Run("custom success status", func(t *testing.T) {
ch, clock, ts := testChecker(t,
WithInitialDelay(0),
201,
WithFailureThreshold(1),
WithSuccessStatusCode(201),
)

ts.statusCode.Store(201)
clock.Step(1)
assert.True(t, assertHealthSignal(t, clock, ch))
assert.Equal(t, int64(1), ts.numberOfCalls.Load())
Expand All @@ -138,11 +118,10 @@ func TestResponses(t *testing.T) {

t.Run("test app recovery", func(t *testing.T) {
ch, clock, ts := testChecker(t,
WithInitialDelay(0),
200,
WithFailureThreshold(1),
)

ts.statusCode.Store(200)
clock.Step(1)
assert.True(t, assertHealthSignal(t, clock, ch))
assert.Equal(t, int64(1), ts.numberOfCalls.Load())
Expand All @@ -161,8 +140,9 @@ func TestResponses(t *testing.T) {
})
}

func testChecker(t *testing.T, opts ...Option) (<-chan bool, *clocktesting.FakeClock, *testServer) {
func testChecker(t *testing.T, initialCode int64, opts ...Option) (<-chan bool, *clocktesting.FakeClock, *testServer) {
ts := new(testServer)
ts.statusCode.Store(initialCode)
server := httptest.NewServer(ts)
t.Cleanup(server.Close)

Expand Down
38 changes: 27 additions & 11 deletions pkg/placement/raft/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,17 +212,33 @@
// Setup Raft configuration.
if config == nil {
// Set default configuration for raft
s.config = &raft.Config{
ProtocolVersion: raft.ProtocolVersionMax,
HeartbeatTimeout: 1000 * time.Millisecond,
ElectionTimeout: 1000 * time.Millisecond,
CommitTimeout: 50 * time.Millisecond,
MaxAppendEntries: 64,
ShutdownOnRemove: true,
TrailingLogs: 10240,
SnapshotInterval: 120 * time.Second,
SnapshotThreshold: 8192,
LeaderLeaseTimeout: 500 * time.Millisecond,

if len(s.peers) == 1 {
s.config = &raft.Config{
ProtocolVersion: raft.ProtocolVersionMax,
HeartbeatTimeout: 5 * time.Millisecond,
ElectionTimeout: 5 * time.Millisecond,
CommitTimeout: 5 * time.Millisecond,
MaxAppendEntries: 64,
ShutdownOnRemove: true,
TrailingLogs: 10240,
SnapshotInterval: 120 * time.Second,
SnapshotThreshold: 8192,
LeaderLeaseTimeout: 5 * time.Millisecond,

Check warning on line 227 in pkg/placement/raft/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/placement/raft/server.go#L216-L227

Added lines #L216 - L227 were not covered by tests
}
} else {
s.config = &raft.Config{
ProtocolVersion: raft.ProtocolVersionMax,
HeartbeatTimeout: 1000 * time.Millisecond,
ElectionTimeout: 1000 * time.Millisecond,
CommitTimeout: 50 * time.Millisecond,
MaxAppendEntries: 64,
ShutdownOnRemove: true,
TrailingLogs: 10240,
SnapshotInterval: 120 * time.Second,
SnapshotThreshold: 8192,
LeaderLeaseTimeout: 500 * time.Millisecond,

Check warning on line 240 in pkg/placement/raft/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/placement/raft/server.go#L229-L240

Added lines #L229 - L240 were not covered by tests
}
}
} else {
s.config = config
Expand Down