#!/usr/bin/env python3
r"""
import_all.py — import licence-plate data and an app backup into PocketBase.

Usage:
    pip3 install requests
    python3 import_all.py \
        --pb-url http://localhost:4444 \
        --pb-email admin@example.com \
        --pb-password passwort \
        --repo-path ~/plates \
        --db-path ~/data.db \
        --backup ~/KennzeichensammlerBackupX-18_05.2026
"""

import argparse
import csv
import os
import re
import sqlite3
import tempfile
import zlib
from datetime import datetime  # noqa: F401  (kept: existing file-level import)

try:
    import requests
except ImportError:  # reported with a clear message when PB is constructed
    requests = None


# ── Country mapping: repo folder → table name in data.db ─────────────────────
COUNTRY_MAP = {
    "germany": "deutschland",
    "austria": "austria",
    "switzerland": "schweiz",
    "luxembourg": "luxemburg",
    "poland": "polen",
    "norway": "norwegen",
    "uk": "uk",
    "italy": "italien",
    "france": "frankreich",
    "greece": "greece",
    "slovakia": "slowakei",
    "croatia": "kroatien",
    "ukraine": "ukraine",
    "serbia": "serbien",
    "russia": "russland",
    "belarus": "belarus",
    "montenegro": "montenegro",
    "northmacedonia": "nmk",
    "ireland": "irland",
    "bulgaria": "bulgarien",
    "romania": "romania",
    "moldova": "moldawien",
    "czechia": "tschechien",
    "turkey": "turkey",
    "kosovo": "kosovo",
    "slovenia": "slowenien",
}

# Reverse mapping for the backup import (app country name → repo folder name)
APP_TO_FOLDER = {v: k for k, v in COUNTRY_MAP.items()}
APP_TO_FOLDER.update({
    "global": "global",
    "global2": "global",
    "uk2": "uk",
    "laender": "global",
})

# CSV column mapping (accepted header spellings → internal field name)
COL_MAP = {
    "code": ["code", "Kennzeichenkürzel", "kürzel", "kennzeichen"],
    "name": ["name", "StadtOderKreis", "ort", "stadt"],
    "derivation": ["derivation", "Herleitung", "herleitung"],
    "region": ["region", "Bundesland", "bundesland", "kanton", "provinz"],
    "note": ["note", "Bemerkung", "bemerkung", "hinweis"],
    "active": ["active", "aktiv"],
}

# Values of the "active" column that mean "not active" (compared lower-cased).
_INACTIVE_VALUES = ("false", "0", "nein", "")

# A diplomatic code written without the dash, e.g. "0110".
_DIPLO_NO_DASH = re.compile(r"^0(\d+)$")

# Columns read from every reference table in data.db.
_GEO_COLUMNS = "id, kennzeichen, map1, map2, anzahl, punkte, alt"


def _pb_str(value) -> str:
    """Return *value* as a double-quoted PocketBase filter string literal.

    Embedded double quotes are backslash-escaped so that a value containing
    `"` cannot terminate the literal early and corrupt the filter.
    """
    return '"' + str(value).replace('"', '\\"') + '"'


def _sql_ident(name: str) -> str:
    """Return *name* as a double-quoted SQL identifier (quotes doubled)."""
    return '"' + name.replace('"', '""') + '"'


def _to_iso_date(datum):
    """Convert ``DD.MM.YYYY`` to ``YYYY-MM-DD``; return anything else as-is.

    Non-string values and strings with fewer than three dot-separated parts
    are passed through unchanged.
    """
    if isinstance(datum, str) and "." in datum:
        parts = datum.split(".")
        if len(parts) >= 3:
            return f"{parts[2]}-{parts[1].zfill(2)}-{parts[0].zfill(2)}"
    return datum


def map_row(row: dict) -> dict:
    """Map one CSV row with arbitrary header spelling to the internal fields.

    For each internal field the first header listed in ``COL_MAP`` that is
    present in *row* wins (even when its value is empty). Missing fields
    become ``""``. ``active`` is converted to a bool and defaults to ``True``
    when empty.
    """
    result = {}
    for internal, candidates in COL_MAP.items():
        value = ""
        for candidate in candidates:
            if candidate in row:
                # DictReader yields None for cells missing from short rows.
                value = (row[candidate] or "").strip()
                break
        result[internal] = value

    active = result["active"]
    if active == "":
        result["active"] = True
    else:
        result["active"] = active.lower() not in _INACTIVE_VALUES
    return result


