Skip to content

Commit a5cca92

Browse files
adding async handling in bulk send
1 parent 58cd557 commit a5cca92

File tree

3 files changed

+61
-34
lines changed

3 files changed

+61
-34
lines changed

socketlabs/injectionapi/core/retryhandler.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,12 @@
22
from http.client import HTTPException
33
from ..retrysettings import RetrySettings
44
from .httprequest import HttpRequest
5-
from .injectionresponseparser import InjectionResponseParser
65
from .serialization.injectionrequest import InjectionRequest
7-
import sys
86
import socket
97
import time
108

11-
class RetryHandler(object):
129

10+
class RetryHandler(object):
1311
attempts = 0
1412
ErrorStatusCodes = [
1513
HTTPStatus.INTERNAL_SERVER_ERROR,
@@ -26,7 +24,7 @@ def __init__(self, http_client: HttpRequest, settings: RetrySettings):
2624

2725
self.__http_client = http_client
2826
self.__retry_settings = settings
29-
27+
3028
def send(self, body):
3129

3230
if self.__retry_settings.maximum_number_of_retries == 0:
@@ -43,24 +41,24 @@ def send(self, body):
4341
raise HTTPException("HttpStatusCode: {0}. Response contains server error.".format(response.status))
4442

4543
return response
46-
44+
4745
except socket.timeout:
4846

4947
self.attempts += 1
5048

5149
if self.attempts > self.__retry_settings.maximum_number_of_retries:
5250
raise socket.timeout
5351
time.sleep(wait_interval.seconds)
54-
52+
5553
except HTTPException:
5654

5755
self.attempts += 1
58-
56+
5957
if self.attempts > self.__retry_settings.maximum_number_of_retries:
6058
raise HTTPException
6159
time.sleep(wait_interval.seconds)
62-
63-
def send_async( self, request: InjectionRequest, on_success_callback, on_error_callback):
60+
61+
def send_async(self, request: InjectionRequest, on_success_callback, on_error_callback):
6462

6563
wait_interval = self.__retry_settings.get_next_wait_interval(self.attempts)
6664

@@ -89,4 +87,3 @@ def on_error(exception):
8987
on_error_callback(exception)
9088

9189
self.__http_client.send_async_request(request, on_success, on_error)
92-
Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,46 @@
11
from datetime import timedelta
22
import random
33

4-
class RetrySettings(object):
54

5+
class RetrySettings(object):
66
__default_number_of_retries = 0
77
__maximum_allowed_number_of_retries = 5
88
__minimum_retry_time = timedelta(seconds=1)
99
__maximum_retry_time = timedelta(seconds=10)
1010

11-
def __init__(self, maximum_retries = None):
11+
def __init__(self, maximum_retries=None):
1212

1313
if maximum_retries:
1414

1515
if maximum_retries < 0:
1616
raise AttributeError("maximumNumberOfRetries must be greater than 0")
1717

1818
if maximum_retries > self.__maximum_allowed_number_of_retries:
19-
raise AttributeError("The maximum number of allowed retries is ", self.__maximum_allowed_number_of_retries)
19+
raise AttributeError("The maximum number of allowed retries is ",
20+
self.__maximum_allowed_number_of_retries)
2021

2122
self.__maximum_number_of_retries = maximum_retries
22-
23+
2324
else:
2425
self.__maximum_number_of_retries = self.__default_number_of_retries
25-
26+
2627
@property
2728
def maximum_number_of_retries(self):
2829
return self.__maximum_number_of_retries
29-
30+
3031
def get_next_wait_interval(self, number_of_attempts):
31-
32+
3233
interval = int(min(
3334
((self.__minimum_retry_time.seconds * 1000) + self.get_retry_delta(number_of_attempts)),
3435
(self.__maximum_retry_time.seconds * 1000)
35-
))
36-
36+
))
37+
3738
return timedelta(milliseconds=interval)
38-
39-
def get_retry_delta(self, number_of_attempts):
40-
41-
minimum = (int)((timedelta(seconds=1).seconds * 1000) * 0.8)
42-
maximum = (int)((timedelta(seconds=1).seconds * 1000) * 1.2)
4339

