Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions examples/invoke-aio/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
FROM python:3.9-slim

WORKDIR /app

ADD requirements.txt .
RUN pip install -r requirements.txt

COPY *.py /app/

CMD [ "python", "invoke-receiver.py" ]
118 changes: 118 additions & 0 deletions examples/invoke-aio/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# Example - Invoke a service

This example utilizes a receiver and a caller for the OnInvoke / Invoke functionality. It will create an aio gRPC server and bind the OnInvoke method to an async function, which gets called after a client sends a direct method invocation.

> **Note:** Make sure to use the latest proto bindings

## Pre-requisites

- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started)
- [Install Python 3.9+](https://www.python.org/downloads/)

## Install Dapr python-SDK

<!-- Our CI/CD pipeline automatically installs the correct version, so we can skip this step in the automation -->

```bash
pip3 install dapr dapr-ext-grpc
```

## Running in self-hosted mode

Run the following command in a terminal/command-prompt:

<!-- STEP
name: Run receiver
expected_stdout_lines:
- '== APP == {"id": 1, "message": "hello world"}'
- '== APP == {"id": 1, "message": "hello world"}'
background: true
sleep: 5
-->

```bash
# 1. Start Receiver (expose gRPC server receiver on port 50051)
dapr run --app-id invoke-receiver --app-protocol grpc --app-port 50051 python3 invoke-receiver.py
```

<!-- END_STEP -->

In another terminal/command prompt run:

<!-- STEP
name: Run caller
expected_stdout_lines:
- '== APP == text/plain'
- '== APP == INVOKE_RECEIVED'
- '== APP == text/plain'
- '== APP == INVOKE_RECEIVED'
background: true
sleep: 5
-->

```bash
# 2. Start Caller
dapr run --app-id invoke-caller --app-protocol grpc --dapr-http-port 3500 python3 invoke-caller.py
```

<!-- END_STEP -->

## Cleanup

<!-- STEP
expected_stdout_lines:
- '✅ app stopped successfully: invoke-caller'
- '✅ app stopped successfully: invoke-receiver'
name: Shutdown dapr
-->

```bash
dapr stop --app-id invoke-caller
dapr stop --app-id invoke-receiver
```

<!-- END_STEP -->

## Running in Kubernetes mode

1. Build docker image

```
docker build -t [your registry]/invokesimple:latest .
```

2. Push docker image

```
docker push [your registry]/invokesimple:latest
```

3. Edit image name to `[your registry]/invokesimple:latest` in deploy/*.yaml

4. Deploy applications

```
kubectl apply -f ./deploy/
```

5. See logs for the apps and sidecars

Logs for caller sidecar:
```
dapr logs -a invoke-caller -k
```

Logs for caller app:
```
kubectl logs -l app="invokecaller" -c invokecaller
```

Logs for receiver sidecar:
```
dapr logs -a invoke-receiver -k
```

Logs for receiver app:
```
kubectl logs -l app="invokereceiver" -c invokereceiver
```
37 changes: 37 additions & 0 deletions examples/invoke-aio/deploy/invoke-caller.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Copyright 2025 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: apps/v1
kind: Deployment
metadata:
name: invokecaller
labels:
app: invokecaller
spec:
replicas: 1
selector:
matchLabels:
app: invokecaller
template:
metadata:
labels:
app: invokecaller
annotations:
dapr.io/enabled: "true"
dapr.io/app-id: "invoke-caller"
dapr.io/app-protocol: "grpc"
spec:
containers:
- name: invokecaller
image: invokesimple:latest # EDIT HERE: Replace the image name
command: ["python"]
args: ["/app/invoke-caller.py"]
imagePullPolicy: Always
40 changes: 40 additions & 0 deletions examples/invoke-aio/deploy/invoke-receiver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright 2025 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: apps/v1
kind: Deployment
metadata:
name: invokereceiver
labels:
app: invokereceiver
spec:
replicas: 1
selector:
matchLabels:
app: invokereceiver
template:
metadata:
labels:
app: invokereceiver
annotations:
dapr.io/enabled: "true"
dapr.io/app-id: "invoke-receiver"
dapr.io/app-protocol: "grpc"
dapr.io/app-port: "50051"
spec:
containers:
- name: invokereceiver
image: invokesimple:latest # EDIT HERE: Replace the image name
command: ["python"]
args: ["/app/invoke-receiver.py"]
ports:
- containerPort: 3000
imagePullPolicy: Always
22 changes: 22 additions & 0 deletions examples/invoke-aio/invoke-caller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import json
import time

from dapr.clients import DaprClient

with DaprClient() as d:
req_data = {'id': 1, 'message': 'hello world'}

while True:
# Create a typed message with content type and body
resp = d.invoke_method(
'invoke-receiver',
'my-method',
data=json.dumps(req_data),
)

# Print the response
print(resp.content_type, flush=True)
print(resp.text(), flush=True)
print(str(resp.status_code), flush=True)

time.sleep(2)
15 changes: 15 additions & 0 deletions examples/invoke-aio/invoke-receiver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import asyncio
from dapr.ext.grpc.aio import App, InvokeMethodRequest, InvokeMethodResponse

app = App()


@app.method(name='my-method')
async def mymethod(request: InvokeMethodRequest) -> InvokeMethodResponse:
print(request.metadata, flush=True)
print(request.text(), flush=True)

return InvokeMethodResponse(b'INVOKE_RECEIVED', 'text/plain; charset=UTF-8')


asyncio.run(app.run(50051))
2 changes: 2 additions & 0 deletions examples/invoke-aio/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
dapr-ext-grpc-dev >= 1.15.0.dev
dapr-dev >= 1.15.0.dev
35 changes: 35 additions & 0 deletions ext/dapr-ext-grpc/dapr/ext/grpc/aio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-

"""
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from dapr.clients.grpc._request import InvokeMethodRequest, BindingRequest, JobEvent
from dapr.clients.grpc._response import InvokeMethodResponse, TopicEventResponse
from dapr.clients.grpc._jobs import Job, FailurePolicy, DropFailurePolicy, ConstantFailurePolicy

from dapr.ext.grpc.aio.app import App, Rule # type:ignore


__all__ = [
'App',
'Rule',
'InvokeMethodRequest',
'InvokeMethodResponse',
'BindingRequest',
'TopicEventResponse',
'Job',
'JobEvent',
'FailurePolicy',
'DropFailurePolicy',
'ConstantFailurePolicy',
]
32 changes: 32 additions & 0 deletions ext/dapr-ext-grpc/dapr/ext/grpc/aio/_health_servicer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import grpc
from typing import Callable, Optional

from dapr.proto import appcallback_service_v1
from dapr.proto.runtime.v1.appcallback_pb2 import HealthCheckResponse

HealthCheckCallable = Optional[Callable[[], None]]


class _AioHealthCheckServicer(appcallback_service_v1.AppCallbackHealthCheckServicer):
"""The implementation of HealthCheck Server.

:class:`App` provides useful decorators to register method, topic, input bindings.
"""

def __init__(self):
self._health_check_cb: Optional[HealthCheckCallable] = None

def register_health_check(self, cb: HealthCheckCallable) -> None:
if not cb:
raise ValueError('health check callback must be defined')
self._health_check_cb = cb

async def HealthCheck(self, request, context):
"""Health check."""

if not self._health_check_cb:
context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
await self._health_check_cb()
return HealthCheckResponse()
Loading