Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(subscription): add configurable heartbeat for websocket protocol #4802

Merged
merged 11 commits into from Mar 20, 2024
17 changes: 17 additions & 0 deletions .changesets/feat_subscription_websocket_heartbeat.md
@@ -0,0 +1,17 @@
### feat(subscription): add configurable heartbeat for websocket protocol ([Issue #4621](https://github.com/apollographql/router/issues/4621))

Add the ability to enable heartbeat for cases where the subgraph drops idle connections.
For example, https://netflix.github.io/dgs/

Example of configuration:
abernix marked this conversation as resolved.
Show resolved Hide resolved

```yaml
subscription:
mode:
passthrough:
all:
path: /graphql
heartbeat_interval: enable #Optional
IvanGoncharov marked this conversation as resolved.
Show resolved Hide resolved
```

By [@IvanGoncharov](https://github.com/IvanGoncharov) in https://github.com/apollographql/router/pull/4802
Expand Up @@ -2404,14 +2404,21 @@ expression: "&schema"
"properties": {
"heartbeat_interval": {
"description": "Heartbeat interval for callback mode (default: 5secs)",
"default": "5s",
"default": "enabled",
"anyOf": [
{
"type": "string",
"enum": [
"disabled"
]
},
{
"description": "enable with default interval of 5s",
"type": "string",
"enum": [
"enabled"
]
},
{
"type": "string"
}
Expand Down Expand Up @@ -2464,6 +2471,28 @@ expression: "&schema"
"default": null,
"type": "object",
"properties": {
"heartbeat_interval": {
"description": "Heartbeat interval for graphql-ws protocol (default: disabled)",
"default": "disabled",
"anyOf": [
{
"type": "string",
"enum": [
"disabled"
]
},
{
"description": "enable with default interval of 5s",
"type": "string",
"enum": [
"enabled"
]
},
{
"type": "string"
}
]
},
"path": {
"description": "Path on which WebSockets are listening",
"default": null,
Expand Down Expand Up @@ -2491,6 +2520,28 @@ expression: "&schema"
"description": "WebSocket configuration for a specific subgraph",
"type": "object",
"properties": {
"heartbeat_interval": {
"description": "Heartbeat interval for graphql-ws protocol (default: disabled)",
"default": "disabled",
"anyOf": [
{
"type": "string",
"enum": [
"disabled"
]
},
{
"description": "enable with default interval of 5s",
"type": "string",
"enum": [
"enabled"
]
},
{
"type": "string"
}
]
},
"path": {
"description": "Path on which WebSockets are listening",
"default": null,
Expand Down
69 changes: 44 additions & 25 deletions apollo-router/src/plugins/subscription.rs
Expand Up @@ -112,7 +112,7 @@ impl SubscriptionModeConfig {
if callback_cfg.subgraphs.contains(service_name) || callback_cfg.subgraphs.is_empty() {
let callback_cfg = CallbackMode {
public_url: callback_cfg.public_url.clone(),
heartbeat_interval: callback_cfg.heartbeat_interval.clone(),
heartbeat_interval: callback_cfg.heartbeat_interval,
listen: callback_cfg.listen.clone(),
path: callback_cfg.path.clone(),
subgraphs: HashSet::new(), // We don't need it
Expand Down Expand Up @@ -151,7 +151,7 @@ pub(crate) struct CallbackMode {
pub(crate) public_url: url::Url,

/// Heartbeat interval for callback mode (default: 5secs)
#[serde(default = "HeartbeatInterval::default")]
#[serde(default = "HeartbeatInterval::new_enabled")]
pub(crate) heartbeat_interval: HeartbeatInterval,
// `skip_serializing` We don't need it in the context
/// Listen address on which the callback must listen (default: 127.0.0.1:4000)
Expand All @@ -168,25 +168,43 @@ pub(crate) struct CallbackMode {
pub(crate) subgraphs: HashSet<String>,
}

#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "snake_case", untagged)]
pub(crate) enum HeartbeatInterval {
Disabled(Disabled),
IvanGoncharov marked this conversation as resolved.
Show resolved Hide resolved
/// enable with default interval of 5s
Enabled(Enabled),
#[serde(with = "humantime_serde")]
#[schemars(with = "String")]
Duration(Duration),
IvanGoncharov marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)]
impl HeartbeatInterval {
pub(crate) fn new_enabled() -> Self {
Self::Enabled(Enabled::Enabled)
}
pub(crate) fn new_disabled() -> Self {
Self::Disabled(Disabled::Disabled)
}
pub(crate) fn into_option(self) -> Option<Duration> {
match self {
Self::Disabled(_) => None,
Self::Enabled(_) => Some(Duration::from_secs(5)),
Self::Duration(duration) => Some(duration),
}
}
}

#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub(crate) enum Disabled {
Disabled,
}

impl Default for HeartbeatInterval {
fn default() -> Self {
Self::Duration(Duration::from_secs(5))
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub(crate) enum Enabled {
Enabled,
}

/// Using websocket to directly connect to subgraph
Expand All @@ -197,14 +215,19 @@ pub(crate) struct PassthroughMode {
subgraph: SubgraphPassthroughMode,
}

#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize, JsonSchema)]
#[serde(deny_unknown_fields, default)]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)]
#[serde(deny_unknown_fields)]
/// WebSocket configuration for a specific subgraph
pub(crate) struct WebSocketConfiguration {
/// Path on which WebSockets are listening
#[serde(default)]
pub(crate) path: Option<String>,
/// Which WebSocket GraphQL protocol to use for this subgraph possible values are: 'graphql_ws' | 'graphql_transport_ws' (default: graphql_ws)
#[serde(default)]
pub(crate) protocol: WebSocketProtocol,
/// Heartbeat interval for graphql-ws protocol (default: disabled)
#[serde(default = "HeartbeatInterval::new_disabled")]
pub(crate) heartbeat_interval: HeartbeatInterval,
}

fn default_path() -> String {
Expand All @@ -228,21 +251,17 @@ impl Plugin for Subscription {
.clone(),
);
#[cfg(not(test))]
match init
.config
.mode
.callback
.as_ref()
.expect("we checked in the condition the callback conf")
.heartbeat_interval
{
HeartbeatInterval::Duration(duration) => {
init.notify.set_ttl(Some(duration)).await?;
}
HeartbeatInterval::Disabled(_) => {
init.notify.set_ttl(None).await?;
}
}
init.notify
.set_ttl(
init.config
.mode
.callback
.as_ref()
.expect("we checked in the condition the callback conf")
.heartbeat_interval
.into_option(),
)
.await?;
}

Ok(Subscription {
Expand Down