Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of actor runtime #99

Merged
merged 36 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
e455e80
actor implementation
danielgerlag Apr 29, 2023
ae7aef7
wip
danielgerlag Apr 29, 2023
7db861e
wip
danielgerlag May 2, 2023
2bd95e1
wip
danielgerlag May 2, 2023
ebf701e
wip
danielgerlag May 4, 2023
2e34b22
nits
danielgerlag May 4, 2023
c95a9c0
tests
danielgerlag May 4, 2023
45eab14
make client cloneable
danielgerlag May 10, 2023
3f286d5
logs
danielgerlag May 10, 2023
dc6156f
logging
danielgerlag May 10, 2023
7981005
async methods
danielgerlag May 16, 2023
513b334
shutdown semantics
danielgerlag Jul 18, 2023
6dbe2b7
clone actor client context
danielgerlag Jul 18, 2023
7abc466
actor implementation
danielgerlag Aug 2, 2023
0a0ff9b
wip
danielgerlag Aug 2, 2023
188753d
move tests
danielgerlag Aug 2, 2023
b9cd1f6
actor factory
danielgerlag Aug 2, 2023
8581e82
wip
danielgerlag Aug 2, 2023
d18f355
wip
danielgerlag Aug 2, 2023
b6b6fe3
readme
danielgerlag Aug 3, 2023
6971ab5
pr feedback Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>
danielgerlag Aug 21, 2023
a4241d5
cargo fmt
danielgerlag Aug 21, 2023
8e77624
cargo clippy --fix
danielgerlag Aug 21, 2023
a58192b
proc macro
danielgerlag Aug 22, 2023
6fc06ac
dependency shuffle
danielgerlag Aug 22, 2023
029f482
Update lib.rs
danielgerlag Aug 22, 2023
a908fac
docs
danielgerlag Aug 22, 2023
877efed
enable decorating type alias
danielgerlag Aug 22, 2023
a979a1f
graceful shutdown
danielgerlag Oct 11, 2023
6a3c6d8
Merge remote-tracking branch 'upstream/master' into actors-refactor
danielgerlag Jan 8, 2024
8769c21
merge issues
danielgerlag Jan 8, 2024
0030ddd
cargo fmt
danielgerlag Jan 8, 2024
d0e5384
update rust version
danielgerlag Jan 8, 2024
0c0ee4a
publish macro crate
danielgerlag Jan 8, 2024
f32f4cd
dependency issue
danielgerlag Jan 8, 2024
a2d3494
clippy warnings
danielgerlag Jan 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ env:
CARGO_TERM_COLOR: always
CARGO_TOKEN: ${{ secrets.CRATES_IO_TOKEN }}
PROTOC_VERSION: '3.x'
RUST_TOOLCHAIN: '1.67.0'
RUST_TOOLCHAIN: '1.70.0'

jobs:
lint:
Expand Down Expand Up @@ -82,5 +82,9 @@ jobs:
with:
version: ${{ env.PROTOC_VERSION }}
- uses: actions/checkout@v2
- name: cargo publish macros
run: cargo publish --manifest-path macros/Cargo.toml --token ${{ env.CARGO_TOKEN }}
- name: cargo publish
run: cargo publish --token ${{ env.CARGO_TOKEN }}


1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ Cargo.lock

