Skip to content

Commit 6a9ba07

Browse files
author
Peter Giacomo Lombardo
authored
Collector: Thread safeties (#277)
* Remove unused variable * Add Collector thread launching safeties * Better thread shutdown checks * Fix warning message
1 parent 366ea6a commit 6a9ba07

File tree

3 files changed

+58
-25
lines changed

3 files changed

+58
-25
lines changed

instana/agent/base.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
class BaseAgent(object):
1010
""" Base class for all agent flavors """
1111
client = None
12-
sensor = None
1312
options = None
1413

1514
def __init__(self):

instana/collector/base.py

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ def __init__(self, agent):
2525
# The agent for this process. Can be Standard, AWSLambda or Fargate
2626
self.agent = agent
2727

28+
# The name assigned to the spawned thread
29+
self.THREAD_NAME = "Instana Collector"
30+
2831
# The Queue where we store finished spans before they are sent
2932
self.span_queue = queue.Queue()
3033

@@ -50,30 +53,56 @@ def __init__(self, agent):
5053
# Reporting interval for the background thread(s)
5154
self.report_interval = 1
5255

56+
# Flag to indicate if start/shutdown state
57+
self.started = False
58+
59+
def is_reporting_thread_running(self):
60+
"""
61+
Indicates if there is a thread running with the name self.THREAD_NAME
62+
"""
63+
for thread in threading.enumerate():
64+
if thread.name == self.THREAD_NAME:
65+
return True
66+
return False
67+
5368
def start(self):
5469
"""
5570
Starts the collector and starts reporting as long as the agent is in a ready state.
5671
@return: None
5772
"""
73+
if self.is_reporting_thread_running():
74+
if self.thread_shutdown.is_set():
75+
# Shutdown still in progress; Reschedule this start in 5 seconds from now
76+
timer = threading.Timer(5, self.start)
77+
timer.daemon = True
78+
timer.name = "Collector Timed Start"
79+
timer.start()
80+
return
81+
logger.debug("Collecter.start non-fatal: call but thread already running (started: %s)", self.started)
82+
return
83+
5884
if self.agent.can_send():
5985
logger.debug("BaseCollector.start: launching collection thread")
6086
self.thread_shutdown.clear()
6187
self.reporting_thread = threading.Thread(target=self.thread_loop, args=())
6288
self.reporting_thread.setDaemon(True)
89+
self.reporting_thread.setName(self.THREAD_NAME)
6390
self.reporting_thread.start()
91+
self.started = True
6492
else:
65-
logger.warning("BaseCollector.start: the agent tells us we can't send anything out.")
93+
logger.warning("BaseCollector.start: the agent tells us we can't send anything out")
6694

6795
def shutdown(self, report_final=True):
6896
"""
69-
Shuts down the collector and reports any final data.
97+
Shuts down the collector and reports any final data (if possible).
98+
e.g. If the host agent disappeared, we won't be able to report final data.
7099
@return: None
71100
"""
72101
logger.debug("Collector.shutdown: Reporting final data.")
73102
self.thread_shutdown.set()
74-
75103
if report_final is True:
76104
self.prepare_and_report_data()
105+
self.started = False
77106

78107
def thread_loop(self):
79108
"""
@@ -90,15 +119,31 @@ def background_report(self):
90119
if self.thread_shutdown.is_set():
91120
logger.debug("Thread shutdown signal is active: Shutting down reporting thread")
92121
return False
93-
return self.prepare_and_report_data()
94122

95-
def should_send_snapshot_data(self):
123+
self.prepare_and_report_data()
124+
125+
if self.thread_shutdown.is_set():
126+
logger.debug("Thread shutdown signal is active: Shutting down reporting thread")
127+
return False
128+
129+
return True
130+
131+
def prepare_and_report_data(self):
96132
"""
97-
Determines if snapshot data should be sent
133+
Prepare and report the data payload.
98134
@return: Boolean
99135
"""
100-
logger.debug("BaseCollector: should_send_snapshot_data needs to be overridden")
101-
return False
136+
if env_is_test is False:
137+
lock_acquired = self.background_report_lock.acquire(False)
138+
if lock_acquired:
139+
try:
140+
payload = self.prepare_payload()
141+
self.agent.report_data_payload(payload)
142+
finally:
143+
self.background_report_lock.release()
144+
else:
145+
logger.debug("prepare_and_report_data: Couldn't acquire lock")
146+
return True
102147

103148
def prepare_payload(self):
104149
"""
@@ -108,24 +153,13 @@ def prepare_payload(self):
108153
logger.debug("BaseCollector: prepare_payload needs to be overridden")
109154
return DictionaryOfStan()
110155

111-
def prepare_and_report_data(self):
156+
def should_send_snapshot_data(self):
112157
"""
113-
Prepare and report the data payload.
158+
Determines if snapshot data should be sent
114159
@return: Boolean
115160
"""
116-
if env_is_test is True:
117-
return True
118-
119-
lock_acquired = self.background_report_lock.acquire(False)
120-
if lock_acquired:
121-
try:
122-
payload = self.prepare_payload()
123-
self.agent.report_data_payload(payload)
124-
finally:
125-
self.background_report_lock.release()
126-
else:
127-
logger.debug("prepare_and_report_data: Couldn't acquire lock")
128-
return True
161+
logger.debug("BaseCollector: should_send_snapshot_data needs to be overridden")
162+
return False
129163

130164
def collect_snapshot(self, *argv, **kwargs):
131165
logger.debug("BaseCollector: collect_snapshot needs to be overridden")

instana/collector/host.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,6 @@ def prepare_payload(self):
7777
if with_snapshot is True:
7878
self.snapshot_data_last_sent = int(time())
7979
except Exception:
80-
logger.debug("collect_snapshot error", exc_info=True)
80+
logger.debug("non-fatal prepare_payload:", exc_info=True)
8181

8282
return payload

0 commit comments

Comments
 (0)