diff --git a/tools/image_deduper.py b/tools/image_deduper.py index 715e4e0..8bbec23 100644 --- a/tools/image_deduper.py +++ b/tools/image_deduper.py @@ -2,40 +2,39 @@ """ image_deduper.py — Finde und bereinige Bild-Dubletten sicher & reversibel. -Neu: -- collect: Bild-URLs aus JSON/HTML/Markdown/Text sammeln und lokal in .media_cache/ speichern -- scan/report/apply: wie gehabt (Hardlink/Delete), jetzt standardmäßig mit Cache nutzbar +Funktionen: +- Scan: Verzeichnisse rekursiv scannen, sha256 + pHash berechnen +- Report: CSV + menschenlesbare Zusammenfassung mit Gruppen +- Apply: Duplikate auf kanonische Datei umbiegen (Hardlink/Löschen) +- Optional: DB-Referenzen aktualisieren (SQLite/SQLModel kompatibel) -Workflows: - # A) Nur remote-Referenzen vorhanden → erst sammeln, dann deduplizieren - python tools/image_deduper.py collect --sources processed_articles.json,content/ --cache .media_cache - python tools/image_deduper.py scan --roots .media_cache --out-dir .dedupe --phash - python tools/image_deduper.py apply --report .dedupe/report.csv --mode hardlink --dry-run - python tools/image_deduper.py apply --report .dedupe/report.csv --mode hardlink +Nutzung (Beispiele): + # 1) Nur scannen + reporten (keine Änderungen): + python tools/image_deduper.py scan --roots media,assets/images --out-dir .dedupe --phash - # B) Lokale Bild-Ordner vorhanden - python tools/image_deduper.py scan --roots ./media,./static/images --out-dir .dedupe --phash + # 2) Report anzeigen: + python tools/image_deduper.py report --index .dedupe/index.sqlite --csv -Hinweise: -- "collect" speichert Bilder content-addressed (.) und lädt identische Dateien nicht doppelt. -- Für HTML/Markdown werden Bild-URLs geparst; für JSON werden alle Stringwerte mit Bild-URL-Muster extrahiert. + # 3) Anwenden (Hardlinks setzen, Dry-Run): + python tools/image_deduper.py apply --index .dedupe/index.sqlite --mode hardlink --dry-run + + # 4) Anwenden (wirklich ändern): + python tools/image_deduper.py apply --index .dedupe/index.sqlite --mode hardlink + + # 5) Referenzen in DB aktualisieren (optional): + python tools/image_deduper.py apply --index .dedupe/index.sqlite --update-db sqlite:///./rssbot.db --dry-run """ import argparse import csv import hashlib -import json -import mimetypes import os -import re import sqlite3 import sys from dataclasses import dataclass from pathlib import Path -from typing import Iterable, List, Optional, Tuple, Set -from urllib.parse import urlparse +from typing import Iterable, List, Optional, Tuple -# Optionale Parser/Helpers try: from PIL import Image except ImportError: @@ -46,39 +45,12 @@ try: except ImportError: imagehash = None -try: - import requests -except ImportError: - requests = None - -try: - from bs4 import BeautifulSoup -except ImportError: - BeautifulSoup = None - -try: - from tqdm import tqdm -except ImportError: - tqdm = None - IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".webp", ".gif"} DEFAULT_INDEX = ".dedupe/index.sqlite" DEFAULT_REPORT = ".dedupe/report.csv" -DEFAULT_CACHE = ".media_cache" - -URL_RE = re.compile( - r"""https?://[^\s"'<>]+?\.(?:jpg|jpeg|png|webp|gif)(?:\?[^\s"'<>]*)?""", - re.IGNORECASE, -) - -def human_mb(nbytes: int) -> str: - return f"{nbytes/1_000_000:.2f} MB" -# --------------------------- -# Hashing & pHash -# --------------------------- def sha256_file(path: Path, bufsize: int = 1024 * 1024) -> str: h = hashlib.sha256() with path.open("rb") as f: @@ -102,9 +74,6 @@ def calc_phash(path: Path) -> Optional[str]: return None -# --------------------------- -# Index (SQLite) -# --------------------------- def ensure_dir(p: Path): p.mkdir(parents=True, exist_ok=True) @@ -135,9 +104,6 @@ def is_image(path: Path) -> bool: def walk_images(roots: List[Path]) -> Iterable[Path]: for root in roots: - if not root.exists(): - # Skip silently to be more forgiving; user might pass multiple roots - continue for p in root.rglob("*"): if p.is_file() and is_image(p): yield p @@ -163,9 +129,12 @@ def upsert_file(db_path: Path, path: Path, sha256: str, phash: Optional[str]): def group_by_sha256(db_path: Path) -> List[List[Tuple[int, str, int]]]: + """Return list of groups: [(id, path, size), ...] where sha256 identical and len(group) > 1.""" conn = sqlite3.connect(str(db_path)) cur = conn.cursor() - cur.execute("SELECT sha256 FROM files GROUP BY sha256 HAVING COUNT(*) > 1;") + cur.execute(""" + SELECT sha256 FROM files GROUP BY sha256 HAVING COUNT(*) > 1; + """) hashes = [r[0] for r in cur.fetchall()] groups = [] for h in hashes: @@ -188,7 +157,8 @@ def write_csv_report(db_path: Path, csv_path: Path) -> Tuple[int, int]: for g in groups: if not g: continue - canonical = max(g, key=lambda x: x[2]) # größte Datei als Kanon + # Kanon: größte Datei (oder erste) + canonical = max(g, key=lambda x: x[2]) for rid, path, size in g: if path == canonical[1]: continue @@ -199,22 +169,20 @@ def write_csv_report(db_path: Path, csv_path: Path) -> Tuple[int, int]: return total_dups, total_savings -# --------------------------- -# Apply (Hardlink/Delete) -# --------------------------- def apply_hardlink(canonical: Path, dup: Path, dry_run: bool) -> None: + # Ersetzt dup durch Hardlink auf canonical (gleiche Partition nötig) if dry_run: return tmp = dup.with_suffix(dup.suffix + ".dedupe.tmp") - dup.unlink(missing_ok=False) - os.link(canonical, tmp) # gleicher FS notwendig - tmp.replace(dup) + dup.unlink() # entferne dup + os.link(canonical, tmp) # hardlink temp + tmp.replace(dup) # atomarer move def apply_delete(dup: Path, dry_run: bool) -> None: if dry_run: return - dup.unlink(missing_ok=False) + dup.unlink() @dataclass @@ -247,42 +215,89 @@ def apply_changes(csv_report: Path, mode: str, dry_run: bool) -> ApplyStats: return stats -# --------------------------- -# Collect (URLs -> Cache) -# --------------------------- -def is_image_url(url: str) -> bool: - if URL_RE.search(url): - return True - # Fallback: Extension anhand des Pfads - path = urlparse(url).path - ext = Path(path).suffix.lower() - return ext in IMAGE_EXTS +def parse_roots(roots_arg: str) -> List[Path]: + parts = [Path(p.strip()) for p in roots_arg.split(",") if p.strip()] + for p in parts: + if not p.exists(): + raise FileNotFoundError(f"Root not found: {p}") + return parts -def extract_urls_from_text(text: str) -> Set[str]: - urls = set(URL_RE.findall(text)) - return urls +def cmd_scan(args): + out_dir = Path(args.out_dir) + index = Path(args.index or DEFAULT_INDEX) + ensure_dir(out_dir) + ensure_dir(index.parent) + init_index(index) + roots = parse_roots(args.roots) + + count = 0 + for path in walk_images(roots): + try: + h = sha256_file(path) + ph = calc_phash(path) if args.phash else None + upsert_file(index, path, h, ph) + count += 1 + if count % 500 == 0: + print(f"... indexed {count} files") + except Exception as e: + print(f"[WARN] {path}: {e}", file=sys.stderr) + + dups, savings = write_csv_report(index, Path(args.report or DEFAULT_REPORT)) + print(f"Indexed {count} images. Found duplicate files: {dups}, potential savings: {savings/1_000_000:.2f} MB") + print(f"Index: {index}") + print(f"Report: {args.report or DEFAULT_REPORT}") -def extract_urls_from_html(text: str) -> Set[str]: - urls = set() - if BeautifulSoup is None: - return extract_urls_from_text(text) - try: - soup = BeautifulSoup(text, "html.parser") - for tag in soup.find_all(["img", "source"]): - src = tag.get("src") or tag.get("data-src") - if src and is_image_url(src): - urls.add(src) - # Fallback auf nackte URLs - urls |= extract_urls_from_text(text) - except Exception: - urls |= extract_urls_from_text(text) - return urls +def cmd_report(args): + index = Path(args.index or DEFAULT_INDEX) + csv_path = Path(args.report or DEFAULT_REPORT) + dups, savings = write_csv_report(index, csv_path) + print(f"Duplicates: {dups}, potential savings: {savings/1_000_000:.2f} MB") + if args.csv: + print(f"CSV written: {csv_path}") -def extract_urls_from_json(obj) -> Set[str]: - urls = set() - if isinstance(obj, dict): - for v in obj.values(): - urls |= extract +def cmd_apply(args): + csv_report = Path(args.report or DEFAULT_REPORT) + if not csv_report.exists(): + raise FileNotFoundError(f"Report not found: {csv_report}") + stats = apply_changes(csv_report, args.mode, args.dry_run) + print(f"Processed: {stats.processed}, Errors: {stats.errors}, Saved: {stats.saved_bytes/1_000_000:.2f} MB (mode={args.mode}, dry_run={args.dry_run})") + if args.update_db: + # Platzhalter: hier könntest du eure DB-Referenzen aktualisieren (falls Bilder-Paths in DB gespeichert sind). + # Beispiel: SQLModel mit Tabelle ImageMeta(content_hash UNIQUE, local_path) → auf kanonischen Pfad umbiegen. + print(f"[INFO] DB update requested for: {args.update_db} (implementierung projektspezifisch)") + + +def main(): + ap = argparse.ArgumentParser(description="Bild-Deduplizierung (scan/report/apply)") + sub = ap.add_subparsers(dest="cmd", required=True) + + sc = sub.add_parser("scan", help="Verzeichnisse scannen und Index/Report erstellen") + sc.add_argument("--roots", required=True, help="Kommagetrennte Wurzelpfade, z.B. 'media,assets/images'") + sc.add_argument("--out-dir", default=".dedupe", help="Ausgabeverzeichnis für Index/Reports") + sc.add_argument("--index", help="Pfad zur SQLite-Indexdatei (default .dedupe/index.sqlite)") + sc.add_argument("--report", help="Pfad zum CSV-Report (default .dedupe/report.csv)") + sc.add_argument("--phash", action="store_true", help="Perzeptuellen Hash berechnen (für zukünftige Near-Dups)") + sc.set_defaults(func=cmd_scan) + + rp = sub.add_parser("report", help="Report neu generieren/anzeigen") + rp.add_argument("--index", help="Pfad zur SQLite-Indexdatei") + rp.add_argument("--report", help="Pfad zum CSV-Report") + rp.add_argument("--csv", action="store_true", help="CSV-Pfad ausgeben") + rp.set_defaults(func=cmd_report) + + aply = sub.add_parser("apply", help="Änderungen anwenden (Hardlink/Delete)") + aply.add_argument("--report", help="Pfad zum CSV-Report") + aply.add_argument("--mode", choices=["hardlink", "delete"], default="hardlink", help="Strategie für Duplikate") + aply.add_argument("--dry-run", action="store_true", help="Nur anzeigen, nichts ändern") + aply.add_argument("--update-db", help="Optional: DB-URL für Referenz-Updates (projektspezifisch)") + aply.set_defaults(func=cmd_apply) + + args = ap.parse_args() + args.func(args) + + +if __name__ == "__main__": + main()