rss-news/backend/app/source_extraction.py
OliverGiertz aaac5def27 feat(pipeline): image caption/credit extraction, no-image exclusion, WP attribution
source_extraction.py:
- New _extract_image_metadata(): extracts figcaption text + copyright/credit
  per image URL using 3 strategies (figure+figcaption, data-* attributes,
  adjacent credit spans)
- ExtractedArticle gets new image_metadata field
- extracted_article_to_meta() includes image_metadata in stored JSON

pipeline.py:
- After auto image selection, check if selected_url is set
- Articles without usable image → status "no_image" (excluded with Telegram notice)
- PipelineStats and summary report include no_image counter

db.py:
- Add "no_image" to articles status CHECK constraint
- Migration: recreates articles table with updated constraint on existing DBs

workflow.py / main.py:
- Map no_image as own UI status with rewrite/close transitions

wordpress.py:
- _upload_featured_media() accepts image_caption param, sends to WP media
- _get_image_meta_for_url() / _build_image_caption() helpers
- _build_attribution_block(): separator + attribution paragraph at article end
  (original link, author, Bildnachweis/credit)
- _build_post_content() appends attribution block

telegram_bot.py:
- notify_pipeline_done() shows 🖼️ no-image count

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-27 07:08:48 +00:00

442 lines
15 KiB
Python

from __future__ import annotations
from dataclasses import dataclass, field
from html import unescape
import re
from typing import Any
from urllib.parse import urljoin
from urllib.request import Request, urlopen
DEFAULT_TIMEOUT_SECONDS = 10
DEFAULT_USER_AGENT = "rss-news-bot/1.0 (+https://news.vanityontour.de)"
@dataclass(frozen=True)
class ExtractedArticle:
title: str | None
author: str | None
canonical_url: str | None
summary: str | None
content_text: str | None
images: list[str]
press_contact: str | None
extraction_error: str | None = None
image_metadata: dict[str, dict] = field(default_factory=dict)
def _clean_text(raw: str | None) -> str | None:
if not raw:
return None
text = unescape(raw)
text = re.sub(r"<[^>]+>", " ", text)
text = re.sub(r"\s+", " ", text).strip()
return text or None
def _strip_noise(html: str) -> str:
html = re.sub(r"<script[\s\S]*?</script>", " ", html, flags=re.IGNORECASE)
html = re.sub(r"<style[\s\S]*?</style>", " ", html, flags=re.IGNORECASE)
html = re.sub(r"<noscript[\s\S]*?</noscript>", " ", html, flags=re.IGNORECASE)
return html
def _meta_content(html: str, attr: str, value: str) -> str | None:
pattern = re.compile(
rf"<meta[^>]+{attr}\s*=\s*[\"']{re.escape(value)}[\"'][^>]*content\s*=\s*[\"']([^\"']+)[\"'][^>]*>",
re.IGNORECASE,
)
match = pattern.search(html)
if match:
return _clean_text(match.group(1))
# handle reversed attribute order
pattern_rev = re.compile(
rf"<meta[^>]+content\s*=\s*[\"']([^\"']+)[\"'][^>]*{attr}\s*=\s*[\"']{re.escape(value)}[\"'][^>]*>",
re.IGNORECASE,
)
match = pattern_rev.search(html)
if match:
return _clean_text(match.group(1))
return None
def _extract_title(html: str) -> str | None:
title = _meta_content(html, "property", "og:title")
if title:
return title
match = re.search(r"<title[^>]*>([\s\S]*?)</title>", html, re.IGNORECASE)
if match:
cleaned = _clean_text(match.group(1))
if cleaned:
return cleaned
match = re.search(r"<h1[^>]*>([\s\S]*?)</h1>", html, re.IGNORECASE)
if match:
return _clean_text(match.group(1))
return None
def _extract_canonical(html: str) -> str | None:
match = re.search(
r"<link[^>]+rel\s*=\s*[\"']canonical[\"'][^>]*href\s*=\s*[\"']([^\"']+)[\"'][^>]*>",
html,
re.IGNORECASE,
)
if match:
return _clean_text(match.group(1))
match = re.search(
r"<link[^>]+href\s*=\s*[\"']([^\"']+)[\"'][^>]*rel\s*=\s*[\"']canonical[\"'][^>]*>",
html,
re.IGNORECASE,
)
if match:
return _clean_text(match.group(1))
return None
def _extract_author(html: str) -> str | None:
for attr, value in (("name", "author"), ("property", "article:author"), ("property", "og:article:author")):
author = _meta_content(html, attr, value)
if author:
return author
for pattern in (
r"(?:Von|Autor(?:in)?)\s*[:\-]\s*([^<\n\r]{3,120})",
r"class=[\"'][^\"']*(?:author|byline)[^\"']*[\"'][^>]*>([\s\S]{1,180})<",
):
match = re.search(pattern, html, re.IGNORECASE)
if match:
author = _clean_text(match.group(1))
if author:
return author
return None
def _extract_images(html: str, page_url: str) -> list[str]:
images: list[str] = []
seen: set[str] = set()
for prop in ("og:image", "twitter:image"):
pattern = re.compile(
rf"<meta[^>]+property\s*=\s*[\"']{re.escape(prop)}[\"'][^>]*content\s*=\s*[\"']([^\"']+)[\"'][^>]*>",
re.IGNORECASE,
)
for match in pattern.finditer(html):
src = match.group(1).strip()
abs_src = urljoin(page_url, src)
if abs_src not in seen:
seen.add(abs_src)
images.append(abs_src)
for match in re.finditer(r"<img[^>]+src\s*=\s*[\"']([^\"']+)[\"'][^>]*>", html, re.IGNORECASE):
src = match.group(1).strip()
abs_src = urljoin(page_url, src)
if abs_src not in seen:
seen.add(abs_src)
images.append(abs_src)
return images
def _extract_content_text(html: str) -> str | None:
section = None
for pattern in (
r"<article[^>]*>([\s\S]*?)</article>",
r"<main[^>]*>([\s\S]*?)</main>",
r"<body[^>]*>([\s\S]*?)</body>",
):
match = re.search(pattern, html, re.IGNORECASE)
if match:
section = match.group(1)
break
if not section:
section = html
paragraphs = []
for match in re.finditer(r"<h[2-4][^>]*>([\s\S]*?)</h[2-4]>", section, re.IGNORECASE):
text = _clean_text(match.group(1))
if text and re.search(r"\b(pressekontakt|press contact|kontakt|agentur)\b", text, re.IGNORECASE):
paragraphs.append(text)
for match in re.finditer(r"<p[^>]*>([\s\S]*?)</p>", section, re.IGNORECASE):
text = _clean_text(match.group(1))
if text and len(text) > 2:
paragraphs.append(text)
if paragraphs:
return "\n".join(paragraphs)
stripped = _clean_text(section)
return stripped
def _extract_press_contact(content_text: str | None) -> str | None:
if not content_text:
return None
lines = [line.strip() for line in content_text.split("\n") if line.strip()]
marker_re = re.compile(r"\b(pressekontakt|press contact|presse\-kontakt|agentur)\b", re.IGNORECASE)
for idx, line in enumerate(lines):
if marker_re.search(line):
chunk = [line]
for nxt in lines[idx + 1 : idx + 6]:
if re.search(r"\b(original\-content von|ots:|newsroom:|alle meldungen|zum newsroom)\b", nxt, re.IGNORECASE):
break
chunk.append(nxt)
return _clean_text("\n".join(chunk))
match = re.search(
r"((?:Pressekontakt|Agentur)[\s\S]{0,1200}?)(?:Original-Content von|OTS:|newsroom:|Alle Meldungen|Zum Newsroom|$)",
content_text,
re.IGNORECASE,
)
if match:
return _clean_text(match.group(1))
return None
# CSS class keywords that indicate a copyright/credit element inside a figcaption
_CREDIT_CLASS_RE = re.compile(
r"class\s*=\s*[\"'][^\"']*(?:copyright|credit|photographer|photo-credit|image-credit|bildrechte|fotocredit)[^\"']*[\"']",
re.IGNORECASE,
)
# Inline text patterns that signal a credit/copyright notice
_CREDIT_TEXT_RE = re.compile(
r"(©[^<\n\r]{1,100}|(?:Foto|Bild|Credit|Fotograf|Fotografie)\s*:[^<\n\r]{1,100})",
re.IGNORECASE,
)
# data-* attribute names that carry credit/caption information directly on <img>
_IMG_DATA_CREDIT_ATTRS = ("data-credit", "data-photographer", "data-copyright")
_IMG_DATA_CAPTION_ATTRS = ("data-caption", "data-description")
# Class keywords for adjacent sibling credit spans/divs after an <img>
_ADJ_CREDIT_CLASS_RE = re.compile(
r"class\s*=\s*[\"'][^\"']*(?:copyright|credit|photographer|photo-credit|image-credit|bildrechte|fotocredit)[^\"']*[\"']",
re.IGNORECASE,
)
def _extract_image_metadata(html: str, page_url: str) -> dict[str, dict]:
"""Return a mapping of absolute image URL → {"caption": ..., "credit": ...}.
Uses three progressive strategies:
1. <figure> with <img> + <figcaption>
2. data-* attributes on <img> tags not already covered
3. <img> tags whose immediately following HTML contains a credit element
"""
result: dict[str, dict] = {}
try:
# ------------------------------------------------------------------
# Strategy 1: <figure> blocks containing <img> and <figcaption>
# ------------------------------------------------------------------
for fig_match in re.finditer(r"<figure[^>]*>([\s\S]*?)</figure>", html, re.IGNORECASE):
fig_html = fig_match.group(1)
# Locate image src (src or lazy-loaded data-src)
img_match = re.search(
r"<img[^>]+(?:src|data-src)\s*=\s*[\"']([^\"']+)[\"'][^>]*>",
fig_html,
re.IGNORECASE,
)
if not img_match:
continue
img_src = urljoin(page_url, img_match.group(1).strip())
# Locate figcaption
figcap_match = re.search(
r"<figcaption[^>]*>([\s\S]*?)</figcaption>",
fig_html,
re.IGNORECASE,
)
if not figcap_match:
continue
figcap_html = figcap_match.group(1)
# --- Extract credit ---
credit: str | None = None
# Try credit via class attribute on an inner element
credit_elem_match = re.search(
r"<(?:span|p|div)[^>]*"
+ _CREDIT_CLASS_RE.pattern
+ r"[^>]*>([\s\S]*?)</(?:span|p|div)>",
figcap_html,
re.IGNORECASE,
)
if credit_elem_match:
credit = _clean_text(credit_elem_match.group(1))
# Fallback: scan plain text of figcaption for credit patterns
if not credit:
figcap_text = unescape(re.sub(r"<[^>]+>", " ", figcap_html))
cred_text_match = _CREDIT_TEXT_RE.search(figcap_text)
if cred_text_match:
credit = _clean_text(cred_text_match.group(1))
# --- Extract caption (full figcaption text) ---
caption = _clean_text(figcap_html)
# Only store entries that carry at least one piece of metadata
if caption or credit:
entry: dict[str, str] = {}
if caption:
entry["caption"] = caption
if credit:
entry["credit"] = credit
result[img_src] = entry
# ------------------------------------------------------------------
# Strategy 2: data-* attributes on <img> tags
# ------------------------------------------------------------------
for img_match in re.finditer(r"<img([^>]+)>", html, re.IGNORECASE):
img_attrs = img_match.group(1)
# Resolve image URL (prefer src over data-src)
src_match = re.search(r'(?:^|\s)src\s*=\s*["\']([^"\']+)["\']', img_attrs, re.IGNORECASE)
if not src_match:
src_match = re.search(r'data-src\s*=\s*["\']([^"\']+)["\']', img_attrs, re.IGNORECASE)
if not src_match:
continue
img_src = urljoin(page_url, src_match.group(1).strip())
# Skip images already handled by Strategy 1
if img_src in result:
continue
credit: str | None = None
caption: str | None = None
for attr in _IMG_DATA_CREDIT_ATTRS:
attr_match = re.search(
rf'{re.escape(attr)}\s*=\s*["\']([^"\']+)["\']',
img_attrs,
re.IGNORECASE,
)
if attr_match:
credit = _clean_text(attr_match.group(1))
break
for attr in _IMG_DATA_CAPTION_ATTRS:
attr_match = re.search(
rf'{re.escape(attr)}\s*=\s*["\']([^"\']+)["\']',
img_attrs,
re.IGNORECASE,
)
if attr_match:
caption = _clean_text(attr_match.group(1))
break
if caption or credit:
entry = {}
if caption:
entry["caption"] = caption
if credit:
entry["credit"] = credit
result[img_src] = entry
# ------------------------------------------------------------------
# Strategy 3: <img> followed within 200 chars by a credit element
# ------------------------------------------------------------------
for img_match in re.finditer(r"<img([^>]+)>", html, re.IGNORECASE):
img_attrs = img_match.group(1)
src_match = re.search(r'(?:^|\s)src\s*=\s*["\']([^"\']+)["\']', img_attrs, re.IGNORECASE)
if not src_match:
src_match = re.search(r'data-src\s*=\s*["\']([^"\']+)["\']', img_attrs, re.IGNORECASE)
if not src_match:
continue
img_src = urljoin(page_url, src_match.group(1).strip())
# Skip images already handled by earlier strategies
if img_src in result:
continue
# Look at the 200 characters of HTML immediately after the img tag
after_start = img_match.end()
after_html = html[after_start : after_start + 200]
adj_match = re.search(
r"<(?:span|p|div)[^>]*"
+ _ADJ_CREDIT_CLASS_RE.pattern
+ r"[^>]*>([\s\S]*?)</(?:span|p|div)>",
after_html,
re.IGNORECASE,
)
if adj_match:
credit = _clean_text(adj_match.group(1))
if credit:
result[img_src] = {"credit": credit}
except Exception:
return {}
return result
def extract_article(url: str, timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS) -> ExtractedArticle:
try:
req = Request(
url=url,
headers={
"User-Agent": DEFAULT_USER_AGENT,
"Accept-Language": "de-DE,de;q=0.9,en;q=0.8",
},
)
with urlopen(req, timeout=timeout_seconds) as resp:
raw = resp.read()
charset = resp.headers.get_content_charset() or "utf-8"
html = raw.decode(charset, errors="replace")
except Exception as exc:
return ExtractedArticle(
title=None,
author=None,
canonical_url=None,
summary=None,
content_text=None,
images=[],
press_contact=None,
extraction_error=str(exc),
)
html = _strip_noise(html)
title = _extract_title(html)
author = _extract_author(html)
canonical_url = _extract_canonical(html)
summary = _meta_content(html, "name", "description")
content_text = _extract_content_text(html)
if not summary and content_text:
summary = _clean_text(content_text[:320])
images = _extract_images(html, url)
press_contact = _extract_press_contact(content_text)
image_metadata = _extract_image_metadata(html, url)
return ExtractedArticle(
title=title,
author=author,
canonical_url=canonical_url,
summary=summary,
content_text=content_text,
images=images,
press_contact=press_contact,
extraction_error=None,
image_metadata=image_metadata,
)
def extracted_article_to_meta(article: ExtractedArticle) -> dict[str, Any]:
return {
"title": article.title,
"author": article.author,
"canonical_url": article.canonical_url,
"summary": article.summary,
"images": article.images,
"press_contact": article.press_contact,
"extraction_error": article.extraction_error,
"image_metadata": article.image_metadata,
}