Image Dublettenprüfung
This commit is contained in:
parent
0cfbb6c37f
commit
a02f825274
1 changed files with 110 additions and 95 deletions
|
|
@ -2,40 +2,39 @@
|
|||
"""
|
||||
image_deduper.py — Finde und bereinige Bild-Dubletten sicher & reversibel.
|
||||
|
||||
Neu:
|
||||
- collect: Bild-URLs aus JSON/HTML/Markdown/Text sammeln und lokal in .media_cache/ speichern
|
||||
- scan/report/apply: wie gehabt (Hardlink/Delete), jetzt standardmäßig mit Cache nutzbar
|
||||
Funktionen:
|
||||
- Scan: Verzeichnisse rekursiv scannen, sha256 + pHash berechnen
|
||||
- Report: CSV + menschenlesbare Zusammenfassung mit Gruppen
|
||||
- Apply: Duplikate auf kanonische Datei umbiegen (Hardlink/Löschen)
|
||||
- Optional: DB-Referenzen aktualisieren (SQLite/SQLModel kompatibel)
|
||||
|
||||
Workflows:
|
||||
# A) Nur remote-Referenzen vorhanden → erst sammeln, dann deduplizieren
|
||||
python tools/image_deduper.py collect --sources processed_articles.json,content/ --cache .media_cache
|
||||
python tools/image_deduper.py scan --roots .media_cache --out-dir .dedupe --phash
|
||||
python tools/image_deduper.py apply --report .dedupe/report.csv --mode hardlink --dry-run
|
||||
python tools/image_deduper.py apply --report .dedupe/report.csv --mode hardlink
|
||||
Nutzung (Beispiele):
|
||||
# 1) Nur scannen + reporten (keine Änderungen):
|
||||
python tools/image_deduper.py scan --roots media,assets/images --out-dir .dedupe --phash
|
||||
|
||||
# B) Lokale Bild-Ordner vorhanden
|
||||
python tools/image_deduper.py scan --roots ./media,./static/images --out-dir .dedupe --phash
|
||||
# 2) Report anzeigen:
|
||||
python tools/image_deduper.py report --index .dedupe/index.sqlite --csv
|
||||
|
||||
Hinweise:
|
||||
- "collect" speichert Bilder content-addressed (<sha256>.<ext>) und lädt identische Dateien nicht doppelt.
|
||||
- Für HTML/Markdown werden Bild-URLs geparst; für JSON werden alle Stringwerte mit Bild-URL-Muster extrahiert.
|
||||
# 3) Anwenden (Hardlinks setzen, Dry-Run):
|
||||
python tools/image_deduper.py apply --index .dedupe/index.sqlite --mode hardlink --dry-run
|
||||
|
||||
# 4) Anwenden (wirklich ändern):
|
||||
python tools/image_deduper.py apply --index .dedupe/index.sqlite --mode hardlink
|
||||
|
||||
# 5) Referenzen in DB aktualisieren (optional):
|
||||
python tools/image_deduper.py apply --index .dedupe/index.sqlite --update-db sqlite:///./rssbot.db --dry-run
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import hashlib
|
||||
import json
|
||||
import mimetypes
|
||||
import os
|
||||
import re
|
||||
import sqlite3
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Iterable, List, Optional, Tuple, Set
|
||||
from urllib.parse import urlparse
|
||||
from typing import Iterable, List, Optional, Tuple
|
||||
|
||||
# Optionale Parser/Helpers
|
||||
try:
|
||||
from PIL import Image
|
||||
except ImportError:
|
||||
|
|
@ -46,39 +45,12 @@ try:
|
|||
except ImportError:
|
||||
imagehash = None
|
||||
|
||||
try:
|
||||
import requests
|
||||
except ImportError:
|
||||
requests = None
|
||||
|
||||
try:
|
||||
from bs4 import BeautifulSoup
|
||||
except ImportError:
|
||||
BeautifulSoup = None
|
||||
|
||||
try:
|
||||
from tqdm import tqdm
|
||||
except ImportError:
|
||||
tqdm = None
|
||||
|
||||
|
||||
IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
|
||||
DEFAULT_INDEX = ".dedupe/index.sqlite"
|
||||
DEFAULT_REPORT = ".dedupe/report.csv"
|
||||
DEFAULT_CACHE = ".media_cache"
|
||||
|
||||
URL_RE = re.compile(
|
||||
r"""https?://[^\s"'<>]+?\.(?:jpg|jpeg|png|webp|gif)(?:\?[^\s"'<>]*)?""",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
def human_mb(nbytes: int) -> str:
|
||||
return f"{nbytes/1_000_000:.2f} MB"
|
||||
|
||||
|
||||
# ---------------------------
|
||||
# Hashing & pHash
|
||||
# ---------------------------
|
||||
def sha256_file(path: Path, bufsize: int = 1024 * 1024) -> str:
|
||||
h = hashlib.sha256()
|
||||
with path.open("rb") as f:
|
||||
|
|
@ -102,9 +74,6 @@ def calc_phash(path: Path) -> Optional[str]:
|
|||
return None
|
||||
|
||||
|
||||
# ---------------------------
|
||||
# Index (SQLite)
|
||||
# ---------------------------
|
||||
def ensure_dir(p: Path):
|
||||
p.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
|
@ -135,9 +104,6 @@ def is_image(path: Path) -> bool:
|
|||
|
||||
def walk_images(roots: List[Path]) -> Iterable[Path]:
|
||||
for root in roots:
|
||||
if not root.exists():
|
||||
# Skip silently to be more forgiving; user might pass multiple roots
|
||||
continue
|
||||
for p in root.rglob("*"):
|
||||
if p.is_file() and is_image(p):
|
||||
yield p
|
||||
|
|
@ -163,9 +129,12 @@ def upsert_file(db_path: Path, path: Path, sha256: str, phash: Optional[str]):
|
|||
|
||||
|
||||
def group_by_sha256(db_path: Path) -> List[List[Tuple[int, str, int]]]:
|
||||
"""Return list of groups: [(id, path, size), ...] where sha256 identical and len(group) > 1."""
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
cur = conn.cursor()
|
||||
cur.execute("SELECT sha256 FROM files GROUP BY sha256 HAVING COUNT(*) > 1;")
|
||||
cur.execute("""
|
||||
SELECT sha256 FROM files GROUP BY sha256 HAVING COUNT(*) > 1;
|
||||
""")
|
||||
hashes = [r[0] for r in cur.fetchall()]
|
||||
groups = []
|
||||
for h in hashes:
|
||||
|
|
@ -188,7 +157,8 @@ def write_csv_report(db_path: Path, csv_path: Path) -> Tuple[int, int]:
|
|||
for g in groups:
|
||||
if not g:
|
||||
continue
|
||||
canonical = max(g, key=lambda x: x[2]) # größte Datei als Kanon
|
||||
# Kanon: größte Datei (oder erste)
|
||||
canonical = max(g, key=lambda x: x[2])
|
||||
for rid, path, size in g:
|
||||
if path == canonical[1]:
|
||||
continue
|
||||
|
|
@ -199,22 +169,20 @@ def write_csv_report(db_path: Path, csv_path: Path) -> Tuple[int, int]:
|
|||
return total_dups, total_savings
|
||||
|
||||
|
||||
# ---------------------------
|
||||
# Apply (Hardlink/Delete)
|
||||
# ---------------------------
|
||||
def apply_hardlink(canonical: Path, dup: Path, dry_run: bool) -> None:
|
||||
# Ersetzt dup durch Hardlink auf canonical (gleiche Partition nötig)
|
||||
if dry_run:
|
||||
return
|
||||
tmp = dup.with_suffix(dup.suffix + ".dedupe.tmp")
|
||||
dup.unlink(missing_ok=False)
|
||||
os.link(canonical, tmp) # gleicher FS notwendig
|
||||
tmp.replace(dup)
|
||||
dup.unlink() # entferne dup
|
||||
os.link(canonical, tmp) # hardlink temp
|
||||
tmp.replace(dup) # atomarer move
|
||||
|
||||
|
||||
def apply_delete(dup: Path, dry_run: bool) -> None:
|
||||
if dry_run:
|
||||
return
|
||||
dup.unlink(missing_ok=False)
|
||||
dup.unlink()
|
||||
|
||||
|
||||
@dataclass
|
||||
|
|
@ -247,42 +215,89 @@ def apply_changes(csv_report: Path, mode: str, dry_run: bool) -> ApplyStats:
|
|||
return stats
|
||||
|
||||
|
||||
# ---------------------------
|
||||
# Collect (URLs -> Cache)
|
||||
# ---------------------------
|
||||
def is_image_url(url: str) -> bool:
|
||||
if URL_RE.search(url):
|
||||
return True
|
||||
# Fallback: Extension anhand des Pfads
|
||||
path = urlparse(url).path
|
||||
ext = Path(path).suffix.lower()
|
||||
return ext in IMAGE_EXTS
|
||||
def parse_roots(roots_arg: str) -> List[Path]:
|
||||
parts = [Path(p.strip()) for p in roots_arg.split(",") if p.strip()]
|
||||
for p in parts:
|
||||
if not p.exists():
|
||||
raise FileNotFoundError(f"Root not found: {p}")
|
||||
return parts
|
||||
|
||||
|
||||
def extract_urls_from_text(text: str) -> Set[str]:
|
||||
urls = set(URL_RE.findall(text))
|
||||
return urls
|
||||
def cmd_scan(args):
|
||||
out_dir = Path(args.out_dir)
|
||||
index = Path(args.index or DEFAULT_INDEX)
|
||||
ensure_dir(out_dir)
|
||||
ensure_dir(index.parent)
|
||||
init_index(index)
|
||||
roots = parse_roots(args.roots)
|
||||
|
||||
|
||||
def extract_urls_from_html(text: str) -> Set[str]:
|
||||
urls = set()
|
||||
if BeautifulSoup is None:
|
||||
return extract_urls_from_text(text)
|
||||
count = 0
|
||||
for path in walk_images(roots):
|
||||
try:
|
||||
soup = BeautifulSoup(text, "html.parser")
|
||||
for tag in soup.find_all(["img", "source"]):
|
||||
src = tag.get("src") or tag.get("data-src")
|
||||
if src and is_image_url(src):
|
||||
urls.add(src)
|
||||
# Fallback auf nackte URLs
|
||||
urls |= extract_urls_from_text(text)
|
||||
except Exception:
|
||||
urls |= extract_urls_from_text(text)
|
||||
return urls
|
||||
h = sha256_file(path)
|
||||
ph = calc_phash(path) if args.phash else None
|
||||
upsert_file(index, path, h, ph)
|
||||
count += 1
|
||||
if count % 500 == 0:
|
||||
print(f"... indexed {count} files")
|
||||
except Exception as e:
|
||||
print(f"[WARN] {path}: {e}", file=sys.stderr)
|
||||
|
||||
dups, savings = write_csv_report(index, Path(args.report or DEFAULT_REPORT))
|
||||
print(f"Indexed {count} images. Found duplicate files: {dups}, potential savings: {savings/1_000_000:.2f} MB")
|
||||
print(f"Index: {index}")
|
||||
print(f"Report: {args.report or DEFAULT_REPORT}")
|
||||
|
||||
|
||||
def extract_urls_from_json(obj) -> Set[str]:
|
||||
urls = set()
|
||||
if isinstance(obj, dict):
|
||||
for v in obj.values():
|
||||
urls |= extract
|
||||
def cmd_report(args):
|
||||
index = Path(args.index or DEFAULT_INDEX)
|
||||
csv_path = Path(args.report or DEFAULT_REPORT)
|
||||
dups, savings = write_csv_report(index, csv_path)
|
||||
print(f"Duplicates: {dups}, potential savings: {savings/1_000_000:.2f} MB")
|
||||
if args.csv:
|
||||
print(f"CSV written: {csv_path}")
|
||||
|
||||
|
||||
def cmd_apply(args):
|
||||
csv_report = Path(args.report or DEFAULT_REPORT)
|
||||
if not csv_report.exists():
|
||||
raise FileNotFoundError(f"Report not found: {csv_report}")
|
||||
stats = apply_changes(csv_report, args.mode, args.dry_run)
|
||||
print(f"Processed: {stats.processed}, Errors: {stats.errors}, Saved: {stats.saved_bytes/1_000_000:.2f} MB (mode={args.mode}, dry_run={args.dry_run})")
|
||||
if args.update_db:
|
||||
# Platzhalter: hier könntest du eure DB-Referenzen aktualisieren (falls Bilder-Paths in DB gespeichert sind).
|
||||
# Beispiel: SQLModel mit Tabelle ImageMeta(content_hash UNIQUE, local_path) → auf kanonischen Pfad umbiegen.
|
||||
print(f"[INFO] DB update requested for: {args.update_db} (implementierung projektspezifisch)")
|
||||
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser(description="Bild-Deduplizierung (scan/report/apply)")
|
||||
sub = ap.add_subparsers(dest="cmd", required=True)
|
||||
|
||||
sc = sub.add_parser("scan", help="Verzeichnisse scannen und Index/Report erstellen")
|
||||
sc.add_argument("--roots", required=True, help="Kommagetrennte Wurzelpfade, z.B. 'media,assets/images'")
|
||||
sc.add_argument("--out-dir", default=".dedupe", help="Ausgabeverzeichnis für Index/Reports")
|
||||
sc.add_argument("--index", help="Pfad zur SQLite-Indexdatei (default .dedupe/index.sqlite)")
|
||||
sc.add_argument("--report", help="Pfad zum CSV-Report (default .dedupe/report.csv)")
|
||||
sc.add_argument("--phash", action="store_true", help="Perzeptuellen Hash berechnen (für zukünftige Near-Dups)")
|
||||
sc.set_defaults(func=cmd_scan)
|
||||
|
||||
rp = sub.add_parser("report", help="Report neu generieren/anzeigen")
|
||||
rp.add_argument("--index", help="Pfad zur SQLite-Indexdatei")
|
||||
rp.add_argument("--report", help="Pfad zum CSV-Report")
|
||||
rp.add_argument("--csv", action="store_true", help="CSV-Pfad ausgeben")
|
||||
rp.set_defaults(func=cmd_report)
|
||||
|
||||
aply = sub.add_parser("apply", help="Änderungen anwenden (Hardlink/Delete)")
|
||||
aply.add_argument("--report", help="Pfad zum CSV-Report")
|
||||
aply.add_argument("--mode", choices=["hardlink", "delete"], default="hardlink", help="Strategie für Duplikate")
|
||||
aply.add_argument("--dry-run", action="store_true", help="Nur anzeigen, nichts ändern")
|
||||
aply.add_argument("--update-db", help="Optional: DB-URL für Referenz-Updates (projektspezifisch)")
|
||||
aply.set_defaults(func=cmd_apply)
|
||||
|
||||
args = ap.parse_args()
|
||||
args.func(args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue