From a3580f8f42baa60d4470b8226a15323dd5f9591d Mon Sep 17 00:00:00 2001 From: Jackson Newhouse Date: Tue, 21 Jan 2025 10:09:26 -0800 Subject: [PATCH] feat(processing_engine): Add every mode for scheduled plugins. --- Cargo.lock | 2 + influxdb3_processing_engine/Cargo.toml | 1 + influxdb3_processing_engine/src/lib.rs | 8 +- influxdb3_processing_engine/src/plugins.rs | 96 +++++++++++++++++----- influxdb3_wal/Cargo.toml | 1 + influxdb3_wal/src/lib.rs | 38 +++++++-- 6 files changed, 121 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e1637a3a13a..6676299d548 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3002,6 +3002,7 @@ dependencies = [ "data_types", "datafusion_util", "hashbrown 0.15.2", + "humantime", "hyper 0.14.32", "influxdb3_cache", "influxdb3_catalog", @@ -3197,6 +3198,7 @@ dependencies = [ "data_types", "futures-util", "hashbrown 0.15.2", + "humantime", "indexmap 2.7.0", "influxdb-line-protocol", "influxdb3_id", diff --git a/influxdb3_processing_engine/Cargo.toml b/influxdb3_processing_engine/Cargo.toml index c9337a33cc0..a6cc2f49145 100644 --- a/influxdb3_processing_engine/Cargo.toml +++ b/influxdb3_processing_engine/Cargo.toml @@ -15,6 +15,7 @@ bytes.workspace = true chrono.workspace = true cron.workspace = true data_types.workspace = true +humantime.workspace = true hashbrown.workspace = true hyper.workspace = true iox_time.workspace = true diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs index a0ac0f8ee8b..ffb98e64ece 100644 --- a/influxdb3_processing_engine/src/lib.rs +++ b/influxdb3_processing_engine/src/lib.rs @@ -112,6 +112,12 @@ impl PluginChannels { .or_default() .insert(trigger, tx); } + TriggerSpecificationDefinition::Every { .. } => { + self.active_triggers + .entry(db) + .or_default() + .insert(trigger, tx); + } TriggerSpecificationDefinition::RequestPath { path } => { self.request_triggers.insert(path.to_string(), tx); } @@ -479,7 +485,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl { trigger, Arc::clone(&self.time_provider), plugin_context, - ), + )?, PluginType::Request => plugins::run_request_plugin( db_name.to_string(), plugin_code, diff --git a/influxdb3_processing_engine/src/plugins.rs b/influxdb3_processing_engine/src/plugins.rs index e9f91796662..5d9b9bb3991 100644 --- a/influxdb3_processing_engine/src/plugins.rs +++ b/influxdb3_processing_engine/src/plugins.rs @@ -96,12 +96,7 @@ pub(crate) fn run_schedule_plugin( trigger_definition: TriggerDefinition, time_provider: Arc, context: PluginContext, -) { - let TriggerSpecificationDefinition::Schedule { schedule } = &trigger_definition.trigger else { - // TODO: these linkages should be guaranteed by code. - unreachable!("this should've been checked"); - }; - let schedule = schedule.to_string(); +) -> Result<(), Error> { let trigger_plugin = TriggerPlugin { trigger_definition, db_name, @@ -109,12 +104,17 @@ pub(crate) fn run_schedule_plugin( write_buffer: context.write_buffer, query_executor: context.query_executor, }; + let runner = python_plugin::ScheduleTriggerRunner::try_new( + &trigger_plugin.trigger_definition.trigger, + Arc::clone(&time_provider), + )?; tokio::task::spawn(async move { trigger_plugin - .run_schedule_plugin(context.trigger_rx, schedule, time_provider) + .run_schedule_plugin(context.trigger_rx, runner, time_provider) .await .expect("cron trigger plugin failed"); }); + Ok(()) } #[cfg(feature = "system-py")] @@ -163,9 +163,10 @@ struct TriggerPlugin { mod python_plugin { use super::*; use anyhow::{anyhow, Context}; - use chrono::{DateTime, Utc}; - use cron::{OwnedScheduleIterator, Schedule}; + use chrono::{DateTime, Duration, Utc}; + use cron::{OwnedScheduleIterator, Schedule as CronSchedule}; use data_types::NamespaceName; + use humantime::{format_duration, parse_duration}; use hyper::http::HeaderValue; use hyper::{Body, Response, StatusCode}; use influxdb3_catalog::catalog::DatabaseSchema; @@ -219,12 +220,9 @@ mod python_plugin { pub(crate) async fn run_schedule_plugin( &self, mut receiver: Receiver, - schedule: String, + mut runner: ScheduleTriggerRunner, time_provider: Arc, ) -> Result<(), Error> { - let schedule = Schedule::from_str(schedule.as_str())?; - let mut runner = ScheduleTriggerRunner::new(schedule, Arc::clone(&time_provider)); - loop { let Some(next_run_instant) = runner.next_run_time() else { break; @@ -379,7 +377,12 @@ mod python_plugin { TriggerSpecificationDefinition::Schedule { schedule } => { - return Err(anyhow!("unexpectedly found scheduled trigger specification {} for WAL plugin {}", schedule, self.trigger_definition.trigger_name).into()) + return Err(anyhow!("unexpectedly found scheduled trigger specification cron:{} for WAL plugin {}", schedule, self.trigger_definition.trigger_name).into()) + } + TriggerSpecificationDefinition::Every { + duration, + } => { + return Err(anyhow!("unexpectedly found every trigger specification every:{} WAL plugin {}", format_duration(*duration), self.trigger_definition.trigger_name).into()) } TriggerSpecificationDefinition::RequestPath { path } => { return Err(anyhow!("unexpectedly found request path trigger specification {} for WAL plugin {}", path, self.trigger_definition.trigger_name).into()) @@ -459,17 +462,69 @@ mod python_plugin { } } - struct ScheduleTriggerRunner { - schedule: OwnedScheduleIterator, + enum Schedule { + Cron(OwnedScheduleIterator), + Every(Duration), + } + + pub(crate) struct ScheduleTriggerRunner { + schedule: Schedule, next_trigger_time: Option>, } impl ScheduleTriggerRunner { - fn new(cron_schedule: Schedule, time_provider: Arc) -> Self { + pub(crate) fn try_new( + trigger_spec: &TriggerSpecificationDefinition, + time_provider: Arc, + ) -> Result { + match trigger_spec { + TriggerSpecificationDefinition::AllTablesWalWrite + | TriggerSpecificationDefinition::SingleTableWalWrite { .. } => { + Err(anyhow!("shouldn't have table trigger for scheduled plugin").into()) + } + TriggerSpecificationDefinition::RequestPath { .. } => { + Err(anyhow!("shouldn't have request path trigger for scheduled plugin").into()) + } + TriggerSpecificationDefinition::Schedule { schedule } => { + let schedule = CronSchedule::from_str(schedule.as_str()) + .context("cron schedule should be parsable")?; + Ok(Self::new_cron(schedule, time_provider)) + } + TriggerSpecificationDefinition::Every { duration } => { + // check that duration isn't longer than a year, so we avoid overflows. + if *duration > parse_duration("1 year").unwrap() { + return Err( + anyhow!("schedule duration cannot be greater than 1 year").into() + ); + } + Ok(Self::new_every( + Duration::from_std(*duration) + .context("should be able to convert durations. ")?, + time_provider, + )) + } + } + } + fn new_cron(cron_schedule: CronSchedule, time_provider: Arc) -> Self { let mut schedule = cron_schedule.after_owned(time_provider.now().date_time()); let next_trigger_time = schedule.next(); Self { - schedule, + schedule: Schedule::Cron(schedule), + next_trigger_time, + } + } + + fn new_every(duration: Duration, time_provider: Arc) -> Self { + let now = time_provider.now().date_time(); + let duration_millis = duration.num_milliseconds(); + let now_millis = now.timestamp_millis(); + let next_trigger_millis = ((now_millis / duration_millis) + 1) * duration_millis; + let next_trigger_time = Some( + DateTime::from_timestamp_millis(next_trigger_millis) + .expect("can't be out of range"), + ); + Self { + schedule: Schedule::Every(duration), next_trigger_time, } } @@ -503,7 +558,10 @@ mod python_plugin { } fn advance_time(&mut self) { - self.next_trigger_time = self.schedule.next(); + self.next_trigger_time = match &mut self.schedule { + Schedule::Cron(schedule) => schedule.next(), + Schedule::Every(duration) => self.next_trigger_time.map(|time| time + *duration), + }; } /// A funky little method to get a tokio Instant that we can call `tokio::time::sleep_until()` on. diff --git a/influxdb3_wal/Cargo.toml b/influxdb3_wal/Cargo.toml index 8efc64cdd39..4fd3621cce5 100644 --- a/influxdb3_wal/Cargo.toml +++ b/influxdb3_wal/Cargo.toml @@ -25,6 +25,7 @@ cron = "0.14" crc32fast.workspace = true futures-util.workspace = true hashbrown.workspace = true +humantime.workspace = true indexmap.workspace = true object_store.workspace = true parking_lot.workspace = true diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs index cdd5c89bb5e..8c10d74ebc0 100644 --- a/influxdb3_wal/src/lib.rs +++ b/influxdb3_wal/src/lib.rs @@ -12,6 +12,7 @@ use async_trait::async_trait; use cron::Schedule; use data_types::Timestamp; use hashbrown::HashMap; +use humantime::{format_duration, parse_duration}; use indexmap::IndexMap; use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId}; use influxdb_line_protocol::v3::SeriesValue; @@ -58,8 +59,11 @@ pub enum Error { #[error("invalid WAL file path")] InvalidWalFilePath, - #[error("failed to parse trigger from {}", trigger_spec)] - TriggerSpecificationParseError { trigger_spec: String }, + #[error("failed to parse trigger from {trigger_spec}{}", .context.as_ref().map(|context| format!(": {context}")).unwrap_or_default())] + TriggerSpecificationParseError { + trigger_spec: String, + context: Option, + }, } pub type Result = std::result::Result; @@ -671,6 +675,7 @@ pub enum TriggerSpecificationDefinition { AllTablesWalWrite, Schedule { schedule: String }, RequestPath { path: String }, + Every { duration: Duration }, } impl TriggerSpecificationDefinition { @@ -682,6 +687,7 @@ impl TriggerSpecificationDefinition { if table_name.is_empty() { return Err(Error::TriggerSpecificationParseError { trigger_spec: spec_str.to_string(), + context: Some("table name is empty".to_string()), }); } Ok(TriggerSpecificationDefinition::SingleTableWalWrite { @@ -689,22 +695,40 @@ impl TriggerSpecificationDefinition { }) } "all_tables" => Ok(TriggerSpecificationDefinition::AllTablesWalWrite), - s if s.starts_with("schedule:") => { - let cron_schedule = s.trim_start_matches("schedule:").trim(); + s if s.starts_with("cron:") => { + let cron_schedule = s.trim_start_matches("cron:").trim(); if cron_schedule.is_empty() || Schedule::from_str(cron_schedule).is_err() { return Err(Error::TriggerSpecificationParseError { trigger_spec: spec_str.to_string(), + context: None, }); } Ok(TriggerSpecificationDefinition::Schedule { schedule: cron_schedule.to_string(), }) } + s if s.starts_with("every") => { + let duration_str = s.trim_start_matches("every:").trim(); + let Ok(duration) = parse_duration(duration_str) else { + return Err(Error::TriggerSpecificationParseError { + trigger_spec: spec_str.to_string(), + context: Some("couldn't parse to duration".to_string()), + }); + }; + if duration > parse_duration("1 year").unwrap() { + return Err(Error::TriggerSpecificationParseError { + trigger_spec: spec_str.to_string(), + context: Some("don't support every schedules of over 1 year".to_string()), + }); + } + Ok(TriggerSpecificationDefinition::Every { duration }) + } s if s.starts_with("request:") => { let path = s.trim_start_matches("request:").trim(); if path.is_empty() { return Err(Error::TriggerSpecificationParseError { trigger_spec: spec_str.to_string(), + context: None, }); } Ok(TriggerSpecificationDefinition::RequestPath { @@ -713,6 +737,7 @@ impl TriggerSpecificationDefinition { } _ => Err(Error::TriggerSpecificationParseError { trigger_spec: spec_str.to_string(), + context: Some("expect one of the following prefixes: 'table:', 'all_tables:', 'cron:', or 'every:'".to_string()), }), } } @@ -724,7 +749,10 @@ impl TriggerSpecificationDefinition { } TriggerSpecificationDefinition::AllTablesWalWrite => "all_tables".to_string(), TriggerSpecificationDefinition::Schedule { schedule } => { - format!("schedule:{}", schedule) + format!("cron:{}", schedule) + } + TriggerSpecificationDefinition::Every { duration } => { + format!("every:{}", format_duration(*duration)) } TriggerSpecificationDefinition::RequestPath { path } => { format!("request:{}", path)