class PB:
    """Minimal PocketBase REST client used by the import steps."""

    def __init__(self, url, timeout=30):
        """Store the base *url* (trailing slash removed).

        *timeout* is the per-request timeout in seconds passed to requests.
        Raises RuntimeError when the ``requests`` package is not installed.
        """
        if requests is None:
            raise RuntimeError(
                "Das Paket 'requests' fehlt — bitte 'pip3 install requests' ausführen"
            )
        self.url = url.rstrip("/")
        self.token = None
        self.timeout = timeout

    def login(self, email, password):
        """Authenticate as admin and remember the token; raises on HTTP error."""
        r = requests.post(
            f"{self.url}/api/admins/auth-with-password",
            json={"identity": email, "password": password},
            timeout=self.timeout,
        )
        r.raise_for_status()
        self.token = r.json()["token"]
        print(f"✓ Eingeloggt als {email}")

    def h(self):
        """Return the request headers carrying the auth token."""
        return {"Authorization": self.token, "Content-Type": "application/json"}

    def _records_url(self, collection):
        """Return the records endpoint URL of *collection*."""
        return f"{self.url}/api/collections/{collection}/records"

    def collection_exists(self, name):
        """Return True when GET on the collection answers with status 200."""
        r = requests.get(
            f"{self.url}/api/collections/{name}",
            headers=self.h(),
            timeout=self.timeout,
        )
        return r.status_code == 200

    def create_collection(self, schema):
        """Create the collection described by *schema* unless it exists."""
        name = schema["name"]
        if self.collection_exists(name):
            print(f" '{name}' existiert bereits")
            return
        r = requests.post(
            f"{self.url}/api/collections",
            headers=self.h(),
            json=schema,
            timeout=self.timeout,
        )
        r.raise_for_status()
        print(f" ✓ '{name}' erstellt")

    def find_record(self, collection, filter_str):
        """Return the first record matching *filter_str*, or None.

        A non-200 response is also reported as None.
        """
        r = requests.get(
            self._records_url(collection),
            headers=self.h(),
            params={"filter": filter_str, "perPage": 1},
            timeout=self.timeout,
        )
        if r.status_code != 200:
            return None
        items = r.json().get("items", [])
        return items[0] if items else None

    def create(self, collection, data):
        """POST a new record; print the error on failure. Returns the response."""
        r = requests.post(
            self._records_url(collection),
            headers=self.h(),
            json=data,
            timeout=self.timeout,
        )
        if r.status_code not in (200, 204):
            print(f" Fehler: {r.text[:100]}")
        return r

    def update(self, collection, record_id, data):
        """PATCH an existing record; print the error on failure. Returns the response."""
        r = requests.patch(
            f"{self._records_url(collection)}/{record_id}",
            headers=self.h(),
            json=data,
            timeout=self.timeout,
        )
        if r.status_code not in (200, 204):
            print(f" Fehler: {r.text[:100]}")
        return r

    def upsert_kz(self, data):
        """Create or update a ``kennzeichen`` record keyed by (code, country)."""
        filter_str = (
            f'code={_pb_str(data["code"])} && country={_pb_str(data["country"])}'
        )
        existing = self.find_record("kennzeichen", filter_str)
        if existing:
            r = requests.patch(
                f"{self._records_url('kennzeichen')}/{existing['id']}",
                headers=self.h(),
                json=data,
                timeout=self.timeout,
            )
        else:
            r = requests.post(
                self._records_url("kennzeichen"),
                headers=self.h(),
                json=data,
                timeout=self.timeout,
            )
        if r.status_code not in (200, 204):
            print(f" Fehler bei {data.get('code')}: {r.text[:80]}")


