Skip to content

Commit

Permalink
feat(core): Refactor action elements
Browse files Browse the repository at this point in the history
  • Loading branch information
PerfectSlayer committed Jan 21, 2025
1 parent 0ff9b1e commit 5119ac7
Showing 1 changed file with 9 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer {
private static final long FORCE_SEND_DELAY_MS = TimeUnit.SECONDS.toMillis(5);
private static final long SEND_DELAY_NS = TimeUnit.MILLISECONDS.toNanos(500);
private static final long SLEEP_TIME_MS = 100;
private static final CommandElement FLUSH_ELEMENT = new CommandElement();
private static final CommandElement DUMP_ELEMENT = new CommandElement();

private final MpscBlockingConsumerArrayQueue<Element> queue;
private final Thread worker;
Expand Down Expand Up @@ -118,10 +120,10 @@ public void flush() {
if (worker.isAlive()) {
int count = flushCounter.get();
int loop = 1;
boolean signaled = queue.offer(FlushElement.FLUSH_ELEMENT);
boolean signaled = queue.offer(FLUSH_ELEMENT);
while (!closed && !signaled) {
yieldOrSleep(loop++);
signaled = queue.offer(FlushElement.FLUSH_ELEMENT);
signaled = queue.offer(FLUSH_ELEMENT);
}
int newCount = flushCounter.get();
while (!closed && count >= newCount) {
Expand Down Expand Up @@ -161,41 +163,7 @@ public Element get() {
}
}

private static final class FlushElement implements Element {
static FlushElement FLUSH_ELEMENT = new FlushElement();

@Override
public long oldestFinishedTime() {
return 0;
}

@Override
public boolean lastReferencedNanosAgo(long nanos) {
return false;
}

@Override
public void write() {}

@Override
public DDSpan getRootSpan() {
return null;
}

@Override
public boolean setEnqueued(boolean enqueued) {
return true;
}

@Override
public boolean writeOnBufferFull() {
return true;
}
}

private static final class DumpElement implements Element {
static DumpElement DUMP_ELEMENT = new DumpElement();

private static final class CommandElement implements Element {
@Override
public long oldestFinishedTime() {
return 0;
Expand Down Expand Up @@ -243,14 +211,14 @@ public void run() {
pendingTrace = queue.take(); // block until available;
}

if (pendingTrace instanceof FlushElement) {
if (pendingTrace == FLUSH_ELEMENT) {
// Since this is an MPSC queue, the drain needs to be called on the consumer thread
queue.drain(WriteDrain.WRITE_DRAIN);
flushCounter.incrementAndGet();
continue;
}

if (pendingTrace instanceof DumpElement) {
if (pendingTrace == DUMP_ELEMENT) {
queue.drain(DumpDrain.DUMP_DRAIN);
queue.fill(DumpDrain.DUMP_DRAIN, DumpDrain.DATA.size());
dumpCounter.incrementAndGet();
Expand Down Expand Up @@ -364,10 +332,10 @@ public void prepareForFlare() {
if (buffer.worker.isAlive()) {
int count = buffer.dumpCounter.get();
int loop = 1;
boolean signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DumpElement.DUMP_ELEMENT);
boolean signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT);
while (!buffer.closed && !signaled) {
buffer.yieldOrSleep(loop++);
signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DumpElement.DUMP_ELEMENT);
signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT);
}
int newCount = buffer.dumpCounter.get();
while (!buffer.closed && count >= newCount) {
Expand Down

0 comments on commit 5119ac7

Please sign in to comment.