Skip to content

Commit

Permalink
Initial implementation of actor runtime (#99)
Browse files Browse the repository at this point in the history
* actor implementation

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* wip

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* wip

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* wip

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* wip

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* nits

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* tests

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* make client cloneable

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* logs

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* logging

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* async methods

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* shutdown semantics

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* clone actor client context

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* actor implementation

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* wip

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* move tests

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* actor factory

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* wip

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* wip

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* readme

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* pr feedback Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* cargo fmt

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* cargo clippy --fix

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* proc macro

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* dependency shuffle

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* Update lib.rs

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* docs

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* enable decorating type alias

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* graceful shutdown

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* merge issues

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* cargo fmt

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* update rust version

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* publish macro crate

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* dependency issue

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

* clippy warnings

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>

---------

Signed-off-by: Daniel Gerlag <daniel@gerlag.ca>
  • Loading branch information
danielgerlag committed Jan 9, 2024
1 parent 6950787 commit 8db69d8
Show file tree
Hide file tree
Showing 23 changed files with 1,708 additions and 27 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ env:
CARGO_TERM_COLOR: always
CARGO_TOKEN: ${{ secrets.CRATES_IO_TOKEN }}
PROTOC_VERSION: '3.x'
RUST_TOOLCHAIN: '1.67.0'
RUST_TOOLCHAIN: '1.70.0'

jobs:
lint:
Expand Down Expand Up @@ -82,5 +82,9 @@ jobs:
with:
version: ${{ env.PROTOC_VERSION }}
- uses: actions/checkout@v2
- name: cargo publish macros
run: cargo publish --manifest-path macros/Cargo.toml --token ${{ env.CARGO_TOKEN }}
- name: cargo publish
run: cargo publish --token ${{ env.CARGO_TOKEN }}


1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ Cargo.lock

# These are backup files generated by rustfmt
**/*.rs.bk
.vscode/settings.json
25 changes: 24 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "dapr"
version = "0.13.0"
version = "0.14.0"
authors = ["dapr.io"]
edition = "2021"
license-file = "LICENSE"
Expand All @@ -11,16 +11,31 @@ keywords = ["microservices", "dapr"]


[dependencies]
dapr-macros = {version="0.14.0", path = "macros" }
futures = "0.3"
tonic = "0.8"
prost = "0.11"
bytes = "1"
prost-types = "0.11"
async-trait = "0.1"
env_logger = "0.10"
log = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
axum = {version = "0.6.19", features = ["default", "headers"] }
tokio = { version = "1.29", features = ["sync"] }
chrono = "0.4.24"

[build-dependencies]
tonic-build = "0.8"

[dev-dependencies]
axum-test = "12.1.0"
once_cell = "1.18.0"
tokio = { version = "1", features = ["full"] }
uuid = { version = "1.4.0", features = ["v4"] }
dapr = {path="./"}
tokio-test = "0.4.2"
tokio-stream = { version = "0.1" }

[[example]]
Expand Down Expand Up @@ -54,3 +69,11 @@ path = "examples/invoke/grpc-proxying/client.rs"
[[example]]
name = "invoke-grpc-proxying-server"
path = "examples/invoke/grpc-proxying/server.rs"

[[example]]
name = "actor-client"
path = "examples/actors/client.rs"

[[example]]
name = "actor-server"
path = "examples/actors/server.rs"
101 changes: 101 additions & 0 deletions examples/actors/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Actor Example

This example demonstrates the Dapr actor framework. To author an actor,

1. Create a struc decorated with the `#[dapr::actor]` macro to house your custom actor methods that map to [Axum handlers](https://docs.rs/axum/latest/axum/handler/index.html), use [Axum extractors](https://docs.rs/axum/latest/axum/extract/index.html) to access the incoming request and return an [`impl IntoResponse`](https://docs.rs/axum/latest/axum/response/trait.IntoResponse.html).
Use the `DaprJson` extractor to deserialize the request from Json coming from a Dapr sidecar.
```rust
#[dapr::actor]
struct MyActor {
id: String,
client: ActorContextClient
}

#[derive(Serialize, Deserialize)]
pub struct MyRequest {
pub name: String,
}

#[derive(Serialize, Deserialize)]
pub struct MyResponse {
pub available: bool,
}

impl MyActor {
fn do_stuff(&self, DaprJson(data): DaprJson<MyRequest>) -> Json<MyResponse> {
println!("doing stuff with {}", data.name);
Json(MyResponse {
available: true
})
}
}
```

There are many ways to write your actor method signature, using Axum handlers, but you also have access to the actor instance via `self`. Here is a super simple example:
```rust
pub async fn method_2(&self) -> impl IntoResponse {
StatusCode::OK
}
```
1. Implement the `Actor` trait. This trait exposes the following methods:
- `on_activate` - Called when an actor is activated on a host
- `on_deactivate` - Called when an actor is deactivated on a host
- `on_reminder` - Called when a reminder is recieved from the Dapr sidecar
- `on_timer` - Called when a timer is recieved from the Dapr sidecar


```rust
#[async_trait]
impl Actor for MyActor {

async fn on_activate(&self) -> Result<(), ActorError> {
println!("on_activate {}", self.id);
Ok(())
}

async fn on_deactivate(&self) -> Result<(), ActorError> {
println!("on_deactivate");
Ok(())
}
}
```

1. An actor host requires an Http server to recieve callbacks from the Dapr sidecar. The `DaprHttpServer` object implements this functionality and also encapsulates the actor runtime to service any hosted actors. Use the `register_actor` method to register an actor type to be serviced, this method takes an `ActorTypeRegistration` which specifies
- The actor type name (used by Actor clients), and concrete struct
- A factory to construct a new instance of that actor type when one is required to be activated by the runtime. The parameters passed to the factory will be the actor type, actor ID, and a Dapr client for managing state, timers and reminders for the actor.
- The methods that you would like to expose to external clients.

```rust
let mut dapr_server = dapr::server::DaprHttpServer::new();

dapr_server.register_actor(ActorTypeRegistration::new::<MyActor>("MyActor",
Box::new(|actor_type, id, client| Arc::new(MyActor{
actor_type,
id,
client
})))
.register_method("do_stuff", MyActor::do_stuff)
.register_method("do_other_stuff", MyActor::do_other_stuff));

dapr_server.start(None).await?;
```


## Running

> Before you run the example make sure local redis state store is running by executing:
> ```
> docker ps
> ```
To run this example:

1. Start actor host (expose Http server receiver on port 50051):
```bash
dapr run --app-id actor-host --app-protocol http --app-port 50051 cargo run -- --example actor-server
```

2. Start actor client:
```bash
dapr run --app-id actor-client --dapr-grpc-port 3502 cargo run -- --example actor-client
```
37 changes: 37 additions & 0 deletions examples/actors/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
pub struct MyResponse {
pub available: bool,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MyRequest {
pub name: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: Handle this issue in the sdk
// Introduce delay so that dapr grpc port is assigned before app tries to connect
std::thread::sleep(std::time::Duration::new(2, 0));

// Get the Dapr port and create a connection
let port: u16 = std::env::var("DAPR_GRPC_PORT")?.parse()?;
let addr = format!("https://127.0.0.1:{}", port);

// Create the client
let mut client = dapr::Client::<dapr::client::TonicClient>::connect(addr).await?;

let data = MyRequest {
name: "test".to_string(),
};

let resp: Result<MyResponse, dapr::error::Error> = client
.invoke_actor("MyActor", "a1", "do_stuff", data, None)
.await;

println!("Response: {:#?}", resp);

Ok(())
}
86 changes: 86 additions & 0 deletions examples/actors/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use async_trait::async_trait;
use axum::Json;
use dapr::server::{
actor::{
context_client::ActorContextClient, runtime::ActorTypeRegistration, Actor, ActorError,
},
utils::DaprJson,
};
use dapr_macros::actor;
use serde::{Deserialize, Serialize};
use std::{str::from_utf8, sync::Arc};

#[derive(Serialize, Deserialize, Debug)]
pub struct MyResponse {
pub available: bool,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MyRequest {
pub name: String,
}

#[actor]
struct MyActor {
id: String,
client: ActorContextClient,
}

impl MyActor {
async fn do_stuff(&self, DaprJson(req): DaprJson<MyRequest>) -> Json<MyResponse> {
println!("doing stuff with {}", req.name);
let mut dapr = self.client.clone();
let r = dapr.get_actor_state("key1").await.unwrap();
println!("get_actor_state {:?}", r);
Json(MyResponse { available: true })
}
}

#[async_trait]
impl Actor for MyActor {
async fn on_activate(&self) -> Result<(), ActorError> {
println!("on_activate {}", self.id);
Ok(())
}

async fn on_deactivate(&self) -> Result<(), ActorError> {
println!("on_deactivate");
Ok(())
}

async fn on_reminder(&self, reminder_name: &str, data: Vec<u8>) -> Result<(), ActorError> {
println!("on_reminder {} {:?}", reminder_name, from_utf8(&data));
Ok(())
}

async fn on_timer(&self, timer_name: &str, data: Vec<u8>) -> Result<(), ActorError> {
println!("on_timer {} {:?}", timer_name, from_utf8(&data));
Ok(())
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
let mut dapr_server = dapr::server::DaprHttpServer::new().await;

dapr_server
.register_actor(
ActorTypeRegistration::new::<MyActor>(
"MyActor",
Box::new(|_actor_type, actor_id, context| {
Arc::new(MyActor {
id: actor_id.to_string(),
client: context,
})
}),
)
.register_method("do_stuff", MyActor::do_stuff)
.register_method("do_stuff2", MyActor::do_stuff),
)
.await;

dapr_server.start(None).await?;

Ok(())
}
2 changes: 1 addition & 1 deletion examples/pubsub/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl AppCallback for AppCallbackService {
let data = &r.data;
let data_content_type = &r.data_content_type;

let message = String::from_utf8_lossy(&data);
let message = String::from_utf8_lossy(data);
println!("Message: {}", &message);
println!("Content-Type: {}", &data_content_type);

Expand Down
1 change: 1 addition & 0 deletions macros/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target
14 changes: 14 additions & 0 deletions macros/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "dapr-macros"
version = "0.14.0"
edition = "2021"

[lib]
proc-macro = true

[dependencies]
async-trait = "0.1"
log = "0.4"
axum = "0.6.19"
syn = {version="2.0.29",features=["full"]}
quote = "1.0.8"
61 changes: 61 additions & 0 deletions macros/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::iter;

use proc_macro::TokenStream;
use quote::quote;

#[proc_macro_attribute]
pub fn actor(_attr: TokenStream, item: TokenStream) -> TokenStream {
let actor_struct_name = match syn::parse::<syn::ItemStruct>(item.clone()) {
Ok(actor_struct) => actor_struct.ident.clone(),
Err(_) => match syn::parse::<syn::ItemType>(item.clone()) {
Ok(ty) => ty.ident.clone(),
Err(e) => panic!("Error parsing actor struct: {}", e),
},
};

let mut result = TokenStream::from(quote!(
#[async_trait::async_trait]
impl axum::extract::FromRequestParts<dapr::server::actor::runtime::ActorState> for &#actor_struct_name {
type Rejection = dapr::server::actor::ActorRejection;

async fn from_request_parts(
parts: &mut axum::http::request::Parts,
state: &dapr::server::actor::runtime::ActorState,
) -> Result<Self, Self::Rejection> {
let path = match axum::extract::Path::<dapr::server::actor::ActorPath>::from_request_parts(parts, state).await {
Ok(path) => path,
Err(e) => {
log::error!("Error getting path: {}", e);
return Err(dapr::server::actor::ActorRejection::Path(e));
}
};
let actor_type = state.actor_type.clone();
let actor_id = path.actor_id.clone();
log::info!(
"Request for actor_type: {}, actor_id: {}",
actor_type,
actor_id
);
let actor = match state
.runtime
.get_or_create_actor(&actor_type, &actor_id)
.await
{
Ok(actor) => actor,
Err(e) => {
log::error!("Error getting actor: {}", e);
return Err(dapr::server::actor::ActorRejection::ActorError(e.to_string()));
}
};
let actor = actor.as_ref();
let well_known_actor =
unsafe { &*(actor as *const dyn dapr::server::actor::Actor as *const #actor_struct_name) };
Ok(well_known_actor)
}
}
));

result.extend(iter::once(item));

result
}

0 comments on commit 8db69d8

Please sign in to comment.