Skip to content

Commit

Permalink
Cluster: Create read_from_replicas option (#635)
Browse files Browse the repository at this point in the history
Send write queries to primaries & read queries to replicas in `read_from_replicas` mode;
deprecate the `readonly` param in favour of the new `read_from_replicas` param.
  • Loading branch information
utkarshgupta137 committed Jul 8, 2022
1 parent 3c91ce2 commit 36fa7e3
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 97 deletions.
42 changes: 37 additions & 5 deletions benches/bench_cluster.rs
Expand Up @@ -16,7 +16,10 @@ fn bench_set_get_and_del(c: &mut Criterion, con: &mut redis::cluster::ClusterCon
let mut group = c.benchmark_group("cluster_basic");

group.bench_function("set", |b| {
b.iter(|| black_box(redis::cmd("SET").arg(key).arg(42).execute(con)))
b.iter(|| {
redis::cmd("SET").arg(key).arg(42).execute(con);
black_box(())
})
});

group.bench_function("get", |b| {
Expand All @@ -27,7 +30,12 @@ fn bench_set_get_and_del(c: &mut Criterion, con: &mut redis::cluster::ClusterCon
redis::cmd("SET").arg(key).arg(42).execute(con);
redis::cmd("DEL").arg(key).execute(con);
};
group.bench_function("set_and_del", |b| b.iter(|| black_box(set_and_del())));
group.bench_function("set_and_del", |b| {
b.iter(|| {
set_and_del();
black_box(())
})
});

group.finish();
}
Expand All @@ -47,14 +55,22 @@ fn bench_pipeline(c: &mut Criterion, con: &mut redis::cluster::ClusterConnection
pipe.set(q, "bar").ignore();
}
};
group.bench_function("build_pipeline", |b| b.iter(|| black_box(build_pipeline())));
group.bench_function("build_pipeline", |b| {
b.iter(|| {
build_pipeline();
black_box(())
})
});

let mut pipe = cluster_pipe();
for q in &queries {
pipe.set(q, "bar").ignore();
}
group.bench_function("query_pipeline", |b| {
b.iter(|| black_box(pipe.query::<()>(con).unwrap()))
b.iter(|| {
pipe.query::<()>(con).unwrap();
black_box(())
})
});

group.finish();
Expand All @@ -69,5 +85,21 @@ fn bench_cluster_setup(c: &mut Criterion) {
bench_pipeline(c, &mut con);
}

criterion_group!(cluster_bench, bench_cluster_setup);
#[allow(dead_code)]
fn bench_cluster_read_from_replicas_setup(c: &mut Criterion) {
let cluster = TestClusterContext::new_with_cluster_client_builder(6, 1, |builder| {
builder.read_from_replicas()
});
cluster.wait_for_cluster_up();

let mut con = cluster.connection();
bench_set_get_and_del(c, &mut con);
bench_pipeline(c, &mut con);
}

criterion_group!(
cluster_bench,
bench_cluster_setup,
// bench_cluster_read_from_replicas_setup
);
criterion_main!(cluster_bench);
154 changes: 89 additions & 65 deletions src/cluster.rs
Expand Up @@ -61,15 +61,15 @@ use crate::cluster_pipeline::UNROUTABLE_ERROR;
pub use crate::cluster_pipeline::{cluster_pipe, ClusterPipeline};
use crate::cluster_routing::{Routable, RoutingInfo, Slot, SLOT_SIZE};

type SlotMap = BTreeMap<u16, String>;
type SlotMap = BTreeMap<u16, [String; 2]>;

/// This is a connection of Redis cluster.
pub struct ClusterConnection {
initial_nodes: Vec<ConnectionInfo>,
connections: RefCell<HashMap<String, Connection>>,
slots: RefCell<SlotMap>,
auto_reconnect: RefCell<bool>,
readonly: bool,
read_from_replicas: bool,
username: Option<String>,
password: Option<String>,
read_timeout: RefCell<Option<Duration>>,
Expand All @@ -96,13 +96,13 @@ impl TlsMode {
impl ClusterConnection {
pub(crate) fn new(
initial_nodes: Vec<ConnectionInfo>,
readonly: bool,
read_from_replicas: bool,
username: Option<String>,
password: Option<String>,
) -> RedisResult<ClusterConnection> {
let connections = Self::create_initial_connections(
&initial_nodes,
readonly,
read_from_replicas,
username.clone(),
password.clone(),
)?;
Expand All @@ -111,7 +111,7 @@ impl ClusterConnection {
connections: RefCell::new(connections),
slots: RefCell::new(SlotMap::new()),
auto_reconnect: RefCell::new(true),
readonly,
read_from_replicas,
username,
password,
read_timeout: RefCell::new(None),
Expand Down Expand Up @@ -155,6 +155,14 @@ impl ClusterConnection {
/// block indefinitely. It is an error to pass the zero `Duration` to this
/// method.
pub fn set_write_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
// Check if duration is valid before updating local value.
if dur.is_some() && dur.unwrap().is_zero() {
return Err(RedisError::from((
ErrorKind::InvalidClientConfig,
"Duration should be None or non-zero.",
)));
}

let mut t = self.write_timeout.borrow_mut();
*t = dur;
let connections = self.connections.borrow();
Expand All @@ -170,6 +178,14 @@ impl ClusterConnection {
/// block indefinitely. It is an error to pass the zero `Duration` to this
/// method.
pub fn set_read_timeout(&self, dur: Option<Duration>) -> RedisResult<()> {
// Check if duration is valid before updating local value.
if dur.is_some() && dur.unwrap().is_zero() {
return Err(RedisError::from((
ErrorKind::InvalidClientConfig,
"Duration should be None or non-zero.",
)));
}

let mut t = self.read_timeout.borrow_mut();
*t = dur;
let connections = self.connections.borrow();
Expand Down Expand Up @@ -203,7 +219,7 @@ impl ClusterConnection {
/// `BrokenPipe` error.
fn create_initial_connections(
initial_nodes: &[ConnectionInfo],
readonly: bool,
read_from_replicas: bool,
username: Option<String>,
password: Option<String>,
) -> RedisResult<HashMap<String, Connection>> {
Expand All @@ -223,9 +239,12 @@ impl ClusterConnection {
_ => panic!("No reach."),
};

if let Ok(mut conn) =
connect(info.clone(), readonly, username.clone(), password.clone())
{
if let Ok(mut conn) = connect(
info.clone(),
read_from_replicas,
username.clone(),
password.clone(),
) {
if conn.check_connection() {
connections.insert(addr, conn);
break;
Expand All @@ -245,58 +264,59 @@ impl ClusterConnection {
// Query a node to discover slot-> master mappings.
fn refresh_slots(&self) -> RedisResult<()> {
let mut slots = self.slots.borrow_mut();
*slots = if self.readonly {
let mut rng = thread_rng();
self.create_new_slots(|slot_data| {
let replicas = slot_data.replicas();
if replicas.is_empty() {
slot_data.master().to_string()
} else {
replicas.choose(&mut rng).unwrap().to_string()
}
})?
} else {
self.create_new_slots(|slot_data| slot_data.master().to_string())?
};
*slots = self.create_new_slots(|slot_data| {
let replica = if !self.read_from_replicas || slot_data.replicas().is_empty() {
slot_data.master().to_string()
} else {
slot_data
.replicas()
.choose(&mut thread_rng())
.unwrap()
.to_string()
};

[slot_data.master().to_string(), replica]
})?;

let mut nodes = slots.values().flatten().collect::<Vec<_>>();
nodes.sort_unstable();
nodes.dedup();

let mut connections = self.connections.borrow_mut();
*connections = {
// Remove dead connections and connect to new nodes if necessary
let mut new_connections = HashMap::with_capacity(connections.len());

for addr in slots.values() {
if !new_connections.contains_key(addr) {
if connections.contains_key(addr) {
let mut conn = connections.remove(addr).unwrap();
if conn.check_connection() {
new_connections.insert(addr.to_string(), conn);
continue;
}
*connections = nodes
.into_iter()
.filter_map(|addr| {
if connections.contains_key(addr) {
let mut conn = connections.remove(addr).unwrap();
if conn.check_connection() {
return Some((addr.to_string(), conn));
}
}

if let Ok(mut conn) = connect(
addr.as_ref(),
self.readonly,
self.username.clone(),
self.password.clone(),
) {
if conn.check_connection() {
conn.set_read_timeout(*self.read_timeout.borrow())?;
conn.set_write_timeout(*self.write_timeout.borrow())?;
new_connections.insert(addr.to_string(), conn);
}
if let Ok(mut conn) = connect(
addr.as_ref(),
self.read_from_replicas,
self.username.clone(),
self.password.clone(),
) {
if conn.check_connection() {
conn.set_read_timeout(*self.read_timeout.borrow()).unwrap();
conn.set_write_timeout(*self.write_timeout.borrow())
.unwrap();
return Some((addr.to_string(), conn));
}
}
}
new_connections
};

None
})
.collect();

Ok(())
}

fn create_new_slots<F>(&self, mut get_addr: F) -> RedisResult<SlotMap>
where
F: FnMut(&Slot) -> String,
F: FnMut(&Slot) -> [String; 2],
{
let mut connections = self.connections.borrow_mut();
let mut new_slots = None;
Expand All @@ -323,7 +343,7 @@ impl ClusterConnection {
Ok(slot_data.end() + 1)
})?;

if usize::from(last_slot) != SLOT_SIZE {
if last_slot != SLOT_SIZE {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Slot refresh error.",
Expand Down Expand Up @@ -354,13 +374,14 @@ impl ClusterConnection {
fn get_connection<'a>(
&self,
connections: &'a mut HashMap<String, Connection>,
slot: u16,
route: (u16, usize),
) -> RedisResult<(String, &'a mut Connection)> {
let (slot, idx) = route;
let slots = self.slots.borrow();
if let Some((_, addr)) = slots.range(&slot..).next() {
Ok((
addr.to_string(),
self.get_connection_by_addr(connections, addr)?,
addr[idx].clone(),
self.get_connection_by_addr(connections, &addr[idx])?,
))
} else {
// try a random node next. This is safe if slots are involved
Expand All @@ -381,7 +402,7 @@ impl ClusterConnection {
// TODO: error handling
let conn = connect(
addr,
self.readonly,
self.read_from_replicas,
self.username.clone(),
self.password.clone(),
)?;
Expand Down Expand Up @@ -412,9 +433,10 @@ impl ClusterConnection {
T: MergeResults + std::fmt::Debug,
F: FnMut(&mut Connection) -> RedisResult<T>,
{
let slot = match RoutingInfo::for_routable(cmd) {
let route = match RoutingInfo::for_routable(cmd) {
Some(RoutingInfo::Random) => None,
Some(RoutingInfo::Slot(slot)) => Some(slot),
Some(RoutingInfo::MasterSlot(slot)) => Some((slot, 0)),
Some(RoutingInfo::ReplicaSlot(slot)) => Some((slot, 1)),
Some(RoutingInfo::AllNodes) | Some(RoutingInfo::AllMasters) => {
return self.execute_on_all_nodes(func);
}
Expand All @@ -439,10 +461,10 @@ impl ClusterConnection {
is_asking = false;
}
(addr.to_string(), conn)
} else if !excludes.is_empty() || slot.is_none() {
} else if !excludes.is_empty() || route.is_none() {
get_random_connection(&mut *connections, Some(&excludes))
} else {
self.get_connection(&mut *connections, slot.unwrap())?
self.get_connection(&mut *connections, route.unwrap())?
};
(addr, func(conn))
};
Expand Down Expand Up @@ -484,7 +506,7 @@ impl ClusterConnection {
} else if *self.auto_reconnect.borrow() && err.is_io_error() {
let new_connections = Self::create_initial_connections(
&self.initial_nodes,
self.readonly,
self.read_from_replicas,
self.username.clone(),
self.password.clone(),
)?;
Expand Down Expand Up @@ -552,20 +574,21 @@ impl ClusterConnection {
fn get_addr_for_cmd(&self, cmd: &Cmd) -> RedisResult<String> {
let slots = self.slots.borrow();

let addr_for_slot = |slot: u16| -> RedisResult<String> {
let addr_for_slot = |slot: u16, idx: usize| -> RedisResult<String> {
let (_, addr) = slots
.range(&slot..)
.next()
.ok_or((ErrorKind::ClusterDown, "Missing slot coverage"))?;
Ok(addr.to_string())
Ok(addr[idx].clone())
};

match RoutingInfo::for_routable(cmd) {
Some(RoutingInfo::Random) => {
let mut rng = thread_rng();
Ok(addr_for_slot(rng.gen_range(0..SLOT_SIZE) as u16)?)
Ok(addr_for_slot(rng.gen_range(0..SLOT_SIZE) as u16, 0)?)
}
Some(RoutingInfo::Slot(slot)) => Ok(addr_for_slot(slot)?),
Some(RoutingInfo::MasterSlot(slot)) => Ok(addr_for_slot(slot, 0)?),
Some(RoutingInfo::ReplicaSlot(slot)) => Ok(addr_for_slot(slot, 1)?),
_ => fail!(UNROUTABLE_ERROR),
}
}
Expand Down Expand Up @@ -714,7 +737,7 @@ impl ConnectionLike for ClusterConnection {

fn connect<T: IntoConnectionInfo>(
info: T,
readonly: bool,
read_from_replicas: bool,
username: Option<String>,
password: Option<String>,
) -> RedisResult<Connection>
Expand All @@ -727,7 +750,8 @@ where
let client = super::Client::open(connection_info)?;

let mut con = client.get_connection()?;
if readonly {
if read_from_replicas {
// If READONLY is sent to primary nodes, it will have no effect
cmd("READONLY").query(&mut con)?;
}
Ok(con)
Expand Down

0 comments on commit 36fa7e3

Please sign in to comment.