Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Authentication: Fix immediate update_connection_password to use username #3330

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
2 changes: 1 addition & 1 deletion .github/workflows/python.yml
Original file line number Diff line number Diff line change
@@ -123,7 +123,7 @@ jobs:
source .env/bin/activate
pip install -r dev_requirements.txt
cd python/tests/
pytest -v --asyncio-mode=auto --html=pytest_report.html --self-contained-html
pytest -v --asyncio-mode=auto --html=pytest_report.html --self-contained-html -k test_update_connection_password

- uses: ./.github/workflows/test-benchmark
if: ${{ matrix.engine.version == '8.0' && matrix.host.OS == 'ubuntu' && matrix.host.RUNNER == 'ubuntu-latest' && matrix.python == '3.12' }}
5 changes: 5 additions & 0 deletions glide-core/redis-rs/redis/src/client.rs
Original file line number Diff line number Diff line change
@@ -563,6 +563,11 @@ impl Client {
.await
.map(|connection| connection.into_monitor())
}

/// Updates the password in connection_info.
pub fn update_password(&mut self, password: Option<String>) {
self.connection_info.redis.password = password;
}
}

#[cfg(feature = "aio")]
16 changes: 16 additions & 0 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
@@ -316,6 +316,11 @@ where
.await
}

/// Get the username used to authenticate with all cluster servers
pub async fn get_username(&mut self) -> RedisResult<Value> {
self.route_operation_request(Operation::GetUsername).await
}

/// Routes an operation request to the appropriate handler.
async fn route_operation_request(
&mut self,
@@ -620,6 +625,7 @@ enum CmdArg<C> {
#[derive(Clone)]
enum Operation {
UpdateConnectionPassword(Option<String>),
GetUsername,
}

fn route_for_pipeline(pipeline: &crate::Pipeline) -> RedisResult<Option<Route>> {
@@ -2276,6 +2282,16 @@ where
.expect(MUTEX_WRITE_ERR);
Ok(Response::Single(Value::Okay))
}
Operation::GetUsername => {
let username = match core
.get_cluster_param(|params| params.username.clone())
.expect(MUTEX_READ_ERR)
{
Some(username) => Value::SimpleString(username),
None => Value::Nil,
};
Ok(Response::Single(username))
}
},
}
}
20 changes: 20 additions & 0 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -525,11 +525,31 @@ impl Client {
Some(ResponsePolicy::AllSucceeded),
));
let mut cmd = redis::cmd("AUTH");
if let Ok(Some(username)) = self.get_username().await {
cmd.arg(username);
}
cmd.arg(password);
self.send_command(&cmd, Some(routing)).await
}
}
}

/// Returns the username if one was configured during client creation. Otherwise, returns None.
async fn get_username(&mut self) -> RedisResult<Option<String>> {
match &mut self.internal_client {
ClientWrapper::Cluster { client } => match client.get_username().await {
Ok(Value::SimpleString(username)) => Ok(Some(username)),
Ok(Value::Nil) => Ok(None),
Ok(other) => unreachable!("Expected SimpleString or Nil, got: {:?}", other),
Err(e) => Err(RedisError::from((
ErrorKind::ResponseError,
"Error getting username",
format!("Received error - {:?}.", e),
))),
},
ClientWrapper::Standalone(client) => Ok(client.get_username()),
}
}
}

