Skip to content

Commit

Permalink
grid
Browse files Browse the repository at this point in the history
  • Loading branch information
DanRunfola committed Feb 18, 2025
1 parent a359aba commit 8b4643c
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 24 deletions.
2 changes: 1 addition & 1 deletion geoBoundaryBuilder/modules/builder_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ def checkChange(self, metaTXT):
#We can't simply do a binary contrast here, as the new meta.txt will have new timestamps,
#so this requires we only load part of each file.
newCSVpath = metaTXT
oldCSVpath = self.targetPath + "geoBoundaries-" + str(self.ISO) + "-" + str(self.ADM) + "-metaData.txt"
oldCSVpath = self.targetPath + "/geoBoundaries-" + str(self.ISO) + "-" + str(self.ADM) + "-metaData.txt"

with open(newCSVpath,'r') as f:
newMetaChunk = f.readlines()[:19]
Expand Down
57 changes: 53 additions & 4 deletions geoBoundaryBuilder/modules/git_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,58 @@ def git_pull():
logging.error(f"Could not log error to database: {db_error}")

def check_git_pull_status():
"""Check if git pull is needed based on timestamp.
"""Check if git pull is needed based on conditions:
1. No tasks are currently set to 'ready'
2. At least 3 hours since last task was processed
3. At least 12 hours since last pull
Returns:
bool: True if a git pull was performed, False otherwise
"""
try:
with connect_to_db() as conn:
with conn.cursor() as cur:
# Select the last git pull timestamp
# Check for ready tasks
cur.execute('SELECT COUNT(*) FROM tasks WHERE status = \'ready\'')
ready_count = cur.fetchone()[0]
if ready_count > 0:
logging.info(f"Found {ready_count} ready tasks. Skipping git pull.")
# Get last successful pull time for status message
cur.execute('SELECT "TIME" FROM status WHERE "STATUS_TYPE" = \'GIT_PULL\'')
last_pull = cur.fetchone()
last_pull_time = last_pull[0] if last_pull else 'Never'
status_msg = f"Git pull skipped due to ongoing tasks; last successful pull was {last_pull_time}"
cur.execute(
'UPDATE status SET "TIME" = %s, "STATUS" = %s WHERE "STATUS_TYPE" = \'GIT_PULL\'',
(datetime.now(), status_msg)
)
conn.commit()
return False

# Check time since last processed task
cur.execute("""
SELECT MAX(status_time)
FROM tasks
WHERE status = 'COMPLETE'
""")
last_task_time = cur.fetchone()[0]
if last_task_time:
time_since_last_task = datetime.now() - last_task_time
if time_since_last_task < timedelta(hours=3):
logging.info(f"Only {time_since_last_task} since last task completion. Skipping git pull.")
# Get last successful pull time for status message
cur.execute('SELECT "TIME" FROM status WHERE "STATUS_TYPE" = \'GIT_PULL\'')
last_pull = cur.fetchone()
last_pull_time = last_pull[0] if last_pull else 'Never'
status_msg = f"Git pull skipped due to recent task activity; last successful pull was {last_pull_time}"
cur.execute(
'UPDATE status SET "TIME" = %s, "STATUS" = %s WHERE "STATUS_TYPE" = \'GIT_PULL\'',
(datetime.now(), status_msg)
)
conn.commit()
return False

# Check time since last git pull
cur.execute('SELECT "TIME" FROM status WHERE "STATUS_TYPE" = \'GIT_PULL\'')
result = cur.fetchone()

Expand All @@ -232,15 +276,20 @@ def check_git_pull_status():
return True

last_pull_time = result[0]
current_time = datetime.now()
time_since_last_pull = current_time - last_pull_time
time_since_last_pull = datetime.now() - last_pull_time

if time_since_last_pull >= timedelta(hours=12):
logging.info(f"Last git pull was {time_since_last_pull} ago. Running git pull.")
git_pull()
return True
else:
logging.info(f"Last git pull was {time_since_last_pull} ago. No action needed.")
status_msg = f"Git pull skipped (too soon); last successful pull was {last_pull_time}"
cur.execute(
'UPDATE status SET "TIME" = %s, "STATUS" = %s WHERE "STATUS_TYPE" = \'GIT_PULL\'',
(datetime.now(), status_msg)
)
conn.commit()
return False
except Exception as e:
logging.error(f"Error checking git pull status: {e}")
Expand Down
17 changes: 0 additions & 17 deletions geoBoundaryBuilder/modules/worker_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,23 +204,6 @@ def create_worker_pod(iso, adm, filesize, taskid):
run_as_user=71032,
run_as_group=9915
),
affinity=client.V1Affinity(
node_affinity=client.V1NodeAffinity(
required_during_scheduling_ignored_during_execution=client.V1NodeSelector(
node_selector_terms=[
client.V1NodeSelectorTerm(
match_expressions=[
client.V1NodeSelectorRequirement(
key="kubernetes.io/hostname",
operator="In",
values=["d3i00.sciclone.wm.edu"]
)
]
)
]
)
)
),
containers=[
client.V1Container(
name="worker-container",
Expand Down
37 changes: 37 additions & 0 deletions geoBoundaryBuilder/monitor/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,43 @@ def get_db_connection():
port=DB_PORT
)

@app.route('/api/worker-grid')
def get_worker_grid():
"""Get status for all ISO/ADM combinations"""
try:
with get_db_connection() as conn:
with conn.cursor() as cur:
# Get all worker statuses
cur.execute("""
SELECT "STATUS_TYPE", "STATUS", "TIME"
FROM status
WHERE "STATUS_TYPE" LIKE '%_WORKER'
""")
rows = cur.fetchall()

# Process into grid format
grid_data = []
for row in rows:
# Parse ISO and ADM from STATUS_TYPE (format: ISO_ADM#_WORKER)
parts = row[0].split('_')
if len(parts) >= 3:
iso = parts[0]
adm = parts[1].replace('ADM', '')
status = row[1]
timestamp = row[2].replace(tzinfo=ZoneInfo('UTC')).astimezone(ZoneInfo('America/New_York')).isoformat() if row[2] else None

grid_data.append({
'iso': iso,
'adm': adm,
'status': status,
'time': timestamp
})

return jsonify({'grid_data': grid_data})
except Exception as e:
logging.error(f"Error getting worker grid data: {e}")
return jsonify({'error': str(e)}), 500

@app.route('/api/stats')
def get_stats():
try:
Expand Down
144 changes: 142 additions & 2 deletions geoBoundaryBuilder/monitor/templates/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,92 @@
text-align: center;
padding: 20px;
}

.worker-grid {
margin-top: 20px;
overflow-x: auto;
margin-bottom: 40px;
}

.worker-grid table {
border-collapse: collapse;
width: 100%;
margin: 0 auto;
}

.worker-grid th {
background-color: #f5f5f5;
padding: 10px;
text-align: center;
border: 1px solid #ddd;
position: sticky;
top: 0;
z-index: 1;
}

.worker-grid td {
padding: 10px;
text-align: center;
border: 1px solid #ddd;
min-width: 50px;
}

.worker-grid td:first-child {
font-weight: bold;
background-color: #f5f5f5;
position: sticky;
left: 0;
z-index: 2;
}

.status-square {
width: 20px;
height: 20px;
margin: 0 auto;
cursor: pointer;
transition: transform 0.2s;
border-radius: 3px;
}

.status-square:hover {
transform: scale(1.2);
box-shadow: 0 0 5px rgba(0,0,0,0.3);
}
</style>
</head>
<body>
<div class="container">
<h1>geoBoundaries Monitor</h1>

<h2>System Status</h2>
<div class="stats-grid">
<div class="stat-card">
<div class="stat-title">Ready Tasks</div>
<div class="stat-value" id="ready-tasks">-</div>
</div>
<div class="stat-card">
<div class="stat-title">Worker Status</div>
<div class="stat-value" id="worker-status">-</div>
</div>

<h2>Worker Status Grid</h2>
<div class="worker-grid">
<table>
<thead>
<tr>
<th>ISO</th>
<th>ADM0</th>
<th>ADM1</th>
<th>ADM2</th>
<th>ADM3</th>
<th>ADM4</th>
<th>ADM5</th>
</tr>
</thead>
<tbody id="worker-grid-body">
</tbody>
</table>
</div>
<div class="stat-card">
<div class="stat-title">Processed in Last 24h</div>
<div class="stat-value" id="processed-24h">-</div>
Expand Down Expand Up @@ -208,6 +284,66 @@ <h2>System Status</h2>
}
}

function getStatusColor(status, timestamp) {
const ageInHours = (new Date() - new Date(timestamp)) / (1000 * 60 * 60);
const baseColor = status.includes('COMPLETE') ? [46, 204, 113] : // Green
status.includes('ERROR') ? [231, 76, 60] : // Red
[241, 196, 15]; // Yellow

// Darken based on age (up to 72 hours)
const darkening = Math.min(ageInHours / 72, 1) * 0.7; // Max 70% darker
return `rgb(${baseColor.map(c => Math.round(c * (1 - darkening))).join(',')})`;
}

function updateWorkerGrid() {
fetch('/api/worker-grid')
.then(response => response.json())
.then(data => {
// Group by ISO
const isoMap = new Map();
data.grid_data.forEach(item => {
if (!isoMap.has(item.iso)) {
isoMap.set(item.iso, new Map());
}
isoMap.get(item.iso).set(item.adm, {
status: item.status,
time: item.time
});
});

// Sort ISOs
const sortedIsos = Array.from(isoMap.keys()).sort();

// Build table rows
const tbody = document.getElementById('worker-grid-body');
tbody.innerHTML = '';

sortedIsos.forEach(iso => {
const row = document.createElement('tr');
row.innerHTML = `<td>${iso}</td>`;

// Add cells for each ADM level
for (let adm = 0; adm <= 5; adm++) {
const cell = document.createElement('td');
const status = isoMap.get(iso).get(adm.toString());

if (status) {
const square = document.createElement('div');
square.className = 'status-square';
square.style.backgroundColor = getStatusColor(status.status, status.time);
square.title = `${status.status}\nLast Updated: ${formatDateEST(status.time)}`;
cell.appendChild(square);
}

row.appendChild(cell);
}

tbody.appendChild(row);
});
})
.catch(error => console.error('Error fetching worker grid:', error));
}

function updateStats() {
fetch('/api/stats')
.then(response => response.json())
Expand Down Expand Up @@ -259,9 +395,13 @@ <h2>System Status</h2>
});
}

// Update stats immediately and then every 5 seconds
// Update stats and grid immediately and then every 5 seconds
updateStats();
setInterval(updateStats, 5000);
updateWorkerGrid();
setInterval(() => {
updateStats();
updateWorkerGrid();
}, 5000);
</script>
</body>
</html>

0 comments on commit 8b4643c

Please sign in to comment.