173 null dam error (#183)
i-be-snek authored Oct 31, 2024
1 parent 71ac081 commit ac1ba21
Showing 4 changed files with 69 additions and 31 deletions.
6 changes: 4 additions & 2 deletions Database/insert_events.py
@@ -8,7 +8,7 @@
from pandarallel import pandarallel
from tqdm import tqdm

from Database.scr.normalize_utils import GeoJsonUtils, Logging
from Database.scr.normalize_utils import GeoJsonUtils, Logging, NormalizeUtils

pandarallel.initialize(progress_bar=False, nb_workers=5)

@@ -110,6 +110,7 @@

args = parser.parse_args()
logger = Logging.get_logger(f"database-insertion", level="INFO", filename="v1_full_run_insertion_raw.log")
utils = NormalizeUtils()

connection = sqlite3.connect(args.database_name)
cursor = connection.cursor()
@@ -190,6 +191,7 @@

for f in tqdm(files, desc="Files"):
data = pd.read_parquet(f"{args.file_dir}/{f}", engine="fastparquet")
data = utils.replace_nulls(data)

logger.info("Converting everything to strings...")
for c in data.columns:
@@ -261,5 +263,5 @@
)
logger.error(f"Insert errors were found! THIS ROW WAS NOT INSERTED! Storing in {tmp_errors_filename}")
pathlib.Path(args.error_path).mkdir(parents=True, exist_ok=True)
errors.to_json(tmp_errors_filename, orient="records")
errors.to_json(tmp_errors_filename, orient="records", indent=3)
connection.close()
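
For context (illustrative, not part of the commit): NormalizeUtils.replace_nulls is not shown in this diff, so the sketch below is only an assumption of what the new data = utils.replace_nulls(data) step does, namely mapping literal "null"/"None" strings to missing values before insertion.

# Hypothetical sketch only; the real replace_nulls lives in
# Database/scr/normalize_utils.py and is not shown in this diff.
import pandas as pd

def replace_nulls_sketch(df: pd.DataFrame) -> pd.DataFrame:
    # Treat literal "null"/"None" strings (any casing/whitespace) as missing values.
    return df.apply(
        lambda col: col.map(
            lambda v: None if isinstance(v, str) and v.strip().lower() in ("null", "none") else v
        )
    )

# data = pd.read_parquet(f"{args.file_dir}/{f}", engine="fastparquet")
# data = replace_nulls_sketch(data)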
65 changes: 39 additions & 26 deletions Database/parse_events.py
@@ -99,7 +99,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list):
for str_col in [x for x in events.columns if x in split_by_pipe_cols]:
logger.info(f"Splitting column {str_col} by pipe")
events[str_col] = events[str_col].progress_apply(
lambda x: (x.split("|") if isinstance(x, str) else (x if isinstance(x, str) else None))
lambda x: (x.split("|") if isinstance(x, str) else (x if isinstance(x, str) else []))
)

logger.info("STEP: Normalizing country-level administrative areas if present")
@@ -113,7 +113,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list):
logger.info("Normalizing administrative areas...")
events[f"{admin_area_col}_Tmp"] = events["Administrative_Areas"].progress_apply(
lambda admin_areas: (
[norm_loc.normalize_locations(c, is_country=True) for c in admin_areas]
[norm_loc.normalize_locations(area=c, is_country=True) for c in admin_areas]
if isinstance(admin_areas, list)
else []
)
@@ -220,9 +220,9 @@ def parse_main_events(df: pd.DataFrame, target_columns: list):
logger.info("Cleaning event names...")
event_name_col = [x for x in events.columns if "Event_Name" in x]
if len(event_name_col) == 1:
event_name_col = event_name_col[0]
events["Event_Names"] = events[event_name_col].progress_apply(
lambda x: ([x.strip()] if isinstance(x, str) else ([y.strip() for y in x]) if isinstance(x, list) else None)
event_name_col_str: str = event_name_col[0]
events["Event_Names"] = events[event_name_col_str].progress_apply(
lambda x: ([x.strip()] if isinstance(x, str) else ([y.strip() for y in x]) if isinstance(x, list) else [])
)

hazards, main_event = "Hazards", "Main_Event"
@@ -251,16 +251,13 @@ def parse_main_events(df: pd.DataFrame, target_columns: list):
logger.info(f"STEP: Validation relationship between col {hazards} and col {main_event}")
events = events.progress_apply(lambda row: validation.validate_main_event_hazard_relation(row), axis=1)

logger.info("Converting annotation columns to strings to store in sqlite3")
annotation_cols = [col for col in events.columns if col.endswith(("_with_annotation", "_Annotation"))]

logger.info(f"Storing parsed results for l1 events. Target columns: {target_columns}")
utils.df_to_parquet(
events[[x for x in target_columns if x in events.columns]],
f"{args.output_dir}/l1",
200,
)
del total_summary_cols, annotation_cols, total_cols
del total_summary_cols, total_cols
return events


@@ -357,17 +354,14 @@ def parse_sub_level_event(df, level: str, target_columns: list = []):
)
)
logger.info(f"Normalizing dates for subevet {col}")
start_date_col, end_date_col = [c for c in sub_event.columns if col.startswith("Start_Date")], [
c for c in sub_event.columns if col.startswith("End_Date")
]
assert len(start_date_col) == len(end_date_col), "Check the start and end date columns"
assert len(start_date_col) <= 1, "Check the start and end date columns, there might be too many"

if start_date_col and end_date_col:
logger.info(f"Normalizing start and end date in columns {start_date_col} and {end_date_col}")
start_date_col, end_date_col = start_date_col[0], end_date_col[0]
start_date_col, end_date_col = (
"Start_Date" if "Start_Date" in sub_event.columns else None,
"End_Date" if "End_Date" in sub_event.columns else None,
)
concat_list = [sub_event]
if start_date_col:
logger.info(f"Normalizing start date column {start_date_col}")
start_dates = sub_event[start_date_col].progress_apply(utils.normalize_date)
end_dates = sub_event[end_date_col].progress_apply(utils.normalize_date)
start_date_cols = pd.DataFrame(
start_dates.to_list(),
columns=[
@@ -376,6 +370,11 @@ def parse_sub_level_event(df, level: str, target_columns: list = []):
f"{start_date_col}_Year",
],
)
concat_list.append(start_date_cols)

if end_date_col:
logger.info(f"Normalizing end date column {end_date_col}")
end_dates = sub_event[end_date_col].progress_apply(utils.normalize_date)
end_date_cols = pd.DataFrame(
end_dates.to_list(),
columns=[
@@ -384,10 +383,16 @@ def parse_sub_level_event(df, level: str, target_columns: list = []):
f"{end_date_col}_Year",
],
)
sub_event.reset_index(inplace=True, drop=True)
sub_event = pd.concat([sub_event, start_date_cols, end_date_cols], axis=1)
concat_list.append(end_date_cols)
sub_event.reset_index(inplace=True, drop=True)
sub_event = pd.concat(concat_list, axis=1)
del concat_list

if level == "l2" and administrative_area_col in sub_event.columns:
logger.info(f"Normalizing nulls in {administrative_area_col} for {level} {col}")
sub_event[administrative_area_col] = sub_event[administrative_area_col].progress_apply(
lambda admin_areas: utils.filter_null_list(admin_areas) if isinstance(admin_areas, list) else []
)
logger.info(f"Normalizing administrative area names for {level} {col}")
sub_event[f"{administrative_area_col}_Tmp"] = sub_event[administrative_area_col].progress_apply(
lambda admin_areas: (
@@ -437,6 +442,10 @@ def parse_sub_level_event(df, level: str, target_columns: list = []):
)

elif level == "l3" and administrative_area_col in sub_event.columns:
logger.info(f"Normalizing nulls in {administrative_area_col} for {level} {col}")
sub_event[administrative_area_col] = sub_event[administrative_area_col].apply(
lambda admin_area: utils.filter_null_str(admin_area)
)
sub_event[
[
f"{administrative_area_col}_Norm",
@@ -459,7 +468,12 @@ def parse_sub_level_event(df, level: str, target_columns: list = []):
sub_event[f"{administrative_area_col}_GID"] = sub_event[f"{administrative_area_col}_Norm"].progress_apply(
lambda area: norm_loc.get_gadm_gid(country=area) if area else []
)

if location_col in sub_event.columns:
logger.info(f"Normalizing nulls in {location_col} for {level} {col}")
sub_event[location_col] = sub_event[location_col].progress_apply(
lambda locations: utils.filter_null_list(locations) if isinstance(locations, list) else []
)
logger.info(f"Normalizing location names for {level} {col}")
sub_event[f"{location_col}_Tmp"] = sub_event.progress_apply(
lambda row: (
@@ -528,14 +542,13 @@ def parse_sub_level_event(df, level: str, target_columns: list = []):
logger.info(f"Dropped {rows_before-rows_after} row(s) in {col}")
del rows_before, rows_after
logger.info(f"Storing parsed results for subevent {col}")
for c in sub_event.columns:
sub_event[c] = sub_event[c].astype(str)
if target_columns:
logger.info(f"Storing the following target columns for {col} {level}: {target_columns}")
sub_event = sub_event[[x for x in target_columns if x in sub_event.columns]]
logger.info(f"Normalizing nulls for {level} {col}")
sub_event = utils.replace_nulls(sub_event)
utils.df_to_parquet(
sub_event,
target_dir=f"{args.output_dir}/{level}/{col}",
chunk_size=200,
sub_event, target_dir=f"{args.output_dir}/{level}/{col}", chunk_size=200, object_encoding="infer"
)


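Aside (not from the commit): the reworked date handling above applies utils.normalize_date per column and expands the result into _Day/_Month/_Year columns before a single pd.concat. The sketch below assumes normalize_date returns a (day, month, year) triple, which is implied by the three columns built from its output; the stub used here is illustrative only.

import pandas as pd

def normalize_date_stub(raw):  # stand-in for utils.normalize_date (assumption)
    ts = pd.to_datetime(raw, errors="coerce")
    if pd.isna(ts):
        return None, None, None
    return ts.day, ts.month, ts.year

sub_event = pd.DataFrame({"Start_Date": ["2004-12-26", None, "March 2011"]})
concat_list = [sub_event]

start_dates = sub_event["Start_Date"].apply(normalize_date_stub)
concat_list.append(
    pd.DataFrame(
        start_dates.to_list(),
        columns=["Start_Date_Day", "Start_Date_Month", "Start_Date_Year"],
    )
)
sub_event = pd.concat(concat_list, axis=1)
print(sub_event)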
7 changes: 4 additions & 3 deletions Database/scr/normalize_locations.py
@@ -140,8 +140,9 @@ def geocode_api_request(

def normalize_locations(
self, area: str, is_country: bool = False, in_country: str = None
) -> tuple[str, str, dict] | None:
) -> tuple[str, str | None, dict | None]:
"""Queries a geocode service for a location (country or smaller) and returns the top result"""
original_area = area
try:
try:
if area:
@@ -164,7 +165,7 @@ def normalize_locations(
unsd_search_output = self._get_unsd_region(area, return_name=True) if area and is_country else None
if unsd_search_output:
# TODO: add geojson for unsd regions
return [unsd_search_output, "UNSD region", None]
return [unsd_search_output.title(), "UNSD region", None]

area = area.lower().strip()
if "_" in area:
@@ -336,7 +337,7 @@ def normalize_locations(
f"Could not find location {area}; is_country: {is_country}; in_country: {in_country}. Error message: {err}."
)
# return un-normalized area name
return (area, None, None)
return (original_area, None, None)

def _get_unsd_region(
self, area, fuzzy_match_n: int = 1, fuzzy_match_cuttoff: float = 0.8, return_name: bool = False
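Aside (not from the commit): the fallback fix above matters because area is mutated (lowercased, stripped, underscores handled) while the lookup runs, so the error path previously returned the mangled working copy instead of the caller's original string. A minimal sketch of the corrected fallback, with illustrative names only:

def normalize_locations_sketch(area: str) -> tuple[str, str | None, dict | None]:
    original_area = area
    try:
        area = area.lower().strip()  # working copy gets normalized for lookup
        raise LookupError("geocoder found nothing")  # stand-in for a failed lookup
    except LookupError:
        # return the un-normalized area name, not the mutated working copy
        return (original_area, None, None)

print(normalize_locations_sketch("  Dar es Salaam "))  # ('  Dar es Salaam ', None, None)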
22 changes: 22 additions & 0 deletions Database/scr/normalize_utils.py
@@ -132,6 +132,28 @@ def eval(self, x: list | str):
)
return None

@staticmethod
def filter_null_list(lst: list) -> list:
    new_list = []
    for l in lst:
        if isinstance(l, str):
            if l.lower().strip() not in ["null", "none"]:
                new_list.append(l)
        elif l is None or l != l:  # l != l is True only for NaN, which never equals itself
            pass
        else:
            new_list.append(l)
    return new_list

@staticmethod
def filter_null_str(l: str | None) -> str | None:
    if isinstance(l, str):
        if l.lower().strip() in ["null", "none"]:
            return None
    if l is None or l != l:  # None or NaN (NaN never equals itself)
        return None
    return l

@staticmethod
def simple_country_check(c: str):
try:
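Aside (not from the commit): a quick usage sketch of the two new helpers, assuming they are staticmethods on the NormalizeUtils class that this module exports; the enclosing class name is not visible in this diff.

from Database.scr.normalize_utils import NormalizeUtils

print(NormalizeUtils.filter_null_list(["Skopje", "null", None, " NONE ", 42]))
# -> ['Skopje', 42]  (literal "null"/"none" strings and None are dropped)

print(NormalizeUtils.filter_null_str("None"))   # -> None
print(NormalizeUtils.filter_null_str("Ohrid"))  # -> 'Ohrid'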
