Skip to content

Commit

Permalink
AMQ-9552: Add metric lastMessageTimestamp to StatisticsBroker based o…
Browse files Browse the repository at this point in the history
…n lastSampleTime of metric enqueues (if hasUpdated)
  • Loading branch information
Grzegorz Kochański authored and Grzegorz Kochański committed Aug 23, 2024
1 parent e45ee4a commit 4c307df
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ else if (dest instanceof Topic) {
tempFirstMessage.clear();
}
}
if ( stats.getEnqueues().hasUpdated() ) {
// NOTICE: Client-side, you may get the broker "now" Timestamp by msg.getJMSTimestamp()
// This allows for calculating inactivity.
statsMessage.setLong("lastMessageTimestamp", stats.getEnqueues().getLastSampleTime());
}
statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class StatisticImpl implements Statistic, Resettable {
private String description;
private long startTime;
private long lastSampleTime;
private boolean hasUpdated;
private boolean doReset = true;

public StatisticImpl(String name, String unit, String description) {
Expand All @@ -38,17 +39,20 @@ public StatisticImpl(String name, String unit, String description) {
this.description = description;
this.startTime = System.currentTimeMillis();
this.lastSampleTime = this.startTime;
this.hasUpdated = false;
}

public synchronized void reset() {
if(isDoReset()) {
this.startTime = System.currentTimeMillis();
this.lastSampleTime = this.startTime;
this.hasUpdated = false;
}
}

protected synchronized void updateSampleTime() {
this.lastSampleTime = System.currentTimeMillis();
this.hasUpdated = true;
}

public synchronized String toString() {
Expand Down Expand Up @@ -101,6 +105,9 @@ public boolean isDoReset() {
return this.doReset;
}

public boolean hasUpdated(){
return hasUpdated;
}
/**
* @param doReset the doReset to set
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@ public void testStatistic() throws Exception {
TimeStatisticImpl stat = new TimeStatisticImpl("myTimer", "millis", "myDescription");
assertStatistic(stat, "myTimer", "millis", "myDescription");

assertFalse(stat.hasUpdated());

assertEquals(0, stat.getCount());

stat.addTime(100);
assertTrue(stat.hasUpdated());
assertEquals(1, stat.getCount());
assertEquals(100, stat.getMinTime());
assertEquals(100, stat.getMaxTime());
Expand All @@ -59,6 +62,7 @@ public void testStatistic() throws Exception {
LOG.info("Stat is: " + stat);

stat.reset();
assertFalse(stat.hasUpdated());

assertEquals(0, stat.getCount());
assertEquals(0, stat.getMinTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public void testDestinationStatsWithFirstMessageTimestamp() throws Exception {
assertEquals(1, reply.getLong("size"));
assertTrue(reply.getJMSTimestamp() > 0);
assertTrue(reply.getLong("firstMessageTimestamp") > 0);
assertTrue(reply.getLong("lastMessageTimestamp") > 0);
// Assert that we got the brokerInTime for the first message in queue as value of key "firstMessageTimestamp"
assertTrue(System.currentTimeMillis() >= reply.getLong("firstMessageTimestamp"));
assertEquals(Message.DEFAULT_PRIORITY, reply.getJMSPriority());
Expand Down

0 comments on commit 4c307df

Please sign in to comment.