16
16
17
17
"""
18
18
import abc
19
+ import io
19
20
import tempfile
20
21
from datetime import datetime , timedelta
21
- from typing import Any , ClassVar , Dict , List , Mapping , Optional , Tuple , Type
22
+ from typing import (
23
+ Any ,
24
+ BinaryIO ,
25
+ ClassVar ,
26
+ Dict ,
27
+ List ,
28
+ Mapping ,
29
+ Optional ,
30
+ Tuple ,
31
+ Type ,
32
+ Union ,
33
+ )
22
34
35
+ from securesystemslib import hash as sslib_hash
23
36
from securesystemslib import keys as sslib_keys
24
37
from securesystemslib .signer import Signature , Signer
25
38
from securesystemslib .storage import FilesystemBackend , StorageBackendInterface
@@ -622,7 +635,53 @@ def remove_key(self, role: str, keyid: str) -> None:
622
635
del self .keys [keyid ]
623
636
624
637
625
- class MetaFile :
638
+ class BaseFile :
639
+ """A base class of MetaFile and TargetFile.
640
+
641
+ Encapsulates common static methods for length and hash verification.
642
+ """
643
+
644
+ @staticmethod
645
+ def _verify_hashes (
646
+ data : Union [bytes , BinaryIO ], expected_hashes : Dict [str , str ]
647
+ ) -> None :
648
+ """Verifies that the hash of 'data' matches 'expected_hashes'"""
649
+ is_bytes = isinstance (data , bytes )
650
+ for algo , exp_hash in expected_hashes .items ():
651
+ if is_bytes :
652
+ digest_object = sslib_hash .digest (algo )
653
+ digest_object .update (data )
654
+ else :
655
+ # if data is not bytes, assume it is a file object
656
+ digest_object = sslib_hash .digest_fileobject (data , algo )
657
+
658
+ observed_hash = digest_object .hexdigest ()
659
+ if observed_hash != exp_hash :
660
+ raise exceptions .LengthOrHashMismatchError (
661
+ f"Observed hash { observed_hash } does not match"
662
+ f"expected hash { exp_hash } "
663
+ )
664
+
665
+ @staticmethod
666
+ def _verify_length (
667
+ data : Union [bytes , BinaryIO ], expected_length : int
668
+ ) -> None :
669
+ """Verifies that the length of 'data' matches 'expected_length'"""
670
+ if isinstance (data , bytes ):
671
+ observed_length = len (data )
672
+ else :
673
+ # if data is not bytes, assume it is a file object
674
+ data .seek (0 , io .SEEK_END )
675
+ observed_length = data .tell ()
676
+
677
+ if observed_length != expected_length :
678
+ raise exceptions .LengthOrHashMismatchError (
679
+ f"Observed length { observed_length } does not match"
680
+ f"expected length { expected_length } "
681
+ )
682
+
683
+
684
+ class MetaFile (BaseFile ):
626
685
"""A container with information about a particular metadata file.
627
686
628
687
Attributes:
@@ -678,6 +737,22 @@ def to_dict(self) -> Dict[str, Any]:
678
737
679
738
return res_dict
680
739
740
+ def verify_length_and_hashes (self , data : Union [bytes , BinaryIO ]):
741
+ """Verifies that the length and hashes of "data" match expected
742
+ values.
743
+ Args:
744
+ data: File object or its content in bytes.
745
+ Raises:
746
+ LengthOrHashMismatchError: Calculated length or hashes do not
747
+ match expected values.
748
+ """
749
+ if self .length is not None :
750
+ self ._verify_length (data , self .length )
751
+
752
+ # Skip the check in case of an empty dictionary too
753
+ if self .hashes :
754
+ self ._verify_hashes (data , self .hashes )
755
+
681
756
682
757
class Timestamp (Signed ):
683
758
"""A container for the signed part of timestamp metadata.
@@ -905,7 +980,7 @@ def to_dict(self) -> Dict[str, Any]:
905
980
}
906
981
907
982
908
- class TargetFile :
983
+ class TargetFile ( BaseFile ) :
909
984
"""A container with information about a particular target file.
910
985
911
986
Attributes:
@@ -923,12 +998,6 @@ class TargetFile:
923
998
924
999
"""
925
1000
926
- @property
927
- def custom (self ):
928
- if self .unrecognized_fields is None :
929
- return None
930
- return self .unrecognized_fields .get ("custom" , None )
931
-
932
1001
def __init__ (
933
1002
self ,
934
1003
length : int ,
@@ -939,6 +1008,12 @@ def __init__(
939
1008
self .hashes = hashes
940
1009
self .unrecognized_fields = unrecognized_fields or {}
941
1010
1011
+ @property
1012
+ def custom (self ):
1013
+ if self .unrecognized_fields is None :
1014
+ return None
1015
+ return self .unrecognized_fields .get ("custom" , None )
1016
+
942
1017
@classmethod
943
1018
def from_dict (cls , target_dict : Dict [str , Any ]) -> "TargetFile" :
944
1019
"""Creates TargetFile object from its dict representation."""
@@ -955,6 +1030,18 @@ def to_dict(self) -> Dict[str, Any]:
955
1030
** self .unrecognized_fields ,
956
1031
}
957
1032
1033
+ def verify_length_and_hashes (self , data : Union [bytes , BinaryIO ]):
1034
+ """Verifies that the length and hashes of "data" match expected
1035
+ values.
1036
+ Args:
1037
+ data: File object or its content in bytes.
1038
+ Raises:
1039
+ LengthOrHashMismatchError: Calculated length or hashes do not
1040
+ match expected values.
1041
+ """
1042
+ self ._verify_length (data , self .length )
1043
+ self ._verify_hashes (data , self .hashes )
1044
+
958
1045
959
1046
class Targets (Signed ):
960
1047
"""A container for the signed part of targets metadata.
0 commit comments