1
- #!/usr/bin/env python
2
- #
3
- # Common corpus functions
1
+ """Common corpus functions"""
2
+
4
3
import logging
5
4
import struct
5
+ from pathlib import Path
6
+ from typing import BinaryIO , Optional
7
+
8
+ from curl_fuzzer_tools .curl_test_data import TestData
6
9
7
10
log = logging .getLogger (__name__ )
8
11
9
12
10
13
class BaseType (object ):
14
+ """Known TLV types"""
15
+
11
16
TYPE_URL = 1
12
17
TYPE_RSP0 = 2
13
18
TYPE_USERNAME = 3
@@ -59,7 +64,7 @@ class BaseType(object):
59
64
TYPE_WS_OPTIONS = 49
60
65
TYPE_CONNECT_ONLY = 50
61
66
TYPE_HSTS = 51
62
- TYPE_HTTPPOSTBODY = 52 # https://curl.se/libcurl/c/CURLOPT_HTTPPOST.html
67
+ TYPE_HTTPPOSTBODY = 52 # https://curl.se/libcurl/c/CURLOPT_HTTPPOST.html
63
68
64
69
TYPEMAP = {
65
70
TYPE_URL : "CURLOPT_URL" ,
@@ -118,56 +123,79 @@ class BaseType(object):
118
123
119
124
120
125
class TLVEncoder (BaseType ):
121
- def __init__ (self , output , test_data ):
126
+ """Class for encoding TLVs"""
127
+
128
+ def __init__ (self , output : BinaryIO , test_data : TestData ) -> None :
129
+ """Create a TLVEncoder object"""
122
130
self .output = output
123
131
self .test_data = test_data
124
132
125
- def write_string (self , tlv_type , wstring ):
133
+ def write_string (self , tlv_type : int , wstring : str ) -> None :
134
+ """Write a string TLV to the output"""
126
135
data = wstring .encode ("utf-8" )
127
136
self .write_tlv (tlv_type , len (data ), data )
128
137
129
- def write_u32 (self , tlv_type , num ):
138
+ def write_u32 (self , tlv_type : int , num : int ) -> None :
139
+ """Write an unsigned 32-bit integer TLV to the output"""
130
140
data = struct .pack ("!L" , num )
131
141
self .write_tlv (tlv_type , len (data ), data )
132
142
133
- def write_bytes (self , tlv_type , bytedata ):
143
+ def write_bytes (self , tlv_type : int , bytedata : bytes ) -> None :
144
+ """Write a bytes TLV to the output"""
134
145
self .write_tlv (tlv_type , len (bytedata ), bytedata )
135
146
136
- def maybe_write_string (self , tlv_type , wstring ):
147
+ def maybe_write_string (self , tlv_type : int , wstring : Optional [str ]) -> None :
148
+ """Write a string TLV to the output if specified"""
137
149
if wstring is not None :
138
150
self .write_string (tlv_type , wstring )
139
151
140
- def maybe_write_u32 (self , tlv_type , num ):
152
+ def maybe_write_u32 (self , tlv_type : int , num : Optional [int ]) -> None :
153
+ """Write an unsigned 32-bit integer TLV to the output if specified"""
141
154
if num is not None :
142
155
self .write_u32 (tlv_type , num )
143
156
144
- def maybe_write_response (self , rsp_type , rsp , rsp_file , rsp_test ):
157
+ def maybe_write_response (
158
+ self , rsp_type : int , rsp : Optional [str ], rsp_file : Optional [Path ], rsp_test : int
159
+ ) -> None :
160
+ """Write a response TLV to the output if specified"""
145
161
if rsp :
146
- self .write_bytes (rsp_type ,
147
- rsp .encode ("utf-8" ))
162
+ self .write_bytes (rsp_type , rsp .encode ("utf-8" ))
148
163
elif rsp_file :
149
164
with open (rsp_file , "rb" ) as g :
150
165
self .write_bytes (rsp_type , g .read ())
151
166
elif rsp_test :
152
167
wstring = self .test_data .get_test_data (rsp_test )
153
168
self .write_bytes (rsp_type , wstring .encode ("utf-8" ))
154
169
155
- def write_mimepart (self , namevalue ):
170
+ def write_mimepart (self , namevalue : str ) -> None :
171
+ """Write a MIME part TLV to the output"""
156
172
(name , value ) = namevalue .split (":" , 1 )
157
173
158
174
# Create some mimepart TLVs for the name and value
159
- name_tlv = self .encode_tlv (self .TYPE_MIME_PART_NAME , len (name ), name )
160
- value_tlv = self .encode_tlv (self .TYPE_MIME_PART_DATA , len (value ), value )
175
+ name_bytes = name .encode ("utf-8" )
176
+ value_bytes = value .encode ("utf-8" )
177
+
178
+ name_tlv = self .encode_tlv (
179
+ self .TYPE_MIME_PART_NAME , len (name_bytes ), name_bytes
180
+ )
181
+ value_tlv = self .encode_tlv (
182
+ self .TYPE_MIME_PART_DATA , len (value_bytes ), value_bytes
183
+ )
161
184
162
185
# Combine the two TLVs into a single TLV.
163
186
part_tlv = name_tlv + value_tlv
164
187
self .write_tlv (self .TYPE_MIME_PART , len (part_tlv ), part_tlv )
165
188
166
- def encode_tlv (self , tlv_type , tlv_length , tlv_data = None ):
167
- log .debug ("Encoding TLV %r, length %d, data %r" ,
168
- self .TYPEMAP .get (tlv_type , "<unknown>" ),
169
- tlv_length ,
170
- tlv_data )
189
+ def encode_tlv (
190
+ self , tlv_type : int , tlv_length : int , tlv_data : Optional [bytes ] = None
191
+ ) -> bytes :
192
+ """Encodes the Type, Length, and Value into a bytes array"""
193
+ log .debug (
194
+ "Encoding TLV %r, length %d, data %r" ,
195
+ self .TYPEMAP .get (tlv_type , "<unknown>" ),
196
+ tlv_length ,
197
+ tlv_data ,
198
+ )
171
199
172
200
data = struct .pack ("!H" , tlv_type )
173
201
data = data + struct .pack ("!L" , tlv_length )
@@ -176,56 +204,79 @@ def encode_tlv(self, tlv_type, tlv_length, tlv_data=None):
176
204
177
205
return data
178
206
179
- def write_tlv (self , tlv_type , tlv_length , tlv_data = None ):
180
- log .debug ("Writing TLV %r, length %d, data %r" ,
181
- self .TYPEMAP .get (tlv_type , "<unknown>" ),
182
- tlv_length ,
183
- tlv_data )
207
+ def write_tlv (
208
+ self , tlv_type : int , tlv_length : int , tlv_data : Optional [bytes ] = None
209
+ ) -> None :
210
+ """Writes an encoded TLV to the output as bytes"""
211
+ log .debug (
212
+ "Writing TLV %r, length %d, data %r" ,
213
+ self .TYPEMAP .get (tlv_type , "<unknown>" ),
214
+ tlv_length ,
215
+ tlv_data ,
216
+ )
184
217
185
218
data = self .encode_tlv (tlv_type , tlv_length , tlv_data )
186
219
self .output .write (data )
187
220
188
221
222
+ class TLVContents (BaseType ):
223
+ """Class for TLV contents"""
224
+
225
+ TLV_DECODE_FMT = "!HL"
226
+ TLV_DECODE_FMT_LEN = struct .calcsize (TLV_DECODE_FMT )
227
+
228
+ def __init__ (self , data : bytes ) -> None :
229
+ """Create a TLVContents object"""
230
+ # Parse the data to populate the TLV fields
231
+ (stype , slen ) = struct .unpack (
232
+ self .TLV_DECODE_FMT , data [0 : self .TLV_DECODE_FMT_LEN ]
233
+ )
234
+ self .type = int (stype )
235
+ self .length = int (slen )
236
+
237
+ # Get the remaining data and store it.
238
+ self .data = data [
239
+ self .TLV_DECODE_FMT_LEN : self .TLV_DECODE_FMT_LEN + self .length
240
+ ]
241
+
242
+ def __repr__ (self ) -> str :
243
+ """Return a string representation of the TLVContents object"""
244
+ stype = self .TYPEMAP .get (self .type , "<unknown>" )
245
+ return (
246
+ f"{ self .__class__ .__name__ } (type={ stype !r} ({ self .type !r} ), "
247
+ f"length={ self .length !r} , data={ self .data !r} )"
248
+ )
249
+
250
+ def total_length (self ) -> int :
251
+ """Return the total length of the TLV, including the header"""
252
+ return self .TLV_DECODE_FMT_LEN + self .length
253
+
254
+
189
255
class TLVDecoder (BaseType ):
190
- def __init__ (self , inputdata ):
256
+ """Class for decoding TLVs"""
257
+
258
+ def __init__ (self , inputdata : bytes ) -> None :
259
+ """Create a TLVDecoder object"""
191
260
self .inputdata = inputdata
192
261
self .pos = 0
193
- self .tlv = None
262
+ self .tlv : Optional [ "TLVContents" ] = None
194
263
195
- def __iter__ (self ):
264
+ def __iter__ (self ) -> "TLVDecoder" :
265
+ """Return an iterator for the TLVs"""
196
266
self .pos = 0
197
267
self .tlv = None
198
268
return self
199
269
200
- def __next__ (self ):
270
+ def __next__ (self ) -> "TLVContents" :
271
+ """Return the next TLV in the input data"""
201
272
if self .tlv :
202
273
self .pos += self .tlv .total_length ()
203
274
204
- if (self .pos + TLVHeader .TLV_DECODE_FMT_LEN ) > len (self .inputdata ):
275
+ if (self .pos + TLVContents .TLV_DECODE_FMT_LEN ) > len (self .inputdata ):
205
276
raise StopIteration
206
277
207
278
# Get the next TLV
208
- self .tlv = TLVHeader (self .inputdata [self .pos :])
279
+ self .tlv = TLVContents (self .inputdata [self .pos :])
209
280
return self .tlv
210
281
211
282
next = __next__
212
-
213
-
214
- class TLVHeader (BaseType ):
215
- TLV_DECODE_FMT = "!HL"
216
- TLV_DECODE_FMT_LEN = struct .calcsize (TLV_DECODE_FMT )
217
-
218
- def __init__ (self , data ):
219
- # Parse the data to populate the TLV fields
220
- (self .type , self .length ) = struct .unpack (self .TLV_DECODE_FMT , data [0 :self .TLV_DECODE_FMT_LEN ])
221
-
222
- # Get the remaining data and store it.
223
- self .data = data [self .TLV_DECODE_FMT_LEN :self .TLV_DECODE_FMT_LEN + self .length ]
224
-
225
- def __repr__ (self ):
226
- return ("{self.__class__.__name__}(type={stype!r} ({self.type!r}), length={self.length!r}, data={self.data!r})"
227
- .format (self = self ,
228
- stype = self .TYPEMAP .get (self .type , "<unknown>" )))
229
-
230
- def total_length (self ):
231
- return self .TLV_DECODE_FMT_LEN + self .length
0 commit comments