Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMQ-9646] Support selecting specific messages for command line backup #1377

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,12 @@ public interface MessageStore extends Service {

void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception;

void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener, boolean revertOrderIndex) throws Exception;

default void recoverNextMessages(final String startMsgId, final String endMsgId, final int maxReturned, MessageRecoveryListener listener, final boolean revertOrderIndex) throws Exception {
throw new UnsupportedOperationException("recoverNextMessages(startMsgId,endMsgId,maxReturned,listener,revertOrderIndex) is not supported");
}

void dispose(ConnectionContext context);

/**
Expand Down Expand Up @@ -211,4 +217,5 @@ public interface MessageStore extends Service {
void updateMessage(Message message) throws IOException;

void registerIndexListener(IndexListener indexListener);

}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,16 @@ public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryList
delegate.recoverNextMessages(offset, maxReturned, listener);
}

@Override
public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener, boolean revertOrderIndex) throws Exception {
delegate.recoverNextMessages(offset, maxReturned, listener, revertOrderIndex);
}

@Override
public void recoverNextMessages(String startMsgId, String endMsgId, int maxReturned, MessageRecoveryListener listener, boolean revertOrderIndex) throws Exception {
delegate.recoverNextMessages(startMsgId, endMsgId, maxReturned, listener, revertOrderIndex);
}

@Override
public void resetBatching() {
delegate.resetBatching();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void delete() {
}

@Override
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
synchronized (messageTable) {
boolean pastLackBatch = lastBatchId == null;
for (Map.Entry<MessageId, Message> entry : messageTable.entrySet()) {
Expand All @@ -131,7 +131,7 @@ public void recoverNextMessages(int maxReturned, MessageRecoveryListener listene
}

@Override
public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception {
public void recoverNextMessages(final int offset, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
synchronized (messageTable) {
boolean pastLackBatch = lastBatchId == null;
int position = 0;
Expand All @@ -156,6 +156,36 @@ public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryList
}
}

@Override
public void recoverNextMessages(final int offset, final int maxReturned, final MessageRecoveryListener listener, final boolean revertOrderIndex) throws Exception {
synchronized (messageTable) {
boolean pastLackBatch = lastBatchId == null;
int position = 0;
for (Map.Entry<MessageId, Message> entry : messageTable.entrySet()) {
if(offset > 0 && offset > position) {
position++;
continue;
}
if (pastLackBatch) {
Object msg = entry.getValue();
lastBatchId = entry.getKey();
if (msg.getClass() == MessageId.class) {
listener.recoverMessageReference((MessageId) msg);
} else {
listener.recoverMessage((Message) msg);
}
} else {
pastLackBatch = entry.getKey().equals(lastBatchId);
}
position++;
}

if(revertOrderIndex) {
position = 0;
}
}
}

@Override
public void resetBatching() {
lastBatchId = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
Expand All @@ -43,7 +47,7 @@
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.fusesource.hawtbuf.DataByteArrayOutputStream;
import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.protobuf.UTF8Buffer;
Expand All @@ -62,6 +66,11 @@ public class StoreBackup {
String queue;
Integer offset;
Integer count;
String indexes;
Collection<Integer> indexesList;

String startMsgId;
String endMsgId;

private final ObjectMapper mapper = new ObjectMapper();
private final AsciiBuffer ds_kind = new AsciiBuffer("ds");
Expand All @@ -88,6 +97,14 @@ public void execute() throws Exception {
throw new Exception("optional --offset and --count must be specified together");
}

if ((startMsgId != null || endMsgId != null) && queue == null) {
throw new Exception("optional --queue must be specified when using startMsgId or endMsgId");
}

if (indexes != null && !indexes.isBlank()) {
indexesList = parseIndexesParam(indexes);
}

setFile(new File(filename));
System.out.println("Loading: " + config);
BrokerFactory.setStartDefault(false); // to avoid the broker auto-starting..
Expand Down Expand Up @@ -178,7 +195,19 @@ public boolean recoverMessage(Message message) throws IOException {
return true;
}
};
if(offset != null) {
if(startMsgId != null || endMsgId != null) {
System.out.println("Backing up from startMsgId: " + startMsgId + " to endMsgId: " + endMsgId);
queue.recoverNextMessages(startMsgId, endMsgId, (count != null ? count : Integer.MAX_VALUE), queueRecoveryListener, true);
} else if(indexesList != null) {
System.out.println("Backing up using indexes count: " + indexesList.size());
for(int idx : indexesList) {
if(idx < 0) {
continue;
}
queue.recoverNextMessages(idx, 1, queueRecoveryListener, true);
}
} else if(offset != null) {
System.out.println("Backing up from offset: " + offset + " count: " + count);
queue.recoverNextMessages(offset, count, queueRecoveryListener);
} else {
queue.recover(queueRecoveryListener);
Expand Down Expand Up @@ -265,14 +294,14 @@ private QueueEntryPB createQueueEntryPB(Message message, long queueKey, long que
private MessagePB createMessagePB(Message message, long messageKey) throws IOException {
DataByteArrayOutputStream mos = new DataByteArrayOutputStream();
mos.writeBoolean(TIGHT_ENCODING);
mos.writeVarInt(OPENWIRE_VERSION);
mos.writeInt(OPENWIRE_VERSION);
wireformat.marshal(message, mos);

MessagePB messageRecord = new MessagePB();
messageRecord.setCodec(codec_id);
messageRecord.setMessageKey(messageKey);
messageRecord.setSize(message.getSize());
messageRecord.setValue(new Buffer(mos.toBuffer().getData()));
messageRecord.setValue(new Buffer(mos.getData()));
return messageRecord;
}

Expand Down Expand Up @@ -323,4 +352,45 @@ public void setCount(int count) {
public Integer getCount() {
return count;
}

public void setIndexes(String indexes) {
this.indexes = indexes;
}

public String getIndexes() {
return indexes;
}

public String getStartMsgId() {
return startMsgId;
}

public void setStartMsgId(String startMsgId) {
this.startMsgId = startMsgId;
}

public String getEndMsgId() {
return endMsgId;
}

public void setEndMsgId(String endMsgId) {
this.endMsgId = endMsgId;
}

private Collection<Integer> parseIndexesParam(final String indexesParam) {
String tmp = indexesParam;
String[] parts;
if(tmp.contains(",")) {
parts = tmp.split(",");
} else {
return Set.of(Integer.valueOf(tmp));
}

List<Integer> indexes = new LinkedList<>();
for(String tmpPart : parts) {
indexes.add(Integer.valueOf(tmpPart.trim()));
}

return indexes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -404,13 +404,28 @@ public boolean recoverMessageReference(String reference) throws Exception {
* @param listener
* @throws Exception
* @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
* org.apache.activemq.store.MessageRecoveryListener)
* int, org.apache.activemq.store.MessageRecoveryListener)
*/
@Override
public void recoverNextMessages(int offset, int maxReturned, final MessageRecoveryListener listener) throws Exception {
throw new UnsupportedOperationException("recoverNextMesage(offset,maxReturned,listener) is not supported.");
}

