1
1
import hashlib
2
2
import hmac
3
+ import sys
3
4
from datetime import datetime as dt
4
5
from typing import Any , Dict , List
5
- from urllib .parse import ParseResult , urlparse , urlunparse
6
+ from urllib .parse import ParseResult , urlparse , urlunparse , parse_qsl , urlencode
6
7
7
8
from imagekitio .constants .defaults import Default
8
9
from imagekitio .constants .supported_transform import SUPPORTED_TRANS
9
10
from imagekitio .utils .formatter import camel_dict_to_snake_dict , flatten_dict
10
- from imagekitio . constants . supported_transform import SUPPORTED_TRANS
11
+
11
12
from .constants import ERRORS
12
13
13
14
TRANSFORMATION_PARAMETER = "tr"
@@ -33,106 +34,69 @@ def __init__(self, request_obj):
33
34
34
35
def generate_url (self , options : Dict = None ) -> str :
35
36
options = camel_dict_to_snake_dict (options )
36
- if options .get ("src" ):
37
- options ["transformation_position" ] = DEFAULT_TRANSFORMATION_POSITION
38
37
extended_options = self .request .extend_url_options (options )
39
38
return self .build_url (extended_options )
40
39
41
40
def build_url (self , options : dict ) -> str :
42
41
"""
43
42
builds url for from all options,
44
43
"""
45
- path = options .get ("path" , "" )
46
- src = options .get ("src" , "" )
47
- url_endpoint = options .get ("url_endpoint" , "" )
48
- transformation_position = options .get ("transformation_position" )
44
+
45
+ path = options .get ("path" , "" ).strip ("/" )
46
+ src = options .get ("src" , "" ).strip ("/" )
47
+ url_endpoint = options .get ("url_endpoint" , "" ).strip ("/" )
48
+ transformation_str = self .transformation_to_str (options .get ("transformation" ))
49
+ transformation_position = options .get ("transformation_position" ) or DEFAULT_TRANSFORMATION_POSITION
50
+
49
51
if transformation_position not in Default .VALID_TRANSFORMATION_POSITION .value :
50
52
raise ValueError (ERRORS .INVALID_TRANSFORMATION_POSITION .value )
51
53
52
- if src or (
53
- options .get ("transformation_position" ) == QUERY_TRANSFORMATION_POSITION
54
- ):
55
- src_param_used_for_url = True
56
- else :
57
- src_param_used_for_url = False
58
- if not (path or src ):
54
+ if (path is "" and src is "" ):
59
55
return ""
60
- result_url_dict = {"netloc" : "" , "path" : "" , "query" : "" }
61
- if path :
62
- parsed_url = urlparse (path )
63
- parsed_host = urlparse (url_endpoint )
64
- result_url_dict ["scheme" ] = parsed_host .scheme
65
- result_url_dict ["netloc" ] = (parsed_host .netloc + parsed_host .path ).lstrip (
66
- "/"
67
- )
68
- result_url_dict ["path" ] = parsed_url .path .strip ("/" )
69
56
57
+ if src :
58
+ temp_url = src .strip ("/" )
59
+ transformation_position = QUERY_TRANSFORMATION_POSITION
70
60
else :
71
- parsed_url = urlparse (src )
72
- host = parsed_url .netloc
73
- if parsed_url .username :
74
- # creating host like username:[email protected] if username is there in parsed url
75
- host = "{}:{}@{}" .format (
76
- parsed_url .username , parsed_url .password , parsed_url .netloc
77
- )
78
- result_url_dict ["netloc" ] = host
79
- result_url_dict ["scheme" ] = parsed_url .scheme
80
- result_url_dict ["path" ] = parsed_url .path
81
- src_param_used_for_url = True
82
-
83
- query_params = options .get ("query_parameters" , {})
84
- transformation_str = self .transformation_to_str (options .get ("transformation" ))
85
- if transformation_str :
86
- if (
87
- transformation_position == Default .QUERY_TRANSFORMATION_POSITION .value
88
- ) or src_param_used_for_url :
89
- result_url_dict ["query" ] = "{}={}" .format (
90
- TRANSFORMATION_PARAMETER , transformation_str
61
+ if transformation_position == "path" :
62
+ temp_url = "{}/{}:{}/{}" .format (
63
+ url_endpoint .strip ("/" ),
64
+ TRANSFORMATION_PARAMETER ,
65
+ transformation_str .strip ("/" ),
66
+ path .strip ("/" )
91
67
)
92
-
93
68
else :
94
- result_url_dict ["path" ] = "{}:{}/{}" .format (
95
- TRANSFORMATION_PARAMETER ,
96
- transformation_str ,
97
- result_url_dict ["path" ],
69
+ temp_url = "{}/{}" .format (
70
+ url_endpoint .strip ("/" ),
71
+ path .strip ("/" )
98
72
)
99
73
100
- result_url_dict [ "scheme" ] = result_url_dict [ "scheme" ] or "https"
74
+ url_object = urlparse ( temp_url . strip ( "/" ))
101
75
102
- # Signature String and Timestamp
103
- # We can do this only for URLs that are created using urlEndpoint and path parameter
104
- # because we need to know the endpoint to be able to remove it from the URL to create a signature
105
- # for the remaining. With the src parameter, we would not know the "pattern" in the URL
106
- if options .get ("signed" ) and (not options .get ("src" )):
76
+ query_params = dict (parse_qsl (url_object .query ))
77
+ query_params .update (options .get ("query_parameters" , {}))
78
+ if transformation_position == QUERY_TRANSFORMATION_POSITION :
79
+ query_params .update ({"tr" : transformation_str })
80
+ query_params .update ({"ik-sdk-version" : Default .SDK_VERSION .value })
81
+
82
+ # Update query params
83
+ url_object = url_object ._replace (query = urlencode (query_params ))
84
+
85
+ if options .get ("signed" ):
107
86
expire_seconds = options .get ("expire_seconds" )
108
87
private_key = options .get ("private_key" )
109
88
expiry_timestamp = self .get_signature_timestamp (expire_seconds )
110
-
111
- intermediate_url = urlunparse (
112
- result_url_dict .get (f , "" ) for f in ParseResult ._fields
113
- )
114
89
url_signature = self .get_signature (
115
90
private_key = private_key ,
116
- url = intermediate_url ,
91
+ url = url_object . geturl () ,
117
92
url_endpoint = url_endpoint ,
118
93
expiry_timestamp = expiry_timestamp ,
119
94
)
120
- if expiry_timestamp and (expiry_timestamp != DEFAULT_TIMESTAMP ):
121
- query_params [TIMESTAMP_PARAMETER ] = expiry_timestamp
122
- query_params [SIGNATURE_PARAMETER ] = url_signature
123
- query_params_str = "&" .join (
124
- str (k ) + "=" + str (v ) for k , v in query_params .items ()
125
- )
126
- result_url_dict ["query" ] = query_params_str
127
- result_url_dict = self .prepare_dict_for_unparse (result_url_dict )
128
- generated_url = urlunparse (
129
- result_url_dict .get (f , "" ) for f in ParseResult ._fields
130
- )
131
- if result_url_dict ["query" ]:
132
- generated_url = generated_url + "&sdk-version=" + Default .SDK_VERSION .value
133
- else :
134
- generated_url = generated_url + "?sdk-version=" + Default .SDK_VERSION .value
135
- return generated_url
95
+ query_params .update ({TIMESTAMP_PARAMETER : expiry_timestamp , SIGNATURE_PARAMETER : url_signature })
96
+ # Update signature related query params
97
+ url_object = url_object ._replace (query = urlencode (query_params ))
98
+
99
+ return url_object .geturl ()
136
100
137
101
@staticmethod
138
102
def get_signature_timestamp (seconds : int = None ) -> int :
@@ -165,9 +129,17 @@ def get_signature(private_key, url, url_endpoint, expiry_timestamp) -> str:
165
129
create signature(hashed hex key) from
166
130
private_key, url, url_endpoint and expiry_timestamp
167
131
"""
132
+ # ensure url_endpoint has a trailing slash
133
+ if url_endpoint [- 1 ] != '/' :
134
+ url_endpoint += '/'
135
+
136
+ if isinstance (expiry_timestamp , int ) and expiry_timestamp < 1 :
137
+ expiry_timestamp = DEFAULT_TIMESTAMP
138
+
168
139
replaced_url = url .replace (url_endpoint , "" ) + str (expiry_timestamp )
140
+
169
141
signature = hmac .new (
170
- key = replaced_url .encode (), msg = private_key .encode (), digestmod = hashlib .sha1
142
+ key = private_key .encode (), msg = replaced_url .encode (), digestmod = hashlib .sha1
171
143
)
172
144
return signature .hexdigest ()
173
145
@@ -220,6 +192,7 @@ def transformation_to_str(transformation):
220
192
)
221
193
)
222
194
223
- parsed_transforms .append (TRANSFORM_DELIMITER .join (parsed_transform_step ))
195
+ parsed_transforms .append (
196
+ TRANSFORM_DELIMITER .join (parsed_transform_step ))
224
197
225
198
return CHAIN_TRANSFORM_DELIMITER .join (parsed_transforms )
0 commit comments