@@ -37,7 +37,7 @@ class FluentSender {
3737 if ( this . _eventMode === 'Message' ) {
3838 this . _sendQueue = [ ] ; // queue for items waiting for being sent.
3939 this . _flushInterval = 0 ;
40- this . _messageQueueSizeLimit = options . messageQueueSizeLimit || 0 ;
40+ this . _messageQueueSizeLimit = options . messageQueueSizeLimit || 1000 ;
4141 } else {
4242 this . _sendQueue = new Map ( ) ;
4343 this . _flushInterval = options . flushInterval || 100 ;
@@ -110,10 +110,11 @@ class FluentSender {
110110 return ;
111111 }
112112
113- this . _push ( tag , timestamp , data , callback ) ;
113+ this . _push ( tag , timestamp , data ) ;
114114 this . _connect ( ( ) => {
115115 this . _flushSendQueue ( ) ;
116116 } ) ;
117+ callback ( )
117118 }
118119
119120 end ( label , data , callback ) {
@@ -182,11 +183,10 @@ class FluentSender {
182183 return msgpack . encode ( [ time , data ] , { codec : codec } ) ;
183184 }
184185
185- _push ( tag , time , data , callback ) {
186+ _push ( tag , time , data ) {
186187 if ( this . _eventMode === 'Message' ) {
187188 // Message mode
188189 const item = this . _makePacketItem ( tag , time , data ) ;
189- item . callback = callback ;
190190 if ( this . _messageQueueSizeLimit && this . _sendQueue . length === this . _messageQueueSizeLimit ) {
191191 this . _sendQueue . shift ( ) ;
192192 }
@@ -199,13 +199,10 @@ class FluentSender {
199199 const eventEntryData = this . _sendQueue . get ( tag ) ;
200200 eventEntryData . eventEntries . push ( eventEntry ) ;
201201 eventEntryData . size += eventEntry . length ;
202- if ( callback ) eventEntryData . callbacks . push ( callback ) ;
203202 } else {
204- const callbacks = callback ? [ callback ] : [ ] ;
205203 this . _sendQueue . set ( tag , {
206204 eventEntries : [ eventEntry ] ,
207205 size : eventEntry . length ,
208- callbacks : callbacks
209206 } ) ;
210207 }
211208 }
@@ -292,7 +289,7 @@ class FluentSender {
292289 this . _socket && this . _socket . destroy ( ) ;
293290 this . _socket = null ;
294291 this . _status = null ;
295- this . _connecting = false ;
292+ setTimeout ( ( ) => { this . _connecting = false } , this . reconnectInterval )
296293 }
297294
298295 _handshake ( callback ) {
@@ -375,7 +372,7 @@ class FluentSender {
375372 // nothing written;
376373 return ;
377374 }
378- this . _doWrite ( item . packet , item . options , timeoutId , [ item . callback ] ) ;
375+ this . _doWrite ( item . packet , item . options , timeoutId ) ;
379376 } else {
380377 if ( this . _sendQueue . size === 0 ) {
381378 this . _flushingSendQueue = false ;
@@ -398,11 +395,11 @@ class FluentSender {
398395 eventEntryDataSize : eventEntryData . size
399396 } ;
400397 const packet = msgpack . encode ( [ tag , entries , options ] , { codec : codec } ) ;
401- this . _doWrite ( packet , options , timeoutId , eventEntryData . callbacks ) ;
398+ this . _doWrite ( packet , options , timeoutId ) ;
402399 }
403400 }
404401
405- _doWrite ( packet , options , timeoutId , callbacks ) {
402+ _doWrite ( packet , options , timeoutId ) {
406403 const sendPacketSize = ( options && options . eventEntryDataSize ) || this . _sendQueueSize ;
407404 this . _socket . write ( packet , ( ) => {
408405 if ( this . requireAckResponse ) {
@@ -414,13 +411,7 @@ class FluentSender {
414411 'ack in response and chunk id in sent data are different' ,
415412 { ack : response . ack , chunk : options . chunk }
416413 ) ;
417- callbacks . forEach ( ( callback ) => {
418- this . _handleEvent ( 'error' , error , callback ) ;
419- } ) ;
420- } else { // no error on ack
421- callbacks . forEach ( ( callback ) => {
422- callback && callback ( ) ;
423- } ) ;
414+ this . _handleEvent ( 'error' , error ) ;
424415 }
425416 this . _sendQueueSize -= sendPacketSize ;
426417 process . nextTick ( ( ) => {
@@ -429,15 +420,10 @@ class FluentSender {
429420 } ) ;
430421 timeoutId = setTimeout ( ( ) => {
431422 const error = new FluentLoggerError . ResponseTimeout ( 'ack response timeout' ) ;
432- callbacks . forEach ( ( callback ) => {
433- this . _handleEvent ( 'error' , error , callback ) ;
434- } ) ;
423+ this . _handleEvent ( 'error' , error ) ;
435424 } , this . ackResponseTimeout ) ;
436425 } else {
437426 this . _sendQueueSize -= sendPacketSize ;
438- callbacks . forEach ( ( callback ) => {
439- callback && callback ( ) ;
440- } ) ;
441427 process . nextTick ( ( ) => {
442428 this . _waitToWrite ( ) ;
443429 } ) ;
0 commit comments