1. Article age filter (ingestion.py + config.py):
- New setting pipeline_max_article_age_days=7 (0 = no limit)
- Skip RSS entries older than N days before expensive extract_article()
- Prevents old articles from Google Alerts re-entering pipeline
2. Image URL pre-validation (ingestion.py):
- HEAD-request probe of the top-ranked image candidates (up to 4) during ingestion
- Falls back to next-best candidate if primary returns 4xx
- Network errors treated as OK to avoid false negatives on flaky servers
3. Stale WP draft cleanup (pipeline.py):
- Quality gate rejections now delete any pre-existing WP draft (wp_post_id)
- Prevents orphaned drafts when re-running articles that previously had drafts
4. Schedule overview UI (scheduler.py + admin_ui.py + admin_schedule.html):
- New /admin/schedule page showing calendar grid of all booked slots
- Distinguishes Pipeline-DB slots from WordPress-only slots
- Link added to dashboard navigation
5. Retry for failed articles (admin_ui.py + admin_dashboard.html):
- New POST /admin/articles/{id}/retry endpoint: resets to 'new', releases slot
- '🔄 Wiederholen' button shown in the dashboard for all closed articles (status 'error')
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
486 lines
19 KiB
Python
486 lines
19 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta, timezone
|
|
import hashlib
|
|
import json
|
|
import re
|
|
import time
|
|
from typing import Any
|
|
from urllib.parse import unquote, urlencode, urlparse, parse_qs
|
|
import urllib.error
|
|
import urllib.request as _urllib_req
|
|
|
|
import feedparser
|
|
|
|
from .repositories import (
|
|
ArticleUpsert,
|
|
RunCreate,
|
|
create_run,
|
|
find_existing_article_for_upsert,
|
|
finish_run,
|
|
get_feed_by_id,
|
|
list_enabled_feeds,
|
|
update_feed_fetch_state,
|
|
upsert_article,
|
|
)
|
|
from .source_extraction import extract_article, extracted_article_to_meta
|
|
|
|
|
|
@dataclass(frozen=True)
class IngestionStats:
    """Immutable summary returned by ``run_ingestion``.

    The field order below is part of the public interface (instances may be
    built positionally), so new fields should only ever be appended.
    """

    run_id: int  # id returned by create_run() for this ingestion run
    feeds_processed: int  # feeds actually iterated (enabled, non-empty rows)
    entries_seen: int  # feed entries looked at, including skipped ones
    articles_upserted: int  # upsert_article() calls that returned an id
    status: str  # "success" or "failed"
    message: str  # completion text on success, exception text on failure
