Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add token bucket example to Semaphore #5978

Merged
merged 10 commits into from Sep 23, 2023
83 changes: 83 additions & 0 deletions tokio/src/sync/semaphore.rs
Expand Up @@ -76,6 +76,89 @@ use std::sync::Arc;
/// }
/// ```
///
/// Implement a simple token bucket for rate limiting
///
/// Many applications and systems have constraints on the rate at which certain
/// operations should occur. Exceeding this rate can result in suboptimal
/// performance or even errors.
///
/// This example implements rate limiting using a [token bucket]. A token bucket is a form of rate
/// limiting that doesn't kick in immediately, to allow for short bursts of incoming requests that
/// arrive at the same time.
///
/// With a token bucket, each incoming request consumes a token, and the tokens are refilled at a
/// certain rate that defines the rate limit. When a burst of requests arrives, tokens are
/// immediately given out until the bucket is empty. Once the bucket is empty, requests will have to
/// wait for new tokens to be added.
///
/// Unlike the example that limits how many requests can be handled at the same time, we do not add
/// tokens back when we finish handling a request. Instead, tokens are added only by a timer task.
///
/// Note that this implementation is suboptimal when the duration is small, because it consumes a
/// lot of cpu constantly looping and sleeping.
///
/// [token bucket]: https://en.wikipedia.org/wiki/Token_bucket
/// ```
/// use std::sync::Arc;
/// use tokio::sync::{AcquireError, Semaphore};
/// use tokio::time::{interval, Duration};
///
/// struct TokenBucket {
/// sem: Arc<Semaphore>,
/// jh: tokio::task::JoinHandle<()>,
/// }
///
/// impl TokenBucket {
/// fn new(duration: Duration, capacity: usize) -> Self {
/// let sem = Arc::new(Semaphore::new(capacity));
///
/// // refills the tokens at the end of each interval
/// let jh = tokio::spawn({
/// let sem = sem.clone();
/// let mut interval = interval(duration);
/// interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
///
/// async move {
/// loop {
/// interval.tick().await;
///
/// if sem.available_permits() < capacity {
/// sem.add_permits(1);
/// }
/// }
/// }
/// });
///
/// Self { jh, sem }
/// }
///
/// async fn acquire(&self) -> Result<(), AcquireError> {
/// self.sem.acquire().await.map(|p| p.forget())
/// }
///
/// async fn close(self) {
/// self.sem.close();
/// self.jh.abort();
/// let _ = self.jh.await;
/// }
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let capacity = 5; // operation per second
/// let update_interval = Duration::from_secs_f32(1.0 / capacity as f32);
/// let bucket = TokenBucket::new(update_interval, capacity);
///
/// for _ in 0..5 {
/// bucket.acquire().await.unwrap();
///
/// // do the operation
/// }
///
/// bucket.close().await;
/// }
/// ```
///
/// Limit the number of incoming requests being handled at the same time.
///
/// Similar to limiting the number of simultaneously opened files, network handles
Expand Down