Skip to content

Commit

Permalink
Feature/configuration (dapr#110)
Browse files Browse the repository at this point in the history
* Add configuration example

Signed-off-by: Hauke Jung <hauke.jung@outlook.de>

* Set tokio-stream as dev dependency

Signed-off-by: Hauke Jung <hauke.jung@outlook.de>

* cargo fmt

Signed-off-by: Hauke Jung <hauke.jung@outlook.de>

---------

Signed-off-by: Hauke Jung <hauke.jung@outlook.de>
  • Loading branch information
hauju committed Jan 3, 2024
1 parent b1ba3cc commit 2664e64
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 2 deletions.
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@ tonic-build = "0.8"

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
tokio-stream = { version = "0.1" }

[[example]]
name = "client"
path = "examples/client/client.rs"

[[example]]
name = "configuration"
path = "examples/configuration/main.rs"

[[example]]
name = "publisher"
path = "examples/pubsub/publisher.rs"
Expand Down
12 changes: 12 additions & 0 deletions examples/components/configstore.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: configstore
spec:
type: configuration.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
33 changes: 33 additions & 0 deletions examples/configuration/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
Before you run the example make sure local redis state store is running by executing:
```bash
docker ps
```

1. To run the example we need to first build the examples using the following command:

```bash
cargo build --examples
```

2. Run the example with dapr using the following command:

```bash
dapr run --app-id=rustapp --resources-path ./examples/components --dapr-grpc-port 3500 -- cargo run --example configuration
```

3. Change the value of the key `hello` in redis using the following command:

```bash
docker exec dapr_redis redis-cli MSET hello "world"
```


If everything went well you should see the following output along with dapr logs:
```
Configuration value: ConfigurationItem { value: "world", version: "", metadata: {} }
App subscribed to config changes with subscription id: "d383169a-0893-4c64-adde-fc3145b56d07"
Configuration value: {"hello": ConfigurationItem { value: "world", version: "", metadata: {} }}
App unsubscribed from config changes
```


59 changes: 59 additions & 0 deletions examples/configuration/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use tokio_stream::StreamExt;

const CONFIGSTORE_NAME: &str = "configstore";
type DaprClient = dapr::Client<dapr::client::TonicClient>;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: Handle this issue in the sdk
// Introduce delay so that dapr grpc port is assigned before app tries to connect
std::thread::sleep(std::time::Duration::new(2, 0));

// Get the Dapr port and create a connection
let port: u16 = std::env::var("DAPR_GRPC_PORT")?.parse()?;
let addr = format!("https://127.0.0.1:{}", port);

// Create the client
let mut client = DaprClient::connect(addr).await?;

let key = String::from("hello");

// save key-value pair in the state store
let response = client
.get_configuration(CONFIGSTORE_NAME, vec![(&key)])
.await?;
let val = response.items.get("hello").unwrap();
println!("Configuration value: {val:?}");

// Subscribe for configuration changes
let mut stream = client
.subscribe_configuration(CONFIGSTORE_NAME, vec![(&key)])
.await?;

let mut subscription_id = String::new();
while let Some(result) = stream.next().await {
let subscribe = result.unwrap();
if subscribe.items.is_empty() {
// Update the subscription_id
subscription_id = subscribe.id.clone();
println!("App subscribed to config changes with subscription id: {subscription_id:?} ");
continue;
}
println!("Configuration value: {:?}", subscribe.items);
unsubscribe(&mut client, &subscription_id).await;
}

Ok(())
}

// Function to unsubscribe from configuration updates and exit the app
async fn unsubscribe(client: &mut DaprClient, subscription_id: &str) {
match client
.unsubscribe_configuration("CONFIGSTORE_NAME", subscription_id)
.await
{
Ok(_) => println!("App unsubscribed from config changes"),
Err(e) => println!("Error unsubscribing from config updates: {}", e),
}
std::process::exit(0);
}
120 changes: 118 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;

use dapr::proto::{common::v1 as common_v1, runtime::v1 as dapr_v1};
use prost_types::Any;
use std::collections::HashMap;
use tonic::Streaming;
use tonic::{async_trait, transport::Channel as TonicChannel, Request};

