Skip to content

Commit e777bc5

Browse files
use bigger buffer (#110)
* use bigger buffer * added buffers size * import new task logger * improved log size calculation * fixing stream updates * fix setLogSize of undefined * added flush time limit to stream * reduced buffer to 2MiB and reduced print info interval * update task logger * fixed tests
1 parent 764aac8 commit e777bc5

10 files changed

+270
-245
lines changed

Dockerfile

+4-4
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@ FROM node:16.18.1-bullseye-slim
22

33
WORKDIR /root/cf-runtime
44

5-
RUN apt-get update && apt upgrade -y
5+
RUN apt-get update && apt upgrade -y && \
6+
apt-get install g++ git make python3 -y
67

78
COPY package.json yarn.lock ./
89

910
# install cf-runtime required binaries
10-
RUN apt-get install g++ git make python3 -y && \
11-
yarn install --frozen-lockfile --production && \
11+
RUN yarn install --frozen-lockfile --production && \
1212
yarn cache clean && \
13-
apt-get purge g++ git make python3 -y && \
13+
apt-get purge g++ git make python3 -y && \
1414
apt-get autoremove -y && \
1515
apt-get clean -y && \
1616
rm -rf /tmp/* && \

lib/ContainerLogger.js

+21-14
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ const promiseRetry = require('promise-retry');
44
const logger = require('cf-logs').Logger('codefresh:containerLogger');
55
const CFError = require('cf-errors');
66
const { Transform } = require('stream');
7+
const _ = require('lodash');
78
const { LoggerStrategy } = require('./enums');
89

910
const CONTAINER_START_RETRY_TIMEOUT_SECONDS = 1;
1011
const CONTAINER_START_RETRY_LIMIT = 10;
12+
const BUFFER_SIZE = 2 * 1024 * 1024; // 2 MiB
1113

1214
class ContainerLogger extends EventEmitter {
1315

@@ -27,6 +29,7 @@ class ContainerLogger extends EventEmitter {
2729
this.tty = false;
2830
this.logSizeLimit = logSizeLimit;
2931
this.logSize = 0;
32+
this.bufferUsed = 0.0;
3033
this.isWorkflowLogSizeExceeded = isWorkflowLogSizeExceeded;
3134
this.stepFinished = false;
3235
this.finishedStreams = 0;
@@ -222,12 +225,12 @@ class ContainerLogger extends EventEmitter {
222225
}
223226

224227
this.stepLogger.write(message);
225-
228+
const curLogSize = Buffer.byteLength(message);
226229
if (this.logSizeLimit) {
227-
this.logSize += Buffer.byteLength(message);
230+
this.logSize += curLogSize;
228231
this.stepLogger.setLogSize(this.logSize);
229232
}
230-
this.emit('message.logged');
233+
this.emit('message.logged', curLogSize);
231234
}
232235

233236
_errorTransformerStream() {
@@ -240,13 +243,14 @@ class ContainerLogger extends EventEmitter {
240243
}
241244

242245
_logSizeLimitStream() {
246+
const self = this;
243247
return new Transform({
244-
transform: (data, encoding, done) => {
245-
if (this.logSizeLimit && (this._stepLogSizeExceeded() || this.isWorkflowLogSizeExceeded())) {
246-
if (!this.logExceededLimitsNotified) {
247-
this.logExceededLimitsNotified = true;
248-
const message = `\x1B[01;93mLog size exceeded for ${this._stepLogSizeExceeded()
249-
? 'this step'
248+
transform(data, encoding, done) {
249+
if (self.logSizeLimit && (self._stepLogSizeExceeded() || self.isWorkflowLogSizeExceeded())) {
250+
if (!self.logExceededLimitsNotified) {
251+
self.logExceededLimitsNotified = true;
252+
const message = `\x1B[01;93mLog size exceeded for ${self._stepLogSizeExceeded()
253+
? 'self step'
250254
: 'the workflow'}.\nThe step will continue to execute until it finished but new logs will not be stored.\x1B[0m\r\n`;
251255
done(null, Buffer.from(message));
252256
return;
@@ -256,14 +260,17 @@ class ContainerLogger extends EventEmitter {
256260
return;
257261
}
258262

259-
if (this.logSizeLimit) {
260-
this.logSize += Buffer.byteLength(data);
261-
this.stepLogger.setLogSize(this.logSize);
263+
const curLogSize = Buffer.byteLength(data);
264+
if (self.logSizeLimit) {
265+
self.logSize += curLogSize;
266+
self.stepLogger.setLogSize(self.logSize);
262267
}
263268

264-
this.emit('message.logged');
269+
self.emit('message.logged', curLogSize);
270+
self.bufferUsed = _.get(this, '_readableState.length') / BUFFER_SIZE;
265271
done(null, data);
266-
}
272+
},
273+
highWaterMark: BUFFER_SIZE,
267274
});
268275
}
269276

lib/enums.js

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const ContainerHandlingStatus = {
1010
INITIALIZING: 'initializing',
1111
LISTENING: 'listening',
1212
WAITING_FOR_START: 'waitingForStart',
13+
FINISHED: 'finished',
1314
};
1415

1516
const BuildFinishedSignalFilename = 'build_finished';

lib/isReady.js

+5-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@ const { ContainerHandlingStatus } = require('./enums');
55
function isContainerReady(state, containerId) {
66
console.log(`checking if container ${containerId} is ready`);
77
const containerState = _.get(state, `containers[${containerId}]`, {});
8-
const isReady = [ContainerHandlingStatus.LISTENING, ContainerHandlingStatus.WAITING_FOR_START].includes(containerState.status);
8+
const isReady = [
9+
ContainerHandlingStatus.LISTENING,
10+
ContainerHandlingStatus.WAITING_FOR_START,
11+
ContainerHandlingStatus.FINISHED,
12+
].includes(containerState.status);
913
console.log(`container ${containerId} is: ${isReady ? 'READY' : 'NOT READY'}`);
1014
return isReady;
1115
}

lib/logger.js

+17-15
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class Logger {
3333
this.findExistingContainers = findExistingContainers === 'true';
3434
this.logSizeLimit = logSizeLimit;
3535
this.containerLoggers = [];
36-
this.logSize = 0;
36+
this.totalLogSize = 0;
3737
this.taskLogger = undefined;
3838
this.buildFinishedPromise = buildFinishedPromise || Q.resolve();
3939
this.finishedContainers = 0;
@@ -54,7 +54,7 @@ class Logger {
5454
});
5555
this._readState();
5656
this._handleBuildFinished();
57-
this._updateStateInterval = setInterval(this._updateStateFile.bind(this), 1000);
57+
this._updateStateInterval = setInterval(this._updateStateFile.bind(this), 3000);
5858
}
5959

6060
/**
@@ -173,13 +173,7 @@ class Logger {
173173
logLimitExceeded() {
174174
// TODO in the future when we allow a workflow to use multuple dinds, this will not be correct
175175
// we need to get the total size of logs from all dinds
176-
return this.logSizeLimit && this._getTotalLogSize() > this.logSizeLimit;
177-
}
178-
179-
_getTotalLogSize() {
180-
return _.reduce(this.containerLoggers, (sum, containerLogger) => {
181-
return sum + containerLogger.logSize;
182-
}, 0);
176+
return this.logSizeLimit && this.totalLogSize > this.logSizeLimit;
183177
}
184178

185179
/**
@@ -263,7 +257,7 @@ class Logger {
263257
});
264258
this.containerLoggers.push(containerLogger);
265259
containerLogger.on('message.logged', this._updateTotalLogSize.bind(this));
266-
containerLogger.once('end', this._handleContainerStreamEnd.bind(this));
260+
containerLogger.once('end', this._handleContainerStreamEnd.bind(this, containerId));
267261

268262
containerLogger.start()
269263
.done(() => {
@@ -287,9 +281,9 @@ class Logger {
287281
_.set(this, 'state.logsStatus.missingLogs', writeCalls - resolvedCalls - rejectedCalls);
288282
}
289283

290-
_updateTotalLogSize() {
291-
this.logSize = this._getTotalLogSize();
292-
this.taskLogger.setLogSize(this.logSize);
284+
_updateTotalLogSize(size) {
285+
this.totalLogSize += size;
286+
this.taskLogger.setLogSize(this.totalLogSize);
293287
}
294288

295289
_updateLastLoggingDate() {
@@ -301,9 +295,16 @@ class Logger {
301295
clearInterval(this._updateStateInterval);
302296
} else {
303297
this._writeNewState(true);
304-
305298
if (this.showProgress) {
299+
const activeContainerLoggers = this.containerLoggers.filter(
300+
(containerLogger) => this.state.containers[containerLogger.containerId].status === ContainerHandlingStatus.LISTENING
301+
);
302+
const buffers = activeContainerLoggers.reduce((acc, cur) => {
303+
acc[cur.containerId] = cur.bufferUsed;
304+
return acc;
305+
}, {});
306306
logger.debug(`logger progress update: ${JSON.stringify(this.state.logsStatus)}`);
307+
logger.debug(`buffers: ${JSON.stringify(buffers)}`);
307308
}
308309
}
309310
}
@@ -376,9 +377,10 @@ class Logger {
376377
});
377378
}
378379

379-
_handleContainerStreamEnd() {
380+
_handleContainerStreamEnd(containerId) {
380381
this.finishedContainers++;
381382
this.finishedContainersEmitter.emit('end');
383+
this.state.containers[containerId] = { status: ContainerHandlingStatus.FINISHED };
382384
}
383385

384386
// do not call before build is finished

package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
"redis": "^3.1.1"
1111
},
1212
"dependencies": {
13-
"@codefresh-io/task-logger": "^1.9.24",
13+
"@codefresh-io/task-logger": "^1.10.0",
1414
"body-parser": "^1.19.0",
1515
"cf-errors": "^0.1.16",
1616
"cf-logs": "^1.1.25",
@@ -52,4 +52,4 @@
5252
"start": "node lib/index.js",
5353
"version": "exit 0"
5454
}
55-
}
55+
}

service.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version: 1.9.0
1+
version: 1.10.0

test/isReady.unit.spec.js

+10
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,16 @@ describe('isReady script', () => {
6060
});
6161
expect(process.exit).to.have.been.calledOnceWith(0);
6262
});
63+
it('Should check exit with 0 code if container is finished status', () => {
64+
const state = JSON.stringify({ status: 'ready', containers: { 'container-id': { status: ContainerHandlingStatus.FINISHED } } })
65+
process.argv = ['foo', 'bar', 'container-id'];
66+
proxyquire('../lib/isReady.js', {
67+
'fs': {
68+
readFileSync: () => Buffer.from(state),
69+
},
70+
});
71+
expect(process.exit).to.have.been.calledOnceWith(0);
72+
});
6373
it('Should check exit with 1 code if container is not ready', () => {
6474
const state = JSON.stringify({ status: 'ready', containers: { 'container-id': { status: ContainerHandlingStatus.INITIALIZING } } })
6575
process.argv = ['foo', 'bar', 'container-id'];

0 commit comments

Comments
 (0)