Files
mailcloak/mailcloakctl
peio 57966de4fa
All checks were successful
release / build (amd64, linux) (push) Successful in 1m32s
feat: enhance logging and refactor database handling
2026-01-23 17:54:15 +00:00

226 lines
7.1 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import sqlite3
import time
import sys
from argon2 import PasswordHasher, Type
from pathlib import Path
# Default location of the SQLite state database; used when --db is not given.
DEFAULT_DB = "/var/lib/mailcloak/state.db"
def connect(db_path: str, create: bool = False) -> sqlite3.Connection:
    """Open the SQLite state database and make sure its schema exists.

    Args:
        db_path: Path to the database file. ``~`` is expanded and the path is
            resolved to an absolute one before use.
        create: When true, the parent directory is created if needed and a
            missing database file is allowed (SQLite creates it). When false,
            a missing file is treated as an error.

    Returns:
        An open connection with foreign-key enforcement switched on, WAL
        journaling requested and ``synchronous=NORMAL``. The ``aliases``,
        ``apps`` and ``app_from`` tables are created if they do not exist.

    Raises:
        SystemExit: If ``create`` is false and the database file is missing.
        sqlite3.Error: If configuring the connection or creating the schema
            fails; the connection is closed before the error propagates.
    """
    db_file = Path(db_path).expanduser().resolve()
    if not create and not db_file.exists():
        raise SystemExit(f"sqlite db not found at {db_file}; create it with mailcloakctl init")
    if create:
        db_file.parent.mkdir(parents=True, exist_ok=True)
    con = sqlite3.connect(str(db_file))
    try:
        con.execute("PRAGMA foreign_keys=ON;")
        con.execute("PRAGMA journal_mode=WAL;")
        con.execute("PRAGMA synchronous=NORMAL;")
        # The schema text is kept flush-left so the SQL handed to SQLite is
        # exactly what it was before.
        con.executescript("""
CREATE TABLE IF NOT EXISTS aliases (
alias_email TEXT PRIMARY KEY,
username TEXT NOT NULL,
enabled INTEGER NOT NULL DEFAULT 1,
updated_at INTEGER NOT NULL DEFAULT (strftime('%s','now'))
);
CREATE INDEX IF NOT EXISTS idx_aliases_username ON aliases(username);
CREATE TABLE IF NOT EXISTS apps (
app_id TEXT PRIMARY KEY,
secret_hash TEXT NOT NULL,
enabled INTEGER NOT NULL DEFAULT 1,
created_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS app_from (
app_id TEXT NOT NULL,
from_addr TEXT NOT NULL,
enabled INTEGER NOT NULL DEFAULT 1,
PRIMARY KEY (app_id, from_addr),
FOREIGN KEY (app_id) REFERENCES apps(app_id) ON DELETE CASCADE
);
"""
        )
    except BaseException:
        # Do not leak the open handle when setup fails (e.g. the file is not
        # a SQLite database); the original error is re-raised unchanged.
        con.close()
        raise
    return con
def norm_email(s: str) -> str:
    """Return *s* with surrounding whitespace removed and letters lowercased."""
    trimmed = s.strip()
    return trimmed.lower()
def norm_id(s: str) -> str:
    """Return *s* with leading and trailing whitespace removed (case is kept)."""
    trimmed = s.strip()
    return trimmed
# Shared Argon2 hasher used to hash app passwords before they are stored
# (see cmd_apps_add). Parameter meanings follow argon2-cffi's PasswordHasher.
argon2_hasher = PasswordHasher(
    time_cost=3,  # number of passes over memory
    memory_cost=65536,  # in KiB per the argon2-cffi docs, i.e. 64 MiB
    parallelism=1,  # number of lanes/threads
    hash_len=32,  # length of the raw hash in bytes
    salt_len=16,  # length of the random salt in bytes
    type=Type.ID,  # Argon2id variant
)
def cmd_aliases_add(con, alias_email, username):
    """Insert an alias for *username*, or refresh it if the alias already exists.

    The alias address is normalized with ``norm_email`` and the username is
    stripped of surrounding whitespace. An existing row for the same alias is
    overwritten: its username is replaced, it is re-enabled and ``updated_at``
    is set to the current Unix time. The change is committed immediately.
    """
    params = (norm_email(alias_email), username.strip(), int(time.time()))
    upsert_sql = (
        "INSERT INTO aliases(alias_email, username, enabled, updated_at) VALUES(?,?,1,?) "
        "ON CONFLICT(alias_email) DO UPDATE SET username=excluded.username, enabled=1, updated_at=excluded.updated_at"
    )
    con.execute(upsert_sql, params)
    con.commit()
