Skip to content

Commit

Permalink
Merge pull request #28 from cancervariants/staging
Browse files Browse the repository at this point in the history
Staging
  • Loading branch information
korikuzma authored Mar 31, 2022
2 parents 085cfee + 7f41c70 commit eb05554
Show file tree
Hide file tree
Showing 19 changed files with 598 additions and 370 deletions.
1 change: 1 addition & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ exclude =
source
outputs
evidence/version.py
build/*
inline-quotes = "
import-order-style = pep8
application-import-names =
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,4 @@ pyproject.toml

# data
evidence/data/*
evidence/dev/etl/data/*
3 changes: 0 additions & 3 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ name = "pypi"
[packages]
pydantic = "*"
requests = "*"
bravado = "*"
fastapi = "*"
boto3 = "*"
xlrd = "*"

[dev-packages]
evidence = {editable = true, path = "."}
Expand All @@ -27,4 +25,3 @@ variation-normalizer = "*"
click = "*"
openpyxl = "*"
pandas = "*"
xlwt = "*"
10 changes: 7 additions & 3 deletions evidence/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging

APP_ROOT = Path(__file__).resolve().parents[0]
DATA_DIR_PATH = APP_ROOT / "data"
DATA_DIR_PATH = environ.get("DATA_DIR_PATH", APP_ROOT / "data")
SEQREPO_DATA_PATH = environ.get("SEQREPO_DATA_PATH", "/usr/local/share/seqrepo/latest")

if environ.get("EVIDENCE_PROD") == "True":
Expand All @@ -21,10 +21,14 @@
logger = logging.getLogger("evidence")
logger.setLevel(logging.DEBUG)

logging.getLogger("bravado").setLevel(logging.INFO)
logging.getLogger("bravado_core").setLevel(logging.INFO)
logging.getLogger("boto3").setLevel(logging.INFO)
logging.getLogger("botocore").setLevel(logging.INFO)
logging.getLogger("python_jsonschema_objects").setLevel(logging.INFO)
logging.getLogger("swagger_spec_validator").setLevel(logging.INFO)
logging.getLogger("urllib3").setLevel(logging.INFO)
logging.getLogger("s3transfer.utils").setLevel(logging.INFO)
logging.getLogger("s3transfer.tasks").setLevel(logging.INFO)
logging.getLogger("s3transfer.futures").setLevel(logging.INFO)
logging.getLogger("hgvs.parser").setLevel(logging.INFO)
logging.getLogger("biocommons.seqrepo.seqaliasdb.seqaliasdb").setLevel(logging.INFO)
logging.getLogger("biocommons.seqrepo.fastadir.fastadir").setLevel(logging.INFO)
54 changes: 53 additions & 1 deletion evidence/data_sources/base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
"""Module for the base data source class"""
import json
import hashlib
from pathlib import Path
from typing import Optional

from evidence.schemas import Response
from evidence import logger
from evidence.schemas import Response, SourceDataType


class DataSource:
Expand All @@ -22,3 +25,52 @@ def format_response(resp: Response) -> Response:
digest = hashlib.md5(blob)
resp.id = f"normalize.evidence:{digest.hexdigest()}"
return resp


class DownloadableDataSource(DataSource):
    """A base class for sources that use downloadable data.

    Subclasses must implement ``download_s3_data``; ``get_transformed_data_path``
    uses it as the fallback when no local transformed-data path is supplied.
    """

    def __init__(self, data_url: str, src_dir_path: Path,
                 ignore_transformed_data: bool) -> None:
        """Initialize DownloadableDataSource class
        :param str data_url: URL to the source's raw data file
        :param Path src_dir_path: Path to source data directory (created here,
            including parents, if it does not already exist)
        :param bool ignore_transformed_data: `True` if only bare init is needed. This
            is intended for developers when using the CLI to transform source data.
            `False` will load the transformed data from s3
        """
        self.data_url = data_url
        self.src_dir_path = src_dir_path
        # Ensure the local data directory exists before any download is attempted
        self.src_dir_path.mkdir(exist_ok=True, parents=True)
        self.ignore_transformed_data = ignore_transformed_data

    def download_s3_data(self, src_data_type: SourceDataType) -> Path:
        """Download data from public s3 bucket if it does not already exist in data
        directory and set the corresponding data path

        Abstract: subclasses must override. Called by ``get_transformed_data_path``
        when no usable local path was provided.
        :param SourceDataType src_data_type: The data type contained in the
            transformed data file
        :return: Path to the downloaded transformed data file
        :raise NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    def get_transformed_data_path(self, transformed_data_path: Path,
                                  src_data_type: SourceDataType) -> Optional[Path]:
        """Get transformed data path for source
        :param Path transformed_data_path: The path to the transformed data file;
            may be falsy/None, in which case the data is fetched via
            ``download_s3_data``
        :param SourceDataType src_data_type: The data type contained in the
            transformed data file
        :return: Path to transformed data file, or None if
            ``ignore_transformed_data`` is set or the supplied path does not exist
        """
        data_path = None
        if not self.ignore_transformed_data:
            if transformed_data_path:
                if transformed_data_path.exists():
                    data_path = transformed_data_path
                else:
                    # Do not fall back to s3 here: a bad explicit path is an
                    # operator error and is surfaced via the log instead
                    logger.error(f"The supplied path at {transformed_data_path} "
                                 f"does not exist.")
            else:
                data_path = self.download_s3_data(src_data_type)
        return data_path
156 changes: 77 additions & 79 deletions evidence/data_sources/cancer_hotspots.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,98 +6,91 @@
from os import remove
import shutil
from pathlib import Path
from typing import Dict, Optional, List
from typing import Dict, Optional, List, Tuple
import csv

import xlrd
import boto3
from botocore.config import Config

from evidence import DATA_DIR_PATH, logger
from evidence.data_sources.base import DataSource
from evidence.schemas import SourceMeta, Response, Sources
from evidence.data_sources.base import DownloadableDataSource
from evidence.schemas import SourceDataType, SourceMeta, Response, Sources


class CancerHotspots(DataSource):
class CancerHotspots(DownloadableDataSource):
"""Class for Cancer Hotspots Data Access."""

def __init__(
self, data_url: str = "https://www.cancerhotspots.org/files/hotspots_v2.xls",
src_dir_path: Path = DATA_DIR_PATH / "cancer_hotspots",
normalized_data_path: Optional[Path] = None,
ignore_normalized_data: bool = False
snv_transformed_data_path: Optional[Path] = None,
indel_transformed_data_path: Optional[Path] = None,
ignore_transformed_data: bool = False
) -> None:
"""Initialize Cancer Hotspots class
:param str data_url: URL to data file
:param Path src_dir_path: Path to cancer hotspots data directory
:param Optional[Path] normalized_data_path: Path to normalized cancer
hotspots file
:param bool ignore_normalized_data: `True` if only bare init is needed. This
is intended for developers when using the CLI to normalize cancer hotspots
data. Ignores path set in `normalized_data_path`.
`False` will load normalized data from s3 and load normalized
excel sheet data.
:param Optional[Path] snv_transformed_data_path: Path to transformed cancer
hotspots SNV file
:param Optional[Path] indel_transformed_data_path: Path to transformed cancer
hotspots INDEL file
:param bool ignore_transformed_data: `True` if only bare init is needed. This
is intended for developers when using the CLI to transform cancer hotspots
data. Ignores paths set in `snv_transformed_data_path` and
`indel_transformed_data_path`. `False` will load transformed data from s3
"""
self.data_url = data_url
fn = self.data_url.split("/")[-1]
self.src_dir_path = src_dir_path
self.src_dir_path.mkdir(exist_ok=True, parents=True)
self.data_path = self.src_dir_path / fn
self.og_snv_sheet_name = "SNV-hotspots"
self.og_indel_sheet_name = "INDEL-hotspots"
self.new_snv_sheet_name = "snv_hotspots"
self.new_indel_sheet_name = "indel_hotspots"
super().__init__(data_url, src_dir_path, ignore_transformed_data)

self.source_meta = SourceMeta(label=Sources.CANCER_HOTSPOTS, version="2")
self.snv_transformed_data_path = self.get_transformed_data_path(
snv_transformed_data_path, SourceDataType.CANCER_HOTSPOTS_SNV)
self.indel_transformed_data_path = self.get_transformed_data_path(
indel_transformed_data_path, SourceDataType.CANCER_HOTSPOTS_INDEL)

if not ignore_normalized_data:
self.normalized_data_path = None
if not normalized_data_path:
self.get_normalized_data_path()
else:
if normalized_data_path.exists():
self.normalized_data_path = normalized_data_path
else:
logger.error(f"The supplied path at `normalized_data_path`, "
f"{normalized_data_path}, for Cancer Hotspots does "
f"not exist.")

if not self.normalized_data_path:
raise FileNotFoundError(
"Unable to retrieve path for normalized Cancer Hotspots data")

wb = xlrd.open_workbook(self.normalized_data_path)
self.snv_hotspots = wb.sheet_by_name(self.og_snv_sheet_name)
self.snv_headers = self.snv_hotspots.row_values(0)
self.indel_hotspots = wb.sheet_by_name(self.og_indel_sheet_name)
self.indel_headers = self.indel_hotspots.row_values(0)

def get_normalized_data_path(self) -> None:
"""Download latest normalized data from public s3 bucket if it does not already
exist in data dir and set normalized_data_path
def download_s3_data(
self, src_data_type: SourceDataType = SourceDataType.CANCER_HOTSPOTS_SNV
) -> None:
"""Download Cancer Hotspots SNV and INDEL data from public s3 bucket if it
does not already exist in data directory and set the corresponding data path
:param SourceDataType src_data_type: The data type contained in the file
:return: Path to transformed data file
"""
logger.info("Retrieving normalized data from s3 bucket...")
data_path = None
is_snv = src_data_type == SourceDataType.CANCER_HOTSPOTS_SNV
data_type = "snv" if is_snv else "indel"
logger.info(f"Retrieving transformed {data_type} data from s3 bucket...")
s3 = boto3.resource("s3", config=Config(region_name="us-east-2"))
bucket = sorted(list(s3.Bucket("vicc-normalizers").objects.filter(
Prefix="evidence_normalization/cancer_hotspots/normalized_hotspots_v").all()), key=lambda o: o.key) # noqa: E501
prefix = \
f"evidence_normalization/cancer_hotspots/hotspots_{data_type}_v"
bucket = sorted(list(s3.Bucket("vicc-normalizers").objects.filter(Prefix=prefix).all()), key=lambda o: o.key) # noqa: E501
if len(bucket) > 0:
obj = bucket.pop().Object()
obj_s3_path = obj.key
zip_fn = obj_s3_path.split("/")[-1]
fn = zip_fn[:-4]
normalized_data_path = self.src_dir_path / fn
if not normalized_data_path.exists():
transformed_data_path = self.src_dir_path / fn
if not transformed_data_path.exists():
zip_path = self.src_dir_path / zip_fn
with open(zip_path, "wb") as f:
obj.download_fileobj(f)
shutil.unpack_archive(zip_path, self.src_dir_path)
remove(zip_path)
logger.info("Successfully downloaded normalized Cancer Hotspots data")
logger.info(f"Successfully downloaded transformed Cancer Hotspots "
f"{data_type} data")
else:
logger.info(f"Latest transformed Cancer Hotspots {data_type} data "
f"already exists")

if is_snv:
data_path = transformed_data_path
else:
logger.info("Latest normalized Cancer Hotspots data already exists")
self.normalized_data_path = normalized_data_path
data_path = transformed_data_path
else:
logger.warning("Could not find normalized Cancer Hotspots"
" data in vicc-normalizers s3 bucket")
logger.warning(f"Could not find transformed Cancer Hotspots {data_type}"
f" data in vicc-normalizers s3 bucket")
return data_path

def mutation_hotspots(self, so_id: str, vrs_variation_id: str) -> Response:
"""Get cancer hotspot data for a variant
Expand All @@ -117,41 +110,46 @@ def mutation_hotspots(self, so_id: str, vrs_variation_id: str) -> Response:
)

@staticmethod
def get_row(sheet: xlrd.sheet.Sheet, vrs_identifier: str) -> Optional[List]:
def get_row(transformed_data_path: Path,
vrs_identifier: str) -> Tuple[Optional[List], Optional[List]]:
"""Get row from xls sheet if vrs_identifier matches value in last column
:param xlrd.sheet.Sheet sheet: The sheet to use
:param Path transformed_data_path: Path to transformed data file
:param str vrs_identifier: The vrs_identifier to match on
:return: Row represented as a list if vrs_identifier match was found, else None
:return: Row represented as a list if vrs_identifier match was found and
headers if match was found
"""
row = None
for row_idx in range(1, sheet.nrows):
tmp_row = sheet.row_values(row_idx)
if tmp_row[-1] == vrs_identifier:
row = tmp_row
break
return row
matched_row = None
headers = None
with open(transformed_data_path) as f:
data = csv.reader(f)
headers = next(data)
for row in data:
if row[headers.index("vrs_identifier")] == vrs_identifier:
matched_row = row
break
return matched_row, headers

def query_snv_hotspots(self, vrs_variation_id: str) -> Optional[Dict]:
"""Return data for SNV
:param str vrs_variation_id: VRS digest for variation
:return: SNV data for vrs_variation_id
"""
row = self.get_row(self.snv_hotspots, vrs_variation_id)
row, headers = self.get_row(self.snv_transformed_data_path, vrs_variation_id)
if not row:
return None

ref = row[self.snv_headers.index("ref")]
pos = row[self.snv_headers.index("Amino_Acid_Position")]
alt = row[self.snv_headers.index("Variant_Amino_Acid")]
ref = row[headers.index("ref")]
pos = row[headers.index("Amino_Acid_Position")]
alt = row[headers.index("Variant_Amino_Acid")]
mutation, observations = alt.split(":")
return {
"codon": f"{ref}{pos}",
"mutation": f"{ref}{pos}{mutation}",
"q_value": row[self.snv_headers.index("qvalue")],
"q_value": float(row[headers.index("qvalue")]),
"observations": int(observations),
"total_observations": int(row[self.snv_headers.index("Mutation_Count")])
"total_observations": int(row[headers.index("Mutation_Count")])
}

def query_indel_hotspots(self, vrs_variation_id: str) -> Optional[Dict]:
Expand All @@ -160,17 +158,17 @@ def query_indel_hotspots(self, vrs_variation_id: str) -> Optional[Dict]:
:param str vrs_variation_id: VRS digest for variation
:return: INDEL data for vrs_variation_id
"""
row = self.get_row(self.indel_hotspots, vrs_variation_id)
row, headers = self.get_row(self.indel_transformed_data_path, vrs_variation_id)
if not row:
return None

pos = row[self.indel_headers.index("Amino_Acid_Position")]
alt = row[self.indel_headers.index("Variant_Amino_Acid")]
pos = row[headers.index("Amino_Acid_Position")]
alt = row[headers.index("Variant_Amino_Acid")]
mutation, observations = alt.split(":")
return {
"codon": pos,
"mutation": mutation,
"q_value": row[self.indel_headers.index("qvalue")],
"q_value": float(row[headers.index("qvalue")]),
"observations": int(observations),
"total_observations": int(row[self.indel_headers.index("Mutation_Count")])
"total_observations": int(row[headers.index("Mutation_Count")])
}
Loading

0 comments on commit eb05554

Please sign in to comment.