Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions dapr/aio/clients/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def __init__(
] = None,
http_timeout_seconds: Optional[int] = None,
max_grpc_message_length: Optional[int] = None,
api_token: Optional[str] = None,
):
"""Connects to Dapr Runtime and via gRPC and HTTP.

Expand All @@ -79,8 +80,10 @@ def __init__(
http_timeout_seconds (int): specify a timeout for http connections
max_grpc_messsage_length (int, optional): The maximum grpc send and receive
message length in bytes.
api_token (str, optional): Dapr API token for authentication. If not provided,
falls back to DAPR_API_TOKEN environment variable.
"""
super().__init__(address, interceptors, max_grpc_message_length)
super().__init__(address, interceptors, max_grpc_message_length, api_token)
self.invocation_client = None

invocation_protocol = settings.DAPR_API_METHOD_INVOCATION_PROTOCOL.upper()
Expand All @@ -89,7 +92,9 @@ def __init__(
if http_timeout_seconds is None:
http_timeout_seconds = settings.DAPR_HTTP_TIMEOUT_SECONDS
self.invocation_client = DaprInvocationHttpClient(
headers_callback=headers_callback, timeout=http_timeout_seconds
headers_callback=headers_callback,
timeout=http_timeout_seconds,
api_token=api_token,
)
elif invocation_protocol == 'GRPC':
pass
Expand Down
12 changes: 9 additions & 3 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ def __init__(
] = None,
max_grpc_message_length: Optional[int] = None,
retry_policy: Optional[RetryPolicy] = None,
api_token: Optional[str] = None,
):
"""Connects to Dapr Runtime and initialize gRPC client stub.

Expand All @@ -152,8 +153,13 @@ def __init__(
StreamStreamClientInterceptor, optional): gRPC interceptors.
max_grpc_message_length (int, optional): The maximum grpc send and receive
message length in bytes.
api_token (str, optional): Dapr API token for authentication. If not provided,
falls back to DAPR_API_TOKEN environment variable.
"""
DaprHealth.wait_until_ready()
# Use explicit token if provided, otherwise fall back to global setting
api_token = api_token if api_token is not None else settings.DAPR_API_TOKEN

DaprHealth.wait_until_ready(api_token=api_token)
self.retry_policy = retry_policy or RetryPolicy()

