-
Notifications
You must be signed in to change notification settings - Fork 214
/
protocol_retry.go
145 lines (121 loc) · 3.2 KB
/
protocol_retry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/
package http
import (
"bytes"
"context"
"errors"
"io"
"io/ioutil"
"net/http"
"net/url"
"time"
"go.uber.org/zap"
"github.com/cloudevents/sdk-go/v2/binding"
cecontext "github.com/cloudevents/sdk-go/v2/context"
"github.com/cloudevents/sdk-go/v2/protocol"
)
// do sends req, choosing between a single attempt and a retrying send based
// on the retry parameters carried by ctx.
//
// When the configured strategy is one of the constant, linear or exponential
// backoff strategies the request goes through doWithRetry; for
// BackoffStrategyNone and any other value it is sent exactly once via doOnce.
func (p *Protocol) do(ctx context.Context, req *http.Request) (binding.Message, error) {
	retryParams := cecontext.RetriesFrom(ctx)

	switch retryParams.Strategy {
	case cecontext.BackoffStrategyConstant,
		cecontext.BackoffStrategyLinear,
		cecontext.BackoffStrategyExponential:
		return p.doWithRetry(ctx, retryParams, req)
	}

	// BackoffStrategyNone and unrecognised strategies: no retries.
	return p.doOnce(req)
}
// doOnce performs a single round trip of req through p.Client.
//
// If the client returns an error, the message is nil and the result is a
// negative receipt wrapping that error. Otherwise the response header and body
// are wrapped in a message, and the result carries the HTTP status code around
// ResultACK for 2xx responses or ResultNACK for everything else.
func (p *Protocol) doOnce(req *http.Request) (binding.Message, protocol.Result) {
	resp, err := p.Client.Do(req)
	if err != nil {
		return nil, protocol.NewReceipt(false, "%w", err)
	}

	var outcome protocol.Result
	switch {
	case resp.StatusCode >= 200 && resp.StatusCode < 300:
		outcome = protocol.ResultACK
	default:
		outcome = protocol.ResultNACK
	}

	msg := NewMessage(resp.Header, resp.Body)
	return msg, NewResult(resp.StatusCode, "%w", outcome)
}
// doWithRetry sends req and keeps re-sending it, waiting params.Backoff
// between attempts, until one of the following happens:
//
//   - an attempt is acknowledged (protocol.IsACK),
//   - an attempt yields an HTTP result whose status code p.isRetriableFunc
//     rejects, or
//   - params.Backoff returns an error.
//
// Failures that unwrap to a *url.Error, and failures that are neither a
// *url.Error nor an HTTP *Result, are always retried.
//
// The request body, if any, is read fully into memory up front so that each
// attempt can send a fresh copy. If that read fails, no attempt is made and a
// negative result wrapping the read error is returned with a nil message.
//
// The returned message belongs to the last attempt (nil if that attempt
// produced no response). The returned error is built by NewRetriesResult from
// the last result, the number of retries performed, the start time and the
// results of the earlier attempts.
func (p *Protocol) doWithRetry(ctx context.Context, params *cecontext.RetryParams, req *http.Request) (binding.Message, error) {
	then := time.Now()
	retry := 0
	results := make([]protocol.Result, 0)

	var body []byte
	if req != nil && req.Body != nil {
		original := req.Body
		buffered, readErr := ioutil.ReadAll(original)

		// Close the caller's body right away: resetBody swaps req.Body for an
		// in-memory reader, so nothing downstream would ever close the
		// original one.
		if closeErr := original.Close(); closeErr != nil {
			cecontext.LoggerFrom(ctx).Warnw("could not close request body", zap.Error(closeErr))
		}
		if readErr != nil {
			// Report the failure to the caller instead of panicking.
			return nil, NewRetriesResult(protocol.NewReceipt(false, "%w", readErr), retry, then, results)
		}

		body = buffered
		resetBody(req, body)
	}

	for {
		msg, result := p.doOnce(req)

		// Fast track common case.
		if protocol.IsACK(result) {
			return msg, NewRetriesResult(result, retry, then, results)
		}

		// Decide whether the failure is worth retrying. A *url.Error is always
		// retried; an HTTP result is retried only when its status code is
		// considered retriable. Anything else is retried as well.
		var (
			uErr       *url.Error
			httpResult *Result
		)
		if !errors.As(result, &uErr) && errors.As(result, &httpResult) {
			if sc := httpResult.StatusCode; !p.isRetriableFunc(sc) {
				// Permanent error
				cecontext.LoggerFrom(ctx).Debugw("status code not retryable, will not try again",
					zap.Error(httpResult),
					zap.Int("statusCode", sc))
				return msg, NewRetriesResult(result, retry, then, results)
			}
		}

		// Wait for the correct amount of backoff time.
		// total tries = retry + 1
		if err := params.Backoff(ctx, retry+1); err != nil {
			// do not try again.
			cecontext.LoggerFrom(ctx).Debugw("backoff error, will not try again", zap.Error(err))
			return msg, NewRetriesResult(result, retry, then, results)
		}

		// We are going to try again, so the failed attempt's response is no
		// longer needed. Finish it to release the underlying response body;
		// the error is deliberately ignored because the message is discarded.
		if msg != nil {
			_ = msg.Finish(nil)
		}

		resetBody(req, body)
		retry++
		results = append(results, result)
	}
}
// resetBody points req.Body at a fresh in-memory reader over body so the
// request can be sent again, e.g. when retrying HTTP requests.
//
// It does nothing when req is nil or req has no body. When req.GetBody is not
// set it is installed as well, returning a new reader over body on every call;
// an existing GetBody function is left untouched.
func resetBody(req *http.Request, body []byte) {
	if req == nil || req.Body == nil {
		return
	}

	// Every call hands out an independent reader positioned at the start.
	freshReader := func() io.ReadCloser {
		return ioutil.NopCloser(bytes.NewReader(body))
	}

	req.Body = freshReader()

	// do not modify existing GetBody function
	if req.GetBody != nil {
		return
	}
	req.GetBody = func() (io.ReadCloser, error) {
		return freshReader(), nil
	}
}