|
|
|
|
|
|
# Total number of fetch attempts per feed during ingestion. Despite the name,
# this includes the first attempt; consecutive attempts are separated by a
# sleep of 0.5 s * attempt number.
MAX_FEED_FETCH_RETRIES = 3
|
|
|
|
|
|
def _normalize_article_url(url: str) -> str:
|
|
"""Strip AMP and tracking query parameters from article URLs.
|
|
|
|
Removes ?outputType=valid_amp and other AMP/tracking params so that
|
|
AMP and non-AMP versions of the same article are deduplicated.
|
|
"""
|
|
_AMP_PARAMS = {"outputtype", "amp", "outputformat"}
|
|
try:
|
|
from urllib.parse import parse_qs, urlencode
|
|
parsed = urlparse(url)
|
|
if not parsed.query:
|
|
return url
|
|
params = parse_qs(parsed.query, keep_blank_values=True)
|
|
filtered = {k: v for k, v in params.items() if k.lower() not in _AMP_PARAMS}
|
|
new_query = urlencode(filtered, doseq=True)
|
|
return parsed._replace(query=new_query).geturl()
|
|
except Exception:
|
|
return url
|
|
|
|
|
|
def _resolve_google_redirect(url: str) -> str:
|
|
"""Extract the real article URL from Google redirect URLs.
|
|
|
|
Google Alerts feed entries use tracking links like:
|
|
https://www.google.com/url?rct=j&sa=t&url=<encoded_real_url>&ct=ga&...
|
|
|
|
This function returns the decoded real URL if detected, otherwise the
|
|
original URL unchanged.
|
|
"""
|
|
try:
|
|
parsed = urlparse(url)
|
|
host = (parsed.hostname or "").lower()
|
|
if host not in ("www.google.com", "google.com"):
|
|
return url
|
|
if parsed.path not in ("/url", "/url/"):
|
|
return url
|
|
params = parse_qs(parsed.query, keep_blank_values=False)
|
|
real_urls = params.get("url")
|
|
if real_urls:
|
|
return unquote(real_urls[0])
|
|
except Exception:
|
|
pass
|
|
return url
|
|
|
|
|
|
def _entry_published_iso(entry: dict) -> str | None:
|
|
published = entry.get("published_parsed") or entry.get("updated_parsed")
|
|
if not published:
|
|
return None
|
|
return datetime(*published[:6], tzinfo=timezone.utc).isoformat()
|
|
|
|
|
|
def _entry_text(entry: dict) -> tuple[str, str]:
|
|
summary = entry.get("summary", "") or ""
|
|
content = ""
|
|
if entry.get("content") and isinstance(entry.get("content"), list):
|
|
first = entry["content"][0]
|
|
content = first.get("value", "") if isinstance(first, dict) else ""
|
|
if not content:
|
|
content = summary
|
|
return summary, content
|
|
|
|
|
|
def _entry_hash(entry: dict, feed_id: int, link: str, title: str, summary: str) -> str:
    """Fingerprint a feed entry as a SHA-256 hex digest.

    The hashed UTF-8 text is ``feed_id|source_id|link|title|summary|published``
    where ``source_id`` is the entry's ``id`` (falling back to ``guid``, then
    ``""``), title and summary are whitespace-stripped, and ``published`` is
    the timestamp from ``_entry_published_iso`` (or ``""``).
    """
    source_id = entry.get("id") or entry.get("guid") or ""
    published = _entry_published_iso(entry) or ""
    fields = (feed_id, source_id, link, title.strip(), summary.strip(), published)
    # format(value) is exactly what an f-string placeholder produces.
    fingerprint = "|".join(format(value) for value in fields)
    digest = hashlib.sha256()
    digest.update(fingerprint.encode("utf-8"))
    return digest.hexdigest()
