@@ -5,6 +5,7 @@ const Q = require('q');
5
5
const logger = require ( 'cf-logs' ) . Logger ( 'codefresh:containerLogger' ) ;
6
6
const CFError = require ( 'cf-errors' ) ;
7
7
const LoggerStrategy = require ( './enums' ) . LoggerStrategy ;
8
+ const { Transform, pipeline } = require ( 'stream' ) ;
8
9
9
10
class ContainerLogger extends EventEmitter {
10
11
@@ -42,27 +43,25 @@ class ContainerLogger extends EventEmitter {
42
43
} )
43
44
. then ( ( [ stdout , stderr ] ) => {
44
45
logger . info ( `Attached stream to container: ${ this . containerId } ` ) ;
46
+
45
47
// Listening on the stream needs to be performed different depending if a tty is attached or not
46
48
// See documentation of the docker api here: https://docs.docker.com/engine/reference/api/docker_remote_api_v1.24/#/attach-to-a-container
47
49
if ( this . tty ) {
48
- this . _handleTtyStream ( stdout , false ) ;
49
- if ( stderr ) {
50
- this . _handleTtyStream ( stderr , true ) ;
51
- }
52
- } else {
53
- this . _handleNonTtyStream ( stdout , false ) ;
54
- }
55
-
56
- stdout . on ( 'end' , ( ) => {
57
- this . stepFinished = true ;
58
- logger . info ( `stdout end event was fired for container: ${ this . containerId } ` ) ;
59
- } ) ;
60
50
61
- if ( stderr ) {
62
- stderr . on ( 'end' , ( ) => {
51
+ stdout . on ( 'end' , ( ) => {
63
52
this . stepFinished = true ;
64
- logger . info ( `stderr end event was fired for container: ${ this . containerId } ` ) ;
53
+ logger . info ( `stdout end event was fired for container: ${ this . containerId } ` ) ;
65
54
} ) ;
55
+
56
+ if ( this . stepLogger . opts && this . stepLogger . opts . logsRateLimitConfig ) {
57
+ logger . info ( `Found logger rate limit configuration, using streams api` ) ;
58
+ this . _streamTty ( stdout , stderr ) ;
59
+ return ;
60
+ }
61
+
62
+ this . _registerToTtyStreams ( stdout , stderr ) ;
63
+ } else {
64
+ this . _handleNonTtyStream ( stdout , false ) ;
66
65
}
67
66
} , ( err ) => {
68
67
return Q . reject ( new CFError ( {
@@ -99,6 +98,47 @@ class ContainerLogger extends EventEmitter {
99
98
] ) ;
100
99
}
101
100
101
+ _streamTty ( stdout , stderr ) {
102
+
103
+ pipeline ( stdout , this . _logSizeLimitStream ( ) , this . stepLogger . writeStream ( ) , ( err ) => {
104
+ if ( err ) {
105
+ logger . error ( `Stdout streams pipeline failed on: ${ err } ` ) ;
106
+ return ;
107
+ }
108
+ logger . info ( 'Stdout streams pipeline succeeded.' ) ;
109
+ } ) ;
110
+
111
+ if ( ! stderr ) {
112
+ return ;
113
+ }
114
+
115
+ pipeline ( stderr , this . _logSizeLimitStream ( ) , this . _errorTransformerStream ( ) , this . stepLogger . writeStream ( ) , ( err ) => {
116
+ if ( err ) {
117
+ logger . error ( `Stderr streams pipeline failed on: ${ err } ` ) ;
118
+ return ;
119
+ }
120
+ logger . info ( 'Stderr streams pipeline succeeded.' ) ;
121
+ } ) ;
122
+
123
+ stderr . once ( 'end' , ( ) => {
124
+ this . stepFinished = true ;
125
+ logger . info ( `stderr end event was fired for container: ${ this . containerId } ` ) ;
126
+ } ) ;
127
+
128
+ }
129
+
130
+ _registerToTtyStreams ( stdout , stderr ) {
131
+ this . _handleTtyStream ( stdout , false ) ;
132
+
133
+ if ( stderr ) {
134
+ stderr . once ( 'end' , ( ) => {
135
+ this . stepFinished = true ;
136
+ logger . info ( `stderr end event was fired for container: ${ this . containerId } ` ) ;
137
+ } ) ;
138
+ this . _handleTtyStream ( stderr , true ) ;
139
+ }
140
+ }
141
+
102
142
_handleTtyStream ( stream , isError ) {
103
143
stream . on ( 'data' , ( chunk ) => {
104
144
const buf = new Buffer ( chunk ) ;
@@ -150,6 +190,41 @@ class ContainerLogger extends EventEmitter {
150
190
this . emit ( 'message.logged' ) ;
151
191
}
152
192
193
+ _errorTransformerStream ( ) {
194
+ return new Transform ( {
195
+ transform : ( data , encoding , done ) => {
196
+ const message = `\x1B[31m${ data . toString ( 'utf8' ) } \x1B[0m` ;
197
+ done ( null , Buffer . from ( message ) ) ;
198
+ }
199
+ } ) ;
200
+ }
201
+
202
+ _logSizeLimitStream ( ) {
203
+ return new Transform ( {
204
+ transform : ( data , encoding , done ) => {
205
+ if ( this . logSizeLimit && ( this . _stepLogSizeExceeded ( ) || this . isWorkflowLogSizeExceeded ( ) ) ) {
206
+ if ( ! this . logExceededLimitsNotified ) {
207
+ this . logExceededLimitsNotified = true ;
208
+ const message = `\x1B[01;93mLog size exceeded for ${ this . _stepLogSizeExceeded ( ) ? 'this step' : 'the workflow' } .\nThe step will continue to execute until it finished but new logs will not be stored.\x1B[0m\r\n` ;
209
+ done ( null , Buffer . from ( message ) ) ;
210
+ return ;
211
+ }
212
+
213
+ done ( null , Buffer . alloc ( 0 ) ) ; // discard chunk
214
+ return ;
215
+ }
216
+
217
+ if ( this . logSizeLimit ) {
218
+ this . logSize += Buffer . byteLength ( data ) ;
219
+ this . stepLogger . setLogSize ( this . logSize ) ;
220
+ }
221
+
222
+ this . emit ( 'message.logged' ) ;
223
+ done ( null , data ) ;
224
+ }
225
+ } ) ;
226
+ }
227
+
153
228
}
154
229
155
230
module . exports = ContainerLogger ;
0 commit comments