@@ -105,26 +105,19 @@ function Producer(conf, topicConf) {
105
105
}
106
106
107
107
/**
108
- * Create a topic if it does not exist. Otherwise, create it
108
+ * Create a topic if it does not exist. Otherwise, get the cached one
109
109
*
110
110
* @private
111
111
*/
112
- function maybeTopic ( name , config ) {
112
+ function maybeTopic ( name ) {
113
113
// create or get topic
114
114
var topic ;
115
115
116
116
if ( typeof ( name ) === 'string' ) {
117
-
118
- // If config is an empty object, reuse the ones we have in cache
119
- if ( config ) {
120
- topic = this . Topic ( name , config ) ;
117
+ if ( this . createdTopics [ name ] ) {
118
+ topic = this . createdTopics [ name ] ;
121
119
} else {
122
- // if config is an empty object
123
- if ( this . createdTopics [ name ] ) {
124
- topic = this . createdTopics [ name ] ;
125
- } else {
126
- topic = this . createdTopics [ name ] = this . Topic ( name , { } ) ;
127
- }
120
+ topic = this . createdTopics [ name ] = this . Topic ( name , { } ) ;
128
121
}
129
122
130
123
return topic ;
@@ -216,7 +209,7 @@ Producer.prototype.produce = function(topic, partition, message, key) {
216
209
throw new Error ( 'Producer not connected' ) ;
217
210
}
218
211
219
- if ( typeof topic === 'object' ) {
212
+ if ( typeof topic === 'object' && ! ( topic instanceof Kafka . Topic ) ) {
220
213
return this . _produceObject ( topic , partition ) ;
221
214
}
222
215
@@ -229,7 +222,7 @@ Producer.prototype.produce = function(topic, partition, message, key) {
229
222
230
223
partition = partition == null ? this . defaultPartition : partition ;
231
224
232
- topic = maybeTopic . call ( this , topic , false ) ;
225
+ topic = maybeTopic . call ( this , topic ) ;
233
226
234
227
return this . _errorWrap (
235
228
this . _client . produce ( topic , partition , message , key || null ) ) ;
0 commit comments