Skip to content

Commit

Permalink
Cleanup (#254)
Browse files Browse the repository at this point in the history
* Return Result when creating DaprHttpServer

Especially when running custom docker setups, the container that
contains the sidecar may not be running exactly when the Rust code
starts running.

To fix this, before we needed to sleep(2s) to avoid a panic in the Rust
program.
With this patch, this can be handled on the user side (e.g. the
connection can be retried multiple times with timeouts in-between).

Signed-off-by: Leon Matthes <[email protected]>

* release: v0.16.0-rc.4

Signed-off-by: Mike Nguyen <[email protected]>

* chore(deps): remove unused crates

Signed-off-by: Mike Nguyen <[email protected]>

* doc: missing expression closure

Signed-off-by: Mike Nguyen <[email protected]>

* refactor: lint issues and correctness improvements

Signed-off-by: Mike Nguyen <[email protected]>

---------

Signed-off-by: Leon Matthes <[email protected]>
Signed-off-by: Mike Nguyen <[email protected]>
Co-authored-by: Leon Matthes <[email protected]>
  • Loading branch information
mikeee and LeonMatthesKDAB authored Jan 15, 2025
1 parent 6973b7d commit 8bf6013
Show file tree
Hide file tree
Showing 20 changed files with 43 additions and 53 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dapr-bot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ rust-version = "1.70.0"

[dependencies]
exitcode = "1.1.2"
octocrab = "0.34.1"
octocrab = "0.42.1"
serde_json = "1.0.114"
tokio = { version = "1.36.0", features = ["full"] }
4 changes: 1 addition & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,20 @@ resolver = "2"
[workspace.dependencies]
async-trait = "0.1"
prost = "0.13.1"
prost-build = "0.13.1"
prost-types = "0.13.1"

serde = "1.0"
serde_json = "1.0"

tokio = "1.39"
tokio-stream = "0.1"
tokio-test = "0.4"
tokio-util = "0.7"

tonic = "0.12.1"
tonic-build = "0.12.1"

[workspace.package]
version = "0.16.0-rc.3"
version = "0.16.0-rc.4"
authors = [
"Mike Nguyen <[email protected]>",
"The Dapr Authors <[email protected]>"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Add the following to your `Cargo.toml` file:

```toml
[dependencies]
dapr = "0.16.0-rc.3"
dapr = "0.16.0-rc.4"
```

Here's a basic example to create a client:
Expand Down
3 changes: 0 additions & 3 deletions dapr-macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ rust-version.workspace = true
proc-macro = true

[dependencies]
async-trait = { workspace = true }
axum = "0.7"
log = "0.4"
proc-macro2 = "1.0"
quote = "1.0"
syn = { version = "2.0", features = ["full"] }
2 changes: 0 additions & 2 deletions dapr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ rust-version.workspace = true
[dependencies]
async-trait = { workspace = true }
axum = "0.7"
bytes = "1.7"
chrono = "0.4"
futures = "0.3"
log = "0.4"
Expand All @@ -33,5 +32,4 @@ dapr = { path = "./" }
dapr-macros = { path = "../dapr-macros" }
tokio = { workspace = true, features = ["full"] }
uuid = { version = "1.10", features = ["v4"] }
tokio-test = { workspace = true }
tokio-stream = { workspace = true }
19 changes: 9 additions & 10 deletions dapr/src/appcallback.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,36 @@
use crate::dapr;
use crate::dapr::proto::runtime::v1::app_callback_server::AppCallback;
use crate::dapr::proto::{common, runtime};
use std::collections::HashMap;
use tonic::{Code, Request, Response, Status};

/// InvokeRequest is the message to invoke a method with the data.
pub type InvokeRequest = dapr::proto::common::v1::InvokeRequest;
pub type InvokeRequest = common::v1::InvokeRequest;

/// InvokeResponse is the response message inclduing data and its content type
/// from app callback.
pub type InvokeResponse = dapr::proto::common::v1::InvokeResponse;
pub type InvokeResponse = common::v1::InvokeResponse;

/// ListTopicSubscriptionsResponse is the message including the list of the subscribing topics.
pub type ListTopicSubscriptionsResponse = dapr::proto::runtime::v1::ListTopicSubscriptionsResponse;
pub type ListTopicSubscriptionsResponse = runtime::v1::ListTopicSubscriptionsResponse;

/// TopicSubscription represents a topic and it's metadata (session id etc.)
pub type TopicSubscription = dapr::proto::runtime::v1::TopicSubscription;
pub type TopicSubscription = runtime::v1::TopicSubscription;

/// TopicEventRequest message is compatiable with CloudEvent spec v1.0.
pub type TopicEventRequest = dapr::proto::runtime::v1::TopicEventRequest;
pub type TopicEventRequest = runtime::v1::TopicEventRequest;

/// TopicEventResponse is response from app on published message
pub type TopicEventResponse = dapr::proto::runtime::v1::TopicEventResponse;
pub type TopicEventResponse = runtime::v1::TopicEventResponse;

/// ListInputBindingsResponse is the message including the list of input bindings.
pub type ListInputBindingsResponse = dapr::proto::runtime::v1::ListInputBindingsResponse;
pub type ListInputBindingsResponse = runtime::v1::ListInputBindingsResponse;

/// BindingEventRequest represents input bindings event.
pub type BindingEventRequest = dapr::proto::runtime::v1::BindingEventRequest;
pub type BindingEventRequest = runtime::v1::BindingEventRequest;

/// BindingEventResponse includes operations to save state or
/// send data to output bindings optionally.
pub type BindingEventResponse = dapr::proto::runtime::v1::BindingEventResponse;
pub type BindingEventResponse = runtime::v1::BindingEventResponse;

impl ListTopicSubscriptionsResponse {
/// Create `ListTopicSubscriptionsResponse` with a topic.
Expand Down
24 changes: 12 additions & 12 deletions dapr/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,13 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient<TonicChannel> {
Ok(dapr_v1::dapr_client::DaprClient::connect(addr).await?)
}

async fn publish_event(&mut self, request: PublishEventRequest) -> Result<(), Error> {
self.publish_event(Request::new(request))
.await?
.into_inner();
Ok(())
}

async fn invoke_service(
&mut self,
request: InvokeServiceRequest,
Expand All @@ -676,13 +683,6 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient<TonicChannel> {
.into_inner())
}

async fn publish_event(&mut self, request: PublishEventRequest) -> Result<(), Error> {
self.publish_event(Request::new(request))
.await?
.into_inner();
Ok(())
}

async fn get_secret(&mut self, request: GetSecretRequest) -> Result<GetSecretResponse, Error> {
Ok(self.get_secret(Request::new(request)).await?.into_inner())
}
Expand All @@ -701,6 +701,11 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient<TonicChannel> {
Ok(self.get_state(Request::new(request)).await?.into_inner())
}

async fn save_state(&mut self, request: SaveStateRequest) -> Result<(), Error> {
self.save_state(Request::new(request)).await?.into_inner();
Ok(())
}

async fn query_state_alpha1(
&mut self,
request: QueryStateRequest,
Expand All @@ -711,11 +716,6 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient<TonicChannel> {
.into_inner())
}

async fn save_state(&mut self, request: SaveStateRequest) -> Result<(), Error> {
self.save_state(Request::new(request)).await?.into_inner();
Ok(())
}

async fn delete_state(&mut self, request: DeleteStateRequest) -> Result<(), Error> {
self.delete_state(Request::new(request)).await?.into_inner();
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions daprdocs/content/en/rust-sdk-docs/rust-client/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dapr = "0.16.0"
You can either reference `dapr::Client` or bind the full path to a new name as follows:

```rust
use dapr::Client as DaprClient
use dapr::Client as DaprClient;
```

## Instantiating the Dapr client
Expand All @@ -43,7 +43,7 @@ use dapr::Client as DaprClient
let addr = "https://127.0.0.1".to_string();

let mut client = dapr::Client::<dapr::client::TonicClient>::connect(addr,
port).await?;
port).await?;
```

## Building blocks
Expand Down
1 change: 0 additions & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ rust-version.workspace = true

[dependencies]
async-trait = { workspace = true }
base64 = "0.22"
dapr = { path = "../dapr" }
dapr-macros = { path = "../dapr-macros" }
env_logger = "0.11"
Expand Down
2 changes: 1 addition & 1 deletion examples/src/actors/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub struct MyRequest {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: Handle this issue in the sdk
// Introduce delay so that dapr grpc port is assigned before app tries to connect
std::thread::sleep(std::time::Duration::new(2, 0));
tokio::time::sleep(std::time::Duration::new(2, 0)).await;

// Define the Dapr address
let addr = "https://127.0.0.1".to_string();
Expand Down
2 changes: 1 addition & 1 deletion examples/src/bindings/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl AppCallback for AppCallbackService {
let name = &r.name;
let data = &r.data;

let message = String::from_utf8_lossy(&data);
let message = String::from_utf8_lossy(data);
println!("Binding Name: {}", &name);
println!("Message: {}", &message);

Expand Down
4 changes: 2 additions & 2 deletions examples/src/bindings/output.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::{collections::HashMap, thread, time::Duration};
use std::{collections::HashMap, time::Duration};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: Handle this issue in the sdk
// Introduce delay so that dapr grpc port is assigned before app tries to connect
thread::sleep(Duration::from_secs(2));
tokio::time::sleep(Duration::from_secs(2)).await;

// Get the Dapr port and create a connection
let addr = "https://127.0.0.1".to_string();
Expand Down
2 changes: 1 addition & 1 deletion examples/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: Handle this issue in the sdk
// Introduce delay so that dapr grpc port is assigned before app tries to connect
std::thread::sleep(std::time::Duration::new(2, 0));
tokio::time::sleep(std::time::Duration::new(2, 0)).await;

// Set the Dapr address
let addr = "https://127.0.0.1".to_string();
Expand Down
6 changes: 3 additions & 3 deletions examples/src/configuration/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ type DaprClient = dapr::Client<dapr::client::TonicClient>;
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: Handle this issue in the sdk
// Introduce delay so that dapr grpc port is assigned before app tries to connect
std::thread::sleep(std::time::Duration::new(2, 0));
tokio::time::sleep(std::time::Duration::new(2, 0)).await;

// Set the Dapr address
let addr = "https://127.0.0.1".to_string();
Expand All @@ -19,14 +19,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// get key-value pair in the state store
let response = client
.get_configuration(CONFIGSTORE_NAME, vec![(&key)], None)
.get_configuration(CONFIGSTORE_NAME, vec![&key], None)
.await?;
let val = response.items.get("hello").unwrap();
println!("Configuration value: {val:?}");

// Subscribe for configuration changes
let mut stream = client
.subscribe_configuration(CONFIGSTORE_NAME, vec![(&key)], None)
.subscribe_configuration(CONFIGSTORE_NAME, vec![&key], None)
.await?;

let mut subscription_id = String::new();
Expand Down
3 changes: 1 addition & 2 deletions examples/src/conversation/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use dapr::client::{ConversationInputBuilder, ConversationRequestBuilder};
use std::thread;
use std::time::Duration;

type DaprClient = dapr::Client<dapr::client::TonicClient>;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Sleep to allow for the server to become available
thread::sleep(Duration::from_secs(5));
tokio::time::sleep(Duration::from_secs(5)).await;

// Set the Dapr address
let address = "https://127.0.0.1".to_string();
Expand Down
4 changes: 2 additions & 2 deletions examples/src/invoke/grpc-proxying/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{thread, time::Duration};
use std::time::Duration;

use hello_world::{greeter_client::GreeterClient, HelloRequest};

Expand All @@ -11,7 +11,7 @@ pub mod hello_world {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Sleep to allow for the server to become available
thread::sleep(Duration::from_secs(5));
tokio::time::sleep(Duration::from_secs(5)).await;

// Get the Dapr port and create a connection
let port: u16 = std::env::var("DAPR_GRPC_PORT").unwrap().parse().unwrap();
Expand Down
4 changes: 2 additions & 2 deletions examples/src/invoke/grpc/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::hello_world::HelloReply;
use std::{thread, time::Duration};
use std::time::Duration;

use prost::Message;

Expand All @@ -12,7 +12,7 @@ type DaprClient = dapr::Client<dapr::client::TonicClient>;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Sleep to allow for the server to become available
thread::sleep(Duration::from_secs(5));
tokio::time::sleep(Duration::from_secs(5)).await;

// Set the Dapr address
let address = "https://127.0.0.1".to_string();
Expand Down
4 changes: 2 additions & 2 deletions examples/src/pubsub/publisher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, thread, time::Duration};
use std::{collections::HashMap, time::Duration};

use tokio::time;

Expand All @@ -21,7 +21,7 @@ struct Refund {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: Handle this issue in the sdk
// Introduce delay so that dapr grpc port is assigned before app tries to connect
thread::sleep(Duration::from_secs(2));
tokio::time::sleep(Duration::from_secs(2)).await;

// Set address for Dapr connection
let addr = "https://127.0.0.1".to_string();
Expand Down
2 changes: 1 addition & 1 deletion examples/src/query_state/query1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use serde_json::json;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Introduce delay so that dapr grpc port is assigned before app tries to connect
std::thread::sleep(std::time::Duration::new(5, 0));
tokio::time::sleep(std::time::Duration::new(5, 0)).await;

// Set the Dapr address and create a connection
let addr = "https://127.0.0.1".to_string();
Expand Down
2 changes: 1 addition & 1 deletion examples/src/query_state/query2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use serde_json::json;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Introduce delay so that dapr grpc port is assigned before app tries to connect
std::thread::sleep(std::time::Duration::new(5, 0));
tokio::time::sleep(std::time::Duration::new(5, 0)).await;

// Set the Dapr address and create a connection
let addr = "https://127.0.0.1".to_string();
Expand Down

0 comments on commit 8bf6013

Please sign in to comment.