426 lines
16 KiB
Python
426 lines
16 KiB
Python
import csv
import datetime as dt
import hashlib
import io
import json
import math
import subprocess
from pathlib import Path
from urllib import request

from db import get_conn, init_db
|
|
|
|
# Event categories the importer accepts; records with any other type are skipped.
VALID_TYPES = {
    "market",
    "waste",
    "event",
    "construction",
}

# Latitude/longitude window (degrees) used to reject out-of-range coordinates.
# NOTE(review): the name suggests a bounding box around Germany -- confirm.
DE_BOUNDS = {
    "lat_min": 47.0,
    "lat_max": 55.5,
    "lon_min": 5.0,
    "lon_max": 16.0,
}
|
|
|
|
|
|
def now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string ending in ``Z``.

    Microseconds are dropped, so the result has one-second precision.
    """
    current = dt.datetime.now(dt.timezone.utc)
    stamp = current.replace(microsecond=0).isoformat()
    return stamp.replace("+00:00", "Z")
|
|
|
|
|
|
def normalize_iso(value: str) -> str:
    """Normalize a timestamp string to UTC ISO-8601 with a ``Z`` suffix.

    Two input shapes are handled:

    * an all-digit string, read as a unix timestamp; values above
      10_000_000_000 are treated as milliseconds and divided by 1000;
    * anything else, parsed with ``datetime.fromisoformat`` after a ``Z``
      is rewritten to ``+00:00``. Naive values are taken to be UTC.

    Sub-second precision is discarded. Parsing errors (``ValueError``)
    propagate to the caller.
    """
    utc = dt.timezone.utc
    text = value.strip()

    if text.isdigit():
        # ArcGIS dates are commonly unix timestamps in milliseconds.
        epoch = int(text)
        if epoch > 10_000_000_000:
            epoch = epoch / 1000.0
        moment = dt.datetime.fromtimestamp(epoch, tz=utc)
    else:
        moment = dt.datetime.fromisoformat(text.replace("Z", "+00:00"))
        if moment.tzinfo is None:
            moment = moment.replace(tzinfo=utc)
        moment = moment.astimezone(utc)

    return moment.replace(microsecond=0).isoformat().replace("+00:00", "Z")
|
|
|
|
|
|
def _get_by_path(record: dict, path: str):
|
|
if not path:
|
|
return None
|
|
current = record
|
|
for token in path.split("."):
|
|
if isinstance(current, dict):
|
|
current = current.get(token)
|
|
continue
|
|
if isinstance(current, list):
|
|
try:
|
|
idx = int(token)
|
|
except Exception:
|
|
return None
|
|
if idx < 0 or idx >= len(current):
|
|
return None
|
|
current = current[idx]
|
|
continue
|
|
return None
|
|
return current
|
|
|
|
|
|
def _as_text(value) -> str:
|
|
return "" if value is None else str(value).strip()
|
|
|
|
|
|
def _utm_epsg25832_to_wgs84(easting: float, northing: float) -> tuple[float, float]:
|
|
# WGS84 / UTM zone 32N (EPSG:25832) to lat/lon conversion.
|
|
a = 6378137.0
|
|
e = 0.08181919084262149
|
|
e1sq = 0.00673949674227643
|
|
k0 = 0.9996
|
|
|
|
x = easting - 500000.0
|
|
y = northing
|
|
zone_cm = 9.0 # UTM zone 32 central meridian
|
|
|
|
m = y / k0
|
|
mu = m / (a * (1 - e**2 / 4 - 3 * e**4 / 64 - 5 * e**6 / 256))
|
|
e1 = (1 - math.sqrt(1 - e**2)) / (1 + math.sqrt(1 - e**2))
|
|
|
|
j1 = 3 * e1 / 2 - 27 * e1**3 / 32
|
|
j2 = 21 * e1**2 / 16 - 55 * e1**4 / 32
|
|
j3 = 151 * e1**3 / 96
|
|
j4 = 1097 * e1**4 / 512
|
|
fp = mu + j1 * math.sin(2 * mu) + j2 * math.sin(4 * mu) + j3 * math.sin(6 * mu) + j4 * math.sin(8 * mu)
|
|
|
|
sin_fp = math.sin(fp)
|
|
cos_fp = math.cos(fp)
|
|
tan_fp = math.tan(fp)
|
|
|
|
c1 = e1sq * cos_fp**2
|
|
t1 = tan_fp**2
|
|
r1 = a * (1 - e**2) / (1 - e**2 * sin_fp**2) ** 1.5
|
|
n1 = a / math.sqrt(1 - e**2 * sin_fp**2)
|
|
d = x / (n1 * k0)
|
|
|
|
q1 = n1 * tan_fp / r1
|
|
q2 = d**2 / 2
|
|
q3 = (5 + 3 * t1 + 10 * c1 - 4 * c1**2 - 9 * e1sq) * d**4 / 24
|
|
q4 = (61 + 90 * t1 + 298 * c1 + 45 * t1**2 - 252 * e1sq - 3 * c1**2) * d**6 / 720
|
|
lat = fp - q1 * (q2 - q3 + q4)
|
|
|
|
q5 = d
|
|
q6 = (1 + 2 * t1 + c1) * d**3 / 6
|
|
q7 = (5 - 2 * c1 + 28 * t1 - 3 * c1**2 + 8 * e1sq + 24 * t1**2) * d**5 / 120
|
|
lon = math.radians(zone_cm) + (q5 - q6 + q7) / cos_fp
|
|
|
|
return math.degrees(lat), math.degrees(lon)
|
|
|
|
|
|
def _stable_id(source_name: str, external_id: str | None, payload: dict) -> str:
|
|
if external_id:
|
|
raw = f"{source_name}:{external_id}"
|
|
else:
|
|
raw = (
|
|
f"{source_name}:{payload['event_type']}:{payload['lat']:.6f}:{payload['lon']:.6f}:"
|
|
f"{payload['start_datetime']}:{payload['end_datetime']}:{payload['risk_modifier']}"
|
|
)
|
|
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
|
|
|
|
|
|
def _map_event_type(raw_value: str, event_type_map: dict[str, str], default_value: str | None) -> str | None:
    """Resolve a raw source value to one of ``VALID_TYPES``, or ``None``.

    The raw value is stripped and lower-cased, then translated through
    ``event_type_map`` when it appears there. ``default_value`` is used
    only when the result is empty; it is taken as-is (not lower-cased).
    A non-empty value that is not a valid type yields ``None`` -- the
    default does not replace it.
    """
    candidate = (raw_value or "").strip().lower()
    if candidate:
        candidate = event_type_map.get(candidate, candidate)
    if default_value and not candidate:
        candidate = default_value
    return candidate if candidate in VALID_TYPES else None
|
|
|
|
|
|
def _extract_json_records(payload: dict | list, json_path: str | None) -> list[dict]:
|
|
current = payload
|
|
if json_path:
|
|
for part in json_path.split("."):
|
|
if isinstance(current, dict):
|
|
current = current.get(part)
|
|
else:
|
|
current = None
|
|
if current is None:
|
|
return []
|
|
if isinstance(current, list):
|
|
return [item for item in current if isinstance(item, dict)]
|
|
return []
|
|
|
|
|
|
def _read_text(location: str, config_dir: Path) -> str:
|
|
if location.startswith("http://") or location.startswith("https://"):
|
|
try:
|
|
with request.urlopen(location, timeout=60) as resp:
|
|
return resp.read().decode("utf-8", errors="replace")
|
|
except Exception:
|
|
# Fallback for environments where Python DNS/network stack is restricted.
|
|
raw = subprocess.check_output(["curl", "-sL", location], timeout=60)
|
|
return raw.decode("utf-8", errors="replace")
|
|
|
|
file_path = Path(location)
|
|
if not file_path.is_absolute():
|
|
file_path = (config_dir / location).resolve()
|
|
return file_path.read_text(encoding="utf-8")
|
|
|
|
|
|
def import_event_rows(rows: list[dict], source_name: str, notes: str) -> dict:
    """Replace all stored events of ``source_name`` with ``rows``.

    Rows whose ``event_type`` is not in ``VALID_TYPES`` and rows repeating
    an already seen ``id`` are skipped. The remaining rows are coerced
    (float coordinates, normalized datetimes, int risk modifier); a
    malformed value raises before anything is written. The source's
    existing events are then deleted, the new ones inserted, and the
    ``data_source_state`` entry for the source is upserted.

    Returns ``{"rows": <inserted count>, "source_name": source_name}``.
    """
    init_db()
    imported_at = now_iso()

    prepared = []
    known_ids = set()
    for entry in rows:
        kind = entry.get("event_type")
        if kind not in VALID_TYPES:
            continue
        row_id = entry["id"]
        if row_id in known_ids:
            continue
        known_ids.add(row_id)

        prepared.append(
            (
                row_id,
                kind,
                float(entry["lat"]),
                float(entry["lon"]),
                normalize_iso(entry["start_datetime"]),
                normalize_iso(entry["end_datetime"]),
                int(entry["risk_modifier"]),
                source_name,
                imported_at,
            )
        )

    insert_events_sql = """
            INSERT INTO open_data_event (
                id, event_type, lat, lon, start_datetime, end_datetime,
                risk_modifier, source, imported_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """
    upsert_state_sql = """
            INSERT INTO data_source_state (source_name, imported_at, record_count, notes)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(source_name) DO UPDATE SET
                imported_at = excluded.imported_at,
                record_count = excluded.record_count,
                notes = excluded.notes
            """
    state_params = (source_name, imported_at, len(prepared), notes)

    with get_conn() as conn:
        # Full refresh: drop the source's previous events before inserting.
        conn.execute("DELETE FROM open_data_event WHERE source = ?", (source_name,))
        conn.executemany(insert_events_sql, prepared)
        conn.execute(upsert_state_sql, state_params)

    return {"rows": len(prepared), "source_name": source_name}
|
|
|
|
|
|
def import_from_source(source_cfg: dict, config_dir: Path) -> dict:
    """Load one configured source, validate its records and import them.

    Keys read from ``source_cfg``:

    * ``source_name`` / ``id`` -- name the events are stored under;
    * ``url`` / ``file`` -- where to read the data (required);
    * ``format`` -- ``"csv"`` (default) or ``"json"``;
    * ``json_path`` -- dotted path to the record list inside a JSON payload;
    * ``field_map`` -- maps canonical field names to (dotted) source paths;
    * ``default_values`` -- fallbacks used when a record lacks a field;
    * ``event_type_map`` -- translates source categories to event types;
    * ``date_range`` -- optional ``field`` / ``separator`` /
      ``input_date_format`` describing a combined "start bis end" column,
      used when start or end is missing;
    * ``coord_crs`` -- ``"EPSG:25832"`` triggers UTM -> lat/lon conversion
      (the ``lon`` field is taken as easting, ``lat`` as northing).

    Each record is rejected (and counted in ``stats``) when its event type
    is invalid, a value cannot be parsed, a datetime is missing, the
    coordinates fall outside ``DE_BOUNDS`` or the end is not after the
    start.

    Returns the result of ``import_event_rows`` extended with
    ``connector_id`` and ``stats``.

    Raises:
        ValueError: no ``url``/``file`` configured, or unsupported format.
        Errors raised while reading or decoding the source propagate.
    """
    source_name = source_cfg.get("source_name") or source_cfg.get("id") or "open_data_source"
    location = source_cfg.get("url") or source_cfg.get("file")
    if not location:
        raise ValueError(f"source '{source_name}' missing url/file")

    source_format = (source_cfg.get("format") or "csv").lower()
    field_map = source_cfg.get("field_map") or {}
    defaults = source_cfg.get("default_values") or {}
    event_type_map = {str(k).lower(): str(v).lower() for k, v in (source_cfg.get("event_type_map") or {}).items()}
    json_path = source_cfg.get("json_path")
    date_range_cfg = source_cfg.get("date_range")
    coord_crs = str(source_cfg.get("coord_crs", "")).upper().strip()

    text = _read_text(location, config_dir)

    records: list[dict]
    if source_format == "csv":
        # Parse from a stream instead of text.splitlines(): splitlines drops
        # the newline inside quoted multi-line fields and also splits on
        # characters such as form feed or U+2028 that may occur in a field.
        # newline="" hands line endings to the csv module untranslated.
        reader = csv.DictReader(io.StringIO(text, newline=""))
        records = [dict(row) for row in reader]
    elif source_format == "json":
        payload = json.loads(text)
        records = _extract_json_records(payload, json_path)
    else:
        raise ValueError(f"unsupported format: {source_format}")

    # Field paths do not depend on the record, so resolve them once.
    lat_key = field_map.get("lat", "lat")
    lon_key = field_map.get("lon", "lon")
    start_key = field_map.get("start_datetime", "start_datetime")
    end_key = field_map.get("end_datetime", "end_datetime")
    risk_key = field_map.get("risk_modifier", "risk_modifier")
    event_type_key = field_map.get("event_type", "event_type")
    external_id_key = field_map.get("external_id")
    default_event_type = defaults.get("event_type")

    rows = []
    stats = {
        "input_records": len(records),
        "accepted": 0,
        "rejected_invalid_event_type": 0,
        "rejected_parse_error": 0,
        "rejected_missing_datetime": 0,
        "rejected_out_of_bounds": 0,
        "rejected_invalid_window": 0,
    }
    for record in records:
        raw_event_type = _as_text(_get_by_path(record, event_type_key))
        if not raw_event_type:
            raw_event_type = _as_text(default_event_type)
        event_type = _map_event_type(raw_event_type, event_type_map, default_event_type)
        if not event_type:
            stats["rejected_invalid_event_type"] += 1
            continue

        # Deliberately broad: any malformed value rejects only this record
        # instead of aborting the whole source.
        try:
            lat_raw = _get_by_path(record, lat_key)
            lon_raw = _get_by_path(record, lon_key)
            lat = float(_as_text(lat_raw) or _as_text(defaults.get("lat", "")))
            lon = float(_as_text(lon_raw) or _as_text(defaults.get("lon", "")))
            if coord_crs == "EPSG:25832":
                # Projected input: "lon" carries the easting, "lat" the northing.
                lat, lon = _utm_epsg25832_to_wgs84(lon, lat)

            start_dt = _as_text(_get_by_path(record, start_key)) or _as_text(defaults.get("start_datetime", ""))
            end_dt = _as_text(_get_by_path(record, end_key)) or _as_text(defaults.get("end_datetime", ""))

            if date_range_cfg and (not start_dt or not end_dt):
                range_field = _as_text(_get_by_path(record, date_range_cfg.get("field", "")))
                separator = date_range_cfg.get("separator", " bis ")
                date_fmt = date_range_cfg.get("input_date_format", "%d-%m-%Y")
                if separator in range_field:
                    start_raw, end_raw = [part.strip() for part in range_field.split(separator, 1)]
                    start_dt = dt.datetime.strptime(start_raw, date_fmt).replace(tzinfo=dt.timezone.utc).isoformat()
                    end_dt = dt.datetime.strptime(end_raw, date_fmt).replace(tzinfo=dt.timezone.utc).isoformat()

            risk_raw = _get_by_path(record, risk_key)
            risk_modifier = int(_as_text(risk_raw) or _as_text(defaults.get("risk_modifier", "0")))
        except Exception:
            stats["rejected_parse_error"] += 1
            continue

        if not start_dt or not end_dt:
            stats["rejected_missing_datetime"] += 1
            continue

        if not (
            DE_BOUNDS["lat_min"] <= lat <= DE_BOUNDS["lat_max"]
            and DE_BOUNDS["lon_min"] <= lon <= DE_BOUNDS["lon_max"]
        ):
            stats["rejected_out_of_bounds"] += 1
            continue

        try:
            start_iso = normalize_iso(start_dt)
            end_iso = normalize_iso(end_dt)
        except Exception:
            stats["rejected_parse_error"] += 1
            continue

        # Both values are normalized UTC strings of the same shape, so a
        # plain string comparison orders them chronologically.
        if end_iso <= start_iso:
            stats["rejected_invalid_window"] += 1
            continue

        external_id = None
        if external_id_key:
            value = _get_by_path(record, external_id_key)
            external_id = str(value).strip() if value is not None else None

        payload = {
            "event_type": event_type,
            "lat": lat,
            "lon": lon,
            "start_datetime": start_iso,
            "end_datetime": end_iso,
            "risk_modifier": risk_modifier,
        }
        payload["id"] = _stable_id(source_name, external_id, payload)
        rows.append(payload)
        stats["accepted"] += 1

    note = f"connector={source_cfg.get('id', source_name)} location={location} format={source_format}"
    result = import_event_rows(rows, source_name=source_name, notes=note)
    result["connector_id"] = source_cfg.get("id", source_name)
    result["stats"] = stats
    return result
|
|
|
|
|
|
def prune_sources_not_in_config(config_source_names: set[str], protected_sources: set[str] | None = None) -> dict:
    """Remove stored data for sources that are no longer configured.

    Events of every source that is neither in ``config_source_names`` nor
    protected are deleted. Every such source that has a
    ``data_source_state`` entry gets that entry reset to zero records with
    the note ``pruned_not_in_config``. When ``protected_sources`` is
    ``None`` or empty, ``{"osm_overpass"}`` is protected.

    Returns ``{"sources": <sorted affected names>, "rows": <deleted events>}``.
    """
    protected = protected_sources or {"osm_overpass"}
    mark_pruned_sql = """
                INSERT INTO data_source_state (source_name, imported_at, record_count, notes)
                VALUES (?, ?, ?, ?)
                ON CONFLICT(source_name) DO UPDATE SET
                    imported_at = excluded.imported_at,
                    record_count = excluded.record_count,
                    notes = excluded.notes
                """

    with get_conn() as conn:
        event_rows = conn.execute("SELECT DISTINCT source FROM open_data_event").fetchall()
        event_sources = {row["source"] for row in event_rows}
        state_rows = conn.execute("SELECT source_name FROM data_source_state").fetchall()
        state_sources = {row["source_name"] for row in state_rows}

        stale_event_sources = sorted(event_sources - config_source_names - protected)
        stale_state_sources = sorted(state_sources - config_source_names - protected)

        deleted_total = 0
        stamp = now_iso()
        for stale in stale_event_sources:
            # Count first so the number of removed events can be reported.
            counted = conn.execute(
                "SELECT COUNT(*) AS c FROM open_data_event WHERE source = ?", (stale,)
            ).fetchone()
            deleted_total += int(counted["c"])
            conn.execute("DELETE FROM open_data_event WHERE source = ?", (stale,))

        for stale in stale_state_sources:
            conn.execute(mark_pruned_sql, (stale, stamp, 0, "pruned_not_in_config"))

    affected = sorted(set(stale_event_sources) | set(stale_state_sources))
    return {"sources": affected, "rows": deleted_total}
|
|
|
|
|
|
def import_from_config(config_path: Path, prune_legacy: bool = False) -> dict:
    """Run every source listed in a JSON config file.

    The file must contain a ``sources`` list. Non-dict entries are
    ignored. Enabled sources are imported via ``import_from_source``; a
    failure is recorded as an entry with ``rows: 0`` and an ``error``
    message rather than aborting the run. Sources without a truthy
    ``enabled`` flag are skipped, their stored events deleted and their
    state entry reset with the note ``disabled in config``. With
    ``prune_legacy`` set, sources absent from the config are pruned too.

    Returns a dict with ``imported``, ``skipped``, ``pruned`` and ``count``
    (the number of entries in ``imported``, failures included).

    Raises:
        ValueError: the config has no ``sources`` list.
    """
    cfg = json.loads(config_path.read_text(encoding="utf-8"))
    sources = cfg.get("sources")
    if not isinstance(sources, list):
        raise ValueError("config requires 'sources' list")

    imported = []
    skipped = []
    disabled_names = []
    configured_names = set()

    for entry in sources:
        if not isinstance(entry, dict):
            continue

        name = entry.get("source_name") or entry.get("id") or "open_data_source"
        configured_names.add(name)

        if not entry.get("enabled", False):
            skipped.append(entry.get("id", "unknown"))
            disabled_names.append(name)
            continue

        # One failing source must not stop the remaining ones.
        try:
            outcome = import_from_source(entry, config_path.parent)
        except Exception as exc:
            outcome = {
                "connector_id": entry.get("id", name),
                "source_name": name,
                "rows": 0,
                "error": str(exc),
            }
        imported.append(outcome)

    if disabled_names:
        stamp = now_iso()
        mark_disabled_sql = """
                    INSERT INTO data_source_state (source_name, imported_at, record_count, notes)
                    VALUES (?, ?, ?, ?)
                    ON CONFLICT(source_name) DO UPDATE SET
                        imported_at = excluded.imported_at,
                        record_count = excluded.record_count,
                        notes = excluded.notes
                    """
        with get_conn() as conn:
            for name in disabled_names:
                conn.execute("DELETE FROM open_data_event WHERE source = ?", (name,))
                conn.execute(mark_disabled_sql, (name, stamp, 0, "disabled in config"))

    if prune_legacy:
        pruned = prune_sources_not_in_config(configured_names)
    else:
        pruned = {"sources": [], "rows": 0}

    return {
        "imported": imported,
        "skipped": skipped,
        "pruned": pruned,
        "count": len(imported),
    }
|