2
2
# (c) Copyright Instana Inc. 2020
3
3
4
4
"""
5
- A Collector launches a background thread and continually collects & reports data. The data
6
- can be any combination of metrics, snapshot data and spans.
5
+ A Collector launches a background thread and continually collects & reports data.
6
+ The data can be any combination of metrics, snapshot data and spans.
7
7
"""
8
8
9
9
import queue # pylint: disable=import-error
10
10
import threading
11
11
import time
12
+ from typing import TYPE_CHECKING , Any , DefaultDict , Dict , List , Type
12
13
13
14
from instana .log import logger
14
15
from instana .util import DictionaryOfStan
15
16
17
+ if TYPE_CHECKING :
18
+ from instana .agent .base import BaseAgent
19
+ from instana .span .readable_span import ReadableSpan
20
+
16
21
17
22
class BaseCollector (object ):
18
23
"""
19
24
Base class to handle the collection & reporting of snapshot and metric data
20
25
This class launches a background thread to do this work.
21
26
"""
22
27
23
- def __init__ (self , agent ) :
28
+ def __init__ (self , agent : Type [ "BaseAgent" ]) -> None :
24
29
# The agent for this process. Can be Standard, AWSLambda or Fargate
25
30
self .agent = agent
26
31
@@ -61,7 +66,7 @@ def __init__(self, agent):
61
66
# Start time of fetching metadata
62
67
self .fetching_start_time = 0
63
68
64
- def is_reporting_thread_running (self ):
69
+ def is_reporting_thread_running (self ) -> bool :
65
70
"""
66
71
Indicates if there is a thread running with the name self.THREAD_NAME
67
72
"""
@@ -70,14 +75,14 @@ def is_reporting_thread_running(self):
70
75
return True
71
76
return False
72
77
73
- def start (self ):
78
+ def start (self ) -> None :
74
79
"""
75
80
Starts the collector and starts reporting as long as the agent is in a ready state.
76
81
@return: None
77
82
"""
78
83
if self .is_reporting_thread_running ():
79
84
if self .thread_shutdown .is_set ():
80
- # Force a restart.
85
+ # Force a restart.
81
86
self .thread_shutdown .clear ()
82
87
# Reschedule this start in 5 seconds from now
83
88
timer = threading .Timer (5 , self .start )
@@ -92,7 +97,9 @@ def start(self):
92
97
if self .agent .can_send ():
93
98
logger .debug ("BaseCollector.start: launching collection thread" )
94
99
self .thread_shutdown .clear ()
95
- self .reporting_thread = threading .Thread (target = self .background_report , args = ())
100
+ self .reporting_thread = threading .Thread (
101
+ target = self .background_report , args = ()
102
+ )
96
103
self .reporting_thread .daemon = True
97
104
self .reporting_thread .name = self .THREAD_NAME
98
105
self .reporting_thread .start ()
@@ -102,7 +109,7 @@ def start(self):
102
109
"BaseCollector.start: the agent tells us we can't send anything out"
103
110
)
104
111
105
- def shutdown (self , report_final = True ):
112
+ def shutdown (self , report_final : bool = True ) -> None :
106
113
"""
107
114
Shuts down the collector and reports any final data (if possible).
108
115
e.g. If the host agent disappeared, we won't be able to report final data.
@@ -118,10 +125,10 @@ def background_report(self) -> None:
118
125
"""
119
126
The main work-horse method to report data in the background thread.
120
127
121
- This method runs indefinitely, preparing and reporting data at regular
128
+ This method runs indefinitely, preparing and reporting data at regular
122
129
intervals.
123
130
It checks for a shutdown signal and stops execution if it's set.
124
-
131
+
125
132
@return: None
126
133
"""
127
134
while True :
@@ -134,7 +141,7 @@ def background_report(self) -> None:
134
141
self .prepare_and_report_data ()
135
142
time .sleep (self .report_interval )
136
143
137
- def prepare_and_report_data (self ):
144
+ def prepare_and_report_data (self ) -> bool :
138
145
"""
139
146
Prepare and report the data payload.
140
147
@return: Boolean
@@ -144,26 +151,26 @@ def prepare_and_report_data(self):
144
151
self .agent .report_data_payload (payload )
145
152
return True
146
153
147
- def prepare_payload (self ):
154
+ def prepare_payload (self ) -> DefaultDict [ str , Any ] :
148
155
"""
149
156
Method to prepare the data to be reported.
150
157
@return: DictionaryOfStan()
151
158
"""
152
159
logger .debug ("BaseCollector: prepare_payload needs to be overridden" )
153
160
return DictionaryOfStan ()
154
161
155
- def should_send_snapshot_data (self ):
162
+ def should_send_snapshot_data (self ) -> bool :
156
163
"""
157
164
Determines if snapshot data should be sent
158
165
@return: Boolean
159
166
"""
160
167
logger .debug ("BaseCollector: should_send_snapshot_data needs to be overridden" )
161
168
return False
162
169
163
- def collect_snapshot (self , * argv , ** kwargs ):
170
+ def collect_snapshot (self , * argv , ** kwargs ) -> None :
164
171
logger .debug ("BaseCollector: collect_snapshot needs to be overridden" )
165
172
166
- def queued_spans (self ):
173
+ def queued_spans (self ) -> List [ "ReadableSpan" ] :
167
174
"""
168
175
Get all of the queued spans
169
176
@return: list
@@ -178,7 +185,7 @@ def queued_spans(self):
178
185
spans .append (span )
179
186
return spans
180
187
181
- def queued_profiles (self ):
188
+ def queued_profiles (self ) -> List [ Dict [ str , Any ]] :
182
189
"""
183
190
Get all of the queued profiles
184
191
@return: list
0 commit comments