Skip to content

Commit 14fd8fe

Browse files
authored
Stability and determinism improvements (#140)
* Stability and logistical improvements:
  1. Better timing mechanism via instana.util.every
     - avoids time drift
     - skips late executions (if runs become backed up)
  2. Add a new FSM state, "agent ready", to determine when the host agent is ready to accept data (allows for item 3)
  3. Force snapshot reporting immediately once the agent is ready to accept data
  This fixes:
  - Entity queue full when metric reporting may be backed up
  - Sending initial spans without Infrastructure snapshot data
* Fix return value and log level
* Rename the state machine
1 parent 6dbcefc commit 14fd8fe

File tree

5 files changed

+86
-28
lines changed

5 files changed

+86
-28
lines changed

instana/agent.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from .agent_const import (AGENT_DATA_PATH, AGENT_DEFAULT_HOST,
1212
AGENT_DEFAULT_PORT, AGENT_DISCOVERY_PATH,
1313
AGENT_HEADER, AGENT_RESPONSE_PATH, AGENT_TRACES_PATH)
14-
from .fsm import Fsm
14+
from .fsm import TheMachine
1515
from .log import logger
1616
from .sensor import Sensor
1717

@@ -28,7 +28,7 @@ class Agent(object):
2828
sensor = None
2929
host = AGENT_DEFAULT_HOST
3030
port = AGENT_DEFAULT_PORT
31-
fsm = None
31+
machine = None
3232
from_ = From()
3333
last_seen = None
3434
last_fork_check = None
@@ -41,7 +41,7 @@ class Agent(object):
4141
def __init__(self):
4242
logger.debug("initializing agent")
4343
self.sensor = Sensor(self)
44-
self.fsm = Fsm(self)
44+
self.machine = TheMachine(self)
4545

4646
def start(self, e):
4747
""" Starts the agent and required threads """
@@ -73,7 +73,7 @@ def can_send(self):
7373
self.handle_fork()
7474
return False
7575

76-
if self.fsm.fsm.current == "good2go":
76+
if self.machine.fsm.current == "good2go":
7777
return True
7878

7979
return False
@@ -99,7 +99,7 @@ def set_from(self, json_string):
9999
def reset(self):
100100
self.last_seen = None
101101
self.from_ = From()
102-
self.fsm.reset()
102+
self.machine.reset()
103103

104104
def handle_fork(self):
105105
"""
@@ -151,6 +151,19 @@ def announce(self, discovery):
151151
finally:
152152
return response
153153

154+
def is_agent_ready(self):
    """
    Used after making a successful announce to test when the agent is ready to accept data.

    Sends a HEAD request to the data URL with a 0.8 second timeout.

    :return: True if the host agent answered with HTTP 200; False for any
             other status code or when the connection failed or timed out.
    """
    try:
        response = self.client.head(self.__data_url(), timeout=0.8)
    except (requests.ConnectTimeout, requests.ConnectionError):
        logger.debug("is_agent_ready: host agent connection error")
        # Be explicit: callers test the result for truthiness, so a failed
        # probe reports "not ready" instead of falling through to None.
        return False

    # Compare by value. `is 200` only works through CPython's small-integer
    # cache and raises a SyntaxWarning on Python 3.8+.
    return response.status_code == 200
166+
154167
def report_data(self, entity_data):
155168
"""
156169
Used to report entity data (metrics & snapshot) to the host agent.
@@ -162,6 +175,8 @@ def report_data(self, entity_data):
162175
headers={"Content-Type": "application/json"},
163176
timeout=0.8)
164177

178+
# logger.warn("report_data: response.status_code is %s" % response.status_code)
179+
165180
if response.status_code is 200:
166181
self.last_seen = datetime.now()
167182
except (requests.ConnectTimeout, requests.ConnectionError):
@@ -179,6 +194,9 @@ def report_traces(self, spans):
179194
data=self.to_json(spans),
180195
headers={"Content-Type": "application/json"},
181196
timeout=0.8)
197+
198+
# logger.warn("report_traces: response.status_code is %s" % response.status_code)
199+
182200
if response.status_code is 200:
183201
self.last_seen = datetime.now()
184202
except (requests.ConnectTimeout, requests.ConnectionError):

instana/fsm.py

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import sys
88
import threading as t
99

10-
import fysom as f
10+
from fysom import Fysom
1111
import pkg_resources
1212

1313
from .agent_const import AGENT_DEFAULT_HOST, AGENT_DEFAULT_PORT
@@ -35,7 +35,7 @@ def to_dict(self):
3535
return kvs
3636

