feat: add NRW live open-data source config and advanced mapping

This commit is contained in:
Oliver 2026-02-15 15:46:44 +01:00
parent e44791fd30
commit 05745aae37
No known key found for this signature in database
4 changed files with 225 additions and 9 deletions

View file

@ -59,6 +59,15 @@ Datei: `docs/open_data_sources.json`
- `start_datetime < end_datetime` - `start_datetime < end_datetime`
- Event-Typ muss auf `market|waste|event|construction` normalisiert werden - Event-Typ muss auf `market|waste|event|construction` normalisiert werden
Live-Beispiel fuer echte NRW-OpenData-Quellen:
- Datei: `docs/open_data_sources_nrw_live.json`
- Enthalten:
- Koeln Verkehrsbeeintraechtigungen (ArcGIS JSON)
- Koeln Baustellen Notfall (WFS GeoJSON)
- Import:
- `cd StaySense/backend`
- `python3 run_import_jobs.py --config ../docs/open_data_sources_nrw_live.json --prune-legacy`
## Tests ## Tests
- Backend Unit-Tests: - Backend Unit-Tests:

View file

@ -2,6 +2,8 @@ import csv
import datetime as dt import datetime as dt
import hashlib import hashlib
import json import json
import math
import subprocess
from pathlib import Path from pathlib import Path
from urllib import request from urllib import request
@ -16,12 +18,90 @@ def now_iso() -> str:
def normalize_iso(value: str) -> str: def normalize_iso(value: str) -> str:
value = value.strip()
if value.isdigit():
# ArcGIS dates are commonly unix timestamps in milliseconds.
ts = int(value)
if ts > 10_000_000_000:
ts = ts / 1000.0
parsed = dt.datetime.fromtimestamp(ts, tz=dt.timezone.utc)
return parsed.replace(microsecond=0).isoformat().replace("+00:00", "Z")
parsed = dt.datetime.fromisoformat(value.replace("Z", "+00:00")) parsed = dt.datetime.fromisoformat(value.replace("Z", "+00:00"))
if parsed.tzinfo is None: if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=dt.timezone.utc) parsed = parsed.replace(tzinfo=dt.timezone.utc)
return parsed.astimezone(dt.timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") return parsed.astimezone(dt.timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def _get_by_path(record: dict, path: str):
if not path:
return None
current = record
for token in path.split("."):
if isinstance(current, dict):
current = current.get(token)
continue
if isinstance(current, list):
try:
idx = int(token)
except Exception:
return None
if idx < 0 or idx >= len(current):
return None
current = current[idx]
continue
return None
return current
def _as_text(value) -> str:
return "" if value is None else str(value).strip()
def _utm_epsg25832_to_wgs84(easting: float, northing: float) -> tuple[float, float]:
# WGS84 / UTM zone 32N (EPSG:25832) to lat/lon conversion.
a = 6378137.0
e = 0.08181919084262149
e1sq = 0.00673949674227643
k0 = 0.9996
x = easting - 500000.0
y = northing
zone_cm = 9.0 # UTM zone 32 central meridian
m = y / k0
mu = m / (a * (1 - e**2 / 4 - 3 * e**4 / 64 - 5 * e**6 / 256))
e1 = (1 - math.sqrt(1 - e**2)) / (1 + math.sqrt(1 - e**2))
j1 = 3 * e1 / 2 - 27 * e1**3 / 32
j2 = 21 * e1**2 / 16 - 55 * e1**4 / 32
j3 = 151 * e1**3 / 96
j4 = 1097 * e1**4 / 512
fp = mu + j1 * math.sin(2 * mu) + j2 * math.sin(4 * mu) + j3 * math.sin(6 * mu) + j4 * math.sin(8 * mu)
sin_fp = math.sin(fp)
cos_fp = math.cos(fp)
tan_fp = math.tan(fp)
c1 = e1sq * cos_fp**2
t1 = tan_fp**2
r1 = a * (1 - e**2) / (1 - e**2 * sin_fp**2) ** 1.5
n1 = a / math.sqrt(1 - e**2 * sin_fp**2)
d = x / (n1 * k0)
q1 = n1 * tan_fp / r1
q2 = d**2 / 2
q3 = (5 + 3 * t1 + 10 * c1 - 4 * c1**2 - 9 * e1sq) * d**4 / 24
q4 = (61 + 90 * t1 + 298 * c1 + 45 * t1**2 - 252 * e1sq - 3 * c1**2) * d**6 / 720
lat = fp - q1 * (q2 - q3 + q4)
q5 = d
q6 = (1 + 2 * t1 + c1) * d**3 / 6
q7 = (5 - 2 * c1 + 28 * t1 - 3 * c1**2 + 8 * e1sq + 24 * t1**2) * d**5 / 120
lon = math.radians(zone_cm) + (q5 - q6 + q7) / cos_fp
return math.degrees(lat), math.degrees(lon)
def _stable_id(source_name: str, external_id: str | None, payload: dict) -> str: def _stable_id(source_name: str, external_id: str | None, payload: dict) -> str:
if external_id: if external_id:
raw = f"{source_name}:{external_id}" raw = f"{source_name}:{external_id}"
@ -61,8 +141,13 @@ def _extract_json_records(payload: dict | list, json_path: str | None) -> list[d
def _read_text(location: str, config_dir: Path) -> str: def _read_text(location: str, config_dir: Path) -> str:
if location.startswith("http://") or location.startswith("https://"): if location.startswith("http://") or location.startswith("https://"):
try:
with request.urlopen(location, timeout=60) as resp: with request.urlopen(location, timeout=60) as resp:
return resp.read().decode("utf-8", errors="replace") return resp.read().decode("utf-8", errors="replace")
except Exception:
# Fallback for environments where Python DNS/network stack is restricted.
raw = subprocess.check_output(["curl", "-sL", location], timeout=60)
return raw.decode("utf-8", errors="replace")
file_path = Path(location) file_path = Path(location)
if not file_path.is_absolute(): if not file_path.is_absolute():
@ -131,6 +216,8 @@ def import_from_source(source_cfg: dict, config_dir: Path) -> dict:
defaults = source_cfg.get("default_values") or {} defaults = source_cfg.get("default_values") or {}
event_type_map = {str(k).lower(): str(v).lower() for k, v in (source_cfg.get("event_type_map") or {}).items()} event_type_map = {str(k).lower(): str(v).lower() for k, v in (source_cfg.get("event_type_map") or {}).items()}
json_path = source_cfg.get("json_path") json_path = source_cfg.get("json_path")
date_range_cfg = source_cfg.get("date_range")
coord_crs = str(source_cfg.get("coord_crs", "")).upper().strip()
text = _read_text(location, config_dir) text = _read_text(location, config_dir)
@ -163,18 +250,36 @@ def import_from_source(source_cfg: dict, config_dir: Path) -> dict:
event_type_key = field_map.get("event_type", "event_type") event_type_key = field_map.get("event_type", "event_type")
external_id_key = field_map.get("external_id") external_id_key = field_map.get("external_id")
raw_event_type = str(record.get(event_type_key, defaults.get("event_type", ""))) raw_event_type = _as_text(_get_by_path(record, event_type_key))
if not raw_event_type:
raw_event_type = _as_text(defaults.get("event_type", ""))
event_type = _map_event_type(raw_event_type, event_type_map, defaults.get("event_type")) event_type = _map_event_type(raw_event_type, event_type_map, defaults.get("event_type"))
if not event_type: if not event_type:
stats["rejected_invalid_event_type"] += 1 stats["rejected_invalid_event_type"] += 1
continue continue
try: try:
lat = float(str(record.get(lat_key, defaults.get("lat", ""))).strip()) lat_raw = _get_by_path(record, lat_key)
lon = float(str(record.get(lon_key, defaults.get("lon", ""))).strip()) lon_raw = _get_by_path(record, lon_key)
start_dt = str(record.get(start_key, defaults.get("start_datetime", "")).strip()) lat = float(_as_text(lat_raw) or _as_text(defaults.get("lat", "")))
end_dt = str(record.get(end_key, defaults.get("end_datetime", "")).strip()) lon = float(_as_text(lon_raw) or _as_text(defaults.get("lon", "")))
risk_modifier = int(str(record.get(risk_key, defaults.get("risk_modifier", "0"))).strip()) if coord_crs == "EPSG:25832":
lat, lon = _utm_epsg25832_to_wgs84(lat, lon)
start_dt = _as_text(_get_by_path(record, start_key)) or _as_text(defaults.get("start_datetime", ""))
end_dt = _as_text(_get_by_path(record, end_key)) or _as_text(defaults.get("end_datetime", ""))
if date_range_cfg and (not start_dt or not end_dt):
range_field = _as_text(_get_by_path(record, date_range_cfg.get("field", "")))
separator = date_range_cfg.get("separator", " bis ")
date_fmt = date_range_cfg.get("input_date_format", "%d-%m-%Y")
if separator in range_field:
start_raw, end_raw = [part.strip() for part in range_field.split(separator, 1)]
start_dt = dt.datetime.strptime(start_raw, date_fmt).replace(tzinfo=dt.timezone.utc).isoformat()
end_dt = dt.datetime.strptime(end_raw, date_fmt).replace(tzinfo=dt.timezone.utc).isoformat()
risk_raw = _get_by_path(record, risk_key)
risk_modifier = int(_as_text(risk_raw) or _as_text(defaults.get("risk_modifier", "0")))
except Exception: except Exception:
stats["rejected_parse_error"] += 1 stats["rejected_parse_error"] += 1
continue continue
@ -203,7 +308,7 @@ def import_from_source(source_cfg: dict, config_dir: Path) -> dict:
external_id = None external_id = None
if external_id_key: if external_id_key:
value = record.get(external_id_key) value = _get_by_path(record, external_id_key)
external_id = str(value).strip() if value is not None else None external_id = str(value).strip() if value is not None else None
payload = { payload = {

35
docs/OPEN_DATA_NRW.md Normal file
View file

@ -0,0 +1,35 @@
# OpenData NRW Quellen (StaySense)
Diese Datei beschreibt die aktuell integrierten externen OpenData-Quellen fuer den MVP.
## Aktiv integrierbare Quellen
1. Verkehrsbeeintraechtigungen Stadt Koeln (JSON / ArcGIS)
- Herkunft: Offene Daten Koeln / Open.NRW
- Lizenz: Datenlizenz Deutschland Zero 2.0
- Datensatzseite:
- `https://offenedaten-koeln.de/dataset/verkehrsbeeintr%C3%A4chtigungen-stadt-k%C3%B6ln`
- API-Endpoint:
- `https://geoportal.stadt-koeln.de/arcgis/rest/services/verkehr/verkehrskalender/MapServer/0/query?where=objectid%20is%20not%20null&outFields=*&returnGeometry=true&outSR=4326&f=pjson`
2. Baustellen Notfall Stadt Koeln (WFS / GeoJSON)
- Herkunft: Offene Daten Koeln / Open.NRW
- Lizenz: Datenlizenz Deutschland Zero 2.0
- Datensatzseite:
- `https://offenedaten-koeln.de/dataset/baustellen-k%C3%B6ln`
- API-Endpoint:
- `https://geoportal.stadt-koeln.de/wss/service/baustellen_wfs/guest?service=WFS&version=1.1.0&request=GetFeature&typeName=ms:notfall&outputFormat=application/json;%20subtype=geojson`
## Nutzung im Projekt
- Konfiguration:
- `docs/open_data_sources_nrw_live.json`
- Import ausfuehren:
- `cd StaySense/backend`
- `python3 run_import_jobs.py --config ../docs/open_data_sources_nrw_live.json --prune-legacy`
## Technische Hinweise
- ArcGIS-Quellen liefern teils Datumswerte als Unix-Zeitstempel (Millisekunden). Der Connector normalisiert diese automatisch auf ISO8601 UTC.
- WFS-Baustellen liefern Koordinaten in EPSG:25832. Der Connector transformiert diese fuer StaySense auf WGS84 (lat/lon).
- Datumsbereich aus WFS (`Genehmigungs-Zeitraum`) wird per `date_range`-Mapping in Start/Ende aufgeteilt.

View file

@ -0,0 +1,67 @@
{
"sources": [
{
"id": "koeln_verkehrsbeeintraechtigungen_arcgis",
"enabled": true,
"format": "json",
"url": "https://geoportal.stadt-koeln.de/arcgis/rest/services/verkehr/verkehrskalender/MapServer/0/query?where=objectid%20is%20not%20null&outFields=*&returnGeometry=true&outSR=4326&f=pjson",
"json_path": "features",
"source_name": "koeln_verkehrsbeeintraechtigungen",
"field_map": {
"external_id": "attributes.OBJECTID",
"lat": "geometry.y",
"lon": "geometry.x",
"event_type": "attributes.TYP",
"start_datetime": "attributes.DATUM_VON",
"end_datetime": "attributes.DATUM_BIS"
},
"event_type_map": {
"1": "event",
"2": "construction",
"3": "construction",
"4": "event",
"5": "event",
"6": "event",
"7": "event",
"8": "event",
"9": "construction",
"10": "event",
"11": "event",
"12": "event",
"13": "event",
"14": "event",
"15": "event"
},
"default_values": {
"risk_modifier": -10
}
},
{
"id": "koeln_baustellen_notfall_wfs",
"enabled": true,
"format": "json",
"url": "https://geoportal.stadt-koeln.de/wss/service/baustellen_wfs/guest?service=WFS&version=1.1.0&request=GetFeature&typeName=ms:notfall&outputFormat=application/json;%20subtype=geojson",
"json_path": "features",
"source_name": "koeln_baustellen_notfall",
"coord_crs": "EPSG:25832",
"field_map": {
"external_id": "properties.Aktenzeichen",
"lat": "geometry.coordinates.1",
"lon": "geometry.coordinates.0",
"event_type": "properties.Kategorie",
"risk_modifier": "properties.Risikowert"
},
"event_type_map": {
"notfall": "construction"
},
"date_range": {
"field": "properties.Genehmigungs-Zeitraum",
"separator": " bis ",
"input_date_format": "%d-%m-%Y"
},
"default_values": {
"risk_modifier": -15
}
}
]
}