Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add trusted issuers for request token verification #488

Merged
merged 1 commit into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions pybotx/bot/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
)
Expand Down Expand Up @@ -302,12 +303,13 @@ def async_execute_raw_bot_command(
verify_request: bool = True,
request_headers: Optional[Mapping[str, str]] = None,
logging_command: bool = True,
trusted_issuers: Optional[Set[str]] = None,
) -> None:
if logging_command:
log_incoming_request(raw_bot_command, message="Got command: ")

if verify_request:
self._verify_request(request_headers)
self._verify_request(request_headers, trusted_issuers=trusted_issuers)

try:
bot_api_command: BotAPICommand = parse_obj_as(
Expand Down Expand Up @@ -336,6 +338,7 @@ async def sync_execute_raw_smartapp_event(
verify_request: bool = True,
request_headers: Optional[Mapping[str, str]] = None,
logging_command: bool = True,
trusted_issuers: Optional[Set[str]] = None,
) -> BotAPISyncSmartAppEventResponse:
if logging_command:
log_incoming_request(
Expand All @@ -344,7 +347,7 @@ async def sync_execute_raw_smartapp_event(
)

if verify_request:
self._verify_request(request_headers)
self._verify_request(request_headers, trusted_issuers=trusted_issuers)

try:
bot_api_smartapp_event: BotAPISyncSmartAppEvent = parse_obj_as(
Expand Down Expand Up @@ -374,16 +377,15 @@ async def raw_get_status(
query_params: Dict[str, str],
verify_request: bool = True,
request_headers: Optional[Mapping[str, str]] = None,
trusted_issuers: Optional[Set[str]] = None,
) -> Dict[str, Any]:
logger.opt(lazy=True).debug(
"Got status: {status}",
status=lambda: pformat_jsonable_obj(query_params),
)

if verify_request:
if request_headers is None:
raise RequestHeadersNotProvidedError
self._verify_request(request_headers)
self._verify_request(request_headers, trusted_issuers=trusted_issuers)

try:
bot_api_status_recipient = BotAPIStatusRecipient.parse_obj(query_params)
Expand All @@ -406,13 +408,12 @@ async def set_raw_botx_method_result(
raw_botx_method_result: Dict[str, Any],
verify_request: bool = True,
request_headers: Optional[Mapping[str, str]] = None,
trusted_issuers: Optional[Set[str]] = None,
) -> None:
logger.debug("Got callback: {callback}", callback=raw_botx_method_result)

if verify_request:
if request_headers is None:
raise RequestHeadersNotProvidedError
self._verify_request(request_headers)
self._verify_request(request_headers, trusted_issuers=trusted_issuers)

callback: BotXMethodCallback = parse_obj_as(
# Same ignore as in pydantic
Expand Down Expand Up @@ -2068,6 +2069,8 @@ async def collect_metric(
def _verify_request( # noqa: WPS231, WPS238
self,
headers: Optional[Mapping[str, str]],
*,
trusted_issuers: Optional[Set[str]] = None,
) -> None:
if headers is None:
raise RequestHeadersNotProvidedError
Expand Down Expand Up @@ -2108,11 +2111,20 @@ def _verify_request( # noqa: WPS231, WPS238
leeway=1,
options={
"verify_aud": False,
"verify_iss": False,
},
)
except jwt.InvalidTokenError as exc:
raise UnverifiedRequestError(exc.args[0]) from exc

issuer = token_payload.get("iss")
if issuer is None:
raise UnverifiedRequestError('Token is missing the "iss" claim')

if issuer != bot_account.host:
if not trusted_issuers or issuer not in trusted_issuers:
raise UnverifiedRequestError("Invalid issuer")

@staticmethod
def _build_main_collector(
collectors: Sequence[HandlerCollector],
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pybotx"
version = "0.72.0"
version = "0.73.0"
description = "A python library for interacting with eXpress BotX API"
authors = [
"Sidnev Nikolay <[email protected]>",
Expand Down
123 changes: 122 additions & 1 deletion tests/test_verify_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,85 @@ async def test__verify_request__invalid_issuer(
assert "Invalid issuer" in str(exc.value)


async def test__verify_request__trusted_issuers_have_token_issuer(
bot_account: BotAccountWithSecret,
authorization_token_payload: Dict[str, Any],
) -> None:
# - Arrange -
collector = HandlerCollector()
built_bot = Bot(collectors=[collector], bot_accounts=[bot_account])
token_issuer = "another.example.com"
authorization_token_payload["iss"] = token_issuer
token = jwt.encode(
payload=authorization_token_payload,
key=bot_account.secret_key,
)

# - Act -
async with lifespan_wrapper(built_bot) as bot:
bot._verify_request(
{"authorization": f"Bearer {token}"},
trusted_issuers={token_issuer},
)


async def test__verify_request__trusted_issuers_have_not_token_issuer(
bot_account: BotAccountWithSecret,
authorization_token_payload: Dict[str, Any],
) -> None:
# - Arrange -
collector = HandlerCollector()
built_bot = Bot(collectors=[collector], bot_accounts=[bot_account])
authorization_token_payload["iss"] = "another.example.com"
token = jwt.encode(
payload=authorization_token_payload,
key=bot_account.secret_key,
)

# - Act -
async with lifespan_wrapper(built_bot) as bot:
with pytest.raises(UnverifiedRequestError) as exc:
bot._verify_request(
{"authorization": f"Bearer {token}"},
trusted_issuers={"another-another.example.com"},
)

# - Assert -
assert "Invalid issuer" in str(exc.value)


async def test__verify_request__token_issuer_is_missed(
bot_account: BotAccountWithSecret,
authorization_token_payload: Dict[str, Any],
) -> None:
# - Arrange -
collector = HandlerCollector()
built_bot = Bot(collectors=[collector], bot_accounts=[bot_account])
del authorization_token_payload["iss"]
token = jwt.encode(
payload=authorization_token_payload,
key=bot_account.secret_key,
)

# - Act -
async with lifespan_wrapper(built_bot) as bot:
with pytest.raises(UnverifiedRequestError) as exc:
bot._verify_request(
{"authorization": f"Bearer {token}"},
)

# - Assert -
assert 'Token is missing the "iss" claim' in str(exc.value)


@pytest.mark.parametrize(
"target_func_name",
("async_execute_raw_bot_command", "raw_get_status", "set_raw_botx_method_result"),
(
"async_execute_raw_bot_command",
"sync_execute_raw_smartapp_event",
"raw_get_status",
"set_raw_botx_method_result",
),
)
async def test__verify_request__without_headers(
api_incoming_message_factory: Callable[..., Dict[str, Any]],
Expand Down Expand Up @@ -311,6 +387,31 @@ async def test__async_execute_raw_bot_command__verify_request__called(
bot._verify_request.assert_called()


async def test__sync_execute_raw_smartapp_event__verify_request__called(
api_sync_smartapp_event_factory: Callable[..., Dict[str, Any]],
collector_with_sync_smartapp_event_handler: HandlerCollector,
bot_account: BotAccountWithSecret,
) -> None:
# - Arrange -
built_bot = Bot(
collectors=[collector_with_sync_smartapp_event_handler],
bot_accounts=[bot_account],
)
payload = api_sync_smartapp_event_factory(bot_id=bot_account.id)

# - Act -
async with lifespan_wrapper(built_bot) as bot:
bot._verify_request = Mock() # type: ignore
await bot.sync_execute_raw_smartapp_event(
payload,
verify_request=True,
request_headers={},
)

# - Assert -
bot._verify_request.assert_called()


async def test__raw_get_status__verify_request__called(
api_incoming_message_factory: Callable[..., Dict[str, Any]],
bot_account: BotAccountWithSecret,
Expand Down Expand Up @@ -384,6 +485,26 @@ async def test__async_execute_raw_bot_command__verify_request__not_called(
bot.async_execute_bot_command.assert_called()


async def test__sync_execute_raw_smartapp_event__verify_request__not_called(
api_incoming_message_factory: Callable[..., Dict[str, Any]],
bot_account: BotAccountWithSecret,
) -> None:
# - Arrange -
collector = HandlerCollector()
built_bot = Bot(collectors=[collector], bot_accounts=[bot_account])
payload = api_incoming_message_factory()

# - Act -
async with lifespan_wrapper(built_bot) as bot:
bot._verify_request = Mock() # type: ignore
bot.sync_execute_raw_smartapp_event = Mock() # type: ignore
bot.sync_execute_raw_smartapp_event(payload, verify_request=False)

# - Assert -
bot._verify_request.assert_not_called()
bot.sync_execute_raw_smartapp_event.assert_called()


async def test__raw_get_status__verify_request__not_called(
api_incoming_message_factory: Callable[..., Dict[str, Any]],
bot_account: BotAccountWithSecret,
Expand Down
Loading