Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/configuration #110

Merged
merged 3 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@ tonic-build = "0.8"

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
tokio-stream = { version = "0.1" }

[[example]]
name = "client"
path = "examples/client/client.rs"

[[example]]
name = "configuration"
path = "examples/configuration/main.rs"

[[example]]
name = "publisher"
path = "examples/pubsub/publisher.rs"
Expand Down
12 changes: 12 additions & 0 deletions examples/components/configstore.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: configstore
spec:
type: configuration.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
33 changes: 33 additions & 0 deletions examples/configuration/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
Before you run the example make sure local redis state store is running by executing:
```bash
docker ps
```

1. To run the example we need to first build the examples using the following command:

```bash
cargo build --examples
```

2. Run the example with dapr using the following command:

```bash
dapr run --app-id=rustapp --resources-path ./examples/components --dapr-grpc-port 3500 -- cargo run --example configuration
```

3. Change the value of the key `hello` in redis using the following command:

```bash
docker exec dapr_redis redis-cli MSET hello "world"
```


If everything went well you should see the following output along with dapr logs:
```
Configuration value: ConfigurationItem { value: "world", version: "", metadata: {} }
App subscribed to config changes with subscription id: "d383169a-0893-4c64-adde-fc3145b56d07"
Configuration value: {"hello": ConfigurationItem { value: "world", version: "", metadata: {} }}
App unsubscribed from config changes
```


59 changes: 59 additions & 0 deletions examples/configuration/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use tokio_stream::StreamExt;

const CONFIGSTORE_NAME: &str = "configstore";
type DaprClient = dapr::Client<dapr::client::TonicClient>;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: Handle this issue in the sdk
// Introduce delay so that dapr grpc port is assigned before app tries to connect
std::thread::sleep(std::time::Duration::new(2, 0));

// Get the Dapr port and create a connection
let port: u16 = std::env::var("DAPR_GRPC_PORT")?.parse()?;
let addr = format!("https://127.0.0.1:{}", port);

// Create the client
let mut client = DaprClient::connect(addr).await?;

let key = String::from("hello");

// save key-value pair in the state store
let response = client
.get_configuration(CONFIGSTORE_NAME, vec![(&key)])
.await?;
let val = response.items.get("hello").unwrap();
println!("Configuration value: {val:?}");

// Subscribe for configuration changes
let mut stream = client
.subscribe_configuration(CONFIGSTORE_NAME, vec![(&key)])
.await?;

let mut subscription_id = String::new();
while let Some(result) = stream.next().await {
let subscribe = result.unwrap();
if subscribe.items.is_empty() {
// Update the subscription_id
subscription_id = subscribe.id.clone();
println!("App subscribed to config changes with subscription id: {subscription_id:?} ");
continue;
}
println!("Configuration value: {:?}", subscribe.items);
unsubscribe(&mut client, &subscription_id).await;
}

Ok(())
}

// Function to unsubscribe from configuration updates and exit the app
async fn unsubscribe(client: &mut DaprClient, subscription_id: &str) {
match client
.unsubscribe_configuration("CONFIGSTORE_NAME", subscription_id)
.await
{
Ok(_) => println!("App unsubscribed from config changes"),
Err(e) => println!("Error unsubscribing from config updates: {}", e),
}
std::process::exit(0);
}
120 changes: 118 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;

use dapr::proto::{common::v1 as common_v1, runtime::v1 as dapr_v1};
use prost_types::Any;
use std::collections::HashMap;
use tonic::Streaming;
use tonic::{async_trait, transport::Channel as TonicChannel, Request};