SCHEMAS = [
    {
        "name": "kennzeichen",
        "type": "base",
        "schema": [
            {"name": "code", "type": "text", "required": True},
            {"name": "name", "type": "text", "required": True},
            {"name": "derivation", "type": "text", "required": False},
            {"name": "region", "type": "text", "required": False},
            {"name": "note", "type": "text", "required": False},
            {"name": "active", "type": "bool", "required": True},
            {"name": "country", "type": "text", "required": True},
            {"name": "app_id", "type": "number", "required": False},
            {"name": "lat", "type": "number", "required": False},
            {"name": "lon", "type": "number", "required": False},
            {"name": "population", "type": "number", "required": False},
            {"name": "points", "type": "number", "required": False},
            {"name": "alt_code", "type": "text", "required": False},
        ],
    },
    {
        "name": "diplomatenkennzeichen",
        "type": "base",
        "schema": [
            {"name": "code", "type": "text", "required": True},
            {"name": "country", "type": "text", "required": True},
            {"name": "organisation", "type": "text", "required": False},
            {"name": "type", "type": "text", "required": True},
            {"name": "city", "type": "text", "required": False},
            {"name": "note", "type": "text", "required": False},
            {"name": "base_country", "type": "text", "required": False},
        ],
    },
    {
        "name": "gesehen",
        "type": "base",
        "schema": [
            {"name": "kennzeichen_code", "type": "text", "required": True},
            {"name": "kennzeichen_name", "type": "text", "required": False},
            {"name": "land", "type": "text", "required": True},
            {"name": "datum", "type": "text", "required": False},
        ],
    },
    {
        "name": "blog_posts",
        "type": "base",
        "schema": [
            {"name": "titel", "type": "text", "required": True},
            {"name": "slug", "type": "text", "required": True},
            {"name": "inhalt", "type": "text", "required": False},
            {"name": "datum", "type": "date", "required": False},
            {"name": "tags", "type": "json", "required": False},
        ],
    },
]


def load_geodata(db_path):
    """Read geo data from every table of data.db except ``laender``.

    Returns a dict keyed by ``(table, str(id))``. Tables that do not have the
    expected columns are skipped. Rows with an empty ``kennzeichen`` are
    ignored.
    """
    geo = {}
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()
        cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
        for (tbl,) in cur.fetchall():
            if tbl == "laender":
                continue
            try:
                cur.execute(f"SELECT {_GEO_COLUMNS} FROM {_sql_ident(tbl)}")
                rows = cur.fetchall()
            except sqlite3.Error:
                # Table without the expected columns — not a reference table.
                continue
            for app_id, kz, lat, lon, pop, pts, alt in rows:
                if not kz:
                    continue
                geo[(tbl, str(app_id))] = {
                    "app_id": app_id,
                    "lat": lat,
                    "lon": lon,
                    "population": pop,
                    "points": pts,
                    "alt_code": alt or "",
                }
    finally:
        conn.close()
    print(f"✓ {len(geo)} Geo-Einträge aus data.db geladen")
    return geo


def import_kennzeichen(pb, repo_path, geo):
    """Upsert all ``<repo_path>/<folder>/kennzeichen.csv`` rows into PocketBase.

    Only folders listed in ``COUNTRY_MAP`` are considered; missing CSV files
    are skipped. *geo* is accepted for interface compatibility but not used:
    geo enrichment is performed separately by ``import_geo_enrichment``.
    """
    total = 0
    for folder in COUNTRY_MAP:
        csv_path = os.path.join(repo_path, folder, "kennzeichen.csv")
        if not os.path.exists(csv_path):
            continue
        count = 0
        # utf-8-sig strips a BOM, which would otherwise corrupt the first header.
        with open(csv_path, newline="", encoding="utf-8-sig") as f:
            for row in csv.DictReader(f):
                mapped = map_row(row)
                code = mapped["code"]
                if not code:
                    continue
                pb.upsert_kz({
                    "code": code,
                    "name": mapped["name"],
                    "derivation": mapped["derivation"],
                    "region": mapped["region"],
                    "note": mapped["note"],
                    "active": mapped["active"],
                    "country": folder,
                })
                count += 1
        print(f" ✓ {folder}: {count} Kennzeichen")
        total += count
    print(f"✓ Gesamt: {total} Kennzeichen")


