#!/usr/bin/env python3
"""
image_deduper.py - find and clean up byte-identical duplicate images.

Features:
- scan:   walk directories recursively, compute sha256 (optionally a pHash)
          and store the results in a SQLite index
- report: write a CSV that lists every duplicate next to its canonical file
- apply:  replace duplicates with hard links to the canonical file, or
          delete them
- optional: ``--update-db`` is a placeholder hook for updating path
          references in a project database; it only prints a notice

Usage (examples):
    # 1) Scan + report only (no changes to the images):
    python tools/image_deduper.py scan --roots media,assets/images --out-dir .dedupe --phash

    # 2) Regenerate / show the report:
    python tools/image_deduper.py report --index .dedupe/index.sqlite --csv

    # 3) Apply (set hard links, dry run):
    python tools/image_deduper.py apply --index .dedupe/index.sqlite --mode hardlink --dry-run

    # 4) Apply (really change files):
    python tools/image_deduper.py apply --index .dedupe/index.sqlite --mode hardlink

    # 5) Request the (project specific) DB reference update:
    python tools/image_deduper.py apply --index .dedupe/index.sqlite --update-db sqlite:///./rssbot.db --dry-run
"""

import argparse
import csv
import hashlib
import itertools
import os
import sqlite3
import sys
from contextlib import closing, suppress
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, List, Optional, Tuple

try:
    from PIL import Image
except ImportError:
    Image = None

try:
    import imagehash  # type: ignore
except ImportError:
    imagehash = None


IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
DEFAULT_INDEX = ".dedupe/index.sqlite"
DEFAULT_REPORT = ".dedupe/report.csv"

# Insert a row, or refresh the existing row for the same path.
_UPSERT_SQL = """
    INSERT INTO files (path, size, mtime, sha256, phash, ext)
    VALUES (?, ?, ?, ?, ?, ?)
    ON CONFLICT(path) DO UPDATE SET
        size=excluded.size,
        mtime=excluded.mtime,
        sha256=excluded.sha256,
        phash=excluded.phash,
        ext=excluded.ext;
"""