# These are backup files generated by rustfmt
**/*.rs.bk
.vscode/settings.json
25 changes: 24 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "dapr"
version = "0.13.0"
version = "0.14.0"
authors = ["dapr.io"]
edition = "2021"
license-file = "LICENSE"
Expand All @@ -11,16 +11,31 @@ keywords = ["microservices", "dapr"]


[dependencies]
dapr-macros = {version="0.14.0", path = "macros" }
futures = "0.3"
tonic = "0.8"
prost = "0.11"
bytes = "1"
prost-types = "0.11"
async-trait = "0.1"
env_logger = "0.10"
log = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
axum = {version = "0.6.19", features = ["default", "headers"] }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand it, this will only work with HTTP (hyper underneath), and not gRPC (tonic underneath)?

Will the actors work with for Dapr in gRPC mode?

My assumption here is that it only works in HTTP mode (because of axum, with hyper underneath).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do see Tonic being used in the client example, so I assume this is a non-issue. Will leave the comment there anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dapr side car currently does not support invoking actors via grpc as far as I know, so any actor host would have to support http.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dapr side car currently does not support invoking actors via grpc as far as I know, so any actor host would have to support http.

That's correct

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yaron2 Is it supposed to be upgraded soon ? or has it be ?

tokio = { version = "1.29", features = ["sync"] }
chrono = "0.4.24"

[build-dependencies]
tonic-build = "0.8"

[dev-dependencies]
axum-test = "12.1.0"
once_cell = "1.18.0"
tokio = { version = "1", features = ["full"] }
uuid = { version = "1.4.0", features = ["v4"] }
dapr = {path="./"}
tokio-test = "0.4.2"
tokio-stream = { version = "0.1" }

[[example]]
Expand Down Expand Up @@ -54,3 +69,11 @@ path = "examples/invoke/grpc-proxying/client.rs"
[[example]]
name = "invoke-grpc-proxying-server"
path = "examples/invoke/grpc-proxying/server.rs"

[[example]]
name = "actor-client"
path = "examples/actors/client.rs"

[[example]]
name = "actor-server"
path = "examples/actors/server.rs"
101 changes: 101 additions & 0 deletions examples/actors/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Actor Example

This example demonstrates the Dapr actor framework. To author an actor,

1. Create a struc decorated with the `#[dapr::actor]` macro to house your custom actor methods that map to [Axum handlers](https://docs.rs/axum/latest/axum/handler/index.html), use [Axum extractors](https://docs.rs/axum/latest/axum/extract/index.html) to access the incoming request and return an [`impl IntoResponse`](https://docs.rs/axum/latest/axum/response/trait.IntoResponse.html).
Use the `DaprJson` extractor to deserialize the request from Json coming from a Dapr sidecar.
```rust
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: I think the example should also include the struct MyActor {] and any derive macros necessary.

#[dapr::actor]
struct MyActor {
id: String,
client: ActorContextClient
}

#[derive(Serialize, Deserialize)]
pub struct MyRequest {
pub name: String,
}

#[derive(Serialize, Deserialize)]
pub struct MyResponse {
pub available: bool,
}

impl MyActor {
fn do_stuff(&self, DaprJson(data): DaprJson<MyRequest>) -> Json<MyResponse> {
println!("doing stuff with {}", data.name);
Json(MyResponse {
available: true
})
}
}
```

There are many ways to write your actor method signature, using Axum handlers, but you also have access to the actor instance via `self`. Here is a super simple example:
```rust
pub async fn method_2(&self) -> impl IntoResponse {
StatusCode::OK
}
```
1. Implement the `Actor` trait. This trait exposes the following methods:
- `on_activate` - Called when an actor is activated on a host
- `on_deactivate` - Called when an actor is deactivated on a host
- `on_reminder` - Called when a reminder is recieved from the Dapr sidecar
- `on_timer` - Called when a timer is recieved from the Dapr sidecar


```rust
#[async_trait]
impl Actor for MyActor {

async fn on_activate(&self) -> Result<(), ActorError> {
println!("on_activate {}", self.id);
Ok(())
}

async fn on_deactivate(&self) -> Result<(), ActorError> {
println!("on_deactivate");
Ok(())
}
}
```

1. An actor host requires an Http server to recieve callbacks from the Dapr sidecar. The `DaprHttpServer` object implements this functionality and also encapsulates the actor runtime to service any hosted actors. Use the `register_actor` method to register an actor type to be serviced, this method takes an `ActorTypeRegistration` which specifies
- The actor type name (used by Actor clients), and concrete struct
- A factory to construct a new instance of that actor type when one is required to be activated by the runtime. The parameters passed to the factory will be the actor type, actor ID, and a Dapr client for managing state, timers and reminders for the actor.
- The methods that you would like to expose to external clients.

```rust
let mut dapr_server = dapr::server::DaprHttpServer::new();

dapr_server.register_actor(ActorTypeRegistration::new::<MyActor>("MyActor",
Box::new(|actor_type, id, client| Arc::new(MyActor{
actor_type,
id,
client
})))
.register_method("do_stuff", MyActor::do_stuff)
.register_method("do_other_stuff", MyActor::do_other_stuff));

dapr_server.start(None).await?;
```


## Running

> Before you run the example make sure local redis state store is running by executing:
> ```
> docker ps
> ```

To run this example:

1. Start actor host (expose Http server receiver on port 50051):
```bash
dapr run --app-id actor-host --app-protocol http --app-port 50051 cargo run -- --example actor-server
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be worth pointing out that only --app-protocol http is support (if that is indeed the case).
Is it expected that --app-protocol grpc might one day be available?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One day, but not today :)

```

2. Start actor client:
```bash
dapr run --app-id actor-client --dapr-grpc-port 3502 cargo run -- --example actor-client
```
37 changes: 37 additions & 0 deletions examples/actors/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
pub struct MyResponse {
pub available: bool,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MyRequest {
pub name: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: Handle this issue in the sdk
// Introduce delay so that dapr grpc port is assigned before app tries to connect
std::thread::sleep(std::time::Duration::new(2, 0));

// Get the Dapr port and create a connection
let port: u16 = std::env::var("DAPR_GRPC_PORT")?.parse()?;
let addr = format!("https://127.0.0.1:{}", port);

// Create the client
let mut client = dapr::Client::<dapr::client::TonicClient>::connect(addr).await?;

let data = MyRequest {
name: "test".to_string(),
};

let resp: Result<MyResponse, dapr::error::Error> = client
.invoke_actor("MyActor", "a1", "do_stuff", data, None)
.await;

println!("Response: {:#?}", resp);

Ok(())
}
86 changes: 86 additions & 0 deletions examples/actors/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use async_trait::async_trait;
use axum::Json;
use dapr::server::{
actor::{
context_client::ActorContextClient, runtime::ActorTypeRegistration, Actor, ActorError,
},
utils::DaprJson,
};
use dapr_macros::actor;
use serde::{Deserialize, Serialize};
use std::{str::from_utf8, sync::Arc};

#[derive(Serialize, Deserialize, Debug)]
pub struct MyResponse {
pub available: bool,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MyRequest {
pub name: String,
}

#[actor]
struct MyActor {
id: String,
client: ActorContextClient,
}

impl MyActor {
async fn do_stuff(&self, DaprJson(req): DaprJson<MyRequest>) -> Json<MyResponse> {
println!("doing stuff with {}", req.name);
let mut dapr = self.client.clone();
let r = dapr.get_actor_state("key1").await.unwrap();
println!("get_actor_state {:?}", r);
Json(MyResponse { available: true })
}
}

#[async_trait]
impl Actor for MyActor {
async fn on_activate(&self) -> Result<(), ActorError> {
println!("on_activate {}", self.id);
Ok(())
}

async fn on_deactivate(&self) -> Result<(), ActorError> {
println!("on_deactivate");
Ok(())
}

async fn on_reminder(&self, reminder_name: &str, data: Vec<u8>) -> Result<(), ActorError> {
println!("on_reminder {} {:?}", reminder_name, from_utf8(&data));
Ok(())
}

async fn on_timer(&self, timer_name: &str, data: Vec<u8>) -> Result<(), ActorError> {
println!("on_timer {} {:?}", timer_name, from_utf8(&data));
Ok(())
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
let mut dapr_server = dapr::server::DaprHttpServer::new().await;

dapr_server
.register_actor(
ActorTypeRegistration::new::<MyActor>(
"MyActor",
Box::new(|_actor_type, actor_id, context| {
Arc::new(MyActor {
id: actor_id.to_string(),
client: context,
})
}),
)
.register_method("do_stuff", MyActor::do_stuff)
.register_method("do_stuff2", MyActor::do_stuff),
)
.await;

dapr_server.start(None).await?;

Ok(())
}
2 changes: 1 addition & 1 deletion examples/pubsub/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl AppCallback for AppCallbackService {
let data = &r.data;
let data_content_type = &r.data_content_type;

let message = String::from_utf8_lossy(&data);
let message = String::from_utf8_lossy(data);
println!("Message: {}", &message);
println!("Content-Type: {}", &data_content_type);

Expand Down
1 change: 1 addition & 0 deletions macros/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target
14 changes: 14 additions & 0 deletions macros/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "dapr-macros"
version = "0.14.0"
edition = "2021"

[lib]
proc-macro = true

[dependencies]
async-trait = "0.1"
log = "0.4"
axum = "0.6.19"
syn = {version="2.0.29",features=["full"]}
quote = "1.0.8"
61 changes: 61 additions & 0 deletions macros/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::iter;

use proc_macro::TokenStream;
use quote::quote;

#[proc_macro_attribute]
pub fn actor(_attr: TokenStream, item: TokenStream) -> TokenStream {
let actor_struct_name = match syn::parse::<syn::ItemStruct>(item.clone()) {
Ok(actor_struct) => actor_struct.ident.clone(),
Err(_) => match syn::parse::<syn::ItemType>(item.clone()) {
Ok(ty) => ty.ident.clone(),
Err(e) => panic!("Error parsing actor struct: {}", e),
},
};

let mut result = TokenStream::from(quote!(
#[async_trait::async_trait]
impl axum::extract::FromRequestParts<dapr::server::actor::runtime::ActorState> for &#actor_struct_name {
type Rejection = dapr::server::actor::ActorRejection;

async fn from_request_parts(
parts: &mut axum::http::request::Parts,
state: &dapr::server::actor::runtime::ActorState,
) -> Result<Self, Self::Rejection> {
let path = match axum::extract::Path::<dapr::server::actor::ActorPath>::from_request_parts(parts, state).await {
Ok(path) => path,
Err(e) => {
log::error!("Error getting path: {}", e);
return Err(dapr::server::actor::ActorRejection::Path(e));
}
};
let actor_type = state.actor_type.clone();
let actor_id = path.actor_id.clone();
log::info!(
"Request for actor_type: {}, actor_id: {}",
actor_type,
actor_id
);
let actor = match state
.runtime
.get_or_create_actor(&actor_type, &actor_id)
.await
{
Ok(actor) => actor,
Err(e) => {
log::error!("Error getting actor: {}", e);
return Err(dapr::server::actor::ActorRejection::ActorError(e.to_string()));
}
};
let actor = actor.as_ref();
let well_known_actor =
unsafe { &*(actor as *const dyn dapr::server::actor::Actor as *const #actor_struct_name) };
Ok(well_known_actor)
}
}
));

result.extend(iter::once(item));

result
}