Skip to content

Commit d0f5694

Browse files
updated retry in send async
1 parent f8ceabd commit d0f5694

File tree

4 files changed

+99
-27
lines changed

4 files changed

+99
-27
lines changed

python-examples/basic/basic_send_with_retry.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,23 @@
55
from socketlabs.injectionapi.proxy import Proxy
66
from socketlabs.injectionapi.message.__imports__ import BasicMessage, EmailAddress
77

8+
def on_success(response):
9+
"""
10+
Handle the success response from the client
11+
:param response: the SendResponse
12+
:return: SendResponse
13+
"""
14+
print(json.dumps(response.to_json(), indent=2))
15+
16+
17+
def on_error(exception):
18+
"""
19+
Handle the error response from the client
20+
:param exception: the Exception
21+
:return: Exception
22+
"""
23+
print(exception)
24+
825
# build the message
926
message = BasicMessage()
1027

@@ -33,5 +50,6 @@
3350

3451
# send the message
3552
response = client.send(message)
53+
client.send_async(message, on_success, on_error)
3654

3755
print(json.dumps(response.to_json(), indent=2))

socketlabs/injectionapi/core/httprequest.py

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -106,26 +106,21 @@ def send_async_request(self, request: InjectionRequest, on_success_callback, on_
106106
:param on_error_callback: the callback method for error
107107
:type on_error_callback: method
108108
"""
109-
req_queue = queue.Queue()
109+
110110
try:
111111

112112
th = threading.Thread(target=self.__queue_request,
113113
kwargs={
114114
"request": request,
115-
"out_queue": req_queue
115+
"on_success_callback": on_success_callback,
116+
"on_error_callback": on_error_callback
116117
})
117118
th.start()
118-
th.join()
119-
120-
while not th.is_alive():
121-
response = req_queue.get()
122-
on_success_callback(response)
123-
break
124-
119+
125120
except Exception as e:
126121
on_error_callback(e)
127122

128-
def __queue_request(self, request: InjectionRequest, out_queue):
123+
def __queue_request(self, request: InjectionRequest, on_success_callback, on_error_callback):
129124
"""
130125
queue method for the threaded send request.
131126
:param request: the injection request to send
@@ -135,9 +130,10 @@ def __queue_request(self, request: InjectionRequest, out_queue):
135130
"""
136131
try:
137132
response = self.send_request(request)
138-
out_queue.put(response)
133+
on_success_callback(response)
134+
139135
except Exception:
140-
raise
136+
on_error_callback(sys.exc_info()[0])
141137

142138
def send_request(self, request: InjectionRequest):
143139
"""
@@ -155,14 +151,14 @@ def send_request(self, request: InjectionRequest):
155151

156152
json_body = json.dumps(request.to_json())
157153

158-
connection = self.__get_connection()
159-
connection.request("POST", self._endpoint.url, json_body, headers)
160-
response = connection.getresponse()
154+
try:
155+
connection = self.__get_connection()
156+
connection.request("POST", self._endpoint.url, json_body, headers)
157+
response = connection.getresponse()
161158

162-
# data = response.read().decode("utf-8")
163-
# response_code = response.status
159+
except Exception as e:
160+
raise e
164161

165-
# result = InjectionResponseParser.parse(data, response_code)
166162
return response
167163

168164
def __get_connection(self):
Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,26 @@
11
from http import HTTPStatus
2+
from http.client import HTTPException
23
from ..retrysettings import RetrySettings
34
from .httprequest import HttpRequest
45
from .injectionresponseparser import InjectionResponseParser
6+
from .serialization.injectionrequest import InjectionRequest
57
import sys
68
import socket
79
import time
810

911
class RetryHandler(object):
1012

13+
attempts = 0
1114
ErrorStatusCodes = [
1215
HTTPStatus.INTERNAL_SERVER_ERROR,
1316
HTTPStatus.BAD_GATEWAY,
1417
HTTPStatus.SERVICE_UNAVAILABLE,
1518
HTTPStatus.GATEWAY_TIMEOUT
1619
]
20+
Exceptions = [
21+
socket.timeout,
22+
HTTPException
23+
]
1724

1825
def __init__(self, http_client: HttpRequest, settings: RetrySettings):
1926

@@ -23,17 +30,23 @@ def __init__(self, http_client: HttpRequest, settings: RetrySettings):
2330
def send(self, body):
2431

2532
if self.__retry_settings.maximum_number_of_retries == 0:
26-
return self.__http_client.send_request(body)
2733

28-
attempts = 0
34+
response = self.__http_client.send_request(body)
35+
data = response.read().decode("utf-8")
36+
response_code = response.status
37+
result = InjectionResponseParser.parse(data, response_code)
38+
39+
return result
40+
2941
while True:
30-
wait_interval = self.__retry_settings.get_next_wait_interval(attempts)
42+
wait_interval = self.__retry_settings.get_next_wait_interval(self.attempts)
3143

3244
try:
45+
3346
response = self.__http_client.send_request(body)
3447

3548
if response.status in self.ErrorStatusCodes:
36-
raise Exception("Server Error")
49+
raise HTTPException
3750

3851
data = response.read().decode("utf-8")
3952
response_code = response.status
@@ -42,9 +55,52 @@ def send(self, body):
4255
return result
4356

4457
except socket.timeout:
45-
attempts += 1
46-
print("Retry Attempt : ", sys.exc_info()[0])
47-
if attempts > self.__retry_settings.maximum_number_of_retries:
58+
59+
self.attempts += 1
60+
61+
if self.attempts > self.__retry_settings.maximum_number_of_retries:
4862
raise socket.timeout
4963
time.sleep(wait_interval.seconds)
64+
65+
except HTTPException:
66+
67+
self.attempts += 1
68+
69+
if self.attempts > self.__retry_settings.maximum_number_of_retries:
70+
raise HTTPException
71+
time.sleep(wait_interval.seconds)
5072

73+
def send_async( self, request: InjectionRequest, on_success_callback, on_error_callback):
74+
75+
wait_interval = self.__retry_settings.get_next_wait_interval(self.attempts)
76+
77+
def on_success(response):
78+
79+
if response.status in self.ErrorStatusCodes and self.attempts < self.__retry_settings.maximum_number_of_retries:
80+
81+
self.attempts += 1
82+
time.sleep(wait_interval.seconds)
83+
self.send_async(request, on_success_callback, on_error_callback)
84+
85+
else:
86+
87+
data = response.read().decode("utf-8")
88+
response_code = response.status
89+
result = InjectionResponseParser.parse(data, response_code)
90+
on_success_callback(result)
91+
92+
def on_error(exception):
93+
94+
if exception in self.Exceptions and self.attempts < self.__retry_settings.maximum_number_of_retries:
95+
96+
self.attempts += 1
97+
time.sleep(wait_interval.seconds)
98+
self.send_async(request, on_success_callback, on_error_callback)
99+
100+
else:
101+
102+
self.attempts = self.__retry_settings.maximum_number_of_retries + 1
103+
on_error_callback(exception)
104+
105+
self.__http_client.send_async_request(request, on_success, on_error)
106+

socketlabs/injectionapi/socketlabsclient.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,8 @@ def __send_basic_message_async(self, message: BasicMessage, on_success, on_error
181181
body = req_factory.generate_request(message)
182182

183183
request = self.__build_http_request()
184-
request.send_async_request(body, on_success, on_error)
184+
retry_handler = RetryHandler(request, RetrySettings(self.number_of_retries))
185+
retry_handler.send_async(body, on_success, on_error)
185186

186187
def __send_bulk_message_async(self, message: BulkMessage, on_success, on_error):
187188
"""
@@ -201,7 +202,8 @@ def __send_bulk_message_async(self, message: BulkMessage, on_success, on_error):
201202
body = req_factory.generate_request(message)
202203

203204
request = self.__build_http_request()
204-
request.send_async_request(body, on_success, on_error)
205+
retry_handler = RetryHandler(request, RetrySettings(self.number_of_retries))
206+
retry_handler.send_async(body, on_success, on_error)
205207

206208
def __validate_basic_message(self, message: BasicMessage):
207209
"""

0 commit comments

Comments
 (0)