StaySense/backend/server.py

1293 lines
45 KiB
Python

import datetime as dt
import hashlib
import hmac
import json
import os
import secrets
import uuid
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
import sqlite3
from urllib.parse import parse_qs, urlencode, urlparse
from urllib.request import Request, urlopen
from db import get_conn, init_db
from score_engine import (
ampel,
clamp_score,
classify_area,
classify_road,
decay,
haversine_m,
nearest_distance_m,
night_window_for,
score_area_modifier,
spot_id_for,
weekend_or_holiday,
)
# Network endpoint constants. The code that consumes them is outside this
# view (presumably the server start-up -- confirm).
HOST = "127.0.0.1"
PORT = 8787
# Placeholder key used when STAYSENSE_SERVER_SALT is not set in the environment.
# NOTE(review): while the placeholder is in effect, the HMAC key used by
# hashed_device() is a publicly known value.
DEFAULT_SALT = "change-me-in-production"
SERVER_SALT = os.environ.get("STAYSENSE_SERVER_SALT", DEFAULT_SALT)
# Hours a device must wait before reporting on the same spot again
# (enforced in handle_signal). int() raises ValueError at import time if the
# environment value is not an integer; the same holds for the int() calls below.
SIGNAL_COOLDOWN_HOURS = int(os.environ.get("STAYSENSE_SIGNAL_COOLDOWN_HOURS", "24"))
# Upstream geocoder endpoint queried by handle_geocode_search.
NOMINATIM_BASE_URL = os.environ.get("STAYSENSE_NOMINATIM_BASE_URL", "https://nominatim.openstreetmap.org/search")
# User-Agent header sent on outbound geocoder requests; handle_tile_proxy
# reuses it for tile requests.
NOMINATIM_USER_AGENT = os.environ.get(
    "STAYSENSE_NOMINATIM_USER_AGENT",
    "StaySense/0.1 (staysense.vanityontour.de)",
)
# Base URL that handle_tile_proxy appends "/{z}/{x}/{y}.png" to.
OSM_TILE_BASE_URL = os.environ.get("STAYSENSE_OSM_TILE_BASE_URL", "https://tile.openstreetmap.org")
# Lifetime of an admin session token, in hours.
ADMIN_SESSION_HOURS = int(os.environ.get("STAYSENSE_ADMIN_SESSION_HOURS", "12"))
# Iteration count passed to hashlib.pbkdf2_hmac in pbkdf2_hash.
ADMIN_PBKDF2_ITERATIONS = int(os.environ.get("STAYSENSE_ADMIN_PBKDF2_ITERATIONS", "390000"))
# Default `source` value for events submitted through the admin API.
ADMIN_MANUAL_SOURCE = "admin_manual"
# Hard-coded (lat, lon) points used for distance calculations when the POI
# table yields no rows of the requested type, and by the degraded score path.
FALLBACK_POLICE_POINTS = [(51.2507, 6.9751), (51.2965, 6.8494), (51.3398, 7.0438)]
FALLBACK_FIRE_POINTS = [(51.2518, 6.9800), (51.2937, 6.8568), (51.3314, 7.0540)]
FALLBACK_HOSPITAL_POINTS = [(51.2556, 6.9723), (51.2891, 6.8457), (51.3321, 7.0403)]
# In-memory admin session store: token -> {"username": str, "expires_at": datetime}.
# Being process-local, its contents do not survive a restart.
ADMIN_SESSIONS: dict[str, dict] = {}
def utc_now() -> dt.datetime:
    """Return the current moment as a timezone-aware datetime in UTC."""
    return dt.datetime.now(tz=dt.timezone.utc)
def parse_iso8601(value: str | None) -> dt.datetime:
if not value:
return utc_now()
parsed = dt.datetime.fromisoformat(value.replace("Z", "+00:00"))
if parsed.tzinfo is None:
return parsed.replace(tzinfo=dt.timezone.utc)
return parsed.astimezone(dt.timezone.utc)
def to_iso(value: dt.datetime) -> str:
    """Format ``value`` as a UTC ISO-8601 string with second precision and a ``Z`` suffix."""
    in_utc = value.astimezone(dt.timezone.utc)
    # timespec="seconds" drops any sub-second part from the rendered text.
    rendered = in_utc.isoformat(timespec="seconds")
    return rendered.replace("+00:00", "Z")
def json_response(handler: BaseHTTPRequestHandler, status: int, payload: dict) -> None:
    """Serialize ``payload`` as ASCII-escaped JSON and write a complete HTTP response.

    The response carries ``Content-Type``, ``Content-Length`` and
    ``Cache-Control: no-store`` headers, in that order.
    """
    body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
    headers = (
        ("Content-Type", "application/json; charset=utf-8"),
        ("Content-Length", str(len(body))),
        ("Cache-Control", "no-store"),
    )
    handler.send_response(status)
    for header_name, header_value in headers:
        handler.send_header(header_name, header_value)
    handler.end_headers()
    handler.wfile.write(body)
def binary_response(
    handler: BaseHTTPRequestHandler, status: int, payload: bytes, content_type: str, cache_control: str = "public, max-age=3600"
) -> None:
    """Write ``payload`` verbatim as an HTTP response body.

    Sends the status line, then ``Content-Type``, ``Content-Length`` and
    ``Cache-Control`` headers (in that order), then the raw bytes.
    """
    handler.send_response(status)
    for header_name, header_value in (
        ("Content-Type", content_type),
        ("Content-Length", str(len(payload))),
        ("Cache-Control", cache_control),
    ):
        handler.send_header(header_name, header_value)
    handler.end_headers()
    handler.wfile.write(payload)
def read_json(handler: BaseHTTPRequestHandler, max_bytes: int = 1_048_576) -> dict:
    """Read the request body and decode it as a JSON object.

    Args:
        handler: Request handler whose ``headers`` and ``rfile`` are read.
        max_bytes: Largest ``Content-Length`` that will be read.

    Returns:
        The decoded JSON object, or ``{}`` when ``Content-Length`` is
        missing, zero or negative.

    Raises:
        ValueError: if ``Content-Length`` is not an integer, exceeds
            ``max_bytes``, the body is not valid JSON, or the decoded
            document is not a JSON object.
        UnicodeDecodeError: if the body is not valid UTF-8 (a ``ValueError``
            subclass).
    """
    length = int(handler.headers.get("Content-Length", "0"))
    if length <= 0:
        return {}
    # The client controls Content-Length; refuse to buffer arbitrarily large bodies.
    if length > max_bytes:
        raise ValueError("request body too large")
    parsed = json.loads(handler.rfile.read(length).decode("utf-8"))
    # Callers use dict methods on the result, so a JSON array or scalar must
    # be rejected here instead of surfacing later as an AttributeError.
    if not isinstance(parsed, dict):
        raise ValueError("JSON body must be an object")
    return parsed
def require_fields(handler: BaseHTTPRequestHandler, body: dict, fields: list[str]) -> bool:
    """Check that every name in ``fields`` maps to a truthy value in ``body``.

    When at least one field is absent or falsy, a 400 ``missing_fields``
    response listing those names is written and ``False`` is returned;
    otherwise nothing is written and ``True`` is returned.
    """
    absent = []
    for name in fields:
        if not body.get(name):
            absent.append(name)
    if not absent:
        return True
    json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "missing_fields", "fields": absent})
    return False
def hashed_device(device_token: str) -> str:
    """Return the hex HMAC-SHA256 of ``device_token`` keyed with ``SERVER_SALT``."""
    key_bytes = SERVER_SALT.encode("utf-8")
    token_bytes = device_token.encode("utf-8")
    mac = hmac.new(key_bytes, token_bytes, hashlib.sha256)
    return mac.hexdigest()
