#!/usr/bin/env python3
r"""
fix_schema.py — fix schema and data problems in PocketBase.

Performs:
  1. Blog ``blog_posts``: migrate the field 'date' (text) to 'datum' (date).
  2. Diplomatic plates: add the 'base_country' field if it is missing, then
     upsert (create or update) records from every
     ``<repo-path>/<folder>/diplomatenkennzeichen.csv``.

Usage:
  python3 fix_schema.py \
    --pb-url http://localhost:4444 \
    --pb-email admin@denode.eu \
    --pb-password geheim \
    --repo-path ~/plates
"""

import argparse
import csv
import json
import os
import re

import requests

# Values accepted for a diplomatic record's "type"; anything else falls back
# to "embassy".
_DIPLO_TYPES = ("embassy", "consulate", "io")


class PB:
    """Small client for the PocketBase admin REST endpoints used by this script."""

    def __init__(self, url, timeout=60.0):
        """Remember the base URL; no request is made here.

        Args:
            url: Base URL of the PocketBase instance (trailing slashes are
                stripped).
            timeout: Seconds handed to every ``requests`` call so that an
                unresponsive server cannot hang the script forever.
        """
        self.url = url.rstrip("/")
        self.token = None
        self.timeout = timeout

    def login(self, email, password):
        """Authenticate as admin and store the returned token.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        r = requests.post(
            f"{self.url}/api/admins/auth-with-password",
            json={"identity": email, "password": password},
            timeout=self.timeout,
        )
        r.raise_for_status()
        self.token = r.json()["token"]
        print(f"✓ Eingeloggt als {email}")

    def h(self):
        """Return the request headers carrying the stored auth token."""
        return {"Authorization": self.token, "Content-Type": "application/json"}

    def collection(self, name):
        """Fetch and return the collection definition *name* as a dict.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        r = requests.get(
            f"{self.url}/api/collections/{name}",
            headers=self.h(),
            timeout=self.timeout,
        )
        r.raise_for_status()
        return r.json()

    def update_collection(self, coll_id, data):
        """PATCH the collection *coll_id* with *data* and return the reply.

        Returns:
            The decoded JSON body, or ``{}`` when the server sent no body
            (a 204 reply has nothing to decode).

        Raises:
            RuntimeError: if the status code is neither 200 nor 204.
        """
        r = requests.patch(
            f"{self.url}/api/collections/{coll_id}",
            headers=self.h(),
            json=data,
            timeout=self.timeout,
        )
        if r.status_code not in (200, 204):
            raise RuntimeError(f"Collection update failed: {r.text[:300]}")
        return r.json() if r.content else {}

    def get_all(self, collection, fields=None):
        """Return every record of *collection*, following pagination.

        Args:
            collection: Collection name or id.
            fields: Optional comma-separated field list sent as the
                ``fields`` query parameter.

        Raises:
            requests.HTTPError: if any page request fails.
        """
        params = {"perPage": 500, "page": 1}
        if fields:
            params["fields"] = fields
        results = []
        while True:
            r = requests.get(
                f"{self.url}/api/collections/{collection}/records",
                headers=self.h(),
                params=params,
                timeout=self.timeout,
            )
            r.raise_for_status()
            data = r.json()
            results.extend(data["items"])
            if params["page"] >= data["totalPages"]:
                break
            params["page"] += 1
        return results

    def delete(self, collection, record_id):
        """Delete one record; print the error and return False on failure."""
        r = requests.delete(
            f"{self.url}/api/collections/{collection}/records/{record_id}",
            headers=self.h(),
            timeout=self.timeout,
        )
        if r.status_code not in (200, 204):
            print(f" DELETE-Fehler {record_id}: {r.text[:80]}")
            return False
        return True

    def create(self, collection, data):
        """Create a record and return the raw ``requests`` response.

        Errors are printed, not raised; callers inspect ``status_code``.
        """
        r = requests.post(
            f"{self.url}/api/collections/{collection}/records",
            headers=self.h(),
            json=data,
            timeout=self.timeout,
        )
        if r.status_code not in (200, 204):
            print(f" CREATE-Fehler: {r.text[:100]}")
        return r

    def find_one(self, collection, filter_str):
        """Return the first record matching *filter_str*, or None.

        A non-200 reply also yields None (as before), but it is now reported
        so that a failed lookup is distinguishable from "no match" in the log.
        """
        r = requests.get(
            f"{self.url}/api/collections/{collection}/records",
            headers=self.h(),
            params={"filter": filter_str, "perPage": 1},
            timeout=self.timeout,
        )
        if r.status_code != 200:
            print(f" FIND-Fehler: {r.text[:100]}")
            return None
        items = r.json().get("items", [])
        return items[0] if items else None

    def patch(self, collection, record_id, data):
        """Update one record; print the error and return False on failure."""
        r = requests.patch(
            f"{self.url}/api/collections/{collection}/records/{record_id}",
            headers=self.h(),
            json=data,
            timeout=self.timeout,
        )
        if r.status_code not in (200, 204):
            print(f" PATCH-Fehler {record_id}: {r.text[:100]}")
            return False
        return True


