diff --git a/docs/source/overview/concepts.rst b/docs/source/overview/concepts.rst index f2905b640f..4b559e24c6 100644 --- a/docs/source/overview/concepts.rst +++ b/docs/source/overview/concepts.rst @@ -92,6 +92,6 @@ After registering ``MnistImageClassifier`` function, you can call the function i AI-Centric Query Optimization ----------------------------- -EvaDB optimizes the AI queries to save money spent on running models and reduce query execution time. It contains a novel `Cascades-style query optimizer `__ tailored for AI queries. +EvaDB optimizes the AI queries to save money spent on running models and reduce query execution time. It contains a novel `Cascades-style query optimizer `__ tailored for AI queries. -Query optimization has powered SQL database systems for several decades. It is the bridge that connects the declarative query language to efficient query execution on hardware. EvaDB accelerates AI queries using a collection of optimizations detailed in the :ref:`optimizations` page. \ No newline at end of file +Query optimization has powered SQL database systems for several decades. It is the bridge that connects the declarative query language to efficient query execution on hardware. EvaDB accelerates AI queries using a collection of optimizations detailed in the :ref:`optimizations` page. diff --git a/docs/source/reference/ai/model-forecasting.rst b/docs/source/reference/ai/model-forecasting.rst index 1d97ffa8b9..b131f30875 100644 --- a/docs/source/reference/ai/model-forecasting.rst +++ b/docs/source/reference/ai/model-forecasting.rst @@ -58,7 +58,7 @@ EvaDB's default forecast framework is `statsforecast `_ to learn details about these models. If LIBRARY is `neuralforecast`, we can select one of NHITS or NBEATS. The default is NBEATS. Check `NBEATS docs `_ for details. + - If LIBRARY is `statsforecast`, we can select one of ARIMA, ting, ETS, Theta. The default is ARIMA. Check `Automatic Forecasting `_ to learn details about these models. 
If LIBRARY is `neuralforecast`, we can select one of NHITS or NBEATS. The default is NBEATS. Check `NBEATS docs `_ for details. * - AUTO (str, default: 'T') - If set to 'T', it enables automatic hyperparameter optimization. Must be set to 'T' for `statsforecast` library. One may set this parameter to `false` if LIBRARY is `neuralforecast` for faster (but less reliable) results. * - CONF (int, default: 90) @@ -103,4 +103,3 @@ Below is an example query with `neuralforecast` with `trend` column as exogenous LIBRARY 'neuralforecast' AUTO 'f' FREQUENCY 'M'; - diff --git a/docs/source/reference/databases/github.rst b/docs/source/reference/databases/github.rst index 71cc9e546b..14aaa9fd8e 100644 --- a/docs/source/reference/databases/github.rst +++ b/docs/source/reference/databases/github.rst @@ -19,7 +19,7 @@ Required: Optional: -* ``github_token`` is not required for public repositories. However, the rate limit is lower without a valid github_token. Check the `Rate limits page `_ to learn more about how to check your rate limit status. Check `Managing your personal access tokens page `_ to learn how to create personal access tokens. +* ``github_token`` is not required for public repositories. However, the rate limit is lower without a valid github_token. Check the `Rate limits page `_ to learn more about how to check your rate limit status. Check `Managing your personal access tokens page `_ to learn how to create personal access tokens. Create Connection ----------------- diff --git a/evadb/catalog/catalog_manager.py b/evadb/catalog/catalog_manager.py index 20c50c9dfb..8482bf1bd6 100644 --- a/evadb/catalog/catalog_manager.py +++ b/evadb/catalog/catalog_manager.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+import datetime import shutil from pathlib import Path from typing import Any, List @@ -39,6 +40,8 @@ FunctionIOCatalogEntry, FunctionMetadataCatalogEntry, IndexCatalogEntry, + JobCatalogEntry, + JobHistoryCatalogEntry, TableCatalogEntry, drop_all_tables_except_catalog, init_db, @@ -61,6 +64,8 @@ FunctionMetadataCatalogService, ) from evadb.catalog.services.index_catalog_service import IndexCatalogService +from evadb.catalog.services.job_catalog_service import JobCatalogService +from evadb.catalog.services.job_history_catalog_service import JobHistoryCatalogService from evadb.catalog.services.table_catalog_service import TableCatalogService from evadb.catalog.sql_config import IDENTIFIER_COLUMN, SQLConfig from evadb.expression.function_expression import FunctionExpression @@ -85,6 +90,10 @@ def __init__(self, db_uri: str): self._config_catalog_service = ConfigurationCatalogService( self._sql_config.session ) + self._job_catalog_service = JobCatalogService(self._sql_config.session) + self._job_history_catalog_service = JobHistoryCatalogService( + self._sql_config.session + ) self._table_catalog_service = TableCatalogService(self._sql_config.session) self._column_service = ColumnCatalogService(self._sql_config.session) self._function_service = FunctionCatalogService(self._sql_config.session) @@ -215,6 +224,137 @@ def check_native_table_exists(self, table_name: str, database_name: str): return True + "Job catalog services" + + def insert_job_catalog_entry( + self, + name: str, + queries: str, + start_time: datetime, + end_time: datetime, + repeat_interval: int, + active: bool, + next_schedule_run: datetime, + ) -> JobCatalogEntry: + """A new entry is persisted in the job catalog. 
+ + Args: + name: job name + queries: job's queries + start_time: job start time + end_time: job end time + repeat_interval: job repeat interval + active: job status + next_schedule_run: next run time as per schedule + """ + job_entry = self._job_catalog_service.insert_entry( + name, + queries, + start_time, + end_time, + repeat_interval, + active, + next_schedule_run, + ) + + return job_entry + + def get_job_catalog_entry(self, job_name: str) -> JobCatalogEntry: + """ + Returns the job catalog entry for the given database_name + Arguments: + job_name (str): name of the job + + Returns: + JobCatalogEntry + """ + + table_entry = self._job_catalog_service.get_entry_by_name(job_name) + + return table_entry + + def drop_job_catalog_entry(self, job_entry: JobCatalogEntry) -> bool: + """ + This method deletes the job from catalog. + + Arguments: + job_entry: job catalog entry to remove + + Returns: + True if successfully deleted else False + """ + return self._job_catalog_service.delete_entry(job_entry) + + def get_next_executable_job(self, only_past_jobs: bool = False) -> JobCatalogEntry: + """Get the oldest job that is ready to be triggered by trigger time + Arguments: + only_past_jobs: boolean flag to denote if only jobs with trigger time in + past should be considered + Returns: + Returns the first job to be triggered + """ + return self._job_catalog_service.get_next_executable_job(only_past_jobs) + + def update_job_catalog_entry( + self, job_name: str, next_scheduled_run: datetime, active: bool + ): + """Update the next_scheduled_run and active column as per the provided values + Arguments: + job_name (str): job which should be updated + + next_run_time (datetime): the next trigger time for the job + + active (bool): the active status for the job + """ + self._job_catalog_service.update_next_scheduled_run( + job_name, next_scheduled_run, active + ) + + "Job history catalog services" + + def insert_job_history_catalog_entry( + self, + job_id: str, + job_name: str, + 
execution_start_time: datetime, + execution_end_time: datetime, + ) -> JobHistoryCatalogEntry: + """A new entry is persisted in the job history catalog. + + Args: + job_id: job id for the execution entry + job_name: job name for the execution entry + execution_start_time: job execution start time + execution_end_time: job execution end time + """ + job_history_entry = self._job_history_catalog_service.insert_entry( + job_id, job_name, execution_start_time, execution_end_time + ) + + return job_history_entry + + def get_job_history_by_job_id(self, job_id: int) -> List[JobHistoryCatalogEntry]: + """Returns all the entries present for this job_id in the history. + + Args: + job_id: the id of job whose history should be fetched + """ + return self._job_history_catalog_service.get_entry_by_job_id(job_id) + + def update_job_history_end_time( + self, job_id: int, execution_start_time: datetime, execution_end_time: datetime + ) -> List[JobHistoryCatalogEntry]: + """Updates the execution_end_time for this job history matching job_id and execution_start_time. + + Args: + job_id: id of the job whose history entry which should be updated + execution_start_time: the start time for the job history entry + execution_end_time: the end time for the job history entry + """ + return self._job_history_catalog_service.update_entry_end_time( + job_id, execution_start_time, execution_end_time + ) + "Table catalog services" def insert_table_catalog_entry( diff --git a/evadb/catalog/models/job_catalog.py b/evadb/catalog/models/job_catalog.py new file mode 100644 index 0000000000..07a66f622b --- /dev/null +++ b/evadb/catalog/models/job_catalog.py @@ -0,0 +1,92 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import json + +from sqlalchemy import Boolean, Column, DateTime, Index, Integer, String +from sqlalchemy.orm import relationship + +from evadb.catalog.models.base_model import BaseModel +from evadb.catalog.models.utils import JobCatalogEntry + + +class JobCatalog(BaseModel): + """The `JobCatalog` catalog stores information about all the created Jobs. + `_row_id:` an autogenerated unique identifier. + `_name:` the job name. + `_queries:` the queries to run as part of this job + `_start_time:` the job's start time + `_end_time:` the job's end time + `_repeat_interval:` the job's repeat interval + `_repeat_period:` the job's repeat period + `_active:` is the job active/deleted + `_next_scheduled_run:` the next trigger time for the job as per the schedule + `_created_at:` entry creation time + `_updated_at:` entry last update time + """ + + __tablename__ = "job_catalog" + + _name = Column("name", String(100), unique=True) + _queries = Column("queries", String, nullable=False) + _start_time = Column("start_time", DateTime, default=datetime.datetime.now) + _end_time = Column("end_ts", DateTime) + _repeat_interval = Column("repeat_interval", Integer) + _active = Column("active", Boolean, default=True) + _next_scheduled_run = Column("next_scheduled_run", DateTime) + + _created_at = Column("created_at", DateTime, default=datetime.datetime.now) + _updated_at = Column( + "updated_at", + DateTime, + default=datetime.datetime.now, + onupdate=datetime.datetime.now, + ) + + _next_run_index = Index("_next_run_index", _next_scheduled_run) + 
_job_history_catalog = relationship("JobHistoryCatalog", cascade="all, delete") + + def __init__( + self, + name: str, + queries: str, + start_time: datetime, + end_time: datetime, + repeat_interval: Integer, + active: bool, + next_schedule_run: datetime, + ): + self._name = name + self._queries = queries + self._start_time = start_time + self._end_time = end_time + self._repeat_interval = repeat_interval + self._active = active + self._next_scheduled_run = next_schedule_run + + def as_dataclass(self) -> "JobCatalogEntry": + return JobCatalogEntry( + row_id=self._row_id, + name=self._name, + queries=json.loads(self._queries), + start_time=self._start_time, + end_time=self._end_time, + repeat_interval=self._repeat_interval, + active=self._active, + next_scheduled_run=self._next_scheduled_run, + created_at=self._created_at, + updated_at=self._updated_at, + ) diff --git a/evadb/catalog/models/job_history_catalog.py b/evadb/catalog/models/job_history_catalog.py new file mode 100644 index 0000000000..c6c21ba3f2 --- /dev/null +++ b/evadb/catalog/models/job_history_catalog.py @@ -0,0 +1,73 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import datetime + +from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, UniqueConstraint + +from evadb.catalog.models.base_model import BaseModel +from evadb.catalog.models.utils import JobHistoryCatalogEntry + + +class JobHistoryCatalog(BaseModel): + """The `JobHistoryCatalog` stores the execution history of jobs . + `_row_id:` an autogenerated unique identifier. + `_job_id:` job id. + `_job_name:` job name. + `_execution_start_time:` start time of this run + `_execution_end_time:` end time for this run + `_created_at:` entry creation time + `_updated_at:` entry last update time + """ + + __tablename__ = "job_history_catalog" + + _job_id = Column( + "job_id", Integer, ForeignKey("job_catalog._row_id", ondelete="CASCADE") + ) + _job_name = Column("job_name", String(100)) + _execution_start_time = Column("execution_start_time", DateTime) + _execution_end_time = Column("execution_end_time", DateTime) + _created_at = Column("created_at", DateTime, default=datetime.datetime.now) + _updated_at = Column( + "updated_at", + DateTime, + default=datetime.datetime.now, + onupdate=datetime.datetime.now, + ) + + __table_args__ = (UniqueConstraint("job_id", "execution_start_time"), {}) + + def __init__( + self, + job_id: int, + job_name: str, + execution_start_time: datetime, + execution_end_time: datetime, + ): + self._job_id = job_id + self._job_name = job_name + self._execution_start_time = execution_start_time + self._execution_end_time = execution_end_time + + def as_dataclass(self) -> "JobHistoryCatalogEntry": + return JobHistoryCatalogEntry( + row_id=self._row_id, + job_id=self._job_id, + job_name=self._job_name, + execution_start_time=self._execution_start_time, + execution_end_time=self._execution_end_time, + created_at=self._created_at, + updated_at=self._updated_at, + ) diff --git a/evadb/catalog/models/utils.py b/evadb/catalog/models/utils.py index 5da3a2eef5..2c2271f4bc 100644 --- a/evadb/catalog/models/utils.py +++ b/evadb/catalog/models/utils.py 
@@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import contextlib +import datetime import json from dataclasses import dataclass, field from typing import List, Tuple @@ -275,3 +276,57 @@ def display_format(self): "key": self.key, "value": self.value, } + + +@dataclass(unsafe_hash=True) +class JobCatalogEntry: + """Dataclass representing an entry in the `JobCatalog`.""" + + name: str + queries: list + start_time: datetime + end_time: datetime + repeat_interval: int + active: bool + next_scheduled_run: datetime + created_at: datetime + updated_at: datetime + row_id: int = None + + def display_format(self): + return { + "row_id": self.row_id, + "name": self.name, + "queries": self.queries, + "start_time": self.start_time, + "end_time": self.end_time, + "repeat_interval": self.repeat_interval, + "active": self.active, + "next_schedule_run": self.next_scheduled_run, + "created_at": self.created_at, + "updated_at": self.updated_at, + } + + +@dataclass(unsafe_hash=True) +class JobHistoryCatalogEntry: + """Dataclass representing an entry in the `JobHistoryCatalog`.""" + + job_id: int + job_name: str + execution_start_time: datetime + execution_end_time: datetime + created_at: datetime + updated_at: datetime + row_id: int = None + + def display_format(self): + return { + "row_id": self.row_id, + "job_id": self.job_id, + "job_name": self.job_name, + "execution_start_time": self.execution_start_time, + "execution_end_time": self.execution_end_time, + "created_at": self.created_at, + "updated_at": self.updated_at, + } diff --git a/evadb/catalog/services/job_catalog_service.py b/evadb/catalog/services/job_catalog_service.py new file mode 100644 index 0000000000..1c8ad554b9 --- /dev/null +++ b/evadb/catalog/services/job_catalog_service.py @@ -0,0 +1,163 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except
in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import json + +from sqlalchemy import and_, true +from sqlalchemy.orm import Session +from sqlalchemy.sql.expression import select + +from evadb.catalog.models.job_catalog import JobCatalog +from evadb.catalog.models.utils import JobCatalogEntry +from evadb.catalog.services.base_service import BaseService +from evadb.utils.errors import CatalogError +from evadb.utils.logging_manager import logger + + +class JobCatalogService(BaseService): + def __init__(self, db_session: Session): + super().__init__(JobCatalog, db_session) + + def insert_entry( + self, + name: str, + queries: list, + start_time: datetime, + end_time: datetime, + repeat_interval: int, + active: bool, + next_schedule_run: datetime, + ) -> JobCatalogEntry: + try: + job_catalog_obj = self.model( + name=name, + queries=json.dumps(queries), + start_time=start_time, + end_time=end_time, + repeat_interval=repeat_interval, + active=active, + next_schedule_run=next_schedule_run, + ) + job_catalog_obj = job_catalog_obj.save(self.session) + + except Exception as e: + logger.exception( + f"Failed to insert entry into job catalog with exception {str(e)}" + ) + raise CatalogError(e) + + return job_catalog_obj.as_dataclass() + + def get_entry_by_name(self, job_name: str) -> JobCatalogEntry: + """ + Get the job catalog entry with given job name. 
+ Arguments: + job_name (str): Job name + Returns: + JobCatalogEntry - catalog entry for given job name + """ + entry = self.session.execute( + select(self.model).filter(self.model._name == job_name) + ).scalar_one_or_none() + if entry: + return entry.as_dataclass() + return entry + + def delete_entry(self, job_entry: JobCatalogEntry): + """Delete Job from the catalog + Arguments: + job (JobCatalogEntry): job to delete + Returns: + True if successfully removed else false + """ + try: + job_catalog_obj = self.session.execute( + select(self.model).filter(self.model._row_id == job_entry.row_id) + ).scalar_one_or_none() + job_catalog_obj.delete(self.session) + return True + except Exception as e: + err_msg = f"Delete Job failed for {job_entry} with error {str(e)}." + logger.exception(err_msg) + raise CatalogError(err_msg) + + def get_all_overdue_jobs(self) -> list: + """Get the list of jobs that are overdue to be triggered + Arguments: + None + Returns: + Returns the list of all active overdue jobs + """ + entries = ( + self.session.execute( + select(self.model).filter( + and_( + self.model._next_scheduled_run <= datetime.datetime.now(), + self.model._active == true(), + ) + ) + ) + .scalars() + .all() + ) + entries = [row.as_dataclass() for row in entries] + return entries + + def get_next_executable_job(self, only_past_jobs: bool) -> JobCatalogEntry: + """Get the oldest job that is ready to be triggered by trigger time + Arguments: + only_past_jobs (bool): boolean flag to denote if only jobs with trigger time in + past should be considered + Returns: + Returns the first job to be triggered + """ + entry = self.session.execute( + select(self.model) + .filter( + and_( + self.model._next_scheduled_run <= datetime.datetime.now(), + self.model._active == true(), + ) + if only_past_jobs + else self.model._active == true() + ) + .order_by(self.model._next_scheduled_run.asc()) + .limit(1) + ).scalar_one_or_none() + if entry: + return entry.as_dataclass() + return entry + + 
def update_next_scheduled_run( + self, job_name: str, next_scheduled_run: datetime, active: bool + ): + """Update the next_scheduled_run and active column as per the provided values + Arguments: + job_name (str): job which should be updated + + next_run_time (datetime): the next trigger time for the job + + active (bool): the active status for the job + Returns: + void + """ + job = ( + self.session.query(self.model).filter(self.model._name == job_name).first() + ) + if job: + job._next_scheduled_run = next_scheduled_run + job._active = active + self.session.commit() diff --git a/evadb/catalog/services/job_history_catalog_service.py b/evadb/catalog/services/job_history_catalog_service.py new file mode 100644 index 0000000000..7082c3f3e4 --- /dev/null +++ b/evadb/catalog/services/job_history_catalog_service.py @@ -0,0 +1,101 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import datetime +from typing import List + +from sqlalchemy import and_ +from sqlalchemy.orm import Session +from sqlalchemy.sql.expression import select + +from evadb.catalog.models.job_history_catalog import JobHistoryCatalog +from evadb.catalog.models.utils import JobHistoryCatalogEntry +from evadb.catalog.services.base_service import BaseService +from evadb.utils.errors import CatalogError +from evadb.utils.logging_manager import logger + + +class JobHistoryCatalogService(BaseService): + def __init__(self, db_session: Session): + super().__init__(JobHistoryCatalog, db_session) + + def insert_entry( + self, + job_id: str, + job_name: str, + execution_start_time: datetime, + execution_end_time: datetime, + ) -> JobHistoryCatalogEntry: + try: + job_history_catalog_obj = self.model( + job_id=job_id, + job_name=job_name, + execution_start_time=execution_start_time, + execution_end_time=execution_end_time, + ) + job_history_catalog_obj = job_history_catalog_obj.save(self.session) + + except Exception as e: + logger.exception( + f"Failed to insert entry into job history catalog with exception {str(e)}" + ) + raise CatalogError(e) + + return job_history_catalog_obj.as_dataclass() + + def get_entry_by_job_id(self, job_id: int) -> List[JobHistoryCatalogEntry]: + """ + Get all the job history catalog entry with given job id. 
+ Arguments: + job_id (int): Job id + Returns: + list[JobHistoryCatalogEntry]: all history catalog entries for given job id + """ + entries = ( + self.session.execute( + select(self.model).filter(self.model._job_id == job_id) + ) + .scalars() + .all() + ) + entries = [row.as_dataclass() for row in entries] + return entries + + def update_entry_end_time( + self, job_id: int, execution_start_time: datetime, execution_end_time: datetime + ): + """Update the execution_end_time of the entry as per the provided values + Arguments: + job_id (int): id of the job whose history entry which should be updated + + execution_start_time (datetime): the start time for the job history entry + + execution_end_time (datetime): the end time for the job history entry + Returns: + void + """ + job_history_entry = ( + self.session.query(self.model) + .filter( + and_( + self.model._job_id == job_id, + self.model._execution_start_time == execution_start_time, + ) + ) + .first() + ) + if job_history_entry: + job_history_entry._execution_end_time = execution_end_time + self.session.commit() diff --git a/evadb/catalog/sql_config.py b/evadb/catalog/sql_config.py index 0a460a899d..fed6630f3e 100644 --- a/evadb/catalog/sql_config.py +++ b/evadb/catalog/sql_config.py @@ -38,6 +38,8 @@ "functionio_catalog", "function_cost_catalog", "function_metadata_catalog", + "job_catalog", + "job_history_catalog", ] # Add all keywords that are restricted by EvaDB diff --git a/evadb/executor/create_function_executor.py b/evadb/executor/create_function_executor.py index be11370737..e14b9cde7d 100644 --- a/evadb/executor/create_function_executor.py +++ b/evadb/executor/create_function_executor.py @@ -18,6 +18,7 @@ import os import pickle import re +import time from pathlib import Path from typing import Dict, List @@ -56,6 +57,10 @@ from evadb.utils.logging_manager import logger +def root_mean_squared_error(y_true, y_pred): + return np.sqrt(np.mean(np.square(y_pred - y_true))) + + # From 
https://stackoverflow.com/a/34333710 @contextlib.contextmanager def set_env(**environ): @@ -127,6 +132,7 @@ def handle_ludwig_function(self): aggregated_batch.drop_column_alias() arg_map = {arg.key: arg.value for arg in self.node.metadata} + start_time = int(time.time()) auto_train_results = auto_train( dataset=aggregated_batch.frames, target=arg_map["predict"], @@ -136,11 +142,13 @@ def handle_ludwig_function(self): "tmp_dir" ), ) + train_time = int(time.time()) - start_time model_path = os.path.join( self.db.catalog().get_configuration_catalog_value("model_dir"), self.node.name, ) auto_train_results.best_model.save(model_path) + best_score = auto_train_results.experiment_analysis.best_result["metric_score"] self.node.metadata.append( FunctionMetadataCatalogEntry("model_path", model_path) ) @@ -153,6 +161,8 @@ def handle_ludwig_function(self): self.node.function_type, io_list, self.node.metadata, + best_score, + train_time, ) def handle_sklearn_function(self): @@ -180,7 +190,10 @@ def handle_sklearn_function(self): model = LinearRegression() Y = aggregated_batch.frames[arg_map["predict"]] aggregated_batch.frames.drop([arg_map["predict"]], axis=1, inplace=True) + start_time = int(time.time()) model.fit(X=aggregated_batch.frames, y=Y) + train_time = int(time.time()) - start_time + score = model.score(X=aggregated_batch.frames, y=Y) model_path = os.path.join( self.db.catalog().get_configuration_catalog_value("model_dir"), self.node.name, @@ -202,6 +215,8 @@ def handle_sklearn_function(self): self.node.function_type, io_list, self.node.metadata, + score, + train_time, ) def convert_to_numeric(self, x): @@ -243,9 +258,11 @@ def handle_xgboost_function(self): "estimator_list": ["xgboost"], "task": arg_map.get("task", DEFAULT_XGBOOST_TASK), } + start_time = int(time.time()) model.fit( dataframe=aggregated_batch.frames, label=arg_map["predict"], **settings ) + train_time = int(time.time()) - start_time model_path = os.path.join( 
self.db.catalog().get_configuration_catalog_value("model_dir"), self.node.name, @@ -262,7 +279,6 @@ def handle_xgboost_function(self): impl_path = Path(f"{self.function_dir}/xgboost.py").absolute().as_posix() io_list = self._resolve_function_io(None) best_score = model.best_loss - train_time = model.best_config_train_time return ( self.node.name, impl_path, @@ -408,7 +424,6 @@ def handle_forecasting_function(self): AutoNHITS, AutoPatchTST, AutoTFT, - AutoTimesNet, ) # from neuralforecast.auto import AutoAutoformer as AutoAFormer @@ -421,7 +436,6 @@ def handle_forecasting_function(self): FEDformer, Informer, PatchTST, - TimesNet, ) # from neuralforecast.models import Autoformer as AFormer @@ -441,8 +455,6 @@ def handle_forecasting_function(self): # "AutoAFormer": AutoAFormer, "Informer": Informer, "AutoInformer": AutoInformer, - "TimesNet": TimesNet, - "AutoTimesNet": AutoTimesNet, "TFT": TFT, "AutoTFT": AutoTFT, } @@ -590,12 +602,11 @@ def get_optuna_config(trial): crossvalidation_df.unique_id == uid ] rmses.append( - mean_squared_error( + root_mean_squared_error( crossvalidation_df_here.y, crossvalidation_df_here[ arg_map["model"] + "-median" ], - squared=False, ) / np.mean(crossvalidation_df_here.y) ) @@ -631,10 +642,9 @@ def get_optuna_config(trial): crossvalidation_df.unique_id == uid ] rmses.append( - mean_squared_error( + root_mean_squared_error( crossvalidation_df_here.y, crossvalidation_df_here[arg_map["model"]], - squared=False, ) / np.mean(crossvalidation_df_here.y) ) @@ -646,8 +656,6 @@ def get_optuna_config(trial): model_path = os.path.join(model_dir, existing_model_files[-1]) io_list = self._resolve_function_io(None) data["ds"] = data.ds.astype(str) - last_ds = list(data["ds"])[-2 * horizon :] - last_y = list(data["y"])[-2 * horizon :] metadata_here = [ FunctionMetadataCatalogEntry("model_name", arg_map["model"]), FunctionMetadataCatalogEntry("model_path", model_path), @@ -663,8 +671,6 @@ def get_optuna_config(trial): 
FunctionMetadataCatalogEntry("horizon", horizon), FunctionMetadataCatalogEntry("library", library), FunctionMetadataCatalogEntry("conf", conf), - FunctionMetadataCatalogEntry("last_ds", last_ds), - FunctionMetadataCatalogEntry("last_y", last_y), ] return ( @@ -752,6 +758,8 @@ def exec(self, *args, **kwargs): function_type, io_list, metadata, + best_score, + train_time, ) = self.handle_ludwig_function() elif string_comparison_case_insensitive(self.node.function_type, "Sklearn"): ( @@ -760,6 +768,8 @@ def exec(self, *args, **kwargs): function_type, io_list, metadata, + best_score, + train_time, ) = self.handle_sklearn_function() elif string_comparison_case_insensitive(self.node.function_type, "XGBoost"): ( @@ -802,7 +812,7 @@ def exec(self, *args, **kwargs): [ msg, "Validation Score: " + str(best_score), - "Training time: " + str(train_time), + "Training time: " + str(train_time) + " secs.", ] ) ) diff --git a/evadb/executor/create_job_executor.py b/evadb/executor/create_job_executor.py new file mode 100644 index 0000000000..1a614635fe --- /dev/null +++ b/evadb/executor/create_job_executor.py @@ -0,0 +1,139 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import re +from datetime import datetime + +import pandas as pd + +from evadb.database import EvaDBDatabase +from evadb.executor.abstract_executor import AbstractExecutor +from evadb.executor.executor_utils import ExecutorError +from evadb.models.storage.batch import Batch +from evadb.parser.create_statement import CreateJobStatement +from evadb.parser.parser import Parser +from evadb.utils.logging_manager import logger + + +class CreateJobExecutor(AbstractExecutor): + def __init__(self, db: EvaDBDatabase, node: CreateJobStatement): + super().__init__(db, node) + + def _parse_datetime_str(self, datetime_str: str) -> datetime: + datetime_format = "%Y-%m-%d %H:%M:%S" + date_format = "%Y-%m-%d" + + if re.match(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}", datetime_str): + try: + return datetime.strptime(datetime_str, datetime_format) + except ValueError: + raise ExecutorError( + f"{datetime_str} is not in the correct datetime format. expected format: {datetime_format}." + ) + elif re.match(r"\d{4}-\d{2}-\d{2}", datetime_str): + try: + return datetime.strptime(datetime_str, date_format) + except ValueError: + raise ExecutorError( + f"{datetime_str} is not in the correct date format. expected format: {date_format}." 
+ ) + else: + raise ValueError( + f"{datetime_str} does not match the expected date or datetime format" + ) + + def _get_repeat_time_interval_seconds( + self, repeat_interval: int, repeat_period: str + ) -> int: + unit_to_seconds = { + "seconds": 1, + "minute": 60, + "minutes": 60, + "min": 60, + "hour": 3600, + "hours": 3600, + "day": 86400, + "days": 86400, + "week": 604800, + "weeks": 604800, + "month": 2592000, + "months": 2592000, + } + assert (repeat_period is None) or ( + repeat_period in unit_to_seconds + ), "repeat period should be one of these values: seconds | minute | minutes | min | hour | hours | day | days | week | weeks | month | months" + + repeat_interval = 1 if repeat_interval is None else repeat_interval + return repeat_interval * unit_to_seconds.get(repeat_period, 0) + + def exec(self, *args, **kwargs): + # Check if the job already exists. + job_catalog_entry = self.catalog().get_job_catalog_entry(self.node.job_name) + + if job_catalog_entry is not None: + if self.node.if_not_exists: + msg = f"A job with name {self.node.job_name} already exists, nothing added." + yield Batch(pd.DataFrame([msg])) + return + else: + raise ExecutorError( + f"A job with name {self.node.job_name} already exists." 
+ ) + + logger.debug(f"Creating job {self.node}") + + job_name = self.node.job_name + queries = [] + parser = Parser() + + for q in self.node.queries: + try: + curr_query = str(q) + parser.parse(curr_query) + queries.append(curr_query) + except Exception: + error_msg = f"Failed to parse the job query: {curr_query}" + logger.exception(error_msg) + raise ExecutorError(error_msg) + start_time = ( + self._parse_datetime_str(self.node.start_time) + if self.node.start_time is not None + else datetime.datetime.now() + ) + end_time = ( + self._parse_datetime_str(self.node.end_time) + if self.node.end_time is not None + else None + ) + repeat_interval = self._get_repeat_time_interval_seconds( + self.node.repeat_interval, self.node.repeat_period + ) + active = True + next_schedule_run = start_time + + self.catalog().insert_job_catalog_entry( + job_name, + queries, + start_time, + end_time, + repeat_interval, + active, + next_schedule_run, + ) + + yield Batch( + pd.DataFrame( + [f"The job {self.node.job_name} has been successfully created."] + ) + ) diff --git a/evadb/executor/drop_object_executor.py b/evadb/executor/drop_object_executor.py index a857f15eae..c4f108052e 100644 --- a/evadb/executor/drop_object_executor.py +++ b/evadb/executor/drop_object_executor.py @@ -46,6 +46,9 @@ def exec(self, *args, **kwargs): elif self.node.object_type == ObjectType.DATABASE: yield self._handle_drop_database(self.node.name, self.node.if_exists) + elif self.node.object_type == ObjectType.JOB: + yield self._handle_drop_job(self.node.name, self.node.if_exists) + def _handle_drop_table(self, table_name: str, if_exists: bool): if not self.catalog().check_table_exists(table_name): err_msg = "Table: {} does not exist".format(table_name) @@ -160,3 +163,24 @@ def _handle_drop_database(self, database_name: str, if_exists: bool): index=[0], ) ) + + def _handle_drop_job(self, job_name: str, if_exists: bool): + job_catalog_entry = self.catalog().get_job_catalog_entry(job_name) + if not 
job_catalog_entry: + err_msg = f"Job {job_name} does not exist, therefore cannot be dropped." + if if_exists: + logger.warning(err_msg) + return Batch(pd.DataFrame([err_msg])) + else: + raise RuntimeError(err_msg) + + logger.debug(f"Dropping Job {job_name}") + + self.catalog().drop_job_catalog_entry(job_catalog_entry) + + return Batch( + pd.DataFrame( + {f"Job {job_name} successfully dropped"}, + index=[0], + ) + ) diff --git a/evadb/executor/plan_executor.py b/evadb/executor/plan_executor.py index 94d290bdb3..a2921f3cbb 100644 --- a/evadb/executor/plan_executor.py +++ b/evadb/executor/plan_executor.py @@ -21,6 +21,7 @@ from evadb.executor.create_executor import CreateExecutor from evadb.executor.create_function_executor import CreateFunctionExecutor from evadb.executor.create_index_executor import CreateIndexExecutor +from evadb.executor.create_job_executor import CreateJobExecutor from evadb.executor.delete_executor import DeleteExecutor from evadb.executor.drop_object_executor import DropObjectExecutor from evadb.executor.exchange_executor import ExchangeExecutor @@ -48,7 +49,7 @@ from evadb.executor.use_executor import UseExecutor from evadb.executor.vector_index_scan_executor import VectorIndexScanExecutor from evadb.models.storage.batch import Batch -from evadb.parser.create_statement import CreateDatabaseStatement +from evadb.parser.create_statement import CreateDatabaseStatement, CreateJobStatement from evadb.parser.set_statement import SetStatement from evadb.parser.statement import AbstractStatement from evadb.parser.use_statement import UseStatement @@ -93,6 +94,8 @@ def _build_execution_tree( return UseExecutor(db=self._db, node=plan) elif isinstance(plan, SetStatement): return SetExecutor(db=self._db, node=plan) + elif isinstance(plan, CreateJobStatement): + return CreateJobExecutor(db=self._db, node=plan) # Get plan node type plan_opr_type = plan.opr_type diff --git a/evadb/interfaces/relational/db.py b/evadb/interfaces/relational/db.py index 
a8d66a22bf..428d0878f5 100644 --- a/evadb/interfaces/relational/db.py +++ b/evadb/interfaces/relational/db.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import asyncio +import multiprocessing import pandas @@ -43,6 +44,7 @@ ) from evadb.server.command_handler import execute_statement from evadb.utils.generic_utils import find_nearest_word, is_ray_enabled_and_installed +from evadb.utils.job_scheduler import JobScheduler from evadb.utils.logging_manager import logger @@ -53,6 +55,7 @@ def __init__(self, evadb: EvaDBDatabase, reader, writer): self._cursor = None self._result: Batch = None self._evadb = evadb + self._jobs_process = None def cursor(self): """Retrieves a cursor associated with the connection. @@ -80,6 +83,23 @@ def cursor(self): self._cursor = EvaDBCursor(self) return self._cursor + def start_jobs(self): + if self._jobs_process and self._jobs_process.is_alive(): + logger.debug("The job scheduler is already running") + return + + job_scheduler = JobScheduler(self._evadb) + self._jobs_process = multiprocessing.Process(target=job_scheduler.execute) + self._jobs_process.daemon = True + self._jobs_process.start() + logger.debug("Job scheduler process started") + + def stop_jobs(self): + if self._jobs_process is not None and self._jobs_process.is_alive(): + self._jobs_process.terminate() + self._jobs_process.join() + logger.debug("Job scheduler process stopped") + class EvaDBCursor(object): def __init__(self, connection): diff --git a/evadb/parser/create_statement.py b/evadb/parser/create_statement.py index ca79a9eac4..89aee64cfa 100644 --- a/evadb/parser/create_statement.py +++ b/evadb/parser/create_statement.py @@ -12,7 +12,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-from typing import List, Tuple +from dataclasses import dataclass +from typing import List, Optional, Tuple from evadb.catalog.catalog_type import ColumnType, NdArrayType from evadb.parser.select_statement import SelectStatement @@ -226,3 +227,44 @@ def __str__(self) -> str: f"WITH ENGINE '{self.engine}' , \n" f"PARAMETERS = {self.param_dict};" ) + + +@dataclass +class CreateJobStatement(AbstractStatement): + job_name: str + queries: list + if_not_exists: bool + start_time: Optional[str] = None + end_time: Optional[str] = None + repeat_interval: Optional[int] = None + repeat_period: Optional[str] = None + + def __hash__(self): + return hash( + ( + super().__hash__(), + self.job_name, + tuple(self.queries), + self.start_time, + self.end_time, + self.repeat_interval, + self.repeat_period, + ) + ) + + def __post_init__(self): + super().__init__(StatementType.CREATE_JOB) + + def __str__(self): + start_str = f"\nSTART {self.start_time}" if self.start_time is not None else "" + end_str = f"\nEND {self.end_time}" if self.end_time is not None else "" + repeat_str = ( + f"\nEVERY {self.repeat_interval} {self.repeat_period}" + if self.repeat_interval is not None + else "" + ) + return ( + f"CREATE JOB {self.job_name} AS\n" + f"({(str(q) for q in self.queries)})" + f"{start_str} {end_str} {repeat_str}" + ) diff --git a/evadb/parser/evadb.lark b/evadb/parser/evadb.lark index ab4cdb6772..86798df6c0 100644 --- a/evadb/parser/evadb.lark +++ b/evadb/parser/evadb.lark @@ -1,11 +1,15 @@ // Top Level Description -start: (sql_statement? ";")+ +// create_job is intentionally not treated as an sql_statement to keep the parser clean +// because we assume that inside the job, the user can specify multiple sql_statements +// but not a create_job within a create_job. + +start: (sql_statement? 
";")+ | (create_job ";") sql_statement: ddl_statement | dml_statement | utility_statement | context_statement - -ddl_statement: create_database | create_table | create_index | create_function - | drop_database | drop_table | drop_function | drop_index | rename_table + +ddl_statement: create_database | create_table | create_index | create_function | drop_database + | drop_table | drop_function | drop_index | drop_job | rename_table dml_statement: select_statement | insert_statement | update_statement | delete_statement | load_statement | set_statement @@ -14,6 +18,9 @@ utility_statement: describe_statement | show_statement | help_statement | explai context_statement: use_statement +job_sql_statements: query_string (";" query_string)* ";"? + + // Data Definition Language // Create statements @@ -29,7 +36,18 @@ create_database_engine_clause: WITH ENGINE "=" string_literal "," PARAMETERS "=" create_index: CREATE INDEX if_not_exists? uid ON table_name index_elem vector_store_type? create_table: CREATE TABLE if_not_exists? table_name (create_definitions | (AS select_statement)) - + +create_job: CREATE JOB if_not_exists? uid AS "{" job_sql_statements "}" (start_time)? (end_time)? (repeat_clause)? + +start_time: START string_literal + +end_time: END string_literal + +repeat_clause: EVERY decimal_literal simple_id + + + + // Rename statements rename_table: RENAME TABLE table_name TO table_name @@ -78,6 +96,8 @@ drop_index: DROP INDEX if_exists? uid drop_table: DROP TABLE if_exists? uid drop_function: DROP FUNCTION if_exists? uid + +drop_job: DROP JOB if_exists? 
uid // SET statements (configuration) set_statement: SET config_name (EQUAL_SYMBOL | TO) config_value @@ -348,7 +368,9 @@ DESC: "DESC"i DESCRIBE: "DESCRIBE"i DISTINCT: "DISTINCT"i DROP: "DROP"i +END: "END"i ENGINE: "ENGINE"i +EVERY: "EVERY"i EXIT: "EXIT"i EXISTS: "EXISTS"i EXPLAIN: "EXPLAIN"i @@ -364,6 +386,7 @@ INTO: "INTO"i INDEX: "INDEX"i INSERT: "INSERT"i IS: "IS"i +JOB: "JOB"i JOIN: "JOIN"i KEY: "KEY"i LATERAL: "LATERAL"i @@ -392,6 +415,7 @@ SET: "SET"i SHUTDOWN: "SHUTDOWN"i SHOW: "SHOW"i SOME: "SOME"i +START: "START"i TABLE: "TABLE"i TABLES: "TABLES"i TO: "TO"i @@ -567,7 +591,6 @@ REAL_LITERAL: (DEC_DIGIT+)? "." DEC_DIGIT+ DOT_ID: "." ID_LITERAL - // Identifiers ID: ID_LITERAL diff --git a/evadb/parser/lark_visitor/__init__.py b/evadb/parser/lark_visitor/__init__.py index 9ed0a1b6fe..911e886a2d 100644 --- a/evadb/parser/lark_visitor/__init__.py +++ b/evadb/parser/lark_visitor/__init__.py @@ -20,6 +20,7 @@ from evadb.parser.lark_visitor._create_statements import ( CreateDatabase, CreateIndex, + CreateJob, CreateTable, ) from evadb.parser.lark_visitor._delete_statement import Delete @@ -66,6 +67,7 @@ class LarkInterpreter( CreateTable, CreateIndex, CreateDatabase, + CreateJob, Expressions, Functions, Insert, @@ -89,3 +91,11 @@ def start(self, tree): def sql_statement(self, tree): return self.visit(tree.children[0]) + + def job_sql_statements(self, tree): + sql_statements = [] + for child in tree.children: + if isinstance(child, Tree): + if child.data == "query_string": + sql_statements.append(self.visit(child)) + return sql_statements diff --git a/evadb/parser/lark_visitor/_create_statements.py b/evadb/parser/lark_visitor/_create_statements.py index a24f1eafc9..72066b294c 100644 --- a/evadb/parser/lark_visitor/_create_statements.py +++ b/evadb/parser/lark_visitor/_create_statements.py @@ -22,6 +22,7 @@ ColConstraintInfo, ColumnDefinition, CreateDatabaseStatement, + CreateJobStatement, CreateTableStatement, ) from evadb.parser.table_ref import TableRef @@ -336,3 
+337,49 @@ def create_database_engine_clause(self, tree): param_dict = self.visit(child) return engine, param_dict + + +class CreateJob: + def create_job(self, tree): + job_name = None + queries = [] + start_time = None + end_time = None + repeat_interval = None + repeat_period = None + if_not_exists = False + for child in tree.children: + if isinstance(child, Tree): + if child.data == "if_not_exists": + if_not_exists = True + if child.data == "uid": + job_name = self.visit(child) + if child.data == "job_sql_statements": + queries = self.visit(child) + elif child.data == "start_time": + start_time = self.visit(child) + elif child.data == "end_time": + end_time = self.visit(child) + elif child.data == "repeat_clause": + repeat_interval, repeat_period = self.visit(child) + + create_job = CreateJobStatement( + job_name, + queries, + if_not_exists, + start_time, + end_time, + repeat_interval, + repeat_period, + ) + + return create_job + + def start_time(self, tree): + return self.visit(tree.children[1]).value + + def end_time(self, tree): + return self.visit(tree.children[1]).value + + def repeat_clause(self, tree): + return self.visit(tree.children[1]), self.visit(tree.children[2]) diff --git a/evadb/parser/lark_visitor/_drop_statement.py b/evadb/parser/lark_visitor/_drop_statement.py index 0b397378ae..7fc96298ed 100644 --- a/evadb/parser/lark_visitor/_drop_statement.py +++ b/evadb/parser/lark_visitor/_drop_statement.py @@ -73,3 +73,17 @@ def drop_database(self, tree): database_name = self.visit(child) return DropObjectStatement(ObjectType.DATABASE, database_name, if_exists) + + # Drop Job + def drop_job(self, tree): + job_name = None + if_exists = False + + for child in tree.children: + if isinstance(child, Tree): + if child.data == "if_exists": + if_exists = True + elif child.data == "uid": + job_name = self.visit(child) + + return DropObjectStatement(ObjectType.JOB, job_name, if_exists) diff --git a/evadb/parser/lark_visitor/_expressions.py 
b/evadb/parser/lark_visitor/_expressions.py index 6ec01cf991..ff53ed4e1a 100644 --- a/evadb/parser/lark_visitor/_expressions.py +++ b/evadb/parser/lark_visitor/_expressions.py @@ -20,6 +20,7 @@ from evadb.expression.comparison_expression import ComparisonExpression from evadb.expression.constant_value_expression import ConstantValueExpression from evadb.expression.logical_expression import LogicalExpression +from evadb.utils.generic_utils import string_comparison_case_insensitive ################################################################## @@ -101,10 +102,12 @@ def comparison_operator(self, tree): def logical_operator(self, tree): op = str(tree.children[0]) - if op == "OR": + if string_comparison_case_insensitive(op, "OR"): return ExpressionType.LOGICAL_OR - elif op == "AND": + elif string_comparison_case_insensitive(op, "AND"): return ExpressionType.LOGICAL_AND + else: + raise NotImplementedError("Unsupported logical operator: {}".format(op)) def expressions_with_defaults(self, tree): expr_list = [] diff --git a/evadb/parser/select_statement.py b/evadb/parser/select_statement.py index b04c7148bb..69270a6b84 100644 --- a/evadb/parser/select_statement.py +++ b/evadb/parser/select_statement.py @@ -141,7 +141,11 @@ def __str__(self) -> str: orderby_list_str += str(expr[0]) + " " + sort_str + ", " orderby_list_str = orderby_list_str.rstrip(", ") - select_str = f"SELECT {target_list_str} FROM {str(self._from_table)}" + select_str = f"SELECT {target_list_str}" + + if self._from_table is not None: + select_str += " FROM " + str(self._from_table) + if self._where_clause is not None: select_str += " WHERE " + str(self._where_clause) diff --git a/evadb/parser/types.py b/evadb/parser/types.py index 751d2b5f31..227a768c7b 100644 --- a/evadb/parser/types.py +++ b/evadb/parser/types.py @@ -42,6 +42,7 @@ class StatementType(EvaDBEnum): CREATE_DATABASE # noqa: F821 USE # noqa: F821 SET # noqa: F821 + CREATE_JOB # noqa: F821 # add other types @@ -83,3 +84,4 @@ class 
ObjectType(EvaDBEnum): FUNCTION # noqa: F821 INDEX # noqa: F821 DATABASE # noqa: F821 + JOB # noqa: F821 diff --git a/evadb/parser/utils.py b/evadb/parser/utils.py index a2be06ec16..dd4567cf4b 100644 --- a/evadb/parser/utils.py +++ b/evadb/parser/utils.py @@ -13,7 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. from evadb.parser.create_function_statement import CreateFunctionStatement -from evadb.parser.create_statement import CreateDatabaseStatement, CreateTableStatement +from evadb.parser.create_statement import ( + CreateDatabaseStatement, + CreateJobStatement, + CreateTableStatement, +) from evadb.parser.drop_object_statement import DropObjectStatement from evadb.parser.explain_statement import ExplainStatement from evadb.parser.insert_statement import InsertTableStatement @@ -30,6 +34,7 @@ # directly to the executor. SKIP_BINDER_AND_OPTIMIZER_STATEMENTS = ( CreateDatabaseStatement, + CreateJobStatement, UseStatement, SetStatement, ) diff --git a/evadb/utils/job_scheduler.py b/evadb/utils/job_scheduler.py new file mode 100644 index 0000000000..74ee99b7fc --- /dev/null +++ b/evadb/utils/job_scheduler.py @@ -0,0 +1,120 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import datetime +import sys +import time + +from evadb.catalog.models.utils import JobCatalogEntry +from evadb.database import EvaDBDatabase +from evadb.server.command_handler import execute_query +from evadb.utils.logging_manager import logger + + +class JobScheduler: + def __init__(self, evadb: EvaDBDatabase) -> None: + self.poll_interval_seconds = 30 + self._evadb = evadb + + def _update_next_schedule_run(self, job_catalog_entry: JobCatalogEntry) -> bool: + job_end_time = job_catalog_entry.end_time + active_status = False + if job_catalog_entry.repeat_interval and job_catalog_entry.repeat_interval > 0: + next_trigger_time = datetime.datetime.now() + datetime.timedelta( + seconds=job_catalog_entry.repeat_interval + ) + if not job_end_time or next_trigger_time < job_end_time: + active_status = True + + next_trigger_time = ( + next_trigger_time if active_status else job_catalog_entry.next_scheduled_run + ) + self._evadb.catalog().update_job_catalog_entry( + job_catalog_entry.name, + next_trigger_time, + active_status, + ) + return active_status, next_trigger_time + + def _get_sleep_time(self, next_job_entry: JobCatalogEntry) -> int: + sleep_time = self.poll_interval_seconds + if next_job_entry: + sleep_time = min( + sleep_time, + ( + next_job_entry.next_scheduled_run - datetime.datetime.now() + ).total_seconds(), + ) + sleep_time = max(0, sleep_time) + return sleep_time + + def _scan_and_execute_jobs(self): + while True: + try: + for next_executable_job in iter( + lambda: self._evadb.catalog().get_next_executable_job( + only_past_jobs=True + ), + None, + ): + execution_time = datetime.datetime.now() + + # insert a job history record to mark start of this execution + self._evadb.catalog().insert_job_history_catalog_entry( + next_executable_job.row_id, + next_executable_job.name, + execution_time, + None, + ) + + # execute the queries of the job + execution_results = [ + execute_query(self._evadb, query) + for query in next_executable_job.queries + ] + 
logger.debug( + f"Exection result for job: {next_executable_job.name} results: {execution_results}" + ) + + # update the next trigger time for this job + self._update_next_schedule_run(next_executable_job) + + # update the previosly inserted job history record with endtime + self._evadb.catalog().update_job_history_end_time( + next_executable_job.row_id, + execution_time, + datetime.datetime.now(), + ) + + next_executable_job = self._evadb.catalog().get_next_executable_job( + only_past_jobs=False + ) + + sleep_time = self._get_sleep_time(next_executable_job) + if sleep_time > 0: + logger.debug( + f"Job scheduler process sleeping for {sleep_time} seconds" + ) + time.sleep(sleep_time) + except Exception as e: + logger.error(f"Got an exception in job scheduler: {str(e)}") + time.sleep(self.poll_interval_seconds * 0.2) + + def execute(self): + try: + self._scan_and_execute_jobs() + except KeyboardInterrupt: + logger.debug("Exiting the job scheduler process due to interrupt") + sys.exit() diff --git a/setup.py b/setup.py index ae5d6fdbb4..3334fa8361 100644 --- a/setup.py +++ b/setup.py @@ -80,7 +80,7 @@ def read(path, encoding="utf-8"): "sentence-transformers", "protobuf", "bs4", - "openai>=0.27.4", # CHATGPT + "openai==0.28", # CHATGPT "gpt4all", # PRIVATE GPT "sentencepiece", # TRANSFORMERS ] diff --git a/test/integration_tests/long/test_job_scheduler_execution.py b/test/integration_tests/long/test_job_scheduler_execution.py new file mode 100644 index 0000000000..e00b3ce537 --- /dev/null +++ b/test/integration_tests/long/test_job_scheduler_execution.py @@ -0,0 +1,94 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import time +import unittest +from datetime import datetime, timedelta +from test.util import get_evadb_for_testing, shutdown_ray + +from mock import MagicMock + +from evadb.interfaces.relational.db import EvaDBConnection +from evadb.server.command_handler import execute_query_fetch_all + + +class JobSchedulerIntegrationTests(unittest.TestCase): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + @classmethod + def setUpClass(cls): + cls.evadb = get_evadb_for_testing() + # reset the catalog manager before running each test + cls.evadb.catalog().reset() + cls.job_name_1 = "test_async_job_1" + cls.job_name_2 = "test_async_job_2" + + def setUp(self): + execute_query_fetch_all(self.evadb, f"DROP JOB IF EXISTS {self.job_name_1};") + execute_query_fetch_all(self.evadb, f"DROP JOB IF EXISTS {self.job_name_2};") + + @classmethod + def tearDownClass(cls): + shutdown_ray() + execute_query_fetch_all(cls.evadb, f"DROP JOB IF EXISTS {cls.job_name_1};") + execute_query_fetch_all(cls.evadb, f"DROP JOB IF EXISTS {cls.job_name_2};") + + def create_jobs(self): + datetime_format = "%Y-%m-%d %H:%M:%S" + start_time = (datetime.now() - timedelta(seconds=10)).strftime(datetime_format) + end_time = (datetime.now() + timedelta(seconds=60)).strftime(datetime_format) + + create_csv_query = """CREATE TABLE IF NOT EXISTS MyCSV ( + id INTEGER UNIQUE, + frame_id INTEGER, + video_id INTEGER + ); + """ + job_1_query = f"""CREATE JOB IF NOT EXISTS {self.job_name_1} AS {{ + SELECT * FROM MyCSV; + }} + START '{start_time}' + END '{end_time}' + EVERY 4 seconds; + 
""" + job_2_query = f"""CREATE JOB IF NOT EXISTS {self.job_name_2} AS {{ + SHOW FUNCTIONS; + }} + START '{start_time}' + END '{end_time}' + EVERY 2 seconds; + """ + + execute_query_fetch_all(self.evadb, create_csv_query) + execute_query_fetch_all(self.evadb, job_1_query) + execute_query_fetch_all(self.evadb, job_2_query) + + def test_should_execute_the_scheduled_jobs(self): + self.create_jobs() + connection = EvaDBConnection(self.evadb, MagicMock(), MagicMock()) + + # start the job scheduler + connection.start_jobs() + + # let the job scheduler run for 10 seconds + time.sleep(15) + connection.stop_jobs() + + job_1_execution_count = len(self.evadb.catalog().get_job_history_by_job_id(1)) + job_2_execution_count = len(self.evadb.catalog().get_job_history_by_job_id(2)) + + self.assertGreater(job_2_execution_count, job_1_execution_count) + self.assertGreater(job_2_execution_count, 2) + self.assertGreater(job_1_execution_count, 2) diff --git a/test/integration_tests/long/test_model_forecasting.py b/test/integration_tests/long/test_model_forecasting.py index 76e5562357..c376f7610c 100644 --- a/test/integration_tests/long/test_model_forecasting.py +++ b/test/integration_tests/long/test_model_forecasting.py @@ -104,6 +104,11 @@ def test_forecast(self): ], ) + @pytest.mark.skip( + reason="Neuralforecast intergration test takes too long to complete without GPU." 
+ ) + @forecast_skip_marker + def test_forecast_neuralforecast(self): create_predict_udf = """ CREATE FUNCTION AirPanelForecast FROM (SELECT unique_id, ds, y, trend FROM AirDataPanel) diff --git a/test/integration_tests/short/test_create_job_executor.py b/test/integration_tests/short/test_create_job_executor.py new file mode 100644 index 0000000000..9a9ce18c1b --- /dev/null +++ b/test/integration_tests/short/test_create_job_executor.py @@ -0,0 +1,126 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import unittest +from datetime import datetime +from test.util import get_evadb_for_testing, shutdown_ray + +from evadb.executor.executor_utils import ExecutorError +from evadb.server.command_handler import execute_query_fetch_all + + +class CreateJobTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.evadb = get_evadb_for_testing() + # reset the catalog manager before running each test + cls.evadb.catalog().reset() + cls.job_name = "test_async_job" + + def setUp(self): + execute_query_fetch_all(self.evadb, f"DROP JOB IF EXISTS {self.job_name};") + + @classmethod + def tearDownClass(cls): + shutdown_ray() + execute_query_fetch_all(cls.evadb, f"DROP JOB IF EXISTS {cls.job_name};") + + def test_invalid_query_in_job_should_raise_exception(self): + # missing closing paranthesis in the query + query = f"""CREATE JOB {self.job_name} AS {{ + CREATE OR REPLACE FUNCTION HomeSalesForecast FROM + ( SELECT * FROM postgres_data.home_sales + TYPE Forecasting + PREDICT 'price'; + }} + START '2023-04-01 01:10:00' + END '2023-05-01' + EVERY 2 week; + """ + with self.assertRaisesRegex(Exception, "Failed to parse the job query"): + execute_query_fetch_all(self.evadb, query) + + def test_create_job_should_add_the_entry(self): + queries = [ + """CREATE OR REPLACE FUNCTION HomeSalesForecast FROM + ( SELECT * FROM postgres_data.home_sales ) + TYPE Forecasting + PREDICT 'price';""", + "Select HomeSalesForecast(10);", + ] + start = "2023-04-01 01:10:00" + end = "2023-05-01" + repeat_interval = 2 + repeat_period = "week" + + all_queries = "".join(queries) + query = f"""CREATE JOB {self.job_name} AS {{ + {all_queries} + }} + START '{start}' + END '{end}' + EVERY {repeat_interval} {repeat_period};""" + + execute_query_fetch_all(self.evadb, query) + + datetime_format = "%Y-%m-%d %H:%M:%S" + date_format = "%Y-%m-%d" + job_entry = self.evadb.catalog().get_job_catalog_entry(self.job_name) + self.assertEqual(job_entry.name, self.job_name) + self.assertEqual( + 
job_entry.start_time, datetime.strptime(start, datetime_format) + ) + self.assertEqual(job_entry.end_time, datetime.strptime(end, date_format)) + self.assertEqual(job_entry.repeat_interval, 2 * 7 * 24 * 60 * 60) + self.assertEqual(job_entry.active, True) + self.assertEqual(len(job_entry.queries), len(queries)) + + def test_should_create_job_with_if_not_exists(self): + if_not_exists = "IF NOT EXISTS" + + queries = [ + """CREATE OR REPLACE FUNCTION HomeSalesForecast FROM + ( SELECT * FROM postgres_data.home_sales ) + TYPE Forecasting + PREDICT 'price';""", + "Select HomeSalesForecast(10);", + ] + + query = """CREATE JOB {} {} AS {{ + {} + }} + START '2023-04-01' + END '2023-05-01' + EVERY 2 week; + """ + + # Create the database. + execute_query_fetch_all( + self.evadb, query.format(if_not_exists, self.job_name, "".join(queries)) + ) + + # Trying to create the same database should raise an exception. + with self.assertRaises(ExecutorError): + execute_query_fetch_all( + self.evadb, query.format("", self.job_name, "".join(queries)) + ) + + # Trying to create the same database should warn if "IF NOT EXISTS" is provided. + execute_query_fetch_all( + self.evadb, query.format(if_not_exists, self.job_name, "".join(queries)) + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/integration_tests/short/test_drop_executor.py b/test/integration_tests/short/test_drop_executor.py index 632aa4a008..53dc1211eb 100644 --- a/test/integration_tests/short/test_drop_executor.py +++ b/test/integration_tests/short/test_drop_executor.py @@ -245,3 +245,39 @@ def test_should_drop_database(self): result = execute_query_fetch_all( self.evadb, f"DROP DATABASE IF EXISTS {database_name}" ) + + def test_should_drop_job(self): + # Create database. 
+ job_name = "test_async_job" + + query = f"""CREATE JOB {job_name} AS {{ + SELECT * from job_catalog; + }} + START '2023-04-01' + END '2023-05-01' + EVERY 2 week;""" + + execute_query_fetch_all(self.evadb, query) + self.assertIsNotNone(self.evadb.catalog().get_job_catalog_entry(job_name)) + + # DROP JOB + execute_query_fetch_all(self.evadb, f"DROP JOB {job_name}") + self.assertIsNone(self.evadb.catalog().get_job_catalog_entry(job_name)) + + # DROP should pass with warning + result = execute_query_fetch_all(self.evadb, f"DROP JOB IF EXISTS {job_name}") + self.assertTrue("does not exist" in result.frames.to_string()) + + # DROP should throw error + with self.assertRaises(ExecutorError): + execute_query_fetch_all( + self.evadb, + f"DROP JOB {job_name}", + do_not_print_exceptions=True, + ) + + # We should be able to add the database again + execute_query_fetch_all(self.evadb, query) + + # clean up + result = execute_query_fetch_all(self.evadb, f"DROP JOB IF EXISTS {job_name}") diff --git a/test/unit_tests/parser/test_parser.py b/test/unit_tests/parser/test_parser.py index 60624b825d..6086db088e 100644 --- a/test/unit_tests/parser/test_parser.py +++ b/test/unit_tests/parser/test_parser.py @@ -21,6 +21,7 @@ from evadb.expression.comparison_expression import ComparisonExpression from evadb.expression.constant_value_expression import ConstantValueExpression from evadb.expression.function_expression import FunctionExpression +from evadb.expression.logical_expression import LogicalExpression from evadb.expression.tuple_value_expression import TupleValueExpression from evadb.parser.alias import Alias from evadb.parser.create_function_statement import CreateFunctionStatement @@ -531,6 +532,77 @@ def test_select_statement_class(self): self.assertEqual(select_stmt_new.from_table, select_stmt.from_table) self.assertEqual(str(select_stmt_new), str(select_stmt)) + def test_select_statement_where_class(self): + """ + Unit test for logical operators in the where clause. 
+ """ + + def _verify_select_statement(evadb_statement_list): + self.assertIsInstance(evadb_statement_list, list) + self.assertEqual(len(evadb_statement_list), 1) + self.assertEqual(evadb_statement_list[0].stmt_type, StatementType.SELECT) + + select_stmt = evadb_statement_list[0] + + # target list + self.assertIsNotNone(select_stmt.target_list) + self.assertEqual(len(select_stmt.target_list), 2) + self.assertEqual( + select_stmt.target_list[0].etype, ExpressionType.TUPLE_VALUE + ) + self.assertEqual(select_stmt.target_list[0].name, "CLASS") + self.assertEqual( + select_stmt.target_list[1].etype, ExpressionType.TUPLE_VALUE + ) + self.assertEqual(select_stmt.target_list[1].name, "REDNESS") + + # from table + self.assertIsNotNone(select_stmt.from_table) + self.assertIsInstance(select_stmt.from_table, TableRef) + self.assertEqual(select_stmt.from_table.table.table_name, "TAIPAI") + + # where clause + self.assertIsNotNone(select_stmt.where_clause) + self.assertIsInstance(select_stmt.where_clause, LogicalExpression) + self.assertEqual(select_stmt.where_clause.etype, ExpressionType.LOGICAL_AND) + self.assertEqual(len(select_stmt.where_clause.children), 2) + left = select_stmt.where_clause.children[0] + right = select_stmt.where_clause.children[1] + self.assertEqual(left.etype, ExpressionType.COMPARE_EQUAL) + self.assertEqual(right.etype, ExpressionType.COMPARE_LESSER) + + self.assertEqual(len(left.children), 2) + self.assertEqual(left.children[0].etype, ExpressionType.TUPLE_VALUE) + self.assertEqual(left.children[0].name, "CLASS") + self.assertEqual(left.children[1].etype, ExpressionType.CONSTANT_VALUE) + self.assertEqual(left.children[1].value, "VAN") + + self.assertEqual(len(right.children), 2) + self.assertEqual(right.children[0].etype, ExpressionType.TUPLE_VALUE) + self.assertEqual(right.children[0].name, "REDNESS") + self.assertEqual(right.children[1].etype, ExpressionType.CONSTANT_VALUE) + self.assertEqual(right.children[1].value, 400) + + parser = Parser() + 
select_query = ( + "SELECT CLASS, REDNESS FROM TAIPAI WHERE CLASS = 'VAN' AND REDNESS < 400;" + ) + _verify_select_statement(parser.parse(select_query)) + + # Case insensitive test + select_query = ( + "select CLASS, REDNESS from TAIPAI where CLASS = 'VAN' and REDNESS < 400;" + ) + _verify_select_statement(parser.parse(select_query)) + + # Unsupported logical operator + select_query = ( + "SELECT CLASS, REDNESS FROM TAIPAI WHERE CLASS = 'VAN' XOR REDNESS < 400;" + ) + with self.assertRaises(NotImplementedError) as cm: + parser.parse(select_query) + self.assertEqual(str(cm.exception), "Unsupported logical operator: XOR") + def test_select_statement_groupby_class(self): """Testing sample frequency""" @@ -1171,3 +1243,30 @@ def test_class_equality(self): self.assertNotEqual(tuple_frame, table_ref) self.assertNotEqual(join_node, table_ref) self.assertNotEqual(table_ref, table_info) + + def test_create_job(self): + queries = [ + """CREATE OR REPLACE FUNCTION HomeSalesForecast FROM + ( SELECT * FROM postgres_data.home_sales ) + TYPE Forecasting + PREDICT 'price';""", + "Select HomeSalesForecast(10);", + ] + job_query = f"""CREATE JOB my_job AS {{ + {''.join(queries)} + }} + START '2023-04-01' + END '2023-05-01' + EVERY 2 hour + """ + + parser = Parser() + job_stmt = parser.parse(job_query)[0] + self.assertEqual(job_stmt.job_name, "my_job") + self.assertEqual(len(job_stmt.queries), 2) + self.assertTrue(queries[0].rstrip(";") == str(job_stmt.queries[0])) + self.assertTrue(queries[1].rstrip(";") == str(job_stmt.queries[1])) + self.assertEqual(job_stmt.start_time, "2023-04-01") + self.assertEqual(job_stmt.end_time, "2023-05-01") + self.assertEqual(job_stmt.repeat_interval, 2) + self.assertEqual(job_stmt.repeat_period, "hour") diff --git a/test/unit_tests/utils/test_job_scheduler.py b/test/unit_tests/utils/test_job_scheduler.py new file mode 100644 index 0000000000..36f54cd9b6 --- /dev/null +++ b/test/unit_tests/utils/test_job_scheduler.py @@ -0,0 +1,81 @@ +# coding=utf-8 +# 
Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import unittest +from datetime import datetime, timedelta + +from mock import MagicMock + +from evadb.catalog.models.utils import JobCatalogEntry +from evadb.utils.job_scheduler import JobScheduler + + +class JobSchedulerTests(unittest.TestCase): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def get_dummy_job_catalog_entry(self, active, job_name, next_run): + return JobCatalogEntry( + name=job_name, + queries=None, + start_time=None, + end_time=None, + repeat_interval=None, + active=active, + next_scheduled_run=next_run, + created_at=None, + updated_at=None, + ) + + def test_sleep_time_calculation(self): + past_job = self.get_dummy_job_catalog_entry( + True, "past_job", datetime.now() - timedelta(seconds=10) + ) + future_job = self.get_dummy_job_catalog_entry( + True, "future_job", datetime.now() + timedelta(seconds=20) + ) + + job_scheduler = JobScheduler(MagicMock()) + + self.assertEqual(job_scheduler._get_sleep_time(past_job), 0) + self.assertGreaterEqual(job_scheduler._get_sleep_time(future_job), 10) + self.assertEqual(job_scheduler._get_sleep_time(None), 30) + + def test_update_next_schedule_run(self): + future_time = datetime.now() + timedelta(seconds=1000) + job_scheduler = JobScheduler(MagicMock()) + job_entry = self.get_dummy_job_catalog_entry(True, "job", datetime.now()) + + # job which runs just once + job_entry.end_time = future_time + status, 
next_run = job_scheduler._update_next_schedule_run(job_entry) + self.assertEqual(status, False, "status for one time job should be false") + + # recurring job with valid end date + job_entry.end_time = future_time + job_entry.repeat_interval = 120 + expected_next_run = datetime.now() + timedelta(seconds=120) + status, next_run = job_scheduler._update_next_schedule_run(job_entry) + self.assertEqual(status, True, "status for recurring time job should be true") + self.assertGreaterEqual(next_run, expected_next_run) + + # recurring job with expired end date + job_entry.end_time = datetime.now() + timedelta(seconds=60) + job_entry.repeat_interval = 120 + expected_next_run = datetime.now() + timedelta(seconds=120) + status, next_run = job_scheduler._update_next_schedule_run(job_entry) + self.assertEqual( + status, False, "status for expired recurring time job should be false" + ) + self.assertLessEqual(next_run, datetime.now())