useragent = f'dapr-sdk-python/{__version__}'
Expand Down Expand Up @@ -184,10 +190,10 @@ def __init__(
else:
interceptors.append(DaprClientTimeoutInterceptorAsync())

if settings.DAPR_API_TOKEN:
if api_token is not None:
api_token_interceptor = DaprClientInterceptorAsync(
[
('dapr-api-token', settings.DAPR_API_TOKEN),
('dapr-api-token', api_token),
]
)
interceptors.append(api_token_interceptor)
Expand Down
7 changes: 5 additions & 2 deletions dapr/clients/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def __init__(
http_timeout_seconds: Optional[int] = None,
max_grpc_message_length: Optional[int] = None,
retry_policy: Optional[RetryPolicy] = None,
api_token: Optional[str] = None,
):
"""Connects to Dapr Runtime via gRPC and HTTP.

Expand All @@ -84,8 +85,10 @@ def __init__(
http_timeout_seconds (int): specify a timeout for http connections
max_grpc_message_length (int, optional): The maximum grpc send and receive
message length in bytes.
api_token (str, optional): Dapr API token for authentication. If not provided,
falls back to DAPR_API_TOKEN environment variable.
"""
super().__init__(address, interceptors, max_grpc_message_length, retry_policy)
super().__init__(address, interceptors, max_grpc_message_length, retry_policy, api_token)
self.invocation_client = None

invocation_protocol = settings.DAPR_API_METHOD_INVOCATION_PROTOCOL.upper()
Expand All @@ -94,7 +97,7 @@ def __init__(
if http_timeout_seconds is None:
http_timeout_seconds = settings.DAPR_HTTP_TIMEOUT_SECONDS
self.invocation_client = DaprInvocationHttpClient(
headers_callback=headers_callback, timeout=http_timeout_seconds
headers_callback=headers_callback, timeout=http_timeout_seconds, api_token=api_token
)
elif invocation_protocol == 'GRPC':
pass
Expand Down
12 changes: 9 additions & 3 deletions dapr/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ def __init__(
] = None,
max_grpc_message_length: Optional[int] = None,
retry_policy: Optional[RetryPolicy] = None,
api_token: Optional[str] = None,
):
"""Connects to Dapr Runtime and initializes gRPC client stub.

Expand All @@ -144,8 +145,13 @@ def __init__(
max_grpc_message_length (int, optional): The maximum grpc send and receive
message length in bytes.
retry_policy (RetryPolicy optional): Specifies retry behaviour
api_token (str, optional): Dapr API token for authentication. If not provided,
falls back to DAPR_API_TOKEN environment variable.
"""
DaprHealth.wait_until_ready()
# Use explicit token if provided, otherwise fall back to global setting
api_token = api_token if api_token is not None else settings.DAPR_API_TOKEN

DaprHealth.wait_until_ready(api_token=api_token)
self.retry_policy = retry_policy or RetryPolicy()

useragent = f'dapr-sdk-python/{__version__}'
Expand Down Expand Up @@ -184,10 +190,10 @@ def __init__(

self._channel = grpc.intercept_channel(self._channel, DaprClientTimeoutInterceptor()) # type: ignore

if settings.DAPR_API_TOKEN:
if api_token is not None:
api_token_interceptor = DaprClientInterceptor(
[
('dapr-api-token', settings.DAPR_API_TOKEN),
('dapr-api-token', api_token),
]
)
self._channel = grpc.intercept_channel( # type: ignore
Expand Down
15 changes: 9 additions & 6 deletions dapr/clients/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,25 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
import urllib.request
import urllib.error
import time
import urllib.error
import urllib.request

from typing import Optional

from dapr.clients.http.conf import DAPR_API_TOKEN_HEADER, USER_AGENT_HEADER, DAPR_USER_AGENT
from dapr.clients.http.conf import DAPR_API_TOKEN_HEADER, DAPR_USER_AGENT, USER_AGENT_HEADER
from dapr.clients.http.helpers import get_api_url
from dapr.conf import settings


class DaprHealth:
@staticmethod
def wait_until_ready():
def wait_until_ready(api_token: Optional[str] = None):
health_url = f'{get_api_url()}/healthz/outbound'
headers = {USER_AGENT_HEADER: DAPR_USER_AGENT}
if settings.DAPR_API_TOKEN is not None:
headers[DAPR_API_TOKEN_HEADER] = settings.DAPR_API_TOKEN
token = api_token if api_token is not None else settings.DAPR_API_TOKEN
if token is not None:
headers[DAPR_API_TOKEN_HEADER] = token
timeout = float(settings.DAPR_HEALTH_TIMEOUT)

start = time.time()
Expand Down
13 changes: 10 additions & 3 deletions dapr/clients/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,21 @@ def __init__(
timeout: Optional[int] = 60,
headers_callback: Optional[Callable[[], Dict[str, str]]] = None,
retry_policy: Optional[RetryPolicy] = None,
api_token: Optional[str] = None,
):
"""Invokes Dapr over HTTP.

Args:
message_serializer (Serializer): Dapr serializer.
timeout (int, optional): Timeout in seconds, defaults to 60.
headers_callback (lambda: Dict[str, str]], optional): Generates header for each request.
api_token (str, optional): Dapr API token for authentication. If not provided,
falls back to DAPR_API_TOKEN environment variable.
"""
DaprHealth.wait_until_ready()
# use provided api_token if any or fallback to env var DAPR_API_TOKEN
self._api_token = api_token if api_token is not None else settings.DAPR_API_TOKEN

DaprHealth.wait_until_ready(api_token=self._api_token)

self._timeout = aiohttp.ClientTimeout(total=timeout)
self._serializer = message_serializer
Expand All @@ -71,8 +77,9 @@ async def send_bytes(
if not headers_map.get(CONTENT_TYPE_HEADER):
headers_map[CONTENT_TYPE_HEADER] = DEFAULT_JSON_CONTENT_TYPE

if settings.DAPR_API_TOKEN is not None:
headers_map[DAPR_API_TOKEN_HEADER] = settings.DAPR_API_TOKEN
# Use explicit token if provided, otherwise fall back to global setting
if self._api_token is not None:
headers_map[DAPR_API_TOKEN_HEADER] = self._api_token

if self._headers_callback is not None:
trace_headers = self._headers_callback()
Expand Down
7 changes: 6 additions & 1 deletion dapr/clients/http/dapr_actor_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(
timeout: int = 60,
headers_callback: Optional[Callable[[], Dict[str, str]]] = None,
retry_policy: Optional[RetryPolicy] = None,
api_token: Optional[str] = None,
):
"""Invokes Dapr Actors over HTTP.

Expand All @@ -44,8 +45,12 @@ def __init__(
timeout (int, optional): Timeout in seconds, defaults to 60.
headers_callback (lambda: Dict[str, str]], optional): Generates header for each request.
retry_policy (RetryPolicy optional): Specifies retry behaviour
api_token (str, optional): Dapr API token for authentication. If not provided,
falls back to DAPR_API_TOKEN environment variable.
"""
self._client = DaprHttpClient(message_serializer, timeout, headers_callback, retry_policy)
self._client = DaprHttpClient(
message_serializer, timeout, headers_callback, retry_policy, api_token
)

async def invoke_method(
self, actor_type: str, actor_id: str, method: str, data: Optional[bytes] = None
Expand Down
9 changes: 8 additions & 1 deletion dapr/clients/http/dapr_invocation_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,23 @@ def __init__(
timeout: int = 60,
headers_callback: Optional[Callable[[], Dict[str, str]]] = None,
retry_policy: Optional[RetryPolicy] = None,
api_token: Optional[str] = None,
):
"""Invokes Dapr's API for method invocation over HTTP.

Args:
timeout (int, optional): Timeout in seconds, defaults to 60.
headers_callback (lambda: Dict[str, str]], optional): Generates header for each request.
retry_policy (RetryPolicy optional): Specifies retry behaviour
api_token (str, optional): Dapr API token for authentication. If not provided,
falls back to DAPR_API_TOKEN environment variable.
"""
self._client = DaprHttpClient(
DefaultJSONSerializer(), timeout, headers_callback, retry_policy=retry_policy
DefaultJSONSerializer(),
timeout,
headers_callback,
retry_policy=retry_policy,
api_token=api_token,
)

async def invoke_method_async(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def __init__(
host: Optional[str] = None,
port: Optional[str] = None,
logger_options: Optional[LoggerOptions] = None,
api_token: Optional[str] = None,
):
address = getAddress(host, port)

Expand All @@ -63,8 +64,9 @@ def __init__(
self._logger = Logger('DaprWorkflowClient', logger_options)

metadata = tuple()
if settings.DAPR_API_TOKEN:
metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),)
api_token = api_token if api_token is not None else settings.DAPR_API_TOKEN
if api_token is not None:
metadata = ((DAPR_API_TOKEN_HEADER, api_token),)
options = self._logger.get_options()
self.__obj = client.TaskHubGrpcClient(
host_address=uri.endpoint,
Expand Down
6 changes: 4 additions & 2 deletions ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ def __init__(
host: Optional[str] = None,
port: Optional[str] = None,
logger_options: Optional[LoggerOptions] = None,
api_token: Optional[str] = None,
):
self._logger = Logger('WorkflowRuntime', logger_options)
metadata = tuple()
if settings.DAPR_API_TOKEN:
metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),)
api_token = api_token if api_token is not None else settings.DAPR_API_TOKEN
if api_token is not None:
metadata = ((DAPR_API_TOKEN_HEADER, api_token),)
address = getAddress(host, port)

try:
Expand Down
Loading