Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Plugins/AWSLambdaPackager/PluginUtils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
//===----------------------------------------------------------------------===//

import Dispatch
import Foundation
import PackagePlugin
import Synchronization
import Foundation

@available(macOS 15.0, *)
struct Utils {
Expand Down Expand Up @@ -47,7 +47,7 @@ struct Utils {
let outputSync = DispatchGroup()
let outputQueue = DispatchQueue(label: "AWSLambdaPackager.output")
let unsafeTransfer = UnsafeTransfer(value: stdout)
let outputHandler = { @Sendable (data: Data?) in
let outputHandler = { @Sendable(data:Data?) in
dispatchPrecondition(condition: .onQueue(outputQueue))

outputSync.enter()
Expand Down
56 changes: 29 additions & 27 deletions Sources/AWSLambdaRuntimeCore/ControlPlaneRequest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,39 @@ enum ControlPlaneResponse: Hashable {
case error(ErrorResponse)
}

package struct InvocationMetadata: Hashable {
package let requestID: String
package let deadlineInMillisSinceEpoch: Int64
package let invokedFunctionARN: String
package let traceID: String
package let clientContext: String?
package let cognitoIdentity: String?
package
struct InvocationMetadata: Hashable {
package let requestID: String
package let deadlineInMillisSinceEpoch: Int64
package let invokedFunctionARN: String
package let traceID: String
package let clientContext: String?
package let cognitoIdentity: String?

package init(headers: HTTPHeaders) throws(LambdaRuntimeError) {
guard let requestID = headers.first(name: AmazonHeaders.requestID), !requestID.isEmpty else {
throw LambdaRuntimeError(code: .nextInvocationMissingHeaderRequestID)
}

guard let deadline = headers.first(name: AmazonHeaders.deadline),
let unixTimeInMilliseconds = Int64(deadline)
else {
throw LambdaRuntimeError(code: .nextInvocationMissingHeaderDeadline)
}
package init(headers: HTTPHeaders) throws
(LambdaRuntimeError) {
guard let requestID = headers.first(name: AmazonHeaders.requestID), !requestID.isEmpty else {
throw LambdaRuntimeError(code: .nextInvocationMissingHeaderRequestID)
}

guard let invokedFunctionARN = headers.first(name: AmazonHeaders.invokedFunctionARN) else {
throw LambdaRuntimeError(code: .nextInvocationMissingHeaderInvokeFuctionARN)
}
guard let deadline = headers.first(name: AmazonHeaders.deadline),
let unixTimeInMilliseconds = Int64(deadline)
else {
throw LambdaRuntimeError(code: .nextInvocationMissingHeaderDeadline)
}

self.requestID = requestID
self.deadlineInMillisSinceEpoch = unixTimeInMilliseconds
self.invokedFunctionARN = invokedFunctionARN
self.traceID =
headers.first(name: AmazonHeaders.traceID) ?? "Root=\(AmazonHeaders.generateXRayTraceID());Sampled=0"
self.clientContext = headers["Lambda-Runtime-Client-Context"].first
self.cognitoIdentity = headers["Lambda-Runtime-Cognito-Identity"].first
guard let invokedFunctionARN = headers.first(name: AmazonHeaders.invokedFunctionARN) else {
throw LambdaRuntimeError(code: .nextInvocationMissingHeaderInvokeFuctionARN)
}

self.requestID = requestID
self.deadlineInMillisSinceEpoch = unixTimeInMilliseconds
self.invokedFunctionARN = invokedFunctionARN
self.traceID =
headers.first(name: AmazonHeaders.traceID) ?? "Root=\(AmazonHeaders.generateXRayTraceID());Sampled=0"
self.clientContext = headers["Lambda-Runtime-Client-Context"].first
self.cognitoIdentity = headers["Lambda-Runtime-Cognito-Identity"].first
}
}

struct ErrorResponse: Hashable, Codable {
Expand Down
51 changes: 26 additions & 25 deletions Sources/AWSLambdaRuntimeCore/Lambda.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,37 +30,38 @@ import ucrt
#endif

public enum Lambda {
package static func runLoop<RuntimeClient: LambdaRuntimeClientProtocol, Handler>(
runtimeClient: RuntimeClient,
handler: Handler,
logger: Logger
) async throws where Handler: StreamingLambdaHandler {
var handler = handler
package
static func runLoop<RuntimeClient: LambdaRuntimeClientProtocol, Handler>(
runtimeClient: RuntimeClient,
handler: Handler,
logger: Logger
) async throws where Handler: StreamingLambdaHandler {
var handler = handler

while !Task.isCancelled {
let (invocation, writer) = try await runtimeClient.nextInvocation()
while !Task.isCancelled {
let (invocation, writer) = try await runtimeClient.nextInvocation()

do {
try await handler.handle(
invocation.event,
responseWriter: writer,
context: LambdaContext(
requestID: invocation.metadata.requestID,
traceID: invocation.metadata.traceID,
invokedFunctionARN: invocation.metadata.invokedFunctionARN,
deadline: DispatchWallTime(millisSinceEpoch: invocation.metadata.deadlineInMillisSinceEpoch),
logger: logger
)
do {
try await handler.handle(
invocation.event,
responseWriter: writer,
context: LambdaContext(
requestID: invocation.metadata.requestID,
traceID: invocation.metadata.traceID,
invokedFunctionARN: invocation.metadata.invokedFunctionARN,
deadline: DispatchWallTime(millisSinceEpoch: invocation.metadata.deadlineInMillisSinceEpoch),
logger: logger
)
} catch {
try await writer.reportError(error)
continue
}
)
} catch {
try await writer.reportError(error)
continue
}
}
}

/// The default EventLoop the Lambda is scheduled on.
public static var defaultEventLoop: any EventLoop = NIOSingletons.posixEventLoopGroup.next()
/// The default EventLoop the Lambda is scheduled on.
public static var defaultEventLoop: any EventLoop = NIOSingletons.posixEventLoopGroup.next()
}

// MARK: - Public API
Expand Down
34 changes: 17 additions & 17 deletions Sources/AWSLambdaRuntimeCore/LambdaContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -120,20 +120,20 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
"\(Self.self)(requestID: \(self.requestID), traceID: \(self.traceID), invokedFunctionARN: \(self.invokedFunctionARN), cognitoIdentity: \(self.cognitoIdentity ?? "nil"), clientContext: \(self.clientContext ?? "nil"), deadline: \(self.deadline))"
}

/// This interface is not part of the public API and must not be used by adopters. This API is not part of semver versioning.
package static func __forTestsOnly(
requestID: String,
traceID: String,
invokedFunctionARN: String,
timeout: DispatchTimeInterval,
logger: Logger
) -> LambdaContext {
LambdaContext(
requestID: requestID,
traceID: traceID,
invokedFunctionARN: invokedFunctionARN,
deadline: .now() + timeout,
logger: logger
)
}
}
/// This interface is not part of the public API and must not be used by adopters. This API is not part of semver versioning.
package
static func __forTestsOnly(
requestID: String,
traceID: String,
invokedFunctionARN: String,
timeout: DispatchTimeInterval,
logger: Logger
) -> LambdaContext {
LambdaContext(
requestID: requestID,
traceID: traceID,
invokedFunctionARN: invokedFunctionARN,
deadline: .now() + timeout,
logger: logger
)
}}
8 changes: 5 additions & 3 deletions Sources/AWSLambdaRuntimeCore/LambdaRuntime.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,22 @@

import Foundation
import Logging
import NIOCore
import NIOConcurrencyHelpers
import NIOCore

// We need `@unchecked` Sendable here, as `NIOLockedValueBox` does not understand `sending` today.
// We don't want to use `NIOLockedValueBox` here anyway. We would love to use Mutex here, but this
// sadly crashes the compiler today.
public final class LambdaRuntime<Handler>: @unchecked Sendable where Handler: StreamingLambdaHandler {
// TODO: We want to change this to Mutex as soon as this doesn't crash the Swift compiler on Linux anymore
let handlerMutex: NIOLockedValueBox<Optional<Handler>>
let handlerMutex: NIOLockedValueBox<Handler?>
let logger: Logger
let eventLoop: EventLoop

public init(
handler: sending Handler,
handler: sending
Handler
,
eventLoop: EventLoop = Lambda.defaultEventLoop,
logger: Logger = Logger(label: "LambdaRuntime")
) {
Expand Down
5 changes: 3 additions & 2 deletions Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
]
}

func nextInvocation(isolation: isolated (any Actor)? = #isolation) async throws -> Invocation {
func nextInvocation(isolation: isolated (any Actor)? = #isolation) async throws -> Invocation {
switch self.state {
case .connected(let context, .idle):
return try await withCheckedThrowingContinuation {
Expand All @@ -484,7 +484,8 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
}

func reportError(
isolation: isolated (any Actor)? = #isolation,
isolation: isolated (any Actor)? =
#isolation,
_ error: any Error,
requestID: String
) async throws {
Expand Down
15 changes: 9 additions & 6 deletions Sources/AWSLambdaRuntimeCore/LambdaRuntimeClientProtocol.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,27 @@

import NIOCore

package protocol LambdaRuntimeClientResponseStreamWriter: LambdaResponseStreamWriter {
package
protocol LambdaRuntimeClientResponseStreamWriter: LambdaResponseStreamWriter {
func write(_ buffer: ByteBuffer) async throws
func finish() async throws
func writeAndFinish(_ buffer: ByteBuffer) async throws
func reportError(_ error: any Error) async throws
}

package protocol LambdaRuntimeClientProtocol {
package
protocol LambdaRuntimeClientProtocol {
associatedtype Writer: LambdaRuntimeClientResponseStreamWriter

func nextInvocation() async throws -> (Invocation, Writer)
}

package struct Invocation {
package var metadata: InvocationMetadata
package var event: ByteBuffer
package
struct Invocation {
package var metadata: InvocationMetadata
package var event: ByteBuffer

package init(metadata: InvocationMetadata, event: ByteBuffer) {
package init(metadata: InvocationMetadata, event: ByteBuffer) {
self.metadata = metadata
self.event = event
}
Expand Down
11 changes: 6 additions & 5 deletions Sources/AWSLambdaRuntimeCore/LambdaRuntimeError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
//
//===----------------------------------------------------------------------===//

package struct LambdaRuntimeError: Error {
package enum Code {
package
struct LambdaRuntimeError: Error {
package enum Code {
case closingRuntimeClient

case connectionToControlPlaneLost
Expand All @@ -34,12 +35,12 @@ package struct LambdaRuntimeError: Error {
case invalidPort
}

package init(code: Code, underlying: (any Error)? = nil) {
package init(code: Code, underlying: (any Error)? = nil) {
self.code = code
self.underlying = underlying
}

package var code: Code
package var underlying: (any Error)?
package var code: Code
package var underlying: (any Error)?

}
Loading