#!/usr/bin/env python3
"""
cleanup.py — Clean up duplicated data in PocketBase.

1. Diplomatic plates: normalize 0120 → 0-120 and delete duplicates
2. Plates: for each (code, country) group keep the most complete record
   and delete the others
3. Blog: switch the ``datum`` field from text to date (if necessary)

Usage:
    python3 cleanup.py --pb-url http://localhost:4444 \
                       --pb-email admin@example.com \
                       --pb-password geheim
"""

import argparse
import os
import re
import sys
from collections import defaultdict

import requests

# Seconds to wait for any single HTTP request before giving up; without a
# timeout `requests` blocks forever on an unresponsive server.
DEFAULT_TIMEOUT = 30

# A diplomatic code stored without the hyphen: a leading 0 followed by digits.
_UNHYPHENATED_DIPLO = re.compile(r"^0(\d+)$")


def _succeeded(response) -> bool:
    """Return True when a write request answered with 200 or 204."""
    return response.status_code in (200, 204)


class PB:
    """Small PocketBase REST client covering only what this script needs."""

    def __init__(self, url, timeout=DEFAULT_TIMEOUT):
        """Remember the base URL (trailing slash removed) and request timeout."""
        self.url = url.rstrip("/")
        self.token = None
        self.timeout = timeout

    def login(self, email, password):
        """Authenticate via the admin endpoint and store the returned token.

        Raises:
            requests.HTTPError: if the server rejects the credentials.
        """
        r = requests.post(
            f"{self.url}/api/admins/auth-with-password",
            json={"identity": email, "password": password},
            timeout=self.timeout,
        )
        r.raise_for_status()
        self.token = r.json()["token"]
        print(f"✓ Eingeloggt als {email}")

    def h(self):
        """Return the headers sent with every authenticated request."""
        return {"Authorization": self.token, "Content-Type": "application/json"}

    def get_all(self, collection, filter_str=None, fields=None):
        """Fetch every record of *collection*, following pagination.

        Args:
            collection: collection name or id.
            filter_str: optional value for the ``filter`` query parameter.
            fields: optional value for the ``fields`` query parameter.

        Returns:
            A list with the ``items`` of all pages.

        Raises:
            requests.HTTPError: if any page request fails.
        """
        params = {"perPage": 500, "page": 1}
        if filter_str:
            params["filter"] = filter_str
        if fields:
            params["fields"] = fields

        results = []
        while True:
            r = requests.get(
                f"{self.url}/api/collections/{collection}/records",
                headers=self.h(),
                params=params,
                timeout=self.timeout,
            )
            r.raise_for_status()
            data = r.json()
            results.extend(data["items"])
            if params["page"] >= data["totalPages"]:
                break
            params["page"] += 1
        return results

    def patch(self, collection, record_id, data):
        """PATCH one record; log (but do not raise) on failure. Returns the response."""
        r = requests.patch(
            f"{self.url}/api/collections/{collection}/records/{record_id}",
            headers=self.h(),
            json=data,
            timeout=self.timeout,
        )
        if not _succeeded(r):
            print(f" PATCH-Fehler {record_id}: {r.text[:100]}")
        return r

    def delete(self, collection, record_id):
        """DELETE one record; log (but do not raise) on failure. Returns the response."""
        r = requests.delete(
            f"{self.url}/api/collections/{collection}/records/{record_id}",
            headers=self.h(),
            timeout=self.timeout,
        )
        if not _succeeded(r):
            print(f" DELETE-Fehler {record_id}: {r.text[:100]}")
        return r

    def collection_schema(self, name):
        """Return the collection definition as parsed JSON.

        Raises:
            requests.HTTPError: if the request fails.
        """
        r = requests.get(
            f"{self.url}/api/collections/{name}",
            headers=self.h(),
            timeout=self.timeout,
        )
        r.raise_for_status()
        return r.json()

    def update_collection_schema(self, coll_id, schema_update):
        """PATCH a collection definition; log on failure. Returns the response."""
        r = requests.patch(
            f"{self.url}/api/collections/{coll_id}",
            headers=self.h(),
            json=schema_update,
            timeout=self.timeout,
        )
        if not _succeeded(r):
            print(f" Schema-Update-Fehler: {r.text[:200]}")
        return r


