Add support for TracerDump in TracerFlare #8053

Open · wants to merge 23 commits into master
2 changes: 1 addition & 1 deletion dd-trace-core/build.gradle
@@ -20,7 +20,7 @@ excludedClassesCoverage += [
'datadog.trace.common.writer.RemoteMapper.NoopRemoteMapper',
'datadog.trace.core.monitor.DDAgentStatsDConnection',
'datadog.trace.core.monitor.LoggingStatsDClient',
'datadog.trace.core.PendingTraceBuffer.DelayingPendingTraceBuffer.FlushElement',
'datadog.trace.core.PendingTraceBuffer.DelayingPendingTraceBuffer.CommandElement',
'datadog.trace.core.StatusLogger',
// covered with CI Visibility smoke tests
'datadog.trace.core.StreamingTraceCollector',
30 changes: 30 additions & 0 deletions datadog/trace/common/writer/TraceDumpJsonExporter.java (new file)
@@ -0,0 +1,30 @@
package datadog.trace.common.writer;

import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
import com.squareup.moshi.Types;
import datadog.trace.core.DDSpan;
import java.util.List;

public class TraceDumpJsonExporter {

private StringBuilder dumpText;
Contributor

🟠 Code Quality Violation

StringBuilder can lead to memory leaks in long-lasting classes.

StringBuffers and StringBuilders can grow significantly, which may lead to memory leaks if they are retained within objects with extended lifetimes.
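A minimal sketch of one possible alternative, not part of this PR: stream each trace to a caller-supplied Writer so the exporter never retains the accumulated dump text. The class name StreamingTraceDumpExporter and its Writer-based constructor are illustrative assumptions; only the Moshi adapter setup and DDSpanJsonAdapter.buildFactory(false) are taken from the diff below.

package datadog.trace.common.writer;

import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
import com.squareup.moshi.Types;
import datadog.trace.core.DDSpan;
import java.io.IOException;
import java.io.Writer;
import java.util.List;

// Hypothetical variant of TraceDumpJsonExporter that writes through instead of buffering.
public class StreamingTraceDumpExporter {

  private static final JsonAdapter<List<DDSpan>> TRACE_ADAPTER =
      new Moshi.Builder()
          .add(DDSpanJsonAdapter.buildFactory(false))
          .build()
          .adapter(Types.newParameterizedType(List.class, DDSpan.class));

  private final Writer out;

  public StreamingTraceDumpExporter(Writer out) {
    this.out = out;
  }

  public void addTrace(final List<DDSpan> trace) throws IOException {
    // Serialize and forward immediately; nothing accumulates in this object.
    out.write(TRACE_ADAPTER.toJson(trace));
    out.write('\n');
  }
}

Whether this would fit the flare API used later in this PR (TracerFlare.addText takes a complete String) is a separate question; the sketch only illustrates avoiding the long-lived StringBuilder.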

private static final JsonAdapter<List<DDSpan>> TRACE_ADAPTER =
new Moshi.Builder()
.add(DDSpanJsonAdapter.buildFactory(false))
.build()
.adapter(Types.newParameterizedType(List.class, DDSpan.class));

public TraceDumpJsonExporter() {
dumpText = new StringBuilder();
}

public void addTrace(final List<DDSpan> trace) {
dumpText.append(TRACE_ADAPTER.toJson(trace));
dumpText.append("\n");
}

public String getDumpText() {
return dumpText.toString();
}
}
@@ -448,4 +448,8 @@ public static long getDurationNano(CoreSpan<?> span) {
PendingTrace trace = (PendingTrace) traceCollector;
return trace.getLastWriteTime() - span.getStartTime();
}

Iterable<DDSpan> getSpans() {
return spans;
}
}
@@ -3,13 +3,23 @@
import static datadog.trace.util.AgentThreadFactory.AgentThread.TRACE_MONITOR;
import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS;
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
import static java.util.Comparator.comparingLong;

import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
import datadog.trace.api.flare.TracerFlare;
import datadog.trace.api.time.TimeSource;
import datadog.trace.common.writer.TraceDumpJsonExporter;
import datadog.trace.core.monitor.HealthMetrics;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.zip.ZipOutputStream;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
import org.slf4j.Logger;
@@ -47,13 +57,16 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer {
private static final long FORCE_SEND_DELAY_MS = TimeUnit.SECONDS.toMillis(5);
private static final long SEND_DELAY_NS = TimeUnit.MILLISECONDS.toNanos(500);
private static final long SLEEP_TIME_MS = 100;
private static final CommandElement FLUSH_ELEMENT = new CommandElement();
private static final CommandElement DUMP_ELEMENT = new CommandElement();

private final MpscBlockingConsumerArrayQueue<Element> queue;
private final Thread worker;
private final TimeSource timeSource;

private volatile boolean closed = false;
private final AtomicInteger flushCounter = new AtomicInteger(0);
private final AtomicInteger dumpCounter = new AtomicInteger(0);

private final LongRunningTracesTracker runningTracesTracker;

@@ -78,6 +91,7 @@ public void enqueue(Element pendingTrace) {

@Override
public void start() {
TracerFlare.addReporter(new TracerDump(this));
worker.start();
}

@@ -108,10 +122,10 @@ public void flush() {
if (worker.isAlive()) {
int count = flushCounter.get();
int loop = 1;
boolean signaled = queue.offer(FlushElement.FLUSH_ELEMENT);
boolean signaled = queue.offer(FLUSH_ELEMENT);
while (!closed && !signaled) {
yieldOrSleep(loop++);
signaled = queue.offer(FlushElement.FLUSH_ELEMENT);
signaled = queue.offer(FLUSH_ELEMENT);
}
int newCount = flushCounter.get();
while (!closed && count >= newCount) {
@@ -130,9 +144,28 @@ public void accept(Element pendingTrace) {
}
}

private static final class FlushElement implements Element {
static FlushElement FLUSH_ELEMENT = new FlushElement();
private static final class DumpDrain
implements MessagePassingQueue.Consumer<Element>, MessagePassingQueue.Supplier<Element> {
private static final DumpDrain DUMP_DRAIN = new DumpDrain();
private static final List<Element> DATA = new ArrayList<>();
private int index = 0;

@Override
public void accept(Element pendingTrace) {
DATA.add(pendingTrace);
}

@Override
public Element get() {
if (index < DATA.size()) {
return DATA.get(index++);
}
return null; // Should never be reached; returning null from the Supplier would break the
// queue according to the MessagePassingQueue docs
}
}

private static final class CommandElement implements Element {
@Override
public long oldestFinishedTime() {
return 0;
@@ -180,13 +213,20 @@ public void run() {
pendingTrace = queue.take(); // block until available;
}

if (pendingTrace instanceof FlushElement) {
if (pendingTrace == FLUSH_ELEMENT) {
// Since this is an MPSC queue, the drain needs to be called on the consumer thread
queue.drain(WriteDrain.WRITE_DRAIN);
flushCounter.incrementAndGet();
continue;
}

if (pendingTrace == DUMP_ELEMENT) {
queue.drain(DumpDrain.DUMP_DRAIN, 50);
queue.fill(DumpDrain.DUMP_DRAIN, DumpDrain.DATA.size());
dumpCounter.incrementAndGet();
continue;
}

// The element is no longer in the queue
pendingTrace.setEnqueued(false);

Expand All @@ -208,7 +248,7 @@ public void run() {
// Trace has been unmodified long enough, go ahead and write whatever is finished.
pendingTrace.write();
} else {
// Trace is too new. Requeue it and sleep to avoid a hot loop.
// Trace is too new. Requeue it and sleep to avoid a hot loop.
enqueue(pendingTrace);
Thread.sleep(SLEEP_TIME_MS);
}
@@ -277,4 +317,52 @@ public static PendingTraceBuffer discarding() {
public abstract void flush();

public abstract void enqueue(Element pendingTrace);

private static class TracerDump implements TracerFlare.Reporter {
private static final Comparator<Element> TRACE_BY_START_TIME =
comparingLong(trace -> trace.getRootSpan().getStartTime());
private static final Predicate<Element> NOT_PENDING_TRACE =
element -> !(element instanceof PendingTrace);
private final DelayingPendingTraceBuffer buffer;

private TracerDump(DelayingPendingTraceBuffer buffer) {
this.buffer = buffer;
}

@Override
public void prepareForFlare() {
if (buffer.worker.isAlive()) {
int count = buffer.dumpCounter.get();
int loop = 1;
boolean signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT);
while (!buffer.closed && !signaled) {
buffer.yieldOrSleep(loop++);
signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT);
}
int newCount = buffer.dumpCounter.get();
while (!buffer.closed && count >= newCount) {
buffer.yieldOrSleep(loop++);
newCount = buffer.dumpCounter.get();
}
}
}

@Override
public void addReportToFlare(ZipOutputStream zip) throws IOException {
DelayingPendingTraceBuffer.DumpDrain.DATA.removeIf(NOT_PENDING_TRACE);
// Sort the dump with the most recently started traces first
DelayingPendingTraceBuffer.DumpDrain.DATA.sort((TRACE_BY_START_TIME).reversed());

TraceDumpJsonExporter writer = new TraceDumpJsonExporter();
for (Element e : DelayingPendingTraceBuffer.DumpDrain.DATA) {
if (e instanceof PendingTrace) {
PendingTrace trace = (PendingTrace) e;
writer.addTrace(new ArrayList<>((ConcurrentLinkedDeque<DDSpan>) trace.getSpans()));
}
}
// Releasing memory used for ArrayList in drain
DelayingPendingTraceBuffer.DumpDrain.DATA.clear();
TracerFlare.addText(zip, "trace_dump.txt", writer.getDumpText());
}
}
}
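As a side note on the DumpDrain used above: the dump path drains a bounded number of elements into a list on the consumer thread and then fills the very same elements back, so the flare gets a snapshot while the queue keeps its traces. A minimal self-contained sketch of that JCTools drain/fill idiom, assuming JCTools is on the classpath and with the element type simplified to String (class and variable names here are illustrative, not from the PR):

import java.util.ArrayList;
import java.util.List;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscBlockingConsumerArrayQueue;

class SnapshotDrainExample {
  // Acts as both the drain target (Consumer) and the refill source (Supplier).
  static final class Drain
      implements MessagePassingQueue.Consumer<String>, MessagePassingQueue.Supplier<String> {
    final List<String> data = new ArrayList<>();
    private int index = 0;

    @Override
    public void accept(String element) {
      data.add(element); // drain: copy each element out of the queue
    }

    @Override
    public String get() {
      return data.get(index++); // fill: hand the same elements back, in order (never null)
    }
  }

  public static void main(String[] args) {
    MpscBlockingConsumerArrayQueue<String> queue = new MpscBlockingConsumerArrayQueue<>(16);
    queue.offer("trace-1");
    queue.offer("trace-2");

    Drain drain = new Drain();
    queue.drain(drain, 50);               // bounded snapshot, consumer thread only
    queue.fill(drain, drain.data.size()); // put the snapshot straight back

    System.out.println(drain.data);       // [trace-1, trace-2]; the queue still holds both
  }
}

This is also why the Supplier side must never return null while fill is running, as the inline comment in DumpDrain notes.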
@@ -168,7 +168,7 @@ class LongRunningTracesTrackerTest extends DDSpecification {

PendingTrace newTraceToTrack() {
PendingTrace trace = factory.create(DDTraceId.ONE)
PendingTraceBufferTest::newSpanOf(trace, PrioritySampling.SAMPLER_KEEP)
PendingTraceBufferTest::newSpanOf(trace, PrioritySampling.SAMPLER_KEEP, 0)
return trace
}

@@ -5,7 +5,7 @@ import datadog.communication.monitor.Monitoring
import datadog.trace.SamplingPriorityMetadataChecker
import datadog.trace.api.DDSpanId
import datadog.trace.api.DDTraceId
import datadog.trace.api.sampling.PrioritySampling
import datadog.trace.api.flare.TracerFlare
import datadog.trace.api.time.SystemTimeSource
import datadog.trace.bootstrap.instrumentation.api.AgentTracer.NoopPathwayContext
import datadog.trace.bootstrap.instrumentation.api.ScopeSource
@@ -20,8 +20,13 @@ import spock.util.concurrent.PollingConditions
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.zip.ZipInputStream
import java.util.zip.ZipOutputStream

import static datadog.trace.api.sampling.PrioritySampling.UNSET
import static datadog.trace.api.sampling.PrioritySampling.USER_KEEP
import static datadog.trace.core.PendingTraceBuffer.BUFFER_SIZE
import static java.nio.charset.StandardCharsets.UTF_8

@Timeout(5)
class PendingTraceBufferTest extends DDSpecification {
@@ -143,7 +148,7 @@ class PendingTraceBufferTest extends DDSpecification {

def "priority sampling is always sent"() {
setup:
def parent = addContinuation(newSpanOf(factory.create(DDTraceId.ONE), PrioritySampling.USER_KEEP))
def parent = addContinuation(newSpanOf(factory.create(DDTraceId.ONE), USER_KEEP, 0))
def metadataChecker = new SamplingPriorityMetadataChecker()

when: "Fill the buffer - Only children - Priority taken from root"
@@ -443,6 +448,36 @@
}
}

def "testing tracer flare dump"() {
setup:
TracerFlare.addReporter {} // exercises default methods
def dumpReporter = Mock(PendingTraceBuffer.TracerDump)
TracerFlare.addReporter(dumpReporter)
def trace = factory.create(DDTraceId.ONE)
def parent = newSpanOf(trace, UNSET, System.currentTimeMillis() * 1000)
def child = newSpanOf(parent)

when:
parent.finish()
buffer.start()
def entries = buildAndExtractZip()

then:
1 * dumpReporter.prepareForFlare()
1 * dumpReporter.addReportToFlare(_)
1 * dumpReporter.cleanupAfterFlare()
entries.size() == 1
(entries["trace_dump.txt"] as String).startsWith('[{"service":"fakeService","name":"fakeOperation","resource":"fakeResource","trace_id":1,"span_id":1,"parent_id":0') // Rest of dump is timestamp specific

then:
child.finish()

then:
trace.size() == 0
trace.pendingReferenceCount == 0
}


def addContinuation(DDSpan span) {
def scope = scopeManager.activate(span, ScopeSource.INSTRUMENTATION, true)
continuations << scope.capture()
@@ -451,10 +486,10 @@
}

static DDSpan newSpanOf(PendingTrace trace) {
return newSpanOf(trace, PrioritySampling.UNSET)
return newSpanOf(trace, UNSET, 0)
}

static DDSpan newSpanOf(PendingTrace trace, int samplingPriority) {
static DDSpan newSpanOf(PendingTrace trace, int samplingPriority, long timestampMicro) {
def context = new DDSpanContext(
trace.traceId,
1,
Expand All @@ -475,7 +510,7 @@ class PendingTraceBufferTest extends DDSpecification {
NoopPathwayContext.INSTANCE,
false,
PropagationTags.factory().empty())
return DDSpan.create("test", 0, context, null)
return DDSpan.create("test", timestampMicro, context, null)
}

static DDSpan newSpanOf(DDSpan parent) {
Expand All @@ -488,7 +523,7 @@ class PendingTraceBufferTest extends DDSpecification {
"fakeService",
"fakeOperation",
"fakeResource",
PrioritySampling.UNSET,
UNSET,
null,
Collections.emptyMap(),
false,
Expand All @@ -502,4 +537,27 @@ class PendingTraceBufferTest extends DDSpecification {
PropagationTags.factory().empty())
return DDSpan.create("test", 0, context, null)
}

def buildAndExtractZip() {
TracerFlare.prepareForFlare()
def out = new ByteArrayOutputStream()
try (ZipOutputStream zip = new ZipOutputStream(out)) {
TracerFlare.addReportsToFlare(zip)
} finally {
TracerFlare.cleanupAfterFlare()
}

def entries = [:]

def zip = new ZipInputStream(new ByteArrayInputStream(out.toByteArray()))
def entry
while (entry = zip.nextEntry) {
def bytes = new ByteArrayOutputStream()
bytes << zip
entries.put(entry.name, entry.name.endsWith(".bin")
? bytes.toByteArray() : new String(bytes.toByteArray(), UTF_8))
}

return entries
}
}