fn load_cmd(code: &[u8]) -> Cmd {
58 changes: 49 additions & 9 deletions glide-core/src/client/reconnecting_connection.rs
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::{RwLock, RwLockReadGuard};
use std::time::Duration;
use telemetrylib::Telemetry;
use tokio::sync::{mpsc, Notify};
@@ -20,6 +21,9 @@ use tokio_retry2::{Retry, RetryError};

use super::{run_with_timeout, DEFAULT_CONNECTION_TIMEOUT};

const WRITE_LOCK_ERR: &str = "Failed to acquire the write lock";
const READ_LOCK_ERR: &str = "Failed to acquire the read lock";

/// The reason behind the call to `reconnect()`
#[derive(PartialEq, Eq, Debug, Clone)]
pub enum ReconnectReason {
@@ -34,7 +38,7 @@ struct ConnectionBackend {
/// This signal is reset when a connection disconnects, and set when a new `ConnectionState` has been set with a `Connected` state.
connection_available_signal: ManualResetEvent,
/// Information needed in order to create a new connection.
connection_info: redis::Client,
connection_info: RwLock<redis::Client>,
/// Once this flag is set, the internal connection needs no longer try to reconnect to the server, because all the outer clients were dropped.
client_dropped_flagged: AtomicBool,
}
@@ -119,7 +123,14 @@ async fn create_connection(
discover_az: bool,
connection_timeout: Duration,
) -> Result<ReconnectingConnection, (ReconnectingConnection, RedisError)> {
let client = &connection_backend.connection_info;
let client = {
let guard = connection_backend
.connection_info
.read()
.expect(READ_LOCK_ERR);
guard.clone()
};

let connection_options = GlideConnectionOptions {
push_sender,
disconnect_notifier: Some::<Box<dyn DisconnectNotifier>>(Box::new(
@@ -128,8 +139,9 @@ async fn create_connection(
discover_az,
connection_timeout: Some(connection_timeout),
};

let action = || async {
get_multiplexed_connection(client, &connection_options)
get_multiplexed_connection(&client, &connection_options)
.await
.map_err(RetryError::transient)
};
@@ -141,7 +153,7 @@ async fn create_connection(
format!(
"Connection to {} created",
connection_backend
.connection_info
.get_backend_client()
.get_connection_info()
.addr
),
@@ -161,7 +173,7 @@ async fn create_connection(
format!(
"Failed connecting to {}, due to {err}",
connection_backend
.connection_info
.get_backend_client()
.get_connection_info()
.addr
),
@@ -204,6 +216,13 @@ fn internal_retry_iterator() -> impl Iterator<Item = Duration> {
.chain(std::iter::repeat(MAX_DURATION))
}

impl ConnectionBackend {
/// Returns a read-only reference to the client's connection information
fn get_backend_client(&self) -> RwLockReadGuard<'_, redis::Client> {
self.connection_info.read().expect(READ_LOCK_ERR)
}
}

impl ReconnectingConnection {
pub(super) async fn new(
address: &NodeAddress,
@@ -221,7 +240,7 @@ impl ReconnectingConnection {

let connection_info = get_client(address, tls_mode, redis_connection_info);
let backend = ConnectionBackend {
connection_info,
connection_info: RwLock::new(connection_info),
connection_available_signal: ManualResetEvent::new(true),
client_dropped_flagged: AtomicBool::new(false),
};
@@ -238,7 +257,7 @@ impl ReconnectingConnection {
pub(crate) fn node_address(&self) -> String {
self.inner
.backend
.connection_info
.get_backend_client()
.get_connection_info()
.addr
.to_string()
@@ -306,7 +325,10 @@ impl ReconnectingConnection {
// The reconnect task is spawned instead of awaited here, so that the reconnect attempt will continue in the
// background, regardless of whether the calling task is dropped or not.
task::spawn(async move {
let client = &connection_clone.inner.backend.connection_info;
let client = {
let guard = connection_clone.inner.backend.get_backend_client();
guard.clone()
};
for sleep_duration in internal_retry_iterator() {
if connection_clone.is_dropped() {
log_debug(
@@ -316,7 +338,8 @@ impl ReconnectingConnection {
// Client was dropped, reconnection attempts can stop
return;
}
match get_multiplexed_connection(client, &connection_clone.connection_options).await
match get_multiplexed_connection(&client, &connection_clone.connection_options)
.await
{
Ok(mut connection) => {
if connection
@@ -363,4 +386,21 @@ impl ReconnectingConnection {
log_error("disconnect notifier", "BUG! Disconnect notifier is not set");
}
}

/// Updates the password that's saved inside connection_info, that will be used in case of disconnection from the server.
pub(crate) fn update_connection_password(&self, new_password: Option<String>) {
let mut client = self
.inner
.backend
.connection_info
.write()
.expect(WRITE_LOCK_ERR);
client.update_password(new_password);
}

/// Returns the username if one was configured during client creation. Otherwise, returns None.
pub(crate) fn get_username(&self) -> Option<String> {
let client = self.inner.backend.get_backend_client();
client.get_connection_info().redis.username.clone()
}
}
21 changes: 13 additions & 8 deletions glide-core/src/client/standalone_client.rs
Original file line number Diff line number Diff line change
@@ -607,15 +607,20 @@ impl StandaloneClient {
/// Update the password used to authenticate with the servers.
/// If the password is `None`, the password will be removed.
pub async fn update_connection_password(
&mut self,
password: Option<String>,
&self,
new_password: Option<String>,
) -> RedisResult<Value> {
self.get_connection(false)
.await
.get_connection()
.await?
.update_connection_password(password.clone())
.await
for node in self.inner.nodes.iter() {
node.update_connection_password(new_password.clone());
}

Ok(Value::Okay)
}

/// Retrieve the username used to authenticate with the server.
pub fn get_username(&self) -> Option<String> {
// All nodes in the client should have the same username configured, thus any connection would work here.
self.get_primary_connection().get_username()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment explaining that all nodes in the client should have the same username and password thus we can get it from any of the nodes

}
}

81 changes: 81 additions & 0 deletions java/integTest/src/test/java/glide/TestUtilities.java
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@
import static glide.TestConfiguration.CLUSTER_HOSTS;
import static glide.TestConfiguration.STANDALONE_HOSTS;
import static glide.TestConfiguration.TLS;
import static glide.api.BaseClient.OK;
import static glide.api.models.GlideString.gs;
import static glide.api.models.configuration.RequestRoutingConfiguration.SimpleSingleNodeRoute.RANDOM;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -452,4 +453,84 @@ public static String getServerVersion(@NonNull final BaseClient client) {
}
return null;
}

/**
* Delete an ACL user and assert it was deleted.
*
* @param client Glide client to be used for running the ACL DELUSER command.
* @param username The username of the ACL user to be deleted.
*/
@SneakyThrows
public static void deleteAclUser(GlideClient client, String username) {
try {
assertEquals(1L, client.customCommand(new String[] {"ACL", "DELUSER", username}).get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

/**
* Set an ACL user and a password for it.
*
* @param client Glide client to be used for running the ACL SETUSER command.
* @param username The username of the ACL user to be registered.
* @param password The password of the ACL user to be registered.
*/
@SneakyThrows
public static void setNewAclUserPassword(GlideClient client, String username, String password) {
try {
assertEquals(
OK,
client
.customCommand(
new String[] {
"ACL", "SETUSER", username, "on", ">" + password, "~*", "&*", "+@all",
})
.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

/**
* Delete an ACL user and assert it was deleted.
*
* @param client Glide client to be used for running the ACL DELUSER command.
* @param username The username of the ACL user to be deleted.
*/
@SneakyThrows
public static void deleteAclUser(GlideClusterClient client, String username) {
try {
assertEquals(
1L,
client.customCommand(new String[] {"ACL", "DELUSER", username}).get().getSingleValue());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

/**
* Set an ACL user and a password for it.
*
* @param client Glide client to be used for running the ACL SETUSER command.
* @param username The username of the ACL user to be registered.
* @param password The password of the ACL user to be registered.
*/
@SneakyThrows
public static void setNewAclUserPassword(
GlideClusterClient client, String username, String password) {
try {
assertEquals(
OK,
client
.customCommand(
new String[] {
"ACL", "SETUSER", username, "on", ">" + password, "~*", "&*", "+@all",
})
.get()
.getSingleValue());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Loading
Loading