forked from swift-server/swift-aws-lambda-runtime
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathLambdaRuntimeClient.swift
149 lines (138 loc) · 5.96 KB
/
LambdaRuntimeClient.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Logging
import NIOCore
import NIOHTTP1
/// An HTTP based client for AWS Runtime Engine. This encapsulates the RESTful methods exposed by the Runtime Engine:
/// * /runtime/invocation/next
/// * /runtime/invocation/response
/// * /runtime/invocation/error
/// * /runtime/init/error
struct LambdaRuntimeClient {
    private let eventLoop: EventLoop
    private let allocator = ByteBufferAllocator()
    private let httpClient: HTTPClient

    /// Creates a client whose HTTP requests are issued through an `HTTPClient` built from
    /// the given event loop and configuration.
    ///
    /// - Parameters:
    ///   - eventLoop: The event loop handed to the underlying `HTTPClient`.
    ///   - configuration: Runtime Engine configuration handed to the underlying `HTTPClient`.
    init(eventLoop: EventLoop, configuration: LambdaConfiguration.RuntimeEngine) {
        self.eventLoop = eventLoop
        self.httpClient = HTTPClient(eventLoop: eventLoop, configuration: configuration)
    }

    /// Requests invocation from the control plane.
    ///
    /// - Parameter logger: Logger that receives a debug message naming the requested URL.
    /// - Returns: A future carrying the invocation metadata parsed from the response headers
    ///   together with the response body. The future fails with
    ///   `LambdaRuntimeError.badStatusCode` when the status is not `.ok`, with
    ///   `LambdaRuntimeError.noBody` when the response has no body, with whatever
    ///   `InvocationMetadata(headers:)` throws, or with a translated transport error.
    func getNextInvocation(logger: Logger) -> EventLoopFuture<(InvocationMetadata, ByteBuffer)> {
        let url = Consts.invocationURLPrefix + Consts.getNextInvocationURLSuffix
        logger.debug("requesting work from lambda runtime engine using \(url)")
        return self.httpClient.get(url: url, headers: LambdaRuntimeClient.defaultHeaders).flatMapThrowing { response in
            guard response.status == .ok else {
                throw LambdaRuntimeError.badStatusCode(response.status)
            }
            let invocation = try InvocationMetadata(headers: response.headers)
            guard let event = response.body else {
                throw LambdaRuntimeError.noBody
            }
            return (invocation, event)
        }.flatMapErrorThrowing { error in
            throw LambdaRuntimeClient.translateTransportError(error)
        }
    }

    /// Reports a result to the Runtime Engine.
    ///
    /// A successful result is posted to the invocation's response URL with the default
    /// headers; a failure is serialized as an `ErrorResponse` and posted to the
    /// invocation's error URL with the error headers.
    ///
    /// - Parameters:
    ///   - logger: Logger that receives a debug message naming the target URL.
    ///   - invocation: Metadata of the invocation being answered; its `requestID` is part of the URL.
    ///   - result: The handler outcome. A `nil` success buffer is posted as an absent body.
    /// - Returns: A future that succeeds when the Runtime Engine answers with `.accepted`,
    ///   and otherwise fails with `LambdaRuntimeError.badStatusCode` or a translated transport error.
    func reportResults(
        logger: Logger,
        invocation: InvocationMetadata,
        result: Result<ByteBuffer?, Error>
    ) -> EventLoopFuture<Void> {
        let invocationURL = Consts.invocationURLPrefix + "/" + invocation.requestID
        // Each of these is assigned exactly once per branch, so none needs to be mutable.
        let url: String
        let body: ByteBuffer?
        let headers: HTTPHeaders
        switch result {
        case .success(let buffer):
            url = invocationURL + Consts.postResponseURLSuffix
            body = buffer
            headers = LambdaRuntimeClient.defaultHeaders
        case .failure(let error):
            url = invocationURL + Consts.postErrorURLSuffix
            body = self.makeBody(
                for: ErrorResponse(errorType: Consts.functionError, errorMessage: "\(error)")
            )
            headers = LambdaRuntimeClient.errorHeaders
        }
        logger.debug("reporting results to lambda runtime engine using \(url)")
        return self.postExpectingAccepted(url: url, headers: headers, body: body)
    }