use crate::dapr::*;
Expand Down Expand Up @@ -248,6 +248,62 @@ impl<T: DaprInterface> Client<T> {
pub async fn get_metadata(&mut self) -> Result<GetMetadataResponse, Error> {
self.0.get_metadata().await
}

/// Get the configuration for a specific key
/// ///
/// # Arguments
///
/// * `store_name` - The name of config store.
/// * `keys` - The key of the desired configuration.
pub async fn get_configuration<S, K>(
&mut self,
store_name: S,
keys: Vec<K>,
) -> Result<GetConfigurationResponse, Error>
where
S: Into<String>,
K: Into<String>,
{
let request = GetConfigurationRequest {
store_name: store_name.into(),
keys: keys.into_iter().map(|key| key.into()).collect(),
metadata: Default::default(),
};
self.0.get_configuration(request).await
}

/// Subscribe to configuration changes
pub async fn subscribe_configuration<S>(
&mut self,
store_name: S,
keys: Vec<S>,
) -> Result<Streaming<SubscribeConfigurationResponse>, Error>
where
S: Into<String>,
{
let request = SubscribeConfigurationRequest {
store_name: store_name.into(),
keys: keys.into_iter().map(|key| key.into()).collect(),
metadata: Default::default(),
};
self.0.subscribe_configuration(request).await
}

/// Unsubscribe from configuration changes
pub async fn unsubscribe_configuration<S>(
&mut self,
store_name: S,
id: S,
) -> Result<UnsubscribeConfigurationResponse, Error>
where
S: Into<String>,
{
let request = UnsubscribeConfigurationRequest {
id: id.into(),
store_name: store_name.into(),
};
self.0.unsubscribe_configuration(request).await
}
}

#[async_trait]
Expand All @@ -269,6 +325,18 @@ pub trait DaprInterface: Sized {
async fn delete_bulk_state(&mut self, request: DeleteBulkStateRequest) -> Result<(), Error>;
async fn set_metadata(&mut self, request: SetMetadataRequest) -> Result<(), Error>;
async fn get_metadata(&mut self) -> Result<GetMetadataResponse, Error>;
async fn get_configuration(
&mut self,
request: GetConfigurationRequest,
) -> Result<GetConfigurationResponse, Error>;
async fn subscribe_configuration(
&mut self,
request: SubscribeConfigurationRequest,
) -> Result<Streaming<SubscribeConfigurationResponse>, Error>;
async fn unsubscribe_configuration(
&mut self,
request: UnsubscribeConfigurationRequest,
) -> Result<UnsubscribeConfigurationResponse, Error>;
}

#[async_trait]
Expand Down Expand Up @@ -334,6 +402,36 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient<TonicChannel> {
async fn get_metadata(&mut self) -> Result<GetMetadataResponse, Error> {
Ok(self.get_metadata(Request::new(())).await?.into_inner())
}

async fn get_configuration(
&mut self,
request: GetConfigurationRequest,
) -> Result<GetConfigurationResponse, Error> {
Ok(self
.get_configuration(Request::new(request))
.await?
.into_inner())
}

async fn subscribe_configuration(
&mut self,
request: SubscribeConfigurationRequest,
) -> Result<Streaming<SubscribeConfigurationResponse>, Error> {
Ok(self
.subscribe_configuration(Request::new(request))
.await?
.into_inner())
}

async fn unsubscribe_configuration(
&mut self,
request: UnsubscribeConfigurationRequest,
) -> Result<UnsubscribeConfigurationResponse, Error> {
Ok(self
.unsubscribe_configuration(Request::new(request))
.await?
.into_inner())
}
}

/// A request from invoking a service
Expand Down Expand Up @@ -378,6 +476,24 @@ pub type GetMetadataResponse = dapr_v1::GetMetadataResponse;
/// A request for setting metadata
pub type SetMetadataRequest = dapr_v1::SetMetadataRequest;

/// A request for getting configuration
pub type GetConfigurationRequest = dapr_v1::GetConfigurationRequest;

/// A response from getting configuration
pub type GetConfigurationResponse = dapr_v1::GetConfigurationResponse;

/// A request for subscribing to configuration changes
pub type SubscribeConfigurationRequest = dapr_v1::SubscribeConfigurationRequest;

/// A response from subscribing tto configuration changes
pub type SubscribeConfigurationResponse = dapr_v1::SubscribeConfigurationResponse;

/// A request for unsubscribing from configuration changes
pub type UnsubscribeConfigurationRequest = dapr_v1::UnsubscribeConfigurationRequest;

/// A response from unsubscribing from configuration changes
pub type UnsubscribeConfigurationResponse = dapr_v1::UnsubscribeConfigurationResponse;

/// A tonic based gRPC client
pub type TonicClient = dapr_v1::dapr_client::DaprClient<TonicChannel>;

Expand Down

0 comments on commit 2664e64

Please sign in to comment.