From b0e2d9a77155e78502682276ce38d1278b45f4a7 Mon Sep 17 00:00:00 2001 From: hyperion Date: Fri, 30 Oct 2020 06:18:53 +0100 Subject: Initial commit --- brook-metrics/src/main.rs | 152 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 152 insertions(+) create mode 100644 brook-metrics/src/main.rs (limited to 'brook-metrics/src/main.rs') diff --git a/brook-metrics/src/main.rs b/brook-metrics/src/main.rs new file mode 100644 index 0000000..649ddd9 --- /dev/null +++ b/brook-metrics/src/main.rs @@ -0,0 +1,152 @@ +use chrono::Utc; +use hyper::{ + service::{make_service_fn, service_fn}, + Body, Method, Request, Response, Server, StatusCode, +}; +use lazy_static::lazy_static; +use ratman_identity::Identity; +use std::{ + collections::BTreeMap, + convert::Infallible, + env, + fs::OpenOptions, + io::Write, + net::SocketAddr, + path::PathBuf, + sync::{ + atomic::{AtomicUsize, Ordering}, + Mutex, + }, + thread, + time::Duration, +}; + +lazy_static! { + static ref STATE: Mutex> = Mutex::new(BTreeMap::new()); +} + +/// An entry of metrics data +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +struct MetricEntry {} + +fn dump_metrics() { + let p = env::var("BROOK_METRICS_PATH").unwrap_or("metrics.csv".into()); + let path = PathBuf::new().join(p); + + loop { + let time = Utc::now(); + let mut map = STATE.lock().unwrap(); + let metrics: Vec<()> = map.iter().map(|_| ()).collect(); + map.clear(); + drop(map); + + let line = format!( + "{},{}\n", + time.format("%Y-%m-%d_%H:%M:%S UTC"), + metrics.len(), + ); + let mut file = OpenOptions::new() + .write(true) + .append(true) + .create(true) + .open(&path) + .unwrap(); + + file.write_all(line.as_bytes()).unwrap(); + + // Wait for a while before doing it again + thread::sleep(Duration::from_secs(15)); + } +} + +fn get() -> Result, Infallible> { + let mut resp = Response::new(Body::empty()); + *resp.body_mut() = Body::from( + " + +brook-metrics + +

+Your brook-metrics server is now reachable! \ +Point your brook clients towards $ADDR/report +

+ +", + ); + + Ok(resp) +} + +async fn post(req: Request) -> Result, Infallible> { + let full_body = hyper::body::to_bytes(req.into_body()).await.unwrap(); + let body_str = String::from_utf8(full_body.to_vec()).unwrap(); + + let parsed = json::parse(&body_str).unwrap(); + + // Either get the ID or create a new one + let id = match parsed { + json::JsonValue::Object(mut obj) => obj.remove("id"), + _ => unreachable!(), + } + .map(|jv| match jv { + json::JsonValue::String(ref s) => Identity::from_string(s), + _ => unreachable!(), + }) + .map(|id| { + eprintln!("[DEBUG] Returning client {}", id); + id + }) + .unwrap_or_else(|| { + eprintln!("[DEBUG] Adding new stream client!"); + Identity::random() + }); + + STATE.lock().unwrap().insert(id, MetricEntry {}); + + Ok(Response::builder() + .header("Content-Type", "text/json") + .status(StatusCode::OK) + .body(Body::from(format!("{{ \"id\": \"{}\" }}", id.to_string()))) + .unwrap()) +} + +fn get_current() -> Result, Infallible> { + let len = STATE.lock().unwrap().len(); + Ok(Response::builder() + .header("Content-Type", "text/json") + .status(StatusCode::OK) + .body(Body::from(format!("{{ \"num\": {} }}", len))) + .unwrap()) +} + +async fn handle_request(req: Request) -> Result, Infallible> { + match (req.method(), req.uri().path()) { + // TODO: make this prefix configurable + (&Method::GET, "/metrics") => get(), + (&Method::GET, "/metrics/current") => get_current(), + (&Method::POST, "/metrics/update") => post(req).await, + _ => { + let resp = Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::from("Nothing here, I'm afraid!")) + .unwrap(); + + Ok(resp) + } + } +} + +#[tokio::main] +async fn main() { + thread::spawn(|| dump_metrics()); + + let addr = SocketAddr::from(([0, 0, 0, 0], 7667)); + let service = make_service_fn(|_c| async { Ok::<_, Infallible>(service_fn(handle_request)) }); + + eprintln!("Bindng address: 0.0.0.0:7667"); + let server = Server::bind(&addr).serve(service); + + if let Err(e) = server.await { + eprintln!("[ERROR]: {}", e); + } +} -- cgit v1.2.3