Skip to content

Commit

Permalink
Remove .with_current_subscriber() calls
Browse files Browse the repository at this point in the history
Those calls were introduced in e557095,
with message:
```
Now, when the driver spawns a task to run a new future on it, that
future will use the same subscriber as the code that spawned the task in
the first place.
```

There is unfortunately no explanation about when it is necessary.
I don't see any problems after removing those - logs are still collected
correctly using a tracing subscriber.
Those calls however have a harmful side-effect: they prevent usage of
`log` loggers to listen to driver logs using `log` feature in `tracing`
crate. After reporting the problem to `tracing` crate:
tokio-rs/tracing#2913
one of maintainers said that using `.with_current_subscriber()` in
a library is not necessary in general.

As I don't see any issue caused by removing these calls, but their
existence cause an issue, they are removed in this commit.
  • Loading branch information
Lorak-mmk committed May 14, 2024
1 parent 1d78cbe commit 5a50a57
Show file tree
Hide file tree
Showing 9 changed files with 19 additions and 30 deletions.
3 changes: 1 addition & 2 deletions scylla-proxy/examples/cmdline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use std::{
};

use scylla_proxy::{Node, Proxy, ShardAwareness};
use tracing::instrument::WithSubscriber;

fn init_logger() {
tracing_subscriber::fmt::fmt()
Expand Down Expand Up @@ -53,7 +52,7 @@ async fn main() {
None,
None,
)]);
let running_proxy = proxy.run().with_current_subscriber().await.unwrap();
let running_proxy = proxy.run().await.unwrap();

pause().await;
running_proxy.finish().await.unwrap();
Expand Down
3 changes: 1 addition & 2 deletions scylla-proxy/examples/identity_proxy.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{net::SocketAddr, str::FromStr};

use scylla_proxy::{Node, Proxy, ShardAwareness};
use tracing::instrument::WithSubscriber;

fn init_logger() {
tracing_subscriber::fmt::fmt()
Expand Down Expand Up @@ -30,7 +29,7 @@ async fn main() {
.build(),
)
.build();
let running_proxy = proxy.run().with_current_subscriber().await.unwrap();
let running_proxy = proxy.run().await.unwrap();

pause().await;
running_proxy.finish().await.unwrap();
Expand Down
3 changes: 1 addition & 2 deletions scylla-proxy/examples/identity_shard_aware_proxy.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{net::SocketAddr, str::FromStr};

use scylla_proxy::{Node, Proxy, ShardAwareness};
use tracing::instrument::WithSubscriber;

fn init_logger() {
tracing_subscriber::fmt::fmt()
Expand All @@ -27,7 +26,7 @@ async fn main() {
None,
None,
)]);
let running_proxy = proxy.run().with_current_subscriber().await.unwrap();
let running_proxy = proxy.run().await.unwrap();

pause().await;
running_proxy.finish().await.unwrap();
Expand Down
5 changes: 2 additions & 3 deletions scylla/src/transport/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tracing::instrument::WithSubscriber;
use tracing::{debug, warn};
use uuid::Uuid;

Expand Down Expand Up @@ -206,7 +205,7 @@ impl Cluster {
};

let (fut, worker_handle) = worker.work().remote_handle();
tokio::spawn(fut.with_current_subscriber());
tokio::spawn(fut);

let result = Cluster {
data: cluster_data,
Expand Down Expand Up @@ -647,7 +646,7 @@ impl ClusterWorker {

let cluster_data = self.cluster_data.load_full();
let use_keyspace_future = Self::handle_use_keyspace_request(cluster_data, request);
tokio::spawn(use_keyspace_future.with_current_subscriber());
tokio::spawn(use_keyspace_future);
},
None => return, // If use_keyspace_channel was closed then cluster was dropped, we can stop working
}
Expand Down
5 changes: 2 additions & 3 deletions scylla/src/transport/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use tokio::io::{split, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWrite
use tokio::net::{TcpSocket, TcpStream};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;
use tracing::instrument::WithSubscriber;
use tracing::{debug, error, trace, warn};
use uuid::Uuid;

Expand Down Expand Up @@ -1090,7 +1089,7 @@ impl Connection {
node_address,
)
.remote_handle();
tokio::task::spawn(task.with_current_subscriber());
tokio::task::spawn(task);
return Ok(handle);
}

