Merge pull request #443 from Appva/chore/runtime-docs-cleanup
Clean up runtime docs a bit
clux committed Feb 28, 2021
2 parents (97df991 + 726617d), commit 11f60c7
Showing 7 changed files with 65 additions and 31 deletions.
kube-runtime/src/controller/future_hash_map.rs (3 additions, 2 deletions)
@@ -6,8 +6,9 @@ use std::{
     task::{Context, Poll},
 };
 
-/// Variant of [`tokio::stream::StreamMap`] that only runs [`Future`]s, and uses a [`HashMap`] as
-/// the backing store, giving O(1) insertion and membership checks.
+/// Variant of [`tokio_stream::StreamMap`](https://docs.rs/tokio-stream/0.1.3/tokio_stream/struct.StreamMap.html)
+/// that only runs [`Future`]s, and uses a [`HashMap`] as the backing store, giving (amortized) O(1) insertion
+/// and membership checks.
 ///
 /// Just like for `StreamMap`'s `S`, `F` must be [`Unpin`], since [`HashMap`] is free to move
 /// entries as it pleases (for example: resizing the backing array).
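
The `Unpin` requirement called out in the rewritten comment is easiest to see in a stand-alone sketch. The following is a hypothetical re-creation of the idea, not the crate's private `FutureHashMap` type:

```rust
use std::{
    collections::HashMap,
    future::Future,
    hash::Hash,
    pin::Pin,
    task::{Context, Poll},
};

use futures::Stream;

/// Illustrative only: a map of keyed futures, exposed as a `Stream` that
/// yields `(key, output)` pairs as individual futures complete.
struct NamedFutures<K, F> {
    futures: HashMap<K, F>,
}

impl<K, F> Stream for NamedFutures<K, F>
where
    K: Clone + Eq + Hash,
    F: Future + Unpin,
{
    type Item = (K, F::Output);

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `Pin::new(f)` is only sound because `F: Unpin`: the HashMap is free
        // to move its entries around whenever it rehashes.
        let finished = self.futures.iter_mut().find_map(|(k, f)| {
            match Pin::new(f).poll(cx) {
                Poll::Ready(out) => Some((k.clone(), out)),
                Poll::Pending => None,
            }
        });
        match finished {
            Some((k, out)) => {
                // O(1) removal is the point of the HashMap backing store
                self.futures.remove(&k);
                Poll::Ready(Some((k, out)))
            }
            None if self.futures.is_empty() => Poll::Ready(None),
            None => Poll::Pending,
        }
    }
}
```

Since the map may rehash and move entries at any time, the futures can only be polled through `Pin::new`, which is exactly the `Unpin` constraint the doc comment explains.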
kube-runtime/src/controller/mod.rs (10 additions, 6 deletions)
@@ -1,3 +1,5 @@
+//! Runs a user-supplied reconciler function on objects when they (or related objects) are updated
+
 use self::runner::Runner;
 use crate::{
     reflector::{
@@ -55,7 +57,7 @@ pub struct ReconcilerAction {
     pub requeue_after: Option<Duration>,
 }
 
-/// Helper for building custom trigger filters, see [`trigger_self`] and [`trigger_owners`] for some examples.
+/// Helper for building custom trigger filters, see the implementations of [`trigger_self`] and [`trigger_owners`] for some examples.
 pub fn trigger_with<T, K, I, S>(
     stream: S,
     mapper: impl Fn(T) -> I,
@@ -127,14 +129,15 @@ impl<T> Context<T> {
 
 /// Apply a reconciler to an input stream, with a given retry policy
 ///
-/// Takes a `store` parameter for the main object which should be updated by a `reflector`.
+/// Takes a `store` parameter for the core objects, which should usually be updated by a [`reflector`].
 ///
-/// The `queue` is a source of external events that trigger the reconciler,
-/// usually taken from a `reflector` and then passed through a trigger function such as
-/// [`trigger_self`].
+/// The `queue` indicates which objects should be reconciled. For the core objects this will usually be
+/// the [`reflector`] (piped through [`trigger_self`]). If your core objects own any subobjects then you
+/// can also make them trigger reconciliations by [merging](`futures::stream::select`) the [`reflector`]
+/// with a [`watcher`](watcher()) or [`reflector`](reflector()) for the subobject.
 ///
 /// This is the "hard-mode" version of [`Controller`], which allows you some more customization
-/// (such as triggering from arbitrary `Stream`s), at the cost of some more verbosity.
+/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
 pub fn applier<K, QueueStream, ReconcilerFut, T>(
     mut reconciler: impl FnMut(K, Context<T>) -> ReconcilerFut,
     mut error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> ReconcilerAction,
@@ -290,6 +293,7 @@ where
     ///
     /// Configure `ListParams` and `Api` so you only get reconcile events
     /// for the correct `Api` scope (cluster/all/namespaced), or `ListParams` subset
+    #[must_use]
     pub fn new(owned_api: Api<K>, lp: ListParams) -> Self {
         let writer = Writer::<K>::default();
         let reader = writer.as_reader();
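
Since the cleaned-up docs contrast `applier` ("hard mode") with the `Controller` builder, here is a minimal sketch of the builder route as it looked in this era (kube 0.51-ish APIs assumed). The `Data` context, `Error` type, and reconciler body are illustrative placeholders, not part of this diff:

```rust
use std::time::Duration;

use futures::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{api::ListParams, Api, Client};
use kube_runtime::controller::{Context, Controller, ReconcilerAction};

#[derive(Debug, thiserror::Error)]
#[error("reconcile failed")]
struct Error;

struct Data {
    client: Client,
}

async fn reconcile(cm: ConfigMap, _ctx: Context<Data>) -> Result<ReconcilerAction, Error> {
    // Inspect `cm` and bring the world in line with it here...
    Ok(ReconcilerAction {
        // Requeue even on success, so drift is eventually corrected
        requeue_after: Some(Duration::from_secs(300)),
    })
}

fn error_policy(_error: &Error, _ctx: Context<Data>) -> ReconcilerAction {
    ReconcilerAction {
        requeue_after: Some(Duration::from_secs(10)),
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let client = Client::try_default().await?;
    let cms = Api::<ConfigMap>::all(client.clone());
    Controller::new(cms, ListParams::default())
        .run(reconcile, error_policy, Context::new(Data { client }))
        .for_each(|res| async move {
            match res {
                Ok((obj, _action)) => println!("reconciled {:?}", obj),
                Err(err) => println!("reconcile error: {}", err),
            }
        })
        .await;
    Ok(())
}
```

`run` returns an ordinary `Stream` of reconcile results, so the builder output composes with the same combinators as the lower-level `applier` pieces.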
kube-runtime/src/controller/runner.rs (6 additions, 2 deletions)
@@ -8,8 +8,12 @@ use std::{
     task::{Context, Poll},
 };
 
-/// Pulls messages from a [`Scheduler`], and runs an action for each message in parallel,
-/// while making sure to not run the same message multiple times at once.
+/// Pulls items from a [`Scheduler`], and runs an action for each item in parallel,
+/// while making sure to not process [equal](`Eq`) items multiple times at once.
+///
+/// If an item is to be emitted from the [`Scheduler`] while an equal item is
+/// already being processed then it will be held pending until the current item
+/// is finished.
 #[pin_project]
 pub struct Runner<T, R, F, MkF> {
     #[pin]
kube-runtime/src/lib.rs (4 additions, 3 deletions)
@@ -1,11 +1,12 @@
-//! Crate with kubernetes runtime components
+//! Common components for building Kubernetes operators
 //!
 //! This crate contains the core building blocks to allow users to build
 //! controllers/operators/watchers that need to synchronize/reconcile kubernetes
 //! state.
 //!
-//! Newcomers should generally get started with the [`Controller`] builder, which manages
-//! all state internals for you.
+//! Newcomers are recommended to start with the [`Controller`] builder, which gives an
+//! opinionated starting point that should be appropriate for simple operators, but all
+//! components are designed to be usable á la carte if your operator doesn't quite fit that mold.
 
 #![deny(unsafe_code)]
 #![deny(clippy::all)]
kube-runtime/src/reflector/mod.rs (2 additions, 0 deletions)
@@ -1,3 +1,5 @@
+//! Caches objects in memory
+
 mod object_ref;
 pub mod store;
 
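
A minimal sketch of how the newly documented cache is typically wired up, with a `store::Writer` fed by a `watcher` (same-era APIs assumed; the `ConfigMap` choice and the surrounding `main` are illustrative):

```rust
use futures::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{api::ListParams, Api, Client};
use kube_runtime::{
    reflector::{reflector, store},
    watcher::watcher,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let client = Client::try_default().await?;
    let cms = Api::<ConfigMap>::namespaced(client, "default");

    let writer = store::Writer::<ConfigMap>::default();
    let reader = writer.as_reader(); // cheap handle for cache lookups

    // reflector() passes the watcher's events through unchanged while
    // applying them to the store, so it must be polled to keep the cache warm.
    let rf = reflector(writer, watcher(cms, ListParams::default()));
    tokio::spawn(rf.for_each(|_| futures::future::ready(())));

    // Elsewhere in the application: query the cached state.
    for cm in reader.state() {
        println!("cached: {:?}", cm.metadata.name);
    }
    Ok(())
}
```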
kube-runtime/src/scheduler.rs (11 additions, 7 deletions)
@@ -1,3 +1,5 @@
+//! Delays and deduplicates [`Stream`] items
+
 use futures::{
     stream::{Fuse, FusedStream},
     Stream, StreamExt,
@@ -113,9 +115,8 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
                 );
                 if can_take_message(&msg) {
                     break Poll::Ready(Some(Ok(msg)));
-                } else {
-                    self.pending.insert(msg);
                 }
+                self.pending.insert(msg);
             }
             Poll::Ready(Some(Err(err))) => break Poll::Ready(Some(Err(err))),
             Poll::Ready(None) => {
@@ -184,7 +185,7 @@ where
     /// no messages will be lost, even if it is reconstructed on each call to [`poll_next`](Self::poll_next).
     /// In fact, this is often desirable, to avoid long-lived borrows in `can_take_message`'s closure.
     ///
-    /// NOTE: `can_take_message` should be considered fairly performance-sensitive, since
+    /// NOTE: `can_take_message` should be considered to be fairly performance-sensitive, since
     /// it will generally be executed for each pending message, for each [`poll_next`](Self::poll_next).
     pub fn hold_unless<C: Fn(&T) -> bool>(self: Pin<&mut Self>, can_take_message: C) -> HoldUnless<T, R, C> {
         HoldUnless {
@@ -212,11 +213,14 @@ where
             }
         }
 
-/// Stream transformer that takes a message and `Instant` (in the form of a `ScheduleRequest`), and emits
-/// the message at the specified `Instant`.
+/// Stream transformer that delays and deduplicates [`Stream`] items.
 ///
+/// Items are deduplicated: if an item is submitted multiple times before being emitted then it will only be
+/// emitted at the earliest `Instant`.
+///
-/// Objects are de-duplicated: if a message is submitted twice before being emitted then it will only be
-/// emitted at the earlier of the two `Instant`s.
+/// Items can be "held pending" if the item doesn't match some predicate. Items trying to schedule an item
+/// that is already pending will be discarded (since it is already going to be emitted as soon as the consumer
+/// is ready for it).
 pub fn scheduler<T: Eq + Hash + Clone, S: Stream<Item = ScheduleRequest<T>>>(requests: S) -> Scheduler<T, S> {
     Scheduler::new(requests)
 }
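
A small sketch of the deduplication rule described above, using the public `scheduler` entry point. The `ScheduleRequest` field names (`message`, `run_at`) are as in this version of the crate; note that the output items are `Result`s, since the underlying timer can fail (visible in the `poll` logic above):

```rust
use futures::{pin_mut, stream, StreamExt};
use kube_runtime::scheduler::{scheduler, ScheduleRequest};
use tokio::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    let now = Instant::now();
    // The same message scheduled twice: it is emitted once, at the earlier deadline.
    let requests = stream::iter(vec![
        ScheduleRequest { message: "reconcile-foo", run_at: now + Duration::from_secs(2) },
        ScheduleRequest { message: "reconcile-foo", run_at: now + Duration::from_secs(1) },
    ]);
    let sched = scheduler(requests);
    pin_mut!(sched);
    while let Some(msg) = sched.next().await {
        println!("due: {:?}", msg); // prints Ok("reconcile-foo") once, roughly 1s in
    }
}
```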
kube-runtime/src/watcher.rs (29 additions, 11 deletions)
@@ -1,3 +1,5 @@
+//! Watches a Kubernetes Resource for changes, with error recovery
+
 use derivative::Derivative;
 use futures::{stream::BoxStream, Stream, StreamExt};
 use kube::{
@@ -35,18 +37,21 @@ pub enum Error {
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
 #[derive(Debug, Clone)]
-/// Watch events returned from the `Watcher`
+/// Watch events returned from the [`watcher`]
 pub enum Event<K> {
-    /// A resource was added or modified
+    /// An object was added or modified
     Applied(K),
-    /// A resource was deleted
+    /// An object was deleted
     ///
     /// NOTE: This should not be used for managing persistent state elsewhere, since
     /// events may be lost if the watcher is unavailable. Use Finalizers instead.
     Deleted(K),
     /// The watch stream was restarted, so `Deleted` events may have been missed
     ///
     /// Should be used as a signal to replace the store contents atomically.
+    ///
+    /// Any objects that were previously [`Applied`](Event::Applied) but are not listed in this event
+    /// should be assumed to have been [`Deleted`](Event::Deleted).
     Restarted(Vec<K>),
 }
 
@@ -80,9 +85,9 @@ impl<K> Event<K> {
 
 #[derive(Derivative)]
 #[derivative(Debug)]
-/// The internal finite state machine driving the [`Watcher`](struct.Watcher.html)
+/// The internal finite state machine driving the [`watcher`]
 enum State<K: Meta + Clone> {
-    /// The Watcher is empty, and the next poll() will start the initial LIST to get all existing objects
+    /// The Watcher is empty, and the next [`poll`](Stream::poll_next) will start the initial LIST to get all existing objects
     Empty,
     /// The initial LIST was successful, so we should move on to starting the actual watch.
     InitListed { resource_version: String },
@@ -183,11 +188,13 @@ async fn step<K: Meta + Clone + DeserializeOwned + Send + 'static>(
 
 /// Watches a Kubernetes Resource for changes continuously
 ///
-/// Creates an indefinite read stream through continual [`Api::watch`] calls, and keeping track
-/// of [returned resource versions](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes).
-/// It tries to recover (by reconnecting and resyncing as required) if polled again after an error.
-/// However, keep in mind that most terminal `TryStream` combinators (such as `TryFutureExt::try_for_each`
-/// and `TryFutureExt::try_concat` will terminate eagerly if an `Error` reaches them.
+/// Compared to [`Api::watch`], this automatically tries to recover the stream upon errors.
+///
+/// Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll.
+/// You can apply your own backoff by not polling the stream for a duration after errors.
+/// Keep in mind that some [`TryStream`](futures::TryStream) combinators (such as
+/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat))
+/// will terminate eagerly as soon as they receive an [`Err`].
 ///
 /// This is intended to provide a safe and atomic input interface for a state store like a [`reflector`],
 /// direct users may want to flatten composite events with [`try_flatten_applied`]:
@@ -217,9 +224,20 @@
 ///
 /// # Migration from `kube::runtime`
 ///
-/// This is similar to the legacy `kube::runtime::Informer`, or the watching half of client-go's `Reflector`.
+/// This is similar to the legacy [`kube::runtime::Informer`], or the watching half of client-go's `Reflector`.
 /// Renamed to avoid confusion with client-go's `Informer` (which watches a `Reflector` for updates, rather
 /// the Kubernetes API).
+///
+/// # Recovery
+///
+/// (The details of recovery are considered an implementation detail and should not be relied on to be stable, but are
+/// documented here for posterity.)
+///
+/// If the watch connection is interrupted then we attempt to restart the watch using the last
+/// [resource versions](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes)
+/// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off.
+/// If this fails because the resource version is no longer valid then we start over with a new stream, starting with
+/// an [`Event::Restarted`].
 pub fn watcher<K: Meta + Clone + DeserializeOwned + Send + 'static>(
     api: Api<K>,
     list_params: ListParams,
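
A minimal sketch of the direct-consumption route the docs mention, flattening `watcher` events with `try_flatten_applied` from `kube_runtime::utils` (the `Pod` choice and surrounding `main` are illustrative):

```rust
use futures::TryStreamExt;
use k8s_openapi::api::core::v1::Pod;
use kube::{api::ListParams, Api, Client};
use kube_runtime::{utils::try_flatten_applied, watcher::watcher};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let client = Client::try_default().await?;
    let pods = Api::<Pod>::namespaced(client, "default");

    // try_flatten_applied unpacks Event::Applied and Event::Restarted into
    // individual objects, so the closure never sees raw Events.
    try_flatten_applied(watcher(pods, ListParams::default()))
        .try_for_each(|p| async move {
            println!("applied: {:?}", p.metadata.name);
            Ok(())
        })
        .await?; // terminates on the first watch error, as the docs above warn
    Ok(())
}
```

Because `try_for_each` short-circuits on `Err`, a long-running consumer would typically log and continue (for example via `for_each`) rather than propagate the error, which is exactly the caveat the rewritten doc comment spells out.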
