Create image_deduper.py

Funktionen:
- Scan: Verzeichnisse rekursiv scannen, sha256 + pHash berechnen
- Report: CSV + menschenlesbare Zusammenfassung mit Gruppen
- Apply: Duplikate auf kanonische Datei umbiegen (Hardlink/Löschen)
- Optional: DB-Referenzen aktualisieren (SQLite/SQLModel kompatibel)
This commit is contained in:
Oliver 2025-08-17 17:56:04 +02:00 committed by GitHub
parent 759a313f31
commit ed91864eda
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

303
tools/image_deduper.py Normal file
View file

@ -0,0 +1,303 @@
#!/usr/bin/env python3
"""
image_deduper.py Finde und bereinige Bild-Dubletten sicher & reversibel.
Funktionen:
- Scan: Verzeichnisse rekursiv scannen, sha256 + pHash berechnen
- Report: CSV + menschenlesbare Zusammenfassung mit Gruppen
- Apply: Duplikate auf kanonische Datei umbiegen (Hardlink/Löschen)
- Optional: DB-Referenzen aktualisieren (SQLite/SQLModel kompatibel)
Nutzung (Beispiele):
# 1) Nur scannen + reporten (keine Änderungen):
python tools/image_deduper.py scan --roots media,assets/images --out-dir .dedupe --phash
# 2) Report anzeigen:
python tools/image_deduper.py report --index .dedupe/index.sqlite --csv
# 3) Anwenden (Hardlinks setzen, Dry-Run):
python tools/image_deduper.py apply --index .dedupe/index.sqlite --mode hardlink --dry-run
# 4) Anwenden (wirklich ändern):
python tools/image_deduper.py apply --index .dedupe/index.sqlite --mode hardlink
# 5) Referenzen in DB aktualisieren (optional):
python tools/image_deduper.py apply --index .dedupe/index.sqlite --update-db sqlite:///./rssbot.db --dry-run
"""
import argparse
import csv
import hashlib
import os
import sqlite3
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, List, Optional, Tuple
try:
from PIL import Image
except ImportError:
Image = None
try:
import imagehash # type: ignore
except ImportError:
imagehash = None
IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
DEFAULT_INDEX = ".dedupe/index.sqlite"
DEFAULT_REPORT = ".dedupe/report.csv"
def sha256_file(path: Path, bufsize: int = 1024 * 1024) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
while True:
b = f.read(bufsize)
if not b:
break
h.update(b)
return h.hexdigest()
def calc_phash(path: Path) -> Optional[str]:
if Image is None or imagehash is None:
return None
try:
with Image.open(path) as im:
im = im.convert("RGB")
ph = imagehash.phash(im, hash_size=16) # 16x16 → 256-bit
return str(ph)
except Exception:
return None
def ensure_dir(p: Path):
p.mkdir(parents=True, exist_ok=True)
def init_index(db_path: Path):
conn = sqlite3.connect(str(db_path))
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS files (
id INTEGER PRIMARY KEY,
path TEXT NOT NULL UNIQUE,
size INTEGER NOT NULL,
mtime REAL NOT NULL,
sha256 TEXT NOT NULL,
phash TEXT,
ext TEXT NOT NULL
);
""")
cur.execute("CREATE INDEX IF NOT EXISTS idx_sha256 ON files (sha256);")
cur.execute("CREATE INDEX IF NOT EXISTS idx_phash ON files (phash);")
conn.commit()
conn.close()
def is_image(path: Path) -> bool:
return path.suffix.lower() in IMAGE_EXTS
def walk_images(roots: List[Path]) -> Iterable[Path]:
for root in roots:
for p in root.rglob("*"):
if p.is_file() and is_image(p):
yield p
def upsert_file(db_path: Path, path: Path, sha256: str, phash: Optional[str]):
st = path.stat()
row = (str(path), st.st_size, st.st_mtime, sha256, phash, path.suffix.lower())
conn = sqlite3.connect(str(db_path))
cur = conn.cursor()
cur.execute("""
INSERT INTO files (path, size, mtime, sha256, phash, ext)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(path) DO UPDATE SET
size=excluded.size,
mtime=excluded.mtime,
sha256=excluded.sha256,
phash=excluded.phash,
ext=excluded.ext;
""", row)
conn.commit()
conn.close()
def group_by_sha256(db_path: Path) -> List[List[Tuple[int, str, int]]]:
"""Return list of groups: [(id, path, size), ...] where sha256 identical and len(group) > 1."""
conn = sqlite3.connect(str(db_path))
cur = conn.cursor()
cur.execute("""
SELECT sha256 FROM files GROUP BY sha256 HAVING COUNT(*) > 1;
""")
hashes = [r[0] for r in cur.fetchall()]
groups = []
for h in hashes:
cur.execute("SELECT id, path, size FROM files WHERE sha256=?", (h,))
rows = cur.fetchall()
groups.append([(rid, rpath, rsize) for rid, rpath, rsize in rows])
conn.close()
return groups
def write_csv_report(db_path: Path, csv_path: Path) -> Tuple[int, int]:
groups = group_by_sha256(db_path)
ensure_dir(csv_path.parent)
total_dups = 0
total_savings = 0
with csv_path.open("w", newline="", encoding="utf-8") as f:
w = csv.writer(f)
w.writerow(["group_id", "canonical_path", "dup_path", "dup_size_bytes"])
gid = 0
for g in groups:
if not g:
continue
# Kanon: größte Datei (oder erste)
canonical = max(g, key=lambda x: x[2])
for rid, path, size in g:
if path == canonical[1]:
continue
total_dups += 1
total_savings += size
w.writerow([gid, canonical[1], path, size])
gid += 1
return total_dups, total_savings
def apply_hardlink(canonical: Path, dup: Path, dry_run: bool) -> None:
# Ersetzt dup durch Hardlink auf canonical (gleiche Partition nötig)
if dry_run:
return
tmp = dup.with_suffix(dup.suffix + ".dedupe.tmp")
dup.unlink() # entferne dup
os.link(canonical, tmp) # hardlink temp
tmp.replace(dup) # atomarer move
def apply_delete(dup: Path, dry_run: bool) -> None:
if dry_run:
return
dup.unlink()
@dataclass
class ApplyStats:
processed: int = 0
errors: int = 0
saved_bytes: int = 0
def apply_changes(csv_report: Path, mode: str, dry_run: bool) -> ApplyStats:
stats = ApplyStats()
with csv_report.open("r", encoding="utf-8") as f:
r = csv.DictReader(f)
for row in r:
canonical = Path(row["canonical_path"])
dup = Path(row["dup_path"])
size = int(row["dup_size_bytes"])
try:
if mode == "hardlink":
apply_hardlink(canonical, dup, dry_run)
elif mode == "delete":
apply_delete(dup, dry_run)
else:
raise ValueError("mode must be 'hardlink' or 'delete'")
stats.processed += 1
stats.saved_bytes += size
except Exception as e:
stats.errors += 1
print(f"[ERROR] {dup}: {e}", file=sys.stderr)
return stats
def parse_roots(roots_arg: str) -> List[Path]:
parts = [Path(p.strip()) for p in roots_arg.split(",") if p.strip()]
for p in parts:
if not p.exists():
raise FileNotFoundError(f"Root not found: {p}")
return parts
def cmd_scan(args):
out_dir = Path(args.out_dir)
index = Path(args.index or DEFAULT_INDEX)
ensure_dir(out_dir)
ensure_dir(index.parent)
init_index(index)
roots = parse_roots(args.roots)
count = 0
for path in walk_images(roots):
try:
h = sha256_file(path)
ph = calc_phash(path) if args.phash else None
upsert_file(index, path, h, ph)
count += 1
if count % 500 == 0:
print(f"... indexed {count} files")
except Exception as e:
print(f"[WARN] {path}: {e}", file=sys.stderr)
dups, savings = write_csv_report(index, Path(args.report or DEFAULT_REPORT))
print(f"Indexed {count} images. Found duplicate files: {dups}, potential savings: {savings/1_000_000:.2f} MB")
print(f"Index: {index}")
print(f"Report: {args.report or DEFAULT_REPORT}")
def cmd_report(args):
index = Path(args.index or DEFAULT_INDEX)
csv_path = Path(args.report or DEFAULT_REPORT)
dups, savings = write_csv_report(index, csv_path)
print(f"Duplicates: {dups}, potential savings: {savings/1_000_000:.2f} MB")
if args.csv:
print(f"CSV written: {csv_path}")
def cmd_apply(args):
csv_report = Path(args.report or DEFAULT_REPORT)
if not csv_report.exists():
raise FileNotFoundError(f"Report not found: {csv_report}")
stats = apply_changes(csv_report, args.mode, args.dry_run)
print(f"Processed: {stats.processed}, Errors: {stats.errors}, Saved: {stats.saved_bytes/1_000_000:.2f} MB (mode={args.mode}, dry_run={args.dry_run})")
if args.update_db:
# Platzhalter: hier könntest du eure DB-Referenzen aktualisieren (falls Bilder-Paths in DB gespeichert sind).
# Beispiel: SQLModel mit Tabelle ImageMeta(content_hash UNIQUE, local_path) → auf kanonischen Pfad umbiegen.
print(f"[INFO] DB update requested for: {args.update_db} (implementierung projektspezifisch)")
def main():
ap = argparse.ArgumentParser(description="Bild-Deduplizierung (scan/report/apply)")
sub = ap.add_subparsers(dest="cmd", required=True)
sc = sub.add_parser("scan", help="Verzeichnisse scannen und Index/Report erstellen")
sc.add_argument("--roots", required=True, help="Kommagetrennte Wurzelpfade, z.B. 'media,assets/images'")
sc.add_argument("--out-dir", default=".dedupe", help="Ausgabeverzeichnis für Index/Reports")
sc.add_argument("--index", help="Pfad zur SQLite-Indexdatei (default .dedupe/index.sqlite)")
sc.add_argument("--report", help="Pfad zum CSV-Report (default .dedupe/report.csv)")
sc.add_argument("--phash", action="store_true", help="Perzeptuellen Hash berechnen (für zukünftige Near-Dups)")
sc.set_defaults(func=cmd_scan)
rp = sub.add_parser("report", help="Report neu generieren/anzeigen")
rp.add_argument("--index", help="Pfad zur SQLite-Indexdatei")
rp.add_argument("--report", help="Pfad zum CSV-Report")
rp.add_argument("--csv", action="store_true", help="CSV-Pfad ausgeben")
rp.set_defaults(func=cmd_report)
aply = sub.add_parser("apply", help="Änderungen anwenden (Hardlink/Delete)")
aply.add_argument("--report", help="Pfad zum CSV-Report")
aply.add_argument("--mode", choices=["hardlink", "delete"], default="hardlink", help="Strategie für Duplikate")
aply.add_argument("--dry-run", action="store_true", help="Nur anzeigen, nichts ändern")
aply.add_argument("--update-db", help="Optional: DB-URL für Referenz-Updates (projektspezifisch)")
aply.set_defaults(func=cmd_apply)
args = ap.parse_args()
args.func(args)
if __name__ == "__main__":
main()