def import_geo_enrichment(pb, db_path):
    """Enrich already imported ``kennzeichen`` records with geo data.

    For every table named in ``COUNTRY_MAP`` the rows of data.db are matched
    to PocketBase records by (code, country) and patched. Tables that cannot
    be read are skipped.
    """
    ref = sqlite3.connect(db_path)
    try:
        rcur = ref.cursor()
        for folder, db_table in COUNTRY_MAP.items():
            try:
                rcur.execute(f"SELECT {_GEO_COLUMNS} FROM {_sql_ident(db_table)}")
                rows = rcur.fetchall()
            except sqlite3.Error:
                continue
            for app_id, kz, lat, lon, pop, pts, alt in rows:
                if not kz:
                    continue
                existing = pb.find_record(
                    "kennzeichen",
                    f"code={_pb_str(kz)} && country={_pb_str(folder)}",
                )
                if existing:
                    pb.update("kennzeichen", existing["id"], {
                        "app_id": app_id,
                        "lat": lat,
                        "lon": lon,
                        "population": pop,
                        "points": pts,
                        "alt_code": alt or "",
                    })
    finally:
        ref.close()
    print("✓ Geo-Daten angereichert")


def normalize_diplo_code(code: str) -> str:
    """Normalize diplomatic codes: ``0110`` and ``0-110`` both become ``0-110``."""
    code = code.strip()
    # Starts with 0, digits only, no dash: insert the dash after the 0.
    m = _DIPLO_NO_DASH.match(code)
    if m:
        return f"0-{m.group(1)}"
    return code


def import_diplomatenkennzeichen(pb, repo_path):
    """Upsert all ``<repo_path>/<folder>/diplomatenkennzeichen.csv`` rows.

    Every entry of *repo_path* is probed; the folder name is stored as
    ``base_country``. Unknown ``type`` values fall back to ``embassy``.
    """
    total = 0
    for folder in sorted(os.listdir(repo_path)):
        csv_path = os.path.join(repo_path, folder, "diplomatenkennzeichen.csv")
        if not os.path.exists(csv_path):
            continue
        count = 0
        with open(csv_path, newline="", encoding="utf-8-sig") as f:
            for row in csv.DictReader(f):
                # Accept several header spellings per field.
                code = (row.get("code") or row.get("Nummernkreis")
                        or row.get("Kennzeichen") or "").strip()
                if not code:
                    continue
                code = normalize_diplo_code(code)
                country = (row.get("country") or row.get("Land")
                           or row.get("Entsendestaat") or "").strip()
                org = (row.get("organisation") or row.get("Organisation") or "").strip()
                typ = (row.get("type") or row.get("Typ") or "embassy").strip().lower()
                city = (row.get("city") or row.get("Stadt") or "").strip()
                note = (row.get("note") or row.get("Bemerkung") or "").strip()
                if typ not in ("embassy", "consulate", "io"):
                    typ = "embassy"

                record = {
                    "code": code,
                    "country": country,
                    "organisation": org,
                    "type": typ,
                    "city": city,
                    "note": note,
                    "base_country": folder,
                }
                existing = pb.find_record(
                    "diplomatenkennzeichen",
                    f"code={_pb_str(code)} && base_country={_pb_str(folder)}",
                )
                if existing:
                    pb.update("diplomatenkennzeichen", existing["id"], record)
                else:
                    pb.create("diplomatenkennzeichen", record)
                count += 1
        print(f" ✓ {folder}: {count} Diplomatenkennzeichen")
        total += count
    print(f"✓ Gesamt: {total} Diplomatenkennzeichen")


def _read_backup_rows(backup_path):
    """Return the ``(kennid, datum, land)`` rows stored in the app backup.

    The backup is zlib-compressed; after decompression the first three bytes
    (the 'KX1' prefix) are dropped and the remainder is opened as an SQLite
    database via a private temporary file that is removed afterwards.
    """
    with open(backup_path, "rb") as f:
        db_bytes = zlib.decompress(f.read())[3:]

    fd, tmp_path = tempfile.mkstemp(prefix="backup_import_", suffix=".db")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(db_bytes)
        conn = sqlite3.connect(tmp_path)
        try:
            return conn.execute(
                "SELECT kennid, datum, land FROM gesehen ORDER BY datum"
            ).fetchall()
        finally:
            conn.close()
    finally:
        os.remove(tmp_path)


