use chrono::Utc; use hyper::{ service::{make_service_fn, service_fn}, Body, Method, Request, Response, Server, StatusCode, }; use lazy_static::lazy_static; use ratman_identity::Identity; use std::{ collections::BTreeMap, convert::Infallible, env, fs::OpenOptions, io::Write, net::SocketAddr, path::PathBuf, sync::{ atomic::{AtomicUsize, Ordering}, Mutex, }, thread, time::Duration, }; lazy_static! { static ref STATE: Mutex> = Mutex::new(BTreeMap::new()); } /// An entry of metrics data #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] struct MetricEntry {} fn dump_metrics() { let p = env::var("BROOK_METRICS_PATH").unwrap_or("metrics.csv".into()); let path = PathBuf::new().join(p); loop { let time = Utc::now(); let mut map = STATE.lock().unwrap(); let metrics: Vec<()> = map.iter().map(|_| ()).collect(); map.clear(); drop(map); let line = format!( "{},{}\n", time.format("%Y-%m-%d_%H:%M:%S UTC"), metrics.len(), ); let mut file = OpenOptions::new() .write(true) .append(true) .create(true) .open(&path) .unwrap(); file.write_all(line.as_bytes()).unwrap(); // Wait for a while before doing it again thread::sleep(Duration::from_secs(15)); } } fn get() -> Result, Infallible> { let mut resp = Response::new(Body::empty()); *resp.body_mut() = Body::from( " brook-metrics

Your brook-metrics server is now reachable! \ Point your brook clients towards $ADDR/report

", ); Ok(resp) } async fn post(req: Request) -> Result, Infallible> { let full_body = hyper::body::to_bytes(req.into_body()).await.unwrap(); let body_str = String::from_utf8(full_body.to_vec()).unwrap(); let parsed = json::parse(&body_str).unwrap(); // Either get the ID or create a new one let id = match parsed { json::JsonValue::Object(mut obj) => obj.remove("id"), _ => unreachable!(), } .map(|jv| match jv { json::JsonValue::String(ref s) => Identity::from_string(s), _ => unreachable!(), }) .map(|id| { eprintln!("[DEBUG] Returning client {}", id); id }) .unwrap_or_else(|| { eprintln!("[DEBUG] Adding new stream client!"); Identity::random() }); STATE.lock().unwrap().insert(id, MetricEntry {}); Ok(Response::builder() .header("Content-Type", "text/json") .status(StatusCode::OK) .body(Body::from(format!("{{ \"id\": \"{}\" }}", id.to_string()))) .unwrap()) } fn get_current() -> Result, Infallible> { let len = STATE.lock().unwrap().len(); Ok(Response::builder() .header("Content-Type", "text/json") .status(StatusCode::OK) .body(Body::from(format!("{{ \"num\": {} }}", len))) .unwrap()) } async fn handle_request(req: Request) -> Result, Infallible> { match (req.method(), req.uri().path()) { // TODO: make this prefix configurable (&Method::GET, "/metrics") => get(), (&Method::GET, "/metrics/current") => get_current(), (&Method::POST, "/metrics/update") => post(req).await, _ => { let resp = Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::from("Nothing here, I'm afraid!")) .unwrap(); Ok(resp) } } } #[tokio::main] async fn main() { eprintln!("ID Length: {}", ratman_identity::ID_LEN); thread::spawn(|| dump_metrics()); let addr = SocketAddr::from(([0, 0, 0, 0], 7667)); let service = make_service_fn(|_c| async { Ok::<_, Infallible>(service_fn(handle_request)) }); eprintln!("Bindng address: 0.0.0.0:7667"); let server = Server::bind(&addr).serve(service); if let Err(e) = server.await { eprintln!("[ERROR]: {}", e); } }