-
Notifications
You must be signed in to change notification settings - Fork 214
/
protocol_lifecycle.go
143 lines (121 loc) · 3.22 KB
/
protocol_lifecycle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/
package http
import (
"context"
"fmt"
"net"
"net/http"
"strings"
"github.com/cloudevents/sdk-go/v2/protocol"
)
// Compile-time assertion that *Protocol satisfies the protocol.Opener interface.
var _ protocol.Opener = (*Protocol)(nil)
// OpenInbound registers the protocol on its handler, starts an HTTP server on
// the protocol's listener and blocks until either the server stops serving or
// ctx is done. p.reMu is held for the whole call.
//
// When ctx is done the server is shut down with a deadline of
// p.ShutdownTimeout and the resulting shutdown and/or serve error is returned.
// When the server stops by itself, its error is returned wrapped.
func (p *Protocol) OpenInbound(ctx context.Context) error {
	p.reMu.Lock()
	defer p.reMu.Unlock()

	// Fall back to a fresh mux when the user did not supply a handler.
	if p.Handler == nil {
		p.Handler = http.NewServeMux()
	}

	// Register only once; Handle might panic if the user already registered
	// the same path as the sdk.
	if !p.handlerRegistered {
		p.Handler.Handle(p.GetPath(), p)
		p.handlerRegistered = true
	}

	// listen hands back the stored listener, creating it first if needed.
	ln, err := p.listen()
	if err != nil {
		return err
	}

	p.server = &http.Server{
		Addr:         ln.Addr().String(),
		Handler:      attachMiddleware(p.Handler, p.middleware),
		ReadTimeout:  DefaultTimeout,
		WriteTimeout: DefaultTimeout,
	}

	// Whichever way we leave, close the server and drop the reference.
	defer func() {
		_ = p.server.Close()
		p.server = nil
	}()

	// Serve in the background and hand its result back over the channel.
	served := make(chan error)
	go func() {
		served <- p.server.Serve(ln)
	}()

	// Block until the server returns on its own or ctx is done.
	select {
	case serveErr := <-served:
		if serveErr == nil {
			return nil
		}
		return fmt.Errorf("during runtime of HTTP server: %w", serveErr)
	case <-ctx.Done():
		// Continue with the graceful shutdown below.
	}

	// ctx is already done, so the shutdown deadline is derived from a fresh
	// background context instead.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), p.ShutdownTimeout)
	defer cancel()

	var shutdownErr error
	if e := p.server.Shutdown(shutdownCtx); e != nil {
		shutdownErr = fmt.Errorf("shutting down HTTP server: %w", e)
	}

	// Wait for the serving goroutine to exit before returning.
	serveErr := <-served
	if serveErr == nil || serveErr == http.ErrServerClosed {
		// The server stopped as expected; only a shutdown error can remain.
		return shutdownErr
	}

	serveErr = fmt.Errorf("server failed during shutdown: %w", serveErr)
	if shutdownErr == nil {
		return serveErr
	}
	return fmt.Errorf("combined error during shutdown of HTTP server: %w, %v",
		shutdownErr, serveErr)
}
// GetListeningPort returns the port of the stored listener.
// It returns -1 when no listener has been stored or when the listener's
// address is not a TCP address.
func (p *Protocol) GetListeningPort() int {
	stored := p.listener.Load()
	if stored == nil {
		return -1
	}

	tcpAddr, isTCP := stored.(net.Listener).Addr().(*net.TCPAddr)
	if !isTCP {
		return -1
	}
	return tcpAddr.Port
}
// listen returns the listener stored on the protocol, creating and storing a
// new TCP listener first if none exists yet. A Port of -1 selects the default
// port 8080; any other value must lie within 0-65535, otherwise an error is
// returned and nothing is stored.
func (p *Protocol) listen() (net.Listener, error) {
	// Already listening: reuse the stored listener.
	if existing := p.listener.Load(); existing != nil {
		return existing.(net.Listener), nil
	}

	port := p.Port
	switch {
	case port == -1:
		port = 8080
	case port < 0 || port > 65535:
		return nil, fmt.Errorf("invalid port %d", port)
	}

	ln, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		return nil, err
	}

	p.listener.Store(ln)
	return ln, nil
}
// GetPath returns the path the transport is hosted on: p.Path with surrounding
// whitespace trimmed, or "/" when that leaves nothing. If the path is '/',
// the transport will handle requests on any URI. To discover the true path
// a request was received on, inspect the context from Receive(ctx, ...) with
// TransportContextFrom(ctx).
func (p *Protocol) GetPath() string {
	if trimmed := strings.TrimSpace(p.Path); trimmed != "" {
		return trimmed
	}
	// Nothing configured: fall back to the default.
	return "/"
}
// attachMiddleware wraps h with every entry of middleware and returns the
// resulting handler. Entries are applied in slice order, so each one wraps the
// result of those before it and the last entry ends up outermost. With no
// middleware, h is returned unchanged.
func attachMiddleware(h http.Handler, middleware []Middleware) http.Handler {
	wrapped := h
	for i := 0; i < len(middleware); i++ {
		wrapped = middleware[i](wrapped)
	}
	return wrapped
}