|
|
|
|
|
|
def _parsed_get(parsed: object, key: str, default: object = None) -> object:
|
|
if isinstance(parsed, dict):
|
|
return parsed.get(key, default)
|
|
return getattr(parsed, key, default)
|
|
|
|
|
|
def _normalize_tokens(text: str) -> set[str]:
|
|
normalized = re.sub(r"[^a-z0-9]+", " ", text.lower())
|
|
return {token for token in normalized.split() if len(token) >= 4}
|
|
|
|
|
|
def _probe_image_url(url: str, timeout: int = 5) -> bool:
|
|
"""Return True if URL responds without a 4xx/5xx error (HEAD request).
|
|
|
|
Returns True on network/connection errors so that a flaky server does not
|
|
cause a valid image to be silently dropped.
|
|
"""
|
|
try:
|
|
req = _urllib_req.Request(
|
|
url,
|
|
method="HEAD",
|
|
headers={"User-Agent": "Mozilla/5.0 (compatible; rss-news/1.0)"},
|
|
)
|
|
with _urllib_req.urlopen(req, timeout=timeout) as resp:
|
|
return resp.status < 400
|
|
except urllib.error.HTTPError as exc:
|
|
return exc.code < 400 # 3xx redirects are OK; 4xx/5xx are not
|
|
except Exception:
|
|
return True # network error → don't filter, let WP try later
|
|
|
|
|
|
def _rank_image_candidates(source_url: str, title: str, images: list[str]) -> list[dict[str, Any]]:
    """Score image URLs by how suitable they look as an article's featured image.

    Each candidate starts at 0 and is adjusted by URL heuristics applied to
    ``host + unquoted lowercase path``:

    * -300 if it contains a known placeholder marker;
    * -150 if it contains a blocked marker (logos, icons, tracking pixels,
      ``.svg``/``.ico``/``.gif``, ...);
    * presseportal.de sources only, first matching thumbnail size wins:
      ``story_big`` +120, ``highlight`` +45, ``liste`` -40;
    * -10 if the query string contains ``crop=``;
    * +6 per shared title/path token (see ``_normalize_tokens``), max +30.

    ``data:`` URIs are skipped entirely.

    Returns:
        One ``{"url", "score", "reasons"}`` dict per candidate, where
        ``reasons`` lists the labels of the rules that fired, sorted by
        descending score (ties keep input order).
    """
    host = (urlparse(source_url).hostname or "").lower()
    from_presseportal = "presseportal.de" in host
    wanted_tokens = _normalize_tokens(title)
    blocked_markers = ("logo", "badge", "app-store", "google-play", "na-logo", "sprite", "icon", "favicon", "tracking", "pixel", ".svg", ".ico", ".gif")
    # Known placeholder/default images that should never be used as featured image
    placeholder_markers = ("some-default.jpg", "default-image", "placeholder", "no-image", "noimage")
    # (path marker, score delta, reason label); checked in order, first hit wins
    presseportal_sizes = (
        ("/thumbnail/story_big/", 120, "presseportal-story-big"),
        ("/thumbnail/highlight/", 45, "presseportal-highlight"),
        ("/thumbnail/liste/", -40, "presseportal-list"),
    )

    scored: list[dict[str, Any]] = []
    for candidate in images:
        # Inline data: URIs (e.g. base64-encoded SVG placeholders) are useless
        if candidate.startswith("data:"):
            continue

        pieces = urlparse(candidate)
        lowered_path = unquote(pieces.path.lower())
        haystack = pieces.netloc.lower() + lowered_path
        points = 0
        fired: list[str] = []

        if any(marker in haystack for marker in placeholder_markers):
            points -= 300
            fired.append("placeholder-image")

        if any(marker in haystack for marker in blocked_markers):
            points -= 150
            fired.append("blocked-pattern")

        if from_presseportal:
            for marker, delta, label in presseportal_sizes:
                if marker in lowered_path:
                    points += delta
                    fired.append(label)
                    break

        if "crop=" in (pieces.query or "").lower():
            points -= 10
            fired.append("cropped-preview")

        # _normalize_tokens already splits on "-", so slugs tokenize directly
        shared = len(wanted_tokens & _normalize_tokens(lowered_path))
        if shared:
            points += min(30, shared * 6)
            fired.append(f"title-match:{shared}")

        scored.append({"url": candidate, "score": points, "reasons": fired})

    return sorted(scored, key=lambda item: item["score"], reverse=True)
|
|
|
|
|
|
def _select_relevant_images(
    source_url: str,
    title: str,
    images: list[str],
    max_keep: int = 3,
    max_probes: int = 4,
) -> tuple[list[str], str | None, list[dict[str, Any]]]:
    """Pick up to ``max_keep`` usable images and the primary (featured) image.

    1. Drop empty and duplicate URLs, keeping first-seen order.
    2. Rank them with ``_rank_image_candidates`` and discard candidates
       scoring -100 or lower (placeholders, logos, ...).
    3. HEAD-probe at most ``max_probes`` of the best candidates with
       ``_probe_image_url`` and keep those not reported broken (HTTP 4xx);
       the probe itself treats network errors as OK.
    4. If no probed candidate survived, fall back to the next unprobed
       candidates rather than re-using URLs just confirmed broken.

    Args:
        source_url: Article URL (drives source-specific ranking rules).
        title: Article title, used for title/path token matching.
        images: Candidate image URLs in extraction order.
        max_keep: Maximum number of images to keep.
        max_probes: Maximum number of candidates to HEAD-probe.

    Returns:
        ``(kept, primary, ranked)``: the kept URLs in rank order, the primary
        image (first kept URL, or ``None``), and the full ranking for
        diagnostics.
    """
    # Dedupe while preserving first-seen order; empty entries are dropped.
    deduped = list(dict.fromkeys(image for image in (images or ()) if image))
    ranked = _rank_image_candidates(source_url, title, deduped)
    candidates = [item["url"] for item in ranked if item["score"] > -100]

    probe_window = candidates[: max(max_probes, 0)]
    kept: list[str] = []
    for url in probe_window:
        if len(kept) >= max_keep:
            break
        if _probe_image_url(url):
            kept.append(url)

    if not kept:
        # Every probed candidate returned 4xx (or probing is disabled): use
        # the next-best, unprobed candidates instead of known-broken URLs.
        start = len(probe_window)
        kept = candidates[start : start + max(max_keep, 0)]

    primary = kept[0] if kept else None
    return kept, primary, ranked
