feat: basic scheduler work

This commit is contained in:
Max batleforc 2024-06-24 18:39:20 +02:00
parent 81c3114e0d
commit bfd5596993
No known key found for this signature in database
GPG Key ID: 25D243AB4B6AC9E7
5 changed files with 174 additions and 6 deletions

39
Cargo.lock generated
View File

@ -708,6 +708,7 @@ dependencies = [
"surrealdb", "surrealdb",
"time", "time",
"tokio", "tokio",
"tokio-cron-scheduler",
"toml", "toml",
"tracing", "tracing",
"tracing-actix-web", "tracing-actix-web",
@ -1010,6 +1011,17 @@ version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7059fff8937831a9ae6f0fe4d658ffabf58f2ca96aa9dec1c889f936f705f216" checksum = "7059fff8937831a9ae6f0fe4d658ffabf58f2ca96aa9dec1c889f936f705f216"
[[package]]
name = "cron"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f8c3e73077b4b4a6ab1ea5047c37c57aee77657bc8ecd6f29b0af082d0b0c07"
dependencies = [
"chrono",
"nom",
"once_cell",
]
[[package]] [[package]]
name = "crossbeam-channel" name = "crossbeam-channel"
version = "0.5.13" version = "0.5.13"
@ -2309,6 +2321,17 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
[[package]]
name = "num-derive"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]] [[package]]
name = "num-integer" name = "num-integer"
version = "0.1.46" version = "0.1.46"
@ -4253,6 +4276,22 @@ dependencies = [
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
[[package]]
name = "tokio-cron-scheduler"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4c2e3a88f827f597799cf70a6f673074e62f3fc5ba5993b2873345c618a29af"
dependencies = [
"chrono",
"cron",
"num-derive",
"num-traits",
"tokio",
"tracing",
"tracing-subscriber",
"uuid",
]
[[package]] [[package]]
name = "tokio-io-timeout" name = "tokio-io-timeout"
version = "1.2.0" version = "1.2.0"

View File

@ -43,6 +43,10 @@ opentelemetry = "0.23"
opentelemetry_sdk = { version = "0.23", features = ["rt-tokio"] } opentelemetry_sdk = { version = "0.23", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.16" } opentelemetry-otlp = { version = "0.16" }
tracing-opentelemetry = "0.24" tracing-opentelemetry = "0.24"
tokio-cron-scheduler = { version = "0.10", features = [
"tracing-subscriber",
"signal",
] }
[[bin]] [[bin]]

View File

@ -1 +1,2 @@
pub mod concour; pub mod concour;
pub mod schedule_job;

111
src/event/schedule_job.rs Normal file
View File

@ -0,0 +1,111 @@
use std::collections::HashMap;
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{error, info, instrument};
use uuid::Uuid;
#[derive(Clone)]
pub struct ScheduleJob {
pub job_id: HashMap<(u64, u64), Uuid>,
pub scheduler: JobScheduler,
}
impl ScheduleJob {
#[instrument()]
pub async fn start_cron_scheduler() -> Result<Self, ()> {
let scheduler = JobScheduler::new().await;
let mut future_self = match scheduler {
Ok(scheduler) => ScheduleJob {
job_id: Default::default(),
scheduler,
},
Err(e) => {
error!("Error starting cron scheduler: {:?}", e);
return Err(());
}
};
future_self.scheduler.set_shutdown_handler(Box::new(|| {
Box::pin(async {
info!("Cron scheduler stopped");
})
}));
future_self.scheduler.shutdown_on_ctrl_c();
match future_self.scheduler.start().await {
Ok(_) => {
info!("Cron scheduler started");
Ok(future_self)
}
Err(e) => {
error!("Error starting cron scheduler: {:?}", e);
Err(())
}
}
}
#[instrument(skip(self))]
pub async fn stop_cron_scheduler(&mut self) -> &mut Self {
for (server_id, channel_id) in self.job_id.keys() {
match self
.scheduler
.remove(self.job_id.get(&(*server_id, *channel_id)).unwrap())
.await
{
Ok(_) => {
info!("Cron job removed");
}
Err(e) => {
error!("Error removing cron job: {:?}", e);
}
}
}
match self.scheduler.shutdown().await {
Ok(_) => {
info!("Cron scheduler stopped");
self
}
Err(e) => {
error!("Error stopping cron scheduler: {:?}", e);
self
}
}
}
#[instrument(skip(self))]
pub async fn add_cron_job(&mut self, server_id: u64, channel_id: u64, cron_expression: String) {
let job = match Job::new(cron_expression.as_str(), move |uuid, _l| {
// Do something
info!("Cron job executed with id: {:?}", uuid);
}) {
Ok(job) => job,
Err(e) => {
error!("Error creating cron job: {:?}", e);
return;
}
};
match self.scheduler.add(job).await {
Ok(job_uid) => {
info!("Cron job added");
self.job_id.insert((server_id, channel_id), job_uid);
}
Err(e) => {
error!("Error adding cron job: {:?}", e);
}
}
}
#[instrument(skip(self))]
pub async fn stop_scheduled_job(&mut self, server_id: u64, channel_id: u64) {
match self.job_id.remove(&(server_id, channel_id)) {
Some(job_uid) => match self.scheduler.remove(&job_uid).await {
Ok(_) => {
info!("Cron job removed");
}
Err(e) => {
error!("Error removing cron job: {:?}", e);
}
},
None => {
error!("Cron job not found");
}
}
}
}

View File

@ -8,14 +8,15 @@ use botdiscord::api::apidocs::ApiDocs;
use botdiscord::api::init::init_api; use botdiscord::api::init::init_api;
use botdiscord::config::parse_local_config; use botdiscord::config::parse_local_config;
use botdiscord::db; use botdiscord::db;
use botdiscord::event::schedule_job::ScheduleJob;
use botdiscord::{botv2::init::start_bot, tracing}; use botdiscord::{botv2::init::start_bot, tracing};
use std::{process, time::Duration}; use std::time::Duration;
use tokio::{sync::oneshot, time::sleep}; use tokio::{sync::oneshot, time::sleep};
use tracing_actix_web::{RequestId, TracingLogger}; use tracing_actix_web::{RequestId, TracingLogger};
use utoipa::OpenApi; use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi; use utoipa_swagger_ui::SwaggerUi;
#[actix_web::main] #[tokio::main] //#[actix_web::main]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
let config = parse_local_config(); let config = parse_local_config();
let port = config.port; let port = config.port;
@ -27,8 +28,18 @@ async fn main() -> std::io::Result<()> {
return Ok(()); return Ok(());
} }
} }
let (tx, rx) = oneshot::channel(); let mut cron_scheduler = match ScheduleJob::start_cron_scheduler().await {
let http = start_bot(config.clone(), rx).await; Ok(cron_scheduler) => cron_scheduler,
Err(_) => {
println!("Error starting cron scheduler");
return Ok(());
}
};
cron_scheduler
.add_cron_job(123, 123, "* * * * * *".to_string())
.await;
let (tx_bot, rx_bot) = oneshot::channel();
let http = start_bot(config.clone(), rx_bot).await;
info!("API Server started on port {}", port); info!("API Server started on port {}", port);
let mut openapi = ApiDocs::openapi(); let mut openapi = ApiDocs::openapi();
openapi.info.title = format!("{} Api", config.bot_name.clone()); openapi.info.title = format!("{} Api", config.bot_name.clone());
@ -70,8 +81,10 @@ async fn main() -> std::io::Result<()> {
.run() .run()
.await?; .await?;
info!("API Server stopped."); info!("API Server stopped.");
tx.send(()).unwrap(); tx_bot.send(()).unwrap();
cron_scheduler.stop_cron_scheduler().await;
tracing::init::stop_tracing(config.tracing.clone(), config.bot_name.clone()); tracing::init::stop_tracing(config.tracing.clone(), config.bot_name.clone());
sleep(Duration::from_secs(2)).await; sleep(Duration::from_secs(2)).await;
process::exit(1); // This is a workaround to stop the bot, it should be replaced by a better solution //process::exit(1); // This is a workaround to stop the bot, it should be replaced by a better solution
Ok(())
} }