Skip to content

Commit

Permalink
Add username to ClusterClient and ClusterConnection (redis-rs#596)
Browse files Browse the repository at this point in the history
  • Loading branch information
gildaf authored and gkorland committed May 24, 2022
1 parent 0b03f76 commit 9075478
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 9 deletions.
35 changes: 28 additions & 7 deletions src/cluster.rs
Expand Up @@ -68,6 +68,7 @@ pub struct ClusterConnection {
slots: RefCell<SlotMap>,
auto_reconnect: RefCell<bool>,
readonly: bool,
username: Option<String>,
password: Option<String>,
read_timeout: RefCell<Option<Duration>>,
write_timeout: RefCell<Option<Duration>>,
Expand All @@ -94,16 +95,22 @@ impl ClusterConnection {
pub(crate) fn new(
initial_nodes: Vec<ConnectionInfo>,
readonly: bool,
username: Option<String>,
password: Option<String>,
) -> RedisResult<ClusterConnection> {
let connections =
Self::create_initial_connections(&initial_nodes, readonly, password.clone())?;
let connections = Self::create_initial_connections(
&initial_nodes,
readonly,
username.clone(),
password.clone(),
)?;

let connection = ClusterConnection {
connections: RefCell::new(connections),
slots: RefCell::new(SlotMap::new()),
auto_reconnect: RefCell::new(true),
readonly,
username,
password,
read_timeout: RefCell::new(None),
write_timeout: RefCell::new(None),
Expand Down Expand Up @@ -195,6 +202,7 @@ impl ClusterConnection {
fn create_initial_connections(
initial_nodes: &[ConnectionInfo],
readonly: bool,
username: Option<String>,
password: Option<String>,
) -> RedisResult<HashMap<String, Connection>> {
let mut connections = HashMap::with_capacity(initial_nodes.len());
Expand All @@ -213,7 +221,9 @@ impl ClusterConnection {
_ => panic!("No reach."),
};

if let Ok(mut conn) = connect(info.clone(), readonly, password.clone()) {
if let Ok(mut conn) =
connect(info.clone(), readonly, username.clone(), password.clone())
{
if conn.check_connection() {
connections.insert(addr, conn);
break;
Expand Down Expand Up @@ -262,9 +272,12 @@ impl ClusterConnection {
}
}

if let Ok(mut conn) =
connect(addr.as_ref(), self.readonly, self.password.clone())
{
if let Ok(mut conn) = connect(
addr.as_ref(),
self.readonly,
self.username.clone(),
self.password.clone(),
) {
if conn.check_connection() {
conn.set_read_timeout(*self.read_timeout.borrow())?;
conn.set_write_timeout(*self.write_timeout.borrow())?;
Expand Down Expand Up @@ -364,7 +377,12 @@ impl ClusterConnection {
} else {
// Create new connection.
// TODO: error handling
let conn = connect(addr, self.readonly, self.password.clone())?;
let conn = connect(
addr,
self.readonly,
self.username.clone(),
self.password.clone(),
)?;
Ok(connections.entry(addr.to_string()).or_insert(conn))
}
}
Expand Down Expand Up @@ -465,6 +483,7 @@ impl ClusterConnection {
let new_connections = Self::create_initial_connections(
&self.initial_nodes,
self.readonly,
self.username.clone(),
self.password.clone(),
)?;
{
Expand Down Expand Up @@ -694,12 +713,14 @@ impl ConnectionLike for ClusterConnection {
fn connect<T: IntoConnectionInfo>(
info: T,
readonly: bool,
username: Option<String>,
password: Option<String>,
) -> RedisResult<Connection>
where
T: std::fmt::Debug,
{
let mut connection_info = info.into_connection_info()?;
connection_info.redis.username = username;
connection_info.redis.password = password;
let client = super::Client::open(connection_info)?;

Expand Down
58 changes: 57 additions & 1 deletion src/cluster_client.rs
Expand Up @@ -8,6 +8,7 @@ use super::{
pub struct ClusterClientBuilder {
initial_nodes: RedisResult<Vec<ConnectionInfo>>,
readonly: bool,
username: Option<String>,
password: Option<String>,
}

Expand All @@ -20,6 +21,7 @@ impl ClusterClientBuilder {
.map(|x| x.into_connection_info())
.collect(),
readonly: false,
username: None,
password: None,
}
}
Expand All @@ -42,6 +44,12 @@ impl ClusterClientBuilder {
self
}

/// Set username for new ClusterClient.
pub fn username(mut self, username: String) -> ClusterClientBuilder {
self.username = Some(username);
self
}

/// Set read only mode for new ClusterClient (default is false).
/// If readonly is true, all queries will go to replica nodes. If there are no replica nodes,
/// queries will be issued to the primary nodes.
Expand All @@ -55,6 +63,7 @@ impl ClusterClientBuilder {
pub struct ClusterClient {
initial_nodes: Vec<ConnectionInfo>,
readonly: bool,
username: Option<String>,
password: Option<String>,
}

Expand All @@ -81,6 +90,7 @@ impl ClusterClient {
ClusterConnection::new(
self.initial_nodes.clone(),
self.readonly,
self.username.clone(),
self.password.clone(),
)
}
Expand All @@ -89,6 +99,7 @@ impl ClusterClient {
let initial_nodes = builder.initial_nodes?;
let mut nodes = Vec::with_capacity(initial_nodes.len());
let mut connection_info_password = None::<String>;
let mut connection_info_username = None::<String>;

for (index, info) in initial_nodes.into_iter().enumerate() {
if let ConnectionAddr::Unix(_) = info.addr {
Expand All @@ -107,12 +118,24 @@ impl ClusterClient {
}
}

if builder.username.is_none() {
if index == 0 {
connection_info_username = info.redis.username.clone();
} else if connection_info_username != info.redis.username {
return Err(RedisError::from((
ErrorKind::InvalidClientConfig,
"Cannot use different username among initial nodes.",
)));
}
}

nodes.push(info);
}

Ok(ClusterClient {
initial_nodes: nodes,
readonly: builder.readonly,
username: builder.username.or(connection_info_username),
password: builder.password.or(connection_info_password),
})
}
Expand Down Expand Up @@ -151,6 +174,20 @@ mod tests {
]
}

fn get_connection_data_with_username_and_password() -> Vec<ConnectionInfo> {
vec![
"redis://user1:password@127.0.0.1:6379"
.into_connection_info()
.unwrap(),
"redis://user1:password@127.0.0.1:6378"
.into_connection_info()
.unwrap(),
"redis://user1:password@127.0.0.1:6377"
.into_connection_info()
.unwrap(),
]
}

#[test]
fn give_no_password() {
let client = ClusterClient::open(get_connection_data()).unwrap();
Expand All @@ -163,6 +200,13 @@ mod tests {
assert_eq!(client.password, Some("password".to_string()));
}

#[test]
fn give_username_and_password_by_initial_nodes() {
let client = ClusterClient::open(get_connection_data_with_username_and_password()).unwrap();
assert_eq!(client.password, Some("password".to_string()));
assert_eq!(client.username, Some("user1".to_string()));
}

#[test]
fn give_different_password_by_initial_nodes() {
let result = ClusterClient::open(vec![
Expand All @@ -174,11 +218,23 @@ mod tests {
}

#[test]
fn give_password_by_method() {
fn give_different_username_by_initial_nodes() {
let result = ClusterClient::open(vec![
"redis://user1:password@127.0.0.1:6379",
"redis://user2:password@127.0.0.1:6378",
"redis://user1:password@127.0.0.1:6377",
]);
assert!(result.is_err());
}

#[test]
fn give_username_password_by_method() {
let client = ClusterClientBuilder::new(get_connection_data_with_password())
.password("pass".to_string())
.username("user1".to_string())
.open()
.unwrap();
assert_eq!(client.password, Some("pass".to_string()));
assert_eq!(client.username, Some("user1".to_string()));
}
}
20 changes: 19 additions & 1 deletion tests/support/cluster.rs
Expand Up @@ -53,12 +53,21 @@ pub struct RedisCluster {
}

impl RedisCluster {
pub fn username() -> &'static str {
"hello"
}

pub fn password() -> &'static str {
"world"
}

pub fn new(nodes: u16, replicas: u16) -> RedisCluster {
let mut servers = vec![];
let mut folders = vec![];
let mut addrs = vec![];
let start_port = 7000;
let mut tls_paths = None;

let mut is_tls = false;

if let ClusterType::TcpTls = ClusterType::get_intended() {
Expand All @@ -84,14 +93,23 @@ impl RedisCluster {
.prefix("redis")
.tempdir()
.expect("failed to create tempdir");
let acl_path = tempdir.path().join("users.acl");
let acl_content = format!(
"user {} on allcommands allkeys >{}",
Self::username(),
Self::password()
);
std::fs::write(&acl_path, acl_content).expect("failed to write acl file");
cmd.arg("--cluster-enabled")
.arg("yes")
.arg("--cluster-config-file")
.arg(&tempdir.path().join("nodes.conf"))
.arg("--cluster-node-timeout")
.arg("5000")
.arg("--appendonly")
.arg("yes");
.arg("yes")
.arg("--aclfile")
.arg(&acl_path);
if is_tls {
cmd.arg("--tls-cluster").arg("yes");
if replicas > 0 {
Expand Down
33 changes: 33 additions & 0 deletions tests/test_cluster.rs
Expand Up @@ -22,6 +22,39 @@ fn test_cluster_basics() {
);
}

#[test]
fn test_cluster_with_username_and_password() {
let cluster = TestClusterContext::new_with_cluster_client_builder(3, 0, |builder| {
builder
.username(RedisCluster::username().to_string())
.password(RedisCluster::password().to_string())
});
let mut con = cluster.connection();

redis::cmd("SET")
.arg("{x}key1")
.arg(b"foo")
.execute(&mut con);
redis::cmd("SET").arg(&["{x}key2", "bar"]).execute(&mut con);

assert_eq!(
redis::cmd("MGET")
.arg(&["{x}key1", "{x}key2"])
.query(&mut con),
Ok(("foo".to_string(), b"bar".to_vec()))
);
}

#[test]
fn test_cluster_with_bad_password() {
let cluster = TestClusterContext::new_with_cluster_client_builder(3, 0, |builder| {
builder
.username(RedisCluster::username().to_string())
.password("not the right password".to_string())
});
assert!(cluster.client.get_connection().is_err());
}

#[test]
fn test_cluster_readonly() {
let cluster =
Expand Down

0 comments on commit 9075478

Please sign in to comment.