"""Feed ingestion: fetch enabled feeds, extract their articles and upsert them."""

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
import hashlib
import json
import time
from typing import Any

import feedparser

from .policy import evaluate_source_policy
from .repositories import (
    ArticleUpsert,
    RunCreate,
    create_run,
    finish_run,
    get_feed_by_id,
    list_enabled_feeds,
    update_feed_fetch_state,
    upsert_article,
)
from .source_extraction import extract_article, extracted_article_to_meta


@dataclass(frozen=True)
class IngestionStats:
    """Immutable summary of one ingestion run, as returned by ``run_ingestion``."""

    run_id: int  # identifier returned by ``create_run``
    feeds_processed: int  # feeds visited, including blocked and failed ones
    entries_seen: int  # feed entries iterated, including entries without a link
    articles_upserted: int  # entries for which ``upsert_article`` returned a truthy id
    status: str  # "success" or "failed"
    message: str  # fixed completion text, or the exception text on failure


# Maximum number of fetch attempts per feed before it is recorded as failed.
MAX_FEED_FETCH_RETRIES = 3


def _entry_published_iso(entry: dict) -> str | None:
    """Return the entry's publication time as an ISO-8601 string in UTC.

    Reads ``published_parsed`` and falls back to ``updated_parsed``. Returns
    ``None`` when neither is present or when the time tuple cannot be turned
    into a ``datetime`` (e.g. a leap second with ``tm_sec == 60``), so that a
    single odd timestamp does not abort the whole run.
    """
    published = entry.get("published_parsed") or entry.get("updated_parsed")
    if not published:
        return None
    try:
        return datetime(*published[:6], tzinfo=timezone.utc).isoformat()
    except (TypeError, ValueError):
        return None


def _entry_text(entry: dict) -> tuple[str, str]:
    """Return ``(summary, content)`` for a feed entry.

    ``content`` is the ``value`` of the first item of the entry's ``content``
    list when that item is a dict; otherwise it falls back to the summary.
    """
    summary = entry.get("summary", "") or ""
    content = ""
    content_items = entry.get("content")
    if content_items and isinstance(content_items, list):
        first = content_items[0]
        if isinstance(first, dict):
            content = first.get("value", "")
    return summary, content or summary


def _entry_hash(entry: dict, feed_id: int, link: str, title: str, summary: str) -> str:
    """Return a SHA-256 hex digest fingerprinting an entry within a feed.

    The fingerprint joins feed id, the entry's ``id``/``guid``, link, stripped
    title, stripped summary and the ISO publication time with ``|``.
    """
    source_id = entry.get("id") or entry.get("guid") or ""
    published = _entry_published_iso(entry) or ""
    fingerprint = f"{feed_id}|{source_id}|{link}|{title.strip()}|{summary.strip()}|{published}"
    return hashlib.sha256(fingerprint.encode("utf-8")).hexdigest()


def _parsed_get(parsed: object, key: str, default: object = None) -> object:
    """Read ``key`` from a parse result that may be a dict or a plain object."""
    if isinstance(parsed, dict):
        return parsed.get(key, default)
    return getattr(parsed, key, default)


def _source_snapshot(feed: dict[str, Any]) -> dict[str, object]:
    """Collect the ``source_*`` columns of a feed row for the policy check."""
    return {
        "id": feed.get("source_id"),
        "name": feed.get("source_name"),
        "base_url": feed.get("source_base_url"),
        "terms_url": feed.get("source_terms_url"),
        "license_name": feed.get("source_license_name"),
        "risk_level": feed.get("source_risk_level"),
        "last_reviewed_at": feed.get("source_last_reviewed_at"),
        "is_enabled": feed.get("source_is_enabled"),
    }


def _transport_error(parsed: object) -> str | None:
    """Return an error text when a parse result represents a failed fetch.

    feedparser reports network-level failures through ``bozo`` and
    ``bozo_exception`` instead of raising. A result is only treated as a
    failed fetch when it is flagged ``bozo`` and carries neither an HTTP
    ``status`` nor any entries; a merely malformed feed that was actually
    downloaded is left alone.
    """
    if not _parsed_get(parsed, "bozo"):
        return None
    if _parsed_get(parsed, "status") is not None or _parsed_get(parsed, "entries"):
        return None
    bozo_exception = _parsed_get(parsed, "bozo_exception")
    if bozo_exception is None:
        return "feed could not be fetched"
    return str(bozo_exception)