|
|
|
|
|
|
def _merge_ingestion_meta(existing_meta_json: str | None, attribution: dict[str, Any], extraction_meta: dict[str, Any]) -> str:
|
|
meta: dict[str, Any] = {}
|
|
if existing_meta_json:
|
|
try:
|
|
parsed = json.loads(existing_meta_json)
|
|
if isinstance(parsed, dict):
|
|
meta = parsed
|
|
except Exception:
|
|
meta = {}
|
|
meta["attribution"] = attribution
|
|
meta["extraction"] = extraction_meta
|
|
return json.dumps(meta, ensure_ascii=False)
|
|
|
|
|
|
def _fetch_feed_with_retries(feed: dict[str, Any]) -> tuple[Any, str | None]:
    """Fetch and parse ``feed`` with conditional-request headers, retrying.

    Makes up to ``MAX_FEED_FETCH_RETRIES`` attempts, sleeping
    ``0.5 s * attempt`` between failed attempts.

    Returns:
        ``(parsed, None)`` on success, or ``(None, last_error_message)`` when
        every attempt raised.
    """
    last_error: str | None = None
    for attempt in range(1, MAX_FEED_FETCH_RETRIES + 1):
        try:
            parsed = feedparser.parse(
                feed["url"],
                etag=feed.get("etag"),
                modified=feed.get("last_modified"),
            )
        except Exception as exc:
            last_error = str(exc)
            if attempt < MAX_FEED_FETCH_RETRIES:
                time.sleep(0.5 * attempt)
        else:
            return parsed, None
    return None, last_error


def _is_older_than_days(published_iso: str | None, max_age_days: int) -> bool:
    """Return True if ``published_iso`` lies more than ``max_age_days`` days ago.

    ``max_age_days <= 0`` disables the check, and a missing or unparsable
    timestamp never counts as too old, so such entries are let through.
    """
    if max_age_days <= 0 or not published_iso:
        return False
    try:
        published_dt = datetime.fromisoformat(published_iso)
    except ValueError:
        return False  # can't parse date → allow through
    return datetime.now(timezone.utc) - published_dt > timedelta(days=max_age_days)


def _merge_with_existing_article(
    base_payload: ArticleUpsert,
    existing: dict[str, Any],
    attribution: dict[str, Any],
    extraction_meta: dict[str, Any],
) -> ArticleUpsert:
    """Build the upsert payload for an article that already exists.

    Freshly ingested source fields come from ``base_payload``; the rewritten
    content, legal review, WordPress publishing state and status are carried
    over from ``existing``; the press contact prefers the new value; meta JSON
    is merged with ``_merge_ingestion_meta``.
    """
    return ArticleUpsert(
        feed_id=base_payload.feed_id,
        source_article_id=base_payload.source_article_id,
        source_hash=base_payload.source_hash,
        title=base_payload.title,
        source_url=base_payload.source_url,
        canonical_url=base_payload.canonical_url,
        published_at=base_payload.published_at,
        author=base_payload.author,
        summary=base_payload.summary,
        content_raw=base_payload.content_raw,
        content_rewritten=existing.get("content_rewritten"),
        image_urls_json=base_payload.image_urls_json,
        press_contact=base_payload.press_contact or existing.get("press_contact"),
        source_name_snapshot=base_payload.source_name_snapshot,
        source_terms_url_snapshot=base_payload.source_terms_url_snapshot,
        source_license_name_snapshot=base_payload.source_license_name_snapshot,
        # `or 0`: a NULL column must not crash int() and abort the whole run
        legal_checked=bool(int(existing.get("legal_checked") or 0)),
        legal_checked_at=existing.get("legal_checked_at"),
        legal_note=existing.get("legal_note"),
        wp_post_id=existing.get("wp_post_id"),
        wp_post_url=existing.get("wp_post_url"),
        publish_attempts=int(existing.get("publish_attempts") or 0),
        publish_last_error=existing.get("publish_last_error"),
        published_to_wp_at=existing.get("published_to_wp_at"),
        word_count=base_payload.word_count,
        status=existing.get("status") or "new",
        meta_json=_merge_ingestion_meta(existing.get("meta_json"), attribution, extraction_meta),
    )


