diff --git a/.dockerignore b/.dockerignore index 513b17705..80995449d 100644 --- a/.dockerignore +++ b/.dockerignore @@ -5,5 +5,3 @@ **/.gitignore **/.dockerignore **/.gitlab-ci.yml -celerybeat-schedule.dir -celerybeat-schedule.pag diff --git a/.flake8 b/.flake8 index c54167a9c..ddea46394 100644 --- a/.flake8 +++ b/.flake8 @@ -18,4 +18,3 @@ exclude = ./libcloud/, ./paramiko/, ./run_script/, - ./celerybeat-mongo/, diff --git a/.gitignore b/.gitignore index a1846395e..6f69345cf 100644 --- a/.gitignore +++ b/.gitignore @@ -72,10 +72,6 @@ target/ # pyenv .python-version -# celery beat schedule file -celerybeat-schedule -celerybeat-schedule-py3.db - # SageMath parsed files *.sage.py @@ -108,9 +104,5 @@ ENV/ /src/mist/api/templates/landing.pt /src/mist/api/templates/manage.pt -# Celerybeat files -/celerybeat-schedule.dir -/celerybeat-schedule.pag - # openapi spec /openapi/spec.yml diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 0f7ac61ab..f63f237dd 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -96,7 +96,7 @@ flake8: before_script: - cd /mist.api script: - - flake8 --ignore=E402,E722,F632,F841,W504,W605 --exclude=v2,paramiko,celerybeat-mongo,libcloud,run_script + - flake8 --ignore=E402,E722,F632,F841,W504,W605 --exclude=v2,paramiko,libcloud,run_script uniq: stage: test diff --git a/.gitmodules b/.gitmodules index f0a7ef575..e608eed06 100644 --- a/.gitmodules +++ b/.gitmodules @@ -7,9 +7,6 @@ [submodule "paramiko"] path = paramiko url = ../../mistio/paramiko.git -[submodule "celerybeat-mongo"] - path = celerybeat-mongo - url = ../../mistio/celerybeat-mongo.git [submodule "v2"] path = v2 url = ../mist-api-v2.git diff --git a/Dockerfile b/Dockerfile index 273aa0596..51d17c32d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,7 @@ FROM mist/python3:latest # Install libvirt which requires system dependencies. 
-RUN apk add --update --no-cache g++ gcc libvirt libvirt-dev libxml2-dev libxslt-dev gnupg ca-certificates wget mongodb-tools +RUN apk add --update --no-cache g++ gcc libvirt libvirt-dev libxml2-dev libxslt-dev gnupg ca-certificates wget mongodb-tools libmemcached-dev RUN wget https://dl.influxdata.com/influxdb/releases/influxdb-1.8.4-static_linux_amd64.tar.gz && \ tar xvfz influxdb-1.8.4-static_linux_amd64.tar.gz && rm influxdb-1.8.4-static_linux_amd64.tar.gz @@ -21,13 +21,11 @@ COPY requirements.txt /requirements-mist.api.txt WORKDIR /mist.api/ COPY paramiko /mist.api/paramiko -COPY celerybeat-mongo /mist.api/celerybeat-mongo COPY libcloud /mist.api/libcloud COPY v2 /mist.api/v2 RUN pip install --no-cache-dir -r /mist.api/requirements.txt && \ pip install -e paramiko/ && \ - pip install -e celerybeat-mongo/ && \ pip install -e libcloud/ && \ pip install -e v2/ && \ pip install --no-cache-dir -r v2/requirements.txt diff --git a/bin/apscheduler b/bin/apscheduler deleted file mode 100755 index efaf1a48f..000000000 --- a/bin/apscheduler +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env python - - -from mist.api.scheduler import start - -if __name__ == "__main__": - start() diff --git a/bin/beat b/bin/beat deleted file mode 100755 index c8a87cc5d..000000000 --- a/bin/beat +++ /dev/null @@ -1,44 +0,0 @@ -#!/bin/sh - -set -e - -LOGLEVEL="INFO" -TASKS="${TASKS:-mist.api.tasks}" - -USAGE="Usage: $0 [-h] [-l ] [-t ] - -Start beat (used for internal/portal periodic tasks) - -Options: - -h Show this help message and exit. - -l Log level. Defaults to $LOGLEVEL. - -t Tasks file to import. Defaults to $TASKS. -" - -while getopts "hl:t:" opt; do - case "$opt" in - h) - echo "$USAGE" - exit - ;; - l) - LOGLEVEL=$OPTARG - ;; - t) - TASKS=$OPTARG - ;; - \?) 
- echo "Invalid option: -$OPTARG" >&2 - echo "$USAGE" >&2 - exit 1 - esac -done - -set -x - -exec celery beat \ - -A $TASKS \ - -S mist.api.portal.schedulers.RunImmediatelyPersistentScheduler \ - --pidfile= \ - -s celerybeat-schedule-py3 \ - -l $LOGLEVEL diff --git a/bin/celery b/bin/celery deleted file mode 100755 index 778fab423..000000000 --- a/bin/celery +++ /dev/null @@ -1,64 +0,0 @@ -#!/bin/sh - -set -e - -QUEUES="${QUEUES:-celery,machines,scripts,command,ping,probe,rules,deployments,mappings,networks,volumes}" -LOGLEVEL="INFO" -TASKS="${TASKS:-mist.api.celery_app}" -CONCURRENCY=${CONCURRENCY:-8} -POOL=${POOL:-prefork} - -USAGE="Usage: $0 [-h] [-q ] [-l ] [-t ] [-P ] - -Start uwsgi - -Options: - -h Show this help message and exit. - -q Comma separated list of queues to subscribe to. Defaults to - $QUEUES. - -l Log level. Defaults to $LOGLEVEL. - -t Tasks file to import. Defaults to $TASKS. - -c Number of children processes. Defaults to $CONCURRENCY. - -P Celery pool. Defaults to $POOL. -" - -while getopts "hq:l:t:c:P:" opt; do - case "$opt" in - h) - echo "$USAGE" - exit - ;; - q) - QUEUES=$OPTARG - ;; - l) - LOGLEVEL=$OPTARG - ;; - t) - TASKS=$OPTARG - ;; - c) - CONCURRENCY=$OPTARG - ;; - P) - POOL=$OPTARG - ;; - \?) 
- echo "Invalid option: -$OPTARG" >&2 - echo "$USAGE" >&2 - exit 1 - esac -done - -set -x -export C_FORCE_ROOT=1 -if [ "$POOL" = prefork ]; then - export CELERY_CONTEXT=1 -fi -exec celery worker \ - -A $TASKS \ - -Q $QUEUES \ - -l $LOGLEVEL \ - -c $CONCURRENCY \ - -P $POOL \ - -Ofair diff --git a/bin/celery-deployments b/bin/celery-deployments deleted file mode 100755 index 26e8aa269..000000000 --- a/bin/celery-deployments +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/sh - -set -e - -CONCURRENCY=${CONCURRENCY:-16} - -export QUEUES="${QUEUES:-deployments,mappings,command,scripts,celery}" - -exec ./bin/celery $@ diff --git a/bin/celery-gevent b/bin/celery-gevent deleted file mode 100755 index 4cf8fa034..000000000 --- a/bin/celery-gevent +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/sh - -set -e - -export QUEUES="${QUEUES:-rules,probe,ping}" -export POOL=gevent -export CONCURRENCY=${CONCURRENCY:-100} - -exec ./bin/celery $@ diff --git a/bin/celery-prefork b/bin/celery-prefork deleted file mode 100755 index 8685e8444..000000000 --- a/bin/celery-prefork +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/sh - -set -e - -export QUEUES="${QUEUES:-celery,machines,command,deployments,mappings,networks,volumes,scripts,zones,buckets}" - -exec ./bin/celery $@ diff --git a/bin/cilia b/bin/cilia deleted file mode 100755 index 9334ed952..000000000 --- a/bin/cilia +++ /dev/null @@ -1,43 +0,0 @@ -#!/bin/sh - -set -e - -LOGLEVEL="INFO" -TASKS="${TASKS:-mist.api.tasks}" - -USAGE="Usage: $0 [-h] [-l ] [-t ] - -Start cilia - -Options: - -h Show this help message and exit. - -l Log level. Defaults to $LOGLEVEL. - -t Tasks file to import. Defaults to $TASKS. -" - -while getopts "hl:t:" opt; do - case "$opt" in - h) - echo "$USAGE" - exit - ;; - l) - LOGLEVEL=$OPTARG - ;; - t) - TASKS=$OPTARG - ;; - \?) 
- echo "Invalid option: -$OPTARG" >&2 - echo "$USAGE" >&2 - exit 1 - esac -done - -set -x - -exec celery beat \ - -A $TASKS \ - -S mist.api.rules.schedulers.RuleScheduler \ - --pidfile= \ - -l $LOGLEVEL diff --git a/bin/docker-init b/bin/docker-init index ccc24743b..40fdf8ea3 100755 --- a/bin/docker-init +++ b/bin/docker-init @@ -14,7 +14,6 @@ if [ ! -e clean ]; then echo "{\"sha\":\"$VERSION_SHA\",\"name\":\"$VERSION_NAME\",\"repo\":\"$VERSION_REPO\",\"modified\":true}" > /mist-version.json pip install -e $DIR/../paramiko/ pip install -e $DIR/../libcloud/ - pip install -e $DIR/../celerybeat-mongo/ pip install -e $DIR/../src/ set +e diff /requirements-mist.api.txt $DIR/../requirements.txt diff --git a/bin/dramatiq b/bin/dramatiq index d2a1d1c1f..1a40ef0e9 100755 --- a/bin/dramatiq +++ b/bin/dramatiq @@ -2,17 +2,37 @@ import os import importlib - -os.system("pip install -U 'dramatiq[rabbitmq]'") +from mist.api import config BROKER = 'mist.api.dramatiq_app' -QUEUES = ['dramatiq_create_machine', - 'dramatiq_post_deploy_steps', - 'dramatiq_ssh_tasks', - 'dramatiq_mappings', - 'dramatiq_schedules'] +# ''.split(',') is [''] (truthy), so drop empty names to reach the defaults. +QUEUES = [q for q in os.getenv('QUEUES', '').split(',') if q] or [ + 'default', + 'provisioning', + 'scripts', + 'schedules', + 'rules', + 'polling', + 'mappings', + 'ping_probe', + 'ssh_probe'] + +POSSIBLE_MODULES = [] # 'mist.api.dramatiq_tasks', 'mist.rbac.dramatiq_tasks'] + +for root, dirs, files in os.walk("src"): + for file in files: + if file.endswith('tasks.py'): + module = os.path.join( + root, file[:-3]).split('src/')[1].replace('/', '.') + POSSIBLE_MODULES.append(module) + print(module) + +for root, dirs, files in os.walk("/opt"): + for file in files: + if file.endswith('tasks.py'): + print('.'.join(os.path.join(root, file[:-3]).split('/')[3:])) + + -POSSIBLE_MODULES = ['mist.api.dramatiq_tasks', 'mist.rbac.dramatiq_tasks'] modules = [] for module in POSSIBLE_MODULES: @@ -23,6 +43,15 @@ for module in POSSIBLE_MODULES: modules.append(module) print(f" * {module}") +for plugin in 
config.PLUGINS: + module = 'mist.' + plugin + '.tasks' + try: + importlib.import_module(module) + except ModuleNotFoundError: + continue + modules.append(module) + print(f" * {module}") + print(f"Will execute command: dramatiq {BROKER} {' '.join(modules)} -Q {' '.join(QUEUES)}") os.system(f"dramatiq {BROKER} {' '.join(modules)} -Q {' '.join(QUEUES)}") diff --git a/bin/poller b/bin/poller deleted file mode 100755 index 2ebf8a955..000000000 --- a/bin/poller +++ /dev/null @@ -1,48 +0,0 @@ -#!/bin/sh - -set -e - -LOGLEVEL="INFO" -TASKS="${TASKS:-mist.api.tasks}" -SCHEDULER="${SCHEDULER:-mist.api.poller.schedulers.PollingScheduler}" - -USAGE="Usage: $0 [-h] [-l ] [-t ] [-s ] - -Start poller - -Options: - -h Show this help message and exit. - -l Log level. Defaults to $LOGLEVEL. - -t Tasks file to import. Defaults to $TASKS. - -s Scheduler class. Defaults to $SCHEDULER. -" - -while getopts "hl:t:s:" opt; do - case "$opt" in - h) - echo "$USAGE" - exit - ;; - l) - LOGLEVEL=$OPTARG - ;; - t) - TASKS=$OPTARG - ;; - s) - SCHEDULER=$OPTARG - ;; - \?) - echo "Invalid option: -$OPTARG" >&2 - echo "$USAGE" >&2 - exit 1 - esac -done - -set -x - -exec celery beat \ - -A $TASKS \ - -S $SCHEDULER \ - --pidfile= \ - -l $LOGLEVEL diff --git a/bin/scheduler b/bin/scheduler index 2be0ff5a1..628d77e24 100755 --- a/bin/scheduler +++ b/bin/scheduler @@ -1,43 +1,24 @@ -#!/bin/sh - -set -e - -LOGLEVEL="INFO" -TASKS="${TASKS:-mist.api.tasks}" - -USAGE="Usage: $0 [-h] [-l ] [-t ] - -Start scheduler - -Options: - -h Show this help message and exit. - -l Log level. Defaults to $LOGLEVEL. - -t Tasks file to import. Defaults to $TASKS. -" - -while getopts "hl:t:" opt; do - case "$opt" in - h) - echo "$USAGE" - exit - ;; - l) - LOGLEVEL=$OPTARG - ;; - t) - TASKS=$OPTARG - ;; - \?) 
- echo "Invalid option: -$OPTARG" >&2 - echo "$USAGE" >&2 - exit 1 - esac -done - -set -x - -exec celery beat \ - -A $TASKS \ - -S mist.api.schedules.models.UserScheduler \ - --pidfile= \ - -l $LOGLEVEL +#!/usr/bin/env python +import argparse + +from mist.api.scheduler import start + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Start task scheduler" + ) + parser.add_argument('--builtin', default=False, action='store_true') + parser.add_argument('--user', default=False, action='store_true') + parser.add_argument('--polling', default=False, action='store_true') + parser.add_argument('--rules', default=False, action='store_true') + args = parser.parse_args() + if not (args.builtin or args.user or args.polling or args.rules): + kwargs = {} + else: + kwargs = { + 'builtin': args.builtin, + 'user': args.user, + 'polling': args.polling, + 'rules': args.rules, + } + start(**kwargs) diff --git a/celerybeat-mongo b/celerybeat-mongo deleted file mode 160000 index 61fa5816c..000000000 --- a/celerybeat-mongo +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 61fa5816cb54392ce02610208bc87aacc6f1afa0 diff --git a/openapi/generate_api_spec.py b/openapi/generate_api_spec.py index 624b03329..ede76bc49 100755 --- a/openapi/generate_api_spec.py +++ b/openapi/generate_api_spec.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python2 +#!/usr/bin/env python import sys import yaml @@ -10,7 +10,7 @@ this_dir = os.path.dirname(os.path.abspath(__file__)) parent_dir = os.path.dirname(this_dir) -paths = ['src', 'libcloud', 'celerybeat-mongo'] +paths = ['src', 'libcloud'] for p in paths: sys.path.append(os.path.join(parent_dir, p)) BASE_FILE_PATH = os.path.join(this_dir, 'base.yml') diff --git a/requirements-frozen.txt b/requirements-frozen.txt index e1a837927..6ff92f89e 100644 --- a/requirements-frozen.txt +++ b/requirements-frozen.txt @@ -11,8 +11,6 @@ attrs==19.3.0 bcrypt==3.1.7 billiard==3.6.3.0 beautifulsoup4==4.9.3 -celery==4.4.7 -kombu==4.6.11 certifi==2019.11.28 
cffi==1.14.0 Chameleon==3.6.2 @@ -41,6 +39,7 @@ Jinja2==2.11.3 jsonpatch==1.25 jsonpickle==1.3 jsonpointer==2.0 +kombu==4.6.11 Logbook==1.5.3 lxml==4.6.3 Mako==1.1.2 diff --git a/requirements.txt b/requirements.txt index 91e1e49cd..32fe6e10d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,14 +6,8 @@ ## ensures that the build won't break because of a new release of some ## dependency. -# amqp v2 requires explicit call to connect, and use of drain_events -# see https://github.com/celery/py-amqp/issues/105 -# compatible with kombu's requirement amqp<3.0 apscheduler -# ansible v2 has a incompatible api -# see https://serversforhackers.com/running-ansible-2-programmatically -celery>=4.0,<5.0 beautifulsoup4 dnspython dateparser @@ -43,6 +37,7 @@ pingparsing pretty prometheus-client pycryptodome +pylibmc pymongo pyyaml pyramid diff --git a/src/mist/api/auth/methods.py b/src/mist/api/auth/methods.py index d30f5a634..65116ce50 100644 --- a/src/mist/api/auth/methods.py +++ b/src/mist/api/auth/methods.py @@ -233,7 +233,8 @@ def reissue_cookie_session(request, user_id='', su='', org=None, after=0, raise Exception("Can not reissue an API token session.") if after: - revoke_token.apply_async(args=(session.token, ), countdown=after) + revoke_token.send_with_options( + args=(session.token, ), delay=after * 1000) else: session.invalidate() session.save() diff --git a/src/mist/api/auth/tasks.py b/src/mist/api/auth/tasks.py index 0d69b5e82..9ec6b810a 100644 --- a/src/mist/api/auth/tasks.py +++ b/src/mist/api/auth/tasks.py @@ -1,11 +1,10 @@ -from mist.api.celery_app import app - +from mist.api.dramatiq_app import dramatiq from mist.api.auth.models import AuthToken __all__ = ['revoke_token'] -@app.task +@dramatiq.actor(time_limit=20_000, max_retries=3) def revoke_token(token): auth_token = AuthToken.objects.get(token=token) auth_token.invalidate() diff --git a/src/mist/api/celery_app.py b/src/mist/api/celery_app.py deleted file mode 100644 index 0e0cb84d4..000000000 --- 
a/src/mist/api/celery_app.py +++ /dev/null @@ -1,19 +0,0 @@ -from celery import Celery - -from mist.api.config import CELERY_SETTINGS, PLUGINS - - -app = Celery('tasks') -app.conf.update(**CELERY_SETTINGS) -app.autodiscover_tasks([ - 'mist.api', - 'mist.api.poller', - 'mist.api.portal', - 'mist.api.monitoring', - 'mist.api.metering', - 'mist.api.rules', - 'mist.api.auth', -]) - -if PLUGINS: - app.autodiscover_tasks(['mist.%s' % _plugin for _plugin in PLUGINS]) diff --git a/src/mist/api/clouds/controllers/compute/controllers.py b/src/mist/api/clouds/controllers/compute/controllers.py index 467bf1690..ad20437e1 100644 --- a/src/mist/api/clouds/controllers/compute/controllers.py +++ b/src/mist/api/clouds/controllers/compute/controllers.py @@ -3333,9 +3333,13 @@ def _stop_machine(self, machine, node): return self.connection.stop_container(node) def _destroy_machine(self, machine, node): - if node.state == ContainerState.RUNNING: - self.connection.stop_container(node) - return self.connection.destroy_container(node) + try: + if node.state == ContainerState.RUNNING: + self.connection.stop_container(node) + return self.connection.destroy_container(node) + except Exception as e: + log.error('Destroy failed: %r' % e) + return False def _list_sizes__fetch_sizes(self): return [] @@ -3692,13 +3696,14 @@ def _list_machines__get_custom_size(self, node): updated = False try: _size = CloudSize.objects.get( - cloud=self.cloud, external_id=node['size'].get('id')) + cloud=self.cloud, external_id=node['size'].get('name')) except me.DoesNotExist: _size = CloudSize(cloud=self.cloud, - external_id=node['size'].get('id')) + external_id=node['size'].get('name')) updated = True - if _size.ram != node['size'].get('ram'): - _size.ram = node['size'].get('ram') + # `ram` may be missing or None: coerce to 0 so int() cannot raise. + if int(_size.ram or 0) != int(node['size'].get('ram') or 0): + _size.ram = int(node['size'].get('ram') or 0) updated = True if _size.cpus != node['size'].get('extra', {}).get('cpus'): _size.cpus = node['size'].get('extra', {}).get('cpus') 
diff --git a/src/mist/api/clouds/controllers/main/base.py b/src/mist/api/clouds/controllers/main/base.py index 8a7ddd78a..246e6b9bf 100644 --- a/src/mist/api/clouds/controllers/main/base.py +++ b/src/mist/api/clouds/controllers/main/base.py @@ -308,7 +308,7 @@ def update(self, fail_on_error=True, fail_on_invalid_params=True, try: schedule_id = str(ListImagesPollingSchedule.objects.get( cloud=self.cloud).id) - list_images.apply_async((schedule_id,)) + list_images.send(schedule_id,) except ListImagesPollingSchedule.DoesNotExist: pass @@ -353,10 +353,12 @@ def disable(self): # up the change to `self.cloud.enabled` in time to stop scheduling # further polling tasks. This may result in `missing_since` being reset # to `None`. For that, we schedule a task in the future to ensure that - # celery has executed all respective poller tasks first. + # dramatiq has executed all respective poller tasks first. from mist.api.tasks import set_missing_since, delete_periodic_tasks - set_missing_since.apply_async((self.cloud.id, ), countdown=30) - delete_periodic_tasks.apply_async((self.cloud.id, ), countdown=30) + set_missing_since.send_with_options( + args=(self.cloud.id, ), delay=30_000) + delete_periodic_tasks.send_with_options( + args=(self.cloud.id, ), delay=30_000) def dns_enable(self): self.cloud.dns_enabled = True @@ -482,7 +484,8 @@ def delete(self, expire=False): from mist.api.tasks import set_missing_since self.cloud.deleted = datetime.datetime.utcnow() self.cloud.save() - set_missing_since.apply_async((self.cloud.id, ), countdown=30) + set_missing_since.send_with_options( + args=(self.cloud.id, ), delay=30_000) def disconnect(self): self.compute.disconnect() diff --git a/src/mist/api/clouds/controllers/network/controllers.py b/src/mist/api/clouds/controllers/network/controllers.py index f0a04450a..ddab43c3a 100644 --- a/src/mist/api/clouds/controllers/network/controllers.py +++ b/src/mist/api/clouds/controllers/network/controllers.py @@ -240,7 +240,11 @@ def 
_list_networks__fetch_networks(self): from mist.api.machines.models import Machine hosts = Machine.objects(cloud=self.cloud, parent=None, missing_since=None) - loop = asyncio.get_event_loop() + try: + loop = asyncio.get_event_loop() + except RuntimeError: + asyncio.set_event_loop(asyncio.new_event_loop()) + loop = asyncio.get_event_loop() all_nets = loop.run_until_complete(self.list_networks_all_hosts(hosts, loop)) return [net for host_nets in all_nets for net in host_nets] diff --git a/src/mist/api/clouds/models.py b/src/mist/api/clouds/models.py index 9062c21bc..a6eab5ea6 100644 --- a/src/mist/api/clouds/models.py +++ b/src/mist/api/clouds/models.py @@ -183,6 +183,10 @@ def __init__(self, *args, **kwargs): def name(self): return self.title + @property + def org(self): + return self.owner + @property def provider(self): return self.ctl.provider diff --git a/src/mist/api/clouds/views.py b/src/mist/api/clouds/views.py index cce04603f..570efb25f 100644 --- a/src/mist/api/clouds/views.py +++ b/src/mist/api/clouds/views.py @@ -5,7 +5,7 @@ from mist.api.clouds.models import Cloud from mist.api.auth.methods import auth_context_from_request -from mist.api.dramatiq_tasks import dramatiq_async_session_update +from mist.api.tasks import async_session_update from mist.api.helpers import trigger_session_update from mist.api.helpers import view_config, params_from_request @@ -246,7 +246,7 @@ def add_cloud(request): if config.HAS_RBAC: owner.mapper.update( cloud, - callback=dramatiq_async_session_update, + callback=async_session_update, args=(owner.id, ['clouds'], ) ) diff --git a/src/mist/api/config.py b/src/mist/api/config.py index f6173375d..2f84b8be8 100755 --- a/src/mist/api/config.py +++ b/src/mist/api/config.py @@ -42,6 +42,7 @@ def dirname(path, num=1): CORE_URI = "http://localhost" LICENSE_KEY = "" AMQP_URI = "rabbitmq:5672" +MEMCACHED_HOST = ["memcached:11211"] BROKER_URL = "amqp://guest:guest@rabbitmq/" SSL_VERIFY = True THEME = "" @@ -1346,11 +1347,6 @@ def 
dirname(path, num=1): # celery settings CELERY_SETTINGS = { - 'broker_url': BROKER_URL, - # Disable heartbeats because celery workers & beat fail to actually send - # them and the connection dies. - 'broker_heartbeat': 0, - 'task_serializer': 'json', # Disable custom log format because we miss out on worker/task specific # metadata. # 'worker_log_format': PY_LOG_FORMAT, @@ -1359,11 +1355,8 @@ def dirname(path, num=1): 'worker_max_tasks_per_child': 32, 'worker_max_memory_per_child': 1024000, # 1024,000 KiB - 1000 MiB 'worker_send_task_events': True, - 'mongodb_scheduler_db': 'mist2', - 'mongodb_scheduler_collection': 'schedules', - 'mongodb_scheduler_url': MONGO_URI, - 'task_routes': { + 'task_routes': { # Command queue 'mist.api.tasks.ssh_command': {'queue': 'command'}, @@ -3145,15 +3138,6 @@ def dirname(path, num=1): TELEGRAF_TARGET = CORE_URI + '/ingress' -# Update celery settings. -CELERY_SETTINGS.update({ - 'broker_url': BROKER_URL, - 'mongodb_scheduler_url': MONGO_URI, - # Disable custom log format because we miss out on worker/task specific - # metadata. - # 'worker_log_format': PY_LOG_FORMAT, - # 'worker_task_log_format': PY_LOG_FORMAT, -}) _schedule = {} if VERSION_CHECK: _schedule['version-check'] = { @@ -3183,9 +3167,6 @@ def dirname(path, num=1): 'schedule': datetime.timedelta(hours=BACKUP_INTERVAL), } -if _schedule: - CELERY_SETTINGS.update({'beat_schedule': _schedule}) - # Configure libcloud to not verify certain hosts. 
if NO_VERIFY_HOSTS: diff --git a/src/mist/api/dramatiq_app.py b/src/mist/api/dramatiq_app.py index 30960fbb7..70a8d2475 100644 --- a/src/mist/api/dramatiq_app.py +++ b/src/mist/api/dramatiq_app.py @@ -2,11 +2,18 @@ from threading import local from time import perf_counter +# import mongoengine as me import dramatiq + from dramatiq.middleware import Middleware from dramatiq.brokers.rabbitmq import RabbitmqBroker +from dramatiq.results.backends import MemcachedBackend +from dramatiq.results import Results from mist.api import config +# from mist.api.poller.models import PollingSchedule +# from mist.api.rules.models import Rule +# from mist.api.schedules.models import Schedule log = logging.getLogger(__name__) @@ -17,8 +24,49 @@ class LoggingMiddleware(Middleware): state = local() def before_process_message(self, broker, message): - msg_id = '{}[{}]'.format(message.actor_name, message.message_id) - log.info('Starting task: %s', msg_id) + msg_id = '{}: {}{}'.format( + message.message_id, message.actor_name, str(message.args)) + msg = 'Starting task %s' % msg_id + + # try: + # sched = None + # if message._message.args: + # args = message._message.args + # if len(args) > 4: + # try: + # sched = Schedule.objects.get( + # name=args[2], owner=args[0], deleted=None) + # except Schedule.DoesNotExist: + # pass + # if not sched and args: + # try: + # sched = PollingSchedule.objects.get( + # id=message._message.args[0]) + # except me.ValidationError: + # try: + # sched = Rule.objects.get( + # id=message._message.args[0]) + # except Rule.DoesNotExist: + # log.warn("args: ", message._message, + # dir(message._message)) + # if getattr(sched, 'org', None): + # msg += "\nOrg: %s" % sched.org.name + # elif getattr(sched, 'cloud', None): + # msg += "\nCloud: %s\nOrg: %s" % ( + # sched.cloud.name, sched.cloud.org.name) + # elif getattr(sched, 'machine', None): + # msg += "\nMachine: %s\nCloud: %s\nOrg: %s" % ( + # sched.machine.name, sched.machine.cloud.name, + # sched.machine.org.name) 
+ # elif getattr(sched, 'get_resources', None): + # msg += "\nSchedule: %s\nResources: %s" % ( + # sched.name, sched.get_resources()) + # else: + # msg += "\n%s - %s" % ( + # sched.__class__, getattr(sched, 'task', None) + # except Exception as exc: + # log.error('%r' % exc) + log.info(msg) self.state.msg_id = msg_id self.state.start = perf_counter() @@ -27,7 +75,7 @@ def after_process_message(self, broker, message, *, try: delta = perf_counter() - self.state.start outcome = 'Task failed' if exception else 'Completed task' - log.info("%s: %s - %.02fms elapsed.", outcome, + log.info("%s %s - %.02fms elapsed.", outcome, self.state.msg_id, delta * 1000) del self.state.start del self.state.msg_id @@ -37,6 +85,17 @@ def after_process_message(self, broker, message, *, after_skip_message = after_process_message +class MongoConnectMiddleware(Middleware): + """Connect to mongodb on worker boot""" + + def after_worker_boot(self, broker, worker): + from mist.api import mongo_connect + mongo_connect() + + broker = RabbitmqBroker(url=config.BROKER_URL + '?heartbeat=600') broker.add_middleware(LoggingMiddleware()) +broker.add_middleware(MongoConnectMiddleware()) +result_backend = MemcachedBackend(servers=config.MEMCACHED_HOST) +broker.add_middleware(Results(backend=result_backend)) dramatiq.set_broker(broker) diff --git a/src/mist/api/dramatiq_tasks.py b/src/mist/api/dramatiq_tasks.py deleted file mode 100644 index d106bf859..000000000 --- a/src/mist/api/dramatiq_tasks.py +++ /dev/null @@ -1,502 +0,0 @@ -import time -import datetime -import uuid -import logging -import importlib - -import secrets -import dramatiq - -import mongoengine as me - -from dramatiq import actor -from dramatiq.errors import Retry - -from paramiko.ssh_exception import SSHException - -from mist.api.clouds.models import Cloud -from mist.api.machines.models import Machine -from mist.api.schedules.models import Schedule -from mist.api.keys.models import Key -from mist.api.dns.models import RECORDS - -from 
mist.api.exceptions import MachineNotFoundError -from mist.api.exceptions import ServiceUnavailableError -from mist.api.exceptions import MachineUnavailableError - -from mist.api import config -from mist.api.dramatiq_app import broker -from mist.api.methods import probe_ssh_only -from mist.api.methods import notify_user, notify_admin -from mist.api.helpers import trigger_session_update -from mist.api.auth.methods import AuthContext -from mist.api.logs.methods import log_event -from mist.api.tag.methods import resolve_id_and_set_tags -from mist.api.monitoring.methods import enable_monitoring -from mist.api.shell import Shell -from mist.api.tasks import run_script -from mist.api.poller.models import ListMachinesPollingSchedule - - -logging.basicConfig( - level=config.PY_LOG_LEVEL, - format=config.PY_LOG_FORMAT, - datefmt=config.PY_LOG_FORMAT_DATE, -) -log = logging.getLogger(__name__) - -task_modules = [ - 'mist.api.tasks', - 'mist.api.auth.tasks', - 'mist.api.metering.tasks', - 'mist.api.portal.tasks', - 'mist.api.poller.tasks', - 'mist.api.rules.tasks', - 'mist.api.monitoring.tasks' -] -task_map = { - module: (lambda x: getattr(importlib.import_module(x), '__all__'))(module) - for module in task_modules -} - -actors = {} - -print('Loading task modules') -for task_module in task_map.keys(): - print(' * %s:' % task_module) - for task_name in task_map[task_module]: - task = getattr(importlib.import_module(task_module), task_name) - actors[task_name] = actor( - task, - actor_name=task_name, - time_limit=task.time_limit or 5 * 60 * 1000, # 5 minutes - max_retries=task.max_retries, - broker=broker, - queue_name='dramatiq_schedules' - ) - print(' - %s' % task_name) - - -def tmp_log_error(msg, *args): - log.error("Post deploy: %s" % msg, *args) - - -def tmp_log(msg, *args): - log.info("Post deploy: %s" % msg, *args) - - -@actor(queue_name="dramatiq_mappings", broker=broker) -def dramatiq_async_session_update(owner, sections=None): - if sections is None: - sections = [ - 
'org', 'user', 'keys', 'clouds', 'stacks', - 'scripts', 'schedules', 'templates', 'monitoring' - ] - trigger_session_update(owner, sections) - - -@actor(queue_name="dramatiq_create_machine", broker=broker, max_retries=0) -def dramatiq_multicreate_async( - auth_context_serialized, plan, job_id=None, job=None -): - job_id = job_id or uuid.uuid4().hex - auth_context = AuthContext.deserialize(auth_context_serialized) - log_event(auth_context.owner.id, 'job', 'async_machine_creation_started', - user_id=auth_context.user.id, job_id=job_id, job=job, - **plan) - - messages = [] - name = plan['machine_name'] - quantity = plan['quantity'] - - if quantity == 1: - messages.append(dramatiq_create_machine_async.message( - auth_context_serialized, plan, job_id, job)) - else: - for _ in range(quantity): - temp_plan = plan.copy() - temp_plan['machine_name'] = name + '-' + secrets.token_hex(5) - messages.append(dramatiq_create_machine_async.message( - auth_context_serialized, temp_plan, job_id, job)) - - dramatiq.group(messages, broker=broker).run() - - -@actor(queue_name="dramatiq_create_machine", - broker=broker, - max_retries=0) -def dramatiq_create_machine_async( - auth_context_serialized, plan, job_id=None, job=None -): - - job_id = job_id or uuid.uuid4().hex - auth_context = AuthContext.deserialize(auth_context_serialized) - cloud = Cloud.objects.get(id=plan["cloud"]["id"]) - - log_event( - auth_context.owner.id, 'job', 'sending_create_machine_request', - job=job, job_id=job_id, cloud_id=plan['cloud']['id'], - machine_name=plan['machine_name'], user_id=auth_context.user.id,) - - try: - node = cloud.ctl.compute.create_machine(plan) - except Exception as exc: - error = f"Machine creation failed with exception: {str(exc)}" - tmp_log_error(error) - log_event( - auth_context.owner.id, 'job', 'machine_creation_finished', - job=job, job_id=job_id, cloud_id=plan['cloud']['id'], - machine_name=plan['machine_name'], user_id=auth_context.user.id, - error=error - ) - raise - - 
tmp_log('Overriding default polling interval') - schedule = ListMachinesPollingSchedule.objects.get( - cloud=plan['cloud']['id']) - schedule.add_interval(10, ttl=600) - schedule.save() - - for i in range(1, 11): - try: - machine = Machine.objects.get(cloud=cloud, machine_id=node.id) - break - except me.DoesNotExist: - time.sleep(i * 10) - else: - error = f"Machine with external_id: {node.id} was not found" - tmp_log_error(error) - log_event( - auth_context.owner.id, 'job', 'machine_creation_finished', - job=job, job_id=job_id, cloud_id=plan['cloud']['id'], - machine_name=plan['machine_name'], external_id=node.id, - user_id=auth_context.user.id, error=error - ) - raise MachineNotFoundError - - machine.assign_to(auth_context.user) - - if plan.get('expiration'): - try: - add_expiration_for_machine(auth_context, plan['expiration'], - machine) - except Exception as exc: - tmp_log_error('Got exception %s while adding expiration' - % str(exc)) - # Associate key. - if plan.get('key'): - try: - key = Key.objects.get(id=plan["key"]["id"]) - username = (plan['key'].get('user') or - plan.get('user') or - node.extra.get("username", "")) - # TODO port could be something else - machine.ctl.associate_key( - key, username=username, port=22, no_connect=True - ) - except Exception as exc: - tmp_log_error('Got exception %s in key association' - % str(exc)) - - if plan.get('tags'): - resolve_id_and_set_tags(auth_context.owner, 'machine', node.id, - plan['tags'], cloud_id=cloud.id) - - machine = Machine.objects.get(cloud=cloud, machine_id=node.id) - - # first_run is set to True becase poller has already - # logged an observation event for this machine - # and we don't want to send it again. 
- cloud.ctl.compute.produce_and_publish_patch({}, - [machine], - first_run=True - ) - - log_event( - auth_context.owner.id, 'job', 'machine_creation_finished', - job=job, job_id=job_id, cloud_id=plan['cloud']['id'], - machine_name=plan['machine_name'], external_id=node.id, - user_id=auth_context.user.id - ) - - dramatiq_post_deploy.send(auth_context_serialized, cloud.id, machine.id, - node.id, plan, job_id=job_id, job=job) - - -@actor(queue_name="dramatiq_post_deploy_steps", - broker=broker, - throws=(me.DoesNotExist, MachineUnavailableError)) -def dramatiq_post_deploy(auth_context_serialized, cloud_id, - machine_id, external_id, plan, - job_id=None, job=None): - - auth_context = AuthContext.deserialize(auth_context_serialized) - job_id = job_id or uuid.uuid4().hex - - tmp_log( - "Entering post deploy steps for %s %s %s", - auth_context.owner.id, - cloud_id, - machine_id, - ) - - try: - cloud = Cloud.objects.get(owner=auth_context.owner, id=cloud_id, - deleted=None) - except Cloud.DoesNotExist: - tmp_log_error("Cloud %s not found. Exiting", cloud_id) - raise me.DoesNotExist from None - - try: - machine = Machine.objects.get(cloud=cloud, machine_id=external_id) - except Machine.DoesNotExist: - tmp_log_error("Machine %s not found.Exiting", machine_id) - raise me.DoesNotExist from None - - msg = "Cloud:\n Name: %s\n Id: %s\n" % (cloud.title, cloud_id) - msg += "Machine:\n Name: %s\n Id: %s\n" % (machine.name, machine.id) - tmp_log("Machine found, proceeding to post deploy steps\n%s" % msg) - - if machine.state == 'terminated': - tmp_log_error("Machine %s terminated. 
Exiting", machine_id) - raise MachineUnavailableError - elif machine.state != 'running': - tmp_log_error("not running state") - raise Retry(delay=60000) - - ips = [ - ip for ip in machine.public_ips + machine.private_ips if ":" not in ip - ] - try: - host = ips[0] - except IndexError: - tmp_log_error("ip not found, retrying") - raise Retry(delay=60000) from None - - log_dict = { - "owner_id": auth_context.owner.id, - "event_type": "job", - "cloud_id": cloud_id, - "machine_id": machine_id, - "external_id": external_id, - "job_id": job_id, - "job": job, - "host": host, - "key_id": plan.get("key", {}).get("id"), - } - - add_schedules(auth_context, machine, log_dict, plan.get("schedules")) - - add_dns_record(auth_context, host, log_dict, plan.get("fqdn")) - - dramatiq_ssh_tasks.send(auth_context_serialized, cloud_id, - plan.get("key", {}).get("id"), host, external_id, - machine.name, machine_id, plan.get('scripts'), - log_dict, monitoring=plan.get('monitoring', False), - plugins=None, job_id=job_id, - username=None, password=None, port=22) - - -@actor(queue_name="dramatiq_ssh_tasks", broker=broker) -def dramatiq_ssh_tasks(auth_context_serialized, cloud_id, key_id, host, - external_id, machine_name, machine_id, scripts, - log_dict, monitoring=False, plugins=None, - job_id=None, username=None, password=None, port=22): - - auth_context = AuthContext.deserialize(auth_context_serialized) - try: - shell = Shell(host) - cloud_post_deploy(auth_context, cloud_id, shell, key_id, external_id, - machine_name, username=username, password=password, - port=port) - create_key_association(auth_context, shell, cloud_id, key_id, - machine_id, host, log_dict, username=username, - password=password, port=port) - run_scripts(auth_context, shell, scripts, cloud_id, host, machine_id, - machine_name, log_dict, job_id) - shell.disconnect() - except (ServiceUnavailableError, SSHException) as exc: - tmp_log_error(repr(exc)) - raise Retry(delay=60000) - - if monitoring: - try: - 
enable_monitoring(auth_context.owner, cloud_id, external_id, - no_ssh=False, dry=False, job_id=job_id, - plugins=plugins, deploy_async=False) - except Exception as e: - print(repr(e)) - notify_user( - auth_context.owner, - "Enable monitoring failed for machine %s" % machine_id, - repr(e) - ) - notify_admin('Enable monitoring on creation failed for ' - 'user %s machine %s: %r' - % (str(auth_context.owner), machine_id, e)) - log_event(action='enable_monitoring_failed', error=repr(e), - **log_dict) - log_event(action='post_deploy_finished', error=False, **log_dict) - - -def add_expiration_for_machine(auth_context, expiration, machine): - if expiration.get('notify'): - # convert notify value from datetime str to seconds - notify = datetime.datetime.strptime(expiration['date'], - '%Y-%m-%d %H:%M:%S') \ - - datetime.datetime.strptime(expiration['notify'], - '%Y-%m-%d %H:%M:%S') - expiration['notify'] = int(notify.total_seconds()) - params = { - "schedule_type": "one_off", - "description": "Scheduled to run when machine expires", - "schedule_entry": expiration.get("date"), - "action": expiration.get("action"), - "selectors": [{"type": "machines", "ids": [machine.id]}], - "task_enabled": True, - "notify": expiration.get("notify", ""), - "notify_msg": expiration.get("notify_msg", ""), - } - name = (f'{machine.name}-expiration-{machine.machine_id[:4]}' - f'-{secrets.token_hex(3)}') - machine.expiration = Schedule.add(auth_context, name, **params) - machine.save() - - -def add_schedules(auth_context, machine, log_dict, schedules): - schedules = schedules or [] - for schedule in schedules: - type_ = schedule.get('action') or 'script' - try: - name = (f'{machine.name}-{type_}-' - f'{machine.machine_id[:4]}-{secrets.token_hex(3)}') - tmp_log("Add scheduler entry %s", name) - schedule["selectors"] = [{"type": "machines", - "ids": [machine.id]}] - schedule_info = Schedule.add(auth_context, name, **schedule) - tmp_log("A new scheduler was added") - log_event( - 
action="add_schedule_entry", - scheduler=schedule_info.as_dict(), - **log_dict - ) - except Exception as e: - tmp_log_error("Exception occured %s", repr(e)) - error = repr(e) - notify_user( - auth_context.owner, - "Add scheduler entry failed for machine %s" - % machine.machine_id, - repr(e), - error=error, - ) - log_event( - action="add_schedule_entry", error=error, - **log_dict - ) - - -def add_dns_record(auth_context, host, log_dict, fqdn): - if fqdn: - kwargs = {} - try: - kwargs["name"] = fqdn - kwargs["type"] = "A" - kwargs["data"] = host - kwargs["ttl"] = 3600 - - dns_cls = RECORDS[kwargs["type"]] - dns_cls.add(owner=auth_context.owner, **kwargs) - log_event(action="create_A_record", hostname=fqdn, **log_dict) - tmp_log("Added A Record, fqdn: %s IP: %s", fqdn, host) - except Exception as exc: - log_event(action="create_A_record", hostname=fqdn, - error=str(exc), **log_dict) - - -def cloud_post_deploy(auth_context, cloud_id, shell, key_id, external_id, - machine_name, username=None, password=None, port=22): - try: - cloud_post_deploy_steps = config.CLOUD_POST_DEPLOY.get( - cloud_id, []) - except AttributeError: - cloud_post_deploy_steps = [] - for post_deploy_step in cloud_post_deploy_steps: - predeployed_key_id = post_deploy_step.get('key') - if predeployed_key_id and key_id: - # Use predeployed key to deploy the user selected key - shell.autoconfigure( - auth_context.owner, cloud_id, external_id, - predeployed_key_id, - username, password, port - ) - retval, output = shell.command( - 'echo %s >> ~/.ssh/authorized_keys' - % Key.objects.get(id=key_id).public) - if retval > 0: - notify_admin('Deploy user key failed for machine %s' - % machine_name) - command = post_deploy_step.get('script', '').replace( - '${node.name}', machine_name) - if command and key_id: - tmp_log('Executing cloud post deploy cmd: %s' % command) - shell.autoconfigure( - auth_context.owner, cloud_id, machine_name, - key_id, username, password, port - ) - retval, output = 
shell.command(command) - if retval > 0: - notify_admin('Cloud post deploy command `%s` failed ' - 'for machine %s' % (command, machine_name)) - - -def create_key_association(auth_context, shell, cloud_id, key_id, machine_id, - host, log_dict, username=None, password=None, - port=22): - if key_id: - # connect with ssh even if no command, to create association - # to be able to enable monitoring - tmp_log('attempting to connect to shell') - key_id, ssh_user = shell.autoconfigure( - auth_context.owner, cloud_id, machine_id, key_id, username, - password, port - ) - tmp_log('connected to shell') - result = probe_ssh_only(auth_context.owner, cloud_id, machine_id, - host=None, key_id=key_id, - ssh_user=ssh_user, shell=shell) - - log_dict['ssh_user'] = ssh_user - log_event(action='probe', result=result, **log_dict) - - -def run_scripts(auth_context, shell, scripts, cloud_id, host, machine_id, - machine_name, log_dict, job_id): - scripts = scripts or [] - for script in scripts: - if script.get('id'): - tmp_log('will run script_id %s', script['id']) - params = script.get('params', '') - ret = run_script.run( - auth_context.owner, script['id'], machine_id, - params=params, host=host, job_id=job_id - ) - tmp_log('executed script_id %s', script['id']) - elif script.get('inline'): - tmp_log('will run inline script') - log_event(action='script_started', command=script, - **log_dict) - start_time = time.time() - retval, output = shell.command(script['inline']) - tmp_log('executed script') - execution_time = time.time() - start_time - title = "Deployment script %s" % ('failed' if retval - else 'succeeded') - notify_user(auth_context.owner, title, cloud_id=cloud_id, - machine_id=machine_id, machine_name=machine_name, - command=script, output=output, duration=execution_time, - retval=retval, error=retval > 0) - log_event(action='script_finished', - error=retval > 0, return_value=retval, - command=script, stdout=output, - **log_dict) diff --git a/src/mist/api/dummy/rbac.py 
b/src/mist/api/dummy/rbac.py index a6ca06215..f707167e2 100644 --- a/src/mist/api/dummy/rbac.py +++ b/src/mist/api/dummy/rbac.py @@ -49,7 +49,7 @@ def _get_matching_constraints(self, rtype, action): def serialize(self): """This returns the basic context info in a dict of strings and can - safely be passed to celery tasks etc. To recreate the context, just + safely be passed to dramatiq tasks etc. To recreate the context, just feed it to AuthContext.deserialize""" return { 'user_id': self.user.id, diff --git a/src/mist/api/helpers.py b/src/mist/api/helpers.py index 03a23daeb..69fde70bc 100644 --- a/src/mist/api/helpers.py +++ b/src/mist/api/helpers.py @@ -1289,29 +1289,6 @@ def mac_verify(kwargs=None, key='', mac_len=0, mac_format='hex'): del kwargs[kw] -def maybe_submit_cloud_task(cloud, task_name): - """Decide whether a task should be submitted to celery - - This method helps us prevent submitting new celery tasks, which are - guaranteed to return/exit immediately without performing any actual - actions. - - For instance, such cases include async tasks for listing DNS zones - for clouds that have no DNS support or DNS is temporarily disabled. - - Note that this is just a helper method used to make an initial decision. - - The `cloud` argument must be a `mist.api.clouds.models.Cloud` mongoengine - objects and `task_name` must be the name/identifier of the corresponding - celery task. - - """ - if task_name == 'list_projects': - if cloud.ctl.provider != 'equinixmetal': - return False - return True - - def is_resource_missing(obj): """Return True if either resource or its parent is missing or has been deleted. 
Note that `obj` is meant to be a subclass of me.Document.""" diff --git a/src/mist/api/machines/methods.py b/src/mist/api/machines/methods.py index 5833b1df2..f8bea8572 100644 --- a/src/mist/api/machines/methods.py +++ b/src/mist/api/machines/methods.py @@ -646,23 +646,23 @@ def create_machine(auth_context, cloud_id, key_id, machine_name, location_id, if cloud.ctl.provider == Provider.AZURE.value: # for Azure, connect with the generated password, deploy the ssh key # when this is ok, it calls post_deploy for script/monitoring - mist.api.tasks.azure_post_create_steps.delay( + mist.api.tasks.azure_post_create_steps.send( auth_context.owner.id, cloud_id, node.id, monitoring, key_id, node.extra.get('username'), node.extra.get('password'), public_key, - script=script, - script_id=script_id, script_params=script_params, job_id=job_id, - hostname=hostname, plugins=plugins, post_script_id=post_script_id, + script=script, script_id=script_id, script_params=script_params, + job_id=job_id, hostname=hostname, plugins=plugins, + post_script_id=post_script_id, post_script_params=post_script_params, schedule=schedule, job=job, ) elif cloud.ctl.provider == Provider.OPENSTACK.value: if associate_floating_ip: networks = list_networks(auth_context.owner, cloud_id) - mist.api.tasks.openstack_post_create_steps.delay( + mist.api.tasks.openstack_post_create_steps.send( auth_context.owner.id, cloud_id, node.id, monitoring, key_id, node.extra.get('username'), node.extra.get('password'), public_key, script=script, script_id=script_id, - script_params=script_params, - job_id=job_id, job=job, hostname=hostname, plugins=plugins, + script_params=script_params, job_id=job_id, job=job, + hostname=hostname, plugins=plugins, post_script_params=post_script_params, networks=networks, schedule=schedule, ) @@ -670,7 +670,7 @@ def create_machine(auth_context, cloud_id, key_id, machine_name, location_id, # for Rackspace First Gen, cannot specify ssh keys. 
When node is # created we have the generated password, so deploy the ssh key # when this is ok and call post_deploy for script/monitoring - mist.api.tasks.rackspace_first_gen_post_create_steps.delay( + mist.api.tasks.rackspace_first_gen_post_create_steps.send( auth_context.owner.id, cloud_id, node.id, monitoring, key_id, node.extra.get('password'), public_key, script=script, script_id=script_id, script_params=script_params, @@ -680,7 +680,7 @@ def create_machine(auth_context, cloud_id, key_id, machine_name, location_id, ) else: - mist.api.tasks.post_deploy_steps.delay( + mist.api.tasks.post_deploy_steps.send( auth_context.owner.id, cloud_id, node.id, monitoring, script=script, key_id=key_id, script_id=script_id, script_params=script_params, job_id=job_id, job=job, port=ssh_port, @@ -2291,19 +2291,15 @@ def destroy_machine(user, cloud_id, machine_id): machine = Machine.objects.get(cloud=cloud_id, machine_id=machine_id) - if not machine.monitoring.hasmonitoring: - machine.ctl.destroy() - return - - # if machine has monitoring, disable it. the way we disable depends on - # whether this is a standalone io installation or not - try: - disable_monitoring(user, cloud_id, machine_id, no_ssh=True) - except Exception as exc: - log.warning("Didn't manage to disable monitoring, maybe the " - "machine never had monitoring enabled. Error: %r", exc) + # if machine has monitoring, disable it. + if machine.monitoring.hasmonitoring: + try: + disable_monitoring(user, cloud_id, machine_id, no_ssh=True) + except Exception as exc: + log.warning("Didn't manage to disable monitoring, maybe the " + "machine never had monitoring enabled. 
Error: %r", exc) - machine.ctl.destroy() + return machine.ctl.destroy() # SEC diff --git a/src/mist/api/machines/models.py b/src/mist/api/machines/models.py index 12e7acef3..a98f63d99 100644 --- a/src/mist/api/machines/models.py +++ b/src/mist/api/machines/models.py @@ -39,8 +39,8 @@ class InstallationStatus(me.EmbeddedDocument): # automatic: # - preparing: Set on first API call before everything else - # - pending: Enabled on mist.monitor, submitted celery task - # - installing: Celery task running + # - pending: Enabled on mist.monitor, submitted dramatiq task + # - installing: Dramatiq task running # - failed: Ansible job failed (also set finished_at) # - succeeded: Ansible job succeeded (also set finished_at) # manual: @@ -352,6 +352,10 @@ def __init__(self, *args, **kwargs): super(Machine, self).__init__(*args, **kwargs) self.ctl = MachineController(self) + @property + def org(self): + return self.owner + def clean(self): # Remove any KeyAssociation, whose `keypair` has been deleted. 
Do NOT # perform an atomic update on self, but rather remove items from the diff --git a/src/mist/api/machines/views.py b/src/mist/api/machines/views.py index ef4e7dc07..a50c29898 100644 --- a/src/mist/api/machines/views.py +++ b/src/mist/api/machines/views.py @@ -13,7 +13,7 @@ from mist.api.machines.models import Machine, KeyMachineAssociation from mist.api.clouds.methods import filter_list_clouds -from mist.api import tasks +from mist.api.tasks import create_machine_async, clone_machine_async from mist.api.auth.methods import auth_context_from_request from mist.api.helpers import view_config, params_from_request @@ -512,7 +512,8 @@ def create_machine(request): else: args = (auth_context.serialize(), ) + args kwargs.update({'quantity': quantity, 'persist': persist}) - tasks.create_machine_async.apply_async(args, kwargs, countdown=2) + create_machine_async.send_with_options( + args=args, kwargs=kwargs, delay=1_000) ret = {'job_id': job_id} ret.update({'job': job}) return ret @@ -875,10 +876,10 @@ def machine_actions(request): clone_async = True # False for debug ret = {} if clone_async: - argz = (auth_context.serialize(), machine.id, name) + args = (auth_context.serialize(), machine.id, name) kwargs = {'job': job, 'job_id': job_id} - tasks.clone_machine_async.apply_async(argz, kwargs, - countdown=2) + clone_machine_async.send_with_options( + args=args, kwargs=kwargs, delay=1_000) else: ret = getattr(machine.ctl, action)(name) ret.update({'job': job, 'job_id': job_id}) diff --git a/src/mist/api/metering/tasks.py b/src/mist/api/metering/tasks.py index 7efc61770..e790dcb62 100644 --- a/src/mist/api/metering/tasks.py +++ b/src/mist/api/metering/tasks.py @@ -2,8 +2,9 @@ import requests import datetime +from mist.api.dramatiq_app import dramatiq + from mist.api import config -from mist.api.celery_app import app from mist.api.rules.models import Rule from mist.api.machines.models import Machine from mist.api.monitoring.methods import get_stats @@ -29,7 +30,7 @@ def 
_skip_metering(machine): return False -@app.task +@dramatiq.actor def find_machine_cores(machine_id): """Decide on the number of vCPUs for all machines""" @@ -78,7 +79,7 @@ def _get_cores_from_libcloud_size(machine): log.error('Failed to get cores of machine %s: %r', machine.id, exc) -@app.task +@dramatiq.actor def push_metering_info(owner_id): """Collect and push new metering data to InfluxDB""" now = datetime.datetime.utcnow() diff --git a/src/mist/api/methods.py b/src/mist/api/methods.py index 1812659c6..c8e345d6f 100644 --- a/src/mist/api/methods.py +++ b/src/mist/api/methods.py @@ -201,7 +201,7 @@ def list_storage_accounts(owner, cloud_id): # TODO deprecate this! # We should decouple probe_ssh_only from ping. -# Use them as two separate functions instead & through celery +# Use them as two separate functions instead & through dramatiq def probe(owner, cloud_id, machine_id, host, key_id='', ssh_user=''): """Ping and SSH to machine and collect various metrics.""" diff --git a/src/mist/api/monitoring/graphite/methods.py b/src/mist/api/monitoring/graphite/methods.py index 76737d9ed..f85aac9fc 100644 --- a/src/mist/api/monitoring/graphite/methods.py +++ b/src/mist/api/monitoring/graphite/methods.py @@ -58,7 +58,7 @@ def get_stats(machine, start="", stop="", step="", metrics=None): istatus.activated_at = time.time() istatus.state = 'succeeded' machine.save() - add_nodata_rule.delay(machine.owner.id) + add_nodata_rule.send(machine.owner.id) trigger_session_update(machine.owner, ['monitoring']) break diff --git a/src/mist/api/monitoring/influxdb/handlers.py b/src/mist/api/monitoring/influxdb/handlers.py index e8d52127e..849a54b91 100644 --- a/src/mist/api/monitoring/influxdb/handlers.py +++ b/src/mist/api/monitoring/influxdb/handlers.py @@ -298,7 +298,7 @@ def _update_status(self, results): owner = self.machine.owner # FIXME Resolve circular imports. 
from mist.api.rules.tasks import add_nodata_rule - add_nodata_rule.delay(owner.id, 'influxdb') + add_nodata_rule.send(owner.id, 'influxdb') trigger_session_update(owner, ['monitoring']) return diff --git a/src/mist/api/monitoring/methods.py b/src/mist/api/monitoring/methods.py index 95de83942..251600f87 100644 --- a/src/mist/api/monitoring/methods.py +++ b/src/mist/api/monitoring/methods.py @@ -536,7 +536,7 @@ def enable_monitoring( # Install Telegraf func = mist.api.monitoring.tasks.install_telegraf if deploy_async: - func = func.delay + func = func.send func(machine.id, job, job_id, plugins) else: raise Exception("Invalid monitoring method") @@ -590,7 +590,7 @@ def disable_monitoring(owner, cloud_id, machine_id, no_ssh=False, job_id=""): "telegraf-victoriametrics" ): # Schedule undeployment of Telegraf. - mist.api.monitoring.tasks.uninstall_telegraf.delay( + mist.api.monitoring.tasks.uninstall_telegraf.send( machine.id, job, job_id ) if job_id: diff --git a/src/mist/api/monitoring/tasks.py b/src/mist/api/monitoring/tasks.py index 65a19837c..8d3a18403 100644 --- a/src/mist/api/monitoring/tasks.py +++ b/src/mist/api/monitoring/tasks.py @@ -2,7 +2,8 @@ import time import logging -from mist.api.celery_app import app + +from mist.api.dramatiq_app import dramatiq import mist.api.shell @@ -25,7 +26,7 @@ ] -@app.task(soft_time_limit=480, time_limit=600) +@dramatiq.actor(time_limit=600_000, max_retries=1, queue_name='scripts') def install_telegraf(machine_id, job=None, job_id=None, plugins=None): """Deploy Telegraf over SSH.""" machine = Machine.objects.get(id=machine_id) @@ -98,7 +99,7 @@ def install_telegraf(machine_id, job=None, job_id=None, plugins=None): trigger_session_update(machine.owner, ['monitoring']) -@app.task(soft_time_limit=480, time_limit=600) +@dramatiq.actor(time_limit=600_000, max_retries=1, queue_name='scripts') def uninstall_telegraf(machine_id, job=None, job_id=None): """Undeploy Telegraf.""" machine = Machine.objects.get(id=machine_id) @@ -140,7 
+141,7 @@ def uninstall_telegraf(machine_id, job=None, job_id=None): trigger_session_update(machine.owner, ['monitoring']) -@app.task +@dramatiq.actor(time_limit=60_000, max_retries=1) def reset_traefik_config(): try: _get_config() diff --git a/src/mist/api/monitoring/victoriametrics/methods.py b/src/mist/api/monitoring/victoriametrics/methods.py index f408ac80f..4dee44c13 100644 --- a/src/mist/api/monitoring/victoriametrics/methods.py +++ b/src/mist/api/monitoring/victoriametrics/methods.py @@ -85,7 +85,7 @@ def get_stats(machine, start="", stop="", step="", metrics=None): istatus.activated_at = time.time() istatus.state = 'succeeded' machine.save() - add_nodata_rule.delay(machine.owner.id, 'victoriametrics') + add_nodata_rule.send(machine.owner.id, 'victoriametrics') trigger_session_update(machine.owner, ['monitoring']) break diff --git a/src/mist/api/poller/models.py b/src/mist/api/poller/models.py index 64bc3651e..2f5003acf 100644 --- a/src/mist/api/poller/models.py +++ b/src/mist/api/poller/models.py @@ -1,8 +1,6 @@ import logging import datetime -import celery - import mongoengine as me from mist.api import config @@ -53,13 +51,6 @@ class PollingSchedule(ShardedScheduleMixin, me.Document): # the `clean` method. name = me.StringField(unique=True) - # The following fields are defined in celerybeatmongo.models.PeriodicTask. - # Here, we define no fields in the base class, and expect subclasses to - # either define their fields, or simply use properties. - # task = me.StringField(required=True) - # args = me.ListField() - # kwargs = me.DictField() - # Scheduling information. Don't edit them directly, just use the model # methods. 
default_interval = me.EmbeddedDocumentField( @@ -127,14 +118,6 @@ def interval(self): interval = i return interval - @property - def schedule(self): - """Return a celery schedule instance - - This is used internally by celerybeatmongo scheduler - """ - return celery.schedules.schedule(self.interval.timedelta) - @property def expires(self): return None @@ -188,6 +171,10 @@ class OwnerPollingSchedule(PollingSchedule): owner = me.ReferenceField('Organization', reverse_delete_rule=me.CASCADE) + @property + def org(self): + return self.owner + @classmethod def add(cls, owner, run_immediately=True, interval=None, ttl=300): try: diff --git a/src/mist/api/poller/schedulers.py b/src/mist/api/poller/schedulers.py deleted file mode 100644 index e1ed3b48f..000000000 --- a/src/mist/api/poller/schedulers.py +++ /dev/null @@ -1,42 +0,0 @@ -from celerybeatmongo.schedulers import MongoScheduler - -from mist.api.sharding.mixins import ShardManagerMixin - -from mist.api.poller.models import PollingSchedule -from mist.api.poller.models import OwnerPollingSchedule -from mist.api.poller.models import CloudPollingSchedule -from mist.api.poller.models import MachinePollingSchedule - -import datetime - - -class PollingScheduler(MongoScheduler): - Model = PollingSchedule - UPDATE_INTERVAL = datetime.timedelta(seconds=20) - - -class OwnerPollingScheduler(MongoScheduler): - Model = OwnerPollingSchedule - UPDATE_INTERVAL = datetime.timedelta(seconds=20) - - -class CloudPollingScheduler(MongoScheduler): - Model = CloudPollingSchedule - UPDATE_INTERVAL = datetime.timedelta(seconds=20) - - -class MachinePollingScheduler(MongoScheduler): - Model = MachinePollingSchedule - UPDATE_INTERVAL = datetime.timedelta(seconds=20) - - -class ShardedOwnerScheduler(ShardManagerMixin, OwnerPollingScheduler): - pass - - -class ShardedCloudScheduler(ShardManagerMixin, CloudPollingScheduler): - pass - - -class ShardedMachineScheduler(ShardManagerMixin, MachinePollingScheduler): - pass diff --git 
a/src/mist/api/poller/tasks.py b/src/mist/api/poller/tasks.py index bbf1fdf17..d0f0378e8 100644 --- a/src/mist/api/poller/tasks.py +++ b/src/mist/api/poller/tasks.py @@ -1,7 +1,7 @@ import logging import datetime -from mist.api.celery_app import app +from mist.api.dramatiq_app import dramatiq from mist.api.methods import notify_user @@ -34,7 +34,7 @@ def autodisable_cloud(cloud): notify_user(cloud.owner, title=cloud, message=message, email_notify=True) -@app.task +@dramatiq.actor def debug(schedule_id): # FIXME: Resolve circular imports from mist.api.poller.models import DebugPollingSchedule @@ -46,7 +46,7 @@ def debug(schedule_id): fobj.write(msg) -@app.task(time_limit=280, soft_time_limit=255) +@dramatiq.actor(queue_name='polling', time_limit=280_000, max_age=30_000) def list_machines(schedule_id): """Perform list machines. Cloud controller stores results in mongodb.""" @@ -60,7 +60,7 @@ def list_machines(schedule_id): pass -@app.task(time_limit=160, soft_time_limit=155) +@dramatiq.actor(queue_name='polling', time_limit=160_000, max_age=30_000) def list_locations(schedule_id): """Perform list locations. Cloud controller stores results in mongodb.""" @@ -69,7 +69,7 @@ def list_locations(schedule_id): sched.cloud.ctl.compute.list_locations(persist=False) -@app.task(time_limit=60, soft_time_limit=55) +@dramatiq.actor(queue_name='polling', time_limit=60_000, max_age=30_000) def list_sizes(schedule_id): """Perform list sizes. Cloud controller stores results in mongodb.""" @@ -78,7 +78,7 @@ def list_sizes(schedule_id): sched.cloud.ctl.compute.list_sizes(persist=False) -@app.task(time_limit=60, soft_time_limit=55) +@dramatiq.actor(queue_name='polling', time_limit=60_000, max_age=30_000) def list_images(schedule_id): """Perform list images. 
Cloud controller stores results in mongodb.""" @@ -87,7 +87,7 @@ def list_images(schedule_id): sched.cloud.ctl.compute.list_images(persist=False) -@app.task(time_limit=60, soft_time_limit=55) +@dramatiq.actor(queue_name='polling', time_limit=60_000, max_age=30_000) def list_networks(schedule_id): """Perform list networks and subnets (inside list_networks). Cloud controller stores results in mongodb.""" @@ -97,7 +97,7 @@ def list_networks(schedule_id): sched.cloud.ctl.network.list_networks(persist=False) -@app.task(time_limit=60, soft_time_limit=55) +@dramatiq.actor(queue_name='polling', time_limit=60_000, max_age=30_000) def list_zones(schedule_id): """Perform list zones and records. Cloud controller stores results in mongodb. @@ -108,7 +108,7 @@ def list_zones(schedule_id): sched.cloud.ctl.dns.list_zones(persist=False) -@app.task(time_limit=60, soft_time_limit=55) +@dramatiq.actor(queue_name='polling', time_limit=60_000, max_age=30_000) def list_volumes(schedule_id): """Perform list volumes. Cloud controller stores results in mongodb.""" @@ -117,7 +117,7 @@ def list_volumes(schedule_id): sched.cloud.ctl.storage.list_volumes(persist=False) -@app.task(time_limit=60, soft_time_limit=55) +@dramatiq.actor(queue_name='polling', time_limit=60_000, max_age=300_000) def list_buckets(schedule_id): """ Perform list buckets. 
@@ -129,7 +129,7 @@ def list_buckets(schedule_id): sched.cloud.ctl.objectstorage.list_buckets(persist=False) -@app.task(time_limit=45, soft_time_limit=40) +@dramatiq.actor(queue_name='ping_probe', time_limit=45_000, max_age=30_000) def ping_probe(schedule_id): """Perform ping probe""" @@ -145,7 +145,7 @@ def ping_probe(schedule_id): log.error("Error while ping-probing %s: %r", sched.machine, exc) -@app.task(time_limit=45, soft_time_limit=40) +@dramatiq.actor(queue_name='ssh_probe', time_limit=45_000, max_age=30_000) def ssh_probe(schedule_id): """Perform ssh probe""" diff --git a/src/mist/api/portal/schedulers.py b/src/mist/api/portal/schedulers.py deleted file mode 100644 index be2ad64c8..000000000 --- a/src/mist/api/portal/schedulers.py +++ /dev/null @@ -1,20 +0,0 @@ -import datetime - -from celery.beat import PersistentScheduler, ScheduleEntry - - -class RunImmediatelyScheduleEntry(ScheduleEntry): - def _default_now(self): - """This function returns the default value for `last_run_at` - - We return a very old date to cause schedule entries to run immediately - if no `last_run_at` value is present. 
- - """ - if not self.total_run_count: - return datetime.datetime(2000, 1, 1) - return super(RunImmediatelyScheduleEntry, self)._default_now() - - -class RunImmediatelyPersistentScheduler(PersistentScheduler): - Entry = RunImmediatelyScheduleEntry diff --git a/src/mist/api/portal/tasks.py b/src/mist/api/portal/tasks.py index 633417079..b46a29109 100644 --- a/src/mist/api/portal/tasks.py +++ b/src/mist/api/portal/tasks.py @@ -2,7 +2,7 @@ import requests -from mist.api.celery_app import app +from mist.api.dramatiq_app import dramatiq from mist.api import config from mist.api.portal.models import Portal, AvailableUpgrade @@ -32,7 +32,7 @@ def get_version_params(portal=None): return params -@app.task +@dramatiq.actor def check_new_versions(url="https://mist.io/api/v1/version-check"): portal = Portal.get_singleton() params = get_version_params(portal) @@ -64,7 +64,7 @@ def get_usage_params(portal=None): return params -@app.task +@dramatiq.actor def usage_survey(url="https://mist.io/api/v1/usage-survey"): portal = Portal.get_singleton() params = get_usage_params(portal) diff --git a/src/mist/api/rules/methods.py b/src/mist/api/rules/methods.py index 55e33d71a..80757b3b6 100644 --- a/src/mist/api/rules/methods.py +++ b/src/mist/api/rules/methods.py @@ -11,7 +11,7 @@ from mist.api.rules.models import Rule from mist.api.rules.models import NoDataAction from mist.api.rules.models import NotificationAction -from mist.api.dramatiq_tasks import actors +from mist.api.rules.tasks import run_action_by_id log = logging.getLogger(__name__) @@ -57,7 +57,7 @@ def run_chained_actions(rule_id, incident_id, resource_id, resource_type, if not (triggered and triggered_now): action = rule.actions[0] if isinstance(action, NotificationAction): - actors['run_action_by_id'].send( + run_action_by_id.send( rule_id, incident_id, action.id, resource_id, resource_type, value, triggered, timestamp, ) @@ -65,8 +65,8 @@ def run_chained_actions(rule_id, incident_id, resource_id, resource_type, # Get a 
list of task signatures for every task, excluding the first one. tasks = [] - for action in rule.actions[1:]: - task = actors['run_action_by_id'].message( + for action in rule.actions: + task = run_action_by_id.message( rule_id, incident_id, action.id, resource_id, resource_type, value, triggered, timestamp, ) @@ -76,7 +76,6 @@ def run_chained_actions(rule_id, incident_id, resource_id, resource_type, # Buffer no-data alerts so that we can decide on false-positives. if isinstance(rule.actions[0], NoDataAction): delay = config.NO_DATA_ALERT_BUFFER_PERIOD * 1000 - + from mist.api.dramatiq_app import dramatiq # Apply all tasks in parallel - from dramatiq import group - group(tasks).run(delay=delay) + dramatiq.group(tasks).run(delay=delay) diff --git a/src/mist/api/rules/models/main.py b/src/mist/api/rules/models/main.py index 76b11ab38..428caa0f2 100644 --- a/src/mist/api/rules/models/main.py +++ b/src/mist/api/rules/models/main.py @@ -1,5 +1,4 @@ import uuid -import celery import mongoengine as me from mist.api import config @@ -98,11 +97,10 @@ class Rule(me.Document): # Disable the rule organization-wide. disabled = me.BooleanField(default=False) - # Fields passed to celerybeat as optional arguments. + # Fields passed to scheduler as optional arguments. queue = me.StringField() exchange = me.StringField() routing_key = me.StringField() - soft_time_limit = me.IntField() # Fields updated by the scheduler. last_run_at = me.DateTimeField() @@ -110,7 +108,7 @@ class Rule(me.Document): total_run_count = me.IntField(min_value=0, default=0) total_check_count = me.IntField(min_value=0, default=0) - # Field updated by celery workers. This is where celery workers keep state. + # Field updated by dramatiq workers. This is where workers keep state. 
states = me.MapField(field=me.EmbeddedDocumentField(RuleState)) meta = { @@ -193,6 +191,13 @@ def owner(self): """ return Organization.objects.get(id=self.owner_id) + @property + def org(self): + """Return the Organization (instance) owning self. + + """ + return self.owner + @property def plugin(self): """Return the instance of a backend plugin. @@ -206,19 +211,16 @@ def plugin(self): @property def name(self): - """Return the name of the celery task. - - This must be globally unique, since celerybeat-mongo uses schedule - names as keys of the dictionary of schedules to run. + """Return the name of the task. """ return 'Org(%s):Rule(%s)' % (self.owner_id, self.id) @property def task(self): - """Return the celery task to run. + """Return the dramatiq task to run. - This is the most basic celery task that should be used for most rule + This is the most basic dramatiq task that should be used for most rule evaluations. However, subclasses may provide their own property or class attribute based on their needs. @@ -227,12 +229,12 @@ class attribute based on their needs. @property def args(self): - """Return the args of the celery task.""" + """Return the args of the dramatiq task.""" return (self.id, ) @property def kwargs(self): - """Return the kwargs of the celery task.""" + """Return the kwargs of the dramatiq task.""" return {} @property @@ -242,22 +244,13 @@ def expires(self): @property def enabled(self): - """Return True if the celery task is currently enabled. + """Return True if the dramatiq task is currently enabled. Subclasses MAY override or extend this property. """ return not self.disabled - @property - def schedule(self): - """Return a celery schedule instance. - - Used internally by the scheduler. Subclasses MUST NOT override this. - - """ - return celery.schedules.schedule(self.frequency.timedelta) - def is_arbitrary(self): """Return True if self is arbitrary. 
diff --git a/src/mist/api/rules/schedulers.py b/src/mist/api/rules/schedulers.py deleted file mode 100644 index 796d4cfab..000000000 --- a/src/mist/api/rules/schedulers.py +++ /dev/null @@ -1,10 +0,0 @@ -from celerybeatmongo.schedulers import MongoScheduler - -from mist.api.rules.models import Rule - -import datetime - - -class RuleScheduler(MongoScheduler): - Model = Rule - UPDATE_INTERVAL = datetime.timedelta(seconds=20) diff --git a/src/mist/api/rules/tasks.py b/src/mist/api/rules/tasks.py index 94f3599eb..bf3713c50 100644 --- a/src/mist/api/rules/tasks.py +++ b/src/mist/api/rules/tasks.py @@ -1,6 +1,6 @@ import logging -from mist.api.celery_app import app +from mist.api.dramatiq_app import dramatiq from mist.api.helpers import get_resource_model from mist.api.helpers import rtype_to_classpath @@ -25,14 +25,14 @@ ] -@app.task +@dramatiq.actor def evaluate(rule_id): """Perform a full rule evaluation.""" rule = Rule.objects.get(id=rule_id) rule.ctl.evaluate(update_state=True, trigger_actions=True) -@app.task +@dramatiq.actor def add_nodata_rule(owner_id, backend='graphite'): """Idempotently setup a NoDataRule for the given Organization.""" try: @@ -42,8 +42,8 @@ def add_nodata_rule(owner_id, backend='graphite'): NoDataRule(owner_id=owner_id).ctl.auto_setup(backend=backend) -@app.task(bind=True, default_retry_delay=5, max_retries=3) -def run_action_by_id(self, rule_id, incident_id, action_id, +@dramatiq.actor(max_retries=3, max_age=60_000, queue_name='rules') +def run_action_by_id(rule_id, incident_id, action_id, resource_id, resource_type, value, triggered, timestamp): """Run a Rule's action asynchronously. @@ -76,16 +76,10 @@ def run_action_by_id(self, rule_id, incident_id, action_id, action.run(resource, value, triggered, timestamp, incident_id) except (ServiceUnavailableError, CloudUnavailableError) as err: # Catch errors due to SSH connectivity issues and the cloud provider's - # API being unresponsive. Log the failure if there are no more retries. 
- if self.request.retries >= self.max_retries: - _log_alert(resource, rule, value, triggered, - timestamp, incident_id, error=str(err)) - # Retry task with a linear back-off to minimize the chances of hitting - # the same error again. - countdown = (self.default_retry_delay * (self.request.retries + 1)) - # After max_retries have been exceeded, this will re-raise the original - # exception. - self.retry(exc=err, countdown=countdown) + # API being unresponsive. Log the failure even if it will be retried + _log_alert(resource, rule, value, triggered, + timestamp, incident_id, error=str(err)) + raise except MachineUnauthorizedError as err: # Catch exception, log it, and re-raise to improve auditing. Re-raising # the exception is important in order to stop the chain's execution. diff --git a/src/mist/api/scheduler.py b/src/mist/api/scheduler.py index 79d01efa9..6985681f1 100644 --- a/src/mist/api/scheduler.py +++ b/src/mist/api/scheduler.py @@ -1,24 +1,22 @@ +import datetime import logging import importlib import pytz from time import sleep -import dramatiq - from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger from apscheduler.jobstores.base import JobLookupError from mist.api import config -from mist.api.dramatiq_app import broker from mist.api.models import Schedule from mist.api.poller.models import PollingSchedule from mist.api.rules.models import Rule log = logging.getLogger(__name__) -RELOAD_INTERVAL = 10 +RELOAD_INTERVAL = 5 def schedule_to_actor(schedule): @@ -28,17 +26,10 @@ def schedule_to_actor(schedule): task_path = schedule.task_type.task.split('.') method = task_path[-1] module = '.'.join(task_path[:-1]) - task = getattr(importlib.import_module(module), method) - return dramatiq.actor( - task, - queue_name="dramatiq_schedules", - time_limit=30 * 60 * 1000, # 30 minutes - max_retries=None, - broker=broker - ) + return getattr(importlib.import_module(module), method) -def add_job(scheduler, 
schedule, actor): +def add_job(scheduler, schedule, actor, first_run=False): job = { 'id': str(schedule.id), 'name': schedule.name, @@ -81,7 +72,11 @@ def add_job(scheduler, schedule, actor): ) else: log.error('Invalid task type: %s' % schedule.task_type._cls) - scheduler.add_job(actor.send, **job) + + new_job = scheduler.add_job(actor.send, **job) + if not first_run and schedule.run_immediately: + new_job.modify(next_run_time=datetime.datetime.now()) + return new_job def update_job(scheduler, schedule, actor, existing): @@ -99,8 +94,11 @@ def update_job(scheduler, schedule, actor, existing): else: if schedule.schedule_type.type == 'interval' and interval: # Update interval + delta = datetime.timedelta(**{ + schedule.schedule_type.period: schedule.schedule_type.every + }) if interval.total_seconds() != \ - schedule.schedule_type.schedule.seconds: + delta.total_seconds(): changes[schedule.schedule_type['period']] = \ schedule.schedule_type['every'] elif schedule.schedule_type.type == 'crontab': @@ -153,20 +151,13 @@ def load_config_schedules(scheduler): task_path = config._schedule[sched]['task'].split('.') method = task_path[-1] module = '.'.join(task_path[:-1]) - task = getattr(importlib.import_module(module), method) - actor = dramatiq.actor( - task, - queue_name="dramatiq_schedules", - time_limit=5 * 60 * 1000, # 5 minutes - max_retries=None, - broker=broker - ) + actor = getattr(importlib.import_module(module), method) interval = config._schedule[sched]['schedule'].total_seconds() scheduler.add_job( actor.send, trigger='interval', seconds=interval, name=sched) -def load_schedules_from_db(scheduler, schedules): +def load_schedules_from_db(scheduler, schedules, first_run=False): """ Load schedules from db """ old_schedule_ids = [] new_schedule_ids = [] @@ -181,7 +172,7 @@ def load_schedules_from_db(scheduler, schedules): if existing: # Update existing job update_job(scheduler, schedule, actor, existing) else: # Add new job - add_job(scheduler, schedule, actor) + 
add_job(scheduler, schedule, actor, first_run=first_run) # Cleanup deleted schedules for sid in old_schedule_ids: @@ -193,23 +184,46 @@ def load_schedules_from_db(scheduler, schedules): old_schedule_ids = new_schedule_ids -def start(): +def start(**kwargs): + if not kwargs.keys(): + kwargs['builtin'] = True + kwargs['user'] = True + kwargs['polling'] = True + kwargs['rules'] = True + # Init scheduler scheduler = BackgroundScheduler() # Load schedules from config - load_config_schedules(scheduler) + if kwargs.get('builtin'): + load_config_schedules(scheduler) try: # Start scheduler scheduler.start() + first_run = True while True: # Start main loop - log.info('Reloading schedules') - load_schedules_from_db(scheduler, Schedule.objects(deleted=False)) - log.info('Reloading polling schedules') - load_schedules_from_db(scheduler, PollingSchedule.objects()) - log.info('Reloading rules') - load_schedules_from_db(scheduler, Rule.objects()) + if kwargs.get('user'): + log.info('Reloading user schedules') + load_schedules_from_db( + scheduler, + Schedule.objects(deleted=False), + first_run=first_run + ) + if kwargs.get('polling'): + log.info('Reloading polling schedules') + load_schedules_from_db( + scheduler, + PollingSchedule.objects(), + first_run=first_run + ) + if kwargs.get('rules'): + log.info('Reloading rules') + load_schedules_from_db( + scheduler, + Rule.objects() + ) sleep(RELOAD_INTERVAL) + first_run = False except KeyboardInterrupt: import ipdb ipdb.set_trace() diff --git a/src/mist/api/schedules/models.py b/src/mist/api/schedules/models.py index c24176734..62c569c42 100644 --- a/src/mist/api/schedules/models.py +++ b/src/mist/api/schedules/models.py @@ -2,12 +2,11 @@ import datetime import logging from uuid import uuid4 -import celery.schedules + import mongoengine as me from mist.api.tag.models import Tag from mist.api.exceptions import BadRequestError from mist.api.users.models import Organization -from celerybeatmongo.schedulers import MongoScheduler from 
mist.api.exceptions import ScheduleNameExistsError from mist.api.exceptions import RequiredParameterMissingError from mist.api.selectors.models import SelectorClassMixin @@ -38,11 +37,6 @@ class Interval(BaseScheduleType): every = me.IntField(min_value=0, default=0, required=True) period = me.StringField(choices=PERIODS) - @property - def schedule(self): - return celery.schedules.schedule( - datetime.timedelta(**{self.period: self.every})) - @property def period_singular(self): return self.period[:-1] @@ -91,14 +85,6 @@ class Crontab(BaseScheduleType): day_of_month = me.StringField(default='*', required=True) month_of_year = me.StringField(default='*', required=True) - @property - def schedule(self): - return celery.schedules.crontab(minute=self.minute, - hour=self.hour, - day_of_week=self.day_of_week, - day_of_month=self.day_of_month, - month_of_year=self.month_of_year) - def __unicode__(self): def rfield(x): @@ -240,7 +226,6 @@ class Schedule(OwnershipMixin, me.Document, SelectorClassMixin): queue = me.StringField() exchange = me.StringField() routing_key = me.StringField() - soft_time_limit = me.IntField() # mist specific fields schedule_type = me.EmbeddedDocumentField(BaseScheduleType, required=True) @@ -370,27 +355,12 @@ def validate(self, clean=True): """ if isinstance(self.schedule_type, Crontab): - cronj_entry = self.schedule_type.as_dict() try: - for k, v in list(cronj_entry.items()): - if k == 'minute': - celery.schedules.crontab_parser(60).parse(v) - elif k == 'hour': - celery.schedules.crontab_parser(24).parse(v) - elif k == 'day_of_week': - celery.schedules.crontab_parser(7).parse(v) - elif k == 'day_of_month': - celery.schedules.crontab_parser(31, 1).parse(v) - elif k == 'month_of_year': - celery.schedules.crontab_parser(12, 1).parse(v) - else: - raise me.ValidationError( - 'You should provide valid period of time') - except celery.schedules.ParseException: - raise me.ValidationError('Crontab entry is not valid') - except Exception as exc: - raise 
me.ValidationError('Crontab entry is not valid:%s' - % str(exc)) + from apscheduler.triggers.cron import CronTrigger + CronTrigger.from_crontab(self.schedule_type.as_cron()) + except ValueError as exc: + raise me.ValidationError('Crontab validation failed: %s' % exc) + super(Schedule, self).validate(clean=True) def clean(self): @@ -435,26 +405,3 @@ def as_dict(self): } return sdict - - -class NonDeletedSchedule(object): - # NOTE This wrapper class is used by the UserScheduler. It allows to trick - # the scheduler by providing an interface similar to that of a mongoengine - # Document subclass in order to prevent schedules marked as deleted from - # being loaded. Similarly, we could have used a custom QuerySet manager to - # achieve this. However, subclasses of mongoengine models, which are not a - # direct subclass of the main `Document` class, do not fetch the documents - # of the corresponding superclass. In that case, we'd have to override the - # QuerySet class in a more exotic way, but there is no such need for now. 
- @classmethod - def objects(cls): - return Schedule.objects(deleted=None) - - @classmethod - def _get_collection(cls): - return Schedule._get_collection() - - -class UserScheduler(MongoScheduler): - Model = NonDeletedSchedule - UPDATE_INTERVAL = datetime.timedelta(seconds=20) diff --git a/src/mist/api/scripts/views.py b/src/mist/api/scripts/views.py index 179478d00..248913b24 100644 --- a/src/mist/api/scripts/views.py +++ b/src/mist/api/scripts/views.py @@ -462,11 +462,11 @@ def run_script(request): ) key_id = KeyMachineAssociation.objects(id=key_association_id)[0].key.id - tasks.run_script.delay(auth_context.owner.id, script.id, - machine.id, params=script_params, - env=env, su=su, job_id=job_id, job=job, - key_id=key_id, host=host, - username=username, port=port) + tasks.run_script.send(auth_context.owner.id, script.id, + machine.id, params=script_params, + env=env, su=su, job_id=job_id, job=job, + key_id=key_id, host=host, + username=username, port=port) return {'job_id': job_id, 'job': job} diff --git a/src/mist/api/sock.py b/src/mist/api/sock.py index 397244e96..908650794 100644 --- a/src/mist/api/sock.py +++ b/src/mist/api/sock.py @@ -334,7 +334,7 @@ def periodic_update_poller(self): def update_poller(self): """Increase polling frequency for all clouds""" - tasks.update_poller.delay(self.owner.id) + tasks.update_poller.send(self.owner.id) def update_user(self): self.send('user', get_user_data(self.auth_context)) diff --git a/src/mist/api/tasks.py b/src/mist/api/tasks.py index e7b7aec46..f1769f862 100644 --- a/src/mist/api/tasks.py +++ b/src/mist/api/tasks.py @@ -3,27 +3,31 @@ import uuid import logging import datetime +import secrets + import mongoengine as me from time import time, sleep import paramiko +from dramatiq.errors import Retry + from libcloud.compute.types import NodeState from libcloud.container.base import Container -from celery.exceptions import SoftTimeLimitExceeded - from paramiko.ssh_exception import SSHException from mist.api.exceptions 
import MistError, PolicyUnauthorizedError from mist.api.exceptions import ServiceUnavailableError +from mist.api.exceptions import MachineNotFoundError +from mist.api.exceptions import MachineUnavailableError + from mist.api.shell import Shell from mist.api.users.models import Owner, Organization from mist.api.clouds.models import Cloud, DockerCloud, CloudLocation, CloudSize from mist.api.networks.models import Network -from mist.api.dns.models import Zone from mist.api.volumes.models import Volume from mist.api.machines.models import Machine, KeyMachineAssociation from mist.api.images.models import CloudImage @@ -31,7 +35,7 @@ from mist.api.scripts.models import Script from mist.api.schedules.models import Schedule from mist.api.dns.models import RECORDS -from mist.api.keys.models import SSHKey +from mist.api.keys.models import SSHKey, Key from mist.api.tag.methods import add_tags_to_resource from mist.api.rules.models import NoDataRule @@ -57,9 +61,11 @@ from mist.api.logs.methods import log_event -from mist.api import config +from mist.api.tag.methods import resolve_id_and_set_tags -from mist.api.celery_app import app +from mist.api.dramatiq_app import dramatiq + +from mist.api import config logging.basicConfig(level=config.PY_LOG_LEVEL, @@ -88,7 +94,7 @@ ] -@app.task +@dramatiq.actor def ssh_command(owner_id, cloud_id, machine_id, host, command, key_id=None, username=None, password=None, port=22): @@ -104,8 +110,8 @@ def ssh_command(owner_id, cloud_id, machine_id, host, command, (machine_id, host), output) -@app.task(bind=True, default_retry_delay=3 * 60) -def post_deploy_steps(self, owner_id, cloud_id, machine_id, monitoring, +@dramatiq.actor(queue_name='provisioning', store_results=True) +def post_deploy_steps(owner_id, cloud_id, machine_id, monitoring, key_id=None, username=None, password=None, port=22, script_id='', script_params='', job_id=None, job=None, hostname='', plugins=None, script='', @@ -114,7 +120,6 @@ def post_deploy_steps(self, owner_id, 
cloud_id, machine_id, monitoring, # TODO: break into subtasks from mist.api.methods import connect_provider, probe_ssh_only from mist.api.methods import notify_user, notify_admin - from mist.api.keys.models import Key from mist.api.monitoring.methods import enable_monitoring job_id = job_id or uuid.uuid4().hex @@ -142,8 +147,9 @@ def tmp_log(msg, *args): msg = "Cloud:\n Name: %s\n Id: %s\n" % (cloud.title, cloud_id) msg += "Machine:\n Name: %s\n Id: %s\n" % (node.name, node.id) tmp_log('Machine found, proceeding to post deploy steps\n%s' % msg) - except: - raise self.retry(exc=Exception(), countdown=10, max_retries=10) + except Exception as e: + log.error('%r' % e) + raise e if node and isinstance(node, Container): node = cloud.ctl.compute.inspect_node(node) @@ -153,21 +159,21 @@ def tmp_log(msg, *args): ips = [ip for ip in node.public_ips + node.private_ips if ':' not in ip] if not ips: - raise self.retry(exc=Exception(), countdown=60, max_retries=20) + raise host = ips[0] else: tmp_log('ip not found, retrying') - raise self.retry(exc=Exception(), countdown=60, max_retries=20) + raise if node.state != NodeState.RUNNING: tmp_log('not running state') - raise self.retry(exc=Exception(), countdown=120, max_retries=30) + raise try: machine = Machine.objects.get(cloud=cloud, machine_id=machine_id, state__ne='terminated') except Machine.DoesNotExist: - raise self.retry(countdown=60, max_retries=60) + raise log_dict = { 'owner_id': owner.id, @@ -355,7 +361,7 @@ def tmp_log(msg, *args): except (ServiceUnavailableError, SSHException) as exc: tmp_log(repr(exc)) - raise self.retry(exc=exc, countdown=60, max_retries=15) + raise except Exception as exc: tmp_log(repr(exc)) if str(exc).startswith('Retry'): @@ -378,8 +384,8 @@ def tmp_log(msg, *args): ) -@app.task(bind=True, default_retry_delay=2 * 60) -def openstack_post_create_steps(self, owner_id, cloud_id, machine_id, +@dramatiq.actor(queue_name='provisioning', store_results=True) +def openstack_post_create_steps(owner_id, 
cloud_id, machine_id, monitoring, key_id, username, password, public_key, script='', script_id='', script_params='', job_id=None, @@ -401,7 +407,7 @@ def openstack_post_create_steps(self, owner_id, cloud_id, machine_id, break if node and node.state == 0 and len(node.public_ips): - post_deploy_steps.delay( + post_deploy_steps.send( owner.id, cloud_id, machine_id, monitoring, key_id, script=script, script_id=script_id, script_params=script_params, job_id=job_id, job=job, @@ -444,7 +450,7 @@ def openstack_post_create_steps(self, owner_id, cloud_id, machine_id, ext_net_id = networks['public'][0]['network_id'] conn.ex_create_floating_ip(ext_net_id, machine_port_id) - post_deploy_steps.delay( + post_deploy_steps.send( owner.id, cloud_id, machine_id, monitoring, key_id, script=script, script_id=script_id, script_params=script_params, @@ -454,14 +460,14 @@ def openstack_post_create_steps(self, owner_id, cloud_id, machine_id, ) except: - raise self.retry(exc=Exception(), max_retries=20) + raise except Exception as exc: if str(exc).startswith('Retry'): raise -@app.task(bind=True, default_retry_delay=2 * 60) -def azure_post_create_steps(self, owner_id, cloud_id, machine_id, monitoring, +@dramatiq.actor(queue_name='provisioning', store_results=True) +def azure_post_create_steps(owner_id, cloud_id, machine_id, monitoring, key_id, username, password, public_key, script='', script_id='', script_params='', job_id=None, job=None, hostname='', plugins=None, @@ -485,7 +491,7 @@ def azure_post_create_steps(self, owner_id, cloud_id, machine_id, monitoring, ips = [ip for ip in node.public_ips if ':' not in ip] host = ips[0] else: - raise self.retry(exc=Exception(), max_retries=20) + raise try: # login with user, password. 
Deploy the public key, enable sudo @@ -524,7 +530,7 @@ def azure_post_create_steps(self, owner_id, cloud_id, machine_id, monitoring, ssh.close() - post_deploy_steps.delay( + post_deploy_steps.send( owner.id, cloud_id, machine_id, monitoring, key_id, script=script, script_id=script_id, script_params=script_params, @@ -534,15 +540,15 @@ def azure_post_create_steps(self, owner_id, cloud_id, machine_id, monitoring, ) except Exception as exc: - raise self.retry(exc=exc, countdown=10, max_retries=15) + raise except Exception as exc: if str(exc).startswith('Retry'): raise -@app.task(bind=True, default_retry_delay=2 * 60) +@dramatiq.actor(queue_name='provisioning', store_results=True) def rackspace_first_gen_post_create_steps( - self, owner_id, cloud_id, machine_id, monitoring, key_id, password, + owner_id, cloud_id, machine_id, monitoring, key_id, password, public_key, username='root', script='', script_id='', script_params='', job_id=None, job=None, hostname='', plugins=None, post_script_id='', post_script_params='', schedule={}): @@ -565,7 +571,7 @@ def rackspace_first_gen_post_create_steps( ips = [ip for ip in node.public_ips if ':' not in ip] host = ips[0] else: - raise self.retry(exc=Exception(), max_retries=20) + raise try: # login with user, password and deploy the ssh public key. 
@@ -588,7 +594,7 @@ def rackspace_first_gen_post_create_steps( ssh.close() - post_deploy_steps.delay( + post_deploy_steps.send( owner.id, cloud_id, machine_id, monitoring, key_id, script=script, script_id=script_id, script_params=script_params, @@ -598,13 +604,12 @@ def rackspace_first_gen_post_create_steps( ) except Exception as exc: - raise self.retry(exc=exc, countdown=10, max_retries=15) - except Exception as exc: - if str(exc).startswith('Retry'): raise + except Exception as exc: + raise -@app.task +@dramatiq.actor(queue_name='provisioning', store_results=True) def clone_machine_async(auth_context_serialized, machine_id, name, job=None, job_id=None): from mist.api.exceptions import MachineCreationError @@ -677,7 +682,7 @@ def clone_machine_async(auth_context_serialized, machine_id, name, print('clone_machine_async: results: {}'.format(node)) -@app.task +@dramatiq.actor(queue_name='provisioning', store_results=True) def create_machine_async( auth_context_serialized, cloud_id, key_id, machine_name, location_id, image_id, size, image_extra, disk, @@ -788,17 +793,17 @@ def create_machine_wrapper(args_kwargs): print('create_machine_async: results: {}'.format(real_results)) -@app.task(bind=True, default_retry_delay=5, max_retries=3) -def send_email(self, subject, body, recipients, sender=None, bcc=None, +@dramatiq.actor(max_retries=3) +def send_email(subject, body, recipients, sender=None, bcc=None, html_body=None): if not helper_send_email(subject, body, recipients, sender=sender, bcc=bcc, attempts=1, html_body=html_body): - raise self.retry() + raise return True -@app.task +@dramatiq.actor(store_results=True) def group_machines_actions(owner_id, action, name, machines_uuids): """ Accepts a list of lists in form cloud_id,machine_id and pass them @@ -829,7 +834,7 @@ def group_machines_actions(owner_id, action, name, machines_uuids): } log_event(action='schedule_started', **log_dict) log.info('Schedule action started: %s', log_dict) - + tasks = [] for machine_uuid in 
machines_uuids: found = False _action = action @@ -849,27 +854,36 @@ def group_machines_actions(owner_id, action, name, machines_uuids): _action = 'stop' try: - run_machine_action.s(owner_id, _action, name, - machine_uuid)() + task = run_machine_action.message(owner_id, _action, name, + machine_uuid) + tasks.append(task) except Exception as exc: log_dict['error'] = '%s %r\n' % (log_dict.get('error', ''), exc) - - log_dict.update({'last_run_at': str(schedule.last_run_at or ''), - 'total_run_count': schedule.total_run_count or 0, - 'error': log_dict['error']} - ) + # Apply all tasks in parallel + from dramatiq import group + g = group(tasks).run() + g.wait(timeout=3600_000) + log_dict.update({ + 'last_run_at': str(schedule.last_run_at or ''), + 'total_run_count': schedule.total_run_count or 0, + 'error': log_dict['error'] + }) log_event(action='schedule_finished', **log_dict) if log_dict['error']: log.info('Schedule action failed: %s', log_dict) else: log.info('Schedule action succeeded: %s', log_dict) + + schedule.total_run_count += 1 + schedule.save() + owner = Owner.objects.get(id=owner_id) trigger_session_update(owner, ['schedules']) return log_dict -@app.task(soft_time_limit=3600, time_limit=3630) +@dramatiq.actor(time_limit=3_600_000) def run_machine_action(owner_id, action, name, machine_uuid): """ Calls specific action for a machine and log the info @@ -1012,7 +1026,7 @@ def run_machine_action(owner_id, action, name, machine_uuid): ) -@app.task +@dramatiq.actor(store_results=True) def group_run_script(owner_id, script_id, name, machines_uuids, params=''): """ Accepts a list of lists in form cloud_id,machine_id and pass them @@ -1046,15 +1060,19 @@ def group_run_script(owner_id, script_id, name, machines_uuids, params=''): log_event(action='schedule_started', **log_dict) log.info('Schedule started: %s', log_dict) - + tasks = [] for machine_uuid in machines_uuids: try: - run_script.s(owner_id, script_id, machine_uuid, - params=params, - job_id=job_id, 
job='schedule')() + task = run_script.message(owner_id, script_id, machine_uuid, + params=params, job_id=job_id, + job='schedule') + tasks.append(task) except Exception as exc: - log_dict['error'] = log_dict.get('error', '') + str(exc) + '\n' - + log_dict['error'] = "%s %r\n" % (log_dict.get('error', ''), exc) + # Apply all tasks in parallel + from dramatiq import group + g = group(tasks).run() + g.wait(timeout=3_600_000) log_dict.update({'last_run_at': str(schedule.last_run_at or ''), 'total_run_count': schedule.total_run_count or 0, 'error': log_dict['error']} @@ -1064,12 +1082,16 @@ def group_run_script(owner_id, script_id, name, machines_uuids, params=''): log.info('Schedule run_script failed: %s', log_dict) else: log.info('Schedule run_script succeeded: %s', log_dict) + + schedule.total_run_count += 1 + schedule.save() + owner = Owner.objects.get(id=owner_id) trigger_session_update(owner, ['schedules']) return log_dict -@app.task(soft_time_limit=3600, time_limit=3630) +@dramatiq.actor(time_limit=3_600_000, store_results=True) def run_script(owner, script_id, machine_uuid, params='', host='', key_id='', username='', password='', port=22, job_id='', job='', action_prefix='', su=False, env=""): @@ -1165,27 +1187,15 @@ def run_script(owner, script_id, machine_uuid, params='', host='', shell, params=params, job_id=ret.get('job_id') ) - with open(os.path.join( - os.path.dirname(os.path.dirname(os.path.dirname( - os.path.dirname(os.path.abspath(__file__))))), - 'run_script', 'run.py' - )) as fobj: - wscript = fobj.read() - - # check whether python exists + command = "chmod +x %s && %s %s" % (path, path, params) - exit_code, wstdout = shell.command("command -v python") - - if exit_code > 0: - command = "chmod +x %s && %s %s" % (path, path, params) - else: - command = "python - %s << EOF\n%s\nEOF\n" % (wparams, wscript) if su: command = "sudo sh -c '%s'" % command ret['command'] = command except Exception as exc: ret['error'] = str(exc) log_event(event_type='job', 
action=action_prefix + 'script_started', **ret) + ret.pop('command') log.info('Script started: %s', ret) if not ret['error']: try: @@ -1209,29 +1219,13 @@ def run_script(owner, script_id, machine_uuid, params='', host='', exit_code, wstdout = shell.command(command) shell.disconnect() wstdout = wstdout.replace('\r\n', '\n').replace('\r', '\n') - ret['wrapper_stdout'] = wstdout ret['exit_code'] = exit_code ret['stdout'] = wstdout - try: - parts = re.findall( - r'-----part-([^-]*)-([^-]*)-----\n(.*?)-----part-end-\2-----\n', # noqa - wstdout, re.DOTALL) - if parts: - randid = parts[0][1] - for part in parts: - if part[1] != randid: - raise Exception('Different rand ids') - for part in parts: - if part[0] == 'script': - ret['stdout'] = part[2] - elif part[0] == 'outfile': - ret['extra_output'] = part[2] - except Exception as exc: - pass if exit_code > 0: ret['error'] = 'Script exited with return code %s' % exit_code - except SoftTimeLimitExceeded: - ret['error'] = 'Script execution time limit exceeded' + # TODO: Fix for dramatiq + # except SoftTimeLimitExceeded: + # ret['error'] = 'Script execution time limit exceeded' except Exception as exc: ret['error'] = str(exc) log_event(event_type='job', action=action_prefix + 'script_finished', @@ -1263,7 +1257,7 @@ def run_script(owner, script_id, machine_uuid, params='', host='', return ret -@app.task +@dramatiq.actor def update_poller(org_id): org = Organization.objects.get(id=org_id) update_threshold = datetime.datetime.now() - datetime.timedelta( @@ -1305,9 +1299,9 @@ def update_poller(org_id): org.save() -@app.task +@dramatiq.actor def gc_schedulers(): - """Delete disabled celerybeat schedules. + """Delete disabled schedules. This takes care of: @@ -1317,9 +1311,7 @@ def gc_schedulers(): 3. Removing inactive no-data rules. They are added idempotently the first time get_stats receives data for a newly monitored machine. - Note that this task does not run GC on user-defined schedules. 
The - UserScheduler has its own mechanism for choosing which documents to - load. + Note that this task does not run GC on user-defined schedules. """ for collection in (PollingSchedule, NoDataRule, ): @@ -1334,16 +1326,16 @@ def gc_schedulers(): log.error(exc) -@app.task +@dramatiq.actor def set_missing_since(cloud_id): for Model in (Machine, CloudLocation, CloudSize, CloudImage, - Network, Volume, Bucket, Zone): + Network, Volume, Bucket): Model.objects(cloud=cloud_id, missing_since=None).update( missing_since=datetime.datetime.utcnow() ) -@app.task +@dramatiq.actor def delete_periodic_tasks(cloud_id): from mist.api.concurrency.models import PeriodicTaskInfo for section in ['machines', 'volumes', 'networks', 'zones', 'buckets']: @@ -1355,7 +1347,7 @@ def delete_periodic_tasks(cloud_id): pass -@app.task +@dramatiq.actor def create_backup(): """Create mongo backup if s3 creds are set. """ @@ -1405,7 +1397,7 @@ def create_backup(): config.BACKUP['bucket'], portal_host, dt)) -@app.task +@dramatiq.actor def async_session_update(owner, sections=None): if sections is None: sections = [ @@ -1413,3 +1405,413 @@ def async_session_update(owner, sections=None): 'scripts', 'schedules', 'templates', 'monitoring' ] trigger_session_update(owner, sections) + + +def tmp_log_error(msg, *args): + log.error("Post deploy: %s" % msg, *args) + + +def tmp_log(msg, *args): + log.info("Post deploy: %s" % msg, *args) + + +@dramatiq.actor(queue_name="provisioning", max_retries=0) +def multicreate_async_v2( + auth_context_serialized, plan, job_id=None, job=None +): + job_id = job_id or uuid.uuid4().hex + auth_context = AuthContext.deserialize(auth_context_serialized) + log_event(auth_context.owner.id, 'job', 'async_machine_creation_started', + user_id=auth_context.user.id, job_id=job_id, job=job, + **plan) + + messages = [] + name = plan['machine_name'] + quantity = plan['quantity'] + + if quantity == 1: + messages.append(create_machine_async_v2.message( + auth_context_serialized, plan, 
job_id, job)) + else: + for _ in range(quantity): + temp_plan = plan.copy() + temp_plan['machine_name'] = name + '-' + secrets.token_hex(5) + messages.append(create_machine_async_v2.message( + auth_context_serialized, temp_plan, job_id, job)) + + dramatiq.group(messages).run() + + +@dramatiq.actor(queue_name="provisioning", max_retries=0) +def create_machine_async_v2( + auth_context_serialized, plan, job_id=None, job=None +): + job_id = job_id or uuid.uuid4().hex + auth_context = AuthContext.deserialize(auth_context_serialized) + cloud = Cloud.objects.get(id=plan["cloud"]["id"]) + + log_event( + auth_context.owner.id, 'job', 'sending_create_machine_request', + job=job, job_id=job_id, cloud_id=plan['cloud']['id'], + machine_name=plan['machine_name'], user_id=auth_context.user.id,) + + try: + node = cloud.ctl.compute.create_machine(plan) + except Exception as exc: + error = f"Machine creation failed with exception: {str(exc)}" + tmp_log_error(error) + log_event( + auth_context.owner.id, 'job', 'machine_creation_finished', + job=job, job_id=job_id, cloud_id=plan['cloud']['id'], + machine_name=plan['machine_name'], user_id=auth_context.user.id, + error=error + ) + raise + + tmp_log('Overriding default polling interval') + schedule = ListMachinesPollingSchedule.objects.get( + cloud=plan['cloud']['id']) + schedule.add_interval(10, ttl=600) + schedule.save() + + for i in range(1, 11): + try: + machine = Machine.objects.get(cloud=cloud, machine_id=node.id) + break + except me.DoesNotExist: + sleep(i * 10) + else: + error = f"Machine with external_id: {node.id} was not found" + tmp_log_error(error) + log_event( + auth_context.owner.id, 'job', 'machine_creation_finished', + job=job, job_id=job_id, cloud_id=plan['cloud']['id'], + machine_name=plan['machine_name'], external_id=node.id, + user_id=auth_context.user.id, error=error + ) + raise MachineNotFoundError + + machine.assign_to(auth_context.user) + + if plan.get('expiration'): + try: + 
add_expiration_for_machine(auth_context, plan['expiration'], + machine) + except Exception as exc: + tmp_log_error('Got exception %s while adding expiration' + % str(exc)) + # Associate key. + if plan.get('key'): + try: + key = Key.objects.get(id=plan["key"]["id"]) + username = (plan['key'].get('user') or + plan.get('user') or + node.extra.get("username", "")) + # TODO port could be something else + machine.ctl.associate_key( + key, username=username, port=22, no_connect=True + ) + except Exception as exc: + tmp_log_error('Got exception %s in key association' + % str(exc)) + + if plan.get('tags'): + resolve_id_and_set_tags(auth_context.owner, 'machine', node.id, + plan['tags'], cloud_id=cloud.id) + + machine = Machine.objects.get(cloud=cloud, machine_id=node.id) + + # first_run is set to True becase poller has already + # logged an observation event for this machine + # and we don't want to send it again. + cloud.ctl.compute.produce_and_publish_patch({}, + [machine], + first_run=True + ) + + log_event( + auth_context.owner.id, 'job', 'machine_creation_finished', + job=job, job_id=job_id, cloud_id=plan['cloud']['id'], + machine_name=plan['machine_name'], external_id=node.id, + user_id=auth_context.user.id + ) + + post_deploy_v2.send(auth_context_serialized, cloud.id, machine.id, + node.id, plan, job_id=job_id, job=job) + + +@dramatiq.actor(queue_name="provisioning", + throws=(me.DoesNotExist, MachineUnavailableError)) +def post_deploy_v2(auth_context_serialized, cloud_id, machine_id, external_id, + plan, job_id=None, job=None): + + auth_context = AuthContext.deserialize(auth_context_serialized) + job_id = job_id or uuid.uuid4().hex + + tmp_log("Entering post deploy steps for %s %s %s", + auth_context.owner.id, cloud_id, machine_id) + + try: + cloud = Cloud.objects.get(owner=auth_context.owner, id=cloud_id, + deleted=None) + except Cloud.DoesNotExist: + tmp_log_error("Cloud %s not found. 
Exiting", cloud_id) + raise me.DoesNotExist from None + + try: + machine = Machine.objects.get(cloud=cloud, machine_id=external_id) + except Machine.DoesNotExist: + tmp_log_error("Machine %s not found.Exiting", machine_id) + raise me.DoesNotExist from None + + msg = "Cloud:\n Name: %s\n Id: %s\n" % (cloud.title, cloud_id) + msg += "Machine:\n Name: %s\n Id: %s\n" % (machine.name, machine.id) + tmp_log("Machine found, proceeding to post deploy steps\n%s" % msg) + + if machine.state == 'terminated': + tmp_log_error("Machine %s terminated. Exiting", machine_id) + raise MachineUnavailableError + elif machine.state != 'running': + tmp_log_error("not running state") + raise Retry(delay=60000) + + ips = [ + ip for ip in machine.public_ips + machine.private_ips if ":" not in ip + ] + try: + host = ips[0] + except IndexError: + tmp_log_error("ip not found, retrying") + raise Retry(delay=60000) from None + + log_dict = { + "owner_id": auth_context.owner.id, + "event_type": "job", + "cloud_id": cloud_id, + "machine_id": machine_id, + "external_id": external_id, + "job_id": job_id, + "job": job, + "host": host, + "key_id": plan.get("key", {}).get("id"), + } + + add_schedules(auth_context, machine, log_dict, plan.get("schedules")) + + add_dns_record(auth_context, host, log_dict, plan.get("fqdn")) + + ssh_tasks.send(auth_context_serialized, cloud_id, + plan.get("key", {}).get("id"), host, external_id, + machine.name, machine_id, plan.get('scripts'), log_dict, + monitoring=plan.get('monitoring', False), plugins=None, + job_id=job_id, username=None, password=None, port=22) + + +@dramatiq.actor(queue_name="provisioning") +def ssh_tasks(auth_context_serialized, cloud_id, key_id, host, external_id, + machine_name, machine_id, scripts, log_dict, monitoring=False, + plugins=None, job_id=None, username=None, password=None, + port=22): + from mist.api.methods import notify_user, notify_admin + from mist.api.monitoring.methods import enable_monitoring + auth_context = 
AuthContext.deserialize(auth_context_serialized) + try: + shell = Shell(host) + cloud_post_deploy(auth_context, cloud_id, shell, key_id, external_id, + machine_name, username=username, password=password, + port=port) + create_key_association(auth_context, shell, cloud_id, key_id, + machine_id, host, log_dict, username=username, + password=password, port=port) + run_scripts(auth_context, shell, scripts, cloud_id, host, machine_id, + machine_name, log_dict, job_id) + shell.disconnect() + except (ServiceUnavailableError, SSHException) as exc: + tmp_log_error(repr(exc)) + raise Retry(delay=60000) + + if monitoring: + try: + enable_monitoring(auth_context.owner, cloud_id, external_id, + no_ssh=False, dry=False, job_id=job_id, + plugins=plugins, deploy_async=False) + except Exception as e: + print(repr(e)) + notify_user( + auth_context.owner, + "Enable monitoring failed for machine %s" % machine_id, + repr(e) + ) + notify_admin('Enable monitoring on creation failed for ' + 'user %s machine %s: %r' + % (str(auth_context.owner), machine_id, e)) + log_event(action='enable_monitoring_failed', error=repr(e), + **log_dict) + log_event(action='post_deploy_finished', error=False, **log_dict) + + +def add_expiration_for_machine(auth_context, expiration, machine): + if expiration.get('notify'): + # convert notify value from datetime str to seconds + notify = datetime.datetime.strptime(expiration['date'], + '%Y-%m-%d %H:%M:%S') \ + - datetime.datetime.strptime(expiration['notify'], + '%Y-%m-%d %H:%M:%S') + expiration['notify'] = int(notify.total_seconds()) + params = { + "schedule_type": "one_off", + "description": "Scheduled to run when machine expires", + "schedule_entry": expiration.get("date"), + "action": expiration.get("action"), + "selectors": [{"type": "machines", "ids": [machine.id]}], + "task_enabled": True, + "notify": expiration.get("notify", ""), + "notify_msg": expiration.get("notify_msg", ""), + } + name = (f'{machine.name}-expiration-{machine.machine_id[:4]}' + 
f'-{secrets.token_hex(3)}') + machine.expiration = Schedule.add(auth_context, name, **params) + machine.save() + + +def add_schedules(auth_context, machine, log_dict, schedules): + from mist.api.methods import notify_user + schedules = schedules or [] + for schedule in schedules: + type_ = schedule.get('action') or 'script' + try: + name = (f'{machine.name}-{type_}-' + f'{machine.machine_id[:4]}-{secrets.token_hex(3)}') + tmp_log("Add scheduler entry %s", name) + schedule["selectors"] = [{"type": "machines", + "ids": [machine.id]}] + schedule_info = Schedule.add(auth_context, name, **schedule) + tmp_log("A new scheduler was added") + log_event( + action="add_schedule_entry", + scheduler=schedule_info.as_dict(), + **log_dict + ) + except Exception as e: + tmp_log_error("Exception occured %s", repr(e)) + error = repr(e) + notify_user( + auth_context.owner, + "Add scheduler entry failed for machine %s" + % machine.machine_id, + repr(e), + error=error, + ) + log_event( + action="add_schedule_entry", error=error, + **log_dict + ) + + +def add_dns_record(auth_context, host, log_dict, fqdn): + if fqdn: + kwargs = {} + try: + kwargs["name"] = fqdn + kwargs["type"] = "A" + kwargs["data"] = host + kwargs["ttl"] = 3600 + + dns_cls = RECORDS[kwargs["type"]] + dns_cls.add(owner=auth_context.owner, **kwargs) + log_event(action="create_A_record", hostname=fqdn, **log_dict) + tmp_log("Added A Record, fqdn: %s IP: %s", fqdn, host) + except Exception as exc: + log_event(action="create_A_record", hostname=fqdn, + error=str(exc), **log_dict) + + +def cloud_post_deploy(auth_context, cloud_id, shell, key_id, external_id, + machine_name, username=None, password=None, port=22): + from mist.api.methods import notify_admin + try: + cloud_post_deploy_steps = config.CLOUD_POST_DEPLOY.get( + cloud_id, []) + except AttributeError: + cloud_post_deploy_steps = [] + for post_deploy_step in cloud_post_deploy_steps: + predeployed_key_id = post_deploy_step.get('key') + if predeployed_key_id and 
key_id: + # Use predeployed key to deploy the user selected key + shell.autoconfigure( + auth_context.owner, cloud_id, external_id, + predeployed_key_id, + username, password, port + ) + retval, output = shell.command( + 'echo %s >> ~/.ssh/authorized_keys' + % Key.objects.get(id=key_id).public) + if retval > 0: + notify_admin('Deploy user key failed for machine %s' + % machine_name) + command = post_deploy_step.get('script', '').replace( + '${node.name}', machine_name) + if command and key_id: + tmp_log('Executing cloud post deploy cmd: %s' % command) + shell.autoconfigure( + auth_context.owner, cloud_id, machine_name, + key_id, username, password, port + ) + retval, output = shell.command(command) + if retval > 0: + notify_admin('Cloud post deploy command `%s` failed ' + 'for machine %s' % (command, machine_name)) + + +def create_key_association(auth_context, shell, cloud_id, key_id, machine_id, + host, log_dict, username=None, password=None, + port=22): + from mist.api.methods import probe_ssh_only + if key_id: + # connect with ssh even if no command, to create association + # to be able to enable monitoring + tmp_log('attempting to connect to shell') + key_id, ssh_user = shell.autoconfigure( + auth_context.owner, cloud_id, machine_id, key_id, username, + password, port + ) + tmp_log('connected to shell') + result = probe_ssh_only(auth_context.owner, cloud_id, machine_id, + host=None, key_id=key_id, + ssh_user=ssh_user, shell=shell) + + log_dict['ssh_user'] = ssh_user + log_event(action='probe', result=result, **log_dict) + + +def run_scripts(auth_context, shell, scripts, cloud_id, host, machine_id, + machine_name, log_dict, job_id): + from mist.api.methods import notify_user + scripts = scripts or [] + for script in scripts: + if script.get('id'): + tmp_log('will run script_id %s', script['id']) + params = script.get('params', '') + ret = run_script.run( + auth_context.owner, script['id'], machine_id, + params=params, host=host, job_id=job_id + ) + 
tmp_log('executed script_id %s', script['id']) + elif script.get('inline'): + tmp_log('will run inline script') + log_event(action='script_started', command=script, + **log_dict) + start_time = time() + retval, output = shell.command(script['inline']) + tmp_log('executed script') + execution_time = time() - start_time + title = "Deployment script %s" % ('failed' if retval + else 'succeeded') + notify_user(auth_context.owner, title, cloud_id=cloud_id, + machine_id=machine_id, machine_name=machine_name, + command=script, output=output, duration=execution_time, + retval=retval, error=retval > 0) + log_event(action='script_finished', + error=retval > 0, return_value=retval, + command=script, stdout=output, + **log_dict) diff --git a/src/mist/api/views.py b/src/mist/api/views.py index 313311143..abfa1731c 100755 --- a/src/mist/api/views.py +++ b/src/mist/api/views.py @@ -618,8 +618,7 @@ def register(request): # if user requested a demo then notify the mist.api team subject = "Demo request" body = "User %s has requested a demo\n" % user.email - tasks.send_email.delay(subject, body, - config.NOTIFICATION_EMAIL['demo']) + tasks.send_email.send(subject, body, config.NOTIFICATION_EMAIL['demo']) user.requested_demo = True user.demo_request_date = time() user.save() @@ -636,8 +635,7 @@ def register(request): subject = "Private beta request" body = "User %s <%s> has requested access to the private beta\n" % ( params.get('name').encode('utf-8', 'ignore'), email) - tasks.send_email.delay(subject, body, - config.NOTIFICATION_EMAIL['demo']) + tasks.send_email.send(subject, body, config.NOTIFICATION_EMAIL['demo']) msg = ( "Dear %s, we will contact you within 24 hours with more " @@ -2151,7 +2149,7 @@ def invite_member_to_team(request): trigger_session_update(auth_context.owner, ['org']) return return_val - tasks.send_email.delay(subject, body, user.email) + tasks.send_email.send(subject, body, user.email) ret.append(return_val) trigger_session_update(auth_context.owner, ['org']) @@ 
-2227,7 +2225,7 @@ def delete_member_from_team(request): except me.OperationError: raise TeamOperationError() # notify user that his invitation has been revoked - tasks.send_email.delay(subject, body, user.email) + tasks.send_email.send(subject, body, user.email) else: try: invitation.save() @@ -2277,7 +2275,7 @@ def delete_member_from_team(request): raise TeamOperationError() if user != auth_context.user: - tasks.send_email.delay(subject, body, user.email) + tasks.send_email.send(subject, body, user.email) trigger_session_update(auth_context.owner, ['org']) diff --git a/src/mist/api/volumes/views.py b/src/mist/api/volumes/views.py index 10e45dc1f..dac5d3dd3 100644 --- a/src/mist/api/volumes/views.py +++ b/src/mist/api/volumes/views.py @@ -18,7 +18,7 @@ from mist.api.exceptions import RequiredParameterMissingError from mist.api.exceptions import CloudUnauthorizedError, CloudUnavailableError -from mist.api.dramatiq_tasks import dramatiq_async_session_update +from mist.api.tasks import async_session_update from mist.api.helpers import params_from_request, view_config from mist.api.helpers import trigger_session_update @@ -165,7 +165,7 @@ def create_volume(request): if config.HAS_RBAC: owner.mapper.update( volume, - callback=dramatiq_async_session_update, + callback=async_session_update, args=(owner.id, ['volumes'], ) ) diff --git a/v2 b/v2 index 2e91633e9..d1d2d031f 160000 --- a/v2 +++ b/v2 @@ -1 +1 @@ -Subproject commit 2e91633e9c2f3e827cdc5cd04a4d807293c255f4 +Subproject commit d1d2d031f709c9c803e72909b0774bfcf3da2a48