173 null dam error #183
Merged · 9 commits · Oct 31, 2024
Database/insert_events.py (6 changes: 4 additions & 2 deletions)

@@ -8,7 +8,7 @@
 from pandarallel import pandarallel
 from tqdm import tqdm

-from Database.scr.normalize_utils import GeoJsonUtils, Logging
+from Database.scr.normalize_utils import GeoJsonUtils, Logging, NormalizeUtils

 pandarallel.initialize(progress_bar=False, nb_workers=5)

@@ -110,6 +110,7 @@

 args = parser.parse_args()
 logger = Logging.get_logger(f"database-insertion", level="INFO", filename="v1_full_run_insertion_raw.log")
+utils = NormalizeUtils()

 connection = sqlite3.connect(args.database_name)
 cursor = connection.cursor()
@@ -190,6 +191,7 @@

 for f in tqdm(files, desc="Files"):
     data = pd.read_parquet(f"{args.file_dir}/{f}", engine="fastparquet")
+    data = utils.replace_nulls(data)

     logger.info("Converting everything to strings...")
     for c in data.columns:
@@ -261,5 +263,5 @@
     )
     logger.error(f"Insert errors were found! THIS ROW WAS NOT INSERTED! Storing in {tmp_errors_filename}")
     pathlib.Path(args.error_path).mkdir(parents=True, exist_ok=True)
-    errors.to_json(tmp_errors_filename, orient="records")
+    errors.to_json(tmp_errors_filename, orient="records", indent=3)
 connection.close()
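Note: replace_nulls is implemented in NormalizeUtils and is not shown in this diff. As a rough sketch of what the call site appears to rely on (literal "null"/"none" strings mapped back to real None, cell by cell), something like the following; the actual implementation may differ:

    import pandas as pd

    def replace_nulls_sketch(df: pd.DataFrame) -> pd.DataFrame:
        # Hypothetical stand-in for NormalizeUtils.replace_nulls.
        def fix(value):
            if isinstance(value, str) and value.strip().lower() in ("null", "none"):
                return None
            return value
        return df.apply(lambda col: col.map(fix))

    # replace_nulls_sketch(pd.DataFrame({"dam": ["Hoover", "NULL", "none"]}))
    # column becomes ['Hoover', None, None]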
Database/parse_events.py (65 changes: 39 additions & 26 deletions)

@@ -99,7 +99,7 @@ def parse_main_events(df: pd.DataFrame, target_columns: list):
     for str_col in [x for x in events.columns if x in split_by_pipe_cols]:
         logger.info(f"Splitting column {str_col} by pipe")
         events[str_col] = events[str_col].progress_apply(
-            lambda x: (x.split("|") if isinstance(x, str) else (x if isinstance(x, str) else None))
+            lambda x: (x.split("|") if isinstance(x, str) else (x if isinstance(x, str) else []))
         )

     logger.info("STEP: Normalizing country-level administrative areas if present")
@@ -113,7 +113,7 @@
logger.info("Normalizing administrative areas...")
events[f"{admin_area_col}_Tmp"] = events["Administrative_Areas"].progress_apply(
lambda admin_areas: (
[norm_loc.normalize_locations(c, is_country=True) for c in admin_areas]
[norm_loc.normalize_locations(area=c, is_country=True) for c in admin_areas]
if isinstance(admin_areas, list)
else []
)
@@ -220,9 +220,9 @@ def parse_main_events(df: pd.DataFrame, target_columns: list):
logger.info("Cleaning event names...")
event_name_col = [x for x in events.columns if "Event_Name" in x]
if len(event_name_col) == 1:
event_name_col = event_name_col[0]
events["Event_Names"] = events[event_name_col].progress_apply(
lambda x: ([x.strip()] if isinstance(x, str) else ([y.strip() for y in x]) if isinstance(x, list) else None)
event_name_col_str: str = event_name_col[0]
events["Event_Names"] = events[event_name_col_str].progress_apply(
lambda x: ([x.strip()] if isinstance(x, str) else ([y.strip() for y in x]) if isinstance(x, list) else [])
)

hazards, main_event = "Hazards", "Main_Event"
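On illustrative inputs, the tightened branch wraps and strips strings, strips lists element-wise, and now yields an empty list (rather than None) for anything else:

    clean = lambda x: (
        [x.strip()] if isinstance(x, str)
        else [y.strip() for y in x] if isinstance(x, list)
        else []
    )

    clean(" Hurricane Katrina ")              # ['Hurricane Katrina']
    clean(["Storm Daniel ", " Storm Elias"])  # ['Storm Daniel', 'Storm Elias']
    clean(None)                               # [] (was None before this change)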
@@ -251,16 +251,13 @@
logger.info(f"STEP: Validation relationship between col {hazards} and col {main_event}")
events = events.progress_apply(lambda row: validation.validate_main_event_hazard_relation(row), axis=1)

logger.info("Converting annotation columns to strings to store in sqlite3")
annotation_cols = [col for col in events.columns if col.endswith(("_with_annotation", "_Annotation"))]

logger.info(f"Storing parsed results for l1 events. Target columns: {target_columns}")
utils.df_to_parquet(
events[[x for x in target_columns if x in events.columns]],
f"{args.output_dir}/l1",
200,
)
del total_summary_cols, annotation_cols, total_cols
del total_summary_cols, total_cols
return events


@@ -357,17 +354,14 @@ def parse_sub_level_event(df, level: str, target_columns: list = []):
                 )
             )
             logger.info(f"Normalizing dates for subevent {col}")
-            start_date_col, end_date_col = [c for c in sub_event.columns if col.startswith("Start_Date")], [
-                c for c in sub_event.columns if col.startswith("End_Date")
-            ]
-            assert len(start_date_col) == len(end_date_col), "Check the start and end date columns"
-            assert len(start_date_col) <= 1, "Check the start and end date columns, there might be too many"
-
-            if start_date_col and end_date_col:
-                logger.info(f"Normalizing start and end date in columns {start_date_col} and {end_date_col}")
-                start_date_col, end_date_col = start_date_col[0], end_date_col[0]
+            start_date_col, end_date_col = (
+                "Start_Date" if "Start_Date" in sub_event.columns else None,
+                "End_Date" if "End_Date" in sub_event.columns else None,
+            )
+            concat_list = [sub_event]
+            if start_date_col:
+                logger.info(f"Normalizing start date column {start_date_col}")
                 start_dates = sub_event[start_date_col].progress_apply(utils.normalize_date)
-                end_dates = sub_event[end_date_col].progress_apply(utils.normalize_date)
                 start_date_cols = pd.DataFrame(
                     start_dates.to_list(),
                     columns=[
@@ -376,6 +370,11 @@
f"{start_date_col}_Year",
],
)
concat_list.append(start_date_cols)

if start_date_col:
logger.info(f"Normalizing end date column {end_date_col}")
end_dates = sub_event[end_date_col].progress_apply(utils.normalize_date)
end_date_cols = pd.DataFrame(
end_dates.to_list(),
columns=[
@@ -384,10 +383,16 @@
f"{end_date_col}_Year",
],
)
sub_event.reset_index(inplace=True, drop=True)
sub_event = pd.concat([sub_event, start_date_cols, end_date_cols], axis=1)
concat_list.append(end_date_cols)
sub_event.reset_index(inplace=True, drop=True)
sub_event = pd.concat(concat_list, axis=1)
del concat_list

if level == "l2" and administrative_area_col in sub_event.columns:
logger.info(f"Normalizing nulls in {administrative_area_col} for {level} {col}")
sub_event[administrative_area_col] = sub_event[administrative_area_col].progress_apply(
lambda admin_areas: utils.filter_null_list(admin_areas) if isinstance(admin_areas, list) else []
)
logger.info(f"Normalizing administrative area names for {level} {col}")
sub_event[f"{administrative_area_col}_Tmp"] = sub_event[administrative_area_col].progress_apply(
lambda admin_areas: (
Expand Down Expand Up @@ -437,6 +442,10 @@ def parse_sub_level_event(df, level: str, target_columns: list = []):
)

         elif level == "l3" and administrative_area_col in sub_event.columns:
+            logger.info(f"Normalizing nulls in {administrative_area_col} for {level} {col}")
+            sub_event[administrative_area_col] = sub_event[administrative_area_col].apply(
+                lambda admin_area: utils.filter_null_str(admin_area)
+            )
             sub_event[
                 [
                     f"{administrative_area_col}_Norm",
@@ -459,7 +468,12 @@ def parse_sub_level_event(df, level: str, target_columns: list = []):
sub_event[f"{administrative_area_col}_GID"] = sub_event[f"{administrative_area_col}_Norm"].progress_apply(
lambda area: norm_loc.get_gadm_gid(country=area) if area else []
)

if location_col in sub_event.columns:
logger.info(f"Normalizing nulls in {location_col} for {level} {col}")
sub_event[location_col] = sub_event[location_col].progress_apply(
lambda locations: utils.filter_null_list(locations) if isinstance(locations, list) else []
)
logger.info(f"Normalizing location names for {level} {col}")
sub_event[f"{location_col}_Tmp"] = sub_event.progress_apply(
lambda row: (
@@ -528,14 +542,13 @@ def parse_sub_level_event(df, level: str, target_columns: list = []):
logger.info(f"Dropped {rows_before-rows_after} row(s) in {col}")
del rows_before, rows_after
logger.info(f"Storing parsed results for subevent {col}")
for c in sub_event.columns:
sub_event[c] = sub_event[c].astype(str)
if target_columns:
logger.info(f"Storing the following target columns for {col} {level}: {target_columns}")
sub_event = sub_event[[x for x in target_columns if x in sub_event.columns]]
logger.info(f"Normalizing nulls for {level} {col}")
sub_event = utils.replace_nulls(sub_event)
utils.df_to_parquet(
sub_event,
target_dir=f"{args.output_dir}/{level}/{col}",
chunk_size=200,
sub_event, target_dir=f"{args.output_dir}/{level}/{col}", chunk_size=200, object_encoding="infer"
)
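Swapping the blanket astype(str) for replace_nulls is the substantive fix in this hunk: casting object columns to str turns missing values into the literal strings 'None' and 'nan' (plausibly the "null dam" of the linked issue), and those fake names then survive into parquet and the database. A quick illustration of the old failure mode:

    import pandas as pd

    s = pd.Series(["Three Gorges", None, float("nan")])
    print(s.astype(str).tolist())  # ['Three Gorges', 'None', 'nan'], nulls became text

With real nulls restored, df_to_parquet can let the object-column types be inferred instead; object_encoding="infer" reads like it is forwarded to fastparquet's write(), though that depends on the helper's signature.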


Database/scr/normalize_locations.py (7 changes: 4 additions & 3 deletions)

@@ -140,8 +140,9 @@ def geocode_api_request(

     def normalize_locations(
         self, area: str, is_country: bool = False, in_country: str = None
-    ) -> tuple[str, str, dict] | None:
+    ) -> tuple[str, str | None, dict | None]:
         """Queries a geocode service for a location (country or smaller) and returns the top result"""
+        original_area = area
         try:
             try:
                 if area:
@@ -164,7 +165,7 @@
                 unsd_search_output = self._get_unsd_region(area, return_name=True) if area and is_country else None
                 if unsd_search_output:
                     # TODO: add geojson for unsd regions
-                    return [unsd_search_output, "UNSD region", None]
+                    return [unsd_search_output.title(), "UNSD region", None]

                 area = area.lower().strip()
                 if "_" in area:
@@ -336,7 +337,7 @@ def normalize_locations(
f"Could not find location {area}; is_country: {is_country}; in_country: {in_country}. Error message: {err}."
)
# return un-normalized area name
return (area, None, None)
return (original_area, None, None)

     def _get_unsd_region(
         self, area, fuzzy_match_n: int = 1, fuzzy_match_cuttoff: float = 0.8, return_name: bool = False
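The original_area capture fixes a subtle mutation: area is reassigned inside the try block (lower-cased, stripped, underscores replaced), so on total lookup failure the except path used to return the mangled local rather than the caller's input. A stripped-down sketch of the pattern, with a stand-in exception for a failed lookup:

    def normalize_sketch(area: str):
        original_area = area
        try:
            area = area.lower().strip()  # the local is overwritten during lookups
            raise LookupError("no geocoder hit")  # stand-in for every lookup failing
        except LookupError:
            # before the fix this returned (area, None, None), i.e. 'hoover dam'
            return (original_area, None, None)

    normalize_sketch(" Hoover Dam ")  # (' Hoover Dam ', None, None)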
Database/scr/normalize_utils.py (22 changes: 22 additions & 0 deletions)

@@ -132,6 +132,28 @@ def eval(self, x: list | str):
             )
             return None

+    @staticmethod
+    def filter_null_list(lst: list) -> list:
+        new_list = []
+        for l in lst:
+            if isinstance(l, str):
+                if l.lower().strip() not in ["null", "none"]:
+                    new_list.append(l)
+            elif l is None or l != l:  # l != l is the NaN check: NaN never equals itself
+                pass
+            else:
+                new_list.append(l)
+        return new_list
+
+    @staticmethod
+    def filter_null_str(l: str | None) -> str | None:
+        if isinstance(l, str):
+            if l.lower().strip() in ["null", "none"]:
+                return None
+        if l is None or l != l:  # None or NaN (float("nan") == float("nan") is always False)
+            return None
+        return l
+
     @staticmethod
     def simple_country_check(c: str):
         try:
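Usage of the two new helpers as parse_events.py calls them above; note the NaN guard is written as l != l because NaN never compares equal to anything, itself included, so an equality test against float("nan") can never be True:

    from Database.scr.normalize_utils import NormalizeUtils

    NormalizeUtils.filter_null_list(["Ukraine", "null", " NONE ", None, float("nan"), 42])
    # ['Ukraine', 42]

    NormalizeUtils.filter_null_str("  None ")       # None
    NormalizeUtils.filter_null_str("Kakhovka dam")  # 'Kakhovka dam'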