-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathindex.js
174 lines (151 loc) · 4.98 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
"use strict";
// @ts-check
const { GetObjectCommand, S3Client } = require("@aws-sdk/client-s3");
const zlib = require("zlib");
const util = require("util");
const gunzip = util.promisify(zlib.gunzip);
const readline = require("readline");
const dgram = require("dgram");
const stream = require("stream");
const inspectorAddr = process.env["INSPECTOR_LOGSTREAM_LISTEN_ADDR"];
/**
* AWS Lambda handler
* @param { {awslogs?: {data: string}, Records: {s3?: {bucket: {name:string}, object: {key: string}}}[] } } event - The event object
* @param {Object} _context - The context object
* @param {Function} callback - The callback function
*/
exports.handler = async (event, _context, callback) => {
if (!inspectorAddr) {
const err = "missing INSPECTOR_LOGSTREAM_LISTEN_ADDR env variable";
console.log(err);
callback(err);
return;
}
console.log(`inspector addr: ${inspectorAddr}`);
const arr = inspectorAddr.split(":");
if (arr.length !== 2) {
callback(
`invalid inspector addr format: ${inspectorAddr}. Expected host:port`,
);
return;
}
const [inspectorHost, inspectorPortStr] = arr;
const inspectorPort = inspectorPortStr ? parseInt(inspectorPortStr, 10) : 0;
if (isNaN(inspectorPort)) {
callback(`invalid inspector port: ${inspectorPortStr}`);
return;
}
const client = dgram.createSocket("udp4");
const record = event.Records[0];
if (event.awslogs) {
console.log("awslogs event");
const payload = Buffer.from(event.awslogs.data, "base64"); // decode base64 to binary
const result = await gunzip(payload);
const parsedRequest = JSON.parse(result.toString("utf8"));
await /** @type {Promise<void>} */ (
new Promise((resolve) => {
for (let i = 0; i < parsedRequest.logEvents.length; i++) {
if (
parsedRequest.logEvents[i].message.length &&
parsedRequest.logEvents[i].message[0] === "#"
) {
continue;
}
const current = i;
const message = parsedRequest.logEvents[i].message.endsWith("\n")
? Buffer.from(parsedRequest.logEvents[i].message)
: Buffer.from(parsedRequest.logEvents[i].message + "\n");
client.send(
message,
0,
message.length,
inspectorPort,
inspectorHost,
(err) => {
if (err) console.error(err);
if (current === parsedRequest.logEvents.length - 1) {
client.close();
console.log(
`sent ${parsedRequest.logEvents.length} lines for inspection`,
);
callback(
null,
`sent ${parsedRequest.logEvents.length} lines for inspection`,
);
resolve();
}
},
);
}
})
);
} else if (!!record && !!record.s3) {
const bucket = record.s3.bucket.name;
console.log("S3 bucket: ", bucket);
const key = decodeURIComponent(record.s3.object.key.replace(/\+/g, " "));
// Retrieve S3 Object
const s3Client = new S3Client();
const getObjectCommand = new GetObjectCommand({
Bucket: bucket,
Key: key,
});
const response = await s3Client.send(getObjectCommand);
if (!response.Body) {
callback("no body in S3 object");
return;
}
/** @type {stream.Readable} */
let body;
// Check if body is a Blob and convert it to ReadableStream
if (response.Body instanceof Blob) {
const readableStream = response.Body.stream();
body = stream.Readable.from(readableStream);
} else if (response.Body instanceof stream.Readable) {
body = response.Body;
} else {
callback(
"Unexpected body type: response.Body is not a compatible stream type.",
);
return;
}
const lineReader = readline.createInterface({
input: body.pipe(zlib.createGunzip()),
});
let lineCount = 0;
let sentCount = 0;
let last = false;
await /** @type {Promise<void>} */ (
new Promise((resolve) => {
lineReader.on("line", (line) => {
if (line[0] !== "#") {
const message = Buffer.from(line + "\n");
++lineCount;
client.send(
message,
0,
message.length,
inspectorPort,
inspectorHost,
(err) => {
if (err) console.error(err);
++sentCount;
if (last && lineCount === sentCount) {
client.close();
console.log(`sent ${sentCount} lines for inspection`);
callback(null, `sent ${sentCount} lines for inspection`);
resolve();
}
},
);
}
});
lineReader.on("close", () => {
last = true;
console.log(`processed lines ${lineCount}`);
});
})
);
} else {
callback("unsupported even type");
}
};