Ordered consumer performance #245
-
We have now migrated the legacy JetStream subscriptions to ordered consumers, and we are seeing a drastic drop in performance: the time to receive all messages on multiple stream subjects went from 8 s to 80 s. Where would we start with debugging this?
-
Here are the code snippets for comparison. The legacy implementation is 10 times faster, and I don't know how to explain it or where to start looking. If we use multiple filter subjects, it's 2 times slower (but there is probably a different explanation for that).

Legacy push-subscription implementation:

```ts
export const createJetstreamSubscription = (args: {
  stream: StreamName;
  subject: string;
  deliverPolicy?: DeliverPolicy;
  onMessage: (msg: JsMsg) => void;
  onError?: (err: Error | NatsError) => void;
}) => {
  let subscription: JetStreamSubscription | undefined;

  const reconnect = async () => {
    const conn = await createNatsConnection();
    let opts = consumerOpts()
      .filterSubject(args.subject)
      .deliverTo(createInbox())
      .ackNone()
      .callback((err, msg) => {
        if (err) {
          throw err;
        }
        msg && args.onMessage(msg);
      });
    switch (args.deliverPolicy) {
      case 'all': {
        opts = opts.deliverAll();
        break;
      }
      default: {
        opts = opts.deliverLastPerSubject();
        break;
      }
    }
    subscription = await conn.jetstream().subscribe(args.subject, opts);
    console.debug('Subscribed on Jetstream', {
      stream: args.stream,
      subject: args.subject,
      consumer: subscription,
    });
  };

  reconnect().catch((err) => {
    console.error(err);
    if (args.onError) {
      args.onError(err);
    }
    setTimeout(async () => {
      await reconnect();
    }, RECONNECT_WAIT_IN_MS);
  });

  return {
    stop: () => {
      if (subscription) {
        subscription.destroy().then(() => {
          subscription = undefined;
        });
        console.debug('Unsubscribed on Jetstream', {
          stream: args.stream,
          subject: args.subject,
        });
      }
    },
  };
};
```

New ordered-consumer implementation:

```ts
export const createJetstreamSubscription = (args: {
  stream: StreamName;
  subject: string;
  deliverPolicy?: DeliverPolicy;
  onMessage: (msg: JsMsg) => void;
  onError?: (err: Error | NatsError) => void;
}) => {
  let consumer: Consumer | undefined;

  const reconnect = async () => {
    const conn = await createNatsConnection();
    consumer = await conn.jetstream().consumers.get(args.stream, {
      filterSubjects: args.subject,
      deliver_policy: args.deliverPolicy || DeliverPolicy.LastPerSubject,
    });
    console.debug('Subscribed on Jetstream', {
      stream: args.stream,
      subject: args.subject,
      consumer,
    });
    return consumer.consume({
      callback: args.onMessage,
    });
  };

  reconnect().catch((err) => {
    console.error(err);
    if (args.onError) {
      args.onError(err);
    }
    setTimeout(async () => {
      await reconnect();
    }, RECONNECT_WAIT_IN_MS);
  });

  return {
    stop: () => {
      if (consumer) {
        consumer.delete().then(() => {
          consumer = undefined;
        });
        console.debug('Unsubscribed on Jetstream', {
          stream: args.stream,
          subject: args.subject,
        });
      }
    },
  };
};
```
-
Looks like I have found the answer by looking at the source code and experimenting with the configuration. It seems that the consumer wrapper in the legacy implementation does not set `max_messages`, while the new consumer API does. In our case we expect around 150k messages, so with the default limit it takes a lot longer to load all messages sequentially. By increasing the limit to a much higher value, we are able to get most messages at once. Since connection speed is not an issue for an app hosted on a local network, this does the trick.
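
For reference, a minimal sketch of what that adjustment looks like with the new consumer API, assuming nats.js accepts a `max_messages` option on `consume()` as described above. The helper name `consumeWithLargerBatches` and the `10_000` value are illustrative, not from the thread; the ordered-consumer setup mirrors the new implementation posted earlier.

```ts
import { DeliverPolicy, JsMsg, NatsConnection } from 'nats';

// Illustrative helper: drain a large backlog with bigger pull batches
// by raising max_messages on consume().
export const consumeWithLargerBatches = async (
  conn: NatsConnection,
  stream: string,
  subject: string,
  onMessage: (msg: JsMsg) => void,
) => {
  // Same ordered-consumer setup as in the new implementation above.
  const consumer = await conn.jetstream().consumers.get(stream, {
    filterSubjects: subject,
    deliver_policy: DeliverPolicy.LastPerSubject,
  });

  return consumer.consume({
    // max_messages caps how many messages each pull requests. With the
    // small default batch, ~150k messages arrive over many sequential
    // round trips; a much higher value fetches most of them at once.
    max_messages: 10_000,
    callback: onMessage,
  });
};
```

The trade-off is client-side memory, since a larger batch buffers more undelivered messages at once; as noted above, that is acceptable for an app running on a local network.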