rss-news/backend/app/ingestion.py

346 lines
13 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
import hashlib
import json
import re
import time
from typing import Any
from urllib.parse import unquote, urlparse
import feedparser
from .policy import evaluate_source_policy
from .repositories import (
ArticleUpsert,
RunCreate,
create_run,
finish_run,
get_feed_by_id,
list_enabled_feeds,
update_feed_fetch_state,
upsert_article,
)
from .source_extraction import extract_article, extracted_article_to_meta
@dataclass(frozen=True)
class IngestionStats:
run_id: int
feeds_processed: int
entries_seen: int
articles_upserted: int
status: str
message: str
MAX_FEED_FETCH_RETRIES = 3
def _entry_published_iso(entry: dict) -> str | None:
published = entry.get("published_parsed") or entry.get("updated_parsed")
if not published:
return None
return datetime(*published[:6], tzinfo=timezone.utc).isoformat()
def _entry_text(entry: dict) -> tuple[str, str]:
summary = entry.get("summary", "") or ""
content = ""
if entry.get("content") and isinstance(entry.get("content"), list):
first = entry["content"][0]
content = first.get("value", "") if isinstance(first, dict) else ""
if not content:
content = summary
return summary, content
def _entry_hash(entry: dict, feed_id: int, link: str, title: str, summary: str) -> str:
source_id = entry.get("id") or entry.get("guid") or ""
published = _entry_published_iso(entry) or ""
fingerprint = f"{feed_id}|{source_id}|{link}|{title.strip()}|{summary.strip()}|{published}"
return hashlib.sha256(fingerprint.encode("utf-8")).hexdigest()
def _parsed_get(parsed: object, key: str, default: object = None) -> object:
if isinstance(parsed, dict):
return parsed.get(key, default)
return getattr(parsed, key, default)
def _normalize_tokens(text: str) -> set[str]:
normalized = re.sub(r"[^a-z0-9]+", " ", text.lower())
return {token for token in normalized.split() if len(token) >= 4}
def _rank_image_candidates(source_url: str, title: str, images: list[str]) -> list[dict[str, Any]]:
source_host = (urlparse(source_url).hostname or "").lower()
is_presseportal = "presseportal.de" in source_host
title_tokens = _normalize_tokens(title)
blocked_patterns = ("logo", "badge", "app-store", "google-play", "na-logo", "sprite", "icon", "favicon", "tracking", "pixel")
ranked: list[dict[str, Any]] = []
for url in images:
parsed = urlparse(url)
path = unquote(parsed.path.lower())
full = f"{parsed.netloc.lower()}{path}"
score = 0
reasons: list[str] = []
if any(token in full for token in blocked_patterns):
score -= 150
reasons.append("blocked-pattern")
if is_presseportal and "/thumbnail/story_big/" in path:
score += 120
reasons.append("presseportal-story-big")
elif is_presseportal and "/thumbnail/highlight/" in path:
score += 45
reasons.append("presseportal-highlight")
elif is_presseportal and "/thumbnail/liste/" in path:
score -= 40
reasons.append("presseportal-list")
if "crop=" in (parsed.query or "").lower():
score -= 10
reasons.append("cropped-preview")
path_tokens = _normalize_tokens(path.replace("-", " "))
overlap = len(title_tokens.intersection(path_tokens))
if overlap > 0:
score += min(30, overlap * 6)
reasons.append(f"title-match:{overlap}")
ranked.append({"url": url, "score": score, "reasons": reasons})
ranked.sort(key=lambda item: item["score"], reverse=True)
return ranked
def _select_relevant_images(source_url: str, title: str, images: list[str], max_keep: int = 3) -> tuple[list[str], str | None, list[dict[str, Any]]]:
# dedupe incoming order first
deduped: list[str] = []
seen: set[str] = set()
for image in images:
if image and image not in seen:
seen.add(image)
deduped.append(image)
ranked = _rank_image_candidates(source_url, title, deduped)
kept = [item["url"] for item in ranked if item["score"] > 0][:max_keep]
if not kept and ranked:
kept = [ranked[0]["url"]]
primary = kept[0] if kept else None
return kept, primary, ranked
def run_ingestion(feed_id: int | None = None) -> IngestionStats:
run_id = create_run(RunCreate(run_type="ingestion", status="running", details="started"))
feeds_processed = 0
entries_seen = 0
articles_upserted = 0
feed_results: list[dict[str, object]] = []
try:
if feed_id is not None:
feed = get_feed_by_id(feed_id)
feeds = [feed] if feed and int(feed.get("is_enabled", 0)) == 1 else []
else:
feeds = list_enabled_feeds()
for feed in feeds:
if not feed:
continue
feeds_processed += 1
source_snapshot = {
"id": feed.get("source_id"),
"name": feed.get("source_name"),
"base_url": feed.get("source_base_url"),
"terms_url": feed.get("source_terms_url"),
"license_name": feed.get("source_license_name"),
"risk_level": feed.get("source_risk_level"),
"last_reviewed_at": feed.get("source_last_reviewed_at"),
"is_enabled": feed.get("source_is_enabled"),
}
policy_issues = evaluate_source_policy(source_snapshot)
if policy_issues:
feed_results.append(
{
"feed_id": int(feed["id"]),
"feed_url": feed["url"],
"status": "blocked",
"policy_issues": policy_issues,
"entries_seen": 0,
"upserts": 0,
}
)
continue
parsed = None
feed_error = None
for attempt in range(1, MAX_FEED_FETCH_RETRIES + 1):
try:
parsed = feedparser.parse(
feed["url"],
etag=feed.get("etag"),
modified=feed.get("last_modified"),
)
break
except Exception as exc:
feed_error = str(exc)
if attempt < MAX_FEED_FETCH_RETRIES:
time.sleep(0.5 * attempt)
if parsed is None:
feed_results.append(
{
"feed_id": int(feed["id"]),
"feed_url": feed["url"],
"status": "failed",
"error": feed_error or "unknown",
"entries_seen": 0,
"upserts": 0,
}
)
continue
# Persist ETag/Last-Modified for conditional requests.
parsed_etag = _parsed_get(parsed, "etag")
parsed_modified = _parsed_get(parsed, "modified")
if parsed_modified and not isinstance(parsed_modified, str):
parsed_modified = str(parsed_modified)
update_feed_fetch_state(
feed_id=int(feed["id"]),
etag=parsed_etag if isinstance(parsed_etag, str) else None,
last_modified=parsed_modified if isinstance(parsed_modified, str) else None,
)
feed_entries_seen = 0
feed_upserts = 0
for entry in _parsed_get(parsed, "entries", []):
entries_seen += 1
feed_entries_seen += 1
link = entry.get("link")
if not link:
continue
summary, content_raw = _entry_text(entry)
title = entry.get("title") or "Ohne Titel"
extracted = extract_article(link)
final_title = extracted.title or title
final_author = extracted.author or entry.get("author")
final_summary = extracted.summary or (summary[:1000] if summary else None)
final_content_raw = extracted.content_text or content_raw
final_canonical = extracted.canonical_url or entry.get("link")
selected_images, primary_image, ranked_images = _select_relevant_images(
link,
final_title,
extracted.images,
max_keep=3,
)
source_hash = _entry_hash(
entry,
int(feed["id"]),
link,
final_title,
final_summary or "",
)
attribution = {
"source_name": feed.get("source_name"),
"source_base_url": feed.get("source_base_url"),
"source_terms_url": feed.get("source_terms_url"),
"source_license_name": feed.get("source_license_name"),
"source_risk_level": feed.get("source_risk_level"),
"original_link": link,
"feed_name": feed.get("name"),
"feed_id": int(feed["id"]),
"imported_at": datetime.now(timezone.utc).isoformat(),
}
extraction_meta: dict[str, Any] = extracted_article_to_meta(extracted)
extraction_meta["fetched_from"] = link
extraction_meta["image_selection"] = {
"primary": primary_image,
"selected_count": len(selected_images),
"total_candidates": len(extracted.images),
"ranked": ranked_images,
}
article_id = upsert_article(
ArticleUpsert(
feed_id=int(feed["id"]),
source_article_id=entry.get("id") or entry.get("guid"),
source_hash=source_hash,
title=final_title,
source_url=link,
canonical_url=final_canonical,
published_at=_entry_published_iso(entry),
author=final_author,
summary=final_summary,
content_raw=final_content_raw,
content_rewritten=None,
image_urls_json=json.dumps(selected_images, ensure_ascii=False) if selected_images else None,
press_contact=extracted.press_contact,
source_name_snapshot=feed.get("source_name"),
source_terms_url_snapshot=feed.get("source_terms_url"),
source_license_name_snapshot=feed.get("source_license_name"),
legal_checked=False,
legal_checked_at=None,
legal_note=None,
wp_post_id=None,
wp_post_url=None,
publish_attempts=0,
publish_last_error=None,
published_to_wp_at=None,
word_count=len((final_content_raw or "").split()),
status="new",
meta_json=json.dumps({"attribution": attribution, "extraction": extraction_meta}, ensure_ascii=False),
)
)
if article_id:
articles_upserted += 1
feed_upserts += 1
feed_results.append(
{
"feed_id": int(feed["id"]),
"feed_url": feed["url"],
"status": "success",
"entries_seen": feed_entries_seen,
"upserts": feed_upserts,
}
)
finish_run(
run_id=run_id,
status="success",
details=json.dumps(
{
"feeds_processed": feeds_processed,
"entries_seen": entries_seen,
"upserts": articles_upserted,
"feeds": feed_results,
},
ensure_ascii=False,
),
)
return IngestionStats(
run_id=run_id,
feeds_processed=feeds_processed,
entries_seen=entries_seen,
articles_upserted=articles_upserted,
status="success",
message="Ingestion abgeschlossen",
)
except Exception as exc:
finish_run(run_id=run_id, status="failed", details=str(exc))
return IngestionStats(
run_id=run_id,
feeds_processed=feeds_processed,
entries_seen=entries_seen,
articles_upserted=articles_upserted,
status="failed",
message=str(exc),
)