Skip to content

Commit

Permalink
Rebase from master.
Browse files Browse the repository at this point in the history
  • Loading branch information
DailyDreaming committed Jan 30, 2020
2 parents 068a3cd + 422a73c commit 56ad626
Show file tree
Hide file tree
Showing 22 changed files with 216 additions and 75 deletions.
27 changes: 14 additions & 13 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ before_script:
- docker info
- cat /etc/hosts
- export PYTHONIOENCODING=utf-8
- mkdir -p ~/.kube && cp "$GITLAB_SECRET_FILE_KUBE_CONFIG" ~/.kube/config
- mkdir -p ~/.aws && cp "$GITLAB_SECRET_FILE_AWS_CREDENTIALS" ~/.aws/credentials
after_script:
# We need to clean up any files that Toil may have made via Docker that
# aren't deletable by the Gitlab user. If we don't do this, Gitlab will try
Expand All @@ -29,11 +31,6 @@ py2_batch_systems:
- pwd
- apt update && DEBIAN_FRONTEND=noninteractive apt install -y tzdata && apt install -y jq
- virtualenv -p python2.7 venv && . venv/bin/activate && make prepare && make develop extras=[all] && pip install htcondor awscli==1.16.272
# Get Kubernetes credentials
- mkdir -p ~/.kube
- cp "$GITLAB_SECRET_FILE_KUBE_CONFIG" ~/.kube/config
- mkdir -p ~/.aws
- cp "$GITLAB_SECRET_FILE_AWS_CREDENTIALS" ~/.aws/credentials
- python -m pytest -r s src/toil/test/batchSystems/batchSystemTest.py
- python -m pytest -r s src/toil/test/mesos/MesosDataStructuresTest.py

Expand Down Expand Up @@ -92,8 +89,6 @@ py2_integration_jobstore:
- export TOIL_AWS_ZONE=us-west-2a
# This reads GITLAB_SECRET_FILE_SSH_KEYS
- python setup_gitlab_ssh.py
- mkdir -p ~/.aws
- cp "$GITLAB_SECRET_FILE_AWS_CREDENTIALS" ~/.aws/credentials
- python -m pytest src/toil/test/jobStores/jobStoreTest.py

py2_integration_sort:
Expand All @@ -107,8 +102,6 @@ py2_integration_sort:
- export TOIL_AWS_ZONE=us-west-2a
# This reads GITLAB_SECRET_FILE_SSH_KEYS
- python setup_gitlab_ssh.py
- mkdir -p ~/.aws
- cp "$GITLAB_SECRET_FILE_AWS_CREDENTIALS" ~/.aws/credentials
- python -m pytest src/toil/test/sort/sortTest.py
- python -m pytest src/toil/test/provisioners/clusterScalerTest.py

Expand All @@ -123,8 +116,6 @@ py2_integration_sort:
# - export TOIL_AWS_ZONE=us-west-2a
# # This reads GITLAB_SECRET_FILE_SSH_KEYS
# - python setup_gitlab_ssh.py
# - mkdir -p ~/.aws
# - cp "$GITLAB_SECRET_FILE_AWS_CREDENTIALS" ~/.aws/credentials
# - python -m pytest src/toil/test/provisioners/aws/awsProvisionerTest.py


Expand Down Expand Up @@ -171,6 +162,18 @@ py3_main:
- python -m pytest src/toil/test/src
- python -m pytest src/toil/test/utils

py3_appliance_build:
stage: main_tests
script:
- pwd
- apt update && DEBIAN_FRONTEND=noninteractive apt install -y tzdata && apt install -y jq
- virtualenv -p python3.6 venv && . venv/bin/activate && make prepare && make develop extras=[all] && pip install htcondor awscli==1.16.272
# This reads GITLAB_SECRET_FILE_QUAY_CREDENTIALS
- python2.7 setup_gitlab_docker.py
- export TOIL_APPLIANCE_SELF=quay.io/ucsc_cgl/toil:$(python version_template.py dockerTag)
- echo $TOIL_APPLIANCE_SELF
- make push_docker

#py3_integration:
# stage: integration
# script:
Expand All @@ -182,6 +185,4 @@ py3_main:
# - export TOIL_AWS_ZONE=us-west-2a
# # This reads GITLAB_SECRET_FILE_SSH_KEYS
# - python setup_gitlab_ssh.py
# - mkdir -p ~/.aws
# - cp "$GITLAB_SECRET_FILE_AWS_CREDENTIALS" ~/.aws/credentials
# - python -m pytest src/toil/test/jobStores/jobStoreTest.py
5 changes: 3 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ programming.
* Check the `website`_ for a description of Toil and its features.
* Full documentation for the latest stable release can be found at
`Read the Docs`_.
* Please subscribe to the low-volume `announce`_ mailing list so we can keep you informed
* Google Groups discussion `forum`_
* See our occasional `blog`_ for tutorials.
* Google Groups discussion `forum`_ and videochat `invite list`_.