def sha256_file(path: Path, bufsize: int = 1024 * 1024) -> str:
    """Return the hex SHA-256 digest of *path*, read in *bufsize* chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(bufsize), b""):
            digest.update(chunk)
    return digest.hexdigest()


def calc_phash(path: Path) -> Optional[str]:
    """Return the perceptual hash of *path* as a string.

    Returns ``None`` when Pillow or imagehash is not installed, or when the
    image cannot be opened or hashed.
    """
    if Image is None or imagehash is None:
        return None
    try:
        with Image.open(path) as im:
            rgb = im.convert("RGB")
            return str(imagehash.phash(rgb, hash_size=16))  # 16x16 -> 256 bit
    except Exception:
        # Deliberate best effort: an unreadable image must not abort a scan;
        # the file is still indexed by its sha256.
        return None


def ensure_dir(p: Path):
    """Create directory *p* including parents; do nothing if it exists."""
    p.mkdir(parents=True, exist_ok=True)


def init_index(db_path: Path):
    """Create the ``files`` table and its indexes in *db_path* if missing."""
    with closing(sqlite3.connect(str(db_path))) as conn:
        with conn:  # commit on success, roll back on error
            conn.execute("""
                CREATE TABLE IF NOT EXISTS files (
                    id INTEGER PRIMARY KEY,
                    path TEXT NOT NULL UNIQUE,
                    size INTEGER NOT NULL,
                    mtime REAL NOT NULL,
                    sha256 TEXT NOT NULL,
                    phash TEXT,
                    ext TEXT NOT NULL
                );
            """)
            conn.execute("CREATE INDEX IF NOT EXISTS idx_sha256 ON files (sha256);")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_phash ON files (phash);")


def is_image(path: Path) -> bool:
    """Return True if the (case-insensitive) suffix of *path* is in IMAGE_EXTS."""
    return path.suffix.lower() in IMAGE_EXTS


def walk_images(roots: List[Path]) -> Iterable[Path]:
    """Yield every regular file with an image suffix below each root."""
    for root in roots:
        for p in root.rglob("*"):
            if p.is_file() and is_image(p):
                yield p


def upsert_file(
    db_path: Path,
    path: Path,
    sha256: str,
    phash: Optional[str],
    conn: Optional[sqlite3.Connection] = None,
):
    """Insert or refresh the index row for *path*.

    Args:
        db_path: SQLite index file; used only when *conn* is not given.
        path: File being indexed; its size and mtime are read via ``stat``.
        sha256: Hex digest of the file content.
        phash: Perceptual hash, or ``None``.
        conn: Optional open connection. When given, the statement runs on it
            and the caller is responsible for committing, so that many rows
            can share one transaction. When omitted, a connection is opened,
            committed and closed for this single row.
    """
    st = path.stat()
    row = (str(path), st.st_size, st.st_mtime, sha256, phash, path.suffix.lower())
    if conn is not None:
        conn.execute(_UPSERT_SQL, row)
        return
    with closing(sqlite3.connect(str(db_path))) as own_conn:
        with own_conn:
            own_conn.execute(_UPSERT_SQL, row)


def group_by_sha256(db_path: Path) -> List[List[Tuple[int, str, int]]]:
    """Return groups of indexed files that share a sha256.

    Each group is a list of ``(id, path, size)`` tuples with at least two
    members. Groups are ordered by sha256 and members by path, so the result
    is deterministic for a given index.
    """
    query = """
        SELECT sha256, id, path, size
        FROM files
        WHERE sha256 IN (
            SELECT sha256 FROM files GROUP BY sha256 HAVING COUNT(*) > 1
        )
        ORDER BY sha256, path;
    """
    with closing(sqlite3.connect(str(db_path))) as conn:
        rows = conn.execute(query).fetchall()
    # Rows arrive sorted by sha256, which is what groupby requires.
    return [
        [(rid, rpath, rsize) for _, rid, rpath, rsize in members]
        for _, members in itertools.groupby(rows, key=lambda r: r[0])
    ]


def write_csv_report(db_path: Path, csv_path: Path) -> Tuple[int, int]:
    """Write the duplicate report for *db_path* to *csv_path*.

    One row is written per duplicate with the columns ``group_id``,
    ``canonical_path``, ``dup_path`` and ``dup_size_bytes``.

    Returns:
        ``(number_of_duplicates, sum_of_duplicate_sizes_in_bytes)``.
    """
    groups = group_by_sha256(db_path)
    ensure_dir(csv_path.parent)
    total_dups = 0
    total_savings = 0
    with csv_path.open("w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["group_id", "canonical_path", "dup_path", "dup_size_bytes"])
        for gid, group in enumerate(groups):
            # Canonical: the largest file; max() keeps the first on ties, and
            # the group is sorted by path, so the choice is deterministic.
            canonical = max(group, key=lambda member: member[2])
            for _rid, path, size in group:
                if path == canonical[1]:
                    continue
                total_dups += 1
                total_savings += size
                w.writerow([gid, canonical[1], path, size])
    return total_dups, total_savings


def apply_hardlink(canonical: Path, dup: Path, dry_run: bool) -> bool:
    """Replace *dup* with a hard link to *canonical*.

    The link is first created under a temporary name next to *dup* and then
    moved over *dup*. If linking fails (for example because both files are on
    different file systems) *dup* is left untouched.

    Returns:
        True if *dup* was replaced (or would be, in a dry run); False if both
        paths already refer to the same file and nothing had to be done.

    Raises:
        FileNotFoundError: *canonical* or *dup* is not an existing file.
        OSError: creating the link or replacing *dup* failed.
    """
    if not canonical.is_file():
        raise FileNotFoundError(f"canonical file not found: {canonical}")
    if not dup.is_file():
        raise FileNotFoundError(f"duplicate file not found: {dup}")
    if dup.samefile(canonical):
        # Already the same file (hard link or path alias): nothing to gain.
        return False
    if dry_run:
        return True
    tmp = dup.with_name(dup.name + ".dedupe.tmp")
    with suppress(FileNotFoundError):
        tmp.unlink()  # leftover of an interrupted earlier run
    os.link(str(canonical), str(tmp))
    try:
        tmp.replace(dup)  # atomic rename over the duplicate
    except OSError:
        with suppress(FileNotFoundError):
            tmp.unlink()
        raise
    return True


def apply_delete(dup: Path, dry_run: bool, canonical: Optional[Path] = None) -> bool:
    """Delete *dup*.

    Args:
        dup: File to remove.
        dry_run: When true, nothing is removed.
        canonical: Optional file that must survive. When given, deletion is
            refused if it does not exist, and skipped if it resolves to the
            same path as *dup* (deleting would remove the only copy).

    Returns:
        True if *dup* was deleted (or would be, in a dry run); False if the
        deletion was skipped because *dup* is the canonical file itself.

    Raises:
        FileNotFoundError: *canonical* was given and it or *dup* is missing,
            or *dup* is missing when it is actually being deleted.
    """
    if canonical is not None:
        if not canonical.is_file():
            raise FileNotFoundError(f"canonical file not found: {canonical}")
        if not dup.is_file():
            raise FileNotFoundError(f"duplicate file not found: {dup}")
        if canonical.resolve() == dup.resolve():
            return False
    if dry_run:
        return True
    dup.unlink()
    return True


@dataclass
class ApplyStats:
    """Counters collected by apply_changes."""

    processed: int = 0    # rows that were (or in a dry run would be) applied
    errors: int = 0       # rows that raised an exception
    saved_bytes: int = 0  # sum of dup_size_bytes over processed rows
    skipped: int = 0      # rows where dup and canonical are already one file


def apply_changes(csv_report: Path, mode: str, dry_run: bool) -> ApplyStats:
    """Apply every row of *csv_report* using *mode*.

    Args:
        csv_report: CSV written by write_csv_report.
        mode: ``"hardlink"`` or ``"delete"``; any other value is counted as
            an error for every row.
        dry_run: When true, no file is modified.

    Returns:
        The collected ApplyStats. Errors on individual rows are printed to
        stderr and counted; they do not stop processing.
    """
    stats = ApplyStats()
    with csv_report.open("r", newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            canonical = Path(row["canonical_path"])
            dup = Path(row["dup_path"])
            size = int(row["dup_size_bytes"])
            try:
                if mode == "hardlink":
                    changed = apply_hardlink(canonical, dup, dry_run)
                elif mode == "delete":
                    changed = apply_delete(dup, dry_run, canonical=canonical)
                else:
                    raise ValueError("mode must be 'hardlink' or 'delete'")
            except Exception as e:
                # Per-row boundary: one bad file must not abort the whole run.
                stats.errors += 1
                print(f"[ERROR] {dup}: {e}", file=sys.stderr)
            else:
                if changed:
                    stats.processed += 1
                    stats.saved_bytes += size
                else:
                    stats.skipped += 1
    return stats


def parse_roots(roots_arg: str) -> List[Path]:
    """Split a comma separated list of roots into paths.

    Raises:
        FileNotFoundError: one of the roots does not exist.
    """
    roots = [Path(part.strip()) for part in roots_arg.split(",") if part.strip()]
    for root in roots:
        if not root.exists():
            raise FileNotFoundError(f"Root not found: {root}")
    return roots


def cmd_scan(args):
    """Handle ``scan``: index all images below the roots and write the report.

    Index and report default to ``index.sqlite`` / ``report.csv`` inside
    ``--out-dir`` unless ``--index`` / ``--report`` are given.
    """
    # Validate the roots first so a typo does not leave empty output behind.
    roots = parse_roots(args.roots)
    out_dir = Path(args.out_dir)
    index = Path(args.index) if args.index else out_dir / "index.sqlite"
    report = Path(args.report) if args.report else out_dir / "report.csv"
    ensure_dir(out_dir)
    ensure_dir(index.parent)
    init_index(index)

    count = 0
    # One connection for the whole scan; commits are batched.
    with closing(sqlite3.connect(str(index))) as conn:
        try:
            for path in walk_images(roots):
                try:
                    h = sha256_file(path)
                    ph = calc_phash(path) if args.phash else None
                    upsert_file(index, path, h, ph, conn=conn)
                    count += 1
                    if count % 500 == 0:
                        conn.commit()
                        print(f"... indexed {count} files")
                except Exception as e:
                    # Best effort: skip files that cannot be read or stored.
                    print(f"[WARN] {path}: {e}", file=sys.stderr)
        finally:
            conn.commit()

    dups, savings = write_csv_report(index, report)
    print(f"Indexed {count} images. Found duplicate files: {dups}, potential savings: {savings/1_000_000:.2f} MB")
    print(f"Index: {index}")
    print(f"Report: {report}")


def cmd_report(args):
    """Handle ``report``: regenerate the CSV from the index and print totals.

    Raises:
        FileNotFoundError: the index file does not exist.
    """
    index = Path(args.index or DEFAULT_INDEX)
    if not index.exists():
        # sqlite3.connect would silently create an empty database here.
        raise FileNotFoundError(f"Index not found: {index}")
    csv_path = Path(args.report or DEFAULT_REPORT)
    dups, savings = write_csv_report(index, csv_path)
    print(f"Duplicates: {dups}, potential savings: {savings/1_000_000:.2f} MB")
    if args.csv:
        print(f"CSV written: {csv_path}")


def cmd_apply(args):
    """Handle ``apply``: apply the CSV report in the requested mode.

    When ``--index`` is given, the report is first regenerated from that
    index; otherwise the existing report file is used as it is.

    Raises:
        FileNotFoundError: the given index or the report does not exist.
    """
    csv_report = Path(args.report or DEFAULT_REPORT)
    index_arg = getattr(args, "index", None)
    if index_arg:
        index = Path(index_arg)
        if not index.exists():
            raise FileNotFoundError(f"Index not found: {index}")
        write_csv_report(index, csv_report)
    if not csv_report.exists():
        raise FileNotFoundError(f"Report not found: {csv_report}")
    stats = apply_changes(csv_report, args.mode, args.dry_run)
    print(f"Processed: {stats.processed}, Errors: {stats.errors}, Saved: {stats.saved_bytes/1_000_000:.2f} MB (mode={args.mode}, dry_run={args.dry_run})")
    if stats.skipped:
        print(f"Skipped (already the same file): {stats.skipped}")
    if args.update_db:
        # Placeholder: project specific code would update stored image paths
        # so that they point at the canonical file.
        print(f"[INFO] DB update requested for: {args.update_db} (implementierung projektspezifisch)")


def main():
    """Parse the command line and dispatch to the selected sub-command."""
    ap = argparse.ArgumentParser(description="Bild-Deduplizierung (scan/report/apply)")
    sub = ap.add_subparsers(dest="cmd", required=True)

    sc = sub.add_parser("scan", help="Verzeichnisse scannen und Index/Report erstellen")
    sc.add_argument("--roots", required=True, help="Kommagetrennte Wurzelpfade, z.B. 'media,assets/images'")
    sc.add_argument("--out-dir", default=".dedupe", help="Ausgabeverzeichnis für Index/Reports")
    sc.add_argument("--index", help="Pfad zur SQLite-Indexdatei (default .dedupe/index.sqlite)")
    sc.add_argument("--report", help="Pfad zum CSV-Report (default .dedupe/report.csv)")
    sc.add_argument("--phash", action="store_true", help="Perzeptuellen Hash berechnen (für zukünftige Near-Dups)")
    sc.set_defaults(func=cmd_scan)

    rp = sub.add_parser("report", help="Report neu generieren/anzeigen")
    rp.add_argument("--index", help="Pfad zur SQLite-Indexdatei")
    rp.add_argument("--report", help="Pfad zum CSV-Report")
    rp.add_argument("--csv", action="store_true", help="CSV-Pfad ausgeben")
    rp.set_defaults(func=cmd_report)

    aply = sub.add_parser("apply", help="Änderungen anwenden (Hardlink/Delete)")
    aply.add_argument("--index", help="Pfad zur SQLite-Indexdatei; wenn gesetzt, wird der Report vor dem Anwenden neu erzeugt")
    aply.add_argument("--report", help="Pfad zum CSV-Report")
    aply.add_argument("--mode", choices=["hardlink", "delete"], default="hardlink", help="Strategie für Duplikate")
    aply.add_argument("--dry-run", action="store_true", help="Nur anzeigen, nichts ändern")
    aply.add_argument("--update-db", help="Optional: DB-URL für Referenz-Updates (projektspezifisch)")
    aply.set_defaults(func=cmd_apply)

    args = ap.parse_args()
    args.func(args)


if __name__ == "__main__":
    main()