Skip to content

Commit d81f45b

Browse files
committed
Initial revision.
This project started as an internal tool at Mochi Media.
0 parents  commit d81f45b

9 files changed

+1598
-0
lines changed

AUTHORS

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Ralph Meijer <[email protected]>

LICENSE

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
Copyright (c) 2012-2013 Mochi Media, Inc.
2+
3+
Permission is hereby granted, free of charge, to any person obtaining
4+
a copy of this software and associated documentation files (the
5+
"Software"), to deal in the Software without restriction, including
6+
without limitation the rights to use, copy, modify, merge, publish,
7+
distribute, sublicense, and/or sell copies of the Software, and to
8+
permit persons to whom the Software is furnished to do so, subject to
9+
the following conditions:
10+
11+
The above copyright notice and this permission notice shall be
12+
included in all copies or substantial portions of the Software.
13+
14+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
15+
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16+
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
17+
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
18+
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
19+
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20+
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

README.md

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
Vör
2+
---
3+
4+
What is this?
5+
=============
6+
7+
Vör is a set of services for polling metrics to be sent to Graphite.
8+
9+
10+
Requirements
11+
============
12+
13+
- Twisted
14+
- simplejson
15+
16+
17+
Copyright and Warranty
18+
======================
19+
20+
The code in this distribution started as an internal tool at Mochi Media and is
21+
made available under the MIT License. See the included LICENSE file for details.
22+
23+
24+
Name
25+
====
26+
27+
In Norse mythology, Vör is a goddess associated with wisdom. She is described
28+
as "wise and inquiring, so that nothing can be concealed from her".

doc/examples/poller.tac

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from twisted.application import service
2+
from twisted.internet import reactor
3+
4+
from vor.elasticsearch import ElasticSearchNodeStatsGraphiteService
5+
from vor.elasticsearch import ElasticSearchHealthGraphiteService
6+
from vor.graphite import GraphiteClientFactory
7+
8+
application = service.Application("Test")
9+
10+
collector = ElasticSearchNodeStatsGraphiteService('http://localhost:9200/')
11+
collector.setServiceParent(application)
12+
13+
factory = GraphiteClientFactory(collector)
14+
reactor.connectTCP('localhost', 2003, factory)
15+
16+
collector = ElasticSearchHealthGraphiteService('http://localhost:9200/')
17+
collector.setServiceParent(application)
18+
19+
factory = GraphiteClientFactory(collector)
20+
reactor.connectTCP('localhost', 2003, factory)

vor/__init__.py

Whitespace-only changes.

vor/elasticsearch.py

+114
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
"""
2+
ElasticSearch stats poller that sends statistics to Graphite.
3+
"""
4+
5+
import simplejson
6+
7+
from twisted.application import service
8+
from twisted.internet import task
9+
from twisted.python import log
10+
from twisted.web.client import getPage
11+
12+
class BaseElasticSearchGraphiteService(service.Service):
13+
"""
14+
Base service for polling ElasticSearch stats to send to Graphite.
15+
16+
@ivar protocol: The Graphite protocol.
17+
@type protocol: L{vor.graphite.GraphiteLineProtocol}
18+
"""
19+
interval = 5
20+
suffixes = ()
21+
API = None
22+
23+
def __init__(self, baseURL):
24+
"""
25+
@param baseURL: Base URL of the ElasticSearch API, including trailing
26+
slash.
27+
"""
28+
self.endpoint = baseURL + self.API
29+
self.protocol = None
30+
31+
32+
def _flattenValue(self, data, value, prefix, key, timestamp):
33+
key = key.replace(' ', '')
34+
flatKey = "%s.%s" % (prefix, key)
35+
if hasattr(value, 'upper'):
36+
# Skip strings unless they have suffixed counterparts with a
37+
# metric.
38+
for suffix in self.suffixes:
39+
suffixedKey = key + suffix
40+
if suffixedKey in data:
41+
self.sendMetric(flatKey, data[suffixedKey], timestamp)
42+
break
43+
elif hasattr(value, 'iteritems'):
44+
# Dicts are flattened.
45+
self._flattenDict(value, flatKey, timestamp)
46+
elif hasattr(value, 'index'):
47+
# Give each item in a sequence their own index and flatten the
48+
# value.
49+
index = 0
50+
for item in value:
51+
self._flattenValue(None, item, flatKey, '%d' % index,
52+
timestamp)
53+
index += 1
54+
else:
55+
# A regular metric.
56+
self.sendMetric(flatKey, value, timestamp)
57+
58+
59+
def _flattenDict(self, data, prefix, timestamp):
60+
if 'timestamp' in data:
61+
timestamp = data['timestamp'] / 1000.
62+
63+
for key, value in data.iteritems():
64+
if (key.endswith(self.suffixes) or
65+
key == 'timestamp'):
66+
continue
67+
else:
68+
self._flattenValue(data, value, prefix, key, timestamp)
69+
70+
71+
def sendMetric(self, path, value, timestamp):
72+
if self.protocol:
73+
self.protocol.sendMetric(path, float(value), timestamp)
74+
75+
76+
def collectStats(self):
77+
d = getPage(self.endpoint)
78+
d.addCallback(simplejson.loads)
79+
d.addCallback(self.flatten)
80+
d.addErrback(log.err)
81+
return d
82+
83+
84+
def startService(self):
85+
self.call = task.LoopingCall(self.collectStats)
86+
self.call.start(self.interval, now=False)
87+
88+
89+
def stopService(self):
90+
self.call.stop()
91+
92+
93+
94+
class ElasticSearchNodeStatsGraphiteService(BaseElasticSearchGraphiteService):
95+
"""
96+
Service that polls ElasticSearch node stats and sends them to Graphite.
97+
98+
@ivar protocol: The Graphite protocol.
99+
@type protocol: L{vor.graphite.GraphiteLineProtocol}
100+
"""
101+
102+
suffixes = ('_in_bytes', '_in_millis')
103+
API = '_cluster/nodes/stats?all=1'
104+
105+
def flatten(self, data):
106+
for node in data['nodes'].itervalues():
107+
name = node['name']
108+
timestamp = node['timestamp']
109+
110+
prefix = 'es.nodes.'+ name
111+
if ('attributes' in node and
112+
node['attributes'].get('data', 'false') != 'true'):
113+
del node['indices']
114+
self._flattenDict(node, prefix, timestamp)

vor/graphite.py

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
from twisted.internet import protocol
2+
from twisted.protocols.basic import LineReceiver
3+
4+
class GraphiteLineProtocol(LineReceiver):
5+
delimiter = '\n'
6+
service = None
7+
8+
def connectionMade(self):
9+
self.service.protocol = self
10+
print self.transport
11+
12+
13+
def connectionLost(self, reason):
14+
self.service.protocol = None
15+
16+
17+
def lineReceived(self, line):
18+
pass
19+
20+
21+
def sendMetric(self, path, value, timestamp):
22+
"""
23+
Send a single metric.
24+
"""
25+
self.sendLine('%s %s %s' % (path, value, timestamp))
26+
27+
28+
29+
class GraphiteClientFactory(protocol.ReconnectingClientFactory):
30+
protocol = GraphiteLineProtocol
31+
32+
def __init__(self, service):
33+
self.service = service
34+
35+
36+
def buildProtocol(self, addr):
37+
self.resetDelay()
38+
p = self.protocol()
39+
p.service = self.service
40+
return p

vor/test/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)