def normalize_diplo_code(code: str) -> str:
    """Return *code* stripped, with ``0NNN`` rewritten to ``0-NNN``.

    Codes that do not consist of a leading zero followed only by digits are
    returned stripped but otherwise unchanged. ``None`` is treated as "".
    """
    stripped = (code or "").strip()
    m = _UNHYPHENATED_DIPLO.match(stripped)
    return f"0-{m.group(1)}" if m else stripped


def fix_diplo_duplicates(pb: PB, dry_run: bool):
    """Normalize unhyphenated diplomatic codes and delete resulting duplicates.

    For every (normalized code, base_country) key one canonical record is
    chosen. Unhyphenated records that are not canonical are deleted; an
    unhyphenated canonical record gets its code patched to the normalized form.

    Args:
        pb: logged-in client.
        dry_run: when True only print what would happen.
    """
    print("\n→ Diplomatenkennzeichen bereinigen...")
    all_records = pb.get_all("diplomatenkennzeichen")
    print(f" {len(all_records)} Records geladen")

    # Records stored without the hyphen that must become 0-NNN.
    needs_norm = [
        r for r in all_records if _UNHYPHENATED_DIPLO.match(r.get("code") or "")
    ]
    print(f" {len(needs_norm)} Records ohne Bindestrich gefunden (z.B. 0120)")

    # Index: (normalized_code, base_country) → canonical record.
    # A record whose stored code is already normalized wins over an
    # unhyphenated one regardless of listing order; otherwise the unhyphenated
    # record would be patched into a second copy of the normalized record.
    index: dict[tuple, dict] = {}
    for r in all_records:
        code = r.get("code") or ""
        norm = normalize_diplo_code(code)
        key = (norm, r.get("base_country", ""))
        current = index.get(key)
        if current is None:
            index[key] = r
        elif code == norm and (current.get("code") or "") != norm:
            index[key] = r

    updated = deleted = 0
    for record in needs_norm:
        code = record["code"]
        norm = normalize_diplo_code(code)
        key = (norm, record.get("base_country", ""))

        canonical = index.get(key)
        if canonical is not None and canonical["id"] != record["id"]:
            # Duplicate of the canonical record — delete it.
            print(f" DEL Duplikat: {code} → {norm} (id={record['id']}, behalte {canonical['id']})")
            # Count planned deletions in a dry run, successful ones otherwise.
            if dry_run or _succeeded(pb.delete("diplomatenkennzeichen", record["id"])):
                deleted += 1
        else:
            # This record is the canonical one — only rewrite its code.
            print(f" UPD: {code} → {norm} (id={record['id']})")
            index[key] = record
            if dry_run or _succeeded(
                pb.patch("diplomatenkennzeichen", record["id"], {"code": norm})
            ):
                updated += 1

    print(f" ✓ {updated} normalisiert, {deleted} Duplikate gelöscht"
          + (" (DRY RUN)" if dry_run else ""))


def _completeness_score(record: dict) -> int:
    """Rank a plate record by which optional fields are filled (truthy)."""
    weights = (("lat", 10), ("lon", 10), ("app_id", 5), ("population", 3), ("points", 2))
    return sum(weight for field, weight in weights if record.get(field))


