Skip to content

Commit

Permalink
Updates to worker script, monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
DanRunfola committed Feb 28, 2025
1 parent 38d4bac commit aa6e0b3
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 34 deletions.
211 changes: 187 additions & 24 deletions geoBoundaryBuilder/modules/worker_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
import zipfile
import time
import psycopg2
from datetime import datetime
from datetime import datetime, timezone
import logging
import pandas as pd
import shutil
import json

# Import the builder class
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
Expand Down Expand Up @@ -64,51 +65,210 @@ def update_status_in_db(conn, status_type, status, timestamp):
Update or insert a status record in the worker_status table
"""
try:
# Initialize source_date
source_date = None

# Extract ISO and ADM from status_type
parts = status_type.split('_')
logging.info(f"Status type parts: {parts}")

if len(parts) >= 3 and parts[1].startswith('ADM'):
iso = parts[0]
adm = parts[1].replace('ADM', '')
adm = parts[1]
logging.info(f"Processing ISO: {iso}, ADM: {adm}")

# Try to get build date from metadata file
build_date = None
meta_path = f"/sciclone/geograd/geoBoundaries/database/geoBoundariesDev/releaseData/gbOpen/{iso}/{parts[1]}/geoBoundaries-{iso}-{parts[1]}-metaData.json"
try:
# List possible metadata paths
possible_paths = [
f"/sciclone/geograd/geoBoundaries/database/geoBoundariesDev/releaseData/gbOpen/{iso}/{adm}/geoBoundaries-{iso}-{adm}-metaData.json"
]

for meta_path in possible_paths:
logging.info(f"Checking metadata path: {meta_path}")
if os.path.exists(meta_path):
with open(meta_path, 'r') as f:
meta_data = json.load(f)
build_date_str = meta_data.get('buildDate')
if build_date_str:
# Convert the date string to a timestamp
build_date = datetime.datetime.strptime(build_date_str, '%Y-%m-%d').replace(tzinfo=datetime.timezone.utc)
except Exception as e:
logging.warning(f"Could not read build date from metadata: {e}")
logging.info(f"Found metadata file at: {meta_path}")
try:
# Try different encodings if needed
encodings = ['utf-8', 'latin-1', 'cp1252']
content = None

for encoding in encodings:
try:
with open(meta_path, 'r', encoding=encoding) as f:
content = f.read()
logging.info(f"Successfully read file with encoding: {encoding}")
break
except UnicodeDecodeError:
logging.warning(f"Failed to read file with encoding: {encoding}")
continue

if content is None:
logging.error(f"Could not read file with any encoding: {meta_path}")
continue

logging.info(f"Metadata content: {content[:200]}...") # Log first 200 chars

try:
# Try to parse as JSON
meta_data = json.loads(content)
except json.JSONDecodeError as json_err:
logging.warning(f"JSON parsing error: {json_err}. Trying to read file line by line...")
# Try to extract sourceDataUpdateDate manually using regex
import re
source_date_match = re.search(r'"sourceDataUpdateDate"\s*:\s*"([^"]+)"', content)
if source_date_match:
source_date_str = source_date_match.group(1)
logging.info(f"Extracted sourceDataUpdateDate using regex: {source_date_str}")
try:
# Try different date formats
for fmt in ['%Y-%m-%d', '%Y%m%d', '%Y/%m/%d', '%b %d, %Y', '%a %b %d %H:%M:%S %Y']:
try:
source_date = datetime.strptime(source_date_str, fmt).replace(tzinfo=timezone.utc)
logging.info(f"Successfully parsed source date as: {source_date}")
break
except ValueError:
continue

# If none of the formats worked, try a manual approach for common formats
if not source_date:
try:
# Try to handle format like "Wed Dec 18 10:12:38 2024"
# Format: Weekday Month Day HH:MM:SS Year
parts = source_date_str.split()
if len(parts) == 5: # Weekday Month Day HH:MM:SS Year
weekday, month, day, time_str, year = parts
month_map = {
'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12
}
month_num = month_map.get(month)
if month_num:
hour, minute, second = map(int, time_str.split(':'))
source_date = datetime(int(year), month_num, int(day), hour, minute, second).replace(tzinfo=timezone.utc)
logging.info(f"Successfully parsed source date using manual parsing: {source_date}")
except Exception as e:
logging.warning(f"Error in manual date parsing: {e}")
except Exception as e:
logging.warning(f"Error parsing source date '{source_date_str}': {e}")
continue # Skip the rest of the JSON processing

# First check for sourceDataUpdateDate field directly
if 'sourceDataUpdateDate' in meta_data:
source_date_str = meta_data.get('sourceDataUpdateDate')
logging.info(f"Found sourceDataUpdateDate in metadata: {source_date_str}")
try:
# Try different date formats
for fmt in ['%Y-%m-%d', '%Y%m%d', '%Y/%m/%d', '%b %d, %Y', '%a %b %d %H:%M:%S %Y']:
try:
source_date = datetime.strptime(source_date_str, fmt).replace(tzinfo=timezone.utc)
logging.info(f"Successfully parsed source date as: {source_date}")
break
except ValueError:
continue

# If none of the formats worked, try a manual approach for common formats
if not source_date:
try:
# Try to handle format like "Wed Dec 18 10:12:38 2024"
# Format: Weekday Month Day HH:MM:SS Year
parts = source_date_str.split()
if len(parts) == 5: # Weekday Month Day HH:MM:SS Year
weekday, month, day, time_str, year = parts
month_map = {
'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12
}
month_num = month_map.get(month)
if month_num:
hour, minute, second = map(int, time_str.split(':'))
source_date = datetime(int(year), month_num, int(day), hour, minute, second).replace(tzinfo=timezone.utc)
logging.info(f"Successfully parsed source date using manual parsing: {source_date}")
except Exception as e:
logging.warning(f"Error in manual date parsing: {e}")
except Exception as e:
logging.warning(f"Error parsing source date '{source_date_str}': {e}")

# If sourceDataUpdateDate not found, try other possible date fields
if not source_date:
date_fields = ['sourceDate', 'sourceDataDate', 'lastUpdated', 'date']
for field in date_fields:
source_date_str = meta_data.get(field)
if source_date_str:
logging.info(f"Found source date in field '{field}': {source_date_str}")
try:
# Try different date formats
for fmt in ['%Y-%m-%d', '%Y%m%d', '%Y/%m/%d', '%b %d, %Y', '%a %b %d %H:%M:%S %Y']:
try:
source_date = datetime.strptime(source_date_str, fmt).replace(tzinfo=timezone.utc)
logging.info(f"Successfully parsed source date as: {source_date}")
break
except ValueError:
continue

# If none of the formats worked, try a manual approach for common formats
if not source_date:
try:
# Try to handle format like "Wed Dec 18 10:12:38 2024"
# Format: Weekday Month Day HH:MM:SS Year
parts = source_date_str.split()
if len(parts) == 5: # Weekday Month Day HH:MM:SS Year
weekday, month, day, time_str, year = parts
month_map = {
'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12
}
month_num = month_map.get(month)
if month_num:
hour, minute, second = map(int, time_str.split(':'))
source_date = datetime(int(year), month_num, int(day), hour, minute, second).replace(tzinfo=timezone.utc)
logging.info(f"Successfully parsed source date using manual parsing: {source_date}")
except Exception as e:
logging.warning(f"Error in manual date parsing: {e}")
if source_date:
break
except Exception as e:
logging.warning(f"Error parsing source date '{source_date_str}': {e}")

if source_date:
break # Exit the file loop if we found a date
except Exception as e:
logging.warning(f"Error reading metadata file {meta_path}: {e}")
else:
logging.info(f"Metadata file not found at: {meta_path}")

with conn.cursor() as cur:
# First, try to alter table to add BUILD_DATE column if it doesn't exist
# First, try to alter table to add SOURCE_DATE column if it doesn't exist
try:
cur.execute("""
ALTER TABLE worker_status
ADD COLUMN IF NOT EXISTS "BUILD_DATE" TIMESTAMP WITH TIME ZONE
ADD COLUMN IF NOT EXISTS "SOURCE_DATE" TIMESTAMP WITH TIME ZONE
""")
conn.commit()
except Exception as e:
logging.warning(f"Could not add BUILD_DATE column: {e}")
logging.warning(f"Could not add SOURCE_DATE column: {e}")
conn.rollback()

# Upsert query including BUILD_DATE
# Debug log the values being inserted
logging.info(f"Inserting values - STATUS_TYPE: {status_type}, STATUS: {status}, TIME: {timestamp}, SOURCE_DATE: {source_date}")

# Upsert query including SOURCE_DATE
upsert_query = """
INSERT INTO worker_status ("STATUS_TYPE", "STATUS", "TIME", "BUILD_DATE")
INSERT INTO worker_status ("STATUS_TYPE", "STATUS", "TIME", "SOURCE_DATE")
VALUES (%s, %s, %s, %s)
ON CONFLICT ("STATUS_TYPE")
DO UPDATE SET
"STATUS" = EXCLUDED."STATUS",
"TIME" = EXCLUDED."TIME",
"BUILD_DATE" = EXCLUDED."BUILD_DATE"
"SOURCE_DATE" = EXCLUDED."SOURCE_DATE"
"""
cur.execute(upsert_query, (status_type, status, timestamp, build_date))

# Execute the query and verify the result
cur.execute(upsert_query, (status_type, status, timestamp, source_date))
conn.commit()
logging.info(f"Updated worker_status for {status_type}: {status} (Build Date: {build_date})")

# Verify the update
cur.execute('SELECT "SOURCE_DATE" FROM worker_status WHERE "STATUS_TYPE" = %s', (status_type,))
result = cur.fetchone()
logging.info(f"Verified SOURCE_DATE after update for {status_type}: {result[0] if result else None}")
except Exception as e:
logging.error(f"Error updating worker_status in database: {e}")
conn.rollback()
Expand Down Expand Up @@ -175,9 +335,12 @@ def format_elapsed_time(start_time):
("checkSourceValidity", "VALIDATING_SOURCE"),
("checkBuildTabularMetaData", "BUILDING_METADATA"),
("checkBuildGeometryFiles", "PROCESSING_GEOMETRY"),
("calculateGeomMeta", "CALCULATING_GEOMETRY_META"),
("constructFiles", "CONSTRUCTING_FILES")
("calculateGeomMeta", "CALCULATING_GEOMETRY_META")
]

# Force rebuild by setting changesDetected to True
bnd.changesDetected = True
stages.append(("constructFiles", "CONSTRUCTING_FILES"))

for stage_method, stage_status in stages:
logging.info(f"Running stage: {stage_method}")
Expand Down
4 changes: 2 additions & 2 deletions geoBoundaryBuilder/monitor/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def get_worker_grid():
with conn.cursor() as cur:
# Get all worker statuses
cur.execute("""
SELECT "STATUS_TYPE", "STATUS", "TIME"
SELECT "STATUS_TYPE", "STATUS", "TIME", "SOURCE_DATE"
FROM worker_status
WHERE "STATUS_TYPE" LIKE '%_WORKER'
""")
Expand All @@ -59,7 +59,7 @@ def get_worker_grid():
'adm': adm,
'status': status,
'time': timestamp,
'build_date': row[3] if len(row) > 3 else None
'source_date': row[3].replace(tzinfo=ZoneInfo('UTC')).astimezone(ZoneInfo('America/New_York')).isoformat() if row[3] else None
})

return jsonify({'grid_data': grid_data})
Expand Down
Loading

0 comments on commit aa6e0b3

Please sign in to comment.