1
1
"use strict" ;
2
2
3
+ // @ts -check
4
+
3
5
const { GetObjectCommand, S3Client } = require ( "@aws-sdk/client-s3" ) ;
4
6
const zlib = require ( "zlib" ) ;
5
7
const util = require ( "util" ) ;
6
8
const gunzip = util . promisify ( zlib . gunzip ) ;
7
9
const readline = require ( "readline" ) ;
8
10
const dgram = require ( "dgram" ) ;
11
+ const stream = require ( "stream" ) ;
9
12
10
- const inspectorAddr = process . env . INSPECTOR_LOGSTREAM_LISTEN_ADDR ;
13
+ const inspectorAddr = process . env [ " INSPECTOR_LOGSTREAM_LISTEN_ADDR" ] ;
11
14
12
- exports . handler = async ( event , context , callback ) => {
15
+ /**
16
+ * AWS Lambda handler
17
+ * @param { {awslogs?: {data: string}, Records: {s3?: {bucket: {name:string}, object: {key: string}}}[] } } event - The event object
18
+ * @param {Object } _context - The context object
19
+ * @param {Function } callback - The callback function
20
+ */
21
+ exports . handler = async ( event , _context , callback ) => {
13
22
if ( ! inspectorAddr ) {
14
23
const err = "missing INSPECTOR_LOGSTREAM_LISTEN_ADDR env variable" ;
15
24
console . log ( err ) ;
@@ -25,59 +34,67 @@ exports.handler = async (event, context, callback) => {
25
34
return ;
26
35
}
27
36
28
- const [ inspectorHost , inspectorPort ] = arr ;
37
+ const [ inspectorHost , inspectorPortStr ] = arr ;
38
+ const inspectorPort = inspectorPortStr ? parseInt ( inspectorPortStr , 10 ) : 0 ;
39
+ if ( isNaN ( inspectorPort ) ) {
40
+ callback ( `invalid inspector port: ${ inspectorPortStr } ` ) ;
41
+ return ;
42
+ }
43
+
29
44
const client = dgram . createSocket ( "udp4" ) ;
30
45
46
+ const record = event . Records [ 0 ] ;
47
+
31
48
if ( event . awslogs ) {
32
49
console . log ( "awslogs event" ) ;
33
- const payload = new Buffer . from ( event . awslogs . data , "base64" ) ; // decode base64 to binary
50
+ const payload = Buffer . from ( event . awslogs . data , "base64" ) ; // decode base64 to binary
34
51
const result = await gunzip ( payload ) ;
35
52
const parsedRequest = JSON . parse ( result . toString ( "utf8" ) ) ;
36
53
37
- await new Promise ( ( resolve ) => {
38
- for ( let i = 0 ; i < parsedRequest . logEvents . length ; i ++ ) {
39
- if (
40
- parsedRequest . logEvents [ i ] . message . length &&
41
- parsedRequest . logEvents [ i ] . message [ 0 ] === "#"
42
- ) {
43
- continue ;
44
- }
54
+ await /** @type {Promise<void> } */ (
55
+ new Promise ( ( resolve ) => {
56
+ for ( let i = 0 ; i < parsedRequest . logEvents . length ; i ++ ) {
57
+ if (
58
+ parsedRequest . logEvents [ i ] . message . length &&
59
+ parsedRequest . logEvents [ i ] . message [ 0 ] === "#"
60
+ ) {
61
+ continue ;
62
+ }
63
+
64
+ const current = i ;
65
+
66
+ const message = parsedRequest . logEvents [ i ] . message . endsWith ( "\n" )
67
+ ? Buffer . from ( parsedRequest . logEvents [ i ] . message )
68
+ : Buffer . from ( parsedRequest . logEvents [ i ] . message + "\n" ) ;
69
+ client . send (
70
+ message ,
71
+ 0 ,
72
+ message . length ,
73
+ inspectorPort ,
74
+ inspectorHost ,
75
+ ( err ) => {
76
+ if ( err ) console . error ( err ) ;
45
77
46
- const current = i ;
47
-
48
- const message = parsedRequest . logEvents [ i ] . message . endsWith ( "\n" )
49
- ? Buffer . from ( parsedRequest . logEvents [ i ] . message )
50
- : Buffer . from ( parsedRequest . logEvents [ i ] . message + "\n" ) ;
51
- client . send (
52
- message ,
53
- 0 ,
54
- message . length ,
55
- inspectorPort ,
56
- inspectorHost ,
57
- ( err ) => {
58
- if ( err ) console . error ( err ) ;
59
-
60
- if ( current === parsedRequest . logEvents . length - 1 ) {
61
- client . close ( ) ;
62
- console . log (
63
- `sent ${ parsedRequest . logEvents . length } lines for inspection` ,
64
- ) ;
65
- callback (
66
- null ,
67
- `sent ${ parsedRequest . logEvents . length } lines for inspection` ,
68
- ) ;
69
- resolve ( ) ;
70
- }
71
- } ,
72
- ) ;
73
- }
74
- } ) ;
75
- } else if ( event . Records [ 0 ] . s3 ) {
76
- const bucket = event . Records [ 0 ] . s3 . bucket . name ;
77
- console . log ( "S3 bucket: " , bucket ) ;
78
- const key = decodeURIComponent (
79
- event . Records [ 0 ] . s3 . object . key . replace ( / \+ / g, " " ) ,
78
+ if ( current === parsedRequest . logEvents . length - 1 ) {
79
+ client . close ( ) ;
80
+ console . log (
81
+ `sent ${ parsedRequest . logEvents . length } lines for inspection` ,
82
+ ) ;
83
+ callback (
84
+ null ,
85
+ `sent ${ parsedRequest . logEvents . length } lines for inspection` ,
86
+ ) ;
87
+ resolve ( ) ;
88
+ }
89
+ } ,
90
+ ) ;
91
+ }
92
+ } )
80
93
) ;
94
+ } else if ( ! ! record && ! ! record . s3 ) {
95
+ const bucket = record . s3 . bucket . name ;
96
+ console . log ( "S3 bucket: " , bucket ) ;
97
+ const key = decodeURIComponent ( record . s3 . object . key . replace ( / \+ / g, " " ) ) ;
81
98
82
99
// Retrieve S3 Object
83
100
const s3Client = new S3Client ( ) ;
@@ -87,47 +104,70 @@ exports.handler = async (event, context, callback) => {
87
104
} ) ;
88
105
89
106
const response = await s3Client . send ( getObjectCommand ) ;
107
+ if ( ! response . Body ) {
108
+ callback ( "no body in S3 object" ) ;
109
+ return ;
110
+ }
111
+
112
+ /** @type {stream.Readable } */
113
+ let body ;
114
+
115
+ // Check if body is a Blob and convert it to ReadableStream
116
+ if ( response . Body instanceof Blob ) {
117
+ const readableStream = response . Body . stream ( ) ;
118
+ body = stream . Readable . from ( readableStream ) ;
119
+ } else if ( response . Body instanceof stream . Readable ) {
120
+ body = response . Body ;
121
+ } else {
122
+ callback (
123
+ "Unexpected body type: response.Body is not a compatible stream type." ,
124
+ ) ;
125
+ return ;
126
+ }
127
+
90
128
const lineReader = readline . createInterface ( {
91
- input : response . Body . pipe ( zlib . createGunzip ( ) ) ,
129
+ input : body . pipe ( zlib . createGunzip ( ) ) ,
92
130
} ) ;
93
131
94
132
let lineCount = 0 ;
95
133
let sentCount = 0 ;
96
134
let last = false ;
97
135
98
- await new Promise ( ( resolve ) => {
99
- lineReader . on ( "line" , ( line ) => {
100
- if ( line [ 0 ] !== "#" ) {
101
- const message = Buffer . from ( line + "\n" ) ;
102
-
103
- ++ lineCount ;
104
- client . send (
105
- message ,
106
- 0 ,
107
- message . length ,
108
- inspectorPort ,
109
- inspectorHost ,
110
- ( err ) => {
111
- if ( err ) console . error ( err ) ;
112
-
113
- ++ sentCount ;
114
-
115
- if ( last && lineCount === sentCount ) {
116
- client . close ( ) ;
117
- console . log ( `sent ${ sentCount } lines for inspection` ) ;
118
- callback ( null , `sent ${ sentCount } lines for inspection` ) ;
119
- resolve ( ) ;
120
- }
121
- } ,
122
- ) ;
123
- }
124
- } ) ;
125
-
126
- lineReader . on ( "close" , ( ) => {
127
- last = true ;
128
- console . log ( `processed lines ${ lineCount } ` ) ;
129
- } ) ;
130
- } ) ;
136
+ await /** @type {Promise<void> } */ (
137
+ new Promise ( ( resolve ) => {
138
+ lineReader . on ( "line" , ( line ) => {
139
+ if ( line [ 0 ] !== "#" ) {
140
+ const message = Buffer . from ( line + "\n" ) ;
141
+
142
+ ++ lineCount ;
143
+ client . send (
144
+ message ,
145
+ 0 ,
146
+ message . length ,
147
+ inspectorPort ,
148
+ inspectorHost ,
149
+ ( err ) => {
150
+ if ( err ) console . error ( err ) ;
151
+
152
+ ++ sentCount ;
153
+
154
+ if ( last && lineCount === sentCount ) {
155
+ client . close ( ) ;
156
+ console . log ( `sent ${ sentCount } lines for inspection` ) ;
157
+ callback ( null , `sent ${ sentCount } lines for inspection` ) ;
158
+ resolve ( ) ;
159
+ }
160
+ } ,
161
+ ) ;
162
+ }
163
+ } ) ;
164
+
165
+ lineReader . on ( "close" , ( ) => {
166
+ last = true ;
167
+ console . log ( `processed lines ${ lineCount } ` ) ;
168
+ } ) ;
169
+ } )
170
+ ) ;
131
171
} else {
132
172
callback ( "unsupported even type" ) ;
133
173
}
0 commit comments