246 lines
8.6 KiB
Python
246 lines
8.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
cleanup.py — Bereinigt Datenduplikate in PocketBase
|
|
|
|
1. Diplomatenkennzeichen: 0120 → 0-120 normalisieren, Duplikate löschen
|
|
2. Kennzeichen: Duplikate (gleicher code + country) zusammenführen und löschen
|
|
3. Blog: datum-Feld von Text auf date migrieren (falls nötig)
|
|
|
|
Verwendung:
|
|
python3 cleanup.py --pb-url http://localhost:4444 \
|
|
--pb-email admin@example.com \
|
|
--pb-password geheim
|
|
"""
|
|
|
|
import argparse, re, requests, sys
|
|
from collections import defaultdict
|
|
|
|
class PB:
    """Minimal PocketBase REST client used by the cleanup steps.

    All HTTP calls go through ``requests`` and carry an explicit timeout so a
    stalled server cannot block the script indefinitely.

    Attributes:
        url: Base URL of the server, without a trailing slash.
        token: Auth token set by :meth:`login`; ``None`` before login.
        timeout: Timeout in seconds handed to every ``requests`` call.
    """

    def __init__(self, url, timeout=30):
        """Store the base URL (trailing slashes removed) and the request timeout."""
        self.url = url.rstrip("/")
        self.token = None
        self.timeout = timeout

    def login(self, email, password):
        """Authenticate against the admin endpoint and remember the token.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        r = requests.post(f"{self.url}/api/admins/auth-with-password",
                          json={"identity": email, "password": password},
                          timeout=self.timeout)
        r.raise_for_status()
        self.token = r.json()["token"]
        print(f"✓ Eingeloggt als {email}")

    def h(self):
        """Return the headers sent with every authenticated request."""
        return {"Authorization": self.token, "Content-Type": "application/json"}

    def get_all(self, collection, filter_str=None, fields=None):
        """Fetch every record of ``collection``, following pagination.

        Args:
            collection: Collection name used in the URL.
            filter_str: Optional value for the ``filter`` query parameter.
            fields: Optional value for the ``fields`` query parameter.

        Returns:
            A list with the ``items`` of all pages, in the order received.

        Raises:
            requests.HTTPError: If any page request fails.
        """
        params = {"perPage": 500, "page": 1}
        if filter_str:
            params["filter"] = filter_str
        if fields:
            params["fields"] = fields
        results = []
        while True:
            r = requests.get(f"{self.url}/api/collections/{collection}/records",
                             headers=self.h(), params=params,
                             timeout=self.timeout)
            r.raise_for_status()
            data = r.json()
            results.extend(data["items"])
            # Stop once the current page is the last one reported by the server.
            if params["page"] >= data["totalPages"]:
                break
            params["page"] += 1
        return results

    def patch(self, collection, record_id, data):
        """PATCH one record; print (but do not raise) on a non-200/204 answer.

        Returns:
            The ``requests`` response so callers can inspect the status.
        """
        r = requests.patch(
            f"{self.url}/api/collections/{collection}/records/{record_id}",
            headers=self.h(), json=data, timeout=self.timeout)
        if r.status_code not in (200, 204):
            print(f" PATCH-Fehler {record_id}: {r.text[:100]}")
        return r

    def delete(self, collection, record_id):
        """DELETE one record; print (but do not raise) on a non-200/204 answer.

        Returns:
            The ``requests`` response so callers can inspect the status.
        """
        r = requests.delete(
            f"{self.url}/api/collections/{collection}/records/{record_id}",
            headers=self.h(), timeout=self.timeout)
        if r.status_code not in (200, 204):
            print(f" DELETE-Fehler {record_id}: {r.text[:100]}")
        return r

    def collection_schema(self, name):
        """Return the JSON description of the collection called ``name``.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        r = requests.get(f"{self.url}/api/collections/{name}",
                         headers=self.h(), timeout=self.timeout)
        r.raise_for_status()
        return r.json()

    def update_collection_schema(self, coll_id, schema_update):
        """PATCH a collection definition; print (but do not raise) on failure.

        Returns:
            The ``requests`` response so callers can inspect the status.
        """
        r = requests.patch(
            f"{self.url}/api/collections/{coll_id}",
            headers=self.h(), json=schema_update, timeout=self.timeout)
        if r.status_code not in (200, 204):
            print(f" Schema-Update-Fehler: {r.text[:200]}")
        return r
|
|
|
|
|
|
def normalize_diplo_code(code: str) -> str:
    """Return ``code`` in hyphenated form.

    Surrounding whitespace is removed first. A code made of a leading ``0``
    followed by one or more digits (``0120``) becomes ``0-120``; any other
    code is returned stripped but otherwise unchanged.
    """
    stripped = code.strip()
    match = re.match(r'^0(\d+)$', stripped)
    if match is None:
        return stripped
    digits = match.group(1)
    return f"0-{digits}"
|
|
|
|
|
|
def fix_diplo_duplicates(pb: PB, dry_run: bool):
    """Normalize diplomatic plate codes and remove the resulting duplicates.

    Loads all records of the ``diplomatenkennzeichen`` collection. Every record
    whose ``code`` is a ``0`` followed only by digits (e.g. ``0120``) is either

    * deleted, when a different record with the same normalized code and the
      same ``base_country`` is being kept, or
    * patched so that its ``code`` becomes the hyphenated form (``0-120``).

    Records that do not need normalization always take precedence as the record
    to keep, regardless of the order in which the server returned them.

    Args:
        pb: Client providing ``get_all``, ``patch`` and ``delete``.
        dry_run: When true, only print the planned actions; nothing is written.
    """
    print("\n→ Diplomatenkennzeichen bereinigen...")
    all_records = pb.get_all("diplomatenkennzeichen")
    print(f" {len(all_records)} Records geladen")

    # Records without a hyphen that have to be normalized to 0-NNN.
    needs_norm = [r for r in all_records if re.match(r'^0\d+$', r.get("code", ""))]
    print(f" {len(needs_norm)} Records ohne Bindestrich gefunden (z.B. 0120)")
    pending_ids = {r["id"] for r in needs_norm}

    # Index: (normalized_code, base_country) → record to keep.
    # Only records that are NOT pending normalization seed the index. Otherwise
    # an un-hyphenated record listed before its hyphenated twin would become
    # its own canonical entry, get patched instead of deleted, and leave two
    # records with the same code behind.
    index: dict[tuple, dict] = {}
    for r in all_records:
        if r.get("id") in pending_ids:
            continue
        key = (normalize_diplo_code(r.get("code", "")), r.get("base_country", ""))
        index.setdefault(key, r)

    updated = deleted = 0

    for record in needs_norm:
        code = record["code"]
        norm = normalize_diplo_code(code)
        key = (norm, record.get("base_country", ""))

        # Is a record with the normalized code already being kept?
        canonical = index.get(key)
        # The id comparison protects a record that shows up twice in the
        # listing from being deleted in favour of itself.
        if canonical is not None and canonical["id"] != record["id"]:
            # Duplicate — delete it.
            print(f" DEL Duplikat: {code} → {norm} (id={record['id']}, behalte {canonical['id']})")
            if not dry_run:
                pb.delete("diplomatenkennzeichen", record["id"])
            deleted += 1
        else:
            # No duplicate — just rewrite the code.
            print(f" UPD: {code} → {norm} (id={record['id']})")
            if not dry_run:
                pb.patch("diplomatenkennzeichen", record["id"], {"code": norm})
            # This record is now the one to keep for its key, so later
            # un-hyphenated records with the same key are treated as duplicates.
            index[key] = record
            updated += 1

    print(f" ✓ {updated} normalisiert, {deleted} Duplikate gelöscht"
          + (" (DRY RUN)" if dry_run else ""))
|
|
|
|
|
|
def fix_kennzeichen_duplicates(pb: PB, dry_run: bool):
    """Delete duplicate ``kennzeichen`` records, keeping the most complete one.

    Records are grouped by (upper-cased, stripped ``code``; stripped
    ``country``). Within each group of two or more, the record with the highest
    completeness score is kept and the others are deleted. Field values of the
    deleted records are not copied onto the kept record.

    Args:
        pb: Client providing ``get_all`` and ``delete``.
        dry_run: When true, only print the planned deletions.
    """
    # Points awarded for each populated field when ranking duplicates.
    field_weights = (
        ("lat", 10),
        ("lon", 10),
        ("app_id", 5),
        ("population", 3),
        ("points", 2),
    )

    def completeness(rec):
        """Sum the weights of all fields that hold a truthy value."""
        return sum(weight for field, weight in field_weights if rec.get(field))

    print("\n→ Kennzeichen-Duplikate bereinigen...")
    all_records = pb.get_all("kennzeichen", fields="id,code,country,lat,lon,app_id,population,points")
    print(f" {len(all_records)} Records geladen")

    # Group by (code, country).
    by_key: dict[tuple, list] = defaultdict(list)
    for rec in all_records:
        code_key = rec.get("code", "").strip().upper()
        country_key = rec.get("country", "").strip()
        by_key[(code_key, country_key)].append(rec)

    duplicate_groups = [(key, members) for key, members in by_key.items() if len(members) > 1]
    print(f" {len(duplicate_groups)} Gruppen mit Duplikaten")

    total_deleted = 0
    for (code, country), members in duplicate_groups:
        # Stable descending sort: on equal scores the record seen first wins.
        ranked = list(members)
        ranked.sort(key=completeness, reverse=True)
        keep, *to_delete = ranked
        doomed_ids = [rec["id"] for rec in to_delete]

        print(f" DUPL [{country}] {code}: behalte {keep['id']}, lösche {doomed_ids}")

        for record_id in doomed_ids:
            if not dry_run:
                pb.delete("kennzeichen", record_id)
            total_deleted += 1

    suffix = " (DRY RUN)" if dry_run else ""
    print(f" ✓ {total_deleted} Duplikate gelöscht" + suffix)
|
|
|
|
|
|
def fix_blog_datum(pb: PB, dry_run: bool):
    """Switch the ``datum`` field of ``blog_posts`` to type ``date`` if needed.

    Reads the collection definition, looks up the ``datum`` entry in its
    ``schema`` list and, unless it already has type ``date``, sends the schema
    back with that one entry's ``type`` replaced. The posts are loaded only to
    report how many exist.

    Args:
        pb: Client providing ``collection_schema``, ``get_all`` and
            ``update_collection_schema``.
        dry_run: When true, report what would happen without updating the schema.
    """
    print("\n→ Blog datum-Feld prüfen...")
    try:
        collection = pb.collection_schema("blog_posts")
    except Exception as exc:
        print(f" Fehler beim Laden des Schemas: {exc}")
        return

    fields = collection.get("schema", [])
    datum_field = next((entry for entry in fields if entry["name"] == "datum"), None)
    if not datum_field:
        print(" datum-Feld nicht gefunden")
        return

    field_type = datum_field["type"]
    print(f" datum-Feld Typ: {field_type}")
    if field_type == "date":
        print(" ✓ Kein Fix nötig — datum ist bereits vom Typ date")
        return

    print(" datum ist Text — muss auf date geändert werden")
    # Load all posts to report how many there are.
    posts = pb.get_all("blog_posts", fields="id,datum")
    post_count = len(posts)
    print(f" {post_count} Blog-Posts gefunden")

    if dry_run:
        print(" (DRY RUN) — würde Schema auf date ändern")
        return

    # Same schema, with only the datum entry's type replaced.
    migrated = [
        {**entry, "type": "date"} if entry["name"] == "datum" else entry
        for entry in fields
    ]

    response = pb.update_collection_schema(collection["id"], {"schema": migrated})
    if response.status_code not in (200, 204):
        print(f" Fehler: {response.text[:200]}")
        return
    print(" ✓ datum auf date geändert")
|
|
|
|
|
|
def main():
    """Parse the command line, log in and run the selected cleanup steps.

    ``--pb-email`` and ``--pb-password`` fall back to the ``PB_EMAIL`` and
    ``PB_PASSWORD`` environment variables; each option is only mandatory on the
    command line when its variable is unset or empty. ``--only`` restricts the
    run to one step; without it all three steps run in order.
    """
    # ``os`` is used for the environment fallbacks but is not among the
    # module-level imports, so import it here to avoid a NameError.
    import os

    env_email = os.environ.get("PB_EMAIL")
    env_password = os.environ.get("PB_PASSWORD")

    parser = argparse.ArgumentParser()
    parser.add_argument("--pb-url", default="http://localhost:4444")
    parser.add_argument("--pb-email", default=env_email, required=not env_email)
    parser.add_argument("--pb-password", default=env_password, required=not env_password)
    parser.add_argument("--dry-run", action="store_true", help="Nur anzeigen, nicht ändern")
    parser.add_argument("--only", choices=["diplo", "kennzeichen", "blog"],
                        help="Nur einen bestimmten Fix ausführen")
    args = parser.parse_args()

    if args.dry_run:
        print("*** DRY RUN — keine Änderungen werden geschrieben ***")

    pb_client = PB(args.pb_url)
    pb_client.login(args.pb_email, args.pb_password)

    # ``args.only`` is None when the option is omitted, which selects every step.
    if args.only in (None, "diplo"):
        fix_diplo_duplicates(pb_client, args.dry_run)

    if args.only in (None, "kennzeichen"):
        fix_kennzeichen_duplicates(pb_client, args.dry_run)

    if args.only in (None, "blog"):
        fix_blog_datum(pb_client, args.dry_run)

    print("\n✓ Fertig!")


if __name__ == "__main__":
    main()
|