def pbkdf2_hash(password: str, salt_hex: str) -> str:
    """Derive a 32-byte PBKDF2-HMAC-SHA256 key and return it hex-encoded.

    Args:
        password: Clear-text password; encoded as UTF-8 before hashing.
        salt_hex: Salt as a hexadecimal string.

    The iteration count comes from ``ADMIN_PBKDF2_ITERATIONS``.
    """
    secret = password.encode("utf-8")
    salt = bytes.fromhex(salt_hex)
    derived = hashlib.pbkdf2_hmac("sha256", secret, salt, ADMIN_PBKDF2_ITERATIONS, dklen=32)
    return derived.hex()
def admin_exists() -> bool:
    """Report whether the ``admin_user`` table contains the row with ``id = 1``."""
    with get_conn() as conn:
        found = conn.execute("SELECT 1 FROM admin_user WHERE id = 1").fetchone()
    return bool(found)
def get_admin_user() -> dict | None:
    """Fetch the admin row (``id = 1``) as a dict, or ``None`` when it does not exist.

    The dict holds ``id``, ``username``, ``password_hash`` and ``password_salt``.
    """
    query = "SELECT id, username, password_hash, password_salt FROM admin_user WHERE id = 1"
    with get_conn() as conn:
        record = conn.execute(query).fetchone()
    if not record:
        return None
    return dict(record)
def validate_admin_password(password: str) -> bool:
    """Return ``True`` only for string passwords that are at least 10 characters long."""
    return isinstance(password, str) and len(password) >= 10
def create_admin_user(username: str, password: str) -> tuple[bool, str]:
    """Create the single admin account (row ``id = 1``) if none exists yet.

    Args:
        username: Desired login name; surrounding whitespace is stripped and
            at least 3 characters must remain.
        password: Clear-text password; must satisfy ``validate_admin_password``.

    Returns:
        ``(True, "created")`` on success, otherwise ``(False, code)`` where
        ``code`` is ``"invalid_username"``, ``"invalid_password"`` or
        ``"already_initialized"``.
    """
    if not isinstance(username, str) or len(username.strip()) < 3:
        return False, "invalid_username"
    if not validate_admin_password(password):
        return False, "invalid_password"
    if admin_exists():
        return False, "already_initialized"
    now_iso = to_iso(utc_now())
    username_clean = username.strip()
    salt_hex = secrets.token_hex(16)
    pw_hash = pbkdf2_hash(password, salt_hex)
    try:
        with get_conn() as conn:
            conn.execute(
                """
                INSERT INTO admin_user (id, username, password_hash, password_salt, created_at, updated_at)
                VALUES (1, ?, ?, ?, ?, ?)
                """,
                (username_clean, pw_hash, salt_hex, now_iso, now_iso),
            )
    except sqlite3.IntegrityError:
        # Another request inserted id = 1 between the admin_exists() check and
        # this INSERT (the key derivation in between takes noticeable time).
        # Report the same outcome as the pre-check instead of propagating.
        return False, "already_initialized"
    return True, "created"
def create_admin_session(username: str) -> dict:
    """Register a new in-memory admin session and describe it.

    A random URL-safe token is stored in ``ADMIN_SESSIONS`` together with
    the username and an expiry ``ADMIN_SESSION_HOURS`` from now.

    Returns:
        Dict with ``token``, ``expires_at`` (ISO string) and ``session_hours``.
    """
    token = secrets.token_urlsafe(32)
    valid_until = utc_now() + dt.timedelta(hours=ADMIN_SESSION_HOURS)
    ADMIN_SESSIONS[token] = {"username": username, "expires_at": valid_until}
    return {
        "token": token,
        "expires_at": to_iso(valid_until),
        "session_hours": ADMIN_SESSION_HOURS,
    }
def cleanup_admin_sessions() -> None:
    """Remove every entry from ``ADMIN_SESSIONS`` whose ``expires_at`` has passed."""
    now = utc_now()
    # Iterate over a snapshot: this file imports ThreadingHTTPServer, so other
    # request threads may add or remove sessions while this loop runs.
    # Iterating the live dict could then raise
    # "RuntimeError: dictionary changed size during iteration".
    snapshot = list(ADMIN_SESSIONS.items())
    for token, item in snapshot:
        if item["expires_at"] <= now:
            ADMIN_SESSIONS.pop(token, None)
def admin_auth(handler: BaseHTTPRequestHandler) -> tuple[bool, str]:
    """Validate the ``Authorization: Bearer <token>`` header against ``ADMIN_SESSIONS``.

    Expired sessions are purged first.

    Returns:
        ``(True, username)`` for a live session, otherwise ``(False, code)``
        with ``code`` one of ``"missing_token"``, ``"invalid_token"`` or
        ``"expired_token"``.
    """
    cleanup_admin_sessions()
    scheme = "Bearer "
    header = handler.headers.get("Authorization", "")
    if not header.startswith(scheme):
        return False, "missing_token"
    token = header[len(scheme):].strip()
    entry = ADMIN_SESSIONS.get(token)
    if not entry:
        return False, "invalid_token"
    # Re-check expiry in case the session lapsed after the purge above.
    if entry["expires_at"] <= utc_now():
        ADMIN_SESSIONS.pop(token, None)
        return False, "expired_token"
    return True, entry["username"]
def parse_admin_event(body: dict) -> tuple[dict | None, str | None]:
    """Validate an admin event payload and normalise its fields.

    Returns:
        ``(event, None)`` on success, where ``event`` holds ``event_type``,
        ``lat``, ``lon``, ``risk_modifier``, ``start_datetime``,
        ``end_datetime`` (both ISO strings in UTC) and ``source``; or
        ``(None, code)`` with ``code`` one of ``"invalid_payload"``,
        ``"invalid_event_type"``, ``"lat_lon_out_of_bounds"``,
        ``"invalid_time_window"`` or ``"invalid_risk_modifier"``.

    NOTE(review): a missing ``start_datetime``/``end_datetime`` is not an
    error here, because ``parse_iso8601`` substitutes the current time for
    falsy input.
    """
    try:
        parsed = {
            "event_type": str(body.get("event_type", "")).strip(),
            "lat": float(body.get("lat")),
            "lon": float(body.get("lon")),
            "risk_modifier": int(body.get("risk_modifier")),
            "start_datetime": to_iso(parse_iso8601(body.get("start_datetime"))),
            "end_datetime": to_iso(parse_iso8601(body.get("end_datetime"))),
            "source": str(body.get("source", ADMIN_MANUAL_SOURCE)).strip() or ADMIN_MANUAL_SOURCE,
        }
    except Exception:
        return None, "invalid_payload"

    if parsed["event_type"] not in {"market", "waste", "event", "construction"}:
        return None, "invalid_event_type"
    inside_box = 47.0 <= parsed["lat"] <= 55.5 and 5.0 <= parsed["lon"] <= 16.0
    if not inside_box:
        return None, "lat_lon_out_of_bounds"
    # Both values are ISO strings produced by to_iso(), compared as text.
    if parsed["start_datetime"] >= parsed["end_datetime"]:
        return None, "invalid_time_window"
    if not -50 <= parsed["risk_modifier"] <= 50:
        return None, "invalid_risk_modifier"
    return parsed, None
def fetch_points(sql: str, params: tuple) -> list[tuple[float, float]]:
    """Run ``sql`` with ``params`` and return each row's ``lat``/``lon`` as a float pair."""
    with get_conn() as conn:
        records = conn.execute(sql, params).fetchall()
    points = []
    for record in records:
        points.append((float(record["lat"]), float(record["lon"])))
    return points
