Skip to content

Cleaned simple kubernetes manager #66

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,20 @@ FROM python:3.11 AS builder
WORKDIR /build
COPY . .

RUN pip install .
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential gcc python3-dev pkg-config && \
rm -rf /var/lib/apt/lists/* && \
pip install --upgrade pip setuptools wheel build

RUN python3 setup.py sdist && \
pip install dist/*.tar.gz --target /install
RUN python -m build && \
pip install dist/*.tar.gz --prefix=/install

# minimal runtime image
# runtime stage
FROM python:3.11-slim AS runtime

COPY --from=builder /install /usr/local/lib/python3.11/site-packages
# copy installed Python libs + CLI scripts
COPY --from=builder /install /usr/local

COPY etc/*.json etc/*.yaml /etc/villas/controller/
COPY villas-controller.service /etc/systemd/system/

Expand Down
14 changes: 14 additions & 0 deletions etc/config_simplekub.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
components:
- type: generic
category: manager
name: Generic Manager
location: VM Iris
uuid: eddb51a0-557b-4848-ac7a-faccc7c51fa3

- category: manager
type: kubernetes-simple
name: Simple Kubernetes Manager
location: VM Iris
uuid: 4f8fb73e-7e74-11eb-8f63-f3ccc3ab82f6
namespace: villas-controller
4 changes: 2 additions & 2 deletions etc/params_k8s_dpsim.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ properties:
name: dpsim
spec:
suspend: true
activeDeadlineSeconds: 120 # kill the Job after 1h
activeDeadlineSeconds: 3600 # kill the Job after 1h
backoffLimit: 0 # only try to run pod once, no retries
ttlSecondsAfterFinished: 120 # delete the Job resources 1h after completion
ttlSecondsAfterFinished: 3600 # delete the Job resources 1h after completion
template:
spec:
restartPolicy: Never
Expand Down
13 changes: 10 additions & 3 deletions villas/controller/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ def load_schema(self):
fo = resources.open_text(pkg, res)
loadedschema = yaml.load(fo, yaml.SafeLoader)

schema[name] = Draft202012Validator(loadedschema)
try:
Draft202012Validator.check_schema(loadedschema)
schema[name] = loadedschema
except jsonschema.exceptions.SchemaError:
self.logger.warning("Schema is invalid!")

return schema

Expand Down Expand Up @@ -148,7 +152,7 @@ def status(self):
**self.headers
},
'schema': {
name: v.schema for name, v in self.schema.items()
name: v for name, v in self.schema.items()
}
}

Expand Down Expand Up @@ -277,12 +281,15 @@ def from_dict(dict):

def publish_status(self):
if not self.mixin:
self.logger.warn('No mixin!')
return

self.mixin.publish(self.status, headers=self.headers)

def publish_status_periodically(self):
self.logger.info('Start state publish thread')
self.logger.info('Start state publish thread, initial status: %s',
self.status)
self.publish_status() # publish the first update immediately

while not self.publish_status_thread_stop.wait(
self.publish_status_interval):
Expand Down
4 changes: 3 additions & 1 deletion villas/controller/components/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ def from_dict(dict):
if type == 'kubernetes':
from villas.controller.components.managers import kubernetes
return kubernetes.KubernetesManager(**dict)
if type == 'kubernetes-simple':
from villas.controller.components.managers import kubernetes_simple
return kubernetes_simple.KubernetesManagerSimple(**dict)
if type == 'villas-node':
from villas.controller.components.managers import villas_node # noqa E501
return villas_node.VILLASnodeManager(**dict)
Expand All @@ -43,7 +46,6 @@ def from_dict(dict):
def add_component(self, comp):
if comp.uuid in self.mixin.components:
existing_comp = self.mixin.components[comp.uuid]

raise SimulationException(self, 'Component with same UUID ' +
'already exists!',
component=existing_comp)
Expand Down
54 changes: 18 additions & 36 deletions villas/controller/components/managers/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,30 @@ class KubernetesManager(Manager):
def __init__(self, **args):
super().__init__(**args)

self.thread_stop = threading.Event()

self.pod_watcher_thread = threading.Thread(
target=self._run_pod_watcher)
self.job_watcher_thread = threading.Thread(
target=self._run_job_watcher)
self.event_watcher_thread = threading.Thread(
target=self._run_event_watcher)

if os.environ.get('KUBECONFIG'):
k8s.config.load_kube_config()
else:
k8s.config.load_incluster_config()

self.namespace = args.get('namespace', 'default')
# the namespace in which to create the jobs
# and to watch for events
self.namespace = os.environ.get('NAMESPACE')
if self.namespace:
self.namespace = ''.join([self.namespace, '-controller'])
else:
self.namespace = 'villas-controller'

self._check_namespace(self.namespace)

self.my_namespace = os.environ.get('NAMESPACE')
# name and UID of the pod in which this controller is running
# used in kubernetes simulator to set the owner reference
self.my_pod_name = os.environ.get('POD_NAME')
self.my_pod_uid = os.environ.get('POD_UID')

self._check_namespace(self.namespace)
self.thread_stop = threading.Event()

# self.pod_watcher_thread.start()
# self.job_watcher_thread.start()
self.event_watcher_thread = threading.Thread(
target=self._run_event_watcher)
self.event_watcher_thread.setDaemon(True)
self.event_watcher_thread.start()

Expand All @@ -59,28 +59,6 @@ def _check_namespace(self, ns):

raise RuntimeError(f'Namespace {ns} does not exist')

def _run_pod_watcher(self):
w = k8s.watch.Watch()
c = k8s.client.CoreV1Api()

for sts in w.stream(c.list_namespaced_pod,
namespace=self.namespace):
stso = sts.get('object')
typ = sts.get('type')

self.logger.info('%s Pod: %s', typ, stso.metadata.name)

def _run_job_watcher(self):
w = k8s.watch.Watch()
b = k8s.client.BatchV1Api()

for sts in w.stream(b.list_namespaced_job,
namespace=self.namespace):
stso = sts.get('object')
typ = sts.get('type')

self.logger.info('%s Job: %s', typ, stso.metadata.name)

def _run_event_watcher(self):
while not self.thread_stop.is_set():
w = k8s.watch.Watch()
Expand All @@ -107,6 +85,10 @@ def _run_event_watcher(self):

if _match(comp.job.metadata.name,
eo.involved_object.name):
if comp._state == 'stopping':
# incoming events are old repetitions
continue

if eo.reason == 'Completed':
comp.change_state('stopping', True)
elif eo.reason == 'Started':
Expand Down
79 changes: 79 additions & 0 deletions villas/controller/components/managers/kubernetes_simple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from villas.controller.components.managers.kubernetes import KubernetesManager
from villas.controller.components.simulators.kubernetes import KubernetesJob


def build_parameters(sim_name, jobname, image, adls=3600,
privileged=False, name=None, uuid=None):
parameters = {
'type': 'kubernetes',
'category': 'simulator',
'uuid': uuid,
'name': name or sim_name,
'properties': {
'job': {
'apiVersion': 'batch/v1',
'kind': 'Job',
'metadata': {
'name': jobname
},
'spec': {
'activeDeadlineSeconds': adls,
'backoffLimit': 2,
'template': {
'spec': {
'restartPolicy': 'Never',
'containers': [
{
'image': image,
'imagePullPolicy': 'Always',
'name': 'jobcontainer',
'securityContext': {
'privileged': privileged
}
}
]
}
}
}
}
}
}
return parameters


class KubernetesManagerSimple(KubernetesManager):

def __init__(self, **args):
super().__init__(**args)

def create(self, payload):
params = payload.get('parameters', {})
sim_name = payload.get('name', 'Kubernetes Simulator')
jobname = params.get('jobname', 'noname')
adls = params.get('activeDeadlineSeconds', 3600)
if type(adls) is str:
adls = int(adls)
image = params.get('image')
name = params.get('name')
privileged = params.get('privileged', False)
uuid = params.get('uuid')
self.logger.info('uuid:')
self.logger.info(uuid)

if image is None:
self.logger.error('No image given, will try super.create')
super().create(payload)
return

parameters = build_parameters(
sim_name=sim_name,
jobname=jobname,
image=image,
adls=adls,
privileged=privileged,
name=name,
uuid=uuid
)

comp = KubernetesJob(self, **parameters)
self.add_component(comp)
5 changes: 3 additions & 2 deletions villas/controller/components/simulators/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,9 @@ def start(self, payload):
self.properties['job_name'] = self.job.metadata.name
self.properties['namespace'] = self.manager.namespace

def stop(self, payload):
def stop(self, message):
self.change_state('stopping', True)
self._delete_job()

self.change_state('idle')

def _send_signal(self, sig):
Expand Down Expand Up @@ -227,6 +227,7 @@ def resume(self, payload):
self.change_state('running')

def reset(self, payload):
self.change_state('resetting', True)
self._delete_job()
super().reset(payload)

Expand Down
2 changes: 2 additions & 0 deletions villas/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ def _drain_publish_queue(self):
self.producer.publish(body, **kwargs)
except queue.Empty:
pass
except TimeoutError:
LOGGER.warn('TimeoutError, let kombu reconnect..')
Copy link
Preview

Copilot AI Jul 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use LOGGER.warning instead of the deprecated LOGGER.warn for consistency with the Python logging API.

Suggested change
LOGGER.warn('TimeoutError, let kombu reconnect..')
LOGGER.warning('TimeoutError, let kombu reconnect..')

Copilot uses AI. Check for mistakes.

Comment on lines +87 to +88
Copy link
Preview

Copilot AI Jul 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Include the caught exception in the log message to aid debugging, e.g., LOGGER.warning('TimeoutError occurred, reconnecting: %s', err).

Suggested change
except TimeoutError:
LOGGER.warn('TimeoutError, let kombu reconnect..')
except TimeoutError as err:
LOGGER.warn('TimeoutError occurred, let kombu reconnect: %s', err)

Copilot uses AI. Check for mistakes.


def on_iteration(self):
# Drain publish queue
Expand Down
Empty file.
33 changes: 33 additions & 0 deletions villas/controller/schemas/manager/kubernetes-simple/create.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
---
$schema: http://json-schema.org/draft-04/schema#

type: object
title: 'Simple Kubernetes Job'
required:
- image
properties:
name:
type: string
title: 'Simulator Name'
default: 'Kubernetes Simulator'
uuid:
type: string
title: UUID
default: 8dfd03b2-1c78-11ec-9621-0242ac130002
jobname:
type: string
title: 'Jobname'
default: myjob
activeDeadlineSeconds:
type: number
title: activeDeadlineSeconds
default: 3600
image:
type: string
title: Image
default: perl
privileged:
type: boolean
title: Privileged
default: false
description: 'WARNING: If true, the container has root privileges on the host'