Commit b680765

Initial commit. Logger is working with all the main functionality
There are tons of optimisations still required and better tests to be written. At the moment all tests require an Elasticsearch instance running on port 9200. A lot of testing is still needed against HTTPS ES instances and with Kerberos, but this should be a good starting skeleton. The same applies from the performance and app-consistency point of view: in some cases logging will block to commit the buffer changes into Elasticsearch; this should eventually be done in a different thread.

8 files changed, +535 -0 lines changed

.gitignore

+70
@@ -0,0 +1,70 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
.idea

# C extensions
*.so

# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py

# Flask instance folder
instance/

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# IPython Notebook
.ipynb_checkpoints

# pyenv
.python-version

LICENSE.md

+13
@@ -0,0 +1,13 @@
Copyright 2016 Carlos Manzanedo Rueda

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
README.rst

+26
@@ -0,0 +1,26 @@
Python Elasticsearch Log handler
---------------------------------

This code sets up an elasticsearch handler that can be used to push python logs
into elasticsearch for analysis.

Installation
------------

Requirements
------------

Building the sources & Testing
------------------------------

Setting up a log handler in your python program
-----------------------------------------------

Django integration
------------------

Why use an appender rather than logstash or beats
---------------------------------------------------

Contributing back
-----------------
cmreshandler/__init__.py

+3
@@ -0,0 +1,3 @@
from .cmreshandler import CMRESHandler

__all__ = ['CMRESHandler']
cmreshandler/cmreshandler.py

+205
@@ -0,0 +1,205 @@
""" Elasticsearch logging handler
"""

import logging
from enum import Enum
from elasticsearch import helpers as ESHelpers
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_kerberos import HTTPKerberosAuth, DISABLED
import datetime
import socket
from threading import Timer


class CMRESHandler(logging.Handler):
    """ Elasticsearch log handler

    Allows logging to elasticsearch in json format.
    All LogRecord fields are serialised and inserted
    """

    class AuthType(Enum):
        """ Authentication types supported

        The handler supports
         - No authentication
         - Basic authentication
         - Kerberos or SSO authentication (on windows and linux)
        """
        NO_AUTH = 0
        BASIC_AUTH = 1
        KERBEROS_AUTH = 2

    # Defaults for the class
    __DEFAULT_HOST = [{'host': 'localhost', 'port': 9200}]
    __DEFAULT_AUTH_USER = ''
    __DEFAULT_AUTH_PASSWD = ''
    __DEFAULT_USE_SSL = False
    __DEFAULT_VERIFY_SSL = True
    __DEFAULT_AUTH_TYPE = AuthType.NO_AUTH
    __DEFAULT_BUFFER_SIZE = 1000
    __DEFAULT_FLUSH_FREQUENCY_IN_SEC = 1
    __DEFAULT_ADDITIONAL_FIELDS = {}
    __DEFAULT_ES_INDEX_NAME = 'python_logger'
    __DEFAULT_ES_DOC_TYPE = 'python_log'

    __LOGGING_FILTER_FIELDS = ['msecs',
                               'relativeCreated',
                               'levelno',
                               'created']

    def __init__(self,
                 hosts=__DEFAULT_HOST,
                 auth_details=(__DEFAULT_AUTH_USER, __DEFAULT_AUTH_PASSWD),
                 auth_type=__DEFAULT_AUTH_TYPE,
                 use_ssl=__DEFAULT_USE_SSL,
                 verify_ssl=__DEFAULT_VERIFY_SSL,
                 buffer_size=__DEFAULT_BUFFER_SIZE,
                 flush_frequency_in_sec=__DEFAULT_FLUSH_FREQUENCY_IN_SEC,
                 es_index_name=__DEFAULT_ES_INDEX_NAME,
                 es_doc_type=__DEFAULT_ES_DOC_TYPE,
                 es_additional_fields=__DEFAULT_ADDITIONAL_FIELDS):
        """ Handler constructor

        :param hosts: The list of hosts that the elasticsearch client will connect to. The list can be
                    provided in the format ```[{'host':'host1','port':9200}, {'host':'host2','port':9200}]```
                    to make sure the client supports failover of one of the insertion nodes
        :param auth_details: When ```CMRESHandler.AuthType.BASIC_AUTH``` is used this argument must contain
                    a tuple of strings with the user and password that will be used to authenticate against
                    the Elasticsearch servers, for example ```('User', 'Password')```
        :param auth_type: The authentication type to be used in the connection, ```CMRESHandler.AuthType```.
                    Currently NO_AUTH, BASIC_AUTH and KERBEROS_AUTH are supported
        :param use_ssl: A boolean that defines if the communication should use SSL encryption
        :param verify_ssl: A boolean that defines whether the SSL certificates are validated or not
        :param buffer_size: An int; once this size is reached on the internal buffer, results are flushed into ES
        :param flush_frequency_in_sec: A float representing how often the buffer will be flushed, even
                    if buffer_size has not been reached yet
        :param es_index_name: A string with the prefix of the elasticsearch index that will be created. Note
                    a date with format YYYY.MM.dd is appended; ```python_logger``` is used by default
        :param es_doc_type: A string with the name of the document type that will be used; ```python_log```
                    is used by default
        :param es_additional_fields: A dictionary with all the additional fields that you would like to add
                    to the logs, such as the application, environment, etc.
        :return: A ready to be used CMRESHandler.
        """
        logging.Handler.__init__(self)

        self.hosts = hosts
        self.auth_details = auth_details
        self.auth_type = auth_type
        self.use_ssl = use_ssl
        self.verify_certs = verify_ssl
        self.buffer_size = buffer_size
        self.flush_frequency_in_sec = flush_frequency_in_sec
        self.es_index_name = es_index_name
        self.es_doc_type = es_doc_type
        self.es_additional_fields = es_additional_fields.copy()
        self.es_additional_fields.update({'host': socket.gethostname(),
                                          'host_ip': socket.gethostbyname(socket.gethostname())})

        self._buffer = []
        self._timer = None
        self.__schedule_flush()

    def __schedule_flush(self):
        if self._timer is None:
            self._timer = Timer(self.flush_frequency_in_sec, self.flush)
            self._timer.setDaemon(True)
            self._timer.start()

    def __get_es_client(self):
        if self.auth_type == CMRESHandler.AuthType.NO_AUTH:
            return Elasticsearch(hosts=self.hosts,
                                 use_ssl=self.use_ssl,
                                 verify_certs=self.verify_certs,
                                 connection_class=RequestsHttpConnection)
        elif self.auth_type == CMRESHandler.AuthType.BASIC_AUTH:
            return Elasticsearch(hosts=self.hosts,
                                 http_auth=self.auth_details,
                                 use_ssl=self.use_ssl,
                                 verify_certs=self.verify_certs,
                                 connection_class=RequestsHttpConnection)
        elif self.auth_type == CMRESHandler.AuthType.KERBEROS_AUTH:
            # connection_class expects a class, not an instance; the Kerberos
            # credentials are passed through http_auth instead
            return Elasticsearch(hosts=self.hosts,
                                 http_auth=HTTPKerberosAuth(mutual_authentication=DISABLED),
                                 use_ssl=self.use_ssl,
                                 verify_certs=self.verify_certs,
                                 connection_class=RequestsHttpConnection)

    def test_es_source(self):
        """ Returns True if the handler can ping the Elasticsearch servers

        Can be used to confirm that the setup of a handler has been done properly and that
        things like the authentication are working

        :return: A boolean, True if the connection against the elasticsearch host was successful
        """
        return self.__get_es_client().ping()

    def __get_es_index_name(self):
        """ Returns the elasticsearch index name
        :return: A string containing the elasticsearch index name used, which includes the date.
        """
        return "%s-%s" % (self.es_index_name, datetime.datetime.now().strftime('%Y.%m.%d'))

    def __get_es_datetime_str(self, timestamp):
        """ Returns elasticsearch utc formatted time for an epoch timestamp

        :param timestamp: epoch, including milliseconds
        :return: A string valid for an elasticsearch time record
        """
        t = datetime.datetime.utcfromtimestamp(timestamp)
        return "%s.%03dZ" % (t.strftime('%Y-%m-%dT%H:%M:%S'), int(t.microsecond / 1000))

    def flush(self):
        """ Flushes the buffer into ES
        :return: None
        """
        if self._timer is not None and self._timer.is_alive():
            self._timer.cancel()
        self._timer = None

        # FIXME: This should probably go on a different thread to speed up the execution
        if len(self._buffer) > 0:
            try:
                actions = map(lambda x: {'_index': self.__get_es_index_name(),
                                         '_type': self.es_doc_type,
                                         '_source': x},
                              self._buffer)
                ESHelpers.bulk(client=self.__get_es_client(),
                               actions=actions,
                               stats_only=True)
            except Exception:
                pass
            self._buffer = []

        self.__schedule_flush()

    def close(self):
        """ Flushes the buffer and releases any outstanding resources

        :return: None
        """
        self.flush()
        if self._timer is not None:
            self._timer.cancel()
        self._timer = None

    def emit(self, record):
        """ Emit overrides the abstract logging.Handler logRecord emit method

        records the log

        :param record: A class of type ```logging.LogRecord```
        :return: None
        """
        rec = self.es_additional_fields.copy()
        for k, v in record.__dict__.items():
            if k not in CMRESHandler.__LOGGING_FILTER_FIELDS:
                rec[k] = "" if v is None else v
        rec['timestamp'] = self.__get_es_datetime_str(record.created)

        self._buffer.append(rec)
        if len(self._buffer) >= self.buffer_size:
            self.flush()
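The constructor docstring above describes the supported authentication types and the test_es_source() connectivity check. For completeness, a hedged sketch of constructing the handler with BASIC_AUTH over SSL and verifying the connection; the host name, credentials and index name are illustrative assumptions, not values from the repository.

from cmreshandler import CMRESHandler

# Illustrative values only: 'es.example.com', the credentials and the index
# name are assumptions for this sketch.
handler = CMRESHandler(hosts=[{'host': 'es.example.com', 'port': 9200}],
                       auth_type=CMRESHandler.AuthType.BASIC_AUTH,
                       auth_details=('elastic_user', 'elastic_password'),
                       use_ssl=True,
                       verify_ssl=True,
                       es_index_name='my_secure_app')

# test_es_source() pings the cluster with the configured client and returns
# True when the connection (including authentication) works.
if not handler.test_es_source():
    raise RuntimeError("Could not reach the Elasticsearch cluster")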