Skip to content

Commit 0ea92e0

Browse files
author
Daniel Abercrombie
committed
Merge remote branch 'origin/master' into user
Conflicts: workflowmonit/requirements.txt workflowmonit/stompAMQ.py
2 parents b2608c9 + ff02c43 commit 0ea92e0

File tree

10 files changed

+64
-92
lines changed

10 files changed

+64
-92
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ services:
1111
- mongodb
1212
env: TMPDIR=$PWD/tmp
1313
install:
14-
- python setup.py install
14+
- pip install .
1515
script:
1616
- pip list
1717
- package=workflowwebtools opsspace-test

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
},
2020
install_requires=[
2121
'cmstoolbox>=0.12.0',
22+
'CMSMonitoring',
2223
'more-itertools<6.0.0',
2324
'cherrypy<18.0.0',
2425
'mako',

workflowmonit/README.rst

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
workflowmonit
22
-------------
33

4-
Component which periodically fetches information of workflows in the system(list extracted from Unified database), process and organizes into static documents, then sends to CMSMONIT service via :py:mod:`workflowmonit.stompAMQ` for storage, monitoring and post-aggregation.
4+
Component which periodically fetches information on workflows in the system (list extracted from the Unified database), processes and organizes it into static documents, then sends them to the CMSMONIT service via :py:mod:`CMSMonitoring.StompAMQ` for storage, monitoring and post-aggregation.
55

66
- :ref:`usedApi-ref`
77
- Composition
88
- :ref:`wmCollector-ref`
99
- :ref:`wmSender-ref`
1010
- :ref:`wmScheduler-ref`
11-
- :ref:`wmStompAMQ-ref`
1211
- :ref:`wmDocExample-ref`
1312

1413
.. _usedApi-ref:
@@ -55,20 +54,11 @@ Schedule the ``main`` function of :ref:`wmSender-ref` every hour with :py:mod:`s
5554
time.sleep(1)
5655

5756

58-
.. _wmStompAMQ-ref:
59-
60-
stompAMQ
61-
~~~~~~~~
62-
63-
.. automodule:: workflowmonit.stompAMQ
64-
:members:
65-
66-
6757
.. _wmDocExample-ref:
6858

6959
Document examples
7060
~~~~~~~~~~~~~~~~~
7161

7262
1. document describing a single workflow `example1 <http://wsi.web.cern.ch/wsi/public/toSaveExample4.json>`_
73-
2. document wrapped by ``stompAMQ`` as a batch `example2.1 <http://wsi.web.cern.ch/wsi/public/godummy2.json>`_, `example2.2 <http://wsi.web.cern.ch/wsi/public/amqMsg_190208-201142.json>`_
74-
3. document sent out by ``stomp`` (what ``stompAMQ`` wrapped around) `example3 <http://wsi.web.cern.ch/wsi/public/bab2ef60-b0f2-4b55-9434-95a9cfd00510.json>`_
63+
2. document wrapped by ``StompAMQ`` as a batch `example2.1 <http://wsi.web.cern.ch/wsi/public/godummy2.json>`_, `example2.2 <http://wsi.web.cern.ch/wsi/public/amqMsg_190208-201142.json>`_
64+
3. document sent out by ``stomp`` (what ``StompAMQ`` wrapped around) `example3 <http://wsi.web.cern.ch/wsi/public/bab2ef60-b0f2-4b55-9434-95a9cfd00510.json>`_

workflowmonit/alertingDefs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def alertWithEmail(docs, recipients):
5151

5252
for doc in docs:
5353
alertResults = [ad(doc) for ad in AlertDefs]
54-
positiveRes = filter(lambda d: d[0], alertResults)
54+
positiveRes = [r for r in alertResults if r[0]]
5555
if positiveRes:
5656
shortAlertMsgs = [x[1] for x in positiveRes]
5757
_contentMsg = '\n\n'.join([

workflowmonit/dumpWfStatusDb.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ def main():
2121
conn = sqlite3.connect(dbPath)
2222
with conn:
2323
c = conn.cursor()
24-
for row in c.execute("SELECT * FROM workflowStatuses WHERE status='running-open' ORDER BY failurerate"):
24+
for row in c.execute("SELECT * FROM workflowStatuses WHERE status LIKE '%archived' ORDER BY failurerate"):
2525
print('[ {1:^15} ]\t{2:.6f}\t{0}'.format(*row))
2626

2727
if __name__ == "__main__":

workflowmonit/requirements.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,2 @@
11
schedule
2-
CMSMonitoring
3-
stomp.py
42
pyyaml>=5.1

workflowmonit/sendToMonit.py

Lines changed: 34 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,13 @@
88
import logging
99
import threading
1010
import logging.config
11-
from Queue import Queue
11+
try:
12+
from Queue import Queue
13+
except ImportError:
14+
from queue import Queue # pylint: disable=import-error
1215

1316
import yaml
14-
from workflowmonit.stompAMQ import stompAMQ
17+
from CMSMonitoring.StompAMQ import StompAMQ
1518
import workflowmonit.workflowCollector as wc
1619
import workflowmonit.alertingDefs as ad
1720

@@ -99,14 +102,7 @@ def getCompletedWorkflowsFromDb(configPath):
99102
"""
100103
Get completed workflow list from local status db (setup to avoid unnecessary caching)
101104
102-
Workflows whose status is one of
103-
104-
- *running-closed*
105-
- *completed*
106-
- *aborted-archived*
107-
- *rejected-arcived*
108-
109-
are removed from further caching.
105+
Workflows whose status ends with *archived* are removed from further caching.
110106
111107
:param str configPath: location of config file
112108
:returns: list of workflow (str)
@@ -127,7 +123,7 @@ def getCompletedWorkflowsFromDb(configPath):
127123
status TEXT,
128124
failurerate REAL
129125
);"""
130-
DB_QUERY_CMD = """SELECT * FROM workflowStatuses WHERE status IN ('running-closed', 'completed', 'aborted-archived', 'rejected-archived')"""
126+
DB_QUERY_CMD = """SELECT * FROM workflowStatuses WHERE status LIKE '%archived'"""
131127

132128
res = []
133129
conn = sqlite3.connect(dbPath)
@@ -205,20 +201,21 @@ def buildDoc(configpath):
205201

206202
wc.invalidate_caches('/tmp/wsi/workflowinfo')
207203

208-
q = TimeoutQueue()
209-
num_threads = min(150, len(wkfs))
210-
for wf in wkfs:
211-
q.put(wf)
212-
213204
results = list()
214-
for _ in range(num_threads):
215-
t = threading.Thread(target=worker, args=(results, q, completedWfs, ))
216-
t.daemon = True
217-
t.start()
218-
try:
219-
q.join_with_timeout(30*60) # timeout 30min
220-
except NotFinished:
221-
pass
205+
206+
q = TimeoutQueue(maxsize=500)
207+
num_threads = 500
208+
for i, wf in enumerate(wkfs, 1):
209+
q.put(wf)
210+
if q.full() or i==len(wkfs):
211+
for _ in range(num_threads):
212+
t = threading.Thread(target=worker, args=(results, q, completedWfs, ))
213+
t.daemon = True
214+
t.start()
215+
try:
216+
q.join_with_timeout(30*60) # timeout 30min
217+
except NotFinished:
218+
pass
222219

223220
updateWorkflowStatusToDb(configpath, results)
224221
logger.info('Number of updated workflows: {}'.format(len(results)))
@@ -240,12 +237,12 @@ def sendDoc(cred, docs):
240237
return []
241238

242239
try:
243-
amq = stompAMQ(
244-
None, # username
245-
None, # password
246-
cred['producer'],
247-
cred['topic'],
248-
# default [('agileinf-mb.cern.ch', 61213)]
240+
amq = StompAMQ(
241+
username = None,
242+
password = None,
243+
producer = cred['producer'],
244+
topic = cred['topic'],
245+
validation_schema = None,
249246
host_and_ports=[
250247
(cred['hostport']['host'], cred['hostport']['port'])],
251248
logger=logger,
@@ -255,7 +252,7 @@ def sendDoc(cred, docs):
255252

256253
doctype = 'workflowmonit_{}'.format(cred['producer'])
257254
notifications = [amq.make_notification(
258-
payload=doc, docType=doctype) for doc in docs]
255+
payload=doc, docType=doctype)[0] for doc in docs]
259256
failures = amq.send(notifications)
260257

261258
logger.info("{}/{} docs successfully sent to AMQ.".format(
@@ -265,6 +262,7 @@ def sendDoc(cred, docs):
265262
except Exception as e:
266263
logger.exception(
267264
"Failed to send data to StompAMQ. Error: {}".format(str(e)))
265+
raise
268266

269267

270268
def main():
@@ -291,17 +289,18 @@ def main():
291289

292290
doc_bkp = os.path.join(LOGDIR, 'toSendDoc_{}'.format(
293291
time.strftime('%y%m%d-%H%M%S')))
294-
wc.save_json(docs, doc_bkp)
295-
logger.info('Document saved at: {}.json'.format(doc_bkp))
292+
docfn = wc.save_json(docs, filename=doc_bkp, gzipped=True)
293+
logger.info('Document saved at: {}'.format(docfn))
296294

297295
failures = sendDoc(cred=cred, docs=docs)
298296

299297
failedDocs_bkp = os.path.join(
300298
LOGDIR, 'amqFailedMsg_{}'.format(time.strftime('%y%m%d-%H%M%S')))
301299
if len(failures):
302-
wc.save_json(failures, failedDocs_bkp)
303-
logger.info('Failed message saved at: {}.json'.format(failedDocs_bkp))
300+
failedDocFn = wc.save_json(failures, filename=failedDocs_bkp, gzipped=True)
301+
logger.info('Failed message saved at: {}'.format(failedDocFn))
304302
except Exception as e:
303+
logger.exception("Exception encounted, sending emails to {}".format(str(recipients)))
305304
ad.errorEmailShooter(str(e), recipients)
306305

307306

workflowmonit/stompAMQ.py

Lines changed: 0 additions & 32 deletions
This file was deleted.

workflowmonit/workflowCollector.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import sys
77
import json
88
import time
9+
import gzip
910
import shutil
1011
import threading
1112
from collections import defaultdict
@@ -16,18 +17,30 @@
1617
from workflowwebtools import errorutils
1718

1819

19-
def save_json(json_obj, filename='tmp'):
20+
def save_json(json_obj, filename='tmp', gzipped=False):
2021
"""
2122
save json object to a local formatted text file, for debug
2223
2324
:param dict json_obj: the json object
2425
:param str filename: the base name of the file to be saved
25-
:returns: None
26+
:param bool gzipped: if gzip output document, default is False
27+
:returns: full filename
28+
29+
:rtype: str
2630
"""
2731

28-
with open('%s.json' % filename, 'w') as tmp:
29-
tmp.write(json.dumps(json_obj, sort_keys=True,
30-
indent=4, separators=(',', ': ')))
32+
fn = "{}.json".format(filename)
33+
msg = json.dumps(json_obj, sort_keys=True, indent=4, separators=(',', ': '))
34+
35+
if gzipped:
36+
fn += '.gz'
37+
with gzip.open(fn, 'wb') as f:
38+
f.write(msg.encode())
39+
else:
40+
with open(fn, 'w') as f:
41+
f.write(msg)
42+
43+
return fn
3144

3245

3346
def get_yamlconfig(configPath):
@@ -619,7 +632,10 @@ def main():
619632
print("Number of workflows retrieved from Oracle DB: ", len(wfs))
620633
invalidate_caches()
621634

622-
from Queue import Queue
635+
try:
636+
from Queue import Queue
637+
except ImportError:
638+
from queue import Queue # pylint: disable=import-error
623639
q = Queue()
624640
num_threads = min(150, len(wfs))
625641

workflowmonit/workflowMonitScheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
LOGDIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'Logs')
1111
LOGGING_CONFIG = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'configLogging.yml')
1212

13-
schedule.every().hour.do(wms.main)
13+
schedule.every(30).minutes.do(wms.main)
1414

1515

1616
if __name__ == "__main__":

0 commit comments

Comments
 (0)