Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 26 additions & 22 deletions plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,10 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {

@Override
boolean checkIfCompleted() {
if( !podName ) throw new IllegalStateException("Missing K8s ${resourceType.lower()} name - cannot check if complete")
def state = getState()
if( !podName )
throw new IllegalStateException("Missing K8s ${resourceType.lower()} name - cannot check if complete")

final state = getState()
if( state && state.terminated ) {
if( state.nodeTermination instanceof NodeTerminationException ||
state.nodeTermination instanceof PodUnschedulableException ) {
Expand All @@ -441,8 +443,8 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
task.stderr = errorFile
}
status = TaskStatus.COMPLETED
savePodLogOnError(task)
deletePodIfSuccessful(task)
saveJobLogOnError(task)
deleteJobIfSuccessful(task)
updateTimestamps(state.terminated as Map)
determineNode()
return true
Expand All @@ -451,7 +453,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
return false
}

protected void savePodLogOnError(TaskRun task) {
protected void saveJobLogOnError(TaskRun task) {
if( task.isSuccess() )
return

Expand Down Expand Up @@ -491,53 +493,55 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
*/
@Override
protected void killTask() {
if( !podName )
return

if( cleanupDisabled() )
return

if( podName ) {
log.trace "[K8s] deleting ${resourceType.lower()} name=$podName"
if ( useJobResource() )
client.jobDelete(podName)
else
client.podDelete(podName)
}
else {
log.debug "[K8s] Invalid delete action"
}
log.trace "[K8s] deleting ${resourceType.lower()} name=$podName"
delete0(podName)
}

protected boolean cleanupDisabled() {
!k8sConfig.getCleanup()
}

protected void deletePodIfSuccessful(TaskRun task) {
protected void deleteJobIfSuccessful(TaskRun task) {
if( !podName )
return

if( cleanupDisabled() )
return

if( !task.isSuccess() ) {
// do not delete successfully executed pods for debugging purpose
// preserve failed pods for debugging purposes
if( !task.isSuccess() )
return
}

// k8s cluster will cleanup job on its own if TTL is set
if( useJobResource() && getPodOptions().getTtlSecondsAfterFinished() != null )
return

delete0(podName)
}

private void delete0(String podName) {
try {
if ( useJobResource() )
client.jobDelete(podName)
else
client.podDelete(podName)
}
catch( Exception e ) {
log.warn "Unable to cleanup ${resourceType.lower()}: $podName -- see the log file for details", e
log.warn "Unable to delete ${resourceType.lower()}: $podName -- see the log file for details", e
}
}

private void determineNode(){
private void determineNode() {
try {
if ( k8sConfig.fetchNodeName() && !runsOnNode )
runsOnNode = client.getNodeOfPod( podName )
} catch ( Exception e ){
} catch ( Exception e ) {
log.warn ("Unable to get the node name of pod $podName -- see the log file for details", e)
}
}
Expand Down
45 changes: 36 additions & 9 deletions plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,8 @@ class K8sTaskHandlerTest extends Specification {
1 * handler.getState() >> fullState
1 * handler.updateTimestamps(termState)
1 * handler.readExitFile() >> EXIT_STATUS
1 * handler.deletePodIfSuccessful(task) >> null
1 * handler.savePodLogOnError(task) >> null
1 * handler.deleteJobIfSuccessful(task) >> null
1 * handler.saveJobLogOnError(task) >> null
handler.task.exitStatus == EXIT_STATUS
handler.task.@stdout == OUT_FILE
handler.task.@stderr == ERR_FILE
Expand Down Expand Up @@ -528,8 +528,8 @@ class K8sTaskHandlerTest extends Specification {
1 * handler.getState() >> [terminated: termState]
1 * handler.updateTimestamps(termState)
0 * handler.readExitFile()
1 * handler.deletePodIfSuccessful(task) >> null
1 * handler.savePodLogOnError(task) >> null
1 * handler.deleteJobIfSuccessful(task) >> null
1 * handler.saveJobLogOnError(task) >> null
handler.task.exitStatus == 137
handler.status == TaskStatus.COMPLETED
result == true
Expand Down Expand Up @@ -764,30 +764,57 @@ class K8sTaskHandlerTest extends Specification {
def executor = Mock(K8sExecutor)
def client = Mock(K8sClient)
def handler = Spy(new K8sTaskHandler(podName: POD_NAME, executor:executor, client:client))
handler.useJobResource() >> false
and:
def TASK_OK = Mock(TaskRun); TASK_OK.isSuccess() >> true
def TASK_FAIL = Mock(TaskRun); TASK_FAIL.isSuccess() >> false

when:
handler.deletePodIfSuccessful(TASK_OK)
handler.deleteJobIfSuccessful(TASK_OK)
then:
1 * executor.getK8sConfig() >> new K8sConfig()
1 * client.podDelete(POD_NAME) >> null

when:
handler.deletePodIfSuccessful(TASK_OK)
handler.deleteJobIfSuccessful(TASK_OK)
then:
1 * executor.getK8sConfig() >> new K8sConfig(cleanup: true)
1 * client.podDelete(POD_NAME) >> null

when:
handler.deletePodIfSuccessful(TASK_FAIL)
handler.deleteJobIfSuccessful(TASK_FAIL)
then:
1 * executor.getK8sConfig() >> new K8sConfig(cleanup: false)
0 * client.podDelete(POD_NAME) >> null

}

def 'should not delete job if ttlSecondsAfterFinished is set' () {

given:
def POD_NAME = 'the-job-name'
def executor = Mock(K8sExecutor)
def client = Mock(K8sClient)
def handler = Spy(new K8sTaskHandler(podName: POD_NAME, executor:executor, client:client))
handler.useJobResource() >> true
and:
def TASK_OK = Mock(TaskRun); TASK_OK.isSuccess() >> true

when: 'job with ttlSecondsAfterFinished should not be deleted'
handler.deleteJobIfSuccessful(TASK_OK)
then:
1 * executor.getK8sConfig() >> new K8sConfig()
1 * handler.getPodOptions() >> new PodOptions([[ttlSecondsAfterFinished: 100]])
0 * client.jobDelete(POD_NAME)

when: 'job without ttlSecondsAfterFinished should be deleted'
handler.deleteJobIfSuccessful(TASK_OK)
then:
1 * executor.getK8sConfig() >> new K8sConfig()
1 * handler.getPodOptions() >> new PodOptions()
1 * client.jobDelete(POD_NAME) >> null
}

def 'should save pod log' () {

given:
Expand All @@ -803,13 +830,13 @@ class K8sTaskHandlerTest extends Specification {
def handler = Spy(new K8sTaskHandler(executor: executor, client: client, podName: POD_NAME))

when:
handler.savePodLogOnError(task)
handler.saveJobLogOnError(task)
then:
task.isSuccess() >> true
0 * client.podLog(_)

when:
handler.savePodLogOnError(task)
handler.saveJobLogOnError(task)
then:
task.isSuccess() >> false
task.getWorkDir() >> folder
Expand Down
Loading