Compare commits
2 Commits
main
...
bff9db0b68
| Author | SHA1 | Date | |
|---|---|---|---|
| bff9db0b68 | |||
| 92aa2cd6a8 |
871
Cargo.lock
generated
871
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -11,5 +11,6 @@ dir-diff = "0.3.3"
|
|||||||
env_logger = "0.11.7"
|
env_logger = "0.11.7"
|
||||||
include_directory = "0.1.1"
|
include_directory = "0.1.1"
|
||||||
log = "0.4.27"
|
log = "0.4.27"
|
||||||
|
reqwest = { version = "0.12.23", features = ["blocking"] }
|
||||||
rouille = "3.6.2"
|
rouille = "3.6.2"
|
||||||
tempdir = "0.3.7"
|
tempdir = "0.3.7"
|
||||||
|
|||||||
42
src/bin/handler.rs
Normal file
42
src/bin/handler.rs
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
use std::thread::sleep;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use clap::Parser;
|
||||||
|
|
||||||
|
use log::error;
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
use env_logger::Env;
|
||||||
|
|
||||||
|
/// Implements a client that constantly refreshes with the server.
|
||||||
|
#[derive(Parser, Debug)]
|
||||||
|
#[command(version, about, long_about = None)]
|
||||||
|
struct Args {
|
||||||
|
/// The skubelb server we should register with
|
||||||
|
#[arg(short, long)]
|
||||||
|
server: String,
|
||||||
|
|
||||||
|
/// The listen address that should be sent to skubelb.
|
||||||
|
#[arg(short, long)]
|
||||||
|
listen: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
// Log info and above by default
|
||||||
|
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
|
||||||
|
let args = Args::parse();
|
||||||
|
|
||||||
|
match handle(&args.server, &args.listen) {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(e) => error!("{}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle(remote: &str, listen: &str) -> Result<()> {
|
||||||
|
let client = reqwest::blocking::Client::new();
|
||||||
|
let url = format!("http://{}/register/{}", remote, listen);
|
||||||
|
loop {
|
||||||
|
sleep(Duration::from_secs(20));
|
||||||
|
client.post(&url).send()?;
|
||||||
|
}
|
||||||
|
}
|
||||||
74
src/main.rs
74
src/main.rs
@@ -1,22 +1,26 @@
|
|||||||
use std::sync::Mutex;
|
|
||||||
use std::process::Command;
|
use std::process::Command;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::sync::Mutex;
|
||||||
|
use std::thread;
|
||||||
|
use std::thread::sleep;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
|
|
||||||
use skubelb::Rewriter;
|
use skubelb::Rewriter;
|
||||||
use skubelb::Server;
|
use skubelb::Server;
|
||||||
|
|
||||||
|
use anyhow::{Result, anyhow};
|
||||||
use env_logger::Env;
|
use env_logger::Env;
|
||||||
use log::{info, warn};
|
use log::{info, warn};
|
||||||
use anyhow::Result;
|
use rouille::{Request, Response, router};
|
||||||
use rouille::{router, Request, Response};
|
|
||||||
|
|
||||||
/// Implements a HTTP server which allows clients to 'register'
|
/// Implements a HTTP server which allows clients to 'register'
|
||||||
/// themselves. Their IP address will be used to replace a
|
/// themselves. Their IP address will be used to replace a
|
||||||
/// needle in a set of config files. This is intended to be
|
/// needle in a set of config files. This is intended to be
|
||||||
/// used as a low-cost way of enabling Kubernetes ingress
|
/// used as a low-cost way of enabling Kubernetes ingress
|
||||||
/// using nginx running on a machine that has a public port.
|
/// using nginx running on a machine that has a public port.
|
||||||
///
|
///
|
||||||
/// The needle is expected to be a dummy IP address; something
|
/// The needle is expected to be a dummy IP address; something
|
||||||
/// fairly unique. The goal is to replace nginx files, where
|
/// fairly unique. The goal is to replace nginx files, where
|
||||||
/// we often repeat lines if we want nginx to load balance between
|
/// we often repeat lines if we want nginx to load balance between
|
||||||
@@ -37,7 +41,7 @@ struct Args {
|
|||||||
template_dir: String,
|
template_dir: String,
|
||||||
|
|
||||||
/// The symlink that should be updated each time the config changes.
|
/// The symlink that should be updated each time the config changes.
|
||||||
///
|
///
|
||||||
/// Symlinks are used because file updates are not atomic.
|
/// Symlinks are used because file updates are not atomic.
|
||||||
#[arg(short, long)]
|
#[arg(short, long)]
|
||||||
config_symlink: String,
|
config_symlink: String,
|
||||||
@@ -61,10 +65,31 @@ fn main() {
|
|||||||
let args = Args::parse();
|
let args = Args::parse();
|
||||||
|
|
||||||
let rewriter = Rewriter::new(args.needle);
|
let rewriter = Rewriter::new(args.needle);
|
||||||
let server_impl = Mutex::new(Server::new(rewriter, args.workspace_dir, args.template_dir, args.config_symlink));
|
let server_impl = Arc::new(Mutex::new(Server::new(
|
||||||
|
rewriter,
|
||||||
|
args.workspace_dir,
|
||||||
|
args.template_dir,
|
||||||
|
args.config_symlink,
|
||||||
|
)));
|
||||||
let reload_command = args.reload_cmd.leak();
|
let reload_command = args.reload_cmd.leak();
|
||||||
let reload_command: Vec<&str> = reload_command.split_ascii_whitespace().collect();
|
let reload_command: Vec<&str> = reload_command.split_ascii_whitespace().collect();
|
||||||
|
|
||||||
|
// Start cleanup thread
|
||||||
|
{
|
||||||
|
let server_impl = server_impl.clone();
|
||||||
|
thread::spawn(move || {
|
||||||
|
loop {
|
||||||
|
sleep(Duration::from_secs(30));
|
||||||
|
match cleanup_worker(&server_impl) {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Error cleaning up handlers {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
rouille::start_server(args.listen, move |request| {
|
rouille::start_server(args.listen, move |request| {
|
||||||
info!("Processing request: {:?}", request);
|
info!("Processing request: {:?}", request);
|
||||||
match handle(request, &server_impl) {
|
match handle(request, &server_impl) {
|
||||||
@@ -76,19 +101,30 @@ fn main() {
|
|||||||
match output {
|
match output {
|
||||||
Ok(o) => {
|
Ok(o) => {
|
||||||
info!("Ran {:?}; exit code: {}", reload_command, o.status);
|
info!("Ran {:?}; exit code: {}", reload_command, o.status);
|
||||||
info!("Ran {:?}; stdout: {}", reload_command, String::from_utf8_lossy(&o.stdout));
|
info!(
|
||||||
info!("Ran {:?}; stderr: {}", reload_command, String::from_utf8_lossy(&o.stderr));
|
"Ran {:?}; stdout: {}",
|
||||||
},
|
reload_command,
|
||||||
|
String::from_utf8_lossy(&o.stdout)
|
||||||
|
);
|
||||||
|
info!(
|
||||||
|
"Ran {:?}; stderr: {}",
|
||||||
|
reload_command,
|
||||||
|
String::from_utf8_lossy(&o.stderr)
|
||||||
|
);
|
||||||
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("Failed to run {:?}: {:?}", reload_command, e);
|
warn!("Failed to run {:?}: {:?}", reload_command, e);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
resp
|
resp
|
||||||
},
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("{:?}", e);
|
warn!("{:?}", e);
|
||||||
Response{status_code: 500, ..Response::empty_400()}
|
Response {
|
||||||
|
status_code: 500,
|
||||||
|
..Response::empty_400()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -97,13 +133,23 @@ fn main() {
|
|||||||
fn handle(request: &Request, server_impl: &Mutex<Server>) -> Result<(Response, bool)> {
|
fn handle(request: &Request, server_impl: &Mutex<Server>) -> Result<(Response, bool)> {
|
||||||
router!(request,
|
router!(request,
|
||||||
(POST) (/register/{ip: String}) => {
|
(POST) (/register/{ip: String}) => {
|
||||||
server_impl.lock().unwrap().register(request, &ip)?;
|
let mut server_impl = server_impl.lock().map_err(|_| anyhow!("failed to acquire lock"))?;
|
||||||
|
server_impl.register(request, &ip)?;
|
||||||
|
server_impl.cleanup()?;
|
||||||
Ok((Response{status_code: 200, ..Response::empty_204()}, true))
|
Ok((Response{status_code: 200, ..Response::empty_204()}, true))
|
||||||
},
|
},
|
||||||
(DELETE) (/register/{ip: String}) => {
|
(DELETE) (/register/{ip: String}) => {
|
||||||
server_impl.lock().unwrap().unregister(request, &ip)?;
|
let mut server_impl = server_impl.lock().map_err(|_| anyhow!("failed to acquire lock"))?;
|
||||||
|
server_impl.unregister(request, &ip)?;
|
||||||
|
server_impl.cleanup()?;
|
||||||
Ok((Response{status_code: 200, ..Response::empty_204()}, true))
|
Ok((Response{status_code: 200, ..Response::empty_204()}, true))
|
||||||
},
|
},
|
||||||
_ => Ok((Response::empty_404(), false)),
|
_ => Ok((Response::empty_404(), false)),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn cleanup_worker(server_impl: &Mutex<Server>) -> Result<()> {
|
||||||
|
let mut server_impl = server_impl.lock().map_err(|_| anyhow!("failed to acquire lock"))?;
|
||||||
|
server_impl.cleanup()?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use std::collections::HashSet;
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
use std::time::Instant;
|
||||||
use std::{
|
use std::{
|
||||||
fs::{self, File},
|
fs::{self, File},
|
||||||
io::{BufReader, prelude::*},
|
io::{BufReader, prelude::*},
|
||||||
@@ -9,6 +10,8 @@ use std::{
|
|||||||
pub struct Rewriter {
|
pub struct Rewriter {
|
||||||
source: String,
|
source: String,
|
||||||
replacements: HashSet<String>,
|
replacements: HashSet<String>,
|
||||||
|
// When each replacement should be cleaned up
|
||||||
|
replacement_cleanup: HashMap<String, Instant>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Rewriter {
|
impl Rewriter {
|
||||||
@@ -16,15 +19,18 @@ impl Rewriter {
|
|||||||
Self {
|
Self {
|
||||||
source,
|
source,
|
||||||
replacements: HashSet::new(),
|
replacements: HashSet::new(),
|
||||||
|
replacement_cleanup: HashMap::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_replacement(&mut self, replacement: String) {
|
pub fn add_replacement(&mut self, replacement: String, cleanup: Instant) {
|
||||||
self.replacements.insert(replacement);
|
self.replacements.insert(replacement.clone());
|
||||||
|
self.replacement_cleanup.insert(replacement, cleanup);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn remove_replacement(&mut self, replacement: &str) {
|
pub fn remove_replacement(&mut self, replacement: &str) {
|
||||||
self.replacements.remove(replacement);
|
self.replacements.remove(replacement);
|
||||||
|
self.replacement_cleanup.remove(replacement);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn rewrite_folder(&self, src: &str, dst: &str) -> Result<()> {
|
pub fn rewrite_folder(&self, src: &str, dst: &str) -> Result<()> {
|
||||||
@@ -91,6 +97,20 @@ impl Rewriter {
|
|||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn cleanup(&mut self) {
|
||||||
|
let now = Instant::now();
|
||||||
|
let mut to_remove = vec![];
|
||||||
|
for (name, when) in self.replacement_cleanup.iter() {
|
||||||
|
if when < &now {
|
||||||
|
to_remove.push(name.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for name in to_remove {
|
||||||
|
self.remove_replacement(&name);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn contains(needle: &[u8], haystack: &[u8]) -> Option<(usize, usize)> {
|
fn contains(needle: &[u8], haystack: &[u8]) -> Option<(usize, usize)> {
|
||||||
@@ -110,6 +130,8 @@ fn contains(needle: &[u8], haystack: &[u8]) -> Option<(usize, usize)> {
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use include_directory::{Dir, include_directory};
|
use include_directory::{Dir, include_directory};
|
||||||
use tempdir::TempDir;
|
use tempdir::TempDir;
|
||||||
|
|
||||||
@@ -123,10 +145,11 @@ mod tests {
|
|||||||
let src = testdata.path().join("testsrc");
|
let src = testdata.path().join("testsrc");
|
||||||
let dst = TempDir::new("").unwrap();
|
let dst = TempDir::new("").unwrap();
|
||||||
|
|
||||||
|
let now = Instant::now();
|
||||||
let mut rewriter = Rewriter::new("to_be_replaced".into());
|
let mut rewriter = Rewriter::new("to_be_replaced".into());
|
||||||
rewriter.add_replacement("abc".into());
|
rewriter.add_replacement("abc".into(), now.checked_add(Duration::new(60*60*24, 0)).unwrap());
|
||||||
rewriter.add_replacement("def".into());
|
rewriter.add_replacement("def".into(), now);
|
||||||
rewriter.add_replacement("zyx".into());
|
rewriter.add_replacement("zyx".into(), now);
|
||||||
rewriter.remove_replacement("zyx");
|
rewriter.remove_replacement("zyx");
|
||||||
rewriter
|
rewriter
|
||||||
.rewrite_folder(
|
.rewrite_folder(
|
||||||
@@ -139,5 +162,20 @@ mod tests {
|
|||||||
assert!(
|
assert!(
|
||||||
dir_diff::is_different(testdata.path().join("testdst"), dst.path()).unwrap() == false
|
dir_diff::is_different(testdata.path().join("testdst"), dst.path()).unwrap() == false
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Trigger the cleanup, which should GC abc
|
||||||
|
let dst = TempDir::new("").unwrap();
|
||||||
|
rewriter.cleanup();
|
||||||
|
rewriter
|
||||||
|
.rewrite_folder(
|
||||||
|
src.as_os_str().to_str().unwrap(),
|
||||||
|
dst.path().as_os_str().to_str().unwrap(),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Validate that everything matches
|
||||||
|
assert!(
|
||||||
|
dir_diff::is_different(testdata.path().join("testdst_after_gc"), dst.path()).unwrap() == false
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,9 +1,13 @@
|
|||||||
use std::{fs, path::Path};
|
use std::{
|
||||||
|
fs,
|
||||||
|
path::Path,
|
||||||
|
time::{Duration, Instant},
|
||||||
|
};
|
||||||
|
|
||||||
|
use anyhow::{anyhow, Context, Result};
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use log::info;
|
use log::info;
|
||||||
use rouille::Request;
|
use rouille::Request;
|
||||||
use anyhow::{Context, Result};
|
|
||||||
|
|
||||||
use crate::Rewriter;
|
use crate::Rewriter;
|
||||||
|
|
||||||
@@ -18,7 +22,12 @@ pub struct Server {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Server {
|
impl Server {
|
||||||
pub fn new(rewriter: Rewriter, workspace_dir: String, template_dir: String, config_dir: String) -> Self {
|
pub fn new(
|
||||||
|
rewriter: Rewriter,
|
||||||
|
workspace_dir: String,
|
||||||
|
template_dir: String,
|
||||||
|
config_dir: String,
|
||||||
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
rewriter,
|
rewriter,
|
||||||
workspace_dir,
|
workspace_dir,
|
||||||
@@ -27,9 +36,17 @@ impl Server {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn cleanup(&mut self) -> Result<()> {
|
||||||
|
self.rewriter.cleanup();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn register(&mut self, _request: &Request, ip: &str) -> Result<()> {
|
pub fn register(&mut self, _request: &Request, ip: &str) -> Result<()> {
|
||||||
info!("Registering {} as a handler", ip);
|
info!("Registering {} as a handler", ip);
|
||||||
self.rewriter.add_replacement(ip.to_string());
|
let cleanup_time = Instant::now()
|
||||||
|
.checked_add(Duration::from_secs(60 * 5))
|
||||||
|
.ok_or(anyhow!("failed to convert time"))?;
|
||||||
|
self.rewriter.add_replacement(ip.to_string(), cleanup_time);
|
||||||
self.generate_config()?;
|
self.generate_config()?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -49,11 +66,13 @@ impl Server {
|
|||||||
let path = Path::new(&self.workspace_dir).join(&now.format("%Y/%m/%d/%s").to_string());
|
let path = Path::new(&self.workspace_dir).join(&now.format("%Y/%m/%d/%s").to_string());
|
||||||
let path = path.as_os_str().to_str().unwrap();
|
let path = path.as_os_str().to_str().unwrap();
|
||||||
fs::create_dir_all(path).with_context(|| "creating directory")?;
|
fs::create_dir_all(path).with_context(|| "creating directory")?;
|
||||||
self.rewriter.rewrite_folder(&self.template_dir, path).with_context(|| "generating configs")?;
|
self.rewriter
|
||||||
|
.rewrite_folder(&self.template_dir, path)
|
||||||
|
.with_context(|| "generating configs")?;
|
||||||
// Finally, symlink it to the output folder; only support Linux for now
|
// Finally, symlink it to the output folder; only support Linux for now
|
||||||
let symlink = Path::new(&self.workspace_dir).join("symlink.tmp");
|
let symlink = Path::new(&self.workspace_dir).join("symlink.tmp");
|
||||||
std::os::unix::fs::symlink(path, &symlink).with_context(|| "creating symlink")?;
|
std::os::unix::fs::symlink(path, &symlink).with_context(|| "creating symlink")?;
|
||||||
fs::rename(symlink, &self.config_dir).with_context(|| "renaming symlink")?;
|
fs::rename(symlink, &self.config_dir).with_context(|| "renaming symlink")?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
4
src/testdata/testdst_after_gc/hello.txt
vendored
Normal file
4
src/testdata/testdst_after_gc/hello.txt
vendored
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
This is a line
|
||||||
|
This is abc line
|
||||||
|
|
||||||
|
This is another line
|
||||||
3
src/testdata/testdst_after_gc/recursive/world.txt
vendored
Normal file
3
src/testdata/testdst_after_gc/recursive/world.txt
vendored
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
This is a abc line.
|
||||||
|
|
||||||
|
In a nested directory.
|
||||||
Reference in New Issue
Block a user