Skip to content
This repository was archived by the owner on Jan 5, 2022. It is now read-only.

Commit 5aefb79

Browse files
committed
Re-added support for triggering via SNS.
closes #175
1 parent 09a31a4 commit 5aefb79

File tree

7 files changed

+139
-13
lines changed

7 files changed

+139
-13
lines changed

Makefile

+16-7
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,24 @@
11
default: package
22

3-
package: solr postgres
3+
package: solr-sqs postgres-sqs solr-sns postgres-sns
44

5-
solr:
6-
GOOS=linux go build -o solr_derivative cmd/solr/main.go
5+
solr-sqs:
6+
GOOS=linux go build -o solr_derivative cmd/solr-sqs/main.go
77
zip solr_derivative.zip solr_derivative
88

9-
postgres:
10-
GOOS=linux go build -o postgres_derivative cmd/postgres/main.go
9+
postgres-sqs:
10+
GOOS=linux go build -o postgres_derivative cmd/postgres-sqs/main.go
1111
zip postgres_derivative.zip postgres_derivative
1212

13+
solr-sns:
14+
GOOS=linux go build -o solr_derivative_sns cmd/solr-sns/main.go
15+
zip solr_derivative_sns.zip solr_derivative_sns
16+
17+
postgres-sns:
18+
GOOS=linux go build -o postgres_derivative_sns cmd/postgres-sns/main.go
19+
zip postgres_derivative_sns.zip postgres_derivative_sns
20+
21+
1322
local-delete-solr:
1423
-AWS_ACCESS_KEY_ID=999999 AWS_SECRET_ACCESS_KEY=1231 aws lambda \
1524
--region us-east-1 \
@@ -33,7 +42,7 @@ local-create-solr: solr local-delete-solr
3342
--environment "Variables={SOLR_HOST=http://solr:8983/solr,SOLR_COLLECTION=collection1,\
3443
SPARQL_ENDPOINT=http://triplestore:9999/blazegraph/namespace/kb/sparql, \
3544
SPARQL_RETRIES=300}" \
36-
--zip-file fileb://solr_derivative.zip
45+
--zip-file fileb://solr_derivative_sns.zip
3746

3847
local-create-postgres: postgres local-delete-postgres
3948
AWS_ACCESS_KEY_ID=999999 AWS_SECRET_ACCESS_KEY=1231 aws \
@@ -51,7 +60,7 @@ local-create-postgres: postgres local-delete-postgres
5160
RDS_PORT=5432, \
5261
RDS_SSL=false, \
5362
RDS_PASSWORD=sekret}" \
54-
--zip-file fileb://postgres_derivative.zip
63+
--zip-file fileb://postgres_derivative_sns.zip
5564

5665
local-create-topic:
5766
AWS_ACCESS_KEY_ID=999999 AWS_SECRET_ACCESS_KEY=1231 aws sns \

cmd/postgres-sns/main.go

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"os"
6+
"strings"
7+
8+
"github.com/aws/aws-lambda-go/events"
9+
"github.com/aws/aws-lambda-go/lambda"
10+
"github.com/sul-dlss/rialto-derivatives/actions"
11+
"github.com/sul-dlss/rialto-derivatives/derivative"
12+
"github.com/sul-dlss/rialto-derivatives/message"
13+
"github.com/sul-dlss/rialto-derivatives/repository"
14+
"github.com/sul-dlss/rialto-derivatives/runtime"
15+
16+
// Added for the postgres driver
17+
_ "github.com/lib/pq"
18+
)
19+
20+
// Handler is the Lambda function handler
21+
func Handler(ctx context.Context, snsEvent events.SNSEvent) {
22+
repo := repository.BuildRepository()
23+
registry := runtime.NewRegistry(repo, buildDatabase(repo))
24+
for _, record := range snsEvent.Records {
25+
msg, err := message.ParseSNS(record)
26+
if err != nil {
27+
panic(err)
28+
}
29+
30+
if err = actions.DispatchMessage(msg, registry).Run(msg); err != nil {
31+
panic(err)
32+
}
33+
}
34+
}
35+
36+
func buildDatabase(repo repository.Repository) *derivative.PostgresClient {
37+
conf := derivative.NewPostgresConfig().
38+
WithUser(os.Getenv("RDS_USERNAME")).
39+
WithPassword(os.Getenv("RDS_PASSWORD")).
40+
WithDbname(os.Getenv("RDS_DB_NAME")).
41+
WithHost(os.Getenv("RDS_HOSTNAME")).
42+
WithPort(os.Getenv("RDS_PORT")).
43+
WithSSL(os.Getenv("RDS_SSL") == "" || strings.ToLower(os.Getenv("RDS_SSL")) == "true")
44+
45+
return derivative.NewPostgresClient(conf, repo)
46+
}
47+
48+
func main() {
49+
lambda.Start(Handler)
50+
}

