455 lines
17 KiB
Python
455 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
import_all.py — import license-plate data and an app backup into PocketBase

Usage:
    pip3 install requests

    python3 import_all.py \
        --pb-url http://localhost:4444 \
        --pb-email admin@example.com \
        --pb-password passwort \
        --repo-path ~/plates \
        --db-path ~/data.db \
        --backup ~/KennzeichensammlerBackupX-18_05.2026
|
|
"""
|
|
|
|
import argparse, csv, os, sqlite3, zlib, re, requests
|
|
from datetime import datetime
|
|
|
|
# ── Country mapping: repo folder name → table name in data.db ────────────────
# Insertion order is the order in which the import steps walk the countries.
COUNTRY_MAP = dict(
    germany="deutschland",
    austria="austria",
    switzerland="schweiz",
    luxembourg="luxemburg",
    poland="polen",
    norway="norwegen",
    uk="uk",
    italy="italien",
    france="frankreich",
    greece="greece",
    slovakia="slowakei",
    croatia="kroatien",
    ukraine="ukraine",
    serbia="serbien",
    russia="russland",
    belarus="belarus",
    montenegro="montenegro",
    northmacedonia="nmk",
    ireland="irland",
    bulgaria="bulgarien",
    romania="romania",
    moldova="moldawien",
    czechia="tschechien",
    turkey="turkey",
    kosovo="kosovo",
    slovenia="slowenien",
)
|
|
|
|
# Reverse mapping for the backup import (app country name → repo folder name).
APP_TO_FOLDER = {
    **{table: folder for folder, table in COUNTRY_MAP.items()},
    # Additional app names that are not values of COUNTRY_MAP.
    "global": "global",
    "global2": "global",
    "uk2": "uk",
    "laender": "global",
}
|
|
|
|
# CSV column mapping: internal field name → accepted header spellings.
# The spellings are tried in order; the first header present in a row wins.
COL_MAP = {
    "code": ["code", "Kennzeichenkürzel", "kürzel", "kennzeichen"],
    "name": ["name", "StadtOderKreis", "ort", "stadt"],
    "derivation": ["derivation", "Herleitung", "herleitung"],
    "region": ["region", "Bundesland", "bundesland", "kanton", "provinz"],
    "note": ["note", "Bemerkung", "bemerkung", "hinweis"],
    "active": ["active", "aktiv"],
}


def map_row(row: dict) -> dict:
    """Translate one CSV row into a dict keyed by the internal field names.

    Args:
        row: Mapping of CSV header to cell value, as produced by
            ``csv.DictReader``.

    Returns:
        A dict containing every key of ``COL_MAP``. All values are stripped
        strings ("" when no matching column exists or the cell is missing),
        except ``"active"``, which is a bool: False only when the cell reads
        "false", "0" or "nein" (case-insensitive), True otherwise, including
        when it is empty.
    """
    result = {}
    for internal, candidates in COL_MAP.items():
        header = next((c for c in candidates if c in row), None)
        value = row[header] if header is not None else None
        # csv.DictReader fills cells missing from short rows with None;
        # treat those like empty cells instead of crashing on .strip().
        result[internal] = "" if value is None else str(value).strip()

    # An empty cell is not in the tuple, so "active" defaults to True.
    result["active"] = result["active"].lower() not in ("false", "0", "nein")
    return result
|
|
|
|
|
|
class PB:
    """Thin client for the PocketBase REST calls used by this importer.

    Holds the base URL and, after ``login``, the admin token that ``h``
    places in the ``Authorization`` header of every request.
    """

    # Per-request timeout in seconds. Without one, ``requests`` waits
    # indefinitely on a server that stops responding.
    TIMEOUT = 30

    def __init__(self, url):
        self.url = url.rstrip("/")
        self.token = None

    @staticmethod
    def _quote(value):
        """Return ``value`` as a double-quoted filter string literal.

        Embedded double quotes are backslash-escaped so that a value such as
        ``A"B`` cannot terminate the literal and break the filter.
        """
        return '"' + str(value).replace('"', '\\"') + '"'

    def login(self, email, password):
        """Authenticate as admin and remember the returned token.

        Raises:
            requests.HTTPError: if the server rejects the credentials.
        """
        r = requests.post(f"{self.url}/api/admins/auth-with-password",
                          json={"identity": email, "password": password},
                          timeout=self.TIMEOUT)
        r.raise_for_status()
        self.token = r.json()["token"]
        print(f"✓ Eingeloggt als {email}")

    def h(self):
        """Return the headers sent with every authenticated JSON request."""
        return {"Authorization": self.token, "Content-Type": "application/json"}

    def collection_exists(self, name):
        """Return True when GET on the collection answers with HTTP 200."""
        r = requests.get(f"{self.url}/api/collections/{name}",
                         headers=self.h(), timeout=self.TIMEOUT)
        return r.status_code == 200

    def create_collection(self, schema):
        """Create the collection described by ``schema`` unless it exists.

        Raises:
            requests.HTTPError: if the create request fails.
        """
        name = schema["name"]
        if self.collection_exists(name):
            print(f" '{name}' existiert bereits")
            return
        r = requests.post(f"{self.url}/api/collections", headers=self.h(),
                          json=schema, timeout=self.TIMEOUT)
        r.raise_for_status()
        print(f" ✓ '{name}' erstellt")

    def find_record(self, collection, filter_str):
        """Return the first record matching ``filter_str``, or None.

        None is also returned when the request fails; the failure is
        printed so that a broken filter is not mistaken for "no match".
        """
        r = requests.get(f"{self.url}/api/collections/{collection}/records",
                         headers=self.h(),
                         params={"filter": filter_str, "perPage": 1},
                         timeout=self.TIMEOUT)
        if r.status_code != 200:
            print(f" Fehler bei Suche in '{collection}': {r.text[:100]}")
            return None
        items = r.json().get("items", [])
        return items[0] if items else None

    def create(self, collection, data):
        """POST a new record and return the response; failures are printed."""
        r = requests.post(f"{self.url}/api/collections/{collection}/records",
                          headers=self.h(), json=data, timeout=self.TIMEOUT)
        if r.status_code not in (200, 204):
            print(f" Fehler: {r.text[:100]}")
        return r

    def upsert_kz(self, data):
        """Create or update the ``kennzeichen`` record for (code, country)."""
        records_url = f"{self.url}/api/collections/kennzeichen/records"
        filter_str = (f'code={self._quote(data["code"])}'
                      f' && country={self._quote(data["country"])}')
        existing = self.find_record("kennzeichen", filter_str)
        if existing:
            r = requests.patch(f"{records_url}/{existing['id']}",
                               headers=self.h(), json=data,
                               timeout=self.TIMEOUT)
        else:
            r = requests.post(records_url, headers=self.h(), json=data,
                              timeout=self.TIMEOUT)
        if r.status_code not in (200, 204):
            print(f" Fehler bei {data.get('code')}: {r.text[:80]}")
|
|
|
|
|
|
# Collection definitions that main() passes to PB.create_collection.
SCHEMAS = [
    {
        "name": "kennzeichen", "type": "base",
        "schema": [
            {"name": "code", "type": "text", "required": True},
            {"name": "name", "type": "text", "required": True},
            {"name": "derivation", "type": "text", "required": False},
            {"name": "region", "type": "text", "required": False},
            {"name": "note", "type": "text", "required": False},
            # Not required: PocketBase treats "required" on a bool field as
            # "must be true", which would reject every inactive plate.
            {"name": "active", "type": "bool", "required": False},
            {"name": "country", "type": "text", "required": True},
            {"name": "app_id", "type": "number", "required": False},
            {"name": "lat", "type": "number", "required": False},
            {"name": "lon", "type": "number", "required": False},
            {"name": "population", "type": "number", "required": False},
            {"name": "points", "type": "number", "required": False},
            {"name": "alt_code", "type": "text", "required": False},
        ],
    },
    {
        "name": "diplomatenkennzeichen", "type": "base",
        "schema": [
            {"name": "code", "type": "text", "required": True},
            {"name": "country", "type": "text", "required": True},
            {"name": "organisation", "type": "text", "required": False},
            {"name": "type", "type": "text", "required": True},
            {"name": "city", "type": "text", "required": False},
            {"name": "note", "type": "text", "required": False},
            {"name": "base_country", "type": "text", "required": False},
        ],
    },
    {
        "name": "gesehen", "type": "base",
        "schema": [
            {"name": "kennzeichen_code", "type": "text", "required": True},
            {"name": "kennzeichen_name", "type": "text", "required": False},
            {"name": "land", "type": "text", "required": True},
            {"name": "datum", "type": "text", "required": False},
        ],
    },
    {
        "name": "blog_posts", "type": "base",
        "schema": [
            {"name": "titel", "type": "text", "required": True},
            {"name": "slug", "type": "text", "required": True},
            {"name": "inhalt", "type": "text", "required": False},
            {"name": "datum", "type": "date", "required": False},
            {"name": "tags", "type": "json", "required": False},
        ],
    },
]
|
|
|
|
|
|
def load_geodata(db_path):
    """Load the geo/statistics columns of every plate table in data.db.

    Every table listed in ``sqlite_master`` except ``laender`` is queried
    for ``id, kennzeichen, map1, map2, anzahl, punkte, alt``. Tables that
    cannot be queried that way (for example because a column is missing)
    are skipped.

    Args:
        db_path: Path to the SQLite reference database.

    Returns:
        Dict keyed by ``(table_name, str(id))`` with the fields ``app_id``,
        ``lat``, ``lon``, ``population``, ``points`` and ``alt_code`` for
        every row that has a non-empty ``kennzeichen``.
    """
    geo = {}
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()
        cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = [name for (name,) in cur.fetchall() if name != "laender"]
        for tbl in tables:
            # Quote the identifier so table names containing spaces or
            # quotes are read correctly instead of breaking the statement.
            quoted = '"' + tbl.replace('"', '""') + '"'
            try:
                cur.execute(
                    "SELECT id, kennzeichen, map1, map2, anzahl, punkte, alt "
                    f"FROM {quoted}"
                )
                rows = cur.fetchall()
            except sqlite3.Error:
                # Not a plate table; best-effort skip.
                continue
            for app_id, kz, lat, lon, pop, pts, alt in rows:
                if kz:
                    geo[(tbl, str(app_id))] = {
                        "app_id": app_id, "lat": lat, "lon": lon,
                        "population": pop, "points": pts,
                        "alt_code": alt or "",
                    }
    finally:
        # Close the connection even if reading the table list fails.
        conn.close()
    print(f"✓ {len(geo)} Geo-Einträge aus data.db geladen")
    return geo
|
|
|
|
|
|
def import_kennzeichen(pb, repo_path, geo):
    """Upsert the plate codes from every ``<folder>/kennzeichen.csv``.

    Args:
        pb: Client whose ``upsert_kz`` creates or updates one record.
        repo_path: Directory containing one sub-folder per COUNTRY_MAP key.
        geo: Unused. Kept so existing callers keep working; this function
            never wrote geo data into the records (see
            ``import_geo_enrichment`` for that step).

    Folders without a ``kennzeichen.csv`` and rows without a code are
    skipped. Per-folder and overall counts are printed.
    """
    total = 0
    for folder in COUNTRY_MAP:
        csv_path = os.path.join(repo_path, folder, "kennzeichen.csv")
        if not os.path.exists(csv_path):
            continue

        count = 0
        # utf-8-sig also reads plain UTF-8 but drops a leading BOM, which
        # would otherwise become part of the first header name and make
        # every row look as if it had no code column.
        with open(csv_path, newline="", encoding="utf-8-sig") as f:
            for row in csv.DictReader(f):
                mapped = map_row(row)
                code = mapped.get("code", "").strip()
                if not code:
                    continue
                pb.upsert_kz({
                    "code": code,
                    "name": mapped["name"],
                    "derivation": mapped["derivation"],
                    "region": mapped["region"],
                    "note": mapped["note"],
                    "active": mapped["active"],
                    "country": folder,
                })
                count += 1

        print(f" ✓ {folder}: {count} Kennzeichen")
        total += count
    print(f"✓ Gesamt: {total} Kennzeichen")
|
|
|
|
|
|
def import_geo_enrichment(pb, db_path):
    """Enrich already imported plate records with geo data from data.db.

    For every COUNTRY_MAP entry the matching data.db table is read; each
    row with a plate code is looked up in the ``kennzeichen`` collection
    (by code and country folder) and, if found, patched with ``app_id``,
    ``lat``, ``lon``, ``population``, ``points`` and ``alt_code``.

    Args:
        pb: Client providing ``find_record``, ``url`` and ``h()``.
        db_path: Path to the SQLite reference database.
    """
    ref = sqlite3.connect(db_path)
    try:
        rcur = ref.cursor()
        for folder, db_table in COUNTRY_MAP.items():
            try:
                # db_table comes from the COUNTRY_MAP constant, not from input.
                rcur.execute(
                    "SELECT id, kennzeichen, map1, map2, anzahl, punkte, alt "
                    f"FROM {db_table}"
                )
                rows = rcur.fetchall()
            except sqlite3.Error:
                # Table or columns missing for this country: nothing to add.
                continue

            for app_id, kz, lat, lon, pop, pts, alt in rows:
                if not kz:
                    continue
                # Escape double quotes so the code cannot break the filter.
                safe_code = str(kz).replace('"', '\\"')
                existing = pb.find_record(
                    "kennzeichen",
                    f'code="{safe_code}" && country="{folder}"')
                if not existing:
                    continue
                r = requests.patch(
                    f"{pb.url}/api/collections/kennzeichen/records/{existing['id']}",
                    headers=pb.h(),
                    json={"app_id": app_id, "lat": lat, "lon": lon,
                          "population": pop, "points": pts,
                          "alt_code": alt or ""},
                    timeout=30,
                )
                # Report rejected updates instead of dropping them silently.
                if r.status_code not in (200, 204):
                    print(f" Fehler bei {kz}: {r.text[:80]}")
    finally:
        ref.close()
    print("✓ Geo-Daten angereichert")
|
|
|
|
|
|
def normalize_diplo_code(code: str) -> str:
    """Return the diplomatic plate code in its hyphenated spelling.

    Surrounding whitespace is removed. A code made of a leading ``0``
    followed only by digits gets a hyphen after the zero (``0110`` becomes
    ``0-110``); every other code, including one that already contains the
    hyphen, is returned as is.
    """
    stripped = code.strip()
    unhyphenated = re.match(r'^0(\d+)$', stripped)
    if unhyphenated is None:
        return stripped
    number = unhyphenated.group(1)
    return f"0-{number}"
|
|
|
|
|
|
def _first_cell(row, *headers):
    """Return the first non-empty cell among ``headers``, stripped, or ""."""
    for header in headers:
        value = row.get(header)
        if value:
            return value.strip()
    return ""


def import_diplomatenkennzeichen(pb, repo_path):
    """Upsert the rows of every ``<folder>/diplomatenkennzeichen.csv``.

    Each entry of ``repo_path`` is tried as a country folder; entries
    without the CSV file are skipped. Several header spellings are accepted
    per column, codes are normalised with ``normalize_diplo_code``, and an
    unknown type falls back to ``"embassy"``. A record is identified by
    code plus ``base_country`` (the folder name).

    Args:
        pb: Client providing ``find_record``, ``create``, ``url``, ``h()``.
        repo_path: Directory containing the country folders.
    """
    total = 0
    # Sorted so repeated runs process the folders in the same order.
    for folder in sorted(os.listdir(repo_path)):
        csv_path = os.path.join(repo_path, folder, "diplomatenkennzeichen.csv")
        if not os.path.exists(csv_path):
            continue

        count = 0
        # utf-8-sig drops a leading BOM that would otherwise corrupt the
        # first header name; plain UTF-8 files are read unchanged.
        with open(csv_path, newline="", encoding="utf-8-sig") as f:
            for row in csv.DictReader(f):
                code = _first_cell(row, "code", "Nummernkreis", "Kennzeichen")
                if not code:
                    continue
                code = normalize_diplo_code(code)

                typ = (_first_cell(row, "type", "Typ") or "embassy").lower()
                if typ not in ("embassy", "consulate", "io"):
                    typ = "embassy"

                record = {
                    "code": code,
                    "country": _first_cell(row, "country", "Land", "Entsendestaat"),
                    "organisation": _first_cell(row, "organisation", "Organisation"),
                    "type": typ,
                    "city": _first_cell(row, "city", "Stadt"),
                    "note": _first_cell(row, "note", "Bemerkung"),
                    "base_country": folder,
                }

                # Upsert; quotes are escaped so they cannot break the filter.
                safe_code = code.replace('"', '\\"')
                safe_folder = folder.replace('"', '\\"')
                existing = pb.find_record(
                    "diplomatenkennzeichen",
                    f'code="{safe_code}" && base_country="{safe_folder}"')
                if existing:
                    r = requests.patch(
                        f"{pb.url}/api/collections/diplomatenkennzeichen/records/{existing['id']}",
                        headers=pb.h(), json=record, timeout=30)
                    # Report rejected updates instead of ignoring them.
                    if r.status_code not in (200, 204):
                        print(f" Fehler bei {code}: {r.text[:80]}")
                else:
                    pb.create("diplomatenkennzeichen", record)
                count += 1

        print(f" ✓ {folder}: {count} Diplomatenkennzeichen")
        total += count
    print(f"✓ Gesamt: {total} Diplomatenkennzeichen")
|
|
|
|
|
|
def _backup_date_to_iso(datum):
    """Convert ``DD.MM.YYYY`` to ``YYYY-MM-DD``; return other values as is."""
    if not isinstance(datum, str):
        return datum
    parts = datum.split(".")
    if len(parts) < 3:
        return datum
    day, month, year = parts[:3]
    return f"{year}-{month.zfill(2)}-{day.zfill(2)}"


def import_backup(pb, backup_path, db_path):
    """Read the app backup and import the seen plates into ``gesehen``.

    The backup file is zlib-decompressed, its first three bytes are dropped
    and the remainder is opened as an SQLite database. Each row of its
    ``gesehen`` table is resolved to a plate code through the table of the
    same name in data.db, then created in PocketBase unless a record with
    the same code and country already exists.

    Args:
        pb: Client providing ``find_record`` and ``create``.
        backup_path: Path to the compressed backup file.
        db_path: Path to the SQLite reference database (data.db).

    Raises:
        zlib.error: if the backup cannot be decompressed.
        sqlite3.Error: if the backup has no readable ``gesehen`` table.
    """
    # Function-scope import so this block does not depend on the file header.
    import tempfile

    with open(backup_path, "rb") as f:
        raw = f.read()
    # Drop the 3-byte prefix ('KX1' according to the original comment).
    db_bytes = zlib.decompress(raw)[3:]

    # A private temp file replaces the fixed, predictable /tmp path and is
    # removed again once the import has finished.
    fd, tmp = tempfile.mkstemp(prefix="backup_import_", suffix=".db")
    backup_conn = None
    ref_conn = None
    imported = skipped = failed = 0
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(db_bytes)

        backup_conn = sqlite3.connect(tmp)
        ref_conn = sqlite3.connect(db_path)
        rcur = ref_conn.cursor()

        rows = backup_conn.execute(
            "SELECT kennid, datum, land FROM gesehen ORDER BY datum").fetchall()
        print(f" Backup: {len(rows)} Einträge gefunden")

        for kennid, datum, land in rows:
            # `land` is the app's table name and comes from the backup file,
            # so it is quoted as an identifier before entering the SQL text.
            table = '"' + str(land).replace('"', '""') + '"'
            try:
                rcur.execute(f"SELECT kennzeichen FROM {table} WHERE id=?", (kennid,))
                result = rcur.fetchone()
            except sqlite3.Error:
                skipped += 1
                continue
            # No reference row, or a row without a code: nothing to import.
            if not result or not result[0]:
                skipped += 1
                continue

            kz_code = str(result[0])
            folder = APP_TO_FOLDER.get(land, land)
            datum_iso = _backup_date_to_iso(datum)

            # Diplomatic plates live in these app tables.
            if land in ("global", "global2", "laender"):
                kz_code = normalize_diplo_code(kz_code)

            # Duplicate check; quotes are escaped so they cannot break the filter.
            safe_code = kz_code.replace('"', '\\"')
            safe_folder = str(folder).replace('"', '\\"')
            if pb.find_record(
                    "gesehen",
                    f'kennzeichen_code="{safe_code}" && land="{safe_folder}"'):
                skipped += 1
                continue

            # Place name from the reference table; optional.
            try:
                rcur.execute(f"SELECT ort FROM {table} WHERE id=?", (kennid,))
                name_result = rcur.fetchone()
                kz_name = (name_result[0] if name_result else "") or ""
            except sqlite3.Error:
                kz_name = ""

            r = pb.create("gesehen", {
                "kennzeichen_code": kz_code,
                "kennzeichen_name": kz_name,
                "land": folder,
                "datum": datum_iso,
            })
            # Only count records the server actually accepted.
            if r is not None and r.status_code in (200, 204):
                imported += 1
            else:
                failed += 1
    finally:
        for conn in (backup_conn, ref_conn):
            if conn is not None:
                conn.close()
        try:
            os.remove(tmp)
        except OSError:
            pass  # best effort: a leftover temp file must not fail the import

    print(f" ✓ {imported} Einträge importiert, {skipped} übersprungen, "
          f"{failed} fehlgeschlagen")
|
|
|
|
|
|
def main():
    """Parse the command line, log in and run the selected import steps.

    Email and password default to the PB_EMAIL / PB_PASSWORD environment
    variables and are only required as options when those are unset.
    ``--skip-schema`` skips collection creation; ``--skip-kz`` skips the
    plate, geo and diplomatic-plate imports; ``--only-backup`` skips
    everything except the backup import, which runs whenever ``--backup``
    is given.
    """
    env_email = os.environ.get("PB_EMAIL")
    env_password = os.environ.get("PB_PASSWORD")

    parser = argparse.ArgumentParser()
    parser.add_argument("--pb-url", default="http://localhost:4444")
    parser.add_argument("--pb-email", default=env_email, required=not env_email)
    parser.add_argument("--pb-password", default=env_password, required=not env_password)
    parser.add_argument("--repo-path", required=True)
    parser.add_argument("--db-path", required=True)
    parser.add_argument("--backup", help="Pfad zur Backup-Datei")
    parser.add_argument("--skip-schema", action="store_true")
    parser.add_argument("--skip-kz", action="store_true")
    parser.add_argument("--only-backup", action="store_true")
    args = parser.parse_args()

    # --only-backup without a backup file would silently do nothing.
    if args.only_backup and not args.backup:
        parser.error("--only-backup erfordert --backup")

    # Expand "~" ourselves; the shell leaves it alone in forms such as
    # --repo-path=~/plates or when the argument is quoted.
    repo_path = os.path.expanduser(args.repo_path)
    db_path = os.path.expanduser(args.db_path)
    backup_path = os.path.expanduser(args.backup) if args.backup else None

    pb = PB(args.pb_url)
    pb.login(args.pb_email, args.pb_password)

    if not args.skip_schema and not args.only_backup:
        print("\n→ Collections anlegen...")
        for schema in SCHEMAS:
            pb.create_collection(schema)

    if not args.skip_kz and not args.only_backup:
        print("\n→ Kennzeichen importieren...")
        import_kennzeichen(pb, repo_path, {})

        print("\n→ Geo-Daten anreichern...")
        import_geo_enrichment(pb, db_path)

        print("\n→ Diplomatenkennzeichen importieren...")
        import_diplomatenkennzeichen(pb, repo_path)

    if backup_path:
        print("\n→ Backup importieren...")
        import_backup(pb, backup_path, db_path)

    print("\n✓ Fertig!")


if __name__ == "__main__":
    main()
|