diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..662ec19 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,5 @@ +# we should try to use this rustflags -C target-cpu=native for microarchitecture optimizations, +# but this only works on the target machine, so we have it commented out at the moment. +# You can also set the target-cpu to do cross compilation if you know the exact micro architecture of the target machine. +[build] +#rustflags = ["-C", "target-cpu=native"] \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 39e4c77..f3fd4f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -75,6 +75,41 @@ dependencies = [ "memchr", ] +[[package]] +name = "aide" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6966317188cdfe54c58c0900a195d021294afb3ece9b7073d09e4018dbb1e3a2" +dependencies = [ + "aide-macros", + "axum", + "axum-extra 0.10.3", + "bytes", + "cfg-if", + "http 1.4.0", + "indexmap", + "schemars", + "serde", + "serde_json", + "serde_qs", + "thiserror 2.0.18", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "aide-macros" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f2a08f14808f3c46f3e3004b727bace64af44c3c5996d0480a14d3852b1b25a" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "android_system_properties" version = "0.1.5" @@ -431,6 +466,28 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-extra" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9963ff19f40c6102c76756ef0a46004c0d58957d87259fc9208ff8441c12ab96" +dependencies = [ + "axum", + "axum-core", + "bytes", + "futures-util", + "http 1.4.0", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "serde_core", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "axum-extra" 
version = "0.12.5" @@ -614,6 +671,15 @@ dependencies = [ "serde", ] +[[package]] +name = "bytesize" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bd91ee7b2422bcb158d90ef4d14f75ef67f340943fc4149891dcce8f8b972a3" +dependencies = [ + "serde_core", +] + [[package]] name = "cbor4ii" version = "0.3.3" @@ -962,6 +1028,41 @@ dependencies = [ "syn", ] +[[package]] +name = "darling" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" +dependencies = [ + "darling_core", + "quote", + "syn", +] + [[package]] name = "data-encoding" version = "2.10.0" @@ -1043,6 +1144,12 @@ dependencies = [ "syn", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "downcast-rs" version = "2.0.2" @@ -1145,15 +1252,6 @@ dependencies = [ "syn", ] -[[package]] -name = "env" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc95de49ad098572c02d3fbf368c9a020bfff5ae78483685b77f51d8a7e9486d" -dependencies = [ - "num_threads", -] - [[package]] name = "equivalent" version = "1.0.2" @@ -1836,6 +1934,12 @@ version = "2.3.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "1.1.0" @@ -2763,6 +2867,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "ntapi" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3b335231dfd352ffb0f8017f3b6027a4917f7df785ea2143d8af2adc66980ae" +dependencies = [ + "winapi", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -2807,12 +2920,22 @@ dependencies = [ ] [[package]] -name = "num_threads" -version = "0.1.7" +name = "objc2-core-foundation" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a180dd8642fa45cdb7dd721cd4c11b1cadd4929ce112ebd8b9f5803cc79d536" +dependencies = [ + "bitflags", +] + +[[package]] +name = "objc2-io-kit" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c7398b9c8b70908f6371f47ed36737907c87c52af34c268fed0bf0ceb92ead9" +checksum = "33fafba39597d6dc1fb709123dfa8289d39406734be322956a69f0931c73bb15" dependencies = [ "libc", + "objc2-core-foundation", ] [[package]] @@ -2828,10 +2951,12 @@ dependencies = [ name = "odorobo-agent" version = "0.1.0" dependencies = [ + "ahash", "async-trait", "axum", - "axum-extra", + "axum-extra 0.12.5", "axum_responses", + "bytesize", "cloud-hypervisor-client", "futures-util", "http-body-util", @@ -2839,51 +2964,67 @@ dependencies = [ "hyper-util", "hyperlocal", "if-addrs 0.13.4", + "kameo", "libc", + "libp2p", + "odorobo-shared", "random-port", "serde", "serde_json", "stable-eyre", + "sysinfo", "thiserror 2.0.18", "tokio", "tower-http", "tracing", "tracing-subscriber", + "ulid", "url", "zbus", "zbus_systemd", ] [[package]] -name 
= "odorobo-scheduler" +name = "odorobo-manager" version = "0.1.0" dependencies = [ "ahash", + "aide", "axum", - "env", + "bytesize", + "clap", + "dotenvy", "kameo", "libp2p", + "odorobo-agent", "odorobo-shared", "optional_struct", "reqwest", + "schemars", "serde", "serde_json", + "stable-eyre", "tokio", "tower-http", "tracing", "tracing-subscriber", - "utoipa", - "uuid", + "ulid", ] [[package]] name = "odorobo-shared" version = "0.1.0" dependencies = [ + "cloud-hypervisor-client", "kameo", "libp2p", "serde", + "stable-eyre", + "thiserror 2.0.18", "tokio", + "tracing", + "tracing-subscriber", + "ulid", "utoipa", ] @@ -3383,6 +3524,26 @@ dependencies = [ "bitflags", ] +[[package]] +name = "ref-cast" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f354300ae66f76f1c85c5f84693f0ce81d747e2c3f21a45fef496d89c960bf7d" +dependencies = [ + "ref-cast-impl", +] + +[[package]] +name = "ref-cast-impl" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "regex" version = "1.12.3" @@ -3675,6 +3836,32 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "schemars" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd191f9397d57d581cddd31014772520aa448f65ef991055d7f61582c65165f" +dependencies = [ + "dyn-clone", + "indexmap", + "ref-cast", + "schemars_derive", + "serde", + "serde_json", +] + +[[package]] +name = "schemars_derive" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5016d94c77c6d32f0b8e08b781f7dc8a90c2007d4e77472cc2807bc10a8438fe" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -3740,6 +3927,17 @@ dependencies = [ "syn", ] +[[package]] 
+name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "serde_html_form" version = "0.2.8" @@ -3777,6 +3975,19 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_qs" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b417bedc008acbdf6d6b4bc482d29859924114bbe2650b7921fb68a261d0aa6" +dependencies = [ + "axum", + "futures", + "percent-encoding", + "serde", + "thiserror 2.0.18", +] + [[package]] name = "serde_repr" version = "0.1.20" @@ -3981,6 +4192,20 @@ dependencies = [ "syn", ] +[[package]] +name = "sysinfo" +version = "0.38.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92ab6a2f8bfe508deb3c6406578252e491d299cbbf3bc0529ecc3313aee4a52f" +dependencies = [ + "libc", + "memchr", + "ntapi", + "objc2-core-foundation", + "objc2-io-kit", + "windows", +] + [[package]] name = "system-configuration" version = "0.7.0" @@ -4394,6 +4619,18 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "ulid" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "470dbf6591da1b39d43c14523b2b469c86879a53e8b758c8e090a470fe7b1fbe" +dependencies = [ + "rand 0.9.2", + "serde", + "uuid", + "web-time", +] + [[package]] name = "unicode-ident" version = "1.0.24" diff --git a/Cargo.toml b/Cargo.toml index 14e4921..d45be1f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,16 +1,50 @@ [workspace] resolver = "3" -members = ["odorobo-agent", "odoroboctl", "odorobo-scheduler", "odorobo-shared"] +members = ["odorobo-agent", "odoroboctl", "odorobo-manager", "odorobo-shared"] [workspace.dependencies] stable-eyre = "0.2.2" tracing = "0.1" -tracing-subscriber = {version = "0.3", features = ["env-filter", "fmt", "json"] } +tracing-subscriber = { 
version = "0.3", features = [ + "env-filter", + "fmt", + "json", +] } cloud-hypervisor-client = "0.3" serde_json = "1.0.149" serde = "1.0" axum = { version = "0.8.8", features = ["ws"] } -axum-responses = "0.5.5" +thiserror = "2.0.18" axum-extra = "0.12" tower-http = { version = "0.6", features = ["trace"] } -tokio = { version = "1.50.0" } +tokio = { version = "1" } +kameo = { version = "0.19.2", features = ["remote"] } +libp2p = { version = "0.56.0", features = ["yamux", "serde", "mdns"] } +ahash = { version = "0.8.12", features = ["serde"] } +ulid = { version = "1.2", features = ["serde", "uuid"] } +# aide needs an older version of schemars +schemars = "0.9" +bytesize = { version = "2.3.1", features = ["serde"] } +odorobo-shared = { path = "odorobo-shared" } +odorobo-agent = { path = "odorobo-agent" } +odoroboctl = { path = "odoroboctl" } +odorobo-manager = { path = "odorobo-manager" } +dotenvy = "0.15" +clap = { version = "4", features = ["derive", "env"] } +# these are for performance optimizations, based on https://nnethercote.github.io/perf-book/build-configuration.html#optimization-level +# these options can take a while to compile (up to 5-10 minutes), so its really only necessary for production binaries. +# also look in .cargo/config.toml for info on microarchitecture optimizations. +[profile.release-optimized] +inherits = "release" + +opt-level = 3 +lto = "fat" +codegen-units = 1 + +# we are specifically NOT using panic = "abort" because we need to use catch_unwind for kameo actors on_panic. +# for any crates using kameo actors on_panic, this would literally break the binary. +# For specific crates/binaries this might be something we do, but I (caleb) doubt it. + +strip = "symbols" # TODO: this we should talk about doing this, because it makes debugging harder, so if we have a panic in production, the backtrace will likely not be as useful. + +incremental = false # this is just to get reproducible builds. 
incremental compiles aren't consistent and can sometimes be broken when combined with optimizations diff --git a/HACKING.md b/HACKING.md index 36a86c9..b0e98bd 100644 --- a/HACKING.md +++ b/HACKING.md @@ -55,4 +55,22 @@ If your trait needs to do dynamic dispatch (e.g. for provisioning hooks), you mu ## Rust Hypervisor Firmware failing to boot newer images -out of scope for odorobo, tracking issue [here](https://github.com/cloud-hypervisor/rust-hypervisor-firmware/issues/412) \ No newline at end of file +out of scope for odorobo, tracking issue [here](https://github.com/cloud-hypervisor/rust-hypervisor-firmware/issues/412) + +## creating kameo handlers + +You need to impl a `Message` trait for the actor to be able to handle a message on an actor. The following is a template for this + +You also need to implement + +```rs +#[remote_message] +impl Message for Actor { + type Reply = ReplyType; + + async fn handle(&mut self, msg: RequestMessageType, _ctx: &mut Context) -> Self::Reply { + // this code will run whenever someone sends the actor this type of message. + ReplyType {} + } +} +``` \ No newline at end of file diff --git a/docs/kademlia_dht.md b/docs/kademlia_dht.md new file mode 100644 index 0000000..2d872c3 --- /dev/null +++ b/docs/kademlia_dht.md @@ -0,0 +1,23 @@ +# kademlia dht + +the cluster can use a dht for node lookups and for any global state we store. + +we should have someone (probably caleb) look at it for like an hour or two because if it works it could be possibly faster, definitely more reliable, and easier then a centralized solution like postgres. if they cant get it working in 2h, just stop working on it entirely and it becomes a post MVP thing. + +To do this, basically implement what is in the following example into the connect_to_swarm function: https://github.com/libp2p/rust-libp2p/blob/master/examples/ipfs-kad/src/main.rs. + +One problem with this is to bootstrap, we either also need mdns for the first nodes, or to hardcode a couple nodes. 
+
+### quorum/replication factor
+
+We will use majority quorum, but we have to be careful about if we lose enough nodes.
+
+basically if a server goes down and we only run dht actors/nodes on the compute nodes, we currently would lose 1/8 of the cluster.
+
+This means we need to make sure that our replication factor is high enough that the percent chance of any key in the dht losing majority quorum is acceptably low.
+
+this will be less of a problem as we have more servers that can run dht nodes. one thing that may help is running some dht nodes on the storage nodes.
+
+The spec recommends a replication factor of 20. https://github.com/libp2p/specs/blob/master/kad-dht/README.md#replication-parameter-k
+
+That is likely high enough that we might not even need to do the math.
diff --git a/docs/scheduler_routes.md b/docs/scheduler_routes.md
new file mode 100644
index 0000000..a1e62f9
--- /dev/null
+++ b/docs/scheduler_routes.md
@@ -0,0 +1,33 @@
+# scheduler routes
+
+this is a very rough definition of what routes we will have
+
+## vms
+
+we need CRUD. we keep track of whether it's active.
+
+MACs for VMs need to be stored somewhere. it can be either in the DHT or in the Dashboard. doesn't matter.
+
+## volumes
+
+we again need CRUD, and we keep track of the volumes cause we want to just check what is in the SAN
+
+## coloc
+
+we need CRUD for ip to mac address assignment. this will be used in the admin management portal by infra people (ex: kathrine) to assign an ip to coloc when they rack servers.
+
+the dashboard needs to keep track of both coloc and vps ip assignments because we don't want to have to get all the data from the router every time we do an ip assignment. we still need possibly two routes to get all mac/ip assignments from the router and a single assignment, to be able to verify the data is accurate in the dashboard db. there likely should be a job that runs every so often that verifies all this info and makes sure nothing is out of date.
+
+In addition, we would like the ability to get a list of all MACs on the router, and then filter them by the known MACs (VPSes, our servers, and known colocs), and then display only the unknown macs on the dashboard. this should make it very easy for infra people to rack a server, they see the MAC that we have never seen before and hopefully has a first seen time associated of right now, and then they just use that one when they fill out the "server has been racked" admin management form
+
+## other various things to do
+
+socket terminal stuff via websockets
+
+drain server so infra people can migrate all VMs off a server so we can take it down for maintenance
+
+they also need a get all servers so they can view the list of servers. this needs to be able to show data from the infra config file, and possibly will give any metrics we can reasonably get from sys_info. the metrics/extra info should possibly be a different route. possibly ask addison
+
+## removing openapi spec
+
+the openapi spec is a pain because of utoipa, so addison has told us that we don't need to give her a full spec file. If we just give her a file of rust structs, labeled similarly, she is fine with that.
\ No newline at end of file diff --git a/odorobo-agent/Cargo.toml b/odorobo-agent/Cargo.toml index 618e9d9..e2f8a29 100644 --- a/odorobo-agent/Cargo.toml +++ b/odorobo-agent/Cargo.toml @@ -11,14 +11,19 @@ tracing-subscriber = { workspace = true } cloud-hypervisor-client = { workspace = true } http-body-util = "0.1.3" hyper = { version = "1.8.1", features = ["full"] } -hyper-util = { version = "0.1.20", features = ["client", "client-legacy", "http1", "http2"] } +hyper-util = { version = "0.1.20", features = [ + "client", + "client-legacy", + "http1", + "http2", +] } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } hyperlocal = "0.9.1" tokio = { workspace = true, features = ["full"] } axum = { workspace = true, features = ["ws"] } -axum-extra = { workspace = true, features = ["query"] } axum_responses = "0.5.5" +axum-extra = { workspace = true, features = ["query"] } thiserror = "2.0.18" futures-util = "0.3.31" libc = "0.2" @@ -26,7 +31,17 @@ tower-http = { workspace = true, features = ["trace"] } random-port = "0.1.1" if-addrs = "0.13" zbus = "5.14.0" -zbus_systemd = { version = "0.26000.0", features = ["machine1", "systemd1", "zbus-async-tokio"] } +zbus_systemd = { version = "0.26000.0", features = [ + "machine1", + "systemd1", + "zbus-async-tokio", +] } url = { version = "2.5.8", features = ["serde"] } async-trait = "0.1.89" - +kameo = { workspace = true } +libp2p = { version = "0.56.0", features = ["yamux", "serde", "mdns"] } +ahash = { workspace = true } +sysinfo = "0.38.4" +bytesize = "2.3.1" +odorobo-shared = { workspace = true } +ulid = { workspace = true } \ No newline at end of file diff --git a/odorobo-agent/config.json b/odorobo-agent/config.json new file mode 100644 index 0000000..bc5ed7e --- /dev/null +++ b/odorobo-agent/config.json @@ -0,0 +1,4 @@ +{ + "datacenter": "Dev", + "region": "Dev" +} \ No newline at end of file diff --git a/odorobo-agent/src/actor/mod.rs b/odorobo-agent/src/actor/mod.rs new file mode 
100644 index 0000000..8cdb699 --- /dev/null +++ b/odorobo-agent/src/actor/mod.rs @@ -0,0 +1,134 @@ +use crate::state::provisioning::actor::VMActor; +use ahash::AHashMap; +use bytesize::ByteSize; +use kameo::error::ActorStopReason; +use kameo::prelude::*; +use odorobo_shared::messages::create_vm::*; +use odorobo_shared::messages::debug::PanicAgent; +use serde::{Deserialize, Serialize}; +use stable_eyre::{Report, Result}; +use tracing::error; +use std::ops::ControlFlow; +use std::{fs, path::PathBuf}; +use sysinfo::System; + +use kameo::error::PanicError; + +#[derive(RemoteActor)] +pub struct AgentActor { + pub vcpus: u32, + pub memory: ByteSize, + pub config: Config, +} + +/// Gets the system hostname +pub fn hostname() -> String { + System::host_name().unwrap_or("odorobo".into()) +} + +/// This was requested by katherine. Do not change without asking her. +pub fn default_reserved_vcpus() -> u32 { + 2 +} + +// The infra team wants a config file on the box where they can set info specific for the box its on. +// TODO: Double check with infra team (katherine) if they want any other config on the box. +#[derive(Serialize, Deserialize)] +pub struct Config { + /// The hostname of the agent. Defaults to the system hostname + /// if not specified in the config file. + #[serde(default = "hostname")] + pub hostname: String, + /// The datacenter the agent is running in. + pub datacenter: String, + /// The region the agent is running in. + pub region: String, + /// The number of VCPUs reserved for the agent. Defaults to 2. 
+ #[serde(default = "default_reserved_vcpus")] + pub reserved_vcpus: u32, + /// this is just arbitrary data that will be shown but does no config + /// Arbitrary labels that can be used + #[serde(default)] + pub labels: AHashMap, + /// Arbitrary annotations that can be used + #[serde(default)] + pub annotations: AHashMap, +} + +impl Actor for AgentActor { + type Args = (); + type Error = Report; + + async fn on_start(state: Self::Args, actor_ref: ActorRef) -> Result { + // TODO: ask infra team where they want this on the box + let file = fs::File::open("config.json").expect("file should open read only"); + let config: Config = serde_json::from_reader(file).expect("file should be proper JSON"); + + let sys = System::new_all(); + + Ok(AgentActor { + vcpus: sys.cpus().len() as u32, + memory: ByteSize::b(sys.total_memory()), + config, + }) + } + + // async fn on_panic(state: Self::Args, weak_actor_ref: WeakActorRef, _panic: &PanicError) { + // panic!("Agent panicked: {:?}", _panic); + // } + // + async fn on_panic( + &mut self, + actor_ref: WeakActorRef, + err: PanicError, + ) -> Result> { + error!("Agent panicked: {:?}", err); + + // todo: if we panic, we should completely regen the self struct from scratch. The assumption should be that memory corruption could have possibly happened becauew + + Ok(ControlFlow::Continue(())) + } +} + +#[remote_message] +impl Message for AgentActor { + type Reply = CreateVMReply; + + async fn handle( + &mut self, + msg: CreateVM, + _ctx: &mut Context, + ) -> Self::Reply { + // TODO: this is unfinished. we intend on using the state::provisioning::actor stuff for this I think. 
+ let vmid = ulid::Ulid::new(); + let actor_ref = VMActor::spawn(VMActor { + ch_socket_path: PathBuf::from(format!("/run/odorobo/vms/{}/ch.sock", vmid)), + vmid, + vm_config: Default::default(), + }); + + let actor_registration_result = actor_ref.register("vm").await; + + tracing::info!("someone asked us for available capacity"); + CreateVMReply { + config: Default::default(), + } + } +} + +#[remote_message] +impl Message for AgentActor { + type Reply = (); + + async fn handle( + &mut self, + msg: PanicAgent, + _ctx: &mut Context, + ) -> Self::Reply { + tracing::info!("panicking"); + panic!(); + } +} + +#[cfg(test)] +mod tests {} diff --git a/odorobo-agent/src/lib.rs b/odorobo-agent/src/lib.rs new file mode 100644 index 0000000..6f437fc --- /dev/null +++ b/odorobo-agent/src/lib.rs @@ -0,0 +1,4 @@ +pub mod actor; +mod api; +mod state; +mod util; \ No newline at end of file diff --git a/odorobo-agent/src/main.rs b/odorobo-agent/src/main.rs index 511b25b..1546da2 100644 --- a/odorobo-agent/src/main.rs +++ b/odorobo-agent/src/main.rs @@ -1,45 +1,44 @@ +pub mod actor; mod api; mod state; mod util; +use kameo::actor::Spawn; +use odorobo_shared::connect_to_swarm; use stable_eyre::Result; use tracing::level_filters::LevelFilter; use tracing_subscriber::EnvFilter; - -fn env_filter() -> EnvFilter { - let env = std::env::var("ODOROBO_LOG").unwrap_or_else(|_| "".into()); - - let base = EnvFilter::builder() - .with_default_directive(LevelFilter::INFO.into()) - .parse_lossy(&env); - - #[cfg(debug_assertions)] - let base = base.add_directive("odorobo_agent=trace".parse().unwrap()); - - base -} +// use odorobo_shared:: +use crate::actor::AgentActor; #[tokio::main] async fn main() -> Result<()> { - stable_eyre::install()?; - tracing_subscriber::fmt() - .with_env_filter(env_filter()) - .with_file(true) - .with_line_number(true) - .init(); + odorobo_shared::utils::init()?; tracing::info!("Starting odorobo-agent..."); - // minimal axum server - - let listener = 
tokio::net::TcpListener::bind("0.0.0.0:8890").await?; - let port = listener.local_addr()?.port(); - let addrs: Vec = if_addrs::get_if_addrs()? - .into_iter() - .filter(|i| !i.is_loopback()) - .map(|i| format!("http://{}:{}", i.ip(), port)) - .collect(); - tracing::info!(port, ?addrs, "Listening"); - axum::serve(listener, api::router(port)).await?; + // minimal axum server, debug socket + // + // todo: remove this, here to stub out dead code + tokio::task::spawn(async { + let listener = tokio::net::TcpListener::bind("0.0.0.0:8890").await?; + let port = listener.local_addr()?.port(); + let addrs: Vec = if_addrs::get_if_addrs()? + .into_iter() + .filter(|i| !i.is_loopback()) + .map(|i| format!("http://{}:{}", i.ip(), port)) + .collect(); + tracing::info!(port, ?addrs, "Listening"); + axum::serve(listener, api::router(port)).await?; + Ok::<(), stable_eyre::Report>(()) + }); + + let local_peer_id = connect_to_swarm().unwrap(); + tracing::info!(?local_peer_id, "Peer ID"); + + let actor_ref = AgentActor::spawn(()); + actor_ref.register("agent").await?; + + actor_ref.wait_for_shutdown().await; Ok(()) } diff --git a/odorobo-agent/src/state/mod.rs b/odorobo-agent/src/state/mod.rs index 7f56ead..8bea936 100644 --- a/odorobo-agent/src/state/mod.rs +++ b/odorobo-agent/src/state/mod.rs @@ -3,11 +3,11 @@ //! Runtime state (in /run) is not persisted across reboots, so we use it for //! ephemeral VM state like running instances. Persistent state goes in the database. 
-mod api; -mod instance; -mod devices; -mod provisioning; -mod transform; +pub mod api; +pub mod instance; +pub mod devices; +pub mod provisioning; +pub mod transform; pub use api::{call, call_request}; pub use instance::{CONFIG_FILE_NAME, ChApiError, ConsoleStream, VMInstance, VMS_DIR_NAME}; diff --git a/odorobo-agent/src/state/provisioning/actor/mod.rs b/odorobo-agent/src/state/provisioning/actor/mod.rs new file mode 100644 index 0000000..5a14b46 --- /dev/null +++ b/odorobo-agent/src/state/provisioning/actor/mod.rs @@ -0,0 +1,82 @@ +use super::VMProvisionerBackend; +use crate::state::VMInstance; +use cloud_hypervisor_client::models::VmConfig; +use kameo::prelude::*; +use stable_eyre::Report; +use stable_eyre::Result; +use std::path::PathBuf; +use crate::state::provisioning::default_provisioner; +/* +use std::process::Command; + +let output = Command::new("echo") +.arg("Hello world") +.output() +.expect("Failed to execute command"); + */ + +#[derive(RemoteActor)] +pub struct VMActor { + pub vmid: ulid::Ulid, + /// Pre-transform config, transformed config goes into the CH instance itself + pub vm_config: VmConfig, + /// path to the Cloud Hypervisor socket, in /run/odorobo/vms//ch.sock + pub ch_socket_path: PathBuf, + // handle to the Cloud Hypervisor process + // process_handle: tokio::process::Child, +} + +impl Actor for VMActor { + // tuple of VM ID and config + type Args = (ulid::Ulid, VmConfig); + type Error = Report; + + #[tracing::instrument(skip_all)] + async fn on_start((vmid, vm_config): Self::Args, actor_ref: ActorRef) -> Result { + let ch_sock_path = VMInstance::runtime_dir_for(&vmid.to_string()).join("ch.sock"); + + tracing::warn!("no-op"); + // spawn CH instance + // this probably is not an ideal way to do this, but we want a minimal thing + // so let's spawn CH as a child + // + // ...or we go back to that systemd way + // let ch_process = tokio::process::Command::new("cloud-hypervisor") + // .arg("--api-socket") + // .arg(&ch_sock_path); + + // use 
the provisioner to spawn the VM instance + // consider spawning transient services instead for easier code deployment + default_provisioner().start_instance(&vmid.to_string(), &vm_config).await?; + + Ok(Self { + vmid, + vm_config, + ch_socket_path: ch_sock_path, + // process_handle: ch_process.spawn()?, + }) + } +} + +// allow conversion from VMActor to VMInstance to call API +impl From for VMInstance { + fn from(actor: VMActor) -> Self { + Self { + id: actor.vmid.into(), + ch_socket_path: actor.ch_socket_path, + } + } +} + +// /// Provisioner backend for VM instances using an actor-based model +// pub struct ActorProvisioner; + +// impl VMProvisionerBackend for ActorProvisioner { +// async fn start_instance(&self, vmid: &str) -> Result { +// todo!() +// } + +// async fn stop_instance(&self, vmid: &str) -> Result<()> { +// todo!() +// } +// } diff --git a/odorobo-agent/src/state/provisioning/mod.rs b/odorobo-agent/src/state/provisioning/mod.rs index 0448921..552421d 100644 --- a/odorobo-agent/src/state/provisioning/mod.rs +++ b/odorobo-agent/src/state/provisioning/mod.rs @@ -4,6 +4,7 @@ //! 
Cloud Hypervisor process for a given instance mod hooks; mod systemd; +pub mod actor; use cloud_hypervisor_client::models::VmConfig; use stable_eyre::{Result, eyre::Context}; use tracing::info; diff --git a/odorobo-manager/Cargo.toml b/odorobo-manager/Cargo.toml new file mode 100644 index 0000000..9bcaad8 --- /dev/null +++ b/odorobo-manager/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "odorobo-manager" +version = "0.1.0" +edition = "2024" + +[dependencies] +kameo = { version = "0.19.2", features = ["remote"] } +axum = { version = "0.8.8", features = ["tracing", "macros"] } +aide = { version = "0.15.1", features = [ + "axum", + "axum-json", + "axum-extra", + "axum-tokio", + "axum-ws", + "swagger", + "macros", +] } +schemars.workspace = true +clap.workspace = true +bytesize.workspace = true +serde = { version = "1.0.228", features = ["derive"] } +serde_json = "1.0.149" +tokio = { workspace = true, features = [ + "full", +] } # TODO (june): tracing is only in tokio unstable? NOTE (caleb): I think that just refers to tokio logging tracing events, we can still make our own and look at other crate's events if necessary. 
+tower-http = { version = "0.6.8", features = ["trace"] } +dotenvy = { workspace = true } +tracing = { workspace = true } +optional_struct = "0.5.2" +tracing-subscriber = { workspace = true } +ulid = { workspace = true } +libp2p = { version = "0.56.0", features = ["yamux", "serde", "mdns"] } +reqwest = { version = "0.13", features = ["json"] } +ahash = { version = "0.8.12", features = ["serde"] } +# utoipa = { version = "5.4.0", features = ["uuid"] } +odorobo-shared = { workspace = true } +stable-eyre = { workspace = true } +odorobo-agent = { workspace = true } diff --git a/odorobo-manager/src/actors/http_actor.rs b/odorobo-manager/src/actors/http_actor.rs new file mode 100644 index 0000000..ae8be79 --- /dev/null +++ b/odorobo-manager/src/actors/http_actor.rs @@ -0,0 +1,30 @@ +use kameo::prelude::*; +use tracing::{error, info, warn}; +//use odorobo_shared::odorobo::server_actor::ServerActor; +use odorobo_agent::actor::AgentActor; +use odorobo_shared::messages::create_vm::*; +use odorobo_shared::messages::debug::PanicAgent; +use stable_eyre::{Report, Result}; +const EXTERNAL_HTTP_ADDRESS: &str = "0.0.0.0:3000"; +const EXTERNAL_HTTP_URL: &str = "http://localhost:3000"; // TODO: mak +/// HTTP REST API service +#[derive(RemoteActor)] +pub struct HTTPActor; + +impl Actor for HTTPActor { + type Args = (); + type Error = Report; + + async fn on_start(_state: Self::Args, _actor_ref: ActorRef) -> Result { + // run the HTTP API + tokio::spawn(async move { + tracing::info!("Starting HTTP server on {EXTERNAL_HTTP_URL}"); + let listener = tokio::net::TcpListener::bind(EXTERNAL_HTTP_ADDRESS) + .await + .unwrap(); + axum::serve(listener, crate::api::build()).await.unwrap(); + }); + + Ok(Self) + } +} diff --git a/odorobo-manager/src/actors/ip_management_actor.rs b/odorobo-manager/src/actors/ip_management_actor.rs new file mode 100644 index 0000000..5c590dc --- /dev/null +++ b/odorobo-manager/src/actors/ip_management_actor.rs @@ -0,0 +1,32 @@ +use kameo::prelude::*; +use 
stable_eyre::{Report, Result}; +use tracing::{error, info, warn}; + +// idk if we ever agreed upon an OUI for fyra, but im reserving `FYR` for this +// -cappy +pub const FYRA_OUI: [u8; 3] = [0x46, 0x59, 0x52]; + +/// Calculates a MAC address for a given IP address using the FYRA OUI prefix. +/// +/// Takes the last 3 bytes of the IP address and combines them with the FYRA OUI prefix. +pub fn calculate_mac_address(ip: [u8; 4]) -> [u8; 6] { + let mut mac = [0u8; 6]; + mac[0..2].copy_from_slice(&FYRA_OUI); + mac[2..].copy_from_slice(&ip[2..]); + mac +} + +/// HTTP REST API service +#[derive(RemoteActor)] +pub struct IPManagementActor; + +impl Actor for IPManagementActor { + type Args = (); + type Error = Report; + + async fn on_start(_state: Self::Args, _actor_ref: ActorRef) -> Result { + // if we need to like prep the router stuff + + Ok(Self) + } +} diff --git a/odorobo-manager/src/actors/mod.rs b/odorobo-manager/src/actors/mod.rs new file mode 100644 index 0000000..1d943e9 --- /dev/null +++ b/odorobo-manager/src/actors/mod.rs @@ -0,0 +1,4 @@ +mod http_actor; +mod scheduler_actor; +mod serial_terminal_websocket_actor; +mod ip_management_actor; \ No newline at end of file diff --git a/odorobo-manager/src/actors/scheduler_actor.rs b/odorobo-manager/src/actors/scheduler_actor.rs new file mode 100644 index 0000000..a0ef53a --- /dev/null +++ b/odorobo-manager/src/actors/scheduler_actor.rs @@ -0,0 +1,62 @@ +use kameo::prelude::*; +use tower_http::classify::SharedClassifier; +use tracing::{error, info, warn}; +//use odorobo_shared::odorobo::server_actor::ServerActor; +use odorobo_agent::actor::AgentActor; +use odorobo_shared::messages::create_vm::*; +use odorobo_shared::messages::debug::PanicAgent; +use stable_eyre::{Report, Result}; + +#[derive(RemoteActor)] +pub struct SchedulerActor; + +impl Actor for SchedulerActor { + type Args = (); + type Error = Report; + + async fn on_start(state: Self::Args, actor_ref: ActorRef) -> Result { + let peer_id = 
actor_ref.id().peer_id().unwrap().clone(); + + info!("Actor started! Scheduler peer id: {peer_id}"); + + let mut agent_actor: Option> = None; + + loop { + let agent_actor_option = RemoteActorRef::::lookup("agent").await?; + + let Some(agent_actor_in_loop) = agent_actor_option else { + continue; + }; + + agent_actor = Some(agent_actor_in_loop); + break; + } + + let agent_actor = agent_actor.unwrap(); + + let agent_actor_peer_id = agent_actor.id().peer_id().unwrap().clone(); + + info!("Agent actor peer id: {agent_actor_peer_id}"); + + + + // let reply = agent_actor + // .ask(&CreateVM { + // vm_id: Default::default(), + // config: Default::default(), + // }) + // .await?; + + // info!(?reply, "Created VM Reply"); + + // tokio::time::sleep(std::time::Duration::from_secs(10)).await; + + // warn!("Panicking Agent"); + + // agent_actor.tell(&PanicAgent).send()?; + + // error!("Agent has been panicked."); + + Ok(Self) + } +} diff --git a/odorobo-manager/src/actors/serial_terminal_websocket_actor.rs b/odorobo-manager/src/actors/serial_terminal_websocket_actor.rs new file mode 100644 index 0000000..d373b5c --- /dev/null +++ b/odorobo-manager/src/actors/serial_terminal_websocket_actor.rs @@ -0,0 +1,17 @@ +use kameo::prelude::*; +use tracing::{error, info, warn}; +use stable_eyre::{Report, Result}; + + +/// HTTP REST API service +#[derive(RemoteActor)] +pub struct SerialTerminalWebsocketActor; + +impl Actor for SerialTerminalWebsocketActor { + type Args = (); + type Error = Report; + + async fn on_start(_state: Self::Args, _actor_ref: ActorRef) -> Result { + todo!() + } +} diff --git a/odorobo-manager/src/actors/storage_actor.rs b/odorobo-manager/src/actors/storage_actor.rs new file mode 100644 index 0000000..d373b5c --- /dev/null +++ b/odorobo-manager/src/actors/storage_actor.rs @@ -0,0 +1,17 @@ +use kameo::prelude::*; +use tracing::{error, info, warn}; +use stable_eyre::{Report, Result}; + + +/// HTTP REST API service +#[derive(RemoteActor)] +pub struct 
SerialTerminalWebsocketActor; + +impl Actor for SerialTerminalWebsocketActor { + type Args = (); + type Error = Report; + + async fn on_start(_state: Self::Args, _actor_ref: ActorRef) -> Result { + todo!() + } +} diff --git a/odorobo-manager/src/api/mod.rs b/odorobo-manager/src/api/mod.rs new file mode 100644 index 0000000..0206e44 --- /dev/null +++ b/odorobo-manager/src/api/mod.rs @@ -0,0 +1,57 @@ +pub mod nodes; +pub mod types; +pub mod vms; +pub mod volumes; +use aide::{ + axum::{ApiRouter, IntoApiResponse, routing::get}, + openapi::{Info, OpenApi}, + swagger::Swagger, +}; +use axum::{Extension, Json, Router}; + +/// Build the full app: finalizes the OpenAPI spec and attaches it as an extension. +pub fn build() -> Router { + aide::generate::on_error(|error| { + tracing::warn!("aide schema gen error: {error}"); + }); + + let mut openapi = OpenApi { + info: Info { + title: "odorobo-manager".into(), + version: env!("CARGO_PKG_VERSION").into(), + ..Default::default() + }, + ..Default::default() + }; + + router().finish_api(&mut openapi).layer(Extension(openapi)) +} + +// todo: error handling +// +// see odorobo-agent's old API for error handling patterns, +// use `thiserror` and `axum_responses` to create consistent error responses across the API +// - cappy + +/// Main router for the API +fn router() -> ApiRouter { + ApiRouter::new() + .api_route("/health", get(health)) + .api_route("/swagger", Swagger::new("/openapi.json").axum_route()) + .api_route("/openapi.json", get(serve_api)) + .nest("/nodes", nodes::router()) + .nest("/vms", vms::router()) + .nest("/volumes", volumes::router()) +} + +/// Serve the OpenAPI spec as JSON +async fn serve_api(Extension(api): Extension) -> impl IntoApiResponse { + Json(api) +} + +/// Simple health check endpoint +/// +/// Returns "OK" if the server is running. 
+async fn health() -> &'static str { + "OK" +} diff --git a/odorobo-manager/src/api/nodes.rs b/odorobo-manager/src/api/nodes.rs new file mode 100644 index 0000000..95ce6d3 --- /dev/null +++ b/odorobo-manager/src/api/nodes.rs @@ -0,0 +1,23 @@ +//! Compute node management API handlers.//! VM management API handlers. +use crate::api::types::{Node, VmId}; +use aide::axum::{ + ApiRouter, IntoApiResponse, + routing::{get, put}, +}; +use axum::{Json, extract::Path}; +pub fn router() -> ApiRouter { + ApiRouter::new() + .api_route("/drain", put(drain)) + .api_route("/{nodeid}", get(node_info)) +} +/// Drain a node of all VMs, migrating them away or shutting them down as needed. This is used for maintenance mode. +async fn drain() -> impl IntoApiResponse { + // stub + Json("Draining...".to_string()) +} + +/// Get detailed information about a specific node, including its current VMs and resource usage. +async fn node_info(Path(node_id): Path) -> impl IntoApiResponse { + // stub, + Json(Node::default()) +} diff --git a/odorobo-manager/src/api/types.rs b/odorobo-manager/src/api/types.rs new file mode 100644 index 0000000..421e96c --- /dev/null +++ b/odorobo-manager/src/api/types.rs @@ -0,0 +1,177 @@ +use aide::OperationIo; +use bytesize::ByteSize; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use ulid::Ulid; + +mod bytesize_as_u64 { + use bytesize::ByteSize; + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize(size: &ByteSize, s: S) -> Result { + s.serialize_u64(size.as_u64()) + } + + pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result { + Ok(ByteSize(u64::deserialize(d)?)) + } +} + +mod opt_bytesize_as_u64 { + use bytesize::ByteSize; + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize(size: &Option, s: S) -> Result { + match size { + Some(b) => s.serialize_some(&b.as_u64()), + None => s.serialize_none(), + } + } + + pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result, D::Error> { + 
Ok(Option::::deserialize(d)?.map(ByteSize)) + } +} + +// Newtype so aide can generate a path parameter schema for Ulid. +/// VM ID, in the format of ULID +#[derive(Deserialize, JsonSchema, OperationIo)] +pub struct VmId(#[schemars(with = "String")] pub Ulid); + +/// Volume ID, in the format of ULID +#[derive(Deserialize, JsonSchema)] +pub struct VolumeId(#[schemars(with = "String")] pub Ulid); + +#[derive(Serialize, Deserialize, Debug, JsonSchema, Default)] +pub struct CreateVMRequest { + /// Data of the VM to create + pub data: VMData, + /// Whether to boot the VM immediately after creation + pub boot: bool, +} + +#[derive(Serialize, Deserialize, Debug, JsonSchema, Default)] +pub struct VMData { + /// VM ID. This is a ULID string. + #[schemars(with = "String")] + pub id: Ulid, + /// Name of the VM. + pub name: String, + /// Number of vCPUs allocated to the VM. + pub vcpus: u32, + /// Optional maximum number of vCPUs the VM can scale up to, if supported by the underlying hypervisor. + pub max_vcpus: Option, + /// Amount of RAM in bytes allocated to the VM. + #[schemars(with = "u64")] + #[serde(with = "bytesize_as_u64")] + pub memory: ByteSize, + /// Image used for the VM. + pub image: String, + /// List of volumes to attach to the VM. + #[serde(default)] + pub volumes: Vec, +} + +#[derive(Serialize, Deserialize, Debug, JsonSchema, Default)] +pub struct UpdateVMRequest { + /// Updated name of the VM. + pub name: Option, + /// Updated number of vCPUs allocated to the VM. + pub vcpus: Option, + /// Updated maximum number of vCPUs the VM can scale up to, if supported by the underlying hypervisor. + pub max_vcpus: Option, + /// Updated amount of RAM in bytes allocated to the VM. + #[schemars(with = "Option")] + #[serde(with = "opt_bytesize_as_u64")] + pub memory: Option, + /// Updated list of volumes to attach to the VM. This will replace the existing list of attached volumes. 
+ #[serde(default)] + pub volumes: Vec, +} + +#[derive(Serialize, Deserialize, Debug, JsonSchema, Default)] +pub enum VMStatus { + /// VM is currently running and operational. + Running, + /// VM is currently shut down, not running. + Stopped, + /// VM is being provisioned, being set up and started. + #[default] + Provisioning, + Error(String), // error message +} + +/// Detailed information about a running VM +#[derive(Serialize, Deserialize, Debug, JsonSchema, Default)] +pub struct VMInfo { + /// VM configuration + pub data: VMData, + + /// Currently scheduled node for the VM, + /// if any. + /// + /// None means the VM is not currently scheduled to any node + /// (e.g. VM is shut down, underlying volume still provisioning, compute unschedulable, etc.) + pub node: Option, + /// Current status of the VM + pub status: VMStatus, +} + +#[derive(Serialize, Deserialize, Debug, JsonSchema, Default)] +pub struct Volume { + /// Volume ID. This is a ULID string. + #[schemars(with = "String")] + pub id: Ulid, + /// Name of the volume. + pub name: String, + /// Size of the volume in bytes. + #[schemars(with = "u64")] + #[serde(with = "bytesize_as_u64")] + pub size: ByteSize, +} + +// for now +pub type CreateVolumeRequest = Volume; + +#[derive(Serialize, Deserialize, Debug, JsonSchema, Default)] +pub enum VolumeStatus { + /// Available in the pool, not yet attached to any VM + Available, + /// Volume is currently attached to a VM, + /// This may affect scheduling by preferring a node + /// where this volume is already attached to, if possible. + Attached(String), + + /// Volume is being provisioned, being carved + /// from the pool. + #[default] + Provisioning, + Error(String), // error message +} + +#[derive(Serialize, Deserialize, Debug, JsonSchema, Default)] +pub struct VolumeInfo { + pub data: Volume, + pub status: VolumeStatus, +} + +/// A compute node in the cluster. This is used for scheduling VMs to nodes. 
+#[derive(Serialize, Deserialize, Debug, JsonSchema, Default)] +pub struct Node { + /// Hostname or identifier of the node. + pub hostname: String, + /// Total number of vCPUs available on the node. + pub total_vcpus: u32, + /// Total amount of RAM in bytes available on the node. + #[schemars(with = "u64")] + #[serde(with = "bytesize_as_u64")] + pub total_memory: ByteSize, + #[serde(default)] + pub status: NodeStatus, +} + +#[derive(Serialize, Deserialize, Debug, JsonSchema, Default)] +pub struct NodeStatus { + /// CPU usage + pub cpu_usage: f32, +} diff --git a/odorobo-manager/src/api/vms.rs b/odorobo-manager/src/api/vms.rs new file mode 100644 index 0000000..ea787d7 --- /dev/null +++ b/odorobo-manager/src/api/vms.rs @@ -0,0 +1,42 @@ +//! VM management API handlers. +use crate::api::types::{CreateVMRequest, UpdateVMRequest, VMInfo, VmId}; +use aide::axum::{ + ApiRouter, IntoApiResponse, + routing::{delete, get, patch, post}, +}; +use axum::{Json, extract::Path}; + +pub fn router() -> ApiRouter { + ApiRouter::new() + .api_route("/", post(create_vm)) + .api_route("/{vmid}", get(vm_info)) + .api_route("/{vmid}", patch(update_vm)) + .api_route("/{vmid}", delete(delete_vm)) +} + +/// Get detailed information about a specific VM +async fn vm_info(Path(VmId(vmid)): Path) -> impl IntoApiResponse { + // stub, + Json(VMInfo::default()) +} + +async fn create_vm(Json(request): Json) -> impl IntoApiResponse { + // stub + Json(VMInfo::default()) +} + +async fn delete_vm(Path(VmId(vmid)): Path) -> impl IntoApiResponse { + // stub +} + +/// Update an existing VM's configuration (e.g. resize, change resources, etc.) 
+/// +/// todo: make new schema for update request that allows partial updates +async fn update_vm( + Path(VmId(vmid)): Path, + Json(request): Json, +) -> impl IntoApiResponse { + // stub + + Json(VMInfo::default()) +} diff --git a/odorobo-manager/src/api/volumes.rs b/odorobo-manager/src/api/volumes.rs new file mode 100644 index 0000000..121fa1e --- /dev/null +++ b/odorobo-manager/src/api/volumes.rs @@ -0,0 +1,40 @@ +//! Volume management API handlers. +use crate::api::types::{CreateVolumeRequest, Volume, VolumeId, VolumeInfo}; +use aide::axum::{ + ApiRouter, IntoApiResponse, + routing::{delete, get, patch, put}, +}; +use axum::{Json, extract::Path}; + +pub fn router() -> ApiRouter { + ApiRouter::new() + .api_route("/", put(create_volume)) + .api_route("/{volid}", get(volume_info)) + .api_route("/{volid}", delete(delete_volume)) + .api_route("/{volid}", patch(resize_volume)) +} + +/// Get detailed information about a specific volume +async fn volume_info(Path(VolumeId(volid)): Path) -> impl IntoApiResponse { + // stub, + Json(VolumeInfo::default()) +} + +/// Create a new volume with the specified parameters +async fn create_volume(Json(request): Json) -> impl IntoApiResponse { + // stub + Json(VolumeInfo::default()) +} +/// Delete an existing volume by ID +async fn delete_volume(Path(VolumeId(volid)): Path) -> impl IntoApiResponse { + // stub +} + +/// Resize an existing volume to a new size +async fn resize_volume( + Path(VolumeId(volid)): Path, + Json(request): Json, +) -> impl IntoApiResponse { + // stub + Json(VolumeInfo::default()) +} diff --git a/odorobo-manager/src/lib.rs b/odorobo-manager/src/lib.rs new file mode 100644 index 0000000..9353e6b --- /dev/null +++ b/odorobo-manager/src/lib.rs @@ -0,0 +1,3 @@ +pub mod api; +pub mod scheduler_actor; +mod actors; \ No newline at end of file diff --git a/odorobo-manager/src/main.rs b/odorobo-manager/src/main.rs new file mode 100644 index 0000000..8b504c3 --- /dev/null +++ b/odorobo-manager/src/main.rs @@ -0,0 +1,45 
@@ +use clap::Parser; +use kameo::prelude::*; +use odorobo_manager::scheduler_actor::SchedulerActor; +use odorobo_shared::connect_to_swarm; +use serde::{Deserialize, Serialize}; +use stable_eyre::Result; +use tracing::info; + + +// A config we definitely need: what is the router ip. +// TODO: talk to katherine about exactly how they want this config, since they may or may not be doing some of this, and we dont know if they want a .json for this or .env or something else. +#[derive(Serialize, Deserialize, Debug, Parser)] +struct Config { + /// Comma-separated list of actors to enable. + #[clap( + env = "ODOROBO_ACTORS", + default_value = "api,scheduler", + value_delimiter = ',' + )] + enabled_actors: Vec, +} + +// #[derive(Serialize, Deserialize, Debug)] +// struct EnabledActorsConfig { +// /// http should probably almost always be true, but its a config just in case. +// http: bool, +// /// scheduler +// scheduler: bool, +// } + +#[tokio::main] +async fn main() -> Result<()> { + dotenvy::dotenv().ok(); // load .env before logging init so it can set filters; a missing .env is not an error + odorobo_shared::utils::init()?; + + let _local_peer_id = connect_to_swarm()?; + info!("Starting odorobo-manager"); + + let actor_ref = SchedulerActor::spawn(SchedulerActor {}); + actor_ref.register("scheduler").await?; + + actor_ref.wait_for_shutdown().await; + + Ok(()) +} diff --git a/odorobo-manager/src/scheduler_actor.rs b/odorobo-manager/src/scheduler_actor.rs new file mode 100644 index 0000000..3cc7ac4 --- /dev/null +++ b/odorobo-manager/src/scheduler_actor.rs @@ -0,0 +1,75 @@ +use kameo::prelude::*; +use tracing::{error, info, warn}; +//use odorobo_shared::odorobo::server_actor::ServerActor; +use odorobo_agent::actor::AgentActor; +use odorobo_shared::messages::create_vm::*; +use odorobo_shared::messages::debug::PanicAgent; +use stable_eyre::{Report, Result}; + +#[derive(RemoteActor)] +pub struct SchedulerActor {} + +const PING_RETURN_VALUE: &str = "pong"; +const EXTERNAL_HTTP_ADDRESS: &str = "0.0.0.0:3000"; + +const EXTERNAL_HTTP_URL: &str = 
"http://localhost:3000"; // TODO: make this based on EXTERNAL_HTTP_ADDRESS. const compile time stuff is a pain. + +impl Actor for SchedulerActor { + type Args = Self; + type Error = Report; + + async fn on_start(state: Self::Args, actor_ref: ActorRef) -> Result { + let peer_id = actor_ref.id().peer_id().unwrap().clone(); + + info!("Actor started! Scheduler peer id: {peer_id}"); + + let mut agent_actor: Option> = None; + + loop { + let agent_actor_option = RemoteActorRef::::lookup("agent").await?; + + let Some(agent_actor_in_loop) = agent_actor_option else { + continue; + }; + + agent_actor = Some(agent_actor_in_loop); + break; + } + + let agent_actor = agent_actor.unwrap(); + + let agent_actor_peer_id = agent_actor.id().peer_id().unwrap().clone(); + + info!("Agent actor peer id: {agent_actor_peer_id}"); + + // run the HTTP API + tokio::spawn(async move { + tracing::info!("Starting HTTP server on {EXTERNAL_HTTP_URL}"); + let listener = tokio::net::TcpListener::bind(EXTERNAL_HTTP_ADDRESS) + .await + .unwrap(); + axum::serve(listener, crate::api::build()).await.unwrap(); + }); + + + + // let reply = agent_actor + // .ask(&CreateVM { + // vm_id: Default::default(), + // config: Default::default(), + // }) + // .await?; + + // info!(?reply, "Created VM Reply"); + + // tokio::time::sleep(std::time::Duration::from_secs(10)).await; + + // warn!("Panicking Agent"); + + // agent_actor.tell(&PanicAgent).send()?; + + // error!("Agent has been panicked."); + + Ok(state) + } +} diff --git a/odorobo-manager/src/scheduler_actor_http.rs b/odorobo-manager/src/scheduler_actor_http.rs new file mode 100644 index 0000000..03d01c8 --- /dev/null +++ b/odorobo-manager/src/scheduler_actor_http.rs @@ -0,0 +1,310 @@ +use axum::extract::State; +use axum::http::StatusCode; +use axum::routing::{get, post}; +use axum::{Json, Router}; +use kameo::prelude::*; +use libp2p::PeerId; +use libp2p::futures::TryStreamExt; +use odorobo_shared::messages::server_status::{GetServerStatus}; +use 
serde::{Deserialize, Serialize}; +use utoipa::OpenApi; +use uuid::Uuid; +//use odorobo_shared::odorobo::server_actor::ServerActor; +use odorobo_agent::actor::AgentActor; +use odorobo_shared::messages::create_vm::*; +use stable_eyre::{Report, Result}; + +#[derive(RemoteActor)] +pub struct SchedulerActor {} + +const PING_RETURN_VALUE: &str = "pong"; +const EXTERNAL_HTTP_ADDRESS: &str = "0.0.0.0:3000"; + +const EXTERNAL_HTTP_URL: &str = "http://localhost:3000"; // TODO: make this based on EXTERNAL_HTTP_ADDRESS. const compile time stuff is a pain. + +impl Actor for SchedulerActor { + type Args = Self; + type Error = Report; + + async fn on_start(state: Self::Args, actor_ref: ActorRef) -> Result { + on_start_debug(state, actor_ref).await + + //on_start_normal(state, actor_ref).await + + //Ok(state) + } +} + +async fn on_start_debug( + state: SchedulerActor, + actor_ref: ActorRef, +) -> Result { + let peer_id = actor_ref.id().peer_id().unwrap().clone(); + + println!("Actor started! Scheduler peer id:{peer_id}"); + + let agent_actor = RemoteActorRef::::lookup("agent") + .await? 
+ .unwrap(); + + let agent_actor_peer_id = agent_actor.id().peer_id().unwrap().clone(); + + println!("Agent actor peer id: {agent_actor_peer_id}"); + + agent_actor.ask(&CreateVM { + vm_id: Default::default(), + config: Default::default(), + }); + + Ok(state) +} + +async fn on_start_normal( + state: SchedulerActor, + actor_ref: ActorRef, +) -> Result { + let axum_router = Router::new() + .route("/ping", get(|| async { PING_RETURN_VALUE })) + .route("/create_vm", post(create_vm)) + .route("/delete_vm", post(delete_vm)) + .route("/update_vm", post(update_vm)) + .route("/get_vm", post(get_vm)) + .route("/create_volume", post(create_vm)) + .route("/delete_volume", post(delete_vm)) + .route("/update_volume", post(update_vm)) + .route("/get_volume", post(get_vm)) + .route("/drain_server", post(drain_server)) + // .route("/get_servers", post(get_servers)) + .with_state(actor_ref); + + println!("starting axum server at {}", EXTERNAL_HTTP_URL); + + // run our app with hyper, listening globally on port 3000 + tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind(EXTERNAL_HTTP_ADDRESS) + .await + .unwrap(); + axum::serve(listener, axum_router).await.unwrap(); + }); + + // spin loop until the axum server starts responding to requests + // TODO: if anyone has a better way to detect the axum server is up, change it to that. + + let mut count = 0; + loop { + count += 1; + println!("attempting to hit axum server, attempt {}", count); + + let resp_result: Result<()> = async { + let resp = reqwest::get(EXTERNAL_HTTP_URL.to_owned() + "/ping") + .await? 
+ .text() + .await?; + + if resp != PING_RETURN_VALUE { + return Err(stable_eyre::eyre::eyre!("invalid ping response")); + } + + Ok(()) + } + .await; + + match resp_result { + Ok(()) => { + break; + } + Err(e) => { + println!("{}", e) + } + } + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + + println!("Actor started"); + Ok(state) +} +/* +#[derive(Serialize, Deserialize, Debug, utoipa::ToSchema)] +pub struct CreateVM { + pub uuid: Uuid, + pub name: String, + pub vcpus: u32, + pub ram: u32, + pub image: String, +} +*/ + +#[derive(Serialize, Deserialize, Debug, utoipa::ToSchema)] +pub struct GenericSuccessResponse { + pub success: bool, +} + +// no response. just use status code 200 vs not 200 for if it worked. +#[utoipa::path( + post, + path = "/create_vm", + request_body(content = CreateVM, content_type = "application/json"), + responses( + (status = 200, body = GenericSuccessResponse) + ) +)] +pub async fn create_vm( + State(state): State>, + Json(payload): Json, +) -> (StatusCode, String) { + todo!() +} + +pub type UpdateVM = CreateVM; + +#[utoipa::path( + post, + path = "/update_vm", + request_body(content = UpdateVM, content_type = "application/json"), + responses( + (status = 200, body = GenericSuccessResponse) + ) +)] +async fn update_vm( + State(state): State>, + Json(payload): Json, +) -> (StatusCode, String) { + todo!() +} + +pub type DeleteVM = GetVM; +#[utoipa::path( + post, + path = "/delete_vm", + request_body(content = DeleteVM, content_type = "application/json"), + responses( + (status = 200, body = GenericSuccessResponse) + ) +)] +async fn delete_vm( + State(state): State>, + Json(payload): Json, +) -> (StatusCode, String) { + todo!() +} + +#[derive(Serialize, Deserialize, Debug, utoipa::ToSchema)] +pub struct GetVM { + pub uuid: Uuid, +} + +#[derive(Serialize, Deserialize, Debug, utoipa::ToSchema)] +pub struct GetVMResponse { + pub cpus: bool, +} +#[utoipa::path( + post, + path = "/get_vm", + request_body(content = GetVM, 
content_type = "application/json"), + responses( + (status = 200, body = GetVMResponse) + ) +)] +async fn get_vm( + State(state): State>, + Json(payload): Json, +) -> (StatusCode, String) { + todo!() +} + +#[derive(Serialize, Deserialize, Debug, utoipa::ToSchema)] +pub struct DrainServer {} + +#[utoipa::path( + post, + path = "/get_vm", + request_body(content = DrainServer, content_type = "application/json"), + responses( + (status = 200, body = GenericSuccessResponse) + ) +)] +async fn drain_server( + State(state): State>, + Json(payload): Json, +) -> (StatusCode, String) { + todo!() +} + +#[derive(Serialize, Deserialize, Debug, utoipa::ToSchema)] +pub struct GetServers { + pub start_index: u64, + pub end_index: u64, +} + +// this is a debug thing to make utoipa happy because we havent remove it. +#[derive(Serialize, Deserialize, Debug, utoipa::ToSchema)] +pub struct ServerStatus {} + +#[derive(Serialize, Deserialize, Debug, utoipa::ToSchema)] +pub struct GetServersResponse { + pub total_servers: u64, + pub servers: Vec, +} +/* +// this again is commented out because it isnt not fully working and we need this for debug reasons to test panic stuff. +#[utoipa::path( + post, + path = "/get_servers", + request_body(content = GetServers, content_type = "application/json"), + responses( + (status = 200, body = GetServersResponse) + ) +)] +async fn get_servers( + State(state): State>, + Json(payload): Json, +) -> (StatusCode, String) { + let mut servers: Vec<(PeerId, ServerStatus)> = Vec::new(); + + let server_actor_response: Result<()> = async { + println!("getting server actors"); + + let mut server_actors = RemoteActorRef::::lookup_all("agent"); + + while let Some(server_actor) = server_actors.try_next().await? 
{ + // Send message to each instance + println!("asking {:?}", server_actor); + let result = server_actor.ask(&GetServerStatus {}).await?; + println!("result {:?}", result); + + if let Some(peerId) = server_actor.id().peer_id() { + servers.push((*peerId, result)); + } + } + + Ok(()) + } + .await; + + match server_actor_response { + Ok(()) => (StatusCode::OK, serde_json::to_string(&servers).unwrap()), + _ => (StatusCode::INTERNAL_SERVER_ERROR, "".parse().unwrap()), + } +} +*/ +pub fn gen_openapi_spec() -> String { + #[derive(OpenApi)] + #[openapi( + components(schemas( + CreateVM, + UpdateVM, + DeleteVM, + GetVM, + DrainServer, + // GetServers, + GenericSuccessResponse, + GetVMResponse, + GetServersResponse + )), + // paths(get_servers) + )] + struct ApiDoc; + + ApiDoc::openapi().to_pretty_json().unwrap() +} diff --git a/odorobo-scheduler/Cargo.toml b/odorobo-scheduler/Cargo.toml deleted file mode 100644 index 30dcff9..0000000 --- a/odorobo-scheduler/Cargo.toml +++ /dev/null @@ -1,28 +0,0 @@ -[package] -name = "odorobo-scheduler" -version = "0.1.0" -edition = "2024" - -[[bin]] -name = "odorobo-scheduler" - -[[bin]] -name = "gen_openapi_spec" - -[dependencies] -kameo = { version = "0.19.2", features = ["remote"] } -axum = { version = "0.8.8", features = ["tracing", "macros"] } -env = "1.0.1" -serde = { version = "1.0.228", features = ["derive"] } -serde_json = "1.0.149" -tokio = { version = "1.50.0", features = ["full"] } # TODO (june): tracing is only in tokio unstable? NOTE (caleb): I think that just refers to tokio logging tracing events, we can still make our own and look at other crate's events if necessary. 
-tower-http = { version = "0.6.1", features = ["trace"] } -tracing = "0.1.44" -optional_struct = "0.5.2" -tracing-subscriber = { version = "0.3.2", features = ["env-filter"] } -uuid = { version = "1.23.0", features = ["serde"]} -libp2p = { version = "0.56.0", features = ["yamux", "serde", "mdns"] } -reqwest = { version = "0.13", features = ["json"] } -ahash = { version = "0.8.12", features = ["serde"] } -utoipa = { version = "5.4.0", features=["uuid"] } -odorobo-shared = { path = "../odorobo-shared" } \ No newline at end of file diff --git a/odorobo-scheduler/src/bin/gen_openapi_spec.rs b/odorobo-scheduler/src/bin/gen_openapi_spec.rs deleted file mode 100644 index eb5f75d..0000000 --- a/odorobo-scheduler/src/bin/gen_openapi_spec.rs +++ /dev/null @@ -1,7 +0,0 @@ -use std::fs; -use odorobo_scheduler::scheduler_actor::gen_openapi_spec; - -fn main() -> std::io::Result<()> { - let doc = gen_openapi_spec(); - fs::write("./openapi_spec.json", doc) -} \ No newline at end of file diff --git a/odorobo-scheduler/src/bin/odorobo-scheduler.rs b/odorobo-scheduler/src/bin/odorobo-scheduler.rs deleted file mode 100644 index ad0b614..0000000 --- a/odorobo-scheduler/src/bin/odorobo-scheduler.rs +++ /dev/null @@ -1,16 +0,0 @@ -use kameo::prelude::*; -use odorobo_shared::utils::DynError; -use odorobo_scheduler::scheduler_actor::SchedulerActor; -use odorobo_shared::connect_to_swarm; - -#[tokio::main] -async fn main() -> Result<(), DynError> { - let _local_peer_id = connect_to_swarm()?; - - let actor_ref = SchedulerActor::spawn(SchedulerActor {}); - actor_ref.register("scheduler").await?; - - actor_ref.wait_for_shutdown().await; - - Ok(()) -} \ No newline at end of file diff --git a/odorobo-scheduler/src/lib.rs b/odorobo-scheduler/src/lib.rs deleted file mode 100644 index e887c65..0000000 --- a/odorobo-scheduler/src/lib.rs +++ /dev/null @@ -1,2 +0,0 @@ - -pub mod scheduler_actor; \ No newline at end of file diff --git a/odorobo-scheduler/src/scheduler_actor.rs 
b/odorobo-scheduler/src/scheduler_actor.rs deleted file mode 100644 index 5eb85bb..0000000 --- a/odorobo-scheduler/src/scheduler_actor.rs +++ /dev/null @@ -1,246 +0,0 @@ -use axum::{Json, Router}; -use axum::extract::State; -use axum::http::StatusCode; -use axum::routing::{get, post}; -use kameo::prelude::*; -use libp2p::futures::TryStreamExt; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; -use libp2p::PeerId; -use utoipa::OpenApi; -use odorobo_shared::kameo_messages::{ServerStatus, GetServerStatus}; -//use odorobo_shared::odorobo::server_actor::ServerActor; -use odorobo_shared::utils::DynError; - -#[derive(RemoteActor)] -pub struct SchedulerActor { } - -const PING_RETURN_VALUE: &str = "pong"; -const EXTERNAL_HTTP_ADDRESS: &str = "0.0.0.0:3000"; - -const EXTERNAL_HTTP_URL: &str = "http://localhost:3000"; // TODO: make this based on EXTERNAL_HTTP_ADDRESS. const compile time stuff is a pain. - - -impl Actor for SchedulerActor { - type Args = Self; - type Error = DynError; - - async fn on_start(state: Self::Args, actor_ref: ActorRef) -> Result { - let axum_router = Router::new() - .route("/ping", get(|| async { PING_RETURN_VALUE })) - .route("/create_vm", post(create_vm)) - .route("/delete_vm", post(delete_vm)) - .route("/update_vm", post(update_vm)) - .route("/get_vm", post(get_vm)) - .route("/drain_server", post(drain_server)) - .route("/get_servers", post(get_servers)) - .with_state(actor_ref); - - - println!("starting axum server at {}", EXTERNAL_HTTP_URL); - - // run our app with hyper, listening globally on port 3000 - tokio::spawn(async move { - let listener = tokio::net::TcpListener::bind(EXTERNAL_HTTP_ADDRESS).await.unwrap(); - axum::serve(listener, axum_router).await.unwrap(); - }); - - // spin loop until the axum server starts responding to requests - // TODO: if anyone has a better way to detect the axum server is up, change it to that. 
- - let mut count = 0; - loop { - count += 1; - println!("attempting to hit axum server, attempt {}", count); - - let resp_result: Result<(), DynError> = async { - let resp = reqwest::get(EXTERNAL_HTTP_URL.to_owned() + "/ping") - .await? - .text() - .await?; - - if resp != PING_RETURN_VALUE { - return Err("invalid ping response".into()); - } - - Ok(()) - }.await; - - match resp_result { - Ok(()) => { - break; - }, - Err(e) => { - println!("{}", e) - } - } - - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - } - - println!("Actor started"); - Ok(state) - } -} - -#[derive(Serialize, Deserialize, Debug, utoipa::ToSchema)] -pub struct CreateVM { - pub uuid: Uuid, - pub name: String, - pub vcpus: u32, - pub ram: u32, - pub image: String, -} - -#[derive(Serialize, Deserialize, Debug, utoipa::ToSchema)] -pub struct GenericSuccessResponse { - pub success: bool, -} - -// no response. just use status code 200 vs not 200 for if it worked. -#[utoipa::path( - post, - path = "/create_vm", - request_body(content = CreateVM, content_type = "application/json"), - responses( - (status = 200, body = GenericSuccessResponse) - ) -)] -pub async fn create_vm(State(state): State>, Json(payload): Json) -> (StatusCode, String) { - todo!() -} - -pub type UpdateVM = CreateVM; - -#[utoipa::path( - post, - path = "/update_vm", - request_body(content = UpdateVM, content_type = "application/json"), - responses( - (status = 200, body = GenericSuccessResponse) - ) -)] -async fn update_vm(State(state): State>, Json(payload): Json) -> (StatusCode, String) { - todo!() -} - - -pub type DeleteVM = GetVM; -#[utoipa::path( - post, - path = "/delete_vm", - request_body(content = DeleteVM, content_type = "application/json"), - responses( - (status = 200, body = GenericSuccessResponse) - ) -)] -async fn delete_vm(State(state): State>, Json(payload): Json) -> (StatusCode, String) { - todo!() -} - - -#[derive(Serialize, Deserialize, Debug, utoipa::ToSchema)] -pub struct GetVM { - pub uuid: 
Uuid, -} - -#[derive(Serialize, Deserialize, Debug, utoipa::ToSchema)] -pub struct GetVMResponse { - pub cpus: bool -} -#[utoipa::path( - post, - path = "/get_vm", - request_body(content = GetVM, content_type = "application/json"), - responses( - (status = 200, body = GetVMResponse) - ) -)] -async fn get_vm(State(state): State>, Json(payload): Json) -> (StatusCode, String) { - todo!() -} - -#[derive(Serialize, Deserialize, Debug, utoipa::ToSchema)] -pub struct DrainServer {} - -#[utoipa::path( - post, - path = "/get_vm", - request_body(content = DrainServer, content_type = "application/json"), - responses( - (status = 200, body = GenericSuccessResponse) - ) -)] -async fn drain_server(State(state): State>, Json(payload): Json) -> (StatusCode, String) { - todo!() -} - -#[derive(Serialize, Deserialize, Debug, utoipa::ToSchema)] -pub struct GetServers { - pub start_index: u64, - pub end_index: u64 -} - -#[derive(Serialize, Deserialize, Debug, utoipa::ToSchema)] -pub struct GetServersResponse { - pub total_servers: u64, - pub servers: Vec, -} - - -#[utoipa::path( - post, - path = "/get_servers", - request_body(content = GetServers, content_type = "application/json"), - responses( - (status = 200, body = GetServersResponse) - ) -)] -async fn get_servers(State(state): State>, Json(payload): Json) -> (StatusCode, String) { - /* - let mut servers: Vec<(PeerId, ServerStatus)> = Vec::new(); - - let server_actor_response: Result<(), DynError> = async { - println!("getting server actors"); - - let mut server_actors = RemoteActorRef::::lookup_all("server"); - - while let Some(server_actor) = server_actors.try_next().await? 
{ - // Send message to each instance - println!("asking {:?}", server_actor); - let result = server_actor.ask(&GetServerStatus { }).await?; - println!("result {:?}", result); - - if let Some(peerId) = server_actor.id().peer_id() { - servers.push((*peerId, result)); - } - } - - Ok(()) - }.await; - - match server_actor_response { - Ok(()) => { - (StatusCode::OK, serde_json::to_string(&servers).unwrap()) - }, - _ => { - (StatusCode::INTERNAL_SERVER_ERROR, "".parse().unwrap()) - } - } - - */ - todo!() -} - -pub fn gen_openapi_spec() -> String { - #[derive(OpenApi)] - #[openapi( - components( - schemas(CreateVM, UpdateVM, DeleteVM, GetVM, DrainServer, GetServers, GenericSuccessResponse, GetVMResponse, GetServersResponse) - ), - paths(get_servers) - )] - struct ApiDoc; - - ApiDoc::openapi().to_pretty_json().unwrap() -} \ No newline at end of file diff --git a/odorobo-shared/Cargo.toml b/odorobo-shared/Cargo.toml index a0672b8..080a725 100644 --- a/odorobo-shared/Cargo.toml +++ b/odorobo-shared/Cargo.toml @@ -5,8 +5,14 @@ edition = "2024" license = "AGPL-3.0-or-later" [dependencies] -kameo = { version = "0.19.2", features = ["remote"] } -tokio = { version = "1.50.0", features = ["full"] } -libp2p = { version = "0.56.0", features = ["yamux", "serde", "mdns"] } -serde = { version = "1.0.228", features = ["derive"] } +kameo = { workspace = true, features = ["remote"] } +tokio = { workspace = true, features = ["full"] } +libp2p = { workspace = true, features = ["yamux", "serde", "mdns", "kad"] } +serde = { workspace = true, features = ["derive"] } utoipa = { version = "5.4.0", features=["uuid"] } +stable-eyre = { workspace = true } +thiserror = { workspace = true } +ulid = { workspace = true } +cloud-hypervisor-client = { workspace = true } +tracing-subscriber = { workspace = true, features = ["env-filter"] } +tracing = { workspace = true } \ No newline at end of file diff --git a/odorobo-shared/src/error.rs b/odorobo-shared/src/error.rs new file mode 100644 index 
0000000..890a1c4 --- /dev/null +++ b/odorobo-shared/src/error.rs @@ -0,0 +1,8 @@ + +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum OdoroboError { + #[error("")] + Foobar +} diff --git a/odorobo-shared/src/lib.rs b/odorobo-shared/src/lib.rs index af9f1c4..861e234 100644 --- a/odorobo-shared/src/lib.rs +++ b/odorobo-shared/src/lib.rs @@ -1,11 +1,15 @@ +pub mod error; +pub mod messages; pub mod utils; -pub mod kameo_messages; - use kameo::prelude::*; -use libp2p::{mdns, noise, tcp, yamux, PeerId}; +use libp2p::bytes::BufMut; use libp2p::futures::StreamExt; +use libp2p::kad::Record; use libp2p::swarm::{NetworkBehaviour, SwarmEvent}; -use crate::utils::DynError; +use libp2p::{PeerId, kad, mdns, noise, tcp, yamux}; +use stable_eyre::Result; +use std::cell::RefCell; +use tracing::{debug, info, warn}; #[derive(NetworkBehaviour)] pub struct ProductionBehaviour { @@ -16,18 +20,18 @@ pub struct ProductionBehaviour { // based on: // https://github.com/tqwewe/kameo/blob/main/examples/custom_swarm.rs // https://docs.page/tqwewe/kameo/distributed-actors/custom-swarm-configuration -pub fn connect_to_swarm() -> Result { +pub fn connect_to_swarm() -> Result { let mut swarm = libp2p::SwarmBuilder::with_new_identity() .with_tokio() - .with_tcp(tcp::Config::default(), noise::Config::new, yamux::Config::default)? + .with_tcp( + tcp::Config::default(), + noise::Config::new, + yamux::Config::default, + )? .with_behaviour(|key| { let local_peer_id = key.public().to_peer_id(); - let kameo = remote::Behaviour::new( - local_peer_id, - remote::messaging::Config::default(), - ); - + let kameo = remote::Behaviour::new(local_peer_id, remote::messaging::Config::default()); let mdns = mdns::tokio::Behaviour::new(mdns::Config::default(), local_peer_id)?; Ok(ProductionBehaviour { kameo, mdns }) })? 
@@ -41,45 +45,49 @@ pub fn connect_to_swarm() -> Result { let local_peer_id = *swarm.local_peer_id(); - println!("Local peer id: {:?}", local_peer_id); + info!("Local peer id: {:?}", local_peer_id); // Spawn the swarm task tokio::spawn(async move { loop { match swarm.select_next_some().await { // Handle mDNS discovery - SwarmEvent::Behaviour(ProductionBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => { + SwarmEvent::Behaviour(ProductionBehaviourEvent::Mdns(mdns::Event::Discovered( + list, + ))) => { for (peer_id, multiaddr) in list { - println!("mDNS discovered peer: {peer_id}"); + info!("mDNS discovered peer: {peer_id}"); swarm.add_peer_address(peer_id, multiaddr); } } - SwarmEvent::Behaviour(ProductionBehaviourEvent::Mdns(mdns::Event::Expired(list))) => { + SwarmEvent::Behaviour(ProductionBehaviourEvent::Mdns(mdns::Event::Expired( + list, + ))) => { for (peer_id, _) in list { - println!("mDNS peer expired: {peer_id}"); + warn!("mDNS peer expired: {peer_id}"); let _ = swarm.disconnect_peer_id(peer_id); } } // Handle Kameo events (optional - for monitoring) - SwarmEvent::Behaviour(ProductionBehaviourEvent::Kameo(remote::Event::Registry( - registry_event, - ))) => { - println!("Registry event: {:?}", registry_event); + SwarmEvent::Behaviour(ProductionBehaviourEvent::Kameo( + remote::Event::Registry(registry_event), + )) => { + debug!(?registry_event, "Registry event"); } - SwarmEvent::Behaviour(ProductionBehaviourEvent::Kameo(remote::Event::Messaging( - messaging_event, - ))) => { - println!("Messaging event: {:?}", messaging_event); + SwarmEvent::Behaviour(ProductionBehaviourEvent::Kameo( + remote::Event::Messaging(messaging_event), + )) => { + debug!(?messaging_event, "Messaging event"); } // Handle other swarm events SwarmEvent::NewListenAddr { address, .. } => { - println!("Listening on {address}"); + info!(?address, "Listening"); } SwarmEvent::ConnectionEstablished { peer_id, .. 
} => { - println!("Connected to {peer_id}"); + info!("Connected to {peer_id}"); } SwarmEvent::ConnectionClosed { peer_id, cause, .. } => { - println!("Disconnected from {peer_id}: {cause:?}"); + info!("Disconnected from {peer_id}: {cause:?}"); } _ => {} } @@ -87,4 +95,4 @@ pub fn connect_to_swarm() -> Result { }); Ok(local_peer_id) -} \ No newline at end of file +} diff --git a/odorobo-shared/src/messages/create_vm.rs b/odorobo-shared/src/messages/create_vm.rs new file mode 100644 index 0000000..adab0ed --- /dev/null +++ b/odorobo-shared/src/messages/create_vm.rs @@ -0,0 +1,30 @@ +use cloud_hypervisor_client::models::VmConfig; +use kameo::Reply; +use kameo::prelude::*; +use serde::{Deserialize, Serialize}; +use ulid::Ulid; + +// TODO: when scheduler does createVM it also stores which server we put the Ulid on so it can do a in memory cache and doesn't need to hit the Server +// for failover, the new node when it fails over will need to rebuild this cache via hitting a GetAllVMs message on every server +// additionally, when the VmConfig is created, this determines the MAC address of the server. meaning as soon as we have this info, we need to hit the router via the scheduler, because the router might be slow. +/// Message to create a new VM +/// +/// VmConfig is a Cloud Hypervisor VM spec, containing the VM's full configuration (untransformed by odorobo) + +#[derive(Serialize, Deserialize, Debug)] +pub struct CreateVM { + /// the ULID of the VM to create + pub vm_id: Ulid, + /// VmConfig in message, untransformed. + /// + /// Transformer API will transform this VmConfig into proper + /// node-specific, paths, i.e attach LUNs, networking? 
+ /// + /// this data would go to state::instance::spawn() + pub config: VmConfig, +} + +#[derive(Serialize, Deserialize, Reply, Debug)] +pub struct CreateVMReply { + pub config: Option<VmConfig>, +} diff --git a/odorobo-shared/src/messages/debug.rs b/odorobo-shared/src/messages/debug.rs new file mode 100644 index 0000000..fb54cde --- /dev/null +++ b/odorobo-shared/src/messages/debug.rs @@ -0,0 +1,9 @@ +use serde::{Deserialize, Serialize}; + +/// Forcibly panics the agent +/// +/// Used for debugging purposes. +/// +/// This should not be used in production. +#[derive(Serialize, Deserialize, Debug)] +pub struct PanicAgent; diff --git a/odorobo-shared/src/messages/ip_management.rs b/odorobo-shared/src/messages/ip_management.rs new file mode 100644 index 0000000..e69de29 diff --git a/odorobo-shared/src/messages/mod.rs b/odorobo-shared/src/messages/mod.rs new file mode 100644 index 0000000..da6892a --- /dev/null +++ b/odorobo-shared/src/messages/mod.rs @@ -0,0 +1,3 @@ +pub mod create_vm; +pub mod debug; +pub mod server_status; diff --git a/odorobo-shared/src/kameo_messages.rs b/odorobo-shared/src/messages/server_status.rs similarity index 50% rename from odorobo-shared/src/kameo_messages.rs rename to odorobo-shared/src/messages/server_status.rs index 0e704bc..fa61286 100644 --- a/odorobo-shared/src/kameo_messages.rs +++ b/odorobo-shared/src/messages/server_status.rs @@ -1,12 +1,10 @@ -use std::fmt::{Display, Formatter}; -use kameo::Reply; -use libp2p::PeerId; use serde::{Deserialize, Serialize}; +use kameo::Reply; #[derive(Serialize, Deserialize)] -pub struct GetServerStatus {} +pub struct GetServerStatus; -#[derive(Serialize, Deserialize, Reply, Debug, utoipa::ToSchema)] +#[derive(Serialize, Deserialize, Reply, Debug)] pub struct ServerStatus { pub vcpus: u32, pub ram: u32 diff --git a/odorobo-shared/src/utils.rs b/odorobo-shared/src/utils.rs index 96a7fe3..1044c88 100644 --- a/odorobo-shared/src/utils.rs +++ b/odorobo-shared/src/utils.rs @@ -1 +1,27 @@ -pub type DynError =
Box; \ No newline at end of file +use stable_eyre::Result; +// use tracing:: +use tracing_subscriber::{EnvFilter, filter::LevelFilter, fmt, prelude::*}; + +pub fn env_filter() -> EnvFilter { + let env = std::env::var("ODOROBO_LOG").unwrap_or_else(|_| "".into()); + + let base = EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .parse_lossy(&env); + + #[cfg(debug_assertions)] + let base = base.add_directive("odorobo_shared=trace".parse().expect("static tracing directive")); // NOTE(review): EnvFilter directives have no glob support, so "odorobo*=trace" fails to parse and the unwrap panicked in debug builds; a bare target is a prefix match for its children, but each odorobo crate target needs its own directive + + base +} + +pub fn init() -> Result<()> { + stable_eyre::install()?; + tracing_subscriber::fmt() + .with_env_filter(env_filter()) + .with_file(true) + .with_line_number(true) + .init(); + + Ok(()) +}