Skip to content

Commit f81924c

Browse files
author
Yurii Konovaliuk
committed
Handled expiration exceptions during selection of decryption method.
1 parent da6c833 commit f81924c

File tree

2 files changed

+276
-10
lines changed

2 files changed

+276
-10
lines changed

jose.py

+5
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,11 @@ def decrypt(*args, **kwargs):
449449
"""
450450
try:
451451
return legacy_decrypt(*args, **kwargs)
452+
except (NotYetValid, Expired) as e:
453+
# these should be raised immediately.
454+
# The token has been decrypted successfully to get to here.
455+
# decrypting using `legacy_decrypt` will not help things.
456+
raise e
452457
except (Error, ValueError) as e:
453458
return spec_compliant_decrypt(*args, **kwargs)
454459

tests.py

+271-10
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ def test_jwe_invalid_base64(self):
186186
except jose.Error as e:
187187
pass
188188

189-
self.assertEquals(
189+
self.assertEqual(
190190
e.args[0],
191191
'Unable to decode base64: Incorrect padding'
192192
)
@@ -206,7 +206,7 @@ def test_jwe_expired_error_with_exp_claim(self):
206206
except jose.Expired as e:
207207
pass
208208

209-
self.assertEquals(
209+
self.assertEqual(
210210
e.args[0],
211211
'Token expired at {}'.format(
212212
jose._format_timestamp(claims[jose.CLAIM_EXPIRATION_TIME])
@@ -233,7 +233,7 @@ def test_jwe_expired_error_with_iat_claim(self):
233233
pass
234234

235235
expiration_time = claims[jose.CLAIM_ISSUED_AT] + expiry_seconds
236-
self.assertEquals(
236+
self.assertEqual(
237237
e.args[0],
238238
'Token expired at {}'.format(
239239
jose._format_timestamp(expiration_time)
@@ -255,7 +255,7 @@ def test_jwe_not_yet_valid_error_with_nbf_claim(self):
255255
except jose.NotYetValid as e:
256256
pass
257257

258-
self.assertEquals(
258+
self.assertEqual(
259259
e.args[0],
260260
'Token not valid until {}'.format(
261261
jose._format_timestamp(claims[jose.CLAIM_NOT_BEFORE])
@@ -269,7 +269,7 @@ def test_jwe_ignores_expired_token_if_validate_claims_is_false(self):
269269
validate_claims=False)
270270

271271
def test_format_timestamp(self):
272-
self.assertEquals(
272+
self.assertEqual(
273273
jose._format_timestamp(1403054056),
274274
'2014-06-18T01:14:16Z'
275275
)
@@ -366,7 +366,7 @@ def test_jwe_invalid_base64(self):
366366
except jose.Error as e:
367367
pass
368368

369-
self.assertEquals(
369+
self.assertEqual(
370370
e.args[0],
371371
'Unable to decode base64: Incorrect padding'
372372
)
@@ -386,7 +386,7 @@ def test_jwe_expired_error_with_exp_claim(self):
386386
except jose.Expired as e:
387387
pass
388388

389-
self.assertEquals(
389+
self.assertEqual(
390390
e.args[0],
391391
'Token expired at {}'.format(
392392
jose._format_timestamp(claims[jose.CLAIM_EXPIRATION_TIME])
@@ -413,7 +413,7 @@ def test_jwe_expired_error_with_iat_claim(self):
413413
pass
414414

415415
expiration_time = claims[jose.CLAIM_ISSUED_AT] + expiry_seconds
416-
self.assertEquals(
416+
self.assertEqual(
417417
e.args[0],
418418
'Token expired at {}'.format(
419419
jose._format_timestamp(expiration_time)
@@ -435,7 +435,7 @@ def test_jwe_not_yet_valid_error_with_nbf_claim(self):
435435
except jose.NotYetValid as e:
436436
pass
437437

438-
self.assertEquals(
438+
self.assertEqual(
439439
e.args[0],
440440
'Token not valid until {}'.format(
441441
jose._format_timestamp(claims[jose.CLAIM_NOT_BEFORE])
@@ -449,7 +449,7 @@ def test_jwe_ignores_expired_token_if_validate_claims_is_false(self):
449449
validate_claims=False)
450450

451451
def test_format_timestamp(self):
452-
self.assertEquals(
452+
self.assertEqual(
453453
jose._format_timestamp(1403054056),
454454
'2014-06-18T01:14:16Z'
455455
)
@@ -619,6 +619,81 @@ def test_jwe_decrypt_compliant(self):
619619
}
620620
self.assertEqual(jwt.header, expected_header)
621621

622+
def test_jwe_decrypt_compliant_incorrect_jwk(self):
623+
jwk_for_decrypt = {'k': RSA.generate(2048).exportKey('PEM')}
624+
625+
legacy_patch = mock.patch.object(
626+
jose, 'legacy_decrypt', wraps=jose.legacy_decrypt
627+
)
628+
spec_patch = mock.patch.object(
629+
jose, 'spec_compliant_decrypt', wraps=jose.spec_compliant_decrypt
630+
)
631+
with legacy_patch as legacy_mock, spec_patch as spec_mock:
632+
with self.assertRaises(jose.Error) as decryption_error:
633+
jose.decrypt(
634+
jose.deserialize_compact(SPEC_COMPLIANT_TOKEN),
635+
jwk_for_decrypt)
636+
637+
self.assertEqual(legacy_mock.call_count, 1)
638+
self.assertEqual(spec_mock.call_count, 1)
639+
self.assertEqual(decryption_error.exception.message,
640+
"Incorrect decryption.")
641+
642+
def test_jwe_decrypt_compliant_expiry(self):
643+
expiry_seconds = 10
644+
claims = {jose.CLAIM_ISSUED_AT: int(time()) - 15}
645+
646+
jwe = jose.spec_compliant_encrypt(claims, rsa_pub_key)
647+
648+
legacy_patch = mock.patch.object(
649+
jose, 'legacy_decrypt', wraps=jose.legacy_decrypt
650+
)
651+
spec_patch = mock.patch.object(
652+
jose, 'spec_compliant_decrypt', wraps=jose.spec_compliant_decrypt
653+
)
654+
with legacy_patch as legacy_mock, spec_patch as spec_mock:
655+
with self.assertRaises(jose.Expired) as expiry_error:
656+
jose.decrypt(jwe, rsa_priv_key, expiry_seconds=expiry_seconds)
657+
658+
expiration_time = claims[jose.CLAIM_ISSUED_AT] + expiry_seconds
659+
660+
# when the error is expiry, we should not fall back to legacy.
661+
self.assertEqual(legacy_mock.call_count, 1)
662+
self.assertEqual(spec_mock.call_count, 1)
663+
self.assertEqual(
664+
expiry_error.exception.message,
665+
'Token expired at {}'.format(
666+
jose._format_timestamp(expiration_time)
667+
)
668+
)
669+
670+
def test_jwe_decrypt_compliant_not_before(self):
671+
# not valid for another hour.
672+
claim_not_before = int(time()) + 3600
673+
claims = {jose.CLAIM_NOT_BEFORE: claim_not_before}
674+
675+
jwe = jose.spec_compliant_encrypt(claims, rsa_pub_key)
676+
677+
legacy_patch = mock.patch.object(
678+
jose, 'legacy_decrypt', wraps=jose.legacy_decrypt
679+
)
680+
spec_patch = mock.patch.object(
681+
jose, 'spec_compliant_decrypt', wraps=jose.spec_compliant_decrypt
682+
)
683+
with legacy_patch as legacy_mock, spec_patch as spec_mock:
684+
with self.assertRaises(jose.NotYetValid) as not_valid_error:
685+
jose.decrypt(jwe, rsa_priv_key)
686+
687+
# when the error is expiry, we should not fall back to legacy.
688+
self.assertEqual(legacy_mock.call_count, 1)
689+
self.assertEqual(spec_mock.call_count, 1)
690+
self.assertEqual(
691+
not_valid_error.exception.message,
692+
'Token not valid until {}'.format(
693+
jose._format_timestamp(claim_not_before)
694+
)
695+
)
696+
622697
def test_jwe_decrypt_legacy_v1(self):
623698
jwk = {'k': PRIVATE_KEY}
624699
legacy_patch = mock.patch.object(
@@ -640,6 +715,192 @@ def test_jwe_decrypt_legacy_v1(self):
640715
}
641716
self.assertEqual(jwt.header, expected_header)
642717

718+
def test_jwe_decrypt_legacy_v1_expiry(self):
719+
expiry_seconds = 10
720+
claims = {jose.CLAIM_ISSUED_AT: int(time()) - 15}
721+
722+
jwe = jose.encrypt(claims, rsa_pub_key)
723+
724+
legacy_patch = mock.patch.object(
725+
jose, 'legacy_decrypt', wraps=jose.legacy_decrypt
726+
)
727+
spec_patch = mock.patch.object(
728+
jose, 'spec_compliant_decrypt', wraps=jose.spec_compliant_decrypt
729+
)
730+
with legacy_patch as legacy_mock, spec_patch as spec_mock:
731+
with self.assertRaises(jose.Expired) as expiry_error:
732+
jose.decrypt(jwe, rsa_priv_key, expiry_seconds=expiry_seconds)
733+
734+
expiration_time = claims[jose.CLAIM_ISSUED_AT] + expiry_seconds
735+
736+
self.assertEqual(legacy_mock.call_count, 1)
737+
self.assertEqual(spec_mock.call_count, 0)
738+
739+
self.assertEqual(
740+
expiry_error.exception.message,
741+
'Token expired at {}'.format(
742+
jose._format_timestamp(expiration_time)
743+
)
744+
)
745+
746+
def test_jwe_decrypt_legacy_v1_not_yet_valid(self):
747+
# not valid for another hour.
748+
claim_not_before = int(time()) + 3600
749+
claims = {jose.CLAIM_NOT_BEFORE: claim_not_before}
750+
751+
jwe = jose.encrypt(claims, rsa_pub_key)
752+
753+
legacy_patch = mock.patch.object(
754+
jose, 'legacy_decrypt', wraps=jose.legacy_decrypt
755+
)
756+
spec_patch = mock.patch.object(
757+
jose, 'spec_compliant_decrypt', wraps=jose.spec_compliant_decrypt
758+
)
759+
with legacy_patch as legacy_mock, spec_patch as spec_mock:
760+
with self.assertRaises(jose.NotYetValid) as not_valid_error:
761+
jose.decrypt(jwe, rsa_priv_key)
762+
763+
self.assertEqual(legacy_mock.call_count, 1)
764+
self.assertEqual(spec_mock.call_count, 0)
765+
766+
self.assertEqual(
767+
not_valid_error.exception.message,
768+
'Token not valid until {}'.format(
769+
jose._format_timestamp(claim_not_before)
770+
)
771+
)
772+
773+
def test_jwe_decrypt_legacy_v1_incorrect_jwk(self):
774+
jwk_for_decrypt = {'k': RSA.generate(2048).exportKey('PEM')}
775+
776+
legacy_patch = mock.patch.object(
777+
jose, 'legacy_decrypt', wraps=jose.legacy_decrypt
778+
)
779+
spec_patch = mock.patch.object(
780+
jose, 'spec_compliant_decrypt', wraps=jose.spec_compliant_decrypt
781+
)
782+
with legacy_patch as legacy_mock, spec_patch as spec_mock:
783+
with self.assertRaises(jose.Error) as decryption_error:
784+
jose.decrypt(
785+
jose.deserialize_compact(LEGACY_V1_TOKEN),
786+
jwk_for_decrypt)
787+
788+
self.assertEqual(legacy_mock.call_count, 1)
789+
self.assertEqual(spec_mock.call_count, 1)
790+
791+
self.assertEqual(decryption_error.exception.message,
792+
"Incorrect decryption.")
793+
794+
def test_jwe_decrypt_legacy_v1_without_temp_ver(self):
795+
jwk = {'k': PRIVATE_KEY}
796+
797+
legacy_patch = mock.patch.object(
798+
jose, 'legacy_decrypt', wraps=jose.legacy_decrypt
799+
)
800+
spec_patch = mock.patch.object(
801+
jose, 'spec_compliant_decrypt', wraps=jose.spec_compliant_decrypt
802+
)
803+
804+
legacy_legacy_temp_ver = jose.serialize_compact(
805+
legacy_encrypt(claims, jwk)
806+
)
807+
808+
with legacy_patch as legacy_mock, spec_patch as spec_mock:
809+
jwt = jose.decrypt(
810+
jose.deserialize_compact(legacy_legacy_temp_ver), jwk)
811+
812+
self.assertEqual(legacy_mock.call_count, 1)
813+
self.assertEqual(spec_mock.call_count, 0)
814+
self.assertEqual(jwt.claims, claims)
815+
expected_header = {
816+
'alg': 'RSA-OAEP',
817+
'enc': 'A128CBC-HS256',
818+
}
819+
self.assertEqual(jwt.header, expected_header)
820+
self.assertNotIn('__v', jwt.header)
821+
822+
def test_jwe_decrypt_legacy_v1_without_temp_ver_incorrect_jwk(self):
823+
jwk_for_encrypt = {'k': PRIVATE_KEY}
824+
825+
legacy_legacy_temp_ver = jose.serialize_compact(
826+
legacy_encrypt(claims, jwk_for_encrypt)
827+
)
828+
829+
jwk_for_decrypt = {'k': RSA.generate(2048).exportKey('PEM')}
830+
831+
legacy_patch = mock.patch.object(
832+
jose, 'legacy_decrypt', wraps=jose.legacy_decrypt
833+
)
834+
spec_patch = mock.patch.object(
835+
jose, 'spec_compliant_decrypt', wraps=jose.spec_compliant_decrypt
836+
)
837+
with legacy_patch as legacy_mock, spec_patch as spec_mock:
838+
with self.assertRaises(jose.Error) as decryption_error:
839+
jose.decrypt(
840+
jose.deserialize_compact(legacy_legacy_temp_ver),
841+
jwk_for_decrypt)
842+
843+
self.assertEqual(legacy_mock.call_count, 1)
844+
self.assertEqual(spec_mock.call_count, 1)
845+
846+
self.assertEqual(decryption_error.exception.message,
847+
"Incorrect decryption.")
848+
849+
def test_jwe_decrypt_legacy_v1_without_temp_var_expiry(self):
850+
expiry_seconds = 10
851+
claims = {jose.CLAIM_ISSUED_AT: int(time()) - 15}
852+
853+
jwe = legacy_encrypt(claims, rsa_pub_key)
854+
855+
legacy_patch = mock.patch.object(
856+
jose, 'legacy_decrypt', wraps=jose.legacy_decrypt
857+
)
858+
spec_patch = mock.patch.object(
859+
jose, 'spec_compliant_decrypt', wraps=jose.spec_compliant_decrypt
860+
)
861+
with legacy_patch as legacy_mock, spec_patch as spec_mock:
862+
with self.assertRaises(jose.Expired) as expiry_error:
863+
jose.decrypt(jwe, rsa_priv_key, expiry_seconds=expiry_seconds)
864+
865+
expiration_time = claims[jose.CLAIM_ISSUED_AT] + expiry_seconds
866+
867+
self.assertEqual(legacy_mock.call_count, 1)
868+
self.assertEqual(spec_mock.call_count, 0)
869+
870+
self.assertEqual(
871+
expiry_error.exception.message,
872+
'Token expired at {}'.format(
873+
jose._format_timestamp(expiration_time)
874+
)
875+
)
876+
877+
def test_jwe_decrypt_legacy_v1_without_temp_ver_not_yet_valid(self):
878+
# not valid for another hour.
879+
claim_not_before = int(time()) + 3600
880+
claims = {jose.CLAIM_NOT_BEFORE: claim_not_before}
881+
882+
jwe = legacy_encrypt(claims, rsa_pub_key)
883+
884+
legacy_patch = mock.patch.object(
885+
jose, 'legacy_decrypt', wraps=jose.legacy_decrypt
886+
)
887+
spec_patch = mock.patch.object(
888+
jose, 'spec_compliant_decrypt', wraps=jose.spec_compliant_decrypt
889+
)
890+
with legacy_patch as legacy_mock, spec_patch as spec_mock:
891+
with self.assertRaises(jose.NotYetValid) as not_valid_error:
892+
jose.decrypt(jwe, rsa_priv_key)
893+
894+
self.assertEqual(legacy_mock.call_count, 1)
895+
self.assertEqual(spec_mock.call_count, 0)
896+
897+
self.assertEqual(
898+
not_valid_error.exception.message,
899+
'Token not valid until {}'.format(
900+
jose._format_timestamp(claim_not_before)
901+
)
902+
)
903+
643904

644905
class TestUtils(unittest.TestCase):
645906
def test_b64encode_url_utf8(self):

0 commit comments

Comments
 (0)