3737

38-
class Fsm(object):
38+
class TheMachine(object):
3939
RETRY_PERIOD = 30
4040

4141
agent = None
@@ -55,15 +55,17 @@ def __init__(self, agent):
5555
logger.debug("initializing fsm")
5656

5757
self.agent = agent
58-
self.fsm = f.Fysom({
58+
self.fsm = Fysom({
5959
"events": [
6060
("lookup", "*", "found"),
6161
("announce", "found", "announced"),
62-
("ready", "announced", "good2go")],
62+
("pending", "announced", "wait4init"),
63+
("ready", "wait4init", "good2go")],
6364
"callbacks": {
6465
"onlookup": self.lookup_agent_host,
6566
"onannounce": self.announce_sensor,
66-
"onready": self.agent.start,
67+
"onpending": self.agent.start,
68+
"onready": self.on_ready,
6769
"onchangestate": self.printstatechange}})
6870

6971
self.timer = t.Timer(5, self.fsm.lookup)
@@ -79,6 +81,7 @@ def reset(self):
7981
self.fsm.lookup()
8082

8183
def lookup_agent_host(self, e):
84+
logger.debug("lookup_agent_host")
8285
host, port = self.__get_agent_host_port()
8386

8487
if self.agent.is_agent_listening(host, port):
@@ -95,7 +98,7 @@ def lookup_agent_host(self, e):
9598
self.fsm.announce()
9699
return True
97100

98-
if (self.warnedPeriodic is False):
101+
if self.warnedPeriodic is False:
99102
logger.warn("Instana Host Agent couldn't be found. Will retry periodically...")
100103
self.warnedPeriodic = True
101104

@@ -143,9 +146,8 @@ def announce_sensor(self, e):
143146

144147
if response and (response.status_code is 200) and (len(response.content) > 2):
145148
self.agent.set_from(response.content)
146-
self.fsm.ready()
147-
logger.info("Host agent available. We're in business. Announced pid: %s (true pid: %s)" %
148-
(str(pid), str(self.agent.from_.pid)))
149+
self.fsm.pending()
150+
logger.debug("Announced pid: %s (true pid: %s) Waiting for Agent Ready" % (str(pid), str(self.agent.from_.pid)))
149151
return True
150152
else:
151153
logger.debug("Cannot announce sensor. Scheduling retry.")
@@ -159,6 +161,10 @@ def schedule_retry(self, fun, e, name):
159161
self.timer.name = name
160162
self.timer.start()
161163

164+
def on_ready(self, e):
    """
    State machine callback registered for the "ready" event.

    Logs at info level that the host agent is available, together with the
    current process id and the pid stored on the agent's `from_` record.

    :param e: the event object passed in by the state machine (unused here)
    """
    announced_pid = str(os.getpid())
    true_pid = str(self.agent.from_.pid)
    logger.info("Host agent available. We're in business. Announced pid: %s (true pid: %s)" %
                (announced_pid, true_pid))
167+
162168
def __get_real_pid(self):
163169
"""
164170
Attempts to determine the true process ID by querying the

instana/meter.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,12 @@
66
import resource
77
import sys
88
import threading
9-
import time
109
from types import ModuleType
1110

1211
from pkg_resources import DistributionNotFound, get_distribution
1312

1413
from .log import logger
15-
from .util import get_py_source, package_version
14+
from .util import get_py_source, package_version, every
1615

1716

1817
class Snapshot(object):
@@ -106,16 +105,16 @@ def to_dict(self):
106105

107106
class Meter(object):
108107
SNAPSHOT_PERIOD = 600
109-
snapshot_countdown = 5
108+
snapshot_countdown = 0
110109

111110
# The agent that this instance belongs to
112111
agent = None
113112

114113
last_usage = None
115114
last_collect = None
116115
last_metrics = None
117-
last_data_report_status = None
118116
djmw = None
117+
thr = None
119118

120119
# A True value signals the metric reporting thread to shutdown
121120
_shutdown = False
@@ -136,7 +135,7 @@ def reset(self):
136135
self.last_usage = None
137136
self.last_collect = None
138137
self.last_metrics = None
139-
self.snapshot_countdown = 5
138+
self.snapshot_countdown = 0
140139
self.run()
141140

142141
def collect_and_report(self):
@@ -145,22 +144,33 @@ def collect_and_report(self):
145144
collect and report entity data every 1 second.
146145
"""
147146
logger.debug("Metric reporting thread is now alive")
148-
while 1:
147+
148+
def metric_work():
149149
self.process()
150150
if self.agent.is_timed_out():
151151
logger.warn("Host agent offline for >1 min. Going to sit in a corner...")
152152
self.agent.reset()
153-
break
154-
time.sleep(1)
153+
return False
154+
return True
155+
156+
every(1, metric_work, "Metrics Collection")
155157

156158
def process(self):
157159
""" Collects, processes & reports metrics """
160+
if self.agent.machine.fsm.current is "wait4init":
161+
# Test the host agent if we're ready to send data
162+
if self.agent.is_agent_ready():
163+
self.agent.machine.fsm.ready()
164+
else:
165+
return
166+
158167
if self.agent.can_send():
159168
self.snapshot_countdown = self.snapshot_countdown - 1
160169
ss = None
161170
cm = self.collect_metrics()
162171

163-
if self.snapshot_countdown < 1 and self.last_data_report_status is 200:
172+
if self.snapshot_countdown < 1:
173+
logger.debug("Sending process snapshot data")
164174
self.snapshot_countdown = self.SNAPSHOT_PERIOD
165175
ss = self.collect_snapshot()
166176
md = copy.deepcopy(cm).delta_data(None)
@@ -171,8 +181,6 @@ def process(self):
171181
response = self.agent.report_data(ed)
172182

173183
if response:
174-
self.last_data_report_status = response.status_code
175-
176184
if response.status_code is 200 and len(response.content) > 2:
177185
# The host agent returned something indicating that is has a request for us that we
178186
# need to process.

instana/recorder.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import socket
55
import sys
66
import threading as t
7-
import time
87

98
import opentracing.ext.tags as ext
109
from basictracer import Sampler, SpanRecorder
@@ -15,6 +14,7 @@
1514
RabbitmqData, RedisData, RPCData, SDKData, SoapData,
1615
SQLAlchemyData)
1716
from .log import logger
17+
from .util import every
1818