/**
* @param offset
* @param maxReturned
* @param listener
* @param resetOrderIndex
* @throws Exception
* @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
* int, org.apache.activemq.store.MessageRecoveryListener,
* boolean)
*/
@Override
public void recoverNextMessages(int offset, int maxReturned, final MessageRecoveryListener listener, final boolean resetOrderIndex) throws Exception {
throw new UnsupportedOperationException("recoverNextMesage(offset,maxReturned,listener,resetOrderIndex) is not supported.");
}

public void trackRollbackAck(Message message) {
synchronized (rolledBackAcks) {
rolledBackAcks.put((Long)message.getMessageId().getEntryLocator(), message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -734,24 +734,31 @@ public void execute(Transaction tx) throws Exception {

@Override
public void recoverNextMessages(final int offset, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
recoverNextMessages(offset, maxReturned, listener, false);
}

@Override
public void recoverNextMessages(final int offset, final int maxReturned, final MessageRecoveryListener listener, final boolean revertOrderIndex) throws Exception {
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
@Override
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Entry<Long, MessageKeys> entry = null;
int tmpOffset = (offset >= 0 ? offset : 0);
int position = 0;
int counter = recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, listener);
Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName());
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
entry = iterator.next();

if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
if(tmpOffset > position) {
position++;
continue;
}

if(offset > 0 && offset > position) {
if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
position++;
continue;
}
Expand All @@ -766,6 +773,83 @@ public void execute(Transaction tx) throws Exception {
}
}
sd.orderIndex.stoppedIterating();

if(revertOrderIndex && position > 0) {
sd.orderIndex.resetCursorPosition();
}
}
});
} finally {
indexLock.writeLock().unlock();
}
}

@Override
public void recoverNextMessages(final String startMsgId, final String endMsgId, final int maxReturned, final MessageRecoveryListener listener, final boolean revertOrderIndex) throws Exception {
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
@Override
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Long startOffset = null;
Long endOffset = null;

if(startMsgId != null && !startMsgId.isBlank()) {
startOffset = sd.messageIdIndex.get(tx, startMsgId);
}

if(startOffset == null) {
startOffset = Long.valueOf(0l);
}

if(endMsgId != null && !endMsgId.isBlank()) {
endOffset = sd.messageIdIndex.get(tx, endMsgId);
if(endOffset != null) {
endOffset++;
}
}

if(endOffset == null) {
endOffset = startOffset + Long.valueOf(maxReturned);
}

if(endOffset < startOffset) {
// Fast fail on invalid arguments
return;
}

Entry<Long, MessageKeys> entry = null;
int position = 0;
int counter = recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, listener);
Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName());
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
entry = iterator.next();

if(startOffset > position) {
position++;
continue;
}

if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
position++;
continue;
}

Message msg = loadMessage(entry.getValue().location);
msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
listener.recoverMessage(msg);
counter++;
position++;
if (position >= endOffset || counter >= maxReturned || !listener.canRecoveryNextMessage()) {
break;
}
}
sd.orderIndex.stoppedIterating();

if(revertOrderIndex && position > 0) {
sd.orderIndex.resetCursorPosition();
}
}
});
} finally {
Expand Down
Loading