kennzeichen-seite/scripts/cleanup.py
2026-05-20 20:47:07 +02:00

246 lines
8.6 KiB
Python

#!/usr/bin/env python3
"""
cleanup.py — Bereinigt Datenduplikate in PocketBase
1. Diplomatenkennzeichen: 0120 → 0-120 normalisieren, Duplikate löschen
2. Kennzeichen: Duplikate (gleicher code + country) zusammenführen und löschen
3. Blog: datum-Feld von Text auf date migrieren (falls nötig)
Verwendung:
python3 cleanup.py --pb-url http://localhost:4444 \
--pb-email admin@example.com \
--pb-password geheim
"""
import argparse, re, requests, sys
from collections import defaultdict
class PB:
def __init__(self, url):
self.url = url.rstrip("/")
self.token = None
def login(self, email, password):
r = requests.post(f"{self.url}/api/admins/auth-with-password",
json={"identity": email, "password": password})
r.raise_for_status()
self.token = r.json()["token"]
print(f"✓ Eingeloggt als {email}")
def h(self):
return {"Authorization": self.token, "Content-Type": "application/json"}
def get_all(self, collection, filter_str=None, fields=None):
params = {"perPage": 500, "page": 1}
if filter_str:
params["filter"] = filter_str
if fields:
params["fields"] = fields
results = []
while True:
r = requests.get(f"{self.url}/api/collections/{collection}/records",
headers=self.h(), params=params)
r.raise_for_status()
data = r.json()
results.extend(data["items"])
if params["page"] >= data["totalPages"]:
break
params["page"] += 1
return results
def patch(self, collection, record_id, data):
r = requests.patch(
f"{self.url}/api/collections/{collection}/records/{record_id}",
headers=self.h(), json=data)
if r.status_code not in (200, 204):
print(f" PATCH-Fehler {record_id}: {r.text[:100]}")
return r
def delete(self, collection, record_id):
r = requests.delete(
f"{self.url}/api/collections/{collection}/records/{record_id}",
headers=self.h())
if r.status_code not in (200, 204):
print(f" DELETE-Fehler {record_id}: {r.text[:100]}")
return r
def collection_schema(self, name):
r = requests.get(f"{self.url}/api/collections/{name}", headers=self.h())
r.raise_for_status()
return r.json()
def update_collection_schema(self, coll_id, schema_update):
r = requests.patch(
f"{self.url}/api/collections/{coll_id}",
headers=self.h(), json=schema_update)
if r.status_code not in (200, 204):
print(f" Schema-Update-Fehler: {r.text[:200]}")
return r
def normalize_diplo_code(code: str) -> str:
m = re.match(r'^0(\d+)$', code.strip())
return f"0-{m.group(1)}" if m else code.strip()
def fix_diplo_duplicates(pb: PB, dry_run: bool):
print("\n→ Diplomatenkennzeichen bereinigen...")
all_records = pb.get_all("diplomatenkennzeichen")
print(f" {len(all_records)} Records geladen")
# Records ohne Bindestrich die nach 0-NNN normalisiert werden müssen
needs_norm = [r for r in all_records if re.match(r'^0\d+$', r.get("code", ""))]
print(f" {len(needs_norm)} Records ohne Bindestrich gefunden (z.B. 0120)")
# Index: (normalisierter_code, base_country) → record
index: dict[tuple, dict] = {}
for r in all_records:
code = r.get("code", "")
norm = normalize_diplo_code(code)
key = (norm, r.get("base_country", ""))
if key not in index:
index[key] = r
updated = deleted = 0
for record in needs_norm:
code = record["code"]
norm = normalize_diplo_code(code)
key = (norm, record.get("base_country", ""))
# Gibt es schon einen normalisierten Record?
canonical = index.get(key)
if canonical and canonical["id"] != record["id"]:
# Duplikat — löschen
print(f" DEL Duplikat: {code}{norm} (id={record['id']}, behalte {canonical['id']})")
if not dry_run:
pb.delete("diplomatenkennzeichen", record["id"])
deleted += 1
else:
# Kein Duplikat — nur Code anpassen
print(f" UPD: {code}{norm} (id={record['id']})")
if not dry_run:
pb.patch("diplomatenkennzeichen", record["id"], {"code": norm})
# Index aktualisieren
index[key] = record
updated += 1
print(f"{updated} normalisiert, {deleted} Duplikate gelöscht"
+ (" (DRY RUN)" if dry_run else ""))
def fix_kennzeichen_duplicates(pb: PB, dry_run: bool):
print("\n→ Kennzeichen-Duplikate bereinigen...")
all_records = pb.get_all("kennzeichen", fields="id,code,country,lat,lon,app_id,population,points")
print(f" {len(all_records)} Records geladen")
# Gruppieren nach (code, country)
groups: dict[tuple, list] = defaultdict(list)
for r in all_records:
key = (r.get("code", "").strip().upper(), r.get("country", "").strip())
groups[key].append(r)
dupes = {k: v for k, v in groups.items() if len(v) > 1}
print(f" {len(dupes)} Gruppen mit Duplikaten")
total_deleted = 0
for (code, country), records in dupes.items():
# "Bestes" Record: bevorzuge das mit lat/lon, dann das mit mehr Feldern
def score(r):
s = 0
if r.get("lat"): s += 10
if r.get("lon"): s += 10
if r.get("app_id"): s += 5
if r.get("population"): s += 3
if r.get("points"): s += 2
return s
records_sorted = sorted(records, key=score, reverse=True)
keep = records_sorted[0]
to_delete = records_sorted[1:]
print(f" DUPL [{country}] {code}: behalte {keep['id']}, lösche {[r['id'] for r in to_delete]}")
for r in to_delete:
if not dry_run:
pb.delete("kennzeichen", r["id"])
total_deleted += 1
print(f"{total_deleted} Duplikate gelöscht" + (" (DRY RUN)" if dry_run else ""))
def fix_blog_datum(pb: PB, dry_run: bool):
print("\n→ Blog datum-Feld prüfen...")
try:
coll = pb.collection_schema("blog_posts")
except Exception as e:
print(f" Fehler beim Laden des Schemas: {e}")
return
schema = coll.get("schema", [])
datum_field = next((f for f in schema if f["name"] == "datum"), None)
if not datum_field:
print(" datum-Feld nicht gefunden")
return
print(f" datum-Feld Typ: {datum_field['type']}")
if datum_field["type"] == "date":
print(" ✓ Kein Fix nötig — datum ist bereits vom Typ date")
return
print(" datum ist Text — muss auf date geändert werden")
# Alle Posts laden und Datum prüfen
posts = pb.get_all("blog_posts", fields="id,datum")
print(f" {len(posts)} Blog-Posts gefunden")
if dry_run:
print(" (DRY RUN) — würde Schema auf date ändern")
return
# Schema aktualisieren
new_schema = []
for f in schema:
if f["name"] == "datum":
new_schema.append({**f, "type": "date"})
else:
new_schema.append(f)
r = pb.update_collection_schema(coll["id"], {"schema": new_schema})
if r.status_code in (200, 204):
print(" ✓ datum auf date geändert")
else:
print(f" Fehler: {r.text[:200]}")
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--pb-url", default="http://localhost:4444")
parser.add_argument("--pb-email", default=os.environ.get("PB_EMAIL"), required=not os.environ.get("PB_EMAIL"))
parser.add_argument("--pb-password", default=os.environ.get("PB_PASSWORD"), required=not os.environ.get("PB_PASSWORD"))
parser.add_argument("--dry-run", action="store_true", help="Nur anzeigen, nicht ändern")
parser.add_argument("--only", choices=["diplo", "kennzeichen", "blog"],
help="Nur einen bestimmten Fix ausführen")
args = parser.parse_args()
if args.dry_run:
print("*** DRY RUN — keine Änderungen werden geschrieben ***")
pb_client = PB(args.pb_url)
pb_client.login(args.pb_email, args.pb_password)
if args.only in (None, "diplo"):
fix_diplo_duplicates(pb_client, args.dry_run)
if args.only in (None, "kennzeichen"):
fix_kennzeichen_duplicates(pb_client, args.dry_run)
if args.only in (None, "blog"):
fix_blog_datum(pb_client, args.dry_run)
print("\n✓ Fertig!")
if __name__ == "__main__":
main()