This repository has been archived by the owner on Feb 22, 2023. It is now read-only.

Commit

Record transfer limit for data refresh (#474)
* Unify approach type

* Move existing unit tests to separate unit test subfolder

* Add data refresh limit for primary copy query

* Add tests for checking that limits/order bys are included when needed

* Fallback should be int

* Remove gc
AetherUnbound authored Jan 21, 2022
1 parent ab26c74 commit 01a3330
Showing 10 changed files with 292 additions and 189 deletions.
6 changes: 6 additions & 0 deletions ingestion_server/README.md
@@ -23,6 +23,12 @@ The server is designed to be run in a private network only. You must not expose

If a `SLACK_WEBHOOK` variable is provided, the ingestion server will provide periodic updates on the progress of a data refresh, or relay any errors that may occur during the process.
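
For reference, posting to a Slack incoming webhook is a single HTTP call. The sketch below uses the standard incoming-webhook payload (a JSON body with a `text` field); the message text and the use of `requests` are illustrative, and the server's own `slack` module handles the real formatting:

```python
import os

import requests

webhook = os.environ.get("SLACK_WEBHOOK")
if webhook:
    # Standard Slack incoming-webhook payload; the message text is illustrative.
    requests.post(
        webhook,
        json={"text": "Data refresh: copy step 50% complete"},
        timeout=10,
    )
```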

## Data refresh limit

The `DATA_REFRESH_LIMIT` variable limits the number of rows pulled from the upstream
catalog database during a data refresh. If the server is running in an `ENVIRONMENT` other than `prod` or
`production`, the limit defaults to 100,000 records.
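
As a sketch of how the value is resolved (this mirrors the logic this commit adds to `ingest.py`, using the same `python-decouple` `config` helper the server already relies on):

```python
from decouple import config

environment = config("ENVIRONMENT", default="local").lower()
# Outside of prod/production, cap the copy at 100k rows by default; in
# production the default is 0 (no limit) unless DATA_REFRESH_LIMIT is set.
limit_default = 0 if environment in {"prod", "production"} else 100_000
limit = config("DATA_REFRESH_LIMIT", cast=int, default=limit_default)
```

Setting `DATA_REFRESH_LIMIT=0` therefore disables the limit in any environment.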

## Running on the host

1. Create environment variables from the template file.
4 changes: 4 additions & 0 deletions ingestion_server/ingestion_server/constants/internal_types.py
@@ -0,0 +1,4 @@
from typing import Literal


ApproachType = Literal["basic", "advanced"]
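
For context, a `Literal` alias like this lets static type checkers flag call sites that pass anything other than the two supported values. A minimal sketch (the `describe_approach` function is hypothetical and not part of the codebase):

```python
from typing import Literal

ApproachType = Literal["basic", "advanced"]


def describe_approach(approach: ApproachType) -> str:
    # Tools like mypy reject values outside the Literal set at check time;
    # at runtime this is still an ordinary string.
    return f"copy query will use the {approach} approach"


describe_approach("advanced")  # accepted
describe_approach("simple")    # flagged by a type checker (but not at runtime)
```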
24 changes: 18 additions & 6 deletions ingestion_server/ingestion_server/ingest.py
@@ -16,8 +16,8 @@
"""

import datetime
import gc
import logging as log
import multiprocessing

import psycopg2
from decouple import config
@@ -26,6 +26,7 @@

from ingestion_server import slack
from ingestion_server.cleanup import clean_image_data
from ingestion_server.constants.internal_types import ApproachType
from ingestion_server.indexer import database_connect
from ingestion_server.queries import (
get_copy_data_query,
@@ -247,7 +248,12 @@ def _update_progress(progress, new_value):
progress.value = new_value


def reload_upstream(table, progress=None, finish_time=None, approach="advanced"):
def reload_upstream(
table: str,
progress: multiprocessing.Value = None,
finish_time: multiprocessing.Value = None,
approach: ApproachType = "advanced",
):
"""
Import updates from the upstream catalog database into the API. The
process involves the following steps.
@@ -265,6 +271,7 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")
:param table: The upstream table to copy.
:param progress: multiprocessing.Value float for sharing task progress
:param finish_time: multiprocessing.Value int for sharing finish timestamp
:param approach: whether to use advanced logic specific to media ingestion
"""

# Step 1: Get the list of overlapping columns
@@ -298,7 +305,15 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")

# Step 3: Import data into a temporary table
log.info("Copying upstream data...")
copy_data = get_copy_data_query(table, shared_cols, approach=approach)
environment = config("ENVIRONMENT", default="local").lower()
limit_default = 100_000
if environment in {"prod", "production"}:
# If we're in production, turn off limits unless it's explicitly provided
limit_default = 0
limit = config("DATA_REFRESH_LIMIT", cast=int, default=limit_default)
copy_data = get_copy_data_query(
table, shared_cols, approach=approach, limit=limit
)
log.info(f"Running copy-data query: \n{copy_data.as_string(downstream_cur)}")
downstream_cur.execute(copy_data)

@@ -320,9 +335,6 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")
f"_Next: re-applying indices & constraints_"
)

# The server sometimes hangs on or before this next step. This is a pre-emptive
# garbage collection to try and assist with that.
gc.collect()
downstream_db = database_connect()
with downstream_db.cursor() as downstream_cur:
# Step 5: Recreate indices from the original table
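
Since `reload_upstream` now annotates `progress` and `finish_time` as `multiprocessing.Value`, here is a small sketch of how such a shared value works with the `_update_progress` helper shown above. The standalone setup is illustrative; in the server these values are presumably created by whatever process spawns the refresh task:

```python
import multiprocessing


def _update_progress(progress, new_value):
    progress.value = new_value


# A shared double ("d") that another process can poll while the refresh runs.
progress = multiprocessing.Value("d", 0.0)

_update_progress(progress, 50.0)
print(progress.value)  # 50.0
```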
93 changes: 66 additions & 27 deletions ingestion_server/ingestion_server/queries.py
@@ -1,8 +1,11 @@
from typing import Literal
from textwrap import dedent as d
from typing import Optional

from psycopg2.sql import SQL, Identifier
from psycopg2.sql import Literal as PgLiteral

from ingestion_server.constants.internal_types import ApproachType


def get_existence_queries(table):
"""
@@ -75,7 +78,10 @@ def get_fdw_query(


def get_copy_data_query(
table: str, columns: list[str], approach: Literal["basic", "advanced"]
table: str,
columns: list[str],
approach: ApproachType,
limit: Optional[int] = 100_000,
):
"""
Get the query for copying data from the upstream table to a temporary table
@@ -84,75 +90,108 @@ def get_copy_data_query(
table and avoids entries from the deleted table with the "api_deleted"
prefix. After the copying process, the "upstream" schema is dropped.
When running this on a non-production environment, the results will be ordered
by `identifier` to simulate a random sample and only the first 100k records
will be pulled from the upstream database.
:param table: the name of the downstream table being replaced
:param columns: the names of the columns to copy from upstream
:param approach: whether to use advanced logic specific to media ingestion
    :param limit: the maximum number of rows to copy; a falsy value (e.g. 0) disables the limit
:return: the SQL query for copying the data
"""

table_creation = """
table_creation = d(
"""
DROP TABLE IF EXISTS {temp_table};
CREATE TABLE {temp_table} (LIKE {table} INCLUDING DEFAULTS INCLUDING CONSTRAINTS);
"""
)

id_column_setup = """
id_column_setup = d(
"""
ALTER TABLE {temp_table} ADD COLUMN IF NOT EXISTS
id serial;
CREATE TEMP SEQUENCE IF NOT EXISTS id_temp_seq;
ALTER TABLE {temp_table} ALTER COLUMN
id SET DEFAULT nextval('id_temp_seq'::regclass);
"""
)

timestamp_column_setup = """
timestamp_column_setup = d(
"""
ALTER TABLE {temp_table} ALTER COLUMN
created_on SET DEFAULT CURRENT_TIMESTAMP;
ALTER TABLE {temp_table} ALTER COLUMN
updated_on SET DEFAULT CURRENT_TIMESTAMP;
"""
)

metric_column_setup = """
metric_column_setup = d(
"""
ALTER TABLE {temp_table} ADD COLUMN IF NOT EXISTS
standardized_popularity double precision;
ALTER TABLE {temp_table} ALTER COLUMN
view_count SET DEFAULT 0;
"""
)

conclusion = """
conclusion = d(
"""
ALTER TABLE {temp_table} ADD PRIMARY KEY (id);
DROP SERVER upstream CASCADE;
"""
)

if approach == "basic":
steps = [
table_creation,
id_column_setup,
timestamp_column_setup,
tertiary_column_setup = timestamp_column_setup
select_insert = d(
"""
INSERT INTO {temp_table} ({columns}) SELECT {columns} from {upstream_table};
""",
conclusion,
]
INSERT INTO {temp_table} ({columns}) SELECT {columns} FROM {upstream_table}
"""
)
else: # approach == 'advanced'
steps = [
table_creation,
id_column_setup,
metric_column_setup,
tertiary_column_setup = metric_column_setup
select_insert = d(
"""
INSERT INTO {temp_table} ({columns})
SELECT {columns} from {upstream_table} AS u
WHERE NOT EXISTS(
SELECT FROM {deleted_table} WHERE identifier = u.identifier
)
"""
)

# If a limit is requested, add the condition onto the select at the very end
if limit:
# The audioset view does not have identifiers associated with it
if table != "audioset":
select_insert += d(
"""
ORDER BY identifier"""
)
select_insert += d(
"""
INSERT INTO {temp_table} ({columns})
SELECT {columns} from {upstream_table} AS u
WHERE NOT EXISTS(
SELECT FROM {deleted_table} WHERE identifier = u.identifier
);
""",
conclusion,
]
LIMIT {limit}"""
)
# Always add a semi-colon at the end
select_insert += ";"

steps = [
table_creation,
id_column_setup,
tertiary_column_setup,
select_insert,
conclusion,
]

return SQL("".join(steps)).format(
table=Identifier(table),
temp_table=Identifier(f"temp_import_{table}"),
upstream_table=Identifier("upstream_schema", f"{table}_view"),
deleted_table=Identifier(f"api_deleted{table}"),
columns=SQL(",").join([Identifier(col) for col in columns]),
limit=PgLiteral(limit),
)
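
To see the effect of the new parameters, a hypothetical invocation (the column names are illustrative, and importing `ingestion_server` plus `psycopg2` is assumed):

```python
from ingestion_server.queries import get_copy_data_query

# With a truthy limit and a table other than "audioset", the
# ORDER BY identifier / LIMIT clauses are appended to the insert.
query = get_copy_data_query(
    "image",
    ["identifier", "title", "created_on"],
    approach="advanced",
    limit=100_000,
)
# The result is a psycopg2.sql.Composed object; rendering it to text via
# as_string(), as ingest.py does for logging, needs a live connection or cursor.
print(type(query))
```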


