diff --git a/deployments/dev/environment.py b/deployments/dev/environment.py
index 5f39fa7bc..cc253bb7a 100644
--- a/deployments/dev/environment.py
+++ b/deployments/dev/environment.py
@@ -257,6 +257,8 @@ def env() -> Mapping[str, Optional[str]]:
 
         'AZUL_GOOGLE_OAUTH2_CLIENT_ID': '713613812354-aelk662bncv14d319dk8juce9p11um00.apps.googleusercontent.com',
 
+        'AZUL_MIRRORING_ENABLED': '1',
+
         'azul_slack_integration': json.dumps({
             'workspace_id': 'T09P9H91S',  # ucsc-gi.slack.com
             'channel_id': 'C04K81HUALD'  # #team-boardwalk-dev
diff --git a/deployments/sandbox/environment.py b/deployments/sandbox/environment.py
index b450336cd..7114c5da9 100644
--- a/deployments/sandbox/environment.py
+++ b/deployments/sandbox/environment.py
@@ -279,4 +279,6 @@ def env() -> Mapping[str, Optional[str]]:
 
         'GOOGLE_PROJECT': 'platform-hca-dev',
         'AZUL_GOOGLE_OAUTH2_CLIENT_ID': '713613812354-3bj4m7vnsbco82bke96idvg8cpdv6r9r.apps.googleusercontent.com',
+
+        'AZUL_MIRRORING_ENABLED': '1',
 }
diff --git a/environment.py b/environment.py
index 873df33f8..1b219180e 100644
--- a/environment.py
+++ b/environment.py
@@ -633,6 +633,11 @@ def env() -> Mapping[str, Optional[str]]:
         #
         'AZUL_DSS_SOURCE': None,
 
+        # Whether to mirror data files from the indexed repository to a
+        # dedicated S3 bucket (0 to disable, 1 to enable).
+        #
+        'AZUL_MIRRORING_ENABLED': '0',
+
         # A short string (no punctuation allowed) that identifies a Terraform
         # component i.e., a distinct set of Terraform resources to be deployed
         # together but separately from resources in other components. They are
diff --git a/scripts/manage_queues.py b/scripts/manage_queues.py
index 7a276c9fd..ab96bd031 100644
--- a/scripts/manage_queues.py
+++ b/scripts/manage_queues.py
@@ -49,8 +49,8 @@ def main(argv):
                      help='Name of the queue to purge.')
 
     sps.add_parser('purge_all',
-                   help='Purge all messages in all queues in the current deployment. Use with caution. The '
-                        'messages will be lost forever.')
+                   help='Purge all messages in all queues (except the mirror queues) in the current deployment. '
+                        'Use with caution. The messages will be lost forever.')
 
     sp = sps.add_parser('dump_all',
                         help='Dump all messages in all queues in the current deployment. Each queue will be '
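For context: AZUL_MIRRORING_ENABLED defaults to '0' in the top-level environment.py and is overridden to '1' in the dev and sandbox deployments. Below is a minimal sketch of how such a flag is consumed, using a hypothetical standalone helper as a stand-in for the config.enable_mirroring property introduced further down (the real code parses the variable with Config._boolean):

```python
import os

def mirroring_enabled() -> bool:
    # Hypothetical stand-in for config.enable_mirroring; assumes, like
    # Config._boolean, that only '0' and '1' are valid values.
    value = os.environ['AZUL_MIRRORING_ENABLED']
    assert value in ('0', '1'), f'Invalid value: {value!r}'
    return value == '1'
```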
diff --git a/scripts/mirror_file.py b/scripts/mirror_file.py
index a85aa850e..fa659b860 100644
--- a/scripts/mirror_file.py
+++ b/scripts/mirror_file.py
@@ -18,6 +18,9 @@
 from azul.azulclient import (
     AzulClient,
 )
+from azul.deployment import (
+    aws,
+)
 from azul.drs import (
     AccessMethod,
 )
@@ -76,12 +79,13 @@ def object_key(file: JSON) -> str:
 
 
 def mirror_file(catalog: CatalogName, file_uuid: str, part_size: int) -> str:
+    assert config.enable_mirroring, 'Mirroring must be enabled'
     assert config.is_tdr_enabled(catalog), 'Only TDR catalogs are supported'
     assert config.is_hca_enabled(catalog), 'Only HCA catalogs are supported'
     file = get_file(catalog, file_uuid)
     download_url = get_download_url(catalog, file)
     key = object_key(file)
-    storage = StorageService()
+    storage = StorageService(bucket_name=aws.mirror_bucket)
     upload = storage.create_multipart_upload(key, content_type=file['content-type'])
     total_size = file['size']
diff --git a/src/azul/__init__.py b/src/azul/__init__.py
index 462c9d434..00a535b0d 100644
--- a/src/azul/__init__.py
+++ b/src/azul/__init__.py
@@ -285,6 +285,8 @@ def qualified_bucket_name(self,
 
     storage_term = 'storage'
 
+    mirror_term = 'datamirror'
+
     current = Sentinel()
 
     def alb_access_log_path_prefix(self,
@@ -1323,6 +1325,9 @@ def contribution_lambda_timeout(self, *, retry: bool) -> int:
     def aggregation_lambda_timeout(self, *, retry: bool) -> int:
         return (10 if retry else 1) * 60
 
+    def mirror_lambda_timeout(self) -> int:
+        return 15
+
     service_lambda_timeout = 15 * 60
 
     api_gateway_timeout = 29
@@ -1494,10 +1499,15 @@ def derive(self, *, retry: bool = False, fail: bool = False) -> Self:
 
     notifications_queue = Queue('notifications')
     tallies_queue = Queue('tallies', is_fifo=True)
+    mirror_queue = Queue('mirror', is_fifo=True)
 
     @property
     def all_queue_names(self) -> list[str]:
-        return self.indexer_queue_names + self.fail_queue_names
+        return [
+            *self.indexer_queue_names,
+            *self.fail_queue_names,
+            *([self.mirror_queue.name] if self.enable_mirroring else []),
+        ]
 
     @property
     def indexer_queue_names(self) -> list[str]:
@@ -1512,6 +1522,7 @@ def fail_queue_names(self) -> list[str]:
         return [
             self.notifications_queue.fail.name,
             self.tallies_queue.fail.name,
+            *([self.mirror_queue.fail.name] if self.enable_mirroring else []),
         ]
 
     url_shortener_whitelist = [
@@ -1747,6 +1758,10 @@ def vpn_subnet(self) -> str:
     def it_flags(self) -> set[str]:
         return set(self.environ.get('azul_it_flags', '').split())
 
+    @property
+    def enable_mirroring(self) -> bool:
+        return self._boolean(self.environ['AZUL_MIRRORING_ENABLED'])
+
 
 config: Config = Config()  # yes, the type hint does help PyCharm
diff --git a/src/azul/deployment.py b/src/azul/deployment.py
index 0f402905b..39dab7624 100644
--- a/src/azul/deployment.py
+++ b/src/azul/deployment.py
@@ -604,6 +604,11 @@ def storage_bucket(self):
         return self.qualified_bucket_name(config.storage_term,
                                           deployment_name=config.deployment_stage)
 
+    @property
+    def mirror_bucket(self):
+        return self.qualified_bucket_name(config.mirror_term,
+                                          deployment_name=config.deployment_stage)
+
     # An ELB account ID, which varies depending on region, is needed to specify
     # the principal in bucket policies for buckets storing LB access logs.
     #
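mirror_file() streams a data file from its repository download URL into the new mirror bucket using S3's multipart upload API, wrapped by Azul's StorageService. Here is a rough boto3-only sketch of that pattern; the function name, its parameters, and the default part size are illustrative, not Azul's actual API:

```python
import boto3
import requests

def mirror(url: str, bucket: str, key: str,
           part_size: int = 8 * 1024 * 1024) -> None:
    # S3 requires every part except the last to be at least 5 MiB.
    s3 = boto3.client('s3')
    upload = s3.create_multipart_upload(Bucket=bucket, Key=key)
    parts = []
    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        for number, chunk in enumerate(response.iter_content(part_size), 1):
            part = s3.upload_part(Bucket=bucket,
                                  Key=key,
                                  PartNumber=number,
                                  UploadId=upload['UploadId'],
                                  Body=chunk)
            parts.append({'PartNumber': number, 'ETag': part['ETag']})
    s3.complete_multipart_upload(Bucket=bucket,
                                 Key=key,
                                 UploadId=upload['UploadId'],
                                 MultipartUpload={'Parts': parts})
```

Note that mirror_file() now refuses to run unless mirroring is enabled in addition to the existing TDR/HCA catalog checks, and writes to aws.mirror_bucket instead of the default storage bucket.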
diff --git a/terraform/s3.tf.json.template.py b/terraform/s3.tf.json.template.py
index 1be912caa..96f01816d 100644
--- a/terraform/s3.tf.json.template.py
+++ b/terraform/s3.tf.json.template.py
@@ -24,7 +24,10 @@
             config.storage_term: {
                 'bucket': aws.storage_bucket,
                 'force_destroy': True
-            }
+            },
+            **({config.mirror_term: {
+                'bucket': aws.mirror_bucket,
+            }} if config.enable_mirroring else {})
         },
         'aws_s3_bucket_lifecycle_configuration': {
             'storage': {
@@ -45,13 +48,17 @@
             }
         },
         'aws_s3_bucket_logging': {
-            config.storage_term: {
-                'bucket': '${aws_s3_bucket.%s.id}' % config.storage_term,
+            bucket: {
+                'bucket': '${aws_s3_bucket.%s.id}' % bucket,
                 'target_bucket': '${data.aws_s3_bucket.%s.id}' % config.logs_term,
                 # Other S3 log deliveries, like ELB, implicitly put a slash
                 # after the prefix. S3 doesn't, so we add one explicitly.
-                'target_prefix': config.s3_access_log_path_prefix(config.storage_term) + '/'
+                'target_prefix': config.s3_access_log_path_prefix(bucket) + '/'
             }
+            for bucket in (
+                config.storage_term,
+                *([config.mirror_term] if config.enable_mirroring else [])
+            )
         }
     }
 }
diff --git a/terraform/sqs.tf.json.template.py b/terraform/sqs.tf.json.template.py
index 57f579f47..629df2ffb 100644
--- a/terraform/sqs.tf.json.template.py
+++ b/terraform/sqs.tf.json.template.py
@@ -50,7 +50,25 @@
             'fifo_queue': True,
             'name': config.tallies_queue.fail.name,
             'message_retention_seconds': 14 * 24 * 60 * 60,
-        }
+        },
+        **({
+            config.mirror_queue.unqual_name: {
+                'name': config.mirror_queue.name,
+                'fifo_queue': True,
+                'message_retention_seconds': 7 * 24 * 60 * 60,
+                'visibility_timeout_seconds': config.mirror_lambda_timeout() + 10,
+                'redrive_policy': json.dumps({
+                    'maxReceiveCount': 1,
+                    'deadLetterTargetArn': '${aws_sqs_queue.%s.arn}'
+                                           % config.mirror_queue.fail.unqual_name
+                })
+            },
+            config.mirror_queue.fail.unqual_name: {
+                'name': config.mirror_queue.fail.name,
+                'fifo_queue': True,
+                'message_retention_seconds': 14 * 24 * 60 * 60,
+            }
+        } if config.enable_mirroring else {})
     }
 }
]
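With config.enable_mirroring set, the template above defines two FIFO queues: the work queue retains messages for 7 days and has a visibility timeout 10 seconds longer than the 15-second mirror Lambda timeout, and a maxReceiveCount of 1 dead-letters a message to the fail queue on its first failed receive, where it is retained for 14 days. Rendered, the added resources look roughly like this (the resource labels and '.fifo' names are illustrative stand-ins for config.mirror_queue's actual names):

```python
import json

mirror_queues = {
    'mirror': {                                   # hypothetical label
        'name': 'azul-mirror-dev.fifo',           # hypothetical name
        'fifo_queue': True,
        'message_retention_seconds': 7 * 24 * 60 * 60,   # 7 days
        # Slack beyond the 15 s Lambda timeout so SQS doesn't redeliver
        # a message that is still being processed.
        'visibility_timeout_seconds': 25,
        'redrive_policy': json.dumps({
            # The first failed receive dead-letters the message ...
            'maxReceiveCount': 1,
            'deadLetterTargetArn': '${aws_sqs_queue.mirror_fail.arn}'
        })
    },
    'mirror_fail': {                              # hypothetical label
        'name': 'azul-mirror-fail-dev.fifo',      # hypothetical name
        'fifo_queue': True,
        # ... where it is retained twice as long, for inspection.
        'message_retention_seconds': 14 * 24 * 60 * 60,  # 14 days
    }
}
```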