Skip to content

Commit bbda10c

Browse files
authored
Merge pull request confluentinc#239 from confluentinc/utf8_memleak
Fix utf8 string conversion memory leak on Python 2 (confluentinc#198)
2 parents ae2878f + 194f49a commit bbda10c

File tree

3 files changed

+59
-20
lines changed

3 files changed

+59
-20
lines changed

confluent_kafka/src/Consumer.c

+6-3
Original file line numberDiff line numberDiff line change
@@ -123,16 +123,17 @@ static PyObject *Consumer_subscribe (Handle *self, PyObject *args,
123123
topics = rd_kafka_topic_partition_list_new((int)PyList_Size(tlist));
124124
for (pos = 0 ; pos < PyList_Size(tlist) ; pos++) {
125125
PyObject *o = PyList_GetItem(tlist, pos);
126-
PyObject *uo;
126+
PyObject *uo, *uo8;
127127
if (!(uo = cfl_PyObject_Unistr(o))) {
128128
PyErr_Format(PyExc_TypeError,
129129
"expected list of unicode strings");
130130
rd_kafka_topic_partition_list_destroy(topics);
131131
return NULL;
132132
}
133133
rd_kafka_topic_partition_list_add(topics,
134-
cfl_PyUnistr_AsUTF8(uo),
134+
cfl_PyUnistr_AsUTF8(uo, &uo8),
135135
RD_KAFKA_PARTITION_UA);
136+
Py_XDECREF(uo8);
136137
Py_DECREF(uo);
137138
}
138139

@@ -284,6 +285,7 @@ static PyObject *Consumer_commit (Handle *self, PyObject *args,
284285
return NULL;
285286
} else if (msg) {
286287
Message *m;
288+
PyObject *uo8;
287289

288290
if (PyObject_Type((PyObject *)msg) !=
289291
(PyObject *)&MessageType) {
@@ -296,8 +298,9 @@ static PyObject *Consumer_commit (Handle *self, PyObject *args,
296298

297299
c_offsets = rd_kafka_topic_partition_list_new(1);
298300
rd_kafka_topic_partition_list_add(
299-
c_offsets, cfl_PyUnistr_AsUTF8(m->topic),
301+
c_offsets, cfl_PyUnistr_AsUTF8(m->topic, &uo8),
300302
m->partition)->offset =m->offset + 1;
303+
Py_XDECREF(uo8);
301304

302305
} else {
303306
c_offsets = NULL;

confluent_kafka/src/confluent_kafka.c

+40-15
Original file line numberDiff line numberDiff line change
@@ -664,19 +664,27 @@ static PyMemberDef TopicPartition_members[] = {
664664

665665

666666
/**
 * @brief Build the Python string representation of a TopicPartition:
 *        "TopicPartition{topic=..,partition=..,offset=..,error=..}".
 *
 * The error field is rendered as "None" when self->error is Py_None,
 * otherwise as the UTF-8 form of the error object's unicode string.
 *
 * @returns a new unicode string object, or NULL if the error object could
 *          not be converted to a UTF-8 string (presumably with a Python
 *          exception set by the failing conversion call) or if the
 *          formatting call itself fails.
 */
static PyObject *TopicPartition_str0 (TopicPartition *self) {
        PyObject *errstr = NULL;   /* Unicode form of self->error */
        PyObject *errstr8 = NULL;  /* Optional UTF-8 intermediary object */
        const char *c_errstr = NULL;
        PyObject *ret;
        char offset_str[40];

        /* The 64-bit offset is pre-formatted here and passed on as %s. */
        snprintf(offset_str, sizeof(offset_str), "%"PRId64"", self->offset);

        if (self->error != Py_None) {
                errstr = cfl_PyObject_Unistr(self->error);
                if (!errstr)
                        return NULL; /* Never hand NULL to the UTF-8 helper */

                c_errstr = cfl_PyUnistr_AsUTF8(errstr, &errstr8);
                if (!c_errstr) {
                        /* Conversion failed: drop our references and
                         * report the failure instead of silently printing
                         * "None" for a non-None error. */
                        Py_XDECREF(errstr8);
                        Py_DECREF(errstr);
                        return NULL;
                }
        }

        ret = cfl_PyUnistr(
                _FromFormat("TopicPartition{topic=%s,partition=%"PRId32
                            ",offset=%s,error=%s}",
                            self->topic, self->partition,
                            offset_str,
                            c_errstr ? c_errstr : "None"));

        /* c_errstr may point into errstr8/errstr, so release them only
         * after the formatted string has been built. */
        Py_XDECREF(errstr8);
        Py_XDECREF(errstr);
        return ret;
}
682690

@@ -996,8 +1004,8 @@ static int populate_topic_conf (rd_kafka_topic_conf_t *tconf, const char *what,
9961004
}
9971005

9981006
while (PyDict_Next(dict, &pos, &ko, &vo)) {
999-
PyObject *ks;
1000-
PyObject *vs;
1007+
PyObject *ks, *ks8;
1008+
PyObject *vs, *vs8;
10011009
const char *k;
10021010
const char *v;
10031011
char errstr[256];
@@ -1017,19 +1025,23 @@ static int populate_topic_conf (rd_kafka_topic_conf_t *tconf, const char *what,
10171025
return -1;
10181026
}
10191027

1020-
k = cfl_PyUnistr_AsUTF8(ks);
1021-
v = cfl_PyUnistr_AsUTF8(vs);
1028+
k = cfl_PyUnistr_AsUTF8(ks, &ks8);
1029+
v = cfl_PyUnistr_AsUTF8(vs, &vs8);
10221030

10231031
if (rd_kafka_topic_conf_set(tconf, k, v,
10241032
errstr, sizeof(errstr)) !=
10251033
RD_KAFKA_CONF_OK) {
10261034
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
10271035
"%s: %s", what, errstr);
1036+
Py_XDECREF(ks8);
1037+
Py_XDECREF(vs8);
10281038
Py_DECREF(ks);
10291039
Py_DECREF(vs);
10301040
return -1;
10311041
}
10321042

1043+
Py_XDECREF(ks8);
1044+
Py_XDECREF(vs8);
10331045
Py_DECREF(ks);
10341046
Py_DECREF(vs);
10351047
}
@@ -1070,7 +1082,8 @@ static int producer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,
10701082
if ((vs = cfl_PyObject_Unistr(valobj))) {
10711083
/* Use built-in C partitioners,
10721084
* based on their name. */
1073-
val = cfl_PyUnistr_AsUTF8(vs);
1085+
PyObject *vs8;
1086+
val = cfl_PyUnistr_AsUTF8(vs, &vs8);
10741087

10751088
if (!strcmp(val, "random"))
10761089
rd_kafka_topic_conf_set_partitioner_cb(
@@ -1087,10 +1100,12 @@ static int producer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,
10871100
"unknown builtin partitioner: %s "
10881101
"(available: random, consistent, consistent_random)",
10891102
val);
1103+
Py_XDECREF(vs8);
10901104
Py_DECREF(vs);
10911105
return -1;
10921106
}
10931107

1108+
Py_XDECREF(vs8);
10941109
Py_DECREF(vs);
10951110

10961111
} else {
@@ -1210,8 +1225,8 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
12101225

12111226
/* Convert kwargs dict to config key-value pairs. */
12121227
while (PyDict_Next(kwargs, &pos, &ko, &vo)) {
1213-
PyObject *ks;
1214-
PyObject *vs = NULL;
1228+
PyObject *ks, *ks8;
1229+
PyObject *vs = NULL, *vs8 = NULL;
12151230
const char *k;
12161231
const char *v;
12171232
char errstr[256];
@@ -1226,15 +1241,15 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
12261241
return NULL;
12271242
}
12281243

1229-
k = cfl_PyUnistr_AsUTF8(ks);
1244+
k = cfl_PyUnistr_AsUTF8(ks, &ks8);
12301245
if (!strcmp(k, "default.topic.config")) {
12311246
if (populate_topic_conf(tconf, k, vo) == -1) {
12321247
Py_DECREF(ks);
12331248
rd_kafka_topic_conf_destroy(tconf);
12341249
rd_kafka_conf_destroy(conf);
12351250
return NULL;
12361251
}
1237-
1252+
Py_XDECREF(ks8);
12381253
Py_DECREF(ks);
12391254
continue;
12401255

@@ -1245,6 +1260,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
12451260
"as a callable function");
12461261
rd_kafka_topic_conf_destroy(tconf);
12471262
rd_kafka_conf_destroy(conf);
1263+
Py_XDECREF(ks8);
12481264
Py_DECREF(ks);
12491265
return NULL;
12501266
}
@@ -1256,6 +1272,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
12561272
h->error_cb = vo;
12571273
Py_INCREF(h->error_cb);
12581274
}
1275+
Py_XDECREF(ks8);
12591276
Py_DECREF(ks);
12601277
continue;
12611278
} else if (!strcmp(k, "stats_cb")) {
@@ -1265,6 +1282,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
12651282
"as a callable function");
12661283
rd_kafka_topic_conf_destroy(tconf);
12671284
rd_kafka_conf_destroy(conf);
1285+
Py_XDECREF(ks8);
12681286
Py_DECREF(ks);
12691287
return NULL;
12701288
}
@@ -1277,6 +1295,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
12771295
h->stats_cb = vo;
12781296
Py_INCREF(h->stats_cb);
12791297
}
1298+
Py_XDECREF(ks8);
12801299
Py_DECREF(ks);
12811300
continue;
12821301
}
@@ -1288,6 +1307,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
12881307
r = consumer_conf_set_special(h, conf, tconf, k, vo);
12891308
if (r == -1) {
12901309
/* Error */
1310+
Py_XDECREF(ks8);
12911311
Py_DECREF(ks);
12921312
rd_kafka_topic_conf_destroy(tconf);
12931313
rd_kafka_conf_destroy(conf);
@@ -1312,10 +1332,11 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
13121332
"unicode string");
13131333
rd_kafka_topic_conf_destroy(tconf);
13141334
rd_kafka_conf_destroy(conf);
1335+
Py_XDECREF(ks8);
13151336
Py_DECREF(ks);
13161337
return NULL;
13171338
}
1318-
v = cfl_PyUnistr_AsUTF8(vs);
1339+
v = cfl_PyUnistr_AsUTF8(vs, &vs8);
13191340
}
13201341