def nearest_from_db(
    lat: float, lon: float, table: str, type_col: str, value: str, fallback_points: list[tuple[float, float]]
) -> tuple[int, bool]:
    """Distance in metres from ``(lat, lon)`` to the nearest matching point.

    Rows are selected from ``table`` where ``type_col`` equals ``value``.
    When no row matches, ``fallback_points`` are used instead; when those
    are empty too, 5000 is returned.

    Returns:
        ``(distance_m, used_fallback)`` -- the flag is ``False`` only when
        the distance came from database rows.
    """
    # NOTE(review): table and type_col are interpolated into the SQL text, so
    # they must only ever come from trusted, hard-coded call sites.
    query = f"SELECT lat, lon FROM {table} WHERE {type_col} = ?"
    points = fetch_points(query, (value,))
    if not points:
        if fallback_points:
            return nearest_distance_m(lat, lon, fallback_points), True
        return 5000, True
    closest = min(haversine_m(lat, lon, p_lat, p_lon) for p_lat, p_lon in points)
    return int(closest), False
def area_from_db(lat: float, lon: float) -> str:
    """Return the zone type of the nearest ``osm_zone`` row within 500 m.

    Falls back to ``classify_area`` when the table is empty, and to
    ``"residential"`` when the nearest zone is more than 500 m away.
    """
    with get_conn() as conn:
        zones = conn.execute("SELECT zone_type, lat, lon FROM osm_zone").fetchall()
    if not zones:
        return classify_area(lat, lon)

    nearest_type, nearest_m = "residential", 999999.0
    for zone in zones:
        metres = haversine_m(lat, lon, float(zone["lat"]), float(zone["lon"]))
        if metres < nearest_m:
            nearest_type, nearest_m = zone["zone_type"], metres
    if nearest_m <= 500:
        return nearest_type
    return "residential"
def road_from_db(lat: float, lon: float) -> str:
    """Return the road type of the nearest ``osm_road`` row within 300 m.

    Falls back to ``classify_road`` when the table is empty, and to
    ``"unknown"`` when the nearest road is more than 300 m away.
    """
    with get_conn() as conn:
        roads = conn.execute("SELECT road_type, lat, lon FROM osm_road").fetchall()
    if not roads:
        return classify_road(lat, lon)

    nearest_type, nearest_m = "unknown", 999999.0
    for road in roads:
        metres = haversine_m(lat, lon, float(road["lat"]), float(road["lon"]))
        if metres < nearest_m:
            nearest_type, nearest_m = road["road_type"], metres
    if nearest_m <= 300:
        return nearest_type
    return "unknown"
