Skip to content

Commit

Permalink
feat: add no_sync write_lp param for fast writes
Browse files Browse the repository at this point in the history
In cases where a user does not need the guarantee that data is
persisted to the WAL on write and needs faster ingest speed, the
no_sync param added in this commit is what they need. Rather
than waiting on a sync to the WAL, a task to do so is spawned and the
code continues executing to return a successful HTTP code to the user.

The upside to this is that they can ingest data faster, but there is a
small risk that, between writing the data and it eventually being written
to object storage, the server crashes and the data is irrevocably lost.
Also if the write to the WAL fails, then at most the user will get an
error printed in the logs rather than a failed response code they can
handle. The data will still be in the buffer, but will not be durable
until persisted as a parquet file in this case.

However, in many cases that might be acceptable. This commit expands on
what's possible so that the user can use InfluxDB Core the way that
works best for their workload.

Note that this feature is only added for the /api/v3/write_lp endpoint.
The legacy endpoints for writing can take the parameter, but won't do
anything with it at all.

Closes #25597
  • Loading branch information
mgattozzi committed Jan 22, 2025
1 parent f3efa35 commit 02c6d01
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 9 deletions.
58 changes: 58 additions & 0 deletions influxdb3/tests/server/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,3 +342,61 @@ async fn writes_with_different_schema_should_fail() {
"the request should hae failed with an API Error"
);
}
/// Verifies that the `no_sync` query parameter is accepted by every write endpoint.
///
/// A successful response here only shows that the parameter deserializes cleanly and the
/// write goes through; this test does not observe whether the WAL sync was actually skipped.
/// Only the v3 endpoint is meant to act on the flag; the legacy endpoints accept it but
/// still wait for the WAL to sync before returning.
#[tokio::test]
async fn api_no_sync_param() {
    let server = TestServer::spawn().await;
    let client = reqwest::Client::new();
    let endpoint = |path: &str| format!("{base}{path}", base = server.client_addr());
    let write_body = "cpu,host=a usage=0.5";

    // Each case: (target url, name of the query param carrying the database,
    // message reported if the request cannot be sent).
    let cases = [
        (endpoint("/write"), "db", "send /write request"),
        (
            endpoint("/api/v2/write"),
            "bucket",
            "send api/v2/write request",
        ),
        (
            endpoint("/api/v3/write_lp"),
            "db",
            "send api/v3/write request",
        ),
    ];

    for (url, db_key, send_failure) in cases {
        let params = vec![(db_key, "foo"), ("no_sync", "true")];

        let resp = client
            .post(&url)
            .query(&params)
            .body(write_body)
            .send()
            .await
            .expect(send_failure);

        // Capture the status first: reading the body consumes the response.
        let status = resp.status();
        let body = resp.text().await.expect("response body as text");
        println!("Response [{status}]:\n{body}");
        assert_eq!(status, StatusCode::NO_CONTENT);
    }
}
6 changes: 6 additions & 0 deletions influxdb3_processing_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;

Expand Down Expand Up @@ -877,6 +878,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;

Expand Down Expand Up @@ -937,6 +939,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;

Expand Down Expand Up @@ -1005,6 +1008,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;

Expand Down Expand Up @@ -1099,6 +1103,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;

Expand Down Expand Up @@ -1159,6 +1164,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;

Expand Down
4 changes: 4 additions & 0 deletions influxdb3_server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ where
default_time,
params.accept_partial,
params.precision,
params.no_sync,
)
.await?;

Expand Down Expand Up @@ -1615,6 +1616,8 @@ pub(crate) struct WriteParams {
pub(crate) accept_partial: bool,
#[serde(default)]
pub(crate) precision: Precision,
#[serde(default)]
pub(crate) no_sync: bool,
}