13211342
if (rd_kafka_conf_set(conf, k, v, errstr, sizeof(errstr)) !=
@@ -1324,12 +1345,16 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
13241345
"%s", errstr);
13251346
rd_kafka_topic_conf_destroy(tconf);
13261347
rd_kafka_conf_destroy(conf);
1348+
Py_XDECREF(vs8);
13271349
Py_XDECREF(vs);
1350+
Py_XDECREF(ks8);
13281351
Py_DECREF(ks);
13291352
return NULL;
13301353
}
13311354

1355+
Py_XDECREF(vs8);
13321356
Py_XDECREF(vs);
1357+
Py_XDECREF(ks8);
13331358
Py_DECREF(ks);
13341359
}
13351360

confluent_kafka/src/confluent_kafka.h

+13-2
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,15 @@
7272

7373
/**
7474
* @returns Unicode Python object as char * in UTF-8 encoding
75+
* @param uobjp might be set to NULL or a new object reference (depending
76+
* on Python version) which needs to be cleaned up with
77+
* Py_XDECREF() after finished use of the returned string.
7578
*/
76-
#define cfl_PyUnistr_AsUTF8(X) PyUnicode_AsUTF8(X)
79+
static __inline const char *
80+
cfl_PyUnistr_AsUTF8 (PyObject *o, PyObject **uobjp) {
81+
*uobjp = NULL; /* No intermediary object needed in Py3 */
82+
return PyUnicode_AsUTF8(o);
83+
}
7784

7885
/**
7986
* @returns Unicode Python string object
@@ -85,7 +92,11 @@
8592
/* See comments above */
8693
#define cfl_PyBin(X) PyString ## X
8794
#define cfl_PyUnistr(X) PyUnicode ## X
88-
#define cfl_PyUnistr_AsUTF8(X) PyBytes_AsString(PyUnicode_AsUTF8String(X))
95+
static __inline const char *
96+
cfl_PyUnistr_AsUTF8 (PyObject *o, PyObject **uobjp) {
97+
*uobjp = PyUnicode_AsUTF8String(o); /*UTF8 intermediary object on Py2*/
98+
return PyBytes_AsString(*uobjp);
99+
}
89100
#define cfl_PyObject_Unistr(X) PyObject_Unicode(X)
90101
#endif
91102

0 commit comments

Comments
 (0)