Skip to content

Commit 11f6d9a

Browse files
authored
Improved host agent communications (#100)
* Functions to support agent request handling * Revamped host agent communications - Use requests package: less memory, keep-alive and better API - Greatly simplified Agent request functions into more logical components - Much faster initial snapshot reporting (process details show up almost instantaneously now) - Better error handling when complications with host agent arise - This sensor can now handle agent tasks - More import improvements * Better exception handling/reporting - Use tuple for py3 - Shh unnecessary exception logging * Add stacks to exit spans * Remove debug remnant * Final fixes for tasks; Fix task response URL * Add stack fingerprint; Limit frames * Better frame exclusion * Better syntax * Add stack validations to tests
1 parent 17574c0 commit 11f6d9a

16 files changed

+406
-183
lines changed

instana/agent.py

Lines changed: 142 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,19 @@
22

33
import json
44
import os
5-
import threading
65
from datetime import datetime
76

7+
import requests
8+
89
import instana.singletons
910

10-
from .agent_const import AGENT_DEFAULT_HOST, AGENT_DEFAULT_PORT
11+
from .agent_const import (AGENT_DATA_PATH, AGENT_DEFAULT_HOST,
12+
AGENT_DEFAULT_PORT, AGENT_DISCOVERY_PATH,
13+
AGENT_HEADER, AGENT_RESPONSE_PATH, AGENT_TRACES_PATH)
1114
from .fsm import Fsm
1215
from .log import logger
1316
from .sensor import Sensor
1417

15-
try:
16-
import urllib.request as urllib2
17-
except ImportError:
18-
import urllib2
19-
2018

2119
class From(object):
2220
pid = ""
@@ -26,18 +24,6 @@ def __init__(self, **kwds):
2624
self.__dict__.update(kwds)
2725

2826

29-
class Head(urllib2.Request):
30-
31-
def get_method(self):
32-
return "HEAD"
33-
34-
35-
class Put(urllib2.Request):
36-
37-
def get_method(self):
38-
return "PUT"
39-
40-
4127
class Agent(object):
4228
sensor = None
4329
host = AGENT_DEFAULT_HOST
@@ -48,6 +34,7 @@ class Agent(object):
4834
last_fork_check = None
4935
_boot_pid = os.getpid()
5036
extra_headers = None
37+
client = requests.Session()
5138

5239
def __init__(self):
5340
logger.debug("initializing agent")
@@ -87,79 +74,6 @@ def can_send(self):
8774

8875
return False
8976

90-
def head(self, url):
91-
return self.request(url, "HEAD", None)
92-
93-
def request(self, url, method, o):
94-
return self.full_request_response(url, method, o, False, "")
95-
96-
def request_response(self, url, method, o):
97-
return self.full_request_response(url, method, o, True, "")
98-
99-
def request_header(self, url, method, header):
100-
return self.full_request_response(url, method, None, False, header)
101-
102-
def full_request_response(self, url, method, o, body, header):
103-
b = None
104-
h = None
105-
try:
106-
if method == "HEAD":
107-
request = Head(url)
108-
elif method == "GET":
109-
request = urllib2.Request(url)
110-
elif method == "PUT":
111-
request = Put(url, self.to_json(o))
112-
request.add_header("Content-Type", "application/json")
113-
else:
114-
request = urllib2.Request(url, self.to_json(o))
115-
request.add_header("Content-Type", "application/json")
116-
117-
response = urllib2.urlopen(request, timeout=2)
118-
119-
if not response:
120-
self.reset()
121-
else:
122-
if response.getcode() < 200 or response.getcode() >= 300:
123-
logger.error("Request returned erroneous code", response.getcode())
124-
if self.can_send():
125-
self.reset()
126-
else:
127-
self.last_seen = datetime.now()
128-
if body:
129-
b = response.read()
130-
131-
if header:
132-
h = response.info().get(header)
133-
134-
if method == "HEAD":
135-
b = True
136-
# logger.warn("%s %s --> response: %s" % (method, url, b))
137-
except Exception as e:
138-
# No need to show the initial 404s or timeouts. The agent
139-
# should handle those correctly.
140-
if not (type(e) is urllib2.HTTPError and e.code == 404):
141-
logger.debug("%s: full_request_response: %s" %
142-
(threading.current_thread().name, str(e)))
143-
144-
return (b, h)
145-
146-
def make_url(self, prefix):
147-
return self.make_host_url(self.host, prefix)
148-
149-
def make_host_url(self, host, prefix):
150-
port = self.sensor.options.agent_port
151-
if port == 0:
152-
port = AGENT_DEFAULT_PORT
153-
154-
return self.make_full_url(host, port, prefix)
155-
156-
def make_full_url(self, host, port, prefix):
157-
s = "http://%s:%s%s" % (host, str(port), prefix)
158-
if self.from_.pid != 0:
159-
s = "%s%s" % (s, self.from_.pid)
160-
161-
return s
162-
16377
def set_from(self, json_string):
16478
if type(json_string) is bytes:
16579
raw_json = json_string.decode("UTF-8")
@@ -170,7 +84,7 @@ def set_from(self, json_string):
17084

17185
if "extraHeaders" in res_data:
17286
self.extra_headers = res_data['extraHeaders']
173-
logger.debug("Will also capture these custom headers: %s", self.extra_headers)
87+
logger.info("Will also capture these custom headers: %s", self.extra_headers)
17488

17589
self.from_ = From(pid=res_data['pid'], agentUuid=res_data['agentUuid'])
17690

@@ -180,6 +94,141 @@ def reset(self):
18094
self.fsm.reset()
18195

18296
def handle_fork(self):
97+
"""
98+
Forks happen. Here we handle them.
99+
"""
183100
self.reset()
184101
self.sensor.handle_fork()
185102
instana.singletons.tracer.handle_fork()
103+
104+
def is_agent_listening(self, host, port):
105+
"""
106+
Check if the Instana Agent is listening on <host> and <port>.
107+
"""
108+
try:
109+
rv = False
110+
url = "http://%s:%s/" % (host, port)
111+
response = self.client.get(url, timeout=0.8)
112+
113+
server_header = response.headers["Server"]
114+
if server_header == AGENT_HEADER:
115+
logger.debug("Host agent found on %s:%d" % (host, port))
116+
rv = True
117+
else:
118+
logger.debug("...something is listening on %s:%d but it's not the Instana Agent: %s"
119+
% (host, port, server_header))
120+
except (requests.ConnectTimeout, requests.ConnectionError):
121+
logger.debug("No host agent listening on %s:%d" % (host, port))
122+
rv = False
123+
finally:
124+
return rv
125+
126+
def announce(self, discovery):
127+
"""
128+
With the passed in Discovery class, attempt to announce to the host agent.
129+
"""
130+
try:
131+
url = self.__discovery_url()
132+
logger.debug("making announce request to %s" % (url))
133+
response = None
134+
response = self.client.put(url,
135+
data=self.to_json(discovery),
136+
headers={"Content-Type": "application/json"},
137+
timeout=0.8)
138+
139+
if response.status_code is 200:
140+
self.last_seen = datetime.now()
141+
except (requests.ConnectTimeout, requests.ConnectionError):
142+
logger.debug("announce", exc_info=True)
143+
finally:
144+
return response
145+
146+
def report_data(self, entity_data):
147+
"""
148+
Used to report entity data (metrics & snapshot) to the host agent.
149+
"""
150+
try:
151+
response = None
152+
response = self.client.post(self.__data_url(),
153+
data=self.to_json(entity_data),
154+
headers={"Content-Type": "application/json"},
155+
timeout=0.8)
156+
157+
if response.status_code is 200:
158+
self.last_seen = datetime.now()
159+
except (requests.ConnectTimeout, requests.ConnectionError):
160+
logger.debug("report_data: host agent connection error")
161+
finally:
162+
return response
163+
164+
def report_traces(self, spans):
165+
"""
166+
Used to report entity data (metrics & snapshot) to the host agent.
167+
"""
168+
try:
169+
response = None
170+
response = self.client.post(self.__traces_url(),
171+
data=self.to_json(spans),
172+
headers={"Content-Type": "application/json"},
173+
timeout=0.8)
174+
if response.status_code is 200:
175+
self.last_seen = datetime.now()
176+
except (requests.ConnectTimeout, requests.ConnectionError):
177+
logger.debug("report_traces: host agent connection error")
178+
finally:
179+
return response
180+
181+
def task_response(self, message_id, data):
182+
"""
183+
When the host agent passes us a task and we do it, this function is used to
184+
respond with the results of the task.
185+
"""
186+
try:
187+
response = None
188+
payload = json.dumps(data)
189+
190+
logger.debug("Task response is %s: %s" % (self.__response_url(message_id), payload))
191+
192+
response = self.client.post(self.__response_url(message_id),
193+
data=payload,
194+
headers={"Content-Type": "application/json"},
195+
timeout=0.8)
196+
except (requests.ConnectTimeout, requests.ConnectionError):
197+
logger.debug("task_response", exc_info=True)
198+
except Exception:
199+
logger.debug("task_response Exception", exc_info=True)
200+
finally:
201+
return response
202+
203+
def __discovery_url(self):
204+
"""
205+
URL for announcing to the host agent
206+
"""
207+
port = self.sensor.options.agent_port
208+
if port == 0:
209+
port = AGENT_DEFAULT_PORT
210+
211+
return "http://%s:%s/%s" % (self.host, port, AGENT_DISCOVERY_PATH)
212+
213+
def __data_url(self):
214+
"""
215+
URL for posting metrics to the host agent. Only valid when announced.
216+
"""
217+
path = AGENT_DATA_PATH % self.from_.pid
218+
return "http://%s:%s/%s" % (self.host, self.port, path)
219+
220+
def __traces_url(self):
221+
"""
222+
URL for posting traces to the host agent. Only valid when announced.
223+
"""
224+
path = AGENT_TRACES_PATH % self.from_.pid
225+
return "http://%s:%s/%s" % (self.host, self.port, path)
226+
227+
def __response_url(self, message_id):
228+
"""
229+
URL for responding to agent requests.
230+
"""
231+
if self.from_.pid != 0:
232+
path = AGENT_RESPONSE_PATH % (self.from_.pid, message_id)
233+
234+
return "http://%s:%s/%s" % (self.host, self.port, path)

instana/agent_const.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
AGENT_DISCOVERY_URL = "/com.instana.plugin.python.discovery"
2-
AGENT_TRACES_URL = "/com.instana.plugin.python/traces."
3-
AGENT_DATA_URL = "/com.instana.plugin.python."
1+
AGENT_DISCOVERY_PATH = "com.instana.plugin.python.discovery"
2+
AGENT_TRACES_PATH = "com.instana.plugin.python/traces.%d"
3+
AGENT_DATA_PATH = "com.instana.plugin.python.%d"
4+
AGENT_RESPONSE_PATH = "com.instana.plugin.python/response.%d?messageId=%s"
45
AGENT_DEFAULT_HOST = "localhost"
56
AGENT_DEFAULT_PORT = 42699
67
AGENT_HEADER = "Instana Agent"

instana/fsm.py

Lines changed: 12 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@
1010
import fysom as f
1111
import pkg_resources
1212

13-
from .agent_const import (AGENT_DATA_URL, AGENT_DEFAULT_HOST,
14-
AGENT_DEFAULT_PORT, AGENT_DISCOVERY_URL,
15-
AGENT_HEADER)
13+
from .agent_const import AGENT_DEFAULT_HOST, AGENT_DEFAULT_PORT
1614
from .log import logger
1715

1816

@@ -82,17 +80,15 @@ def reset(self):
8280
def lookup_agent_host(self, e):
8381
host, port = self.__get_agent_host_port()
8482

85-
h = self.check_host(host, port)
86-
if h == AGENT_HEADER:
83+
if self.agent.is_agent_listening(host, port):
8784
self.agent.host = host
8885
self.agent.port = port
8986
self.fsm.announce()
9087
return True
9188
elif os.path.exists("/proc/"):
9289
host = self.get_default_gateway()
9390
if host:
94-
h = self.check_host(host, port)
95-
if h == AGENT_HEADER:
91+
if self.agent.is_agent_listening(host, port):
9692
self.agent.host = host
9793
self.agent.port = port
9894
self.fsm.announce()
@@ -120,17 +116,9 @@ def get_default_gateway(self):
120116

121117
return None
122118

123-
def check_host(self, host, port):
124-
logger.debug("checking %s:%d" % (host, port))
125-
126-
(_, h) = self.agent.request_header(
127-
self.agent.make_host_url(host, "/"), "GET", "Server")
128-
129-
return h
130-
131119
def announce_sensor(self, e):
132120
logger.debug("announcing sensor to the agent")
133-
s = None
121+
sock = None
134122
pid = os.getpid()
135123
cmdline = []
136124

@@ -159,16 +147,16 @@ def announce_sensor(self, e):
159147

160148
# If we're on a system with a procfs
161149
if os.path.exists("/proc/"):
162-
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
163-
s.connect((self.agent.host, 42699))
164-
path = "/proc/%d/fd/%d" % (pid, s.fileno())
165-
d.fd = s.fileno()
150+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
151+
sock.connect((self.agent.host, 42699))
152+
path = "/proc/%d/fd/%d" % (pid, sock.fileno())
153+
d.fd = sock.fileno()
166154
d.inode = os.readlink(path)
167155

168-
(b, _) = self.agent.request_response(
169-
self.agent.make_url(AGENT_DISCOVERY_URL), "PUT", d)
170-
if b:
171-
self.agent.set_from(b)
156+
response = self.agent.announce(d)
157+
158+
if response and (response.status_code is 200) and (len(response.content) > 2):
159+
self.agent.set_from(response.content)
172160
self.fsm.ready()
173161
logger.info("Host agent available. We're in business. Announced pid: %s (true pid: %s)" %
174162
(str(pid), str(self.agent.from_.pid)))
@@ -185,16 +173,6 @@ def schedule_retry(self, fun, e, name):
185173
self.timer.name = name
186174
self.timer.start()
187175

188-
def test_agent(self, e):
189-
logger.debug("testing communication with the agent")
190-
191-
(b, _) = self.agent.head(self.agent.make_url(AGENT_DATA_URL))
192-
193-
if not b:
194-
self.schedule_retry(self.test_agent, e, "agent test")
195-
else:
196-
self.fsm.test()
197-
198176
def __get_real_pid(self):
199177
"""
200178
Attempts to determine the true process ID by querying the

0 commit comments

Comments
 (0)