impl From<iox_http::write::WriteParams> for WriteParams {
Expand All @@ -1624,6 +1627,7 @@ impl From<iox_http::write::WriteParams> for WriteParams {
// legacy behaviour was to not accept partial:
accept_partial: false,
precision: legacy.precision.into(),
no_sync: false,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions influxdb3_server/src/query_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,7 @@ mod tests {
Time::from_timestamp_nanos(time),
false,
influxdb3_write::Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions influxdb3_write/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ pub trait Bufferer: Debug + Send + Sync + 'static {
ingest_time: Time,
accept_partial: bool,
precision: Precision,
no_sync: bool,
) -> write_buffer::Result<BufferedWriteRequest>;

/// Returns the database schema provider
Expand Down
55 changes: 46 additions & 9 deletions influxdb3_write/src/write_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use metric::Registry;
use metrics::WriteMetrics;
use object_store::path::Path as ObjPath;
use object_store::{ObjectMeta, ObjectStore};
use observability_deps::tracing::{debug, warn};
use observability_deps::tracing::{debug, error, warn};
use parquet_file::storage::ParquetExecInput;
use queryable_buffer::QueryableBufferArgs;
use schema::Schema;
Expand Down Expand Up @@ -263,6 +263,7 @@ impl WriteBufferImpl {
ingest_time: Time,
accept_partial: bool,
precision: Precision,
no_sync: bool,
) -> Result<BufferedWriteRequest> {
debug!("write_lp to {} in writebuffer", db_name);

Expand All @@ -284,12 +285,21 @@ impl WriteBufferImpl {
}
ops.push(WalOp::Write(result.valid_data));

// write to the wal. Behind the scenes the ops get buffered in memory and once a second (or
// whatever the configured wal flush interval is set to) the buffer is flushed and all the
// data is persisted into a single wal file in the configured object store. Then the
// contents are sent to the configured notifier, which in this case is the queryable buffer.
// Thus, after this returns, the data is both durable and queryable.
self.wal.write_ops(ops).await?;
if no_sync {
let wal = Arc::clone(&self.wal);
tokio::spawn(async move {
if let Err(err) = wal.write_ops(ops).await {
error!("Failed to persist data to the wal: {err}");
}
});
} else {
// write to the wal. Behind the scenes the ops get buffered in memory and once a second (or
// whatever the configured wal flush interval is set to) the buffer is flushed and all the
// data is persisted into a single wal file in the configured object store. Then the
// contents are sent to the configured notifier, which in this case is the queryable buffer.
// Thus, after this returns, the data is both durable and queryable.
self.wal.write_ops(ops).await?;
}

// record metrics for lines written, rejected, and bytes written
self.metrics
Expand Down Expand Up @@ -455,9 +465,17 @@ impl Bufferer for WriteBufferImpl {
ingest_time: Time,
accept_partial: bool,
precision: Precision,
no_sync: bool,
) -> Result<BufferedWriteRequest> {
self.write_lp(database, lp, ingest_time, accept_partial, precision)
.await
self.write_lp(
database,
lp,
ingest_time,
accept_partial,
precision,
no_sync,
)
.await
}

fn catalog(&self) -> Arc<Catalog> {
Expand Down Expand Up @@ -1001,6 +1019,7 @@ mod tests {
Time::from_timestamp_nanos(123),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand All @@ -1023,6 +1042,7 @@ mod tests {
Time::from_timestamp_nanos(124),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand All @@ -1034,6 +1054,7 @@ mod tests {
Time::from_timestamp_nanos(125),
false,
Precision::Nanosecond,
false,
)
.await;

Expand Down Expand Up @@ -1108,6 +1129,7 @@ mod tests {
Time::from_timestamp(20, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1165,6 +1187,7 @@ mod tests {
Time::from_timestamp(30, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand All @@ -1186,6 +1209,7 @@ mod tests {
Time::from_timestamp(40, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1240,6 +1264,7 @@ mod tests {
Time::from_timestamp(10, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand All @@ -1261,6 +1286,7 @@ mod tests {
Time::from_timestamp(65, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand All @@ -1284,6 +1310,7 @@ mod tests {
Time::from_timestamp(147, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1324,6 +1351,7 @@ mod tests {
Time::from_timestamp(250, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1396,6 +1424,7 @@ mod tests {
Time::from_timestamp(300, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand All @@ -1408,6 +1437,7 @@ mod tests {
Time::from_timestamp(330, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1577,6 +1607,7 @@ mod tests {
Time::from_timestamp(10, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand All @@ -1586,6 +1617,7 @@ mod tests {
Time::from_timestamp(20, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand All @@ -1595,6 +1627,7 @@ mod tests {
Time::from_timestamp(30, 0).unwrap(),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -2303,6 +2336,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -2331,6 +2365,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -2881,6 +2916,7 @@ mod tests {
Time::from_timestamp_nanos(w.time_seconds * 1_000_000_000),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand All @@ -2900,6 +2936,7 @@ mod tests {
Time::from_timestamp_nanos(w.time_seconds * 1_000_000_000),
true,
Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand Down

0 comments on commit 02c6d01

Please sign in to comment.