Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add jwt generation #248

Merged
merged 4 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 3.9.2
current_version = 3.10.0
commit = True
tag = True

Expand Down
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# Release 3.10.0
- Add new `max_bitrate` option for archives
- Change to create JWTs by default in the `Client.generate_token` method. T1 tokens can still be created by setting `use_jwt=False` when generating a token.

# Release 3.9.2
- Migrate from using `python-jose` with native-python cryptographic backend to the `pyjwt` package

Expand Down
52 changes: 40 additions & 12 deletions opentok/opentok.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import time # generate_token
import hmac # _sign_string
import hashlib
from typing import List # use for type hinting
from typing import List
import requests # create_session, archiving
import json # archiving
import platform # user-agent
Expand Down Expand Up @@ -174,6 +174,7 @@ def generate_token(
expire_time=None,
data=None,
initial_layout_class_list=[],
use_jwt=True,
):
"""
Generates a token for a given session.
Expand Down Expand Up @@ -212,6 +213,9 @@ def generate_token(
`live streaming broadcasts <https://tokbox.com/developer/guides/broadcast/#live-streaming>`_ and
`composed archives <https://tokbox.com/developer/guides/archiving/layout-control.html>`_

:param bool use_jwt: Whether to use JWT tokens or not. If set to False, the token will be a
plain text token. If set to True (the default), the token will be a JWT.

:rtype:
The token string.
"""
Expand Down Expand Up @@ -287,7 +291,7 @@ def generate_token(
try:
decoded_session_id = base64.b64decode(sub_session_id_bytes_padded, b("-_"))
parts = decoded_session_id.decode("utf-8").split(u("~"))
except Exception as e:
except Exception:
raise OpenTokException(
u("Cannot generate token, the session_id {0} was not valid").format(
session_id
Expand All @@ -300,6 +304,29 @@ def generate_token(
).format(session_id, self.api_key)
)

if use_jwt:
payload = {}
payload['iss'] = self.api_key
payload['ist'] = 'project'
payload['iat'] = now
payload["exp"] = expire_time
payload['nonce'] = random.randint(0, 999999)
payload['role'] = role.value
payload['scope'] = 'session.connect'
payload['session_id'] = session_id
if initial_layout_class_list:
payload['initial_layout_class_list'] = (
initial_layout_class_list_serialized
)
if data:
payload['connection_data'] = data

headers = {'alg': 'HS256', 'typ': 'JWT'}

token = encode(payload, self.api_secret, algorithm="HS256", headers=headers)

return token

data_params = dict(
session_id=session_id,
create_time=now,
Expand All @@ -322,6 +349,7 @@ def generate_token(
sentinal=self.TOKEN_SENTINEL,
base64_data=base64.b64encode(decoded_base64_bytes).decode(),
)

return token

def create_session(
Expand Down Expand Up @@ -470,7 +498,7 @@ def create_session(
try:
logger.debug(
"POST to %r with params %r, headers %r, proxies %r",
self.endpoints.session_url(),
self.endpoints.get_session_url(),
options,
self.get_headers(),
self.proxies,
Expand Down Expand Up @@ -654,7 +682,7 @@ def start_archive(

logger.debug(
"POST to %r with params %r, headers %r, proxies %r",
self.endpoints.archive_url(),
self.endpoints.get_archive_url(),
json.dumps(payload),
self.get_json_headers(),
self.proxies,
Expand Down Expand Up @@ -701,7 +729,7 @@ def stop_archive(self, archive_id):
"""
logger.debug(
"POST to %r with headers %r, proxies %r",
self.endpoints.archive_url(archive_id) + "/stop",
self.endpoints.get_archive_url(archive_id) + "/stop",
self.get_json_headers(),
self.proxies,
)
Expand Down Expand Up @@ -736,7 +764,7 @@ def delete_archive(self, archive_id):
"""
logger.debug(
"DELETE to %r with headers %r, proxies %r",
self.endpoints.archive_url(archive_id),
self.endpoints.get_archive_url(archive_id),
self.get_json_headers(),
self.proxies,
)
Expand Down Expand Up @@ -766,7 +794,7 @@ def get_archive(self, archive_id):
"""
logger.debug(
"GET to %r with headers %r, proxies %r",
self.endpoints.archive_url(archive_id),
self.endpoints.get_archive_url(archive_id),
self.get_json_headers(),
self.proxies,
)
Expand Down Expand Up @@ -959,7 +987,7 @@ def send_signal(self, session_id, payload, connection_id=None):
"""
logger.debug(
"POST to %r with params %r, headers %r, proxies %r",
self.endpoints.signaling_url(session_id, connection_id),
self.endpoints.get_signaling_url(session_id, connection_id),
json.dumps(payload),
self.get_json_headers(),
self.proxies,
Expand Down Expand Up @@ -1456,7 +1484,7 @@ def start_broadcast(self, session_id, options, stream_mode=BroadcastStreamModes.

payload.update(options)

endpoint = self.endpoints.broadcast_url()
endpoint = self.endpoints.get_broadcast_url()

logger.debug(
"POST to %r with params %r, headers %r, proxies %r",
Expand Down Expand Up @@ -1500,7 +1528,7 @@ def stop_broadcast(self, broadcast_id):
projectId, createdAt, updatedAt and resolution
"""

endpoint = self.endpoints.broadcast_url(broadcast_id, stop=True)
endpoint = self.endpoints.get_broadcast_url(broadcast_id, stop=True)

logger.debug(
"POST to %r with headers %r, proxies %r",
Expand Down Expand Up @@ -1639,7 +1667,7 @@ def get_broadcast(self, broadcast_id):
projectId, createdAt, updatedAt, resolution, broadcastUrls and status
"""

endpoint = self.endpoints.broadcast_url(broadcast_id)
endpoint = self.endpoints.get_broadcast_url(broadcast_id)

logger.debug(
"GET to %r with headers %r, proxies %r",
Expand Down Expand Up @@ -1697,7 +1725,7 @@ def set_broadcast_layout(
if stylesheet is not None:
payload["stylesheet"] = stylesheet

endpoint = self.endpoints.broadcast_url(broadcast_id, layout=True)
endpoint = self.endpoints.get_broadcast_url(broadcast_id, layout=True)

logger.debug(
"PUT to %r with params %r, headers %r, proxies %r",
Expand Down
2 changes: 1 addition & 1 deletion opentok/version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# see: http://legacy.python.org/dev/peps/pep-0440/#public-version-identifiers
__version__ = "3.9.2"
__version__ = "3.10.0"

6 changes: 2 additions & 4 deletions sample/HelloWorld/helloworld.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
def hello():
key = api_key
session_id = session.session_id
token = opentok.generate_token(session_id)
return render_template(
"index.html", api_key=key, session_id=session_id, token=token
)
token = opentok.generate_token(session_id, use_jwt=True)
return render_template("index.html", api_key=key, session_id=session_id, token=token)


if __name__ == "__main__":
Expand Down
38 changes: 22 additions & 16 deletions tests/helpers.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,32 @@
from six import text_type, u, b, PY3
from six import u, PY3
from six.moves.urllib.parse import parse_qs
import base64
import hmac
import hashlib
from jwt import decode


def token_decoder(token):
def token_decoder(token: str, secret: str = None):
token_data = {}
# remove sentinal
encoded = token[4:]
decoded = base64.b64decode(encoded.encode("utf-8"))
# decode the bytes object back to unicode with utf-8 encoding
if PY3:
decoded = decoded.decode()
parts = decoded.split(u(":"))
for decoded_part in iter(parts):
token_data.update(parse_qs(decoded_part))
# TODO: probably a more elegent way
for k in iter(token_data):
token_data[k] = token_data[k][0]
token_data[u("data_string")] = parts[1]
return token_data
if token.startswith("T1=="):
encoded = token[4:]

# decode the token from base64
decoded = base64.b64decode(encoded.encode("utf-8"))
# decode the bytes object back to unicode with utf-8 encoding
if PY3:
decoded = decoded.decode()
parts = decoded.split(u(":"))
for decoded_part in iter(parts):
token_data.update(parse_qs(decoded_part))
# TODO: probably a more elegant way
for k in iter(token_data):
token_data[k] = token_data[k][0]
token_data[u("data_string")] = parts[1]
return token_data

encoded = token.replace('Bearer ', '').strip()
return decode(encoded, secret, algorithms='HS256')


def token_signature_validator(token, secret):
Expand Down
1 change: 1 addition & 0 deletions tests/test_http_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def setUp(self):
def tearDown(self):
httpretty.disable()

@pytest.mark.filterwarnings("ignore::pytest.PytestUnhandledThreadExceptionWarning")
def test_timeout(self):
with pytest.raises(OpenTokException):
opentok = Client(self.api_key, self.api_secret, timeout=1)
Expand Down
12 changes: 5 additions & 7 deletions tests/test_session.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import unittest
from six import text_type, u, b, PY2, PY3
from six import text_type, u

from opentok import Client, Session, Roles, MediaModes
from .helpers import token_decoder, token_signature_validator
from .helpers import token_decoder


class SessionTest(unittest.TestCase):
Expand All @@ -20,15 +20,13 @@ def test_generate_token(self):
)
token = session.generate_token()
assert isinstance(token, text_type)
assert token_decoder(token)[u("session_id")] == self.session_id
assert token_signature_validator(token, self.api_secret)
assert token_decoder(token, self.api_secret)[u("session_id")] == self.session_id

def test_generate_role_token(self):
session = Session(
self.opentok, self.session_id, media_mode=MediaModes.routed, location=None
)
token = session.generate_token(role=Roles.moderator)
assert isinstance(token, text_type)
assert token_decoder(token)[u("session_id")] == self.session_id
assert token_decoder(token)[u("role")] == u("moderator")
assert token_signature_validator(token, self.api_secret)
assert token_decoder(token, self.api_secret)[u("session_id")] == self.session_id
assert token_decoder(token, self.api_secret)[u("role")] == u("moderator")
52 changes: 23 additions & 29 deletions tests/test_token_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,62 +20,56 @@ def setUp(self):
)
self.opentok = Client(self.api_key, self.api_secret)

def test_generate_plain_token(self):
token = self.opentok.generate_token(self.session_id)
def test_generate_plain_token_t1(self):
token = self.opentok.generate_token(self.session_id, use_jwt=False)
assert isinstance(token, text_type)
assert token_decoder(token)[u("session_id")] == self.session_id
assert token_signature_validator(token, self.api_secret)

def test_generate_plain_token_jwt(self):
token = self.opentok.generate_token(self.session_id)
assert isinstance(token, text_type)
assert token_decoder(token, self.api_secret)[u("session_id")] == self.session_id

def test_generate_role_token(self):
token = self.opentok.generate_token(self.session_id, Roles.moderator)
assert isinstance(token, text_type)
assert token_decoder(token)[u("role")] == Roles.moderator.value
assert token_signature_validator(token, self.api_secret)
assert token_decoder(token, self.api_secret)[u("role")] == Roles.moderator.value

token = self.opentok.generate_token(self.session_id, role=Roles.moderator)
assert isinstance(token, text_type)
assert token_decoder(token)[u("role")] == Roles.moderator.value
assert token_signature_validator(token, self.api_secret)
assert token_decoder(token, self.api_secret)[u("role")] == Roles.moderator.value

token = self.opentok.generate_token(self.session_id, Roles.publisher_only)
assert token_decoder(token)["role"] == Roles.publisher_only.value
assert token_signature_validator(token, self.api_secret)
assert token_decoder(token, self.api_secret)["role"] == Roles.publisher_only.value

def test_generate_expires_token(self):
# an integer is a valid argument
expire_time = int(time.time()) + 100
token = self.opentok.generate_token(self.session_id, expire_time=expire_time)
assert isinstance(token, text_type)
assert token_decoder(token)[u("expire_time")] == text_type(expire_time)
assert token_signature_validator(token, self.api_secret)
print(token_decoder(token, self.api_secret))
assert token_decoder(token, self.api_secret)[u("exp")] == expire_time
# anything that can be coerced into an integer is also valid
expire_time = text_type(int(time.time()) + 100)
token = self.opentok.generate_token(self.session_id, expire_time=expire_time)
assert isinstance(token, text_type)
assert token_decoder(token)[u("expire_time")] == expire_time
assert token_signature_validator(token, self.api_secret)
assert token_decoder(token, self.api_secret)[u("exp")] == int(expire_time)
# a datetime object is also valid
if PY2:
expire_time = datetime.datetime.fromtimestamp(
time.time(), pytz.UTC
) + datetime.timedelta(days=1)
if PY3:
expire_time = datetime.datetime.fromtimestamp(
time.time(), datetime.timezone.utc
) + datetime.timedelta(days=1)
expire_time = datetime.datetime.fromtimestamp(
time.time(), datetime.timezone.utc
) + datetime.timedelta(days=1)
token = self.opentok.generate_token(self.session_id, expire_time=expire_time)
assert isinstance(token, text_type)
assert token_decoder(token)[u("expire_time")] == text_type(
calendar.timegm(expire_time.utctimetuple())
assert token_decoder(token, self.api_secret)[u("exp")] == calendar.timegm(
expire_time.utctimetuple()
)
assert token_signature_validator(token, self.api_secret)

def test_generate_data_token(self):
data = u("name=Johnny")
token = self.opentok.generate_token(self.session_id, data=data)
assert isinstance(token, text_type)
assert token_decoder(token)[u("connection_data")] == data
assert token_signature_validator(token, self.api_secret)
assert token_decoder(token, self.api_secret)[u("connection_data")] == data

def test_generate_initial_layout_class_list(self):
initial_layout_class_list = [u("focus"), u("small")]
Expand All @@ -84,15 +78,15 @@ def test_generate_initial_layout_class_list(self):
)
assert isinstance(token, text_type)
assert sorted(
token_decoder(token)[u("initial_layout_class_list")].split(u(" "))
token_decoder(token, self.api_secret)[u("initial_layout_class_list")].split(
u(" ")
)
) == sorted(initial_layout_class_list)
assert token_signature_validator(token, self.api_secret)

def test_generate_no_data_token(self):
token = self.opentok.generate_token(self.session_id)
assert isinstance(token, text_type)
assert u("connection_data") not in token_decoder(token)
assert token_signature_validator(token, self.api_secret)
assert u("connection_data") not in token_decoder(token, self.api_secret)

def test_does_not_generate_token_without_params(self):
with pytest.raises(TypeError):
Expand Down