1919
if sys.version_info.major is 2:
2020
import Queue as queue
@@ -37,6 +37,8 @@ class InstanaRecorder(SpanRecorder):
3737

3838
queue = queue.Queue()
3939

40+
timer = None
41+
4042
def __init__(self):
4143
super(InstanaRecorder, self).__init__()
4244

@@ -50,13 +52,16 @@ def run(self):
5052
def report_spans(self):
5153
""" Periodically report the queued spans """
5254
logger.debug("Span reporting thread is now alive")
53-
while 1:
55+
56+
def span_work():
5457
queue_size = self.queue.qsize()
5558
if queue_size > 0 and instana.singletons.agent.can_send():
5659
response = instana.singletons.agent.report_traces(self.queued_spans())
5760
if response:
5861
logger.debug("reported %d spans" % queue_size)
59-
time.sleep(1)
62+
return True
63+
64+
every(2, span_work, "Span Reporting")
6065

6166
def queue_size(self):
6267
""" Return the size of the queue; how may spans are queued, """

instana/util.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,5 +231,26 @@ def get_py_source(file):
231231
return response
232232

233233

234+
def every(delay, task, name):
    """
    Run `task` repeatedly, once per `delay` seconds, until it returns False.

    The schedule is anchored to absolute deadlines rather than to a fixed
    sleep after each run, so the time spent inside `task` does not
    accumulate as drift. When a run overshoots one or more deadlines, the
    missed slots are skipped instead of being executed back to back.

    :param delay: the delay in seconds; must be non-zero, since it is used
                  as a divisor when the next deadline is computed
    :param task: the method to run. The method should return False if you want the loop to stop.
    :param name: label for the task, used only in the debug log message
                 emitted when `task` raises
    :return: None
    """
    deadline = time.time() + delay

    while True:
        remaining = deadline - time.time()
        # Already past the deadline -> sleep(0), i.e. run straight away.
        time.sleep(max(0, remaining))

        try:
            keep_going = task()
        except Exception:
            # A failing task must not kill the loop; log it and carry on.
            logger.debug("Problem while executing repetitive task: %s" % name, exc_info=True)
        else:
            # Only the literal False stops the loop; None and other falsy
            # values keep it running.
            if keep_going is False:
                break

        # skip tasks if we are behind schedule: advance by however many whole
        # periods have already elapsed, plus one to reach the next slot.
        overdue = time.time() - deadline
        deadline += overdue // delay * delay + delay
254+
234255
# Used by get_py_source
235256
regexp_py = re.compile('\.py$')

0 commit comments

Comments
 (0)