Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WFLY-19706 More gracefully handle the job execution status during a server crash. #590

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.jberet.spi.BatchEnvironment;
import org.wildfly.security.manager.WildFlySecurityManager;

public class JobOperatorImpl extends AbstractJobOperator implements JobOperator {
public class DefaultJobOperatorImpl extends AbstractJobOperator implements JobOperator {

private static final PrivilegedAction<BatchEnvironment> loaderAction = new PrivilegedAction<BatchEnvironment>() {
@Override
Expand All @@ -37,11 +37,11 @@ public BatchEnvironment run() {
final JobRepository repository;
private final BatchEnvironment batchEnvironment;

public JobOperatorImpl() throws BatchRuntimeException {
public DefaultJobOperatorImpl() throws BatchRuntimeException {
this(WildFlySecurityManager.isChecking() ? AccessController.doPrivileged(loaderAction) : loaderAction.run());
}

public JobOperatorImpl(final BatchEnvironment batchEnvironment) throws BatchRuntimeException {
public DefaultJobOperatorImpl(final BatchEnvironment batchEnvironment) throws BatchRuntimeException {
if (batchEnvironment == null) {
throw BatchMessages.MESSAGES.batchEnvironmentNotFound();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.jberet.operations;

import jakarta.batch.runtime.BatchStatus;
import org.jberet.runtime.JobExecutionImpl;

public class ForceStopJobOperatorImpl extends DefaultJobOperatorImpl {
public void forceStop(final long executionId) {
final JobExecutionImpl jobExecution = getJobExecutionImpl(executionId);
jobExecution.setBatchStatus(BatchStatus.STOPPED);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can set the exit status to CRASHED.

And we need to update the end time to the current time.

We also need to update the other tables here:

  • partition_execution
  • step_execution

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't thread safe. You'd really need all this to happen in something similar to a transaction which either all or none are updated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I should put them into a single tx like the implementation in the current stop() method in JdbcRepository :

   @Override
    public void stopJobExecution(final JobExecutionImpl jobExecution) {
        super.stopJobExecution(jobExecution);
        final String[] stopExecutionSqls = {
                sqls.getProperty(STOP_JOB_EXECUTION),
                sqls.getProperty(STOP_STEP_EXECUTION),
                sqls.getProperty(STOP_PARTITION_EXECUTION)
        };
        final String jobExecutionIdString = String.valueOf(jobExecution.getExecutionId());
        final String newBatchStatus = BatchStatus.STOPPING.toString();
        final Connection connection = getConnection();
        Statement stmt = null;
        try {
            stmt = connection.createStatement();
            for (String sql : stopExecutionSqls) {
                stmt.addBatch(sql.replace("?", jobExecutionIdString));
            }
            stmt.executeBatch();
        } catch (Exception e) {
            throw BatchMessages.MESSAGES.failToRunQuery(e, Arrays.toString(stopExecutionSqls));
        } finally {
            close(connection, stmt, null, null);
        }
    }

jobExecution.setLastUpdatedTime(System.currentTimeMillis());
getJobRepository().updateJobExecution(jobExecution, false, false);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The relative step executions and partition executions need to be updated here too.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,12 @@ public List<JobExecution> getJobExecutions(final JobInstance jobInstance) {
}
}

/**
 * {@inheritDoc}
 * <p>
 * TODO: timeout-based lookup is not yet implemented for this repository; this stub
 * always returns an immutable empty list, so no execution is ever reported as timed out.
 */
@Override
public List<JobExecution> getTimeoutJobExecutions(Long timeoutSeconds) {
return List.of();
}

@Override
public List<StepExecution> getStepExecutions(final long jobExecutionId, final ClassLoader classLoader) {
final JobExecutionImpl jobExecution = getJobExecution(jobExecutionId);
Expand Down
101 changes: 80 additions & 21 deletions jberet-core/src/main/java/org/jberet/repository/JdbcRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
Expand Down Expand Up @@ -76,6 +77,7 @@ public final class JdbcRepository extends AbstractPersistentRepository {

private static final String SELECT_ALL_JOB_EXECUTIONS = "select-all-job-executions";
private static final String SELECT_JOB_EXECUTIONS_BY_JOB_INSTANCE_ID = "select-job-executions-by-job-instance-id";
private static final String SELECT_JOB_EXECUTIONS_BY_TIMEOUT_SECONDS = "select-job-executions-by-timeout-seconds";
private static final String SELECT_RUNNING_JOB_EXECUTIONS_BY_JOB_NAME = "select-running-job-executions-by-job-name";
private static final String SELECT_JOB_EXECUTIONS_BY_JOB_NAME = "select-job-executions-by-job-name";
private static final String SELECT_JOB_EXECUTION = "select-job-execution";
Expand Down Expand Up @@ -206,7 +208,7 @@ private void createTables(final Properties configProperties) {
final String tablePrefix = configProperties.getProperty(DB_TABLE_PREFIX_KEY, "").trim();
final String tableSuffix = configProperties.getProperty(DB_TABLE_SUFFIX_KEY, "").trim();
final Pattern tableNamesPattern = tablePrefix.length() > 0 || tableSuffix.length() > 0 ?
Pattern.compile("JOB_INSTANCE|JOB_EXECUTION|STEP_EXECUTION|PARTITION_EXECUTION"): null;
Pattern.compile("JOB_INSTANCE|JOB_EXECUTION|STEP_EXECUTION|PARTITION_EXECUTION") : null;

final InputStream sqlResource = getClassLoader(false).getResourceAsStream(sqlFile);
try {
Expand Down Expand Up @@ -289,7 +291,7 @@ private void createTables(final Properties configProperties) {
countJobInstancesStatement.setString(1, "A");
rs = countJobInstancesStatement.executeQuery();
BatchLogger.LOGGER.tracef(
"This invocation needed to create tables since they didn't exit, but failed to create because they've been created by another concurrent invocation, so ignore the exception and return normally: %s", e1);
"This invocation needed to create tables since they didn't exit, but failed to create because they've been created by another concurrent invocation, so ignore the exception and return normally: %s", e1);
} catch (final SQLException sqle) {
//still cannot access the table, so fail it
throw BatchMessages.MESSAGES.failToCreateTables(e1, databaseProductName, ddlFile);
Expand Down Expand Up @@ -564,15 +566,15 @@ public JobExecutionImpl getJobExecution(final long jobExecutionId) {
if (result.getEndTime() == null && rs.getTimestamp(TableColumns.ENDTIME) != null) {
final Properties jobParameters1 = BatchUtil.stringToProperties(rs.getString(TableColumns.JOBPARAMETERS));
result = new JobExecutionImpl(getJobInstance(jobInstanceId),
jobExecutionId,
BatchUtil.stringToProperties(rs.getString(TableColumns.JOBPARAMETERS)),
rs.getTimestamp(TableColumns.CREATETIME),
rs.getTimestamp(TableColumns.STARTTIME),
rs.getTimestamp(TableColumns.ENDTIME),
rs.getTimestamp(TableColumns.LASTUPDATEDTIME),
rs.getString(TableColumns.BATCHSTATUS),
rs.getString(TableColumns.EXITSTATUS),
rs.getString(TableColumns.RESTARTPOSITION));
jobExecutionId,
BatchUtil.stringToProperties(rs.getString(TableColumns.JOBPARAMETERS)),
rs.getTimestamp(TableColumns.CREATETIME),
rs.getTimestamp(TableColumns.STARTTIME),
rs.getTimestamp(TableColumns.ENDTIME),
rs.getTimestamp(TableColumns.LASTUPDATEDTIME),
rs.getString(TableColumns.BATCHSTATUS),
rs.getString(TableColumns.EXITSTATUS),
rs.getString(TableColumns.RESTARTPOSITION));
jobExecutions.replace(jobExecutionId,
new SoftReference<JobExecutionImpl, Long>(result, jobExecutionReferenceQueue, jobExecutionId));
}
Expand Down Expand Up @@ -631,10 +633,10 @@ public List<JobExecution> getJobExecutions(final JobInstance jobInstance) {
final Properties jobParameters1 = BatchUtil.stringToProperties(rs.getString(TableColumns.JOBPARAMETERS));
jobExecution1 =
new JobExecutionImpl(getJobInstance(jobInstanceId), executionId, jobParameters1,
rs.getTimestamp(TableColumns.CREATETIME), rs.getTimestamp(TableColumns.STARTTIME),
rs.getTimestamp(TableColumns.ENDTIME), rs.getTimestamp(TableColumns.LASTUPDATEDTIME),
rs.getString(TableColumns.BATCHSTATUS), rs.getString(TableColumns.EXITSTATUS),
rs.getString(TableColumns.RESTARTPOSITION));
rs.getTimestamp(TableColumns.CREATETIME), rs.getTimestamp(TableColumns.STARTTIME),
rs.getTimestamp(TableColumns.ENDTIME), rs.getTimestamp(TableColumns.LASTUPDATEDTIME),
rs.getString(TableColumns.BATCHSTATUS), rs.getString(TableColumns.EXITSTATUS),
rs.getString(TableColumns.RESTARTPOSITION));
jobExecutions.replace(executionId,
new SoftReference<JobExecutionImpl, Long>(jobExecution1, jobExecutionReferenceQueue, executionId));
}
Expand All @@ -650,12 +652,67 @@ public List<JobExecution> getJobExecutions(final JobInstance jobInstance) {
return result;
}

/**
 * Finds job executions that appear to have timed out: rows matched by the
 * {@code select-job-executions-by-timeout-seconds} query (in-flight batch statuses)
 * whose {@code LASTUPDATEDTIME} is older than {@code timeoutSeconds} ago. Intended to
 * detect executions left in a running state, e.g. after a server crash.
 *
 * @param timeoutSeconds number of seconds of inactivity after which a still-running
 *                       execution is considered timed out
 * @return the matching job executions; an empty list if none match
 */
@Override
public List<JobExecution> getTimeoutJobExecutions(final Long timeoutSeconds) {
    final String query = sqls.getProperty(SELECT_JOB_EXECUTIONS_BY_TIMEOUT_SECONDS);
    final List<JobExecution> result = new ArrayList<JobExecution>();
    final Connection connection = getConnection();
    ResultSet rs = null;
    PreparedStatement preparedStatement = null;
    try {
        preparedStatement = connection.prepareStatement(query);

        // The cutoff must lie in the PAST: an execution has timed out when its
        // LASTUPDATEDTIME is before (now - timeoutSeconds). Using plusSeconds here
        // would match every running execution regardless of age.
        final Timestamp cutoff = Timestamp.from(Instant.now().minusSeconds(timeoutSeconds));
        preparedStatement.setTimestamp(1, cutoff);

        rs = preparedStatement.executeQuery();
        while (rs.next()) {
            final long executionId = rs.getLong(TableColumns.JOBEXECUTIONID);
            // Read the instance id on every row: it is needed both when creating a new
            // JobExecutionImpl and when refreshing a stale cached one. (Previously it was
            // only read in the cache-miss branch, so the stale-cache branch could use a
            // leftover value from an earlier row, or 0.)
            final long jobInstanceId = rs.getLong(TableColumns.JOBINSTANCEID);
            final SoftReference<JobExecutionImpl, Long> ref = jobExecutions.get(executionId);
            JobExecutionImpl jobExecution1 = (ref != null) ? ref.get() : null;
            if (jobExecution1 == null) {
                // Not cached (or the soft reference was cleared): materialize from the row
                // and add it to the cache.
                final Properties jobParameters1 = BatchUtil.stringToProperties(rs.getString(TableColumns.JOBPARAMETERS));
                jobExecution1 =
                        new JobExecutionImpl(getJobInstance(jobInstanceId), executionId, jobParameters1,
                                rs.getTimestamp(TableColumns.CREATETIME), rs.getTimestamp(TableColumns.STARTTIME),
                                rs.getTimestamp(TableColumns.ENDTIME), rs.getTimestamp(TableColumns.LASTUPDATEDTIME),
                                rs.getString(TableColumns.BATCHSTATUS), rs.getString(TableColumns.EXITSTATUS),
                                rs.getString(TableColumns.RESTARTPOSITION));
                jobExecutions.put(executionId,
                        new SoftReference<JobExecutionImpl, Long>(jobExecution1, jobExecutionReferenceQueue, executionId));
            } else if (jobExecution1.getEndTime() == null && rs.getTimestamp(TableColumns.ENDTIME) != null) {
                // Cached copy is stale (the database row has since ended): rebuild it from
                // the row and replace the cached entry.
                final Properties jobParameters1 = BatchUtil.stringToProperties(rs.getString(TableColumns.JOBPARAMETERS));
                jobExecution1 =
                        new JobExecutionImpl(getJobInstance(jobInstanceId), executionId, jobParameters1,
                                rs.getTimestamp(TableColumns.CREATETIME), rs.getTimestamp(TableColumns.STARTTIME),
                                rs.getTimestamp(TableColumns.ENDTIME), rs.getTimestamp(TableColumns.LASTUPDATEDTIME),
                                rs.getString(TableColumns.BATCHSTATUS), rs.getString(TableColumns.EXITSTATUS),
                                rs.getString(TableColumns.RESTARTPOSITION));
                jobExecutions.replace(executionId,
                        new SoftReference<JobExecutionImpl, Long>(jobExecution1, jobExecutionReferenceQueue, executionId));
            }
            // jobExecution1 is either the (possibly refreshed) cached instance or newly created.
            result.add(jobExecution1);
        }
    } catch (final Exception e) {
        throw BatchMessages.MESSAGES.failToRunQuery(e, query);
    } finally {
        close(connection, preparedStatement, null, rs);
    }
    return result;
}

private boolean isExecutionStale(final JobExecutionImpl jobExecution) {
final BatchStatus jobStatus = jobExecution.getBatchStatus();
if (jobStatus.equals(BatchStatus.COMPLETED) ||
jobStatus.equals(BatchStatus.FAILED) ||
jobStatus.equals(BatchStatus.STOPPED) ||
jobStatus.equals(BatchStatus.ABANDONED) || jobExecution.getStepExecutions().size() >= 1) {
jobStatus.equals(BatchStatus.FAILED) ||
jobStatus.equals(BatchStatus.STOPPED) ||
jobStatus.equals(BatchStatus.ABANDONED) || jobExecution.getStepExecutions().size() >= 1) {
return false;
}

Expand Down Expand Up @@ -878,8 +935,9 @@ public List<PartitionExecutionImpl> getPartitionExecutions(final long stepExecut

/**
* Updates the partition execution in job repository, using the {@code updateSql} passed in.
*
* @param partitionExecution the partition execution to update to job repository
* @param updateSql the update sql to use
* @param updateSql the update sql to use
* @return the number of rows affected by this update sql execution
*/
private int updatePartitionExecution(final PartitionExecutionImpl partitionExecution, final String updateSql) {
Expand All @@ -906,8 +964,9 @@ private int updatePartitionExecution(final PartitionExecutionImpl partitionExecu

/**
* Updates the step execution in job repository, using the {@code updateSql} passed in.
*
* @param stepExecution the step execution to update to job repository
* @param updateSql the update sql to use
* @param updateSql the update sql to use
* @return the number of rows affected by this update sql execution
*/
private int updateStepExecution0(final StepExecution stepExecution, final String updateSql) {
Expand Down Expand Up @@ -1058,7 +1117,7 @@ public void executeStatements(final String statements, final String statementsRe
}

private List<Long> getJobExecutions0(final String selectSql, final String jobName, final boolean runningExecutionsOnly,
final Integer limit) {
final Integer limit) {
final List<Long> result = new ArrayList<>();
Connection connection = null;
ResultSet rs = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import jakarta.batch.runtime.StepExecution;

import org.jberet.job.model.Job;
import org.jberet.operations.DefaultJobOperatorImpl;
import org.jberet.runtime.AbstractStepExecution;
import org.jberet.runtime.JobExecutionImpl;
import org.jberet.runtime.JobInstanceImpl;
Expand Down Expand Up @@ -58,6 +59,8 @@ public interface JobRepository {
JobExecution getJobExecution(long jobExecutionId);
List<JobExecution> getJobExecutions(JobInstance jobInstance);

List<JobExecution> getTimeoutJobExecutions(Long timeoutSeconds);

/**
* Gets job execution ids belonging to the job identified by the {@code jobName}.
* @param jobName the job name identifying the job
Expand All @@ -66,6 +69,8 @@ public interface JobRepository {
*/
List<Long> getJobExecutionsByJob(String jobName);



/**
* Gets job execution ids belonging to the job identified by the {@code jobName}.
*
Expand Down Expand Up @@ -95,7 +100,7 @@ public interface JobRepository {
* @return a list of job execution ids
*
* @since 1.1.0.Final
* @see org.jberet.operations.JobOperatorImpl#getRunningExecutions(java.lang.String)
* @see DefaultJobOperatorImpl#getRunningExecutions(java.lang.String)
*/
List<Long> getRunningExecutions(final String jobName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,6 @@ private enum ChunkState {
TO_END_RETRY, //need to end retrying the current chunk
TO_START_NEW, //the current chunk is done and need to start a new chunk next
DEPLETED, //no more input items, the processing can still go to next iteration so this last item can be retried

JOB_STOPPING, //the job has been requested to stop
JOB_STOPPED
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

package org.jberet.spi;

import org.jberet.operations.JobOperatorImpl;
import org.jberet.operations.DefaultJobOperatorImpl;

/**
* A default context selector.
Expand All @@ -29,7 +29,7 @@ public class DefaultJobOperatorContextSelector implements JobOperatorContextSele
* Creates a new default context selector
*/
public DefaultJobOperatorContextSelector() {
jobOperatorContext = JobOperatorContext.create(new JobOperatorImpl());
jobOperatorContext = JobOperatorContext.create(new DefaultJobOperatorImpl());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.jberet.spi;

import org.jberet.operations.ForceStopJobOperatorImpl;

public class ForceStopJobOperatorContextSelector implements JobOperatorContextSelector {
private final JobOperatorContext jobOperatorContext;

/**
* Creates a new default context selector
*/
public ForceStopJobOperatorContextSelector() {
jobOperatorContext = JobOperatorContext.create(new ForceStopJobOperatorImpl());
}

@Override
public JobOperatorContext getJobOperatorContext() {
return jobOperatorContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

import jakarta.batch.operations.JobOperator;

import org.jberet.operations.JobOperatorImpl;
import org.jberet.operations.DefaultJobOperatorImpl;
import org.jberet.util.Assertions;

/**
Expand Down Expand Up @@ -58,7 +58,7 @@ public static JobOperatorContext getJobOperatorContext() {
* @return the new context
*/
public static JobOperatorContext create(final BatchEnvironment batchEnvironment) {
final JobOperator jobOperator = new JobOperatorImpl(Assertions.notNull(batchEnvironment, "batchEnvironment"));
final JobOperator jobOperator = new DefaultJobOperatorImpl(Assertions.notNull(batchEnvironment, "batchEnvironment"));
return new JobOperatorContext() {
@Override
public JobOperator getJobOperator() {
Expand Down
3 changes: 3 additions & 0 deletions jberet-core/src/main/resources/sql/jberet-sql.properties
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ insert-job-instance = INSERT INTO JOB_INSTANCE(JOBNAME, APPLICATIONNAME) VALUES(

select-all-job-executions = SELECT * FROM JOB_EXECUTION
select-job-executions-by-job-instance-id = SELECT * FROM JOB_EXECUTION WHERE JOBINSTANCEID=? ORDER BY JOBEXECUTIONID

select-job-executions-by-timeout-seconds = SELECT * FROM JOB_EXECUTION WHERE lastupdatedtime < ? AND batchstatus in ('STOPPING', 'STARTED', 'STARTING')

select-job-execution = SELECT * FROM JOB_EXECUTION WHERE JOBEXECUTIONID=?
select-running-job-executions-by-job-name = SELECT JOB_EXECUTION.JOBEXECUTIONID FROM JOB_EXECUTION \
INNER JOIN JOB_INSTANCE ON JOB_EXECUTION.JOBINSTANCEID=JOB_INSTANCE.JOBINSTANCEID \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.Properties;
import javax.transaction.xa.XAResource;

import jakarta.batch.runtime.JobExecution;
import jakarta.transaction.HeuristicMixedException;
import jakarta.transaction.HeuristicRollbackException;
import jakarta.transaction.InvalidTransactionException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ public List<JobExecution> getJobExecutions(final JobInstance jobInstance) {
return result;
}

/**
 * {@inheritDoc}
 * <p>
 * TODO: timeout-based lookup is not yet implemented for this repository; this stub
 * always returns an immutable empty list, so no execution is ever reported as timed out.
 */
@Override
public List<JobExecution> getTimeoutJobExecutions(Long timeoutSeconds) {
return List.of();
}

@Override
public List<StepExecution> getStepExecutions(final long jobExecutionId, final ClassLoader classLoader) {
final JobExecutionImpl jobExecution = jobExecutionCache.get(jobExecutionId);
Expand Down
Loading
Loading