Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(processing_engine): Add every mode for scheduled plugins. #25891

Merged
merged 2 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions influxdb3_processing_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ bytes.workspace = true
chrono.workspace = true
cron.workspace = true
data_types.workspace = true
hashbrown.workspace = true
humantime.workspace = true
hyper.workspace = true
iox_time.workspace = true
Expand Down
8 changes: 7 additions & 1 deletion influxdb3_processing_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ impl PluginChannels {
.or_default()
.insert(trigger, tx);
}
TriggerSpecificationDefinition::Every { .. } => {
self.active_triggers
.entry(db)
.or_default()
.insert(trigger, tx);
}
TriggerSpecificationDefinition::RequestPath { path } => {
self.request_triggers.insert(path.to_string(), tx);
}
Expand Down Expand Up @@ -479,7 +485,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
trigger,
Arc::clone(&self.time_provider),
plugin_context,
),
)?,
PluginType::Request => plugins::run_request_plugin(
db_name.to_string(),
plugin_code,
Expand Down
96 changes: 77 additions & 19 deletions influxdb3_processing_engine/src/plugins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,25 @@ pub(crate) fn run_schedule_plugin(
trigger_definition: TriggerDefinition,
time_provider: Arc<dyn TimeProvider>,
context: PluginContext,
) {
let TriggerSpecificationDefinition::Schedule { schedule } = &trigger_definition.trigger else {
// TODO: these linkages should be guaranteed by code.
unreachable!("this should've been checked");
};
let schedule = schedule.to_string();
) -> Result<(), Error> {
let trigger_plugin = TriggerPlugin {
trigger_definition,
db_name,
plugin_code,
write_buffer: context.write_buffer,
query_executor: context.query_executor,
};
let runner = python_plugin::ScheduleTriggerRunner::try_new(
&trigger_plugin.trigger_definition.trigger,
Arc::clone(&time_provider),
)?;
tokio::task::spawn(async move {
trigger_plugin
.run_schedule_plugin(context.trigger_rx, schedule, time_provider)
.run_schedule_plugin(context.trigger_rx, runner, time_provider)
.await
.expect("cron trigger plugin failed");
});
Ok(())
}

#[cfg(feature = "system-py")]
Expand Down Expand Up @@ -163,9 +163,10 @@ struct TriggerPlugin {
mod python_plugin {
use super::*;
use anyhow::{anyhow, Context};
use chrono::{DateTime, Utc};
use cron::{OwnedScheduleIterator, Schedule};
use chrono::{DateTime, Duration, Utc};
use cron::{OwnedScheduleIterator, Schedule as CronSchedule};
use data_types::NamespaceName;
use humantime::{format_duration, parse_duration};
use hyper::http::HeaderValue;
use hyper::{Body, Response, StatusCode};
use influxdb3_catalog::catalog::DatabaseSchema;
Expand Down Expand Up @@ -219,12 +220,9 @@ mod python_plugin {
pub(crate) async fn run_schedule_plugin(
&self,
mut receiver: Receiver<PluginEvent>,
schedule: String,
mut runner: ScheduleTriggerRunner,
time_provider: Arc<dyn TimeProvider>,
) -> Result<(), Error> {
let schedule = Schedule::from_str(schedule.as_str())?;
let mut runner = ScheduleTriggerRunner::new(schedule, Arc::clone(&time_provider));

loop {
let Some(next_run_instant) = runner.next_run_time() else {
break;
Expand Down Expand Up @@ -379,7 +377,12 @@ mod python_plugin {
TriggerSpecificationDefinition::Schedule {
schedule
} => {
return Err(anyhow!("unexpectedly found scheduled trigger specification {} for WAL plugin {}", schedule, self.trigger_definition.trigger_name).into())
return Err(anyhow!("unexpectedly found scheduled trigger specification cron:{} for WAL plugin {}", schedule, self.trigger_definition.trigger_name).into())
}
TriggerSpecificationDefinition::Every {
duration,
} => {
return Err(anyhow!("unexpectedly found every trigger specification every:{} WAL plugin {}", format_duration(*duration), self.trigger_definition.trigger_name).into())
}
TriggerSpecificationDefinition::RequestPath { path } => {
return Err(anyhow!("unexpectedly found request path trigger specification {} for WAL plugin {}", path, self.trigger_definition.trigger_name).into())
Expand Down Expand Up @@ -459,17 +462,69 @@ mod python_plugin {
}
}

struct ScheduleTriggerRunner {
schedule: OwnedScheduleIterator<Utc>,
/// How a scheduled trigger decides its fire times: either a cron
/// expression (iterated lazily) or a fixed repeat interval.
enum Schedule {
    // Owned iterator over upcoming cron match times in UTC.
    Cron(OwnedScheduleIterator<Utc>),
    // Fixed interval between fires; see `ScheduleTriggerRunner::new_every`
    // for how the first fire time is aligned.
    Every(Duration),
}

/// Drives a scheduled plugin: holds the schedule plus the next time it
/// should fire.
pub(crate) struct ScheduleTriggerRunner {
    schedule: Schedule,
    // `None` means no further runs remain (only possible for a cron
    // schedule that has been exhausted); the run loop exits on it.
    next_trigger_time: Option<DateTime<Utc>>,
}

impl ScheduleTriggerRunner {
fn new(cron_schedule: Schedule, time_provider: Arc<dyn TimeProvider>) -> Self {
/// Builds a [`ScheduleTriggerRunner`] from a trigger specification.
///
/// Only the time-driven specs are accepted: `Schedule` (cron) and
/// `Every` (fixed interval). WAL-write and request-path specs return an
/// error — those linkages are supposed to be ruled out by the caller.
///
/// # Errors
/// - table / request-path specs (not time-driven)
/// - unparsable cron expression
/// - `every` durations shorter than 1 millisecond or longer than 1 year
pub(crate) fn try_new(
    trigger_spec: &TriggerSpecificationDefinition,
    time_provider: Arc<dyn TimeProvider>,
) -> Result<Self, Error> {
    match trigger_spec {
        TriggerSpecificationDefinition::AllTablesWalWrite
        | TriggerSpecificationDefinition::SingleTableWalWrite { .. } => {
            Err(anyhow!("shouldn't have table trigger for scheduled plugin").into())
        }
        TriggerSpecificationDefinition::RequestPath { .. } => {
            Err(anyhow!("shouldn't have request path trigger for scheduled plugin").into())
        }
        TriggerSpecificationDefinition::Schedule { schedule } => {
            let schedule = CronSchedule::from_str(schedule.as_str())
                .context("cron schedule should be parsable")?;
            Ok(Self::new_cron(schedule, time_provider))
        }
        TriggerSpecificationDefinition::Every { duration } => {
            // Lower bound: `new_every` works in whole milliseconds and
            // divides by the interval, so a duration that truncates to
            // 0 ms would panic with a divide-by-zero.
            if duration.as_millis() == 0 {
                return Err(
                    anyhow!("schedule duration must be at least 1 millisecond").into()
                );
            }
            // Upper bound: check that duration isn't longer than a year,
            // so we avoid overflows in next-trigger arithmetic.
            if *duration > parse_duration("1 year").unwrap() {
                return Err(
                    anyhow!("schedule duration cannot be greater than 1 year").into()
                );
            }
            Ok(Self::new_every(
                Duration::from_std(*duration)
                    .context("should be able to convert durations")?,
                time_provider,
            ))
        }
    }
}
/// Builds a runner driven by a cron schedule, primed with the first
/// fire time strictly after "now" as reported by `time_provider`.
fn new_cron(cron_schedule: CronSchedule, time_provider: Arc<dyn TimeProvider>) -> Self {
    let mut schedule = cron_schedule.after_owned(time_provider.now().date_time());
    // `None` here means the cron expression never fires again; the run
    // loop treats that as completion.
    let next_trigger_time = schedule.next();
    Self {
        schedule: Schedule::Cron(schedule),
        next_trigger_time,
    }
}

/// Builds a runner that fires at fixed intervals, aligned to multiples
/// of `duration` since the Unix epoch rather than relative to "now" —
/// e.g. an every-5-minute trigger fires at :00, :05, :10, ...
fn new_every(duration: Duration, time_provider: Arc<dyn TimeProvider>) -> Self {
    let now = time_provider.now().date_time();
    // NOTE(review): whole-millisecond arithmetic; `duration_millis` is 0
    // for sub-millisecond durations and the division below would panic.
    // Assumes the constructor path validates the duration — confirm.
    let duration_millis = duration.num_milliseconds();
    let now_millis = now.timestamp_millis();
    // Round down to the current interval boundary, then step one
    // interval ahead to get the first fire time after "now".
    let next_trigger_millis = ((now_millis / duration_millis) + 1) * duration_millis;
    let next_trigger_time = Some(
        // In range because durations are capped at 1 year at construction.
        DateTime::from_timestamp_millis(next_trigger_millis)
            .expect("can't be out of range"),
    );
    Self {
        schedule: Schedule::Every(duration),
        next_trigger_time,
    }
}
Expand Down Expand Up @@ -503,7 +558,10 @@ mod python_plugin {
}

/// Computes the fire time after the current one: the next cron match,
/// or the previous fire time plus the fixed interval. Leaves `None`
/// (stopping the run loop) only for cron schedules that have ended.
fn advance_time(&mut self) {
    self.next_trigger_time = match &mut self.schedule {
        Schedule::Cron(schedule) => schedule.next(),
        Schedule::Every(duration) => self.next_trigger_time.map(|time| time + *duration),
    };
}

/// A funky little method to get a tokio Instant that we can call `tokio::time::sleep_until()` on.
Expand Down
1 change: 1 addition & 0 deletions influxdb3_wal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ cron = "0.14"
crc32fast.workspace = true
futures-util.workspace = true
hashbrown.workspace = true
humantime.workspace = true
indexmap.workspace = true
object_store.workspace = true
parking_lot.workspace = true
Expand Down
38 changes: 33 additions & 5 deletions influxdb3_wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use async_trait::async_trait;
use cron::Schedule;
use data_types::Timestamp;
use hashbrown::HashMap;
use humantime::{format_duration, parse_duration};
use indexmap::IndexMap;
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb_line_protocol::v3::SeriesValue;
Expand Down Expand Up @@ -58,8 +59,11 @@ pub enum Error {
#[error("invalid WAL file path")]
InvalidWalFilePath,

#[error("failed to parse trigger from {}", trigger_spec)]
TriggerSpecificationParseError { trigger_spec: String },
#[error("failed to parse trigger from {trigger_spec}{}", .context.as_ref().map(|context| format!(": {context}")).unwrap_or_default())]
TriggerSpecificationParseError {
trigger_spec: String,
context: Option<String>,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -671,6 +675,7 @@ pub enum TriggerSpecificationDefinition {
AllTablesWalWrite,
Schedule { schedule: String },
RequestPath { path: String },
Every { duration: Duration },
}

impl TriggerSpecificationDefinition {
Expand All @@ -682,29 +687,48 @@ impl TriggerSpecificationDefinition {
if table_name.is_empty() {
return Err(Error::TriggerSpecificationParseError {
trigger_spec: spec_str.to_string(),
context: Some("table name is empty".to_string()),
});
}
Ok(TriggerSpecificationDefinition::SingleTableWalWrite {
table_name: table_name.to_string(),
})
}
"all_tables" => Ok(TriggerSpecificationDefinition::AllTablesWalWrite),
s if s.starts_with("schedule:") => {
let cron_schedule = s.trim_start_matches("schedule:").trim();
s if s.starts_with("cron:") => {
let cron_schedule = s.trim_start_matches("cron:").trim();
if cron_schedule.is_empty() || Schedule::from_str(cron_schedule).is_err() {
return Err(Error::TriggerSpecificationParseError {
trigger_spec: spec_str.to_string(),
context: None,
});
}
Ok(TriggerSpecificationDefinition::Schedule {
schedule: cron_schedule.to_string(),
})
}
s if s.starts_with("every:") => {
let duration_str = s.trim_start_matches("every:").trim();
let Ok(duration) = parse_duration(duration_str) else {
return Err(Error::TriggerSpecificationParseError {
trigger_spec: spec_str.to_string(),
context: Some("couldn't parse to duration".to_string()),
});
};
if duration > parse_duration("1 year").unwrap() {
return Err(Error::TriggerSpecificationParseError {
trigger_spec: spec_str.to_string(),
context: Some("don't support every schedules of over 1 year".to_string()),
});
}
Ok(TriggerSpecificationDefinition::Every { duration })
}
s if s.starts_with("request:") => {
let path = s.trim_start_matches("request:").trim();
if path.is_empty() {
return Err(Error::TriggerSpecificationParseError {
trigger_spec: spec_str.to_string(),
context: None,
});
}
Ok(TriggerSpecificationDefinition::RequestPath {
Expand All @@ -713,6 +737,7 @@ impl TriggerSpecificationDefinition {
}
_ => Err(Error::TriggerSpecificationParseError {
trigger_spec: spec_str.to_string(),
context: Some("expect one of the following prefixes: 'table:', 'all_tables:', 'cron:', or 'every:'".to_string()),
}),
}
}
Expand All @@ -724,7 +749,10 @@ impl TriggerSpecificationDefinition {
}
TriggerSpecificationDefinition::AllTablesWalWrite => "all_tables".to_string(),
TriggerSpecificationDefinition::Schedule { schedule } => {
format!("schedule:{}", schedule)
format!("cron:{}", schedule)
}
TriggerSpecificationDefinition::Every { duration } => {
format!("every:{}", format_duration(*duration))
}
TriggerSpecificationDefinition::RequestPath { path } => {
format!("request:{}", path)
Expand Down
Loading