StaySense/backend/open_data_connector.py

426 lines
16 KiB
Python

import csv
import datetime as dt
import hashlib
import io
import json
import math
import subprocess
from pathlib import Path
from urllib import request

from db import get_conn, init_db
# Event categories the importer accepts; records resolving to anything else are rejected.
VALID_TYPES = {"construction", "event", "market", "waste"}
# Approximate bounding box for Germany (DE) in decimal degrees; events whose
# converted lat/lon fall outside it are dropped as out of bounds.
DE_BOUNDS = dict(lat_min=47.0, lat_max=55.5, lon_min=5.0, lon_max=16.0)
def now_iso() -> str:
    """Return the current UTC time as ISO-8601 with a ``Z`` suffix.

    Sub-second precision is dropped, e.g. ``2024-05-01T12:34:56Z``.
    """
    stamp = dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0)
    return stamp.isoformat().replace("+00:00", "Z")
def normalize_iso(value: str) -> str:
    """Normalize a timestamp string to UTC ``YYYY-MM-DDTHH:MM:SSZ``.

    Accepts either:
      * a string of digits: a Unix epoch. Values above 10_000_000_000 are
        treated as milliseconds, which is what ArcGIS commonly emits.
      * an ISO-8601 string. A trailing ``Z`` is accepted, naive values are
        assumed to already be UTC, and aware values are converted to UTC.

    Microseconds are dropped.

    Raises:
        ValueError: if the text is neither a digit string nor valid ISO-8601.
            An out-of-range epoch may instead raise OverflowError or OSError
            from ``datetime.fromtimestamp``.
    """
    text = value.strip()
    if text.isdigit():
        epoch = int(text)
        if epoch > 10_000_000_000:
            epoch = epoch / 1000.0  # milliseconds -> seconds
        moment = dt.datetime.fromtimestamp(epoch, tz=dt.timezone.utc)
    else:
        moment = dt.datetime.fromisoformat(text.replace("Z", "+00:00"))
        if moment.tzinfo is None:
            moment = moment.replace(tzinfo=dt.timezone.utc)
        moment = moment.astimezone(dt.timezone.utc)
    return moment.replace(microsecond=0).isoformat().replace("+00:00", "Z")
def _get_by_path(record: dict, path: str):
    """Resolve a dotted ``path`` inside nested dicts and lists.

    Each dot-separated token is a dict key or, when the current node is a
    list, a non-negative integer index. Returns ``None`` when:
      * ``path`` is empty,
      * a key is missing,
      * an index is not an integer or is out of range,
      * a token would have to be applied to a scalar.
    """
    if not path:
        return None
    node = record
    for token in path.split("."):
        if isinstance(node, dict):
            node = node.get(token)
        elif isinstance(node, list):
            try:
                position = int(token)
            except ValueError:
                return None
            if not 0 <= position < len(node):
                return None
            node = node[position]
        else:
            return None
    return node
def _as_text(value) -> str:
    """Return ``value`` as a whitespace-stripped string; ``None`` becomes ``""``."""
    if value is None:
        return ""
    return str(value).strip()
