Skip to content

Commit

Permalink
feat(processing_engine): Add every mode for scheduled plugins. (#25891)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacksonrnewhouse authored Jan 23, 2025
1 parent 63bd509 commit f1ea2d8
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 25 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions influxdb3_processing_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ bytes.workspace = true
chrono.workspace = true
cron.workspace = true
data_types.workspace = true
humantime.workspace = true
hashbrown.workspace = true
hyper.workspace = true
iox_time.workspace = true
Expand Down
8 changes: 7 additions & 1 deletion influxdb3_processing_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ impl PluginChannels {
.or_default()
.insert(trigger, tx);
}
TriggerSpecificationDefinition::Every { .. } => {
self.active_triggers
.entry(db)
.or_default()
.insert(trigger, tx);
}
TriggerSpecificationDefinition::RequestPath { path } => {
self.request_triggers.insert(path.to_string(), tx);
}
Expand Down Expand Up @@ -479,7 +485,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
trigger,
Arc::clone(&self.time_provider),
plugin_context,
),
)?,
PluginType::Request => plugins::run_request_plugin(
db_name.to_string(),
plugin_code,
Expand Down
96 changes: 77 additions & 19 deletions influxdb3_processing_engine/src/plugins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,25 @@ pub(crate) fn run_schedule_plugin(
trigger_definition: TriggerDefinition,
time_provider: Arc<dyn TimeProvider>,
context: PluginContext,
) {
let TriggerSpecificationDefinition::Schedule { schedule } = &trigger_definition.trigger else {
// TODO: these linkages should be guaranteed by code.
unreachable!("this should've been checked");
};
let schedule = schedule.to_string();
) -> Result<(), Error> {
let trigger_plugin = TriggerPlugin {
trigger_definition,
db_name,
plugin_code,
write_buffer: context.write_buffer,
query_executor: context.query_executor,
};
let runner = python_plugin::ScheduleTriggerRunner::try_new(
&trigger_plugin.trigger_definition.trigger,
Arc::clone(&time_provider),
)?;
tokio::task::spawn(async move {
trigger_plugin
.run_schedule_plugin(context.trigger_rx, schedule, time_provider)
.run_schedule_plugin(context.trigger_rx, runner, time_provider)
.await
.expect("cron trigger plugin failed");
});
Ok(())
}

#[cfg(feature = "system-py")]
Expand Down Expand Up @@ -163,9 +163,10 @@ struct TriggerPlugin {
mod python_plugin {
use super::*;
use anyhow::{anyhow, Context};
use chrono::{DateTime, Utc};
use cron::{OwnedScheduleIterator, Schedule};
use chrono::{DateTime, Duration, Utc};
use cron::{OwnedScheduleIterator, Schedule as CronSchedule};
use data_types::NamespaceName;
use humantime::{format_duration, parse_duration};
use hyper::http::HeaderValue;
use hyper::{Body, Response, StatusCode};
use influxdb3_catalog::catalog::DatabaseSchema;
Expand Down Expand Up @@ -219,12 +220,9 @@ mod python_plugin {
pub(crate) async fn run_schedule_plugin(
&self,
mut receiver: Receiver<PluginEvent>,
schedule: String,
mut runner: ScheduleTriggerRunner,
time_provider: Arc<dyn TimeProvider>,
) -> Result<(), Error> {
let schedule = Schedule::from_str(schedule.as_str())?;
let mut runner = ScheduleTriggerRunner::new(schedule, Arc::clone(&time_provider));

loop {
let Some(next_run_instant) = runner.next_run_time() else {
break;
Expand Down Expand Up @@ -379,7 +377,12 @@ mod python_plugin {
TriggerSpecificationDefinition::Schedule {
schedule
} => {
return Err(anyhow!("unexpectedly found scheduled trigger specification {} for WAL plugin {}", schedule, self.trigger_definition.trigger_name).into())
return Err(anyhow!("unexpectedly found scheduled trigger specification cron:{} for WAL plugin {}", schedule, self.trigger_definition.trigger_name).into())
}
TriggerSpecificationDefinition::Every {
duration,
} => {
return Err(anyhow!("unexpectedly found every trigger specification every:{} WAL plugin {}", format_duration(*duration), self.trigger_definition.trigger_name).into())
}
TriggerSpecificationDefinition::RequestPath { path } => {
return Err(anyhow!("unexpectedly found request path trigger specification {} for WAL plugin {}", path, self.trigger_definition.trigger_name).into())
Expand Down Expand Up @@ -459,17 +462,69 @@ mod python_plugin {
}
}

struct ScheduleTriggerRunner {
schedule: OwnedScheduleIterator<Utc>,
/// The two ways a scheduled plugin's trigger times can be produced.
enum Schedule {
/// Trigger times are pulled one at a time from a cron schedule iterator (in UTC).
Cron(OwnedScheduleIterator<Utc>),
/// Trigger times are a fixed interval apart: each one is the previous trigger time
/// plus this duration.
Every(Duration),
}

/// Tracks when a scheduled plugin should fire next, for either a cron schedule or a
/// fixed-interval ("every") schedule.
pub(crate) struct ScheduleTriggerRunner {
// Source of subsequent trigger times (cron iterator or fixed interval).
schedule: Schedule,
// The upcoming trigger time; `None` once a cron iterator has no further occurrences.
next_trigger_time: Option<DateTime<Utc>>,
}

impl ScheduleTriggerRunner {
fn new(cron_schedule: Schedule, time_provider: Arc<dyn TimeProvider>) -> Self {
/// Builds a runner for a scheduled plugin from its trigger specification.
///
/// Only `Schedule` (cron) and `Every` (fixed interval) specifications are accepted;
/// the first trigger time is computed relative to `time_provider.now()`.
///
/// # Errors
///
/// Returns an error when:
/// - the specification is a table (WAL) or request-path trigger,
/// - the cron expression cannot be parsed,
/// - the `Every` duration is longer than one year, or
/// - the `Every` duration is shorter than one millisecond (including zero).
pub(crate) fn try_new(
    trigger_spec: &TriggerSpecificationDefinition,
    time_provider: Arc<dyn TimeProvider>,
) -> Result<Self, Error> {
    match trigger_spec {
        TriggerSpecificationDefinition::AllTablesWalWrite
        | TriggerSpecificationDefinition::SingleTableWalWrite { .. } => {
            Err(anyhow!("shouldn't have table trigger for scheduled plugin").into())
        }
        TriggerSpecificationDefinition::RequestPath { .. } => {
            Err(anyhow!("shouldn't have request path trigger for scheduled plugin").into())
        }
        TriggerSpecificationDefinition::Schedule { schedule } => {
            let schedule = CronSchedule::from_str(schedule.as_str())
                .context("cron schedule should be parsable")?;
            Ok(Self::new_cron(schedule, time_provider))
        }
        TriggerSpecificationDefinition::Every { duration } => {
            // check that duration isn't longer than a year, so we avoid overflows.
            let max_duration =
                parse_duration("1 year").expect("'1 year' is a valid humantime duration");
            if *duration > max_duration {
                return Err(
                    anyhow!("schedule duration cannot be greater than 1 year").into()
                );
            }
            // `new_every` aligns the first trigger by dividing by the interval in whole
            // milliseconds, so a zero or sub-millisecond interval would divide by zero
            // and panic. Reject it here with a proper error instead.
            if duration.as_millis() == 0 {
                return Err(
                    anyhow!("schedule duration must be at least 1 millisecond").into()
                );
            }
            Ok(Self::new_every(
                Duration::from_std(*duration)
                    .context("should be able to convert durations. ")?,
                time_provider,
            ))
        }
    }
}
fn new_cron(cron_schedule: CronSchedule, time_provider: Arc<dyn TimeProvider>) -> Self {
let mut schedule = cron_schedule.after_owned(time_provider.now().date_time());
let next_trigger_time = schedule.next();
Self {
schedule,
schedule: Schedule::Cron(schedule),
next_trigger_time,
}
}

/// Builds a runner that fires at a fixed interval.
///
/// The first trigger time is aligned to the interval: it is the next multiple of
/// `duration` (measured in whole milliseconds since the Unix epoch) after the current
/// time reported by `time_provider`.
///
/// # Panics
///
/// Panics if `duration` is shorter than one millisecond, because the alignment divides
/// by the interval in whole milliseconds. Also panics if the computed timestamp cannot
/// be represented as a `DateTime`.
fn new_every(duration: Duration, time_provider: Arc<dyn TimeProvider>) -> Self {
    let current_ms = time_provider.now().date_time().timestamp_millis();
    let interval_ms = duration.num_milliseconds();
    // Whole intervals elapsed so far, plus one, gives the next interval boundary.
    let elapsed_intervals = current_ms / interval_ms;
    let upcoming_ms = (elapsed_intervals + 1) * interval_ms;
    let first_trigger =
        DateTime::from_timestamp_millis(upcoming_ms).expect("can't be out of range");
    Self {
        schedule: Schedule::Every(duration),
        next_trigger_time: Some(first_trigger),
    }
}
Expand Down Expand Up @@ -503,7 +558,10 @@ mod python_plugin {
}

fn advance_time(&mut self) {
self.next_trigger_time = self.schedule.next();
self.next_trigger_time = match &mut self.schedule {
Schedule::Cron(schedule) => schedule.next(),
Schedule::Every(duration) => self.next_trigger_time.map(|time| time + *duration),
};
}

/// A funky little method to get a tokio Instant that we can call `tokio::time::sleep_until()` on.
Expand Down
1 change: 1 addition & 0 deletions influxdb3_wal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ cron = "0.14"
crc32fast.workspace = true
futures-util.workspace = true
hashbrown.workspace = true
humantime.workspace = true
indexmap.workspace = true
object_store.workspace = true
parking_lot.workspace = true
Expand Down
38 changes: 33 additions & 5 deletions influxdb3_wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use async_trait::async_trait;
use cron::Schedule;
use data_types::Timestamp;
use hashbrown::HashMap;
use humantime::{format_duration, parse_duration};
use indexmap::IndexMap;
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb_line_protocol::v3::SeriesValue;
Expand Down Expand Up @@ -58,8 +59,11 @@ pub enum Error {
#[error("invalid WAL file path")]
InvalidWalFilePath,

#[error("failed to parse trigger from {}", trigger_spec)]
TriggerSpecificationParseError { trigger_spec: String },
#[error("failed to parse trigger from {trigger_spec}{}", .context.as_ref().map(|context| format!(": {context}")).unwrap_or_default())]
TriggerSpecificationParseError {
trigger_spec: String,
context: Option<String>,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -671,6 +675,7 @@ pub enum TriggerSpecificationDefinition {
AllTablesWalWrite,
Schedule { schedule: String },
RequestPath { path: String },
Every { duration: Duration },
}

impl TriggerSpecificationDefinition {
Expand All @@ -682,29 +687,48 @@ impl TriggerSpecificationDefinition {
if table_name.is_empty() {
return Err(Error::TriggerSpecificationParseError {
trigger_spec: spec_str.to_string(),
context: Some("table name is empty".to_string()),
});
}
Ok(TriggerSpecificationDefinition::SingleTableWalWrite {
table_name: table_name.to_string(),
})
}
"all_tables" => Ok(TriggerSpecificationDefinition::AllTablesWalWrite),
s if s.starts_with("schedule:") => {
let cron_schedule = s.trim_start_matches("schedule:").trim();
s if s.starts_with("cron:") => {
let cron_schedule = s.trim_start_matches("cron:").trim();
if cron_schedule.is_empty() || Schedule::from_str(cron_schedule).is_err() {
return Err(Error::TriggerSpecificationParseError {
trigger_spec: spec_str.to_string(),
context: None,
});
}
Ok(TriggerSpecificationDefinition::Schedule {
schedule: cron_schedule.to_string(),
})
}
s if s.starts_with("every:") => {
let duration_str = s.trim_start_matches("every:").trim();
let Ok(duration) = parse_duration(duration_str) else {
return Err(Error::TriggerSpecificationParseError {
trigger_spec: spec_str.to_string(),
context: Some("couldn't parse to duration".to_string()),
});
};
if duration > parse_duration("1 year").unwrap() {
return Err(Error::TriggerSpecificationParseError {
trigger_spec: spec_str.to_string(),
context: Some("don't support every schedules of over 1 year".to_string()),
});
}
Ok(TriggerSpecificationDefinition::Every { duration })
}
s if s.starts_with("request:") => {
let path = s.trim_start_matches("request:").trim();
if path.is_empty() {
return Err(Error::TriggerSpecificationParseError {
trigger_spec: spec_str.to_string(),
context: None,
});
}
Ok(TriggerSpecificationDefinition::RequestPath {
Expand All @@ -713,6 +737,7 @@ impl TriggerSpecificationDefinition {
}
_ => Err(Error::TriggerSpecificationParseError {
trigger_spec: spec_str.to_string(),
context: Some("expect one of the following prefixes: 'table:', 'all_tables:', 'cron:', or 'every:'".to_string()),
}),
}
}
Expand All @@ -724,7 +749,10 @@ impl TriggerSpecificationDefinition {
}
TriggerSpecificationDefinition::AllTablesWalWrite => "all_tables".to_string(),
TriggerSpecificationDefinition::Schedule { schedule } => {
format!("schedule:{}", schedule)
format!("cron:{}", schedule)
}
TriggerSpecificationDefinition::Every { duration } => {
format!("every:{}", format_duration(*duration))
}
TriggerSpecificationDefinition::RequestPath { path } => {
format!("request:{}", path)
Expand Down

0 comments on commit f1ea2d8

Please sign in to comment.