16
16
17
17
"""
18
18
import abc
19
+ import io
19
20
import tempfile
20
21
from collections import OrderedDict
21
22
from datetime import datetime , timedelta
22
- from typing import Any , ClassVar , Dict , List , Mapping , Optional , Tuple , Type
23
+ from typing import (
24
+ Any ,
25
+ BinaryIO ,
26
+ ClassVar ,
27
+ Dict ,
28
+ List ,
29
+ Mapping ,
30
+ Optional ,
31
+ Tuple ,
32
+ Type ,
33
+ Union ,
34
+ )
23
35
36
+ from securesystemslib import hash as sslib_hash
24
37
from securesystemslib import keys as sslib_keys
25
38
from securesystemslib .signer import Signature , Signer
26
39
from securesystemslib .storage import FilesystemBackend , StorageBackendInterface
@@ -644,7 +657,53 @@ def remove_key(self, role: str, keyid: str) -> None:
644
657
del self .keys [keyid ]
645
658
646
659
647
- class MetaFile :
660
+ class BaseFile :
661
+ """A base class of MetaFile and TargetFile.
662
+
663
+ Encapsulates common static methods for length and hash verification.
664
+ """
665
+
666
+ @staticmethod
667
+ def _verify_hashes (
668
+ data : Union [bytes , BinaryIO ], expected_hashes : Dict [str , str ]
669
+ ) -> None :
670
+ """Verifies that the hash of 'data' matches 'expected_hashes'"""
671
+ is_bytes = isinstance (data , bytes )
672
+ for algo , exp_hash in expected_hashes .items ():
673
+ if is_bytes :
674
+ digest_object = sslib_hash .digest (algo )
675
+ digest_object .update (data )
676
+ else :
677
+ # if data is not bytes, assume it is a file object
678
+ digest_object = sslib_hash .digest_fileobject (data , algo )
679
+
680
+ observed_hash = digest_object .hexdigest ()
681
+ if observed_hash != exp_hash :
682
+ raise exceptions .LengthOrHashMismatchError (
683
+ f"Observed hash { observed_hash } does not match"
684
+ f"expected hash { exp_hash } "
685
+ )
686
+
687
+ @staticmethod
688
+ def _verify_length (
689
+ data : Union [bytes , BinaryIO ], expected_length : int
690
+ ) -> None :
691
+ """Verifies that the length of 'data' matches 'expected_length'"""
692
+ if isinstance (data , bytes ):
693
+ observed_length = len (data )
694
+ else :
695
+ # if data is not bytes, assume it is a file object
696
+ data .seek (0 , io .SEEK_END )
697
+ observed_length = data .tell ()
698
+
699
+ if observed_length != expected_length :
700
+ raise exceptions .LengthOrHashMismatchError (
701
+ f"Observed length { observed_length } does not match"
702
+ f"expected length { expected_length } "
703
+ )
704
+
705
+
706
+ class MetaFile (BaseFile ):
648
707
"""A container with information about a particular metadata file.
649
708
650
709
Attributes:
@@ -682,6 +741,13 @@ def from_dict(cls, meta_dict: Dict[str, Any]) -> "MetaFile":
682
741
version = meta_dict .pop ("version" )
683
742
length = meta_dict .pop ("length" , None )
684
743
hashes = meta_dict .pop ("hashes" , None )
744
+
745
+ # Do some basic input validation
746
+ if version <= 0 :
747
+ raise ValueError (f"Metafile version must be > 0, got { version } " )
748
+ if length is not None and length <= 0 :
749
+ raise ValueError (f"Metafile length must be > 0, got { length } " )
750
+
685
751
# All fields left in the meta_dict are unrecognized.
686
752
return cls (version , length , hashes , meta_dict )
687
753
@@ -700,6 +766,22 @@ def to_dict(self) -> Dict[str, Any]:
700
766
701
767
return res_dict
702
768
769
+ def verify_length_and_hashes (self , data : Union [bytes , BinaryIO ]):
770
+ """Verifies that the length and hashes of "data" match expected
771
+ values.
772
+ Args:
773
+ data: File object or its content in bytes.
774
+ Raises:
775
+ LengthOrHashMismatchError: Calculated length or hashes do not
776
+ match expected values.
777
+ """
778
+ if self .length is not None :
779
+ self ._verify_length (data , self .length )
780
+
781
+ # Skip the check in case of an empty dictionary too
782
+ if self .hashes :
783
+ self ._verify_hashes (data , self .hashes )
784
+
703
785
704
786
class Timestamp (Signed ):
705
787
"""A container for the signed part of timestamp metadata.
@@ -927,7 +1009,7 @@ def to_dict(self) -> Dict[str, Any]:
927
1009
}
928
1010
929
1011
930
- class TargetFile :
1012
+ class TargetFile ( BaseFile ) :
931
1013
"""A container with information about a particular target file.
932
1014
933
1015
Attributes:
@@ -945,12 +1027,6 @@ class TargetFile:
945
1027
946
1028
"""
947
1029
948
- @property
949
- def custom (self ):
950
- if self .unrecognized_fields is None :
951
- return None
952
- return self .unrecognized_fields .get ("custom" , None )
953
-
954
1030
def __init__ (
955
1031
self ,
956
1032
length : int ,
@@ -961,11 +1037,24 @@ def __init__(
961
1037
self .hashes = hashes
962
1038
self .unrecognized_fields = unrecognized_fields or {}
963
1039
1040
+ @property
1041
+ def custom (self ):
1042
+ if self .unrecognized_fields is None :
1043
+ return None
1044
+ return self .unrecognized_fields .get ("custom" , None )
1045
+
964
1046
@classmethod
965
1047
def from_dict (cls , target_dict : Dict [str , Any ]) -> "TargetFile" :
966
1048
"""Creates TargetFile object from its dict representation."""
967
1049
length = target_dict .pop ("length" )
968
1050
hashes = target_dict .pop ("hashes" )
1051
+
1052
+ # Do some basic validation checks
1053
+ if length <= 0 :
1054
+ raise ValueError (f"Targetfile length must be > 0, got { length } " )
1055
+ if not hashes :
1056
+ raise ValueError ("Missing targetfile hashes" )
1057
+
969
1058
# All fields left in the target_dict are unrecognized.
970
1059
return cls (length , hashes , target_dict )
971
1060
@@ -977,6 +1066,18 @@ def to_dict(self) -> Dict[str, Any]:
977
1066
** self .unrecognized_fields ,
978
1067
}
979
1068
1069
+ def verify_length_and_hashes (self , data : Union [bytes , BinaryIO ]):
1070
+ """Verifies that the length and hashes of "data" match expected
1071
+ values.
1072
+ Args:
1073
+ data: File object or its content in bytes.
1074
+ Raises:
1075
+ LengthOrHashMismatchError: Calculated length or hashes do not
1076
+ match expected values.
1077
+ """
1078
+ self ._verify_length (data , self .length )
1079
+ self ._verify_hashes (data , self .hashes )
1080
+
980
1081
981
1082
class Targets (Signed ):
982
1083
"""A container for the signed part of targets metadata.
0 commit comments