2
2
"""
3
3
4
4
import logging
5
- from enum import Enum
6
- from elasticsearch import helpers as eshelpers
7
- from elasticsearch import Elasticsearch , RequestsHttpConnection
8
- from requests_kerberos import HTTPKerberosAuth , DISABLED
9
5
import datetime
10
6
import socket
7
+ import copy
11
8
from threading import Timer
9
+ from enum import Enum
10
+ from elasticsearch import helpers as eshelpers
11
+ from elasticsearch import Elasticsearch , RequestsHttpConnection
12
+ try :
13
+ from requests_kerberos import HTTPKerberosAuth , DISABLED
14
+ CMR_KERBEROS_SUPPORTED = True
15
+ except ImportError :
16
+ CMR_KERBEROS_SUPPORTED = False
12
17
13
18
14
19
class CMRESHandler (logging .Handler ):
@@ -30,38 +35,94 @@ class AuthType(Enum):
30
35
BASIC_AUTH = 1
31
36
KERBEROS_AUTH = 2
32
37
38
class IndexNameFrequency(Enum):
    """Frequencies at which the handler switches to a new index name.

    Each member selects how much of the date is appended to the index prefix:
    - DAILY: one index per day
    - WEEKLY: one index per week
    - MONTHLY: one index per month
    - YEARLY: one index per year
    """
    DAILY = 0
    WEEKLY = 1
    MONTHLY = 2
    YEARLY = 3
50
+
33
51
# Defaults for the class; each one is the default value of the matching
# constructor argument.
__DEFAULT_ELASTICSEARCH_HOST = [{'host': 'localhost', 'port': 9200}]
__DEFAULT_AUTH_USER = ''
__DEFAULT_AUTH_PASSWD = ''
__DEFAULT_USE_SSL = False
__DEFAULT_VERIFY_SSL = True
__DEFAULT_AUTH_TYPE = AuthType.NO_AUTH
__DEFAULT_INDEX_FREQUENCY = IndexNameFrequency.DAILY
__DEFAULT_BUFFER_SIZE = 1000
__DEFAULT_FLUSH_FREQ_INSEC = 1
__DEFAULT_ADDITIONAL_FIELDS = {}
__DEFAULT_ES_INDEX_NAME = 'python_logger'
__DEFAULT_ES_DOC_TYPE = 'python_log'
__DEFAULT_RAISE_ON_EXCEPTION = False
__DEFAULT_TIMESTAMP_FIELD_NAME = "timestamp"

# Attributes of a logging record that emit() leaves out of the stored document.
__LOGGING_FILTER_FIELDS = ['msecs', 'relativeCreated', 'levelno', 'created']
52
71
72
+ @staticmethod
73
+ def _get_daily_index_name (es_index_name ):
74
+ """ Returns elasticearch index name
75
+ :param: index_name the prefix to be used in the index
76
+ :return: A srting containing the elasticsearch indexname used which should include the date.
77
+ """
78
+ return "{0!s}-{1!s}" .format (es_index_name , datetime .datetime .now ().strftime ('%Y.%m.%d' ))
79
+
80
+ @staticmethod
81
+ def _get_weekly_index_name (es_index_name ):
82
+ """ Return elasticsearch index name
83
+ :param: index_name the prefix to be used in the index
84
+ :return: A srting containing the elasticsearch indexname used which should include the date and specific week
85
+ """
86
+ current_date = datetime .datetime .now ()
87
+ start_of_the_week = current_date - datetime .timedelta (days = current_date .weekday ())
88
+ return "{0!s}-{1!s}" .format (es_index_name , start_of_the_week .strftime ('%Y.%m.%d' ))
89
+
90
+ @staticmethod
91
+ def _get_monthly_index_name (es_index_name ):
92
+ """ Return elasticsearch index name
93
+ :param: index_name the prefix to be used in the index
94
+ :return: A srting containing the elasticsearch indexname used which should include the date and specific moth
95
+ """
96
+ return "{0!s}-{1!s}" .format (es_index_name , datetime .datetime .now ().strftime ('%Y.%m' ))
97
+
98
+ @staticmethod
99
+ def _get_yearly_index_name (es_index_name ):
100
+ """ Return elasticsearch index name
101
+ :param: index_name the prefix to be used in the index
102
+ :return: A srting containing the elasticsearch indexname used which should include the date and specific year
103
+ """
104
+ return "{0!s}-{1!s}" .format (es_index_name , datetime .datetime .now ().strftime ('%Y' ))
105
+
106
# Lookup table from an IndexNameFrequency member to the index-name builder for
# that frequency. The values are the staticmethod objects as bound in the class
# body, which is why flush() reaches the plain function through ``__func__``.
_INDEX_FREQUENCY_FUNCION_DICT = {
    IndexNameFrequency.DAILY: _get_daily_index_name,
    IndexNameFrequency.WEEKLY: _get_weekly_index_name,
    IndexNameFrequency.MONTHLY: _get_monthly_index_name,
    IndexNameFrequency.YEARLY: _get_yearly_index_name,
}
112
+
53
113
def __init__ (self ,
54
- hosts = __DEFAULT_HOST ,
114
+ hosts = __DEFAULT_ELASTICSEARCH_HOST ,
55
115
auth_details = (__DEFAULT_AUTH_USER , __DEFAULT_AUTH_PASSWD ),
56
116
auth_type = __DEFAULT_AUTH_TYPE ,
57
117
use_ssl = __DEFAULT_USE_SSL ,
58
118
verify_ssl = __DEFAULT_VERIFY_SSL ,
59
119
buffer_size = __DEFAULT_BUFFER_SIZE ,
60
- flush_frequency_in_sec = __DEFAULT_FLUSH_FREQUENCY_IN_SEC ,
120
+ flush_frequency_in_sec = __DEFAULT_FLUSH_FREQ_INSEC ,
61
121
es_index_name = __DEFAULT_ES_INDEX_NAME ,
122
+ index_name_frequency = __DEFAULT_INDEX_FREQUENCY ,
62
123
es_doc_type = __DEFAULT_ES_DOC_TYPE ,
63
124
es_additional_fields = __DEFAULT_ADDITIONAL_FIELDS ,
64
- raise_on_indexing_exceptions = __DEFAULT_RAISE_ON_INDEXING_EXCEPTIONS ,
125
+ raise_on_indexing_exceptions = __DEFAULT_RAISE_ON_EXCEPTION ,
65
126
default_timestamp_field_name = __DEFAULT_TIMESTAMP_FIELD_NAME ):
66
127
""" Handler constructor
67
128
@@ -80,6 +141,10 @@ def __init__(self,
80
141
if the buffer_size has not been reached yet
81
142
:param es_index_name: A string with the prefix of the elasticsearch index that will be created. Note a
82
143
date with YYYY.MM.dd, ```python_logger``` used by default
144
+ :param index_name_frequency: Defines what the date used in the postfix of the name would be. available values
145
+ are selected from the IndexNameFrequency class (IndexNameFrequency.DAILY,
146
+ IndexNameFrequency.WEEKLY, IndexNameFrequency.MONTHLY, IndexNameFrequency.YEARLY). By default
147
+ it uses daily indices.
83
148
:param es_doc_type: A string with the name of the document type that will be used ```python_log``` used
84
149
by default
85
150
:param es_additional_fields: A dictionary with all the additional fields that you would like to add
@@ -90,25 +155,29 @@ def __init__(self,
90
155
"""
91
156
logging .Handler .__init__ (self )
92
157
93
- self .hosts = hosts
158
+ self .hosts = copy . deepcopy ( hosts )
94
159
self .auth_details = auth_details
95
160
self .auth_type = auth_type
96
161
self .use_ssl = use_ssl
97
162
self .verify_certs = verify_ssl
98
163
self .buffer_size = buffer_size
99
164
self .flush_frequency_in_sec = flush_frequency_in_sec
100
165
self .es_index_name = es_index_name
166
+ self .index_name_frequency = index_name_frequency
101
167
self .es_doc_type = es_doc_type
102
- self .es_additional_fields = es_additional_fields . copy ( )
168
+ self .es_additional_fields = copy . deepcopy ( es_additional_fields )
103
169
self .es_additional_fields .update ({'host' : socket .gethostname (),
104
170
'host_ip' : socket .gethostbyname (socket .gethostname ())})
105
171
self .raise_on_indexing_exceptions = raise_on_indexing_exceptions
106
172
self .default_timestamp_field_name = default_timestamp_field_name
107
173
174
+ self ._client = None
108
175
self ._buffer = []
109
176
self ._timer = None
110
177
self .__schedule_flush ()
111
178
179
+ self ._index_name_func = CMRESHandler ._INDEX_FREQUENCY_FUNCION_DICT [self .index_name_frequency ]
180
+
112
181
def __schedule_flush (self ):
113
182
if self ._timer is None :
114
183
self ._timer = Timer (self .flush_frequency_in_sec , self .flush )
@@ -117,22 +186,34 @@ def __schedule_flush(self):
117
186
118
187
def __get_es_client(self):
    """ Returns the Elasticsearch client matching the configured authentication type

    For ``AuthType.NO_AUTH`` and ``AuthType.BASIC_AUTH`` the client is created on first use and
    stored in ``self._client`` so later calls reuse it. For ``AuthType.KERBEROS_AUTH`` a new
    client is built on every call.

    :return: An ``Elasticsearch`` client instance
    :raises EnvironmentError: if kerberos authentication is requested but ``requests_kerberos``
                              could not be imported
    :raises ValueError: if ``self.auth_type`` is not one of the supported authentication types
    """
    if self.auth_type == CMRESHandler.AuthType.NO_AUTH:
        if self._client is None:
            self._client = Elasticsearch(hosts=self.hosts,
                                         use_ssl=self.use_ssl,
                                         verify_certs=self.verify_certs,
                                         connection_class=RequestsHttpConnection)
        return self._client

    if self.auth_type == CMRESHandler.AuthType.BASIC_AUTH:
        if self._client is None:
            # Store the client: returning it without assigning meant it was
            # never cached and a new one was created on every call.
            self._client = Elasticsearch(hosts=self.hosts,
                                         http_auth=self.auth_details,
                                         use_ssl=self.use_ssl,
                                         verify_certs=self.verify_certs,
                                         connection_class=RequestsHttpConnection)
        return self._client

    if self.auth_type == CMRESHandler.AuthType.KERBEROS_AUTH:
        if not CMR_KERBEROS_SUPPORTED:
            raise EnvironmentError("Kerberos module not available. Please install \" requests-kerberos\" ")
        # For kerberos we return a new client each time to make sure the tokens are up to date
        return Elasticsearch(hosts=self.hosts,
                             use_ssl=self.use_ssl,
                             verify_certs=self.verify_certs,
                             connection_class=RequestsHttpConnection,
                             http_auth=HTTPKerberosAuth(mutual_authentication=DISABLED))

    raise ValueError("Authentication method not supported")
136
217
137
218
def test_es_source (self ):
138
219
""" Returns True if the handler can ping the Elasticsearch servers
@@ -144,21 +225,15 @@ def test_es_source(self):
144
225
"""
145
226
return self .__get_es_client ().ping ()
146
227
147
- def __get_es_index_name (self ):
148
- """ Returns elasticearch index name
149
- :return: A srting containing the elasticsearch indexname used which should include the date.
150
- """
151
- return "{0!s}-{1!s}" .format (self .es_index_name , datetime .datetime .now ().strftime ('%Y.%m.%d' ))
152
-
153
228
@staticmethod
154
229
def __get_es_datetime_str (timestamp ):
155
230
""" Returns elasticsearch utc formatted time for an epoch timestamp
156
231
157
232
:param timestamp: epoch, including milliseconds
158
233
:return: A string valid for elasticsearch time record
159
234
"""
160
- t = datetime .datetime .utcfromtimestamp (timestamp )
161
- return "{0!s}.{1:03d}Z" .format (t .strftime ('%Y-%m-%dT%H:%M:%S' ), int (t .microsecond / 1000 ))
235
+ current_date = datetime .datetime .utcfromtimestamp (timestamp )
236
+ return "{0!s}.{1:03d}Z" .format (current_date .strftime ('%Y-%m-%dT%H:%M:%S' ), int (current_date .microsecond / 1000 ))
162
237
163
238
def flush (self ):
164
239
""" Flushes the buffer into ES
@@ -168,19 +243,25 @@ def flush(self):
168
243
self ._timer .cancel ()
169
244
self ._timer = None
170
245
171
- # FIXME: This should probably go on a different thread to speed up the execution
172
246
if len (self ._buffer ) >= 0 :
173
247
try :
174
- actions = map (lambda x : {'_index' : self .__get_es_index_name (),
175
- '_type' : self .es_doc_type ,
176
- '_source' : x },
177
- self ._buffer )
178
- eshelpers .bulk (client = self .__get_es_client (),
179
- actions = actions ,
180
- stats_only = True )
181
- except Exception as e :
248
+ actions = (
249
+ {
250
+ '_index' : self ._index_name_func .__func__ (self .es_index_name ),
251
+ '_type' : self .es_doc_type ,
252
+ '_source' : log_record
253
+ }
254
+ for log_record in self ._buffer
255
+ )
256
+
257
+ eshelpers .bulk (
258
+ client = self .__get_es_client (),
259
+ actions = actions ,
260
+ stats_only = True
261
+ )
262
+ except Exception as exception :
182
263
if self .raise_on_indexing_exceptions :
183
- raise e
264
+ raise exception
184
265
self ._buffer = []
185
266
186
267
self .__schedule_flush ()
@@ -203,9 +284,9 @@ def emit(self, record):
203
284
:return: None
204
285
"""
205
286
rec = self .es_additional_fields .copy ()
206
- for k , v in record .__dict__ .items ():
207
- if k not in CMRESHandler .__LOGGING_FILTER_FIELDS :
208
- rec [k ] = "" if v is None else v
287
+ for key , value in record .__dict__ .items ():
288
+ if key not in CMRESHandler .__LOGGING_FILTER_FIELDS :
289
+ rec [key ] = "" if value is None else value
209
290
rec [self .default_timestamp_field_name ] = self .__get_es_datetime_str (record .created )
210
291
211
292
self ._buffer .append (rec )
0 commit comments