Skip to content

Commit

Permalink
Create mirror queues and bucket (#6859)
Browse files Browse the repository at this point in the history
  • Loading branch information
nadove-ucsc committed Feb 7, 2025
1 parent 2fb55d3 commit a6f3bf8
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 9 deletions.
2 changes: 2 additions & 0 deletions deployments/dev/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ def env() -> Mapping[str, Optional[str]]:

'AZUL_GOOGLE_OAUTH2_CLIENT_ID': '713613812354-aelk662bncv14d319dk8juce9p11um00.apps.googleusercontent.com',

'AZUL_MIRRORING_ENABLED': '1',

'azul_slack_integration': json.dumps({
'workspace_id': 'T09P9H91S', # ucsc-gi.slack.com
'channel_id': 'C04K81HUALD' # #team-boardwalk-dev
Expand Down
2 changes: 2 additions & 0 deletions deployments/sandbox/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,4 +279,6 @@ def env() -> Mapping[str, Optional[str]]:
'GOOGLE_PROJECT': 'platform-hca-dev',

'AZUL_GOOGLE_OAUTH2_CLIENT_ID': '713613812354-3bj4m7vnsbco82bke96idvg8cpdv6r9r.apps.googleusercontent.com',

'AZUL_MIRRORING_ENABLED': '1',
}
5 changes: 5 additions & 0 deletions environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,11 @@ def env() -> Mapping[str, Optional[str]]:
#
'AZUL_DSS_SOURCE': None,

# Mirror data files from the indexed repository in a dedicated S3 bucket
# (1 yes, 0 no).
#
'AZUL_MIRRORING_ENABLED': '0',

# A short string (no punctuation allowed) that identifies a Terraform
# component i.e., a distinct set of Terraform resources to be deployed
# together but separately from resources in other components. They are
Expand Down
4 changes: 2 additions & 2 deletions scripts/manage_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ def main(argv):
help='Name of the queue to purge.')

sps.add_parser('purge_all',
help='Purge all messages in all queues in the current deployment. Use with caution. The '
'messages will be lost forever.')
help='Purge all messages in all queues (except the mirror queues) in the current deployment. '
'Use with caution. The messages will be lost forever.')

