diff --git a/influxdb3/tests/server/write.rs b/influxdb3/tests/server/write.rs index 9d5810bbb8c..8376f67a017 100644 --- a/influxdb3/tests/server/write.rs +++ b/influxdb3/tests/server/write.rs @@ -342,3 +342,61 @@ async fn writes_with_different_schema_should_fail() { "the request should hae failed with an API Error" ); } +#[tokio::test] +/// Check that the no_sync param can be used on any endpoint. However, this only means that serde +/// will parse it just fine. It is only able to be used in the v3 endpoint and will +/// default to requiring the WAL to synce before returning. +async fn api_no_sync_param() { + let server = TestServer::spawn().await; + let client = reqwest::Client::new(); + let v1_write_url = format!("{base}/write", base = server.client_addr()); + let v2_write_url = format!("{base}/api/v2/write", base = server.client_addr()); + let v3_write_url = format!("{base}/api/v3/write_lp", base = server.client_addr()); + let write_body = "cpu,host=a usage=0.5"; + + let mut params = vec![]; + params.push(("db", "foo")); + params.push(("no_sync", "true")); + + let resp = client + .post(&v1_write_url) + .query(¶ms) + .body(write_body) + .send() + .await + .expect("send /write request"); + let status = resp.status(); + let body = resp.text().await.expect("response body as text"); + println!("Response [{status}]:\n{body}"); + assert_eq!(status, StatusCode::NO_CONTENT); + + let mut params = vec![]; + params.push(("bucket", "foo")); + params.push(("no_sync", "true")); + let resp = client + .post(&v2_write_url) + .query(¶ms) + .body(write_body) + .send() + .await + .expect("send api/v2/write request"); + let status = resp.status(); + let body = resp.text().await.expect("response body as text"); + println!("Response [{status}]:\n{body}"); + assert_eq!(status, StatusCode::NO_CONTENT); + + let mut params = vec![]; + params.push(("db", "foo")); + params.push(("no_sync", "true")); + let resp = client + .post(&v3_write_url) + .query(¶ms) + .body(write_body) + .send() + .await + .expect("send api/v3/write request"); + let status = resp.status(); + let body = resp.text().await.expect("response body as text"); + println!("Response [{status}]:\n{body}"); + assert_eq!(status, StatusCode::NO_CONTENT); +} diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs index a0ac0f8ee8b..5c091092b6a 100644 --- a/influxdb3_processing_engine/src/lib.rs +++ b/influxdb3_processing_engine/src/lib.rs @@ -802,6 +802,7 @@ mod tests { start_time, false, Precision::Nanosecond, + false, ) .await?; @@ -877,6 +878,7 @@ mod tests { start_time, false, Precision::Nanosecond, + false, ) .await?; @@ -937,6 +939,7 @@ mod tests { start_time, false, Precision::Nanosecond, + false, ) .await?; @@ -1005,6 +1008,7 @@ mod tests { start_time, false, Precision::Nanosecond, + false, ) .await?; @@ -1099,6 +1103,7 @@ mod tests { start_time, false, Precision::Nanosecond, + false, ) .await?; @@ -1159,6 +1164,7 @@ mod tests { start_time, false, Precision::Nanosecond, + false, ) .await?; diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index d0aeffbce0c..66778cfeff9 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -518,6 +518,7 @@ where default_time, params.accept_partial, params.precision, + params.no_sync, ) .await?; @@ -1615,6 +1616,8 @@ pub(crate) struct WriteParams { pub(crate) accept_partial: bool, #[serde(default)] pub(crate) precision: Precision, + #[serde(default)] + pub(crate) no_sync: bool, } impl From for WriteParams { @@ -1624,6 +1627,7 @@ impl From for WriteParams { // legacy behaviour was to not accept partial: accept_partial: false, precision: legacy.precision.into(), + no_sync: false, } } } diff --git a/influxdb3_server/src/query_executor/mod.rs b/influxdb3_server/src/query_executor/mod.rs index 99548ace282..6edb97935b0 100644 --- a/influxdb3_server/src/query_executor/mod.rs +++ b/influxdb3_server/src/query_executor/mod.rs @@ -888,6 +888,7 @@ mod tests { Time::from_timestamp_nanos(time), false, influxdb3_write::Precision::Nanosecond, + false, ) .await .unwrap(); diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index e89c02706f0..5db7fe6202a 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -103,6 +103,7 @@ pub trait Bufferer: Debug + Send + Sync + 'static { ingest_time: Time, accept_partial: bool, precision: Precision, + no_sync: bool, ) -> write_buffer::Result; /// Returns the database schema provider diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index d6717bae934..5a68382dbe6 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -46,7 +46,7 @@ use metric::Registry; use metrics::WriteMetrics; use object_store::path::Path as ObjPath; use object_store::{ObjectMeta, ObjectStore}; -use observability_deps::tracing::{debug, warn}; +use observability_deps::tracing::{debug, error, warn}; use parquet_file::storage::ParquetExecInput; use queryable_buffer::QueryableBufferArgs; use schema::Schema; @@ -263,6 +263,7 @@ impl WriteBufferImpl { ingest_time: Time, accept_partial: bool, precision: Precision, + no_sync: bool, ) -> Result { debug!("write_lp to {} in writebuffer", db_name); @@ -284,12 +285,21 @@ impl WriteBufferImpl { } ops.push(WalOp::Write(result.valid_data)); - // write to the wal. Behind the scenes the ops get buffered in memory and once a second (or - // whatever the configured wal flush interval is set to) the buffer is flushed and all the - // data is persisted into a single wal file in the configured object store. Then the - // contents are sent to the configured notifier, which in this case is the queryable buffer. - // Thus, after this returns, the data is both durable and queryable. - self.wal.write_ops(ops).await?; + if no_sync { + let wal = Arc::clone(&self.wal); + tokio::spawn(async move { + if let Err(err) = wal.write_ops(ops).await { + error!("Failed to persist data to the wal: {err}"); + } + }); + } else { + // write to the wal. Behind the scenes the ops get buffered in memory and once a second (or + // whatever the configured wal flush interval is set to) the buffer is flushed and all the + // data is persisted into a single wal file in the configured object store. Then the + // contents are sent to the configured notifier, which in this case is the queryable buffer. + // Thus, after this returns, the data is both durable and queryable. + self.wal.write_ops(ops).await?; + } // record metrics for lines written, rejected, and bytes written self.metrics @@ -455,9 +465,17 @@ impl Bufferer for WriteBufferImpl { ingest_time: Time, accept_partial: bool, precision: Precision, + no_sync: bool, ) -> Result { - self.write_lp(database, lp, ingest_time, accept_partial, precision) - .await + self.write_lp( + database, + lp, + ingest_time, + accept_partial, + precision, + no_sync, + ) + .await } fn catalog(&self) -> Arc { @@ -1001,6 +1019,7 @@ mod tests { Time::from_timestamp_nanos(123), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1023,6 +1042,7 @@ mod tests { Time::from_timestamp_nanos(124), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1034,6 +1054,7 @@ mod tests { Time::from_timestamp_nanos(125), false, Precision::Nanosecond, + false, ) .await; @@ -1108,6 +1129,7 @@ mod tests { Time::from_timestamp(20, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1165,6 +1187,7 @@ mod tests { Time::from_timestamp(30, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1186,6 +1209,7 @@ mod tests { Time::from_timestamp(40, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1240,6 +1264,7 @@ mod tests { Time::from_timestamp(10, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1261,6 +1286,7 @@ mod tests { Time::from_timestamp(65, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1284,6 +1310,7 @@ mod tests { Time::from_timestamp(147, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1324,6 +1351,7 @@ mod tests { Time::from_timestamp(250, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1396,6 +1424,7 @@ mod tests { Time::from_timestamp(300, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1408,6 +1437,7 @@ mod tests { Time::from_timestamp(330, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1577,6 +1607,7 @@ mod tests { Time::from_timestamp(10, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1586,6 +1617,7 @@ mod tests { Time::from_timestamp(20, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1595,6 +1627,7 @@ mod tests { Time::from_timestamp(30, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -2303,6 +2336,7 @@ mod tests { start_time, false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -2331,6 +2365,7 @@ mod tests { start_time, false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -2881,6 +2916,7 @@ mod tests { Time::from_timestamp_nanos(w.time_seconds * 1_000_000_000), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -2900,6 +2936,7 @@ mod tests { Time::from_timestamp_nanos(w.time_seconds * 1_000_000_000), true, Precision::Nanosecond, + false, ) .await .unwrap();