use crate::dapr::*;
Expand Down Expand Up @@ -248,6 +248,62 @@ impl<T: DaprInterface> Client<T> {
pub async fn get_metadata(&mut self) -> Result<GetMetadataResponse, Error> {
self.0.get_metadata().await
}

/// Get the configuration for a specific key
/// ///
/// # Arguments
///
/// * `store_name` - The name of config store.
/// * `keys` - The key of the desired configuration.
pub async fn get_configuration<S, K>(
&mut self,
store_name: S,
keys: Vec<K>,
) -> Result<GetConfigurationResponse, Error>
where
S: Into<String>,
K: Into<String>,
{
let request = GetConfigurationRequest {
store_name: store_name.into(),
keys: keys.into_iter().map(|key| key.into()).collect(),
metadata: Default::default(),
};
self.0.get_configuration(request).await
}

/// Subscribe to configuration changes
pub async fn subscribe_configuration<S>(
&mut self,
store_name: S,
keys: Vec<S>,
) -> Result<Streaming<SubscribeConfigurationResponse>, Error>
where
S: Into<String>,
{
let request = SubscribeConfigurationRequest {
store_name: store_name.into(),
keys: keys.into_iter().map(|key| key.into()).collect(),
metadata: Default::default(),
};
self.0.subscribe_configuration(request).await
}

/// Unsubscribe from configuration changes
pub async fn unsubscribe_configuration<S>(
&mut self,
store_name: S,
id: S,
) -> Result<UnsubscribeConfigurationResponse, Error>
where
S: Into<String>,
{
let request = UnsubscribeConfigurationRequest {
id: id.into(),
store_name: store_name.into(),
};
self.0.unsubscribe_configuration(request).await
}
}

#[async_trait]
Expand All @@ -269,6 +325,18 @@ pub trait DaprInterface: Sized {
async fn delete_bulk_state(&mut self, request: DeleteBulkStateRequest) -> Result<(), Error>;
async fn set_metadata(&mut self, request: SetMetadataRequest) -> Result<(), Error>;
async fn get_metadata(&mut self) -> Result<GetMetadataResponse, Error>;
async fn get_configuration(
&mut self,
request: GetConfigurationRequest,
) -> Result<GetConfigurationResponse, Error>;
async fn subscribe_configuration(
&mut self,
request: SubscribeConfigurationRequest,
) -> Result<Streaming<SubscribeConfigurationResponse>, Error>;
async fn unsubscribe_configuration(
&mut self,
request: UnsubscribeConfigurationRequest,
) -> Result<UnsubscribeConfigurationResponse, Error>;
}

#[async_trait]
Expand Down Expand Up @@ -334,6 +402,36 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient<TonicChannel> {
async fn get_metadata(&mut self) -> Result<GetMetadataResponse, Error> {
Ok(self.get_metadata(Request::new(())).await?.into_inner())
}

async fn get_configuration(
&mut self,
request: GetConfigurationRequest,
) -> Result<GetConfigurationResponse, Error> {
Ok(self
.get_configuration(Request::new(request))
.await?
.into_inner())
}

async fn subscribe_configuration(
&mut self,
request: SubscribeConfigurationRequest,
) -> Result<Streaming<SubscribeConfigurationResponse>, Error> {
Ok(self
.subscribe_configuration(Request::new(request))
.await?
.into_inner())
}

async fn unsubscribe_configuration(
&mut self,
request: UnsubscribeConfigurationRequest,
) -> Result<UnsubscribeConfigurationResponse, Error> {
Ok(self
.unsubscribe_configuration(Request::new(request))
.await?
.into_inner())
}
}

/// A request from invoking a service
Expand Down Expand Up @@ -378,6 +476,24 @@ pub type GetMetadataResponse = dapr_v1::GetMetadataResponse;
/// A request for setting metadata
pub type SetMetadataRequest = dapr_v1::SetMetadataRequest;

/// A request for getting configuration
pub type GetConfigurationRequest = dapr_v1::GetConfigurationRequest;

/// A response from getting configuration
pub type GetConfigurationResponse = dapr_v1::GetConfigurationResponse;

/// A request for subscribing to configuration changes
pub type SubscribeConfigurationRequest = dapr_v1::SubscribeConfigurationRequest;

/// A response from subscribing tto configuration changes
pub type SubscribeConfigurationResponse = dapr_v1::SubscribeConfigurationResponse;

/// A request for unsubscribing from configuration changes
pub type UnsubscribeConfigurationRequest = dapr_v1::UnsubscribeConfigurationRequest;

/// A response from unsubscribing from configuration changes
pub type UnsubscribeConfigurationResponse = dapr_v1::UnsubscribeConfigurationResponse;

/// A tonic based gRPC client
pub type TonicClient = dapr_v1::dapr_client::DaprClient<TonicChannel>;

Expand Down