kennzeichen-seite/scripts/import1.py
2026-05-20 20:47:07 +02:00

455 lines
17 KiB
Python

#!/usr/bin/env python3
"""
import_all.py — import license-plate data and an app backup into PocketBase.

Usage:
    pip3 install requests
    python3 import_all.py \
        --pb-url http://localhost:4444 \
        --pb-email admin@example.com \
        --pb-password passwort \
        --repo-path ~/plates \
        --db-path ~/data.db \
        --backup ~/KennzeichensammlerBackupX-18_05.2026
"""
import argparse
import csv
import os
import re
import sqlite3
import tempfile
import zlib
from contextlib import closing
from datetime import datetime

import requests
# ── Country mapping: repo folder name → table name in data.db ────────────────
COUNTRY_MAP = dict(
    germany="deutschland",
    austria="austria",
    switzerland="schweiz",
    luxembourg="luxemburg",
    poland="polen",
    norway="norwegen",
    uk="uk",
    italy="italien",
    france="frankreich",
    greece="greece",
    slovakia="slowakei",
    croatia="kroatien",
    ukraine="ukraine",
    serbia="serbien",
    russia="russland",
    belarus="belarus",
    montenegro="montenegro",
    northmacedonia="nmk",
    ireland="irland",
    bulgaria="bulgarien",
    romania="romania",
    moldova="moldawien",
    czechia="tschechien",
    turkey="turkey",
    kosovo="kosovo",
    slovenia="slowenien",
)

# Reverse mapping for the backup import (app table name → repo folder name),
# followed by the extra app table names that have no COUNTRY_MAP entry.
APP_TO_FOLDER = {
    **{table: folder for folder, table in COUNTRY_MAP.items()},
    "global": "global",
    "global2": "global",
    "uk2": "uk",
    "laender": "global",
}
# CSV column mapping: internal field name → header names that may carry it.
# Candidates are listed in lookup order.
COL_MAP = dict(
    code=["code", "Kennzeichenkürzel", "kürzel", "kennzeichen"],
    name=["name", "StadtOderKreis", "ort", "stadt"],
    derivation=["derivation", "Herleitung", "herleitung"],
    region=["region", "Bundesland", "bundesland", "kanton", "provinz"],
    note=["note", "Bemerkung", "bemerkung", "hinweis"],
    active=["active", "aktiv"],
)
def map_row(row: dict) -> dict:
    """Translate one CSV row into the internal field names of COL_MAP.

    For every internal field the first candidate header that is present in
    ``row`` supplies the value (even when that cell is empty); a field with
    no matching header becomes "".

    Args:
        row: One row as produced by ``csv.DictReader`` (header → cell).

    Returns:
        A dict with one entry per COL_MAP key. All values are stripped
        strings, except "active", which is a bool: False only for
        "false", "0" or "nein" (case-insensitive), True otherwise —
        including when the cell is empty or the column is missing.
    """
    result = {}
    for internal, candidates in COL_MAP.items():
        header = next((c for c in candidates if c in row), None)
        value = row[header] if header is not None else ""
        # csv.DictReader fills cells missing from a short row with None;
        # treat those like empty cells instead of crashing on .strip().
        result[internal] = (value or "").strip()

    # An empty flag is not in the "off" list, so it defaults to True.
    result["active"] = result["active"].lower() not in ("false", "0", "nein")
    return result