cmd/postgres/main.go cmd/postgres-sqs/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func Handler(ctx context.Context, sqsEvent events.SQSEvent) {
2222
repo := repository.BuildRepository()
2323
registry := runtime.NewRegistry(repo, buildDatabase(repo))
2424
for _, record := range sqsEvent.Records {
25-
msg, err := message.Parse(record)
25+
msg, err := message.ParseSQS(record)
2626
if err != nil {
2727
panic(err)
2828
}

cmd/solr-sns/main.go

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"os"
6+
7+
"github.com/aws/aws-lambda-go/events"
8+
"github.com/aws/aws-lambda-go/lambda"
9+
"github.com/sul-dlss/rialto-derivatives/actions"
10+
"github.com/sul-dlss/rialto-derivatives/derivative"
11+
"github.com/sul-dlss/rialto-derivatives/message"
12+
"github.com/sul-dlss/rialto-derivatives/repository"
13+
"github.com/sul-dlss/rialto-derivatives/runtime"
14+
"github.com/sul-dlss/rialto-derivatives/transform"
15+
)
16+
17+
// Handler is the Lambda function handler
18+
func Handler(ctx context.Context, snsEvent events.SNSEvent) {
19+
repo := repository.BuildRepository()
20+
registry := runtime.NewRegistry(repo, buildSolrClient(repo))
21+
for _, record := range snsEvent.Records {
22+
msg, err := message.ParseSNS(record)
23+
if err != nil {
24+
panic(err)
25+
}
26+
27+
if err = actions.DispatchMessage(msg, registry).Run(msg); err != nil {
28+
panic(err)
29+
}
30+
}
31+
}
32+
33+
func buildSolrClient(repo repository.Repository) *derivative.SolrClient {
34+
indexer := transform.NewCompositeIndexer(repo)
35+
36+
host := os.Getenv("SOLR_HOST")
37+
collection := os.Getenv("SOLR_COLLECTION")
38+
return derivative.NewSolrClient(host, collection, indexer)
39+
}
40+
41+
func main() {
42+
lambda.Start(Handler)
43+
}

cmd/solr/main.go cmd/solr-sqs/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func Handler(ctx context.Context, sqsEvent events.SQSEvent) {
1919
repo := repository.BuildRepository()
2020
registry := runtime.NewRegistry(repo, buildSolrClient(repo))
2121
for _, record := range sqsEvent.Records {
22-
msg, err := message.Parse(record)
22+
msg, err := message.ParseSQS(record)
2323
if err != nil {
2424
panic(err)
2525
}

message/parser.go

+14-2
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ type SQSBody struct {
1818
Message string
1919
}
2020

21-
// Parse transforms an SQSMessage into a message
22-
func Parse(record events.SQSMessage) (*Message, error) {
21+
// ParseSQS transforms an SQSMessage into a message
22+
func ParseSQS(record events.SQSMessage) (*Message, error) {
2323
body := &SQSBody{}
2424
err := json.Unmarshal([]byte(record.Body), body)
2525
if err != nil {
@@ -33,3 +33,15 @@ func Parse(record events.SQSMessage) (*Message, error) {
3333

3434
return msg, nil
3535
}
36+
37+
// ParseSNS transforms a SNSEventRecord into a message
38+
func ParseSNS(record events.SNSEventRecord) (*Message, error) {
39+
data := record.SNS.Message
40+
msg := &Message{}
41+
err := json.Unmarshal([]byte(data), msg)
42+
if err != nil {
43+
return nil, err
44+
}
45+
46+
return msg, nil
47+
}

message/parser_test.go

+14-2
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,24 @@ import (
77
"github.com/stretchr/testify/assert"
88
)
99

10-
func TestParse(t *testing.T) {
10+
func TestParseSQS(t *testing.T) {
1111
evtRecord := events.SQSMessage{
1212
Body: "{\"Message\": \"{\\\"Action\\\": \\\"touch\\\", \\\"Entities\\\":[\\\"http://example.com/foo1\\\"] }\"}",
1313
}
1414

15-
event, _ := Parse(evtRecord)
15+
event, _ := ParseSQS(evtRecord)
16+
assert.Equal(t, "touch", event.Action)
17+
assert.Equal(t, []string{"http://example.com/foo1"}, event.Entities)
18+
19+
}
20+
21+
func TestParseSNS(t *testing.T) {
22+
evtRecord := events.SNSEventRecord{
23+
SNS: events.SNSEntity{
24+
Message: "{\"Action\": \"touch\", \"Entities\":[\"http://example.com/foo1\"] }",
25+
},
26+
}
27+
event, _ := ParseSNS(evtRecord)
1628
assert.Equal(t, "touch", event.Action)
1729
assert.Equal(t, []string{"http://example.com/foo1"}, event.Entities)
1830

0 commit comments

Comments
 (0)