def import_backup(pb, backup_path, db_path):
    """Read the app backup and import the plates marked as seen.

    Each backup row references a row (``kennid``) in the data.db table named
    by ``land``. Rows whose table or id cannot be resolved, and rows already
    present in the ``gesehen`` collection, are counted as skipped.
    """
    rows = _read_backup_rows(backup_path)
    print(f" Backup: {len(rows)} Einträge gefunden")

    imported = 0
    skipped = 0
    ref_conn = sqlite3.connect(db_path)
    try:
        rcur = ref_conn.cursor()
        # `land` comes from the backup file, so it is only used as a table
        # name after being matched against the tables that really exist.
        rcur.execute("SELECT name FROM sqlite_master WHERE type IN ('table', 'view')")
        tables = {name.lower(): name for (name,) in rcur.fetchall()}

        for kennid, datum, land in rows:
            table = tables.get(land.lower()) if isinstance(land, str) else None
            if table is None:
                skipped += 1
                continue
            ident = _sql_ident(table)

            try:
                rcur.execute(f"SELECT kennzeichen FROM {ident} WHERE id=?", (kennid,))
                result = rcur.fetchone()
            except sqlite3.Error:
                skipped += 1
                continue
            if not result or result[0] is None:
                skipped += 1
                continue

            kz_code = result[0]
            folder = APP_TO_FOLDER.get(land, land)
            datum_iso = _to_iso_date(datum)

            # Diplomatic plates are stored in the global tables.
            if land in ("global", "global2", "laender"):
                kz_code = normalize_diplo_code(kz_code)

            existing = pb.find_record(
                "gesehen",
                f"kennzeichen_code={_pb_str(kz_code)} && land={_pb_str(folder)}",
            )
            if existing:
                skipped += 1
                continue

            # The display name is optional; not every table has an `ort` column.
            try:
                rcur.execute(f"SELECT ort FROM {ident} WHERE id=?", (kennid,))
                name_result = rcur.fetchone()
                kz_name = name_result[0] if name_result else ""
            except sqlite3.Error:
                kz_name = ""

            pb.create("gesehen", {
                "kennzeichen_code": kz_code,
                "kennzeichen_name": kz_name,
                "land": folder,
                "datum": datum_iso,
            })
            imported += 1
    finally:
        ref_conn.close()
    print(f" ✓ {imported} Einträge importiert, {skipped} übersprungen")


def main():
    """Parse the command line and run the selected import steps."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--pb-url", default="http://localhost:4444")
    parser.add_argument("--pb-email", required=True)
    parser.add_argument("--pb-password", required=True)
    parser.add_argument("--repo-path", required=True)
    parser.add_argument("--db-path", required=True)
    parser.add_argument("--backup", help="Pfad zur Backup-Datei")
    parser.add_argument("--skip-schema", action="store_true")
    parser.add_argument("--skip-kz", action="store_true")
    parser.add_argument("--only-backup", action="store_true")
    args = parser.parse_args()

    pb = PB(args.pb_url)
    pb.login(args.pb_email, args.pb_password)

    if not args.skip_schema and not args.only_backup:
        print("\n→ Collections anlegen...")
        for schema in SCHEMAS:
            pb.create_collection(schema)

    if not args.skip_kz and not args.only_backup:
        print("\n→ Kennzeichen importieren...")
        import_kennzeichen(pb, args.repo_path, {})
        print("\n→ Geo-Daten anreichern...")
        import_geo_enrichment(pb, args.db_path)
        print("\n→ Diplomatenkennzeichen importieren...")
        import_diplomatenkennzeichen(pb, args.repo_path)

    if args.backup:
        print("\n→ Backup importieren...")
        import_backup(pb, args.backup, args.db_path)

    print("\n✓ Fertig!")


if __name__ == "__main__":
    main()