class PB:
    """Thin PocketBase REST client used by the import steps of this script.

    Holds the server base URL and the auth token obtained by ``login()``.
    Every request is sent with ``TIMEOUT`` so a stalled server cannot hang
    the import indefinitely.

    NOTE(review): the admin login endpoint and the "schema" key posted by
    ``create_collection()`` look like the pre-0.23 PocketBase API — confirm
    against the server version in use.
    """

    # Seconds to wait for PocketBase before a request is abandoned.
    TIMEOUT = 30

    def __init__(self, url):
        """Store the base URL (without trailing slash); no token yet."""
        self.url = url.rstrip("/")
        self.token = None

    @staticmethod
    def _quote(value):
        """Return ``value`` as a double-quoted PocketBase filter literal.

        Embedded double quotes are backslash-escaped so that a value
        containing '"' cannot terminate the literal early.
        """
        return '"' + str(value).replace('"', '\\"') + '"'

    def login(self, email, password):
        """Authenticate as admin and remember the returned token.

        Raises:
            requests.HTTPError: if the server rejects the credentials.
        """
        r = requests.post(
            f"{self.url}/api/admins/auth-with-password",
            json={"identity": email, "password": password},
            timeout=self.TIMEOUT,
        )
        r.raise_for_status()
        self.token = r.json()["token"]
        print(f"✓ Eingeloggt als {email}")

    def h(self):
        """Return the headers sent with every authenticated JSON request."""
        return {"Authorization": self.token, "Content-Type": "application/json"}

    def collection_exists(self, name):
        """Return True if GET on the collection answers with HTTP 200."""
        r = requests.get(
            f"{self.url}/api/collections/{name}",
            headers=self.h(),
            timeout=self.TIMEOUT,
        )
        return r.status_code == 200

    def create_collection(self, schema):
        """Create the collection described by ``schema`` unless it exists.

        Raises:
            requests.HTTPError: if the server refuses to create it.
        """
        name = schema["name"]
        if self.collection_exists(name):
            print(f" '{name}' existiert bereits")
            return
        r = requests.post(
            f"{self.url}/api/collections",
            headers=self.h(),
            json=schema,
            timeout=self.TIMEOUT,
        )
        r.raise_for_status()
        print(f"'{name}' erstellt")

    def find_record(self, collection, filter_str):
        """Return the first record matching ``filter_str``, or None.

        None is also returned when the lookup itself fails; the failure is
        printed because callers treat None as "record does not exist" and
        would otherwise create duplicates without any hint why.
        """
        r = requests.get(
            f"{self.url}/api/collections/{collection}/records",
            headers=self.h(),
            params={"filter": filter_str, "perPage": 1},
            timeout=self.TIMEOUT,
        )
        if r.status_code != 200:
            print(f" Fehler bei Suche in '{collection}': {r.text[:100]}")
            return None
        items = r.json().get("items", [])
        return items[0] if items else None

    def create(self, collection, data):
        """POST a new record; print the server error on failure.

        Returns:
            The ``requests.Response`` of the POST, successful or not.
        """
        r = requests.post(
            f"{self.url}/api/collections/{collection}/records",
            headers=self.h(),
            json=data,
            timeout=self.TIMEOUT,
        )
        if r.status_code not in (200, 204):
            print(f" Fehler: {r.text[:100]}")
        return r

    def upsert_kz(self, data):
        """Create or update the "kennzeichen" record keyed by code + country.

        An existing record with the same ``code`` and ``country`` is
        patched with ``data``; otherwise a new record is created. Server
        errors are printed, not raised.
        """
        lookup = (
            f'code={self._quote(data["code"])}'
            f' && country={self._quote(data["country"])}'
        )
        existing = self.find_record("kennzeichen", lookup)
        records_url = f"{self.url}/api/collections/kennzeichen/records"
        if existing:
            r = requests.patch(
                f"{records_url}/{existing['id']}",
                headers=self.h(),
                json=data,
                timeout=self.TIMEOUT,
            )
        else:
            r = requests.post(
                records_url,
                headers=self.h(),
                json=data,
                timeout=self.TIMEOUT,
            )
        if r.status_code not in (200, 204):
            print(f" Fehler bei {data.get('code')}: {r.text[:80]}")
def _field(name, kind, required=False):
    """Build one field definition of a collection schema."""
    return {"name": name, "type": kind, "required": required}


def _collection(name, fields):
    """Build the definition of a "base" collection with the given fields."""
    return {"name": name, "type": "base", "schema": fields}


