5
5
import time
6
6
from urllib .parse import urlparse
7
7
8
- import psycopg2 as pg
9
- from psycopg2 import extras as pgextras
8
+ import psycopg as pg
9
+ from psycopg import ClientCursor
10
+ from psycopg .rows import dict_row
10
11
11
12
from datadog_checks .base import AgentCheck , ConfigurationError , is_affirmative
12
13
from datadog_checks .pgbouncer .metrics import (
@@ -73,47 +74,46 @@ def _collect_stats(self, db):
73
74
metric_scope .append (SERVERS_METRICS )
74
75
75
76
try :
76
- with db .cursor (cursor_factory = pgextras .DictCursor ) as cursor :
77
- for scope in metric_scope :
78
- descriptors = scope ['descriptors' ]
79
- metrics = scope ['metrics' ]
80
- query = scope ['query' ]
81
-
82
- try :
83
- self .log .debug ("Running query: %s" , query )
84
- cursor .execute (query )
85
- rows = self .iter_rows (cursor )
86
-
87
- except Exception as e :
88
- self .log .exception ("Not all metrics may be available: %s" , str (e ))
89
-
90
- else :
91
- for row in rows :
92
- if 'key' in row : # We are processing "config metrics"
93
- # Make a copy of the row to allow mutation
94
- # (a `psycopg2.lib.extras.DictRow` object doesn't accept a new key)
95
- row = row .copy ()
96
- # We flip/rotate the row: row value becomes the column name
97
- row [row ['key' ]] = row ['value' ]
98
- # Skip the "pgbouncer" database
99
- elif row .get ('database' ) == self .DB_NAME :
100
- continue
101
-
102
- tags = list (self .tags )
103
- tags += ["%s:%s" % (tag , row [column ]) for (column , tag ) in descriptors if column in row ]
104
- for column , (name , reporter ) in metrics :
105
- if column in row :
106
- value = row [column ]
107
- if column in ['connect_time' , 'request_time' ]:
108
- self .log .debug ("Parsing timestamp; original value: %s" , value )
109
- # First get rid of any UTC suffix.
110
- value = re .findall (r'^[^ ]+ [^ ]+' , value )[0 ]
111
- value = time .strptime (value , '%Y-%m-%d %H:%M:%S' )
112
- value = time .mktime (value )
113
- reporter (self , name , value , tags )
114
-
115
- if not rows :
116
- self .log .warning ("No results were found for query: %s" , query )
77
+ for scope in metric_scope :
78
+ descriptors = scope ['descriptors' ]
79
+ metrics = scope ['metrics' ]
80
+ query = scope ['query' ]
81
+
82
+ try :
83
+ cursor = db .cursor (row_factory = dict_row )
84
+ self .log .debug ("Running query: %s" , query )
85
+ cursor .execute (query )
86
+ rows = self .iter_rows (cursor )
87
+
88
+ except Exception as e :
89
+ self .log .exception ("Not all metrics may be available: %s" , str (e ))
90
+
91
+ else :
92
+ for row in rows :
93
+ if 'key' in row : # We are processing "config metrics"
94
+ # Make a copy of the row to allow mutation
95
+ row = row .copy ()
96
+ # We flip/rotate the row: row value becomes the column name
97
+ row [row ['key' ]] = row ['value' ]
98
+ # Skip the "pgbouncer" database
99
+ elif row .get ('database' ) == self .DB_NAME :
100
+ continue
101
+
102
+ tags = list (self .tags )
103
+ tags += ["%s:%s" % (tag , row [column ]) for (column , tag ) in descriptors if column in row ]
104
+ for column , (name , reporter ) in metrics :
105
+ if column in row :
106
+ value = row [column ]
107
+ if column in ['connect_time' , 'request_time' ]:
108
+ self .log .debug ("Parsing timestamp; original value: %s" , value )
109
+ # First get rid of any UTC suffix.
110
+ value = re .findall (r'^[^ ]+ [^ ]+' , value )[0 ]
111
+ value = time .strptime (value , '%Y-%m-%d %H:%M:%S' )
112
+ value = time .mktime (value )
113
+ reporter (self , name , value , tags )
114
+
115
+ if not rows :
116
+ self .log .warning ("No results were found for query: %s" , query )
117
117
118
118
except pg .Error :
119
119
self .log .exception ("Connection error" )
@@ -138,21 +138,25 @@ def iter_rows(self, cursor):
138
138
139
139
def _get_connect_kwargs (self ):
140
140
"""
141
- Get the params to pass to psycopg2 .connect() based on passed-in vals
141
+ Get the params to pass to psycopg .connect() based on passed-in vals
142
142
from yaml settings file
143
143
"""
144
+ # It's important to set the client_encoding to utf-8
145
+ # PGBouncer defaults to an encoding of 'UNICODE`, which will cause psycopg to error out
144
146
if self .database_url :
145
- return {'dsn ' : self .database_url }
147
+ return {'conninfo ' : self .database_url , 'client_encoding' : 'utf-8' }
146
148
147
149
if self .host in ('localhost' , '127.0.0.1' ) and self .password == '' :
148
150
# Use ident method
149
- return {'dsn ' : "user={} dbname={}" .format (self .user , self .DB_NAME )}
151
+ return {'conninfo ' : "user={} dbname={} client_encoding=utf-8 " .format (self .user , self .DB_NAME )}
150
152
151
153
args = {
152
154
'host' : self .host ,
153
155
'user' : self .user ,
154
156
'password' : self .password ,
155
- 'database' : self .DB_NAME ,
157
+ 'dbname' : self .DB_NAME ,
158
+ 'cursor_factory' : ClientCursor ,
159
+ 'client_encoding' : 'utf-8' ,
156
160
}
157
161
if self .port :
158
162
args ['port' ] = self .port
@@ -166,8 +170,9 @@ def _get_connection(self, use_cached=None):
166
170
return self .connection
167
171
try :
168
172
connect_kwargs = self ._get_connect_kwargs ()
169
- connection = pg .connect (** connect_kwargs )
170
- connection .set_isolation_level (pg .extensions .ISOLATION_LEVEL_AUTOCOMMIT )
173
+ # Somewhat counterintuitively, we need to set autocommit to True to avoid a BEGIN/COMMIT block
174
+ # https://www.psycopg.org/psycopg3/docs/basic/transactions.html#autocommit-transactions
175
+ connection = pg .connect (** connect_kwargs , autocommit = True )
171
176
except Exception :
172
177
redacted_url = self ._get_redacted_dsn ()
173
178
message = u'Cannot establish connection to {}' .format (redacted_url )
@@ -180,6 +185,10 @@ def _get_connection(self, use_cached=None):
180
185
self .connection = connection
181
186
return connection
182
187
188
+ def _close_connection (self ):
189
+ self .connection .close ()
190
+ self .connection = None
191
+
183
192
def _get_redacted_dsn (self ):
184
193
if not self .database_url :
185
194
return u'pgbouncer://%s:******@%s:%s/%s' % (self .user , self .host , self .port , self .DB_NAME )
@@ -200,6 +209,8 @@ def check(self, instance):
200
209
201
210
self .service_check (self .SERVICE_CHECK_NAME , AgentCheck .OK , tags = self ._get_service_checks_tags ())
202
211
self ._set_metadata ()
212
+ # Avoid holding an open connection
213
+ self ._close_connection ()
203
214
204
215
def _set_metadata (self ):
205
216
if self .is_metadata_collection_enabled ():
@@ -209,14 +220,5 @@ def _set_metadata(self):
209
220
210
221
def get_version (self ):
211
222
db = self ._get_connection ()
212
- regex = r'\d+\.\d+\.\d+'
213
- with db .cursor (cursor_factory = pgextras .DictCursor ) as cursor :
214
- cursor .execute ('SHOW VERSION;' )
215
- if db .notices :
216
- data = db .notices [0 ]
217
- else :
218
- data = cursor .fetchone ()[0 ]
219
- res = re .findall (regex , data )
220
- if res :
221
- return res [0 ]
222
- self .log .debug ("Couldn't detect version from %s" , data )
223
+ version = pg .pq .version_pretty (db .connection .info .server_version )
224
+ return version
0 commit comments