Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster: Create read_from_replicas option #635

Merged
merged 5 commits into from Jul 8, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
42 changes: 37 additions & 5 deletions benches/bench_cluster.rs
Expand Up @@ -15,7 +15,10 @@ fn bench_set_get_and_del(c: &mut Criterion, con: &mut redis::cluster::ClusterCon
let mut group = c.benchmark_group("cluster_basic");

group.bench_function("set", |b| {
b.iter(|| black_box(redis::cmd("SET").arg(key).arg(42).execute(con)))
b.iter(|| {
redis::cmd("SET").arg(key).arg(42).execute(con);
black_box(())
})
});

group.bench_function("get", |b| {
Expand All @@ -26,7 +29,12 @@ fn bench_set_get_and_del(c: &mut Criterion, con: &mut redis::cluster::ClusterCon
redis::cmd("SET").arg(key).arg(42).execute(con);
redis::cmd("DEL").arg(key).execute(con);
};
group.bench_function("set_and_del", |b| b.iter(|| black_box(set_and_del())));
group.bench_function("set_and_del", |b| {
b.iter(|| {
set_and_del();
black_box(())
})
});

group.finish();
}
Expand All @@ -46,14 +54,22 @@ fn bench_pipeline(c: &mut Criterion, con: &mut redis::cluster::ClusterConnection
pipe.set(q, "bar").ignore();
}
};
group.bench_function("build_pipeline", |b| b.iter(|| black_box(build_pipeline())));
group.bench_function("build_pipeline", |b| {
b.iter(|| {
build_pipeline();
black_box(())
})
});

let mut pipe = cluster_pipe();
for q in &queries {
pipe.set(q, "bar").ignore();
}
group.bench_function("query_pipeline", |b| {
b.iter(|| black_box(pipe.query::<()>(con).unwrap()))
b.iter(|| {
pipe.query::<()>(con).unwrap();
black_box(())
})
});

group.finish();
Expand All @@ -68,5 +84,21 @@ fn bench_cluster_setup(c: &mut Criterion) {
bench_pipeline(c, &mut con);
}

criterion_group!(cluster_bench, bench_cluster_setup);
#[allow(dead_code)]
fn bench_cluster_read_from_replicas_setup(c: &mut Criterion) {
let cluster = TestClusterContext::new_with_cluster_client_builder(6, 1, |builder| {
builder.read_from_replicas()
});
cluster.wait_for_cluster_up();

let mut con = cluster.connection();
bench_set_get_and_del(c, &mut con);
bench_pipeline(c, &mut con);
}

criterion_group!(
cluster_bench,
bench_cluster_setup,
// bench_cluster_read_from_replicas_setup
);
criterion_main!(cluster_bench);
154 changes: 89 additions & 65 deletions src/cluster.rs
Expand Up @@ -61,15 +61,15 @@ use crate::cluster_pipeline::UNROUTABLE_ERROR;
pub use crate::cluster_pipeline::{cluster_pipe, ClusterPipeline};
use crate::cluster_routing::{Routable, RoutingInfo, Slot, SLOT_SIZE};

type SlotMap = BTreeMap<u16, String>;
type SlotMap = BTreeMap<u16, [String; 2]>;

/// This is a connection of Redis cluster.
pub struct ClusterConnection {
initial_nodes: Vec<ConnectionInfo>,
connections: RefCell<HashMap<String, Connection>>,
slots: RefCell<SlotMap>,
auto_reconnect: RefCell<bool>,
readonly: bool,
read_from_replicas: bool,
username: Option<String>,
password: Option<String>,
read_timeout: RefCell<Option<Duration>>,
Expand All @@ -96,13 +96,13 @@ impl TlsMode {
impl ClusterConnection {
pub(crate) fn new(
initial_nodes: Vec<ConnectionInfo>,
readonly: bool,
read_from_replicas: bool,
username: Option<String>,
password: Option<String>,
) -> RedisResult<ClusterConnection> {
let connections = Self::create_initial_connections(
&initial_nodes,
readonly,
read_from_replicas,
username.clone(),
password.clone(),
)?;
Expand All @@ -111,7 +111,7 @@ impl ClusterConnection {
connections: RefCell::new(connections),
slots: RefCell::new(SlotMap::new()),
auto_reconnect: RefCell::new(true),
readonly,
read_from_replicas,
username,
password,
read_timeout: RefCell::new(None),
Expand Down Expand Up @@ -155,6 +155,14 @@ impl ClusterConnection {
/// block indefinitely. It is an error to pass the zero `Duration` to this
/// method.
pub fn set_write_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
// Check if duration is valid before updating local value.
if dur.is_some() && dur.unwrap() == Duration::default() {
return Err(RedisError::from((
ErrorKind::InvalidClientConfig,
"Duration should be None or non-zero.",
)));
}

let mut t = self.write_timeout.borrow_mut();
*t = dur;
let connections = self.connections.borrow();
Expand All @@ -170,6 +178,14 @@ impl ClusterConnection {
/// block indefinitely. It is an error to pass the zero `Duration` to this
/// method.
pub fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
// Check if duration is valid before updating local value.
if dur.is_some() && dur.unwrap() == Duration::default() {
return Err(RedisError::from((
ErrorKind::InvalidClientConfig,
"Duration should be None or non-zero.",
)));
}

let mut t = self.read_timeout.borrow_mut();
*t = dur;
let connections = self.connections.borrow();
Expand Down Expand Up @@ -203,7 +219,7 @@ impl ClusterConnection {
/// `BrokenPipe` error.
fn create_initial_connections(
initial_nodes: &[ConnectionInfo],
readonly: bool,
read_from_replicas: bool,
username: Option<String>,
password: Option<String>,
) -> RedisResult<HashMap<String, Connection>> {
Expand All @@ -223,9 +239,12 @@ impl ClusterConnection {
_ => panic!("No reach."),
};

if let Ok(mut conn) =
connect(info.clone(), readonly, username.clone(), password.clone())
{
if let Ok(mut conn) = connect(
info.clone(),
read_from_replicas,
username.clone(),
password.clone(),
) {
if conn.check_connection() {
connections.insert(addr, conn);
break;
Expand All @@ -245,58 +264,59 @@ impl ClusterConnection {
// Query a node to discover slot-> master mappings.
fn refresh_slots(&self) -> RedisResult<()> {
let mut slots = self.slots.borrow_mut();
*slots = if self.readonly {
let mut rng = thread_rng();
self.create_new_slots(|slot_data| {
let replicas = slot_data.replicas();
if replicas.is_empty() {
slot_data.master().to_string()
} else {
replicas.choose(&mut rng).unwrap().to_string()
}
})?
} else {
self.create_new_slots(|slot_data| slot_data.master().to_string())?
};
*slots = self.create_new_slots(|slot_data| {
let replica = if !self.read_from_replicas || slot_data.replicas().is_empty() {
slot_data.master().to_string()
} else {
slot_data
.replicas()
.choose(&mut thread_rng())
.unwrap()
.to_string()
};

[slot_data.master().to_string(), replica]
})?;

let mut nodes = slots.values().flatten().collect::<Vec<_>>();
nodes.sort_unstable();
nodes.dedup();

let mut connections = self.connections.borrow_mut();
*connections = {
// Remove dead connections and connect to new nodes if necessary
let mut new_connections = HashMap::with_capacity(connections.len());

for addr in slots.values() {
if !new_connections.contains_key(addr) {
if connections.contains_key(addr) {
let mut conn = connections.remove(addr).unwrap();
if conn.check_connection() {
new_connections.insert(addr.to_string(), conn);
continue;
}
*connections = nodes
.into_iter()
.filter_map(|addr| {
if connections.contains_key(addr) {
let mut conn = connections.remove(addr).unwrap();
if conn.check_connection() {
return Some((addr.to_string(), conn));
}
}

if let Ok(mut conn) = connect(
addr.as_ref(),
self.readonly,
self.username.clone(),
self.password.clone(),
) {
if conn.check_connection() {
conn.set_read_timeout(*self.read_timeout.borrow())?;
conn.set_write_timeout(*self.write_timeout.borrow())?;
new_connections.insert(addr.to_string(), conn);
}
if let Ok(mut conn) = connect(
addr.as_ref(),
self.read_from_replicas,
self.username.clone(),
self.password.clone(),
) {
if conn.check_connection() {
conn.set_read_timeout(*self.read_timeout.borrow()).unwrap();
conn.set_write_timeout(*self.write_timeout.borrow())
.unwrap();
return Some((addr.to_string(), conn));
}
}
}
new_connections
};

None
})
.collect();

Ok(())
}

fn create_new_slots<F>(&self, mut get_addr: F) -> RedisResult<SlotMap>
where
F: FnMut(&Slot) -> String,
F: FnMut(&Slot) -> [String; 2],
{
let mut connections = self.connections.borrow_mut();
let mut new_slots = None;
Expand All @@ -323,7 +343,7 @@ impl ClusterConnection {
Ok(slot_data.end() + 1)
})?;

if usize::from(last_slot) != SLOT_SIZE {
if last_slot != SLOT_SIZE {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Slot refresh error.",
Expand Down Expand Up @@ -354,13 +374,14 @@ impl ClusterConnection {
fn get_connection<'a>(
&self,
connections: &'a mut HashMap<String, Connection>,
slot: u16,
route: (u16, usize),
) -> RedisResult<(String, &'a mut Connection)> {
let (slot, idx) = route;
let slots = self.slots.borrow();
if let Some((_, addr)) = slots.range(&slot..).next() {
Ok((
addr.to_string(),
self.get_connection_by_addr(connections, addr)?,
addr[idx].clone(),
self.get_connection_by_addr(connections, &addr[idx])?,
))
} else {
// try a random node next. This is safe if slots are involved
Expand All @@ -381,7 +402,7 @@ impl ClusterConnection {
// TODO: error handling
let conn = connect(
addr,
self.readonly,
self.read_from_replicas,
self.username.clone(),
self.password.clone(),
)?;
Expand Down Expand Up @@ -412,9 +433,10 @@ impl ClusterConnection {
T: MergeResults + std::fmt::Debug,
F: FnMut(&mut Connection) -> RedisResult<T>,
{
let slot = match RoutingInfo::for_routable(cmd) {
let route = match RoutingInfo::for_routable(cmd) {
Some(RoutingInfo::Random) => None,
Some(RoutingInfo::Slot(slot)) => Some(slot),
Some(RoutingInfo::MasterSlot(slot)) => Some((slot, 0)),
Some(RoutingInfo::ReplicaSlot(slot)) => Some((slot, 1)),
Some(RoutingInfo::AllNodes) | Some(RoutingInfo::AllMasters) => {
return self.execute_on_all_nodes(func);
}
Expand All @@ -439,10 +461,10 @@ impl ClusterConnection {
is_asking = false;
}
(addr.to_string(), conn)
} else if !excludes.is_empty() || slot.is_none() {
} else if !excludes.is_empty() || route.is_none() {
get_random_connection(&mut *connections, Some(&excludes))
} else {
self.get_connection(&mut *connections, slot.unwrap())?
self.get_connection(&mut *connections, route.unwrap())?
};
(addr, func(conn))
};
Expand Down Expand Up @@ -484,7 +506,7 @@ impl ClusterConnection {
} else if *self.auto_reconnect.borrow() && err.is_io_error() {
let new_connections = Self::create_initial_connections(
&self.initial_nodes,
self.readonly,
self.read_from_replicas,
self.username.clone(),
self.password.clone(),
)?;
Expand Down Expand Up @@ -552,20 +574,21 @@ impl ClusterConnection {
fn get_addr_for_cmd(&self, cmd: &Cmd) -> RedisResult<String> {
let slots = self.slots.borrow();

let addr_for_slot = |slot: u16| -> RedisResult<String> {
let addr_for_slot = |slot: u16, idx: usize| -> RedisResult<String> {
let (_, addr) = slots
.range(&slot..)
.next()
.ok_or((ErrorKind::ClusterDown, "Missing slot coverage"))?;
Ok(addr.to_string())
Ok(addr[idx].clone())
};

match RoutingInfo::for_routable(cmd) {
Some(RoutingInfo::Random) => {
let mut rng = thread_rng();
Ok(addr_for_slot(rng.gen_range(0..SLOT_SIZE) as u16)?)
Ok(addr_for_slot(rng.gen_range(0..SLOT_SIZE) as u16, 0)?)
}
Some(RoutingInfo::Slot(slot)) => Ok(addr_for_slot(slot)?),
Some(RoutingInfo::MasterSlot(slot)) => Ok(addr_for_slot(slot, 0)?),
Some(RoutingInfo::ReplicaSlot(slot)) => Ok(addr_for_slot(slot, 1)?),
_ => fail!(UNROUTABLE_ERROR),
}
}
Expand Down Expand Up @@ -714,7 +737,7 @@ impl ConnectionLike for ClusterConnection {

fn connect<T: IntoConnectionInfo>(
info: T,
readonly: bool,
read_from_replicas: bool,
username: Option<String>,
password: Option<String>,
) -> RedisResult<Connection>
Expand All @@ -727,7 +750,8 @@ where
let client = super::Client::open(connection_info)?;

let mut con = client.get_connection()?;
if readonly {
if read_from_replicas {
// If READONLY is sent to primary nodes, it will have no effect
cmd("READONLY").query(&mut con)?;
}
Ok(con)
Expand Down