Skip to content

Commit

Permalink
cargo fmt
Browse files Browse the repository at this point in the history
Signed-off-by: Hauke Jung <hauke.jung@outlook.de>
  • Loading branch information
hauju committed Nov 13, 2023
1 parent fa754de commit f314df2
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 26 deletions.
14 changes: 10 additions & 4 deletions examples/configuration/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let key = String::from("hello");

// save key-value pair in the state store
let response = client.get_configuration(CONFIGSTORE_NAME, vec![(&key)]).await?;
let response = client
.get_configuration(CONFIGSTORE_NAME, vec![(&key)])
.await?;
let val = response.items.get("hello").unwrap();
println!("Configuration value: {val:?}");

// Subscribe for configuration changes
let mut stream = client.subscribe_configuration(CONFIGSTORE_NAME, vec![(&key)]).await?;
let mut stream = client
.subscribe_configuration(CONFIGSTORE_NAME, vec![(&key)])
.await?;

let mut subscription_id = String::new();
while let Some(result) = stream.next().await {
Expand All @@ -44,10 +48,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// Function to unsubscribe from configuration updates and exit the app
async fn unsubscribe(client: &mut DaprClient, subscription_id: &str) {
match client.unsubscribe_configuration("CONFIGSTORE_NAME", subscription_id).await {
match client
.unsubscribe_configuration("CONFIGSTORE_NAME", subscription_id)
.await
{
Ok(_) => println!("App unsubscribed from config changes"),
Err(e) => println!("Error unsubscribing from config updates: {}", e),
}
std::process::exit(0);
}

83 changes: 61 additions & 22 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;
use dapr::proto::{common::v1 as common_v1, runtime::v1 as dapr_v1};
use prost_types::Any;
use std::collections::HashMap;
use tonic::Streaming;
use tonic::{async_trait, transport::Channel as TonicChannel, Request};

Expand Down Expand Up @@ -255,37 +255,49 @@ impl<T: DaprInterface> Client<T> {
///
/// * `store_name` - The name of config store.
/// * `keys` - The key of the desired configuration.
pub async fn get_configuration<S, K>(&mut self, store_name: S, keys: Vec<K>) -> Result<GetConfigurationResponse, Error>
pub async fn get_configuration<S, K>(
&mut self,
store_name: S,
keys: Vec<K>,
) -> Result<GetConfigurationResponse, Error>
where
S: Into<String>,
K: Into<String> {
S: Into<String>,
K: Into<String>,
{
let request = GetConfigurationRequest {
store_name: store_name.into(),
keys: keys.into_iter().map(|key| key.into()).collect(),
metadata: Default::default(),
metadata: Default::default(),
};
self.0.get_configuration(request).await
}

/// Subscribe to configuration changes
pub async fn subscribe_configuration<S>(&mut self, store_name: S, keys: Vec<S>) -> Result<Streaming<SubscribeConfigurationResponse>, Error>
pub async fn subscribe_configuration<S>(
&mut self,
store_name: S,
keys: Vec<S>,
) -> Result<Streaming<SubscribeConfigurationResponse>, Error>
where
S: Into<String> {
S: Into<String>,
{
let request = SubscribeConfigurationRequest {
store_name: store_name.into(),
keys: keys.into_iter().map(|key| key.into()).collect(),
metadata: Default::default(),
metadata: Default::default(),
};
self.0.subscribe_configuration(request).await
}

/// Unsubscribe from configuration changes
pub async fn unsubscribe_configuration<S>(&mut self, store_name: S, id: S) -> Result<UnsubscribeConfigurationResponse, Error>
pub async fn unsubscribe_configuration<S>(
&mut self,
store_name: S,
id: S,
) -> Result<UnsubscribeConfigurationResponse, Error>
where
S: Into<String> {
S: Into<String>,
{
let request = UnsubscribeConfigurationRequest {
id: id.into(),
store_name: store_name.into(),
Expand Down Expand Up @@ -313,9 +325,18 @@ pub trait DaprInterface: Sized {
async fn delete_bulk_state(&mut self, request: DeleteBulkStateRequest) -> Result<(), Error>;
async fn set_metadata(&mut self, request: SetMetadataRequest) -> Result<(), Error>;
async fn get_metadata(&mut self) -> Result<GetMetadataResponse, Error>;
async fn get_configuration(&mut self, request: GetConfigurationRequest) -> Result<GetConfigurationResponse, Error>;
async fn subscribe_configuration(&mut self, request: SubscribeConfigurationRequest) -> Result<Streaming<SubscribeConfigurationResponse>, Error>;
async fn unsubscribe_configuration(&mut self, request: UnsubscribeConfigurationRequest) -> Result<UnsubscribeConfigurationResponse, Error>;
async fn get_configuration(
&mut self,
request: GetConfigurationRequest,
) -> Result<GetConfigurationResponse, Error>;
async fn subscribe_configuration(
&mut self,
request: SubscribeConfigurationRequest,
) -> Result<Streaming<SubscribeConfigurationResponse>, Error>;
async fn unsubscribe_configuration(
&mut self,
request: UnsubscribeConfigurationRequest,
) -> Result<UnsubscribeConfigurationResponse, Error>;
}

#[async_trait]
Expand Down Expand Up @@ -382,16 +403,34 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient<TonicChannel> {
Ok(self.get_metadata(Request::new(())).await?.into_inner())
}

async fn get_configuration(&mut self, request: GetConfigurationRequest) -> Result<GetConfigurationResponse, Error> {
Ok(self.get_configuration(Request::new(request)).await?.into_inner())
async fn get_configuration(
&mut self,
request: GetConfigurationRequest,
) -> Result<GetConfigurationResponse, Error> {
Ok(self
.get_configuration(Request::new(request))
.await?
.into_inner())
}

async fn subscribe_configuration(&mut self, request: SubscribeConfigurationRequest) -> Result<Streaming<SubscribeConfigurationResponse>, Error> {
Ok(self.subscribe_configuration(Request::new(request)).await?.into_inner())
async fn subscribe_configuration(
&mut self,
request: SubscribeConfigurationRequest,
) -> Result<Streaming<SubscribeConfigurationResponse>, Error> {
Ok(self
.subscribe_configuration(Request::new(request))
.await?
.into_inner())
}

async fn unsubscribe_configuration(&mut self, request: UnsubscribeConfigurationRequest) -> Result<UnsubscribeConfigurationResponse, Error> {
Ok(self.unsubscribe_configuration(Request::new(request)).await?.into_inner())
async fn unsubscribe_configuration(
&mut self,
request: UnsubscribeConfigurationRequest,
) -> Result<UnsubscribeConfigurationResponse, Error> {
Ok(self
.unsubscribe_configuration(Request::new(request))
.await?
.into_inner())
}
}

Expand Down

0 comments on commit f314df2

Please sign in to comment.