/
WebsocketConnection.java
298 lines (249 loc) · 10 KB
/
WebsocketConnection.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
package com.github.twitch4j.client.websocket;
import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState;
import com.github.twitch4j.common.util.ExponentialBackoffStrategy;
import com.neovisionaries.ws.client.WebSocket;
import com.neovisionaries.ws.client.WebSocketAdapter;
import com.neovisionaries.ws.client.WebSocketFactory;
import com.neovisionaries.ws.client.WebSocketFrame;
import lombok.Getter;
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@Slf4j
public class WebsocketConnection implements AutoCloseable {
/**
* connection configuration
*/
@Getter
protected final WebsocketConnectionConfig config;
/**
* holds the underlying webSocket
*/
private volatile WebSocket webSocket;
/**
* connection state
*/
private final AtomicReference<WebsocketConnectionState> connectionState = new AtomicReference<>(WebsocketConnectionState.DISCONNECTED);
/**
* Calls {@link ExponentialBackoffStrategy#reset()} upon a successful websocket connection
*/
private volatile Future<?> backoffClearer;
/**
* WebSocket Factory
*/
protected final WebSocketFactory webSocketFactory;
/**
* WebSocket Adapter
*/
protected final WebSocketAdapter webSocketAdapter;
/**
* Tracks the timestamp of the last outbound ping
*/
protected final AtomicLong lastPing = new AtomicLong();
@Getter
protected volatile long latency = -1L;
/**
* TwitchWebsocketConnection
*
* @param configSpec the websocket connection configuration
*/
public WebsocketConnection(Consumer<WebsocketConnectionConfig> configSpec) {
config = WebsocketConnectionConfig.process(configSpec);
// webSocketFactory and proxy configuration
this.webSocketFactory = new WebSocketFactory();
if (config.proxyConfig() != null) {
webSocketFactory.getProxySettings()
.setHost(config.proxyConfig().getHostname())
.setPort(config.proxyConfig().getPort())
.setId(config.proxyConfig().getUsername())
.setPassword(config.proxyConfig().getPassword() == null ? null : String.valueOf(config.proxyConfig().getPassword()));
}
// adapter
webSocketAdapter = new WebSocketAdapter() {
@Override
public void onConnected(WebSocket ws, Map<String, List<String>> headers) {
// hook: on connected
config.onConnected().run();
// Connection Success
setState(WebsocketConnectionState.CONNECTED);
backoffClearer = config.taskExecutor().schedule(() -> {
if (connectionState.get() == WebsocketConnectionState.CONNECTED)
config.backoffStrategy().reset();
}, 30, TimeUnit.SECONDS);
}
@Override
public void onTextMessage(WebSocket ws, String text) {
// hook: on text message
config.onTextMessage().accept(text);
}
@Override
public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) {
if (connectionState.get() != WebsocketConnectionState.DISCONNECTING) {
closeSocket(); // avoid possible resource leak
setState(WebsocketConnectionState.LOST);
log.info("Connection to WebSocket [{}] lost! Retrying soon ...", config.baseUrl());
// connection lost - reconnecting
if (backoffClearer != null) backoffClearer.cancel(false);
long reconnectDelay = config.backoffStrategy().get();
if (reconnectDelay < 0) {
log.debug("Maximum retry count for websocket reconnection attempts was hit.");
config.backoffStrategy().reset(); // start fresh on the next manual connect() call
} else {
config.taskExecutor().schedule(() -> {
WebsocketConnectionState state = connectionState.get();
if (state != WebsocketConnectionState.CONNECTING && state != WebsocketConnectionState.CONNECTED)
reconnect();
}, reconnectDelay, TimeUnit.MILLISECONDS);
}
} else {
setState(WebsocketConnectionState.DISCONNECTED);
log.info("Disconnected from WebSocket [{}]!", config.baseUrl());
}
}
@Override
public void onFrameSent(WebSocket websocket, WebSocketFrame frame) {
if (frame != null && frame.isPingFrame()) {
lastPing.compareAndSet(0L, System.currentTimeMillis());
}
}
@Override
public void onPongFrame(WebSocket websocket, WebSocketFrame frame) {
final long last = lastPing.getAndSet(0L);
if (last > 0) {
latency = System.currentTimeMillis() - last;
log.trace("T4J Websocket: Round-trip socket latency recorded at {} ms.", latency);
}
}
};
}
protected WebSocket createWebsocket() throws IOException {
WebSocket ws = webSocketFactory.createSocket(config.baseUrl());
ws.setPingInterval(config.wsPingPeriod());
if (config.headers() != null)
config.headers().forEach(ws::addHeader);
ws.clearListeners();
ws.addListener(webSocketAdapter);
return ws;
}
protected void setState(WebsocketConnectionState newState) {
WebsocketConnectionState oldState = connectionState.getAndSet(newState);
if (oldState != newState) {
config.onStateChanged().accept(oldState, newState);
}
}
/**
* Connect to the WebSocket
*/
@Synchronized
public void connect() {
WebsocketConnectionState connectionState = this.connectionState.get();
if (connectionState == WebsocketConnectionState.DISCONNECTED || connectionState == WebsocketConnectionState.RECONNECTING || connectionState == WebsocketConnectionState.LOST) {
try {
// avoid any resource leaks
this.closeSocket();
// hook: on pre connect
config.onPreConnect().run();
// Change Connection State
setState(WebsocketConnectionState.CONNECTING);
// init websocket
webSocket = createWebsocket();
// connect
this.webSocket.connect();
// hook: post connect
config.onPostConnect().run();
} catch (Exception ex) {
final long retryDelay = config.backoffStrategy().get();
if (retryDelay < 0) {
log.error("failed to connect to webSocket server {} and max retries were hit.", config.baseUrl(), ex);
config.backoffStrategy().reset(); // start fresh on the next manual connect() call
return;
}
log.error("connection to webSocket server {} failed: retrying ...", config.baseUrl(), ex);
// Sleep before trying to reconnect
try {
Thread.sleep(retryDelay);
} catch (Exception ignored) {
} finally {
// reconnect
reconnect();
}
}
}
}
/**
* Disconnect from the WebSocket
*/
@Synchronized
public void disconnect() {
WebsocketConnectionState connectionState = this.connectionState.get();
if (connectionState == WebsocketConnectionState.DISCONNECTED) {
// have already disconnected
return;
}
if (connectionState == WebsocketConnectionState.CONNECTED || connectionState == WebsocketConnectionState.LOST) {
// hook: disconnecting
config.onDisconnecting().run();
setState(WebsocketConnectionState.DISCONNECTING);
}
// hook: pre disconnect
config.onPreDisconnect().run();
// CleanUp
this.closeSocket();
// update state
setState(WebsocketConnectionState.DISCONNECTED);
// hook: post disconnect
config.onPostDisconnect().run();
}
/**
* Reconnecting to the WebSocket
*/
@Synchronized
public void reconnect() {
setState(WebsocketConnectionState.RECONNECTING);
disconnect();
connect();
}
/**
* sends a message to the websocket server
*
* @param message message content
*/
public boolean sendText(String message) {
// only send if state is CONNECTING or CONNECTED
WebsocketConnectionState connectionState = this.connectionState.get();
if (connectionState != WebsocketConnectionState.CONNECTED && connectionState != WebsocketConnectionState.CONNECTING) {
return false;
}
this.webSocket.sendText(message);
return true;
}
/**
* @return the socket's connection state
*/
public WebsocketConnectionState getConnectionState() {
return connectionState.get();
}
@Override
public void close() throws Exception {
disconnect();
}
@Synchronized
private void closeSocket() {
// Clean up the socket
if (webSocket != null) {
this.webSocket.disconnect();
this.webSocket.clearListeners();
this.webSocket = null;
}
// Reset latency tracker
this.latency = -1L;
lastPing.lazySet(0L);
}
}