From bfd559699323e89e83159de586c103d5acbaf6a5 Mon Sep 17 00:00:00 2001 From: Max batleforc Date: Mon, 24 Jun 2024 18:39:20 +0200 Subject: [PATCH] feat: basic scheduler work --- Cargo.lock | 39 ++++++++++++++ Cargo.toml | 4 ++ src/event/mod.rs | 1 + src/event/schedule_job.rs | 111 ++++++++++++++++++++++++++++++++++++++ src/main.rs | 25 ++++++--- 5 files changed, 174 insertions(+), 6 deletions(-) create mode 100644 src/event/schedule_job.rs diff --git a/Cargo.lock b/Cargo.lock index fd7aabb..81135b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -708,6 +708,7 @@ dependencies = [ "surrealdb", "time", "tokio", + "tokio-cron-scheduler", "toml", "tracing", "tracing-actix-web", @@ -1010,6 +1011,17 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7059fff8937831a9ae6f0fe4d658ffabf58f2ca96aa9dec1c889f936f705f216" +[[package]] +name = "cron" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8c3e73077b4b4a6ab1ea5047c37c57aee77657bc8ecd6f29b0af082d0b0c07" +dependencies = [ + "chrono", + "nom", + "once_cell", +] + [[package]] name = "crossbeam-channel" version = "0.5.13" @@ -2309,6 +2321,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +[[package]] +name = "num-derive" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "num-integer" version = "0.1.46" @@ -4253,6 +4276,22 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "tokio-cron-scheduler" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4c2e3a88f827f597799cf70a6f673074e62f3fc5ba5993b2873345c618a29af" +dependencies = [ + "chrono", + "cron", + "num-derive", + "num-traits", + "tokio", + "tracing", + "tracing-subscriber", + "uuid", +] + [[package]] name = "tokio-io-timeout" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index 2877ee8..fd70d16 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,10 @@ opentelemetry = "0.23" opentelemetry_sdk = { version = "0.23", features = ["rt-tokio"] } opentelemetry-otlp = { version = "0.16" } tracing-opentelemetry = "0.24" +tokio-cron-scheduler = { version = "0.10", features = [ + "tracing-subscriber", + "signal", +] } [[bin]] diff --git a/src/event/mod.rs b/src/event/mod.rs index 20b5f63..d7dc481 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -1 +1,2 @@ pub mod concour; +pub mod schedule_job; diff --git a/src/event/schedule_job.rs b/src/event/schedule_job.rs new file mode 100644 index 0000000..8c280b5 --- /dev/null +++ b/src/event/schedule_job.rs @@ -0,0 +1,111 @@ +use std::collections::HashMap; +use tokio_cron_scheduler::{Job, JobScheduler}; +use tracing::{error, info, instrument}; +use uuid::Uuid; + +#[derive(Clone)] +pub struct ScheduleJob { + pub job_id: HashMap<(u64, u64), Uuid>, + pub scheduler: JobScheduler, +} + +impl ScheduleJob { + #[instrument()] + pub async fn start_cron_scheduler() -> Result { + let scheduler = JobScheduler::new().await; + let mut future_self = match scheduler { + Ok(scheduler) => ScheduleJob { + job_id: Default::default(), + scheduler, + }, + Err(e) => { + error!("Error starting cron scheduler: {:?}", e); + return Err(()); + } + }; + future_self.scheduler.set_shutdown_handler(Box::new(|| { + Box::pin(async { + info!("Cron scheduler stopped"); + }) + })); + future_self.scheduler.shutdown_on_ctrl_c(); + match future_self.scheduler.start().await { + Ok(_) => { + info!("Cron scheduler started"); + Ok(future_self) + } + Err(e) => { + error!("Error starting cron scheduler: {:?}", e); + Err(()) + } + } + } + + #[instrument(skip(self))] + pub async fn stop_cron_scheduler(&mut self) -> &mut Self { + for (server_id, channel_id) in self.job_id.keys() { + match self + .scheduler + .remove(self.job_id.get(&(*server_id, *channel_id)).unwrap()) + .await + { + Ok(_) => { + info!("Cron job removed"); + } + Err(e) => { + error!("Error removing cron job: {:?}", e); + } + } + } + match self.scheduler.shutdown().await { + Ok(_) => { + info!("Cron scheduler stopped"); + self + } + Err(e) => { + error!("Error stopping cron scheduler: {:?}", e); + self + } + } + } + + #[instrument(skip(self))] + pub async fn add_cron_job(&mut self, server_id: u64, channel_id: u64, cron_expression: String) { + let job = match Job::new(cron_expression.as_str(), move |uuid, _l| { + // Do something + info!("Cron job executed with id: {:?}", uuid); + }) { + Ok(job) => job, + Err(e) => { + error!("Error creating cron job: {:?}", e); + return; + } + }; + + match self.scheduler.add(job).await { + Ok(job_uid) => { + info!("Cron job added"); + self.job_id.insert((server_id, channel_id), job_uid); + } + Err(e) => { + error!("Error adding cron job: {:?}", e); + } + } + } + #[instrument(skip(self))] + pub async fn stop_scheduled_job(&mut self, server_id: u64, channel_id: u64) { + match self.job_id.remove(&(server_id, channel_id)) { + Some(job_uid) => match self.scheduler.remove(&job_uid).await { + Ok(_) => { + info!("Cron job removed"); + } + Err(e) => { + error!("Error removing cron job: {:?}", e); + } + }, + None => { + error!("Cron job not found"); + } + } + } +} diff --git a/src/main.rs b/src/main.rs index a310a87..8b35036 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,14 +8,15 @@ use botdiscord::api::apidocs::ApiDocs; use botdiscord::api::init::init_api; use botdiscord::config::parse_local_config; use botdiscord::db; +use botdiscord::event::schedule_job::ScheduleJob; use botdiscord::{botv2::init::start_bot, tracing}; -use std::{process, time::Duration}; +use std::time::Duration; use tokio::{sync::oneshot, time::sleep}; use tracing_actix_web::{RequestId, TracingLogger}; use utoipa::OpenApi; use utoipa_swagger_ui::SwaggerUi; -#[actix_web::main] +#[tokio::main] //#[actix_web::main] async fn main() -> std::io::Result<()> { let config = parse_local_config(); let port = config.port; @@ -27,8 +28,18 @@ async fn main() -> std::io::Result<()> { return Ok(()); } } - let (tx, rx) = oneshot::channel(); - let http = start_bot(config.clone(), rx).await; + let mut cron_scheduler = match ScheduleJob::start_cron_scheduler().await { + Ok(cron_scheduler) => cron_scheduler, + Err(_) => { + println!("Error starting cron scheduler"); + return Ok(()); + } + }; + cron_scheduler + .add_cron_job(123, 123, "* * * * * *".to_string()) + .await; + let (tx_bot, rx_bot) = oneshot::channel(); + let http = start_bot(config.clone(), rx_bot).await; info!("API Server started on port {}", port); let mut openapi = ApiDocs::openapi(); openapi.info.title = format!("{} Api", config.bot_name.clone()); @@ -70,8 +81,10 @@ async fn main() -> std::io::Result<()> { .run() .await?; info!("API Server stopped."); - tx.send(()).unwrap(); + tx_bot.send(()).unwrap(); + cron_scheduler.stop_cron_scheduler().await; tracing::init::stop_tracing(config.tracing.clone(), config.bot_name.clone()); sleep(Duration::from_secs(2)).await; - process::exit(1); // This is a workaround to stop the bot, it should be replaced by a better solution + //process::exit(1); // This is a workaround to stop the bot, it should be replaced by a better solution + Ok(()) }