@@ -27,10 +27,7 @@ import (
2727 "time"
2828
2929 "github.com/kelseyhightower/envconfig"
30- "github.com/triggermesh/aws-custom-runtime/pkg/events"
31- "github.com/triggermesh/aws-custom-runtime/pkg/events/apigateway"
32- "github.com/triggermesh/aws-custom-runtime/pkg/events/cloudevents"
33- "github.com/triggermesh/aws-custom-runtime/pkg/events/passthrough"
30+ "github.com/triggermesh/aws-custom-runtime/pkg/sender"
3431)
3532
3633var (
@@ -67,13 +64,15 @@ type Specification struct {
6764 // Lambda API port to put function requests and get results
6865 ExternalAPIport string `envconfig:"external_api_port" default:"8080"`
6966
70- // Apply response wrapping before sending it back to the client.
71- // Common case - AWS Lambda functions usually returns data formatted for API Gateway service.
72- // Set "RESPONSE_WRAPPER: API_GATEWAY" and receive events as if they were processed by API Gateway.
73- // Opposite scenario - return responses in CloudEvent format: "RESPONSE_WRAPPER: CLOUDEVENTS"
74- // NOTE: Response wrapper does both encoding and decoding depending on the type. We should consider
75- // separating wrappers by their function.
76- ResponseWrapper string `envconfig:"response_wrapper"`
67+ Sink string `envconfig:"k_sink"`
68+ ResponseFormat string `envconfig:"response_format"`
69+ }
70+
71+ type Handler struct {
72+ * sender.Handler
73+
74+ requestSizeLimit int64
75+ functionTTL int64
7776}
7877
7978type message struct {
@@ -98,10 +97,10 @@ func (rw *responseWrapper) WriteHeader(statusCode int) {
9897 rw .StatusCode = statusCode
9998}
10099
101- func ( s * Specification ) setupEnv () error {
100+ func setupEnv (internalAPIport string ) error {
102101 environment ["_HANDLER" ], _ = os .LookupEnv ("_HANDLER" )
103102 environment ["LAMBDA_TASK_ROOT" ], _ = os .LookupEnv ("LAMBDA_TASK_ROOT" )
104- environment ["AWS_LAMBDA_RUNTIME_API" ] += ":" + s . InternalAPIport
103+ environment ["AWS_LAMBDA_RUNTIME_API" ] += ":" + internalAPIport
105104
106105 for k , v := range environment {
107106 if err := os .Setenv (k , v ); err != nil {
@@ -111,9 +110,9 @@ func (s *Specification) setupEnv() error {
111110 return nil
112111}
113112
114- func (s * Specification ) newTask (w http.ResponseWriter , r * http.Request ) {
115- requestSizeLimitInBytes := s . RequestSizeLimit * 1e+6
116- functionTTLInNanoSeconds := s . FunctionTTL * 1e+9
113+ func (h * Handler ) newTask (w http.ResponseWriter , r * http.Request ) {
114+ requestSizeLimitInBytes := h . requestSizeLimit * 1e+6
115+ functionTTLInNanoSeconds := h . functionTTL * 1e+9
117116 body , err := ioutil .ReadAll (http .MaxBytesReader (w , r .Body , requestSizeLimitInBytes ))
118117 if err != nil {
119118 http .Error (w , err .Error (), http .StatusInternalServerError )
@@ -140,12 +139,19 @@ func (s *Specification) newTask(w http.ResponseWriter, r *http.Request) {
140139 select {
141140 case <- time .After (time .Duration (functionTTLInNanoSeconds )):
142141 log .Printf ("-> ! %s Deadline is reached\n " , task .id )
143- w .WriteHeader (http .StatusGone )
144- w .Write ([]byte (fmt .Sprintf ("Deadline is reached, data %s" , task .data )))
142+ // w.WriteHeader(http.StatusGone)
143+ // w.Write([]byte(fmt.Sprintf("Deadline is reached, data %s", task.data)))
144+ resp := []byte (fmt .Sprintf ("Deadline is reached, data %s" , task .data ))
145+ if err := h .Send (resp , http .StatusGone , w ); err != nil {
146+ log .Printf ("! %s %v\n " , task .id , err )
147+ }
145148 case result := <- resultsChannel :
146149 log .Printf ("-> %s %d %s\n " , result .id , result .statusCode , result .data )
147- w .WriteHeader (result .statusCode )
148- w .Write (result .data )
150+ // w.WriteHeader(result.statusCode)
151+ // w.Write(result.data)
152+ if err := h .Send (result .data , result .statusCode , w ); err != nil {
153+ log .Printf ("! %s %v\n " , result .id , err )
154+ }
149155 }
150156 mutex .Lock ()
151157 delete (results , task .id )
@@ -231,31 +237,6 @@ func responseHandler(w http.ResponseWriter, r *http.Request) {
231237 return
232238}
233239
234- func (s * Specification ) mapEvent (h http.Handler ) http.Handler {
235- var mapper events.Mapper
236-
237- switch s .ResponseWrapper {
238- case "API_GATEWAY" :
239- mapper = apigateway .NewMapper ()
240- case "CLOUDEVENTS" :
241- mapper = cloudevents .NewMapper ()
242- if err := envconfig .Process ("CE" , mapper ); err != nil {
243- log .Fatalf ("Cannot process CloudEvents wrapper env variables: %v" , err )
244- }
245- default :
246- mapper = passthrough .NewMapper ()
247- }
248-
249- return http .HandlerFunc (func (w http.ResponseWriter , r * http.Request ) {
250- rw := responseWrapper {
251- ResponseWriter : w ,
252- }
253- mapper .Request (r )
254- h .ServeHTTP (& rw , r )
255- mapper .Response (w , rw .StatusCode , rw .Body )
256- })
257- }
258-
259240func ping (w http.ResponseWriter , r * http.Request ) {
260241 w .WriteHeader (http .StatusOK )
261242 w .Write ([]byte ("pong" ))
@@ -282,28 +263,39 @@ func api() error {
282263}
283264
284265func main () {
266+ // parse env
285267 var spec Specification
286268 if err := envconfig .Process ("" , & spec ); err != nil {
287269 log .Fatalf ("Cannot process env variables: %v" , err )
288270 }
289271 log .Printf ("%+v\n " , spec )
290272
291273 log .Println ("Setting up runtime env" )
292- if err := spec . setupEnv (); err != nil {
274+ if err := setupEnv (spec . InternalAPIport ); err != nil {
293275 log .Fatalf ("Cannot setup runime env: %v" , err )
294276 }
295277
278+ // create sender
279+ sender := Handler {
280+ Handler : sender .New (spec .Sink , "text/plain" ),
281+ requestSizeLimit : spec .RequestSizeLimit ,
282+ functionTTL : spec .FunctionTTL ,
283+ }
284+
285+ // setup channels
296286 tasks = make (chan message , 100 )
297287 results = make (map [string ]chan message )
298288 defer close (tasks )
299289
290+ // start Lambda API
300291 log .Println ("Starting API" )
301292 go func () {
302293 if err := api (); err != nil {
303294 log .Fatalf ("Runtime internal API error: %v" , err )
304295 }
305296 }()
306297
298+ // start invokers
307299 for i := 0 ; i < spec .NumberOfinvokers ; i ++ {
308300 log .Println ("Starting bootstrap" , i + 1 )
309301 go func (i int ) {
@@ -317,9 +309,10 @@ func main() {
317309 }(i )
318310 }
319311
312+ // start external API
320313 taskRouter := http .NewServeMux ()
321- taskHandler := http .HandlerFunc (spec .newTask )
322- taskRouter .Handle ("/" , spec . mapEvent ( taskHandler ) )
314+ taskHandler := http .HandlerFunc (sender .newTask )
315+ taskRouter .Handle ("/" , taskHandler )
323316 log .Println ("Listening..." )
324317 err := http .ListenAndServe (":" + spec .ExternalAPIport , taskRouter )
325318 if err != nil && err != http .ErrServerClosed {
0 commit comments