From ac1ba2145ec54a1c57ba650d8eff803671aa2ad6 Mon Sep 17 00:00:00 2001
From: Shorouq <94386553+i-be-snek@users.noreply.github.com>
Date: Thu, 31 Oct 2024 12:03:58 +0100
Subject: [PATCH] 173 null dam error (#183)

---
 Database/insert_events.py           |  6 ++-
 Database/parse_events.py            | 65 +++++++++++++++++------------
 Database/scr/normalize_locations.py |  7 ++--
 Database/scr/normalize_utils.py     | 22 ++++++++++
 4 files changed, 69 insertions(+), 31 deletions(-)

diff --git a/Database/insert_events.py b/Database/insert_events.py
index d04c46e3e..ed8f049d1 100644
--- a/Database/insert_events.py
+++ b/Database/insert_events.py
@@ -8,7 +8,7 @@
 from pandarallel import pandarallel
 from tqdm import tqdm
 
-from Database.scr.normalize_utils import GeoJsonUtils, Logging
+from Database.scr.normalize_utils import GeoJsonUtils, Logging, NormalizeUtils
 
 pandarallel.initialize(progress_bar=False, nb_workers=5)
 
@@ -110,6 +110,7 @@
     args = parser.parse_args()
     logger = Logging.get_logger(f"database-insertion", level="INFO", filename="v1_full_run_insertion_raw.log")
 
+    utils = NormalizeUtils()
     connection = sqlite3.connect(args.database_name)
     cursor = connection.cursor()
 
@@ -190,6 +191,7 @@
 
         for f in tqdm(files, desc="Files"):
            data = pd.read_parquet(f"{args.file_dir}/{f}", engine="fastparquet")
 
+            data = utils.replace_nulls(data)
             logger.info("Converting everything to strings...")
             for c in data.columns:
@@ -261,5 +263,5 @@
                 )
                 logger.error(f"Insert errors were found! THIS ROW WAS NOT INSERTED! Storing in {tmp_errors_filename}")
                 pathlib.Path(args.error_path).mkdir(parents=True, exist_ok=True)
-                errors.to_json(tmp_errors_filename, orient="records")
+                errors.to_json(tmp_errors_filename, orient="records", indent=3)
     connection.close()
diff --git a/Database/parse_events.py b/Database/parse_events.py
index bf7f18ea0..56db32514 100644
--- a/Database/parse_events.py
+++ b/Database/parse_events.py
@@ -99,7 +99,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list):
     for str_col in [x for x in events.columns if x in split_by_pipe_cols]:
         logger.info(f"Splitting column {str_col} by pipe")
         events[str_col] = events[str_col].progress_apply(
-            lambda x: (x.split("|") if isinstance(x, str) else (x if isinstance(x, str) else None))
+            lambda x: (x.split("|") if isinstance(x, str) else (x if isinstance(x, str) else []))
         )
 
     logger.info("STEP: Normalizing country-level administrative areas if present")
@@ -113,7 +113,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list):
         logger.info("Normalizing administrative areas...")
         events[f"{admin_area_col}_Tmp"] = events["Administrative_Areas"].progress_apply(
             lambda admin_areas: (
-                [norm_loc.normalize_locations(c, is_country=True) for c in admin_areas]
+                [norm_loc.normalize_locations(area=c, is_country=True) for c in admin_areas]
                 if isinstance(admin_areas, list)
                 else []
             )
         )
@@ -220,9 +220,9 @@ def parse_main_events(df: pd.DataFrame, target_columns: list):
     logger.info("Cleaning event names...")
     event_name_col = [x for x in events.columns if "Event_Name" in x]
     if len(event_name_col) == 1:
-        event_name_col = event_name_col[0]
-        events["Event_Names"] = events[event_name_col].progress_apply(
-            lambda x: ([x.strip()] if isinstance(x, str) else ([y.strip() for y in x]) if isinstance(x, list) else None)
+        event_name_col_str: str = event_name_col[0]
+        events["Event_Names"] = events[event_name_col_str].progress_apply(
+            lambda x: ([x.strip()] if isinstance(x, str) else ([y.strip() for y in x]) if isinstance(x, list) else [])
         )
 
     hazards, main_event = "Hazards", "Main_Event"
@@ -251,16 +251,13 @@ def parse_main_events(df: pd.DataFrame, target_columns: list):
     logger.info(f"STEP: Validation relationship between col {hazards} and col {main_event}")
     events = events.progress_apply(lambda row: validation.validate_main_event_hazard_relation(row), axis=1)
 
-    logger.info("Converting annotation columns to strings to store in sqlite3")
-    annotation_cols = [col for col in events.columns if col.endswith(("_with_annotation", "_Annotation"))]
-
     logger.info(f"Storing parsed results for l1 events. Target columns: {target_columns}")
     utils.df_to_parquet(
         events[[x for x in target_columns if x in events.columns]],
         f"{args.output_dir}/l1",
         200,
     )
-    del total_summary_cols, annotation_cols, total_cols
+    del total_summary_cols, total_cols
 
     return events
 
@@ -357,17 +354,14 @@ def parse_sub_level_event(df, level: str, target_columns: list = []):
             )
         )
         logger.info(f"Normalizing dates for subevet {col}")
-        start_date_col, end_date_col = [c for c in sub_event.columns if col.startswith("Start_Date")], [
-            c for c in sub_event.columns if col.startswith("End_Date")
-        ]
-        assert len(start_date_col) == len(end_date_col), "Check the start and end date columns"
-        assert len(start_date_col) <= 1, "Check the start and end date columns, there might be too many"
-
-        if start_date_col and end_date_col:
-            logger.info(f"Normalizing start and end date in columns {start_date_col} and {end_date_col}")
-            start_date_col, end_date_col = start_date_col[0], end_date_col[0]
+        start_date_col, end_date_col = (
+            "Start_Date" if "Start_Date" in sub_event.columns else None,
+            "End_Date" if "End_Date" in sub_event.columns else None,
+        )
+        concat_list = [sub_event]
+        if start_date_col:
+            logger.info(f"Normalizing start date column {start_date_col}")
             start_dates = sub_event[start_date_col].progress_apply(utils.normalize_date)
-            end_dates = sub_event[end_date_col].progress_apply(utils.normalize_date)
             start_date_cols = pd.DataFrame(
                 start_dates.to_list(),
                 columns=[
@@ -376,6 +370,11 @@ def parse_sub_level_event(df, level: str, target_columns: list = []):
                     f"{start_date_col}_Year",
                 ],
             )
+            concat_list.append(start_date_cols)
+
+        if end_date_col:
+            logger.info(f"Normalizing end date column {end_date_col}")
+            end_dates = sub_event[end_date_col].progress_apply(utils.normalize_date)
             end_date_cols = pd.DataFrame(
                 end_dates.to_list(),
                 columns=[
@@ -384,10 +383,16 @@ def parse_sub_level_event(df, level: str, target_columns: list = []):
                     f"{end_date_col}_Year",
                 ],
             )
-            sub_event.reset_index(inplace=True, drop=True)
-            sub_event = pd.concat([sub_event, start_date_cols, end_date_cols], axis=1)
+            concat_list.append(end_date_cols)
+        sub_event.reset_index(inplace=True, drop=True)
+        sub_event = pd.concat(concat_list, axis=1)
+        del concat_list
 
         if level == "l2" and administrative_area_col in sub_event.columns:
+            logger.info(f"Normalizing nulls in {administrative_area_col} for {level} {col}")
+            sub_event[administrative_area_col] = sub_event[administrative_area_col].progress_apply(
+                lambda admin_areas: utils.filter_null_list(admin_areas) if isinstance(admin_areas, list) else []
+            )
             logger.info(f"Normalizing administrative area names for {level} {col}")
             sub_event[f"{administrative_area_col}_Tmp"] = sub_event[administrative_area_col].progress_apply(
                 lambda admin_areas: (
@@ -437,6 +442,10 @@ def parse_sub_level_event(df, level: str, target_columns: list = []):
             )
 
         elif level == "l3" and administrative_area_col in sub_event.columns:
+            logger.info(f"Normalizing nulls in {administrative_area_col} for {level} {col}")
+            sub_event[administrative_area_col] = sub_event[administrative_area_col].apply(
+                lambda admin_area: utils.filter_null_str(admin_area)
+            )
             sub_event[
                 [
                     f"{administrative_area_col}_Norm",
@@ -459,7 +468,12 @@ def parse_sub_level_event(df, level: str, target_columns: list = []):
             sub_event[f"{administrative_area_col}_GID"] = sub_event[f"{administrative_area_col}_Norm"].progress_apply(
                 lambda area: norm_loc.get_gadm_gid(country=area) if area else []
             )
+
         if location_col in sub_event.columns:
+            logger.info(f"Normalizing nulls in {location_col} for {level} {col}")
+            sub_event[location_col] = sub_event[location_col].progress_apply(
+                lambda locations: utils.filter_null_list(locations) if isinstance(locations, list) else []
+            )
             logger.info(f"Normalizing location names for {level} {col}")
             sub_event[f"{location_col}_Tmp"] = sub_event.progress_apply(
                 lambda row: (
@@ -528,14 +542,13 @@ def parse_sub_level_event(df, level: str, target_columns: list = []):
             logger.info(f"Dropped {rows_before-rows_after} row(s) in {col}")
             del rows_before, rows_after
         logger.info(f"Storing parsed results for subevent {col}")
-        for c in sub_event.columns:
-            sub_event[c] = sub_event[c].astype(str)
 
         if target_columns:
+            logger.info(f"Storing the following target columns for {col} {level}: {target_columns}")
             sub_event = sub_event[[x for x in target_columns if x in sub_event.columns]]
+        logger.info(f"Normalizing nulls for {level} {col}")
+        sub_event = utils.replace_nulls(sub_event)
         utils.df_to_parquet(
-            sub_event,
-            target_dir=f"{args.output_dir}/{level}/{col}",
-            chunk_size=200,
+            sub_event, target_dir=f"{args.output_dir}/{level}/{col}", chunk_size=200, object_encoding="infer"
         )
 
diff --git a/Database/scr/normalize_locations.py b/Database/scr/normalize_locations.py
index fb4c70a13..5a709e8d8 100644
--- a/Database/scr/normalize_locations.py
+++ b/Database/scr/normalize_locations.py
@@ -140,8 +140,9 @@ def geocode_api_request(
 
     def normalize_locations(
         self, area: str, is_country: bool = False, in_country: str = None
-    ) -> tuple[str, str, dict] | None:
+    ) -> tuple[str, str | None, dict | None]:
         """Queries a geocode service for a location (country or smaller) and returns the top result"""
+        original_area = area
         try:
             try:
                 if area:
@@ -164,7 +165,7 @@ def normalize_locations(
             unsd_search_output = self._get_unsd_region(area, return_name=True) if area and is_country else None
             if unsd_search_output:
                 # TODO: add geojson for unsd regions
-                return [unsd_search_output, "UNSD region", None]
+                return [unsd_search_output.title(), "UNSD region", None]
 
             area = area.lower().strip()
             if "_" in area:
@@ -336,7 +337,7 @@ def normalize_locations(
                 f"Could not find location {area}; is_country: {is_country}; in_country: {in_country}. Error message: {err}."
             )
             # return un-normalized area name
-            return (area, None, None)
+            return (original_area, None, None)
 
     def _get_unsd_region(
         self, area, fuzzy_match_n: int = 1, fuzzy_match_cuttoff: float = 0.8, return_name: bool = False
diff --git a/Database/scr/normalize_utils.py b/Database/scr/normalize_utils.py
index 63add670c..0604ea1d6 100644
--- a/Database/scr/normalize_utils.py
+++ b/Database/scr/normalize_utils.py
@@ -132,6 +132,28 @@ def eval(self, x: list | str):
             )
             return None
 
+    @staticmethod
+    def filter_null_list(lst: list) -> list:
+        new_list = []
+        for l in lst:
+            if isinstance(l, str):
+                if l.lower().strip() not in ["null", "none"]:
+                    new_list.append(l)
+            elif l == float("nan") or l is None:
+                pass
+            else:
+                new_list.append(l)
+        return new_list
+
+    @staticmethod
+    def filter_null_str(l: str | None) -> str | None:
+        if isinstance(l, str):
+            if l.lower().strip() in ["null", "none"]:
+                return None
+        if l == float("nan") or l is None:
+            return None
+        return l
+
     @staticmethod
     def simple_country_check(c: str):
         try:
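
Usage note (illustrative, not part of the patch): the new filter_null_list and filter_null_str helpers drop the literal strings "null"/"none" (case-insensitive, after stripping whitespace) and None values from list- and scalar-valued fields, which is what the l2/l3 hunks in parse_events.py rely on before location normalization. The sketch below assumes the two static methods land on the same NormalizeUtils class that insert_events.py instantiates (the hunk context above does not show the class header); the dam names are invented test data.

    from Database.scr.normalize_utils import NormalizeUtils

    # String sentinels ("null", "NONE ") and None are removed; other entries are kept as-is.
    NormalizeUtils.filter_null_list(["Kakhovka Dam", "null", None, "NONE "])
    # -> ["Kakhovka Dam"]

    # Caveat of the code as written: float("nan") never compares equal to itself,
    # so a real NaN element falls through to the final else branch and is kept.
    NormalizeUtils.filter_null_list([float("nan"), "Oroville Dam"])
    # -> [nan, "Oroville Dam"]

    # The scalar variant maps null-like strings to None and passes everything else through.
    NormalizeUtils.filter_null_str("  None ")           # -> None
    NormalizeUtils.filter_null_str("Three Gorges Dam")  # -> "Three Gorges Dam"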