def _fetch_feed(feed: dict[str, Any]) -> tuple[object | None, str | None]:
    """Fetch and parse a feed, retrying up to ``MAX_FEED_FETCH_RETRIES`` times.

    Sends the stored ``etag`` / ``last_modified`` values so the server can
    answer with 304. Sleeps ``0.5 * attempt`` seconds between attempts.

    Returns:
        ``(parsed, None)`` on success, or ``(None, error_text)`` when every
        attempt raised or produced a transport-level failure.
    """
    last_error: str | None = None
    for attempt in range(1, MAX_FEED_FETCH_RETRIES + 1):
        try:
            parsed = feedparser.parse(
                feed["url"],
                etag=feed.get("etag"),
                modified=feed.get("last_modified"),
            )
        except Exception as exc:  # any parser/transport error counts as a failed attempt
            last_error = str(exc)
        else:
            transport_error = _transport_error(parsed)
            if transport_error is None:
                return parsed, None
            last_error = transport_error
        if attempt < MAX_FEED_FETCH_RETRIES:
            time.sleep(0.5 * attempt)
    return None, last_error


def _persist_fetch_state(feed: dict[str, Any], feed_pk: int, parsed: object) -> None:
    """Persist ETag/Last-Modified for conditional requests.

    On a 304 response the server may omit validators it sent earlier; in that
    case the previously stored values are kept instead of being overwritten
    with ``None``, which would disable conditional requests on the next run.
    """
    parsed_etag = _parsed_get(parsed, "etag")
    parsed_modified = _parsed_get(parsed, "modified")
    if parsed_modified and not isinstance(parsed_modified, str):
        parsed_modified = str(parsed_modified)

    etag = parsed_etag if isinstance(parsed_etag, str) else None
    last_modified = parsed_modified if isinstance(parsed_modified, str) else None

    if _parsed_get(parsed, "status") == 304:
        etag = etag or feed.get("etag")
        last_modified = last_modified or feed.get("last_modified")

    update_feed_fetch_state(feed_id=feed_pk, etag=etag, last_modified=last_modified)


def _build_article_upsert(
    feed: dict[str, Any],
    feed_pk: int,
    entry: dict,
    link: str,
) -> ArticleUpsert:
    """Combine feed entry data with the extracted page into an ``ArticleUpsert``.

    Values from ``extract_article(link)`` take precedence; the feed entry's
    own title, author, summary (cut to 1000 characters) and content are used
    as fallbacks.
    """
    summary, content_raw = _entry_text(entry)
    title = entry.get("title") or "Ohne Titel"
    extracted = extract_article(link)

    final_title = extracted.title or title
    final_author = extracted.author or entry.get("author")
    final_summary = extracted.summary or (summary[:1000] if summary else None)
    final_content_raw = extracted.content_text or content_raw
    final_canonical = extracted.canonical_url or link

    source_hash = _entry_hash(entry, feed_pk, link, final_title, final_summary or "")

    attribution = {
        "source_name": feed.get("source_name"),
        "source_base_url": feed.get("source_base_url"),
        "source_terms_url": feed.get("source_terms_url"),
        "source_license_name": feed.get("source_license_name"),
        "source_risk_level": feed.get("source_risk_level"),
        "original_link": link,
        "feed_name": feed.get("name"),
        "feed_id": feed_pk,
        "imported_at": datetime.now(timezone.utc).isoformat(),
    }
    extraction_meta: dict[str, Any] = extracted_article_to_meta(extracted)
    extraction_meta["fetched_from"] = link

    return ArticleUpsert(
        feed_id=feed_pk,
        source_article_id=entry.get("id") or entry.get("guid"),
        source_hash=source_hash,
        title=final_title,
        source_url=link,
        canonical_url=final_canonical,
        published_at=_entry_published_iso(entry),
        author=final_author,
        summary=final_summary,
        content_raw=final_content_raw,
        content_rewritten=None,
        image_urls_json=(
            json.dumps(extracted.images, ensure_ascii=False) if extracted.images else None
        ),
        press_contact=extracted.press_contact,
        source_name_snapshot=feed.get("source_name"),
        source_terms_url_snapshot=feed.get("source_terms_url"),
        source_license_name_snapshot=feed.get("source_license_name"),
        legal_checked=False,
        legal_checked_at=None,
        legal_note=None,
        word_count=len((final_content_raw or "").split()),
        status="new",
        meta_json=json.dumps(
            {"attribution": attribution, "extraction": extraction_meta},
            ensure_ascii=False,
        ),
    )


