Skip to content

Commit 92f1e4a

Browse files
authored
Change uploaded networks structure (#9)
* Change uploaded networks structure * Addressed Mandar and Viswa's comments
1 parent a54ac00 commit 92f1e4a

File tree

3 files changed

+149
-46
lines changed

3 files changed

+149
-46
lines changed

cmd/network-crawler/network-crawler.go

Lines changed: 70 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,9 @@ func publishExternalNetworks(
9292
) error {
9393
// We use the folder name as object prefix so that all the objects
9494
// uploaded as part of this run appears under the same folder
95-
folderName := getFolderName()
96-
objectPrefix := getObjectPrefix(folderName)
97-
latestPrefixFilePrefix := getObjectPrefix("")
95+
timestamp := getCurrentTimestamp()
96+
latestObjectPrefix := getObjectPrefix(common.LatestFolderName)
97+
topLevelPrefixes := getObjectPrefix("")
9898

9999
var allExternalNetworks common.ExternalNetworkSources
100100
for _, crawler := range crawlerImpls {
@@ -116,16 +116,16 @@ func publishExternalNetworks(
116116
return errors.Wrap(err, "external network sources validation failed")
117117
}
118118

119-
// Create and upload the object file
120-
err = uploadExternalNetworkSources(&allExternalNetworks, isDryRun, bucketName, objectPrefix)
119+
// Rename the existing latest files
120+
err = copyExistingLatestFilesToTimestampName(isDryRun, bucketName, latestObjectPrefix, topLevelPrefixes)
121121
if err != nil {
122-
return errors.Wrap(err, "failed to upload data to bucket")
122+
return errors.Wrap(err, "failed to rename existing latest files")
123123
}
124124

125-
// Update the latest_prefix pointer
126-
err = updateLatestPrefixPointer(isDryRun, bucketName, folderName, latestPrefixFilePrefix)
125+
// Create and upload the object file
126+
err = uploadExternalNetworkSources(&allExternalNetworks, isDryRun, bucketName, latestObjectPrefix, timestamp)
127127
if err != nil {
128-
return errors.Wrapf(err, "failed to update latest pointer with prefix: %s", objectPrefix)
128+
return errors.Wrap(err, "failed to upload data to bucket")
129129
}
130130

131131
log.Print("Finished crawling all providers.")
@@ -182,10 +182,62 @@ func validateExternalNetworks(crawlers []common.NetworkCrawler, networks *common
182182
return nil
183183
}
184184

185+
func copyExistingLatestFilesToTimestampName(isDryRun bool, bucketName, latestObjectPrefix, topLevelPrefixes string) error {
186+
existingLatestFileNames, err := utils.GetAllObjectNamesWithPrefix(bucketName, latestObjectPrefix)
187+
if err != nil {
188+
return err
189+
}
190+
if len(existingLatestFileNames) == 0 {
191+
log.Printf("No filed found under %s. Not renaming anything...", latestObjectPrefix)
192+
return nil
193+
}
194+
if len(existingLatestFileNames) != 3 {
195+
return fmt.Errorf(
196+
"there should be three different files: %s, %s, and %s",
197+
common.NetworkFileName,
198+
common.ChecksumFileName,
199+
common.TimestampFileName)
200+
}
201+
var timestampVal []byte
202+
for _, name := range existingLatestFileNames {
203+
if strings.Contains(name, common.TimestampFileName) {
204+
timestampVal, err = utils.Read(bucketName, name)
205+
if err != nil {
206+
return errors.Wrapf(err, "failed while trying to read from the existing timestamp file: %s", name)
207+
}
208+
}
209+
}
210+
for _, name := range existingLatestFileNames {
211+
var filename string
212+
switch filepath.Base(name) {
213+
case common.NetworkFileName:
214+
filename = common.NetworkFileName
215+
case common.ChecksumFileName:
216+
filename = common.ChecksumFileName
217+
case common.TimestampFileName:
218+
filename = common.TimestampFileName
219+
default:
220+
return fmt.Errorf("unrecognized file name: %s", name)
221+
}
222+
223+
newName := filepath.Join(topLevelPrefixes, string(timestampVal), filename)
224+
if isDryRun {
225+
log.Printf("Dry run specified. Not renaming %s -> %s", name, newName)
226+
} else {
227+
err := utils.Copy(bucketName, name, bucketName, newName)
228+
if err != nil {
229+
return errors.Wrap(err, "failed to copy existing latest files to timestamped folder")
230+
}
231+
}
232+
}
233+
234+
return nil
235+
}
236+
185237
func uploadExternalNetworkSources(
186238
networks *common.ExternalNetworkSources,
187239
isDryRun bool,
188-
bucketName, objectPrefix string,
240+
bucketName, objectPrefix, timestamp string,
189241
) error {
190242
data, cksum, err := marshalAndGetCksum(networks)
191243
if err != nil {
@@ -201,16 +253,21 @@ func uploadExternalNetworkSources(
201253
if err != nil {
202254
return errors.Wrapf(err, "content upload succeeded but checksum upload has failed. Checksum: %s", cksum)
203255
}
256+
err = uploadObjectWithPrefix(bucketName, objectPrefix, common.TimestampFileName, []byte(timestamp))
257+
if err != nil {
258+
return errors.Wrapf(err, "content upload succeeded but timestamp upload has failed. Checksum: %s", timestamp)
259+
}
204260
log.Print("Successfully uploaded all contents and checksum.")
205261
log.Print("++++++")
206262
log.Printf("Please check bucket: https://console.cloud.google.com/storage/browser/%s", bucketName)
207263
log.Print("++++++")
208264
} else {
209265
// In dry run, just print out the package name and hashes
210266
log.Printf(
211-
"Dry run specified. Folder name is: %s. Checksum computed is: %s",
267+
"Dry run specified. Folder name is: %s. Checksum computed is: %s. Timestamp is: %s",
212268
objectPrefix,
213-
cksum)
269+
cksum,
270+
timestamp)
214271
}
215272

216273
return nil
@@ -245,26 +302,7 @@ func getObjectPrefix(prefixes ...string) string {
245302
return filepath.Join(prefixes...)
246303
}
247304

248-
func getFolderName() string {
305+
func getCurrentTimestamp() string {
249306
// Some Go magic here. DO NOT CHANGE THIS STRING
250307
return time.Now().UTC().Format("2006-01-02 15-04-05")
251308
}
252-
253-
func updateLatestPrefixPointer(isDryRun bool, bucketName, latestFolderName, filePrefix string) error {
254-
if !isDryRun {
255-
// Write new latest_prefix file
256-
err := uploadObjectWithPrefix(bucketName, filePrefix, common.LatestPrefixFileName, []byte(latestFolderName))
257-
if err != nil {
258-
return errors.Wrapf(err, "failed to write latest_prefix file under bucket: %s", bucketName)
259-
}
260-
} else {
261-
// Dry run specified.
262-
log.Printf(
263-
"Dry run specified. Skipping the update of %s with folder name %s under bucket %s",
264-
common.LatestPrefixFileName,
265-
latestFolderName,
266-
bucketName)
267-
}
268-
269-
return nil
270-
}

pkg/common/constants.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@ const NetworkFileName = "networks"
1919
// network ranges file
2020
const ChecksumFileName = "checksum"
2121

22-
// LatestPrefixFileName is the name of the file which contains the
22+
// TimestampFileName is the name which contains the timestamp of the
23+
// network ranges files
24+
const TimestampFileName = "timestamp"
25+
26+
// LatestFolderName is the name of the file which contains the
2327
// folder name of the latest crawler output
24-
const LatestPrefixFileName = "latest_prefix"
28+
const LatestFolderName = "latest"
2529

2630
// MasterBucketPrefix is the top level prefix we use for all the uploads we do
2731
// in this crawler

pkg/common/utils/gstorage.go

Lines changed: 73 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package utils
22

33
import (
44
"context"
5+
"io/ioutil"
56
"path/filepath"
67
"time"
78

@@ -49,13 +50,46 @@ func WriteToBucket(
4950
// DeleteObjectWithPrefix deletes any object in the specified bucket that
5051
// starts with the specified prefix.
5152
func DeleteObjectWithPrefix(bucketName, prefix string) error {
53+
names, err := GetAllObjectNamesWithPrefix(bucketName, prefix)
54+
if err != nil {
55+
return err
56+
}
57+
5258
ctx, cancel := context.WithTimeout(context.Background(), gCloudClientTimeout)
5359
defer cancel()
5460
client, err := storage.NewClient(ctx)
5561
if err != nil {
5662
return err
5763
}
5864

65+
bucket := client.Bucket(bucketName)
66+
67+
for _, name := range names {
68+
err = bucket.Object(name).Delete(ctx)
69+
if err != nil {
70+
errors.Wrapf(
71+
err,
72+
"failed to delete object with name %s under bucket %s. Please clean up manually",
73+
name,
74+
bucketName)
75+
// Return early
76+
return err
77+
}
78+
}
79+
80+
return nil
81+
}
82+
83+
// GetAllObjectNamesWithPrefix returns all object start with specified prefix
84+
// under the specified bucket
85+
func GetAllObjectNamesWithPrefix(bucketName, prefix string) ([]string, error) {
86+
ctx, cancel := context.WithTimeout(context.Background(), gCloudClientTimeout)
87+
defer cancel()
88+
client, err := storage.NewClient(ctx)
89+
if err != nil {
90+
return nil, err
91+
}
92+
5993
bucket := client.Bucket(bucketName)
6094
query := &storage.Query{Prefix: prefix}
6195

@@ -68,23 +102,50 @@ func DeleteObjectWithPrefix(bucketName, prefix string) error {
68102
}
69103
if err != nil {
70104
errors.Wrapf(err, "failed while trying to list and traverse objects in bucket %s", bucketName)
71-
return err
105+
return nil, err
72106
}
73107
names = append(names, attrs.Name)
74108
}
75109

76-
for _, name := range names {
77-
err = bucket.Object(name).Delete(ctx)
78-
if err != nil {
79-
errors.Wrapf(
80-
err,
81-
"failed to delete object with name %s under bucket %s. Please clean up manually",
82-
name,
83-
bucketName)
84-
// Return early
85-
return err
86-
}
110+
return names, nil
111+
}
112+
113+
// Read returns the content of the specified file under specified bucket
114+
func Read(bucketName, objectName string) ([]byte, error) {
115+
ctx, cancel := context.WithTimeout(context.Background(), gCloudClientTimeout)
116+
defer cancel()
117+
client, err := storage.NewClient(ctx)
118+
if err != nil {
119+
return nil, err
87120
}
88121

122+
reader, err := client.Bucket(bucketName).Object(objectName).NewReader(ctx)
123+
if err != nil {
124+
return nil, err
125+
}
126+
defer reader.Close()
127+
128+
data, err := ioutil.ReadAll(reader)
129+
if err != nil {
130+
return nil, err
131+
}
132+
return data, nil
133+
}
134+
135+
// Copy copies the specified object into the specified target
136+
func Copy(srcBucketName, srcObjectName, dstBucketName, dstObjectName string) error {
137+
ctx, cancel := context.WithTimeout(context.Background(), gCloudClientTimeout)
138+
defer cancel()
139+
client, err := storage.NewClient(ctx)
140+
if err != nil {
141+
return err
142+
}
143+
144+
src := client.Bucket(srcBucketName).Object(srcObjectName)
145+
dst := client.Bucket(dstBucketName).Object(dstObjectName)
146+
147+
if _, err := dst.CopierFrom(src).Run(ctx); err != nil {
148+
return errors.Wrapf(err, "failed while copying to %s from %s", dstObjectName, srcObjectName)
149+
}
89150
return nil
90151
}

0 commit comments

Comments
 (0)