Skip to content

Commit

Permalink
Added PreferredStorage Action
Browse files Browse the repository at this point in the history
  • Loading branch information
DaNussi committed Mar 27, 2024
1 parent ac48aa1 commit 59b037c
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 6 deletions.
Empty file added rabbitmq/conf/rabbitmq.conf
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ private boolean initRabbitmq() {
return true;
}


public String getChannel() {
return channel;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import net.nussi.dedicated_applied_energistics.providers.virtualdisk.actions.insert.InsertPair;
import net.nussi.dedicated_applied_energistics.providers.virtualdisk.actions.insert.InsertRequest;
import net.nussi.dedicated_applied_energistics.providers.virtualdisk.actions.insert.InsertResponse;
import net.nussi.dedicated_applied_energistics.providers.virtualdisk.actions.preferredstorage.PreferredStoragePair;
import net.nussi.dedicated_applied_energistics.providers.virtualdisk.actions.preferredstorage.PreferredStorageRequest;
import net.nussi.dedicated_applied_energistics.providers.virtualdisk.actions.preferredstorage.PreferredStorageResponse;
import org.slf4j.Logger;
import redis.clients.jedis.Jedis;

Expand All @@ -42,6 +45,7 @@ public class VirtualDiskHost {
private final Queue<ExtractPair> extractQueue = new LinkedList<>();
private final Queue<AvailableStacksPair> availableStacksQueue = new LinkedList<>();
private final Queue<DescriptionPair> descriptionQueue = new LinkedList<>();
private final Queue<PreferredStoragePair> preferredStorageQueue = new LinkedList<>();

public VirtualDiskHost(MEStorage storage, int priority, InterDimensionalInterfaceBlockEntity instance) {
this.storage = storage;
Expand Down Expand Up @@ -157,8 +161,28 @@ private void initRabbitMQ() {
rabbitmq.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.toBytes());
rabbitmq.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
});
rabbitmq.basicConsume(channel + "/getDescription", false, descriptionCallback, (consumerTag -> {
}));
rabbitmq.basicConsume(channel + "/getDescription", false, descriptionCallback, (consumerTag -> {}));


DeliverCallback preferredStorageCallback = ((consumerTag, delivery) -> {
var response = new PreferredStorageResponse(delivery.getProperties().getCorrelationId(), false, false);
try {
var request = new PreferredStorageRequest(delivery.getBody());
// LOGGER.info("Incomming request " + request);
var pair = new PreferredStoragePair(request);
var callback = pair.getResponseFuture();
preferredStorageQueue.add(pair);
response = callback.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
// LOGGER.info("Outgoin response " + response);

AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(delivery.getProperties().getCorrelationId()).build();
rabbitmq.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.toBytes());
rabbitmq.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
});
rabbitmq.basicConsume(channel + "/isPreferredStorageFor", false, preferredStorageCallback, (consumerTag -> {}));

} catch (Exception e) {
LOGGER.warn("Failed to declare rabbit mq queue for " + channel);
Expand Down Expand Up @@ -189,7 +213,7 @@ public void onTick() {
while (extractQueue.size() > 0) {
ExtractPair pair = extractQueue.poll();
ExtractRequest request = pair.getRequest();
long data = this.storage.insert(request.what, request.amount, request.mode, IActionSource.ofMachine(instance));
long data = this.storage.extract(request.what, request.amount, request.mode, IActionSource.ofMachine(instance));
ExtractResponse response = new ExtractResponse(request.getId(), true, data);
pair.getResponseFuture().complete(response);
}
Expand All @@ -210,6 +234,13 @@ public void onTick() {
pair.getResponseFuture().complete(response);
}

while (preferredStorageQueue.size() > 0) {
PreferredStoragePair pair = preferredStorageQueue.poll();
PreferredStorageRequest request = pair.getRequest();
var data = this.storage.isPreferredStorageFor(request.getWhat(), IActionSource.ofMachine(instance));
PreferredStorageResponse response = new PreferredStorageResponse(request.getId(), true, data);
pair.getResponseFuture().complete(response);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ public PreferredStorageRequest(AEKey data) {
this.data = data;
}

public PreferredStorageRequest(byte[] bytes, AEKey data) throws Exception {
public PreferredStorageRequest(byte[] bytes) throws Exception {
super(bytes);
this.data = data;
}

@Override
Expand Down

0 comments on commit 59b037c

Please sign in to comment.