Problem Statement
The current DataJoint Python (datajoint-python) approach to job reservation, orchestration, and execution (i.e., autopopulate) faces scalability limitations. While its original design handled job reservation and distribution for parallelization effectively, it falls short when building a comprehensive data platform.
Limitations of the jobs table
The existing jobs table functions more as an error/reserve table than a true jobs queue.
Limited Job Statuses: It primarily records error (failed jobs) and reserved (jobs in progress) states. It lacks crucial statuses such as:
pending/scheduled (jobs not yet started)
success (record of successfully completed jobs and their duration).
Inefficient Job Queue: It doesn't operate as a true jobs queue where workers can efficiently pull tasks.
Each worker must individually call key_source to get a list of jobs, which, while ensuring up-to-date information, strains the database.
Non-Queryable for Status: The table is not easily queryable for overall job status, hindering the development of dashboards, monitoring tools, and reporting.
Limitations of key_source Behavior/Usage
The default key_source (an inner-join of parent tables) is intended to represent all possible jobs for a given table.
Frequent Modification Needed: In practice, the actual set of jobs of interest is often a subset of this, requiring frequent modifications to key_source (e.g., restricting by paramset or other tables).
Local Visibility Only: Modified key_source settings are only visible to the local code executing the pipeline, not globally at the database level. This leads to:
Out-of-sync code and key_source definitions.
Lack of visibility and accessibility via "virtual modules."
The need to install the entire pipeline/codebase to run specific parts, increasing complexity for microservices in a platform like Works.
Performance Bottleneck: (Table.key_source - Table).fetch('KEY') is DataJoint's method for retrieving the job queue and can be an expensive operation, especially when called frequently by multiple workers. This significantly strains the database server, as observed by other users.
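For illustration, here is roughly what this looks like today. The schema, table, and attribute names are hypothetical, but the key_source override and the job query follow the current datajoint-python API:

```python
import datajoint as dj

schema = dj.schema("my_pipeline")       # hypothetical schema name


@schema
class Recording(dj.Manual):             # hypothetical upstream table
    definition = """
    recording_id: int
    """


@schema
class ParamSet(dj.Manual):              # hypothetical parameter table
    definition = """
    paramset_id: int
    """


@schema
class Analysis(dj.Computed):
    definition = """
    -> Recording
    -> ParamSet
    ---
    result: float
    """

    @property
    def key_source(self):
        # Local override: only visible to code that imports this module,
        # not to other clients connecting to the same database.
        return (Recording * ParamSet) & "paramset_id = 1"

    def make(self, key):
        self.insert1(dict(key, result=0.0))


# Each worker recomputes the remaining work itself -- the expensive query
# described above -- before reserving anything:
analysis = Analysis()
pending_keys = (analysis.key_source - analysis).fetch("KEY")
```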
Proposed Solution: New Jobs Table
Step 1: A New JOB2 Table (Name TBD)
A new table, tentatively named JOB2, would be introduced with the following schema:
table_name: varchar(255) - The class name of the table.
key_hash: char(32) - A hash of the job's key.
status: enum('reserved','error','ignore','scheduled','success') - The current status of the job.
key: json - A JSON structure containing the job's key.
status_message: varchar(2000) - e.g., error message if failed.
error_stack: mediumblob - The error stack if the job failed.
timestamp: timestamp - The scheduled time (UTC) for the job to run.
run_duration: float - The run duration in seconds.
run_version: json - Representation of the code/environment version of the run (e.g., git commit hash).
user: varchar(255) - The database user.
host: varchar(255) - The system hostname.
pid: int unsigned - The system process ID.
connection_id: bigint unsigned - The database connection ID.
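For concreteness, a minimal sketch of how such a table could be declared in DataJoint's table-definition syntax; the class name, defaults, and schema name are illustrative, not part of the proposal:

```python
import datajoint as dj

schema = dj.schema("my_pipeline")   # hypothetical schema name


@schema
class Job2(dj.Manual):              # tentative name, mirroring JOB2 above
    definition = """
    table_name: varchar(255)                 # class name of the table
    key_hash: char(32)                       # hash of the job's key
    ---
    status: enum('reserved','error','ignore','scheduled','success')
    key: json                                # the job's key
    status_message='': varchar(2000)         # e.g., error message if failed
    error_stack=null: mediumblob             # error stack if the job failed
    timestamp=CURRENT_TIMESTAMP: timestamp   # scheduled time (UTC)
    run_duration=null: float                 # run duration in seconds
    run_version=null: json                   # code/environment version
    user='': varchar(255)                    # database user
    host='': varchar(255)                    # system hostname
    pid=0: int unsigned                      # system process ID
    connection_id=0: bigint unsigned         # database connection ID
    """
```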
Step 2: Mechanism to "Hydrate"/"Refresh" the JOB2 Table
A new class method, refresh_jobs(), would be introduced for every AutoPopulate table. This method would:
Call the key_source of the table.
Add new "scheduled" jobs to JOB2.
Remove invalid job entries (regardless of status) from JOB2 due to upstream record deletions.
The key challenge here is how and when to trigger refresh_jobs(). If triggered by every populate(reserve_jobs=True) call, it could become a bottleneck due to read/write operations on JOB2 and potential race conditions/deadlocks.
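A rough sketch of what this could look like, written here as a module-level helper standing in for the proposed class method; it assumes the Job2 declaration sketched above and reuses datajoint's existing key_hash utility:

```python
from datajoint.hash import key_hash   # same hash used by the current jobs table


def refresh_jobs(cls):
    """Hypothetical sketch: sync Job2 with the table's key_source."""
    table = cls()
    job2 = Job2()
    source_keys = table.key_source.fetch("KEY")

    # 1. Add new "scheduled" jobs for keys that are neither computed nor tracked yet.
    new_keys = (table.key_source - table).fetch("KEY")
    job2.insert(
        (
            dict(table_name=cls.__name__, key_hash=key_hash(k),
                 status="scheduled", key=k)
            for k in new_keys
        ),
        skip_duplicates=True,
    )

    # 2. Remove entries (regardless of status) whose upstream records were deleted.
    valid_hashes = {key_hash(k) for k in source_keys}
    tracked = (job2 & {"table_name": cls.__name__}).fetch("KEY")
    stale = [k for k in tracked if k["key_hash"] not in valid_hashes]
    if stale:
        (job2 & stale).delete_quick()
```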
Step 3: New/Updated populate() Function
The populate() function would be updated to:
Query JOB2 for a list of "scheduled" jobs.
Call populate1(key) as usual for each job.
Upon success, update the job's status in JOB2 to success and add additional information (e.g., run duration, code version).
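A sketch of the reworked loop, as a hypothetical helper; the real implementation would live inside AutoPopulate.populate() and reuse its existing per-key transaction and make() logic:

```python
import time


def populate_from_jobs(cls, limit=None):
    """Hypothetical sketch of a populate() loop driven by Job2."""
    table = cls()
    job2 = Job2()
    scheduled = (job2 & {"table_name": cls.__name__, "status": "scheduled"}).fetch(
        "KEY", limit=limit
    )
    for job_key in scheduled:
        key = (job2 & job_key).fetch1()["key"]   # the stored primary key
        start = time.time()
        try:
            # Stand-in for the existing per-key populate logic
            # (populate1/make inside a transaction).
            with table.connection.transaction:
                table.make(dict(key))
        except Exception as err:
            job2.update1(dict(job_key, status="error",
                              status_message=str(err)[:2000]))
        else:
            job2.update1(dict(job_key, status="success",
                              run_duration=time.time() - start))
```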
Considerations
refresh_jobs() Frequency and Staleness: How often should refresh_jobs() be called, and what level of staleness in JOB2 is acceptable? A centralized process could refresh jobs for each research project on a schedule (e.g., every 10, 15, or 30 minutes), similar to current worker-manager cron jobs. This would address the performance issues related to key_source that many users have experienced.
refresh_jobs() without Pipeline Code: Should refresh_jobs() be callable without the pipeline code installed (i.e., from a "virtual module")? Yes, to avoid the complexity and expense of requiring full code installation.
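For example, a centralized scheduler could periodically refresh jobs through a virtual module, without the pipeline code installed. The schema and table names below are hypothetical, and refresh_jobs is the sketch from Step 2:

```python
import time
import datajoint as dj

# Hypothetical: attach to the pipeline's schema without installing its codebase.
pipeline = dj.create_virtual_module("pipeline", "my_project_schema")

while True:
    for name in ("Analysis", "Summary"):     # hypothetical computed tables
        refresh_jobs(getattr(pipeline, name))
    time.sleep(15 * 60)                      # e.g., every 15 minutes
```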
Notes
We have considered adopting and integrating with industry-standard workflow orchestration tools such as Airflow, Flyte, or Prefect, and have produced and evaluated multiple working prototypes.
However, we think the added deployment and maintenance burden of those tools is too much for a Python open-source project such as DataJoint: the enhanced features come with significant DevOps requirements.