# Collection definitions handed to PB.create_collection(), in creation order.
SCHEMAS = [
    _collection("kennzeichen", [
        _field("code", "text", True),
        _field("name", "text", True),
        _field("derivation", "text"),
        _field("region", "text"),
        _field("note", "text"),
        _field("active", "bool", True),
        _field("country", "text", True),
        _field("app_id", "number"),
        _field("lat", "number"),
        _field("lon", "number"),
        _field("population", "number"),
        _field("points", "number"),
        _field("alt_code", "text"),
    ]),
    _collection("diplomatenkennzeichen", [
        _field("code", "text", True),
        _field("country", "text", True),
        _field("organisation", "text"),
        _field("type", "text", True),
        _field("city", "text"),
        _field("note", "text"),
        _field("base_country", "text"),
    ]),
    _collection("gesehen", [
        _field("kennzeichen_code", "text", True),
        _field("kennzeichen_name", "text"),
        _field("land", "text", True),
        _field("datum", "text"),
    ]),
    _collection("blog_posts", [
        _field("titel", "text", True),
        _field("slug", "text", True),
        _field("inhalt", "text"),
        _field("datum", "date"),
        _field("tags", "json"),
    ]),
]
def load_geodata(db_path):
    """Load the geo attributes of every plate from the reference database.

    Every table except "laender" is read; tables that cannot be queried
    for the expected columns (id, kennzeichen, map1, map2, anzahl, punkte,
    alt) are skipped. Rows without a "kennzeichen" value are ignored.

    Args:
        db_path: Path of the SQLite reference database (data.db).

    Returns:
        A dict keyed by ``(table_name, str(id))`` whose values are dicts
        with the keys app_id, lat, lon, population, points and alt_code
        (alt_code is "" when the "alt" column is empty/NULL).
    """
    geo = {}
    # closing() guarantees the connection is released even if a query raises.
    with closing(sqlite3.connect(db_path)) as conn:
        cur = conn.cursor()
        cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = [name for (name,) in cur.fetchall()]
        for tbl in tables:
            if tbl == "laender":
                continue
            # Quote the identifier so table names containing spaces or
            # other special characters are read instead of being dropped.
            quoted = '"' + tbl.replace('"', '""') + '"'
            try:
                cur.execute(
                    "SELECT id, kennzeichen, map1, map2, anzahl, punkte, alt "
                    f"FROM {quoted}"
                )
                rows = cur.fetchall()
            except sqlite3.Error:
                # Table does not have the plate layout — not a plate table.
                continue
            for app_id, kz, lat, lon, pop, pts, alt in rows:
                if not kz:
                    continue
                geo[(tbl, str(app_id))] = {
                    "app_id": app_id, "lat": lat, "lon": lon,
                    "population": pop, "points": pts, "alt_code": alt or "",
                }
    print(f"{len(geo)} Geo-Einträge aus data.db geladen")
    return geo
def import_kennzeichen(pb, repo_path, geo):
    """Upsert the plates of every country's kennzeichen.csv into PocketBase.

    For each folder listed in COUNTRY_MAP, ``<repo_path>/<folder>/
    kennzeichen.csv`` is read if it exists; every row with a non-empty code
    is upserted via ``pb.upsert_kz`` with ``country`` set to the folder
    name. Per-country and overall counts are printed.

    Args:
        pb: Logged-in PB client.
        repo_path: Root directory containing one sub-folder per country.
        geo: Unused. Kept so existing callers keep working; this function
            writes no geo fields.
    """
    total = 0
    for folder in COUNTRY_MAP:
        csv_path = os.path.join(repo_path, folder, "kennzeichen.csv")
        if not os.path.exists(csv_path):
            continue
        count = 0
        # utf-8-sig reads plain UTF-8 unchanged but drops a leading BOM,
        # which would otherwise become part of the first header name and
        # make every row look like it has no code.
        with open(csv_path, newline="", encoding="utf-8-sig") as f:
            for row in csv.DictReader(f):
                mapped = map_row(row)
                code = mapped.get("code", "").strip()
                if not code:
                    continue
                pb.upsert_kz({
                    "code": code,
                    "name": mapped["name"],
                    "derivation": mapped["derivation"],
                    "region": mapped["region"],
                    "note": mapped["note"],
                    "active": mapped["active"],
                    "country": folder,
                })
                count += 1
        print(f"{folder}: {count} Kennzeichen")
        total += count
    print(f"✓ Gesamt: {total} Kennzeichen")
def import_geo_enrichment(pb, db_path):
    """Add geo data from data.db to plates already stored in PocketBase.

    For every COUNTRY_MAP entry the matching data.db table is read; each
    row with a plate code is looked up in the "kennzeichen" collection by
    code + country and, if found, patched with app_id, lat, lon,
    population, points and alt_code. Tables that cannot be queried are
    skipped; failed patches are printed, not raised.

    Args:
        pb: Logged-in PB client.
        db_path: Path of the SQLite reference database (data.db).
    """
    def lit(value):
        # Double-quoted filter literal with embedded quotes escaped, so a
        # value containing '"' cannot break the lookup filter.
        return '"' + str(value).replace('"', '\\"') + '"'

    # closing() releases the connection even if a request raises mid-run.
    with closing(sqlite3.connect(db_path)) as ref:
        rcur = ref.cursor()
        for folder, db_table in COUNTRY_MAP.items():
            quoted = '"' + db_table.replace('"', '""') + '"'
            try:
                rcur.execute(
                    "SELECT id, kennzeichen, map1, map2, anzahl, punkte, alt "
                    f"FROM {quoted}"
                )
                rows = rcur.fetchall()
            except sqlite3.Error:
                # Table missing or without the expected columns.
                continue
            for app_id, kz, lat, lon, pop, pts, alt in rows:
                if not kz:
                    continue
                existing = pb.find_record(
                    "kennzeichen", f"code={lit(kz)} && country={lit(folder)}"
                )
                if not existing:
                    continue
                r = requests.patch(
                    f"{pb.url}/api/collections/kennzeichen/records/{existing['id']}",
                    headers=pb.h(),
                    json={"app_id": app_id, "lat": lat, "lon": lon,
                          "population": pop, "points": pts, "alt_code": alt or ""},
                    timeout=30,  # never wait forever on a stalled server
                )
                if r.status_code not in (200, 204):
                    print(f" Fehler bei {kz}: {r.text[:80]}")
    print("✓ Geo-Daten angereichert")