44-
return (int)((pow(2.0, number_of_attempts) - 1.0) * random.randint(minimum, maximum))
40+
@staticmethod
41+
def get_retry_delta(number_of_attempts):
42+
43+
minimum = int((timedelta(seconds=1).seconds * 1000) * 0.8)
44+
maximum = int((timedelta(seconds=1).seconds * 1000) * 1.2)
45+
46+
return int((pow(2.0, number_of_attempts) - 1.0) * random.randint(minimum, maximum))

socketlabs/injectionapi/socketlabsclient.py

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,13 @@ def __send_basic_message(self, message: BasicMessage):
122122
req_factory = InjectionRequestFactory(self._server_id, self._api_key)
123123
body = req_factory.generate_request(message)
124124

125-
request = self.__build_http_request()
126-
retry_handler = RetryHandler(request, RetrySettings(self.number_of_retries))
127-
result = retry_handler.send(body)
125+
retry_handler = RetryHandler(self.__build_http_request(), RetrySettings(self.number_of_retries))
126+
response = retry_handler.send(body)
127+
128+
data = response.read().decode("utf-8")
129+
response_code = response.status
130+
result = InjectionResponseParser.parse(data, response_code)
131+
128132
return result
129133

130134
def __send_bulk_message(self, message: BulkMessage):
@@ -142,12 +146,13 @@ def __send_bulk_message(self, message: BulkMessage):
142146
req_factory = InjectionRequestFactory(self._server_id, self._api_key)
143147
body = req_factory.generate_request(message)
144148

145-
request = self.__build_http_request()
146-
retry_handler = RetryHandler(request, RetrySettings(self.number_of_retries))
149+
retry_handler = RetryHandler(self.__build_http_request(), RetrySettings(self.number_of_retries))
147150
response = retry_handler.send(body)
151+
148152
data = response.read().decode("utf-8")
149153
response_code = response.status
150154
result = InjectionResponseParser.parse(data, response_code)
155+
151156
return result
152157

153158
def send_async(self, message: BasicMessage, on_success, on_error):
@@ -184,16 +189,21 @@ def __send_basic_message_async(self, message: BasicMessage, on_success, on_error
184189
req_factory = InjectionRequestFactory(self._server_id, self._api_key)
185190
body = req_factory.generate_request(message)
186191

187-
request = self.__build_http_request()
188-
retry_handler = RetryHandler(request, RetrySettings(self.number_of_retries))
189-
def on_sucess_callback(response):
192+
retry_handler = RetryHandler(self.__build_http_request(), RetrySettings(self.number_of_retries))
193+
194+
def on_success_callback(response):
195+
response = retry_handler.send(body)
196+
190197
data = response.read().decode("utf-8")
191198
response_code = response.status
192199
result = InjectionResponseParser.parse(data, response_code)
200+
193201
on_success(result)
202+
194203
def on_error_callback(exception):
195204
on_error(exception)
196-
retry_handler.send_async(body, on_sucess_callback, on_error_callback)
205+
206+
retry_handler.send_async(body, on_success_callback, on_error_callback)
197207

198208
def __send_bulk_message_async(self, message: BulkMessage, on_success, on_error):
199209
"""
@@ -212,9 +222,27 @@ def __send_bulk_message_async(self, message: BulkMessage, on_success, on_error):
212222
req_factory = InjectionRequestFactory(self._server_id, self._api_key)
213223
body = req_factory.generate_request(message)
214224

225+
retry_handler = RetryHandler(self.__build_http_request(), RetrySettings(self.number_of_retries))
226+
227+
def on_success_callback(response):
228+
response = retry_handler.send(body)
229+
230+
data = response.read().decode("utf-8")
231+
response_code = response.status
232+
result = InjectionResponseParser.parse(data, response_code)
233+
234+
on_success(result)
235+
236+
def on_error_callback(exception):
237+
on_error(exception)
238+
239+
retry_handler.send_async(body, on_success_callback, on_error_callback)
240+
241+
"""
215242
request = self.__build_http_request()
216243
retry_handler = RetryHandler(request, RetrySettings(self.number_of_retries))
217244
retry_handler.send_async(body, on_success, on_error)
245+
"""
218246

219247
def __validate_basic_message(self, message: BasicMessage):
220248
"""

0 commit comments

Comments
 (0)