4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
@@ -65,7 +65,7 @@ jobs:
          docker network create --driver bridge delphi-net
          docker run --rm -d -p 13306:3306 --network delphi-net --name delphi_database_epidata --cap-add=sys_nice delphi_database_epidata
          docker run --rm -d -p 6379:6379 --network delphi-net --env "REDIS_PASSWORD=1234" --name delphi_redis delphi_redis


      - run: |
          wget https://raw.githubusercontent.com/eficode/wait-for/master/wait-for
@@ -103,7 +103,7 @@ jobs:
        with:
          node-version: '16.x'
      - name: Cache Node.js modules
-       uses: actions/cache@v2
+       uses: actions/cache@v4
        with:
          path: ~/.npm # npm cache files are stored in `~/.npm` on Linux/macOS
          key: ${{ runner.OS }}-node2-${{ hashFiles('**/package-lock.json') }}
37 changes: 37 additions & 0 deletions src/client/delphi_epidata.py
@@ -675,6 +675,43 @@ def covid_hosp_facility_lookup(state=None, ccn=None, city=None, zip=None, fips_c
        # Make the API call
        return Epidata._request("covid_hosp_facility_lookup", params)

    # Fetch Canadian respiratory RVDSS data
    @staticmethod
    def rvdss(
        geo_type,
        time_values,
        geo_value,
        as_of=None,
        issues=None,
    ):
        """Fetch RVDSS (Respiratory Virus Detection Surveillance System) data."""
        # Check parameters
        if None in (geo_type, time_values, geo_value):
            raise EpidataBadRequestException(
                "`geo_type`, `geo_value`, and `time_values` are all required"
            )

        # Set up request
        params = {
            # Fake a time_type param so that we can use some helper functions later.
            "time_type": "week",
            "geo_type": geo_type,
            "time_values": Epidata._list(time_values),
        }

        if isinstance(geo_value, (list, tuple)):
            params["geo_values"] = ",".join(geo_value)
        else:
            params["geo_values"] = geo_value
        if as_of is not None:
            params["as_of"] = as_of
        if issues is not None:
            params["issues"] = Epidata._list(issues)

        # Make the API call
        return Epidata._request("rvdss", params)


    @staticmethod
    def async_epidata(param_list, batch_size=50):
        """[DEPRECATED] Make asynchronous Epidata calls for a list of parameters."""
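For illustration, a minimal sketch of a call to the new client method. The geo type and geo values here are hypothetical placeholders, since the diff does not document which ones the rvdss endpoint accepts:

from delphi_epidata import Epidata

# Hypothetical example values; the accepted geo types and geo values are
# not specified in this diff.
res = Epidata.rvdss(
    geo_type="region",
    time_values=Epidata.range(202401, 202410),  # an inclusive epiweek range
    geo_value=["ca", "on"],  # a list is joined into "geo_values=ca,on"
    as_of=202405,  # optional: only issues at or before this epiweek
)
if res["result"] == 1:
    print(f"got {len(res['epidata'])} rows")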
22 changes: 18 additions & 4 deletions src/server/_query.py
@@ -483,15 +483,29 @@ def apply_issues_filter(self, history_table: str, issues: Optional[TimeValues])
        self.where_integers("issue", issues)
        return self

-    def apply_as_of_filter(self, history_table: str, as_of: Optional[int]) -> "QueryBuilder":
+    # Note: This function was originally only used by covidcast, so it assumed
+    # that the `source` and `signal` columns were always available. To make it
+    # usable for rvdss, too, which has neither column, the `use_source_signal`
+    # flag toggles inclusion of the `source` and `signal` columns.
+    #
+    # `use_source_signal` defaults to True, so if it is not passed, the
+    # function retains the historical behavior.
+    def apply_as_of_filter(self, history_table: str, as_of: Optional[int], use_source_signal: bool = True) -> "QueryBuilder":
        if as_of is not None:
            self.retable(history_table)
            sub_condition_asof = "(issue <= :as_of)"
            self.params["as_of"] = as_of
sub_fields = "max(issue) max_issue, time_type, time_value, `source`, `signal`, geo_type, geo_value"
sub_group = "time_type, time_value, `source`, `signal`, geo_type, geo_value"

            alias = self.alias
sub_condition = f"x.max_issue = {alias}.issue AND x.time_type = {alias}.time_type AND x.time_value = {alias}.time_value AND x.source = {alias}.source AND x.signal = {alias}.signal AND x.geo_type = {alias}.geo_type AND x.geo_value = {alias}.geo_value"
+            source_signal_plain = source_signal_alias = ""
+            # If `use_source_signal` is on, include the `source` and `signal`
+            # columns in the group-by list and the join condition.
+            if use_source_signal:
+                source_signal_plain = ", `source`, `signal`"
+                source_signal_alias = f" AND x.source = {alias}.source AND x.signal = {alias}.signal"

sub_fields = f"max(issue) max_issue, time_type, time_value{source_signal_plain}, geo_type, geo_value"
sub_group = f"time_type, time_value{source_signal_plain}, geo_type, geo_value"
sub_condition = f"x.max_issue = {alias}.issue AND x.time_type = {alias}.time_type AND x.time_value = {alias}.time_value{source_signal_alias} AND x.geo_type = {alias}.geo_type AND x.geo_value = {alias}.geo_value"
self.subquery = f"JOIN (SELECT {sub_fields} FROM {self.table} WHERE {self.conditions_clause} AND {sub_condition_asof} GROUP BY {sub_group}) x ON {sub_condition}"
return self

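To make the new flag concrete: with use_source_signal=False (as the rvdss endpoint below passes), the join subquery assembled here comes out roughly as the following SQL, using the endpoint's rv alias and with the conditions clause elided:

JOIN (
    SELECT max(issue) max_issue, time_type, time_value, geo_type, geo_value
    FROM rvdss
    WHERE <conditions> AND (issue <= :as_of)
    GROUP BY time_type, time_value, geo_type, geo_value
) x ON x.max_issue = rv.issue AND x.time_type = rv.time_type
    AND x.time_value = rv.time_value AND x.geo_type = rv.geo_type
    AND x.geo_value = rv.geo_value

That is, for each (time, geo) group the subquery finds the newest issue at or before as_of, and the join keeps only the matching row from the history table.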
2 changes: 2 additions & 0 deletions src/server/endpoints/__init__.py
@@ -25,6 +25,7 @@
    nowcast,
    paho_dengue,
    quidel,
+   rvdss,
    sensors,
    twitter,
    wiki,
@@ -59,6 +60,7 @@
    nowcast,
    paho_dengue,
    quidel,
+   rvdss,
    sensors,
    twitter,
    wiki,
97 changes: 97 additions & 0 deletions src/server/endpoints/rvdss.py
@@ -0,0 +1,97 @@
from flask import Blueprint, request

from .._params import (
    extract_date,
    extract_dates,
    extract_strings,
    parse_time_set,
)
from .._query import execute_query, QueryBuilder
from .._validate import require_all

bp = Blueprint("rvdss", __name__)

db_table_name = "rvdss"

@bp.route("/", methods=("GET", "POST"))
def handle():
    require_all(request, "time_type", "time_values", "geo_type", "geo_values")

    time_set = parse_time_set()
    geo_type = extract_strings("geo_type")
    geo_values = extract_strings("geo_values")
    issues = extract_dates("issues")
    as_of = extract_date("as_of")

    # basic query info
    q = QueryBuilder(db_table_name, "rv")

    fields_string = [
        "geo_type",
        "geo_value",
        "region",
        "time_type",
    ]
    fields_int = [
        "epiweek",
        "time_value",
        "issue",
        "week",
        "weekorder",
        "year",
    ]
    fields_float = [
        "adv_pct_positive",
        "adv_positive_tests",
        "adv_tests",
        "evrv_pct_positive",
        "evrv_positive_tests",
        "evrv_tests",
        "flu_pct_positive",
        "flu_positive_tests",
        "flu_tests",
        "flua_pct_positive",
        "flua_positive_tests",
        "flua_tests",
        "fluah1n1pdm09_positive_tests",
        "fluah3_positive_tests",
        "fluauns_positive_tests",
        "flub_pct_positive",
        "flub_positive_tests",
        "flub_tests",
        "hcov_pct_positive",
        "hcov_positive_tests",
        "hcov_tests",
        "hmpv_pct_positive",
        "hmpv_positive_tests",
        "hmpv_tests",
        "hpiv1_positive_tests",
        "hpiv2_positive_tests",
        "hpiv3_positive_tests",
        "hpiv4_positive_tests",
        "hpiv_pct_positive",
        "hpiv_positive_tests",
        "hpiv_tests",
        "hpivother_positive_tests",
        "rsv_pct_positive",
        "rsv_positive_tests",
        "rsv_tests",
        "sarscov2_pct_positive",
        "sarscov2_positive_tests",
        "sarscov2_tests",
    ]

    q.set_sort_order("epiweek", "time_value", "geo_type", "geo_value", "issue")
    q.set_fields(fields_string, fields_int, fields_float)

q.where_strings("geo_type", geo_type)
# Only apply geo_values filter if wildcard "*" was NOT used.
if not (len(geo_values) == 1 and geo_values[0] == "*"):
q.where_strings("geo_value", geo_values)

    q.apply_time_filter("time_type", "time_value", time_set)
    q.apply_issues_filter(db_table_name, issues)
    q.apply_as_of_filter(db_table_name, as_of, use_source_signal=False)

    # send query
    return execute_query(str(q), q.params, fields_string, fields_int, fields_float)
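As a usage sketch, once the blueprint is registered a request might look like the following; the host is the usual Epidata API server and the parameter values are hypothetical, but the required parameters match the require_all call above:

GET https://api.delphi.cmu.edu/epidata/rvdss/?time_type=week&time_values=202401-202410&geo_type=region&geo_values=ca&as_of=202405

Per the wildcard check above, passing geo_values=* skips the geo_value filter and returns every geo value of the requested type.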