def normalize_diplo_code(code: str) -> str:
    """Return the diplomatic code in its hyphenated form.

    Surrounding whitespace is removed. A code that consists of a leading
    "0" followed only by digits gets a hyphen after that zero
    ("0110" → "0-110"); any other code ("0-110", "17", "0") is returned
    stripped but otherwise unchanged.
    """
    stripped = code.strip()
    zero_then_digits = re.fullmatch(r"0(\d+)", stripped)
    if zero_then_digits is None:
        return stripped
    return "0-" + zero_then_digits.group(1)
def import_diplomatenkennzeichen(pb, repo_path):
    """Upsert every folder's diplomatenkennzeichen.csv into PocketBase.

    Each entry of ``repo_path`` that contains a diplomatenkennzeichen.csv
    is processed in sorted order. Rows are matched to existing records by
    normalized code + base_country (the folder name): matches are patched,
    others created. Unknown "type" values fall back to "embassy".
    Per-folder and overall counts are printed.

    Args:
        pb: Logged-in PB client.
        repo_path: Root directory containing one sub-folder per country.
    """
    def pick(row, *headers, default=""):
        # First header with a non-empty cell wins; the cell is stripped.
        for header in headers:
            value = row.get(header)
            if value:
                return value.strip()
        return default

    def lit(value):
        # Double-quoted filter literal with embedded quotes escaped.
        return '"' + str(value).replace('"', '\\"') + '"'

    total = 0
    # sorted(): os.listdir order is arbitrary; keep runs reproducible.
    for folder in sorted(os.listdir(repo_path)):
        csv_path = os.path.join(repo_path, folder, "diplomatenkennzeichen.csv")
        if not os.path.exists(csv_path):
            continue
        count = 0
        # utf-8-sig drops a leading BOM that would corrupt the first header.
        with open(csv_path, newline="", encoding="utf-8-sig") as f:
            for row in csv.DictReader(f):
                code = pick(row, "code", "Nummernkreis", "Kennzeichen")
                if not code:
                    continue
                code = normalize_diplo_code(code)
                typ = pick(row, "type", "Typ", default="embassy").lower()
                if typ not in ("embassy", "consulate", "io"):
                    typ = "embassy"
                record = {
                    "code": code,
                    "country": pick(row, "country", "Land", "Entsendestaat"),
                    "organisation": pick(row, "organisation", "Organisation"),
                    "type": typ,
                    "city": pick(row, "city", "Stadt"),
                    "note": pick(row, "note", "Bemerkung"),
                    "base_country": folder,
                }
                existing = pb.find_record(
                    "diplomatenkennzeichen",
                    f"code={lit(code)} && base_country={lit(folder)}",
                )
                if existing:
                    r = requests.patch(
                        f"{pb.url}/api/collections/diplomatenkennzeichen/records/{existing['id']}",
                        headers=pb.h(),
                        json=record,
                        timeout=30,  # never wait forever on a stalled server
                    )
                    # Surface rejected updates like pb.create() does.
                    if r.status_code not in (200, 204):
                        print(f" Fehler bei {code}: {r.text[:80]}")
                else:
                    pb.create("diplomatenkennzeichen", record)
                count += 1
        print(f"{folder}: {count} Diplomatenkennzeichen")
        total += count
    print(f"✓ Gesamt: {total} Diplomatenkennzeichen")
def _to_iso_date(datum):
    """Convert "DD.MM.YYYY" to "YYYY-MM-DD"; return anything else unchanged."""
    if not isinstance(datum, str) or "." not in datum:
        return datum
    parts = datum.split(".")
    if len(parts) < 3:
        return datum
    day, month, year = parts[0], parts[1], parts[2]
    return f"{year}-{month.zfill(2)}-{day.zfill(2)}"


