From 5119ac78d5fe4bc3a02a16c4c4906af09984e94c Mon Sep 17 00:00:00 2001 From: Bruce Bujon Date: Tue, 21 Jan 2025 08:51:19 +0100 Subject: [PATCH] feat(core): Refactor action elements --- .../trace/core/PendingTraceBuffer.java | 50 ++++--------------- 1 file changed, 9 insertions(+), 41 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java index 219352bf6df..00c76cb08f0 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java @@ -55,6 +55,8 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer { private static final long FORCE_SEND_DELAY_MS = TimeUnit.SECONDS.toMillis(5); private static final long SEND_DELAY_NS = TimeUnit.MILLISECONDS.toNanos(500); private static final long SLEEP_TIME_MS = 100; + private static final CommandElement FLUSH_ELEMENT = new CommandElement(); + private static final CommandElement DUMP_ELEMENT = new CommandElement(); private final MpscBlockingConsumerArrayQueue queue; private final Thread worker; @@ -118,10 +120,10 @@ public void flush() { if (worker.isAlive()) { int count = flushCounter.get(); int loop = 1; - boolean signaled = queue.offer(FlushElement.FLUSH_ELEMENT); + boolean signaled = queue.offer(FLUSH_ELEMENT); while (!closed && !signaled) { yieldOrSleep(loop++); - signaled = queue.offer(FlushElement.FLUSH_ELEMENT); + signaled = queue.offer(FLUSH_ELEMENT); } int newCount = flushCounter.get(); while (!closed && count >= newCount) { @@ -161,41 +163,7 @@ public Element get() { } } - private static final class FlushElement implements Element { - static FlushElement FLUSH_ELEMENT = new FlushElement(); - - @Override - public long oldestFinishedTime() { - return 0; - } - - @Override - public boolean lastReferencedNanosAgo(long nanos) { - return false; - } - - @Override - public void write() {} - - @Override - public DDSpan getRootSpan() { - return null; - } - - @Override - public boolean setEnqueued(boolean enqueued) { - return true; - } - - @Override - public boolean writeOnBufferFull() { - return true; - } - } - - private static final class DumpElement implements Element { - static DumpElement DUMP_ELEMENT = new DumpElement(); - + private static final class CommandElement implements Element { @Override public long oldestFinishedTime() { return 0; @@ -243,14 +211,14 @@ public void run() { pendingTrace = queue.take(); // block until available; } - if (pendingTrace instanceof FlushElement) { + if (pendingTrace == FLUSH_ELEMENT) { // Since this is an MPSC queue, the drain needs to be called on the consumer thread queue.drain(WriteDrain.WRITE_DRAIN); flushCounter.incrementAndGet(); continue; } - if (pendingTrace instanceof DumpElement) { + if (pendingTrace == DUMP_ELEMENT) { queue.drain(DumpDrain.DUMP_DRAIN); queue.fill(DumpDrain.DUMP_DRAIN, DumpDrain.DATA.size()); dumpCounter.incrementAndGet(); @@ -364,10 +332,10 @@ public void prepareForFlare() { if (buffer.worker.isAlive()) { int count = buffer.dumpCounter.get(); int loop = 1; - boolean signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DumpElement.DUMP_ELEMENT); + boolean signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT); while (!buffer.closed && !signaled) { buffer.yieldOrSleep(loop++); - signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DumpElement.DUMP_ELEMENT); + signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT); } int newCount = buffer.dumpCounter.get(); while (!buffer.closed && count >= newCount) {