def run_ingestion(feed_id: int | None = None) -> IngestionStats:
    """Ingest articles from one feed or from all enabled feeds.

    Creates a run record, then for each feed: checks the source policy,
    fetches the feed (with retries), stores the conditional-request
    validators and upserts one article per entry that has a link. Per-feed
    outcomes ("blocked", "failed", "success") are written as JSON into the
    run details.

    Args:
        feed_id: When given, only that feed is processed, and only if its
            ``is_enabled`` flag equals 1. When ``None``, all feeds returned
            by ``list_enabled_feeds`` are processed.

    Returns:
        An ``IngestionStats`` with the run counters. Exceptions raised while
        processing are not propagated: the run is finished with status
        "failed" and the exception text is returned as the message.
    """
    run_id = create_run(RunCreate(run_type="ingestion", status="running", details="started"))
    feeds_processed = 0
    entries_seen = 0
    articles_upserted = 0
    feed_results: list[dict[str, object]] = []

    try:
        if feed_id is not None:
            feed = get_feed_by_id(feed_id)
            feeds = [feed] if feed and int(feed.get("is_enabled", 0)) == 1 else []
        else:
            feeds = list_enabled_feeds()

        for feed in feeds:
            if not feed:
                continue
            feeds_processed += 1

            policy_issues = evaluate_source_policy(_source_snapshot(feed))
            feed_pk = int(feed["id"])
            if policy_issues:
                feed_results.append(
                    {
                        "feed_id": feed_pk,
                        "feed_url": feed["url"],
                        "status": "blocked",
                        "policy_issues": policy_issues,
                        "entries_seen": 0,
                        "upserts": 0,
                    }
                )
                continue

            parsed, feed_error = _fetch_feed(feed)
            if parsed is None:
                # Stored validators are intentionally left untouched here.
                feed_results.append(
                    {
                        "feed_id": feed_pk,
                        "feed_url": feed["url"],
                        "status": "failed",
                        "error": feed_error or "unknown",
                        "entries_seen": 0,
                        "upserts": 0,
                    }
                )
                continue

            _persist_fetch_state(feed, feed_pk, parsed)

            feed_entries_seen = 0
            feed_upserts = 0
            for entry in _parsed_get(parsed, "entries") or []:
                entries_seen += 1
                feed_entries_seen += 1

                link = entry.get("link")
                if not link:
                    # Entries without a link are counted as seen but not stored.
                    continue

                article_id = upsert_article(_build_article_upsert(feed, feed_pk, entry, link))
                if article_id:
                    articles_upserted += 1
                    feed_upserts += 1

            feed_results.append(
                {
                    "feed_id": feed_pk,
                    "feed_url": feed["url"],
                    "status": "success",
                    "entries_seen": feed_entries_seen,
                    "upserts": feed_upserts,
                }
            )

        finish_run(
            run_id=run_id,
            status="success",
            details=json.dumps(
                {
                    "feeds_processed": feeds_processed,
                    "entries_seen": entries_seen,
                    "upserts": articles_upserted,
                    "feeds": feed_results,
                },
                ensure_ascii=False,
            ),
        )
        return IngestionStats(
            run_id=run_id,
            feeds_processed=feeds_processed,
            entries_seen=entries_seen,
            articles_upserted=articles_upserted,
            status="success",
            message="Ingestion abgeschlossen",
        )
    except Exception as exc:  # top-level boundary: record the failure on the run
        finish_run(run_id=run_id, status="failed", details=str(exc))
        return IngestionStats(
            run_id=run_id,
            feeds_processed=feeds_processed,
            entries_seen=entries_seen,
            articles_upserted=articles_upserted,
            status="failed",
            message=str(exc),
        )