Compare commits

...

2 Commits

Author SHA1 Message Date
eac109379e add: implement basic server 2025-03-30 00:12:34 -07:00
092658b039 add: rewriter logic and tests 2025-03-30 00:11:06 -07:00
12 changed files with 1872 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/target

1556
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

15
Cargo.toml Normal file
View File

@@ -0,0 +1,15 @@
[package]
name = "skubelb"
version = "0.1.0"
edition = "2024"
[dependencies]
anyhow = "1.0.97"
chrono = "0.4.40"
clap = { version = "4.5.34", features = ["derive"] }
dir-diff = "0.3.3"
env_logger = "0.11.7"
include_directory = "0.1.1"
log = "0.4.27"
rouille = "3.6.2"
tempdir = "0.3.7"

View File

@@ -0,0 +1,19 @@
# A simple Kubernetes load balancer
Configures nginx to forward connections to your node IPs.
Services should be declared as NodePort, which means that they
open a port on all nodes. When the request lands on any node,
it is forwarded to the correct pod via the network mesh kubernetes
is using. In theory, there is one a hop penalty.
But lets be honest. You're running with a single LB, probably a GCE free
tier N1 VM. That extra hop doesn't matter.
## Config
Configure nginx to do what you want, test it. Use any Node IP for your testing.
This tool accepts an argument (rewrite_string) which will be replaced using these rules:
1. For each line that contains rewrite string:
2. Copy the line once per node IP, replacing the string with host IPs

5
src/lib.rs Normal file
View File

@@ -0,0 +1,5 @@
mod rewriter;
mod server;
pub use rewriter::*;
pub use server::*;

84
src/main.rs Normal file
View File

@@ -0,0 +1,84 @@
use std::sync::Mutex;
use clap::Parser;
use skubelb::Rewriter;
use skubelb::Server;
use env_logger::Env;
use log::{info, warn};
use anyhow::Result;
use rouille::{router, Request, Response};
/// Implements a HTTP server which allows clients to 'register'
/// themselves. Their IP address will be used to replace a
/// needle in a set of config files. This is intended to be
/// used as a low-cost way of enabling Kubernetes ingress
/// using nginx running on a machine that has a public port.
///
/// The needle is expected to be a dummy IP address; something
/// fairly unique. The goal is to replace nginx files, where
/// we often repeat lines if we want nginx to load balance between
/// multiple destination IPs.
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
/// The needle that will be replaced. Anytime a line
/// is encountered with this needle, the line is dropped
/// and instead N lines (one per replacement) is added to
/// the output.
#[arg(short, long)]
rewrite_string: String,
/// The folder which contains the templates that
/// will be be searched for the needle.
#[arg(short, long)]
template_dir: String,
/// The symlink that should be updated each time the config changes.
///
/// Symlinks are used because file updates are not atomic.
#[arg(short, long)]
config_symlink: String,
/// Where to actually store the generated configs.
#[arg(short, long)]
workspace_dir: String,
#[arg(short, long, default_value_t = String::from("0.0.0.0:8080"))]
listen: String,
}
fn main() {
// Log info and above by default
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
let args = Args::parse();
let rewriter = Rewriter::new(args.rewrite_string);
let server_impl = Mutex::new(Server::new(rewriter, args.workspace_dir, args.template_dir, args.config_symlink));
rouille::start_server(args.listen, move |request| {
info!("Processing request: {:?}", request);
match handle(request, &server_impl) {
Ok(resp) => resp,
Err(e) => {
warn!("{:?}", e);
Response{status_code: 500, ..Response::empty_400()}
}
}
});
}
fn handle(request: &Request, server_impl: &Mutex<Server>) -> Result<Response> {
router!(request,
(POST) (/register) => {
server_impl.lock().unwrap().register(request)?;
Ok(Response{status_code: 200, ..Response::empty_204()})
},
(DELETE) (/register) => {
server_impl.lock().unwrap().unregister(request)?;
Ok(Response{status_code: 200, ..Response::empty_204()})
},
_ => Ok(Response::empty_404()),
)
}

117
src/rewriter.rs Normal file
View File