    /// Reports an initialization error to the Runtime Engine.
    ///
    /// - Parameters:
    ///   - logger: Logger that receives a warning naming the target URL.
    ///   - error: The initialization error; its string interpolation becomes the reported message.
    /// - Returns: A future that succeeds when the Runtime Engine answers with `.accepted`,
    ///   and otherwise fails with `LambdaRuntimeError.badStatusCode` or a translated transport error.
    func reportInitializationError(logger: Logger, error: Error) -> EventLoopFuture<Void> {
        let url = Consts.postInitErrorURL
        let body = self.makeBody(
            for: ErrorResponse(errorType: Consts.initializationError, errorMessage: "\(error)")
        )
        logger.warning("reporting initialization error to lambda runtime engine using \(url)")
        return self.postExpectingAccepted(url: url, headers: LambdaRuntimeClient.errorHeaders, body: body)
    }

    /// Cancels the current request, if one is running. Only needed for debugging purposes
    func cancel() {
        self.httpClient.cancel()
    }

    /// Serializes `errorResponse` to JSON bytes and copies them into a freshly allocated buffer.
    private func makeBody(for errorResponse: ErrorResponse) -> ByteBuffer {
        let bytes = errorResponse.toJSONBytes()
        var buffer = self.allocator.buffer(capacity: bytes.count)
        buffer.writeBytes(bytes)
        return buffer
    }

    /// Posts `body` to `url` and succeeds only when the response status is `.accepted`.
    private func postExpectingAccepted(
        url: String,
        headers: HTTPHeaders,
        body: ByteBuffer?
    ) -> EventLoopFuture<Void> {
        return self.httpClient.post(url: url, headers: headers, body: body).flatMapThrowing { response in
            guard response.status == .accepted else {
                throw LambdaRuntimeError.badStatusCode(response.status)
            }
            return ()
        }.flatMapErrorThrowing { error in
            throw LambdaRuntimeClient.translateTransportError(error)
        }
    }

    /// Maps HTTP client timeout and connection-reset errors to `LambdaRuntimeError.upstreamError`;
    /// every other error is returned unchanged.
    private static func translateTransportError(_ error: Error) -> Error {
        switch error {
        case HTTPClient.Errors.timeout:
            return LambdaRuntimeError.upstreamError("timeout")
        case HTTPClient.Errors.connectionResetByPeer:
            return LambdaRuntimeError.upstreamError("connectionResetByPeer")
        default:
            return error
        }
    }
}
/// Errors produced while communicating with the Lambda Runtime Engine.
enum LambdaRuntimeError: Error {
/// The Runtime Engine answered with an HTTP status other than the one the client expected;
/// carries the status that was received.
case badStatusCode(HTTPResponseStatus)
/// The HTTP client reported a timeout or a connection reset by peer; carries a short
/// description of which one occurred.
case upstreamError(String)
/// NOTE(review): presumably raised when a required invocation header is absent, carrying the
/// header name — not thrown in the code visible here; confirm against `InvocationMetadata`.
case invocationMissingHeader(String)
/// The response to a next-invocation request carried no body.
case noBody
/// NOTE(review): wraps an underlying error, presumably from JSON encoding or decoding — not
/// thrown in the code visible here; confirm at the throw sites.
case json(Error)
/// NOTE(review): carries a shutdown error together with the runner's result — not thrown in
/// the code visible here; confirm the exact semantics at the throw sites.
case shutdownError(shutdownError: Error, runnerResult: Result<Int, Error>)
}
extension LambdaRuntimeClient {
    /// The user-agent header pair included in every header set below.
    private static let userAgentHeader: (String, String) = ("user-agent", "Swift-Lambda/Unknown")

    /// Headers sent with requests that do not report an error.
    static let defaultHeaders = HTTPHeaders([userAgentHeader])

    /// These headers must be sent along an invocation or initialization error report
    static let errorHeaders = HTTPHeaders([
        userAgentHeader,
        ("lambda-runtime-function-error-type", "Unhandled"),
    ])
}