#!/usr/bin/env python3 import argparse import sqlite3 import time import sys from argon2 import PasswordHasher, Type from pathlib import Path DEFAULT_DB = "/var/lib/mailcloak/state.db" def connect(db_path: str, create: bool = False): db_file = Path(db_path).expanduser().resolve() if not create and not db_file.exists(): raise SystemExit(f"sqlite db not found at {db_file}; create it with mailcloakctl init") if create: db_file.parent.mkdir(parents=True, exist_ok=True) con = sqlite3.connect(str(db_file)) con.execute("PRAGMA foreign_keys=ON;") con.execute("PRAGMA journal_mode=WAL;") con.execute("PRAGMA synchronous=NORMAL;") con.executescript(""" CREATE TABLE IF NOT EXISTS aliases ( alias_email TEXT PRIMARY KEY, username TEXT NOT NULL, enabled INTEGER NOT NULL DEFAULT 1, updated_at INTEGER NOT NULL DEFAULT (strftime('%s','now')) ); CREATE INDEX IF NOT EXISTS idx_aliases_username ON aliases(username); CREATE TABLE IF NOT EXISTS apps ( app_id TEXT PRIMARY KEY, secret_hash TEXT NOT NULL, enabled INTEGER NOT NULL DEFAULT 1, created_at INTEGER NOT NULL ); CREATE TABLE IF NOT EXISTS app_from ( app_id TEXT NOT NULL, from_addr TEXT NOT NULL, enabled INTEGER NOT NULL DEFAULT 1, PRIMARY KEY (app_id, from_addr), FOREIGN KEY (app_id) REFERENCES apps(app_id) ON DELETE CASCADE ); """ ) return con def norm_email(s: str) -> str: return s.strip().lower() def norm_id(s: str) -> str: return s.strip() argon2_hasher = PasswordHasher( time_cost=3, memory_cost=65536, parallelism=1, hash_len=32, salt_len=16, type=Type.ID, ) def cmd_aliases_add(con, alias_email, username): alias_email = norm_email(alias_email) username = username.strip() now = int(time.time()) con.execute( "INSERT INTO aliases(alias_email, username, enabled, updated_at) VALUES(?,?,1,?) " "ON CONFLICT(alias_email) DO UPDATE SET username=excluded.username, enabled=1, updated_at=excluded.updated_at", (alias_email, username, now), ) con.commit() def cmd_aliases_del(con, alias_email): alias_email = norm_email(alias_email) con.execute("DELETE FROM aliases WHERE alias_email=?", (alias_email,)) con.commit() def cmd_aliases_disable(con, alias_email): alias_email = norm_email(alias_email) con.execute("UPDATE aliases SET enabled=0, updated_at=? WHERE alias_email=?", (int(time.time()), alias_email)) con.commit() def cmd_aliases_list(con, username=None): if username: rows = con.execute( "SELECT alias_email, username, enabled, updated_at FROM aliases WHERE username=? ORDER BY alias_email", (username,), ).fetchall() else: rows = con.execute( "SELECT alias_email, username, enabled, updated_at FROM aliases ORDER BY username, alias_email" ).fetchall() for a,u,en,ts in rows: print(f"{a}\t{u}\t{'enabled' if en else 'disabled'}\t{ts}") def cmd_apps_add(con, app_id, password): app_id = norm_id(app_id) secret_hash = f"{{ARGON2ID}}{argon2_hasher.hash(password)}" now = int(time.time()) con.execute( "INSERT INTO apps(app_id, secret_hash, enabled, created_at) VALUES(?,?,1,?) " "ON CONFLICT(app_id) DO UPDATE SET secret_hash=excluded.secret_hash, enabled=1, created_at=excluded.created_at", (app_id, secret_hash, now), ) con.commit() def cmd_apps_del(con, app_id): app_id = norm_id(app_id) con.execute("DELETE FROM apps WHERE app_id=?", (app_id,)) con.commit() def cmd_apps_allow(con, app_id, from_addr): app_id = norm_id(app_id) from_addr = norm_email(from_addr) con.execute( "INSERT INTO app_from(app_id, from_addr, enabled) VALUES(?,?,1) " "ON CONFLICT(app_id, from_addr) DO UPDATE SET enabled=1", (app_id, from_addr), ) con.commit() def cmd_apps_disallow(con, app_id, from_addr): app_id = norm_id(app_id) from_addr = norm_email(from_addr) con.execute("DELETE FROM app_from WHERE app_id=? AND from_addr=?", (app_id, from_addr)) con.commit() def cmd_apps_list(con): rows = con.execute( "SELECT app_id, enabled, created_at FROM apps ORDER BY app_id" ).fetchall() for app_id,en,ts in rows: print(f"{app_id}\t{'enabled' if en else 'disabled'}\t{ts}") from_rows = con.execute( "SELECT from_addr, enabled FROM app_from WHERE app_id=? ORDER BY from_addr", (app_id,), ).fetchall() if from_rows: parts = [f"{addr}" if en else f"{addr} (disabled)" for addr, en in from_rows] print("\t\t" + ", ".join(parts)) else: print("\t\t-") def cmd_init(db_path: str): con = connect(db_path, create=True) con.close() def main(): ap = argparse.ArgumentParser() ap.add_argument("--db", default=DEFAULT_DB) sub = ap.add_subparsers(dest="group", required=True) p_init = sub.add_parser("init") aliases = sub.add_parser("aliases") aliases_sub = aliases.add_subparsers(dest="cmd", required=True) p_add = aliases_sub.add_parser("add") p_add.add_argument("alias_email") p_add.add_argument("username") p_del = aliases_sub.add_parser("del") p_del.add_argument("alias_email") p_dis = aliases_sub.add_parser("disable") p_dis.add_argument("alias_email") p_ls = aliases_sub.add_parser("list") p_ls.add_argument("--user", default=None) apps = sub.add_parser("apps") apps_sub = apps.add_subparsers(dest="cmd", required=True) p_app_add = apps_sub.add_parser("add") p_app_add.add_argument("app_id") p_app_add.add_argument("password") p_app_del = apps_sub.add_parser("del") p_app_del.add_argument("app_id") p_app_allow = apps_sub.add_parser("allow") p_app_allow.add_argument("app_id") p_app_allow.add_argument("from_addr") p_app_disallow = apps_sub.add_parser("disallow") p_app_disallow.add_argument("app_id") p_app_disallow.add_argument("from_addr") p_app_ls = apps_sub.add_parser("list") args = ap.parse_args() if args.group == "init": cmd_init(args.db) return con = connect(args.db) try: if args.group == "aliases": if args.cmd == "add": cmd_aliases_add(con, args.alias_email, args.username) elif args.cmd == "del": cmd_aliases_del(con, args.alias_email) elif args.cmd == "disable": cmd_aliases_disable(con, args.alias_email) elif args.cmd == "list": cmd_aliases_list(con, args.user) elif args.group == "apps": if args.cmd == "add": cmd_apps_add(con, args.app_id, args.password) elif args.cmd == "del": cmd_apps_del(con, args.app_id) elif args.cmd == "allow": cmd_apps_allow(con, args.app_id, args.from_addr) elif args.cmd == "disallow": cmd_apps_disallow(con, args.app_id, args.from_addr) elif args.cmd == "list": cmd_apps_list(con) finally: con.close() if __name__ == "__main__": sys.exit(main())