Skip to content

Commit

Permalink
feat(subscription): add configurable heartbeat for websocket protocol (
Browse files Browse the repository at this point in the history
…#4802)

Add the ability to enable heartbeat for cases where the subgraph drops
idle connections.
For example, https://netflix.github.io/dgs/

Example of configuration:

```yaml
subscription:
  mode:
    passthrough:
      all:
        path: /graphql
        heartbeat_interval: enable #Optional
 ```

Fixes #4621

<!-- start metadata -->
---

**Checklist**

Complete the checklist (and note appropriate exceptions) before the PR is marked ready-for-review.

- [x] Changes are compatible[^1]
- [x] Documentation[^2] completed
- [x] Performance impact assessed and acceptable
- Tests added and passing[^3]
    - [x] Unit Tests
    - [ ] Integration Tests
    - [x] Manual Tests

**Exceptions**

*Note any exceptions here*

**Notes**

[^1]: It may be appropriate to bring upcoming changes to the attention of other (impacted) groups. Please endeavour to do this before seeking PR approval. The mechanism for doing this will vary considerably, so use your judgement as to how and when to do this.
[^2]: Configuration is an important part of many changes. Where applicable please try to document configuration examples.
[^3]: Tick whichever testing boxes are applicable. If you are adding Manual Tests, please document the manual testing (extensively) in the Exceptions.

---------

Co-authored-by: Jesse Rosenberger <git@jro.cc>
  • Loading branch information
IvanGoncharov and abernix committed Mar 20, 2024
1 parent cb57bc1 commit 3400cf3
Show file tree
Hide file tree
Showing 6 changed files with 353 additions and 108 deletions.
16 changes: 16 additions & 0 deletions .changesets/feat_subscription_websocket_heartbeat.md
@@ -0,0 +1,16 @@
### Subscriptions: Add configurable "heartbeat" to subgraph WebSocket protocol ([Issue #4621](https://github.com/apollographql/router/issues/4621))

To account for GraphQL Subscription WebSocket implementations (e.g., [DGS](https://netflix.github.io/dgs/)) which drop idle connections by design, the router adds the ability to configure a heartbeat to keep active connections alive.

An example of configuration:

```yaml
subscription:
mode:
passthrough:
all:
path: /graphql
heartbeat_interval: enable #Optional
```

By [@IvanGoncharov](https://github.com/IvanGoncharov) in https://github.com/apollographql/router/pull/4802
Expand Up @@ -2404,15 +2404,24 @@ expression: "&schema"
"properties": {
"heartbeat_interval": {
"description": "Heartbeat interval for callback mode (default: 5secs)",
"default": "5s",
"default": "enabled",
"anyOf": [
{
"description": "disable heartbeat",
"type": "string",
"enum": [
"disabled"
]
},
{
"description": "enable with default interval of 5s",
"type": "string",
"enum": [
"enabled"
]
},
{
"description": "enable with custom interval, e.g. '100ms', '10s' or '1m'",
"type": "string"
}
]
Expand Down Expand Up @@ -2464,6 +2473,30 @@ expression: "&schema"
"default": null,
"type": "object",
"properties": {
"heartbeat_interval": {
"description": "Heartbeat interval for graphql-ws protocol (default: disabled)",
"default": "disabled",
"anyOf": [
{
"description": "disable heartbeat",
"type": "string",
"enum": [
"disabled"
]
},
{
"description": "enable with default interval of 5s",
"type": "string",
"enum": [
"enabled"
]
},
{
"description": "enable with custom interval, e.g. '100ms', '10s' or '1m'",
"type": "string"
}
]
},
"path": {
"description": "Path on which WebSockets are listening",
"default": null,
Expand Down Expand Up @@ -2491,6 +2524,30 @@ expression: "&schema"
"description": "WebSocket configuration for a specific subgraph",
"type": "object",
"properties": {
"heartbeat_interval": {
"description": "Heartbeat interval for graphql-ws protocol (default: disabled)",
"default": "disabled",
"anyOf": [
{
"description": "disable heartbeat",
"type": "string",
"enum": [
"disabled"
]
},
{
"description": "enable with default interval of 5s",
"type": "string",
"enum": [
"enabled"
]
},
{
"description": "enable with custom interval, e.g. '100ms', '10s' or '1m'",
"type": "string"
}
]
},
"path": {
"description": "Path on which WebSockets are listening",
"default": null,
Expand Down
71 changes: 46 additions & 25 deletions apollo-router/src/plugins/subscription.rs
Expand Up @@ -112,7 +112,7 @@ impl SubscriptionModeConfig {
if callback_cfg.subgraphs.contains(service_name) || callback_cfg.subgraphs.is_empty() {
let callback_cfg = CallbackMode {
public_url: callback_cfg.public_url.clone(),
heartbeat_interval: callback_cfg.heartbeat_interval.clone(),
heartbeat_interval: callback_cfg.heartbeat_interval,
listen: callback_cfg.listen.clone(),
path: callback_cfg.path.clone(),
subgraphs: HashSet::new(), // We don't need it
Expand Down Expand Up @@ -151,7 +151,7 @@ pub(crate) struct CallbackMode {
pub(crate) public_url: url::Url,

/// Heartbeat interval for callback mode (default: 5secs)
#[serde(default = "HeartbeatInterval::default")]
#[serde(default = "HeartbeatInterval::new_enabled")]
pub(crate) heartbeat_interval: HeartbeatInterval,
// `skip_serializing` We don't need it in the context
/// Listen address on which the callback must listen (default: 127.0.0.1:4000)
Expand All @@ -168,25 +168,45 @@ pub(crate) struct CallbackMode {
pub(crate) subgraphs: HashSet<String>,
}

#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "snake_case", untagged)]
pub(crate) enum HeartbeatInterval {
/// disable heartbeat
Disabled(Disabled),
/// enable with default interval of 5s
Enabled(Enabled),
/// enable with custom interval, e.g. '100ms', '10s' or '1m'
#[serde(with = "humantime_serde")]
#[schemars(with = "String")]
Duration(Duration),
}

#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)]
impl HeartbeatInterval {
pub(crate) fn new_enabled() -> Self {
Self::Enabled(Enabled::Enabled)
}
pub(crate) fn new_disabled() -> Self {
Self::Disabled(Disabled::Disabled)
}
pub(crate) fn into_option(self) -> Option<Duration> {
match self {
Self::Disabled(_) => None,
Self::Enabled(_) => Some(Duration::from_secs(5)),
Self::Duration(duration) => Some(duration),
}
}
}

#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub(crate) enum Disabled {
Disabled,
}

impl Default for HeartbeatInterval {
fn default() -> Self {
Self::Duration(Duration::from_secs(5))
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub(crate) enum Enabled {
Enabled,
}

/// Using websocket to directly connect to subgraph
Expand All @@ -197,14 +217,19 @@ pub(crate) struct PassthroughMode {
subgraph: SubgraphPassthroughMode,
}

#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize, JsonSchema)]
#[serde(deny_unknown_fields, default)]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)]
#[serde(deny_unknown_fields)]
/// WebSocket configuration for a specific subgraph
pub(crate) struct WebSocketConfiguration {
/// Path on which WebSockets are listening
#[serde(default)]
pub(crate) path: Option<String>,
/// Which WebSocket GraphQL protocol to use for this subgraph possible values are: 'graphql_ws' | 'graphql_transport_ws' (default: graphql_ws)
#[serde(default)]
pub(crate) protocol: WebSocketProtocol,
/// Heartbeat interval for graphql-ws protocol (default: disabled)
#[serde(default = "HeartbeatInterval::new_disabled")]
pub(crate) heartbeat_interval: HeartbeatInterval,
}

fn default_path() -> String {
Expand All @@ -228,21 +253,17 @@ impl Plugin for Subscription {
.clone(),
);
#[cfg(not(test))]
match init
.config
.mode
.callback
.as_ref()
.expect("we checked in the condition the callback conf")
.heartbeat_interval
{
HeartbeatInterval::Duration(duration) => {
init.notify.set_ttl(Some(duration)).await?;
}
HeartbeatInterval::Disabled(_) => {
init.notify.set_ttl(None).await?;
}
}
init.notify
.set_ttl(
init.config
.mode
.callback
.as_ref()
.expect("we checked in the condition the callback conf")
.heartbeat_interval
.into_option(),
)
.await?;
}

Ok(Subscription {
Expand Down

0 comments on commit 3400cf3

Please sign in to comment.