def cmd_aliases_del(con, alias_email):
    """Delete the alias row for *alias_email* (normalized) and commit.

    No error is raised when no such alias exists.
    """
    target = norm_email(alias_email)
    delete_sql = "DELETE FROM aliases WHERE alias_email=?"
    con.execute(delete_sql, (target,))
    con.commit()
def cmd_aliases_disable(con, alias_email):
    """Mark the alias *alias_email* as disabled without deleting it.

    Sets ``enabled=0`` and stamps ``updated_at`` with the current Unix time,
    then commits. A non-existent alias is silently left as is.
    """
    target = norm_email(alias_email)
    stamp = int(time.time())
    disable_sql = "UPDATE aliases SET enabled=0, updated_at=? WHERE alias_email=?"
    con.execute(disable_sql, (stamp, target))
    con.commit()
def cmd_aliases_list(con, username=None):
    """Print aliases as tab-separated lines: alias, username, state, updated_at.

    When *username* is truthy only that user's aliases are printed, ordered by
    alias; otherwise every alias is printed, ordered by username then alias.
    The state column reads ``enabled`` or ``disabled``.
    """
    if not username:
        cursor = con.execute(
            "SELECT alias_email, username, enabled, updated_at FROM aliases ORDER BY username, alias_email"
        )
    else:
        cursor = con.execute(
            "SELECT alias_email, username, enabled, updated_at FROM aliases WHERE username=? ORDER BY alias_email",
            (username,),
        )
    for alias, owner, is_enabled, stamp in cursor.fetchall():
        state = "enabled" if is_enabled else "disabled"
        print(f"{alias}\t{owner}\t{state}\t{stamp}")
def cmd_apps_add(con, app_id, password):
    """Register an app with a hashed password, or reset an existing one.

    *app_id* is stripped via ``norm_id``. The password is hashed with the
    module-level ``argon2_hasher`` and stored with a ``{ARGON2ID}`` prefix.
    If the app already exists its hash is replaced, it is re-enabled and
    ``created_at`` is reset to the current Unix time. Commits immediately.
    """
    key = norm_id(app_id)
    encoded = argon2_hasher.hash(password)
    secret_hash = f"{{ARGON2ID}}{encoded}"
    created = int(time.time())
    upsert_sql = (
        "INSERT INTO apps(app_id, secret_hash, enabled, created_at) VALUES(?,?,1,?) "
        "ON CONFLICT(app_id) DO UPDATE SET secret_hash=excluded.secret_hash, enabled=1, created_at=excluded.created_at"
    )
    con.execute(upsert_sql, (key, secret_hash, created))
    con.commit()
def cmd_apps_del(con, app_id):
    """Delete the app *app_id* (stripped via ``norm_id``) and commit.

    The ``app_from`` table declares ``ON DELETE CASCADE`` on ``app_id``, so
    the app's allowed senders go with it when the connection enforces foreign
    keys (``connect`` turns that on). Deleting an unknown app is a no-op.
    """
    key = norm_id(app_id)
    delete_sql = "DELETE FROM apps WHERE app_id=?"
    con.execute(delete_sql, (key,))
    con.commit()
def cmd_apps_allow(con, app_id, from_addr):
    """Allow app *app_id* to send as *from_addr*.

    The id is stripped and the address normalized with ``norm_email``. An
    existing (app, address) pair is simply re-enabled. Commits immediately.

    Note: with foreign keys enforced on the connection, an *app_id* that is
    not present in ``apps`` makes the insert fail with an sqlite3 integrity
    error, which is not caught here.
    """
    pair = (norm_id(app_id), norm_email(from_addr))
    upsert_sql = (
        "INSERT INTO app_from(app_id, from_addr, enabled) VALUES(?,?,1) "
        "ON CONFLICT(app_id, from_addr) DO UPDATE SET enabled=1"
    )
    con.execute(upsert_sql, pair)
    con.commit()
def cmd_apps_disallow(con, app_id, from_addr):
    """Remove the permission for app *app_id* to send as *from_addr*.

    Both values are normalized the same way as in ``cmd_apps_allow``. The row
    is deleted outright (not just disabled) and the change is committed.
    """
    pair = (norm_id(app_id), norm_email(from_addr))
    delete_sql = "DELETE FROM app_from WHERE app_id=? AND from_addr=?"
    con.execute(delete_sql, pair)
    con.commit()