sp = sps.add_parser('dump_all',
help='Dump all messages in all queues in the current deployment. Each queue will be '
Expand Down
6 changes: 5 additions & 1 deletion scripts/mirror_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
from azul.azulclient import (
AzulClient,
)
from azul.deployment import (
aws,
)
from azul.drs import (
AccessMethod,
)
Expand Down Expand Up @@ -76,12 +79,13 @@ def object_key(file: JSON) -> str:


def mirror_file(catalog: CatalogName, file_uuid: str, part_size: int) -> str:
assert config.enable_mirroring, 'Mirroring must be enabled'
assert config.is_tdr_enabled(catalog), 'Only TDR catalogs are supported'
assert config.is_hca_enabled(catalog), 'Only HCA catalogs are supported'
file = get_file(catalog, file_uuid)
download_url = get_download_url(catalog, file)
key = object_key(file)
storage = StorageService()
storage = StorageService(bucket_name=aws.mirror_bucket)
upload = storage.create_multipart_upload(key, content_type=file['content-type'])

total_size = file['size']
Expand Down
17 changes: 16 additions & 1 deletion src/azul/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ def qualified_bucket_name(self,

storage_term = 'storage'

mirror_term = 'datamirror'

current = Sentinel()

def alb_access_log_path_prefix(self,
Expand Down Expand Up @@ -1323,6 +1325,9 @@ def contribution_lambda_timeout(self, *, retry: bool) -> int:
def aggregation_lambda_timeout(self, *, retry: bool) -> int:
return (10 if retry else 1) * 60

def mirror_lambda_timeout(self) -> int:
    """
    The timeout in seconds for handling a single message from the mirror
    queue; the queue's visibility timeout is derived from this value.
    """
    seconds = 15
    return seconds

service_lambda_timeout = 15 * 60

api_gateway_timeout = 29
Expand Down Expand Up @@ -1494,10 +1499,15 @@ def derive(self, *, retry: bool = False, fail: bool = False) -> Self:

notifications_queue = Queue('notifications')
tallies_queue = Queue('tallies', is_fifo=True)
mirror_queue = Queue('mirror', is_fifo=True)

@property
def all_queue_names(self) -> list[str]:
    """
    The names of all SQS queues in the current deployment: the indexer
    queues, their fail (dead-letter) queues and, only if mirroring is
    enabled, the mirror queue.
    """
    return [
        *self.indexer_queue_names,
        *self.fail_queue_names,
        # The mirror queue only exists when mirroring is enabled for the
        # current deployment (AZUL_MIRRORING_ENABLED)
        *([self.mirror_queue.name] if self.enable_mirroring else []),
    ]

@property
def indexer_queue_names(self) -> list[str]:
Expand All @@ -1512,6 +1522,7 @@ def fail_queue_names(self) -> list[str]:
return [
self.notifications_queue.fail.name,
self.tallies_queue.fail.name,
*([self.mirror_queue.fail.name] if self.enable_mirroring else []),
]

url_shortener_whitelist = [
Expand Down Expand Up @@ -1747,6 +1758,10 @@ def vpn_subnet(self) -> str:
def it_flags(self) -> set[str]:
return set(self.environ.get('azul_it_flags', '').split())

@property
def enable_mirroring(self) -> bool:
    """
    True if data files are to be mirrored in a dedicated S3 bucket for
    the current deployment, as configured via AZUL_MIRRORING_ENABLED.
    """
    flag = self.environ['AZUL_MIRRORING_ENABLED']
    return self._boolean(flag)


config: Config = Config() # yes, the type hint does help PyCharm

Expand Down
5 changes: 5 additions & 0 deletions src/azul/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,11 @@ def storage_bucket(self):
return self.qualified_bucket_name(config.storage_term,
deployment_name=config.deployment_stage)

@property
def mirror_bucket(self):
    """
    The name of the dedicated S3 bucket in which data files from the
    indexed repository are mirrored.
    """
    term = config.mirror_term
    stage = config.deployment_stage
    return self.qualified_bucket_name(term, deployment_name=stage)

# An ELB account ID, which varies depending on region, is needed to specify
# the principal in bucket policies for buckets storing LB access logs.
#
Expand Down
15 changes: 11 additions & 4 deletions terraform/s3.tf.json.template.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
config.storage_term: {
'bucket': aws.storage_bucket,
'force_destroy': True
}
},
**({config.mirror_term: {
'bucket': aws.mirror_bucket,
}} if config.enable_mirroring else {})
},
'aws_s3_bucket_lifecycle_configuration': {
'storage': {
Expand All @@ -45,13 +48,17 @@
}
},
'aws_s3_bucket_logging': {
config.storage_term: {
'bucket': '${aws_s3_bucket.%s.id}' % config.storage_term,
bucket: {
'bucket': '${aws_s3_bucket.%s.id}' % bucket,
'target_bucket': '${data.aws_s3_bucket.%s.id}' % config.logs_term,
# Other S3 log deliveries, like ELB, implicitly put a slash
# after the prefix. S3 doesn't, so we add one explicitly.
'target_prefix': config.s3_access_log_path_prefix(config.storage_term) + '/'
'target_prefix': config.s3_access_log_path_prefix(bucket) + '/'
}
for bucket in (
config.storage_term,
*([config.mirror_term] if config.enable_mirroring else [])
)
}
}
}
Expand Down
20 changes: 19 additions & 1 deletion terraform/sqs.tf.json.template.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,25 @@
'fifo_queue': True,
'name': config.tallies_queue.fail.name,
'message_retention_seconds': 14 * 24 * 60 * 60,
}
},
**({
config.mirror_queue.unqual_name: {
'name': config.mirror_queue.name,
'fifo_queue': True,
'message_retention_seconds': 7 * 24 * 60 * 60,
'visibility_timeout_seconds': config.mirror_lambda_timeout() + 10,
'redrive_policy': json.dumps({
'maxReceiveCount': 1,
'deadLetterTargetArn': '${aws_sqs_queue.%s.arn}'
% config.mirror_queue.fail.unqual_name
})
},
config.mirror_queue.fail.unqual_name: {
'name': config.mirror_queue.fail.name,
'fifo_queue': True,
'message_retention_seconds': 14 * 24 * 60 * 60,
}
} if config.enable_mirroring else {})
}
}
]
Expand Down

0 comments on commit a6f3bf8

Please sign in to comment.