def _utm_epsg25832_to_wgs84(easting: float, northing: float) -> tuple[float, float]:
    """Convert UTM zone 32N coordinates (EPSG:25832) to geographic lat/lon.

    Uses the closed-form inverse transverse-Mercator series, which expands
    from the footpoint latitude. Only the northern hemisphere is handled:
    ``northing`` is used directly, with no false-northing offset.

    Args:
        easting: UTM x coordinate in metres, including the 500 000 m false easting.
        northing: UTM y coordinate in metres.

    Returns:
        ``(lat, lon)`` in decimal degrees.
    """
    # WGS84 / UTM zone 32N (EPSG:25832) to lat/lon conversion.
    # NOTE(review): EPSG:25832 is formally ETRS89 on the GRS80 ellipsoid, but the
    # constants below are WGS84 values. The difference should be far below a metre;
    # confirm that is acceptable for callers.
    a = 6378137.0  # ellipsoid semi-major axis (m)
    e = 0.08181919084262149  # first eccentricity
    e1sq = 0.00673949674227643  # second eccentricity squared (e'^2)
    k0 = 0.9996  # UTM scale factor on the central meridian
    x = easting - 500000.0  # remove the false easting
    y = northing
    zone_cm = 9.0 # UTM zone 32 central meridian
    # Meridional arc length -> rectifying latitude mu.
    m = y / k0
    mu = m / (a * (1 - e**2 / 4 - 3 * e**4 / 64 - 5 * e**6 / 256))
    # Footpoint latitude fp: series in e1 around mu.
    e1 = (1 - math.sqrt(1 - e**2)) / (1 + math.sqrt(1 - e**2))
    j1 = 3 * e1 / 2 - 27 * e1**3 / 32
    j2 = 21 * e1**2 / 16 - 55 * e1**4 / 32
    j3 = 151 * e1**3 / 96
    j4 = 1097 * e1**4 / 512
    fp = mu + j1 * math.sin(2 * mu) + j2 * math.sin(4 * mu) + j3 * math.sin(6 * mu) + j4 * math.sin(8 * mu)
    sin_fp = math.sin(fp)
    cos_fp = math.cos(fp)
    tan_fp = math.tan(fp)
    c1 = e1sq * cos_fp**2
    t1 = tan_fp**2
    # Radii of curvature at the footpoint: r1 meridional, n1 prime vertical.
    r1 = a * (1 - e**2) / (1 - e**2 * sin_fp**2) ** 1.5
    n1 = a / math.sqrt(1 - e**2 * sin_fp**2)
    d = x / (n1 * k0)
    # Latitude correction terms (q2..q4) subtracted from the footpoint latitude.
    q1 = n1 * tan_fp / r1
    q2 = d**2 / 2
    q3 = (5 + 3 * t1 + 10 * c1 - 4 * c1**2 - 9 * e1sq) * d**4 / 24
    q4 = (61 + 90 * t1 + 298 * c1 + 45 * t1**2 - 252 * e1sq - 3 * c1**2) * d**6 / 720
    lat = fp - q1 * (q2 - q3 + q4)
    # Longitude offset terms (q5..q7), added to the zone's central meridian.
    q5 = d
    q6 = (1 + 2 * t1 + c1) * d**3 / 6
    q7 = (5 - 2 * c1 + 28 * t1 - 3 * c1**2 + 8 * e1sq + 24 * t1**2) * d**5 / 120
    lon = math.radians(zone_cm) + (q5 - q6 + q7) / cos_fp
    return math.degrees(lat), math.degrees(lon)
def _stable_id(source_name: str, external_id: str | None, payload: dict) -> str:
if external_id:
raw = f"{source_name}:{external_id}"
else:
raw = (
f"{source_name}:{payload['event_type']}:{payload['lat']:.6f}:{payload['lon']:.6f}:"
f"{payload['start_datetime']}:{payload['end_datetime']}:{payload['risk_modifier']}"
)
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def _map_event_type(raw_value: str, event_type_map: dict[str, str], default_value: str | None) -> str | None:
    """Resolve a raw category label to one of ``VALID_TYPES``.

    Steps:
      1. ``raw_value`` is stripped and lower-cased.
      2. It is translated through ``event_type_map`` when it is one of the keys.
      3. If the result is empty (including a key that maps to an empty
         string), ``default_value`` is used as-is; it is not lower-cased or mapped.

    Returns the resulting label when it is in ``VALID_TYPES``, else ``None``.
    """
    label = (raw_value or "").strip().lower()
    mapped = event_type_map.get(label, label) if label else label
    if not mapped and default_value:
        mapped = default_value
    if mapped not in VALID_TYPES:
        return None
    return mapped
