feat: Connection to DB

Max batleforc 2025-05-22 23:09:34 +02:00
parent e59b500818
commit 37c5c7235e
No known key found for this signature in database
GPG Key ID: 25D243AB4B6AC9E7
20 changed files with 2681 additions and 10 deletions

View File

@ -10,4 +10,10 @@
"cSpell.language": "en,fr",
"gitlens.plusFeatures.enabled": false,
"editor.guides.bracketPairs": true,
"cSpell.words": [
"clickhouse",
"datalake",
"uuid",
"Uuid"
],
}

Cargo.lock generated
View File

@ -2,6 +2,194 @@
# It is not intended for manual editing.
version = 4
[[package]]
name = "actix-codec"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f7b0a21988c1bf877cf4759ef5ddaac04c1c9fe808c9142ecb78ba97d97a28a"
dependencies = [
"bitflags 2.9.1",
"bytes",
"futures-core",
"futures-sink",
"memchr",
"pin-project-lite",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "actix-http"
version = "3.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44dfe5c9e0004c623edc65391dfd51daa201e7e30ebd9c9bedf873048ec32bc2"
dependencies = [
"actix-codec",
"actix-rt",
"actix-service",
"actix-utils",
"base64 0.22.1",
"bitflags 2.9.1",
"bytes",
"bytestring",
"derive_more",
"encoding_rs",
"foldhash",
"futures-core",
"http 0.2.12",
"httparse",
"httpdate",
"itoa",
"language-tags",
"local-channel",
"mime",
"percent-encoding",
"pin-project-lite",
"rand 0.9.1",
"sha1",
"smallvec",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "actix-macros"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb"
dependencies = [
"quote",
"syn 2.0.101",
]
[[package]]
name = "actix-router"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13d324164c51f63867b57e73ba5936ea151b8a41a1d23d1031eeb9f70d0236f8"
dependencies = [
"bytestring",
"cfg-if",
"http 0.2.12",
"regex-lite",
"serde",
"tracing",
]
[[package]]
name = "actix-rt"
version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24eda4e2a6e042aa4e55ac438a2ae052d3b5da0ecf83d7411e1a368946925208"
dependencies = [
"futures-core",
"tokio",
]
[[package]]
name = "actix-server"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a65064ea4a457eaf07f2fba30b4c695bf43b721790e9530d26cb6f9019ff7502"
dependencies = [
"actix-rt",
"actix-service",
"actix-utils",
"futures-core",
"futures-util",
"mio",
"socket2",
"tokio",
"tracing",
]
[[package]]
name = "actix-service"
version = "2.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e46f36bf0e5af44bdc4bdb36fbbd421aa98c79a9bce724e1edeb3894e10dc7f"
dependencies = [
"futures-core",
"pin-project-lite",
]
[[package]]
name = "actix-utils"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88a1dcdff1466e3c2488e1cb5c36a71822750ad43839937f85d2f4d9f8b705d8"
dependencies = [
"local-waker",
"pin-project-lite",
]
[[package]]
name = "actix-web"
version = "4.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a597b77b5c6d6a1e1097fddde329a83665e25c5437c696a3a9a4aa514a614dea"
dependencies = [
"actix-codec",
"actix-http",
"actix-macros",
"actix-router",
"actix-rt",
"actix-server",
"actix-service",
"actix-utils",
"actix-web-codegen",
"bytes",
"bytestring",
"cfg-if",
"derive_more",
"encoding_rs",
"foldhash",
"futures-core",
"futures-util",
"impl-more",
"itoa",
"language-tags",
"log",
"mime",
"once_cell",
"pin-project-lite",
"regex-lite",
"serde",
"serde_json",
"serde_urlencoded",
"smallvec",
"socket2",
"time",
"tracing",
"url",
]
[[package]]
name = "actix-web-codegen"
version = "4.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f591380e2e68490b5dfaf1dd1aa0ebe78d84ba7067078512b4ea6e4492d622b8"
dependencies = [
"actix-router",
"proc-macro2",
"quote",
"syn 2.0.101",
]
[[package]]
name = "actix-web-prom"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9df3127d20a5d01c9fc9aceb969a38d31a6767e1b48a54d55a8f56c769a84923"
dependencies = [
"actix-web",
"futures-core",
"pin-project-lite",
"prometheus",
]
[[package]]
name = "addr2line"
version = "0.24.2"
@ -213,12 +401,25 @@ dependencies = [
name = "bot"
version = "0.1.0"
dependencies = [
"chrono",
"clickhouse",
"clickhouse_pool",
"poise",
"serde",
"tokio",
"toml",
"tool_tracing",
"tracing",
"uuid",
]
[[package]]
name = "bstr"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "234113d19d0d7d613b40e86fb654acf958910802bcceab913a4f9e7cda03b1a4"
dependencies = [
"memchr",
]
[[package]]
@ -245,6 +446,15 @@ version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
[[package]]
name = "bytestring"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e465647ae23b2823b0753f50decb2d5a86d2bb2cac04788fafd1f80e45378e5f"
dependencies = [
"bytes",
]
[[package]]
name = "camino"
version = "1.1.9"
@ -299,11 +509,78 @@ checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"serde",
"wasm-bindgen",
"windows-link",
]
[[package]]
name = "cityhash-rs"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93a719913643003b84bd13022b4b7e703c09342cd03b679c4641c7d2e50dc34d"
[[package]]
name = "clickhouse"
version = "0.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9894248c4c5a4402f76a56c273836a0c32547ec8a68166aedee7e01b7b8d102"
dependencies = [
"bstr",
"bytes",
"chrono",
"cityhash-rs",
"clickhouse-derive",
"futures",
"futures-channel",
"http-body-util",
"hyper 1.6.0",
"hyper-tls",
"hyper-util",
"lz4_flex",
"replace_with",
"sealed",
"serde",
"static_assertions",
"thiserror 1.0.69",
"tokio",
"url",
"uuid",
]
[[package]]
name = "clickhouse-derive"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d70f3e2893f7d3e017eeacdc9a708fbc29a10488e3ebca21f9df6a5d2b616dbb"
dependencies = [
"proc-macro2",
"quote",
"serde_derive_internals",
"syn 2.0.101",
]
[[package]]
name = "clickhouse_pool"
version = "0.1.0"
dependencies = [
"actix-web-prom",
"anyhow",
"clickhouse",
"deadpool",
"futures",
"futures-util",
"log",
"prometheus",
"serde",
"serde_derive",
"thiserror 1.0.69",
"tokio",
"url",
]
[[package]]
name = "core-foundation"
version = "0.9.4"
@ -418,6 +695,23 @@ version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476"
[[package]]
name = "deadpool"
version = "0.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ed5957ff93768adf7a65ab167a17835c3d2c3c50d084fe305174c112f468e2f"
dependencies = [
"deadpool-runtime",
"num_cpus",
"tokio",
]
[[package]]
name = "deadpool-runtime"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b"
[[package]]
name = "deranged"
version = "0.4.0"
@ -439,6 +733,27 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "derive_more"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678"
dependencies = [
"derive_more-impl",
]
[[package]]
name = "derive_more-impl"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.101",
"unicode-xid",
]
[[package]]
name = "digest"
version = "0.10.7"
@ -481,6 +796,16 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
[[package]]
name = "erased-serde"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e004d887f51fcb9fef17317a2f3525c887d8aa3f4f50fed920816a688284a5b7"
dependencies = [
"serde",
"typeid",
]
[[package]]
name = "errno"
version = "0.3.12"
@ -522,6 +847,27 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foldhash"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared",
]
[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]]
name = "form_urlencoded"
version = "1.2.1"
@ -539,6 +885,7 @@ checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
@ -739,6 +1086,12 @@ version = "0.15.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84b26c544d002229e640969970a2e74021aadf6e2f96372b9c58eff97de08eb3"
[[package]]
name = "hermit-abi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
[[package]]
name = "http"
version = "0.2.12"
@ -879,6 +1232,22 @@ dependencies = [
"tower-service",
]
[[package]]
name = "hyper-tls"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0"
dependencies = [
"bytes",
"http-body-util",
"hyper 1.6.0",
"hyper-util",
"native-tls",
"tokio",
"tokio-native-tls",
"tower-service",
]
[[package]]
name = "hyper-util"
version = "0.1.11"
@ -1036,6 +1405,12 @@ dependencies = [
"icu_properties",
]
[[package]]
name = "impl-more"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8a5a9a0ff0086c7a148acb942baaabeadf9504d10400b5a05645853729b9cd2"
[[package]]
name = "indexmap"
version = "1.9.3"
@ -1087,6 +1462,12 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "language-tags"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4345964bb142484797b161f473a503a434de77149dd8c7427788c6e13379388"
[[package]]
name = "lazy_static"
version = "1.5.0"
@ -1111,6 +1492,23 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956"
[[package]]
name = "local-channel"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6cbc85e69b8df4b8bb8b89ec634e7189099cea8927a276b7384ce5488e53ec8"
dependencies = [
"futures-core",
"futures-sink",
"local-waker",
]
[[package]]
name = "local-waker"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d873d7c67ce09b42110d801813efbc9364414e356be9935700d368351657487"
[[package]]
name = "lock_api"
version = "0.4.12"
@ -1126,6 +1524,16 @@ name = "log"
version = "0.4.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
dependencies = [
"serde",
"value-bag",
]
[[package]]
name = "lz4_flex"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
[[package]]
name = "matchers"
@ -1195,10 +1603,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
dependencies = [
"libc",
"log",
"wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys 0.52.0",
]
[[package]]
name = "native-tls"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e"
dependencies = [
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework",
"security-framework-sys",
"tempfile",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@ -1224,6 +1650,16 @@ dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "object"
version = "0.36.7"
@ -1239,6 +1675,50 @@ version = "1.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
[[package]]
name = "openssl"
version = "0.10.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fedfea7d58a1f73118430a55da6a286e7b044961736ce96a16a17068ea25e5da"
dependencies = [
"bitflags 2.9.1",
"cfg-if",
"foreign-types",
"libc",
"once_cell",
"openssl-macros",
"openssl-sys",
]
[[package]]
name = "openssl-macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.101",
]
[[package]]
name = "openssl-probe"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
[[package]]
name = "openssl-sys"
version = "0.9.108"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e145e1651e858e820e4860f7b9c5e169bc1d8ce1c86043be79fa7b7634821847"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "opentelemetry"
version = "0.29.1"
@ -1386,6 +1866,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkg-config"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
[[package]]
name = "poise"
version = "0.6.1"
@ -1448,6 +1934,21 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "prometheus"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1"
dependencies = [
"cfg-if",
"fnv",
"lazy_static",
"memchr",
"parking_lot",
"protobuf",
"thiserror 1.0.69",
]
[[package]]
name = "prost"
version = "0.13.5"
@ -1471,6 +1972,12 @@ dependencies = [
"syn 2.0.101",
]
[[package]]
name = "protobuf"
version = "2.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
[[package]]
name = "pulldown-cmark"
version = "0.9.6"
@ -1597,6 +2104,12 @@ dependencies = [
"regex-syntax 0.8.5",
]
[[package]]
name = "regex-lite"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a"
[[package]]
name = "regex-syntax"
version = "0.6.29"
@ -1609,6 +2122,12 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "replace_with"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51743d3e274e2b18df81c4dc6caf8a5b8e15dbe799e0dca05c7617380094e884"
[[package]]
name = "reqwest"
version = "0.11.27"
@ -1809,6 +2328,15 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "schannel"
version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d"
dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
@ -1825,6 +2353,17 @@ dependencies = [
"untrusted",
]
[[package]]
name = "sealed"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22f968c5ea23d555e670b449c1c5e7b2fc399fdaec1d304a17cd48e288abc107"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.101",
]
[[package]]
name = "secrecy"
version = "0.8.0"
@ -1835,6 +2374,29 @@ dependencies = [
"zeroize",
]
[[package]]
name = "security-framework"
version = "2.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02"
dependencies = [
"bitflags 2.9.1",
"core-foundation",
"core-foundation-sys",
"libc",
"security-framework-sys",
]
[[package]]
name = "security-framework-sys"
version = "2.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "semver"
version = "1.0.26"
@ -1873,6 +2435,26 @@ dependencies = [
"syn 2.0.101",
]
[[package]]
name = "serde_derive_internals"
version = "0.29.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.101",
]
[[package]]
name = "serde_fmt"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1d4ddca14104cd60529e8c7f7ba71a2c8acd8f7f5cfcdc2faf97eeb7c3010a4"
dependencies = [
"serde",
]
[[package]]
name = "serde_json"
version = "1.0.140"
@ -1976,6 +2558,15 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook-registry"
version = "1.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410"
dependencies = [
"libc",
]
[[package]]
name = "skeptic"
version = "0.13.7"
@ -2022,6 +2613,12 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "strsim"
version = "0.11.1"
@ -2034,6 +2631,84 @@ version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "sval"
version = "2.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cc9739f56c5d0c44a5ed45473ec868af02eb896af8c05f616673a31e1d1bb09"
[[package]]
name = "sval_buffer"
version = "2.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f39b07436a8c271b34dad5070c634d1d3d76d6776e938ee97b4a66a5e8003d0b"
dependencies = [
"sval",
"sval_ref",
]
[[package]]
name = "sval_dynamic"
version = "2.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffcb072d857431bf885580dacecf05ed987bac931230736739a79051dbf3499b"
dependencies = [
"sval",
]
[[package]]
name = "sval_fmt"
version = "2.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f214f427ad94a553e5ca5514c95c6be84667cbc5568cce957f03f3477d03d5c"
dependencies = [
"itoa",
"ryu",
"sval",
]
[[package]]
name = "sval_json"
version = "2.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "389ed34b32e638dec9a99c8ac92d0aa1220d40041026b625474c2b6a4d6f4feb"
dependencies = [
"itoa",
"ryu",
"sval",
]
[[package]]
name = "sval_nested"
version = "2.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14bae8fcb2f24fee2c42c1f19037707f7c9a29a0cda936d2188d48a961c4bb2a"
dependencies = [
"sval",
"sval_buffer",
"sval_ref",
]
[[package]]
name = "sval_ref"
version = "2.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a4eaea3821d3046dcba81d4b8489421da42961889902342691fb7eab491d79e"
dependencies = [
"sval",
]
[[package]]
name = "sval_serde"
version = "2.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "172dd4aa8cb3b45c8ac8f3b4111d644cd26938b0643ede8f93070812b87fb339"
dependencies = [
"serde",
"sval",
"sval_nested",
]
[[package]]
name = "syn"
version = "1.0.109"
@ -2223,7 +2898,9 @@ dependencies = [
"bytes",
"libc",
"mio",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.52.0",
@ -2240,6 +2917,16 @@ dependencies = [
"syn 2.0.101",
]
[[package]]
name = "tokio-native-tls"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
dependencies = [
"native-tls",
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.24.1"
@ -2581,6 +3268,12 @@ dependencies = [
"utf-8",
]
[[package]]
name = "typeid"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c"
[[package]]
name = "typemap_rev"
version = "0.3.0"
@ -2634,6 +3327,12 @@ version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512"
[[package]]
name = "unicode-xid"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853"
[[package]]
name = "untrusted"
version = "0.9.0"
@ -2664,12 +3363,64 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
[[package]]
name = "uuid"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9"
dependencies = [
"getrandom 0.3.3",
"serde",
]
[[package]]
name = "valuable"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "value-bag"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "943ce29a8a743eb10d6082545d861b24f9d1b160b7d741e0f2cdf726bec909c5"
dependencies = [
"value-bag-serde1",
"value-bag-sval2",
]
[[package]]
name = "value-bag-serde1"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35540706617d373b118d550d41f5dfe0b78a0c195dc13c6815e92e2638432306"
dependencies = [
"erased-serde",
"serde",
"serde_fmt",
]
[[package]]
name = "value-bag-sval2"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fe7e140a2658cc16f7ee7a86e413e803fc8f9b5127adc8755c19f9fefa63a52"
dependencies = [
"sval",
"sval_buffer",
"sval_dynamic",
"sval_fmt",
"sval_json",
"sval_ref",
"sval_serde",
]
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.5"

View File

@ -1,7 +1,7 @@
[workspace]
resolver = '2'
members = ['apps/bot', 'libs/tool_tracing']
members = ['apps/bot', 'libs/tool_tracing', 'libs/clickhouse_pool']
[workspace.dependencies]
poise = '0.6.1'
@ -12,7 +12,8 @@ tokio = { version = '1.45.0', features = [
] }
serde = '1.0'
tracing = '0.1'
serde_json = "1.0"
serde_json = '1.0'
clickhouse = { version = "0.13", features = ["native-tls", "uuid", "chrono"] }
[profile.release]
lto = true

View File

@ -6,11 +6,12 @@ tasks:
silent: true
cmds:
- echo "Starting Db and Jaeger..."
- docker-compose up -d database jaeger
- podman compose up -d database jaeger dbgate
- echo "Running Rust server..."
- cargo run
stop:
desc: Stop the Rust server
silent: true
cmds:
- echo "Stopping Db and Jaeger..."
- docker-compose down
- podman compose down

View File

@ -11,5 +11,10 @@ serde = { workspace = true }
tracing = { workspace = true }
tool_tracing = { path = "../../libs/tool_tracing" }
toml = "0.8"
clickhouse_pool = { path = "../../libs/clickhouse_pool" }
clickhouse = { workspace = true }
uuid = { version = "1.16", features = ["serde", "v4"] }
chrono = { version = "0.4.41", features = ["serde"] }
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View File

@ -1,9 +1,10 @@
use config::parse_local_config;
use poise::serenity_prelude as serenity;
use tracing::info;
use tracing::{error, info};
pub mod config;
pub mod dotenv;
pub mod model;
struct Data {} // User data, which is stored and accessible in all command invocations
type Error = Box<dyn std::error::Error + Send + Sync>;
@ -24,9 +25,22 @@ async fn age(
#[tokio::main]
async fn main() {
let config = parse_local_config();
tool_tracing::init::init_tracing(config.tracing.clone(), config.bot_name.clone());
info!("Init Database");
let datalake_config = model::create_pool_manager(config.persistence.clone()).unwrap();
let _manager = match model::create_manager_and_init(datalake_config).await {
Ok(manager) => {
info!("Database manager created successfully");
manager
}
Err(e) => {
error!("Failed to create database manager: {}", e);
return;
}
};
info!("Database manager initialized successfully");
info!("Starting bot {}", config.bot_name);
let intents = serenity::GatewayIntents::non_privileged();

apps/bot/src/model/mod.rs Normal file
View File

@ -0,0 +1,56 @@
use std::{error::Error, sync::Arc};
pub mod trivial;
use clickhouse_pool::{
config::{ClickhouseConfig, DatalakeConfig, RetryConfig},
pool_manager::PoolManager,
traits::Model,
};
use tracing::{error, info, instrument};
use trivial::Trivial;
use crate::config::PersistenceConfig;
pub fn create_pool_manager(db_config: PersistenceConfig) -> Result<Arc<DatalakeConfig>, String> {
let mut config = ClickhouseConfig::new(
db_config.host.clone(),
db_config.port,
db_config.database.clone(),
db_config.user.clone(),
db_config.password.clone(),
10, // connect_timeout_seconds
30, // query_timeout_seconds
5,  // max_connections
);
config.proto = "http".to_string();
let retry_config = RetryConfig::default();
let datalake_config = Arc::new(DatalakeConfig::new(config, retry_config));
Ok(datalake_config)
}
#[instrument(skip(datalake_config))]
pub async fn create_manager_and_init(
datalake_config: Arc<DatalakeConfig>,
) -> Result<Arc<PoolManager>, Box<dyn Error>> {
let manager = PoolManager::new(datalake_config, None).await;
let _ = match manager
.execute_with_retry(Trivial::create_table_sql())
.await
{
Ok(_) => {
info!("Table {} created successfully", Trivial::table_name());
}
Err(e) => {
error!("Failed to create table {} : {}", Trivial::table_name(), e);
return Err(Box::new(e));
}
};
Ok(Arc::new(manager))
}

View File

@ -0,0 +1,158 @@
use chrono::{DateTime, Utc};
use clickhouse::Row;
use clickhouse_pool::traits::Model;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum TrivialRewardKind {
OnlyTheFirstOne = 0,
TopThree = 1,
TopFive = 2,
}
#[derive(Debug, Clone, PartialEq, PartialOrd, Row, Serialize, Deserialize)]
pub struct Trivial {
#[serde(with = "clickhouse::serde::uuid")]
pub id: Uuid,
pub name: String,
pub description: String,
pub guild_id: u64,
pub channel_id: u64,
#[serde(with = "clickhouse::serde::chrono::datetime64::millis")]
pub created_at: DateTime<Utc>,
#[serde(with = "clickhouse::serde::chrono::datetime64::millis")]
pub updated_at: DateTime<Utc>,
pub creator_id: u64,
pub updater_id: u64,
pub random_question: bool,
pub role_ping: u64,
pub role_ping_enabled: bool,
pub reward_kind: TrivialRewardKind,
pub reward_amount: u64,
pub taken_into_account: bool,
}
impl Model for Trivial {
type T = Trivial;
fn table_name() -> &'static str {
"trivial"
}
fn create_table_sql() -> &'static str {
r#"
CREATE TABLE IF NOT EXISTS trivial (
id UUID PRIMARY KEY,
name String,
description String,
guild_id UInt64,
channel_id UInt64,
created_at DateTime64(3),
updated_at DateTime64(3),
creator_id UInt64,
updater_id UInt64,
random_question Bool,
role_ping UInt64,
role_ping_enabled Bool,
reward_kind Enum8('OnlyTheFirstOne' = 0, 'TopThree' = 1, 'TopFive' = 2),
reward_amount UInt64,
taken_into_account Bool
) ENGINE = MergeTree()
ORDER BY id
"#
}
fn column_names() -> Vec<&'static str> {
vec![
"id",
"name",
"description",
"guild_id",
"channel_id",
"created_at",
"updated_at",
"creator_id",
"updater_id",
"random_question",
"role_ping",
"role_ping_enabled",
"reward_kind",
"reward_amount",
"taken_into_account",
]
}
fn to_row(&self) -> (Vec<&'static str>, Vec<String>) {
(
Self::column_names(),
vec![
self.id.to_string(),
self.name.clone(),
self.description.clone(),
self.guild_id.to_string(),
self.channel_id.to_string(),
self.created_at.to_string(),
self.updated_at.to_string(),
self.creator_id.to_string(),
self.updater_id.to_string(),
self.random_question.to_string(),
self.role_ping.to_string(),
self.role_ping_enabled.to_string(),
format!("{:?}", self.reward_kind),
self.reward_amount.to_string(),
self.taken_into_account.to_string(),
],
)
}
fn insert_query(&self) -> String {
let (columns, values) = self.to_row();
let columns_str = columns.join(", ");
let values_str = values.join(", ");
format!(
"INSERT INTO {} ({}) VALUES ({})",
Self::table_name(),
columns_str,
values_str
)
}
fn batch_insert_query(items: &[Self::T]) -> String {
let mut queries = Vec::new();
for item in items {
let (columns, values) = item.to_row();
let columns_str = columns.join(", ");
let values_str = values.join(", ");
queries.push(format!(
"INSERT INTO {} ({}) VALUES ({})",
Self::table_name(),
columns_str,
values_str
));
}
queries.join("; ")
}
fn build_select_query(
where_clause: Option<&str>,
limit: Option<u64>,
offset: Option<u64>,
) -> String {
let mut query = format!("SELECT * FROM {}", Self::table_name());
if let Some(where_clause) = where_clause {
query.push_str(&format!(" WHERE {}", where_clause));
}
if let Some(limit) = limit {
query.push_str(&format!(" LIMIT {}", limit));
}
if let Some(offset) = offset {
query.push_str(&format!(" OFFSET {}", offset));
}
query
}
}
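For illustration, the query builder above turns `Trivial::build_select_query(Some("guild_id = 42"), Some(10), Some(0))` into `SELECT * FROM trivial WHERE guild_id = 42 LIMIT 10 OFFSET 0`.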

View File

@ -3,7 +3,8 @@ services:
image: clickhouse/clickhouse-server:latest
ports:
- 8123:8123
- 9000:9000
- 9000:9000/tcp
- 9000:9000/udp
ulimits:
nofile:
soft: 262144
@ -12,6 +13,7 @@ services:
CLICKHOUSE_USER: default
CLICKHOUSE_PASSWORD: password
CLICKHOUSE_DB: default
CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1
jaeger:
image: jaegertracing/jaeger:${JAEGER_VERSION:-2.6.0}
ports:
@ -22,3 +24,25 @@ services:
- "9411:9411"
environment:
- LOG_LEVEL=debug
dbgate:
image: dbgate/dbgate:latest
user: "1000"
ports:
- 3000:3000
environment:
CONNECTIONS: click
LABEL_click: ClickHouse
SERVER_click: database
USER_click: default
PASSWORD_click: password
PORT_click: 9000
ENGINE_click: clickhouse@dbgate-plugin-clickhouse
DATABASE_click: default
depends_on:
- database
volumes:
- dbgate-data:/data
volumes:
dbgate-data:
driver: local

View File

@ -0,0 +1,22 @@
[package]
name = "clickhouse_pool"
version = "0.1.0"
edition = "2021"
[dependencies]
clickhouse = { workspace = true }
deadpool = "0.12.2"
tokio = { workspace = true }
futures = "0.3.17"
futures-util = "0.3"
serde = { workspace = true }
serde_derive = "1.0"
log = { version = "^0.4.21", features = ["kv_serde"] }
prometheus = "0.13"
actix-web-prom = "0.6"
url = "2.5.2"
anyhow = "1.0"
thiserror = "1.0.31"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View File

@ -0,0 +1,43 @@
{
"name": "clickhouse_pool",
"$schema": "../../node_modules/nx/schemas/project-schema.json",
"projectType": "library",
"sourceRoot": "libs/clickhouse_pool/src",
"targets": {
"build": {
"executor": "@monodon/rust:check",
"outputs": [
"{options.target-dir}"
],
"options": {
"target-dir": "dist/target/clickhouse_pool"
}
},
"test": {
"cache": true,
"executor": "@monodon/rust:test",
"outputs": [
"{options.target-dir}"
],
"options": {
"target-dir": "dist/target/clickhouse_pool"
},
"configurations": {
"production": {
"release": true
}
}
},
"lint": {
"cache": true,
"executor": "@monodon/rust:lint",
"outputs": [
"{options.target-dir}"
],
"options": {
"target-dir": "dist/target/clickhouse_pool"
}
}
},
"tags": []
}

View File

@ -0,0 +1,5 @@
# Clickhouse_pool
This lib is a fork of <https://github.com/ranger-finance/clickhouse-pool/tree/master?tab=readme-ov-file>.
Lots of things will move!
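A minimal usage sketch, assuming only the API introduced in this commit; the connection values are placeholders, and `execute_with_retry` is assumed to accept a `&str` query, as `apps/bot/src/model/mod.rs` calls it:

```rust
use std::sync::Arc;

use clickhouse_pool::{
    config::{ClickhouseConfig, DatalakeConfig, RetryConfig},
    pool_manager::PoolManager,
};

#[tokio::main]
async fn main() {
    // Placeholder connection values; point these at your ClickHouse instance.
    let mut clickhouse = ClickhouseConfig::new(
        "localhost".to_string(),
        8123,                   // HTTP port exposed by the compose file
        "default".to_string(),  // database
        "default".to_string(),  // username
        "password".to_string(), // password
        10,                     // connect_timeout_seconds
        30,                     // query_timeout_seconds
        5,                      // max_connections
    );
    clickhouse.proto = "http".to_string();

    let config = Arc::new(DatalakeConfig::new(clickhouse, RetryConfig::default()));

    // PoolManager warms up the pool on creation and retries failed queries
    // with exponential backoff.
    let manager = PoolManager::new(config, None).await;
    manager
        .execute_with_retry("SELECT 1")
        .await
        .expect("query failed");
}
```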

View File

@ -0,0 +1,36 @@
use tokio::sync::mpsc;
use crate::pool::ClickhouseError;
pub enum BatchCommand<T> {
Add(T),
Flush,
}
pub struct BatchSender<T> {
pub tx: mpsc::Sender<BatchCommand<T>>,
}
impl<T> BatchSender<T> {
pub async fn add(&self, item: T) -> Result<(), ClickhouseError> {
self.tx
.send(BatchCommand::Add(item))
.await
.map_err(|_| ClickhouseError::BatchInsertionError("Channel closed".to_string()))
}
pub async fn flush(&self) -> Result<(), ClickhouseError> {
self.tx
.send(BatchCommand::Flush)
.await
.map_err(|_| ClickhouseError::BatchInsertionError("Channel closed".to_string()))
}
}
impl<T> Clone for BatchSender<T> {
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
}
}
}
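A rough sketch of driving this channel API, assuming only the types above; in the library the receiving side (not shown in this excerpt) would perform the actual batched inserts, so the consumer below only buffers and drains:

```rust
use clickhouse_pool::batch_processor::{BatchCommand, BatchSender};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<BatchCommand<String>>(100);
    let sender = BatchSender { tx };

    // Stand-in consumer: buffer Add commands and drain the buffer on Flush.
    let worker = tokio::spawn(async move {
        let mut buffer: Vec<String> = Vec::new();
        while let Some(cmd) = rx.recv().await {
            match cmd {
                BatchCommand::Add(item) => buffer.push(item),
                BatchCommand::Flush => {
                    println!("flushing {} items", buffer.len());
                    buffer.clear();
                }
            }
        }
    });

    sender.add("row 1".to_string()).await.unwrap();
    sender.add("row 2".to_string()).await.unwrap();
    sender.flush().await.unwrap();

    drop(sender); // dropping the last sender closes the channel and ends the worker
    worker.await.unwrap();
}
```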

View File

@ -0,0 +1,182 @@
use serde_derive::{Deserialize, Serialize};
use std::{sync::Arc, time::Duration};
use url::Url;
pub const CONNECTION_TIMEOUT_SECONDS_DEFAULT: u64 = 3;
pub const QUERY_TIMEOUT_SECONDS_DEFAULT: u64 = 10;
pub const POOL_MAX_CONNECTIONS_DEFAULT: u32 = 256;
pub const PORT_DEFAULT: u16 = 9000;
pub const MAX_RETRIES_DEFAULT: u32 = 5;
pub const INITIAL_BACKOFF_MS_DEFAULT: u64 = 100;
pub const MAX_BACKOFF_MS_DEFAULT: u64 = 10_000;
pub const BACKOFF_MULTIPLIER_DEFAULT: f64 = 2.0;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClickhouseConfig {
/// Database host
pub host: String,
/// Database protocol
pub proto: String,
/// Database port
pub port: u16,
/// Database name
pub database: String,
/// Username for authentication
pub username: String,
/// Password for authentication
pub password: String,
/// Connection timeout in seconds
#[serde(default)]
pub connect_timeout_seconds: u64,
/// Query timeout in seconds
#[serde(default)]
pub query_timeout_seconds: u64,
/// Max. number of connections in the pool
#[serde(default)]
pub max_connections: u32,
}
impl Default for ClickhouseConfig {
fn default() -> Self {
Self {
host: "localhost".to_string(),
port: PORT_DEFAULT,
database: "default".to_string(),
username: "default".to_string(),
password: String::new(),
connect_timeout_seconds: CONNECTION_TIMEOUT_SECONDS_DEFAULT,
query_timeout_seconds: QUERY_TIMEOUT_SECONDS_DEFAULT,
max_connections: POOL_MAX_CONNECTIONS_DEFAULT,
proto: "https".to_string(),
}
}
}
impl ClickhouseConfig {
pub fn new(
host: String,
port: u16,
database: String,
username: String,
password: String,
connect_timeout_seconds: u64,
query_timeout_seconds: u64,
max_connections: u32,
) -> Self {
Self {
host,
port,
database,
username,
password,
connect_timeout_seconds,
query_timeout_seconds,
max_connections,
proto: "https".to_string(),
}
}
pub fn proto_prefix(&self) -> String {
format!("{}://", self.proto)
}
pub fn connection_url(&self) -> Url {
let url_str = format!("{}{}:{}", self.proto_prefix(), self.host, self.port);
Url::parse(&url_str).expect("Failed to parse Clickhouse URL")
}
pub fn authenticated_connection_url(&self) -> Url {
let url_str = format!(
"{}{}:{}@{}:{}",
self.proto_prefix(),
self.username,
self.password,
self.host,
self.port,
);
Url::parse(&url_str).expect("Failed to parse authenticated Clickhouse URL")
}
pub fn connect_timeout(&self) -> Duration {
Duration::from_secs(self.connect_timeout_seconds)
}
pub fn query_timeout(&self) -> Duration {
Duration::from_secs(self.query_timeout_seconds)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
/// Max number of retry attempts
pub max_retries: u32,
/// Initial backoff duration in milliseconds
pub initial_backoff_ms: u64,
/// Max backoff duration in milliseconds
pub max_backoff_ms: u64,
/// Backoff multiplier for exponential backoff
pub backoff_multiplier: f64,
}
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_retries: MAX_RETRIES_DEFAULT,
initial_backoff_ms: INITIAL_BACKOFF_MS_DEFAULT,
max_backoff_ms: MAX_BACKOFF_MS_DEFAULT,
backoff_multiplier: BACKOFF_MULTIPLIER_DEFAULT,
}
}
}
impl RetryConfig {
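/// Worked example with the defaults above (initial 100 ms, multiplier 2.0, cap 10 000 ms):
/// attempt 1 -> 100 ms, attempt 2 -> 200 ms, attempt 3 -> 400 ms, ...,
/// attempt 8 -> 12 800 ms, which is capped to 10 000 ms.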
pub fn backoff_duration(&self, attempt: u32) -> Duration {
if attempt == 0 {
return Duration::from_millis(0);
}
let backoff_ms = (self.initial_backoff_ms as f64
* self.backoff_multiplier.powi(attempt as i32 - 1)) as u64;
let capped_backoff_ms = std::cmp::min(backoff_ms, self.max_backoff_ms);
Duration::from_millis(capped_backoff_ms)
}
}
#[derive(Debug, Clone)]
pub struct DatalakeConfig {
pub clickhouse: Arc<ClickhouseConfig>,
pub retry: Arc<RetryConfig>,
}
impl Default for DatalakeConfig {
fn default() -> Self {
Self {
clickhouse: Arc::new(ClickhouseConfig::default()),
retry: Arc::new(RetryConfig::default()),
}
}
}
impl DatalakeConfig {
pub fn new(clickhouse_config: ClickhouseConfig, retry: RetryConfig) -> Self {
Self {
clickhouse: Arc::new(clickhouse_config),
retry: Arc::new(retry),
}
}
}

View File

@ -0,0 +1,6 @@
pub mod batch_processor;
pub mod config;
pub mod metrics;
pub mod pool;
pub mod pool_manager;
pub mod traits;

View File

@ -0,0 +1,277 @@
use actix_web_prom::{PrometheusMetrics, PrometheusMetricsBuilder};
use prometheus::{CounterVec, GaugeVec, IntCounter, IntCounterVec, IntGaugeVec, Opts};
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Prometheus(#[from] prometheus::Error),
}
#[derive(Debug, Default, Clone)]
pub enum Kind {
#[default]
Default,
GaugeVec,
CounterVec,
IntGaugeVec,
IntCounterVec,
IntCounter,
}
#[derive(Debug, Default, Clone)]
pub struct MetricConfig<'a> {
pub kind: Kind,
pub name: &'a str,
pub help: &'a str,
pub label_names: &'a [&'a str],
}
pub type SharedRegistrar = Arc<Registrar>;
/// An abstracted metrics registrar for Prometheus.
#[derive(Clone)]
pub struct Registrar {
prometheus: Arc<PrometheusMetrics>,
int_counters_vecs: Arc<RwLock<HashMap<String, IntCounterVec>>>,
int_counters: Arc<RwLock<HashMap<String, IntCounter>>>,
int_gauges_vecs: Arc<RwLock<HashMap<String, IntGaugeVec>>>,
counters_vecs: Arc<RwLock<HashMap<String, CounterVec>>>,
gauges_vecs: Arc<RwLock<HashMap<String, GaugeVec>>>,
}
impl std::fmt::Debug for Registrar {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Registrar")
.field("prometheus", &format!("{:?}", self.prometheus.registry))
.field(
"int_counters_vecs",
&format!("{:?}", self.int_counters_vecs),
)
.field("int_gauges_vecs", &format!("{:?}", self.int_gauges_vecs))
.field("counters_vecs", &format!("{:?}", self.counters_vecs))
.field("gauges_vecs", &format!("{:?}", self.gauges_vecs))
.finish()
}
}
impl Default for Registrar {
fn default() -> Self {
Self {
prometheus: Arc::new(PrometheusMetricsBuilder::new("default").build().unwrap()),
int_counters_vecs: Default::default(),
int_counters: Default::default(),
int_gauges_vecs: Default::default(),
counters_vecs: Default::default(),
gauges_vecs: Default::default(),
}
}
}
pub trait Registry {
fn with_metric_configs<'a>(&self, metrics: &'a [MetricConfig<'a>]) -> Result<(), Error>;
fn with_metric_config<'a>(&self, metric: &'a MetricConfig<'a>) -> Result<(), Error>;
}
impl Registry for Registrar {
fn with_metric_config<'a>(&self, metric: &'a MetricConfig<'a>) -> Result<(), Error> {
log::info!(
"Attempting to register metric with name {:?} and labels {:?}",
metric.name,
metric.label_names
);
match &metric.kind {
Kind::Default => panic!("Registrar metric kind is `default`, so panicking."),
Kind::GaugeVec => {
let gauge =
GaugeVec::new(Opts::new(metric.name, metric.help), metric.label_names).unwrap();
match self.prometheus.registry.register(Box::new(gauge.clone())) {
Ok(()) => {
self.gauges_vecs
.write()
.unwrap()
.insert(metric.name.to_string(), gauge);
Ok(())
}
Err(e) => {
if let prometheus::Error::AlreadyReg = e {
log::info!("Metric {:?} is already registered.", metric.name);
Ok(())
} else {
log::error!("Failed to register metric {:?}. {:?}", metric.name, e);
Err(Error::Prometheus(e))
}
}
}
}
Kind::CounterVec => {
let counter =
CounterVec::new(Opts::new(metric.name, metric.help), metric.label_names)
.unwrap();
match self.prometheus.registry.register(Box::new(counter.clone())) {
Ok(()) => {
self.counters_vecs
.write()
.unwrap()
.insert(metric.name.to_string(), counter);
Ok(())
}
Err(e) => {
if let prometheus::Error::AlreadyReg = e {
log::info!("Metric {:?} is already registered.", metric.name);
Ok(())
} else {
log::error!("Failed to register metric {:?}. {:?}", metric.name, e);
Err(Error::Prometheus(e))
}
}
}
}
Kind::IntGaugeVec => {
let gauge =
IntGaugeVec::new(Opts::new(metric.name, metric.help), metric.label_names)
.unwrap();
match self.prometheus.registry.register(Box::new(gauge.clone())) {
Ok(()) => {
self.int_gauges_vecs
.write()
.unwrap()
.insert(metric.name.to_string(), gauge);
Ok(())
}
Err(e) => {
if let prometheus::Error::AlreadyReg = e {
log::info!("Metric {:?} is already registered.", metric.name);
Ok(())
} else {
log::error!("Failed to register metric {:?}. {:?}", metric.name, e);
Err(Error::Prometheus(e))
}
}
}
}
Kind::IntCounterVec => {
let counter =
IntCounterVec::new(Opts::new(metric.name, metric.help), metric.label_names)
.unwrap();
match self.prometheus.registry.register(Box::new(counter.clone())) {
Ok(()) => {
self.int_counters_vecs
.write()
.unwrap()
.insert(metric.name.to_string(), counter);
Ok(())
}
Err(e) => {
if let prometheus::Error::AlreadyReg = e {
log::info!("Metric {:?} is already registered.", metric.name);
Ok(())
} else {
log::error!("Failed to register metric {:?}. {:?}", metric.name, e);
Err(Error::Prometheus(e))
}
}
}
}
Kind::IntCounter => {
let counter = IntCounter::new(metric.name, metric.help).unwrap();
match self.prometheus.registry.register(Box::new(counter.clone())) {
Ok(()) => {
self.int_counters
.write()
.unwrap()
.insert(metric.name.to_string(), counter);
Ok(())
}
Err(e) => {
if let prometheus::Error::AlreadyReg = e {
log::info!("Metric {:?} is already registered.", metric.name);
Ok(())
} else {
log::error!("Failed to register metric {:?}. {:?}", metric.name, e);
Err(Error::Prometheus(e))
}
}
}
}
}
}
fn with_metric_configs<'a>(&self, metrics: &'a [MetricConfig<'a>]) -> Result<(), Error> {
for metric in metrics {
match self.with_metric_config(metric) {
Ok(()) => (),
Err(e) => return Err(e),
}
}
Ok(())
}
}
impl Registrar {
pub fn new(prometheus: Arc<PrometheusMetrics>) -> Self {
Self {
prometheus,
..Default::default()
}
}
// Int
pub fn inc_int_counter(&self, key: &str) {
let mut counters = self.int_counters.write().unwrap();
let counter = match counters.get_mut(key) {
Some(r) => r,
None => return,
};
counter.inc()
}
pub fn inc_int_counter_vec_mut(&self, key: &str, labels: &[&str]) {
let mut counters = self.int_counters_vecs.write().unwrap();
let counter = match counters.get_mut(key) {
Some(r) => r,
None => return,
};
counter.with_label_values(labels).inc()
}
pub fn inc_by_int_counter_vec_mut(&self, key: &str, labels: &[&str], value: u64) {
let mut counters = self.int_counters_vecs.write().unwrap();
let counter = match counters.get_mut(key) {
Some(r) => r,
None => return,
};
counter.with_label_values(labels).inc_by(value)
}
pub fn set_int_gauge_vec_mut(&self, key: &str, labels: &[&str], value: i64) {
let mut gauges = self.int_gauges_vecs.write().unwrap();
let gauge = match gauges.get_mut(key) {
Some(r) => r,
None => return,
};
gauge.with_label_values(labels).set(value)
}
// Floating
pub fn inc_counter_vec_mut(&self, key: &str, labels: &[&str]) {
let mut counters = self.counters_vecs.write().unwrap();
let counter = match counters.get_mut(key) {
Some(r) => r,
None => return,
};
counter.with_label_values(labels).inc()
}
pub fn set_gauge_vec_mut(&self, key: &str, labels: &[&str], value: f64) {
let mut gauges = self.gauges_vecs.write().unwrap();
let gauge = match gauges.get_mut(key) {
Some(r) => r,
None => return,
};
gauge.with_label_values(labels).set(value)
}
}

View File

@ -0,0 +1,654 @@
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use clickhouse::Client;
use deadpool::managed::{Manager, Metrics, Object, PoolError, RecycleError};
use thiserror::Error;
use tokio::task;
use tokio::time::timeout;
use crate::config::{ClickhouseConfig, DatalakeConfig};
use crate::metrics::{Kind, MetricConfig, Registry, SharedRegistrar};
#[derive(Debug, Error)]
pub enum ClickhouseError {
#[error("Clickhouse client error: {0}")]
Client(#[from] clickhouse::error::Error),
#[error("Connection validation failed: {0}")]
Validation(String),
#[error("Connection timed out")]
Timeout,
#[error("Pool error: {0}")]
Pool(String),
#[error("Shutdown in progress")]
ShuttingDown,
#[error("Batch insertion error: {0}")]
BatchInsertionError(String),
}
impl From<tokio::time::error::Elapsed> for ClickhouseError {
fn from(_: tokio::time::error::Elapsed) -> Self {
Self::Timeout
}
}
impl<T: std::fmt::Display> From<PoolError<T>> for ClickhouseError {
fn from(value: PoolError<T>) -> Self {
Self::Pool(value.to_string())
}
}
#[derive(Debug, Clone)]
pub struct PoolMetrics {
pub size: usize,
pub available: usize,
pub in_use: usize,
pub max_size: usize,
pub min_size: usize,
pub waiters: usize,
}
pub struct ClickhouseConnection {
client: Client,
last_used: Instant,
id: u64,
query_count: AtomicU64,
created_at: Instant,
}
impl ClickhouseConnection {
pub fn new(client: Client, id: u64) -> Self {
Self {
client,
last_used: Instant::now(),
id,
query_count: AtomicU64::new(0),
created_at: Instant::now(),
}
}
pub fn id(&self) -> u64 {
self.id
}
pub fn age(&self) -> Duration {
self.created_at.elapsed()
}
pub fn idle_time(&self) -> Duration {
self.last_used.elapsed()
}
pub fn query_count(&self) -> u64 {
self.query_count.load(Ordering::Relaxed)
}
pub async fn health_check(&self) -> Result<(), ClickhouseError> {
match self.client.query("SELECT 1").execute().await {
Ok(_) => Ok(()),
Err(e) => Err(ClickhouseError::Client(e)),
}
}
}
impl Deref for ClickhouseConnection {
type Target = Client;
fn deref(&self) -> &Self::Target {
&self.client
}
}
impl DerefMut for ClickhouseConnection {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.client
}
}
impl fmt::Debug for ClickhouseConnection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ClickhouseConnection")
.field("id", &self.id)
.field("created_at", &self.created_at)
.field("query_count", &self.query_count)
.field("last_used", &self.last_used)
.finish()
}
}
pub fn get_query_type(query: &str) -> &'static str {
let query = query.trim_start().to_uppercase();
if query.starts_with("SELECT") {
"select"
} else if query.starts_with("INSERT") {
"insert"
} else if query.starts_with("CREATE") {
"create"
} else if query.starts_with("ALTER") {
"alter"
} else if query.starts_with("DROP") {
"drop"
} else {
"other"
}
}
#[derive(Debug)]
pub struct ClickhouseConnectionManager {
config: Arc<ClickhouseConfig>,
next_connection_id: AtomicU64,
is_shutting_down: Arc<AtomicBool>,
metrics: Option<SharedRegistrar>,
}
impl ClickhouseConnectionManager {
pub fn new(config: Arc<ClickhouseConfig>, metrics: Option<SharedRegistrar>) -> Self {
Self {
config,
next_connection_id: AtomicU64::new(1),
is_shutting_down: Arc::new(AtomicBool::new(false)),
metrics,
}
}
pub fn initiate_shutdown(&self) {
self.is_shutting_down.store(true, Ordering::SeqCst);
log::info!("Clickhouse connection manager shutdown in progress");
}
pub fn create_client(&self) -> Result<Client, ClickhouseError> {
let url = self.config.authenticated_connection_url();
let client = Client::default()
.with_url(url)
.with_user(&self.config.username)
.with_password(&self.config.password)
.with_option("async_insert", "1")
.with_option("wait_for_async_insert", "1");
Ok(client)
}
}
impl Manager for ClickhouseConnectionManager {
type Type = ClickhouseConnection;
type Error = ClickhouseError;
async fn create(&self) -> Result<Self::Type, Self::Error> {
if self.is_shutting_down.load(Ordering::SeqCst) {
return Err(ClickhouseError::ShuttingDown);
}
let connection_id = self.next_connection_id.fetch_add(1, Ordering::SeqCst);
let start = Instant::now();
let config = &self.config.clone();
log::debug!(
"Creating new Clickhouse connection [id: {}] to: {}:{}",
connection_id,
config.host,
config.port
);
let client = self.create_client()?;
let validation_timeout = Duration::from_secs(config.connect_timeout_seconds);
let validation = match timeout(validation_timeout, client.query("SELECT 1").execute()).await
{
Ok(Ok(_)) => Ok(()),
Ok(Err(e)) => Err(ClickhouseError::Client(e)),
Err(_) => Err(ClickhouseError::Timeout),
};
let duration = start.elapsed();
if let Some(metrics) = &self.metrics {
metrics.set_gauge_vec_mut(
"clickhouse_connection_creation_second",
&["create"],
duration.as_secs_f64(),
);
}
match validation {
Ok(()) => {
log::debug!(
"Connection established: [id: {}] in {:?}",
connection_id,
duration
);
if let Some(metrics) = &self.metrics {
metrics.inc_int_counter_vec_mut(
"clickhouse_connections_created_total",
&["success"],
);
}
Ok(ClickhouseConnection::new(client, connection_id))
}
Err(e) => {
log::error!(
"Failed to validate ClickHouse connection (id: {}): {}",
connection_id,
e
);
if let Some(metrics) = &self.metrics {
metrics.inc_int_counter_vec_mut(
"clickhouse_connections_created_total",
&["failure"],
);
}
Err(e)
}
}
}
async fn recycle(
&self,
conn: &mut Self::Type,
_: &Metrics,
) -> Result<(), RecycleError<Self::Error>> {
if self.is_shutting_down.load(Ordering::SeqCst) {
return Err(RecycleError::Message("Shutting down".into()));
}
log::debug!("Testing health of connection: [id: {}]", conn.id());
let validation_timeout = Duration::from_secs(self.config.connect_timeout_seconds);
match timeout(validation_timeout, conn.query("SELECT 1").execute()).await {
Ok(Ok(_)) => {
log::debug!("Connection [id: {}] health check passed", conn.id());
if let Some(metrics) = &self.metrics {
metrics.inc_int_counter_vec_mut(
"clickhouse_connections_health_checks_total",
&["success"],
);
}
Ok(())
}
Ok(Err(e)) => {
log::warn!("Connection [id: {}] health check failed: {}", conn.id(), e);
if let Some(metrics) = &self.metrics {
metrics.inc_int_counter_vec_mut(
"clickhouse_connections_health_checks_total",
&["failure"],
);
}
Err(RecycleError::Message(
format!("Health check failed: {}", e).into(),
))
}
Err(_) => {
log::warn!(
"Connection [id: {}] health check timed out after: {:?}",
conn.id(),
validation_timeout
);
if let Some(metrics) = &self.metrics {
metrics.inc_int_counter_vec_mut(
"clickhouse_connections_health_checks_total",
&["timeout"],
);
}
Err(RecycleError::Message("Health check timed out".into()))
}
}
}
}
pub type Pool = deadpool::managed::Pool<ClickhouseConnectionManager>;
pub type PooledConnection = Object<ClickhouseConnectionManager>;
pub struct ClickhouseConnectionPool {
pool: Pool,
config: Arc<DatalakeConfig>,
metrics: Option<SharedRegistrar>,
is_initialized: AtomicBool,
}
impl ClickhouseConnectionPool {
pub fn new(config: Arc<DatalakeConfig>, metrics: Option<SharedRegistrar>) -> Self {
if let Some(metrics_ref) = &metrics {
Self::register_metrics(metrics_ref);
}
let initial_size = config.clickhouse.max_connections as usize;
let manager = ClickhouseConnectionManager::new(config.clickhouse.clone(), metrics.clone());
let pool = deadpool::managed::Pool::<ClickhouseConnectionManager>::builder(manager)
.max_size(initial_size)
.build()
.expect("Failed to build connection pool");
Self {
pool,
config,
metrics,
is_initialized: AtomicBool::new(false),
}
}
pub async fn initialize(&self) -> Result<(), ClickhouseError> {
if self.is_initialized.load(Ordering::SeqCst) {
return Ok(());
}
log::info!("Initializing Clickhouse connection pool");
let warmup_count = self.config.clickhouse.max_connections as usize;
let mut warmup_handles = Vec::with_capacity(warmup_count);
for i in 0..warmup_count {
let pool = self.pool.clone();
let handle = task::spawn(async move {
match pool.get().await {
Ok(conn) => match conn.health_check().await {
Ok(_) => {
log::debug!("Warm-up connection {} initialized successfully", i);
Ok(())
}
Err(e) => {
log::error!("Warm-up connection {} health check failed: {}", i, e);
Err(e)
}
},
Err(e) => {
log::error!("Failed to get warm-up connection {}: {}", i, e);
Err(ClickhouseError::Pool(e.to_string()))
}
}
});
warmup_handles.push(handle);
}
let mut warmup_success_count = 0;
for (i, handle) in warmup_handles.into_iter().enumerate() {
match handle.await {
Ok(Ok(_)) => {
warmup_success_count += 1;
}
Ok(Err(e)) => {
log::warn!("Warm-up connection {} failed: {}", i, e);
}
Err(e) => {
log::error!("Warm-up task {} panicked: {}", i, e);
}
}
}
log::info!(
"Connection pool warm-up complete: {}/{} successful",
warmup_success_count,
warmup_count
);
self.is_initialized.store(true, Ordering::SeqCst);
if let Some(metrics) = &self.metrics {
let status = self.pool.status();
metrics.set_int_gauge_vec_mut(
"clickhouse_pool_connections",
&["available"],
status.available as i64,
);
metrics.set_int_gauge_vec_mut(
"clickhouse_pool_connections",
&["size"],
status.size as i64,
);
}
Ok(())
}
pub async fn get_connection(&self) -> Result<PooledConnection, ClickhouseError> {
if !self.is_initialized.load(Ordering::SeqCst) {
log::warn!("Attempting to get connection from uninitialized pool");
}
let start = Instant::now();
let timeout_duration = Duration::from_secs(self.config.clickhouse.connect_timeout_seconds);
for attempt in 0..3 {
match tokio::time::timeout(timeout_duration, self.pool.get()).await {
Ok(Ok(conn)) => {
let duration = start.elapsed();
if let Some(metrics) = &self.metrics {
metrics.set_gauge_vec_mut(
"clickhouse_connection_acquisition_seconds",
&["success"],
duration.as_secs_f64(),
);
metrics.inc_int_counter_vec_mut(
"clickhouse_connection_acquisition_total",
&["success"],
);
}
log::debug!(
"Connection acquired in {:?} (attempt {})",
duration,
attempt + 1
);
return Ok(conn);
}
Ok(Err(e)) => {
if let Some(metrics) = &self.metrics {
metrics.inc_int_counter_vec_mut(
"clickhouse_connection_acquisition_total",
&["failure"],
);
}
log::warn!(
"Failed to get connection from pool (attempt {}): {}",
attempt + 1,
e
);
if attempt >= 2 {
return Err(ClickhouseError::Pool(e.to_string()));
}
}
Err(_) => {
if let Some(metrics) = &self.metrics {
metrics.inc_int_counter_vec_mut(
"clickhouse_connection_acquisition_total",
&["timeout"],
);
}
log::warn!(
"Timed out waiting for connection (attempt {}) after {:?}",
attempt + 1,
timeout_duration
);
if attempt >= 2 {
return Err(ClickhouseError::Timeout);
}
}
}
let backoff = Duration::from_millis(50 * 2u64.pow(attempt));
tokio::time::sleep(backoff).await;
}
Err(ClickhouseError::Pool(
"Failed to get connection after retries".to_string(),
))
}
pub async fn shutdown(&self) -> Result<(), ClickhouseError> {
log::info!("Initiating graceful shutdown of ClickHouse connection pool");
let pool_manager = self.pool.manager();
pool_manager.initiate_shutdown();
let status = self.pool.status();
log::info!(
"Connection pool status before shutdown: size={}, available={}, in_use={}",
status.size,
status.available,
status.size - status.available
);
let drain_timeout = Duration::from_secs(30);
let drain_start = Instant::now();
loop {
let status = self.pool.status();
let in_use = status.size - status.available;
if in_use == 0 {
log::info!("All connections returned to pool, proceeding with shutdown");
break;
}
if drain_start.elapsed() > drain_timeout {
log::warn!(
"Shutdown drain timeout exceeded, {} connections still in use",
in_use
);
break;
}
log::info!("Waiting for {} connections to be returned to pool", in_use);
tokio::time::sleep(Duration::from_secs(1)).await;
}
// Close all connections
self.pool.close();
log::info!("All connections closed");
log::info!("ClickHouse connection pool shutdown complete");
Ok(())
}
fn register_metrics(metrics: &SharedRegistrar) {
let metric_configs = [
// Connection
MetricConfig {
kind: Kind::IntCounterVec,
name: "clickhouse_connections_created_total",
help: "Total no. of connections created",
label_names: &["status"],
},
MetricConfig {
kind: Kind::IntGaugeVec,
name: "clickhouse_pool_connections",
help: "Current no. of connections in the pool",
label_names: &["state"],
},
MetricConfig {
kind: Kind::IntCounterVec,
name: "clickhouse_connetion_health_checks_total",
help: "Total no. of connection health checks",
label_names: &["status"],
},
MetricConfig {
kind: Kind::GaugeVec,
name: "clickhouse_connection_creation_seconds",
help: "Time taken to create connections",
label_names: &["operation"],
},
// Queries
MetricConfig {
kind: Kind::IntCounterVec,
name: "clickhouse_queries_total",
help: "Total no. of queries executed",
label_names: &["type"],
},
MetricConfig {
kind: Kind::IntCounterVec,
name: "clickhouse_query_errors_total",
help: "Total no. of query errors",
label_names: &["type"],
},
MetricConfig {
kind: Kind::GaugeVec,
name: "clickhouse_query_duration_seconds",
help: "Query execution time in seconds",
label_names: &["type"],
},
// Batch queries
MetricConfig {
kind: Kind::IntCounterVec,
name: "clickhouse_batch_query_errors_total",
help: "Total number of batch query errors",
label_names: &["type"],
},
MetricConfig {
kind: Kind::GaugeVec,
name: "clickhouse_batch_query_duration_seconds",
help: "Batch query execution time in seconds",
label_names: &["type"],
},
// Connection acquisition
MetricConfig {
kind: Kind::GaugeVec,
name: "clickhouse_connection_acquisition_seconds",
help: "Time taken to acquire a connection from the pool",
label_names: &["status"],
},
MetricConfig {
kind: Kind::IntCounterVec,
name: "clickhouse_connection_acquisition_total",
help: "Total number of connection acquisition attempts",
label_names: &["status"],
},
MetricConfig {
kind: Kind::IntCounterVec,
name: "clickhouse_connections_recycled_total",
help: "Total number of connections recycled",
label_names: &["reason"],
},
MetricConfig {
kind: Kind::GaugeVec,
name: "clickhouse_connection_recycling_seconds",
help: "Time taken for connection recycling",
label_names: &["operation"],
},
];
metrics.with_metric_configs(&metric_configs).ok();
}
pub fn status(&self) -> PoolMetrics {
let status = self.pool.status();
PoolMetrics {
size: status.size,
available: status.available,
in_use: status.size - status.available,
max_size: status.max_size,
// The pool status does not expose a configured minimum, so max_size is reported as a stand-in.
min_size: status.max_size,
waiters: status.waiting,
}
}
}
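// Usage sketch (hypothetical; `load_config` and the error handling are assumptions,
// not part of this module):
//
//     let config = Arc::new(load_config()?);
//     let pool = ClickhouseConnectionPool::new(config, None);
//     pool.initialize().await?;                      // warm up and mark the pool ready
//     let conn = pool.get_connection().await?;       // up to 3 attempts with backoff
//     conn.query("SELECT 1").execute().await?;
//     drop(conn);                                    // hand the connection back to the pool
//     pool.shutdown().await?;                        // drain in-use connections, then close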

View File

@ -0,0 +1,415 @@
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::batch_processor::{BatchCommand, BatchSender};
use crate::config::DatalakeConfig;
use crate::metrics::SharedRegistrar;
use crate::pool::{get_query_type, ClickhouseConnectionPool, ClickhouseError};
use crate::traits::Model;
use anyhow::Result;
use serde::de::DeserializeOwned;
use tokio::sync::mpsc;
use tokio::time::interval;
#[derive(Clone)]
pub struct PoolManager {
pool: Arc<ClickhouseConnectionPool>,
config: Arc<DatalakeConfig>,
metrics: Option<SharedRegistrar>,
last_recycle_time: u64,
}
impl PoolManager {
pub async fn new(config: Arc<DatalakeConfig>, metrics: Option<SharedRegistrar>) -> Self {
let pool = Arc::new(ClickhouseConnectionPool::new(
config.clone(),
metrics.clone(),
));
match pool.initialize().await {
Ok(_) => {
log::debug!("Pool warmed up and initialized successfully");
}
Err(e) => {
log::error!("Error warming up and initializing pool: {:?}", e);
}
}
Self {
pool,
config,
metrics,
last_recycle_time: 0,
}
}
pub fn get_pool(&self) -> Arc<ClickhouseConnectionPool> {
self.pool.clone()
}
pub fn seconds_since_last_recycle(&self) -> u64 {
let last = self.last_recycle_time;
if last == 0 {
return u64::MAX;
}
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
now.saturating_sub(last)
}
pub async fn refill_connection_pool(&self) -> Result<usize, ClickhouseError> {
let pool = self.get_pool();
let status = pool.status();
let current_total = status.size;
let target_total = self.config.clickhouse.max_connections as usize;
let deficit = target_total.saturating_sub(current_total);
if deficit == 0 {
log::info!("Deficit = 0");
return Ok(0);
}
let to_add = deficit;
log::info!("Attempting to add {} new connections to pool", to_add);
let mut added = 0;
// Hold the acquired connections until the loop finishes so each iteration gets a
// distinct connection and the pool is forced to grow toward the target size,
// instead of repeatedly checking the connection that was just returned.
let mut held = Vec::with_capacity(to_add);
for i in 0..to_add {
match pool.get_connection().await {
Ok(conn) => match conn.health_check().await {
Ok(_) => {
added += 1;
held.push(conn);
}
Err(e) => {
log::warn!("New connection failed health check: {}", e);
}
},
Err(e) => {
log::error!(
"Failed to create new connection {}/{}: {}",
i + 1,
to_add,
e
);
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
// Release the held connections back to the pool before reporting the result.
drop(held);
log::info!("Added {}/{} new connections to pool", added, to_add);
Ok(added)
}
pub async fn recycle_idle_connections(
&mut self,
max_to_recycle: usize,
) -> Result<usize, ClickhouseError> {
log::info!(
"Starting connection recycling - checking up to {} connections",
max_to_recycle
);
let start = std::time::Instant::now();
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
self.last_recycle_time = now;
let status = self.pool.status();
log::info!("Clickhouse pool metrics: {:?}", status);
let to_check = std::cmp::min(max_to_recycle, status.available);
if to_check == 0 {
log::info!("No connections available for recycling");
return Ok(0);
}
log::info!(
"Checking {} connections out of {} available",
to_check,
status.available
);
let mut recycled = 0;
for _ in 0..to_check {
match self.pool.get_connection().await {
Ok(conn) => match conn.health_check().await {
Ok(_) => {
log::debug!(
"Connection [id: {}] is healthy, returning to pool",
conn.id()
);
}
Err(e) => {
log::warn!(
"Connection [id: {}] failed health check: {}, will be recycled",
conn.id(),
e
);
recycled += 1;
if let Some(metrics) = &self.metrics {
metrics.inc_int_counter_vec_mut(
"clickhouse_connections_recycled_total",
&["health_check_failed"],
);
}
}
},
Err(e) => {
log::error!("Failed to get connection for health check: {}", e);
}
}
}
let duration = start.elapsed();
log::info!(
"Connection recycling complete: recycled={} in {:?}",
recycled,
duration
);
if let Some(metrics) = &self.metrics {
metrics.set_gauge_vec_mut(
"clickhouse_connection_recycling_seconds",
&["total"],
duration.as_secs_f64(),
);
}
Ok(recycled)
}
pub async fn execute_with_retry(&self, query: &str) -> Result<(), ClickhouseError> {
let mut attempt = 0;
let max_retries = self.config.retry.max_retries;
loop {
attempt += 1;
let conn = match self.pool.get_connection().await {
Ok(conn) => conn,
Err(e) => {
if attempt > max_retries {
log::error!("Failed to get connection after {} attempts: {}", attempt, e);
return Err(e);
}
let backoff = self.config.retry.backoff_duration(attempt);
log::warn!(
"Failed to get connection (attempt {}/{}), retrying in {:?}: {}",
attempt,
max_retries,
backoff,
e
);
tokio::time::sleep(backoff).await;
continue;
}
};
match conn.query(query).execute().await {
Ok(response) => {
if let Some(metrics) = &self.metrics {
metrics.inc_int_counter_vec_mut(
"clickhouse_query_success_total",
&[get_query_type(query)],
);
}
return Ok(response);
}
Err(e) => {
if attempt > max_retries {
log::error!("Query failed after {} attempts: {}", attempt, e);
return Err(ClickhouseError::Client(e));
}
let backoff = self.config.retry.backoff_duration(attempt);
log::warn!(
"Query failed (attempt {}/{}), retrying in {:?}: {}\nQuery: {}",
attempt,
max_retries,
backoff,
e,
query
);
if let Some(metrics) = &self.metrics {
metrics.inc_int_counter_vec_mut(
"clickhouse_query_retries_total",
&[get_query_type(query)],
);
}
tokio::time::sleep(backoff).await;
}
}
}
}
pub async fn execute_select_with_retry<T>(&self, query: &str) -> Result<Vec<T>, ClickhouseError>
where
T: clickhouse::Row + DeserializeOwned + Send + 'static,
{
let mut attempt = 0;
let max_retries = self.config.retry.max_retries;
loop {
attempt += 1;
let conn = match self.pool.get_connection().await {
Ok(conn) => conn,
Err(e) => {
if attempt > max_retries {
log::error!("Failed to get connection after {} attempts: {}", attempt, e);
return Err(e);
}
let backoff = self.config.retry.backoff_duration(attempt);
log::warn!(
"Failed to get connection (attempt {}/{}), retrying in {:?}: {}",
attempt,
max_retries,
backoff,
e
);
tokio::time::sleep(backoff).await;
continue;
}
};
match conn.query(query).fetch_all::<T>().await {
Ok(response) => {
if let Some(metrics) = &self.metrics {
metrics.inc_int_counter_vec_mut(
"clickhouse_query_success_total",
&[get_query_type(query)],
);
}
return Ok(response);
}
Err(e) => {
if attempt > max_retries {
log::error!("Query failed after {} attempts: {}", attempt, e);
return Err(ClickhouseError::Client(e));
}
let backoff = self.config.retry.backoff_duration(attempt);
log::warn!(
"Query failed (attempt {}/{}), retrying in {:?}: {}\nQuery: {}",
attempt,
max_retries,
backoff,
e,
query
);
if let Some(metrics) = &self.metrics {
metrics.inc_int_counter_vec_mut(
"clickhouse_query_retries_total",
&[get_query_type(query)],
);
}
tokio::time::sleep(backoff).await;
}
}
}
}
pub fn create_batch_processor<M>(
&self,
batch_size: usize,
max_wait_ms: u64,
) -> BatchSender<M::T>
where
M: Model + Send + Sync + 'static,
M::T: Clone + Send + 'static,
{
let (tx, mut rx) = mpsc::channel(1000);
let pool_manager = self.clone();
tokio::spawn(async move {
let mut batch = Vec::with_capacity(batch_size);
let mut last_flush = Instant::now();
let mut flush_interval = interval(Duration::from_millis(100));
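// The 100 ms tick only bounds how often the max_wait_ms deadline is checked, so a
// time-based flush can lag the deadline by up to roughly one tick.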
loop {
tokio::select! {
cmd = rx.recv() => match cmd {
Some(BatchCommand::Add(item)) => {
batch.push(item);
if batch.len() >= batch_size {
Self::process_batch::<M>(&pool_manager, &mut batch).await;
last_flush = Instant::now();
}
},
Some(BatchCommand::Flush) => {
if !batch.is_empty() {
Self::process_batch::<M>(&pool_manager, &mut batch).await;
last_flush = Instant::now();
}
},
None => break,
},
_ = flush_interval.tick() => {
if !batch.is_empty() && last_flush.elapsed() >= Duration::from_millis(max_wait_ms) {
Self::process_batch::<M>(&pool_manager, &mut batch).await;
last_flush = Instant::now();
}
}
}
}
if !batch.is_empty() {
Self::process_batch::<M>(&pool_manager, &mut batch).await;
}
});
BatchSender { tx }
}
async fn process_batch<M>(pool_manager: &PoolManager, batch: &mut Vec<M::T>)
where
M: Model,
M::T: Clone,
{
if batch.is_empty() {
return;
}
let items = std::mem::take(batch);
let query = M::batch_insert_query(&items);
match pool_manager.execute_with_retry(&query).await {
Ok(_) => {
log::info!(
"Successfully inserted {} items into {}",
items.len(),
M::table_name()
);
}
Err(e) => {
log::error!("Error inserting batch into {}: {}", M::table_name(), e);
batch.extend(items);
}
}
}
}
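// Usage sketch (hypothetical; `AccessLog` is an illustrative Model implementation and
// `load_config` is assumed, neither is part of this module):
//
//     let config = Arc::new(load_config()?);
//     let manager = PoolManager::new(config, None).await;
//
//     // DDL and writes go through the retry policy from the config.
//     manager.execute_with_retry(AccessLog::create_table_sql()).await?;
//
//     // Typed reads deserialize into the row type.
//     let rows: Vec<AccessLog> = manager
//         .execute_select_with_retry(&AccessLog::build_select_query(None, Some(100), None))
//         .await?;
//
//     // Buffered writes: flush every 500 items, or at most ~1000 ms after the previous flush.
//     let sender = manager.create_batch_processor::<AccessLog>(500, 1000);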

View File

@ -0,0 +1,15 @@
/// Minimal persistence contract shared by ClickHouse-backed models.
pub trait Model {
/// Row type handled by batch operations (typically `Self`).
type T;
/// DDL statement that creates the backing table if it does not exist.
fn create_table_sql() -> &'static str;
fn table_name() -> &'static str;
fn column_names() -> Vec<&'static str>;
/// Column names together with the SQL-formatted values for this instance.
fn to_row(&self) -> (Vec<&'static str>, Vec<String>);
fn insert_query(&self) -> String;
/// Builds a single multi-row INSERT statement for the given items.
fn batch_insert_query(items: &[Self::T]) -> String;
/// Builds a SELECT with optional WHERE, LIMIT and OFFSET clauses.
fn build_select_query(
where_clause: Option<&str>,
limit: Option<u64>,
offset: Option<u64>,
) -> String;
}
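// Implementation sketch (hypothetical; `AccessLog` is an illustration, `T = Self` is an
// assumption, and the value quoting is intentionally naive):
//
//     pub struct AccessLog {
//         pub user_id: String,
//         pub path: String,
//     }
//
//     impl Model for AccessLog {
//         type T = Self;
//         fn create_table_sql() -> &'static str {
//             "CREATE TABLE IF NOT EXISTS access_log (user_id String, path String) ENGINE = MergeTree ORDER BY user_id"
//         }
//         fn table_name() -> &'static str { "access_log" }
//         fn column_names() -> Vec<&'static str> { vec!["user_id", "path"] }
//         fn to_row(&self) -> (Vec<&'static str>, Vec<String>) {
//             (Self::column_names(), vec![format!("'{}'", self.user_id), format!("'{}'", self.path)])
//         }
//         fn insert_query(&self) -> String {
//             let (cols, vals) = self.to_row();
//             format!("INSERT INTO {} ({}) VALUES ({})", Self::table_name(), cols.join(", "), vals.join(", "))
//         }
//         fn batch_insert_query(items: &[Self::T]) -> String {
//             let rows: Vec<String> = items.iter().map(|i| format!("({})", i.to_row().1.join(", "))).collect();
//             format!("INSERT INTO {} ({}) VALUES {}", Self::table_name(), Self::column_names().join(", "), rows.join(", "))
//         }
//         fn build_select_query(where_clause: Option<&str>, limit: Option<u64>, offset: Option<u64>) -> String {
//             let mut q = format!("SELECT {} FROM {}", Self::column_names().join(", "), Self::table_name());
//             if let Some(w) = where_clause { q.push_str(&format!(" WHERE {}", w)); }
//             if let Some(l) = limit { q.push_str(&format!(" LIMIT {}", l)); }
//             if let Some(o) = offset { q.push_str(&format!(" OFFSET {}", o)); }
//             q
//         }
//     }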

View File

@ -5,10 +5,10 @@ token = ""
prefix = "!"
[persistence]
host = "localhost"
port = 9000
host = "127.0.0.1"
port = 8123
user = "default"
password = "default"
password = "password"
database = "default"