.. _website: http://toil.ucsc-cgl.org/
.. _Read the Docs: https://toil.readthedocs.io/en/latest
.. _announce: https://groups.google.com/forum/#!forum/toil-announce
.. _forum: https://groups.google.com/forum/#!forum/toil-community
.. _invite list: https://groups.google.com/forum/#!forum/toil-community-videochats
.. _blog: https://toilpipelines.wordpress.com/

.. image:: https://badges.gitter.im/bd2k-genomics-toil/Lobby.svg
Expand Down
24 changes: 15 additions & 9 deletions docker/Dockerfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@

from __future__ import print_function
import os
import sys
import textwrap

applianceSelf = os.environ['TOIL_APPLIANCE_SELF']
sdistName = os.environ['_TOIL_SDIST_NAME']

# Make sure to install packages into the pip for the version of Python we are
# building for.
pip = 'pip{}'.format(sys.version_info[0])


dependencies = ' '.join(['libffi-dev', # For client side encryption for extras with PyNACL
'python3.6',
Expand Down Expand Up @@ -56,7 +61,7 @@ def heredoc(s):
Run toil <workflow>.py --help to see all options for running your workflow.
For more information see http://toil.readthedocs.io/en/latest/
Copyright (C) 2015-2018 Regents of the University of California
Copyright (C) 2015-2020 Regents of the University of California
Version: {applianceSelf}
Expand Down Expand Up @@ -85,8 +90,9 @@ def heredoc(s):
apt-get clean && \
rm -rf /var/lib/apt/lists/*
RUN wget https://dl.google.com/go/go1.13.3.linux-amd64.tar.gz && \
tar xvf go1.13.3.linux-amd64.tar.gz && \
RUN wget -q https://dl.google.com/go/go1.13.3.linux-amd64.tar.gz && \
tar xf go1.13.3.linux-amd64.tar.gz && \
rm go1.13.3.linux-amd64.tar.gz && \
mv go/bin/* /usr/bin/ && \
mv go /usr/local/
Expand All @@ -110,16 +116,16 @@ def heredoc(s):
RUN chmod 777 /usr/bin/waitForKey.sh && chmod 777 /usr/bin/customDockerInit.sh
# The stock pip is too old and can't install from sdist with extras
RUN pip install --upgrade pip==9.0.1
RUN {pip} install --upgrade pip==9.0.1
# Default setuptools is too old
RUN pip install --upgrade setuptools==36.5.0
RUN {pip} install --upgrade setuptools==36.5.0
# Include virtualenv, as it is still the recommended way to deploy pipelines
RUN pip install --upgrade virtualenv==15.0.3
RUN {pip} install --upgrade virtualenv==15.0.3
# Install s3am (--never-download prevents silent upgrades to pip, wheel and setuptools)
RUN virtualenv --never-download /home/s3am \
RUN virtualenv --python python3 --never-download /home/s3am \
&& /home/s3am/bin/pip install s3am==2.0 \
&& ln -s /home/s3am/bin/s3am /usr/local/bin/
Expand All @@ -129,7 +135,7 @@ def heredoc(s):
&& chmod u+x /usr/local/bin/docker
# Fix for Mesos interface dependency missing on ubuntu
RUN pip install protobuf==3.0.0
RUN {pip} install protobuf==3.0.0
# Fix for https://issues.apache.org/jira/browse/MESOS-3793
ENV MESOS_LAUNCHER=posix
Expand All @@ -151,7 +157,7 @@ def heredoc(s):
# This component changes most frequently and keeping it last maximizes Docker cache hits.
COPY {sdistName} .
RUN pip install {sdistName}[all]
RUN {pip} install {sdistName}[all]
RUN rm {sdistName}
# We intentionally inherit the default ENTRYPOINT and CMD from the base image, to the effect
Expand Down
3 changes: 2 additions & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Toil Documentation
Toil is an open-source pure-Python workflow engine that lets people write better pipelines.

Check out our `website`_ for a comprehensive list of Toil's features and read our `paper`_ to learn what Toil can do
in the real world. Feel free to also join us on `GitHub`_ and `Gitter`_.
in the real world. Please subscribe to our low-volume `announce`_ mailing list and feel free to also join us on `GitHub`_ and `Gitter`_.

If using Toil for your research, please cite

Expand All @@ -13,6 +13,7 @@ If using Toil for your research, please cite
http://doi.org/10.1038/nbt.3772

.. _website: http://toil.ucsc-cgl.org/
.. _announce: https://groups.google.com/forum/#!forum/toil-announce
.. _GridEngine: http://gridscheduler.sourceforge.net/
.. _Parasol: http://genecats.soe.ucsc.edu/eng/parasol.html
.. _Apache Mesos: http://mesos.apache.org/
Expand Down
13 changes: 6 additions & 7 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
# toil issue: https://github.com/DataBiosphere/toil/issues/2924
# very similar to this issue: https://github.com/mcfletch/pyopengl/issues/11
# the "right way" is waiting for a fix from "http-parser", but this fixes things in the meantime since that might take a while
cppflags_args = [i.strip() for i in os.environ.get('CPPFLAGS', '').split(' ') if i.strip()]
python_version = float('{}.{}'.format(str(sys.version_info.major), str(sys.version_info.minor)))
if python_version >= 3.7 and '-DPYPY_VERSION' not in cppflags_args:
raise RuntimeError('Toil requires the environment variable "CPPFLAGS" to contain "-DPYPY_VERSION" when installed '
'on python3.7 or higher. This can be set with:\n\n'
' export CPPFLAGS=$CPPFLAGS" -DPYPY_VERSION"\n\n')
cppflags = os.environ.get('CPPFLAGS')
if cppflags:
# note, duplicate options don't affect things here so we don't check - Mark D
os.environ['CPPFLAGS'] = ' '.join([cppflags, '-DPYPY_VERSION'])
else:
os.environ['CPPFLAGS'] = '-DPYPY_VERSION'


def runSetup():
Expand Down Expand Up @@ -98,7 +98,6 @@ def runSetup():
kubernetes_reqs = [
kubernetes]
mesos_reqs = [
http_parser,
pymesos,
psutil]
wdl_reqs = []
Expand Down
1 change: 0 additions & 1 deletion src/toil/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@

log = logging.getLogger(__name__)


def which(cmd, mode=os.F_OK | os.X_OK, path=None):
"""
Copy-pasted in from python3.6's shutil.which().
Expand Down
11 changes: 9 additions & 2 deletions src/toil/batchSystems/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,15 @@ def getUpdatedBatchJob(self, maxWait):
if containerStatuses is None or len(containerStatuses) == 0:
logger.debug("No job container statuses for job %s" % (pod.metadata.owner_references[0].name))
return (int(pod.metadata.owner_references[0].name[len(self.jobPrefix):]), -1, 0)
logger.info("REASON: %s Exit Code: %s" % (pod.status.container_statuses[0].state.terminated.reason,
pod.status.container_statuses[0].state.terminated.exit_code))

# Get termination information from the pod
termination = pod.status.container_statuses[0].state.terminated
logger.info("REASON: %s Exit Code: %s", termination.reason, termination.exit_code)

if termination.exit_code != 0:
# The pod failed. Dump information about it.
logger.debug('Failed pod information: %s', str(pod))
logger.warning('Log from failed pod: %s', self._getLogForPod(pod))
jobID = int(pod.metadata.owner_references[0].name[len(self.jobPrefix):])
terminated = pod.status.container_statuses[0].state.terminated
runtime = slow_down((terminated.finished_at - terminated.started_at).total_seconds())
Expand Down
34 changes: 25 additions & 9 deletions src/toil/deferred.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ class DeferredFunctionManager(object):
be locked, and will take them over.
"""

# Define what directory the state directory should actually be, under the base
STATE_DIR_STEM = 'deferred'
# Have a prefix to distinguish our deferred functions from e.g. NFS
# "silly rename" files, or other garbage that people put in our
# directory
PREFIX = 'func'
# And a suffix to distinguish in-progress from completed files
WIP_SUFFIX = '.tmp'

def __init__(self, stateDirBase):
"""
Create a new DeferredFunctionManager, sharing state with other
Expand All @@ -115,12 +124,14 @@ def __init__(self, stateDirBase):
"""

# Work out where state files live
self.stateDir = os.path.join(stateDirBase, "deferred")
self.stateDir = os.path.join(stateDirBase, self.STATE_DIR_STEM)
mkdir_p(self.stateDir)

# We need to get a state file, locked by us and not somebody scanning for abandoned state files.
# So we suffix not-yet-ready ones with .tmp
self.stateFD, self.stateFileName = tempfile.mkstemp(dir=self.stateDir, suffix='.tmp')
# So we suffix not-yet-ready ones with our suffix
self.stateFD, self.stateFileName = tempfile.mkstemp(dir=self.stateDir,
prefix=self.PREFIX,
suffix=self.WIP_SUFFIX)

# Lock the state file. The lock will automatically go away if our process does.
try:
Expand All @@ -129,9 +140,9 @@ def __init__(self, stateDirBase):
# Someone else might have locked it even though they should not have.
raise RuntimeError("Could not lock deferred function state file %s: %s" % (self.stateFileName, str(e)))

# Rename it to remove the ".tmp"
os.rename(self.stateFileName, self.stateFileName[:-4])
self.stateFileName = self.stateFileName[:-4]
# Rename it to remove the suffix
os.rename(self.stateFileName, self.stateFileName[:-len(self.WIP_SUFFIX)])
self.stateFileName = self.stateFileName[:-len(self.WIP_SUFFIX)]

# Wrap the FD in a Python file object, which we will use to actually use it.
# Problem: we can't be readable and writable at the same time. So we need two file objects.
Expand Down Expand Up @@ -208,7 +219,7 @@ def cleanupWorker(cls, stateDirBase):
# Clean up the directory we have been using.
# It might not be empty if .tmp files escaped: nobody can tell they
# aren't just waiting to be locked.
shutil.rmtree(os.path.join(stateDirBase, "deferred"))
shutil.rmtree(os.path.join(stateDirBase, cls.STATE_DIR_STEM))



Expand Down Expand Up @@ -280,10 +291,14 @@ def _runOrphanedDeferredFunctions(self):
for filename in os.listdir(self.stateDir):
# Scan the whole directory for work nobody else owns.

if filename.endswith(".tmp"):
if filename.endswith(self.WIP_SUFFIX):
# Skip files from instances that are still being set up
continue

if not filename.startswith(self.PREFIX):
# Skip NFS deleted files and any other contaminants
continue

fullFilename = os.path.join(self.stateDir, filename)

if fullFilename == self.stateFileName:
Expand Down Expand Up @@ -332,7 +347,8 @@ def _runOrphanedDeferredFunctions(self):
# Unlock it
fcntl.lockf(fd, fcntl.LOCK_UN)

# Now close it.
# Now close it. This closes the backing file descriptor. See
# <https://stackoverflow.com/a/24984929>
fileObj.close()


Expand Down
31 changes: 28 additions & 3 deletions src/toil/lib/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@
from __future__ import absolute_import
from future.utils import raise_
from builtins import range
import logging
import math
import sys
import threading
import traceback
if sys.version_info >= (3, 0):
from threading import BoundedSemaphore
else:
from threading import _BoundedSemaphore as BoundedSemaphore

import psutil

log = logging.getLogger(__name__)

class BoundedEmptySemaphore( BoundedSemaphore ):
"""
A bounded semaphore that is initially empty.
Expand Down Expand Up @@ -118,32 +122,53 @@ def cpu_count():
Ignores the cgroup's cpu shares value, because it's extremely difficult to
interpret. See https://github.com/kubernetes/kubernetes/issues/81021.
Caches result for efficiency.
:return: Integer count of available CPUs, minimum 1.
:rtype: int
"""

cached = getattr(cpu_count, 'result', None)
if cached is not None:
# We already got a CPU count.
return cached

# Get the fallback answer of all the CPUs on the machine
total_machine_size = psutil.cpu_count(logical=True)

log.debug('Total machine size: %d cores', total_machine_size)

try:
with open('/sys/fs/cgroup/cpu/cpu.cfs_quota_us', 'r') as stream:
# Read the quota
quota = int(stream.read)
quota = int(stream.read())

log.debug('CPU quota: %d', quota)

if quota == -1:
# Assume we can use the whole machine
return total_machine_size

with open('/sys/fs/cgroup/cpu/cpu.cfs_period_us', 'r') as stream:
# Read the period in which we are allowed to burn the quota
period = int(stream.read)
period = int(stream.read())

log.debug('CPU quota period: %d', period)

# The thread count is how many multiples of a wall clock period we can burn in that period.
cgroup_size = int(math.ceil(float(quota)/float(period)))

log.debug('Cgroup size in cores: %d', cgroup_size)

except:
# We can't actually read these cgroup fields. Maybe we are a mac or something.
log.debug('Could not inspect cgroup: %s', traceback.format_exc())
cgroup_size = float('inf')

# Return the smaller of the actual thread count and the cgroup's limit, minimum 1.
return max(1, min(cgroup_size, total_machine_size))
result = max(1, min(cgroup_size, total_machine_size))
log.debug('cpu_count: %s', str(result))
# Make sure to remember it for the next call
setattr(cpu_count, 'result', result)
return result

Loading

0 comments on commit 56ad626

Please sign in to comment.