Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add username to ClusterClient and ClusterConnection #596

Merged
merged 1 commit into from May 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 28 additions & 7 deletions src/cluster.rs
Expand Up @@ -68,6 +68,7 @@ pub struct ClusterConnection {
slots: RefCell<SlotMap>,
auto_reconnect: RefCell<bool>,
readonly: bool,
username: Option<String>,
password: Option<String>,
read_timeout: RefCell<Option<Duration>>,
write_timeout: RefCell<Option<Duration>>,
Expand All @@ -94,16 +95,22 @@ impl ClusterConnection {
pub(crate) fn new(
initial_nodes: Vec<ConnectionInfo>,
readonly: bool,
username: Option<String>,
password: Option<String>,
) -> RedisResult<ClusterConnection> {
let connections =
Self::create_initial_connections(&initial_nodes, readonly, password.clone())?;
let connections = Self::create_initial_connections(
&initial_nodes,
readonly,
username.clone(),
password.clone(),
)?;

let connection = ClusterConnection {
connections: RefCell::new(connections),
slots: RefCell::new(SlotMap::new()),
auto_reconnect: RefCell::new(true),
readonly,
username,
password,
read_timeout: RefCell::new(None),
write_timeout: RefCell::new(None),
Expand Down Expand Up @@ -195,6 +202,7 @@ impl ClusterConnection {
fn create_initial_connections(
initial_nodes: &[ConnectionInfo],
readonly: bool,
username: Option<String>,
password: Option<String>,
) -> RedisResult<HashMap<String, Connection>> {
let mut connections = HashMap::with_capacity(initial_nodes.len());
Expand All @@ -213,7 +221,9 @@ impl ClusterConnection {
_ => panic!("No reach."),
};

if let Ok(mut conn) = connect(info.clone(), readonly, password.clone()) {
if let Ok(mut conn) =
connect(info.clone(), readonly, username.clone(), password.clone())
{
if conn.check_connection() {
connections.insert(addr, conn);
break;
Expand Down Expand Up @@ -262,9 +272,12 @@ impl ClusterConnection {
}
}

if let Ok(mut conn) =
connect(addr.as_ref(), self.readonly, self.password.clone())
{
if let Ok(mut conn) = connect(
addr.as_ref(),
self.readonly,
self.username.clone(),
self.password.clone(),
) {
if conn.check_connection() {
conn.set_read_timeout(*self.read_timeout.borrow())?;
conn.set_write_timeout(*self.write_timeout.borrow())?;
Expand Down Expand Up @@ -364,7 +377,12 @@ impl ClusterConnection {
} else {
// Create new connection.
// TODO: error handling
let conn = connect(addr, self.readonly, self.password.clone())?;
let conn = connect(
addr,
self.readonly,
self.username.clone(),
self.password.clone(),
)?;
Ok(connections.entry(addr.to_string()).or_insert(conn))
}
}
Expand Down Expand Up @@ -465,6 +483,7 @@ impl ClusterConnection {
let new_connections = Self::create_initial_connections(
&self.initial_nodes,
self.readonly,
self.username.clone(),
self.password.clone(),
)?;
{
Expand Down Expand Up @@ -694,12 +713,14 @@ impl ConnectionLike for ClusterConnection {
fn connect<T: IntoConnectionInfo>(
info: T,
readonly: bool,
username: Option<String>,
password: Option<String>,
) -> RedisResult<Connection>
where
T: std::fmt::Debug,
{
let mut connection_info = info.into_connection_info()?;
connection_info.redis.username = username;
connection_info.redis.password = password;
let client = super::Client::open(connection_info)?;

Expand Down
58 changes: 57 additions & 1 deletion src/cluster_client.rs
Expand Up @@ -8,6 +8,7 @@ use super::{
pub struct ClusterClientBuilder {
initial_nodes: RedisResult<Vec<ConnectionInfo>>,
readonly: bool,
username: Option<String>,
password: Option<String>,
}

Expand All @@ -20,6 +21,7 @@ impl ClusterClientBuilder {
.map(|x| x.into_connection_info())
.collect(),
readonly: false,
username: None,
password: None,
}
}
Expand All @@ -42,6 +44,12 @@ impl ClusterClientBuilder {
self
}

/// Set username for new ClusterClient.
pub fn username(mut self, username: String) -> ClusterClientBuilder {
self.username = Some(username);
self
}

/// Set read only mode for new ClusterClient (default is false).
/// If readonly is true, all queries will go to replica nodes. If there are no replica nodes,
/// queries will be issued to the primary nodes.
Expand All @@ -55,6 +63,7 @@ impl ClusterClientBuilder {
pub struct ClusterClient {
initial_nodes: Vec<ConnectionInfo>,
readonly: bool,
username: Option<String>,
password: Option<String>,
}

Expand All @@ -81,6 +90,7 @@ impl ClusterClient {
ClusterConnection::new(
self.initial_nodes.clone(),
self.readonly,
self.username.clone(),
self.password.clone(),
)
}
Expand All @@ -89,6 +99,7 @@ impl ClusterClient {
let initial_nodes = builder.initial_nodes?;
let mut nodes = Vec::with_capacity(initial_nodes.len());
let mut connection_info_password = None::<String>;
let mut connection_info_username = None::<String>;

for (index, info) in initial_nodes.into_iter().enumerate() {
if let ConnectionAddr::Unix(_) = info.addr {
Expand All @@ -107,12 +118,24 @@ impl ClusterClient {
}
}

if builder.username.is_none() {
if index == 0 {
connection_info_username = info.redis.username.clone();
} else if connection_info_username != info.redis.username {
return Err(RedisError::from((
ErrorKind::InvalidClientConfig,
"Cannot use different username among initial nodes.",
)));
}
}

nodes.push(info);
}

Ok(ClusterClient {
initial_nodes: nodes,
readonly: builder.readonly,
username: builder.username.or(connection_info_username),
password: builder.password.or(connection_info_password),
})
}
Expand Down Expand Up @@ -151,6 +174,20 @@ mod tests {
]
}

fn get_connection_data_with_username_and_password() -> Vec<ConnectionInfo> {
vec![
"redis://user1:password@127.0.0.1:6379"
.into_connection_info()
.unwrap(),
"redis://user1:password@127.0.0.1:6378"
.into_connection_info()
.unwrap(),
"redis://user1:password@127.0.0.1:6377"
.into_connection_info()
.unwrap(),
]
}

#[test]
fn give_no_password() {
let client = ClusterClient::open(get_connection_data()).unwrap();
Expand All @@ -163,6 +200,13 @@ mod tests {
assert_eq!(client.password, Some("password".to_string()));
}

#[test]
fn give_username_and_password_by_initial_nodes() {
let client = ClusterClient::open(get_connection_data_with_username_and_password()).unwrap();
assert_eq!(client.password, Some("password".to_string()));
assert_eq!(client.username, Some("user1".to_string()));
}

#[test]
fn give_different_password_by_initial_nodes() {
let result = ClusterClient::open(vec![
Expand All @@ -174,11 +218,23 @@ mod tests {
}

#[test]
fn give_password_by_method() {
fn give_different_username_by_initial_nodes() {
let result = ClusterClient::open(vec![
"redis://user1:password@127.0.0.1:6379",
"redis://user2:password@127.0.0.1:6378",
"redis://user1:password@127.0.0.1:6377",
]);
assert!(result.is_err());
}

#[test]
fn give_username_password_by_method() {
let client = ClusterClientBuilder::new(get_connection_data_with_password())
.password("pass".to_string())
.username("user1".to_string())
.open()
.unwrap();
assert_eq!(client.password, Some("pass".to_string()));
assert_eq!(client.username, Some("user1".to_string()));
}
}
20 changes: 19 additions & 1 deletion tests/support/cluster.rs
Expand Up @@ -53,12 +53,21 @@ pub struct RedisCluster {
}

impl RedisCluster {
pub fn username() -> &'static str {
"hello"
}

pub fn password() -> &'static str {
"world"
}

pub fn new(nodes: u16, replicas: u16) -> RedisCluster {
let mut servers = vec![];
let mut folders = vec![];
let mut addrs = vec![];
let start_port = 7000;
let mut tls_paths = None;

let mut is_tls = false;

if let ClusterType::TcpTls = ClusterType::get_intended() {
Expand All @@ -84,14 +93,23 @@ impl RedisCluster {
.prefix("redis")
.tempdir()
.expect("failed to create tempdir");
let acl_path = tempdir.path().join("users.acl");
let acl_content = format!(
"user {} on allcommands allkeys >{}",
Self::username(),
Self::password()
);
std::fs::write(&acl_path, acl_content).expect("failed to write acl file");
cmd.arg("--cluster-enabled")
.arg("yes")
.arg("--cluster-config-file")
.arg(&tempdir.path().join("nodes.conf"))
.arg("--cluster-node-timeout")
.arg("5000")
.arg("--appendonly")
.arg("yes");
.arg("yes")
.arg("--aclfile")
.arg(&acl_path);
if is_tls {
cmd.arg("--tls-cluster").arg("yes");
if replicas > 0 {
Expand Down
33 changes: 33 additions & 0 deletions tests/test_cluster.rs
Expand Up @@ -22,6 +22,39 @@ fn test_cluster_basics() {
);
}

#[test]
fn test_cluster_with_username_and_password() {
let cluster = TestClusterContext::new_with_cluster_client_builder(3, 0, |builder| {
builder
.username(RedisCluster::username().to_string())
.password(RedisCluster::password().to_string())
});
let mut con = cluster.connection();

redis::cmd("SET")
.arg("{x}key1")
.arg(b"foo")
.execute(&mut con);
redis::cmd("SET").arg(&["{x}key2", "bar"]).execute(&mut con);

assert_eq!(
redis::cmd("MGET")
.arg(&["{x}key1", "{x}key2"])
.query(&mut con),
Ok(("foo".to_string(), b"bar".to_vec()))
);
}

#[test]
fn test_cluster_with_bad_password() {
let cluster = TestClusterContext::new_with_cluster_client_builder(3, 0, |builder| {
builder
.username(RedisCluster::username().to_string())
.password("not the right password".to_string())
});
assert!(cluster.client.get_connection().is_err());
}

#[test]
fn test_cluster_readonly() {
let cluster =
Expand Down