Expand All @@ -1104,7 +1103,7 @@ impl Connection {
node_address,
)
.remote_handle();
tokio::task::spawn(task.with_current_subscriber());
tokio::task::spawn(task);
Ok(handle)
}

Expand Down
20 changes: 8 additions & 12 deletions scylla/src/transport/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use std::sync::{Arc, RwLock, Weak};
use std::time::Duration;

use tokio::sync::{broadcast, mpsc, Notify};
use tracing::instrument::WithSubscriber;
use tracing::{debug, error, trace, warn};

/// The target size of a per-node connection pool.
Expand Down Expand Up @@ -212,7 +211,7 @@ impl NodeConnectionPool {

let conns = refiller.get_shared_connections();
let (fut, refiller_handle) = refiller.run(use_keyspace_request_receiver).remote_handle();
tokio::spawn(fut.with_current_subscriber());
tokio::spawn(fut);

Self {
conns,
Expand Down Expand Up @@ -1138,17 +1137,14 @@ impl PoolRefiller {
Err(QueryError::IoError(io_error.unwrap()))
};

tokio::task::spawn(
async move {
let res = fut.await;
match &res {
Ok(()) => debug!("[{}] Successfully changed current keyspace", address),
Err(err) => warn!("[{}] Failed to change keyspace: {:?}", address, err),
}
let _ = response_sender.send(res);
tokio::task::spawn(async move {
let res = fut.await;
match &res {
Ok(()) => debug!("[{}] Successfully changed current keyspace", address),
Err(err) => warn!("[{}] Failed to change keyspace: {:?}", address, err),
}
.with_current_subscriber(),
);
let _ = response_sender.send(res);
});
}

// Requires the keyspace to be set
Expand Down
3 changes: 1 addition & 2 deletions scylla/src/transport/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use scylla_cql::types::serialize::row::SerializedValues;
use std::result::Result;
use thiserror::Error;
use tokio::sync::mpsc;
use tracing::instrument::WithSubscriber;

use super::errors::QueryError;
use super::execution_profile::ExecutionProfileInner;
Expand Down Expand Up @@ -387,7 +386,7 @@ impl RowIterator {
worker_task: impl Future<Output = PageSendAttemptedProof> + Send + 'static,
mut receiver: mpsc::Receiver<Result<ReceivedPage, QueryError>>,
) -> Result<RowIterator, QueryError> {
tokio::task::spawn(worker_task.with_current_subscriber());
tokio::task::spawn(worker_task);

// This unwrap is safe because:
// - The future returned by worker.work sends at least one item
Expand Down
4 changes: 2 additions & 2 deletions scylla/src/transport/load_balancing/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2269,7 +2269,7 @@ mod latency_awareness {
use itertools::Either;
use scylla_cql::errors::{DbError, QueryError};
use tokio::time::{Duration, Instant};
use tracing::{instrument::WithSubscriber, trace, warn};
use tracing::{trace, warn};
use uuid::Uuid;

use crate::{load_balancing::NodeRef, routing::Shard, transport::node::Node};
Expand Down Expand Up @@ -2454,7 +2454,7 @@ mod latency_awareness {
}
}
.remote_handle();
tokio::task::spawn(updater_fut.with_current_subscriber());
tokio::task::spawn(updater_fut);

Self {
_updater_handle: Some(updater_handle),
Expand Down
3 changes: 1 addition & 2 deletions scylla/tests/integration/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
use std::str::FromStr;
use tracing::instrument::WithSubscriber;

use scylla_proxy::{Node, Proxy, ProxyError, RunningProxy, ShardAwareness};

Expand Down Expand Up @@ -53,7 +52,7 @@ where
);

let translation_map = proxy.translation_map();
let running_proxy = proxy.run().with_current_subscriber().await.unwrap();
let running_proxy = proxy.run().await.unwrap();

let running_proxy = test(
[proxy1_uri, proxy2_uri, proxy3_uri],
Expand Down

0 comments on commit 5a50a57

Please sign in to comment.