diff --git a/apollo-federation/src/sources/connect/models.rs b/apollo-federation/src/sources/connect/models.rs index 9c99f2a443..a6161ecd6e 100644 --- a/apollo-federation/src/sources/connect/models.rs +++ b/apollo-federation/src/sources/connect/models.rs @@ -4,6 +4,7 @@ use std::collections::HashMap; use std::error::Error; use std::fmt::Display; use std::fmt::Formatter; +use std::str::FromStr; use std::sync::Arc; use apollo_compiler::ast; @@ -344,6 +345,21 @@ impl HTTPMethod { } } +impl FromStr for HTTPMethod { + type Err = String; + + fn from_str(s: &str) -> Result { + match s.to_uppercase().as_str() { + "GET" => Ok(HTTPMethod::Get), + "POST" => Ok(HTTPMethod::Post), + "PATCH" => Ok(HTTPMethod::Patch), + "PUT" => Ok(HTTPMethod::Put), + "DELETE" => Ok(HTTPMethod::Delete), + _ => Err(format!("Invalid HTTP method: {s}")), + } + } +} + #[derive(Clone, Debug)] pub enum HeaderSource { From(HeaderName), diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index 9abd521663..ac302d97c3 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -1544,6 +1544,8 @@ expression: "&schema" "type": "boolean" }, "default": { + "connect": true, + "connect_request": true, "execution": true, "http_request": true, "parse_query": true, @@ -1554,7 +1556,7 @@ expression: "&schema" "subgraph_request": true, "supergraph": true }, - "description": "Which spans will be eligible for span stats to be collected for viewing in the APM view. Defaults to true for `request`, `router`, `query_parsing`, `supergraph`, `execution`, `query_planning`, `subgraph`, `subgraph_request` and `http_request`.", + "description": "Which spans will be eligible for span stats to be collected for viewing in the APM view. Defaults to true for `request`, `router`, `query_parsing`, `supergraph`, `execution`, `query_planning`, `subgraph`, `subgraph_request`, `connect`, `connect_request` and `http_request`.", "type": "object" } }, @@ -1997,6 +1999,32 @@ expression: "&schema" "error" ], "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "connector_request_mapping_problems": { + "$ref": "#/definitions/MappingProblems", + "description": "#/definitions/MappingProblems" + } + }, + "required": [ + "connector_request_mapping_problems" + ], + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "connector_response_mapping_problems": { + "$ref": "#/definitions/MappingProblems", + "description": "#/definitions/MappingProblems" + } + }, + "required": [ + "connector_response_mapping_problems" + ], + "type": "object" } ] }, @@ -3975,6 +4003,13 @@ expression: "&schema" }, "type": "object" }, + "MappingProblems": { + "enum": [ + "problems", + "count" + ], + "type": "string" + }, "MetricAggregation": { "oneOf": [ { diff --git a/apollo-router/src/plugin/mod.rs b/apollo-router/src/plugin/mod.rs index 48bf71db3f..7f8e0c368b 100644 --- a/apollo-router/src/plugin/mod.rs +++ b/apollo-router/src/plugin/mod.rs @@ -625,6 +625,14 @@ pub(crate) trait PluginPrivate: Send + Sync + 'static { service } + /// This service handles individual requests to Apollo Connectors + fn connector_request_service( + &self, + service: crate::services::connector::request_service::BoxService, + ) -> crate::services::connector::request_service::BoxService { + service + } + /// Return the name of the plugin. fn name(&self) -> &'static str where @@ -735,6 +743,12 @@ pub(crate) trait DynPlugin: Send + Sync + 'static { service: crate::services::http::BoxService, ) -> crate::services::http::BoxService; + /// This service handles individual requests to Apollo Connectors + fn connector_request_service( + &self, + service: crate::services::connector::request_service::BoxService, + ) -> crate::services::connector::request_service::BoxService; + /// Return the name of the plugin. fn name(&self) -> &'static str; @@ -783,6 +797,13 @@ where self.http_client_service(name, service) } + fn connector_request_service( + &self, + service: crate::services::connector::request_service::BoxService, + ) -> crate::services::connector::request_service::BoxService { + self.connector_request_service(service) + } + fn name(&self) -> &'static str { self.name() } diff --git a/apollo-router/src/plugins/authentication/connector.rs b/apollo-router/src/plugins/authentication/connector.rs index 16c18c6584..c3d7d96af4 100644 --- a/apollo-router/src/plugins/authentication/connector.rs +++ b/apollo-router/src/plugins/authentication/connector.rs @@ -5,41 +5,32 @@ use tower::ServiceBuilder; use tower::ServiceExt; use crate::plugins::authentication::subgraph::SigningParamsConfig; -use crate::services::connector_service::ConnectorInfo; +use crate::services::connector; use crate::services::connector_service::ConnectorSourceRef; -use crate::services::connector_service::CONNECTOR_INFO_CONTEXT_KEY; -use crate::services::http::HttpRequest; pub(super) struct ConnectorAuth { pub(super) signing_params: Arc>>, } impl ConnectorAuth { - pub(super) fn http_client_service( + pub(super) fn connector_request_service( &self, - subgraph_name: &str, - service: crate::services::http::BoxService, - ) -> crate::services::http::BoxService { + service: connector::request_service::BoxService, + ) -> connector::request_service::BoxService { let signing_params = self.signing_params.clone(); - let subgraph_name = subgraph_name.to_string(); ServiceBuilder::new() - .map_request(move |req: HttpRequest| { - if let Ok(Some(connector_info)) = req - .context - .get::<&str, ConnectorInfo>(CONNECTOR_INFO_CONTEXT_KEY) - { - if let Some(source_name) = connector_info.source_name { - if let Some(signing_params) = signing_params - .get(&ConnectorSourceRef::new( - subgraph_name.clone(), - source_name.clone(), - )) - .cloned() - { - req.context - .extensions() - .with_lock(|mut lock| lock.insert(signing_params)); - } + .map_request(move |req: connector::request_service::Request| { + if let Some(ref source_name) = req.connector.id.source_name { + if let Some(signing_params) = signing_params + .get(&ConnectorSourceRef::new( + req.connector.id.subgraph_name.clone(), + source_name.clone(), + )) + .cloned() + { + req.context + .extensions() + .with_lock(|mut lock| lock.insert(signing_params)); } } req diff --git a/apollo-router/src/plugins/authentication/mod.rs b/apollo-router/src/plugins/authentication/mod.rs index 2755c7aa71..39af4bbb73 100644 --- a/apollo-router/src/plugins/authentication/mod.rs +++ b/apollo-router/src/plugins/authentication/mod.rs @@ -564,13 +564,12 @@ impl PluginPrivate for AuthenticationPlugin { } } - fn http_client_service( + fn connector_request_service( &self, - subgraph_name: &str, - service: crate::services::http::BoxService, - ) -> crate::services::http::BoxService { + service: crate::services::connector::request_service::BoxService, + ) -> crate::services::connector::request_service::BoxService { if let Some(auth) = &self.connector { - auth.http_client_service(subgraph_name, service) + auth.connector_request_service(service) } else { service } diff --git a/apollo-router/src/plugins/connectors/error.rs b/apollo-router/src/plugins/connectors/error.rs deleted file mode 100644 index 65262d7088..0000000000 --- a/apollo-router/src/plugins/connectors/error.rs +++ /dev/null @@ -1,58 +0,0 @@ -//! Connectors error types. - -use apollo_federation::sources::connect::Connector; -use tower::BoxError; - -use crate::graphql; -use crate::graphql::ErrorExtension; -use crate::json_ext::Path; - -/// Errors that apply to all connector types. These errors represent a problem invoking the -/// connector, as opposed to an error returned from the connector itself. -#[derive(Debug, thiserror::Error, displaydoc::Display)] -pub(crate) enum Error { - /// Request limit exceeded - RequestLimitExceeded, - - /// {0} - HTTPClientError(#[from] BoxError), -} - -impl Error { - /// Create a GraphQL error from this error. - #[must_use] - pub(crate) fn to_graphql_error( - &self, - connector: &Connector, - path: Option, - ) -> crate::error::Error { - use serde_json_bytes::*; - - let builder = graphql::Error::builder() - .message(self.to_string()) - .extension_code(self.extension_code()) - .extension("service", connector.id.subgraph_name.clone()) - .extension( - "connector", - Value::Object(Map::from_iter([( - "coordinate".into(), - Value::String(connector.id.coordinate().into()), - )])), - ); - if let Some(path) = path { - builder.path(path).build() - } else { - builder.build() - } - } -} - -impl ErrorExtension for Error { - fn extension_code(&self) -> String { - match self { - Self::RequestLimitExceeded => "REQUEST_LIMIT_EXCEEDED", - Self::HTTPClientError(_) => "HTTP_CLIENT_ERROR", - } - .to_string() - } -} diff --git a/apollo-router/src/plugins/connectors/handle_responses.rs b/apollo-router/src/plugins/connectors/handle_responses.rs index 9d41360d7e..caaa817353 100644 --- a/apollo-router/src/plugins/connectors/handle_responses.rs +++ b/apollo-router/src/plugins/connectors/handle_responses.rs @@ -1,7 +1,8 @@ use std::sync::Arc; use apollo_federation::sources::connect::Connector; -use http_body::Body as HttpBody; +use axum::body::HttpBody; +use opentelemetry::KeyValue; use parking_lot::Mutex; use serde_json_bytes::ByteString; use serde_json_bytes::Value; @@ -9,16 +10,26 @@ use tracing::Span; use crate::graphql; use crate::json_ext::Path; -use crate::plugins::connectors::http::Response as ConnectorResponse; -use crate::plugins::connectors::http::Result as ConnectorResult; use crate::plugins::connectors::make_requests::ResponseKey; +use crate::plugins::connectors::mapping::aggregate_apply_to_errors; +use crate::plugins::connectors::mapping::Problem; use crate::plugins::connectors::plugin::debug::ConnectorContext; use crate::plugins::connectors::plugin::debug::ConnectorDebugHttpRequest; use crate::plugins::connectors::plugin::debug::SelectionData; +use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_BODY; +use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_HEADERS; +use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_STATUS; +use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_VERSION; +use crate::plugins::telemetry::config_new::connector::events::ConnectorEventResponse; +use crate::plugins::telemetry::config_new::events::log_event; use crate::plugins::telemetry::consts::OTEL_STATUS_CODE; use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_ERROR; use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_OK; use crate::services::connect::Response; +use crate::services::connector; +use crate::services::connector::request_service::transport::http::HttpResponse; +use crate::services::connector::request_service::Error; +use crate::services::connector::request_service::TransportResponse; use crate::services::fetch::AddSubgraphNameExt; use crate::services::router; use crate::Context; @@ -56,17 +67,17 @@ enum RawResponse { } impl RawResponse { - /// Returns a `MappedResponse` with the response data transformed by the - /// selection mapping. + /// Returns a response with data transformed by the selection mapping. /// /// As a side effect, this will also write to the debug context. fn map_response( self, - connector: &Connector, + result: Result, + connector: Arc, context: &Context, debug_context: &Option>>, - ) -> MappedResponse { - match self { + ) -> connector::request_service::Response { + let mapped_response = match self { RawResponse::Error { error, key } => MappedResponse::Error { error, key }, RawResponse::Data { data, @@ -83,6 +94,8 @@ impl RawResponse { let (res, apply_to_errors) = key.selection().apply_with_vars(&data, &inputs); + let mapping_problems = aggregate_apply_to_errors(&apply_to_errors); + if let Some(ref debug) = debug_context { debug.lock().push_response( debug_request.clone(), @@ -92,7 +105,7 @@ impl RawResponse { source: connector.selection.to_string(), transformed: key.selection().to_string(), result: res.clone(), - errors: apply_to_errors, + errors: mapping_problems.clone(), }), ); } @@ -100,8 +113,16 @@ impl RawResponse { MappedResponse::Data { key, data: res.unwrap_or_else(|| Value::Null), + problems: mapping_problems, } } + }; + + connector::request_service::Response { + context: context.clone(), + connector: connector.clone(), + transport_result: result, + mapped_response, } } @@ -113,13 +134,14 @@ impl RawResponse { // error with the status code. fn map_error( self, - connector: &Connector, - _context: &Context, + result: Result, + connector: Arc, + context: &Context, debug_context: &Option>>, - ) -> MappedResponse { + ) -> connector::request_service::Response { use serde_json_bytes::*; - match self { + let mapped_response = match self { RawResponse::Error { error, key } => MappedResponse::Error { error, key }, RawResponse::Data { key, @@ -157,12 +179,19 @@ impl RawResponse { MappedResponse::Error { error, key } } + }; + + connector::request_service::Response { + context: context.clone(), + connector: connector.clone(), + transport_result: result, + mapped_response, } } } // --- MAPPED RESPONSE --------------------------------------------------------- - +#[derive(Debug)] pub(crate) enum MappedResponse { /// This is equivalent to RawResponse::Error, but it also represents errors /// when the request is semantically unsuccessful (e.g. 404, 500). @@ -170,8 +199,12 @@ pub(crate) enum MappedResponse { error: graphql::Error, key: ResponseKey, }, - /// The is the response data after applying the selection mapping. - Data { data: Value, key: ResponseKey }, + /// The response data after applying the selection mapping. + Data { + data: Value, + key: ResponseKey, + problems: Vec, + }, } impl MappedResponse { @@ -258,31 +291,39 @@ impl MappedResponse { // --- handle_responses -------------------------------------------------------- pub(crate) async fn process_response( - response: ConnectorResponse, - connector: &Connector, + result: Result, Error>, + response_key: ResponseKey, + connector: Arc, context: &Context, + debug_request: Option, debug_context: &Option>>, -) -> MappedResponse { - let response_key = response.key; - let debug_request = response.debug_request; - - let raw = match response.result { +) -> connector::request_service::Response { + match result { // This occurs when we short-circuit the request when over the limit - ConnectorResult::Err(error) => RawResponse::Error { - error: error.to_graphql_error(connector, Some((&response_key).into())), - key: response_key, - }, - ConnectorResult::HttpResponse(response) => { + Err(error) => { + let raw = RawResponse::Error { + error: error.to_graphql_error(connector.clone(), Some((&response_key).into())), + key: response_key, + }; + Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR); + raw.map_error(Err(error), connector, context, debug_context) + } + Ok(response) => { let (parts, body) = response.into_parts(); + let result = Ok(TransportResponse::Http(HttpResponse { + inner: parts.clone(), + })); + // If this errors, it will write to the debug context because it // has access to the raw bytes, so we can't write to it again // in any RawResponse::Error branches. - match deserialize_response( + let raw = match deserialize_response( body, &parts, - connector, - (&response_key).into(), + connector.clone(), + context, + &response_key, debug_context, &debug_request, ) @@ -298,19 +339,19 @@ pub(crate) async fn process_response( error, key: response_key, }, + }; + let is_success = match &raw { + RawResponse::Error { .. } => false, + RawResponse::Data { parts, .. } => parts.status.is_success(), + }; + if is_success { + Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_OK); + raw.map_response(result, connector, context, debug_context) + } else { + Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR); + raw.map_error(result, connector, context, debug_context) } } - }; - - let is_success = match &raw { - RawResponse::Error { .. } => false, - RawResponse::Data { parts, .. } => parts.status.is_success(), - }; - - if is_success { - raw.map_response(connector, context, debug_context) - } else { - raw.map_error(connector, context, debug_context) } } @@ -358,8 +399,9 @@ pub(crate) fn aggregate_responses( async fn deserialize_response( body: T, parts: &http::response::Parts, - connector: &Connector, - path: Path, + connector: Arc, + context: &Context, + response_key: &ResponseKey, debug_context: &Option>>, debug_request: &Option, ) -> Result { @@ -389,9 +431,95 @@ async fn deserialize_response( .add_subgraph_name(&connector.id.subgraph_name) // for include_subgraph_errors }; + let path: Path = response_key.into(); let body = &router::body::into_bytes(body) .await .map_err(|_| make_err(path.clone()))?; + + let log_response_level = context + .extensions() + .with_lock(|lock| lock.get::().cloned()) + .and_then(|event| match event.0.condition() { + Some(condition) => { + // Create a temporary response here so we can evaluate the condition. This response + // is missing any information about the mapped response, because we don't have that + // yet. This means that we cannot correctly evaluate any condition that relies on + // the mapped response data or mapping problems. But we can't wait until we do have + // that information, because this is the only place we have the body bytes (without + // making an expensive clone of the body). So we either need to not expose any + // selector which can be used as a condition that requires mapping information, or + // we must document that such selectors cannot be used as conditions on standard + // connectors events. + + let response = connector::request_service::Response { + context: context.clone(), + connector: connector.clone(), + transport_result: Ok(TransportResponse::Http(HttpResponse { + inner: parts.clone(), + })), + mapped_response: MappedResponse::Data { + data: Value::Null, + key: response_key.clone(), + problems: vec![], + }, + }; + if condition.lock().evaluate_response(&response) { + Some(event.0.level()) + } else { + None + } + } + None => Some(event.0.level()), + }); + + if let Some(level) = log_response_level { + let mut attrs = Vec::with_capacity(4); + #[cfg(test)] + let headers = { + let mut headers: indexmap::IndexMap = parts + .headers + .clone() + .into_iter() + .filter_map(|(name, val)| Some((name?.to_string(), val))) + .collect(); + headers.sort_keys(); + headers + }; + #[cfg(not(test))] + let headers = &parts.headers; + + attrs.push(KeyValue::new( + HTTP_RESPONSE_HEADERS, + opentelemetry::Value::String(format!("{:?}", headers).into()), + )); + attrs.push(KeyValue::new( + HTTP_RESPONSE_STATUS, + opentelemetry::Value::String(format!("{}", parts.status).into()), + )); + attrs.push(KeyValue::new( + HTTP_RESPONSE_VERSION, + opentelemetry::Value::String(format!("{:?}", parts.version).into()), + )); + attrs.push(KeyValue::new( + HTTP_RESPONSE_BODY, + opentelemetry::Value::String( + String::from_utf8(body.clone().to_vec()) + .unwrap_or_default() + .into(), + ), + )); + + log_event( + level, + "connector.response", + attrs, + &format!( + "Response from connector {label:?}", + label = connector.id.label + ), + ); + } + match serde_json::from_slice::(body) { Ok(json_data) => Ok(json_data), Err(_) => { @@ -422,7 +550,6 @@ mod tests { use url::Url; use crate::plugins::connectors::handle_responses::process_response; - use crate::plugins::connectors::http::Response as ConnectorResponse; use crate::plugins::connectors::make_requests::ResponseKey; use crate::services::router; use crate::services::router::body::RouterBody; @@ -430,7 +557,7 @@ mod tests { #[tokio::test] async fn test_handle_responses_root_fields() { - let connector = Connector { + let connector = Arc::new(Connector { spec: ConnectSpec::V0_1, id: ConnectId::new( "subgraph_name".into(), @@ -453,7 +580,7 @@ mod tests { max_requests: None, request_variables: Default::default(), response_variables: Default::default(), - }; + }); let response1: http::Response = http::Response::builder() .body(router::body::from_bytes(r#"{"data":"world"}"#)) @@ -475,27 +602,25 @@ mod tests { let res = super::aggregate_responses(vec![ process_response( - ConnectorResponse { - result: response1.into(), - key: response_key1, - debug_request: None, - }, - &connector, + Ok(response1), + response_key1, + connector.clone(), &Context::default(), + None, &None, ) - .await, + .await + .mapped_response, process_response( - ConnectorResponse { - result: response2.into(), - key: response_key2, - debug_request: None, - }, - &connector, + Ok(response2), + response_key2, + connector, &Context::default(), + None, &None, ) - .await, + .await + .mapped_response, ]) .unwrap(); @@ -532,7 +657,7 @@ mod tests { #[tokio::test] async fn test_handle_responses_entities() { - let connector = Connector { + let connector = Arc::new(Connector { spec: ConnectSpec::V0_1, id: ConnectId::new( "subgraph_name".into(), @@ -555,7 +680,7 @@ mod tests { max_requests: None, request_variables: Default::default(), response_variables: Default::default(), - }; + }); let response1: http::Response = http::Response::builder() .body(router::body::from_bytes(r#"{"data":{"id": "1"}}"#)) @@ -577,27 +702,25 @@ mod tests { let res = super::aggregate_responses(vec![ process_response( - ConnectorResponse { - result: response1.into(), - key: response_key1, - debug_request: None, - }, - &connector, + Ok(response1), + response_key1, + connector.clone(), &Context::default(), + None, &None, ) - .await, + .await + .mapped_response, process_response( - ConnectorResponse { - result: response2.into(), - key: response_key2, - debug_request: None, - }, - &connector, + Ok(response2), + response_key2, + connector, &Context::default(), + None, &None, ) - .await, + .await + .mapped_response, ]) .unwrap(); @@ -640,7 +763,7 @@ mod tests { #[tokio::test] async fn test_handle_responses_entity_field() { - let connector = Connector { + let connector = Arc::new(Connector { spec: ConnectSpec::V0_1, id: ConnectId::new( "subgraph_name".into(), @@ -663,7 +786,7 @@ mod tests { max_requests: None, request_variables: Default::default(), response_variables: Default::default(), - }; + }); let response1: http::Response = http::Response::builder() .body(router::body::from_bytes(r#"{"data":"value1"}"#)) @@ -689,27 +812,25 @@ mod tests { let res = super::aggregate_responses(vec![ process_response( - ConnectorResponse { - result: response1.into(), - key: response_key1, - debug_request: None, - }, - &connector, + Ok(response1), + response_key1, + connector.clone(), &Context::default(), + None, &None, ) - .await, + .await + .mapped_response, process_response( - ConnectorResponse { - result: response2.into(), - key: response_key2, - debug_request: None, - }, - &connector, + Ok(response2), + response_key2, + connector, &Context::default(), + None, &None, ) - .await, + .await + .mapped_response, ]) .unwrap(); @@ -758,7 +879,7 @@ mod tests { #[tokio::test] async fn test_handle_responses_errors() { - let connector = Connector { + let connector = Arc::new(Connector { spec: ConnectSpec::V0_1, id: ConnectId::new( "subgraph_name".into(), @@ -781,7 +902,7 @@ mod tests { max_requests: None, request_variables: Default::default(), response_variables: Default::default(), - }; + }); let response_plaintext: http::Response = http::Response::builder() .body(router::body::from_bytes(r#"plain text"#)) @@ -823,49 +944,45 @@ mod tests { let res = super::aggregate_responses(vec![ process_response( - ConnectorResponse { - result: response_plaintext.into(), - key: response_key_plaintext, - debug_request: None, - }, - &connector, + Ok(response_plaintext), + response_key_plaintext, + connector.clone(), &Context::default(), + None, &None, ) - .await, + .await + .mapped_response, process_response( - ConnectorResponse { - result: response1.into(), - key: response_key1, - debug_request: None, - }, - &connector, + Ok(response1), + response_key1, + connector.clone(), &Context::default(), + None, &None, ) - .await, + .await + .mapped_response, process_response( - ConnectorResponse { - result: response2.into(), - key: response_key2, - debug_request: None, - }, - &connector, + Ok(response2), + response_key2, + connector.clone(), &Context::default(), + None, &None, ) - .await, + .await + .mapped_response, process_response( - ConnectorResponse { - result: response3.into(), - key: response_key3, - debug_request: None, - }, - &connector, + Ok(response3), + response_key3, + connector, &Context::default(), + None, &None, ) - .await, + .await + .mapped_response, ]) .unwrap(); @@ -1016,7 +1133,7 @@ mod tests { #[tokio::test] async fn test_handle_responses_status() { let selection = JSONSelection::parse("$status").unwrap(); - let connector = Connector { + let connector = Arc::new(Connector { spec: ConnectSpec::V0_1, id: ConnectId::new( "subgraph_name".into(), @@ -1039,7 +1156,7 @@ mod tests { max_requests: None, request_variables: Default::default(), response_variables: selection.external_variables().collect(), - }; + }); let response1: http::Response = http::Response::builder() .status(201) @@ -1053,16 +1170,15 @@ mod tests { let res = super::aggregate_responses(vec![ process_response( - ConnectorResponse { - result: response1.into(), - key: response_key1, - debug_request: None, - }, - &connector, + Ok(response1), + response_key1, + connector, &Context::default(), + None, &None, ) - .await, + .await + .mapped_response, ]) .unwrap(); diff --git a/apollo-router/src/plugins/connectors/http.rs b/apollo-router/src/plugins/connectors/http.rs deleted file mode 100644 index 772a92846b..0000000000 --- a/apollo-router/src/plugins/connectors/http.rs +++ /dev/null @@ -1,43 +0,0 @@ -//! HTTP-based connector implementation types. - -use http_body::Body as HttpBody; - -use crate::plugins::connectors::error::Error as ConnectorError; -use crate::plugins::connectors::make_requests::ResponseKey; -use crate::plugins::connectors::plugin::debug::ConnectorDebugHttpRequest; -use crate::services::router::body::RouterBody; - -/// A result of a connector -pub(crate) enum Result { - /// The connector was not invoked because of an error - Err(ConnectorError), - - /// The connector was invoked and returned an HTTP response - HttpResponse(http::Response), -} - -impl From> for Result { - fn from(value: http::Response) -> Self { - Result::HttpResponse(value) - } -} - -impl From for Result { - fn from(value: ConnectorError) -> Self { - Result::Err(value) - } -} - -/// The result of a connector and the associated response key -pub(crate) struct Response { - pub(crate) result: Result, - pub(crate) key: ResponseKey, - pub(crate) debug_request: Option, -} - -#[derive(Debug)] -pub(crate) struct Request { - pub(crate) request: http::Request, - pub(crate) key: ResponseKey, - pub(crate) debug_request: Option, -} diff --git a/apollo-router/src/plugins/connectors/http_json_transport.rs b/apollo-router/src/plugins/connectors/http_json_transport.rs index 444aad0b3d..1a1761f52a 100644 --- a/apollo-router/src/plugins/connectors/http_json_transport.rs +++ b/apollo-router/src/plugins/connectors/http_json_transport.rs @@ -18,21 +18,22 @@ use thiserror::Error; use url::Url; use super::form_encoding::encode_json_as_form; +use crate::plugins::connectors::mapping::aggregate_apply_to_errors; +use crate::plugins::connectors::mapping::Problem; use crate::plugins::connectors::plugin::debug::serialize_request; use crate::plugins::connectors::plugin::debug::ConnectorContext; -use crate::plugins::connectors::plugin::debug::ConnectorDebugHttpRequest; use crate::plugins::connectors::plugin::debug::SelectionData; use crate::services::connect; +use crate::services::connector::request_service::transport::http::HttpRequest; +use crate::services::connector::request_service::TransportRequest; use crate::services::router; -use crate::services::router::body::RouterBody; pub(crate) fn make_request( transport: &HttpJsonTransport, inputs: IndexMap, original_request: &connect::Request, debug: &Option>>, -) -> Result<(http::Request, Option), HttpJsonTransportError> -{ +) -> Result<(TransportRequest, Vec), HttpJsonTransportError> { let uri = make_uri( transport.source_url.as_ref(), &transport.connect_template, @@ -89,6 +90,8 @@ pub(crate) fn make_request( .body(body) .map_err(HttpJsonTransportError::InvalidNewRequest)?; + let mapping_problems = aggregate_apply_to_errors(&apply_to_errors); + let debug_request = debug.as_ref().map(|_| { if is_form_urlencoded { serialize_request( @@ -101,7 +104,7 @@ pub(crate) fn make_request( source: body.to_string(), transformed: body.to_string(), // no transformation so this is the same result: json_body, - errors: apply_to_errors, + errors: mapping_problems.clone(), }), ) } else { @@ -113,13 +116,19 @@ pub(crate) fn make_request( source: body.to_string(), transformed: body.to_string(), // no transformation so this is the same result: json_body.clone(), - errors: apply_to_errors, + errors: mapping_problems.clone(), }), ) } }); - Ok((request, debug_request)) + Ok(( + TransportRequest::Http(HttpRequest { + inner: request, + debug: debug_request, + }), + mapping_problems, + )) } fn make_uri( @@ -704,21 +713,27 @@ mod tests { assert_debug_snapshot!(req, @r###" ( - Request { - method: POST, - uri: http://localhost:8080/, - version: HTTP/1.1, - headers: { - "content-type": "application/json", - "content-length": "8", + Http( + HttpRequest { + inner: Request { + method: POST, + uri: http://localhost:8080/, + version: HTTP/1.1, + headers: { + "content-type": "application/json", + "content-length": "8", + }, + body: UnsyncBoxBody, + }, + debug: None, }, - body: UnsyncBoxBody, - }, - None, + ), + [], ) "###); - let body = body::into_string(req.0.into_body()).await.unwrap(); + let TransportRequest::Http(HttpRequest { inner: req, .. }) = req.0; + let body = body::into_string(req.into_body()).await.unwrap(); insta::assert_snapshot!(body, @r#"{"a":42}"#); } @@ -756,21 +771,27 @@ mod tests { assert_debug_snapshot!(req, @r###" ( - Request { - method: POST, - uri: http://localhost:8080/, - version: HTTP/1.1, - headers: { - "content-type": "application/x-www-form-urlencoded", - "content-length": "4", + Http( + HttpRequest { + inner: Request { + method: POST, + uri: http://localhost:8080/, + version: HTTP/1.1, + headers: { + "content-type": "application/x-www-form-urlencoded", + "content-length": "4", + }, + body: UnsyncBoxBody, + }, + debug: None, }, - body: UnsyncBoxBody, - }, - None, + ), + [], ) "###); - let body = body::into_string(req.0.into_body()).await.unwrap(); + let TransportRequest::Http(HttpRequest { inner: req, .. }) = req.0; + let body = body::into_string(req.into_body()).await.unwrap(); insta::assert_snapshot!(body, @r#"a=42"#); } } diff --git a/apollo-router/src/plugins/connectors/make_requests.rs b/apollo-router/src/plugins/connectors/make_requests.rs index 7bbc53b4a2..f8e204f594 100644 --- a/apollo-router/src/plugins/connectors/make_requests.rs +++ b/apollo-router/src/plugins/connectors/make_requests.rs @@ -15,13 +15,13 @@ use serde_json_bytes::ByteString; use serde_json_bytes::Map; use serde_json_bytes::Value; -use super::http::Request; use super::http_json_transport::make_request; use super::http_json_transport::HttpJsonTransportError; use crate::json_ext::Path; use crate::json_ext::PathElement; use crate::plugins::connectors::plugin::debug::ConnectorContext; use crate::services::connect; +use crate::services::connector::request_service::Request; use crate::Context; const REPRESENTATIONS_VAR: &str = "representations"; @@ -169,27 +169,41 @@ impl From<&ResponseKey> for Path { pub(crate) fn make_requests( request: connect::Request, - connector: &Connector, + context: &Context, + connector: Arc, + service_name: &str, debug: &Option>>, ) -> Result, MakeRequestError> { let request_params = match connector.entity_resolver { - Some(EntityResolver::Explicit) => entities_from_request(connector, &request), - Some(EntityResolver::Implicit) => entities_with_fields_from_request(connector, &request), - None => root_fields(connector, &request), + Some(EntityResolver::Explicit) => entities_from_request(connector.clone(), &request), + Some(EntityResolver::Implicit) => { + entities_with_fields_from_request(connector.clone(), &request) + } + None => root_fields(connector.clone(), &request), }?; - request_params_to_requests(connector, request_params, &request, debug) + request_params_to_requests( + context, + connector, + service_name, + request_params, + &request, + debug, + ) } fn request_params_to_requests( - connector: &Connector, + context: &Context, + connector: Arc, + service_name: &str, request_params: Vec, original_request: &connect::Request, debug: &Option>>, ) -> Result, MakeRequestError> { let mut results = vec![]; for response_key in request_params { - let (request, debug_request) = make_request( + let connector = connector.clone(); + let (transport_request, mapping_problems) = make_request( &connector.transport, response_key.inputs().merge( &connector.request_variables, @@ -202,9 +216,12 @@ fn request_params_to_requests( )?; results.push(Request { - request, + context: context.clone(), + connector, + service_name: service_name.to_string(), + transport_request, key: response_key, - debug_request, + mapping_problems, }); } @@ -253,7 +270,7 @@ pub(crate) enum MakeRequestError { /// } /// ``` fn root_fields( - connector: &Connector, + connector: Arc, request: &connect::Request, ) -> Result, MakeRequestError> { use MakeRequestError::*; @@ -329,7 +346,7 @@ fn root_fields( /// /// Returns a list of request inputs and the response key (index in the array). fn entities_from_request( - connector: &Connector, + connector: Arc, request: &connect::Request, ) -> Result, MakeRequestError> { use MakeRequestError::*; @@ -398,7 +415,7 @@ fn entities_from_request( /// Return a list of request inputs with the response key (index in list and /// name/alias of field) for each. fn entities_with_fields_from_request( - connector: &Connector, + connector: Arc, request: &connect::Request, ) -> Result, MakeRequestError> { use MakeRequestError::*; @@ -564,6 +581,7 @@ mod tests { use crate::graphql; use crate::query_planner::fetch::Variables; + use crate::services::connector::request_service::TransportRequest; use crate::Context; #[test] @@ -620,7 +638,7 @@ mod tests { response_variables: Default::default(), }; - assert_debug_snapshot!(super::root_fields(&connector, &req), @r###" + assert_debug_snapshot!(super::root_fields(Arc::new(connector), &req), @r###" Ok( [ RootField { @@ -751,7 +769,7 @@ mod tests { response_variables: Default::default(), }; - assert_debug_snapshot!(super::root_fields(&connector, &req), @r###" + assert_debug_snapshot!(super::root_fields(Arc::new(connector), &req), @r###" Ok( [ RootField { @@ -910,7 +928,7 @@ mod tests { response_variables: Default::default(), }; - assert_debug_snapshot!(super::root_fields(&connector, &req), @r###" + assert_debug_snapshot!(super::root_fields(Arc::new(connector), &req), @r###" Ok( [ RootField { @@ -1139,7 +1157,7 @@ mod tests { response_variables: Default::default(), }; - assert_debug_snapshot!(super::entities_from_request(&connector, &req).unwrap(), @r###" + assert_debug_snapshot!(super::entities_from_request(Arc::new(connector), &req).unwrap(), @r###" [ Entity { index: 0, @@ -1457,7 +1475,7 @@ mod tests { response_variables: Default::default(), }; - assert_debug_snapshot!(super::entities_from_request(&connector, &req).unwrap(), @r###" + assert_debug_snapshot!(super::entities_from_request(Arc::new(connector), &req).unwrap(), @r###" [ Entity { index: 0, @@ -1756,7 +1774,7 @@ mod tests { response_variables: Default::default(), }; - assert_debug_snapshot!(super::entities_from_request(&connector, &req).unwrap(), @r###" + assert_debug_snapshot!(super::entities_from_request(Arc::new(connector), &req).unwrap(), @r###" [ RootField { name: "a", @@ -1975,7 +1993,7 @@ mod tests { response_variables: Default::default(), }; - assert_debug_snapshot!(super::entities_with_fields_from_request(&connector, &req).unwrap(), @r###" + assert_debug_snapshot!(super::entities_with_fields_from_request(Arc::new(connector), &req).unwrap(), @r###" [ EntityField { index: 0, @@ -2253,7 +2271,7 @@ mod tests { response_variables: Default::default(), }; - assert_debug_snapshot!(super::entities_with_fields_from_request(&connector, &req).unwrap(), @r###" + assert_debug_snapshot!(super::entities_with_fields_from_request(Arc::new(connector), &req).unwrap(), @r###" [ EntityField { index: 0, @@ -2528,7 +2546,7 @@ mod tests { response_variables: Default::default(), }; - assert_debug_snapshot!(super::entities_with_fields_from_request(&connector ,&req).unwrap(), @r###" + assert_debug_snapshot!(super::entities_with_fields_from_request(Arc::new(connector), &req).unwrap(), @r###" [ EntityField { index: 0, @@ -2619,9 +2637,9 @@ mod tests { #[test] fn make_requests() { let schema = Schema::parse_and_validate("type Query { hello: String }", "./").unwrap(); - + let service_name = String::from("subgraph_Query_a_0"); let req = crate::services::connect::Request::builder() - .service_name("subgraph_Query_a_0".into()) + .service_name(service_name.clone().into()) .context(Context::default()) .operation(Arc::new( ExecutableDocument::parse_and_validate( @@ -2668,16 +2686,23 @@ mod tests { response_variables: Default::default(), }; - let requests: Vec<_> = super::make_requests(req, &connector, &None) - .unwrap() - .into_iter() - .map(|req| { - let (parts, _body) = req.request.into_parts(); - let new_req = - http::Request::from_parts(parts, http_body_util::Empty::::new()); - (new_req, req.key, req.debug_request) - }) - .collect(); + let requests: Vec<_> = super::make_requests( + req, + &Context::default(), + Arc::new(connector), + &service_name, + &None, + ) + .unwrap() + .into_iter() + .map(|req| { + let TransportRequest::Http(http_request) = req.transport_request; + let (parts, _body) = http_request.inner.into_parts(); + let new_req = + http::Request::from_parts(parts, http_body_util::Empty::::new()); + (new_req, req.key, http_request.debug) + }) + .collect(); assert_debug_snapshot!(requests, @r###" [ diff --git a/apollo-router/src/plugins/connectors/mapping.rs b/apollo-router/src/plugins/connectors/mapping.rs new file mode 100644 index 0000000000..29d9dff41d --- /dev/null +++ b/apollo-router/src/plugins/connectors/mapping.rs @@ -0,0 +1,48 @@ +//! Mapping from a Connectors request or response to GraphQL + +use std::collections::HashMap; + +use apollo_federation::sources::connect::ApplyToError; +use itertools::Itertools; +use serde::Deserialize; +use serde::Serialize; + +/// A mapping problem +#[derive(Clone, Debug, Serialize, Deserialize)] +#[cfg_attr(test, derive(schemars::JsonSchema))] +pub(crate) struct Problem { + pub(crate) message: String, + pub(crate) path: String, + pub(crate) count: usize, +} + +/// Aggregate a list of [`ApplyToError`] into [mapping problems](Problem) +pub(crate) fn aggregate_apply_to_errors(errors: &[ApplyToError]) -> Vec { + errors + .iter() + .fold( + HashMap::default(), + |mut acc: HashMap<(&str, String), usize>, err| { + let path = err + .path() + .iter() + .map(|p| match p.as_u64() { + Some(_) => "@", // ignore array indices for grouping + None => p.as_str().unwrap_or_default(), + }) + .join("."); + + acc.entry((err.message(), path)) + .and_modify(|c| *c += 1) + .or_insert(1); + acc + }, + ) + .iter() + .map(|(key, &count)| Problem { + message: key.0.to_string(), + path: key.1.clone(), + count, + }) + .collect() +} diff --git a/apollo-router/src/plugins/connectors/mod.rs b/apollo-router/src/plugins/connectors/mod.rs index f7b5a81cf3..3a5c7b7f66 100644 --- a/apollo-router/src/plugins/connectors/mod.rs +++ b/apollo-router/src/plugins/connectors/mod.rs @@ -1,10 +1,9 @@ pub(crate) mod configuration; -pub(crate) mod error; mod form_encoding; pub(crate) mod handle_responses; -pub(crate) mod http; pub(crate) mod http_json_transport; pub(crate) mod make_requests; +pub(crate) mod mapping; pub(crate) mod plugin; pub(crate) mod query_plans; pub(crate) mod request_limit; diff --git a/apollo-router/src/plugins/connectors/plugin/debug.rs b/apollo-router/src/plugins/connectors/plugin/debug.rs index 3109b30fba..0d91974391 100644 --- a/apollo-router/src/plugins/connectors/plugin/debug.rs +++ b/apollo-router/src/plugins/connectors/plugin/debug.rs @@ -1,12 +1,9 @@ -use std::collections::HashMap; - -use apollo_federation::sources::connect::ApplyToError; use bytes::Bytes; -use itertools::Itertools; use serde::Deserialize; use serde::Serialize; use serde_json_bytes::json; +use crate::plugins::connectors::mapping::Problem; use crate::services::router::body::RouterBody; #[derive(Debug, Clone, Serialize, Deserialize, Default)] @@ -99,8 +96,8 @@ pub(crate) struct SelectionData { /// Refer to [`Self::errors`] for any errors found during evaluation pub(crate) result: Option, - /// A list of errors encountered during evaluation. - pub(crate) errors: Vec, + /// A list of mapping problems encountered during evaluation. + pub(crate) errors: Vec, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -123,7 +120,7 @@ struct ConnectorDebugSelection { source: String, transformed: String, result: Option, - errors: Vec, + errors: Vec, } pub(crate) fn serialize_request( @@ -152,7 +149,7 @@ pub(crate) fn serialize_request( source: selection.source, transformed: selection.transformed, result: selection.result, - errors: aggregate_apply_to_errors(&selection.errors), + errors: selection.errors, }), }), } @@ -189,40 +186,8 @@ fn serialize_response( source: selection.source, transformed: selection.transformed, result: selection.result, - errors: aggregate_apply_to_errors(&selection.errors), + errors: selection.errors, }), }, } } - -fn aggregate_apply_to_errors(errors: &[ApplyToError]) -> Vec { - errors - .iter() - .fold( - HashMap::default(), - |mut acc: HashMap<(&str, String), usize>, err| { - let path = err - .path() - .iter() - .map(|p| match p.as_u64() { - Some(_) => "@", // ignore array indices for grouping - None => p.as_str().unwrap_or_default(), - }) - .join("."); - - acc.entry((err.message(), path)) - .and_modify(|c| *c += 1) - .or_insert(1); - acc - }, - ) - .iter() - .map(|(key, count)| { - json!({ - "message": key.0, - "path": key.1, - "count": count, - }) - }) - .collect() -} diff --git a/apollo-router/src/plugins/connectors/tracing.rs b/apollo-router/src/plugins/connectors/tracing.rs index 489fa6bf76..3615c11cf4 100644 --- a/apollo-router/src/plugins/connectors/tracing.rs +++ b/apollo-router/src/plugins/connectors/tracing.rs @@ -114,13 +114,7 @@ mod tests { async { let config = Arc::default(); let schema = Schema::parse(STEEL_THREAD_SCHEMA, &config).unwrap(); - let _factory = ConnectorServiceFactory::new( - schema.into(), - Arc::default(), - Arc::default(), - Default::default(), - Arc::default(), - ); + let _factory = ConnectorServiceFactory::empty(Arc::from(schema)); assert_gauge!( "apollo.router.schema.connectors", diff --git a/apollo-router/src/plugins/telemetry/config_new/attributes.rs b/apollo-router/src/plugins/telemetry/config_new/attributes.rs index 8f9eee75e5..fcdd297a98 100644 --- a/apollo-router/src/plugins/telemetry/config_new/attributes.rs +++ b/apollo-router/src/plugins/telemetry/config_new/attributes.rs @@ -72,10 +72,10 @@ pub(crate) const HTTP_REQUEST_URI: Key = Key::from_static_str("http.request.uri" pub(crate) const HTTP_REQUEST_VERSION: Key = Key::from_static_str("http.request.version"); pub(crate) const HTTP_REQUEST_BODY: Key = Key::from_static_str("http.request.body"); -pub(super) const HTTP_RESPONSE_HEADERS: Key = Key::from_static_str("http.response.headers"); -pub(super) const HTTP_RESPONSE_STATUS: Key = Key::from_static_str("http.response.status"); -pub(super) const HTTP_RESPONSE_VERSION: Key = Key::from_static_str("http.response.version"); -pub(super) const HTTP_RESPONSE_BODY: Key = Key::from_static_str("http.response.body"); +pub(crate) const HTTP_RESPONSE_HEADERS: Key = Key::from_static_str("http.response.headers"); +pub(crate) const HTTP_RESPONSE_STATUS: Key = Key::from_static_str("http.response.status"); +pub(crate) const HTTP_RESPONSE_VERSION: Key = Key::from_static_str("http.response.version"); +pub(crate) const HTTP_RESPONSE_BODY: Key = Key::from_static_str("http.response.body"); #[derive(Deserialize, JsonSchema, Clone, Debug, Default, Copy)] #[serde(deny_unknown_fields, rename_all = "snake_case")] diff --git a/apollo-router/src/plugins/telemetry/config_new/connector/attributes.rs b/apollo-router/src/plugins/telemetry/config_new/connector/attributes.rs index 6e987b4fbe..6ba9dcfa09 100644 --- a/apollo-router/src/plugins/telemetry/config_new/connector/attributes.rs +++ b/apollo-router/src/plugins/telemetry/config_new/connector/attributes.rs @@ -12,8 +12,6 @@ use crate::plugins::telemetry::config_new::connector::ConnectorResponse; use crate::plugins::telemetry::config_new::DefaultForLevel; use crate::plugins::telemetry::config_new::Selectors; use crate::plugins::telemetry::otlp::TelemetryDataKind; -use crate::services::connector_service::ConnectorInfo; -use crate::services::connector_service::CONNECTOR_INFO_CONTEXT_KEY; use crate::Context; const CONNECTOR_HTTP_METHOD: Key = Key::from_static_str("connector.http.method"); @@ -96,41 +94,46 @@ impl Selectors for ConnectorAttributes fn on_request(&self, request: &ConnectorRequest) -> Vec { let mut attrs = Vec::new(); - if let Ok(Some(connector_info)) = request - .context - .get::<&str, ConnectorInfo>(CONNECTOR_INFO_CONTEXT_KEY) + if let Some(key) = self + .subgraph_name + .as_ref() + .and_then(|a| a.key(SUBGRAPH_NAME)) { - if let Some(key) = self - .subgraph_name - .as_ref() - .and_then(|a| a.key(SUBGRAPH_NAME)) - { - attrs.push(KeyValue::new(key, connector_info.subgraph_name.to_string())); - } - if let Some(key) = self - .connector_source_name - .as_ref() - .and_then(|a| a.key(CONNECTOR_SOURCE_NAME)) - { - if let Some(source_name) = connector_info.source_name { - attrs.push(KeyValue::new(key, source_name.to_string())); - } - } - if let Some(key) = self - .connector_http_method - .as_ref() - .and_then(|a| a.key(CONNECTOR_HTTP_METHOD)) - { - attrs.push(KeyValue::new(key, connector_info.http_method)); - } - if let Some(key) = self - .connector_url_template - .as_ref() - .and_then(|a| a.key(CONNECTOR_URL_TEMPLATE)) - { - attrs.push(KeyValue::new(key, connector_info.url_template.to_string())); + attrs.push(KeyValue::new( + key, + request.connector.id.subgraph_name.clone(), + )); + } + if let Some(key) = self + .connector_source_name + .as_ref() + .and_then(|a| a.key(CONNECTOR_SOURCE_NAME)) + { + if let Some(ref source_name) = request.connector.id.source_name { + attrs.push(KeyValue::new(key, source_name.clone())); } } + if let Some(key) = self + .connector_http_method + .as_ref() + .and_then(|a| a.key(CONNECTOR_HTTP_METHOD)) + { + attrs.push(KeyValue::new( + key, + request.connector.transport.method.as_str().to_string(), + )); + } + if let Some(key) = self + .connector_url_template + .as_ref() + .and_then(|a| a.key(CONNECTOR_URL_TEMPLATE)) + { + attrs.push(KeyValue::new( + key, + request.connector.transport.connect_template.to_string(), + )); + } + attrs } diff --git a/apollo-router/src/plugins/telemetry/config_new/connector/events.rs b/apollo-router/src/plugins/telemetry/config_new/connector/events.rs index 8b7c7db879..d8abbbdf7d 100644 --- a/apollo-router/src/plugins/telemetry/config_new/connector/events.rs +++ b/apollo-router/src/plugins/telemetry/config_new/connector/events.rs @@ -1,17 +1,10 @@ use opentelemetry::Key; use opentelemetry::KeyValue; -use opentelemetry_semantic_conventions::trace::HTTP_REQUEST_METHOD; use parking_lot::Mutex; use schemars::JsonSchema; use serde::Deserialize; use tower::BoxError; -use crate::plugins::telemetry::config_new::attributes::HTTP_REQUEST_HEADERS; -use crate::plugins::telemetry::config_new::attributes::HTTP_REQUEST_URI; -use crate::plugins::telemetry::config_new::attributes::HTTP_REQUEST_VERSION; -use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_HEADERS; -use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_STATUS; -use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_VERSION; use crate::plugins::telemetry::config_new::connector::attributes::ConnectorAttributes; use crate::plugins::telemetry::config_new::connector::selectors::ConnectorSelector; use crate::plugins::telemetry::config_new::connector::ConnectorRequest; @@ -22,11 +15,17 @@ use crate::plugins::telemetry::config_new::events::CustomEventInner; use crate::plugins::telemetry::config_new::events::CustomEvents; use crate::plugins::telemetry::config_new::events::Event; use crate::plugins::telemetry::config_new::events::EventLevel; +use crate::plugins::telemetry::config_new::events::StandardEvent; use crate::plugins::telemetry::config_new::events::StandardEventConfig; use crate::plugins::telemetry::config_new::extendable::Extendable; use crate::plugins::telemetry::config_new::instruments::Instrumented; use crate::Context; +#[derive(Clone)] +pub(crate) struct ConnectorEventRequest(pub(crate) StandardEvent); +#[derive(Clone)] +pub(crate) struct ConnectorEventResponse(pub(crate) StandardEvent); + #[derive(Clone, Deserialize, JsonSchema, Debug, Default)] #[serde(deny_unknown_fields, default)] pub(crate) struct ConnectorEventsConfig { @@ -86,104 +85,30 @@ impl Instrumented type EventResponse = (); fn on_request(&self, request: &Self::Request) { + // Any condition on the request is NOT evaluated here. It must be evaluated later when + // getting the ConnectorEventRequest from the context. The request context is shared + // between all connector requests, so any request could find this ConnectorEventRequest in + // the context. Its presence on the context cannot be conditional on an individual request. if self.request.level() != EventLevel::Off { - if let Some(condition) = self.request.condition() { - if condition.lock().evaluate_request(request) != Some(true) { - return; - } - } - let mut attrs = Vec::with_capacity(5); - #[cfg(test)] - let headers = { - let mut headers: indexmap::IndexMap = request - .http_request - .headers() - .clone() - .into_iter() - .filter_map(|(name, val)| Some((name?.to_string(), val))) - .collect(); - headers.sort_keys(); - headers - }; - #[cfg(not(test))] - let headers = request.http_request.headers(); + request + .context + .extensions() + .with_lock(|mut lock| lock.insert(ConnectorEventRequest(self.request.clone()))); + } - attrs.push(KeyValue::new( - HTTP_REQUEST_HEADERS, - opentelemetry::Value::String(format!("{:?}", headers).into()), - )); - attrs.push(KeyValue::new( - HTTP_REQUEST_METHOD, - opentelemetry::Value::String(format!("{}", request.http_request.method()).into()), - )); - attrs.push(KeyValue::new( - HTTP_REQUEST_URI, - opentelemetry::Value::String(format!("{}", request.http_request.uri()).into()), - )); - attrs.push(KeyValue::new( - HTTP_REQUEST_VERSION, - opentelemetry::Value::String( - format!("{:?}", request.http_request.version()).into(), - ), - )); - // FIXME: need to re-introduce this the same way we did for router request body but we need a request id in order to - // match the request to the element in context.extensions to make sure to not mismatch the request body settings to another one - // attrs.push(KeyValue::new( - // HTTP_REQUEST_BODY, - // opentelemetry::Value::String(format!("{:?}", request.http_request.body()).into()), - // )); - log_event(self.request.level(), "connector.request", attrs, ""); + if self.response.level() != EventLevel::Off { + request + .context + .extensions() + .with_lock(|mut lock| lock.insert(ConnectorEventResponse(self.response.clone()))); } + for custom_event in &self.custom { custom_event.on_request(request); } } fn on_response(&self, response: &Self::Response) { - if self.response.level() != EventLevel::Off { - if let Some(condition) = self.response.condition() { - if !condition.lock().evaluate_response(response) { - return; - } - } - let mut attrs = Vec::with_capacity(4); - #[cfg(test)] - let headers = { - let mut headers: indexmap::IndexMap = response - .http_response - .headers() - .clone() - .into_iter() - .filter_map(|(name, val)| Some((name?.to_string(), val))) - .collect(); - headers.sort_keys(); - headers - }; - #[cfg(not(test))] - let headers = response.http_response.headers(); - - attrs.push(KeyValue::new( - HTTP_RESPONSE_HEADERS, - opentelemetry::Value::String(format!("{:?}", headers).into()), - )); - attrs.push(KeyValue::new( - HTTP_RESPONSE_STATUS, - opentelemetry::Value::String(format!("{}", response.http_response.status()).into()), - )); - attrs.push(KeyValue::new( - HTTP_RESPONSE_VERSION, - opentelemetry::Value::String( - format!("{:?}", response.http_response.version()).into(), - ), - )); - // FIXME: need to re-introduce this the same way we did for router response body but we need a request id in order to - // match the request to the element in context.extensions to make sure to not mismatch the response body settings to another one - // attrs.push(KeyValue::new( - // HTTP_RESPONSE_BODY, - // opentelemetry::Value::String(format!("{:?}", response.http_response.body()).into()), - // )); - log_event(self.response.level(), "connector.response", attrs, ""); - } for custom_event in &self.custom { custom_event.on_response(response); } diff --git a/apollo-router/src/plugins/telemetry/config_new/connector/instruments.rs b/apollo-router/src/plugins/telemetry/config_new/connector/instruments.rs index 4090327d8b..d4a94d3c2f 100644 --- a/apollo-router/src/plugins/telemetry/config_new/connector/instruments.rs +++ b/apollo-router/src/plugins/telemetry/config_new/connector/instruments.rs @@ -16,7 +16,6 @@ use crate::plugins::telemetry::config_new::connector::selectors::ConnectorSelect use crate::plugins::telemetry::config_new::connector::selectors::ConnectorValue; use crate::plugins::telemetry::config_new::connector::ConnectorRequest; use crate::plugins::telemetry::config_new::connector::ConnectorResponse; -use crate::plugins::telemetry::config_new::connector::HttpRequest; use crate::plugins::telemetry::config_new::extendable::Extendable; use crate::plugins::telemetry::config_new::instruments::CustomHistogram; use crate::plugins::telemetry::config_new::instruments::CustomHistogramInner; @@ -339,7 +338,7 @@ impl Instrumented for ConnectorInstruments { } pub(crate) type ConnectorCustomInstruments = CustomInstruments< - HttpRequest, + ConnectorRequest, ConnectorResponse, (), ConnectorAttributes, diff --git a/apollo-router/src/plugins/telemetry/config_new/connector/mod.rs b/apollo-router/src/plugins/telemetry/config_new/connector/mod.rs index 6242ccc39d..8ced6036b6 100644 --- a/apollo-router/src/plugins/telemetry/config_new/connector/mod.rs +++ b/apollo-router/src/plugins/telemetry/config_new/connector/mod.rs @@ -1,13 +1,10 @@ //! Connectors telemetry. -use crate::services::http::HttpRequest; -use crate::services::http::HttpResponse; - pub(crate) mod attributes; pub(crate) mod events; pub(crate) mod instruments; pub(crate) mod selectors; pub(crate) mod spans; -pub(crate) type ConnectorRequest = HttpRequest; -pub(crate) type ConnectorResponse = HttpResponse; +pub(crate) type ConnectorRequest = crate::services::connector::request_service::Request; +pub(crate) type ConnectorResponse = crate::services::connector::request_service::Response; diff --git a/apollo-router/src/plugins/telemetry/config_new/connector/selectors.rs b/apollo-router/src/plugins/telemetry/config_new/connector/selectors.rs index 7f491f549b..8116a66b3c 100644 --- a/apollo-router/src/plugins/telemetry/config_new/connector/selectors.rs +++ b/apollo-router/src/plugins/telemetry/config_new/connector/selectors.rs @@ -1,9 +1,12 @@ use derivative::Derivative; +use opentelemetry::Array; +use opentelemetry::StringValue; use opentelemetry::Value; use schemars::JsonSchema; use serde::Deserialize; use tower::BoxError; +use crate::plugins::connectors::handle_responses::MappedResponse; use crate::plugins::telemetry::config::AttributeValue; use crate::plugins::telemetry::config_new::connector::ConnectorRequest; use crate::plugins::telemetry::config_new::connector::ConnectorResponse; @@ -13,8 +16,8 @@ use crate::plugins::telemetry::config_new::selectors::ErrorRepr; use crate::plugins::telemetry::config_new::selectors::ResponseStatus; use crate::plugins::telemetry::config_new::Selector; use crate::plugins::telemetry::config_new::Stage; -use crate::services::connector_service::ConnectorInfo; -use crate::services::connector_service::CONNECTOR_INFO_CONTEXT_KEY; +use crate::services::connector::request_service::TransportRequest; +use crate::services::connector::request_service::TransportResponse; use crate::Context; #[derive(Deserialize, JsonSchema, Clone, Debug, PartialEq)] @@ -39,6 +42,14 @@ impl From<&ConnectorValue> for InstrumentValue { } } } + +#[derive(Deserialize, JsonSchema, Clone, Debug, PartialEq)] +#[serde(deny_unknown_fields, rename_all = "snake_case")] +pub(crate) enum MappingProblems { + Problems, + Count, +} + #[derive(Deserialize, JsonSchema, Clone, Derivative)] #[serde(deny_unknown_fields, rename_all = "snake_case", untagged)] #[derivative(Debug, PartialEq)] @@ -91,6 +102,14 @@ pub(crate) enum ConnectorSelector { /// Critical error if it happens error: ErrorRepr, }, + RequestMappingProblems { + /// Request mapping problems, if any + connector_request_mapping_problems: MappingProblems, + }, + ResponseMappingProblems { + /// Response mapping problems, if any + connector_response_mapping_problems: MappingProblems, + }, } impl Selector for ConnectorSelector { @@ -99,134 +118,127 @@ impl Selector for ConnectorSelector { type EventResponse = (); fn on_request(&self, request: &Self::Request) -> Option { - let connector_info = request - .context - .get::<&str, ConnectorInfo>(CONNECTOR_INFO_CONTEXT_KEY); match self { - ConnectorSelector::SubgraphName { subgraph_name } if *subgraph_name => connector_info - .ok() - .flatten() - .map(|info| info.subgraph_name.clone()) - .map(opentelemetry::Value::from), - ConnectorSelector::ConnectorSource { .. } => connector_info - .ok() - .flatten() - .and_then(|info| info.source_name.clone()) + ConnectorSelector::SubgraphName { subgraph_name } if *subgraph_name => Some( + opentelemetry::Value::from(request.connector.id.subgraph_name.clone()), + ), + ConnectorSelector::ConnectorSource { .. } => request + .connector + .id + .source_name + .as_ref() + .cloned() .map(opentelemetry::Value::from), ConnectorSelector::ConnectorHttpMethod { connector_http_method, - } if *connector_http_method => connector_info - .ok() - .flatten() - .map(|info| info.http_method.clone()) - .map(opentelemetry::Value::from), + } if *connector_http_method => Some(opentelemetry::Value::from( + request.connector.transport.method.as_str().to_string(), + )), ConnectorSelector::ConnectorUrlTemplate { connector_url_template, - } if *connector_url_template => connector_info - .ok() - .flatten() - .map(|info| info.url_template.clone()) - .map(opentelemetry::Value::from), + } if *connector_url_template => Some(opentelemetry::Value::from( + request.connector.transport.connect_template.to_string(), + )), ConnectorSelector::HttpRequestHeader { connector_http_request_header: connector_request_header, default, .. - } => request - .http_request - .headers() - .get(connector_request_header) - .and_then(|h| Some(h.to_str().ok()?.to_string())) - .or_else(|| default.clone()) - .map(opentelemetry::Value::from), + } => { + let TransportRequest::Http(ref http_request) = request.transport_request; + http_request + .inner + .headers() + .get(connector_request_header) + .and_then(|h| Some(h.to_str().ok()?.to_string())) + .or_else(|| default.clone()) + .map(opentelemetry::Value::from) + } + ConnectorSelector::RequestMappingProblems { + connector_request_mapping_problems: mapping_problems, + } => match mapping_problems { + MappingProblems::Problems => Some(Value::Array(Array::String( + request + .mapping_problems + .iter() + .filter_map(|problem| { + serde_json::to_string(problem).ok().map(StringValue::from) + }) + .collect(), + ))), + MappingProblems::Count => Some(Value::I64( + request + .mapping_problems + .iter() + .map(|problem| problem.count as i64) + .sum(), + )), + }, ConnectorSelector::StaticField { r#static } => Some(r#static.clone().into()), _ => None, } } fn on_response(&self, response: &Self::Response) -> Option { - let connector_info = response - .context - .get::<&str, ConnectorInfo>(CONNECTOR_INFO_CONTEXT_KEY); match self { - ConnectorSelector::SubgraphName { subgraph_name } if *subgraph_name => connector_info - .ok() - .flatten() - .map(|info| info.subgraph_name.clone()) - .map(opentelemetry::Value::from), - ConnectorSelector::ConnectorSource { .. } => connector_info - .ok() - .flatten() - .and_then(|info| info.source_name.clone()) - .map(opentelemetry::Value::from), - ConnectorSelector::ConnectorHttpMethod { - connector_http_method, - } if *connector_http_method => connector_info - .ok() - .flatten() - .map(|info| info.http_method.clone()) - .map(opentelemetry::Value::from), - ConnectorSelector::ConnectorUrlTemplate { - connector_url_template, - } if *connector_url_template => connector_info - .ok() - .flatten() - .map(|info| info.url_template.clone()) - .map(opentelemetry::Value::from), ConnectorSelector::ConnectorResponseHeader { connector_http_response_header: connector_response_header, default, .. - } => response - .http_response - .headers() - .get(connector_response_header) - .and_then(|h| Some(h.to_str().ok()?.to_string())) - .or_else(|| default.clone()) - .map(opentelemetry::Value::from), + } => { + if let Ok(TransportResponse::Http(ref http_response)) = response.transport_result { + http_response + .inner + .headers + .get(connector_response_header) + .and_then(|h| Some(h.to_str().ok()?.to_string())) + .or_else(|| default.clone()) + .map(opentelemetry::Value::from) + } else { + None + } + } ConnectorSelector::ConnectorResponseStatus { connector_http_response_status: response_status, - } => match response_status { - ResponseStatus::Code => { - Some(Value::I64(response.http_response.status().as_u16() as i64)) + } => { + if let Ok(TransportResponse::Http(ref http_response)) = response.transport_result { + let status = http_response.inner.status; + match response_status { + ResponseStatus::Code => Some(Value::I64(status.as_u16() as i64)), + ResponseStatus::Reason => { + status.canonical_reason().map(|reason| reason.into()) + } + } + } else { + None } - ResponseStatus::Reason => response - .http_response - .status() - .canonical_reason() - .map(|reason| reason.into()), - }, - ConnectorSelector::StaticField { r#static } => Some(r#static.clone().into()), + } + ConnectorSelector::ResponseMappingProblems { + connector_response_mapping_problems: mapping_problems, + } => { + if let MappedResponse::Data { ref problems, .. } = response.mapped_response { + match mapping_problems { + MappingProblems::Problems => Some(Value::Array(Array::String( + problems + .iter() + .filter_map(|problem| { + serde_json::to_string(problem).ok().map(StringValue::from) + }) + .collect(), + ))), + MappingProblems::Count => Some(Value::I64( + problems.iter().map(|problem| problem.count as i64).sum(), + )), + } + } else { + None + } + } _ => None, } } - fn on_error(&self, error: &BoxError, ctx: &Context) -> Option { - let connector_info = ctx.get::<&str, ConnectorInfo>(CONNECTOR_INFO_CONTEXT_KEY); + fn on_error(&self, error: &BoxError, _: &Context) -> Option { match self { - ConnectorSelector::SubgraphName { subgraph_name } if *subgraph_name => connector_info - .ok() - .flatten() - .map(|info| info.subgraph_name.clone()) - .map(opentelemetry::Value::from), - ConnectorSelector::ConnectorSource { .. } => connector_info - .ok() - .flatten() - .and_then(|info| info.source_name.clone()) - .map(opentelemetry::Value::from), - ConnectorSelector::ConnectorHttpMethod { - connector_http_method, - } if *connector_http_method => connector_info - .ok() - .flatten() - .map(|info| info.http_method.clone()) - .map(opentelemetry::Value::from), - ConnectorSelector::ConnectorUrlTemplate { - connector_url_template, - } if *connector_url_template => connector_info - .ok() - .flatten() - .map(|info| info.url_template.clone()) - .map(opentelemetry::Value::from), ConnectorSelector::Error { .. } => Some(error.to_string().into()), ConnectorSelector::StaticField { r#static } => Some(r#static.clone().into()), _ => None, @@ -250,6 +262,7 @@ impl Selector for ConnectorSelector { | ConnectorSelector::ConnectorHttpMethod { .. } | ConnectorSelector::ConnectorUrlTemplate { .. } | ConnectorSelector::StaticField { .. } + | ConnectorSelector::RequestMappingProblems { .. } ), Stage::Response => matches!( self, @@ -260,6 +273,7 @@ impl Selector for ConnectorSelector { | ConnectorSelector::ConnectorHttpMethod { .. } | ConnectorSelector::ConnectorUrlTemplate { .. } | ConnectorSelector::StaticField { .. } + | ConnectorSelector::ResponseMappingProblems { .. } ), Stage::ResponseEvent => false, Stage::ResponseField => false, @@ -279,20 +293,38 @@ impl Selector for ConnectorSelector { #[cfg(test)] mod tests { + use std::str::FromStr; + use std::sync::Arc; + + use apollo_compiler::name; + use apollo_federation::sources::connect::ConnectId; + use apollo_federation::sources::connect::ConnectSpec; + use apollo_federation::sources::connect::Connector; use apollo_federation::sources::connect::HTTPMethod; + use apollo_federation::sources::connect::HttpJsonTransport; + use apollo_federation::sources::connect::JSONSelection; + use apollo_federation::sources::connect::URLTemplate; + use http::HeaderValue; use http::StatusCode; + use opentelemetry::Array; + use opentelemetry::StringValue; + use opentelemetry::Value; use super::ConnectorSelector; use super::ConnectorSource; - use crate::plugins::telemetry::config_new::connector::ConnectorRequest; - use crate::plugins::telemetry::config_new::connector::ConnectorResponse; + use super::MappingProblems; + use crate::plugins::connectors::handle_responses::MappedResponse; + use crate::plugins::connectors::make_requests::ResponseKey; + use crate::plugins::connectors::mapping::Problem; use crate::plugins::telemetry::config_new::selectors::ResponseStatus; use crate::plugins::telemetry::config_new::Selector; - use crate::services::connector_service::ConnectorInfo; - use crate::services::connector_service::CONNECTOR_INFO_CONTEXT_KEY; - use crate::services::http::HttpRequest; - use crate::services::http::HttpResponse; + use crate::services::connector::request_service::transport; + use crate::services::connector::request_service::Request; + use crate::services::connector::request_service::Response; + use crate::services::connector::request_service::TransportRequest; + use crate::services::connector::request_service::TransportResponse; use crate::services::router::body; + use crate::services::router::body::RouterBody; use crate::Context; const TEST_SUBGRAPH_NAME: &str = "test_subgraph_name"; @@ -302,61 +334,165 @@ mod tests { const TEST_HEADER_VALUE: &str = "test_header_value"; const TEST_STATIC: &str = "test_static"; - fn connector_info() -> ConnectorInfo { - ConnectorInfo { - subgraph_name: TEST_SUBGRAPH_NAME.to_string(), - source_name: Some(TEST_SOURCE_NAME.to_string()), - http_method: HTTPMethod::Get.as_str().to_string(), - url_template: TEST_URL_TEMPLATE.to_string(), - } + fn context() -> Context { + Context::default() } - fn context(connector_info: ConnectorInfo) -> Context { - let context = Context::default(); - context - .insert(CONNECTOR_INFO_CONTEXT_KEY, connector_info) - .unwrap(); - context + fn connector() -> Connector { + Connector { + id: ConnectId::new( + TEST_SUBGRAPH_NAME.into(), + Some(TEST_SOURCE_NAME.into()), + name!(Query), + name!(users), + 0, + "label", + ), + transport: HttpJsonTransport { + source_url: None, + connect_template: URLTemplate::from_str(TEST_URL_TEMPLATE).unwrap(), + method: HTTPMethod::Get, + headers: Default::default(), + body: None, + }, + selection: JSONSelection::empty(), + config: None, + max_requests: None, + entity_resolver: None, + spec: ConnectSpec::V0_1, + request_variables: Default::default(), + response_variables: Default::default(), + } } - fn http_request(context: Context) -> ConnectorRequest { - HttpRequest { - http_request: http::Request::builder().body(body::empty()).unwrap(), - context, + fn response_key() -> ResponseKey { + ResponseKey::RootField { + name: "hello".to_string(), + inputs: Default::default(), + selection: Arc::new(JSONSelection::parse("$.data").unwrap()), } } - fn http_request_with_header(context: Context) -> ConnectorRequest { - HttpRequest { - http_request: http::Request::builder() - .header(TEST_HEADER_NAME, TEST_HEADER_VALUE) - .body(body::empty()) - .unwrap(), - context, + fn http_request() -> http::Request { + http::Request::builder().body(body::empty()).unwrap() + } + + fn http_request_with_header() -> http::Request { + let mut http_request = http::Request::builder().body(body::empty()).unwrap(); + http_request.headers_mut().insert( + TEST_HEADER_NAME, + HeaderValue::from_static(TEST_HEADER_VALUE), + ); + http_request + } + + fn connector_request(http_request: http::Request) -> Request { + connector_request_with_mapping_problems(http_request, vec![]) + } + + fn connector_request_with_mapping_problems( + http_request: http::Request, + mapping_problems: Vec, + ) -> Request { + Request { + context: context(), + connector: Arc::new(connector()), + service_name: Default::default(), + transport_request: TransportRequest::Http(transport::http::HttpRequest { + inner: http_request, + debug: None, + }), + key: response_key(), + mapping_problems, } } - fn http_response(context: Context, status_code: StatusCode) -> ConnectorResponse { - HttpResponse { - http_response: http::Response::builder() - .status(status_code) - .body(body::empty()) - .unwrap(), - context, + fn connector_response(status_code: StatusCode) -> Response { + connector_response_with_mapping_problems(status_code, vec![]) + } + + fn connector_response_with_mapping_problems( + status_code: StatusCode, + mapping_problems: Vec, + ) -> Response { + Response { + context: context(), + connector: connector().into(), + transport_result: Ok(TransportResponse::Http(transport::http::HttpResponse { + inner: http::Response::builder() + .status(status_code) + .body(body::empty()) + .expect("expecting valid response") + .into_parts() + .0, + })), + mapped_response: MappedResponse::Data { + data: serde_json::json!({}) + .try_into() + .expect("expecting valid JSON"), + key: response_key(), + problems: mapping_problems, + }, } } - fn http_response_with_header(context: Context, status_code: StatusCode) -> ConnectorResponse { - HttpResponse { - http_response: http::Response::builder() - .status(status_code) - .header(TEST_HEADER_NAME, TEST_HEADER_VALUE) - .body(body::empty()) - .unwrap(), - context, + fn connector_response_with_header() -> Response { + Response { + context: context(), + connector: connector().into(), + transport_result: Ok(TransportResponse::Http(transport::http::HttpResponse { + inner: http::Response::builder() + .status(200) + .header(TEST_HEADER_NAME, TEST_HEADER_VALUE) + .body(body::empty()) + .expect("expecting valid response") + .into_parts() + .0, + })), + mapped_response: MappedResponse::Data { + data: serde_json::json!({}) + .try_into() + .expect("expecting valid JSON"), + key: response_key(), + problems: vec![], + }, } } + fn mapping_problems() -> Vec { + vec![ + Problem { + count: 1, + message: "error message".to_string(), + path: "@.id".to_string(), + }, + Problem { + count: 2, + message: "warn message".to_string(), + path: "@.id".to_string(), + }, + Problem { + count: 3, + message: "info message".to_string(), + path: "@.id".to_string(), + }, + ] + } + + fn mapping_problem_array() -> Value { + Value::Array(Array::String(vec![ + StringValue::from(String::from( + "{\"message\":\"error message\",\"path\":\"@.id\",\"count\":1}", + )), + StringValue::from(String::from( + "{\"message\":\"warn message\",\"path\":\"@.id\",\"count\":2}", + )), + StringValue::from(String::from( + "{\"message\":\"info message\",\"path\":\"@.id\",\"count\":3}", + )), + ])) + } + #[test] fn connector_on_request_static_field() { let selector = ConnectorSelector::StaticField { @@ -364,7 +500,7 @@ mod tests { }; assert_eq!( Some(TEST_STATIC.into()), - selector.on_request(&http_request(context(connector_info()))) + selector.on_request(&connector_request(http_request())) ); } @@ -375,7 +511,7 @@ mod tests { }; assert_eq!( Some(TEST_SUBGRAPH_NAME.into()), - selector.on_request(&http_request(context(connector_info()))) + selector.on_request(&connector_request(http_request())) ); } @@ -386,7 +522,7 @@ mod tests { }; assert_eq!( Some(TEST_SOURCE_NAME.into()), - selector.on_request(&http_request(context(connector_info()))) + selector.on_request(&connector_request(http_request())) ); } @@ -397,7 +533,7 @@ mod tests { }; assert_eq!( Some(TEST_URL_TEMPLATE.into()), - selector.on_request(&http_request(context(connector_info()))) + selector.on_request(&connector_request(http_request())) ); } @@ -410,7 +546,7 @@ mod tests { }; assert_eq!( Some("defaulted".into()), - selector.on_request(&http_request(context(connector_info()))) + selector.on_request(&connector_request(http_request())) ); } @@ -423,127 +559,166 @@ mod tests { }; assert_eq!( Some(TEST_HEADER_VALUE.into()), - selector.on_request(&http_request_with_header(context(connector_info()))) + selector.on_request(&connector_request(http_request_with_header())) ); } #[test] - fn connector_on_response_static_field() { - let selector = ConnectorSelector::StaticField { - r#static: TEST_STATIC.into(), + fn connector_on_response_header_defaulted() { + let selector = ConnectorSelector::ConnectorResponseHeader { + connector_http_response_header: TEST_HEADER_NAME.to_string(), + redact: None, + default: Some("defaulted".into()), }; assert_eq!( - Some(TEST_STATIC.into()), - selector.on_response(&http_response(context(connector_info()), StatusCode::OK)) + Some("defaulted".into()), + selector.on_response(&connector_response(StatusCode::OK)) ); } #[test] - fn connector_on_response_subgraph_name() { - let selector = ConnectorSelector::SubgraphName { - subgraph_name: true, + fn connector_on_response_header_with_value() { + let selector = ConnectorSelector::ConnectorResponseHeader { + connector_http_response_header: TEST_HEADER_NAME.to_string(), + redact: None, + default: None, }; assert_eq!( - Some(TEST_SUBGRAPH_NAME.into()), - selector.on_response(&http_response(context(connector_info()), StatusCode::OK)) + Some(TEST_HEADER_VALUE.into()), + selector.on_response(&connector_response_with_header()) ); } #[test] - fn connector_on_response_connector_source() { - let selector = ConnectorSelector::ConnectorSource { - connector_source: ConnectorSource::Name, + fn connector_on_response_status_code() { + let selector = ConnectorSelector::ConnectorResponseStatus { + connector_http_response_status: ResponseStatus::Code, }; assert_eq!( - Some(TEST_SOURCE_NAME.into()), - selector.on_response(&http_response(context(connector_info()), StatusCode::OK)) + Some(200.into()), + selector.on_response(&connector_response(StatusCode::OK)) ); } #[test] - fn connector_on_response_url_template() { - let selector = ConnectorSelector::ConnectorUrlTemplate { - connector_url_template: true, + fn connector_on_response_status_reason_ok() { + let selector = ConnectorSelector::ConnectorResponseStatus { + connector_http_response_status: ResponseStatus::Reason, }; assert_eq!( - Some(TEST_URL_TEMPLATE.into()), - selector.on_response(&http_response(context(connector_info()), StatusCode::OK)) + Some("OK".into()), + selector.on_response(&connector_response(StatusCode::OK)) ); } #[test] - fn connector_on_response_header_defaulted() { - let selector = ConnectorSelector::ConnectorResponseHeader { - connector_http_response_header: TEST_HEADER_NAME.to_string(), - redact: None, - default: Some("defaulted".into()), + fn connector_on_response_status_code_not_found() { + let selector = ConnectorSelector::ConnectorResponseStatus { + connector_http_response_status: ResponseStatus::Reason, }; assert_eq!( - Some("defaulted".into()), - selector.on_response(&http_response(context(connector_info()), StatusCode::OK)) + Some("Not Found".into()), + selector.on_response(&connector_response(StatusCode::NOT_FOUND)) ); } #[test] - fn connector_on_response_header_with_value() { - let selector = ConnectorSelector::ConnectorResponseHeader { - connector_http_response_header: TEST_HEADER_NAME.to_string(), - redact: None, - default: None, + fn connector_on_request_mapping_problems_none() { + let selector = ConnectorSelector::RequestMappingProblems { + connector_request_mapping_problems: MappingProblems::Problems, }; assert_eq!( - Some(TEST_HEADER_VALUE.into()), - selector.on_response(&http_response_with_header( - context(connector_info()), - StatusCode::OK + Some(Value::Array(Array::String(vec![]))), + selector.on_request(&connector_request(http_request())) + ); + } + + #[test] + fn connector_on_request_mapping_problems_count_zero() { + let selector = ConnectorSelector::RequestMappingProblems { + connector_request_mapping_problems: MappingProblems::Count, + }; + assert_eq!( + Some(0.into()), + selector.on_request(&connector_request(http_request())) + ); + } + + #[test] + fn connector_on_request_mapping_problems() { + let selector = ConnectorSelector::RequestMappingProblems { + connector_request_mapping_problems: MappingProblems::Problems, + }; + assert_eq!( + Some(mapping_problem_array()), + selector.on_request(&connector_request_with_mapping_problems( + http_request(), + mapping_problems() )) ); } #[test] - fn connector_on_response_status_code() { - let selector = ConnectorSelector::ConnectorResponseStatus { - connector_http_response_status: ResponseStatus::Code, + fn connector_on_request_mapping_problems_count() { + let selector = ConnectorSelector::RequestMappingProblems { + connector_request_mapping_problems: MappingProblems::Count, }; assert_eq!( - Some(200.into()), - selector.on_response(&http_response(context(connector_info()), StatusCode::OK)) + Some(6.into()), + selector.on_request(&connector_request_with_mapping_problems( + http_request(), + mapping_problems() + )) ); } #[test] - fn connector_on_response_status_reason_ok() { - let selector = ConnectorSelector::ConnectorResponseStatus { - connector_http_response_status: ResponseStatus::Reason, + fn connector_on_response_mapping_problems_none() { + let selector = ConnectorSelector::ResponseMappingProblems { + connector_response_mapping_problems: MappingProblems::Problems, }; assert_eq!( - Some("OK".into()), - selector.on_response(&http_response(context(connector_info()), StatusCode::OK)) + Some(Value::Array(Array::String(vec![]))), + selector.on_response(&connector_response(StatusCode::OK)) ); } #[test] - fn connector_on_response_status_code_not_found() { - let selector = ConnectorSelector::ConnectorResponseStatus { - connector_http_response_status: ResponseStatus::Reason, + fn connector_on_response_mapping_problems_count_zero() { + let selector = ConnectorSelector::ResponseMappingProblems { + connector_response_mapping_problems: MappingProblems::Count, }; assert_eq!( - Some("Not Found".into()), - selector.on_response(&http_response( - context(connector_info()), - StatusCode::NOT_FOUND + Some(0.into()), + selector.on_response(&connector_response(StatusCode::OK)) + ); + } + + #[test] + fn connector_on_response_mapping_problems() { + let selector = ConnectorSelector::ResponseMappingProblems { + connector_response_mapping_problems: MappingProblems::Problems, + }; + assert_eq!( + Some(mapping_problem_array()), + selector.on_response(&connector_response_with_mapping_problems( + StatusCode::OK, + mapping_problems() )) ); } #[test] - fn connector_on_response_http_method() { - let selector = ConnectorSelector::ConnectorHttpMethod { - connector_http_method: true, + fn connector_on_response_mapping_problems_count() { + let selector = ConnectorSelector::ResponseMappingProblems { + connector_response_mapping_problems: MappingProblems::Count, }; assert_eq!( - Some(HTTPMethod::Get.as_str().into()), - selector.on_response(&http_response(context(connector_info()), StatusCode::OK)) + Some(6.into()), + selector.on_response(&connector_response_with_mapping_problems( + StatusCode::OK, + mapping_problems() + )) ); } diff --git a/apollo-router/src/plugins/telemetry/config_new/events.rs b/apollo-router/src/plugins/telemetry/config_new/events.rs index 2887a4bf49..7ea219bf2a 100644 --- a/apollo-router/src/plugins/telemetry/config_new/events.rs +++ b/apollo-router/src/plugins/telemetry/config_new/events.rs @@ -838,7 +838,16 @@ pub(crate) fn log_event(level: EventLevel, kind: &str, attributes: Vec #[cfg(test)] mod tests { + use std::str::FromStr; + + use apollo_compiler::name; + use apollo_federation::sources::connect::ConnectId; + use apollo_federation::sources::connect::ConnectSpec; + use apollo_federation::sources::connect::Connector; use apollo_federation::sources::connect::HTTPMethod; + use apollo_federation::sources::connect::HttpJsonTransport; + use apollo_federation::sources::connect::JSONSelection; + use apollo_federation::sources::connect::URLTemplate; use http::header::CONTENT_LENGTH; use http::HeaderValue; use router::body; @@ -849,12 +858,15 @@ mod tests { use crate::context::CONTAINS_GRAPHQL_ERROR; use crate::context::OPERATION_NAME; use crate::graphql; + use crate::plugins::connectors::handle_responses::MappedResponse; + use crate::plugins::connectors::make_requests::ResponseKey; use crate::plugins::telemetry::Telemetry; use crate::plugins::test::PluginTestHarness; - use crate::services::connector_service::ConnectorInfo; - use crate::services::connector_service::CONNECTOR_INFO_CONTEXT_KEY; - use crate::services::http::HttpRequest; - use crate::services::http::HttpResponse; + use crate::services::connector::request_service::transport; + use crate::services::connector::request_service::Request; + use crate::services::connector::request_service::Response; + use crate::services::connector::request_service::TransportRequest; + use crate::services::connector::request_service::TransportResponse; #[tokio::test(flavor = "multi_thread")] async fn test_router_events() { @@ -1188,36 +1200,73 @@ mod tests { .await; async { - let connector_info = ConnectorInfo { - subgraph_name: "subgraph".to_string(), - source_name: Some("source".to_string()), - http_method: HTTPMethod::Get.as_str().to_string(), - url_template: "/test".to_string(), - }; - let context = Context::default(); - context - .insert(CONNECTOR_INFO_CONTEXT_KEY, connector_info) - .unwrap(); + let context = crate::Context::default(); let mut http_request = http::Request::builder().body(body::empty()).unwrap(); http_request .headers_mut() .insert("x-log-request", HeaderValue::from_static("log")); - let http_request = HttpRequest { - http_request, + let transport_request = TransportRequest::Http(transport::http::HttpRequest { + inner: http_request, + debug: None, + }); + let connector = Connector { + id: ConnectId::new( + "subgraph".into(), + Some("source".into()), + name!(Query), + name!(users), + 0, + "label", + ), + transport: HttpJsonTransport { + source_url: None, + connect_template: URLTemplate::from_str("/test").unwrap(), + method: HTTPMethod::Get, + headers: Default::default(), + body: None, + }, + selection: JSONSelection::empty(), + config: None, + max_requests: None, + entity_resolver: None, + spec: ConnectSpec::V0_1, + request_variables: Default::default(), + response_variables: Default::default(), + }; + let response_key = ResponseKey::RootField { + name: "hello".to_string(), + inputs: Default::default(), + selection: Arc::new(JSONSelection::parse("$.data").unwrap()), + }; + let connector_request = Request { context: context.clone(), + connector: Arc::new(connector.clone()), + service_name: Default::default(), + transport_request, + key: response_key.clone(), + mapping_problems: vec![], }; test_harness - .http_client_service("subgraph", |http_request| async move { - Ok(HttpResponse { - http_response: http::Response::builder() + .call_connector_request_service(connector_request, |request| Response { + context: request.context.clone(), + connector: request.connector.clone(), + transport_result: Ok(TransportResponse::Http(transport::http::HttpResponse { + inner: http::Response::builder() .status(200) .header("x-log-request", HeaderValue::from_static("log")) .body(body::empty()) - .expect("expecting valid response"), - context: http_request.context.clone(), - }) + .expect("expecting valid response") + .into_parts() + .0, + })), + mapped_response: MappedResponse::Data { + data: serde_json::json!({}) + .try_into() + .expect("expecting valid JSON"), + key: request.key.clone(), + problems: vec![], + }, }) - .call(http_request) .await .expect("expecting successful response"); } @@ -1233,36 +1282,73 @@ mod tests { .await; async { - let connector_info = ConnectorInfo { - subgraph_name: "subgraph".to_string(), - source_name: Some("source".to_string()), - http_method: HTTPMethod::Get.as_str().to_string(), - url_template: "/test".to_string(), - }; - let context = Context::default(); - context - .insert(CONNECTOR_INFO_CONTEXT_KEY, connector_info) - .unwrap(); + let context = crate::Context::default(); let mut http_request = http::Request::builder().body(body::empty()).unwrap(); http_request .headers_mut() .insert("x-log-response", HeaderValue::from_static("log")); - let http_request = HttpRequest { - http_request, + let transport_request = TransportRequest::Http(transport::http::HttpRequest { + inner: http_request, + debug: None, + }); + let connector = Connector { + id: ConnectId::new( + "subgraph".into(), + Some("source".into()), + name!(Query), + name!(users), + 0, + "label", + ), + transport: HttpJsonTransport { + source_url: None, + connect_template: URLTemplate::from_str("/test").unwrap(), + method: HTTPMethod::Get, + headers: Default::default(), + body: None, + }, + selection: JSONSelection::empty(), + config: None, + max_requests: None, + entity_resolver: None, + spec: ConnectSpec::V0_1, + request_variables: Default::default(), + response_variables: Default::default(), + }; + let response_key = ResponseKey::RootField { + name: "hello".to_string(), + inputs: Default::default(), + selection: Arc::new(JSONSelection::parse("$.data").unwrap()), + }; + let connector_request = Request { context: context.clone(), + connector: Arc::new(connector.clone()), + service_name: Default::default(), + transport_request, + key: response_key.clone(), + mapping_problems: vec![], }; test_harness - .http_client_service("subgraph", |http_request| async move { - Ok(HttpResponse { - http_response: http::Response::builder() + .call_connector_request_service(connector_request, |request| Response { + context: request.context.clone(), + connector: request.connector.clone(), + transport_result: Ok(TransportResponse::Http(transport::http::HttpResponse { + inner: http::Response::builder() .status(200) .header("x-log-response", HeaderValue::from_static("log")) .body(body::empty()) - .expect("expecting valid response"), - context: http_request.context.clone(), - }) + .expect("expecting valid response") + .into_parts() + .0, + })), + mapped_response: MappedResponse::Data { + data: serde_json::json!({}) + .try_into() + .expect("expecting valid JSON"), + key: request.key.clone(), + problems: vec![], + }, }) - .call(http_request) .await .expect("expecting successful response"); } diff --git a/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/custom_counter_with_conditions/test.yaml b/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/custom_counter_with_conditions/test.yaml index 37f88836a0..07f3455092 100644 --- a/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/custom_counter_with_conditions/test.yaml +++ b/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/custom_counter_with_conditions/test.yaml @@ -1,23 +1,30 @@ description: Custom counter with conditions events: - - - context: - map: - "apollo_router::connector::info": - subgraph_name: users - source_name: user_api - http_method: GET - url_template: "/user/{$this.userid}" - - http_request: + - - connector_request: + subgraph_name: users + source_name: user_api + http_method: GET + url_template: "/user/{$this.userid}" uri: "/user/1" - method: GET - - http_response: + - connector_response: + subgraph_name: users + source_name: user_api + http_method: GET + url_template: "/user/{$this.userid}" status: 200 body: | { "username": "foo" } - - http_request: + - connector_request: + subgraph_name: users + source_name: user_api + http_method: GET + url_template: "/user/{$this.userid}" uri: "/user/1" - method: GET - - http_response: + - connector_response: + subgraph_name: users + source_name: user_api + http_method: GET + url_template: "/user/{$this.userid}" status: 404 body: | { "error": "not found" } diff --git a/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/custom_histogram/test.yaml b/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/custom_histogram/test.yaml index ec048a9937..bf1a127429 100644 --- a/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/custom_histogram/test.yaml +++ b/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/custom_histogram/test.yaml @@ -1,25 +1,32 @@ description: Both subgraph and connector HTTP client duration metrics events: - - - context: - map: - "apollo_router::connector::info": - subgraph_name: users - source_name: user_api - http_method: GET - url_template: "/users" - - http_request: + - - connector_request: + subgraph_name: users + source_name: user_api + http_method: GET + url_template: "/users" uri: "/users" - method: GET - - http_response: + - connector_response: + subgraph_name: users + source_name: user_api + http_method: GET + url_template: "/users" status: 200 headers: x-ratelimit-remaining: 999 body: | { "username": "foo" } - - http_request: + - connector_request: + subgraph_name: users + source_name: user_api + http_method: GET + url_template: "/users" uri: "/users" - method: GET - - http_response: + - connector_response: + subgraph_name: users + source_name: user_api + http_method: GET + url_template: "/users" status: 200 headers: x-ratelimit-remaining: 500 diff --git a/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/http_client_request_duration/test.yaml b/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/http_client_request_duration/test.yaml index 4673c57c64..48987a1e11 100644 --- a/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/http_client_request_duration/test.yaml +++ b/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/http_client_request_duration/test.yaml @@ -1,18 +1,18 @@ description: Connector HTTP client duration metric events: - - - context: - map: - "apollo_router::connector::info": - subgraph_name: posts - source_name: posts_api - http_method: GET - url_template: "/posts" - - http_request: + - - connector_request: + subgraph_name: posts + source_name: posts_api + http_method: GET + url_template: "/posts" uri: "/posts" - method: GET headers: custom_request_header: custom_request_header_value - - http_response: + - connector_response: + subgraph_name: posts + source_name: posts_api + http_method: GET + url_template: "/posts" status: 200 body: | { "foo": "bar" } diff --git a/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/mapping_problems/metrics.snap b/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/mapping_problems/metrics.snap new file mode 100644 index 0000000000..23c079705b --- /dev/null +++ b/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/mapping_problems/metrics.snap @@ -0,0 +1,45 @@ +--- +source: apollo-router/src/plugins/telemetry/config_new/instruments.rs +description: Both subgraph and connector HTTP client duration metrics +expression: "&metrics.all()" +info: + telemetry: + instrumentation: + instruments: + default_requirement_level: none + connector: + request.mapping.problems: + description: Count of connectors request mapping problems + value: + connector_request_mapping_problems: count + attributes: + connector.source: + connector_source: name + unit: count + type: counter + response.mapping.problems: + description: Count of connectors response mapping problems + value: + connector_response_mapping_problems: count + attributes: + connector.source: + connector_source: name + unit: count + type: counter +--- +- name: request.mapping.problems + description: Count of connectors request mapping problems + unit: count + data: + datapoints: + - value: 15 + attributes: + connector.source: user_api +- name: response.mapping.problems + description: Count of connectors response mapping problems + unit: count + data: + datapoints: + - value: 5 + attributes: + connector.source: user_api diff --git a/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/mapping_problems/router.yaml b/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/mapping_problems/router.yaml new file mode 100644 index 0000000000..4404da4815 --- /dev/null +++ b/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/mapping_problems/router.yaml @@ -0,0 +1,23 @@ +telemetry: + instrumentation: + instruments: + default_requirement_level: none + connector: + request.mapping.problems: + description: "Count of connectors request mapping problems" + value: + connector_request_mapping_problems: count + attributes: + connector.source: + connector_source: name + unit: count + type: counter + response.mapping.problems: + description: "Count of connectors response mapping problems" + value: + connector_response_mapping_problems: count + attributes: + connector.source: + connector_source: name + unit: count + type: counter \ No newline at end of file diff --git a/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/mapping_problems/test.yaml b/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/mapping_problems/test.yaml new file mode 100644 index 0000000000..ab010c94a1 --- /dev/null +++ b/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/mapping_problems/test.yaml @@ -0,0 +1,58 @@ +description: Both subgraph and connector HTTP client duration metrics +events: + - - connector_request: + subgraph_name: users + source_name: user_api + http_method: GET + url_template: "/users" + uri: "/users" + mapping_problems: + - level: Warn + count: 3 + message: "request warn message" + path: "@.id" + - level: Info + count: 7 + message: "request info message" + path: "@.id" + - connector_response: + subgraph_name: users + source_name: user_api + http_method: GET + url_template: "/users" + status: 200 + body: | + { "username": "foo" } + mapping_problems: + - level: Info + count: 3 + message: "response info message" + path: "@.id" + - connector_request: + subgraph_name: users + source_name: user_api + http_method: GET + url_template: "/users" + uri: "/users" + mapping_problems: + - level: Info + count: 5 + message: "request info message" + path: "@.id" + - connector_response: + subgraph_name: users + source_name: user_api + http_method: GET + url_template: "/users" + status: 200 + body: | + { "username": "foo" } + mapping_problems: + - level: Warn + count: 1 + message: "response warn message" + path: "@.id" + - level: Error + count: 1 + message: "response error message" + path: "@.id" \ No newline at end of file diff --git a/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/subgraph_and_connector/test.yaml b/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/subgraph_and_connector/test.yaml index 62c554512a..bb03ee1b69 100644 --- a/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/subgraph_and_connector/test.yaml +++ b/apollo-router/src/plugins/telemetry/config_new/fixtures/connector/subgraph_and_connector/test.yaml @@ -18,17 +18,17 @@ events: status: 200 data: hello: "world" - - context: - map: - "apollo_router::connector::info": - subgraph_name: reviews - source_name: reviews_api - http_method: GET - url_template: "/reviews" - - http_request: + - connector_request: + subgraph_name: reviews + source_name: reviews_api + http_method: GET + url_template: "/reviews" uri: "/reviews" - method: GET - - http_response: + - connector_response: + subgraph_name: reviews + source_name: reviews_api + http_method: GET + url_template: "/reviews" status: 200 body: | { "foo": "bar" } diff --git a/apollo-router/src/plugins/telemetry/config_new/fixtures/schema.json b/apollo-router/src/plugins/telemetry/config_new/fixtures/schema.json index ecc1409f59..7c3f398bd8 100644 --- a/apollo-router/src/plugins/telemetry/config_new/fixtures/schema.json +++ b/apollo-router/src/plugins/telemetry/config_new/fixtures/schema.json @@ -447,14 +447,17 @@ { "type": "object", "required": [ - "http_request" + "connector_request" ], "properties": { - "http_request": { + "connector_request": { "type": "object", "required": [ - "method", - "uri" + "http_method", + "source_name", + "subgraph_name", + "uri", + "url_template" ], "properties": { "body": { @@ -470,11 +473,27 @@ "type": "string" } }, - "method": { + "http_method": { + "type": "string" + }, + "mapping_problems": { + "default": [], + "type": "array", + "items": { + "$ref": "#/definitions/Problem" + } + }, + "source_name": { + "type": "string" + }, + "subgraph_name": { "type": "string" }, "uri": { "type": "string" + }, + "url_template": { + "type": "string" } }, "additionalProperties": false @@ -485,14 +504,18 @@ { "type": "object", "required": [ - "http_response" + "connector_response" ], "properties": { - "http_response": { + "connector_response": { "type": "object", "required": [ "body", - "status" + "http_method", + "source_name", + "status", + "subgraph_name", + "url_template" ], "properties": { "body": { @@ -505,10 +528,29 @@ "type": "string" } }, + "http_method": { + "type": "string" + }, + "mapping_problems": { + "default": [], + "type": "array", + "items": { + "$ref": "#/definitions/Problem" + } + }, + "source_name": { + "type": "string" + }, "status": { "type": "integer", "format": "uint16", "minimum": 0.0 + }, + "subgraph_name": { + "type": "string" + }, + "url_template": { + "type": "string" } }, "additionalProperties": false @@ -527,6 +569,28 @@ "subscription" ] }, + "Problem": { + "description": "A mapping problem", + "type": "object", + "required": [ + "count", + "message", + "path" + ], + "properties": { + "count": { + "type": "integer", + "format": "uint", + "minimum": 0.0 + }, + "message": { + "type": "string" + }, + "path": { + "type": "string" + } + } + }, "TypedValueMirror": { "oneOf": [ { diff --git a/apollo-router/src/plugins/telemetry/config_new/instruments.rs b/apollo-router/src/plugins/telemetry/config_new/instruments.rs index 5b1e5911de..c47d11ac87 100644 --- a/apollo-router/src/plugins/telemetry/config_new/instruments.rs +++ b/apollo-router/src/plugins/telemetry/config_new/instruments.rs @@ -2622,7 +2622,15 @@ mod tests { use apollo_compiler::ast::NamedType; use apollo_compiler::executable::SelectionSet; + use apollo_compiler::name; use apollo_compiler::Name; + use apollo_federation::sources::connect::ConnectId; + use apollo_federation::sources::connect::ConnectSpec; + use apollo_federation::sources::connect::Connector; + use apollo_federation::sources::connect::HTTPMethod; + use apollo_federation::sources::connect::HttpJsonTransport; + use apollo_federation::sources::connect::JSONSelection; + use apollo_federation::sources::connect::URLTemplate; use http::HeaderMap; use http::HeaderName; use http::Method; @@ -2646,6 +2654,9 @@ mod tests { use crate::http_ext::TryIntoHeaderValue; use crate::json_ext::Path; use crate::metrics::FutureMetricsExt; + use crate::plugins::connectors::handle_responses::MappedResponse; + use crate::plugins::connectors::make_requests::ResponseKey; + use crate::plugins::connectors::mapping::Problem; use crate::plugins::telemetry::config_new::cache::CacheInstruments; use crate::plugins::telemetry::config_new::graphql::GraphQLInstruments; use crate::plugins::telemetry::config_new::instruments::Instrumented; @@ -2654,8 +2665,11 @@ mod tests { use crate::plugins::telemetry::APOLLO_PRIVATE_QUERY_DEPTH; use crate::plugins::telemetry::APOLLO_PRIVATE_QUERY_HEIGHT; use crate::plugins::telemetry::APOLLO_PRIVATE_QUERY_ROOT_FIELDS; - use crate::services::http::HttpRequest; - use crate::services::http::HttpResponse; + use crate::services::connector::request_service::transport; + use crate::services::connector::request_service::Request; + use crate::services::connector::request_service::Response; + use crate::services::connector::request_service::TransportRequest; + use crate::services::connector::request_service::TransportResponse; use crate::services::OperationKind; use crate::services::RouterRequest; use crate::services::RouterResponse; @@ -2765,18 +2779,29 @@ mod tests { ResponseField { typed_value: TypedValueMirror, }, - HttpRequest { - method: String, + ConnectorRequest { + subgraph_name: String, + source_name: String, + http_method: String, + url_template: String, uri: String, #[serde(default)] headers: HashMap, body: Option, + #[serde(default)] + mapping_problems: Vec, }, - HttpResponse { + ConnectorResponse { + subgraph_name: String, + source_name: String, + http_method: String, + url_template: String, status: u16, #[serde(default)] headers: HashMap, body: String, + #[serde(default)] + mapping_problems: Vec, }, } @@ -3203,21 +3228,69 @@ mod tests { } } } - Event::HttpRequest { - method, + Event::ConnectorRequest { + subgraph_name, + source_name, + http_method, + url_template, uri, headers, body, + mapping_problems, } => { let mut http_request = http::Request::builder() - .method(Method::from_str(&method).expect("method")) + .method(Method::from_str(&http_method).expect("method")) .uri(Uri::from_str(&uri).expect("uri")) .body(body.map(body::from_bytes).unwrap_or(body::empty())) .unwrap(); *http_request.headers_mut() = convert_http_headers(headers); - let request = HttpRequest { - http_request, - context: context.clone(), + let transport_request = + TransportRequest::Http(transport::http::HttpRequest { + inner: http_request, + debug: None, + }); + let connector = Connector { + id: ConnectId::new( + subgraph_name, + Some(source_name), + name!(Query), + name!(field), + 0, + "label", + ), + transport: HttpJsonTransport { + source_url: None, + connect_template: URLTemplate::from_str( + url_template.as_str(), + ) + .unwrap(), + method: HTTPMethod::from_str(http_method.as_str()) + .unwrap(), + headers: Default::default(), + body: None, + }, + selection: JSONSelection::empty(), + config: None, + max_requests: None, + entity_resolver: None, + spec: ConnectSpec::V0_1, + request_variables: Default::default(), + response_variables: Default::default(), + }; + let response_key = ResponseKey::RootField { + name: "hello".to_string(), + inputs: Default::default(), + selection: Arc::new( + JSONSelection::parse("$.data").unwrap(), + ), + }; + let request = Request { + context: Context::default(), + connector: Arc::new(connector), + service_name: Default::default(), + transport_request, + key: response_key.clone(), + mapping_problems, }; connector_instruments = Some({ let connector_instruments = config @@ -3228,23 +3301,75 @@ mod tests { connector_instruments }); } - Event::HttpResponse { + Event::ConnectorResponse { + subgraph_name, + source_name, + http_method, + url_template, status, headers, body, + mapping_problems, } => { + let connector = Connector { + id: ConnectId::new( + subgraph_name, + Some(source_name), + name!(Query), + name!(field), + 0, + "label", + ), + transport: HttpJsonTransport { + source_url: None, + connect_template: URLTemplate::from_str( + url_template.as_str(), + ) + .unwrap(), + method: HTTPMethod::from_str(http_method.as_str()) + .unwrap(), + headers: Default::default(), + body: None, + }, + selection: JSONSelection::empty(), + config: None, + max_requests: None, + entity_resolver: None, + spec: ConnectSpec::V0_1, + request_variables: Default::default(), + response_variables: Default::default(), + }; + let response_key = ResponseKey::RootField { + name: "hello".to_string(), + inputs: Default::default(), + selection: Arc::new( + JSONSelection::parse("$.data").unwrap(), + ), + }; let mut http_response = http::Response::builder() .status(StatusCode::from_u16(status).expect("status")) .body(router::body::from_bytes(body)) .unwrap(); *http_response.headers_mut() = convert_http_headers(headers); - let response = HttpResponse { - http_response, - context: context.clone(), + let response = Response { + context: Context::default(), + connector: connector.into(), + transport_result: Ok(TransportResponse::Http( + transport::http::HttpResponse { + inner: http_response.into_parts().0, + }, + )), + mapped_response: MappedResponse::Data { + data: json!({}) + .try_into() + .expect("expecting valid JSON"), + key: response_key, + problems: mapping_problems, + }, }; connector_instruments .take() - .expect("http request must have been made first") + .expect("connector request must have been made first") .on_response(&response); } } diff --git a/apollo-router/src/plugins/telemetry/config_new/snapshots/apollo_router__plugins__telemetry__config_new__events__tests__connector_events_request@logs.snap b/apollo-router/src/plugins/telemetry/config_new/snapshots/apollo_router__plugins__telemetry__config_new__events__tests__connector_events_request@logs.snap index 0a8dbbea06..38c8ebbdc6 100644 --- a/apollo-router/src/plugins/telemetry/config_new/snapshots/apollo_router__plugins__telemetry__config_new__events__tests__connector_events_request@logs.snap +++ b/apollo-router/src/plugins/telemetry/config_new/snapshots/apollo_router__plugins__telemetry__config_new__events__tests__connector_events_request@logs.snap @@ -2,10 +2,6 @@ source: apollo-router/src/plugins/telemetry/config_new/events.rs expression: yaml --- -- fields: - kind: connector.request - level: INFO - message: "" - fields: kind: my.request.event level: INFO diff --git a/apollo-router/src/plugins/telemetry/config_new/snapshots/apollo_router__plugins__telemetry__config_new__events__tests__connector_events_response@logs.snap b/apollo-router/src/plugins/telemetry/config_new/snapshots/apollo_router__plugins__telemetry__config_new__events__tests__connector_events_response@logs.snap index 3ff31c6764..5fc691eb53 100644 --- a/apollo-router/src/plugins/telemetry/config_new/snapshots/apollo_router__plugins__telemetry__config_new__events__tests__connector_events_response@logs.snap +++ b/apollo-router/src/plugins/telemetry/config_new/snapshots/apollo_router__plugins__telemetry__config_new__events__tests__connector_events_response@logs.snap @@ -2,10 +2,6 @@ source: apollo-router/src/plugins/telemetry/config_new/events.rs expression: yaml --- -- fields: - kind: connector.response - level: WARN - message: "" - fields: kind: my.response.event level: ERROR diff --git a/apollo-router/src/plugins/telemetry/consts.rs b/apollo-router/src/plugins/telemetry/consts.rs index dbf34f4532..ca23eb719c 100644 --- a/apollo-router/src/plugins/telemetry/consts.rs +++ b/apollo-router/src/plugins/telemetry/consts.rs @@ -20,8 +20,9 @@ pub(crate) const HTTP_REQUEST_SPAN_NAME: &str = "http_request"; pub(crate) const SUBGRAPH_REQUEST_SPAN_NAME: &str = "subgraph_request"; pub(crate) const QUERY_PARSING_SPAN_NAME: &str = "parse_query"; pub(crate) const CONNECT_SPAN_NAME: &str = "connect"; +pub(crate) const CONNECT_REQUEST_SPAN_NAME: &str = "connect_request"; -pub(crate) const BUILT_IN_SPAN_NAMES: [&str; 9] = [ +pub(crate) const BUILT_IN_SPAN_NAMES: [&str; 11] = [ REQUEST_SPAN_NAME, ROUTER_SPAN_NAME, SUPERGRAPH_SPAN_NAME, @@ -31,4 +32,6 @@ pub(crate) const BUILT_IN_SPAN_NAMES: [&str; 9] = [ QUERY_PLANNING_SPAN_NAME, EXECUTION_SPAN_NAME, QUERY_PARSING_SPAN_NAME, + CONNECT_SPAN_NAME, + CONNECT_REQUEST_SPAN_NAME, ]; diff --git a/apollo-router/src/plugins/telemetry/dynamic_attribute.rs b/apollo-router/src/plugins/telemetry/dynamic_attribute.rs index 08f9ce6d54..e9cc12c169 100644 --- a/apollo-router/src/plugins/telemetry/dynamic_attribute.rs +++ b/apollo-router/src/plugins/telemetry/dynamic_attribute.rs @@ -69,11 +69,6 @@ impl DynAttributeLayer { pub(crate) trait SpanDynAttribute { fn set_span_dyn_attribute(&self, key: Key, value: opentelemetry::Value); fn set_span_dyn_attributes(&self, attributes: impl IntoIterator); - fn set_span_dyn_attributes_for_span_name( - &self, - span_name: &str, - attributes: impl IntoIterator, - ); } impl SpanDynAttribute for ::tracing::Span { @@ -201,94 +196,6 @@ impl SpanDynAttribute for ::tracing::Span { } }); } - - fn set_span_dyn_attributes_for_span_name( - &self, - span_name: &str, - attributes: impl IntoIterator, - ) { - let mut attributes = attributes.into_iter().peekable(); - if attributes.peek().is_none() { - return; - } - self.with_subscriber(move |(id, dispatch)| { - if let Some(reg) = dispatch.downcast_ref::() { - match reg.span(id) { - None => eprintln!("no spanref, this is a bug"), - Some(mut s) => { - if s.name() != span_name { - while let Some(parent_span) = s.parent() { - if parent_span.name() == span_name { - s = parent_span; - break; - } - s = parent_span; - } - } - - if s.is_sampled() { - let mut extensions = s.extensions_mut(); - match extensions.get_mut::() { - Some(otel_data) => { - if otel_data.builder.attributes.is_none() { - otel_data.builder.attributes = Some( - attributes - .inspect(|attr| { - update_otel_data( - otel_data, - &attr.key, - &attr.value, - ) - }) - .collect(), - ); - } else { - let attributes: Vec = attributes - .inspect(|attr| { - update_otel_data(otel_data, &attr.key, &attr.value) - }) - .collect(); - otel_data - .builder - .attributes - .as_mut() - .unwrap() - .extend(attributes); - } - } - None => { - // Can't use ::tracing::error! because it could create deadlock on extensions - eprintln!("no OtelData, this is a bug"); - } - } - } else { - let mut attributes = attributes - .filter(|kv| { - !kv.key.as_str().starts_with(APOLLO_PRIVATE_PREFIX) - && !kv.key.as_str().starts_with(APOLLO_CONNECTOR_PREFIX) - }) - .peekable(); - if attributes.peek().is_none() { - return; - } - let mut extensions = s.extensions_mut(); - match extensions.get_mut::() { - Some(registered_attributes) => { - registered_attributes.extend(attributes); - } - None => { - // Can't use ::tracing::error! because it could create deadlock on extensions - eprintln!("no LogAttributes, this is a bug"); - } - } - } - } - }; - } else { - ::tracing::error!("no Registry, this is a bug"); - } - }); - } } fn update_otel_data(otel_data: &mut OtelData, key: &Key, value: &opentelemetry::Value) { diff --git a/apollo-router/src/plugins/telemetry/fmt_layer.rs b/apollo-router/src/plugins/telemetry/fmt_layer.rs index 330453a4cf..1a091d2e4e 100644 --- a/apollo-router/src/plugins/telemetry/fmt_layer.rs +++ b/apollo-router/src/plugins/telemetry/fmt_layer.rs @@ -260,11 +260,19 @@ impl field::Visit for FieldsVisitor<'_, '_> { #[cfg(test)] mod tests { + use std::str::FromStr; use std::sync::Arc; use std::sync::Mutex; use std::sync::MutexGuard; + use apollo_compiler::name; + use apollo_federation::sources::connect::ConnectId; + use apollo_federation::sources::connect::ConnectSpec; + use apollo_federation::sources::connect::Connector; use apollo_federation::sources::connect::HTTPMethod; + use apollo_federation::sources::connect::HttpJsonTransport; + use apollo_federation::sources::connect::JSONSelection; + use apollo_federation::sources::connect::URLTemplate; use http::header::CONTENT_LENGTH; use http::HeaderValue; use tests::events::RouterResponseBodyExtensionType; @@ -276,6 +284,9 @@ mod tests { use super::*; use crate::graphql; + use crate::plugins::connectors::handle_responses::MappedResponse; + use crate::plugins::connectors::make_requests::ResponseKey; + use crate::plugins::connectors::mapping::Problem; use crate::plugins::telemetry::config_new::events; use crate::plugins::telemetry::config_new::events::log_event; use crate::plugins::telemetry::config_new::events::EventLevel; @@ -285,10 +296,11 @@ mod tests { use crate::plugins::telemetry::config_new::logging::TextFormat; use crate::plugins::telemetry::dynamic_attribute::SpanDynAttribute; use crate::plugins::telemetry::otel; - use crate::services::connector_service::ConnectorInfo; - use crate::services::connector_service::CONNECTOR_INFO_CONTEXT_KEY; - use crate::services::http::HttpRequest; - use crate::services::http::HttpResponse; + use crate::services::connector::request_service::transport; + use crate::services::connector::request_service::Request; + use crate::services::connector::request_service::Response; + use crate::services::connector::request_service::TransportRequest; + use crate::services::connector::request_service::TransportResponse; use crate::services::router; use crate::services::router::body; use crate::services::subgraph; @@ -372,10 +384,7 @@ subgraph: default: "missing" connector: - # Standard events - request: info - response: warn - error: error + # Standard events cannot be tested, because the test does not call the service that emits them # Custom events my.connector.request.event: @@ -390,6 +399,10 @@ connector: connector_http_method: true url_template: connector_url_template: true + mapping_problems: + connector_request_mapping_problems: problems + mapping_problems_count: + connector_request_mapping_problems: count my.connector.response.event: message: "my response event message" level: error @@ -403,7 +416,11 @@ connector: url_template: connector_url_template: true response_status: - connector_http_response_status: code"#; + connector_http_response_status: code + mapping_problems: + connector_response_mapping_problems: problems + mapping_problems_count: + connector_response_mapping_problems: count"#; #[derive(Default, Clone)] struct LogBuffer(Arc>>); @@ -829,36 +846,108 @@ connector: .expect("expecting valid response"); subgraph_events.on_response(&subgraph_resp); - let connector_info = ConnectorInfo { - subgraph_name: "connector_subgraph".to_string(), - source_name: Some("source".to_string()), - http_method: HTTPMethod::Get.as_str().to_string(), - url_template: "/test".to_string(), - }; let context = crate::Context::default(); - context - .insert(CONNECTOR_INFO_CONTEXT_KEY, connector_info) - .unwrap(); let mut http_request = http::Request::builder().body(body::empty()).unwrap(); http_request .headers_mut() .insert("x-log-request", HeaderValue::from_static("log")); - let http_request = HttpRequest { - http_request, - context, + let transport_request = TransportRequest::Http(transport::http::HttpRequest { + inner: http_request, + debug: None, + }); + let connector = Arc::new(Connector { + id: ConnectId::new( + "connector_subgraph".into(), + Some("source".into()), + name!(Query), + name!(users), + 0, + "label", + ), + transport: HttpJsonTransport { + source_url: None, + connect_template: URLTemplate::from_str("/test").unwrap(), + method: HTTPMethod::Get, + headers: Default::default(), + body: None, + }, + selection: JSONSelection::empty(), + config: None, + max_requests: None, + entity_resolver: None, + spec: ConnectSpec::V0_1, + request_variables: Default::default(), + response_variables: Default::default(), + }); + let response_key = ResponseKey::RootField { + name: "hello".to_string(), + inputs: Default::default(), + selection: Arc::new(JSONSelection::parse("$.data").unwrap()), + }; + let connector_request = Request { + context: context.clone(), + connector: connector.clone(), + service_name: Default::default(), + transport_request, + key: response_key.clone(), + mapping_problems: vec![ + Problem { + count: 1, + message: "error message".to_string(), + path: "@.id".to_string(), + }, + Problem { + count: 2, + message: "warn message".to_string(), + path: "@.id".to_string(), + }, + Problem { + count: 3, + message: "info message".to_string(), + path: "@.id".to_string(), + }, + ], }; let connector_events = event_config.new_connector_events(); - connector_events.on_request(&http_request); - - let http_response = HttpResponse { - http_response: http::Response::builder() - .status(200) - .header("x-log-response", HeaderValue::from_static("log")) - .body(body::empty()) - .expect("expecting valid response"), - context: Default::default(), + connector_events.on_request(&connector_request); + + let connector_response = Response { + context, + connector: connector.clone(), + transport_result: Ok(TransportResponse::Http(transport::http::HttpResponse { + inner: http::Response::builder() + .status(200) + .header("x-log-response", HeaderValue::from_static("log")) + .body(body::empty()) + .expect("expecting valid response") + .into_parts() + .0, + })), + mapped_response: MappedResponse::Data { + data: serde_json::json!({}) + .try_into() + .expect("expecting valid JSON"), + key: response_key, + problems: vec![ + Problem { + count: 1, + message: "error message".to_string(), + path: "@.id".to_string(), + }, + Problem { + count: 2, + message: "warn message".to_string(), + path: "@.id".to_string(), + }, + Problem { + count: 3, + message: "info message".to_string(), + path: "@.id".to_string(), + }, + ], + }, }; - connector_events.on_response(&http_response); + connector_events.on_response(&connector_response); }, ); @@ -1005,36 +1094,108 @@ connector: .expect("expecting valid response"); subgraph_events.on_response(&subgraph_resp); - let connector_info = ConnectorInfo { - subgraph_name: "connector_subgraph".to_string(), - source_name: Some("source".to_string()), - http_method: HTTPMethod::Get.as_str().to_string(), - url_template: "/test".to_string(), - }; let context = crate::Context::default(); - context - .insert(CONNECTOR_INFO_CONTEXT_KEY, connector_info) - .unwrap(); let mut http_request = http::Request::builder().body(body::empty()).unwrap(); http_request .headers_mut() .insert("x-log-request", HeaderValue::from_static("log")); - let http_request = HttpRequest { - http_request, - context, + let transport_request = TransportRequest::Http(transport::http::HttpRequest { + inner: http_request, + debug: None, + }); + let connector = Arc::new(Connector { + id: ConnectId::new( + "connector_subgraph".into(), + Some("source".into()), + name!(Query), + name!(users), + 0, + "label", + ), + transport: HttpJsonTransport { + source_url: None, + connect_template: URLTemplate::from_str("/test").unwrap(), + method: HTTPMethod::Get, + headers: Default::default(), + body: None, + }, + selection: JSONSelection::empty(), + config: None, + max_requests: None, + entity_resolver: None, + spec: ConnectSpec::V0_1, + request_variables: Default::default(), + response_variables: Default::default(), + }); + let response_key = ResponseKey::RootField { + name: "hello".to_string(), + inputs: Default::default(), + selection: Arc::new(JSONSelection::parse("$.data").unwrap()), + }; + let connector_request = Request { + context: context.clone(), + connector: connector.clone(), + service_name: Default::default(), + transport_request, + key: response_key.clone(), + mapping_problems: vec![ + Problem { + count: 1, + message: "error message".to_string(), + path: "@.id".to_string(), + }, + Problem { + count: 2, + message: "warn message".to_string(), + path: "@.id".to_string(), + }, + Problem { + count: 3, + message: "info message".to_string(), + path: "@.id".to_string(), + }, + ], }; let connector_events = event_config.new_connector_events(); - connector_events.on_request(&http_request); - - let http_response = HttpResponse { - http_response: http::Response::builder() - .status(200) - .header("x-log-response", HeaderValue::from_static("log")) - .body(body::empty()) - .expect("expecting valid response"), - context: Default::default(), + connector_events.on_request(&connector_request); + + let connector_response = Response { + context, + connector: connector.clone(), + transport_result: Ok(TransportResponse::Http(transport::http::HttpResponse { + inner: http::Response::builder() + .status(200) + .header("x-log-response", HeaderValue::from_static("log")) + .body(body::empty()) + .expect("expecting valid response") + .into_parts() + .0, + })), + mapped_response: MappedResponse::Data { + data: serde_json::json!({}) + .try_into() + .expect("expecting valid JSON"), + key: response_key, + problems: vec![ + Problem { + count: 1, + message: "error message".to_string(), + path: "@.id".to_string(), + }, + Problem { + count: 2, + message: "warn message".to_string(), + path: "@.id".to_string(), + }, + Problem { + count: 3, + message: "info message".to_string(), + path: "@.id".to_string(), + }, + ], + }, }; - connector_events.on_response(&http_response); + connector_events.on_response(&connector_response); }, ); diff --git a/apollo-router/src/plugins/telemetry/mod.rs b/apollo-router/src/plugins/telemetry/mod.rs index 5c33c5e2cb..e29b5d366c 100644 --- a/apollo-router/src/plugins/telemetry/mod.rs +++ b/apollo-router/src/plugins/telemetry/mod.rs @@ -105,7 +105,6 @@ use crate::plugins::telemetry::config_new::graphql::GraphQLInstruments; use crate::plugins::telemetry::config_new::instruments::SupergraphInstruments; use crate::plugins::telemetry::config_new::trace_id; use crate::plugins::telemetry::config_new::DatadogId; -use crate::plugins::telemetry::consts::CONNECT_SPAN_NAME; use crate::plugins::telemetry::consts::EXECUTION_SPAN_NAME; use crate::plugins::telemetry::consts::OTEL_NAME; use crate::plugins::telemetry::consts::OTEL_STATUS_CODE; @@ -134,9 +133,8 @@ use crate::plugins::telemetry::tracing::TracingConfigurator; use crate::query_planner::OperationKind; use crate::register_private_plugin; use crate::router_factory::Endpoint; -use crate::services::connector_service::CONNECTOR_INFO_CONTEXT_KEY; +use crate::services::connector; use crate::services::execution; -use crate::services::http::HttpRequest; use crate::services::layers::apq::PERSISTED_QUERY_CACHE_HIT; use crate::services::router; use crate::services::subgraph; @@ -871,44 +869,35 @@ impl PluginPrivate for Telemetry { .boxed() } - fn http_client_service( + fn connector_request_service( &self, - _subgraph_name: &str, - service: crate::services::http::BoxService, - ) -> crate::services::http::BoxService { + service: connector::request_service::BoxService, + ) -> connector::request_service::BoxService { let req_fn_config = self.config.clone(); let res_fn_config = self.config.clone(); let static_connector_instruments = self.connector_custom_instruments.read().clone(); ServiceBuilder::new() .map_future_with_request_data( - move |http_request: &HttpRequest| { - if http_request - .context - .contains_key(CONNECTOR_INFO_CONTEXT_KEY) - { - let custom_instruments = req_fn_config - .instrumentation - .instruments - .new_connector_instruments(static_connector_instruments.clone()); - custom_instruments.on_request(http_request); - let custom_events = - req_fn_config.instrumentation.events.new_connector_events(); - custom_events.on_request(http_request); - - let custom_span_attributes = req_fn_config - .instrumentation - .spans - .connector - .attributes - .on_request(http_request); + move |request: &connector::request_service::Request| { + let custom_instruments = req_fn_config + .instrumentation + .instruments + .new_connector_instruments(static_connector_instruments.clone()); + custom_instruments.on_request(request); + let custom_events = req_fn_config.instrumentation.events.new_connector_events(); + custom_events.on_request(request); - ( - http_request.context.clone(), - Some((custom_instruments, custom_events, custom_span_attributes)), - ) - } else { - (http_request.context.clone(), None) - } + let custom_span_attributes = req_fn_config + .instrumentation + .spans + .connector + .attributes + .on_request(request); + + ( + request.context.clone(), + Some((custom_instruments, custom_events, custom_span_attributes)), + ) }, move |(context, custom_telemetry): ( Context, @@ -916,34 +905,30 @@ impl PluginPrivate for Telemetry { ), f: BoxFuture< 'static, - Result, + Result, >| { let conf = res_fn_config.clone(); async move { match custom_telemetry { Some((custom_instruments, custom_events, custom_span_attributes)) => { let span = Span::current(); - span.set_span_dyn_attributes_for_span_name( - CONNECT_SPAN_NAME, - custom_span_attributes, - ); + span.set_span_dyn_attributes(custom_span_attributes); + let result = f.await; match &result { - Ok(http_response) => { - span.set_span_dyn_attributes_for_span_name( - CONNECT_SPAN_NAME, + Ok(response) => { + span.set_span_dyn_attributes( conf.instrumentation .spans .connector .attributes - .on_response(http_response), + .on_response(response), ); - custom_instruments.on_response(http_response); - custom_events.on_response(http_response); + custom_instruments.on_response(response); + custom_events.on_response(response); } Err(err) => { - span.set_span_dyn_attributes_for_span_name( - CONNECT_SPAN_NAME, + span.set_span_dyn_attributes( conf.instrumentation .spans .connector diff --git a/apollo-router/src/plugins/telemetry/snapshots/apollo_router__plugins__telemetry__fmt_layer__tests__json_logging_with_custom_events_with_instrumented.snap b/apollo-router/src/plugins/telemetry/snapshots/apollo_router__plugins__telemetry__fmt_layer__tests__json_logging_with_custom_events_with_instrumented.snap index 529500dc37..db133634c3 100644 --- a/apollo-router/src/plugins/telemetry/snapshots/apollo_router__plugins__telemetry__fmt_layer__tests__json_logging_with_custom_events_with_instrumented.snap +++ b/apollo-router/src/plugins/telemetry/snapshots/apollo_router__plugins__telemetry__fmt_layer__tests__json_logging_with_custom_events_with_instrumented.snap @@ -14,7 +14,5 @@ expression: buff.to_string() {"timestamp":"[timestamp]","level":"ERROR","trace_id":"00000000000000000000000000000000","span_id":"0000000000000000","my.custom.attribute":["{\"id\":1234,\"name\":\"first_name\"}","{\"id\":567,\"name\":\"second_name\"}"],"response_status":200,"subgraph.name":"subgraph","message":"my response event message","kind":"my.subgraph.response.event","target":"apollo_router::plugins::telemetry::config_new::events"} {"timestamp":"[timestamp]","level":"INFO","trace_id":"00000000000000000000000000000000","span_id":"0000000000000000","message":"my event message","kind":"my.subgraph.request.event","target":"apollo_router::plugins::telemetry::config_new::events"} {"timestamp":"[timestamp]","level":"ERROR","trace_id":"00000000000000000000000000000000","span_id":"0000000000000000","my.custom.attribute":"[[{\"id\":1234,\"name\":\"first_name\"},{\"id\":567,\"name\":\"second_name\"}],{\"foo\":\"bar\"}]","response_status":200,"subgraph.name":"subgraph_bis","message":"my response event message","kind":"my.subgraph.response.event","target":"apollo_router::plugins::telemetry::config_new::events"} -{"timestamp":"[timestamp]","level":"INFO","trace_id":"00000000000000000000000000000000","span_id":"0000000000000000","http.request.headers":"{\"x-log-request\": \"log\"}","http.request.method":"GET","http.request.uri":"/","http.request.version":"HTTP/1.1","message":"","kind":"connector.request","target":"apollo_router::plugins::telemetry::config_new::events"} -{"timestamp":"[timestamp]","level":"INFO","trace_id":"00000000000000000000000000000000","span_id":"0000000000000000","connector_source":"source","http_method":"GET","subgraph.name":"connector_subgraph","url_template":"/test","message":"my request event message","kind":"my.connector.request.event","target":"apollo_router::plugins::telemetry::config_new::events"} -{"timestamp":"[timestamp]","level":"WARN","trace_id":"00000000000000000000000000000000","span_id":"0000000000000000","http.response.headers":"{\"x-log-response\": \"log\"}","http.response.status":"200 OK","http.response.version":"HTTP/1.1","message":"","kind":"connector.response","target":"apollo_router::plugins::telemetry::config_new::events"} -{"timestamp":"[timestamp]","level":"ERROR","trace_id":"00000000000000000000000000000000","span_id":"0000000000000000","connector_source":"source","http_method":"GET","response_status":200,"subgraph.name":"connector_subgraph","url_template":"/test","message":"my response event message","kind":"my.connector.response.event","target":"apollo_router::plugins::telemetry::config_new::events"} +{"timestamp":"[timestamp]","level":"INFO","trace_id":"00000000000000000000000000000000","span_id":"0000000000000000","connector_source":"source","http_method":"GET","mapping_problems":["{\"message\":\"error message\",\"path\":\"@.id\",\"count\":1}","{\"message\":\"warn message\",\"path\":\"@.id\",\"count\":2}","{\"message\":\"info message\",\"path\":\"@.id\",\"count\":3}"],"mapping_problems_count":6,"subgraph.name":"connector_subgraph","url_template":"/test","message":"my request event message","kind":"my.connector.request.event","target":"apollo_router::plugins::telemetry::config_new::events"} +{"timestamp":"[timestamp]","level":"ERROR","trace_id":"00000000000000000000000000000000","span_id":"0000000000000000","connector_source":"source","http_method":"GET","mapping_problems":["{\"message\":\"error message\",\"path\":\"@.id\",\"count\":1}","{\"message\":\"warn message\",\"path\":\"@.id\",\"count\":2}","{\"message\":\"info message\",\"path\":\"@.id\",\"count\":3}"],"mapping_problems_count":6,"response_status":200,"subgraph.name":"connector_subgraph","url_template":"/test","message":"my response event message","kind":"my.connector.response.event","target":"apollo_router::plugins::telemetry::config_new::events"} diff --git a/apollo-router/src/plugins/telemetry/snapshots/apollo_router__plugins__telemetry__fmt_layer__tests__text_logging_with_custom_events_with_instrumented.snap b/apollo-router/src/plugins/telemetry/snapshots/apollo_router__plugins__telemetry__fmt_layer__tests__text_logging_with_custom_events_with_instrumented.snap index 99e1b32eca..f5d96036c2 100644 --- a/apollo-router/src/plugins/telemetry/snapshots/apollo_router__plugins__telemetry__fmt_layer__tests__text_logging_with_custom_events_with_instrumented.snap +++ b/apollo-router/src/plugins/telemetry/snapshots/apollo_router__plugins__telemetry__fmt_layer__tests__text_logging_with_custom_events_with_instrumented.snap @@ -14,7 +14,5 @@ expression: buff.to_string() [timestamp] ERROR my.custom.attribute=["{"id":1234,"name":"first_name"}","{"id":567,"name":"second_name"}"] response_status=200 subgraph.name=subgraph my response event message kind=my.subgraph.response.event [timestamp] INFO my event message kind=my.subgraph.request.event [timestamp] ERROR my.custom.attribute=[[{"id":1234,"name":"first_name"},{"id":567,"name":"second_name"}],{"foo":"bar"}] response_status=200 subgraph.name=subgraph_bis my response event message kind=my.subgraph.response.event -[timestamp] INFO http.request.headers={"x-log-request": "log"} http.request.method=GET http.request.uri=/ http.request.version=HTTP/1.1 kind=connector.request -[timestamp] INFO connector_source=source http_method=GET subgraph.name=connector_subgraph url_template=/test my request event message kind=my.connector.request.event -[timestamp] WARN http.response.headers={"x-log-response": "log"} http.response.status=200 OK http.response.version=HTTP/1.1 kind=connector.response -[timestamp] ERROR connector_source=source http_method=GET response_status=200 subgraph.name=connector_subgraph url_template=/test my response event message kind=my.connector.response.event +[timestamp] INFO connector_source=source http_method=GET mapping_problems=["{"message":"error message","path":"@.id","count":1}","{"message":"warn message","path":"@.id","count":2}","{"message":"info message","path":"@.id","count":3}"] mapping_problems_count=6 subgraph.name=connector_subgraph url_template=/test my request event message kind=my.connector.request.event +[timestamp] ERROR connector_source=source http_method=GET mapping_problems=["{"message":"error message","path":"@.id","count":1}","{"message":"warn message","path":"@.id","count":2}","{"message":"info message","path":"@.id","count":3}"] mapping_problems_count=6 response_status=200 subgraph.name=connector_subgraph url_template=/test my response event message kind=my.connector.response.event diff --git a/apollo-router/src/plugins/telemetry/testdata/custom_events.router.yaml b/apollo-router/src/plugins/telemetry/testdata/custom_events.router.yaml index cd97d0604e..83b2bbaa55 100644 --- a/apollo-router/src/plugins/telemetry/testdata/custom_events.router.yaml +++ b/apollo-router/src/plugins/telemetry/testdata/custom_events.router.yaml @@ -146,24 +146,7 @@ telemetry: subgraph_response_data: "$.*" default: "missing" connector: - # Standard events - request: - level: info - condition: - eq: - - connector_http_request_header: x-log-request - - "log" - response: - level: warn - condition: - all: - - eq: - - connector_http_response_header: x-log-response - - "log" - - eq: - - subgraph_name: true - - "subgraph" - error: error + # Standard events cannot be tested, because the test harness mocks the service that emits them # Custom events my.disabled_request.event: diff --git a/apollo-router/src/plugins/telemetry/tracing/datadog/mod.rs b/apollo-router/src/plugins/telemetry/tracing/datadog/mod.rs index 964bb3f099..ccf6ee8b91 100644 --- a/apollo-router/src/plugins/telemetry/tracing/datadog/mod.rs +++ b/apollo-router/src/plugins/telemetry/tracing/datadog/mod.rs @@ -99,7 +99,7 @@ pub(crate) struct Config { resource_mapping: HashMap, /// Which spans will be eligible for span stats to be collected for viewing in the APM view. - /// Defaults to true for `request`, `router`, `query_parsing`, `supergraph`, `execution`, `query_planning`, `subgraph`, `subgraph_request` and `http_request`. + /// Defaults to true for `request`, `router`, `query_parsing`, `supergraph`, `execution`, `query_planning`, `subgraph`, `subgraph_request`, `connect`, `connect_request` and `http_request`. #[serde(default = "default_span_metrics")] span_metrics: HashMap, } diff --git a/apollo-router/src/plugins/test.rs b/apollo-router/src/plugins/test.rs index f767d89645..6bfdbea0ea 100644 --- a/apollo-router/src/plugins/test.rs +++ b/apollo-router/src/plugins/test.rs @@ -17,6 +17,7 @@ use crate::plugin::DynPlugin; use crate::plugin::PluginInit; use crate::plugin::PluginPrivate; use crate::query_planner::QueryPlannerService; +use crate::services::connector; use crate::services::execution; use crate::services::http; use crate::services::router; @@ -210,6 +211,29 @@ impl> + 'static> PluginTestHarness { ServiceHandle::new(self.plugin.http_client_service(subgraph, service)) } + + #[allow(dead_code)] + pub(crate) async fn call_connector_request_service( + &self, + request: connector::request_service::Request, + response_fn: fn( + connector::request_service::Request, + ) -> connector::request_service::Response, + ) -> Result { + let service: connector::request_service::BoxService = + connector::request_service::BoxService::new( + ServiceBuilder::new().service_fn( + move |req: connector::request_service::Request| async move { + Ok((response_fn)(req)) + }, + ), + ); + + self.plugin + .connector_request_service(service) + .call(request) + .await + } } impl Deref for PluginTestHarness diff --git a/apollo-router/src/services/connector.rs b/apollo-router/src/services/connector.rs new file mode 100644 index 0000000000..f01aac875c --- /dev/null +++ b/apollo-router/src/services/connector.rs @@ -0,0 +1 @@ +pub(crate) mod request_service; diff --git a/apollo-router/src/services/connector/request_service.rs b/apollo-router/src/services/connector/request_service.rs new file mode 100644 index 0000000000..65643b91f0 --- /dev/null +++ b/apollo-router/src/services/connector/request_service.rs @@ -0,0 +1,410 @@ +//! Service which makes individual requests to Apollo Connectors over some transport + +use std::sync::Arc; +use std::task::Poll; + +use apollo_federation::sources::connect::Connector; +use futures::future::BoxFuture; +use indexmap::IndexMap; +use opentelemetry::KeyValue; +use opentelemetry_semantic_conventions::trace::HTTP_REQUEST_METHOD; +use parking_lot::Mutex; +use static_assertions::assert_impl_all; +use tower::BoxError; +use tower::ServiceBuilder; +use tower::ServiceExt; +use tracing::info_span; + +use crate::error::FetchError; +use crate::graphql; +use crate::graphql::ErrorExtension; +use crate::json_ext::Path; +use crate::layers::ServiceBuilderExt; +use crate::plugins::connectors::handle_responses::process_response; +use crate::plugins::connectors::handle_responses::MappedResponse; +use crate::plugins::connectors::make_requests::ResponseKey; +use crate::plugins::connectors::mapping::Problem; +use crate::plugins::connectors::plugin::debug::ConnectorContext; +use crate::plugins::connectors::plugin::debug::ConnectorDebugHttpRequest; +use crate::plugins::connectors::request_limit::RequestLimits; +use crate::plugins::connectors::tracing::CONNECTOR_TYPE_HTTP; +use crate::plugins::telemetry::config_new::attributes::HTTP_REQUEST_BODY; +use crate::plugins::telemetry::config_new::attributes::HTTP_REQUEST_HEADERS; +use crate::plugins::telemetry::config_new::attributes::HTTP_REQUEST_URI; +use crate::plugins::telemetry::config_new::attributes::HTTP_REQUEST_VERSION; +use crate::plugins::telemetry::config_new::connector::events::ConnectorEventRequest; +use crate::plugins::telemetry::config_new::events::log_event; +use crate::plugins::telemetry::config_new::events::EventLevel; +use crate::plugins::telemetry::consts::CONNECT_REQUEST_SPAN_NAME; +use crate::services::connector::request_service::transport::http::HttpRequest; +use crate::services::connector::request_service::transport::http::HttpResponse; +use crate::services::http::HttpClientServiceFactory; +use crate::services::new_service::ServiceFactory; +use crate::services::router; +use crate::services::router::body::RouterBody; +use crate::services::Plugins; +use crate::Context; + +pub(crate) mod transport; + +pub(crate) type BoxService = tower::util::BoxService; + +assert_impl_all!(Request: Send); +assert_impl_all!(Response: Send); + +/// Request type for a single connector request +#[derive(Debug)] +#[non_exhaustive] +pub(crate) struct Request { + /// The request context + pub(crate) context: Context, + + /// The connector associated with this request + // If this service moves into the public API, consider whether this exposes too much + // internal information about the connector. A new type may be needed which exposes only + // what is necessary for customizations. + pub(crate) connector: Arc, + + /// The service name for this connector + pub(crate) service_name: String, + + /// The request to the underlying transport + pub(crate) transport_request: TransportRequest, + + /// Information about how to map the response to GraphQL + pub(crate) key: ResponseKey, + + /// Mapping problems encountered when creating the transport request + pub(crate) mapping_problems: Vec, +} + +/// Response type for a connector +#[derive(Debug)] +#[non_exhaustive] +pub(crate) struct Response { + /// The response context + #[allow(dead_code)] + pub(crate) context: Context, + + /// The connector associated with this response + #[allow(dead_code)] + pub(crate) connector: Arc, + + /// The result of the transport request + pub(crate) transport_result: Result, + + /// The mapped response, including any mapping problems encountered when processing the response + pub(crate) mapped_response: MappedResponse, +} + +/// Request to an underlying transport +#[derive(Debug)] +#[non_exhaustive] +pub(crate) enum TransportRequest { + /// A request to an HTTP transport + Http(HttpRequest), +} + +/// Response from an underlying transport +#[derive(Debug)] +#[non_exhaustive] +pub(crate) enum TransportResponse { + /// A response from an HTTP transport + Http(HttpResponse), +} + +impl From for TransportRequest { + fn from(value: HttpRequest) -> Self { + Self::Http(value) + } +} + +impl From for TransportResponse { + fn from(value: HttpResponse) -> Self { + Self::Http(value) + } +} + +/// An error sending a connector request. This represents a problem with sending the request +/// to the connector, rather than an error returned from the connector itself. +#[derive(Debug, thiserror::Error, displaydoc::Display)] +pub(crate) enum Error { + /// Request limit exceeded + RequestLimitExceeded, + + /// {0} + TransportFailure(#[from] BoxError), +} + +impl Error { + /// Create a GraphQL error from this error. + #[must_use] + pub(crate) fn to_graphql_error( + &self, + connector: Arc, + path: Option, + ) -> crate::error::Error { + use serde_json_bytes::*; + + let builder = graphql::Error::builder() + .message(self.to_string()) + .extension_code(self.extension_code()) + .extension("service", connector.id.subgraph_name.clone()) + .extension( + "connector", + Value::Object(Map::from_iter([( + "coordinate".into(), + Value::String(connector.id.coordinate().into()), + )])), + ); + if let Some(path) = path { + builder.path(path).build() + } else { + builder.build() + } + } +} + +impl ErrorExtension for Error { + fn extension_code(&self) -> String { + match self { + Self::RequestLimitExceeded => "REQUEST_LIMIT_EXCEEDED", + Self::TransportFailure(_) => "HTTP_CLIENT_ERROR", + } + .to_string() + } +} + +#[derive(Clone)] +pub(crate) struct ConnectorRequestServiceFactory { + pub(crate) http_client_service_factory: Arc>, + pub(crate) plugins: Arc, +} + +impl ConnectorRequestServiceFactory { + pub(crate) fn new( + http_client_service_factory: Arc>, + plugins: Arc, + ) -> Self { + Self { + http_client_service_factory, + plugins, + } + } +} + +impl ServiceFactory for ConnectorRequestServiceFactory { + type Service = BoxService; + + fn create(&self) -> Self::Service { + ServiceBuilder::new() + .instrument(|_| { + info_span!( + CONNECT_REQUEST_SPAN_NAME, + "otel.kind" = "INTERNAL", + "otel.status_code" = tracing::field::Empty, + ) + }) + .service( + self.plugins.iter().rev().fold( + ConnectorRequestService { + http_client_service_factory: self.http_client_service_factory.clone(), + } + .boxed(), + |acc, (_, e)| e.connector_request_service(acc), + ), + ) + .boxed() + } +} + +/// A service for executing individual requests to Apollo Connectors +#[derive(Clone)] +pub(crate) struct ConnectorRequestService { + pub(crate) http_client_service_factory: Arc>, +} + +impl tower::Service for ConnectorRequestService { + type Response = Response; + type Error = BoxError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: Request) -> Self::Future { + let original_subgraph_name = request.connector.id.subgraph_name.to_string(); + let http_client_service_factory = self.http_client_service_factory.clone(); + + // Load the information needed from the context + let (debug, connector_request_event, request_limit) = + request.context.extensions().with_lock(|lock| { + ( + lock.get::>>().cloned(), + lock.get::().cloned(), + lock.get::>() + .map(|limits| { + limits.get( + (&request.connector.id).into(), + request.connector.max_requests, + ) + }) + .unwrap_or(None), + ) + }); + + let log_request_level = connector_request_event.and_then(|s| match s.0.condition() { + Some(condition) => { + if condition.lock().evaluate_request(&request) == Some(true) { + Some(s.0.level()) + } else { + None + } + } + None => Some(s.0.level()), + }); + + Box::pin(async move { + let mut debug_request: Option = None; + let result = if request_limit.is_some_and(|request_limit| !request_limit.allow()) { + Err(Error::RequestLimitExceeded) + } else { + let result = match request.transport_request { + TransportRequest::Http(http_request) => { + debug_request = http_request.debug; + + let http_request = log_request( + http_request.inner, + log_request_level, + &request.connector.id.label, + ) + .await?; + + if let Some(http_client_service_factory) = http_client_service_factory + .get(&request.service_name) + .cloned() + { + http_client_service_factory + .create(&original_subgraph_name) + .oneshot(crate::services::http::HttpRequest { + http_request, + context: request.context.clone(), + }) + .await + .map(|result| result.http_response) + .map_err(|e| replace_subgraph_name(e, &request.connector).into()) + } else { + Err(Error::TransportFailure("no http client found".into())) + } + } + }; + + u64_counter!( + "apollo.router.operations.connectors", + "Total number of requests to connectors", + 1, + "connector.type" = CONNECTOR_TYPE_HTTP, + "subgraph.name" = original_subgraph_name + ); + + result + }; + + Ok(process_response( + result, + request.key.clone(), + request.connector, + &request.context, + debug_request, + &debug, + ) + .await) + }) + } +} + +/// Log an event for this request, if configured +async fn log_request( + request: http::Request, + log_request_level: Option, + label: &str, +) -> Result, BoxError> { + if let Some(level) = log_request_level { + let (parts, body) = request.into_parts(); + + let mut attrs = Vec::with_capacity(5); + + #[cfg(test)] + let headers = { + let mut headers: IndexMap = parts + .headers + .clone() + .into_iter() + .filter_map(|(name, val)| Some((name?.to_string(), val))) + .collect(); + headers.sort_keys(); + headers + }; + #[cfg(not(test))] + let headers = parts.headers.clone(); + + attrs.push(KeyValue::new( + HTTP_REQUEST_HEADERS, + opentelemetry::Value::String(format!("{:?}", headers).into()), + )); + attrs.push(KeyValue::new( + HTTP_REQUEST_METHOD, + opentelemetry::Value::String(parts.method.as_str().to_string().into()), + )); + attrs.push(KeyValue::new( + HTTP_REQUEST_URI, + opentelemetry::Value::String(format!("{}", parts.uri).into()), + )); + attrs.push(KeyValue::new( + HTTP_REQUEST_VERSION, + opentelemetry::Value::String(format!("{:?}", parts.version).into()), + )); + let body_bytes = router::body::into_bytes(body).await?; + attrs.push(KeyValue::new( + HTTP_REQUEST_BODY, + opentelemetry::Value::String( + String::from_utf8(body_bytes.clone().to_vec()) + .unwrap_or_default() + .into(), + ), + )); + log_event( + level, + "connector.request", + attrs, + &format!("Request to connector {label:?}"), + ); + + Ok(http::Request::::from_parts( + parts, + router::body::from_bytes(body_bytes), + )) + } else { + Ok(request) + } +} + +/// Replace the internal subgraph name in an error with the connector label +fn replace_subgraph_name(err: BoxError, connector: &Connector) -> BoxError { + match err.downcast::() { + Ok(inner) => match *inner { + FetchError::SubrequestHttpError { + status_code, + service: _, + reason, + } => Box::new(FetchError::SubrequestHttpError { + status_code, + service: connector.id.subgraph_source(), + reason, + }), + _ => inner, + }, + Err(e) => e, + } +} diff --git a/apollo-router/src/services/connector/request_service/transport.rs b/apollo-router/src/services/connector/request_service/transport.rs new file mode 100644 index 0000000000..7078a65dc2 --- /dev/null +++ b/apollo-router/src/services/connector/request_service/transport.rs @@ -0,0 +1,2 @@ +//! Transport implementations for Apollo Connectors +pub(crate) mod http; diff --git a/apollo-router/src/services/connector/request_service/transport/http.rs b/apollo-router/src/services/connector/request_service/transport/http.rs new file mode 100644 index 0000000000..1d49223b76 --- /dev/null +++ b/apollo-router/src/services/connector/request_service/transport/http.rs @@ -0,0 +1,19 @@ +//! HTTP transport for Apollo Connectors +use crate::plugins::connectors::plugin::debug::ConnectorDebugHttpRequest; +use crate::services::router::body::RouterBody; + +/// Request to an HTTP transport +#[derive(Debug)] +#[non_exhaustive] +pub(crate) struct HttpRequest { + pub(crate) inner: http::Request, + pub(crate) debug: Option, +} + +/// Response from an HTTP transport +#[derive(Debug)] +#[non_exhaustive] +pub(crate) struct HttpResponse { + /// The response parts - the body is consumed by applying the JSON mapping + pub(crate) inner: http::response::Parts, +} diff --git a/apollo-router/src/services/connector_service.rs b/apollo-router/src/services/connector_service.rs index 47f9abdb1d..72e7b7316d 100644 --- a/apollo-router/src/services/connector_service.rs +++ b/apollo-router/src/services/connector_service.rs @@ -14,29 +14,19 @@ use serde::Deserialize; use serde::Serialize; use tower::BoxError; use tower::ServiceExt; -use tracing::error; -use tracing::Instrument; +use tracing_futures::Instrument; use super::connect::BoxService; -use super::http::HttpClientServiceFactory; -use super::http::HttpRequest; use super::new_service::ServiceFactory; -use crate::error::FetchError; -use crate::plugins::connectors::error::Error as ConnectorError; use crate::plugins::connectors::handle_responses::aggregate_responses; -use crate::plugins::connectors::handle_responses::process_response; -use crate::plugins::connectors::http::Request; -use crate::plugins::connectors::http::Response as ConnectorResponse; -use crate::plugins::connectors::http::Result as ConnectorResult; use crate::plugins::connectors::make_requests::make_requests; use crate::plugins::connectors::plugin::debug::ConnectorContext; -use crate::plugins::connectors::request_limit::RequestLimits; use crate::plugins::connectors::tracing::connect_spec_version_instrument; use crate::plugins::connectors::tracing::CONNECTOR_TYPE_HTTP; use crate::plugins::subscription::SubscriptionConfig; use crate::plugins::telemetry::consts::CONNECT_SPAN_NAME; use crate::query_planner::fetch::SubgraphSchemas; -use crate::services::router::body::RouterBody; +use crate::services::connector::request_service::ConnectorRequestServiceFactory; use crate::services::ConnectRequest; use crate::services::ConnectResponse; use crate::spec::Schema; @@ -55,36 +45,15 @@ pub(crate) const APOLLO_CONNECTOR_SOURCE_NAME: Key = Key::from_static_str("apollo.connector.source.name"); pub(crate) const APOLLO_CONNECTOR_SOURCE_DETAIL: Key = Key::from_static_str("apollo.connector.source.detail"); -pub(crate) const CONNECTOR_INFO_CONTEXT_KEY: &str = "apollo_router::connector::info"; /// A service for executing connector requests. #[derive(Clone)] pub(crate) struct ConnectorService { - pub(crate) http_service_factory: Arc>, pub(crate) _schema: Arc, pub(crate) _subgraph_schemas: Arc, pub(crate) _subscription_config: Option, pub(crate) connectors_by_service_name: Arc, Connector>>, -} - -/// Serializable information about a connector. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub(crate) struct ConnectorInfo { - pub(crate) subgraph_name: String, - pub(crate) source_name: Option, - pub(crate) http_method: String, - pub(crate) url_template: String, -} - -impl From<&Connector> for ConnectorInfo { - fn from(connector: &Connector) -> Self { - Self { - subgraph_name: connector.id.subgraph_name.to_string(), - source_name: connector.id.source_name.clone(), - http_method: connector.transport.method.as_str().to_string(), - url_template: connector.transport.connect_template.to_string(), - } - } + pub(crate) connector_request_service_factory: Arc, } /// A reference to a unique Connector source. @@ -146,19 +115,13 @@ impl tower::Service for ConnectorService { .get(&request.service_name) .cloned(); - let http_client_factory = self - .http_service_factory - .get(&request.service_name.to_string()) - .cloned(); + let connector_request_service_factory = self.connector_request_service_factory.clone(); Box::pin(async move { let Some(connector) = connector else { return Err("no connector found".into()); }; - let Some(http_client_factory) = http_client_factory else { - return Err("no http client found".into()); - }; let fetch_time_offset = request.context.created_at.elapsed().as_nanos() as i64; let span = tracing::info_span!( CONNECT_SPAN_NAME, @@ -193,141 +156,74 @@ impl tower::Service for ConnectorService { } } - execute(&http_client_factory, request, &connector) - .instrument(span) - .await + let service_name = request.service_name.to_string(); + + execute( + &connector_request_service_factory, + request, + connector, + &service_name, + ) + .instrument(span) + .await }) } } async fn execute( - http_client_factory: &HttpClientServiceFactory, + connector_request_service_factory: &ConnectorRequestServiceFactory, request: ConnectRequest, - connector: &Connector, + connector: Connector, + service_name: &str, ) -> Result { let context = request.context.clone(); - let original_subgraph_name = connector.id.subgraph_name.to_string(); - - let (ref debug, request_limit) = context.extensions().with_lock(|lock| { - let debug = lock.get::>>().cloned(); - let request_limit = lock - .get::>() - .map(|limits| limits.get((&connector.id).into(), connector.max_requests)) - .unwrap_or(None); - (debug, request_limit) - }); - - let requests = make_requests(request, connector, debug).map_err(BoxError::from)?; - - let tasks = requests.into_iter().map( - move |Request { - request: req, - key, - debug_request, - }| { - // Returning an error from this closure causes all tasks to be cancelled and the operation - // to fail. This is the reason for the Result-wrapped-in-a-Result here. An `Err` on the - // inner result fails just that one task, but an `Err` on the outer result cancels all the - // tasks and fails the whole operation. - let context = context.clone(); - if context - .insert(CONNECTOR_INFO_CONTEXT_KEY, ConnectorInfo::from(connector)) - .is_err() - { - error!("Failed to store connector info in context"); - } - let original_subgraph_name = original_subgraph_name.clone(); - let request_limit = request_limit.clone(); - async move { - let res = if request_limit.is_some_and(|request_limit| !request_limit.allow()) { - ConnectorResponse { - result: ConnectorResult::::Err( - ConnectorError::RequestLimitExceeded, - ), - key, - debug_request, - } - } else { - let client = http_client_factory.create(&original_subgraph_name); - let req = HttpRequest { - http_request: req, - context: context.clone(), - }; - let res = match client.oneshot(req).await { - Ok(res) => ConnectorResponse { - result: ConnectorResult::HttpResponse(res.http_response), - key, - debug_request, - }, - Err(e) => ConnectorResponse { - result: ConnectorResult::::Err( - ConnectorError::HTTPClientError(handle_subrequest_http_error( - e, connector, - )), - ), - key, - debug_request, - }, - }; - - u64_counter!( - "apollo.router.operations.connectors", - "Total number of requests to connectors", - 1, - "connector.type" = CONNECTOR_TYPE_HTTP, - "subgraph.name" = original_subgraph_name - ); - - res - }; - - Ok::<_, BoxError>(process_response(res, connector, &context, debug).await) - } - }, - ); - - aggregate_responses(futures::future::try_join_all(tasks).await?).map_err(BoxError::from) -} - -fn handle_subrequest_http_error(err: BoxError, connector: &Connector) -> BoxError { - match err.downcast::() { - // Replace the internal subgraph name with the connector label - Ok(inner) => match *inner { - FetchError::SubrequestHttpError { - status_code, - service: _, - reason, - } => Box::new(FetchError::SubrequestHttpError { - status_code, - service: connector.id.subgraph_source(), - reason, - }), - _ => inner, - }, - Err(e) => e, - } + let connector = Arc::new(connector); + let debug = &context + .extensions() + .with_lock(|lock| lock.get::>>().cloned()); + + let tasks = make_requests(request, &context, connector, service_name, debug) + .map_err(BoxError::from)? + .into_iter() + .map(move |request| async { + connector_request_service_factory + .create() + .oneshot(request) + .await + }); + + aggregate_responses( + futures::future::try_join_all(tasks) + .await + .map(|responses| { + responses + .into_iter() + .map(|response| response.mapped_response) + .collect() + })?, + ) + .map_err(BoxError::from) } #[derive(Clone)] pub(crate) struct ConnectorServiceFactory { pub(crate) schema: Arc, pub(crate) subgraph_schemas: Arc, - pub(crate) http_service_factory: Arc>, pub(crate) subscription_config: Option, pub(crate) connectors_by_service_name: Arc, Connector>>, _connect_spec_version_instrument: Option>, + pub(crate) connector_request_service_factory: Arc, } impl ConnectorServiceFactory { pub(crate) fn new( schema: Arc, subgraph_schemas: Arc, - http_service_factory: Arc>, subscription_config: Option, connectors_by_service_name: Arc, Connector>>, + connector_request_service_factory: Arc, ) -> Self { Self { - http_service_factory, subgraph_schemas, schema: schema.clone(), subscription_config, @@ -335,19 +231,22 @@ impl ConnectorServiceFactory { _connect_spec_version_instrument: connect_spec_version_instrument( schema.connectors.as_ref(), ), + connector_request_service_factory, } } #[cfg(test)] pub(crate) fn empty(schema: Arc) -> Self { - Self { - http_service_factory: Arc::new(Default::default()), - subgraph_schemas: Default::default(), - subscription_config: Default::default(), - connectors_by_service_name: Default::default(), + Self::new( schema, - _connect_spec_version_instrument: None, - } + Default::default(), + Default::default(), + Default::default(), + Arc::new(ConnectorRequestServiceFactory::new( + Default::default(), + Default::default(), + )), + ) } } @@ -356,11 +255,11 @@ impl ServiceFactory for ConnectorServiceFactory { fn create(&self) -> Self::Service { ConnectorService { - http_service_factory: self.http_service_factory.clone(), _schema: self.schema.clone(), _subgraph_schemas: self.subgraph_schemas.clone(), _subscription_config: self.subscription_config.clone(), connectors_by_service_name: self.connectors_by_service_name.clone(), + connector_request_service_factory: self.connector_request_service_factory.clone(), } .boxed() } diff --git a/apollo-router/src/services/mod.rs b/apollo-router/src/services/mod.rs index 68102d98af..eb962ed61a 100644 --- a/apollo-router/src/services/mod.rs +++ b/apollo-router/src/services/mod.rs @@ -28,6 +28,7 @@ pub(crate) use crate::services::supergraph::Request as SupergraphRequest; pub(crate) use crate::services::supergraph::Response as SupergraphResponse; pub(crate) mod connect; +pub(crate) mod connector; pub(crate) mod connector_service; pub mod execution; pub(crate) mod external; diff --git a/apollo-router/src/services/supergraph/service.rs b/apollo-router/src/services/supergraph/service.rs index 672a5590c6..6c7193022e 100644 --- a/apollo-router/src/services/supergraph/service.rs +++ b/apollo-router/src/services/supergraph/service.rs @@ -54,6 +54,7 @@ use crate::query_planner::QueryPlannerService; use crate::router_factory::create_http_services; use crate::router_factory::create_plugins; use crate::router_factory::create_subgraph_services; +use crate::services::connector::request_service::ConnectorRequestServiceFactory; use crate::services::connector_service::ConnectorServiceFactory; use crate::services::execution::QueryPlan; use crate::services::fetch_service::FetchServiceFactory; @@ -608,11 +609,14 @@ async fn subscription_task( Arc::new(ConnectorServiceFactory::new( execution_service_factory.schema.clone(), execution_service_factory.subgraph_schemas.clone(), - Arc::new(http_service_factory), subscription_plugin_conf, execution_service_factory.schema .connectors.as_ref().map(|c| c.by_service_name.clone()) .unwrap_or_default(), + Arc::new(ConnectorRequestServiceFactory::new( + Arc::new(http_service_factory), + execution_service_factory.plugins.clone(), + )), )), ), @@ -897,13 +901,16 @@ impl PluggableSupergraphServiceBuilder { Arc::new(ConnectorServiceFactory::new( schema.clone(), subgraph_schemas, - Arc::new(self.http_service_factory), subscription_plugin_conf, schema .connectors .as_ref() .map(|c| c.by_service_name.clone()) .unwrap_or_default(), + Arc::new(ConnectorRequestServiceFactory::new( + Arc::new(self.http_service_factory), + self.plugins.clone(), + )), )), ));