Skip to content

Commit

Permalink
Fail the run if publish thread pool times out (nextflow-io#5578) [ci …
Browse files Browse the repository at this point in the history
…fast]



Signed-off-by: Ben Sherman <[email protected]>
Signed-off-by: Paolo Di Tommaso <[email protected]>
Co-authored-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
bentsherman and pditommaso authored Jan 23, 2025
1 parent 55f25b3 commit 5325e5a
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 42 deletions.
14 changes: 12 additions & 2 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.nio.file.Paths
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeoutException

import com.google.common.hash.HashCode
import groovy.transform.CompileDynamic
Expand Down Expand Up @@ -689,8 +690,17 @@ class Session implements ISession {
try {
log.trace "Session > destroying"
// shutdown thread pools
finalizePoolManager?.shutdown(aborted)
publishPoolManager?.shutdown(aborted)
try {
finalizePoolManager?.shutdownOrAbort(aborted,this)
publishPoolManager?.shutdownOrAbort(aborted,this)
}
catch( TimeoutException e ) {
final ignoreErrors = config.navigate('workflow.output.ignoreErrors', false)
if( !ignoreErrors )
throw new AbortOperationException("Timed out while waiting to publish outputs")
else
log.warn e.message
}
// invoke shutdown callbacks
shutdown0()
log.trace "Session > after cleanup"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ package nextflow.util
import java.util.concurrent.ExecutorService
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import jdk.internal.vm.ThreadContainer

/**
* Thread pool helpers
*
Expand All @@ -34,7 +34,7 @@ import jdk.internal.vm.ThreadContainer
@Slf4j
class ThreadPoolHelper {

static void await(ExecutorService pool, Duration maxAwait, String waitMessage, String exitMsg) {
static void await(ExecutorService pool, Duration maxAwait, String waitMessage, String exitMsg) throws TimeoutException {
final max = maxAwait.millis
final t0 = System.currentTimeMillis()
// wait for ongoing file transfer to complete
Expand All @@ -45,10 +45,8 @@ class ThreadPoolHelper {
break

final delta = System.currentTimeMillis()-t0
if( delta > max ) {
log.warn(exitMsg)
break
}
if( delta > max )
throw new TimeoutException(exitMsg)

// log to console every 10 minutes (120 * 5 sec)
if( count % 120 == 0 ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package nextflow.util

import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.Global
import nextflow.ISession
import nextflow.Session
import nextflow.exception.AbortOperationException

/**
* Holder object for file transfer thread pool
*
Expand Down Expand Up @@ -129,6 +132,19 @@ class ThreadPoolManager {
log.debug "Thread pool '$name' shutdown completed (hard=$hard)"
}

void shutdownOrAbort(boolean hard, Session session) throws AbortOperationException {
try {
shutdown(hard)
}
catch( TimeoutException e ) {
final ignoreErrors = session.config.navigate('workflow.output.ignoreErrors', false)
if( ignoreErrors )
log.warn(e.message)
else
throw new AbortOperationException("Timed out while waiting to publish outputs", e)
}
}

static ExecutorService create(String name, int maxThreads=0) {
final session = Global.session as Session
new ThreadPoolManager(name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import spock.lang.Specification
*/
class TaskConfigTest extends Specification {


def testShell() {

when:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package nextflow.cloud.aws.batch

import java.nio.file.Path
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

import com.amazonaws.services.batch.AWSBatch
import com.amazonaws.services.batch.model.AWSBatchException
Expand Down Expand Up @@ -320,7 +321,17 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint, TaskArrayExec
reaper.shutdown()
final waitMsg = "[AWS BATCH] Waiting jobs reaper to complete (%d jobs to be terminated)"
final exitMsg = "[AWS BATCH] Exiting before jobs reaper thread pool complete -- Some jobs may not be terminated"
ThreadPoolHelper.await(reaper, Duration.of('60min'), waitMsg, exitMsg)
awaitCompletion(reaper, Duration.of('60min'), waitMsg, exitMsg)

}

protected void awaitCompletion(ThrottlingExecutor executor, Duration duration, String waitMsg, String exitMsg) {
try {
ThreadPoolHelper.await(executor, duration, waitMsg, exitMsg)
}
catch( TimeoutException e ) {
log.warn(e.message, e)
}
}

@Override
Expand Down
14 changes: 0 additions & 14 deletions plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -660,18 +660,4 @@ public void uploadDirectory(File source, S3Path target) {
String getObjectKmsKeyId(String bucketName, String key) {
return getObjectMetadata(bucketName,key).getSSEAwsKmsKeyId();
}

protected void showdownTransferPool(boolean hard) {
log.debug("Initiating transfer manager shutdown (hard={})", hard);
if( hard ) {
transferPool.shutdownNow();
}
else {
// await pool completion
transferPool.shutdown();
final String waitMsg = "[AWS S3] Waiting files transfer to complete (%d files)";
final String exitMsg = "[AWS S3] Exiting before FileTransfer thread pool complete -- Some files maybe lost";
ThreadPoolHelper.await(transferPool, Duration.of("1h"), waitMsg, exitMsg);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -665,22 +665,4 @@ static synchronized ExecutorService getOrCreateExecutor(int maxThreads) {
return executorSingleton;
}

/**
* Shutdown the executor and clear the singleton
*/
static void shutdownExecutor(boolean hard) {
if( hard ) {
executorSingleton.shutdownNow();
}
else {
executorSingleton.shutdown();
log.trace("Uploader await completion");
final String waitMsg = "[AWS S3] Waiting stream uploader to complete (%d files)";
final String exitMsg = "[AWS S3] Exiting before stream uploader thread pool complete -- Some files maybe lost";
ThreadPoolHelper.await(executorSingleton, Duration.of("1h") ,waitMsg, exitMsg);
log.trace("Uploader shutdown completed");
executorSingleton = null;
}
}

}

0 comments on commit 5325e5a

Please sign in to comment.