Skip to content

Commit 45848ed

Browse files
authored
Better fork detection & handling (#156)
* Better os.fork() handling * improved load conditions * Add harmless exception handling * Fix comment of explanation * Better; smarter; stronger Fork handling
1 parent d87a8c5 commit 45848ed

File tree

8 files changed

+146
-60
lines changed

8 files changed

+146
-60
lines changed

instana/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,11 @@ def boot_agent():
9595
do_not_load_list = ["pip", "pip2", "pip3", "pipenv", "docker-compose", "easy_install", "easy_install-2.7",
9696
"smtpd.py", "ufw", "unattended-upgrade"]
9797

98-
if os.path.basename(sys.argv[0]) in do_not_load_list:
98+
# There are cases when sys.argv may not be defined at load time. Seems to happen in embedded Python,
99+
# and some Pipenv installs. If this is the case, it's best effort.
100+
if hasattr(sys, 'argv') and len(sys.argv) > 0 and (os.path.basename(sys.argv[0]) in do_not_load_list):
99101
if "INSTANA_DEBUG" in os.environ:
100-
print("Instana: No use in monitoring this process type (%s). Will go sit in a corner quietly.", sys.argv[0])
102+
print("Instana: No use in monitoring this process type (%s). Will go sit in a corner quietly." % os.path.basename(sys.argv[0]))
101103
else:
102104
if "INSTANA_MAGIC" in os.environ:
103105
# If we're being loaded into an already running process, then delay agent initialization

instana/agent.py

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import json
44
import os
55
from datetime import datetime
6-
6+
from threading import Timer
77
import requests
88

99
import instana.singletons
@@ -25,6 +25,16 @@ def __init__(self, **kwds):
2525

2626

2727
class Agent(object):
28+
"""
29+
The Agent class is the central controlling entity for the Instana Python language sensor. The key
30+
parts it handles are the announce state and the collection and reporting of metrics and spans to the
31+
Instana Host agent.
32+
33+
To do this, there are 3 major components to this class:
34+
1. TheMachine - finite state machine related to announce state
35+
2. Sensor -> Meter - metric collection and reporting
36+
3. Tracer -> Recorder - span queueing and reporting
37+
"""
2838
sensor = None
2939
host = AGENT_DEFAULT_HOST
3040
port = AGENT_DEFAULT_PORT
@@ -45,9 +55,27 @@ def __init__(self):
4555

4656
def start(self, e):
4757
""" Starts the agent and required threads """
48-
logger.debug("Spawning metric & trace reporting threads")
49-
self.sensor.meter.run()
50-
instana.singletons.tracer.recorder.run()
58+
logger.debug("Spawning metric & span reporting threads")
59+
self.sensor.start()
60+
instana.singletons.tracer.recorder.start()
61+
62+
def handle_fork(self):
63+
"""
64+
Forks happen. Here we handle them. Affected components are the singletons: Agent, Sensor & Tracers
65+
"""
66+
# Reset the Agent
67+
self.reset()
68+
69+
# Ask the sensor to handle the fork
70+
self.sensor.handle_fork()
71+
72+
# Ask the tracer to handle the fork
73+
instana.singletons.tracer.handle_fork()
74+
75+
def reset(self):
76+
self.last_seen = None
77+
self.from_ = From()
78+
self.machine.reset()
5179

5280
def to_json(self, o):
5381
def extractor(o):
@@ -66,10 +94,11 @@ def is_timed_out(self):
6694
return False
6795

6896
def can_send(self):
69-
# Watch for pid change in the case of ; if so, re-announce
97+
# Watch for pid change (fork)
7098
current_pid = os.getpid()
7199
if self._boot_pid != current_pid:
72100
self._boot_pid = current_pid
101+
logger.debug("Fork detected; Handling like a pro...")
73102
self.handle_fork()
74103
return False
75104

@@ -96,19 +125,6 @@ def set_from(self, json_string):
96125

97126
self.from_ = From(pid=res_data['pid'], agentUuid=res_data['agentUuid'])
98127

99-
def reset(self):
100-
self.last_seen = None
101-
self.from_ = From()
102-
self.machine.reset()
103-
104-
def handle_fork(self):
105-
"""
106-
Forks happen. Here we handle them.
107-
"""
108-
self.reset()
109-
self.sensor.handle_fork()
110-
instana.singletons.tracer.handle_fork()
111-
112128
def is_agent_listening(self, host, port):
113129
"""
114130
Check if the Instana Agent is listening on <host> and <port>.

instana/fsm.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,18 +62,20 @@ def __init__(self, agent):
6262
("pending", "announced", "wait4init"),
6363
("ready", "wait4init", "good2go")],
6464
"callbacks": {
65+
# Can add the following to debug
66+
# "onchangestate": self.print_state_change,
6567
"onlookup": self.lookup_agent_host,
6668
"onannounce": self.announce_sensor,
6769
"onpending": self.agent.start,
68-
"onready": self.on_ready,
69-
"onchangestate": self.printstatechange}})
70+
"onready": self.on_ready}})
7071

7172
self.timer = t.Timer(5, self.fsm.lookup)
7273
self.timer.daemon = True
73-
self.timer.name = "Startup"
74+
self.timer.name = "Instana Machine"
7475
self.timer.start()
7576

76-
def printstatechange(self, e):
77+
@staticmethod
78+
def print_state_change(e):
7779
logger.debug('========= (%i#%s) FSM event: %s, src: %s, dst: %s ==========' %
7880
(os.getpid(), t.current_thread().name, e.event, e.src, e.dst))
7981

instana/meter.py

Lines changed: 50 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import sys
88
import threading
99
from types import ModuleType
10+
from fysom import FysomError
1011

1112
from pkg_resources import DistributionNotFound, get_distribution
1213

@@ -105,16 +106,19 @@ def to_dict(self):
105106

106107
class Meter(object):
107108
SNAPSHOT_PERIOD = 600
108-
snapshot_countdown = 0
109+
THREAD_NAME = "Instana Metric Collection"
109110

110111
# The agent that this instance belongs to
111112
agent = None
112113

114+
# We send Snapshot data every 10 minutes. This is the countdown variable.
115+
snapshot_countdown = 0
116+
113117
last_usage = None
114118
last_collect = None
115119
last_metrics = None
116120
djmw = None
117-
thr = None
121+
thread = None
118122

119123
# A True value signals the metric reporting thread to shutdown
120124
_shutdown = False
@@ -123,33 +127,57 @@ def __init__(self, agent):
123127
self.agent = agent
124128
pass
125129

126-
def run(self):
127-
""" Spawns the metric reporting thread """
128-
self.thr = threading.Thread(target=self.collect_and_report)
129-
self.thr.daemon = True
130-
self.thr.name = "Instana Metric Collection"
131-
self.thr.start()
130+
def start(self):
131+
"""
132+
This function can be called at first boot or after a fork. In either case, it will
133+
assure that the Meter is in a proper state (via reset()) and spawn a new background
134+
thread to periodically report queued spans
135+
136+
Note that this will abandon any previous thread object that (in the case of an `os.fork()`)
137+
should no longer exist in the forked process.
138+
139+
(Forked processes carry forward only the thread that called `os.fork()`
140+
into the new process space. All other background threads need to be recreated.)
141+
142+
Calling this directly more than once without an actual fork will cause errors.
143+
"""
144+
self.reset()
145+
146+
if self.thread.isAlive() is False:
147+
self.thread.start()
132148

133149
def reset(self):
134150
"""" Reset the state as new """
135151
self.last_usage = None
136152
self.last_collect = None
137153
self.last_metrics = None
138154
self.snapshot_countdown = 0
155+
self.thread = None
156+
157+
# Prepare the thread for metric collection/reporting
158+
for thread in threading.enumerate():
159+
if thread.getName() == self.THREAD_NAME:
160+
# Metric thread already exists; Make sure we re-use this one.
161+
self.thread = thread
162+
163+
if self.thread is None:
164+
self.thread = threading.Thread(target=self.collect_and_report)
165+
self.thread.daemon = True
166+
self.thread.name = self.THREAD_NAME
139167

140168
def handle_fork(self):
141-
self.reset()
142-
self.run()
169+
self.start()
143170

144171
def collect_and_report(self):
145172
"""
146173
Target function for the metric reporting thread. This is a simple loop to
147174
collect and report entity data every 1 second.
148175
"""
149-
logger.debug("Metric reporting thread is now alive")
176+
logger.debug(" -> Metric reporting thread is now alive")
150177

151178
def metric_work():
152179
self.process()
180+
153181
if self.agent.is_timed_out():
154182
logger.warn("Host agent offline for >1 min. Going to sit in a corner...")
155183
self.agent.reset()
@@ -160,12 +188,17 @@ def metric_work():
160188

161189
def process(self):
162190
""" Collects, processes & reports metrics """
163-
if self.agent.machine.fsm.current is "wait4init":
164-
# Test the host agent if we're ready to send data
165-
if self.agent.is_agent_ready():
166-
self.agent.machine.fsm.ready()
167-
else:
168-
return
191+
try:
192+
if self.agent.machine.fsm.current is "wait4init":
193+
# Test the host agent if we're ready to send data
194+
if self.agent.is_agent_ready():
195+
if self.agent.machine.fsm.current is not "good2go":
196+
self.agent.machine.fsm.ready()
197+
else:
198+
return
199+
except FysomError:
200+
logger.debug('Harmless state machine thread disagreement. Will self-correct on next timer cycle.')
201+
return
169202

170203
if self.agent.can_send():
171204
self.snapshot_countdown = self.snapshot_countdown - 1

instana/recorder.py

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import os
44
import sys
5-
import threading as t
5+
import threading
66

77
import opentracing.ext.tags as ext
88
from basictracer import Sampler, SpanRecorder
@@ -15,13 +15,14 @@
1515
from .log import logger
1616
from .util import every
1717

18-
if sys.version_info.major is 2:
18+
if sys.version_info.major == 2:
1919
import Queue as queue
2020
else:
2121
import queue
2222

2323

2424
class InstanaRecorder(SpanRecorder):
25+
THREAD_NAME = "Instana Span Reporting"
2526
registered_spans = ("aiohttp-client", "aiohttp-server", "django", "log", "memcache", "mysql",
2627
"rabbitmq", "redis", "rpc-client", "rpc-server", "sqlalchemy", "soap",
2728
"tornado-client", "tornado-server", "urllib3", "wsgi")
@@ -35,23 +36,51 @@ class InstanaRecorder(SpanRecorder):
3536
entry_kind = ["entry", "server", "consumer"]
3637
exit_kind = ["exit", "client", "producer"]
3738

38-
queue = queue.Queue()
39-
40-
timer = None
39+
# Recorder thread for collection/reporting of spans
40+
thread = None
4141

4242
def __init__(self):
4343
super(InstanaRecorder, self).__init__()
44+
self.queue = queue.Queue()
45+
46+
def start(self):
47+
"""
48+
This function can be called at first boot or after a fork. In either case, it will
49+
assure that the Recorder is in a proper state (via reset()) and spawn a new background
50+
thread to periodically report queued spans
51+
52+
Note that this will abandon any previous thread object that (in the case of an `os.fork()`)
53+
should no longer exist in the forked process.
54+
55+
(Forked processes carry forward only the thread that called `os.fork()`
56+
into the new process space. All other background threads need to be recreated.)
57+
58+
Calling this directly more than once without an actual fork will cause errors.
59+
"""
60+
self.reset()
61+
62+
if self.thread.isAlive() is False:
63+
self.thread.start()
64+
65+
def reset(self):
66+
# Prepare the thread for metric collection/reporting
67+
for thread in threading.enumerate():
68+
if thread.getName() == self.THREAD_NAME:
69+
# Span reporting thread already exists; Make sure we re-use this one.
70+
self.thread = thread
71+
72+
# Prepare the thread for span collection/reporting
73+
if self.thread is None:
74+
self.thread = threading.Thread(target=self.report_spans)
75+
self.thread.daemon = True
76+
self.thread.name = self.THREAD_NAME
4477

45-
def run(self):
46-
""" Span a background thread to periodically report queued spans """
47-
self.timer = t.Thread(target=self.report_spans)
48-
self.timer.daemon = True
49-
self.timer.name = "Instana Span Reporting"
50-
self.timer.start()
78+
def handle_fork(self):
79+
self.start()
5180

5281
def report_spans(self):
5382
""" Periodically report the queued spans """
54-
logger.debug("Span reporting thread is now alive")
83+
logger.debug(" -> Span reporting thread is now alive")
5584

5685
def span_work():
5786
queue_size = self.queue.qsize()
@@ -69,14 +98,15 @@ def queue_size(self):
6998

7099
def queued_spans(self):
71100
""" Get all of the spans in the queue """
101+
span = None
72102
spans = []
73103
while True:
74104
try:
75-
s = self.queue.get(False)
105+
span = self.queue.get(False)
76106
except queue.Empty:
77107
break
78108
else:
79-
spans.append(s)
109+
spans.append(span)
80110
return spans
81111

82112
def clear_spans(self):

instana/sensor.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ def set_options(self, options):
2323
if not self.options:
2424
self.options = Options()
2525

26+
def start(self):
27+
# Nothing to do for the Sensor; Pass onto Meter
28+
self.meter.start()
29+
2630
def handle_fork(self):
2731
# Nothing to do for the Sensor; Pass onto Meter
2832
self.meter.handle_fork()

instana/singletons.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
from .agent import Agent
55
from .tracer import InstanaTracer, InstanaRecorder
66

7-
from distutils.version import LooseVersion
8-
97

108
# The Instana Agent which carries along with it a Sensor that collects metrics.
119
agent = Agent()

instana/tracer.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ def __init__(self, options=Options(), scope_manager=None, recorder=None):
2929
self._propagators[ot.Format.HTTP_HEADERS] = HTTPPropagator()
3030
self._propagators[ot.Format.TEXT_MAP] = TextPropagator()
3131

32+
def handle_fork(self):
33+
# Nothing to do for the Tracer; Pass onto Recorder
34+
self.recorder.handle_fork()
35+
3236
def start_active_span(self,
3337
operation_name,
3438
child_of=None,
@@ -118,9 +122,6 @@ def extract(self, format, carrier):
118122
else:
119123
raise ot.UnsupportedFormatException()
120124

121-
def handle_fork(self):
122-
self.recorder = InstanaRecorder()
123-
124125
def __add_stack(self, span, limit=None):
125126
""" Adds a backtrace to this span """
126127
span.stack = []

0 commit comments

Comments
 (0)