@@ -1,4 +1,10 @@
- import concurrent.futures
+ """
+ This command will create Zip files in order to bundle all collections data,
+ and all attachments of collections that have the `attachment.bundle` flag in
+ their metadata.
+ It then uploads these zip files to Google Cloud Storage.
+ """
+
import io
import json
import os
@@ -8,27 +14,26 @@
import requests
from google.cloud import storage

- from . import KintoClient, retry_timeout
+ from . import KintoClient, call_parallel, retry_timeout


SERVER = os.getenv("SERVER")
- REQUESTS_PARALLEL_COUNT = int(os.getenv("REQUESTS_PARALLEL_COUNT", "4"))
+ REQUESTS_PARALLEL_COUNT = int(os.getenv("REQUESTS_PARALLEL_COUNT", "8"))
BUNDLE_MAX_SIZE_BYTES = int(os.getenv("BUNDLE_MAX_SIZE_BYTES", "20_000_000"))
- BUILD_ALL = os.getenv("BUILD_ALL", "0") in "1yY"
- STORAGE_BUCKET_NAME = os.getenv("STORAGE_BUCKET_NAME", "rs-attachments")
+ STORAGE_BUCKET_NAME = os.getenv("STORAGE_BUCKET_NAME", "remote-settings-nonprod-stage-attachments")
DESTINATION_FOLDER = os.getenv("DESTINATION_FOLDER", "bundles")
+ # Flags for local development
+ BUILD_ALL = os.getenv("BUILD_ALL", "0") in "1yY"
SKIP_UPLOAD = os.getenv("SKIP_UPLOAD", "0") in "1yY"


- def call_parallel(func, args_list):
-     results = []
-     with concurrent.futures.ThreadPoolExecutor(max_workers=REQUESTS_PARALLEL_COUNT) as executor:
-         futures = [executor.submit(func, *args) for args in args_list]
-         results = [future.result() for future in futures]
-     return results
-
-
def fetch_all_changesets(client):
+     """
+     Return the `/changeset` responses for all collections listed
+     in the `monitor/changes` endpoint.
+     The result contains the metadata and all the records of all collections
+     for both preview and main buckets.
+     """
    random_cache_bust = random.randint(999999000000, 999999999999)
    monitor_changeset = client.get_changeset("monitor", "changes", random_cache_bust)
    print("%s collections" % len(monitor_changeset["changes"]))
@@ -37,7 +42,7 @@ def fetch_all_changesets(client):
        (c["bucket"], c["collection"], c["last_modified"]) for c in monitor_changeset["changes"]
    ]
    all_changesets = call_parallel(
-         lambda bid, cid, ts: client.get_changeset(bid, cid, ts), args_list
+         lambda bid, cid, ts: client.get_changeset(bid, cid, ts), args_list, REQUESTS_PARALLEL_COUNT
    )
    return [
        {"bucket": bid, **changeset} for (bid, _, _), changeset in zip(args_list, all_changesets)
@@ -51,7 +56,11 @@ def fetch_attachment(url):
    return resp.content


- def write_zip(output_path, content):
+ def write_zip(output_path: str, content: list[tuple[str, bytes]]):
+     """
+     Write a Zip at the specified `output_path` location with the specified `content`.
+     The content is specified as a list of file names and their binary content.
+     """
    parent_folder = os.path.dirname(output_path)
    os.makedirs(parent_folder, exist_ok=True)
@@ -64,11 +73,15 @@ def write_zip(output_path, content):
    print("Wrote %r" % output_path)


- def sync_cloud_storage(folder):
+ def sync_cloud_storage(folder, storage_bucket):
+     """
+     Synchronizes a local folder (e.g. `bundles/`) with a remote one in the specified
+     `storage_bucket` name.
+     """
    # Ensure you have set the GOOGLE_APPLICATION_CREDENTIALS environment variable
    # to the path of your Google Cloud service account key file before running this script.
    client = storage.Client()
-     bucket = client.bucket(STORAGE_BUCKET_NAME)
+     bucket = client.bucket(storage_bucket)
    local_files = set()
    for root, _, files in os.walk(folder):
        for file in files:
@@ -77,17 +90,25 @@ def sync_cloud_storage(folder):

            blob = bucket.blob(remote_file_path)
            blob.upload_from_filename(local_file_path)
-             print(f"Uploaded {local_file_path} to gs://{STORAGE_BUCKET_NAME}/{remote_file_path}")
+             print(f"Uploaded {local_file_path} to gs://{storage_bucket}/{remote_file_path}")
            local_files.add(remote_file_path)

    blobs = bucket.list_blobs(prefix=folder)
    for blob in blobs:
        if blob.name not in local_files:
            blob.delete()
-             print(f"Deleted gs://{STORAGE_BUCKET_NAME}/{blob.name}")
+             print(f"Deleted gs://{storage_bucket}/{blob.name}")


def build_bundles(event, context):
+     """
+     Main command entry point that:
+     - fetches all collections changesets
+     - builds a `bundles/changesets.zip`
+     - fetches attachments of all collections with bundle flag
+     - builds `bundles/{bid}--{cid}.zip` for each of them
+     - synchronizes the `bundles/` folder with a remote Cloud storage bucket
+     """
    rs_server = event.get("server") or SERVER

    client = KintoClient(server_url=rs_server)
@@ -127,12 +148,12 @@ def build_bundles(event, context):

        # Fetch all attachments and build "{bid}--{cid}.zip"
        args_list = [(f'{base_url}{r["attachment"]["location"]}',) for r in records]
-         all_attachments = call_parallel(fetch_attachment, args_list)
+         all_attachments = call_parallel(fetch_attachment, args_list, REQUESTS_PARALLEL_COUNT)
        write_zip(
            f"{DESTINATION_FOLDER}/{bid}--{cid}.zip",
            [(f'{record["id"]}.meta.json', json.dumps(record)) for record in records]
            + [(record["id"], attachment) for record, attachment in zip(records, all_attachments)],
        )

    if not SKIP_UPLOAD:
-         sync_cloud_storage(DESTINATION_FOLDER)
+         sync_cloud_storage(DESTINATION_FOLDER, STORAGE_BUCKET_NAME)
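
Note on the refactoring above: the local `call_parallel` helper is deleted and imported from the package's shared module instead, and every call site now passes the parallelism explicitly via `REQUESTS_PARALLEL_COUNT`. The shared helper itself is not part of this diff; below is a minimal sketch of what it presumably looks like, assuming it mirrors the removed implementation with the worker count promoted to a parameter (the parameter name is a guess, since the call sites only pass it positionally).

import concurrent.futures


def call_parallel(func, args_list, parallel_workers):
    # Run `func(*args)` for each tuple in `args_list` in a thread pool of
    # `parallel_workers` threads, and return the results in submission order.
    # Sketch only: the actual shared implementation lives in the package's
    # `__init__` module and is not shown in this diff.
    with concurrent.futures.ThreadPoolExecutor(max_workers=parallel_workers) as executor:
        futures = [executor.submit(func, *args) for args in args_list]
        return [future.result() for future in futures]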