diff --git a/.vscode/settings.json b/.vscode/settings.json index 51ddb8d..0a042bb 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -10,4 +10,10 @@ "cSpell.language": "en,fr", "gitlens.plusFeatures.enabled": false, "editor.guides.bracketPairs": true, + "cSpell.words": [ + "clickhouse", + "datalake", + "uuid", + "Uuid" + ], } diff --git a/Cargo.lock b/Cargo.lock index d3f5cd7..02d0c0d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,194 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "actix-codec" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f7b0a21988c1bf877cf4759ef5ddaac04c1c9fe808c9142ecb78ba97d97a28a" +dependencies = [ + "bitflags 2.9.1", + "bytes", + "futures-core", + "futures-sink", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "actix-http" +version = "3.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44dfe5c9e0004c623edc65391dfd51daa201e7e30ebd9c9bedf873048ec32bc2" +dependencies = [ + "actix-codec", + "actix-rt", + "actix-service", + "actix-utils", + "base64 0.22.1", + "bitflags 2.9.1", + "bytes", + "bytestring", + "derive_more", + "encoding_rs", + "foldhash", + "futures-core", + "http 0.2.12", + "httparse", + "httpdate", + "itoa", + "language-tags", + "local-channel", + "mime", + "percent-encoding", + "pin-project-lite", + "rand 0.9.1", + "sha1", + "smallvec", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "actix-macros" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" +dependencies = [ + "quote", + "syn 2.0.101", +] + +[[package]] +name = "actix-router" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13d324164c51f63867b57e73ba5936ea151b8a41a1d23d1031eeb9f70d0236f8" +dependencies = [ + "bytestring", + "cfg-if", + "http 0.2.12", + "regex-lite", + "serde", + "tracing", +] + +[[package]] +name = "actix-rt" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24eda4e2a6e042aa4e55ac438a2ae052d3b5da0ecf83d7411e1a368946925208" +dependencies = [ + "futures-core", + "tokio", +] + +[[package]] +name = "actix-server" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a65064ea4a457eaf07f2fba30b4c695bf43b721790e9530d26cb6f9019ff7502" +dependencies = [ + "actix-rt", + "actix-service", + "actix-utils", + "futures-core", + "futures-util", + "mio", + "socket2", + "tokio", + "tracing", +] + +[[package]] +name = "actix-service" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e46f36bf0e5af44bdc4bdb36fbbd421aa98c79a9bce724e1edeb3894e10dc7f" +dependencies = [ + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "actix-utils" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88a1dcdff1466e3c2488e1cb5c36a71822750ad43839937f85d2f4d9f8b705d8" +dependencies = [ + "local-waker", + "pin-project-lite", +] + +[[package]] +name = "actix-web" +version = "4.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a597b77b5c6d6a1e1097fddde329a83665e25c5437c696a3a9a4aa514a614dea" +dependencies = [ + "actix-codec", + "actix-http", + "actix-macros", + "actix-router", + "actix-rt", + "actix-server", + "actix-service", + "actix-utils", + "actix-web-codegen", + "bytes", + "bytestring", + "cfg-if", + "derive_more", + "encoding_rs", + "foldhash", + "futures-core", + "futures-util", + "impl-more", + "itoa", + "language-tags", + "log", + "mime", + "once_cell", + "pin-project-lite", + "regex-lite", + "serde", + "serde_json", + "serde_urlencoded", + "smallvec", + "socket2", + "time", + "tracing", + "url", +] + +[[package]] +name = "actix-web-codegen" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f591380e2e68490b5dfaf1dd1aa0ebe78d84ba7067078512b4ea6e4492d622b8" +dependencies = [ + "actix-router", + "proc-macro2", + "quote", + "syn 2.0.101", +] + +[[package]] +name = "actix-web-prom" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9df3127d20a5d01c9fc9aceb969a38d31a6767e1b48a54d55a8f56c769a84923" +dependencies = [ + "actix-web", + "futures-core", + "pin-project-lite", + "prometheus", +] + [[package]] name = "addr2line" version = "0.24.2" @@ -213,12 +401,25 @@ dependencies = [ name = "bot" version = "0.1.0" dependencies = [ + "chrono", + "clickhouse", + "clickhouse_pool", "poise", "serde", "tokio", "toml", "tool_tracing", "tracing", + "uuid", +] + +[[package]] +name = "bstr" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234113d19d0d7d613b40e86fb654acf958910802bcceab913a4f9e7cda03b1a4" +dependencies = [ + "memchr", ] [[package]] @@ -245,6 +446,15 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" +[[package]] +name = "bytestring" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e465647ae23b2823b0753f50decb2d5a86d2bb2cac04788fafd1f80e45378e5f" +dependencies = [ + "bytes", +] + [[package]] name = "camino" version = "1.1.9" @@ -299,11 +509,78 @@ checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-link", ] +[[package]] +name = "cityhash-rs" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93a719913643003b84bd13022b4b7e703c09342cd03b679c4641c7d2e50dc34d" + +[[package]] +name = "clickhouse" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9894248c4c5a4402f76a56c273836a0c32547ec8a68166aedee7e01b7b8d102" +dependencies = [ + "bstr", + "bytes", + "chrono", + "cityhash-rs", + "clickhouse-derive", + "futures", + "futures-channel", + "http-body-util", + "hyper 1.6.0", + "hyper-tls", + "hyper-util", + "lz4_flex", + "replace_with", + "sealed", + "serde", + "static_assertions", + "thiserror 1.0.69", + "tokio", + "url", + "uuid", +] + +[[package]] +name = "clickhouse-derive" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d70f3e2893f7d3e017eeacdc9a708fbc29a10488e3ebca21f9df6a5d2b616dbb" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.101", +] + +[[package]] +name = "clickhouse_pool" +version = "0.1.0" +dependencies = [ + "actix-web-prom", + "anyhow", + "clickhouse", + "deadpool", + "futures", + "futures-util", + "log", + "prometheus", + "serde", + "serde_derive", + "thiserror 1.0.69", + "tokio", + "url", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -418,6 +695,23 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" +[[package]] +name = "deadpool" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ed5957ff93768adf7a65ab167a17835c3d2c3c50d084fe305174c112f468e2f" +dependencies = [ + "deadpool-runtime", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" + [[package]] name = "deranged" version = "0.4.0" @@ -439,6 +733,27 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "derive_more" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", + "unicode-xid", +] + [[package]] name = "digest" version = "0.10.7" @@ -481,6 +796,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "erased-serde" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e004d887f51fcb9fef17317a2f3525c887d8aa3f4f50fed920816a688284a5b7" +dependencies = [ + "serde", + "typeid", +] + [[package]] name = "errno" version = "0.3.12" @@ -522,6 +847,27 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -539,6 +885,7 @@ checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -739,6 +1086,12 @@ version = "0.15.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "84b26c544d002229e640969970a2e74021aadf6e2f96372b9c58eff97de08eb3" +[[package]] +name = "hermit-abi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + [[package]] name = "http" version = "0.2.12" @@ -879,6 +1232,22 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.6.0", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.11" @@ -1036,6 +1405,12 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "impl-more" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a5a9a0ff0086c7a148acb942baaabeadf9504d10400b5a05645853729b9cd2" + [[package]] name = "indexmap" version = "1.9.3" @@ -1087,6 +1462,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "language-tags" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4345964bb142484797b161f473a503a434de77149dd8c7427788c6e13379388" + [[package]] name = "lazy_static" version = "1.5.0" @@ -1111,6 +1492,23 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956" +[[package]] +name = "local-channel" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6cbc85e69b8df4b8bb8b89ec634e7189099cea8927a276b7384ce5488e53ec8" +dependencies = [ + "futures-core", + "futures-sink", + "local-waker", +] + +[[package]] +name = "local-waker" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d873d7c67ce09b42110d801813efbc9364414e356be9935700d368351657487" + [[package]] name = "lock_api" version = "0.4.12" @@ -1126,6 +1524,16 @@ name = "log" version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" +dependencies = [ + "serde", + "value-bag", +] + +[[package]] +name = "lz4_flex" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" [[package]] name = "matchers" @@ -1195,10 +1603,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" dependencies = [ "libc", + "log", "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.52.0", ] +[[package]] +name = "native-tls" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1224,6 +1650,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "object" version = "0.36.7" @@ -1239,6 +1675,50 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "openssl" +version = "0.10.72" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fedfea7d58a1f73118430a55da6a286e7b044961736ce96a16a17068ea25e5da" +dependencies = [ + "bitflags 2.9.1", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + +[[package]] +name = "openssl-probe" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" + +[[package]] +name = "openssl-sys" +version = "0.9.108" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e145e1651e858e820e4860f7b9c5e169bc1d8ce1c86043be79fa7b7634821847" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "opentelemetry" version = "0.29.1" @@ -1386,6 +1866,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" + [[package]] name = "poise" version = "0.6.1" @@ -1448,6 +1934,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf", + "thiserror 1.0.69", +] + [[package]] name = "prost" version = "0.13.5" @@ -1471,6 +1972,12 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "pulldown-cmark" version = "0.9.6" @@ -1597,6 +2104,12 @@ dependencies = [ "regex-syntax 0.8.5", ] +[[package]] +name = "regex-lite" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" + [[package]] name = "regex-syntax" version = "0.6.29" @@ -1609,6 +2122,12 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +[[package]] +name = "replace_with" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51743d3e274e2b18df81c4dc6caf8a5b8e15dbe799e0dca05c7617380094e884" + [[package]] name = "reqwest" version = "0.11.27" @@ -1809,6 +2328,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schannel" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -1825,6 +2353,17 @@ dependencies = [ "untrusted", ] +[[package]] +name = "sealed" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22f968c5ea23d555e670b449c1c5e7b2fc399fdaec1d304a17cd48e288abc107" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "secrecy" version = "0.8.0" @@ -1835,6 +2374,29 @@ dependencies = [ "zeroize", ] +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags 2.9.1", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "1.0.26" @@ -1873,6 +2435,26 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + +[[package]] +name = "serde_fmt" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d4ddca14104cd60529e8c7f7ba71a2c8acd8f7f5cfcdc2faf97eeb7c3010a4" +dependencies = [ + "serde", +] + [[package]] name = "serde_json" version = "1.0.140" @@ -1976,6 +2558,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" +dependencies = [ + "libc", +] + [[package]] name = "skeptic" version = "0.13.7" @@ -2022,6 +2613,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "strsim" version = "0.11.1" @@ -2034,6 +2631,84 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "sval" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cc9739f56c5d0c44a5ed45473ec868af02eb896af8c05f616673a31e1d1bb09" + +[[package]] +name = "sval_buffer" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f39b07436a8c271b34dad5070c634d1d3d76d6776e938ee97b4a66a5e8003d0b" +dependencies = [ + "sval", + "sval_ref", +] + +[[package]] +name = "sval_dynamic" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffcb072d857431bf885580dacecf05ed987bac931230736739a79051dbf3499b" +dependencies = [ + "sval", +] + +[[package]] +name = "sval_fmt" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f214f427ad94a553e5ca5514c95c6be84667cbc5568cce957f03f3477d03d5c" +dependencies = [ + "itoa", + "ryu", + "sval", +] + +[[package]] +name = "sval_json" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389ed34b32e638dec9a99c8ac92d0aa1220d40041026b625474c2b6a4d6f4feb" +dependencies = [ + "itoa", + "ryu", + "sval", +] + +[[package]] +name = "sval_nested" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14bae8fcb2f24fee2c42c1f19037707f7c9a29a0cda936d2188d48a961c4bb2a" +dependencies = [ + "sval", + "sval_buffer", + "sval_ref", +] + +[[package]] +name = "sval_ref" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a4eaea3821d3046dcba81d4b8489421da42961889902342691fb7eab491d79e" +dependencies = [ + "sval", +] + +[[package]] +name = "sval_serde" +version = "2.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "172dd4aa8cb3b45c8ac8f3b4111d644cd26938b0643ede8f93070812b87fb339" +dependencies = [ + "serde", + "sval", + "sval_nested", +] + [[package]] name = "syn" version = "1.0.109" @@ -2223,7 +2898,9 @@ dependencies = [ "bytes", "libc", "mio", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.52.0", @@ -2240,6 +2917,16 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.24.1" @@ -2581,6 +3268,12 @@ dependencies = [ "utf-8", ] +[[package]] +name = "typeid" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c" + [[package]] name = "typemap_rev" version = "0.3.0" @@ -2634,6 +3327,12 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "untrusted" version = "0.9.0" @@ -2664,12 +3363,64 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "uuid" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9" +dependencies = [ + "getrandom 0.3.3", + "serde", +] + [[package]] name = "valuable" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "value-bag" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "943ce29a8a743eb10d6082545d861b24f9d1b160b7d741e0f2cdf726bec909c5" +dependencies = [ + "value-bag-serde1", + "value-bag-sval2", +] + +[[package]] +name = "value-bag-serde1" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35540706617d373b118d550d41f5dfe0b78a0c195dc13c6815e92e2638432306" +dependencies = [ + "erased-serde", + "serde", + "serde_fmt", +] + +[[package]] +name = "value-bag-sval2" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fe7e140a2658cc16f7ee7a86e413e803fc8f9b5127adc8755c19f9fefa63a52" +dependencies = [ + "sval", + "sval_buffer", + "sval_dynamic", + "sval_fmt", + "sval_json", + "sval_ref", + "sval_serde", +] + +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" diff --git a/Cargo.toml b/Cargo.toml index 3d5759a..66ab975 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = '2' -members = ['apps/bot', 'libs/tool_tracing'] +members = ['apps/bot', 'libs/tool_tracing', 'libs/clickhouse_pool'] [workspace.dependencies] poise = '0.6.1' @@ -12,7 +12,8 @@ tokio = { version = '1.45.0', features = [ ] } serde = '1.0' tracing = '0.1' -serde_json = "1.0" +serde_json = '1.0' +clickhouse = { version = "0.13", features = ["native-tls", "uuid", "chrono"] } [profile.release] lto = true diff --git a/Taskfile.yaml b/Taskfile.yaml index b5699bd..a09c938 100644 --- a/Taskfile.yaml +++ b/Taskfile.yaml @@ -6,11 +6,12 @@ tasks: silent: true cmds: - echo "Starting Db and Jaeger..." - - docker-compose up -d database jaeger + - podman compose up -d database jaeger dbgate - echo "Running Rust server..." - cargo run stop: desc: Stop the Rust server + silent: true cmds: - echo "Stopping Db and Jaeger..." - - docker-compose down \ No newline at end of file + - podman compose down \ No newline at end of file diff --git a/apps/bot/Cargo.toml b/apps/bot/Cargo.toml index c907a0b..eba956d 100644 --- a/apps/bot/Cargo.toml +++ b/apps/bot/Cargo.toml @@ -11,5 +11,10 @@ serde = { workspace = true } tracing = { workspace = true } tool_tracing = { path = "../../libs/tool_tracing" } toml = "0.8" +clickhouse_pool = { path = "../../libs/clickhouse_pool" } +clickhouse = { workspace = true } +uuid = { version = "1.16", features = ["serde", "v4"] } +chrono = { version = "0.4.41", features = ["serde"] } + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/apps/bot/src/main.rs b/apps/bot/src/main.rs index 7340f85..8953447 100644 --- a/apps/bot/src/main.rs +++ b/apps/bot/src/main.rs @@ -1,9 +1,10 @@ use config::parse_local_config; use poise::serenity_prelude as serenity; -use tracing::info; +use tracing::{error, info}; pub mod config; pub mod dotenv; +pub mod model; struct Data {} // User data, which is stored and accessible in all command invocations type Error = Box; @@ -24,9 +25,22 @@ async fn age( #[tokio::main] async fn main() { let config = parse_local_config(); - tool_tracing::init::init_tracing(config.tracing.clone(), config.bot_name.clone()); + info!("Init Database"); + let datalake_config = model::create_pool_manager(config.persistence.clone()).unwrap(); + let _manager = match model::create_manager_and_init(datalake_config).await { + Ok(manager) => { + info!("Database manager created successfully"); + manager + } + Err(e) => { + error!("Failed to create database manager: {}", e); + return; + } + }; + info!("Database manager initialized successfully"); + info!("Starting bot {}", config.bot_name); let intents = serenity::GatewayIntents::non_privileged(); diff --git a/apps/bot/src/model/mod.rs b/apps/bot/src/model/mod.rs new file mode 100644 index 0000000..c321b79 --- /dev/null +++ b/apps/bot/src/model/mod.rs @@ -0,0 +1,56 @@ +use std::{error::Error, sync::Arc}; + +pub mod trivial; + +use clickhouse_pool::{ + config::{ClickhouseConfig, DatalakeConfig, RetryConfig}, + pool_manager::PoolManager, + traits::Model, +}; +use tracing::{error, info, instrument}; +use trivial::Trivial; + +use crate::config::PersistenceConfig; + +pub fn create_pool_manager(db_config: PersistenceConfig) -> Result, String> { + let mut config = ClickhouseConfig::new( + db_config.host.clone(), + db_config.port, + db_config.database.clone(), + db_config.user.clone(), + db_config.password.clone(), + 10, + 30, + 5, + ); + + config.proto = "http".to_string(); + + let retry_config = RetryConfig::default(); + + let datalake_config = Arc::new(DatalakeConfig::new(config, retry_config)); + + Ok(datalake_config) +} + +#[instrument(skip(datalake_config))] +pub async fn create_manager_and_init( + datalake_config: Arc, +) -> Result, Box> { + let manager = PoolManager::new(datalake_config, None).await; + + let _ = match manager + .execute_with_retry(Trivial::create_table_sql()) + .await + { + Ok(_) => { + info!("Table {} created successfully", Trivial::table_name()); + } + Err(e) => { + error!("Failed to create table {} : {}", Trivial::table_name(), e); + return Err(Box::new(e)); + } + }; + + Ok(Arc::new(manager)) +} diff --git a/apps/bot/src/model/trivial.rs b/apps/bot/src/model/trivial.rs new file mode 100644 index 0000000..5fb6af9 --- /dev/null +++ b/apps/bot/src/model/trivial.rs @@ -0,0 +1,158 @@ +use chrono::{DateTime, Utc}; +use clickhouse::Row; +use clickhouse_pool::traits::Model; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +pub enum TrivialRewardKind { + OnlyTheFirstOne = 0, + TopThree = 1, + TopFive = 2, +} + +#[derive(Debug, Clone, PartialEq, PartialOrd, Row, Serialize, Deserialize)] +pub struct Trivial { + #[serde(with = "clickhouse::serde::uuid")] + pub id: Uuid, + pub name: String, + pub description: String, + + pub guild_id: u64, + pub channel_id: u64, + + #[serde(with = "clickhouse::serde::chrono::datetime64::millis")] + pub created_at: DateTime, + #[serde(with = "clickhouse::serde::chrono::datetime64::millis")] + pub updated_at: DateTime, + + pub creator_id: u64, + pub updater_id: u64, + + pub random_question: bool, + pub role_ping: u64, + pub role_ping_enabled: bool, + pub reward_kind: TrivialRewardKind, + pub reward_amount: u64, + pub taken_into_account: bool, +} + +impl Model for Trivial { + type T = Trivial; + + fn table_name() -> &'static str { + "trivial" + } + + fn create_table_sql() -> &'static str { + r#" + CREATE TABLE IF NOT EXISTS trivial ( + id UUID PRIMARY KEY, + name String, + description String, + guild_id UInt64, + channel_id UInt64, + created_at DateTime64(3), + updated_at DateTime64(3), + creator_id UInt64, + updater_id UInt64, + random_question Bool, + role_ping UInt64, + role_ping_enabled Bool, + reward_kind Enum8('OnlyTheFirstOne' = 0, 'TopThree' = 1, 'TopFive' = 2), + reward_amount UInt64, + taken_into_account Bool + ) ENGINE = MergeTree() + ORDER BY id + "# + } + + fn column_names() -> Vec<&'static str> { + vec![ + "id", + "name", + "description", + "guild_id", + "channel_id", + "created_at", + "updated_at", + "creator_id", + "updater_id", + "random_question", + "role_ping", + "role_ping_enabled", + "reward_kind", + "reward_amount", + "taken_into_account", + ] + } + + fn to_row(&self) -> (Vec<&'static str>, Vec) { + ( + Self::column_names(), + vec![ + self.id.to_string(), + self.name.clone(), + self.description.clone(), + self.guild_id.to_string(), + self.channel_id.to_string(), + self.created_at.to_string(), + self.updated_at.to_string(), + self.creator_id.to_string(), + self.updater_id.to_string(), + self.random_question.to_string(), + self.role_ping.to_string(), + self.role_ping_enabled.to_string(), + format!("{:?}", self.reward_kind), + self.reward_amount.to_string(), + self.taken_into_account.to_string(), + ], + ) + } + + fn insert_query(&self) -> String { + let (columns, values) = self.to_row(); + let columns_str = columns.join(", "); + let values_str = values.join(", "); + format!( + "INSERT INTO {} ({}) VALUES ({})", + Self::table_name(), + columns_str, + values_str + ) + } + + fn batch_insert_query(items: &[Self::T]) -> String { + let mut queries = Vec::new(); + for item in items { + let (columns, values) = item.to_row(); + let columns_str = columns.join(", "); + let values_str = values.join(", "); + queries.push(format!( + "INSERT INTO {} ({}) VALUES ({})", + Self::table_name(), + columns_str, + values_str + )); + } + queries.join("; ") + } + + fn build_select_query( + where_clause: Option<&str>, + limit: Option, + offset: Option, + ) -> String { + let mut query = format!("SELECT * FROM {}", Self::table_name()); + if let Some(where_clause) = where_clause { + query.push_str(&format!(" WHERE {}", where_clause)); + } + if let Some(limit) = limit { + query.push_str(&format!(" LIMIT {}", limit)); + } + if let Some(offset) = offset { + query.push_str(&format!(" OFFSET {}", offset)); + } + query + } +} diff --git a/compose.yaml b/compose.yaml index 5f6938f..1900c06 100644 --- a/compose.yaml +++ b/compose.yaml @@ -3,7 +3,8 @@ services: image: clickhouse/clickhouse-server:latest ports: - 8123:8123 - - 9000:9000 + - 9000:9000/tcp + - 9000:9000/udp ulimits: nofile: soft: 262144 @@ -12,6 +13,7 @@ services: CLICKHOUSE_USER: default CLICKHOUSE_PASSWORD: password CLICKHOUSE_DB: default + CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1 jaeger: image: jaegertracing/jaeger:${JAEGER_VERSION:-2.6.0} ports: @@ -22,3 +24,25 @@ services: - "9411:9411" environment: - LOG_LEVEL=debug + dbgate: + image: dbgate/dbgate:latest + user: "1000" + ports: + - 3000:3000 + environment: + CONNECTIONS: click + LABEL_click: ClickHouse + SERVER_click: database + USER_click: default + PASSWORD_click: password + PORT_click: 9000 + ENGINE_click: clickhouse@dbgate-plugin-clickhouse + DATABASE_click: default + depends_on: + - database + volumes: + - dbgate-data:/data + +volumes: + dbgate-data: + driver: local diff --git a/libs/clickhouse_pool/Cargo.toml b/libs/clickhouse_pool/Cargo.toml new file mode 100644 index 0000000..16aabb9 --- /dev/null +++ b/libs/clickhouse_pool/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "clickhouse_pool" +version = "0.1.0" +edition = "2021" + +[dependencies] +clickhouse = { workspace = true } +deadpool = "0.12.2" + +tokio = { workspace = true } +futures = "0.3.17" +futures-util = "0.3" + +serde = { workspace = true } +serde_derive = "1.0" +log = { version = "^0.4.21", features = ["kv_serde"] } +prometheus = "0.13" +actix-web-prom = "0.6" +url = "2.5.2" +anyhow = "1.0" +thiserror = "1.0.31" +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/libs/clickhouse_pool/project.json b/libs/clickhouse_pool/project.json new file mode 100644 index 0000000..935b9de --- /dev/null +++ b/libs/clickhouse_pool/project.json @@ -0,0 +1,43 @@ +{ + "name": "clickhouse_pool", + "$schema": "../../node_modules/nx/schemas/project-schema.json", + "projectType": "library", + "sourceRoot": "libs/clickhouse_pool/src", + "targets": { + "build": { + "executor": "@monodon/rust:check", + "outputs": [ + "{options.target-dir}" + ], + "options": { + "target-dir": "dist/target/clickhouse_pool" + } + }, + "test": { + "cache": true, + "executor": "@monodon/rust:test", + "outputs": [ + "{options.target-dir}" + ], + "options": { + "target-dir": "dist/target/clickhouse_pool" + }, + "configurations": { + "production": { + "release": true + } + } + }, + "lint": { + "cache": true, + "executor": "@monodon/rust:lint", + "outputs": [ + "{options.target-dir}" + ], + "options": { + "target-dir": "dist/target/clickhouse_pool" + } + } + }, + "tags": [] +} diff --git a/libs/clickhouse_pool/readme.md b/libs/clickhouse_pool/readme.md new file mode 100644 index 0000000..fd1744a --- /dev/null +++ b/libs/clickhouse_pool/readme.md @@ -0,0 +1,5 @@ +# Clickhouse_pool + +This libs is a fork from + +Lot's of thing will move !! diff --git a/libs/clickhouse_pool/src/batch_processor.rs b/libs/clickhouse_pool/src/batch_processor.rs new file mode 100644 index 0000000..4fbfe10 --- /dev/null +++ b/libs/clickhouse_pool/src/batch_processor.rs @@ -0,0 +1,36 @@ +use tokio::sync::mpsc; + +use crate::pool::ClickhouseError; + +pub enum BatchCommand { + Add(T), + Flush, +} + +pub struct BatchSender { + pub tx: mpsc::Sender>, +} + +impl BatchSender { + pub async fn add(&self, item: T) -> Result<(), ClickhouseError> { + self.tx + .send(BatchCommand::Add(item)) + .await + .map_err(|_| ClickhouseError::BatchInsertionError("Channel closed".to_string())) + } + + pub async fn flush(&self) -> Result<(), ClickhouseError> { + self.tx + .send(BatchCommand::Flush) + .await + .map_err(|_| ClickhouseError::BatchInsertionError("Channel closed".to_string())) + } +} + +impl Clone for BatchSender { + fn clone(&self) -> Self { + Self { + tx: self.tx.clone(), + } + } +} diff --git a/libs/clickhouse_pool/src/config.rs b/libs/clickhouse_pool/src/config.rs new file mode 100644 index 0000000..fc221c2 --- /dev/null +++ b/libs/clickhouse_pool/src/config.rs @@ -0,0 +1,182 @@ +use serde_derive::{Deserialize, Serialize}; +use std::{sync::Arc, time::Duration}; +use url::Url; + +pub const CONNECTION_TIMEOUT_SECONDS_DEFAULT: u64 = 3; +pub const QUERY_TIMEOUT_SECONDS_DEFAULT: u64 = 10; +pub const POOL_MAX_CONNECTIONS_DEFAULT: u32 = 256; +pub const PORT_DEFAULT: u16 = 9000; + +pub const MAX_RETRIES_DEFAULT: u32 = 5; +pub const INITIAL_BACKOFF_MS_DEFAULT: u64 = 100; +pub const MAX_BACKOFF_MS_DEFAULT: u64 = 10_000; +pub const BACKOFF_MULTIPLIER_DEFAULT: f64 = 2.0; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ClickhouseConfig { + /// Database host + pub host: String, + + /// Database protocol + pub proto: String, + + /// Database port + pub port: u16, + + /// Database name + pub database: String, + + /// Username for authentication + pub username: String, + + /// Password for authentication + pub password: String, + + /// Connection timeout in seconds + #[serde(default)] + pub connect_timeout_seconds: u64, + + /// Query timeout in seconds + #[serde(default)] + pub query_timeout_seconds: u64, + + /// Max. number of connections in the pool + #[serde(default)] + pub max_connections: u32, +} + +impl Default for ClickhouseConfig { + fn default() -> Self { + Self { + host: "localhost".to_string(), + port: PORT_DEFAULT, + database: "default".to_string(), + username: "default".to_string(), + password: String::new(), + connect_timeout_seconds: CONNECTION_TIMEOUT_SECONDS_DEFAULT, + query_timeout_seconds: QUERY_TIMEOUT_SECONDS_DEFAULT, + max_connections: POOL_MAX_CONNECTIONS_DEFAULT, + proto: "https".to_string(), + } + } +} + +impl ClickhouseConfig { + pub fn new( + host: String, + port: u16, + database: String, + username: String, + password: String, + connect_timeout_seconds: u64, + query_timeout_seconds: u64, + max_connections: u32, + ) -> Self { + Self { + host, + port, + database, + username, + password, + connect_timeout_seconds, + query_timeout_seconds, + max_connections, + proto: "https".to_string(), + } + } + + pub fn proto_prefix(&self) -> String { + format!("{}://", self.proto) + } + + pub fn connection_url(&self) -> Url { + let url_str = format!("{}{}:{}", self.proto_prefix(), self.host, self.port); + + Url::parse(&url_str).expect("Failed to parse Clickhouse URL") + } + + pub fn authenticated_connection_url(&self) -> Url { + let url_str = format!( + "{}{}:{}@{}:{}", + self.proto_prefix(), + self.username, + self.password, + self.host, + self.port, + ); + + Url::parse(&url_str).expect("Failed to parse authenticated Clickhouse URL") + } + + pub fn connect_timeout(&self) -> Duration { + Duration::from_secs(self.connect_timeout_seconds) + } + + pub fn query_timeout(&self) -> Duration { + Duration::from_secs(self.query_timeout_seconds) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RetryConfig { + /// Max number of retry attempts + pub max_retries: u32, + + /// Initial backoff duration in milliseconds + pub initial_backoff_ms: u64, + + /// Max backoff duration in milliseconds + pub max_backoff_ms: u64, + + /// Backoff multiplier for exponential backoff + pub backoff_multiplier: f64, +} + +impl Default for RetryConfig { + fn default() -> Self { + Self { + max_retries: MAX_RETRIES_DEFAULT, + initial_backoff_ms: INITIAL_BACKOFF_MS_DEFAULT, + max_backoff_ms: MAX_BACKOFF_MS_DEFAULT, + backoff_multiplier: BACKOFF_MULTIPLIER_DEFAULT, + } + } +} + +impl RetryConfig { + pub fn backoff_duration(&self, attempt: u32) -> Duration { + if attempt == 0 { + return Duration::from_millis(0); + } + + let backoff_ms = (self.initial_backoff_ms as f64 + * self.backoff_multiplier.powi(attempt as i32 - 1)) as u64; + let capped_backoff_ms = std::cmp::min(backoff_ms, self.max_backoff_ms); + + Duration::from_millis(capped_backoff_ms) + } +} + +#[derive(Debug, Clone)] +pub struct DatalakeConfig { + pub clickhouse: Arc, + pub retry: Arc, +} + +impl Default for DatalakeConfig { + fn default() -> Self { + Self { + clickhouse: Arc::new(ClickhouseConfig::default()), + retry: Arc::new(RetryConfig::default()), + } + } +} + +impl DatalakeConfig { + pub fn new(clickhouse_config: ClickhouseConfig, retry: RetryConfig) -> Self { + Self { + clickhouse: Arc::new(clickhouse_config), + retry: Arc::new(retry), + } + } +} diff --git a/libs/clickhouse_pool/src/lib.rs b/libs/clickhouse_pool/src/lib.rs new file mode 100644 index 0000000..4b31ed1 --- /dev/null +++ b/libs/clickhouse_pool/src/lib.rs @@ -0,0 +1,6 @@ +pub mod batch_processor; +pub mod config; +pub mod metrics; +pub mod pool; +pub mod pool_manager; +pub mod traits; diff --git a/libs/clickhouse_pool/src/metrics.rs b/libs/clickhouse_pool/src/metrics.rs new file mode 100644 index 0000000..fa3aed0 --- /dev/null +++ b/libs/clickhouse_pool/src/metrics.rs @@ -0,0 +1,277 @@ +use actix_web_prom::{PrometheusMetrics, PrometheusMetricsBuilder}; +use prometheus::{CounterVec, GaugeVec, IntCounter, IntCounterVec, IntGaugeVec, Opts}; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + Prometheus(#[from] prometheus::Error), +} + +#[derive(Debug, Default, Clone)] +pub enum Kind { + #[default] + Default, + GaugeVec, + CounterVec, + IntGaugeVec, + IntCounterVec, + IntCounter, +} + +#[derive(Debug, Default, Clone)] +pub struct MetricConfig<'a> { + pub kind: Kind, + pub name: &'a str, + pub help: &'a str, + pub label_names: &'a [&'a str], +} + +pub type SharedRegistrar = Arc; + +/// An abstracted metrics registrar for Prometheus. +#[derive(Clone)] +pub struct Registrar { + prometheus: Arc, + int_counters_vecs: Arc>>, + int_counters: Arc>>, + int_gauges_vecs: Arc>>, + counters_vecs: Arc>>, + gauges_vecs: Arc>>, +} + +impl std::fmt::Debug for Registrar { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Registrar") + .field("prometheus", &format!("{:?}", self.prometheus.registry)) + .field( + "int_counters_vecs", + &format!("{:?}", self.int_counters_vecs), + ) + .field("int_gauges_vecs", &format!("{:?}", self.int_gauges_vecs)) + .field("counters_vecs", &format!("{:?}", self.counters_vecs)) + .field("gauges_vecs", &format!("{:?}", self.gauges_vecs)) + .finish() + } +} + +impl Default for Registrar { + fn default() -> Self { + Self { + prometheus: Arc::new(PrometheusMetricsBuilder::new("default").build().unwrap()), + int_counters_vecs: Default::default(), + int_counters: Default::default(), + int_gauges_vecs: Default::default(), + counters_vecs: Default::default(), + gauges_vecs: Default::default(), + } + } +} + +pub trait Registry { + fn with_metric_configs<'a>(&self, metrics: &'a [MetricConfig<'a>]) -> Result<(), Error>; + fn with_metric_config<'a>(&self, metric: &'a MetricConfig<'a>) -> Result<(), Error>; +} + +impl Registry for Registrar { + fn with_metric_config<'a>(&self, metric: &'a MetricConfig<'a>) -> Result<(), Error> { + log::info!( + "Attempting to register metric with name {:?} and labels {:?}", + metric.name, + metric.label_names + ); + match &metric.kind { + Kind::Default => panic!("Registrar metric kind is `default`, so panicking."), + Kind::GaugeVec => { + let gauge = + GaugeVec::new(Opts::new(metric.name, metric.help), metric.label_names).unwrap(); + match self.prometheus.registry.register(Box::new(gauge.clone())) { + Ok(()) => { + self.gauges_vecs + .write() + .unwrap() + .insert(metric.name.to_string(), gauge); + Ok(()) + } + Err(e) => { + if let prometheus::Error::AlreadyReg = e { + log::info!("Metric {:?} is already registered.", metric.name); + Ok(()) + } else { + log::error!("Failed to register metric {:?}. {:?}", metric.name, e); + Err(Error::Prometheus(e)) + } + } + } + } + Kind::CounterVec => { + let counter = + CounterVec::new(Opts::new(metric.name, metric.help), metric.label_names) + .unwrap(); + match self.prometheus.registry.register(Box::new(counter.clone())) { + Ok(()) => { + self.counters_vecs + .write() + .unwrap() + .insert(metric.name.to_string(), counter); + Ok(()) + } + Err(e) => { + if let prometheus::Error::AlreadyReg = e { + log::info!("Metric {:?} is already registered.", metric.name); + Ok(()) + } else { + log::error!("Failed to register metric {:?}. {:?}", metric.name, e); + Err(Error::Prometheus(e)) + } + } + } + } + Kind::IntGaugeVec => { + let gauge = + IntGaugeVec::new(Opts::new(metric.name, metric.help), metric.label_names) + .unwrap(); + match self.prometheus.registry.register(Box::new(gauge.clone())) { + Ok(()) => { + self.int_gauges_vecs + .write() + .unwrap() + .insert(metric.name.to_string(), gauge); + Ok(()) + } + Err(e) => { + if let prometheus::Error::AlreadyReg = e { + log::info!("Metric {:?} is already registered.", metric.name); + Ok(()) + } else { + log::error!("Failed to register metric {:?}. {:?}", metric.name, e); + Err(Error::Prometheus(e)) + } + } + } + } + Kind::IntCounterVec => { + let counter = + IntCounterVec::new(Opts::new(metric.name, metric.help), metric.label_names) + .unwrap(); + match self.prometheus.registry.register(Box::new(counter.clone())) { + Ok(()) => { + self.int_counters_vecs + .write() + .unwrap() + .insert(metric.name.to_string(), counter); + Ok(()) + } + Err(e) => { + if let prometheus::Error::AlreadyReg = e { + log::info!("Metric {:?} is already registered.", metric.name); + Ok(()) + } else { + log::error!("Failed to register metric {:?}. {:?}", metric.name, e); + Err(Error::Prometheus(e)) + } + } + } + } + Kind::IntCounter => { + let counter = IntCounter::new(metric.name, metric.help).unwrap(); + match self.prometheus.registry.register(Box::new(counter.clone())) { + Ok(()) => { + self.int_counters + .write() + .unwrap() + .insert(metric.name.to_string(), counter); + Ok(()) + } + Err(e) => { + if let prometheus::Error::AlreadyReg = e { + log::info!("Metric {:?} is already registered.", metric.name); + Ok(()) + } else { + log::error!("Failed to register metric {:?}. {:?}", metric.name, e); + Err(Error::Prometheus(e)) + } + } + } + } + } + } + fn with_metric_configs<'a>(&self, metrics: &'a [MetricConfig<'a>]) -> Result<(), Error> { + for metric in metrics { + match self.with_metric_config(metric) { + Ok(()) => (), + Err(e) => return Err(e), + } + } + Ok(()) + } +} + +impl Registrar { + pub fn new(prometheus: Arc) -> Self { + Self { + prometheus, + ..Default::default() + } + } + // Int + pub fn inc_int_counter(&self, key: &str) { + let mut counters = self.int_counters.write().unwrap(); + let counter = match counters.get_mut(key) { + Some(r) => r, + None => return, + }; + + counter.inc() + } + pub fn inc_int_counter_vec_mut(&self, key: &str, labels: &[&str]) { + let mut counters = self.int_counters_vecs.write().unwrap(); + let counter = match counters.get_mut(key) { + Some(r) => r, + None => return, + }; + + counter.with_label_values(labels).inc() + } + pub fn inc_by_int_counter_vec_mut(&self, key: &str, labels: &[&str], value: u64) { + let mut counters = self.int_counters_vecs.write().unwrap(); + let counter = match counters.get_mut(key) { + Some(r) => r, + None => return, + }; + + counter.with_label_values(labels).inc_by(value) + } + + pub fn set_int_gauge_vec_mut(&self, key: &str, labels: &[&str], value: i64) { + let mut gauges = self.int_gauges_vecs.write().unwrap(); + let gauge = match gauges.get_mut(key) { + Some(r) => r, + None => return, + }; + + gauge.with_label_values(labels).set(value) + } + // Floating + pub fn inc_counter_vec_mut(&self, key: &str, labels: &[&str]) { + let mut counters = self.counters_vecs.write().unwrap(); + let counter = match counters.get_mut(key) { + Some(r) => r, + None => return, + }; + + counter.with_label_values(labels).inc() + } + pub fn set_gauge_vec_mut(&self, key: &str, labels: &[&str], value: f64) { + let mut gauges = self.gauges_vecs.write().unwrap(); + let gauge = match gauges.get_mut(key) { + Some(r) => r, + None => return, + }; + + gauge.with_label_values(labels).set(value) + } +} diff --git a/libs/clickhouse_pool/src/pool.rs b/libs/clickhouse_pool/src/pool.rs new file mode 100644 index 0000000..044ff4a --- /dev/null +++ b/libs/clickhouse_pool/src/pool.rs @@ -0,0 +1,654 @@ +use std::fmt; +use std::ops::{Deref, DerefMut}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use clickhouse::Client; +use deadpool::managed::{Manager, Metrics, Object, PoolError, RecycleError}; +use thiserror::Error; +use tokio::task; +use tokio::time::timeout; + +use crate::config::{ClickhouseConfig, DatalakeConfig}; +use crate::metrics::{Kind, MetricConfig, Registry, SharedRegistrar}; + +#[derive(Debug, Error)] +pub enum ClickhouseError { + #[error("Clickhouse client error: {0}")] + Client(#[from] clickhouse::error::Error), + + #[error("Connection validation failed: {0}")] + Validation(String), + + #[error("Connection timed out")] + Timeout, + + #[error("Pool error: {0}")] + Pool(String), + + #[error("Shutdown in progress")] + ShuttingDown, + + #[error("Batch insertion error: {0}")] + BatchInsertionError(String), +} + +impl From for ClickhouseError { + fn from(_: tokio::time::error::Elapsed) -> Self { + Self::Timeout + } +} + +impl From> for ClickhouseError { + fn from(value: PoolError) -> Self { + Self::Pool(value.to_string()) + } +} + +#[derive(Debug, Clone)] +pub struct PoolMetrics { + pub size: usize, + pub available: usize, + pub in_use: usize, + pub max_size: usize, + pub min_size: usize, + pub waiters: usize, +} + +pub struct ClickhouseConnection { + client: Client, + last_used: Instant, + id: u64, + query_count: AtomicU64, + created_at: Instant, +} + +impl ClickhouseConnection { + pub fn new(client: Client, id: u64) -> Self { + Self { + client, + last_used: Instant::now(), + id, + query_count: AtomicU64::new(0), + created_at: Instant::now(), + } + } + + pub fn id(&self) -> u64 { + self.id + } + + pub fn age(&self) -> Duration { + self.created_at.elapsed() + } + + pub fn idle_time(&self) -> Duration { + self.last_used.elapsed() + } + + pub fn query_count(&self) -> u64 { + self.query_count.load(Ordering::Relaxed) + } + + pub async fn health_check(&self) -> Result<(), ClickhouseError> { + match self.client.query("SELECT 1").execute().await { + Ok(_) => Ok(()), + Err(e) => Err(ClickhouseError::Client(e)), + } + } +} + +impl Deref for ClickhouseConnection { + type Target = Client; + + fn deref(&self) -> &Self::Target { + &self.client + } +} + +impl DerefMut for ClickhouseConnection { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.client + } +} + +impl fmt::Debug for ClickhouseConnection { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ClickhouseConnection") + .field("id", &self.id) + .field("created_at", &self.created_at) + .field("query_count", &self.query_count) + .field("last_used", &self.last_used) + .finish() + } +} + +pub fn get_query_type(query: &str) -> &'static str { + let query = query.trim_start().to_uppercase(); + + if query.starts_with("SELECT") { + "select" + } else if query.starts_with("INSERT") { + "insert" + } else if query.starts_with("CREATE") { + "create" + } else if query.starts_with("ALTER") { + "alter" + } else if query.starts_with("DROP") { + "drop" + } else { + "other" + } +} + +#[derive(Debug)] +pub struct ClickhouseConnectionManager { + config: Arc, + next_connection_id: AtomicU64, + is_shutting_down: Arc, + metrics: Option, +} + +impl ClickhouseConnectionManager { + pub fn new(config: Arc, metrics: Option) -> Self { + Self { + config, + next_connection_id: AtomicU64::new(1), + is_shutting_down: Arc::new(AtomicBool::new(false)), + metrics, + } + } + + pub fn initiate_shutdown(&self) { + self.is_shutting_down.store(true, Ordering::SeqCst); + log::info!("Clickhouse connection manager shutdown in progress"); + } + + pub fn create_client(&self) -> Result { + let url = self.config.authenticated_connection_url(); + + let client = Client::default() + .with_url(url) + .with_user(&self.config.username) + .with_password(&self.config.password) + .with_option("async_insert", "1") + .with_option("wait_for_async_insert", "1"); + + Ok(client) + } +} + +impl Manager for ClickhouseConnectionManager { + type Type = ClickhouseConnection; + type Error = ClickhouseError; + + async fn create(&self) -> Result { + if self.is_shutting_down.load(Ordering::SeqCst) { + return Err(ClickhouseError::ShuttingDown); + } + + let connection_id = self.next_connection_id.fetch_add(1, Ordering::SeqCst); + + let start = Instant::now(); + + let config = &self.config.clone(); + log::debug!( + "Creating new Clickhouse connection [id: {}] to: {}:{}", + connection_id, + config.host, + config.port + ); + + let client = self.create_client()?; + + let validation_timeout = Duration::from_secs(config.connect_timeout_seconds); + + let validation = match timeout(validation_timeout, client.query("SELECT 1").execute()).await + { + Ok(Ok(_)) => Ok(()), + Ok(Err(e)) => Err(ClickhouseError::Client(e)), + Err(_) => Err(ClickhouseError::Timeout), + }; + + let duration = start.elapsed(); + if let Some(metrics) = &self.metrics { + metrics.set_gauge_vec_mut( + "clickhouse_connection_creation_second", + &["create"], + duration.as_secs_f64(), + ); + } + + match validation { + Ok(()) => { + log::debug!( + "Connection established: [id: {}] in {:?}", + connection_id, + duration + ); + + if let Some(metrics) = &self.metrics { + metrics.inc_int_counter_vec_mut( + "clickhouse_connections_created_total", + &["success"], + ); + } + + Ok(ClickhouseConnection::new(client, connection_id)) + } + Err(e) => { + log::error!( + "Failed to validate ClickHouse connection (id: {}): {}", + connection_id, + e + ); + + if let Some(metrics) = &self.metrics { + metrics.inc_int_counter_vec_mut( + "clickhouse_connections_created_total", + &["failure"], + ); + } + + Err(e) + } + } + } + + async fn recycle( + &self, + conn: &mut Self::Type, + _: &Metrics, + ) -> Result<(), RecycleError> { + if self.is_shutting_down.load(Ordering::SeqCst) { + return Err(RecycleError::Message("Shutting down".into())); + } + + log::debug!("Testing health of connection: [id: {}]", conn.id()); + + let validation_timeout = Duration::from_secs(self.config.connect_timeout_seconds); + + match timeout(validation_timeout, conn.query("SELECT 1").execute()).await { + Ok(Ok(_)) => { + log::debug!("Connection [id: {}] health check passed", conn.id()); + + if let Some(metrics) = &self.metrics { + metrics.inc_int_counter_vec_mut( + "clickhouse_connections_health_checks_total", + &["success"], + ); + } + + Ok(()) + } + Ok(Err(e)) => { + log::warn!("Connection [id: {}] health check failed: {}", conn.id(), e); + + if let Some(metrics) = &self.metrics { + metrics.inc_int_counter_vec_mut( + "clickhouse_connections_health_checks_total", + &["failure"], + ); + } + + Err(RecycleError::Message( + format!("Health check failed: {}", e).into(), + )) + } + Err(_) => { + log::warn!( + "Connection [id: {}] health check timed out after: {:?}", + conn.id(), + validation_timeout + ); + + if let Some(metrics) = &self.metrics { + metrics.inc_int_counter_vec_mut( + "clickhouse_connections_health_checks_total", + &["timeout"], + ); + } + + Err(RecycleError::Message("Health check timed out".into())) + } + } + } +} + +pub type Pool = deadpool::managed::Pool; +pub type PooledConnection = Object; + +pub struct ClickhouseConnectionPool { + pool: Pool, + config: Arc, + metrics: Option, + is_initialized: AtomicBool, +} + +impl ClickhouseConnectionPool { + pub fn new(config: Arc, metrics: Option) -> Self { + if let Some(metrics_ref) = &metrics { + Self::register_metrics(metrics_ref); + } + + let initial_size = config.clickhouse.max_connections as usize; + + let manager = ClickhouseConnectionManager::new(config.clickhouse.clone(), metrics.clone()); + + let pool = deadpool::managed::Pool::::builder(manager) + .max_size(initial_size) + .build() + .expect("Failed to build connection pool"); + + Self { + pool, + config, + metrics, + is_initialized: AtomicBool::new(false), + } + } + + pub async fn initialize(&self) -> Result<(), ClickhouseError> { + if self.is_initialized.load(Ordering::SeqCst) { + return Ok(()); + } + + log::info!("Initializing Clickhouse connection pool"); + + let warmup_count = self.config.clickhouse.max_connections as usize; + + let mut warmup_handles = Vec::with_capacity(warmup_count); + + for i in 0..warmup_count { + let pool = self.pool.clone(); + + let handle = task::spawn(async move { + match pool.get().await { + Ok(conn) => match conn.health_check().await { + Ok(_) => { + log::debug!("Warm-up connection {} initialized successfully", i); + Ok(()) + } + Err(e) => { + log::error!("Warm-up connection {} health check failed: {}", i, e); + Err(e) + } + }, + Err(e) => { + log::error!("Failed to get warm-up connection {}: {}", i, e); + Err(ClickhouseError::Pool(e.to_string())) + } + } + }); + warmup_handles.push(handle); + } + + let mut warmup_success_count = 0; + for (i, handle) in warmup_handles.into_iter().enumerate() { + match handle.await { + Ok(Ok(_)) => { + warmup_success_count += 1; + } + Ok(Err(e)) => { + log::warn!("Warm-up connection {} failed: {}", i, e); + } + Err(e) => { + log::error!("Warm-up task {} panicked: {}", i, e); + } + } + } + + log::info!( + "Connection pool warm-up complete: {}/{} successful", + warmup_success_count, + warmup_count + ); + + self.is_initialized.store(true, Ordering::SeqCst); + + if let Some(metrics) = &self.metrics { + let status = self.pool.status(); + metrics.set_int_gauge_vec_mut( + "clickhouse_pool_connections", + &["available"], + status.available as i64, + ); + metrics.set_int_gauge_vec_mut( + "clickhouse_pool_connections", + &["size"], + status.size as i64, + ); + } + + Ok(()) + } + + pub async fn get_connection(&self) -> Result { + if !self.is_initialized.load(Ordering::SeqCst) { + log::warn!("Attempting to get connection from uninitialized pool"); + } + + let start = Instant::now(); + + let timeout_duration = Duration::from_secs(self.config.clickhouse.connect_timeout_seconds); + + for attempt in 0..3 { + match tokio::time::timeout(timeout_duration, self.pool.get()).await { + Ok(Ok(conn)) => { + let duration = start.elapsed(); + + if let Some(metrics) = &self.metrics { + metrics.set_gauge_vec_mut( + "clickhouse_connection_acquisition_seconds", + &["success"], + duration.as_secs_f64(), + ); + metrics.inc_int_counter_vec_mut( + "clickhouse_connection_acquisition_total", + &["success"], + ); + } + + log::debug!( + "Connection acquired in {:?} (attempt {})", + duration, + attempt + 1 + ); + return Ok(conn); + } + Ok(Err(e)) => { + if let Some(metrics) = &self.metrics { + metrics.inc_int_counter_vec_mut( + "clickhouse_connection_acquisition_total", + &["failure"], + ); + } + + log::warn!( + "Failed to get connection from pool (attempt {}): {}", + attempt + 1, + e + ); + + if attempt >= 2 { + return Err(ClickhouseError::Pool(e.to_string())); + } + } + Err(_) => { + if let Some(metrics) = &self.metrics { + metrics.inc_int_counter_vec_mut( + "clickhouse_connection_acquisition_total", + &["timeout"], + ); + } + + log::warn!( + "Timed out waiting for connection (attempt {}) after {:?}", + attempt + 1, + timeout_duration + ); + + if attempt >= 2 { + return Err(ClickhouseError::Timeout); + } + } + } + let backoff = Duration::from_millis(50 * 2u64.pow(attempt)); + tokio::time::sleep(backoff).await; + } + Err(ClickhouseError::Pool( + "Failed to get connection after retries".to_string(), + )) + } + + pub async fn shutdown(&self) -> Result<(), ClickhouseError> { + log::info!("Initiating graceful shutdown of ClickHouse connection pool"); + + let pool_manager = self.pool.manager(); + pool_manager.initiate_shutdown(); + + let status = self.pool.status(); + log::info!( + "Connection pool status before shutdown: size={}, available={}, in_use={}", + status.size, + status.available, + status.size - status.available + ); + + let drain_timeout = Duration::from_secs(30); + let drain_start = Instant::now(); + + loop { + let status = self.pool.status(); + let in_use = status.size - status.available; + + if in_use == 0 { + log::info!("All connections returned to pool, proceeding with shutdown"); + break; + } + + if drain_start.elapsed() > drain_timeout { + log::warn!( + "Shutdown drain timeout exceeded, {} connections still in use", + in_use + ); + + break; + } + + log::info!("Waiting for {} connections to be returned to pool", in_use); + tokio::time::sleep(Duration::from_secs(1)).await; + } + + // Close all connections + self.pool.close(); + log::info!("All connections closed"); + + log::info!("ClickHouse connection pool shutdown complete"); + + Ok(()) + } + + fn register_metrics(metrics: &SharedRegistrar) { + let metric_configs = [ + // Connection + MetricConfig { + kind: Kind::IntCounterVec, + name: "clickhouse_connections_created_total", + help: "Total no. of connections created", + label_names: &["status"], + }, + MetricConfig { + kind: Kind::IntGaugeVec, + name: "clickhouse_pool_connections", + help: "Current no. of connections in the pool", + label_names: &["state"], + }, + MetricConfig { + kind: Kind::IntCounterVec, + name: "clickhouse_connetion_health_checks_total", + help: "Total no. of connection health checks", + label_names: &["status"], + }, + MetricConfig { + kind: Kind::GaugeVec, + name: "clickhouse_connection_creation_seconds", + help: "Time taken to create connections", + label_names: &["operation"], + }, + // Queries + MetricConfig { + kind: Kind::IntCounterVec, + name: "clickhouse_queries_total", + help: "Total no. of queries executed", + label_names: &["type"], + }, + MetricConfig { + kind: Kind::IntCounterVec, + name: "clickhouse_query_errors_total", + help: "Total no. of query errors", + label_names: &["type"], + }, + MetricConfig { + kind: Kind::GaugeVec, + name: "clickhouse_query_duration_seconds", + help: "Query execution time in seconds", + label_names: &["type"], + }, + // Batch queries + MetricConfig { + kind: Kind::IntCounterVec, + name: "clickhouse_batch_query_errors_total", + help: "Total number of batch query errors", + label_names: &["type"], + }, + MetricConfig { + kind: Kind::GaugeVec, + name: "clickhouse_batch_query_duration_seconds", + help: "Batch query execution time in seconds", + label_names: &["type"], + }, + // Connection acquisition + MetricConfig { + kind: Kind::GaugeVec, + name: "clickhouse_connection_acquisition_seconds", + help: "Time taken to acquire a connection from the pool", + label_names: &["status"], + }, + MetricConfig { + kind: Kind::IntCounterVec, + name: "clickhouse_connection_acquisition_total", + help: "Total number of connection acquisition attempts", + label_names: &["status"], + }, + MetricConfig { + kind: Kind::IntCounterVec, + name: "clickhouse_connections_recycled_total", + help: "Total number of connections recycled", + label_names: &["reason"], + }, + MetricConfig { + kind: Kind::GaugeVec, + name: "clickhouse_connection_recycling_seconds", + help: "Time taken for connection recycling", + label_names: &["operation"], + }, + ]; + + metrics.with_metric_configs(&metric_configs).ok(); + } + + pub fn status(&self) -> PoolMetrics { + let status = self.pool.status(); + + PoolMetrics { + size: status.size, + available: status.available, + in_use: status.size - status.available, + max_size: status.max_size, + min_size: status.max_size, + waiters: status.waiting, + } + } +} diff --git a/libs/clickhouse_pool/src/pool_manager.rs b/libs/clickhouse_pool/src/pool_manager.rs new file mode 100644 index 0000000..77100d3 --- /dev/null +++ b/libs/clickhouse_pool/src/pool_manager.rs @@ -0,0 +1,415 @@ +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use crate::batch_processor::{BatchCommand, BatchSender}; +use crate::config::DatalakeConfig; +use crate::metrics::SharedRegistrar; +use crate::pool::{get_query_type, ClickhouseConnectionPool, ClickhouseError}; +use crate::traits::Model; +use anyhow::Result; +use serde::de::DeserializeOwned; +use tokio::sync::mpsc; +use tokio::time::interval; + +#[derive(Clone)] +pub struct PoolManager { + pool: Arc, + config: Arc, + metrics: Option, + last_recycle_time: u64, +} + +impl PoolManager { + pub async fn new(config: Arc, metrics: Option) -> Self { + let pool = Arc::new(ClickhouseConnectionPool::new( + config.clone(), + metrics.clone(), + )); + + let _ = match pool.initialize().await { + Ok(_) => { + log::debug!("Pool warmed up and initialized successfully"); + } + Err(e) => { + log::error!("Error warming up and initializing pool: {:?}", e); + } + }; + + Self { + pool, + config, + metrics, + last_recycle_time: 0, + } + } + + pub fn get_pool(&self) -> Arc { + self.pool.clone() + } + + pub fn seconds_since_last_recycle(&self) -> u64 { + let last = self.last_recycle_time; + + if last == 0 { + return u64::MAX; + } + + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + now.saturating_sub(last) + } + + pub async fn refill_connection_pool(&self) -> Result { + let pool = self.get_pool(); + let status = pool.status(); + + let current_total = status.size; + let target_total = self.config.clickhouse.max_connections as usize; + let deficit = target_total.saturating_sub(current_total); + + if deficit == 0 { + log::info!("Deficit = 0"); + return Ok(0); + } + + let to_add = deficit; + log::info!("Attempting to add {} new connections to pool", to_add); + + let mut added = 0; + + for i in 0..to_add { + match pool.get_connection().await { + Ok(conn) => match conn.health_check().await { + Ok(_) => { + added += 1; + } + Err(e) => { + log::warn!("New connection failed health check: {}", e); + } + }, + Err(e) => { + log::error!( + "Failed to create new connection {}/{}: {}", + i + 1, + to_add, + e + ); + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + } + + log::info!("Added {}/{} new connections to pool", added, to_add); + Ok(added) + } + + pub async fn recycle_idle_connections( + &mut self, + max_to_recycle: usize, + ) -> Result { + log::info!( + "Starting connection recycling - checking up to {} connections", + max_to_recycle + ); + let start = std::time::Instant::now(); + + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + self.last_recycle_time = now; + + let status = self.pool.status(); + log::info!("Clickhouse pool metrics: {:?}", status); + + let to_check = std::cmp::min(max_to_recycle, status.available); + + if to_check == 0 { + log::info!("No connections available for recycling"); + return Ok(0); + } + + log::info!( + "Checking {} connections out of {} available", + to_check, + status.available + ); + + let mut recycled = 0; + + for _ in 0..to_check { + match self.pool.get_connection().await { + Ok(conn) => match conn.health_check().await { + Ok(_) => { + log::debug!( + "Connection [id: {}] is healthy, returning to pool", + conn.id() + ); + } + Err(e) => { + log::warn!( + "Connection [id: {}] failed health check: {}, will be recycled", + conn.id(), + e + ); + recycled += 1; + + if let Some(metrics) = &self.metrics { + metrics.inc_int_counter_vec_mut( + "clickhouse_connections_recycled_total", + &["health_check_failed"], + ); + } + } + }, + Err(e) => { + log::error!("Failed to get connection for health check: {}", e); + } + } + } + + let duration = start.elapsed(); + log::info!( + "Connection recycling complete: recycled={} in {:?}", + recycled, + duration + ); + + if let Some(metrics) = &self.metrics { + metrics.set_gauge_vec_mut( + "clickhouse_connection_recycling_seconds", + &["total"], + duration.as_secs_f64(), + ); + } + + Ok(recycled) + } + + pub async fn execute_with_retry(&self, query: &str) -> Result<(), ClickhouseError> { + let mut attempt = 0; + let max_retries = self.config.retry.max_retries; + + loop { + attempt += 1; + + let conn = match self.pool.get_connection().await { + Ok(conn) => conn, + Err(e) => { + if attempt > max_retries { + log::error!("Failed to get connection after {} attempts: {}", attempt, e); + return Err(e); + } + + let backoff = self.config.retry.backoff_duration(attempt); + log::warn!( + "Failed to get connection (attempt {}/{}), retrying in {:?}: {}", + attempt, + max_retries, + backoff, + e + ); + + tokio::time::sleep(backoff).await; + continue; + } + }; + + match conn.query(query).execute().await { + Ok(response) => { + if let Some(metrics) = &self.metrics { + metrics.inc_int_counter_vec_mut( + "clickhouse_query_success_total", + &[get_query_type(query)], + ); + } + + return Ok(response); + } + Err(e) => { + if attempt > max_retries { + log::error!("Query failed after {} attempts: {}", attempt, e); + return Err(ClickhouseError::Client(e)); + } + + let backoff = self.config.retry.backoff_duration(attempt); + log::warn!( + "Query failed (attempt {}/{}), retrying in {:?}: {}\nQuery: {}", + attempt, + max_retries, + backoff, + e, + query + ); + + if let Some(metrics) = &self.metrics { + metrics.inc_int_counter_vec_mut( + "clickhouse_query_retries_total", + &[get_query_type(query)], + ); + } + + tokio::time::sleep(backoff).await; + } + } + } + } + + pub async fn execute_select_with_retry(&self, query: &str) -> Result, ClickhouseError> + where + T: clickhouse::Row + DeserializeOwned + Send + 'static, + { + let mut attempt = 0; + let max_retries = self.config.retry.max_retries; + + loop { + attempt += 1; + + let conn = match self.pool.get_connection().await { + Ok(conn) => conn, + Err(e) => { + if attempt > max_retries { + log::error!("Failed to get connection after {} attempts: {}", attempt, e); + return Err(e); + } + + let backoff = self.config.retry.backoff_duration(attempt); + log::warn!( + "Failed to get connection (attempt {}/{}), retrying in {:?}: {}", + attempt, + max_retries, + backoff, + e + ); + + tokio::time::sleep(backoff).await; + continue; + } + }; + + match conn.query(query).fetch_all::().await { + Ok(response) => { + if let Some(metrics) = &self.metrics { + metrics.inc_int_counter_vec_mut( + "clickhouse_query_success_total", + &[get_query_type(query)], + ); + } + + return Ok(response); + } + Err(e) => { + if attempt > max_retries { + log::error!("Query failed after {} attempts: {}", attempt, e); + return Err(ClickhouseError::Client(e)); + } + + let backoff = self.config.retry.backoff_duration(attempt); + log::warn!( + "Query failed (attempt {}/{}), retrying in {:?}: {}\nQuery: {}", + attempt, + max_retries, + backoff, + e, + query + ); + + if let Some(metrics) = &self.metrics { + metrics.inc_int_counter_vec_mut( + "clickhouse_query_retries_total", + &[get_query_type(query)], + ); + } + + tokio::time::sleep(backoff).await; + } + } + } + } + + pub fn create_batch_processor( + &self, + batch_size: usize, + max_wait_ms: u64, + ) -> BatchSender + where + M: Model + Send + Sync + 'static, + M::T: Clone + Send + 'static, + { + let (tx, mut rx) = mpsc::channel(1000); + + let pool_manager = self.clone(); + + tokio::spawn(async move { + let mut batch = Vec::with_capacity(batch_size); + let mut last_flush = Instant::now(); + let mut flush_interval = interval(Duration::from_millis(100)); + + loop { + tokio::select! { + cmd = rx.recv() => match cmd { + Some(BatchCommand::Add(item)) => { + batch.push(item); + + if batch.len() >= batch_size { + Self::process_batch::(&pool_manager, &mut batch).await; + last_flush = Instant::now(); + } + }, + Some(BatchCommand::Flush) => { + if !batch.is_empty() { + Self::process_batch::(&pool_manager, &mut batch).await; + last_flush = Instant::now(); + } + }, + None => break, + }, + + _ = flush_interval.tick() => { + if !batch.is_empty() && last_flush.elapsed() >= Duration::from_millis(max_wait_ms) { + Self::process_batch::(&pool_manager, &mut batch).await; + last_flush = Instant::now(); + } + } + } + } + + if !batch.is_empty() { + Self::process_batch::(&pool_manager, &mut batch).await; + } + }); + + BatchSender { tx } + } + + async fn process_batch(pool_manager: &PoolManager, batch: &mut Vec) + where + M: Model, + M::T: Clone, + { + if batch.is_empty() { + return; + } + + let items = std::mem::take(batch); + + let query = M::batch_insert_query(&items); + + match pool_manager.execute_with_retry(&query).await { + Ok(_) => { + log::info!( + "Successfully inserted {} items into {}", + items.len(), + M::table_name() + ); + } + Err(e) => { + log::error!("Error inserting batch into {}: {}", M::table_name(), e); + batch.extend(items); + } + } + } +} diff --git a/libs/clickhouse_pool/src/traits.rs b/libs/clickhouse_pool/src/traits.rs new file mode 100644 index 0000000..be23202 --- /dev/null +++ b/libs/clickhouse_pool/src/traits.rs @@ -0,0 +1,15 @@ +pub trait Model { + type T; + + fn create_table_sql() -> &'static str; + fn table_name() -> &'static str; + fn column_names() -> Vec<&'static str>; + fn to_row(&self) -> (Vec<&'static str>, Vec); + fn insert_query(&self) -> String; + fn batch_insert_query(items: &[Self::T]) -> String; + fn build_select_query( + where_clause: Option<&str>, + limit: Option, + offset: Option, + ) -> String; +} diff --git a/resources/config.toml b/resources/config.toml index 5d3cd3a..af02f88 100644 --- a/resources/config.toml +++ b/resources/config.toml @@ -5,10 +5,10 @@ token = "" prefix = "!" [persistence] -host = "localhost" -port = 9000 +host = "127.0.0.1" +port = 8123 user = "default" -password = "default" +password = "password" database = "default"