Skip to content

Commit ae33000

Browse files
authored
Merge pull request #1543 from neicnordic/feat/send-remove-message-on-overwrite
[s3inbox] Send remove message on overwrite
2 parents e9d06d5 + 4c4f273 commit ae33000

File tree

5 files changed

+320
-33
lines changed

5 files changed

+320
-33
lines changed

Diff for: .github/integration/tests/sda/10_upload_test.sh

+18-5
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,9 @@ done
3333
## reupload a file under a different name
3434
s3cmd -c s3cfg put NA12878.bam.c4gh s3://test_dummy.org/NB12878.bam.c4gh
3535

36-
## reupload a file with the same name
37-
s3cmd -c s3cfg put NA12878.bam.c4gh s3://test_dummy.org/
38-
39-
4036
echo "waiting for upload to complete"
4137
RETRY_TIMES=0
42-
until [ "$(curl -s -k -u guest:guest $URI/api/queues/sda/inbox | jq -r '."messages_ready"')" -eq 6 ]; do
38+
until [ "$(curl -s -k -u guest:guest $URI/api/queues/sda/inbox | jq -r '."messages_ready"')" -eq 5 ]; do
4339
echo "waiting for upload to complete"
4440
RETRY_TIMES=$((RETRY_TIMES + 1))
4541
if [ "$RETRY_TIMES" -eq 30 ]; then
@@ -49,6 +45,23 @@ until [ "$(curl -s -k -u guest:guest $URI/api/queues/sda/inbox | jq -r '."messag
4945
sleep 2
5046
done
5147

48+
## reupload a file with the same name
49+
s3cmd -c s3cfg put NA12878.bam.c4gh s3://test_dummy.org/
50+
51+
## expect 2 new messages, one for deletion of the overwritten file, one for the new upload
52+
echo "waiting for re-upload to complete"
53+
RETRY_TIMES=0
54+
until [ "$(curl -s -k -u guest:guest $URI/api/queues/sda/inbox | jq -r '."messages_ready"')" -eq 7 ]; do
55+
echo "waiting for re-upload to complete"
56+
RETRY_TIMES=$((RETRY_TIMES + 1))
57+
if [ "$RETRY_TIMES" -eq 30 ]; then
58+
echo "::error::Time out while waiting for re-upload to complete"
59+
exit 1
60+
fi
61+
sleep 2
62+
done
63+
64+
5265
num_rows=$(psql -U postgres -h postgres -d sda -At -c "SELECT COUNT(*) from sda.files;")
5366
if [ "$num_rows" -ne 5 ]; then
5467
echo "database queries for register_files failed, expected 5 got $num_rows"

Diff for: sda/cmd/s3inbox/healthchecks_test.go

+55-11
Original file line numberDiff line numberDiff line change
@@ -9,39 +9,83 @@ import (
99
"github.com/neicnordic/sensitive-data-archive/internal/broker"
1010
"github.com/neicnordic/sensitive-data-archive/internal/database"
1111
"github.com/neicnordic/sensitive-data-archive/internal/helper"
12+
"github.com/neicnordic/sensitive-data-archive/internal/storage"
1213

1314
"github.com/stretchr/testify/assert"
1415
"github.com/stretchr/testify/suite"
1516
)
1617

1718
type HealthcheckTestSuite struct {
18-
ProxyTests
19+
suite.Suite
20+
S3Fakeconf storage.S3Conf // fakeserver
21+
S3conf storage.S3Conf // actual s3 container
22+
DBConf database.DBConf
23+
fakeServer *FakeServer
24+
MQConf broker.MQConf
25+
messenger *broker.AMQPBroker
26+
database *database.SDAdb
1927
}
2028

2129
func TestHealthTestSuite(t *testing.T) {
2230
suite.Run(t, new(HealthcheckTestSuite))
2331
}
2432

2533
func (suite *HealthcheckTestSuite) SetupTest() {
26-
// Reuse the setup from Proxy
27-
suite.ProxyTests.SetupTest()
34+
suite.fakeServer = startFakeServer("9024")
35+
36+
// Create an s3config for the fake server
37+
suite.S3Fakeconf = storage.S3Conf{
38+
URL: "http://127.0.0.1",
39+
Port: 9024,
40+
AccessKey: "someAccess",
41+
SecretKey: "someSecret",
42+
Bucket: "buckbuck",
43+
Region: "us-east-1",
44+
}
45+
46+
// Create a configuration for the fake MQ
47+
suite.MQConf = broker.MQConf{
48+
Host: "127.0.0.1",
49+
Port: MQport,
50+
User: "guest",
51+
Password: "guest",
52+
Vhost: "/",
53+
Exchange: "",
54+
}
55+
56+
suite.messenger = &broker.AMQPBroker{}
57+
58+
// Create a database configuration for the fake database
59+
suite.DBConf = database.DBConf{
60+
Host: "127.0.0.1",
61+
Port: DBport,
62+
User: "postgres",
63+
Password: "rootpasswd",
64+
Database: "sda",
65+
CACert: "",
66+
SslMode: "disable",
67+
ClientCert: "",
68+
ClientKey: "",
69+
}
70+
71+
suite.database = &database.SDAdb{}
2872
}
2973

3074
func (suite *HealthcheckTestSuite) TearDownTest() {
31-
// Reuse the teardown from Proxy
32-
suite.ProxyTests.TearDownTest()
75+
suite.fakeServer.Close()
76+
suite.database.Close()
3377
}
3478

3579
func (suite *HealthcheckTestSuite) TestHttpsGetCheck() {
36-
p := NewProxy(suite.S3conf, &helper.AlwaysAllow{}, suite.messenger, suite.database, new(tls.Config))
80+
p := NewProxy(suite.S3Fakeconf, &helper.AlwaysAllow{}, suite.messenger, suite.database, new(tls.Config))
3781

3882
url, _ := p.getS3ReadyPath()
3983
assert.NoError(suite.T(), p.httpsGetCheck(url))
4084
assert.Error(suite.T(), p.httpsGetCheck("http://127.0.0.1:8888/nonexistent"), "404 should fail")
4185
}
4286

4387
func (suite *HealthcheckTestSuite) TestS3URL() {
44-
p := NewProxy(suite.S3conf, &helper.AlwaysAllow{}, suite.messenger, suite.database, new(tls.Config))
88+
p := NewProxy(suite.S3Fakeconf, &helper.AlwaysAllow{}, suite.messenger, suite.database, new(tls.Config))
4589

4690
_, err := p.getS3ReadyPath()
4791
assert.NoError(suite.T(), err)
@@ -57,7 +101,7 @@ func (suite *HealthcheckTestSuite) TestHealthchecks() {
57101
database, _ := database.NewSDAdb(suite.DBConf)
58102
messenger, err := broker.NewMQ(suite.MQConf)
59103
assert.NoError(suite.T(), err)
60-
p := NewProxy(suite.S3conf, &helper.AlwaysAllow{}, messenger, database, new(tls.Config))
104+
p := NewProxy(suite.S3Fakeconf, &helper.AlwaysAllow{}, messenger, database, new(tls.Config))
61105

62106
w := httptest.NewRecorder()
63107
p.CheckHealth(w, httptest.NewRequest(http.MethodGet, "https://dummy/health", nil))
@@ -72,7 +116,7 @@ func (suite *HealthcheckTestSuite) TestClosedDBHealthchecks() {
72116
database, _ := database.NewSDAdb(suite.DBConf)
73117
messenger, err := broker.NewMQ(suite.MQConf)
74118
assert.NoError(suite.T(), err)
75-
p := NewProxy(suite.S3conf, &helper.AlwaysAllow{}, messenger, database, new(tls.Config))
119+
p := NewProxy(suite.S3Fakeconf, &helper.AlwaysAllow{}, messenger, database, new(tls.Config))
76120

77121
// Check that 200 is reported
78122
w := httptest.NewRecorder()
@@ -95,7 +139,7 @@ func (suite *HealthcheckTestSuite) TestNoS3Healthchecks() {
95139
database, _ := database.NewSDAdb(suite.DBConf)
96140
messenger, err := broker.NewMQ(suite.MQConf)
97141
assert.NoError(suite.T(), err)
98-
p := NewProxy(suite.S3conf, &helper.AlwaysAllow{}, messenger, database, new(tls.Config))
142+
p := NewProxy(suite.S3Fakeconf, &helper.AlwaysAllow{}, messenger, database, new(tls.Config))
99143

100144
// S3 unavailable, check that 503 is reported
101145
w := httptest.NewRecorder()
@@ -111,7 +155,7 @@ func (suite *HealthcheckTestSuite) TestNoMQHealthchecks() {
111155
database, _ := database.NewSDAdb(suite.DBConf)
112156
messenger, err := broker.NewMQ(suite.MQConf)
113157
assert.NoError(suite.T(), err)
114-
p := NewProxy(suite.S3conf, &helper.AlwaysAllow{}, messenger, database, new(tls.Config))
158+
p := NewProxy(suite.S3Fakeconf, &helper.AlwaysAllow{}, messenger, database, new(tls.Config))
115159

116160
// Messenger unavailable, check that 503 is reported
117161
p.messenger.Conf.Port = 123456

Diff for: sda/cmd/s3inbox/proxy.go

+69-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/minio/minio-go/v6/pkg/signer"
2222
"github.com/neicnordic/sensitive-data-archive/internal/broker"
2323
"github.com/neicnordic/sensitive-data-archive/internal/database"
24+
"github.com/neicnordic/sensitive-data-archive/internal/schema"
2425
"github.com/neicnordic/sensitive-data-archive/internal/storage"
2526
"github.com/neicnordic/sensitive-data-archive/internal/userauth"
2627
log "github.com/sirupsen/logrus"
@@ -156,8 +157,9 @@ func (p *Proxy) allowedResponse(w http.ResponseWriter, r *http.Request, token jw
156157
return
157158
}
158159

159-
// register file in database if it's the start of an upload
160+
// if this is an upload request
160161
if p.detectRequestType(r) == Put && p.fileIds[r.URL.Path] == "" {
162+
// register file in database
161163
log.Debugf("registering file %v in the database", r.URL.Path)
162164
p.fileIds[r.URL.Path], err = p.database.RegisterFile(filepath, username)
163165
log.Debugf("fileId: %v", p.fileIds[r.URL.Path])
@@ -166,6 +168,16 @@ func (p *Proxy) allowedResponse(w http.ResponseWriter, r *http.Request, token jw
166168

167169
return
168170
}
171+
172+
// check if the file already exists, in that case send an overwrite message,
173+
// so that the FEGA portal is informed that a new version of the file exists.
174+
err = p.sendMessageOnOverwrite(r, rawFilepath, token)
175+
if err != nil {
176+
p.internalServerError(w, r, err.Error())
177+
178+
return
179+
}
180+
169181
}
170182

171183
log.Debug("Forwarding to backend")
@@ -285,18 +297,21 @@ func (p *Proxy) checkAndSendMessage(jsonMessage []byte, r *http.Request) error {
285297

286298
func (p *Proxy) uploadFinishedSuccessfully(req *http.Request, response *http.Response) bool {
287299
if response.StatusCode != 200 {
300+
288301
return false
289302
}
290303

291304
switch req.Method {
292305
case http.MethodPut:
293306
if !strings.Contains(req.URL.String(), "partNumber") {
307+
294308
return true
295309
}
296310

297311
return false
298312
case http.MethodPost:
299313
if strings.Contains(req.URL.String(), "uploadId") {
314+
300315
return true
301316
}
302317

@@ -511,6 +526,59 @@ func (p *Proxy) requestInfo(fullPath string) (string, int64, error) {
511526

512527
}
513528

529+
// checkFileExists makes a request to the S3 to check whether the file already
530+
// is uploaded. Returns a bool indicating whether the file was found.
531+
func (p *Proxy) checkFileExists(fullPath string) (bool, error) {
532+
filePath := strings.Replace(fullPath, "/"+p.s3.Bucket+"/", "", 1)
533+
client, err := storage.NewS3Client(p.s3)
534+
if err != nil {
535+
return false, fmt.Errorf("could not connect to s3: %v", err)
536+
}
537+
538+
result, err := client.HeadObject(context.TODO(), &s3.HeadObjectInput{
539+
Bucket: &p.s3.Bucket,
540+
Key: &filePath,
541+
})
542+
543+
if err != nil && strings.Contains(err.Error(), "StatusCode: 404") {
544+
log.Debugf("s3 could not find file %s: %s", fullPath, err.Error())
545+
546+
return false, nil
547+
}
548+
549+
return result != nil, err
550+
}
551+
552+
func (p *Proxy) sendMessageOnOverwrite(r *http.Request, rawFilepath string, token jwt.Token) error {
553+
exist, err := p.checkFileExists(r.URL.Path)
554+
if err != nil {
555+
return err
556+
}
557+
if exist {
558+
username := token.Subject()
559+
msg := schema.InboxRemove{
560+
User: username,
561+
FilePath: rawFilepath,
562+
Operation: "remove",
563+
}
564+
565+
privateClaims := token.PrivateClaims()
566+
log.Info("user ", msg.User, " with pilot ", privateClaims["pilot"], " will overwrite file ", msg.FilePath, " at ", time.Now())
567+
568+
jsonMessage, err := json.Marshal(msg)
569+
if err != nil {
570+
return err
571+
}
572+
573+
err = p.checkAndSendMessage(jsonMessage, r)
574+
if err != nil {
575+
return err
576+
}
577+
}
578+
579+
return nil
580+
}
581+
514582
// FormatUploadFilePath ensures that path separators are "/", and returns error if the
515583
// filepath contains a disallowed character matched with regex
516584
func formatUploadFilePath(filePath string) (string, error) {

0 commit comments

Comments
 (0)