diff --git a/backend/app/admin_ui.py b/backend/app/admin_ui.py
index e9bfae4..a725ba8 100644
--- a/backend/app/admin_ui.py
+++ b/backend/app/admin_ui.py
@@ -3,6 +3,9 @@ from __future__ import annotations
import json
from pathlib import Path
import re
+import socket
+import ssl
+import time
from urllib.parse import urlparse
from urllib.parse import urlencode
from urllib.request import Request as UrlRequest, urlopen
@@ -254,6 +257,113 @@ def _legal_checklist(article: dict, feed: dict | None) -> list[dict[str, str]]:
return checks
+def _build_connectivity_targets() -> list[dict[str, str]]:
+ targets: list[dict[str, str]] = []
+ seen: set[tuple[str, str]] = set()
+
+ def add_target(label: str, kind: str, value: str) -> None:
+ normalized = (value or "").strip()
+ if not normalized:
+ return
+ key = (kind, normalized.lower())
+ if key in seen:
+ return
+ seen.add(key)
+ targets.append({"label": label, "kind": kind, "value": normalized})
+
+ add_target("OpenAI API", "host", "api.openai.com")
+ if settings.wordpress_base_url:
+ parsed = urlparse(settings.wordpress_base_url)
+ if parsed.hostname:
+ add_target("WordPress Host", "host", parsed.hostname)
+ wp_api_url = f"{settings.wordpress_base_url.rstrip('/')}/wp-json/wp/v2"
+ add_target("WordPress REST", "url", wp_api_url)
+
+ for feed in list_feeds():
+ name = (feed.get("name") or "").strip() or f"Feed #{feed.get('id')}"
+ feed_url = str(feed.get("url") or "").strip()
+ if not feed_url:
+ continue
+ parsed = urlparse(feed_url)
+ if parsed.hostname:
+ add_target(f"{name} (Feed)", "host", parsed.hostname)
+ add_target(f"{name} (Feed URL)", "url", feed_url)
+
+ return targets
+
+
+def _run_connectivity_check(target: dict[str, str]) -> dict[str, object]:
+ kind = target.get("kind", "")
+ value = str(target.get("value") or "")
+ row: dict[str, object] = {
+ "label": target.get("label") or "-",
+ "kind": kind,
+ "target": value,
+ "dns_ok": False,
+ "dns_info": "-",
+ "tcp_ok": False,
+ "tcp_info": "-",
+ "http_ok": False,
+ "http_info": "-",
+ "duration_ms": 0,
+ "ok": False,
+ }
+ started = time.perf_counter()
+ try:
+ hostname = value if kind == "host" else (urlparse(value).hostname or "")
+ port = 443
+ if kind == "url":
+ parsed = urlparse(value)
+ if parsed.scheme not in {"http", "https"}:
+ row["http_info"] = f"unsupported scheme: {parsed.scheme or '-'}"
+ return row
+ port = 443 if parsed.scheme == "https" else 80
+ if not hostname:
+ row["dns_info"] = "host fehlt"
+ return row
+
+ try:
+ addr_info = socket.getaddrinfo(hostname, None, proto=socket.IPPROTO_TCP)
+ ips = sorted({entry[4][0] for entry in addr_info if entry and len(entry) > 4 and entry[4]})
+ row["dns_ok"] = True
+ row["dns_info"] = ", ".join(ips[:3]) if ips else "resolved"
+ except Exception as exc:
+ row["dns_info"] = str(exc)
+ return row
+
+ try:
+ socket.create_connection((hostname, port), timeout=4).close()
+ row["tcp_ok"] = True
+ row["tcp_info"] = f"port {port} erreichbar"
+ except Exception as exc:
+ row["tcp_info"] = str(exc)
+ return row
+
+ if kind == "host":
+ row["http_ok"] = True
+ row["http_info"] = "n/a (host-only)"
+ row["ok"] = True
+ return row
+
+ try:
+ req = UrlRequest(
+ url=value,
+ headers={"User-Agent": IMAGE_PROXY_USER_AGENT, "Accept": "*/*"},
+ )
+ with urlopen(req, timeout=6, context=ssl.create_default_context()) as resp:
+ code = getattr(resp, "status", None) or resp.getcode()
+ row["http_ok"] = True
+ row["http_info"] = f"HTTP {code}"
+ except Exception as exc:
+ row["http_info"] = str(exc)
+ return row
+
+ row["ok"] = bool(row["dns_ok"] and row["tcp_ok"] and row["http_ok"])
+ return row
+ finally:
+ row["duration_ms"] = int((time.perf_counter() - started) * 1000)
+
+
@router.get("/admin", response_class=HTMLResponse)
def admin_index(request: Request):
user = _admin_user(request)
@@ -362,6 +472,29 @@ def admin_dashboard(request: Request):
)
+@router.get("/admin/connectivity", response_class=HTMLResponse)
+def admin_connectivity(request: Request):
+ user = _admin_user(request)
+ if not user:
+ return RedirectResponse(url="/admin/login", status_code=303)
+
+ checks = [_run_connectivity_check(target) for target in _build_connectivity_targets()]
+ ok_count = len([c for c in checks if c.get("ok")])
+ error_count = len(checks) - ok_count
+ return templates.TemplateResponse(
+ request,
+ "admin_connectivity.html",
+ {
+ "request": request,
+ "title": "Connectivity Check",
+ "user": user,
+ "checks": checks,
+ "ok_count": ok_count,
+ "error_count": error_count,
+ },
+ )
+
+
@router.get("/admin/articles/{article_id}", response_class=HTMLResponse)
def admin_article_detail(request: Request, article_id: int):
user = _admin_user(request)
diff --git a/backend/templates/admin_connectivity.html b/backend/templates/admin_connectivity.html
new file mode 100644
index 0000000..5fc0392
--- /dev/null
+++ b/backend/templates/admin_connectivity.html
@@ -0,0 +1,84 @@
+
+
+
+
+
+ {{ title }}
+
+
+
+
+
+
+
+
+ Checks
+ {{ checks|length }}
+
+
+ OK
+ {{ ok_count }}
+
+
+ Fehler
+ {{ error_count }}
+
+
+ Zeitpunkt
+ Live
+
+
+
+
+ Ziele
+ Geprüft werden DNS-Auflösung, TCP-Erreichbarkeit und bei URLs ein HTTP-Request.
+
+
+
+
+ Ergebnis
+
+
+ | Status | Name | Typ | Ziel | DNS | TCP | HTTP | Dauer |
+
+
+ {% for c in checks %}
+
+ | {% if c.ok %}OK{% else %}Fehler{% endif %} |
+ {{ c.label }} |
+ {{ c.kind }} |
+ {{ c.target }} |
+
+ {% if c.dns_ok %}OK{% else %}FAIL{% endif %}
+ {{ c.dns_info }}
+ |
+
+ {% if c.tcp_ok %}OK{% else %}FAIL{% endif %}
+ {{ c.tcp_info }}
+ |
+
+ {% if c.http_ok %}OK{% else %}FAIL{% endif %}
+ {{ c.http_info }}
+ |
+ {{ c.duration_ms }} ms |
+
+ {% endfor %}
+
+
+
+
+
+
diff --git a/backend/templates/admin_dashboard.html b/backend/templates/admin_dashboard.html
index 1ed5c6e..e43a9ae 100644
--- a/backend/templates/admin_dashboard.html
+++ b/backend/templates/admin_dashboard.html
@@ -12,9 +12,12 @@
rss-news Admin Dashboard
Angemeldet als {{ user }}
-
+
diff --git a/backend/tests/test_admin_ui.py b/backend/tests/test_admin_ui.py
index af47046..557121a 100644
--- a/backend/tests/test_admin_ui.py
+++ b/backend/tests/test_admin_ui.py
@@ -176,6 +176,53 @@ class TestAdminUi(unittest.TestCase):
self.assertEqual(res.status_code, 200)
self.assertIn("image/jpeg", res.headers.get("content-type", ""))
+ @patch("backend.app.admin_ui._run_connectivity_check")
+ @patch("backend.app.admin_ui._build_connectivity_targets")
+ def test_connectivity_page_renders(self, mock_targets, mock_check) -> None:
+ mock_targets.return_value = [
+ {"label": "OpenAI API", "kind": "host", "value": "api.openai.com"},
+ {"label": "WordPress REST", "kind": "url", "value": "https://example.org/wp-json/wp/v2"},
+ ]
+ mock_check.side_effect = [
+ {
+ "label": "OpenAI API",
+ "kind": "host",
+ "target": "api.openai.com",
+ "dns_ok": True,
+ "dns_info": "1.2.3.4",
+ "tcp_ok": True,
+ "tcp_info": "port 443 erreichbar",
+ "http_ok": True,
+ "http_info": "n/a (host-only)",
+ "duration_ms": 12,
+ "ok": True,
+ },
+ {
+ "label": "WordPress REST",
+ "kind": "url",
+ "target": "https://example.org/wp-json/wp/v2",
+ "dns_ok": False,
+ "dns_info": "Name or service not known",
+ "tcp_ok": False,
+ "tcp_info": "-",
+ "http_ok": False,
+ "http_info": "-",
+ "duration_ms": 10,
+ "ok": False,
+ },
+ ]
+
+ self.client.post(
+ "/admin/login",
+ data={"username": "admin", "password": "secret"},
+ follow_redirects=True,
+ )
+ res = self.client.get("/admin/connectivity", follow_redirects=True)
+ self.assertEqual(res.status_code, 200)
+ self.assertIn("Connectivity Check", res.text)
+ self.assertIn("OpenAI API", res.text)
+ self.assertIn("WordPress REST", res.text)
+
if __name__ == "__main__":
unittest.main()