This repository has been archived by the owner on Mar 29, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathLoadGenerator.java
93 lines (77 loc) · 3.76 KB
/
LoadGenerator.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package com.meltwater.rxrabbit.example;
import com.google.common.base.Charsets;
import com.meltwater.rxrabbit.BrokerAddresses;
import com.meltwater.rxrabbit.ChannelFactory;
import com.meltwater.rxrabbit.ConnectionSettings;
import com.meltwater.rxrabbit.DefaultPublisherFactory;
import com.meltwater.rxrabbit.DeliveryMode;
import com.meltwater.rxrabbit.Exchange;
import com.meltwater.rxrabbit.Payload;
import com.meltwater.rxrabbit.PublisherFactory;
import com.meltwater.rxrabbit.PublisherSettings;
import com.meltwater.rxrabbit.RabbitPublisher;
import com.meltwater.rxrabbit.RoutingKey;
import com.meltwater.rxrabbit.impl.DefaultChannelFactory;
import com.meltwater.rxrabbit.util.FibonacciBackoffAlgorithm;
import com.meltwater.rxrabbit.util.Logger;
import com.rabbitmq.client.AMQP;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import static rx.Observable.from;
//TODO add publish listener!!
public class LoadGenerator {
private static final Logger log = new Logger(LoadGenerator.class);
public static void main(String[] args) throws IOException {
Properties prop = new Properties();
prop.load(LoadGenerator.class.getResourceAsStream("/load_generator.properties"));
prop.putAll(System.getProperties());
publishTestMessages(
new BrokerAddresses(prop.getProperty("rabbit.broker.uris")),
prop.getProperty("publish.to.exchange"),
Long.parseLong(prop.getProperty("publish.message.count")));
}
public static void publishTestMessages(BrokerAddresses addresses, String outputExchange, long nrToPublish) throws IOException {
ConnectionSettings connectionSettings = new ConnectionSettings();
connectionSettings.withHeartbeatSecs(5);
connectionSettings.withShutdownTimeoutMillis(10_000);
PublisherSettings publisherSettings = new PublisherSettings();
publisherSettings.withPublisherConfirms(true);
publisherSettings.withBackoffAlgorithm(new FibonacciBackoffAlgorithm());
publisherSettings.withRetryCount(10);
ChannelFactory channelFactory = new DefaultChannelFactory(addresses, connectionSettings);
PublisherFactory publisherFactory = new DefaultPublisherFactory(channelFactory, publisherSettings);
final RabbitPublisher publish = publisherFactory.createPublisher();
List<Long> ids = new ArrayList<>();
for (long i = 1; i<=nrToPublish; i++){
ids.add(i);
}
log.infoWithParams("Publishing messages to exchange.",
"numToPublish", nrToPublish,
"exchange", outputExchange);
from(ids)
.flatMap( id -> {
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties().builder();
builder.messageId(String.valueOf(id));
builder.deliveryMode(DeliveryMode.persistent.code);
builder.appId("load-generator");
String msgPayload = "Message nr " + id;
return publish.call(
new Exchange(outputExchange),
new RoutingKey("routing.key"),
builder.build(),
new Payload(msgPayload.getBytes(Charsets.UTF_8)))
.toObservable();
})
.doOnError(throwable -> log.errorWithParams("Unexpected error when publishing.", throwable))
.timeout(30, TimeUnit.SECONDS)
.toBlocking()
.last();
log.infoWithParams("All messages sent to exchange.",
"numSent", nrToPublish,
"exchange", outputExchange);
publish.close();
}
}