def fix_kennzeichen_duplicates(pb: PB, dry_run: bool):
    """Delete duplicate plate records, keeping the most complete one per group.

    Records are grouped by (upper-cased stripped code, stripped country). In
    each group with more than one record the highest-scoring record is kept
    (coordinates weigh most) and the others are deleted.

    Args:
        pb: logged-in client.
        dry_run: when True only print what would happen.
    """
    print("\n→ Kennzeichen-Duplikate bereinigen...")
    all_records = pb.get_all(
        "kennzeichen", fields="id,code,country,lat,lon,app_id,population,points"
    )
    print(f" {len(all_records)} Records geladen")

    # Group by (code, country); `or ""` guards against null field values.
    groups: dict[tuple, list] = defaultdict(list)
    for r in all_records:
        key = ((r.get("code") or "").strip().upper(), (r.get("country") or "").strip())
        groups[key].append(r)

    dupes = {k: v for k, v in groups.items() if len(v) > 1}
    print(f" {len(dupes)} Gruppen mit Duplikaten")

    total_deleted = 0
    for (code, country), records in dupes.items():
        # sorted() is stable, so equally scored records keep their fetch order.
        keep, *to_delete = sorted(records, key=_completeness_score, reverse=True)
        print(f" DUPL [{country}] {code}: behalte {keep['id']}, lösche {[r['id'] for r in to_delete]}")
        for r in to_delete:
            # Count planned deletions in a dry run, successful ones otherwise.
            if dry_run or _succeeded(pb.delete("kennzeichen", r["id"])):
                total_deleted += 1

    print(f" ✓ {total_deleted} Duplikate gelöscht" + (" (DRY RUN)" if dry_run else ""))


def fix_blog_datum(pb: PB, dry_run: bool):
    """Change the type of ``blog_posts.datum`` to ``date`` unless it already is.

    Args:
        pb: logged-in client.
        dry_run: when True only report the current type and post count.
    """
    print("\n→ Blog datum-Feld prüfen...")
    try:
        coll = pb.collection_schema("blog_posts")
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on older `requests` versions.
        print(f" Fehler beim Laden des Schemas: {e}")
        return

    schema = coll.get("schema", [])
    datum_field = next((f for f in schema if f.get("name") == "datum"), None)
    if not datum_field:
        print(" datum-Feld nicht gefunden")
        return

    print(f" datum-Feld Typ: {datum_field['type']}")
    if datum_field["type"] == "date":
        print(" ✓ Kein Fix nötig — datum ist bereits vom Typ date")
        return

    print(" datum ist Text — muss auf date geändert werden")

    # Load all posts; only their number is reported.
    posts = pb.get_all("blog_posts", fields="id,datum")
    print(f" {len(posts)} Blog-Posts gefunden")

    if dry_run:
        print(" (DRY RUN) — würde Schema auf date ändern")
        return

    # Send the full schema back with only the datum field's type replaced.
    # NOTE(review): confirm how PocketBase treats existing text values when a
    # field's type changes — the stored values are not converted here.
    new_schema = [
        {**f, "type": "date"} if f.get("name") == "datum" else f for f in schema
    ]
    r = pb.update_collection_schema(coll["id"], {"schema": new_schema})
    if _succeeded(r):
        print(" ✓ datum auf date geändert")
    else:
        print(f" Fehler: {r.text[:200]}")


def main():
    """Parse command-line arguments, log in and run the selected fixes."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--pb-url", default="http://localhost:4444")
    # Credentials may come from the environment; the flag is only mandatory
    # when the matching variable is unset or empty.
    parser.add_argument("--pb-email", default=os.environ.get("PB_EMAIL"),
                        required=not os.environ.get("PB_EMAIL"))
    parser.add_argument("--pb-password", default=os.environ.get("PB_PASSWORD"),
                        required=not os.environ.get("PB_PASSWORD"))
    parser.add_argument("--dry-run", action="store_true",
                        help="Nur anzeigen, nicht ändern")
    parser.add_argument("--only", choices=["diplo", "kennzeichen", "blog"],
                        help="Nur einen bestimmten Fix ausführen")
    args = parser.parse_args()

    if args.dry_run:
        print("*** DRY RUN — keine Änderungen werden geschrieben ***")

    pb_client = PB(args.pb_url)
    pb_client.login(args.pb_email, args.pb_password)

    # Without --only every fix runs, in this order.
    if args.only in (None, "diplo"):
        fix_diplo_duplicates(pb_client, args.dry_run)
    if args.only in (None, "kennzeichen"):
        fix_kennzeichen_duplicates(pb_client, args.dry_run)
    if args.only in (None, "blog"):
        fix_blog_datum(pb_client, args.dry_run)

    print("\n✓ Fertig!")


if __name__ == "__main__":
    main()