5
5
import time
6
6
from urllib .parse import urlparse
7
7
8
- import psycopg as pg
9
- from psycopg import ClientCursor
10
- from psycopg .rows import dict_row
8
+ import psycopg2 as pg
9
+ from psycopg2 import extras as pgextras
11
10
12
11
from datadog_checks .base import AgentCheck , ConfigurationError , is_affirmative
13
12
from datadog_checks .pgbouncer .metrics import (
@@ -74,46 +73,47 @@ def _collect_stats(self, db):
74
73
metric_scope .append (SERVERS_METRICS )
75
74
76
75
try :
77
- for scope in metric_scope :
78
- descriptors = scope ['descriptors' ]
79
- metrics = scope ['metrics' ]
80
- query = scope ['query' ]
81
-
82
- try :
83
- cursor = db .cursor (row_factory = dict_row )
84
- self .log .debug ("Running query: %s" , query )
85
- cursor .execute (query )
86
- rows = self .iter_rows (cursor )
87
-
88
- except Exception as e :
89
- self .log .exception ("Not all metrics may be available: %s" , str (e ))
90
-
91
- else :
92
- for row in rows :
93
- if 'key' in row : # We are processing "config metrics"
94
- # Make a copy of the row to allow mutation
95
- row = row .copy ()
96
- # We flip/rotate the row: row value becomes the column name
97
- row [row ['key' ]] = row ['value' ]
98
- # Skip the "pgbouncer" database
99
- elif row .get ('database' ) == self .DB_NAME :
100
- continue
101
-
102
- tags = list (self .tags )
103
- tags += ["%s:%s" % (tag , row [column ]) for (column , tag ) in descriptors if column in row ]
104
- for column , (name , reporter ) in metrics :
105
- if column in row :
106
- value = row [column ]
107
- if column in ['connect_time' , 'request_time' ]:
108
- self .log .debug ("Parsing timestamp; original value: %s" , value )
109
- # First get rid of any UTC suffix.
110
- value = re .findall (r'^[^ ]+ [^ ]+' , value )[0 ]
111
- value = time .strptime (value , '%Y-%m-%d %H:%M:%S' )
112
- value = time .mktime (value )
113
- reporter (self , name , value , tags )
114
-
115
- if not rows :
116
- self .log .warning ("No results were found for query: %s" , query )
76
+ with db .cursor (cursor_factory = pgextras .DictCursor ) as cursor :
77
+ for scope in metric_scope :
78
+ descriptors = scope ['descriptors' ]
79
+ metrics = scope ['metrics' ]
80
+ query = scope ['query' ]
81
+
82
+ try :
83
+ self .log .debug ("Running query: %s" , query )
84
+ cursor .execute (query )
85
+ rows = self .iter_rows (cursor )
86
+
87
+ except Exception as e :
88
+ self .log .exception ("Not all metrics may be available: %s" , str (e ))
89
+
90
+ else :
91
+ for row in rows :
92
+ if 'key' in row : # We are processing "config metrics"
93
+ # Make a copy of the row to allow mutation
94
+ # (a `psycopg2.lib.extras.DictRow` object doesn't accept a new key)
95
+ row = row .copy ()
96
+ # We flip/rotate the row: row value becomes the column name
97
+ row [row ['key' ]] = row ['value' ]
98
+ # Skip the "pgbouncer" database
99
+ elif row .get ('database' ) == self .DB_NAME :
100
+ continue
101
+
102
+ tags = list (self .tags )
103
+ tags += ["%s:%s" % (tag , row [column ]) for (column , tag ) in descriptors if column in row ]
104
+ for column , (name , reporter ) in metrics :
105
+ if column in row :
106
+ value = row [column ]
107
+ if column in ['connect_time' , 'request_time' ]:
108
+ self .log .debug ("Parsing timestamp; original value: %s" , value )
109
+ # First get rid of any UTC suffix.
110
+ value = re .findall (r'^[^ ]+ [^ ]+' , value )[0 ]
111
+ value = time .strptime (value , '%Y-%m-%d %H:%M:%S' )
112
+ value = time .mktime (value )
113
+ reporter (self , name , value , tags )
114
+
115
+ if not rows :
116
+ self .log .warning ("No results were found for query: %s" , query )
117
117
118
118
except pg .Error :
119
119
self .log .exception ("Connection error" )
@@ -138,25 +138,21 @@ def iter_rows(self, cursor):
138
138
139
139
def _get_connect_kwargs (self ):
140
140
"""
141
- Get the params to pass to psycopg .connect() based on passed-in vals
141
+ Get the params to pass to psycopg2 .connect() based on passed-in vals
142
142
from yaml settings file
143
143
"""
144
- # It's important to set the client_encoding to utf-8
145
- # PGBouncer defaults to an encoding of 'UNICODE`, which will cause psycopg to error out
146
144
if self .database_url :
147
- return {'conninfo ' : self .database_url , 'client_encoding' : 'utf-8' }
145
+ return {'dsn ' : self .database_url }
148
146
149
147
if self .host in ('localhost' , '127.0.0.1' ) and self .password == '' :
150
148
# Use ident method
151
- return {'conninfo ' : "user={} dbname={} client_encoding=utf-8 " .format (self .user , self .DB_NAME )}
149
+ return {'dsn ' : "user={} dbname={}" .format (self .user , self .DB_NAME )}
152
150
153
151
args = {
154
152
'host' : self .host ,
155
153
'user' : self .user ,
156
154
'password' : self .password ,
157
- 'dbname' : self .DB_NAME ,
158
- 'cursor_factory' : ClientCursor ,
159
- 'client_encoding' : 'utf-8' ,
155
+ 'database' : self .DB_NAME ,
160
156
}
161
157
if self .port :
162
158
args ['port' ] = self .port
@@ -170,9 +166,8 @@ def _get_connection(self, use_cached=None):
170
166
return self .connection
171
167
try :
172
168
connect_kwargs = self ._get_connect_kwargs ()
173
- # Somewhat counterintuitively, we need to set autocommit to True to avoid a BEGIN/COMMIT block
174
- # https://www.psycopg.org/psycopg3/docs/basic/transactions.html#autocommit-transactions
175
- connection = pg .connect (** connect_kwargs , autocommit = True )
169
+ connection = pg .connect (** connect_kwargs )
170
+ connection .set_isolation_level (pg .extensions .ISOLATION_LEVEL_AUTOCOMMIT )
176
171
except Exception :
177
172
redacted_url = self ._get_redacted_dsn ()
178
173
message = u'Cannot establish connection to {}' .format (redacted_url )
@@ -185,10 +180,6 @@ def _get_connection(self, use_cached=None):
185
180
self .connection = connection
186
181
return connection
187
182
188
- def _close_connection (self ):
189
- self .connection .close ()
190
- self .connection = None
191
-
192
183
def _get_redacted_dsn (self ):
193
184
if not self .database_url :
194
185
return u'pgbouncer://%s:******@%s:%s/%s' % (self .user , self .host , self .port , self .DB_NAME )
@@ -209,8 +200,6 @@ def check(self, instance):
209
200
210
201
self .service_check (self .SERVICE_CHECK_NAME , AgentCheck .OK , tags = self ._get_service_checks_tags ())
211
202
self ._set_metadata ()
212
- # Avoid holding an open connection
213
- self ._close_connection ()
214
203
215
204
def _set_metadata (self ):
216
205
if self .is_metadata_collection_enabled ():
@@ -220,5 +209,14 @@ def _set_metadata(self):
220
209
221
210
def get_version (self ):
222
211
db = self ._get_connection ()
223
- version = pg .pq .version_pretty (db .connection .info .server_version )
224
- return version
212
+ regex = r'\d+\.\d+\.\d+'
213
+ with db .cursor (cursor_factory = pgextras .DictCursor ) as cursor :
214
+ cursor .execute ('SHOW VERSION;' )
215
+ if db .notices :
216
+ data = db .notices [0 ]
217
+ else :
218
+ data = cursor .fetchone ()[0 ]
219
+ res = re .findall (regex , data )
220
+ if res :
221
+ return res [0 ]
222
+ self .log .debug ("Couldn't detect version from %s" , data )
0 commit comments