def ensure_spot(lat: float, lon: float, now_iso: str) -> dict:
    """Return the cached spot for ``(lat, lon)``, creating it on first use.

    The spot id comes from ``spot_id_for``. If a row with that id exists it
    is returned as a dict. Otherwise area type, road type and POI distances
    are computed, an INSERT is attempted, and the freshly computed values
    are returned.

    Note:
        Only the freshly computed dict contains ``used_fallback_pois``; a
        dict built from an existing row has whatever columns the table has.

    Raises:
        sqlite3.IntegrityError: if the INSERT conflicts but no row with the
            spot id can be read back afterwards.
    """
    s_id = spot_id_for(lat, lon)
    lookup_sql = "SELECT * FROM spot WHERE id = ?"
    with get_conn() as conn:
        cached = conn.execute(lookup_sql, (s_id,)).fetchone()
        if cached:
            return dict(cached)

        area_type = area_from_db(lat, lon)
        road_type = road_from_db(lat, lon)
        police_d, police_fallback = nearest_from_db(lat, lon, "osm_poi", "poi_type", "police", FALLBACK_POLICE_POINTS)
        fire_d, fire_fallback = nearest_from_db(lat, lon, "osm_poi", "poi_type", "fire", FALLBACK_FIRE_POINTS)
        hosp_d, hosp_fallback = nearest_from_db(lat, lon, "osm_poi", "poi_type", "hospital", FALLBACK_HOSPITAL_POINTS)
        computed = {
            "id": s_id,
            "lat": lat,
            "lon": lon,
            "osm_area_type": area_type,
            "road_type": road_type,
            "distance_police_m": police_d,
            "distance_fire_m": fire_d,
            "distance_hospital_m": hosp_d,
            "used_fallback_pois": police_fallback or fire_fallback or hosp_fallback,
        }
        try:
            conn.execute(
                """
                INSERT INTO spot (
                    id, lat, lon, osm_area_type, road_type,
                    distance_police_m, distance_fire_m, distance_hospital_m,
                    created_at, updated_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (s_id, lat, lon, area_type, road_type, police_d, fire_d, hosp_d, now_iso, now_iso),
            )
        except sqlite3.IntegrityError:
            # A concurrent request may have inserted the same spot id meanwhile.
            winner = conn.execute(lookup_sql, (s_id,)).fetchone()
            if winner:
                return dict(winner)
            raise
        except sqlite3.OperationalError:
            # Read-only DB deployments can still serve scores without
            # persisting the spot cache; fall through to the computed values.
            pass
        return computed
def collect_local_event_factors(lat: float, lon: float, night_end: dt.datetime) -> list[dict]:
    """Build score factors for events overlapping the four hours after ``night_end``.

    Events are read from ``open_data_event``; those farther than 1000 m from
    ``(lat, lon)`` are ignored, and events identical in type, modifier, time
    window and position (rounded to 4 decimals) are counted once.

    Returns:
        One dict per remaining event with ``key``, ``label``, ``points``
        and ``source``.
    """
    window_start = night_end
    window_end = night_end + dt.timedelta(hours=4)
    labels = {
        "waste": "Müllabfuhr am Morgen",
        "market": "Marktbetrieb am Morgen",
        "event": "Lokale Veranstaltung",
        "construction": "Baustelle",
    }
    with get_conn() as conn:
        rows = conn.execute(
            """
            SELECT event_type, risk_modifier, source, lat, lon, start_datetime, end_datetime
            FROM open_data_event
            WHERE start_datetime <= ?
            AND end_datetime >= ?
            """,
            (to_iso(window_end), to_iso(window_start)),
        ).fetchall()

    factors = []
    already_counted = set()
    for row in rows:
        ev_lat, ev_lon = float(row["lat"]), float(row["lon"])
        if haversine_m(lat, lon, ev_lat, ev_lon) > 1000:
            continue
        event_type = row["event_type"]
        # Both datetime columns are part of the SELECT above, so they are
        # always present on the row.
        fingerprint = (
            event_type,
            row["risk_modifier"],
            row["start_datetime"],
            row["end_datetime"],
            round(ev_lat, 4),
            round(ev_lon, 4),
        )
        if fingerprint in already_counted:
            continue
        already_counted.add(fingerprint)
        factors.append(
            {
                "key": f"event_{event_type}",
                "label": labels.get(event_type, "Lokales Ereignis"),
                "points": float(row["risk_modifier"]),
                "source": row["source"],
            }
        )
    return factors
def collect_community_factors(spot_id: str, at_time: dt.datetime) -> list[dict]:
    """Aggregate recent community signals for ``spot_id`` into score factors.

    Signals from the 30 days before ``at_time`` are loaded. Each known
    signal type contributes ``base * decay(age_days, half_life)`` as long as
    the signal is not older than that type's window. Types whose summed
    contribution is below 0.5 in magnitude are omitted.

    Returns:
        At most one factor dict per signal type (``key``, ``label``,
        ``points``, ``source="community"``).
    """
    buckets = {
        "knock": {"base": -25.0, "half_life": 10.0, "window": 30, "label": "Klopfen gemeldet"},
        "noise": {"base": -15.0, "half_life": 7.0, "window": 14, "label": "Lärm gemeldet"},
        "calm": {"base": 10.0, "half_life": 7.0, "window": 14, "label": "Ruhig gemeldet"},
        "police": {"base": -18.0, "half_life": 10.0, "window": 30, "label": "Polizei-Einsatz gemeldet"},
    }
    oldest_allowed = at_time - dt.timedelta(days=30)
    with get_conn() as conn:
        rows = conn.execute(
            """
            SELECT signal_type, timestamp
            FROM community_signal
            WHERE spot_id = ?
            AND timestamp >= ?
            """,
            (spot_id, to_iso(oldest_allowed)),
        ).fetchall()

    totals: dict[str, float] = dict.fromkeys(buckets, 0.0)
    for row in rows:
        kind = row["signal_type"]
        if kind not in buckets:
            continue
        cfg = buckets[kind]
        elapsed = at_time - parse_iso8601(row["timestamp"])
        # Signals dated after at_time count as age zero.
        age_days = max(0.0, elapsed.total_seconds() / 86400.0)
        if age_days > cfg["window"]:
            continue
        totals[kind] += cfg["base"] * decay(age_days, cfg["half_life"])

    return [
        {
            "key": f"community_{kind}",
            "label": buckets[kind]["label"],
            "points": total,
            "source": "community",
        }
        for kind, total in totals.items()
        if not abs(total) < 0.5
    ]
def data_source_meta() -> list[dict]:
    """Return all ``data_source_state`` rows as dicts, most recently imported first."""
    query = "SELECT source_name, imported_at, record_count, notes FROM data_source_state ORDER BY imported_at DESC"
    with get_conn() as conn:
        records = conn.execute(query).fetchall()
    return list(map(dict, records))
def source_health(sources: list[dict], now: dt.datetime) -> dict:
    """Summarise how old the imported data sources are relative to ``now``.

    Sources whose ``imported_at`` cannot be parsed are skipped. Ages are in
    hours, never negative, and a source older than 24 h counts as stale.

    Returns:
        Dict with ``freshest_age_hours``, ``stalest_age_hours`` (rounded to
        two decimals, or ``None`` without usable sources), ``stale_sources``
        and ``has_data``.
    """
    no_data = {"freshest_age_hours": None, "stalest_age_hours": None, "stale_sources": [], "has_data": False}
    if not sources:
        return no_data

    age_hours = []
    stale_names = []
    for src in sources:
        try:
            hours = (now - parse_iso8601(src["imported_at"])).total_seconds() / 3600.0
        except Exception:
            continue
        hours = max(0.0, hours)
        age_hours.append((src["source_name"], hours))
        if hours > 24:
            stale_names.append(src["source_name"])

    if not age_hours:
        return no_data
    only_hours = [hours for _, hours in age_hours]
    return {
        "freshest_age_hours": round(min(only_hours), 2),
        "stalest_age_hours": round(max(only_hours), 2),
        "stale_sources": stale_names,
        "has_data": True,
    }
def classify_quality(
    health: dict, used_fallback_pois: bool, degraded: bool, factor_count: int, community_factor_count: int
) -> dict:
    """Rate the reliability of a computed score on a 0-100 scale.

    Starts at 100 and applies fixed penalties (degraded mode, fallback POIs,
    missing or stale source metadata, few factors, no community signals) and
    a small bonus for two or more community factors.

    Returns:
        Dict with ``score`` (clamped to 0..100), ``level``
        (``"high"`` >= 75, ``"medium"`` >= 45, else ``"low"``), the matching
        German ``label`` and up to four ``reasons``.
    """
    score = 100
    reasons = []

    def adjust(delta: int, reason: str) -> None:
        # Apply one penalty/bonus and remember why.
        nonlocal score
        score += delta
        reasons.append(reason)

    if degraded:
        adjust(-45, "Backend im degradierten Modus")
    if used_fallback_pois:
        adjust(-20, "Fallback-POIs statt Live-POIs")

    if health.get("has_data"):
        stalest_age = health.get("stalest_age_hours")
        if isinstance(stalest_age, (int, float)):
            if stalest_age > 48:
                adjust(-20, "Mindestens eine Quelle >48h alt")
            elif stalest_age > 24:
                adjust(-10, "Mindestens eine Quelle >24h alt")
        stale_count = len(health.get("stale_sources") or [])
        if stale_count >= 2:
            adjust(-10, "Mehrere stale Quellen")
        elif stale_count == 1:
            adjust(-5, "Eine stale Quelle")
    else:
        adjust(-20, "Keine Quellenmetadaten")

    if factor_count <= 2:
        adjust(-8, "Wenig Score-Faktoren verfuegbar")
    if community_factor_count == 0:
        adjust(-5, "Keine aktuellen Community-Signale")
    elif community_factor_count >= 2:
        adjust(4, "Mehrere aktuelle Community-Signale")

    score = max(0, min(100, int(round(score))))
    for threshold, level, label in ((75, "high", "hoch"), (45, "medium", "mittel")):
        if score >= threshold:
            break
    else:
        level, label = "low", "niedrig"
    return {"score": score, "level": level, "label": label, "reasons": reasons[:4]}
def build_explanation(factors: list[dict], spot: dict) -> dict:
    """Turn raw factors and spot attributes into the response's explanation section.

    Each factor is copied with its points rounded to two decimals and an
    ``impact`` of ``"positive"``, ``"negative"`` or ``"neutral"`` derived
    from the sign. Missing spot attributes default to ``"unknown"`` for the
    types and 5000 for the distances.
    """

    def describe(item: dict) -> dict:
        points = float(item["points"])
        if points > 0:
            impact = "positive"
        elif points < 0:
            impact = "negative"
        else:
            impact = "neutral"
        return {
            "key": item["key"],
            "label": item["label"],
            "points": round(points, 2),
            "source": item["source"],
            "impact": impact,
        }

    spot_context = {
        "area_type": spot.get("osm_area_type", "unknown"),
        "road_type": spot.get("road_type", "unknown"),
    }
    for poi in ("police", "fire", "hospital"):
        field = f"distance_{poi}_m"
        spot_context[field] = int(spot.get(field, 5000))
    return {"factors": [describe(item) for item in factors], "spot_context": spot_context}
def compute_score_payload(lat: float, lon: float, at_time: dt.datetime) -> dict:
    """Compute the full score response for a location and point in time.

    Factors come from the spot's area type, proximity to police (<200 m) and
    hospital (<200 m), weekend/holiday vs. weekday night, local events and
    community signals. The raw score is 100 plus the sum of all factor
    points, then passed through ``clamp_score``.

    NOTE(review): unlike ``compute_score_payload_fallback`` this path has no
    fire-station factor, and ``meta.data_updated_at`` is the time of this
    call rather than an import time -- confirm both are intended.
    """
    now_iso = to_iso(utc_now())
    spot = ensure_spot(lat, lon, now_iso)
    sources = data_source_meta()
    health = source_health(sources, utc_now())
    night_start, night_end = night_window_for(at_time)

    factors: list[dict] = []

    def add(key: str, label: str, points: float, source: str) -> None:
        factors.append({"key": key, "label": label, "points": points, "source": source})

    area_factor = score_area_modifier(spot["osm_area_type"])
    add(area_factor.key, area_factor.label, area_factor.points, "osm")
    if spot["distance_police_m"] < 200:
        add("dist_police", "Polizei in <200m", -15.0, "osm")
    if spot["distance_hospital_m"] < 200:
        add("dist_hospital", "Krankenhaus in <200m", -10.0, "osm")
    if weekend_or_holiday(night_start):
        add("time_weekend", "Wochenende/Feiertag", -10.0, "time")
    else:
        add("time_weekday", "Werktagnacht", 5.0, "time")
    factors.extend(collect_local_event_factors(lat, lon, night_end))
    factors.extend(collect_community_factors(spot["id"], at_time))

    score = clamp_score(100.0 + sum(item["points"] for item in factors))
    # Largest absolute impact first; the response highlights the top four.
    ranked = sorted(factors, key=lambda item: abs(item["points"]), reverse=True)
    top_reasons = ranked[:4]
    used_fallback = bool(spot.get("used_fallback_pois", False))
    community_count = sum(1 for item in factors if item.get("source") == "community")
    quality = classify_quality(
        health=health,
        used_fallback_pois=used_fallback,
        degraded=False,
        factor_count=len(ranked),
        community_factor_count=community_count,
    )
    meta = {
        "data_updated_at": now_iso,
        "region": "DE-NW (Pilot: Kreis Mettmann)",
        "attribution": "Kartendaten: OpenStreetMap-Mitwirkende (ODbL)",
        "sources": sources,
        "health": health,
        "used_fallback_pois": used_fallback,
        "quality": quality,
    }
    return {
        "spot_id": spot["id"],
        "score": score,
        "ampel": ampel(score),
        "reasons": [f"{item['label']} ({item['points']:+.0f})" for item in top_reasons],
        "factors": top_reasons,
        "explanation": build_explanation(ranked, spot),
        "night_window": {"start": to_iso(night_start), "end": to_iso(night_end)},
        "meta": meta,
    }
def compute_score_payload_fallback(lat: float, lon: float, at_time: dt.datetime, error_code: str) -> dict:
    """Compute a degraded score response using only the hard-coded fallback POIs.

    Distances are measured against the ``FALLBACK_*_POINTS`` lists. The
    response is marked ``degraded`` and carries ``error_code`` as
    ``degraded_reason``.
    """
    night_start, night_end = night_window_for(at_time)
    police_d = nearest_distance_m(lat, lon, FALLBACK_POLICE_POINTS)
    fire_d = nearest_distance_m(lat, lon, FALLBACK_FIRE_POINTS)
    hosp_d = nearest_distance_m(lat, lon, FALLBACK_HOSPITAL_POINTS)

    # (distance, threshold, key, label, points), kept in evaluation order.
    proximity_rules = (
        (police_d, 200, "dist_police", "Polizei in <200m", -15.0),
        (hosp_d, 200, "dist_hospital", "Krankenhaus in <200m", -10.0),
        (fire_d, 150, "dist_fire", "Feuerwehr in <150m", -6.0),
    )
    factors: list[dict] = [
        {"key": key, "label": label, "points": points, "source": "fallback"}
        for distance, threshold, key, label, points in proximity_rules
        if distance < threshold
    ]
    if weekend_or_holiday(night_start):
        factors.append({"key": "time_weekend", "label": "Wochenende/Feiertag", "points": -10.0, "source": "time"})
    else:
        factors.append({"key": "time_weekday", "label": "Werktagnacht", "points": 5.0, "source": "time"})

    score = clamp_score(100.0 + sum(item["points"] for item in factors))
    ranked = sorted(factors, key=lambda item: abs(item["points"]), reverse=True)
    top_reasons = ranked[:4]
    reasons = [f"{item['label']} ({item['points']:+.0f})" for item in top_reasons]
    if not reasons:
        reasons = ["Fallback-Berechnung aktiv (0)"]

    fallback_spot = {
        "osm_area_type": "residential",
        "road_type": "unknown",
        "distance_police_m": police_d,
        "distance_fire_m": fire_d,
        "distance_hospital_m": hosp_d,
    }
    health = {"freshest_age_hours": None, "stalest_age_hours": None, "stale_sources": [], "has_data": False}
    quality = classify_quality(
        health=health,
        used_fallback_pois=True,
        degraded=True,
        factor_count=len(ranked),
        community_factor_count=0,
    )
    meta = {
        "data_updated_at": to_iso(utc_now()),
        "region": "DE-NW (Pilot: Kreis Mettmann)",
        "attribution": "Kartendaten: OpenStreetMap-Mitwirkende (ODbL)",
        "sources": [],
        "health": health,
        "used_fallback_pois": True,
        "degraded": True,
        "degraded_reason": error_code,
        "quality": quality,
    }
    return {
        "spot_id": spot_id_for(lat, lon),
        "score": score,
        "ampel": ampel(score),
        "reasons": reasons,
        "factors": top_reasons,
        "explanation": build_explanation(ranked, fallback_spot),
        "night_window": {"start": to_iso(night_start), "end": to_iso(night_end)},
        "meta": meta,
    }
def handle_score(handler: BaseHTTPRequestHandler, query: dict[str, list[str]]) -> None:
    """Serve the score endpoint for the ``lat``, ``lon`` and optional ``at`` query values.

    Responds 400 ``invalid_query`` when the values cannot be parsed and 400
    ``lat_lon_out_of_bounds`` for coordinates outside the valid geographic
    range. Otherwise responds 200 with the regular score payload, or with
    the degraded fallback payload when the regular computation raises.
    """
    try:
        lat = float(query.get("lat", [""])[0])
        lon = float(query.get("lon", [""])[0])
        at = parse_iso8601(query.get("at", [""])[0])
    except Exception:
        json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "invalid_query"})
        return
    # Accept any valid geographic coordinate so every point can get a score.
    # NaN and infinities fail these comparisons and are rejected as well.
    if not (-90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0):
        json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "lat_lon_out_of_bounds"})
        return
    try:
        payload = compute_score_payload(lat, lon, at)
    except Exception as exc:
        # Still answer with the degraded payload, but leave a trace in the
        # server log: without it, a broken database or scoring bug would be
        # invisible to operators.
        handler.log_error("score computation failed, serving fallback: %r", exc)
        payload = compute_score_payload_fallback(lat, lon, at, "score_engine_error")
    json_response(handler, HTTPStatus.OK, payload)
def handle_signal(handler: BaseHTTPRequestHandler) -> None:
    """Accept a community signal (``calm``/``noise``/``knock``/``police``) for a spot.

    Expected JSON object: ``spot_id``, ``signal_type``, ``device_token``
    (at least 16 characters) and an optional ISO-8601 ``timestamp``
    (defaults to now; values more than five minutes in the future are
    replaced by now).

    Responses:
        201 when stored; 400 for malformed input or an unknown spot; 429
        ``cooldown_active`` when the same device reported on this spot
        within ``SIGNAL_COOLDOWN_HOURS``; 429 ``daily_limit`` when the
        insert violates a database constraint; 503 ``storage_unavailable``
        for any other database error during the insert.
    """
    try:
        body = read_json(handler)
    except Exception:
        json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "invalid_json"})
        return
    # A JSON array or scalar is valid JSON but not a usable payload.
    if not isinstance(body, dict):
        json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "invalid_json"})
        return

    spot_id = body.get("spot_id")
    signal_type = body.get("signal_type")
    device_token = body.get("device_token")
    if not isinstance(spot_id, str) or not spot_id:
        json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "spot_id_required"})
        return
    # The isinstance check comes first: a list or dict is unhashable and would
    # make the set membership test itself raise TypeError.
    if not isinstance(signal_type, str) or signal_type not in {"calm", "noise", "knock", "police"}:
        json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "invalid_signal_type"})
        return
    if not isinstance(device_token, str) or len(device_token) < 16:
        json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "invalid_device_token"})
        return

    now = utc_now()
    try:
        timestamp = parse_iso8601(body.get("timestamp"))
        if timestamp > now + dt.timedelta(minutes=5):
            timestamp = now
        cooldown_start = timestamp - dt.timedelta(hours=SIGNAL_COOLDOWN_HOURS)
    except (AttributeError, TypeError, ValueError, OverflowError):
        # Non-string, unparseable or out-of-range timestamps are client errors.
        json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "invalid_timestamp"})
        return

    hashed = hashed_device(device_token)
    with get_conn() as conn:
        spot_exists = conn.execute("SELECT 1 FROM spot WHERE id = ?", (spot_id,)).fetchone()
        if not spot_exists:
            json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "unknown_spot_id"})
            return
        latest = conn.execute(
            """
            SELECT timestamp
            FROM community_signal
            WHERE spot_id = ?
            AND hashed_device = ?
            AND timestamp >= ?
            ORDER BY timestamp DESC
            LIMIT 1
            """,
            (spot_id, hashed, to_iso(cooldown_start)),
        ).fetchone()
        if latest:
            next_allowed = parse_iso8601(latest["timestamp"]) + dt.timedelta(hours=SIGNAL_COOLDOWN_HOURS)
            json_response(
                handler,
                HTTPStatus.TOO_MANY_REQUESTS,
                {
                    "accepted": False,
                    "error": "cooldown_active",
                    "next_allowed_at": to_iso(next_allowed),
                },
            )
            return
        day_bucket = timestamp.date().isoformat()
        try:
            conn.execute(
                """
                INSERT INTO community_signal (
                    id, spot_id, signal_type, hashed_device, timestamp, day_bucket
                ) VALUES (?, ?, ?, ?, ?, ?)
                """,
                (
                    str(uuid.uuid4()),
                    spot_id,
                    signal_type,
                    hashed,
                    to_iso(timestamp),
                    day_bucket,
                ),
            )
        except sqlite3.IntegrityError:
            # A constraint on community_signal rejected the row; reported to
            # the client as the daily limit, as before.
            json_response(
                handler,
                HTTPStatus.TOO_MANY_REQUESTS,
                {
                    "accepted": False,
                    "error": "daily_limit",
                },
            )
            return
        except sqlite3.Error as exc:
            # Locked or read-only database and similar failures are not the
            # client's fault and must not be reported as a rate limit.
            handler.log_error("storing community signal failed: %r", exc)
            json_response(
                handler,
                HTTPStatus.SERVICE_UNAVAILABLE,
                {
                    "accepted": False,
                    "error": "storage_unavailable",
                },
            )
            return
    json_response(
        handler,
        HTTPStatus.CREATED,
        {
            "accepted": True,
            "cooldown_hours": SIGNAL_COOLDOWN_HOURS,
        },
    )
def handle_geocode_search(handler: BaseHTTPRequestHandler, query: dict[str, list[str]]) -> None:
    """Proxy a free-text place search (``q``) to the configured Nominatim endpoint.

    Responds 400 when ``q`` is shorter than 2 or longer than 160 characters,
    502 ``geocoder_unavailable`` when the upstream call fails or returns
    anything other than a JSON list, and otherwise 200 with up to five
    results (``display_name``, ``lat``, ``lon``) whose coordinates lie
    inside the 47.0-55.5 / 5.0-16.0 bounding box.
    """
    raw_q = (query.get("q") or [""])[0].strip()
    if len(raw_q) < 2:
        json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "query_too_short"})
        return
    if len(raw_q) > 160:
        json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "query_too_long"})
        return
    params = urlencode(
        {
            "q": raw_q,
            "format": "jsonv2",
            "limit": "5",
            "addressdetails": "0",
            "countrycodes": "de",
        }
    )
    request = Request(
        f"{NOMINATIM_BASE_URL}?{params}",
        headers={
            "User-Agent": NOMINATIM_USER_AGENT,
            "Accept": "application/json",
        },
    )
    try:
        with urlopen(request, timeout=5) as response:
            if response.status != 200:
                raise RuntimeError("nominatim_status")
            raw = response.read().decode("utf-8")
        data = json.loads(raw)
    except Exception:
        # Network, HTTP, decoding and JSON errors all mean the same to the client.
        json_response(handler, HTTPStatus.BAD_GATEWAY, {"error": "geocoder_unavailable"})
        return
    # The result is expected to be a JSON list; a JSON object (for example an
    # error document) cannot be sliced and would crash the loop below.
    if not isinstance(data, list):
        json_response(handler, HTTPStatus.BAD_GATEWAY, {"error": "geocoder_unavailable"})
        return
    results = []
    for item in data[:5]:
        if not isinstance(item, dict):
            continue
        try:
            lat = float(item.get("lat"))
            lon = float(item.get("lon"))
        except (TypeError, ValueError, OverflowError):
            continue
        if not (47.0 <= lat <= 55.5 and 5.0 <= lon <= 16.0):
            continue
        results.append(
            {
                "display_name": str(item.get("display_name", "Unbekannter Treffer")),
                "lat": round(lat, 6),
                "lon": round(lon, 6),
            }
        )
    json_response(handler, HTTPStatus.OK, {"results": results})
def handle_tile_proxy(handler: BaseHTTPRequestHandler, path: str) -> None:
    """Fetch one map tile from ``OSM_TILE_BASE_URL`` and relay it to the client.

    ``path`` must look like ``/map/tile/{z}/{x}/{y}.png`` with ``z`` in
    0..19 and ``x``/``y`` within the tile grid of that zoom level. Responds
    404 for any other path shape, 400 for invalid values, 502
    ``tile_unavailable`` when the upstream request fails, and otherwise 200
    with the PNG bytes and a 12-hour cache lifetime.
    """

    def reject(status: HTTPStatus, code: str) -> None:
        json_response(handler, status, {"error": code})

    segments = path.strip("/").split("/")
    if len(segments) != 5 or segments[:2] != ["map", "tile"]:
        reject(HTTPStatus.NOT_FOUND, "not_found")
        return
    z_raw, x_raw, y_raw = segments[2:]
    if not y_raw.endswith(".png"):
        reject(HTTPStatus.BAD_REQUEST, "invalid_tile_path")
        return
    try:
        z, x, y = int(z_raw), int(x_raw), int(y_raw[: -len(".png")])
    except Exception:
        reject(HTTPStatus.BAD_REQUEST, "invalid_tile_path")
        return
    if not 0 <= z <= 19:
        reject(HTTPStatus.BAD_REQUEST, "invalid_zoom")
        return
    # A zoom level z has 2**z tiles per axis, indexed from 0.
    highest_index = 2**z - 1
    if not (0 <= x <= highest_index and 0 <= y <= highest_index):
        reject(HTTPStatus.BAD_REQUEST, "tile_out_of_bounds")
        return

    upstream = Request(
        f"{OSM_TILE_BASE_URL}/{z}/{x}/{y}.png",
        headers={"User-Agent": NOMINATIM_USER_AGENT, "Accept": "image/png"},
    )
    try:
        with urlopen(upstream, timeout=5) as response:
            if response.status != 200:
                raise RuntimeError("tile_status")
            tile_bytes = response.read()
    except Exception:
        reject(HTTPStatus.BAD_GATEWAY, "tile_unavailable")
        return
    binary_response(handler, HTTPStatus.OK, tile_bytes, "image/png", cache_control="public, max-age=43200")
def handle_admin_bootstrap_status(handler: BaseHTTPRequestHandler) -> None:
    """Respond 200 with whether an admin exists, the session length and the password policy."""
    status_payload = {
        "initialized": admin_exists(),
        "session_hours": ADMIN_SESSION_HOURS,
        "password_policy": {"min_length": 10},
    }
    json_response(handler, HTTPStatus.OK, status_payload)
def handle_admin_bootstrap(handler: BaseHTTPRequestHandler) -> None:
    """Create the admin account from ``username``/``password`` and open a session.

    Responds 400 for unreadable JSON, missing fields or any failure code
    returned by ``create_admin_user``; responds 201 with the new session on
    success.
    """
    try:
        body = read_json(handler)
    except Exception:
        json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "invalid_json"})
        return
    if not require_fields(handler, body, ["username", "password"]):
        return

    created, outcome = create_admin_user(body["username"], body["password"])
    if created:
        session = create_admin_session(body["username"].strip())
        json_response(handler, HTTPStatus.CREATED, {"created": True, "session": session})
        return
    json_response(handler, HTTPStatus.BAD_REQUEST, {"error": outcome})
def handle_admin_login(handler: BaseHTTPRequestHandler) -> None:
    """Authenticate the admin and open a session.

    Responses:
        400 for unreadable JSON, a non-object body or missing fields; 412
        ``admin_not_initialized`` when no admin exists; 500
        ``admin_lookup_failed`` when the admin row cannot be loaded; 401
        ``invalid_credentials`` for wrong or non-string credentials; 200
        with the session on success.
    """
    try:
        body = read_json(handler)
    except Exception:
        json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "invalid_json"})
        return
    if not isinstance(body, dict):
        json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "invalid_json"})
        return
    if not require_fields(handler, body, ["username", "password"]):
        return
    if not admin_exists():
        json_response(handler, HTTPStatus.PRECONDITION_FAILED, {"error": "admin_not_initialized"})
        return
    admin = get_admin_user()
    if not admin:
        json_response(handler, HTTPStatus.INTERNAL_SERVER_ERROR, {"error": "admin_lookup_failed"})
        return

    username = body["username"]
    password = body["password"]
    # JSON numbers, lists, etc. are truthy and pass require_fields, but have
    # no .strip()/.encode(); they can never match stored credentials.
    if not isinstance(username, str) or not isinstance(password, str):
        json_response(handler, HTTPStatus.UNAUTHORIZED, {"error": "invalid_credentials"})
        return

    # Always run the key derivation and use constant-time comparisons for
    # both fields, so response timing does not reveal whether the username
    # was correct.
    username_ok = hmac.compare_digest(username.strip().encode("utf-8"), admin["username"].encode("utf-8"))
    expected = pbkdf2_hash(password, admin["password_salt"])
    password_ok = hmac.compare_digest(expected, admin["password_hash"])
    if not (username_ok and password_ok):
        json_response(handler, HTTPStatus.UNAUTHORIZED, {"error": "invalid_credentials"})
        return
    session = create_admin_session(admin["username"])
    json_response(handler, HTTPStatus.OK, {"login": "ok", "session": session})
def handle_admin_logout(handler: BaseHTTPRequestHandler) -> None:
    """Drop the session named by the bearer token, if any, and always respond 200."""
    scheme = "Bearer "
    header = handler.headers.get("Authorization", "")
    token = ""
    if header.startswith(scheme):
        token = header[len(scheme):].strip()
    if token:
        ADMIN_SESSIONS.pop(token, None)
    json_response(handler, HTTPStatus.OK, {"logout": "ok"})
def handle_admin_overview(handler: BaseHTTPRequestHandler) -> None:
    """Respond with table counts plus the 20 latest signals, events and data sources.

    Requires a valid admin session; otherwise responds 401 with the
    authentication failure code.
    """
    ok, username = admin_auth(handler)
    if not ok:
        json_response(handler, HTTPStatus.UNAUTHORIZED, {"error": username})
        return

    with get_conn() as conn:

        def count(sql: str) -> int:
            return conn.execute(sql).fetchone()["c"]

        def listing(sql: str) -> list[dict]:
            return [dict(r) for r in conn.execute(sql).fetchall()]

        counts = {
            "spots": count("SELECT COUNT(*) AS c FROM spot"),
            "signals": count("SELECT COUNT(*) AS c FROM community_signal"),
            "events": count("SELECT COUNT(*) AS c FROM open_data_event"),
            "data_sources": count("SELECT COUNT(*) AS c FROM data_source_state"),
        }
        latest_signals = listing(
            """
            SELECT spot_id, signal_type, timestamp
            FROM community_signal
            ORDER BY timestamp DESC
            LIMIT 20
            """
        )
        latest_events = listing(
            """
            SELECT id, event_type, lat, lon, start_datetime, end_datetime, risk_modifier, source, imported_at
            FROM open_data_event
            ORDER BY start_datetime DESC
            LIMIT 20
            """
        )
        sources = listing(
            """
            SELECT source_name, imported_at, record_count, notes
            FROM data_source_state
            ORDER BY imported_at DESC
            LIMIT 20
            """
        )

    overview = {
        "admin_user": username,
        "counts": counts,
        "latest_signals": latest_signals,
        "latest_events": latest_events,
        "data_sources": sources,
    }
    json_response(handler, HTTPStatus.OK, overview)
def handle_admin_events_list(handler: BaseHTTPRequestHandler, query: dict[str, list[str]]) -> None:
    """List events for an authenticated admin, newest start time first.

    The optional ``limit`` query value defaults to 100 (also when it cannot
    be parsed) and is clamped to the range 1..500.
    """
    ok, username = admin_auth(handler)
    if not ok:
        json_response(handler, HTTPStatus.UNAUTHORIZED, {"error": username})
        return

    try:
        requested = int((query.get("limit") or ["100"])[0])
    except Exception:
        requested = 100
    limit = max(1, min(requested, 500))

    with get_conn() as conn:
        rows = conn.execute(
            """
            SELECT id, event_type, lat, lon, start_datetime, end_datetime, risk_modifier, source, imported_at
            FROM open_data_event
            ORDER BY start_datetime DESC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()
        events = list(map(dict, rows))
    json_response(handler, HTTPStatus.OK, {"admin_user": username, "events": events})
def handle_admin_events_create(handler: BaseHTTPRequestHandler) -> None:
    """Insert a new event for an authenticated admin and refresh its source's metadata.

    After the insert, the ``data_source_state`` row for the event's source
    is created or updated with the current time, the number of events from
    that source and a note naming the admin. Responds 201 with the new id,
    401 without a valid session, or 400 for an invalid body.
    """
    ok, username = admin_auth(handler)
    if not ok:
        json_response(handler, HTTPStatus.UNAUTHORIZED, {"error": username})
        return
    try:
        body = read_json(handler)
    except Exception:
        json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "invalid_json"})
        return
    event, error = parse_admin_event(body)
    if error:
        json_response(handler, HTTPStatus.BAD_REQUEST, {"error": error})
        return

    now_iso = to_iso(utc_now())
    event_id = str(uuid.uuid4())
    source = event["source"]
    new_row = (
        event_id,
        event["event_type"],
        event["lat"],
        event["lon"],
        event["start_datetime"],
        event["end_datetime"],
        event["risk_modifier"],
        source,
        now_iso,
    )
    with get_conn() as conn:
        conn.execute(
            """
            INSERT INTO open_data_event (
                id, event_type, lat, lon, start_datetime, end_datetime, risk_modifier, source, imported_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            new_row,
        )
        # Counted after the insert so the new event is included.
        total = conn.execute("SELECT COUNT(*) AS c FROM open_data_event WHERE source = ?", (source,)).fetchone()["c"]
        conn.execute(
            """
            INSERT INTO data_source_state (source_name, imported_at, record_count, notes)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(source_name) DO UPDATE SET
                imported_at = excluded.imported_at,
                record_count = excluded.record_count,
                notes = excluded.notes
            """,
            (source, now_iso, total, f"manual update by {username}"),
        )
    json_response(handler, HTTPStatus.CREATED, {"created": True, "id": event_id})
def handle_admin_events_update(handler: BaseHTTPRequestHandler, event_id: str) -> None:
    """Overwrite an existing open-data event with the JSON request body.

    Args:
        handler: Request handler the body is read from and the JSON response
            is written to.
        event_id: Identifier of the ``open_data_event`` row to replace.

    Responses:
        401 -- ``admin_auth`` rejected the request (its error is echoed).
        400 -- the body could not be read as JSON (``invalid_json``) or
               ``parse_admin_event`` reported a validation error.
        404 -- no event with ``event_id`` exists (``event_not_found``).
        200 -- the event was updated.

    The ``data_source_state`` row of the event's (possibly new) source is
    upserted. When the update moved the event to a different source, the
    previous source's state row is refreshed, or removed if that source has
    no events left.
    """
    authorized, identity = admin_auth(handler)
    if not authorized:
        json_response(handler, HTTPStatus.UNAUTHORIZED, {"error": identity})
        return

    try:
        payload = read_json(handler)
    except Exception:
        json_response(handler, HTTPStatus.BAD_REQUEST, {"error": "invalid_json"})
        return

    event, problem = parse_admin_event(payload)
    if problem:
        json_response(handler, HTTPStatus.BAD_REQUEST, {"error": problem})
        return

    stamp = to_iso(utc_now())

    # Column order of the SET clause before the trailing imported_at / id.
    event_fields = (
        "event_type",
        "lat",
        "lon",
        "start_datetime",
        "end_datetime",
        "risk_modifier",
        "source",
    )
    count_sql = "SELECT COUNT(*) AS c FROM open_data_event WHERE source = ?"
    update_event_sql = (
        "\n"
        "UPDATE open_data_event\n"
        "SET event_type = ?, lat = ?, lon = ?, start_datetime = ?, end_datetime = ?, risk_modifier = ?, source = ?, imported_at = ?\n"
        "WHERE id = ?\n"
    )
    upsert_state_sql = (
        "\n"
        "INSERT INTO data_source_state (source_name, imported_at, record_count, notes)\n"
        "VALUES (?, ?, ?, ?)\n"
        "ON CONFLICT(source_name) DO UPDATE SET\n"
        "imported_at = excluded.imported_at,\n"
        "record_count = excluded.record_count,\n"
        "notes = excluded.notes\n"
    )
    refresh_state_sql = (
        "\n"
        "UPDATE data_source_state\n"
        "SET imported_at = ?, record_count = ?, notes = ?\n"
        "WHERE source_name = ?\n"
    )

    with get_conn() as conn:
        current = conn.execute(
            "SELECT source FROM open_data_event WHERE id = ?",
            (event_id,),
        ).fetchone()
        if not current:
            json_response(handler, HTTPStatus.NOT_FOUND, {"error": "event_not_found"})
            return

        conn.execute(
            update_event_sql,
            (*[event[name] for name in event_fields], stamp, event_id),
        )

        # Bookkeeping for the source the event belongs to after the update.
        new_source = event["source"]
        new_total = conn.execute(count_sql, (new_source,)).fetchone()["c"]
        conn.execute(
            upsert_state_sql,
            (new_source, stamp, new_total, f"manual update by {identity}"),
        )

        # Bookkeeping for the source the event was moved away from, if any.
        previous_source = current["source"]
        if previous_source != new_source:
            remaining = conn.execute(count_sql, (previous_source,)).fetchone()["c"]
            if remaining:
                conn.execute(
                    refresh_state_sql,
                    (stamp, remaining, f"manual update by {identity}", previous_source),
                )
            else:
                conn.execute(
                    "DELETE FROM data_source_state WHERE source_name = ?",
                    (previous_source,),
                )

    json_response(handler, HTTPStatus.OK, {"updated": True, "id": event_id})
def handle_admin_events_delete(handler: BaseHTTPRequestHandler, event_id: str) -> None:
    """Remove one open-data event and refresh its source's bookkeeping row.

    Args:
        handler: Request handler the JSON response is written to.
        event_id: Identifier of the ``open_data_event`` row to delete.

    Responses:
        401 -- ``admin_auth`` rejected the request (its error is echoed).
        404 -- no event with ``event_id`` exists (``event_not_found``).
        200 -- the event was deleted.

    After the delete, the source's ``data_source_state`` row is updated with
    the remaining event count, or removed when no events of that source are
    left.
    """
    authorized, identity = admin_auth(handler)
    if not authorized:
        json_response(handler, HTTPStatus.UNAUTHORIZED, {"error": identity})
        return

    stamp = to_iso(utc_now())
    refresh_state_sql = (
        "\n"
        "UPDATE data_source_state\n"
        "SET imported_at = ?, record_count = ?, notes = ?\n"
        "WHERE source_name = ?\n"
    )

    with get_conn() as conn:
        doomed = conn.execute(
            "SELECT source FROM open_data_event WHERE id = ?",
            (event_id,),
        ).fetchone()
        if not doomed:
            json_response(handler, HTTPStatus.NOT_FOUND, {"error": "event_not_found"})
            return

        # Remember the source before the row disappears; it keys the state table.
        origin = doomed["source"]
        conn.execute("DELETE FROM open_data_event WHERE id = ?", (event_id,))

        remaining = conn.execute(
            "SELECT COUNT(*) AS c FROM open_data_event WHERE source = ?",
            (origin,),
        ).fetchone()["c"]
        if remaining:
            conn.execute(
                refresh_state_sql,
                (stamp, remaining, f"manual delete by {identity}", origin),
            )
        else:
            conn.execute("DELETE FROM data_source_state WHERE source_name = ?", (origin,))

    json_response(handler, HTTPStatus.OK, {"deleted": True, "id": event_id})
class StaySenseHandler(BaseHTTPRequestHandler):
    """Route incoming HTTP requests to the module-level ``handle_*`` functions.

    Every verb answers unknown paths with a 404 JSON body
    ``{"error": "not_found"}``.
    """

    # Prefix of the per-event admin routes served by PUT and DELETE.
    _EVENT_PREFIX = "/admin/events/"

    def log_message(self, format: str, *args) -> None:
        """Override the base class hook so that nothing is logged per request."""
        return None

    def _not_found(self) -> None:
        """Send the shared 404 response."""
        json_response(self, HTTPStatus.NOT_FOUND, {"error": "not_found"})

    def _route_event(self, action) -> None:
        """Extract the event id from an ``/admin/events/<id>`` path and call ``action``.

        Responds 404 when the path is not under the event prefix and 400
        (``event_id_required``) when the id part is empty after stripping
        whitespace.
        """
        route = urlparse(self.path).path
        if not route.startswith(self._EVENT_PREFIX):
            self._not_found()
            return
        event_id = route[len(self._EVENT_PREFIX):].strip()
        if not event_id:
            json_response(self, HTTPStatus.BAD_REQUEST, {"error": "event_id_required"})
            return
        action(self, event_id)

    def do_GET(self) -> None:
        """Serve health, scoring, geocoding, map tile and admin read endpoints."""
        url = urlparse(self.path)
        route = url.path

        if route == "/health":
            sources = data_source_meta()
            body = {
                "status": "ok",
                "sources": sources,
                "health": source_health(sources, utc_now()),
            }
            json_response(self, HTTPStatus.OK, body)
            return

        if route.startswith("/map/tile/"):
            handle_tile_proxy(self, route)
            return

        # Endpoints that receive the parsed query string.
        query_routes = {
            "/spot/score": handle_score,
            "/geocode/search": handle_geocode_search,
            "/admin/events": handle_admin_events_list,
        }
        # Endpoints that only need the request handler.
        plain_routes = {
            "/admin/bootstrap/status": handle_admin_bootstrap_status,
            "/admin/overview": handle_admin_overview,
        }

        if route in query_routes:
            query_routes[route](self, parse_qs(url.query))
        elif route in plain_routes:
            plain_routes[route](self)
        else:
            self._not_found()

    def do_POST(self) -> None:
        """Serve signal submission and the admin write/session endpoints."""
        post_routes = {
            "/spot/signal": handle_signal,
            "/admin/bootstrap": handle_admin_bootstrap,
            "/admin/login": handle_admin_login,
            "/admin/logout": handle_admin_logout,
            "/admin/events": handle_admin_events_create,
        }
        target = post_routes.get(urlparse(self.path).path)
        if target is None:
            self._not_found()
            return
        target(self)

    def do_PUT(self) -> None:
        """Update the admin event addressed by ``/admin/events/<id>``."""
        self._route_event(handle_admin_events_update)

    def do_DELETE(self) -> None:
        """Delete the admin event addressed by ``/admin/events/<id>``."""
        self._route_event(handle_admin_events_delete)
def main() -> None:
    """Initialise the database, then serve the API on ``HOST``:``PORT``.

    Blocks inside ``serve_forever()``. The server is used as a context
    manager so its listening socket is closed when ``serve_forever()``
    returns or raises (for example on ``KeyboardInterrupt``), instead of
    being left open until interpreter teardown.
    """
    init_db()
    with ThreadingHTTPServer((HOST, PORT), StaySenseHandler) as server:
        print(f"StaySense API listening on http://{HOST}:{PORT}")
        server.serve_forever()


# Run the server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()