def _extract_json_records(payload: dict | list, json_path: str | None) -> list[dict]:
current = payload
if json_path:
for part in json_path.split("."):
if isinstance(current, dict):
current = current.get(part)
else:
current = None
if current is None:
return []
if isinstance(current, list):
return [item for item in current if isinstance(item, dict)]
return []
def _read_text(location: str, config_dir: Path) -> str:
    """Load the raw text of a source from an HTTP(S) URL or a local file.

    How the location is read:
      * ``http://`` / ``https://`` locations are fetched with ``urllib``. If
        that raises, the fetch is retried with the ``curl`` command-line tool.
      * Any other location is a file path. Relative paths resolve against
        ``config_dir``.

    Decoding is UTF-8, and a leading byte-order mark is stripped. This keeps
    BOM-prefixed CSV exports from corrupting the first header name and lets
    BOM-prefixed JSON parse. Remote content is decoded with
    ``errors="replace"``; local files are decoded strictly.

    Raises:
        subprocess.CalledProcessError: if the curl fallback fails, including
            HTTP error statuses (curl runs with ``-f``).
        subprocess.TimeoutExpired: if curl runs longer than 60 seconds.
        FileNotFoundError: if curl is not installed, or the local file is missing.
        UnicodeDecodeError: if a local file is not valid UTF-8.
    """
    if location.startswith(("http://", "https://")):
        try:
            with request.urlopen(location, timeout=60) as resp:
                return resp.read().decode("utf-8-sig", errors="replace")
        except Exception:
            # Fallback for environments where Python DNS/network stack is restricted.
            # -f makes curl exit non-zero on HTTP >= 400, so an error page is
            # raised as an error instead of being returned and parsed as source data.
            raw = subprocess.check_output(
                ["curl", "-sfL", "--max-time", "60", location],
                timeout=60,
            )
            return raw.decode("utf-8-sig", errors="replace")
    file_path = Path(location)
    if not file_path.is_absolute():
        file_path = (config_dir / location).resolve()
    return file_path.read_text(encoding="utf-8-sig")
