#!/usr/bin/env python3 import json import mimetypes import os import sqlite3 from datetime import datetime, timezone from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from urllib.parse import urlparse ROOT = Path(__file__).resolve().parent DATA_DIR = Path(os.environ.get("BINDVAULT_DATA_DIR", ROOT / "data")) DB_PATH = Path(os.environ.get("BINDVAULT_DB", DATA_DIR / "bindvault.sqlite3")) HOST = os.environ.get("BINDVAULT_HOST", "0.0.0.0") PORT = int(os.environ.get("BINDVAULT_PORT", "8080")) COLLECTIONS = ("phones", "emails", "domains", "accounts", "bindings", "incidents") def now_iso(): return datetime.now(timezone.utc).isoformat() def connect(): DATA_DIR.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") return conn def init_db(): with connect() as conn: conn.executescript( """ CREATE TABLE IF NOT EXISTS records ( collection TEXT NOT NULL, id TEXT NOT NULL, payload TEXT NOT NULL, created_at TEXT NOT NULL, updated_at TEXT NOT NULL, PRIMARY KEY (collection, id) ); CREATE INDEX IF NOT EXISTS idx_records_collection ON records(collection); """ ) def load_state(): state = {name: [] for name in COLLECTIONS} with connect() as conn: rows = conn.execute( "SELECT collection, payload FROM records ORDER BY updated_at DESC" ).fetchall() for row in rows: if row["collection"] not in state: continue state[row["collection"]].append(json.loads(row["payload"])) return state def replace_state(state): timestamp = now_iso() with connect() as conn: conn.execute("BEGIN") conn.execute("DELETE FROM records") for collection in COLLECTIONS: for record in state.get(collection, []): record_id = str(record.get("id") or "").strip() if not record_id: continue created_at = str(record.get("created_at") or timestamp) updated_at = str(record.get("updated_at") or timestamp) conn.execute( """ INSERT INTO records(collection, id, payload, created_at, updated_at) VALUES (?, ?, ?, ?, ?) """, ( collection, record_id, json.dumps(record, ensure_ascii=False, separators=(",", ":")), created_at, updated_at, ), ) conn.commit() class BindVaultHandler(BaseHTTPRequestHandler): server_version = "BindVaultMVP/0.1" def do_GET(self): path = urlparse(self.path).path if path == "/api/health": self.write_json({"ok": True, "database": str(DB_PATH)}) return if path == "/api/state": self.write_json({"data": load_state(), "database": str(DB_PATH)}) return self.serve_static(path) def do_PUT(self): path = urlparse(self.path).path if path != "/api/state": self.write_json({"error": "Not found"}, status=404) return try: body = self.read_json() incoming = body.get("data", body) state = {name: incoming.get(name, []) for name in COLLECTIONS} replace_state(state) self.write_json({"ok": True, "data": load_state()}) except (json.JSONDecodeError, TypeError, ValueError) as exc: self.write_json({"error": f"Invalid JSON: {exc}"}, status=400) def do_OPTIONS(self): self.send_response(204) self.send_header("Access-Control-Allow-Origin", "*") self.send_header("Access-Control-Allow-Methods", "GET, PUT, OPTIONS") self.send_header("Access-Control-Allow-Headers", "Content-Type") self.end_headers() def read_json(self): length = int(self.headers.get("Content-Length", "0")) raw = self.rfile.read(length).decode("utf-8") return json.loads(raw or "{}") def write_json(self, payload, status=200): data = json.dumps(payload, ensure_ascii=False).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(data))) self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() self.wfile.write(data) def serve_static(self, path): if path == "/": path = "/index.html" target = (ROOT / path.lstrip("/")).resolve() if ROOT not in target.parents and target != ROOT: self.write_json({"error": "Forbidden"}, status=403) return if not target.is_file(): self.write_json({"error": "Not found"}, status=404) return content_type = mimetypes.guess_type(target.name)[0] or "application/octet-stream" data = target.read_bytes() self.send_response(200) self.send_header("Content-Type", content_type) self.send_header("Content-Length", str(len(data))) self.end_headers() self.wfile.write(data) def log_message(self, fmt, *args): print(f"{self.address_string()} - {fmt % args}") if __name__ == "__main__": init_db() print(f"BindVault listening on http://{HOST}:{PORT}") print(f"SQLite database: {DB_PATH}") ThreadingHTTPServer((HOST, PORT), BindVaultHandler).serve_forever()