feat(legal): add structured attribution fields and publish legal gate

This commit is contained in:
Oliver 2026-02-18 10:02:19 +01:00
parent c52363f1a7
commit 5159a6e3b4
10 changed files with 259 additions and 16 deletions

View file

@ -23,6 +23,7 @@ from .repositories import (
list_feeds,
list_runs,
list_sources,
set_article_legal_review,
update_article_status,
)
@ -104,22 +105,22 @@ def _legal_checklist(article: dict, feed: dict | None) -> list[dict[str, str]]:
checks.append(
{
"label": "Bilder extrahiert",
"status": "ok" if extraction.get("images") else "missing",
"status": "ok" if article.get("image_urls_json") else "missing",
"value": str(len(extraction.get("images", []))) if isinstance(extraction.get("images"), list) else "0",
}
)
checks.append(
{
"label": "Pressekontakt",
"status": "ok" if extraction.get("press_contact") else "missing",
"value": extraction.get("press_contact") or "-",
"status": "ok" if article.get("press_contact") else "missing",
"value": article.get("press_contact") or extraction.get("press_contact") or "-",
}
)
checks.append(
{
"label": "Lizenz/Terms",
"status": "ok" if attribution.get("source_license_name") and attribution.get("source_terms_url") else "missing",
"value": f"{attribution.get('source_license_name') or '-'} | {attribution.get('source_terms_url') or '-'}",
"status": "ok" if article.get("source_license_name_snapshot") and article.get("source_terms_url_snapshot") else "missing",
"value": f"{article.get('source_license_name_snapshot') or attribution.get('source_license_name') or '-'} | {article.get('source_terms_url_snapshot') or attribution.get('source_terms_url') or '-'}",
}
)
checks.append(
@ -129,6 +130,13 @@ def _legal_checklist(article: dict, feed: dict | None) -> list[dict[str, str]]:
"value": feed.get("source_risk_level") if feed else "-",
}
)
checks.append(
{
"label": "Manuelle Rechtsfreigabe",
"status": "ok" if int(article.get("legal_checked", 0)) == 1 else "missing",
"value": article.get("legal_checked_at") or "-",
}
)
return checks
@ -193,9 +201,20 @@ def admin_dashboard(request: Request):
for article in articles:
meta = _parse_meta_json(article.get("meta_json"))
extraction = meta.get("extraction") if isinstance(meta.get("extraction"), dict) else {}
images = []
if article.get("image_urls_json"):
try:
parsed_images = json.loads(article["image_urls_json"])
if isinstance(parsed_images, list):
images = [str(item) for item in parsed_images if item]
except Exception:
images = []
if not images and isinstance(extraction.get("images"), list):
images = extraction.get("images")
article["meta"] = meta
article["extracted_images"] = extraction.get("images") if isinstance(extraction.get("images"), list) else []
article["press_contact"] = extraction.get("press_contact") if isinstance(extraction.get("press_contact"), str) else None
article["extracted_images"] = images
if not article.get("press_contact") and isinstance(extraction.get("press_contact"), str):
article["press_contact"] = extraction.get("press_contact")
article["extraction_error"] = extraction.get("extraction_error") if isinstance(extraction.get("extraction_error"), str) else None
return templates.TemplateResponse(
@ -232,6 +251,15 @@ def admin_article_detail(request: Request, article_id: int):
meta = _parse_meta_json(article.get("meta_json"))
article["meta"] = meta
extraction = meta.get("extraction") if isinstance(meta.get("extraction"), dict) else {}
if article.get("image_urls_json"):
try:
parsed_images = json.loads(article["image_urls_json"])
if isinstance(parsed_images, list):
extraction["images"] = [str(item) for item in parsed_images if item]
except Exception:
pass
if not article.get("press_contact") and isinstance(extraction.get("press_contact"), str):
article["press_contact"] = extraction.get("press_contact")
article["extraction"] = extraction
feed = get_feed_by_id(int(article["feed_id"])) if article.get("feed_id") else None
checklist = _legal_checklist(article, feed)
@ -251,6 +279,19 @@ def admin_article_detail(request: Request, article_id: int):
)
@router.post("/admin/articles/{article_id}/legal-review")
def admin_article_legal_review(request: Request, article_id: int, approved: str = Form("0"), note: str = Form("")):
user = _admin_user(request)
if not user:
return RedirectResponse(url="/admin/login", status_code=303)
is_approved = approved == "1"
ok = set_article_legal_review(article_id, approved=is_approved, note=note or None, actor=user)
if not ok:
return _dashboard_redirect(msg=f"Artikel #{article_id} nicht gefunden", msg_type="error")
return RedirectResponse(url=f"/admin/articles/{article_id}", status_code=303)
@router.post("/admin/sources/create")
def admin_create_source(
request: Request,
@ -344,6 +385,8 @@ def admin_transition_article(request: Request, article_id: int, target_status: s
if article:
current = article.get("status")
if target_status in ALLOWED_TRANSITIONS.get(current, ()):
if target_status == "published" and int(article.get("legal_checked", 0)) != 1:
return _dashboard_redirect(msg=f"Publish blockiert fuer Artikel #{article_id}: Rechtsfreigabe fehlt", msg_type="error")
update_article_status(article_id, target_status, actor=user, note=note or None)
return _dashboard_redirect(msg=f"Artikel #{article_id}: {current} -> {target_status}")
return _dashboard_redirect(msg=f"Ungueltiger Statuswechsel fuer Artikel #{article_id}", msg_type="error")