From 4b1dbb02f5cca2e9c0f0b672f795670c0e585d7a Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Thu, 14 Mar 2024 00:49:22 +0200 Subject: [PATCH 01/10] implement heartbeat for websockets --- ...nfiguration__tests__schema_generation.snap | 42 +++++++++++++++ apollo-router/src/plugins/subscription.rs | 28 +++++++++- apollo-router/src/protocols/websocket.rs | 51 ++++++++++++++++--- .../src/services/subgraph_service.rs | 10 ++++ .../subscription-support.mdx | 4 +- 5 files changed, 127 insertions(+), 8 deletions(-) diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index 2aab96b824..3b67a48346 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -2464,6 +2464,27 @@ expression: "&schema" "default": null, "type": "object", "properties": { + "heartbeat_interval": { + "description": "Heartbeat interval for graphql-ws protocol (default: disabled)", + "default": "disabled", + "anyOf": [ + { + "type": "string", + "enum": [ + "disabled" + ] + }, + { + "type": "string", + "enum": [ + "enabled" + ] + }, + { + "type": "string" + } + ] + }, "path": { "description": "Path on which WebSockets are listening", "default": null, @@ -2491,6 +2512,27 @@ expression: "&schema" "description": "WebSocket configuration for a specific subgraph", "type": "object", "properties": { + "heartbeat_interval": { + "description": "Heartbeat interval for graphql-ws protocol (default: disabled)", + "default": "disabled", + "anyOf": [ + { + "type": "string", + "enum": [ + "disabled" + ] + }, + { + "type": "string", + "enum": [ + "enabled" + ] + }, + { + "type": "string" + } + ] + }, "path": { "description": "Path on which WebSockets are listening", "default": null, diff --git a/apollo-router/src/plugins/subscription.rs b/apollo-router/src/plugins/subscription.rs index 335e3e770b..9d1ed3aef3 100644 --- a/apollo-router/src/plugins/subscription.rs +++ b/apollo-router/src/plugins/subscription.rs @@ -43,6 +43,7 @@ use crate::Endpoint; use crate::ListenAddr; type HmacSha256 = Hmac; +pub(crate) const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); pub(crate) const APOLLO_SUBSCRIPTION_PLUGIN: &str = "apollo.subscription"; #[cfg(not(test))] pub(crate) const APOLLO_SUBSCRIPTION_PLUGIN_NAME: &str = "subscription"; @@ -185,7 +186,30 @@ pub(crate) enum Disabled { impl Default for HeartbeatInterval { fn default() -> Self { - Self::Duration(Duration::from_secs(5)) + Self::Duration(DEFAULT_HEARTBEAT_INTERVAL) + } +} + +// Differs from HeartbeatInterval since it's disable by default +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] +#[serde(rename_all = "snake_case", untagged)] +pub(crate) enum HeartbeatIntervalWs { + Disabled(Disabled), + Enabled(Enabled), + #[serde(with = "humantime_serde")] + #[schemars(with = "String")] + Duration(Duration), +} + +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] +#[serde(rename_all = "snake_case")] +pub(crate) enum Enabled { + Enabled, +} + +impl Default for HeartbeatIntervalWs { + fn default() -> Self { + Self::Disabled(Disabled::Disabled) } } @@ -205,6 +229,8 @@ pub(crate) struct WebSocketConfiguration { pub(crate) path: Option, /// Which WebSocket GraphQL protocol to use for this subgraph possible values are: 'graphql_ws' | 'graphql_transport_ws' (default: graphql_ws) pub(crate) protocol: WebSocketProtocol, + /// Heartbeat interval for graphql-ws protocol (default: disabled) + pub(crate) heartbeat_interval: HeartbeatIntervalWs, } fn default_path() -> String { diff --git a/apollo-router/src/protocols/websocket.rs b/apollo-router/src/protocols/websocket.rs index 4dc8663b9f..4349671590 100644 --- a/apollo-router/src/protocols/websocket.rs +++ b/apollo-router/src/protocols/websocket.rs @@ -219,6 +219,7 @@ pub(crate) struct GraphqlWebSocket { stream: S, id: String, protocol: WebSocketProtocol, + heartbeat_interval: Option, // Booleans for state machine when closing the stream completed: bool, terminated: bool, @@ -234,6 +235,7 @@ where id: String, protocol: WebSocketProtocol, connection_params: Option, + heartbeat_interval: Option, ) -> Result { let connection_init_msg = match connection_params { Some(connection_params) => ClientMessage::ConnectionInit { @@ -285,10 +287,21 @@ where .build()); } + let heartbeat_interval = if protocol == WebSocketProtocol::GraphqlWs { + heartbeat_interval.map(|duration| { + let mut interval = tokio::time::interval(duration); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + interval + }) + } else { + None + }; + Ok(Self { stream, id, protocol, + heartbeat_interval, completed: false, terminated: false, }) @@ -313,12 +326,12 @@ where stream .with(|client_message: ClientMessage| { // It applies to the Sink - future::ready(match serde_json::to_string(&client_message) { + future::ready(match serde_json::to_string(&dbg!(client_message)) { Ok(client_message_str) => Ok(Message::Text(client_message_str)), Err(err) => Err(Error::SerdeError(err)), }) }) - .map(move |msg| match msg { + .map(move |msg| match dbg!(msg) { // It applies to the Stream Ok(Message::Text(text)) => serde_json::from_str(&text), Ok(Message::Binary(bin)) => serde_json::from_slice(&bin), @@ -372,11 +385,15 @@ where cx: &mut std::task::Context<'_>, ) -> Poll> { let mut this = self.as_mut().project(); + let mut stream = Pin::new(&mut this.stream); - match Pin::new(&mut this.stream).poll_next(cx) { + match stream.as_mut().poll_next(cx) { Poll::Ready(message) => match message { Some(server_message) => match server_message { Ok(server_message) => { + if let Some(heartbeat_interval) = this.heartbeat_interval { + heartbeat_interval.reset(); + } if let Some(id) = &server_message.id() { if this.id != id { tracing::error!("we should not receive data from other subscriptions, closing the stream"); @@ -386,8 +403,7 @@ where if let ServerMessage::Ping { .. } = server_message { // Send pong asynchronously let _ = Pin::new( - &mut Pin::new(&mut this.stream) - .send(ClientMessage::Pong { payload: None }), + &mut stream.as_mut().send(ClientMessage::Pong { payload: None }), ) .poll(cx); } @@ -414,11 +430,32 @@ where }, None => Poll::Ready(None), }, - Poll::Pending => Poll::Pending, + Poll::Pending => { + if let Some(heartbeat_interval) = this.heartbeat_interval { + match heartbeat_interval.poll_tick(cx) { + Poll::Ready(_) => send_heartbeat(this.stream, cx), + Poll::Pending => (), + }; + } + Poll::Pending + } } } } +fn send_heartbeat(mut stream: Pin<&mut impl Sink>, cx: &mut std::task::Context<'_>) { + match stream.as_mut().poll_flush(cx) { + Poll::Ready(Ok(_)) => match stream.as_mut().poll_ready(cx) { + Poll::Ready(Ok(_)) => { + // Ignore error + let _ = stream.start_send(ClientMessage::Ping { payload: None }); + } + _ => (), + }, + _ => (), + } +} + impl Sink for GraphqlWebSocket where S: Stream> + Sink, @@ -800,6 +837,7 @@ mod tests { Some(serde_json_bytes::json!({ "token": "XXX" })), + None, ) .await .unwrap(); @@ -865,6 +903,7 @@ mod tests { sub_uuid.to_string(), WebSocketProtocol::SubscriptionsTransportWs, None, + None, ) .await .unwrap(); diff --git a/apollo-router/src/services/subgraph_service.rs b/apollo-router/src/services/subgraph_service.rs index 394eaf4135..b4d85ef5a7 100644 --- a/apollo-router/src/services/subgraph_service.rs +++ b/apollo-router/src/services/subgraph_service.rs @@ -45,6 +45,8 @@ use crate::graphql; use crate::json_ext::Object; use crate::plugins::authentication::subgraph::SigningParamsConfig; use crate::plugins::file_uploads; +use crate::plugins::subscription::DEFAULT_HEARTBEAT_INTERVAL; +use crate::plugins::subscription::HeartbeatIntervalWs; use crate::plugins::subscription::create_verifier; use crate::plugins::subscription::CallbackMode; use crate::plugins::subscription::HeartbeatInterval; @@ -574,11 +576,18 @@ async fn call_websocket( ); } + let heartbeat_interval = match subgraph_cfg.heartbeat_interval { + HeartbeatIntervalWs::Disabled(_) => None, + HeartbeatIntervalWs::Enabled(_) => Some(DEFAULT_HEARTBEAT_INTERVAL), + HeartbeatIntervalWs::Duration(duration) => Some(duration), + }; + let mut gql_stream = GraphqlWebSocket::new( convert_websocket_stream(ws_stream, subscription_hash.clone()), subscription_hash, subgraph_cfg.protocol, connection_params, + heartbeat_interval, ) .await .map_err(|_| FetchError::SubrequestWsError { @@ -1648,6 +1657,7 @@ mod tests { WebSocketConfiguration { path: Some(String::from("/ws")), protocol: WebSocketProtocol::default(), + heartbeat_interval: HeartbeatIntervalWs::default(), }, )] .into(), diff --git a/docs/source/executing-operations/subscription-support.mdx b/docs/source/executing-operations/subscription-support.mdx index 48c2fae617..3bcfb5b08c 100644 --- a/docs/source/executing-operations/subscription-support.mdx +++ b/docs/source/executing-operations/subscription-support.mdx @@ -101,9 +101,11 @@ subscription: reviews: # Overrides settings for the 'reviews' subgraph path: /ws # Absolute path that overrides '/subscriptions' defined above protocol: graphql_ws # The WebSocket-based subprotocol to use for subscription communication (Default: graphql_ws) + heartbeat_interval: enable # Optional (Default: disabled) ``` This example enables subscriptions in **passthrough mode**, which uses long-lived WebSocket connections. +Note: If your subgraph implementation (e.g. [DGS](https://netflix.github.io/dgs/)) can close idle connection, set `heartbest_interval` to keep connection alive. The router supports the following WebSocket subprotocols, specified via the `protocol` option: @@ -160,7 +162,7 @@ subscription: - accounts ``` -You can disable the heartbeat by setting `heartbeat_interval_ms: disabled`. This is useful for example if you're running in callback mode in an infrastructure based on lambda functions, where you prefer neither to send heartbeats nor to keep a lambda awake just to send heartbeats to subscriptions. +You can disable the heartbeat by setting `heartbeat_interval: disabled`. This is useful for example if you're running in callback mode in an infrastructure based on lambda functions, where you prefer neither to send heartbeats nor to keep a lambda awake just to send heartbeats to subscriptions. From 0b82e0c76b1c69330f6e3e5fb1ddf8fd9572bfc0 Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Thu, 14 Mar 2024 01:28:41 +0200 Subject: [PATCH 02/10] Unify into single HeartbeatInterval type --- ...nfiguration__tests__schema_generation.snap | 11 ++- apollo-router/src/plugins/subscription.rs | 79 +++++++++---------- .../src/services/subgraph_service.rs | 27 ++----- 3 files changed, 54 insertions(+), 63 deletions(-) diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index 3b67a48346..d3c897cd2f 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -2404,7 +2404,7 @@ expression: "&schema" "properties": { "heartbeat_interval": { "description": "Heartbeat interval for callback mode (default: 5secs)", - "default": "5s", + "default": "enabled", "anyOf": [ { "type": "string", @@ -2412,6 +2412,13 @@ expression: "&schema" "disabled" ] }, + { + "description": "enable with default interval of 5s", + "type": "string", + "enum": [ + "enabled" + ] + }, { "type": "string" } @@ -2475,6 +2482,7 @@ expression: "&schema" ] }, { + "description": "enable with default interval of 5s", "type": "string", "enum": [ "enabled" @@ -2523,6 +2531,7 @@ expression: "&schema" ] }, { + "description": "enable with default interval of 5s", "type": "string", "enum": [ "enabled" diff --git a/apollo-router/src/plugins/subscription.rs b/apollo-router/src/plugins/subscription.rs index 9d1ed3aef3..0d62372e83 100644 --- a/apollo-router/src/plugins/subscription.rs +++ b/apollo-router/src/plugins/subscription.rs @@ -43,7 +43,6 @@ use crate::Endpoint; use crate::ListenAddr; type HmacSha256 = Hmac; -pub(crate) const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); pub(crate) const APOLLO_SUBSCRIPTION_PLUGIN: &str = "apollo.subscription"; #[cfg(not(test))] pub(crate) const APOLLO_SUBSCRIPTION_PLUGIN_NAME: &str = "subscription"; @@ -152,7 +151,7 @@ pub(crate) struct CallbackMode { pub(crate) public_url: url::Url, /// Heartbeat interval for callback mode (default: 5secs) - #[serde(default = "HeartbeatInterval::default")] + #[serde(default = "HeartbeatInterval::new_enabled")] pub(crate) heartbeat_interval: HeartbeatInterval, // `skip_serializing` We don't need it in the context /// Listen address on which the callback must listen (default: 127.0.0.1:4000) @@ -173,32 +172,33 @@ pub(crate) struct CallbackMode { #[serde(rename_all = "snake_case", untagged)] pub(crate) enum HeartbeatInterval { Disabled(Disabled), + /// enable with default interval of 5s + Enabled(Enabled), #[serde(with = "humantime_serde")] #[schemars(with = "String")] Duration(Duration), } -#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] -#[serde(rename_all = "snake_case")] -pub(crate) enum Disabled { - Disabled, -} - -impl Default for HeartbeatInterval { - fn default() -> Self { - Self::Duration(DEFAULT_HEARTBEAT_INTERVAL) +impl HeartbeatInterval { + pub(crate) fn new_enabled() -> Self { + Self::Enabled(Enabled::Enabled) + } + pub(crate) fn new_disabled() -> Self { + Self::Disabled(Disabled::Disabled) + } + pub(crate) fn into_option(&self) -> Option { + match self { + Self::Disabled(_) => None, + Self::Enabled(_) => Some(Duration::from_secs(5)), + Self::Duration(duration) => Some(*duration), + } } } -// Differs from HeartbeatInterval since it's disable by default #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] -#[serde(rename_all = "snake_case", untagged)] -pub(crate) enum HeartbeatIntervalWs { - Disabled(Disabled), - Enabled(Enabled), - #[serde(with = "humantime_serde")] - #[schemars(with = "String")] - Duration(Duration), +#[serde(rename_all = "snake_case")] +pub(crate) enum Disabled { + Disabled, } #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] @@ -207,12 +207,6 @@ pub(crate) enum Enabled { Enabled, } -impl Default for HeartbeatIntervalWs { - fn default() -> Self { - Self::Disabled(Disabled::Disabled) - } -} - /// Using websocket to directly connect to subgraph #[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize, JsonSchema)] #[serde(deny_unknown_fields, default)] @@ -221,16 +215,19 @@ pub(crate) struct PassthroughMode { subgraph: SubgraphPassthroughMode, } -#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize, JsonSchema)] -#[serde(deny_unknown_fields, default)] +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] +#[serde(deny_unknown_fields)] /// WebSocket configuration for a specific subgraph pub(crate) struct WebSocketConfiguration { /// Path on which WebSockets are listening + #[serde(default)] pub(crate) path: Option, /// Which WebSocket GraphQL protocol to use for this subgraph possible values are: 'graphql_ws' | 'graphql_transport_ws' (default: graphql_ws) + #[serde(default)] pub(crate) protocol: WebSocketProtocol, /// Heartbeat interval for graphql-ws protocol (default: disabled) - pub(crate) heartbeat_interval: HeartbeatIntervalWs, + #[serde(default = "HeartbeatInterval::new_disabled")] + pub(crate) heartbeat_interval: HeartbeatInterval, } fn default_path() -> String { @@ -254,21 +251,17 @@ impl Plugin for Subscription { .clone(), ); #[cfg(not(test))] - match init - .config - .mode - .callback - .as_ref() - .expect("we checked in the condition the callback conf") - .heartbeat_interval - { - HeartbeatInterval::Duration(duration) => { - init.notify.set_ttl(Some(duration)).await?; - } - HeartbeatInterval::Disabled(_) => { - init.notify.set_ttl(None).await?; - } - } + init.notify + .set_ttl( + init.config + .mode + .callback + .as_ref() + .expect("we checked in the condition the callback conf") + .heartbeat_interval + .into_option(), + ) + .await?; } Ok(Subscription { diff --git a/apollo-router/src/services/subgraph_service.rs b/apollo-router/src/services/subgraph_service.rs index b4d85ef5a7..3f9dfe09f1 100644 --- a/apollo-router/src/services/subgraph_service.rs +++ b/apollo-router/src/services/subgraph_service.rs @@ -45,11 +45,8 @@ use crate::graphql; use crate::json_ext::Object; use crate::plugins::authentication::subgraph::SigningParamsConfig; use crate::plugins::file_uploads; -use crate::plugins::subscription::DEFAULT_HEARTBEAT_INTERVAL; -use crate::plugins::subscription::HeartbeatIntervalWs; use crate::plugins::subscription::create_verifier; use crate::plugins::subscription::CallbackMode; -use crate::plugins::subscription::HeartbeatInterval; use crate::plugins::subscription::SubscriptionConfig; use crate::plugins::subscription::SubscriptionMode; use crate::plugins::subscription::WebSocketConfiguration; @@ -332,12 +329,10 @@ impl tower::Service for SubgraphService { subscription_id, callback_url, verifier, - heartbeat_interval_ms: match heartbeat_interval { - HeartbeatInterval::Disabled(_) => 0, - HeartbeatInterval::Duration(duration) => { - duration.as_millis() as u64 - } - }, + heartbeat_interval_ms: heartbeat_interval + .into_option() + .map(|duration| duration.as_millis() as u64) + .unwrap_or(0), }; body.extensions.insert( "subscription", @@ -576,18 +571,12 @@ async fn call_websocket( ); } - let heartbeat_interval = match subgraph_cfg.heartbeat_interval { - HeartbeatIntervalWs::Disabled(_) => None, - HeartbeatIntervalWs::Enabled(_) => Some(DEFAULT_HEARTBEAT_INTERVAL), - HeartbeatIntervalWs::Duration(duration) => Some(duration), - }; - let mut gql_stream = GraphqlWebSocket::new( convert_websocket_stream(ws_stream, subscription_hash.clone()), subscription_hash, subgraph_cfg.protocol, connection_params, - heartbeat_interval, + subgraph_cfg.heartbeat_interval.into_option(), ) .await .map_err(|_| FetchError::SubrequestWsError { @@ -1063,7 +1052,7 @@ mod tests { use crate::graphql::Error; use crate::graphql::Request; use crate::graphql::Response; - use crate::plugins::subscription::Disabled; + use crate::plugins::subscription::HeartbeatInterval; use crate::plugins::subscription::SubgraphPassthroughMode; use crate::plugins::subscription::SubscriptionModeConfig; use crate::plugins::subscription::SUBSCRIPTION_CALLBACK_HMAC_KEY; @@ -1648,7 +1637,7 @@ mod tests { listen: None, path: Some("/testcallback".to_string()), subgraphs: vec![String::from("testbis")].into_iter().collect(), - heartbeat_interval: HeartbeatInterval::Disabled(Disabled::Disabled), + heartbeat_interval: HeartbeatInterval::new_disabled(), }), passthrough: Some(SubgraphPassthroughMode { all: None, @@ -1657,7 +1646,7 @@ mod tests { WebSocketConfiguration { path: Some(String::from("/ws")), protocol: WebSocketProtocol::default(), - heartbeat_interval: HeartbeatIntervalWs::default(), + heartbeat_interval: HeartbeatInterval::new_disabled(), }, )] .into(), From 1947d3f5d735d33c845841febe74a8cb7de655dc Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Thu, 14 Mar 2024 04:29:17 +0200 Subject: [PATCH 03/10] remove dbg + add unique payload to ping --- apollo-router/src/protocols/websocket.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/apollo-router/src/protocols/websocket.rs b/apollo-router/src/protocols/websocket.rs index 4349671590..f1bf3533e2 100644 --- a/apollo-router/src/protocols/websocket.rs +++ b/apollo-router/src/protocols/websocket.rs @@ -289,7 +289,8 @@ where let heartbeat_interval = if protocol == WebSocketProtocol::GraphqlWs { heartbeat_interval.map(|duration| { - let mut interval = tokio::time::interval(duration); + let mut interval = + tokio::time::interval_at(tokio::time::Instant::now() + duration, duration); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); interval }) @@ -326,12 +327,12 @@ where stream .with(|client_message: ClientMessage| { // It applies to the Sink - future::ready(match serde_json::to_string(&dbg!(client_message)) { + future::ready(match serde_json::to_string(&client_message) { Ok(client_message_str) => Ok(Message::Text(client_message_str)), Err(err) => Err(Error::SerdeError(err)), }) }) - .map(move |msg| match dbg!(msg) { + .map(move |msg| match msg { // It applies to the Stream Ok(Message::Text(text)) => serde_json::from_str(&text), Ok(Message::Binary(bin)) => serde_json::from_slice(&bin), @@ -448,7 +449,11 @@ fn send_heartbeat(mut stream: Pin<&mut impl Sink>, cx: &mut std:: Poll::Ready(Ok(_)) => match stream.as_mut().poll_ready(cx) { Poll::Ready(Ok(_)) => { // Ignore error - let _ = stream.start_send(ClientMessage::Ping { payload: None }); + let _ = stream.start_send(ClientMessage::Ping { + payload: Some(serde_json_bytes::Value::String( + "APOLLO_ROUTER_HEARTBEAT".into() + )) + }); } _ => (), }, From efcb2bfb10d242ab34cca5598e03df73eeb2d6b8 Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Thu, 14 Mar 2024 04:59:53 +0200 Subject: [PATCH 04/10] fix lint errors --- apollo-router/src/plugins/subscription.rs | 12 ++++++------ apollo-router/src/protocols/websocket.rs | 22 +++++++++------------- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/apollo-router/src/plugins/subscription.rs b/apollo-router/src/plugins/subscription.rs index 0d62372e83..3c3ae74f90 100644 --- a/apollo-router/src/plugins/subscription.rs +++ b/apollo-router/src/plugins/subscription.rs @@ -112,7 +112,7 @@ impl SubscriptionModeConfig { if callback_cfg.subgraphs.contains(service_name) || callback_cfg.subgraphs.is_empty() { let callback_cfg = CallbackMode { public_url: callback_cfg.public_url.clone(), - heartbeat_interval: callback_cfg.heartbeat_interval.clone(), + heartbeat_interval: callback_cfg.heartbeat_interval, listen: callback_cfg.listen.clone(), path: callback_cfg.path.clone(), subgraphs: HashSet::new(), // We don't need it @@ -168,7 +168,7 @@ pub(crate) struct CallbackMode { pub(crate) subgraphs: HashSet, } -#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] #[serde(rename_all = "snake_case", untagged)] pub(crate) enum HeartbeatInterval { Disabled(Disabled), @@ -186,22 +186,22 @@ impl HeartbeatInterval { pub(crate) fn new_disabled() -> Self { Self::Disabled(Disabled::Disabled) } - pub(crate) fn into_option(&self) -> Option { + pub(crate) fn into_option(self) -> Option { match self { Self::Disabled(_) => None, Self::Enabled(_) => Some(Duration::from_secs(5)), - Self::Duration(duration) => Some(*duration), + Self::Duration(duration) => Some(duration), } } } -#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] #[serde(rename_all = "snake_case")] pub(crate) enum Disabled { Disabled, } -#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] #[serde(rename_all = "snake_case")] pub(crate) enum Enabled { Enabled, diff --git a/apollo-router/src/protocols/websocket.rs b/apollo-router/src/protocols/websocket.rs index f1bf3533e2..81ac20165c 100644 --- a/apollo-router/src/protocols/websocket.rs +++ b/apollo-router/src/protocols/websocket.rs @@ -445,19 +445,15 @@ where } fn send_heartbeat(mut stream: Pin<&mut impl Sink>, cx: &mut std::task::Context<'_>) { - match stream.as_mut().poll_flush(cx) { - Poll::Ready(Ok(_)) => match stream.as_mut().poll_ready(cx) { - Poll::Ready(Ok(_)) => { - // Ignore error - let _ = stream.start_send(ClientMessage::Ping { - payload: Some(serde_json_bytes::Value::String( - "APOLLO_ROUTER_HEARTBEAT".into() - )) - }); - } - _ => (), - }, - _ => (), + if stream.as_mut().poll_flush(cx).map(|result| result.is_ok()) == Poll::Ready(true) + && stream.as_mut().poll_ready(cx).map(|result| result.is_ok()) == Poll::Ready(true) + { + // Ignore error + let _ = stream.start_send(ClientMessage::Ping { + payload: Some(serde_json_bytes::Value::String( + "APOLLO_ROUTER_HEARTBEAT".into(), + )), + }); } } From 9d245df25ce9e503d1c909ba1e90ad54c8c1bed7 Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Thu, 14 Mar 2024 05:18:06 +0200 Subject: [PATCH 05/10] Add changelog --- .../feat_subscription_websocket_heartbeat.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 .changesets/feat_subscription_websocket_heartbeat.md diff --git a/.changesets/feat_subscription_websocket_heartbeat.md b/.changesets/feat_subscription_websocket_heartbeat.md new file mode 100644 index 0000000000..440bb16c95 --- /dev/null +++ b/.changesets/feat_subscription_websocket_heartbeat.md @@ -0,0 +1,17 @@ +### feat(subscription): add configurable heartbeat for websocket protocol ([Issue #4621](https://github.com/apollographql/router/issues/4621)) + +Add the ability to enable heartbeat for cases where the subgraph drops idle connections. +For example, https://netflix.github.io/dgs/ + +Example of configuration: + +```yaml +subscription: + mode: + passthrough: + all: + path: /graphql + heartbeat_interval: enable #Optional + ``` + +By [@IvanGoncharov](https://github.com/IvanGoncharov) in https://github.com/apollographql/router/pull/4802 From 8ebebd33ec030d7d1ede3213ed2d8e7f2d2b551a Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Wed, 20 Mar 2024 08:59:16 +0200 Subject: [PATCH 06/10] rewrite and move heartbeat into separate task --- apollo-router/src/protocols/websocket.rs | 324 ++++++++++++------ .../src/services/subgraph_service.rs | 14 +- 2 files changed, 224 insertions(+), 114 deletions(-) diff --git a/apollo-router/src/protocols/websocket.rs b/apollo-router/src/protocols/websocket.rs index 81ac20165c..d254a5ecbb 100644 --- a/apollo-router/src/protocols/websocket.rs +++ b/apollo-router/src/protocols/websocket.rs @@ -3,6 +3,7 @@ use std::task::Poll; use std::time::Duration; use futures::future; +use futures::stream::SplitStream; use futures::Future; use futures::Sink; use futures::SinkExt; @@ -16,6 +17,7 @@ use serde::Serialize; use serde_json_bytes::Value; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; +use tokio_stream::wrappers::IntervalStream; use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode; use tokio_tungstenite::tungstenite::protocol::CloseFrame; use tokio_tungstenite::tungstenite::Message; @@ -213,29 +215,25 @@ impl ServerMessage { } } -pin_project! { pub(crate) struct GraphqlWebSocket { - #[pin] stream: S, id: String, protocol: WebSocketProtocol, - heartbeat_interval: Option, - // Booleans for state machine when closing the stream - completed: bool, - terminated: bool, -} } impl GraphqlWebSocket where - S: Stream> + Sink + std::marker::Unpin, + S: Stream> + + Sink + + std::marker::Unpin + + std::marker::Send + + 'static, { pub(crate) async fn new( mut stream: S, id: String, protocol: WebSocketProtocol, connection_params: Option, - heartbeat_interval: Option, ) -> Result { let connection_init_msg = match connection_params { Some(connection_params) => ClientMessage::ConnectionInit { @@ -287,26 +285,41 @@ where .build()); } - let heartbeat_interval = if protocol == WebSocketProtocol::GraphqlWs { - heartbeat_interval.map(|duration| { - let mut interval = - tokio::time::interval_at(tokio::time::Instant::now() + duration, duration); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - interval - }) - } else { - None - }; - Ok(Self { stream, id, protocol, - heartbeat_interval, - completed: false, - terminated: false, }) } + + pub(crate) async fn into_subscription( + mut self, + request: graphql::Request, + heartbeat_interval: Option, + ) -> Result, graphql::Error> { + tracing::info!( + monotonic_counter + .apollo + .router + .operations + .subscriptions + .events = 1u64, + subscriptions.mode = "passthrough" + ); + + self.stream + .send(self.protocol.subscribe(self.id.to_string(), request)) + .await + .map(|_| { + SubscriptionStream::new(self.stream, self.id, self.protocol, heartbeat_interval) + }) + .map_err(|_err| { + graphql::Error::builder() + .message("cannot send to websocket connection") + .extension_code("WEBSOCKET_CONNECTION_ERROR") + .build() + }) + } } #[derive(thiserror::Error, Debug)] @@ -375,7 +388,129 @@ where }) } -impl Stream for GraphqlWebSocket +pub(crate) struct SubscriptionStream { + inner_stream: SplitStream>, + close_signal: Option>, +} + +impl SubscriptionStream +where + S: Stream> + + Sink + + std::marker::Unpin + + std::marker::Send + + 'static, +{ + pub(crate) fn new( + stream: S, + id: String, + protocol: WebSocketProtocol, + heartbeat_interval: Option, + ) -> Self { + let (mut sink, inner_stream) = InnerStream::new(stream, id, protocol).split(); + let (close_signal, close_sentinel) = tokio::sync::oneshot::channel::<()>(); + + tokio::task::spawn(async move { + if let (WebSocketProtocol::GraphqlWs, Some(duration)) = (protocol, heartbeat_interval) { + let mut interval = + tokio::time::interval_at(tokio::time::Instant::now() + duration, duration); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut heartbeat_stream = IntervalStream::new(interval) + .map(|_| { + Ok(ClientMessage::Ping { + payload: Some(serde_json_bytes::Value::String( + "APOLLO_ROUTER_HEARTBEAT".into(), + )), + }) + }) + .take_until(close_sentinel); + if let Err(err) = sink.send_all(&mut heartbeat_stream).await { + tracing::trace!("cannot send heartbeat: {err:?}"); + if let Some(close_sentinel) = heartbeat_stream.take_future() { + if let Err(err) = close_sentinel.await { + tracing::trace!("cannot shutdown sink: {err:?}"); + } + } + } + } else if let Err(err) = close_sentinel.await { + tracing::trace!("cannot shutdown sink: {err:?}"); + }; + + tracing::info!( + monotonic_counter + .apollo + .router + .operations + .subscriptions + .events = 1u64, + subscriptions.mode = "passthrough", + subscriptions.complete = true + ); + + if let Err(err) = sink.close().await { + tracing::trace!("cannot close the websocket stream: {err:?}"); + } + }); + + Self { + inner_stream, + close_signal: Some(close_signal), + } + } +} + +impl Drop for SubscriptionStream { + fn drop(&mut self) { + if let Some(close_signal) = self.close_signal.take() { + if let Err(err) = close_signal.send(()) { + tracing::trace!("cannot close the websocket stream: {err:?}"); + } + } + } +} + +impl Stream for SubscriptionStream +where + S: Stream> + Sink + std::marker::Unpin, +{ + type Item = graphql::Response; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.inner_stream.poll_next_unpin(cx) + } +} + +pin_project! { +struct InnerStream { + #[pin] + stream: S, + id: String, + protocol: WebSocketProtocol, + // Booleans for state machine when closing the stream + completed: bool, + terminated: bool, +} +} + +impl InnerStream +where + S: Stream> + Sink + std::marker::Unpin, +{ + fn new(stream: S, id: String, protocol: WebSocketProtocol) -> Self { + Self { + stream, + id, + protocol, + completed: false, + terminated: false, + } + } +} + +impl Stream for InnerStream where S: Stream> + Sink, { @@ -392,9 +527,6 @@ where Poll::Ready(message) => match message { Some(server_message) => match server_message { Ok(server_message) => { - if let Some(heartbeat_interval) = this.heartbeat_interval { - heartbeat_interval.reset(); - } if let Some(id) = &server_message.id() { if this.id != id { tracing::error!("we should not receive data from other subscriptions, closing the stream"); @@ -431,33 +563,12 @@ where }, None => Poll::Ready(None), }, - Poll::Pending => { - if let Some(heartbeat_interval) = this.heartbeat_interval { - match heartbeat_interval.poll_tick(cx) { - Poll::Ready(_) => send_heartbeat(this.stream, cx), - Poll::Pending => (), - }; - } - Poll::Pending - } + Poll::Pending => Poll::Pending, } } } -fn send_heartbeat(mut stream: Pin<&mut impl Sink>, cx: &mut std::task::Context<'_>) { - if stream.as_mut().poll_flush(cx).map(|result| result.is_ok()) == Poll::Ready(true) - && stream.as_mut().poll_ready(cx).map(|result| result.is_ok()) == Poll::Ready(true) - { - // Ignore error - let _ = stream.start_send(ClientMessage::Ping { - payload: Some(serde_json_bytes::Value::String( - "APOLLO_ROUTER_HEARTBEAT".into(), - )), - }); - } -} - -impl Sink for GraphqlWebSocket +impl Sink for InnerStream where S: Stream> + Sink, { @@ -482,26 +593,15 @@ where }) } - fn start_send(self: Pin<&mut Self>, item: graphql::Request) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, item: ClientMessage) -> Result<(), Self::Error> { let mut this = self.project(); - tracing::info!( - monotonic_counter - .apollo - .router - .operations - .subscriptions - .events = 1u64, - subscriptions.mode = "passthrough" - ); - Pin::new(&mut this.stream) - .start_send(this.protocol.subscribe(this.id.to_string(), item)) - .map_err(|_err| { - graphql::Error::builder() - .message("cannot send to websocket connection") - .extension_code("WEBSOCKET_CONNECTION_ERROR") - .build() - }) + Pin::new(&mut this.stream).start_send(item).map_err(|_err| { + graphql::Error::builder() + .message("cannot send to websocket connection") + .extension_code("WEBSOCKET_CONNECTION_ERROR") + .build() + }) } fn poll_flush( @@ -521,16 +621,6 @@ where self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll> { - tracing::info!( - monotonic_counter - .apollo - .router - .operations - .subscriptions - .events = 1u64, - subscriptions.mode = "passthrough", - subscriptions.complete = true - ); let mut this = self.project(); if !*this.completed { match Pin::new( @@ -586,7 +676,7 @@ mod tests { use axum::routing::get; use axum::Router; use axum::Server; - use futures::StreamExt; + use futures::FutureExt; use http::HeaderValue; use tokio_tungstenite::connect_async; use tokio_tungstenite::tungstenite::client::IntoClientRequest; @@ -597,6 +687,7 @@ mod tests { async fn emulate_correct_websocket_server_new_protocol( send_ping: bool, + heartbeat_interval: Option, port: Option, ) -> SocketAddr { let ws_handler = move |ws: WebSocketUpgrade| async move { @@ -654,6 +745,26 @@ mod tests { .await .unwrap(); + if let Some(duration) = heartbeat_interval { + tokio::time::pause(); + assert!( + socket.next().now_or_never().is_none(), + "It should be no pending messages" + ); + + tokio::time::sleep(duration).await; + let ping_message = socket.next().await.unwrap().unwrap(); + assert_eq!(ping_message, AxumWsMessage::Text( + serde_json::to_string(&ClientMessage::Ping { payload: Some(serde_json_bytes::Value::String("APOLLO_ROUTER_HEARTBEAT".into())) }).unwrap(), + )); + + assert!( + socket.next().now_or_never().is_none(), + "It should be no pending messages" + ); + tokio::time::resume(); + } + socket .send(AxumWsMessage::Text( serde_json::to_string(&ServerMessage::Next { id: client_id.clone().unwrap(), payload: graphql::Response::builder().data(serde_json_bytes::json!({"userWasCreated": {"username": "ada_lovelace"}})).build() }).unwrap(), @@ -812,16 +923,27 @@ mod tests { #[tokio::test] async fn test_ws_connection_new_proto_with_ping() { - test_ws_connection_new_proto(true, None).await + test_ws_connection_new_proto(true, None, None).await } #[tokio::test] async fn test_ws_connection_new_proto_without_ping() { - test_ws_connection_new_proto(false, None).await + test_ws_connection_new_proto(false, None, None).await } - async fn test_ws_connection_new_proto(send_ping: bool, port: Option) { - let socket_addr = emulate_correct_websocket_server_new_protocol(send_ping, port).await; + #[tokio::test] + async fn test_ws_connection_new_proto_with_heartbeat() { + test_ws_connection_new_proto(false, Some(tokio::time::Duration::from_secs(60)), None).await + } + + async fn test_ws_connection_new_proto( + send_ping: bool, + heartbeat_interval: Option, + port: Option, + ) { + let socket_addr = + emulate_correct_websocket_server_new_protocol(send_ping, heartbeat_interval, port) + .await; let url = url::Url::parse(format!("ws://{}/ws", socket_addr).as_str()).unwrap(); let mut request = url.into_client_request().unwrap(); request.headers_mut().insert( @@ -831,26 +953,25 @@ mod tests { let (ws_stream, _resp) = connect_async(request).await.unwrap(); let sub_uuid = Uuid::new_v4(); - let gql_stream = GraphqlWebSocket::new( + let gql_socket = GraphqlWebSocket::new( convert_websocket_stream(ws_stream, sub_uuid.to_string()), sub_uuid.to_string(), WebSocketProtocol::GraphqlWs, Some(serde_json_bytes::json!({ "token": "XXX" })), - None, ) .await .unwrap(); let sub = "subscription {\n userWasCreated {\n username\n }\n}"; - let (mut gql_sink, mut gql_read_stream) = gql_stream.split(); - let _handle = tokio::task::spawn(async move { - gql_sink - .send(graphql::Request::builder().query(sub).build()) - .await - .unwrap(); - }); + let mut gql_read_stream = gql_socket + .into_subscription( + graphql::Request::builder().query(sub).build(), + heartbeat_interval, + ) + .await + .unwrap(); let next_payload = gql_read_stream.next().await.unwrap(); assert_eq!(next_payload, graphql::Response::builder() @@ -873,7 +994,7 @@ mod tests { .build() ); assert!( - gql_read_stream.next().await.is_none(), + gql_read_stream.next().now_or_never().is_none(), "It should be completed" ); } @@ -899,25 +1020,20 @@ mod tests { let (ws_stream, _resp) = connect_async(request).await.unwrap(); let sub_uuid = Uuid::new_v4(); - let gql_stream = GraphqlWebSocket::new( + let gql_socket = GraphqlWebSocket::new( convert_websocket_stream(ws_stream, sub_uuid.to_string()), sub_uuid.to_string(), WebSocketProtocol::SubscriptionsTransportWs, None, - None, ) .await .unwrap(); let sub = "subscription {\n userWasCreated {\n username\n }\n}"; - let (mut gql_sink, mut gql_read_stream) = gql_stream.split(); - let _handle = tokio::task::spawn(async move { - gql_sink - .send(graphql::Request::builder().query(sub).build()) - .await - .unwrap(); - gql_sink.close().await.unwrap(); - }); + let mut gql_read_stream = gql_socket + .into_subscription(graphql::Request::builder().query(sub).build(), None) + .await + .unwrap(); let next_payload = gql_read_stream.next().await.unwrap(); assert_eq!(next_payload, graphql::Response::builder() @@ -940,7 +1056,7 @@ mod tests { .build() ); assert!( - gql_read_stream.next().await.is_none(), + gql_read_stream.next().now_or_never().is_none(), "It should be completed" ); } diff --git a/apollo-router/src/services/subgraph_service.rs b/apollo-router/src/services/subgraph_service.rs index 3f9dfe09f1..048541c32e 100644 --- a/apollo-router/src/services/subgraph_service.rs +++ b/apollo-router/src/services/subgraph_service.rs @@ -8,7 +8,6 @@ use std::task::Poll; use bytes::Bytes; use futures::future::BoxFuture; -use futures::SinkExt; use futures::StreamExt; use futures::TryFutureExt; use http::header::ACCEPT; @@ -571,12 +570,11 @@ async fn call_websocket( ); } - let mut gql_stream = GraphqlWebSocket::new( + let gql_socket = GraphqlWebSocket::new( convert_websocket_stream(ws_stream, subscription_hash.clone()), subscription_hash, subgraph_cfg.protocol, connection_params, - subgraph_cfg.heartbeat_interval.into_option(), ) .await .map_err(|_| FetchError::SubrequestWsError { @@ -584,14 +582,14 @@ async fn call_websocket( reason: "cannot get the GraphQL websocket stream".to_string(), })?; - gql_stream - .send(body) + let gql_stream = gql_socket + .into_subscription(body, subgraph_cfg.heartbeat_interval.into_option()) .await .map_err(|err| FetchError::SubrequestWsError { service: service_name, reason: format!("cannot send the subgraph request to websocket stream: {err:?}"), })?; - let (mut gql_sink, gql_stream) = gql_stream.split(); + let (handle_sink, handle_stream) = handle.split(); tokio::task::spawn(async move { @@ -599,10 +597,6 @@ async fn call_websocket( .map(Ok::<_, graphql::Error>) .forward(handle_sink) .await; - - if let Err(err) = gql_sink.close().await { - tracing::trace!("cannot close the websocket stream: {err:?}"); - } }); subscription_stream_tx.send(Box::pin(handle_stream)).await?; From e3976f7930e0cdaf499d961fd3bf7675ff329b21 Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Wed, 20 Mar 2024 09:17:06 +0200 Subject: [PATCH 07/10] address review comments --- apollo-router/src/protocols/websocket.rs | 6 +++--- docs/source/executing-operations/subscription-support.mdx | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apollo-router/src/protocols/websocket.rs b/apollo-router/src/protocols/websocket.rs index d254a5ecbb..4e68e3bd6d 100644 --- a/apollo-router/src/protocols/websocket.rs +++ b/apollo-router/src/protocols/websocket.rs @@ -521,9 +521,8 @@ where cx: &mut std::task::Context<'_>, ) -> Poll> { let mut this = self.as_mut().project(); - let mut stream = Pin::new(&mut this.stream); - match stream.as_mut().poll_next(cx) { + match Pin::new(&mut this.stream).poll_next(cx) { Poll::Ready(message) => match message { Some(server_message) => match server_message { Ok(server_message) => { @@ -536,7 +535,8 @@ where if let ServerMessage::Ping { .. } = server_message { // Send pong asynchronously let _ = Pin::new( - &mut stream.as_mut().send(ClientMessage::Pong { payload: None }), + &mut Pin::new(&mut this.stream) + .send(ClientMessage::Pong { payload: None }), ) .poll(cx); } diff --git a/docs/source/executing-operations/subscription-support.mdx b/docs/source/executing-operations/subscription-support.mdx index 3bcfb5b08c..c5ecef7395 100644 --- a/docs/source/executing-operations/subscription-support.mdx +++ b/docs/source/executing-operations/subscription-support.mdx @@ -101,7 +101,7 @@ subscription: reviews: # Overrides settings for the 'reviews' subgraph path: /ws # Absolute path that overrides '/subscriptions' defined above protocol: graphql_ws # The WebSocket-based subprotocol to use for subscription communication (Default: graphql_ws) - heartbeat_interval: enable # Optional (Default: disabled) + heartbeat_interval: 10s # Optional, with 'enable' equal to '5s' (Default: disabled) ``` This example enables subscriptions in **passthrough mode**, which uses long-lived WebSocket connections. From 4d80f74e4f2a4c5956293cd0fb5d8c16beca6daf Mon Sep 17 00:00:00 2001 From: Jesse Rosenberger Date: Wed, 20 Mar 2024 12:25:10 +0200 Subject: [PATCH 08/10] Update .changesets/feat_subscription_websocket_heartbeat.md --- .changesets/feat_subscription_websocket_heartbeat.md | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/.changesets/feat_subscription_websocket_heartbeat.md b/.changesets/feat_subscription_websocket_heartbeat.md index 440bb16c95..a026abcedc 100644 --- a/.changesets/feat_subscription_websocket_heartbeat.md +++ b/.changesets/feat_subscription_websocket_heartbeat.md @@ -1,9 +1,8 @@ -### feat(subscription): add configurable heartbeat for websocket protocol ([Issue #4621](https://github.com/apollographql/router/issues/4621)) +### Subscriptions: Add configurable "heartbeat" to subgraph WebSocket protocol ([Issue #4621](https://github.com/apollographql/router/issues/4621)) -Add the ability to enable heartbeat for cases where the subgraph drops idle connections. -For example, https://netflix.github.io/dgs/ +To account for GraphQL Subscription WebSocket implementations (e.g., [DGS](https://netflix.github.io/dgs/)) which drop idle connections by design, the router adds the ability to configure a heartbeat to keep active connections alive. -Example of configuration: +An example of configuration: ```yaml subscription: From cd30d12169030e3a93df2a6b2d17771e4f0d7ff8 Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Wed, 20 Mar 2024 23:08:53 +0200 Subject: [PATCH 09/10] Improve docs --- apollo-router/src/plugins/subscription.rs | 2 ++ docs/source/executing-operations/subscription-support.mdx | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/apollo-router/src/plugins/subscription.rs b/apollo-router/src/plugins/subscription.rs index 3c3ae74f90..1662e16cc9 100644 --- a/apollo-router/src/plugins/subscription.rs +++ b/apollo-router/src/plugins/subscription.rs @@ -171,9 +171,11 @@ pub(crate) struct CallbackMode { #[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] #[serde(rename_all = "snake_case", untagged)] pub(crate) enum HeartbeatInterval { + /// disable heartbeat Disabled(Disabled), /// enable with default interval of 5s Enabled(Enabled), + /// enable with custom interval, e.g. '100ms', '10s' or '1m' #[serde(with = "humantime_serde")] #[schemars(with = "String")] Duration(Duration), diff --git a/docs/source/executing-operations/subscription-support.mdx b/docs/source/executing-operations/subscription-support.mdx index c5ecef7395..2f2fe92d9c 100644 --- a/docs/source/executing-operations/subscription-support.mdx +++ b/docs/source/executing-operations/subscription-support.mdx @@ -101,7 +101,7 @@ subscription: reviews: # Overrides settings for the 'reviews' subgraph path: /ws # Absolute path that overrides '/subscriptions' defined above protocol: graphql_ws # The WebSocket-based subprotocol to use for subscription communication (Default: graphql_ws) - heartbeat_interval: 10s # Optional, with 'enable' equal to '5s' (Default: disabled) + heartbeat_interval: 10s # Optional and 'disable' by default, also supports 'enable' (set 5s interval) and custom values for intervals, e.g. '100ms', '10s', '1m'. ``` This example enables subscriptions in **passthrough mode**, which uses long-lived WebSocket connections. From 62acd790134ab46614bb6b4a0b4e511e5728c6ce Mon Sep 17 00:00:00 2001 From: Ivan Goncharov Date: Wed, 20 Mar 2024 23:27:38 +0200 Subject: [PATCH 10/10] update snapshot --- ...llo_router__configuration__tests__schema_generation.snap | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index d3c897cd2f..7012246d6f 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -2407,6 +2407,7 @@ expression: "&schema" "default": "enabled", "anyOf": [ { + "description": "disable heartbeat", "type": "string", "enum": [ "disabled" @@ -2420,6 +2421,7 @@ expression: "&schema" ] }, { + "description": "enable with custom interval, e.g. '100ms', '10s' or '1m'", "type": "string" } ] @@ -2476,6 +2478,7 @@ expression: "&schema" "default": "disabled", "anyOf": [ { + "description": "disable heartbeat", "type": "string", "enum": [ "disabled" @@ -2489,6 +2492,7 @@ expression: "&schema" ] }, { + "description": "enable with custom interval, e.g. '100ms', '10s' or '1m'", "type": "string" } ] @@ -2525,6 +2529,7 @@ expression: "&schema" "default": "disabled", "anyOf": [ { + "description": "disable heartbeat", "type": "string", "enum": [ "disabled" @@ -2538,6 +2543,7 @@ expression: "&schema" ] }, { + "description": "enable with custom interval, e.g. '100ms', '10s' or '1m'", "type": "string" } ]