
DRAFT: Add scheduler worker to create recurring processors #427

Draft: wants to merge 30 commits into master
4ae54ee
basic scheduler
dale-wahl Feb 26, 2024
833ec77
Merge branch 'master' into scheduler
dale-wahl Feb 26, 2024
afe790c
update scheduler view, add functions to update job interval
dale-wahl Feb 26, 2024
c67a728
Merge branch 'master' into scheduler
dale-wahl Feb 27, 2024
5344192
revert .env
dale-wahl Apr 1, 2024
ce8800d
working scheduler!
dale-wahl Apr 1, 2024
a1fe00b
basic scheduler view w/ datasets
dale-wahl Apr 1, 2024
6268673
fix postgres tag
dale-wahl Apr 23, 2024
4153431
update job status in scheduled_jobs table
dale-wahl May 27, 2024
a747677
fix timestamp; end_date needed for last run check; add dataset label
dale-wahl May 27, 2024
51dff84
improve scheduler view
dale-wahl May 27, 2024
bd13685
remove dataset from scheduled_jobs table on delete
dale-wahl May 27, 2024
d2ff74c
scheduler view order by last creation
dale-wahl May 28, 2024
00fcf7d
scheduler views: separate scheduler list from scheduled dataset list
dale-wahl Jun 3, 2024
6c07d98
pagination: add route_args
dale-wahl Jun 4, 2024
5ecfbd2
fix up scheduler header
dale-wahl Jun 5, 2024
f0536e2
Merge branch 'master' into scheduler
dale-wahl Oct 8, 2024
557ca0e
yikes: seconds to days in interval
dale-wahl Oct 8, 2024
7ac0e97
fix up scheduler to use self.modules when creating datasets
dale-wahl Oct 8, 2024
d67f070
get last_update timestamp from log; display in dataset results
dale-wahl Oct 24, 2024
7670699
Merge branch 'finished_at' into scheduler
dale-wahl Oct 24, 2024
7545a55
use finished timestamp in scheduler view
dale-wahl Oct 24, 2024
651b144
ensure key exists
dale-wahl Oct 24, 2024
b8fb1a1
add check and reconnect method to Database class
dale-wahl Nov 19, 2024
39f48ef
build in retries and wait time
dale-wahl Nov 26, 2024
8ddf3f3
Merge branch 'master' into scheduler
dale-wahl Nov 26, 2024
67e9cde
Merge branch 'db_reconnect' into scheduler
dale-wahl Nov 26, 2024
d97a87f
Merge branch 'master' into scheduler
dale-wahl Dec 17, 2024
b4af6bb
Merge branch 'master' into scheduler
dale-wahl Jan 6, 2025
81451ee
Merge branch 'master' into scheduler
dale-wahl Jan 10, 2025
183 changes: 183 additions & 0 deletions backend/workers/scheduler.py
@@ -0,0 +1,183 @@
"""
Schedule processors on a regular basis
"""
import json
from datetime import datetime

from backend.lib.worker import BasicWorker
from common.lib.dataset import DataSet
from common.lib.job import Job


class Scheduler(BasicWorker):
"""
Similar to the manager, this worker schedules other workers to run at regular intervals and maintains the
schedule for the related jobs. If a job does not complete successfully, it is rescheduled according to set
criteria. Unlike a normal job run at intervals, this worker maintains a relationship between multiple jobs
that would otherwise not exist, preserving the one-to-one relationship between jobs, processors, and
datasets.
"""
type = "scheduler"
max_workers = 5 # this seems a bit arbitrary, we can run many and this should only be updating the database and creating other jobs as necessary
work_report = None
details = None

@staticmethod
def ensure_database(db):
"""
Ensure that the database is set up for this worker

job_id: primary key of each scheduled job
scheduler_id: job id of the original scheduler
jobtype: type of job to be run
dataset_id: dataset id for each specific job
status: status of the job
last_run: last time the job was run
details: additional details for the job (potentially different for each job)
"""
# create the table if it doesn't exist
db.execute("CREATE TABLE IF NOT EXISTS scheduled_jobs (job_id int PRIMARY KEY, scheduler_id int NOT NULL, jobtype text NOT NULL, dataset_id text NOT NULL, status text NOT NULL, created_at integer, details jsonb)")

def work(self):
"""
Check previous scheduled jobs and schedule new ones as necessary
"""
# TODO: ensure_database could be called when workers are validated; we could even check for packages there
self.ensure_database(self.db)
# Ensure clean work report
self.work_report = {}

# Get job details
self.details = self.job.details
self.log.debug(f"Scheduler started: {self.details}")

# get this worker's previously scheduled jobs, order by last run
jobs = self.db.fetchall("SELECT * FROM scheduled_jobs where scheduler_id = %s", (self.job.data["id"],))

if not jobs:
# No jobs, schedule first one
self.schedule_job(first=True)
else:
# Check to see if jobs need to be rescheduled and do so
self.reschedule_jobs(jobs)

# Check to see if it is time for a new job and schedule one if necessary
if self.check_schedule(jobs):
self.schedule_job()

if self.check_last_run():
# If last job has been scheduled, all jobs completed and updated, delete this Scheduler job
self.job.finish(delete=True)
else:
self.job.finish()

def check_schedule(self, jobs):
"""
Check if it is time to schedule a new job

:param list jobs: List of jobs
:return bool: Whether to schedule a new job
"""
# Currently main job is scheduled by Manager, so we don't need to check for it
return True

def schedule_job(self, first=False):
"""
Schedule a new job

TODO: How should we handle sensitive data in a Scheduler-type scenario? I feel that we should allow it,
but perhaps show a warning popup? Right now I am looking at storing dataset parameters in the job.details
column, and they may contain sensitive values. I thought I could perhaps get away with using the latest
dataset created by the Scheduler (I am not sure how to do this yet, but I want to be able to update some
parameters, such as dates, if we wanted, say, a rolling start date for a query)

:param bool first: Whether this is the first job
"""
if first:
# Schedule the first job; dataset already exists
dataset = DataSet(key=self.job.data["remote_id"].replace("scheduler-",""), db=self.db, modules=self.modules)

# Dataset processor
processor = dataset.get_own_processor()

# Store necessaries for future datasets
# These may contain sensitive parameters, but those will be needed for future jobs...
given_parameters = dataset.parameters.copy()
all_parameters = processor.get_options(dataset)
parameters = {
param: given_parameters.get(param, all_parameters.get(param, {}).get("default"))
for param in [*all_parameters.keys(), *given_parameters.keys()]
}
self.update_details({
"owner": dataset.creator,
"processor_type": processor.type,
"extension": processor.get_extension(dataset.get_parent()),
"is_private": dataset.is_private,
"parameters": dataset.parameters.copy(),
"label": dataset.get_label(),
"last_dataset": dataset.key
})

else:
# Create new dataset
dataset = DataSet(
parameters=self.details.get("parameters"),
db=self.db,
type=self.details.get("processor_type"),
extension=self.details.get("extension"),
is_private=self.details.get("is_private"),
owner=self.details.get("owner"),
modules=self.modules
)
self.update_details({"last_dataset": dataset.key})

# Create new job; interval is 0 as this scheduler is responsible for scheduling the next job
# Job details contains the scheduler_id for job to update scheduler table on finish
self.queue.add_job(jobtype=self.details.get("processor_type"), remote_id=dataset.key, interval=0, details={"scheduler_id": self.job.data["id"]})
# Get new job w/ ID
new_job = Job.get_by_remote_ID(dataset.key, self.db)

# Link new job to dataset
dataset.link_job(new_job)

# Update scheduler table
self.db.insert("scheduled_jobs", data={
"job_id": new_job.data["id"],
"scheduler_id": self.job.data["id"],
"jobtype": self.details.get("processor_type"),
"dataset_id": dataset.key,
"status": "scheduled",
"created_at": int(datetime.now().timestamp()),
"details": json.dumps(self.details)
})
self.log.info(f"Scheduler created {self.details.get('processor_type')} job: dataset {self.details.get('last_dataset')}")

def update_details(self, details):
"""
Update parameters for the next job. If none exist, create them.

Unsure if I want this, but we could allow users to update the parameters for the next job
"""
self.details.update(details)
self.db.update("jobs", where={"jobtype": self.job.data["jobtype"], "remote_id": self.job.data["remote_id"]},
data={"details": json.dumps(self.details)})

def reschedule_jobs(self, jobs):
"""
Reschedule jobs that need it

:param list jobs: List of jobs
"""
pass

def check_last_run(self):
"""
Check if the last job has been run

:return bool:
"""
end_date = self.details.get("enddate")
if end_date and datetime.now() >= datetime.strptime(end_date, "%Y-%m-%d"):
return True
else:
return False
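Two pieces of logic in the worker above can be read in isolation: the dict comprehension in `schedule_job` that merges user-supplied parameters with processor defaults, and the end-date check in `check_last_run`. A minimal standalone sketch of both (illustrative only; the real worker operates on live `DataSet` and `Job` objects, and the function names here are not part of the PR):

```python
from datetime import datetime

def merge_parameters(given, options):
    # Mirrors the comprehension in Scheduler.schedule_job: user-supplied values
    # win; otherwise fall back to the processor option's "default" value.
    return {
        param: given.get(param, options.get(param, {}).get("default"))
        for param in [*options.keys(), *given.keys()]
    }

def past_end_date(details, now=None):
    # Mirrors Scheduler.check_last_run: stop scheduling once the configured
    # "enddate" (a "YYYY-MM-DD" string) has been reached.
    now = now or datetime.now()
    end_date = details.get("enddate")
    return bool(end_date) and now >= datetime.strptime(end_date, "%Y-%m-%d")
```

Note that `merge_parameters` iterates over both key sets, so parameters the user supplied that are not declared processor options are still carried over to the next run.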
6 changes: 6 additions & 0 deletions common/lib/config_definition.py
@@ -118,6 +118,12 @@
"help": "Can export datasets",
"tooltip": "Allows users to export datasets they own to other 4CAT instances."
},
"privileges.can_schedule_datasources": {
"type": UserInput.OPTION_TOGGLE,
"default": False,
"help": "Can schedule data sources and view scheduler",
"tooltip": "Controls whether users can set intervals at which data sources are collected and view the Scheduler tab."
},
"privileges.admin.can_manage_users": {
"type": UserInput.OPTION_TOGGLE,
"default": False,
31 changes: 30 additions & 1 deletion common/lib/database.py
@@ -49,7 +49,33 @@ def __init__(self, logger, dbname=None, user=None, password=None, host=None, por
if self.log is None:
self.log = logging

self.commit()
def check_and_reconnect(self, tries=3, wait=10):
"""
Check if the connection is closed and reconnect if necessary.

:param int tries: Number of tries to reconnect
:param int wait: Time to wait between tries (first try is immediate)
"""
try:
self.connection.cursor().execute('SELECT 1')
except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
self.log.warning(f"Database connection closed. Reconnecting...\n{e}")
current = 1
while current <= tries:
try:
self.connection = psycopg2.connect(dbname=self.connection.info.dbname,
user=self.connection.info.user,
password=self.connection.info.password,
host=self.connection.info.host,
port=self.connection.info.port,
application_name=self.appname)
self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
break
except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
self.log.warning(f"Database connection closed. Reconnecting...\n{e}")
time.sleep(wait)
current += 1


def query(self, query, replacements=None, cursor=None):
"""
@@ -62,6 +88,8 @@ def query(self, query, replacements=None, cursor=None):
"""
if not cursor:
cursor = self.get_cursor()
else:
self.check_and_reconnect()

self.log.debug("Executing query %s" % self.cursor.mogrify(query, replacements))

@@ -422,4 +450,5 @@ def get_cursor(self):

:return: Cursor
"""
self.check_and_reconnect()
return self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
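The retry loop in `check_and_reconnect` above follows a common probe-then-retry shape. A generic sketch of that shape, with the probe, the reconnect action, and the sleep injected so it can be exercised without a database (all names here are hypothetical stand-ins, and `ConnectionError` stands in for `psycopg2.InterfaceError`/`OperationalError`):

```python
import time

def check_with_retries(probe, reconnect, tries=3, wait=10, sleep=time.sleep):
    # Probe the connection; on failure, retry `reconnect` up to `tries` times,
    # waiting `wait` seconds between attempts (first retry is immediate).
    try:
        probe()
        return True
    except ConnectionError:
        for attempt in range(1, tries + 1):
            try:
                reconnect()
                return True
            except ConnectionError:
                if attempt < tries:
                    sleep(wait)
        return False
```

One small difference from the PR code: this sketch skips the sleep once retries are exhausted, whereas the loop in `check_and_reconnect` also waits after the final failed attempt before giving up.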
22 changes: 20 additions & 2 deletions common/lib/dataset.py
@@ -15,7 +15,7 @@
from common.config_manager import config
from common.lib.job import Job, JobNotFoundException
from common.lib.module_loader import ModuleCollector
from common.lib.helpers import get_software_commit, NullAwareTextIOWrapper, convert_to_int, get_software_version
from common.lib.helpers import get_software_commit, NullAwareTextIOWrapper, convert_to_int, get_software_version, get_last_line
from common.lib.item_mapping import MappedItem, MissingMappedField, DatasetItem
from common.lib.fourcat_module import FourcatModule
from common.lib.exceptions import (ProcessorInterruptedException, DataSetException, DataSetNotFoundException,
@@ -546,6 +546,12 @@ def delete(self, commit=True):
self.db.delete("datasets", where={"key": self.key}, commit=commit)
self.db.delete("datasets_owners", where={"key": self.key}, commit=commit)
self.db.delete("users_favourites", where={"key": self.key}, commit=commit)
#TODO: remove when migrate script ensures scheduled_jobs table exists
scheduler = self.db.fetchone(
"SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = %s AND table_name = %s )",
("public", "scheduled_jobs"))
if scheduler["exists"]:
self.db.delete("scheduled_jobs", where={"dataset_id": self.key}, commit=commit)

# delete from drive
try:
@@ -1099,6 +1105,18 @@ def update_progress(self, progress):
updated = self.db.update("datasets", where={"key": self.data["key"]}, data={"progress": progress})
return updated > 0

def get_last_update(self):
"""
Get the last update time of the dataset. If the dataset is completed, this will be the last status update
(usually "Dataset Completed").

Returns None if there is no last update time.
"""
if self.get_log_path().exists():
return datetime.datetime.strptime(get_last_line(self.get_log_path())[:24], "%c")
else:
return None

def get_progress(self):
"""
Get dataset progress
@@ -1175,7 +1193,7 @@ def get_version_url(self, file):
:param file: File to link within the repository
:return: URL, or an empty string
"""
if not self.data["software_source"]:
if "software_source" not in self.data or not self.data["software_source"]:
return ""

filepath = self.data.get("software_file", "")
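The new `get_last_update` method reads the last log line and parses its first 24 characters with `"%c"`. That slice width assumes the C-locale ctime format, which is exactly 24 characters wide. A sketch of that parsing step on its own (the helper name is illustrative; the real code goes through `get_last_line`):

```python
from datetime import datetime

def parse_log_timestamp(log_line):
    # 4CAT log lines open with a ctime-style timestamp, e.g.
    # "Thu Oct 24 14:05:09 2024" -- 24 characters, parsed with "%c".
    return datetime.strptime(log_line[:24], "%c")
```

Since `"%c"` is locale-dependent in `strptime`, this only works reliably when the process runs under the same locale that wrote the log, which is an implicit assumption of the PR code as well.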
13 changes: 13 additions & 0 deletions common/lib/job.py
@@ -125,6 +125,11 @@ def finish(self, delete=False):

self.is_finished = True

# Update scheduler table, if job was scheduled
if self.details.get("scheduler_id"):
self.db.update("scheduled_jobs", data={"status": "finished"},
where={"job_id": self.data["id"], "scheduler_id": self.details["scheduler_id"]})

def release(self, delay=0, claim_after=0):
"""
Release a job so it may be claimed again
@@ -151,6 +156,14 @@ def is_claimable(self):
"""
return not self.is_claimed and not self.is_finished

def update_interval(self, interval):
"""
Change the interval for a job
"""
self.db.update("jobs", data={"interval": interval},
where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
self.data["interval"] = interval

def get_place_in_queue(self):
"""
Get the place of this job in the queue
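The addition to `Job.finish()` above closes the loop with the scheduler: a job that carries a `scheduler_id` in its details flips its row in `scheduled_jobs` to "finished", which is what `Scheduler.check_last_run` later inspects. An in-memory sketch of that handoff (dicts stand in for database rows; the real code issues an UPDATE via the Database wrapper):

```python
def mark_scheduled_job_finished(job, scheduled_jobs):
    # If the job was created by a scheduler, record its completion in the
    # matching scheduled_jobs row; otherwise do nothing.
    scheduler_id = job.get("details", {}).get("scheduler_id")
    if scheduler_id is None:
        return
    for row in scheduled_jobs:
        if row["job_id"] == job["id"] and row["scheduler_id"] == scheduler_id:
            row["status"] = "finished"
```

Matching on both `job_id` and `scheduler_id`, as the PR does, keeps one scheduler's bookkeeping from touching rows created by another.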
1 change: 1 addition & 0 deletions webtool/__init__.py
@@ -112,6 +112,7 @@
import webtool.views.views_user
import webtool.views.views_dataset
import webtool.views.views_misc
import webtool.views.views_scheduler
import webtool.views.api_explorer
import webtool.views.api_standalone
import webtool.views.api_tool
23 changes: 15 additions & 8 deletions webtool/templates/components/result-details.html
@@ -76,14 +76,21 @@ <h2 class="blocktitle{% if current_user.is_authenticated and (__user_config("pri
{% endif %}

<div class="onequarter">
<dt>Created</dt>
<dd>
{% if dataset.parameters.original_timestamp %}
<i class="fa fa-file-import tooltip-trigger" aria-hidden="true" aria-controls="tooltip-{{ dataset.key }}-imported"></i> <span class="sr-only">This dataset was imported and was originally created on {{ dataset.parameters.original_timestamp|datetime(fmt="%d %B %Y, %H:%M")|safe }}.</span>
<span role="tooltip" aria-hidden="true" id="tooltip-{{ dataset.key }}-imported">This dataset was imported and was originally created on {{ dataset.parameters.original_timestamp|datetime(fmt="%d %B %Y, %H:%M")|safe }}.</span>
{% endif %}
{{ dataset.timestamp|datetime(fmt="%d %B %Y, %H:%M")|safe }}
</dd>
{% if dataset.is_finished() and dataset.get_last_update() %}
<dt>Finished</dt>
<dd>
{{ dataset.get_last_update().timestamp()|int|datetime(fmt="%d %B %Y, %H:%M")|safe }}
</dd>
{% else %}
<dt>Created</dt>
<dd>
{% if dataset.parameters.original_timestamp %}
<i class="fa fa-file-import tooltip-trigger" aria-hidden="true" aria-controls="tooltip-{{ dataset.key }}-imported"></i> <span class="sr-only">This dataset was imported and was originally created on {{ dataset.parameters.original_timestamp|datetime(fmt="%d %B %Y, %H:%M")|safe }}.</span>
<span role="tooltip" aria-hidden="true" id="tooltip-{{ dataset.key }}-imported">This dataset was imported and was originally created on {{ dataset.parameters.original_timestamp|datetime(fmt="%d %B %Y, %H:%M")|safe }}.</span>
{% endif %}
{{ dataset.timestamp|datetime(fmt="%d %B %Y, %H:%M")|safe }}
</dd>
{% endif %}
</div>

<div class="threequarters dataset-owner-list">
19 changes: 19 additions & 0 deletions webtool/templates/create-dataset.html
@@ -73,6 +73,25 @@ <h2><span>Create new dataset</span></h2>
<p role="tooltip" id="tooltip-dataset-email">This will only function if your username is your email address.</p>
</div>
{% endif %}
{% if __user_config("privileges.can_schedule_datasources") %}
<div class="form-element">
<label for="data-schedule-collection">Schedule repeat collection:</label>
<div class="filter-parameters">
<label><input type="checkbox" name="schedule-collection" id="data-schedule-collection"></label>
<label>Number of days between collection:<input id="data-schedule-interval" type="number" name="schedule-interval"></label>
<button class="tooltip-trigger" aria-controls="tooltip-schedule-interval" aria-label="Extended help for option">?</button>
</div>
<p role="tooltip" id="tooltip-schedule-interval">Note: you will need to use the Scheduler view to stop continued collection.</p>
</div>
<div class="form-element">
<label for="data-schedule-enddate">Until:</label>
<div class="filter-parameters">
<label>End after:<input id="data-schedule-enddate" type="date" name="schedule-enddate"></label>
<button class="tooltip-trigger" aria-controls="tooltip-schedule-enddate" aria-label="Extended help for option">?</button>
</div>
<p role="tooltip" id="tooltip-schedule-enddate">No additional jobs will be collected after this date.</p>
</div>
{% endif %}
<div class="form-element dataset-labeling">
<label for="dataset-label">Dataset name:</label>
<input id="dataset-label" name="label">
3 changes: 3 additions & 0 deletions webtool/templates/layout.html
@@ -45,6 +45,9 @@ <h1>
{% endif %}
<li{% if navigation.current == "dataset" %} class="current"{% endif %}><a href="{{ url_for('show_results') }}">Datasets</a></li>
<li{% if navigation.current == "datasources" %} class="current"{% endif %}><a href="{{ url_for('data_overview') }}">Data sources</a></li>
{% if __user_config("privileges.can_schedule_datasources") %}
<li{% if navigation.current == "scheduler" %} class="current"{% endif %}><a href="{{ url_for('show_scheduler') }}">Scheduler</a></li>
{% endif %}
{% if current_user.is_authenticated and not current_user.is_special and __user_config("privileges.can_create_api_token") %}
<li{% if navigation.current == "api-access" %} class="current"{% endif %}><a href="{{ url_for('show_access_tokens') }}">API Access</a></li>
{% endif %}