@@ -0,0 +1,117 @@
use anyhow::Result;
use std::collections::HashSet;
use std::path::Path;
use std::{
fs::{self, File},
io::{BufReader, prelude::*},
};
pub struct Rewriter {
source: String,
replacements: HashSet<String>,
}
impl Rewriter {
pub fn new(source: String) -> Self {
Self {
source,
replacements: HashSet::new(),
}
}
pub fn add_replacement(&mut self, replacement: String) {
self.replacements.insert(replacement);
}
pub fn remove_replacement(&mut self, replacement: &str) {
self.replacements.remove(replacement);
}
pub fn rewrite_folder(&self, src: &str, dst: &str) -> Result<()> {
// Make sure we are deterministic; construct a list of strings and sort
// them
let mut replacements: Vec<String> = self.replacements.iter().map(|s| s.clone()).collect();
replacements.sort();
let dst_base = Path::new(dst);
let mut to_visit = vec![String::from(src)];
while to_visit.len() > 0 {
let dir = to_visit.pop().unwrap();
for entry in fs::read_dir(&dir)? {
let entry = entry?;
let metadata = entry.metadata()?;
// We need to find the destination this should be written into
// First, calculate the 'relative' path after we trim the
// the src prefix, then join that with the dst prefix
let src_path = entry.path();
let src_part_path = src_path.strip_prefix(&src)?;
let dst_path = dst_base.join(src_part_path);
if metadata.is_dir() {
// Create the directory, then carry on. Note that we explore the
// src_path after creating dst_path.
fs::create_dir(&dst_path)?;
to_visit.push(src_path.into_os_string().into_string().unwrap());
continue;
}
// Open 2 files; one to read and translate, and one to write.
let source_file = File::open(&src_path)?;
let mut dest_file = File::create(&dst_path)?;
let reader = BufReader::new(source_file);
for line in reader.lines() {
let line = line?;
// If the line is not subject to replacement, copy it and
// carry on.
if !line.contains(&self.source) {
writeln!(dest_file, "{}", line)?;
continue;
}
// Else, repeat the line multiple times, replacing the string
// in question
for replacement in &replacements {
let new_line = line.replace(&self.source, &replacement);
writeln!(dest_file, "{}", new_line)?;
}
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use include_directory::{Dir, include_directory};
use tempdir::TempDir;
use super::Rewriter;
static TEST_FILES: Dir<'_> = include_directory!("$CARGO_MANIFEST_DIR/src/testdata");
#[test]
fn basic_test() {
let testdata = TempDir::new("").unwrap();
TEST_FILES.extract(testdata.path()).unwrap();
let src = testdata.path().join("testsrc");
let dst = TempDir::new("").unwrap();
let mut rewriter = Rewriter::new("to_be_replaced".into());
rewriter.add_replacement("abc".into());
rewriter.add_replacement("def".into());
rewriter.add_replacement("zyx".into());
rewriter.remove_replacement("zyx");
rewriter
.rewrite_folder(
src.as_os_str().to_str().unwrap(),
dst.path().as_os_str().to_str().unwrap(),
)
.unwrap();
// Validate that everything matches
assert!(
dir_diff::is_different(testdata.path().join("testdst"), dst.path()).unwrap() == false
);
}
}

59
src/server.rs Normal file
View File

@@ -0,0 +1,59 @@
use std::{fs, path::Path, time::Instant};
use chrono::{Datelike, Utc};
use log::info;
use rouille::Request;
use anyhow::{Context, Result};
use crate::Rewriter;
pub struct Server {
rewriter: Rewriter,
// Where we write temporary files
workspace_dir: String,
// Directory to read configs from
template_dir: String,
// The symlink that is updated
config_dir: String,
}
impl Server {
pub fn new(rewriter: Rewriter, workspace_dir: String, template_dir: String, config_dir: String) -> Self {
Self {
rewriter,
workspace_dir,
template_dir,
config_dir,
}
}
pub fn register(&mut self, request: &Request) -> Result<()> {
let ip = request.remote_addr().ip().to_string();
info!("Registering {} as a handler", ip);
self.rewriter.add_replacement(ip);
self.generate_config()?;
Ok(())
}
pub fn unregister(&mut self, request: &Request) -> Result<()> {
let ip = request.remote_addr().ip().to_string();
info!("Deregistering {} as a handler", ip);
self.rewriter.remove_replacement(&ip);
Ok(())
}
pub fn generate_config(&self) -> Result<()> {
// Create a new directory in our workspace
let now = Utc::now();
// Writes into 2020/01/01/<unix timestamp>
// This will fail if we have multiple requests per second
let path = Path::new(&self.workspace_dir).join(&now.format("%Y/%m/%d/%s").to_string());
let path = path.as_os_str().to_str().unwrap();
fs::create_dir_all(path).with_context(|| "creating directory")?;
self.rewriter.rewrite_folder(&self.template_dir, path).with_context(|| "generating configs")?;
// Finally, symlink it to the output folder; only support Linux for now
fs::remove_file(&self.config_dir).with_context(|| "removing old symlink")?;
std::os::unix::fs::symlink(path, &self.config_dir).with_context(|| "updating symlink")?;
Ok(())
}
}

5
src/testdata/testdst/hello.txt vendored Normal file
View File

@@ -0,0 +1,5 @@
This is a line
This is abc line
This is def line
This is another line

View File

@@ -0,0 +1,4 @@
This is a abc line.
This is a def line.
In a nested directory.

4
src/testdata/testsrc/hello.txt vendored Normal file
View File

@@ -0,0 +1,4 @@
This is a line
This is to_be_replaced line
This is another line

View File

@@ -0,0 +1,3 @@
This is a to_be_replaced line.
In a nested directory.