def run_ingestion(feed_id: int | None = None) -> IngestionStats:
    """Fetch enabled RSS feeds and upsert their entries as articles.

    For every enabled feed (or only ``feed_id``, if given and enabled):

    1. Fetch it via feedparser with the stored ETag/Last-Modified, retrying,
       and persist the returned validators.
    2. For each entry with a link: skip it when older than the
       ``pipeline_max_article_age_days`` setting (0 = no limit), resolve
       Google redirect links, strip AMP params, run ``extract_article()``,
       pick images and build an ``ArticleUpsert``.
    3. Skip entries whose existing article has status ``"error"`` (explicitly
       closed); for other existing articles keep their rewrite, legal and
       WordPress state and merge the meta JSON.

    The run is recorded with create_run()/finish_run(). Any exception aborts
    the whole run, which is finished as ``"failed"``; it is not re-raised.

    Args:
        feed_id: Restrict ingestion to this feed; ``None`` processes all
            enabled feeds.

    Returns:
        IngestionStats with the counters collected so far and the run status.
    """
    run_id = create_run(RunCreate(run_type="ingestion", status="running", details="started"))
    feeds_processed = 0
    entries_seen = 0
    articles_upserted = 0
    feed_results: list[dict[str, object]] = []

    try:
        if feed_id is not None:
            feed = get_feed_by_id(feed_id)
            feeds = [feed] if feed and int(feed.get("is_enabled", 0)) == 1 else []
        else:
            feeds = list_enabled_feeds()

        # Read once per run instead of once per feed.
        from .config import get_settings as _get_settings

        max_age_days = _get_settings().pipeline_max_article_age_days

        for feed in feeds:
            if not feed:
                continue
            feeds_processed += 1

            parsed, feed_error = _fetch_feed_with_retries(feed)
            if parsed is None:
                feed_results.append(
                    {
                        "feed_id": int(feed["id"]),
                        "feed_url": feed["url"],
                        "status": "failed",
                        "error": feed_error or "unknown",
                        "entries_seen": 0,
                        "upserts": 0,
                    }
                )
                continue

            # Persist ETag/Last-Modified for conditional requests.
            parsed_etag = _parsed_get(parsed, "etag")
            parsed_modified = _parsed_get(parsed, "modified")
            if parsed_modified and not isinstance(parsed_modified, str):
                parsed_modified = str(parsed_modified)
            update_feed_fetch_state(
                feed_id=int(feed["id"]),
                etag=parsed_etag if isinstance(parsed_etag, str) else None,
                last_modified=parsed_modified if isinstance(parsed_modified, str) else None,
            )

            feed_entries_seen = 0
            feed_upserts = 0
            for entry in _parsed_get(parsed, "entries", []):
                entries_seen += 1
                feed_entries_seen += 1
                raw_link = entry.get("link")
                if not raw_link:
                    continue

                published_iso = _entry_published_iso(entry)
                # Age filter: skip articles older than max_age_days (0 = no limit)
                if _is_older_than_days(published_iso, max_age_days):
                    continue

                # Resolve Google redirect URLs (google.com/url?...&url=<actual_url>&...),
                # then normalize AMP/tracking params (e.g. ?outputType=valid_amp).
                link = _normalize_article_url(_resolve_google_redirect(raw_link))

                summary, content_raw = _entry_text(entry)
                # Strip HTML tags from title (Google Alerts wraps matched keywords in <b>)
                raw_title = entry.get("title") or "Ohne Titel"
                title = re.sub(r"<[^>]+>", "", raw_title).strip() or "Ohne Titel"
                extracted = extract_article(link)

                final_title = extracted.title or title
                final_author = extracted.author or entry.get("author")
                final_summary = extracted.summary or (summary[:1000] if summary else None)
                final_content_raw = extracted.content_text or content_raw
                # Fall back to the resolved/normalized link: the raw entry link
                # is a google.com tracking redirect for Google Alerts feeds.
                final_canonical = extracted.canonical_url or link
                selected_images, primary_image, ranked_images = _select_relevant_images(
                    link,
                    final_title,
                    extracted.images,
                    max_keep=3,
                )

                source_hash = _entry_hash(
                    entry,
                    int(feed["id"]),
                    link,
                    final_title,
                    final_summary or "",
                )
                attribution = {
                    "source_name": feed.get("source_name"),
                    "source_base_url": feed.get("source_base_url"),
                    "source_terms_url": feed.get("source_terms_url"),
                    "source_license_name": feed.get("source_license_name"),
                    "source_risk_level": feed.get("source_risk_level"),
                    "original_link": link,
                    "feed_name": feed.get("name"),
                    "feed_id": int(feed["id"]),
                    "imported_at": datetime.now(timezone.utc).isoformat(),
                }
                extraction_meta: dict[str, Any] = extracted_article_to_meta(extracted)
                extraction_meta["fetched_from"] = link
                extraction_meta["image_selection"] = {
                    "primary": primary_image,
                    "selected_count": len(selected_images),
                    "total_candidates": len(extracted.images),
                    "ranked": ranked_images,
                }
                base_payload = ArticleUpsert(
                    feed_id=int(feed["id"]),
                    source_article_id=entry.get("id") or entry.get("guid"),
                    source_hash=source_hash,
                    title=final_title,
                    source_url=link,
                    canonical_url=final_canonical,
                    published_at=published_iso,
                    author=final_author,
                    summary=final_summary,
                    content_raw=final_content_raw,
                    content_rewritten=None,
                    image_urls_json=json.dumps(selected_images, ensure_ascii=False) if selected_images else None,
                    press_contact=extracted.press_contact,
                    source_name_snapshot=feed.get("source_name"),
                    source_terms_url_snapshot=feed.get("source_terms_url"),
                    source_license_name_snapshot=feed.get("source_license_name"),
                    legal_checked=False,
                    legal_checked_at=None,
                    legal_note=None,
                    wp_post_id=None,
                    wp_post_url=None,
                    publish_attempts=0,
                    publish_last_error=None,
                    published_to_wp_at=None,
                    word_count=len((final_content_raw or "").split()),
                    status="new",
                    meta_json=json.dumps({"attribution": attribution, "extraction": extraction_meta}, ensure_ascii=False),
                )

                existing = find_existing_article_for_upsert(base_payload)
                if existing and existing.get("status") == "error":
                    # Explicitly closed article: ignore on subsequent ingestion runs.
                    continue

                payload = base_payload
                if existing:
                    payload = _merge_with_existing_article(base_payload, existing, attribution, extraction_meta)

                article_id = upsert_article(payload)
                if article_id:
                    articles_upserted += 1
                    feed_upserts += 1

            feed_results.append(
                {
                    "feed_id": int(feed["id"]),
                    "feed_url": feed["url"],
                    "status": "success",
                    "entries_seen": feed_entries_seen,
                    "upserts": feed_upserts,
                }
            )

        finish_run(
            run_id=run_id,
            status="success",
            details=json.dumps(
                {
                    "feeds_processed": feeds_processed,
                    "entries_seen": entries_seen,
                    "upserts": articles_upserted,
                    "feeds": feed_results,
                },
                ensure_ascii=False,
            ),
        )
        return IngestionStats(
            run_id=run_id,
            feeds_processed=feeds_processed,
            entries_seen=entries_seen,
            articles_upserted=articles_upserted,
            status="success",
            message="Ingestion abgeschlossen",
        )
    except Exception as exc:
        finish_run(run_id=run_id, status="failed", details=str(exc))
        return IngestionStats(
            run_id=run_id,
            feeds_processed=feeds_processed,
            entries_seen=entries_seen,
            articles_upserted=articles_upserted,
            status="failed",
            message=str(exc),
        )
|