summaryrefslogtreecommitdiff
path: root/metrics/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'metrics/src/main.rs')
-rw-r--r--metrics/src/main.rs154
1 files changed, 154 insertions, 0 deletions
diff --git a/metrics/src/main.rs b/metrics/src/main.rs
new file mode 100644
index 0000000..af489dd
--- /dev/null
+++ b/metrics/src/main.rs
@@ -0,0 +1,154 @@
+use chrono::Utc;
+use hyper::{
+ service::{make_service_fn, service_fn},
+ Body, Method, Request, Response, Server, StatusCode,
+};
+use lazy_static::lazy_static;
+use ratman_identity::Identity;
+use std::{
+ collections::BTreeMap,
+ convert::Infallible,
+ env,
+ fs::OpenOptions,
+ io::Write,
+ net::SocketAddr,
+ path::PathBuf,
+ sync::{
+ atomic::{AtomicUsize, Ordering},
+ Mutex,
+ },
+ thread,
+ time::Duration,
+};
+
+lazy_static! {
+ static ref STATE: Mutex<BTreeMap<Identity, MetricEntry>> = Mutex::new(BTreeMap::new());
+}
+
+/// An entry of metrics data
+#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
+struct MetricEntry {}
+
+fn dump_metrics() {
+ let p = env::var("BROOK_METRICS_PATH").unwrap_or("metrics.csv".into());
+ let path = PathBuf::new().join(p);
+
+ loop {
+ let time = Utc::now();
+ let mut map = STATE.lock().unwrap();
+ let metrics: Vec<()> = map.iter().map(|_| ()).collect();
+ map.clear();
+ drop(map);
+
+ let line = format!(
+ "{},{}\n",
+ time.format("%Y-%m-%d_%H:%M:%S UTC"),
+ metrics.len(),
+ );
+ let mut file = OpenOptions::new()
+ .write(true)
+ .append(true)
+ .create(true)
+ .open(&path)
+ .unwrap();
+
+ file.write_all(line.as_bytes()).unwrap();
+
+ // Wait for a while before doing it again
+ thread::sleep(Duration::from_secs(15));
+ }
+}
+
+fn get() -> Result<Response<Body>, Infallible> {
+ let mut resp = Response::new(Body::empty());
+ *resp.body_mut() = Body::from(
+ "<!DOCTYPE html>
+<html>
+<head><title>brook-metrics</title></head>
+<body>
+<p>
+Your brook-metrics server is now reachable! \
+Point your brook clients towards $ADDR/report
+</p>
+</body>
+</html>",
+ );
+
+ Ok(resp)
+}
+
+async fn post(req: Request<Body>) -> Result<Response<Body>, Infallible> {
+ let full_body = hyper::body::to_bytes(req.into_body()).await.unwrap();
+ let body_str = String::from_utf8(full_body.to_vec()).unwrap();
+
+ let parsed = json::parse(&body_str).unwrap();
+
+ // Either get the ID or create a new one
+ let id = match parsed {
+ json::JsonValue::Object(mut obj) => obj.remove("id"),
+ _ => unreachable!(),
+ }
+ .map(|jv| match jv {
+ json::JsonValue::String(ref s) => Identity::from_string(s),
+ _ => unreachable!(),
+ })
+ .map(|id| {
+ eprintln!("[DEBUG] Returning client {}", id);
+ id
+ })
+ .unwrap_or_else(|| {
+ eprintln!("[DEBUG] Adding new stream client!");
+ Identity::random()
+ });
+
+ STATE.lock().unwrap().insert(id, MetricEntry {});
+
+ Ok(Response::builder()
+ .header("Content-Type", "text/json")
+ .status(StatusCode::OK)
+ .body(Body::from(format!("{{ \"id\": \"{}\" }}", id.to_string())))
+ .unwrap())
+}
+
+fn get_current() -> Result<Response<Body>, Infallible> {
+ let len = STATE.lock().unwrap().len();
+ Ok(Response::builder()
+ .header("Content-Type", "text/json")
+ .status(StatusCode::OK)
+ .body(Body::from(format!("{{ \"num\": {} }}", len)))
+ .unwrap())
+}
+
+async fn handle_request(req: Request<Body>) -> Result<Response<Body>, Infallible> {
+ match (req.method(), req.uri().path()) {
+ // TODO: make this prefix configurable
+ (&Method::GET, "/metrics") => get(),
+ (&Method::GET, "/metrics/current") => get_current(),
+ (&Method::POST, "/metrics/update") => post(req).await,
+ _ => {
+ let resp = Response::builder()
+ .status(StatusCode::NOT_FOUND)
+ .body(Body::from("Nothing here, I'm afraid!"))
+ .unwrap();
+
+ Ok(resp)
+ }
+ }
+}
+
+#[tokio::main]
+async fn main() {
+ eprintln!("ID Length: {}", ratman_identity::ID_LEN);
+ thread::spawn(|| dump_metrics());
+
+ let addr = SocketAddr::from(([0, 0, 0, 0], 7667));
+ let service = make_service_fn(|_c| async { Ok::<_, Infallible>(service_fn(handle_request)) });
+
+ eprintln!("Bindng address: 0.0.0.0:7667");
+ let server = Server::bind(&addr).serve(service);
+
+
+ if let Err(e) = server.await {
+ eprintln!("[ERROR]: {}", e);
+ }
+}