def import_event_rows(rows: list[dict], source_name: str, notes: str) -> dict:
    """Replace every stored event of ``source_name`` with ``rows``.

    Row handling:
      * Rows whose ``event_type`` is not in ``VALID_TYPES`` are skipped.
      * Only the first row for each ``id`` is kept.
      * Kept rows get float coordinates, datetimes re-normalized with
        ``normalize_iso``, and an int ``risk_modifier``. A conversion error
        propagates before any existing rows are deleted.

    Then, inside one ``get_conn()`` block:
      1. All ``open_data_event`` rows for the source are deleted.
      2. The new rows are inserted.
      3. The ``data_source_state`` entry is upserted with the import time,
         row count and ``notes``.

    Returns:
        ``{"rows": <number of rows inserted>, "source_name": source_name}``.
    """
    init_db()
    imported_at = now_iso()

    # Keyed by event id; dict insertion order keeps first-seen order.
    unique_rows: dict = {}
    for entry in rows:
        kind = entry.get("event_type")
        if kind not in VALID_TYPES or entry["id"] in unique_rows:
            continue
        unique_rows[entry["id"]] = (
            entry["id"],
            kind,
            float(entry["lat"]),
            float(entry["lon"]),
            normalize_iso(entry["start_datetime"]),
            normalize_iso(entry["end_datetime"]),
            int(entry["risk_modifier"]),
            source_name,
            imported_at,
        )
    db_rows = list(unique_rows.values())

    with get_conn() as conn:
        conn.execute("DELETE FROM open_data_event WHERE source = ?", (source_name,))
        conn.executemany(
            """
            INSERT INTO open_data_event (
                id, event_type, lat, lon, start_datetime, end_datetime,
                risk_modifier, source, imported_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            db_rows,
        )
        conn.execute(
            """
            INSERT INTO data_source_state (source_name, imported_at, record_count, notes)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(source_name) DO UPDATE SET
                imported_at = excluded.imported_at,
                record_count = excluded.record_count,
                notes = excluded.notes
            """,
            (source_name, imported_at, len(db_rows), notes),
        )
    return {"rows": len(db_rows), "source_name": source_name}
def _read_records(text: str, source_format: str, json_path: str | None) -> list[dict]:
    """Parse a source's raw text into a list of record dicts.

    Args:
        text: Raw CSV or JSON text.
        source_format: ``"csv"`` (the header row supplies the keys) or ``"json"``.
        json_path: Dotted path to the record list inside a JSON payload.

    Raises:
        ValueError: for an unsupported ``source_format`` or invalid JSON.
    """
    if source_format == "csv":
        # Feed csv a newline="" stream rather than text.splitlines(). The csv module
        # needs the original line endings to keep line breaks inside quoted fields,
        # and splitlines() would also split records on separators such as U+2028.
        reader = csv.DictReader(io.StringIO(text, newline=""))
        return [dict(row) for row in reader]
    if source_format == "json":
        return _extract_json_records(json.loads(text), json_path)
    raise ValueError(f"unsupported format: {source_format}")


def _parse_date_range(range_text: str, separator: str, date_fmt: str) -> tuple[str, str] | None:
    """Split ``"<start><separator><end>"`` into ISO-8601 start/end strings (UTC).

    Returns ``None`` when ``separator`` does not occur in ``range_text``.

    When ``date_fmt`` has no time-of-day directive, both dates are whole,
    inclusive days. The end is then moved to midnight after the end date, so
    ``"01-05-2024 bis 01-05-2024"`` is a one-day window rather than an empty one.

    Raises:
        ValueError: if either side does not match ``date_fmt``, or the separator is empty.
    """
    if separator not in range_text:
        return None
    start_raw, end_raw = (part.strip() for part in range_text.split(separator, 1))
    start = dt.datetime.strptime(start_raw, date_fmt).replace(tzinfo=dt.timezone.utc)
    end = dt.datetime.strptime(end_raw, date_fmt).replace(tzinfo=dt.timezone.utc)
    time_directives = ("%H", "%I", "%M", "%S", "%f", "%p", "%X", "%c")
    if not any(directive in date_fmt for directive in time_directives):
        end += dt.timedelta(days=1)
    return start.isoformat(), end.isoformat()


def import_from_source(source_cfg: dict, config_dir: Path) -> dict:
    """Fetch one configured source, validate and normalize its records, and store them.

    Keys read from ``source_cfg``:
        source_name / id: name stored on every event (default ``"open_data_source"``).
        url / file: where the data is read from; relative files resolve against ``config_dir``.
        format: ``"csv"`` (default) or ``"json"``.
        json_path: dotted path to the record list inside a JSON payload.
        field_map: record paths for lat, lon, start_datetime, end_datetime,
            risk_modifier, event_type and, optionally, external_id. Each path
            defaults to the field's own name; external_id defaults to none.
        default_values: fallbacks used when a record's field is empty.
        event_type_map: case-insensitive mapping from raw categories to VALID_TYPES.
        date_range: ``{"field", "separator", "input_date_format"}``, used to
            derive start/end from a single range field when start or end is missing.
        coord_crs: ``"EPSG:25832"`` when the lat/lon fields hold UTM northing/easting.

    Records that fail validation are counted in ``stats`` and dropped. The
    accepted rows replace the source's stored events via ``import_event_rows``.

    Returns:
        The ``import_event_rows`` result plus ``connector_id`` and ``stats``.

    Raises:
        ValueError: if no url/file is configured or the format is unsupported.
        Errors raised while fetching or decoding the source propagate unchanged.
    """
    source_name = source_cfg.get("source_name") or source_cfg.get("id") or "open_data_source"
    location = source_cfg.get("url") or source_cfg.get("file")
    if not location:
        raise ValueError(f"source '{source_name}' missing url/file")
    source_format = (source_cfg.get("format") or "csv").lower()
    field_map = source_cfg.get("field_map") or {}
    defaults = source_cfg.get("default_values") or {}
    event_type_map = {str(k).lower(): str(v).lower() for k, v in (source_cfg.get("event_type_map") or {}).items()}
    json_path = source_cfg.get("json_path")
    date_range_cfg = source_cfg.get("date_range")
    coord_crs = str(source_cfg.get("coord_crs", "")).upper().strip()

    records = _read_records(_read_text(location, config_dir), source_format, json_path)

    # Field paths are the same for every record, so resolve them once.
    lat_key = field_map.get("lat", "lat")
    lon_key = field_map.get("lon", "lon")
    start_key = field_map.get("start_datetime", "start_datetime")
    end_key = field_map.get("end_datetime", "end_datetime")
    risk_key = field_map.get("risk_modifier", "risk_modifier")
    event_type_key = field_map.get("event_type", "event_type")
    external_id_key = field_map.get("external_id")

    rows = []
    stats = {
        "input_records": len(records),
        "accepted": 0,
        "rejected_invalid_event_type": 0,
        "rejected_parse_error": 0,
        "rejected_missing_datetime": 0,
        "rejected_out_of_bounds": 0,
        "rejected_invalid_window": 0,
    }
    for record in records:
        raw_event_type = _as_text(_get_by_path(record, event_type_key))
        if not raw_event_type:
            raw_event_type = _as_text(defaults.get("event_type", ""))
        event_type = _map_event_type(raw_event_type, event_type_map, defaults.get("event_type"))
        if not event_type:
            stats["rejected_invalid_event_type"] += 1
            continue
        # Per-record best effort: any conversion failure rejects only this record.
        try:
            lat = float(_as_text(_get_by_path(record, lat_key)) or _as_text(defaults.get("lat", "")))
            lon = float(_as_text(_get_by_path(record, lon_key)) or _as_text(defaults.get("lon", "")))
            if coord_crs == "EPSG:25832":
                # In this CRS the configured lat/lon fields carry northing/easting.
                lat, lon = _utm_epsg25832_to_wgs84(lon, lat)
            start_dt = _as_text(_get_by_path(record, start_key)) or _as_text(defaults.get("start_datetime", ""))
            end_dt = _as_text(_get_by_path(record, end_key)) or _as_text(defaults.get("end_datetime", ""))
            if date_range_cfg and (not start_dt or not end_dt):
                range_text = _as_text(_get_by_path(record, date_range_cfg.get("field", "")))
                parsed_range = _parse_date_range(
                    range_text,
                    date_range_cfg.get("separator", " bis "),
                    date_range_cfg.get("input_date_format", "%d-%m-%Y"),
                )
                if parsed_range is not None:
                    start_dt, end_dt = parsed_range
            risk_raw = _get_by_path(record, risk_key)
            risk_modifier = int(_as_text(risk_raw) or _as_text(defaults.get("risk_modifier", "0")))
        except Exception:
            stats["rejected_parse_error"] += 1
            continue
        if not start_dt or not end_dt:
            stats["rejected_missing_datetime"] += 1
            continue
        if not (
            DE_BOUNDS["lat_min"] <= lat <= DE_BOUNDS["lat_max"]
            and DE_BOUNDS["lon_min"] <= lon <= DE_BOUNDS["lon_max"]
        ):
            stats["rejected_out_of_bounds"] += 1
            continue
        try:
            start_iso = normalize_iso(start_dt)
            end_iso = normalize_iso(end_dt)
        except Exception:
            stats["rejected_parse_error"] += 1
            continue
        # normalize_iso yields fixed-width UTC strings, so string order is chronological order.
        if end_iso <= start_iso:
            stats["rejected_invalid_window"] += 1
            continue
        external_id = None
        if external_id_key:
            value = _get_by_path(record, external_id_key)
            external_id = str(value).strip() if value is not None else None
        payload = {
            "event_type": event_type,
            "lat": lat,
            "lon": lon,
            "start_datetime": start_iso,
            "end_datetime": end_iso,
            "risk_modifier": risk_modifier,
        }
        payload["id"] = _stable_id(source_name, external_id, payload)
        rows.append(payload)
        stats["accepted"] += 1

    note = f"connector={source_cfg.get('id', source_name)} location={location} format={source_format}"
    result = import_event_rows(rows, source_name=source_name, notes=note)
    result["connector_id"] = source_cfg.get("id", source_name)
    result["stats"] = stats
    return result
def prune_sources_not_in_config(config_source_names: set[str], protected_sources: set[str] | None = None) -> dict:
    """Remove stored data for sources that no longer appear in the config.

    What gets pruned:
      * Sources that have rows in ``open_data_event``, are not in
        ``config_source_names``, and are not protected: their event rows are deleted.
      * Sources that have a ``data_source_state`` row, are not configured, and
        are not protected: that row is upserted with ``record_count=0`` and
        notes ``"pruned_not_in_config"``.

    Args:
        config_source_names: Source names still present in the config (any iterable).
        protected_sources: Names that are never pruned. ``None`` (the default)
            protects ``{"osm_overpass"}``; an explicit empty set protects nothing.

    Returns:
        ``{"sources": <sorted names deleted or marked>, "rows": <event rows deleted>}``.
    """
    # `is None` rather than `or`: an explicitly passed empty set must not
    # silently fall back to the default protection.
    protected = {"osm_overpass"} if protected_sources is None else set(protected_sources)
    configured = set(config_source_names)
    # Same schema guard as import_event_rows, so pruning works on a fresh database.
    init_db()
    with get_conn() as conn:
        rows = conn.execute("SELECT DISTINCT source FROM open_data_event").fetchall()
        existing_sources = {row["source"] for row in rows}
        state_rows = conn.execute("SELECT source_name FROM data_source_state").fetchall()
        existing_state_sources = {row["source_name"] for row in state_rows}
        to_prune = sorted((existing_sources - configured) - protected)
        state_to_mark = sorted((existing_state_sources - configured) - protected)
        pruned_rows = 0
        ts = now_iso()
        for source_name in to_prune:
            count = conn.execute(
                "SELECT COUNT(*) AS c FROM open_data_event WHERE source = ?", (source_name,)
            ).fetchone()["c"]
            pruned_rows += int(count)
            conn.execute("DELETE FROM open_data_event WHERE source = ?", (source_name,))
        for source_name in state_to_mark:
            conn.execute(
                """
                INSERT INTO data_source_state (source_name, imported_at, record_count, notes)
                VALUES (?, ?, ?, ?)
                ON CONFLICT(source_name) DO UPDATE SET
                    imported_at = excluded.imported_at,
                    record_count = excluded.record_count,
                    notes = excluded.notes
                """,
                (source_name, ts, 0, "pruned_not_in_config"),
            )
    return {"sources": sorted(set(to_prune + state_to_mark)), "rows": pruned_rows}
def import_from_config(config_path: Path, prune_legacy: bool = False) -> dict:
    """Import every enabled source listed in a JSON connector config file.

    The file must hold a JSON object with a ``"sources"`` list. Non-dict
    entries are ignored. For each dict entry:
      * If ``"enabled"`` is truthy, it is imported with ``import_from_source``;
        relative ``file`` locations resolve against the config's directory.
        A failing source does not stop the others: its error is recorded in
        the result with ``rows=0``.
      * Otherwise its stored events are deleted and its state is marked
        ``"disabled in config"``, unless an enabled entry uses the same
        source name.

    With ``prune_legacy``, sources no longer named in the config are then
    pruned via ``prune_sources_not_in_config``.

    Returns:
        ``{"imported": [...], "skipped": [...], "pruned": {...}, "count": n}``,
        where ``count`` includes failed imports.

    Raises:
        ValueError: if the file is not valid JSON or has no ``"sources"`` list.
    """
    cfg = json.loads(config_path.read_text(encoding="utf-8"))
    sources = cfg.get("sources") if isinstance(cfg, dict) else None
    if not isinstance(sources, list):
        raise ValueError("config requires 'sources' list")
    imported = []
    skipped = []
    disabled_source_names = []
    enabled_source_names = set()
    all_source_names = set()
    for source in sources:
        if not isinstance(source, dict):
            continue
        source_name = source.get("source_name") or source.get("id") or "open_data_source"
        all_source_names.add(source_name)
        if not source.get("enabled", False):
            skipped.append(source.get("id", "unknown"))
            disabled_source_names.append(source_name)
            continue
        enabled_source_names.add(source_name)
        try:
            imported.append(import_from_source(source, config_path.parent))
        except Exception as exc:
            # Best effort across sources: report the failure and continue with the next one.
            imported.append(
                {
                    "connector_id": source.get("id", source_name),
                    "source_name": source_name,
                    "rows": 0,
                    "error": str(exc),
                }
            )

    # A name shared by an enabled and a disabled entry belongs to the enabled one.
    # Clearing it here would delete the rows that were just imported for it.
    to_clear = [name for name in dict.fromkeys(disabled_source_names) if name not in enabled_source_names]
    if to_clear:
        init_db()
        ts = now_iso()
        with get_conn() as conn:
            for source_name in to_clear:
                conn.execute("DELETE FROM open_data_event WHERE source = ?", (source_name,))
                conn.execute(
                    """
                    INSERT INTO data_source_state (source_name, imported_at, record_count, notes)
                    VALUES (?, ?, ?, ?)
                    ON CONFLICT(source_name) DO UPDATE SET
                        imported_at = excluded.imported_at,
                        record_count = excluded.record_count,
                        notes = excluded.notes
                    """,
                    (source_name, ts, 0, "disabled in config"),
                )

    pruned = {"sources": [], "rows": 0}
    if prune_legacy:
        pruned = prune_sources_not_in_config(all_source_names)
    return {
        "imported": imported,
        "skipped": skipped,
        "pruned": pruned,
        "count": len(imported),
    }