diff --git a/ingestion_server/README.md b/ingestion_server/README.md
index d36704fea..d59d0bacc 100644
--- a/ingestion_server/README.md
+++ b/ingestion_server/README.md
@@ -23,6 +23,12 @@ The server is designed to be run in a private network only. You must not expose
 If a `SLACK_WEBHOOK` variable is provided, the ingestion server will provide
 periodic updates on the progress of a data refresh, or relay any errors that
 may occur during the process.
 
+## Data refresh limit
+
+The `DATA_REFRESH_LIMIT` variable limits the number of rows pulled from the upstream catalog
+database during a data refresh. It defaults to 100,000 records unless the `ENVIRONMENT` is
+`prod` or `production`, in which case it defaults to 0 (no limit); an explicit value always wins.
+
 ## Running on the host
 
 1. Create environment variables from the template file.
diff --git a/ingestion_server/ingestion_server/constants/internal_types.py b/ingestion_server/ingestion_server/constants/internal_types.py
new file mode 100644
index 000000000..455dcd6fa
--- /dev/null
+++ b/ingestion_server/ingestion_server/constants/internal_types.py
@@ -0,0 +1,4 @@
+from typing import Literal
+
+
+ApproachType = Literal["basic", "advanced"]
diff --git a/ingestion_server/ingestion_server/ingest.py b/ingestion_server/ingestion_server/ingest.py
index d6584debd..24fd7d131 100644
--- a/ingestion_server/ingestion_server/ingest.py
+++ b/ingestion_server/ingestion_server/ingest.py
@@ -16,8 +16,8 @@
 """
 
 import datetime
-import gc
 import logging as log
+import multiprocessing
 
 import psycopg2
 from decouple import config
@@ -26,6 +26,7 @@
 
 from ingestion_server import slack
 from ingestion_server.cleanup import clean_image_data
+from ingestion_server.constants.internal_types import ApproachType
 from ingestion_server.indexer import database_connect
 from ingestion_server.queries import (
     get_copy_data_query,
@@ -247,7 +248,12 @@ def _update_progress(progress, new_value):
         progress.value = new_value
 
 
-def reload_upstream(table, progress=None, finish_time=None, approach="advanced"):
+def reload_upstream(
+    table: str,
+    progress: multiprocessing.Value = None,
+    finish_time: multiprocessing.Value = None,
+    approach: ApproachType = "advanced",
+):
     """
     Import updates from the upstream catalog database into the API. The
     process involves the following steps.
@@ -265,6 +271,7 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")
 
     :param table: The upstream table to copy.
 :param progress: multiprocessing.Value float for sharing task progress
     :param finish_time: multiprocessing.Value int for sharing finish timestamp
+    :param approach: whether to use advanced logic specific to media ingestion
     """
 
     # Step 1: Get the list of overlapping columns
@@ -298,7 +305,15 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")
 
     # Step 3: Import data into a temporary table
     log.info("Copying upstream data...")
-    copy_data = get_copy_data_query(table, shared_cols, approach=approach)
+    environment = config("ENVIRONMENT", default="local").lower()
+    limit_default = 100_000
+    if environment in {"prod", "production"}:
+        # In production, turn the limit off unless DATA_REFRESH_LIMIT is explicitly set
+        limit_default = 0
+    limit = config("DATA_REFRESH_LIMIT", cast=int, default=limit_default)
+    copy_data = get_copy_data_query(
+        table, shared_cols, approach=approach, limit=limit
+    )
     log.info(f"Running copy-data query: \n{copy_data.as_string(downstream_cur)}")
     downstream_cur.execute(copy_data)
@@ -320,9 +335,6 @@ def reload_upstream(table, progress=None, finish_time=None, approach="advanced")
         f"_Next: re-applying indices & constraints_"
     )
 
-    # The server sometimes hangs on or before this next step. This is a pre-emptive
-    # garbage collection to try and assist with that.
-    gc.collect()
     downstream_db = database_connect()
     with downstream_db.cursor() as downstream_cur:
         # Step 5: Recreate indices from the original table
diff --git a/ingestion_server/ingestion_server/queries.py b/ingestion_server/ingestion_server/queries.py
index a53281cae..83c3b2eb1 100644
--- a/ingestion_server/ingestion_server/queries.py
+++ b/ingestion_server/ingestion_server/queries.py
@@ -1,8 +1,11 @@
-from typing import Literal
+from textwrap import dedent as d
+from typing import Optional
 
 from psycopg2.sql import SQL, Identifier
 from psycopg2.sql import Literal as PgLiteral
 
+from ingestion_server.constants.internal_types import ApproachType
+
 
 def get_existence_queries(table):
     """
@@ -75,7 +78,10 @@ def get_fdw_query(
 
 
 def get_copy_data_query(
-    table: str, columns: list[str], approach: Literal["basic", "advanced"]
+    table: str,
+    columns: list[str],
+    approach: ApproachType,
+    limit: Optional[int] = 100_000,
 ):
     """
     Get the query for copying data from the upstream table to a temporary table
@@ -84,68 +90,100 @@ def get_copy_data_query(
     table and avoids entries from the deleted table with the "api_deleted"
     prefix. After the copying process, the "upstream" schema is dropped.
 
+    When a limit is in effect (the default outside production), the results are
+    ordered by `identifier` (a UUID, so this approximates a random sample) and
+    only the first `limit` rows are pulled from the upstream database.
+
     :param table: the name of the downstream table being replaced
     :param columns: the names of the columns to copy from upstream
     :param approach: whether to use advanced logic specific to media ingestion
+    :param limit: the maximum number of rows to copy; a falsy value disables the limit
 
     :return: the SQL query for copying the data
     """
-    table_creation = """
+    table_creation = d(
+        """
         DROP TABLE IF EXISTS {temp_table};
         CREATE TABLE {temp_table} (LIKE {table} INCLUDING DEFAULTS INCLUDING CONSTRAINTS);
     """
+    )
 
-    id_column_setup = """
+    id_column_setup = d(
+        """
         ALTER TABLE {temp_table} ADD COLUMN IF NOT EXISTS id serial;
         CREATE TEMP SEQUENCE IF NOT EXISTS id_temp_seq;
         ALTER TABLE {temp_table} ALTER COLUMN id SET DEFAULT nextval('id_temp_seq'::regclass);
     """
+    )
 
-    timestamp_column_setup = """
+    timestamp_column_setup = d(
+        """
         ALTER TABLE {temp_table} ALTER COLUMN created_on SET DEFAULT CURRENT_TIMESTAMP;
         ALTER TABLE {temp_table} ALTER COLUMN updated_on SET DEFAULT CURRENT_TIMESTAMP;
     """
+    )
 
-    metric_column_setup = """
+    metric_column_setup = d(
+        """
         ALTER TABLE {temp_table} ADD COLUMN IF NOT EXISTS standardized_popularity double precision;
         ALTER TABLE {temp_table} ALTER COLUMN view_count SET DEFAULT 0;
     """
+    )
 
-    conclusion = """
+    conclusion = d(
+        """
         ALTER TABLE {temp_table} ADD PRIMARY KEY (id);
         DROP SERVER upstream CASCADE;
     """
+    )
 
     if approach == "basic":
-        steps = [
-            table_creation,
-            id_column_setup,
-            timestamp_column_setup,
+        tertiary_column_setup = timestamp_column_setup
+        select_insert = d(
             """
-            INSERT INTO {temp_table} ({columns}) SELECT {columns} from {upstream_table};
-            """,
-            conclusion,
-        ]
+            INSERT INTO {temp_table} ({columns}) SELECT {columns} FROM {upstream_table}
+            """
+        )
     else:  # approach == 'advanced'
-        steps = [
-            table_creation,
-            id_column_setup,
-            metric_column_setup,
+        tertiary_column_setup = metric_column_setup
+        select_insert = d(
+            """
+            INSERT INTO {temp_table} ({columns})
+            SELECT {columns} FROM {upstream_table} AS u
+            WHERE NOT EXISTS(
+                SELECT FROM {deleted_table} WHERE identifier = u.identifier
+            )
+            """
+        )
+
+    # If a limit is requested, append the ORDER BY and LIMIT clauses to the select
+    if limit:
+        # The audioset view does not have identifiers associated with it
+        if table != "audioset":
+            select_insert += d(
+                """
+                ORDER BY identifier"""
+            )
+        select_insert += d(
             """
-            INSERT INTO {temp_table} ({columns})
-            SELECT {columns} from {upstream_table} AS u
-            WHERE NOT EXISTS(
-                SELECT FROM {deleted_table} WHERE identifier = u.identifier
-            );
-            """,
-            conclusion,
-        ]
+            LIMIT {limit}"""
+        )
+    # Always terminate the statement with a semicolon
+    select_insert += ";"
+
+    steps = [
+        table_creation,
+        id_column_setup,
+        tertiary_column_setup,
+        select_insert,
+        conclusion,
+    ]
 
     return SQL("".join(steps)).format(
         table=Identifier(table),
@@ -153,6 +191,7 @@ def get_copy_data_query(
         upstream_table=Identifier("upstream_schema", f"{table}_view"),
         deleted_table=Identifier(f"api_deleted{table}"),
         columns=SQL(",").join([Identifier(col) for col in columns]),
+        limit=PgLiteral(limit),
     )
diff --git a/ingestion_server/test/unit_test.py b/ingestion_server/test/unit_test.py
deleted file mode 100644
index 16b2bc930..000000000
--- a/ingestion_server/test/unit_test.py
+++ /dev/null
@@ -1,156 +0,0 @@
-import datetime
-from unittest.mock import MagicMock
-from uuid import uuid4
-
-from psycopg2.extras import Json
-
-from ingestion_server.cleanup import CleanupFunctions, TlsTest
-from ingestion_server.elasticsearch_models import Image
-
-
-def create_mock_image(override=None):
-    """
-    Produce a mock image. 
Override default fields by passing in a dict with the - desired keys and values. - - For example, to make an image with a custom title and default everything - else: - >>> create_mock_image({'title': 'My title'}) - :return: - """ - test_popularity = {"views": 50, "likes": 3, "comments": 1} - license_url = "https://creativecommons.org/licenses/by/2.0/fr/legalcode" - meta_data = {"popularity_metrics": test_popularity, "license_url": license_url} - test_data = { - "id": 0, - "title": "Unit test title", - "identifier": str(uuid4()), - "creator": "Eric Idle", - "creator_url": "https://creativecommons.org", - "tags": [{"name": "test", "accuracy": 0.9}], - "created_on": datetime.datetime.now(), - "url": "https://creativecommons.org", - "thumbnail": "https://creativecommons.org", - "provider": "test", - "source": "test", - "license": "cc-by", - "license_version": "4.0", - "foreign_landing_url": "https://creativecommons.org", - "view_count": 0, - "height": 500, - "width": 500, - "mature": False, - "meta_data": meta_data, - } - if override: - for k, v in override.items(): - test_data[k] = v - schema = {} - row = [] - idx = 0 - for k, v in test_data.items(): - schema[k] = idx - row.append(v) - idx += 1 - return Image.database_row_to_elasticsearch_doc(row, schema) - - -class TestImage: - @staticmethod - def test_size(): - small = create_mock_image({"height": 600, "width": 300}) - assert small.size == Image.ImageSizes.SMALL.name.lower() - huge = create_mock_image({"height": 4096, "width": 4096}) - assert huge.size == Image.ImageSizes.LARGE.name.lower() - - @staticmethod - def test_aspect_ratio(): - square = create_mock_image({"height": 300, "width": 300}) - assert square.aspect_ratio == Image.AspectRatios.SQUARE.name.lower() - tall = create_mock_image({"height": 500, "width": 200}) - assert tall.aspect_ratio == Image.AspectRatios.TALL.name.lower() - wide = create_mock_image({"height": 200, "width": 500}) - assert wide.aspect_ratio == Image.AspectRatios.WIDE.name.lower() - - @staticmethod - def test_extension(): - no_extension = create_mock_image({"url": "https://creativecommons.org/hello"}) - assert no_extension.extension is None - jpg = create_mock_image({"url": "https://creativecommons.org/hello.jpg"}) - assert jpg.extension == "jpg" - - @staticmethod - def test_mature_metadata(): - # Received upstream indication the work is mature - meta = {"mature": True} - mature_metadata = create_mock_image({"meta_data": meta}) - assert mature_metadata["mature"] - - @staticmethod - def test_mature_api(): - # Manually flagged work as mature ourselves - mature_work = create_mock_image({"mature": True}) - assert mature_work["mature"] - - @staticmethod - def test_default_maturity(): - # Default to not flagged - sfw = create_mock_image() - assert not sfw["mature"] - - -class TestCleanup: - @staticmethod - def test_tag_blacklist(): - tags = [ - {"name": "cc0"}, - {"name": " cc0"}, - {"name": "valid", "accuracy": 0.99}, - {"name": "valid_no_accuracy"}, - { - "name": "garbage:=metacrap", - }, - ] - result = str(CleanupFunctions.cleanup_tags(tags)) - expected = str( - Json([{"name": "valid", "accuracy": 0.99}, {"name": "valid_no_accuracy"}]) - ) - - assert result == expected - - @staticmethod - def test_tag_no_update(): - tags = [{"name": "valid", "accuracy": 0.92}] - result = CleanupFunctions.cleanup_tags(tags) - assert result is None - - @staticmethod - def test_accuracy_filter(): - tags = [ - {"name": "inaccurate", "accuracy": 0.5}, - {"name": "accurate", "accuracy": 0.999}, - ] - result = 
str(CleanupFunctions.cleanup_tags(tags))
-        expected = str(Json([{"name": "accurate", "accuracy": 0.999}]))
-        assert result == expected
-
-    @staticmethod
-    def test_url_protocol_fix():
-        bad_url = "flickr.com"
-        tls_support_cache = {}
-        result = CleanupFunctions.cleanup_url(bad_url, tls_support_cache)
-        expected = "'https://flickr.com'"
-
-        bad_http = "neverssl.com"
-        TlsTest.test_tls_supported = MagicMock(return_value=False)
-        result_http = CleanupFunctions.cleanup_url(bad_http, tls_support_cache)
-        expected_http = "'http://neverssl.com'"
-        assert result == expected
-        assert result_http == expected_http
-
-    @staticmethod
-    def test_rank_feature_verify():
-        img = create_mock_image({"standardized_popularity": 200})
-        assert img.standardized_popularity == 100
-        img2 = create_mock_image({"standardized_popularity": 0})
-        assert img2.standardized_popularity is None
diff --git a/ingestion_server/test/unit_tests/__init__.py b/ingestion_server/test/unit_tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/ingestion_server/test/unit_tests/conftest.py b/ingestion_server/test/unit_tests/conftest.py
new file mode 100644
index 000000000..44e1b47af
--- /dev/null
+++ b/ingestion_server/test/unit_tests/conftest.py
@@ -0,0 +1,51 @@
+import datetime
+from uuid import uuid4
+
+from ingestion_server.elasticsearch_models import Image
+
+
+def create_mock_image(override=None):
+    """
+    Produce a mock image. Override default fields by passing in a dict with the
+    desired keys and values.
+
+    For example, to make an image with a custom title and default everything
+    else:
+    >>> create_mock_image({'title': 'My title'})
+    :return: the mock image as an Elasticsearch ``Image`` document
+    """
+    test_popularity = {"views": 50, "likes": 3, "comments": 1}
+    license_url = "https://creativecommons.org/licenses/by/2.0/fr/legalcode"
+    meta_data = {"popularity_metrics": test_popularity, "license_url": license_url}
+    test_data = {
+        "id": 0,
+        "title": "Unit test title",
+        "identifier": str(uuid4()),
+        "creator": "Eric Idle",
+        "creator_url": "https://creativecommons.org",
+        "tags": [{"name": "test", "accuracy": 0.9}],
+        "created_on": datetime.datetime.now(),
+        "url": "https://creativecommons.org",
+        "thumbnail": "https://creativecommons.org",
+        "provider": "test",
+        "source": "test",
+        "license": "cc-by",
+        "license_version": "4.0",
+        "foreign_landing_url": "https://creativecommons.org",
+        "view_count": 0,
+        "height": 500,
+        "width": 500,
+        "mature": False,
+        "meta_data": meta_data,
+    }
+    if override:
+        for k, v in override.items():
+            test_data[k] = v
+    schema = {}
+    row = []
+    idx = 0
+    for k, v in test_data.items():
+        schema[k] = idx
+        row.append(v)
+        idx += 1
+    return Image.database_row_to_elasticsearch_doc(row, schema)
diff --git a/ingestion_server/test/unit_tests/test_cleanup.py b/ingestion_server/test/unit_tests/test_cleanup.py
new file mode 100644
index 000000000..8e6daa2c7
--- /dev/null
+++ b/ingestion_server/test/unit_tests/test_cleanup.py
@@ -0,0 +1,63 @@
+from test.unit_tests.conftest import create_mock_image
+from unittest.mock import MagicMock
+
+from psycopg2.extras import Json
+
+from ingestion_server.cleanup import CleanupFunctions, TlsTest
+
+
+class TestCleanup:
+    @staticmethod
+    def test_tag_blacklist():
+        tags = [
+            {"name": "cc0"},
+            {"name": " cc0"},
+            {"name": "valid", "accuracy": 0.99},
+            {"name": "valid_no_accuracy"},
+            {
+                "name": "garbage:=metacrap",
+            },
+        ]
+        result = str(CleanupFunctions.cleanup_tags(tags))
+        expected = str(
+            Json([{"name": "valid", "accuracy": 0.99}, {"name": "valid_no_accuracy"}])
+        )
+
+        assert result == 
expected
+
+    @staticmethod
+    def test_tag_no_update():
+        tags = [{"name": "valid", "accuracy": 0.92}]
+        result = CleanupFunctions.cleanup_tags(tags)
+        assert result is None
+
+    @staticmethod
+    def test_accuracy_filter():
+        tags = [
+            {"name": "inaccurate", "accuracy": 0.5},
+            {"name": "accurate", "accuracy": 0.999},
+        ]
+        result = str(CleanupFunctions.cleanup_tags(tags))
+        expected = str(Json([{"name": "accurate", "accuracy": 0.999}]))
+        assert result == expected
+
+    @staticmethod
+    def test_url_protocol_fix():
+        bad_url = "flickr.com"
+        tls_support_cache = {}
+        result = CleanupFunctions.cleanup_url(bad_url, tls_support_cache)
+        expected = "'https://flickr.com'"
+
+        bad_http = "neverssl.com"
+        TlsTest.test_tls_supported = MagicMock(return_value=False)
+        result_http = CleanupFunctions.cleanup_url(bad_http, tls_support_cache)
+        expected_http = "'http://neverssl.com'"
+        assert result == expected
+        assert result_http == expected_http
+
+    @staticmethod
+    def test_rank_feature_verify():
+        img = create_mock_image({"standardized_popularity": 200})
+        assert img.standardized_popularity == 100
+        img2 = create_mock_image({"standardized_popularity": 0})
+        assert img2.standardized_popularity is None
diff --git a/ingestion_server/test/unit_tests/test_es_models.py b/ingestion_server/test/unit_tests/test_es_models.py
new file mode 100644
index 000000000..ad79c950c
--- /dev/null
+++ b/ingestion_server/test/unit_tests/test_es_models.py
@@ -0,0 +1,47 @@
+from test.unit_tests.conftest import create_mock_image
+
+from ingestion_server.elasticsearch_models import Image
+
+
+class TestImage:
+    @staticmethod
+    def test_size():
+        small = create_mock_image({"height": 600, "width": 300})
+        assert small.size == Image.ImageSizes.SMALL.name.lower()
+        huge = create_mock_image({"height": 4096, "width": 4096})
+        assert huge.size == Image.ImageSizes.LARGE.name.lower()
+
+    @staticmethod
+    def test_aspect_ratio():
+        square = create_mock_image({"height": 300, "width": 300})
+        assert square.aspect_ratio == Image.AspectRatios.SQUARE.name.lower()
+        tall = create_mock_image({"height": 500, "width": 200})
+        assert tall.aspect_ratio == Image.AspectRatios.TALL.name.lower()
+        wide = create_mock_image({"height": 200, "width": 500})
+        assert wide.aspect_ratio == Image.AspectRatios.WIDE.name.lower()
+
+    @staticmethod
+    def test_extension():
+        no_extension = create_mock_image({"url": "https://creativecommons.org/hello"})
+        assert no_extension.extension is None
+        jpg = create_mock_image({"url": "https://creativecommons.org/hello.jpg"})
+        assert jpg.extension == "jpg"
+
+    @staticmethod
+    def test_mature_metadata():
+        # Received upstream indication the work is mature
+        meta = {"mature": True}
+        mature_metadata = create_mock_image({"meta_data": meta})
+        assert mature_metadata["mature"]
+
+    @staticmethod
+    def test_mature_api():
+        # Manually flagged work as mature ourselves
+        mature_work = create_mock_image({"mature": True})
+        assert mature_work["mature"]
+
+    @staticmethod
+    def test_default_maturity():
+        # Default to not flagged
+        sfw = create_mock_image()
+        assert not sfw["mature"]
diff --git a/ingestion_server/test/unit_tests/test_queries.py b/ingestion_server/test/unit_tests/test_queries.py
new file mode 100644
index 000000000..606a68749
--- /dev/null
+++ b/ingestion_server/test/unit_tests/test_queries.py
@@ -0,0 +1,39 @@
+import pytest
+
+from ingestion_server import queries
+
+
+def _join_seq(seq):
+    # Quick-and-dirty workaround: ``as_string`` requires a database connection,
+    # which is cumbersome to construct just to check that two strings are equal.
+    # Lifted from: https://github.com/psycopg/psycopg2/issues/747#issuecomment-662857306
+    parts = str(seq).split("'")
+    return "".join([p for i, p in enumerate(parts) if i % 2 == 1])
+
+
+# NOTE: ``limit`` is passed as a string so it sits inside quotes in the composed
+# query's repr and thus survives ``_join_seq``'s quote-extraction.
+@pytest.mark.parametrize(
+    "table, order_by_expected",
+    [
+        ("sample_table", True),
+        ("audioset", False),
+    ],
+)
+@pytest.mark.parametrize(
+    "approach, limit, limit_expected",
+    [
+        ("basic", None, False),
+        ("advanced", None, False),
+        ("basic", "100000", True),
+        ("advanced", "100000", True),
+    ],
+)
+def test_get_copy_data_query(table, approach, limit, limit_expected, order_by_expected):
+    actual = queries.get_copy_data_query(table, ["col1", "col2"], approach, limit)
+    as_string = _join_seq(actual.seq).replace("\\n", "\n").strip()
+    assert ("LIMIT 100000" in as_string) == limit_expected
+    assert ("ORDER BY identifier" in as_string) == (
+        limit_expected and order_by_expected
+    )
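
A minimal standalone sketch of the limit resolution introduced in `ingest.py` above. It substitutes `os.getenv` for python-decouple's `config` so it runs without dependencies, and the `resolve_limit` helper name is illustrative, not part of this change:

```python
# Hypothetical sketch of the DATA_REFRESH_LIMIT resolution added in ingest.py.
# Uses os.getenv instead of decouple.config so it runs standalone.
import os


def resolve_limit() -> int:
    environment = os.getenv("ENVIRONMENT", "local").lower()
    # Outside prod/production, cap the copy at 100k rows by default.
    limit_default = 100_000
    if environment in {"prod", "production"}:
        # In production the limit is off unless explicitly configured.
        limit_default = 0
    return int(os.getenv("DATA_REFRESH_LIMIT", limit_default))


# ENVIRONMENT=local                -> 100000
# ENVIRONMENT=production           -> 0 (copy everything)
# DATA_REFRESH_LIMIT=500 (any env) -> 500
```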
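Likewise, a sketch of the composition technique `queries.py` relies on: the limit is interpolated with `psycopg2.sql.Literal` (aliased as `PgLiteral` in the diff) rather than string formatting, so the driver handles escaping. The table and column names below are made up for illustration:

```python
# Illustrative use of psycopg2.sql composition, as in get_copy_data_query.
from psycopg2.sql import SQL, Identifier, Literal

limit = 100_000
query = SQL(
    "INSERT INTO {temp_table} ({columns}) "
    "SELECT {columns} FROM {upstream_table} "
    "ORDER BY identifier LIMIT {limit};"
).format(
    temp_table=Identifier("temp_import_image"),
    columns=SQL(",").join([Identifier("identifier"), Identifier("title")]),
    upstream_table=Identifier("upstream_schema", "image_view"),
    limit=Literal(limit),
)
# Rendering needs a live cursor: query.as_string(cur) yields roughly
#   INSERT INTO "temp_import_image" ("identifier","title")
#   SELECT "identifier","title" FROM "upstream_schema"."image_view"
#   ORDER BY identifier LIMIT 100000;
```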