Skip to content

Commit

Permalink
Feature/worker improvements (#278)
Browse files Browse the repository at this point in the history
* worker: self post model_settings
* Add fmcalc version info
* Update schemas
* Switch enable summaries report flag to disable
* Rename `KTOOLS_BATCH_COUNT` to `KTOOLS_NUM_PROCESSES`
* Keep /tmp/ clean in docker build
* Store release tag, github hash, publish date in docker images
* Fix tasks test
* Return Platform version from connecting workers
* Fix version tag
* Log worker component versions at run
  • Loading branch information
sambles authored Dec 9, 2019
1 parent 97e2627 commit 129fd82
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 26 deletions.
5 changes: 3 additions & 2 deletions Dockerfile.api_server
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ ENV OASIS_DEBUG=false
RUN mkdir -p /var/log/oasis

RUN apt-get update && apt-get install -y --no-install-recommends vim libmariadbclient-dev-compat libspatialindex-dev && rm -rf /var/lib/apt/lists/*
COPY ./requirements.txt /tmp/
RUN pip install -r /tmp/requirements.txt
COPY ./requirements.txt ./
RUN pip install -r ./requirements.txt
RUN pip install mysqlclient

# Copy startup script + server config
Expand All @@ -30,6 +30,7 @@ COPY ./src/ ./src
COPY ./src/server /var/www/oasis/src/server
COPY ./src/common /var/www/oasis/src/common
COPY ./src/conf /var/www/oasis/src/conf
COPY ./VERSION /var/www/oasis/VERSION

COPY ./model_resource.json /var/www/oasis/src/server/static/model_resource.json
RUN OASIS_API_SECRET_KEY=supersecret python manage.py collectstatic --noinput
Expand Down
5 changes: 3 additions & 2 deletions Dockerfile.model_worker
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ RUN adduser --shell /bin/bash --disabled-password --gecos "" worker
WORKDIR /home/worker

# Install requirements
COPY ./requirements-worker.in /tmp/requirements.txt
RUN pip install -r /tmp/requirements.txt
COPY ./requirements-worker.in ./requirements.txt
RUN pip install -r ./requirements.txt

# Copy startup script + server config
COPY ./src/startup_worker.sh ./startup.sh
Expand All @@ -23,6 +23,7 @@ COPY ./src/model_execution_worker/ ./src/model_execution_worker/
COPY ./src/utils/ ./src/utils/
COPY ./src/utils/worker_bashrc /root/.bashrc
COPY ./tests/integration /home/worker/tests/integration
COPY ./VERSION ./

RUN mkdir -p /var/oasis && \
mkdir -p /var/log/oasis && \
Expand Down
1 change: 1 addition & 0 deletions VERSION
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
develop
5 changes: 3 additions & 2 deletions conf.ini
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
[default]
MODEL_DATA_DIRECTORY = /var/oasis
MODEL_SETTINGS_FILE = /var/oasis/meta-data/model_settings.json
LOG_LEVEL = DEBUG
LOG_MAX_SIZE_IN_BYTES=10000
LOG_BACKUP_COUNT = 5
LOG_DIRECTORY = '/var/log/oasis'
KTOOLS_ERROR_GUARD = True
KTOOLS_BATCH_COUNT = -1
KTOOLS_NUM_PROCESSES = -1
KTOOLS_ALLOC_RULE_GUL = 1
KTOOLS_ALLOC_RULE_IL = 2
KTOOLS_ALLOC_RULE_RI = 3
Expand All @@ -30,6 +31,6 @@ TOKEN_REFRESH_ROTATE = True
OASISLMF_CONFIG = /var/oasis/oasislmf.json
DEBUG_MODE = False
KEEP_RUN_DIR = False
WRITE_EXPOSURE_SUMMARY = True
DISABLE_EXPOSURE_SUMMARY = False

[celery]
5 changes: 5 additions & 0 deletions jenkins/oasis_platform.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ node {
}
}
}
stage('Set version file'){
dir(oasis_workspace){
sh "echo ${env.TAG_RELEASE} - " + '$(git rev-parse --short HEAD), $(date) > VERSION'
}
}
parallel(
build_oasis_api_server: {
stage('Build: API server') {
Expand Down
62 changes: 54 additions & 8 deletions src/model_execution_worker/tasks.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
from __future__ import absolute_import

import glob
import json
import logging
import os
import shutil
import tarfile
import uuid
from contextlib import contextmanager, suppress
import subprocess

import fasteners
import tempfile

from contextlib import contextmanager, suppress

from celery import Celery, signature
from celery.task import task
from celery.signals import worker_ready

from oasislmf.cli.model import GenerateOasisFilesCmd, GenerateLossesCmd
from oasislmf.utils.status import OASIS_TASK_STATUS
from oasislmf.utils.exceptions import OasisException
from oasislmf.utils.log import oasis_log
from oasislmf.utils.status import OASIS_TASK_STATUS
from oasislmf import __version__ as mdk_version
from pathlib2 import Path

from ..conf import celeryconf as celery_conf
Expand All @@ -38,13 +42,15 @@

logging.info("Started worker")
logging.info("MODEL_DATA_DIRECTORY: {}".format(settings.get('worker', 'MODEL_DATA_DIRECTORY')))
logging.info("MODEL_SETTINGS_FILE: {}".format(settings.get('worker', 'MODEL_SETTINGS_FILE')))
logging.info("KTOOLS_ERROR_GUARD: {}".format(settings.get('worker', 'KTOOLS_ERROR_GUARD')))
logging.info("KTOOLS_BATCH_COUNT: {}".format(settings.get('worker', 'KTOOLS_BATCH_COUNT')))
logging.info("KTOOLS_NUM_PROCESSES: {}".format(settings.get('worker', 'KTOOLS_NUM_PROCESSES')))
logging.info("KTOOLS_ALLOC_RULE_GUL: {}".format(settings.get('worker', 'KTOOLS_ALLOC_RULE_GUL')))
logging.info("KTOOLS_ALLOC_RULE_IL: {}".format(settings.get('worker', 'KTOOLS_ALLOC_RULE_IL')))
logging.info("KTOOLS_ALLOC_RULE_RI: {}".format(settings.get('worker', 'KTOOLS_ALLOC_RULE_RI')))
logging.info("DEBUG_MODE: {}".format(settings.get('worker', 'DEBUG_MODE', fallback=False)))
logging.info("KEEP_RUN_DIR: {}".format(settings.get('worker', 'KEEP_RUN_DIR', fallback=False)))
logging.info("DISABLE_EXPOSURE_SUMMARY: {}".format(settings.get('worker', 'DISABLE_EXPOSURE_SUMMARY', fallback=False)))
logging.info("LOCK_RETRY_COUNTDOWN_IN_SECS: {}".format(settings.get('worker', 'LOCK_RETRY_COUNTDOWN_IN_SECS')))
logging.info("MEDIA_ROOT: {}".format(settings.get('worker', 'MEDIA_ROOT')))

Expand All @@ -63,17 +69,54 @@ def __exit__(self, exc_type, exc_value, traceback):
if not self.persist and os.path.isdir(self.name):
shutil.rmtree(self.name)

def get_model_settings():
    """ Read the model settings file named by the worker setting
        MODEL_SETTINGS_FILE and return its contents as a python dict.

        Returns None when the setting is not configured, when the path is
        not an existing file, or when the file cannot be read or parsed
        (the failure is logged rather than raised).
    """
    settings_fp = settings.get('worker', 'MODEL_SETTINGS_FILE', fallback=None)

    # No path configured: os.path.isfile(None) would raise TypeError, so bail
    # out early and report "not found" the same way as a missing file.
    if not settings_fp:
        return None

    if not os.path.isfile(settings_fp):
        return None

    try:
        with open(settings_fp) as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        # Python 3 exceptions have no `.message` attribute, so format the
        # exception object itself.
        logging.error("Failed to load Model settings: {}".format(e))

    return None



def get_worker_versions():
    """ Collect and return the versions of the Oasis components on this worker.

        Returns a dict with a single key, "worker_versions", mapping to:
          * "oasislmf": the imported oasislmf package version (mdk_version)
          * "ktools":   the text printed by running `fmcalc -v`
          * "platform": the stripped contents of /home/worker/VERSION,
                        or an empty string when that file does not exist
    """
    # subprocess.getoutput never raises on a failing command; it returns
    # whatever the shell printed (stdout and stderr combined).
    ktool_ver_str = subprocess.getoutput('fmcalc -v')
    plat_ver_file = '/home/worker/VERSION'

    if os.path.isfile(plat_ver_file):
        with open(plat_ver_file, 'r') as f:
            plat_ver_str = f.read().strip()
    else:
        plat_ver_str = ""

    # The key must be spelled "worker_versions" to match the property of the
    # same name declared in the model_settings schema.
    return {"worker_versions": {
        "oasislmf": mdk_version,
        "ktools": ktool_ver_str,
        "platform": plat_ver_str
    }}


# When a worker connects send a task to the worker-monitor to register a new model
# Handler connected to the celery `worker_ready` signal.
# It reads the model identity from the environment variables
# OASIS_MODEL_SUPPLIER_ID, OASIS_MODEL_ID and OASIS_MODEL_VERSION_ID, gathers
# the model settings (get_model_settings) and component versions
# (get_worker_versions), logs the identity, and queues the task named
# 'run_register_worker' on the 'celery' queue.
@worker_ready.connect
def register_worker(sender, **k):
m_supplier = os.environ.get('OASIS_MODEL_SUPPLIER_ID')
m_name = os.environ.get('OASIS_MODEL_ID')
m_id = os.environ.get('OASIS_MODEL_VERSION_ID')
# May be None: get_model_settings() starts from None and only replaces it on a successful load.
m_settings = get_model_settings()
m_version = get_worker_versions()
logging.info('register_worker: SUPPLIER_ID={}, MODEL_ID={}, VERSION_ID={}'.format(m_supplier, m_name, m_id))
# NOTE(review): two `args=` lines follow because this text is a rendered diff;
# presumably the first is the removed line and the second its replacement
# (it matches the five-parameter run_register_worker signature) - confirm.
signature(
'run_register_worker',
args=(m_supplier, m_name, m_id),
args=(m_supplier, m_name, m_id, m_settings, m_version),
queue='celery'
).delay()

Expand Down Expand Up @@ -161,7 +204,7 @@ def start_analysis_task(self, input_location, analysis_settings_file, complex_da
try:
logging.info("MEDIA_ROOT: {}".format(settings.get('worker', 'MEDIA_ROOT')))
logging.info("MODEL_DATA_DIRECTORY: {}".format(settings.get('worker', 'MODEL_DATA_DIRECTORY')))
logging.info("KTOOLS_BATCH_COUNT: {}".format(settings.get('worker', 'KTOOLS_BATCH_COUNT')))
logging.info("KTOOLS_NUM_PROCESSES: {}".format(settings.get('worker', 'KTOOLS_NUM_PROCESSES')))

self.update_state(state=RUNNING_TASK_STATUS)
output_location = start_analysis(
Expand Down Expand Up @@ -192,6 +235,8 @@ def start_analysis(analysis_settings_file, input_location, complex_data_files=No
"""
# Check that the input archive exists and is valid
logging.info("args: {}".format(str(locals())))
logging.info(str(get_worker_versions()))

media_root = settings.get('worker', 'MEDIA_ROOT')
input_archive = os.path.join(media_root, input_location)

Expand Down Expand Up @@ -221,7 +266,7 @@ def start_analysis(analysis_settings_file, input_location, complex_data_files=No
'--config', config_path,
'--model-run-dir', run_dir,
'--analysis-settings-json', analysis_settings_file,
'--ktools-num-processes', settings.get('worker', 'KTOOLS_BATCH_COUNT'),
'--ktools-num-processes', settings.get('worker', 'KTOOLS_NUM_PROCESSES'),
'--ktools-alloc-rule-gul', settings.get('worker', 'KTOOLS_ALLOC_RULE_GUL'),
'--ktools-alloc-rule-il', settings.get('worker', 'KTOOLS_ALLOC_RULE_IL'),
'--ktools-alloc-rule-ri', settings.get('worker', 'KTOOLS_ALLOC_RULE_RI'),
Expand Down Expand Up @@ -283,6 +328,7 @@ def generate_input(loc_file,
"""
logging.info("args: {}".format(str(locals())))
logging.info(str(get_worker_versions()))

media_root = settings.get('worker', 'MEDIA_ROOT')
location_file = os.path.join(media_root, loc_file)
Expand Down Expand Up @@ -314,8 +360,8 @@ def generate_input(loc_file,
if complex_data_files:
prepare_complex_model_file_inputs(complex_data_files, media_root, input_data_dir)
run_args += ['--user-data-dir', input_data_dir]
if settings.getboolean('worker', 'WRITE_EXPOSURE_SUMMARY', fallback=True):
run_args.append('--summarise-exposure')
if settings.getboolean('worker', 'DISABLE_EXPOSURE_SUMMARY', fallback=False):
run_args.append('--disable-summarise-exposure')

# Log MDK generate command
args_list = run_args + [''] if (len(run_args) % 2) else run_args
Expand Down
39 changes: 31 additions & 8 deletions src/server/oasisapi/analyses/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,57 @@

import uuid

from django.utils import timezone
from celery.utils.log import get_task_logger
from django.contrib.auth import get_user_model
from django.core.exceptions import ObjectDoesNotExist
from django.core.files import File
from django.http import HttpRequest
from django.utils import timezone
from six import StringIO

from src.server.oasisapi.files.models import RelatedFile
from src.server.oasisapi.files.views import handle_json_data
from src.server.oasisapi.schemas.serializers import ModelSettingsSerializer

from ..celery import celery_app
logger = get_task_logger(__name__)


# Celery task 'run_register_worker': registers the model served by a worker.
# NOTE(review): this text is a rendered diff, so removed and added lines appear
# together (two `def` lines, two creation paths, two final log calls);
# presumably the later line of each pair is the current code - confirm.
#
# As shown, the five-parameter version looks up an AnalysisModel by
# (model_id, supplier_id, version_id), creates it with the 'admin' user as
# creator when it does not exist, and, when m_settings is truthy, passes the
# merged m_settings + m_version dict to handle_json_data for the model's
# 'resource_file' using ModelSettingsSerializer.
@celery_app.task(name='run_register_worker')
def run_register_worker(m_supplier, m_name, m_id):
def run_register_worker(m_supplier, m_name, m_id, m_settings, m_version):
logger.info('model_supplier: {}, model_name: {}, model_id: {}'.format(m_supplier, m_name, m_id))
try:
from django.contrib.auth.models import User
from src.server.oasisapi.analysis_models.models import AnalysisModel
user = User.objects.get(username='admin')
new_model = AnalysisModel.objects.create(model_id=m_name,
supplier_id=m_supplier,
version_id=m_id,
creator=user)

# Reuse an existing model record when one matches; create it otherwise.
try:
model = AnalysisModel.objects.get(
model_id=m_name,
supplier_id=m_supplier,
version_id=m_id
)
except ObjectDoesNotExist:
user = User.objects.get(username='admin')
model = AnalysisModel.objects.create(
model_id=m_name,
supplier_id=m_supplier,
version_id=m_id,
creator=user
)

if m_settings:
logger.info('Updating model settings')
# A bare HttpRequest is built by hand so handle_json_data can be called
# outside of a real HTTP request/response cycle.
request = HttpRequest()
# Keys from m_version override same-named keys from m_settings.
request.data = {**m_settings, **m_version}
request.method = 'post'
request.user = model.creator
handle_json_data(model, 'resource_file', request, ModelSettingsSerializer)


# Log unhandled exceptions
except Exception as e:
logger.exception(str(e))
# NOTE(review): if the failure happened before `model` was assigned (for
# example the 'admin' user lookup raising), referencing it below would itself
# raise inside this handler - confirm and guard.
logger.exception(new_model)
logger.exception(model)


@celery_app.task(name='run_analysis_success')
Expand Down
13 changes: 10 additions & 3 deletions src/server/oasisapi/schemas/analysis_settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
"type": "integer",
"multipleOf": 1,
"title": "Summary ID",
"description": "Identifier for the summary set."
"description": "Identifier for the summary set.",
"minimum": 1,
"maximum": 9
},
"oed_fields": {
"type": "array",
Expand Down Expand Up @@ -213,8 +215,13 @@
"title": "Reinsurance net loss summary outputs",
"description": "Specified which outputs should be generated for which summary sets, for reinsurance net losses.",
"$ref": "#/definitions/output_summaries"
},
"full_correlation": {
"type": "boolean",
"title": "Produce fully correlated output",
"description": "If true generate losses for fully correlated output, i.e. no independence between groups, in addition to losses for default output.",
"default": false
}

},
"required": [
"source_tag",
Expand All @@ -226,5 +233,5 @@
"model_settings",
"gul_output",
"gul_summaries"
]
]
}
24 changes: 24 additions & 0 deletions src/server/oasisapi/schemas/model_settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,30 @@
}
}
}
},
"worker_versions":{
"type": "object",
"uniqueItems": false,
"title": "Worker component versions",
"description": "OasisLMF versions deployed in the model runner",
"properties":{
"oasislmf": {
"type": "string",
"title": "oasislmf package",
"description": "MDK python package version"
},
"ktools": {
"type": "string",
"title": "Ktools build version",
"description": "Reports the version info from the default fmcalc in path"
},
"platform": {
"type": "string",
"title": "Base worker Tag",
"description": "The OasisPlatform tag used in the based model_worker image"
}
},
"required": ["oasislmf", "ktools", "platform"]
}
},
"required": ["model_settings", "lookup_settings"]
Expand Down
2 changes: 1 addition & 1 deletion tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def test_custom_model_runner_does_not_exist___generate_losses_is_called_output_f
'--config', get_oasislmf_config_path(settings.get('worker', 'model_id')),
'--model-run-dir', ANY,
'--analysis-settings-json', 'analysis_settings.json',
'--ktools-num-processes', settings.get('worker', 'KTOOLS_BATCH_COUNT'),
'--ktools-num-processes', settings.get('worker', 'KTOOLS_NUM_PROCESSES'),
'--ktools-alloc-rule-gul', settings.get('worker', 'KTOOLS_ALLOC_RULE_GUL'),
'--ktools-alloc-rule-il', settings.get('worker', 'KTOOLS_ALLOC_RULE_IL'),
'--ktools-alloc-rule-ri', settings.get('worker', 'KTOOLS_ALLOC_RULE_RI'),
Expand Down

0 comments on commit 129fd82

Please sign in to comment.