Skip to content

Commit

Permalink
Add option for deleting all scheduled messages on startup
Browse files Browse the repository at this point in the history
  • Loading branch information
kenliao94 committed Dec 25, 2024
1 parent aa842da commit 6747504
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ public class BrokerService implements Service {
private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
// to other jms messaging systems
private boolean deleteAllMessagesOnStartup;
private boolean deleteAllScheduledMessagesOnStartup = false;
private boolean advisorySupport = true;
private boolean anonymousProducerAdvisorySupport = false;
private URI vmConnectorURI;
Expand Down Expand Up @@ -1630,6 +1631,18 @@ public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStar
this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
}

/**
* Sets whether all scheduled messages are deleted on startup
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
*/
public void setDeleteAllScheduledMessagesOnStartup(boolean deleteAllScheduledMessagesOnStartup) {
this.deleteAllScheduledMessagesOnStartup = deleteAllScheduledMessagesOnStartup;
}

public boolean isDeleteAllScheduledMessagesOnStartup() {
return deleteAllScheduledMessagesOnStartup;
}

public URI getVmConnectorURI() {
if (vmConnectorURI == null) {
try {
Expand Down Expand Up @@ -2440,6 +2453,7 @@ protected Broker addInterceptors(Broker broker) throws Exception {
if (isSchedulerSupport()) {
SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore());
sb.setMaxRepeatAllowed(maxSchedulerRepeatAllowed);
sb.setDeleteAllScheduledMessagesOnStartup(deleteAllScheduledMessagesOnStartup);
if (isUseJmx()) {
JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
private final JobSchedulerStore store;
private JobScheduler scheduler;
private int maxRepeatAllowed = MAX_REPEAT_ALLOWED;
private boolean deleteAllScheduledMessagesOnStartup;

public SchedulerBroker(BrokerService brokerService, Broker next, JobSchedulerStore store) throws Exception {
super(next);
Expand Down Expand Up @@ -212,6 +213,9 @@ public synchronized JobScheduler getJobScheduler() throws Exception {
public void start() throws Exception {
this.started.set(true);
getInternalScheduler();
if (deleteAllScheduledMessagesOnStartup) {
deleteAllScheduledMessages();
}
super.start();
}

Expand Down Expand Up @@ -364,6 +368,11 @@ private void doSchedule(Message messageSend, Object cronValue, Object periodValu
new ByteSequence(packet.data, packet.offset, packet.length), cronEntry, delay, period, repeat);
}

private void deleteAllScheduledMessages() throws Exception {
LOG.info("Deleting all scheduled messages on startup because deleteAllScheduledMessagesOnStartup configuration has been provided");
getInternalScheduler().removeAllJobs();
}

@Override
public void scheduledJob(String id, ByteSequence job) {
org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job.getOffset(), job.getLength());
Expand Down Expand Up @@ -487,4 +496,12 @@ public int getMaxRepeatAllowed() {
public void setMaxRepeatAllowed(int maxRepeatAllowed) {
this.maxRepeatAllowed = maxRepeatAllowed;
}

public boolean getDeleteAllScheduledMessagesOnStartup() {
return deleteAllScheduledMessagesOnStartup;
}

public void setDeleteAllScheduledMessagesOnStartup(boolean deleteAllSchedulerdMessagesOnStartup) {
this.deleteAllScheduledMessagesOnStartup = deleteAllSchedulerdMessagesOnStartup;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.apache.activemq.broker.scheduler;

import jakarta.jms.Connection;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import org.apache.activemq.ScheduledMessage;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

public class JmsSchedulerDeleteAllMessageOnStartupOptionTest extends JobSchedulerTestSupport {

private static final transient Logger LOG = LoggerFactory.getLogger(JmsSchedulerDeleteAllMessageOnStartupOptionTest.class);

@Override
protected boolean shouldDeleteAllScheduledMessagesOnStartup() throws Exception {
return true;
}

@Test
public void testDeleteAllMessageOnRestart() throws Exception {
// Send a message delayed by 8 seconds
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
long time_ms = 10 * 1000;
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time_ms);
producer.send(message);
producer.close();
// Shutdown broker
restartBroker(RestartType.NORMAL);
// Make sure the consumer won't get the message
connection = createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
final int COUNT = 1;
final CountDownLatch latch = new CountDownLatch(COUNT);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
latch.countDown();
}
});
latch.await(20, TimeUnit.SECONDS);
assertEquals(latch.getCount(), COUNT);
connection.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ protected BrokerService createBroker(boolean delete) throws Exception {
answer.setSchedulerDirectoryFile(schedulerDirectory);
answer.setSchedulerSupport(true);
answer.setUseJmx(isUseJmx());
answer.setDeleteAllScheduledMessagesOnStartup(shouldDeleteAllScheduledMessagesOnStartup());
return answer;
}

Expand All @@ -136,4 +137,8 @@ protected void restartBroker(RestartType restartType) throws Exception {
broker.start();
broker.waitUntilStarted();
}

protected boolean shouldDeleteAllScheduledMessagesOnStartup() throws Exception {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ public void testBrokerConfig() throws Exception {
assertEquals("Broker Config Error (persistent)", false, broker.isPersistent());
assertEquals("Broker Config Error (useShutdownHook)", false, broker.isUseShutdownHook());
assertEquals("Broker Config Error (deleteAllMessagesOnStartup)", true, broker.isDeleteAllMessagesOnStartup());
assertEquals("Broker Config Error (deleteAllScheduledMessagesOnStartup)", true, broker.isDeleteAllScheduledMessagesOnStartup());
LOG.info("Success");

// Check specific vm transport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<amq:broker brokerName="brokerConfigTest" populateJMSXUserID="false"
useLoggingForShutdownErrors="true" useJmx="true"
persistent="false" vmConnectorURI="vm://javacoola"
useShutdownHook="false" deleteAllMessagesOnStartup="true">
useShutdownHook="false" deleteAllMessagesOnStartup="true" deleteAllScheduledMessagesOnStartup="true">

<!--
|| NOTE this config file is used for unit testing the configuration mechanism
Expand Down

0 comments on commit 6747504

Please sign in to comment.