from __future__ import annotations from dataclasses import dataclass from datetime import datetime, timezone import hashlib import json import re import time from typing import Any from urllib.parse import unquote, urlparse import feedparser from .policy import evaluate_source_policy from .repositories import ( ArticleUpsert, RunCreate, create_run, finish_run, get_feed_by_id, list_enabled_feeds, update_feed_fetch_state, upsert_article, ) from .source_extraction import extract_article, extracted_article_to_meta @dataclass(frozen=True) class IngestionStats: run_id: int feeds_processed: int entries_seen: int articles_upserted: int status: str message: str MAX_FEED_FETCH_RETRIES = 3 def _entry_published_iso(entry: dict) -> str | None: published = entry.get("published_parsed") or entry.get("updated_parsed") if not published: return None return datetime(*published[:6], tzinfo=timezone.utc).isoformat() def _entry_text(entry: dict) -> tuple[str, str]: summary = entry.get("summary", "") or "" content = "" if entry.get("content") and isinstance(entry.get("content"), list): first = entry["content"][0] content = first.get("value", "") if isinstance(first, dict) else "" if not content: content = summary return summary, content def _entry_hash(entry: dict, feed_id: int, link: str, title: str, summary: str) -> str: source_id = entry.get("id") or entry.get("guid") or "" published = _entry_published_iso(entry) or "" fingerprint = f"{feed_id}|{source_id}|{link}|{title.strip()}|{summary.strip()}|{published}" return hashlib.sha256(fingerprint.encode("utf-8")).hexdigest() def _parsed_get(parsed: object, key: str, default: object = None) -> object: if isinstance(parsed, dict): return parsed.get(key, default) return getattr(parsed, key, default) def _normalize_tokens(text: str) -> set[str]: normalized = re.sub(r"[^a-z0-9]+", " ", text.lower()) return {token for token in normalized.split() if len(token) >= 4} def _rank_image_candidates(source_url: str, title: str, images: list[str]) -> list[dict[str, Any]]: source_host = (urlparse(source_url).hostname or "").lower() is_presseportal = "presseportal.de" in source_host title_tokens = _normalize_tokens(title) blocked_patterns = ("logo", "badge", "app-store", "google-play", "na-logo", "sprite", "icon", "favicon", "tracking", "pixel") ranked: list[dict[str, Any]] = [] for url in images: parsed = urlparse(url) path = unquote(parsed.path.lower()) full = f"{parsed.netloc.lower()}{path}" score = 0 reasons: list[str] = [] if any(token in full for token in blocked_patterns): score -= 150 reasons.append("blocked-pattern") if is_presseportal and "/thumbnail/story_big/" in path: score += 120 reasons.append("presseportal-story-big") elif is_presseportal and "/thumbnail/highlight/" in path: score += 45 reasons.append("presseportal-highlight") elif is_presseportal and "/thumbnail/liste/" in path: score -= 40 reasons.append("presseportal-list") if "crop=" in (parsed.query or "").lower(): score -= 10 reasons.append("cropped-preview") path_tokens = _normalize_tokens(path.replace("-", " ")) overlap = len(title_tokens.intersection(path_tokens)) if overlap > 0: score += min(30, overlap * 6) reasons.append(f"title-match:{overlap}") ranked.append({"url": url, "score": score, "reasons": reasons}) ranked.sort(key=lambda item: item["score"], reverse=True) return ranked def _select_relevant_images(source_url: str, title: str, images: list[str], max_keep: int = 3) -> tuple[list[str], str | None, list[dict[str, Any]]]: # dedupe incoming order first deduped: list[str] = [] seen: set[str] = set() for image in images: if image and image not in seen: seen.add(image) deduped.append(image) ranked = _rank_image_candidates(source_url, title, deduped) kept = [item["url"] for item in ranked if item["score"] > 0][:max_keep] if not kept and ranked: kept = [ranked[0]["url"]] primary = kept[0] if kept else None return kept, primary, ranked def run_ingestion(feed_id: int | None = None) -> IngestionStats: run_id = create_run(RunCreate(run_type="ingestion", status="running", details="started")) feeds_processed = 0 entries_seen = 0 articles_upserted = 0 feed_results: list[dict[str, object]] = [] try: if feed_id is not None: feed = get_feed_by_id(feed_id) feeds = [feed] if feed and int(feed.get("is_enabled", 0)) == 1 else [] else: feeds = list_enabled_feeds() for feed in feeds: if not feed: continue feeds_processed += 1 source_snapshot = { "id": feed.get("source_id"), "name": feed.get("source_name"), "base_url": feed.get("source_base_url"), "terms_url": feed.get("source_terms_url"), "license_name": feed.get("source_license_name"), "risk_level": feed.get("source_risk_level"), "last_reviewed_at": feed.get("source_last_reviewed_at"), "is_enabled": feed.get("source_is_enabled"), } policy_issues = evaluate_source_policy(source_snapshot) if policy_issues: feed_results.append( { "feed_id": int(feed["id"]), "feed_url": feed["url"], "status": "blocked", "policy_issues": policy_issues, "entries_seen": 0, "upserts": 0, } ) continue parsed = None feed_error = None for attempt in range(1, MAX_FEED_FETCH_RETRIES + 1): try: parsed = feedparser.parse( feed["url"], etag=feed.get("etag"), modified=feed.get("last_modified"), ) break except Exception as exc: feed_error = str(exc) if attempt < MAX_FEED_FETCH_RETRIES: time.sleep(0.5 * attempt) if parsed is None: feed_results.append( { "feed_id": int(feed["id"]), "feed_url": feed["url"], "status": "failed", "error": feed_error or "unknown", "entries_seen": 0, "upserts": 0, } ) continue # Persist ETag/Last-Modified for conditional requests. parsed_etag = _parsed_get(parsed, "etag") parsed_modified = _parsed_get(parsed, "modified") if parsed_modified and not isinstance(parsed_modified, str): parsed_modified = str(parsed_modified) update_feed_fetch_state( feed_id=int(feed["id"]), etag=parsed_etag if isinstance(parsed_etag, str) else None, last_modified=parsed_modified if isinstance(parsed_modified, str) else None, ) feed_entries_seen = 0 feed_upserts = 0 for entry in _parsed_get(parsed, "entries", []): entries_seen += 1 feed_entries_seen += 1 link = entry.get("link") if not link: continue summary, content_raw = _entry_text(entry) title = entry.get("title") or "Ohne Titel" extracted = extract_article(link) final_title = extracted.title or title final_author = extracted.author or entry.get("author") final_summary = extracted.summary or (summary[:1000] if summary else None) final_content_raw = extracted.content_text or content_raw final_canonical = extracted.canonical_url or entry.get("link") selected_images, primary_image, ranked_images = _select_relevant_images( link, final_title, extracted.images, max_keep=3, ) source_hash = _entry_hash( entry, int(feed["id"]), link, final_title, final_summary or "", ) attribution = { "source_name": feed.get("source_name"), "source_base_url": feed.get("source_base_url"), "source_terms_url": feed.get("source_terms_url"), "source_license_name": feed.get("source_license_name"), "source_risk_level": feed.get("source_risk_level"), "original_link": link, "feed_name": feed.get("name"), "feed_id": int(feed["id"]), "imported_at": datetime.now(timezone.utc).isoformat(), } extraction_meta: dict[str, Any] = extracted_article_to_meta(extracted) extraction_meta["fetched_from"] = link extraction_meta["image_selection"] = { "primary": primary_image, "selected_count": len(selected_images), "total_candidates": len(extracted.images), "ranked": ranked_images, } article_id = upsert_article( ArticleUpsert( feed_id=int(feed["id"]), source_article_id=entry.get("id") or entry.get("guid"), source_hash=source_hash, title=final_title, source_url=link, canonical_url=final_canonical, published_at=_entry_published_iso(entry), author=final_author, summary=final_summary, content_raw=final_content_raw, content_rewritten=None, image_urls_json=json.dumps(selected_images, ensure_ascii=False) if selected_images else None, press_contact=extracted.press_contact, source_name_snapshot=feed.get("source_name"), source_terms_url_snapshot=feed.get("source_terms_url"), source_license_name_snapshot=feed.get("source_license_name"), legal_checked=False, legal_checked_at=None, legal_note=None, wp_post_id=None, wp_post_url=None, publish_attempts=0, publish_last_error=None, published_to_wp_at=None, word_count=len((final_content_raw or "").split()), status="new", meta_json=json.dumps({"attribution": attribution, "extraction": extraction_meta}, ensure_ascii=False), ) ) if article_id: articles_upserted += 1 feed_upserts += 1 feed_results.append( { "feed_id": int(feed["id"]), "feed_url": feed["url"], "status": "success", "entries_seen": feed_entries_seen, "upserts": feed_upserts, } ) finish_run( run_id=run_id, status="success", details=json.dumps( { "feeds_processed": feeds_processed, "entries_seen": entries_seen, "upserts": articles_upserted, "feeds": feed_results, }, ensure_ascii=False, ), ) return IngestionStats( run_id=run_id, feeds_processed=feeds_processed, entries_seen=entries_seen, articles_upserted=articles_upserted, status="success", message="Ingestion abgeschlossen", ) except Exception as exc: finish_run(run_id=run_id, status="failed", details=str(exc)) return IngestionStats( run_id=run_id, feeds_processed=feeds_processed, entries_seen=entries_seen, articles_upserted=articles_upserted, status="failed", message=str(exc), )