def fix_blog_schema(pb: PB):
    """Migrate the blog field 'date' (text) to 'datum' (date type).

    Steps: add 'datum', copy every non-empty 'date' value into it, then remove
    'date'. The old field is only removed when every copy succeeded, so a
    value the server rejects is never lost.

    Raises:
        RuntimeError: if a 'datum' field of another type already exists, or
            if a schema update is rejected by the server.
        requests.HTTPError: if reading the collection or its records fails.
    """
    print("\n→ Blog-Schema korrigieren (date text → datum date)...")

    coll = pb.collection("blog_posts")
    schema = coll.get("schema", [])

    date_field = next((f for f in schema if f["name"] == "date"), None)
    datum_field = next((f for f in schema if f["name"] == "datum"), None)

    if datum_field and datum_field["type"] == "date":
        print(" ✓ Kein Fix nötig — 'datum' (date) existiert bereits")
        return

    if datum_field:
        # Appending another field named 'datum' would submit a schema with a
        # duplicate name; stop with a clear message instead.
        raise RuntimeError(
            f"Field 'datum' already exists with type {datum_field['type']!r}; "
            "refusing to add a second 'datum' field"
        )

    datum_def = {"name": "datum", "type": "date", "required": False}

    if not date_field:
        print(" Weder 'date' noch 'datum' gefunden — füge 'datum' (date) hinzu")
        pb.update_collection(coll["id"], {"schema": schema + [datum_def]})
        print(" ✓ 'datum' hinzugefügt")
        return

    print(f" Feld 'date' ({date_field['type']}) gefunden — migriere zu 'datum' (date)")

    # 1. Load all posts so the current 'date' values are at hand.
    posts = pb.get_all("blog_posts", fields="id,date")
    print(f" {len(posts)} Blog-Posts geladen")

    # 2. Add the new 'datum' (date) field while keeping 'date'.
    pb.update_collection(coll["id"], {"schema": list(schema) + [datum_def]})
    print(" ✓ 'datum' (date) hinzugefügt")

    # 3. Copy the values from 'date' to 'datum', tracking failures.
    migrated = failed = 0
    for post in posts:
        date_val = post.get("date", "")
        if not date_val:
            continue
        if pb.patch("blog_posts", post["id"], {"datum": date_val}):
            migrated += 1
        else:
            failed += 1
    print(f" ✓ {migrated} Datumsfelder migriert")

    if failed:
        # Keep the source field: removing it now would discard the values
        # that could not be copied.
        print(f" ✗ {failed} Datumsfelder nicht migriert — 'date' Feld bleibt erhalten")
        return

    # 4. Remove the old 'date' field, based on the freshly fetched schema.
    coll2 = pb.collection("blog_posts")
    new_schema2 = [f for f in coll2.get("schema", []) if f["name"] != "date"]
    pb.update_collection(coll["id"], {"schema": new_schema2})
    print(" ✓ altes 'date' Feld entfernt")


def normalize_diplo_code(code: str) -> str:
    """Normalize an all-digit code with a leading zero: ``0120`` → ``0-120``.

    Any other code is returned unchanged apart from surrounding whitespace.
    """
    stripped = code.strip()
    m = re.match(r"^0(\d+)$", stripped)
    return f"0-{m.group(1)}" if m else stripped


def _first_value(row: dict, *keys: str) -> str:
    """Return the first non-empty value of *row* among *keys*, stripped."""
    for key in keys:
        value = row.get(key)
        if value:
            return value.strip()
    return ""


def _filter_literal(value: str) -> str:
    """Return *value* as a double-quoted literal for a PocketBase filter.

    Embedded double quotes are backslash-escaped so that a value read from a
    CSV file cannot terminate the literal early.
    """
    return '"' + value.replace('"', '\\"') + '"'


