Skip to content

Commit

Permalink
Merge pull request #2107 from solliancenet/cj-telemetry-spans
Browse files Browse the repository at this point in the history
Improve the use of OpenTelemetry spans
  • Loading branch information
codingbandit authored Dec 24, 2024
2 parents 2add5c6 + 045644d commit 99e778f
Show file tree
Hide file tree
Showing 9 changed files with 16 additions and 14 deletions.
4 changes: 2 additions & 2 deletions src/dotnet/CoreAPI/Controllers/CompletionsController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public async Task<IActionResult> GetCompletion(string instanceId, [FromBody] Com
{
using var telemetryActivity = TelemetryActivitySources.CoreAPIActivitySource.StartActivity(
TelemetryActivityNames.CoreAPI_Completions_GetCompletion,
ActivityKind.Consumer,
ActivityKind.Server,
parentContext: default,
tags: new Dictionary<string, object?>
{
Expand All @@ -102,7 +102,7 @@ public async Task<ActionResult<LongRunningOperation>> StartCompletionOperation(s
{
using var telemetryActivity = TelemetryActivitySources.CoreAPIActivitySource.StartActivity(
TelemetryActivityNames.CoreAPI_AsyncCompletions_StartCompletionOperation,
ActivityKind.Consumer,
ActivityKind.Server,
parentContext: default,
tags: new Dictionary<string, object?>
{
Expand Down
2 changes: 1 addition & 1 deletion src/python/AgentHubAPI/app/routers/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async def refresh_cache(name: str):
The name of the cache object to refresh.
"config", for example.
"""
with tracer.start_span('refresh_cache') as span:
with tracer.start_as_current_span('refresh_cache') as span:
span.set_attribute('cache_name', name)
span.add_event(f'{API_NAME} {name} cache refresh requested.')

Expand Down
2 changes: 1 addition & 1 deletion src/python/DataSourceHubAPI/app/routers/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async def refresh_cache(name: str):
The name of the cache object to refresh.
"config", for example.
"""
with tracer.start_span('refresh_cache') as span:
with tracer.start_as_current_span('refresh_cache') as span:
span.set_attribute('cache_name', name)
span.add_event(f'{API_NAME} {name} cache refresh requested.')

Expand Down
2 changes: 1 addition & 1 deletion src/python/GatekeeperIntegrationAPI/app/routers/analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async def analyze(request: AnalyzeRequest) -> AnalyzeResponse:
If the request includes anonymize=True, the original content
will be returned anonymized.
"""
#with tracer.start_span('analyze') as span:
#with tracer.start_as_current_span('analyze') as span:
try:
analyzer = Analyzer(request)
return analyzer.analyze()
Expand Down
2 changes: 1 addition & 1 deletion src/python/GatekeeperIntegrationAPI/app/routers/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async def refresh_cache(name: str):
The name of the cache object to refresh.
"config", for example.
"""
#with tracer.start_span('refresh_cache') as span:
#with tracer.start_as_current_span('refresh_cache') as span:
# span.set_attribute('cache_name', name)
# span.add_event(f'{API_NAME} {name} cache refresh requested.')

Expand Down
10 changes: 5 additions & 5 deletions src/python/LangChainAPI/app/routers/completions.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async def submit_completion_request(
CompletionOperation
Object containing the operation ID and status.
"""
with tracer.start_span('langchainapi_submit_completion_request', kind=SpanKind.CONSUMER) as span:
with tracer.start_as_current_span('langchainapi_submit_completion_request', kind=SpanKind.SERVER) as span:
try:
# Get the operation_id from the completion request.
operation_id = completion_request.operation_id
Expand Down Expand Up @@ -129,7 +129,7 @@ async def create_completion_response(
"""
Generates the completion response for the specified completion request.
"""
with tracer.start_span(f'langchainapi_create_completion_response', kind=SpanKind.CONSUMER) as span:
with tracer.start_as_current_span('langchainapi_create_completion_response', kind=SpanKind.SERVER) as span:
try:
span.set_attribute('operation_id', operation_id)
span.set_attribute('instance_id', instance_id)
Expand Down Expand Up @@ -212,7 +212,7 @@ async def get_operation_status(
instance_id: str,
operation_id: str
) -> LongRunningOperation:
with tracer.start_span(f'langchainapi_get_operation_status', kind=SpanKind.CONSUMER) as span:
with tracer.start_as_current_span('langchainapi_get_operation_status', kind=SpanKind.SERVER) as span:
# Create an operations manager to get the operation status.
operations_manager = OperationsManager(raw_request.app.extra['config'])

Expand Down Expand Up @@ -247,7 +247,7 @@ async def get_operation_result(
instance_id: str,
operation_id: str
) -> CompletionResponse:
with tracer.start_span(f'langchainapi_get_operation_result', kind=SpanKind.CONSUMER) as span:
with tracer.start_as_current_span('langchainapi_get_operation_result', kind=SpanKind.SERVER) as span:
# Create an operations manager to get the operation result.
operations_manager = OperationsManager(raw_request.app.extra['config'])

Expand Down Expand Up @@ -280,7 +280,7 @@ async def get_operation_logs(
instance_id: str,
operation_id: str
) -> List[LongRunningOperationLogEntry]:
with tracer.start_span(f'langchainapi_get_operation_log', kind=SpanKind.CONSUMER) as span:
with tracer.start_as_current_span('langchainapi_get_operation_log', kind=SpanKind.SERVER) as span:
# Create an operations manager to get the operation log.
operations_manager = OperationsManager(raw_request.app.extra['config'])

Expand Down
4 changes: 3 additions & 1 deletion src/python/LangChainAPI/app/routers/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
"""
import time
from fastapi import APIRouter, Depends, HTTPException
from opentelemetry.trace import SpanKind

from foundationallm.telemetry import Telemetry
from app.dependencies import (
API_NAME,
Expand Down Expand Up @@ -33,7 +35,7 @@ async def refresh_cache(instance_id: str, name: str):
The name of the cache object to refresh.
"config", for example.
"""
with tracer.start_span('refresh_cache') as span:
with tracer.start_as_current_span('refresh_cache', kind=SpanKind.SERVER) as span:
span.set_attribute('instance_id', instance_id)
span.set_attribute('cache_name', name)
span.add_event(f'{API_NAME} {name} cache refresh requested.')
Expand Down
2 changes: 1 addition & 1 deletion src/python/PromptHubAPI/app/routers/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async def refresh_cache(name: str):
The name of the cache object to refresh.
"config", for example.
"""
with tracer.start_span('refresh_cache') as span:
with tracer.start_as_current_span('refresh_cache') as span:
span.set_attribute('cache_name', name)
span.add_event(f'{API_NAME} {name} cache refresh requested.')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ async def invoke_async(self, request: KnowledgeManagementCompletionRequest) -> C
else:
messages = []

with self.tracer.start_span(f'langchain_invoke_external_workflow', kind=SpanKind.CONSUMER) as span:
with self.tracer.start_as_current_span('langchain_invoke_external_workflow', kind=SpanKind.SERVER) as span:
response = await workflow.invoke_async(
operation_id=request.operation_id,
user_prompt=parsed_user_prompt,
Expand Down

0 comments on commit 99e778f

Please sign in to comment.