def import_backup(pb, backup_path, db_path):
    """Import the "seen" plates from an app backup into PocketBase.

    The backup file is zlib-decompressed, its first three bytes (the 'KX1'
    prefix) are dropped and the remainder is opened as an SQLite database.
    Every row of its "gesehen" table is resolved to a plate code via the
    reference database and stored in the "gesehen" collection unless a
    record with the same code and country already exists. Rows that cannot
    be resolved are counted as skipped; both counts are printed.

    Args:
        pb: Logged-in PB client.
        backup_path: Path of the compressed app backup file.
        db_path: Path of the SQLite reference database (data.db).

    Raises:
        zlib.error: if the backup file is not valid zlib data.
        sqlite3.Error: if the backup has no readable "gesehen" table.
    """
    def lit(value):
        # Double-quoted filter literal with embedded quotes escaped.
        return '"' + str(value).replace('"', '\\"') + '"'

    with open(backup_path, "rb") as f:
        decompressed = zlib.decompress(f.read())
    # Drop the 'KX1' prefix in front of the SQLite image.
    db_bytes = decompressed[3:]

    # A private, uniquely named temp file instead of a fixed /tmp path:
    # portable, safe against concurrent runs, and removed afterwards.
    fd, tmp = tempfile.mkstemp(suffix=".db")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(db_bytes)

        with closing(sqlite3.connect(tmp)) as backup_conn, \
                closing(sqlite3.connect(db_path)) as ref_conn:
            bcur = backup_conn.cursor()
            rcur = ref_conn.cursor()
            bcur.execute("SELECT kennid, datum, land FROM gesehen ORDER BY datum")
            rows = bcur.fetchall()
            print(f" Backup: {len(rows)} Einträge gefunden")

            imported = 0
            skipped = 0
            for kennid, datum, land in rows:
                # `land` comes from the backup file and names the reference
                # table; quote it so it can only ever act as an identifier.
                table = '"' + str(land).replace('"', '""') + '"'
                try:
                    rcur.execute(
                        f"SELECT kennzeichen FROM {table} WHERE id=?", (kennid,)
                    )
                    result = rcur.fetchone()
                except sqlite3.Error:
                    # Unknown table or table without the expected columns.
                    skipped += 1
                    continue
                if not result or not result[0]:
                    skipped += 1
                    continue

                kz_code = result[0]
                folder = APP_TO_FOLDER.get(land, land)
                datum_iso = _to_iso_date(datum)

                # These tables hold diplomatic codes; store them hyphenated.
                if land in ("global", "global2", "laender"):
                    kz_code = normalize_diplo_code(kz_code)

                existing = pb.find_record(
                    "gesehen",
                    f"kennzeichen_code={lit(kz_code)} && land={lit(folder)}",
                )
                if existing:
                    skipped += 1
                    continue

                # Not every reference table is guaranteed to have "ort".
                try:
                    rcur.execute(f"SELECT ort FROM {table} WHERE id=?", (kennid,))
                    name_result = rcur.fetchone()
                    kz_name = name_result[0] if name_result else ""
                except sqlite3.Error:
                    kz_name = ""

                pb.create("gesehen", {
                    "kennzeichen_code": kz_code,
                    "kennzeichen_name": kz_name,
                    "land": folder,
                    "datum": datum_iso,
                })
                imported += 1
    finally:
        os.remove(tmp)

    print(f"{imported} Einträge importiert, {skipped} übersprungen")
def _build_arg_parser():
    """Return the command-line parser of the import script."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--pb-url", default="http://localhost:4444")
    for required_option in ("--pb-email", "--pb-password", "--repo-path", "--db-path"):
        parser.add_argument(required_option, required=True)
    parser.add_argument("--backup", help="Pfad zur Backup-Datei")
    for switch in ("--skip-schema", "--skip-kz", "--only-backup"):
        parser.add_argument(switch, action="store_true")
    return parser


def main():
    """Run the import steps selected on the command line.

    Logs in to PocketBase, then — unless --only-backup is given — creates
    the collections (skippable with --skip-schema) and imports plates, geo
    data and diplomatic plates (skippable with --skip-kz). Finally imports
    the app backup when --backup is given.
    """
    args = _build_arg_parser().parse_args()

    pb = PB(args.pb_url)
    pb.login(args.pb_email, args.pb_password)

    full_run = not args.only_backup

    if full_run and not args.skip_schema:
        print("\n→ Collections anlegen...")
        for schema in SCHEMAS:
            pb.create_collection(schema)

    if full_run and not args.skip_kz:
        print("\n→ Kennzeichen importieren...")
        import_kennzeichen(pb, args.repo_path, {})
        print("\n→ Geo-Daten anreichern...")
        import_geo_enrichment(pb, args.db_path)
        print("\n→ Diplomatenkennzeichen importieren...")
        import_diplomatenkennzeichen(pb, args.repo_path)

    if args.backup:
        print("\n→ Backup importieren...")
        import_backup(pb, args.backup, args.db_path)

    print("\n✓ Fertig!")


if __name__ == "__main__":
    main()