def cmd_apps_list(con):
    """Print every app followed by a line listing its allowed sender addresses.

    Each app produces ``app_id<TAB>state<TAB>created_at``. The next line is
    indented with two tabs and holds the app's ``from`` addresses, comma
    separated and sorted, with `` (disabled)`` appended to disabled ones, or
    ``-`` when the app has none.
    """
    apps = con.execute(
        "SELECT app_id, enabled, created_at FROM apps ORDER BY app_id"
    ).fetchall()
    for app_id, app_enabled, created in apps:
        state = "enabled" if app_enabled else "disabled"
        print(f"{app_id}\t{state}\t{created}")
        senders = con.execute(
            "SELECT from_addr, enabled FROM app_from WHERE app_id=? ORDER BY from_addr",
            (app_id,),
        ).fetchall()
        if not senders:
            print("\t\t-")
            continue
        labels = []
        for addr, addr_enabled in senders:
            if addr_enabled:
                labels.append(f"{addr}")
            else:
                labels.append(f"{addr} (disabled)")
        print("\t\t" + ", ".join(labels))
def cmd_init(db_path: str):
    """Create the database at *db_path* (and its schema) if it does not exist.

    Delegates to ``connect(..., create=True)`` and closes the connection
    right away; nothing is returned.
    """
    connect(db_path, create=True).close()
def main():
    """Parse the command line and run the selected subcommand.

    Command layout: ``init``, ``aliases {add,del,disable,list}`` and
    ``apps {add,del,allow,disallow,list}``, plus a global ``--db`` option
    that defaults to ``DEFAULT_DB``. ``init`` creates the database; every
    other command opens the existing one and always closes it afterwards.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--db", default=DEFAULT_DB)
    groups = ap.add_subparsers(dest="group", required=True)
    groups.add_parser("init")

    # aliases <cmd> ...: positional arguments per subcommand, in CLI order.
    alias_cmds = groups.add_parser("aliases").add_subparsers(dest="cmd", required=True)
    alias_positionals = (
        ("add", ("alias_email", "username")),
        ("del", ("alias_email",)),
        ("disable", ("alias_email",)),
    )
    for cmd_name, positionals in alias_positionals:
        cmd_parser = alias_cmds.add_parser(cmd_name)
        for arg_name in positionals:
            cmd_parser.add_argument(arg_name)
    alias_cmds.add_parser("list").add_argument("--user", default=None)

    # apps <cmd> ...
    app_cmds = groups.add_parser("apps").add_subparsers(dest="cmd", required=True)
    app_positionals = (
        ("add", ("app_id", "password")),
        ("del", ("app_id",)),
        ("allow", ("app_id", "from_addr")),
        ("disallow", ("app_id", "from_addr")),
        ("list", ()),
    )
    for cmd_name, positionals in app_positionals:
        cmd_parser = app_cmds.add_parser(cmd_name)
        for arg_name in positionals:
            cmd_parser.add_argument(arg_name)

    args = ap.parse_args()

    # init is the only command that may run without an existing database.
    if args.group == "init":
        cmd_init(args.db)
        return

    con = connect(args.db)
    # (group, cmd) -> zero-argument callable running that command.
    handlers = {
        ("aliases", "add"): lambda: cmd_aliases_add(con, args.alias_email, args.username),
        ("aliases", "del"): lambda: cmd_aliases_del(con, args.alias_email),
        ("aliases", "disable"): lambda: cmd_aliases_disable(con, args.alias_email),
        ("aliases", "list"): lambda: cmd_aliases_list(con, args.user),
        ("apps", "add"): lambda: cmd_apps_add(con, args.app_id, args.password),
        ("apps", "del"): lambda: cmd_apps_del(con, args.app_id),
        ("apps", "allow"): lambda: cmd_apps_allow(con, args.app_id, args.from_addr),
        ("apps", "disallow"): lambda: cmd_apps_disallow(con, args.app_id, args.from_addr),
        ("apps", "list"): lambda: cmd_apps_list(con),
    }
    try:
        handler = handlers.get((args.group, args.cmd))
        if handler is not None:
            handler()
    finally:
        con.close()
# Run the CLI only when executed as a script; main()'s return value is passed
# to sys.exit as the exit status.
if __name__ == "__main__":
    sys.exit(main())