def _parse_diplo_row(row: dict, folder: str) -> dict | None:
    """Convert one CSV row into a record dict for 'diplomatenkennzeichen'.

    Several alternative column names are accepted per field. Returns None
    when the row has no code.

    Args:
        row: Mapping of column name to cell value, as produced by
            ``csv.DictReader``.
        folder: Name of the folder the CSV lives in; stored as
            ``base_country``.
    """
    code = _first_value(
        row, "code", "Nummer", "Nummernkreis", "Kennzeichen", "Unterscheidungszeichen"
    )
    if not code:
        return None

    typ = (_first_value(row, "type", "Typ", "typ") or "embassy").lower()
    if typ not in _DIPLO_TYPES:
        typ = "embassy"

    return {
        "code": normalize_diplo_code(code),
        "country": _first_value(
            row, "country", "Land/Organisation", "Land", "Entsendestaat"
        ),
        "organisation": _first_value(row, "organisation", "Organisation"),
        "type": typ,
        "city": _first_value(row, "city", "Stadt", "Sitz"),
        "note": _first_value(row, "note", "Bemerkung", "Beschreibung", "Bedeutung"),
        "base_country": folder,
    }


def sync_diplo(pb: PB, repo_path: str):
    """Upsert diplomatic-plate records from the CSV files into PocketBase.

    For every ``<repo_path>/<folder>/diplomatenkennzeichen.csv`` each row is
    looked up by (code, base_country) and then patched or created. Failed
    creates and failed patches are both reported in the error count.

    Raises:
        RuntimeError: if adding 'base_country' to the schema is rejected.
        requests.HTTPError: if the collection definition cannot be read.
    """
    print("\n→ Diplomatenkennzeichen synchronisieren...")

    # Schema check: make sure 'base_country' exists before filtering on it.
    coll = pb.collection("diplomatenkennzeichen")
    schema = coll.get("schema", [])
    if all(f["name"] != "base_country" for f in schema):
        new_schema = schema + [
            {"name": "base_country", "type": "text", "required": False}
        ]
        pb.update_collection(coll["id"], {"schema": new_schema})
        print(" ✓ 'base_country' zum Schema hinzugefügt")

    created = updated = skipped = 0
    for folder in sorted(os.listdir(repo_path)):
        csv_path = os.path.join(repo_path, folder, "diplomatenkennzeichen.csv")
        if not os.path.isfile(csv_path):
            continue

        # utf-8-sig reads plain UTF-8 too, but drops a leading BOM that would
        # otherwise become part of the first column name.
        with open(csv_path, newline="", encoding="utf-8-sig") as f:
            for row in csv.DictReader(f):
                record = _parse_diplo_row(row, folder)
                if not record:
                    continue

                existing = pb.find_one(
                    "diplomatenkennzeichen",
                    f"code={_filter_literal(record['code'])}"
                    f" && base_country={_filter_literal(folder)}",
                )
                if existing:
                    if pb.patch("diplomatenkennzeichen", existing["id"], record):
                        updated += 1
                    else:
                        skipped += 1
                else:
                    r = pb.create("diplomatenkennzeichen", record)
                    if r.status_code in (200, 204):
                        created += 1
                    else:
                        skipped += 1
        print(f" {folder}: fertig")

    print(f" ✓ {created} neu, {updated} aktualisiert, {skipped} Fehler")


def main():
    """Parse the command line, log in, and run the selected fix(es).

    Connection options fall back to the PB_URL, PB_EMAIL and PB_PASSWORD
    environment variables. ``--repo-path`` is only needed when the
    diplomatic-plate sync runs.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--pb-url", default=os.environ.get("PB_URL", "http://localhost:4444")
    )
    parser.add_argument(
        "--pb-email",
        default=os.environ.get("PB_EMAIL"),
        required=not os.environ.get("PB_EMAIL"),
    )
    parser.add_argument(
        "--pb-password",
        default=os.environ.get("PB_PASSWORD"),
        required=not os.environ.get("PB_PASSWORD"),
    )
    parser.add_argument("--repo-path")
    parser.add_argument("--only", choices=["blog", "diplo"])
    args = parser.parse_args()

    run_blog = args.only in (None, "blog")
    run_diplo = args.only in (None, "diplo")
    if run_diplo and not args.repo_path:
        # Validate before logging in so a bad invocation fails fast.
        parser.error("--repo-path is required unless --only blog is given")

    pb = PB(args.pb_url)
    pb.login(args.pb_email, args.pb_password)

    if run_blog:
        fix_blog_schema(pb)
    if run_diplo:
        sync_diplo(pb, os.path.expanduser(args.repo_path))

    print("\n✓ Fertig!")


if __name__ == "__main__":
    main()